Merge lp:~stub/launchpad/garbo into lp:launchpad/db-devel

Proposed by Stuart Bishop
Status: Superseded
Proposed branch: lp:~stub/launchpad/garbo
Merge into: lp:launchpad/db-devel
Diff against target: 466 lines (+148/-83)
5 files modified
cronscripts/garbo-frequently.py (+23/-0)
database/schema/security.cfg (+4/-0)
lib/canonical/launchpad/utilities/looptuner.py (+1/-1)
lib/lp/scripts/garbo.py (+67/-47)
lib/lp/scripts/tests/test_garbo.py (+53/-35)
To merge this branch: bzr merge lp:~stub/launchpad/garbo
Reviewer Review Type Date Requested Status
Robert Collins (community) Needs Fixing
Review via email: mp+69792@code.launchpad.net

This proposal has been superseded by a proposal from 2011-09-09.

Description of the change

Implement a garbo job that runs every 5 minutes and use it to fix Bug #795305.

Also a bit of delinting of touched code, and some minor garbo tuning (reducing the default transaction goal time to 2 seconds, and moving some other jobs to the frequent garbo runner to spread the load more evenly).
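
As a sketch of the pattern the new runner expects (not part of this branch: ExampleJournalPruner and the ExampleJournal table are made up, and the import paths are assumed rather than taken from this diff), a low overhead task looks roughly like the following and simply gets listed in FrequentDatabaseGarbageCollector.tunable_loops, as BugSummaryJournalRollup does in the diff below.

    from zope.component import getUtility

    # Import paths are assumed here; garbo.py already carries the
    # equivalent imports.
    from canonical.launchpad.utilities.looptuner import TunableLoop
    from canonical.launchpad.webapp.interfaces import (
        IStoreSelector,
        MAIN_STORE,
        MASTER_FLAVOR,
        )


    class ExampleJournalPruner(TunableLoop):
        """Hypothetical low overhead task suited to the frequent runner."""

        maximum_chunk_size = 5000

        def __init__(self, log, abort_time=None):
            super(ExampleJournalPruner, self).__init__(log, abort_time)
            self.store = getUtility(IStoreSelector).get(
                MAIN_STORE, MASTER_FLAVOR)

        def isDone(self):
            # Finished once the (hypothetical) ExampleJournal table is empty.
            return not self.store.execute(
                "SELECT EXISTS (SELECT TRUE FROM ExampleJournal LIMIT 1)"
                ).get_one()[0]

        def __call__(self, chunk_size):
            # The loop tuner hands us a float; trim to whole rows.
            chunk_size = int(chunk_size + 0.5)
            self.store.execute(
                "DELETE FROM ExampleJournal WHERE id IN"
                " (SELECT id FROM ExampleJournal LIMIT %s)",
                (chunk_size,), noresult=True)
            self.store.commit()

The real task added here, BugSummaryJournalRollup, has the same shape but delegates the work to the bugsummary_rollup_journal() stored procedure.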

Revision history for this message
Robert Collins (lifeless) wrote:

This should be a merge into devel - it has no schema changes [security.cfg does not count].

review: Needs Fixing
Revision history for this message
Stuart Bishop (stub) wrote:

It depends on a database patch, so it cannot land on devel until r10832 of lp:launchpad/db-devel has been merged into lp:launchpad/devel.

Preview Diff

1=== added file 'cronscripts/garbo-frequently.py'
2--- cronscripts/garbo-frequently.py 1970-01-01 00:00:00 +0000
3+++ cronscripts/garbo-frequently.py 2011-07-29 13:44:41 +0000
4@@ -0,0 +1,23 @@
5+#!/usr/bin/python -S
6+#
7+# Copyright 2011 Canonical Ltd. This software is licensed under the
8+# GNU Affero General Public License version 3 (see the file LICENSE).
9+
10+"""Database garbage collector, every 5 minutes.
11+
12+Remove or archive unwanted data. Detect, warn and possibly repair data
13+corruption.
14+"""
15+
16+__metaclass__ = type
17+__all__ = []
18+
19+import _pythonpath
20+
21+from lp.scripts.garbo import FrequentDatabaseGarbageCollector
22+
23+
24+if __name__ == '__main__':
25+ script = FrequentDatabaseGarbageCollector()
26+ script.continue_on_failure = True
27+ script.lock_and_run()
28
29=== modified file 'database/schema/security.cfg'
30--- database/schema/security.cfg 2011-07-28 12:58:14 +0000
31+++ database/schema/security.cfg 2011-07-29 13:44:41 +0000
32@@ -2187,6 +2187,10 @@
33 groups=garbo
34 type=user
35
36+[garbo_frequently]
37+groups=garbo
38+type=user
39+
40 [generateppahtaccess]
41 groups=script
42 public.archive = SELECT
43
44=== modified file 'lib/canonical/launchpad/utilities/looptuner.py'
45--- lib/canonical/launchpad/utilities/looptuner.py 2011-04-12 09:57:20 +0000
46+++ lib/canonical/launchpad/utilities/looptuner.py 2011-07-29 13:44:41 +0000
47@@ -311,7 +311,7 @@
48 """A base implementation of `ITunableLoop`."""
49 implements(ITunableLoop)
50
51- goal_seconds = 4
52+ goal_seconds = 2
53 minimum_chunk_size = 1
54 maximum_chunk_size = None # Override
55 cooldown_time = 0
56
57=== modified file 'lib/lp/scripts/garbo.py'
58--- lib/lp/scripts/garbo.py 2011-07-05 05:46:02 +0000
59+++ lib/lp/scripts/garbo.py 2011-07-29 13:44:41 +0000
60@@ -56,7 +56,6 @@
61 from lp.bugs.interfaces.bug import IBugSet
62 from lp.bugs.model.bug import Bug
63 from lp.bugs.model.bugattachment import BugAttachment
64-from lp.bugs.model.bugmessage import BugMessage
65 from lp.bugs.model.bugnotification import BugNotification
66 from lp.bugs.model.bugwatch import BugWatchActivity
67 from lp.bugs.scripts.checkwatches.scheduler import (
68@@ -84,7 +83,7 @@
69 from lp.translations.model.potranslation import POTranslation
70
71
72-ONE_DAY_IN_SECONDS = 24*60*60
73+ONE_DAY_IN_SECONDS = 24 * 60 * 60
74
75
76 class BulkPruner(TunableLoop):
77@@ -290,12 +289,34 @@
78 """
79
80
81+class BugSummaryJournalRollup(TunableLoop):
82+ """Rollup BugSummaryJournal rows into BugSummary."""
83+ maximum_chunk_size = 5000
84+
85+ def __init__(self, log, abort_time=None):
86+ super(BugSummaryJournalRollup, self).__init__(log, abort_time)
87+ self.store = getUtility(IStoreSelector).get(MAIN_STORE, MASTER_FLAVOR)
88+
89+ def isDone(self):
90+ has_more = self.store.execute(
91+ "SELECT EXISTS (SELECT TRUE FROM BugSummaryJournal LIMIT 1)"
92+ ).get_one()[0]
93+ return not has_more
94+
95+ def __call__(self, chunk_size):
96+ chunk_size = int(chunk_size + 0.5)
97+ self.store.execute(
98+ "SELECT bugsummary_rollup_journal(%s)", (chunk_size,),
99+ noresult=True)
100+ self.store.commit()
101+
102+
103 class OpenIDConsumerNoncePruner(TunableLoop):
104 """An ITunableLoop to prune old OpenIDConsumerNonce records.
105
106 We remove all OpenIDConsumerNonce records older than 1 day.
107 """
108- maximum_chunk_size = 6*60*60 # 6 hours in seconds.
109+ maximum_chunk_size = 6 * 60 * 60 # 6 hours in seconds.
110
111 def __init__(self, log, abort_time=None):
112 super(OpenIDConsumerNoncePruner, self).__init__(log, abort_time)
113@@ -601,7 +622,7 @@
114 self.max_offset = self.store.execute(
115 "SELECT MAX(id) FROM UnlinkedPeople").get_one()[0]
116 if self.max_offset is None:
117- self.max_offset = -1 # Trigger isDone() now.
118+ self.max_offset = -1 # Trigger isDone() now.
119 self.log.debug("No Person records to remove.")
120 else:
121 self.log.info("%d Person records to remove." % self.max_offset)
122@@ -684,36 +705,6 @@
123 """
124
125
126-class MirrorBugMessageOwner(TunableLoop):
127- """Mirror BugMessage.owner from Message.
128-
129- Only needed until they are all set, after that triggers will maintain it.
130- """
131-
132- # Test migration did 3M in 2 hours, so 5000 is ~ 10 seconds - and thats the
133- # max we want to hold a DB lock open for.
134- minimum_chunk_size = 1000
135- maximum_chunk_size = 5000
136-
137- def __init__(self, log, abort_time=None):
138- super(MirrorBugMessageOwner, self).__init__(log, abort_time)
139- self.store = IMasterStore(BugMessage)
140- self.isDone = IMasterStore(BugMessage).find(
141- BugMessage, BugMessage.ownerID==None).is_empty
142-
143- def __call__(self, chunk_size):
144- """See `ITunableLoop`."""
145- transaction.begin()
146- updated = self.store.execute("""update bugmessage set
147- owner=message.owner from message where
148- bugmessage.message=message.id and bugmessage.id in
149- (select id from bugmessage where owner is NULL limit %s);"""
150- % int(chunk_size)
151- ).rowcount
152- self.log.debug("Updated %s bugmessages." % updated)
153- transaction.commit()
154-
155-
156 class BugHeatUpdater(TunableLoop):
157 """A `TunableLoop` for bug heat calculations."""
158
159@@ -802,7 +793,7 @@
160 class OldTimeLimitedTokenDeleter(TunableLoop):
161 """Delete expired url access tokens from the session DB."""
162
163- maximum_chunk_size = 24*60*60 # 24 hours in seconds.
164+ maximum_chunk_size = 24 * 60 * 60 # 24 hours in seconds.
165
166 def __init__(self, log, abort_time=None):
167 super(OldTimeLimitedTokenDeleter, self).__init__(log, abort_time)
168@@ -861,10 +852,10 @@
169
170 class BaseDatabaseGarbageCollector(LaunchpadCronScript):
171 """Abstract base class to run a collection of TunableLoops."""
172- script_name = None # Script name for locking and database user. Override.
173- tunable_loops = None # Collection of TunableLoops. Override.
174- continue_on_failure = False # If True, an exception in a tunable loop
175- # does not cause the script to abort.
176+ script_name = None # Script name for locking and database user. Override.
177+ tunable_loops = None # Collection of TunableLoops. Override.
178+ continue_on_failure = False # If True, an exception in a tunable loop
179+ # does not cause the script to abort.
180
181 # Default run time of the script in seconds. Override.
182 default_abort_script_time = None
183@@ -915,7 +906,7 @@
184 for count in range(0, self.options.threads):
185 thread = threading.Thread(
186 target=self.run_tasks_in_thread,
187- name='Worker-%d' % (count+1,),
188+ name='Worker-%d' % (count + 1,),
189 args=(tunable_loops,))
190 thread.start()
191 threads.add(thread)
192@@ -949,7 +940,7 @@
193
194 @property
195 def script_timeout(self):
196- a_very_long_time = 31536000 # 1 year
197+ a_very_long_time = 31536000 # 1 year
198 return self.options.abort_script or a_very_long_time
199
200 def get_loop_logger(self, loop_name):
201@@ -962,7 +953,7 @@
202 loop_logger = logging.getLogger('garbo.' + loop_name)
203 for filter in loop_logger.filters:
204 if isinstance(filter, PrefixFilter):
205- return loop_logger # Already have a PrefixFilter attached.
206+ return loop_logger # Already have a PrefixFilter attached.
207 loop_logger.addFilter(PrefixFilter(loop_name))
208 return loop_logger
209
210@@ -1034,7 +1025,7 @@
211 loop_logger.debug3(
212 "Unable to acquire lock %s. Running elsewhere?",
213 loop_lock_path)
214- time.sleep(0.3) # Avoid spinning.
215+ time.sleep(0.3) # Avoid spinning.
216 tunable_loops.append(tunable_loop_class)
217 # Otherwise, emit a warning and skip the task.
218 else:
219@@ -1073,16 +1064,38 @@
220 transaction.abort()
221
222
223-class HourlyDatabaseGarbageCollector(BaseDatabaseGarbageCollector):
224- script_name = 'garbo-hourly'
225+class FrequentDatabaseGarbageCollector(BaseDatabaseGarbageCollector):
226+ """Run every 5 minutes.
227+
228+ This may become even more frequent in the future.
229+
230+ Jobs with low overhead can go here to distribute work more evenly.
231+ """
232+ script_name = 'garbo-frequently'
233 tunable_loops = [
234- MirrorBugMessageOwner,
235+ BugSummaryJournalRollup,
236 OAuthNoncePruner,
237 OpenIDConsumerNoncePruner,
238 OpenIDConsumerAssociationPruner,
239+ AntiqueSessionPruner,
240+ ]
241+ experimental_tunable_loops = []
242+
243+ # 5 minutes minus 20 seconds for cleanup. This helps ensure the
244+ # script is fully terminated before the next scheduled hourly run
245+ # kicks in.
246+ default_abort_script_time = 60 * 5 - 20
247+
248+
249+class HourlyDatabaseGarbageCollector(BaseDatabaseGarbageCollector):
250+ """Run every hour.
251+
251+ Jobs we want to run fairly often but have noticeable overhead go here.
253+ """
254+ script_name = 'garbo-hourly'
255+ tunable_loops = [
256 RevisionCachePruner,
257 BugWatchScheduler,
258- AntiqueSessionPruner,
259 UnusedSessionPruner,
260 DuplicateSessionPruner,
261 BugHeatUpdater,
262@@ -1095,6 +1108,13 @@
263
264
265 class DailyDatabaseGarbageCollector(BaseDatabaseGarbageCollector):
266+ """Run every day.
267+
268+ Jobs that don't need to be run frequently.
269+
270+ If there is low overhead, consider putting these tasks in more
271+ frequently invoked lists to distribute the work more evenly.
272+ """
273 script_name = 'garbo-daily'
274 tunable_loops = [
275 BranchJobPruner,
276
277=== modified file 'lib/lp/scripts/tests/test_garbo.py'
278--- lib/lp/scripts/tests/test_garbo.py 2011-07-05 05:46:02 +0000
279+++ lib/lp/scripts/tests/test_garbo.py 2011-07-29 13:44:41 +0000
280@@ -24,6 +24,10 @@
281 Storm,
282 )
283 from storm.store import Store
284+from testtools.matchers import (
285+ Equals,
286+ GreaterThan,
287+ )
288 import transaction
289 from zope.component import getUtility
290 from zope.security.proxy import removeSecurityProxy
291@@ -55,7 +59,6 @@
292 LaunchpadZopelessLayer,
293 ZopelessDatabaseLayer,
294 )
295-from lp.bugs.model.bugmessage import BugMessage
296 from lp.bugs.model.bugnotification import (
297 BugNotification,
298 BugNotificationRecipient,
299@@ -81,6 +84,7 @@
300 BulkPruner,
301 DailyDatabaseGarbageCollector,
302 DuplicateSessionPruner,
303+ FrequentDatabaseGarbageCollector,
304 HourlyDatabaseGarbageCollector,
305 OpenIDConsumerAssociationPruner,
306 UnusedSessionPruner,
307@@ -359,12 +363,23 @@
308 # starting us in a known state.
309 self.runDaily()
310 self.runHourly()
311+ self.runFrequently()
312
313 # Capture garbo log output to tests can examine it.
314 self.log_buffer = StringIO()
315 handler = logging.StreamHandler(self.log_buffer)
316 self.log.addHandler(handler)
317
318+ def runFrequently(self, maximum_chunk_size=2, test_args=()):
319+ transaction.commit()
320+ LaunchpadZopelessLayer.switchDbUser('garbo_daily')
321+ collector = FrequentDatabaseGarbageCollector(
322+ test_args=list(test_args))
323+ collector._maximum_chunk_size = maximum_chunk_size
324+ collector.logger = self.log
325+ collector.main()
326+ return collector
327+
328 def runDaily(self, maximum_chunk_size=2, test_args=()):
329 transaction.commit()
330 LaunchpadZopelessLayer.switchDbUser('garbo_daily')
331@@ -385,10 +400,10 @@
332 def test_OAuthNoncePruner(self):
333 now = datetime.now(UTC)
334 timestamps = [
335- now - timedelta(days=2), # Garbage
336- now - timedelta(days=1) - timedelta(seconds=60), # Garbage
337- now - timedelta(days=1) + timedelta(seconds=60), # Not garbage
338- now, # Not garbage
339+ now - timedelta(days=2), # Garbage
340+ now - timedelta(days=1) - timedelta(seconds=60), # Garbage
341+ now - timedelta(days=1) + timedelta(seconds=60), # Not garbage
342+ now, # Not garbage
343 ]
344 LaunchpadZopelessLayer.switchDbUser('testadmin')
345 store = IMasterStore(OAuthNonce)
346@@ -399,14 +414,15 @@
347 for timestamp in timestamps:
348 store.add(OAuthNonce(
349 access_token=OAuthAccessToken.get(1),
350- request_timestamp = timestamp,
351- nonce = str(timestamp)))
352+ request_timestamp=timestamp,
353+ nonce=str(timestamp)))
354 transaction.commit()
355
356 # Make sure we have 4 nonces now.
357 self.failUnlessEqual(store.find(OAuthNonce).count(), 4)
358
359- self.runHourly(maximum_chunk_size=60) # 1 minute maximum chunk size
360+ self.runFrequently(
361+ maximum_chunk_size=60) # 1 minute maximum chunk size
362
363 store = IMasterStore(OAuthNonce)
364
365@@ -428,10 +444,10 @@
366 HOURS = 60 * 60
367 DAYS = 24 * HOURS
368 timestamps = [
369- now - 2 * DAYS, # Garbage
370- now - 1 * DAYS - 1 * MINUTES, # Garbage
371- now - 1 * DAYS + 1 * MINUTES, # Not garbage
372- now, # Not garbage
373+ now - 2 * DAYS, # Garbage
374+ now - 1 * DAYS - 1 * MINUTES, # Garbage
375+ now - 1 * DAYS + 1 * MINUTES, # Not garbage
376+ now, # Not garbage
377 ]
378 LaunchpadZopelessLayer.switchDbUser('testadmin')
379
380@@ -449,7 +465,7 @@
381 self.failUnlessEqual(store.find(OpenIDConsumerNonce).count(), 4)
382
383 # Run the garbage collector.
384- self.runHourly(maximum_chunk_size=60) # 1 minute maximum chunks.
385+ self.runFrequently(maximum_chunk_size=60) # 1 minute maximum chunks.
386
387 store = IMasterStore(OpenIDConsumerNonce)
388
389@@ -458,7 +474,8 @@
390
391 # And none of them are older than 1 day
392 earliest = store.find(Min(OpenIDConsumerNonce.timestamp)).one()
393- self.failUnless(earliest >= now - 24*60*60, 'Still have old nonces')
394+ self.failUnless(
395+ earliest >= now - 24 * 60 * 60, 'Still have old nonces')
396
397 def test_CodeImportResultPruner(self):
398 now = datetime.now(UTC)
399@@ -485,7 +502,7 @@
400
401 new_code_import_result(now - timedelta(days=60))
402 for i in range(results_to_keep_count - 1):
403- new_code_import_result(now - timedelta(days=19+i))
404+ new_code_import_result(now - timedelta(days=19 + i))
405
406 # Run the garbage collector
407 self.runDaily()
408@@ -558,7 +575,7 @@
409 store.execute("""
410 INSERT INTO %s (server_url, handle, issued, lifetime)
411 VALUES (%s, %s, %d, %d)
412- """ % (table_name, str(delta), str(delta), now-10, delta))
413+ """ % (table_name, str(delta), str(delta), now - 10, delta))
414 transaction.commit()
415
416 # Ensure that we created at least one expirable row (using the
417@@ -571,7 +588,7 @@
418
419 # Expire all those expirable rows, and possibly a few more if this
420 # test is running slow.
421- self.runHourly()
422+ self.runFrequently()
423
424 LaunchpadZopelessLayer.switchDbUser('testadmin')
425 store = store_selector.get(MAIN_STORE, MASTER_FLAVOR)
426@@ -879,21 +896,22 @@
427
428 self.assertEqual(1, count)
429
430- def test_mirror_bugmessages(self):
431- # Nuke the owner in sampledata.
432- con = DatabaseLayer._db_fixture.superuser_connection()
433- try:
434- cur = con.cursor()
435- cur.execute("ALTER TABLE bugmessage "
436- "DISABLE TRIGGER bugmessage__owner__mirror")
437- cur.execute("UPDATE bugmessage set owner=NULL")
438- cur.execute("ALTER TABLE bugmessage "
439- "ENABLE TRIGGER bugmessage__owner__mirror")
440- con.commit()
441- finally:
442- con.close()
443- store = IMasterStore(BugMessage)
444- unmigrated = store.find(BugMessage, BugMessage.ownerID==None).count
445- self.assertNotEqual(0, unmigrated())
446- self.runHourly()
447- self.assertEqual(0, unmigrated())
448+ def test_BugSummaryJournalRollup(self):
449+ LaunchpadZopelessLayer.switchDbUser('testadmin')
450+ store = getUtility(IStoreSelector).get(MAIN_STORE, MASTER_FLAVOR)
451+
452+ # Generate a load of entries in BugSummaryJournal.
453+ store.execute("UPDATE BugTask SET status=42")
454+
455+ # We only need a few to test.
456+ num_rows = store.execute(
457+ "SELECT COUNT(*) FROM BugSummaryJournal").get_one()[0]
458+ self.assertThat(num_rows, GreaterThan(10))
459+
460+ self.runFrequently()
461+
462+ # We just care that the rows have been removed. The bugsummary
463+ # tests confirm that the rollup stored procedure is working correctly.
464+ num_rows = store.execute(
465+ "SELECT COUNT(*) FROM BugSummaryJournal").get_one()[0]
466+ self.assertThat(num_rows, Equals(0))
