Merge lp:~allenap/maas/rpc-client-now-avoiding-storm--2.1 into lp:maas/2.1

Proposed by Gavin Panella
Status: Merged
Approved by: Gavin Panella
Approved revision: no longer in the source branch.
Merged at revision: 5590
Proposed branch: lp:~allenap/maas/rpc-client-now-avoiding-storm--2.1
Merge into: lp:maas/2.1
Diff against target: 139 lines (+48/-14)
2 files modified
src/provisioningserver/rpc/clusterservice.py (+19/-9)
src/provisioningserver/rpc/tests/test_clusterservice.py (+29/-5)
To merge this branch: bzr merge lp:~allenap/maas/rpc-client-now-avoiding-storm--2.1
Reviewer Review Type Date Requested Status
Gavin Panella (community) Approve
Review via email: mp+318362@code.launchpad.net

Commit message

Backport r5754 from trunk: Concurrent calls to getClientNow will not cause an update storm.

To post a comment you must log in.
Revision history for this message
Gavin Panella (allenap) wrote:

Self-reviewing backport.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'src/provisioningserver/rpc/clusterservice.py'
2--- src/provisioningserver/rpc/clusterservice.py 2017-02-14 18:24:19 +0000
3+++ src/provisioningserver/rpc/clusterservice.py 2017-02-27 14:32:44 +0000
4@@ -95,6 +95,7 @@
5 select_c_utf8_bytes_locale,
6 )
7 from provisioningserver.utils.twisted import (
8+ call,
9 callOut,
10 deferred,
11 DeferredValue,
12@@ -913,6 +914,13 @@
13 else:
14 twisted.web.client.URI = PatchedURI
15
16+ # When _doUpdate is called we capture it into _updateInProgress so
17+ # that concurrent calls can piggyback rather than initiating extra
18+ # calls. We start with an already-fired DeferredValue: _tryUpdate
19+ # checks if it is set to decide whether or not to call _doUpdate.
20+ self._updateInProgress = DeferredValue()
21+ self._updateInProgress.set(None)
22+
23 def startService(self):
24 self.time_started = self.clock.seconds()
25 super(ClusterClientService, self).startService()
26@@ -947,9 +955,7 @@
27 try:
28 return self.getClient()
29 except exceptions.NoConnectionsAvailable:
30- d = self._tryUpdate()
31- d.addCallback(lambda _: self.getClient())
32- return d
33+ return self._tryUpdate().addCallback(call, self.getClient)
34
35 def getAllClients(self):
36 """Return a list of all connected :class:`common.Client`s."""
37@@ -958,15 +964,19 @@
38 def _tryUpdate(self):
39 """Attempt to refresh outgoing connections.
40
41- This simply wraps self.update in a deferred to log errors and keep us
42- from dying if an exception is raised in update.
43+ This ensures that calls to `_doUpdate` are deferred, with errors
44+ logged but not propagated. It also ensures that `_doUpdate` is never
45+ called concurrently.
46 """
47- d = maybeDeferred(self.update)
48- d.addErrback(log.err, "Cluster client update failed.")
49- return d
50+ if self._updateInProgress.isSet:
51+ d = maybeDeferred(self._doUpdate).addErrback(
52+ log.err, "Cluster client update failed.")
53+ self._updateInProgress = DeferredValue()
54+ self._updateInProgress.capture(d)
55+ return self._updateInProgress.get()
56
57 @inlineCallbacks
58- def update(self):
59+ def _doUpdate(self):
60 """Refresh outgoing connections.
61
62 This obtains a list of endpoints from the region then connects
63
64=== modified file 'src/provisioningserver/rpc/tests/test_clusterservice.py'
65--- src/provisioningserver/rpc/tests/test_clusterservice.py 2017-02-14 18:24:19 +0000
66+++ src/provisioningserver/rpc/tests/test_clusterservice.py 2017-02-27 14:32:44 +0000
67@@ -131,6 +131,7 @@
68 reactor,
69 )
70 from twisted.internet.defer import (
71+ Deferred,
72 fail,
73 inlineCallbacks,
74 succeed,
75@@ -482,7 +483,7 @@
76 observed_rpc_info_url = ClusterClientService._get_rpc_info_url()
77 self.assertThat(observed_rpc_info_url, Equals(expected_rpc_info_url))
78
79- def test_update_connect_503_error_is_logged_tersely(self):
80+ def test__doUpdate_connect_503_error_is_logged_tersely(self):
81 getPage = self.patch(clusterservice, "getPage")
82 getPage.return_value = fail(web.error.Error("503"))
83
84@@ -505,13 +506,13 @@
85 logger = self.useFixture(TwistedLoggerFixture())
86
87 service = ClusterClientService(Clock())
88- update = self.patch(service, "update")
89- update.side_effect = error.ConnectionRefusedError()
90+ _doUpdate = self.patch(service, "_doUpdate")
91+ _doUpdate.side_effect = error.ConnectionRefusedError()
92
93 # Starting the service causes the first update to be performed, which
94 # will fail because of above.
95 service.startService()
96- self.assertThat(update, MockCalledOnceWith())
97+ self.assertThat(_doUpdate, MockCalledOnceWith())
98
99 dump = logger.dump()
100 self.assertIn('Connection was refused by other side.', dump)
101@@ -570,7 +571,7 @@
102 },
103 }).encode("ascii")
104
105- def test_update_calls__update_connections(self):
106+ def test__doUpdate_calls__update_connections(self):
107 maas_url = "http://localhost/%s/" % factory.make_name("path")
108 self.useFixture(ClusterConfigurationFixture(maas_url=maas_url))
109 self.patch_autospec(socket, 'getaddrinfo').return_value = (
110@@ -850,6 +851,29 @@
111 failure.value, exceptions.NoConnectionsAvailable))
112 return d
113
114+ def test__tryUpdate_prevents_concurrent_calls_to__doUpdate(self):
115+ service = ClusterClientService(Clock())
116+
117+ d_doUpdate_1, d_doUpdate_2 = Deferred(), Deferred()
118+ _doUpdate = self.patch(service, "_doUpdate")
119+ _doUpdate.side_effect = [d_doUpdate_1, d_doUpdate_2]
120+
121+ # Try updating a couple of times concurrently.
122+ d_tryUpdate_1 = service._tryUpdate()
123+ d_tryUpdate_2 = service._tryUpdate()
124+ # _doUpdate completes and returns `done`.
125+ d_doUpdate_1.callback(sentinel.done1)
126+ # Both _tryUpdate calls yield the same result.
127+ self.assertThat(extract_result(d_tryUpdate_1), Is(sentinel.done1))
128+ self.assertThat(extract_result(d_tryUpdate_2), Is(sentinel.done1))
129+ # _doUpdate was called only once.
130+ self.assertThat(_doUpdate, MockCalledOnceWith())
131+
132+ # The mechanism has reset and is ready to go again.
133+ d_tryUpdate_3 = service._tryUpdate()
134+ d_doUpdate_2.callback(sentinel.done2)
135+ self.assertThat(extract_result(d_tryUpdate_3), Is(sentinel.done2))
136+
137 def test_getAllClients(self):
138 service = ClusterClientService(Clock())
139 uuid1 = factory.make_UUID()

Subscribers

People subscribed via source and target branches

to all changes: