Merge lp:~jderose/filestore/callback into lp:filestore

Proposed by Jason Gerard DeRose
Status: Merged
Merged at revision: 230
Proposed branch: lp:~jderose/filestore/callback
Merge into: lp:filestore
Diff against target: 161 lines (+90/-1)
2 files modified
filestore.py (+18/-1)
test_filestore.py (+72/-0)
To merge this branch: bzr merge lp:~jderose/filestore/callback
Reviewer Review Type Date Requested Status
David Jordan Approve
Review via email: mp+84910@code.launchpad.net

Description of the change

This changes batch_import_iter() to take an optional keyword-only `callback` argument. The callback takes 2 arguments:

def callback(count, size):
    pass

That's the completed count, and the completed bytes. The callback is called at the end of processing each file, plus whenever the next 128 MiB (16 leaves) have been completed in a large file and at least one more leaf remains.

For example, say you have a batch with one file and the file is exactly 33 leaves in size (264 MiB). The callback would be called 3 times, like this:

callback(0, 128 * MiB)
callback(0, 256 * MiB)
callback(1, 264 * MiB)

Updating the progress every 128 MiB is probably about right as far as reassuring the user, but we should play with it a bit if it seems too infrequent or unnecessarily frequent.

To post a comment you must log in.
Revision history for this message
David Jordan (dmj726) wrote :

Looks good. I'm glad to see this UX issue fixed.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'filestore.py'
--- filestore.py 2011-11-28 21:27:39 +0000
+++ filestore.py 2011-12-08 09:31:33 +0000
@@ -740,7 +740,7 @@
740 queue.put(e)740 queue.put(e)
741741
742742
743def batch_import_iter(batch, *filestores):743def batch_import_iter(batch, *filestores, callback=None):
744 """744 """
745 Import the files in *batch* into one or more destination *filestores*.745 Import the files in *batch* into one or more destination *filestores*.
746746
@@ -769,6 +769,7 @@
769 :param filestores: One or more `FileStore` instances in which to import the769 :param filestores: One or more `FileStore` instances in which to import the
770 files in *batch*770 files in *batch*
771 """771 """
772 # Validate args:
772 if not isinstance(batch, Batch):773 if not isinstance(batch, Batch):
773 raise TypeError(TYPE_ERROR.format('batch', Batch, type(batch), batch))774 raise TypeError(TYPE_ERROR.format('batch', Batch, type(batch), batch))
774 if not filestores:775 if not filestores:
@@ -779,6 +780,13 @@
779 if not isinstance(fs, FileStore):780 if not isinstance(fs, FileStore):
780 name = 'filestores[{}]'.format(i)781 name = 'filestores[{}]'.format(i)
781 raise TypeError(TYPE_ERROR.format(name, FileStore, type(fs), fs))782 raise TypeError(TYPE_ERROR.format(name, FileStore, type(fs), fs))
783 if not (callback is None or callable(callback)):
784 raise TypeError('callback not callable: {!r}'.format(callback))
785
786 # Okay, time to work:
787 if callback is not None:
788 count = 0
789 size = 0
782 q = SmartQueue(16)790 q = SmartQueue(16)
783 thread = _start_thread(batch_reader, batch, q)791 thread = _start_thread(batch_reader, batch, q)
784 while True:792 while True:
@@ -788,6 +796,9 @@
788 if file.size == 0:796 if file.size == 0:
789 if q.get() is not EndFile:797 if q.get() is not EndFile:
790 raise Exception('file.size is 0, but not followed by EndFile')798 raise Exception('file.size is 0, but not followed by EndFile')
799 if callback is not None:
800 count += 1
801 callback(count, size)
791 yield BatchItem(file, None)802 yield BatchItem(file, None)
792 continue803 continue
793 temps = tuple(fs.allocate_tmp(file.size) for fs in filestores)804 temps = tuple(fs.allocate_tmp(file.size) for fs in filestores)
@@ -796,6 +807,8 @@
796 leaf = q.get()807 leaf = q.get()
797 if leaf is EndFile:808 if leaf is EndFile:
798 break809 break
810 if callback is not None and leaf.index and leaf.index % 16 == 0:
811 callback(count, size + h.file_size)
799 h.hash_leaf(leaf)812 h.hash_leaf(leaf)
800 for tmp_fp in temps:813 for tmp_fp in temps:
801 tmp_fp.write(leaf.data)814 tmp_fp.write(leaf.data)
@@ -805,6 +818,10 @@
805 for (i, fs) in enumerate(filestores):818 for (i, fs) in enumerate(filestores):
806 tmp_fp = temps[i]819 tmp_fp = temps[i]
807 fs.move_to_canonical(tmp_fp, ch.id)820 fs.move_to_canonical(tmp_fp, ch.id)
821 if callback is not None:
822 count += 1
823 size += file.size
824 callback(count, size)
808 yield BatchItem(file, ch)825 yield BatchItem(file, ch)
809 thread.join() # Make sure batch_reader() terminates826 thread.join() # Make sure batch_reader() terminates
810827
811828
=== modified file 'test_filestore.py'
--- test_filestore.py 2011-11-23 04:25:45 +0000
+++ test_filestore.py 2011-12-08 09:31:33 +0000
@@ -136,6 +136,14 @@
136 self.items.append(item)136 self.items.append(item)
137137
138138
139class Callback:
140 def __init__(self):
141 self.calls = []
142
143 def __call__(self, *args):
144 self.calls.append(args)
145
146
139class TestSmartQueue(TestCase):147class TestSmartQueue(TestCase):
140 def test_get(self):148 def test_get(self):
141 q = filestore.SmartQueue()149 q = filestore.SmartQueue()
@@ -1094,6 +1102,14 @@
1094 str(cm.exception),1102 str(cm.exception),
1095 'batch_import_iter() takes at least one destination FileStore'1103 'batch_import_iter() takes at least one destination FileStore'
1096 )1104 )
1105
1106 # Test with non-callable callback
1107 with self.assertRaises(TypeError) as cm:
1108 list(filestore.batch_import_iter(batch, fs1, fs2, callback='hello'))
1109 self.assertEqual(
1110 str(cm.exception),
1111 "callback not callable: 'hello'"
1112 )
10971113
1098 # Test with wrong filestore type1114 # Test with wrong filestore type
1099 with self.assertRaises(TypeError) as cm:1115 with self.assertRaises(TypeError) as cm:
@@ -1142,6 +1158,62 @@
1142 '[Errno 13] Permission denied: {!r}'.format(src.name)1158 '[Errno 13] Permission denied: {!r}'.format(src.name)
1143 )1159 )
11441160
1161 # Test with callback and empty batch
1162 cb = Callback()
1163 batch = filestore.Batch(tuple(), 0, 0)
1164 tmp1 = TempDir()
1165 fs1 = filestore.FileStore(tmp1.dir)
1166 tmp2 = TempDir()
1167 fs2 = filestore.FileStore(tmp2.dir)
1168 self.assertEqual(
1169 list(filestore.batch_import_iter(batch, fs1, fs2, callback=cb)),
1170 []
1171 )
1172 self.assertEqual(list(fs1), [])
1173 self.assertEqual(list(fs2), [])
1174 self.assertEqual(cb.calls, [])
1175
1176 # Test with callback and non-empty batch
1177 tmp = TempDir()
1178 src = filestore.File(tmp.join('src.mov'), FILE_SIZE, 0)
1179 write_sample_file(src.name)
1180 empty1 = filestore.File(tmp.touch('empty1.mov'), 0, 0)
1181 empty2 = filestore.File(tmp.touch('empty2.mov'), 0, 0)
1182
1183 file_size = filestore.LEAF_SIZE * 33
1184 big = filestore.File(tmp.join('big.mov'), file_size, 0)
1185 data = b'a' * filestore.LEAF_SIZE
1186 fp = open(big.name, 'wb')
1187 for i in range(33):
1188 fp.write(data)
1189 fp.close()
1190 fp = open(big.name, 'rb')
1191 ch = filestore.hash_fp(fp)
1192
1193 batch = filestore.Batch((big, empty1, src, empty2), 0, 0)
1194 self.assertEqual(
1195 list(filestore.batch_import_iter(batch, fs1, fs2, callback=cb)),
1196 [
1197 (big, ch),
1198 (empty1, None),
1199 (src, CH),
1200 (empty2, None),
1201 ]
1202 )
1203 self.assertEqual(set(s.id for s in fs1), set([ID, ch.id]))
1204 self.assertEqual(set(s.id for s in fs2), set([ID, ch.id]))
1205 self.assertEqual(
1206 cb.calls,
1207 [
1208 (0, filestore.LEAF_SIZE * 16), # 128 MiB done
1209 (0, filestore.LEAF_SIZE * 32), # 256 MiB done
1210 (1, file_size),
1211 (2, file_size),
1212 (3, file_size + 20971520),
1213 (4, file_size + 20971520)
1214 ]
1215 )
1216
1145 def test_hash_fp(self):1217 def test_hash_fp(self):
1146 tmp = TempDir()1218 tmp = TempDir()
1147 src = tmp.join('foo.mov')1219 src = tmp.join('foo.mov')

Subscribers

People subscribed via source and target branches