Merge lp:~james-w/aptdaemon/fix-455861 into lp:aptdaemon

Proposed by James Westby
Status: Merged
Merge reported by: Michael Vogt
Merged at revision: not available
Proposed branch: lp:~james-w/aptdaemon/fix-455861
Merge into: lp:aptdaemon
Diff against target: 102 lines (+44/-30)
1 file modified
aptdaemon/debconf.py (+44/-30)
To merge this branch: bzr merge lp:~james-w/aptdaemon/fix-455861
Reviewer: Aptdaemon Developers
Review status: Pending
Review via email: mp+21344@code.launchpad.net

Description of the change

This implements locking as the io events are delivered asynchronously.

It ensures that if you call .stop() it won't close the socket just before
something else uses it.

I think that exiting quietly if the socket is already closed is the
correct behaviour.

A little testing here seems to show that this at least stops the crashes,
but it's a race condition, so it's hard to know for sure.

Thanks,

James
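
The guard described above can be sketched in isolation as follows. This is a minimal illustration, not the aptdaemon code: the class name `GuardedListener` and the method `handle_event` are hypothetical, and only `_socket_lock` and `_socket_closed` are names taken from the proposed patch.

```python
import socket
import threading


class GuardedListener(object):
    """Hypothetical stand-in for the debconf proxy; not aptdaemon code."""

    def __init__(self):
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._socket_lock = threading.Lock()
        self._socket_closed = False

    def stop(self):
        # Close the socket and set the flag under the lock, so no handler
        # can observe the socket closed while the flag is still False.
        with self._socket_lock:
            self.socket.close()
            self._socket_closed = True

    def handle_event(self):
        # Exit quietly if stop() has already run; otherwise use the socket.
        with self._socket_lock:
            if self._socket_closed:
                return False
            return self.socket.fileno() >= 0
```

Note that `threading.Lock` is not reentrant, so this sketch deliberately does not dispatch other handlers while the lock is held.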

Sebastian Heinlein (glatzor) wrote :

Actually you don't need to introduce a lock here, since the event sources run in the same main loop.

I think the root of the problem is iterating the main loop after closing the socket: that processes IO events which were already queued, and those then fail.

Furthermore, I increased the priority of the hangup callback so that it takes precedence over _copy_stdout.

See bzr diff -r 340..343

=== modified file 'aptdaemon/debconf.py'
--- aptdaemon/debconf.py 2009-10-02 11:27:03 +0000
+++ aptdaemon/debconf.py 2010-03-17 05:59:24 +0000
@@ -89,21 +89,12 @@
         """Stop listening on the socket."""
         logging.debug("debconf.stop()")
         self.socket.close()
-        # ensure outstanding gio messages are processed
-        context = glib.main_context_default()
-        while context.pending():
-            context.iteration()
         gobject.source_remove(self._listener_id)
         self._listener_id = None

     def _accept_connection(self, source, condition):
         """Callback for new connections of the passthrough frontend."""
         log.debug("New passthrough connection")
-        # ensure outstanding gio messages are processed (to ensure
-        # that _hangup was run on the previous connection, see LP: #432607)
-        context = glib.main_context_default()
-        while context.pending():
-            context.iteration()
         if self._active_conn:
             raise IOError, "Multiple debconf connections not supported"
         conn, addr = source.accept()
@@ -112,15 +103,18 @@
                                        stdin=subprocess.PIPE,
                                        stdout=subprocess.PIPE,
                                        env=self._get_debconf_env())
-        w = gobject.io_add_watch(conn, gobject.IO_IN|gobject.IO_HUP|gobject.IO_ERR,
-                                 self._copy_conn, self.helper.stdin)
-        self._watch_ids.append(w)
-        w= gobject.io_add_watch(self.helper.stdout, gobject.IO_IN,
-                                self._copy_stdout, conn)
-        self._watch_ids.append(w)
-        w = gobject.io_add_watch(self.helper.stdout, gobject.IO_HUP|gobject.IO_ERR,
-                                 self._hangup)
-        self._watch_ids.append(w)
+        wid = gobject.io_add_watch(conn,
+                                   gobject.IO_IN|gobject.IO_HUP|gobject.IO_ERR,
+                                   self._copy_conn, self.helper.stdin)
+        self._watch_ids.append(wid)
+        wid = gobject.io_add_watch(self.helper.stdout, gobject.IO_IN,
+                                   self._copy_stdout, conn)
+        self._watch_ids.append(wid)
+        wid = gobject.io_add_watch(self.helper.stdout,
+                                   gobject.IO_HUP|gobject.IO_ERR,
+                                   self._hangup,
+                                   priority=gobject.PRIORITY_HIGH)
+        self._watch_ids.append(wid)
         return True
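
The effect of raising the hangup watch's priority can be illustrated with a toy single-threaded dispatcher. This is only a rough model and not GLib: `ToyLoop` and `add_ready` are hypothetical names, and the two constants simply mirror GLib's convention that a numerically lower value means a higher priority. Because one loop runs one callback at a time, the callbacks never overlap, and the ready callback with the higher priority is dispatched first.

```python
import heapq

# Values mirror GLib's convention: a lower number means a higher priority.
PRIORITY_HIGH = -100
PRIORITY_DEFAULT = 0


class ToyLoop(object):
    """Toy single-threaded dispatcher; an illustration, not GLib itself."""

    def __init__(self):
        self._ready = []
        self._counter = 0  # tie-breaker that keeps insertion order stable

    def add_ready(self, callback, priority=PRIORITY_DEFAULT):
        heapq.heappush(self._ready, (priority, self._counter, callback))
        self._counter += 1

    def run(self):
        # Pop callbacks one at a time, highest priority first.
        order = []
        while self._ready:
            _, _, callback = heapq.heappop(self._ready)
            order.append(callback())
        return order


def copy_stdout():
    return "copy_stdout"


def hangup():
    return "hangup"


loop = ToyLoop()
loop.add_ready(copy_stdout)                     # queued first
loop.add_ready(hangup, priority=PRIORITY_HIGH)  # queued second, runs first
order = loop.run()
```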

Michael Vogt (mvo) wrote :

I think we still need some synchronization here, because stop() may be called by the aptdaemon while there are still unprocessed IO events. We also need to ensure that all pending IO events are processed; otherwise we may kill debconf-communicate (in _hangup) before it has processed all messages. That could lead to "forgetting" some of the debconf communication because it never quite made it to the socket.
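
The concern about losing output can be shown with a small standalone sketch: read everything the helper wrote before the process is reaped. The child below is a stand-in started via `sys.executable`, since debconf-communicate is not assumed to be installed, and the reply strings are made up for illustration.

```python
import subprocess
import sys

# Stand-in child that writes two replies and exits; the helper in the
# discussion is debconf-communicate, which this sketch does not require.
CHILD = "import sys; sys.stdout.write('0 ok\\n10 unknown\\n')"

helper = subprocess.Popen([sys.executable, "-c", CHILD],
                          stdout=subprocess.PIPE)
# communicate() reads stdout until EOF and then waits for the child, so
# nothing the helper wrote is dropped before the process goes away.
output, _ = helper.communicate()
replies = output.decode("ascii").splitlines()
```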

Sebastian Heinlein (glatzor) wrote :

You could reduce the patch to the self._socket_closed setting/checking.
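
A flag-only variant along these lines might look like the sketch below. It assumes, as stated earlier in the thread, that all callbacks run in the same main loop. `FlagOnlyProxy` and the `accepted` counter are hypothetical; `_socket_closed` and `_accept_connection` are the names used in the patch.

```python
class FlagOnlyProxy(object):
    """Hypothetical reduction of the patch to the closed flag alone."""

    def __init__(self):
        self._socket_closed = False
        self.accepted = 0

    def stop(self):
        self._socket_closed = True

    def _accept_connection(self):
        # An event queued before stop() but dispatched after it is ignored.
        if self._socket_closed:
            return True
        self.accepted += 1
        return True


proxy = FlagOnlyProxy()
queued = [proxy._accept_connection, proxy._accept_connection]
queued[0]()   # dispatched while the socket is still open
proxy.stop()
queued[1]()   # was queued before stop(), dispatched after it
```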

Michael Vogt (mvo) wrote :

This was fixed in trunk/ (and 0.2.x) in a slightly different way. Many thanks for the initial patch.

Michael Vogt (mvo) wrote :

Set to merged (even though the exact patch is not what got merged).

Preview Diff

=== modified file 'aptdaemon/debconf.py'
--- aptdaemon/debconf.py 2009-10-02 11:27:03 +0000
+++ aptdaemon/debconf.py 2010-03-15 02:18:14 +0000
@@ -33,6 +33,7 @@
 import socket
 import subprocess
 import tempfile
+import threading
 
 import gobject
 
@@ -65,6 +66,8 @@
         self._listener_id = None
         self._active_conn = None
         self._watch_ids = []
+        self._socket_lock = threading.Lock()
+        self._socket_closed = False
 
     def _get_debconf_env(self):
         """Returns a dictonary of the environment variables required by
@@ -88,40 +91,51 @@
     def stop(self):
         """Stop listening on the socket."""
         logging.debug("debconf.stop()")
-        self.socket.close()
-        # ensure outstanding gio messages are processed
-        context = glib.main_context_default()
-        while context.pending():
-            context.iteration()
-        gobject.source_remove(self._listener_id)
-        self._listener_id = None
+        self._socket_lock.acquire()
+        try:
+            self.socket.close()
+            self._socket_closed = True
+            # ensure outstanding gio messages are processed
+            context = glib.main_context_default()
+            while context.pending():
+                context.iteration()
+            gobject.source_remove(self._listener_id)
+            self._listener_id = None
+        finally:
+            self._socket_lock.release()
 
     def _accept_connection(self, source, condition):
         """Callback for new connections of the passthrough frontend."""
         log.debug("New passthrough connection")
-        # ensure outstanding gio messages are processed (to ensure
-        # that _hangup was run on the previous connection, see LP: #432607)
-        context = glib.main_context_default()
-        while context.pending():
-            context.iteration()
-        if self._active_conn:
-            raise IOError, "Multiple debconf connections not supported"
-        conn, addr = source.accept()
-        self._active_conn = conn
-        self.helper = subprocess.Popen(["debconf-communicate"],
-                                       stdin=subprocess.PIPE,
-                                       stdout=subprocess.PIPE,
-                                       env=self._get_debconf_env())
-        w = gobject.io_add_watch(conn, gobject.IO_IN|gobject.IO_HUP|gobject.IO_ERR,
-                                 self._copy_conn, self.helper.stdin)
-        self._watch_ids.append(w)
-        w= gobject.io_add_watch(self.helper.stdout, gobject.IO_IN,
-                                self._copy_stdout, conn)
-        self._watch_ids.append(w)
-        w = gobject.io_add_watch(self.helper.stdout, gobject.IO_HUP|gobject.IO_ERR,
-                                 self._hangup)
-        self._watch_ids.append(w)
-        return True
+        self._socket_lock.acquire()
+        try:
+            # ensure outstanding gio messages are processed (to ensure
+            # that _hangup was run on the previous connection, see LP: #432607)
+            context = glib.main_context_default()
+            while context.pending():
+                context.iteration()
+            if self._socket_closed:
+                return True
+            if self._active_conn:
+                raise IOError, "Multiple debconf connections not supported"
+            conn, addr = source.accept()
+            self._active_conn = conn
+            self.helper = subprocess.Popen(["debconf-communicate"],
+                                           stdin=subprocess.PIPE,
+                                           stdout=subprocess.PIPE,
+                                           env=self._get_debconf_env())
+            w = gobject.io_add_watch(conn, gobject.IO_IN|gobject.IO_HUP|gobject.IO_ERR,
+                                     self._copy_conn, self.helper.stdin)
+            self._watch_ids.append(w)
+            w= gobject.io_add_watch(self.helper.stdout, gobject.IO_IN,
+                                    self._copy_stdout, conn)
+            self._watch_ids.append(w)
+            w = gobject.io_add_watch(self.helper.stdout, gobject.IO_HUP|gobject.IO_ERR,
+                                     self._hangup)
+            self._watch_ids.append(w)
+            return True
+        finally:
+            self._socket_lock.release()
 
     def _hangup(self, source, condition):
         """Callback when the debconf-communicate program exists
