Merge lp:~wallyworld/launchpad/another-reporting-cache-garbojob-1071581 into lp:launchpad

Proposed by Ian Booth
Status: Merged
Approved by: William Grant
Approved revision: no longer in the source branch.
Merged at revision: 16266
Proposed branch: lp:~wallyworld/launchpad/another-reporting-cache-garbojob-1071581
Merge into: lp:launchpad
Diff against target: 848 lines (+263/-189)
10 files modified
lib/lp/registry/browser/tests/test_person.py (+3/-3)
lib/lp/registry/model/person.py (+15/-16)
lib/lp/scripts/garbo.py (+155/-137)
lib/lp/scripts/tests/test_garbo.py (+14/-16)
lib/lp/services/database/bulk.py (+5/-4)
lib/lp/services/database/stormexpr.py (+58/-0)
lib/lp/soyuz/configure.zcml (+2/-2)
lib/lp/soyuz/interfaces/reporting.py (+3/-3)
lib/lp/soyuz/model/reporting.py (+6/-6)
lib/lp/soyuz/stories/soyuz/xx-person-packages.txt (+2/-2)
To merge this branch: bzr merge lp:~wallyworld/launchpad/another-reporting-cache-garbojob-1071581
Reviewer: William Grant (community)
Review type: code
Status: Approve
Review via email: mp+133859@code.launchpad.net

Commit message

Improve performance of latest source package release cache garbo job.

Description of the change

== Implementation ==

The previous version of the job selected single, distinct records to insert into the cache table, so the query it used suffered from performance issues similar to those of the query that pulled the data live.

This version of the job iterates over SourcePackagePublishingHistory (SPPH) records, ordered by id, starting from the watermark of the last-processed SPPH id. The query joins the SourcePackageRelease (SPR) table to pick up published SPRs.

A cache record update is done using Greatest() for the date_uploaded column; any records that do not already exist are inserted.

Hopefully this will run a lot faster than the previous version.
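
For illustration, a minimal sketch of the iteration only (simplified and not the exact code in this branch; the cache upsert itself is elided, and the model and import names are those used elsewhere in the diff):

    from storm.expr import And, Join
    from lp.soyuz.model.publishing import SourcePackagePublishingHistory
    from lp.soyuz.model.sourcepackagerelease import SourcePackageRelease

    def process_batch(store, last_spph_id, chunk_size):
        # Walk SPPH records past the watermark, joined to their published SPRs.
        spph = SourcePackagePublishingHistory
        spr = SourcePackageRelease
        origin = [
            spr,
            Join(spph, And(
                spph.sourcepackagereleaseID == spr.id,
                spph.archiveID == spr.upload_archiveID))]
        rows = store.using(*origin).find(
            (spph.id, spr.id, spr.dateuploaded),
            spph.id > last_spph_id).order_by(spph.id)[:chunk_size]
        for spph_id, spr_id, dateuploaded in rows:
            # The update or insert of the corresponding cache record goes
            # here (see the preview diff for the real upsert logic).
            last_spph_id = spph_id
        return last_spph_id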

== Tests ==

Existing tests suffice.

== Lint ==

Linting changed files:
  lib/lp/scripts/garbo.py
  lib/lp/scripts/tests/test_garbo.py

Revision history for this message
Curtis Hovey (sinzui) wrote:

I don't think you want to disable half of the production garbo jobs or the threading. I think you need to uncomment them to restore the garbo features.

Revision history for this message
Ian Booth (wallyworld) wrote:

Bollocks. That was so I could debug. I forgot to uncomment it.

On Mon 12 Nov 2012 23:47:31 EST, Curtis Hovey wrote:
> I don't think you want to disable half of the production garbo jobs or the threading. I think you need to uncomment them to restore the garbo features.

Revision history for this message
William Grant (wgrant) wrote:

73 spph = ClassAlias(SourcePackagePublishingHistory, "spph")

I don't understand why this needs to be a ClassAlias; if you just want a shorter Python name for the class without aliasing in SQL then 'spph = SourcePackagePublishingHistory' is fine and relatively idiomatic.

113 + SourcePackageRelease.dateuploaded, Alias(spph.id, 'spph_id')),

The custom alias seems pointless here.

114 + spph.id > self.next_spph_id

That sounds, then, rather like it's actually last_spph_id.

132 + return self.getPendingUpdates().count() == 0

We don't care about the exact count. is_empty() is cheaper and does what we want.

134 - def update_cache(self, updates):
135 + def update_cache(self, update, inserts):

This should be updateCache.

167 + def perform_update(spr_id, creator_id, maintainer_id, archive_id,
168 + purpose, distroseries_id, spn_id, dateuploaded,
169 + spph_id):

The update here is perhaps slightly excessive. The only fields that can ever change are publication, date_uploaded, and sourcepackagerelease, and it will overwrite all but date_uploaded even if the old record is newer. I suspect we want the update to be conditional on its dateuploaded being newer (for correctness), and to avoid setting the immutable columns (for compactness and efficiency).

Additionally, you probably want to aggregate the inserts; I think the current code will crash if two of the same key show up new in a single batch. There's also likely to be benefit in applying the same aggregation technique to updates, not to avoid crashes but to avoid duplicating work. update_cache will likely end up compact enough to be inlined.

It's probably also cleaner to reword the update as a self.store.find(LPSPRC, LPSPRC.upload_archive_id == archive_id, [...], LPSPRC.dateuploaded < dateuploaded).set(dateuploaded=dateuploaded, [...]).
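
As a sketch, that rewording might look roughly like the following, with the elided key columns filled in from the existing update_cache filter (illustrative only, not the landed code; LPSPRC abbreviates LatestPersonSourcePackageReleaseCache and the variables are those already in update_cache):

    # Hypothetical expansion of the suggestion above.
    self.store.find(
        LPSPRC,
        LPSPRC.upload_archive_id == archive_id,
        LPSPRC.upload_distroseries_id == distroseries_id,
        LPSPRC.sourcepackagename_id == spn_id,
        LPSPRC.creator_id == creator_id,
        LPSPRC.maintainer_id == maintainer_id,
        LPSPRC.dateuploaded < dateuploaded).set(
            dateuploaded=dateuploaded,
            sourcepackagerelease_id=spr_id,
            publication_id=spph_id)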

I'd lastly like to see the Storm class renamed to LatestPersonSourcePackageReleaseCache (note the 'Package' rather than 'package'), as the existing compound concept capitalisation scheme was phased out and exterminated from the codebase in around 2005.

209 + for update in (self.getPendingUpdates()[:chunk_size]):

Extraneous parens are extraneous.

245 - self.runDaily()
246 - self.runHourly()
247 +# self.runDaily()
248 +# self.runHourly()

This seems like an accidental change.

review: Approve (code)
Revision history for this message
William Grant (wgrant) wrote:

You'll also greatly benefit from ordering the getPendingUpdates joins the way we want the query to be executed: SPPH, SPR, then Archive.
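For instance, a sketch of that join ordering (hypothetical; the landed getPendingUpdates still starts from SourcePackageRelease):

    # Suggested ordering: SPPH first, then SPR, then Archive.
    origin = [
        SourcePackagePublishingHistory,
        Join(SourcePackageRelease,
             SourcePackagePublishingHistory.sourcepackagereleaseID ==
                 SourcePackageRelease.id),
        Join(Archive,
             Archive.id == SourcePackagePublishingHistory.archiveID)]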

And it might be worth looking at batching the updates (using a CASE expression is the best way, sadly), but that's a far less critical optimisation.
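
For what it's worth, a CASE-batched update might generate SQL roughly along these lines (purely illustrative and simplified; it assumes the ids of the existing cache rows were looked up first, and the branch as landed batches via the VALUES-based BulkUpdate expression instead):

    # Hypothetical sketch only; id1/date1/pub1 etc. stand for per-row values.
    store.execute("""
        UPDATE LatestPersonSourcePackageReleaseCache
        SET date_uploaded = CASE id WHEN ? THEN ? WHEN ? THEN ? END,
            publication = CASE id WHEN ? THEN ? WHEN ? THEN ? END
        WHERE id IN (?, ?)
        """, params=[
            id1, date1, id2, date2,
            id1, pub1, id2, pub2,
            id1, id2])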

Revision history for this message
William Grant (wgrant):
review: Approve (code)

Preview Diff

1=== modified file 'lib/lp/registry/browser/tests/test_person.py'
2--- lib/lp/registry/browser/tests/test_person.py 2012-11-08 05:55:16 +0000
3+++ lib/lp/registry/browser/tests/test_person.py 2012-11-14 07:45:27 +0000
4@@ -37,7 +37,7 @@
5 )
6 from lp.registry.model.karma import KarmaCategory
7 from lp.registry.model.milestone import milestone_sort_key
8-from lp.scripts.garbo import PopulateLatestPersonSourcepackageReleaseCache
9+from lp.scripts.garbo import PopulateLatestPersonSourcePackageReleaseCache
10 from lp.services.config import config
11 from lp.services.features.testing import FeatureFixture
12 from lp.services.identity.interfaces.account import AccountStatus
13@@ -849,7 +849,7 @@
14 spphs.append(spph)
15 # Update the releases cache table.
16 switch_dbuser('garbo_frequently')
17- job = PopulateLatestPersonSourcepackageReleaseCache(FakeLogger())
18+ job = PopulateLatestPersonSourcePackageReleaseCache(FakeLogger())
19 while not job.isDone():
20 job(chunk_size=100)
21 switch_dbuser('launchpad')
22@@ -1062,7 +1062,7 @@
23 self.build.archive = publisher.distroseries.main_archive
24 # Update the releases cache table.
25 switch_dbuser('garbo_frequently')
26- job = PopulateLatestPersonSourcepackageReleaseCache(FakeLogger())
27+ job = PopulateLatestPersonSourcePackageReleaseCache(FakeLogger())
28 while not job.isDone():
29 job(chunk_size=100)
30 switch_dbuser('launchpad')
31
32=== modified file 'lib/lp/registry/model/person.py'
33--- lib/lp/registry/model/person.py 2012-11-08 05:35:59 +0000
34+++ lib/lp/registry/model/person.py 2012-11-14 07:45:27 +0000
35@@ -328,7 +328,7 @@
36 Archive,
37 validate_ppa,
38 )
39-from lp.soyuz.model.reporting import LatestPersonSourcepackageReleaseCache
40+from lp.soyuz.model.reporting import LatestPersonSourcePackageReleaseCache
41 from lp.soyuz.model.publishing import SourcePackagePublishingHistory
42 from lp.soyuz.model.sourcepackagerelease import SourcePackageRelease
43 from lp.translations.model.hastranslationimports import (
44@@ -2270,8 +2270,8 @@
45 ('QuestionSubscription', 'person'),
46 ('SpecificationSubscription', 'person'),
47 ('AnswerContact', 'person'),
48- ('LatestPersonSourcepackageReleaseCache', 'creator'),
49- ('LatestPersonSourcepackageReleaseCache', 'maintainer')]
50+ ('LatestPersonSourcePackageReleaseCache', 'creator'),
51+ ('LatestPersonSourcePackageReleaseCache', 'maintainer')]
52 cur = cursor()
53 for table, person_id_column in removals:
54 cur.execute("DELETE FROM %s WHERE %s=%d"
55@@ -2838,33 +2838,33 @@
56 clauses = []
57 if uploader_only:
58 clauses.append(
59- LatestPersonSourcepackageReleaseCache.creator_id == self.id)
60+ LatestPersonSourcePackageReleaseCache.creator_id == self.id)
61 if ppa_only:
62 # Source maintainer is irrelevant for PPA uploads.
63 pass
64 elif uploader_only:
65- lpspr = ClassAlias(LatestPersonSourcepackageReleaseCache, 'lpspr')
66+ lpspr = ClassAlias(LatestPersonSourcePackageReleaseCache, 'lpspr')
67 clauses.append(Not(Exists(Select(1,
68 where=And(
69 lpspr.sourcepackagename_id ==
70- LatestPersonSourcepackageReleaseCache.sourcepackagename_id,
71+ LatestPersonSourcePackageReleaseCache.sourcepackagename_id,
72 lpspr.upload_archive_id ==
73- LatestPersonSourcepackageReleaseCache.upload_archive_id,
74+ LatestPersonSourcePackageReleaseCache.upload_archive_id,
75 lpspr.upload_distroseries_id ==
76- LatestPersonSourcepackageReleaseCache.upload_distroseries_id,
77+ LatestPersonSourcePackageReleaseCache.upload_distroseries_id,
78 lpspr.archive_purpose != ArchivePurpose.PPA,
79 lpspr.maintainer_id == self.id),
80 tables=lpspr))))
81 else:
82 clauses.append(
83- LatestPersonSourcepackageReleaseCache.maintainer_id == self.id)
84+ LatestPersonSourcePackageReleaseCache.maintainer_id == self.id)
85 if ppa_only:
86 clauses.append(
87- LatestPersonSourcepackageReleaseCache.archive_purpose ==
88+ LatestPersonSourcePackageReleaseCache.archive_purpose ==
89 ArchivePurpose.PPA)
90 else:
91 clauses.append(
92- LatestPersonSourcepackageReleaseCache.archive_purpose !=
93+ LatestPersonSourcePackageReleaseCache.archive_purpose !=
94 ArchivePurpose.PPA)
95 return clauses
96
97@@ -2876,8 +2876,8 @@
98 return self._legacy_hasReleasesQuery(uploader_only, ppa_only)
99
100 clauses = self._releasesQueryFilter(uploader_only, ppa_only)
101- rs = Store.of(self).using(LatestPersonSourcepackageReleaseCache).find(
102- LatestPersonSourcepackageReleaseCache.publication_id, *clauses)
103+ rs = Store.of(self).using(LatestPersonSourcePackageReleaseCache).find(
104+ LatestPersonSourcePackageReleaseCache.publication_id, *clauses)
105 return not rs.is_empty()
106
107 def _latestReleasesQuery(self, uploader_only=False, ppa_only=False):
108@@ -2889,8 +2889,8 @@
109
110 clauses = self._releasesQueryFilter(uploader_only, ppa_only)
111 rs = Store.of(self).find(
112- LatestPersonSourcepackageReleaseCache, *clauses).order_by(
113- Desc(LatestPersonSourcepackageReleaseCache.dateuploaded))
114+ LatestPersonSourcePackageReleaseCache, *clauses).order_by(
115+ Desc(LatestPersonSourcePackageReleaseCache.dateuploaded))
116
117 def load_related_objects(rows):
118 if rows and rows[0].maintainer_id:
119@@ -2904,7 +2904,6 @@
120
121 return DecoratedResultSet(rs, pre_iter_hook=load_related_objects)
122
123-
124 def _legacy_releasesQueryFilter(self, uploader_only=False, ppa_only=False):
125 """Return the filter used to find sourcepackagereleases (SPRs)
126 related to this person.
127
128=== modified file 'lib/lp/scripts/garbo.py'
129--- lib/lp/scripts/garbo.py 2012-11-09 14:18:45 +0000
130+++ lib/lp/scripts/garbo.py 2012-11-14 07:45:27 +0000
131@@ -31,22 +31,19 @@
132 from psycopg2 import IntegrityError
133 import pytz
134 from storm.expr import (
135- Alias,
136 And,
137- Desc,
138 In,
139- Insert,
140 Join,
141 Like,
142+ Max,
143+ Min,
144+ Or,
145+ Row,
146 Select,
147+ SQL,
148 Update,
149 )
150 from storm.info import ClassAlias
151-from storm.locals import (
152- Max,
153- Min,
154- SQL,
155- )
156 from storm.store import EmptyResultSet
157 import transaction
158 from zope.component import getUtility
159@@ -75,6 +72,10 @@
160 from lp.registry.model.product import Product
161 from lp.services.config import config
162 from lp.services.database import postgresql
163+from lp.services.database.bulk import (
164+ create,
165+ dbify_value,
166+ )
167 from lp.services.database.constants import UTC_NOW
168 from lp.services.database.interfaces import (
169 IStoreSelector,
170@@ -87,6 +88,10 @@
171 session_store,
172 sqlvalues,
173 )
174+from lp.services.database.stormexpr import (
175+ BulkUpdate,
176+ Values,
177+ )
178 from lp.services.features import (
179 getFeatureFlag,
180 install_feature_controller,
181@@ -116,7 +121,7 @@
182 from lp.services.verification.model.logintoken import LoginToken
183 from lp.soyuz.model.archive import Archive
184 from lp.soyuz.model.publishing import SourcePackagePublishingHistory
185-from lp.soyuz.model.reporting import LatestPersonSourcepackageReleaseCache
186+from lp.soyuz.model.reporting import LatestPersonSourcePackageReleaseCache
187 from lp.soyuz.model.sourcepackagerelease import SourcePackageRelease
188 from lp.translations.interfaces.potemplate import IPOTemplateSet
189 from lp.translations.model.potmsgset import POTMsgSet
190@@ -463,164 +468,177 @@
191 transaction.commit()
192
193
194-class PopulateLatestPersonSourcepackageReleaseCache(TunableLoop):
195- """Populate the LatestPersonSourcepackageReleaseCache table.
196+class PopulateLatestPersonSourcePackageReleaseCache(TunableLoop):
197+ """Populate the LatestPersonSourcePackageReleaseCache table.
198
199- The LatestPersonSourcepackageReleaseCache contains 2 sets of data, one set
200- for package maintainers and another for package creators. This job first
201- populates the creator data and then does the maintainer data.
202+ The LatestPersonSourcePackageReleaseCache contains 2 sets of data, one set
203+ for package maintainers and another for package creators. This job iterates
204+ over the SPPH records, populating the cache table.
205 """
206 maximum_chunk_size = 1000
207
208+ cache_columns = (
209+ LatestPersonSourcePackageReleaseCache.maintainer_id,
210+ LatestPersonSourcePackageReleaseCache.creator_id,
211+ LatestPersonSourcePackageReleaseCache.upload_archive_id,
212+ LatestPersonSourcePackageReleaseCache.upload_distroseries_id,
213+ LatestPersonSourcePackageReleaseCache.sourcepackagename_id,
214+ LatestPersonSourcePackageReleaseCache.archive_purpose,
215+ LatestPersonSourcePackageReleaseCache.publication_id,
216+ LatestPersonSourcePackageReleaseCache.dateuploaded,
217+ LatestPersonSourcePackageReleaseCache.sourcepackagerelease_id,
218+ )
219+
220 def __init__(self, log, abort_time=None):
221- super_cl = super(PopulateLatestPersonSourcepackageReleaseCache, self)
222+ super_cl = super(PopulateLatestPersonSourcePackageReleaseCache, self)
223 super_cl.__init__(log, abort_time)
224 self.store = getUtility(IStoreSelector).get(MAIN_STORE, MASTER_FLAVOR)
225 # Keep a record of the processed source package release id and data
226 # type (creator or maintainer) so we know where to job got up to.
227- self.next_id_for_creator = 0
228- self.next_id_for_maintainer = 0
229- self.current_person_filter_type = 'creator'
230- self.starting_person_filter_type = self.current_person_filter_type
231+ self.last_spph_id = 0
232 self.job_name = self.__class__.__name__
233 job_data = load_garbo_job_state(self.job_name)
234 if job_data:
235- self.next_id_for_creator = job_data['next_id_for_creator']
236- self.next_id_for_maintainer = job_data['next_id_for_maintainer']
237- self.current_person_filter_type = job_data['person_filter_type']
238- self.starting_person_filter_type = self.current_person_filter_type
239+ self.last_spph_id = job_data.get('last_spph_id', 0)
240
241 def getPendingUpdates(self):
242- # Load the latest published source package release data keyed on either
243- # creator or maintainer as required.
244- if self.current_person_filter_type == 'creator':
245- person_filter = SourcePackageRelease.creatorID
246- next_id = self.next_id_for_creator
247- else:
248- person_filter = SourcePackageRelease.maintainerID
249- next_id = self.next_id_for_maintainer
250- spph = ClassAlias(SourcePackagePublishingHistory, "spph")
251+ # Load the latest published source package release data.
252+ spph = SourcePackagePublishingHistory
253 origin = [
254 SourcePackageRelease,
255 Join(
256 spph,
257 And(spph.sourcepackagereleaseID == SourcePackageRelease.id,
258- spph.archiveID == SourcePackageRelease.upload_archiveID))]
259- spr_select = self.store.using(*origin).find(
260- (SourcePackageRelease.id, Alias(spph.id, 'spph_id')),
261- SourcePackageRelease.id > next_id
262- ).order_by(
263- person_filter,
264- SourcePackageRelease.upload_distroseriesID,
265- SourcePackageRelease.sourcepackagenameID,
266- SourcePackageRelease.upload_archiveID,
267- Desc(SourcePackageRelease.dateuploaded),
268- SourcePackageRelease.id
269- ).config(distinct=(
270- person_filter,
271- SourcePackageRelease.upload_distroseriesID,
272- SourcePackageRelease.sourcepackagenameID,
273- SourcePackageRelease.upload_archiveID))._get_select()
274-
275- spr = Alias(spr_select, 'spr')
276- origin = [
277- SourcePackageRelease,
278- Join(spr, SQL('spr.id') == SourcePackageRelease.id),
279- Join(Archive, Archive.id == SourcePackageRelease.upload_archiveID)]
280+ spph.archiveID == SourcePackageRelease.upload_archiveID)),
281+ Join(Archive, Archive.id == spph.archiveID)]
282 rs = self.store.using(*origin).find(
283 (SourcePackageRelease.id,
284- person_filter,
285+ SourcePackageRelease.creatorID,
286+ SourcePackageRelease.maintainerID,
287 SourcePackageRelease.upload_archiveID,
288 Archive.purpose,
289 SourcePackageRelease.upload_distroseriesID,
290 SourcePackageRelease.sourcepackagenameID,
291- SourcePackageRelease.dateuploaded, SQL('spph_id'))
292- ).order_by(SourcePackageRelease.id)
293+ SourcePackageRelease.dateuploaded, spph.id),
294+ spph.id > self.last_spph_id
295+ ).order_by(spph.id)
296 return rs
297
298 def isDone(self):
299- # If there is no more data to process for creators, switch over to
300- # processing data for maintainers, or visa versa.
301- current_count = self.getPendingUpdates().count()
302- if current_count == 0:
303- if (self.current_person_filter_type !=
304- self.starting_person_filter_type):
305- return True
306- if self.current_person_filter_type == 'creator':
307- self.current_person_filter_type = 'maintainer'
308- else:
309- self.current_person_filter_type = 'creator'
310- current_count = self.getPendingUpdates().count()
311- return current_count == 0
312-
313- def update_cache(self, updates):
314- # Update the LatestPersonSourcepackageReleaseCache table. Records for
315- # each creator/maintainer will either be new inserts or updates. We try
316- # to update first, and gather data for missing (new) records along the
317- # way. At the end, a bulk insert is done for any new data.
318- # Updates is a list of data records (tuples of values).
319- # Each record is keyed on:
320- # - (creator/maintainer), archive, distroseries, sourcepackagename
321- inserts = []
322- columns = (
323- LatestPersonSourcepackageReleaseCache.sourcepackagerelease_id,
324- LatestPersonSourcepackageReleaseCache.creator_id,
325- LatestPersonSourcepackageReleaseCache.maintainer_id,
326- LatestPersonSourcepackageReleaseCache.upload_archive_id,
327- LatestPersonSourcepackageReleaseCache.archive_purpose,
328- LatestPersonSourcepackageReleaseCache.upload_distroseries_id,
329- LatestPersonSourcepackageReleaseCache.sourcepackagename_id,
330- LatestPersonSourcepackageReleaseCache.dateuploaded,
331- LatestPersonSourcepackageReleaseCache.publication_id,
332- )
333- for update in updates:
334- (spr_id, person_id, archive_id, purpose,
335- distroseries_id, spn_id, dateuploaded, spph_id) = update
336- if self.current_person_filter_type == 'creator':
337- creator_id = person_id
338- maintainer_id = None
339- else:
340- creator_id = None
341- maintainer_id = person_id
342- values = (
343- spr_id, creator_id, maintainer_id, archive_id, purpose.value,
344- distroseries_id, spn_id, dateuploaded, spph_id)
345- data = dict(zip(columns, values))
346- result = self.store.execute(Update(
347- data, And(
348- LatestPersonSourcepackageReleaseCache.upload_archive_id ==
349- archive_id,
350- LatestPersonSourcepackageReleaseCache.upload_distroseries_id ==
351- distroseries_id,
352- LatestPersonSourcepackageReleaseCache.sourcepackagename_id ==
353- spn_id,
354- LatestPersonSourcepackageReleaseCache.creator_id ==
355- creator_id,
356- LatestPersonSourcepackageReleaseCache.maintainer_id ==
357- maintainer_id)))
358- if result.rowcount == 0:
359- inserts.append(values)
360+ return self.getPendingUpdates().is_empty()
361+
362+ def __call__(self, chunk_size):
363+ cache_filter_data = []
364+ new_records = dict()
365+ # Create a map of new published spr data for creators and maintainers.
366+ # The map is keyed on (creator/maintainer, archive, spn, distroseries).
367+ for new_published_spr_data in self.getPendingUpdates()[:chunk_size]:
368+ (spr_id, creator_id, maintainer_id, archive_id, purpose,
369+ distroseries_id, spn_id, dateuploaded,
370+ spph_id) = new_published_spr_data
371+ cache_filter_data.append((archive_id, distroseries_id, spn_id))
372+
373+ value = (purpose, spph_id, dateuploaded, spr_id)
374+ maintainer_key = (
375+ maintainer_id, None, archive_id, distroseries_id, spn_id)
376+ creator_key = (
377+ None, creator_id, archive_id, distroseries_id, spn_id)
378+ new_records[maintainer_key] = maintainer_key + value
379+ new_records[creator_key] = creator_key + value
380+ self.last_spph_id = spph_id
381+
382+ # Gather all the current cached reporting records corresponding to the
383+ # data in the current batch. We select matching records from the
384+ # reporting cache table based on
385+ # (archive_id, distroseries_id, sourcepackagename_id).
386+ existing_records = dict()
387+ lpsprc = LatestPersonSourcePackageReleaseCache
388+ rs = self.store.find(
389+ lpsprc,
390+ In(
391+ Row(
392+ lpsprc.upload_archive_id,
393+ lpsprc.upload_distroseries_id,
394+ lpsprc.sourcepackagename_id),
395+ map(Row, cache_filter_data)))
396+ for lpsprc_record in rs:
397+ key = (
398+ lpsprc_record.maintainer_id,
399+ lpsprc_record.creator_id,
400+ lpsprc_record.upload_archive_id,
401+ lpsprc_record.upload_distroseries_id,
402+ lpsprc_record.sourcepackagename_id)
403+ existing_records[key] = pytz.UTC.localize(
404+ lpsprc_record.dateuploaded)
405+
406+ # Figure out what records from the new published spr data need to be
407+ # inserted and updated into the cache table.
408+ inserts = dict()
409+ updates = dict()
410+ for key, new_published_spr_data in new_records.items():
411+ existing_dateuploaded = existing_records.get(key, None)
412+ new_dateuploaded = new_published_spr_data[7]
413+ if existing_dateuploaded is None:
414+ target = inserts
415+ else:
416+ target = updates
417+
418+ existing_action = target.get(key, None)
419+ if (existing_action is None
420+ or existing_action[7] < new_dateuploaded):
421+ target[key] = new_published_spr_data
422+
423 if inserts:
424- self.store.execute(Insert(columns, values=inserts))
425-
426- def __call__(self, chunk_size):
427- max_id = None
428- updates = []
429- for update in (self.getPendingUpdates()[:chunk_size]):
430- updates.append(update)
431- max_id = update[0]
432- self.update_cache(updates)
433-
434- if max_id:
435- if self.current_person_filter_type == 'creator':
436- self.next_id_for_creator = max_id
437- else:
438- self.next_id_for_maintainer = max_id
439+ # Do a bulk insert.
440+ create(self.cache_columns, inserts.values())
441+ if updates:
442+ # Do a bulk update.
443+ cols = [
444+ ("maintainer", "integer"),
445+ ("creator", "integer"),
446+ ("upload_archive", "integer"),
447+ ("upload_distroseries", "integer"),
448+ ("sourcepackagename", "integer"),
449+ ("archive_purpose", "integer"),
450+ ("publication", "integer"),
451+ ("date_uploaded", "timestamp without time zone"),
452+ ("sourcepackagerelease", "integer"),
453+ ]
454+ values = [
455+ [dbify_value(col, val)[0]
456+ for (col, val) in zip(self.cache_columns, data)]
457+ for data in updates.values()]
458+
459+ cache_data_expr = Values('cache_data', cols, values)
460+ cache_data = ClassAlias(lpsprc, "cache_data")
461+
462+ # The columns to be updated.
463+ updated_columns = dict([
464+ (lpsprc.dateuploaded, cache_data.dateuploaded),
465+ (lpsprc.sourcepackagerelease_id,
466+ cache_data.sourcepackagerelease_id),
467+ (lpsprc.publication_id, cache_data.publication_id)])
468+ # The update filter.
469+ filter = And(
470+ Or(
471+ cache_data.creator_id == None,
472+ lpsprc.creator_id == cache_data.creator_id),
473+ Or(
474+ cache_data.maintainer_id == None,
475+ lpsprc.maintainer_id == cache_data.maintainer_id),
476+ lpsprc.upload_archive_id == cache_data.upload_archive_id,
477+ lpsprc.upload_distroseries_id ==
478+ cache_data.upload_distroseries_id,
479+ lpsprc.sourcepackagename_id == cache_data.sourcepackagename_id)
480+
481+ self.store.execute(
482+ BulkUpdate(
483+ updated_columns,
484+ table=LatestPersonSourcePackageReleaseCache,
485+ values=cache_data_expr, where=filter))
486 self.store.flush()
487 save_garbo_job_state(self.job_name, {
488- 'next_id_for_creator': self.next_id_for_creator,
489- 'next_id_for_maintainer': self.next_id_for_maintainer,
490- 'person_filter_type': self.current_person_filter_type})
491+ 'last_spph_id': self.last_spph_id})
492 transaction.commit()
493
494
495@@ -1560,7 +1578,7 @@
496 OpenIDConsumerAssociationPruner,
497 AntiqueSessionPruner,
498 VoucherRedeemer,
499- PopulateLatestPersonSourcepackageReleaseCache,
500+ PopulateLatestPersonSourcePackageReleaseCache,
501 ]
502 experimental_tunable_loops = []
503
504
505=== modified file 'lib/lp/scripts/tests/test_garbo.py'
506--- lib/lp/scripts/tests/test_garbo.py 2012-11-09 14:18:45 +0000
507+++ lib/lp/scripts/tests/test_garbo.py 2012-11-14 07:45:27 +0000
508@@ -114,7 +114,7 @@
509 from lp.services.verification.model.logintoken import LoginToken
510 from lp.services.worlddata.interfaces.language import ILanguageSet
511 from lp.soyuz.enums import PackagePublishingStatus
512-from lp.soyuz.model.reporting import LatestPersonSourcepackageReleaseCache
513+from lp.soyuz.model.reporting import LatestPersonSourcePackageReleaseCache
514 from lp.testing import (
515 FakeAdapterMixin,
516 person_logged_in,
517@@ -1166,7 +1166,7 @@
518 self.assertEqual(0, store.find(Product,
519 Product._information_type == None).count())
520
521- def test_PopulateLatestPersonSourcepackageReleaseCache(self):
522+ def test_PopulateLatestPersonSourcePackageReleaseCache(self):
523 switch_dbuser('testadmin')
524 # Make some same test data - we create published source package
525 # releases for 2 different creators and maintainers.
526@@ -1204,25 +1204,25 @@
527 creator=creators[1], maintainer=maintainers[1],
528 distroseries=distroseries, sourcepackagename=spn,
529 date_uploaded=datetime(2010, 12, 4, tzinfo=pytz.UTC))
530- self.factory.makeSourcePackagePublishingHistory(
531+ spph_1 = self.factory.makeSourcePackagePublishingHistory(
532 status=PackagePublishingStatus.PUBLISHED,
533 sourcepackagerelease=spr4)
534
535 transaction.commit()
536 self.runFrequently()
537
538- store = IMasterStore(LatestPersonSourcepackageReleaseCache)
539+ store = IMasterStore(LatestPersonSourcePackageReleaseCache)
540 # Check that the garbo state table has data.
541 self.assertIsNotNone(
542 store.execute(
543 'SELECT * FROM GarboJobState WHERE name=?',
544- params=[u'PopulateLatestPersonSourcepackageReleaseCache']
545+ params=[u'PopulateLatestPersonSourcePackageReleaseCache']
546 ).get_one())
547
548 def _assert_release_by_creator(creator, spr):
549 release_records = store.find(
550- LatestPersonSourcepackageReleaseCache,
551- LatestPersonSourcepackageReleaseCache.creator_id == creator.id)
552+ LatestPersonSourcePackageReleaseCache,
553+ LatestPersonSourcePackageReleaseCache.creator_id == creator.id)
554 [record] = list(release_records)
555 self.assertEqual(spr.creator, record.creator)
556 self.assertIsNone(record.maintainer_id)
557@@ -1231,8 +1231,8 @@
558
559 def _assert_release_by_maintainer(maintainer, spr):
560 release_records = store.find(
561- LatestPersonSourcepackageReleaseCache,
562- LatestPersonSourcepackageReleaseCache.maintainer_id ==
563+ LatestPersonSourcePackageReleaseCache,
564+ LatestPersonSourcePackageReleaseCache.maintainer_id ==
565 maintainer.id)
566 [record] = list(release_records)
567 self.assertEqual(spr.maintainer, record.maintainer)
568@@ -1246,9 +1246,8 @@
569 _assert_release_by_maintainer(maintainers[1], spr4)
570
571 job_data = load_garbo_job_state(
572- 'PopulateLatestPersonSourcepackageReleaseCache')
573- self.assertEqual(spr4.id, job_data['next_id_for_creator'])
574- self.assertEqual(spr4.id, job_data['next_id_for_maintainer'])
575+ 'PopulateLatestPersonSourcePackageReleaseCache')
576+ self.assertEqual(spph_1.id, job_data['last_spph_id'])
577
578 # Create a newer published source package release and ensure the
579 # release cache table is correctly updated.
580@@ -1257,7 +1256,7 @@
581 creator=creators[1], maintainer=maintainers[1],
582 distroseries=distroseries, sourcepackagename=spn,
583 date_uploaded=datetime(2010, 12, 5, tzinfo=pytz.UTC))
584- self.factory.makeSourcePackagePublishingHistory(
585+ spph_2 = self.factory.makeSourcePackagePublishingHistory(
586 status=PackagePublishingStatus.PUBLISHED,
587 sourcepackagerelease=spr5)
588
589@@ -1270,9 +1269,8 @@
590 _assert_release_by_maintainer(maintainers[1], spr5)
591
592 job_data = load_garbo_job_state(
593- 'PopulateLatestPersonSourcepackageReleaseCache')
594- self.assertEqual(spr5.id, job_data['next_id_for_creator'])
595- self.assertEqual(spr5.id, job_data['next_id_for_maintainer'])
596+ 'PopulateLatestPersonSourcePackageReleaseCache')
597+ self.assertEqual(spph_2.id, job_data['last_spph_id'])
598
599
600 class TestGarboTasks(TestCaseWithFactory):
601
602=== modified file 'lib/lp/services/database/bulk.py'
603--- lib/lp/services/database/bulk.py 2012-10-17 00:22:31 +0000
604+++ lib/lp/services/database/bulk.py 2012-11-14 07:45:27 +0000
605@@ -6,6 +6,7 @@
606 __metaclass__ = type
607 __all__ = [
608 'create',
609+ 'dbify_value',
610 'load',
611 'load_referencing',
612 'load_related',
613@@ -167,7 +168,7 @@
614 return load(object_type, keys)
615
616
617-def _dbify_value(col, val):
618+def dbify_value(col, val):
619 """Convert a value into a form that Storm can compile directly."""
620 if isinstance(val, SQL):
621 return (val,)
622@@ -184,7 +185,7 @@
623 return (col.variable_factory(value=val),)
624
625
626-def _dbify_column(col):
627+def dbify_column(col):
628 """Convert a column into a form that Storm can compile directly."""
629 if isinstance(col, Reference):
630 # References are mainly meant to be used as descriptors, so we
631@@ -207,7 +208,7 @@
632 :return: A list of the created objects if get_created, otherwise None.
633 """
634 # Flatten Reference faux-columns into their primary keys.
635- db_cols = list(chain.from_iterable(map(_dbify_column, columns)))
636+ db_cols = list(chain.from_iterable(map(dbify_column, columns)))
637 clses = set(col.cls for col in db_cols)
638 if len(clses) != 1:
639 raise ValueError(
640@@ -228,7 +229,7 @@
641 # squashed into primary key variables.
642 db_values = [
643 list(chain.from_iterable(
644- _dbify_value(col, val) for col, val in zip(columns, value)))
645+ dbify_value(col, val) for col, val in zip(columns, value)))
646 for value in values]
647
648 if get_objects or get_primary_keys:
649
650=== modified file 'lib/lp/services/database/stormexpr.py'
651--- lib/lp/services/database/stormexpr.py 2012-09-24 16:03:53 +0000
652+++ lib/lp/services/database/stormexpr.py 2012-11-14 07:45:27 +0000
653@@ -8,6 +8,7 @@
654 'ArrayAgg',
655 'ArrayContains',
656 'ArrayIntersects',
657+ 'BulkUpdate',
658 'ColumnSelect',
659 'Concatenate',
660 'CountDistinct',
661@@ -17,11 +18,14 @@
662 'NullCount',
663 'TryAdvisoryLock',
664 'Unnest',
665+ 'Values',
666 ]
667
668+from storm import Undef
669 from storm.exceptions import ClassInfoError
670 from storm.expr import (
671 BinaryOper,
672+ COLUMN_NAME,
673 ComparableExpr,
674 compile,
675 CompoundOper,
676@@ -31,6 +35,7 @@
677 NamedFunc,
678 Or,
679 SQL,
680+ TABLE,
681 )
682 from storm.info import (
683 get_cls_info,
684@@ -38,6 +43,59 @@
685 )
686
687
688+class BulkUpdate(Expr):
689+ # Perform a bulk table update using literal values.
690+ __slots__ = ("map", "where", "table", "values")
691+
692+ def __init__(self, map, table, values, where=Undef):
693+ self.map = map
694+ self.where = where
695+ self.table = table
696+ self.values = values
697+
698+
699+@compile.when(BulkUpdate)
700+def compile_bulkupdate(compile, update, state):
701+ pairs = update.map.items()
702+ state.push("context", COLUMN_NAME)
703+ col_names = [compile(col, state, token=True) for col, val in pairs]
704+ state.context = EXPR
705+ col_values = [compile(val, state) for col, val in pairs]
706+ sets = ["%s=%s" % (col, val) for col, val in zip(col_names, col_values)]
707+ state.context = TABLE
708+ tokens = ["UPDATE ", compile(update.table, state, token=True), " SET ",
709+ ", ".join(sets), " FROM "]
710+ state.context = EXPR
711+ # We don't want the values expression wrapped in parenthesis.
712+ state.precedence = 0
713+ tokens.append(compile(update.values, state, raw=True))
714+ if update.where is not Undef:
715+ tokens.append(" WHERE ")
716+ tokens.append(compile(update.where, state, raw=True))
717+ state.pop()
718+ return "".join(tokens)
719+
720+
721+class Values(Expr):
722+ __slots__ = ("name", "cols", "values")
723+
724+ def __init__(self, name, cols, values):
725+ self.name = name
726+ self.cols = cols
727+ self.values = values
728+
729+
730+@compile.when(Values)
731+def compile_values(compile, expr, state):
732+ col_names, col_types = zip(*expr.cols)
733+ first_row = ", ".join(
734+ "%s::%s" % (compile(value, state), type)
735+ for value, type in zip(expr.values[0], col_types))
736+ rows = [first_row] + [compile(value, state) for value in expr.values[1:]]
737+ return "(VALUES (%s)) AS %s(%s)" % (
738+ "), (".join(rows), expr.name, ', '.join(col_names))
739+
740+
741 class ColumnSelect(Expr):
742 # Wrap a select statement in braces so that it can be used as a column
743 # expression in another query.
744
745=== modified file 'lib/lp/soyuz/configure.zcml'
746--- lib/lp/soyuz/configure.zcml 2012-11-02 04:29:26 +0000
747+++ lib/lp/soyuz/configure.zcml 2012-11-14 07:45:27 +0000
748@@ -1003,9 +1003,9 @@
749 </class>
750
751 <class
752- class="lp.soyuz.model.reporting.LatestPersonSourcepackageReleaseCache">
753+ class="lp.soyuz.model.reporting.LatestPersonSourcePackageReleaseCache">
754 <allow
755- interface="lp.soyuz.interfaces.reporting.ILatestPersonSourcepackageReleaseCache"/>
756+ interface="lp.soyuz.interfaces.reporting.ILatestPersonSourcePackageReleaseCache"/>
757 </class>
758
759 <!-- ProcessAcceptedBugsJobSource -->
760
761=== modified file 'lib/lp/soyuz/interfaces/reporting.py'
762--- lib/lp/soyuz/interfaces/reporting.py 2012-11-07 12:38:19 +0000
763+++ lib/lp/soyuz/interfaces/reporting.py 2012-11-14 07:45:27 +0000
764@@ -3,7 +3,7 @@
765
766 __metaclass__ = type
767 __all__ = [
768- 'ILatestPersonSourcepackageReleaseCache',
769+ 'ILatestPersonSourcePackageReleaseCache',
770 ]
771
772
773@@ -11,7 +11,7 @@
774 from lp.soyuz.interfaces.sourcepackagerelease import ISourcePackageRelease
775
776
777-class ILatestPersonSourcepackageReleaseCache(ISourcePackageRelease):
778+class ILatestPersonSourcePackageReleaseCache(ISourcePackageRelease):
779 """Published source package release information for a person.
780
781 The records represented by this object are the latest published source
782@@ -22,7 +22,7 @@
783 """
784
785 cache_id = Attribute(
786- "The id of the associated LatestPersonSourcepackageReleaseCache"
787+ "The id of the associated LatestPersonSourcePackageReleaseCache"
788 "record.")
789 sourcepackagerelease = Attribute(
790 "The SourcePackageRelease which this object represents.")
791
792=== modified file 'lib/lp/soyuz/model/reporting.py'
793--- lib/lp/soyuz/model/reporting.py 2012-11-07 12:38:19 +0000
794+++ lib/lp/soyuz/model/reporting.py 2012-11-14 07:45:27 +0000
795@@ -3,7 +3,7 @@
796
797 __metaclass__ = type
798 __all__ = [
799- 'LatestPersonSourcepackageReleaseCache',
800+ 'LatestPersonSourcePackageReleaseCache',
801 ]
802
803 from lazr.delegates import delegates
804@@ -18,17 +18,17 @@
805 from lp.services.database.enumcol import EnumCol
806 from lp.soyuz.enums import ArchivePurpose
807 from lp.soyuz.interfaces.reporting import (
808- ILatestPersonSourcepackageReleaseCache,
809+ ILatestPersonSourcePackageReleaseCache,
810 )
811 from lp.soyuz.interfaces.sourcepackagerelease import ISourcePackageRelease
812
813
814-class LatestPersonSourcepackageReleaseCache(Storm):
815- """See `LatestPersonSourcepackageReleaseCache`."""
816- implements(ILatestPersonSourcepackageReleaseCache)
817+class LatestPersonSourcePackageReleaseCache(Storm):
818+ """See `LatestPersonSourcePackageReleaseCache`."""
819+ implements(ILatestPersonSourcePackageReleaseCache)
820 delegates(ISourcePackageRelease, context='sourcepackagerelease')
821
822- __storm_table__ = 'LatestPersonSourcepackageReleaseCache'
823+ __storm_table__ = 'LatestPersonSourcePackageReleaseCache'
824
825 cache_id = Int(name='id', primary=True)
826 publication_id = Int(name='publication')
827
828=== modified file 'lib/lp/soyuz/stories/soyuz/xx-person-packages.txt'
829--- lib/lp/soyuz/stories/soyuz/xx-person-packages.txt 2012-11-07 12:38:19 +0000
830+++ lib/lp/soyuz/stories/soyuz/xx-person-packages.txt 2012-11-14 07:45:27 +0000
831@@ -163,7 +163,7 @@
832 Make a function to update the cached latest person source package release
833 records.
834
835- >>> from lp.scripts.garbo import PopulateLatestPersonSourcepackageReleaseCache
836+ >>> from lp.scripts.garbo import PopulateLatestPersonSourcePackageReleaseCache
837 >>> from lp.services.database.sqlbase import flush_database_updates
838 >>> from lp.services.log.logger import FakeLogger
839 >>> from lp.testing.dbuser import switch_dbuser
840@@ -180,7 +180,7 @@
841 ... "delete from latestpersonsourcepackagereleasecache")
842 ... flush_database_updates()
843 ... switch_dbuser('garbo_frequently')
844- ... job = PopulateLatestPersonSourcepackageReleaseCache(FakeLogger())
845+ ... job = PopulateLatestPersonSourcePackageReleaseCache(FakeLogger())
846 ... while not job.isDone():
847 ... job(chunk_size=100)
848 ... switch_dbuser('launchpad')