Merge lp:~cmiller/desktopcouch/kill-asyncore-use-twisted into lp:desktopcouch

Proposed by Chad Miller
Status: Merged
Approved by: Elliot Murphy
Approved revision: 3
Merged at revision: not available
Proposed branch: lp:~cmiller/desktopcouch/kill-asyncore-use-twisted
Merge into: lp:desktopcouch
Diff against target: None lines
To merge this branch: bzr merge lp:~cmiller/desktopcouch/kill-asyncore-use-twisted
Reviewer Review Type Date Requested Status
Elliot Murphy (community) Approve
Review via email: mp+8597@code.launchpad.net

Commit message

[r=statik] Change pairing tool to twisted instead of asyncore.

To post a comment you must log in.
Revision history for this message
Chad Miller (cmiller) wrote :

I, for one, welcome our new twisted overlords.

Revision history for this message
Elliot Murphy (statik) wrote :

Looks good! Please file a bug for the couple of FIXME spots.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'desktopcouch/pair/couchdb_pairing/dbus_io.py'
2--- desktopcouch/pair/couchdb_pairing/dbus_io.py 2009-07-08 17:48:11 +0000
3+++ desktopcouch/pair/couchdb_pairing/dbus_io.py 2009-07-10 22:13:01 +0000
4@@ -26,7 +26,7 @@
5
6
7 def get_local_hostname():
8- """Get the name of this host, an ASCII-encoded string."""
9+ """Get the name of this host, as Unicode host and domain parts."""
10 bus, server = get_dbus_bus_server()
11 return server.GetHostName(), server.GetDomainName()
12
13
14=== modified file 'desktopcouch/pair/couchdb_pairing/network_io.py'
15--- desktopcouch/pair/couchdb_pairing/network_io.py 2009-07-08 17:48:11 +0000
16+++ desktopcouch/pair/couchdb_pairing/network_io.py 2009-07-10 22:13:01 +0000
17@@ -15,11 +15,14 @@
18 # along with desktopcouch. If not, see <http://www.gnu.org/licenses/>.
19 """All inter-tool communication."""
20
21-import asyncore
22-import socket
23 import logging
24 import hashlib
25
26+from twisted.internet import reactor
27+from twisted.internet.protocol import ServerFactory, ReconnectingClientFactory
28+from twisted.protocols import basic
29+from twisted.internet import ssl
30+
31 import dbus
32
33 try:
34@@ -29,132 +32,81 @@
35 get_remote_hostname = lambda addr: None
36
37
38-# asyncore is ancient. pylint: disable-msg=W0233
39-# asyncore overridden. pylint: disable-msg=C0111
40-
41-message_terminator = "\n"
42 hash = hashlib.sha512
43
44-def receive_up_to_char(s, buffer_list, sentinel=message_terminator):
45- message = s.recv(4098)
46- if len(message) == 0:
47- return None
48-
49- buffer_list.append(message)
50- total_so_far = "".join(buffer_list)
51- if sentinel not in total_so_far:
52- return None
53-
54- message, extra = total_so_far.split(sentinel, 1)
55- buffer_list[:] = [ extra ]
56-
57- return message
58-
59-
60-
61-class ListenForInvitations(asyncore.dispatcher):
62+
63+class ListenForInvitations():
64 """Narrative "Alice".
65
66 This is the first half of a TCP listening socket. We spawn off
67 processors when we accept invitation-connections."""
68
69- def __init__(self, get_secret_from_user, on_close, local_map=False):
70- self.logging = logging.getLogger("ListenForInvitations")
71-
72- if local_map:
73- if not hasattr(local_map, "get"):
74- local_map = dict()
75- self.__map = local_map
76- asyncore.dispatcher.__init__(self, map=self.__map)
77- else:
78- self.__map = None
79- asyncore.dispatcher.__init__(self)
80-
81- self.get_secret_from_user = get_secret_from_user
82- self.on_close = on_close
83-
84- self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
85- self.bind(("", 0))
86- self.listen(5)
87-
88- def process(self):
89- """How we jump into asyncore processing. Call this occasionally."""
90- asyncore.loop(0.01, False, self.__map, 3)
91- return True
92+ def __init__(self, get_secret_from_user, on_close):
93+ """Initialize."""
94+ self.logging = logging.getLogger(self.__class__.__name__)
95+
96+ self.factory = ProcessAnInvitationFactory(get_secret_from_user,
97+ on_close)
98+ self.listening_port = reactor.listenTCP(0, self.factory)
99
100 def get_local_port(self):
101 """We created a socket, and the caller needs to know what our port
102 number is, so it can advertise it."""
103
104- bogus_address, port = self.getsockname()
105+ port = self.listening_port.getHost().port
106 self.logging.info("local port to receive invitations is %s", port)
107 return port
108
109- def handle_accept(self):
110- s, remote_addr = self.accept()
111- self.logging.debug("accepted connection from %s", s)
112- ProcessAnInvitation(self.get_secret_from_user, s, self.on_close,
113- local_map=self.__map)
114-
115-
116-class ProcessAnInvitation(asyncore.dispatcher_with_send):
117+ def close(self):
118+ """Called from the UI when a window is destroyed and we do not need
119+ this connection any more."""
120+ self.listening_port.stopListening()
121+
122+
123+class ProcessAnInvitationProtocol(basic.LineReceiver):
124 """Narrative "Alice".
125
126 Listen for messages, and when we receive one, call the display callback
127 function with the inviter details plus a key."""
128
129- def __init__(self, get_secret_from_user, sock, on_close, local_map=False):
130- self.logging = logging.getLogger("ProcessAnInvitation")
131-
132- if local_map:
133- assert hasattr(local_map, "get")
134- asyncore.dispatcher_with_send.__init__(self, sock, map=local_map)
135- else:
136- asyncore.dispatcher_with_send.__init__(self, sock)
137-
138+ def __init__(self):
139+ """Initialize."""
140+ self.logging = logging.getLogger(self.__class__.__name__)
141 self.expected_hash = None
142 self.public_seed = None
143
144- self.get_secret_from_user = get_secret_from_user
145- self.on_close = on_close
146- self.recv_buffers = []
147- self.logging.debug("initialized")
148-
149- def handle_connect(self):
150- self.logging.debug("connecting")
151-
152- def handle_close(self):
153- self.logging.debug("closing")
154- self.on_close()
155- self.close()
156-
157- def handle_read(self):
158- message = receive_up_to_char(self, self.recv_buffers)
159- if message is None:
160- return
161-
162+ def connectionMade(self):
163+ """Called when a connection is made. No obligation here."""
164+ basic.LineReceiver.connectionMade(self)
165+
166+ def connectionLost(self):
167+ """Called when a connection is lost."""
168+ self.logging.debug("connection lost")
169+ basic.LineReceiver.connectionLost(self)
170+
171+ def lineReceived(self, message):
172+ """Handler for receipt of a message from the Bob end."""
173 h = hash()
174 digest_nybble_count = h.digest_size * 2
175 self.expected_hash = message[0:digest_nybble_count]
176 self.public_seed = message[digest_nybble_count:]
177- self.get_secret_from_user(self.addr[0], self.check_secret_from_user,
178+ self.factory.get_secret_from_user(self.transport.getPeer().host,
179+ self.check_secret_from_user,
180 self.send_secret_to_remote)
181
182 def send_secret_to_remote(self, secret_message):
183 """A callback for the invitation protocol to start a new phase
184 involving the other end getting the hash-digest of the public
185 seed and a secret we receive as a parameter."""
186-
187 h = hash()
188 h.update(self.public_seed)
189 h.update(secret_message)
190- self.send(h.hexdigest() + message_terminator)
191+ self.sendLine(h.hexdigest())
192
193 def check_secret_from_user(self, secret_message):
194 """A callback for the invitation protocol to verify the secret
195 that the user gives, against the hash we received over the
196 network."""
197-
198 h = hash()
199 h.update(secret_message)
200 digest = h.hexdigest()
201@@ -163,89 +115,111 @@
202 h = hash()
203 h.update(self.public_seed)
204 h.update(secret_message)
205- self.send(h.hexdigest() + message_terminator)
206+ self.sendLine(h.hexdigest())
207
208 self.logging.debug("User knew secret!")
209+
210+ self.transport.loseConnection()
211 return True
212
213 self.logging.info("User secret %r is wrong.", secret_message)
214 return False
215
216-# def handle_error(self, *args):
217-# self.logging.error("%s", args)
218-
219-
220-class SendInvitation(asyncore.dispatcher_with_send):
221- """Narrative "Bob".
222- """
223-
224- def __init__(self, host, port, auth_complete_cb, secret_message,
225- public_seed, on_close, local_map=False):
226- self.logging = logging.getLogger("SendInvitation")
227-
228- if local_map:
229- if not hasattr(local_map, "get"):
230- local_map = dict()
231- self.__map = local_map
232- asyncore.dispatcher_with_send.__init__(self, map=self.__map)
233- else:
234- self.__map = None
235- asyncore.dispatcher_with_send.__init__(self)
236-
237- self.auth_complete_cb = auth_complete_cb
238+
239+class ProcessAnInvitationFactory(ServerFactory):
240+ """Hold configuration values for all the connections, and fire off a
241+ protocol to handle the data sent and received."""
242+
243+ protocol = ProcessAnInvitationProtocol
244+
245+ def __init__(self, get_secret_from_user, on_close):
246+ self.logging = logging.getLogger(self.__class__.__name__)
247+ self.get_secret_from_user = get_secret_from_user
248 self.on_close = on_close
249-
250- self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
251- self.connect((host, port))
252-
253- self.recv_buffers = []
254-
255- self.logging.debug("secret is %r", secret_message)
256-
257- h = hash()
258- h.update(secret_message)
259- self.send(h.hexdigest() + public_seed + message_terminator)
260-
261- h = hash()
262- h.update(public_seed)
263- h.update(secret_message)
264+
265+
266+class SendInvitationProtocol(basic.LineReceiver):
267+ """Narrative "Bob"."""
268+
269+ def __init__(self):
270+ """Initialize."""
271+ self.logging = logging.getLogger(self.__class__.__name__)
272+ self.logging.debug("initialized")
273+
274+ def connectionMade(self):
275+ """Fire when a connection is made to the listener. No obligation
276+ here."""
277+ self.logging.debug("connection made")
278+
279+ h = hash()
280+ h.update(self.factory.secret_message)
281+ self.sendLine(h.hexdigest() + self.factory.public_seed)
282+
283+ h = hash()
284+ h.update(self.factory.public_seed)
285+ h.update(self.factory.secret_message)
286 self.expected_hash_of_secret = h.hexdigest()
287
288- def process(self):
289- """How we jump into asyncore processing. Call this occasionally."""
290- asyncore.loop(0.01, False, self.__map, 3)
291- return True
292-
293- def handle_connect(self):
294- self.logging.debug("connecting")
295-
296- def handle_read(self):
297- message = receive_up_to_char(self, self.recv_buffers)
298- if message is None:
299- return
300- self.logging.debug("received %r", message)
301+
302+ def lineReceived(self, message):
303+ """Handler for receipt of a message from the Alice end."""
304 if message == self.expected_hash_of_secret:
305- remote_host = self.socket.getpeername()[0]
306+ remote_host = self.transport.getPeer().host
307 try:
308 remote_hostname = get_remote_hostname(remote_host)
309 except dbus.exceptions.DBusException:
310 remote_hostname = None
311- self.auth_complete_cb(remote_hostname, "(port)", "(info)")
312+ self.factory.auth_complete_cb(remote_hostname, "(port)", "(info)")
313+ self.transport.loseConnection()
314 else:
315 self.logging.warn("Expected %r from invitation.",
316 self.expected_hash_of_secret)
317
318- def handle_close(self):
319- self.logging.debug("closing")
320- self.on_close()
321- self.close()
322-
323-# def handle_error(self, *args):
324-# self.logging.error("%s", args)
325-
326-
327-
328-if __name__ == "__main__":
329- logging.basicConfig(level=logging.DEBUG, format=
330- "%(asctime)s [%(process)d] %(levelname)s - %(name)s %(message)s")
331-
332+ def connectionLost(self):
333+ """When a connected socked is broken, this is fired."""
334+ self.logging.info("connection lost.")
335+ basic.LineReceiver.connectionLost(self)
336+
337+
338+class SendInvitationFactory(ReconnectingClientFactory):
339+ """Hold configuration values for all the connections, and fire off a
340+ protocol to handle the data sent and received."""
341+
342+ protocol = SendInvitationProtocol
343+
344+ def __init__(self, auth_complete_cb, secret_message, public_seed,
345+ on_close):
346+ self.logging = logging.getLogger(self.__class__.__name__)
347+ self.auth_complete_cb = auth_complete_cb
348+ self.secret_message = secret_message
349+ self.public_seed = public_seed
350+ self.on_close = on_close
351+ self.logging.debug("initialized")
352+
353+ def close(self):
354+ """Called from the UI when a window is destroyed and we do not need
355+ this connection any more."""
356+ self.logging.warn("close not handled properly") # FIXME
357+
358+ def clientConnectionFailed(self, connector, reason):
359+ """When we fail to connect to the listener, this is fired."""
360+ self.logging.warn("connect failed. %s", reason)
361+ ReconnectingClientFactory.clientConnectionFailed(self, connector,
362+ reason)
363+
364+ def clientConnectionLost(self, connector, reason):
365+ """When a connected socked is broken, this is fired."""
366+ self.logging.info("connection lost. %s", reason)
367+ ReconnectingClientFactory.clientConnectionLost(self, connector, reason)
368+
369+
370+def start_send_invitation(host, port, auth_complete_cb, secret_message,
371+ public_seed, on_close):
372+ """Instantiate the factory to hold configuration data about sending an
373+ invitation and let the reactor add it to its event-handling loop by way of
374+ starting a TCP connection."""
375+ factory = SendInvitationFactory(auth_complete_cb, secret_message,
376+ public_seed, on_close)
377+ reactor.connectTCP(host, port, factory)
378+
379+ return factory
380
381=== modified file 'desktopcouch/pair/pair.py'
382--- desktopcouch/pair/pair.py 2009-07-08 17:48:11 +0000
383+++ desktopcouch/pair/pair.py 2009-07-10 22:13:01 +0000
384@@ -52,6 +52,11 @@
385 import random
386 import cgi
387
388+from twisted.internet import gtk2reactor
389+gtk2reactor.install()
390+from twisted.internet.reactor import run as run_program
391+from twisted.internet.reactor import stop as stop_program
392+
393 import pygtk
394 pygtk.require('2.0')
395 import gtk
396@@ -64,7 +69,6 @@
397 discovery_tool_version = "1"
398
399
400-
401 def generate_secret(length=7):
402 """Create a secret that is easy to write and read. We hate ambiguity and
403 errors."""
404@@ -124,7 +128,6 @@
405
406 def destroy(self, widget, data=None):
407 """The window is destroyed."""
408- gobject.source_remove(self.inviter_loop)
409 self.inviter.close()
410
411 def auth_completed(self, remote_host, remote_port, remote_info):
412@@ -154,10 +157,9 @@
413 self.secret_message = secret_message
414 self.public_seed = generate_secret()
415
416- self.inviter = network_io.SendInvitation(hostname, port,
417+ self.inviter = network_io.start_send_invitation(hostname, port,
418 self.auth_completed, self.secret_message, self.public_seed,
419- self.on_close, local_map=False)
420- self.inviter_loop = gobject.idle_add(self.inviter.process)
421+ self.on_close)
422
423 top_vbox = gtk.VBox()
424 self.window.add(top_vbox)
425@@ -173,6 +175,12 @@
426 top_vbox.pack_start(text, False, False, 10)
427 text.show()
428
429+ cancel_button = gtk.Button(stock=gtk.STOCK_CANCEL)
430+ cancel_button.set_border_width(3)
431+ cancel_button.connect("clicked", lambda widget: self.window.destroy())
432+ top_vbox.pack_end(cancel_button, False, False, 10)
433+ cancel_button.show()
434+
435 self.window.show_all()
436
437
438@@ -293,8 +301,6 @@
439 if self.advertisement is not None:
440 self.advertisement.die()
441 self.listener.close()
442- if self.listener_loop is not None:
443- gobject.source_remove(self.listener_loop)
444
445 def receive_invitation_challenge(self, remote_address, is_secret_valid,
446 send_secret):
447@@ -317,9 +323,7 @@
448
449 self.listener = network_io.ListenForInvitations(
450 self.receive_invitation_challenge,
451- lambda: self.window.destroy(),
452- local_map=False)
453- self.listener_loop = gobject.idle_add(self.listener.process)
454+ lambda: self.window.destroy())
455
456 listen_port = self.listener.get_local_port()
457
458@@ -433,7 +437,7 @@
459
460 def destroy(self, widget, data=None):
461 """The window was destroyed."""
462- gtk.main_quit()
463+ stop_program()
464
465 def create_pick_pane(self, container):
466 """Set up the pane that contains what's necessary to choose an
467@@ -633,6 +637,7 @@
468 def main(args):
469 """Start execution."""
470 global pick_or_listen # pylint: disable-msg=W0601
471+
472 logging.basicConfig(level=logging.DEBUG, format=
473 "%(asctime)s [%(process)d] %(name)s:%(levelname)s: %(message)s")
474
475@@ -641,7 +646,7 @@
476 try:
477 logging.debug("starting couchdb pairing tool")
478 pick_or_listen = PickOrListen()
479- return gtk.main()
480+ return run_program()
481 finally:
482 logging.debug("exiting couchdb pairing tool")
483
484
485=== modified file 'desktopcouch/pair/tests/test_network_io.py'
486--- desktopcouch/pair/tests/test_network_io.py 2009-07-08 17:48:11 +0000
487+++ desktopcouch/pair/tests/test_network_io.py 2009-07-10 22:13:01 +0000
488@@ -14,15 +14,30 @@
489 # You should have received a copy of the GNU Lesser General Public License
490 # along with desktopcouch. If not, see <http://www.gnu.org/licenses/>.
491
492+
493+import pygtk
494+pygtk.require('2.0')
495+import gtk
496+
497+from twisted.internet import gtk2reactor
498+gtk2reactor.install()
499+from twisted.internet import reactor, task
500+
501+
502 if __name__ == "__main__":
503 import sys, os
504 sys.path.append(os.pardir)
505- from couchdb_pairing.network_io import SendInvitation, ListenForInvitations
506+ from couchdb_pairing.network_io import start_send_invitation, ListenForInvitations
507+ from couchdb_pairing.dbus_io import get_local_hostname
508 else:
509- from ..couchdb_pairing.network_io import SendInvitation, ListenForInvitations
510+ from ..couchdb_pairing.network_io import start_send_invitation, ListenForInvitations
511+ from ..couchdb_pairing.dbus_io import get_local_hostname
512
513 import unittest
514
515+local_hostname = ".".join(get_local_hostname())
516+
517+
518 class TestNetworkIO(unittest.TestCase):
519
520 def setUp(self):
521@@ -47,11 +62,9 @@
522 listener_complete_auth()
523
524 def listener_close_socket():
525- print "LISTNER!\n\n"
526 self._listener_socket_state = "closed"
527
528 def inviter_close_socket():
529- print "INVITER!\n\n"
530 self._inviter_socket_state = "closed"
531
532 def inviter_complete_auth(a, b, c):
533@@ -67,16 +80,21 @@
534
535 def inviter_display_message(*args):
536 """Show message to user."""
537- print "display message from inviter:", args
538+ logging.info("display message from inviter: %s", args)
539
540- self.inviter = SendInvitation("localhost", listener_port,
541+ self.inviter = start_send_invitation(local_hostname, listener_port,
542 inviter_complete_auth, secret, "seed", inviter_close_socket)
543
544- for i in xrange(50):
545+ def exit_on_success():
546 if self._listener_auth_completed and self._inviter_auth_completed:
547- break
548- self.listener.process()
549- self.inviter.process()
550+ reactor.stop()
551+ task.LoopingCall(exit_on_success).start(1.0)
552+
553+ def exit_on_timeout():
554+ reactor.stop()
555+ reactor.callLater(30, exit_on_timeout)
556+
557+ reactor.run()
558
559 self.assertTrue(self._listener_auth_completed)
560 self.assertTrue(self._inviter_auth_completed)
561@@ -117,14 +135,12 @@
562
563 def inviter_display_message(*args):
564 """Show message to user."""
565- print "display message from inviter:", args
566+ logging.info("display message from inviter: %s", args)
567
568- self.inviter = SendInvitation("localhost", listener_port,
569+ self.inviter = start_send_invitation(local_hostname, listener_port,
570 inviter_complete_auth, secret, "seed", inviter_close_socket)
571
572- for i in xrange(50):
573- self.listener.process()
574- self.inviter.process()
575+ # FIXME
576
577 self.assertFalse(self._listener_auth_completed)
578 self.assertFalse(self._inviter_auth_completed)
579@@ -147,11 +163,9 @@
580
581
582 def listener_close_socket():
583- print "closed listener"
584 self._listener_socket_state = "closed"
585
586 def inviter_close_socket():
587- print "closed inviter"
588 self._inviter_socket_state = "closed"
589
590 def inviter_complete_auth(a, b, c):
591@@ -167,16 +181,12 @@
592
593 def inviter_display_message(*args):
594 """Show message to user."""
595- print "display message from inviter:", args
596+ logging.info("display message from inviter: %s", args)
597
598- self.inviter = SendInvitation("localhost", listener_port,
599+ self.inviter = start_send_invitation(local_hostname, listener_port,
600 inviter_complete_auth, secret, "seed", inviter_close_socket)
601
602- for i in xrange(500):
603- if self._inviter_socket_state == "closed" and self._listener_socket_state == "closed":
604- break
605- self.listener.process()
606- self.inviter.process()
607+ # FIXME
608
609 self.assertFalse(self._listener_auth_completed)
610 self.assertFalse(self._inviter_auth_completed)
611@@ -185,8 +195,6 @@
612 self.assertEqual(self._inviter_socket_state, "closed")
613
614
615-
616-
617 if __name__ == "__main__":
618 import logging
619 logging.basicConfig(level=logging.ERROR)

Subscribers

People subscribed via source and target branches