Merge lp:~pedronis/u1db/push-in-generation-order into lp:u1db

Proposed by Samuele Pedroni
Status: Merged
Approved by: Samuele Pedroni
Approved revision: 155
Merged at revision: 148
Proposed branch: lp:~pedronis/u1db/push-in-generation-order
Merge into: lp:u1db
Prerequisite: lp:~pedronis/u1db/sync-simple-reorg-n-hide-details
Diff against target: 434 lines (+133/-70)
8 files modified
u1db/__init__.py (+12/-7)
u1db/remote/http_app.py (+3/-5)
u1db/remote/http_target.py (+5/-6)
u1db/sync.py (+9/-8)
u1db/tests/test_backends.py (+2/-3)
u1db/tests/test_http_app.py (+25/-8)
u1db/tests/test_remote_sync_target.py (+52/-9)
u1db/tests/test_sync.py (+25/-24)
To merge this branch: bzr merge lp:~pedronis/u1db/push-in-generation-order
Reviewer: John A Meinel (community)
Status: Approve
Review via email: mp+85508@code.launchpad.net

Description of the change

Send documents from source to target during sync in generation order, and incrementally update the target's record of the source's known generation.

Not sure how much sanity checking we want to do.
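The idea in the description can be sketched with a toy in-memory target: apply documents oldest generation first and record the source's generation after each one, so a failure part-way through leaves a usable resume point. `Document`, `InMemoryTarget`, `FlakyTarget` and the resume filter are hypothetical stand-ins, not u1db's classes; the scenario mirrors the failure-and-retry test added in this branch.

```python
from dataclasses import dataclass


@dataclass
class Document:
    # Hypothetical stand-in for u1db's Document (only the fields used here).
    doc_id: str
    rev: str
    content: str


class InMemoryTarget:
    """Toy sync target; not u1db's SyncTarget."""

    def __init__(self):
        self.docs = {}              # doc_id -> Document
        self.sync_generations = {}  # source replica uid -> last gen applied

    def insert_doc(self, doc):
        self.docs[doc.doc_id] = doc

    def sync_exchange(self, docs_by_generation, from_replica_uid):
        # Pairs arrive oldest generation first. Recording each gen right
        # after its insert means a failure part-way through leaves the
        # target knowing exactly which prefix it has applied.
        for doc, gen in docs_by_generation:
            self.insert_doc(doc)
            self.sync_generations[from_replica_uid] = gen
        return self.sync_generations.get(from_replica_uid, 0)


class FlakyTarget(InMemoryTarget):
    """Fails on selected doc ids, like the failure-and-retry test."""

    def __init__(self, fail_ids):
        super().__init__()
        self.fail_ids = set(fail_ids)

    def insert_doc(self, doc):
        if doc.doc_id in self.fail_ids:
            raise RuntimeError("simulated failure on %s" % doc.doc_id)
        super().insert_doc(doc)


batch = [(Document("doc-here", "replica:1", '{"value": "here"}'), 10),
         (Document("doc-here2", "replica:1", '{"value": "here2"}'), 11)]
target = FlakyTarget(fail_ids=["doc-here2"])
try:
    target.sync_exchange(batch, "replica")
except RuntimeError:
    pass
after_failure = target.sync_generations["replica"]  # progress kept up to 10

# Retry: resend only what the target has not recorded yet.
target.fail_ids.clear()
remaining = [(d, g) for d, g in batch if g > after_failure]
after_retry = target.sync_exchange(remaining, "replica")
```

Because the recorded generation only ever covers a fully applied prefix, the retry can safely skip everything at or below it.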

John A Meinel (jameinel) wrote :

+    def sync_exchange(self, docs, from_replica_generations,
+                      from_replica_uid,
                       last_known_generation, return_doc_cb):

It seems like the from_replica_generations should be attached to the docs, rather than kept in a separate list. We can do it this way if you think it makes sense, but generally I'm not a big fan of separate lists that need to be iterated in lockstep.
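One concrete hazard with two parallel lists: `zip()` stops at the shorter input without raising, so a mismatch silently drops documents. The hypothetical helper below (not part of u1db) pairs documents with generations and checks the lengths explicitly, producing the `[(doc, gen)]` shape the branch eventually adopted.

```python
def pair_docs_with_generations(docs, generations):
    """Combine two parallel lists into [(doc, gen)] pairs.

    Hypothetical helper: zip() stops at the shorter input without
    complaint, so a length mismatch is checked explicitly here.
    """
    docs = list(docs)
    generations = list(generations)
    if len(docs) != len(generations):
        raise ValueError("got %d docs but %d generations"
                         % (len(docs), len(generations)))
    return list(zip(docs, generations))


# Plain zip() quietly loses "doc-b" here.
silently_truncated = list(zip(["doc-a", "doc-b"], [10]))
paired = pair_docs_with_generations(["doc-a", "doc-b"], [10, 11])
```

On Python 3.10 and later, `zip(docs, generations, strict=True)` performs the same length check.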

One of my complaints about the current put-then-update-generation approach is that it takes 2 database locks for a single text insert.
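A minimal sketch of the single-lock alternative, assuming a SQLite backend: write the document and the source's generation inside one transaction, so they commit or roll back together. The schema and `put_doc_and_record_gen` are hypothetical and not u1db's actual sqlite backend.

```python
import sqlite3


def make_db():
    # Hypothetical schema for illustration; not u1db's sqlite backend.
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE document (
            doc_id TEXT PRIMARY KEY, rev TEXT, content TEXT);
        CREATE TABLE sync_log (
            replica_uid TEXT PRIMARY KEY,
            known_generation INTEGER CHECK (known_generation >= 0));
    """)
    return conn


def put_doc_and_record_gen(conn, doc_id, rev, content, replica_uid, gen):
    # Used as a context manager, the connection commits if the block
    # succeeds and rolls back if it raises, so both writes share one
    # transaction and land together or not at all.
    with conn:
        conn.execute("INSERT OR REPLACE INTO document VALUES (?, ?, ?)",
                     (doc_id, rev, content))
        conn.execute("INSERT OR REPLACE INTO sync_log VALUES (?, ?)",
                     (replica_uid, gen))


conn = make_db()
put_doc_and_record_gen(conn, "doc-here", "replica:1", '{"value": "here"}',
                       "replica", 10)

# A negative generation violates the CHECK constraint; the document insert
# made earlier in the same transaction is rolled back with it.
try:
    put_doc_and_record_gen(conn, "doc-here2", "replica:1", "{}", "replica", -1)
except sqlite3.IntegrityError:
    pass
```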

Samuele Pedroni (pedronis) wrote :

I would prefer to optimize put_doc_if_newer + set_sync_generation in a later branch, once we have the code right for the target -> source direction as well.

I'm still open to suggestions on whether to use tuples (gen, doc) or store the generation on the docs, to address the first comment. The second list seemed the path of least resistance.

153. By Samuele Pedroni

pass list of tuples [(Doc, gen)] to sync_exchange()

154. By Samuele Pedroni

add a failure and retry test, fix tests to use string contents

John A Meinel (jameinel) wrote :

+ docs_by_generation = zip(docs_to_send, (gen for _, gen in changes))

I'm pretty sure that get_docs doesn't guarantee the return order. We should instead do:

doc_id_to_generation = dict(changes)
doc_generation = [(doc, doc_id_to_generation[doc.doc_id]) for doc in docs_to_send]
doc_generation.sort(key=lambda x: x[1])

I wonder if we are missing test coverage (or maybe all current implementations happen to return them in order). I did want us to be flexible about this, so that implementations could return documents in whatever order is fastest for them.
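The difference between the two pairings shows up as soon as `get_docs` returns documents in a different order from `changes`. This sketch uses a hypothetical `Doc` namedtuple and simulates out-of-order results; `pair_by_position` is the zip approach under review and `pair_by_doc_id` is the dict-and-sort suggestion above.

```python
from collections import namedtuple

# Hypothetical stand-in for u1db's Document.
Doc = namedtuple("Doc", ["doc_id", "rev", "content"])


def pair_by_position(docs_to_send, changes):
    # The approach under review: trusts docs_to_send to be in changes order.
    return list(zip(docs_to_send, (gen for _, gen in changes)))


def pair_by_doc_id(docs_to_send, changes):
    # The suggested fix: look generations up by doc_id, then sort the
    # pairs oldest generation first, whatever order get_docs used.
    doc_id_to_generation = dict(changes)
    pairs = [(doc, doc_id_to_generation[doc.doc_id]) for doc in docs_to_send]
    pairs.sort(key=lambda pair: pair[1])
    return pairs


changes = [("doc-a", 10), ("doc-b", 11)]
# Pretend the backend handed the docs back in reverse order.
docs_to_send = [Doc("doc-b", "r:1", "{}"), Doc("doc-a", "r:1", "{}")]

wrong = [(d.doc_id, g) for d, g in pair_by_position(docs_to_send, changes)]
right = [(d.doc_id, g) for d, g in pair_by_doc_id(docs_to_send, changes)]
```

Here `wrong` attaches generation 10 to `doc-b`, while `right` keeps each document with its own generation. The branch ultimately kept the zip and instead documented that `get_docs` returns documents in `doc_ids` order (revision 155).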

And if we are going to use this, I think sync_exchange should probably check that cur_gen > last_gen, and treat it as an error otherwise. (We could probably allow gen to be 0 to indicate that a given entry should be ignored.)
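A minimal sketch of the suggested sanity check, assuming it runs over the incoming `[(doc, gen)]` pairs before anything is applied. `check_generation_order` is hypothetical, and treating `gen == 0` as "skip this entry in the check" is one reading of the parenthetical above, not a settled u1db rule.

```python
def check_generation_order(docs_by_generation, last_gen=0):
    """Raise ValueError unless generations strictly increase.

    Hypothetical helper. Entries with gen == 0 are skipped by the check
    (one reading of "gen 0 means ignore this entry"). Returns the newest
    generation seen, or last_gen if there were none.
    """
    for _doc, gen in docs_by_generation:
        if gen == 0:
            continue
        if gen <= last_gen:
            raise ValueError(
                "generation %d is not newer than %d" % (gen, last_gen))
        last_gen = gen
    return last_gen


newest = check_generation_order([("doc-a", 10), ("doc-x", 0), ("doc-b", 11)])
```

`last_gen` could be seeded with the generation the target has already recorded for the source, which would also reject replays of older changes.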

review: Approve
155. By Samuele Pedroni

document the expectation that get_docs returns documents in doc_ids order
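A backend whose natural query returns rows in arbitrary order can still meet the documented `get_docs` contract by reindexing. In this sketch `fetch_unordered` and `reversed_fetch` are hypothetical stand-ins for a backend query (think of SQL `WHERE doc_id IN (...)`), and every requested id is assumed to exist.

```python
from collections import namedtuple

Doc = namedtuple("Doc", ["doc_id", "rev", "content"])  # hypothetical stand-in


def get_docs_in_order(fetch_unordered, doc_ids):
    """Return one document per id, in exactly the order of doc_ids.

    fetch_unordered may return documents in any order. Assumes every id
    exists; a missing id raises KeyError.
    """
    by_id = {doc.doc_id: doc for doc in fetch_unordered(doc_ids)}
    return [by_id[doc_id] for doc_id in doc_ids]


def reversed_fetch(doc_ids):
    # Simulated backend that returns documents in the "wrong" order.
    return [Doc(doc_id, "r:1", "{}") for doc_id in reversed(doc_ids)]


ordered = get_docs_in_order(reversed_fetch, ["doc-a", "doc-b", "doc-c"])
```

This contract is also why the branch turns `changed_doc_ids` in `u1db/sync.py` from a set into a list: a set has no defined order to preserve.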

Preview Diff

1=== modified file 'u1db/__init__.py'
2--- u1db/__init__.py 2011-12-12 19:40:50 +0000
3+++ u1db/__init__.py 2011-12-14 12:51:31 +0000
4@@ -76,7 +76,7 @@
5 :param doc_ids: A list of document identifiers.
6 :param check_for_conflicts: If set to False, then the conflict check
7 will be skipped, and 'None' will be returned instead of True/False.
8- :return: [Document] for each document id.
9+ :return: [Document] for each document id and matching doc_ids order.
10 """
11 raise NotImplementedError(self.get_docs)
12
13@@ -349,8 +349,8 @@
14 """
15 raise NotImplementedError(self.record_sync_info)
16
17- def sync_exchange(self, docs,
18- from_replica_uid, from_replica_generation,
19+ def sync_exchange(self, docs_by_generation,
20+ from_replica_uid,
21 last_known_generation, return_doc_cb):
22 """Incorporate the documents sent from the other replica.
23
24@@ -360,11 +360,16 @@
25 This adds docs to the local store, and determines documents that need
26 to be returned to the other replica.
27
28- :param docs: A list of [Document] objects indicating
29- documents which should be updated on this replica.
30+ Documents must be supplied in docs_by_generation paired with
31+ the generation of their latest change in order from the oldest
32+ change to the newest, that means from the oldest generation to
33+ the newest.
34+
35+ :param docs_by_generation: A list of [(Document, generation)]
36+ pairs indicating documents which should be updated on
37+ this replica paired with the generation of their
38+ latest change.
39 :param from_replica_uid: The other replica's identifier
40- :param from_replica_generation: The db generation for the other replica
41- indicating the tip of data being sent by docs_info.
42 :param last_known_generation: The last generation that other replica
43 knows about this
44 :param: return_doc_cb(doc): is a callback
45
46=== modified file 'u1db/remote/http_app.py'
47--- u1db/remote/http_app.py 2011-12-12 20:59:22 +0000
48+++ u1db/remote/http_app.py 2011-12-14 12:51:31 +0000
49@@ -283,19 +283,17 @@
50
51 @http_method(from_replica_generation=int, last_known_generation=int,
52 content_as_args=True)
53- def post_args(self, last_known_generation, from_replica_generation):
54- self.from_replica_generation = from_replica_generation
55+ def post_args(self, last_known_generation):
56 self.last_known_generation = last_known_generation
57 self.sync_exch = self.target.get_sync_exchange()
58
59 @http_method(content_as_args=True)
60- def post_stream_entry(self, id, rev, content):
61+ def post_stream_entry(self, id, rev, content, gen):
62 doc = Document(id, rev, content)
63 self.sync_exch.insert_doc_from_source(doc)
64+ self.sync_exch.record_sync_progress(self.from_replica_uid, gen)
65
66 def post_end(self):
67- self.sync_exch.record_sync_progress(self.from_replica_uid,
68- self.from_replica_generation)
69 def send_doc(doc):
70 entry = dict(id=doc.doc_id, rev=doc.rev, content=doc.content)
71 self.responder.stream_entry(entry)
72
73=== modified file 'u1db/remote/http_target.py'
74--- u1db/remote/http_target.py 2011-12-06 20:26:29 +0000
75+++ u1db/remote/http_target.py 2011-12-14 12:51:31 +0000
76@@ -43,8 +43,7 @@
77 self._request_json('PUT', ['sync-from', other_replica_uid], {},
78 {'generation': other_replica_generation})
79
80- def sync_exchange(self, docs, from_replica_uid,
81- from_replica_generation,
82+ def sync_exchange(self, docs_by_generations, from_replica_uid,
83 last_known_generation, return_doc_cb):
84 self._ensure_connection()
85 self._conn.putrequest('POST',
86@@ -57,10 +56,10 @@
87 entry = simplejson.dumps(dic) + "\r\n"
88 entries.append(entry)
89 return len(entry)
90- size += prepare(last_known_generation=last_known_generation,
91- from_replica_generation=from_replica_generation)
92- for doc in docs:
93- size += prepare(id=doc.doc_id, rev=doc.rev, content=doc.content)
94+ size += prepare(last_known_generation=last_known_generation)
95+ for doc, gen in docs_by_generations:
96+ size += prepare(id=doc.doc_id, rev=doc.rev, content=doc.content,
97+ gen=gen)
98 self._conn.putheader('content-length', str(size))
99 self._conn.endheaders()
100 for entry in entries:
101
102=== modified file 'u1db/sync.py'
103--- u1db/sync.py 2011-12-13 09:47:57 +0000
104+++ u1db/sync.py 2011-12-14 12:51:31 +0000
105@@ -93,18 +93,19 @@
106 others_my_gen) = sync_target.get_sync_info(self.source._replica_uid)
107 # what's changed since that generation and this current gen
108 my_gen, changes = self.source.whats_changed(others_my_gen)
109- changed_doc_ids = set(doc_id for doc_id, _ in changes)
110+ changed_doc_ids = [doc_id for doc_id, _ in changes]
111 # prepare to send all the changed docs
112 docs_to_send = self.source.get_docs(changed_doc_ids,
113 check_for_conflicts=False)
114+ docs_by_generation = zip(docs_to_send, (gen for _, gen in changes))
115
116 # this source last-seen database generation for the target
117 other_last_known_gen = self.source.get_sync_generation(
118 other_replica_uid)
119 # exchange documents and try to insert the returned ones with
120 # the target, return target synced-up-to gen
121- new_gen = sync_target.sync_exchange(docs_to_send,
122- self.source._replica_uid, my_gen, other_last_known_gen,
123+ new_gen = sync_target.sync_exchange(docs_by_generation,
124+ self.source._replica_uid, other_last_known_gen,
125 return_doc_cb=self._insert_doc_from_target)
126 # record target synced-up-to generation
127 self.source.set_sync_generation(other_replica_uid, new_gen)
128@@ -232,16 +233,16 @@
129 def get_sync_exchange(self):
130 return SyncExchange(self._db)
131
132- def sync_exchange(self, docs,
133- from_replica_uid, from_replica_generation,
134+ def sync_exchange(self, docs_by_generations, from_replica_uid,
135 last_known_generation, return_doc_cb):
136 sync_exch = self.get_sync_exchange()
137 # 1st step: try to insert incoming docs
138- for doc in docs:
139+ for doc, _ in docs_by_generations:
140 sync_exch.insert_doc_from_source(doc)
141 # record progress
142- sync_exch.record_sync_progress(from_replica_uid,
143- from_replica_generation)
144+ if docs_by_generations:
145+ latest_gen = docs_by_generations[-1][1]
146+ sync_exch.record_sync_progress(from_replica_uid, latest_gen)
147 # 2nd step: find changed documents (including conflicts) to return
148 new_gen = sync_exch.find_docs_to_return(last_known_generation)
149 # final step: return docs and record source replica sync point
150
151=== modified file 'u1db/tests/test_backends.py'
152--- u1db/tests/test_backends.py 2011-12-12 19:40:50 +0000
153+++ u1db/tests/test_backends.py 2011-12-14 12:51:31 +0000
154@@ -637,9 +637,8 @@
155 st = self.db.get_sync_target()
156 def ignore(doc_id, doc_rev, doc):
157 pass
158- docs = [Document(doc.doc_id, other_rev, new_content)]
159- result = st.sync_exchange(docs, 'other-replica',
160- from_replica_generation=10,
161+ docs_by_gen = [(Document(doc.doc_id, other_rev, new_content), 10)]
162+ result = st.sync_exchange(docs_by_gen, 'other-replica',
163 last_known_generation=0,
164 return_doc_cb=ignore)
165 self.assertGetDoc(self.db, doc.doc_id, other_rev, new_content, False)
166
167=== modified file 'u1db/tests/test_http_app.py'
168--- u1db/tests/test_http_app.py 2011-12-06 20:38:52 +0000
169+++ u1db/tests/test_http_app.py 2011-12-14 12:51:31 +0000
170@@ -554,11 +554,29 @@
171 self.assertEqual(self.db0.get_sync_generation('other-id'), 2)
172
173 def test_sync_exchange_send(self):
174- entry = {'id': 'doc-here', 'rev': 'replica:1', 'content':
175- '{"value": "here"}'}
176- args = dict(from_replica_generation=10, last_known_generation=0)
177+ entries = {
178+ 10: {'id': 'doc-here', 'rev': 'replica:1', 'content':
179+ '{"value": "here"}', 'gen':10 },
180+ 11: {'id': 'doc-here2', 'rev': 'replica:1', 'content':
181+ '{"value": "here2"}', 'gen':11 }
182+ }
183+
184+ gens = []
185+ _set_sync_generation = self.db0.set_sync_generation
186+ def set_sync_generation_witness(other_uid, other_gen):
187+ gens.append((other_uid, other_gen))
188+ _set_sync_generation(other_uid, other_gen)
189+ self.assertGetDoc(self.db0, entries[other_gen]['id'],
190+ entries[other_gen]['rev'],
191+ entries[other_gen]['content'], False)
192+
193+ self.patch(self.db0, 'set_sync_generation',
194+ set_sync_generation_witness)
195+
196+ args = dict(last_known_generation=0)
197 body = ("%s\r\n" % simplejson.dumps(args) +
198- "%s\r\n" % simplejson.dumps(entry))
199+ "%s\r\n" % simplejson.dumps(entries[10]) +
200+ "%s\r\n" % simplejson.dumps(entries[11]))
201 resp = self.app.post('/db0/sync-from/replica',
202 params=body,
203 headers={'content-type':
204@@ -566,13 +584,12 @@
205 self.assertEqual(200, resp.status)
206 self.assertEqual('application/x-u1db-multi-json',
207 resp.header('content-type'))
208- self.assertEqual({'new_generation': 1}, simplejson.loads(resp.body))
209- self.assertGetDoc(self.db0, 'doc-here', 'replica:1',
210- '{"value": "here"}', False)
211+ self.assertEqual({'new_generation': 2}, simplejson.loads(resp.body))
212+ self.assertEqual([('replica', 10), ('replica', 11)], gens)
213
214 def test_sync_exchange_receive(self):
215 doc = self.db0.create_doc('{"value": "there"}')
216- args = dict(from_replica_generation=10, last_known_generation=0)
217+ args = dict(last_known_generation=0)
218 body = "%s\r\n" % simplejson.dumps(args)
219 resp = self.app.post('/db0/sync-from/replica',
220 params=body,
221
222=== modified file 'u1db/tests/test_remote_sync_target.py'
223--- u1db/tests/test_remote_sync_target.py 2011-12-06 20:38:52 +0000
224+++ u1db/tests/test_remote_sync_target.py 2011-12-14 12:51:31 +0000
225@@ -15,10 +15,12 @@
226 """Tests for the remote sync targets"""
227
228 from wsgiref import simple_server
229+import cStringIO
230 #from paste import httpserver
231
232 from u1db import (
233 Document,
234+ errors,
235 tests,
236 )
237 from u1db.remote import (
238@@ -91,26 +93,67 @@
239 def receive_doc(doc):
240 other_docs.append((doc.doc_id, doc.rev, doc.content))
241 new_gen = remote_target.sync_exchange(
242- [Document('doc-here', 'replica:1', {'value': 'here'})],
243- 'replica', from_replica_generation=10,
244- last_known_generation=0, return_doc_cb=receive_doc)
245+ [(Document('doc-here', 'replica:1', '{"value": "here"}'), 10)],
246+ 'replica', last_known_generation=0,
247+ return_doc_cb=receive_doc)
248 self.assertEqual(1, new_gen)
249- self.assertGetDoc(db, 'doc-here', 'replica:1', {'value': 'here'}, False)
250+ self.assertGetDoc(db, 'doc-here', 'replica:1', '{"value": "here"}',
251+ False)
252+
253+ def test_sync_exchange_send_failure_and_retry_scenario(self):
254+ self.startServer()
255+ def blackhole_getstderr(inst):
256+ return cStringIO.StringIO()
257+ self.patch(self.server.RequestHandlerClass, 'get_stderr',
258+ blackhole_getstderr)
259+ db = self.request_state._create_database('test')
260+ _put_doc_if_newer = db.put_doc_if_newer
261+ trigger_ids = ['doc-here2']
262+ def bomb_put_doc_if_newer(doc):
263+ if doc.doc_id in trigger_ids:
264+ raise Exception
265+ return _put_doc_if_newer(doc)
266+ self.patch(db, 'put_doc_if_newer', bomb_put_doc_if_newer)
267+ remote_target = self.getSyncTarget('test')
268+ other_docs = []
269+ def receive_doc(doc):
270+ other_docs.append((doc.doc_id, doc.rev, doc.content))
271+ self.assertRaises(errors.HTTPError, remote_target.sync_exchange,
272+ [(Document('doc-here', 'replica:1', '{"value": "here"}'), 10),
273+ (Document('doc-here2', 'replica:1', '{"value": "here2"}'), 11)
274+ ], 'replica', last_known_generation=0,
275+ return_doc_cb=receive_doc)
276+ self.assertGetDoc(db, 'doc-here', 'replica:1', '{"value": "here"}',
277+ False)
278+ self.assertEqual(10, db.get_sync_generation('replica'))
279+ self.assertEqual([], other_docs)
280+ # retry
281+ trigger_ids = []
282+ new_gen = remote_target.sync_exchange(
283+ [(Document('doc-here2', 'replica:1', '{"value": "here2"}'), 11)
284+ ], 'replica', last_known_generation=0,
285+ return_doc_cb=receive_doc)
286+ self.assertGetDoc(db, 'doc-here2', 'replica:1', '{"value": "here2"}',
287+ False)
288+ self.assertEqual(11, db.get_sync_generation('replica'))
289+ self.assertEqual(2, new_gen)
290+ # bounced back to us
291+ self.assertEqual([('doc-here', 'replica:1', '{"value": "here"}')],
292+ other_docs)
293
294 def test_sync_exchange_receive(self):
295 self.startServer()
296 db = self.request_state._create_database('test')
297- doc = db.create_doc({'value': 'there'})
298+ doc = db.create_doc('{"value": "there"}')
299 remote_target = self.getSyncTarget('test')
300 other_docs = []
301 def receive_doc(doc):
302 other_docs.append((doc.doc_id, doc.rev, doc.content))
303 new_gen = remote_target.sync_exchange(
304- [],
305- 'replica', from_replica_generation=10,
306- last_known_generation=0, return_doc_cb=receive_doc)
307+ [], 'replica', last_known_generation=0,
308+ return_doc_cb=receive_doc)
309 self.assertEqual(1, new_gen)
310- self.assertEqual([(doc.doc_id, doc.rev, {'value': 'there'})],
311+ self.assertEqual([(doc.doc_id, doc.rev, '{"value": "there"}')],
312 other_docs)
313
314
315
316=== modified file 'u1db/tests/test_sync.py'
317--- u1db/tests/test_sync.py 2011-12-12 20:38:45 +0000
318+++ u1db/tests/test_sync.py 2011-12-14 12:51:31 +0000
319@@ -92,9 +92,8 @@
320 self.assertEqual(('test', 0, 10), self.st.get_sync_info('replica'))
321
322 def test_sync_exchange(self):
323- docs = [Document('doc-id', 'replica:1', simple_doc)]
324- new_gen = self.st.sync_exchange(docs,
325- 'replica', from_replica_generation=10,
326+ docs_by_gen = [(Document('doc-id', 'replica:1', simple_doc), 10)]
327+ new_gen = self.st.sync_exchange(docs_by_gen, 'replica',
328 last_known_generation=0,
329 return_doc_cb=self.receive_doc)
330 self.assertGetDoc(self.db, 'doc-id', 'replica:1', simple_doc, False)
331@@ -102,13 +101,24 @@
332 self.assertEqual(([], 1), (self.other_docs, new_gen))
333 self.assertEqual(10, self.st.get_sync_info('replica')[-1])
334
335+ def test_sync_exchange_push_many(self):
336+ docs_by_gen = [(Document('doc-id', 'replica:1', simple_doc), 10),
337+ (Document('doc-id2', 'replica:1', nested_doc), 11)]
338+ new_gen = self.st.sync_exchange(docs_by_gen, 'replica',
339+ last_known_generation=0,
340+ return_doc_cb=self.receive_doc)
341+ self.assertGetDoc(self.db, 'doc-id', 'replica:1', simple_doc, False)
342+ self.assertGetDoc(self.db, 'doc-id2', 'replica:1', nested_doc, False)
343+ self.assertEqual(['doc-id', 'doc-id2'], self.db._get_transaction_log())
344+ self.assertEqual(([], 2), (self.other_docs, new_gen))
345+ self.assertEqual(11, self.st.get_sync_info('replica')[-1])
346+
347 def test_sync_exchange_refuses_conflicts(self):
348 doc = self.db.create_doc(simple_doc)
349 self.assertEqual([doc.doc_id], self.db._get_transaction_log())
350 new_doc = '{"key": "altval"}'
351- docs = [Document(doc.doc_id, 'replica:1', new_doc)]
352- new_gen = self.st.sync_exchange(docs,
353- 'replica', from_replica_generation=10,
354+ docs_by_gen = [(Document(doc.doc_id, 'replica:1', new_doc), 10)]
355+ new_gen = self.st.sync_exchange(docs_by_gen, 'replica',
356 last_known_generation=0,
357 return_doc_cb=self.receive_doc)
358 self.assertEqual([doc.doc_id], self.db._get_transaction_log())
359@@ -120,9 +130,8 @@
360 def test_sync_exchange_ignores_convergence(self):
361 doc = self.db.create_doc(simple_doc)
362 self.assertEqual([doc.doc_id], self.db._get_transaction_log())
363- docs = [Document(doc.doc_id, doc.rev, simple_doc)]
364- new_gen = self.st.sync_exchange(docs,
365- 'replica', from_replica_generation=10,
366+ docs_by_gen = [(Document(doc.doc_id, doc.rev, simple_doc), 10)]
367+ new_gen = self.st.sync_exchange(docs_by_gen, 'replica',
368 last_known_generation=1,
369 return_doc_cb=self.receive_doc)
370 self.assertEqual([doc.doc_id], self.db._get_transaction_log())
371@@ -132,7 +141,6 @@
372 doc = self.db.create_doc(simple_doc)
373 self.assertEqual([doc.doc_id], self.db._get_transaction_log())
374 new_gen = self.st.sync_exchange([], 'other-replica',
375- from_replica_generation=10,
376 last_known_generation=0,
377 return_doc_cb=self.receive_doc)
378 self.assertEqual([doc.doc_id], self.db._get_transaction_log())
379@@ -145,10 +153,8 @@
380 doc = self.db.create_doc(simple_doc)
381 self.assertEqual([doc.doc_id], self.db._get_transaction_log())
382 new_doc = '{"key": "altval"}'
383- docs = [Document(doc.doc_id, 'test:1|z:2', new_doc)]
384- new_gen = self.st.sync_exchange(docs,
385- 'other-replica',
386- from_replica_generation=10,
387+ docs_by_gen = [(Document(doc.doc_id, 'test:1|z:2', new_doc), 10)]
388+ new_gen = self.st.sync_exchange(docs_by_gen, 'other-replica',
389 last_known_generation=0,
390 return_doc_cb=self.receive_doc)
391 self.assertEqual([doc.doc_id, doc.doc_id],
392@@ -165,10 +171,8 @@
393 return val
394 self.db.whats_changed = after_whatschanged
395 new_doc = '{"key": "altval"}'
396- docs = [Document(doc.doc_id, 'test:1|z:2', new_doc)]
397- new_gen = self.st.sync_exchange(docs,
398- 'other-replica',
399- from_replica_generation=10,
400+ docs_by_gen = [(Document(doc.doc_id, 'test:1|z:2', new_doc), 10)]
401+ new_gen = self.st.sync_exchange(docs_by_gen, 'other-replica',
402 last_known_generation=0,
403 return_doc_cb=self.receive_doc)
404 self.assertEqual(([], 2), (self.other_docs, new_gen))
405@@ -188,8 +192,7 @@
406 self.assertEqual(0, self.sync(self.db1, self.db2))
407 self.assertEqual(0, self.db1.get_sync_generation('test2'))
408 self.assertEqual(0, self.db2.get_sync_generation('test1'))
409- self.assertEqual({'receive': {'docs': [], 'from_id': 'test1',
410- 'from_gen': 0, 'last_known_gen': 0},
411+ self.assertEqual({'receive': {'docs': [], 'last_known_gen': 0},
412 'return': {'docs': [], 'last_gen': 0}},
413 self.db2._last_exchange_log)
414
415@@ -212,8 +215,7 @@
416 self.assertGetDoc(self.db1, doc.doc_id, doc.rev, simple_doc, False)
417 self.assertEqual(1, self.db1.get_sync_generation('test2'))
418 self.assertEqual(1, self.db2.get_sync_generation('test1'))
419- self.assertEqual({'receive': {'docs': [], 'from_id': 'test1',
420- 'from_gen': 0, 'last_known_gen': 0},
421+ self.assertEqual({'receive': {'docs': [], 'last_known_gen': 0},
422 'return': {'docs': [(doc.doc_id, doc.rev)],
423 'last_gen': 1}},
424 self.db2._last_exchange_log)
425@@ -236,8 +238,7 @@
426 return result
427 self.db1.put_doc_if_newer = after_put_doc_if_newer
428 self.assertEqual(0, self.sync(self.db1, self.db2))
429- self.assertEqual({'receive': {'docs': [], 'from_id': 'test1',
430- 'from_gen': 0, 'last_known_gen': 0},
431+ self.assertEqual({'receive': {'docs': [], 'last_known_gen': 0},
432 'return': {'docs': [(doc.doc_id, doc.rev)],
433 'last_gen': 1}},
434 self.db2._last_exchange_log)
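After this branch, the HTTP sync request body built in u1db/remote/http_target.py is a sequence of JSON objects, each terminated by "\r\n". The first line carries only `last_known_generation`, and each following line carries `id`, `rev`, `content` and now `gen`, which is what lets `post_stream_entry` record progress per document. The sketch below reproduces that framing with the standard `json` module instead of `simplejson`; `encode_sync_request`, `decode_sync_request` and `Doc` are hypothetical names.

```python
import json
from collections import namedtuple

Doc = namedtuple("Doc", ["doc_id", "rev", "content"])  # hypothetical stand-in


def encode_sync_request(docs_by_generation, last_known_generation):
    """Build the body: one JSON object per line, each terminated by CRLF."""
    entries = [json.dumps({"last_known_generation": last_known_generation})]
    for doc, gen in docs_by_generation:
        entries.append(json.dumps({"id": doc.doc_id, "rev": doc.rev,
                                   "content": doc.content, "gen": gen}))
    body = "".join(entry + "\r\n" for entry in entries)
    # Content-Length counts bytes, not characters.
    return body, len(body.encode("utf-8"))


def decode_sync_request(body):
    """Split the body back into the args line and the document entries.

    json.dumps escapes CR and LF inside strings, so splitting on CRLF
    cannot cut an entry in half.
    """
    lines = [line for line in body.split("\r\n") if line]
    return json.loads(lines[0]), [json.loads(line) for line in lines[1:]]


body, content_length = encode_sync_request(
    [(Doc("doc-here", "replica:1", '{"value": "here"}'), 10),
     (Doc("doc-here2", "replica:1", '{"value": "here2"}'), 11)],
    last_known_generation=0)
args, entries = decode_sync_request(body)
```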
