Merge lp:~jderose/filestore/helpers into lp:filestore

Proposed by Jason Gerard DeRose
Status: Merged
Merged at revision: 190
Proposed branch: lp:~jderose/filestore/helpers
Merge into: lp:filestore
Diff against target: 453 lines (+251/-50)
2 files modified
filestore.py (+95/-42)
test_filestore.py (+156/-8)
To merge this branch: bzr merge lp:~jderose/filestore/helpers
Reviewer: David Jordan, Status: Approve
Review via email: mp+74932@code.launchpad.net

Description of the change

This merge:

* Adds 3 new highish-level FileStore methods (a usage sketch follows this list):

  FileStore.content_md5(_id) - compute a known-correct MD5 hash (for uploading to S3 using boto)

  FileStore.verify_and_move(tmp_fp, _id) - check the content hash of a file in .dmedia/partial/ and, if correct, move the file into its canonical location. This replaces an equivalent method of the legacy FileStore that is used by the S3 and BitTorrent backends.

  FileStore.hash_and_move(tmp_fp) - compute the content hash of a file in .dmedia/tmp/, then move the file into its canonical location. This replaces an equivalent method of the legacy FileStore that is used by the transcoder.

* Makes the use of the `Leaf` namedtuple consistent. Experience has shown the code is simpler and more robust if you keep track of which position in the file (leaf_index) a given chunk of leaf_data corresponds to. As of several revisions ago, when a leaf is read by reader() or batch_reader(), the index and data are bundled together right there in a Leaf(index, data) namedtuple. But this merge fixes some inconsistencies higher up (see the second sketch after this list):

  - Consumers of `Leaf` no longer unpack it into (i, data)

  - Consumers of `Leaf` yield/return the exact `Leaf` rather than (i, data)

  - Hasher.update() has been renamed to the clearer Hasher.hash_leaf()

  - Hasher.hash_leaf() now takes a `Leaf` instead of `bytes`

  - Hasher is now more robust, as it checks that the Leaf.index coming from the readers matches its expected leaf_index
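
Here is a rough usage sketch of the three new methods, one per backend scenario. This is illustration only: the store root '/media/MyDrive', the file name 'foo.mov', and the commented-out boto call are hypothetical stand-ins, not code from this branch (the ID is the one from the content_md5() docstring):

    from filestore import FileStore

    fs = FileStore('/media/MyDrive')  # hypothetical store root

    # S3/boto backend: get a verified MD5 pair rather than letting boto
    # compute it from possibly-corrupt data:
    _id = 'YDDL5ROVABZP4NBSJPC3HUQDVDAGAP5L26YFXD3UR6N5OLVN'
    (hex_md5, b64_md5) = fs.content_md5(_id)
    # key.set_contents_from_file(fs.open(_id), md5=(hex_md5, b64_md5))

    # S3/BitTorrent download backends: the expected _id is known up front,
    # so the file in .dmedia/partial/ is verified before being moved:
    tmp_fp = open(fs.partial_path(_id), 'rb')
    ch = fs.verify_and_move(tmp_fp, _id)  # raises ValueError on bad content

    # Transcoder: the ID isn't known until the new file has been hashed:
    tmp_fp = open('/media/MyDrive/.dmedia/tmp/foo.mov', 'rb')
    ch = fs.hash_and_move(tmp_fp)  # ch.id names the canonical location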
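And a minimal sketch of the Leaf/Hasher pattern this merge settles on. Leaf, Hasher, and LEAF_SIZE are the real names from filestore.py, but the leaves() generator below is a simplified stand-in for what reader() and reader_iter() actually do in a separate thread:

    from filestore import Leaf, Hasher, LEAF_SIZE

    def leaves(src_fp):
        # Bundle the file position and the data together, like reader() does
        index = 0
        while True:
            data = src_fp.read(LEAF_SIZE)
            if not data:
                break
            yield Leaf(index, data)
            index += 1

    h = Hasher()
    for leaf in leaves(open('/my/file', 'rb')):
        h.hash_leaf(leaf)  # raises if leaf.index != the expected leaf_index
    ch = h.content_hash()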

David Jordan (dmj726) wrote:

Looks a lot simpler. Approved.

review: Approve

Preview Diff

=== modified file 'filestore.py'
--- filestore.py 2011-09-11 17:13:58 +0000
+++ filestore.py 2011-09-12 03:19:24 +0000
@@ -298,7 +298,8 @@
 import tempfile
 import stat
 from subprocess import check_call, CalledProcessError
-from base64 import b32encode, b32decode
+from base64 import b32encode, b32decode, b64encode
+import hashlib
 from threading import Thread
 from queue import Queue
 from collections import namedtuple
@@ -724,25 +725,26 @@
         self.array = bytearray()
         self.closed = False
 
-    def update(self, leaf):
-        assert isinstance(leaf, bytes)
-        assert 1 <= len(leaf) <= LEAF_SIZE
-        assert not self.closed
-        if len(leaf) < LEAF_SIZE:
+    def hash_leaf(self, leaf):
+        if not isinstance(leaf, Leaf):
+            raise TypeError(TYPE_ERROR.format('leaf', Leaf, type(leaf), leaf))
+        if self.closed:
+            raise Exception('Cannot call Hasher.hash_leaf() when Hasher.closed')
+        if leaf.index != self.leaf_index:
+            raise Exception('Expected leaf.index {}, got {}'.format(
+                self.leaf_index, leaf.index)
+            )
+
+        if len(leaf.data) < LEAF_SIZE:
             self.closed = True
-
-        leaf_hash = hash_leaf(self.leaf_index, leaf)
+        leaf_hash = hash_leaf(leaf.index, leaf.data)
         self.array.extend(leaf_hash)
-        self.file_size += len(leaf)
-
-        low = self.leaf_index * LEAF_SIZE + 1
-        high = (self.leaf_index + 1) * LEAF_SIZE
-        assert low <= self.file_size <= high
-
+        self.file_size += len(leaf.data)
         self.leaf_index += 1
         return leaf_hash
 
     def content_hash(self):
+        self.closed = True
         leaf_hashes = bytes(self.array)
         return ContentHash(
             hash_root(self.file_size, leaf_hashes),
@@ -824,14 +826,14 @@
     assert isinstance(src_fp, (io.BufferedReader, io.BufferedRandom))
     assert src_fp.mode in ('rb', 'rb+')
     src_fp.seek(0)
-    i = 0
+    index = 0
     while True:
         data = src_fp.read(LEAF_SIZE)
         if not data:
             queue.put(None)
             break
-        queue.put(Leaf(i, data))
-        i += 1
+        queue.put(Leaf(index, data))
+        index += 1
 
 
 def batch_reader(batch, queue):
@@ -890,8 +892,7 @@
     """
     Iterate through leaves in *src_fp*, reading in a separate thread.
 
-    The function yields a ``(leaf_index, leaf_data)`` tuple for each leaf in
-    *src_fp*.
+    The function yields a `Leaf` namedtuple for each leaf in *src_fp*.
 
     Many operations in the `FileStore` involve a loop where an 8 MiB leaf is
    read, then hashed, then possibly written to disk. As the hashing
@@ -904,8 +905,8 @@
     >>> def example():
     ...     h = Hasher()
     ...     src_fp = open('/my/file', 'rb')
-    ...     for (leaf_index, leaf) in reader_iter(src_fp):
-    ...         h.update(leaf)
+    ...     for leaf in reader_iter(src_fp):
+    ...         h.hash_leaf(leaf)
     ...
 
     The thread target is the `reader()` function.
@@ -987,7 +988,7 @@
         leaf = q.get()
         if leaf is EndFile:
             break
-        h.update(leaf.data)
+        h.hash_leaf(leaf)
         for tmp_fp in temps:
             tmp_fp.write(leaf.data)
     ch = h.content_hash()
@@ -1013,10 +1014,10 @@
     :param dst_fp: optional file opened in ``'wb'`` mode
     """
    hasher = Hasher()
-    for (i, leaf) in reader_iter(src_fp):
-        hasher.update(leaf)
+    for leaf in reader_iter(src_fp):
+        hasher.hash_leaf(leaf)
         if dst_fp:
-            dst_fp.write(leaf)
+            dst_fp.write(leaf.data)
    return hasher.content_hash()
 
 
@@ -1296,37 +1297,61 @@
         if size != file_size:
             corrupt = self.move_to_corrupt(src_fp, _id)
             raise SizeIntegrityError(self.parent, _id, file_size, size)
-        for (i, leaf) in reader_iter(src_fp):
-            got = hash_leaf(i, leaf)
-            expected = get_leaf_hash(leaf_hashes, i)
+        for leaf in reader_iter(src_fp):
+            got = hash_leaf(leaf.index, leaf.data)
+            expected = get_leaf_hash(leaf_hashes, leaf.index)
             if got != expected:
                 self.move_to_corrupt(src_fp, _id)
-                raise LeafIntegrityError(self.parent, _id, i, expected, got)
-            yield (i, leaf)
-        assert get_leaf_hash(leaf_hashes, i + 1) == b''
+                raise LeafIntegrityError(
+                    self.parent, _id, leaf.index, expected, got
+                )
+            yield leaf
+        assert get_leaf_hash(leaf_hashes, leaf.index + 1) == b''
 
     def verify_iter2(self, _id):
         """
         Yield each leaf as it's read, verifying file integrity after final leaf.
 
-        This method will yield ``(leaf_index, leaf_data)`` tuple for each leaf
-        in the file identified by *_id*. Use this method with care as the
-        integrity of the leaves is not known till after the last leaf has been
-        yielded.
+        This method will yield a `Leaf` namedtuple for each leaf in the file
+        identified by *_id*. Use this method with care as the integrity of the
+        leaves is not known till after the last leaf has been yielded.
 
         This method is similar to `FileStore.verify_iter()` except only the
         *_id* is needed.
         """
         src_fp = self.open(_id)
         h = Hasher()
-        for (i, leaf) in reader_iter(src_fp):
-            h.update(leaf)
-            yield (i, leaf)
+        for leaf in reader_iter(src_fp):
+            h.hash_leaf(leaf)
+            yield leaf
         c = h.content_hash()
         if c.id != _id:
             self.move_to_corrupt(src_fp, _id)
             raise FileIntegrityError(self.parent, _id, c.id)
 
+    def content_md5(self, _id):
+        """
+        Compute md5 hash of the file with *_id* for use in Content-MD5 header.
+
+        For example:
+
+        >>> fs = FileStore('/home/jderose') #doctest: +SKIP
+        >>> fs.content_md5('YDDL5ROVABZP4NBSJPC3HUQDVDAGAP5L26YFXD3UR6N5OLVN') #doctest: +SKIP
+        ('99ca2a74521ad7825768bbfe7fe0dc49', 'mcoqdFIa14JXaLv+f+DcSQ==')
+
+        This method guarantees the correct md5 hash is computed for the file
+        with *_id* because it verifies the file as it computes the md5 hash.
+        Note that you do not have this guarantee if you simply opened the file
+        with `FileStore.open()` and computed the md5 hash that way.
+
+        If you need the md5 hash for use with boto when uploading to S3, use
+        this method rather than letting boto compute the md5 hash itself.
+        """
+        md5 = hashlib.md5()
+        for leaf in self.verify_iter2(_id):
+            md5.update(leaf.data)
+        return (md5.hexdigest(), b64encode(md5.digest()).decode('utf-8'))
+
     def remove(self, _id):
         """
         Delete file with *_id* from underlying filesystem.
@@ -1445,7 +1470,7 @@
         """
         Move a file from its canonical location to its corrupt location.
 
-        While a file is found to be corrupt (meaning the computed content hash
+        When a file is found to be corrupt (meaning the computed content hash
         doesn't match the expected content hash), it is moved from its canonical
         location to a special corrupt location.
 
@@ -1476,6 +1501,34 @@
         src_fp.close()
         return dst
 
+    def verify_and_move(self, tmp_fp, _id):
+        allowed = (io.BufferedReader, io.BufferedRandom)
+        if not isinstance(tmp_fp, allowed):
+            raise TypeError(
+                TYPE_ERROR.format('tmp_fp', allowed, type(tmp_fp), tmp_fp)
+            )
+        if tmp_fp.name != self.partial_path(_id):
+            raise ValueError(
+                'bad partial_path() for {!r}: {!r}'.format(_id, tmp_fp.name)
+            )
+        ch = hash_fp(tmp_fp)
+        if ch.id != _id:
+            raise ValueError(
+                'expected {!r}, computed {!r}'.format(_id, ch.id)
+            )
+        self.move_to_canonical(tmp_fp, _id)
+        return ch
+
+    def hash_and_move(self, tmp_fp):
+        allowed = (io.BufferedReader, io.BufferedRandom)
+        if not isinstance(tmp_fp, allowed):
+            raise TypeError(
+                TYPE_ERROR.format('tmp_fp', allowed, type(tmp_fp), tmp_fp)
+            )
+        ch = hash_fp(tmp_fp)
+        self.move_to_canonical(tmp_fp, ch.id)
+        return ch
+
     def import_file(self, src_fp):
         """
         Atomically copy open file *src_fp* into this filestore.
@@ -1524,10 +1577,10 @@
         size = os.fstat(src_fp.fileno()).st_size
         temps = [fs.allocate_tmp(size) for fs in filestores]
         h = Hasher()
-        for (i, leaf) in reader_iter(src_fp):
-            h.update(leaf)
+        for leaf in reader_iter(src_fp):
+            h.hash_leaf(leaf)
             for tmp_fp in temps:
-                tmp_fp.write(leaf)
+                tmp_fp.write(leaf.data)
         c = h.content_hash()
         if c.id != _id:
             self.move_to_corrupt(src_fp, _id)
=== modified file 'test_filestore.py'
--- test_filestore.py 2011-09-11 09:13:31 +0000
+++ test_filestore.py 2011-09-12 03:19:24 +0000
@@ -1052,30 +1052,76 @@
         self.assertEqual(h.array, b'')
         self.assertFalse(h.closed)
 
-    def test_update(self):
-        h = filestore.Hasher()
-
-        self.assertEqual(h.update(LEAVES[0]), LEAF_HASHES[0:30])
+    def test_hash_leaf(self):
+        # Test with bad leaf type
+        h = filestore.Hasher()
+        with self.assertRaises(TypeError) as cm:
+            h.hash_leaf(b'nope')
+        self.assertEqual(
+            str(cm.exception),
+            TYPE_ERROR.format('leaf', filestore.Leaf, bytes, b'nope')
+        )
+
+        # We'll use these below
+        leaf0 = filestore.Leaf(0, LEAVES[0])
+        leaf1 = filestore.Leaf(1, LEAVES[1])
+        leaf2 = filestore.Leaf(2, LEAVES[2])
+
+        # Test when closed
+        h = filestore.Hasher()
+        h.closed = True
+        with self.assertRaises(Exception) as cm:
+            h.hash_leaf(leaf0)
+        self.assertEqual(
+            str(cm.exception),
+            'Cannot call Hasher.hash_leaf() when Hasher.closed'
+        )
+
+        # Test when leaf_index is wrong
+        h = filestore.Hasher()
+        h.leaf_index = 1
+        with self.assertRaises(Exception) as cm:
+            h.hash_leaf(leaf0)
+        self.assertEqual(
+            str(cm.exception),
+            'Expected leaf.index 1, got 0'
+        )
+
+        # Test when it's all good
+        h = filestore.Hasher()
+
+        self.assertEqual(h.hash_leaf(leaf0), LEAF_HASHES[0:30])
         self.assertEqual(h.leaf_index, 1)
         self.assertEqual(h.file_size, filestore.LEAF_SIZE)
         self.assertFalse(h.closed)
 
-        self.assertEqual(h.update(LEAVES[1]), LEAF_HASHES[30:60])
+        self.assertEqual(h.hash_leaf(leaf1), LEAF_HASHES[30:60])
         self.assertEqual(h.leaf_index, 2)
         self.assertEqual(h.file_size, filestore.LEAF_SIZE * 2)
         self.assertFalse(h.closed)
 
-        self.assertEqual(h.update(LEAVES[2]), LEAF_HASHES[60:90])
+        self.assertEqual(h.hash_leaf(leaf2), LEAF_HASHES[60:90])
         self.assertEqual(h.leaf_index, 3)
         self.assertEqual(h.file_size, sum(len(l) for l in LEAVES))
         self.assertTrue(h.closed)
 
     def test_content_hash(self):
+        leaf0 = filestore.Leaf(0, LEAVES[0])
+        leaf1 = filestore.Leaf(1, LEAVES[1])
+        leaf2 = filestore.Leaf(2, LEAVES[2])
+
         h = filestore.Hasher()
-        for l in LEAVES:
-            h.update(l)
+        for leaf in (leaf0, leaf1, leaf2):
+            h.hash_leaf(leaf)
         self.assertEqual(h.content_hash(), CH)
 
+        # Test that Hasher.content_hash() sets closed = True
+        h = filestore.Hasher()
+        h.hash_leaf(leaf0)
+        self.assertFalse(h.closed)
+        h.content_hash()
+        self.assertTrue(h.closed)
+
 
 class TestFileStore(TestCase):
     def test_init(self):
@@ -1671,6 +1717,50 @@
         self.assertFalse(path.exists(canonical))
         self.assertTrue(path.isfile(corrupt))
 
+    def test_content_md5(self):
+        tmp = TempDir()
+        fs = filestore.FileStore(tmp.dir)
+
+        canonical = fs.path(ID)
+        corrupt = fs.corrupt_path(ID)
+
+        # File doesn't exist
+        with self.assertRaises(IOError) as cm:
+            md5 = fs.content_md5(ID)
+        self.assertEqual(cm.exception.errno, 2)
+
+        # File exists:
+        fp = open(canonical, 'wb')
+        for leaf in LEAVES:
+            fp.write(leaf)
+        fp.close()
+
+        self.assertEqual(
+            fs.content_md5(ID),
+            ('99ca2a74521ad7825768bbfe7fe0dc49', 'mcoqdFIa14JXaLv+f+DcSQ==')
+        )
+
+        # File exists and is corrupted
+        fp = open(canonical, 'wb')
+        for leaf in LEAVES:
+            fp.write(leaf)
+        fp.write(b'F')
+        fp.close()
+        c = filestore.hash_fp(open(canonical, 'rb'))
+        self.assertNotEqual(c.id, ID)
+
+        self.assertTrue(path.isfile(canonical))
+        self.assertFalse(path.exists(corrupt))
+
+        with self.assertRaises(filestore.FileIntegrityError) as cm:
+            md5 = fs.content_md5(ID)
+        self.assertEqual(cm.exception.id, ID)
+        self.assertEqual(cm.exception.parent, tmp.dir)
+        self.assertEqual(cm.exception.bad_id, c.id)
+
+        self.assertFalse(path.exists(canonical))
+        self.assertTrue(path.isfile(corrupt))
+
     def test_remove(self):
         tmp = TempDir()
         fs = filestore.FileStore(tmp.dir)
@@ -1890,6 +1980,64 @@
         self.assertTrue(path.isfile(corrupt))
         self.assertEqual(open(corrupt, 'rb').read(), b'yup')
 
+    def test_verify_and_move(self):
+        # Test when it's all good
+        tmp = TempDir()
+        fs = filestore.FileStore(tmp.dir)
+        src = fs.partial_path(ID)
+        write_sample_file(src)
+        src_fp = open(src, 'rb')
+        self.assertFalse(fs.exists(ID))
+        self.assertEqual(fs.verify_and_move(src_fp, ID), CH)
+        self.assertTrue(fs.exists(ID))
+        self.assertEqual(fs.verify(ID), CH)
+
+        # Test when tmp_fp.name != partial_path(_id)
+        tmp = TempDir()
+        fs = filestore.FileStore(tmp.dir)
+        src = path.join(fs.tmp, 'foo.mov')
+        write_sample_file(src)
+        src_fp = open(src, 'rb')
+        with self.assertRaises(ValueError) as cm:
+            fs.verify_and_move(src_fp, ID)
+        self.assertEqual(
+            str(cm.exception),
+            'bad partial_path() for {!r}: {!r}'.format(ID, src)
+        )
+        self.assertFalse(fs.exists(ID))
+
+        # Test when partial has wrong content
+        tmp = TempDir()
+        fs = filestore.FileStore(tmp.dir)
+        src = fs.partial_path(ID)
+        src_fp = open(src, 'wb')
+        for leaf in LEAVES:
+            src_fp.write(leaf)
+        src_fp.write(b'F')
+        src_fp.close()
+        ch = filestore.hash_fp(open(src, 'rb'))
+        self.assertNotEqual(ch.id, ID)
+        src_fp = open(src, 'rb')
+        with self.assertRaises(ValueError) as cm:
+            fs.verify_and_move(src_fp, ID)
+        self.assertEqual(
+            str(cm.exception),
+            'expected {!r}, computed {!r}'.format(ID, ch.id)
+        )
+        self.assertFalse(fs.exists(ID))
+
+    def test_hash_and_move(self):
+        tmp = TempDir()
+        fs = filestore.FileStore(tmp.dir)
+
+        src = path.join(fs.tmp, 'foo.mov')
+        write_sample_file(src)
+        src_fp = open(src, 'rb')
+        self.assertFalse(fs.exists(ID))
+        self.assertEqual(fs.hash_and_move(src_fp), CH)
+        self.assertTrue(fs.exists(ID))
+        self.assertEqual(fs.verify(ID), CH)
+
     def test_import_file(self):
         tmp = TempDir()
         src = tmp.join('movie.mov')
