Merge ~cgrabowski/maas:fix_connections_dict_update_while_updating_connection_count into maas:master

Proposed by Christian Grabowski
Status: Merged
Approved by: Christian Grabowski
Approved revision: 68c26b36716e4010c0fc59ef5d12c971474b790f
Merge reported by: MAAS Lander
Merged at revision: not available
Proposed branch: ~cgrabowski/maas:fix_connections_dict_update_while_updating_connection_count
Merge into: maas:master
Diff against target: 84 lines (+56/-2)
2 files modified
src/maasserver/ipc.py (+1/-1)
src/maasserver/tests/test_ipc.py (+55/-1)
Reviewer Review Type Date Requested Status
Adam Collard (community) Approve
Björn Tillenius Needs Information
MAAS Lander Approve
Review via email: mp+429663@code.launchpad.net

Commit message

copy conns dict in rpc connection update to allow concurrent IPC calls

Revision history for this message
MAAS Lander (maas-lander) wrote :

UNIT TESTS
-b fix_connections_dict_update_while_updating_connection_count lp:~cgrabowski/maas/+git/maas into -b master lp:~maas-committers/maas

STATUS: FAILED
LOG: http://maas-ci.internal:8080/job/maas-tester/580/consoleText
COMMIT: 72a63c7381dbbf757194e11e8f3a497497cd3fef

review: Needs Fixing
Revision history for this message
MAAS Lander (maas-lander) wrote :

UNIT TESTS
-b fix_connections_dict_update_while_updating_connection_count lp:~cgrabowski/maas/+git/maas into -b master lp:~maas-committers/maas

STATUS: SUCCESS
COMMIT: 68c26b36716e4010c0fc59ef5d12c971474b790f

review: Approve
Revision history for this message
Björn Tillenius (bjornt) wrote :

Your patch probably fixes the traceback. But I'd like to understand a bit what is actually modifying the dictionary?

I wonder if we have another problem here, and we're only fixing the symptoms. I did run the test you added, but that doesn't seem to trigger the problem.

review: Needs Information
Revision history for this message
Christian Grabowski (cgrabowski) wrote :

> Your patch probably fixes the traceback. But I'd like to understand a bit what
> is actually modifying the dictionary?
>
> I wonder if we have another problem here, and we're only fixing the symptoms.
> I did run the test you added, but that doesn't seem to trigger the problem.

So the test does pass without the fix _most_ of the time. What's happening is that `registerWorkerRPC` is being called at the same time as `update`. The way I was able to reproduce this was to have a region controller running for a while and then restart the rack controller in a loop, so it has some (or at least previous) connectivity while new connections are being added at the same time. This becomes more apparent when running with multiple threads and RPC workers, which is a bit difficult to express in the test. By copying the dict, the original / main connections dict can receive the new connections while the DB is updated based on the copied dict.
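
(For illustration only: a minimal standalone sketch of the race being described. This is not MAAS code; all names and sizes here are made up.)

import threading
import time

connections = {i: object() for i in range(1000)}

def update(snapshot=False):
    # Iterating the live dict while the other thread inserts keys can raise
    # "RuntimeError: dictionary changed size during iteration"; iterating a
    # .copy() snapshot cannot.
    source = connections.copy() if snapshot else connections
    for conn_id, conn in source.items():
        time.sleep(0)  # yield the GIL, as a slow database write would

def register_connections():
    for i in range(1000, 2000):
        connections[i] = object()
        time.sleep(0)

writer = threading.Thread(target=register_connections)
writer.start()
update(snapshot=True)  # with snapshot=False this intermittently raises RuntimeError
writer.join()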

Revision history for this message
Adam Collard (adam-collard) wrote :

LGTM

review: Approve
Revision history for this message
Björn Tillenius (bjornt) wrote :

Ok, I think this is ok to merge, but I'd still like to understand the issue a bit more.

I think we have a threading issue here. We modify and read conn["rpc"]["connections"] in different threads. Ideally we shouldn't do this, but assuming that dict.copy() is thread-safe, your patch should fix the traceback.

But are we processing all the connections properly when this issue occurs? Can you outline the execution workflow when update() and registerWorkerRPC() are called in parallel?

Revision history for this message
Christian Grabowski (cgrabowski) wrote :

Well, registerWorkerRPC() doesn't have this issue; it's registerWorkerRPCConnection() that does. That might be splitting hairs, but the big difference is that the issue occurred when adding a connection (which is what registerWorkerRPCConnection() does) in parallel with updating the DB on the connection state (which is what update() was doing when the stack trace occurred). Adding RPC workers, which is what registerWorkerRPC() does, would also affect the top-level dict that update() iterates over, but I believe the top-level dict doesn't change while update() is called, since the workers register on startup.

Revision history for this message
Björn Tillenius (bjornt) wrote :

What calls .update()?

Revision history for this message
Christian Grabowski (cgrabowski) wrote :

In the __init__ of this IPCMasterService, there's a LoopingCall(self.update), so it calls update() itself in a loop.
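
(Illustration only: a cut-down sketch of the pattern being described, not the actual IPCMasterService; the 60-second interval and the start/stop hooks are assumptions.)

from twisted.application import service
from twisted.internet.task import LoopingCall


class ExampleMasterService(service.Service):
    UPDATE_INTERVAL = 60  # seconds; the later comment mentions a 60-second cadence

    def __init__(self):
        super().__init__()
        self.connections = {}
        self.updateLoop = LoopingCall(self.update)

    def startService(self):
        super().startService()
        # now=False: the first run happens after one interval, not immediately.
        self.updateLoop.start(self.UPDATE_INTERVAL, now=False)

    def stopService(self):
        if self.updateLoop.running:
            self.updateLoop.stop()
        return super().stopService()

    def update(self):
        # In the real service this pushes connection state to the database.
        pass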

Revision history for this message
Björn Tillenius (bjornt) wrote :

Ok. So I do think that we still have a problem. What can happen is that registerWorkerRPCConnection() registers a new connection in the database, but then update() works on the old "connections" dict and removes the connection from the database. Then after 60 seconds update() runs again with the new "connections" dict and adds it back.

Not sure if this causes any actual problems, though.

Revision history for this message
Christian Grabowski (cgrabowski) wrote :

Hmm, I suppose that is possible. We could use a lock instead, but to address that particular issue I believe we'd have to hold the lock for the entirety of registerWorkerRPCConnection() and _updateConnections(), which might be fine, but could be a problem if many connections come in at once.

Revision history for this message
Björn Tillenius (bjornt) wrote :

A lock would be one solution, but with Twisted there's another approach. You can basically move the copy of the dict outside of _update() and pass it to deferToDatabase(). That way you get an automatic lock for the connections dict, since both the copy and the modifications run in the main thread.
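
(Rough sketch of this approach, using deferToThread as a stand-in for MAAS's deferToDatabase; the other names here are illustrative only, not the actual MAAS code.)

from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks
from twisted.internet.threads import deferToThread

connections = {"worker-1": {"rpc": {"connections": {1: "rack-a"}}}}


def write_connections_to_db(snapshot):
    # Runs in a worker thread; it only ever sees the snapshot.
    print("persisting", snapshot)


@inlineCallbacks
def update():
    for pid, conn in connections.items():
        # The copy is made in the reactor thread, the same thread that adds
        # new connections, so the copy and the mutation can never interleave.
        snapshot = conn["rpc"]["connections"].copy()
        # Only the snapshot crosses the thread boundary into the DB worker.
        yield deferToThread(write_connections_to_db, snapshot)


reactor.callWhenRunning(lambda: update().addBoth(lambda _: reactor.stop()))
reactor.run()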


Preview Diff

diff --git a/src/maasserver/ipc.py b/src/maasserver/ipc.py
index 9fbb0ef..a199b2b 100644
--- a/src/maasserver/ipc.py
+++ b/src/maasserver/ipc.py
@@ -563,7 +563,7 @@ class IPCMasterService(service.Service):
             else:
                 # RPC is not running, no endpoints.
                 self._updateEndpoints(process, [])
-            self._updateConnections(process, conn["rpc"]["connections"])
+            self._updateConnections(process, conn["rpc"]["connections"].copy())
             previous_process_ids.discard(process.id)

         # Delete all the old processes that are dead.
diff --git a/src/maasserver/tests/test_ipc.py b/src/maasserver/tests/test_ipc.py
index af1edba..78355bc 100644
--- a/src/maasserver/tests/test_ipc.py
+++ b/src/maasserver/tests/test_ipc.py
@@ -13,7 +13,7 @@ import uuid
 from fixtures import EnvironmentVariableFixture
 from testtools.matchers import MatchesStructure
 from twisted.internet import reactor
-from twisted.internet.defer import inlineCallbacks, succeed
+from twisted.internet.defer import DeferredList, inlineCallbacks, succeed

 from maasserver import ipc, workers
 from maasserver.enum import SERVICE_STATUS
@@ -508,3 +508,57 @@ class TestIPCCommunication(MAASTransactionServerTestCase):
         self.assertEqual(rpc_connections, [])

         yield master.stopService()
+
+    @wait_for_reactor
+    @inlineCallbacks
+    def test_update_allows_new_connections_while_updating_connections_in_db(
+        self,
+    ):
+        yield deferToDatabase(load_builtin_scripts)
+        master = self.make_IPCMasterService()
+        yield master.startService()
+
+        pid = random.randint(1, 512)
+        port = random.randint(1, 512)
+        ip = factory.make_ip_address()
+        yield master.registerWorker(pid, MagicMock())
+        yield master.registerWorkerRPC(pid, port)
+
+        rack_controller = yield deferToDatabase(factory.make_RackController)
+
+        for _ in range(2):
+            master.registerWorkerRPCConnection(
+                pid,
+                random.randint(1, 512),
+                rack_controller.system_id,
+                ip,
+                random.randint(1, 512),
+            )
+
+        defers = DeferredList(
+            [
+                master.update(),
+                master.registerWorkerRPCConnection(
+                    pid,
+                    random.randint(1, 512),
+                    rack_controller.system_id,
+                    ip,
+                    random.randint(1, 512),
+                ),
+            ]
+        )
+
+        yield defers
+
+        def _get_conn_count():
+            return RegionRackRPCConnection.objects.filter(
+                rack_controller=rack_controller
+            ).count()
+
+        count = yield deferToDatabase(_get_conn_count)
+
+        self.assertEqual(
+            count, len(master.connections[pid]["rpc"]["connections"].values())
+        )
+
+        yield master.stopService()
