Merge lp:~thisfred/u1db/txid_in_sync_exchange into lp:u1db

Proposed by Eric Casteleijn
Status: Superseded
Proposed branch: lp:~thisfred/u1db/txid_in_sync_exchange
Merge into: lp:u1db
Diff against target: 1338 lines (+304/-182)
14 files modified
include/u1db/u1db.h (+2/-1)
include/u1db/u1db_internal.h (+10/-5)
src/u1db_http_sync_target.c (+33/-20)
src/u1db_sync_target.c (+60/-33)
u1db/__init__.py (+3/-4)
u1db/remote/http_app.py (+4/-4)
u1db/remote/http_target.py (+3/-3)
u1db/sync.py (+21/-16)
u1db/tests/c_backend_wrapper.pyx (+44/-20)
u1db/tests/test_backends.py (+1/-1)
u1db/tests/test_c_backend.py (+19/-14)
u1db/tests/test_http_app.py (+14/-8)
u1db/tests/test_remote_sync_target.py (+44/-22)
u1db/tests/test_sync.py (+46/-31)
To merge this branch: bzr merge lp:~thisfred/u1db/txid_in_sync_exchange
Reviewer Review Type Date Requested Status
Ubuntu One hackers Pending
Review via email: mp+110436@code.launchpad.net

This proposal has been superseded by a proposal from 2012-06-15.

Commit message

Added the transaction ids to the sync_exchange

Description of the change

Added the transaction ids to the sync_exchange

To post a comment you must log in.
Revision history for this message
Eric Casteleijn (thisfred) wrote :

Since we get a lot of uuid txids, I strip them in a lot of the tests. This means the tests will still fail if the txids are not there, but perhaps not in the most obvious way, and we make no assumptions about the contents of the txids, but I think that's probably a good thing.

Also I seem to have misplaced some 3.5K of memory, which I will hunt down and fix tomorrow.

329. By Eric Casteleijn

merged trunk, fixed conflicts

330. By Eric Casteleijn

found and fixed memory leaks

Revision history for this message
Eric Casteleijn (thisfred) wrote :

Memory issue fixed.

331. By Eric Casteleijn

fixed incorrect documentation

332. By Eric Casteleijn

merged trunk

333. By Eric Casteleijn

merged trunk

Unmerged revisions

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'include/u1db/u1db.h'
2--- include/u1db/u1db.h 2012-06-12 15:44:43 +0000
3+++ include/u1db/u1db.h 2012-06-15 00:40:24 +0000
4@@ -42,7 +42,8 @@
5 typedef int (*u1db_doc_callback)(void *context, u1db_document *doc);
6 typedef int (*u1db_key_callback)(void *context, int num_fields,
7 const char **key);
8-typedef int (*u1db_doc_gen_callback)(void *context, u1db_document *doc, int gen);
9+typedef int (*u1db_doc_gen_callback)(void *context, u1db_document *doc,
10+ int gen, const char *trans_id);
11 typedef int (*u1db_doc_id_gen_callback)(void *context, const char *doc_id, int gen);
12 typedef int (*u1db_trans_info_callback)(void *context, const char *doc_id,
13 int gen, const char *trans_id);
14
15=== modified file 'include/u1db/u1db_internal.h'
16--- include/u1db/u1db_internal.h 2012-06-12 15:44:43 +0000
17+++ include/u1db/u1db_internal.h 2012-06-15 00:40:24 +0000
18@@ -98,6 +98,8 @@
19 * want to send to the sync target
20 * @param generations An array of generations. Each generation
21 * corresponds to a doc_id.
22+ * @param trans_ids An array of transaction ids. Each transaction id
23+ * corresponds to a doc_id.
24 * @param target_gen (IN/OUT) Seed this with the generation of the
25 * target that source_db has last seen, it will then
26 * be filled with the final generation of the target
27@@ -114,7 +116,7 @@
28 */
29 int (*sync_exchange_doc_ids)(u1db_sync_target *st, u1database *source_db,
30 int n_doc_ids, const char **doc_ids, int *generations,
31- int *target_gen, char **target_trans_id,
32+ const char **trans_ids, int *target_gen, char **target_trans_id,
33 void *context, u1db_doc_gen_callback cb);
34
35 /**
36@@ -123,7 +125,8 @@
37 int (*sync_exchange)(u1db_sync_target *st,
38 const char *source_replica_uid, int n_docs,
39 u1db_document **docs, int *generations,
40- int *target_gen, char **target_trans_id,
41+ const char **trans_ids, int *target_gen,
42+ char **target_trans_id,
43 void *context, u1db_doc_gen_callback cb);
44 /**
45 * Create a sync_exchange state object.
46@@ -170,6 +173,7 @@
47 struct lh_table *seen_ids;
48 int num_doc_ids;
49 int *gen_for_doc_ids;
50+ const char **trans_ids_for_doc_ids;
51 char **doc_ids_to_return;
52 void *trace_context;
53 u1db__trace_callback trace_cb;
54@@ -448,7 +452,7 @@
55 * We have received a doc from source, record it.
56 */
57 int u1db__sync_exchange_insert_doc_from_source(u1db_sync_exchange *se,
58- u1db_document *doc, int source_gen);
59+ u1db_document *doc, int source_gen, const char *trans_id);
60
61 /**
62 * We are done receiving docs, find what we want to return.
63@@ -466,8 +470,9 @@
64 * u1db_free_doc().
65 */
66 int u1db__sync_exchange_return_docs(u1db_sync_exchange *se, void *context,
67- int (*cb)(void *context, u1db_document *doc, int gen));
68-
69+ int (*cb)(void *context,
70+ u1db_document *doc, int gen,
71+ const char *trans_id));
72
73 /**
74 * Create a sync target pointing at a given URL.
75
76=== modified file 'src/u1db_http_sync_target.c'
77--- src/u1db_http_sync_target.c 2012-05-31 12:22:03 +0000
78+++ src/u1db_http_sync_target.c 2012-06-15 00:40:24 +0000
79@@ -45,14 +45,19 @@
80 int source_gen,
81 u1db_sync_exchange **exchange);
82 static int st_http_sync_exchange(u1db_sync_target *st,
83- const char *source_replica_uid,
84- int n_docs, u1db_document **docs,
85- int *generations, int *target_gen, char **target_trans_id,
86- void *context, u1db_doc_gen_callback cb);
87+ const char *source_replica_uid, int n_docs,
88+ u1db_document **docs, int *generations,
89+ const char **trans_ids, int *target_gen,
90+ char **target_trans_id, void *context,
91+ u1db_doc_gen_callback cb);
92 static int st_http_sync_exchange_doc_ids(u1db_sync_target *st,
93- u1database *source_db, int n_doc_ids, const char **doc_ids,
94- int *generations, int *target_gen, char **target_trans_id,
95- void *context, u1db_doc_gen_callback cb);
96+ u1database *source_db, int n_doc_ids,
97+ const char **doc_ids,
98+ int *generations,
99+ const char **trans_ids,
100+ int *target_gen,
101+ char **target_trans_id, void *context,
102+ u1db_doc_gen_callback cb);
103 static void st_http_finalize_sync_exchange(u1db_sync_target *st,
104 u1db_sync_exchange **exchange);
105 static int st_http_set_trace_hook(u1db_sync_target *st,
106@@ -410,8 +415,8 @@
107 goto finish;
108 }
109 if (req.body_buffer == NULL) {
110- status = U1DB_INVALID_HTTP_RESPONSE;
111- goto finish;
112+ status = U1DB_INVALID_HTTP_RESPONSE;
113+ goto finish;
114 }
115 json = json_tokener_parse(req.body_buffer);
116 if (json == NULL) {
117@@ -663,7 +668,7 @@
118
119
120 static int
121-doc_to_tempfile(u1db_document *doc, int gen, FILE *fd)
122+doc_to_tempfile(u1db_document *doc, int gen, const char *trans_id, FILE *fd)
123 {
124 int status = U1DB_OK;
125 json_object *json = NULL;
126@@ -678,6 +683,7 @@
127 json_object_object_add(
128 json, "content", doc->json?json_object_new_string(doc->json):NULL);
129 json_object_object_add(json, "gen", json_object_new_int(gen));
130+ json_object_object_add(json, "trans_id", json_object_new_string(trans_id));
131 fputs(json_object_to_json_string(json), fd);
132 finish:
133 if (json != NULL) {
134@@ -791,6 +797,7 @@
135 const char *doc_id, *content, *rev;
136 const char *tmp = NULL;
137 int gen;
138+ const char *trans_id = NULL;
139 u1db_document *doc;
140
141 json = json_tokener_parse(response);
142@@ -837,12 +844,14 @@
143 content = json_object_get_string(attr);
144 attr = json_object_object_get(obj, "gen");
145 gen = json_object_get_int(attr);
146+ attr = json_object_object_get(obj, "trans_id");
147+ trans_id = json_object_get_string(attr);
148 doc = u1db__allocate_document(doc_id, rev, content, 0);
149 if (doc == NULL) {
150 status = U1DB_NOMEM;
151 goto finish;
152 }
153- status = cb(context, doc, gen);
154+ status = cb(context, doc, gen, trans_id);
155 if (status != U1DB_OK) { goto finish; }
156 }
157 finish:
158@@ -875,11 +884,10 @@
159 }
160
161 static int
162-st_http_sync_exchange(u1db_sync_target *st,
163- const char *source_replica_uid,
164- int n_docs, u1db_document **docs,
165- int *generations, int *target_gen, char **target_trans_id,
166- void *context,
167+st_http_sync_exchange(u1db_sync_target *st, const char *source_replica_uid,
168+ int n_docs, u1db_document **docs, int *generations,
169+ const char **trans_ids, int *target_gen,
170+ char **target_trans_id, void *context,
171 u1db_doc_gen_callback cb)
172 {
173 int status, i;
174@@ -898,7 +906,8 @@
175 status = init_temp_file(tmpname, &temp_fd, *target_gen);
176 if (status != U1DB_OK) { goto finish; }
177 for (i = 0; i < n_docs; ++i) {
178- status = doc_to_tempfile(docs[i], generations[i], temp_fd);
179+ status = doc_to_tempfile(
180+ docs[i], generations[i], trans_ids[i], temp_fd);
181 if (status != U1DB_OK) { goto finish; }
182 }
183 status = finalize_and_send_temp_file(st, temp_fd, source_replica_uid, &req);
184@@ -915,6 +924,7 @@
185 int offset;
186 int num;
187 int *generations;
188+ const char **trans_ids;
189 FILE *temp_fd;
190 };
191
192@@ -930,6 +940,7 @@
193 status = U1DB_INTERNAL_ERROR;
194 } else {
195 status = doc_to_tempfile(doc, state->generations[state->offset],
196+ state->trans_ids[state->offset],
197 state->temp_fd);
198 }
199 u1db_free_doc(&doc);
200@@ -943,9 +954,10 @@
201
202 static int
203 st_http_sync_exchange_doc_ids(u1db_sync_target *st, u1database *source_db,
204- int n_doc_ids, const char **doc_ids, int *generations,
205- int *target_gen, char **target_trans_id,
206- void *context, u1db_doc_gen_callback cb)
207+ int n_doc_ids, const char **doc_ids,
208+ int *generations, const char **trans_ids,
209+ int *target_gen, char **target_trans_id,
210+ void *context, u1db_doc_gen_callback cb)
211 {
212 int status;
213 FILE *temp_fd = NULL;
214@@ -968,6 +980,7 @@
215 if (status != U1DB_OK) { goto finish; }
216 state.num = n_doc_ids;
217 state.generations = generations;
218+ state.trans_ids = trans_ids;
219 state.temp_fd = temp_fd;
220 status = u1db_get_docs(source_db, n_doc_ids, doc_ids, 0, 1,
221 &state, get_docs_to_tempfile);
222
223=== modified file 'src/u1db_sync_target.c'
224--- src/u1db_sync_target.c 2012-06-08 10:06:05 +0000
225+++ src/u1db_sync_target.c 2012-06-15 00:40:24 +0000
226@@ -32,13 +32,15 @@
227 static int st_sync_exchange(u1db_sync_target *st,
228 const char *source_replica_uid, int n_docs,
229 u1db_document **docs, int *generations,
230- int *target_gen, char **target_trans_id,
231- void *context, u1db_doc_gen_callback cb);
232+ const char **trans_ids, int *target_gen,
233+ char **target_trans_id, void *context,
234+ u1db_doc_gen_callback cb);
235 static int st_sync_exchange_doc_ids(u1db_sync_target *st,
236- u1database *source_db,
237- int n_doc_ids, const char **doc_ids, int *generations,
238- int *target_gen, char **target_trans_id,
239- void *context, u1db_doc_gen_callback cb);
240+ u1database *source_db, int n_doc_ids,
241+ const char **doc_ids, int *generations,
242+ const char **trans_ids, int *target_gen,
243+ char **target_trans_id, void *context,
244+ u1db_doc_gen_callback cb);
245 static int st_get_sync_exchange(u1db_sync_target *st,
246 const char *source_replica_uid,
247 int source_gen,
248@@ -57,6 +59,7 @@
249 void *user_context;
250 u1db_doc_gen_callback user_cb;
251 int *gen_for_doc_ids;
252+ const char **trans_ids_for_doc_ids;
253 int free_when_done;
254 };
255
256@@ -203,6 +206,10 @@
257 free((*exchange)->gen_for_doc_ids);
258 (*exchange)->gen_for_doc_ids = NULL;
259 }
260+ if ((*exchange)->trans_ids_for_doc_ids != NULL) {
261+ free((*exchange)->trans_ids_for_doc_ids);
262+ (*exchange)->trans_ids_for_doc_ids = NULL;
263+ }
264 if ((*exchange)->target_trans_id != NULL) {
265 free((*exchange)->target_trans_id);
266 (*exchange)->target_trans_id = NULL;
267@@ -275,7 +282,7 @@
268
269 int
270 u1db__sync_exchange_insert_doc_from_source(u1db_sync_exchange *se,
271- u1db_document *doc, int source_gen)
272+ u1db_document *doc, int source_gen, const char *trans_id)
273 {
274 int status = U1DB_OK;
275 int insert_state;
276@@ -285,7 +292,8 @@
277 }
278 // fprintf(stderr, "Inserting %s from source\n", doc->doc_id);
279 status = u1db__put_doc_if_newer(se->db, doc, 0, se->source_replica_uid,
280- source_gen, NULL, &insert_state, &at_gen);
281+ source_gen, trans_id, &insert_state,
282+ &at_gen);
283 if (insert_state == U1DB_INSERTED || insert_state == U1DB_CONVERGED) {
284 lh_table_insert(se->seen_ids, strdup(doc->doc_id),
285 (void *)(intptr_t)at_gen);
286@@ -305,6 +313,7 @@
287 struct lh_table *exclude_ids;
288 char **doc_ids_to_return;
289 int *gen_for_doc_ids;
290+ const char **trans_ids_for_doc_ids;
291 };
292
293 // Callback for whats_changed to map the callback into the sync_exchange
294@@ -327,25 +336,29 @@
295 if (state->num_doc_ids >= state->max_doc_ids) {
296 state->max_doc_ids = (state->max_doc_ids * 2) + 10;
297 if (state->doc_ids_to_return == NULL) {
298- state->doc_ids_to_return = (char **)calloc(state->max_doc_ids,
299- sizeof(char*));
300- state->gen_for_doc_ids = (int *)calloc(state->max_doc_ids,
301- sizeof(int));
302+ state->doc_ids_to_return = (char **)calloc(
303+ state->max_doc_ids, sizeof(char*));
304+ state->gen_for_doc_ids = (int *)calloc(
305+ state->max_doc_ids, sizeof(int));
306+ state->trans_ids_for_doc_ids = (const char **)calloc(
307+ state->max_doc_ids, sizeof(char*));
308 } else {
309 state->doc_ids_to_return = (char **)realloc(
310- state->doc_ids_to_return,
311- state->max_doc_ids * sizeof(char*));
312- state->gen_for_doc_ids = (int *)realloc(state->gen_for_doc_ids,
313- state->max_doc_ids * sizeof(int));
314+ state->doc_ids_to_return, state->max_doc_ids * sizeof(char*));
315+ state->gen_for_doc_ids = (int *)realloc(
316+ state->gen_for_doc_ids, state->max_doc_ids * sizeof(int));
317+ state->trans_ids_for_doc_ids = (const char **)realloc(
318+ state->gen_for_doc_ids, state->max_doc_ids * sizeof(char*));
319 }
320- if (state->doc_ids_to_return == NULL
321- || state->gen_for_doc_ids == NULL)
322+ if (state->doc_ids_to_return == NULL || state->gen_for_doc_ids == NULL
323+ || state->trans_ids_for_doc_ids == NULL)
324 {
325 return U1DB_NOMEM;
326 }
327 }
328 state->doc_ids_to_return[state->num_doc_ids] = strdup(doc_id);
329 state->gen_for_doc_ids[state->num_doc_ids] = gen;
330+ state->trans_ids_for_doc_ids[state->num_doc_ids] = trans_id;
331 state->num_doc_ids++;
332 return 0;
333 }
334@@ -370,6 +383,7 @@
335 if (status != U1DB_OK) {
336 free(state.doc_ids_to_return);
337 free(state.gen_for_doc_ids);
338+ free(state.trans_ids_for_doc_ids);
339 goto finish;
340 }
341 if (se->trace_cb) {
342@@ -379,6 +393,7 @@
343 se->num_doc_ids = state.num_doc_ids;
344 se->doc_ids_to_return = state.doc_ids_to_return;
345 se->gen_for_doc_ids = state.gen_for_doc_ids;
346+ se->trans_ids_for_doc_ids = state.trans_ids_for_doc_ids;
347 finish:
348 if (target_trans_id != NULL) {
349 free(target_trans_id);
350@@ -397,8 +412,9 @@
351 // Note: using doc_offset in this way assumes that u1db_get_docs will
352 // always return them in exactly the order we requested. This is
353 // probably true, though.
354- status = ctx->user_cb(ctx->user_context, doc,
355- ctx->gen_for_doc_ids[ctx->doc_offset]);
356+ status = ctx->user_cb(
357+ ctx->user_context, doc, ctx->gen_for_doc_ids[ctx->doc_offset],
358+ ctx->trans_ids_for_doc_ids[ctx->doc_offset]);
359 ctx->doc_offset++;
360 if (ctx->free_when_done) {
361 u1db_free_doc(&doc);
362@@ -409,7 +425,8 @@
363
364 int
365 u1db__sync_exchange_return_docs(u1db_sync_exchange *se, void *context,
366- int (*cb)(void *context, u1db_document *doc, int gen))
367+ int (*cb)(void *context, u1db_document *doc,
368+ int gen, const char *trans_id))
369 {
370 int status = U1DB_OK;
371 struct _get_docs_to_doc_gen_context state = {0};
372@@ -420,6 +437,7 @@
373 state.user_cb = cb;
374 state.doc_offset = 0;
375 state.gen_for_doc_ids = se->gen_for_doc_ids;
376+ state.trans_ids_for_doc_ids = se->trans_ids_for_doc_ids;
377 if (se->trace_cb) {
378 status = se->trace_cb(se->trace_context, "before get_docs");
379 if (status != U1DB_OK) { goto finish; }
380@@ -440,14 +458,16 @@
381 };
382
383 static int
384-return_doc_to_insert_from_target(void *context, u1db_document *doc, int gen)
385+return_doc_to_insert_from_target(void *context, u1db_document *doc, int gen,
386+ const char *trans_id)
387 {
388 int status, insert_state;
389 struct _return_doc_state *state;
390 state = (struct _return_doc_state *)context;
391
392- status = u1db__put_doc_if_newer(state->db, doc, 1, state->target_uid, gen,
393- NULL, &insert_state, NULL);
394+ status = u1db__put_doc_if_newer(
395+ state->db, doc, 1, state->target_uid, gen, trans_id, &insert_state,
396+ NULL);
397 u1db_free_doc(&doc);
398 if (status == U1DB_OK) {
399 if (insert_state == U1DB_INSERTED || insert_state == U1DB_CONFLICTED) {
400@@ -462,7 +482,8 @@
401
402 static int
403 get_and_insert_docs(u1database *source_db, u1db_sync_exchange *se,
404- int n_doc_ids, const char **doc_ids, int *generations)
405+ int n_doc_ids, const char **doc_ids, int *generations,
406+ const char **trans_ids)
407 {
408 struct _get_docs_to_doc_gen_context get_doc_state = {0};
409
410@@ -473,6 +494,7 @@
411 get_doc_state.user_cb =
412 (u1db_doc_gen_callback)u1db__sync_exchange_insert_doc_from_source;
413 get_doc_state.gen_for_doc_ids = generations;
414+ get_doc_state.trans_ids_for_doc_ids = trans_ids;
415 return u1db_get_docs(source_db, n_doc_ids, doc_ids,
416 0, 1, &get_doc_state, get_docs_to_gen_docs);
417 }
418@@ -480,11 +502,13 @@
419
420 static int
421 st_sync_exchange(u1db_sync_target *st, const char *source_replica_uid,
422- int n_docs, u1db_document **docs,
423- int *generations, int *target_gen, char **target_trans_id,
424- void *context, u1db_doc_gen_callback cb)
425+ int n_docs, u1db_document **docs, int *generations,
426+ const char **trans_ids, int *target_gen,
427+ char **target_trans_id, void *context,
428+ u1db_doc_gen_callback cb)
429 {
430 int status, i;
431+
432 u1db_sync_exchange *exchange = NULL;
433 if (st == NULL || generations == NULL || target_gen == NULL
434 || target_trans_id == NULL || cb == NULL)
435@@ -498,8 +522,8 @@
436 *target_gen, &exchange);
437 if (status != U1DB_OK) { goto finish; }
438 for (i = 0; i < n_docs; ++i) {
439- status = u1db__sync_exchange_insert_doc_from_source(exchange, docs[i],
440- generations[i]);
441+ status = u1db__sync_exchange_insert_doc_from_source(
442+ exchange, docs[i], generations[i], trans_ids[i]);
443 if (status != U1DB_OK) { goto finish; }
444 }
445 status = u1db__sync_exchange_find_doc_ids_to_return(exchange);
446@@ -521,7 +545,7 @@
447 static int
448 st_sync_exchange_doc_ids(u1db_sync_target *st, u1database *source_db,
449 int n_doc_ids, const char **doc_ids, int *generations,
450- int *target_gen, char **target_trans_id,
451+ const char **trans_ids, int *target_gen, char **target_trans_id,
452 void *context, u1db_doc_gen_callback cb)
453 {
454 int status;
455@@ -542,7 +566,7 @@
456 if (status != U1DB_OK) { goto finish; }
457 if (n_doc_ids > 0) {
458 status = get_and_insert_docs(source_db, exchange,
459- n_doc_ids, doc_ids, generations);
460+ n_doc_ids, doc_ids, generations, trans_ids);
461 if (status != U1DB_OK) { goto finish; }
462 }
463 status = u1db__sync_exchange_find_doc_ids_to_return(exchange);
464@@ -616,7 +640,7 @@
465 status = target->sync_exchange_doc_ids(target, db,
466 to_send_state.num_doc_ids,
467 (const char**)to_send_state.doc_ids_to_return,
468- to_send_state.gen_for_doc_ids,
469+ to_send_state.gen_for_doc_ids, to_send_state.trans_ids_for_doc_ids,
470 &target_gen_known_by_local, &target_trans_id_known_by_local,
471 &return_doc_state, return_doc_to_insert_from_target);
472 if (status != U1DB_OK) { goto finish; }
473@@ -665,5 +689,8 @@
474 if (to_send_state.gen_for_doc_ids != NULL) {
475 free(to_send_state.gen_for_doc_ids);
476 }
477+ if (to_send_state.trans_ids_for_doc_ids != NULL) {
478+ free(to_send_state.trans_ids_for_doc_ids);
479+ }
480 return status;
481 }
482
483=== modified file 'u1db/__init__.py'
484--- u1db/__init__.py 2012-06-12 15:44:43 +0000
485+++ u1db/__init__.py 2012-06-15 00:40:24 +0000
486@@ -557,10 +557,9 @@
487 their latest change in order from the oldest change to the
488 newest.
489
490- :param docs_by_generation: A list of [(Document, generation)]
491- pairs indicating documents which should be updated on
492- this replica paired with the generation of their
493- latest change.
494+ :param docs_by_generation: A list of [(Document, generation,
495+ transaction_id)] pairs indicating documents which should be updated
496+ on this replica paired with the generation of their latest change.
497 :param source_replica_uid: The source replica's identifier
498 :param last_known_generation: The last generation that the source
499 replica knows about this
500
501=== modified file 'u1db/remote/http_app.py'
502--- u1db/remote/http_app.py 2012-05-31 12:22:03 +0000
503+++ u1db/remote/http_app.py 2012-06-15 00:40:24 +0000
504@@ -324,14 +324,14 @@
505 last_known_generation)
506
507 @http_method(content_as_args=True)
508- def post_stream_entry(self, id, rev, content, gen):
509+ def post_stream_entry(self, id, rev, content, gen, trans_id):
510 doc = Document(id, rev, content)
511- self.sync_exch.insert_doc_from_source(doc, gen)
512+ self.sync_exch.insert_doc_from_source(doc, gen, trans_id)
513
514 def post_end(self):
515- def send_doc(doc, gen):
516+ def send_doc(doc, gen, trans_id):
517 entry = dict(id=doc.doc_id, rev=doc.rev, content=doc.get_json(),
518- gen=gen)
519+ gen=gen, trans_id=trans_id)
520 self.responder.stream_entry(entry)
521 new_gen = self.sync_exch.find_changes_to_return()
522 self.responder.content_type = 'application/x-u1db-sync-stream'
523
524=== modified file 'u1db/remote/http_target.py'
525--- u1db/remote/http_target.py 2012-05-31 12:22:03 +0000
526+++ u1db/remote/http_target.py 2012-06-15 00:40:24 +0000
527@@ -65,7 +65,7 @@
528 line, comma = utils.check_and_strip_comma(entry)
529 entry = simplejson.loads(line)
530 doc = Document(entry['id'], entry['rev'], entry['content'])
531- return_doc_cb(doc, entry['gen'])
532+ return_doc_cb(doc, entry['gen'], entry['trans_id'])
533 if parts[-1] != ']':
534 try:
535 partdic = simplejson.loads(parts[-1])
536@@ -98,9 +98,9 @@
537 comma = ''
538 size += prepare(last_known_generation=last_known_generation)
539 comma = ','
540- for doc, gen in docs_by_generations:
541+ for doc, gen, trans_id in docs_by_generations:
542 size += prepare(id=doc.doc_id, rev=doc.rev, content=doc.get_json(),
543- gen=gen)
544+ gen=gen, trans_id=trans_id)
545 entries.append('\r\n]')
546 size += len(entries[-1])
547 self._conn.putheader('content-length', str(size))
548
549=== modified file 'u1db/sync.py'
550--- u1db/sync.py 2012-05-31 12:22:03 +0000
551+++ u1db/sync.py 2012-06-15 00:40:24 +0000
552@@ -40,7 +40,7 @@
553 self.target_replica_uid = None
554 self.num_inserted = 0
555
556- def _insert_doc_from_target(self, doc, replica_gen):
557+ def _insert_doc_from_target(self, doc, replica_gen, trans_id):
558 """Try to insert synced document from target.
559
560 Implements TAKE OTHER semantics: any document from the target
561@@ -54,7 +54,7 @@
562 # was effectively inserted.
563 state, _ = self.source._put_doc_if_newer(doc, save_conflict=True,
564 replica_uid=self.target_replica_uid, replica_gen=replica_gen,
565- replica_trans_id=None)
566+ replica_trans_id=trans_id)
567 if state == 'inserted':
568 self.num_inserted += 1
569 elif state == 'converged':
570@@ -110,7 +110,10 @@
571 # prepare to send all the changed docs
572 docs_to_send = self.source.get_docs(changed_doc_ids,
573 check_for_conflicts=False, include_deleted=True)
574- docs_by_generation = zip(docs_to_send, (gen for _, gen, _ in changes))
575+ # TODO: there must be a way to not iterate twice
576+ docs_by_generation = zip(
577+ docs_to_send, (gen for _, gen, _ in changes),
578+ (trans for _, _, trans in changes))
579
580 # exchange documents and try to insert the returned ones with
581 # the target, return target synced-up-to gen
582@@ -153,7 +156,7 @@
583 return
584 self._trace_hook(state)
585
586- def insert_doc_from_source(self, doc, source_gen):
587+ def insert_doc_from_source(self, doc, source_gen, trans_id):
588 """Try to insert synced document from source.
589
590 Conflicting documents are not inserted but will be sent over
591@@ -171,7 +174,7 @@
592 """
593 state, at_gen = self._db._put_doc_if_newer(doc, save_conflict=False,
594 replica_uid=self.source_replica_uid, replica_gen=source_gen,
595- replica_trans_id=None)
596+ replica_trans_id=trans_id)
597 if state == 'inserted':
598 self.seen_ids[doc.doc_id] = at_gen
599 elif state == 'converged':
600@@ -212,10 +215,10 @@
601 self.new_trans_id = trans_id
602 seen_ids = self.seen_ids
603 # changed docs that weren't superseded by or converged with
604- self.changes_to_return = [(doc_id, gen) for (doc_id, gen, _) in changes
605- # there was a subsequent update
606- if doc_id not in seen_ids or
607- seen_ids.get(doc_id) < gen]
608+ self.changes_to_return = [
609+ (doc_id, gen, trans_id) for (doc_id, gen, trans_id) in changes
610+ # there was a subsequent update
611+ if doc_id not in seen_ids or seen_ids.get(doc_id) < gen]
612 return self.new_gen
613
614 def return_docs(self, return_doc_cb):
615@@ -224,21 +227,23 @@
616
617 The final step of a sync exchange.
618
619- :param: return_doc_cb(doc, gen): is a callback
620+ :param: return_doc_cb(doc, gen, trans_id): is a callback
621 used to return the documents with their last change generation
622 to the target replica.
623 :return: None
624 """
625 changes_to_return = self.changes_to_return
626 # return docs, including conflicts
627- changed_doc_ids = [doc_id for doc_id, _ in changes_to_return]
628+ changed_doc_ids = [doc_id for doc_id, _, _ in changes_to_return]
629 self._trace('before get_docs')
630 docs = self._db.get_docs(
631 changed_doc_ids, check_for_conflicts=False, include_deleted=True)
632
633- docs_by_gen = izip(docs, (gen for _, gen in changes_to_return))
634- for doc, gen in docs_by_gen:
635- return_doc_cb(doc, gen)
636+ docs_by_gen = izip(
637+ docs, (gen for _, gen, _ in changes_to_return),
638+ (trans_id for _, _, trans_id in changes_to_return))
639+ for doc, gen, trans_id in docs_by_gen:
640+ return_doc_cb(doc, gen, trans_id)
641 # for tests
642 self._db._last_exchange_log['return'] = {
643 'docs': [(d.doc_id, d.rev) for d in docs],
644@@ -260,8 +265,8 @@
645 if self._trace_hook:
646 sync_exch._set_trace_hook(self._trace_hook)
647 # 1st step: try to insert incoming docs and record progress
648- for doc, doc_gen in docs_by_generations:
649- sync_exch.insert_doc_from_source(doc, doc_gen)
650+ for doc, doc_gen, trans_id in docs_by_generations:
651+ sync_exch.insert_doc_from_source(doc, doc_gen, trans_id)
652 # 2nd step: find changed documents (including conflicts) to return
653 new_gen = sync_exch.find_changes_to_return()
654 # final step: return docs and record source replica sync point
655
656=== modified file 'u1db/tests/c_backend_wrapper.pyx'
657--- u1db/tests/c_backend_wrapper.pyx 2012-06-13 11:15:57 +0000
658+++ u1db/tests/c_backend_wrapper.pyx 2012-06-15 00:40:24 +0000
659@@ -61,7 +61,7 @@
660 ctypedef int (*u1db_key_callback)(void *context, int num_fields,
661 const_char_ptr *key)
662 ctypedef int (*u1db_doc_gen_callback)(void *context,
663- u1db_document *doc, int gen)
664+ u1db_document *doc, int gen, const_char_ptr trans_id)
665 ctypedef int (*u1db_trans_info_callback)(void *context,
666 const_char_ptr doc_id, int gen, const_char_ptr trans_id)
667
668@@ -164,6 +164,7 @@
669 int num_doc_ids
670 char **doc_ids_to_return
671 int *gen_for_doc_ids
672+ const_char_ptr *trans_ids_for_doc_ids
673
674 ctypedef int (*u1db__trace_callback)(void *context, const_char_ptr state)
675 ctypedef struct u1db_sync_target:
676@@ -176,12 +177,16 @@
677 int (*sync_exchange)(u1db_sync_target *st,
678 char *source_replica_uid, int n_docs,
679 u1db_document **docs, int *generations,
680- int *target_gen, char **target_trans_id,
681- void *context, u1db_doc_gen_callback cb) nogil
682+ const_char_ptr *trans_ids, int *target_gen,
683+ char **target_trans_id, void *context,
684+ u1db_doc_gen_callback cb) nogil
685 int (*sync_exchange_doc_ids)(u1db_sync_target *st,
686- u1database *source_db, int n_doc_ids, const_char_ptr *doc_ids,
687- int *generations, int *target_gen, char **target_trans_id,
688- void *context, u1db_doc_gen_callback cb) nogil
689+ u1database *source_db, int n_doc_ids,
690+ const_char_ptr *doc_ids, int *generations,
691+ const_char_ptr *trans_ids,
692+ int *target_gen, char **target_trans_id,
693+ void *context,
694+ u1db_doc_gen_callback cb) nogil
695 int (*get_sync_exchange)(u1db_sync_target *st,
696 char *source_replica_uid,
697 int last_known_source_gen,
698@@ -228,10 +233,12 @@
699 int *local_gen_before_sync) nogil
700
701 int u1db__sync_exchange_insert_doc_from_source(u1db_sync_exchange *se,
702- u1db_document *doc, int source_gen)
703+ u1db_document *doc, int source_gen, const_char_ptr trans_id)
704 int u1db__sync_exchange_find_doc_ids_to_return(u1db_sync_exchange *se)
705 int u1db__sync_exchange_return_docs(u1db_sync_exchange *se, void *context,
706- int (*cb)(void *context, u1db_document *doc, int gen))
707+ int (*cb)(void *context,
708+ u1db_document *doc, int gen,
709+ const_char_ptr trans_id))
710 int u1db__create_http_sync_target(char *url, u1db_sync_target **target)
711 int u1db__create_oauth_http_sync_target(char *url,
712 char *consumer_key, char *consumer_secret,
713@@ -331,13 +338,13 @@
714
715
716 cdef int return_doc_cb_wrapper(void *context, u1db_document *doc,
717- int gen) with gil:
718+ int gen, const_char_ptr trans_id) with gil:
719 cdef CDocument pydoc
720 user_cb = <object>context
721 pydoc = CDocument()
722 pydoc._doc = doc
723 try:
724- user_cb(pydoc, gen)
725+ user_cb(pydoc, gen, trans_id)
726 except Exception, e:
727 # We suppress the exception here, because intermediating through the C
728 # layer gets a bit crazy
729@@ -606,11 +613,12 @@
730 self._check()
731 return self._exchange.target_gen
732
733- def insert_doc_from_source(self, CDocument doc, source_gen):
734+ def insert_doc_from_source(self, CDocument doc, source_gen,
735+ source_trans_id):
736 self._check()
737 handle_status("insert_doc_from_source",
738 u1db__sync_exchange_insert_doc_from_source(self._exchange,
739- doc._doc, source_gen))
740+ doc._doc, source_gen, source_trans_id))
741
742 def find_doc_ids_to_return(self):
743 self._check()
744@@ -642,8 +650,10 @@
745 if (self._exchange.num_doc_ids > 0
746 and self._exchange.doc_ids_to_return != NULL):
747 for i from 0 <= i < self._exchange.num_doc_ids:
748- res.append((self._exchange.doc_ids_to_return[i],
749- self._exchange.gen_for_doc_ids[i]))
750+ res.append(
751+ (self._exchange.doc_ids_to_return[i],
752+ self._exchange.gen_for_doc_ids[i],
753+ self._exchange.trans_ids_for_doc_ids[i]))
754 return res
755
756
757@@ -714,25 +724,32 @@
758 if generations == NULL:
759 free(<void *>doc_ids)
760 raise MemoryError
761+ trans_ids = <const_char_ptr*>calloc(num_doc_ids, sizeof(char *))
762+ if trans_ids == NULL:
763+ raise MemoryError
764 res_trans_id = ''
765 try:
766- for i, (doc_id, gen) in enumerate(doc_id_generations):
767+ for i, (doc_id, gen, trans_id) in enumerate(doc_id_generations):
768 doc_ids[i] = PyString_AsString(doc_id)
769 generations[i] = gen
770+ trans_ids[i] = trans_id
771 target_gen = last_known_generation
772 with nogil:
773 status = self._st.sync_exchange_doc_ids(self._st, sdb._db,
774- num_doc_ids, doc_ids, generations,
775+ num_doc_ids, doc_ids, generations, trans_ids,
776 &target_gen, &target_trans_id,
777 <void*>return_doc_cb, return_doc_cb_wrapper)
778 handle_status("sync_exchange_doc_ids", status)
779 finally:
780- free(<void *>doc_ids)
781- free(generations)
782+ if doc_ids != NULL:
783+ free(<void *>doc_ids)
784+ if generations != NULL:
785+ free(generations)
786+ if trans_ids != NULL:
787+ free(trans_ids)
788 if target_trans_id != NULL:
789 res_trans_id = target_trans_id
790 free(target_trans_id)
791-
792 return target_gen, res_trans_id
793
794 def sync_exchange(self, docs_by_generations, source_replica_uid,
795@@ -740,6 +757,7 @@
796 cdef CDocument cur_doc
797 cdef u1db_document **docs = NULL
798 cdef int *generations = NULL
799+ cdef const_char_ptr *trans_ids = NULL
800 cdef char *trans_id = NULL
801 cdef int i, count, status, target_gen
802
803@@ -754,15 +772,19 @@
804 generations = <int*>calloc(count, sizeof(int))
805 if generations == NULL:
806 raise MemoryError
807+ trans_ids = <const_char_ptr*>calloc(count, sizeof(char*))
808+ if trans_ids == NULL:
809+ raise MemoryError
810 for i from 0 <= i < count:
811 cur_doc = docs_by_generations[i][0]
812 generations[i] = docs_by_generations[i][1]
813+ trans_ids[i] = docs_by_generations[i][2]
814 docs[i] = cur_doc._doc
815 target_gen = last_known_generation
816 with nogil:
817 status = self._st.sync_exchange(self._st,
818 source_replica_uid, count,
819- docs, generations, &target_gen, &trans_id,
820+ docs, generations, trans_ids, &target_gen, &trans_id,
821 <void *>return_doc_cb, return_doc_cb_wrapper)
822 handle_status("sync_exchange", status)
823 finally:
824@@ -770,6 +792,8 @@
825 free(docs)
826 if generations != NULL:
827 free(generations)
828+ if trans_ids != NULL:
829+ free(trans_ids)
830 if trans_id != NULL:
831 res_trans_id = trans_id
832 free(trans_id)
833
834=== modified file 'u1db/tests/test_backends.py'
835--- u1db/tests/test_backends.py 2012-06-12 15:44:43 +0000
836+++ u1db/tests/test_backends.py 2012-06-15 00:40:24 +0000
837@@ -1376,7 +1376,7 @@
838 pass
839
840 doc_other = self.make_document(doc.doc_id, other_rev, new_content)
841- docs_by_gen = [(doc_other, 10)]
842+ docs_by_gen = [(doc_other, 10, 'T-sid')]
843 st.sync_exchange(
844 docs_by_gen, 'other-replica', last_known_generation=0,
845 return_doc_cb=ignore)
846
847=== modified file 'u1db/tests/test_c_backend.py'
848--- u1db/tests/test_c_backend.py 2012-06-12 14:57:44 +0000
849+++ u1db/tests/test_c_backend.py 2012-06-15 00:40:24 +0000
850@@ -287,10 +287,10 @@
851 doc = c_backend_wrapper.make_document('doc-id', 'replica:1',
852 tests.simple_doc)
853 self.assertEqual([], exc.get_seen_ids())
854- exc.insert_doc_from_source(doc, 10)
855+ exc.insert_doc_from_source(doc, 10, 'T-sid')
856 self.assertGetDoc(self.db, 'doc-id', 'replica:1', tests.simple_doc,
857 False)
858- self.assertEqual((10, None),
859+ self.assertEqual((10, 'T-sid'),
860 self.db._get_sync_gen_info('source-uid'))
861 self.assertEqual(['doc-id'], exc.get_seen_ids())
862
863@@ -301,7 +301,7 @@
864 tests.nested_doc)
865 self.assertEqual([], exc.get_seen_ids())
866 # The insert should be rejected and the doc_id not considered 'seen'
867- exc.insert_doc_from_source(doc2, 10)
868+ exc.insert_doc_from_source(doc2, 10, 'T-sid')
869 self.assertGetDoc(
870 self.db, doc.doc_id, doc.rev, tests.simple_doc, False)
871 self.assertEqual([], exc.get_seen_ids())
872@@ -311,7 +311,10 @@
873 exc = self.st._get_sync_exchange("source-uid", 0)
874 self.assertEqual(0, exc.target_gen)
875 exc.find_doc_ids_to_return()
876- self.assertEqual([(doc.doc_id, 1)], exc.get_doc_ids_to_return())
877+ doc_id = exc.get_doc_ids_to_return()[0]
878+ self.assertEqual(
879+ (doc.doc_id, 1), doc_id[:-1])
880+ self.assertTrue(doc_id[-1].startswith('T-'))
881 self.assertEqual(1, exc.target_gen)
882
883 def test_sync_exchange_find_doc_ids_not_including_recently_inserted(self):
884@@ -320,22 +323,23 @@
885 exc = self.st._get_sync_exchange("source-uid", 0)
886 doc3 = c_backend_wrapper.make_document(doc1.doc_id,
887 doc1.rev + "|zreplica:2", tests.simple_doc)
888- exc.insert_doc_from_source(doc3, 10)
889+ exc.insert_doc_from_source(doc3, 10, 'T-sid')
890 exc.find_doc_ids_to_return()
891- self.assertEqual([(doc2.doc_id, 2)], exc.get_doc_ids_to_return())
892+ self.assertEqual(
893+ (doc2.doc_id, 2), exc.get_doc_ids_to_return()[0][:-1])
894 self.assertEqual(3, exc.target_gen)
895
896 def test_sync_exchange_return_docs(self):
897 returned = []
898
899- def return_doc_cb(doc, gen):
900- returned.append((doc, gen))
901+ def return_doc_cb(doc, gen, trans_id):
902+ returned.append((doc, gen, trans_id))
903
904 doc1 = self.db.create_doc(tests.simple_doc)
905 exc = self.st._get_sync_exchange("source-uid", 0)
906 exc.find_doc_ids_to_return()
907 exc.return_docs(return_doc_cb)
908- self.assertEqual([(doc1, 1)], returned)
909+ self.assertEqual((doc1, 1), returned[0][:-1])
910
911 def test_sync_exchange_doc_ids(self):
912 doc1 = self.db.create_doc(tests.simple_doc, doc_id='doc-1')
913@@ -343,16 +347,17 @@
914 doc2 = db2.create_doc(tests.nested_doc, doc_id='doc-2')
915 returned = []
916
917- def return_doc_cb(doc, gen):
918- returned.append((doc, gen))
919- val = self.st.sync_exchange_doc_ids(db2, [(doc2.doc_id, 1)], 0,
920- return_doc_cb)
921+ def return_doc_cb(doc, gen, trans_id):
922+ returned.append((doc, gen, trans_id))
923+
924+ val = self.st.sync_exchange_doc_ids(
925+ db2, [(doc2.doc_id, 1, 'T-sid')], 0, return_doc_cb)
926 last_trans_id = self.db._get_transaction_log()[-1][1]
927 self.assertEqual(2, self.db._get_generation())
928 self.assertEqual((2, last_trans_id), val)
929 self.assertGetDoc(self.db, doc2.doc_id, doc2.rev, tests.nested_doc,
930 False)
931- self.assertEqual([(doc1, 1)], returned)
932+ self.assertEqual((doc1, 1), returned[0][:-1])
933
934
935 class TestCHTTPSyncTarget(BackendTests):
936
937=== modified file 'u1db/tests/test_http_app.py'
938--- u1db/tests/test_http_app.py 2012-05-31 12:22:03 +0000
939+++ u1db/tests/test_http_app.py 2012-06-15 00:40:24 +0000
940@@ -715,9 +715,9 @@
941 def test_sync_exchange_send(self):
942 entries = {
943 10: {'id': 'doc-here', 'rev': 'replica:1', 'content':
944- '{"value": "here"}', 'gen': 10},
945+ '{"value": "here"}', 'gen': 10, 'trans_id': 'T-sid'},
946 11: {'id': 'doc-here2', 'rev': 'replica:1', 'content':
947- '{"value": "here2"}', 'gen': 11}
948+ '{"value": "here2"}', 'gen': 11, 'trans_id': 'T-sed'}
949 }
950
951 gens = []
952@@ -793,12 +793,18 @@
953 self.assertEqual({'new_generation': 2,
954 'new_transaction_id': last_trans_id},
955 simplejson.loads(parts[1].rstrip(",")))
956- self.assertEqual({'content': '{"value": "there"}',
957- 'rev': doc.rev, 'id': doc.doc_id, 'gen': 1},
958- simplejson.loads(parts[2].rstrip(",")))
959- self.assertEqual({'content': '{"value": "there2"}',
960- 'rev': doc2.rev, 'id': doc2.doc_id, 'gen': 2},
961- simplejson.loads(parts[3].rstrip(",")))
962+ part2 = simplejson.loads(parts[2].rstrip(","))
963+ self.assertTrue(part2['trans_id'].startswith('T-'))
964+ self.assertEqual('{"value": "there"}', part2['content'])
965+ self.assertEqual(doc.rev, part2['rev'])
966+ self.assertEqual(doc.doc_id, part2['id'])
967+ self.assertEqual(1, part2['gen'])
968+ part3 = simplejson.loads(parts[3].rstrip(","))
969+ self.assertTrue(part3['trans_id'].startswith('T-'))
970+ self.assertEqual('{"value": "there2"}', part3['content'])
971+ self.assertEqual(doc2.rev, part3['rev'])
972+ self.assertEqual(doc2.doc_id, part3['id'])
973+ self.assertEqual(2, part3['gen'])
974 self.assertEqual(']', parts[4])
975
976 def test_sync_exchange_error_in_stream(self):
977
978=== modified file 'u1db/tests/test_remote_sync_target.py'
979--- u1db/tests/test_remote_sync_target.py 2012-05-31 12:22:03 +0000
980+++ u1db/tests/test_remote_sync_target.py 2012-06-15 00:40:24 +0000
981@@ -81,8 +81,9 @@
982 self.assertRaises(errors.BrokenSyncStream,
983 tgt._parse_sync_stream,
984 '[\r\n{},\r\n{"id": "i", "rev": "r", '
985- '"content": "c", "gen": 3},\r\n]',
986- lambda doc, gen: None)
987+ '"content": "c", "gen": 3, "trans_id": "T-sid"}'
988+ ',\r\n]',
989+ lambda doc, gen, trans_id: None)
990
991 def test_error_in_stream(self):
992 tgt = http_target.HTTPSyncTarget("http://foo/foo")
993@@ -102,6 +103,7 @@
994
995
996 def http_server_def():
997+
998 def make_server(host_port, handler, state):
999 application = http_app.HTTPApp(state)
1000 srv = simple_server.WSGIServer(host_port, handler)
1001@@ -111,9 +113,11 @@
1002 # handler
1003 # )
1004 return srv
1005+
1006 class req_handler(simple_server.WSGIRequestHandler):
1007 def log_request(*args):
1008 pass # suppress
1009+
1010 #rh = httpserver.WSGIHandler
1011 return make_server, req_handler, "shutdown", "http"
1012
1013@@ -123,6 +127,7 @@
1014
1015
1016 def oauth_http_server_def():
1017+
1018 def make_server(host_port, handler, state):
1019 app = http_app.HTTPApp(state)
1020 application = oauth_middleware.OAuthMiddleware(app, None)
1021@@ -132,9 +137,11 @@
1022 application.base_url = "http://%s:%s" % srv.server_address
1023 srv.set_app(application)
1024 return srv
1025+
1026 class req_handler(simple_server.WSGIRequestHandler):
1027 def log_request(*args):
1028 pass # suppress
1029+
1030 return make_server, req_handler, "shutdown", "http"
1031
1032
1033@@ -181,26 +188,31 @@
1034 db = self.request_state._create_database('test')
1035 remote_target = self.getSyncTarget('test')
1036 other_docs = []
1037+
1038 def receive_doc(doc):
1039 other_docs.append((doc.doc_id, doc.rev, doc.get_json()))
1040+
1041 doc = self.make_document('doc-here', 'replica:1', '{"value": "here"}')
1042 new_gen, trans_id = remote_target.sync_exchange(
1043- [(doc, 10)],
1044+ [(doc, 10, 'T-sid')],
1045 'replica', last_known_generation=0,
1046 return_doc_cb=receive_doc)
1047 self.assertEqual(1, new_gen)
1048- self.assertGetDoc(db, 'doc-here', 'replica:1', '{"value": "here"}',
1049- False)
1050+ self.assertGetDoc(
1051+ db, 'doc-here', 'replica:1', '{"value": "here"}', False)
1052
1053 def test_sync_exchange_send_failure_and_retry_scenario(self):
1054 self.startServer()
1055+
1056 def blackhole_getstderr(inst):
1057 return cStringIO.StringIO()
1058+
1059 self.patch(self.server.RequestHandlerClass, 'get_stderr',
1060 blackhole_getstderr)
1061 db = self.request_state._create_database('test')
1062 _put_doc_if_newer = db._put_doc_if_newer
1063 trigger_ids = ['doc-here2']
1064+
1065 def bomb_put_doc_if_newer(doc, save_conflict,
1066 replica_uid=None, replica_gen=None,
1067 replica_trans_id=None):
1068@@ -212,33 +224,37 @@
1069 self.patch(db, '_put_doc_if_newer', bomb_put_doc_if_newer)
1070 remote_target = self.getSyncTarget('test')
1071 other_changes = []
1072- def receive_doc(doc, gen):
1073- other_changes.append((doc.doc_id, doc.rev, doc.get_json(), gen))
1074+
1075+ def receive_doc(doc, gen, trans_id):
1076+ other_changes.append(
1077+ (doc.doc_id, doc.rev, doc.get_json(), gen, trans_id))
1078+
1079 doc1 = self.make_document('doc-here', 'replica:1', '{"value": "here"}')
1080 doc2 = self.make_document('doc-here2', 'replica:1',
1081 '{"value": "here2"}')
1082 self.assertRaises(errors.HTTPError, remote_target.sync_exchange,
1083- [(doc1, 10),
1084- (doc2, 11)
1085+ [(doc1, 10, 'T-sid'),
1086+ (doc2, 11, 'T-sud')
1087 ], 'replica', last_known_generation=0,
1088 return_doc_cb=receive_doc)
1089 self.assertGetDoc(db, 'doc-here', 'replica:1', '{"value": "here"}',
1090 False)
1091- self.assertEqual((10, None), db._get_sync_gen_info('replica'))
1092+ self.assertEqual((10, 'T-sid'), db._get_sync_gen_info('replica'))
1093 self.assertEqual([], other_changes)
1094 # retry
1095 trigger_ids = []
1096 new_gen, trans_id = remote_target.sync_exchange(
1097- [(doc2, 11)
1098+ [(doc2, 11, 'T-sud')
1099 ], 'replica', last_known_generation=0,
1100 return_doc_cb=receive_doc)
1101 self.assertGetDoc(db, 'doc-here2', 'replica:1', '{"value": "here2"}',
1102 False)
1103- self.assertEqual((11, None), db._get_sync_gen_info('replica'))
1104+ self.assertEqual((11, 'T-sud'), db._get_sync_gen_info('replica'))
1105 self.assertEqual(2, new_gen)
1106 # bounced back to us
1107- self.assertEqual([('doc-here', 'replica:1', '{"value": "here"}', 1)],
1108- other_changes)
1109+ self.assertEqual(
1110+ ('doc-here', 'replica:1', '{"value": "here"}', 1),
1111+ other_changes[0][:-1])
1112
1113 def test_sync_exchange_in_stream_error(self):
1114 self.startServer()
1115@@ -261,14 +277,16 @@
1116 remote_target = self.getSyncTarget('test')
1117 other_changes = []
1118
1119- def receive_doc(doc, gen):
1120- other_changes.append((doc.doc_id, doc.rev, doc.get_json(), gen))
1121+ def receive_doc(doc, gen, trans_id):
1122+ other_changes.append(
1123+ (doc.doc_id, doc.rev, doc.get_json(), gen, trans_id))
1124
1125 self.assertRaises(errors.Unavailable, remote_target.sync_exchange,
1126 [], 'replica', last_known_generation=0,
1127 return_doc_cb=receive_doc)
1128- self.assertEqual([(doc.doc_id, doc.rev, '{"value": "there"}', 1)],
1129- other_changes)
1130+ self.assertEqual(
1131+ (doc.doc_id, doc.rev, '{"value": "there"}', 1),
1132+ other_changes[0][:-1])
1133
1134 def test_sync_exchange_receive(self):
1135 self.startServer()
1136@@ -276,14 +294,18 @@
1137 doc = db.create_doc('{"value": "there"}')
1138 remote_target = self.getSyncTarget('test')
1139 other_changes = []
1140- def receive_doc(doc, gen):
1141- other_changes.append((doc.doc_id, doc.rev, doc.get_json(), gen))
1142+
1143+ def receive_doc(doc, gen, trans_id):
1144+ other_changes.append(
1145+ (doc.doc_id, doc.rev, doc.get_json(), gen, trans_id))
1146+
1147 new_gen, trans_id = remote_target.sync_exchange(
1148 [], 'replica', last_known_generation=0,
1149 return_doc_cb=receive_doc)
1150 self.assertEqual(1, new_gen)
1151- self.assertEqual([(doc.doc_id, doc.rev, '{"value": "there"}', 1)],
1152- other_changes)
1153+ self.assertEqual(
1154+ (doc.doc_id, doc.rev, '{"value": "there"}', 1),
1155+ other_changes[0][:-1])
1156
1157
1158 load_tests = tests.load_with_scenarios
1159
1160=== modified file 'u1db/tests/test_sync.py'
1161--- u1db/tests/test_sync.py 2012-06-08 07:39:35 +0000
1162+++ u1db/tests/test_sync.py 2012-06-15 00:40:24 +0000
1163@@ -128,8 +128,9 @@
1164 del self.db
1165 super(DatabaseSyncTargetTests, self).tearDown()
1166
1167- def receive_doc(self, doc, gen):
1168- self.other_changes.append((doc.doc_id, doc.rev, doc.get_json(), gen))
1169+ def receive_doc(self, doc, gen, trans_id):
1170+ self.other_changes.append(
1171+ (doc.doc_id, doc.rev, doc.get_json(), gen, trans_id))
1172
1173 def set_trace_hook(self, callback):
1174 try:
1175@@ -157,7 +158,8 @@
1176
1177 def test_sync_exchange(self):
1178 docs_by_gen = [
1179- (self.make_document('doc-id', 'replica:1', simple_doc), 10)]
1180+ (self.make_document('doc-id', 'replica:1', simple_doc), 10,
1181+ 'T-sid')]
1182 new_gen, trans_id = self.st.sync_exchange(docs_by_gen, 'replica',
1183 last_known_generation=0,
1184 return_doc_cb=self.receive_doc)
1185@@ -172,7 +174,7 @@
1186 doc = self.db.create_doc('{}')
1187 edit_rev = 'replica:1|' + doc.rev
1188 docs_by_gen = [
1189- (self.make_document(doc.doc_id, edit_rev, None), 10)]
1190+ (self.make_document(doc.doc_id, edit_rev, None), 10, 'T-sid')]
1191 new_gen, trans_id = self.st.sync_exchange(docs_by_gen, 'replica',
1192 last_known_generation=0,
1193 return_doc_cb=self.receive_doc)
1194@@ -186,8 +188,9 @@
1195
1196 def test_sync_exchange_push_many(self):
1197 docs_by_gen = [
1198- (self.make_document('doc-id', 'replica:1', simple_doc), 10),
1199- (self.make_document('doc-id2', 'replica:1', nested_doc), 11)]
1200+ (self.make_document('doc-id', 'replica:1', simple_doc), 10, 'T-1'),
1201+ (self.make_document('doc-id2', 'replica:1', nested_doc), 11,
1202+ 'T-2')]
1203 new_gen, trans_id = self.st.sync_exchange(docs_by_gen, 'replica',
1204 last_known_generation=0,
1205 return_doc_cb=self.receive_doc)
1206@@ -204,13 +207,15 @@
1207 self.assertTransactionLog([doc.doc_id], self.db)
1208 new_doc = '{"key": "altval"}'
1209 docs_by_gen = [
1210- (self.make_document(doc.doc_id, 'replica:1', new_doc), 10)]
1211- new_gen, _ = self.st.sync_exchange(docs_by_gen, 'replica',
1212- last_known_generation=0,
1213- return_doc_cb=self.receive_doc)
1214+ (self.make_document(doc.doc_id, 'replica:1', new_doc), 10,
1215+ 'T-sid')]
1216+ new_gen, _ = self.st.sync_exchange(
1217+ docs_by_gen, 'replica', last_known_generation=0,
1218+ return_doc_cb=self.receive_doc)
1219 self.assertTransactionLog([doc.doc_id], self.db)
1220- self.assertEqual(([(doc.doc_id, doc.rev, simple_doc, 1)], 1),
1221- (self.other_changes, new_gen))
1222+ self.assertEqual(
1223+ (doc.doc_id, doc.rev, simple_doc, 1), self.other_changes[0][:-1])
1224+ self.assertEqual(1, new_gen)
1225 if self.whitebox:
1226 self.assertEqual(self.db._last_exchange_log['return'],
1227 {'last_gen': 1, 'docs': [(doc.doc_id, doc.rev)]})
1228@@ -219,7 +224,7 @@
1229 doc = self.db.create_doc(simple_doc)
1230 self.assertTransactionLog([doc.doc_id], self.db)
1231 docs_by_gen = [
1232- (self.make_document(doc.doc_id, doc.rev, simple_doc), 10)]
1233+ (self.make_document(doc.doc_id, doc.rev, simple_doc), 10, 'T-sid')]
1234 new_gen, _ = self.st.sync_exchange(docs_by_gen, 'replica',
1235 last_known_generation=1,
1236 return_doc_cb=self.receive_doc)
1237@@ -233,8 +238,9 @@
1238 last_known_generation=0,
1239 return_doc_cb=self.receive_doc)
1240 self.assertTransactionLog([doc.doc_id], self.db)
1241- self.assertEqual(([(doc.doc_id, doc.rev, simple_doc, 1)], 1),
1242- (self.other_changes, new_gen))
1243+ self.assertEqual(
1244+ (doc.doc_id, doc.rev, simple_doc, 1), self.other_changes[0][:-1])
1245+ self.assertEqual(1, new_gen)
1246 if self.whitebox:
1247 self.assertEqual(self.db._last_exchange_log['return'],
1248 {'last_gen': 1, 'docs': [(doc.doc_id, doc.rev)]})
1249@@ -247,8 +253,9 @@
1250 last_known_generation=0,
1251 return_doc_cb=self.receive_doc)
1252 self.assertTransactionLog([doc.doc_id, doc.doc_id], self.db)
1253- self.assertEqual(([(doc.doc_id, doc.rev, None, 2)], 2),
1254- (self.other_changes, new_gen))
1255+ self.assertEqual(
1256+ (doc.doc_id, doc.rev, None, 2), self.other_changes[0][:-1])
1257+ self.assertEqual(2, new_gen)
1258 if self.whitebox:
1259 self.assertEqual(self.db._last_exchange_log['return'],
1260 {'last_gen': 2, 'docs': [(doc.doc_id, doc.rev)]})
1261@@ -261,9 +268,11 @@
1262 last_known_generation=0,
1263 return_doc_cb=self.receive_doc)
1264 self.assertTransactionLog([doc.doc_id, doc2.doc_id], self.db)
1265- self.assertEqual(([(doc.doc_id, doc.rev, simple_doc, 1),
1266- (doc2.doc_id, doc2.rev, nested_doc, 2)], 2),
1267- (self.other_changes, new_gen))
1268+ self.assertEqual(2, new_gen)
1269+ self.assertEqual(
1270+ [(doc.doc_id, doc.rev, simple_doc, 1),
1271+ (doc2.doc_id, doc2.rev, nested_doc, 2)],
1272+ [c[:-1] for c in self.other_changes])
1273 if self.whitebox:
1274 self.assertEqual(
1275 self.db._last_exchange_log['return'],
1276@@ -275,7 +284,8 @@
1277 self.assertTransactionLog([doc.doc_id], self.db)
1278 new_doc = '{"key": "altval"}'
1279 docs_by_gen = [
1280- (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10)]
1281+ (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10,
1282+ 'T-sid')]
1283 new_gen, _ = self.st.sync_exchange(docs_by_gen, 'other-replica',
1284 last_known_generation=0,
1285 return_doc_cb=self.receive_doc)
1286@@ -298,11 +308,13 @@
1287 self.assertTransactionLog([doc.doc_id], self.db)
1288 new_doc = '{"key": "altval"}'
1289 docs_by_gen = [
1290- (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10)]
1291- new_gen, _ = self.st.sync_exchange(docs_by_gen, 'other-replica',
1292- last_known_generation=0,
1293- return_doc_cb=self.receive_doc)
1294- self.assertEqual((expected, 3), (self.other_changes, new_gen))
1295+ (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10,
1296+ 'T-sid')]
1297+ new_gen, _ = self.st.sync_exchange(
1298+ docs_by_gen, 'other-replica', last_known_generation=0,
1299+ return_doc_cb=self.receive_doc)
1300+ self.assertEqual(expected, [c[:-1] for c in self.other_changes])
1301+ self.assertEqual(3, new_gen)
1302
1303 def test_sync_exchange_with_concurrent_updates(self):
1304 def after_whatschanged_cb(state):
1305@@ -314,7 +326,8 @@
1306 self.assertTransactionLog([doc.doc_id], self.db)
1307 new_doc = '{"key": "altval"}'
1308 docs_by_gen = [
1309- (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10)]
1310+ (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10,
1311+ 'T-sid')]
1312 new_gen, _ = self.st.sync_exchange(docs_by_gen, 'other-replica',
1313 last_known_generation=0,
1314 return_doc_cb=self.receive_doc)
1315@@ -323,8 +336,9 @@
1316 def test_sync_exchange_converged_handling(self):
1317 doc = self.db.create_doc(simple_doc)
1318 docs_by_gen = [
1319- (self.make_document('new', 'other:1', '{}'), 4),
1320- (self.make_document(doc.doc_id, doc.rev, doc.get_json()), 5)]
1321+ (self.make_document('new', 'other:1', '{}'), 4, 'T-foo'),
1322+ (self.make_document(doc.doc_id, doc.rev, doc.get_json()), 5,
1323+ 'T-bar')]
1324 new_gen, _ = self.st.sync_exchange(docs_by_gen, 'other-replica',
1325 last_known_generation=0,
1326 return_doc_cb=self.receive_doc)
1327@@ -352,8 +366,9 @@
1328 self.skipTest("sync_exchange_doc_ids not implemented")
1329 db2 = self.create_database('test2')
1330 doc = db2.create_doc(simple_doc)
1331- new_gen, trans_id = sync_exchange_doc_ids(db2, [(doc.doc_id, 10)], 0,
1332- return_doc_cb=self.receive_doc)
1333+ new_gen, trans_id = sync_exchange_doc_ids(
1334+ db2, [(doc.doc_id, 10, 'T-sid')], 0,
1335+ return_doc_cb=self.receive_doc)
1336 self.assertGetDoc(self.db, doc.doc_id, doc.rev, simple_doc, False)
1337 self.assertTransactionLog([doc.doc_id], self.db)
1338 last_trans_id = self.getLastTransId(self.db)

Subscribers

People subscribed via source and target branches