Merge lp:~facundo/ubuntuone-client/aq-better-waiting-structures into lp:ubuntuone-client
- aq-better-waiting-structures
- Merge into trunk
Status: | Merged |
---|---|
Approved by: | Facundo Batista |
Approved revision: | 913 |
Merged at revision: | 911 |
Proposed branch: | lp:~facundo/ubuntuone-client/aq-better-waiting-structures |
Merge into: | lp:ubuntuone-client |
Diff against target: |
601 lines (+198/-141) 2 files modified
tests/syncdaemon/test_action_queue.py (+155/-102) ubuntuone/syncdaemon/action_queue.py (+43/-39) |
To merge this branch: | bzr merge lp:~facundo/ubuntuone-client/aq-better-waiting-structures |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Facundo Batista (community) | Approve | ||
Lucio Torre (community) | Approve | ||
Review via email: mp+52557@code.launchpad.net |
Commit message
Better waiting structures for 'conditions' and 'inactive queue' (LP: #720844)
Description of the change
Better waiting structures for 'conditions' and 'inactive queue'
Both types of waiting needs are very different, so there's an explanation
for each.
Wait for conditions, this rarely happen, so there is no problem to create a deferred for each command to lock in this case. The problem here was that on each "check_conditions" called (that happens frequently, and will happen more frequently after a bug is fixed in VM), *all* commands were called to check_conditions, and this could be very expensive when a lot of commands were queued.
Now, a waiting structure is created for this case. The command just locks through it, and the check conditions are called only through this self-locked commands.
Wait for active queue: this happened a lot (every time the client disconnected), and each time it happened, a deferred were created for each command queued, which could be very expensive memory-wise. Furthermore, when the queue was run again, all commands were sequentially called to unlock that deferred, which was expensive cpu-wise.
Now, a single deferred is used for all commands (that live in the queue). All commands wait for that deferred, and when the queue runs, it just triggers that deferred.
Tests included for everything.
Lucio Torre (lucio.torre) : | # |
Preview Diff
1 | === modified file 'tests/syncdaemon/test_action_queue.py' | |||
2 | --- tests/syncdaemon/test_action_queue.py 2011-03-07 16:54:25 +0000 | |||
3 | +++ tests/syncdaemon/test_action_queue.py 2011-03-08 13:52:39 +0000 | |||
4 | @@ -63,7 +63,7 @@ | |||
5 | 63 | CreateShare, DeleteShare, GetPublicFiles, GetDelta, GetDeltaFromScratch, | 63 | CreateShare, DeleteShare, GetPublicFiles, GetDelta, GetDeltaFromScratch, |
6 | 64 | TRANSFER_PROGRESS_THRESHOLD, Unlink, Move, MakeFile, MakeDir, DeltaList, | 64 | TRANSFER_PROGRESS_THRESHOLD, Unlink, Move, MakeFile, MakeDir, DeltaList, |
7 | 65 | ZipQueue, DeferredMap, ThrottlingStorageClient, PathLockingTree, | 65 | ZipQueue, DeferredMap, ThrottlingStorageClient, PathLockingTree, |
9 | 66 | InterruptibleDeferred, DeferredInterrupted, | 66 | InterruptibleDeferred, DeferredInterrupted, ConditionsLocker, |
10 | 67 | ) | 67 | ) |
11 | 68 | from ubuntuone.syncdaemon.event_queue import EventQueue, EVENTS | 68 | from ubuntuone.syncdaemon.event_queue import EventQueue, EVENTS |
12 | 69 | from ubuntuone.syncdaemon.marker import MDMarker | 69 | from ubuntuone.syncdaemon.marker import MDMarker |
13 | @@ -105,7 +105,6 @@ | |||
14 | 105 | 105 | ||
15 | 106 | is_runnable = True | 106 | is_runnable = True |
16 | 107 | paused = False | 107 | paused = False |
17 | 108 | resumed = False | ||
18 | 109 | conditions_checked = False | 108 | conditions_checked = False |
19 | 110 | 109 | ||
20 | 111 | def __init__(self, share_id=None, node_id=None): | 110 | def __init__(self, share_id=None, node_id=None): |
21 | @@ -120,10 +119,6 @@ | |||
22 | 120 | """Mark as paused.""" | 119 | """Mark as paused.""" |
23 | 121 | self.paused = True | 120 | self.paused = True |
24 | 122 | 121 | ||
25 | 123 | def resume(self): | ||
26 | 124 | """Mark as resumed.""" | ||
27 | 125 | self.resumed = True | ||
28 | 126 | |||
29 | 127 | @property | 122 | @property |
30 | 128 | def uniqueness(self): | 123 | def uniqueness(self): |
31 | 129 | """Fake uniqueness.""" | 124 | """Fake uniqueness.""" |
32 | @@ -136,10 +131,6 @@ | |||
33 | 136 | """Cancel!""" | 131 | """Cancel!""" |
34 | 137 | self.cancelled = True | 132 | self.cancelled = True |
35 | 138 | 133 | ||
36 | 139 | def check_conditions(self): | ||
37 | 140 | """Mark as checked.""" | ||
38 | 141 | self.conditions_checked = True | ||
39 | 142 | |||
40 | 143 | 134 | ||
41 | 144 | class FakedEventQueue(EventQueue): | 135 | class FakedEventQueue(EventQueue): |
42 | 145 | """Faked event queue.""" | 136 | """Faked event queue.""" |
43 | @@ -534,23 +525,20 @@ | |||
44 | 534 | """RQ borns not active.""" | 525 | """RQ borns not active.""" |
45 | 535 | self.assertFalse(self.rq.active) | 526 | self.assertFalse(self.rq.active) |
46 | 536 | 527 | ||
47 | 528 | def test_init_activedef(self): | ||
48 | 529 | """Just instanced queue has the deferred to take.""" | ||
49 | 530 | self.assertTrue(isinstance(self.rq.active_deferred, defer.Deferred)) | ||
50 | 531 | |||
51 | 537 | def test_run_goes_active(self): | 532 | def test_run_goes_active(self): |
52 | 538 | """Activate on run.""" | 533 | """Activate on run.""" |
53 | 539 | self.rq.run() | 534 | self.rq.run() |
54 | 540 | self.assertTrue(self.rq.active) | 535 | self.assertTrue(self.rq.active) |
55 | 541 | 536 | ||
65 | 542 | def test_run_resume_commands(self): | 537 | def test_run_triggers_activedef(self): |
66 | 543 | """Resume all queued command on run.""" | 538 | """Trigger the active_deferred on run.""" |
67 | 544 | # set up | 539 | assert not self.rq.active_deferred.called |
59 | 545 | cmd1 = FakeCommand() | ||
60 | 546 | cmd2 = FakeCommand() | ||
61 | 547 | self.rq.waiting.extend((cmd1, cmd2)) | ||
62 | 548 | assert not cmd1.resumed and not cmd2.resumed | ||
63 | 549 | |||
64 | 550 | # run and check | ||
68 | 551 | self.rq.run() | 540 | self.rq.run() |
71 | 552 | self.assertTrue(cmd1.resumed) | 541 | self.assertTrue(self.rq.active_deferred.called) |
70 | 553 | self.assertTrue(cmd2.resumed) | ||
72 | 554 | 542 | ||
73 | 555 | def test_stop_goes_inactive(self): | 543 | def test_stop_goes_inactive(self): |
74 | 556 | """Desactivate on stop.""" | 544 | """Desactivate on stop.""" |
75 | @@ -571,18 +559,24 @@ | |||
76 | 571 | self.assertTrue(cmd1.paused) | 559 | self.assertTrue(cmd1.paused) |
77 | 572 | self.assertTrue(cmd2.paused) | 560 | self.assertTrue(cmd2.paused) |
78 | 573 | 561 | ||
91 | 574 | def test_check_conditions(self): | 562 | def test_stop_pause_useful_activedef(self): |
92 | 575 | """Check all conditions on the commands.""" | 563 | """Refresh the active_deferred before pausing.""" |
93 | 576 | # set up | 564 | checked = defer.Deferred() |
94 | 577 | cmd1 = FakeCommand() | 565 | |
95 | 578 | cmd2 = FakeCommand() | 566 | def fake_pause(): |
96 | 579 | self.rq.waiting.extend((cmd1, cmd2)) | 567 | """Check that RQ has a useful active_deferred.""" |
97 | 580 | assert not cmd1.conditions_checked and not cmd2.conditions_checked | 568 | self.assertTrue(isinstance(self.rq.active_deferred, |
98 | 581 | 569 | defer.Deferred)) | |
99 | 582 | # check conditions and test | 570 | self.assertFalse(self.rq.active_deferred.called) |
100 | 583 | self.rq.check_conditions() | 571 | checked.callback(True) |
101 | 584 | self.assertTrue(cmd1.conditions_checked) | 572 | |
102 | 585 | self.assertTrue(cmd2.conditions_checked) | 573 | cmd = FakeCommand() |
103 | 574 | cmd.pause = fake_pause | ||
104 | 575 | self.rq.waiting.append(cmd) | ||
105 | 576 | |||
106 | 577 | # stop and test | ||
107 | 578 | self.rq.stop() | ||
108 | 579 | return checked | ||
109 | 586 | 580 | ||
110 | 587 | def test_unqueue_remove(self): | 581 | def test_unqueue_remove(self): |
111 | 588 | """Remove the command from queue on unqueue.""" | 582 | """Remove the command from queue on unqueue.""" |
112 | @@ -1781,11 +1775,14 @@ | |||
113 | 1781 | # run first time | 1775 | # run first time |
114 | 1782 | self.cmd.run() | 1776 | self.cmd.run() |
115 | 1783 | self.assertFalse(called) | 1777 | self.assertFalse(called) |
116 | 1778 | self.assertTrue(self.handler.check_debug( | ||
117 | 1779 | 'not running because of inactive queue')) | ||
118 | 1780 | self.assertFalse(self.handler.check_debug('unblocked: queue active')) | ||
119 | 1784 | 1781 | ||
120 | 1785 | # active the queue | 1782 | # active the queue |
123 | 1786 | self.rq.active = True | 1783 | self.rq.run() |
122 | 1787 | self.cmd.resume() | ||
124 | 1788 | self.assertTrue(called) | 1784 | self.assertTrue(called) |
125 | 1785 | self.assertTrue(self.handler.check_debug('unblocked: queue active')) | ||
126 | 1789 | 1786 | ||
127 | 1790 | def test_run_command_not_runnable(self): | 1787 | def test_run_command_not_runnable(self): |
128 | 1791 | """Waiting cycle for command not runnable.""" | 1788 | """Waiting cycle for command not runnable.""" |
129 | @@ -1800,11 +1797,15 @@ | |||
130 | 1800 | # run first time | 1797 | # run first time |
131 | 1801 | self.cmd.run() | 1798 | self.cmd.run() |
132 | 1802 | self.assertFalse(called) | 1799 | self.assertFalse(called) |
133 | 1800 | self.assertTrue(self.handler.check_debug( | ||
134 | 1801 | 'not running because of conditions')) | ||
135 | 1802 | self.assertFalse(self.handler.check_debug('unblocked: conditions ok')) | ||
136 | 1803 | 1803 | ||
137 | 1804 | # active the command | 1804 | # active the command |
138 | 1805 | self.cmd.is_runnable = True | 1805 | self.cmd.is_runnable = True |
140 | 1806 | self.cmd.check_conditions() | 1806 | self.action_queue.conditions_locker.check_conditions() |
141 | 1807 | self.assertTrue(called) | 1807 | self.assertTrue(called) |
142 | 1808 | self.assertTrue(self.handler.check_debug('unblocked: conditions ok')) | ||
143 | 1808 | 1809 | ||
144 | 1809 | def test_run_notrunnable_inactivequeue(self): | 1810 | def test_run_notrunnable_inactivequeue(self): |
145 | 1810 | """Mixed behaviour between both stoppers.""" | 1811 | """Mixed behaviour between both stoppers.""" |
146 | @@ -1821,19 +1822,17 @@ | |||
147 | 1821 | 1822 | ||
148 | 1822 | # active the queue but inactive the command | 1823 | # active the queue but inactive the command |
149 | 1823 | self.cmd.is_runnable = False | 1824 | self.cmd.is_runnable = False |
152 | 1824 | self.rq.active = True | 1825 | self.rq.run() |
151 | 1825 | self.cmd.resume() | ||
153 | 1826 | self.assertFalse(called) | 1826 | self.assertFalse(called) |
154 | 1827 | 1827 | ||
155 | 1828 | # active the command but inactive the queue again! | 1828 | # active the command but inactive the queue again! |
157 | 1829 | self.rq.active = False | 1829 | self.rq.stop() |
158 | 1830 | self.cmd.is_runnable = True | 1830 | self.cmd.is_runnable = True |
160 | 1831 | self.cmd.check_conditions() | 1831 | self.action_queue.conditions_locker.check_conditions() |
161 | 1832 | self.assertFalse(called) | 1832 | self.assertFalse(called) |
162 | 1833 | 1833 | ||
163 | 1834 | # finally resume the queue | 1834 | # finally resume the queue |
166 | 1835 | self.rq.active = True | 1835 | self.rq.run() |
165 | 1836 | self.cmd.resume() | ||
167 | 1837 | self.assertTrue(called) | 1836 | self.assertTrue(called) |
168 | 1838 | 1837 | ||
169 | 1839 | def test_run_inactivequeue_cancel(self): | 1838 | def test_run_inactivequeue_cancel(self): |
170 | @@ -1851,8 +1850,7 @@ | |||
171 | 1851 | self.cmd.cancel() | 1850 | self.cmd.cancel() |
172 | 1852 | 1851 | ||
173 | 1853 | # active the queue | 1852 | # active the queue |
176 | 1854 | self.rq.active = True | 1853 | self.rq.run() |
175 | 1855 | self.cmd.resume() | ||
177 | 1856 | self.assertFalse(called) | 1854 | self.assertFalse(called) |
178 | 1857 | self.assertTrue(self.handler.check_debug( | 1855 | self.assertTrue(self.handler.check_debug( |
179 | 1858 | 'cancelled before trying to run')) | 1856 | 'cancelled before trying to run')) |
180 | @@ -1873,7 +1871,7 @@ | |||
181 | 1873 | 1871 | ||
182 | 1874 | # active the command | 1872 | # active the command |
183 | 1875 | self.cmd.is_runnable = True | 1873 | self.cmd.is_runnable = True |
185 | 1876 | self.cmd.check_conditions() | 1874 | self.action_queue.conditions_locker.check_conditions() |
186 | 1877 | self.assertFalse(called) | 1875 | self.assertFalse(called) |
187 | 1878 | self.handler.debug = True | 1876 | self.handler.debug = True |
188 | 1879 | self.assertTrue(self.handler.check_debug( | 1877 | self.assertTrue(self.handler.check_debug( |
189 | @@ -1967,29 +1965,23 @@ | |||
190 | 1967 | called = [] | 1965 | called = [] |
191 | 1968 | self.cmd.finish = lambda: called.append(True) | 1966 | self.cmd.finish = lambda: called.append(True) |
192 | 1969 | self.cmd.markers_resolved_deferred = defer.succeed(True) | 1967 | self.cmd.markers_resolved_deferred = defer.succeed(True) |
193 | 1968 | self.rq.waiting.append(self.cmd) | ||
194 | 1970 | assert self.rq.active | 1969 | assert self.rq.active |
195 | 1971 | 1970 | ||
196 | 1972 | # deferreds, first one stucks, the second allows to continue | 1971 | # deferreds, first one stucks, the second allows to continue |
197 | 1973 | deferreds = [defer.Deferred(), defer.succeed(True)] | 1972 | deferreds = [defer.Deferred(), defer.succeed(True)] |
206 | 1974 | 1973 | self.cmd._run = lambda: deferreds.pop(0) | |
199 | 1975 | def fake_run(): | ||
200 | 1976 | """Set the queue inactive to avoid retry loop and fail.""" | ||
201 | 1977 | self.rq.active = False | ||
202 | 1978 | return deferreds.pop(0) | ||
203 | 1979 | |||
204 | 1980 | # set up and test | ||
205 | 1981 | self.cmd._run = fake_run | ||
207 | 1982 | 1974 | ||
208 | 1983 | # run and check finish was not called | 1975 | # run and check finish was not called |
209 | 1984 | self.cmd.run() | 1976 | self.cmd.run() |
210 | 1985 | self.assertFalse(called) | 1977 | self.assertFalse(called) |
211 | 1986 | 1978 | ||
212 | 1987 | # pause, still nothing called | 1979 | # pause, still nothing called |
214 | 1988 | self.cmd.pause() | 1980 | self.rq.stop() |
215 | 1981 | self.assertFalse(called) | ||
216 | 1989 | 1982 | ||
217 | 1990 | # resume, now it finished! | 1983 | # resume, now it finished! |
220 | 1991 | self.rq.active = True | 1984 | self.rq.run() |
219 | 1992 | self.cmd.resume() | ||
221 | 1993 | self.assertTrue(called) | 1985 | self.assertTrue(called) |
222 | 1994 | 1986 | ||
223 | 1995 | @defer.inlineCallbacks | 1987 | @defer.inlineCallbacks |
224 | @@ -2050,36 +2042,6 @@ | |||
225 | 2050 | self.assertTrue(self.handler.check_debug("pausing")) | 2042 | self.assertTrue(self.handler.check_debug("pausing")) |
226 | 2051 | self.assertTrue(called) | 2043 | self.assertTrue(called) |
227 | 2052 | 2044 | ||
228 | 2053 | def test_resume(self): | ||
229 | 2054 | """Trigger the deferred only if there.""" | ||
230 | 2055 | # nothing called when no deferred | ||
231 | 2056 | assert self.cmd.wait_for_queue is None | ||
232 | 2057 | self.cmd.resume() | ||
233 | 2058 | self.assertFalse(self.handler.check_debug('resuming')) | ||
234 | 2059 | |||
235 | 2060 | # the deferred is triggered if there | ||
236 | 2061 | d = defer.Deferred() | ||
237 | 2062 | self.cmd.wait_for_queue = d | ||
238 | 2063 | self.cmd.resume() | ||
239 | 2064 | self.assertIdentical(self.cmd.wait_for_queue, None) | ||
240 | 2065 | self.assertTrue(d.called) | ||
241 | 2066 | self.assertTrue(self.handler.check_debug('resuming')) | ||
242 | 2067 | |||
243 | 2068 | def test_check_conditions(self): | ||
244 | 2069 | """Trigger the deferred only if there.""" | ||
245 | 2070 | # nothing called when no deferred | ||
246 | 2071 | assert self.cmd.wait_for_conditions is None | ||
247 | 2072 | self.cmd.check_conditions() | ||
248 | 2073 | self.assertFalse(self.handler.check_debug('unblocking conditions')) | ||
249 | 2074 | |||
250 | 2075 | # the deferred is triggered if there | ||
251 | 2076 | d = defer.Deferred() | ||
252 | 2077 | self.cmd.wait_for_conditions = d | ||
253 | 2078 | self.cmd.check_conditions() | ||
254 | 2079 | self.assertIdentical(self.cmd.wait_for_conditions, None) | ||
255 | 2080 | self.assertTrue(d.called) | ||
256 | 2081 | self.assertTrue(self.handler.check_debug('unblocking conditions')) | ||
257 | 2082 | |||
258 | 2083 | def test_cancel_works(self): | 2045 | def test_cancel_works(self): |
259 | 2084 | """Do default cleaning.""" | 2046 | """Do default cleaning.""" |
260 | 2085 | called = [] | 2047 | called = [] |
261 | @@ -2093,18 +2055,9 @@ | |||
262 | 2093 | self.assertTrue(self.handler.check_debug('cancelled')) | 2055 | self.assertTrue(self.handler.check_debug('cancelled')) |
263 | 2094 | 2056 | ||
264 | 2095 | def test_cancel_releases_conditions(self): | 2057 | def test_cancel_releases_conditions(self): |
277 | 2096 | """Cancel unlocks the conditions deferred.""" | 2058 | """Cancel calls the conditions locker for the command.""" |
278 | 2097 | self.cmd.finish = lambda: None # don't try to unqueue! | 2059 | self.cmd.finish = lambda: None # don't try to unqueue! |
279 | 2098 | d = defer.Deferred() | 2060 | d = self.action_queue.conditions_locker.get_lock(self.cmd) |
268 | 2099 | self.cmd.wait_for_conditions = d | ||
269 | 2100 | self.cmd.cancel() | ||
270 | 2101 | self.assertTrue(d.called) | ||
271 | 2102 | |||
272 | 2103 | def test_cancel_releases_queue(self): | ||
273 | 2104 | """Cancel unlocks the wait-for-queue deferred.""" | ||
274 | 2105 | self.cmd.finish = lambda: None # don't try to unqueue! | ||
275 | 2106 | d = defer.Deferred() | ||
276 | 2107 | self.cmd.wait_for_queue = d | ||
280 | 2108 | self.cmd.cancel() | 2061 | self.cmd.cancel() |
281 | 2109 | self.assertTrue(d.called) | 2062 | self.assertTrue(d.called) |
282 | 2110 | 2063 | ||
283 | @@ -4896,7 +4849,7 @@ | |||
284 | 4896 | 4849 | ||
285 | 4897 | # fix conditions and check them | 4850 | # fix conditions and check them |
286 | 4898 | self.cmd.is_runnable = True | 4851 | self.cmd.is_runnable = True |
288 | 4899 | self.queue.check_conditions() | 4852 | self.action_queue.conditions_locker.check_conditions() |
289 | 4900 | 4853 | ||
290 | 4901 | # all check | 4854 | # all check |
291 | 4902 | self.assertEqual(called, ['run', 'finish']) | 4855 | self.assertEqual(called, ['run', 'finish']) |
292 | @@ -4920,7 +4873,7 @@ | |||
293 | 4920 | self.cmd.go() | 4873 | self.cmd.go() |
294 | 4921 | 4874 | ||
295 | 4922 | # before the command finishes, all conditions are checked | 4875 | # before the command finishes, all conditions are checked |
297 | 4923 | self.queue.check_conditions() | 4876 | self.action_queue.conditions_locker.check_conditions() |
298 | 4924 | 4877 | ||
299 | 4925 | # command finished | 4878 | # command finished |
300 | 4926 | d.callback(2) | 4879 | d.callback(2) |
301 | @@ -5051,7 +5004,7 @@ | |||
302 | 5051 | 5004 | ||
303 | 5052 | # fix conditions | 5005 | # fix conditions |
304 | 5053 | self.cmd.is_runnable = True | 5006 | self.cmd.is_runnable = True |
306 | 5054 | self.queue.check_conditions() | 5007 | self.action_queue.conditions_locker.check_conditions() |
307 | 5055 | 5008 | ||
308 | 5056 | # need to wait the callLater | 5009 | # need to wait the callLater |
309 | 5057 | yield finished | 5010 | yield finished |
310 | @@ -5127,7 +5080,7 @@ | |||
311 | 5127 | def test_cancel_while_waiting_queue(self): | 5080 | def test_cancel_while_waiting_queue(self): |
312 | 5128 | """Cancel the command while waiting for queue.""" | 5081 | """Cancel the command while waiting for queue.""" |
313 | 5129 | # stop the queue, and fake the pathlock to test releasing | 5082 | # stop the queue, and fake the pathlock to test releasing |
315 | 5130 | self.queue.active = False | 5083 | self.queue.stop() |
316 | 5131 | released = [] | 5084 | released = [] |
317 | 5132 | self.cmd._acquire_pathlock = lambda: defer.succeed( | 5085 | self.cmd._acquire_pathlock = lambda: defer.succeed( |
318 | 5133 | lambda: released.append(True)) | 5086 | lambda: released.append(True)) |
319 | @@ -5137,6 +5090,10 @@ | |||
320 | 5137 | self.cmd.go() | 5090 | self.cmd.go() |
321 | 5138 | self.cmd.cancel() | 5091 | self.cmd.cancel() |
322 | 5139 | 5092 | ||
323 | 5093 | # now, set the queue active again, it should release everything | ||
324 | 5094 | # even if was cancelled before | ||
325 | 5095 | self.queue.run() | ||
326 | 5096 | |||
327 | 5140 | # all check | 5097 | # all check |
328 | 5141 | self._check_finished_ok() | 5098 | self._check_finished_ok() |
329 | 5142 | self.assertTrue(released) | 5099 | self.assertTrue(released) |
330 | @@ -5311,3 +5268,99 @@ | |||
331 | 5311 | 5268 | ||
332 | 5312 | # further callback to original deferred is harmless | 5269 | # further callback to original deferred is harmless |
333 | 5313 | origdef.errback(ValueError('foo')) | 5270 | origdef.errback(ValueError('foo')) |
334 | 5271 | |||
335 | 5272 | |||
336 | 5273 | class ConditionsLockerTests(TwistedTestCase): | ||
337 | 5274 | """Test the ConditionsLocker.""" | ||
338 | 5275 | |||
339 | 5276 | def setUp(self): | ||
340 | 5277 | """Set up.""" | ||
341 | 5278 | self.cl = ConditionsLocker() | ||
342 | 5279 | |||
343 | 5280 | def test_get_locking_deferred_returns_deferred(self): | ||
344 | 5281 | """The locking is done by a deferred.""" | ||
345 | 5282 | d = self.cl.get_lock('command') | ||
346 | 5283 | d.callback(True) | ||
347 | 5284 | return d | ||
348 | 5285 | |||
349 | 5286 | def test_get_locking_different_commands_different_deferreds(self): | ||
350 | 5287 | """Asked by two commands, get two deferreds.""" | ||
351 | 5288 | d1 = self.cl.get_lock('command1') | ||
352 | 5289 | d2 = self.cl.get_lock('command2') | ||
353 | 5290 | self.assertNotIdentical(d1, d2) | ||
354 | 5291 | |||
355 | 5292 | def test_get_locking_same_command_same_deferred(self): | ||
356 | 5293 | """If asked twice by the same command, return the same deferred. | ||
357 | 5294 | |||
358 | 5295 | This is more a safe guard than a feature; if misused by the same | ||
359 | 5296 | command we're assuring than we will not overwrite a second deferred | ||
360 | 5297 | over the first one (so, never releasing the first one). | ||
361 | 5298 | """ | ||
362 | 5299 | d1 = self.cl.get_lock('command') | ||
363 | 5300 | d2 = self.cl.get_lock('command') | ||
364 | 5301 | self.assertIdentical(d1, d2) | ||
365 | 5302 | |||
366 | 5303 | def test_check_conditions_simple_runnable(self): | ||
367 | 5304 | """Release the command.""" | ||
368 | 5305 | cmd = FakeCommand() | ||
369 | 5306 | locking_d = self.cl.get_lock(cmd) | ||
370 | 5307 | self.assertFalse(locking_d.called) | ||
371 | 5308 | self.assertIn(cmd, self.cl.locked) | ||
372 | 5309 | |||
373 | 5310 | # release it! | ||
374 | 5311 | assert cmd.is_runnable | ||
375 | 5312 | self.cl.check_conditions() | ||
376 | 5313 | self.assertTrue(locking_d.called) | ||
377 | 5314 | self.assertNotIn(cmd, self.cl.locked) | ||
378 | 5315 | |||
379 | 5316 | def test_check_conditions_simple_notrunnable_then_ok(self): | ||
380 | 5317 | """First don't release the command, then release it.""" | ||
381 | 5318 | cmd = FakeCommand() | ||
382 | 5319 | locking_d = self.cl.get_lock(cmd) | ||
383 | 5320 | self.assertFalse(locking_d.called) | ||
384 | 5321 | |||
385 | 5322 | # check for conditions, do not release | ||
386 | 5323 | cmd.is_runnable = False | ||
387 | 5324 | self.cl.check_conditions() | ||
388 | 5325 | self.assertFalse(locking_d.called) | ||
389 | 5326 | |||
390 | 5327 | # conditions are ok now, release | ||
391 | 5328 | cmd.is_runnable = True | ||
392 | 5329 | self.cl.check_conditions() | ||
393 | 5330 | self.assertTrue(locking_d.called) | ||
394 | 5331 | |||
395 | 5332 | def test_check_conditions_mixed(self): | ||
396 | 5333 | """Several commands, mixed situation.""" | ||
397 | 5334 | cmd1 = FakeCommand() | ||
398 | 5335 | cmd1.is_runnable = False | ||
399 | 5336 | cmd2 = FakeCommand() | ||
400 | 5337 | assert cmd2.is_runnable | ||
401 | 5338 | |||
402 | 5339 | # get lock for both, and check conditions | ||
403 | 5340 | locking_d1 = self.cl.get_lock(cmd1) | ||
404 | 5341 | locking_d2 = self.cl.get_lock(cmd2) | ||
405 | 5342 | self.cl.check_conditions() | ||
406 | 5343 | |||
407 | 5344 | # one should be released, the other should not | ||
408 | 5345 | self.assertFalse(locking_d1.called) | ||
409 | 5346 | self.assertTrue(locking_d2.called) | ||
410 | 5347 | |||
411 | 5348 | def test_cancel_command_nothold(self): | ||
412 | 5349 | """It's ok to cancel a command not there.""" | ||
413 | 5350 | self.cl.cancel_command('command') | ||
414 | 5351 | |||
415 | 5352 | def test_cancel_releases_cancelled_command(self): | ||
416 | 5353 | """It releases the cancelled command, even not runnable.""" | ||
417 | 5354 | cmd1 = FakeCommand() | ||
418 | 5355 | cmd1.is_runnable = False | ||
419 | 5356 | cmd2 = FakeCommand() | ||
420 | 5357 | assert cmd2.is_runnable | ||
421 | 5358 | |||
422 | 5359 | # get lock for both, and cancel only 1 | ||
423 | 5360 | locking_d1 = self.cl.get_lock(cmd1) | ||
424 | 5361 | locking_d2 = self.cl.get_lock(cmd2) | ||
425 | 5362 | self.cl.cancel_command(cmd1) | ||
426 | 5363 | |||
427 | 5364 | # 1 should be released, 2 should not (even with conditions ok) | ||
428 | 5365 | self.assertTrue(locking_d1.called) | ||
429 | 5366 | self.assertFalse(locking_d2.called) | ||
430 | 5314 | 5367 | ||
431 | === modified file 'ubuntuone/syncdaemon/action_queue.py' | |||
432 | --- ubuntuone/syncdaemon/action_queue.py 2011-03-07 16:54:25 +0000 | |||
433 | +++ ubuntuone/syncdaemon/action_queue.py 2011-03-08 13:52:39 +0000 | |||
434 | @@ -455,6 +455,7 @@ | |||
435 | 455 | self.hashed_waiting = {} | 455 | self.hashed_waiting = {} |
436 | 456 | self.active = False | 456 | self.active = False |
437 | 457 | self.transfers_semaphore = defer.DeferredSemaphore(SIMULT_TRANSFERS) | 457 | self.transfers_semaphore = defer.DeferredSemaphore(SIMULT_TRANSFERS) |
438 | 458 | self.active_deferred = defer.Deferred() | ||
439 | 458 | 459 | ||
440 | 459 | def __len__(self): | 460 | def __len__(self): |
441 | 460 | """Return the length of the waiting queue.""" | 461 | """Return the length of the waiting queue.""" |
442 | @@ -488,20 +489,15 @@ | |||
443 | 488 | if len(self.waiting) == 0: | 489 | if len(self.waiting) == 0: |
444 | 489 | self.action_queue.event_queue.push('SYS_QUEUE_DONE') | 490 | self.action_queue.event_queue.push('SYS_QUEUE_DONE') |
445 | 490 | 491 | ||
446 | 491 | def check_conditions(self): | ||
447 | 492 | """Check conditions on which the commands may be waiting.""" | ||
448 | 493 | for command in self.waiting[:]: | ||
449 | 494 | command.check_conditions() | ||
450 | 495 | |||
451 | 496 | def run(self): | 492 | def run(self): |
452 | 497 | """Go active and run all commands in the queue.""" | 493 | """Go active and run all commands in the queue.""" |
453 | 498 | self.active = True | 494 | self.active = True |
456 | 499 | for command in self.waiting[:]: | 495 | self.active_deferred.callback(True) |
455 | 500 | command.resume() | ||
457 | 501 | 496 | ||
458 | 502 | def stop(self): | 497 | def stop(self): |
459 | 503 | """Stop the pool and cleanup the running commands.""" | 498 | """Stop the pool and cleanup the running commands.""" |
460 | 504 | self.active = False | 499 | self.active = False |
461 | 500 | self.active_deferred = defer.Deferred() | ||
462 | 505 | for command in self.waiting: | 501 | for command in self.waiting: |
463 | 506 | command.pause() | 502 | command.pause() |
464 | 507 | 503 | ||
465 | @@ -557,6 +553,36 @@ | |||
466 | 557 | d.errback(failure) | 553 | d.errback(failure) |
467 | 558 | 554 | ||
468 | 559 | 555 | ||
469 | 556 | class ConditionsLocker(object): | ||
470 | 557 | """Structure to hold commands waiting because of conditions. | ||
471 | 558 | |||
472 | 559 | On each call to lock it will return a deferred for the received | ||
473 | 560 | command. When check_conditions is called, it will trigger each | ||
474 | 561 | command deferred if it's runnable. | ||
475 | 562 | """ | ||
476 | 563 | def __init__(self): | ||
477 | 564 | self.locked = {} | ||
478 | 565 | |||
479 | 566 | def get_lock(self, command): | ||
480 | 567 | """Return the deferred that will lock the command.""" | ||
481 | 568 | if command not in self.locked: | ||
482 | 569 | self.locked[command] = defer.Deferred() | ||
483 | 570 | return self.locked[command] | ||
484 | 571 | |||
485 | 572 | def check_conditions(self): | ||
486 | 573 | """Check for all commands' conditions, and release accordingly.""" | ||
487 | 574 | for cmd in self.locked.keys(): | ||
488 | 575 | if cmd.is_runnable: | ||
489 | 576 | deferred = self.locked.pop(cmd) | ||
490 | 577 | deferred.callback(True) | ||
491 | 578 | |||
492 | 579 | def cancel_command(self, command): | ||
493 | 580 | """The command was cancelled, if lock hold, release it and clean.""" | ||
494 | 581 | if command in self.locked: | ||
495 | 582 | deferred = self.locked.pop(command) | ||
496 | 583 | deferred.callback(True) | ||
497 | 584 | |||
498 | 585 | |||
499 | 560 | class UploadProgressWrapper(object): | 586 | class UploadProgressWrapper(object): |
500 | 561 | """A wrapper around the file-like object used for Uploads. | 587 | """A wrapper around the file-like object used for Uploads. |
501 | 562 | 588 | ||
502 | @@ -630,13 +656,14 @@ | |||
503 | 630 | self.pathlock = PathLockingTree() | 656 | self.pathlock = PathLockingTree() |
504 | 631 | self.uuid_map = DeferredMap() | 657 | self.uuid_map = DeferredMap() |
505 | 632 | self.zip_queue = ZipQueue() | 658 | self.zip_queue = ZipQueue() |
506 | 659 | self.conditions_locker = ConditionsLocker() | ||
507 | 633 | 660 | ||
508 | 634 | self.estimated_free_space = {} | 661 | self.estimated_free_space = {} |
509 | 635 | event_queue.subscribe(self) | 662 | event_queue.subscribe(self) |
510 | 636 | 663 | ||
511 | 637 | def check_conditions(self): | 664 | def check_conditions(self): |
514 | 638 | """Poll conditions on which running actions may be waiting.""" | 665 | """Check conditions in the locker, to release all the waiting ops.""" |
515 | 639 | self.queue.check_conditions() | 666 | self.conditions_locker.check_conditions() |
516 | 640 | 667 | ||
517 | 641 | def have_sufficient_space_for_upload(self, share_id, upload_size): | 668 | def have_sufficient_space_for_upload(self, share_id, upload_size): |
518 | 642 | """Returns True if we have sufficient space for the given upload.""" | 669 | """Returns True if we have sufficient space for the given upload.""" |
519 | @@ -1099,7 +1126,7 @@ | |||
520 | 1099 | 1126 | ||
521 | 1100 | __slots__ = ('_queue', 'running', 'pathlock_release', 'log', | 1127 | __slots__ = ('_queue', 'running', 'pathlock_release', 'log', |
522 | 1101 | 'markers_resolved_deferred', 'action_queue', 'cancelled', | 1128 | 'markers_resolved_deferred', 'action_queue', 'cancelled', |
524 | 1102 | 'wait_for_queue', 'wait_for_conditions', 'running_deferred') | 1129 | 'running_deferred') |
525 | 1103 | 1130 | ||
526 | 1104 | def __init__(self, request_queue): | 1131 | def __init__(self, request_queue): |
527 | 1105 | """Initialize a command instance.""" | 1132 | """Initialize a command instance.""" |
528 | @@ -1110,9 +1137,6 @@ | |||
529 | 1110 | self.markers_resolved_deferred = defer.Deferred() | 1137 | self.markers_resolved_deferred = defer.Deferred() |
530 | 1111 | self.pathlock_release = None | 1138 | self.pathlock_release = None |
531 | 1112 | self.cancelled = False | 1139 | self.cancelled = False |
532 | 1113 | |||
533 | 1114 | self.wait_for_queue = None | ||
534 | 1115 | self.wait_for_conditions = None | ||
535 | 1116 | self.running_deferred = None | 1140 | self.running_deferred = None |
536 | 1117 | 1141 | ||
537 | 1118 | def to_dict(self): | 1142 | def to_dict(self): |
538 | @@ -1196,20 +1220,6 @@ | |||
539 | 1196 | self.running_deferred.interrupt() | 1220 | self.running_deferred.interrupt() |
540 | 1197 | self.cleanup() | 1221 | self.cleanup() |
541 | 1198 | 1222 | ||
542 | 1199 | def resume(self): | ||
543 | 1200 | """Unlock the command because the queue is back alive.""" | ||
544 | 1201 | if self.wait_for_queue is not None: | ||
545 | 1202 | self.log.debug('resuming') | ||
546 | 1203 | self.wait_for_queue.callback(True) | ||
547 | 1204 | self.wait_for_queue = None | ||
548 | 1205 | |||
549 | 1206 | def check_conditions(self): | ||
550 | 1207 | """If conditions are ok, run the command again.""" | ||
551 | 1208 | if self.is_runnable and self.wait_for_conditions is not None: | ||
552 | 1209 | self.log.debug('unblocking conditions') | ||
553 | 1210 | self.wait_for_conditions.callback(True) | ||
554 | 1211 | self.wait_for_conditions = None | ||
555 | 1212 | |||
556 | 1213 | @defer.inlineCallbacks | 1223 | @defer.inlineCallbacks |
557 | 1214 | def go(self): | 1224 | def go(self): |
558 | 1215 | """Execute all the steps for a command.""" | 1225 | """Execute all the steps for a command.""" |
559 | @@ -1261,14 +1271,14 @@ | |||
560 | 1261 | # if queue not active, wait for it and check again | 1271 | # if queue not active, wait for it and check again |
561 | 1262 | if not self._queue.active: | 1272 | if not self._queue.active: |
562 | 1263 | self.log.debug('not running because of inactive queue') | 1273 | self.log.debug('not running because of inactive queue') |
565 | 1264 | self.wait_for_queue = defer.Deferred() | 1274 | yield self._queue.active_deferred |
566 | 1265 | yield self.wait_for_queue | 1275 | self.log.debug('unblocked: queue active') |
567 | 1266 | continue | 1276 | continue |
568 | 1267 | 1277 | ||
569 | 1268 | if not self.is_runnable: | 1278 | if not self.is_runnable: |
570 | 1269 | self.log.debug('not running because of conditions') | 1279 | self.log.debug('not running because of conditions') |
573 | 1270 | self.wait_for_conditions = defer.Deferred() | 1280 | yield self.action_queue.conditions_locker.get_lock(self) |
574 | 1271 | yield self.wait_for_conditions | 1281 | self.log.debug('unblocked: conditions ok') |
575 | 1272 | continue | 1282 | continue |
576 | 1273 | 1283 | ||
577 | 1274 | try: | 1284 | try: |
578 | @@ -1313,8 +1323,7 @@ | |||
579 | 1313 | def cancel(self): | 1323 | def cancel(self): |
580 | 1314 | """Cancel the command. | 1324 | """Cancel the command. |
581 | 1315 | 1325 | ||
584 | 1316 | Also trigger the wait_for_condition and wait_for_queue deferreds, to | 1326 | Also cancel the command in the conditions locker. |
583 | 1317 | unlock the command and finally release the pathlock. | ||
585 | 1318 | 1327 | ||
586 | 1319 | Do nothing if already cancelled (as cancellation can come from other | 1328 | Do nothing if already cancelled (as cancellation can come from other |
587 | 1320 | thread, it can come at any time, so we need to support double | 1329 | thread, it can come at any time, so we need to support double |
588 | @@ -1327,12 +1336,7 @@ | |||
589 | 1327 | 1336 | ||
590 | 1328 | self.cancelled = True | 1337 | self.cancelled = True |
591 | 1329 | self.log.debug('cancelled') | 1338 | self.log.debug('cancelled') |
598 | 1330 | if self.wait_for_conditions is not None: | 1339 | self.action_queue.conditions_locker.cancel_command(self) |
593 | 1331 | self.wait_for_conditions.callback(True) | ||
594 | 1332 | self.wait_for_conditions = None | ||
595 | 1333 | if self.wait_for_queue is not None: | ||
596 | 1334 | self.wait_for_queue.callback(True) | ||
597 | 1335 | self.wait_for_queue = None | ||
599 | 1336 | self.cleanup() | 1340 | self.cleanup() |
600 | 1337 | self.finish() | 1341 | self.finish() |
601 | 1338 | return True | 1342 | return True |
Approving with one review