Merge lp:~facundo/ubuntuone-client/non-bouncing-filesystem into lp:ubuntuone-client

Proposed by Facundo Batista
Status: Merged
Approved by: Guillermo Gonzalez
Approved revision: 256
Merged at revision: not available
Proposed branch: lp:~facundo/ubuntuone-client/non-bouncing-filesystem
Merge into: lp:ubuntuone-client
Diff against target: 689 lines
9 files modified
contrib/testing/testcase.py (+1/-0)
tests/syncdaemon/test_eq_inotify.py (+179/-0)
tests/syncdaemon/test_eventqueue.py (+66/-0)
tests/syncdaemon/test_fsm.py (+15/-6)
tests/syncdaemon/test_localrescan.py (+4/-2)
tests/syncdaemon/test_sync.py (+4/-0)
ubuntuone/syncdaemon/event_queue.py (+69/-13)
ubuntuone/syncdaemon/filesystem_manager.py (+26/-2)
ubuntuone/syncdaemon/main.py (+1/-0)
To merge this branch: bzr merge lp:~facundo/ubuntuone-client/non-bouncing-filesystem
Reviewer Review Type Date Requested Status
Guillermo Gonzalez Approve
John Lenton (community) Approve
Ubuntu One hackers Pending
Review via email: mp+13419@code.launchpad.net

Commit message

Now we avoid the filesystem event bouncing.

To post a comment you must log in.
Revision history for this message
Facundo Batista (facundo) wrote :

Now we avoid the filesystem event bouncing.

Before, when SD removed a file (for example), the event FS_FILE_DELETED arrived from the filesystem. As the event from filesystem is related to a "path", it may happen that if a new file is created with that path *before* the event from filesystem arrives, the event will impact the new node, incorrectly.

So, we need to avoid these bouncing events. For this I put a MuteFilter en EQ to whom the expected events are added, and it filters them. As the one that touches disk is FSM, this is pretty clean.

Regarding tests, I added some to test MuteFilter itself, and also to test EQ filtering the events. But as FSM now has a reference to EQ, I had to change some other tests because of this.

Revision history for this message
John Lenton (chipaca) wrote :

I love it. Lurve, lurve, luv it.

review: Approve
Revision history for this message
Guillermo Gonzalez (verterok) wrote :

I *really* like this

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'contrib/testing/testcase.py'
2--- contrib/testing/testcase.py 2009-10-12 16:00:16 +0000
3+++ contrib/testing/testcase.py 2009-10-15 15:40:24 +0000
4@@ -102,6 +102,7 @@
5 self.fs = fs_manager.FileSystemManager(self.data_dir,
6 self.partials_dir, self.vm)
7 self.event_q = event_queue.EventQueue(self.fs)
8+ self.fs.register_eq(self.event_q)
9 self.action_q = FakeActionQueue(self.event_q)
10 self.state = main.SyncDaemonStateManager(self, 2, 0)
11 self.event_q.subscribe(self.vm)
12
13=== modified file 'tests/syncdaemon/test_eq_inotify.py'
14--- tests/syncdaemon/test_eq_inotify.py 2009-09-30 18:02:18 +0000
15+++ tests/syncdaemon/test_eq_inotify.py 2009-10-15 15:40:24 +0000
16@@ -885,6 +885,185 @@
17 return self._deferred
18
19
20+class MutedSignalsTests(BaseTwisted):
21+ '''Test that EQ filter some signals on demand.'''
22+
23+ class DontHitMe(object):
24+ '''we shouldn't be called'''
25+ # class-closure, cannot use self, pylint: disable-msg=E0213
26+ def __init__(innerself, obj):
27+ innerself.obj = obj
28+ def handle_default(innerself, *a):
29+ '''Something here? Error!'''
30+ innerself.obj.finished_error("don't hit me! received %s" % (a,))
31+
32+ def check_filter(self, _=None):
33+ self.assertFalse(self.eq._processor._to_mute._cnt)
34+ self.finished_ok()
35+
36+ def test_file_open(self):
37+ '''Test receiving the open signal on files.'''
38+ testfile = os.path.join(self.root_dir, "foo")
39+ open(testfile, "w").close()
40+ self.eq.add_to_mute_filter("FS_FILE_OPEN", testfile)
41+ self.eq.add_to_mute_filter("FS_FILE_CLOSE_NOWRITE", testfile)
42+
43+ self.eq.inotify_add_watch(self.root_dir)
44+ self.eq.subscribe(self.DontHitMe(self))
45+
46+ # generate the event
47+ open(testfile)
48+ reactor.callLater(.1, self.check_filter)
49+ return self._deferred
50+
51+ def test_file_close_nowrite(self):
52+ '''Test receiving the close_nowrite signal on files.'''
53+ testfile = os.path.join(self.root_dir, "foo")
54+ open(testfile, "w").close()
55+ fh = open(testfile)
56+ self.eq.add_to_mute_filter("FS_FILE_CLOSE_NOWRITE", testfile)
57+
58+ self.eq.inotify_add_watch(self.root_dir)
59+ self.eq.subscribe(self.DontHitMe(self))
60+
61+ # generate the event
62+ fh.close()
63+ reactor.callLater(.1, self.check_filter)
64+ return self._deferred
65+
66+ def test_file_create_close_write(self):
67+ '''Test receiving the create and close_write signals on files.'''
68+ testfile = os.path.join(self.root_dir, "foo")
69+ self.eq.add_to_mute_filter("FS_FILE_CREATE", testfile)
70+ self.eq.add_to_mute_filter("FS_FILE_OPEN", testfile)
71+ self.eq.add_to_mute_filter("FS_FILE_CLOSE_WRITE", testfile)
72+
73+ self.eq.inotify_add_watch(self.root_dir)
74+ self.eq.subscribe(self.DontHitMe(self))
75+
76+ # generate the event
77+ open(testfile, "w").close()
78+ reactor.callLater(.1, self.check_filter)
79+ return self._deferred
80+
81+ def test_dir_create(self):
82+ '''Test receiving the create signal on dirs.'''
83+ testdir = os.path.join(self.root_dir, "foo")
84+ self.eq.add_to_mute_filter("FS_DIR_CREATE", testdir)
85+
86+ self.eq.inotify_add_watch(self.root_dir)
87+ self.eq.subscribe(self.DontHitMe(self))
88+
89+ # generate the event
90+ os.mkdir(testdir)
91+ reactor.callLater(.1, self.check_filter)
92+ return self._deferred
93+
94+ def test_file_delete(self):
95+ '''Test the delete signal on a file.'''
96+ testfile = os.path.join(self.root_dir, "foo")
97+ open(testfile, "w").close()
98+ self.eq.add_to_mute_filter("FS_FILE_DELETE", testfile)
99+
100+ self.eq.inotify_add_watch(self.root_dir)
101+ self.eq.subscribe(self.DontHitMe(self))
102+
103+ # generate the event
104+ os.remove(testfile)
105+ reactor.callLater(.1, self.check_filter)
106+ return self._deferred
107+
108+ def test_dir_delete(self):
109+ '''Test the delete signal on a dir.'''
110+ testdir = os.path.join(self.root_dir, "foo")
111+ os.mkdir(testdir)
112+ self.eq.add_to_mute_filter("FS_DIR_DELETE", testdir)
113+
114+ self.eq.inotify_add_watch(self.root_dir)
115+ self.eq.subscribe(self.DontHitMe(self))
116+
117+ # generate the event
118+ os.rmdir(testdir)
119+ reactor.callLater(.1, self.check_filter)
120+ return self._deferred
121+
122+ def test_file_moved_inside(self):
123+ '''Test the synthesis of the FILE_MOVE event.'''
124+ fromfile = os.path.join(self.root_dir, "foo")
125+ self.fs.create(fromfile, "")
126+ self.fs.set_node_id(fromfile, "from_node_id")
127+ tofile = os.path.join(self.root_dir, "bar")
128+ self.fs.create(tofile, "")
129+ self.fs.set_node_id(tofile, "to_node_id")
130+ open(fromfile, "w").close()
131+ self.eq.add_to_mute_filter("FS_FILE_MOVE", fromfile, tofile)
132+
133+ self.eq.inotify_add_watch(self.root_dir)
134+ self.eq.subscribe(self.DontHitMe(self))
135+
136+ # generate the event
137+ os.rename(fromfile, tofile)
138+ reactor.callLater(.1, self.check_filter)
139+ return self._deferred
140+
141+ def test_dir_moved_inside(self):
142+ '''Test the synthesis of the DIR_MOVE event.'''
143+ fromdir = os.path.join(self.root_dir, "foo")
144+ self.fs.create(fromdir, "")
145+ self.fs.set_node_id(fromdir, "from_node_id")
146+ todir = os.path.join(self.root_dir, "bar")
147+ self.fs.create(todir, "")
148+ self.fs.set_node_id(todir, "to_node_id")
149+ os.mkdir(fromdir)
150+ self.eq.add_to_mute_filter("FS_DIR_MOVE", fromdir, todir)
151+
152+ self.eq.inotify_add_watch(self.root_dir)
153+ self.eq.subscribe(self.DontHitMe(self))
154+
155+ # generate the event
156+ os.rename(fromdir, todir)
157+ reactor.callLater(.1, self.check_filter)
158+ return self._deferred
159+
160+ def test_file_moved_to_conflict(self):
161+ '''Test the handling of the FILE_MOVE event when dest is conflict.'''
162+ fromfile = os.path.join(self.root_dir, "foo")
163+ self.fs.create(fromfile, "")
164+ self.fs.set_node_id(fromfile, "from_node_id")
165+ tofile = os.path.join(self.root_dir, "foo.u1conflict")
166+ self.fs.create(tofile, "")
167+ self.fs.set_node_id(tofile, "to_node_id")
168+ open(fromfile, "w").close()
169+ self.eq.add_to_mute_filter("FS_FILE_MOVE", fromfile, tofile)
170+
171+ self.eq.inotify_add_watch(self.root_dir)
172+ self.eq.subscribe(self.DontHitMe(self))
173+
174+ # generate the event
175+ os.rename(fromfile, tofile)
176+ reactor.callLater(.1, self.check_filter)
177+ return self._deferred
178+
179+ def test_file_moved_from_partial(self):
180+ '''Test the handling of the FILE_MOVE event when source is partial.'''
181+ fromfile = os.path.join(self.root_dir, "mdid.u1partial.foo")
182+ root_dir = os.path.join(self.root_dir, "my_files")
183+ tofile = os.path.join(root_dir, "foo")
184+ mypath = functools.partial(os.path.join, root_dir)
185+ os.mkdir(root_dir)
186+ open(fromfile, "w").close()
187+ self.eq.add_to_mute_filter("FS_FILE_CREATE", tofile)
188+ self.eq.add_to_mute_filter("FS_FILE_CLOSE_WRITE", tofile)
189+
190+ self.eq.inotify_add_watch(root_dir)
191+ self.eq.subscribe(self.DontHitMe(self))
192+
193+ # generate the event
194+ os.rename(fromfile, tofile)
195+ reactor.callLater(.1, self.check_filter)
196+ return self._deferred
197+
198+
199 def test_suite():
200 # pylint: disable-msg=C0111
201 return unittest.TestLoader().loadTestsFromName(__name__)
202
203=== modified file 'tests/syncdaemon/test_eventqueue.py'
204--- tests/syncdaemon/test_eventqueue.py 2009-09-01 20:35:36 +0000
205+++ tests/syncdaemon/test_eventqueue.py 2009-10-15 15:40:24 +0000
206@@ -47,6 +47,7 @@
207 self.fs.set_by_path(path=self.root_dir,
208 local_hash=None, server_hash=None)
209 self.eq = event_queue.EventQueue(self.fs)
210+ self.fs.register_eq(self.eq)
211
212 def tearDown(self):
213 '''Clean up the tests.'''
214@@ -301,6 +302,71 @@
215 return d
216
217
218+class MuteFilterTests(unittest.TestCase):
219+ '''Tests the MuteFilter class.'''
220+
221+ def setUp(self):
222+ self.mf = event_queue.MuteFilter()
223+
224+ def test_empty(self):
225+ '''Nothing there.'''
226+ self.assertFalse(self.mf._cnt)
227+
228+ def test_add_one(self):
229+ '''Adds one element.'''
230+ self.mf.add("foo")
231+ self.assertEqual(self.mf._cnt, dict(foo=1))
232+
233+ def test_add_two_different(self):
234+ '''Adds two different elements.'''
235+ self.mf.add("foo")
236+ self.mf.add("bar")
237+ self.assertEqual(self.mf._cnt, dict(foo=1, bar=1))
238+
239+ def test_add_two_equal(self):
240+ '''Adds one element twice.'''
241+ self.mf.add("foo")
242+ self.mf.add("foo")
243+ self.assertEqual(self.mf._cnt, dict(foo=2))
244+
245+ def test_add_two_equal_and_third(self):
246+ '''Adds one element.'''
247+ self.mf.add("foo")
248+ self.mf.add("bar")
249+ self.mf.add("bar")
250+ self.assertEqual(self.mf._cnt, dict(foo=1, bar=2))
251+
252+ def test_pop_simple(self):
253+ '''Pops one element.'''
254+ self.mf.add("foo")
255+ self.assertFalse(self.mf.pop("bar"))
256+ self.assertEqual(self.mf._cnt, dict(foo=1))
257+ self.assertTrue(self.mf.pop("foo"))
258+ self.assertFalse(self.mf._cnt)
259+
260+ def test_pop_complex(self):
261+ '''Pops several elements.'''
262+ # add several
263+ self.mf.add("foo")
264+ self.mf.add("bar")
265+ self.mf.add("bar")
266+ self.assertEqual(self.mf._cnt, dict(foo=1, bar=2))
267+
268+ # clean bar
269+ self.assertTrue(self.mf.pop("bar"))
270+ self.assertEqual(self.mf._cnt, dict(foo=1, bar=1))
271+ self.assertTrue(self.mf.pop("bar"))
272+ self.assertEqual(self.mf._cnt, dict(foo=1))
273+ self.assertFalse(self.mf.pop("bar"))
274+ self.assertEqual(self.mf._cnt, dict(foo=1))
275+
276+ # clean foo
277+ self.assertTrue(self.mf.pop("foo"))
278+ self.assertFalse(self.mf._cnt)
279+ self.assertFalse(self.mf.pop("foo"))
280+ self.assertFalse(self.mf._cnt)
281+
282+
283 def test_suite():
284 # pylint: disable-msg=C0111
285 return unittest.TestLoader().loadTestsFromName(__name__)
286
287=== modified file 'tests/syncdaemon/test_fsm.py'
288--- tests/syncdaemon/test_fsm.py 2009-10-09 17:52:07 +0000
289+++ tests/syncdaemon/test_fsm.py 2009-10-15 15:40:24 +0000
290@@ -36,6 +36,7 @@
291 METADATA_VERSION,
292 )
293 from ubuntuone.syncdaemon.volume_manager import Share, allow_writes
294+from ubuntuone.syncdaemon.event_queue import EventQueue
295
296 TESTS_DIR = os.path.join(os.getcwd(), "tmp")
297
298@@ -62,12 +63,15 @@
299 self.partials_dir = os.path.join(TESTS_DIR, "partials")
300 self.fsm = FileSystemManager(self.fsmdir, self.partials_dir,
301 FakeVolumeManager(self.root_dir))
302+ self.eq = EventQueue(self.fsm)
303+ self.fsm.register_eq(self.eq)
304 self.share = self.create_share('share', 'share_name',
305 self.fsm, self.shares_dir)
306 self.share_path = self.share.path
307
308 def tearDown(self):
309 """ Clean up the tests. """
310+ self.eq.shutdown()
311 self.rmtree(TESTS_DIR)
312
313 @staticmethod
314@@ -2090,7 +2094,7 @@
315 os.chmod(os.path.join(dirpath, dir), 0777)
316 for file in files:
317 os.chmod(os.path.join(dirpath, file), 0666)
318- shutil.rmtree(TESTS_DIR)
319+ FSMTestCase.tearDown(self)
320
321 def test_file_ro_share_fail(self):
322 """ Test that manual creation of a file, fails on a ro-share. """
323@@ -2241,21 +2245,25 @@
324 FSMTestCase.setUp(self)
325 # create a ro share
326 self.share_ro = self.create_share('share_ro', 'share_ro_name',
327- self.fsm, self.shares_dir, access_level='View')
328+ self.fsm, self.shares_dir,
329+ access_level='View')
330 self.share_ro_path = self.share_ro.path
331
332 def test_write_in_ro_share(self):
333 """test the EnableShareWrite context manager in a ro share"""
334 path = os.path.join(self.share_ro_path, 'foo', 'a_file_in_a_ro_share')
335 data = 'yes I can write!'
336- can_write_parent = os.access(os.path.dirname(self.share_ro_path), os.W_OK)
337+ can_write_parent = os.access(os.path.dirname(self.share_ro_path),
338+ os.W_OK)
339 with EnableShareWrite(self.share_ro, path):
340 with open(path, 'w') as f:
341 f.write(data)
342 self.assertEquals(data, open(path, 'r').read())
343 self.assertFalse(os.access(self.share_ro_path, os.W_OK))
344 # check that the parent permissions are ok
345- self.assertEquals(can_write_parent, os.access(os.path.dirname(self.share_ro_path), os.W_OK))
346+ self.assertEquals(can_write_parent,
347+ os.access(os.path.dirname(self.share_ro_path),
348+ os.W_OK))
349 # fail to write directly in the share
350 self.assertRaises(IOError, open, path, 'w')
351
352@@ -2291,7 +2299,8 @@
353 os.makedirs(self.root_dir)
354 self.data_dir = os.path.join(TESTS_DIR, "data")
355 self.partials_dir = os.path.join(TESTS_DIR, "partials")
356- self.main = FakeMain(self.root_dir, self.shares_dir, self.data_dir, self.partials_dir)
357+ self.main = FakeMain(self.root_dir, self.shares_dir,
358+ self.data_dir, self.partials_dir)
359 self.fsm = self.main.fs
360 self.share = self.create_share('share', 'share_name',
361 self.fsm, self.shares_dir)
362@@ -2300,7 +2309,7 @@
363 def tearDown(self):
364 """ Clean up the tests. """
365 self.main.shutdown()
366- FSMTestCase.tearDown(self)
367+ self.rmtree(TESTS_DIR)
368
369
370 @staticmethod
371
372=== modified file 'tests/syncdaemon/test_localrescan.py'
373--- tests/syncdaemon/test_localrescan.py 2009-09-22 15:19:25 +0000
374+++ tests/syncdaemon/test_localrescan.py 2009-10-15 15:40:24 +0000
375@@ -46,9 +46,10 @@
376 '''Store stuff as pushed.'''
377 self.pushed.append((event, path))
378
379- def freeze_begin(self, *a):
380+ def _fake(self, *a):
381 '''fake'''
382- inotify_add_watch = inotify_has_watch = freeze_rollback = is_frozen = freeze_begin
383+ inotify_add_watch = inotify_has_watch = freeze_rollback = is_frozen = _fake
384+ freeze_begin = add_to_mute_filter = _fake
385
386 def freeze_commit(self, events):
387 '''just store events'''
388@@ -88,6 +89,7 @@
389 self.vm)
390 self.fsm.create(usrdir, "")
391 self.eq = FakeEQ()
392+ self.fsm.register_eq(self.eq)
393 self.aq = FakeAQ()
394
395 def tearDown(self):
396
397=== modified file 'tests/syncdaemon/test_sync.py'
398--- tests/syncdaemon/test_sync.py 2009-09-01 20:35:36 +0000
399+++ tests/syncdaemon/test_sync.py 2009-10-15 15:40:24 +0000
400@@ -38,6 +38,7 @@
401 from ubuntuone.syncdaemon.main import Main
402 from ubuntuone.syncdaemon.sync import FSKey
403 from ubuntuone.syncdaemon.volume_manager import Share
404+from ubuntuone.syncdaemon.event_queue import EventQueue
405
406 DBusInterface.test = True
407
408@@ -57,12 +58,15 @@
409 os.makedirs(self.partials_dir)
410 self.fsm = FileSystemManager(self.fsmdir, self.partials_dir,
411 FakeVolumeManager(self.root_dir))
412+ self.eq = EventQueue(self.fsm)
413+ self.fsm.register_eq(self.eq)
414 self.share = self.create_share('share', 'share_name',
415 self.fsm, self.shares_dir)
416 self.share_path = self.share.path
417
418 def tearDown(self):
419 """ Clean up the test dir"""
420+ self.eq.shutdown()
421 shutil.rmtree(self.test_dir)
422
423 @staticmethod
424
425=== modified file 'ubuntuone/syncdaemon/event_queue.py'
426--- ubuntuone/syncdaemon/event_queue.py 2009-09-30 18:02:18 +0000
427+++ ubuntuone/syncdaemon/event_queue.py 2009-10-15 15:40:24 +0000
428@@ -149,6 +149,35 @@
429
430 DEFAULT_HANDLER = "handle_default" # receives (event_name, *args, **kwargs)
431
432+
433+class MuteFilter(object):
434+ '''Stores what needs to be muted.'''
435+ def __init__(self):
436+ self._cnt = {}
437+ self.log = logging.getLogger('ubuntuone.SyncDaemon.MuteFilter')
438+
439+ def add(self, element):
440+ '''Adds and element to the filter.'''
441+ self.log.debug("Adding: %s", element)
442+ self._cnt[element] = self._cnt.get(element, 0) + 1
443+
444+ def pop(self, element):
445+ '''Pops an element from the filter, if there, and returns if it was.'''
446+ if element not in self._cnt:
447+ return False
448+
449+ self._cnt[element] = self._cnt.get(element, 0) - 1
450+ if not self._cnt[element]:
451+ # reached zero
452+ del self._cnt[element]
453+
454+ # log what happened and how many items we have left
455+ q = sum(self._cnt.itervalues())
456+ self.log.debug("Blocking %s (%d left)", element, q)
457+
458+ return True
459+
460+
461 class _INotifyProcessor(pyinotify.ProcessEvent):
462 '''Helper class that is called from inpotify when an event happens.
463
464@@ -162,6 +191,24 @@
465 self.timer = None
466 self.frozen_path = None
467 self.frozen_evts = False
468+ self._to_mute = MuteFilter()
469+
470+ def add_to_mute_filter(self, event, *paths):
471+ '''Add an event and path(s) to the mute filter.'''
472+ # all events have one path except the MOVEs
473+ if event in ("FS_FILE_MOVE", "FS_DIR_MOVE"):
474+ f_path, t_path = paths
475+ is_from_forreal = not self.is_ignored(f_path)
476+ is_to_forreal = not self.is_ignored(t_path)
477+ if is_from_forreal and is_to_forreal:
478+ self._to_mute.add((event, f_path, t_path))
479+ elif is_to_forreal:
480+ self._to_mute.add(('FS_FILE_CREATE', t_path))
481+ self._to_mute.add(('FS_FILE_CLOSE_WRITE', t_path))
482+ else:
483+ path = paths[0]
484+ if not self.is_ignored(path):
485+ self._to_mute.add((event, path))
486
487 def on_timeout(self):
488 '''Called on timeout.'''
489@@ -174,7 +221,7 @@
490 try:
491 self.timer.cancel()
492 except error.AlreadyCalled:
493- # self.timeout() was *just* called, do noting here
494+ # self.timeout() was *just* called, do nothing here
495 return
496 self.push_event(self.held_event)
497 self.held_event = None
498@@ -234,7 +281,7 @@
499 try:
500 self.timer.cancel()
501 except error.AlreadyCalled:
502- # self.timeout() was *just* called, do noting here
503+ # self.timeout() was *just* called, do nothing here
504 pass
505 else:
506 f_path = os.path.join(self.held_event.path,
507@@ -255,17 +302,17 @@
508 evtname = "FS_FILE_"
509 if f_share_id != t_share_id:
510 # if the share_id are != push a delete/create
511- self.eq.push(evtname+"DELETE", f_path)
512- self.eq.push(evtname+"CREATE", t_path)
513+ self.eq_push(evtname+"DELETE", f_path)
514+ self.eq_push(evtname+"CREATE", t_path)
515 if not this_is_a_dir:
516- self.eq.push('FS_FILE_CLOSE_WRITE', t_path)
517+ self.eq_push('FS_FILE_CLOSE_WRITE', t_path)
518 else:
519- self.eq.push(evtname+"MOVE", f_path, t_path)
520+ self.eq_push(evtname+"MOVE", f_path, t_path)
521 elif is_to_forreal:
522 # this is the case of a MOVE from something ignored
523 # to a valid filename
524- self.eq.push('FS_FILE_CREATE', t_path)
525- self.eq.push('FS_FILE_CLOSE_WRITE', t_path)
526+ self.eq_push('FS_FILE_CREATE', t_path)
527+ self.eq_push('FS_FILE_CLOSE_WRITE', t_path)
528
529 self.held_event = None
530 return
531@@ -279,7 +326,12 @@
532 self.push_event(event)
533 t_path = os.path.join(event.path, event.name)
534 if not os.path.isdir(t_path):
535- self.eq.push('FS_FILE_CLOSE_WRITE', t_path)
536+ self.eq_push('FS_FILE_CLOSE_WRITE', t_path)
537+
538+ def eq_push(self, *event_data):
539+ '''Sends to EQ the event data, maybe filtering it.'''
540+ if not self._to_mute.pop(event_data):
541+ self.eq.push(*event_data)
542
543 def process_default(self, event):
544 '''Push the event into the EventQueue.'''
545@@ -313,7 +365,7 @@
546 if not self.is_ignored(fullpath):
547 if evt_name == 'FS_DIR_DELETE':
548 self.handle_dir_delete(fullpath)
549- self.eq.push(evt_name, fullpath)
550+ self.eq_push(evt_name, fullpath)
551
552 def freeze_begin(self, path):
553 """Puts in hold all the events for this path."""
554@@ -346,7 +398,7 @@
555 # push the received events
556 for evt_name, path in events:
557 if not self.is_ignored(path):
558- self.eq.push(evt_name, path)
559+ self.eq_push(evt_name, path)
560
561 self.frozen_path = None
562 self.frozen_evts = False
563@@ -360,9 +412,9 @@
564 if path == fullpath:
565 continue
566 if is_dir:
567- self.eq.push('FS_DIR_DELETE', path)
568+ self.eq_push('FS_DIR_DELETE', path)
569 else:
570- self.eq.push('FS_FILE_DELETE', path)
571+ self.eq_push('FS_FILE_DELETE', path)
572
573
574 class EventQueue(object):
575@@ -384,6 +436,10 @@
576 self.dispatch_queue = Queue()
577 self.empty_event_queue_callbacks = set()
578
579+ def add_to_mute_filter(self, *info):
580+ '''Adds info to mute filter in the processor.'''
581+ self._processor.add_to_mute_filter(*info)
582+
583 def add_empty_event_queue_callback(self, callback):
584 """add a callback for when the even queue has no more events."""
585 self.empty_event_queue_callbacks.add(callback)
586
587=== modified file 'ubuntuone/syncdaemon/filesystem_manager.py'
588--- ubuntuone/syncdaemon/filesystem_manager.py 2009-10-13 16:03:54 +0000
589+++ ubuntuone/syncdaemon/filesystem_manager.py 2009-10-15 15:40:24 +0000
590@@ -221,6 +221,7 @@
591 cache_compact_threshold=4)
592 self.shares = {}
593 self.vm = vm
594+ self.eq = None # this will be registered later
595
596 # create the indexes
597 self._idx_path = {}
598@@ -244,9 +245,14 @@
599 logger("initialized: idx_path: %s, idx_node_id: %s, shares: %s" %
600 (len(self._idx_path), len(self._idx_node_id), len(self.shares)))
601
602+ def register_eq(self, eq):
603+ '''Registers an EventQueue here.'''
604+ self.eq = eq
605+
606 def _safe_fs_iteritems(self):
607- """returns a 'safe' iterator over the items of the FileShelf.
608- it's 'safe' because broked metadata objects are deleted and not
609+ """Returns a 'safe' iterator over the items of the FileShelf.
610+
611+ It's 'safe' because broken metadata objects are deleted and not
612 returned.
613 """
614 def safeget_mdobj(mdid):
615@@ -566,6 +572,11 @@
616 # pylint: disable-msg=W0704
617 try:
618 with contextlib.nested(from_context, to_context):
619+ if mdobj["is_dir"]:
620+ expected_event = "FS_DIR_MOVE"
621+ else:
622+ expected_event = "FS_FILE_MOVE"
623+ self.eq.add_to_mute_filter(expected_event, path_from, path_to)
624 shutil.move(path_from, path_to)
625 except IOError, e:
626 # file was not yet created
627@@ -652,8 +663,10 @@
628 try:
629 with self._enable_share_write(mdobj['share_id'], path):
630 if self.is_dir(path=path):
631+ self.eq.add_to_mute_filter("FS_DIR_DELETE", path)
632 os.rmdir(path)
633 else:
634+ self.eq.add_to_mute_filter("FS_FILE_DELETE", path)
635 os.remove(path)
636 except OSError, e:
637 if e.errno == errno.ENOTEMPTY:
638@@ -674,6 +687,11 @@
639 to_path = base_to_path + "." + str(ind)
640 with self._enable_share_write(mdobj['share_id'], path):
641 try:
642+ if mdobj["is_dir"]:
643+ expected_event = "FS_DIR_MOVE"
644+ else:
645+ expected_event = "FS_FILE_MOVE"
646+ self.eq.add_to_mute_filter(expected_event, path, to_path)
647 os.rename(path, to_path)
648 except OSError, e:
649 if e.errno == errno.ENOENT:
650@@ -739,7 +757,10 @@
651 with self._enable_share_write(share_id, os.path.dirname(path)):
652 # if we don't have the dir yet, create it
653 if is_dir and not os.path.exists(path):
654+ self.eq.add_to_mute_filter("FS_DIR_CREATE", path)
655 os.mkdir(path)
656+
657+ # don't alert EQ, partials are in other directory, not watched
658 open(partial_path, "w").close()
659 mdobj["info"]["last_partial_created"] = time.time()
660 mdobj["info"]["is_partial"] = True
661@@ -786,6 +807,8 @@
662
663 partial_path = self._get_partial_path(mdobj)
664 with self._enable_share_write(share_id, path):
665+ self.eq.add_to_mute_filter("FS_FILE_CREATE", path)
666+ self.eq.add_to_mute_filter("FS_FILE_CLOSE_WRITE", path)
667 shutil.move(partial_path, path)
668 mdobj["local_hash"] = local_hash
669 mdobj["info"]["last_downloaded"] = time.time()
670@@ -805,6 +828,7 @@
671 partial_path = self._get_partial_path(mdobj)
672 #pylint: disable-msg=W0704
673 try:
674+ # don't alert EQ, partials are in other directory, not watched
675 os.remove(partial_path)
676 except OSError, e:
677 # we only remove it if its there.
678
679=== modified file 'ubuntuone/syncdaemon/main.py'
680--- ubuntuone/syncdaemon/main.py 2009-10-09 08:19:05 +0000
681+++ ubuntuone/syncdaemon/main.py 2009-10-15 15:40:24 +0000
682@@ -109,6 +109,7 @@
683 self.partials_dir,
684 self.vm)
685 self.event_q = event_queue.EventQueue(self.fs)
686+ self.fs.register_eq(self.event_q)
687 self.oauth_client = OAuthClient(self.realm)
688 self.state = SyncDaemonStateManager(self, handshake_timeout,
689 max_handshake_timeouts)

Subscribers

People subscribed via source and target branches