Merge lp:~scode/duplicity/bug-387102 into lp:~duplicity-team/duplicity/trunk

Proposed by Kenneth Loafman
Status: Merged
Approved by: Kenneth Loafman
Approved revision: 528
Merged at revision: not available
Proposed branch: lp:~scode/duplicity/bug-387102
Merge into: lp:~duplicity-team/duplicity/trunk
Diff against target: None lines
To merge this branch: bzr merge lp:~scode/duplicity/bug-387102
Reviewer            Review Type    Date Requested    Status
Peter Schuller                                       Approve
Kenneth Loafman                                      Approve
Review via email: mp+7584@code.launchpad.net
Revision history for this message
Kenneth Loafman (kenneth-loafman) wrote :

Kenneth Loafman wrote:
> Kenneth Loafman has proposed merging lp:~scode/duplicity/bug-387102 into lp:duplicity.
>
> Requested reviews:
> duplicity-team (duplicity-team)
>
This applies to the new trunk now owned by the duplicity-team as opposed
to vcs-imports. I could not get rid of the vcs-imports branches, but I
did manage to make the trunk and webstuff series ours.

 review: approve

review: Approve
lp:~scode/duplicity/bug-387102 updated
528. By Peter Schuller

* merge lp:~mterry/duplicity/log-upload-events since it created conflicts with my
  changes

Revision history for this message
Peter Schuller (scode) wrote :

Merged lp:~mterry/duplicity/log-upload-events since we touched the same code.

Revision history for this message
Peter Schuller (scode) :
review: Approve

Preview Diff

=== modified file 'duplicity/asyncscheduler.py'
--- duplicity/asyncscheduler.py 2009-04-02 14:47:12 +0000
+++ duplicity/asyncscheduler.py 2009-06-16 21:16:34 +0000
@@ -70,15 +70,15 @@
                               (concurrency)))
         assert concurrency >= 0, "%s concurrency level must be >= 0" % (self.__class__.__name__,)
 
         self.__failed = False # has at least one task failed so far?
         self.__failed_waiter = None # when __failed, the waiter of the first task that failed
         self.__concurrency = concurrency
-        self.__curconc = 0 # current concurrency level (number of workers)
-        self.__workers = 0 # number of active workers
-        self.__q = []
+        self.__worker_count = 0 # number of active workers
+        self.__waiter_count = 0 # number of threads waiting to submit work
+        self.__barrier = False # barrier currently in effect?
         self.__cv = threading.Condition() # for simplicity, we use a single cv with its lock
                                           # for everything, even if the resulting notifyAll():s
                                           # are not technically efficient.
 
         if concurrency > 0:
             require_threading("concurrency > 0 (%d)" % (concurrency,))
@@ -99,8 +99,7 @@
         # be popped).
         if self.__concurrency > 0:
             def _insert_barrier():
-                self.__q.append(None) # None in queue indicates barrier
-                self.__cv.notifyAll()
+                self.__barrier = True
 
             with_lock(self.__cv, _insert_barrier)
 
@@ -162,7 +161,7 @@
         progress or may happen subsequently to the call to wait().
         """
         def _wait():
-            interruptably_wait(self.__cv, lambda: self.__curconc == 0 and len(self.__q) == 0)
+            interruptably_wait(self.__cv, lambda: self.__worker_count == 0 and self.__waiter_count == 0)
 
         with_lock(self.__cv, _wait)
 
@@ -184,119 +183,70 @@
     def __run_asynchronously(self, fn, params):
         (waiter, caller) = async_split(lambda: fn(*params))
 
-        def _sched():
+        def check_pending_failure():
             if self.__failed:
-                # note that __start_worker() below may block waiting on
-                # task execution; if so we will be one task scheduling too
-                # late triggering the failure. this should be improved.
                 log.Info("%s: %s" % (self.__class__.__name__,
                                      _("a previously scheduled task has failed; "
                                        "propagating the result immediately")))
-                self.__waiter()
+                self.__failed_waiter()
                 raise AssertionError("%s: waiter should have raised an exception; "
                                      "this is a bug" % (self.__class__.__name__,))
 
-            self.__q.append(caller)
-
-            free_workers = self.__workers - self.__curconc
-
-            log.Debug("%s: %s" % (self.__class__.__name__,
-                                  gettext.ngettext("tasks queue length post-schedule: %d task",
-                                                   "tasks queue length post-schedule: %d tasks",
-                                                   len(self.__q)) % len(self.__q)))
-
-            assert free_workers >= 0
-
-            if free_workers == 0:
-                self.__start_worker()
-
-        with_lock(self.__cv, _sched)
+        def wait_for_and_register_launch():
+            check_pending_failure() # raise on fail
+            while self.__worker_count >= self.__concurrency or self.__barrier:
+                if self.__worker_count == 0:
+                    assert self.__barrier, "barrier should be in effect"
+                    self.__barrier = False
+                    self.__cv.notifyAll()
+                else:
+                    self.__waiter_count += 1
+                    self.__cv.wait()
+                    self.__waiter_count -= 1
+
+                check_pending_failure() # raise on fail
+
+            self.__worker_count += 1
+            log.Debug("%s: %s" % (self.__class__.__name__,
+                                  _("active workers = %d") % (self.__worker_count,)))
+
+        # simply wait for an OK condition to start, then launch our worker. the worker
+        # never waits on us, we just wait for them.
+        with_lock(self.__cv, wait_for_and_register_launch)
+
+        self.__start_worker(caller)
 
         return waiter
 
-    def __start_worker(self):
+    def __start_worker(self, caller):
         """
-        Start a new worker; self.__cv must be acquired.
+        Start a new worker.
         """
-        while self.__workers >= self.__concurrency:
-            log.Info("%s: %s" % (self.__class__.__name__,
-                                 gettext.ngettext("no free worker slots (%d worker, and maximum "
-                                                  "concurrency is %d) - waiting for a background "
-                                                  "task to complete",
-                                                  "no free worker slots (%d workers, and maximum "
-                                                  "concurrency is %d) - waiting for a background "
-                                                  "task to complete", self.__workers) %
-                                 (self.__workers, self.__concurrency)))
-            self.__cv.wait()
+        def trampoline():
+            try:
+                self.__execute_caller(caller)
+            finally:
+                def complete_worker():
+                    self.__worker_count -= 1
+                    log.Debug("%s: %s" % (self.__class__.__name__,
+                                          _("active workers = %d") % (self.__worker_count,)))
+                    self.__cv.notifyAll()
+                with_lock(self.__cv, complete_worker)
 
-        self.__workers += 1
+        thread.start_new_thread(trampoline, ())
 
-        thread.start_new_thread(lambda: self.__worker_thread(), ())
-
-    def __worker_thread(self):
-        """
-        The worker thread main loop.
-        """
-        # Each worker loops around waiting for work. The exception is
-        # when there is no work to do right now and there is no work
-        # scheduled - when this happens, all workers shut down. This
-        # is to remove the need for explicit shutdown by calling code.
-
-        done = [False] # use list for destructive mod. in closure
-        while not done[0]:
-            def _prepwork():
-                def workorbarrier_pending():
-                    return (len(self.__q) > 0)
-                def tasks_running():
-                    return (self.__curconc > 0)
-                def barrier_pending():
-                    return (workorbarrier_pending() and self.__q[0] is None)
-
-                while (not workorbarrier_pending()) or \
-                      (barrier_pending() and tasks_running()):
-                    if (not workorbarrier_pending()) and (not tasks_running()):
-                        # no work, and no tasks in progress - quit as per comments above
-                        done[0] = True
-                        self.__workers -= 1
-                        self.__cv.notifyAll()
-                        return None
-                    self.__cv.wait()
-
-                # there is work to do
-                work = self.__q.pop(0)
-
-                log.Debug("%s: %s" % (self.__class__.__name__,
-                                      gettext.ngettext("tasks queue length post-grab: %d task",
-                                                       "tasks queue length post-grab: %d tasks",
-                                                       len(self.__q)) % len(self.__q)))
-
-                if work: # real work, not just barrier
-                    self.__curconc += 1
-                    self.__cv.notifyAll()
-
-                return work
-
-            work = with_lock(self.__cv, _prepwork)
-
-            if work:
-                # the actual work here is going to be the caller half
-                # of an async_split() result, which will not propagate
-                # errors back to us, but rather propagate it back to
-                # the "other half".
-                succeeded, waiter = work()
-                if not succeeded:
-                    def _signal_failed():
-                        if not self.__failed:
-                            self.__failed = True
-                            self.__waiter = waiter
-                    with_lock(self.__cv, _signal_failed)
-
-                log.Info("%s: %s" % (self.__class__.__name__,
-                                     _("task execution done (success: %s)") % succeeded))
-
-            def _postwork():
-                self.__curconc -= 1
-                self.__cv.notifyAll()
-
-            with_lock(self.__cv, _postwork)
-
+    def __execute_caller(self, caller):
+        # The caller half that we get here will not propagate
+        # errors back to us, but rather propagate it back to the
+        # "other half" of the async split.
+        succeeded, waiter = caller()
+        if not succeeded:
+            def _signal_failed():
+                if not self.__failed:
+                    self.__failed = True
+                    self.__failed_waiter = waiter
+                    self.__cv.notifyAll()
+            with_lock(self.__cv, _signal_failed)
+
+        log.Info("%s: %s" % (self.__class__.__name__,
                              _("task execution done (success: %s)") % succeeded))
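
For context on the diff above: the branch replaces the old model (a shared task queue drained by a pool of worker threads, with None entries acting as barriers) with a simpler one in which each scheduled task gets its own worker thread, and a single condition variable guards a worker count, a waiter count and a barrier flag. The following is a minimal, self-contained sketch of that pattern; it is illustrative only, and MiniScheduler and its method names are not duplicity code.

import threading

class MiniScheduler:
    def __init__(self, concurrency):
        assert concurrency > 0
        self.concurrency = concurrency
        self.cv = threading.Condition()
        self.worker_count = 0   # workers currently running
        self.barrier = False    # later tasks must wait for earlier ones to finish

    def insert_barrier(self):
        with self.cv:
            self.barrier = True

    def schedule(self, fn, *args):
        # Block until a worker slot is free and no barrier is pending, then launch.
        with self.cv:
            while self.worker_count >= self.concurrency or self.barrier:
                if self.worker_count == 0:
                    self.barrier = False      # all earlier work is done; lift the barrier
                    self.cv.notify_all()
                else:
                    self.cv.wait()            # woken when a worker finishes
            self.worker_count += 1

        def run():
            try:
                fn(*args)
            finally:
                with self.cv:
                    self.worker_count -= 1
                    self.cv.notify_all()

        threading.Thread(target=run).start()

    def wait(self):
        # Wait for all outstanding work to complete.
        with self.cv:
            while self.worker_count > 0:
                self.cv.wait()

The real scheduler additionally counts threads blocked in schedule() (so wait() can account for them) and remembers the waiter of the first failed task so that later schedule() calls re-raise its error, but the locking discipline is the same.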
