Merge ~cgrabowski/maas:fix_connections_dict_update_while_updating_connection_count into maas:master

Proposed by Christian Grabowski
Status: Merged
Approved by: Christian Grabowski
Approved revision: 68c26b36716e4010c0fc59ef5d12c971474b790f
Merge reported by: MAAS Lander
Merged at revision: not available
Proposed branch: ~cgrabowski/maas:fix_connections_dict_update_while_updating_connection_count
Merge into: maas:master
Diff against target: 84 lines (+56/-2)
2 files modified
src/maasserver/ipc.py (+1/-1)
src/maasserver/tests/test_ipc.py (+55/-1)
Reviewer Review Type Date Requested Status
Adam Collard (community) Approve
Björn Tillenius Needs Information
MAAS Lander Approve
Review via email: mp+429663@code.launchpad.net

Commit message

copy conns dict in rpc connection update to allow concurrent IPC calls

Revision history for this message
MAAS Lander (maas-lander) wrote :

UNIT TESTS
-b fix_connections_dict_update_while_updating_connection_count lp:~cgrabowski/maas/+git/maas into -b master lp:~maas-committers/maas

STATUS: FAILED
LOG: http://maas-ci.internal:8080/job/maas-tester/580/consoleText
COMMIT: 72a63c7381dbbf757194e11e8f3a497497cd3fef

review: Needs Fixing
Revision history for this message
MAAS Lander (maas-lander) wrote :

UNIT TESTS
-b fix_connections_dict_update_while_updating_connection_count lp:~cgrabowski/maas/+git/maas into -b master lp:~maas-committers/maas

STATUS: SUCCESS
COMMIT: 68c26b36716e4010c0fc59ef5d12c971474b790f

review: Approve
Revision history for this message
Björn Tillenius (bjornt) wrote :

Your patch probably fixes the traceback. But I'd like to understand a bit more about what is actually modifying the dictionary.

I wonder if we have another problem here, and we're only fixing the symptoms. I did run the test you added, but that doesn't seem to trigger the problem.

review: Needs Information
Revision history for this message
Christian Grabowski (cgrabowski) wrote :

> Your patch probably fixes the traceback. But I'd like to understand a bit
> more about what is actually modifying the dictionary.
>
> I wonder if we have another problem here, and we're only fixing the symptoms.
> I did run the test you added, but that doesn't seem to trigger the problem.

So the test does pass without the fix _most_ of the time. What's happening is that `registerWorkerRPC` is being called at the same time as `update`. The way I was able to reproduce this was to have a region controller running for a while and then restart the rack controller in a loop, so it has some (or at least previous) connectivity while new connections are being added at the same time. This becomes more apparent when running with multiple threads and RPC workers, which is a bit difficult to express in the test. So by copying the dict, the original / main connections dict can receive the new connections while the DB is updated based on the copied dict.
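
For illustration, a minimal standalone sketch of the failure mode described above and of how the copy avoids it (illustrative names only, not MAAS code; in the real bug the mutation comes from another thread):

    # Mutating a dict while iterating it raises RuntimeError; this is
    # the same shape as a connection registering mid-update().
    connections = {f"conn-{i}": None for i in range(3)}

    try:
        for ident in connections:           # update() walking the dict
            connections["conn-new"] = None  # a new registration arriving
    except RuntimeError as exc:
        print(exc)  # dictionary changed size during iteration

    # The fix: iterate a snapshot, so new registrations land in the
    # original dict without disturbing the in-flight iteration.
    for ident in connections.copy():
        connections["conn-newer"] = None    # safe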

Revision history for this message
Adam Collard (adam-collard) wrote :

LGTM

review: Approve
Revision history for this message
Björn Tillenius (bjornt) wrote :

Ok. I think this is ok to merge, but I'd still like to understand the issue a bit more.

I think we have a threading issue here. We modify and read conn["rpc"]["connections"] in different threads. Ideally we shouldn't do this, but assuming that dict.copy() is thread-safe, your patch should fix the traceback.

But are we processing all the connections properly when this issue occurs? Can you outline the execution workflow when update() and registerWorkerRPC() are called in parallel?

Revision history for this message
Christian Grabowski (cgrabowski) wrote :

Well, registerWorkerRPC() doesn't have this issue; it's registerWorkerRPCConnection() that does. That might be splitting hairs, but the big difference is that the issue occurred when adding a connection, which is what registerWorkerRPCConnection() does, in parallel with updating the DB on the connection state, which is what update() was doing when the stacktrace occurred. Adding RPC workers, which is what registerWorkerRPC() does, would also affect the top-level dict that update() iterates over, but since the workers register on startup, I believe the top-level dict doesn't change while update() is called.

Revision history for this message
Björn Tillenius (bjornt) wrote :

What calls .update()?

Revision history for this message
Christian Grabowski (cgrabowski) wrote :

In the __init__ of IPCMasterService there's a LoopingCall(self.update), so it calls update() itself in a loop.
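
For context, the rough shape is something like this (a simplified sketch, not the actual IPCMasterService; the 60-second interval is an assumption based on the discussion below):

    from twisted.application import service
    from twisted.internet import task

    class Master(service.Service):
        """Simplified stand-in for IPCMasterService."""

        def __init__(self):
            super().__init__()
            self.connections = {}
            # update() is re-run periodically on the reactor thread.
            self.updateLoop = task.LoopingCall(self.update)

        def startService(self):
            super().startService()
            self.updateLoop.start(60)  # assumed interval, in seconds

        def stopService(self):
            if self.updateLoop.running:
                self.updateLoop.stop()
            return super().stopService()

        def update(self):
            # In MAAS this defers DB writes based on self.connections.
            print(f"updating {len(self.connections)} worker entries")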

Revision history for this message
Björn Tillenius (bjornt) wrote :

Ok. So I do think that we still have a problem. What can happen is that registerWorkerRPCConnection() registers a new connection in the database, but then update() works on the old "connections" dict and removes the connection from the database. Then after 60 seconds update() runs again with the new "connections" dict and adds it back.

Not sure if this causes any actual problems, though.

Revision history for this message
Christian Grabowski (cgrabowski) wrote :

Hmm, I suppose that is possible. We could use a lock instead, but to address that particular issue I believe we'd have to hold the lock for the entirety of registerWorkerRPCConnection() and _updateConnections(), which might be fine, but could be a problem if many connections come in at once.
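
For what it's worth, the lock variant would look roughly like this (a hypothetical sketch using Twisted's DeferredLock, not part of the proposed change), which shows why the lock ends up held across the whole DB call:

    from twisted.internet.defer import DeferredLock, inlineCallbacks

    conn_lock = DeferredLock()

    @inlineCallbacks
    def register_connection(connections, ident, conn):
        # New registrations queue behind an in-flight DB update.
        yield conn_lock.acquire()
        try:
            connections[ident] = conn
        finally:
            conn_lock.release()

    @inlineCallbacks
    def update(connections, update_db):
        yield conn_lock.acquire()
        try:
            # The lock is held for the entire DB call, which is the
            # throughput concern when many connections arrive at once.
            yield update_db(connections)
        finally:
            conn_lock.release()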

Revision history for this message
Björn Tillenius (bjornt) wrote :

A lock would be one solution, but with Twisted there's another approach. You can basically move the copy of the dict to outside of _update() and pass it to deferToDatabase(). That way you get an automatic lock for the connections dict, since both the copy and the modifications run in the main thread.
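
Concretely, that suggestion would look something like this (a sketch with hypothetical method names; deferToDatabase is the MAAS helper from maasserver.utils.threads, so this only runs inside the MAAS tree):

    from maasserver.utils.threads import deferToDatabase

    class Sketch:
        def update(self):
            # Reactor thread: take the snapshot here, where all
            # mutations of self.connections also happen.
            snapshot = {
                pid: conn["rpc"]["connections"].copy()
                for pid, conn in self.connections.items()
            }
            return deferToDatabase(self._update_from_snapshot, snapshot)

        def _update_from_snapshot(self, snapshot):
            # Database thread: only ever reads the snapshot, never
            # the live dict, so no lock is needed.
            for pid, connections in snapshot.items():
                pass  # persist connection state for each worker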

Revision history for this message
Adam Collard (adam-collard) wrote :

Preview Diff

diff --git a/src/maasserver/ipc.py b/src/maasserver/ipc.py
index 9fbb0ef..a199b2b 100644
--- a/src/maasserver/ipc.py
+++ b/src/maasserver/ipc.py
@@ -563,7 +563,7 @@ class IPCMasterService(service.Service):
             else:
                 # RPC is not running, no endpoints.
                 self._updateEndpoints(process, [])
-            self._updateConnections(process, conn["rpc"]["connections"])
+            self._updateConnections(process, conn["rpc"]["connections"].copy())
             previous_process_ids.discard(process.id)
 
         # Delete all the old processes that are dead.
diff --git a/src/maasserver/tests/test_ipc.py b/src/maasserver/tests/test_ipc.py
index af1edba..78355bc 100644
--- a/src/maasserver/tests/test_ipc.py
+++ b/src/maasserver/tests/test_ipc.py
@@ -13,7 +13,7 @@ import uuid
 from fixtures import EnvironmentVariableFixture
 from testtools.matchers import MatchesStructure
 from twisted.internet import reactor
-from twisted.internet.defer import inlineCallbacks, succeed
+from twisted.internet.defer import DeferredList, inlineCallbacks, succeed
 
 from maasserver import ipc, workers
 from maasserver.enum import SERVICE_STATUS
@@ -508,3 +508,57 @@ class TestIPCCommunication(MAASTransactionServerTestCase):
         self.assertEqual(rpc_connections, [])
 
         yield master.stopService()
+
+    @wait_for_reactor
+    @inlineCallbacks
+    def test_update_allows_new_connections_while_updating_connections_in_db(
+        self,
+    ):
+        yield deferToDatabase(load_builtin_scripts)
+        master = self.make_IPCMasterService()
+        yield master.startService()
+
+        pid = random.randint(1, 512)
+        port = random.randint(1, 512)
+        ip = factory.make_ip_address()
+        yield master.registerWorker(pid, MagicMock())
+        yield master.registerWorkerRPC(pid, port)
+
+        rack_controller = yield deferToDatabase(factory.make_RackController)
+
+        for _ in range(2):
+            master.registerWorkerRPCConnection(
+                pid,
+                random.randint(1, 512),
+                rack_controller.system_id,
+                ip,
+                random.randint(1, 512),
+            )
+
+        defers = DeferredList(
+            [
+                master.update(),
+                master.registerWorkerRPCConnection(
+                    pid,
+                    random.randint(1, 512),
+                    rack_controller.system_id,
+                    ip,
+                    random.randint(1, 512),
+                ),
+            ]
+        )
+
+        yield defers
+
+        def _get_conn_count():
+            return RegionRackRPCConnection.objects.filter(
+                rack_controller=rack_controller
+            ).count()
+
+        count = yield deferToDatabase(_get_conn_count)
+
+        self.assertEqual(
+            count, len(master.connections[pid]["rpc"]["connections"].values())
+        )
+
+        yield master.stopService()
