Merge lp:~facundo/ubuntuone-client/non-bouncing-filesystem into lp:ubuntuone-client

Proposed by Facundo Batista
Status: Merged
Approved by: Guillermo Gonzalez
Approved revision: 256
Merged at revision: not available
Proposed branch: lp:~facundo/ubuntuone-client/non-bouncing-filesystem
Merge into: lp:ubuntuone-client
Diff against target: 689 lines
9 files modified
contrib/testing/testcase.py (+1/-0)
tests/syncdaemon/test_eq_inotify.py (+179/-0)
tests/syncdaemon/test_eventqueue.py (+66/-0)
tests/syncdaemon/test_fsm.py (+15/-6)
tests/syncdaemon/test_localrescan.py (+4/-2)
tests/syncdaemon/test_sync.py (+4/-0)
ubuntuone/syncdaemon/event_queue.py (+69/-13)
ubuntuone/syncdaemon/filesystem_manager.py (+26/-2)
ubuntuone/syncdaemon/main.py (+1/-0)
To merge this branch: bzr merge lp:~facundo/ubuntuone-client/non-bouncing-filesystem
Reviewer Review Type Date Requested Status
Guillermo Gonzalez Approve
John Lenton (community) Approve
Ubuntu One hackers Pending
Review via email: mp+13419@code.launchpad.net

Commit message

Now we avoid the filesystem event bouncing.

To post a comment you must log in.
Revision history for this message
Facundo Batista (facundo) wrote :

Now we avoid the filesystem event bouncing.

Before, when SD (the SyncDaemon) removed a file (for example), the corresponding FS_FILE_DELETE event arrived afterwards from the filesystem. Because a filesystem event is tied only to a "path", if a new file was created at that same path *before* the event arrived, the event would incorrectly be applied to the new node.

So, we need to avoid these bouncing events. To do this I put a MuteFilter in EQ (the EventQueue): the events we expect are added to it, and it filters them out when they arrive. As the one that touches the disk is FSM (the FileSystemManager), this is pretty clean.

Regarding tests, I added some to test MuteFilter itself, and others to test that EQ filters the events. Because FSM now holds a reference to EQ, I also had to change some other tests.

Revision history for this message
John Lenton (chipaca) wrote :

I love it. Lurve, lurve, luv it.

review: Approve
Revision history for this message
Guillermo Gonzalez (verterok) wrote :

I *really* like this

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'contrib/testing/testcase.py'
--- contrib/testing/testcase.py 2009-10-12 16:00:16 +0000
+++ contrib/testing/testcase.py 2009-10-15 15:40:24 +0000
@@ -102,6 +102,7 @@
102 self.fs = fs_manager.FileSystemManager(self.data_dir,102 self.fs = fs_manager.FileSystemManager(self.data_dir,
103 self.partials_dir, self.vm)103 self.partials_dir, self.vm)
104 self.event_q = event_queue.EventQueue(self.fs)104 self.event_q = event_queue.EventQueue(self.fs)
105 self.fs.register_eq(self.event_q)
105 self.action_q = FakeActionQueue(self.event_q)106 self.action_q = FakeActionQueue(self.event_q)
106 self.state = main.SyncDaemonStateManager(self, 2, 0)107 self.state = main.SyncDaemonStateManager(self, 2, 0)
107 self.event_q.subscribe(self.vm)108 self.event_q.subscribe(self.vm)
108109
=== modified file 'tests/syncdaemon/test_eq_inotify.py'
--- tests/syncdaemon/test_eq_inotify.py 2009-09-30 18:02:18 +0000
+++ tests/syncdaemon/test_eq_inotify.py 2009-10-15 15:40:24 +0000
@@ -885,6 +885,185 @@
885 return self._deferred885 return self._deferred
886886
887887
888class MutedSignalsTests(BaseTwisted):
889 '''Test that EQ filter some signals on demand.'''
890
891 class DontHitMe(object):
892 '''we shouldn't be called'''
893 # class-closure, cannot use self, pylint: disable-msg=E0213
894 def __init__(innerself, obj):
895 innerself.obj = obj
896 def handle_default(innerself, *a):
897 '''Something here? Error!'''
898 innerself.obj.finished_error("don't hit me! received %s" % (a,))
899
900 def check_filter(self, _=None):
901 self.assertFalse(self.eq._processor._to_mute._cnt)
902 self.finished_ok()
903
904 def test_file_open(self):
905 '''Test receiving the open signal on files.'''
906 testfile = os.path.join(self.root_dir, "foo")
907 open(testfile, "w").close()
908 self.eq.add_to_mute_filter("FS_FILE_OPEN", testfile)
909 self.eq.add_to_mute_filter("FS_FILE_CLOSE_NOWRITE", testfile)
910
911 self.eq.inotify_add_watch(self.root_dir)
912 self.eq.subscribe(self.DontHitMe(self))
913
914 # generate the event
915 open(testfile)
916 reactor.callLater(.1, self.check_filter)
917 return self._deferred
918
919 def test_file_close_nowrite(self):
920 '''Test receiving the close_nowrite signal on files.'''
921 testfile = os.path.join(self.root_dir, "foo")
922 open(testfile, "w").close()
923 fh = open(testfile)
924 self.eq.add_to_mute_filter("FS_FILE_CLOSE_NOWRITE", testfile)
925
926 self.eq.inotify_add_watch(self.root_dir)
927 self.eq.subscribe(self.DontHitMe(self))
928
929 # generate the event
930 fh.close()
931 reactor.callLater(.1, self.check_filter)
932 return self._deferred
933
934 def test_file_create_close_write(self):
935 '''Test receiving the create and close_write signals on files.'''
936 testfile = os.path.join(self.root_dir, "foo")
937 self.eq.add_to_mute_filter("FS_FILE_CREATE", testfile)
938 self.eq.add_to_mute_filter("FS_FILE_OPEN", testfile)
939 self.eq.add_to_mute_filter("FS_FILE_CLOSE_WRITE", testfile)
940
941 self.eq.inotify_add_watch(self.root_dir)
942 self.eq.subscribe(self.DontHitMe(self))
943
944 # generate the event
945 open(testfile, "w").close()
946 reactor.callLater(.1, self.check_filter)
947 return self._deferred
948
949 def test_dir_create(self):
950 '''Test receiving the create signal on dirs.'''
951 testdir = os.path.join(self.root_dir, "foo")
952 self.eq.add_to_mute_filter("FS_DIR_CREATE", testdir)
953
954 self.eq.inotify_add_watch(self.root_dir)
955 self.eq.subscribe(self.DontHitMe(self))
956
957 # generate the event
958 os.mkdir(testdir)
959 reactor.callLater(.1, self.check_filter)
960 return self._deferred
961
962 def test_file_delete(self):
963 '''Test the delete signal on a file.'''
964 testfile = os.path.join(self.root_dir, "foo")
965 open(testfile, "w").close()
966 self.eq.add_to_mute_filter("FS_FILE_DELETE", testfile)
967
968 self.eq.inotify_add_watch(self.root_dir)
969 self.eq.subscribe(self.DontHitMe(self))
970
971 # generate the event
972 os.remove(testfile)
973 reactor.callLater(.1, self.check_filter)
974 return self._deferred
975
976 def test_dir_delete(self):
977 '''Test the delete signal on a dir.'''
978 testdir = os.path.join(self.root_dir, "foo")
979 os.mkdir(testdir)
980 self.eq.add_to_mute_filter("FS_DIR_DELETE", testdir)
981
982 self.eq.inotify_add_watch(self.root_dir)
983 self.eq.subscribe(self.DontHitMe(self))
984
985 # generate the event
986 os.rmdir(testdir)
987 reactor.callLater(.1, self.check_filter)
988 return self._deferred
989
990 def test_file_moved_inside(self):
991 '''Test the synthesis of the FILE_MOVE event.'''
992 fromfile = os.path.join(self.root_dir, "foo")
993 self.fs.create(fromfile, "")
994 self.fs.set_node_id(fromfile, "from_node_id")
995 tofile = os.path.join(self.root_dir, "bar")
996 self.fs.create(tofile, "")
997 self.fs.set_node_id(tofile, "to_node_id")
998 open(fromfile, "w").close()
999 self.eq.add_to_mute_filter("FS_FILE_MOVE", fromfile, tofile)
1000
1001 self.eq.inotify_add_watch(self.root_dir)
1002 self.eq.subscribe(self.DontHitMe(self))
1003
1004 # generate the event
1005 os.rename(fromfile, tofile)
1006 reactor.callLater(.1, self.check_filter)
1007 return self._deferred
1008
1009 def test_dir_moved_inside(self):
1010 '''Test the synthesis of the DIR_MOVE event.'''
1011 fromdir = os.path.join(self.root_dir, "foo")
1012 self.fs.create(fromdir, "")
1013 self.fs.set_node_id(fromdir, "from_node_id")
1014 todir = os.path.join(self.root_dir, "bar")
1015 self.fs.create(todir, "")
1016 self.fs.set_node_id(todir, "to_node_id")
1017 os.mkdir(fromdir)
1018 self.eq.add_to_mute_filter("FS_DIR_MOVE", fromdir, todir)
1019
1020 self.eq.inotify_add_watch(self.root_dir)
1021 self.eq.subscribe(self.DontHitMe(self))
1022
1023 # generate the event
1024 os.rename(fromdir, todir)
1025 reactor.callLater(.1, self.check_filter)
1026 return self._deferred
1027
1028 def test_file_moved_to_conflict(self):
1029 '''Test the handling of the FILE_MOVE event when dest is conflict.'''
1030 fromfile = os.path.join(self.root_dir, "foo")
1031 self.fs.create(fromfile, "")
1032 self.fs.set_node_id(fromfile, "from_node_id")
1033 tofile = os.path.join(self.root_dir, "foo.u1conflict")
1034 self.fs.create(tofile, "")
1035 self.fs.set_node_id(tofile, "to_node_id")
1036 open(fromfile, "w").close()
1037 self.eq.add_to_mute_filter("FS_FILE_MOVE", fromfile, tofile)
1038
1039 self.eq.inotify_add_watch(self.root_dir)
1040 self.eq.subscribe(self.DontHitMe(self))
1041
1042 # generate the event
1043 os.rename(fromfile, tofile)
1044 reactor.callLater(.1, self.check_filter)
1045 return self._deferred
1046
1047 def test_file_moved_from_partial(self):
1048 '''Test the handling of the FILE_MOVE event when source is partial.'''
1049 fromfile = os.path.join(self.root_dir, "mdid.u1partial.foo")
1050 root_dir = os.path.join(self.root_dir, "my_files")
1051 tofile = os.path.join(root_dir, "foo")
1052 mypath = functools.partial(os.path.join, root_dir)
1053 os.mkdir(root_dir)
1054 open(fromfile, "w").close()
1055 self.eq.add_to_mute_filter("FS_FILE_CREATE", tofile)
1056 self.eq.add_to_mute_filter("FS_FILE_CLOSE_WRITE", tofile)
1057
1058 self.eq.inotify_add_watch(root_dir)
1059 self.eq.subscribe(self.DontHitMe(self))
1060
1061 # generate the event
1062 os.rename(fromfile, tofile)
1063 reactor.callLater(.1, self.check_filter)
1064 return self._deferred
1065
1066
888def test_suite():1067def test_suite():
889 # pylint: disable-msg=C01111068 # pylint: disable-msg=C0111
890 return unittest.TestLoader().loadTestsFromName(__name__)1069 return unittest.TestLoader().loadTestsFromName(__name__)
8911070
=== modified file 'tests/syncdaemon/test_eventqueue.py'
--- tests/syncdaemon/test_eventqueue.py 2009-09-01 20:35:36 +0000
+++ tests/syncdaemon/test_eventqueue.py 2009-10-15 15:40:24 +0000
@@ -47,6 +47,7 @@
47 self.fs.set_by_path(path=self.root_dir,47 self.fs.set_by_path(path=self.root_dir,
48 local_hash=None, server_hash=None)48 local_hash=None, server_hash=None)
49 self.eq = event_queue.EventQueue(self.fs)49 self.eq = event_queue.EventQueue(self.fs)
50 self.fs.register_eq(self.eq)
5051
51 def tearDown(self):52 def tearDown(self):
52 '''Clean up the tests.'''53 '''Clean up the tests.'''
@@ -301,6 +302,71 @@
301 return d302 return d
302303
303304
305class MuteFilterTests(unittest.TestCase):
306 '''Tests the MuteFilter class.'''
307
308 def setUp(self):
309 self.mf = event_queue.MuteFilter()
310
311 def test_empty(self):
312 '''Nothing there.'''
313 self.assertFalse(self.mf._cnt)
314
315 def test_add_one(self):
316 '''Adds one element.'''
317 self.mf.add("foo")
318 self.assertEqual(self.mf._cnt, dict(foo=1))
319
320 def test_add_two_different(self):
321 '''Adds two different elements.'''
322 self.mf.add("foo")
323 self.mf.add("bar")
324 self.assertEqual(self.mf._cnt, dict(foo=1, bar=1))
325
326 def test_add_two_equal(self):
327 '''Adds one element twice.'''
328 self.mf.add("foo")
329 self.mf.add("foo")
330 self.assertEqual(self.mf._cnt, dict(foo=2))
331
332 def test_add_two_equal_and_third(self):
333 '''Adds one element.'''
334 self.mf.add("foo")
335 self.mf.add("bar")
336 self.mf.add("bar")
337 self.assertEqual(self.mf._cnt, dict(foo=1, bar=2))
338
339 def test_pop_simple(self):
340 '''Pops one element.'''
341 self.mf.add("foo")
342 self.assertFalse(self.mf.pop("bar"))
343 self.assertEqual(self.mf._cnt, dict(foo=1))
344 self.assertTrue(self.mf.pop("foo"))
345 self.assertFalse(self.mf._cnt)
346
347 def test_pop_complex(self):
348 '''Pops several elements.'''
349 # add several
350 self.mf.add("foo")
351 self.mf.add("bar")
352 self.mf.add("bar")
353 self.assertEqual(self.mf._cnt, dict(foo=1, bar=2))
354
355 # clean bar
356 self.assertTrue(self.mf.pop("bar"))
357 self.assertEqual(self.mf._cnt, dict(foo=1, bar=1))
358 self.assertTrue(self.mf.pop("bar"))
359 self.assertEqual(self.mf._cnt, dict(foo=1))
360 self.assertFalse(self.mf.pop("bar"))
361 self.assertEqual(self.mf._cnt, dict(foo=1))
362
363 # clean foo
364 self.assertTrue(self.mf.pop("foo"))
365 self.assertFalse(self.mf._cnt)
366 self.assertFalse(self.mf.pop("foo"))
367 self.assertFalse(self.mf._cnt)
368
369
304def test_suite():370def test_suite():
305 # pylint: disable-msg=C0111371 # pylint: disable-msg=C0111
306 return unittest.TestLoader().loadTestsFromName(__name__)372 return unittest.TestLoader().loadTestsFromName(__name__)
307373
=== modified file 'tests/syncdaemon/test_fsm.py'
--- tests/syncdaemon/test_fsm.py 2009-10-09 17:52:07 +0000
+++ tests/syncdaemon/test_fsm.py 2009-10-15 15:40:24 +0000
@@ -36,6 +36,7 @@
36 METADATA_VERSION,36 METADATA_VERSION,
37)37)
38from ubuntuone.syncdaemon.volume_manager import Share, allow_writes38from ubuntuone.syncdaemon.volume_manager import Share, allow_writes
39from ubuntuone.syncdaemon.event_queue import EventQueue
3940
40TESTS_DIR = os.path.join(os.getcwd(), "tmp")41TESTS_DIR = os.path.join(os.getcwd(), "tmp")
4142
@@ -62,12 +63,15 @@
62 self.partials_dir = os.path.join(TESTS_DIR, "partials")63 self.partials_dir = os.path.join(TESTS_DIR, "partials")
63 self.fsm = FileSystemManager(self.fsmdir, self.partials_dir,64 self.fsm = FileSystemManager(self.fsmdir, self.partials_dir,
64 FakeVolumeManager(self.root_dir))65 FakeVolumeManager(self.root_dir))
66 self.eq = EventQueue(self.fsm)
67 self.fsm.register_eq(self.eq)
65 self.share = self.create_share('share', 'share_name',68 self.share = self.create_share('share', 'share_name',
66 self.fsm, self.shares_dir)69 self.fsm, self.shares_dir)
67 self.share_path = self.share.path70 self.share_path = self.share.path
6871
69 def tearDown(self):72 def tearDown(self):
70 """ Clean up the tests. """73 """ Clean up the tests. """
74 self.eq.shutdown()
71 self.rmtree(TESTS_DIR)75 self.rmtree(TESTS_DIR)
7276
73 @staticmethod77 @staticmethod
@@ -2090,7 +2094,7 @@
2090 os.chmod(os.path.join(dirpath, dir), 0777)2094 os.chmod(os.path.join(dirpath, dir), 0777)
2091 for file in files:2095 for file in files:
2092 os.chmod(os.path.join(dirpath, file), 0666)2096 os.chmod(os.path.join(dirpath, file), 0666)
2093 shutil.rmtree(TESTS_DIR)2097 FSMTestCase.tearDown(self)
20942098
2095 def test_file_ro_share_fail(self):2099 def test_file_ro_share_fail(self):
2096 """ Test that manual creation of a file, fails on a ro-share. """2100 """ Test that manual creation of a file, fails on a ro-share. """
@@ -2241,21 +2245,25 @@
2241 FSMTestCase.setUp(self)2245 FSMTestCase.setUp(self)
2242 # create a ro share2246 # create a ro share
2243 self.share_ro = self.create_share('share_ro', 'share_ro_name',2247 self.share_ro = self.create_share('share_ro', 'share_ro_name',
2244 self.fsm, self.shares_dir, access_level='View')2248 self.fsm, self.shares_dir,
2249 access_level='View')
2245 self.share_ro_path = self.share_ro.path2250 self.share_ro_path = self.share_ro.path
22462251
2247 def test_write_in_ro_share(self):2252 def test_write_in_ro_share(self):
2248 """test the EnableShareWrite context manager in a ro share"""2253 """test the EnableShareWrite context manager in a ro share"""
2249 path = os.path.join(self.share_ro_path, 'foo', 'a_file_in_a_ro_share')2254 path = os.path.join(self.share_ro_path, 'foo', 'a_file_in_a_ro_share')
2250 data = 'yes I can write!'2255 data = 'yes I can write!'
2251 can_write_parent = os.access(os.path.dirname(self.share_ro_path), os.W_OK)2256 can_write_parent = os.access(os.path.dirname(self.share_ro_path),
2257 os.W_OK)
2252 with EnableShareWrite(self.share_ro, path):2258 with EnableShareWrite(self.share_ro, path):
2253 with open(path, 'w') as f:2259 with open(path, 'w') as f:
2254 f.write(data)2260 f.write(data)
2255 self.assertEquals(data, open(path, 'r').read())2261 self.assertEquals(data, open(path, 'r').read())
2256 self.assertFalse(os.access(self.share_ro_path, os.W_OK))2262 self.assertFalse(os.access(self.share_ro_path, os.W_OK))
2257 # check that the parent permissions are ok2263 # check that the parent permissions are ok
2258 self.assertEquals(can_write_parent, os.access(os.path.dirname(self.share_ro_path), os.W_OK))2264 self.assertEquals(can_write_parent,
2265 os.access(os.path.dirname(self.share_ro_path),
2266 os.W_OK))
2259 # fail to write directly in the share2267 # fail to write directly in the share
2260 self.assertRaises(IOError, open, path, 'w')2268 self.assertRaises(IOError, open, path, 'w')
22612269
@@ -2291,7 +2299,8 @@
2291 os.makedirs(self.root_dir)2299 os.makedirs(self.root_dir)
2292 self.data_dir = os.path.join(TESTS_DIR, "data")2300 self.data_dir = os.path.join(TESTS_DIR, "data")
2293 self.partials_dir = os.path.join(TESTS_DIR, "partials")2301 self.partials_dir = os.path.join(TESTS_DIR, "partials")
2294 self.main = FakeMain(self.root_dir, self.shares_dir, self.data_dir, self.partials_dir)2302 self.main = FakeMain(self.root_dir, self.shares_dir,
2303 self.data_dir, self.partials_dir)
2295 self.fsm = self.main.fs2304 self.fsm = self.main.fs
2296 self.share = self.create_share('share', 'share_name',2305 self.share = self.create_share('share', 'share_name',
2297 self.fsm, self.shares_dir)2306 self.fsm, self.shares_dir)
@@ -2300,7 +2309,7 @@
2300 def tearDown(self):2309 def tearDown(self):
2301 """ Clean up the tests. """2310 """ Clean up the tests. """
2302 self.main.shutdown()2311 self.main.shutdown()
2303 FSMTestCase.tearDown(self)2312 self.rmtree(TESTS_DIR)
23042313
23052314
2306 @staticmethod2315 @staticmethod
23072316
=== modified file 'tests/syncdaemon/test_localrescan.py'
--- tests/syncdaemon/test_localrescan.py 2009-09-22 15:19:25 +0000
+++ tests/syncdaemon/test_localrescan.py 2009-10-15 15:40:24 +0000
@@ -46,9 +46,10 @@
46 '''Store stuff as pushed.'''46 '''Store stuff as pushed.'''
47 self.pushed.append((event, path))47 self.pushed.append((event, path))
4848
49 def freeze_begin(self, *a):49 def _fake(self, *a):
50 '''fake'''50 '''fake'''
51 inotify_add_watch = inotify_has_watch = freeze_rollback = is_frozen = freeze_begin51 inotify_add_watch = inotify_has_watch = freeze_rollback = is_frozen = _fake
52 freeze_begin = add_to_mute_filter = _fake
5253
53 def freeze_commit(self, events):54 def freeze_commit(self, events):
54 '''just store events'''55 '''just store events'''
@@ -88,6 +89,7 @@
88 self.vm)89 self.vm)
89 self.fsm.create(usrdir, "")90 self.fsm.create(usrdir, "")
90 self.eq = FakeEQ()91 self.eq = FakeEQ()
92 self.fsm.register_eq(self.eq)
91 self.aq = FakeAQ()93 self.aq = FakeAQ()
9294
93 def tearDown(self):95 def tearDown(self):
9496
=== modified file 'tests/syncdaemon/test_sync.py'
--- tests/syncdaemon/test_sync.py 2009-09-01 20:35:36 +0000
+++ tests/syncdaemon/test_sync.py 2009-10-15 15:40:24 +0000
@@ -38,6 +38,7 @@
38from ubuntuone.syncdaemon.main import Main38from ubuntuone.syncdaemon.main import Main
39from ubuntuone.syncdaemon.sync import FSKey39from ubuntuone.syncdaemon.sync import FSKey
40from ubuntuone.syncdaemon.volume_manager import Share40from ubuntuone.syncdaemon.volume_manager import Share
41from ubuntuone.syncdaemon.event_queue import EventQueue
4142
42DBusInterface.test = True43DBusInterface.test = True
4344
@@ -57,12 +58,15 @@
57 os.makedirs(self.partials_dir)58 os.makedirs(self.partials_dir)
58 self.fsm = FileSystemManager(self.fsmdir, self.partials_dir,59 self.fsm = FileSystemManager(self.fsmdir, self.partials_dir,
59 FakeVolumeManager(self.root_dir))60 FakeVolumeManager(self.root_dir))
61 self.eq = EventQueue(self.fsm)
62 self.fsm.register_eq(self.eq)
60 self.share = self.create_share('share', 'share_name',63 self.share = self.create_share('share', 'share_name',
61 self.fsm, self.shares_dir)64 self.fsm, self.shares_dir)
62 self.share_path = self.share.path65 self.share_path = self.share.path
6366
64 def tearDown(self):67 def tearDown(self):
65 """ Clean up the test dir"""68 """ Clean up the test dir"""
69 self.eq.shutdown()
66 shutil.rmtree(self.test_dir)70 shutil.rmtree(self.test_dir)
6771
68 @staticmethod72 @staticmethod
6973
=== modified file 'ubuntuone/syncdaemon/event_queue.py'
--- ubuntuone/syncdaemon/event_queue.py 2009-09-30 18:02:18 +0000
+++ ubuntuone/syncdaemon/event_queue.py 2009-10-15 15:40:24 +0000
@@ -149,6 +149,35 @@
149149
150DEFAULT_HANDLER = "handle_default" # receives (event_name, *args, **kwargs)150DEFAULT_HANDLER = "handle_default" # receives (event_name, *args, **kwargs)
151151
152
153class MuteFilter(object):
154 '''Stores what needs to be muted.'''
155 def __init__(self):
156 self._cnt = {}
157 self.log = logging.getLogger('ubuntuone.SyncDaemon.MuteFilter')
158
159 def add(self, element):
160 '''Adds and element to the filter.'''
161 self.log.debug("Adding: %s", element)
162 self._cnt[element] = self._cnt.get(element, 0) + 1
163
164 def pop(self, element):
165 '''Pops an element from the filter, if there, and returns if it was.'''
166 if element not in self._cnt:
167 return False
168
169 self._cnt[element] = self._cnt.get(element, 0) - 1
170 if not self._cnt[element]:
171 # reached zero
172 del self._cnt[element]
173
174 # log what happened and how many items we have left
175 q = sum(self._cnt.itervalues())
176 self.log.debug("Blocking %s (%d left)", element, q)
177
178 return True
179
180
152class _INotifyProcessor(pyinotify.ProcessEvent):181class _INotifyProcessor(pyinotify.ProcessEvent):
153 '''Helper class that is called from inpotify when an event happens.182 '''Helper class that is called from inpotify when an event happens.
154183
@@ -162,6 +191,24 @@
162 self.timer = None191 self.timer = None
163 self.frozen_path = None192 self.frozen_path = None
164 self.frozen_evts = False193 self.frozen_evts = False
194 self._to_mute = MuteFilter()
195
196 def add_to_mute_filter(self, event, *paths):
197 '''Add an event and path(s) to the mute filter.'''
198 # all events have one path except the MOVEs
199 if event in ("FS_FILE_MOVE", "FS_DIR_MOVE"):
200 f_path, t_path = paths
201 is_from_forreal = not self.is_ignored(f_path)
202 is_to_forreal = not self.is_ignored(t_path)
203 if is_from_forreal and is_to_forreal:
204 self._to_mute.add((event, f_path, t_path))
205 elif is_to_forreal:
206 self._to_mute.add(('FS_FILE_CREATE', t_path))
207 self._to_mute.add(('FS_FILE_CLOSE_WRITE', t_path))
208 else:
209 path = paths[0]
210 if not self.is_ignored(path):
211 self._to_mute.add((event, path))
165212
166 def on_timeout(self):213 def on_timeout(self):
167 '''Called on timeout.'''214 '''Called on timeout.'''
@@ -174,7 +221,7 @@
174 try:221 try:
175 self.timer.cancel()222 self.timer.cancel()
176 except error.AlreadyCalled:223 except error.AlreadyCalled:
177 # self.timeout() was *just* called, do noting here224 # self.timeout() was *just* called, do nothing here
178 return225 return
179 self.push_event(self.held_event)226 self.push_event(self.held_event)
180 self.held_event = None227 self.held_event = None
@@ -234,7 +281,7 @@
234 try:281 try:
235 self.timer.cancel()282 self.timer.cancel()
236 except error.AlreadyCalled:283 except error.AlreadyCalled:
237 # self.timeout() was *just* called, do noting here284 # self.timeout() was *just* called, do nothing here
238 pass285 pass
239 else:286 else:
240 f_path = os.path.join(self.held_event.path,287 f_path = os.path.join(self.held_event.path,
@@ -255,17 +302,17 @@
255 evtname = "FS_FILE_"302 evtname = "FS_FILE_"
256 if f_share_id != t_share_id:303 if f_share_id != t_share_id:
257 # if the share_id are != push a delete/create304 # if the share_id are != push a delete/create
258 self.eq.push(evtname+"DELETE", f_path)305 self.eq_push(evtname+"DELETE", f_path)
259 self.eq.push(evtname+"CREATE", t_path)306 self.eq_push(evtname+"CREATE", t_path)
260 if not this_is_a_dir:307 if not this_is_a_dir:
261 self.eq.push('FS_FILE_CLOSE_WRITE', t_path)308 self.eq_push('FS_FILE_CLOSE_WRITE', t_path)
262 else:309 else:
263 self.eq.push(evtname+"MOVE", f_path, t_path)310 self.eq_push(evtname+"MOVE", f_path, t_path)
264 elif is_to_forreal:311 elif is_to_forreal:
265 # this is the case of a MOVE from something ignored312 # this is the case of a MOVE from something ignored
266 # to a valid filename313 # to a valid filename
267 self.eq.push('FS_FILE_CREATE', t_path)314 self.eq_push('FS_FILE_CREATE', t_path)
268 self.eq.push('FS_FILE_CLOSE_WRITE', t_path)315 self.eq_push('FS_FILE_CLOSE_WRITE', t_path)
269316
270 self.held_event = None317 self.held_event = None
271 return318 return
@@ -279,7 +326,12 @@
279 self.push_event(event)326 self.push_event(event)
280 t_path = os.path.join(event.path, event.name)327 t_path = os.path.join(event.path, event.name)
281 if not os.path.isdir(t_path):328 if not os.path.isdir(t_path):
282 self.eq.push('FS_FILE_CLOSE_WRITE', t_path)329 self.eq_push('FS_FILE_CLOSE_WRITE', t_path)
330
331 def eq_push(self, *event_data):
332 '''Sends to EQ the event data, maybe filtering it.'''
333 if not self._to_mute.pop(event_data):
334 self.eq.push(*event_data)
283335
284 def process_default(self, event):336 def process_default(self, event):
285 '''Push the event into the EventQueue.'''337 '''Push the event into the EventQueue.'''
@@ -313,7 +365,7 @@
313 if not self.is_ignored(fullpath):365 if not self.is_ignored(fullpath):
314 if evt_name == 'FS_DIR_DELETE':366 if evt_name == 'FS_DIR_DELETE':
315 self.handle_dir_delete(fullpath)367 self.handle_dir_delete(fullpath)
316 self.eq.push(evt_name, fullpath)368 self.eq_push(evt_name, fullpath)
317369
318 def freeze_begin(self, path):370 def freeze_begin(self, path):
319 """Puts in hold all the events for this path."""371 """Puts in hold all the events for this path."""
@@ -346,7 +398,7 @@
346 # push the received events398 # push the received events
347 for evt_name, path in events:399 for evt_name, path in events:
348 if not self.is_ignored(path):400 if not self.is_ignored(path):
349 self.eq.push(evt_name, path)401 self.eq_push(evt_name, path)
350402
351 self.frozen_path = None403 self.frozen_path = None
352 self.frozen_evts = False404 self.frozen_evts = False
@@ -360,9 +412,9 @@
360 if path == fullpath:412 if path == fullpath:
361 continue413 continue
362 if is_dir:414 if is_dir:
363 self.eq.push('FS_DIR_DELETE', path)415 self.eq_push('FS_DIR_DELETE', path)
364 else:416 else:
365 self.eq.push('FS_FILE_DELETE', path)417 self.eq_push('FS_FILE_DELETE', path)
366418
367419
368class EventQueue(object):420class EventQueue(object):
@@ -384,6 +436,10 @@
384 self.dispatch_queue = Queue()436 self.dispatch_queue = Queue()
385 self.empty_event_queue_callbacks = set()437 self.empty_event_queue_callbacks = set()
386438
439 def add_to_mute_filter(self, *info):
440 '''Adds info to mute filter in the processor.'''
441 self._processor.add_to_mute_filter(*info)
442
387 def add_empty_event_queue_callback(self, callback):443 def add_empty_event_queue_callback(self, callback):
388 """add a callback for when the even queue has no more events."""444 """add a callback for when the even queue has no more events."""
389 self.empty_event_queue_callbacks.add(callback)445 self.empty_event_queue_callbacks.add(callback)
390446
=== modified file 'ubuntuone/syncdaemon/filesystem_manager.py'
--- ubuntuone/syncdaemon/filesystem_manager.py 2009-10-13 16:03:54 +0000
+++ ubuntuone/syncdaemon/filesystem_manager.py 2009-10-15 15:40:24 +0000
@@ -221,6 +221,7 @@
221 cache_compact_threshold=4)221 cache_compact_threshold=4)
222 self.shares = {}222 self.shares = {}
223 self.vm = vm223 self.vm = vm
224 self.eq = None # this will be registered later
224225
225 # create the indexes226 # create the indexes
226 self._idx_path = {}227 self._idx_path = {}
@@ -244,9 +245,14 @@
244 logger("initialized: idx_path: %s, idx_node_id: %s, shares: %s" %245 logger("initialized: idx_path: %s, idx_node_id: %s, shares: %s" %
245 (len(self._idx_path), len(self._idx_node_id), len(self.shares)))246 (len(self._idx_path), len(self._idx_node_id), len(self.shares)))
246247
248 def register_eq(self, eq):
249 '''Registers an EventQueue here.'''
250 self.eq = eq
251
247 def _safe_fs_iteritems(self):252 def _safe_fs_iteritems(self):
248 """returns a 'safe' iterator over the items of the FileShelf.253 """Returns a 'safe' iterator over the items of the FileShelf.
249 it's 'safe' because broked metadata objects are deleted and not254
255 It's 'safe' because broken metadata objects are deleted and not
250 returned.256 returned.
251 """257 """
252 def safeget_mdobj(mdid):258 def safeget_mdobj(mdid):
@@ -566,6 +572,11 @@
566 # pylint: disable-msg=W0704572 # pylint: disable-msg=W0704
567 try:573 try:
568 with contextlib.nested(from_context, to_context):574 with contextlib.nested(from_context, to_context):
575 if mdobj["is_dir"]:
576 expected_event = "FS_DIR_MOVE"
577 else:
578 expected_event = "FS_FILE_MOVE"
579 self.eq.add_to_mute_filter(expected_event, path_from, path_to)
569 shutil.move(path_from, path_to)580 shutil.move(path_from, path_to)
570 except IOError, e:581 except IOError, e:
571 # file was not yet created582 # file was not yet created
@@ -652,8 +663,10 @@
652 try:663 try:
653 with self._enable_share_write(mdobj['share_id'], path):664 with self._enable_share_write(mdobj['share_id'], path):
654 if self.is_dir(path=path):665 if self.is_dir(path=path):
666 self.eq.add_to_mute_filter("FS_DIR_DELETE", path)
655 os.rmdir(path)667 os.rmdir(path)
656 else:668 else:
669 self.eq.add_to_mute_filter("FS_FILE_DELETE", path)
657 os.remove(path)670 os.remove(path)
658 except OSError, e:671 except OSError, e:
659 if e.errno == errno.ENOTEMPTY:672 if e.errno == errno.ENOTEMPTY:
@@ -674,6 +687,11 @@
674 to_path = base_to_path + "." + str(ind)687 to_path = base_to_path + "." + str(ind)
675 with self._enable_share_write(mdobj['share_id'], path):688 with self._enable_share_write(mdobj['share_id'], path):
676 try:689 try:
690 if mdobj["is_dir"]:
691 expected_event = "FS_DIR_MOVE"
692 else:
693 expected_event = "FS_FILE_MOVE"
694 self.eq.add_to_mute_filter(expected_event, path, to_path)
677 os.rename(path, to_path)695 os.rename(path, to_path)
678 except OSError, e:696 except OSError, e:
679 if e.errno == errno.ENOENT:697 if e.errno == errno.ENOENT:
@@ -739,7 +757,10 @@
739 with self._enable_share_write(share_id, os.path.dirname(path)):757 with self._enable_share_write(share_id, os.path.dirname(path)):
740 # if we don't have the dir yet, create it758 # if we don't have the dir yet, create it
741 if is_dir and not os.path.exists(path):759 if is_dir and not os.path.exists(path):
760 self.eq.add_to_mute_filter("FS_DIR_CREATE", path)
742 os.mkdir(path)761 os.mkdir(path)
762
763 # don't alert EQ, partials are in other directory, not watched
743 open(partial_path, "w").close()764 open(partial_path, "w").close()
744 mdobj["info"]["last_partial_created"] = time.time()765 mdobj["info"]["last_partial_created"] = time.time()
745 mdobj["info"]["is_partial"] = True766 mdobj["info"]["is_partial"] = True
@@ -786,6 +807,8 @@
786807
787 partial_path = self._get_partial_path(mdobj)808 partial_path = self._get_partial_path(mdobj)
788 with self._enable_share_write(share_id, path):809 with self._enable_share_write(share_id, path):
810 self.eq.add_to_mute_filter("FS_FILE_CREATE", path)
811 self.eq.add_to_mute_filter("FS_FILE_CLOSE_WRITE", path)
789 shutil.move(partial_path, path)812 shutil.move(partial_path, path)
790 mdobj["local_hash"] = local_hash813 mdobj["local_hash"] = local_hash
791 mdobj["info"]["last_downloaded"] = time.time()814 mdobj["info"]["last_downloaded"] = time.time()
@@ -805,6 +828,7 @@
805 partial_path = self._get_partial_path(mdobj)828 partial_path = self._get_partial_path(mdobj)
806 #pylint: disable-msg=W0704829 #pylint: disable-msg=W0704
807 try:830 try:
831 # don't alert EQ, partials are in other directory, not watched
808 os.remove(partial_path)832 os.remove(partial_path)
809 except OSError, e:833 except OSError, e:
810 # we only remove it if its there.834 # we only remove it if its there.
811835
=== modified file 'ubuntuone/syncdaemon/main.py'
--- ubuntuone/syncdaemon/main.py 2009-10-09 08:19:05 +0000
+++ ubuntuone/syncdaemon/main.py 2009-10-15 15:40:24 +0000
@@ -109,6 +109,7 @@
109 self.partials_dir,109 self.partials_dir,
110 self.vm)110 self.vm)
111 self.event_q = event_queue.EventQueue(self.fs)111 self.event_q = event_queue.EventQueue(self.fs)
112 self.fs.register_eq(self.event_q)
112 self.oauth_client = OAuthClient(self.realm)113 self.oauth_client = OAuthClient(self.realm)
113 self.state = SyncDaemonStateManager(self, handshake_timeout,114 self.state = SyncDaemonStateManager(self, handshake_timeout,
114 max_handshake_timeouts)115 max_handshake_timeouts)

Subscribers

People subscribed via source and target branches