Merge lp:~pedronis/u1db/remote-sync-exchange-response-order into lp:u1db

Proposed by Samuele Pedroni
Status: Rejected
Rejected by: Samuele Pedroni
Proposed branch: lp:~pedronis/u1db/remote-sync-exchange-response-order
Merge into: lp:u1db
Prerequisite: lp:~pedronis/u1db/remote-sync-exchange
Diff against target: 233 lines (+53/-32)
6 files modified
u1db/backends/__init__.py (+19/-5)
u1db/remote/requests.py (+7/-2)
u1db/remote/sync_server.py (+9/-10)
u1db/tests/__init__.py (+5/-1)
u1db/tests/test_remote_client.py (+6/-9)
u1db/tests/test_remote_sync_server.py (+7/-5)
To merge this branch: bzr merge lp:~pedronis/u1db/remote-sync-exchange-response-order
Reviewer: Ubuntu One hackers
Review status: Pending
Review via email: mp+80801@code.launchpad.net

Description of the change

reorganize things to send args first in remote sync exchange response

John A Meinel (jameinel) wrote :

-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA1

On 10/31/2011 03:05 PM, Samuele Pedroni wrote:
> Samuele Pedroni has proposed merging
> lp:~pedronis/u1db/remote-sync-exchange-response-order into lp:u1db
> with lp:~pedronis/u1db/remote-sync-exchange as a prerequisite.
>
> Requested reviews: Ubuntu One hackers (ubuntuone-hackers)
>
> For more details, see:
> https://code.launchpad.net/~pedronis/u1db/remote-sync-exchange-response-order/+merge/80801
>
> reorganize things to send args first in remote sync exchange
> response

I think the overall change here is that 'send_response' doesn't
actually send a complete response back.

Instead, the act of getting 'e\0\0\0\0' from the client triggers us to
send the end of the response back to the client.

And that feels a bit wrong. If a client sends us an incomplete
request, or if we error out in the middle, the server should still
send the client back a complete record.

I'd be a lot happier with an api that had:

def start_response(...)

def finish_response(...)

def send_response(...):
  self.start_response(...)
  self.finish_response(...)

So that we have a convenience function for RPCs that just have a
single chunk of response to send, and a fuller api for RPCs that want
to control the start and finish of the response.

Doing it based on data from the client doesn't feel right.
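
A minimal sketch of the start/finish split proposed above, written for Python 3 rather than the Python 2 of the branch. `SketchResponder` and its `_frame` helper are hypothetical names, not the u1db API. The framing (one type byte, a 4-byte big-endian length, then a JSON payload) is inferred from the test expectations in this proposal, and the 'u1db-1' preamble and header frame are left out for brevity.

```python
import io
import json
import struct


class SketchResponder:
    """Hypothetical responder; illustrates the proposed API shape only."""

    def __init__(self, out):
        self._out = out  # any object with a write(bytes) method
        self._started = False
        self._finished = False

    def _frame(self, kind, payload=b""):
        # One type byte, a 4-byte big-endian length, then the payload.
        self._out.write(kind + struct.pack("!L", len(payload)) + payload)

    def start_response(self, **kwargs):
        """Send the response arguments before any stream entries."""
        if self._started:
            raise RuntimeError("response already started")
        self._started = True
        if kwargs:
            self._frame(b"a", json.dumps(kwargs).encode("utf-8"))

    def stream_entry(self, entry):
        """Send one stream entry between start and finish."""
        if not self._started or self._finished:
            raise RuntimeError("stream entries go between start and finish")
        self._frame(b"x", json.dumps(entry).encode("utf-8"))

    def finish_response(self):
        """Send the end marker; safe to call from an error path too."""
        if not self._started:
            raise RuntimeError("response was never started")
        if not self._finished:
            self._finished = True
            self._frame(b"e")

    def send_response(self, **kwargs):
        """Convenience for RPCs with a single chunk of response."""
        self.start_response(**kwargs)
        self.finish_response()


buf = io.BytesIO()
SketchResponder(buf).send_response(one=1)
single_chunk = buf.getvalue()
```

With this shape the server decides when the response ends, so a request that fails halfway can still call `finish_response()` (or send an error) without waiting for the client's end marker.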

I definitely like that the response header is sent before the stream
content.

> +        my_gen = self._checkpoint_sync_exchange(from_replica_uid,
> +                                                from_replica_generation,
> +                                                last_known_generation)
> +

^- I'm not sure about the name here.

> === modified file 'u1db/remote/requests.py'
> --- u1db/remote/requests.py 2011-10-31 14:04:45 +0000
> +++ u1db/remote/requests.py 2011-10-31 14:04:45 +0000
> @@ -168,11 +168,16 @@
>
>      def handle_end(self):
>          def send_doc(doc_id, doc_rev, doc):
> -            self.responder.send_stream_entry((doc_id, doc_rev, doc))
> +            self.responder.stream_entry((doc_id, doc_rev, doc))
> +        new_gen = self.target._checkpoint_sync_exchange(
> +            self.from_replica_uid,
> +            self.from_replica_generation,
> +            self.last_known_generation)
> +        self.responder.send_response(other_new_generation=new_gen)
>          new_gen = self.target._finish_sync_exchange(self.from_replica_uid,
>                                                   self.from_replica_generation,
>                                                   self.last_known_generation,
>                                                   send_doc)
> -        self._result(other_new_generation=new_gen)
> +        self._close()
>

^- I understand how you are splitting things up here, but it feels a
bit wrong to have RPCSyncExchangeRequest poking at the private apis of
SyncTarget.

I think making it clear how to split up sync exchange into separate
steps is great. We should just make it a bit more part-of-an-api
rather than private collaboration between the classes.

I think if we moved to a state-of-sync object that might be a way to
go. Because it is true that we don't want to expose it as part of the
network RPC.
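
One way the "state-of-sync object" idea could look, as a rough Python 3 sketch. Everything here is hypothetical: `SyncExchange`, its method names, and the `InMemoryDB` stand-in are invented for illustration and are not the u1db API. The point is only that the checkpoint and finish steps become public methods on an object that holds the exchange state, so the request class no longer reaches into private methods of the target.

```python
class InMemoryDB:
    """Tiny stand-in for a database (an assumption, not the u1db API)."""

    def __init__(self):
        self.generation = 0
        self.docs = {}  # doc_id -> (doc_rev, doc)
        self.log = []   # (generation, doc_id) in commit order

    def put(self, doc_id, doc_rev, doc):
        self.generation += 1
        self.docs[doc_id] = (doc_rev, doc)
        self.log.append((self.generation, doc_id))

    def whats_changed(self, old_generation):
        changed = {d for g, d in self.log if g > old_generation}
        return self.generation, changed


class SyncExchange:
    """Holds the state of one sync exchange between its steps."""

    def __init__(self, db, last_known_generation):
        self._db = db
        self._last_known_generation = last_known_generation
        self.seen_ids = set()
        self.new_gen = None
        self._changed_doc_ids = None

    def insert_doc_from_source(self, doc_id, doc_rev, doc):
        # Step 1: take documents coming from the other replica.
        self._db.put(doc_id, doc_rev, doc)
        self.seen_ids.add(doc_id)

    def find_changes_to_return(self):
        # Step 2: checkpoint. The generation is known here, so the
        # response args can be sent before any document is streamed.
        self.new_gen, self._changed_doc_ids = self._db.whats_changed(
            self._last_known_generation)
        return self.new_gen

    def return_docs(self, return_doc_cb):
        # Step 3: stream back what the other side has not seen yet.
        if self._changed_doc_ids is None:
            raise RuntimeError("call find_changes_to_return() first")
        for doc_id in sorted(self._changed_doc_ids):
            if doc_id not in self.seen_ids:
                doc_rev, doc = self._db.docs[doc_id]
                return_doc_cb(doc_id, doc_rev, doc)


db = InMemoryDB()
db.put("local-doc", "rev-1", "{}")
exchange = SyncExchange(db, last_known_generation=0)
exchange.insert_doc_from_source("remote-doc", "rev-1", "{}")
new_gen = exchange.find_changes_to_return()
sent = []
exchange.return_docs(lambda *doc_info: sent.append(doc_info))
```

A request handler would call `find_changes_to_return()`, send the response args, then pass its stream callback to `return_docs()`, all through public methods.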

John
=:->
-----BEGIN PGP SIGNATURE-----
Version: GnuPG v1.4.11 (GNU/Linux)
Comment: Using GnuPG with Mozilla - http://enigmail.mozdev.org/

iEYEARECAAYFAk6u/E8ACgkQJdeBCYSNAANalgCeJBmm5KYZkFj1YfvY++qan+wo
8nsAoI/jQD1HPwByEQK1YpECVfRG6SJp
=xjvG
-----END PGP SIGNATURE-----

Samuele Pedroni (pedronis) wrote :

some of this was addressed in:

https://code.launchpad.net/~pedronis/u1db/remote-sync-exchange-consolidated

> -----BEGIN PGP SIGNED MESSAGE-----
> Hash: SHA1
>
> On 10/31/2011 03:05 PM, Samuele Pedroni wrote:
> > Samuele Pedroni has proposed merging
> > lp:~pedronis/u1db/remote-sync-exchange-response-order into lp:u1db
> > with lp:~pedronis/u1db/remote-sync-exchange as a prerequisite.
> >
> > Requested reviews: Ubuntu One hackers (ubuntuone-hackers)
> >
> > For more details, see:
> > https://code.launchpad.net/~pedronis/u1db/remote-sync-exchange-response-order/+merge/80801
> >
> > reorganize things to send args first in remote sync exchange
> > response
>
>
> I think the overall change here is that 'send_response' doesn't
> actually send a complete response back.
>
> Instead, the act of getting 'e\0\0\0\0' from the client triggers us to
> send the end of the response back to the client.
>
> And that feels a bit wrong. If a client sends us an incomplete
> request, or if we error out in the middle, the server should still
> send the client back a complete record.
>
> I'd be a lot happier with an api that had:
>
> def start_response(...)
>
> def finish_response(...)
>
> def send_response(...):
> self.start_response(...)
> self.finish_response(...)

done this way

>
> So that we have a convenience function for RPCs that just have a
> single chunk of response to send, and a fuller api for RPCs that want
> to control the start and finish of the response.
>
> Doing it based on data from the client doesn't feel right.
>
> I definitely like that the response header is sent before the stream
> content.
>
>
>
> > +        my_gen = self._checkpoint_sync_exchange(from_replica_uid,
> > +                                                from_replica_generation,
> > +                                                last_known_generation)
> > +
>
> ^- I'm not sure about the name here.
>
> > === modified file 'u1db/remote/requests.py'
> > --- u1db/remote/requests.py 2011-10-31 14:04:45 +0000
> > +++ u1db/remote/requests.py 2011-10-31 14:04:45 +0000
> > @@ -168,11 +168,16 @@
> >
> >      def handle_end(self):
> >          def send_doc(doc_id, doc_rev, doc):
> > -            self.responder.send_stream_entry((doc_id, doc_rev, doc))
> > +            self.responder.stream_entry((doc_id, doc_rev, doc))
> > +        new_gen = self.target._checkpoint_sync_exchange(
> > +            self.from_replica_uid,
> > +            self.from_replica_generation,
> > +            self.last_known_generation)
> > +        self.responder.send_response(other_new_generation=new_gen)
> >          new_gen = self.target._finish_sync_exchange(self.from_replica_uid,
> >                                                   self.from_replica_generation,
> >                                                   self.last_known_generation,
> >                                                   send_doc)
> > -        self._result(other_new_generation=new_gen)
> > +        self._close()
> >
>
>
> ^- I understand how you are splitting things up here, but it feels a
> bit wrong to have RPCSyncExchangeRequest poking at the private apis of
> SyncTarget.
>
> I think making it clear how to split up sync exchange into separate
> steps is great. We should just make it a bit more part-of-an-api
> rather than private collaboration between the classes.
>
> I think if we moved to a state-of-sync object that might be a way to
> go. Because it is true that we don't want to expose it as part of the
> network RPC.

yes we need to extr...


Preview Diff

=== modified file 'u1db/backends/__init__.py'
--- u1db/backends/__init__.py 2011-10-31 14:04:45 +0000
+++ u1db/backends/__init__.py 2011-10-31 14:04:45 +0000
@@ -24,6 +24,8 @@
         self._db = db
         self.conflict_ids = set()
         self.seen_ids = set() # not superseded
+        self.my_gen = None
+        self.changed_doc_ids = None
         self._docs_trace = [] # for tests
 
     def _insert_other_doc(self, doc_id, doc_rev, doc):
@@ -51,16 +53,28 @@
                       last_known_generation, take_other_doc):
         for doc_id, doc_rev, doc in docs_info:
             self._insert_other_doc(doc_id, doc_rev, doc)
-        return self._finish_sync_exchange(from_replica_uid,
-                                          from_replica_generation,
-                                          last_known_generation, take_other_doc)
+        my_gen = self._checkpoint_sync_exchange(from_replica_uid,
+                                                from_replica_generation,
+                                                last_known_generation)
+        self._finish_sync_exchange(from_replica_uid,
+                                   from_replica_generation,
+                                   last_known_generation, take_other_doc)
+        return my_gen
+
+    def _checkpoint_sync_exchange(self, from_replica_uid,
+                                  from_replica_generation,
+                                  last_known_generation):
+        my_gen, changed_doc_ids = self._db.whats_changed(last_known_generation)
+        self.my_gen = my_gen
+        self.changed_doc_ids = changed_doc_ids
+        return my_gen
 
     def _finish_sync_exchange(self, from_replica_uid, from_replica_generation,
                               last_known_generation, take_other_doc):
         seen_ids = self.seen_ids
         conflict_ids = self.conflict_ids
-        new_docs = []
-        my_gen, changed_doc_ids = self._db.whats_changed(last_known_generation)
+        my_gen = self.my_gen
+        changed_doc_ids = self.changed_doc_ids
         doc_ids_to_return = [doc_id for doc_id in changed_doc_ids
                              if doc_id not in seen_ids]
         new_docs = self._db.get_docs(doc_ids_to_return,

=== modified file 'u1db/remote/requests.py'
--- u1db/remote/requests.py 2011-10-31 14:04:45 +0000
+++ u1db/remote/requests.py 2011-10-31 14:04:45 +0000
@@ -168,11 +168,16 @@
 
     def handle_end(self):
         def send_doc(doc_id, doc_rev, doc):
-            self.responder.send_stream_entry((doc_id, doc_rev, doc))
+            self.responder.stream_entry((doc_id, doc_rev, doc))
+        new_gen = self.target._checkpoint_sync_exchange(
+            self.from_replica_uid,
+            self.from_replica_generation,
+            self.last_known_generation)
+        self.responder.send_response(other_new_generation=new_gen)
         new_gen = self.target._finish_sync_exchange(self.from_replica_uid,
                                                  self.from_replica_generation,
                                                  self.last_known_generation,
                                                  send_doc)
-        self._result(other_new_generation=new_gen)
+        self._close()
 
 RPCSyncExchange.register()

=== modified file 'u1db/remote/sync_server.py'
--- u1db/remote/sync_server.py 2011-10-31 14:04:45 +0000
+++ u1db/remote/sync_server.py 2011-10-31 14:04:45 +0000
@@ -170,10 +170,11 @@
 
     def received_end(self):
         self._request.handle_end()
-        if not self._responder._sent_response:
+        if not self._responder._started:
             raise errors.BadProtocolStream("Client sent end-of-message,"
                 " but the Request did not generate a response."
                 " for Request: %s" % (self._request,))
+        self._responder._finish_response()
 
 class Responder(object):
     """Encode responses from the server back to the client."""
@@ -184,7 +185,6 @@
             BUFFER_SIZE)
         self._encoder = protocol.ProtocolEncoderV1(self._out_buffer.write)
         self._started = False
-        self._sent_response = False
         self.request_name = ''
 
     def _write_to_client(self, content):
@@ -203,18 +203,17 @@
 
     # have a way to transmit an error
 
-    def send_stream_entry(self, entry):
-        "send stream entry as part of the response."
-        self._start_response()
-        self._encoder.encode_dict('x', entry)
-
     def send_response(self, **kwargs):
         """send/finalize response."""
         self._start_response()
         if kwargs:
             self._encoder.encode_dict('a', kwargs)
+
+    def stream_entry(self, entry):
+        "send stream entry as part of the response."
+        self._start_response()
+        self._encoder.encode_dict('x', entry)
+
+    def _finish_response(self):
         self._encoder.encode_end()
         self._out_buffer.flush()
-        self._sent_response = True
-
-

=== modified file 'u1db/tests/__init__.py'
--- u1db/tests/__init__.py 2011-10-31 14:04:45 +0000
+++ u1db/tests/__init__.py 2011-10-31 14:04:45 +0000
@@ -108,12 +108,16 @@
 
 class ResponderForTests(object):
     """Responder for tests."""
+    _started = False
     _sent_response = False
     status = 'success'
 
     def send_response(self, **kwargs):
+        self._started = True
+        self.kwargs = kwargs
+
+    def _finish_response(self):
         self._sent_response = True
-        self.kwargs = kwargs
 
 
 class TestCaseWithSyncServer(TestCase):

=== modified file 'u1db/tests/test_remote_client.py'
--- u1db/tests/test_remote_client.py 2011-10-31 14:04:45 +0000
+++ u1db/tests/test_remote_client.py 2011-10-31 14:04:45 +0000
@@ -75,20 +75,17 @@
 
     def __init__(self, state, responder):
         super(WithStreamRequest, self).__init__(state, responder)
-        self._args = None
-        self._entries = []
 
     def handle_args(self, **kwargs):
-        self._args = kwargs
+        self.responder.send_response(**kwargs)
 
     def handle_stream_entry(self, entry):
-        self._entries.append(entry)
+        v = entry['outgoing'] * 5
+        self.responder.stream_entry({'incoming': v})
 
     def handle_end(self):
-        for entry in self._entries:
-            v = entry['outgoing'] * 5
-            self.responder.send_stream_entry({'incoming': v})
-        self.responder.send_response(**self._args)
+        pass
+
 
 
 class TestClient(tests.TestCase):
@@ -200,9 +197,9 @@
             'u1db-1\n'
             'h%s{"server_version": "%s", "request": "withstream"}'
             % (struct.pack('!L', 47 + len(_u1db_version)), _u1db_version)
+            + 'a\x00\x00\x00\x0a{"one": 1}'
             + 'x\x00\x00\x00\x10{"incoming": 50}'
             + 'x\x00\x00\x00\x11{"incoming": 100}'
-            + 'a\x00\x00\x00\x0a{"one": 1}'
             + 'e\x00\x00\x00\x00',
             content)
         entries = []

=== modified file 'u1db/tests/test_remote_sync_server.py'
--- u1db/tests/test_remote_sync_server.py 2011-10-31 14:04:45 +0000
+++ u1db/tests/test_remote_sync_server.py 2011-10-31 14:04:45 +0000
@@ -232,22 +232,22 @@
     def test_send_response_after_args(self):
         handler = self.makeStructToRequest()
         handler.received_header({'client_version': '1', 'request': 'arg'})
-        self.assertFalse(handler._responder._sent_response)
+        self.assertFalse(handler._responder._started)
         handler.received_args({'arg': 'value', 'foo': 1})
-        self.assertTrue(handler._responder._sent_response)
+        self.assertTrue(handler._responder._started)
         handler.received_end()
 
     def test_send_response_after_end(self):
         handler = self.makeStructToRequest()
         handler.received_header({'client_version': '1', 'request': 'end'})
-        self.assertFalse(handler._responder._sent_response)
+        self.assertFalse(handler._responder._started)
         handler.received_end()
         self.assertTrue(handler._responder._sent_response)
 
     def test_end_no_response(self):
         handler = self.makeStructToRequest()
         handler.received_header({'client_version': '1', 'request': 'arg'})
-        self.assertFalse(handler._responder._sent_response)
+        self.assertFalse(handler._responder._started)
         self.assertRaises(errors.BadProtocolStream,
                           handler.received_end)
 
@@ -279,6 +279,7 @@
         responder = sync_server.Responder(server_sock)
         responder.request_name = 'request'
         responder.send_response(value='success')
+        responder._finish_response()
         self.assertEqual(
             'u1db-1\n'
             'h%s{"server_version": "%s", "request": "request"}'
@@ -291,8 +292,9 @@
         server_sock, client_sock = tests.socket_pair()
         responder = sync_server.Responder(server_sock)
         responder.request_name = 'request'
-        responder.send_stream_entry({'entry': True})
         responder.send_response()
+        responder.stream_entry({'entry': True})
+        responder._finish_response()
         self.assertEqual(
             'u1db-1\n'
             'h%s{"server_version": "%s", "request": "request"}'