Merge lp:~jderose/microfiber/db-dump into lp:microfiber

Proposed by Jason Gerard DeRose
Status: Merged
Merged at revision: 133
Proposed branch: lp:~jderose/microfiber/db-dump
Merge into: lp:microfiber
Diff against target: 368 lines (+260/-26)
3 files modified
doc/microfiber.rst (+29/-1)
microfiber.py (+107/-25)
test_microfiber.py (+124/-0)
To merge this branch: bzr merge lp:~jderose/microfiber/db-dump
Reviewer: microfiber dev (status: Pending)
Review via email: mp+119829@code.launchpad.net

Description of the change

Use the revised Database.dump() method like this:

>>> db.dump('foo.json')

Or gzip-compress the dump:

>>> db.dump('foo.json.gz')

As before, doc['_rev'] is deleted before dumping to the file. However, the attachments kwarg was removed, and we now always dump *without* attachments: even a stub doc['_attachments'] gets deleted when present. We'll probably add more flexibility here later, but for now this suits the needs of Novacut and Dmedia.
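
Note that this branch also removes the old Database.load() method. If you ever need to pull a dump back into CouchDB, something along these lines should work (a rough sketch only; load_dump() is a hypothetical helper, not part of microfiber):

import json
from gzip import GzipFile
from io import TextIOWrapper

def load_dump(db, filename):
    # Hypothetical helper, not part of this branch: read a dump created by
    # Database.dump() and save the docs back into *db*.
    if filename.lower().endswith('.json.gz'):
        fp = TextIOWrapper(GzipFile(filename, 'rb'))
    else:
        fp = open(filename, 'r')
    try:
        docs = json.load(fp)  # the dump is a plain JSON list of docs
    finally:
        fp.close()
    db.save_many(docs)  # docs have no '_rev', so they are saved as new docs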

Also, two big performance improvements were made (a simplified sketch of the pattern follows this list):

1) We request docs 50 at a time via Database.get_many() (roughly a 4x improvement)

2) We make the CouchDB requests in a separate thread, so json.dump() can keep working while we wait on CouchDB (roughly a further 2x improvement on top of the above)
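
The pattern is roughly the following producer/consumer arrangement (a simplified sketch only; the real implementation is the new FakeList class in the diff below):

import threading
from queue import Queue

def _fetch_worker(db, rows, queue, batch_size=50):
    # Producer: fetch docs from CouchDB in batches and hand them to the
    # consumer through a small, bounded queue.
    try:
        for i in range(0, len(rows), batch_size):
            ids = [row['id'] for row in rows[i:i + batch_size]]
            queue.put(db.get_many(ids))
        queue.put(None)  # signal completion
    except Exception as e:
        queue.put(e)

def iter_docs(db, rows):
    # Consumer: yield docs one at a time while the worker thread keeps the
    # next CouchDB request in flight.
    queue = Queue(2)  # bounded, so memory usage stays roughly constant
    thread = threading.Thread(target=_fetch_worker, args=(db, rows, queue))
    thread.daemon = True
    thread.start()
    while True:
        batch = queue.get()
        if batch is None:
            break
        if isinstance(batch, Exception):
            raise batch
        for doc in batch:
            yield doc
    thread.join()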

Revision history for this message
David Jordan (dmj726) wrote :

Approved.

Preview Diff

1=== modified file 'doc/microfiber.rst'
2--- doc/microfiber.rst 2012-08-15 22:11:15 +0000
3+++ doc/microfiber.rst 2012-08-16 04:26:20 +0000
4@@ -308,7 +308,7 @@
5
6
7 .. class:: Database(name, env='http://localhost:5984/')
8-
9+
10 Makes requests relative to a CouchDB database URL.
11
12 Create a :class:`Database` like this:
13@@ -424,6 +424,34 @@
14 *Note:* for subtle reasons that take a while to explain, you probably
15 don't want to use this method. Instead use
16 :meth:`Database.save_many()`.
17+
18+ .. method:: dump(filename)
19+
20+ Dump this database to regular JSON file *filename*.
21+
22+ For example:
23+
24+ >>> db = Database('foo') #doctest: +SKIP
25+ >>> db.dump('foo.json') #doctest: +SKIP
26+
27+ Or if *filename* ends with ``'.json.gz'``, the file will be
28+ gzip-compressed as it is written:
29+
30+ >>> db.dump('foo.json.gz') #doctest: +SKIP
31+
32+ CouchDB is a bit awkward in that its API doesn't offer a nice way to
33+ make a request whose response is suitable for writing directly to a
34+ file, without decoding/encoding. It would be nice if that dump could
35+ be loaded directly from the file as well. One of the biggest issues is
36+ that a dump really needs to have doc['_rev'] removed.
37+
38+ This method is a compromise on many fronts, but it was made with these
39+ priorities:
40+
41+ 1. Readability of the dumped JSON file
42+
43+ 2. High performance and low memory usage, despite the fact that
44+ we must encode and decode each doc
45
46
47
48
49=== modified file 'microfiber.py'
50--- microfiber.py 2012-08-15 22:11:15 +0000
51+++ microfiber.py 2012-08-16 04:26:20 +0000
52@@ -40,15 +40,17 @@
53 """
54
55 from os import urandom
56-from io import BufferedReader
57+from io import BufferedReader, TextIOWrapper
58 from base64 import b32encode, b64encode
59 import json
60+from gzip import GzipFile
61 import time
62 from hashlib import sha1
63 import hmac
64 from urllib.parse import urlparse, urlencode, quote_plus
65 from http.client import HTTPConnection, HTTPSConnection, BadStatusLine
66 import threading
67+from queue import Queue
68 import math
69
70
71@@ -413,27 +415,74 @@
72 super().__init__(msg.format(count))
73
74
75+def _start_thread(target, *args):
76+ thread = threading.Thread(target=target, args=args)
77+ thread.daemon = True
78+ thread.start()
79+ return thread
80+
81+
82+class SmartQueue(Queue):
83+ """
84+ Queue with custom get() that raises exception instances from the queue.
85+ """
86+
87+ def get(self, block=True, timeout=None):
88+ item = super().get(block, timeout)
89+ if isinstance(item, Exception):
90+ raise item
91+ return item
92+
93+
94+def _fakelist_worker(rows, db, queue):
95+ try:
96+ for doc_ids in id_slice_iter(rows, 50):
97+ queue.put(db.get_many(doc_ids))
98+ queue.put(None)
99+ except Exception as e:
100+ queue.put(e)
101+
102+
103 class FakeList(list):
104- __slots__ = ('_count', '_iterable')
105-
106- def __init__(self, count, iterable):
107+ """
108+ Trick ``json.dump()`` into doing memory-efficient incremental encoding.
109+
110+ This class is a hack to allow `Database.dump()` to dump a large database
111+ while keeping the memory usage constant.
112+
113+ It also provides two hacks to improve the performance of `Database.dump()`:
114+
115+ 1. Documents are retrieved 50 at a time using `Database.get_many()`
116+
117+ 2. The CouchDB requests are made in a separate thread so `json.dump()`
118+ can be busy doing work while we're waiting for a response
119+ """
120+
121+ __slots__ = ('_rows', '_db')
122+
123+ def __init__(self, rows, db):
124 super().__init__()
125- self._count = count
126- self._iterable = iterable
127+ self._rows = rows
128+ self._db = db
129
130 def __len__(self):
131- return self._count
132+ return len(self._rows)
133
134 def __iter__(self):
135- for doc in self._iterable:
136- yield doc
137-
138-
139-def iter_all_docs(rows, db, attachments=True):
140- for r in rows:
141- doc = db.get(r['id'], rev=r['value']['rev'], attachments=attachments)
142- del doc['_rev']
143- yield doc
144+ queue = SmartQueue(2)
145+ thread = _start_thread(_fakelist_worker, self._rows, self._db, queue)
146+ while True:
147+ docs = queue.get()
148+ if docs is None:
149+ break
150+ for doc in docs:
151+ del doc['_rev']
152+ try:
153+ del doc['_attachments']
154+ except KeyError:
155+ pass
156+ yield doc
157+ thread.join() # Make sure reader() terminates
158
159
160 class CouchBase(object):
161@@ -876,12 +925,45 @@
162 options['reduce'] = False
163 return self.get('_design', design, '_view', view, **options)
164
165- def dump(self, fp, attachments=True):
166- rows = self.get('_all_docs')['rows']
167- iterable = iter_all_docs(rows, self, attachments)
168- docs = FakeList(len(rows), iterable)
169- json.dump({'docs': docs}, fp, ensure_ascii=False, sort_keys=True, indent=4, separators=(',', ': '))
170-
171- def load(self, fp):
172- return self.post(fp, '_bulk_docs')
173-
174+ def dump(self, filename):
175+ """
176+ Dump this database to regular JSON file *filename*.
177+
178+ For example:
179+
180+ >>> db = Database('foo') #doctest: +SKIP
181+ >>> db.dump('foo.json') #doctest: +SKIP
182+
183+ Or if *filename* ends with ``'.json.gz'``, the file will be
184+ gzip-compressed as it is written:
185+
186+ >>> db.dump('foo.json.gz') #doctest: +SKIP
187+
188+ CouchDB is a bit awkward in that its API doesn't offer a nice way to
189+ make a request whose response is suitable for writing directly to a
190+ file, without decoding/encoding. It would be nice if that dump could
191+ be loaded directly from the file as well. One of the biggest issues is
192+ that a dump really needs to have doc['_rev'] removed.
193+
194+ This method is a compromise on many fronts, but it was made with these
195+ priorities:
196+
197+ 1. Readability of the dumped JSON file
198+
199+ 2. High performance and low memory usage, despite the fact that
200+ we must encode and decode each doc
201+ """
202+ if filename.lower().endswith('.json.gz'):
203+ _fp = open(filename, 'wb')
204+ fp = TextIOWrapper(GzipFile('docs.json', fileobj=_fp, mtime=1))
205+ else:
206+ fp = open(filename, 'w')
207+ rows = self.get('_all_docs', endkey='_')['rows']
208+ docs = FakeList(rows, self)
209+ json.dump(docs, fp,
210+ ensure_ascii=False,
211+ sort_keys=True,
212+ indent=4,
213+ separators=(',', ': '),
214+ )
215+
216
217=== modified file 'test_microfiber.py'
218--- test_microfiber.py 2012-08-15 22:11:15 +0000
219+++ test_microfiber.py 2012-08-16 04:26:20 +0000
220@@ -30,6 +30,7 @@
221 from base64 import b64encode, b64decode, b32encode, b32decode
222 from copy import deepcopy
223 import json
224+import gzip
225 import time
226 import io
227 import tempfile
228@@ -58,6 +59,26 @@
229 B32ALPHABET = frozenset('234567ABCDEFGHIJKLMNOPQRSTUVWXYZ')
230
231
232+# A sample view from Dmedia:
233+doc_type = """
234+function(doc) {
235+ emit(doc.type, null);
236+}
237+"""
238+doc_time = """
239+function(doc) {
240+ emit(doc.time, null);
241+}
242+"""
243+doc_design = {
244+ '_id': '_design/doc',
245+ 'views': {
246+ 'type': {'map': doc_type, 'reduce': '_count'},
247+ 'time': {'map': doc_time},
248+ },
249+}
250+
251+
252 def is_microfiber_id(_id):
253 assert isinstance(_id, str)
254 return (
255@@ -1014,6 +1035,52 @@
256 self.env = None
257
258
259+class TestFakeList(LiveTestCase):
260+ def test_init(self):
261+ db = microfiber.Database('foo', self.env)
262+ self.assertTrue(db.ensure())
263+
264+ # Test when DB is empty
265+ rows = []
266+ fake = microfiber.FakeList(rows, db)
267+ self.assertIsInstance(fake, list)
268+ self.assertIs(fake._rows, rows)
269+ self.assertIs(fake._db, db)
270+ self.assertEqual(len(fake), 0)
271+ self.assertEqual(list(fake), [])
272+
273+ # Test when there are some docs
274+ ids = sorted(test_id() for i in range(201))
275+ orig = [
276+ {'_id': _id, 'hello': 'мир', 'welcome': 'все'}
277+ for _id in ids
278+ ]
279+ docs = deepcopy(orig)
280+ db.save_many(docs)
281+ rows = db.get('_all_docs')['rows']
282+ fake = microfiber.FakeList(rows, db)
283+ self.assertIsInstance(fake, list)
284+ self.assertIs(fake._rows, rows)
285+ self.assertIs(fake._db, db)
286+ self.assertEqual(len(fake), 201)
287+ self.assertEqual(list(fake), orig)
288+
289+ # Verify that _attachments get deleted
290+ for doc in docs:
291+ db.put_att('application/octet-stream', b'foobar', doc['_id'], 'baz',
292+ rev=doc['_rev']
293+ )
294+ for _id in ids:
295+ self.assertIn('_attachments', db.get(_id))
296+ rows = db.get('_all_docs')['rows']
297+ fake = microfiber.FakeList(rows, db)
298+ self.assertIsInstance(fake, list)
299+ self.assertIs(fake._rows, rows)
300+ self.assertIs(fake._db, db)
301+ self.assertEqual(len(fake), 201)
302+ self.assertEqual(list(fake), orig)
303+
304+
305 class TestCouchBaseLive(LiveTestCase):
306 klass = microfiber.CouchBase
307
308@@ -1676,3 +1743,60 @@
309 db.get_many([ids[17], nope, ids[18]]),
310 [docs[17], None, docs[18]]
311 )
312+
313+ def test_dump(self):
314+ db = microfiber.Database('foo', self.env)
315+ self.assertTrue(db.ensure())
316+ docs = [
317+ {'_id': test_id(), 'hello': 'мир', 'welcome': 'все'}
318+ for i in range(200)
319+ ]
320+ docs_s = microfiber.dumps(
321+ sorted(docs, key=lambda d: d['_id']),
322+ pretty=True
323+ )
324+ docs.append(deepcopy(doc_design))
325+ checksum = md5(docs_s.encode('utf-8')).hexdigest()
326+ db.save_many(docs)
327+
328+ # Test with .json
329+ dst = path.join(self.tmpcouch.paths.bzr, 'foo.json')
330+ db.dump(dst)
331+ self.assertEqual(open(dst, 'r').read(), docs_s)
332+ self.assertEqual(
333+ md5(open(dst, 'rb').read()).hexdigest(),
334+ checksum
335+ )
336+
337+ # Test with .json.gz
338+ dst = path.join(self.tmpcouch.paths.bzr, 'foo.json.gz')
339+ db.dump(dst)
340+ gz_checksum = md5(open(dst, 'rb').read()).hexdigest()
341+ self.assertEqual(
342+ md5(gzip.GzipFile(dst, 'rb').read()).hexdigest(),
343+ checksum
344+ )
345+
346+ # Test that timestamp doesn't change gz_checksum
347+ time.sleep(2)
348+ db.dump(dst)
349+ self.assertEqual(
350+ md5(open(dst, 'rb').read()).hexdigest(),
351+ gz_checksum
352+ )
353+
354+ # Test that filename doesn't change gz_checksum
355+ dst = path.join(self.tmpcouch.paths.bzr, 'bar.json.gz')
356+ db.dump(dst)
357+ self.assertEqual(
358+ md5(open(dst, 'rb').read()).hexdigest(),
359+ gz_checksum
360+ )
361+
362+ # Make sure .JSON.GZ also works, that case is ignored
363+ dst = path.join(self.tmpcouch.paths.bzr, 'FOO.JSON.GZ')
364+ db.dump(dst)
365+ self.assertEqual(
366+ md5(open(dst, 'rb').read()).hexdigest(),
367+ gz_checksum
368+ )
