Merge lp:~wallyworld/launchpad/another-reporting-cache-garbojob-1071581 into lp:launchpad
Status: Merged
Approved by: William Grant
Approved revision: no longer in the source branch.
Merged at revision: 16266
Proposed branch: lp:~wallyworld/launchpad/another-reporting-cache-garbojob-1071581
Merge into: lp:launchpad
Diff against target: 848 lines (+263/-189), 10 files modified
  lib/lp/registry/browser/tests/test_person.py (+3/-3)
  lib/lp/registry/model/person.py (+15/-16)
  lib/lp/scripts/garbo.py (+155/-137)
  lib/lp/scripts/tests/test_garbo.py (+14/-16)
  lib/lp/services/database/bulk.py (+5/-4)
  lib/lp/services/database/stormexpr.py (+58/-0)
  lib/lp/soyuz/configure.zcml (+2/-2)
  lib/lp/soyuz/interfaces/reporting.py (+3/-3)
  lib/lp/soyuz/model/reporting.py (+6/-6)
  lib/lp/soyuz/stories/soyuz/xx-person-packages.txt (+2/-2)
To merge this branch: bzr merge lp:~wallyworld/launchpad/another-reporting-cache-garbojob-1071581
Related bugs:
Reviewer: William Grant (community), review type: code, status: Approve
Review via email: mp+133859@code.launchpad.net
Commit message
Improve performance of latest source package release cache garbo job.
Description of the change
== Implementation ==
The previous version of the job selected single, distinct records to insert into the cache table, so its query suffered performance issues similar to the query that pulled the data live.
This version of the job iterates over SPPH records, ordered by id, starting from a watermark: the id of the last processed SPPH. The query joins the SPR table to pick up published SPRs.
Existing cache records are updated, using Greatest() for the date_uploaded column; any records which do not already exist are inserted.
Hopefully this will run a lot faster than the previous version.
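In outline, each batch looks something like the sketch below. This is hand-written against the schema used in the diff, not the job's literal code; run_batch and its row handling are illustrative, and the '?'/params style follows Storm's store.execute convention.

    # Hypothetical sketch of one batch: walk SPPH past the watermark,
    # upsert cache rows, and return the advanced watermark.
    def run_batch(store, last_spph_id, chunk_size):
        rows = store.execute("""
            SELECT spr.id, spr.creator, spr.maintainer,
                   spr.upload_archive, spr.upload_distroseries,
                   spr.sourcepackagename, spr.dateuploaded, spph.id
            FROM SourcePackagePublishingHistory AS spph
            JOIN SourcePackageRelease AS spr
                ON spr.id = spph.sourcepackagerelease
                AND spr.upload_archive = spph.archive
            WHERE spph.id > ?
            ORDER BY spph.id
            LIMIT ?""", params=[last_spph_id, chunk_size]).get_all()
        for row in rows:
            # UPDATE any existing cache row, keeping the newest date via
            # GREATEST(date_uploaded, new_date); gather misses for a
            # bulk INSERT at the end of the batch.
            last_spph_id = row[-1]
        return last_spph_id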
== Tests ==
Existing tests suffice.
== Lint ==
Linting changed files:
lib/lp/
lib/lp/
Curtis Hovey (sinzui) wrote:
I don't think you want to disable half of the production garbo jobs or the threading. I think you need to uncomment them to restore the garbo features.
Ian Booth (wallyworld) wrote:
Bollocks. That was so I could debug. I forgot to uncomment it.
On Mon 12 Nov 2012 23:47:31 EST, Curtis Hovey wrote:
> I don't think you want to disable half of the production garbo jobs or the threading. I think you need to uncomment them to restore the garbo features.
William Grant (wgrant) wrote:
73    spph = ClassAlias(SourcePackagePublishingHistory, "spph")
I don't understand why this needs to be a ClassAlias; if you just want a shorter Python name for the class without aliasing in SQL, then 'spph = SourcePackagePublishingHistory' works fine.
113    + SourcePackageRelease
The custom alias seems pointless here.
114 + spph.id > self.next_spph_id
That sounds, then, rather like it's actually last_spph_id.
132    + return self.getPendingUpdates().count() == 0
We don't care about the exact count. is_empty() is cheaper and does what we want.
134 - def update_cache(self, updates):
135 + def update_cache(self, update, inserts):
This should be updateCache.
167 + def perform_
168 + purpose, distroseries_id, spn_id, dateuploaded,
169 + spph_id):
The update here is perhaps slightly excessive. The only fields that can ever change are publication, date_uploaded, and sourcepackagerelease.
Additionally, you probably want to aggregate the inserts; I think the current code will crash if two of the same key show up new in a single batch. There's also likely to be benefit in applying the same aggregation technique to updates, not to avoid crashes but to avoid duplicating work. update_cache will likely end up compact enough to be inlined.
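Something like this sketch would do the aggregation (hypothetical input shape: an iterable of (key, date_uploaded, payload) tuples; the merged diff implements the same idea inline):

    # Keep only the newest row per cache key, so a batch never INSERTs
    # the same key twice and never applies a stale UPDATE.
    def aggregate_by_key(rows):
        newest = {}
        for key, date_uploaded, payload in rows:
            seen = newest.get(key)
            if seen is None or seen[0] < date_uploaded:
                newest[key] = (date_uploaded, payload)
        return newest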
It's probably also cleaner to reword the update as a self.store.find(...).set(...) call.
I'd lastly like to see the Storm class renamed to LatestPersonSourcePackageReleaseCache.
209    + for update in (self.getPendingUpdates()[:chunk_size]):
Extraneous parens are extraneous.
245 - self.runDaily()
246 - self.runHourly()
247 +# self.runDaily()
248 +# self.runHourly()
This seems like an accidental change.
William Grant (wgrant) wrote:
You'll also greatly benefit from ordering the getPendingUpdates joins the way we want the query to be executed: SPPH, SPR, then Archive.
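In Storm terms, with the classes the diff already uses, that ordering looks roughly like this (a sketch, not the merged code):

    from storm.expr import And, Join

    from lp.soyuz.model.archive import Archive
    from lp.soyuz.model.publishing import SourcePackagePublishingHistory
    from lp.soyuz.model.sourcepackagerelease import SourcePackageRelease

    # Suggested join order: SPPH first, then SPR, then Archive, so the
    # query walks SPPH by id before touching the other tables.
    spph = SourcePackagePublishingHistory
    origin = [
        spph,
        Join(SourcePackageRelease, And(
            spph.sourcepackagereleaseID == SourcePackageRelease.id,
            spph.archiveID == SourcePackageRelease.upload_archiveID)),
        Join(Archive, Archive.id == spph.archiveID),
        ]
    # Used as: store.using(*origin).find(...)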
And it might be worth looking at batching the updates (using a CASE expression is the best way, sadly), but that's a far less critical optimisation.
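For illustration, a CASE-batched update has roughly this shape (hypothetical ids and dates; the preview diff below instead batches updates with a VALUES join via the new BulkUpdate and Values expressions in stormexpr.py):

    def batched_update(store):
        # One UPDATE covering several cache rows, with CASE selecting
        # the per-row value; the ids and dates here are made up.
        store.execute("""
            UPDATE LatestPersonSourcePackageReleaseCache
            SET date_uploaded = CASE id
                    WHEN 1 THEN '2012-12-01'::timestamp
                    WHEN 2 THEN '2012-12-02'::timestamp
                END
            WHERE id IN (1, 2)""")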
William Grant (wgrant): Approve
Preview Diff
1 | === modified file 'lib/lp/registry/browser/tests/test_person.py' |
2 | --- lib/lp/registry/browser/tests/test_person.py 2012-11-08 05:55:16 +0000 |
3 | +++ lib/lp/registry/browser/tests/test_person.py 2012-11-14 07:45:27 +0000 |
4 | @@ -37,7 +37,7 @@ |
5 | ) |
6 | from lp.registry.model.karma import KarmaCategory |
7 | from lp.registry.model.milestone import milestone_sort_key |
8 | -from lp.scripts.garbo import PopulateLatestPersonSourcepackageReleaseCache |
9 | +from lp.scripts.garbo import PopulateLatestPersonSourcePackageReleaseCache |
10 | from lp.services.config import config |
11 | from lp.services.features.testing import FeatureFixture |
12 | from lp.services.identity.interfaces.account import AccountStatus |
13 | @@ -849,7 +849,7 @@ |
14 | spphs.append(spph) |
15 | # Update the releases cache table. |
16 | switch_dbuser('garbo_frequently') |
17 | - job = PopulateLatestPersonSourcepackageReleaseCache(FakeLogger()) |
18 | + job = PopulateLatestPersonSourcePackageReleaseCache(FakeLogger()) |
19 | while not job.isDone(): |
20 | job(chunk_size=100) |
21 | switch_dbuser('launchpad') |
22 | @@ -1062,7 +1062,7 @@ |
23 | self.build.archive = publisher.distroseries.main_archive |
24 | # Update the releases cache table. |
25 | switch_dbuser('garbo_frequently') |
26 | - job = PopulateLatestPersonSourcepackageReleaseCache(FakeLogger()) |
27 | + job = PopulateLatestPersonSourcePackageReleaseCache(FakeLogger()) |
28 | while not job.isDone(): |
29 | job(chunk_size=100) |
30 | switch_dbuser('launchpad') |
31 | |
32 | === modified file 'lib/lp/registry/model/person.py' |
33 | --- lib/lp/registry/model/person.py 2012-11-08 05:35:59 +0000 |
34 | +++ lib/lp/registry/model/person.py 2012-11-14 07:45:27 +0000 |
35 | @@ -328,7 +328,7 @@ |
36 | Archive, |
37 | validate_ppa, |
38 | ) |
39 | -from lp.soyuz.model.reporting import LatestPersonSourcepackageReleaseCache |
40 | +from lp.soyuz.model.reporting import LatestPersonSourcePackageReleaseCache |
41 | from lp.soyuz.model.publishing import SourcePackagePublishingHistory |
42 | from lp.soyuz.model.sourcepackagerelease import SourcePackageRelease |
43 | from lp.translations.model.hastranslationimports import ( |
44 | @@ -2270,8 +2270,8 @@ |
45 | ('QuestionSubscription', 'person'), |
46 | ('SpecificationSubscription', 'person'), |
47 | ('AnswerContact', 'person'), |
48 | - ('LatestPersonSourcepackageReleaseCache', 'creator'), |
49 | - ('LatestPersonSourcepackageReleaseCache', 'maintainer')] |
50 | + ('LatestPersonSourcePackageReleaseCache', 'creator'), |
51 | + ('LatestPersonSourcePackageReleaseCache', 'maintainer')] |
52 | cur = cursor() |
53 | for table, person_id_column in removals: |
54 | cur.execute("DELETE FROM %s WHERE %s=%d" |
55 | @@ -2838,33 +2838,33 @@ |
56 | clauses = [] |
57 | if uploader_only: |
58 | clauses.append( |
59 | - LatestPersonSourcepackageReleaseCache.creator_id == self.id) |
60 | + LatestPersonSourcePackageReleaseCache.creator_id == self.id) |
61 | if ppa_only: |
62 | # Source maintainer is irrelevant for PPA uploads. |
63 | pass |
64 | elif uploader_only: |
65 | - lpspr = ClassAlias(LatestPersonSourcepackageReleaseCache, 'lpspr') |
66 | + lpspr = ClassAlias(LatestPersonSourcePackageReleaseCache, 'lpspr') |
67 | clauses.append(Not(Exists(Select(1, |
68 | where=And( |
69 | lpspr.sourcepackagename_id == |
70 | - LatestPersonSourcepackageReleaseCache.sourcepackagename_id, |
71 | + LatestPersonSourcePackageReleaseCache.sourcepackagename_id, |
72 | lpspr.upload_archive_id == |
73 | - LatestPersonSourcepackageReleaseCache.upload_archive_id, |
74 | + LatestPersonSourcePackageReleaseCache.upload_archive_id, |
75 | lpspr.upload_distroseries_id == |
76 | - LatestPersonSourcepackageReleaseCache.upload_distroseries_id, |
77 | + LatestPersonSourcePackageReleaseCache.upload_distroseries_id, |
78 | lpspr.archive_purpose != ArchivePurpose.PPA, |
79 | lpspr.maintainer_id == self.id), |
80 | tables=lpspr)))) |
81 | else: |
82 | clauses.append( |
83 | - LatestPersonSourcepackageReleaseCache.maintainer_id == self.id) |
84 | + LatestPersonSourcePackageReleaseCache.maintainer_id == self.id) |
85 | if ppa_only: |
86 | clauses.append( |
87 | - LatestPersonSourcepackageReleaseCache.archive_purpose == |
88 | + LatestPersonSourcePackageReleaseCache.archive_purpose == |
89 | ArchivePurpose.PPA) |
90 | else: |
91 | clauses.append( |
92 | - LatestPersonSourcepackageReleaseCache.archive_purpose != |
93 | + LatestPersonSourcePackageReleaseCache.archive_purpose != |
94 | ArchivePurpose.PPA) |
95 | return clauses |
96 | |
97 | @@ -2876,8 +2876,8 @@ |
98 | return self._legacy_hasReleasesQuery(uploader_only, ppa_only) |
99 | |
100 | clauses = self._releasesQueryFilter(uploader_only, ppa_only) |
101 | - rs = Store.of(self).using(LatestPersonSourcepackageReleaseCache).find( |
102 | - LatestPersonSourcepackageReleaseCache.publication_id, *clauses) |
103 | + rs = Store.of(self).using(LatestPersonSourcePackageReleaseCache).find( |
104 | + LatestPersonSourcePackageReleaseCache.publication_id, *clauses) |
105 | return not rs.is_empty() |
106 | |
107 | def _latestReleasesQuery(self, uploader_only=False, ppa_only=False): |
108 | @@ -2889,8 +2889,8 @@ |
109 | |
110 | clauses = self._releasesQueryFilter(uploader_only, ppa_only) |
111 | rs = Store.of(self).find( |
112 | - LatestPersonSourcepackageReleaseCache, *clauses).order_by( |
113 | - Desc(LatestPersonSourcepackageReleaseCache.dateuploaded)) |
114 | + LatestPersonSourcePackageReleaseCache, *clauses).order_by( |
115 | + Desc(LatestPersonSourcePackageReleaseCache.dateuploaded)) |
116 | |
117 | def load_related_objects(rows): |
118 | if rows and rows[0].maintainer_id: |
119 | @@ -2904,7 +2904,6 @@ |
120 | |
121 | return DecoratedResultSet(rs, pre_iter_hook=load_related_objects) |
122 | |
123 | - |
124 | def _legacy_releasesQueryFilter(self, uploader_only=False, ppa_only=False): |
125 | """Return the filter used to find sourcepackagereleases (SPRs) |
126 | related to this person. |
127 | |
128 | === modified file 'lib/lp/scripts/garbo.py' |
129 | --- lib/lp/scripts/garbo.py 2012-11-09 14:18:45 +0000 |
130 | +++ lib/lp/scripts/garbo.py 2012-11-14 07:45:27 +0000 |
131 | @@ -31,22 +31,19 @@ |
132 | from psycopg2 import IntegrityError |
133 | import pytz |
134 | from storm.expr import ( |
135 | - Alias, |
136 | And, |
137 | - Desc, |
138 | In, |
139 | - Insert, |
140 | Join, |
141 | Like, |
142 | + Max, |
143 | + Min, |
144 | + Or, |
145 | + Row, |
146 | Select, |
147 | + SQL, |
148 | Update, |
149 | ) |
150 | from storm.info import ClassAlias |
151 | -from storm.locals import ( |
152 | - Max, |
153 | - Min, |
154 | - SQL, |
155 | - ) |
156 | from storm.store import EmptyResultSet |
157 | import transaction |
158 | from zope.component import getUtility |
159 | @@ -75,6 +72,10 @@ |
160 | from lp.registry.model.product import Product |
161 | from lp.services.config import config |
162 | from lp.services.database import postgresql |
163 | +from lp.services.database.bulk import ( |
164 | + create, |
165 | + dbify_value, |
166 | + ) |
167 | from lp.services.database.constants import UTC_NOW |
168 | from lp.services.database.interfaces import ( |
169 | IStoreSelector, |
170 | @@ -87,6 +88,10 @@ |
171 | session_store, |
172 | sqlvalues, |
173 | ) |
174 | +from lp.services.database.stormexpr import ( |
175 | + BulkUpdate, |
176 | + Values, |
177 | + ) |
178 | from lp.services.features import ( |
179 | getFeatureFlag, |
180 | install_feature_controller, |
181 | @@ -116,7 +121,7 @@ |
182 | from lp.services.verification.model.logintoken import LoginToken |
183 | from lp.soyuz.model.archive import Archive |
184 | from lp.soyuz.model.publishing import SourcePackagePublishingHistory |
185 | -from lp.soyuz.model.reporting import LatestPersonSourcepackageReleaseCache |
186 | +from lp.soyuz.model.reporting import LatestPersonSourcePackageReleaseCache |
187 | from lp.soyuz.model.sourcepackagerelease import SourcePackageRelease |
188 | from lp.translations.interfaces.potemplate import IPOTemplateSet |
189 | from lp.translations.model.potmsgset import POTMsgSet |
190 | @@ -463,164 +468,177 @@ |
191 | transaction.commit() |
192 | |
193 | |
194 | -class PopulateLatestPersonSourcepackageReleaseCache(TunableLoop): |
195 | - """Populate the LatestPersonSourcepackageReleaseCache table. |
196 | +class PopulateLatestPersonSourcePackageReleaseCache(TunableLoop): |
197 | + """Populate the LatestPersonSourcePackageReleaseCache table. |
198 | |
199 | - The LatestPersonSourcepackageReleaseCache contains 2 sets of data, one set |
200 | - for package maintainers and another for package creators. This job first |
201 | - populates the creator data and then does the maintainer data. |
202 | + The LatestPersonSourcePackageReleaseCache contains 2 sets of data, one set |
203 | + for package maintainers and another for package creators. This job iterates |
204 | + over the SPPH records, populating the cache table. |
205 | """ |
206 | maximum_chunk_size = 1000 |
207 | |
208 | + cache_columns = ( |
209 | + LatestPersonSourcePackageReleaseCache.maintainer_id, |
210 | + LatestPersonSourcePackageReleaseCache.creator_id, |
211 | + LatestPersonSourcePackageReleaseCache.upload_archive_id, |
212 | + LatestPersonSourcePackageReleaseCache.upload_distroseries_id, |
213 | + LatestPersonSourcePackageReleaseCache.sourcepackagename_id, |
214 | + LatestPersonSourcePackageReleaseCache.archive_purpose, |
215 | + LatestPersonSourcePackageReleaseCache.publication_id, |
216 | + LatestPersonSourcePackageReleaseCache.dateuploaded, |
217 | + LatestPersonSourcePackageReleaseCache.sourcepackagerelease_id, |
218 | + ) |
219 | + |
220 | def __init__(self, log, abort_time=None): |
221 | - super_cl = super(PopulateLatestPersonSourcepackageReleaseCache, self) |
222 | + super_cl = super(PopulateLatestPersonSourcePackageReleaseCache, self) |
223 | super_cl.__init__(log, abort_time) |
224 | self.store = getUtility(IStoreSelector).get(MAIN_STORE, MASTER_FLAVOR) |
225 | # Keep a record of the processed source package release id and data |
226 | # type (creator or maintainer) so we know where to job got up to. |
227 | - self.next_id_for_creator = 0 |
228 | - self.next_id_for_maintainer = 0 |
229 | - self.current_person_filter_type = 'creator' |
230 | - self.starting_person_filter_type = self.current_person_filter_type |
231 | + self.last_spph_id = 0 |
232 | self.job_name = self.__class__.__name__ |
233 | job_data = load_garbo_job_state(self.job_name) |
234 | if job_data: |
235 | - self.next_id_for_creator = job_data['next_id_for_creator'] |
236 | - self.next_id_for_maintainer = job_data['next_id_for_maintainer'] |
237 | - self.current_person_filter_type = job_data['person_filter_type'] |
238 | - self.starting_person_filter_type = self.current_person_filter_type |
239 | + self.last_spph_id = job_data.get('last_spph_id', 0) |
240 | |
241 | def getPendingUpdates(self): |
242 | - # Load the latest published source package release data keyed on either |
243 | - # creator or maintainer as required. |
244 | - if self.current_person_filter_type == 'creator': |
245 | - person_filter = SourcePackageRelease.creatorID |
246 | - next_id = self.next_id_for_creator |
247 | - else: |
248 | - person_filter = SourcePackageRelease.maintainerID |
249 | - next_id = self.next_id_for_maintainer |
250 | - spph = ClassAlias(SourcePackagePublishingHistory, "spph") |
251 | + # Load the latest published source package release data. |
252 | + spph = SourcePackagePublishingHistory |
253 | origin = [ |
254 | SourcePackageRelease, |
255 | Join( |
256 | spph, |
257 | And(spph.sourcepackagereleaseID == SourcePackageRelease.id, |
258 | - spph.archiveID == SourcePackageRelease.upload_archiveID))] |
259 | - spr_select = self.store.using(*origin).find( |
260 | - (SourcePackageRelease.id, Alias(spph.id, 'spph_id')), |
261 | - SourcePackageRelease.id > next_id |
262 | - ).order_by( |
263 | - person_filter, |
264 | - SourcePackageRelease.upload_distroseriesID, |
265 | - SourcePackageRelease.sourcepackagenameID, |
266 | - SourcePackageRelease.upload_archiveID, |
267 | - Desc(SourcePackageRelease.dateuploaded), |
268 | - SourcePackageRelease.id |
269 | - ).config(distinct=( |
270 | - person_filter, |
271 | - SourcePackageRelease.upload_distroseriesID, |
272 | - SourcePackageRelease.sourcepackagenameID, |
273 | - SourcePackageRelease.upload_archiveID))._get_select() |
274 | - |
275 | - spr = Alias(spr_select, 'spr') |
276 | - origin = [ |
277 | - SourcePackageRelease, |
278 | - Join(spr, SQL('spr.id') == SourcePackageRelease.id), |
279 | - Join(Archive, Archive.id == SourcePackageRelease.upload_archiveID)] |
280 | + spph.archiveID == SourcePackageRelease.upload_archiveID)), |
281 | + Join(Archive, Archive.id == spph.archiveID)] |
282 | rs = self.store.using(*origin).find( |
283 | (SourcePackageRelease.id, |
284 | - person_filter, |
285 | + SourcePackageRelease.creatorID, |
286 | + SourcePackageRelease.maintainerID, |
287 | SourcePackageRelease.upload_archiveID, |
288 | Archive.purpose, |
289 | SourcePackageRelease.upload_distroseriesID, |
290 | SourcePackageRelease.sourcepackagenameID, |
291 | - SourcePackageRelease.dateuploaded, SQL('spph_id')) |
292 | - ).order_by(SourcePackageRelease.id) |
293 | + SourcePackageRelease.dateuploaded, spph.id), |
294 | + spph.id > self.last_spph_id |
295 | + ).order_by(spph.id) |
296 | return rs |
297 | |
298 | def isDone(self): |
299 | - # If there is no more data to process for creators, switch over to |
300 | - # processing data for maintainers, or visa versa. |
301 | - current_count = self.getPendingUpdates().count() |
302 | - if current_count == 0: |
303 | - if (self.current_person_filter_type != |
304 | - self.starting_person_filter_type): |
305 | - return True |
306 | - if self.current_person_filter_type == 'creator': |
307 | - self.current_person_filter_type = 'maintainer' |
308 | - else: |
309 | - self.current_person_filter_type = 'creator' |
310 | - current_count = self.getPendingUpdates().count() |
311 | - return current_count == 0 |
312 | - |
313 | - def update_cache(self, updates): |
314 | - # Update the LatestPersonSourcepackageReleaseCache table. Records for |
315 | - # each creator/maintainer will either be new inserts or updates. We try |
316 | - # to update first, and gather data for missing (new) records along the |
317 | - # way. At the end, a bulk insert is done for any new data. |
318 | - # Updates is a list of data records (tuples of values). |
319 | - # Each record is keyed on: |
320 | - # - (creator/maintainer), archive, distroseries, sourcepackagename |
321 | - inserts = [] |
322 | - columns = ( |
323 | - LatestPersonSourcepackageReleaseCache.sourcepackagerelease_id, |
324 | - LatestPersonSourcepackageReleaseCache.creator_id, |
325 | - LatestPersonSourcepackageReleaseCache.maintainer_id, |
326 | - LatestPersonSourcepackageReleaseCache.upload_archive_id, |
327 | - LatestPersonSourcepackageReleaseCache.archive_purpose, |
328 | - LatestPersonSourcepackageReleaseCache.upload_distroseries_id, |
329 | - LatestPersonSourcepackageReleaseCache.sourcepackagename_id, |
330 | - LatestPersonSourcepackageReleaseCache.dateuploaded, |
331 | - LatestPersonSourcepackageReleaseCache.publication_id, |
332 | - ) |
333 | - for update in updates: |
334 | - (spr_id, person_id, archive_id, purpose, |
335 | - distroseries_id, spn_id, dateuploaded, spph_id) = update |
336 | - if self.current_person_filter_type == 'creator': |
337 | - creator_id = person_id |
338 | - maintainer_id = None |
339 | - else: |
340 | - creator_id = None |
341 | - maintainer_id = person_id |
342 | - values = ( |
343 | - spr_id, creator_id, maintainer_id, archive_id, purpose.value, |
344 | - distroseries_id, spn_id, dateuploaded, spph_id) |
345 | - data = dict(zip(columns, values)) |
346 | - result = self.store.execute(Update( |
347 | - data, And( |
348 | - LatestPersonSourcepackageReleaseCache.upload_archive_id == |
349 | - archive_id, |
350 | - LatestPersonSourcepackageReleaseCache.upload_distroseries_id == |
351 | - distroseries_id, |
352 | - LatestPersonSourcepackageReleaseCache.sourcepackagename_id == |
353 | - spn_id, |
354 | - LatestPersonSourcepackageReleaseCache.creator_id == |
355 | - creator_id, |
356 | - LatestPersonSourcepackageReleaseCache.maintainer_id == |
357 | - maintainer_id))) |
358 | - if result.rowcount == 0: |
359 | - inserts.append(values) |
360 | + return self.getPendingUpdates().is_empty() |
361 | + |
362 | + def __call__(self, chunk_size): |
363 | + cache_filter_data = [] |
364 | + new_records = dict() |
365 | + # Create a map of new published spr data for creators and maintainers. |
366 | + # The map is keyed on (creator/maintainer, archive, spn, distroseries). |
367 | + for new_published_spr_data in self.getPendingUpdates()[:chunk_size]: |
368 | + (spr_id, creator_id, maintainer_id, archive_id, purpose, |
369 | + distroseries_id, spn_id, dateuploaded, |
370 | + spph_id) = new_published_spr_data |
371 | + cache_filter_data.append((archive_id, distroseries_id, spn_id)) |
372 | + |
373 | + value = (purpose, spph_id, dateuploaded, spr_id) |
374 | + maintainer_key = ( |
375 | + maintainer_id, None, archive_id, distroseries_id, spn_id) |
376 | + creator_key = ( |
377 | + None, creator_id, archive_id, distroseries_id, spn_id) |
378 | + new_records[maintainer_key] = maintainer_key + value |
379 | + new_records[creator_key] = creator_key + value |
380 | + self.last_spph_id = spph_id |
381 | + |
382 | + # Gather all the current cached reporting records corresponding to the |
383 | + # data in the current batch. We select matching records from the |
384 | + # reporting cache table based on |
385 | + # (archive_id, distroseries_id, sourcepackagename_id). |
386 | + existing_records = dict() |
387 | + lpsprc = LatestPersonSourcePackageReleaseCache |
388 | + rs = self.store.find( |
389 | + lpsprc, |
390 | + In( |
391 | + Row( |
392 | + lpsprc.upload_archive_id, |
393 | + lpsprc.upload_distroseries_id, |
394 | + lpsprc.sourcepackagename_id), |
395 | + map(Row, cache_filter_data))) |
396 | + for lpsprc_record in rs: |
397 | + key = ( |
398 | + lpsprc_record.maintainer_id, |
399 | + lpsprc_record.creator_id, |
400 | + lpsprc_record.upload_archive_id, |
401 | + lpsprc_record.upload_distroseries_id, |
402 | + lpsprc_record.sourcepackagename_id) |
403 | + existing_records[key] = pytz.UTC.localize( |
404 | + lpsprc_record.dateuploaded) |
405 | + |
406 | + # Figure out what records from the new published spr data need to be |
407 | + # inserted and updated into the cache table. |
408 | + inserts = dict() |
409 | + updates = dict() |
410 | + for key, new_published_spr_data in new_records.items(): |
411 | + existing_dateuploaded = existing_records.get(key, None) |
412 | + new_dateuploaded = new_published_spr_data[7] |
413 | + if existing_dateuploaded is None: |
414 | + target = inserts |
415 | + else: |
416 | + target = updates |
417 | + |
418 | + existing_action = target.get(key, None) |
419 | + if (existing_action is None |
420 | + or existing_action[7] < new_dateuploaded): |
421 | + target[key] = new_published_spr_data |
422 | + |
423 | if inserts: |
424 | - self.store.execute(Insert(columns, values=inserts)) |
425 | - |
426 | - def __call__(self, chunk_size): |
427 | - max_id = None |
428 | - updates = [] |
429 | - for update in (self.getPendingUpdates()[:chunk_size]): |
430 | - updates.append(update) |
431 | - max_id = update[0] |
432 | - self.update_cache(updates) |
433 | - |
434 | - if max_id: |
435 | - if self.current_person_filter_type == 'creator': |
436 | - self.next_id_for_creator = max_id |
437 | - else: |
438 | - self.next_id_for_maintainer = max_id |
439 | + # Do a bulk insert. |
440 | + create(self.cache_columns, inserts.values()) |
441 | + if updates: |
442 | + # Do a bulk update. |
443 | + cols = [ |
444 | + ("maintainer", "integer"), |
445 | + ("creator", "integer"), |
446 | + ("upload_archive", "integer"), |
447 | + ("upload_distroseries", "integer"), |
448 | + ("sourcepackagename", "integer"), |
449 | + ("archive_purpose", "integer"), |
450 | + ("publication", "integer"), |
451 | + ("date_uploaded", "timestamp without time zone"), |
452 | + ("sourcepackagerelease", "integer"), |
453 | + ] |
454 | + values = [ |
455 | + [dbify_value(col, val)[0] |
456 | + for (col, val) in zip(self.cache_columns, data)] |
457 | + for data in updates.values()] |
458 | + |
459 | + cache_data_expr = Values('cache_data', cols, values) |
460 | + cache_data = ClassAlias(lpsprc, "cache_data") |
461 | + |
462 | + # The columns to be updated. |
463 | + updated_columns = dict([ |
464 | + (lpsprc.dateuploaded, cache_data.dateuploaded), |
465 | + (lpsprc.sourcepackagerelease_id, |
466 | + cache_data.sourcepackagerelease_id), |
467 | + (lpsprc.publication_id, cache_data.publication_id)]) |
468 | + # The update filter. |
469 | + filter = And( |
470 | + Or( |
471 | + cache_data.creator_id == None, |
472 | + lpsprc.creator_id == cache_data.creator_id), |
473 | + Or( |
474 | + cache_data.maintainer_id == None, |
475 | + lpsprc.maintainer_id == cache_data.maintainer_id), |
476 | + lpsprc.upload_archive_id == cache_data.upload_archive_id, |
477 | + lpsprc.upload_distroseries_id == |
478 | + cache_data.upload_distroseries_id, |
479 | + lpsprc.sourcepackagename_id == cache_data.sourcepackagename_id) |
480 | + |
481 | + self.store.execute( |
482 | + BulkUpdate( |
483 | + updated_columns, |
484 | + table=LatestPersonSourcePackageReleaseCache, |
485 | + values=cache_data_expr, where=filter)) |
486 | self.store.flush() |
487 | save_garbo_job_state(self.job_name, { |
488 | - 'next_id_for_creator': self.next_id_for_creator, |
489 | - 'next_id_for_maintainer': self.next_id_for_maintainer, |
490 | - 'person_filter_type': self.current_person_filter_type}) |
491 | + 'last_spph_id': self.last_spph_id}) |
492 | transaction.commit() |
493 | |
494 | |
495 | @@ -1560,7 +1578,7 @@ |
496 | OpenIDConsumerAssociationPruner, |
497 | AntiqueSessionPruner, |
498 | VoucherRedeemer, |
499 | - PopulateLatestPersonSourcepackageReleaseCache, |
500 | + PopulateLatestPersonSourcePackageReleaseCache, |
501 | ] |
502 | experimental_tunable_loops = [] |
503 | |
504 | |
505 | === modified file 'lib/lp/scripts/tests/test_garbo.py' |
506 | --- lib/lp/scripts/tests/test_garbo.py 2012-11-09 14:18:45 +0000 |
507 | +++ lib/lp/scripts/tests/test_garbo.py 2012-11-14 07:45:27 +0000 |
508 | @@ -114,7 +114,7 @@ |
509 | from lp.services.verification.model.logintoken import LoginToken |
510 | from lp.services.worlddata.interfaces.language import ILanguageSet |
511 | from lp.soyuz.enums import PackagePublishingStatus |
512 | -from lp.soyuz.model.reporting import LatestPersonSourcepackageReleaseCache |
513 | +from lp.soyuz.model.reporting import LatestPersonSourcePackageReleaseCache |
514 | from lp.testing import ( |
515 | FakeAdapterMixin, |
516 | person_logged_in, |
517 | @@ -1166,7 +1166,7 @@ |
518 | self.assertEqual(0, store.find(Product, |
519 | Product._information_type == None).count()) |
520 | |
521 | - def test_PopulateLatestPersonSourcepackageReleaseCache(self): |
522 | + def test_PopulateLatestPersonSourcePackageReleaseCache(self): |
523 | switch_dbuser('testadmin') |
524 | # Make some same test data - we create published source package |
525 | # releases for 2 different creators and maintainers. |
526 | @@ -1204,25 +1204,25 @@ |
527 | creator=creators[1], maintainer=maintainers[1], |
528 | distroseries=distroseries, sourcepackagename=spn, |
529 | date_uploaded=datetime(2010, 12, 4, tzinfo=pytz.UTC)) |
530 | - self.factory.makeSourcePackagePublishingHistory( |
531 | + spph_1 = self.factory.makeSourcePackagePublishingHistory( |
532 | status=PackagePublishingStatus.PUBLISHED, |
533 | sourcepackagerelease=spr4) |
534 | |
535 | transaction.commit() |
536 | self.runFrequently() |
537 | |
538 | - store = IMasterStore(LatestPersonSourcepackageReleaseCache) |
539 | + store = IMasterStore(LatestPersonSourcePackageReleaseCache) |
540 | # Check that the garbo state table has data. |
541 | self.assertIsNotNone( |
542 | store.execute( |
543 | 'SELECT * FROM GarboJobState WHERE name=?', |
544 | - params=[u'PopulateLatestPersonSourcepackageReleaseCache'] |
545 | + params=[u'PopulateLatestPersonSourcePackageReleaseCache'] |
546 | ).get_one()) |
547 | |
548 | def _assert_release_by_creator(creator, spr): |
549 | release_records = store.find( |
550 | - LatestPersonSourcepackageReleaseCache, |
551 | - LatestPersonSourcepackageReleaseCache.creator_id == creator.id) |
552 | + LatestPersonSourcePackageReleaseCache, |
553 | + LatestPersonSourcePackageReleaseCache.creator_id == creator.id) |
554 | [record] = list(release_records) |
555 | self.assertEqual(spr.creator, record.creator) |
556 | self.assertIsNone(record.maintainer_id) |
557 | @@ -1231,8 +1231,8 @@ |
558 | |
559 | def _assert_release_by_maintainer(maintainer, spr): |
560 | release_records = store.find( |
561 | - LatestPersonSourcepackageReleaseCache, |
562 | - LatestPersonSourcepackageReleaseCache.maintainer_id == |
563 | + LatestPersonSourcePackageReleaseCache, |
564 | + LatestPersonSourcePackageReleaseCache.maintainer_id == |
565 | maintainer.id) |
566 | [record] = list(release_records) |
567 | self.assertEqual(spr.maintainer, record.maintainer) |
568 | @@ -1246,9 +1246,8 @@ |
569 | _assert_release_by_maintainer(maintainers[1], spr4) |
570 | |
571 | job_data = load_garbo_job_state( |
572 | - 'PopulateLatestPersonSourcepackageReleaseCache') |
573 | - self.assertEqual(spr4.id, job_data['next_id_for_creator']) |
574 | - self.assertEqual(spr4.id, job_data['next_id_for_maintainer']) |
575 | + 'PopulateLatestPersonSourcePackageReleaseCache') |
576 | + self.assertEqual(spph_1.id, job_data['last_spph_id']) |
577 | |
578 | # Create a newer published source package release and ensure the |
579 | # release cache table is correctly updated. |
580 | @@ -1257,7 +1256,7 @@ |
581 | creator=creators[1], maintainer=maintainers[1], |
582 | distroseries=distroseries, sourcepackagename=spn, |
583 | date_uploaded=datetime(2010, 12, 5, tzinfo=pytz.UTC)) |
584 | - self.factory.makeSourcePackagePublishingHistory( |
585 | + spph_2 = self.factory.makeSourcePackagePublishingHistory( |
586 | status=PackagePublishingStatus.PUBLISHED, |
587 | sourcepackagerelease=spr5) |
588 | |
589 | @@ -1270,9 +1269,8 @@ |
590 | _assert_release_by_maintainer(maintainers[1], spr5) |
591 | |
592 | job_data = load_garbo_job_state( |
593 | - 'PopulateLatestPersonSourcepackageReleaseCache') |
594 | - self.assertEqual(spr5.id, job_data['next_id_for_creator']) |
595 | - self.assertEqual(spr5.id, job_data['next_id_for_maintainer']) |
596 | + 'PopulateLatestPersonSourcePackageReleaseCache') |
597 | + self.assertEqual(spph_2.id, job_data['last_spph_id']) |
598 | |
599 | |
600 | class TestGarboTasks(TestCaseWithFactory): |
601 | |
602 | === modified file 'lib/lp/services/database/bulk.py' |
603 | --- lib/lp/services/database/bulk.py 2012-10-17 00:22:31 +0000 |
604 | +++ lib/lp/services/database/bulk.py 2012-11-14 07:45:27 +0000 |
605 | @@ -6,6 +6,7 @@ |
606 | __metaclass__ = type |
607 | __all__ = [ |
608 | 'create', |
609 | + 'dbify_value', |
610 | 'load', |
611 | 'load_referencing', |
612 | 'load_related', |
613 | @@ -167,7 +168,7 @@ |
614 | return load(object_type, keys) |
615 | |
616 | |
617 | -def _dbify_value(col, val): |
618 | +def dbify_value(col, val): |
619 | """Convert a value into a form that Storm can compile directly.""" |
620 | if isinstance(val, SQL): |
621 | return (val,) |
622 | @@ -184,7 +185,7 @@ |
623 | return (col.variable_factory(value=val),) |
624 | |
625 | |
626 | -def _dbify_column(col): |
627 | +def dbify_column(col): |
628 | """Convert a column into a form that Storm can compile directly.""" |
629 | if isinstance(col, Reference): |
630 | # References are mainly meant to be used as descriptors, so we |
631 | @@ -207,7 +208,7 @@ |
632 | :return: A list of the created objects if get_created, otherwise None. |
633 | """ |
634 | # Flatten Reference faux-columns into their primary keys. |
635 | - db_cols = list(chain.from_iterable(map(_dbify_column, columns))) |
636 | + db_cols = list(chain.from_iterable(map(dbify_column, columns))) |
637 | clses = set(col.cls for col in db_cols) |
638 | if len(clses) != 1: |
639 | raise ValueError( |
640 | @@ -228,7 +229,7 @@ |
641 | # squashed into primary key variables. |
642 | db_values = [ |
643 | list(chain.from_iterable( |
644 | - _dbify_value(col, val) for col, val in zip(columns, value))) |
645 | + dbify_value(col, val) for col, val in zip(columns, value))) |
646 | for value in values] |
647 | |
648 | if get_objects or get_primary_keys: |
649 | |
650 | === modified file 'lib/lp/services/database/stormexpr.py' |
651 | --- lib/lp/services/database/stormexpr.py 2012-09-24 16:03:53 +0000 |
652 | +++ lib/lp/services/database/stormexpr.py 2012-11-14 07:45:27 +0000 |
653 | @@ -8,6 +8,7 @@ |
654 | 'ArrayAgg', |
655 | 'ArrayContains', |
656 | 'ArrayIntersects', |
657 | + 'BulkUpdate', |
658 | 'ColumnSelect', |
659 | 'Concatenate', |
660 | 'CountDistinct', |
661 | @@ -17,11 +18,14 @@ |
662 | 'NullCount', |
663 | 'TryAdvisoryLock', |
664 | 'Unnest', |
665 | + 'Values', |
666 | ] |
667 | |
668 | +from storm import Undef |
669 | from storm.exceptions import ClassInfoError |
670 | from storm.expr import ( |
671 | BinaryOper, |
672 | + COLUMN_NAME, |
673 | ComparableExpr, |
674 | compile, |
675 | CompoundOper, |
676 | @@ -31,6 +35,7 @@ |
677 | NamedFunc, |
678 | Or, |
679 | SQL, |
680 | + TABLE, |
681 | ) |
682 | from storm.info import ( |
683 | get_cls_info, |
684 | @@ -38,6 +43,59 @@ |
685 | ) |
686 | |
687 | |
688 | +class BulkUpdate(Expr): |
689 | + # Perform a bulk table update using literal values. |
690 | + __slots__ = ("map", "where", "table", "values") |
691 | + |
692 | + def __init__(self, map, table, values, where=Undef): |
693 | + self.map = map |
694 | + self.where = where |
695 | + self.table = table |
696 | + self.values = values |
697 | + |
698 | + |
699 | +@compile.when(BulkUpdate) |
700 | +def compile_bulkupdate(compile, update, state): |
701 | + pairs = update.map.items() |
702 | + state.push("context", COLUMN_NAME) |
703 | + col_names = [compile(col, state, token=True) for col, val in pairs] |
704 | + state.context = EXPR |
705 | + col_values = [compile(val, state) for col, val in pairs] |
706 | + sets = ["%s=%s" % (col, val) for col, val in zip(col_names, col_values)] |
707 | + state.context = TABLE |
708 | + tokens = ["UPDATE ", compile(update.table, state, token=True), " SET ", |
709 | + ", ".join(sets), " FROM "] |
710 | + state.context = EXPR |
711 | + # We don't want the values expression wrapped in parenthesis. |
712 | + state.precedence = 0 |
713 | + tokens.append(compile(update.values, state, raw=True)) |
714 | + if update.where is not Undef: |
715 | + tokens.append(" WHERE ") |
716 | + tokens.append(compile(update.where, state, raw=True)) |
717 | + state.pop() |
718 | + return "".join(tokens) |
719 | + |
720 | + |
721 | +class Values(Expr): |
722 | + __slots__ = ("name", "cols", "values") |
723 | + |
724 | + def __init__(self, name, cols, values): |
725 | + self.name = name |
726 | + self.cols = cols |
727 | + self.values = values |
728 | + |
729 | + |
730 | +@compile.when(Values) |
731 | +def compile_values(compile, expr, state): |
732 | + col_names, col_types = zip(*expr.cols) |
733 | + first_row = ", ".join( |
734 | + "%s::%s" % (compile(value, state), type) |
735 | + for value, type in zip(expr.values[0], col_types)) |
736 | + rows = [first_row] + [compile(value, state) for value in expr.values[1:]] |
737 | + return "(VALUES (%s)) AS %s(%s)" % ( |
738 | + "), (".join(rows), expr.name, ', '.join(col_names)) |
739 | + |
740 | + |
741 | class ColumnSelect(Expr): |
742 | # Wrap a select statement in braces so that it can be used as a column |
743 | # expression in another query. |
744 | |
745 | === modified file 'lib/lp/soyuz/configure.zcml' |
746 | --- lib/lp/soyuz/configure.zcml 2012-11-02 04:29:26 +0000 |
747 | +++ lib/lp/soyuz/configure.zcml 2012-11-14 07:45:27 +0000 |
748 | @@ -1003,9 +1003,9 @@ |
749 | </class> |
750 | |
751 | <class |
752 | - class="lp.soyuz.model.reporting.LatestPersonSourcepackageReleaseCache"> |
753 | + class="lp.soyuz.model.reporting.LatestPersonSourcePackageReleaseCache"> |
754 | <allow |
755 | - interface="lp.soyuz.interfaces.reporting.ILatestPersonSourcepackageReleaseCache"/> |
756 | + interface="lp.soyuz.interfaces.reporting.ILatestPersonSourcePackageReleaseCache"/> |
757 | </class> |
758 | |
759 | <!-- ProcessAcceptedBugsJobSource --> |
760 | |
761 | === modified file 'lib/lp/soyuz/interfaces/reporting.py' |
762 | --- lib/lp/soyuz/interfaces/reporting.py 2012-11-07 12:38:19 +0000 |
763 | +++ lib/lp/soyuz/interfaces/reporting.py 2012-11-14 07:45:27 +0000 |
764 | @@ -3,7 +3,7 @@ |
765 | |
766 | __metaclass__ = type |
767 | __all__ = [ |
768 | - 'ILatestPersonSourcepackageReleaseCache', |
769 | + 'ILatestPersonSourcePackageReleaseCache', |
770 | ] |
771 | |
772 | |
773 | @@ -11,7 +11,7 @@ |
774 | from lp.soyuz.interfaces.sourcepackagerelease import ISourcePackageRelease |
775 | |
776 | |
777 | -class ILatestPersonSourcepackageReleaseCache(ISourcePackageRelease): |
778 | +class ILatestPersonSourcePackageReleaseCache(ISourcePackageRelease): |
779 | """Published source package release information for a person. |
780 | |
781 | The records represented by this object are the latest published source |
782 | @@ -22,7 +22,7 @@ |
783 | """ |
784 | |
785 | cache_id = Attribute( |
786 | - "The id of the associated LatestPersonSourcepackageReleaseCache" |
787 | + "The id of the associated LatestPersonSourcePackageReleaseCache" |
788 | "record.") |
789 | sourcepackagerelease = Attribute( |
790 | "The SourcePackageRelease which this object represents.") |
791 | |
792 | === modified file 'lib/lp/soyuz/model/reporting.py' |
793 | --- lib/lp/soyuz/model/reporting.py 2012-11-07 12:38:19 +0000 |
794 | +++ lib/lp/soyuz/model/reporting.py 2012-11-14 07:45:27 +0000 |
795 | @@ -3,7 +3,7 @@ |
796 | |
797 | __metaclass__ = type |
798 | __all__ = [ |
799 | - 'LatestPersonSourcepackageReleaseCache', |
800 | + 'LatestPersonSourcePackageReleaseCache', |
801 | ] |
802 | |
803 | from lazr.delegates import delegates |
804 | @@ -18,17 +18,17 @@ |
805 | from lp.services.database.enumcol import EnumCol |
806 | from lp.soyuz.enums import ArchivePurpose |
807 | from lp.soyuz.interfaces.reporting import ( |
808 | - ILatestPersonSourcepackageReleaseCache, |
809 | + ILatestPersonSourcePackageReleaseCache, |
810 | ) |
811 | from lp.soyuz.interfaces.sourcepackagerelease import ISourcePackageRelease |
812 | |
813 | |
814 | -class LatestPersonSourcepackageReleaseCache(Storm): |
815 | - """See `LatestPersonSourcepackageReleaseCache`.""" |
816 | - implements(ILatestPersonSourcepackageReleaseCache) |
817 | +class LatestPersonSourcePackageReleaseCache(Storm): |
818 | + """See `LatestPersonSourcePackageReleaseCache`.""" |
819 | + implements(ILatestPersonSourcePackageReleaseCache) |
820 | delegates(ISourcePackageRelease, context='sourcepackagerelease') |
821 | |
822 | - __storm_table__ = 'LatestPersonSourcepackageReleaseCache' |
823 | + __storm_table__ = 'LatestPersonSourcePackageReleaseCache' |
824 | |
825 | cache_id = Int(name='id', primary=True) |
826 | publication_id = Int(name='publication') |
827 | |
828 | === modified file 'lib/lp/soyuz/stories/soyuz/xx-person-packages.txt' |
829 | --- lib/lp/soyuz/stories/soyuz/xx-person-packages.txt 2012-11-07 12:38:19 +0000 |
830 | +++ lib/lp/soyuz/stories/soyuz/xx-person-packages.txt 2012-11-14 07:45:27 +0000 |
831 | @@ -163,7 +163,7 @@ |
832 | Make a function to update the cached latest person source package release |
833 | records. |
834 | |
835 | - >>> from lp.scripts.garbo import PopulateLatestPersonSourcepackageReleaseCache |
836 | + >>> from lp.scripts.garbo import PopulateLatestPersonSourcePackageReleaseCache |
837 | >>> from lp.services.database.sqlbase import flush_database_updates |
838 | >>> from lp.services.log.logger import FakeLogger |
839 | >>> from lp.testing.dbuser import switch_dbuser |
840 | @@ -180,7 +180,7 @@ |
841 | ... "delete from latestpersonsourcepackagereleasecache") |
842 | ... flush_database_updates() |
843 | ... switch_dbuser('garbo_frequently') |
844 | - ... job = PopulateLatestPersonSourcepackageReleaseCache(FakeLogger()) |
845 | + ... job = PopulateLatestPersonSourcePackageReleaseCache(FakeLogger()) |
846 | ... while not job.isDone(): |
847 | ... job(chunk_size=100) |
848 | ... switch_dbuser('launchpad') |