Merge lp:~facundo/ubuntuone-client/aq-memory-improv-2 into lp:ubuntuone-client

Proposed by Facundo Batista
Status: Merged
Approved by: Guillermo Gonzalez
Approved revision: 655
Merged at revision: 665
Proposed branch: lp:~facundo/ubuntuone-client/aq-memory-improv-2
Merge into: lp:ubuntuone-client
Diff against target: 2065 lines (+1017/-347)
7 files modified
contrib/testing/testcase.py (+1/-8)
tests/syncdaemon/test_action_queue.py (+482/-40)
tests/syncdaemon/test_sync.py (+347/-17)
ubuntuone/syncdaemon/action_queue.py (+122/-152)
ubuntuone/syncdaemon/event_queue.py (+2/-2)
ubuntuone/syncdaemon/sync.py (+26/-26)
ubuntuone/syncdaemon/u1fsfsm.py (+37/-102)
To merge this branch: bzr merge lp:~facundo/ubuntuone-client/aq-memory-improv-2
Reviewer Review Type Date Requested Status
Guillermo Gonzalez Approve
Lucio Torre (community) Approve
Review via email: mp+34101@code.launchpad.net

Commit message

Memory improvements in the AQ queues, markers revisited.

Description of the change

Memory improvements in the AQ queues, markers revisited.

A simple one:

- Stop storing a wrapped function in ZipQueue.zip.

And a complicated one:

- Stop storing for ever the markers in DeferredMap.

To achieve this last one improvement, several changes were made, but basically now the responsible of releasing the marker is Sync, and it does it at the same time than setting the node_id in FSM... so now the marker can be removed from the map, as it will not be needed any more.

This implies that the commands must get the marker deferred when they're queued, so I also changed that.

I'll do another branch after this one to fix some method names in the commands ('start', that is actually 'queue', and 'start_unqueued'), and to simplify how the markers are retrieved and demarked in ACQ. This branch is large enough (2k lines, but +1100 of those are tests, and ~300 of spreadsheet).

To post a comment you must log in.
Revision history for this message
Lucio Torre (lucio.torre) wrote :

nice work

review: Approve
Revision history for this message
Guillermo Gonzalez (verterok) wrote :

nice

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'contrib/testing/testcase.py'
2--- contrib/testing/testcase.py 2010-08-25 12:58:01 +0000
3+++ contrib/testing/testcase.py 2010-08-30 17:41:19 +0000
4@@ -120,14 +120,7 @@
5 self.eq = self.event_queue = eq
6 self.uploading = {}
7 self.downloading = {}
8- # pylint: disable-msg=C0103
9- class UUID_Map(object):
10- """mock uuid map"""
11- def set(self, *args):
12- """mock set method"""
13- pass
14-
15- self.uuid_map = UUID_Map()
16+ self.uuid_map = action_queue.DeferredMap()
17 self.content_queue = action_queue.ContentQueue('CONTENT_QUEUE', self)
18 self.meta_queue = action_queue.RequestQueue('META_QUEUE', self)
19
20
21=== modified file 'tests/syncdaemon/test_action_queue.py'
22--- tests/syncdaemon/test_action_queue.py 2010-08-23 19:52:04 +0000
23+++ tests/syncdaemon/test_action_queue.py 2010-08-30 17:41:19 +0000
24@@ -52,7 +52,7 @@
25 NoisyRequestQueue, RequestQueue, UploadProgressWrapper, Upload,
26 CreateShare, GetPublicFiles, GetDelta, GetDeltaFromScratch,
27 TRANSFER_PROGRESS_THRESHOLD, Unlink, Move, MakeFile, MakeDir,
28- DeltaList,
29+ DeltaList, ZipQueue, DeferredMap, UniqueRequestQueue
30 )
31 from ubuntuone.syncdaemon.event_queue import EventQueue, EVENTS
32 from ubuntuone.syncdaemon.marker import MDMarker
33@@ -90,6 +90,10 @@
34 executed = False
35 when_executing = 'ok'
36
37+ def __init__(self, share_id=None, node_id=None):
38+ self.share_id = share_id
39+ self.node_id = node_id
40+
41 def run(self):
42 """Run that just succeeds."""
43 self.executed = True
44@@ -234,23 +238,20 @@
45 self.action_queue.upload('foo', 'bar', 0, 0, 0, 0, 0)
46 self.assertEqual(len(self.action_queue.content_queue.waiting), 1)
47
48+ @defer.inlineCallbacks
49 def test_content_queue_has_only_one_op_per_node_even_with_markers(self):
50 """ Check that the content queue uniquifies operations per node.
51
52 Even when some of the operations were added using markers.
53 """
54- self.action_queue.download('foo', 'bar', 0, 0)
55+ m1 = MDMarker('foo')
56+ m2 = MDMarker('bar')
57+ yield self.action_queue.download(m1, m2, 0, 0)
58 self.action_queue.uuid_map.set('foo', 'feh')
59 self.action_queue.uuid_map.set('bar', 'bah')
60 self.action_queue.upload('feh', 'bah', 0, 0, 0, 0, 0)
61 self.assertEqual(len(self.action_queue.content_queue.waiting), 1)
62
63- def test_aq_resolve_uuid_maybe(self):
64- """Check action_queue.resolve_uuid_maybe does what it's supposed to."""
65- self.assertEqual(self.action_queue.resolve_uuid_maybe('foo'), 'foo')
66- self.action_queue.uuid_map.set('foo', 'feh')
67- self.assertEqual(self.action_queue.resolve_uuid_maybe('foo'), 'feh')
68-
69 @defer.inlineCallbacks
70 def test_get_root_and_demark(self):
71 """Get the received Root and demark its mdid."""
72@@ -614,6 +615,216 @@
73 ])
74
75
76+class TestDeferredMap(TwistedTestCase):
77+ """Test the deferred map."""
78+
79+ def setUp(self):
80+ """Set up."""
81+ self.dm = DeferredMap()
82+
83+ def test_one_get_returns_stored_deferred(self):
84+ """Get will return the stored deferred."""
85+ d = self.dm.get('foo')
86+ self.assertEqual(self.dm.waiting, {'foo': [d]})
87+
88+ def test_two_gets_returns_second_deferred_other_key(self):
89+ """A second get for other key will return other deferred."""
90+ d1 = self.dm.get('foo')
91+ d2 = self.dm.get('bar')
92+ self.assertEqual(self.dm.waiting, {'foo': [d1], 'bar': [d2]})
93+
94+ def test_two_gets_returns_second_deferred_same_key(self):
95+ """A second get for the same key will return other deferred."""
96+ d1 = self.dm.get('foo')
97+ d2 = self.dm.get('foo')
98+ self.assertEqual(self.dm.waiting, {'foo': [d1, d2]})
99+
100+ def test_mixed_gets(self):
101+ """Several gets with different keys."""
102+ d1 = self.dm.get('foo')
103+ d2 = self.dm.get('bar')
104+ d3 = self.dm.get('foo')
105+ d4 = self.dm.get('baz')
106+ self.assertEqual(self.dm.waiting,
107+ {'foo': [d1, d3], 'bar': [d2], 'baz': [d4]})
108+
109+ def test_set_to_nothing(self):
110+ """It's ok to set a key that is not being waited."""
111+ self.dm.set('not there', 'value')
112+
113+ @defer.inlineCallbacks
114+ def test_set_fires_deferred_single(self):
115+ """The set fires the unique waiting deferred with the value."""
116+ d1 = self.dm.get('foo')
117+ d2 = self.dm.get('bar')
118+ d3 = self.dm.get('foo')
119+ self.assertEqual(self.dm.waiting, {'foo': [d1, d3], 'bar': [d2]})
120+
121+ self.dm.set('bar', 'value')
122+ res = yield d2
123+ self.assertEqual(res, 'value')
124+ self.assertEqual(self.dm.waiting, {'foo': [d1, d3]})
125+
126+ @defer.inlineCallbacks
127+ def test_set_fires_deferred_multiple(self):
128+ """The set fires the multiple waiting deferreds with the value."""
129+ d1 = self.dm.get('foo')
130+ d2 = self.dm.get('bar')
131+ d3 = self.dm.get('foo')
132+ self.assertEqual(self.dm.waiting, {'foo': [d1, d3], 'bar': [d2]})
133+
134+ self.dm.set('foo', 'value')
135+ res1 = yield d1
136+ res2 = yield d3
137+ self.assertEqual(res1, 'value')
138+ self.assertEqual(res2, 'value')
139+ self.assertEqual(self.dm.waiting, {'bar': [d2]})
140+
141+ def test_err_to_nothing(self):
142+ """It's ok to err a key that is not being waited."""
143+ self.dm.err('not there', 'failure')
144+
145+ @defer.inlineCallbacks
146+ def test_err_fires_deferred_single(self):
147+ """The set fires the unique waiting deferred with the failure."""
148+ d1 = self.dm.get('foo')
149+ d2 = self.dm.get('bar')
150+ d3 = self.dm.get('foo')
151+ self.assertEqual(self.dm.waiting, {'foo': [d1, d3], 'bar': [d2]})
152+
153+ exc = Exception('problem!')
154+ self.dm.err('bar', Failure(exc))
155+ try:
156+ yield d2
157+ except Exception, e:
158+ self.assertEqual(e, exc)
159+ else:
160+ self.fail("It didn't fired the deferred with a failure!")
161+ self.assertEqual(self.dm.waiting, {'foo': [d1, d3]})
162+
163+
164+
165+class TestUniqueRequestQueue(TwistedTestCase):
166+ """Tests for the UniqueRequestQueue."""
167+
168+ def setUp(self):
169+ """Set up."""
170+
171+ class FakeAQ(object):
172+ """Fake AQ."""
173+ event_queue = self.eq = FakedEventQueue()
174+
175+ self.urq = UniqueRequestQueue(name='META_QUEUE', action_queue=FakeAQ())
176+
177+ # add a Memento handler to the logger
178+ self.handler = MementoHandler()
179+ self.handler.setLevel(logging.DEBUG)
180+ self._logger = logging.getLogger('ubuntuone.SyncDaemon')
181+ self._logger.addHandler(self.handler)
182+
183+ def tearDown(self):
184+ """Tear down."""
185+ self._logger.removeHandler(self.handler)
186+ self.eq.shutdown()
187+
188+ def test_queue_one_ok(self):
189+ """First command is queued."""
190+ cmd = FakeCommand('a', 'b')
191+ self.urq.queue(cmd)
192+ self.assertEqual(list(self.urq.waiting), [cmd])
193+
194+ def test_queue_second_ok_if_different(self):
195+ """Second command is queued if different than first."""
196+ cmd1 = FakeCommand('a', 'b')
197+ cmd2 = FakeCommand('a', 'c')
198+ self.urq.queue(cmd1)
199+ self.urq.queue(cmd2)
200+ self.assertEqual(list(self.urq.waiting), [cmd1, cmd2])
201+
202+ def test_queue_second_equal_first_removes(self):
203+ """Second command removes the first one if equal."""
204+ cmd1 = FakeCommand('a', 'b')
205+ cmd2 = FakeCommand('a', 'b')
206+ self.urq.queue(cmd1)
207+ self.urq.queue(cmd2)
208+ self.assertEqual(list(self.urq.waiting), [cmd2])
209+ self.assertTrue(self.handler.check_debug("Request removed"))
210+
211+ def test_queue_complex(self):
212+ """A more complex scenario."""
213+ cmd1 = FakeCommand('a', 'b')
214+ cmd2 = FakeCommand('a', 'c')
215+ cmd3 = FakeCommand('a', 'd')
216+ cmd4 = FakeCommand('a', 'e')
217+ cmd5 = FakeCommand('a', 'd') # same than cmd3
218+ for cmd in cmd1, cmd2, cmd3, cmd4, cmd5:
219+ self.urq.queue(cmd)
220+ self.assertEqual(list(self.urq.waiting), [cmd1, cmd2, cmd4, cmd5])
221+
222+
223+class TestZipQueue(TwistedTestCase):
224+ """Test the zipping queue."""
225+
226+ def setUp(self):
227+ """Set up."""
228+ self.zq = ZipQueue()
229+
230+ @defer.inlineCallbacks
231+ def test_zip_calls_compress_in_thread(self):
232+ """Test that self._compress is called in another thread."""
233+ called = []
234+ def fake_compress(deferred, upload):
235+ """Fake the _compress method."""
236+ self.assertEqual(upload, 'foo')
237+ called.append(True)
238+ deferred.callback(True)
239+
240+ self.zq._compress = fake_compress
241+ yield self.zq.zip('foo')
242+ self.assertTrue(called)
243+
244+ @defer.inlineCallbacks
245+ def test_zip_acquire_lock(self):
246+ """Test that it acquires the lock."""
247+ called = []
248+ self.zq._compress = lambda deferred, upload: deferred.callback(True)
249+
250+ def fake_acquire():
251+ """Fake the acquire method."""
252+ self.zq.tokens = 1
253+ called.append(True)
254+ return defer.succeed(True)
255+
256+ self.zq.acquire = fake_acquire
257+ yield self.zq.zip('foo')
258+ self.assertTrue(called)
259+
260+ @defer.inlineCallbacks
261+ def test_zip_release_lock_ok(self):
262+ """Test that it releases the lock when all ok."""
263+ called = []
264+ self.zq._compress = lambda deferred, upload: deferred.callback(True)
265+ self.zq.release = lambda: called.append(True)
266+
267+ yield self.zq.zip('foo')
268+ self.assertTrue(called)
269+
270+ @defer.inlineCallbacks
271+ def test_zip_release_lock_error(self):
272+ """Test that it releases the lock even on error."""
273+ called = []
274+ exc = Exception('bad')
275+ self.zq._compress = lambda deferred, upload: deferred.errback(exc)
276+ self.zq.release = lambda: called.append(True)
277+
278+ try:
279+ yield self.zq.zip('foo')
280+ except Exception, e:
281+ # need to silent the exception we're generating in the test
282+ self.assertEqual(e, exc)
283+ self.assertTrue(called)
284+
285+
286 class FactoryBaseTestCase(BasicTestCase):
287 """Helper for by-pass Twisted."""
288
289@@ -1067,9 +1278,13 @@
290 b = 'foo'
291 c = u'año'
292
293- request_queue = RequestQueue(name='fu', action_queue=self.action_queue)
294+ def _run(self):
295+ return defer.succeed(True)
296+
297+ request_queue = RequestQueue(name='META_QUEUE',
298+ action_queue=self.action_queue)
299 self.cmd = MyCommand(request_queue)
300- self.cmd.start_unqueued() # just for it to create its logger
301+ self.cmd.log = self.cmd.make_logger()
302
303 return res
304
305@@ -1097,66 +1312,49 @@
306 result = yield d
307 self.assertEqual(result, ['node_id'])
308 self.assertTrue(self.handler.check_debug(
309- "waiting for the real value of 'foo'"))
310+ "waiting for the real value of marker:foo"))
311
312 @defer.inlineCallbacks
313 def test_demark_with_marker_ready(self):
314 """Test demark with a marker that had data."""
315 marker = MDMarker('foo')
316+ d = self.cmd.demark(marker)
317 self.action_queue.uuid_map.set(marker, 'node_id')
318- result = yield self.cmd.demark(marker)
319+ result = yield d
320 self.assertEqual(result, ['node_id'])
321 self.assertTrue(self.handler.check_debug(
322- "waiting for the real value of 'foo'"))
323+ "waiting for the real value of marker:foo"))
324
325 @defer.inlineCallbacks
326 def test_demark_mixed_markers(self):
327 """Test demark with both a marker and not."""
328 # call demark with both
329 marker = MDMarker('foo')
330+ d = self.cmd.demark('notamarker', marker)
331 self.action_queue.uuid_map.set(marker, 'node_id')
332- result = yield self.cmd.demark('notamarker', marker)
333+ result = yield d
334
335 # check
336 self.assertEqual(result, ['notamarker', 'node_id'])
337 self.assertTrue(self.handler.check_debug(
338- "waiting for the real value of 'foo'"))
339+ "waiting for the real value of marker:foo"))
340 self.assertFalse(self.handler.check_debug(
341 "waiting for the real value of 'notamarker'"))
342
343 @defer.inlineCallbacks
344- def test_demark_marker_ready_got_ok(self):
345- """Test demark getting a marker triggered ok now."""
346- marker = MDMarker('foo')
347- self.action_queue.uuid_map.set(marker, 'node_id')
348- result = yield self.cmd.demark(marker)
349- self.assertEqual(result, ['node_id'])
350- self.assertTrue(self.handler.check_debug("got 'foo'"))
351-
352- @defer.inlineCallbacks
353 def test_demark_marker_future_got_ok(self):
354 """Test demark getting a marker triggered ok later."""
355 # don't have the info now
356 marker = MDMarker('foo')
357 d = self.cmd.demark(marker)
358- self.assertFalse(self.handler.check_debug("got 'foo'"))
359+ self.assertFalse(self.handler.check_debug("for marker:foo"))
360
361 # set and check
362 self.action_queue.uuid_map.set(marker, 'node_id')
363 result = yield d
364 self.assertEqual(result, ['node_id'])
365- self.assertTrue(self.handler.check_debug("got 'foo'"))
366-
367- @defer.inlineCallbacks
368- def test_demark_marker_ready_got_failure(self):
369- """Test demark getting a marker triggered with failure now."""
370- marker = MDMarker('foo')
371- self.action_queue.uuid_map.err(marker, Failure(Exception('bad')))
372- try:
373- yield self.cmd.demark(marker)
374- except Exception, e:
375- self.assertEqual(str(e), 'bad')
376- self.assertTrue(self.handler.check_error("failed 'foo'"))
377+ self.assertTrue(self.handler.check_debug(
378+ "for marker:foo got value 'node_id'"))
379
380 @defer.inlineCallbacks
381 def test_demark_marker_future_got_failure(self):
382@@ -1164,7 +1362,7 @@
383 # don't have the info now
384 marker = MDMarker('foo')
385 d = self.cmd.demark(marker)
386- self.assertFalse(self.handler.check_error("failed 'foo'"))
387+ self.assertFalse(self.handler.check_error("failed marker:foo"))
388
389 # set the marker and check
390 self.action_queue.uuid_map.err(marker, Failure(Exception('bad')))
391@@ -1172,8 +1370,113 @@
392 yield d
393 except Exception, e:
394 self.assertEqual(str(e), 'bad')
395- self.assertTrue(self.handler.check_error("failed 'foo'"))
396-
397+ self.assertTrue(self.handler.check_error("failed marker:foo"))
398+
399+ @defer.inlineCallbacks
400+ def test_run_sets_running(self):
401+ """Set the flag."""
402+ assert not self.cmd.running
403+ d = defer.Deferred()
404+ def check():
405+ """Checks."""
406+ self.assertTrue(self.cmd.running)
407+ d.callback(True)
408+
409+ self.cmd._start = check
410+ self.cmd.run()
411+ # wait the deferred to be sure it was checked
412+ yield d
413+
414+ @defer.inlineCallbacks
415+ def test_run_call_start_if_not_started(self):
416+ """Call ._start if it's not started."""
417+ called = []
418+ self.cmd._start = lambda: called.append(True) or defer.succeed(True)
419+ self.cmd.start_done = False
420+ self.cmd.markers_resolved_deferred.callback(True)
421+ yield self.cmd.run()
422+ self.assertTrue(called)
423+ self.assertTrue(self.cmd.start_done)
424+
425+ def test_run_dont_call_start_if_started(self):
426+ """Do not call ._start if it's started."""
427+ called = []
428+ self.cmd._start = lambda: called.append(True) or defer.Deferred()
429+ self.cmd.start_done = True
430+ self.cmd.run()
431+ self.assertFalse(called)
432+
433+ def test_run_waits_markers_dereferencing(self):
434+ """Don't call _ready_to_run until have the markers."""
435+ called = []
436+ self.cmd._ready_to_run = lambda: called.append(True)
437+ self.cmd.run()
438+ self.assertFalse(called)
439+ self.cmd.markers_resolved_deferred.callback(True)
440+ self.assertTrue(called)
441+
442+ def test_run_not_enderrback_if_ok(self):
443+ """If _start is ok, do not call end_errback."""
444+ called = []
445+ self.cmd.end_errback = lambda _: called.append(True)
446+ # default _start will always succeed
447+ self.cmd.run()
448+ self.assertFalse(called)
449+
450+ def test_run_enderrback_if_problem(self):
451+ """If _start fails, call end_errback."""
452+ called = []
453+ self.cmd.end_errback = lambda _: called.append(True)
454+ self.cmd._start = lambda: defer.fail(Exception('foo'))
455+ self.cmd.run()
456+ self.assertTrue(called)
457+
458+ @defer.inlineCallbacks
459+ def test_run_done_running_if_ok(self):
460+ """If _start is ok, done running."""
461+ # default _start will always succeed
462+ self.cmd.markers_resolved_deferred.callback(True)
463+ yield self.cmd.run()
464+ self.assertFalse(self.cmd.running)
465+
466+ def test_run_done_running_if_problem(self):
467+ """If _start fails, done running."""
468+ self.cmd.end_errback = lambda _: None # consume the failure
469+ self.cmd._start = lambda: defer.fail(Exception('foo'))
470+ self.cmd.run()
471+ self.assertFalse(self.cmd.running)
472+
473+ def test_start_calls_unqueued(self):
474+ """Do the pre-queue when queued."""
475+ called = []
476+ self.cmd.start_unqueued = lambda: called.append(True)
477+ self.cmd.start()
478+ self.assertTrue(called)
479+
480+ def test_unqueued_calls_demark(self):
481+ """Pre-queue implies dereferencing markers."""
482+ called = []
483+ self.cmd._get_possible_markers = lambda: called.append(True) or ()
484+ self.cmd.start_unqueued()
485+ self.assertTrue(called)
486+
487+ @defer.inlineCallbacks
488+ def test_start_default(self):
489+ """Default _start just returns a triggered deferred and sets done."""
490+ yield self.cmd._start()
491+
492+ @defer.inlineCallbacks
493+ def test_get_possible_markers_default(self):
494+ """Non-overwritten method just returns a triggered deferred."""
495+ yield self.cmd._get_possible_markers()
496+
497+ @defer.inlineCallbacks
498+ def test_marker_storing(self):
499+ """Pre-queueing deferres the marker storing."""
500+ d = defer.Deferred()
501+ self.cmd.store_marker_result = lambda _: d.callback(True)
502+ self.cmd.start_unqueued()
503+ yield d
504
505 class CreateUDFTestCase(ConnectedBaseTestCase):
506 """Test for CreateUDF ActionQueueCommand."""
507@@ -1588,6 +1891,17 @@
508 self.assertIn(event, self.command.action_queue.event_queue.events)
509 self.assertTrue(res is None)
510
511+ def test_get_possible_markers(self):
512+ """Test that it returns the correct values."""
513+ res = self.command._get_possible_markers()
514+ self.assertEqual(res, (VOLUME, NODE))
515+
516+ def test_store_marker_result(self):
517+ """Test that it stores the correct values."""
518+ self.command.store_marker_result(('a', 'b'))
519+ self.assertEqual(self.command.share_id, 'a')
520+ self.assertEqual(self.command.node_id, 'b')
521+
522
523 class GetPublicFilesTestCase(ConnectedBaseTestCase):
524 """Tests for GetPublicFiles ActionQueueCommand."""
525@@ -1783,6 +2097,17 @@
526 self.assertEqual(self.command.n_bytes_read_last,
527 5+TRANSFER_PROGRESS_THRESHOLD)
528
529+ def test_get_possible_markers(self):
530+ """Test that it returns the correct values."""
531+ res = self.command._get_possible_markers()
532+ self.assertEqual(res, ('a_share_id', 'a_node_id'))
533+
534+ def test_store_marker_result(self):
535+ """Test that it stores the correct values."""
536+ self.command.store_marker_result(('a', 'b'))
537+ self.assertEqual(self.command.share_id, 'a')
538+ self.assertEqual(self.command.node_id, 'b')
539+
540
541 class UploadUnconnectedTestCase(FactoryBaseTestCase):
542 """Test for Upload ActionQueueCommand, no connection"""
543@@ -1832,6 +2157,7 @@
544 self.command._run()
545 self.assertEqual(self.command.n_bytes_written_last, 0)
546
547+
548 class UploadProgressWrapperTestCase(BaseTwistedTestCase):
549 """Test for the UploadProgressWrapper helper class."""
550
551@@ -2000,6 +2326,17 @@
552 self.assertEqual(self.command.n_bytes_written_last,
553 14+TRANSFER_PROGRESS_THRESHOLD)
554
555+ def test_get_possible_markers(self):
556+ """Test that it returns the correct values."""
557+ res = self.command._get_possible_markers()
558+ self.assertEqual(res, (self.command.share_id, 'a_node_id'))
559+
560+ def test_store_marker_result(self):
561+ """Test that it stores the correct values."""
562+ self.command.store_marker_result(('a', 'b'))
563+ self.assertEqual(self.command.share_id, 'a')
564+ self.assertEqual(self.command.node_id, 'b')
565+
566
567 class CreateShareTestCase(ConnectedBaseTestCase):
568 """Test for CreateShare ActionQueueCommand."""
569@@ -2060,6 +2397,20 @@
570 self.assertEqual('share_name', name)
571 self.assertTrue(read_only)
572
573+ def test_get_possible_markers(self):
574+ """Test that it returns the correct values."""
575+ cmd = CreateShare(self.request_queue, 'node_id', 'shareto@example.com',
576+ 'share_name', 'View', 'marker')
577+ res = cmd._get_possible_markers()
578+ self.assertEqual(res, ('node_id',))
579+
580+ def test_store_marker_result(self):
581+ """Test that it stores the correct values."""
582+ cmd = CreateShare(self.request_queue, 'node_id', 'shareto@example.com',
583+ 'share_name', 'View', 'marker')
584+ cmd.store_marker_result(('a',))
585+ self.assertEqual(cmd.node_id, 'a')
586+
587
588 class RequestQueueManagerTestCase(FactoryBaseTestCase):
589 """Test how RequestQueue manages the queues."""
590@@ -3037,6 +3388,20 @@
591 node_id='node_id', new_generation=13)
592 self.assertEqual(received, ('AQ_UNLINK_OK', (), info))
593
594+ def test_get_possible_markers(self):
595+ """Test that it returns the correct values."""
596+ cmd = Unlink(self.rq, VOLUME, 'parent_id', 'node_id')
597+ res = cmd._get_possible_markers()
598+ self.assertEqual(res, (VOLUME, 'node_id', 'parent_id'))
599+
600+ def test_store_marker_result(self):
601+ """Test that it stores the correct values."""
602+ cmd = Unlink(self.rq, VOLUME, 'parent_id', 'node_id')
603+ cmd.store_marker_result(('a', 'b', 'c'))
604+ self.assertEqual(cmd.share_id, 'a')
605+ self.assertEqual(cmd.node_id, 'b')
606+ self.assertEqual(cmd.parent_id, 'c')
607+
608
609 class MoveTestCase(ConnectedBaseTestCase):
610 """Test for Move ActionQueueCommand."""
611@@ -3064,6 +3429,21 @@
612 info = dict(share_id=VOLUME, node_id='node', new_generation=13)
613 self.assertEqual(received, ('AQ_MOVE_OK', (), info))
614
615+ def test_get_possible_markers(self):
616+ """Test that it returns the correct values."""
617+ cmd = Move(self.rq, VOLUME, 'node', 'o_parent', 'n_parent', 'n_name')
618+ res = cmd._get_possible_markers()
619+ self.assertEqual(res, (VOLUME, 'node', 'o_parent', 'n_parent'))
620+
621+ def test_store_marker_result(self):
622+ """Test that it stores the correct values."""
623+ cmd = Move(self.rq, VOLUME, 'node', 'o_parent', 'n_parent', 'n_name')
624+ cmd.store_marker_result(('a', 'b', 'c', 'd'))
625+ self.assertEqual(cmd.share_id, 'a')
626+ self.assertEqual(cmd.node_id, 'b')
627+ self.assertEqual(cmd.old_parent_id, 'c')
628+ self.assertEqual(cmd.new_parent_id, 'd')
629+
630
631 class MakeFileTestCase(ConnectedBaseTestCase):
632 """Test for MakeFile ActionQueueCommand."""
633@@ -3093,6 +3473,37 @@
634 volume_id=VOLUME)
635 self.assertEqual(received, ('AQ_FILE_NEW_OK', (), info))
636
637+ def test_handle_failure_push_event(self):
638+ """Test AQ_FILE_NEW_ERROR is pushed on error."""
639+ # create a request and fill it with succesful information
640+ request = client.MakeFile(self.action_queue.client, VOLUME,
641+ 'parent', 'name')
642+ request.new_id = 'new_id'
643+ request.new_generation = 13
644+
645+ # create a command and trigger it fail
646+ cmd = MakeFile(self.rq, VOLUME, 'parent', 'name', 'marker')
647+ failure = Failure(Exception('foo'))
648+ cmd.handle_failure(failure)
649+
650+ # check for successful event
651+ received = self.action_queue.event_queue.events[0]
652+ info = dict(marker='marker', failure=failure)
653+ self.assertEqual(received, ('AQ_FILE_NEW_ERROR', (), info))
654+
655+ def test_get_possible_markers(self):
656+ """Test that it returns the correct values."""
657+ cmd = MakeFile(self.rq, VOLUME, 'parent', 'name', 'marker')
658+ res = cmd._get_possible_markers()
659+ self.assertEqual(res, (VOLUME, 'parent'))
660+
661+ def test_store_marker_result(self):
662+ """Test that it stores the correct values."""
663+ cmd = MakeFile(self.rq, VOLUME, 'parent', 'name', 'marker')
664+ cmd.store_marker_result(('a', 'b'))
665+ self.assertEqual(cmd.share_id, 'a')
666+ self.assertEqual(cmd.parent_id, 'b')
667+
668
669 class MakeDirTestCase(ConnectedBaseTestCase):
670 """Test for MakeDir ActionQueueCommand."""
671@@ -3122,6 +3533,37 @@
672 volume_id=VOLUME)
673 self.assertEqual(received, ('AQ_DIR_NEW_OK', (), info))
674
675+ def test_handle_failure_push_event(self):
676+ """Test AQ_DIR_NEW_ERROR is pushed on error."""
677+ # create a request and fill it with succesful information
678+ request = client.MakeDir(self.action_queue.client, VOLUME,
679+ 'parent', 'name')
680+ request.new_id = 'new_id'
681+ request.new_generation = 13
682+
683+ # create a command and trigger it fail
684+ cmd = MakeDir(self.rq, VOLUME, 'parent', 'name', 'marker')
685+ failure = Failure(Exception('foo'))
686+ cmd.handle_failure(failure)
687+
688+ # check for successful event
689+ received = self.action_queue.event_queue.events[0]
690+ info = dict(marker='marker', failure=failure)
691+ self.assertEqual(received, ('AQ_DIR_NEW_ERROR', (), info))
692+
693+ def test_get_possible_markers(self):
694+ """Test that it returns the correct values."""
695+ cmd = MakeDir(self.rq, VOLUME, 'parent', 'name', 'marker')
696+ res = cmd._get_possible_markers()
697+ self.assertEqual(res, (VOLUME, 'parent'))
698+
699+ def test_store_marker_result(self):
700+ """Test that it stores the correct values."""
701+ cmd = MakeDir(self.rq, VOLUME, 'parent', 'name', 'marker')
702+ cmd.store_marker_result(('a', 'b'))
703+ self.assertEqual(cmd.share_id, 'a')
704+ self.assertEqual(cmd.parent_id, 'b')
705+
706
707 class TestDeltaList(unittest.TestCase):
708 """Tests for DeltaList."""
709
710=== modified file 'tests/syncdaemon/test_sync.py'
711--- tests/syncdaemon/test_sync.py 2010-08-10 14:08:40 +0000
712+++ tests/syncdaemon/test_sync.py 2010-08-30 17:41:19 +0000
713@@ -27,6 +27,9 @@
714 import unittest
715 import uuid
716
717+from twisted.internet import defer
718+from twisted.python.failure import Failure
719+
720 from contrib.testing.testcase import (
721 FakeMain,
722 FakeVolumeManager,
723@@ -222,6 +225,12 @@
724 class TestSync(BaseSync):
725 """Test for Sync."""
726
727+ def setUp(self):
728+ """Set up."""
729+ BaseSync.setUp(self)
730+ self.sync = Sync(main=self.main)
731+ self.fsm = self.main.fs
732+
733 def test_deleting_open_files_is_no_cause_for_despair(self):
734 """test_deleting_open_files_is_no_cause_for_despair."""
735 def cb(_):
736@@ -254,7 +263,6 @@
737
738 def test_handle_AQ_DOWNLOAD_DOES_NOT_EXIST(self):
739 """handle_AQ_DOWNLOAD_DOES_NOT_EXIST."""
740- sync = Sync(main=self.main)
741 self.called = False
742
743 def faked_nothing(ssmr, event, params, *args):
744@@ -263,12 +271,11 @@
745 self.patch(SyncStateMachineRunner, 'nothing', faked_nothing)
746
747 kwargs = dict(share_id='share_id', node_id='node_id')
748- sync.handle_AQ_DOWNLOAD_DOES_NOT_EXIST(**kwargs)
749+ self.sync.handle_AQ_DOWNLOAD_DOES_NOT_EXIST(**kwargs)
750 self.assertTrue(self.called, 'nothing was called')
751
752 def test_handle_FILE_CREATE_while_LOCAL(self):
753 """A FS_FILE_CREATE is received with the node in LOCAL."""
754- sync = Sync(main=self.main)
755 self.called = False
756
757 def faked_nothing(ssmr, event, params, *args):
758@@ -277,18 +284,16 @@
759 self.patch(SyncStateMachineRunner, 'nothing', faked_nothing)
760
761 # create a file and put it in local
762- fsm = self.main.fs
763 somepath = os.path.join(self.root, 'somepath')
764- mdid = fsm.create(somepath, '')
765- fsm.set_by_mdid(mdid, local_hash='somehash')
766+ mdid = self.fsm.create(somepath, '')
767+ self.fsm.set_by_mdid(mdid, local_hash='somehash')
768
769 # send the event, and check that it called its .nothing()
770- sync.handle_FS_FILE_CREATE(somepath)
771+ self.sync.handle_FS_FILE_CREATE(somepath)
772 self.assertTrue(self.called)
773
774 def test_SV_HASH_NEW_with_file_uploadinterrupted(self):
775 """A SV_HASH_NEW is received after upload interrupted."""
776- sync = Sync(main=self.main)
777 self.called = False
778
779 def fake_meth(_, event, params, hash):
780@@ -300,17 +305,232 @@
781
782 # create a file and put it in local, without server_hash, as
783 # if the upload was cut in the middle after the make file
784- fsm = self.main.fs
785 somepath = os.path.join(self.root, 'somepath')
786- mdid = fsm.create(somepath, '', node_id='node_id')
787- fsm.set_by_mdid(mdid, local_hash='somehash', crc32='crc32',
788- stat='stat', size='size')
789+ mdid = self.fsm.create(somepath, '', node_id='node_id')
790+ self.fsm.set_by_mdid(mdid, local_hash='somehash', crc32='crc32',
791+ stat='stat', size='size')
792
793- # send the event and check
794- mdobj = fsm.get_by_mdid(mdid)
795- sync._handle_SV_HASH_NEW(mdobj.share_id, mdobj.node_id, '') # no content
796+ # send the event with no content and check
797+ mdobj = self.fsm.get_by_mdid(mdid)
798+ self.sync._handle_SV_HASH_NEW(mdobj.share_id, mdobj.node_id, '')
799 self.assertTrue(self.called)
800
801+ def test_AQ_FILE_NEW_OK_with_md_in_none(self):
802+ """Created the file, and MD says it's in NONE."""
803+ # fake method
804+ called = []
805+ self.patch(SyncStateMachineRunner, 'new_local_file_created',
806+ lambda *a: called.extend(a))
807+
808+ # create the node and set it up
809+ somepath = os.path.join(self.root, 'somepath')
810+ mdid = self.fsm.create(somepath, '', node_id='node_id')
811+ assert self.fsm.changed(mdid=mdid) == 'NONE', 'test badly set up'
812+
813+ # send the event and check args after the ssmr instance
814+ mdobj = self.fsm.get_by_mdid(mdid)
815+ self.sync.handle_AQ_FILE_NEW_OK(mdobj.share_id, mdid, 'new_id', 'gen')
816+ self.assertEqual(called[1:], ['AQ_FILE_NEW_OK', {}, 'new_id', mdid])
817+
818+ def test_AQ_FILE_NEW_OK_with_md_in_local(self):
819+ """Created the file, and MD says it's in LOCAL."""
820+ # fake method
821+ called = []
822+ self.patch(SyncStateMachineRunner, 'new_local_file_created',
823+ lambda *a: called.extend(a))
824+
825+ # create the node and set it up
826+ somepath = os.path.join(self.root, 'somepath')
827+ mdid = self.fsm.create(somepath, '', node_id='node_id')
828+ self.fsm.set_by_mdid(mdid, local_hash='somehash')
829+ assert self.fsm.changed(mdid=mdid) == 'LOCAL', 'test badly set up'
830+
831+ # send the event and check args after the ssmr instance
832+ mdobj = self.fsm.get_by_mdid(mdid)
833+ self.sync.handle_AQ_FILE_NEW_OK(mdobj.share_id, mdid, 'new_id', 'gen')
834+ self.assertEqual(called[1:], ['AQ_FILE_NEW_OK', {}, 'new_id', mdid])
835+
836+ def test_AQ_FILE_NEW_OK_no_md(self):
837+ """Created the file, but MD is no longer there."""
838+ # fake method
839+ called = []
840+ self.patch(SyncStateMachineRunner, 'release_marker_ok',
841+ lambda *a: called.extend(a))
842+
843+ # send the event and check args after the ssmr instance
844+ self.sync.handle_AQ_FILE_NEW_OK('share', 'mrker', 'new_id', 'gen')
845+ self.assertEqual(called[1:], ['AQ_FILE_NEW_OK', {}, 'new_id', 'mrker'])
846+
847+ def test_AQ_FILE_NEW_OK_md_says_dir(self):
848+ """Created the file, but MD says it's now a directory."""
849+ # fake method
850+ called = []
851+ self.patch(SyncStateMachineRunner, 'release_marker_ok',
852+ lambda *a: called.extend(a))
853+
854+ # create the node as a dir
855+ somepath = os.path.join(self.root, 'somepath')
856+ mdid = self.fsm.create(somepath, '', node_id='node_id', is_dir=True)
857+
858+ # send the event and check args after the ssmr instance
859+ mdobj = self.fsm.get_by_mdid(mdid)
860+ self.sync.handle_AQ_FILE_NEW_OK(mdobj.share_id, mdid, 'new_id', 'gen')
861+ self.assertEqual(called[1:], ['AQ_FILE_NEW_OK', {}, 'new_id', mdid])
862+
863+ def test_AQ_DIR_NEW_OK_md_says_file(self):
864+ """Created the dir, but MD says it's now a file."""
865+ # fake method
866+ called = []
867+ self.patch(SyncStateMachineRunner, 'release_marker_ok',
868+ lambda *a: called.extend(a))
869+
870+ # create the node as a file
871+ somepath = os.path.join(self.root, 'somepath')
872+ mdid = self.fsm.create(somepath, '', node_id='node_id')
873+
874+ # send the event and check args after the ssmr instance
875+ mdobj = self.fsm.get_by_mdid(mdid)
876+ self.sync.handle_AQ_DIR_NEW_OK(mdobj.share_id, mdid, 'new_id', 'gen')
877+ self.assertEqual(called[1:], ['AQ_DIR_NEW_OK', {}, 'new_id', mdid])
878+
879+ def test_AQ_DIR_NEW_OK_no_md(self):
880+ """Created the dir, but MD is no longer there."""
881+ # fake method
882+ called = []
883+ self.patch(SyncStateMachineRunner, 'release_marker_ok',
884+ lambda *a: called.extend(a))
885+
886+ # send the event and check args after the ssmr instance
887+ self.sync.handle_AQ_DIR_NEW_OK('share', 'marker', 'new_id', 'gen')
888+ self.assertEqual(called[1:], ['AQ_DIR_NEW_OK', {}, 'new_id', 'marker'])
889+
890+ def test_AQ_DIR_NEW_OK_md_in_NONE(self):
891+ """Created the dir, and MD says it's in NONE."""
892+ # fake method
893+ called = []
894+ self.patch(SyncStateMachineRunner, 'new_local_dir_created',
895+ lambda *a: called.extend(a))
896+
897+ # create the node and set it up
898+ somepath = os.path.join(self.root, 'somepath')
899+ mdid = self.fsm.create(somepath, '', node_id='node_id', is_dir=True)
900+ assert self.fsm.changed(mdid=mdid) == 'NONE', 'test badly set up'
901+
902+ # send the event and check args after the ssmr instance
903+ mdobj = self.fsm.get_by_mdid(mdid)
904+ self.sync.handle_AQ_DIR_NEW_OK(mdobj.share_id, mdid, 'new_id', 'gen')
905+ self.assertEqual(called[1:], ['AQ_DIR_NEW_OK', {}, 'new_id', mdid])
906+
907+ def test_AQ_FILE_NEW_ERROR_no_md(self):
908+ """Error creating the file, MD is no longer there."""
909+ # fake method
910+ called = []
911+ self.patch(SyncStateMachineRunner, 'release_marker_error',
912+ lambda *a: called.extend(a))
913+
914+ # send the event and check args after the ssmr instance
915+ failure = Failure(Exception('foo'))
916+ params = {'not_authorized': 'F', 'not_available': 'F'}
917+ self.sync.handle_AQ_FILE_NEW_ERROR('marker', failure)
918+ self.assertEqual(called[1:],
919+ ['AQ_FILE_NEW_ERROR', params, failure, 'marker'])
920+
921+ def test_AQ_FILE_NEW_ERROR_md_ok(self):
922+ """Error creating the file, MD is ok."""
923+ # fake method
924+ called = []
925+ realf = SyncStateMachineRunner.filedir_error_in_creation
926+ def fake(*args):
927+ """Call the original function, but storing the args."""
928+ called.extend(args)
929+ realf(*args)
930+ SyncStateMachineRunner.filedir_error_in_creation = realf
931+ SyncStateMachineRunner.filedir_error_in_creation = fake
932+
933+ # create the node
934+ somepath = os.path.join(self.root, 'somepath')
935+ mdid = self.fsm.create(somepath, '', node_id='node_id')
936+
937+ # send the event and check args after the ssmr instance
938+ failure = Failure(Exception('foo'))
939+ params = {'not_authorized': 'F', 'not_available': 'F'}
940+ self.sync.handle_AQ_FILE_NEW_ERROR(mdid, failure)
941+ self.assertEqual(called[1:],
942+ ['AQ_FILE_NEW_ERROR', params, failure, mdid])
943+
944+ def test_AQ_FILE_NEW_ERROR_md_says_dir(self):
945+ """Error creating the file, MD says it's now a dir."""
946+ # fake method
947+ called = []
948+ self.patch(SyncStateMachineRunner, 'release_marker_error',
949+ lambda *a: called.extend(a))
950+
951+ # create the node as a dir
952+ somepath = os.path.join(self.root, 'somepath')
953+ self.fsm.create(somepath, '', node_id='node_id', is_dir=True)
954+
955+ # send the event and check args after the ssmr instance
956+ failure = Failure(Exception('foo'))
957+ params = {'not_authorized': 'F', 'not_available': 'F'}
958+ self.sync.handle_AQ_FILE_NEW_ERROR('marker', failure)
959+ self.assertEqual(called[1:],
960+ ['AQ_FILE_NEW_ERROR', params, failure, 'marker'])
961+
962+ def test_AQ_DIR_NEW_ERROR_no_md(self):
963+ """Error creating the dir, MD is no longer there."""
964+ # fake method
965+ called = []
966+ self.patch(SyncStateMachineRunner, 'release_marker_error',
967+ lambda *a: called.extend(a))
968+
969+ # send the event and check args after the ssmr instance
970+ failure = Failure(Exception('foo'))
971+ params = {'not_authorized': 'F', 'not_available': 'F'}
972+ self.sync.handle_AQ_DIR_NEW_ERROR('marker', failure)
973+ self.assertEqual(called[1:],
974+ ['AQ_DIR_NEW_ERROR', params, failure, 'marker'])
975+
976+ def test_AQ_DIR_NEW_ERROR_md_ok(self):
977+ """Error creating the dir, MD is ok."""
978+ # fake method
979+ called = []
980+ realf = SyncStateMachineRunner.filedir_error_in_creation
981+ def fake(*args):
982+ """Call the original function, but storing the args."""
983+ called.extend(args)
984+ realf(*args)
985+ SyncStateMachineRunner.filedir_error_in_creation = realf
986+ SyncStateMachineRunner.filedir_error_in_creation = fake
987+
988+ # create the node
989+ somepath = os.path.join(self.root, 'somepath')
990+ mdid = self.fsm.create(somepath, '', node_id='node_id', is_dir=True)
991+
992+ # send the event and check args after the ssmr instance
993+ failure = Failure(Exception('foo'))
994+ params = {'not_authorized': 'F', 'not_available': 'F'}
995+ self.sync.handle_AQ_DIR_NEW_ERROR(mdid, failure)
996+ self.assertEqual(called[1:],
997+ ['AQ_DIR_NEW_ERROR', params, failure, mdid])
998+
999+ def test_AQ_DIR_NEW_ERROR_md_says_file(self):
1000+ """Error creating the dir, MD says it's now a file."""
1001+ # fake method
1002+ called = []
1003+ self.patch(SyncStateMachineRunner, 'release_marker_error',
1004+ lambda *a: called.extend(a))
1005+
1006+ # create the node as a file
1007+ somepath = os.path.join(self.root, 'somepath')
1008+ self.fsm.create(somepath, '', node_id='node_id')
1009+
1010+ # send the event and check args after the ssmr instance
1011+ failure = Failure(Exception('foo'))
1012+ params = {'not_authorized': 'F', 'not_available': 'F'}
1013+ self.sync.handle_AQ_DIR_NEW_ERROR('marker', failure)
1014+ self.assertEqual(called[1:],
1015+ ['AQ_DIR_NEW_ERROR', params, failure, 'marker'])
1016+
1017
1018 class SyncStateMachineRunnerTestCase(BaseSync):
1019 """Tests for the SyncStateMachineRunner."""
1020@@ -318,16 +538,16 @@
1021 def setUp(self):
1022 """Init."""
1023 BaseSync.setUp(self)
1024+ self.fsm = self.main.fs
1025+ self.aq = self.main.action_q
1026
1027 # create a file
1028- self.fsm = self.main.fs
1029 somepath = os.path.join(self.root, 'somepath')
1030 self.mdid = self.fsm.create(somepath, '', node_id='node_id')
1031
1032 key = FSKey(self.main.fs, share_id='', node_id='node_id')
1033 self.ssmr = SyncStateMachineRunner(fsm=self.main.fs, main=self.main,
1034 key=key, logger=None)
1035- self.root = self.mktemp('root')
1036
1037 # log config
1038 self.handler = MementoHandler()
1039@@ -405,6 +625,116 @@
1040 mdobj = self.fsm.get_by_mdid(self.mdid)
1041 self.assertTrue(mdobj.info.is_partial)
1042
1043+ @defer.inlineCallbacks
1044+ def test_new_local_file_created(self):
1045+ """Set the node_id in FSM, and release ok the marker in DeferredMap."""
1046+ # set up FSM and the DeferredMap
1047+ somepath = os.path.join(self.root, 'foo')
1048+ mdid = self.fsm.create(somepath, '')
1049+ map_d = self.aq.uuid_map.get('marker')
1050+
1051+ # create context and call
1052+ key = FSKey(self.main.fs, path=somepath)
1053+ ssmr = SyncStateMachineRunner(fsm=self.fsm, main=self.main,
1054+ key=key, logger=None)
1055+ ssmr.new_local_file_created('some event', {}, 'new_id', 'marker')
1056+
1057+ # check
1058+ mdobj = self.fsm.get_by_mdid(mdid)
1059+ self.assertEqual(mdobj.node_id, 'new_id')
1060+ result = yield map_d
1061+ self.assertEqual(result, 'new_id')
1062+
1063+ @defer.inlineCallbacks
1064+ def test_new_local_dir_created(self):
1065+ """Set the node_id in FSM, and release ok the marker in DeferredMap."""
1066+ # set up FSM and the DeferredMap
1067+ somepath = os.path.join(self.root, 'foo')
1068+ mdid = self.fsm.create(somepath, '', is_dir=True)
1069+ map_d = self.aq.uuid_map.get('marker')
1070+
1071+ # create context and call
1072+ key = FSKey(self.main.fs, path=somepath)
1073+ ssmr = SyncStateMachineRunner(fsm=self.fsm, main=self.main,
1074+ key=key, logger=None)
1075+ ssmr.new_local_dir_created('some event', {}, 'new_id', 'marker')
1076+
1077+ # check
1078+ mdobj = self.fsm.get_by_mdid(mdid)
1079+ self.assertEqual(mdobj.node_id, 'new_id')
1080+ result = yield map_d
1081+ self.assertEqual(result, 'new_id')
1082+
1083+ @defer.inlineCallbacks
1084+ def test_release_marker_ok(self):
1085+ """Just release the marker ok in DeferredMap."""
1086+ # set up the DeferredMap
1087+ map_d = self.aq.uuid_map.get('marker')
1088+
1089+ # create context and call
1090+ key = FSKey(self.main.fs)
1091+ ssmr = SyncStateMachineRunner(fsm=self.fsm, main=self.main,
1092+ key=key, logger=None)
1093+ ssmr.release_marker_ok('some event', {}, 'new_id', 'marker')
1094+
1095+ # check
1096+ result = yield map_d
1097+ self.assertEqual(result, 'new_id')
1098+
1099+ @defer.inlineCallbacks
1100+ def test_filedir_error_in_creation(self):
1101+ """Conflict and delete metada, and release the marker with error."""
1102+ # set up FSM and the DeferredMap
1103+ somepath = os.path.join(self.root, 'foo')
1104+ mdid = self.fsm.create(somepath, '')
1105+ map_d = self.aq.uuid_map.get('mrker')
1106+
1107+ # patch to control the call to key
1108+ called = []
1109+ self.fsm.move_to_conflict = lambda m: called.append(m)
1110+ self.fsm.delete_metadata = lambda p: called.append(p)
1111+
1112+ # create context and call
1113+ key = FSKey(self.main.fs, path=somepath)
1114+ ssmr = SyncStateMachineRunner(fsm=self.fsm, main=self.main,
1115+ key=key, logger=None)
1116+ exc = Exception('foo')
1117+ ssmr.filedir_error_in_creation('some event', {}, Failure(exc), 'mrker')
1118+
1119+ # check
1120+ self.assertEqual(called, [mdid, somepath])
1121+ try:
1122+ yield map_d
1123+ except Exception, e:
1124+ # silence the received exception
1125+ self.assertEqual(e, exc)
1126+ else:
1127+ # no exception? fail!!
1128+ self.fail("The marker was released without failure!")
1129+
1130+ @defer.inlineCallbacks
1131+ def test_release_marker_error(self):
1132+ """Just release the marker with failure in DeferredMap."""
1133+ # set up the DeferredMap
1134+ map_d = self.aq.uuid_map.get('mrker')
1135+
1136+ # create context and call
1137+ key = FSKey(self.main.fs)
1138+ ssmr = SyncStateMachineRunner(fsm=self.fsm, main=self.main,
1139+ key=key, logger=None)
1140+ exc = Exception('foo')
1141+ ssmr.release_marker_error('some event', {}, Failure(exc), 'mrker')
1142+
1143+ # check
1144+ try:
1145+ yield map_d
1146+ except Exception, e:
1147+ # silence the received exception
1148+ self.assertEqual(e, exc)
1149+ else:
1150+ # no exception? fail!!
1151+ self.fail("The marker was released without failure!")
1152+
1153
1154 class FakedState(object):
1155 """A faked state."""
1156
1157=== modified file 'ubuntuone/syncdaemon/action_queue.py'
1158--- ubuntuone/syncdaemon/action_queue.py 2010-08-23 19:52:04 +0000
1159+++ ubuntuone/syncdaemon/action_queue.py 2010-08-30 17:41:19 +0000
1160@@ -209,9 +209,7 @@
1161 self.tokens = self.limit = 10
1162
1163 def acquire(self):
1164- """
1165- return a deferred which fires on token acquisition.
1166- """
1167+ """Return a deferred which fires on token acquisition."""
1168 assert self.tokens >= 0, "Tokens should never be negative"
1169 d = defer.Deferred()
1170 if not self.tokens:
1171@@ -222,8 +220,7 @@
1172 return d
1173
1174 def release(self):
1175- """
1176- Release the token.
1177+ """Release the token.
1178
1179 Should be called by whoever did the acquire() when the shared
1180 resource is free.
1181@@ -283,18 +280,20 @@
1182 else:
1183 reactor.callFromThread(deferred.callback, True)
1184
1185+ @defer.inlineCallbacks
1186 def zip(self, upload):
1187- """
1188- Acquire, do the compression in a thread, release.
1189- """
1190- d_zip = defer.Deferred()
1191- d_lck = self.acquire()
1192- d_lck.addCallback(
1193- lambda _: reactor.callInThread(self._compress,
1194- d_zip, upload) or d_zip)
1195- d_lck.addBoth(passit(lambda _: self.release()))
1196-
1197- return d_lck
1198+ """Acquire, do the compression in a thread, release."""
1199+ deferred = defer.Deferred()
1200+
1201+ yield self.acquire()
1202+ try:
1203+ reactor.callInThread(self._compress, deferred, upload)
1204+ finally:
1205+ self.release()
1206+
1207+ # let's wait _compress to finish
1208+ yield deferred
1209+
1210
1211 class RequestQueue(object):
1212 """
1213@@ -442,23 +441,23 @@
1214
1215
1216 class UniqueRequestQueue(RequestQueue):
1217- """A RequestQueue.
1218+ """A unique RequestQueue.
1219
1220 It only ever queues one command for each (share_id, node_id) pair.
1221-
1222 """
1223
1224 def queue(self, command, on_top=False):
1225 """Add a command to the queue.
1226
1227- If there are commands in the queue for the
1228- same node, they will be dropped on the floor and laughed at.
1229+ If there is a command in the queue for the same node, it (the old
1230+ one) will be removed, leaving queued only the new one.
1231 """
1232- for wc in iter(self.waiting):
1233- wc_share = self.action_queue.resolve_uuid_maybe(wc.share_id)
1234- wc_node = self.action_queue.resolve_uuid_maybe(wc.node_id)
1235- if wc_share == command.share_id and wc_node == command.node_id:
1236+ for wc in self.waiting:
1237+ if wc.share_id == command.share_id and \
1238+ wc.node_id == command.node_id:
1239 self.waiting.remove(wc)
1240+ m = "Request removed (%s), queing other (%s) for same node."
1241+ logger.debug(m, wc, command)
1242 break
1243 return super(UniqueRequestQueue, self).queue(command, on_top)
1244
1245@@ -506,31 +505,16 @@
1246
1247
1248 class DeferredMap(object):
1249- """A mapping of deferred values. Or a deferred map of values.
1250-
1251- Or a mapping that returns deferreds and then fires them when it has
1252- the value.
1253-
1254+ """A mapping of deferred values.
1255+
1256+ Return deferreds for a key that are fired (succesfully or not) later.
1257 """
1258
1259 def __init__(self):
1260 self.waiting = defaultdict(list)
1261- self.failed = {}
1262- self.map = {}
1263
1264 def get(self, key):
1265- """Get the value for the given key.
1266-
1267- This always returns a deferred; when we already know the value
1268- we return a `succeed`, and if we don't know the value because
1269- it failed we return a `fail`; otherwise we return a plain
1270- unfired `Deferred`, and add it to the list of deferreds to
1271- call when we actually get the value.
1272- """
1273- if key in self.map:
1274- return defer.succeed(self.map[key])
1275- if key in self.failed:
1276- return defer.fail(Exception(self.failed[key]))
1277+ """Return a deferred for the given key."""
1278 d = defer.Deferred()
1279 self.waiting[key].append(d)
1280 return d
1281@@ -538,31 +522,22 @@
1282 def set(self, key, value):
1283 """We've got the value for a key!
1284
1285- Write it down in the map, and fire the waiting deferreds.
1286+ If it was waited, fire the waiting deferreds and remove the key.
1287 """
1288- if key not in self.map:
1289- self.map[key] = value
1290- for d in self.waiting.pop(key, ()):
1291+ if key in self.waiting:
1292+ deferreds = self.waiting.pop(key)
1293+ for d in deferreds:
1294 d.callback(value)
1295- elif self.map[key] != value:
1296- if key in self.map:
1297- raise KeyError("key is taken -- dunno what to do")
1298
1299 def err(self, key, failure):
1300 """Something went terribly wrong in the process of getting a value.
1301
1302- Break the news to the waiting deferreds.
1303- """
1304- self.failed[key] = failure.getErrorMessage()
1305- for d in self.waiting.pop(key, ()):
1306- d.errback(failure)
1307-
1308- def resolve_maybe(self, key):
1309- """Return either the mapping of key, or key itself.
1310-
1311- The former if key has been resolved, the later if not.
1312- """
1313- return self.map.get(key, key)
1314+ Break the news to the waiting deferreds and remove the key.
1315+ """
1316+ if key in self.waiting:
1317+ deferreds = self.waiting.pop(key)
1318+ for d in deferreds:
1319+ d.errback(failure)
1320
1321
1322 class UploadProgressWrapper(object):
1323@@ -1001,23 +976,17 @@
1324 defer.returnValue(result.volumes)
1325
1326 def make_file(self, share_id, parent_id, name, marker):
1327- """
1328- See .interfaces.IMetaQueue
1329- """
1330+ """See .interfaces.IMetaQueue."""
1331 return MakeFile(self.meta_queue, share_id, parent_id,
1332 name, marker).start()
1333
1334 def make_dir(self, share_id, parent_id, name, marker):
1335- """
1336- See .interfaces.IMetaQueue
1337- """
1338+ """See .interfaces.IMetaQueue."""
1339 return MakeDir(self.meta_queue, share_id, parent_id,
1340 name, marker).start()
1341
1342 def move(self, share_id, node_id, old_parent_id, new_parent_id, new_name):
1343- """
1344- See .interfaces.IMetaQueue
1345- """
1346+ """See .interfaces.IMetaQueue."""
1347 return Move(self.meta_queue, share_id, node_id, old_parent_id,
1348 new_parent_id, new_name).start()
1349
1350@@ -1147,14 +1116,6 @@
1351 """True if a Move is queued for that node."""
1352 return self.meta_queue.node_is_queued(Move, share_id, node_id)
1353
1354- def resolve_uuid_maybe(self, marker_maybe):
1355- """Resolve something tha may be a marker.
1356-
1357- Resolve the maybe_marker if it is a marker and has been
1358- resolved. Otherwise just return the marker_maybe back again.
1359- """
1360- return self.uuid_map.resolve_maybe(marker_maybe)
1361-
1362 def get_delta(self, volume_id, generation):
1363 """Get a delta from generation for the volume.
1364
1365@@ -1205,7 +1166,8 @@
1366
1367 logged_attrs = ()
1368
1369- __slots__ = ('_queue', 'start_done', 'running', 'log')
1370+ __slots__ = ('_queue', 'start_done', 'running', 'log',
1371+ 'markers_resolved_deferred')
1372
1373 def __init__(self, request_queue):
1374 """Initialize a command instance."""
1375@@ -1213,6 +1175,7 @@
1376 self.start_done = False
1377 self.running = False
1378 self.log = None
1379+ self.markers_resolved_deferred = defer.Deferred()
1380
1381 def to_dict(self):
1382 """Dump logged attributes to a dict."""
1383@@ -1240,20 +1203,32 @@
1384 @defer.inlineCallbacks
1385 def demark(self, *maybe_markers):
1386 """Arrange to have maybe_markers realized."""
1387- results = []
1388+ # we need to issue all the DeferredMap.get's right now, to be
1389+ # dereferenced later
1390+ waiting_structure = []
1391 for marker in maybe_markers:
1392 if IMarker.providedBy(marker):
1393- self.log.debug("waiting for the real value of '%s'", marker)
1394+ self.log.debug("waiting for the real value of %r", marker)
1395+ d = self.action_queue.uuid_map.get(marker)
1396+ waiting_structure.append((marker, True, d))
1397+ else:
1398+ waiting_structure.append((marker, False, None))
1399+
1400+ # now, we wait for all the dereferencings...
1401+ results = []
1402+ for (marker, need_to_wait, deferred) in waiting_structure:
1403+ if need_to_wait:
1404 try:
1405- result = yield self.action_queue.uuid_map.get(marker)
1406+ value = yield deferred
1407 except:
1408- self.log.error("failed '%s'", marker)
1409+ self.log.error("failed %r", marker)
1410 raise
1411 else:
1412- self.log.debug("got '%s'", marker)
1413- results.append(result)
1414+ self.log.debug("for %r got value %r", marker, value)
1415+ results.append(value)
1416 else:
1417 results.append(marker)
1418+
1419 defer.returnValue(results)
1420
1421 def end_callback(self, arg):
1422@@ -1280,9 +1255,18 @@
1423 else:
1424 return self.handle_failure(failure)
1425
1426+ @defer.inlineCallbacks
1427 def start_unqueued(self):
1428 """Do basic pre-start setup."""
1429 self.log = self.make_logger()
1430+ maybe_markers = self._get_possible_markers()
1431+ result = yield self.demark(*maybe_markers)
1432+ self.store_marker_result(result)
1433+ self.markers_resolved_deferred.callback(True)
1434+
1435+ def _get_possible_markers(self):
1436+ """Nothing if not overwritten."""
1437+ return ()
1438
1439 def start(self, _=None):
1440 """Queue the command."""
1441@@ -1306,25 +1290,26 @@
1442 def store_marker_result(self, _):
1443 """Called when all the markers are realized."""
1444
1445+ @defer.inlineCallbacks
1446 def run(self):
1447 """Do the deed."""
1448 self.running = True
1449- if self.start_done:
1450- self.log.debug('retrying')
1451- d = defer.succeed(None)
1452+ try:
1453+ if self.start_done:
1454+ self.log.debug('retrying')
1455+ else:
1456+ self.log.debug('starting')
1457+ yield self._start()
1458+ self.start_done = True
1459+ except Exception, e:
1460+ yield self.end_errback(Failure(e))
1461 else:
1462- self.log.debug('starting')
1463- d = self._start()
1464- d.addCallback(self.store_marker_result)
1465- d.addCallbacks(self._ready_to_run, self.end_errback)
1466- d.addBoth(self._done_running)
1467- return d
1468-
1469- def _done_running(self, x):
1470- self.running = False
1471- return x
1472-
1473- def _ready_to_run(self, _):
1474+ yield self.markers_resolved_deferred
1475+ yield self._ready_to_run()
1476+ finally:
1477+ self.running = False
1478+
1479+ def _ready_to_run(self):
1480 self.log.debug('running')
1481 if self.running:
1482 d = self._run()
1483@@ -1426,25 +1411,22 @@
1484 self.name = name.decode("utf8")
1485 self.marker = marker
1486
1487- def _start(self):
1488- """Do the specialized pre-run setup."""
1489- return self.demark(self.share_id, self.parent_id)
1490+ def _get_possible_markers(self):
1491+ """Return the possible markers."""
1492+ return self.share_id, self.parent_id
1493
1494 def store_marker_result(self, (share_id, parent_id)):
1495 """Called when all the markers are realized."""
1496 self.share_id = share_id
1497 self.parent_id = parent_id
1498- self.start_done = True
1499
1500 def _run(self):
1501 """Do the actual running."""
1502 maker = getattr(self.action_queue.client, self.client_method)
1503- return maker(self.share_id,
1504- self.parent_id,
1505- self.name)
1506+ return maker(self.share_id, self.parent_id, self.name)
1507
1508 def handle_success(self, request):
1509- """It worked! Push the event, and update the uuid map."""
1510+ """It worked! Push the event."""
1511 # note that we're not getting the new name from the answer
1512 # message, if we would get it, we would have another Unicode
1513 # boundary with it
1514@@ -1452,20 +1434,13 @@
1515 new_generation=request.new_generation,
1516 volume_id=self.share_id)
1517 self.action_queue.event_queue.push(self.ok_event_name, **d)
1518-
1519- # release the marker
1520- if IMarker.providedBy(self.marker):
1521- self.action_queue.uuid_map.set(self.marker, request.new_id)
1522 return request
1523
1524 def handle_failure(self, failure):
1525- """It didn't work! Push the event, and update the uuid map."""
1526+ """It didn't work! Push the event."""
1527 self.action_queue.event_queue.push(self.error_event_name,
1528 marker=self.marker,
1529- error=failure.getErrorMessage())
1530- if IMarker.providedBy(self.marker):
1531- self.action_queue.uuid_map.err(self.marker,
1532- failure)
1533+ failure=failure)
1534
1535
1536 class MakeFile(MakeThing):
1537@@ -1501,16 +1476,18 @@
1538 # here we use bytes for paths
1539 self.new_name = new_name.decode("utf8")
1540
1541- def _start(self):
1542- """Do the specialized pre-run setup."""
1543- return self.demark(self.share_id, self.node_id, self.new_parent_id)
1544+ def _get_possible_markers(self):
1545+ """Return the possible markers."""
1546+ return (self.share_id, self.node_id,
1547+ self.old_parent_id, self.new_parent_id)
1548
1549- def store_marker_result(self, (share_id, node_id, new_parent_id)):
1550+ def store_marker_result(self, (share_id, node_id,
1551+ old_parent_id, new_parent_id)):
1552 """Called when all the markers are realized."""
1553 self.share_id = share_id
1554 self.node_id = node_id
1555+ self.old_parent_id = old_parent_id
1556 self.new_parent_id = new_parent_id
1557- self.start_done = True
1558
1559 def _run(self):
1560 """Do the actual running."""
1561@@ -1550,16 +1527,15 @@
1562 self.node_id = node_id
1563 self.parent_id = parent_id
1564
1565- def _start(self):
1566- """Do the specialized pre-run setup."""
1567- return self.demark(self.share_id, self.node_id, self.parent_id)
1568+ def _get_possible_markers(self):
1569+ """Return the possible markers."""
1570+ return self.share_id, self.node_id, self.parent_id
1571
1572 def store_marker_result(self, (share_id, node_id, parent_id)):
1573 """Called when all the markers are realized."""
1574 self.share_id = share_id
1575 self.node_id = node_id
1576 self.parent_id = parent_id
1577- self.start_done = True
1578
1579 def _run(self):
1580 """Do the actual running."""
1581@@ -1707,17 +1683,12 @@
1582 self.use_http = True
1583
1584 def store_marker_result(self, (node_id,)):
1585- """
1586- Called when all the markers are realized.
1587- """
1588+ """Called when all the markers are realized."""
1589 self.node_id = node_id
1590- self.start_done = True
1591
1592- def _start(self):
1593- """
1594- Do the specialized pre-run setup
1595- """
1596- return self.demark(self.node_id)
1597+ def _get_possible_markers(self):
1598+ """Return the possible markers."""
1599+ return self.node_id,
1600
1601 def _create_share_http(self, node_id, user, name, read_only, deferred):
1602 """Create a share using the HTTP Web API method."""
1603@@ -1879,6 +1850,7 @@
1604
1605 __str__ = __repr__
1606
1607+
1608 class GetDelta(ActionQueueCommand):
1609 """Gets a delta from a generation for a volume."""
1610
1611@@ -2005,15 +1977,14 @@
1612 self.node_id = node_id
1613 self.is_public = is_public
1614
1615- def _start(self):
1616- """See ActionQueueCommand."""
1617- return self.demark(self.share_id, self.node_id)
1618+ def _get_possible_markers(self):
1619+ """Return the possible markers."""
1620+ return self.share_id, self.node_id
1621
1622 def store_marker_result(self, (share_id, node_id)):
1623 """See ActionQueueCommand."""
1624 self.share_id = share_id
1625 self.node_id = node_id
1626- self.start_done = True
1627
1628 def _change_public_access_http(self):
1629 """Change public access using the HTTP Web API method."""
1630@@ -2166,9 +2137,9 @@
1631 self.download_req = None
1632 self.action_queue.cancel_download(self.share_id, self.node_id)
1633
1634- def _start(self):
1635- """Do the specialized pre-run setup."""
1636- return self.demark(self.node_id)
1637+ def _get_possible_markers(self):
1638+ """Return the possible markers."""
1639+ return self.share_id, self.node_id
1640
1641 def cancel(self):
1642 """Cancel the download."""
1643@@ -2177,10 +2148,10 @@
1644 self.download_req.cancel()
1645 self.cleanup()
1646
1647- def store_marker_result(self, (node_id,)):
1648+ def store_marker_result(self, (share_id, node_id)):
1649 """Called when all the markers are realized."""
1650+ self.share_id = share_id
1651 self.node_id = node_id
1652- self.start_done = True
1653
1654 def _run(self):
1655 """Do the actual running."""
1656@@ -2365,6 +2336,10 @@
1657 self.log.debug('stopping the producer')
1658 self.upload_req.producer.stopProducing()
1659
1660+ def _get_possible_markers(self):
1661+ """Return the possible markers."""
1662+ return self.share_id, self.node_id
1663+
1664 def _start(self):
1665 """Do the specialized pre-run setup."""
1666 d = defer.Deferred()
1667@@ -2373,19 +2348,14 @@
1668 self.action_queue.uploading[self.share_id, self.node_id] = uploading
1669
1670 d = self.action_queue.zip_queue.zip(self)
1671- d.addCallback(lambda _: self.demark(self.node_id))
1672 d.addBoth(lambda x: defer.fail(RequestCleanedUp('CANCELLED'))
1673 if self.cancelled else x)
1674 return d
1675
1676- def store_marker_result(self, (node_id,)):
1677+ def store_marker_result(self, (share_id, node_id)):
1678 """Called when all the markers are realized."""
1679- # update action_queue.uploading with the real node_id
1680- uploading = self.action_queue.uploading.pop((self.share_id,
1681- self.node_id))
1682+ self.share_id = share_id
1683 self.node_id = node_id
1684- self.action_queue.uploading[self.share_id, node_id] = uploading
1685- self.start_done = True
1686
1687 def _run(self):
1688 """Do the actual running."""
1689
1690=== modified file 'ubuntuone/syncdaemon/event_queue.py'
1691--- ubuntuone/syncdaemon/event_queue.py 2010-08-17 14:23:31 +0000
1692+++ ubuntuone/syncdaemon/event_queue.py 2010-08-30 17:41:19 +0000
1693@@ -44,9 +44,9 @@
1694 'FS_INVALID_NAME': ('dirname', 'filename',),
1695
1696 'AQ_FILE_NEW_OK': ('volume_id', 'marker', 'new_id', 'new_generation'),
1697- 'AQ_FILE_NEW_ERROR': ('marker', 'error'),
1698+ 'AQ_FILE_NEW_ERROR': ('marker', 'failure'),
1699 'AQ_DIR_NEW_OK': ('volume_id', 'marker', 'new_id', 'new_generation'),
1700- 'AQ_DIR_NEW_ERROR': ('marker', 'error'),
1701+ 'AQ_DIR_NEW_ERROR': ('marker', 'failure'),
1702 'AQ_MOVE_OK': ('share_id', 'node_id', 'new_generation'),
1703 'AQ_MOVE_ERROR': ('share_id', 'node_id',
1704 'old_parent_id', 'new_parent_id', 'new_name', 'error'),
1705
1706=== modified file 'ubuntuone/syncdaemon/sync.py'
1707--- ubuntuone/syncdaemon/sync.py 2010-08-10 14:08:40 +0000
1708+++ ubuntuone/syncdaemon/sync.py 2010-08-30 17:41:19 +0000
1709@@ -289,6 +289,7 @@
1710 debug = loglevel(logging.DEBUG)
1711 exception = loglevel(-1)
1712
1713+
1714 class SyncStateMachineRunner(StateMachineRunner):
1715 """This is where all the state machine methods are."""
1716
1717@@ -343,10 +344,10 @@
1718 params.update(self.build_hash_eq(hash))
1719 self.on_event(event, params, error, hash, *args)
1720
1721- def signal_event_with_error(self, event, error, *args):
1722+ def signal_event_with_error(self, event, failure, *args):
1723 """An event returned with error."""
1724- params = self.build_error_eq(error)
1725- self.on_event(event, params, error, *args)
1726+ params = self.build_error_eq(failure.getErrorMessage())
1727+ self.on_event(event, params, failure, *args)
1728
1729 def build_error_eq(self, error):
1730 """Get the error state."""
1731@@ -541,7 +542,6 @@
1732
1733 def new_local_file(self, event, parms, path):
1734 """a new local file was created"""
1735- # XXX: lucio.torre: we should use markers here
1736 parent_path = os.path.dirname(path)
1737 parent = self.m.fs.get_by_path(parent_path)
1738 parent_id = parent.node_id or MDMarker(parent.mdid)
1739@@ -554,14 +554,21 @@
1740 marker = MDMarker(self.key.get_mdid())
1741 self.m.action_q.make_file(share_id, parent_id, name, marker)
1742
1743- def new_local_file_created(self, event, parms, new_id):
1744- """we got the server answer for the file creation."""
1745+ def release_marker_ok(self, event, parms, new_id, marker):
1746+ """Release ok the received marker in AQ's DeferredMap."""
1747+ self.m.action_q.uuid_map.set(marker, new_id)
1748+
1749+ def release_marker_error(self, event, parms, failure, marker):
1750+ """Release with error the received marker in AQ's DeferredMap."""
1751+ self.m.action_q.uuid_map.err(marker, failure)
1752+
1753+ def new_local_file_created(self, event, parms, new_id, marker):
1754+ """We got the server answer for the file creation."""
1755+ self.m.action_q.uuid_map.set(marker, new_id)
1756 self.m.fs.set_node_id(self.key['path'], new_id)
1757
1758-
1759 def new_local_dir(self, event, parms, path):
1760 """a new local dir was created"""
1761- # XXX: lucio.torre: we should use markers here
1762 parent_path = os.path.dirname(path)
1763 parent = self.m.fs.get_by_path(parent_path)
1764 parent_id = parent.node_id or MDMarker(parent.mdid)
1765@@ -573,8 +580,9 @@
1766 self.m.action_q.make_dir(share_id, parent_id, name, marker)
1767 self.m.lr.scan_dir(mdid, path)
1768
1769- def new_local_dir_created(self, event, parms, new_id):
1770+ def new_local_dir_created(self, event, parms, new_id, marker):
1771 """Server answered that dir creation was ok."""
1772+ self.m.action_q.uuid_map.set(marker, new_id)
1773 self.m.fs.set_node_id(self.key['path'], new_id)
1774
1775 def calculate_hash(self, event, params):
1776@@ -684,20 +692,12 @@
1777 self.key.remove_partial()
1778 self.delete_file(event, params)
1779
1780- def file_not_created_remove(self, event, params, error):
1781- """kill it"""
1782+ def filedir_error_in_creation(self, event, params, failure, marker):
1783+ """Move actual content to conflict, and delete the metadata."""
1784+ self.m.action_q.uuid_map.err(marker, failure)
1785 self.key.move_to_conflict()
1786 self.key.delete_metadata()
1787
1788- def dir_not_created_in_server(self, event, params, error):
1789- """Re-get the dir if it was because already exist."""
1790- if error == "ALREADY_EXISTS":
1791- # delete metadata
1792- self.key.delete_metadata()
1793- else:
1794- self.key.move_to_conflict()
1795- self.key.delete_metadata()
1796-
1797 def delete_on_server(self, event, params, path):
1798 """local file was deleted."""
1799 self.m.action_q.unlink(self.key['share_id'],
1800@@ -935,29 +935,29 @@
1801 key = FSKey(self.m.fs, mdid=marker)
1802 log = FileLogger(self.logger, key)
1803 ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
1804- ssmr.on_event("AQ_FILE_NEW_OK", {}, new_id)
1805+ ssmr.on_event("AQ_FILE_NEW_OK", {}, new_id, marker)
1806 ssmr.update_generation(volume_id, new_id, new_generation)
1807
1808- def handle_AQ_FILE_NEW_ERROR(self, marker, error):
1809+ def handle_AQ_FILE_NEW_ERROR(self, marker, failure):
1810 """on AQ_FILE_NEW_ERROR"""
1811 key = FSKey(self.m.fs, mdid=marker)
1812 log = FileLogger(self.logger, key)
1813 ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
1814- ssmr.signal_event_with_error("AQ_FILE_NEW_ERROR", error)
1815+ ssmr.signal_event_with_error("AQ_FILE_NEW_ERROR", failure, marker)
1816
1817- def handle_AQ_DIR_NEW_ERROR(self, marker, error):
1818+ def handle_AQ_DIR_NEW_ERROR(self, marker, failure):
1819 """on AQ_DIR_NEW_ERROR"""
1820 key = FSKey(self.m.fs, mdid=marker)
1821 log = FileLogger(self.logger, key)
1822 ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
1823- ssmr.signal_event_with_error("AQ_DIR_NEW_ERROR", error)
1824+ ssmr.signal_event_with_error("AQ_DIR_NEW_ERROR", failure, marker)
1825
1826 def handle_AQ_DIR_NEW_OK(self, volume_id, marker, new_id, new_generation):
1827 """On AQ_DIR_NEW_OK."""
1828 key = FSKey(self.m.fs, mdid=marker)
1829 log = FileLogger(self.logger, key)
1830 ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
1831- ssmr.on_event("AQ_DIR_NEW_OK", {}, new_id)
1832+ ssmr.on_event("AQ_DIR_NEW_OK", {}, new_id, marker)
1833 ssmr.update_generation(volume_id, new_id, new_generation)
1834
1835 def handle_FS_FILE_CLOSE_WRITE(self, path):
1836
1837=== modified file 'ubuntuone/syncdaemon/u1fsfsm.ods'
1838Binary files ubuntuone/syncdaemon/u1fsfsm.ods 2010-07-16 21:36:52 +0000 and ubuntuone/syncdaemon/u1fsfsm.ods 2010-08-30 17:41:19 +0000 differ
1839=== modified file 'ubuntuone/syncdaemon/u1fsfsm.py'
1840--- ubuntuone/syncdaemon/u1fsfsm.py 2010-07-19 17:31:18 +0000
1841+++ ubuntuone/syncdaemon/u1fsfsm.py 2010-08-30 17:41:19 +0000
1842@@ -210,7 +210,7 @@
1843 u'has_metadata': u'=',
1844 u'is_directory': u'='}}],
1845 u'AQ_DIR_NEW_ERROR': [{'ACTION': u'pass',
1846- 'ACTION_FUNC': u'nothing',
1847+ 'ACTION_FUNC': u'release_marker_error',
1848 'COMMENTS': u'',
1849 'PARAMETERS': {u'hash_eq_local_hash': u'NA',
1850 u'hash_eq_server_hash': u'NA',
1851@@ -223,24 +223,11 @@
1852 u'has_metadata': u'=',
1853 u'is_directory': u'='}},
1854 {'ACTION': u'CONFLICT',
1855- 'ACTION_FUNC': u'file_not_created_remove',
1856- 'COMMENTS': u'',
1857- 'PARAMETERS': {u'hash_eq_local_hash': u'NA',
1858- u'hash_eq_server_hash': u'NA',
1859- u'not_authorized': u'T',
1860- u'not_available': u'*'},
1861- 'STATE': {u'changed': u'*',
1862- u'has_metadata': u'*',
1863- u'is_directory': u'T'},
1864- 'STATE_OUT': {u'changed': u'NA',
1865- u'has_metadata': u'F',
1866- u'is_directory': u'NA'}},
1867- {'ACTION': u'if the error is because the dir already exists in the server, we need to merge them',
1868- 'ACTION_FUNC': u'dir_not_created_in_server',
1869- 'COMMENTS': u'',
1870- 'PARAMETERS': {u'hash_eq_local_hash': u'NA',
1871- u'hash_eq_server_hash': u'NA',
1872- u'not_authorized': u'F',
1873+ 'ACTION_FUNC': u'filedir_error_in_creation',
1874+ 'COMMENTS': u'',
1875+ 'PARAMETERS': {u'hash_eq_local_hash': u'NA',
1876+ u'hash_eq_server_hash': u'NA',
1877+ u'not_authorized': u'*',
1878 u'not_available': u'*'},
1879 'STATE': {u'changed': u'*',
1880 u'has_metadata': u'*',
1881@@ -249,7 +236,7 @@
1882 u'has_metadata': u'F',
1883 u'is_directory': u'NA'}},
1884 {'ACTION': u'pass',
1885- 'ACTION_FUNC': u'nothing',
1886+ 'ACTION_FUNC': u'release_marker_error',
1887 'COMMENTS': u'',
1888 'PARAMETERS': {u'hash_eq_local_hash': u'NA',
1889 u'hash_eq_server_hash': u'NA',
1890@@ -260,48 +247,22 @@
1891 u'is_directory': u'F'},
1892 'STATE_OUT': {u'changed': u'=',
1893 u'has_metadata': u'=',
1894- u'is_directory': u'='}},
1895- {'ACTION': u'NA',
1896- 'ACTION_FUNC': u'',
1897- 'COMMENTS': u'',
1898- 'PARAMETERS': {u'hash_eq_local_hash': u'NA',
1899- u'hash_eq_server_hash': u'NA',
1900- u'not_authorized': u'NA',
1901- u'not_available': u'*'},
1902- 'STATE': {u'changed': u'*',
1903- u'has_metadata': u'*',
1904- u'is_directory': u'T'},
1905- 'STATE_OUT': {u'changed': u'=',
1906- u'has_metadata': u'=',
1907 u'is_directory': u'='}}],
1908- u'AQ_DIR_NEW_OK': [{'ACTION': u'',
1909- 'ACTION_FUNC': u'nothing',
1910- 'COMMENTS': u'',
1911- 'PARAMETERS': {u'hash_eq_local_hash': u'NA',
1912- u'hash_eq_server_hash': u'NA',
1913- u'not_authorized': u'NA',
1914- u'not_available': u'NA'},
1915- 'STATE': {u'changed': u'NONE',
1916- u'has_metadata': u'T',
1917- u'is_directory': u'F'},
1918- 'STATE_OUT': {u'changed': u'=',
1919- u'has_metadata': u'=',
1920- u'is_directory': u'='}},
1921- {'ACTION': u'pass',
1922- 'ACTION_FUNC': u'nothing',
1923- 'COMMENTS': u'we got IN_FILE_CHANGED and HQ_HASH_NEW between IN_FILE_NEW and AQ_FILE_NEW_OK',
1924- 'PARAMETERS': {u'hash_eq_local_hash': u'NA',
1925- u'hash_eq_server_hash': u'NA',
1926- u'not_authorized': u'NA',
1927- u'not_available': u'NA'},
1928- 'STATE': {u'changed': u'LOCAL',
1929- u'has_metadata': u'T',
1930- u'is_directory': u'F'},
1931- 'STATE_OUT': {u'changed': u'=',
1932- u'has_metadata': u'=',
1933- u'is_directory': u'='}},
1934- {'ACTION': u'',
1935- 'ACTION_FUNC': u'nothing',
1936+ u'AQ_DIR_NEW_OK': [{'ACTION': u'aq.uuid_map.set(marker, new_id)',
1937+ 'ACTION_FUNC': u'release_marker_ok',
1938+ 'COMMENTS': u"it's a file now",
1939+ 'PARAMETERS': {u'hash_eq_local_hash': u'NA',
1940+ u'hash_eq_server_hash': u'NA',
1941+ u'not_authorized': u'NA',
1942+ u'not_available': u'NA'},
1943+ 'STATE': {u'changed': u'*',
1944+ u'has_metadata': u'T',
1945+ u'is_directory': u'F'},
1946+ 'STATE_OUT': {u'changed': u'=',
1947+ u'has_metadata': u'=',
1948+ u'is_directory': u'='}},
1949+ {'ACTION': u'aq.uuid_map.set(marker, new_id)',
1950+ 'ACTION_FUNC': u'release_marker_ok',
1951 'COMMENTS': u'the dir was now gone',
1952 'PARAMETERS': {u'hash_eq_local_hash': u'NA',
1953 u'hash_eq_server_hash': u'NA',
1954@@ -322,7 +283,7 @@
1955 u'not_available': u'NA'},
1956 'STATE': {u'changed': u'SERVER',
1957 u'has_metadata': u'T',
1958- u'is_directory': u'*'},
1959+ u'is_directory': u'T'},
1960 'STATE_OUT': {u'changed': u'NONE',
1961 u'has_metadata': u'T',
1962 u'is_directory': u'='}},
1963@@ -977,7 +938,7 @@
1964 u'has_metadata': u'T',
1965 u'is_directory': u'='}}],
1966 u'AQ_FILE_NEW_ERROR': [{'ACTION': u'pass',
1967- 'ACTION_FUNC': u'nothing',
1968+ 'ACTION_FUNC': u'release_marker_error',
1969 'COMMENTS': u'',
1970 'PARAMETERS': {u'hash_eq_local_hash': u'NA',
1971 u'hash_eq_server_hash': u'NA',
1972@@ -990,24 +951,11 @@
1973 u'has_metadata': u'=',
1974 u'is_directory': u'='}},
1975 {'ACTION': u'CONFLICT',
1976- 'ACTION_FUNC': u'file_not_created_remove',
1977- 'COMMENTS': u'',
1978- 'PARAMETERS': {u'hash_eq_local_hash': u'NA',
1979- u'hash_eq_server_hash': u'NA',
1980- u'not_authorized': u'T',
1981- u'not_available': u'*'},
1982- 'STATE': {u'changed': u'*',
1983- u'has_metadata': u'*',
1984- u'is_directory': u'F'},
1985- 'STATE_OUT': {u'changed': u'NA',
1986- u'has_metadata': u'F',
1987- u'is_directory': u'NA'}},
1988- {'ACTION': u'CONFLICT',
1989- 'ACTION_FUNC': u'file_not_created_remove',
1990- 'COMMENTS': u'',
1991- 'PARAMETERS': {u'hash_eq_local_hash': u'NA',
1992- u'hash_eq_server_hash': u'NA',
1993- u'not_authorized': u'F',
1994+ 'ACTION_FUNC': u'filedir_error_in_creation',
1995+ 'COMMENTS': u'',
1996+ 'PARAMETERS': {u'hash_eq_local_hash': u'NA',
1997+ u'hash_eq_server_hash': u'NA',
1998+ u'not_authorized': u'*',
1999 u'not_available': u'*'},
2000 'STATE': {u'changed': u'*',
2001 u'has_metadata': u'*',
2002@@ -1016,7 +964,7 @@
2003 u'has_metadata': u'F',
2004 u'is_directory': u'NA'}},
2005 {'ACTION': u'pass',
2006- 'ACTION_FUNC': u'nothing',
2007+ 'ACTION_FUNC': u'release_marker_error',
2008 'COMMENTS': u'',
2009 'PARAMETERS': {u'hash_eq_local_hash': u'NA',
2010 u'hash_eq_server_hash': u'NA',
2011@@ -1027,19 +975,6 @@
2012 u'is_directory': u'T'},
2013 'STATE_OUT': {u'changed': u'=',
2014 u'has_metadata': u'=',
2015- u'is_directory': u'='}},
2016- {'ACTION': u'NA',
2017- 'ACTION_FUNC': u'',
2018- 'COMMENTS': u'',
2019- 'PARAMETERS': {u'hash_eq_local_hash': u'NA',
2020- u'hash_eq_server_hash': u'NA',
2021- u'not_authorized': u'NA',
2022- u'not_available': u'*'},
2023- 'STATE': {u'changed': u'*',
2024- u'has_metadata': u'*',
2025- u'is_directory': u'F'},
2026- 'STATE_OUT': {u'changed': u'=',
2027- u'has_metadata': u'=',
2028 u'is_directory': u'='}}],
2029 u'AQ_FILE_NEW_OK': [{'ACTION': u'md.set(mdid, server_uuid=server_uuid)',
2030 'ACTION_FUNC': u'new_local_file_created',
2031@@ -1067,8 +1002,8 @@
2032 'STATE_OUT': {u'changed': u'=',
2033 u'has_metadata': u'=',
2034 u'is_directory': u'='}},
2035- {'ACTION': u'pass',
2036- 'ACTION_FUNC': u'nothing',
2037+ {'ACTION': u'aq.uuid_map.set(marker, new_id)',
2038+ 'ACTION_FUNC': u'release_marker_ok',
2039 'COMMENTS': u'file deleted locally',
2040 'PARAMETERS': {u'hash_eq_local_hash': u'NA',
2041 u'hash_eq_server_hash': u'NA',
2042@@ -1089,18 +1024,18 @@
2043 u'not_available': u'NA'},
2044 'STATE': {u'changed': u'SERVER',
2045 u'has_metadata': u'T',
2046- u'is_directory': u'*'},
2047+ u'is_directory': u'F'},
2048 'STATE_OUT': {u'changed': u'NONE',
2049 u'has_metadata': u'T',
2050 u'is_directory': u'='}},
2051- {'ACTION': u'pass',
2052- 'ACTION_FUNC': u'nothing',
2053- 'COMMENTS': u'',
2054+ {'ACTION': u'aq.uuid_map.set(marker, new_id)',
2055+ 'ACTION_FUNC': u'release_marker_ok',
2056+ 'COMMENTS': u"it's a directory now",
2057 'PARAMETERS': {u'hash_eq_local_hash': u'NA',
2058 u'hash_eq_server_hash': u'NA',
2059 u'not_authorized': u'NA',
2060 u'not_available': u'NA'},
2061- 'STATE': {u'changed': u'NONE',
2062+ 'STATE': {u'changed': u'*',
2063 u'has_metadata': u'T',
2064 u'is_directory': u'T'},
2065 'STATE_OUT': {u'changed': u'=',

Subscribers

People subscribed via source and target branches