Merge lp:~jderose/filestore/callback into lp:filestore

Proposed by Jason Gerard DeRose
Status: Merged
Merged at revision: 230
Proposed branch: lp:~jderose/filestore/callback
Merge into: lp:filestore
Diff against target: 161 lines (+90/-1)
2 files modified
filestore.py (+18/-1)
test_filestore.py (+72/-0)
To merge this branch: bzr merge lp:~jderose/filestore/callback
Reviewer Review Type Date Requested Status
David Jordan Approve
Review via email: mp+84910@code.launchpad.net

Description of the change

This changes batch_import_iter() to take an optional keyword-only `callback` argument. The callback takes 2 arguments:

def callback(count, size):
    pass

Here `count` is the number of files completed so far, and `size` is the total number of bytes completed so far. The callback is called once at the end of processing each file, and additionally each time another 128 MiB (16 leaves) of a large file has been completed, provided at least one more leaf remains.

For example, say you have a batch with one file and the file is exactly 33 leaves in size (264 MiB). The callback would be called 3 times, like this:

callback(0, 128 * MiB)
callback(0, 256 * MiB)
callback(1, 264 * MiB)

Updating the progress every 128 MiB is probably about right for reassuring the user, but we should experiment with the interval if it turns out to be too infrequent or unnecessarily frequent.

To post a comment you must log in.
Revision history for this message
David Jordan (dmj726) wrote :

Looks good. I'm glad to see this UX issue fixed.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'filestore.py'
2--- filestore.py 2011-11-28 21:27:39 +0000
3+++ filestore.py 2011-12-08 09:31:33 +0000
4@@ -740,7 +740,7 @@
5 queue.put(e)
6
7
8-def batch_import_iter(batch, *filestores):
9+def batch_import_iter(batch, *filestores, callback=None):
10 """
11 Import the files in *batch* into one or more destination *filestores*.
12
13@@ -769,6 +769,7 @@
14 :param filestores: One or more `FileStore` instances in which to import the
15 files in *batch*
16 """
17+ # Validate args:
18 if not isinstance(batch, Batch):
19 raise TypeError(TYPE_ERROR.format('batch', Batch, type(batch), batch))
20 if not filestores:
21@@ -779,6 +780,13 @@
22 if not isinstance(fs, FileStore):
23 name = 'filestores[{}]'.format(i)
24 raise TypeError(TYPE_ERROR.format(name, FileStore, type(fs), fs))
25+ if not (callback is None or callable(callback)):
26+ raise TypeError('callback not callable: {!r}'.format(callback))
27+
28+ # Okay, time to work:
29+ if callback is not None:
30+ count = 0
31+ size = 0
32 q = SmartQueue(16)
33 thread = _start_thread(batch_reader, batch, q)
34 while True:
35@@ -788,6 +796,9 @@
36 if file.size == 0:
37 if q.get() is not EndFile:
38 raise Exception('file.size is 0, but not followed by EndFile')
39+ if callback is not None:
40+ count += 1
41+ callback(count, size)
42 yield BatchItem(file, None)
43 continue
44 temps = tuple(fs.allocate_tmp(file.size) for fs in filestores)
45@@ -796,6 +807,8 @@
46 leaf = q.get()
47 if leaf is EndFile:
48 break
49+ if callback is not None and leaf.index and leaf.index % 16 == 0:
50+ callback(count, size + h.file_size)
51 h.hash_leaf(leaf)
52 for tmp_fp in temps:
53 tmp_fp.write(leaf.data)
54@@ -805,6 +818,10 @@
55 for (i, fs) in enumerate(filestores):
56 tmp_fp = temps[i]
57 fs.move_to_canonical(tmp_fp, ch.id)
58+ if callback is not None:
59+ count += 1
60+ size += file.size
61+ callback(count, size)
62 yield BatchItem(file, ch)
63 thread.join() # Make sure batch_reader() terminates
64
65
66=== modified file 'test_filestore.py'
67--- test_filestore.py 2011-11-23 04:25:45 +0000
68+++ test_filestore.py 2011-12-08 09:31:33 +0000
69@@ -136,6 +136,14 @@
70 self.items.append(item)
71
72
73+class Callback:
74+ def __init__(self):
75+ self.calls = []
76+
77+ def __call__(self, *args):
78+ self.calls.append(args)
79+
80+
81 class TestSmartQueue(TestCase):
82 def test_get(self):
83 q = filestore.SmartQueue()
84@@ -1094,6 +1102,14 @@
85 str(cm.exception),
86 'batch_import_iter() takes at least one destination FileStore'
87 )
88+
89+ # Test with non-callable callback
90+ with self.assertRaises(TypeError) as cm:
91+ list(filestore.batch_import_iter(batch, fs1, fs2, callback='hello'))
92+ self.assertEqual(
93+ str(cm.exception),
94+ "callback not callable: 'hello'"
95+ )
96
97 # Test with wrong filestore type
98 with self.assertRaises(TypeError) as cm:
99@@ -1142,6 +1158,62 @@
100 '[Errno 13] Permission denied: {!r}'.format(src.name)
101 )
102
103+ # Test with callback and empty batch
104+ cb = Callback()
105+ batch = filestore.Batch(tuple(), 0, 0)
106+ tmp1 = TempDir()
107+ fs1 = filestore.FileStore(tmp1.dir)
108+ tmp2 = TempDir()
109+ fs2 = filestore.FileStore(tmp2.dir)
110+ self.assertEqual(
111+ list(filestore.batch_import_iter(batch, fs1, fs2, callback=cb)),
112+ []
113+ )
114+ self.assertEqual(list(fs1), [])
115+ self.assertEqual(list(fs2), [])
116+ self.assertEqual(cb.calls, [])
117+
118+ # Test with callback and non-empty batch
119+ tmp = TempDir()
120+ src = filestore.File(tmp.join('src.mov'), FILE_SIZE, 0)
121+ write_sample_file(src.name)
122+ empty1 = filestore.File(tmp.touch('empty1.mov'), 0, 0)
123+ empty2 = filestore.File(tmp.touch('empty2.mov'), 0, 0)
124+
125+ file_size = filestore.LEAF_SIZE * 33
126+ big = filestore.File(tmp.join('big.mov'), file_size, 0)
127+ data = b'a' * filestore.LEAF_SIZE
128+ fp = open(big.name, 'wb')
129+ for i in range(33):
130+ fp.write(data)
131+ fp.close()
132+ fp = open(big.name, 'rb')
133+ ch = filestore.hash_fp(fp)
134+
135+ batch = filestore.Batch((big, empty1, src, empty2), 0, 0)
136+ self.assertEqual(
137+ list(filestore.batch_import_iter(batch, fs1, fs2, callback=cb)),
138+ [
139+ (big, ch),
140+ (empty1, None),
141+ (src, CH),
142+ (empty2, None),
143+ ]
144+ )
145+ self.assertEqual(set(s.id for s in fs1), set([ID, ch.id]))
146+ self.assertEqual(set(s.id for s in fs2), set([ID, ch.id]))
147+ self.assertEqual(
148+ cb.calls,
149+ [
150+ (0, filestore.LEAF_SIZE * 16), # 128 MiB done
151+ (0, filestore.LEAF_SIZE * 32), # 256 MiB done
152+ (1, file_size),
153+ (2, file_size),
154+ (3, file_size + 20971520),
155+ (4, file_size + 20971520)
156+ ]
157+ )
158+
159 def test_hash_fp(self):
160 tmp = TempDir()
161 src = tmp.join('foo.mov')

Subscribers

People subscribed via source and target branches