Merge lp:~jderose/filestore/helpers into lp:filestore

Proposed by Jason Gerard DeRose
Status: Merged
Merged at revision: 190
Proposed branch: lp:~jderose/filestore/helpers
Merge into: lp:filestore
Diff against target: 453 lines (+251/-50)
2 files modified
filestore.py (+95/-42)
test_filestore.py (+156/-8)
To merge this branch: bzr merge lp:~jderose/filestore/helpers
Reviewer: David Jordan
Review status: Approve
Review via email: mp+74932@code.launchpad.net

Description of the change

This merge:

* Adds 3 new fairly high-level FileStore methods (see the usage sketch after this list):

  FileStore.content_md5(_id) - compute a known-correct MD5 hash of the file with *_id* (for uploading to S3 using boto)

  FileStore.verify_and_move(tmp_fp, _id) - verify the content hash of a file in .dmedia/partial/ and, if correct, move the file into its canonical location. This replaces an equivalent method in the legacy FileStore that is used by the S3 and BitTorrent backends.

  FileStore.hash_and_move(tmp_fp) - compute the content hash of a file in .dmedia/tmp/, then move the file into its canonical location. This replaces an equivalent method in the legacy FileStore that is used by the transcoder.

* Makes the use of the `Leaf` namedtuple consistent (see the hashing sketch after this list). Experience has shown the code is simpler and more robust when you keep track of which position in the file (leaf_index) a given chunk of leaf_data corresponds to. As of several revisions ago, when a leaf is read by reader() or batch_reader(), the index and data are bundled together right there in a Leaf(index, data) namedtuple. But this merge fixes some inconsistencies higher up:

  - Consumers of `Leaf` no longer unpack it into (i, data)

  - Consumers of `Leaf` yield/return the exact `Leaf` rather than (i, data)

  - Hasher.update() has been renamed to the clearer Hasher.hash_leaf()

  - Hasher.hash_leaf() now takes a `Leaf` instead of `bytes`

  - Hasher is now more robust, as it checks that the Leaf.index coming from the readers matches its expected leaf_index
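
For illustration, here is a minimal sketch of how the three new methods might be used (the parent directory and file ID are hypothetical, and error handling is omitted):

    from os import path
    from filestore import FileStore

    fs = FileStore('/home/jderose')  # hypothetical parent directory
    _id = 'YDDL5ROVABZP4NBSJPC3HUQDVDAGAP5L26YFXD3UR6N5OLVN'  # hypothetical ID

    # Download backends (S3, BitTorrent): the ID is known up front, so verify
    # the file in .dmedia/partial/, then move it into its canonical location:
    tmp_fp = open(fs.partial_path(_id), 'rb')
    ch = fs.verify_and_move(tmp_fp, _id)

    # Known-correct MD5 of the (now canonical) file, e.g. for the Content-MD5
    # header when uploading to S3 with boto:
    (hex_md5, b64_md5) = fs.content_md5(_id)

    # Transcoder: the ID isn't known until the content exists, so hash the
    # file in .dmedia/tmp/, then move it into its canonical location:
    tmp_fp = open(path.join(fs.tmp, 'foo.mov'), 'rb')
    ch = fs.hash_and_move(tmp_fp)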
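
Likewise, a sketch of the consistent `Leaf` flow through `Hasher` (mirroring the updated reader_iter() doctest in the diff below; the file path is hypothetical):

    from filestore import Hasher, reader_iter

    h = Hasher()
    src_fp = open('/my/file', 'rb')
    for leaf in reader_iter(src_fp):  # each item is a Leaf(index, data) namedtuple
        h.hash_leaf(leaf)  # raises if leaf.index doesn't match the expected index
    ch = h.content_hash()  # also sets Hasher.closed = True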

David Jordan (dmj726) wrote:

Looks a lot simpler. Approved.

review: Approve

Preview Diff

=== modified file 'filestore.py'
--- filestore.py 2011-09-11 17:13:58 +0000
+++ filestore.py 2011-09-12 03:19:24 +0000
@@ -298,7 +298,8 @@
 import tempfile
 import stat
 from subprocess import check_call, CalledProcessError
-from base64 import b32encode, b32decode
+from base64 import b32encode, b32decode, b64encode
+import hashlib
 from threading import Thread
 from queue import Queue
 from collections import namedtuple
@@ -724,25 +725,26 @@
         self.array = bytearray()
         self.closed = False
 
-    def update(self, leaf):
-        assert isinstance(leaf, bytes)
-        assert 1 <= len(leaf) <= LEAF_SIZE
-        assert not self.closed
-        if len(leaf) < LEAF_SIZE:
+    def hash_leaf(self, leaf):
+        if not isinstance(leaf, Leaf):
+            raise TypeError(TYPE_ERROR.format('leaf', Leaf, type(leaf), leaf))
+        if self.closed:
+            raise Exception('Cannot call Hasher.hash_leaf() when Hasher.closed')
+        if leaf.index != self.leaf_index:
+            raise Exception('Expected leaf.index {}, got {}'.format(
+                self.leaf_index, leaf.index)
+            )
+
+        if len(leaf.data) < LEAF_SIZE:
             self.closed = True
-
-        leaf_hash = hash_leaf(self.leaf_index, leaf)
+        leaf_hash = hash_leaf(leaf.index, leaf.data)
         self.array.extend(leaf_hash)
-        self.file_size += len(leaf)
-
-        low = self.leaf_index * LEAF_SIZE + 1
-        high = (self.leaf_index + 1) * LEAF_SIZE
-        assert low <= self.file_size <= high
-
+        self.file_size += len(leaf.data)
         self.leaf_index += 1
         return leaf_hash
 
     def content_hash(self):
+        self.closed = True
         leaf_hashes = bytes(self.array)
         return ContentHash(
             hash_root(self.file_size, leaf_hashes),
@@ -824,14 +826,14 @@
     assert isinstance(src_fp, (io.BufferedReader, io.BufferedRandom))
     assert src_fp.mode in ('rb', 'rb+')
     src_fp.seek(0)
-    i = 0
+    index = 0
     while True:
         data = src_fp.read(LEAF_SIZE)
         if not data:
             queue.put(None)
             break
-        queue.put(Leaf(i, data))
-        i += 1
+        queue.put(Leaf(index, data))
+        index += 1
 
 
 def batch_reader(batch, queue):
@@ -890,8 +892,7 @@
     """
     Iterate through leaves in *src_fp*, reading in a separate thread.
 
-    The function yields a ``(leaf_index, leaf_data)`` tuple for each leaf in
-    *src_fp*.
+    The function yields a `Leaf` namedtuple for each leaf in *src_fp*.
 
     Many operations in the `FileStore` involve a loop where an 8 MiB leaf is
     read, then hashed, then possibly written to disk. As the hashing
@@ -904,8 +905,8 @@
     >>> def example():
    ...     h = Hasher()
    ...     src_fp = open('/my/file', 'rb')
-    ...     for (leaf_index, leaf) in reader_iter(src_fp):
-    ...         h.update(leaf)
+    ...     for leaf in reader_iter(src_fp):
+    ...         h.hash_leaf(leaf)
     ...
 
     The thread target is the `reader()` function.
@@ -987,7 +988,7 @@
         leaf = q.get()
         if leaf is EndFile:
             break
-        h.update(leaf.data)
+        h.hash_leaf(leaf)
         for tmp_fp in temps:
             tmp_fp.write(leaf.data)
     ch = h.content_hash()
@@ -1013,10 +1014,10 @@
     :param dst_fp: optional file opened in ``'wb'`` mode
     """
     hasher = Hasher()
-    for (i, leaf) in reader_iter(src_fp):
-        hasher.update(leaf)
+    for leaf in reader_iter(src_fp):
+        hasher.hash_leaf(leaf)
         if dst_fp:
-            dst_fp.write(leaf)
+            dst_fp.write(leaf.data)
     return hasher.content_hash()
 
 
@@ -1296,37 +1297,61 @@
         if size != file_size:
             corrupt = self.move_to_corrupt(src_fp, _id)
             raise SizeIntegrityError(self.parent, _id, file_size, size)
-        for (i, leaf) in reader_iter(src_fp):
-            got = hash_leaf(i, leaf)
-            expected = get_leaf_hash(leaf_hashes, i)
+        for leaf in reader_iter(src_fp):
+            got = hash_leaf(leaf.index, leaf.data)
+            expected = get_leaf_hash(leaf_hashes, leaf.index)
             if got != expected:
                 self.move_to_corrupt(src_fp, _id)
-                raise LeafIntegrityError(self.parent, _id, i, expected, got)
-            yield (i, leaf)
-        assert get_leaf_hash(leaf_hashes, i + 1) == b''
+                raise LeafIntegrityError(
+                    self.parent, _id, leaf.index, expected, got
+                )
+            yield leaf
+        assert get_leaf_hash(leaf_hashes, leaf.index + 1) == b''
 
     def verify_iter2(self, _id):
         """
         Yield each leaf as it's read, verifying file integrity after final leaf.
 
-        This method will yield ``(leaf_index, leaf_data)`` tuple for each leaf
-        in the file identified by *_id*. Use this method with care as the
-        integrity of the leaves is not known till after the last leaf has been
-        yielded.
+        This method will yield a `Leaf` namedtuple for each leaf in the file
+        identified by *_id*. Use this method with care as the integrity of the
+        leaves is not known till after the last leaf has been yielded.
 
         This method is similar to `FileStore.verify_iter()` except only the
         *_id* is needed.
         """
         src_fp = self.open(_id)
         h = Hasher()
-        for (i, leaf) in reader_iter(src_fp):
-            h.update(leaf)
-            yield (i, leaf)
+        for leaf in reader_iter(src_fp):
+            h.hash_leaf(leaf)
+            yield leaf
         c = h.content_hash()
         if c.id != _id:
             self.move_to_corrupt(src_fp, _id)
             raise FileIntegrityError(self.parent, _id, c.id)
 
+    def content_md5(self, _id):
+        """
+        Compute md5 hash of the file with *_id* for use in Content-MD5 header.
+
+        For example:
+
+        >>> fs = FileStore('/home/jderose') #doctest: +SKIP
+        >>> fs.content_md5('YDDL5ROVABZP4NBSJPC3HUQDVDAGAP5L26YFXD3UR6N5OLVN') #doctest: +SKIP
+        ('99ca2a74521ad7825768bbfe7fe0dc49', 'mcoqdFIa14JXaLv+f+DcSQ==')
+
+        This method guarantees the correct md5 hash is computed for the file
+        with *_id* because it verifies the file as it computes the md5 hash.
+        Note that you do not have this guarantee if you simply opened the file
+        with `FileStore.open()` and computed the md5 hash that way.
+
+        If you need the md5 hash for use with boto when uploading to S3, use
+        this method rather than letting boto compute the md5 hash itself.
+        """
+        md5 = hashlib.md5()
+        for leaf in self.verify_iter2(_id):
+            md5.update(leaf.data)
+        return (md5.hexdigest(), b64encode(md5.digest()).decode('utf-8'))
+
     def remove(self, _id):
         """
         Delete file with *_id* from underlying filesystem.
@@ -1445,7 +1470,7 @@
         """
         Move a file from its canonical location to its corrupt location.
 
-        While a file is found to be corrupt (meaning the computed content hash
+        When a file is found to be corrupt (meaning the computed content hash
         doesn't match the expected content hash), it is moved from its canonical
         location to a special corrupt location.
 
@@ -1476,6 +1501,34 @@
         src_fp.close()
         return dst
 
+    def verify_and_move(self, tmp_fp, _id):
+        allowed = (io.BufferedReader, io.BufferedRandom)
+        if not isinstance(tmp_fp, allowed):
+            raise TypeError(
+                TYPE_ERROR.format('tmp_fp', allowed, type(tmp_fp), tmp_fp)
+            )
+        if tmp_fp.name != self.partial_path(_id):
+            raise ValueError(
+                'bad partial_path() for {!r}: {!r}'.format(_id, tmp_fp.name)
+            )
+        ch = hash_fp(tmp_fp)
+        if ch.id != _id:
+            raise ValueError(
+                'expected {!r}, computed {!r}'.format(_id, ch.id)
+            )
+        self.move_to_canonical(tmp_fp, _id)
+        return ch
+
+    def hash_and_move(self, tmp_fp):
+        allowed = (io.BufferedReader, io.BufferedRandom)
+        if not isinstance(tmp_fp, allowed):
+            raise TypeError(
+                TYPE_ERROR.format('tmp_fp', allowed, type(tmp_fp), tmp_fp)
+            )
+        ch = hash_fp(tmp_fp)
+        self.move_to_canonical(tmp_fp, ch.id)
+        return ch
+
     def import_file(self, src_fp):
         """
         Atomically copy open file *src_fp* into this filestore.
@@ -1524,10 +1577,10 @@
         size = os.fstat(src_fp.fileno()).st_size
         temps = [fs.allocate_tmp(size) for fs in filestores]
         h = Hasher()
-        for (i, leaf) in reader_iter(src_fp):
-            h.update(leaf)
+        for leaf in reader_iter(src_fp):
+            h.hash_leaf(leaf)
             for tmp_fp in temps:
-                tmp_fp.write(leaf)
+                tmp_fp.write(leaf.data)
         c = h.content_hash()
         if c.id != _id:
             self.move_to_corrupt(src_fp, _id)

=== modified file 'test_filestore.py'
--- test_filestore.py 2011-09-11 09:13:31 +0000
+++ test_filestore.py 2011-09-12 03:19:24 +0000
@@ -1052,30 +1052,76 @@
         self.assertEqual(h.array, b'')
         self.assertFalse(h.closed)
 
-    def test_update(self):
-        h = filestore.Hasher()
-
-        self.assertEqual(h.update(LEAVES[0]), LEAF_HASHES[0:30])
+    def test_hash_leaf(self):
+        # Test with bad leaf type
+        h = filestore.Hasher()
+        with self.assertRaises(TypeError) as cm:
+            h.hash_leaf(b'nope')
+        self.assertEqual(
+            str(cm.exception),
+            TYPE_ERROR.format('leaf', filestore.Leaf, bytes, b'nope')
+        )
+
+        # We'll use these below
+        leaf0 = filestore.Leaf(0, LEAVES[0])
+        leaf1 = filestore.Leaf(1, LEAVES[1])
+        leaf2 = filestore.Leaf(2, LEAVES[2])
+
+        # Test when closed
+        h = filestore.Hasher()
+        h.closed = True
+        with self.assertRaises(Exception) as cm:
+            h.hash_leaf(leaf0)
+        self.assertEqual(
+            str(cm.exception),
+            'Cannot call Hasher.hash_leaf() when Hasher.closed'
+        )
+
+        # Test when leaf_index is wrong
+        h = filestore.Hasher()
+        h.leaf_index = 1
+        with self.assertRaises(Exception) as cm:
+            h.hash_leaf(leaf0)
+        self.assertEqual(
+            str(cm.exception),
+            'Expected leaf.index 1, got 0'
+        )
+
+        # Test when it's all good
+        h = filestore.Hasher()
+
+        self.assertEqual(h.hash_leaf(leaf0), LEAF_HASHES[0:30])
         self.assertEqual(h.leaf_index, 1)
         self.assertEqual(h.file_size, filestore.LEAF_SIZE)
         self.assertFalse(h.closed)
 
-        self.assertEqual(h.update(LEAVES[1]), LEAF_HASHES[30:60])
+        self.assertEqual(h.hash_leaf(leaf1), LEAF_HASHES[30:60])
         self.assertEqual(h.leaf_index, 2)
         self.assertEqual(h.file_size, filestore.LEAF_SIZE * 2)
         self.assertFalse(h.closed)
 
-        self.assertEqual(h.update(LEAVES[2]), LEAF_HASHES[60:90])
+        self.assertEqual(h.hash_leaf(leaf2), LEAF_HASHES[60:90])
         self.assertEqual(h.leaf_index, 3)
         self.assertEqual(h.file_size, sum(len(l) for l in LEAVES))
         self.assertTrue(h.closed)
 
     def test_content_hash(self):
+        leaf0 = filestore.Leaf(0, LEAVES[0])
+        leaf1 = filestore.Leaf(1, LEAVES[1])
+        leaf2 = filestore.Leaf(2, LEAVES[2])
+
         h = filestore.Hasher()
-        for l in LEAVES:
-            h.update(l)
+        for leaf in (leaf0, leaf1, leaf2):
+            h.hash_leaf(leaf)
         self.assertEqual(h.content_hash(), CH)
 
+        # Test that Hasher.content_hash() sets closed = True
+        h = filestore.Hasher()
+        h.hash_leaf(leaf0)
+        self.assertFalse(h.closed)
+        h.content_hash()
+        self.assertTrue(h.closed)
+
 
 class TestFileStore(TestCase):
     def test_init(self):
@@ -1671,6 +1717,50 @@
         self.assertFalse(path.exists(canonical))
         self.assertTrue(path.isfile(corrupt))
 
+    def test_content_md5(self):
+        tmp = TempDir()
+        fs = filestore.FileStore(tmp.dir)
+
+        canonical = fs.path(ID)
+        corrupt = fs.corrupt_path(ID)
+
+        # File doesn't exist
+        with self.assertRaises(IOError) as cm:
+            md5 = fs.content_md5(ID)
+        self.assertEqual(cm.exception.errno, 2)
+
+        # File exists:
+        fp = open(canonical, 'wb')
+        for leaf in LEAVES:
+            fp.write(leaf)
+        fp.close()
+
+        self.assertEqual(
+            fs.content_md5(ID),
+            ('99ca2a74521ad7825768bbfe7fe0dc49', 'mcoqdFIa14JXaLv+f+DcSQ==')
+        )
+
+        # File exists and is corrupted
+        fp = open(canonical, 'wb')
+        for leaf in LEAVES:
+            fp.write(leaf)
+        fp.write(b'F')
+        fp.close()
+        c = filestore.hash_fp(open(canonical, 'rb'))
+        self.assertNotEqual(c.id, ID)
+
+        self.assertTrue(path.isfile(canonical))
+        self.assertFalse(path.exists(corrupt))
+
+        with self.assertRaises(filestore.FileIntegrityError) as cm:
+            md5 = fs.content_md5(ID)
+        self.assertEqual(cm.exception.id, ID)
+        self.assertEqual(cm.exception.parent, tmp.dir)
+        self.assertEqual(cm.exception.bad_id, c.id)
+
+        self.assertFalse(path.exists(canonical))
+        self.assertTrue(path.isfile(corrupt))
+
     def test_remove(self):
         tmp = TempDir()
         fs = filestore.FileStore(tmp.dir)
@@ -1890,6 +1980,64 @@
         self.assertTrue(path.isfile(corrupt))
         self.assertEqual(open(corrupt, 'rb').read(), b'yup')
 
+    def test_verify_and_move(self):
+        # Test when it's all good
+        tmp = TempDir()
+        fs = filestore.FileStore(tmp.dir)
+        src = fs.partial_path(ID)
+        write_sample_file(src)
+        src_fp = open(src, 'rb')
+        self.assertFalse(fs.exists(ID))
+        self.assertEqual(fs.verify_and_move(src_fp, ID), CH)
+        self.assertTrue(fs.exists(ID))
+        self.assertEqual(fs.verify(ID), CH)
+
+        # Test when tmp_fp.name != partial_path(_id)
+        tmp = TempDir()
+        fs = filestore.FileStore(tmp.dir)
+        src = path.join(fs.tmp, 'foo.mov')
+        write_sample_file(src)
+        src_fp = open(src, 'rb')
+        with self.assertRaises(ValueError) as cm:
+            fs.verify_and_move(src_fp, ID)
+        self.assertEqual(
+            str(cm.exception),
+            'bad partial_path() for {!r}: {!r}'.format(ID, src)
+        )
+        self.assertFalse(fs.exists(ID))
+
+        # Test when partial has wrong content
+        tmp = TempDir()
+        fs = filestore.FileStore(tmp.dir)
+        src = fs.partial_path(ID)
+        src_fp = open(src, 'wb')
+        for leaf in LEAVES:
+            src_fp.write(leaf)
+        src_fp.write(b'F')
+        src_fp.close()
+        ch = filestore.hash_fp(open(src, 'rb'))
+        self.assertNotEqual(ch.id, ID)
+        src_fp = open(src, 'rb')
+        with self.assertRaises(ValueError) as cm:
+            fs.verify_and_move(src_fp, ID)
+        self.assertEqual(
+            str(cm.exception),
+            'expected {!r}, computed {!r}'.format(ID, ch.id)
+        )
+        self.assertFalse(fs.exists(ID))
+
+    def test_hash_and_move(self):
+        tmp = TempDir()
+        fs = filestore.FileStore(tmp.dir)
+
+        src = path.join(fs.tmp, 'foo.mov')
+        write_sample_file(src)
+        src_fp = open(src, 'rb')
+        self.assertFalse(fs.exists(ID))
+        self.assertEqual(fs.hash_and_move(src_fp), CH)
+        self.assertTrue(fs.exists(ID))
+        self.assertEqual(fs.verify(ID), CH)
+
     def test_import_file(self):
         tmp = TempDir()
         src = tmp.join('movie.mov')
