Merge lp:~diegosarmentero/ubuntuone-client/darwin2-fsevents into lp:ubuntuone-client

Proposed by Diego Sarmentero
Status: Merged
Approved by: Alejandro J. Cura
Approved revision: 1268
Merged at revision: 1267
Proposed branch: lp:~diegosarmentero/ubuntuone-client/darwin2-fsevents
Merge into: lp:ubuntuone-client
Prerequisite: lp:~diegosarmentero/ubuntuone-client/darwin-fsevents-1
Diff against target: 912 lines (+376/-280)
4 files modified
tests/platform/filesystem_notifications/test_windows.py (+3/-3)
ubuntuone/platform/filesystem_notifications/__init__.py (+1/-0)
ubuntuone/platform/filesystem_notifications/common.py (+64/-277)
ubuntuone/platform/filesystem_notifications/windows.py (+308/-0)
To merge this branch: bzr merge lp:~diegosarmentero/ubuntuone-client/darwin2-fsevents
Reviewer Review Type Date Requested Status
Alejandro J. Cura (community) Approve
Manuel de la Peña (community) Approve
Review via email: mp+111427@code.launchpad.net

Commit message

- Refactoring windows fsevents implementation, to be usable from the future darwin one (LP: #1013323).

To post a comment you must log in.
Revision history for this message
Alejandro J. Cura (alecu) wrote :

The implementation in add_watches_to_udf_ancestors is *very* specific to windows, so it needs to be moved to windows.py

---

FILESYSTEM_MONITOR_MASK gets broken in this branch because filesystem_monitor_mask seems to always be None.

---

All of this would break on darwin:
    is_valid_syncdaemon_path = None
    is_valid_os_path = None
    os_path = windowspath = None

So everything under:
   elif sys.platform == "darwin":

should come on a following branch.

---

Please rename "reference" to something less vague, and move each definition to the platform specific files.

review: Needs Fixing
1263. By Diego Sarmentero

Updating branch

Revision history for this message
Diego Sarmentero (diegosarmentero) wrote :

> The implementation in add_watches_to_udf_ancestors is *very* specific to
> windows, so it needs to be moved to windows.py
>

Moved to windows, we will need to implement later for darwin something here, but not just returning True as in windows.

> ---
>
> FILESYSTEM_MONITOR_MASK gets broken in this branch because
> filesystem_monitor_mask seems to always be None.
>

This is not broken, the mask is assigned in windows.py, and for darwin i'm using it as None, so i can share more common code in common.py.

> ---
>
> All of this would break on darwin:
> is_valid_syncdaemon_path = None
> is_valid_os_path = None
> os_path = windowspath = None
>
> So everything under:
> elif sys.platform == "darwin":
>
> should come on a following branch.
>
> ---
>
> Please rename "reference" to something less vague, and move each definition to
> the platform specific files.

Done

Revision history for this message
Alejandro J. Cura (alecu) wrote :

> > FILESYSTEM_MONITOR_MASK gets broken in this branch because
> > filesystem_monitor_mask seems to always be None.
> >
>
> This is not broken, the mask is assigned in windows.py, and for darwin i'm
> using it as None, so i can share more common code in common.py.

You are right! Sorry about that :P

Revision history for this message
Alejandro J. Cura (alecu) wrote :

This comment is *Very* specific to windows:

        # On windows there is no need to add the watches to the ancestors
        # so we will always return true. The reason for this is that the
        # handles that we open stop the user from renaming the ancestors of
        # the UDF, for a user to do that he has to unsync the udf first

This too:
    For Windows:
    Also, they must not be literal paths, that is the \\?\ prefix should not be
    in the path.

Both should be in windows.py.

review: Needs Fixing
1264. By Diego Sarmentero

merge

1265. By Diego Sarmentero

updates

1266. By Diego Sarmentero

adding not implemented error

1267. By Diego Sarmentero

moving event codes to filesystem notification init

Revision history for this message
Diego Sarmentero (diegosarmentero) wrote :

Branch fixed.
I added the event codes in the filesystem_notifications/__init__.py so we can create where it actually does this os dependent thing, and we can grab it from anywhere, because implementing it as a class attribute was breaking a lot of things.

Also i remove the if sys.platform == "win32": import decorators
on common, and in the future branch, when i add the darwin part, i'll make os_helper import the proper decorator based on the platform as it should be.

Revision history for this message
Manuel de la Peña (mandel) :
review: Approve
Revision history for this message
Alejandro J. Cura (alecu) wrote :

This will probably break on darwin, where fs paths are utf-8 bytes, so it should go from common.py to windows.py:

473 if not isinstance(path, unicode):

----

The comment that starts with:
        # We are using this on windows and darwin.
        # For windows the logic is as follow:
        # ....

is still crap. The whole 9 lines of it, so it's not completely your fault, but it's crap.
Let's rewrite it with something simpler like this:

# We need to manually check if the path is a folder, because
# neither ReadDirectoryChangesW nor the FSEvents API tell us

Also the docstring in that function lies about "update the local subdir list".

---

review: Needs Fixing
1268. By Diego Sarmentero

fixing comment

Revision history for this message
Diego Sarmentero (diegosarmentero) wrote :

> This will probably break on darwin, where fs paths are utf-8 bytes, so it
> should go from common.py to windows.py:
>
> 473 if not isinstance(path, unicode):
>
> ----
>
> The comment that starts with:
> # We are using this on windows and darwin.
> # For windows the logic is as follow:
> # ....
>
> is still crap. The whole 9 lines of it, so it's not completely your fault, but
> it's crap.
> Let's rewrite it with something simpler like this:
>
> # We need to manually check if the path is a folder, because
> # neither ReadDirectoryChangesW nor the FSEvents API tell us
>
> Also the docstring in that function lies about "update the local subdir list".
>
> ---

The comment has been fixed, and the isinstance thing is fixed in the next branch which involves actually the darwin things as we talk.

Revision history for this message
Alejandro J. Cura (alecu) wrote :

Code looks fine. Ran tests on Precise and windows, and they all pass.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'tests/platform/filesystem_notifications/test_windows.py'
--- tests/platform/filesystem_notifications/test_windows.py 2012-05-14 21:24:24 +0000
+++ tests/platform/filesystem_notifications/test_windows.py 2012-06-26 19:23:19 +0000
@@ -51,6 +51,7 @@
51 IN_OPEN,51 IN_OPEN,
52)52)
53from ubuntuone.platform.filesystem_notifications import (53from ubuntuone.platform.filesystem_notifications import (
54 common,
54 windows as filesystem_notifications,55 windows as filesystem_notifications,
55)56)
56from ubuntuone.platform.filesystem_notifications.windows import (57from ubuntuone.platform.filesystem_notifications.windows import (
@@ -65,12 +66,11 @@
65 FILE_NOTIFY_CHANGE_LAST_WRITE,66 FILE_NOTIFY_CHANGE_LAST_WRITE,
66 FILE_NOTIFY_CHANGE_SECURITY,67 FILE_NOTIFY_CHANGE_SECURITY,
67 FILE_NOTIFY_CHANGE_LAST_ACCESS,68 FILE_NOTIFY_CHANGE_LAST_ACCESS,
68 WINDOWS_ACTIONS_NAMES,
69)69)
7070
71#create a rever mapping to use it in the tests.71#create a rever mapping to use it in the tests.
72REVERSE_WINDOWS_ACTIONS = {}72REVERSE_WINDOWS_ACTIONS = {}
73for key, value in filesystem_notifications.WINDOWS_ACTIONS.iteritems():73for key, value in common.COMMON_ACTIONS.iteritems():
74 REVERSE_WINDOWS_ACTIONS[value] = key74 REVERSE_WINDOWS_ACTIONS[value] = key
7575
7676
@@ -147,7 +147,7 @@
147 # group all events in a single lists which is not what the COM API147 # group all events in a single lists which is not what the COM API
148 # does.148 # does.
149 str_events = [149 str_events = [
150 (WINDOWS_ACTIONS_NAMES[action], path) for action, path in150 (common.COMMON_ACTIONS_NAMES[action], path) for action, path in
151 events]151 events]
152 self.raw_events.append(str_events)152 self.raw_events.append(str_events)
153 return events153 return events
154154
=== modified file 'ubuntuone/platform/filesystem_notifications/__init__.py'
--- ubuntuone/platform/filesystem_notifications/__init__.py 2012-05-16 16:24:23 +0000
+++ ubuntuone/platform/filesystem_notifications/__init__.py 2012-06-26 19:23:19 +0000
@@ -32,6 +32,7 @@
3232
3333
34if sys.platform == "win32":34if sys.platform == "win32":
35 EVENT_CODES = [1, 2, 3, 4, 5]
35 from ubuntuone.platform.filesystem_notifications import windows36 from ubuntuone.platform.filesystem_notifications import windows
36 FilesystemMonitor = windows.FilesystemMonitor37 FilesystemMonitor = windows.FilesystemMonitor
37 _GeneralINotifyProcessor = windows.NotifyProcessor38 _GeneralINotifyProcessor = windows.NotifyProcessor
3839
=== renamed file 'ubuntuone/platform/filesystem_notifications/windows.py' => 'ubuntuone/platform/filesystem_notifications/common.py'
--- ubuntuone/platform/filesystem_notifications/windows.py 2012-05-14 21:24:24 +0000
+++ ubuntuone/platform/filesystem_notifications/common.py 2012-06-26 19:23:19 +0000
@@ -26,41 +26,14 @@
26# do not wish to do so, delete this exception statement from your26# do not wish to do so, delete this exception statement from your
27# version. If you delete this exception statement from all source27# version. If you delete this exception statement from all source
28# files in the program, then also delete it here.28# files in the program, then also delete it here.
29"""File notifications on windows."""29"""Generic File notifications."""
3030
31import logging31import logging
32import os32import os
3333
34from uuid import uuid434from twisted.internet import defer
3535
36from twisted.internet import defer, reactor36from ubuntuone.platform.filesystem_notifications import EVENT_CODES
37from twisted.python.failure import Failure
38from pywintypes import OVERLAPPED
39from win32api import CloseHandle
40from win32con import (
41 FILE_SHARE_READ,
42 FILE_SHARE_WRITE,
43 FILE_FLAG_BACKUP_SEMANTICS,
44 FILE_NOTIFY_CHANGE_FILE_NAME,
45 FILE_NOTIFY_CHANGE_DIR_NAME,
46 FILE_NOTIFY_CHANGE_ATTRIBUTES,
47 FILE_NOTIFY_CHANGE_SIZE,
48 FILE_NOTIFY_CHANGE_LAST_WRITE,
49 FILE_NOTIFY_CHANGE_SECURITY,
50 OPEN_EXISTING)
51from win32file import (
52 AllocateReadBuffer,
53 CreateFileW,
54 GetOverlappedResult,
55 ReadDirectoryChangesW,
56 FILE_FLAG_OVERLAPPED,
57 FILE_NOTIFY_INFORMATION)
58from win32event import (
59 CreateEvent,
60 INFINITE,
61 SetEvent,
62 WaitForMultipleObjects,
63 WAIT_OBJECT_0)
64from ubuntuone.platform.filesystem_notifications.pyinotify_agnostic import (37from ubuntuone.platform.filesystem_notifications.pyinotify_agnostic import (
65 Event,38 Event,
66 WatchManagerError,39 WatchManagerError,
@@ -75,40 +48,32 @@
75 IN_MOVED_FROM,48 IN_MOVED_FROM,
76 IN_MOVED_TO,49 IN_MOVED_TO,
77 IN_MODIFY)50 IN_MODIFY)
51
52from ubuntuone import logger
53
78from ubuntuone.platform.os_helper.windows import (54from ubuntuone.platform.os_helper.windows import (
79 is_valid_syncdaemon_path,55 is_valid_syncdaemon_path,
80 is_valid_windows_path,56 is_valid_windows_path as is_valid_os_path,
81 get_syncdaemon_valid_path,57 windowspath as os_path,
82 windowspath,
83)58)
84from ubuntuone import logger59
8560# a map between the few events that we have on common platforms and those
86# our logging level
87TRACE = logger.TRACE
88
89# constant found in the msdn documentation:
90# http://msdn.microsoft.com/en-us/library/ff538834(v=vs.85).aspx
91FILE_LIST_DIRECTORY = 0x0001
92FILE_NOTIFY_CHANGE_LAST_ACCESS = 0x00000020
93FILE_NOTIFY_CHANGE_CREATION = 0x00000040
94
95# a map between the few events that we have on windows and those
96# found in pyinotify61# found in pyinotify
97WINDOWS_ACTIONS = {62COMMON_ACTIONS = {
98 1: IN_CREATE,63 EVENT_CODES[0]: IN_CREATE,
99 2: IN_DELETE,64 EVENT_CODES[1]: IN_DELETE,
100 3: IN_MODIFY,65 EVENT_CODES[2]: IN_MODIFY,
101 4: IN_MOVED_FROM,66 EVENT_CODES[3]: IN_MOVED_FROM,
102 5: IN_MOVED_TO,67 EVENT_CODES[4]: IN_MOVED_TO,
103}68}
10469
105# a map of the actions to names so that we have better logs.70# a map of the actions to names so that we have better logs.
106WINDOWS_ACTIONS_NAMES = {71COMMON_ACTIONS_NAMES = {
107 1: 'IN_CREATE',72 EVENT_CODES[0]: 'IN_CREATE',
108 2: 'IN_DELETE',73 EVENT_CODES[1]: 'IN_DELETE',
109 3: 'IN_MODIFY',74 EVENT_CODES[2]: 'IN_MODIFY',
110 4: 'IN_MOVED_FROM',75 EVENT_CODES[3]: 'IN_MOVED_FROM',
111 5: 'IN_MOVED_TO',76 EVENT_CODES[4]: 'IN_MOVED_TO',
112}77}
11378
114# translates quickly the event and it's is_dir state to our standard events79# translates quickly the event and it's is_dir state to our standard events
@@ -125,17 +90,8 @@
125 IN_MOVED_TO: 'FS_FILE_CREATE',90 IN_MOVED_TO: 'FS_FILE_CREATE',
126 IN_MOVED_TO | IN_ISDIR: 'FS_DIR_CREATE'}91 IN_MOVED_TO | IN_ISDIR: 'FS_DIR_CREATE'}
12792
128# the default mask to be used in the watches added by the FilesystemMonitor93# our logging level
129# class94TRACE = logger.TRACE
130FILESYSTEM_MONITOR_MASK = FILE_NOTIFY_CHANGE_FILE_NAME | \
131 FILE_NOTIFY_CHANGE_DIR_NAME | \
132 FILE_NOTIFY_CHANGE_ATTRIBUTES | \
133 FILE_NOTIFY_CHANGE_SIZE | \
134 FILE_NOTIFY_CHANGE_LAST_WRITE | \
135 FILE_NOTIFY_CHANGE_SECURITY | \
136 FILE_NOTIFY_CHANGE_LAST_ACCESS
137
138THREADPOOL_MAX = 20
13995
14096
141# The implementation of the code that is provided as the pyinotify substitute97# The implementation of the code that is provided as the pyinotify substitute
@@ -144,15 +100,11 @@
144100
145 def __init__(self, watch_descriptor, path, mask, auto_add, processor,101 def __init__(self, watch_descriptor, path, mask, auto_add, processor,
146 buf_size=8192):102 buf_size=8192):
147 super(Watch, self).__init__()103 self.log = logging.getLogger('ubuntuone.SyncDaemon.platform.common.' +
148 self.log = logging.getLogger('ubuntuone.SyncDaemon.platform.windows.' +
149 'filesystem_notifications.Watch')104 'filesystem_notifications.Watch')
150 self.log.setLevel(TRACE)105 self.log.setLevel(TRACE)
151 self._processor = processor106 self._processor = processor
152 self._buf_size = buf_size107 self._buf_size = buf_size
153 self._wait_stop = CreateEvent(None, 0, 0, None)
154 self._overlapped = OVERLAPPED()
155 self._overlapped.hEvent = CreateEvent(None, 0, 0, None)
156 self._watching = False108 self._watching = False
157 self._descriptor = watch_descriptor109 self._descriptor = watch_descriptor
158 self._auto_add = auto_add110 self._auto_add = auto_add
@@ -170,36 +122,25 @@
170 path += os.path.sep122 path += os.path.sep
171 self._path = os.path.abspath(path)123 self._path = os.path.abspath(path)
172 self._mask = mask124 self._mask = mask
173 # this deferred is fired when the watch has started monitoring
174 # a directory from a thread
175 self._watch_started_deferred = defer.Deferred()
176 # and this one is fired when the watch has stopped
177 self._watch_stopped_deferred = defer.Deferred()
178125
179 @is_valid_windows_path(path_indexes=[1])126 @is_valid_os_path(path_indexes=[1])
180 def _update_subdirs(self, path, event):127 def _update_subdirs(self, path, event):
181 """Adds the path to the internal subdirs.128 """Adds the path to the internal subdirs.
182129
183 The given path is considered to be a path and therefore this130 The given path is considered to be a path and therefore this
184 will not be checked.131 will not be checked.
185 """132 """
186 if WINDOWS_ACTIONS[event] == IN_CREATE:133 if COMMON_ACTIONS[event] == IN_CREATE:
187 self._subdirs.add(path)134 self._subdirs.add(path)
188 elif WINDOWS_ACTIONS[event] == IN_DELETE and\135 elif COMMON_ACTIONS[event] == IN_DELETE and path in self._subdirs:
189 path in self._subdirs:
190 self._subdirs.remove(path)136 self._subdirs.remove(path)
191137
192 @is_valid_windows_path(path_indexes=[1])138 @is_valid_os_path(path_indexes=[1])
193 def _path_is_dir(self, path):139 def _path_is_dir(self, path):
194 """Check if the path is a dir and update the local subdir list."""140 """Check if the path is a dir."""
195141
196 # The logic of this function is the following:142 # We need to manually check if the path is a folder, because
197 # 1. ReadDirectoryChangesW changes does not send if a path143 # neither ReadDirectoryChangesW nor the FSEvents API tell us
198 # is a new dir or not.
199 # 2. We keep track of subdirs that in self._subdir.
200 # 3. We check if a path is a dir by:
201 # * Asking the os if the path exists.
202 # * Finding the path in self._subdirs
203144
204 is_dir = False145 is_dir = False
205 if os.path.exists(path):146 if os.path.exists(path):
@@ -210,131 +151,7 @@
210 self.log.debug('Is path %r a dir? %s', path, is_dir)151 self.log.debug('Is path %r a dir? %s', path, is_dir)
211 return is_dir152 return is_dir
212153
213 def _process_events(self, events):154 @is_valid_os_path(path_indexes=[1])
214 """Process the events from the queue."""
215 # do not do it if we stop watching and the events are empty
216 if not self._watching:
217 return
218
219 # we transform the events to be the same as the one in pyinotify
220 # and then use the proc_fun
221 for action, file_name in events:
222 if any([file_name.startswith(path)
223 for path in self._ignore_paths]):
224 continue
225 # map the windows events to the pyinotify ones, tis is dirty but
226 # makes the multiplatform better, linux was first :P
227 syncdaemon_path = get_syncdaemon_valid_path(
228 os.path.join(self._path, file_name))
229 full_dir_path = os.path.join(self._path, file_name)
230 is_dir = self._path_is_dir(full_dir_path)
231 if is_dir:
232 # we need to update the list of subdirs that we have
233 self._update_subdirs(full_dir_path, action)
234 mask = WINDOWS_ACTIONS[action]
235 head, tail = os.path.split(file_name)
236 if is_dir:
237 mask |= IN_ISDIR
238 event_raw_data = {
239 'wd': self._descriptor,
240 'dir': is_dir,
241 'mask': mask,
242 'name': tail,
243 'path': '.'}
244 # by the way in which the win api fires the events we know for
245 # sure that no move events will be added in the wrong order, this
246 # is kind of hacky, I dont like it too much
247 if WINDOWS_ACTIONS[action] == IN_MOVED_FROM:
248 self._cookie = str(uuid4())
249 self._source_pathname = tail
250 event_raw_data['cookie'] = self._cookie
251 if WINDOWS_ACTIONS[action] == IN_MOVED_TO:
252 event_raw_data['src_pathname'] = self._source_pathname
253 event_raw_data['cookie'] = self._cookie
254 event = Event(event_raw_data)
255 # FIXME: event deduces the pathname wrong and we need to manually
256 # set it
257 event.pathname = syncdaemon_path
258 # add the event only if we do not have an exclude filter or
259 # the exclude filter returns False, that is, the event will not
260 # be excluded
261 self.log.debug('Pushing event %r to processor.', event)
262 self._processor(event)
263
264 def _call_deferred(self, f, *args):
265 """Executes the deferred call avoiding possible race conditions."""
266 if not self._watch_started_deferred.called:
267 f(*args)
268
269 def _watch_wrapper(self):
270 """Wrap _watch, and errback on any unhandled error."""
271 try:
272 self._watch()
273 except Exception:
274 reactor.callFromThread(self._call_deferred,
275 self._watch_started_deferred.errback, Failure())
276
277 def _watch(self):
278 """Watch a path that is a directory."""
279 self.log.debug('Adding watch for %r (exists? %r is dir? %r).',
280 self._path,
281 os.path.exists(self._path), os.path.isdir(self._path))
282 # we are going to be using the ReadDirectoryChangesW whihc requires
283 # a directory handle and the mask to be used.
284 self._watch_handle = CreateFileW(
285 self._path,
286 FILE_LIST_DIRECTORY,
287 FILE_SHARE_READ | FILE_SHARE_WRITE,
288 None,
289 OPEN_EXISTING,
290 FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED,
291 None)
292
293 try:
294 self._watch_loop(self._watch_handle)
295 finally:
296 CloseHandle(self._watch_handle)
297 self._watch_handle = None
298 reactor.callFromThread(self.stopped.callback, True)
299
300 def _watch_loop(self, handle):
301 """The loop where we watch the directory."""
302 while True:
303 # important information to know about the parameters:
304 # param 1: the handle to the dir
305 # param 2: the size to be used in the kernel to store events
306 # that might be lost while the call is being performed. This
307 # is complicated to fine tune since if you make lots of watcher
308 # you migh used too much memory and make your OS to BSOD
309 buf = AllocateReadBuffer(self._buf_size)
310 ReadDirectoryChangesW(
311 handle,
312 buf,
313 self._auto_add,
314 self._mask,
315 self._overlapped,
316 )
317 if not self._watch_started_deferred.called:
318 reactor.callFromThread(self._call_deferred,
319 self._watch_started_deferred.callback, True)
320 # wait for an event and ensure that we either stop or read the
321 # data
322 rc = WaitForMultipleObjects((self._wait_stop,
323 self._overlapped.hEvent),
324 0, INFINITE)
325 if rc == WAIT_OBJECT_0:
326 # Stop event
327 break
328 # if we continue, it means that we got some data, lets read it
329 data = GetOverlappedResult(handle, self._overlapped, True)
330 # lets ead the data and store it in the results
331 events = FILE_NOTIFY_INFORMATION(buf, data)
332 self.log.debug('Got from ReadDirectoryChangesW %r.',
333 [(WINDOWS_ACTIONS_NAMES[action], path) for action, path in
334 events])
335 reactor.callFromThread(self._process_events, events)
336
337 @is_valid_windows_path(path_indexes=[1])
338 def ignore_path(self, path):155 def ignore_path(self, path):
339 """Add the path of the events to ignore."""156 """Add the path of the events to ignore."""
340 if not path.endswith(os.path.sep):157 if not path.endswith(os.path.sep):
@@ -343,7 +160,7 @@
343 path = path[len(self._path):]160 path = path[len(self._path):]
344 self._ignore_paths.append(path)161 self._ignore_paths.append(path)
345162
346 @is_valid_windows_path(path_indexes=[1])163 @is_valid_os_path(path_indexes=[1])
347 def remove_ignored_path(self, path):164 def remove_ignored_path(self, path):
348 """Reaccept path."""165 """Reaccept path."""
349 if not path.endswith(os.path.sep):166 if not path.endswith(os.path.sep):
@@ -363,16 +180,12 @@
363 # process the events.180 # process the events.
364 self.log.debug('Start watching path.')181 self.log.debug('Start watching path.')
365 self._watching = True182 self._watching = True
366 reactor.callInThread(self._watch_wrapper)
367 return self._watch_started_deferred
368183
369 def stop_watching(self):184 def stop_watching(self):
370 """Tell the watch to stop processing events."""185 """Tell the watch to stop processing events."""
371 self.log.info('Stop watching %s', self._path)186 self.log.info('Stop watching %s', self._path)
372 SetEvent(self._wait_stop)
373 self._watching = False187 self._watching = False
374 self._subdirs = set()188 self._subdirs = set()
375 return self.stopped
376189
377 def update(self, mask, auto_add=False):190 def update(self, mask, auto_add=False):
378 """Update the info used by the watcher."""191 """Update the info used by the watcher."""
@@ -389,29 +202,18 @@
389 def auto_add(self):202 def auto_add(self):
390 return self._auto_add203 return self._auto_add
391204
392 @property
393 def started(self):
394 """A deferred that will be called when the watch is running."""
395 return self._watch_started_deferred
396
397 @property
398 def stopped(self):
399 """A deferred fired when the watch thread has finished."""
400 return self._watch_stopped_deferred
401
402205
403class WatchManager(object):206class WatchManager(object):
404 """Implement the same functions as pyinotify.WatchManager.207 """Implement the same functions as pyinotify.WatchManager.
405208
406 All paths passed to methods in this class should be windows paths.209 All paths passed to methods in this class should be proper os paths.
407210
408 """211 """
409212
410 def __init__(self, processor):213 def __init__(self, processor):
411 """Init the manager to keep trak of the different watches."""214 """Init the manager to keep trak of the different watches."""
412 super(WatchManager, self).__init__()
413 self._processor = processor215 self._processor = processor
414 self.log = logging.getLogger('ubuntuone.SyncDaemon.platform.windows.'216 self.log = logging.getLogger('ubuntuone.SyncDaemon.platform.common.'
415 + 'filesystem_notifications.WatchManager')217 + 'filesystem_notifications.WatchManager')
416 self.log.setLevel(TRACE)218 self.log.setLevel(TRACE)
417 self._wdm = {}219 self._wdm = {}
@@ -420,12 +222,8 @@
420222
421 def stop(self):223 def stop(self):
422 """Close the manager and stop all watches."""224 """Close the manager and stop all watches."""
423 self.log.debug('Stopping watches.')225 # Should be implemented for each platform
424 wait_list = []226 raise NotImplementedError("Not implemented on this platform.")
425 for current_wd in self._wdm:
426 wait_list.append(self._wdm[current_wd].stop_watching())
427 self.log.debug('Stopping Watch on %r.', self._wdm[current_wd].path)
428 return defer.DeferredList(wait_list)
429227
430 def get_watch(self, wd):228 def get_watch(self, wd):
431 """Return the watch with the given descriptor."""229 """Return the watch with the given descriptor."""
@@ -442,8 +240,7 @@
442 except KeyError, e:240 except KeyError, e:
443 logging.error(str(e))241 logging.error(str(e))
444242
445 def _add_single_watch(self, path, mask, auto_add=False,243 def _add_single_watch(self, path, mask, auto_add=False, quiet=True):
446 quiet=True):
447 if path in self._ignored_paths:244 if path in self._ignored_paths:
448 # simply removed it from the filter245 # simply removed it from the filter
449 self._ignored_paths.remove(path)246 self._ignored_paths.remove(path)
@@ -452,23 +249,21 @@
452 self.log.debug('add_single_watch(%s, %s, %s, %s)', path, mask,249 self.log.debug('add_single_watch(%s, %s, %s, %s)', path, mask,
453 auto_add, quiet)250 auto_add, quiet)
454251
455 # adjust the number of threads based on the UDFs watched252 return self._adding_watch(path, mask, auto_add)
456 reactor.suggestThreadPoolSize(THREADPOOL_MAX + self._wd_count + 1)253
457 self._wdm[self._wd_count] = Watch(self._wd_count, path,254 def _adding_watch(self, path, mask, auto_add):
458 mask, auto_add, self._processor)255 """Add the watch to the dict and start it."""
459 d = self._wdm[self._wd_count].start_watching()256 # This should be implemented for each OS
460 self._wd_count += 1257 raise NotImplementedError("Not implemented on this platform.")
461 return d258
462259 @is_valid_os_path(path_indexes=[1])
463 @is_valid_windows_path(path_indexes=[1])260 def add_watch(self, path, mask, auto_add=False, quiet=True):
464 def add_watch(self, path, mask, auto_add=False,
465 quiet=True):
466 """Add a new path tp be watch.261 """Add a new path tp be watch.
467262
468 The method will ensure that the path is not already present.263 The method will ensure that the path is not already present.
469 """264 """
470 if not isinstance(path, unicode):265 if not isinstance(path, unicode):
471 e = NotImplementedError("No implementation on windows.")266 e = NotImplementedError("No implementation on this platform.")
472 return defer.fail(e)267 return defer.fail(e)
473 wd = self.get_wd(path)268 wd = self.get_wd(path)
474 if wd is None:269 if wd is None:
@@ -480,9 +275,9 @@
480275
481 def update_watch(self, wd, mask=None, rec=False,276 def update_watch(self, wd, mask=None, rec=False,
482 auto_add=False, quiet=True):277 auto_add=False, quiet=True):
483 raise NotImplementedError("Not implemented on windows.")278 raise NotImplementedError("Not implemented on this platform.")
484279
485 @is_valid_windows_path(path_indexes=[1])280 @is_valid_os_path(path_indexes=[1])
486 def get_wd(self, path):281 def get_wd(self, path):
487 """Return the watcher that is used to watch the given path."""282 """Return the watcher that is used to watch the given path."""
488 if not path.endswith(os.path.sep):283 if not path.endswith(os.path.sep):
@@ -512,7 +307,7 @@
512 if not quiet:307 if not quiet:
513 raise WatchManagerError('Watch %s was not found' % wd, {})308 raise WatchManagerError('Watch %s was not found' % wd, {})
514309
515 @is_valid_windows_path(path_indexes=[1])310 @is_valid_os_path(path_indexes=[1])
516 def rm_path(self, path):311 def rm_path(self, path):
517 """Remove a watch to the given path."""312 """Remove a watch to the given path."""
518 wd = self.get_wd(path)313 wd = self.get_wd(path)
@@ -527,9 +322,6 @@
527 This interface will be exposed to syncdaemon, ergo all passed322 This interface will be exposed to syncdaemon, ergo all passed
528 and returned paths must be a sequence of BYTES encoded with utf8.323 and returned paths must be a sequence of BYTES encoded with utf8.
529324
530 Also, they must not be literal paths, that is the \\?\ prefix should not be
531 in the path.
532
533 """325 """
534326
535 def __init__(self, monitor, ignore_config=None):327 def __init__(self, monitor, ignore_config=None):
@@ -549,13 +341,10 @@
549 """Add an event and path(s) to the mute filter."""341 """Add an event and path(s) to the mute filter."""
550 self.general_processor.add_to_mute_filter(event, paths)342 self.general_processor.add_to_mute_filter(event, paths)
551343
552 @is_valid_syncdaemon_path(path_indexes=[1])
553 def platform_is_ignored(self, path):344 def platform_is_ignored(self, path):
554 """Should we ignore this path in the current platform.?"""345 """Should we ignore this path in the current platform.?"""
555 # don't support links yet346 # This should be implemented in each platform
556 if path.endswith('.lnk'):347 raise NotImplementedError("Not implemented on this platform.")
557 return True
558 return False
559348
560 @is_valid_syncdaemon_path(path_indexes=[1])349 @is_valid_syncdaemon_path(path_indexes=[1])
561 def is_ignored(self, path):350 def is_ignored(self, path):
@@ -572,7 +361,7 @@
572 # lets ignore dir changes361 # lets ignore dir changes
573 if event.dir:362 if event.dir:
574 return363 return
575 # on windows we just get IN_MODIFY, lets always fake364 # on someplatforms we just get IN_MODIFY, lets always fake
576 # an OPEN & CLOSE_WRITE couple365 # an OPEN & CLOSE_WRITE couple
577 raw_open = raw_close = {366 raw_open = raw_close = {
578 'wd': event.wd,367 'wd': event.wd,
@@ -665,7 +454,7 @@
665 self.release_held_event()454 self.release_held_event()
666 self.general_processor.push_event(event)455 self.general_processor.push_event(event)
667 else:456 else:
668 # We should never get here on windows, I really do not know how we457 # We should never get here, I really do not know how we
669 # got here458 # got here
670 self.general_processor.log.warn(459 self.general_processor.log.warn(
671 'Cookie does not match the previoues held event!')460 'Cookie does not match the previoues held event!')
@@ -739,11 +528,12 @@
739528
740 def __init__(self, eq, fs, ignore_config=None, timeout=1):529 def __init__(self, eq, fs, ignore_config=None, timeout=1):
741 self.log = logging.getLogger('ubuntuone.SyncDaemon.FSMonitor')530 self.log = logging.getLogger('ubuntuone.SyncDaemon.FSMonitor')
531 self.filesystem_monitor_mask = None
742 self.log.setLevel(TRACE)532 self.log.setLevel(TRACE)
743 self.fs = fs533 self.fs = fs
744 self.eq = eq534 self.eq = eq
745 self._processor = NotifyProcessor(self, ignore_config)535 # You will need to create the NotifyProcessor and WatchManager
746 self._watch_manager = WatchManager(self._processor)536 # in each OS-specific implementation
747537
748 def add_to_mute_filter(self, event, **info):538 def add_to_mute_filter(self, event, **info):
749 """Add info to mute filter in the processor."""539 """Add info to mute filter in the processor."""
@@ -757,27 +547,24 @@
757 """Prepares the EQ to be closed."""547 """Prepares the EQ to be closed."""
758 return self._watch_manager.stop()548 return self._watch_manager.stop()
759549
760 @windowspath(path_indexes=[1])550 @os_path(path_indexes=[1])
761 def rm_watch(self, dirpath):551 def rm_watch(self, dirpath):
762 """Remove watch from a dir."""552 """Remove watch from a dir."""
763 # trust the implementation of the manager553 # trust the implementation of the manager
764 self._watch_manager.rm_path(dirpath)554 self._watch_manager.rm_path(dirpath)
765555
766 @windowspath(path_indexes=[1])556 @os_path(path_indexes=[1])
767 def add_watch(self, dirpath):557 def add_watch(self, dirpath):
768 """Add watch to a dir."""558 """Add watch to a dir."""
769 # the logic to check if the watch is already set559 # the logic to check if the watch is already set
770 # is all in WatchManager.add_watch560 # is all in WatchManager.add_watch
771 return self._watch_manager.add_watch(dirpath,561 return self._watch_manager.add_watch(dirpath,
772 FILESYSTEM_MONITOR_MASK, auto_add=True)562 self.filesystem_monitor_mask, auto_add=True)
773563
774 def add_watches_to_udf_ancestors(self, volume):564 def add_watches_to_udf_ancestors(self, volume):
775 """Add a inotify watch to volume's ancestors if it's an UDF."""565 """Add a inotify watch to volume's ancestors if it's an UDF."""
776 # On windows there is no need to add the watches to the ancestors566
777 # so we will always return true. The reason for this is that the567 # Should be implemented in each OS if necessary
778 # handles that we open stop the user from renaming the ancestors of
779 # the UDF, for a user to do that he has to unsync the udf first
780 return defer.succeed(True)
781568
782 def is_frozen(self):569 def is_frozen(self):
783 """Checks if there's something frozen."""570 """Checks if there's something frozen."""
784571
=== added file 'ubuntuone/platform/filesystem_notifications/windows.py'
--- ubuntuone/platform/filesystem_notifications/windows.py 1970-01-01 00:00:00 +0000
+++ ubuntuone/platform/filesystem_notifications/windows.py 2012-06-26 19:23:19 +0000
@@ -0,0 +1,308 @@
1# -*- coding: utf-8 *-*
2#
3# Copyright 2011-2012 Canonical Ltd.
4#
5# This program is free software: you can redistribute it and/or modify it
6# under the terms of the GNU General Public License version 3, as published
7# by the Free Software Foundation.
8#
9# This program is distributed in the hope that it will be useful, but
10# WITHOUT ANY WARRANTY; without even the implied warranties of
11# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
12# PURPOSE. See the GNU General Public License for more details.
13#
14# You should have received a copy of the GNU General Public License along
15# with this program. If not, see <http://www.gnu.org/licenses/>.
16#
17# In addition, as a special exception, the copyright holders give
18# permission to link the code of portions of this program with the
19# OpenSSL library under certain conditions as described in each
20# individual source file, and distribute linked combinations
21# including the two.
22# You must obey the GNU General Public License in all respects
23# for all of the code used other than OpenSSL. If you modify
24# file(s) with this exception, you may extend this exception to your
25# version of the file(s), but you are not obligated to do so. If you
26# do not wish to do so, delete this exception statement from your
27# version. If you delete this exception statement from all source
28# files in the program, then also delete it here.
29"""File notifications on windows."""
30
31import os
32from uuid import uuid4
33
34from twisted.internet import defer, reactor
35from twisted.python.failure import Failure
36
37from pywintypes import OVERLAPPED
38from win32api import CloseHandle
39from win32con import (
40 FILE_SHARE_READ,
41 FILE_SHARE_WRITE,
42 FILE_FLAG_BACKUP_SEMANTICS,
43 FILE_NOTIFY_CHANGE_FILE_NAME,
44 FILE_NOTIFY_CHANGE_DIR_NAME,
45 FILE_NOTIFY_CHANGE_ATTRIBUTES,
46 FILE_NOTIFY_CHANGE_SIZE,
47 FILE_NOTIFY_CHANGE_LAST_WRITE,
48 FILE_NOTIFY_CHANGE_SECURITY,
49 OPEN_EXISTING)
50from win32file import (
51 AllocateReadBuffer,
52 CreateFileW,
53 GetOverlappedResult,
54 ReadDirectoryChangesW,
55 FILE_FLAG_OVERLAPPED,
56 FILE_NOTIFY_INFORMATION)
57from win32event import (
58 CreateEvent,
59 INFINITE,
60 SetEvent,
61 WaitForMultipleObjects,
62 WAIT_OBJECT_0)
63
64from ubuntuone.platform.os_helper.windows import (
65 is_valid_syncdaemon_path,
66 get_syncdaemon_valid_path,
67)
68
69from ubuntuone.platform.filesystem_notifications import common
70
71
72# constant found in the msdn documentation:
73# http://msdn.microsoft.com/en-us/library/ff538834(v=vs.85).aspx
74FILE_LIST_DIRECTORY = 0x0001
75FILE_NOTIFY_CHANGE_LAST_ACCESS = 0x00000020
76FILE_NOTIFY_CHANGE_CREATION = 0x00000040
77
78THREADPOOL_MAX = 20
79
80
81class Watch(common.Watch):
82 """Implement the same functions as pyinotify.Watch."""
83
84 def __init__(self, watch_descriptor, path, mask, auto_add, processor,
85 buf_size=8192):
86 super(Watch, self).__init__(watch_descriptor, path, mask, auto_add,
87 processor, buf_size)
88 self._wait_stop = CreateEvent(None, 0, 0, None)
89 self._overlapped = OVERLAPPED()
90 self._overlapped.hEvent = CreateEvent(None, 0, 0, None)
91 # remember the subdirs we have so that when we have a delete we can
92 # check if it was a remove
93 self._subdirs = set()
94 # this deferred is fired when the watch has started monitoring
95 # a directory from a thread
96 self._watch_started_deferred = defer.Deferred()
97 # and this one is fired when the watch has stopped
98 self._watch_stopped_deferred = defer.Deferred()
99
100 def _process_events(self, events):
101 """Process the events from the queue."""
102 # do not do it if we stop watching and the events are empty
103 if not self._watching:
104 return
105
106 # we transform the events to be the same as the one in pyinotify
107 # and then use the proc_fun
108 for action, file_name in events:
109 if any([file_name.startswith(path)
110 for path in self._ignore_paths]):
111 continue
112 # map the windows events to the pyinotify ones, tis is dirty but
113 # makes the multiplatform better, linux was first :P
114 syncdaemon_path = get_syncdaemon_valid_path(
115 os.path.join(self._path, file_name))
116 full_dir_path = os.path.join(self._path, file_name)
117 is_dir = self._path_is_dir(full_dir_path)
118 if is_dir:
119 # we need to update the list of subdirs that we have
120 self._update_subdirs(full_dir_path, action)
121 mask = common.COMMON_ACTIONS[action]
122 head, tail = os.path.split(file_name)
123 if is_dir:
124 mask |= common.IN_ISDIR
125 event_raw_data = {
126 'wd': self._descriptor,
127 'dir': is_dir,
128 'mask': mask,
129 'name': tail,
130 'path': '.'}
131 # by the way in which the win api fires the events we know for
132 # sure that no move events will be added in the wrong order, this
133 # is kind of hacky, I dont like it too much
134 if common.COMMON_ACTIONS[action] == common.IN_MOVED_FROM:
135 self._cookie = str(uuid4())
136 self._source_pathname = tail
137 event_raw_data['cookie'] = self._cookie
138 if common.COMMON_ACTIONS[action] == common.IN_MOVED_TO:
139 event_raw_data['src_pathname'] = self._source_pathname
140 event_raw_data['cookie'] = self._cookie
141 event = common.Event(event_raw_data)
142 # FIXME: event deduces the pathname wrong and we need to manually
143 # set it
144 event.pathname = syncdaemon_path
145 # add the event only if we do not have an exclude filter or
146 # the exclude filter returns False, that is, the event will not
147 # be excluded
148 self.log.debug('Pushing event %r to processor.', event)
149 self._processor(event)
150
151 def _call_deferred(self, f, *args):
152 """Executes the deferred call avoiding possible race conditions."""
153 if not self._watch_started_deferred.called:
154 f(*args)
155
156 def _watch_wrapper(self):
157 """Wrap _watch, and errback on any unhandled error."""
158 try:
159 self._watch()
160 except Exception:
161 reactor.callFromThread(self._call_deferred,
162 self._watch_started_deferred.errback, Failure())
163
164 def _watch(self):
165 """Watch a path that is a directory."""
166 self.log.debug('Adding watch for %r (exists? %r is dir? %r).',
167 self._path,
168 os.path.exists(self._path), os.path.isdir(self._path))
169 # we are going to be using the ReadDirectoryChangesW whihc requires
170 # a directory handle and the mask to be used.
171 self._watch_handle = CreateFileW(
172 self._path,
173 FILE_LIST_DIRECTORY,
174 FILE_SHARE_READ | FILE_SHARE_WRITE,
175 None,
176 OPEN_EXISTING,
177 FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED,
178 None)
179
180 try:
181 self._watch_loop(self._watch_handle)
182 finally:
183 CloseHandle(self._watch_handle)
184 self._watch_handle = None
185 reactor.callFromThread(self.stopped.callback, True)
186
187 def _watch_loop(self, handle):
188 """The loop where we watch the directory."""
189 while True:
190 # important information to know about the parameters:
191 # param 1: the handle to the dir
192 # param 2: the size to be used in the kernel to store events
193 # that might be lost while the call is being performed. This
194 # is complicated to fine tune since if you make lots of watcher
195 # you migh used too much memory and make your OS to BSOD
196 buf = AllocateReadBuffer(self._buf_size)
197 ReadDirectoryChangesW(
198 handle,
199 buf,
200 self._auto_add,
201 self._mask,
202 self._overlapped,
203 )
204 if not self._watch_started_deferred.called:
205 reactor.callFromThread(self._call_deferred,
206 self._watch_started_deferred.callback, True)
207 # wait for an event and ensure that we either stop or read the
208 # data
209 rc = WaitForMultipleObjects((self._wait_stop,
210 self._overlapped.hEvent),
211 0, INFINITE)
212 if rc == WAIT_OBJECT_0:
213 # Stop event
214 break
215 # if we continue, it means that we got some data, lets read it
216 data = GetOverlappedResult(handle, self._overlapped, True)
217 # lets ead the data and store it in the results
218 events = FILE_NOTIFY_INFORMATION(buf, data)
219 self.log.debug('Got from ReadDirectoryChangesW %r.',
220 [(common.COMMON_ACTIONS_NAMES[action], path) \
221 for action, path in events])
222 reactor.callFromThread(self._process_events, events)
223
224 def start_watching(self):
225 """Tell the watch to start processing events."""
226 super(Watch, self).start_watching()
227 reactor.callInThread(self._watch_wrapper)
228 return self._watch_started_deferred
229
230 def stop_watching(self):
231 """Tell the watch to stop processing events."""
232 super(Watch, self).stop_watching()
233 SetEvent(self._wait_stop)
234 return self.stopped
235
236 @property
237 def started(self):
238 """A deferred that will be called when the watch is running."""
239 return self._watch_started_deferred
240
241 @property
242 def stopped(self):
243 """A deferred fired when the watch thread has finished."""
244 return self._watch_stopped_deferred
245
246
247class WatchManager(common.WatchManager):
248
249 def stop(self):
250 """Close the manager and stop all watches."""
251 self.log.debug('Stopping watches.')
252 wait_list = []
253 for current_wd in self._wdm:
254 wait_list.append(self._wdm[current_wd].stop_watching())
255 self.log.debug('Stopping Watch on %r.', self._wdm[current_wd].path)
256 return defer.DeferredList(wait_list)
257
258 def _adding_watch(self, path, mask, auto_add):
259 # adjust the number of threads based on the UDFs watched
260 watch = Watch(self._wd_count, path, mask, auto_add, self._processor)
261 reactor.suggestThreadPoolSize(THREADPOOL_MAX + self._wd_count + 1)
262 self._wdm[self._wd_count] = watch
263 d = self._wdm[self._wd_count].start_watching()
264 self._wd_count += 1
265 return d
266
267
268class NotifyProcessor(common.NotifyProcessor):
269 """
270 Processor that takes care of dealing with the events.
271
272 Also, they must not be literal paths, that is the \\?\ prefix should not be
273 in the path.
274 """
275
276 @is_valid_syncdaemon_path(path_indexes=[1])
277 def platform_is_ignored(self, path):
278 """Should we ignore this path in the current platform.?"""
279 # don't support links yet
280 if path.endswith('.lnk'):
281 return True
282 return False
283
284
285class FilesystemMonitor(common.FilesystemMonitor):
286 """Manages the signals from filesystem."""
287
288 def __init__(self, eq, fs, ignore_config=None, timeout=1):
289 super(FilesystemMonitor, self).__init__(eq, fs, ignore_config, timeout)
290 self._processor = NotifyProcessor(self, ignore_config)
291 self._watch_manager = WatchManager(self._processor)
292 # the default mask to be used in the watches
293 # added by the FilesystemMonitor class
294 self.filesystem_monitor_mask = FILE_NOTIFY_CHANGE_FILE_NAME | \
295 FILE_NOTIFY_CHANGE_DIR_NAME | \
296 FILE_NOTIFY_CHANGE_ATTRIBUTES | \
297 FILE_NOTIFY_CHANGE_SIZE | \
298 FILE_NOTIFY_CHANGE_LAST_WRITE | \
299 FILE_NOTIFY_CHANGE_SECURITY | \
300 FILE_NOTIFY_CHANGE_LAST_ACCESS
301
302 def add_watches_to_udf_ancestors(self, volume):
303 """Add a inotify watch to volume's ancestors if it's an UDF."""
304 # There is no need to add the watches to the ancestors
305 # so we will always return true. The reason for this is that the
306 # handles that we open stop the user from renaming the ancestors of
307 # the UDF, for a user to do that he has to unsync the udf first
308 return defer.succeed(True)

Subscribers

People subscribed via source and target branches