Merge lp:~thisfred/u1db/txid_in_sync_exchange into lp:u1db

Proposed by Eric Casteleijn
Status: Superseded
Proposed branch: lp:~thisfred/u1db/txid_in_sync_exchange
Merge into: lp:u1db
Prerequisite: lp:~thisfred/u1db/check-replica-trans-id
Diff against target: 1339 lines (+305/-182)
14 files modified
include/u1db/u1db.h (+2/-1)
include/u1db/u1db_internal.h (+10/-5)
src/u1db_http_sync_target.c (+33/-20)
src/u1db_sync_target.c (+60/-33)
u1db/__init__.py (+4/-4)
u1db/remote/http_app.py (+4/-4)
u1db/remote/http_target.py (+3/-3)
u1db/sync.py (+21/-16)
u1db/tests/c_backend_wrapper.pyx (+44/-20)
u1db/tests/test_backends.py (+1/-1)
u1db/tests/test_c_backend.py (+19/-14)
u1db/tests/test_http_app.py (+14/-8)
u1db/tests/test_remote_sync_target.py (+44/-22)
u1db/tests/test_sync.py (+46/-31)
To merge this branch: bzr merge lp:~thisfred/u1db/txid_in_sync_exchange
Reviewer Review Type Date Requested Status
Samuele Pedroni Approve
Review via email: mp+110625@code.launchpad.net

This proposal supersedes a proposal from 2012-06-14.

This proposal has been superseded by a proposal from 2012-06-19.

Commit message

Added the transaction ids to the sync_exchange

Description of the change

Added the transaction ids to the sync_exchange

To post a comment you must log in.
Revision history for this message
Eric Casteleijn (thisfred) wrote : Posted in a previous version of this proposal

Since we get a lot of uuid txids, I strip them in a lot of the tests. This means the tests will still fail if the txids are not there, but perhaps not in the most obvious way, and we make no assumptions about the contents of the txids, but I think that's probably a good thing.

Also I seem to have misplaced some 3.5K of memory, which I will hunt down and fix tomorrow.

Revision history for this message
Eric Casteleijn (thisfred) wrote : Posted in a previous version of this proposal

Memory issue fixed.

Revision history for this message
Samuele Pedroni (pedronis) wrote :

looks good,

494 + :param docs_by_generation: A list of [(Document, generation,
495 + transaction_id)] pairs indicating documents which should be updated
496 + on this replica paired with the generation of their latest change.

doesn't mention the trans_id

review: Approve
Revision history for this message
Ubuntu One Auto Pilot (otto-pilot) wrote :
332. By Eric Casteleijn

merged trunk

333. By Eric Casteleijn

merged trunk

Unmerged revisions

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'include/u1db/u1db.h'
2--- include/u1db/u1db.h 2012-06-19 17:44:24 +0000
3+++ include/u1db/u1db.h 2012-06-19 18:44:19 +0000
4@@ -42,7 +42,8 @@
5 typedef int (*u1db_doc_callback)(void *context, u1db_document *doc);
6 typedef int (*u1db_key_callback)(void *context, int num_fields,
7 const char **key);
8-typedef int (*u1db_doc_gen_callback)(void *context, u1db_document *doc, int gen);
9+typedef int (*u1db_doc_gen_callback)(void *context, u1db_document *doc,
10+ int gen, const char *trans_id);
11 typedef int (*u1db_doc_id_gen_callback)(void *context, const char *doc_id, int gen);
12 typedef int (*u1db_trans_info_callback)(void *context, const char *doc_id,
13 int gen, const char *trans_id);
14
15=== modified file 'include/u1db/u1db_internal.h'
16--- include/u1db/u1db_internal.h 2012-06-19 17:55:20 +0000
17+++ include/u1db/u1db_internal.h 2012-06-19 18:44:19 +0000
18@@ -99,6 +99,8 @@
19 * want to send to the sync target
20 * @param generations An array of generations. Each generation
21 * corresponds to a doc_id.
22+ * @param trans_ids An array of transaction ids. Each transaction id
23+ * corresponds to a doc_id.
24 * @param target_gen (IN/OUT) Seed this with the generation of the
25 * target that source_db has last seen, it will then
26 * be filled with the final generation of the target
27@@ -115,7 +117,7 @@
28 */
29 int (*sync_exchange_doc_ids)(u1db_sync_target *st, u1database *source_db,
30 int n_doc_ids, const char **doc_ids, int *generations,
31- int *target_gen, char **target_trans_id,
32+ const char **trans_ids, int *target_gen, char **target_trans_id,
33 void *context, u1db_doc_gen_callback cb);
34
35 /**
36@@ -124,7 +126,8 @@
37 int (*sync_exchange)(u1db_sync_target *st,
38 const char *source_replica_uid, int n_docs,
39 u1db_document **docs, int *generations,
40- int *target_gen, char **target_trans_id,
41+ const char **trans_ids, int *target_gen,
42+ char **target_trans_id,
43 void *context, u1db_doc_gen_callback cb);
44 /**
45 * Create a sync_exchange state object.
46@@ -171,6 +174,7 @@
47 struct lh_table *seen_ids;
48 int num_doc_ids;
49 int *gen_for_doc_ids;
50+ const char **trans_ids_for_doc_ids;
51 char **doc_ids_to_return;
52 void *trace_context;
53 u1db__trace_callback trace_cb;
54@@ -468,7 +472,7 @@
55 * We have received a doc from source, record it.
56 */
57 int u1db__sync_exchange_insert_doc_from_source(u1db_sync_exchange *se,
58- u1db_document *doc, int source_gen);
59+ u1db_document *doc, int source_gen, const char *trans_id);
60
61 /**
62 * We are done receiving docs, find what we want to return.
63@@ -486,8 +490,9 @@
64 * u1db_free_doc().
65 */
66 int u1db__sync_exchange_return_docs(u1db_sync_exchange *se, void *context,
67- int (*cb)(void *context, u1db_document *doc, int gen));
68-
69+ int (*cb)(void *context,
70+ u1db_document *doc, int gen,
71+ const char *trans_id));
72
73 /**
74 * Create a sync target pointing at a given URL.
75
76=== modified file 'src/u1db_http_sync_target.c'
77--- src/u1db_http_sync_target.c 2012-05-31 12:22:03 +0000
78+++ src/u1db_http_sync_target.c 2012-06-19 18:44:19 +0000
79@@ -45,14 +45,19 @@
80 int source_gen,
81 u1db_sync_exchange **exchange);
82 static int st_http_sync_exchange(u1db_sync_target *st,
83- const char *source_replica_uid,
84- int n_docs, u1db_document **docs,
85- int *generations, int *target_gen, char **target_trans_id,
86- void *context, u1db_doc_gen_callback cb);
87+ const char *source_replica_uid, int n_docs,
88+ u1db_document **docs, int *generations,
89+ const char **trans_ids, int *target_gen,
90+ char **target_trans_id, void *context,
91+ u1db_doc_gen_callback cb);
92 static int st_http_sync_exchange_doc_ids(u1db_sync_target *st,
93- u1database *source_db, int n_doc_ids, const char **doc_ids,
94- int *generations, int *target_gen, char **target_trans_id,
95- void *context, u1db_doc_gen_callback cb);
96+ u1database *source_db, int n_doc_ids,
97+ const char **doc_ids,
98+ int *generations,
99+ const char **trans_ids,
100+ int *target_gen,
101+ char **target_trans_id, void *context,
102+ u1db_doc_gen_callback cb);
103 static void st_http_finalize_sync_exchange(u1db_sync_target *st,
104 u1db_sync_exchange **exchange);
105 static int st_http_set_trace_hook(u1db_sync_target *st,
106@@ -410,8 +415,8 @@
107 goto finish;
108 }
109 if (req.body_buffer == NULL) {
110- status = U1DB_INVALID_HTTP_RESPONSE;
111- goto finish;
112+ status = U1DB_INVALID_HTTP_RESPONSE;
113+ goto finish;
114 }
115 json = json_tokener_parse(req.body_buffer);
116 if (json == NULL) {
117@@ -663,7 +668,7 @@
118
119
120 static int
121-doc_to_tempfile(u1db_document *doc, int gen, FILE *fd)
122+doc_to_tempfile(u1db_document *doc, int gen, const char *trans_id, FILE *fd)
123 {
124 int status = U1DB_OK;
125 json_object *json = NULL;
126@@ -678,6 +683,7 @@
127 json_object_object_add(
128 json, "content", doc->json?json_object_new_string(doc->json):NULL);
129 json_object_object_add(json, "gen", json_object_new_int(gen));
130+ json_object_object_add(json, "trans_id", json_object_new_string(trans_id));
131 fputs(json_object_to_json_string(json), fd);
132 finish:
133 if (json != NULL) {
134@@ -791,6 +797,7 @@
135 const char *doc_id, *content, *rev;
136 const char *tmp = NULL;
137 int gen;
138+ const char *trans_id = NULL;
139 u1db_document *doc;
140
141 json = json_tokener_parse(response);
142@@ -837,12 +844,14 @@
143 content = json_object_get_string(attr);
144 attr = json_object_object_get(obj, "gen");
145 gen = json_object_get_int(attr);
146+ attr = json_object_object_get(obj, "trans_id");
147+ trans_id = json_object_get_string(attr);
148 doc = u1db__allocate_document(doc_id, rev, content, 0);
149 if (doc == NULL) {
150 status = U1DB_NOMEM;
151 goto finish;
152 }
153- status = cb(context, doc, gen);
154+ status = cb(context, doc, gen, trans_id);
155 if (status != U1DB_OK) { goto finish; }
156 }
157 finish:
158@@ -875,11 +884,10 @@
159 }
160
161 static int
162-st_http_sync_exchange(u1db_sync_target *st,
163- const char *source_replica_uid,
164- int n_docs, u1db_document **docs,
165- int *generations, int *target_gen, char **target_trans_id,
166- void *context,
167+st_http_sync_exchange(u1db_sync_target *st, const char *source_replica_uid,
168+ int n_docs, u1db_document **docs, int *generations,
169+ const char **trans_ids, int *target_gen,
170+ char **target_trans_id, void *context,
171 u1db_doc_gen_callback cb)
172 {
173 int status, i;
174@@ -898,7 +906,8 @@
175 status = init_temp_file(tmpname, &temp_fd, *target_gen);
176 if (status != U1DB_OK) { goto finish; }
177 for (i = 0; i < n_docs; ++i) {
178- status = doc_to_tempfile(docs[i], generations[i], temp_fd);
179+ status = doc_to_tempfile(
180+ docs[i], generations[i], trans_ids[i], temp_fd);
181 if (status != U1DB_OK) { goto finish; }
182 }
183 status = finalize_and_send_temp_file(st, temp_fd, source_replica_uid, &req);
184@@ -915,6 +924,7 @@
185 int offset;
186 int num;
187 int *generations;
188+ const char **trans_ids;
189 FILE *temp_fd;
190 };
191
192@@ -930,6 +940,7 @@
193 status = U1DB_INTERNAL_ERROR;
194 } else {
195 status = doc_to_tempfile(doc, state->generations[state->offset],
196+ state->trans_ids[state->offset],
197 state->temp_fd);
198 }
199 u1db_free_doc(&doc);
200@@ -943,9 +954,10 @@
201
202 static int
203 st_http_sync_exchange_doc_ids(u1db_sync_target *st, u1database *source_db,
204- int n_doc_ids, const char **doc_ids, int *generations,
205- int *target_gen, char **target_trans_id,
206- void *context, u1db_doc_gen_callback cb)
207+ int n_doc_ids, const char **doc_ids,
208+ int *generations, const char **trans_ids,
209+ int *target_gen, char **target_trans_id,
210+ void *context, u1db_doc_gen_callback cb)
211 {
212 int status;
213 FILE *temp_fd = NULL;
214@@ -968,6 +980,7 @@
215 if (status != U1DB_OK) { goto finish; }
216 state.num = n_doc_ids;
217 state.generations = generations;
218+ state.trans_ids = trans_ids;
219 state.temp_fd = temp_fd;
220 status = u1db_get_docs(source_db, n_doc_ids, doc_ids, 0, 1,
221 &state, get_docs_to_tempfile);
222
223=== modified file 'src/u1db_sync_target.c'
224--- src/u1db_sync_target.c 2012-06-08 10:06:05 +0000
225+++ src/u1db_sync_target.c 2012-06-19 18:44:19 +0000
226@@ -32,13 +32,15 @@
227 static int st_sync_exchange(u1db_sync_target *st,
228 const char *source_replica_uid, int n_docs,
229 u1db_document **docs, int *generations,
230- int *target_gen, char **target_trans_id,
231- void *context, u1db_doc_gen_callback cb);
232+ const char **trans_ids, int *target_gen,
233+ char **target_trans_id, void *context,
234+ u1db_doc_gen_callback cb);
235 static int st_sync_exchange_doc_ids(u1db_sync_target *st,
236- u1database *source_db,
237- int n_doc_ids, const char **doc_ids, int *generations,
238- int *target_gen, char **target_trans_id,
239- void *context, u1db_doc_gen_callback cb);
240+ u1database *source_db, int n_doc_ids,
241+ const char **doc_ids, int *generations,
242+ const char **trans_ids, int *target_gen,
243+ char **target_trans_id, void *context,
244+ u1db_doc_gen_callback cb);
245 static int st_get_sync_exchange(u1db_sync_target *st,
246 const char *source_replica_uid,
247 int source_gen,
248@@ -57,6 +59,7 @@
249 void *user_context;
250 u1db_doc_gen_callback user_cb;
251 int *gen_for_doc_ids;
252+ const char **trans_ids_for_doc_ids;
253 int free_when_done;
254 };
255
256@@ -203,6 +206,10 @@
257 free((*exchange)->gen_for_doc_ids);
258 (*exchange)->gen_for_doc_ids = NULL;
259 }
260+ if ((*exchange)->trans_ids_for_doc_ids != NULL) {
261+ free((*exchange)->trans_ids_for_doc_ids);
262+ (*exchange)->trans_ids_for_doc_ids = NULL;
263+ }
264 if ((*exchange)->target_trans_id != NULL) {
265 free((*exchange)->target_trans_id);
266 (*exchange)->target_trans_id = NULL;
267@@ -275,7 +282,7 @@
268
269 int
270 u1db__sync_exchange_insert_doc_from_source(u1db_sync_exchange *se,
271- u1db_document *doc, int source_gen)
272+ u1db_document *doc, int source_gen, const char *trans_id)
273 {
274 int status = U1DB_OK;
275 int insert_state;
276@@ -285,7 +292,8 @@
277 }
278 // fprintf(stderr, "Inserting %s from source\n", doc->doc_id);
279 status = u1db__put_doc_if_newer(se->db, doc, 0, se->source_replica_uid,
280- source_gen, NULL, &insert_state, &at_gen);
281+ source_gen, trans_id, &insert_state,
282+ &at_gen);
283 if (insert_state == U1DB_INSERTED || insert_state == U1DB_CONVERGED) {
284 lh_table_insert(se->seen_ids, strdup(doc->doc_id),
285 (void *)(intptr_t)at_gen);
286@@ -305,6 +313,7 @@
287 struct lh_table *exclude_ids;
288 char **doc_ids_to_return;
289 int *gen_for_doc_ids;
290+ const char **trans_ids_for_doc_ids;
291 };
292
293 // Callback for whats_changed to map the callback into the sync_exchange
294@@ -327,25 +336,29 @@
295 if (state->num_doc_ids >= state->max_doc_ids) {
296 state->max_doc_ids = (state->max_doc_ids * 2) + 10;
297 if (state->doc_ids_to_return == NULL) {
298- state->doc_ids_to_return = (char **)calloc(state->max_doc_ids,
299- sizeof(char*));
300- state->gen_for_doc_ids = (int *)calloc(state->max_doc_ids,
301- sizeof(int));
302+ state->doc_ids_to_return = (char **)calloc(
303+ state->max_doc_ids, sizeof(char*));
304+ state->gen_for_doc_ids = (int *)calloc(
305+ state->max_doc_ids, sizeof(int));
306+ state->trans_ids_for_doc_ids = (const char **)calloc(
307+ state->max_doc_ids, sizeof(char*));
308 } else {
309 state->doc_ids_to_return = (char **)realloc(
310- state->doc_ids_to_return,
311- state->max_doc_ids * sizeof(char*));
312- state->gen_for_doc_ids = (int *)realloc(state->gen_for_doc_ids,
313- state->max_doc_ids * sizeof(int));
314+ state->doc_ids_to_return, state->max_doc_ids * sizeof(char*));
315+ state->gen_for_doc_ids = (int *)realloc(
316+ state->gen_for_doc_ids, state->max_doc_ids * sizeof(int));
317+ state->trans_ids_for_doc_ids = (const char **)realloc(
318+ state->gen_for_doc_ids, state->max_doc_ids * sizeof(char*));
319 }
320- if (state->doc_ids_to_return == NULL
321- || state->gen_for_doc_ids == NULL)
322+ if (state->doc_ids_to_return == NULL || state->gen_for_doc_ids == NULL
323+ || state->trans_ids_for_doc_ids == NULL)
324 {
325 return U1DB_NOMEM;
326 }
327 }
328 state->doc_ids_to_return[state->num_doc_ids] = strdup(doc_id);
329 state->gen_for_doc_ids[state->num_doc_ids] = gen;
330+ state->trans_ids_for_doc_ids[state->num_doc_ids] = trans_id;
331 state->num_doc_ids++;
332 return 0;
333 }
334@@ -370,6 +383,7 @@
335 if (status != U1DB_OK) {
336 free(state.doc_ids_to_return);
337 free(state.gen_for_doc_ids);
338+ free(state.trans_ids_for_doc_ids);
339 goto finish;
340 }
341 if (se->trace_cb) {
342@@ -379,6 +393,7 @@
343 se->num_doc_ids = state.num_doc_ids;
344 se->doc_ids_to_return = state.doc_ids_to_return;
345 se->gen_for_doc_ids = state.gen_for_doc_ids;
346+ se->trans_ids_for_doc_ids = state.trans_ids_for_doc_ids;
347 finish:
348 if (target_trans_id != NULL) {
349 free(target_trans_id);
350@@ -397,8 +412,9 @@
351 // Note: using doc_offset in this way assumes that u1db_get_docs will
352 // always return them in exactly the order we requested. This is
353 // probably true, though.
354- status = ctx->user_cb(ctx->user_context, doc,
355- ctx->gen_for_doc_ids[ctx->doc_offset]);
356+ status = ctx->user_cb(
357+ ctx->user_context, doc, ctx->gen_for_doc_ids[ctx->doc_offset],
358+ ctx->trans_ids_for_doc_ids[ctx->doc_offset]);
359 ctx->doc_offset++;
360 if (ctx->free_when_done) {
361 u1db_free_doc(&doc);
362@@ -409,7 +425,8 @@
363
364 int
365 u1db__sync_exchange_return_docs(u1db_sync_exchange *se, void *context,
366- int (*cb)(void *context, u1db_document *doc, int gen))
367+ int (*cb)(void *context, u1db_document *doc,
368+ int gen, const char *trans_id))
369 {
370 int status = U1DB_OK;
371 struct _get_docs_to_doc_gen_context state = {0};
372@@ -420,6 +437,7 @@
373 state.user_cb = cb;
374 state.doc_offset = 0;
375 state.gen_for_doc_ids = se->gen_for_doc_ids;
376+ state.trans_ids_for_doc_ids = se->trans_ids_for_doc_ids;
377 if (se->trace_cb) {
378 status = se->trace_cb(se->trace_context, "before get_docs");
379 if (status != U1DB_OK) { goto finish; }
380@@ -440,14 +458,16 @@
381 };
382
383 static int
384-return_doc_to_insert_from_target(void *context, u1db_document *doc, int gen)
385+return_doc_to_insert_from_target(void *context, u1db_document *doc, int gen,
386+ const char *trans_id)
387 {
388 int status, insert_state;
389 struct _return_doc_state *state;
390 state = (struct _return_doc_state *)context;
391
392- status = u1db__put_doc_if_newer(state->db, doc, 1, state->target_uid, gen,
393- NULL, &insert_state, NULL);
394+ status = u1db__put_doc_if_newer(
395+ state->db, doc, 1, state->target_uid, gen, trans_id, &insert_state,
396+ NULL);
397 u1db_free_doc(&doc);
398 if (status == U1DB_OK) {
399 if (insert_state == U1DB_INSERTED || insert_state == U1DB_CONFLICTED) {
400@@ -462,7 +482,8 @@
401
402 static int
403 get_and_insert_docs(u1database *source_db, u1db_sync_exchange *se,
404- int n_doc_ids, const char **doc_ids, int *generations)
405+ int n_doc_ids, const char **doc_ids, int *generations,
406+ const char **trans_ids)
407 {
408 struct _get_docs_to_doc_gen_context get_doc_state = {0};
409
410@@ -473,6 +494,7 @@
411 get_doc_state.user_cb =
412 (u1db_doc_gen_callback)u1db__sync_exchange_insert_doc_from_source;
413 get_doc_state.gen_for_doc_ids = generations;
414+ get_doc_state.trans_ids_for_doc_ids = trans_ids;
415 return u1db_get_docs(source_db, n_doc_ids, doc_ids,
416 0, 1, &get_doc_state, get_docs_to_gen_docs);
417 }
418@@ -480,11 +502,13 @@
419
420 static int
421 st_sync_exchange(u1db_sync_target *st, const char *source_replica_uid,
422- int n_docs, u1db_document **docs,
423- int *generations, int *target_gen, char **target_trans_id,
424- void *context, u1db_doc_gen_callback cb)
425+ int n_docs, u1db_document **docs, int *generations,
426+ const char **trans_ids, int *target_gen,
427+ char **target_trans_id, void *context,
428+ u1db_doc_gen_callback cb)
429 {
430 int status, i;
431+
432 u1db_sync_exchange *exchange = NULL;
433 if (st == NULL || generations == NULL || target_gen == NULL
434 || target_trans_id == NULL || cb == NULL)
435@@ -498,8 +522,8 @@
436 *target_gen, &exchange);
437 if (status != U1DB_OK) { goto finish; }
438 for (i = 0; i < n_docs; ++i) {
439- status = u1db__sync_exchange_insert_doc_from_source(exchange, docs[i],
440- generations[i]);
441+ status = u1db__sync_exchange_insert_doc_from_source(
442+ exchange, docs[i], generations[i], trans_ids[i]);
443 if (status != U1DB_OK) { goto finish; }
444 }
445 status = u1db__sync_exchange_find_doc_ids_to_return(exchange);
446@@ -521,7 +545,7 @@
447 static int
448 st_sync_exchange_doc_ids(u1db_sync_target *st, u1database *source_db,
449 int n_doc_ids, const char **doc_ids, int *generations,
450- int *target_gen, char **target_trans_id,
451+ const char **trans_ids, int *target_gen, char **target_trans_id,
452 void *context, u1db_doc_gen_callback cb)
453 {
454 int status;
455@@ -542,7 +566,7 @@
456 if (status != U1DB_OK) { goto finish; }
457 if (n_doc_ids > 0) {
458 status = get_and_insert_docs(source_db, exchange,
459- n_doc_ids, doc_ids, generations);
460+ n_doc_ids, doc_ids, generations, trans_ids);
461 if (status != U1DB_OK) { goto finish; }
462 }
463 status = u1db__sync_exchange_find_doc_ids_to_return(exchange);
464@@ -616,7 +640,7 @@
465 status = target->sync_exchange_doc_ids(target, db,
466 to_send_state.num_doc_ids,
467 (const char**)to_send_state.doc_ids_to_return,
468- to_send_state.gen_for_doc_ids,
469+ to_send_state.gen_for_doc_ids, to_send_state.trans_ids_for_doc_ids,
470 &target_gen_known_by_local, &target_trans_id_known_by_local,
471 &return_doc_state, return_doc_to_insert_from_target);
472 if (status != U1DB_OK) { goto finish; }
473@@ -665,5 +689,8 @@
474 if (to_send_state.gen_for_doc_ids != NULL) {
475 free(to_send_state.gen_for_doc_ids);
476 }
477+ if (to_send_state.trans_ids_for_doc_ids != NULL) {
478+ free(to_send_state.trans_ids_for_doc_ids);
479+ }
480 return status;
481 }
482
483=== modified file 'u1db/__init__.py'
484--- u1db/__init__.py 2012-06-19 17:44:24 +0000
485+++ u1db/__init__.py 2012-06-19 18:44:19 +0000
486@@ -557,10 +557,10 @@
487 their latest change in order from the oldest change to the
488 newest.
489
490- :param docs_by_generation: A list of [(Document, generation)]
491- pairs indicating documents which should be updated on
492- this replica paired with the generation of their
493- latest change.
494+ :param docs_by_generation: A list of [(Document, generation,
495+ transaction_id)] tuples indicating documents which should be
496+ updated on this replica paired with the generation and transaction
497+ id of their latest change.
498 :param source_replica_uid: The source replica's identifier
499 :param last_known_generation: The last generation that the source
500 replica knows about this
501
502=== modified file 'u1db/remote/http_app.py'
503--- u1db/remote/http_app.py 2012-05-31 12:22:03 +0000
504+++ u1db/remote/http_app.py 2012-06-19 18:44:19 +0000
505@@ -324,14 +324,14 @@
506 last_known_generation)
507
508 @http_method(content_as_args=True)
509- def post_stream_entry(self, id, rev, content, gen):
510+ def post_stream_entry(self, id, rev, content, gen, trans_id):
511 doc = Document(id, rev, content)
512- self.sync_exch.insert_doc_from_source(doc, gen)
513+ self.sync_exch.insert_doc_from_source(doc, gen, trans_id)
514
515 def post_end(self):
516- def send_doc(doc, gen):
517+ def send_doc(doc, gen, trans_id):
518 entry = dict(id=doc.doc_id, rev=doc.rev, content=doc.get_json(),
519- gen=gen)
520+ gen=gen, trans_id=trans_id)
521 self.responder.stream_entry(entry)
522 new_gen = self.sync_exch.find_changes_to_return()
523 self.responder.content_type = 'application/x-u1db-sync-stream'
524
525=== modified file 'u1db/remote/http_target.py'
526--- u1db/remote/http_target.py 2012-05-31 12:22:03 +0000
527+++ u1db/remote/http_target.py 2012-06-19 18:44:19 +0000
528@@ -65,7 +65,7 @@
529 line, comma = utils.check_and_strip_comma(entry)
530 entry = simplejson.loads(line)
531 doc = Document(entry['id'], entry['rev'], entry['content'])
532- return_doc_cb(doc, entry['gen'])
533+ return_doc_cb(doc, entry['gen'], entry['trans_id'])
534 if parts[-1] != ']':
535 try:
536 partdic = simplejson.loads(parts[-1])
537@@ -98,9 +98,9 @@
538 comma = ''
539 size += prepare(last_known_generation=last_known_generation)
540 comma = ','
541- for doc, gen in docs_by_generations:
542+ for doc, gen, trans_id in docs_by_generations:
543 size += prepare(id=doc.doc_id, rev=doc.rev, content=doc.get_json(),
544- gen=gen)
545+ gen=gen, trans_id=trans_id)
546 entries.append('\r\n]')
547 size += len(entries[-1])
548 self._conn.putheader('content-length', str(size))
549
550=== modified file 'u1db/sync.py'
551--- u1db/sync.py 2012-05-31 12:22:03 +0000
552+++ u1db/sync.py 2012-06-19 18:44:19 +0000
553@@ -40,7 +40,7 @@
554 self.target_replica_uid = None
555 self.num_inserted = 0
556
557- def _insert_doc_from_target(self, doc, replica_gen):
558+ def _insert_doc_from_target(self, doc, replica_gen, trans_id):
559 """Try to insert synced document from target.
560
561 Implements TAKE OTHER semantics: any document from the target
562@@ -54,7 +54,7 @@
563 # was effectively inserted.
564 state, _ = self.source._put_doc_if_newer(doc, save_conflict=True,
565 replica_uid=self.target_replica_uid, replica_gen=replica_gen,
566- replica_trans_id=None)
567+ replica_trans_id=trans_id)
568 if state == 'inserted':
569 self.num_inserted += 1
570 elif state == 'converged':
571@@ -110,7 +110,10 @@
572 # prepare to send all the changed docs
573 docs_to_send = self.source.get_docs(changed_doc_ids,
574 check_for_conflicts=False, include_deleted=True)
575- docs_by_generation = zip(docs_to_send, (gen for _, gen, _ in changes))
576+ # TODO: there must be a way to not iterate twice
577+ docs_by_generation = zip(
578+ docs_to_send, (gen for _, gen, _ in changes),
579+ (trans for _, _, trans in changes))
580
581 # exchange documents and try to insert the returned ones with
582 # the target, return target synced-up-to gen
583@@ -153,7 +156,7 @@
584 return
585 self._trace_hook(state)
586
587- def insert_doc_from_source(self, doc, source_gen):
588+ def insert_doc_from_source(self, doc, source_gen, trans_id):
589 """Try to insert synced document from source.
590
591 Conflicting documents are not inserted but will be sent over
592@@ -171,7 +174,7 @@
593 """
594 state, at_gen = self._db._put_doc_if_newer(doc, save_conflict=False,
595 replica_uid=self.source_replica_uid, replica_gen=source_gen,
596- replica_trans_id=None)
597+ replica_trans_id=trans_id)
598 if state == 'inserted':
599 self.seen_ids[doc.doc_id] = at_gen
600 elif state == 'converged':
601@@ -212,10 +215,10 @@
602 self.new_trans_id = trans_id
603 seen_ids = self.seen_ids
604 # changed docs that weren't superseded by or converged with
605- self.changes_to_return = [(doc_id, gen) for (doc_id, gen, _) in changes
606- # there was a subsequent update
607- if doc_id not in seen_ids or
608- seen_ids.get(doc_id) < gen]
609+ self.changes_to_return = [
610+ (doc_id, gen, trans_id) for (doc_id, gen, trans_id) in changes
611+ # there was a subsequent update
612+ if doc_id not in seen_ids or seen_ids.get(doc_id) < gen]
613 return self.new_gen
614
615 def return_docs(self, return_doc_cb):
616@@ -224,21 +227,23 @@
617
618 The final step of a sync exchange.
619
620- :param: return_doc_cb(doc, gen): is a callback
621+ :param: return_doc_cb(doc, gen, trans_id): is a callback
622 used to return the documents with their last change generation
623 to the target replica.
624 :return: None
625 """
626 changes_to_return = self.changes_to_return
627 # return docs, including conflicts
628- changed_doc_ids = [doc_id for doc_id, _ in changes_to_return]
629+ changed_doc_ids = [doc_id for doc_id, _, _ in changes_to_return]
630 self._trace('before get_docs')
631 docs = self._db.get_docs(
632 changed_doc_ids, check_for_conflicts=False, include_deleted=True)
633
634- docs_by_gen = izip(docs, (gen for _, gen in changes_to_return))
635- for doc, gen in docs_by_gen:
636- return_doc_cb(doc, gen)
637+ docs_by_gen = izip(
638+ docs, (gen for _, gen, _ in changes_to_return),
639+ (trans_id for _, _, trans_id in changes_to_return))
640+ for doc, gen, trans_id in docs_by_gen:
641+ return_doc_cb(doc, gen, trans_id)
642 # for tests
643 self._db._last_exchange_log['return'] = {
644 'docs': [(d.doc_id, d.rev) for d in docs],
645@@ -260,8 +265,8 @@
646 if self._trace_hook:
647 sync_exch._set_trace_hook(self._trace_hook)
648 # 1st step: try to insert incoming docs and record progress
649- for doc, doc_gen in docs_by_generations:
650- sync_exch.insert_doc_from_source(doc, doc_gen)
651+ for doc, doc_gen, trans_id in docs_by_generations:
652+ sync_exch.insert_doc_from_source(doc, doc_gen, trans_id)
653 # 2nd step: find changed documents (including conflicts) to return
654 new_gen = sync_exch.find_changes_to_return()
655 # final step: return docs and record source replica sync point
656
657=== modified file 'u1db/tests/c_backend_wrapper.pyx'
658--- u1db/tests/c_backend_wrapper.pyx 2012-06-19 17:55:20 +0000
659+++ u1db/tests/c_backend_wrapper.pyx 2012-06-19 18:44:19 +0000
660@@ -61,7 +61,7 @@
661 ctypedef int (*u1db_key_callback)(void *context, int num_fields,
662 const_char_ptr *key)
663 ctypedef int (*u1db_doc_gen_callback)(void *context,
664- u1db_document *doc, int gen)
665+ u1db_document *doc, int gen, const_char_ptr trans_id)
666 ctypedef int (*u1db_trans_info_callback)(void *context,
667 const_char_ptr doc_id, int gen, const_char_ptr trans_id)
668
669@@ -170,6 +170,7 @@
670 int num_doc_ids
671 char **doc_ids_to_return
672 int *gen_for_doc_ids
673+ const_char_ptr *trans_ids_for_doc_ids
674
675 ctypedef int (*u1db__trace_callback)(void *context, const_char_ptr state)
676 ctypedef struct u1db_sync_target:
677@@ -182,12 +183,16 @@
678 int (*sync_exchange)(u1db_sync_target *st,
679 char *source_replica_uid, int n_docs,
680 u1db_document **docs, int *generations,
681- int *target_gen, char **target_trans_id,
682- void *context, u1db_doc_gen_callback cb) nogil
683+ const_char_ptr *trans_ids, int *target_gen,
684+ char **target_trans_id, void *context,
685+ u1db_doc_gen_callback cb) nogil
686 int (*sync_exchange_doc_ids)(u1db_sync_target *st,
687- u1database *source_db, int n_doc_ids, const_char_ptr *doc_ids,
688- int *generations, int *target_gen, char **target_trans_id,
689- void *context, u1db_doc_gen_callback cb) nogil
690+ u1database *source_db, int n_doc_ids,
691+ const_char_ptr *doc_ids, int *generations,
692+ const_char_ptr *trans_ids,
693+ int *target_gen, char **target_trans_id,
694+ void *context,
695+ u1db_doc_gen_callback cb) nogil
696 int (*get_sync_exchange)(u1db_sync_target *st,
697 char *source_replica_uid,
698 int last_known_source_gen,
699@@ -234,10 +239,12 @@
700 int *local_gen_before_sync) nogil
701
702 int u1db__sync_exchange_insert_doc_from_source(u1db_sync_exchange *se,
703- u1db_document *doc, int source_gen)
704+ u1db_document *doc, int source_gen, const_char_ptr trans_id)
705 int u1db__sync_exchange_find_doc_ids_to_return(u1db_sync_exchange *se)
706 int u1db__sync_exchange_return_docs(u1db_sync_exchange *se, void *context,
707- int (*cb)(void *context, u1db_document *doc, int gen))
708+ int (*cb)(void *context,
709+ u1db_document *doc, int gen,
710+ const_char_ptr trans_id))
711 int u1db__create_http_sync_target(char *url, u1db_sync_target **target)
712 int u1db__create_oauth_http_sync_target(char *url,
713 char *consumer_key, char *consumer_secret,
714@@ -337,13 +344,13 @@
715
716
717 cdef int return_doc_cb_wrapper(void *context, u1db_document *doc,
718- int gen) with gil:
719+ int gen, const_char_ptr trans_id) with gil:
720 cdef CDocument pydoc
721 user_cb = <object>context
722 pydoc = CDocument()
723 pydoc._doc = doc
724 try:
725- user_cb(pydoc, gen)
726+ user_cb(pydoc, gen, trans_id)
727 except Exception, e:
728 # We suppress the exception here, because intermediating through the C
729 # layer gets a bit crazy
730@@ -616,11 +623,12 @@
731 self._check()
732 return self._exchange.target_gen
733
734- def insert_doc_from_source(self, CDocument doc, source_gen):
735+ def insert_doc_from_source(self, CDocument doc, source_gen,
736+ source_trans_id):
737 self._check()
738 handle_status("insert_doc_from_source",
739 u1db__sync_exchange_insert_doc_from_source(self._exchange,
740- doc._doc, source_gen))
741+ doc._doc, source_gen, source_trans_id))
742
743 def find_doc_ids_to_return(self):
744 self._check()
745@@ -652,8 +660,10 @@
746 if (self._exchange.num_doc_ids > 0
747 and self._exchange.doc_ids_to_return != NULL):
748 for i from 0 <= i < self._exchange.num_doc_ids:
749- res.append((self._exchange.doc_ids_to_return[i],
750- self._exchange.gen_for_doc_ids[i]))
751+ res.append(
752+ (self._exchange.doc_ids_to_return[i],
753+ self._exchange.gen_for_doc_ids[i],
754+ self._exchange.trans_ids_for_doc_ids[i]))
755 return res
756
757
758@@ -724,25 +734,32 @@
759 if generations == NULL:
760 free(<void *>doc_ids)
761 raise MemoryError
762+ trans_ids = <const_char_ptr*>calloc(num_doc_ids, sizeof(char *))
763+ if trans_ids == NULL:
764+ raise MemoryError
765 res_trans_id = ''
766 try:
767- for i, (doc_id, gen) in enumerate(doc_id_generations):
768+ for i, (doc_id, gen, trans_id) in enumerate(doc_id_generations):
769 doc_ids[i] = PyString_AsString(doc_id)
770 generations[i] = gen
771+ trans_ids[i] = trans_id
772 target_gen = last_known_generation
773 with nogil:
774 status = self._st.sync_exchange_doc_ids(self._st, sdb._db,
775- num_doc_ids, doc_ids, generations,
776+ num_doc_ids, doc_ids, generations, trans_ids,
777 &target_gen, &target_trans_id,
778 <void*>return_doc_cb, return_doc_cb_wrapper)
779 handle_status("sync_exchange_doc_ids", status)
780 finally:
781- free(<void *>doc_ids)
782- free(generations)
783+ if doc_ids != NULL:
784+ free(<void *>doc_ids)
785+ if generations != NULL:
786+ free(generations)
787+ if trans_ids != NULL:
788+ free(trans_ids)
789 if target_trans_id != NULL:
790 res_trans_id = target_trans_id
791 free(target_trans_id)
792-
793 return target_gen, res_trans_id
794
795 def sync_exchange(self, docs_by_generations, source_replica_uid,
796@@ -750,6 +767,7 @@
797 cdef CDocument cur_doc
798 cdef u1db_document **docs = NULL
799 cdef int *generations = NULL
800+ cdef const_char_ptr *trans_ids = NULL
801 cdef char *trans_id = NULL
802 cdef int i, count, status, target_gen
803
804@@ -764,15 +782,19 @@
805 generations = <int*>calloc(count, sizeof(int))
806 if generations == NULL:
807 raise MemoryError
808+ trans_ids = <const_char_ptr*>calloc(count, sizeof(char*))
809+ if trans_ids == NULL:
810+ raise MemoryError
811 for i from 0 <= i < count:
812 cur_doc = docs_by_generations[i][0]
813 generations[i] = docs_by_generations[i][1]
814+ trans_ids[i] = docs_by_generations[i][2]
815 docs[i] = cur_doc._doc
816 target_gen = last_known_generation
817 with nogil:
818 status = self._st.sync_exchange(self._st,
819 source_replica_uid, count,
820- docs, generations, &target_gen, &trans_id,
821+ docs, generations, trans_ids, &target_gen, &trans_id,
822 <void *>return_doc_cb, return_doc_cb_wrapper)
823 handle_status("sync_exchange", status)
824 finally:
825@@ -780,6 +802,8 @@
826 free(docs)
827 if generations != NULL:
828 free(generations)
829+ if trans_ids != NULL:
830+ free(trans_ids)
831 if trans_id != NULL:
832 res_trans_id = trans_id
833 free(trans_id)
834
835=== modified file 'u1db/tests/test_backends.py'
836--- u1db/tests/test_backends.py 2012-06-19 17:55:20 +0000
837+++ u1db/tests/test_backends.py 2012-06-19 18:44:19 +0000
838@@ -1464,7 +1464,7 @@
839 pass
840
841 doc_other = self.make_document(doc.doc_id, other_rev, new_content)
842- docs_by_gen = [(doc_other, 10)]
843+ docs_by_gen = [(doc_other, 10, 'T-sid')]
844 st.sync_exchange(
845 docs_by_gen, 'other-replica', last_known_generation=0,
846 return_doc_cb=ignore)
847
848=== modified file 'u1db/tests/test_c_backend.py'
849--- u1db/tests/test_c_backend.py 2012-06-12 14:57:44 +0000
850+++ u1db/tests/test_c_backend.py 2012-06-19 18:44:19 +0000
851@@ -287,10 +287,10 @@
852 doc = c_backend_wrapper.make_document('doc-id', 'replica:1',
853 tests.simple_doc)
854 self.assertEqual([], exc.get_seen_ids())
855- exc.insert_doc_from_source(doc, 10)
856+ exc.insert_doc_from_source(doc, 10, 'T-sid')
857 self.assertGetDoc(self.db, 'doc-id', 'replica:1', tests.simple_doc,
858 False)
859- self.assertEqual((10, None),
860+ self.assertEqual((10, 'T-sid'),
861 self.db._get_sync_gen_info('source-uid'))
862 self.assertEqual(['doc-id'], exc.get_seen_ids())
863
864@@ -301,7 +301,7 @@
865 tests.nested_doc)
866 self.assertEqual([], exc.get_seen_ids())
867 # The insert should be rejected and the doc_id not considered 'seen'
868- exc.insert_doc_from_source(doc2, 10)
869+ exc.insert_doc_from_source(doc2, 10, 'T-sid')
870 self.assertGetDoc(
871 self.db, doc.doc_id, doc.rev, tests.simple_doc, False)
872 self.assertEqual([], exc.get_seen_ids())
873@@ -311,7 +311,10 @@
874 exc = self.st._get_sync_exchange("source-uid", 0)
875 self.assertEqual(0, exc.target_gen)
876 exc.find_doc_ids_to_return()
877- self.assertEqual([(doc.doc_id, 1)], exc.get_doc_ids_to_return())
878+ doc_id = exc.get_doc_ids_to_return()[0]
879+ self.assertEqual(
880+ (doc.doc_id, 1), doc_id[:-1])
881+ self.assertTrue(doc_id[-1].startswith('T-'))
882 self.assertEqual(1, exc.target_gen)
883
884 def test_sync_exchange_find_doc_ids_not_including_recently_inserted(self):
885@@ -320,22 +323,23 @@
886 exc = self.st._get_sync_exchange("source-uid", 0)
887 doc3 = c_backend_wrapper.make_document(doc1.doc_id,
888 doc1.rev + "|zreplica:2", tests.simple_doc)
889- exc.insert_doc_from_source(doc3, 10)
890+ exc.insert_doc_from_source(doc3, 10, 'T-sid')
891 exc.find_doc_ids_to_return()
892- self.assertEqual([(doc2.doc_id, 2)], exc.get_doc_ids_to_return())
893+ self.assertEqual(
894+ (doc2.doc_id, 2), exc.get_doc_ids_to_return()[0][:-1])
895 self.assertEqual(3, exc.target_gen)
896
897 def test_sync_exchange_return_docs(self):
898 returned = []
899
900- def return_doc_cb(doc, gen):
901- returned.append((doc, gen))
902+ def return_doc_cb(doc, gen, trans_id):
903+ returned.append((doc, gen, trans_id))
904
905 doc1 = self.db.create_doc(tests.simple_doc)
906 exc = self.st._get_sync_exchange("source-uid", 0)
907 exc.find_doc_ids_to_return()
908 exc.return_docs(return_doc_cb)
909- self.assertEqual([(doc1, 1)], returned)
910+ self.assertEqual((doc1, 1), returned[0][:-1])
911
912 def test_sync_exchange_doc_ids(self):
913 doc1 = self.db.create_doc(tests.simple_doc, doc_id='doc-1')
914@@ -343,16 +347,17 @@
915 doc2 = db2.create_doc(tests.nested_doc, doc_id='doc-2')
916 returned = []
917
918- def return_doc_cb(doc, gen):
919- returned.append((doc, gen))
920- val = self.st.sync_exchange_doc_ids(db2, [(doc2.doc_id, 1)], 0,
921- return_doc_cb)
922+ def return_doc_cb(doc, gen, trans_id):
923+ returned.append((doc, gen, trans_id))
924+
925+ val = self.st.sync_exchange_doc_ids(
926+ db2, [(doc2.doc_id, 1, 'T-sid')], 0, return_doc_cb)
927 last_trans_id = self.db._get_transaction_log()[-1][1]
928 self.assertEqual(2, self.db._get_generation())
929 self.assertEqual((2, last_trans_id), val)
930 self.assertGetDoc(self.db, doc2.doc_id, doc2.rev, tests.nested_doc,
931 False)
932- self.assertEqual([(doc1, 1)], returned)
933+ self.assertEqual((doc1, 1), returned[0][:-1])
934
935
936 class TestCHTTPSyncTarget(BackendTests):
937
938=== modified file 'u1db/tests/test_http_app.py'
939--- u1db/tests/test_http_app.py 2012-05-31 12:22:03 +0000
940+++ u1db/tests/test_http_app.py 2012-06-19 18:44:19 +0000
941@@ -715,9 +715,9 @@
942 def test_sync_exchange_send(self):
943 entries = {
944 10: {'id': 'doc-here', 'rev': 'replica:1', 'content':
945- '{"value": "here"}', 'gen': 10},
946+ '{"value": "here"}', 'gen': 10, 'trans_id': 'T-sid'},
947 11: {'id': 'doc-here2', 'rev': 'replica:1', 'content':
948- '{"value": "here2"}', 'gen': 11}
949+ '{"value": "here2"}', 'gen': 11, 'trans_id': 'T-sed'}
950 }
951
952 gens = []
953@@ -793,12 +793,18 @@
954 self.assertEqual({'new_generation': 2,
955 'new_transaction_id': last_trans_id},
956 simplejson.loads(parts[1].rstrip(",")))
957- self.assertEqual({'content': '{"value": "there"}',
958- 'rev': doc.rev, 'id': doc.doc_id, 'gen': 1},
959- simplejson.loads(parts[2].rstrip(",")))
960- self.assertEqual({'content': '{"value": "there2"}',
961- 'rev': doc2.rev, 'id': doc2.doc_id, 'gen': 2},
962- simplejson.loads(parts[3].rstrip(",")))
963+ part2 = simplejson.loads(parts[2].rstrip(","))
964+ self.assertTrue(part2['trans_id'].startswith('T-'))
965+ self.assertEqual('{"value": "there"}', part2['content'])
966+ self.assertEqual(doc.rev, part2['rev'])
967+ self.assertEqual(doc.doc_id, part2['id'])
968+ self.assertEqual(1, part2['gen'])
969+ part3 = simplejson.loads(parts[3].rstrip(","))
970+ self.assertTrue(part3['trans_id'].startswith('T-'))
971+ self.assertEqual('{"value": "there2"}', part3['content'])
972+ self.assertEqual(doc2.rev, part3['rev'])
973+ self.assertEqual(doc2.doc_id, part3['id'])
974+ self.assertEqual(2, part3['gen'])
975 self.assertEqual(']', parts[4])
976
977 def test_sync_exchange_error_in_stream(self):
978
979=== modified file 'u1db/tests/test_remote_sync_target.py'
980--- u1db/tests/test_remote_sync_target.py 2012-05-31 12:22:03 +0000
981+++ u1db/tests/test_remote_sync_target.py 2012-06-19 18:44:19 +0000
982@@ -81,8 +81,9 @@
983 self.assertRaises(errors.BrokenSyncStream,
984 tgt._parse_sync_stream,
985 '[\r\n{},\r\n{"id": "i", "rev": "r", '
986- '"content": "c", "gen": 3},\r\n]',
987- lambda doc, gen: None)
988+ '"content": "c", "gen": 3, "trans_id": "T-sid"}'
989+ ',\r\n]',
990+ lambda doc, gen, trans_id: None)
991
992 def test_error_in_stream(self):
993 tgt = http_target.HTTPSyncTarget("http://foo/foo")
994@@ -102,6 +103,7 @@
995
996
997 def http_server_def():
998+
999 def make_server(host_port, handler, state):
1000 application = http_app.HTTPApp(state)
1001 srv = simple_server.WSGIServer(host_port, handler)
1002@@ -111,9 +113,11 @@
1003 # handler
1004 # )
1005 return srv
1006+
1007 class req_handler(simple_server.WSGIRequestHandler):
1008 def log_request(*args):
1009 pass # suppress
1010+
1011 #rh = httpserver.WSGIHandler
1012 return make_server, req_handler, "shutdown", "http"
1013
1014@@ -123,6 +127,7 @@
1015
1016
1017 def oauth_http_server_def():
1018+
1019 def make_server(host_port, handler, state):
1020 app = http_app.HTTPApp(state)
1021 application = oauth_middleware.OAuthMiddleware(app, None)
1022@@ -132,9 +137,11 @@
1023 application.base_url = "http://%s:%s" % srv.server_address
1024 srv.set_app(application)
1025 return srv
1026+
1027 class req_handler(simple_server.WSGIRequestHandler):
1028 def log_request(*args):
1029 pass # suppress
1030+
1031 return make_server, req_handler, "shutdown", "http"
1032
1033
1034@@ -181,26 +188,31 @@
1035 db = self.request_state._create_database('test')
1036 remote_target = self.getSyncTarget('test')
1037 other_docs = []
1038+
1039 def receive_doc(doc):
1040 other_docs.append((doc.doc_id, doc.rev, doc.get_json()))
1041+
1042 doc = self.make_document('doc-here', 'replica:1', '{"value": "here"}')
1043 new_gen, trans_id = remote_target.sync_exchange(
1044- [(doc, 10)],
1045+ [(doc, 10, 'T-sid')],
1046 'replica', last_known_generation=0,
1047 return_doc_cb=receive_doc)
1048 self.assertEqual(1, new_gen)
1049- self.assertGetDoc(db, 'doc-here', 'replica:1', '{"value": "here"}',
1050- False)
1051+ self.assertGetDoc(
1052+ db, 'doc-here', 'replica:1', '{"value": "here"}', False)
1053
1054 def test_sync_exchange_send_failure_and_retry_scenario(self):
1055 self.startServer()
1056+
1057 def blackhole_getstderr(inst):
1058 return cStringIO.StringIO()
1059+
1060 self.patch(self.server.RequestHandlerClass, 'get_stderr',
1061 blackhole_getstderr)
1062 db = self.request_state._create_database('test')
1063 _put_doc_if_newer = db._put_doc_if_newer
1064 trigger_ids = ['doc-here2']
1065+
1066 def bomb_put_doc_if_newer(doc, save_conflict,
1067 replica_uid=None, replica_gen=None,
1068 replica_trans_id=None):
1069@@ -212,33 +224,37 @@
1070 self.patch(db, '_put_doc_if_newer', bomb_put_doc_if_newer)
1071 remote_target = self.getSyncTarget('test')
1072 other_changes = []
1073- def receive_doc(doc, gen):
1074- other_changes.append((doc.doc_id, doc.rev, doc.get_json(), gen))
1075+
1076+ def receive_doc(doc, gen, trans_id):
1077+ other_changes.append(
1078+ (doc.doc_id, doc.rev, doc.get_json(), gen, trans_id))
1079+
1080 doc1 = self.make_document('doc-here', 'replica:1', '{"value": "here"}')
1081 doc2 = self.make_document('doc-here2', 'replica:1',
1082 '{"value": "here2"}')
1083 self.assertRaises(errors.HTTPError, remote_target.sync_exchange,
1084- [(doc1, 10),
1085- (doc2, 11)
1086+ [(doc1, 10, 'T-sid'),
1087+ (doc2, 11, 'T-sud')
1088 ], 'replica', last_known_generation=0,
1089 return_doc_cb=receive_doc)
1090 self.assertGetDoc(db, 'doc-here', 'replica:1', '{"value": "here"}',
1091 False)
1092- self.assertEqual((10, None), db._get_sync_gen_info('replica'))
1093+ self.assertEqual((10, 'T-sid'), db._get_sync_gen_info('replica'))
1094 self.assertEqual([], other_changes)
1095 # retry
1096 trigger_ids = []
1097 new_gen, trans_id = remote_target.sync_exchange(
1098- [(doc2, 11)
1099+ [(doc2, 11, 'T-sud')
1100 ], 'replica', last_known_generation=0,
1101 return_doc_cb=receive_doc)
1102 self.assertGetDoc(db, 'doc-here2', 'replica:1', '{"value": "here2"}',
1103 False)
1104- self.assertEqual((11, None), db._get_sync_gen_info('replica'))
1105+ self.assertEqual((11, 'T-sud'), db._get_sync_gen_info('replica'))
1106 self.assertEqual(2, new_gen)
1107 # bounced back to us
1108- self.assertEqual([('doc-here', 'replica:1', '{"value": "here"}', 1)],
1109- other_changes)
1110+ self.assertEqual(
1111+ ('doc-here', 'replica:1', '{"value": "here"}', 1),
1112+ other_changes[0][:-1])
1113
1114 def test_sync_exchange_in_stream_error(self):
1115 self.startServer()
1116@@ -261,14 +277,16 @@
1117 remote_target = self.getSyncTarget('test')
1118 other_changes = []
1119
1120- def receive_doc(doc, gen):
1121- other_changes.append((doc.doc_id, doc.rev, doc.get_json(), gen))
1122+ def receive_doc(doc, gen, trans_id):
1123+ other_changes.append(
1124+ (doc.doc_id, doc.rev, doc.get_json(), gen, trans_id))
1125
1126 self.assertRaises(errors.Unavailable, remote_target.sync_exchange,
1127 [], 'replica', last_known_generation=0,
1128 return_doc_cb=receive_doc)
1129- self.assertEqual([(doc.doc_id, doc.rev, '{"value": "there"}', 1)],
1130- other_changes)
1131+ self.assertEqual(
1132+ (doc.doc_id, doc.rev, '{"value": "there"}', 1),
1133+ other_changes[0][:-1])
1134
1135 def test_sync_exchange_receive(self):
1136 self.startServer()
1137@@ -276,14 +294,18 @@
1138 doc = db.create_doc('{"value": "there"}')
1139 remote_target = self.getSyncTarget('test')
1140 other_changes = []
1141- def receive_doc(doc, gen):
1142- other_changes.append((doc.doc_id, doc.rev, doc.get_json(), gen))
1143+
1144+ def receive_doc(doc, gen, trans_id):
1145+ other_changes.append(
1146+ (doc.doc_id, doc.rev, doc.get_json(), gen, trans_id))
1147+
1148 new_gen, trans_id = remote_target.sync_exchange(
1149 [], 'replica', last_known_generation=0,
1150 return_doc_cb=receive_doc)
1151 self.assertEqual(1, new_gen)
1152- self.assertEqual([(doc.doc_id, doc.rev, '{"value": "there"}', 1)],
1153- other_changes)
1154+ self.assertEqual(
1155+ (doc.doc_id, doc.rev, '{"value": "there"}', 1),
1156+ other_changes[0][:-1])
1157
1158
1159 load_tests = tests.load_with_scenarios
1160
1161=== modified file 'u1db/tests/test_sync.py'
1162--- u1db/tests/test_sync.py 2012-06-08 07:39:35 +0000
1163+++ u1db/tests/test_sync.py 2012-06-19 18:44:19 +0000
1164@@ -128,8 +128,9 @@
1165 del self.db
1166 super(DatabaseSyncTargetTests, self).tearDown()
1167
1168- def receive_doc(self, doc, gen):
1169- self.other_changes.append((doc.doc_id, doc.rev, doc.get_json(), gen))
1170+ def receive_doc(self, doc, gen, trans_id):
1171+ self.other_changes.append(
1172+ (doc.doc_id, doc.rev, doc.get_json(), gen, trans_id))
1173
1174 def set_trace_hook(self, callback):
1175 try:
1176@@ -157,7 +158,8 @@
1177
1178 def test_sync_exchange(self):
1179 docs_by_gen = [
1180- (self.make_document('doc-id', 'replica:1', simple_doc), 10)]
1181+ (self.make_document('doc-id', 'replica:1', simple_doc), 10,
1182+ 'T-sid')]
1183 new_gen, trans_id = self.st.sync_exchange(docs_by_gen, 'replica',
1184 last_known_generation=0,
1185 return_doc_cb=self.receive_doc)
1186@@ -172,7 +174,7 @@
1187 doc = self.db.create_doc('{}')
1188 edit_rev = 'replica:1|' + doc.rev
1189 docs_by_gen = [
1190- (self.make_document(doc.doc_id, edit_rev, None), 10)]
1191+ (self.make_document(doc.doc_id, edit_rev, None), 10, 'T-sid')]
1192 new_gen, trans_id = self.st.sync_exchange(docs_by_gen, 'replica',
1193 last_known_generation=0,
1194 return_doc_cb=self.receive_doc)
1195@@ -186,8 +188,9 @@
1196
1197 def test_sync_exchange_push_many(self):
1198 docs_by_gen = [
1199- (self.make_document('doc-id', 'replica:1', simple_doc), 10),
1200- (self.make_document('doc-id2', 'replica:1', nested_doc), 11)]
1201+ (self.make_document('doc-id', 'replica:1', simple_doc), 10, 'T-1'),
1202+ (self.make_document('doc-id2', 'replica:1', nested_doc), 11,
1203+ 'T-2')]
1204 new_gen, trans_id = self.st.sync_exchange(docs_by_gen, 'replica',
1205 last_known_generation=0,
1206 return_doc_cb=self.receive_doc)
1207@@ -204,13 +207,15 @@
1208 self.assertTransactionLog([doc.doc_id], self.db)
1209 new_doc = '{"key": "altval"}'
1210 docs_by_gen = [
1211- (self.make_document(doc.doc_id, 'replica:1', new_doc), 10)]
1212- new_gen, _ = self.st.sync_exchange(docs_by_gen, 'replica',
1213- last_known_generation=0,
1214- return_doc_cb=self.receive_doc)
1215+ (self.make_document(doc.doc_id, 'replica:1', new_doc), 10,
1216+ 'T-sid')]
1217+ new_gen, _ = self.st.sync_exchange(
1218+ docs_by_gen, 'replica', last_known_generation=0,
1219+ return_doc_cb=self.receive_doc)
1220 self.assertTransactionLog([doc.doc_id], self.db)
1221- self.assertEqual(([(doc.doc_id, doc.rev, simple_doc, 1)], 1),
1222- (self.other_changes, new_gen))
1223+ self.assertEqual(
1224+ (doc.doc_id, doc.rev, simple_doc, 1), self.other_changes[0][:-1])
1225+ self.assertEqual(1, new_gen)
1226 if self.whitebox:
1227 self.assertEqual(self.db._last_exchange_log['return'],
1228 {'last_gen': 1, 'docs': [(doc.doc_id, doc.rev)]})
1229@@ -219,7 +224,7 @@
1230 doc = self.db.create_doc(simple_doc)
1231 self.assertTransactionLog([doc.doc_id], self.db)
1232 docs_by_gen = [
1233- (self.make_document(doc.doc_id, doc.rev, simple_doc), 10)]
1234+ (self.make_document(doc.doc_id, doc.rev, simple_doc), 10, 'T-sid')]
1235 new_gen, _ = self.st.sync_exchange(docs_by_gen, 'replica',
1236 last_known_generation=1,
1237 return_doc_cb=self.receive_doc)
1238@@ -233,8 +238,9 @@
1239 last_known_generation=0,
1240 return_doc_cb=self.receive_doc)
1241 self.assertTransactionLog([doc.doc_id], self.db)
1242- self.assertEqual(([(doc.doc_id, doc.rev, simple_doc, 1)], 1),
1243- (self.other_changes, new_gen))
1244+ self.assertEqual(
1245+ (doc.doc_id, doc.rev, simple_doc, 1), self.other_changes[0][:-1])
1246+ self.assertEqual(1, new_gen)
1247 if self.whitebox:
1248 self.assertEqual(self.db._last_exchange_log['return'],
1249 {'last_gen': 1, 'docs': [(doc.doc_id, doc.rev)]})
1250@@ -247,8 +253,9 @@
1251 last_known_generation=0,
1252 return_doc_cb=self.receive_doc)
1253 self.assertTransactionLog([doc.doc_id, doc.doc_id], self.db)
1254- self.assertEqual(([(doc.doc_id, doc.rev, None, 2)], 2),
1255- (self.other_changes, new_gen))
1256+ self.assertEqual(
1257+ (doc.doc_id, doc.rev, None, 2), self.other_changes[0][:-1])
1258+ self.assertEqual(2, new_gen)
1259 if self.whitebox:
1260 self.assertEqual(self.db._last_exchange_log['return'],
1261 {'last_gen': 2, 'docs': [(doc.doc_id, doc.rev)]})
1262@@ -261,9 +268,11 @@
1263 last_known_generation=0,
1264 return_doc_cb=self.receive_doc)
1265 self.assertTransactionLog([doc.doc_id, doc2.doc_id], self.db)
1266- self.assertEqual(([(doc.doc_id, doc.rev, simple_doc, 1),
1267- (doc2.doc_id, doc2.rev, nested_doc, 2)], 2),
1268- (self.other_changes, new_gen))
1269+ self.assertEqual(2, new_gen)
1270+ self.assertEqual(
1271+ [(doc.doc_id, doc.rev, simple_doc, 1),
1272+ (doc2.doc_id, doc2.rev, nested_doc, 2)],
1273+ [c[:-1] for c in self.other_changes])
1274 if self.whitebox:
1275 self.assertEqual(
1276 self.db._last_exchange_log['return'],
1277@@ -275,7 +284,8 @@
1278 self.assertTransactionLog([doc.doc_id], self.db)
1279 new_doc = '{"key": "altval"}'
1280 docs_by_gen = [
1281- (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10)]
1282+ (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10,
1283+ 'T-sid')]
1284 new_gen, _ = self.st.sync_exchange(docs_by_gen, 'other-replica',
1285 last_known_generation=0,
1286 return_doc_cb=self.receive_doc)
1287@@ -298,11 +308,13 @@
1288 self.assertTransactionLog([doc.doc_id], self.db)
1289 new_doc = '{"key": "altval"}'
1290 docs_by_gen = [
1291- (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10)]
1292- new_gen, _ = self.st.sync_exchange(docs_by_gen, 'other-replica',
1293- last_known_generation=0,
1294- return_doc_cb=self.receive_doc)
1295- self.assertEqual((expected, 3), (self.other_changes, new_gen))
1296+ (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10,
1297+ 'T-sid')]
1298+ new_gen, _ = self.st.sync_exchange(
1299+ docs_by_gen, 'other-replica', last_known_generation=0,
1300+ return_doc_cb=self.receive_doc)
1301+ self.assertEqual(expected, [c[:-1] for c in self.other_changes])
1302+ self.assertEqual(3, new_gen)
1303
1304 def test_sync_exchange_with_concurrent_updates(self):
1305 def after_whatschanged_cb(state):
1306@@ -314,7 +326,8 @@
1307 self.assertTransactionLog([doc.doc_id], self.db)
1308 new_doc = '{"key": "altval"}'
1309 docs_by_gen = [
1310- (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10)]
1311+ (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10,
1312+ 'T-sid')]
1313 new_gen, _ = self.st.sync_exchange(docs_by_gen, 'other-replica',
1314 last_known_generation=0,
1315 return_doc_cb=self.receive_doc)
1316@@ -323,8 +336,9 @@
1317 def test_sync_exchange_converged_handling(self):
1318 doc = self.db.create_doc(simple_doc)
1319 docs_by_gen = [
1320- (self.make_document('new', 'other:1', '{}'), 4),
1321- (self.make_document(doc.doc_id, doc.rev, doc.get_json()), 5)]
1322+ (self.make_document('new', 'other:1', '{}'), 4, 'T-foo'),
1323+ (self.make_document(doc.doc_id, doc.rev, doc.get_json()), 5,
1324+ 'T-bar')]
1325 new_gen, _ = self.st.sync_exchange(docs_by_gen, 'other-replica',
1326 last_known_generation=0,
1327 return_doc_cb=self.receive_doc)
1328@@ -352,8 +366,9 @@
1329 self.skipTest("sync_exchange_doc_ids not implemented")
1330 db2 = self.create_database('test2')
1331 doc = db2.create_doc(simple_doc)
1332- new_gen, trans_id = sync_exchange_doc_ids(db2, [(doc.doc_id, 10)], 0,
1333- return_doc_cb=self.receive_doc)
1334+ new_gen, trans_id = sync_exchange_doc_ids(
1335+ db2, [(doc.doc_id, 10, 'T-sid')], 0,
1336+ return_doc_cb=self.receive_doc)
1337 self.assertGetDoc(self.db, doc.doc_id, doc.rev, simple_doc, False)
1338 self.assertTransactionLog([doc.doc_id], self.db)
1339 last_trans_id = self.getLastTransId(self.db)

Subscribers

People subscribed via source and target branches