Merge lp:~stub/launchpad/garbo into lp:launchpad/db-devel
- garbo
- Merge into db-devel
Proposed by
Stuart Bishop
Status: Superseded
Proposed branch: lp:~stub/launchpad/garbo
Merge into: lp:launchpad/db-devel
Diff against target: |
466 lines (+148/-83) 5 files modified
cronscripts/garbo-frequently.py (+23/-0) database/schema/security.cfg (+4/-0) lib/canonical/launchpad/utilities/looptuner.py (+1/-1) lib/lp/scripts/garbo.py (+67/-47) lib/lp/scripts/tests/test_garbo.py (+53/-35) |
To merge this branch: bzr merge lp:~stub/launchpad/garbo
Related bugs: |
|
| Reviewer | Review Type | Date Requested | Status |
|---|---|---|---|
| Robert Collins (community) | Needs Fixing | | |
Review via email:
|
This proposal has been superseded by a proposal from 2011-09-09.
Commit message
Description of the change
Implement a 5 minute garbo job running and use it to fix Bug #795305.
Also a bit of delinting of touched code, and some minor garbo tunings (reducing the default transaction goal time to 2 seconds, and moving some other jobs to the frequent garbo runner to spread the load more evenly).
To post a comment you must log in.
Revision history for this message
![](/+icing/build/overlay/assets/skins/sam/images/close.gif)
Stuart Bishop (stub) wrote : | # |
It depends on a database patch, so cannot land on devel until r10832 of lp:launchpad/db-devel has been merged into lp:launchpad/devel
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === added file 'cronscripts/garbo-frequently.py' |
2 | --- cronscripts/garbo-frequently.py 1970-01-01 00:00:00 +0000 |
3 | +++ cronscripts/garbo-frequently.py 2011-07-29 13:44:41 +0000 |
4 | @@ -0,0 +1,23 @@ |
5 | +#!/usr/bin/python -S |
6 | +# |
7 | +# Copyright 2011 Canonical Ltd. This software is licensed under the |
8 | +# GNU Affero General Public License version 3 (see the file LICENSE). |
9 | + |
10 | +"""Database garbage collector, every 5 minutes. |
11 | + |
12 | +Remove or archive unwanted data. Detect, warn and possibly repair data |
13 | +corruption. |
14 | +""" |
15 | + |
16 | +__metaclass__ = type |
17 | +__all__ = [] |
18 | + |
19 | +import _pythonpath |
20 | + |
21 | +from lp.scripts.garbo import FrequentDatabaseGarbageCollector |
22 | + |
23 | + |
24 | +if __name__ == '__main__': |
25 | + script = FrequentDatabaseGarbageCollector() |
26 | + script.continue_on_failure = True |
27 | + script.lock_and_run() |
28 | |
29 | === modified file 'database/schema/security.cfg' |
30 | --- database/schema/security.cfg 2011-07-28 12:58:14 +0000 |
31 | +++ database/schema/security.cfg 2011-07-29 13:44:41 +0000 |
32 | @@ -2187,6 +2187,10 @@ |
33 | groups=garbo |
34 | type=user |
35 | |
36 | +[garbo_frequently] |
37 | +groups=garbo |
38 | +type=user |
39 | + |
40 | [generateppahtaccess] |
41 | groups=script |
42 | public.archive = SELECT |
43 | |
44 | === modified file 'lib/canonical/launchpad/utilities/looptuner.py' |
45 | --- lib/canonical/launchpad/utilities/looptuner.py 2011-04-12 09:57:20 +0000 |
46 | +++ lib/canonical/launchpad/utilities/looptuner.py 2011-07-29 13:44:41 +0000 |
47 | @@ -311,7 +311,7 @@ |
48 | """A base implementation of `ITunableLoop`.""" |
49 | implements(ITunableLoop) |
50 | |
51 | - goal_seconds = 4 |
52 | + goal_seconds = 2 |
53 | minimum_chunk_size = 1 |
54 | maximum_chunk_size = None # Override |
55 | cooldown_time = 0 |
56 | |
57 | === modified file 'lib/lp/scripts/garbo.py' |
58 | --- lib/lp/scripts/garbo.py 2011-07-05 05:46:02 +0000 |
59 | +++ lib/lp/scripts/garbo.py 2011-07-29 13:44:41 +0000 |
60 | @@ -56,7 +56,6 @@ |
61 | from lp.bugs.interfaces.bug import IBugSet |
62 | from lp.bugs.model.bug import Bug |
63 | from lp.bugs.model.bugattachment import BugAttachment |
64 | -from lp.bugs.model.bugmessage import BugMessage |
65 | from lp.bugs.model.bugnotification import BugNotification |
66 | from lp.bugs.model.bugwatch import BugWatchActivity |
67 | from lp.bugs.scripts.checkwatches.scheduler import ( |
68 | @@ -84,7 +83,7 @@ |
69 | from lp.translations.model.potranslation import POTranslation |
70 | |
71 | |
72 | -ONE_DAY_IN_SECONDS = 24*60*60 |
73 | +ONE_DAY_IN_SECONDS = 24 * 60 * 60 |
74 | |
75 | |
76 | class BulkPruner(TunableLoop): |
77 | @@ -290,12 +289,34 @@ |
78 | """ |
79 | |
80 | |
81 | +class BugSummaryJournalRollup(TunableLoop): |
82 | + """Rollup BugSummaryJournal rows into BugSummary.""" |
83 | + maximum_chunk_size = 5000 |
84 | + |
85 | + def __init__(self, log, abort_time=None): |
86 | + super(BugSummaryJournalRollup, self).__init__(log, abort_time) |
87 | + self.store = getUtility(IStoreSelector).get(MAIN_STORE, MASTER_FLAVOR) |
88 | + |
89 | + def isDone(self): |
90 | + has_more = self.store.execute( |
91 | + "SELECT EXISTS (SELECT TRUE FROM BugSummaryJournal LIMIT 1)" |
92 | + ).get_one()[0] |
93 | + return not has_more |
94 | + |
95 | + def __call__(self, chunk_size): |
96 | + chunk_size = int(chunk_size + 0.5) |
97 | + self.store.execute( |
98 | + "SELECT bugsummary_rollup_journal(%s)", (chunk_size,), |
99 | + noresult=True) |
100 | + self.store.commit() |
101 | + |
102 | + |
103 | class OpenIDConsumerNoncePruner(TunableLoop): |
104 | """An ITunableLoop to prune old OpenIDConsumerNonce records. |
105 | |
106 | We remove all OpenIDConsumerNonce records older than 1 day. |
107 | """ |
108 | - maximum_chunk_size = 6*60*60 # 6 hours in seconds. |
109 | + maximum_chunk_size = 6 * 60 * 60 # 6 hours in seconds. |
110 | |
111 | def __init__(self, log, abort_time=None): |
112 | super(OpenIDConsumerNoncePruner, self).__init__(log, abort_time) |
113 | @@ -601,7 +622,7 @@ |
114 | self.max_offset = self.store.execute( |
115 | "SELECT MAX(id) FROM UnlinkedPeople").get_one()[0] |
116 | if self.max_offset is None: |
117 | - self.max_offset = -1 # Trigger isDone() now. |
118 | + self.max_offset = -1 # Trigger isDone() now. |
119 | self.log.debug("No Person records to remove.") |
120 | else: |
121 | self.log.info("%d Person records to remove." % self.max_offset) |
122 | @@ -684,36 +705,6 @@ |
123 | """ |
124 | |
125 | |
126 | -class MirrorBugMessageOwner(TunableLoop): |
127 | - """Mirror BugMessage.owner from Message. |
128 | - |
129 | - Only needed until they are all set, after that triggers will maintain it. |
130 | - """ |
131 | - |
132 | - # Test migration did 3M in 2 hours, so 5000 is ~ 10 seconds - and thats the |
133 | - # max we want to hold a DB lock open for. |
134 | - minimum_chunk_size = 1000 |
135 | - maximum_chunk_size = 5000 |
136 | - |
137 | - def __init__(self, log, abort_time=None): |
138 | - super(MirrorBugMessageOwner, self).__init__(log, abort_time) |
139 | - self.store = IMasterStore(BugMessage) |
140 | - self.isDone = IMasterStore(BugMessage).find( |
141 | - BugMessage, BugMessage.ownerID==None).is_empty |
142 | - |
143 | - def __call__(self, chunk_size): |
144 | - """See `ITunableLoop`.""" |
145 | - transaction.begin() |
146 | - updated = self.store.execute("""update bugmessage set |
147 | - owner=message.owner from message where |
148 | - bugmessage.message=message.id and bugmessage.id in |
149 | - (select id from bugmessage where owner is NULL limit %s);""" |
150 | - % int(chunk_size) |
151 | - ).rowcount |
152 | - self.log.debug("Updated %s bugmessages." % updated) |
153 | - transaction.commit() |
154 | - |
155 | - |
156 | class BugHeatUpdater(TunableLoop): |
157 | """A `TunableLoop` for bug heat calculations.""" |
158 | |
159 | @@ -802,7 +793,7 @@ |
160 | class OldTimeLimitedTokenDeleter(TunableLoop): |
161 | """Delete expired url access tokens from the session DB.""" |
162 | |
163 | - maximum_chunk_size = 24*60*60 # 24 hours in seconds. |
164 | + maximum_chunk_size = 24 * 60 * 60 # 24 hours in seconds. |
165 | |
166 | def __init__(self, log, abort_time=None): |
167 | super(OldTimeLimitedTokenDeleter, self).__init__(log, abort_time) |
168 | @@ -861,10 +852,10 @@ |
169 | |
170 | class BaseDatabaseGarbageCollector(LaunchpadCronScript): |
171 | """Abstract base class to run a collection of TunableLoops.""" |
172 | - script_name = None # Script name for locking and database user. Override. |
173 | - tunable_loops = None # Collection of TunableLoops. Override. |
174 | - continue_on_failure = False # If True, an exception in a tunable loop |
175 | - # does not cause the script to abort. |
176 | + script_name = None # Script name for locking and database user. Override. |
177 | + tunable_loops = None # Collection of TunableLoops. Override. |
178 | + continue_on_failure = False # If True, an exception in a tunable loop |
179 | + # does not cause the script to abort. |
180 | |
181 | # Default run time of the script in seconds. Override. |
182 | default_abort_script_time = None |
183 | @@ -915,7 +906,7 @@ |
184 | for count in range(0, self.options.threads): |
185 | thread = threading.Thread( |
186 | target=self.run_tasks_in_thread, |
187 | - name='Worker-%d' % (count+1,), |
188 | + name='Worker-%d' % (count + 1,), |
189 | args=(tunable_loops,)) |
190 | thread.start() |
191 | threads.add(thread) |
192 | @@ -949,7 +940,7 @@ |
193 | |
194 | @property |
195 | def script_timeout(self): |
196 | - a_very_long_time = 31536000 # 1 year |
197 | + a_very_long_time = 31536000 # 1 year |
198 | return self.options.abort_script or a_very_long_time |
199 | |
200 | def get_loop_logger(self, loop_name): |
201 | @@ -962,7 +953,7 @@ |
202 | loop_logger = logging.getLogger('garbo.' + loop_name) |
203 | for filter in loop_logger.filters: |
204 | if isinstance(filter, PrefixFilter): |
205 | - return loop_logger # Already have a PrefixFilter attached. |
206 | + return loop_logger # Already have a PrefixFilter attached. |
207 | loop_logger.addFilter(PrefixFilter(loop_name)) |
208 | return loop_logger |
209 | |
210 | @@ -1034,7 +1025,7 @@ |
211 | loop_logger.debug3( |
212 | "Unable to acquire lock %s. Running elsewhere?", |
213 | loop_lock_path) |
214 | - time.sleep(0.3) # Avoid spinning. |
215 | + time.sleep(0.3) # Avoid spinning. |
216 | tunable_loops.append(tunable_loop_class) |
217 | # Otherwise, emit a warning and skip the task. |
218 | else: |
219 | @@ -1073,16 +1064,38 @@ |
220 | transaction.abort() |
221 | |
222 | |
223 | -class HourlyDatabaseGarbageCollector(BaseDatabaseGarbageCollector): |
224 | - script_name = 'garbo-hourly' |
225 | +class FrequentDatabaseGarbageCollector(BaseDatabaseGarbageCollector): |
226 | + """Run every 5 minutes. |
227 | + |
228 | + This may become even more frequent in the future. |
229 | + |
230 | + Jobs with low overhead can go here to distribute work more evenly. |
231 | + """ |
232 | + script_name = 'garbo-frequently' |
233 | tunable_loops = [ |
234 | - MirrorBugMessageOwner, |
235 | + BugSummaryJournalRollup, |
236 | OAuthNoncePruner, |
237 | OpenIDConsumerNoncePruner, |
238 | OpenIDConsumerAssociationPruner, |
239 | + AntiqueSessionPruner, |
240 | + ] |
241 | + experimental_tunable_loops = [] |
242 | + |
243 | + # 5 minutes minus 20 seconds for cleanup. This helps ensure the |
244 | + # script is fully terminated before the next scheduled run |
245 | + # kicks in. |
246 | + default_abort_script_time = 60 * 5 - 20 |
247 | + |
248 | + |
249 | +class HourlyDatabaseGarbageCollector(BaseDatabaseGarbageCollector): |
250 | + """Run every hour. |
251 | + |
252 | + Jobs we want to run fairly often but have noticeable overhead go here. |
253 | + """ |
254 | + script_name = 'garbo-hourly' |
255 | + tunable_loops = [ |
256 | RevisionCachePruner, |
257 | BugWatchScheduler, |
258 | - AntiqueSessionPruner, |
259 | UnusedSessionPruner, |
260 | DuplicateSessionPruner, |
261 | BugHeatUpdater, |
262 | @@ -1095,6 +1108,13 @@ |
263 | |
264 | |
265 | class DailyDatabaseGarbageCollector(BaseDatabaseGarbageCollector): |
266 | + """Run every day. |
267 | + |
268 | + Jobs that don't need to be run frequently. |
269 | + |
270 | + If there is low overhead, consider putting these tasks in more |
271 | + frequently invoked lists to distribute the work more evenly. |
272 | + """ |
273 | script_name = 'garbo-daily' |
274 | tunable_loops = [ |
275 | BranchJobPruner, |
276 | |
277 | === modified file 'lib/lp/scripts/tests/test_garbo.py' |
278 | --- lib/lp/scripts/tests/test_garbo.py 2011-07-05 05:46:02 +0000 |
279 | +++ lib/lp/scripts/tests/test_garbo.py 2011-07-29 13:44:41 +0000 |
280 | @@ -24,6 +24,10 @@ |
281 | Storm, |
282 | ) |
283 | from storm.store import Store |
284 | +from testtools.matchers import ( |
285 | + Equals, |
286 | + GreaterThan, |
287 | + ) |
288 | import transaction |
289 | from zope.component import getUtility |
290 | from zope.security.proxy import removeSecurityProxy |
291 | @@ -55,7 +59,6 @@ |
292 | LaunchpadZopelessLayer, |
293 | ZopelessDatabaseLayer, |
294 | ) |
295 | -from lp.bugs.model.bugmessage import BugMessage |
296 | from lp.bugs.model.bugnotification import ( |
297 | BugNotification, |
298 | BugNotificationRecipient, |
299 | @@ -81,6 +84,7 @@ |
300 | BulkPruner, |
301 | DailyDatabaseGarbageCollector, |
302 | DuplicateSessionPruner, |
303 | + FrequentDatabaseGarbageCollector, |
304 | HourlyDatabaseGarbageCollector, |
305 | OpenIDConsumerAssociationPruner, |
306 | UnusedSessionPruner, |
307 | @@ -359,12 +363,23 @@ |
308 | # starting us in a known state. |
309 | self.runDaily() |
310 | self.runHourly() |
311 | + self.runFrequently() |
312 | |
313 | # Capture garbo log output to tests can examine it. |
314 | self.log_buffer = StringIO() |
315 | handler = logging.StreamHandler(self.log_buffer) |
316 | self.log.addHandler(handler) |
317 | |
318 | + def runFrequently(self, maximum_chunk_size=2, test_args=()): |
319 | + transaction.commit() |
320 | + LaunchpadZopelessLayer.switchDbUser('garbo_daily') |
321 | + collector = FrequentDatabaseGarbageCollector( |
322 | + test_args=list(test_args)) |
323 | + collector._maximum_chunk_size = maximum_chunk_size |
324 | + collector.logger = self.log |
325 | + collector.main() |
326 | + return collector |
327 | + |
328 | def runDaily(self, maximum_chunk_size=2, test_args=()): |
329 | transaction.commit() |
330 | LaunchpadZopelessLayer.switchDbUser('garbo_daily') |
331 | @@ -385,10 +400,10 @@ |
332 | def test_OAuthNoncePruner(self): |
333 | now = datetime.now(UTC) |
334 | timestamps = [ |
335 | - now - timedelta(days=2), # Garbage |
336 | - now - timedelta(days=1) - timedelta(seconds=60), # Garbage |
337 | - now - timedelta(days=1) + timedelta(seconds=60), # Not garbage |
338 | - now, # Not garbage |
339 | + now - timedelta(days=2), # Garbage |
340 | + now - timedelta(days=1) - timedelta(seconds=60), # Garbage |
341 | + now - timedelta(days=1) + timedelta(seconds=60), # Not garbage |
342 | + now, # Not garbage |
343 | ] |
344 | LaunchpadZopelessLayer.switchDbUser('testadmin') |
345 | store = IMasterStore(OAuthNonce) |
346 | @@ -399,14 +414,15 @@ |
347 | for timestamp in timestamps: |
348 | store.add(OAuthNonce( |
349 | access_token=OAuthAccessToken.get(1), |
350 | - request_timestamp = timestamp, |
351 | - nonce = str(timestamp))) |
352 | + request_timestamp=timestamp, |
353 | + nonce=str(timestamp))) |
354 | transaction.commit() |
355 | |
356 | # Make sure we have 4 nonces now. |
357 | self.failUnlessEqual(store.find(OAuthNonce).count(), 4) |
358 | |
359 | - self.runHourly(maximum_chunk_size=60) # 1 minute maximum chunk size |
360 | + self.runFrequently( |
361 | + maximum_chunk_size=60) # 1 minute maximum chunk size |
362 | |
363 | store = IMasterStore(OAuthNonce) |
364 | |
365 | @@ -428,10 +444,10 @@ |
366 | HOURS = 60 * 60 |
367 | DAYS = 24 * HOURS |
368 | timestamps = [ |
369 | - now - 2 * DAYS, # Garbage |
370 | - now - 1 * DAYS - 1 * MINUTES, # Garbage |
371 | - now - 1 * DAYS + 1 * MINUTES, # Not garbage |
372 | - now, # Not garbage |
373 | + now - 2 * DAYS, # Garbage |
374 | + now - 1 * DAYS - 1 * MINUTES, # Garbage |
375 | + now - 1 * DAYS + 1 * MINUTES, # Not garbage |
376 | + now, # Not garbage |
377 | ] |
378 | LaunchpadZopelessLayer.switchDbUser('testadmin') |
379 | |
380 | @@ -449,7 +465,7 @@ |
381 | self.failUnlessEqual(store.find(OpenIDConsumerNonce).count(), 4) |
382 | |
383 | # Run the garbage collector. |
384 | - self.runHourly(maximum_chunk_size=60) # 1 minute maximum chunks. |
385 | + self.runFrequently(maximum_chunk_size=60) # 1 minute maximum chunks. |
386 | |
387 | store = IMasterStore(OpenIDConsumerNonce) |
388 | |
389 | @@ -458,7 +474,8 @@ |
390 | |
391 | # And none of them are older than 1 day |
392 | earliest = store.find(Min(OpenIDConsumerNonce.timestamp)).one() |
393 | - self.failUnless(earliest >= now - 24*60*60, 'Still have old nonces') |
394 | + self.failUnless( |
395 | + earliest >= now - 24 * 60 * 60, 'Still have old nonces') |
396 | |
397 | def test_CodeImportResultPruner(self): |
398 | now = datetime.now(UTC) |
399 | @@ -485,7 +502,7 @@ |
400 | |
401 | new_code_import_result(now - timedelta(days=60)) |
402 | for i in range(results_to_keep_count - 1): |
403 | - new_code_import_result(now - timedelta(days=19+i)) |
404 | + new_code_import_result(now - timedelta(days=19 + i)) |
405 | |
406 | # Run the garbage collector |
407 | self.runDaily() |
408 | @@ -558,7 +575,7 @@ |
409 | store.execute(""" |
410 | INSERT INTO %s (server_url, handle, issued, lifetime) |
411 | VALUES (%s, %s, %d, %d) |
412 | - """ % (table_name, str(delta), str(delta), now-10, delta)) |
413 | + """ % (table_name, str(delta), str(delta), now - 10, delta)) |
414 | transaction.commit() |
415 | |
416 | # Ensure that we created at least one expirable row (using the |
417 | @@ -571,7 +588,7 @@ |
418 | |
419 | # Expire all those expirable rows, and possibly a few more if this |
420 | # test is running slow. |
421 | - self.runHourly() |
422 | + self.runFrequently() |
423 | |
424 | LaunchpadZopelessLayer.switchDbUser('testadmin') |
425 | store = store_selector.get(MAIN_STORE, MASTER_FLAVOR) |
426 | @@ -879,21 +896,22 @@ |
427 | |
428 | self.assertEqual(1, count) |
429 | |
430 | - def test_mirror_bugmessages(self): |
431 | - # Nuke the owner in sampledata. |
432 | - con = DatabaseLayer._db_fixture.superuser_connection() |
433 | - try: |
434 | - cur = con.cursor() |
435 | - cur.execute("ALTER TABLE bugmessage " |
436 | - "DISABLE TRIGGER bugmessage__owner__mirror") |
437 | - cur.execute("UPDATE bugmessage set owner=NULL") |
438 | - cur.execute("ALTER TABLE bugmessage " |
439 | - "ENABLE TRIGGER bugmessage__owner__mirror") |
440 | - con.commit() |
441 | - finally: |
442 | - con.close() |
443 | - store = IMasterStore(BugMessage) |
444 | - unmigrated = store.find(BugMessage, BugMessage.ownerID==None).count |
445 | - self.assertNotEqual(0, unmigrated()) |
446 | - self.runHourly() |
447 | - self.assertEqual(0, unmigrated()) |
448 | + def test_BugSummaryJournalRollup(self): |
449 | + LaunchpadZopelessLayer.switchDbUser('testadmin') |
450 | + store = getUtility(IStoreSelector).get(MAIN_STORE, MASTER_FLAVOR) |
451 | + |
452 | + # Generate a load of entries in BugSummaryJournal. |
453 | + store.execute("UPDATE BugTask SET status=42") |
454 | + |
455 | + # We only need a few to test. |
456 | + num_rows = store.execute( |
457 | + "SELECT COUNT(*) FROM BugSummaryJournal").get_one()[0] |
458 | + self.assertThat(num_rows, GreaterThan(10)) |
459 | + |
460 | + self.runFrequently() |
461 | + |
462 | + # We just care that the rows have been removed. The bugsummary |
463 | + # tests confirm that the rollup stored method is working correctly. |
464 | + num_rows = store.execute( |
465 | + "SELECT COUNT(*) FROM BugSummaryJournal").get_one()[0] |
466 | + self.assertThat(num_rows, Equals(0)) |
This should be a merge into devel - it has no schema changes [security.cfg does not count]