Merge lp:~pedronis/u1db/sync-simple-reorg-n-hide-details into lp:u1db

Proposed by Samuele Pedroni
Status: Merged
Approved by: John A Meinel
Approved revision: 151
Merged at revision: 147
Proposed branch: lp:~pedronis/u1db/sync-simple-reorg-n-hide-details
Merge into: lp:u1db
Diff against target: 307 lines (+64/-72)
3 files modified
u1db/remote/http_app.py (+3/-4)
u1db/sync.py (+46/-44)
u1db/tests/test_sync.py (+15/-24)
To merge this branch: bzr merge lp:~pedronis/u1db/sync-simple-reorg-n-hide-details
Reviewer Review Type Date Requested Status
Ubuntu One hackers Pending
Review via email: mp+85453@code.launchpad.net

Description of the change

some things I think are necessary for incremental sync:

- don't send/treat conflicts separately if the plan is to send back documents sorted somehow by generation of changes

- make recording the source last known generation its own helper if we want to call it repeatedly for the generation of incoming docs

To post a comment you must log in.
Revision history for this message
John A Meinel (jameinel) wrote :
Download full text (17.0 KiB)

I like this in general. I'm a little concerned about not tracking conflicts
separately. It might just be paranoia, though. So I'm trying to think
through it.

A-1 doc-x created
B-1 doc-x sync
A-2 x updates rev-A:2
B-2 x updated rev-A:1|B:1 (maybe B:2 for gen)
A starts sync, sends doc-x A:2 gen 2
B skips the doc, does(n't?) record gen of A
B tries to return x@A:1|B:1 but loses connection
A-3 doc-y
A starts sync, B wants everything since A-2
A sends y, and last B gen of 1
B will naturally return x@A:1|B:1

So all of that seems fine. Can we poke holes in it to make it fail? It does
seem that anything which should be in conflict must have been updated since
the last successful synchronization.

So it seems ok to me, but we'll want to think carefully about edge cases.

John
=:->
On Dec 13, 2011 11:02 AM, "Samuele Pedroni" <email address hidden>
wrote:

> Samuele Pedroni has proposed merging
> lp:~pedronis/u1db/sync-simple-reorg-n-hide-details into lp:u1db.
>
> Requested reviews:
> Ubuntu One hackers (ubuntuone-hackers)
>
> For more details, see:
>
> https://code.launchpad.net/~pedronis/u1db/sync-simple-reorg-n-hide-details/+merge/85453
>
> some things I think are necessary for incremental sync:
>
> - don't send/treat conflicts separately if the plan is to send back
> documents sorted somehow by generation of changes
>
> - make recording the source last known generation its own helper if we
> want to call it repeatedly for the generation of incoming docs
> --
>
> https://code.launchpad.net/~pedronis/u1db/sync-simple-reorg-n-hide-details/+merge/85453
> Your team Ubuntu One hackers is requested to review the proposed merge of
> lp:~pedronis/u1db/sync-simple-reorg-n-hide-details into lp:u1db.
>
> === modified file 'u1db/remote/http_app.py'
> --- u1db/remote/http_app.py 2011-12-06 20:32:48 +0000
> +++ u1db/remote/http_app.py 2011-12-13 09:59:32 +0000
> @@ -294,16 +294,15 @@
> self.sync_exch.insert_doc_from_source(doc)
>
> def post_end(self):
> + self.sync_exch.record_sync_progress(self.from_replica_uid,
> + self.from_replica_generation)
> def send_doc(doc):
> entry = dict(id=doc.doc_id, rev=doc.rev, content=doc.content)
> self.responder.stream_entry(entry)
> new_gen =
> self.sync_exch.find_docs_to_return(self.last_known_generation)
> self.responder.content_type = 'application/x-u1db-multi-json'
> self.responder.start_response(200, {"new_generation": new_gen})
> - new_gen = self.sync_exch.return_docs_and_record_sync(
> - self.from_replica_uid,
> -
> self.from_replica_generation,
> - send_doc)
> + new_gen = self.sync_exch.return_docs(send_doc)
> self.responder.finish_response()
>
>
>
> === modified file 'u1db/sync.py'
> --- u1db/sync.py 2011-12-12 19:40:50 +0000
> +++ u1db/sync.py 2011-12-13 09:59:32 +0000
> @@ -120,19 +120,21 @@
>
> def __init__(self, db):
> self._db = db
> - self.seen_ids = set() # incoming ids not superseded (or
> conflicted)
> + self.seen_id...

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'u1db/remote/http_app.py'
2--- u1db/remote/http_app.py 2011-12-06 20:32:48 +0000
3+++ u1db/remote/http_app.py 2011-12-13 09:59:32 +0000
4@@ -294,16 +294,15 @@
5 self.sync_exch.insert_doc_from_source(doc)
6
7 def post_end(self):
8+ self.sync_exch.record_sync_progress(self.from_replica_uid,
9+ self.from_replica_generation)
10 def send_doc(doc):
11 entry = dict(id=doc.doc_id, rev=doc.rev, content=doc.content)
12 self.responder.stream_entry(entry)
13 new_gen = self.sync_exch.find_docs_to_return(self.last_known_generation)
14 self.responder.content_type = 'application/x-u1db-multi-json'
15 self.responder.start_response(200, {"new_generation": new_gen})
16- new_gen = self.sync_exch.return_docs_and_record_sync(
17- self.from_replica_uid,
18- self.from_replica_generation,
19- send_doc)
20+ new_gen = self.sync_exch.return_docs(send_doc)
21 self.responder.finish_response()
22
23
24
25=== modified file 'u1db/sync.py'
26--- u1db/sync.py 2011-12-12 19:40:50 +0000
27+++ u1db/sync.py 2011-12-13 09:59:32 +0000
28@@ -120,19 +120,21 @@
29
30 def __init__(self, db):
31 self._db = db
32- self.seen_ids = set() # incoming ids not superseded (or conflicted)
33+ self.seen_ids = set() # incoming ids not superseded
34 self.doc_ids_to_return = None
35- self.conflict_ids = set()
36 self.new_gen = None
37 # for tests
38 self._incoming_trace = []
39- self._last_known_generation = None
40+ self._db._last_exchange_log = {
41+ 'receive': {'docs': self._incoming_trace},
42+ 'return': None
43+ }
44
45 def insert_doc_from_source(self, doc):
46 """Try to insert synced document from source.
47
48- Conflicting documents are not inserted but the current revision
49- marked to be sent over to the sync source.
50+ Conflicting documents are not inserted but will be sent over
51+ to the sync source.
52
53 The 1st step of a sync exchange is to call this repeatedly to
54 try insert all incoming documents from the source.
55@@ -150,76 +152,75 @@
56 # we have something newer that we will return
57 pass
58 else:
59- # conflict, returned independently
60+            # conflict that we will return
61 assert state == 'conflicted'
62- self.seen_ids.add(doc.doc_id)
63- self.conflict_ids.add(doc.doc_id)
64 # for tests
65 self._incoming_trace.append((doc.doc_id, doc.rev))
66
67+ def record_sync_progress(self, from_replica_uid, from_replica_generation):
68+ """Record the sync information of from_replica_uid
69+ the sync source identifier and the generation until which it
70+ sent its documents from_replica_generation.
71+
72+ :param from_replica_uid: The source replica's identifier
73+ :param from_replica_generation: The db generation for the
74+ source replica indicating the tip of data that was sent.
75+ :return: None
76+ """
77+ # record sync point
78+ self._db.set_sync_generation(from_replica_uid,
79+ from_replica_generation)
80+ # for tests
81+ self._db._last_exchange_log['receive'].update({
82+ 'from_id': from_replica_uid,
83+ 'from_gen': from_replica_generation
84+ })
85+
86 def find_docs_to_return(self, last_known_generation):
87 """Find and further mark documents to return to the sync source.
88
89 This finds the document identifiers for any documents that
90 have been updated since last_known_generation. It excludes
91- documents ids that have already been considered (marked conflicts,
92- or superseded by the sender, etc).
93+ documents ids that have already been considered
94+ (superseded by the sender, etc).
95
96 :return: new_generation - the generation of this database
97 which the caller can consider themselves to be synchronized after
98 processing the returned documents.
99 """
100- self._last_known_generation = last_known_generation # for tests
101+ self._db._last_exchange_log['receive'].update({ # for tests
102+ 'last_known_gen': last_known_generation
103+ })
104 gen, changes = self._db.whats_changed(last_known_generation)
105 changed_doc_ids = set(doc_id for doc_id, _ in changes)
106 self.new_gen = gen
107 seen_ids = self.seen_ids
108 # changed docs that weren't superseded by or converged with
109- # nor conflicted, conflicts are returned independently
110 self.doc_ids_to_return = set(doc_id for doc_id in changed_doc_ids
111 if doc_id not in seen_ids)
112 return gen
113
114- def return_docs_and_record_sync(self,
115- from_replica_uid, from_replica_generation,
116- return_doc_cb):
117+ def return_docs(self, return_doc_cb):
118 """Return the marked documents repeatedly invoking the callback
119- return_doc_cb, record the sync information of from_replica_uid
120- the sync source identifier and the generation until which it
121- sent its documents from_replica_generation.
122+ return_doc_cb.
123
124 The final step of a sync exchange.
125
126- :param from_replica_uid: The source replica's identifier
127- :param from_replica_generation: The db generation for the
128- source replica indicating the tip of data that was sent.
129 :param: return_doc_cb(doc): is a callback
130 used to return the marked documents to the target replica,
131 :return: None
132 """
133 doc_ids_to_return = self.doc_ids_to_return
134- conflict_ids = self.conflict_ids
135- # return docs
136- new_docs = self._db.get_docs(doc_ids_to_return,
137+ # return docs, including conflicts
138+ docs = self._db.get_docs(doc_ids_to_return,
139 check_for_conflicts=False)
140- for doc in new_docs:
141- return_doc_cb(doc)
142- conflicts = self._db.get_docs(conflict_ids, check_for_conflicts=False)
143- for doc in conflicts:
144- return_doc_cb(doc)
145- # record sync point
146- self._db.set_sync_generation(from_replica_uid,
147- from_replica_generation)
148+ for doc in docs:
149+ return_doc_cb(doc)
150 # for tests
151- self._db._last_exchange_log = {
152- 'receive': {'docs': self._incoming_trace,
153- 'from_id': from_replica_uid,
154- 'from_gen': from_replica_generation,
155- 'last_known_gen': self._last_known_generation},
156- 'return': {'new_docs': [(d.doc_id, d.rev) for d in new_docs],
157- 'conf_docs': [(d.doc_id, d.rev) for d in conflicts],
158- 'last_gen': self.new_gen}
159- }
160+ self._db._last_exchange_log['return'] = {
161+ 'docs': [(d.doc_id, d.rev) for d in docs],
162+ 'last_gen': self.new_gen
163+ }
164
165
166 class LocalSyncTarget(u1db.SyncTarget):
167@@ -238,10 +239,11 @@
168 # 1st step: try to insert incoming docs
169 for doc in docs:
170 sync_exch.insert_doc_from_source(doc)
171+ # record progress
172+ sync_exch.record_sync_progress(from_replica_uid,
173+ from_replica_generation)
174 # 2nd step: find changed documents (including conflicts) to return
175 new_gen = sync_exch.find_docs_to_return(last_known_generation)
176 # final step: return docs and record source replica sync point
177- sync_exch.return_docs_and_record_sync(from_replica_uid,
178- from_replica_generation,
179- return_doc_cb)
180+ sync_exch.return_docs(return_doc_cb)
181 return new_gen
182
183=== modified file 'u1db/tests/test_sync.py'
184--- u1db/tests/test_sync.py 2011-12-06 20:44:53 +0000
185+++ u1db/tests/test_sync.py 2011-12-13 09:59:32 +0000
186@@ -115,8 +115,7 @@
187 self.assertEqual(([(doc.doc_id, doc.rev, simple_doc)], 1),
188 (self.other_docs, new_gen))
189 self.assertEqual(self.db._last_exchange_log['return'],
190- {'last_gen': 1, 'conf_docs': [(doc.doc_id, doc.rev)],
191- 'new_docs': []})
192+ {'last_gen': 1, 'docs': [(doc.doc_id, doc.rev)]})
193
194 def test_sync_exchange_ignores_convergence(self):
195 doc = self.db.create_doc(simple_doc)
196@@ -140,8 +139,7 @@
197 self.assertEqual(([(doc.doc_id, doc.rev, simple_doc)], 1),
198 (self.other_docs, new_gen))
199 self.assertEqual(self.db._last_exchange_log['return'],
200- {'last_gen': 1, 'new_docs': [(doc.doc_id, doc.rev)],
201- 'conf_docs': []})
202+ {'last_gen': 1, 'docs': [(doc.doc_id, doc.rev)]})
203
204 def test_sync_exchange_getting_newer_docs(self):
205 doc = self.db.create_doc(simple_doc)
206@@ -192,8 +190,7 @@
207 self.assertEqual(0, self.db2.get_sync_generation('test1'))
208 self.assertEqual({'receive': {'docs': [], 'from_id': 'test1',
209 'from_gen': 0, 'last_known_gen': 0},
210- 'return': {'new_docs': [], 'conf_docs': [],
211- 'last_gen': 0}},
212+ 'return': {'docs': [], 'last_gen': 0}},
213 self.db2._last_exchange_log)
214
215 def test_sync_puts_changes(self):
216@@ -205,8 +202,7 @@
217 self.assertEqual({'receive': {'docs': [(doc.doc_id, doc.rev)],
218 'from_id': 'test1',
219 'from_gen': 1, 'last_known_gen': 0},
220- 'return': {'new_docs': [], 'conf_docs': [],
221- 'last_gen': 1}},
222+ 'return': {'docs': [], 'last_gen': 1}},
223 self.db2._last_exchange_log)
224
225 def test_sync_pulls_changes(self):
226@@ -218,8 +214,8 @@
227 self.assertEqual(1, self.db2.get_sync_generation('test1'))
228 self.assertEqual({'receive': {'docs': [], 'from_id': 'test1',
229 'from_gen': 0, 'last_known_gen': 0},
230- 'return': {'new_docs': [(doc.doc_id, doc.rev)],
231- 'conf_docs': [], 'last_gen': 1}},
232+ 'return': {'docs': [(doc.doc_id, doc.rev)],
233+ 'last_gen': 1}},
234 self.db2._last_exchange_log)
235 self.assertEqual([doc],
236 self.db1.get_from_index('test-idx', [('value',)]))
237@@ -242,8 +238,8 @@
238 self.assertEqual(0, self.sync(self.db1, self.db2))
239 self.assertEqual({'receive': {'docs': [], 'from_id': 'test1',
240 'from_gen': 0, 'last_known_gen': 0},
241- 'return': {'new_docs': [(doc.doc_id, doc.rev)],
242- 'conf_docs': [], 'last_gen': 1}},
243+ 'return': {'docs': [(doc.doc_id, doc.rev)],
244+ 'last_gen': 1}},
245 self.db2._last_exchange_log)
246 self.assertEqual(1, self.db1.get_sync_generation('test2'))
247 # c2 should not have gotten a '_record_sync_info' call, because the
248@@ -260,8 +256,7 @@
249 self.assertEqual({'receive': {'docs': [(doc.doc_id, doc.rev)],
250 'from_id': 'test1',
251 'from_gen': 1, 'last_known_gen': 0},
252- 'return': {'new_docs': [],
253- 'conf_docs': [], 'last_gen': 1}},
254+ 'return': {'docs': [], 'last_gen': 1}},
255 self.db2._last_exchange_log)
256
257 def test_sync_ignores_superseded(self):
258@@ -278,8 +273,8 @@
259 self.assertEqual({'receive': {'docs': [(doc.doc_id, doc_rev1)],
260 'from_id': 'test2',
261 'from_gen': 1, 'last_known_gen': 0},
262- 'return': {'new_docs': [(doc.doc_id, doc_rev2)],
263- 'conf_docs': [], 'last_gen': 2}},
264+ 'return': {'docs': [(doc.doc_id, doc_rev2)],
265+ 'last_gen': 2}},
266 self.db1._last_exchange_log)
267 self.assertGetDoc(self.db1, doc.doc_id, doc_rev2, new_content, False)
268
269@@ -296,8 +291,7 @@
270 self.assertEqual({'receive': {'docs': [(doc_id, doc1_rev)],
271 'from_id': 'test1',
272 'from_gen': 1, 'last_known_gen': 0},
273- 'return': {'new_docs': [],
274- 'conf_docs': [(doc_id, doc2_rev)],
275+ 'return': {'docs': [(doc_id, doc2_rev)],
276 'last_gen': 1}},
277 self.db2._last_exchange_log)
278 self.assertEqual([doc_id, doc_id], self.db1._get_transaction_log())
279@@ -322,8 +316,7 @@
280 self.assertEqual({'receive': {'docs': [(doc_id, doc1.rev)],
281 'from_id': 'test1',
282 'from_gen': 2, 'last_known_gen': 1},
283- 'return': {'new_docs': [],
284- 'conf_docs': [(doc_id, doc2.rev)],
285+ 'return': {'docs': [(doc_id, doc2.rev)],
286 'last_gen': 2}},
287 self.db2._last_exchange_log)
288 self.assertEqual([doc_id, doc_id, doc_id],
289@@ -375,8 +368,7 @@
290 self.assertEqual({'receive': {'docs': [(doc_id, deleted_rev)],
291 'from_id': 'test1',
292 'from_gen': 2, 'last_known_gen': 1},
293- 'return': {'new_docs': [], 'conf_docs': [],
294- 'last_gen': 2}},
295+ 'return': {'docs': [], 'last_gen': 2}},
296 self.db2._last_exchange_log)
297 self.assertGetDoc(self.db1, doc_id, deleted_rev, None, False)
298 self.assertGetDoc(self.db2, doc_id, deleted_rev, None, False)
299@@ -386,8 +378,7 @@
300 self.assertEqual({'receive': {'docs': [(doc_id, deleted_rev)],
301 'from_id': 'test2',
302 'from_gen': 2, 'last_known_gen': 0},
303- 'return': {'new_docs': [], 'conf_docs': [],
304- 'last_gen': 2}},
305+ 'return': {'docs': [], 'last_gen': 2}},
306 self.db3._last_exchange_log)
307 self.assertGetDoc(self.db3, doc_id, deleted_rev, None, False)
308

Subscribers

People subscribed via source and target branches