I've looked at the code, and unless I read it too quickly (which is honestly quite a possibility given the amount of snow and travelling plans clogging my head), I think we have a race condition:
(code at [1]:)
    while not self._queue.empty():
        update = self._queue.get()
        yield update
The race is:
- thread A entering the while because queue is not empty
- (thread A hasn't done queue.get() yet)
- thread B entering the while because queue is not empty
- thread B doing queue.get()
- thread A doing queue.get() -> but perhaps now the queue is empty
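For illustration, here's one way to avoid that check-then-act race in plain Python: attempt the get and handle the Empty exception, instead of checking empty() first. This is just a sketch with names of my own invention, not the neutron code:

```python
import queue

def drain(q):
    """Drain a queue without racing other consumers.

    get_nowait() is atomic: either we receive an item or Empty is
    raised. There is no window between a "not empty" check and the
    get in which another thread can steal the last item.
    """
    while True:
        try:
            yield q.get_nowait()
        except queue.Empty:
            return

q = queue.Queue()
for i in range(3):
    q.put(i)
items = list(drain(q))  # safe even with multiple concurrent drainers
```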
I also think the multi-threading design here could be simpler: instead of looping to create short-lived threads that compete to empty the queue, and are then recreated by the while loop at [2], I wonder if it wouldn't be better to have long-lived threads that block on a queue.get(True) (True means blocking). A blocking queue.get() avoids the race above, and since the threads don't die shortly after finishing their work, they don't need to be recreated by a spawn_n inside a loop.
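A minimal sketch of that long-lived-worker idea, using the standard library threading and queue modules rather than eventlet's spawn_n (so the worker function, the sentinel, and all names here are my own assumptions, not the neutron code):

```python
import queue
import threading

def worker(q, results):
    """Long-lived worker: block on q.get() instead of polling empty().

    The thread sleeps inside get() until an item arrives, so it never
    races a "not empty" check, and it never needs to be respawned.
    A None sentinel tells it to shut down.
    """
    while True:
        update = q.get(True)  # True = block until an item is available
        if update is None:    # sentinel: exit cleanly
            break
        results.append(update)

q = queue.Queue()
results = []
t = threading.Thread(target=worker, args=(q, results))
t.start()
for i in range(5):
    q.put(i)
q.put(None)  # ask the worker to stop
t.join()
```

With a single worker the updates are processed in FIFO order; with several workers blocked on the same queue, each item is still handed to exactly one of them.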
Given that I'm no concurrency expert, I might well have missed something important.
[1] https://github.com/openstack/neutron/blob/master/neutron/agent/linux/ip_conntrack.py#L54-L56
[2] https://github.com/openstack/neutron/blob/master/neutron/agent/linux/ip_conntrack.py#L94