Merge lp:~thisfred/u1db/sync_integration_tests into lp:u1db

Proposed by Eric Casteleijn
Status: Merged
Approved by: Eric Casteleijn
Approved revision: 346
Merged at revision: 344
Proposed branch: lp:~thisfred/u1db/sync_integration_tests
Merge into: lp:u1db
Diff against target: 899 lines (+226/-130)
18 files modified
.bzrignore (+6/-0)
include/u1db/u1db_internal.h (+2/-2)
src/u1db.c (+1/-1)
src/u1db_http_sync_target.c (+8/-5)
src/u1db_sync_target.c (+23/-27)
u1db/__init__.py (+5/-2)
u1db/backends/inmemory.py (+4/-0)
u1db/errors.py (+4/-0)
u1db/remote/http_app.py (+15/-6)
u1db/remote/http_errors.py (+2/-0)
u1db/remote/http_target.py (+5/-2)
u1db/sync.py (+13/-7)
u1db/tests/c_backend_wrapper.pyx (+25/-17)
u1db/tests/test_backends.py (+1/-1)
u1db/tests/test_c_backend.py (+1/-1)
u1db/tests/test_http_app.py (+2/-1)
u1db/tests/test_remote_sync_target.py (+16/-16)
u1db/tests/test_sync.py (+93/-42)
To merge this branch: bzr merge lp:~thisfred/u1db/sync_integration_tests
Reviewer           Review Type   Date Requested   Status
Samuele Pedroni                                   Approve
Review via email: mp+111278@code.launchpad.net

Commit message

Added integration tests for the detection of a diverged source or target db during sync and made them pass.

Description of the change

Added integration tests for the detection of a diverged source or target db during sync and made them pass.
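
As a rough illustration of the check these tests exercise, the sketch below validates a (generation, transaction id) pair against a replica's transaction log. It is a minimal stand-in and not the u1db implementation: `TinyReplica`, its list-based log, `record_change`, and the local exception classes are assumptions made for the example. Only the names `validate_gen_and_trans_id`, `InvalidGeneration` and `InvalidTransactionId` come from the diff.

```python
class InvalidGeneration(Exception):
    """Stand-in for errors.InvalidGeneration."""


class InvalidTransactionId(Exception):
    """Stand-in for errors.InvalidTransactionId."""


class TinyReplica(object):
    """Hypothetical replica that only keeps a transaction log."""

    def __init__(self):
        # Index i holds the transaction id of generation i + 1.
        self._transaction_log = []

    def record_change(self, trans_id):
        """Append a transaction and return the new generation."""
        self._transaction_log.append(trans_id)
        return len(self._transaction_log)

    def validate_gen_and_trans_id(self, generation, trans_id):
        # Generation 0 means "never synced"; there is nothing to check.
        if generation == 0:
            return
        # A generation we never reached: the peer remembers a future we
        # do not have (for example, this replica was rolled back).
        if generation > len(self._transaction_log):
            raise InvalidGeneration(generation)
        # Same generation, different transaction id: histories diverged.
        if self._transaction_log[generation - 1] != trans_id:
            raise InvalidTransactionId(trans_id)
```

With a single recorded change `T-1`, the pair `(1, 'T-1')` validates, `(28, 'T-madeup')` raises `InvalidGeneration`, and `(1, 'T-other')` raises `InvalidTransactionId`.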

Eric Casteleijn (thisfred) wrote :

Shoot, something's still not right with sync_exchange_doc_ids.

Eric Casteleijn (thisfred) wrote :

fixed

John O'Brien (jdobrien) wrote :

I'm completely unqualified to mark this approved, but the changes look sound to me and I really like the formatting cleanup.

John A Meinel (jameinel) wrote :

On 6/20/2012 9:27 PM, Eric Casteleijn wrote:
> Eric Casteleijn has proposed merging
> lp:~thisfred/u1db/sync_integration_tests into lp:u1db.
>
> Requested reviews: Ubuntu One hackers (ubuntuone-hackers) Related
> bugs: Bug #1006872 in U1DB: "sync_exchange should transmit the txid
> it thinks the target was at"
> https://bugs.launchpad.net/u1db/+bug/1006872
>
> For more details, see:
> https://code.launchpad.net/~thisfred/u1db/sync_integration_tests/+merge/111278
>
> Added integration tests for the detection of a diverged source or
> target db during sync and made them pass.
>

+    def test_sync_detects_rollback_in_source(self):
+        self.db1.create_doc(tests.simple_doc, doc_id="divergent")
+        self.sync(self.db1, self.db2)
+        # make db2 think it's synced with a much later version of db1
+        self.db2._set_sync_info(self.db1._replica_uid, 28, 'T-madeup')
+        self.assertRaises(
+            errors.InvalidGeneration, self.sync, self.db1, self.db2)

^- I think this is something that we eventually wanted to support, as
long as there wasn't a local change on source.

Otherwise, the diverged tests are certainly valid.

John
=:->

Eric Casteleijn (thisfred) wrote :

I think we should as well, but we need more information to do it: all this says is that the target knows a newer generation than the source knows of itself. That still doesn't mean it hasn't diverged. I believe we can probably just start the sync, though, and put_doc_if_newer would complain at the point of divergence, if any. What I still need to think about is which generation to start the source-to-target sync from. Can we do better than 0 there?
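
One conservative reading of the open question above, sketched under assumptions: when the target reports a generation that the source's log does not contain, the source cannot verify the claim and falls back to resending from generation 0, leaving it to the per-document checks to flag any divergence. `choose_start_generation` and the list-based log layout are hypothetical; nothing like this is in the branch.

```python
def choose_start_generation(source_log, target_gen, target_trans_id):
    """Pick the generation from which the source resends its changes.

    source_log[i] is the transaction id of generation i + 1 (assumed
    layout, for illustration only).
    """
    if target_gen == 0:
        # Never synced before: send everything.
        return 0
    if target_gen > len(source_log):
        # The target remembers a future the source no longer has. Its
        # claim cannot be checked, so resend the whole history.
        return 0
    if source_log[target_gen - 1] != target_trans_id:
        # Verifiable mismatch: the two histories really have diverged.
        raise ValueError("diverged at generation %d" % target_gen)
    # The target's view is a valid prefix of our log: resume from there.
    return target_gen
```

Whether anything better than 0 is possible in the unverifiable case remains the question raised in the comment; the sketch only makes the fallback explicit.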

Samuele Pedroni (pedronis) wrote :

As discussed, _validate_source isn't triggered:

https://pastebin.canonical.com/69190/

Eric Casteleijn (thisfred) wrote :

Ah, no: the code is triggered (I just put a print in there), but all it does is short-circuit put_doc_if_newer, because what the target has is already strictly newer (this is the case where _validate_source returns 'superseded' rather than 'ok'). In the case where something is really wrong we raise an InvalidTransactionId, and that code is also exercised by the tests.

I'll try to change the relevant tests so that they detect that put_doc_if_newer is cut short early, although I'm not sure that it's important, since the shortcut should not change the outcome (and indeed doesn't seem to, as your commenting out the check shows).
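
The three outcomes described above can be sketched roughly as follows. This is a simplified illustration and not the real `_validate_source`: the function signatures, the `known` tuple, and `put_doc_if_newer_sketch` are assumptions, and the actual method may take more state into account.

```python
class InvalidTransactionId(Exception):
    """Stand-in for errors.InvalidTransactionId."""


def validate_source(known, other_gen, other_trans_id):
    """Classify an incoming document's (generation, transaction id).

    known is the (generation, transaction id) pair the target last
    recorded for the source replica (assumed shape).
    """
    known_gen, known_trans_id = known
    if other_gen > known_gen:
        # New information from the source: go ahead and apply it.
        return 'ok'
    if other_gen < known_gen:
        # Older than what was already synced: nothing to apply.
        return 'superseded'
    if other_trans_id == known_trans_id:
        # Exactly the state the target has already seen.
        return 'superseded'
    # Same generation under a different transaction id: divergence.
    raise InvalidTransactionId(other_trans_id)


def put_doc_if_newer_sketch(known, other_gen, other_trans_id, apply_doc):
    """Skip the write early when the source state is superseded."""
    if validate_source(known, other_gen, other_trans_id) == 'superseded':
        return 'superseded'
    apply_doc()
    return 'inserted'
```

In this sketch the early return is purely an optimization: a superseded document would not be applied anyway, which matches the observation that removing the check does not change the test results.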

342. By Eric Casteleijn

merged trunk, and fixed conflicts

343. By Eric Casteleijn

ignore

344. By Eric Casteleijn

fixed errors

345. By Eric Casteleijn

fixed errors

346. By Eric Casteleijn

merged lp:~pedronis/u1db/sync_integration_tests

Samuele Pedroni (pedronis) :
review: Approve
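
For readers skimming the diff that follows, the wire-level change is that the first entry of the sync request body now carries `last_known_trans_id` alongside `last_known_generation`. The sketch below builds such a body; `build_sync_request` is a hypothetical helper written for illustration, and the framing (a JSON list with one entry per `\r\n`-separated line) is inferred from the `fprintf` format string and the `prepare(...)` call in the diff.

```python
import json


def build_sync_request(last_known_generation, last_known_trans_id, entries):
    """Serialize a sync request body as a JSON list, one entry per line."""
    header = {
        "last_known_generation": last_known_generation,
        # None becomes JSON null, which the server side can map back
        # to None (compare none_or_str in http_app.py).
        "last_known_trans_id": last_known_trans_id,
    }
    parts = [json.dumps(header)]
    parts.extend(json.dumps(entry) for entry in entries)
    return "[\r\n" + ",\r\n".join(parts) + "\r\n]"
```

A first sync would pass `0` and `None`, so the header entry decodes to a generation of 0 and a null transaction id.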

Preview Diff

1=== modified file '.bzrignore'
2--- .bzrignore 2012-06-29 17:00:49 +0000
3+++ .bzrignore 2012-07-04 16:16:18 +0000
4@@ -15,3 +15,9 @@
5 src/CMakeFiles
6 src/Makefile
7 src/cmake_install.cmake
8+CMakeCache.txt
9+CMakeFiles
10+CPackConfig.cmake
11+CPackSourceConfig.cmake
12+Makefile
13+cmake_install.cmake
14
15=== modified file 'include/u1db/u1db_internal.h'
16--- include/u1db/u1db_internal.h 2012-07-03 13:50:27 +0000
17+++ include/u1db/u1db_internal.h 2012-07-04 16:16:18 +0000
18@@ -127,8 +127,8 @@
19 const char *source_replica_uid, int n_docs,
20 u1db_document **docs, int *generations,
21 const char **trans_ids, int *target_gen,
22- char **target_trans_id,
23- void *context, u1db_doc_gen_callback cb);
24+ char **target_trans_id, void *context,
25+ u1db_doc_gen_callback cb);
26 /**
27 * Create a sync_exchange state object.
28 *
29
30=== modified file 'src/u1db.c'
31--- src/u1db.c 2012-07-03 13:50:27 +0000
32+++ src/u1db.c 2012-07-04 16:16:18 +0000
33@@ -1618,7 +1618,7 @@
34 {
35 int status;
36 sqlite3_stmt *statement;
37- const char *tmp;
38+ const char *tmp = NULL;
39
40 if (db == NULL || replica_uid == NULL || generation == NULL
41 || trans_id == NULL)
42
43=== modified file 'src/u1db_http_sync_target.c'
44--- src/u1db_http_sync_target.c 2012-06-27 18:56:17 +0000
45+++ src/u1db_http_sync_target.c 2012-07-04 16:16:18 +0000
46@@ -773,7 +773,8 @@
47
48
49 static int
50-init_temp_file(char tmpname[], FILE **temp_fd, int target_gen)
51+init_temp_file(char tmpname[], FILE **temp_fd, int target_gen,
52+ char *target_trans_id)
53 {
54 int status = U1DB_OK;
55 *temp_fd = make_tempfile(tmpname);
56@@ -786,12 +787,14 @@
57 }
58 // Spool all of the documents to a temporary file, so that it we can
59 // determine Content-Length before we start uploading the data.
60- fprintf(*temp_fd, "[\r\n{\"last_known_generation\": %d}", target_gen);
61+ fprintf(
62+ *temp_fd,
63+ "[\r\n{\"last_known_generation\": %d, \"last_known_trans_id\": \"%s\"}",
64+ target_gen, target_trans_id);
65 finish:
66 return status;
67 }
68
69-
70 static int
71 finalize_and_send_temp_file(u1db_sync_target *st, FILE *temp_fd,
72 const char *source_replica_uid,
73@@ -981,7 +984,7 @@
74 if (n_docs > 0 && (docs == NULL || generations == NULL)) {
75 return U1DB_INVALID_PARAMETER;
76 }
77- status = init_temp_file(tmpname, &temp_fd, *target_gen);
78+ status = init_temp_file(tmpname, &temp_fd, *target_gen, *target_trans_id);
79 if (status != U1DB_OK) { goto finish; }
80 for (i = 0; i < n_docs; ++i) {
81 status = doc_to_tempfile(
82@@ -1054,7 +1057,7 @@
83 }
84 status = u1db_get_replica_uid(source_db, &source_replica_uid);
85 if (status != U1DB_OK) { goto finish; }
86- status = init_temp_file(tmpname, &temp_fd, *target_gen);
87+ status = init_temp_file(tmpname, &temp_fd, *target_gen, *target_trans_id);
88 if (status != U1DB_OK) { goto finish; }
89 state.num = n_doc_ids;
90 state.generations = generations;
91
92=== modified file 'src/u1db_sync_target.c'
93--- src/u1db_sync_target.c 2012-07-03 13:50:27 +0000
94+++ src/u1db_sync_target.c 2012-07-04 16:16:18 +0000
95@@ -30,11 +30,11 @@
96 const char *source_replica_uid, int source_gen, const char *trans_id);
97
98 static int st_sync_exchange(u1db_sync_target *st,
99- const char *source_replica_uid, int n_docs,
100- u1db_document **docs, int *generations,
101- const char **trans_ids, int *target_gen,
102- char **target_trans_id, void *context,
103- u1db_doc_gen_callback cb);
104+ const char *source_replica_uid, int n_docs,
105+ u1db_document **docs, int *generations,
106+ const char **trans_ids, int *target_gen,
107+ char **target_trans_id, void *context,
108+ u1db_doc_gen_callback cb);
109 static int st_sync_exchange_doc_ids(u1db_sync_target *st,
110 u1database *source_db, int n_doc_ids,
111 const char **doc_ids, int *generations,
112@@ -370,7 +370,6 @@
113 {
114 int status;
115 struct _whats_changed_doc_ids_state state = {0};
116- char *target_trans_id = NULL;
117 if (se == NULL) {
118 return U1DB_INVALID_PARAMETER;
119 }
120@@ -396,9 +395,6 @@
121 se->gen_for_doc_ids = state.gen_for_doc_ids;
122 se->trans_ids_for_doc_ids = state.trans_ids_for_doc_ids;
123 finish:
124- if (target_trans_id != NULL) {
125- free(target_trans_id);
126- }
127 return status;
128 }
129
130@@ -522,6 +518,9 @@
131 status = st->get_sync_exchange(st, source_replica_uid,
132 *target_gen, &exchange);
133 if (status != U1DB_OK) { goto finish; }
134+ status = u1db_validate_gen_and_trans_id(
135+ exchange->db, *target_gen, *target_trans_id);
136+ if (status != U1DB_OK) { goto finish; }
137 for (i = 0; i < n_docs; ++i) {
138 status = u1db__sync_exchange_insert_doc_from_source(
139 exchange, docs[i], generations[i], trans_ids[i]);
140@@ -530,14 +529,11 @@
141 status = u1db__sync_exchange_find_doc_ids_to_return(exchange);
142 if (status != U1DB_OK) { goto finish; }
143 status = u1db__sync_exchange_return_docs(exchange, context, cb);
144- if (status != U1DB_OK) { goto finish; }
145- if (status == U1DB_OK) {
146- *target_gen = exchange->target_gen;
147- *target_trans_id = exchange->target_trans_id;
148- // We set this to NULL, because the caller is now responsible for it
149- exchange->target_trans_id = NULL;
150- }
151 finish:
152+ *target_gen = exchange->target_gen;
153+ *target_trans_id = exchange->target_trans_id;
154+ // We set this to NULL, because the caller is now responsible for it
155+ exchange->target_trans_id = NULL;
156 st->finalize_sync_exchange(st, &exchange);
157 return status;
158 }
159@@ -565,6 +561,9 @@
160 status = st->get_sync_exchange(st, source_replica_uid,
161 *target_gen, &exchange);
162 if (status != U1DB_OK) { goto finish; }
163+ status = u1db_validate_gen_and_trans_id(
164+ exchange->db, *target_gen, *target_trans_id);
165+ if (status != U1DB_OK) { goto finish; }
166 if (n_doc_ids > 0) {
167 status = get_and_insert_docs(source_db, exchange,
168 n_doc_ids, doc_ids, generations, trans_ids);
169@@ -573,15 +572,11 @@
170 status = u1db__sync_exchange_find_doc_ids_to_return(exchange);
171 if (status != U1DB_OK) { goto finish; }
172 status = u1db__sync_exchange_return_docs(exchange, context, cb);
173- if (status != U1DB_OK) { goto finish; }
174+finish:
175 *target_gen = exchange->target_gen;
176- if (status == U1DB_OK) {
177- *target_gen = exchange->target_gen;
178- *target_trans_id = exchange->target_trans_id;
179- // We set this to NULL, because the caller is now responsible for it
180- exchange->target_trans_id = NULL;
181- }
182-finish:
183+ *target_trans_id = exchange->target_trans_id;
184+ // We set this to NULL, because the caller is now responsible for it
185+ exchange->target_trans_id = NULL;
186 st->finalize_sync_exchange(st, &exchange);
187 return status;
188 }
189@@ -612,11 +607,12 @@
190 status = u1db_get_replica_uid(db, &local_uid);
191 if (status != U1DB_OK) { goto finish; }
192 // fprintf(stderr, "Local uid: %s\n", local_uid);
193- status = target->get_sync_info(target, local_uid, &target_uid, &target_gen,
194- &local_gen_known_by_target, &local_trans_id_known_by_target);
195+ status = target->get_sync_info(
196+ target, local_uid, &target_uid, &target_gen,
197+ &local_gen_known_by_target, &local_trans_id_known_by_target);
198 if (status != U1DB_OK) { goto finish; }
199 status = u1db_validate_gen_and_trans_id(
200- db, local_gen_known_by_target, local_trans_id_known_by_target);
201+ db, local_gen_known_by_target, local_trans_id_known_by_target);
202 if (status != U1DB_OK) { goto finish; }
203 status = u1db__get_replica_gen_and_trans_id(db, target_uid,
204 &target_gen_known_by_local, &target_trans_id_known_by_local);
205
206=== modified file 'u1db/__init__.py'
207--- u1db/__init__.py 2012-07-03 14:35:42 +0000
208+++ u1db/__init__.py 2012-07-04 16:16:18 +0000
209@@ -565,7 +565,8 @@
210 raise NotImplementedError(self.record_sync_info)
211
212 def sync_exchange(self, docs_by_generation, source_replica_uid,
213- last_known_generation, return_doc_cb):
214+ last_known_generation, last_known_trans_id,
215+ return_doc_cb):
216 """Incorporate the documents sent from the source replica.
217
218 This is not meant to be called by client code directly, but is used as
219@@ -589,7 +590,9 @@
220 id of their latest change.
221 :param source_replica_uid: The source replica's identifier
222 :param last_known_generation: The last generation that the source
223- replica knows about this
224+ replica knows about this target replica
225+ :param last_known_trans_id: The last transaction id that the source
226+ replica knows about this target replica
227 :param: return_doc_cb(doc, gen): is a callback
228 used to return documents to the source replica, it will
229 be invoked in turn with Documents that have changed since
230
231=== modified file 'u1db/backends/inmemory.py'
232--- u1db/backends/inmemory.py 2012-07-03 13:50:27 +0000
233+++ u1db/backends/inmemory.py 2012-07-04 16:16:18 +0000
234@@ -46,6 +46,10 @@
235 self._last_exchange_log = None
236 self._factory = document_factory or Document
237
238+ def _set_replica_uid(self, replica_uid):
239+ """Force the replica_uid to be set."""
240+ self._replica_uid = replica_uid
241+
242 def set_document_factory(self, factory):
243 self._factory = factory
244
245
246=== modified file 'u1db/errors.py'
247--- u1db/errors.py 2012-06-27 18:56:17 +0000
248+++ u1db/errors.py 2012-07-04 16:16:18 +0000
249@@ -46,10 +46,14 @@
250 class InvalidTransactionId(U1DBError):
251 """Invalid transaction for generation."""
252
253+ wire_description = "invalid transaction id"
254+
255
256 class InvalidGeneration(U1DBError):
257 """Generation was previously synced with a different transaction id."""
258
259+ wire_description = "invalid generation"
260+
261
262 class ConflictedDoc(U1DBError):
263 """The document is conflicted, you must call resolve before put()"""
264
265=== modified file 'u1db/remote/http_app.py'
266--- u1db/remote/http_app.py 2012-06-27 18:35:52 +0000
267+++ u1db/remote/http_app.py 2012-07-04 16:16:18 +0000
268@@ -45,6 +45,12 @@
269 return False
270
271
272+def none_or_str(expression):
273+ if expression is None:
274+ return None
275+ return str(expression)
276+
277+
278 class BadRequest(Exception):
279 """Bad request."""
280
281@@ -316,12 +322,13 @@
282
283 # Implements the same logic as LocalSyncTarget.sync_exchange
284
285- @http_method(last_known_generation=int,
286+ @http_method(last_known_generation=int, last_known_trans_id=none_or_str,
287 content_as_args=True)
288- def post_args(self, last_known_generation):
289- self.sync_exch = self.sync_exchange_class(self.db,
290- self.source_replica_uid,
291- last_known_generation)
292+ def post_args(self, last_known_generation, last_known_trans_id=None):
293+ self.db.validate_gen_and_trans_id(
294+ last_known_generation, last_known_trans_id)
295+ self.sync_exch = self.sync_exchange_class(
296+ self.db, self.source_replica_uid, last_known_generation)
297
298 @http_method(content_as_args=True)
299 def post_stream_entry(self, id, rev, content, gen, trans_id):
300@@ -329,17 +336,19 @@
301 self.sync_exch.insert_doc_from_source(doc, gen, trans_id)
302
303 def post_end(self):
304+
305 def send_doc(doc, gen, trans_id):
306 entry = dict(id=doc.doc_id, rev=doc.rev, content=doc.get_json(),
307 gen=gen, trans_id=trans_id)
308 self.responder.stream_entry(entry)
309+
310 new_gen = self.sync_exch.find_changes_to_return()
311 self.responder.content_type = 'application/x-u1db-sync-stream'
312 self.responder.start_response(200)
313 self.responder.start_stream(),
314 self.responder.stream_entry({"new_generation": new_gen,
315 "new_transaction_id": self.sync_exch.new_trans_id})
316- new_gen = self.sync_exch.return_docs(send_doc)
317+ self.sync_exch.return_docs(send_doc)
318 self.responder.end_stream()
319 self.responder.finish_response()
320
321
322=== modified file 'u1db/remote/http_errors.py'
323--- u1db/remote/http_errors.py 2012-04-24 13:40:36 +0000
324+++ u1db/remote/http_errors.py 2012-07-04 16:16:18 +0000
325@@ -29,6 +29,8 @@
326 (errors.DocumentDoesNotExist.wire_description, 404),
327 (errors.DocumentAlreadyDeleted.wire_description, 404),
328 (errors.RevisionConflict.wire_description, 409),
329+ (errors.InvalidGeneration.wire_description, 409),
330+ (errors.InvalidTransactionId.wire_description, 409),
331 (errors.Unavailable.wire_description, 503),
332 # without matching exception
333 (errors.DOCUMENT_DELETED, 404)
334
335=== modified file 'u1db/remote/http_target.py'
336--- u1db/remote/http_target.py 2012-06-14 23:28:58 +0000
337+++ u1db/remote/http_target.py 2012-07-04 16:16:18 +0000
338@@ -80,7 +80,8 @@
339 return res
340
341 def sync_exchange(self, docs_by_generations, source_replica_uid,
342- last_known_generation, return_doc_cb):
343+ last_known_generation, last_known_trans_id,
344+ return_doc_cb):
345 self._ensure_connection()
346 url = '%s/sync-from/%s' % (self._url.path, source_replica_uid)
347 self._conn.putrequest('POST', url)
348@@ -96,7 +97,9 @@
349 return len(entry)
350
351 comma = ''
352- size += prepare(last_known_generation=last_known_generation)
353+ size += prepare(
354+ last_known_generation=last_known_generation,
355+ last_known_trans_id=last_known_trans_id)
356 comma = ','
357 for doc, gen, trans_id in docs_by_generations:
358 size += prepare(id=doc.doc_id, rev=doc.rev, content=doc.get_json(),
359
360=== modified file 'u1db/sync.py'
361--- u1db/sync.py 2012-07-03 13:50:27 +0000
362+++ u1db/sync.py 2012-07-04 16:16:18 +0000
363@@ -97,9 +97,11 @@
364 (self.target_replica_uid, target_gen, target_my_gen,
365 target_my_trans_id) = sync_target.get_sync_info(
366 self.source._replica_uid)
367- # what's changed since that generation and this current gen
368+ # validate that the generation and transaction id the target knows
369+ # about us are valid.
370 self.source.validate_gen_and_trans_id(
371 target_my_gen, target_my_trans_id)
372+ # what's changed since that generation and this current gen
373 my_gen, _, changes = self.source.whats_changed(target_my_gen)
374
375 # this source last-seen database generation for the target
376@@ -119,9 +121,10 @@
377
378 # exchange documents and try to insert the returned ones with
379 # the target, return target synced-up-to gen
380- new_gen, new_trans_id = sync_target.sync_exchange(docs_by_generation,
381- self.source._replica_uid, target_last_known_gen,
382- return_doc_cb=self._insert_doc_from_target)
383+ new_gen, new_trans_id = sync_target.sync_exchange(
384+ docs_by_generation, self.source._replica_uid,
385+ target_last_known_gen, target_trans_id,
386+ self._insert_doc_from_target)
387 # record target synced-up-to generation including applying what we sent
388 self.source._set_replica_gen_and_trans_id(
389 self.target_replica_uid, new_gen, new_trans_id)
390@@ -262,9 +265,12 @@
391 self._trace_hook = None
392
393 def sync_exchange(self, docs_by_generations, source_replica_uid,
394- last_known_generation, return_doc_cb):
395- sync_exch = SyncExchange(self._db, source_replica_uid,
396- last_known_generation)
397+ last_known_generation, last_known_trans_id,
398+ return_doc_cb):
399+ self._db.validate_gen_and_trans_id(
400+ last_known_generation, last_known_trans_id)
401+ sync_exch = SyncExchange(
402+ self._db, source_replica_uid, last_known_generation)
403 if self._trace_hook:
404 sync_exch._set_trace_hook(self._trace_hook)
405 # 1st step: try to insert incoming docs and record progress
406
407=== modified file 'u1db/tests/c_backend_wrapper.pyx'
408--- u1db/tests/c_backend_wrapper.pyx 2012-07-03 13:50:27 +0000
409+++ u1db/tests/c_backend_wrapper.pyx 2012-07-04 16:16:18 +0000
410@@ -184,9 +184,9 @@
411 int (*sync_exchange)(u1db_sync_target *st,
412 char *source_replica_uid, int n_docs,
413 u1db_document **docs, int *generations,
414- const_char_ptr *trans_ids, int *target_gen,
415- char **target_trans_id, void *context,
416- u1db_doc_gen_callback cb) nogil
417+ const_char_ptr *trans_ids,
418+ int *target_gen, char **target_trans_id,
419+ void *context, u1db_doc_gen_callback cb) nogil
420 int (*sync_exchange_doc_ids)(u1db_sync_target *st,
421 u1database *source_db, int n_doc_ids,
422 const_char_ptr *doc_ids, int *generations,
423@@ -713,6 +713,7 @@
424
425 def record_sync_info(self, source_replica_uid, source_gen, source_trans_id):
426 cdef int status
427+
428 self._check()
429 assert self._st.record_sync_info != NULL, "record_sync_info is NULL?"
430 with nogil:
431@@ -725,12 +726,13 @@
432 return CSyncExchange(self, source_replica_uid, source_gen)
433
434 def sync_exchange_doc_ids(self, source_db, doc_id_generations,
435- last_known_generation, return_doc_cb):
436+ last_known_generation, last_known_trans_id,
437+ return_doc_cb):
438 cdef const_char_ptr *doc_ids
439 cdef int *generations
440 cdef int num_doc_ids
441 cdef int target_gen
442- cdef char *target_trans_id
443+ cdef char *target_trans_id = NULL
444 cdef int status
445 cdef CDatabase sdb
446
447@@ -755,31 +757,35 @@
448 generations[i] = gen
449 trans_ids[i] = trans_id
450 target_gen = last_known_generation
451+ if last_known_trans_id is not None:
452+ target_trans_id = last_known_trans_id
453 with nogil:
454 status = self._st.sync_exchange_doc_ids(self._st, sdb._db,
455 num_doc_ids, doc_ids, generations, trans_ids,
456 &target_gen, &target_trans_id,
457 <void*>return_doc_cb, return_doc_cb_wrapper)
458 handle_status("sync_exchange_doc_ids", status)
459+ if target_trans_id != NULL:
460+ res_trans_id = target_trans_id
461 finally:
462+ if target_trans_id != NULL:
463+ free(target_trans_id)
464 if doc_ids != NULL:
465 free(<void *>doc_ids)
466 if generations != NULL:
467 free(generations)
468 if trans_ids != NULL:
469 free(trans_ids)
470- if target_trans_id != NULL:
471- res_trans_id = target_trans_id
472- free(target_trans_id)
473 return target_gen, res_trans_id
474
475 def sync_exchange(self, docs_by_generations, source_replica_uid,
476- last_known_generation, return_doc_cb):
477+ last_known_generation, last_known_trans_id,
478+ return_doc_cb):
479 cdef CDocument cur_doc
480 cdef u1db_document **docs = NULL
481 cdef int *generations = NULL
482 cdef const_char_ptr *trans_ids = NULL
483- cdef char *trans_id = NULL
484+ cdef char *target_trans_id = NULL
485 cdef int i, count, status, target_gen
486
487 self._check()
488@@ -802,11 +808,13 @@
489 trans_ids[i] = docs_by_generations[i][2]
490 docs[i] = cur_doc._doc
491 target_gen = last_known_generation
492+ if last_known_trans_id is not None:
493+ target_trans_id = last_known_trans_id
494 with nogil:
495- status = self._st.sync_exchange(self._st,
496- source_replica_uid, count,
497- docs, generations, trans_ids, &target_gen, &trans_id,
498- <void *>return_doc_cb, return_doc_cb_wrapper)
499+ status = self._st.sync_exchange(
500+ self._st, source_replica_uid, count, docs, generations,
501+ trans_ids, &target_gen, &target_trans_id,
502+ <void *>return_doc_cb, return_doc_cb_wrapper)
503 handle_status("sync_exchange", status)
504 finally:
505 if docs != NULL:
506@@ -815,9 +823,9 @@
507 free(generations)
508 if trans_ids != NULL:
509 free(trans_ids)
510- if trans_id != NULL:
511- res_trans_id = trans_id
512- free(trans_id)
513+ if target_trans_id != NULL:
514+ res_trans_id = target_trans_id
515+ free(target_trans_id)
516 return target_gen, res_trans_id
517
518 def _set_trace_hook(self, cb):
519
520=== modified file 'u1db/tests/test_backends.py'
521--- u1db/tests/test_backends.py 2012-07-03 22:24:43 +0000
522+++ u1db/tests/test_backends.py 2012-07-04 16:16:18 +0000
523@@ -1556,7 +1556,7 @@
524 docs_by_gen = [(doc_other, 10, 'T-sid')]
525 st.sync_exchange(
526 docs_by_gen, 'other-replica', last_known_generation=0,
527- return_doc_cb=ignore)
528+ last_known_trans_id=None, return_doc_cb=ignore)
529 self.assertGetDoc(self.db, doc.doc_id, other_rev, new_content, False)
530 self.assertEqual(
531 [doc_other], self.db.get_from_index('test-idx', 'altval'))
532
533=== modified file 'u1db/tests/test_c_backend.py'
534--- u1db/tests/test_c_backend.py 2012-07-03 13:50:27 +0000
535+++ u1db/tests/test_c_backend.py 2012-07-04 16:16:18 +0000
536@@ -359,7 +359,7 @@
537 returned.append((doc, gen, trans_id))
538
539 val = self.st.sync_exchange_doc_ids(
540- db2, [(doc2.doc_id, 1, 'T-sid')], 0, return_doc_cb)
541+ db2, [(doc2.doc_id, 1, 'T-sid')], 0, None, return_doc_cb)
542 last_trans_id = self.db._get_transaction_log()[-1][1]
543 self.assertEqual(2, self.db._get_generation())
544 self.assertEqual((2, last_trans_id), val)
545
546=== modified file 'u1db/tests/test_http_app.py'
547--- u1db/tests/test_http_app.py 2012-07-03 13:50:27 +0000
548+++ u1db/tests/test_http_app.py 2012-07-04 16:16:18 +0000
549@@ -1016,5 +1016,6 @@
550 sync_exchange_class = MySyncExchange
551
552 sync_res = MySyncResource('foo', 'src', self.state, None)
553- sync_res.post_args({'last_known_generation': 0}, '{}')
554+ sync_res.post_args(
555+ {'last_known_generation': 0, 'last_known_trans_id': None}, '{}')
556 self.assertIsInstance(sync_res.sync_exch, MySyncExchange)
557
558=== modified file 'u1db/tests/test_remote_sync_target.py'
559--- u1db/tests/test_remote_sync_target.py 2012-07-03 13:50:27 +0000
560+++ u1db/tests/test_remote_sync_target.py 2012-07-04 16:16:18 +0000
561@@ -195,9 +195,8 @@
562
563 doc = self.make_document('doc-here', 'replica:1', '{"value": "here"}')
564 new_gen, trans_id = remote_target.sync_exchange(
565- [(doc, 10, 'T-sid')],
566- 'replica', last_known_generation=0,
567- return_doc_cb=receive_doc)
568+ [(doc, 10, 'T-sid')], 'replica', last_known_generation=0,
569+ last_known_trans_id=None, return_doc_cb=receive_doc)
570 self.assertEqual(1, new_gen)
571 self.assertGetDoc(
572 db, 'doc-here', 'replica:1', '{"value": "here"}', False)
573@@ -233,11 +232,12 @@
574 doc1 = self.make_document('doc-here', 'replica:1', '{"value": "here"}')
575 doc2 = self.make_document('doc-here2', 'replica:1',
576 '{"value": "here2"}')
577- self.assertRaises(errors.HTTPError, remote_target.sync_exchange,
578- [(doc1, 10, 'T-sid'),
579- (doc2, 11, 'T-sud')
580- ], 'replica', last_known_generation=0,
581- return_doc_cb=receive_doc)
582+ self.assertRaises(
583+ errors.HTTPError,
584+ remote_target.sync_exchange,
585+ [(doc1, 10, 'T-sid'), (doc2, 11, 'T-sud')],
586+ 'replica', last_known_generation=0, last_known_trans_id=None,
587+ return_doc_cb=receive_doc)
588 self.assertGetDoc(db, 'doc-here', 'replica:1', '{"value": "here"}',
589 False)
590 self.assertEqual(
591@@ -246,9 +246,8 @@
592 # retry
593 trigger_ids = []
594 new_gen, trans_id = remote_target.sync_exchange(
595- [(doc2, 11, 'T-sud')
596- ], 'replica', last_known_generation=0,
597- return_doc_cb=receive_doc)
598+ [(doc2, 11, 'T-sud')], 'replica', last_known_generation=0,
599+ last_known_trans_id=None, return_doc_cb=receive_doc)
600 self.assertGetDoc(db, 'doc-here2', 'replica:1', '{"value": "here2"}',
601 False)
602 self.assertEqual(
603@@ -284,9 +283,10 @@
604 other_changes.append(
605 (doc.doc_id, doc.rev, doc.get_json(), gen, trans_id))
606
607- self.assertRaises(errors.Unavailable, remote_target.sync_exchange,
608- [], 'replica', last_known_generation=0,
609- return_doc_cb=receive_doc)
610+ self.assertRaises(
611+ errors.Unavailable, remote_target.sync_exchange, [], 'replica',
612+ last_known_generation=0, last_known_trans_id=None,
613+ return_doc_cb=receive_doc)
614 self.assertEqual(
615 (doc.doc_id, doc.rev, '{"value": "there"}', 1),
616 other_changes[0][:-1])
617@@ -303,8 +303,8 @@
618 (doc.doc_id, doc.rev, doc.get_json(), gen, trans_id))
619
620 new_gen, trans_id = remote_target.sync_exchange(
621- [], 'replica', last_known_generation=0,
622- return_doc_cb=receive_doc)
623+ [], 'replica', last_known_generation=0, last_known_trans_id=None,
624+ return_doc_cb=receive_doc)
625 self.assertEqual(1, new_gen)
626 self.assertEqual(
627 (doc.doc_id, doc.rev, '{"value": "there"}', 1),
628
629=== modified file 'u1db/tests/test_sync.py'
630--- u1db/tests/test_sync.py 2012-07-03 18:01:54 +0000
631+++ u1db/tests/test_sync.py 2012-07-04 16:16:18 +0000
632@@ -157,11 +157,12 @@
633 self.st.get_sync_info('replica'))
634
635 def test_sync_exchange(self):
636- doc = self.make_document('doc-id', 'replica:1', simple_doc)
637- docs_by_gen = [(doc, 10, 'T-sid')]
638- new_gen, trans_id = self.st.sync_exchange(docs_by_gen, 'replica',
639- last_known_generation=0,
640- return_doc_cb=self.receive_doc)
641+ docs_by_gen = [
642+ (self.make_document('doc-id', 'replica:1', simple_doc), 10,
643+ 'T-sid')]
644+ new_gen, trans_id = self.st.sync_exchange(
645+ docs_by_gen, 'replica', last_known_generation=0,
646+ last_known_trans_id=None, return_doc_cb=self.receive_doc)
647 self.assertGetDoc(self.db, 'doc-id', 'replica:1', simple_doc, False)
648 self.assertTransactionLog(['doc-id'], self.db)
649 last_trans_id = self.getLastTransId(self.db)
650@@ -174,9 +175,9 @@
651 edit_rev = 'replica:1|' + doc.rev
652 docs_by_gen = [
653 (self.make_document(doc.doc_id, edit_rev, None), 10, 'T-sid')]
654- new_gen, trans_id = self.st.sync_exchange(docs_by_gen, 'replica',
655- last_known_generation=0,
656- return_doc_cb=self.receive_doc)
657+ new_gen, trans_id = self.st.sync_exchange(
658+ docs_by_gen, 'replica', last_known_generation=0,
659+ last_known_trans_id=None, return_doc_cb=self.receive_doc)
660 self.assertGetDocIncludeDeleted(
661 self.db, doc.doc_id, edit_rev, None, False)
662 self.assertTransactionLog([doc.doc_id, doc.doc_id], self.db)
663@@ -190,9 +191,9 @@
664 (self.make_document('doc-id', 'replica:1', simple_doc), 10, 'T-1'),
665 (self.make_document('doc-id2', 'replica:1', nested_doc), 11,
666 'T-2')]
667- new_gen, trans_id = self.st.sync_exchange(docs_by_gen, 'replica',
668- last_known_generation=0,
669- return_doc_cb=self.receive_doc)
670+ new_gen, trans_id = self.st.sync_exchange(
671+ docs_by_gen, 'replica', last_known_generation=0,
672+ last_known_trans_id=None, return_doc_cb=self.receive_doc)
673 self.assertGetDoc(self.db, 'doc-id', 'replica:1', simple_doc, False)
674 self.assertGetDoc(self.db, 'doc-id2', 'replica:1', nested_doc, False)
675 self.assertTransactionLog(['doc-id', 'doc-id2'], self.db)
676@@ -210,7 +211,7 @@
677 'T-sid')]
678 new_gen, _ = self.st.sync_exchange(
679 docs_by_gen, 'replica', last_known_generation=0,
680- return_doc_cb=self.receive_doc)
681+ last_known_trans_id=None, return_doc_cb=self.receive_doc)
682 self.assertTransactionLog([doc.doc_id], self.db)
683 self.assertEqual(
684 (doc.doc_id, doc.rev, simple_doc, 1), self.other_changes[0][:-1])
685@@ -222,20 +223,21 @@
686 def test_sync_exchange_ignores_convergence(self):
687 doc = self.db.create_doc(simple_doc)
688 self.assertTransactionLog([doc.doc_id], self.db)
689+ gen, txid = self.db._get_generation_info()
690 docs_by_gen = [
691 (self.make_document(doc.doc_id, doc.rev, simple_doc), 10, 'T-sid')]
692- new_gen, _ = self.st.sync_exchange(docs_by_gen, 'replica',
693- last_known_generation=1,
694- return_doc_cb=self.receive_doc)
695+ new_gen, _ = self.st.sync_exchange(
696+ docs_by_gen, 'replica', last_known_generation=gen,
697+ last_known_trans_id=txid, return_doc_cb=self.receive_doc)
698 self.assertTransactionLog([doc.doc_id], self.db)
699 self.assertEqual(([], 1), (self.other_changes, new_gen))
700
701 def test_sync_exchange_returns_new_docs(self):
702 doc = self.db.create_doc(simple_doc)
703 self.assertTransactionLog([doc.doc_id], self.db)
704- new_gen, _ = self.st.sync_exchange([], 'other-replica',
705- last_known_generation=0,
706- return_doc_cb=self.receive_doc)
707+ new_gen, _ = self.st.sync_exchange(
708+ [], 'other-replica', last_known_generation=0,
709+ last_known_trans_id=None, return_doc_cb=self.receive_doc)
710 self.assertTransactionLog([doc.doc_id], self.db)
711 self.assertEqual(
712 (doc.doc_id, doc.rev, simple_doc, 1), self.other_changes[0][:-1])
713@@ -248,9 +250,9 @@
714 doc = self.db.create_doc(simple_doc)
715 self.db.delete_doc(doc)
716 self.assertTransactionLog([doc.doc_id, doc.doc_id], self.db)
717- new_gen, _ = self.st.sync_exchange([], 'other-replica',
718- last_known_generation=0,
719- return_doc_cb=self.receive_doc)
720+ new_gen, _ = self.st.sync_exchange(
721+ [], 'other-replica', last_known_generation=0,
722+ last_known_trans_id=None, return_doc_cb=self.receive_doc)
723 self.assertTransactionLog([doc.doc_id, doc.doc_id], self.db)
724 self.assertEqual(
725 (doc.doc_id, doc.rev, None, 2), self.other_changes[0][:-1])
726@@ -263,9 +265,9 @@
727 doc = self.db.create_doc(simple_doc)
728 doc2 = self.db.create_doc(nested_doc)
729 self.assertTransactionLog([doc.doc_id, doc2.doc_id], self.db)
730- new_gen, _ = self.st.sync_exchange([], 'other-replica',
731- last_known_generation=0,
732- return_doc_cb=self.receive_doc)
733+ new_gen, _ = self.st.sync_exchange(
734+ [], 'other-replica', last_known_generation=0,
735+ last_known_trans_id=None, return_doc_cb=self.receive_doc)
736 self.assertTransactionLog([doc.doc_id, doc2.doc_id], self.db)
737 self.assertEqual(2, new_gen)
738 self.assertEqual(
739@@ -285,9 +287,9 @@
740 docs_by_gen = [
741 (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10,
742 'T-sid')]
743- new_gen, _ = self.st.sync_exchange(docs_by_gen, 'other-replica',
744- last_known_generation=0,
745- return_doc_cb=self.receive_doc)
746+ new_gen, _ = self.st.sync_exchange(
747+ docs_by_gen, 'other-replica', last_known_generation=0,
748+ last_known_trans_id=None, return_doc_cb=self.receive_doc)
749 self.assertTransactionLog([doc.doc_id, doc.doc_id], self.db)
750 self.assertEqual(([], 2), (self.other_changes, new_gen))
751
752@@ -311,15 +313,17 @@
753 'T-sid')]
754 new_gen, _ = self.st.sync_exchange(
755 docs_by_gen, 'other-replica', last_known_generation=0,
756- return_doc_cb=self.receive_doc)
757+ last_known_trans_id=None, return_doc_cb=self.receive_doc)
758 self.assertEqual(expected, [c[:-1] for c in self.other_changes])
759 self.assertEqual(3, new_gen)
760
761 def test_sync_exchange_with_concurrent_updates(self):
762+
763 def after_whatschanged_cb(state):
764 if state != 'after whats_changed':
765 return
766 self.db.create_doc('{"new": "doc"}')
767+
768 self.set_trace_hook(after_whatschanged_cb)
769 doc = self.db.create_doc(simple_doc)
770 self.assertTransactionLog([doc.doc_id], self.db)
771@@ -327,9 +331,9 @@
772 docs_by_gen = [
773 (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10,
774 'T-sid')]
775- new_gen, _ = self.st.sync_exchange(docs_by_gen, 'other-replica',
776- last_known_generation=0,
777- return_doc_cb=self.receive_doc)
778+ new_gen, _ = self.st.sync_exchange(
779+ docs_by_gen, 'other-replica', last_known_generation=0,
780+ last_known_trans_id=None, return_doc_cb=self.receive_doc)
781 self.assertEqual(([], 2), (self.other_changes, new_gen))
782
783 def test_sync_exchange_converged_handling(self):
784@@ -338,9 +342,9 @@
785 (self.make_document('new', 'other:1', '{}'), 4, 'T-foo'),
786 (self.make_document(doc.doc_id, doc.rev, doc.get_json()), 5,
787 'T-bar')]
788- new_gen, _ = self.st.sync_exchange(docs_by_gen, 'other-replica',
789- last_known_generation=0,
790- return_doc_cb=self.receive_doc)
791+ new_gen, _ = self.st.sync_exchange(
792+ docs_by_gen, 'other-replica', last_known_generation=0,
793+ last_known_trans_id=None, return_doc_cb=self.receive_doc)
794 self.assertEqual(([], 2), (self.other_changes, new_gen))
795
796 def test_sync_exchange_detect_incomplete_exchange(self):
797@@ -354,10 +358,11 @@
798 'log_exception', lambda h, exc_info: None)
799 doc = self.db.create_doc(simple_doc)
800 self.assertTransactionLog([doc.doc_id], self.db)
801- self.assertRaises((errors.U1DBError, errors.BrokenSyncStream),
802- self.st.sync_exchange, [], 'other-replica',
803- last_known_generation=0,
804- return_doc_cb=self.receive_doc)
805+ self.assertRaises(
806+ (errors.U1DBError, errors.BrokenSyncStream),
807+ self.st.sync_exchange, [], 'other-replica',
808+ last_known_generation=0, last_known_trans_id=None,
809+ return_doc_cb=self.receive_doc)
810
811 def test_sync_exchange_doc_ids(self):
812 sync_exchange_doc_ids = getattr(self.st, 'sync_exchange_doc_ids', None)
813@@ -366,7 +371,7 @@
814 db2 = self.create_database('test2')
815 doc = db2.create_doc(simple_doc)
816 new_gen, trans_id = sync_exchange_doc_ids(
817- db2, [(doc.doc_id, 10, 'T-sid')], 0,
818+ db2, [(doc.doc_id, 10, 'T-sid')], 0, None,
819 return_doc_cb=self.receive_doc)
820 self.assertGetDoc(self.db, doc.doc_id, doc.rev, simple_doc, False)
821 self.assertTransactionLog([doc.doc_id], self.db)
822@@ -382,7 +387,7 @@
823 called.append(state)
824
825 self.set_trace_hook(cb)
826- self.st.sync_exchange([], 'replica', 0, self.receive_doc)
827+ self.st.sync_exchange([], 'replica', 0, None, self.receive_doc)
828 self.st.record_sync_info('replica', 0, 'T-sid')
829 self.assertEqual(['before whats_changed',
830 'after whats_changed',
831@@ -410,13 +415,19 @@
832 def make_database_for_http_test(test, replica_uid):
833 if test.server is None:
834 test.startServer()
835- return test.request_state._create_database(replica_uid)
836+ db = test.request_state._create_database(replica_uid)
837+ try:
838+ http_at = test._http_at
839+ except AttributeError:
840+ http_at = test._http_at = {}
841+ http_at[db] = replica_uid
842+ return db
843
844
845 def sync_via_synchronizer_and_http(test, db_source, db_target, trace_hook=None):
846 if trace_hook:
847 test.skipTest("trace_hook unsupported over http")
848- path = db_target._replica_uid
849+ path = test._http_at[db_target]
850 target = http_target.HTTPSyncTarget.connect(test.getURL(path))
851 return sync.Synchronizer(db_source, target).sync()
852
853@@ -937,6 +948,46 @@
854
855 self.sync(self.db1, self.db2, trace_hook=put_hook)
856
857+ def test_sync_detects_rollback_in_source(self):
858+ self.db1.create_doc(tests.simple_doc, doc_id="divergent")
859+ self.sync(self.db1, self.db2)
860+ # make db2 think it's synced with a much later version of db1
861+ self.db2._set_replica_gen_and_trans_id(
862+ self.db1._replica_uid, 28, 'T-madeup')
863+ self.assertRaises(
864+ errors.InvalidGeneration, self.sync, self.db1, self.db2)
865+
866+ def test_sync_detects_rollback_in_target(self):
867+ self.db1.create_doc(tests.simple_doc, doc_id="divergent")
868+ self.sync(self.db1, self.db2)
869+ # make db1 think it's synced with a much later version of db2
870+ self.db1._set_replica_gen_and_trans_id(
871+ self.db2._replica_uid, 28, 'T-madeup')
872+ self.assertRaises(
873+ errors.InvalidGeneration, self.sync, self.db1, self.db2)
874+
875+ def test_sync_detects_diverged_source(self):
876+ self.db1.create_doc(tests.simple_doc, doc_id="divergent")
877+ self.sync(self.db1, self.db2)
878+ # create a new db with the same replica as the source, and add
879+ # a document, so it will have a different txid for the same generation.
880+ db3 = self.create_database('test3')
881+ db3.create_doc(tests.nested_doc, doc_id="divergent")
882+ db3._set_replica_uid(self.db1._replica_uid)
883+ self.assertRaises(
884+ errors.InvalidTransactionId, self.sync, db3, self.db2)
885+
886+ def test_sync_detects_diverged_target(self):
887+ self.db1.create_doc(tests.simple_doc, doc_id="divergent")
888+ self.sync(self.db1, self.db2)
889+ # create a new db with the same replica as the target, and add
890+ # a document, so it will have a different txid for the same generation.
891+ db3 = self.create_database('test3')
892+ db3.create_doc(tests.nested_doc, doc_id="divergent")
893+ db3._set_replica_uid(self.db2._replica_uid)
894+ self.assertRaises(
895+ errors.InvalidTransactionId, self.sync, self.db1, db3)
896+
897
898 class TestDbSync(tests.TestCaseWithServer):
899 """Test db.sync remote sync shortcut"""
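
The four new tests at the end of the diff exercise two failure modes: a peer that claims a generation the other side has never reached (errors.InvalidGeneration), and a peer whose transaction id for an already-known generation differs (errors.InvalidTransactionId). Below is a minimal sketch of that kind of check, assuming the transaction log can be modelled as a plain list of transaction ids where generation n sits at index n - 1. The function name `check_known_state` and the stand-in exception classes are hypothetical illustrations, not code taken from this branch.

```python
class InvalidGeneration(Exception):
    """Stand-in for errors.InvalidGeneration (assumption, not the real class)."""


class InvalidTransactionId(Exception):
    """Stand-in for errors.InvalidTransactionId (assumption, not the real class)."""


def check_known_state(transaction_log, known_gen, known_trans_id):
    """Compare what a peer believes about us with our own transaction log.

    transaction_log: list of transaction ids; generation n is at index n - 1.
    known_gen / known_trans_id: the last state the peer recorded for us.
    """
    if known_gen == 0:
        # First sync: the peer knows nothing yet, so there is nothing to compare.
        return
    if known_gen > len(transaction_log):
        # The peer has seen a later generation than we have: we were rolled back.
        raise InvalidGeneration(
            "peer knows generation %d, log ends at %d"
            % (known_gen, len(transaction_log)))
    if transaction_log[known_gen - 1] != known_trans_id:
        # Same generation, different transaction id: the histories diverged.
        raise InvalidTransactionId(
            "generation %d has a different transaction id" % known_gen)
```

With a two-entry log, `check_known_state(log, 28, 'T-madeup')` corresponds to the rollback tests, and passing a mismatched id for generation 1 corresponds to the diverged-replica tests.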
