Merge lp:~aptdaemon-developers/aptdaemon/threading into lp:aptdaemon

Proposed by Sebastian Heinlein on 2011-10-02
Status: Merged
Merged at revision: 701
Proposed branch: lp:~aptdaemon-developers/aptdaemon/threading
Merge into: lp:aptdaemon
Diff against target: 970 lines (+252/-111)
6 files modified
NEWS (+2/-0)
aptdaemon/core.py (+182/-43)
aptdaemon/loop.py (+0/-32)
aptdaemon/progress.py (+8/-14)
aptdaemon/utils.py (+30/-0)
aptdaemon/worker.py (+30/-22)
To merge this branch: bzr merge lp:~aptdaemon-developers/aptdaemon/threading
Reviewer | Review Type | Date Requested | Status
Aptdaemon Developers | (none) | 2011-10-02 | Pending
Review via email: mp+77802@code.launchpad.net

Description of the change

Moves the worker and the simulate actions to separate threads. This makes the daemon more responsive and makes it easy to queue simulate calls and the upcoming PK query methods.

To post a comment you must log in.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'NEWS'
2--- NEWS 2011-08-27 11:37:42 +0000
3+++ NEWS 2011-10-02 05:32:23 +0000
4@@ -2,6 +2,8 @@
5
6 * Enhancements:
7
8+ - Use threading internally instead for the worker
9+
10 - Support multiarch package names like apt-get, e.g. xterm:amd64
11
12 - Store the transaction role and sender in apt's history log
13
14=== modified file 'aptdaemon/core.py'
15--- aptdaemon/core.py 2011-09-15 14:23:50 +0000
16+++ aptdaemon/core.py 2011-10-02 05:32:23 +0000
17@@ -47,24 +47,27 @@
18 import signal
19 import sys
20 import time
21+import threading
22 import uuid
23
24+from defer import inline_callbacks, return_value
25+from defer.utils import dbus_deferred_method
26 import gobject
27+gobject.threads_init()
28 import dbus.exceptions
29 import dbus.service
30 import dbus.mainloop.glib
31-import dbus.glib
32+dbus.mainloop.glib.threads_init()
33 from softwareproperties.AptAuth import AptAuth
34 import apt_pkg
35
36 from config import ConfigWriter
37 import errors
38 import enums
39-from defer import inline_callbacks, return_value
40-from defer.utils import dbus_deferred_method
41 import policykit1
42 from worker import AptWorker, DummyWorker
43-from loop import mainloop
44+from utils import locked
45+
46
47 # Setup i18n
48 _ = lambda msg: gettext.dgettext("aptdaemon", msg)
49@@ -313,6 +316,7 @@
50 connect -- if the Transaction should connect to DBus (default is True)
51 bus -- the DBus connection which should be used (defaults to system bus)
52 """
53+ self.lock = threading.Lock()
54 tid = uuid.uuid4().get_hex()
55 self.tid = "/org/debian/apt/transaction/%s" % tid
56 if connect == True:
57@@ -330,18 +334,20 @@
58 packages = ([], [], [], [], [], [])
59 if not kwargs:
60 kwargs = {}
61+ self.sender = sender
62 self.queue = queue
63 self.uid = uid
64- self.locale = dbus.String("")
65- self.allow_unauthenticated = dbus.Boolean(False)
66- self.remove_obsoleted_depends = dbus.Boolean(False)
67+ self.kwargs = kwargs
68+ # Mutable properties which need to be protected by a lock
69+ self._locale = dbus.String("")
70+ self._allow_unauthenticated = dbus.Boolean(False)
71+ self._remove_obsoleted_depends = dbus.Boolean(False)
72 self.http_proxy = dbus.String("")
73- self.terminal = dbus.String("")
74- self.debconf = dbus.String("")
75- self.kwargs = kwargs
76+ self._terminal = dbus.String("")
77+ self._debconf = dbus.String("")
78 self._translation = None
79 # The transaction which should be executed after this one
80- self.after = None
81+ self._after = None
82 self._role = dbus.String(role)
83 self._progress = dbus.Int32(0)
84 # items_done, total_items, bytes_done, total_bytes, speed, time
85@@ -359,8 +365,9 @@
86 self._required_medium = dbus.Struct(("", ""), signature="ss")
87 self._config_file_conflict = dbus.Struct(("", ""), signature="ss")
88 self._config_file_conflict_resolution = ""
89- self.cancelled = dbus.Boolean(False)
90- self.paused = dbus.Boolean(False)
91+ self._cancelled = dbus.Boolean(False)
92+ self._paused = dbus.Boolean(False)
93+ self.feedback = threading.Event()
94 self._meta_data = dbus.Dictionary(signature="sv")
95 self._download = dbus.Int64(0)
96 self._space = dbus.Int64(0)
97@@ -376,15 +383,64 @@
98 self._idle_watch = gobject.timeout_add_seconds(
99 TRANSACTION_IDLE_TIMEOUT, self._remove_from_connection_no_raise)
100 # Handle a disconnect of the client application
101- self.sender_alive = True
102+ self._sender_alive = True
103 if bus:
104 self._sender_watch = bus.watch_name_owner(sender,
105 self._sender_owner_changed)
106 else:
107 self._sender_watch = None
108- self.sender = sender
109- self.output = ""
110- self.simulated = None
111+ self._output = ""
112+ self._simulated = None
113+
114+ @locked
115+ def _get_cancelled(self):
116+ return self._cancelled
117+
118+ @locked
119+ def _set_cancelled(self, txt):
120+ self._cancelled = txt
121+
122+ cancelled = property(_get_cancelled, _set_cancelled)
123+
124+ @locked
125+ def _get_paused(self):
126+ return self._paused
127+
128+ @locked
129+ def _set_paused(self, bool):
130+ self._paused = bool
131+
132+ paused = property(_get_paused, _set_paused)
133+
134+ @locked
135+ def _get_simulated(self):
136+ return self._simulated
137+
138+ @locked
139+ def _set_simulated(self, txt):
140+ self._simulated = txt
141+
142+ simulated = property(_get_simulated, _set_simulated)
143+
144+ @locked
145+ def _get_output(self):
146+ return self._output
147+
148+ @locked
149+ def _set_output(self, txt):
150+ self._output = txt
151+
152+ output = property(_get_output, _set_output)
153+
154+ @locked
155+ def _get_sender_alive(self):
156+ return self._sender_alive
157+
158+ @locked
159+ def _set_sender_alive(self, alive):
160+ self._sender_alive = alive
161+
162+ sender_alive = property(_get_sender_alive, _set_sender_alive)
163
164 def _sender_owner_changed(self, connection):
165 """Callback if the owner of the original sender changed, e.g.
166@@ -458,9 +514,11 @@
167 raise errors.InvalidMetaDataError("The value has to be a "
168 "string: %s" % value)
169 # Merge new data into existing one:
170- self._meta_data.update(data)
171- self.PropertyChanged("MetaData", self._meta_data)
172+ with self.lock:
173+ self._meta_data.update(data)
174+ self.PropertyChanged("MetaData", self._meta_data)
175
176+ @locked
177 def _get_meta_data(self):
178 return self._meta_data
179
180@@ -468,22 +526,26 @@
181 doc="Allows client applications to store meta data "
182 "for the transaction in a dictionary.")
183
184+ @locked
185 def _set_role(self, enum):
186 if self._role != enums.ROLE_UNSET:
187 raise errors.TransactionRoleAlreadySet()
188 self._role = dbus.String(enum)
189 self.PropertyChanged("Role", self._role)
190
191+ @locked
192 def _get_role(self):
193 return self._role
194
195 role = property(_get_role, _set_role, doc="Operation type of transaction.")
196
197+ @locked
198 def _set_progress_details(self, details):
199 # items_done, total_items, bytes_done, total_bytes, speed, time
200 self._progress_details = self._convert_struct(details, "iixxdx")
201 self.PropertyChanged("ProgressDetails", self._progress_details)
202
203+ @locked
204 def _get_progress_details(self):
205 return self._progress_details
206
207@@ -493,12 +555,14 @@
208 "bytes done, total bytes, speed and "
209 "remaining time")
210
211+ @locked
212 def _set_error(self, excep):
213 self._error = excep
214 msg = self.gettext(excep.details) % excep.details_args
215 self._error_property = self._convert_struct((excep.code, msg), "ss")
216 self.PropertyChanged("Error", self._error_property)
217
218+ @locked
219 def _get_error(self):
220 return self._error
221
222@@ -506,9 +570,10 @@
223
224 def _set_exit(self, enum):
225 self.status = enums.STATUS_FINISHED
226- self._exit = dbus.String(enum)
227- self.PropertyChanged("ExitState", self._exit)
228- self.Finished(self._exit)
229+ with self.lock:
230+ self._exit = dbus.String(enum)
231+ self.PropertyChanged("ExitState", self._exit)
232+ self.Finished(self._exit)
233 if self._sender_watch:
234 self._sender_watch.cancel()
235 # Remove the transaction from the Bus after it is complete. A short
236@@ -516,15 +581,18 @@
237 gobject.timeout_add_seconds(TRANSACTION_DEL_TIMEOUT,
238 self._remove_from_connection_no_raise)
239
240+ @locked
241 def _get_exit(self):
242 return self._exit
243
244 exit = property(_get_exit, _set_exit,
245 doc="The exit state of the transaction.")
246
247+ @locked
248 def _get_download(self):
249 return self._download
250
251+ @locked
252 def _set_download(self, size):
253 self._download = dbus.Int64(size)
254 self.PropertyChanged("Download", self._download)
255@@ -532,9 +600,11 @@
256 download = property(_get_download, _set_download,
257 doc="The download size of the transaction.")
258
259+ @locked
260 def _get_space(self):
261 return self._space
262
263+ @locked
264 def _set_space(self, size):
265 self._space = dbus.Int64(size)
266 self.PropertyChanged("Space", self._space)
267@@ -542,12 +612,14 @@
268 space = property(_get_space, _set_space,
269 doc="The required disk space of the transaction.")
270
271+ @locked
272 def _set_packages(self, packages):
273 self._packages = dbus.Struct([dbus.Array(pkgs, signature="s")
274 for pkgs in packages],
275 signature="as")
276 self.PropertyChanged("Packages", self._packages)
277
278+ @locked
279 def _get_packages(self):
280 return self._packages
281
282@@ -556,9 +628,11 @@
283 "reinstalled, removed, purged, upgraded or "
284 "downgraded.")
285
286+ @locked
287 def _get_unauthenticated(self):
288 return self._unauthenticated
289
290+ @locked
291 def _set_unauthenticated(self, unauthenticated):
292 self._unauthenticated = dbus.Array(unauthenticated, signature="s")
293 self.PropertyChanged("Unauthenticated", self._unauthenticated)
294@@ -567,9 +641,11 @@
295 doc="Unauthenticated packages in this "
296 "transaction")
297
298+ @locked
299 def _get_depends(self):
300 return self._depends
301
302+ @locked
303 def _set_depends(self, depends):
304 self._depends = dbus.Struct([dbus.Array(deps, signature="s")
305 for deps in depends],
306@@ -580,9 +656,11 @@
307 doc="The additional dependencies: installs, removals, "
308 "upgrades and downgrades.")
309
310+ @locked
311 def _get_status(self):
312 return self._status
313
314+ @locked
315 def _set_status(self, enum):
316 self._status = dbus.String(enum)
317 self.PropertyChanged("Status", self._status)
318@@ -590,9 +668,11 @@
319 status = property(_get_status, _set_status,
320 doc="The status of the transaction.")
321
322+ @locked
323 def _get_status_details(self):
324 return self._status_details
325
326+ @locked
327 def _set_status_details(self, text):
328 self._status_details = get_dbus_string(text)
329 self.PropertyChanged("StatusDetails", self._status_details)
330@@ -600,9 +680,11 @@
331 status_details = property(_get_status_details, _set_status_details,
332 doc="The status message from apt.")
333
334+ @locked
335 def _get_progress(self):
336 return self._progress
337
338+ @locked
339 def _set_progress(self, percent):
340 self._progress = dbus.Int32(percent)
341 self.PropertyChanged("Progress", self._progress)
342@@ -610,9 +692,11 @@
343 progress = property(_get_progress, _set_progress,
344 doc="The progress of the transaction in percent.")
345
346+ @locked
347 def _get_progress_download(self):
348 return self._progress_download
349
350+ @locked
351 def _set_progress_download(self, progress_download):
352 self._progress_download = self._convert_struct(progress_download,
353 "sssxxs")
354@@ -626,9 +710,11 @@
355 "partially downloaded size and a status "
356 "message.")
357
358+ @locked
359 def _get_cancellable(self):
360 return self._cancellable
361
362+ @locked
363 def _set_cancellable(self, cancellable):
364 self._cancellable = dbus.Boolean(cancellable)
365 self.PropertyChanged("Cancellable", self._cancellable)
366@@ -637,9 +723,11 @@
367 doc="If it's currently allowed to cancel the "
368 "transaction.")
369
370+ @locked
371 def _get_term_attached(self):
372 return self._term_attached
373
374+ @locked
375 def _set_term_attached(self, attached):
376 self._term_attached = dbus.Boolean(attached)
377 self.PropertyChanged("TerminalAttached", self._term_attached)
378@@ -649,9 +737,11 @@
379 "attached to the dpkg call of the "
380 "transaction.")
381
382+ @locked
383 def _get_required_medium(self):
384 return self._required_medium
385
386+ @locked
387 def _set_required_medium(self, medium):
388 self._required_medium = self._convert_struct(medium, "ss")
389 self.PropertyChanged("RequiredMedium", self._required_medium)
390@@ -662,9 +752,11 @@
391 "of a required CD/DVD to install packages "
392 "from.")
393
394+ @locked
395 def _get_config_file_conflict(self):
396 return self._config_file_conflict
397
398+ @locked
399 def _set_config_file_conflict(self, prompt):
400 if prompt is None:
401 self._config_file_conflict = dbus.Struct(("", ""), signature="ss")
402@@ -739,6 +831,18 @@
403
404 # Methods
405
406+ def _get_after(self):
407+ return self._after
408+
409+ def _set_after(self, tid):
410+ self.after = tid
411+
412+ after = property(_get_after, _set_after)
413+
414+ @locked
415+ def _get_locale(self):
416+ return self._locale
417+
418 def _set_locale(self, locale_str):
419 """Set the language and encoding.
420
421@@ -755,11 +859,14 @@
422 except ValueError:
423 raise
424 else:
425- self.locale = dbus.String("%s.%s" % (lang, encoding))
426- self._translation = gettext.translation("aptdaemon",
427- fallback=True,
428- languages=[lang])
429- self.PropertyChanged("locale", self.locale)
430+ with self.lock:
431+ self._locale = dbus.String("%s.%s" % (lang, encoding))
432+ self._translation = gettext.translation("aptdaemon",
433+ fallback=True,
434+ languages=[lang])
435+ self.PropertyChanged("locale", self._locale)
436+
437+ locale = property(_get_locale, _set_locale)
438
439 @inline_callbacks
440 def _set_http_proxy(self, url, sender):
441@@ -776,6 +883,7 @@
442 self.http_proxy = dbus.String(url)
443 self.PropertyChanged("HttpProxy", self.http_proxy)
444
445+ @locked
446 def _set_remove_obsoleted_depends(self, remove_obsoleted_depends):
447 """Set the handling of the removal of automatically installed
448 dependencies which are now obsoleted.
449@@ -788,6 +896,14 @@
450 self.PropertyChanged("RemoveObsoletedDepends",
451 self.remove_obsoleted_depends)
452
453+ @locked
454+ def _get_remove_obsoleted_depends(self):
455+ return self._remove_obsoleted_depends
456+
457+ remove_obsoleted_depends = property(_get_remove_obsoleted_depends,
458+ _set_remove_obsoleted_depends)
459+
460+ @locked
461 def _set_allow_unauthenticated(self, allow_unauthenticated):
462 """Set the handling of unauthenticated packages
463
464@@ -795,8 +911,16 @@
465 allow_unauthenticated -- True to allow packages that come from a
466 repository without a valid authentication signature
467 """
468- self.allow_unauthenticated = dbus.Boolean(allow_unauthenticated)
469- self.PropertyChanged("AllowUnauthenticated", self.allow_unauthenticated)
470+ self._allow_unauthenticated = dbus.Boolean(allow_unauthenticated)
471+ self.PropertyChanged("AllowUnauthenticated",
472+ self._allow_unauthenticated)
473+
474+ @locked
475+ def _get_allow_unauthenticated(self):
476+ return self._allow_unauthenticated
477+
478+ allow_unauthenticated = property(_get_allow_unauthenticated,
479+ _set_allow_unauthenticated)
480
481 # pylint: disable-msg=C0103,C0322
482 @dbus.service.method(APTDAEMON_TRANSACTION_DBUS_INTERFACE,
483@@ -837,12 +961,12 @@
484 def _run(self, sender):
485 yield self._check_foreign_user(sender)
486 yield self._check_auth()
487- self.queue.put(self.tid)
488+ yield self.queue.put(self.tid)
489 self.status = enums.STATUS_WAITING
490 next_trans = self.after
491 while next_trans:
492 yield next_trans._check_auth()
493- self.queue.put(next_trans.tid)
494+ yield self.queue.put(next_trans.tid)
495 next_trans.status = enums.STATUS_WAITING
496 next_trans = next_trans.after
497
498@@ -921,6 +1045,7 @@
499 log_trans.debug("Setting cancel event")
500 self.cancelled = True
501 self.status = enums.STATUS_CANCELLING
502+ self.feedback.set()
503 self.paused = False
504 return
505 raise errors.AptDaemonError("Could not cancel transaction")
506@@ -944,7 +1069,7 @@
507 if self.status != enums.STATUS_SETTING_UP:
508 raise errors.TransactionAlreadyRunning()
509 yield self._check_foreign_user(sender)
510- self.queue.worker.simulate(self)
511+ yield self.queue.worker.simulate(self)
512 if self._idle_watch is not None:
513 gobject.source_remove(self._idle_watch)
514 self._idle_watch = None
515@@ -974,13 +1099,20 @@
516 try:
517 slave_fd = os.open(ttyname, os.O_RDWR | os.O_NOCTTY)
518 if os.isatty(slave_fd):
519- self.terminal = dbus.String(ttyname)
520- self.PropertyChanged("Terminal", self.terminal)
521+ with self.lock:
522+ self._terminal = dbus.String(ttyname)
523+ self.PropertyChanged("Terminal", self._terminal)
524 else:
525 raise errors.AptDaemonError("%s isn't a tty" % ttyname)
526 finally:
527 os.close(slave_fd)
528
529+ @locked
530+ def _get_terminal(self):
531+ return self._terminal
532+
533+ terminal = property(_get_terminal, _set_terminal)
534+
535 def _set_debconf(self, debconf_socket):
536 """Set the socket of the debconf proxy.
537
538@@ -1002,8 +1134,15 @@
539 raise errors.AptDaemonError("socket '%s' has to be owned by the "
540 "owner of the "
541 "transaction" % debconf_socket)
542- self.debconf = dbus.String(debconf_socket)
543- self.PropertyChanged("DebconfSocket", self.debconf)
544+ with self.lock:
545+ self._debconf = dbus.String(debconf_socket)
546+ self.PropertyChanged("DebconfSocket", self._debconf)
547+
548+ @locked
549+ def _get_debconf(self):
550+ return self._debconf
551+
552+ debconf = property(_get_debconf, _set_debconf)
553
554 # pylint: disable-msg=C0103,C0322
555 @dbus_deferred_method(APTDAEMON_TRANSACTION_DBUS_INTERFACE,
556@@ -1030,6 +1169,7 @@
557 if not self.required_medium[0] == medium:
558 raise errors.AptDaemonError("The medium '%s' isn't "
559 "requested." % medium)
560+ self.feedback.set()
561 self.paused = False
562
563 # pylint: disable-msg=C0103,C0322
564@@ -1066,6 +1206,7 @@
565 if not self.config_file_conflict[0] == config:
566 raise errors.AptDaemonError("Invalid config file: %s" % config)
567 self.config_file_conflict_resolution = answer
568+ self.feedback.set()
569 self.paused = False
570
571 @inline_callbacks
572@@ -1132,10 +1273,6 @@
573 if self.uid != uid:
574 raise errors.ForeignTransaction()
575
576- def _set_kwargs(self, kwargs):
577- """Set the kwargs which will be send to the AptWorker."""
578- self.kwargs = kwargs
579-
580 def gettext(self, msg):
581 """Translate the given message to the language of the transaction.
582 Fallback to the system default.
583@@ -1184,6 +1321,7 @@
584 log.debug("emitting queue changed")
585 self.emit("queue-changed")
586
587+ @inline_callbacks
588 def put(self, tid):
589 """Add an item to the queue."""
590 trans = self.limbo.pop(tid)
591@@ -1197,7 +1335,7 @@
592 # the transaction has been started
593 if not self.worker.trans:
594 trans.progress = 9
595- self.worker.simulate(trans)
596+ yield self.worker.simulate(trans)
597
598 if trans._idle_watch is not None:
599 gobject.source_remove(trans._idle_watch)
600@@ -1271,6 +1409,7 @@
601 signal.signal(signal.SIGQUIT, self._sigquit)
602 signal.signal(signal.SIGTERM, self._sigquit)
603 self.options = options
604+ self.mainloop = gobject.MainLoop()
605 if connect == True:
606 if bus is None:
607 bus = dbus.SystemBus()
608@@ -1362,7 +1501,7 @@
609 self._check_for_inactivity)
610 log.debug("Waiting for calls")
611 try:
612- mainloop.run()
613+ self.mainloop.run()
614 except KeyboardInterrupt:
615 self.Quit(None)
616
617@@ -1853,7 +1992,7 @@
618 """Request a shutdown of the daemon."""
619 log.info("Quitting was requested")
620 log.debug("Quitting main loop...")
621- mainloop.quit()
622+ self.mainloop.quit()
623 log.debug("Exit")
624
625 @inline_callbacks
626
627=== removed file 'aptdaemon/loop.py'
628--- aptdaemon/loop.py 2010-05-03 05:49:33 +0000
629+++ aptdaemon/loop.py 1970-01-01 00:00:00 +0000
630@@ -1,32 +0,0 @@
631-#!/usr/bin/env python
632-# -*- coding: utf-8 -*-
633-"""Main loop for aptdaemon."""
634-# Copyright (C) 2008-2009 Sebastian Heinlein <devel@glatzor.de>
635-#
636-# This program is free software; you can redistribute it and/or modify
637-# it under the terms of the GNU General Public License as published by
638-# the Free Software Foundation; either version 2 of the License, or
639-# any later version.
640-#
641-# This program is distributed in the hope that it will be useful,
642-# but WITHOUT ANY WARRANTY; without even the implied warranty of
643-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
644-# GNU General Public License for more details.
645-#
646-# You should have received a copy of the GNU General Public License along
647-# with this program; if not, write to the Free Software Foundation, Inc.,
648-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
649-
650-__author__ = "Sebastian Heinlein <devel@glatzor.de>"
651-
652-__all__ = ("mainloop", "get_main_loop")
653-
654-import gobject
655-
656-mainloop = gobject.MainLoop()
657-
658-def get_main_loop():
659- """Return the gobject main loop as a singelton."""
660- return mainloop
661-
662-# vim:ts=4:sw=4:et
663
664=== modified file 'aptdaemon/progress.py'
665--- aptdaemon/progress.py 2011-09-30 15:22:13 +0000
666+++ aptdaemon/progress.py 2011-10-02 05:32:23 +0000
667@@ -33,7 +33,6 @@
668
669 import enums
670 import lock
671-from loop import mainloop
672
673 # Required to get translatable strings extraced by xgettext
674 _ = lambda s: s
675@@ -210,9 +209,6 @@
676 len(items)) % {"files":
677 " ".join(items)}
678 self.transaction.status_details = msg
679-
680- while gobject.main_context_default().pending():
681- gobject.main_context_default().iteration()
682 return True
683
684 def start(self):
685@@ -232,8 +228,7 @@
686 self.transaction.required_medium = medium, drive
687 self.transaction.paused = True
688 self.transaction.status = enums.STATUS_WAITING_MEDIUM
689- while self.transaction.paused:
690- gobject.main_context_default().iteration()
691+ self.transaction.feedback.wait()
692 self.transaction.status = enums.STATUS_DOWNLOADING
693 if self.transaction.cancelled:
694 return False
695@@ -309,6 +304,7 @@
696 self.child_pid = pid
697 os.close(self.status_child_fd)
698 log.debug("Child pid: %s", pid)
699+ loop = gobject.MainLoop()
700 watchers = []
701 flags = gobject.IO_IN | gobject.IO_ERR | gobject.IO_HUP
702 if self.transaction.terminal:
703@@ -318,13 +314,12 @@
704 watchers.append(gobject.io_add_watch(self.master_fd, flags,
705 self._copy_io_master, terminal_fd))
706 # Monitor the child process
707- watchers.append(gobject.child_watch_add(pid, self._on_child_exit))
708+ watchers.append(gobject.child_watch_add(pid, self._on_child_exit, loop))
709 # Watch for status updates
710 watchers.append(gobject.io_add_watch(self.status_parent_fd,
711 gobject.IO_IN,
712 self._on_status_update))
713- while self._child_exit == -1:
714- gobject.main_context_default().iteration()
715+ loop.run()
716 for id in watchers:
717 gobject.source_remove(id)
718 # Restore the settings of the transaction terminal
719@@ -341,9 +336,10 @@
720 pass
721 return os.WEXITSTATUS(self._child_exit)
722
723- def _on_child_exit(self, pid, condition):
724+ def _on_child_exit(self, pid, condition, loop):
725 log.debug("Child exited: %s", condition)
726 self._child_exit = condition
727+ loop.quit()
728 return False
729
730 def _on_status_update(self, source, condition):
731@@ -397,8 +393,6 @@
732 signal.signal(signal.SIGINT, interrupt_handler)
733 # Make sure that exceptions of the child are not catched by apport
734 sys.excepthook = sys.__excepthook__
735-
736- mainloop.quit()
737 # Switch to the language of the user
738 if self.transaction.locale:
739 os.putenv("LANG", self.transaction.locale)
740@@ -496,8 +490,8 @@
741 self.transaction.config_file_conflict = (current, new)
742 self.transaction.paused = True
743 self.transaction.status = enums.STATUS_WAITING_CONFIG_FILE_PROMPT
744- while self.transaction.paused:
745- gobject.main_context_default().iteration()
746+ #FIXME: Should we only wait some time?
747+ self.transaction.feedback.wait()
748 log.debug("Sending config file answer: %s",
749 self.transaction.config_file_conflict_resolution)
750 if self.transaction.config_file_conflict_resolution == "replace":
751
752=== modified file 'aptdaemon/utils.py'
753--- aptdaemon/utils.py 2010-05-03 05:49:33 +0000
754+++ aptdaemon/utils.py 2011-10-02 05:32:23 +0000
755@@ -25,8 +25,11 @@
756 __all__ = ("deprecated",)
757
758 import functools
759+import threading
760 import warnings
761
762+from defer import Deferred
763+
764 def deprecated(func):
765 """This is a decorator which can be used to mark functions
766 as deprecated. It will result in a warning being emitted
767@@ -48,4 +51,31 @@
768 return func(*args, **kwargs)
769 return new_func
770
771+def locked(func):
772+ """Protect the called method by a lock."""
773+ def _locked(*args, **kwargs):
774+ self = args[0]
775+ with self.lock:
776+ return func(*args, **kwargs)
777+ return _locked
778+
779+def defer_to_thread(func):
780+ """Wrap the decorated message to a Deferred which will be called
781+ in a separated thread.
782+ """
783+ def _deferred_to_thread(*args, **kwargs):
784+ def __deferred_to_thread(deferred):
785+ try:
786+ result = func(*args, **kwargs)
787+ except Exception, error:
788+ deferred.errback(error)
789+ else:
790+ deferred.callback(result)
791+ deferred = Deferred()
792+ #FIXME: Would be nice to have a thread pool and a Queue here
793+ thread = threading.Thread(target=__deferred_to_thread, args=[deferred])
794+ thread.run()
795+ return deferred
796+ return _deferred_to_thread
797+
798 # vim:ts=4:sw=4:et
799
800=== modified file 'aptdaemon/worker.py'
801--- aptdaemon/worker.py 2011-09-18 06:31:09 +0000
802+++ aptdaemon/worker.py 2011-10-02 05:32:23 +0000
803@@ -1,4 +1,4 @@
804-#!/usr/bin/env python
805+#/usr/bin/env python
806 # -*- coding: utf-8 -*-
807 """Provides AptWorker which processes transactions."""
808 # Copyright (C) 2008-2009 Sebastian Heinlein <devel@glatzor.de>
809@@ -29,6 +29,7 @@
810 import sys
811 import tempfile
812 import time
813+import threading
814 import traceback
815
816 import apt
817@@ -54,6 +55,7 @@
818 DaemonDpkgInstallProgress, \
819 DaemonDpkgReconfigureProgress, \
820 DaemonDpkgRecoverProgress
821+from utils import locked, defer_to_thread
822
823 log = logging.getLogger("AptDaemon.Worker")
824
825@@ -105,6 +107,8 @@
826 self._status_frozen = None
827 self.plugins = {}
828 self._load_plugins()
829+ self.lock = threading.Lock()
830+ self.worker_thread = None
831
832 def _load_plugins(self):
833 """Load the plugins from setuptools' entry points."""
834@@ -153,10 +157,14 @@
835 transaction -- core.Transcation instance to run
836 """
837 log.info("Processing transaction %s", transaction.tid)
838- if self.trans:
839- raise Exception("There is already a running transaction")
840- self.trans = transaction
841- gobject.idle_add(self._process_transaction, transaction)
842+ with self.lock:
843+ if self.trans:
844+ raise Exception("There is already a running transaction")
845+ self.trans = transaction
846+ self.worker_thread = threading.Thread(target=self._process_transaction,
847+ args=[transaction],
848+ name="WorkerThread")
849+ self.worker_thread.start()
850
851 def _emit_transaction_done(self, trans):
852 """Emit the transaction-done signal.
853@@ -174,6 +182,7 @@
854 trans.progress = 11
855 # FIXME: Check if the transaction has been just simulated. So we could
856 # skip marking the changes a second time.
857+ self.lock.acquire()
858 try:
859 lock.wait_for_lock(trans)
860 # Prepare the package cache
861@@ -236,14 +245,15 @@
862 else:
863 trans.exit = EXIT_SUCCESS
864 finally:
865+ self.lock.release()
866 trans.progress = 100
867 self.last_action_timestamp = time.time()
868 tid = trans.tid[:]
869- self.trans = None
870+ with self.lock:
871+ self.trans = None
872 self._emit_transaction_done(trans)
873 lock.release()
874 log.info("Finished transaction %s", tid)
875- return False
876
877 def commit_packages(self, trans, install, reinstall, remove, purge, upgrade,
878 downgrade, simulate=False):
879@@ -360,7 +370,6 @@
880 "available."), pkg_ver, pkg_name)
881 elif pkg_rel:
882 self._set_candidate_release(pkg, pkg_rel)
883-
884
885 def enable_distro_comp(self, trans, component):
886 """Enable given component in the sources list.
887@@ -381,7 +390,8 @@
888 finally:
889 os.umask(old_umask)
890
891- def add_repository(self, trans, src_type, uri, dist, comps, comment, sourcesfile):
892+ def add_repository(self, trans, src_type, uri, dist, comps, comment,
893+ sourcesfile):
894 """Add given repository to the sources list.
895
896 Keyword arguments:
897@@ -444,7 +454,6 @@
898 log.info("Adding vendor key from keyserver: %s %s", keyid, keyserver)
899 trans.status = STATUS_DOWNLOADING
900 trans.progress = 101
901- last_pulse = time.time()
902 #FIXME: Use gobject.spawn_async and deferreds in the worker
903 # Alternatively we could use python-pyme directly for a better
904 # error handling. Or the --status-fd of gpg
905@@ -453,12 +462,8 @@
906 "--recv", keyid], stderr=subprocess.STDOUT,
907 stdout=subprocess.PIPE, close_fds=True)
908 while proc.poll() is None:
909- while gobject.main_context_default().pending():
910- gobject.main_context_default().iteration()
911- time.sleep(0.05)
912- if time.time() - last_pulse > 0.3:
913- trans.progress = 101
914- last_pulse = time.time()
915+ time.sleep(0.5)
916+ trans.progress = 101
917 if proc.returncode != 0:
918 stdout = unicode(proc.stdout.read(),
919 # that can return "None", in this case, just
920@@ -941,12 +946,15 @@
921 frozen_dir = tempfile.mkdtemp(prefix="aptdaemon-frozen-status")
922 shutil.copy(self._status_orig, frozen_dir)
923 self._status_frozen = os.path.join(frozen_dir, "status")
924+ self.lock.release()
925 try:
926 yield
927 finally:
928+ self.lock.acquire()
929 shutil.rmtree(frozen_dir)
930 self._status_frozen = None
931
932+ @defer_to_thread
933 def simulate(self, trans):
934 """Return the dependencies which will be installed by the transaction,
935 the content of the dpkg status file after the transaction would have
936@@ -958,8 +966,9 @@
937 log.info("Simulating trans: %s" % trans.tid)
938 trans.status = STATUS_RESOLVING_DEP
939 try:
940- trans.depends, trans.download, trans.space, \
941- trans.unauthenticated = self._simulate_helper(trans)
942+ with self.lock:
943+ trans.depends, trans.download, trans.space, \
944+ trans.unauthenticated = self._simulate_helper(trans)
945 except TransactionFailed, excep:
946 trans.error = excep
947 except Exception, excep:
948@@ -980,7 +989,6 @@
949 trans.exit = EXIT_FAILED
950 trans.progress = 100
951 self.last_action_timestamp = time.time()
952- raise trans.error
953
954 def _simulate_helper(self, trans):
955 depends = [[], [], [], [], [], [], []]
956@@ -1341,11 +1349,11 @@
957 trans.status = STATUS_FINISHED
958 self.last_action_timestamp = time.time()
959 tid = self.trans.tid[:]
960- trans = self.trans
961- self.trans = None
962+ with self.lock:
963+ trans = self.trans
964+ self.trans = None
965 self._emit_transaction_done(trans)
966 log.info("Finished transaction %s", tid)
967- return False
968
969 def simulate(self, trans):
970 depends = [[], [], [], [], [], [], []]

Subscribers

People subscribed via source and target branches

to status/vote changes: