Merge lp:~jameinel/u1db/combined-put-and-gen into lp:u1db

Proposed by John A Meinel
Status: Merged
Merged at revision: 154
Proposed branch: lp:~jameinel/u1db/combined-put-and-gen
Merge into: lp:u1db
Prerequisite: lp:~jameinel/u1db/put_doc_save_conflict
Diff against target: 289 lines (+68/-28)
10 files modified
u1db/__init__.py (+6/-1)
u1db/backends/__init__.py (+8/-5)
u1db/backends/inmemory.py (+3/-0)
u1db/backends/sqlite_backend.py (+7/-3)
u1db/remote/http_app.py (+1/-2)
u1db/sync.py (+9/-9)
u1db/tests/test_backends.py (+24/-0)
u1db/tests/test_http_app.py (+2/-2)
u1db/tests/test_remote_sync_target.py (+4/-2)
u1db/tests/test_sync.py (+4/-4)
To merge this branch: bzr merge lp:~jameinel/u1db/combined-put-and-gen
Reviewer Review Type Date Requested Status
Samuele Pedroni Approve
Review via email: mp+86030@code.launchpad.net

This proposal supersedes a proposal from 2011-12-14.

Description of the change

This updates Database.put_doc_if_newer to allow passing in the other replica_uid and replica_generation. It also updates SyncExchange.insert_doc_from_source to take those arguments and pass them on. And then it updates the HTTP sync code to just pass the arguments rather than calling set_sync_generation directly.

The main benefit is that we have 1 transaction per document, rather than 2.

It should be pretty easy to update insert_doc_from_target as well. It doesn't look like we need to update force_doc_conflict after all, because it is always called *after* put_doc_if_newer, so we'll have already handled the generation stuff.

Updated to merge the removal of force_doc_sync_conflict and clean up the api a bit.

Also updated to include target => source updates.

One thing I'm wondering, we currently have the thunk take_doc in order to pass the other_replica_uid to _insert_doc_from_target. We didn't need to do that for the Source => Target side, because we could use Source._replica_uid. However we don't have sync_target._replica_uid to do the same for the other direction.

I wonder if we want to change the callback api, instead of being (doc, gen) to be (doc, replica_uid, replica_gen).
Thoughts?

To post a comment you must log in.
Revision history for this message
Samuele Pedroni (pedronis) wrote :

looks good

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'u1db/__init__.py'
2--- u1db/__init__.py 2011-12-16 11:55:07 +0000
3+++ u1db/__init__.py 2011-12-16 12:23:24 +0000
4@@ -102,7 +102,8 @@
5 """
6 raise NotImplementedError(self.put_doc)
7
8- def put_doc_if_newer(self, doc, save_conflict):
9+ def put_doc_if_newer(self, doc, save_conflict, replica_uid=None,
10+ replica_gen=None):
11 """Insert/update document into the database with a given revision.
12
13 This api is used during synchronization operations.
14@@ -123,6 +124,10 @@
15 :param doc: A Document object
16 :param save_conflict: If this document is a conflict, do you want to
17 save it as a conflict, or just ignore it.
18+ :param replica_uid: A unique replica identifier.
19+ :param replica_gen: The generation of the replica corresponding to the
20+ this document. The replica arguments are optional, but are used
21+ during synchronization.
22 :return: state - If we don't have doc_id already, or if doc_rev
23 supersedes the existing document revision, then the content will
24 be inserted, and state is 'inserted'.
25
26=== modified file 'u1db/backends/__init__.py'
27--- u1db/backends/__init__.py 2011-12-16 11:55:07 +0000
28+++ u1db/backends/__init__.py 2011-12-16 12:23:24 +0000
29@@ -85,7 +85,8 @@
30 result.append(doc)
31 return result
32
33- def put_doc_if_newer(self, doc, save_conflict):
34+ def put_doc_if_newer(self, doc, save_conflict, replica_uid=None,
35+ replica_gen=None):
36 cur_doc = self._get_doc(doc.doc_id)
37 doc_vcr = VectorClockRev(doc.rev)
38 if cur_doc is None:
39@@ -95,20 +96,22 @@
40 if doc_vcr.is_newer(cur_vcr):
41 self._put_and_update_indexes(cur_doc, doc)
42 self._prune_conflicts(doc, doc_vcr)
43- return 'inserted'
44+ state = 'inserted'
45 elif doc.rev == cur_doc.rev:
46 # magical convergence
47- return 'converged'
48+ state = 'converged'
49 elif cur_vcr.is_newer(doc_vcr):
50 # Don't add this to seen_ids, because we have something newer,
51 # so we should send it back, and we should not generate a
52 # conflict
53- return 'superseded'
54+ state = 'superseded'
55 else:
56 state = 'conflicted'
57 if save_conflict:
58 self._force_doc_sync_conflict(doc)
59- return 'conflicted'
60+ if replica_uid is not None and replica_gen is not None:
61+ self._set_sync_generation(replica_uid, replica_gen)
62+ return state
63
64 def _ensure_maximal_rev(self, cur_rev, extra_revs):
65 vcr = VectorClockRev(cur_rev)
66
67=== modified file 'u1db/backends/inmemory.py'
68--- u1db/backends/inmemory.py 2011-12-16 11:19:34 +0000
69+++ u1db/backends/inmemory.py 2011-12-16 12:23:24 +0000
70@@ -47,6 +47,9 @@
71 return self._other_generations.get(other_replica_uid, 0)
72
73 def set_sync_generation(self, other_replica_uid, other_generation):
74+ self._set_sync_generation(other_replica_uid, other_generation)
75+
76+ def _set_sync_generation(self, other_replica_uid, other_generation):
77 # TODO: to handle race conditions, we may want to check if the current
78 # value is greater than this new value.
79 self._other_generations[other_replica_uid] = other_generation
80
81=== modified file 'u1db/backends/sqlite_backend.py'
82--- u1db/backends/sqlite_backend.py 2011-12-16 11:55:07 +0000
83+++ u1db/backends/sqlite_backend.py 2011-12-16 12:23:24 +0000
84@@ -420,15 +420,19 @@
85
86 def set_sync_generation(self, other_replica_uid, other_generation):
87 with self._db_handle:
88+ self._set_sync_generation(other_replica_uid, other_generation)
89+
90+ def _set_sync_generation(self, other_replica_uid, other_generation):
91 c = self._db_handle.cursor()
92- my_gen = self._get_generation()
93 c.execute("INSERT OR REPLACE INTO sync_log VALUES (?, ?)",
94 (other_replica_uid, other_generation))
95
96- def put_doc_if_newer(self, doc, save_conflict):
97+ def put_doc_if_newer(self, doc, save_conflict, replica_uid=None,
98+ replica_gen=None):
99 with self._db_handle:
100 return super(SQLiteDatabase, self).put_doc_if_newer(doc,
101- save_conflict=save_conflict)
102+ save_conflict=save_conflict,
103+ replica_uid=replica_uid, replica_gen=replica_gen)
104
105 def _add_conflict(self, c, doc_id, my_doc_rev, my_content):
106 c.execute("INSERT INTO conflicts VALUES (?, ?, ?)",
107
108=== modified file 'u1db/remote/http_app.py'
109--- u1db/remote/http_app.py 2011-12-14 15:37:05 +0000
110+++ u1db/remote/http_app.py 2011-12-16 12:23:24 +0000
111@@ -290,8 +290,7 @@
112 @http_method(content_as_args=True)
113 def post_stream_entry(self, id, rev, content, gen):
114 doc = Document(id, rev, content)
115- self.sync_exch.insert_doc_from_source(doc)
116- self.sync_exch.record_sync_progress(self.from_replica_uid, gen)
117+ self.sync_exch.insert_doc_from_source(doc, self.from_replica_uid, gen)
118
119 def post_end(self):
120 def send_doc(doc, gen):
121
122=== modified file 'u1db/sync.py'
123--- u1db/sync.py 2011-12-16 11:55:07 +0000
124+++ u1db/sync.py 2011-12-16 12:23:24 +0000
125@@ -36,7 +36,7 @@
126 self.sync_target = sync_target
127 self.num_inserted = 0
128
129- def _insert_doc_from_target(self, doc):
130+ def _insert_doc_from_target(self, doc, replica_uid, replica_gen):
131 """Try to insert synced document from target.
132
133 Implements TAKE OTHER semantics: any document from the target
134@@ -48,7 +48,8 @@
135 """
136 # Increases self.num_inserted depending whether the document
137 # was effectively inserted.
138- state = self.source.put_doc_if_newer(doc, save_conflict=True)
139+ state = self.source.put_doc_if_newer(doc, save_conflict=True,
140+ replica_uid=replica_uid, replica_gen=replica_gen)
141 if state == 'inserted':
142 self.num_inserted += 1
143 elif state == 'converged':
144@@ -103,9 +104,7 @@
145 # exchange documents and try to insert the returned ones with
146 # the target, return target synced-up-to gen
147 def take_doc(doc, gen):
148- self._insert_doc_from_target(doc)
149- # record target synced-up-to generation
150- self.source.set_sync_generation(other_replica_uid, gen)
151+ return self._insert_doc_from_target(doc, other_replica_uid, gen)
152
153 new_gen = sync_target.sync_exchange(docs_by_generation,
154 self.source._replica_uid, other_last_known_gen,
155@@ -134,7 +133,7 @@
156 'return': None
157 }
158
159- def insert_doc_from_source(self, doc):
160+ def insert_doc_from_source(self, doc, other_uid, other_gen):
161 """Try to insert synced document from source.
162
163 Conflicting documents are not inserted but will be sent over
164@@ -146,7 +145,8 @@
165 :param doc: A Document object.
166 :return: None
167 """
168- state = self._db.put_doc_if_newer(doc, save_conflict=False)
169+ state = self._db.put_doc_if_newer(doc, save_conflict=False,
170+ replica_uid=other_uid, replica_gen=other_gen)
171 if state == 'inserted':
172 self.seen_ids.add(doc.doc_id)
173 elif state == 'converged':
174@@ -241,8 +241,8 @@
175 last_known_generation, return_doc_cb):
176 sync_exch = self.get_sync_exchange()
177 # 1st step: try to insert incoming docs
178- for doc, _ in docs_by_generations:
179- sync_exch.insert_doc_from_source(doc)
180+ for doc, doc_gen in docs_by_generations:
181+ sync_exch.insert_doc_from_source(doc, from_replica_uid, doc_gen)
182 # record progress
183 if docs_by_generations:
184 latest_gen = docs_by_generations[-1][1]
185
186=== modified file 'u1db/tests/test_backends.py'
187--- u1db/tests/test_backends.py 2011-12-16 11:55:07 +0000
188+++ u1db/tests/test_backends.py 2011-12-16 12:23:24 +0000
189@@ -315,6 +315,30 @@
190 # The database wasn't altered
191 self.assertGetDoc(self.db, doc1.doc_id, doc1.rev, simple_doc, False)
192
193+ def test_put_doc_if_newer_replica_uid(self):
194+ doc1 = self.db.create_doc(simple_doc)
195+ self.db.set_sync_generation('other', 1)
196+ doc2 = Document(doc1.doc_id, doc1.rev + '|other:1', nested_doc)
197+ self.assertEqual('inserted',
198+ self.db.put_doc_if_newer(doc2, save_conflict=True,
199+ replica_uid='other', replica_gen=2))
200+ self.assertEqual(2, self.db.get_sync_generation('other'))
201+ doc2.rev = doc1.rev
202+ self.assertEqual('superseded',
203+ self.db.put_doc_if_newer(doc2, save_conflict=True,
204+ replica_uid='other', replica_gen=3))
205+ self.assertEqual(3, self.db.get_sync_generation('other'))
206+ doc2.rev = doc1.rev + '|third:3'
207+ self.assertEqual('conflicted',
208+ self.db.put_doc_if_newer(doc2, save_conflict=True,
209+ replica_uid='other', replica_gen=4))
210+ self.assertEqual(4, self.db.get_sync_generation('other'))
211+ doc2.rev = doc1.rev + '|fourth:1'
212+ self.assertEqual('conflicted',
213+ self.db.put_doc_if_newer(doc2, save_conflict=False,
214+ replica_uid='other', replica_gen=5))
215+ self.assertEqual(5, self.db.get_sync_generation('other'))
216+
217 def test_put_doc_if_newer_save_conflicted(self):
218 doc1 = self.db.create_doc(simple_doc)
219 # Document is inserted as a conflict
220
221=== modified file 'u1db/tests/test_http_app.py'
222--- u1db/tests/test_http_app.py 2011-12-15 19:43:40 +0000
223+++ u1db/tests/test_http_app.py 2011-12-16 12:23:24 +0000
224@@ -562,7 +562,7 @@
225 }
226
227 gens = []
228- _set_sync_generation = self.db0.set_sync_generation
229+ _set_sync_generation = self.db0._set_sync_generation
230 def set_sync_generation_witness(other_uid, other_gen):
231 gens.append((other_uid, other_gen))
232 _set_sync_generation(other_uid, other_gen)
233@@ -570,7 +570,7 @@
234 entries[other_gen]['rev'],
235 entries[other_gen]['content'], False)
236
237- self.patch(self.db0, 'set_sync_generation',
238+ self.patch(self.db0, '_set_sync_generation',
239 set_sync_generation_witness)
240
241 args = dict(last_known_generation=0)
242
243=== modified file 'u1db/tests/test_remote_sync_target.py'
244--- u1db/tests/test_remote_sync_target.py 2011-12-16 11:55:07 +0000
245+++ u1db/tests/test_remote_sync_target.py 2011-12-16 12:23:24 +0000
246@@ -109,10 +109,12 @@
247 db = self.request_state._create_database('test')
248 _put_doc_if_newer = db.put_doc_if_newer
249 trigger_ids = ['doc-here2']
250- def bomb_put_doc_if_newer(doc, save_conflict):
251+ def bomb_put_doc_if_newer(doc, save_conflict,
252+ replica_uid=None, replica_gen=None):
253 if doc.doc_id in trigger_ids:
254 raise Exception
255- return _put_doc_if_newer(doc, save_conflict)
256+ return _put_doc_if_newer(doc, save_conflict=save_conflict,
257+ replica_uid=replica_uid, replica_gen=replica_gen)
258 self.patch(db, 'put_doc_if_newer', bomb_put_doc_if_newer)
259 remote_target = self.getSyncTarget('test')
260 other_changes = []
261
262=== modified file 'u1db/tests/test_sync.py'
263--- u1db/tests/test_sync.py 2011-12-15 19:41:40 +0000
264+++ u1db/tests/test_sync.py 2011-12-16 12:23:24 +0000
265@@ -523,20 +523,20 @@
266 self.assertEqual(2, len(self.db2._get_transaction_log()))
267 progress1 = []
268 progress2 = []
269- _set_sync_generation1 = self.db1.set_sync_generation
270+ _set_sync_generation1 = self.db1._set_sync_generation
271 def set_sync_generation_witness1(other_uid, other_gen):
272 progress1.append((other_uid, other_gen,
273 self.db1._get_transaction_log()[2:]))
274 _set_sync_generation1(other_uid, other_gen)
275- self.patch(self.db1, 'set_sync_generation',
276+ self.patch(self.db1, '_set_sync_generation',
277 set_sync_generation_witness1)
278
279- _set_sync_generation2 = self.db2.set_sync_generation
280+ _set_sync_generation2 = self.db2._set_sync_generation
281 def set_sync_generation_witness2(other_uid, other_gen):
282 progress2.append((other_uid, other_gen,
283 self.db2._get_transaction_log()[2:]))
284 _set_sync_generation2(other_uid, other_gen)
285- self.patch(self.db2, 'set_sync_generation',
286+ self.patch(self.db2, '_set_sync_generation',
287 set_sync_generation_witness2)
288
289 db2_url = self.getURL('test2')

Subscribers

People subscribed via source and target branches