Merge lp:~facundo/ubuntuone-client/aq-better-waiting-structures into lp:ubuntuone-client

Proposed by Facundo Batista
Status: Merged
Approved by: Facundo Batista
Approved revision: 913
Merged at revision: 911
Proposed branch: lp:~facundo/ubuntuone-client/aq-better-waiting-structures
Merge into: lp:ubuntuone-client
Diff against target: 601 lines (+198/-141)
2 files modified
tests/syncdaemon/test_action_queue.py (+155/-102)
ubuntuone/syncdaemon/action_queue.py (+43/-39)
To merge this branch: bzr merge lp:~facundo/ubuntuone-client/aq-better-waiting-structures
Reviewer Review Type Date Requested Status
Facundo Batista (community) Approve
Lucio Torre (community) Approve
Review via email: mp+52557@code.launchpad.net

Commit message

Better waiting structures for 'conditions' and 'inactive queue' (LP: #720844)

Description of the change

Better waiting structures for 'conditions' and 'inactive queue'

Both types of waiting needs are very different, so there's an explanation
for each.

Wait for conditions: this rarely happens, so there is no problem in creating a deferred for each command to lock on in this case. The problem here was that on each "check_conditions" call (which happens frequently, and will happen more frequently after a bug is fixed in VM), *all* queued commands were asked to check their conditions, and this could be very expensive when a lot of commands were queued.

Now, a waiting structure is created for this case. The command just locks through it, and conditions are checked only for these self-locked commands.

Wait for active queue: this happened a lot (every time the client disconnected), and each time it happened, a deferred was created for each queued command, which could be very expensive memory-wise. Furthermore, when the queue was run again, all commands were sequentially called to unlock their deferreds, which was expensive CPU-wise.

Now, a single deferred is used for all commands (that live in the queue). All commands wait for that deferred, and when the queue runs, it just triggers that deferred.

Tests included for everything.

To post a comment you must log in.
Revision history for this message
Lucio Torre (lucio.torre) :
review: Approve
Revision history for this message
Facundo Batista (facundo) wrote :

Approving with one review

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'tests/syncdaemon/test_action_queue.py'
2--- tests/syncdaemon/test_action_queue.py 2011-03-07 16:54:25 +0000
3+++ tests/syncdaemon/test_action_queue.py 2011-03-08 13:52:39 +0000
4@@ -63,7 +63,7 @@
5 CreateShare, DeleteShare, GetPublicFiles, GetDelta, GetDeltaFromScratch,
6 TRANSFER_PROGRESS_THRESHOLD, Unlink, Move, MakeFile, MakeDir, DeltaList,
7 ZipQueue, DeferredMap, ThrottlingStorageClient, PathLockingTree,
8- InterruptibleDeferred, DeferredInterrupted,
9+ InterruptibleDeferred, DeferredInterrupted, ConditionsLocker,
10 )
11 from ubuntuone.syncdaemon.event_queue import EventQueue, EVENTS
12 from ubuntuone.syncdaemon.marker import MDMarker
13@@ -105,7 +105,6 @@
14
15 is_runnable = True
16 paused = False
17- resumed = False
18 conditions_checked = False
19
20 def __init__(self, share_id=None, node_id=None):
21@@ -120,10 +119,6 @@
22 """Mark as paused."""
23 self.paused = True
24
25- def resume(self):
26- """Mark as resumed."""
27- self.resumed = True
28-
29 @property
30 def uniqueness(self):
31 """Fake uniqueness."""
32@@ -136,10 +131,6 @@
33 """Cancel!"""
34 self.cancelled = True
35
36- def check_conditions(self):
37- """Mark as checked."""
38- self.conditions_checked = True
39-
40
41 class FakedEventQueue(EventQueue):
42 """Faked event queue."""
43@@ -534,23 +525,20 @@
44 """RQ borns not active."""
45 self.assertFalse(self.rq.active)
46
47+ def test_init_activedef(self):
48+ """Just instanced queue has the deferred to take."""
49+ self.assertTrue(isinstance(self.rq.active_deferred, defer.Deferred))
50+
51 def test_run_goes_active(self):
52 """Activate on run."""
53 self.rq.run()
54 self.assertTrue(self.rq.active)
55
56- def test_run_resume_commands(self):
57- """Resume all queued command on run."""
58- # set up
59- cmd1 = FakeCommand()
60- cmd2 = FakeCommand()
61- self.rq.waiting.extend((cmd1, cmd2))
62- assert not cmd1.resumed and not cmd2.resumed
63-
64- # run and check
65+ def test_run_triggers_activedef(self):
66+ """Trigger the active_deferred on run."""
67+ assert not self.rq.active_deferred.called
68 self.rq.run()
69- self.assertTrue(cmd1.resumed)
70- self.assertTrue(cmd2.resumed)
71+ self.assertTrue(self.rq.active_deferred.called)
72
73 def test_stop_goes_inactive(self):
74 """Desactivate on stop."""
75@@ -571,18 +559,24 @@
76 self.assertTrue(cmd1.paused)
77 self.assertTrue(cmd2.paused)
78
79- def test_check_conditions(self):
80- """Check all conditions on the commands."""
81- # set up
82- cmd1 = FakeCommand()
83- cmd2 = FakeCommand()
84- self.rq.waiting.extend((cmd1, cmd2))
85- assert not cmd1.conditions_checked and not cmd2.conditions_checked
86-
87- # check conditions and test
88- self.rq.check_conditions()
89- self.assertTrue(cmd1.conditions_checked)
90- self.assertTrue(cmd2.conditions_checked)
91+ def test_stop_pause_useful_activedef(self):
92+ """Refresh the active_deferred before pausing."""
93+ checked = defer.Deferred()
94+
95+ def fake_pause():
96+ """Check that RQ has a useful active_deferred."""
97+ self.assertTrue(isinstance(self.rq.active_deferred,
98+ defer.Deferred))
99+ self.assertFalse(self.rq.active_deferred.called)
100+ checked.callback(True)
101+
102+ cmd = FakeCommand()
103+ cmd.pause = fake_pause
104+ self.rq.waiting.append(cmd)
105+
106+ # stop and test
107+ self.rq.stop()
108+ return checked
109
110 def test_unqueue_remove(self):
111 """Remove the command from queue on unqueue."""
112@@ -1781,11 +1775,14 @@
113 # run first time
114 self.cmd.run()
115 self.assertFalse(called)
116+ self.assertTrue(self.handler.check_debug(
117+ 'not running because of inactive queue'))
118+ self.assertFalse(self.handler.check_debug('unblocked: queue active'))
119
120 # active the queue
121- self.rq.active = True
122- self.cmd.resume()
123+ self.rq.run()
124 self.assertTrue(called)
125+ self.assertTrue(self.handler.check_debug('unblocked: queue active'))
126
127 def test_run_command_not_runnable(self):
128 """Waiting cycle for command not runnable."""
129@@ -1800,11 +1797,15 @@
130 # run first time
131 self.cmd.run()
132 self.assertFalse(called)
133+ self.assertTrue(self.handler.check_debug(
134+ 'not running because of conditions'))
135+ self.assertFalse(self.handler.check_debug('unblocked: conditions ok'))
136
137 # active the command
138 self.cmd.is_runnable = True
139- self.cmd.check_conditions()
140+ self.action_queue.conditions_locker.check_conditions()
141 self.assertTrue(called)
142+ self.assertTrue(self.handler.check_debug('unblocked: conditions ok'))
143
144 def test_run_notrunnable_inactivequeue(self):
145 """Mixed behaviour between both stoppers."""
146@@ -1821,19 +1822,17 @@
147
148 # active the queue but inactive the command
149 self.cmd.is_runnable = False
150- self.rq.active = True
151- self.cmd.resume()
152+ self.rq.run()
153 self.assertFalse(called)
154
155 # active the command but inactive the queue again!
156- self.rq.active = False
157+ self.rq.stop()
158 self.cmd.is_runnable = True
159- self.cmd.check_conditions()
160+ self.action_queue.conditions_locker.check_conditions()
161 self.assertFalse(called)
162
163 # finally resume the queue
164- self.rq.active = True
165- self.cmd.resume()
166+ self.rq.run()
167 self.assertTrue(called)
168
169 def test_run_inactivequeue_cancel(self):
170@@ -1851,8 +1850,7 @@
171 self.cmd.cancel()
172
173 # active the queue
174- self.rq.active = True
175- self.cmd.resume()
176+ self.rq.run()
177 self.assertFalse(called)
178 self.assertTrue(self.handler.check_debug(
179 'cancelled before trying to run'))
180@@ -1873,7 +1871,7 @@
181
182 # active the command
183 self.cmd.is_runnable = True
184- self.cmd.check_conditions()
185+ self.action_queue.conditions_locker.check_conditions()
186 self.assertFalse(called)
187 self.handler.debug = True
188 self.assertTrue(self.handler.check_debug(
189@@ -1967,29 +1965,23 @@
190 called = []
191 self.cmd.finish = lambda: called.append(True)
192 self.cmd.markers_resolved_deferred = defer.succeed(True)
193+ self.rq.waiting.append(self.cmd)
194 assert self.rq.active
195
196 # deferreds, first one stucks, the second allows to continue
197 deferreds = [defer.Deferred(), defer.succeed(True)]
198-
199- def fake_run():
200- """Set the queue inactive to avoid retry loop and fail."""
201- self.rq.active = False
202- return deferreds.pop(0)
203-
204- # set up and test
205- self.cmd._run = fake_run
206+ self.cmd._run = lambda: deferreds.pop(0)
207
208 # run and check finish was not called
209 self.cmd.run()
210 self.assertFalse(called)
211
212 # pause, still nothing called
213- self.cmd.pause()
214+ self.rq.stop()
215+ self.assertFalse(called)
216
217 # resume, now it finished!
218- self.rq.active = True
219- self.cmd.resume()
220+ self.rq.run()
221 self.assertTrue(called)
222
223 @defer.inlineCallbacks
224@@ -2050,36 +2042,6 @@
225 self.assertTrue(self.handler.check_debug("pausing"))
226 self.assertTrue(called)
227
228- def test_resume(self):
229- """Trigger the deferred only if there."""
230- # nothing called when no deferred
231- assert self.cmd.wait_for_queue is None
232- self.cmd.resume()
233- self.assertFalse(self.handler.check_debug('resuming'))
234-
235- # the deferred is triggered if there
236- d = defer.Deferred()
237- self.cmd.wait_for_queue = d
238- self.cmd.resume()
239- self.assertIdentical(self.cmd.wait_for_queue, None)
240- self.assertTrue(d.called)
241- self.assertTrue(self.handler.check_debug('resuming'))
242-
243- def test_check_conditions(self):
244- """Trigger the deferred only if there."""
245- # nothing called when no deferred
246- assert self.cmd.wait_for_conditions is None
247- self.cmd.check_conditions()
248- self.assertFalse(self.handler.check_debug('unblocking conditions'))
249-
250- # the deferred is triggered if there
251- d = defer.Deferred()
252- self.cmd.wait_for_conditions = d
253- self.cmd.check_conditions()
254- self.assertIdentical(self.cmd.wait_for_conditions, None)
255- self.assertTrue(d.called)
256- self.assertTrue(self.handler.check_debug('unblocking conditions'))
257-
258 def test_cancel_works(self):
259 """Do default cleaning."""
260 called = []
261@@ -2093,18 +2055,9 @@
262 self.assertTrue(self.handler.check_debug('cancelled'))
263
264 def test_cancel_releases_conditions(self):
265- """Cancel unlocks the conditions deferred."""
266- self.cmd.finish = lambda: None # don't try to unqueue!
267- d = defer.Deferred()
268- self.cmd.wait_for_conditions = d
269- self.cmd.cancel()
270- self.assertTrue(d.called)
271-
272- def test_cancel_releases_queue(self):
273- """Cancel unlocks the wait-for-queue deferred."""
274- self.cmd.finish = lambda: None # don't try to unqueue!
275- d = defer.Deferred()
276- self.cmd.wait_for_queue = d
277+ """Cancel calls the conditions locker for the command."""
278+ self.cmd.finish = lambda: None # don't try to unqueue!
279+ d = self.action_queue.conditions_locker.get_lock(self.cmd)
280 self.cmd.cancel()
281 self.assertTrue(d.called)
282
283@@ -4896,7 +4849,7 @@
284
285 # fix conditions and check them
286 self.cmd.is_runnable = True
287- self.queue.check_conditions()
288+ self.action_queue.conditions_locker.check_conditions()
289
290 # all check
291 self.assertEqual(called, ['run', 'finish'])
292@@ -4920,7 +4873,7 @@
293 self.cmd.go()
294
295 # before the command finishes, all conditions are checked
296- self.queue.check_conditions()
297+ self.action_queue.conditions_locker.check_conditions()
298
299 # command finished
300 d.callback(2)
301@@ -5051,7 +5004,7 @@
302
303 # fix conditions
304 self.cmd.is_runnable = True
305- self.queue.check_conditions()
306+ self.action_queue.conditions_locker.check_conditions()
307
308 # need to wait the callLater
309 yield finished
310@@ -5127,7 +5080,7 @@
311 def test_cancel_while_waiting_queue(self):
312 """Cancel the command while waiting for queue."""
313 # stop the queue, and fake the pathlock to test releasing
314- self.queue.active = False
315+ self.queue.stop()
316 released = []
317 self.cmd._acquire_pathlock = lambda: defer.succeed(
318 lambda: released.append(True))
319@@ -5137,6 +5090,10 @@
320 self.cmd.go()
321 self.cmd.cancel()
322
323+ # now, set the queue active again, it should release everything
324+ # even if was cancelled before
325+ self.queue.run()
326+
327 # all check
328 self._check_finished_ok()
329 self.assertTrue(released)
330@@ -5311,3 +5268,99 @@
331
332 # further callback to original deferred is harmless
333 origdef.errback(ValueError('foo'))
334+
335+
336+class ConditionsLockerTests(TwistedTestCase):
337+ """Test the ConditionsLocker."""
338+
339+ def setUp(self):
340+ """Set up."""
341+ self.cl = ConditionsLocker()
342+
343+ def test_get_locking_deferred_returns_deferred(self):
344+ """The locking is done by a deferred."""
345+ d = self.cl.get_lock('command')
346+ d.callback(True)
347+ return d
348+
349+ def test_get_locking_different_commands_different_deferreds(self):
350+ """Asked by two commands, get two deferreds."""
351+ d1 = self.cl.get_lock('command1')
352+ d2 = self.cl.get_lock('command2')
353+ self.assertNotIdentical(d1, d2)
354+
355+ def test_get_locking_same_command_same_deferred(self):
356+ """If asked twice by the same command, return the same deferred.
357+
358+ This is more a safe guard than a feature; if misused by the same
359+ command we're assuring than we will not overwrite a second deferred
360+ over the first one (so, never releasing the first one).
361+ """
362+ d1 = self.cl.get_lock('command')
363+ d2 = self.cl.get_lock('command')
364+ self.assertIdentical(d1, d2)
365+
366+ def test_check_conditions_simple_runnable(self):
367+ """Release the command."""
368+ cmd = FakeCommand()
369+ locking_d = self.cl.get_lock(cmd)
370+ self.assertFalse(locking_d.called)
371+ self.assertIn(cmd, self.cl.locked)
372+
373+ # release it!
374+ assert cmd.is_runnable
375+ self.cl.check_conditions()
376+ self.assertTrue(locking_d.called)
377+ self.assertNotIn(cmd, self.cl.locked)
378+
379+ def test_check_conditions_simple_notrunnable_then_ok(self):
380+ """First don't release the command, then release it."""
381+ cmd = FakeCommand()
382+ locking_d = self.cl.get_lock(cmd)
383+ self.assertFalse(locking_d.called)
384+
385+ # check for conditions, do not release
386+ cmd.is_runnable = False
387+ self.cl.check_conditions()
388+ self.assertFalse(locking_d.called)
389+
390+ # conditions are ok now, release
391+ cmd.is_runnable = True
392+ self.cl.check_conditions()
393+ self.assertTrue(locking_d.called)
394+
395+ def test_check_conditions_mixed(self):
396+ """Several commands, mixed situation."""
397+ cmd1 = FakeCommand()
398+ cmd1.is_runnable = False
399+ cmd2 = FakeCommand()
400+ assert cmd2.is_runnable
401+
402+ # get lock for both, and check conditions
403+ locking_d1 = self.cl.get_lock(cmd1)
404+ locking_d2 = self.cl.get_lock(cmd2)
405+ self.cl.check_conditions()
406+
407+ # one should be released, the other should not
408+ self.assertFalse(locking_d1.called)
409+ self.assertTrue(locking_d2.called)
410+
411+ def test_cancel_command_nothold(self):
412+ """It's ok to cancel a command not there."""
413+ self.cl.cancel_command('command')
414+
415+ def test_cancel_releases_cancelled_command(self):
416+ """It releases the cancelled command, even not runnable."""
417+ cmd1 = FakeCommand()
418+ cmd1.is_runnable = False
419+ cmd2 = FakeCommand()
420+ assert cmd2.is_runnable
421+
422+ # get lock for both, and cancel only 1
423+ locking_d1 = self.cl.get_lock(cmd1)
424+ locking_d2 = self.cl.get_lock(cmd2)
425+ self.cl.cancel_command(cmd1)
426+
427+ # 1 should be released, 2 should not (even with conditions ok)
428+ self.assertTrue(locking_d1.called)
429+ self.assertFalse(locking_d2.called)
430
431=== modified file 'ubuntuone/syncdaemon/action_queue.py'
432--- ubuntuone/syncdaemon/action_queue.py 2011-03-07 16:54:25 +0000
433+++ ubuntuone/syncdaemon/action_queue.py 2011-03-08 13:52:39 +0000
434@@ -455,6 +455,7 @@
435 self.hashed_waiting = {}
436 self.active = False
437 self.transfers_semaphore = defer.DeferredSemaphore(SIMULT_TRANSFERS)
438+ self.active_deferred = defer.Deferred()
439
440 def __len__(self):
441 """Return the length of the waiting queue."""
442@@ -488,20 +489,15 @@
443 if len(self.waiting) == 0:
444 self.action_queue.event_queue.push('SYS_QUEUE_DONE')
445
446- def check_conditions(self):
447- """Check conditions on which the commands may be waiting."""
448- for command in self.waiting[:]:
449- command.check_conditions()
450-
451 def run(self):
452 """Go active and run all commands in the queue."""
453 self.active = True
454- for command in self.waiting[:]:
455- command.resume()
456+ self.active_deferred.callback(True)
457
458 def stop(self):
459 """Stop the pool and cleanup the running commands."""
460 self.active = False
461+ self.active_deferred = defer.Deferred()
462 for command in self.waiting:
463 command.pause()
464
465@@ -557,6 +553,36 @@
466 d.errback(failure)
467
468
469+class ConditionsLocker(object):
470+ """Structure to hold commands waiting because of conditions.
471+
472+ On each call to lock it will return a deferred for the received
473+ command. When check_conditions is called, it will trigger each
474+ command deferred if it's runnable.
475+ """
476+ def __init__(self):
477+ self.locked = {}
478+
479+ def get_lock(self, command):
480+ """Return the deferred that will lock the command."""
481+ if command not in self.locked:
482+ self.locked[command] = defer.Deferred()
483+ return self.locked[command]
484+
485+ def check_conditions(self):
486+ """Check for all commands' conditions, and release accordingly."""
487+ for cmd in self.locked.keys():
488+ if cmd.is_runnable:
489+ deferred = self.locked.pop(cmd)
490+ deferred.callback(True)
491+
492+ def cancel_command(self, command):
493+ """The command was cancelled, if lock hold, release it and clean."""
494+ if command in self.locked:
495+ deferred = self.locked.pop(command)
496+ deferred.callback(True)
497+
498+
499 class UploadProgressWrapper(object):
500 """A wrapper around the file-like object used for Uploads.
501
502@@ -630,13 +656,14 @@
503 self.pathlock = PathLockingTree()
504 self.uuid_map = DeferredMap()
505 self.zip_queue = ZipQueue()
506+ self.conditions_locker = ConditionsLocker()
507
508 self.estimated_free_space = {}
509 event_queue.subscribe(self)
510
511 def check_conditions(self):
512- """Poll conditions on which running actions may be waiting."""
513- self.queue.check_conditions()
514+ """Check conditions in the locker, to release all the waiting ops."""
515+ self.conditions_locker.check_conditions()
516
517 def have_sufficient_space_for_upload(self, share_id, upload_size):
518 """Returns True if we have sufficient space for the given upload."""
519@@ -1099,7 +1126,7 @@
520
521 __slots__ = ('_queue', 'running', 'pathlock_release', 'log',
522 'markers_resolved_deferred', 'action_queue', 'cancelled',
523- 'wait_for_queue', 'wait_for_conditions', 'running_deferred')
524+ 'running_deferred')
525
526 def __init__(self, request_queue):
527 """Initialize a command instance."""
528@@ -1110,9 +1137,6 @@
529 self.markers_resolved_deferred = defer.Deferred()
530 self.pathlock_release = None
531 self.cancelled = False
532-
533- self.wait_for_queue = None
534- self.wait_for_conditions = None
535 self.running_deferred = None
536
537 def to_dict(self):
538@@ -1196,20 +1220,6 @@
539 self.running_deferred.interrupt()
540 self.cleanup()
541
542- def resume(self):
543- """Unlock the command because the queue is back alive."""
544- if self.wait_for_queue is not None:
545- self.log.debug('resuming')
546- self.wait_for_queue.callback(True)
547- self.wait_for_queue = None
548-
549- def check_conditions(self):
550- """If conditions are ok, run the command again."""
551- if self.is_runnable and self.wait_for_conditions is not None:
552- self.log.debug('unblocking conditions')
553- self.wait_for_conditions.callback(True)
554- self.wait_for_conditions = None
555-
556 @defer.inlineCallbacks
557 def go(self):
558 """Execute all the steps for a command."""
559@@ -1261,14 +1271,14 @@
560 # if queue not active, wait for it and check again
561 if not self._queue.active:
562 self.log.debug('not running because of inactive queue')
563- self.wait_for_queue = defer.Deferred()
564- yield self.wait_for_queue
565+ yield self._queue.active_deferred
566+ self.log.debug('unblocked: queue active')
567 continue
568
569 if not self.is_runnable:
570 self.log.debug('not running because of conditions')
571- self.wait_for_conditions = defer.Deferred()
572- yield self.wait_for_conditions
573+ yield self.action_queue.conditions_locker.get_lock(self)
574+ self.log.debug('unblocked: conditions ok')
575 continue
576
577 try:
578@@ -1313,8 +1323,7 @@
579 def cancel(self):
580 """Cancel the command.
581
582- Also trigger the wait_for_condition and wait_for_queue deferreds, to
583- unlock the command and finally release the pathlock.
584+ Also cancel the command in the conditions locker.
585
586 Do nothing if already cancelled (as cancellation can come from other
587 thread, it can come at any time, so we need to support double
588@@ -1327,12 +1336,7 @@
589
590 self.cancelled = True
591 self.log.debug('cancelled')
592- if self.wait_for_conditions is not None:
593- self.wait_for_conditions.callback(True)
594- self.wait_for_conditions = None
595- if self.wait_for_queue is not None:
596- self.wait_for_queue.callback(True)
597- self.wait_for_queue = None
598+ self.action_queue.conditions_locker.cancel_command(self)
599 self.cleanup()
600 self.finish()
601 return True

Subscribers

People subscribed via source and target branches