Merge lp:~allenap/launchpad/multithreaded-checkwatches into lp:launchpad

Proposed by Gavin Panella
Status: Merged
Approved by: Gavin Panella
Approved revision: not available
Merged at revision: not available
Proposed branch: lp:~allenap/launchpad/multithreaded-checkwatches
Merge into: lp:launchpad
Diff against target: 600 lines (+284/-122)
5 files modified
lib/lp/bugs/doc/checkwatches-cli-switches.txt (+3/-0)
lib/lp/bugs/doc/checkwatches.txt (+4/-4)
lib/lp/bugs/doc/externalbugtracker.txt (+97/-1)
lib/lp/bugs/scripts/checkwatches.py (+177/-114)
lib/lp/bugs/scripts/tests/test_bugimport.py (+3/-3)
To merge this branch: bzr merge lp:~allenap/launchpad/multithreaded-checkwatches
Reviewer Review Type Date Requested Status
Abel Deuring (community) code Approve
Review via email: mp+15283@code.launchpad.net
To post a comment you must log in.
Revision history for this message
Gavin Panella (allenap) wrote :

This branch makes checkwatches update bug trackers in multiple threads. The
watches for a specific bug tracker are updated sequentially in a single
thread, but other bug trackers may be being updated concurrently in separate
threads.

Most of the changes are refactoring of or new methods for BugWatchUpdater in
checkwatches.py:

 * The existing updateBugTracker() method was renamed to _updateBugTracker().

 * The existing exception handling and oops reporting code in
   updateBugTrackers() was moved to a new updateBugTracker() method.

 * forceUpdateAll() had some identical exception handling code which was
   replaced by a call to the new updateBugTracker() method.

 * updateBugTrackers() was eviscerated and replaced with the thread setup and
   run. It calls a new _bugTrackerUpdaters() method which generates functions
   for the threads to run, each of which will update one bug tracker. These
   functions are put into a work queue from which the threads will pull.

 * A new _interactionDecorator() method was created to support running jobs in
   an interaction. Each of the functions that _bugTrackerUpdaters() yields is
   decorated with this. This is needed because interactions are specific to a
   thread.

Other work:

 * Add a --jobs option to the checkwatches.py script.

 * Tests that called updateBugTracker() now call _updateBugTracker() to avoid
   the transaction stuff.

 * Demonstrate updateBugTrackers() spawning multiple threads.

Testing:

  bin/test -vvct 'checkwatch|externalbug'

Lint:

  lib/lp/bugs/scripts/checkwatches.py
      22: [F0401] Unable to import 'lazr.lifecycle.event' (No module named
          lifecycle)

Revision history for this message
Abel Deuring (adeuring) wrote :

Hi Gavin,

great work! And thanks for answering all my somewhat paranoid questions on IRC

review: Approve (code)

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'lib/lp/bugs/doc/checkwatches-cli-switches.txt'
2--- lib/lp/bugs/doc/checkwatches-cli-switches.txt 2009-10-09 20:52:12 +0000
3+++ lib/lp/bugs/doc/checkwatches-cli-switches.txt 2009-11-27 16:20:37 +0000
4@@ -138,3 +138,6 @@
5 --reset Update all the watches on the bug tracker,
6 regardless of whether or not they need
7 checking.
8+ --jobs=JOBS The number of simulataneous jobs to run, 1
9+ by default.
10+ <BLANKLINE>
11
12=== modified file 'lib/lp/bugs/doc/checkwatches.txt'
13--- lib/lp/bugs/doc/checkwatches.txt 2009-11-19 14:41:14 +0000
14+++ lib/lp/bugs/doc/checkwatches.txt 2009-11-27 16:20:37 +0000
15@@ -46,6 +46,7 @@
16 >>> print err
17 INFO creating lockfile
18 DEBUG Using a global batch size of None
19+ DEBUG Skipping updating Ubuntu Bugzilla watches.
20 DEBUG No watches to update on http://bugs.debian.org
21 DEBUG No watches to update on mailto:bugs@example.com
22 WARNING ExternalBugtracker for BugTrackerType 'SAVANE' is not known.
23@@ -53,7 +54,6 @@
24 DEBUG No watches to update on http://sourceforge.net/
25 DEBUG No watches to update on http://bugzilla.gnome.org/
26 DEBUG No watches to update on https://bugzilla.mozilla.org/
27- DEBUG Skipping updating Ubuntu Bugzilla watches.
28 INFO Time for this run: ... seconds.
29 DEBUG Removing lock file:...
30 <BLANKLINE>
31@@ -201,7 +201,7 @@
32 ... broken_get_external_bugtracker)
33 ... updater = BugWatchUpdater(transaction)
34 ... updater._login()
35- ... updater.updateBugTracker(example_bug_tracker)
36+ ... updater._updateBugTracker(example_bug_tracker)
37 ... finally:
38 ... externalbugtracker.get_external_bugtracker = (
39 ... real_get_external_bugtracker)
40@@ -223,7 +223,7 @@
41 a given ExternalBugTracker in each checkwatches run: the batch size.
42
43 We need to add some bug watches again since
44-BugWatchUpdate.updateBugTracker() automatically rolls back the
45+BugWatchUpdate._updateBugTracker() automatically rolls back the
46 transaction if something goes wrong.
47
48 >>> login('test@canonical.com')
49@@ -353,7 +353,7 @@
50
51 >>> class NonConnectingUpdater(BugWatchUpdater):
52 ...
53- ... def updateBugTracker(self, bug_tracker, batch_size):
54+ ... def _updateBugTracker(self, bug_tracker, batch_size):
55 ... # Update as many watches as the batch size says.
56 ... watches_to_update = (
57 ... bug_tracker.getBugWatchesNeedingUpdate(23)[:batch_size])
58
59=== modified file 'lib/lp/bugs/doc/externalbugtracker.txt'
60--- lib/lp/bugs/doc/externalbugtracker.txt 2009-09-22 15:22:53 +0000
61+++ lib/lp/bugs/doc/externalbugtracker.txt 2009-11-27 16:20:37 +0000
62@@ -1079,7 +1079,8 @@
63
64 >>> bug_watch_updater = NonConnectingBugWatchUpdater(
65 ... transaction, QuietFakeLogger())
66- >>> bug_watch_updater.updateBugTracker(standard_bugzilla, batch_size=2)
67+ >>> bug_watch_updater._updateBugTracker(
68+ ... standard_bugzilla, batch_size=2)
69 initializeRemoteBugDB() called: [u'5', u'6']
70 getRemoteStatus() called: u'5'
71 getRemoteStatus() called: u'6'
72@@ -1089,6 +1090,12 @@
73 allows it to be passed as a command-line option when the checkwatches script
74 is run.
75
76+Before going further, we must abort the current transaction to avoid
77+deadlock; updateBugTrackers() runs updateBugTracker() in a different
78+thread.
79+
80+ >>> transaction.abort()
81+
82 >>> from canonical.launchpad.scripts.logger import FakeLogger
83 >>> bug_watch_updater = NonConnectingBugWatchUpdater(
84 ... transaction, FakeLogger())
85@@ -1099,3 +1106,92 @@
86 initializeRemoteBugDB() called: [u'5', u'6']
87 getRemoteStatus() called: u'5'
88 getRemoteStatus() called: u'6'
89+
90+ >>> # We should log in again because updateBugTrackers() logs out.
91+ >>> login('test@canonical.com')
92+
93+By default, the updateBugTrackers() only spawns one thread, but it can
94+spawn as many as required.
95+
96+ >>> import threading
97+
98+ >>> class OutputFileForThreads:
99+ ... def __init__(self):
100+ ... self.output = {}
101+ ... self.lock = threading.Lock()
102+ ... def write(self, data):
103+ ... thread_id = id(threading.currentThread())
104+ ... self.lock.acquire()
105+ ... try:
106+ ... if thread_id in self.output:
107+ ... self.output[thread_id].append(data)
108+ ... else:
109+ ... self.output[thread_id] = [data]
110+ ... finally:
111+ ... self.lock.release()
112+
113+ >>> output_file = OutputFileForThreads()
114+
115+ >>> class ExternalBugTrackerForThreads(TestExternalBugTracker):
116+ ... def getModifiedRemoteBugs(self, remote_bug_ids, last_checked):
117+ ... print >> output_file, (
118+ ... "getModifiedRemoteBugs(\n"
119+ ... " remote_bug_ids=%r,\n"
120+ ... " last_checked=%r)" % (remote_bug_ids, last_checked))
121+ ... return [remote_bug_ids[0], remote_bug_ids[-1]]
122+ ... def getRemoteStatus(self, bug_id):
123+ ... print >> output_file, (
124+ ... "getRemoteStatus(bug_id=%r)" % bug_id)
125+ ... return 'UNKNOWN'
126+ ... def getCurrentDBTime(self):
127+ ... return None
128+
129+ >>> class BugWatchUpdaterForThreads(BugWatchUpdater):
130+ ... def _getExternalBugTrackersAndWatches(
131+ ... self, bug_trackers, bug_watches):
132+ ... return [(ExternalBugTrackerForThreads(), bug_watches)]
133+
134+ >>> threaded_bug_watch_updater = BugWatchUpdaterForThreads(
135+ ... transaction, FakeLogger(output_file))
136+ >>> threaded_bug_watch_updater.updateBugTrackers(
137+ ... batch_size=5, num_threads=10)
138+
139+ >>> for output in sorted(output_file.output.itervalues()):
140+ ... print "".join(output),
141+ ... print '--'
142+ DEBUG No watches to update on http://bugs.example.com
143+ --
144+ DEBUG No watches to update on http://bugzilla.gnome.org/
145+ --
146+ DEBUG No watches to update on http://savannah.gnu.org/
147+ --
148+ DEBUG No watches to update on http://sourceforge.net/
149+ --
150+ DEBUG No watches to update on mailto:bugs@example.com
151+ --
152+ DEBUG Using a global batch size of 5
153+ DEBUG Skipping updating Ubuntu Bugzilla watches.
154+ --
155+ INFO Updating 2 watches for 2 bugs on http://example.com
156+ getRemoteStatus(bug_id=u'304070')
157+ getRemoteStatus(bug_id=u'3224')
158+ --
159+ INFO Updating 4 watches for 3 bugs on http://example.com
160+ getRemoteStatus(bug_id=u'123543')
161+ getRemoteStatus(bug_id=u'2000')
162+ getRemoteStatus(bug_id=u'42')
163+ --
164+ INFO Updating 5 watches for 5 bugs on http://example.com
165+ getRemoteStatus(bug_id=u'1')
166+ getRemoteStatus(bug_id=u'101')
167+ getRemoteStatus(bug_id=u'5')
168+ getRemoteStatus(bug_id=u'6')
169+ getRemoteStatus(bug_id=u'7')
170+ --
171+ INFO Updating 5 watches for 5 bugs on http://example.com
172+ getRemoteStatus(bug_id=u'280883')
173+ getRemoteStatus(bug_id=u'304014')
174+ getRemoteStatus(bug_id=u'308994')
175+ getRemoteStatus(bug_id=u'327452')
176+ getRemoteStatus(bug_id=u'327549')
177+ --
178
179=== modified file 'lib/lp/bugs/scripts/checkwatches.py'
180--- lib/lp/bugs/scripts/checkwatches.py 2009-11-19 12:49:04 +0000
181+++ lib/lp/bugs/scripts/checkwatches.py 2009-11-27 16:20:37 +0000
182@@ -6,8 +6,10 @@
183
184 from copy import copy
185 from datetime import datetime, timedelta
186+import Queue as queue
187 import socket
188 import sys
189+import threading
190 import time
191
192 import pytz
193@@ -31,7 +33,7 @@
194 ErrorReportingUtility, ScriptRequest)
195 from canonical.launchpad.webapp.interfaces import IPlacelessAuthUtility
196 from canonical.launchpad.webapp.interaction import (
197- setupInteraction, endInteraction)
198+ setupInteraction, endInteraction, queryInteraction)
199 from canonical.launchpad.webapp.publisher import canonical_url
200
201 from lp.bugs import externalbugtracker
202@@ -161,7 +163,21 @@
203
204 ACCEPTABLE_TIME_SKEW = timedelta(minutes=10)
205
206+ LOGIN = 'bugwatch@bugs.launchpad.net'
207+
208 def __init__(self, txn, log=default_log, syncable_gnome_products=None):
209+ """Initialize a BugWatchUpdater.
210+
211+ :param txn: A transaction manager on which `begin()`,
212+ `abort()` and `commit()` can be called. Additionally, it
213+ should be safe for different threads to use its methods to
214+ manage their own transactions (i.e. with thread-local
215+ storage).
216+
217+ :param log: An instance of `logging.Logger`, or something that
218+ provides a similar interface.
219+
220+ """
221 self.txn = txn
222 self.log = log
223
224@@ -171,96 +187,176 @@
225 else:
226 self._syncable_gnome_products = list(SYNCABLE_GNOME_PRODUCTS)
227
228+ self._principal = (
229+ getUtility(IPlacelessAuthUtility).getPrincipalByLogin(
230+ self.LOGIN, want_password=False))
231+
232 def _login(self):
233 """Set up an interaction as the Bug Watch Updater"""
234- auth_utility = getUtility(IPlacelessAuthUtility)
235- setupInteraction(
236- auth_utility.getPrincipalByLogin(
237- 'bugwatch@bugs.launchpad.net', want_password=False),
238- login='bugwatch@bugs.launchpad.net')
239+ setupInteraction(self._principal, login=self.LOGIN)
240
241 def _logout(self):
242 """Tear down the Bug Watch Updater Interaction."""
243 endInteraction()
244
245- def updateBugTrackers(self, bug_tracker_names=None, batch_size=None):
246- """Update all the bug trackers that have watches pending.
247-
248- If bug tracker names are specified in bug_tracker_names only
249- those bug trackers will be checked.
250+ def _interactionDecorator(self, func):
251+ """Wrap a function to ensure that it runs within an interaction.
252+
253+ If an interaction is already set up, this simply calls the
254+ function. If no interaction exists, it will set one up, call the
255+ function, then end the interaction.
256+
257+ This is intended to make sure the right thing happens whether or not
258+ the function is run in a different thread.
259 """
260- self.txn.begin()
261+ def wrapper(*args, **kwargs):
262+ if queryInteraction() is None:
263+ self._login()
264+ try:
265+ return func(*args, **kwargs)
266+ finally:
267+ self._logout()
268+ else:
269+ return func(*args, **kwargs)
270+ return wrapper
271+
272+ def _bugTrackerUpdaters(self, bug_tracker_names=None):
273+ """Yields functions that can be used to update each bug tracker."""
274+ # Set up an interaction as the Bug Watch Updater since the
275+ # notification code expects a logged in user.
276+ self._login()
277+
278 ubuntu_bugzilla = getUtility(ILaunchpadCelebrities).ubuntu_bugzilla
279 # Save the name, so we can use it in other transactions.
280 ubuntu_bugzilla_name = ubuntu_bugzilla.name
281
282- # Set up an interaction as the Bug Watch Updater since the
283- # notification code expects a logged in user.
284- self._login()
285-
286- self.log.debug("Using a global batch size of %s" % batch_size)
287-
288 if bug_tracker_names is None:
289 bug_tracker_names = [
290 bugtracker.name for bugtracker in getUtility(IBugTrackerSet)]
291- self.txn.commit()
292+
293+ def make_updater(bug_tracker_id):
294+ """Returns a function that can update the given bug tracker."""
295+ def updater(batch_size=None):
296+ run = self._interactionDecorator(self.updateBugTracker)
297+ return run(bug_tracker_id, batch_size)
298+ return updater
299+
300 for bug_tracker_name in bug_tracker_names:
301- self.txn.begin()
302- bug_tracker = getUtility(IBugTrackerSet).getByName(
303- bug_tracker_name)
304-
305- if not bug_tracker.active:
306+ if bug_tracker_name == ubuntu_bugzilla_name:
307+ # XXX: 2007-09-11 Graham Binns
308+ # We automatically ignore the Ubuntu Bugzilla
309+ # here as all its bugs have been imported into
310+ # Launchpad. Ideally we would have some means
311+ # to identify all bug trackers like this so
312+ # that hard-coding like this can be genericised
313+ # (Bug 138949).
314 self.log.debug(
315- "Updates are disabled for bug tracker at %s" %
316- bug_tracker.baseurl)
317- self.txn.abort()
318- continue
319-
320- # Save the url for later, since we might need it to report an
321- # error after a transaction has been aborted.
322- bug_tracker_url = bug_tracker.baseurl
323- try:
324- if bug_tracker_name == ubuntu_bugzilla_name:
325- # XXX: 2007-09-11 Graham Binns
326- # We automatically ignore the Ubuntu Bugzilla
327- # here as all its bugs have been imported into
328- # Launchpad. Ideally we would have some means
329- # to identify all bug trackers like this so
330- # that hard-coding like this can be genericised
331- # (Bug 138949).
332+ "Skipping updating Ubuntu Bugzilla watches.")
333+ else:
334+ bug_tracker = getUtility(IBugTrackerSet).getByName(
335+ bug_tracker_name)
336+ if bug_tracker.active:
337+ yield make_updater(bug_tracker.id)
338+ else:
339 self.log.debug(
340- "Skipping updating Ubuntu Bugzilla watches.")
341- else:
342- self.updateBugTracker(bug_tracker, batch_size)
343+ "Updates are disabled for bug tracker at %s" %
344+ bug_tracker.baseurl)
345
346- self.txn.commit()
347- except (KeyboardInterrupt, SystemExit):
348- # We should never catch KeyboardInterrupt or SystemExit.
349- raise
350- except Exception, error:
351- # If something unexpected goes wrong, we log it and
352- # continue: a failure shouldn't break the updating of
353- # the other bug trackers.
354- info = sys.exc_info()
355- properties = [
356- ('bugtracker', bug_tracker_name),
357- ('baseurl', bug_tracker_url)]
358- if isinstance(error, BugWatchUpdateError):
359- self.error(
360- str(error), properties=properties, info=info)
361- elif isinstance(error, socket.timeout):
362- self.error(
363- "Connection timed out when updating %s" %
364- bug_tracker_url,
365- properties=properties, info=info)
366- else:
367- self.error(
368- "An exception was raised when updating %s" %
369- bug_tracker_url,
370- properties=properties, info=info)
371- self.txn.abort()
372 self._logout()
373
374+ def updateBugTrackers(
375+ self, bug_tracker_names=None, batch_size=None, num_threads=1):
376+ """Update all the bug trackers that have watches pending.
377+
378+ If bug tracker names are specified in bug_tracker_names only
379+ those bug trackers will be checked.
380+
381+ The updates are run in threads, so that long running updates
382+ don't block progress. However, by default the number of
383+ threads is 1, to help with testing.
384+ """
385+ self.log.debug("Using a global batch size of %s" % batch_size)
386+
387+ # Put all the work on the queue. This is simpler than drip-feeding the
388+ # queue, and avoids a situation where a worker thread exits because
389+ # there's no work left and the feeding thread hasn't been scheduled to
390+ # add work to the queue.
391+ work = queue.Queue()
392+ for updater in self._bugTrackerUpdaters(bug_tracker_names):
393+ work.put(updater)
394+
395+ # This will be run once in each worker thread.
396+ def do_work():
397+ while True:
398+ try:
399+ job = work.get(block=False)
400+ except queue.Empty:
401+ break
402+ else:
403+ job(batch_size)
404+
405+ # Start and join the worker threads.
406+ threads = []
407+ for run in xrange(num_threads):
408+ thread = threading.Thread(target=do_work)
409+ thread.start()
410+ threads.append(thread)
411+ for thread in threads:
412+ thread.join()
413+
414+ def updateBugTracker(self, bug_tracker, batch_size):
415+ """Updates the given bug trackers's bug watches.
416+
417+ If there is an error, logs are updated, and the transaction is
418+ aborted.
419+
420+ :param bug_tracker: An IBugTracker or the ID of one, so that this
421+ method can be called from a different interaction.
422+
423+ :return: A boolean indicating if the operation was successful.
424+ """
425+ # Get the bug tracker.
426+ if isinstance(bug_tracker, (int, long)):
427+ bug_tracker = getUtility(IBugTrackerSet).get(bug_tracker)
428+
429+ # Save the name and url for later, since we might need it to report an
430+ # error after a transaction has been aborted.
431+ bug_tracker_name = bug_tracker.name
432+ bug_tracker_url = bug_tracker.baseurl
433+
434+ try:
435+ self.txn.begin()
436+ self._updateBugTracker(bug_tracker, batch_size)
437+ self.txn.commit()
438+ except (KeyboardInterrupt, SystemExit):
439+ # We should never catch KeyboardInterrupt or SystemExit.
440+ raise
441+ except Exception, error:
442+ # If something unexpected goes wrong, we log it and
443+ # continue: a failure shouldn't break the updating of
444+ # the other bug trackers.
445+ info = sys.exc_info()
446+ properties = [
447+ ('bugtracker', bug_tracker_name),
448+ ('baseurl', bug_tracker_url)]
449+ if isinstance(error, BugWatchUpdateError):
450+ self.error(
451+ str(error), properties=properties, info=info)
452+ elif isinstance(error, socket.timeout):
453+ self.error(
454+ "Connection timed out when updating %s" %
455+ bug_tracker_url,
456+ properties=properties, info=info)
457+ else:
458+ self.error(
459+ "An exception was raised when updating %s" %
460+ bug_tracker_url,
461+ properties=properties, info=info)
462+ self.txn.abort()
463+ return False
464+ else:
465+ return True
466+
467 def forceUpdateAll(self, bug_tracker_name, batch_size):
468 """Update all the watches for `bug_tracker_name`.
469
470@@ -290,50 +386,16 @@
471 bug_tracker.resetWatches()
472 self.txn.commit()
473
474- # Take a copy of the bug tracker URL. If the transaction fails
475- # later we can't refer to the baseurl attribute of the bug
476- # tracker.
477- bug_tracker_url = bug_tracker.baseurl
478-
479 # Loop over the bug watches in batches as specificed by
480 # batch_size until there are none left to update.
481 self.log.info(
482 "Updating %s watches on bug tracker '%s'" %
483 (bug_tracker.watches.count(), bug_tracker_name))
484- iteration = 0
485 has_watches_to_update = True
486 while has_watches_to_update:
487 self.txn.begin()
488- try:
489- self.updateBugTracker(bug_tracker, batch_size)
490- self.txn.commit()
491- except (KeyboardInterrupt, SystemExit):
492- # We should never catch KeyboardInterrupt or SystemExit.
493- raise
494- except Exception, error:
495- # If something unexpected goes wrong, we log it and
496- # continue: a failure shouldn't break the updating of
497- # the other bug trackers.
498- info = sys.exc_info()
499- properties = [
500- ('bugtracker', bug_tracker_name),
501- ('baseurl', bug_tracker_url)]
502- if isinstance(error, BugWatchUpdateError):
503- self.error(
504- str(error), properties=properties, info=info)
505- elif isinstance(error, socket.timeout):
506- self.error(
507- "Connection timed out when updating %s" %
508- bug_tracker_url,
509- properties=properties, info=info)
510- else:
511- self.error(
512- "An exception was raised when updating %s" %
513- bug_tracker_url,
514- properties=properties, info=info)
515- self.txn.abort()
516+ if not self.updateBugTracker(bug_tracker, batch_size):
517 break
518-
519 watches_left = bug_tracker.getBugWatchesNeedingUpdate(23).count()
520 self.log.info(
521 "%s watches left to check on bug tracker '%s'" %
522@@ -409,7 +471,7 @@
523
524 return trackers_and_watches
525
526- def updateBugTracker(self, bug_tracker, batch_size=None):
527+ def _updateBugTracker(self, bug_tracker, batch_size=None):
528 """Updates the given bug trackers's bug watches."""
529 # XXX 2007-01-18 gmb:
530 # Once we start running checkwatches more frequently we need
531@@ -1085,7 +1147,7 @@
532 "one bugtracker using this option will check all the "
533 "bugtrackers specified.")
534 self.parser.add_option(
535- '-b', '--batch-size', action='store', dest='batch_size',
536+ '-b', '--batch-size', action='store', type=int, dest='batch_size',
537 help="Set the number of watches to be checked per bug "
538 "tracker in this run. If BATCH_SIZE is 0, all watches "
539 "on the bug tracker that are eligible for checking will "
540@@ -1094,26 +1156,27 @@
541 '--reset', action='store_true', dest='update_all',
542 help="Update all the watches on the bug tracker, regardless of "
543 "whether or not they need checking.")
544+ self.parser.add_option(
545+ '--jobs', action='store', type=int, dest='jobs', default=1,
546+ help=("The number of simulataneous jobs to run, %default by "
547+ "default."))
548
549 def main(self):
550 start_time = time.time()
551
552 updater = BugWatchUpdater(self.txn, self.logger)
553
554- # Make sure batch_size is an integer or None.
555- batch_size = self.options.batch_size
556- if batch_size is not None:
557- batch_size = int(batch_size)
558-
559 if self.options.update_all and len(self.options.bug_trackers) > 0:
560 # The user has requested that we update *all* the watches
561 # for these bugtrackers
562 for bug_tracker in self.options.bug_trackers:
563- updater.forceUpdateAll(bug_tracker, batch_size)
564+ updater.forceUpdateAll(bug_tracker, self.options.batch_size)
565 else:
566 # Otherwise we just update those watches that need updating,
567 # and we let the BugWatchUpdater decide which those are.
568- updater.updateBugTrackers(self.options.bug_trackers, batch_size)
569+ updater.updateBugTrackers(
570+ self.options.bug_trackers, self.options.batch_size,
571+ self.options.jobs)
572
573 run_time = time.time() - start_time
574 self.logger.info("Time for this run: %.3f seconds." % run_time)
575
576=== modified file 'lib/lp/bugs/scripts/tests/test_bugimport.py'
577--- lib/lp/bugs/scripts/tests/test_bugimport.py 2009-10-21 18:46:29 +0000
578+++ lib/lp/bugs/scripts/tests/test_bugimport.py 2009-11-27 16:20:37 +0000
579@@ -883,10 +883,10 @@
580 class TestBugWatchUpdater(BugWatchUpdater):
581 """A mock `BugWatchUpdater` object."""
582
583- def updateBugTracker(self, bug_tracker):
584+ def _updateBugTracker(self, bug_tracker):
585 # Save the current bug tracker, so _getBugWatch can reference it.
586 self.bugtracker = bug_tracker
587- super(TestBugWatchUpdater, self).updateBugTracker(bug_tracker)
588+ super(TestBugWatchUpdater, self)._updateBugTracker(bug_tracker)
589
590 def _getExternalBugTrackersAndWatches(self, bug_tracker, bug_watches):
591 """See `BugWatchUpdater`."""
592@@ -928,7 +928,7 @@
593 # trigger a DB error, the second updates successfully.
594 bug_tracker = TestBugTracker(test_bug_one, test_bug_two)
595 bug_watch_updater = TestBugWatchUpdater(self.layer.txn)
596- bug_watch_updater.updateBugTracker(bug_tracker)
597+ bug_watch_updater._updateBugTracker(bug_tracker)
598 # We verify that the first bug watch didn't update the status,
599 # and the second did.
600 for bugtask in test_bug_one.bugtasks: