Merge lp:~scode/duplicity/bug-387102 into lp:~duplicity-team/duplicity/trunk

Proposed by Kenneth Loafman on 2009-06-17
Status: Merged
Approved by: Kenneth Loafman on 2009-06-20
Approved revision: 528
Merged at revision: not available
Proposed branch: lp:~scode/duplicity/bug-387102
Merge into: lp:~duplicity-team/duplicity/trunk
Diff against target: not available
To merge this branch: bzr merge lp:~scode/duplicity/bug-387102
Reviewer          Review Type    Date Requested    Status
Peter Schuller                                     Approve on 2009-06-18
Kenneth Loafman                                    Approve on 2009-06-17
Review via email: mp+7584@code.launchpad.net
Kenneth Loafman (kenneth-loafman) wrote:

Kenneth Loafman wrote:
> Kenneth Loafman has proposed merging lp:~scode/duplicity/bug-387102 into lp:duplicity.
>
> Requested reviews:
> duplicity-team (duplicity-team)
>
This applies to the new trunk now owned by the duplicity-team as opposed
to vcs-imports. I could not get rid of the vcs-imports branches, but I
did manage to make the trunk and webstuff series ours.

 review: approve

review: Approve
lp:~scode/duplicity/bug-387102 updated on 2009-06-18
528. By Peter Schuller on 2009-06-18

* merge lp:~mterry/duplicity/log-upload-events since my changes created
  conflicts with it

Peter Schuller (scode) wrote:

Merged lp:~mterry/duplicity/log-upload-events since we touched the same code.

Peter Schuller (scode):
review: Approve

Preview Diff

=== modified file 'duplicity/asyncscheduler.py'
--- duplicity/asyncscheduler.py 2009-04-02 14:47:12 +0000
+++ duplicity/asyncscheduler.py 2009-06-16 21:16:34 +0000
@@ -70,15 +70,15 @@
                              (concurrency)))
         assert concurrency >= 0, "%s concurrency level must be >= 0" % (self.__class__.__name__,)
 
-        self.__failed = False             # has at least one task failed so far?
-        self.__failed_waiter = None       # when __failed, the waiter of the first task that failed
-        self.__concurrency = concurrency
-        self.__curconc = 0                # current concurrency level (number of workers)
-        self.__workers = 0                # number of active workers
-        self.__q = []
-        self.__cv = threading.Condition() # for simplicity, we use a single cv with its lock
-                                          # for everything, even if the resulting notifyAll():s
-                                          # are not technically efficient.
+        self.__failed = False             # has at least one task failed so far?
+        self.__failed_waiter = None       # when __failed, the waiter of the first task that failed
+        self.__concurrency = concurrency
+        self.__worker_count = 0           # number of active workers
+        self.__waiter_count = 0           # number of threads waiting to submit work
+        self.__barrier = False            # barrier currently in effect?
+        self.__cv = threading.Condition() # for simplicity, we use a single cv with its lock
+                                          # for everything, even if the resulting notifyAll():s
+                                          # are not technically efficient.
 
         if concurrency > 0:
             require_threading("concurrency > 0 (%d)" % (concurrency,))
@@ -99,8 +99,7 @@
         # be popped).
         if self.__concurrency > 0:
             def _insert_barrier():
-                self.__q.append(None) # None in queue indicates barrier
-                self.__cv.notifyAll()
+                self.__barrier = True
 
             with_lock(self.__cv, _insert_barrier)
 
@@ -162,7 +161,7 @@
         progress or may happen subsequently to the call to wait().
         """
         def _wait():
-            interruptably_wait(self.__cv, lambda: self.__curconc == 0 and len(self.__q) == 0)
+            interruptably_wait(self.__cv, lambda: self.__worker_count == 0 and self.__waiter_count == 0)
 
         with_lock(self.__cv, _wait)
 
@@ -184,119 +183,70 @@
     def __run_asynchronously(self, fn, params):
         (waiter, caller) = async_split(lambda: fn(*params))
 
-        def _sched():
+        def check_pending_failure():
             if self.__failed:
-                # note that __start_worker() below may block waiting on
-                # task execution; if so we will be one task scheduling too
-                # late triggering the failure. this should be improved.
                 log.Info("%s: %s" % (self.__class__.__name__,
                          _("a previously scheduled task has failed; "
                            "propagating the result immediately")))
-                self.__waiter()
+                self.__failed_waiter()
                 raise AssertionError("%s: waiter should have raised an exception; "
                                      "this is a bug" % (self.__class__.__name__,))
 
-            self.__q.append(caller)
-
-            free_workers = self.__workers - self.__curconc
-
+        def wait_for_and_register_launch():
+            check_pending_failure() # raise on fail
+            while self.__worker_count >= self.__concurrency or self.__barrier:
+                if self.__worker_count == 0:
+                    assert self.__barrier, "barrier should be in effect"
+                    self.__barrier = False
+                    self.__cv.notifyAll()
+                else:
+                    self.__waiter_count += 1
+                    self.__cv.wait()
+                    self.__waiter_count -= 1
+
+                check_pending_failure() # raise on fail
+
+            self.__worker_count += 1
             log.Debug("%s: %s" % (self.__class__.__name__,
-                                  gettext.ngettext("tasks queue length post-schedule: %d task",
-                                                   "tasks queue length post-schedule: %d tasks",
-                                                   len(self.__q)) % len(self.__q)))
-
-            assert free_workers >= 0
-
-            if free_workers == 0:
-                self.__start_worker()
-
-        with_lock(self.__cv, _sched)
+                                  _("active workers = %d") % (self.__worker_count,)))
+
+        # simply wait for an OK condition to start, then launch our worker. the worker
+        # never waits on us, we just wait for them.
+        with_lock(self.__cv, wait_for_and_register_launch)
+
+        self.__start_worker(caller)
 
         return waiter
 
-    def __start_worker(self):
-        """
-        Start a new worker; self.__cv must be acquired.
-        """
-        while self.__workers >= self.__concurrency:
-            log.Info("%s: %s" % (self.__class__.__name__,
-                     gettext.ngettext("no free worker slots (%d worker, and maximum "
-                                      "concurrency is %d) - waiting for a background "
-                                      "task to complete",
-                                      "no free worker slots (%d workers, and maximum "
-                                      "concurrency is %d) - waiting for a background "
-                                      "task to complete", self.__workers) %
-                     (self.__workers, self.__concurrency)))
-            self.__cv.wait()
-
-        self.__workers += 1
-
-        thread.start_new_thread(lambda: self.__worker_thread(), ())
-
-    def __worker_thread(self):
-        """
-        The worker thread main loop.
-        """
-        # Each worker loops around waiting for work. The exception is
-        # when there is no work to do right now and there is no work
-        # scheduled - when this happens, all workers shut down. This
-        # is to remove the need for explicit shutdown by calling code.
-
-        done = [False] # use list for destructive mod. in closure
-        while not done[0]:
-            def _prepwork():
-                def workorbarrier_pending():
-                    return (len(self.__q) > 0)
-                def tasks_running():
-                    return (self.__curconc > 0)
-                def barrier_pending():
-                    return (workorbarrier_pending() and self.__q[0] is None)
-
-                while (not workorbarrier_pending()) or \
-                      (barrier_pending() and tasks_running()):
-                    if (not workorbarrier_pending()) and (not tasks_running()):
-                        # no work, and no tasks in progress - quit as per comments above
-                        done[0] = True
-                        self.__workers -= 1
+    def __start_worker(self, caller):
+        """
+        Start a new worker.
+        """
+        def trampoline():
+            try:
+                self.__execute_caller(caller)
+            finally:
+                def complete_worker():
+                    self.__worker_count -= 1
+                    log.Debug("%s: %s" % (self.__class__.__name__,
+                                          _("active workers = %d") % (self.__worker_count,)))
+                    self.__cv.notifyAll()
+                with_lock(self.__cv, complete_worker)
+
+        thread.start_new_thread(trampoline, ())
+
+    def __execute_caller(self, caller):
+        # The caller half that we get here will not propagate
+        # errors back to us, but rather propagate it back to the
+        # "other half" of the async split.
+        succeeded, waiter = caller()
+        if not succeeded:
+            def _signal_failed():
+                if not self.__failed:
+                    self.__failed = True
+                    self.__failed_waiter = waiter
                     self.__cv.notifyAll()
-                        return None
-                    self.__cv.wait()
-
-                # there is work to do
-                work = self.__q.pop(0)
-
-                log.Debug("%s: %s" % (self.__class__.__name__,
-                                      gettext.ngettext("tasks queue length post-grab: %d task",
-                                                       "tasks queue length post-grab: %d tasks",
-                                                       len(self.__q)) % len(self.__q)))
-
-                if work: # real work, not just barrier
-                    self.__curconc += 1
-                    self.__cv.notifyAll()
-
-                return work
-
-            work = with_lock(self.__cv, _prepwork)
-
-            if work:
-                # the actual work here is going to be the caller half
-                # of an async_split() result, which will not propagate
-                # errors back to us, but rather propagate it back to
-                # the "other half".
-                succeeded, waiter = work()
-                if not succeeded:
-                    def _signal_failed():
-                        if not self.__failed:
-                            self.__failed = True
-                            self.__waiter = waiter
-                    with_lock(self.__cv, _signal_failed)
-
-                log.Info("%s: %s" % (self.__class__.__name__,
-                         _("task execution done (success: %s)") % succeeded))
-
-                def _postwork():
-                    self.__curconc -= 1
-                    self.__cv.notifyAll()
-
-                with_lock(self.__cv, _postwork)
-
+            with_lock(self.__cv, _signal_failed)
+
+        log.Info("%s: %s" % (self.__class__.__name__,
+                 _("task execution done (success: %s)") % succeeded))
