Merge lp:~allenap/launchpad/multithreaded-checkwatches into lp:launchpad
Proposed by: Gavin Panella

Status: | Merged |
---|---|
Approved by: | Gavin Panella |
Approved revision: | not available |
Merged at revision: | not available |
Proposed branch: | lp:~allenap/launchpad/multithreaded-checkwatches |
Merge into: | lp:launchpad |
Diff against target: | 600 lines (+284/-122), 5 files modified: lib/lp/bugs/doc/checkwatches-cli-switches.txt (+3/-0); lib/lp/bugs/doc/checkwatches.txt (+4/-4); lib/lp/bugs/doc/externalbugtracker.txt (+97/-1); lib/lp/bugs/scripts/checkwatches.py (+177/-114); lib/lp/bugs/scripts/tests/test_bugimport.py (+3/-3) |
To merge this branch: | bzr merge lp:~allenap/launchpad/multithreaded-checkwatches |
Related bugs: | |

Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Abel Deuring (community) | code | | Approve |

Review via email: mp+15283@code.launchpad.net
Commit message
Description of the change
Gavin Panella (allenap) wrote:

Abel Deuring (adeuring) wrote:
Hi Gavin,

Great work! And thanks for answering all my somewhat paranoid questions on IRC.

review: Approve (code)
Preview Diff
```diff
=== modified file 'lib/lp/bugs/doc/checkwatches-cli-switches.txt'
--- lib/lp/bugs/doc/checkwatches-cli-switches.txt 2009-10-09 20:52:12 +0000
+++ lib/lp/bugs/doc/checkwatches-cli-switches.txt 2009-11-27 16:20:37 +0000
@@ -138,3 +138,6 @@
       --reset               Update all the watches on the bug tracker,
                             regardless of whether or not they need
                             checking.
+      --jobs=JOBS           The number of simulataneous jobs to run, 1
+                            by default.
+    <BLANKLINE>
```
```diff
=== modified file 'lib/lp/bugs/doc/checkwatches.txt'
--- lib/lp/bugs/doc/checkwatches.txt 2009-11-19 14:41:14 +0000
+++ lib/lp/bugs/doc/checkwatches.txt 2009-11-27 16:20:37 +0000
@@ -46,6 +46,7 @@
     >>> print err
     INFO creating lockfile
     DEBUG Using a global batch size of None
+    DEBUG Skipping updating Ubuntu Bugzilla watches.
     DEBUG No watches to update on http://bugs.debian.org
     DEBUG No watches to update on mailto:bugs@example.com
     WARNING ExternalBugtracker for BugTrackerType 'SAVANE' is not known.
@@ -53,7 +54,6 @@
     DEBUG No watches to update on http://sourceforge.net/
     DEBUG No watches to update on http://bugzilla.gnome.org/
     DEBUG No watches to update on https://bugzilla.mozilla.org/
-    DEBUG Skipping updating Ubuntu Bugzilla watches.
     INFO Time for this run: ... seconds.
     DEBUG Removing lock file:...
     <BLANKLINE>
@@ -201,7 +201,7 @@
     ...         broken_get_external_bugtracker)
     ...     updater = BugWatchUpdater(transaction)
     ...     updater._login()
-    ...     updater.updateBugTracker(example_bug_tracker)
+    ...     updater._updateBugTracker(example_bug_tracker)
     ... finally:
     ...     externalbugtracker.get_external_bugtracker = (
     ...         real_get_external_bugtracker)
@@ -223,7 +223,7 @@
 a given ExternalBugTracker in each checkwatches run: the batch size.

 We need to add some bug watches again since
-BugWatchUpdate.updateBugTracker() automatically rolls back the
+BugWatchUpdate._updateBugTracker() automatically rolls back the
 transaction if something goes wrong.

     >>> login('test@canonical.com')
@@ -353,7 +353,7 @@

     >>> class NonConnectingUpdater(BugWatchUpdater):
     ...
-    ...     def updateBugTracker(self, bug_tracker, batch_size):
+    ...     def _updateBugTracker(self, bug_tracker, batch_size):
     ...         # Update as many watches as the batch size says.
     ...         watches_to_update = (
     ...             bug_tracker.getBugWatchesNeedingUpdate(23)[:batch_size])
```
```diff
=== modified file 'lib/lp/bugs/doc/externalbugtracker.txt'
--- lib/lp/bugs/doc/externalbugtracker.txt 2009-09-22 15:22:53 +0000
+++ lib/lp/bugs/doc/externalbugtracker.txt 2009-11-27 16:20:37 +0000
@@ -1079,7 +1079,8 @@

     >>> bug_watch_updater = NonConnectingBugWatchUpdater(
     ...     transaction, QuietFakeLogger())
-    >>> bug_watch_updater.updateBugTracker(standard_bugzilla, batch_size=2)
+    >>> bug_watch_updater._updateBugTracker(
+    ...     standard_bugzilla, batch_size=2)
     initializeRemoteBugDB() called: [u'5', u'6']
     getRemoteStatus() called: u'5'
     getRemoteStatus() called: u'6'
@@ -1089,6 +1090,12 @@
 allows it to be passed as a command-line option when the checkwatches script
 is run.

+Before going further, we must abort the current transaction to avoid
+deadlock; updateBugTrackers() runs updateBugTracker() in a different
+thread.
+
+    >>> transaction.abort()
+
     >>> from canonical.launchpad.scripts.logger import FakeLogger
     >>> bug_watch_updater = NonConnectingBugWatchUpdater(
     ...     transaction, FakeLogger())
@@ -1099,3 +1106,92 @@
     initializeRemoteBugDB() called: [u'5', u'6']
     getRemoteStatus() called: u'5'
     getRemoteStatus() called: u'6'
+
+    >>> # We should log in again because updateBugTrackers() logs out.
+    >>> login('test@canonical.com')
+
+By default, the updateBugTrackers() only spawns one thread, but it can
+spawn as many as required.
+
+    >>> import threading
+
+    >>> class OutputFileForThreads:
+    ...     def __init__(self):
+    ...         self.output = {}
+    ...         self.lock = threading.Lock()
+    ...     def write(self, data):
+    ...         thread_id = id(threading.currentThread())
+    ...         self.lock.acquire()
+    ...         try:
+    ...             if thread_id in self.output:
+    ...                 self.output[thread_id].append(data)
+    ...             else:
+    ...                 self.output[thread_id] = [data]
+    ...         finally:
+    ...             self.lock.release()
+
+    >>> output_file = OutputFileForThreads()
+
+    >>> class ExternalBugTrackerForThreads(TestExternalBugTracker):
+    ...     def getModifiedRemoteBugs(self, remote_bug_ids, last_checked):
+    ...         print >> output_file, (
+    ...             "getModifiedRemoteBugs(\n"
+    ...             " remote_bug_ids=%r,\n"
+    ...             " last_checked=%r)" % (remote_bug_ids, last_checked))
+    ...         return [remote_bug_ids[0], remote_bug_ids[-1]]
+    ...     def getRemoteStatus(self, bug_id):
+    ...         print >> output_file, (
+    ...             "getRemoteStatus(bug_id=%r)" % bug_id)
+    ...         return 'UNKNOWN'
+    ...     def getCurrentDBTime(self):
+    ...         return None
+
+    >>> class BugWatchUpdaterForThreads(BugWatchUpdater):
+    ...     def _getExternalBugTrackersAndWatches(
+    ...         self, bug_trackers, bug_watches):
+    ...         return [(ExternalBugTrackerForThreads(), bug_watches)]
+
+    >>> threaded_bug_watch_updater = BugWatchUpdaterForThreads(
+    ...     transaction, FakeLogger(output_file))
+    >>> threaded_bug_watch_updater.updateBugTrackers(
+    ...     batch_size=5, num_threads=10)
+
+    >>> for output in sorted(output_file.output.itervalues()):
+    ...     print "".join(output),
+    ...     print '--'
+    DEBUG No watches to update on http://bugs.example.com
+    --
+    DEBUG No watches to update on http://bugzilla.gnome.org/
+    --
+    DEBUG No watches to update on http://savannah.gnu.org/
+    --
+    DEBUG No watches to update on http://sourceforge.net/
+    --
+    DEBUG No watches to update on mailto:bugs@example.com
+    --
+    DEBUG Using a global batch size of 5
+    DEBUG Skipping updating Ubuntu Bugzilla watches.
+    --
+    INFO Updating 2 watches for 2 bugs on http://example.com
+    getRemoteStatus(bug_id=u'304070')
+    getRemoteStatus(bug_id=u'3224')
+    --
+    INFO Updating 4 watches for 3 bugs on http://example.com
+    getRemoteStatus(bug_id=u'123543')
+    getRemoteStatus(bug_id=u'2000')
+    getRemoteStatus(bug_id=u'42')
+    --
+    INFO Updating 5 watches for 5 bugs on http://example.com
+    getRemoteStatus(bug_id=u'1')
+    getRemoteStatus(bug_id=u'101')
+    getRemoteStatus(bug_id=u'5')
+    getRemoteStatus(bug_id=u'6')
+    getRemoteStatus(bug_id=u'7')
+    --
+    INFO Updating 5 watches for 5 bugs on http://example.com
+    getRemoteStatus(bug_id=u'280883')
+    getRemoteStatus(bug_id=u'304014')
+    getRemoteStatus(bug_id=u'308994')
+    getRemoteStatus(bug_id=u'327452')
+    getRemoteStatus(bug_id=u'327549')
+    --
```
```diff
=== modified file 'lib/lp/bugs/scripts/checkwatches.py'
--- lib/lp/bugs/scripts/checkwatches.py 2009-11-19 12:49:04 +0000
+++ lib/lp/bugs/scripts/checkwatches.py 2009-11-27 16:20:37 +0000
@@ -6,8 +6,10 @@

 from copy import copy
 from datetime import datetime, timedelta
+import Queue as queue
 import socket
 import sys
+import threading
 import time

 import pytz
@@ -31,7 +33,7 @@
     ErrorReportingUtility, ScriptRequest)
 from canonical.launchpad.webapp.interfaces import IPlacelessAuthUtility
 from canonical.launchpad.webapp.interaction import (
-    setupInteraction, endInteraction)
+    setupInteraction, endInteraction, queryInteraction)
 from canonical.launchpad.webapp.publisher import canonical_url

 from lp.bugs import externalbugtracker
@@ -161,7 +163,21 @@

     ACCEPTABLE_TIME_SKEW = timedelta(minutes=10)

+    LOGIN = 'bugwatch@bugs.launchpad.net'
+
     def __init__(self, txn, log=default_log, syncable_gnome_products=None):
+        """Initialize a BugWatchUpdater.
+
+        :param txn: A transaction manager on which `begin()`,
+            `abort()` and `commit()` can be called. Additionally, it
+            should be safe for different threads to use its methods to
+            manage their own transactions (i.e. with thread-local
+            storage).
+
+        :param log: An instance of `logging.Logger`, or something that
+            provides a similar interface.
+
+        """
         self.txn = txn
         self.log = log

@@ -171,96 +187,176 @@
         else:
             self._syncable_gnome_products = list(SYNCABLE_GNOME_PRODUCTS)

+        self._principal = (
+            getUtility(IPlacelessAuthUtility).getPrincipalByLogin(
+                self.LOGIN, want_password=False))
+
     def _login(self):
         """Set up an interaction as the Bug Watch Updater"""
-        auth_utility = getUtility(IPlacelessAuthUtility)
-        setupInteraction(
-            auth_utility.getPrincipalByLogin(
-                'bugwatch@bugs.launchpad.net', want_password=False),
-            login='bugwatch@bugs.launchpad.net')
+        setupInteraction(self._principal, login=self.LOGIN)

     def _logout(self):
         """Tear down the Bug Watch Updater Interaction."""
         endInteraction()

-    def updateBugTrackers(self, bug_tracker_names=None, batch_size=None):
-        """Update all the bug trackers that have watches pending.
-
-        If bug tracker names are specified in bug_tracker_names only
-        those bug trackers will be checked.
+    def _interactionDecorator(self, func):
+        """Wrap a function to ensure that it runs within an interaction.
+
+        If an interaction is already set up, this simply calls the
+        function. If no interaction exists, it will set one up, call the
+        function, then end the interaction.
+
+        This is intended to make sure the right thing happens whether or not
+        the function is run in a different thread.
         """
-        self.txn.begin()
+        def wrapper(*args, **kwargs):
+            if queryInteraction() is None:
+                self._login()
+                try:
+                    return func(*args, **kwargs)
+                finally:
+                    self._logout()
+            else:
+                return func(*args, **kwargs)
+        return wrapper
+
+    def _bugTrackerUpdaters(self, bug_tracker_names=None):
+        """Yields functions that can be used to update each bug tracker."""
+        # Set up an interaction as the Bug Watch Updater since the
+        # notification code expects a logged in user.
+        self._login()
+
         ubuntu_bugzilla = getUtility(ILaunchpadCelebrities).ubuntu_bugzilla
         # Save the name, so we can use it in other transactions.
         ubuntu_bugzilla_name = ubuntu_bugzilla.name

-        # Set up an interaction as the Bug Watch Updater since the
-        # notification code expects a logged in user.
-        self._login()
-
-        self.log.debug("Using a global batch size of %s" % batch_size)
-
         if bug_tracker_names is None:
             bug_tracker_names = [
                 bugtracker.name for bugtracker in getUtility(IBugTrackerSet)]
-        self.txn.commit()
+
+        def make_updater(bug_tracker_id):
+            """Returns a function that can update the given bug tracker."""
+            def updater(batch_size=None):
+                run = self._interactionDecorator(self.updateBugTracker)
+                return run(bug_tracker_id, batch_size)
+            return updater
+
         for bug_tracker_name in bug_tracker_names:
-            self.txn.begin()
-            bug_tracker = getUtility(IBugTrackerSet).getByName(
-                bug_tracker_name)
-
-            if not bug_tracker.active:
+            if bug_tracker_name == ubuntu_bugzilla_name:
+                # XXX: 2007-09-11 Graham Binns
+                # We automatically ignore the Ubuntu Bugzilla
+                # here as all its bugs have been imported into
+                # Launchpad. Ideally we would have some means
+                # to identify all bug trackers like this so
+                # that hard-coding like this can be genericised
+                # (Bug 138949).
                 self.log.debug(
-                    "Updates are disabled for bug tracker at %s" %
-                    bug_tracker.baseurl)
-                self.txn.abort()
-                continue
-
-            # Save the url for later, since we might need it to report an
-            # error after a transaction has been aborted.
-            bug_tracker_url = bug_tracker.baseurl
-            try:
-                if bug_tracker_name == ubuntu_bugzilla_name:
-                    # XXX: 2007-09-11 Graham Binns
-                    # We automatically ignore the Ubuntu Bugzilla
-                    # here as all its bugs have been imported into
-                    # Launchpad. Ideally we would have some means
-                    # to identify all bug trackers like this so
-                    # that hard-coding like this can be genericised
-                    # (Bug 138949).
+                    "Skipping updating Ubuntu Bugzilla watches.")
+            else:
+                bug_tracker = getUtility(IBugTrackerSet).getByName(
+                    bug_tracker_name)
+                if bug_tracker.active:
+                    yield make_updater(bug_tracker.id)
+                else:
                     self.log.debug(
-                        "Skipping updating Ubuntu Bugzilla watches.")
-                else:
-                    self.updateBugTracker(bug_tracker, batch_size)
+                        "Updates are disabled for bug tracker at %s" %
+                        bug_tracker.baseurl)

-                self.txn.commit()
-            except (KeyboardInterrupt, SystemExit):
-                # We should never catch KeyboardInterrupt or SystemExit.
-                raise
-            except Exception, error:
-                # If something unexpected goes wrong, we log it and
-                # continue: a failure shouldn't break the updating of
-                # the other bug trackers.
-                info = sys.exc_info()
-                properties = [
-                    ('bugtracker', bug_tracker_name),
-                    ('baseurl', bug_tracker_url)]
-                if isinstance(error, BugWatchUpdateError):
-                    self.error(
-                        str(error), properties=properties, info=info)
-                elif isinstance(error, socket.timeout):
-                    self.error(
-                        "Connection timed out when updating %s" %
-                        bug_tracker_url,
-                        properties=properties, info=info)
-                else:
-                    self.error(
-                        "An exception was raised when updating %s" %
-                        bug_tracker_url,
-                        properties=properties, info=info)
-                self.txn.abort()
         self._logout()

+    def updateBugTrackers(
+        self, bug_tracker_names=None, batch_size=None, num_threads=1):
+        """Update all the bug trackers that have watches pending.
+
+        If bug tracker names are specified in bug_tracker_names only
+        those bug trackers will be checked.
+
+        The updates are run in threads, so that long running updates
+        don't block progress. However, by default the number of
+        threads is 1, to help with testing.
+        """
+        self.log.debug("Using a global batch size of %s" % batch_size)
+
+        # Put all the work on the queue. This is simpler than drip-feeding the
+        # queue, and avoids a situation where a worker thread exits because
+        # there's no work left and the feeding thread hasn't been scheduled to
+        # add work to the queue.
+        work = queue.Queue()
+        for updater in self._bugTrackerUpdaters(bug_tracker_names):
+            work.put(updater)
+
+        # This will be run once in each worker thread.
+        def do_work():
+            while True:
+                try:
+                    job = work.get(block=False)
+                except queue.Empty:
+                    break
+                else:
+                    job(batch_size)
+
+        # Start and join the worker threads.
+        threads = []
+        for run in xrange(num_threads):
+            thread = threading.Thread(target=do_work)
+            thread.start()
+            threads.append(thread)
+        for thread in threads:
+            thread.join()
+
+    def updateBugTracker(self, bug_tracker, batch_size):
+        """Updates the given bug trackers's bug watches.
+
+        If there is an error, logs are updated, and the transaction is
+        aborted.
+
+        :param bug_tracker: An IBugTracker or the ID of one, so that this
+            method can be called from a different interaction.
+
+        :return: A boolean indicating if the operation was successful.
+        """
+        # Get the bug tracker.
+        if isinstance(bug_tracker, (int, long)):
+            bug_tracker = getUtility(IBugTrackerSet).get(bug_tracker)
+
+        # Save the name and url for later, since we might need it to report an
+        # error after a transaction has been aborted.
+        bug_tracker_name = bug_tracker.name
+        bug_tracker_url = bug_tracker.baseurl
+
+        try:
+            self.txn.begin()
+            self._updateBugTracker(bug_tracker, batch_size)
+            self.txn.commit()
+        except (KeyboardInterrupt, SystemExit):
+            # We should never catch KeyboardInterrupt or SystemExit.
+            raise
+        except Exception, error:
+            # If something unexpected goes wrong, we log it and
+            # continue: a failure shouldn't break the updating of
+            # the other bug trackers.
+            info = sys.exc_info()
+            properties = [
+                ('bugtracker', bug_tracker_name),
+                ('baseurl', bug_tracker_url)]
+            if isinstance(error, BugWatchUpdateError):
+                self.error(
+                    str(error), properties=properties, info=info)
+            elif isinstance(error, socket.timeout):
+                self.error(
+                    "Connection timed out when updating %s" %
+                    bug_tracker_url,
+                    properties=properties, info=info)
+            else:
+                self.error(
+                    "An exception was raised when updating %s" %
+                    bug_tracker_url,
+                    properties=properties, info=info)
+            self.txn.abort()
+            return False
+        else:
+            return True
+
     def forceUpdateAll(self, bug_tracker_name, batch_size):
         """Update all the watches for `bug_tracker_name`.

@@ -290,50 +386,16 @@
         bug_tracker.resetWatches()
         self.txn.commit()

-        # Take a copy of the bug tracker URL. If the transaction fails
-        # later we can't refer to the baseurl attribute of the bug
-        # tracker.
-        bug_tracker_url = bug_tracker.baseurl
-
         # Loop over the bug watches in batches as specificed by
         # batch_size until there are none left to update.
         self.log.info(
             "Updating %s watches on bug tracker '%s'" %
             (bug_tracker.watches.count(), bug_tracker_name))
-        iteration = 0
         has_watches_to_update = True
         while has_watches_to_update:
             self.txn.begin()
-            try:
-                self.updateBugTracker(bug_tracker, batch_size)
-                self.txn.commit()
-            except (KeyboardInterrupt, SystemExit):
-                # We should never catch KeyboardInterrupt or SystemExit.
-                raise
-            except Exception, error:
-                # If something unexpected goes wrong, we log it and
-                # continue: a failure shouldn't break the updating of
-                # the other bug trackers.
-                info = sys.exc_info()
-                properties = [
-                    ('bugtracker', bug_tracker_name),
-                    ('baseurl', bug_tracker_url)]
-                if isinstance(error, BugWatchUpdateError):
-                    self.error(
-                        str(error), properties=properties, info=info)
-                elif isinstance(error, socket.timeout):
-                    self.error(
-                        "Connection timed out when updating %s" %
-                        bug_tracker_url,
-                        properties=properties, info=info)
-                else:
-                    self.error(
-                        "An exception was raised when updating %s" %
-                        bug_tracker_url,
-                        properties=properties, info=info)
-                self.txn.abort()
+            if not self.updateBugTracker(bug_tracker, batch_size):
                 break
-
             watches_left = bug_tracker.getBugWatchesNeedingUpdate(23).count()
             self.log.info(
                 "%s watches left to check on bug tracker '%s'" %
@@ -409,7 +471,7 @@

         return trackers_and_watches

-    def updateBugTracker(self, bug_tracker, batch_size=None):
+    def _updateBugTracker(self, bug_tracker, batch_size=None):
         """Updates the given bug trackers's bug watches."""
         # XXX 2007-01-18 gmb:
         # Once we start running checkwatches more frequently we need
@@ -1085,7 +1147,7 @@
                  "one bugtracker using this option will check all the "
                  "bugtrackers specified.")
         self.parser.add_option(
-            '-b', '--batch-size', action='store', dest='batch_size',
+            '-b', '--batch-size', action='store', type=int, dest='batch_size',
             help="Set the number of watches to be checked per bug "
                  "tracker in this run. If BATCH_SIZE is 0, all watches "
                  "on the bug tracker that are eligible for checking will "
@@ -1094,26 +1156,27 @@
             '--reset', action='store_true', dest='update_all',
             help="Update all the watches on the bug tracker, regardless of "
                  "whether or not they need checking.")
+        self.parser.add_option(
+            '--jobs', action='store', type=int, dest='jobs', default=1,
+            help=("The number of simulataneous jobs to run, %default by "
+                  "default."))

     def main(self):
         start_time = time.time()

         updater = BugWatchUpdater(self.txn, self.logger)

-        # Make sure batch_size is an integer or None.
-        batch_size = self.options.batch_size
-        if batch_size is not None:
-            batch_size = int(batch_size)
-
         if self.options.update_all and len(self.options.bug_trackers) > 0:
             # The user has requested that we update *all* the watches
             # for these bugtrackers
             for bug_tracker in self.options.bug_trackers:
-                updater.forceUpdateAll(bug_tracker, batch_size)
+                updater.forceUpdateAll(bug_tracker, self.options.batch_size)
         else:
             # Otherwise we just update those watches that need updating,
             # and we let the BugWatchUpdater decide which those are.
-            updater.updateBugTrackers(self.options.bug_trackers, batch_size)
+            updater.updateBugTrackers(
+                self.options.bug_trackers, self.options.batch_size,
+                self.options.jobs)

         run_time = time.time() - start_time
         self.logger.info("Time for this run: %.3f seconds." % run_time)
```
```diff
=== modified file 'lib/lp/bugs/scripts/tests/test_bugimport.py'
--- lib/lp/bugs/scripts/tests/test_bugimport.py 2009-10-21 18:46:29 +0000
+++ lib/lp/bugs/scripts/tests/test_bugimport.py 2009-11-27 16:20:37 +0000
@@ -883,10 +883,10 @@
 class TestBugWatchUpdater(BugWatchUpdater):
     """A mock `BugWatchUpdater` object."""

-    def updateBugTracker(self, bug_tracker):
+    def _updateBugTracker(self, bug_tracker):
         # Save the current bug tracker, so _getBugWatch can reference it.
         self.bugtracker = bug_tracker
-        super(TestBugWatchUpdater, self).updateBugTracker(bug_tracker)
+        super(TestBugWatchUpdater, self)._updateBugTracker(bug_tracker)

     def _getExternalBugTrackersAndWatches(self, bug_tracker, bug_watches):
         """See `BugWatchUpdater`."""
@@ -928,7 +928,7 @@
         # trigger a DB error, the second updates successfully.
         bug_tracker = TestBugTracker(test_bug_one, test_bug_two)
         bug_watch_updater = TestBugWatchUpdater(self.layer.txn)
-        bug_watch_updater.updateBugTracker(bug_tracker)
+        bug_watch_updater._updateBugTracker(bug_tracker)
         # We verify that the first bug watch didn't update the status,
         # and the second did.
         for bugtask in test_bug_one.bugtasks:
```
This branch makes checkwatches update bug trackers in multiple threads. The
watches for a specific bug tracker are updated sequentially in a single
thread, while other bug trackers may be updated concurrently in separate
threads.
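The threading model described above can be tried out in isolation. The following is a minimal standalone sketch (Python 3 syntax, with a fallback import for the Python 2 `Queue` module this branch uses), not the branch's code: `run_jobs()` and `make_job()` are hypothetical names, and plain callables stand in for the per-bug-tracker updater functions. As in updateBugTrackers(), the queue is filled before any worker starts, so a worker that sees `queue.Empty` can exit safely.

```python
import threading

try:
    import queue  # Python 3
except ImportError:  # Python 2, which this branch targets
    import Queue as queue


def run_jobs(jobs, num_threads=1):
    """Run each callable in `jobs` exactly once, using `num_threads` workers.

    The queue is filled completely before any worker starts, so a worker
    that finds it empty can simply exit: no more work will ever arrive.
    """
    work = queue.Queue()
    for job in jobs:
        work.put(job)

    def do_work():
        # Each worker pulls jobs until the queue is drained.
        while True:
            try:
                job = work.get(block=False)
            except queue.Empty:
                break
            job()

    threads = [threading.Thread(target=do_work) for _ in range(num_threads)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()


if __name__ == "__main__":
    done = []
    done_lock = threading.Lock()

    def make_job(tracker_name):
        # One job per (hypothetical) bug tracker; all of that tracker's
        # work happens sequentially inside this single call.
        def job():
            with done_lock:
                done.append(tracker_name)
        return job

    trackers = ["tracker-%d" % i for i in range(20)]
    run_jobs([make_job(name) for name in trackers], num_threads=4)
    print(sorted(done) == sorted(trackers))  # True
```

Draining with `get(block=False)` and stopping on `queue.Empty` is only safe because nothing is added once the workers have started; a queue fed concurrently would need a sentinel value or `Queue.join()` instead.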
Most of the changes refactor existing methods of, or add new methods to,
BugWatchUpdater in checkwatches.py:
* The existing updateBugTracker() method was renamed to _updateBugTracker().
* The existing exception handling and OOPS reporting code in
updateBugTrackers() was moved to a new updateBugTracker() method.
* forceUpdateAll() contained identical exception handling code, which was
replaced by a call to the new updateBugTracker() method.
* updateBugTrackers() was eviscerated and replaced with the thread setup and
run. It calls a new _bugTrackerUpdaters() method, which generates functions
for the threads to run, each of which updates one bug tracker. These
functions are put into a work queue from which the threads pull.
* A new _interactionDecorator() method was created to support running jobs in
an interaction. Each of the functions that _bugTrackerUpdaters() yields uses
it to wrap its call to updateBugTracker(). This is needed because
interactions are specific to a thread.
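Since an interaction is tied to the thread that set it up, a job pulled off the queue by a worker thread starts without one. The standalone sketch below (Python 3 syntax) models this with `threading.local`. The functions `query_interaction()`, `setup_interaction()`, `end_interaction()`, `ensure_interaction()` and `update_tracker()` are hypothetical stand-ins for Launchpad's queryInteraction(), setupInteraction(), endInteraction(), the new _interactionDecorator() and updateBugTracker(); they are not the real implementations.

```python
import threading

# Hypothetical stand-in for the Zope interaction, which is stored per
# thread: every thread sees its own `interaction` attribute here.
_state = threading.local()


def query_interaction():
    return getattr(_state, "interaction", None)


def setup_interaction(login):
    _state.interaction = login


def end_interaction():
    _state.interaction = None


def ensure_interaction(login, func):
    """Wrap `func` so that it always runs inside an interaction.

    An interaction that already exists in the calling thread is reused
    and left alone; otherwise one is set up just for the call.
    """
    def wrapper(*args, **kwargs):
        if query_interaction() is None:
            setup_interaction(login)
            try:
                return func(*args, **kwargs)
            finally:
                end_interaction()
        else:
            return func(*args, **kwargs)
    return wrapper


def update_tracker(tracker_id):
    # Hypothetical job body: report which interaction it ran under.
    return (tracker_id, query_interaction())


if __name__ == "__main__":
    setup_interaction("main-thread-login")
    seen = []

    def worker():
        # A new thread starts without an interaction of its own.
        run = ensure_interaction("bugwatch@bugs.launchpad.net", update_tracker)
        seen.append(run(42))
        seen.append(query_interaction())  # ended again after the call

    thread = threading.Thread(target=worker)
    thread.start()
    thread.join()
    print(seen)  # [(42, 'bugwatch@bugs.launchpad.net'), None]
    print(query_interaction())  # main-thread-login
```

Reusing an existing interaction instead of always creating one is what lets the same wrapped job run both in the calling thread (as with the default of one thread) and in a fresh worker thread.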
Other work:
* Added a --jobs option to the checkwatches.py script.
* Tests that called updateBugTracker() now call _updateBugTracker() to avoid
the transaction handling.
* Demonstrated updateBugTrackers() spawning multiple threads.
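Both the new --jobs option and the `type=int` change to --batch-size rely on standard optparse behaviour. A minimal sketch (Python 3 syntax) with a bare `OptionParser`, rather than the script's own parser setup, shows the effect:

```python
from optparse import OptionParser

parser = OptionParser()
# type=int makes optparse convert and validate the value itself, which is
# why the script no longer needs its own int() call for batch_size.
parser.add_option(
    '-b', '--batch-size', action='store', type=int, dest='batch_size')
# optparse replaces %default in help text with the option's default value.
parser.add_option(
    '--jobs', action='store', type=int, dest='jobs', default=1,
    help="The number of simultaneous jobs to run, %default by default.")

options, args = parser.parse_args(['--jobs=4', '-b', '10'])
print(options.jobs, options.batch_size)  # 4 10

options, args = parser.parse_args([])
print(options.jobs, options.batch_size)  # 1 None
```

A non-integer value such as `--jobs=many` is rejected by optparse with a usage error before it ever reaches the updater.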
Testing:
bin/test -vvct 'checkwatch|externalbug'
Lint:
lib/lp/bugs/scripts/checkwatches.py
    22: [F0401] Unable to import 'lazr.lifecycle.event' (No module named lifecycle)