Merge lp:~pedronis/u1db/sync-returns-doc-in-gen-order into lp:u1db

Proposed by Samuele Pedroni
Status: Merged
Approved by: John A Meinel
Approved revision: 149
Merged at revision: 150
Proposed branch: lp:~pedronis/u1db/sync-returns-doc-in-gen-order
Merge into: lp:u1db
Diff against target: 446 lines (+143/-58)
8 files modified
u1db/__init__.py (+8/-3)
u1db/backends/inmemory.py (+2/-1)
u1db/remote/http_app.py (+5/-3)
u1db/remote/http_target.py (+1/-1)
u1db/sync.py (+28/-22)
u1db/tests/test_http_app.py (+9/-5)
u1db/tests/test_remote_sync_target.py (+11/-11)
u1db/tests/test_sync.py (+79/-12)
To merge this branch: bzr merge lp:~pedronis/u1db/sync-returns-doc-in-gen-order
Reviewer Review Type Date Requested Status
Ubuntu One hackers Pending
Review via email: mp+85684@code.launchpad.net

Description of the change

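Return synced documents from the target to the source in generation order, each paired with the generation of its latest change. Also add an integration test checking that the source uses this to set the last known generation of the target incrementally as it receives the documents.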

To post a comment you must log in.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'u1db/__init__.py'
2--- u1db/__init__.py 2011-12-14 12:43:53 +0000
3+++ u1db/__init__.py 2011-12-14 16:05:29 +0000
4@@ -365,6 +365,10 @@
5 change to the newest, that means from the oldest generation to
6 the newest.
7
8+ Documents are also returned paired with the generation of
9+ their latest change in order from the oldest change to the
10+ newest.
11+
12 :param docs_by_generation: A list of [(Document, generation)]
13 pairs indicating documents which should be updated on
14 this replica paired with the generation of their
15@@ -372,11 +376,12 @@
16 :param from_replica_uid: The other replica's identifier
17 :param last_known_generation: The last generation that other replica
18 knows about this
19- :param: return_doc_cb(doc): is a callback
20+ :param: return_doc_cb(doc, gen): is a callback
21 used to return documents to the other replica, it will
22 be invoked in turn with Documents that have changed since
23- last_known_generation.
24- :return: new_generation - After applying docs_info, this is
25+ last_known_generation together with the generation of
26+ their last change.
27+ :return: new_generation - After applying docs_by_generation, this is
28 the current generation for this replica
29 """
30 raise NotImplementedError(self.sync_exchange)
31
32=== modified file 'u1db/backends/inmemory.py'
33--- u1db/backends/inmemory.py 2011-12-13 09:27:56 +0000
34+++ u1db/backends/inmemory.py 2011-12-14 16:05:29 +0000
35@@ -54,7 +54,8 @@
36 return InMemorySyncTarget(self)
37
38 def _get_transaction_log(self):
39- return self._transaction_log
40+ # snapshot!
41+ return self._transaction_log[:]
42
43 def _get_generation(self):
44 return len(self._transaction_log)
45
46=== modified file 'u1db/remote/http_app.py'
47--- u1db/remote/http_app.py 2011-12-13 15:27:18 +0000
48+++ u1db/remote/http_app.py 2011-12-14 16:05:29 +0000
49@@ -294,10 +294,12 @@
50 self.sync_exch.record_sync_progress(self.from_replica_uid, gen)
51
52 def post_end(self):
53- def send_doc(doc):
54- entry = dict(id=doc.doc_id, rev=doc.rev, content=doc.content)
55+ def send_doc(doc, gen):
56+ entry = dict(id=doc.doc_id, rev=doc.rev, content=doc.content,
57+ gen=gen)
58 self.responder.stream_entry(entry)
59- new_gen = self.sync_exch.find_docs_to_return(self.last_known_generation)
60+ new_gen = self.sync_exch.find_changes_to_return(
61+ self.last_known_generation)
62 self.responder.content_type = 'application/x-u1db-multi-json'
63 self.responder.start_response(200, {"new_generation": new_gen})
64 new_gen = self.sync_exch.return_docs(send_doc)
65
66=== modified file 'u1db/remote/http_target.py'
67--- u1db/remote/http_target.py 2011-12-14 10:37:17 +0000
68+++ u1db/remote/http_target.py 2011-12-14 16:05:29 +0000
69@@ -71,7 +71,7 @@
70 for entry in data[1:]:
71 entry = simplejson.loads(entry)
72 doc = Document(entry['id'], entry['rev'], entry['content'])
73- return_doc_cb(doc)
74+ return_doc_cb(doc, entry['gen'])
75 data = None
76 return res['new_generation']
77
78
79=== modified file 'u1db/sync.py'
80--- u1db/sync.py 2011-12-14 10:37:17 +0000
81+++ u1db/sync.py 2011-12-14 16:05:29 +0000
82@@ -104,10 +104,15 @@
83 other_replica_uid)
84 # exchange documents and try to insert the returned ones with
85 # the target, return target synced-up-to gen
86+ def take_doc(doc, gen):
87+ self._insert_doc_from_target(doc)
88+ # record target synced-up-to generation
89+ self.source.set_sync_generation(other_replica_uid, gen)
90+
91 new_gen = sync_target.sync_exchange(docs_by_generation,
92 self.source._replica_uid, other_last_known_gen,
93- return_doc_cb=self._insert_doc_from_target)
94- # record target synced-up-to generation
95+ return_doc_cb=take_doc)
96+ # record target synced-up-to generation including applying what we sent
97 self.source.set_sync_generation(other_replica_uid, new_gen)
98
99 # if gapless record current reached generation with target
100@@ -122,7 +127,7 @@
101 def __init__(self, db):
102 self._db = db
103 self.seen_ids = set() # incoming ids not superseded
104- self.doc_ids_to_return = None
105+ self.changes_to_return = None
106 self.new_gen = None
107 # for tests
108 self._incoming_trace = []
109@@ -177,13 +182,12 @@
110 'from_gen': from_replica_generation
111 })
112
113- def find_docs_to_return(self, last_known_generation):
114- """Find and further mark documents to return to the sync source.
115+ def find_changes_to_return(self, last_known_generation):
116+ """Find changes to return.
117
118- This finds the document identifiers for any documents that
119- have been updated since last_known_generation. It excludes
120- documents ids that have already been considered
121- (superseded by the sender, etc).
122+ Find changes since last_known_generation in db generation
123+ order using whats_changed. It excludes documents ids that have
124+ already been considered (superseded by the sender, etc).
125
126 :return: new_generation - the generation of this database
127 which the caller can consider themselves to be synchronized after
128@@ -193,30 +197,32 @@
129 'last_known_gen': last_known_generation
130 })
131 gen, changes = self._db.whats_changed(last_known_generation)
132- changed_doc_ids = set(doc_id for doc_id, _ in changes)
133 self.new_gen = gen
134 seen_ids = self.seen_ids
135 # changed docs that weren't superseded by or converged with
136- self.doc_ids_to_return = set(doc_id for doc_id in changed_doc_ids
137- if doc_id not in seen_ids)
138+ self.changes_to_return = [(doc_id, gen) for (doc_id, gen) in changes
139+ if doc_id not in seen_ids]
140 return gen
141
142 def return_docs(self, return_doc_cb):
143- """Return the marked documents repeatedly invoking the callback
144- return_doc_cb.
145+ """Return the changed documents and their last change generation
146+ repeatedly invoking the callback return_doc_cb.
147
148 The final step of a sync exchange.
149
150- :param: return_doc_cb(doc): is a callback
151- used to return the marked documents to the target replica,
152+ :param: return_doc_cb(doc, gen): is a callback
153+ used to return the documents with their last change generation
154+ to the target replica.
155 :return: None
156 """
157- doc_ids_to_return = self.doc_ids_to_return
158+ changes_to_return = self.changes_to_return
159 # return docs, including conflicts
160- docs = self._db.get_docs(doc_ids_to_return,
161- check_for_conflicts=False)
162- for doc in docs:
163- return_doc_cb(doc)
164+ changed_doc_ids = [doc_id for doc_id, _ in changes_to_return]
165+ docs = self._db.get_docs(changed_doc_ids, check_for_conflicts=False)
166+
167+ docs_by_gen = zip(docs, (gen for _, gen in changes_to_return))
168+ for doc, gen in docs_by_gen:
169+ return_doc_cb(doc, gen)
170 # for tests
171 self._db._last_exchange_log['return'] = {
172 'docs': [(d.doc_id, d.rev) for d in docs],
173@@ -244,7 +250,7 @@
174 latest_gen = docs_by_generations[-1][1]
175 sync_exch.record_sync_progress(from_replica_uid, latest_gen)
176 # 2nd step: find changed documents (including conflicts) to return
177- new_gen = sync_exch.find_docs_to_return(last_known_generation)
178+ new_gen = sync_exch.find_changes_to_return(last_known_generation)
179 # final step: return docs and record source replica sync point
180 sync_exch.return_docs(return_doc_cb)
181 return new_gen
182
183=== modified file 'u1db/tests/test_http_app.py'
184--- u1db/tests/test_http_app.py 2011-12-13 15:27:18 +0000
185+++ u1db/tests/test_http_app.py 2011-12-14 16:05:29 +0000
186@@ -556,9 +556,9 @@
187 def test_sync_exchange_send(self):
188 entries = {
189 10: {'id': 'doc-here', 'rev': 'replica:1', 'content':
190- '{"value": "here"}', 'gen':10 },
191+ '{"value": "here"}', 'gen':10},
192 11: {'id': 'doc-here2', 'rev': 'replica:1', 'content':
193- '{"value": "here2"}', 'gen':11 }
194+ '{"value": "here2"}', 'gen': 11}
195 }
196
197 gens = []
198@@ -589,6 +589,7 @@
199
200 def test_sync_exchange_receive(self):
201 doc = self.db0.create_doc('{"value": "there"}')
202+ doc2 = self.db0.create_doc('{"value": "there2"}')
203 args = dict(last_known_generation=0)
204 body = "%s\r\n" % simplejson.dumps(args)
205 resp = self.app.post('/db0/sync-from/replica',
206@@ -599,11 +600,14 @@
207 self.assertEqual('application/x-u1db-multi-json',
208 resp.header('content-type'))
209 parts = resp.body.splitlines()
210- self.assertEqual(2, len(parts))
211- self.assertEqual({'new_generation': 1}, simplejson.loads(parts[0]))
212+ self.assertEqual(3, len(parts))
213+ self.assertEqual({'new_generation': 2}, simplejson.loads(parts[0]))
214 self.assertEqual({'content': '{"value": "there"}',
215- 'rev': doc.rev, 'id': doc.doc_id},
216+ 'rev': doc.rev, 'id': doc.doc_id, 'gen': 1},
217 simplejson.loads(parts[1]))
218+ self.assertEqual({'content': '{"value": "there2"}',
219+ 'rev': doc2.rev, 'id': doc2.doc_id, 'gen': 2},
220+ simplejson.loads(parts[2]))
221
222
223 class TestHTTPErrors(tests.TestCase):
224
225=== modified file 'u1db/tests/test_remote_sync_target.py'
226--- u1db/tests/test_remote_sync_target.py 2011-12-14 11:05:51 +0000
227+++ u1db/tests/test_remote_sync_target.py 2011-12-14 16:05:29 +0000
228@@ -115,9 +115,9 @@
229 return _put_doc_if_newer(doc)
230 self.patch(db, 'put_doc_if_newer', bomb_put_doc_if_newer)
231 remote_target = self.getSyncTarget('test')
232- other_docs = []
233- def receive_doc(doc):
234- other_docs.append((doc.doc_id, doc.rev, doc.content))
235+ other_changes = []
236+ def receive_doc(doc, gen):
237+ other_changes.append((doc.doc_id, doc.rev, doc.content, gen))
238 self.assertRaises(errors.HTTPError, remote_target.sync_exchange,
239 [(Document('doc-here', 'replica:1', '{"value": "here"}'), 10),
240 (Document('doc-here2', 'replica:1', '{"value": "here2"}'), 11)
241@@ -126,7 +126,7 @@
242 self.assertGetDoc(db, 'doc-here', 'replica:1', '{"value": "here"}',
243 False)
244 self.assertEqual(10, db.get_sync_generation('replica'))
245- self.assertEqual([], other_docs)
246+ self.assertEqual([], other_changes)
247 # retry
248 trigger_ids = []
249 new_gen = remote_target.sync_exchange(
250@@ -138,23 +138,23 @@
251 self.assertEqual(11, db.get_sync_generation('replica'))
252 self.assertEqual(2, new_gen)
253 # bounced back to us
254- self.assertEqual([('doc-here', 'replica:1', '{"value": "here"}')],
255- other_docs)
256+ self.assertEqual([('doc-here', 'replica:1', '{"value": "here"}', 1)],
257+ other_changes)
258
259 def test_sync_exchange_receive(self):
260 self.startServer()
261 db = self.request_state._create_database('test')
262 doc = db.create_doc('{"value": "there"}')
263 remote_target = self.getSyncTarget('test')
264- other_docs = []
265- def receive_doc(doc):
266- other_docs.append((doc.doc_id, doc.rev, doc.content))
267+ other_changes = []
268+ def receive_doc(doc, gen):
269+ other_changes.append((doc.doc_id, doc.rev, doc.content, gen))
270 new_gen = remote_target.sync_exchange(
271 [], 'replica', last_known_generation=0,
272 return_doc_cb=receive_doc)
273 self.assertEqual(1, new_gen)
274- self.assertEqual([(doc.doc_id, doc.rev, '{"value": "there"}')],
275- other_docs)
276+ self.assertEqual([(doc.doc_id, doc.rev, '{"value": "there"}', 1)],
277+ other_changes)
278
279
280 load_tests = tests.load_with_scenarios
281
282=== modified file 'u1db/tests/test_sync.py'
283--- u1db/tests/test_sync.py 2011-12-14 10:37:17 +0000
284+++ u1db/tests/test_sync.py 2011-12-14 16:05:29 +0000
285@@ -64,7 +64,7 @@
286 def setUp(self):
287 super(DatabaseSyncTargetTests, self).setUp()
288 self.db, self.st = self.create_db_and_target(self)
289- self.other_docs = []
290+ self.other_changes = []
291
292 def tearDown(self):
293 # We delete them explicitly, so that connections are cleanly closed
294@@ -72,8 +72,8 @@
295 del self.st
296 super(DatabaseSyncTargetTests, self).tearDown()
297
298- def receive_doc(self, doc):
299- self.other_docs.append((doc.doc_id, doc.rev, doc.content))
300+ def receive_doc(self, doc, gen):
301+ self.other_changes.append((doc.doc_id, doc.rev, doc.content, gen))
302
303 def test_get_sync_target(self):
304 self.assertIsNot(None, self.st)
305@@ -98,7 +98,7 @@
306 return_doc_cb=self.receive_doc)
307 self.assertGetDoc(self.db, 'doc-id', 'replica:1', simple_doc, False)
308 self.assertEqual(['doc-id'], self.db._get_transaction_log())
309- self.assertEqual(([], 1), (self.other_docs, new_gen))
310+ self.assertEqual(([], 1), (self.other_changes, new_gen))
311 self.assertEqual(10, self.st.get_sync_info('replica')[-1])
312
313 def test_sync_exchange_push_many(self):
314@@ -110,7 +110,7 @@
315 self.assertGetDoc(self.db, 'doc-id', 'replica:1', simple_doc, False)
316 self.assertGetDoc(self.db, 'doc-id2', 'replica:1', nested_doc, False)
317 self.assertEqual(['doc-id', 'doc-id2'], self.db._get_transaction_log())
318- self.assertEqual(([], 2), (self.other_docs, new_gen))
319+ self.assertEqual(([], 2), (self.other_changes, new_gen))
320 self.assertEqual(11, self.st.get_sync_info('replica')[-1])
321
322 def test_sync_exchange_refuses_conflicts(self):
323@@ -122,8 +122,8 @@
324 last_known_generation=0,
325 return_doc_cb=self.receive_doc)
326 self.assertEqual([doc.doc_id], self.db._get_transaction_log())
327- self.assertEqual(([(doc.doc_id, doc.rev, simple_doc)], 1),
328- (self.other_docs, new_gen))
329+ self.assertEqual(([(doc.doc_id, doc.rev, simple_doc, 1)], 1),
330+ (self.other_changes, new_gen))
331 self.assertEqual(self.db._last_exchange_log['return'],
332 {'last_gen': 1, 'docs': [(doc.doc_id, doc.rev)]})
333
334@@ -135,7 +135,7 @@
335 last_known_generation=1,
336 return_doc_cb=self.receive_doc)
337 self.assertEqual([doc.doc_id], self.db._get_transaction_log())
338- self.assertEqual(([], 1), (self.other_docs, new_gen))
339+ self.assertEqual(([], 1), (self.other_changes, new_gen))
340
341 def test_sync_exchange_returns_new_docs(self):
342 doc = self.db.create_doc(simple_doc)
343@@ -144,11 +144,28 @@
344 last_known_generation=0,
345 return_doc_cb=self.receive_doc)
346 self.assertEqual([doc.doc_id], self.db._get_transaction_log())
347- self.assertEqual(([(doc.doc_id, doc.rev, simple_doc)], 1),
348- (self.other_docs, new_gen))
349+ self.assertEqual(([(doc.doc_id, doc.rev, simple_doc, 1)], 1),
350+ (self.other_changes, new_gen))
351 self.assertEqual(self.db._last_exchange_log['return'],
352 {'last_gen': 1, 'docs': [(doc.doc_id, doc.rev)]})
353
354+ def test_sync_exchange_returns_many_new_docs(self):
355+ doc = self.db.create_doc(simple_doc)
356+ doc2 = self.db.create_doc(nested_doc)
357+ self.assertEqual([doc.doc_id, doc2.doc_id],
358+ self.db._get_transaction_log())
359+ new_gen = self.st.sync_exchange([], 'other-replica',
360+ last_known_generation=0,
361+ return_doc_cb=self.receive_doc)
362+ self.assertEqual([doc.doc_id, doc2.doc_id],
363+ self.db._get_transaction_log())
364+ self.assertEqual(([(doc.doc_id, doc.rev, simple_doc, 1),
365+ (doc2.doc_id, doc2.rev, nested_doc, 2)], 2),
366+ (self.other_changes, new_gen))
367+ self.assertEqual(self.db._last_exchange_log['return'],
368+ {'last_gen': 2, 'docs': [(doc.doc_id, doc.rev),
369+ (doc2.doc_id, doc2.rev)]})
370+
371 def test_sync_exchange_getting_newer_docs(self):
372 doc = self.db.create_doc(simple_doc)
373 self.assertEqual([doc.doc_id], self.db._get_transaction_log())
374@@ -159,7 +176,7 @@
375 return_doc_cb=self.receive_doc)
376 self.assertEqual([doc.doc_id, doc.doc_id],
377 self.db._get_transaction_log())
378- self.assertEqual(([], 2), (self.other_docs, new_gen))
379+ self.assertEqual(([], 2), (self.other_changes, new_gen))
380
381 def test_sync_exchange_with_concurrent_updates(self):
382 doc = self.db.create_doc(simple_doc)
383@@ -175,7 +192,7 @@
384 new_gen = self.st.sync_exchange(docs_by_gen, 'other-replica',
385 last_known_generation=0,
386 return_doc_cb=self.receive_doc)
387- self.assertEqual(([], 2), (self.other_docs, new_gen))
388+ self.assertEqual(([], 2), (self.other_changes, new_gen))
389
390
391 class DatabaseSyncTests(tests.DatabaseBaseTests):
392@@ -440,4 +457,54 @@
393 False)
394
395
396+class TestRemoteSyncIntegration(tests.TestCaseWithServer):
397+ """Integration tests for the most common sync scenario local -> remote"""
398+
399+ server_def = staticmethod(http_server_def)
400+
401+ def setUp(self):
402+ super(TestRemoteSyncIntegration, self).setUp()
403+ self.startServer()
404+ self.db1 = inmemory.InMemoryDatabase('test1')
405+ self.db2 = self.request_state._create_database('test2')
406+
407+ def test_sync_tracks_generations_incrementally(self):
408+ doc11 = self.db1.create_doc('{"a": 1}')
409+ doc12 = self.db1.create_doc('{"a": 2}')
410+ doc21 = self.db2.create_doc('{"b": 1}')
411+ doc22 = self.db2.create_doc('{"b": 2}')
412+ #sanity
413+ self.assertEqual(2, len(self.db1._get_transaction_log()))
414+ self.assertEqual(2, len(self.db2._get_transaction_log()))
415+ progress1 = []
416+ progress2 = []
417+ _set_sync_generation1 = self.db1.set_sync_generation
418+ def set_sync_generation_witness1(other_uid, other_gen):
419+ progress1.append((other_uid, other_gen,
420+ self.db1._get_transaction_log()[2:]))
421+ _set_sync_generation1(other_uid, other_gen)
422+ self.patch(self.db1, 'set_sync_generation',
423+ set_sync_generation_witness1)
424+
425+ _set_sync_generation2 = self.db2.set_sync_generation
426+ def set_sync_generation_witness2(other_uid, other_gen):
427+ progress2.append((other_uid, other_gen,
428+ self.db2._get_transaction_log()[2:]))
429+ _set_sync_generation2(other_uid, other_gen)
430+ self.patch(self.db2, 'set_sync_generation',
431+ set_sync_generation_witness2)
432+
433+ db2_url = self.getURL('test2')
434+ self.db1.sync(db2_url)
435+
436+ self.assertEqual([('test2', 1, [doc21.doc_id]),
437+ ('test2', 2, [doc21.doc_id, doc22.doc_id]),
438+ ('test2', 4, [doc21.doc_id, doc22.doc_id])],
439+ progress1)
440+ self.assertEqual([('test1', 1, [doc11.doc_id]),
441+ ('test1', 2, [doc11.doc_id, doc12.doc_id]),
442+ ('test1', 4, [doc11.doc_id, doc12.doc_id])],
443+ progress2)
444+
445+
446 load_tests = tests.load_with_scenarios

Subscribers

People subscribed via source and target branches