Merge lp:~pedronis/u1db/sync-autocreate into lp:u1db

Proposed by Samuele Pedroni
Status: Merged
Approved by: Samuele Pedroni
Approved revision: 412
Merged at revision: 411
Proposed branch: lp:~pedronis/u1db/sync-autocreate
Merge into: lp:u1db
Diff against target: 846 lines (+269/-68)
14 files modified
include/u1db/u1db_internal.h (+10/-2)
src/u1db_http_sync_target.c (+52/-17)
src/u1db_sync_target.c (+55/-10)
u1db/__init__.py (+8/-3)
u1db/remote/http_app.py (+23/-10)
u1db/remote/http_target.py (+7/-4)
u1db/remote/server_state.py (+3/-2)
u1db/sync.py (+22/-7)
u1db/tests/__init__.py (+3/-2)
u1db/tests/c_backend_wrapper.pyx (+7/-5)
u1db/tests/test_http_app.py (+32/-0)
u1db/tests/test_remote_sync_target.py (+24/-0)
u1db/tests/test_server_state.py (+3/-2)
u1db/tests/test_sync.py (+20/-4)
To merge this branch: bzr merge lp:~pedronis/u1db/sync-autocreate
Reviewer Review Type Date Requested Status
Lucio Torre (community) Approve
Review via email: mp+126330@code.launchpad.net

Commit message

Support sync(autocreate=True) by sending an ensure=true flag over the sync_exchange POST.

Description of the change

Support sync(autocreate=True) by sending an ensure=true flag over the sync_exchange POST. This is mediated by passing an ensure_callback to sync_exchange(), which gathers the replica uid of the newly created db.

To post a comment you must log in.
Revision history for this message
Lucio Torre (lucio.torre) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'include/u1db/u1db_internal.h'
2--- include/u1db/u1db_internal.h 2012-08-15 12:39:01 +0000
3+++ include/u1db/u1db_internal.h 2012-09-25 20:06:22 +0000
4@@ -48,6 +48,8 @@
5
6 typedef int (*u1db__trace_callback)(void *context, const char *state);
7
8+typedef void (*u1db__ensure_callback)(void *context, const char *replica_uid);
9+
10 struct _u1db_sync_target {
11 void *trace_context;
12 u1db__trace_callback trace_cb;
13@@ -119,11 +121,16 @@
14 * @param cb After sending the requested documents, we read the
15 * response stream. For each document in the stream, we
16 * will trigger a callback.
17+ * @param: ensure_callback If set the target may create
18+ * the target db if not yet existent,
19+ * the callback can then be used to inform of
20+ * the created db replica uid.
21 */
22 int (*sync_exchange_doc_ids)(u1db_sync_target *st, u1database *source_db,
23 int n_doc_ids, const char **doc_ids, int *generations,
24 const char **trans_ids, int *target_gen, char **target_trans_id,
25- void *context, u1db_doc_gen_callback cb);
26+ void *context, u1db_doc_gen_callback cb,
27+ u1db__ensure_callback ensure_callback);
28
29 /**
30 * Same as sync_exchange, only using document objects.
31@@ -133,7 +140,8 @@
32 u1db_document **docs, int *generations,
33 const char **trans_ids, int *target_gen,
34 char **target_trans_id, void *context,
35- u1db_doc_gen_callback cb);
36+ u1db_doc_gen_callback cb,
37+ u1db__ensure_callback ensure_callback);
38 /**
39 * Create a sync_exchange state object.
40 *
41
42=== modified file 'src/u1db_http_sync_target.c'
43--- src/u1db_http_sync_target.c 2012-07-26 16:24:39 +0000
44+++ src/u1db_http_sync_target.c 2012-09-25 20:06:22 +0000
45@@ -62,7 +62,8 @@
46 u1db_document **docs, int *generations,
47 const char **trans_ids, int *target_gen,
48 char **target_trans_id, void *context,
49- u1db_doc_gen_callback cb);
50+ u1db_doc_gen_callback cb,
51+ u1db__ensure_callback ensure_callback);
52 static int st_http_sync_exchange_doc_ids(u1db_sync_target *st,
53 u1database *source_db, int n_doc_ids,
54 const char **doc_ids,
55@@ -70,7 +71,8 @@
56 const char **trans_ids,
57 int *target_gen,
58 char **target_trans_id, void *context,
59- u1db_doc_gen_callback cb);
60+ u1db_doc_gen_callback cb,
61+ u1db__ensure_callback ensure_callback);
62 static void st_http_finalize_sync_exchange(u1db_sync_target *st,
63 u1db_sync_exchange **exchange);
64 static int st_http_set_trace_hook(u1db_sync_target *st,
65@@ -794,7 +796,7 @@
66
67 static int
68 init_temp_file(char tmpname[], FILE **temp_fd, int target_gen,
69- char *target_trans_id)
70+ char *target_trans_id, int ensure)
71 {
72 int status = U1DB_OK;
73 *temp_fd = make_tempfile(tmpname);
74@@ -809,8 +811,16 @@
75 // determine Content-Length before we start uploading the data.
76 fprintf(
77 *temp_fd,
78- "[\r\n{\"last_known_generation\": %d, \"last_known_trans_id\": \"%s\"}",
79+ "[\r\n{\"last_known_generation\": %d, \"last_known_trans_id\": \"%s\"",
80 target_gen, target_trans_id);
81+ if (ensure) {
82+ fprintf(
83+ *temp_fd,
84+ ", \"ensure\": true");
85+ }
86+ fprintf(
87+ *temp_fd,
88+ "}");
89 finish:
90 return status;
91 }
92@@ -866,7 +876,7 @@
93 }
94 if (status != U1DB_OK) { goto finish; }
95 if (http_code != 200 && http_code != 201) {
96- printf("broken 0\n");
97+ //printf("broken 0\n");
98 status = U1DB_BROKEN_SYNC_STREAM;
99 goto finish;
100 }
101@@ -883,7 +893,8 @@
102
103 static int
104 process_response(u1db_sync_target *st, void *context, u1db_doc_gen_callback cb,
105- char *response, int *target_gen, char **target_trans_id)
106+ u1db__ensure_callback ensure_callback, char *response,
107+ int *target_gen, char **target_trans_id)
108 {
109 int status = U1DB_OK;
110 int i, doc_count;
111@@ -893,37 +904,38 @@
112 int gen;
113 const char *trans_id = NULL;
114 u1db_document *doc;
115+ const char *replica_uid = NULL;
116
117 json = json_tokener_parse(response);
118 if (json == NULL || !json_object_is_type(json, json_type_array)) {
119- printf("broken 1, response: %s\n", response);
120+ //printf("broken 1, response: %s\n", response);
121 status = U1DB_BROKEN_SYNC_STREAM;
122 goto finish;
123 }
124 doc_count = json_object_array_length(json);
125 if (doc_count < 1) {
126 // the first response is the new_generation info, so it must exist
127- printf("broken 2\n");
128+ //printf("broken 2\n");
129 status = U1DB_BROKEN_SYNC_STREAM;
130 goto finish;
131 }
132 obj = json_object_array_get_idx(json, 0);
133 attr = json_object_object_get(obj, "new_generation");
134 if (attr == NULL) {
135- printf("broken 3\n");
136+ //printf("broken 3\n");
137 status = U1DB_BROKEN_SYNC_STREAM;
138 goto finish;
139 }
140 *target_gen = json_object_get_int(attr);
141 attr = json_object_object_get(obj, "new_transaction_id");
142 if (attr == NULL) {
143- printf("broken 4\n");
144+ //printf("broken 4\n");
145 status = U1DB_BROKEN_SYNC_STREAM;
146 goto finish;
147 }
148 tmp = json_object_get_string(attr);
149 if (tmp == NULL) {
150- printf("broken 5\n");
151+ //printf("broken 5\n");
152 status = U1DB_BROKEN_SYNC_STREAM;
153 goto finish;
154 }
155@@ -933,6 +945,23 @@
156 goto finish;
157 }
158
159+ if (ensure_callback) {
160+ attr = json_object_object_get(obj, "replica_uid");
161+ if (attr != NULL) {
162+ tmp = json_object_get_string(attr);
163+ if (tmp == NULL) {
164+ status = U1DB_BROKEN_SYNC_STREAM;
165+ goto finish;
166+ }
167+ replica_uid = strdup(tmp);
168+ if (*target_trans_id == NULL) {
169+ status = U1DB_NOMEM;
170+ goto finish;
171+ }
172+ ensure_callback(context, replica_uid);
173+ }
174+ }
175+
176 for (i = 1; i < doc_count; ++i) {
177 obj = json_object_array_get_idx(json, i);
178 attr = json_object_object_get(obj, "id");
179@@ -989,7 +1018,8 @@
180 int n_docs, u1db_document **docs, int *generations,
181 const char **trans_ids, int *target_gen,
182 char **target_trans_id, void *context,
183- u1db_doc_gen_callback cb)
184+ u1db_doc_gen_callback cb,
185+ u1db__ensure_callback ensure_callback)
186 {
187 int status, i;
188 FILE *temp_fd = NULL;
189@@ -1004,7 +1034,8 @@
190 if (n_docs > 0 && (docs == NULL || generations == NULL)) {
191 return U1DB_INVALID_PARAMETER;
192 }
193- status = init_temp_file(tmpname, &temp_fd, *target_gen, *target_trans_id);
194+ status = init_temp_file(tmpname, &temp_fd, *target_gen, *target_trans_id,
195+ ensure_callback != NULL);
196 if (status != U1DB_OK) { goto finish; }
197 for (i = 0; i < n_docs; ++i) {
198 status = doc_to_tempfile(
199@@ -1013,7 +1044,8 @@
200 }
201 status = finalize_and_send_temp_file(st, temp_fd, source_replica_uid, &req);
202 if (status != U1DB_OK) { goto finish; }
203- status = process_response(st, context, cb, req.body_buffer, target_gen,
204+ status = process_response(st, context, cb, ensure_callback,
205+ req.body_buffer, target_gen,
206 target_trans_id);
207 finish:
208 cleanup_temp_files(tmpname, temp_fd, &req);
209@@ -1058,7 +1090,8 @@
210 int n_doc_ids, const char **doc_ids,
211 int *generations, const char **trans_ids,
212 int *target_gen, char **target_trans_id,
213- void *context, u1db_doc_gen_callback cb)
214+ void *context, u1db_doc_gen_callback cb,
215+ u1db__ensure_callback ensure_callback)
216 {
217 int status;
218 FILE *temp_fd = NULL;
219@@ -1077,7 +1110,8 @@
220 }
221 status = u1db_get_replica_uid(source_db, &source_replica_uid);
222 if (status != U1DB_OK) { goto finish; }
223- status = init_temp_file(tmpname, &temp_fd, *target_gen, *target_trans_id);
224+ status = init_temp_file(tmpname, &temp_fd, *target_gen, *target_trans_id,
225+ ensure_callback != NULL);
226 if (status != U1DB_OK) { goto finish; }
227 state.num = n_doc_ids;
228 state.generations = generations;
229@@ -1088,7 +1122,8 @@
230 if (status != U1DB_OK) { goto finish; }
231 status = finalize_and_send_temp_file(st, temp_fd, source_replica_uid, &req);
232 if (status != U1DB_OK) { goto finish; }
233- status = process_response(st, context, cb, req.body_buffer, target_gen,
234+ status = process_response(st, context, cb, ensure_callback,
235+ req.body_buffer, target_gen,
236 target_trans_id);
237 finish:
238 cleanup_temp_files(tmpname, temp_fd, &req);
239
240=== modified file 'src/u1db_sync_target.c'
241--- src/u1db_sync_target.c 2012-08-15 12:57:57 +0000
242+++ src/u1db_sync_target.c 2012-09-25 20:06:22 +0000
243@@ -35,13 +35,15 @@
244 u1db_document **docs, int *generations,
245 const char **trans_ids, int *target_gen,
246 char **target_trans_id, void *context,
247- u1db_doc_gen_callback cb);
248+ u1db_doc_gen_callback cb,
249+ u1db__ensure_callback ensure_callback);
250 static int st_sync_exchange_doc_ids(u1db_sync_target *st,
251 u1database *source_db, int n_doc_ids,
252 const char **doc_ids, int *generations,
253 const char **trans_ids, int *target_gen,
254 char **target_trans_id, void *context,
255- u1db_doc_gen_callback cb);
256+ u1db_doc_gen_callback cb,
257+ u1db__ensure_callback ensure_callback);
258 static int st_get_sync_exchange(u1db_sync_target *st,
259 const char *source_replica_uid,
260 int source_gen,
261@@ -477,6 +479,12 @@
262 return status;
263 }
264
265+static void ensure_callback(void *context, const char *replica_uid) {
266+ struct _return_doc_state *state;
267+ state = (struct _return_doc_state *)context;
268+ state->target_uid = replica_uid;
269+}
270+
271
272 static int
273 get_and_insert_docs(u1database *source_db, u1db_sync_exchange *se,
274@@ -503,7 +511,8 @@
275 int n_docs, u1db_document **docs, int *generations,
276 const char **trans_ids, int *target_gen,
277 char **target_trans_id, void *context,
278- u1db_doc_gen_callback cb)
279+ u1db_doc_gen_callback cb,
280+ u1db__ensure_callback ensure_callback)
281 {
282 int status, i;
283
284@@ -544,7 +553,8 @@
285 st_sync_exchange_doc_ids(u1db_sync_target *st, u1database *source_db,
286 int n_doc_ids, const char **doc_ids, int *generations,
287 const char **trans_ids, int *target_gen, char **target_trans_id,
288- void *context, u1db_doc_gen_callback cb)
289+ void *context, u1db_doc_gen_callback cb,
290+ u1db__ensure_callback ensure_callback)
291 {
292 int status;
293 const char *source_replica_uid = NULL;
294@@ -598,6 +608,7 @@
295 char *target_trans_id = NULL;
296 int target_gen, local_gen;
297 int local_gen_known_by_target, target_gen_known_by_local;
298+ u1db__ensure_callback ensure= NULL;
299
300 // fprintf(stderr, "Starting\n");
301 if (db == NULL || target == NULL || local_gen_before_sync == NULL) {
302@@ -612,14 +623,38 @@
303 status = target->get_sync_info(
304 target, local_uid, &target_uid, &target_gen, &target_trans_id,
305 &local_gen_known_by_target, &local_trans_id_known_by_target);
306- if (status != U1DB_OK) { goto finish; }
307+ if (status == 404) { // database does not exist
308+ target_uid = NULL;
309+ target_gen = 0;
310+ local_gen_known_by_target = 0;
311+ target_trans_id = strdup("");
312+ if (target_trans_id == NULL) {
313+ status = U1DB_NOMEM;
314+ goto finish;
315+ }
316+ local_trans_id_known_by_target = strdup("");
317+ if (local_trans_id_known_by_target == NULL) {
318+ status = U1DB_NOMEM;
319+ goto finish;
320+ }
321+ ensure = ensure_callback;
322+ } else if (status != U1DB_OK) { goto finish; }
323 status = u1db_validate_gen_and_trans_id(
324 db, local_gen_known_by_target, local_trans_id_known_by_target);
325 if (status != U1DB_OK) { goto finish; }
326- status = u1db__get_replica_gen_and_trans_id(
327- db, target_uid, &target_gen_known_by_local,
328- &target_trans_id_known_by_local);
329- if (status != U1DB_OK) { goto finish; }
330+ if (target_uid == NULL) {
331+ target_gen_known_by_local = 0;
332+ target_trans_id_known_by_local = strdup("");
333+ if (target_trans_id_known_by_local == NULL) {
334+ status = U1DB_NOMEM;
335+ goto finish;
336+ }
337+ } else {
338+ status = u1db__get_replica_gen_and_trans_id(
339+ db, target_uid, &target_gen_known_by_local,
340+ &target_trans_id_known_by_local);
341+ if (status != U1DB_OK) { goto finish; }
342+ }
343 local_target_trans_id = target_trans_id_known_by_local;
344 local_gen = local_gen_known_by_target;
345
346@@ -649,7 +684,14 @@
347 (const char**)to_send_state.doc_ids_to_return,
348 to_send_state.gen_for_doc_ids, to_send_state.trans_ids_for_doc_ids,
349 &target_gen_known_by_local, &target_trans_id_known_by_local,
350- &return_doc_state, return_doc_to_insert_from_target);
351+ &return_doc_state, return_doc_to_insert_from_target, ensure);
352+ if (ensure != NULL) {
353+ if (return_doc_state.target_uid != NULL) {
354+ target_uid = return_doc_state.target_uid;
355+ } else {
356+ ensure = NULL;
357+ }
358+ }
359 if (status != U1DB_OK) { goto finish; }
360 if (local_trans_id != NULL) {
361 free(local_trans_id);
362@@ -671,6 +713,9 @@
363 if (status != U1DB_OK) { goto finish; }
364 }
365 finish:
366+ if (ensure != NULL && target_uid != NULL) {
367+ free((void *)target_uid);
368+ }
369 if (local_trans_id != NULL) {
370 free(local_trans_id);
371 }
372
373=== modified file 'u1db/__init__.py'
374--- u1db/__init__.py 2012-09-24 14:12:27 +0000
375+++ u1db/__init__.py 2012-09-25 20:06:22 +0000
376@@ -303,7 +303,7 @@
377 """Release any resources associated with this database."""
378 raise NotImplementedError(self.close)
379
380- def sync(self, url, creds=None):
381+ def sync(self, url, creds=None, autocreate=True):
382 """Synchronize documents with remote replica exposed at url.
383
384 :param url: the url of the target replica to sync with.
385@@ -316,6 +316,7 @@
386 'token_key': ...,
387 'token_secret': ...
388 }}
389+ :param autocreate: ask the target to create the db if non-existent.
390 :return: local_gen_before_sync The local generation before the
391 synchronisation was performed. This is useful to pass into
392 whatschanged, if an application wants to know which documents were
393@@ -323,7 +324,8 @@
394 """
395 from u1db.sync import Synchronizer
396 from u1db.remote.http_target import HTTPSyncTarget
397- return Synchronizer(self, HTTPSyncTarget(url, creds=creds)).sync()
398+ return Synchronizer(self, HTTPSyncTarget(url, creds=creds)).sync(
399+ autocreate=autocreate)
400
401 def _get_replica_gen_and_trans_id(self, other_replica_uid):
402 """Return the last known generation and transaction id for the other db
403@@ -633,7 +635,7 @@
404
405 def sync_exchange(self, docs_by_generation, source_replica_uid,
406 last_known_generation, last_known_trans_id,
407- return_doc_cb):
408+ return_doc_cb, ensure_callback=None):
409 """Incorporate the documents sent from the source replica.
410
411 This is not meant to be called by client code directly, but is used as
412@@ -665,6 +667,9 @@
413 be invoked in turn with Documents that have changed since
414 last_known_generation together with the generation of
415 their last change.
416+ :param: ensure_callback(replica_uid): if set the target may create
417+ the target db if not yet existent, the callback can then
418+ be used to inform of the created db replica uid.
419 :return: new_generation - After applying docs_by_generation, this is
420 the current generation for this replica
421 """
422
423=== modified file 'u1db/remote/http_app.py'
424--- u1db/remote/http_app.py 2012-09-04 16:24:31 +0000
425+++ u1db/remote/http_app.py 2012-09-25 20:06:22 +0000
426@@ -344,12 +344,16 @@
427 def __init__(self, dbname, source_replica_uid, state, responder):
428 self.source_replica_uid = source_replica_uid
429 self.responder = responder
430- self.db = state.open_database(dbname)
431- self.target = self.db.get_sync_target()
432+ self.state = state
433+ self.dbname = dbname
434+ self.replica_uid = None
435+
436+ def get_target(self):
437+ return self.state.open_database(self.dbname).get_sync_target()
438
439 @http_method()
440 def get(self):
441- result = self.target.get_sync_info(self.source_replica_uid)
442+ result = self.get_target().get_sync_info(self.source_replica_uid)
443 self.responder.send_response_json(
444 target_replica_uid=result[0], target_replica_generation=result[1],
445 target_replica_transaction_id=result[2],
446@@ -360,19 +364,25 @@
447 @http_method(generation=int,
448 content_as_args=True, no_query=True)
449 def put(self, generation, transaction_id):
450- self.target.record_sync_info(self.source_replica_uid, generation,
451- transaction_id)
452+ self.get_target().record_sync_info(self.source_replica_uid,
453+ generation,
454+ transaction_id)
455 self.responder.send_response_json(ok=True)
456
457 # Implements the same logic as LocalSyncTarget.sync_exchange
458
459 @http_method(last_known_generation=int, last_known_trans_id=none_or_str,
460 content_as_args=True)
461- def post_args(self, last_known_generation, last_known_trans_id=None):
462- self.db.validate_gen_and_trans_id(
463+ def post_args(self, last_known_generation, last_known_trans_id=None,
464+ ensure=False):
465+ if ensure:
466+ db, self.replica_uid = self.state.ensure_database(self.dbname)
467+ else:
468+ db = self.state.open_database(self.dbname)
469+ db.validate_gen_and_trans_id(
470 last_known_generation, last_known_trans_id)
471 self.sync_exch = self.sync_exchange_class(
472- self.db, self.source_replica_uid, last_known_generation)
473+ db, self.source_replica_uid, last_known_generation)
474
475 @http_method(content_as_args=True)
476 def post_stream_entry(self, id, rev, content, gen, trans_id):
477@@ -390,8 +400,11 @@
478 self.responder.content_type = 'application/x-u1db-sync-stream'
479 self.responder.start_response(200)
480 self.responder.start_stream(),
481- self.responder.stream_entry({"new_generation": new_gen,
482- "new_transaction_id": self.sync_exch.new_trans_id})
483+ header = {"new_generation": new_gen,
484+ "new_transaction_id": self.sync_exch.new_trans_id}
485+ if self.replica_uid is not None:
486+ header['replica_uid'] = self.replica_uid
487+ self.responder.stream_entry(header)
488 self.sync_exch.return_docs(send_doc)
489 self.responder.end_stream()
490 self.responder.finish_response()
491
492=== modified file 'u1db/remote/http_target.py'
493--- u1db/remote/http_target.py 2012-09-20 18:46:10 +0000
494+++ u1db/remote/http_target.py 2012-09-25 20:06:22 +0000
495@@ -57,7 +57,7 @@
496 {'generation': source_replica_generation,
497 'transaction_id': source_transaction_id})
498
499- def _parse_sync_stream(self, data, return_doc_cb):
500+ def _parse_sync_stream(self, data, return_doc_cb, ensure_callback=None):
501 parts = data.splitlines() # one at a time
502 if not parts or parts[0] != '[':
503 raise BrokenSyncStream
504@@ -66,6 +66,8 @@
505 if data:
506 line, comma = utils.check_and_strip_comma(data[0])
507 res = json.loads(line)
508+ if ensure_callback and 'replica_uid' in res:
509+ ensure_callback(res['replica_uid'])
510 for entry in data[1:]:
511 if not comma: # missing in between comma
512 raise BrokenSyncStream
513@@ -88,7 +90,7 @@
514
515 def sync_exchange(self, docs_by_generations, source_replica_uid,
516 last_known_generation, last_known_trans_id,
517- return_doc_cb):
518+ return_doc_cb, ensure_callback=None):
519 self._ensure_connection()
520 if self._trace_hook: # for tests
521 self._trace_hook('sync_exchange')
522@@ -108,7 +110,8 @@
523 comma = ''
524 size += prepare(
525 last_known_generation=last_known_generation,
526- last_known_trans_id=last_known_trans_id)
527+ last_known_trans_id=last_known_trans_id,
528+ ensure=ensure_callback is not None)
529 comma = ','
530 for doc, gen, trans_id in docs_by_generations:
531 size += prepare(id=doc.doc_id, rev=doc.rev, content=doc.get_json(),
532@@ -121,7 +124,7 @@
533 self._conn.send(entry)
534 entries = None
535 data, _ = self._response()
536- res = self._parse_sync_stream(data, return_doc_cb)
537+ res = self._parse_sync_stream(data, return_doc_cb, ensure_callback)
538 data = None
539 return res['new_generation'], res['new_transaction_id']
540
541
542=== modified file 'u1db/remote/server_state.py'
543--- u1db/remote/server_state.py 2012-05-15 11:11:06 +0000
544+++ u1db/remote/server_state.py 2012-09-25 20:06:22 +0000
545@@ -56,8 +56,9 @@
546 """Ensure database at the given location."""
547 from u1db.backends import sqlite_backend
548 full_path = self._relpath(path)
549- return sqlite_backend.SQLiteDatabase.open_database(full_path,
550- create=True)
551+ db = sqlite_backend.SQLiteDatabase.open_database(full_path,
552+ create=True)
553+ return db, db._replica_uid
554
555 def delete_database(self, path):
556 """Delete database at the given location."""
557
558=== modified file 'u1db/sync.py'
559--- u1db/sync.py 2012-07-26 19:14:26 +0000
560+++ u1db/sync.py 2012-09-25 20:06:22 +0000
561@@ -90,14 +90,26 @@
562 self.sync_target.record_sync_info(
563 self.source._replica_uid, cur_gen, trans_id)
564
565- def sync(self, callback=None):
566+ def sync(self, callback=None, autocreate=False):
567 """Synchronize documents between source and target."""
568 sync_target = self.sync_target
569 # get target identifier, its current generation,
570 # and its last-seen database generation for this source
571- (self.target_replica_uid, target_gen, target_trans_id, target_my_gen,
572- target_my_trans_id) = sync_target.get_sync_info(
573- self.source._replica_uid)
574+ try:
575+ (self.target_replica_uid, target_gen, target_trans_id,
576+ target_my_gen, target_my_trans_id) = sync_target.get_sync_info(
577+ self.source._replica_uid)
578+ except errors.DatabaseDoesNotExist:
579+ if not autocreate:
580+ raise
581+ # will try to ask sync_exchange() to create the db
582+ self.target_replica_uid = None
583+ target_gen, target_trans_id = 0, ''
584+ target_my_gen, target_my_trans_id = 0, ''
585+ def ensure_callback(replica_uid):
586+ self.target_replica_uid = replica_uid
587+ else:
588+ ensure_callback = None
589 # validate the generation and transaction id the target knows about us
590 self.source.validate_gen_and_trans_id(
591 target_my_gen, target_my_trans_id)
592@@ -105,7 +117,10 @@
593 my_gen, _, changes = self.source.whats_changed(target_my_gen)
594
595 # this source last-seen database generation for the target
596- target_last_known_gen, target_last_known_trans_id = \
597+ if self.target_replica_uid is None:
598+ target_last_known_gen, target_last_known_trans_id = 0, ''
599+ else:
600+ target_last_known_gen, target_last_known_trans_id = \
601 self.source._get_replica_gen_and_trans_id(self.target_replica_uid)
602 if not changes and target_last_known_gen == target_gen:
603 if target_trans_id != target_last_known_trans_id:
604@@ -125,7 +140,7 @@
605 new_gen, new_trans_id = sync_target.sync_exchange(
606 docs_by_generation, self.source._replica_uid,
607 target_last_known_gen, target_last_known_trans_id,
608- self._insert_doc_from_target)
609+ self._insert_doc_from_target, ensure_callback=ensure_callback)
610 # record target synced-up-to generation including applying what we sent
611 self.source._set_replica_gen_and_trans_id(
612 self.target_replica_uid, new_gen, new_trans_id)
613@@ -269,7 +284,7 @@
614
615 def sync_exchange(self, docs_by_generations, source_replica_uid,
616 last_known_generation, last_known_trans_id,
617- return_doc_cb):
618+ return_doc_cb, ensure_callback=None):
619 self._db.validate_gen_and_trans_id(
620 last_known_generation, last_known_trans_id)
621 sync_exch = SyncExchange(
622
623=== modified file 'u1db/tests/__init__.py'
624--- u1db/tests/__init__.py 2012-09-24 09:36:54 +0000
625+++ u1db/tests/__init__.py 2012-09-25 20:06:22 +0000
626@@ -293,9 +293,10 @@
627
628 def ensure_database(self, path):
629 try:
630- return self.open_database(path)
631+ db = self.open_database(path)
632 except errors.DatabaseDoesNotExist:
633- return self._create_database(path)
634+ db = self._create_database(path)
635+ return db, db._replica_uid
636
637 def _copy_database(self, db):
638 # DO NOT COPY OR REUSE THIS CODE OUTSIDE TESTS: COPYING U1DB DATABASES
639
640=== modified file 'u1db/tests/c_backend_wrapper.pyx'
641--- u1db/tests/c_backend_wrapper.pyx 2012-09-24 14:12:27 +0000
642+++ u1db/tests/c_backend_wrapper.pyx 2012-09-25 20:06:22 +0000
643@@ -205,14 +205,16 @@
644 u1db_document **docs, int *generations,
645 const_char_ptr *trans_ids,
646 int *target_gen, char **target_trans_id,
647- void *context, u1db_doc_gen_callback cb) nogil
648+ void *context, u1db_doc_gen_callback cb,
649+ void *ensure_callback) nogil
650 int (*sync_exchange_doc_ids)(u1db_sync_target *st,
651 u1database *source_db, int n_doc_ids,
652 const_char_ptr *doc_ids, int *generations,
653 const_char_ptr *trans_ids,
654 int *target_gen, char **target_trans_id,
655 void *context,
656- u1db_doc_gen_callback cb) nogil
657+ u1db_doc_gen_callback cb,
658+ void *ensure_callback) nogil
659 int (*get_sync_exchange)(u1db_sync_target *st,
660 char *source_replica_uid,
661 int last_known_source_gen,
662@@ -811,7 +813,7 @@
663 status = self._st.sync_exchange_doc_ids(self._st, sdb._db,
664 num_doc_ids, doc_ids, generations, trans_ids,
665 &target_gen, &target_trans_id,
666- <void*>return_doc_cb, return_doc_cb_wrapper)
667+ <void*>return_doc_cb, return_doc_cb_wrapper, NULL)
668 handle_status("sync_exchange_doc_ids", status)
669 if target_trans_id != NULL:
670 res_trans_id = target_trans_id
671@@ -828,7 +830,7 @@
672
673 def sync_exchange(self, docs_by_generations, source_replica_uid,
674 last_known_generation, last_known_trans_id,
675- return_doc_cb):
676+ return_doc_cb, ensure_callback=None):
677 cdef CDocument cur_doc
678 cdef u1db_document **docs = NULL
679 cdef int *generations = NULL
680@@ -864,7 +866,7 @@
681 status = self._st.sync_exchange(
682 self._st, c_source_replica_uid, count, docs, generations,
683 trans_ids, &target_gen, &target_trans_id,
684- <void *>return_doc_cb, return_doc_cb_wrapper)
685+ <void *>return_doc_cb, return_doc_cb_wrapper, NULL)
686 handle_status("sync_exchange", status)
687 finally:
688 if docs != NULL:
689
690=== modified file 'u1db/tests/test_http_app.py'
691--- u1db/tests/test_http_app.py 2012-09-04 16:28:21 +0000
692+++ u1db/tests/test_http_app.py 2012-09-25 20:06:22 +0000
693@@ -839,6 +839,38 @@
694 self.assertEqual('', bits[3])
695 self.assertEqual([('replica', 10), ('replica', 11)], gens)
696
697+ def test_sync_exchange_send_ensure(self):
698+ entries = {
699+ 10: {'id': 'doc-here', 'rev': 'replica:1', 'content':
700+ '{"value": "here"}', 'gen': 10, 'trans_id': 'T-sid'},
701+ 11: {'id': 'doc-here2', 'rev': 'replica:1', 'content':
702+ '{"value": "here2"}', 'gen': 11, 'trans_id': 'T-sed'}
703+ }
704+
705+ args = dict(last_known_generation=0, ensure=True)
706+ body = ("[\r\n" +
707+ "%s,\r\n" % json.dumps(args) +
708+ "%s,\r\n" % json.dumps(entries[10]) +
709+ "%s\r\n" % json.dumps(entries[11]) +
710+ "]\r\n")
711+ resp = self.app.post('/dbnew/sync-from/replica',
712+ params=body,
713+ headers={'content-type':
714+ 'application/x-u1db-sync-stream'})
715+ self.assertEqual(200, resp.status)
716+ self.assertEqual('application/x-u1db-sync-stream',
717+ resp.header('content-type'))
718+ bits = resp.body.split('\r\n')
719+ self.assertEqual('[', bits[0])
720+ dbnew = self.state.open_database("dbnew")
721+ last_trans_id = dbnew._get_transaction_log()[-1][1]
722+ self.assertEqual({'new_generation': 2,
723+ 'new_transaction_id': last_trans_id,
724+ 'replica_uid': dbnew._replica_uid},
725+ json.loads(bits[1]))
726+ self.assertEqual(']', bits[2])
727+ self.assertEqual('', bits[3])
728+
729 def test_sync_exchange_send_entry_too_large(self):
730 self.patch(http_app.SyncResource, 'max_request_size', 20000)
731 self.patch(http_app.SyncResource, 'max_entry_size', 10000)
732
733=== modified file 'u1db/tests/test_remote_sync_target.py'
734--- u1db/tests/test_remote_sync_target.py 2012-09-24 09:36:54 +0000
735+++ u1db/tests/test_remote_sync_target.py 2012-09-25 20:06:22 +0000
736@@ -286,5 +286,29 @@
737 (doc.doc_id, doc.rev, '{"value": "there"}', 1),
738 other_changes[0][:-1])
739
740+ def test_sync_exchange_send_ensure_callback(self):
741+ self.startServer()
742+ remote_target = self.getSyncTarget('test')
743+ other_docs = []
744+ replica_uid_box = []
745+
746+ def receive_doc(doc):
747+ other_docs.append((doc.doc_id, doc.rev, doc.get_json()))
748+
749+ def ensure_cb(replica_uid):
750+ replica_uid_box.append(replica_uid)
751+
752+ doc = self.make_document('doc-here', 'replica:1', '{"value": "here"}')
753+ new_gen, trans_id = remote_target.sync_exchange(
754+ [(doc, 10, 'T-sid')], 'replica', last_known_generation=0,
755+ last_known_trans_id=None, return_doc_cb=receive_doc,
756+ ensure_callback=ensure_cb)
757+ self.assertEqual(1, new_gen)
758+ db = self.request_state.open_database('test')
759+ self.assertEqual(1, len(replica_uid_box))
760+ self.assertEqual(db._replica_uid, replica_uid_box[0])
761+ self.assertGetDoc(
762+ db, 'doc-here', 'replica:1', '{"value": "here"}', False)
763+
764
765 load_tests = tests.load_with_scenarios
766
767=== modified file 'u1db/tests/test_server_state.py'
768--- u1db/tests/test_server_state.py 2012-05-16 08:39:54 +0000
769+++ u1db/tests/test_server_state.py 2012-09-25 20:06:22 +0000
770@@ -70,8 +70,9 @@
771 self.state.set_workingdir(tempdir)
772 path = tempdir + '/test.db'
773 self.assertFalse(os.path.exists(path))
774- db = self.state.ensure_database('test.db')
775+ db, replica_uid = self.state.ensure_database('test.db')
776 self.assertIsInstance(db, sqlite_backend.SQLitePartialExpandDatabase)
777+ self.assertEqual(db._replica_uid, replica_uid)
778 self.assertTrue(os.path.exists(path))
779 db2 = self.state.open_database('test.db')
780 self.assertIsInstance(db2, sqlite_backend.SQLitePartialExpandDatabase)
781@@ -80,7 +81,7 @@
782 tempdir = self.createTempDir()
783 self.state.set_workingdir(tempdir)
784 path = tempdir + '/test.db'
785- db = self.state.ensure_database('test.db')
786+ db, _ = self.state.ensure_database('test.db')
787 db.close()
788 self.state.delete_database('test.db')
789 self.assertFalse(os.path.exists(path))
790
791=== modified file 'u1db/tests/test_sync.py'
792--- u1db/tests/test_sync.py 2012-09-24 14:12:27 +0000
793+++ u1db/tests/test_sync.py 2012-09-25 20:06:22 +0000
794@@ -1178,9 +1178,9 @@
795
796 oauth = False
797
798- def do_sync(self):
799+ def do_sync(self, target_name):
800 if self.oauth:
801- path = '~/test2.db'
802+ path = '~/' + target_name
803 extra = dict(creds={'oauth': {
804 'consumer_key': tests.consumer1.key,
805 'consumer_secret': tests.consumer1.secret,
806@@ -1188,7 +1188,7 @@
807 'token_secret': tests.token1.secret
808 }})
809 else:
810- path = 'test2.db'
811+ path = target_name
812 extra = {}
813 target_url = self.getURL(path)
814 return self.db.sync(target_url, **extra)
815@@ -1202,7 +1202,7 @@
816 def test_db_sync(self):
817 doc1 = self.db.create_doc_from_json(tests.simple_doc)
818 doc2 = self.db2.create_doc_from_json(tests.nested_doc)
819- local_gen_before_sync = self.do_sync()
820+ local_gen_before_sync = self.do_sync('test2.db')
821 gen, _, changes = self.db.whats_changed(local_gen_before_sync)
822 self.assertEqual(1, len(changes))
823 self.assertEqual(doc2.doc_id, changes[0][0])
824@@ -1212,6 +1212,22 @@
825 self.assertGetDoc(self.db, doc2.doc_id, doc2.rev, tests.nested_doc,
826 False)
827
828+ def test_db_sync_autocreate(self):
829+ doc1 = self.db.create_doc_from_json(tests.simple_doc)
830+ local_gen_before_sync = self.do_sync('test3.db')
831+ gen, _, changes = self.db.whats_changed(local_gen_before_sync)
832+ self.assertEqual(0, gen - local_gen_before_sync)
833+ db3 = self.request_state.open_database('test3.db')
834+ gen, _, changes = db3.whats_changed()
835+ self.assertEqual(1, len(changes))
836+ self.assertEqual(doc1.doc_id, changes[0][0])
837+ self.assertGetDoc(db3, doc1.doc_id, doc1.rev, tests.simple_doc,
838+ False)
839+ t_gen, _ = self.db._get_replica_gen_and_trans_id('test3.db')
840+ s_gen, _ = db3._get_replica_gen_and_trans_id('test1')
841+ self.assertEqual(1, t_gen)
842+ self.assertEqual(1, s_gen)
843+
844
845 class TestRemoteSyncIntegration(tests.TestCaseWithServer):
846 """Integration tests for the most common sync scenario local -> remote"""

Subscribers

People subscribed via source and target branches