Merge lp:~jderose/dmedia/empty-files into lp:dmedia
Status: Merged
Approved by: Jason Gerard DeRose
Approved revision: 190
Merged at revision: 163
Proposed branch: lp:~jderose/dmedia/empty-files
Merge into: lp:dmedia
Diff against target: 1424 lines (+597/-330), 8 files modified:
  dmedia/filestore.py (+4/-1)
  dmedia/importer.py (+188/-97)
  dmedia/metastore.py (+0/-13)
  dmedia/tests/test_filestore.py (+1/-0)
  dmedia/tests/test_importer.py (+399/-181)
  dmedia/tests/test_metastore.py (+0/-36)
  dmedia/util.py (+4/-1)
  dmedia/workers.py (+1/-1)
To merge this branch: bzr merge lp:~jderose/dmedia/empty-files
Related bugs: (none)
Reviewer: Jason Gerard DeRose (review type: Approve)
Review via email: mp+50431@code.launchpad.net
Description of the change
This started out as just a change to Importer so that rather than importing empty files into the FileStore, it would simply note them in the "dmedia/import" record. But as I dug in a bit, I decided there were some important enhancements needed in Importer and ImportManager, so I just bit the bullet. Changes include:
* Highly detailed logging in the "dmedia/import" record: in addition to tracking empty files, it also tracks the files considered for import, files imported, files skipped because they're duplicates, and files for which an error occurred when attempting the import (see the first sketch after this list).
* Entirely axed the use of quick_id: too error prone, and not as super-duty as dmedia needs to be.
* Well-defined behavior when there is an inconsistency between the FileStore and the database: new files are always copied into the FileStore even if a corresponding document exists; when a document does not exist, one is always created, even if the file is a duplicate from the FileStore's perspective.
* The database is now compacted upon finishing a batch import: even if there are no old revisions, compacting at this point can dramatically reduce database size because the btree can be optimally laid out (the random IDs dmedia uses mean the btree doesn't grow in a particularly space-efficient way).
* Now that we're only targeting python-couchdb >= 0.8, I'm taking advantage of Database.save() so I can get rid of some hacky crap and make fewer CouchDB requests (bad python-couchdb, your wrapper-itis obscures and thwarts the elegant CouchDB REST API). A usage sketch of save() and compact() follows below.
* A number of improvements to what is logged in service.log, including putting the process ID in the log format to make multiprocessing issues easier to debug.
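
To make the new bookkeeping concrete, here's a minimal sketch of the record skeleton and per-file accounting. Names like new_import_record and record_outcome are illustrative only; the shapes match create_import() and Importer.import_file() in the diff below:

    import time

    BUCKETS = ('imported', 'skipped', 'empty', 'error')

    def new_import_record(_id, base):
        # Skeleton of the enriched 'dmedia/import' record built by
        # create_import() in this branch (batch_id/machine_id omitted):
        return {
            '_id': _id,
            'type': 'dmedia/import',
            'time': time.time(),
            'base': base,
            'log': dict((b, []) for b in BUCKETS),
            'stats': dict((b, {'count': 0, 'bytes': 0}) for b in BUCKETS),
        }

    def record_outcome(doc, action, entry, size):
        # Same per-file accounting as Importer.import_file(): entry is the
        # source path for 'empty' files, otherwise a small dict; every file
        # lands in exactly one bucket.
        doc['log'][action].append(entry)
        doc['stats'][action]['count'] += 1
        doc['stats'][action]['bytes'] += size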
You know what they say, "Refactor early, refactor often." Or maybe only I say that, but either way, it's awesome.
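
Since a couple of the points above hinge on python-couchdb >= 0.8, here's a quick usage sketch of the two calls involved (the database name is illustrative): Database.save() tracks the '_rev' for you, and Database.compact() is what _finish_batch() now calls:

    import time
    import couchdb

    server = couchdb.Server()    # http://localhost:5984/ by default
    db = server['dmedia']        # illustrative name; assumes the db exists

    doc = {'type': 'dmedia/import', 'time': time.time()}
    (_id, _rev) = db.save(doc)   # one request; doc gains '_id' and '_rev'
    doc['time_end'] = time.time()
    db.save(doc)                 # later saves reuse the tracked '_rev'

    db.compact()                 # kick off compaction, as _finish_batch() does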
- 190. By Jason Gerard DeRose: Small tweak to make log more readable
Preview Diff
1 | === modified file 'dmedia/filestore.py' |
2 | --- dmedia/filestore.py 2011-02-17 00:12:20 +0000 |
3 | +++ dmedia/filestore.py 2011-02-19 04:44:09 +0000 |
4 | @@ -1037,5 +1037,8 @@ |
5 | try: |
6 | self.tmp_move(tmp_fp, chash, ext) |
7 | except DuplicateFile as e: |
8 | - raise DuplicateFile(src=src_fp.name, dst=e.dst, chash=e.chash) |
9 | + log.warning('File %r is duplicate of %r', src_fp.name, e.dst) |
10 | + raise DuplicateFile(src=src_fp.name, dst=e.dst, tmp=e.src, |
11 | + chash=chash, leaves=h.leaves |
12 | + ) |
13 | return (chash, h.leaves) |
14 | |
15 | === modified file 'dmedia/importer.py' |
16 | --- dmedia/importer.py 2011-02-15 07:53:55 +0000 |
17 | +++ dmedia/importer.py 2011-02-19 04:44:09 +0000 |
18 | @@ -29,17 +29,21 @@ |
19 | import mimetypes |
20 | import time |
21 | from base64 import b64encode |
22 | +import logging |
23 | + |
24 | import couchdb |
25 | + |
26 | from .util import random_id |
27 | -from .workers import Worker, Manager, register, isregistered |
28 | +from .errors import DuplicateFile |
29 | +from .workers import Worker, Manager, register, isregistered, exception_name |
30 | from .filestore import FileStore, quick_id, safe_open, safe_ext, pack_leaves |
31 | from .metastore import MetaStore |
32 | from .extractor import merge_metadata |
33 | |
34 | + |
35 | mimetypes.init() |
36 | - |
37 | - |
38 | DOTDIR = '.dmedia' |
39 | +log = logging.getLogger() |
40 | |
41 | |
42 | def normalize_ext(name): |
43 | @@ -118,7 +122,8 @@ |
44 | error to be interpreted as there being no files on the card! |
45 | """ |
46 | if path.isfile(base): |
47 | - yield base |
48 | + s = os.stat(base) |
49 | + yield (base, s.st_size, s.st_mtime) |
50 | return |
51 | names = sorted(os.listdir(base)) |
52 | dirs = [] |
53 | @@ -127,12 +132,13 @@ |
54 | if path.islink(fullname): |
55 | continue |
56 | if path.isfile(fullname): |
57 | - yield fullname |
58 | + s = os.stat(fullname) |
59 | + yield (fullname, s.st_size, s.st_mtime) |
60 | elif path.isdir(fullname): |
61 | dirs.append(fullname) |
62 | for fullname in dirs: |
63 | - for f in files_iter(fullname): |
64 | - yield f |
65 | + for tup in files_iter(fullname): |
66 | + yield tup |
67 | |
68 | |
69 | def create_batch(machine_id=None): |
70 | @@ -145,8 +151,14 @@ |
71 | 'time': time.time(), |
72 | 'machine_id': machine_id, |
73 | 'imports': [], |
74 | - 'imported': {'count': 0, 'bytes': 0}, |
75 | - 'skipped': {'count': 0, 'bytes': 0}, |
76 | + 'errors': [], |
77 | + 'stats': { |
78 | + 'considered': {'count': 0, 'bytes': 0}, |
79 | + 'imported': {'count': 0, 'bytes': 0}, |
80 | + 'skipped': {'count': 0, 'bytes': 0}, |
81 | + 'empty': {'count': 0, 'bytes': 0}, |
82 | + 'error': {'count': 0, 'bytes': 0}, |
83 | + } |
84 | } |
85 | |
86 | |
87 | @@ -157,10 +169,22 @@ |
88 | return { |
89 | '_id': random_id(), |
90 | 'type': 'dmedia/import', |
91 | + 'time': time.time(), |
92 | 'batch_id': batch_id, |
93 | 'machine_id': machine_id, |
94 | 'base': base, |
95 | - 'time': time.time(), |
96 | + 'log': { |
97 | + 'imported': [], |
98 | + 'skipped': [], |
99 | + 'empty': [], |
100 | + 'error': [], |
101 | + }, |
102 | + 'stats': { |
103 | + 'imported': {'count': 0, 'bytes': 0}, |
104 | + 'skipped': {'count': 0, 'bytes': 0}, |
105 | + 'empty': {'count': 0, 'bytes': 0}, |
106 | + 'error': {'count': 0, 'bytes': 0}, |
107 | + } |
108 | } |
109 | |
110 | |
111 | @@ -181,59 +205,88 @@ |
112 | except couchdb.ResourceConflict: |
113 | pass |
114 | |
115 | - self.__stats = { |
116 | - 'imported': { |
117 | - 'count': 0, |
118 | - 'bytes': 0, |
119 | - }, |
120 | - 'skipped': { |
121 | - 'count': 0, |
122 | - 'bytes': 0, |
123 | - }, |
124 | - } |
125 | - self.__files = None |
126 | - self.__imported = [] |
127 | - self._import = None |
128 | - self._import_id = None |
129 | + self.filetuples = None |
130 | + self._processed = [] |
131 | + self.doc = None |
132 | + self._id = None |
133 | + |
134 | + def save(self): |
135 | + """ |
136 | + Save current 'dmedia/import' record to CouchDB. |
137 | + """ |
138 | + self.db.save(self.doc) |
139 | |
140 | def start(self): |
141 | """ |
142 | - Create the initial import record, return that record's ID. |
143 | + Create the initial 'dmedia/import' record, return that record's ID. |
144 | """ |
145 | - doc = create_import(self.base, |
146 | + assert self._id is None |
147 | + self.doc = create_import(self.base, |
148 | batch_id=self.batch_id, |
149 | machine_id=self.metastore.machine_id, |
150 | ) |
151 | - self._import_id = doc['_id'] |
152 | - assert self.metastore.db.create(doc) == self._import_id |
153 | - self._import = self.metastore.db[self._import_id] |
154 | - return self._import_id |
155 | - |
156 | - def get_stats(self): |
157 | - return dict( |
158 | - (k, dict(v)) for (k, v) in self.__stats.iteritems() |
159 | - ) |
160 | + self._id = self.doc['_id'] |
161 | + self.save() |
162 | + return self._id |
163 | |
164 | def scanfiles(self): |
165 | - if self.__files is None: |
166 | - self.__files = tuple(files_iter(self.base)) |
167 | - return self.__files |
168 | - |
169 | - def __import_file(self, src): |
170 | + """ |
171 | + Build list of files that will be considered for import. |
172 | + |
173 | + After this method has been called, the ``Importer.filetuples`` attribute |
174 | + will contain ``(filename,size,mtime)`` tuples for all files being |
175 | + considered. This information is saved into the dmedia/import record to |
176 | + provide a rich audit trail and aid in debugging. |
177 | + """ |
178 | + assert self.filetuples is None |
179 | + self.filetuples = tuple(files_iter(self.base)) |
180 | + self.doc['log']['considered'] = [ |
181 | + {'src': src, 'bytes': size, 'mtime': mtime} |
182 | + for (src, size, mtime) in self.filetuples |
183 | + ] |
184 | + total_bytes = sum(size for (src, size, mtime) in self.filetuples) |
185 | + self.doc['stats']['considered'] = { |
186 | + 'count': len(self.filetuples), 'bytes': total_bytes |
187 | + } |
188 | + self.save() |
189 | + return self.filetuples |
190 | + |
191 | + def _import_file(self, src): |
192 | + """ |
193 | + Attempt to import *src* into dmedia library. |
194 | + """ |
195 | fp = safe_open(src, 'rb') |
196 | - quickid = quick_id(fp) |
197 | - ids = list(self.metastore.by_quickid(quickid)) |
198 | - if ids: |
199 | - # FIXME: Even if this is a duplicate, we should check if the file |
200 | - # is stored on this machine, and if not copy into the FileStore. |
201 | - doc = self.metastore.db[ids[0]] |
202 | - return ('skipped', doc) |
203 | - basename = path.basename(src) |
204 | - (root, ext) = normalize_ext(basename) |
205 | - # FIXME: We need to handle the (rare) case when a DuplicateFile |
206 | - # exception is raised by FileStore.import_file() |
207 | - (chash, leaves) = self.filestore.import_file(fp, ext) |
208 | stat = os.fstat(fp.fileno()) |
209 | + if stat.st_size == 0: |
210 | + log.warning('File size is zero: %r', src) |
211 | + return ('empty', None) |
212 | + |
213 | + name = path.basename(src) |
214 | + (root, ext) = normalize_ext(name) |
215 | + try: |
216 | + (chash, leaves) = self.filestore.import_file(fp, ext) |
217 | + action = 'imported' |
218 | + except DuplicateFile as e: |
219 | + chash = e.chash |
220 | + leaves = e.leaves |
221 | + action = 'skipped' |
222 | + assert e.tmp.startswith(self.filestore.join('imports')) |
223 | + # FIXME: We should really probably move this into duplicates/ or |
224 | + # something and not delete till we verify integrity of what is |
225 | + # already in the filestore. |
226 | + os.remove(e.tmp) |
227 | + |
228 | + try: |
229 | + doc = self.db[chash] |
230 | + if self.filestore._id not in doc['stored']: |
231 | + doc['stored'][self.filestore._id] = { |
232 | + 'copies': 1, |
233 | + 'time': time.time(), |
234 | + } |
235 | + self.db.save(doc) |
236 | + return (action, doc) |
237 | + except couchdb.ResourceNotFound as e: |
238 | + pass |
239 | |
240 | ts = time.time() |
241 | doc = { |
242 | @@ -256,42 +309,66 @@ |
243 | }, |
244 | }, |
245 | |
246 | - 'qid': quickid, |
247 | - 'import_id': self._import_id, |
248 | + 'import_id': self._id, |
249 | 'mtime': stat.st_mtime, |
250 | - 'basename': basename, |
251 | - 'dirname': path.relpath(path.dirname(src), self.base), |
252 | + 'name': name, |
253 | + 'dir': path.relpath(path.dirname(src), self.base), |
254 | } |
255 | if ext: |
256 | doc['content_type'] = mimetypes.types_map.get('.' + ext) |
257 | if self.extract: |
258 | merge_metadata(src, doc) |
259 | - (_id, _rev) = self.metastore.db.save(doc) |
260 | + (_id, _rev) = self.db.save(doc) |
261 | assert _id == chash |
262 | - return ('imported', doc) |
263 | - |
264 | - def import_file(self, src): |
265 | - (action, doc) = self.__import_file(src) |
266 | - self.__imported.append(src) |
267 | - self.__stats[action]['count'] += 1 |
268 | - self.__stats[action]['bytes'] += doc['bytes'] |
269 | return (action, doc) |
270 | |
271 | + def import_file(self, src, size): |
272 | + """ |
273 | + Wraps `Importer._import_file()` with error handling and logging. |
274 | + """ |
275 | + self._processed.append(src) |
276 | + try: |
277 | + (action, doc) = self._import_file(src) |
278 | + if action == 'empty': |
279 | + entry = src |
280 | + else: |
281 | + entry = { |
282 | + 'src': src, |
283 | + 'id': doc['_id'], |
284 | + } |
285 | + except Exception as e: |
286 | + log.exception('Error importing %r', src) |
287 | + action = 'error' |
288 | + entry = { |
289 | + 'src': src, |
290 | + 'name': exception_name(e), |
291 | + 'msg': str(e), |
292 | + } |
293 | + self.doc['log'][action].append(entry) |
294 | + self.doc['stats'][action]['count'] += 1 |
295 | + self.doc['stats'][action]['bytes'] += size |
296 | + if action == 'error': |
297 | + self.save() |
298 | + return (action, entry) |
299 | + |
300 | def import_all_iter(self): |
301 | - for src in self.scanfiles(): |
302 | - (action, doc) = self.import_file(src) |
303 | - yield (src, action, doc) |
304 | + for (src, size, mtime) in self.filetuples: |
305 | + (action, entry) = self.import_file(src, size) |
306 | + yield (src, action) |
307 | |
308 | def finalize(self): |
309 | - files = self.scanfiles() |
310 | - assert len(files) == len(self.__imported) |
311 | - assert set(files) == set(self.__imported) |
312 | - s = self.get_stats() |
313 | - self._import.update(s) |
314 | - self._import['time_end'] = time.time() |
315 | - self.db[self._import_id] = self._import |
316 | - assert s['imported']['count'] + s['skipped']['count'] == len(files) |
317 | - return s |
318 | + """ |
319 | + Finalize import and save final import record to CouchDB. |
320 | + |
321 | + The method will add the ``"time_end"`` key into the import record and |
322 | + save it to CouchDB. There will likely also be changes in the |
323 | + ``"log"`` and ``"stats"`` keys, which will likewise be saved to CouchDB. |
324 | + """ |
325 | + assert len(self.filetuples) == len(self._processed) |
326 | + assert list(t[0] for t in self.filetuples) == self._processed |
327 | + self.doc['time_end'] = time.time() |
328 | + self.save() |
329 | + return self.doc['stats'] |
330 | |
331 | |
332 | class ImportWorker(Worker): |
333 | @@ -307,12 +384,11 @@ |
334 | self.emit('count', import_id, total) |
335 | |
336 | c = 1 |
337 | - for (src, action, doc) in adapter.import_all_iter(): |
338 | + for (src, action) in adapter.import_all_iter(): |
339 | self.emit('progress', import_id, c, total, |
340 | dict( |
341 | action=action, |
342 | src=src, |
343 | - _id=doc['_id'], |
344 | ) |
345 | ) |
346 | c += 1 |
347 | @@ -332,6 +408,8 @@ |
348 | |
349 | def accumulate_stats(accum, stats): |
350 | for (key, d) in stats.items(): |
351 | + if key not in accum: |
352 | + accum[key] = {'count': 0, 'bytes': 0} |
353 | for (k, v) in d.items(): |
354 | accum[key][k] += v |
355 | |
356 | @@ -342,33 +420,46 @@ |
357 | self._dbname = dbname |
358 | self.metastore = MetaStore(dbname=dbname) |
359 | self.db = self.metastore.db |
360 | - self._batch = None |
361 | + self.doc = None |
362 | self._total = 0 |
363 | self._completed = 0 |
364 | if not isregistered(ImportWorker): |
365 | register(ImportWorker) |
366 | |
367 | - def _sync(self, doc): |
368 | - _id = doc['_id'] |
369 | - self.db[_id] = doc |
370 | - return self.db[_id] |
371 | + def save(self): |
372 | + """ |
373 | + Save current 'dmedia/batch' record to CouchDB. |
374 | + """ |
375 | + self.db.save(self.doc) |
376 | |
377 | def _start_batch(self): |
378 | - assert self._batch is None |
379 | + assert self.doc is None |
380 | assert self._workers == {} |
381 | self._total = 0 |
382 | self._completed = 0 |
383 | - self._batch = self._sync(create_batch(self.metastore.machine_id)) |
384 | - self.emit('BatchStarted', self._batch['_id']) |
385 | + self.doc = create_batch(self.metastore.machine_id) |
386 | + self.save() |
387 | + self.emit('BatchStarted', self.doc['_id']) |
388 | |
389 | def _finish_batch(self): |
390 | assert self._workers == {} |
391 | - self._batch['time_end'] = time.time() |
392 | - self._batch = self._sync(self._batch) |
393 | - self.emit('BatchFinished', self._batch['_id'], |
394 | - to_dbus_stats(self._batch) |
395 | - ) |
396 | - self._batch = None |
397 | + self.doc['time_end'] = time.time() |
398 | + self.save() |
399 | + self.emit('BatchFinished', self.doc['_id'], |
400 | + to_dbus_stats(self.doc['stats']) |
401 | + ) |
402 | + self.doc = None |
403 | + log.info('Batch complete, compacting database...') |
404 | + self.db.compact() |
405 | + |
406 | + def on_error(self, key, exception, message): |
407 | + super(ImportManager, self).on_error(key, exception, message) |
408 | + if self.doc is None: |
409 | + return |
410 | + self.doc['errors'].append( |
411 | + {'key': key, 'name': exception, 'msg': message} |
412 | + ) |
413 | + self.save() |
414 | |
415 | def on_terminate(self, key): |
416 | super(ImportManager, self).on_terminate(key) |
417 | @@ -376,8 +467,8 @@ |
418 | self._finish_batch() |
419 | |
420 | def on_started(self, key, import_id): |
421 | - self._batch['imports'].append(import_id) |
422 | - self._batch = self._sync(self._batch) |
423 | + self.doc['imports'].append(import_id) |
424 | + self.save() |
425 | self.emit('ImportStarted', key, import_id) |
426 | |
427 | def on_count(self, key, import_id, total): |
428 | @@ -389,8 +480,8 @@ |
429 | self.emit('ImportProgress', key, import_id, completed, total, info) |
430 | |
431 | def on_finished(self, key, import_id, stats): |
432 | - accumulate_stats(self._batch, stats) |
433 | - self._batch = self._sync(self._batch) |
434 | + accumulate_stats(self.doc['stats'], stats) |
435 | + self.save() |
436 | self.emit('ImportFinished', key, import_id, to_dbus_stats(stats)) |
437 | |
438 | def get_batch_progress(self): |
439 | @@ -404,7 +495,7 @@ |
440 | if len(self._workers) == 0: |
441 | self._start_batch() |
442 | return self.do('ImportWorker', base, |
443 | - self._batch['_id'], base, extract, self._dbname |
444 | + self.doc['_id'], base, extract, self._dbname |
445 | ) |
446 | |
447 | def list_imports(self): |
448 | |
449 | === modified file 'dmedia/metastore.py' |
450 | --- dmedia/metastore.py 2011-02-07 09:53:28 +0000 |
451 | +++ dmedia/metastore.py 2011-02-19 04:44:09 +0000 |
452 | @@ -109,14 +109,6 @@ |
453 | } |
454 | """ |
455 | |
456 | -file_qid = """ |
457 | -function(doc) { |
458 | - if (doc.type == 'dmedia/file' && doc.qid) { |
459 | - emit(doc.qid, null); |
460 | - } |
461 | -} |
462 | -""" |
463 | - |
464 | file_import_id = """ |
465 | function(doc) { |
466 | if (doc.type == 'dmedia/file' && doc.import_id) { |
467 | @@ -181,7 +173,6 @@ |
468 | )), |
469 | |
470 | ('file', ( |
471 | - ('qid', file_qid, None), |
472 | ('import_id', file_import_id, None), |
473 | ('bytes', file_bytes, _sum), |
474 | ('ext', file_ext, _count), |
475 | @@ -257,10 +248,6 @@ |
476 | (_id, doc) = build_design_doc(name, views) |
477 | self.update(doc) |
478 | |
479 | - def by_quickid(self, qid): |
480 | - for row in self.db.view('_design/file/_view/qid', key=qid): |
481 | - yield row.id |
482 | - |
483 | def total_bytes(self): |
484 | for row in self.db.view('_design/file/_view/bytes'): |
485 | return row.value |
486 | |
487 | === modified file 'dmedia/tests/test_filestore.py' |
488 | --- dmedia/tests/test_filestore.py 2011-02-15 00:54:58 +0000 |
489 | +++ dmedia/tests/test_filestore.py 2011-02-19 04:44:09 +0000 |
490 | @@ -1188,3 +1188,4 @@ |
491 | self.assertEqual(e.chash, mov_hash) |
492 | self.assertEqual(e.src, src) |
493 | self.assertEqual(e.dst, dst) |
494 | + self.assertTrue(e.tmp.startswith(base + '/imports/')) |
495 | |
496 | === modified file 'dmedia/tests/test_importer.py' |
497 | --- dmedia/tests/test_importer.py 2011-02-07 09:46:16 +0000 |
498 | +++ dmedia/tests/test_importer.py 2011-02-19 04:44:09 +0000 |
499 | @@ -105,13 +105,14 @@ |
500 | f = importer.files_iter |
501 | tmp = TempDir() |
502 | files = [] |
503 | - for args in relpaths: |
504 | - p = tmp.touch('subdir', *args) |
505 | - files.append(p) |
506 | + for (i, args) in enumerate(relpaths): |
507 | + content = 'a' * (2 ** i) |
508 | + p = tmp.write(content, 'subdir', *args) |
509 | + files.append((p, len(content), path.getmtime(p))) |
510 | |
511 | # Test when base is a file: |
512 | - for p in files: |
513 | - self.assertEqual(list(f(p)), [p]) |
514 | + for (p, s, t) in files: |
515 | + self.assertEqual(list(f(p)), [(p, s, t)]) |
516 | |
517 | # Test importing from tmp.path: |
518 | self.assertEqual(list(f(tmp.path)), files) |
519 | @@ -143,9 +144,9 @@ |
520 | 'type', |
521 | 'time', |
522 | 'imports', |
523 | - 'imported', |
524 | - 'skipped', |
525 | + 'errors', |
526 | 'machine_id', |
527 | + 'stats', |
528 | ]) |
529 | ) |
530 | _id = doc['_id'] |
531 | @@ -155,9 +156,18 @@ |
532 | self.assertTrue(isinstance(doc['time'], (int, float))) |
533 | self.assertTrue(doc['time'] <= time.time()) |
534 | self.assertEqual(doc['imports'], []) |
535 | - self.assertEqual(doc['imported'], {'count': 0, 'bytes': 0}) |
536 | - self.assertEqual(doc['skipped'], {'count': 0, 'bytes': 0}) |
537 | + self.assertEqual(doc['errors'], []) |
538 | self.assertEqual(doc['machine_id'], machine_id) |
539 | + self.assertEqual( |
540 | + doc['stats'], |
541 | + { |
542 | + 'considered': {'count': 0, 'bytes': 0}, |
543 | + 'imported': {'count': 0, 'bytes': 0}, |
544 | + 'skipped': {'count': 0, 'bytes': 0}, |
545 | + 'empty': {'count': 0, 'bytes': 0}, |
546 | + 'error': {'count': 0, 'bytes': 0}, |
547 | + } |
548 | + ) |
549 | |
550 | def test_create_import(self): |
551 | f = importer.create_import |
552 | @@ -173,6 +183,8 @@ |
553 | 'base', |
554 | 'batch_id', |
555 | 'machine_id', |
556 | + 'log', |
557 | + 'stats', |
558 | ]) |
559 | |
560 | doc = f(base, batch_id=batch_id, machine_id=machine_id) |
561 | @@ -196,6 +208,24 @@ |
562 | self.assertEqual(set(doc), keys) |
563 | self.assertEqual(doc['batch_id'], None) |
564 | self.assertEqual(doc['machine_id'], None) |
565 | + self.assertEqual( |
566 | + doc['log'], |
567 | + { |
568 | + 'imported': [], |
569 | + 'skipped': [], |
570 | + 'empty': [], |
571 | + 'error': [], |
572 | + } |
573 | + ) |
574 | + self.assertEqual( |
575 | + doc['stats'], |
576 | + { |
577 | + 'imported': {'count': 0, 'bytes': 0}, |
578 | + 'skipped': {'count': 0, 'bytes': 0}, |
579 | + 'empty': {'count': 0, 'bytes': 0}, |
580 | + 'error': {'count': 0, 'bytes': 0}, |
581 | + } |
582 | + ) |
583 | |
584 | def test_to_dbus_stats(self): |
585 | f = importer.to_dbus_stats |
586 | @@ -262,13 +292,13 @@ |
587 | def test_start(self): |
588 | tmp = TempDir() |
589 | inst = self.new(tmp.path) |
590 | - self.assertTrue(inst._import is None) |
591 | + self.assertTrue(inst.doc is None) |
592 | _id = inst.start() |
593 | self.assertEqual(len(_id), 24) |
594 | store = MetaStore(dbname=self.dbname) |
595 | - self.assertEqual(inst._import, store.db[_id]) |
596 | + self.assertEqual(inst.doc, store.db[_id]) |
597 | self.assertEqual( |
598 | - set(inst._import), |
599 | + set(inst.doc), |
600 | set([ |
601 | '_id', |
602 | '_rev', |
603 | @@ -277,60 +307,76 @@ |
604 | 'base', |
605 | 'batch_id', |
606 | 'machine_id', |
607 | + 'log', |
608 | + 'stats', |
609 | ]) |
610 | ) |
611 | - self.assertEqual(inst._import['batch_id'], self.batch_id) |
612 | + self.assertEqual(inst.doc['batch_id'], self.batch_id) |
613 | self.assertEqual( |
614 | - inst._import['machine_id'], |
615 | + inst.doc['machine_id'], |
616 | inst.metastore.machine_id |
617 | ) |
618 | - self.assertEqual(inst._import['base'], tmp.path) |
619 | - |
620 | - def test_get_stats(self): |
621 | - tmp = TempDir() |
622 | - inst = self.new(tmp.path) |
623 | - one = inst.get_stats() |
624 | - self.assertEqual(one, |
625 | - { |
626 | - 'imported': { |
627 | - 'count': 0, |
628 | - 'bytes': 0, |
629 | - }, |
630 | - 'skipped': { |
631 | - 'count': 0, |
632 | - 'bytes': 0, |
633 | - }, |
634 | - } |
635 | - ) |
636 | - two = inst.get_stats() |
637 | - self.assertFalse(one is two) |
638 | - self.assertFalse(one['imported'] is two['imported']) |
639 | - self.assertFalse(one['skipped'] is two['skipped']) |
640 | + self.assertEqual(inst.doc['base'], tmp.path) |
641 | + self.assertEqual( |
642 | + inst.doc['log'], |
643 | + { |
644 | + 'imported': [], |
645 | + 'skipped': [], |
646 | + 'empty': [], |
647 | + 'error': [], |
648 | + } |
649 | + ) |
650 | + self.assertEqual( |
651 | + inst.doc['stats'], |
652 | + { |
653 | + 'imported': {'count': 0, 'bytes': 0}, |
654 | + 'skipped': {'count': 0, 'bytes': 0}, |
655 | + 'empty': {'count': 0, 'bytes': 0}, |
656 | + 'error': {'count': 0, 'bytes': 0}, |
657 | + } |
658 | + ) |
659 | |
660 | def test_scanfiles(self): |
661 | tmp = TempDir() |
662 | inst = self.new(tmp.path) |
663 | + inst.start() |
664 | files = [] |
665 | - for args in relpaths: |
666 | - p = tmp.touch('subdir', *args) |
667 | - files.append(p) |
668 | + for (i, args) in enumerate(relpaths): |
669 | + content = 'a' * (2 ** i) |
670 | + p = tmp.write(content, 'subdir', *args) |
671 | + files.append((p, len(content), path.getmtime(p))) |
672 | got = inst.scanfiles() |
673 | self.assertEqual(got, tuple(files)) |
674 | - self.assertTrue(inst.scanfiles() is got) |
675 | + self.assertEqual( |
676 | + inst.db[inst._id]['log']['considered'], |
677 | + [{'src': src, 'bytes': size, 'mtime': mtime} |
678 | + for (src, size, mtime) in files] |
679 | + ) |
680 | + self.assertEqual( |
681 | + inst.db[inst._id]['stats']['considered'], |
682 | + { |
683 | + 'count': len(files), |
684 | + 'bytes': sum(t[1] for t in files), |
685 | + } |
686 | + ) |
687 | |
688 | - def test_import_file(self): |
689 | + def test_import_file_private(self): |
690 | + """ |
691 | + Test the `Importer._import_file()` method. |
692 | + """ |
693 | tmp = TempDir() |
694 | inst = self.new(tmp.path) |
695 | + inst.start() |
696 | |
697 | # Test that AmbiguousPath is raised: |
698 | traversal = '/home/foo/.dmedia/../.ssh/id_rsa' |
699 | - e = raises(AmbiguousPath, inst.import_file, traversal) |
700 | + e = raises(AmbiguousPath, inst._import_file, traversal) |
701 | self.assertEqual(e.pathname, traversal) |
702 | self.assertEqual(e.abspath, '/home/foo/.ssh/id_rsa') |
703 | |
704 | # Test that IOError propagates up with missing file |
705 | nope = tmp.join('nope.mov') |
706 | - e = raises(IOError, inst.import_file, nope) |
707 | + e = raises(IOError, inst._import_file, nope) |
708 | self.assertEqual( |
709 | str(e), |
710 | '[Errno 2] No such file or directory: %r' % nope |
711 | @@ -339,7 +385,7 @@ |
712 | # Test that IOError propagates up with unreadable file |
713 | nope = tmp.touch('nope.mov') |
714 | os.chmod(nope, 0o000) |
715 | - e = raises(IOError, inst.import_file, nope) |
716 | + e = raises(IOError, inst._import_file, nope) |
717 | self.assertEqual( |
718 | str(e), |
719 | '[Errno 13] Permission denied: %r' % nope |
720 | @@ -351,7 +397,7 @@ |
721 | |
722 | # Test with new file |
723 | size = path.getsize(src1) |
724 | - (action, doc) = inst.import_file(src1) |
725 | + (action, doc) = inst._import_file(src1) |
726 | |
727 | self.assertEqual(action, 'imported') |
728 | self.assertEqual( |
729 | @@ -368,10 +414,9 @@ |
730 | 'stored', |
731 | |
732 | 'import_id', |
733 | - 'qid', |
734 | 'mtime', |
735 | - 'basename', |
736 | - 'dirname', |
737 | + 'name', |
738 | + 'dir', |
739 | 'content_type', |
740 | ]) |
741 | ) |
742 | @@ -384,42 +429,235 @@ |
743 | self.assertEqual(doc['bytes'], size) |
744 | self.assertEqual(doc['ext'], 'mov') |
745 | |
746 | - self.assertEqual(doc['import_id'], None) |
747 | - self.assertEqual(doc['qid'], mov_qid) |
748 | + self.assertEqual(doc['import_id'], inst._id) |
749 | self.assertEqual(doc['mtime'], path.getmtime(src1)) |
750 | - self.assertEqual(doc['basename'], 'MVI_5751.MOV') |
751 | - self.assertEqual(doc['dirname'], 'DCIM/100EOS5D2') |
752 | + self.assertEqual(doc['name'], 'MVI_5751.MOV') |
753 | + self.assertEqual(doc['dir'], 'DCIM/100EOS5D2') |
754 | self.assertEqual(doc['content_type'], 'video/quicktime') |
755 | |
756 | - self.assertEqual(inst.get_stats(), |
757 | - { |
758 | - 'imported': { |
759 | - 'count': 1, |
760 | - 'bytes': size, |
761 | - }, |
762 | - 'skipped': { |
763 | - 'count': 0, |
764 | - 'bytes': 0, |
765 | - }, |
766 | - } |
767 | - ) |
768 | - |
769 | # Test with duplicate |
770 | - (action, wrapper) = inst.import_file(src2) |
771 | - self.assertEqual(action, 'skipped') |
772 | - doc2 = dict(wrapper) |
773 | - doc2['_attachments'] = doc['_attachments'] |
774 | - self.assertEqual(doc2, doc) |
775 | - self.assertEqual(inst.get_stats(), |
776 | - { |
777 | - 'imported': { |
778 | - 'count': 1, |
779 | - 'bytes': size, |
780 | - }, |
781 | - 'skipped': { |
782 | - 'count': 1, |
783 | - 'bytes': size, |
784 | - }, |
785 | + (action, doc) = inst._import_file(src2) |
786 | + self.assertEqual(action, 'skipped') |
787 | + self.assertEqual(doc, inst.db[mov_hash]) |
788 | + |
789 | + # Test with duplicate with missing doc |
790 | + del inst.db[mov_hash] |
791 | + (action, doc) = inst._import_file(src2) |
792 | + self.assertEqual(action, 'skipped') |
793 | + self.assertEqual(doc['time'], inst.db[mov_hash]['time']) |
794 | + |
795 | + # Test with duplicate when doc is missing this filestore in store: |
796 | + old = inst.db[mov_hash] |
797 | + rid = random_id() |
798 | + old['stored'] = {rid: {'copies': 2, 'time': 1234567890}} |
799 | + inst.db.save(old) |
800 | + (action, doc) = inst._import_file(src2) |
801 | + fid = inst.filestore._id |
802 | + self.assertEqual(action, 'skipped') |
803 | + self.assertEqual(set(doc['stored']), set([rid, fid])) |
804 | + t = doc['stored'][fid]['time'] |
805 | + self.assertEqual( |
806 | + doc['stored'], |
807 | + { |
808 | + rid: {'copies': 2, 'time': 1234567890}, |
809 | + fid: {'copies': 1, 'time': t}, |
810 | + } |
811 | + ) |
812 | + self.assertEqual(inst.db[mov_hash]['stored'], doc['stored']) |
813 | + |
814 | + # Test with existing doc but missing file: |
815 | + old = inst.db[mov_hash] |
816 | + inst.filestore.remove(mov_hash, 'mov') |
817 | + (action, doc) = inst._import_file(src2) |
818 | + self.assertEqual(action, 'imported') |
819 | + self.assertEqual(doc['_rev'], old['_rev']) |
820 | + self.assertEqual(doc['time'], old['time']) |
821 | + self.assertEqual(inst.db[mov_hash], old) |
822 | + |
823 | + # Test with empty file: |
824 | + src3 = tmp.touch('DCIM', '100EOS5D2', 'foo.MOV') |
825 | + (action, doc) = inst._import_file(src3) |
826 | + self.assertEqual(action, 'empty') |
827 | + self.assertEqual(doc, None) |
828 | + |
829 | + def test_import_file(self): |
830 | + """ |
831 | + Test the `Importer.import_file()` method. |
832 | + """ |
833 | + tmp = TempDir() |
834 | + inst = self.new(tmp.path) |
835 | + inst.start() |
836 | + |
837 | + self.assertEqual(inst.doc['log']['error'], []) |
838 | + self.assertEqual(inst._processed, []) |
839 | + |
840 | + # Test that AmbiguousPath is raised: |
841 | + nope1 = '/home/foo/.dmedia/../.ssh/id_rsa' |
842 | + abspath = '/home/foo/.ssh/id_rsa' |
843 | + (action, error1) = inst.import_file(nope1, 17) |
844 | + self.assertEqual(action, 'error') |
845 | + self.assertEqual(error1, { |
846 | + 'src': nope1, |
847 | + 'name': 'AmbiguousPath', |
848 | + 'msg': '%r resolves to %r' % (nope1, abspath), |
849 | + }) |
850 | + self.assertEqual( |
851 | + inst.doc['log']['error'], |
852 | + [error1] |
853 | + ) |
854 | + self.assertEqual( |
855 | + inst._processed, |
856 | + [nope1] |
857 | + ) |
858 | + |
859 | + # Test that IOError propagates up with missing file |
860 | + nope2 = tmp.join('nope.mov') |
861 | + (action, error2) = inst.import_file(nope2, 18) |
862 | + self.assertEqual(action, 'error') |
863 | + self.assertEqual(error2, { |
864 | + 'src': nope2, |
865 | + 'name': 'IOError', |
866 | + 'msg': '[Errno 2] No such file or directory: %r' % nope2, |
867 | + }) |
868 | + self.assertEqual( |
869 | + inst.doc['log']['error'], |
870 | + [error1, error2] |
871 | + ) |
872 | + self.assertEqual( |
873 | + inst._processed, |
874 | + [nope1, nope2] |
875 | + ) |
876 | + |
877 | + # Test that IOError propagates up with unreadable file |
878 | + nope3 = tmp.touch('nope.mov') |
879 | + os.chmod(nope3, 0o000) |
880 | + try: |
881 | + (action, error3) = inst.import_file(nope3, 19) |
882 | + self.assertEqual(action, 'error') |
883 | + self.assertEqual(error3, { |
884 | + 'src': nope3, |
885 | + 'name': 'IOError', |
886 | + 'msg': '[Errno 13] Permission denied: %r' % nope3, |
887 | + }) |
888 | + self.assertEqual( |
889 | + inst.doc['log']['error'], |
890 | + [error1, error2, error3] |
891 | + ) |
892 | + self.assertEqual( |
893 | + inst._processed, |
894 | + [nope1, nope2, nope3] |
895 | + ) |
896 | + finally: |
897 | + os.chmod(nope3, 0o600) |
898 | + |
899 | + |
900 | + # Test with new files |
901 | + src1 = tmp.copy(sample_mov, 'DCIM', '100EOS5D2', 'MVI_5751.MOV') |
902 | + src2 = tmp.copy(sample_thm, 'DCIM', '100EOS5D2', 'MVI_5751.THM') |
903 | + self.assertEqual(inst.doc['log']['imported'], []) |
904 | + |
905 | + (action, imported1) = inst.import_file(src1, 17) |
906 | + self.assertEqual(action, 'imported') |
907 | + self.assertEqual(imported1, { |
908 | + 'src': src1, |
909 | + 'id': mov_hash, |
910 | + }) |
911 | + self.assertEqual( |
912 | + inst.doc['log']['imported'], |
913 | + [imported1] |
914 | + ) |
915 | + self.assertEqual( |
916 | + inst._processed, |
917 | + [nope1, nope2, nope3, src1] |
918 | + ) |
919 | + |
920 | + (action, imported2) = inst.import_file(src2, 17) |
921 | + self.assertEqual(action, 'imported') |
922 | + self.assertEqual(imported2, { |
923 | + 'src': src2, |
924 | + 'id': thm_hash, |
925 | + }) |
926 | + self.assertEqual( |
927 | + inst.doc['log']['imported'], |
928 | + [imported1, imported2] |
929 | + ) |
930 | + self.assertEqual( |
931 | + inst._processed, |
932 | + [nope1, nope2, nope3, src1, src2] |
933 | + ) |
934 | + |
935 | + # Test with duplicate files |
936 | + dup1 = tmp.copy(sample_mov, 'DCIM', '100EOS5D2', 'MVI_5750.MOV') |
937 | + dup2 = tmp.copy(sample_thm, 'DCIM', '100EOS5D2', 'MVI_5750.THM') |
938 | + self.assertEqual(inst.doc['log']['skipped'], []) |
939 | + |
940 | + (action, skipped1) = inst.import_file(dup1, 17) |
941 | + self.assertEqual(action, 'skipped') |
942 | + self.assertEqual(skipped1, { |
943 | + 'src': dup1, |
944 | + 'id': mov_hash, |
945 | + }) |
946 | + self.assertEqual( |
947 | + inst.doc['log']['skipped'], |
948 | + [skipped1] |
949 | + ) |
950 | + self.assertEqual( |
951 | + inst._processed, |
952 | + [nope1, nope2, nope3, src1, src2, dup1] |
953 | + ) |
954 | + |
955 | + (action, skipped2) = inst.import_file(dup2, 17) |
956 | + self.assertEqual(action, 'skipped') |
957 | + self.assertEqual(skipped2, { |
958 | + 'src': dup2, |
959 | + 'id': thm_hash, |
960 | + }) |
961 | + self.assertEqual( |
962 | + inst.doc['log']['skipped'], |
963 | + [skipped1, skipped2] |
964 | + ) |
965 | + self.assertEqual( |
966 | + inst._processed, |
967 | + [nope1, nope2, nope3, src1, src2, dup1, dup2] |
968 | + ) |
969 | + |
970 | + # Test with empty files |
971 | + emp1 = tmp.touch('DCIM', '100EOS5D2', 'MVI_5759.MOV') |
972 | + emp2 = tmp.touch('DCIM', '100EOS5D2', 'MVI_5759.THM') |
973 | + self.assertEqual(inst.doc['log']['empty'], []) |
974 | + |
975 | + (action, empty1) = inst.import_file(emp1, 17) |
976 | + self.assertEqual(action, 'empty') |
977 | + self.assertEqual(empty1, emp1) |
978 | + self.assertEqual( |
979 | + inst.doc['log']['empty'], |
980 | + [empty1] |
981 | + ) |
982 | + self.assertEqual( |
983 | + inst._processed, |
984 | + [nope1, nope2, nope3, src1, src2, dup1, dup2, emp1] |
985 | + ) |
986 | + |
987 | + (action, empty2) = inst.import_file(emp2, 17) |
988 | + self.assertEqual(action, 'empty') |
989 | + self.assertEqual(empty2, emp2) |
990 | + self.assertEqual( |
991 | + inst.doc['log']['empty'], |
992 | + [empty1, empty2] |
993 | + ) |
994 | + self.assertEqual( |
995 | + inst._processed, |
996 | + [nope1, nope2, nope3, src1, src2, dup1, dup2, emp1, emp2] |
997 | + ) |
998 | + |
999 | + # Check state of log one final time |
1000 | + self.assertEqual( |
1001 | + inst.doc['log'], |
1002 | + { |
1003 | + 'imported': [imported1, imported2], |
1004 | + 'skipped': [skipped1, skipped2], |
1005 | + 'empty': [empty1, empty2], |
1006 | + 'error': [error1, error2, error3], |
1007 | } |
1008 | ) |
1009 | |
1010 | @@ -430,87 +668,30 @@ |
1011 | src1 = tmp.copy(sample_mov, 'DCIM', '100EOS5D2', 'MVI_5751.MOV') |
1012 | dup1 = tmp.copy(sample_mov, 'DCIM', '100EOS5D2', 'MVI_5752.MOV') |
1013 | src2 = tmp.copy(sample_thm, 'DCIM', '100EOS5D2', 'MVI_5751.THM') |
1014 | + src3 = tmp.touch('DCIM', '100EOS5D2', 'Zar.MOV') |
1015 | + src4 = tmp.touch('DCIM', '100EOS5D2', 'Zoo.MOV') |
1016 | + |
1017 | |
1018 | import_id = inst.start() |
1019 | + inst.scanfiles() |
1020 | items = tuple(inst.import_all_iter()) |
1021 | - self.assertEqual(len(items), 3) |
1022 | + self.assertEqual(len(items), 5) |
1023 | self.assertEqual( |
1024 | - [t[:2] for t in items], |
1025 | - [ |
1026 | + items, |
1027 | + ( |
1028 | (src1, 'imported'), |
1029 | (src2, 'imported'), |
1030 | (dup1, 'skipped'), |
1031 | - ] |
1032 | - ) |
1033 | - |
1034 | - doc = items[0][2] |
1035 | - self.assertEqual(schema.check_dmedia_file(doc), None) |
1036 | - self.assertEqual(doc, |
1037 | - { |
1038 | - '_id': mov_hash, |
1039 | - '_rev': doc['_rev'], |
1040 | - '_attachments': { |
1041 | - 'leaves': { |
1042 | - 'data': b64encode(''.join(mov_leaves)), |
1043 | - 'content_type': 'application/octet-stream', |
1044 | - } |
1045 | - }, |
1046 | - 'type': 'dmedia/file', |
1047 | - 'time': doc['time'], |
1048 | - 'bytes': path.getsize(src1), |
1049 | - 'ext': 'mov', |
1050 | - 'origin': 'user', |
1051 | - 'stored': { |
1052 | - inst.filestore._id: { |
1053 | - 'copies': 1, |
1054 | - 'time': doc['time'], |
1055 | - }, |
1056 | - }, |
1057 | - |
1058 | - 'import_id': import_id, |
1059 | - 'qid': mov_qid, |
1060 | - 'mtime': path.getmtime(src1), |
1061 | - 'basename': 'MVI_5751.MOV', |
1062 | - 'dirname': 'DCIM/100EOS5D2', |
1063 | - 'content_type': 'video/quicktime', |
1064 | - } |
1065 | - ) |
1066 | - |
1067 | - doc = items[1][2] |
1068 | - self.assertEqual(schema.check_dmedia_file(doc), None) |
1069 | - self.assertEqual(doc, |
1070 | - { |
1071 | - '_id': thm_hash, |
1072 | - '_rev': doc['_rev'], |
1073 | - '_attachments': { |
1074 | - 'leaves': { |
1075 | - 'data': b64encode(''.join(thm_leaves)), |
1076 | - 'content_type': 'application/octet-stream', |
1077 | - } |
1078 | - }, |
1079 | - 'type': 'dmedia/file', |
1080 | - 'time': doc['time'], |
1081 | - 'bytes': path.getsize(src2), |
1082 | - 'ext': 'thm', |
1083 | - 'origin': 'user', |
1084 | - 'stored': { |
1085 | - inst.filestore._id: { |
1086 | - 'copies': 1, |
1087 | - 'time': doc['time'], |
1088 | - }, |
1089 | - }, |
1090 | - |
1091 | - 'import_id': import_id, |
1092 | - 'qid': thm_qid, |
1093 | - 'mtime': path.getmtime(src2), |
1094 | - 'basename': 'MVI_5751.THM', |
1095 | - 'dirname': 'DCIM/100EOS5D2', |
1096 | - 'content_type': None, |
1097 | - } |
1098 | - ) |
1099 | - |
1100 | + (src3, 'empty'), |
1101 | + (src4, 'empty'), |
1102 | + ) |
1103 | + ) |
1104 | self.assertEqual(inst.finalize(), |
1105 | { |
1106 | + 'considered': { |
1107 | + 'count': 5, |
1108 | + 'bytes': path.getsize(src1) * 2 + path.getsize(src2), |
1109 | + }, |
1110 | 'imported': { |
1111 | 'count': 2, |
1112 | 'bytes': path.getsize(src1) + path.getsize(src2), |
1113 | @@ -519,6 +700,8 @@ |
1114 | 'count': 1, |
1115 | 'bytes': path.getsize(dup1), |
1116 | }, |
1117 | + 'empty': {'count': 2, 'bytes': 0}, |
1118 | + 'error': {'count': 0, 'bytes': 0}, |
1119 | } |
1120 | ) |
1121 | |
1122 | @@ -569,7 +752,7 @@ |
1123 | dict( |
1124 | signal='progress', |
1125 | args=(base, _id, 1, 3, |
1126 | - dict(action='imported', src=src1, _id=mov_hash) |
1127 | + dict(action='imported', src=src1) |
1128 | ), |
1129 | worker='ImportWorker', |
1130 | pid=pid, |
1131 | @@ -579,7 +762,7 @@ |
1132 | dict( |
1133 | signal='progress', |
1134 | args=(base, _id, 2, 3, |
1135 | - dict(action='imported', src=src2, _id=thm_hash) |
1136 | + dict(action='imported', src=src2) |
1137 | ), |
1138 | worker='ImportWorker', |
1139 | pid=pid, |
1140 | @@ -589,7 +772,7 @@ |
1141 | dict( |
1142 | signal='progress', |
1143 | args=(base, _id, 3, 3, |
1144 | - dict(action='skipped', src=dup1, _id=mov_hash) |
1145 | + dict(action='skipped', src=dup1) |
1146 | ), |
1147 | worker='ImportWorker', |
1148 | pid=pid, |
1149 | @@ -601,8 +784,11 @@ |
1150 | signal='finished', |
1151 | args=(base, _id, |
1152 | dict( |
1153 | + considered={'count': 3, 'bytes': (mov_size*2 + thm_size)}, |
1154 | imported={'count': 2, 'bytes': (mov_size + thm_size)}, |
1155 | skipped={'count': 1, 'bytes': mov_size}, |
1156 | + empty={'count': 0, 'bytes': 0}, |
1157 | + error={'count': 0, 'bytes': 0}, |
1158 | ), |
1159 | ), |
1160 | worker='ImportWorker', |
1161 | @@ -629,7 +815,7 @@ |
1162 | inst._start_batch() |
1163 | self.assertEqual(inst._completed, 0) |
1164 | self.assertEqual(inst._total, 0) |
1165 | - batch = inst._batch |
1166 | + batch = inst.doc |
1167 | batch_id = batch['_id'] |
1168 | self.assertTrue(isinstance(batch, dict)) |
1169 | self.assertEqual( |
1170 | @@ -639,9 +825,9 @@ |
1171 | 'type', |
1172 | 'time', |
1173 | 'imports', |
1174 | - 'imported', |
1175 | - 'skipped', |
1176 | + 'errors', |
1177 | 'machine_id', |
1178 | + 'stats', |
1179 | ]) |
1180 | ) |
1181 | self.assertEqual(batch['type'], 'dmedia/batch') |
1182 | @@ -662,10 +848,12 @@ |
1183 | callback = DummyCallback() |
1184 | inst = self.klass(callback, self.dbname) |
1185 | batch_id = random_id() |
1186 | - inst._batch = dict( |
1187 | + inst.doc = dict( |
1188 | _id=batch_id, |
1189 | - imported={'count': 17, 'bytes': 98765}, |
1190 | - skipped={'count': 3, 'bytes': 12345}, |
1191 | + stats=dict( |
1192 | + imported={'count': 17, 'bytes': 98765}, |
1193 | + skipped={'count': 3, 'bytes': 12345}, |
1194 | + ), |
1195 | ) |
1196 | |
1197 | # Make sure it checks that workers is empty |
1198 | @@ -676,7 +864,7 @@ |
1199 | # Check that it fires signal correctly |
1200 | inst._workers.clear() |
1201 | inst._finish_batch() |
1202 | - self.assertEqual(inst._batch, None) |
1203 | + self.assertEqual(inst.doc, None) |
1204 | stats = dict( |
1205 | imported=17, |
1206 | imported_bytes=98765, |
1207 | @@ -695,20 +883,48 @@ |
1208 | set([ |
1209 | '_id', |
1210 | '_rev', |
1211 | - 'imported', |
1212 | - 'skipped', |
1213 | + 'stats', |
1214 | 'time_end', |
1215 | ]) |
1216 | ) |
1217 | cur = time.time() |
1218 | self.assertTrue(cur - 1 <= doc['time_end'] <= cur) |
1219 | |
1220 | + def test_on_error(self): |
1221 | + callback = DummyCallback() |
1222 | + inst = self.klass(callback, self.dbname) |
1223 | + |
1224 | + # Make sure it works when doc is None: |
1225 | + inst.on_error('foo', 'IOError', 'nope') |
1226 | + self.assertEqual(inst.doc, None) |
1227 | + |
1228 | + # Test normally: |
1229 | + inst._start_batch() |
1230 | + self.assertEqual(inst.doc['errors'], []) |
1231 | + inst.on_error('foo', 'IOError', 'nope') |
1232 | + doc = inst.db[inst.doc['_id']] |
1233 | + self.assertEqual( |
1234 | + doc['errors'], |
1235 | + [ |
1236 | + {'key': 'foo', 'name': 'IOError', 'msg': 'nope'}, |
1237 | + ] |
1238 | + ) |
1239 | + inst.on_error('bar', 'error!', 'no way') |
1240 | + doc = inst.db[inst.doc['_id']] |
1241 | + self.assertEqual( |
1242 | + doc['errors'], |
1243 | + [ |
1244 | + {'key': 'foo', 'name': 'IOError', 'msg': 'nope'}, |
1245 | + {'key': 'bar', 'name': 'error!', 'msg': 'no way'}, |
1246 | + ] |
1247 | + ) |
1248 | + |
1249 | def test_on_started(self): |
1250 | callback = DummyCallback() |
1251 | inst = self.klass(callback, self.dbname) |
1252 | self.assertEqual(callback.messages, []) |
1253 | inst._start_batch() |
1254 | - batch_id = inst._batch['_id'] |
1255 | + batch_id = inst.doc['_id'] |
1256 | self.assertEqual(inst.db[batch_id]['imports'], []) |
1257 | self.assertEqual( |
1258 | callback.messages, |
1259 | @@ -816,10 +1032,12 @@ |
1260 | callback = DummyCallback() |
1261 | inst = self.klass(callback, self.dbname) |
1262 | batch_id = random_id() |
1263 | - inst._batch = dict( |
1264 | + inst.doc = dict( |
1265 | _id=batch_id, |
1266 | - imported={'count': 0, 'bytes': 0}, |
1267 | - skipped={'count': 0, 'bytes': 0}, |
1268 | + stats=dict( |
1269 | + imported={'count': 0, 'bytes': 0}, |
1270 | + skipped={'count': 0, 'bytes': 0}, |
1271 | + ), |
1272 | ) |
1273 | |
1274 | # Call with first import |
1275 | @@ -843,16 +1061,16 @@ |
1276 | ] |
1277 | ) |
1278 | self.assertEqual( |
1279 | - set(inst._batch), |
1280 | - set(['_id', '_rev', 'imported', 'skipped']) |
1281 | + set(inst.doc), |
1282 | + set(['_id', '_rev', 'stats']) |
1283 | ) |
1284 | - self.assertEqual(inst._batch['_id'], batch_id) |
1285 | + self.assertEqual(inst.doc['_id'], batch_id) |
1286 | self.assertEqual( |
1287 | - inst._batch['imported'], |
1288 | + inst.doc['stats']['imported'], |
1289 | {'count': 17, 'bytes': 98765} |
1290 | ) |
1291 | self.assertEqual( |
1292 | - inst._batch['skipped'], |
1293 | + inst.doc['stats']['skipped'], |
1294 | {'count': 3, 'bytes': 12345} |
1295 | ) |
1296 | |
1297 | @@ -884,16 +1102,16 @@ |
1298 | ] |
1299 | ) |
1300 | self.assertEqual( |
1301 | - set(inst._batch), |
1302 | - set(['_id', '_rev', 'imported', 'skipped']) |
1303 | + set(inst.doc), |
1304 | + set(['_id', '_rev', 'stats']) |
1305 | ) |
1306 | - self.assertEqual(inst._batch['_id'], batch_id) |
1307 | + self.assertEqual(inst.doc['_id'], batch_id) |
1308 | self.assertEqual( |
1309 | - inst._batch['imported'], |
1310 | + inst.doc['stats']['imported'], |
1311 | {'count': 17 + 18, 'bytes': 98765 + 9876} |
1312 | ) |
1313 | self.assertEqual( |
1314 | - inst._batch['skipped'], |
1315 | + inst.doc['stats']['skipped'], |
1316 | {'count': 3 + 5, 'bytes': 12345 + 1234} |
1317 | ) |
1318 | |
1319 | @@ -948,21 +1166,21 @@ |
1320 | self.assertEqual( |
1321 | callback.messages[3], |
1322 | ('ImportProgress', (base, import_id, 1, 3, |
1323 | - dict(action='imported', src=src1, _id=mov_hash) |
1324 | + dict(action='imported', src=src1) |
1325 | ) |
1326 | ) |
1327 | ) |
1328 | self.assertEqual( |
1329 | callback.messages[4], |
1330 | ('ImportProgress', (base, import_id, 2, 3, |
1331 | - dict(action='imported', src=src2, _id=thm_hash) |
1332 | + dict(action='imported', src=src2) |
1333 | ) |
1334 | ) |
1335 | ) |
1336 | self.assertEqual( |
1337 | callback.messages[5], |
1338 | ('ImportProgress', (base, import_id, 3, 3, |
1339 | - dict(action='skipped', src=dup1, _id=mov_hash) |
1340 | + dict(action='skipped', src=dup1) |
1341 | ) |
1342 | ) |
1343 | ) |
1344 | |
1345 | === modified file 'dmedia/tests/test_metastore.py' |
1346 | --- dmedia/tests/test_metastore.py 2011-01-30 21:24:30 +0000 |
1347 | +++ dmedia/tests/test_metastore.py 2011-02-19 04:44:09 +0000 |
1348 | @@ -124,42 +124,6 @@ |
1349 | self.assertEqual(inst.machine_id, _id) |
1350 | self.assertEqual(inst._machine_id, _id) |
1351 | |
1352 | - def test_by_quickid(self): |
1353 | - mov_chash = 'OMLUWEIPEUNRGYMKAEHG3AEZPVZ5TUQE' |
1354 | - mov_qid = 'GJ4AQP3BK3DMTXYOLKDK6CW4QIJJGVMN' |
1355 | - thm_chash = 'F6ATTKI6YVWVRBQQESAZ4DSUXQ4G457A' |
1356 | - thm_qid = 'EYCDXXCNDB6OIIX5DN74J7KEXLNCQD5M' |
1357 | - inst = self.new() |
1358 | - self.assertEqual( |
1359 | - list(inst.by_quickid(mov_qid)), |
1360 | - [] |
1361 | - ) |
1362 | - inst.db.create( |
1363 | - {'_id': thm_chash, 'qid': thm_qid, 'type': 'dmedia/file'} |
1364 | - ) |
1365 | - self.assertEqual( |
1366 | - list(inst.by_quickid(mov_qid)), |
1367 | - [] |
1368 | - ) |
1369 | - inst.db.create( |
1370 | - {'_id': mov_chash, 'qid': mov_qid, 'type': 'dmedia/file'} |
1371 | - ) |
1372 | - self.assertEqual( |
1373 | - list(inst.by_quickid(mov_qid)), |
1374 | - [mov_chash] |
1375 | - ) |
1376 | - self.assertEqual( |
1377 | - list(inst.by_quickid(thm_qid)), |
1378 | - [thm_chash] |
1379 | - ) |
1380 | - inst.db.create( |
1381 | - {'_id': 'should-not-happen', 'qid': mov_qid, 'type': 'dmedia/file'} |
1382 | - ) |
1383 | - self.assertEqual( |
1384 | - list(inst.by_quickid(mov_qid)), |
1385 | - [mov_chash, 'should-not-happen'] |
1386 | - ) |
1387 | - |
1388 | def test_total_bytes(self): |
1389 | inst = self.new() |
1390 | self.assertEqual(inst.total_bytes(), 0) |
1391 | |
1392 | === modified file 'dmedia/util.py' |
1393 | --- dmedia/util.py 2011-02-15 07:53:55 +0000 |
1394 | +++ dmedia/util.py 2011-02-19 04:44:09 +0000 |
1395 | @@ -43,12 +43,15 @@ |
1396 | def configure_logging(namespace): |
1397 | format = [ |
1398 | '%(levelname)s', |
1399 | - '%(message)s' |
1400 | + '%(process)d', |
1401 | + '%(message)s', |
1402 | ] |
1403 | cache = path.join(xdg.BaseDirectory.xdg_cache_home, 'dmedia') |
1404 | if not path.exists(cache): |
1405 | os.makedirs(cache) |
1406 | filename = path.join(cache, namespace + '.log') |
1407 | + if path.exists(filename): |
1408 | + os.rename(filename, filename + '.previous') |
1409 | logging.basicConfig( |
1410 | filename=filename, |
1411 | filemode='w', |
1412 | |
1413 | === modified file 'dmedia/workers.py' |
1414 | --- dmedia/workers.py 2011-02-07 07:19:55 +0000 |
1415 | +++ dmedia/workers.py 2011-02-19 04:44:09 +0000 |
1416 | @@ -165,7 +165,7 @@ |
1417 | pass |
1418 | |
1419 | def _process_message(self, msg): |
1420 | - log.info('%(signal)s %(args)r', msg) |
1421 | + log.info('[From %(pid)d] %(signal)s %(args)r', msg) |
1422 | with self._lock: |
1423 | signal = msg['signal'] |
1424 | args = msg['args'] |
Perhaps bad form, but I'm approving this myself. I'm excited about reviews and they've already proven a great way to get more people engaged in the code, but I'm reserving the right to self-approve when needed. If there aren't takers for a review, I can't let that hold things up for too long. Velocity needs to stay as high as possible.
This is an important change and needs to get some abuse through the daily builds before 0.4 is released, so I don't want to wait any longer on this one.
"Jason, looks great! --Jason"