Merge lp:~facundo/ubuntuone-client/aq-better-waiting-structures into lp:ubuntuone-client

Proposed by Facundo Batista
Status: Merged
Approved by: Facundo Batista
Approved revision: 913
Merged at revision: 911
Proposed branch: lp:~facundo/ubuntuone-client/aq-better-waiting-structures
Merge into: lp:ubuntuone-client
Diff against target: 601 lines (+198/-141)
2 files modified
tests/syncdaemon/test_action_queue.py (+155/-102)
ubuntuone/syncdaemon/action_queue.py (+43/-39)
To merge this branch: bzr merge lp:~facundo/ubuntuone-client/aq-better-waiting-structures
Reviewer Review Type Date Requested Status
Facundo Batista (community) Approve
Lucio Torre (community) Approve
Review via email: mp+52557@code.launchpad.net

Commit message

Better waiting structures for 'conditions' and 'inactive queue' (LP: #720844)

Description of the change

Better waiting structures for 'conditions' and 'inactive queue'

The two kinds of waiting are very different, so each one is explained
separately below.

Wait for conditions: this rarely happens, so there is no problem in creating a deferred for each command that needs to lock in this case. The problem was that on each call to "check_conditions" (which happens frequently, and will happen even more frequently after a bug is fixed in VM), check_conditions was called on *all* commands, and this could be very expensive when a lot of commands were queued.

Now, a waiting structure (ConditionsLocker) is created for this case. A command just locks through it, and the conditions are checked only for the commands that locked themselves there.

Wait for active queue: this happened a lot (every time the client disconnected), and each time it happened, a deferred was created for each queued command, which could be very expensive memory-wise. Furthermore, when the queue was run again, every command was called in sequence to unlock its deferred, which was expensive CPU-wise.

Now, a single deferred is used for all commands (that live in the queue). All commands wait for that deferred, and when the queue runs, it just triggers that deferred.

Tests included for everything.

To post a comment you must log in.
Revision history for this message
Lucio Torre (lucio.torre) :
review: Approve
Revision history for this message
Facundo Batista (facundo) wrote :

Approving with one review

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'tests/syncdaemon/test_action_queue.py'
--- tests/syncdaemon/test_action_queue.py 2011-03-07 16:54:25 +0000
+++ tests/syncdaemon/test_action_queue.py 2011-03-08 13:52:39 +0000
@@ -63,7 +63,7 @@
63 CreateShare, DeleteShare, GetPublicFiles, GetDelta, GetDeltaFromScratch,63 CreateShare, DeleteShare, GetPublicFiles, GetDelta, GetDeltaFromScratch,
64 TRANSFER_PROGRESS_THRESHOLD, Unlink, Move, MakeFile, MakeDir, DeltaList,64 TRANSFER_PROGRESS_THRESHOLD, Unlink, Move, MakeFile, MakeDir, DeltaList,
65 ZipQueue, DeferredMap, ThrottlingStorageClient, PathLockingTree,65 ZipQueue, DeferredMap, ThrottlingStorageClient, PathLockingTree,
66 InterruptibleDeferred, DeferredInterrupted,66 InterruptibleDeferred, DeferredInterrupted, ConditionsLocker,
67)67)
68from ubuntuone.syncdaemon.event_queue import EventQueue, EVENTS68from ubuntuone.syncdaemon.event_queue import EventQueue, EVENTS
69from ubuntuone.syncdaemon.marker import MDMarker69from ubuntuone.syncdaemon.marker import MDMarker
@@ -105,7 +105,6 @@
105105
106 is_runnable = True106 is_runnable = True
107 paused = False107 paused = False
108 resumed = False
109 conditions_checked = False108 conditions_checked = False
110109
111 def __init__(self, share_id=None, node_id=None):110 def __init__(self, share_id=None, node_id=None):
@@ -120,10 +119,6 @@
120 """Mark as paused."""119 """Mark as paused."""
121 self.paused = True120 self.paused = True
122121
123 def resume(self):
124 """Mark as resumed."""
125 self.resumed = True
126
127 @property122 @property
128 def uniqueness(self):123 def uniqueness(self):
129 """Fake uniqueness."""124 """Fake uniqueness."""
@@ -136,10 +131,6 @@
136 """Cancel!"""131 """Cancel!"""
137 self.cancelled = True132 self.cancelled = True
138133
139 def check_conditions(self):
140 """Mark as checked."""
141 self.conditions_checked = True
142
143134
144class FakedEventQueue(EventQueue):135class FakedEventQueue(EventQueue):
145 """Faked event queue."""136 """Faked event queue."""
@@ -534,23 +525,20 @@
534 """RQ borns not active."""525 """RQ borns not active."""
535 self.assertFalse(self.rq.active)526 self.assertFalse(self.rq.active)
536527
528 def test_init_activedef(self):
529 """Just instanced queue has the deferred to take."""
530 self.assertTrue(isinstance(self.rq.active_deferred, defer.Deferred))
531
537 def test_run_goes_active(self):532 def test_run_goes_active(self):
538 """Activate on run."""533 """Activate on run."""
539 self.rq.run()534 self.rq.run()
540 self.assertTrue(self.rq.active)535 self.assertTrue(self.rq.active)
541536
542 def test_run_resume_commands(self):537 def test_run_triggers_activedef(self):
543 """Resume all queued command on run."""538 """Trigger the active_deferred on run."""
544 # set up539 assert not self.rq.active_deferred.called
545 cmd1 = FakeCommand()
546 cmd2 = FakeCommand()
547 self.rq.waiting.extend((cmd1, cmd2))
548 assert not cmd1.resumed and not cmd2.resumed
549
550 # run and check
551 self.rq.run()540 self.rq.run()
552 self.assertTrue(cmd1.resumed)541 self.assertTrue(self.rq.active_deferred.called)
553 self.assertTrue(cmd2.resumed)
554542
555 def test_stop_goes_inactive(self):543 def test_stop_goes_inactive(self):
556 """Desactivate on stop."""544 """Desactivate on stop."""
@@ -571,18 +559,24 @@
571 self.assertTrue(cmd1.paused)559 self.assertTrue(cmd1.paused)
572 self.assertTrue(cmd2.paused)560 self.assertTrue(cmd2.paused)
573561
574 def test_check_conditions(self):562 def test_stop_pause_useful_activedef(self):
575 """Check all conditions on the commands."""563 """Refresh the active_deferred before pausing."""
576 # set up564 checked = defer.Deferred()
577 cmd1 = FakeCommand()565
578 cmd2 = FakeCommand()566 def fake_pause():
579 self.rq.waiting.extend((cmd1, cmd2))567 """Check that RQ has a useful active_deferred."""
580 assert not cmd1.conditions_checked and not cmd2.conditions_checked568 self.assertTrue(isinstance(self.rq.active_deferred,
581569 defer.Deferred))
582 # check conditions and test570 self.assertFalse(self.rq.active_deferred.called)
583 self.rq.check_conditions()571 checked.callback(True)
584 self.assertTrue(cmd1.conditions_checked)572
585 self.assertTrue(cmd2.conditions_checked)573 cmd = FakeCommand()
574 cmd.pause = fake_pause
575 self.rq.waiting.append(cmd)
576
577 # stop and test
578 self.rq.stop()
579 return checked
586580
587 def test_unqueue_remove(self):581 def test_unqueue_remove(self):
588 """Remove the command from queue on unqueue."""582 """Remove the command from queue on unqueue."""
@@ -1781,11 +1775,14 @@
1781 # run first time1775 # run first time
1782 self.cmd.run()1776 self.cmd.run()
1783 self.assertFalse(called)1777 self.assertFalse(called)
1778 self.assertTrue(self.handler.check_debug(
1779 'not running because of inactive queue'))
1780 self.assertFalse(self.handler.check_debug('unblocked: queue active'))
17841781
1785 # active the queue1782 # active the queue
1786 self.rq.active = True1783 self.rq.run()
1787 self.cmd.resume()
1788 self.assertTrue(called)1784 self.assertTrue(called)
1785 self.assertTrue(self.handler.check_debug('unblocked: queue active'))
17891786
1790 def test_run_command_not_runnable(self):1787 def test_run_command_not_runnable(self):
1791 """Waiting cycle for command not runnable."""1788 """Waiting cycle for command not runnable."""
@@ -1800,11 +1797,15 @@
1800 # run first time1797 # run first time
1801 self.cmd.run()1798 self.cmd.run()
1802 self.assertFalse(called)1799 self.assertFalse(called)
1800 self.assertTrue(self.handler.check_debug(
1801 'not running because of conditions'))
1802 self.assertFalse(self.handler.check_debug('unblocked: conditions ok'))
18031803
1804 # active the command1804 # active the command
1805 self.cmd.is_runnable = True1805 self.cmd.is_runnable = True
1806 self.cmd.check_conditions()1806 self.action_queue.conditions_locker.check_conditions()
1807 self.assertTrue(called)1807 self.assertTrue(called)
1808 self.assertTrue(self.handler.check_debug('unblocked: conditions ok'))
18081809
1809 def test_run_notrunnable_inactivequeue(self):1810 def test_run_notrunnable_inactivequeue(self):
1810 """Mixed behaviour between both stoppers."""1811 """Mixed behaviour between both stoppers."""
@@ -1821,19 +1822,17 @@
18211822
1822 # active the queue but inactive the command1823 # active the queue but inactive the command
1823 self.cmd.is_runnable = False1824 self.cmd.is_runnable = False
1824 self.rq.active = True1825 self.rq.run()
1825 self.cmd.resume()
1826 self.assertFalse(called)1826 self.assertFalse(called)
18271827
1828 # active the command but inactive the queue again!1828 # active the command but inactive the queue again!
1829 self.rq.active = False1829 self.rq.stop()
1830 self.cmd.is_runnable = True1830 self.cmd.is_runnable = True
1831 self.cmd.check_conditions()1831 self.action_queue.conditions_locker.check_conditions()
1832 self.assertFalse(called)1832 self.assertFalse(called)
18331833
1834 # finally resume the queue1834 # finally resume the queue
1835 self.rq.active = True1835 self.rq.run()
1836 self.cmd.resume()
1837 self.assertTrue(called)1836 self.assertTrue(called)
18381837
1839 def test_run_inactivequeue_cancel(self):1838 def test_run_inactivequeue_cancel(self):
@@ -1851,8 +1850,7 @@
1851 self.cmd.cancel()1850 self.cmd.cancel()
18521851
1853 # active the queue1852 # active the queue
1854 self.rq.active = True1853 self.rq.run()
1855 self.cmd.resume()
1856 self.assertFalse(called)1854 self.assertFalse(called)
1857 self.assertTrue(self.handler.check_debug(1855 self.assertTrue(self.handler.check_debug(
1858 'cancelled before trying to run'))1856 'cancelled before trying to run'))
@@ -1873,7 +1871,7 @@
18731871
1874 # active the command1872 # active the command
1875 self.cmd.is_runnable = True1873 self.cmd.is_runnable = True
1876 self.cmd.check_conditions()1874 self.action_queue.conditions_locker.check_conditions()
1877 self.assertFalse(called)1875 self.assertFalse(called)
1878 self.handler.debug = True1876 self.handler.debug = True
1879 self.assertTrue(self.handler.check_debug(1877 self.assertTrue(self.handler.check_debug(
@@ -1967,29 +1965,23 @@
1967 called = []1965 called = []
1968 self.cmd.finish = lambda: called.append(True)1966 self.cmd.finish = lambda: called.append(True)
1969 self.cmd.markers_resolved_deferred = defer.succeed(True)1967 self.cmd.markers_resolved_deferred = defer.succeed(True)
1968 self.rq.waiting.append(self.cmd)
1970 assert self.rq.active1969 assert self.rq.active
19711970
1972 # deferreds, first one stucks, the second allows to continue1971 # deferreds, first one stucks, the second allows to continue
1973 deferreds = [defer.Deferred(), defer.succeed(True)]1972 deferreds = [defer.Deferred(), defer.succeed(True)]
19741973 self.cmd._run = lambda: deferreds.pop(0)
1975 def fake_run():
1976 """Set the queue inactive to avoid retry loop and fail."""
1977 self.rq.active = False
1978 return deferreds.pop(0)
1979
1980 # set up and test
1981 self.cmd._run = fake_run
19821974
1983 # run and check finish was not called1975 # run and check finish was not called
1984 self.cmd.run()1976 self.cmd.run()
1985 self.assertFalse(called)1977 self.assertFalse(called)
19861978
1987 # pause, still nothing called1979 # pause, still nothing called
1988 self.cmd.pause()1980 self.rq.stop()
1981 self.assertFalse(called)
19891982
1990 # resume, now it finished!1983 # resume, now it finished!
1991 self.rq.active = True1984 self.rq.run()
1992 self.cmd.resume()
1993 self.assertTrue(called)1985 self.assertTrue(called)
19941986
1995 @defer.inlineCallbacks1987 @defer.inlineCallbacks
@@ -2050,36 +2042,6 @@
2050 self.assertTrue(self.handler.check_debug("pausing"))2042 self.assertTrue(self.handler.check_debug("pausing"))
2051 self.assertTrue(called)2043 self.assertTrue(called)
20522044
2053 def test_resume(self):
2054 """Trigger the deferred only if there."""
2055 # nothing called when no deferred
2056 assert self.cmd.wait_for_queue is None
2057 self.cmd.resume()
2058 self.assertFalse(self.handler.check_debug('resuming'))
2059
2060 # the deferred is triggered if there
2061 d = defer.Deferred()
2062 self.cmd.wait_for_queue = d
2063 self.cmd.resume()
2064 self.assertIdentical(self.cmd.wait_for_queue, None)
2065 self.assertTrue(d.called)
2066 self.assertTrue(self.handler.check_debug('resuming'))
2067
2068 def test_check_conditions(self):
2069 """Trigger the deferred only if there."""
2070 # nothing called when no deferred
2071 assert self.cmd.wait_for_conditions is None
2072 self.cmd.check_conditions()
2073 self.assertFalse(self.handler.check_debug('unblocking conditions'))
2074
2075 # the deferred is triggered if there
2076 d = defer.Deferred()
2077 self.cmd.wait_for_conditions = d
2078 self.cmd.check_conditions()
2079 self.assertIdentical(self.cmd.wait_for_conditions, None)
2080 self.assertTrue(d.called)
2081 self.assertTrue(self.handler.check_debug('unblocking conditions'))
2082
2083 def test_cancel_works(self):2045 def test_cancel_works(self):
2084 """Do default cleaning."""2046 """Do default cleaning."""
2085 called = []2047 called = []
@@ -2093,18 +2055,9 @@
2093 self.assertTrue(self.handler.check_debug('cancelled'))2055 self.assertTrue(self.handler.check_debug('cancelled'))
20942056
2095 def test_cancel_releases_conditions(self):2057 def test_cancel_releases_conditions(self):
2096 """Cancel unlocks the conditions deferred."""2058 """Cancel calls the conditions locker for the command."""
2097 self.cmd.finish = lambda: None # don't try to unqueue!2059 self.cmd.finish = lambda: None # don't try to unqueue!
2098 d = defer.Deferred()2060 d = self.action_queue.conditions_locker.get_lock(self.cmd)
2099 self.cmd.wait_for_conditions = d
2100 self.cmd.cancel()
2101 self.assertTrue(d.called)
2102
2103 def test_cancel_releases_queue(self):
2104 """Cancel unlocks the wait-for-queue deferred."""
2105 self.cmd.finish = lambda: None # don't try to unqueue!
2106 d = defer.Deferred()
2107 self.cmd.wait_for_queue = d
2108 self.cmd.cancel()2061 self.cmd.cancel()
2109 self.assertTrue(d.called)2062 self.assertTrue(d.called)
21102063
@@ -4896,7 +4849,7 @@
48964849
4897 # fix conditions and check them4850 # fix conditions and check them
4898 self.cmd.is_runnable = True4851 self.cmd.is_runnable = True
4899 self.queue.check_conditions()4852 self.action_queue.conditions_locker.check_conditions()
49004853
4901 # all check4854 # all check
4902 self.assertEqual(called, ['run', 'finish'])4855 self.assertEqual(called, ['run', 'finish'])
@@ -4920,7 +4873,7 @@
4920 self.cmd.go()4873 self.cmd.go()
49214874
4922 # before the command finishes, all conditions are checked4875 # before the command finishes, all conditions are checked
4923 self.queue.check_conditions()4876 self.action_queue.conditions_locker.check_conditions()
49244877
4925 # command finished4878 # command finished
4926 d.callback(2)4879 d.callback(2)
@@ -5051,7 +5004,7 @@
50515004
5052 # fix conditions5005 # fix conditions
5053 self.cmd.is_runnable = True5006 self.cmd.is_runnable = True
5054 self.queue.check_conditions()5007 self.action_queue.conditions_locker.check_conditions()
50555008
5056 # need to wait the callLater5009 # need to wait the callLater
5057 yield finished5010 yield finished
@@ -5127,7 +5080,7 @@
5127 def test_cancel_while_waiting_queue(self):5080 def test_cancel_while_waiting_queue(self):
5128 """Cancel the command while waiting for queue."""5081 """Cancel the command while waiting for queue."""
5129 # stop the queue, and fake the pathlock to test releasing5082 # stop the queue, and fake the pathlock to test releasing
5130 self.queue.active = False5083 self.queue.stop()
5131 released = []5084 released = []
5132 self.cmd._acquire_pathlock = lambda: defer.succeed(5085 self.cmd._acquire_pathlock = lambda: defer.succeed(
5133 lambda: released.append(True))5086 lambda: released.append(True))
@@ -5137,6 +5090,10 @@
5137 self.cmd.go()5090 self.cmd.go()
5138 self.cmd.cancel()5091 self.cmd.cancel()
51395092
5093 # now, set the queue active again, it should release everything
5094 # even if was cancelled before
5095 self.queue.run()
5096
5140 # all check5097 # all check
5141 self._check_finished_ok()5098 self._check_finished_ok()
5142 self.assertTrue(released)5099 self.assertTrue(released)
@@ -5311,3 +5268,99 @@
53115268
5312 # further callback to original deferred is harmless5269 # further callback to original deferred is harmless
5313 origdef.errback(ValueError('foo'))5270 origdef.errback(ValueError('foo'))
5271
5272
5273class ConditionsLockerTests(TwistedTestCase):
5274 """Test the ConditionsLocker."""
5275
5276 def setUp(self):
5277 """Set up."""
5278 self.cl = ConditionsLocker()
5279
5280 def test_get_locking_deferred_returns_deferred(self):
5281 """The locking is done by a deferred."""
5282 d = self.cl.get_lock('command')
5283 d.callback(True)
5284 return d
5285
5286 def test_get_locking_different_commands_different_deferreds(self):
5287 """Asked by two commands, get two deferreds."""
5288 d1 = self.cl.get_lock('command1')
5289 d2 = self.cl.get_lock('command2')
5290 self.assertNotIdentical(d1, d2)
5291
5292 def test_get_locking_same_command_same_deferred(self):
5293 """If asked twice by the same command, return the same deferred.
5294
5295 This is more a safe guard than a feature; if misused by the same
5296 command we're assuring than we will not overwrite a second deferred
5297 over the first one (so, never releasing the first one).
5298 """
5299 d1 = self.cl.get_lock('command')
5300 d2 = self.cl.get_lock('command')
5301 self.assertIdentical(d1, d2)
5302
5303 def test_check_conditions_simple_runnable(self):
5304 """Release the command."""
5305 cmd = FakeCommand()
5306 locking_d = self.cl.get_lock(cmd)
5307 self.assertFalse(locking_d.called)
5308 self.assertIn(cmd, self.cl.locked)
5309
5310 # release it!
5311 assert cmd.is_runnable
5312 self.cl.check_conditions()
5313 self.assertTrue(locking_d.called)
5314 self.assertNotIn(cmd, self.cl.locked)
5315
5316 def test_check_conditions_simple_notrunnable_then_ok(self):
5317 """First don't release the command, then release it."""
5318 cmd = FakeCommand()
5319 locking_d = self.cl.get_lock(cmd)
5320 self.assertFalse(locking_d.called)
5321
5322 # check for conditions, do not release
5323 cmd.is_runnable = False
5324 self.cl.check_conditions()
5325 self.assertFalse(locking_d.called)
5326
5327 # conditions are ok now, release
5328 cmd.is_runnable = True
5329 self.cl.check_conditions()
5330 self.assertTrue(locking_d.called)
5331
5332 def test_check_conditions_mixed(self):
5333 """Several commands, mixed situation."""
5334 cmd1 = FakeCommand()
5335 cmd1.is_runnable = False
5336 cmd2 = FakeCommand()
5337 assert cmd2.is_runnable
5338
5339 # get lock for both, and check conditions
5340 locking_d1 = self.cl.get_lock(cmd1)
5341 locking_d2 = self.cl.get_lock(cmd2)
5342 self.cl.check_conditions()
5343
5344 # one should be released, the other should not
5345 self.assertFalse(locking_d1.called)
5346 self.assertTrue(locking_d2.called)
5347
5348 def test_cancel_command_nothold(self):
5349 """It's ok to cancel a command not there."""
5350 self.cl.cancel_command('command')
5351
5352 def test_cancel_releases_cancelled_command(self):
5353 """It releases the cancelled command, even not runnable."""
5354 cmd1 = FakeCommand()
5355 cmd1.is_runnable = False
5356 cmd2 = FakeCommand()
5357 assert cmd2.is_runnable
5358
5359 # get lock for both, and cancel only 1
5360 locking_d1 = self.cl.get_lock(cmd1)
5361 locking_d2 = self.cl.get_lock(cmd2)
5362 self.cl.cancel_command(cmd1)
5363
5364 # 1 should be released, 2 should not (even with conditions ok)
5365 self.assertTrue(locking_d1.called)
5366 self.assertFalse(locking_d2.called)
53145367
=== modified file 'ubuntuone/syncdaemon/action_queue.py'
--- ubuntuone/syncdaemon/action_queue.py 2011-03-07 16:54:25 +0000
+++ ubuntuone/syncdaemon/action_queue.py 2011-03-08 13:52:39 +0000
@@ -455,6 +455,7 @@
455 self.hashed_waiting = {}455 self.hashed_waiting = {}
456 self.active = False456 self.active = False
457 self.transfers_semaphore = defer.DeferredSemaphore(SIMULT_TRANSFERS)457 self.transfers_semaphore = defer.DeferredSemaphore(SIMULT_TRANSFERS)
458 self.active_deferred = defer.Deferred()
458459
459 def __len__(self):460 def __len__(self):
460 """Return the length of the waiting queue."""461 """Return the length of the waiting queue."""
@@ -488,20 +489,15 @@
488 if len(self.waiting) == 0:489 if len(self.waiting) == 0:
489 self.action_queue.event_queue.push('SYS_QUEUE_DONE')490 self.action_queue.event_queue.push('SYS_QUEUE_DONE')
490491
491 def check_conditions(self):
492 """Check conditions on which the commands may be waiting."""
493 for command in self.waiting[:]:
494 command.check_conditions()
495
496 def run(self):492 def run(self):
497 """Go active and run all commands in the queue."""493 """Go active and run all commands in the queue."""
498 self.active = True494 self.active = True
499 for command in self.waiting[:]:495 self.active_deferred.callback(True)
500 command.resume()
501496
502 def stop(self):497 def stop(self):
503 """Stop the pool and cleanup the running commands."""498 """Stop the pool and cleanup the running commands."""
504 self.active = False499 self.active = False
500 self.active_deferred = defer.Deferred()
505 for command in self.waiting:501 for command in self.waiting:
506 command.pause()502 command.pause()
507503
@@ -557,6 +553,36 @@
557 d.errback(failure)553 d.errback(failure)
558554
559555
556class ConditionsLocker(object):
557 """Structure to hold commands waiting because of conditions.
558
559 On each call to lock it will return a deferred for the received
560 command. When check_conditions is called, it will trigger each
561 command deferred if it's runnable.
562 """
563 def __init__(self):
564 self.locked = {}
565
566 def get_lock(self, command):
567 """Return the deferred that will lock the command."""
568 if command not in self.locked:
569 self.locked[command] = defer.Deferred()
570 return self.locked[command]
571
572 def check_conditions(self):
573 """Check for all commands' conditions, and release accordingly."""
574 for cmd in self.locked.keys():
575 if cmd.is_runnable:
576 deferred = self.locked.pop(cmd)
577 deferred.callback(True)
578
579 def cancel_command(self, command):
580 """The command was cancelled, if lock hold, release it and clean."""
581 if command in self.locked:
582 deferred = self.locked.pop(command)
583 deferred.callback(True)
584
585
560class UploadProgressWrapper(object):586class UploadProgressWrapper(object):
561 """A wrapper around the file-like object used for Uploads.587 """A wrapper around the file-like object used for Uploads.
562588
@@ -630,13 +656,14 @@
630 self.pathlock = PathLockingTree()656 self.pathlock = PathLockingTree()
631 self.uuid_map = DeferredMap()657 self.uuid_map = DeferredMap()
632 self.zip_queue = ZipQueue()658 self.zip_queue = ZipQueue()
659 self.conditions_locker = ConditionsLocker()
633660
634 self.estimated_free_space = {}661 self.estimated_free_space = {}
635 event_queue.subscribe(self)662 event_queue.subscribe(self)
636663
637 def check_conditions(self):664 def check_conditions(self):
638 """Poll conditions on which running actions may be waiting."""665 """Check conditions in the locker, to release all the waiting ops."""
639 self.queue.check_conditions()666 self.conditions_locker.check_conditions()
640667
641 def have_sufficient_space_for_upload(self, share_id, upload_size):668 def have_sufficient_space_for_upload(self, share_id, upload_size):
642 """Returns True if we have sufficient space for the given upload."""669 """Returns True if we have sufficient space for the given upload."""
@@ -1099,7 +1126,7 @@
10991126
1100 __slots__ = ('_queue', 'running', 'pathlock_release', 'log',1127 __slots__ = ('_queue', 'running', 'pathlock_release', 'log',
1101 'markers_resolved_deferred', 'action_queue', 'cancelled',1128 'markers_resolved_deferred', 'action_queue', 'cancelled',
1102 'wait_for_queue', 'wait_for_conditions', 'running_deferred')1129 'running_deferred')
11031130
1104 def __init__(self, request_queue):1131 def __init__(self, request_queue):
1105 """Initialize a command instance."""1132 """Initialize a command instance."""
@@ -1110,9 +1137,6 @@
1110 self.markers_resolved_deferred = defer.Deferred()1137 self.markers_resolved_deferred = defer.Deferred()
1111 self.pathlock_release = None1138 self.pathlock_release = None
1112 self.cancelled = False1139 self.cancelled = False
1113
1114 self.wait_for_queue = None
1115 self.wait_for_conditions = None
1116 self.running_deferred = None1140 self.running_deferred = None
11171141
1118 def to_dict(self):1142 def to_dict(self):
@@ -1196,20 +1220,6 @@
1196 self.running_deferred.interrupt()1220 self.running_deferred.interrupt()
1197 self.cleanup()1221 self.cleanup()
11981222
1199 def resume(self):
1200 """Unlock the command because the queue is back alive."""
1201 if self.wait_for_queue is not None:
1202 self.log.debug('resuming')
1203 self.wait_for_queue.callback(True)
1204 self.wait_for_queue = None
1205
1206 def check_conditions(self):
1207 """If conditions are ok, run the command again."""
1208 if self.is_runnable and self.wait_for_conditions is not None:
1209 self.log.debug('unblocking conditions')
1210 self.wait_for_conditions.callback(True)
1211 self.wait_for_conditions = None
1212
1213 @defer.inlineCallbacks1223 @defer.inlineCallbacks
1214 def go(self):1224 def go(self):
1215 """Execute all the steps for a command."""1225 """Execute all the steps for a command."""
@@ -1261,14 +1271,14 @@
1261 # if queue not active, wait for it and check again1271 # if queue not active, wait for it and check again
1262 if not self._queue.active:1272 if not self._queue.active:
1263 self.log.debug('not running because of inactive queue')1273 self.log.debug('not running because of inactive queue')
1264 self.wait_for_queue = defer.Deferred()1274 yield self._queue.active_deferred
1265 yield self.wait_for_queue1275 self.log.debug('unblocked: queue active')
1266 continue1276 continue
12671277
1268 if not self.is_runnable:1278 if not self.is_runnable:
1269 self.log.debug('not running because of conditions')1279 self.log.debug('not running because of conditions')
1270 self.wait_for_conditions = defer.Deferred()1280 yield self.action_queue.conditions_locker.get_lock(self)
1271 yield self.wait_for_conditions1281 self.log.debug('unblocked: conditions ok')
1272 continue1282 continue
12731283
1274 try:1284 try:
@@ -1313,8 +1323,7 @@
1313 def cancel(self):1323 def cancel(self):
1314 """Cancel the command.1324 """Cancel the command.
13151325
1316 Also trigger the wait_for_condition and wait_for_queue deferreds, to1326 Also cancel the command in the conditions locker.
1317 unlock the command and finally release the pathlock.
13181327
1319 Do nothing if already cancelled (as cancellation can come from other1328 Do nothing if already cancelled (as cancellation can come from other
1320 thread, it can come at any time, so we need to support double1329 thread, it can come at any time, so we need to support double
@@ -1327,12 +1336,7 @@
13271336
1328 self.cancelled = True1337 self.cancelled = True
1329 self.log.debug('cancelled')1338 self.log.debug('cancelled')
1330 if self.wait_for_conditions is not None:1339 self.action_queue.conditions_locker.cancel_command(self)
1331 self.wait_for_conditions.callback(True)
1332 self.wait_for_conditions = None
1333 if self.wait_for_queue is not None:
1334 self.wait_for_queue.callback(True)
1335 self.wait_for_queue = None
1336 self.cleanup()1340 self.cleanup()
1337 self.finish()1341 self.finish()
1338 return True1342 return True

Subscribers

People subscribed via source and target branches