Merge lp:~pedronis/u1db/sync-autocreate into lp:u1db
- sync-autocreate
- Merge into trunk
Proposed by
Samuele Pedroni
Status: | Merged |
---|---|
Approved by: | Samuele Pedroni |
Approved revision: | 412 |
Merged at revision: | 411 |
Proposed branch: | lp:~pedronis/u1db/sync-autocreate |
Merge into: | lp:u1db |
Diff against target: |
846 lines (+269/-68) 14 files modified
include/u1db/u1db_internal.h (+10/-2) src/u1db_http_sync_target.c (+52/-17) src/u1db_sync_target.c (+55/-10) u1db/__init__.py (+8/-3) u1db/remote/http_app.py (+23/-10) u1db/remote/http_target.py (+7/-4) u1db/remote/server_state.py (+3/-2) u1db/sync.py (+22/-7) u1db/tests/__init__.py (+3/-2) u1db/tests/c_backend_wrapper.pyx (+7/-5) u1db/tests/test_http_app.py (+32/-0) u1db/tests/test_remote_sync_target.py (+24/-0) u1db/tests/test_server_state.py (+3/-2) u1db/tests/test_sync.py (+20/-4) |
To merge this branch: | bzr merge lp:~pedronis/u1db/sync-autocreate |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Lucio Torre (community) | Approve | ||
Review via email: mp+126330@code.launchpad.net |
Commit message
support sync(autocreate=True): allow sync() to ask the target to create the database if it does not yet exist
Description of the change
support sync(autocreate=True): allow sync() to ask the target to create the database if it does not yet exist
To post a comment you must log in.
Revision history for this message
Lucio Torre (lucio.torre) : | # |
review:
Approve
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'include/u1db/u1db_internal.h' |
2 | --- include/u1db/u1db_internal.h 2012-08-15 12:39:01 +0000 |
3 | +++ include/u1db/u1db_internal.h 2012-09-25 20:06:22 +0000 |
4 | @@ -48,6 +48,8 @@ |
5 | |
6 | typedef int (*u1db__trace_callback)(void *context, const char *state); |
7 | |
8 | +typedef void (*u1db__ensure_callback)(void *context, const char *replica_uid); |
9 | + |
10 | struct _u1db_sync_target { |
11 | void *trace_context; |
12 | u1db__trace_callback trace_cb; |
13 | @@ -119,11 +121,16 @@ |
14 | * @param cb After sending the requested documents, we read the |
15 | * response stream. For each document in the stream, we |
16 | * will trigger a callback. |
17 | + * @param: ensure_callback If set the target may create |
18 | + * the target db if not yet existent, |
19 | + * the callback can then be used to inform of |
20 | + * the created db replica uid. |
21 | */ |
22 | int (*sync_exchange_doc_ids)(u1db_sync_target *st, u1database *source_db, |
23 | int n_doc_ids, const char **doc_ids, int *generations, |
24 | const char **trans_ids, int *target_gen, char **target_trans_id, |
25 | - void *context, u1db_doc_gen_callback cb); |
26 | + void *context, u1db_doc_gen_callback cb, |
27 | + u1db__ensure_callback ensure_callback); |
28 | |
29 | /** |
30 | * Same as sync_exchange, only using document objects. |
31 | @@ -133,7 +140,8 @@ |
32 | u1db_document **docs, int *generations, |
33 | const char **trans_ids, int *target_gen, |
34 | char **target_trans_id, void *context, |
35 | - u1db_doc_gen_callback cb); |
36 | + u1db_doc_gen_callback cb, |
37 | + u1db__ensure_callback ensure_callback); |
38 | /** |
39 | * Create a sync_exchange state object. |
40 | * |
41 | |
42 | === modified file 'src/u1db_http_sync_target.c' |
43 | --- src/u1db_http_sync_target.c 2012-07-26 16:24:39 +0000 |
44 | +++ src/u1db_http_sync_target.c 2012-09-25 20:06:22 +0000 |
45 | @@ -62,7 +62,8 @@ |
46 | u1db_document **docs, int *generations, |
47 | const char **trans_ids, int *target_gen, |
48 | char **target_trans_id, void *context, |
49 | - u1db_doc_gen_callback cb); |
50 | + u1db_doc_gen_callback cb, |
51 | + u1db__ensure_callback ensure_callback); |
52 | static int st_http_sync_exchange_doc_ids(u1db_sync_target *st, |
53 | u1database *source_db, int n_doc_ids, |
54 | const char **doc_ids, |
55 | @@ -70,7 +71,8 @@ |
56 | const char **trans_ids, |
57 | int *target_gen, |
58 | char **target_trans_id, void *context, |
59 | - u1db_doc_gen_callback cb); |
60 | + u1db_doc_gen_callback cb, |
61 | + u1db__ensure_callback ensure_callback); |
62 | static void st_http_finalize_sync_exchange(u1db_sync_target *st, |
63 | u1db_sync_exchange **exchange); |
64 | static int st_http_set_trace_hook(u1db_sync_target *st, |
65 | @@ -794,7 +796,7 @@ |
66 | |
67 | static int |
68 | init_temp_file(char tmpname[], FILE **temp_fd, int target_gen, |
69 | - char *target_trans_id) |
70 | + char *target_trans_id, int ensure) |
71 | { |
72 | int status = U1DB_OK; |
73 | *temp_fd = make_tempfile(tmpname); |
74 | @@ -809,8 +811,16 @@ |
75 | // determine Content-Length before we start uploading the data. |
76 | fprintf( |
77 | *temp_fd, |
78 | - "[\r\n{\"last_known_generation\": %d, \"last_known_trans_id\": \"%s\"}", |
79 | + "[\r\n{\"last_known_generation\": %d, \"last_known_trans_id\": \"%s\"", |
80 | target_gen, target_trans_id); |
81 | + if (ensure) { |
82 | + fprintf( |
83 | + *temp_fd, |
84 | + ", \"ensure\": true"); |
85 | + } |
86 | + fprintf( |
87 | + *temp_fd, |
88 | + "}"); |
89 | finish: |
90 | return status; |
91 | } |
92 | @@ -866,7 +876,7 @@ |
93 | } |
94 | if (status != U1DB_OK) { goto finish; } |
95 | if (http_code != 200 && http_code != 201) { |
96 | - printf("broken 0\n"); |
97 | + //printf("broken 0\n"); |
98 | status = U1DB_BROKEN_SYNC_STREAM; |
99 | goto finish; |
100 | } |
101 | @@ -883,7 +893,8 @@ |
102 | |
103 | static int |
104 | process_response(u1db_sync_target *st, void *context, u1db_doc_gen_callback cb, |
105 | - char *response, int *target_gen, char **target_trans_id) |
106 | + u1db__ensure_callback ensure_callback, char *response, |
107 | + int *target_gen, char **target_trans_id) |
108 | { |
109 | int status = U1DB_OK; |
110 | int i, doc_count; |
111 | @@ -893,37 +904,38 @@ |
112 | int gen; |
113 | const char *trans_id = NULL; |
114 | u1db_document *doc; |
115 | + const char *replica_uid = NULL; |
116 | |
117 | json = json_tokener_parse(response); |
118 | if (json == NULL || !json_object_is_type(json, json_type_array)) { |
119 | - printf("broken 1, response: %s\n", response); |
120 | + //printf("broken 1, response: %s\n", response); |
121 | status = U1DB_BROKEN_SYNC_STREAM; |
122 | goto finish; |
123 | } |
124 | doc_count = json_object_array_length(json); |
125 | if (doc_count < 1) { |
126 | // the first response is the new_generation info, so it must exist |
127 | - printf("broken 2\n"); |
128 | + //printf("broken 2\n"); |
129 | status = U1DB_BROKEN_SYNC_STREAM; |
130 | goto finish; |
131 | } |
132 | obj = json_object_array_get_idx(json, 0); |
133 | attr = json_object_object_get(obj, "new_generation"); |
134 | if (attr == NULL) { |
135 | - printf("broken 3\n"); |
136 | + //printf("broken 3\n"); |
137 | status = U1DB_BROKEN_SYNC_STREAM; |
138 | goto finish; |
139 | } |
140 | *target_gen = json_object_get_int(attr); |
141 | attr = json_object_object_get(obj, "new_transaction_id"); |
142 | if (attr == NULL) { |
143 | - printf("broken 4\n"); |
144 | + //printf("broken 4\n"); |
145 | status = U1DB_BROKEN_SYNC_STREAM; |
146 | goto finish; |
147 | } |
148 | tmp = json_object_get_string(attr); |
149 | if (tmp == NULL) { |
150 | - printf("broken 5\n"); |
151 | + //printf("broken 5\n"); |
152 | status = U1DB_BROKEN_SYNC_STREAM; |
153 | goto finish; |
154 | } |
155 | @@ -933,6 +945,23 @@ |
156 | goto finish; |
157 | } |
158 | |
159 | + if (ensure_callback) { |
160 | + attr = json_object_object_get(obj, "replica_uid"); |
161 | + if (attr != NULL) { |
162 | + tmp = json_object_get_string(attr); |
163 | + if (tmp == NULL) { |
164 | + status = U1DB_BROKEN_SYNC_STREAM; |
165 | + goto finish; |
166 | + } |
167 | + replica_uid = strdup(tmp); |
168 | + if (*target_trans_id == NULL) { |
169 | + status = U1DB_NOMEM; |
170 | + goto finish; |
171 | + } |
172 | + ensure_callback(context, replica_uid); |
173 | + } |
174 | + } |
175 | + |
176 | for (i = 1; i < doc_count; ++i) { |
177 | obj = json_object_array_get_idx(json, i); |
178 | attr = json_object_object_get(obj, "id"); |
179 | @@ -989,7 +1018,8 @@ |
180 | int n_docs, u1db_document **docs, int *generations, |
181 | const char **trans_ids, int *target_gen, |
182 | char **target_trans_id, void *context, |
183 | - u1db_doc_gen_callback cb) |
184 | + u1db_doc_gen_callback cb, |
185 | + u1db__ensure_callback ensure_callback) |
186 | { |
187 | int status, i; |
188 | FILE *temp_fd = NULL; |
189 | @@ -1004,7 +1034,8 @@ |
190 | if (n_docs > 0 && (docs == NULL || generations == NULL)) { |
191 | return U1DB_INVALID_PARAMETER; |
192 | } |
193 | - status = init_temp_file(tmpname, &temp_fd, *target_gen, *target_trans_id); |
194 | + status = init_temp_file(tmpname, &temp_fd, *target_gen, *target_trans_id, |
195 | + ensure_callback != NULL); |
196 | if (status != U1DB_OK) { goto finish; } |
197 | for (i = 0; i < n_docs; ++i) { |
198 | status = doc_to_tempfile( |
199 | @@ -1013,7 +1044,8 @@ |
200 | } |
201 | status = finalize_and_send_temp_file(st, temp_fd, source_replica_uid, &req); |
202 | if (status != U1DB_OK) { goto finish; } |
203 | - status = process_response(st, context, cb, req.body_buffer, target_gen, |
204 | + status = process_response(st, context, cb, ensure_callback, |
205 | + req.body_buffer, target_gen, |
206 | target_trans_id); |
207 | finish: |
208 | cleanup_temp_files(tmpname, temp_fd, &req); |
209 | @@ -1058,7 +1090,8 @@ |
210 | int n_doc_ids, const char **doc_ids, |
211 | int *generations, const char **trans_ids, |
212 | int *target_gen, char **target_trans_id, |
213 | - void *context, u1db_doc_gen_callback cb) |
214 | + void *context, u1db_doc_gen_callback cb, |
215 | + u1db__ensure_callback ensure_callback) |
216 | { |
217 | int status; |
218 | FILE *temp_fd = NULL; |
219 | @@ -1077,7 +1110,8 @@ |
220 | } |
221 | status = u1db_get_replica_uid(source_db, &source_replica_uid); |
222 | if (status != U1DB_OK) { goto finish; } |
223 | - status = init_temp_file(tmpname, &temp_fd, *target_gen, *target_trans_id); |
224 | + status = init_temp_file(tmpname, &temp_fd, *target_gen, *target_trans_id, |
225 | + ensure_callback != NULL); |
226 | if (status != U1DB_OK) { goto finish; } |
227 | state.num = n_doc_ids; |
228 | state.generations = generations; |
229 | @@ -1088,7 +1122,8 @@ |
230 | if (status != U1DB_OK) { goto finish; } |
231 | status = finalize_and_send_temp_file(st, temp_fd, source_replica_uid, &req); |
232 | if (status != U1DB_OK) { goto finish; } |
233 | - status = process_response(st, context, cb, req.body_buffer, target_gen, |
234 | + status = process_response(st, context, cb, ensure_callback, |
235 | + req.body_buffer, target_gen, |
236 | target_trans_id); |
237 | finish: |
238 | cleanup_temp_files(tmpname, temp_fd, &req); |
239 | |
240 | === modified file 'src/u1db_sync_target.c' |
241 | --- src/u1db_sync_target.c 2012-08-15 12:57:57 +0000 |
242 | +++ src/u1db_sync_target.c 2012-09-25 20:06:22 +0000 |
243 | @@ -35,13 +35,15 @@ |
244 | u1db_document **docs, int *generations, |
245 | const char **trans_ids, int *target_gen, |
246 | char **target_trans_id, void *context, |
247 | - u1db_doc_gen_callback cb); |
248 | + u1db_doc_gen_callback cb, |
249 | + u1db__ensure_callback ensure_callback); |
250 | static int st_sync_exchange_doc_ids(u1db_sync_target *st, |
251 | u1database *source_db, int n_doc_ids, |
252 | const char **doc_ids, int *generations, |
253 | const char **trans_ids, int *target_gen, |
254 | char **target_trans_id, void *context, |
255 | - u1db_doc_gen_callback cb); |
256 | + u1db_doc_gen_callback cb, |
257 | + u1db__ensure_callback ensure_callback); |
258 | static int st_get_sync_exchange(u1db_sync_target *st, |
259 | const char *source_replica_uid, |
260 | int source_gen, |
261 | @@ -477,6 +479,12 @@ |
262 | return status; |
263 | } |
264 | |
265 | +static void ensure_callback(void *context, const char *replica_uid) { |
266 | + struct _return_doc_state *state; |
267 | + state = (struct _return_doc_state *)context; |
268 | + state->target_uid = replica_uid; |
269 | +} |
270 | + |
271 | |
272 | static int |
273 | get_and_insert_docs(u1database *source_db, u1db_sync_exchange *se, |
274 | @@ -503,7 +511,8 @@ |
275 | int n_docs, u1db_document **docs, int *generations, |
276 | const char **trans_ids, int *target_gen, |
277 | char **target_trans_id, void *context, |
278 | - u1db_doc_gen_callback cb) |
279 | + u1db_doc_gen_callback cb, |
280 | + u1db__ensure_callback ensure_callback) |
281 | { |
282 | int status, i; |
283 | |
284 | @@ -544,7 +553,8 @@ |
285 | st_sync_exchange_doc_ids(u1db_sync_target *st, u1database *source_db, |
286 | int n_doc_ids, const char **doc_ids, int *generations, |
287 | const char **trans_ids, int *target_gen, char **target_trans_id, |
288 | - void *context, u1db_doc_gen_callback cb) |
289 | + void *context, u1db_doc_gen_callback cb, |
290 | + u1db__ensure_callback ensure_callback) |
291 | { |
292 | int status; |
293 | const char *source_replica_uid = NULL; |
294 | @@ -598,6 +608,7 @@ |
295 | char *target_trans_id = NULL; |
296 | int target_gen, local_gen; |
297 | int local_gen_known_by_target, target_gen_known_by_local; |
298 | + u1db__ensure_callback ensure= NULL; |
299 | |
300 | // fprintf(stderr, "Starting\n"); |
301 | if (db == NULL || target == NULL || local_gen_before_sync == NULL) { |
302 | @@ -612,14 +623,38 @@ |
303 | status = target->get_sync_info( |
304 | target, local_uid, &target_uid, &target_gen, &target_trans_id, |
305 | &local_gen_known_by_target, &local_trans_id_known_by_target); |
306 | - if (status != U1DB_OK) { goto finish; } |
307 | + if (status == 404) { // database does not exist |
308 | + target_uid = NULL; |
309 | + target_gen = 0; |
310 | + local_gen_known_by_target = 0; |
311 | + target_trans_id = strdup(""); |
312 | + if (target_trans_id == NULL) { |
313 | + status = U1DB_NOMEM; |
314 | + goto finish; |
315 | + } |
316 | + local_trans_id_known_by_target = strdup(""); |
317 | + if (local_trans_id_known_by_target == NULL) { |
318 | + status = U1DB_NOMEM; |
319 | + goto finish; |
320 | + } |
321 | + ensure = ensure_callback; |
322 | + } else if (status != U1DB_OK) { goto finish; } |
323 | status = u1db_validate_gen_and_trans_id( |
324 | db, local_gen_known_by_target, local_trans_id_known_by_target); |
325 | if (status != U1DB_OK) { goto finish; } |
326 | - status = u1db__get_replica_gen_and_trans_id( |
327 | - db, target_uid, &target_gen_known_by_local, |
328 | - &target_trans_id_known_by_local); |
329 | - if (status != U1DB_OK) { goto finish; } |
330 | + if (target_uid == NULL) { |
331 | + target_gen_known_by_local = 0; |
332 | + target_trans_id_known_by_local = strdup(""); |
333 | + if (target_trans_id_known_by_local == NULL) { |
334 | + status = U1DB_NOMEM; |
335 | + goto finish; |
336 | + } |
337 | + } else { |
338 | + status = u1db__get_replica_gen_and_trans_id( |
339 | + db, target_uid, &target_gen_known_by_local, |
340 | + &target_trans_id_known_by_local); |
341 | + if (status != U1DB_OK) { goto finish; } |
342 | + } |
343 | local_target_trans_id = target_trans_id_known_by_local; |
344 | local_gen = local_gen_known_by_target; |
345 | |
346 | @@ -649,7 +684,14 @@ |
347 | (const char**)to_send_state.doc_ids_to_return, |
348 | to_send_state.gen_for_doc_ids, to_send_state.trans_ids_for_doc_ids, |
349 | &target_gen_known_by_local, &target_trans_id_known_by_local, |
350 | - &return_doc_state, return_doc_to_insert_from_target); |
351 | + &return_doc_state, return_doc_to_insert_from_target, ensure); |
352 | + if (ensure != NULL) { |
353 | + if (return_doc_state.target_uid != NULL) { |
354 | + target_uid = return_doc_state.target_uid; |
355 | + } else { |
356 | + ensure = NULL; |
357 | + } |
358 | + } |
359 | if (status != U1DB_OK) { goto finish; } |
360 | if (local_trans_id != NULL) { |
361 | free(local_trans_id); |
362 | @@ -671,6 +713,9 @@ |
363 | if (status != U1DB_OK) { goto finish; } |
364 | } |
365 | finish: |
366 | + if (ensure != NULL && target_uid != NULL) { |
367 | + free((void *)target_uid); |
368 | + } |
369 | if (local_trans_id != NULL) { |
370 | free(local_trans_id); |
371 | } |
372 | |
373 | === modified file 'u1db/__init__.py' |
374 | --- u1db/__init__.py 2012-09-24 14:12:27 +0000 |
375 | +++ u1db/__init__.py 2012-09-25 20:06:22 +0000 |
376 | @@ -303,7 +303,7 @@ |
377 | """Release any resources associated with this database.""" |
378 | raise NotImplementedError(self.close) |
379 | |
380 | - def sync(self, url, creds=None): |
381 | + def sync(self, url, creds=None, autocreate=True): |
382 | """Synchronize documents with remote replica exposed at url. |
383 | |
384 | :param url: the url of the target replica to sync with. |
385 | @@ -316,6 +316,7 @@ |
386 | 'token_key': ..., |
387 | 'token_secret': ... |
388 | }} |
389 | + :param autocreate: ask the target to create the db if non-existent. |
390 | :return: local_gen_before_sync The local generation before the |
391 | synchronisation was performed. This is useful to pass into |
392 | whatschanged, if an application wants to know which documents were |
393 | @@ -323,7 +324,8 @@ |
394 | """ |
395 | from u1db.sync import Synchronizer |
396 | from u1db.remote.http_target import HTTPSyncTarget |
397 | - return Synchronizer(self, HTTPSyncTarget(url, creds=creds)).sync() |
398 | + return Synchronizer(self, HTTPSyncTarget(url, creds=creds)).sync( |
399 | + autocreate=autocreate) |
400 | |
401 | def _get_replica_gen_and_trans_id(self, other_replica_uid): |
402 | """Return the last known generation and transaction id for the other db |
403 | @@ -633,7 +635,7 @@ |
404 | |
405 | def sync_exchange(self, docs_by_generation, source_replica_uid, |
406 | last_known_generation, last_known_trans_id, |
407 | - return_doc_cb): |
408 | + return_doc_cb, ensure_callback=None): |
409 | """Incorporate the documents sent from the source replica. |
410 | |
411 | This is not meant to be called by client code directly, but is used as |
412 | @@ -665,6 +667,9 @@ |
413 | be invoked in turn with Documents that have changed since |
414 | last_known_generation together with the generation of |
415 | their last change. |
416 | + :param: ensure_callback(replica_uid): if set the target may create |
417 | + the target db if not yet existent, the callback can then |
418 | + be used to inform of the created db replica uid. |
419 | :return: new_generation - After applying docs_by_generation, this is |
420 | the current generation for this replica |
421 | """ |
422 | |
423 | === modified file 'u1db/remote/http_app.py' |
424 | --- u1db/remote/http_app.py 2012-09-04 16:24:31 +0000 |
425 | +++ u1db/remote/http_app.py 2012-09-25 20:06:22 +0000 |
426 | @@ -344,12 +344,16 @@ |
427 | def __init__(self, dbname, source_replica_uid, state, responder): |
428 | self.source_replica_uid = source_replica_uid |
429 | self.responder = responder |
430 | - self.db = state.open_database(dbname) |
431 | - self.target = self.db.get_sync_target() |
432 | + self.state = state |
433 | + self.dbname = dbname |
434 | + self.replica_uid = None |
435 | + |
436 | + def get_target(self): |
437 | + return self.state.open_database(self.dbname).get_sync_target() |
438 | |
439 | @http_method() |
440 | def get(self): |
441 | - result = self.target.get_sync_info(self.source_replica_uid) |
442 | + result = self.get_target().get_sync_info(self.source_replica_uid) |
443 | self.responder.send_response_json( |
444 | target_replica_uid=result[0], target_replica_generation=result[1], |
445 | target_replica_transaction_id=result[2], |
446 | @@ -360,19 +364,25 @@ |
447 | @http_method(generation=int, |
448 | content_as_args=True, no_query=True) |
449 | def put(self, generation, transaction_id): |
450 | - self.target.record_sync_info(self.source_replica_uid, generation, |
451 | - transaction_id) |
452 | + self.get_target().record_sync_info(self.source_replica_uid, |
453 | + generation, |
454 | + transaction_id) |
455 | self.responder.send_response_json(ok=True) |
456 | |
457 | # Implements the same logic as LocalSyncTarget.sync_exchange |
458 | |
459 | @http_method(last_known_generation=int, last_known_trans_id=none_or_str, |
460 | content_as_args=True) |
461 | - def post_args(self, last_known_generation, last_known_trans_id=None): |
462 | - self.db.validate_gen_and_trans_id( |
463 | + def post_args(self, last_known_generation, last_known_trans_id=None, |
464 | + ensure=False): |
465 | + if ensure: |
466 | + db, self.replica_uid = self.state.ensure_database(self.dbname) |
467 | + else: |
468 | + db = self.state.open_database(self.dbname) |
469 | + db.validate_gen_and_trans_id( |
470 | last_known_generation, last_known_trans_id) |
471 | self.sync_exch = self.sync_exchange_class( |
472 | - self.db, self.source_replica_uid, last_known_generation) |
473 | + db, self.source_replica_uid, last_known_generation) |
474 | |
475 | @http_method(content_as_args=True) |
476 | def post_stream_entry(self, id, rev, content, gen, trans_id): |
477 | @@ -390,8 +400,11 @@ |
478 | self.responder.content_type = 'application/x-u1db-sync-stream' |
479 | self.responder.start_response(200) |
480 | self.responder.start_stream(), |
481 | - self.responder.stream_entry({"new_generation": new_gen, |
482 | - "new_transaction_id": self.sync_exch.new_trans_id}) |
483 | + header = {"new_generation": new_gen, |
484 | + "new_transaction_id": self.sync_exch.new_trans_id} |
485 | + if self.replica_uid is not None: |
486 | + header['replica_uid'] = self.replica_uid |
487 | + self.responder.stream_entry(header) |
488 | self.sync_exch.return_docs(send_doc) |
489 | self.responder.end_stream() |
490 | self.responder.finish_response() |
491 | |
492 | === modified file 'u1db/remote/http_target.py' |
493 | --- u1db/remote/http_target.py 2012-09-20 18:46:10 +0000 |
494 | +++ u1db/remote/http_target.py 2012-09-25 20:06:22 +0000 |
495 | @@ -57,7 +57,7 @@ |
496 | {'generation': source_replica_generation, |
497 | 'transaction_id': source_transaction_id}) |
498 | |
499 | - def _parse_sync_stream(self, data, return_doc_cb): |
500 | + def _parse_sync_stream(self, data, return_doc_cb, ensure_callback=None): |
501 | parts = data.splitlines() # one at a time |
502 | if not parts or parts[0] != '[': |
503 | raise BrokenSyncStream |
504 | @@ -66,6 +66,8 @@ |
505 | if data: |
506 | line, comma = utils.check_and_strip_comma(data[0]) |
507 | res = json.loads(line) |
508 | + if ensure_callback and 'replica_uid' in res: |
509 | + ensure_callback(res['replica_uid']) |
510 | for entry in data[1:]: |
511 | if not comma: # missing in between comma |
512 | raise BrokenSyncStream |
513 | @@ -88,7 +90,7 @@ |
514 | |
515 | def sync_exchange(self, docs_by_generations, source_replica_uid, |
516 | last_known_generation, last_known_trans_id, |
517 | - return_doc_cb): |
518 | + return_doc_cb, ensure_callback=None): |
519 | self._ensure_connection() |
520 | if self._trace_hook: # for tests |
521 | self._trace_hook('sync_exchange') |
522 | @@ -108,7 +110,8 @@ |
523 | comma = '' |
524 | size += prepare( |
525 | last_known_generation=last_known_generation, |
526 | - last_known_trans_id=last_known_trans_id) |
527 | + last_known_trans_id=last_known_trans_id, |
528 | + ensure=ensure_callback is not None) |
529 | comma = ',' |
530 | for doc, gen, trans_id in docs_by_generations: |
531 | size += prepare(id=doc.doc_id, rev=doc.rev, content=doc.get_json(), |
532 | @@ -121,7 +124,7 @@ |
533 | self._conn.send(entry) |
534 | entries = None |
535 | data, _ = self._response() |
536 | - res = self._parse_sync_stream(data, return_doc_cb) |
537 | + res = self._parse_sync_stream(data, return_doc_cb, ensure_callback) |
538 | data = None |
539 | return res['new_generation'], res['new_transaction_id'] |
540 | |
541 | |
542 | === modified file 'u1db/remote/server_state.py' |
543 | --- u1db/remote/server_state.py 2012-05-15 11:11:06 +0000 |
544 | +++ u1db/remote/server_state.py 2012-09-25 20:06:22 +0000 |
545 | @@ -56,8 +56,9 @@ |
546 | """Ensure database at the given location.""" |
547 | from u1db.backends import sqlite_backend |
548 | full_path = self._relpath(path) |
549 | - return sqlite_backend.SQLiteDatabase.open_database(full_path, |
550 | - create=True) |
551 | + db = sqlite_backend.SQLiteDatabase.open_database(full_path, |
552 | + create=True) |
553 | + return db, db._replica_uid |
554 | |
555 | def delete_database(self, path): |
556 | """Delete database at the given location.""" |
557 | |
558 | === modified file 'u1db/sync.py' |
559 | --- u1db/sync.py 2012-07-26 19:14:26 +0000 |
560 | +++ u1db/sync.py 2012-09-25 20:06:22 +0000 |
561 | @@ -90,14 +90,26 @@ |
562 | self.sync_target.record_sync_info( |
563 | self.source._replica_uid, cur_gen, trans_id) |
564 | |
565 | - def sync(self, callback=None): |
566 | + def sync(self, callback=None, autocreate=False): |
567 | """Synchronize documents between source and target.""" |
568 | sync_target = self.sync_target |
569 | # get target identifier, its current generation, |
570 | # and its last-seen database generation for this source |
571 | - (self.target_replica_uid, target_gen, target_trans_id, target_my_gen, |
572 | - target_my_trans_id) = sync_target.get_sync_info( |
573 | - self.source._replica_uid) |
574 | + try: |
575 | + (self.target_replica_uid, target_gen, target_trans_id, |
576 | + target_my_gen, target_my_trans_id) = sync_target.get_sync_info( |
577 | + self.source._replica_uid) |
578 | + except errors.DatabaseDoesNotExist: |
579 | + if not autocreate: |
580 | + raise |
581 | + # will try to ask sync_exchange() to create the db |
582 | + self.target_replica_uid = None |
583 | + target_gen, target_trans_id = 0, '' |
584 | + target_my_gen, target_my_trans_id = 0, '' |
585 | + def ensure_callback(replica_uid): |
586 | + self.target_replica_uid = replica_uid |
587 | + else: |
588 | + ensure_callback = None |
589 | # validate the generation and transaction id the target knows about us |
590 | self.source.validate_gen_and_trans_id( |
591 | target_my_gen, target_my_trans_id) |
592 | @@ -105,7 +117,10 @@ |
593 | my_gen, _, changes = self.source.whats_changed(target_my_gen) |
594 | |
595 | # this source last-seen database generation for the target |
596 | - target_last_known_gen, target_last_known_trans_id = \ |
597 | + if self.target_replica_uid is None: |
598 | + target_last_known_gen, target_last_known_trans_id = 0, '' |
599 | + else: |
600 | + target_last_known_gen, target_last_known_trans_id = \ |
601 | self.source._get_replica_gen_and_trans_id(self.target_replica_uid) |
602 | if not changes and target_last_known_gen == target_gen: |
603 | if target_trans_id != target_last_known_trans_id: |
604 | @@ -125,7 +140,7 @@ |
605 | new_gen, new_trans_id = sync_target.sync_exchange( |
606 | docs_by_generation, self.source._replica_uid, |
607 | target_last_known_gen, target_last_known_trans_id, |
608 | - self._insert_doc_from_target) |
609 | + self._insert_doc_from_target, ensure_callback=ensure_callback) |
610 | # record target synced-up-to generation including applying what we sent |
611 | self.source._set_replica_gen_and_trans_id( |
612 | self.target_replica_uid, new_gen, new_trans_id) |
613 | @@ -269,7 +284,7 @@ |
614 | |
615 | def sync_exchange(self, docs_by_generations, source_replica_uid, |
616 | last_known_generation, last_known_trans_id, |
617 | - return_doc_cb): |
618 | + return_doc_cb, ensure_callback=None): |
619 | self._db.validate_gen_and_trans_id( |
620 | last_known_generation, last_known_trans_id) |
621 | sync_exch = SyncExchange( |
622 | |
623 | === modified file 'u1db/tests/__init__.py' |
624 | --- u1db/tests/__init__.py 2012-09-24 09:36:54 +0000 |
625 | +++ u1db/tests/__init__.py 2012-09-25 20:06:22 +0000 |
626 | @@ -293,9 +293,10 @@ |
627 | |
628 | def ensure_database(self, path): |
629 | try: |
630 | - return self.open_database(path) |
631 | + db = self.open_database(path) |
632 | except errors.DatabaseDoesNotExist: |
633 | - return self._create_database(path) |
634 | + db = self._create_database(path) |
635 | + return db, db._replica_uid |
636 | |
637 | def _copy_database(self, db): |
638 | # DO NOT COPY OR REUSE THIS CODE OUTSIDE TESTS: COPYING U1DB DATABASES |
639 | |
640 | === modified file 'u1db/tests/c_backend_wrapper.pyx' |
641 | --- u1db/tests/c_backend_wrapper.pyx 2012-09-24 14:12:27 +0000 |
642 | +++ u1db/tests/c_backend_wrapper.pyx 2012-09-25 20:06:22 +0000 |
643 | @@ -205,14 +205,16 @@ |
644 | u1db_document **docs, int *generations, |
645 | const_char_ptr *trans_ids, |
646 | int *target_gen, char **target_trans_id, |
647 | - void *context, u1db_doc_gen_callback cb) nogil |
648 | + void *context, u1db_doc_gen_callback cb, |
649 | + void *ensure_callback) nogil |
650 | int (*sync_exchange_doc_ids)(u1db_sync_target *st, |
651 | u1database *source_db, int n_doc_ids, |
652 | const_char_ptr *doc_ids, int *generations, |
653 | const_char_ptr *trans_ids, |
654 | int *target_gen, char **target_trans_id, |
655 | void *context, |
656 | - u1db_doc_gen_callback cb) nogil |
657 | + u1db_doc_gen_callback cb, |
658 | + void *ensure_callback) nogil |
659 | int (*get_sync_exchange)(u1db_sync_target *st, |
660 | char *source_replica_uid, |
661 | int last_known_source_gen, |
662 | @@ -811,7 +813,7 @@ |
663 | status = self._st.sync_exchange_doc_ids(self._st, sdb._db, |
664 | num_doc_ids, doc_ids, generations, trans_ids, |
665 | &target_gen, &target_trans_id, |
666 | - <void*>return_doc_cb, return_doc_cb_wrapper) |
667 | + <void*>return_doc_cb, return_doc_cb_wrapper, NULL) |
668 | handle_status("sync_exchange_doc_ids", status) |
669 | if target_trans_id != NULL: |
670 | res_trans_id = target_trans_id |
671 | @@ -828,7 +830,7 @@ |
672 | |
673 | def sync_exchange(self, docs_by_generations, source_replica_uid, |
674 | last_known_generation, last_known_trans_id, |
675 | - return_doc_cb): |
676 | + return_doc_cb, ensure_callback=None): |
677 | cdef CDocument cur_doc |
678 | cdef u1db_document **docs = NULL |
679 | cdef int *generations = NULL |
680 | @@ -864,7 +866,7 @@ |
681 | status = self._st.sync_exchange( |
682 | self._st, c_source_replica_uid, count, docs, generations, |
683 | trans_ids, &target_gen, &target_trans_id, |
684 | - <void *>return_doc_cb, return_doc_cb_wrapper) |
685 | + <void *>return_doc_cb, return_doc_cb_wrapper, NULL) |
686 | handle_status("sync_exchange", status) |
687 | finally: |
688 | if docs != NULL: |
689 | |
690 | === modified file 'u1db/tests/test_http_app.py' |
691 | --- u1db/tests/test_http_app.py 2012-09-04 16:28:21 +0000 |
692 | +++ u1db/tests/test_http_app.py 2012-09-25 20:06:22 +0000 |
693 | @@ -839,6 +839,38 @@ |
694 | self.assertEqual('', bits[3]) |
695 | self.assertEqual([('replica', 10), ('replica', 11)], gens) |
696 | |
697 | + def test_sync_exchange_send_ensure(self): |
698 | + entries = { |
699 | + 10: {'id': 'doc-here', 'rev': 'replica:1', 'content': |
700 | + '{"value": "here"}', 'gen': 10, 'trans_id': 'T-sid'}, |
701 | + 11: {'id': 'doc-here2', 'rev': 'replica:1', 'content': |
702 | + '{"value": "here2"}', 'gen': 11, 'trans_id': 'T-sed'} |
703 | + } |
704 | + |
705 | + args = dict(last_known_generation=0, ensure=True) |
706 | + body = ("[\r\n" + |
707 | + "%s,\r\n" % json.dumps(args) + |
708 | + "%s,\r\n" % json.dumps(entries[10]) + |
709 | + "%s\r\n" % json.dumps(entries[11]) + |
710 | + "]\r\n") |
711 | + resp = self.app.post('/dbnew/sync-from/replica', |
712 | + params=body, |
713 | + headers={'content-type': |
714 | + 'application/x-u1db-sync-stream'}) |
715 | + self.assertEqual(200, resp.status) |
716 | + self.assertEqual('application/x-u1db-sync-stream', |
717 | + resp.header('content-type')) |
718 | + bits = resp.body.split('\r\n') |
719 | + self.assertEqual('[', bits[0]) |
720 | + dbnew = self.state.open_database("dbnew") |
721 | + last_trans_id = dbnew._get_transaction_log()[-1][1] |
722 | + self.assertEqual({'new_generation': 2, |
723 | + 'new_transaction_id': last_trans_id, |
724 | + 'replica_uid': dbnew._replica_uid}, |
725 | + json.loads(bits[1])) |
726 | + self.assertEqual(']', bits[2]) |
727 | + self.assertEqual('', bits[3]) |
728 | + |
729 | def test_sync_exchange_send_entry_too_large(self): |
730 | self.patch(http_app.SyncResource, 'max_request_size', 20000) |
731 | self.patch(http_app.SyncResource, 'max_entry_size', 10000) |
732 | |
733 | === modified file 'u1db/tests/test_remote_sync_target.py' |
734 | --- u1db/tests/test_remote_sync_target.py 2012-09-24 09:36:54 +0000 |
735 | +++ u1db/tests/test_remote_sync_target.py 2012-09-25 20:06:22 +0000 |
736 | @@ -286,5 +286,29 @@ |
737 | (doc.doc_id, doc.rev, '{"value": "there"}', 1), |
738 | other_changes[0][:-1]) |
739 | |
740 | + def test_sync_exchange_send_ensure_callback(self): |
741 | + self.startServer() |
742 | + remote_target = self.getSyncTarget('test') |
743 | + other_docs = [] |
744 | + replica_uid_box = [] |
745 | + |
746 | + def receive_doc(doc): |
747 | + other_docs.append((doc.doc_id, doc.rev, doc.get_json())) |
748 | + |
749 | + def ensure_cb(replica_uid): |
750 | + replica_uid_box.append(replica_uid) |
751 | + |
752 | + doc = self.make_document('doc-here', 'replica:1', '{"value": "here"}') |
753 | + new_gen, trans_id = remote_target.sync_exchange( |
754 | + [(doc, 10, 'T-sid')], 'replica', last_known_generation=0, |
755 | + last_known_trans_id=None, return_doc_cb=receive_doc, |
756 | + ensure_callback=ensure_cb) |
757 | + self.assertEqual(1, new_gen) |
758 | + db = self.request_state.open_database('test') |
759 | + self.assertEqual(1, len(replica_uid_box)) |
760 | + self.assertEqual(db._replica_uid, replica_uid_box[0]) |
761 | + self.assertGetDoc( |
762 | + db, 'doc-here', 'replica:1', '{"value": "here"}', False) |
763 | + |
764 | |
765 | load_tests = tests.load_with_scenarios |
766 | |
767 | === modified file 'u1db/tests/test_server_state.py' |
768 | --- u1db/tests/test_server_state.py 2012-05-16 08:39:54 +0000 |
769 | +++ u1db/tests/test_server_state.py 2012-09-25 20:06:22 +0000 |
770 | @@ -70,8 +70,9 @@ |
771 | self.state.set_workingdir(tempdir) |
772 | path = tempdir + '/test.db' |
773 | self.assertFalse(os.path.exists(path)) |
774 | - db = self.state.ensure_database('test.db') |
775 | + db, replica_uid = self.state.ensure_database('test.db') |
776 | self.assertIsInstance(db, sqlite_backend.SQLitePartialExpandDatabase) |
777 | + self.assertEqual(db._replica_uid, replica_uid) |
778 | self.assertTrue(os.path.exists(path)) |
779 | db2 = self.state.open_database('test.db') |
780 | self.assertIsInstance(db2, sqlite_backend.SQLitePartialExpandDatabase) |
781 | @@ -80,7 +81,7 @@ |
782 | tempdir = self.createTempDir() |
783 | self.state.set_workingdir(tempdir) |
784 | path = tempdir + '/test.db' |
785 | - db = self.state.ensure_database('test.db') |
786 | + db, _ = self.state.ensure_database('test.db') |
787 | db.close() |
788 | self.state.delete_database('test.db') |
789 | self.assertFalse(os.path.exists(path)) |
790 | |
791 | === modified file 'u1db/tests/test_sync.py' |
792 | --- u1db/tests/test_sync.py 2012-09-24 14:12:27 +0000 |
793 | +++ u1db/tests/test_sync.py 2012-09-25 20:06:22 +0000 |
794 | @@ -1178,9 +1178,9 @@ |
795 | |
796 | oauth = False |
797 | |
798 | - def do_sync(self): |
799 | + def do_sync(self, target_name): |
800 | if self.oauth: |
801 | - path = '~/test2.db' |
802 | + path = '~/' + target_name |
803 | extra = dict(creds={'oauth': { |
804 | 'consumer_key': tests.consumer1.key, |
805 | 'consumer_secret': tests.consumer1.secret, |
806 | @@ -1188,7 +1188,7 @@ |
807 | 'token_secret': tests.token1.secret |
808 | }}) |
809 | else: |
810 | - path = 'test2.db' |
811 | + path = target_name |
812 | extra = {} |
813 | target_url = self.getURL(path) |
814 | return self.db.sync(target_url, **extra) |
815 | @@ -1202,7 +1202,7 @@ |
816 | def test_db_sync(self): |
817 | doc1 = self.db.create_doc_from_json(tests.simple_doc) |
818 | doc2 = self.db2.create_doc_from_json(tests.nested_doc) |
819 | - local_gen_before_sync = self.do_sync() |
820 | + local_gen_before_sync = self.do_sync('test2.db') |
821 | gen, _, changes = self.db.whats_changed(local_gen_before_sync) |
822 | self.assertEqual(1, len(changes)) |
823 | self.assertEqual(doc2.doc_id, changes[0][0]) |
824 | @@ -1212,6 +1212,22 @@ |
825 | self.assertGetDoc(self.db, doc2.doc_id, doc2.rev, tests.nested_doc, |
826 | False) |
827 | |
828 | + def test_db_sync_autocreate(self): |
829 | + doc1 = self.db.create_doc_from_json(tests.simple_doc) |
830 | + local_gen_before_sync = self.do_sync('test3.db') |
831 | + gen, _, changes = self.db.whats_changed(local_gen_before_sync) |
832 | + self.assertEqual(0, gen - local_gen_before_sync) |
833 | + db3 = self.request_state.open_database('test3.db') |
834 | + gen, _, changes = db3.whats_changed() |
835 | + self.assertEqual(1, len(changes)) |
836 | + self.assertEqual(doc1.doc_id, changes[0][0]) |
837 | + self.assertGetDoc(db3, doc1.doc_id, doc1.rev, tests.simple_doc, |
838 | + False) |
839 | + t_gen, _ = self.db._get_replica_gen_and_trans_id('test3.db') |
840 | + s_gen, _ = db3._get_replica_gen_and_trans_id('test1') |
841 | + self.assertEqual(1, t_gen) |
842 | + self.assertEqual(1, s_gen) |
843 | + |
844 | |
845 | class TestRemoteSyncIntegration(tests.TestCaseWithServer): |
846 | """Integration tests for the most common sync scenario local -> remote""" |