Merge lp:~james-w/aptdaemon/fix-455861 into lp:aptdaemon

Proposed by James Westby
Status: Merged
Merge reported by: Michael Vogt
Merged at revision: not available
Proposed branch: lp:~james-w/aptdaemon/fix-455861
Merge into: lp:aptdaemon
Diff against target: 102 lines (+44/-30)
1 file modified
aptdaemon/debconf.py (+44/-30)
To merge this branch: bzr merge lp:~james-w/aptdaemon/fix-455861
Reviewer Review Type Date Requested Status
Aptdaemon Developers Pending
Review via email: mp+21344@code.launchpad.net

Description of the change

This implements locking as the io events are delivered asynchronously.

It ensures that if you call .stop() it won't close the socket just before
something else uses it.

I think that exiting quietly if the socket is already closed is the
correct behaviour.

A little testing here seems to show that this at least stops the crashes,
but it's a race condition, so it's hard to know for sure.

Thanks,

James

To post a comment you must log in.
Revision history for this message
Sebastian Heinlein (glatzor) wrote :

Actually you don't need to introduce a lock here, since the event sources run in the same main loop.

I think that the root of the problem is iterating on the main loop after closing the socket, which will result in processing already queued io events which will fail.

Furthermore I increased the priority of the hangup callback, so that it is preferred to copy_stdout.

See bzr diff -r 340..343

=== modified file 'aptdaemon/debconf.py'
--- aptdaemon/debconf.py 2009-10-02 11:27:03 +0000
+++ aptdaemon/debconf.py 2010-03-17 05:59:24 +0000
@@ -89,21 +89,12 @@
         """Stop listening on the socket."""
         logging.debug("debconf.stop()")
         self.socket.close()
- # ensure outstanding gio messages are processed
- context = glib.main_context_default()
- while context.pending():
- context.iteration()
         gobject.source_remove(self._listener_id)
         self._listener_id = None

     def _accept_connection(self, source, condition):
         """Callback for new connections of the passthrough frontend."""
         log.debug("New passthrough connection")
- # ensure outstanding gio messages are processed (to ensure
- # that _hangup was run on the previous connection, see LP: #432607)
- context = glib.main_context_default()
- while context.pending():
- context.iteration()
         if self._active_conn:
             raise IOError, "Multiple debconf connections not supported"
         conn, addr = source.accept()
@@ -112,15 +103,18 @@
                                        stdin=subprocess.PIPE,
                                        stdout=subprocess.PIPE,
                                        env=self._get_debconf_env())
- w = gobject.io_add_watch(conn, gobject.IO_IN|gobject.IO_HUP|gobject.IO_ERR,
- self._copy_conn, self.helper.stdin)
- self._watch_ids.append(w)
- w= gobject.io_add_watch(self.helper.stdout, gobject.IO_IN,
- self._copy_stdout, conn)
- self._watch_ids.append(w)
- w = gobject.io_add_watch(self.helper.stdout, gobject.IO_HUP|gobject.IO_ERR,
- self._hangup)
- self._watch_ids.append(w)
+ wid = gobject.io_add_watch(conn,
+ gobject.IO_IN|gobject.IO_HUP|gobject.IO_ERR,
+ self._copy_conn, self.helper.stdin)
+ self._watch_ids.append(wid)
+ wid = gobject.io_add_watch(self.helper.stdout, gobject.IO_IN,
+ self._copy_stdout, conn)
+ self._watch_ids.append(wid)
+ wid = gobject.io_add_watch(self.helper.stdout,
+ gobject.IO_HUP|gobject.IO_ERR,
+ self._hangup,
+ priority=gobject.PRIORITY_HIGH)
+ self._watch_ids.append(wid)
         return True

Revision history for this message
Michael Vogt (mvo) wrote :

I think we still need some synchronization here because stop() maybe called by the aptdaemon when there are still unprocessed io events. We also need to ensure that all pending io events are processed because otherwise we may kill debconf-communicate (in _hangup) when it has not processed all messages yet, this could let to "forgetting" some of the debconf communication because it never quite made it to the socket.

Revision history for this message
Sebastian Heinlein (glatzor) wrote :

You could reduce the patch to the self._socket_closed setting/checking.

Revision history for this message
Michael Vogt (mvo) wrote :

This was fixed in trunk/ (and 0.2.x) in a slightly different way. Many thnaks for the initial patch.

Revision history for this message
Michael Vogt (mvo) wrote :

Set to merge (even if not the exact patch got merged)

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'aptdaemon/debconf.py'
--- aptdaemon/debconf.py 2009-10-02 11:27:03 +0000
+++ aptdaemon/debconf.py 2010-03-15 02:18:14 +0000
@@ -33,6 +33,7 @@
33import socket33import socket
34import subprocess34import subprocess
35import tempfile35import tempfile
36import threading
3637
37import gobject38import gobject
3839
@@ -65,6 +66,8 @@
65 self._listener_id = None66 self._listener_id = None
66 self._active_conn = None67 self._active_conn = None
67 self._watch_ids = []68 self._watch_ids = []
69 self._socket_lock = threading.Lock()
70 self._socket_closed = False
6871
69 def _get_debconf_env(self):72 def _get_debconf_env(self):
70 """Returns a dictonary of the environment variables required by73 """Returns a dictonary of the environment variables required by
@@ -88,40 +91,51 @@
88 def stop(self):91 def stop(self):
89 """Stop listening on the socket."""92 """Stop listening on the socket."""
90 logging.debug("debconf.stop()")93 logging.debug("debconf.stop()")
91 self.socket.close()94 self._socket_lock.acquire()
92 # ensure outstanding gio messages are processed95 try:
93 context = glib.main_context_default()96 self.socket.close()
94 while context.pending():97 self._socket_closed = True
95 context.iteration()98 # ensure outstanding gio messages are processed
96 gobject.source_remove(self._listener_id)99 context = glib.main_context_default()
97 self._listener_id = None100 while context.pending():
101 context.iteration()
102 gobject.source_remove(self._listener_id)
103 self._listener_id = None
104 finally:
105 self._socket_lock.release()
98106
99 def _accept_connection(self, source, condition):107 def _accept_connection(self, source, condition):
100 """Callback for new connections of the passthrough frontend."""108 """Callback for new connections of the passthrough frontend."""
101 log.debug("New passthrough connection")109 log.debug("New passthrough connection")
102 # ensure outstanding gio messages are processed (to ensure110 self._socket_lock.acquire()
103 # that _hangup was run on the previous connection, see LP: #432607)111 try:
104 context = glib.main_context_default()112 # ensure outstanding gio messages are processed (to ensure
105 while context.pending():113 # that _hangup was run on the previous connection, see LP: #432607)
106 context.iteration()114 context = glib.main_context_default()
107 if self._active_conn:115 while context.pending():
108 raise IOError, "Multiple debconf connections not supported"116 context.iteration()
109 conn, addr = source.accept()117 if self._socket_closed:
110 self._active_conn = conn118 return True
111 self.helper = subprocess.Popen(["debconf-communicate"],119 if self._active_conn:
112 stdin=subprocess.PIPE,120 raise IOError, "Multiple debconf connections not supported"
113 stdout=subprocess.PIPE,121 conn, addr = source.accept()
114 env=self._get_debconf_env())122 self._active_conn = conn
115 w = gobject.io_add_watch(conn, gobject.IO_IN|gobject.IO_HUP|gobject.IO_ERR,123 self.helper = subprocess.Popen(["debconf-communicate"],
116 self._copy_conn, self.helper.stdin)124 stdin=subprocess.PIPE,
117 self._watch_ids.append(w)125 stdout=subprocess.PIPE,
118 w= gobject.io_add_watch(self.helper.stdout, gobject.IO_IN,126 env=self._get_debconf_env())
119 self._copy_stdout, conn)127 w = gobject.io_add_watch(conn, gobject.IO_IN|gobject.IO_HUP|gobject.IO_ERR,
120 self._watch_ids.append(w)128 self._copy_conn, self.helper.stdin)
121 w = gobject.io_add_watch(self.helper.stdout, gobject.IO_HUP|gobject.IO_ERR,129 self._watch_ids.append(w)
122 self._hangup)130 w= gobject.io_add_watch(self.helper.stdout, gobject.IO_IN,
123 self._watch_ids.append(w)131 self._copy_stdout, conn)
124 return True132 self._watch_ids.append(w)
133 w = gobject.io_add_watch(self.helper.stdout, gobject.IO_HUP|gobject.IO_ERR,
134 self._hangup)
135 self._watch_ids.append(w)
136 return True
137 finally:
138 self._socket_lock.release()
125139
126 def _hangup(self, source, condition):140 def _hangup(self, source, condition):
127 """Callback when the debconf-communicate program exists 141 """Callback when the debconf-communicate program exists

Subscribers

People subscribed via source and target branches

to status/vote changes: