dee

Merge lp:~kamstrup/dee/shared-model-txn-notify into lp:dee

Proposed by Mikkel Kamstrup Erlandsen
Status: Merged
Approved by: Michal Hruby
Approved revision: 337
Merged at revision: 332
Proposed branch: lp:~kamstrup/dee/shared-model-txn-notify
Merge into: lp:dee
Diff against target: 474 lines (+189/-20)
8 files modified
src/dee-marshal.list (+7/-3)
src/dee-shared-model.c (+89/-17)
tests/model-helper-add3rows.c (+24/-0)
tests/model-helper-clear3add5.c (+28/-0)
tests/model-helper-clear3rows.c (+26/-0)
tests/server-helper-client.c (+4/-0)
tests/test-model-seqnums.c (+9/-0)
vapi/dee-1.0.vapi (+2/-0)
To merge this branch: bzr merge lp:~kamstrup/dee/shared-model-txn-notify
Reviewer Review Type Date Requested Status
Michal Hruby (community) Approve
Review via email: mp+87722@code.launchpad.net

Description of the change

Add two new signals to DeeSharedModel: begin-transaction and end-transaction.

Both signals pass the seqnums of the model before and after the
transaction.

To post a comment you must log in.
Revision history for this message
Michal Hruby (mhr3) wrote :

Looks quite good, although I'd prefer more comments, specifically:

144 + flush_revision_queue (DEE_MODEL (user_data));

Why is this needed now?

236 + /* Chain up to parent class impl */
237 + ((DeeModelIface*) g_type_interface_peek_parent (DEE_MODEL_GET_IFACE(model)))->clear (model);

Why the chainup?

review: Needs Fixing
Revision history for this message
Mikkel Kamstrup Erlandsen (kamstrup) wrote :

Pushed with comments

337. By Mikkel Kamstrup Erlandsen

Add some comments to clarify some of the more obscure logic

Revision history for this message
Michal Hruby (mhr3) wrote :

Great!

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'src/dee-marshal.list'
2--- src/dee-marshal.list 2010-05-06 13:45:44 +0000
3+++ src/dee-marshal.list 2012-01-06 12:59:23 +0000
4@@ -1,8 +1,12 @@
5-# DbusSharedModel::RowsAdded, DbusSharedModel::RowsChanged
6+# DeeSharedModel::RowsAdded, DbusSharedModel::RowsChanged
7 VOID:BOXED,BOXED,BOXED
8
9-# DbusSharedModel::RowsRemoved
10+# DeeSharedModel::RowsRemoved
11 VOID:BOXED,BOXED
12
13-# DbusPeer::bye
14+# DeeSharedModel
15+VOID:UINT64,UINT64
16+
17+# DeePeer::bye
18 VOID:STRING,STRING
19+
20
21=== modified file 'src/dee-shared-model.c'
22--- src/dee-shared-model.c 2011-12-16 13:01:10 +0000
23+++ src/dee-shared-model.c 2012-01-06 12:59:23 +0000
24@@ -85,6 +85,7 @@
25 GSList *connections;
26 gchar *model_path;
27
28+ guint64 last_committed_seqnum;
29 /* Buffer of DeeSharedModelRevisions that we keep in order to batch
30 * our DBus signals. The invariant is that all buffered revisions
31 * are of the same type */
32@@ -139,12 +140,13 @@
33 enum
34 {
35 /* Public signal */
36- READY,
37+ BEGIN_TRANSACTION,
38+ END_TRANSACTION,
39
40 LAST_SIGNAL
41 };
42
43-//static guint32 _signals[LAST_SIGNAL] = { 0 };
44+static guint32 _signals[LAST_SIGNAL] = { 0 };
45
46 /* Forwards */
47 static void on_connection_acquired (DeeSharedModel *self,
48@@ -315,7 +317,10 @@
49 /* If we don't have anything queued up, just return. It's assumed beyond
50 * this point that it is non-empty */
51 if (priv->revision_queue == NULL)
52- return 0;
53+ {
54+ priv->last_committed_seqnum = dee_serializable_model_get_seqnum (self);
55+ return 0;
56+ }
57
58 /* Since we always prepend to the queue we need to reverse it */
59 priv->revision_queue = g_slist_reverse (priv->revision_queue);
60@@ -326,7 +331,7 @@
61 * first element and assume that the last seqnum before this transaction
62 * started was the seqnum in the first revision - 1. */
63 seqnum_end = ((DeeSharedModelRevision *) priv->revision_queue->data)->seqnum - 1;
64- seqnum_begin = seqnum_end;
65+ seqnum_begin = priv->last_committed_seqnum;
66
67 g_variant_builder_init (&aav, G_VARIANT_TYPE ("aav"));
68 g_variant_builder_init (&au, G_VARIANT_TYPE ("au"));
69@@ -415,6 +420,8 @@
70 * we constructed the Commit message */
71 g_slist_free (priv->revision_queue);
72 priv->revision_queue = NULL;
73+
74+ priv->last_committed_seqnum = seqnum_end;
75
76 return seqnum_end - seqnum_begin; // Very theoretical overflow possible here...
77 }
78@@ -674,6 +681,42 @@
79 FALSE,
80 G_PARAM_READABLE);
81 g_object_class_install_property (obj_class, PROP_SYNCHRONIZED, pspec);
82+
83+ /**
84+ * DeeSharedModel::begin-transaction:
85+ * @model: The shared model the signal is emitted on
86+ * @begin_seqnum: The seqnum the model has now
87+ * @end_seqnum: The seqnum the model will have after the transaction is applied
88+ *
89+ * Emitted right before a remote transaction will be committed to the model.
90+ */
91+ _signals[BEGIN_TRANSACTION] =
92+ g_signal_new ("begin-transaction",
93+ DEE_TYPE_SHARED_MODEL,
94+ G_SIGNAL_RUN_LAST,
95+ 0,
96+ NULL, NULL,
97+ _dee_marshal_VOID__UINT64_UINT64,
98+ G_TYPE_NONE, 2,
99+ G_TYPE_UINT64, G_TYPE_UINT64);
100+
101+ /**
102+ * DeeSharedModel::end-transaction:
103+ * @model: The shared model the signal is emitted on
104+ * @begin_seqnum: The seqnum the model had before the transaction was applied
105+ * @end_seqnum: The seqnum the model has now
106+ *
107+ * Emitted right after a remote transaction has been committed to the model.
108+ */
109+ _signals[END_TRANSACTION] =
110+ g_signal_new ("end-transaction",
111+ DEE_TYPE_SHARED_MODEL,
112+ G_SIGNAL_RUN_LAST,
113+ 0,
114+ NULL, NULL,
115+ _dee_marshal_VOID__UINT64_UINT64,
116+ G_TYPE_NONE, 2,
117+ G_TYPE_UINT64, G_TYPE_UINT64);
118
119 /* Add private data */
120 g_type_class_add_private (obj_class, sizeof (DeeSharedModelPrivate));
121@@ -689,6 +732,7 @@
122 priv->swarm = NULL;
123 priv->model_path = NULL;
124
125+ priv->last_committed_seqnum = 0;
126 priv->revision_queue = NULL;
127 priv->revision_queue_timeout_id = 0;
128 priv->swarm_leader_handler = 0;
129@@ -720,12 +764,20 @@
130 GDBusMethodInvocation *invocation,
131 gpointer user_data)
132 {
133- GVariant *retval;
134+ DeeSharedModelPrivate *priv;
135+ GVariant *retval;
136
137 g_return_if_fail (DEE_IS_SHARED_MODEL (user_data));
138+
139+ priv = DEE_SHARED_MODEL (user_data)->priv;
140
141 if (g_strcmp0 ("Clone", method_name) == 0)
142 {
143+ /* If we have anything in the rev queue it won't validate against the
144+ * seqnum for the cloned model. So flush the rev queue before answering
145+ * the Clone call */
146+ flush_revision_queue (DEE_MODEL (user_data));
147+
148 /* We return a special error if we have no schema. It's legal for the
149 * leader to expect the schema from the slaves */
150 if (dee_model_get_n_columns (DEE_MODEL (user_data)) == 0)
151@@ -931,6 +983,11 @@
152 * but in that case we should still consider our selves synchronized */
153 if (transaction != NULL)
154 {
155+ /* Guard against a race where we might inadvertently have accepted a Commit
156+ * before receiving the initial Clone */
157+ if (dee_model_get_n_columns (DEE_MODEL (self)) > 0)
158+ reset_model (DEE_MODEL (self));
159+
160 /* We use the swarm name as sender_name here, because DBus passes us the
161 * unique name of the swarm leader here and we want to indicate in the debug
162 * messages that the transaction came from the leader */
163@@ -1143,9 +1200,11 @@
164 if (current_seqnum != 0 &&
165 current_seqnum != seqnum_before)
166 {
167- g_warning ("Transaction from %s is in the %s. Ignoring transaction.",
168+ g_warning ("Transaction from %s is in the %s. Expected seqnum %"G_GUINT64_FORMAT
169+ ", but got %"G_GUINT64_FORMAT". Ignoring transaction.",
170 sender_name,
171- current_seqnum < seqnum_before ? "future" : "past");
172+ current_seqnum < seqnum_before ? "future" : "past",
173+ current_seqnum, seqnum_before);
174 if (dee_shared_model_is_leader (self))
175 {
176 g_warning ("Invalidating %s", sender_name);
177@@ -1173,7 +1232,7 @@
178 sender_name);
179 // FIXME cleanup
180 }
181- if (n_rows != (seqnum_after - seqnum_before))
182+ if (n_rows > (seqnum_after - seqnum_before))
183 {
184 g_warning ("Commit from %s has illegal seqnum count.",
185 sender_name);
186@@ -1185,6 +1244,7 @@
187 trace_object (self, "Applying transaction of %i rows", n_rows);
188
189 /* Phew. Finally. We're ready to merge the changes */
190+ g_signal_emit (self, _signals[BEGIN_TRANSACTION], 0, seqnum_before, seqnum_after);
191 priv->suppress_remote_signals = TRUE;
192 for (i = 0; i < n_rows; i++) /* Begin outer loop */
193 {
194@@ -1270,7 +1330,13 @@
195 g_variant_unref (au);
196 g_variant_unref (ay);
197
198+ /* We must manually override the seqnum in case we started off from
199+ * zero our selves, but the transaction was a later snapshot */
200 dee_serializable_model_set_seqnum (DEE_MODEL (self), seqnum_after);
201+
202+ priv->last_committed_seqnum = seqnum_after;
203+
204+ g_signal_emit (self, _signals[END_TRANSACTION], 0, seqnum_before, seqnum_after);
205 }
206
207 static void
208@@ -1650,6 +1716,7 @@
209 DeeModel *backend;
210 gboolean was_suppressing;
211 guint64 seqnum;
212+ guint n_rows;
213
214 self = DEE_SHARED_MODEL (model);
215 priv = self->priv;
216@@ -1658,21 +1725,25 @@
217
218 was_suppressing = priv->suppress_remote_signals;
219 seqnum = dee_serializable_model_get_seqnum (model);
220- if (!was_suppressing && dee_model_get_n_rows (model) > 0)
221+ n_rows = dee_model_get_n_rows (model);
222+
223+ if (!was_suppressing && n_rows > 0)
224 {
225+ seqnum += n_rows;
226 enqueue_revision (model,
227 CHANGE_TYPE_CLEAR,
228 0,
229- ++seqnum,
230+ seqnum,
231 NULL);
232 }
233 /* make sure we don't enqueue lots of CHANGE_TYPE_REMOVE */
234 priv->suppress_remote_signals = TRUE;
235
236- /* FIXME: should we set the seqnum after each removed row? */
237- dee_model_clear (backend);
238+ /* Chain up to parent class impl. This handles the seqnums for us and the
239+ * backend alike. We just hook in before it, really, to play clever
240+ * tricks with the revision queue (inserting a CLEAR and not N*REMOVE) */
241+ ((DeeModelIface*) g_type_interface_peek_parent (DEE_MODEL_GET_IFACE(model)))->clear (model);
242
243- dee_serializable_model_set_seqnum (model, seqnum);
244 priv->suppress_remote_signals = was_suppressing;
245
246 g_object_unref (backend);
247@@ -1695,9 +1766,6 @@
248 guint64 last_seqnum;
249
250 g_return_val_if_fail (DEE_IS_SHARED_MODEL (self), FALSE);
251-
252- trace_object (self, "Serializing %u rows",
253- dee_model_get_n_rows (DEE_MODEL (self)));
254
255 _self = DEE_MODEL (self);
256 n_columns = dee_model_get_n_columns (DEE_MODEL (self));
257@@ -1732,7 +1800,7 @@
258
259 /* Collect the seqnums */
260 last_seqnum = dee_serializable_model_get_seqnum (_self);
261- tt = g_variant_new ("(tt)", last_seqnum - i, last_seqnum);
262+ tt = g_variant_new ("(tt)", last_seqnum - i, last_seqnum);// FIXME last_committed_seqnum
263
264 /* Build the final clone */
265 g_variant_builder_init (&clone, G_VARIANT_TYPE ("(sasaavauay(tt))"));
266@@ -1743,6 +1811,10 @@
267 g_variant_builder_add_value (&clone, g_variant_builder_end (&ay));
268 g_variant_builder_add_value (&clone, tt);
269
270+ trace_object (self, "Serialized %u rows. "
271+ "Seqnum range %"G_GUINT64_FORMAT"-%"G_GUINT64_FORMAT,
272+ i, last_seqnum - i, last_seqnum);
273+
274 return g_variant_builder_end (&clone);
275 }
276
277
278=== modified file 'tests/model-helper-add3rows.c'
279--- tests/model-helper-add3rows.c 2011-12-14 14:09:58 +0000
280+++ tests/model-helper-add3rows.c 2012-01-06 12:59:23 +0000
281@@ -25,6 +25,23 @@
282 #include <gtx.h>
283 #include <dee.h>
284
285+guint64 before_begin_seqnum, before_end_seqnum,
286+ after_begin_seqnum, after_end_seqnum;
287+
288+static void
289+_begin_txn (DeeSharedModel *model, guint64 begin_seqnum, guint64 end_seqnum)
290+{
291+ before_begin_seqnum = begin_seqnum;
292+ before_end_seqnum = end_seqnum;
293+}
294+
295+static void
296+_end_txn (DeeSharedModel *model, guint64 begin_seqnum, guint64 end_seqnum)
297+{
298+ after_begin_seqnum = begin_seqnum;
299+ after_end_seqnum = end_seqnum;
300+}
301+
302 static void
303 _row_added (DeeModel *model, DeeModelIter *iter, GSList **rows_so_far)
304 {
305@@ -48,6 +65,8 @@
306 else
307 model = dee_shared_model_new_for_peer ((DeePeer*) dee_client_new (argv[1]));
308
309+ g_signal_connect (model, "begin-transaction", G_CALLBACK (_begin_txn), NULL);
310+ g_signal_connect (model, "end-transaction", G_CALLBACK (_end_txn), NULL);
311
312 /* Wait until we find the leader */
313 if (gtx_wait_for_signal (G_OBJECT (model), 1000, "notify::synchronized", NULL))
314@@ -84,5 +103,10 @@
315 gtx_assert_last_unref (model);
316 g_slist_free (rows_added);
317
318+ g_assert (before_begin_seqnum == after_begin_seqnum);
319+ g_assert (before_end_seqnum == after_end_seqnum);
320+ g_assert_cmpint (0, ==, (guint) before_begin_seqnum);
321+ g_assert_cmpint (3, ==, (guint) before_end_seqnum);
322+
323 return 0;
324 }
325
326=== modified file 'tests/model-helper-clear3add5.c'
327--- tests/model-helper-clear3add5.c 2011-12-14 14:09:58 +0000
328+++ tests/model-helper-clear3add5.c 2012-01-06 12:59:23 +0000
329@@ -25,6 +25,23 @@
330 #include <gtx.h>
331 #include <dee.h>
332
333+guint64 before_begin_seqnum, before_end_seqnum,
334+ after_begin_seqnum, after_end_seqnum;
335+
336+static void
337+_begin_txn (DeeSharedModel *model, guint64 begin_seqnum, guint64 end_seqnum)
338+{
339+ before_begin_seqnum = begin_seqnum;
340+ before_end_seqnum = end_seqnum;
341+}
342+
343+static void
344+_end_txn (DeeSharedModel *model, guint64 begin_seqnum, guint64 end_seqnum)
345+{
346+ after_begin_seqnum = begin_seqnum;
347+ after_end_seqnum = end_seqnum;
348+}
349+
350 static void
351 _row_added (DeeModel *model, DeeModelIter *iter, GSList **added)
352 {
353@@ -47,6 +64,8 @@
354 else
355 model = dee_shared_model_new_for_peer ((DeePeer*) dee_client_new (argv[1]));
356
357+ g_signal_connect (model, "begin-transaction", G_CALLBACK (_begin_txn), NULL);
358+ g_signal_connect (model, "end-transaction", G_CALLBACK (_end_txn), NULL);
359
360 if (gtx_wait_for_signal (G_OBJECT (model), 100000, "notify::synchronized", NULL))
361 g_error ("Helper model timed out waiting for 'ready' signal");
362@@ -65,6 +84,15 @@
363 g_assert (g_slist_length (added) == 5 || g_slist_length (added) == 2);
364 g_assert_cmpint (dee_model_get_n_rows (model), ==, 5);
365
366+ /* Disregarding the hypothetical optimization mentioned above we need the
367+ * correct seqnum */
368+ g_assert_cmpint (11, ==, (guint) dee_serializable_model_get_seqnum (model));
369+
370+ g_assert (before_begin_seqnum == after_begin_seqnum);
371+ g_assert (before_end_seqnum == after_end_seqnum);
372+ g_assert_cmpint (3, ==, (guint) before_begin_seqnum);
373+ g_assert_cmpint (11, ==, (guint) before_end_seqnum);
374+
375 gtx_assert_last_unref (model);
376 g_slist_free (added);
377
378
379=== modified file 'tests/model-helper-clear3rows.c'
380--- tests/model-helper-clear3rows.c 2011-12-14 14:09:58 +0000
381+++ tests/model-helper-clear3rows.c 2012-01-06 12:59:23 +0000
382@@ -25,6 +25,23 @@
383 #include <gtx.h>
384 #include <dee.h>
385
386+guint64 before_begin_seqnum, before_end_seqnum,
387+ after_begin_seqnum, after_end_seqnum;
388+
389+static void
390+_begin_txn (DeeSharedModel *model, guint64 begin_seqnum, guint64 end_seqnum)
391+{
392+ before_begin_seqnum = begin_seqnum;
393+ before_end_seqnum = end_seqnum;
394+}
395+
396+static void
397+_end_txn (DeeSharedModel *model, guint64 begin_seqnum, guint64 end_seqnum)
398+{
399+ after_begin_seqnum = begin_seqnum;
400+ after_end_seqnum = end_seqnum;
401+}
402+
403 static void
404 _row_removed (DeeModel *model, DeeModelIter *iter, GSList **removed)
405 {
406@@ -47,6 +64,8 @@
407 else
408 model = dee_shared_model_new_for_peer ((DeePeer*) dee_client_new (argv[1]));
409
410+ g_signal_connect (model, "begin-transaction", G_CALLBACK (_begin_txn), NULL);
411+ g_signal_connect (model, "end-transaction", G_CALLBACK (_end_txn), NULL);
412
413 if (gtx_wait_for_signal (G_OBJECT (model), 100000, "notify::synchronized", NULL))
414 g_error ("Helper model timed out waiting for 'ready' signal");
415@@ -63,6 +82,13 @@
416 g_assert_cmpint (g_slist_length (removed), ==, 3);
417 g_assert_cmpint (dee_model_get_n_rows(model), ==, 0);
418
419+ g_assert_cmpint (6, ==, (guint) dee_serializable_model_get_seqnum (model));
420+
421+ g_assert (before_begin_seqnum == after_begin_seqnum);
422+ g_assert (before_end_seqnum == after_end_seqnum);
423+ g_assert_cmpint (3, ==, (guint) before_begin_seqnum);
424+ g_assert_cmpint (6, ==, (guint) before_end_seqnum);
425+
426 gtx_assert_last_unref (model);
427 g_slist_free (removed);
428
429
430=== modified file 'tests/server-helper-client.c'
431--- tests/server-helper-client.c 2012-01-02 15:59:19 +0000
432+++ tests/server-helper-client.c 2012-01-06 12:59:23 +0000
433@@ -102,6 +102,10 @@
434 gtx_yield_main_loop (200);
435 if (finished_children == num_clients) break;
436 }
437+
438+ /* Give a window of opportunity for children to
439+ * flush stdout/err before we exit */
440+ gtx_yield_main_loop (200);
441
442 g_assert_cmpint (num_clients, ==, finished_children);
443
444
445=== modified file 'tests/test-model-seqnums.c'
446--- tests/test-model-seqnums.c 2011-12-09 11:47:47 +0000
447+++ tests/test-model-seqnums.c 2012-01-06 12:59:23 +0000
448@@ -143,4 +143,13 @@
449
450 dee_model_clear (fix->model);
451 g_assert_cmpint (3, ==, dee_serializable_model_get_seqnum (model));
452+
453+ dee_model_append (fix->model, 11);
454+ dee_model_append (fix->model, 12);
455+ dee_model_append (fix->model, 13);
456+ g_assert_cmpint (6, ==, dee_serializable_model_get_seqnum (model));
457+
458+ dee_model_clear (fix->model);
459+ g_assert_cmpint (9, ==, dee_serializable_model_get_seqnum (model));
460 }
461+
462
463=== modified file 'vapi/dee-1.0.vapi'
464--- vapi/dee-1.0.vapi 2012-01-04 08:32:53 +0000
465+++ vapi/dee-1.0.vapi 2012-01-06 12:59:23 +0000
466@@ -150,6 +150,8 @@
467 public Dee.Peer peer { get; construct; }
468 [NoAccessorMethod]
469 public bool synchronized { get; }
470+ public signal void begin_transaction (uint64 begin_seqnum, uint64 end_seqnum);
471+ public signal void end_transaction (uint64 begin_seqnum, uint64 end_seqnum);
472 }
473 [CCode (cheader_filename = "dee.h", type_id = "dee_term_list_get_type ()")]
474 public class TermList : GLib.Object {

Subscribers

People subscribed via source and target branches