Merge lp:~thisfred/u1db/txid_in_sync_exchange into lp:u1db
- txid_in_sync_exchange
- Merge into trunk
Proposed by
Eric Casteleijn
Status: | Superseded | ||||
---|---|---|---|---|---|
Proposed branch: | lp:~thisfred/u1db/txid_in_sync_exchange | ||||
Merge into: | lp:u1db | ||||
Diff against target: |
1338 lines (+304/-182) 14 files modified
include/u1db/u1db.h (+2/-1) include/u1db/u1db_internal.h (+10/-5) src/u1db_http_sync_target.c (+33/-20) src/u1db_sync_target.c (+60/-33) u1db/__init__.py (+3/-4) u1db/remote/http_app.py (+4/-4) u1db/remote/http_target.py (+3/-3) u1db/sync.py (+21/-16) u1db/tests/c_backend_wrapper.pyx (+44/-20) u1db/tests/test_backends.py (+1/-1) u1db/tests/test_c_backend.py (+19/-14) u1db/tests/test_http_app.py (+14/-8) u1db/tests/test_remote_sync_target.py (+44/-22) u1db/tests/test_sync.py (+46/-31) |
||||
To merge this branch: | bzr merge lp:~thisfred/u1db/txid_in_sync_exchange | ||||
Related bugs: |
|
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Ubuntu One hackers | Pending | ||
Review via email: mp+110436@code.launchpad.net |
This proposal has been superseded by a proposal from 2012-06-15.
Commit message
Added the transaction ids to the sync_exchange
Description of the change
Added the transaction ids to the sync_exchange
To post a comment you must log in.
Revision history for this message
Eric Casteleijn (thisfred) wrote : | # |
- 329. By Eric Casteleijn
-
merged trunk, fixed conflicts
- 330. By Eric Casteleijn
-
found and fixed memory leaks
Revision history for this message
Eric Casteleijn (thisfred) wrote : | # |
Memory issue fixed.
- 331. By Eric Casteleijn
-
fixed incorrect documentation
- 332. By Eric Casteleijn
-
merged trunk
- 333. By Eric Casteleijn
-
merged trunk
Unmerged revisions
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'include/u1db/u1db.h' |
2 | --- include/u1db/u1db.h 2012-06-12 15:44:43 +0000 |
3 | +++ include/u1db/u1db.h 2012-06-15 00:40:24 +0000 |
4 | @@ -42,7 +42,8 @@ |
5 | typedef int (*u1db_doc_callback)(void *context, u1db_document *doc); |
6 | typedef int (*u1db_key_callback)(void *context, int num_fields, |
7 | const char **key); |
8 | -typedef int (*u1db_doc_gen_callback)(void *context, u1db_document *doc, int gen); |
9 | +typedef int (*u1db_doc_gen_callback)(void *context, u1db_document *doc, |
10 | + int gen, const char *trans_id); |
11 | typedef int (*u1db_doc_id_gen_callback)(void *context, const char *doc_id, int gen); |
12 | typedef int (*u1db_trans_info_callback)(void *context, const char *doc_id, |
13 | int gen, const char *trans_id); |
14 | |
15 | === modified file 'include/u1db/u1db_internal.h' |
16 | --- include/u1db/u1db_internal.h 2012-06-12 15:44:43 +0000 |
17 | +++ include/u1db/u1db_internal.h 2012-06-15 00:40:24 +0000 |
18 | @@ -98,6 +98,8 @@ |
19 | * want to send to the sync target |
20 | * @param generations An array of generations. Each generation |
21 | * corresponds to a doc_id. |
22 | + * @param trans_ids An array of transaction ids. Each transaction id |
23 | + * corresponds to a doc_id. |
24 | * @param target_gen (IN/OUT) Seed this with the generation of the |
25 | * target that source_db has last seen, it will then |
26 | * be filled with the final generation of the target |
27 | @@ -114,7 +116,7 @@ |
28 | */ |
29 | int (*sync_exchange_doc_ids)(u1db_sync_target *st, u1database *source_db, |
30 | int n_doc_ids, const char **doc_ids, int *generations, |
31 | - int *target_gen, char **target_trans_id, |
32 | + const char **trans_ids, int *target_gen, char **target_trans_id, |
33 | void *context, u1db_doc_gen_callback cb); |
34 | |
35 | /** |
36 | @@ -123,7 +125,8 @@ |
37 | int (*sync_exchange)(u1db_sync_target *st, |
38 | const char *source_replica_uid, int n_docs, |
39 | u1db_document **docs, int *generations, |
40 | - int *target_gen, char **target_trans_id, |
41 | + const char **trans_ids, int *target_gen, |
42 | + char **target_trans_id, |
43 | void *context, u1db_doc_gen_callback cb); |
44 | /** |
45 | * Create a sync_exchange state object. |
46 | @@ -170,6 +173,7 @@ |
47 | struct lh_table *seen_ids; |
48 | int num_doc_ids; |
49 | int *gen_for_doc_ids; |
50 | + const char **trans_ids_for_doc_ids; |
51 | char **doc_ids_to_return; |
52 | void *trace_context; |
53 | u1db__trace_callback trace_cb; |
54 | @@ -448,7 +452,7 @@ |
55 | * We have received a doc from source, record it. |
56 | */ |
57 | int u1db__sync_exchange_insert_doc_from_source(u1db_sync_exchange *se, |
58 | - u1db_document *doc, int source_gen); |
59 | + u1db_document *doc, int source_gen, const char *trans_id); |
60 | |
61 | /** |
62 | * We are done receiving docs, find what we want to return. |
63 | @@ -466,8 +470,9 @@ |
64 | * u1db_free_doc(). |
65 | */ |
66 | int u1db__sync_exchange_return_docs(u1db_sync_exchange *se, void *context, |
67 | - int (*cb)(void *context, u1db_document *doc, int gen)); |
68 | - |
69 | + int (*cb)(void *context, |
70 | + u1db_document *doc, int gen, |
71 | + const char *trans_id)); |
72 | |
73 | /** |
74 | * Create a sync target pointing at a given URL. |
75 | |
76 | === modified file 'src/u1db_http_sync_target.c' |
77 | --- src/u1db_http_sync_target.c 2012-05-31 12:22:03 +0000 |
78 | +++ src/u1db_http_sync_target.c 2012-06-15 00:40:24 +0000 |
79 | @@ -45,14 +45,19 @@ |
80 | int source_gen, |
81 | u1db_sync_exchange **exchange); |
82 | static int st_http_sync_exchange(u1db_sync_target *st, |
83 | - const char *source_replica_uid, |
84 | - int n_docs, u1db_document **docs, |
85 | - int *generations, int *target_gen, char **target_trans_id, |
86 | - void *context, u1db_doc_gen_callback cb); |
87 | + const char *source_replica_uid, int n_docs, |
88 | + u1db_document **docs, int *generations, |
89 | + const char **trans_ids, int *target_gen, |
90 | + char **target_trans_id, void *context, |
91 | + u1db_doc_gen_callback cb); |
92 | static int st_http_sync_exchange_doc_ids(u1db_sync_target *st, |
93 | - u1database *source_db, int n_doc_ids, const char **doc_ids, |
94 | - int *generations, int *target_gen, char **target_trans_id, |
95 | - void *context, u1db_doc_gen_callback cb); |
96 | + u1database *source_db, int n_doc_ids, |
97 | + const char **doc_ids, |
98 | + int *generations, |
99 | + const char **trans_ids, |
100 | + int *target_gen, |
101 | + char **target_trans_id, void *context, |
102 | + u1db_doc_gen_callback cb); |
103 | static void st_http_finalize_sync_exchange(u1db_sync_target *st, |
104 | u1db_sync_exchange **exchange); |
105 | static int st_http_set_trace_hook(u1db_sync_target *st, |
106 | @@ -410,8 +415,8 @@ |
107 | goto finish; |
108 | } |
109 | if (req.body_buffer == NULL) { |
110 | - status = U1DB_INVALID_HTTP_RESPONSE; |
111 | - goto finish; |
112 | + status = U1DB_INVALID_HTTP_RESPONSE; |
113 | + goto finish; |
114 | } |
115 | json = json_tokener_parse(req.body_buffer); |
116 | if (json == NULL) { |
117 | @@ -663,7 +668,7 @@ |
118 | |
119 | |
120 | static int |
121 | -doc_to_tempfile(u1db_document *doc, int gen, FILE *fd) |
122 | +doc_to_tempfile(u1db_document *doc, int gen, const char *trans_id, FILE *fd) |
123 | { |
124 | int status = U1DB_OK; |
125 | json_object *json = NULL; |
126 | @@ -678,6 +683,7 @@ |
127 | json_object_object_add( |
128 | json, "content", doc->json?json_object_new_string(doc->json):NULL); |
129 | json_object_object_add(json, "gen", json_object_new_int(gen)); |
130 | + json_object_object_add(json, "trans_id", json_object_new_string(trans_id)); |
131 | fputs(json_object_to_json_string(json), fd); |
132 | finish: |
133 | if (json != NULL) { |
134 | @@ -791,6 +797,7 @@ |
135 | const char *doc_id, *content, *rev; |
136 | const char *tmp = NULL; |
137 | int gen; |
138 | + const char *trans_id = NULL; |
139 | u1db_document *doc; |
140 | |
141 | json = json_tokener_parse(response); |
142 | @@ -837,12 +844,14 @@ |
143 | content = json_object_get_string(attr); |
144 | attr = json_object_object_get(obj, "gen"); |
145 | gen = json_object_get_int(attr); |
146 | + attr = json_object_object_get(obj, "trans_id"); |
147 | + trans_id = json_object_get_string(attr); |
148 | doc = u1db__allocate_document(doc_id, rev, content, 0); |
149 | if (doc == NULL) { |
150 | status = U1DB_NOMEM; |
151 | goto finish; |
152 | } |
153 | - status = cb(context, doc, gen); |
154 | + status = cb(context, doc, gen, trans_id); |
155 | if (status != U1DB_OK) { goto finish; } |
156 | } |
157 | finish: |
158 | @@ -875,11 +884,10 @@ |
159 | } |
160 | |
161 | static int |
162 | -st_http_sync_exchange(u1db_sync_target *st, |
163 | - const char *source_replica_uid, |
164 | - int n_docs, u1db_document **docs, |
165 | - int *generations, int *target_gen, char **target_trans_id, |
166 | - void *context, |
167 | +st_http_sync_exchange(u1db_sync_target *st, const char *source_replica_uid, |
168 | + int n_docs, u1db_document **docs, int *generations, |
169 | + const char **trans_ids, int *target_gen, |
170 | + char **target_trans_id, void *context, |
171 | u1db_doc_gen_callback cb) |
172 | { |
173 | int status, i; |
174 | @@ -898,7 +906,8 @@ |
175 | status = init_temp_file(tmpname, &temp_fd, *target_gen); |
176 | if (status != U1DB_OK) { goto finish; } |
177 | for (i = 0; i < n_docs; ++i) { |
178 | - status = doc_to_tempfile(docs[i], generations[i], temp_fd); |
179 | + status = doc_to_tempfile( |
180 | + docs[i], generations[i], trans_ids[i], temp_fd); |
181 | if (status != U1DB_OK) { goto finish; } |
182 | } |
183 | status = finalize_and_send_temp_file(st, temp_fd, source_replica_uid, &req); |
184 | @@ -915,6 +924,7 @@ |
185 | int offset; |
186 | int num; |
187 | int *generations; |
188 | + const char **trans_ids; |
189 | FILE *temp_fd; |
190 | }; |
191 | |
192 | @@ -930,6 +940,7 @@ |
193 | status = U1DB_INTERNAL_ERROR; |
194 | } else { |
195 | status = doc_to_tempfile(doc, state->generations[state->offset], |
196 | + state->trans_ids[state->offset], |
197 | state->temp_fd); |
198 | } |
199 | u1db_free_doc(&doc); |
200 | @@ -943,9 +954,10 @@ |
201 | |
202 | static int |
203 | st_http_sync_exchange_doc_ids(u1db_sync_target *st, u1database *source_db, |
204 | - int n_doc_ids, const char **doc_ids, int *generations, |
205 | - int *target_gen, char **target_trans_id, |
206 | - void *context, u1db_doc_gen_callback cb) |
207 | + int n_doc_ids, const char **doc_ids, |
208 | + int *generations, const char **trans_ids, |
209 | + int *target_gen, char **target_trans_id, |
210 | + void *context, u1db_doc_gen_callback cb) |
211 | { |
212 | int status; |
213 | FILE *temp_fd = NULL; |
214 | @@ -968,6 +980,7 @@ |
215 | if (status != U1DB_OK) { goto finish; } |
216 | state.num = n_doc_ids; |
217 | state.generations = generations; |
218 | + state.trans_ids = trans_ids; |
219 | state.temp_fd = temp_fd; |
220 | status = u1db_get_docs(source_db, n_doc_ids, doc_ids, 0, 1, |
221 | &state, get_docs_to_tempfile); |
222 | |
223 | === modified file 'src/u1db_sync_target.c' |
224 | --- src/u1db_sync_target.c 2012-06-08 10:06:05 +0000 |
225 | +++ src/u1db_sync_target.c 2012-06-15 00:40:24 +0000 |
226 | @@ -32,13 +32,15 @@ |
227 | static int st_sync_exchange(u1db_sync_target *st, |
228 | const char *source_replica_uid, int n_docs, |
229 | u1db_document **docs, int *generations, |
230 | - int *target_gen, char **target_trans_id, |
231 | - void *context, u1db_doc_gen_callback cb); |
232 | + const char **trans_ids, int *target_gen, |
233 | + char **target_trans_id, void *context, |
234 | + u1db_doc_gen_callback cb); |
235 | static int st_sync_exchange_doc_ids(u1db_sync_target *st, |
236 | - u1database *source_db, |
237 | - int n_doc_ids, const char **doc_ids, int *generations, |
238 | - int *target_gen, char **target_trans_id, |
239 | - void *context, u1db_doc_gen_callback cb); |
240 | + u1database *source_db, int n_doc_ids, |
241 | + const char **doc_ids, int *generations, |
242 | + const char **trans_ids, int *target_gen, |
243 | + char **target_trans_id, void *context, |
244 | + u1db_doc_gen_callback cb); |
245 | static int st_get_sync_exchange(u1db_sync_target *st, |
246 | const char *source_replica_uid, |
247 | int source_gen, |
248 | @@ -57,6 +59,7 @@ |
249 | void *user_context; |
250 | u1db_doc_gen_callback user_cb; |
251 | int *gen_for_doc_ids; |
252 | + const char **trans_ids_for_doc_ids; |
253 | int free_when_done; |
254 | }; |
255 | |
256 | @@ -203,6 +206,10 @@ |
257 | free((*exchange)->gen_for_doc_ids); |
258 | (*exchange)->gen_for_doc_ids = NULL; |
259 | } |
260 | + if ((*exchange)->trans_ids_for_doc_ids != NULL) { |
261 | + free((*exchange)->trans_ids_for_doc_ids); |
262 | + (*exchange)->trans_ids_for_doc_ids = NULL; |
263 | + } |
264 | if ((*exchange)->target_trans_id != NULL) { |
265 | free((*exchange)->target_trans_id); |
266 | (*exchange)->target_trans_id = NULL; |
267 | @@ -275,7 +282,7 @@ |
268 | |
269 | int |
270 | u1db__sync_exchange_insert_doc_from_source(u1db_sync_exchange *se, |
271 | - u1db_document *doc, int source_gen) |
272 | + u1db_document *doc, int source_gen, const char *trans_id) |
273 | { |
274 | int status = U1DB_OK; |
275 | int insert_state; |
276 | @@ -285,7 +292,8 @@ |
277 | } |
278 | // fprintf(stderr, "Inserting %s from source\n", doc->doc_id); |
279 | status = u1db__put_doc_if_newer(se->db, doc, 0, se->source_replica_uid, |
280 | - source_gen, NULL, &insert_state, &at_gen); |
281 | + source_gen, trans_id, &insert_state, |
282 | + &at_gen); |
283 | if (insert_state == U1DB_INSERTED || insert_state == U1DB_CONVERGED) { |
284 | lh_table_insert(se->seen_ids, strdup(doc->doc_id), |
285 | (void *)(intptr_t)at_gen); |
286 | @@ -305,6 +313,7 @@ |
287 | struct lh_table *exclude_ids; |
288 | char **doc_ids_to_return; |
289 | int *gen_for_doc_ids; |
290 | + const char **trans_ids_for_doc_ids; |
291 | }; |
292 | |
293 | // Callback for whats_changed to map the callback into the sync_exchange |
294 | @@ -327,25 +336,29 @@ |
295 | if (state->num_doc_ids >= state->max_doc_ids) { |
296 | state->max_doc_ids = (state->max_doc_ids * 2) + 10; |
297 | if (state->doc_ids_to_return == NULL) { |
298 | - state->doc_ids_to_return = (char **)calloc(state->max_doc_ids, |
299 | - sizeof(char*)); |
300 | - state->gen_for_doc_ids = (int *)calloc(state->max_doc_ids, |
301 | - sizeof(int)); |
302 | + state->doc_ids_to_return = (char **)calloc( |
303 | + state->max_doc_ids, sizeof(char*)); |
304 | + state->gen_for_doc_ids = (int *)calloc( |
305 | + state->max_doc_ids, sizeof(int)); |
306 | + state->trans_ids_for_doc_ids = (const char **)calloc( |
307 | + state->max_doc_ids, sizeof(char*)); |
308 | } else { |
309 | state->doc_ids_to_return = (char **)realloc( |
310 | - state->doc_ids_to_return, |
311 | - state->max_doc_ids * sizeof(char*)); |
312 | - state->gen_for_doc_ids = (int *)realloc(state->gen_for_doc_ids, |
313 | - state->max_doc_ids * sizeof(int)); |
314 | + state->doc_ids_to_return, state->max_doc_ids * sizeof(char*)); |
315 | + state->gen_for_doc_ids = (int *)realloc( |
316 | + state->gen_for_doc_ids, state->max_doc_ids * sizeof(int)); |
317 | + state->trans_ids_for_doc_ids = (const char **)realloc( |
318 | + state->gen_for_doc_ids, state->max_doc_ids * sizeof(char*)); |
319 | } |
320 | - if (state->doc_ids_to_return == NULL |
321 | - || state->gen_for_doc_ids == NULL) |
322 | + if (state->doc_ids_to_return == NULL || state->gen_for_doc_ids == NULL |
323 | + || state->trans_ids_for_doc_ids == NULL) |
324 | { |
325 | return U1DB_NOMEM; |
326 | } |
327 | } |
328 | state->doc_ids_to_return[state->num_doc_ids] = strdup(doc_id); |
329 | state->gen_for_doc_ids[state->num_doc_ids] = gen; |
330 | + state->trans_ids_for_doc_ids[state->num_doc_ids] = trans_id; |
331 | state->num_doc_ids++; |
332 | return 0; |
333 | } |
334 | @@ -370,6 +383,7 @@ |
335 | if (status != U1DB_OK) { |
336 | free(state.doc_ids_to_return); |
337 | free(state.gen_for_doc_ids); |
338 | + free(state.trans_ids_for_doc_ids); |
339 | goto finish; |
340 | } |
341 | if (se->trace_cb) { |
342 | @@ -379,6 +393,7 @@ |
343 | se->num_doc_ids = state.num_doc_ids; |
344 | se->doc_ids_to_return = state.doc_ids_to_return; |
345 | se->gen_for_doc_ids = state.gen_for_doc_ids; |
346 | + se->trans_ids_for_doc_ids = state.trans_ids_for_doc_ids; |
347 | finish: |
348 | if (target_trans_id != NULL) { |
349 | free(target_trans_id); |
350 | @@ -397,8 +412,9 @@ |
351 | // Note: using doc_offset in this way assumes that u1db_get_docs will |
352 | // always return them in exactly the order we requested. This is |
353 | // probably true, though. |
354 | - status = ctx->user_cb(ctx->user_context, doc, |
355 | - ctx->gen_for_doc_ids[ctx->doc_offset]); |
356 | + status = ctx->user_cb( |
357 | + ctx->user_context, doc, ctx->gen_for_doc_ids[ctx->doc_offset], |
358 | + ctx->trans_ids_for_doc_ids[ctx->doc_offset]); |
359 | ctx->doc_offset++; |
360 | if (ctx->free_when_done) { |
361 | u1db_free_doc(&doc); |
362 | @@ -409,7 +425,8 @@ |
363 | |
364 | int |
365 | u1db__sync_exchange_return_docs(u1db_sync_exchange *se, void *context, |
366 | - int (*cb)(void *context, u1db_document *doc, int gen)) |
367 | + int (*cb)(void *context, u1db_document *doc, |
368 | + int gen, const char *trans_id)) |
369 | { |
370 | int status = U1DB_OK; |
371 | struct _get_docs_to_doc_gen_context state = {0}; |
372 | @@ -420,6 +437,7 @@ |
373 | state.user_cb = cb; |
374 | state.doc_offset = 0; |
375 | state.gen_for_doc_ids = se->gen_for_doc_ids; |
376 | + state.trans_ids_for_doc_ids = se->trans_ids_for_doc_ids; |
377 | if (se->trace_cb) { |
378 | status = se->trace_cb(se->trace_context, "before get_docs"); |
379 | if (status != U1DB_OK) { goto finish; } |
380 | @@ -440,14 +458,16 @@ |
381 | }; |
382 | |
383 | static int |
384 | -return_doc_to_insert_from_target(void *context, u1db_document *doc, int gen) |
385 | +return_doc_to_insert_from_target(void *context, u1db_document *doc, int gen, |
386 | + const char *trans_id) |
387 | { |
388 | int status, insert_state; |
389 | struct _return_doc_state *state; |
390 | state = (struct _return_doc_state *)context; |
391 | |
392 | - status = u1db__put_doc_if_newer(state->db, doc, 1, state->target_uid, gen, |
393 | - NULL, &insert_state, NULL); |
394 | + status = u1db__put_doc_if_newer( |
395 | + state->db, doc, 1, state->target_uid, gen, trans_id, &insert_state, |
396 | + NULL); |
397 | u1db_free_doc(&doc); |
398 | if (status == U1DB_OK) { |
399 | if (insert_state == U1DB_INSERTED || insert_state == U1DB_CONFLICTED) { |
400 | @@ -462,7 +482,8 @@ |
401 | |
402 | static int |
403 | get_and_insert_docs(u1database *source_db, u1db_sync_exchange *se, |
404 | - int n_doc_ids, const char **doc_ids, int *generations) |
405 | + int n_doc_ids, const char **doc_ids, int *generations, |
406 | + const char **trans_ids) |
407 | { |
408 | struct _get_docs_to_doc_gen_context get_doc_state = {0}; |
409 | |
410 | @@ -473,6 +494,7 @@ |
411 | get_doc_state.user_cb = |
412 | (u1db_doc_gen_callback)u1db__sync_exchange_insert_doc_from_source; |
413 | get_doc_state.gen_for_doc_ids = generations; |
414 | + get_doc_state.trans_ids_for_doc_ids = trans_ids; |
415 | return u1db_get_docs(source_db, n_doc_ids, doc_ids, |
416 | 0, 1, &get_doc_state, get_docs_to_gen_docs); |
417 | } |
418 | @@ -480,11 +502,13 @@ |
419 | |
420 | static int |
421 | st_sync_exchange(u1db_sync_target *st, const char *source_replica_uid, |
422 | - int n_docs, u1db_document **docs, |
423 | - int *generations, int *target_gen, char **target_trans_id, |
424 | - void *context, u1db_doc_gen_callback cb) |
425 | + int n_docs, u1db_document **docs, int *generations, |
426 | + const char **trans_ids, int *target_gen, |
427 | + char **target_trans_id, void *context, |
428 | + u1db_doc_gen_callback cb) |
429 | { |
430 | int status, i; |
431 | + |
432 | u1db_sync_exchange *exchange = NULL; |
433 | if (st == NULL || generations == NULL || target_gen == NULL |
434 | || target_trans_id == NULL || cb == NULL) |
435 | @@ -498,8 +522,8 @@ |
436 | *target_gen, &exchange); |
437 | if (status != U1DB_OK) { goto finish; } |
438 | for (i = 0; i < n_docs; ++i) { |
439 | - status = u1db__sync_exchange_insert_doc_from_source(exchange, docs[i], |
440 | - generations[i]); |
441 | + status = u1db__sync_exchange_insert_doc_from_source( |
442 | + exchange, docs[i], generations[i], trans_ids[i]); |
443 | if (status != U1DB_OK) { goto finish; } |
444 | } |
445 | status = u1db__sync_exchange_find_doc_ids_to_return(exchange); |
446 | @@ -521,7 +545,7 @@ |
447 | static int |
448 | st_sync_exchange_doc_ids(u1db_sync_target *st, u1database *source_db, |
449 | int n_doc_ids, const char **doc_ids, int *generations, |
450 | - int *target_gen, char **target_trans_id, |
451 | + const char **trans_ids, int *target_gen, char **target_trans_id, |
452 | void *context, u1db_doc_gen_callback cb) |
453 | { |
454 | int status; |
455 | @@ -542,7 +566,7 @@ |
456 | if (status != U1DB_OK) { goto finish; } |
457 | if (n_doc_ids > 0) { |
458 | status = get_and_insert_docs(source_db, exchange, |
459 | - n_doc_ids, doc_ids, generations); |
460 | + n_doc_ids, doc_ids, generations, trans_ids); |
461 | if (status != U1DB_OK) { goto finish; } |
462 | } |
463 | status = u1db__sync_exchange_find_doc_ids_to_return(exchange); |
464 | @@ -616,7 +640,7 @@ |
465 | status = target->sync_exchange_doc_ids(target, db, |
466 | to_send_state.num_doc_ids, |
467 | (const char**)to_send_state.doc_ids_to_return, |
468 | - to_send_state.gen_for_doc_ids, |
469 | + to_send_state.gen_for_doc_ids, to_send_state.trans_ids_for_doc_ids, |
470 | &target_gen_known_by_local, &target_trans_id_known_by_local, |
471 | &return_doc_state, return_doc_to_insert_from_target); |
472 | if (status != U1DB_OK) { goto finish; } |
473 | @@ -665,5 +689,8 @@ |
474 | if (to_send_state.gen_for_doc_ids != NULL) { |
475 | free(to_send_state.gen_for_doc_ids); |
476 | } |
477 | + if (to_send_state.trans_ids_for_doc_ids != NULL) { |
478 | + free(to_send_state.trans_ids_for_doc_ids); |
479 | + } |
480 | return status; |
481 | } |
482 | |
483 | === modified file 'u1db/__init__.py' |
484 | --- u1db/__init__.py 2012-06-12 15:44:43 +0000 |
485 | +++ u1db/__init__.py 2012-06-15 00:40:24 +0000 |
486 | @@ -557,10 +557,9 @@ |
487 | their latest change in order from the oldest change to the |
488 | newest. |
489 | |
490 | - :param docs_by_generation: A list of [(Document, generation)] |
491 | - pairs indicating documents which should be updated on |
492 | - this replica paired with the generation of their |
493 | - latest change. |
494 | + :param docs_by_generation: A list of [(Document, generation, |
495 | + transaction_id)] pairs indicating documents which should be updated |
496 | + on this replica paired with the generation of their latest change. |
497 | :param source_replica_uid: The source replica's identifier |
498 | :param last_known_generation: The last generation that the source |
499 | replica knows about this |
500 | |
501 | === modified file 'u1db/remote/http_app.py' |
502 | --- u1db/remote/http_app.py 2012-05-31 12:22:03 +0000 |
503 | +++ u1db/remote/http_app.py 2012-06-15 00:40:24 +0000 |
504 | @@ -324,14 +324,14 @@ |
505 | last_known_generation) |
506 | |
507 | @http_method(content_as_args=True) |
508 | - def post_stream_entry(self, id, rev, content, gen): |
509 | + def post_stream_entry(self, id, rev, content, gen, trans_id): |
510 | doc = Document(id, rev, content) |
511 | - self.sync_exch.insert_doc_from_source(doc, gen) |
512 | + self.sync_exch.insert_doc_from_source(doc, gen, trans_id) |
513 | |
514 | def post_end(self): |
515 | - def send_doc(doc, gen): |
516 | + def send_doc(doc, gen, trans_id): |
517 | entry = dict(id=doc.doc_id, rev=doc.rev, content=doc.get_json(), |
518 | - gen=gen) |
519 | + gen=gen, trans_id=trans_id) |
520 | self.responder.stream_entry(entry) |
521 | new_gen = self.sync_exch.find_changes_to_return() |
522 | self.responder.content_type = 'application/x-u1db-sync-stream' |
523 | |
524 | === modified file 'u1db/remote/http_target.py' |
525 | --- u1db/remote/http_target.py 2012-05-31 12:22:03 +0000 |
526 | +++ u1db/remote/http_target.py 2012-06-15 00:40:24 +0000 |
527 | @@ -65,7 +65,7 @@ |
528 | line, comma = utils.check_and_strip_comma(entry) |
529 | entry = simplejson.loads(line) |
530 | doc = Document(entry['id'], entry['rev'], entry['content']) |
531 | - return_doc_cb(doc, entry['gen']) |
532 | + return_doc_cb(doc, entry['gen'], entry['trans_id']) |
533 | if parts[-1] != ']': |
534 | try: |
535 | partdic = simplejson.loads(parts[-1]) |
536 | @@ -98,9 +98,9 @@ |
537 | comma = '' |
538 | size += prepare(last_known_generation=last_known_generation) |
539 | comma = ',' |
540 | - for doc, gen in docs_by_generations: |
541 | + for doc, gen, trans_id in docs_by_generations: |
542 | size += prepare(id=doc.doc_id, rev=doc.rev, content=doc.get_json(), |
543 | - gen=gen) |
544 | + gen=gen, trans_id=trans_id) |
545 | entries.append('\r\n]') |
546 | size += len(entries[-1]) |
547 | self._conn.putheader('content-length', str(size)) |
548 | |
549 | === modified file 'u1db/sync.py' |
550 | --- u1db/sync.py 2012-05-31 12:22:03 +0000 |
551 | +++ u1db/sync.py 2012-06-15 00:40:24 +0000 |
552 | @@ -40,7 +40,7 @@ |
553 | self.target_replica_uid = None |
554 | self.num_inserted = 0 |
555 | |
556 | - def _insert_doc_from_target(self, doc, replica_gen): |
557 | + def _insert_doc_from_target(self, doc, replica_gen, trans_id): |
558 | """Try to insert synced document from target. |
559 | |
560 | Implements TAKE OTHER semantics: any document from the target |
561 | @@ -54,7 +54,7 @@ |
562 | # was effectively inserted. |
563 | state, _ = self.source._put_doc_if_newer(doc, save_conflict=True, |
564 | replica_uid=self.target_replica_uid, replica_gen=replica_gen, |
565 | - replica_trans_id=None) |
566 | + replica_trans_id=trans_id) |
567 | if state == 'inserted': |
568 | self.num_inserted += 1 |
569 | elif state == 'converged': |
570 | @@ -110,7 +110,10 @@ |
571 | # prepare to send all the changed docs |
572 | docs_to_send = self.source.get_docs(changed_doc_ids, |
573 | check_for_conflicts=False, include_deleted=True) |
574 | - docs_by_generation = zip(docs_to_send, (gen for _, gen, _ in changes)) |
575 | + # TODO: there must be a way to not iterate twice |
576 | + docs_by_generation = zip( |
577 | + docs_to_send, (gen for _, gen, _ in changes), |
578 | + (trans for _, _, trans in changes)) |
579 | |
580 | # exchange documents and try to insert the returned ones with |
581 | # the target, return target synced-up-to gen |
582 | @@ -153,7 +156,7 @@ |
583 | return |
584 | self._trace_hook(state) |
585 | |
586 | - def insert_doc_from_source(self, doc, source_gen): |
587 | + def insert_doc_from_source(self, doc, source_gen, trans_id): |
588 | """Try to insert synced document from source. |
589 | |
590 | Conflicting documents are not inserted but will be sent over |
591 | @@ -171,7 +174,7 @@ |
592 | """ |
593 | state, at_gen = self._db._put_doc_if_newer(doc, save_conflict=False, |
594 | replica_uid=self.source_replica_uid, replica_gen=source_gen, |
595 | - replica_trans_id=None) |
596 | + replica_trans_id=trans_id) |
597 | if state == 'inserted': |
598 | self.seen_ids[doc.doc_id] = at_gen |
599 | elif state == 'converged': |
600 | @@ -212,10 +215,10 @@ |
601 | self.new_trans_id = trans_id |
602 | seen_ids = self.seen_ids |
603 | # changed docs that weren't superseded by or converged with |
604 | - self.changes_to_return = [(doc_id, gen) for (doc_id, gen, _) in changes |
605 | - # there was a subsequent update |
606 | - if doc_id not in seen_ids or |
607 | - seen_ids.get(doc_id) < gen] |
608 | + self.changes_to_return = [ |
609 | + (doc_id, gen, trans_id) for (doc_id, gen, trans_id) in changes |
610 | + # there was a subsequent update |
611 | + if doc_id not in seen_ids or seen_ids.get(doc_id) < gen] |
612 | return self.new_gen |
613 | |
614 | def return_docs(self, return_doc_cb): |
615 | @@ -224,21 +227,23 @@ |
616 | |
617 | The final step of a sync exchange. |
618 | |
619 | - :param: return_doc_cb(doc, gen): is a callback |
620 | + :param: return_doc_cb(doc, gen, trans_id): is a callback |
621 | used to return the documents with their last change generation |
622 | to the target replica. |
623 | :return: None |
624 | """ |
625 | changes_to_return = self.changes_to_return |
626 | # return docs, including conflicts |
627 | - changed_doc_ids = [doc_id for doc_id, _ in changes_to_return] |
628 | + changed_doc_ids = [doc_id for doc_id, _, _ in changes_to_return] |
629 | self._trace('before get_docs') |
630 | docs = self._db.get_docs( |
631 | changed_doc_ids, check_for_conflicts=False, include_deleted=True) |
632 | |
633 | - docs_by_gen = izip(docs, (gen for _, gen in changes_to_return)) |
634 | - for doc, gen in docs_by_gen: |
635 | - return_doc_cb(doc, gen) |
636 | + docs_by_gen = izip( |
637 | + docs, (gen for _, gen, _ in changes_to_return), |
638 | + (trans_id for _, _, trans_id in changes_to_return)) |
639 | + for doc, gen, trans_id in docs_by_gen: |
640 | + return_doc_cb(doc, gen, trans_id) |
641 | # for tests |
642 | self._db._last_exchange_log['return'] = { |
643 | 'docs': [(d.doc_id, d.rev) for d in docs], |
644 | @@ -260,8 +265,8 @@ |
645 | if self._trace_hook: |
646 | sync_exch._set_trace_hook(self._trace_hook) |
647 | # 1st step: try to insert incoming docs and record progress |
648 | - for doc, doc_gen in docs_by_generations: |
649 | - sync_exch.insert_doc_from_source(doc, doc_gen) |
650 | + for doc, doc_gen, trans_id in docs_by_generations: |
651 | + sync_exch.insert_doc_from_source(doc, doc_gen, trans_id) |
652 | # 2nd step: find changed documents (including conflicts) to return |
653 | new_gen = sync_exch.find_changes_to_return() |
654 | # final step: return docs and record source replica sync point |
655 | |
656 | === modified file 'u1db/tests/c_backend_wrapper.pyx' |
657 | --- u1db/tests/c_backend_wrapper.pyx 2012-06-13 11:15:57 +0000 |
658 | +++ u1db/tests/c_backend_wrapper.pyx 2012-06-15 00:40:24 +0000 |
659 | @@ -61,7 +61,7 @@ |
660 | ctypedef int (*u1db_key_callback)(void *context, int num_fields, |
661 | const_char_ptr *key) |
662 | ctypedef int (*u1db_doc_gen_callback)(void *context, |
663 | - u1db_document *doc, int gen) |
664 | + u1db_document *doc, int gen, const_char_ptr trans_id) |
665 | ctypedef int (*u1db_trans_info_callback)(void *context, |
666 | const_char_ptr doc_id, int gen, const_char_ptr trans_id) |
667 | |
668 | @@ -164,6 +164,7 @@ |
669 | int num_doc_ids |
670 | char **doc_ids_to_return |
671 | int *gen_for_doc_ids |
672 | + const_char_ptr *trans_ids_for_doc_ids |
673 | |
674 | ctypedef int (*u1db__trace_callback)(void *context, const_char_ptr state) |
675 | ctypedef struct u1db_sync_target: |
676 | @@ -176,12 +177,16 @@ |
677 | int (*sync_exchange)(u1db_sync_target *st, |
678 | char *source_replica_uid, int n_docs, |
679 | u1db_document **docs, int *generations, |
680 | - int *target_gen, char **target_trans_id, |
681 | - void *context, u1db_doc_gen_callback cb) nogil |
682 | + const_char_ptr *trans_ids, int *target_gen, |
683 | + char **target_trans_id, void *context, |
684 | + u1db_doc_gen_callback cb) nogil |
685 | int (*sync_exchange_doc_ids)(u1db_sync_target *st, |
686 | - u1database *source_db, int n_doc_ids, const_char_ptr *doc_ids, |
687 | - int *generations, int *target_gen, char **target_trans_id, |
688 | - void *context, u1db_doc_gen_callback cb) nogil |
689 | + u1database *source_db, int n_doc_ids, |
690 | + const_char_ptr *doc_ids, int *generations, |
691 | + const_char_ptr *trans_ids, |
692 | + int *target_gen, char **target_trans_id, |
693 | + void *context, |
694 | + u1db_doc_gen_callback cb) nogil |
695 | int (*get_sync_exchange)(u1db_sync_target *st, |
696 | char *source_replica_uid, |
697 | int last_known_source_gen, |
698 | @@ -228,10 +233,12 @@ |
699 | int *local_gen_before_sync) nogil |
700 | |
701 | int u1db__sync_exchange_insert_doc_from_source(u1db_sync_exchange *se, |
702 | - u1db_document *doc, int source_gen) |
703 | + u1db_document *doc, int source_gen, const_char_ptr trans_id) |
704 | int u1db__sync_exchange_find_doc_ids_to_return(u1db_sync_exchange *se) |
705 | int u1db__sync_exchange_return_docs(u1db_sync_exchange *se, void *context, |
706 | - int (*cb)(void *context, u1db_document *doc, int gen)) |
707 | + int (*cb)(void *context, |
708 | + u1db_document *doc, int gen, |
709 | + const_char_ptr trans_id)) |
710 | int u1db__create_http_sync_target(char *url, u1db_sync_target **target) |
711 | int u1db__create_oauth_http_sync_target(char *url, |
712 | char *consumer_key, char *consumer_secret, |
713 | @@ -331,13 +338,13 @@ |
714 | |
715 | |
716 | cdef int return_doc_cb_wrapper(void *context, u1db_document *doc, |
717 | - int gen) with gil: |
718 | + int gen, const_char_ptr trans_id) with gil: |
719 | cdef CDocument pydoc |
720 | user_cb = <object>context |
721 | pydoc = CDocument() |
722 | pydoc._doc = doc |
723 | try: |
724 | - user_cb(pydoc, gen) |
725 | + user_cb(pydoc, gen, trans_id) |
726 | except Exception, e: |
727 | # We suppress the exception here, because intermediating through the C |
728 | # layer gets a bit crazy |
729 | @@ -606,11 +613,12 @@ |
730 | self._check() |
731 | return self._exchange.target_gen |
732 | |
733 | - def insert_doc_from_source(self, CDocument doc, source_gen): |
734 | + def insert_doc_from_source(self, CDocument doc, source_gen, |
735 | + source_trans_id): |
736 | self._check() |
737 | handle_status("insert_doc_from_source", |
738 | u1db__sync_exchange_insert_doc_from_source(self._exchange, |
739 | - doc._doc, source_gen)) |
740 | + doc._doc, source_gen, source_trans_id)) |
741 | |
742 | def find_doc_ids_to_return(self): |
743 | self._check() |
744 | @@ -642,8 +650,10 @@ |
745 | if (self._exchange.num_doc_ids > 0 |
746 | and self._exchange.doc_ids_to_return != NULL): |
747 | for i from 0 <= i < self._exchange.num_doc_ids: |
748 | - res.append((self._exchange.doc_ids_to_return[i], |
749 | - self._exchange.gen_for_doc_ids[i])) |
750 | + res.append( |
751 | + (self._exchange.doc_ids_to_return[i], |
752 | + self._exchange.gen_for_doc_ids[i], |
753 | + self._exchange.trans_ids_for_doc_ids[i])) |
754 | return res |
755 | |
756 | |
757 | @@ -714,25 +724,32 @@ |
758 | if generations == NULL: |
759 | free(<void *>doc_ids) |
760 | raise MemoryError |
761 | + trans_ids = <const_char_ptr*>calloc(num_doc_ids, sizeof(char *)) |
762 | + if trans_ids == NULL: |
763 | + raise MemoryError |
764 | res_trans_id = '' |
765 | try: |
766 | - for i, (doc_id, gen) in enumerate(doc_id_generations): |
767 | + for i, (doc_id, gen, trans_id) in enumerate(doc_id_generations): |
768 | doc_ids[i] = PyString_AsString(doc_id) |
769 | generations[i] = gen |
770 | + trans_ids[i] = trans_id |
771 | target_gen = last_known_generation |
772 | with nogil: |
773 | status = self._st.sync_exchange_doc_ids(self._st, sdb._db, |
774 | - num_doc_ids, doc_ids, generations, |
775 | + num_doc_ids, doc_ids, generations, trans_ids, |
776 | &target_gen, &target_trans_id, |
777 | <void*>return_doc_cb, return_doc_cb_wrapper) |
778 | handle_status("sync_exchange_doc_ids", status) |
779 | finally: |
780 | - free(<void *>doc_ids) |
781 | - free(generations) |
782 | + if doc_ids != NULL: |
783 | + free(<void *>doc_ids) |
784 | + if generations != NULL: |
785 | + free(generations) |
786 | + if trans_ids != NULL: |
787 | + free(trans_ids) |
788 | if target_trans_id != NULL: |
789 | res_trans_id = target_trans_id |
790 | free(target_trans_id) |
791 | - |
792 | return target_gen, res_trans_id |
793 | |
794 | def sync_exchange(self, docs_by_generations, source_replica_uid, |
795 | @@ -740,6 +757,7 @@ |
796 | cdef CDocument cur_doc |
797 | cdef u1db_document **docs = NULL |
798 | cdef int *generations = NULL |
799 | + cdef const_char_ptr *trans_ids = NULL |
800 | cdef char *trans_id = NULL |
801 | cdef int i, count, status, target_gen |
802 | |
803 | @@ -754,15 +772,19 @@ |
804 | generations = <int*>calloc(count, sizeof(int)) |
805 | if generations == NULL: |
806 | raise MemoryError |
807 | + trans_ids = <const_char_ptr*>calloc(count, sizeof(char*)) |
808 | + if trans_ids == NULL: |
809 | + raise MemoryError |
810 | for i from 0 <= i < count: |
811 | cur_doc = docs_by_generations[i][0] |
812 | generations[i] = docs_by_generations[i][1] |
813 | + trans_ids[i] = docs_by_generations[i][2] |
814 | docs[i] = cur_doc._doc |
815 | target_gen = last_known_generation |
816 | with nogil: |
817 | status = self._st.sync_exchange(self._st, |
818 | source_replica_uid, count, |
819 | - docs, generations, &target_gen, &trans_id, |
820 | + docs, generations, trans_ids, &target_gen, &trans_id, |
821 | <void *>return_doc_cb, return_doc_cb_wrapper) |
822 | handle_status("sync_exchange", status) |
823 | finally: |
824 | @@ -770,6 +792,8 @@ |
825 | free(docs) |
826 | if generations != NULL: |
827 | free(generations) |
828 | + if trans_ids != NULL: |
829 | + free(trans_ids) |
830 | if trans_id != NULL: |
831 | res_trans_id = trans_id |
832 | free(trans_id) |
833 | |
834 | === modified file 'u1db/tests/test_backends.py' |
835 | --- u1db/tests/test_backends.py 2012-06-12 15:44:43 +0000 |
836 | +++ u1db/tests/test_backends.py 2012-06-15 00:40:24 +0000 |
837 | @@ -1376,7 +1376,7 @@ |
838 | pass |
839 | |
840 | doc_other = self.make_document(doc.doc_id, other_rev, new_content) |
841 | - docs_by_gen = [(doc_other, 10)] |
842 | + docs_by_gen = [(doc_other, 10, 'T-sid')] |
843 | st.sync_exchange( |
844 | docs_by_gen, 'other-replica', last_known_generation=0, |
845 | return_doc_cb=ignore) |
846 | |
847 | === modified file 'u1db/tests/test_c_backend.py' |
848 | --- u1db/tests/test_c_backend.py 2012-06-12 14:57:44 +0000 |
849 | +++ u1db/tests/test_c_backend.py 2012-06-15 00:40:24 +0000 |
850 | @@ -287,10 +287,10 @@ |
851 | doc = c_backend_wrapper.make_document('doc-id', 'replica:1', |
852 | tests.simple_doc) |
853 | self.assertEqual([], exc.get_seen_ids()) |
854 | - exc.insert_doc_from_source(doc, 10) |
855 | + exc.insert_doc_from_source(doc, 10, 'T-sid') |
856 | self.assertGetDoc(self.db, 'doc-id', 'replica:1', tests.simple_doc, |
857 | False) |
858 | - self.assertEqual((10, None), |
859 | + self.assertEqual((10, 'T-sid'), |
860 | self.db._get_sync_gen_info('source-uid')) |
861 | self.assertEqual(['doc-id'], exc.get_seen_ids()) |
862 | |
863 | @@ -301,7 +301,7 @@ |
864 | tests.nested_doc) |
865 | self.assertEqual([], exc.get_seen_ids()) |
866 | # The insert should be rejected and the doc_id not considered 'seen' |
867 | - exc.insert_doc_from_source(doc2, 10) |
868 | + exc.insert_doc_from_source(doc2, 10, 'T-sid') |
869 | self.assertGetDoc( |
870 | self.db, doc.doc_id, doc.rev, tests.simple_doc, False) |
871 | self.assertEqual([], exc.get_seen_ids()) |
872 | @@ -311,7 +311,10 @@ |
873 | exc = self.st._get_sync_exchange("source-uid", 0) |
874 | self.assertEqual(0, exc.target_gen) |
875 | exc.find_doc_ids_to_return() |
876 | - self.assertEqual([(doc.doc_id, 1)], exc.get_doc_ids_to_return()) |
877 | + doc_id = exc.get_doc_ids_to_return()[0] |
878 | + self.assertEqual( |
879 | + (doc.doc_id, 1), doc_id[:-1]) |
880 | + self.assertTrue(doc_id[-1].startswith('T-')) |
881 | self.assertEqual(1, exc.target_gen) |
882 | |
883 | def test_sync_exchange_find_doc_ids_not_including_recently_inserted(self): |
884 | @@ -320,22 +323,23 @@ |
885 | exc = self.st._get_sync_exchange("source-uid", 0) |
886 | doc3 = c_backend_wrapper.make_document(doc1.doc_id, |
887 | doc1.rev + "|zreplica:2", tests.simple_doc) |
888 | - exc.insert_doc_from_source(doc3, 10) |
889 | + exc.insert_doc_from_source(doc3, 10, 'T-sid') |
890 | exc.find_doc_ids_to_return() |
891 | - self.assertEqual([(doc2.doc_id, 2)], exc.get_doc_ids_to_return()) |
892 | + self.assertEqual( |
893 | + (doc2.doc_id, 2), exc.get_doc_ids_to_return()[0][:-1]) |
894 | self.assertEqual(3, exc.target_gen) |
895 | |
896 | def test_sync_exchange_return_docs(self): |
897 | returned = [] |
898 | |
899 | - def return_doc_cb(doc, gen): |
900 | - returned.append((doc, gen)) |
901 | + def return_doc_cb(doc, gen, trans_id): |
902 | + returned.append((doc, gen, trans_id)) |
903 | |
904 | doc1 = self.db.create_doc(tests.simple_doc) |
905 | exc = self.st._get_sync_exchange("source-uid", 0) |
906 | exc.find_doc_ids_to_return() |
907 | exc.return_docs(return_doc_cb) |
908 | - self.assertEqual([(doc1, 1)], returned) |
909 | + self.assertEqual((doc1, 1), returned[0][:-1]) |
910 | |
911 | def test_sync_exchange_doc_ids(self): |
912 | doc1 = self.db.create_doc(tests.simple_doc, doc_id='doc-1') |
913 | @@ -343,16 +347,17 @@ |
914 | doc2 = db2.create_doc(tests.nested_doc, doc_id='doc-2') |
915 | returned = [] |
916 | |
917 | - def return_doc_cb(doc, gen): |
918 | - returned.append((doc, gen)) |
919 | - val = self.st.sync_exchange_doc_ids(db2, [(doc2.doc_id, 1)], 0, |
920 | - return_doc_cb) |
921 | + def return_doc_cb(doc, gen, trans_id): |
922 | + returned.append((doc, gen, trans_id)) |
923 | + |
924 | + val = self.st.sync_exchange_doc_ids( |
925 | + db2, [(doc2.doc_id, 1, 'T-sid')], 0, return_doc_cb) |
926 | last_trans_id = self.db._get_transaction_log()[-1][1] |
927 | self.assertEqual(2, self.db._get_generation()) |
928 | self.assertEqual((2, last_trans_id), val) |
929 | self.assertGetDoc(self.db, doc2.doc_id, doc2.rev, tests.nested_doc, |
930 | False) |
931 | - self.assertEqual([(doc1, 1)], returned) |
932 | + self.assertEqual((doc1, 1), returned[0][:-1]) |
933 | |
934 | |
935 | class TestCHTTPSyncTarget(BackendTests): |
936 | |
937 | === modified file 'u1db/tests/test_http_app.py' |
938 | --- u1db/tests/test_http_app.py 2012-05-31 12:22:03 +0000 |
939 | +++ u1db/tests/test_http_app.py 2012-06-15 00:40:24 +0000 |
940 | @@ -715,9 +715,9 @@ |
941 | def test_sync_exchange_send(self): |
942 | entries = { |
943 | 10: {'id': 'doc-here', 'rev': 'replica:1', 'content': |
944 | - '{"value": "here"}', 'gen': 10}, |
945 | + '{"value": "here"}', 'gen': 10, 'trans_id': 'T-sid'}, |
946 | 11: {'id': 'doc-here2', 'rev': 'replica:1', 'content': |
947 | - '{"value": "here2"}', 'gen': 11} |
948 | + '{"value": "here2"}', 'gen': 11, 'trans_id': 'T-sed'} |
949 | } |
950 | |
951 | gens = [] |
952 | @@ -793,12 +793,18 @@ |
953 | self.assertEqual({'new_generation': 2, |
954 | 'new_transaction_id': last_trans_id}, |
955 | simplejson.loads(parts[1].rstrip(","))) |
956 | - self.assertEqual({'content': '{"value": "there"}', |
957 | - 'rev': doc.rev, 'id': doc.doc_id, 'gen': 1}, |
958 | - simplejson.loads(parts[2].rstrip(","))) |
959 | - self.assertEqual({'content': '{"value": "there2"}', |
960 | - 'rev': doc2.rev, 'id': doc2.doc_id, 'gen': 2}, |
961 | - simplejson.loads(parts[3].rstrip(","))) |
962 | + part2 = simplejson.loads(parts[2].rstrip(",")) |
963 | + self.assertTrue(part2['trans_id'].startswith('T-')) |
964 | + self.assertEqual('{"value": "there"}', part2['content']) |
965 | + self.assertEqual(doc.rev, part2['rev']) |
966 | + self.assertEqual(doc.doc_id, part2['id']) |
967 | + self.assertEqual(1, part2['gen']) |
968 | + part3 = simplejson.loads(parts[3].rstrip(",")) |
969 | + self.assertTrue(part3['trans_id'].startswith('T-')) |
970 | + self.assertEqual('{"value": "there2"}', part3['content']) |
971 | + self.assertEqual(doc2.rev, part3['rev']) |
972 | + self.assertEqual(doc2.doc_id, part3['id']) |
973 | + self.assertEqual(2, part3['gen']) |
974 | self.assertEqual(']', parts[4]) |
975 | |
976 | def test_sync_exchange_error_in_stream(self): |
977 | |
978 | === modified file 'u1db/tests/test_remote_sync_target.py' |
979 | --- u1db/tests/test_remote_sync_target.py 2012-05-31 12:22:03 +0000 |
980 | +++ u1db/tests/test_remote_sync_target.py 2012-06-15 00:40:24 +0000 |
981 | @@ -81,8 +81,9 @@ |
982 | self.assertRaises(errors.BrokenSyncStream, |
983 | tgt._parse_sync_stream, |
984 | '[\r\n{},\r\n{"id": "i", "rev": "r", ' |
985 | - '"content": "c", "gen": 3},\r\n]', |
986 | - lambda doc, gen: None) |
987 | + '"content": "c", "gen": 3, "trans_id": "T-sid"}' |
988 | + ',\r\n]', |
989 | + lambda doc, gen, trans_id: None) |
990 | |
991 | def test_error_in_stream(self): |
992 | tgt = http_target.HTTPSyncTarget("http://foo/foo") |
993 | @@ -102,6 +103,7 @@ |
994 | |
995 | |
996 | def http_server_def(): |
997 | + |
998 | def make_server(host_port, handler, state): |
999 | application = http_app.HTTPApp(state) |
1000 | srv = simple_server.WSGIServer(host_port, handler) |
1001 | @@ -111,9 +113,11 @@ |
1002 | # handler |
1003 | # ) |
1004 | return srv |
1005 | + |
1006 | class req_handler(simple_server.WSGIRequestHandler): |
1007 | def log_request(*args): |
1008 | pass # suppress |
1009 | + |
1010 | #rh = httpserver.WSGIHandler |
1011 | return make_server, req_handler, "shutdown", "http" |
1012 | |
1013 | @@ -123,6 +127,7 @@ |
1014 | |
1015 | |
1016 | def oauth_http_server_def(): |
1017 | + |
1018 | def make_server(host_port, handler, state): |
1019 | app = http_app.HTTPApp(state) |
1020 | application = oauth_middleware.OAuthMiddleware(app, None) |
1021 | @@ -132,9 +137,11 @@ |
1022 | application.base_url = "http://%s:%s" % srv.server_address |
1023 | srv.set_app(application) |
1024 | return srv |
1025 | + |
1026 | class req_handler(simple_server.WSGIRequestHandler): |
1027 | def log_request(*args): |
1028 | pass # suppress |
1029 | + |
1030 | return make_server, req_handler, "shutdown", "http" |
1031 | |
1032 | |
1033 | @@ -181,26 +188,31 @@ |
1034 | db = self.request_state._create_database('test') |
1035 | remote_target = self.getSyncTarget('test') |
1036 | other_docs = [] |
1037 | + |
1038 | def receive_doc(doc): |
1039 | other_docs.append((doc.doc_id, doc.rev, doc.get_json())) |
1040 | + |
1041 | doc = self.make_document('doc-here', 'replica:1', '{"value": "here"}') |
1042 | new_gen, trans_id = remote_target.sync_exchange( |
1043 | - [(doc, 10)], |
1044 | + [(doc, 10, 'T-sid')], |
1045 | 'replica', last_known_generation=0, |
1046 | return_doc_cb=receive_doc) |
1047 | self.assertEqual(1, new_gen) |
1048 | - self.assertGetDoc(db, 'doc-here', 'replica:1', '{"value": "here"}', |
1049 | - False) |
1050 | + self.assertGetDoc( |
1051 | + db, 'doc-here', 'replica:1', '{"value": "here"}', False) |
1052 | |
1053 | def test_sync_exchange_send_failure_and_retry_scenario(self): |
1054 | self.startServer() |
1055 | + |
1056 | def blackhole_getstderr(inst): |
1057 | return cStringIO.StringIO() |
1058 | + |
1059 | self.patch(self.server.RequestHandlerClass, 'get_stderr', |
1060 | blackhole_getstderr) |
1061 | db = self.request_state._create_database('test') |
1062 | _put_doc_if_newer = db._put_doc_if_newer |
1063 | trigger_ids = ['doc-here2'] |
1064 | + |
1065 | def bomb_put_doc_if_newer(doc, save_conflict, |
1066 | replica_uid=None, replica_gen=None, |
1067 | replica_trans_id=None): |
1068 | @@ -212,33 +224,37 @@ |
1069 | self.patch(db, '_put_doc_if_newer', bomb_put_doc_if_newer) |
1070 | remote_target = self.getSyncTarget('test') |
1071 | other_changes = [] |
1072 | - def receive_doc(doc, gen): |
1073 | - other_changes.append((doc.doc_id, doc.rev, doc.get_json(), gen)) |
1074 | + |
1075 | + def receive_doc(doc, gen, trans_id): |
1076 | + other_changes.append( |
1077 | + (doc.doc_id, doc.rev, doc.get_json(), gen, trans_id)) |
1078 | + |
1079 | doc1 = self.make_document('doc-here', 'replica:1', '{"value": "here"}') |
1080 | doc2 = self.make_document('doc-here2', 'replica:1', |
1081 | '{"value": "here2"}') |
1082 | self.assertRaises(errors.HTTPError, remote_target.sync_exchange, |
1083 | - [(doc1, 10), |
1084 | - (doc2, 11) |
1085 | + [(doc1, 10, 'T-sid'), |
1086 | + (doc2, 11, 'T-sud') |
1087 | ], 'replica', last_known_generation=0, |
1088 | return_doc_cb=receive_doc) |
1089 | self.assertGetDoc(db, 'doc-here', 'replica:1', '{"value": "here"}', |
1090 | False) |
1091 | - self.assertEqual((10, None), db._get_sync_gen_info('replica')) |
1092 | + self.assertEqual((10, 'T-sid'), db._get_sync_gen_info('replica')) |
1093 | self.assertEqual([], other_changes) |
1094 | # retry |
1095 | trigger_ids = [] |
1096 | new_gen, trans_id = remote_target.sync_exchange( |
1097 | - [(doc2, 11) |
1098 | + [(doc2, 11, 'T-sud') |
1099 | ], 'replica', last_known_generation=0, |
1100 | return_doc_cb=receive_doc) |
1101 | self.assertGetDoc(db, 'doc-here2', 'replica:1', '{"value": "here2"}', |
1102 | False) |
1103 | - self.assertEqual((11, None), db._get_sync_gen_info('replica')) |
1104 | + self.assertEqual((11, 'T-sud'), db._get_sync_gen_info('replica')) |
1105 | self.assertEqual(2, new_gen) |
1106 | # bounced back to us |
1107 | - self.assertEqual([('doc-here', 'replica:1', '{"value": "here"}', 1)], |
1108 | - other_changes) |
1109 | + self.assertEqual( |
1110 | + ('doc-here', 'replica:1', '{"value": "here"}', 1), |
1111 | + other_changes[0][:-1]) |
1112 | |
1113 | def test_sync_exchange_in_stream_error(self): |
1114 | self.startServer() |
1115 | @@ -261,14 +277,16 @@ |
1116 | remote_target = self.getSyncTarget('test') |
1117 | other_changes = [] |
1118 | |
1119 | - def receive_doc(doc, gen): |
1120 | - other_changes.append((doc.doc_id, doc.rev, doc.get_json(), gen)) |
1121 | + def receive_doc(doc, gen, trans_id): |
1122 | + other_changes.append( |
1123 | + (doc.doc_id, doc.rev, doc.get_json(), gen, trans_id)) |
1124 | |
1125 | self.assertRaises(errors.Unavailable, remote_target.sync_exchange, |
1126 | [], 'replica', last_known_generation=0, |
1127 | return_doc_cb=receive_doc) |
1128 | - self.assertEqual([(doc.doc_id, doc.rev, '{"value": "there"}', 1)], |
1129 | - other_changes) |
1130 | + self.assertEqual( |
1131 | + (doc.doc_id, doc.rev, '{"value": "there"}', 1), |
1132 | + other_changes[0][:-1]) |
1133 | |
1134 | def test_sync_exchange_receive(self): |
1135 | self.startServer() |
1136 | @@ -276,14 +294,18 @@ |
1137 | doc = db.create_doc('{"value": "there"}') |
1138 | remote_target = self.getSyncTarget('test') |
1139 | other_changes = [] |
1140 | - def receive_doc(doc, gen): |
1141 | - other_changes.append((doc.doc_id, doc.rev, doc.get_json(), gen)) |
1142 | + |
1143 | + def receive_doc(doc, gen, trans_id): |
1144 | + other_changes.append( |
1145 | + (doc.doc_id, doc.rev, doc.get_json(), gen, trans_id)) |
1146 | + |
1147 | new_gen, trans_id = remote_target.sync_exchange( |
1148 | [], 'replica', last_known_generation=0, |
1149 | return_doc_cb=receive_doc) |
1150 | self.assertEqual(1, new_gen) |
1151 | - self.assertEqual([(doc.doc_id, doc.rev, '{"value": "there"}', 1)], |
1152 | - other_changes) |
1153 | + self.assertEqual( |
1154 | + (doc.doc_id, doc.rev, '{"value": "there"}', 1), |
1155 | + other_changes[0][:-1]) |
1156 | |
1157 | |
1158 | load_tests = tests.load_with_scenarios |
1159 | |
1160 | === modified file 'u1db/tests/test_sync.py' |
1161 | --- u1db/tests/test_sync.py 2012-06-08 07:39:35 +0000 |
1162 | +++ u1db/tests/test_sync.py 2012-06-15 00:40:24 +0000 |
1163 | @@ -128,8 +128,9 @@ |
1164 | del self.db |
1165 | super(DatabaseSyncTargetTests, self).tearDown() |
1166 | |
1167 | - def receive_doc(self, doc, gen): |
1168 | - self.other_changes.append((doc.doc_id, doc.rev, doc.get_json(), gen)) |
1169 | + def receive_doc(self, doc, gen, trans_id): |
1170 | + self.other_changes.append( |
1171 | + (doc.doc_id, doc.rev, doc.get_json(), gen, trans_id)) |
1172 | |
1173 | def set_trace_hook(self, callback): |
1174 | try: |
1175 | @@ -157,7 +158,8 @@ |
1176 | |
1177 | def test_sync_exchange(self): |
1178 | docs_by_gen = [ |
1179 | - (self.make_document('doc-id', 'replica:1', simple_doc), 10)] |
1180 | + (self.make_document('doc-id', 'replica:1', simple_doc), 10, |
1181 | + 'T-sid')] |
1182 | new_gen, trans_id = self.st.sync_exchange(docs_by_gen, 'replica', |
1183 | last_known_generation=0, |
1184 | return_doc_cb=self.receive_doc) |
1185 | @@ -172,7 +174,7 @@ |
1186 | doc = self.db.create_doc('{}') |
1187 | edit_rev = 'replica:1|' + doc.rev |
1188 | docs_by_gen = [ |
1189 | - (self.make_document(doc.doc_id, edit_rev, None), 10)] |
1190 | + (self.make_document(doc.doc_id, edit_rev, None), 10, 'T-sid')] |
1191 | new_gen, trans_id = self.st.sync_exchange(docs_by_gen, 'replica', |
1192 | last_known_generation=0, |
1193 | return_doc_cb=self.receive_doc) |
1194 | @@ -186,8 +188,9 @@ |
1195 | |
1196 | def test_sync_exchange_push_many(self): |
1197 | docs_by_gen = [ |
1198 | - (self.make_document('doc-id', 'replica:1', simple_doc), 10), |
1199 | - (self.make_document('doc-id2', 'replica:1', nested_doc), 11)] |
1200 | + (self.make_document('doc-id', 'replica:1', simple_doc), 10, 'T-1'), |
1201 | + (self.make_document('doc-id2', 'replica:1', nested_doc), 11, |
1202 | + 'T-2')] |
1203 | new_gen, trans_id = self.st.sync_exchange(docs_by_gen, 'replica', |
1204 | last_known_generation=0, |
1205 | return_doc_cb=self.receive_doc) |
1206 | @@ -204,13 +207,15 @@ |
1207 | self.assertTransactionLog([doc.doc_id], self.db) |
1208 | new_doc = '{"key": "altval"}' |
1209 | docs_by_gen = [ |
1210 | - (self.make_document(doc.doc_id, 'replica:1', new_doc), 10)] |
1211 | - new_gen, _ = self.st.sync_exchange(docs_by_gen, 'replica', |
1212 | - last_known_generation=0, |
1213 | - return_doc_cb=self.receive_doc) |
1214 | + (self.make_document(doc.doc_id, 'replica:1', new_doc), 10, |
1215 | + 'T-sid')] |
1216 | + new_gen, _ = self.st.sync_exchange( |
1217 | + docs_by_gen, 'replica', last_known_generation=0, |
1218 | + return_doc_cb=self.receive_doc) |
1219 | self.assertTransactionLog([doc.doc_id], self.db) |
1220 | - self.assertEqual(([(doc.doc_id, doc.rev, simple_doc, 1)], 1), |
1221 | - (self.other_changes, new_gen)) |
1222 | + self.assertEqual( |
1223 | + (doc.doc_id, doc.rev, simple_doc, 1), self.other_changes[0][:-1]) |
1224 | + self.assertEqual(1, new_gen) |
1225 | if self.whitebox: |
1226 | self.assertEqual(self.db._last_exchange_log['return'], |
1227 | {'last_gen': 1, 'docs': [(doc.doc_id, doc.rev)]}) |
1228 | @@ -219,7 +224,7 @@ |
1229 | doc = self.db.create_doc(simple_doc) |
1230 | self.assertTransactionLog([doc.doc_id], self.db) |
1231 | docs_by_gen = [ |
1232 | - (self.make_document(doc.doc_id, doc.rev, simple_doc), 10)] |
1233 | + (self.make_document(doc.doc_id, doc.rev, simple_doc), 10, 'T-sid')] |
1234 | new_gen, _ = self.st.sync_exchange(docs_by_gen, 'replica', |
1235 | last_known_generation=1, |
1236 | return_doc_cb=self.receive_doc) |
1237 | @@ -233,8 +238,9 @@ |
1238 | last_known_generation=0, |
1239 | return_doc_cb=self.receive_doc) |
1240 | self.assertTransactionLog([doc.doc_id], self.db) |
1241 | - self.assertEqual(([(doc.doc_id, doc.rev, simple_doc, 1)], 1), |
1242 | - (self.other_changes, new_gen)) |
1243 | + self.assertEqual( |
1244 | + (doc.doc_id, doc.rev, simple_doc, 1), self.other_changes[0][:-1]) |
1245 | + self.assertEqual(1, new_gen) |
1246 | if self.whitebox: |
1247 | self.assertEqual(self.db._last_exchange_log['return'], |
1248 | {'last_gen': 1, 'docs': [(doc.doc_id, doc.rev)]}) |
1249 | @@ -247,8 +253,9 @@ |
1250 | last_known_generation=0, |
1251 | return_doc_cb=self.receive_doc) |
1252 | self.assertTransactionLog([doc.doc_id, doc.doc_id], self.db) |
1253 | - self.assertEqual(([(doc.doc_id, doc.rev, None, 2)], 2), |
1254 | - (self.other_changes, new_gen)) |
1255 | + self.assertEqual( |
1256 | + (doc.doc_id, doc.rev, None, 2), self.other_changes[0][:-1]) |
1257 | + self.assertEqual(2, new_gen) |
1258 | if self.whitebox: |
1259 | self.assertEqual(self.db._last_exchange_log['return'], |
1260 | {'last_gen': 2, 'docs': [(doc.doc_id, doc.rev)]}) |
1261 | @@ -261,9 +268,11 @@ |
1262 | last_known_generation=0, |
1263 | return_doc_cb=self.receive_doc) |
1264 | self.assertTransactionLog([doc.doc_id, doc2.doc_id], self.db) |
1265 | - self.assertEqual(([(doc.doc_id, doc.rev, simple_doc, 1), |
1266 | - (doc2.doc_id, doc2.rev, nested_doc, 2)], 2), |
1267 | - (self.other_changes, new_gen)) |
1268 | + self.assertEqual(2, new_gen) |
1269 | + self.assertEqual( |
1270 | + [(doc.doc_id, doc.rev, simple_doc, 1), |
1271 | + (doc2.doc_id, doc2.rev, nested_doc, 2)], |
1272 | + [c[:-1] for c in self.other_changes]) |
1273 | if self.whitebox: |
1274 | self.assertEqual( |
1275 | self.db._last_exchange_log['return'], |
1276 | @@ -275,7 +284,8 @@ |
1277 | self.assertTransactionLog([doc.doc_id], self.db) |
1278 | new_doc = '{"key": "altval"}' |
1279 | docs_by_gen = [ |
1280 | - (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10)] |
1281 | + (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10, |
1282 | + 'T-sid')] |
1283 | new_gen, _ = self.st.sync_exchange(docs_by_gen, 'other-replica', |
1284 | last_known_generation=0, |
1285 | return_doc_cb=self.receive_doc) |
1286 | @@ -298,11 +308,13 @@ |
1287 | self.assertTransactionLog([doc.doc_id], self.db) |
1288 | new_doc = '{"key": "altval"}' |
1289 | docs_by_gen = [ |
1290 | - (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10)] |
1291 | - new_gen, _ = self.st.sync_exchange(docs_by_gen, 'other-replica', |
1292 | - last_known_generation=0, |
1293 | - return_doc_cb=self.receive_doc) |
1294 | - self.assertEqual((expected, 3), (self.other_changes, new_gen)) |
1295 | + (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10, |
1296 | + 'T-sid')] |
1297 | + new_gen, _ = self.st.sync_exchange( |
1298 | + docs_by_gen, 'other-replica', last_known_generation=0, |
1299 | + return_doc_cb=self.receive_doc) |
1300 | + self.assertEqual(expected, [c[:-1] for c in self.other_changes]) |
1301 | + self.assertEqual(3, new_gen) |
1302 | |
1303 | def test_sync_exchange_with_concurrent_updates(self): |
1304 | def after_whatschanged_cb(state): |
1305 | @@ -314,7 +326,8 @@ |
1306 | self.assertTransactionLog([doc.doc_id], self.db) |
1307 | new_doc = '{"key": "altval"}' |
1308 | docs_by_gen = [ |
1309 | - (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10)] |
1310 | + (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10, |
1311 | + 'T-sid')] |
1312 | new_gen, _ = self.st.sync_exchange(docs_by_gen, 'other-replica', |
1313 | last_known_generation=0, |
1314 | return_doc_cb=self.receive_doc) |
1315 | @@ -323,8 +336,9 @@ |
1316 | def test_sync_exchange_converged_handling(self): |
1317 | doc = self.db.create_doc(simple_doc) |
1318 | docs_by_gen = [ |
1319 | - (self.make_document('new', 'other:1', '{}'), 4), |
1320 | - (self.make_document(doc.doc_id, doc.rev, doc.get_json()), 5)] |
1321 | + (self.make_document('new', 'other:1', '{}'), 4, 'T-foo'), |
1322 | + (self.make_document(doc.doc_id, doc.rev, doc.get_json()), 5, |
1323 | + 'T-bar')] |
1324 | new_gen, _ = self.st.sync_exchange(docs_by_gen, 'other-replica', |
1325 | last_known_generation=0, |
1326 | return_doc_cb=self.receive_doc) |
1327 | @@ -352,8 +366,9 @@ |
1328 | self.skipTest("sync_exchange_doc_ids not implemented") |
1329 | db2 = self.create_database('test2') |
1330 | doc = db2.create_doc(simple_doc) |
1331 | - new_gen, trans_id = sync_exchange_doc_ids(db2, [(doc.doc_id, 10)], 0, |
1332 | - return_doc_cb=self.receive_doc) |
1333 | + new_gen, trans_id = sync_exchange_doc_ids( |
1334 | + db2, [(doc.doc_id, 10, 'T-sid')], 0, |
1335 | + return_doc_cb=self.receive_doc) |
1336 | self.assertGetDoc(self.db, doc.doc_id, doc.rev, simple_doc, False) |
1337 | self.assertTransactionLog([doc.doc_id], self.db) |
1338 | last_trans_id = self.getLastTransId(self.db) |
Since we get a lot of uuid txids, I strip them in a lot of the tests. This means the tests will still fail if the txids are not there, but perhaps not in the most obvious way, and we make no assumptions about the contents of the txids, but I think that's probably a good thing.
Also I seem to have misplaced some 3.5K of memory, which I will hunt down and fix tomorrow.