Merge lp:~pedronis/u1db/sync-simple-reorg-n-hide-details into lp:u1db
- sync-simple-reorg-n-hide-details
- Merge into trunk
Proposed by
Samuele Pedroni
Status: | Merged |
---|---|
Approved by: | John A Meinel |
Approved revision: | 151 |
Merged at revision: | 147 |
Proposed branch: | lp:~pedronis/u1db/sync-simple-reorg-n-hide-details |
Merge into: | lp:u1db |
Diff against target: |
307 lines (+64/-72) 3 files modified
u1db/remote/http_app.py (+3/-4) u1db/sync.py (+46/-44) u1db/tests/test_sync.py (+15/-24) |
To merge this branch: | bzr merge lp:~pedronis/u1db/sync-simple-reorg-n-hide-details |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Ubuntu One hackers | Pending | ||
Review via email: mp+85453@code.launchpad.net |
Commit message
Description of the change
some things I think are necessary for incremental sync:
- don't send/treat conflicts separately if the plan is to send back documents sorted somehow by generation of changes
- make recording the source last known generation its own helper if we want to call it repeatedly for the generation of incoming docs
To post a comment you must log in.
Revision history for this message
John A Meinel (jameinel) wrote : | # |
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'u1db/remote/http_app.py' |
2 | --- u1db/remote/http_app.py 2011-12-06 20:32:48 +0000 |
3 | +++ u1db/remote/http_app.py 2011-12-13 09:59:32 +0000 |
4 | @@ -294,16 +294,15 @@ |
5 | self.sync_exch.insert_doc_from_source(doc) |
6 | |
7 | def post_end(self): |
8 | + self.sync_exch.record_sync_progress(self.from_replica_uid, |
9 | + self.from_replica_generation) |
10 | def send_doc(doc): |
11 | entry = dict(id=doc.doc_id, rev=doc.rev, content=doc.content) |
12 | self.responder.stream_entry(entry) |
13 | new_gen = self.sync_exch.find_docs_to_return(self.last_known_generation) |
14 | self.responder.content_type = 'application/x-u1db-multi-json' |
15 | self.responder.start_response(200, {"new_generation": new_gen}) |
16 | - new_gen = self.sync_exch.return_docs_and_record_sync( |
17 | - self.from_replica_uid, |
18 | - self.from_replica_generation, |
19 | - send_doc) |
20 | + new_gen = self.sync_exch.return_docs(send_doc) |
21 | self.responder.finish_response() |
22 | |
23 | |
24 | |
25 | === modified file 'u1db/sync.py' |
26 | --- u1db/sync.py 2011-12-12 19:40:50 +0000 |
27 | +++ u1db/sync.py 2011-12-13 09:59:32 +0000 |
28 | @@ -120,19 +120,21 @@ |
29 | |
30 | def __init__(self, db): |
31 | self._db = db |
32 | - self.seen_ids = set() # incoming ids not superseded (or conflicted) |
33 | + self.seen_ids = set() # incoming ids not superseded |
34 | self.doc_ids_to_return = None |
35 | - self.conflict_ids = set() |
36 | self.new_gen = None |
37 | # for tests |
38 | self._incoming_trace = [] |
39 | - self._last_known_generation = None |
40 | + self._db._last_exchange_log = { |
41 | + 'receive': {'docs': self._incoming_trace}, |
42 | + 'return': None |
43 | + } |
44 | |
45 | def insert_doc_from_source(self, doc): |
46 | """Try to insert synced document from source. |
47 | |
48 | - Conflicting documents are not inserted but the current revision |
49 | - marked to be sent over to the sync source. |
50 | + Conflicting documents are not inserted but will be sent over |
51 | + to the sync source. |
52 | |
53 | The 1st step of a sync exchange is to call this repeatedly to |
54 | try insert all incoming documents from the source. |
55 | @@ -150,76 +152,75 @@ |
56 | # we have something newer that we will return |
57 | pass |
58 | else: |
59 | - # conflict, returned independently |
60 | + # conflict that we will return |
61 | assert state == 'conflicted' |
62 | - self.seen_ids.add(doc.doc_id) |
63 | - self.conflict_ids.add(doc.doc_id) |
64 | # for tests |
65 | self._incoming_trace.append((doc.doc_id, doc.rev)) |
66 | |
67 | + def record_sync_progress(self, from_replica_uid, from_replica_generation): |
68 | + """Record the sync information of from_replica_uid |
69 | + the sync source identifier and the generation until which it |
70 | + sent its documents from_replica_generation. |
71 | + |
72 | + :param from_replica_uid: The source replica's identifier |
73 | + :param from_replica_generation: The db generation for the |
74 | + source replica indicating the tip of data that was sent. |
75 | + :return: None |
76 | + """ |
77 | + # record sync point |
78 | + self._db.set_sync_generation(from_replica_uid, |
79 | + from_replica_generation) |
80 | + # for tests |
81 | + self._db._last_exchange_log['receive'].update({ |
82 | + 'from_id': from_replica_uid, |
83 | + 'from_gen': from_replica_generation |
84 | + }) |
85 | + |
86 | def find_docs_to_return(self, last_known_generation): |
87 | """Find and further mark documents to return to the sync source. |
88 | |
89 | This finds the document identifiers for any documents that |
90 | have been updated since last_known_generation. It excludes |
91 | - documents ids that have already been considered (marked conflicts, |
92 | - or superseded by the sender, etc). |
93 | + documents ids that have already been considered |
94 | + (superseded by the sender, etc). |
95 | |
96 | :return: new_generation - the generation of this database |
97 | which the caller can consider themselves to be synchronized after |
98 | processing the returned documents. |
99 | """ |
100 | - self._last_known_generation = last_known_generation # for tests |
101 | + self._db._last_exchange_log['receive'].update({ # for tests |
102 | + 'last_known_gen': last_known_generation |
103 | + }) |
104 | gen, changes = self._db.whats_changed(last_known_generation) |
105 | changed_doc_ids = set(doc_id for doc_id, _ in changes) |
106 | self.new_gen = gen |
107 | seen_ids = self.seen_ids |
108 | # changed docs that weren't superseded by or converged with |
109 | - # nor conflicted, conflicts are returned independently |
110 | self.doc_ids_to_return = set(doc_id for doc_id in changed_doc_ids |
111 | if doc_id not in seen_ids) |
112 | return gen |
113 | |
114 | - def return_docs_and_record_sync(self, |
115 | - from_replica_uid, from_replica_generation, |
116 | - return_doc_cb): |
117 | + def return_docs(self, return_doc_cb): |
118 | """Return the marked documents repeatedly invoking the callback |
119 | - return_doc_cb, record the sync information of from_replica_uid |
120 | - the sync source identifier and the generation until which it |
121 | - sent its documents from_replica_generation. |
122 | + return_doc_cb. |
123 | |
124 | The final step of a sync exchange. |
125 | |
126 | - :param from_replica_uid: The source replica's identifier |
127 | - :param from_replica_generation: The db generation for the |
128 | - source replica indicating the tip of data that was sent. |
129 | :param: return_doc_cb(doc): is a callback |
130 | used to return the marked documents to the target replica, |
131 | :return: None |
132 | """ |
133 | doc_ids_to_return = self.doc_ids_to_return |
134 | - conflict_ids = self.conflict_ids |
135 | - # return docs |
136 | - new_docs = self._db.get_docs(doc_ids_to_return, |
137 | + # return docs, including conflicts |
138 | + docs = self._db.get_docs(doc_ids_to_return, |
139 | check_for_conflicts=False) |
140 | - for doc in new_docs: |
141 | - return_doc_cb(doc) |
142 | - conflicts = self._db.get_docs(conflict_ids, check_for_conflicts=False) |
143 | - for doc in conflicts: |
144 | - return_doc_cb(doc) |
145 | - # record sync point |
146 | - self._db.set_sync_generation(from_replica_uid, |
147 | - from_replica_generation) |
148 | + for doc in docs: |
149 | + return_doc_cb(doc) |
150 | # for tests |
151 | - self._db._last_exchange_log = { |
152 | - 'receive': {'docs': self._incoming_trace, |
153 | - 'from_id': from_replica_uid, |
154 | - 'from_gen': from_replica_generation, |
155 | - 'last_known_gen': self._last_known_generation}, |
156 | - 'return': {'new_docs': [(d.doc_id, d.rev) for d in new_docs], |
157 | - 'conf_docs': [(d.doc_id, d.rev) for d in conflicts], |
158 | - 'last_gen': self.new_gen} |
159 | - } |
160 | + self._db._last_exchange_log['return'] = { |
161 | + 'docs': [(d.doc_id, d.rev) for d in docs], |
162 | + 'last_gen': self.new_gen |
163 | + } |
164 | |
165 | |
166 | class LocalSyncTarget(u1db.SyncTarget): |
167 | @@ -238,10 +239,11 @@ |
168 | # 1st step: try to insert incoming docs |
169 | for doc in docs: |
170 | sync_exch.insert_doc_from_source(doc) |
171 | + # record progress |
172 | + sync_exch.record_sync_progress(from_replica_uid, |
173 | + from_replica_generation) |
174 | # 2nd step: find changed documents (including conflicts) to return |
175 | new_gen = sync_exch.find_docs_to_return(last_known_generation) |
176 | # final step: return docs and record source replica sync point |
177 | - sync_exch.return_docs_and_record_sync(from_replica_uid, |
178 | - from_replica_generation, |
179 | - return_doc_cb) |
180 | + sync_exch.return_docs(return_doc_cb) |
181 | return new_gen |
182 | |
183 | === modified file 'u1db/tests/test_sync.py' |
184 | --- u1db/tests/test_sync.py 2011-12-06 20:44:53 +0000 |
185 | +++ u1db/tests/test_sync.py 2011-12-13 09:59:32 +0000 |
186 | @@ -115,8 +115,7 @@ |
187 | self.assertEqual(([(doc.doc_id, doc.rev, simple_doc)], 1), |
188 | (self.other_docs, new_gen)) |
189 | self.assertEqual(self.db._last_exchange_log['return'], |
190 | - {'last_gen': 1, 'conf_docs': [(doc.doc_id, doc.rev)], |
191 | - 'new_docs': []}) |
192 | + {'last_gen': 1, 'docs': [(doc.doc_id, doc.rev)]}) |
193 | |
194 | def test_sync_exchange_ignores_convergence(self): |
195 | doc = self.db.create_doc(simple_doc) |
196 | @@ -140,8 +139,7 @@ |
197 | self.assertEqual(([(doc.doc_id, doc.rev, simple_doc)], 1), |
198 | (self.other_docs, new_gen)) |
199 | self.assertEqual(self.db._last_exchange_log['return'], |
200 | - {'last_gen': 1, 'new_docs': [(doc.doc_id, doc.rev)], |
201 | - 'conf_docs': []}) |
202 | + {'last_gen': 1, 'docs': [(doc.doc_id, doc.rev)]}) |
203 | |
204 | def test_sync_exchange_getting_newer_docs(self): |
205 | doc = self.db.create_doc(simple_doc) |
206 | @@ -192,8 +190,7 @@ |
207 | self.assertEqual(0, self.db2.get_sync_generation('test1')) |
208 | self.assertEqual({'receive': {'docs': [], 'from_id': 'test1', |
209 | 'from_gen': 0, 'last_known_gen': 0}, |
210 | - 'return': {'new_docs': [], 'conf_docs': [], |
211 | - 'last_gen': 0}}, |
212 | + 'return': {'docs': [], 'last_gen': 0}}, |
213 | self.db2._last_exchange_log) |
214 | |
215 | def test_sync_puts_changes(self): |
216 | @@ -205,8 +202,7 @@ |
217 | self.assertEqual({'receive': {'docs': [(doc.doc_id, doc.rev)], |
218 | 'from_id': 'test1', |
219 | 'from_gen': 1, 'last_known_gen': 0}, |
220 | - 'return': {'new_docs': [], 'conf_docs': [], |
221 | - 'last_gen': 1}}, |
222 | + 'return': {'docs': [], 'last_gen': 1}}, |
223 | self.db2._last_exchange_log) |
224 | |
225 | def test_sync_pulls_changes(self): |
226 | @@ -218,8 +214,8 @@ |
227 | self.assertEqual(1, self.db2.get_sync_generation('test1')) |
228 | self.assertEqual({'receive': {'docs': [], 'from_id': 'test1', |
229 | 'from_gen': 0, 'last_known_gen': 0}, |
230 | - 'return': {'new_docs': [(doc.doc_id, doc.rev)], |
231 | - 'conf_docs': [], 'last_gen': 1}}, |
232 | + 'return': {'docs': [(doc.doc_id, doc.rev)], |
233 | + 'last_gen': 1}}, |
234 | self.db2._last_exchange_log) |
235 | self.assertEqual([doc], |
236 | self.db1.get_from_index('test-idx', [('value',)])) |
237 | @@ -242,8 +238,8 @@ |
238 | self.assertEqual(0, self.sync(self.db1, self.db2)) |
239 | self.assertEqual({'receive': {'docs': [], 'from_id': 'test1', |
240 | 'from_gen': 0, 'last_known_gen': 0}, |
241 | - 'return': {'new_docs': [(doc.doc_id, doc.rev)], |
242 | - 'conf_docs': [], 'last_gen': 1}}, |
243 | + 'return': {'docs': [(doc.doc_id, doc.rev)], |
244 | + 'last_gen': 1}}, |
245 | self.db2._last_exchange_log) |
246 | self.assertEqual(1, self.db1.get_sync_generation('test2')) |
247 | # c2 should not have gotten a '_record_sync_info' call, because the |
248 | @@ -260,8 +256,7 @@ |
249 | self.assertEqual({'receive': {'docs': [(doc.doc_id, doc.rev)], |
250 | 'from_id': 'test1', |
251 | 'from_gen': 1, 'last_known_gen': 0}, |
252 | - 'return': {'new_docs': [], |
253 | - 'conf_docs': [], 'last_gen': 1}}, |
254 | + 'return': {'docs': [], 'last_gen': 1}}, |
255 | self.db2._last_exchange_log) |
256 | |
257 | def test_sync_ignores_superseded(self): |
258 | @@ -278,8 +273,8 @@ |
259 | self.assertEqual({'receive': {'docs': [(doc.doc_id, doc_rev1)], |
260 | 'from_id': 'test2', |
261 | 'from_gen': 1, 'last_known_gen': 0}, |
262 | - 'return': {'new_docs': [(doc.doc_id, doc_rev2)], |
263 | - 'conf_docs': [], 'last_gen': 2}}, |
264 | + 'return': {'docs': [(doc.doc_id, doc_rev2)], |
265 | + 'last_gen': 2}}, |
266 | self.db1._last_exchange_log) |
267 | self.assertGetDoc(self.db1, doc.doc_id, doc_rev2, new_content, False) |
268 | |
269 | @@ -296,8 +291,7 @@ |
270 | self.assertEqual({'receive': {'docs': [(doc_id, doc1_rev)], |
271 | 'from_id': 'test1', |
272 | 'from_gen': 1, 'last_known_gen': 0}, |
273 | - 'return': {'new_docs': [], |
274 | - 'conf_docs': [(doc_id, doc2_rev)], |
275 | + 'return': {'docs': [(doc_id, doc2_rev)], |
276 | 'last_gen': 1}}, |
277 | self.db2._last_exchange_log) |
278 | self.assertEqual([doc_id, doc_id], self.db1._get_transaction_log()) |
279 | @@ -322,8 +316,7 @@ |
280 | self.assertEqual({'receive': {'docs': [(doc_id, doc1.rev)], |
281 | 'from_id': 'test1', |
282 | 'from_gen': 2, 'last_known_gen': 1}, |
283 | - 'return': {'new_docs': [], |
284 | - 'conf_docs': [(doc_id, doc2.rev)], |
285 | + 'return': {'docs': [(doc_id, doc2.rev)], |
286 | 'last_gen': 2}}, |
287 | self.db2._last_exchange_log) |
288 | self.assertEqual([doc_id, doc_id, doc_id], |
289 | @@ -375,8 +368,7 @@ |
290 | self.assertEqual({'receive': {'docs': [(doc_id, deleted_rev)], |
291 | 'from_id': 'test1', |
292 | 'from_gen': 2, 'last_known_gen': 1}, |
293 | - 'return': {'new_docs': [], 'conf_docs': [], |
294 | - 'last_gen': 2}}, |
295 | + 'return': {'docs': [], 'last_gen': 2}}, |
296 | self.db2._last_exchange_log) |
297 | self.assertGetDoc(self.db1, doc_id, deleted_rev, None, False) |
298 | self.assertGetDoc(self.db2, doc_id, deleted_rev, None, False) |
299 | @@ -386,8 +378,7 @@ |
300 | self.assertEqual({'receive': {'docs': [(doc_id, deleted_rev)], |
301 | 'from_id': 'test2', |
302 | 'from_gen': 2, 'last_known_gen': 0}, |
303 | - 'return': {'new_docs': [], 'conf_docs': [], |
304 | - 'last_gen': 2}}, |
305 | + 'return': {'docs': [], 'last_gen': 2}}, |
306 | self.db3._last_exchange_log) |
307 | self.assertGetDoc(self.db3, doc_id, deleted_rev, None, False) |
308 |
I like this in general. I'm a little concerned about not tracking conflicts
separately. It might just be paranoia, though. So I'm trying to think
through it.
A-1 doc-x created
B-1 doc-x sync
A-2 x updates rev-A:2
B-2 x updated rev-A:1|B:1 (maybe B:2 for gen)
A starts sync, sends doc-x A:2 gen 2
B skips the doc, does(n't?) record gen of A
B tries to return x@A:1|B:1 but loses connection
A-3 doc-y
A starts sync, B wants everything since A-2
A sends y, and last B gen of 1
B will naturally return x@A:1|B:1
So all of that seems fine. Can we poke holes in it to make it fail? It does
seem that anything which should be in conflict must have been updated since
the last successful synchronization.
So it seems ok to me, but we'll want to think carefully about edge cases.
John
=:->
On Dec 13, 2011 11:02 AM, "Samuele Pedroni" <email address hidden>
wrote:
> Samuele Pedroni has proposed merging
> lp:~pedronis/u1db/sync-simple-reorg-n-hide-details into lp:u1db.
>
> Requested reviews:
>   Ubuntu One hackers (ubuntuone-hackers)
>
> For more details, see:
>
> https://code.launchpad.net/~pedronis/u1db/sync-simple-reorg-n-hide-details/+merge/85453
>
> some things I think are necessary for incremental sync:
>
> - don't send/treat conflicts separately if the plan is to send back
> documents sorted somehow by generation of changes
>
> - make recording the source last known generation its own helper if we
> want to call it repeatedly for the generation of incoming docs
> --
>
> https://code.launchpad.net/~pedronis/u1db/sync-simple-reorg-n-hide-details/+merge/85453
> Your team Ubuntu One hackers is requested to review the proposed merge of
> lp:~pedronis/u1db/sync-simple-reorg-n-hide-details into lp:u1db.
>
> === modified file 'u1db/remote/http_app.py'
> --- u1db/remote/http_app.py 2011-12-06 20:32:48 +0000
> +++ u1db/remote/http_app.py 2011-12-13 09:59:32 +0000
> @@ -294,16 +294,15 @@
>          self.sync_exch.insert_doc_from_source(doc)
>
>      def post_end(self):
> +        self.sync_exch.record_sync_progress(self.from_replica_uid,
> +                                            self.from_replica_generation)
>          def send_doc(doc):
>              entry = dict(id=doc.doc_id, rev=doc.rev, content=doc.content)
>              self.responder.stream_entry(entry)
>          new_gen = self.sync_exch.find_docs_to_return(self.last_known_generation)
>          self.responder.content_type = 'application/x-u1db-multi-json'
>          self.responder.start_response(200, {"new_generation": new_gen})
> -        new_gen = self.sync_exch.return_docs_and_record_sync(
> -            self.from_replica_uid,
> -            self.from_replica_generation,
> -            send_doc)
> +        new_gen = self.sync_exch.return_docs(send_doc)
>          self.responder.finish_response()
>
>
>
> === modified file 'u1db/sync.py'
> --- u1db/sync.py 2011-12-12 19:40:50 +0000
> +++ u1db/sync.py 2011-12-13 09:59:32 +0000
> @@ -120,19 +120,21 @@
>
> def __init__(self, db):
> self._db = db
> - self.seen_ids = set() # incoming ids not superseded (or
> conflicted)
> + self.seen_id...