Merge ~cjwatson/launchpad:stormify-codeimportjob into launchpad:master
- Git
- lp:~cjwatson/launchpad
- stormify-codeimportjob
- Merge into master
Proposed by
Colin Watson
Status: | Merged |
---|---|
Approved by: | Colin Watson |
Approved revision: | 0096679e8ec4085ec5f94b7d38bb20e40fcb41de |
Merge reported by: | Otto Co-Pilot |
Merged at revision: | not available |
Proposed branch: | ~cjwatson/launchpad:stormify-codeimportjob |
Merge into: | launchpad:master |
Diff against target: |
535 lines (+106/-93) 9 files modified
lib/lp/code/model/codeimport.py (+4/-4) lib/lp/code/model/codeimportjob.py (+67/-66) lib/lp/code/model/codeimportmachine.py (+5/-4) lib/lp/code/model/tests/test_codeimport.py (+3/-2) lib/lp/code/model/tests/test_codeimportjob.py (+3/-3) lib/lp/code/xmlrpc/codeimportscheduler.py (+7/-4) lib/lp/code/xmlrpc/tests/test_codeimportscheduler.py (+4/-3) lib/lp/codehosting/codeimport/tests/test_workermonitor.py (+4/-3) lib/lp/codehosting/codeimport/workermonitor.py (+9/-4) |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Tom Wardill (community) | Approve | ||
Review via email: mp+380981@code.launchpad.net |
Commit message
Convert CodeImportJob to Storm
Description of the change
We need to take a little care around the XML-RPC scheduler: despite XML-RPC strings being Unicode, Python 2's xmlrpclib likes to unmarshal them as plain str when they're within the ASCII subset, so we need to turn them back into Unicode.
To post a comment you must log in.
Revision history for this message
Tom Wardill (twom) : | # |
review:
Approve
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | diff --git a/lib/lp/code/model/codeimport.py b/lib/lp/code/model/codeimport.py |
2 | index a92ee7d..4162632 100644 |
3 | --- a/lib/lp/code/model/codeimport.py |
4 | +++ b/lib/lp/code/model/codeimport.py |
5 | @@ -1,4 +1,4 @@ |
6 | -# Copyright 2009-2016 Canonical Ltd. This software is licensed under the |
7 | +# Copyright 2009-2020 Canonical Ltd. This software is licensed under the |
8 | # GNU Affero General Public License version 3 (see the file LICENSE). |
9 | |
10 | """Database classes including and related to CodeImport.""" |
11 | @@ -34,6 +34,7 @@ from storm.references import Reference |
12 | from zope.component import getUtility |
13 | from zope.event import notify |
14 | from zope.interface import implementer |
15 | +from zope.security.proxy import removeSecurityProxy |
16 | |
17 | from lp.app.errors import NotFoundError |
18 | from lp.code.enums import ( |
19 | @@ -158,7 +159,7 @@ class CodeImport(SQLBase): |
20 | seconds = default_interval_dict.get(self.rcs_type, 21600) |
21 | return timedelta(seconds=seconds) |
22 | |
23 | - import_job = Reference("<primary key>", "CodeImportJob.code_importID", |
24 | + import_job = Reference("<primary key>", "CodeImportJob.code_import_id", |
25 | on_remote=True) |
26 | |
27 | def getImportDetailsForDisplay(self): |
28 | @@ -347,9 +348,8 @@ class CodeImportSet: |
29 | |
30 | def delete(self, code_import): |
31 | """See `ICodeImportSet`.""" |
32 | - from lp.code.model.codeimportjob import CodeImportJob |
33 | if code_import.import_job is not None: |
34 | - CodeImportJob.delete(code_import.import_job.id) |
35 | + removeSecurityProxy(code_import.import_job).destroySelf() |
36 | CodeImport.delete(code_import.id) |
37 | |
38 | def get(self, id): |
39 | diff --git a/lib/lp/code/model/codeimportjob.py b/lib/lp/code/model/codeimportjob.py |
40 | index 702c93c..a758adc 100644 |
41 | --- a/lib/lp/code/model/codeimportjob.py |
42 | +++ b/lib/lp/code/model/codeimportjob.py |
43 | @@ -1,8 +1,10 @@ |
44 | -# Copyright 2009-2019 Canonical Ltd. This software is licensed under the |
45 | +# Copyright 2009-2020 Canonical Ltd. This software is licensed under the |
46 | # GNU Affero General Public License version 3 (see the file LICENSE). |
47 | |
48 | """Database classes for the CodeImportJob table.""" |
49 | |
50 | +from __future__ import absolute_import, print_function, unicode_literals |
51 | + |
52 | __metaclass__ = type |
53 | __all__ = [ |
54 | 'CodeImportJob', |
55 | @@ -10,13 +12,17 @@ __all__ = [ |
56 | 'CodeImportJobWorkflow', |
57 | ] |
58 | |
59 | -import datetime |
60 | - |
61 | -from sqlobject import ( |
62 | - ForeignKey, |
63 | - IntCol, |
64 | - SQLObjectNotFound, |
65 | - StringCol, |
66 | +from datetime import timedelta |
67 | + |
68 | +import pytz |
69 | +from storm.expr import Cast |
70 | +from storm.locals import ( |
71 | + DateTime, |
72 | + Desc, |
73 | + Int, |
74 | + Reference, |
75 | + Store, |
76 | + Unicode, |
77 | ) |
78 | from zope.component import getUtility |
79 | from zope.interface import implementer |
80 | @@ -50,13 +56,10 @@ from lp.code.model.codeimportresult import CodeImportResult |
81 | from lp.registry.interfaces.person import validate_public_person |
82 | from lp.services.config import config |
83 | from lp.services.database.constants import UTC_NOW |
84 | -from lp.services.database.datetimecol import UtcDateTimeCol |
85 | -from lp.services.database.enumcol import EnumCol |
86 | +from lp.services.database.enumcol import DBEnum |
87 | from lp.services.database.interfaces import IStore |
88 | -from lp.services.database.sqlbase import ( |
89 | - SQLBase, |
90 | - sqlvalues, |
91 | - ) |
92 | +from lp.services.database.sqlbase import get_transaction_timestamp |
93 | +from lp.services.database.stormbase import StormBase |
94 | from lp.services.macaroons.interfaces import ( |
95 | BadMacaroonContext, |
96 | IMacaroonIssuer, |
97 | @@ -66,53 +69,48 @@ from lp.services.macaroons.model import MacaroonIssuerBase |
98 | |
99 | |
100 | @implementer(ICodeImportJob) |
101 | -class CodeImportJob(SQLBase): |
102 | +class CodeImportJob(StormBase): |
103 | """See `ICodeImportJob`.""" |
104 | |
105 | - date_created = UtcDateTimeCol(notNull=True, default=UTC_NOW) |
106 | + __storm_table__ = 'CodeImportJob' |
107 | |
108 | - code_import = ForeignKey( |
109 | - dbName='code_import', foreignKey='CodeImport', notNull=True) |
110 | + id = Int(primary=True) |
111 | |
112 | - machine = ForeignKey( |
113 | - dbName='machine', foreignKey='CodeImportMachine', |
114 | - notNull=False, default=None) |
115 | + date_created = DateTime(tzinfo=pytz.UTC, allow_none=False, default=UTC_NOW) |
116 | |
117 | - date_due = UtcDateTimeCol(notNull=True) |
118 | + code_import_id = Int(name='code_import', allow_none=False) |
119 | + code_import = Reference(code_import_id, 'CodeImport.id') |
120 | |
121 | - state = EnumCol( |
122 | - enum=CodeImportJobState, notNull=True, |
123 | + machine_id = Int(name='machine', allow_none=True, default=None) |
124 | + machine = Reference(machine_id, 'CodeImportMachine.id') |
125 | + |
126 | + date_due = DateTime(tzinfo=pytz.UTC, allow_none=False) |
127 | + |
128 | + state = DBEnum( |
129 | + enum=CodeImportJobState, allow_none=False, |
130 | default=CodeImportJobState.PENDING) |
131 | |
132 | - requesting_user = ForeignKey( |
133 | - dbName='requesting_user', foreignKey='Person', |
134 | - storm_validator=validate_public_person, |
135 | - notNull=False, default=None) |
136 | + requesting_user_id = Int( |
137 | + name='requesting_user', allow_none=True, |
138 | + validator=validate_public_person, default=None) |
139 | + requesting_user = Reference(requesting_user_id, 'Person.id') |
140 | + |
141 | + ordering = Int(allow_none=True, default=None) |
142 | |
143 | - ordering = IntCol(notNull=False, default=None) |
144 | + heartbeat = DateTime(tzinfo=pytz.UTC, allow_none=True, default=None) |
145 | |
146 | - heartbeat = UtcDateTimeCol(notNull=False, default=None) |
147 | + logtail = Unicode(allow_none=True, default=None) |
148 | |
149 | - logtail = StringCol(notNull=False, default=None) |
150 | + date_started = DateTime(tzinfo=pytz.UTC, allow_none=True, default=None) |
151 | |
152 | - date_started = UtcDateTimeCol(notNull=False, default=None) |
153 | + def __init__(self, code_import, date_due): |
154 | + super(CodeImportJob, self).__init__() |
155 | + self.code_import = code_import |
156 | + self.date_due = date_due |
157 | |
158 | def isOverdue(self): |
159 | """See `ICodeImportJob`.""" |
160 | - # SQLObject offers no easy way to compare a timestamp to UTC_NOW, so |
161 | - # we must use trickery here. |
162 | - |
163 | - # First we flush any pending update to self to ensure that the |
164 | - # following database query will give the correct result even if |
165 | - # date_due was modified in this transaction. |
166 | - self.syncUpdate() |
167 | - |
168 | - # Then, we try to find a CodeImportJob object with the id of self, and |
169 | - # a date_due of now or past. If we find one, this means self is |
170 | - # overdue. |
171 | - import_job = CodeImportJob.selectOne( |
172 | - "id = %s AND date_due <= %s" % sqlvalues(self.id, UTC_NOW)) |
173 | - return import_job is not None |
174 | + return self.date_due <= get_transaction_timestamp(Store.of(self)) |
175 | |
176 | def makeWorkerArguments(self): |
177 | """See `ICodeImportJob`.""" |
178 | @@ -169,6 +167,9 @@ class CodeImportJob(SQLBase): |
179 | result.append(macaroon.serialize()) |
180 | return result |
181 | |
182 | + def destroySelf(self): |
183 | + Store.of(self).remove(self) |
184 | + |
185 | |
186 | @implementer(ICodeImportJobSet, ICodeImportJobSetPublic) |
187 | class CodeImportJobSet(object): |
188 | @@ -179,10 +180,7 @@ class CodeImportJobSet(object): |
189 | |
190 | def getById(self, id): |
191 | """See `ICodeImportJobSet`.""" |
192 | - try: |
193 | - return CodeImportJob.get(id) |
194 | - except SQLObjectNotFound: |
195 | - return None |
196 | + return IStore(CodeImportJob).get(CodeImportJob, id) |
197 | |
198 | def getJobForMachine(self, hostname, worker_limit): |
199 | """See `ICodeImportJobSet`.""" |
200 | @@ -195,12 +193,12 @@ class CodeImportJobSet(object): |
201 | hostname, CodeImportMachineState.ONLINE) |
202 | elif not machine.shouldLookForJob(worker_limit): |
203 | return None |
204 | - job = CodeImportJob.selectOne( |
205 | - """id IN (SELECT id FROM CodeImportJob |
206 | - WHERE date_due <= %s AND state = %s |
207 | - ORDER BY requesting_user IS NULL, date_due |
208 | - LIMIT 1)""" |
209 | - % sqlvalues(UTC_NOW, CodeImportJobState.PENDING)) |
210 | + job = IStore(CodeImportJob).find( |
211 | + CodeImportJob, |
212 | + CodeImportJob.date_due <= UTC_NOW, |
213 | + CodeImportJob.state == CodeImportJobState.PENDING).order_by( |
214 | + CodeImportJob.requesting_user == None, |
215 | + CodeImportJob.date_due).first() |
216 | if job is not None: |
217 | job_workflow.startJob(job, machine) |
218 | return job |
219 | @@ -209,11 +207,12 @@ class CodeImportJobSet(object): |
220 | |
221 | def getReclaimableJobs(self): |
222 | """See `ICodeImportJobSet`.""" |
223 | + interval = config.codeimportworker.maximum_heartbeat_interval |
224 | return IStore(CodeImportJob).find( |
225 | CodeImportJob, |
226 | - "state = %s and heartbeat < %s + '-%s seconds'" |
227 | - % sqlvalues(CodeImportJobState.RUNNING, UTC_NOW, |
228 | - config.codeimportworker.maximum_heartbeat_interval)) |
229 | + CodeImportJob.state == CodeImportJobState.RUNNING, |
230 | + CodeImportJob.heartbeat < ( |
231 | + UTC_NOW - Cast(timedelta(seconds=interval), 'interval'))) |
232 | |
233 | |
234 | @implementer(ICodeImportJobWorkflow) |
235 | @@ -233,16 +232,18 @@ class CodeImportJobWorkflow: |
236 | interval = code_import.effective_update_interval |
237 | |
238 | job = CodeImportJob(code_import=code_import, date_due=UTC_NOW) |
239 | + IStore(CodeImportJob).add(job) |
240 | |
241 | # Find the most recent CodeImportResult for this CodeImport. We |
242 | # sort by date_created because we do not have an index on |
243 | # date_job_started in the database, and that should give the same |
244 | # sort order. |
245 | - most_recent_result_list = list(CodeImportResult.selectBy( |
246 | - code_import=code_import).orderBy(['-date_created']).limit(1)) |
247 | + most_recent_result = IStore(CodeImportResult).find( |
248 | + CodeImportResult, |
249 | + CodeImportResult.code_import == code_import).order_by( |
250 | + Desc(CodeImportResult.date_created)).first() |
251 | |
252 | - if len(most_recent_result_list) != 0: |
253 | - [most_recent_result] = most_recent_result_list |
254 | + if most_recent_result is not None: |
255 | date_due = most_recent_result.date_job_started + interval |
256 | job.date_due = max(job.date_due, date_due) |
257 | |
258 | @@ -301,7 +302,7 @@ class CodeImportJobWorkflow: |
259 | naked_job = removeSecurityProxy(import_job) |
260 | naked_job.date_started = UTC_NOW |
261 | naked_job.heartbeat = UTC_NOW |
262 | - naked_job.logtail = u'' |
263 | + naked_job.logtail = '' |
264 | naked_job.machine = machine |
265 | naked_job.state = CodeImportJobState.RUNNING |
266 | getUtility(ICodeImportEventSet).newStart( |
267 | @@ -365,7 +366,7 @@ class CodeImportJobWorkflow: |
268 | code_import.updateFromData( |
269 | dict(review_status=CodeImportReviewStatus.FAILING), None) |
270 | elif status == CodeImportResultStatus.SUCCESS_PARTIAL: |
271 | - interval = datetime.timedelta(0) |
272 | + interval = timedelta(0) |
273 | elif failure_count > 0: |
274 | interval = (code_import.effective_update_interval * |
275 | (2 ** (failure_count - 1))) |
276 | @@ -406,7 +407,7 @@ class CodeImportJobWorkflow: |
277 | import_job, CodeImportResultStatus.RECLAIMED, None) |
278 | # 3) |
279 | if code_import.review_status == CodeImportReviewStatus.REVIEWED: |
280 | - self.newJob(code_import, datetime.timedelta(0)) |
281 | + self.newJob(code_import, timedelta(0)) |
282 | # 4) |
283 | getUtility(ICodeImportEventSet).newReclaim( |
284 | code_import, machine, job_id) |
285 | diff --git a/lib/lp/code/model/codeimportmachine.py b/lib/lp/code/model/codeimportmachine.py |
286 | index 4dfaeb7..1983adc 100644 |
287 | --- a/lib/lp/code/model/codeimportmachine.py |
288 | +++ b/lib/lp/code/model/codeimportmachine.py |
289 | @@ -1,4 +1,4 @@ |
290 | -# Copyright 2009 Canonical Ltd. This software is licensed under the |
291 | +# Copyright 2009-2020 Canonical Ltd. This software is licensed under the |
292 | # GNU Affero General Public License version 3 (see the file LICENSE). |
293 | |
294 | """Database classes including and related to CodeImportMachine.""" |
295 | @@ -14,6 +14,7 @@ from sqlobject import ( |
296 | SQLMultipleJoin, |
297 | StringCol, |
298 | ) |
299 | +from storm.locals import ReferenceSet |
300 | from zope.component import getUtility |
301 | from zope.interface import implementer |
302 | |
303 | @@ -48,9 +49,9 @@ class CodeImportMachine(SQLBase): |
304 | default=CodeImportMachineState.OFFLINE) |
305 | heartbeat = UtcDateTimeCol(notNull=False) |
306 | |
307 | - current_jobs = SQLMultipleJoin( |
308 | - 'CodeImportJob', joinColumn='machine', |
309 | - orderBy=['date_started', 'id']) |
310 | + current_jobs = ReferenceSet( |
311 | + '<primary key>', 'CodeImportJob.machine_id', |
312 | + order_by=('CodeImportJob.date_started', 'CodeImportJob.id')) |
313 | |
314 | events = SQLMultipleJoin( |
315 | 'CodeImportEvent', joinColumn='machine', |
316 | diff --git a/lib/lp/code/model/tests/test_codeimport.py b/lib/lp/code/model/tests/test_codeimport.py |
317 | index a57f79e..7b5166a 100644 |
318 | --- a/lib/lp/code/model/tests/test_codeimport.py |
319 | +++ b/lib/lp/code/model/tests/test_codeimport.py |
320 | @@ -1,4 +1,4 @@ |
321 | -# Copyright 2009-2017 Canonical Ltd. This software is licensed under the |
322 | +# Copyright 2009-2020 Canonical Ltd. This software is licensed under the |
323 | # GNU Affero General Public License version 3 (see the file LICENSE). |
324 | |
325 | """Unit tests for methods of CodeImport and CodeImportSet.""" |
326 | @@ -47,6 +47,7 @@ from lp.code.model.codeimportresult import CodeImportResult |
327 | from lp.code.tests.codeimporthelpers import make_running_import |
328 | from lp.code.tests.helpers import GitHostingFixture |
329 | from lp.registry.interfaces.person import IPersonSet |
330 | +from lp.services.database.interfaces import IStore |
331 | from lp.testing import ( |
332 | login, |
333 | login_person, |
334 | @@ -377,7 +378,7 @@ class TestCodeImportStatusUpdate(TestCodeImportBase): |
335 | self.import_operator = getUtility(IPersonSet).getByEmail( |
336 | 'david.allouche@canonical.com') |
337 | # Remove existing jobs. |
338 | - for job in CodeImportJob.select(): |
339 | + for job in IStore(CodeImportJob).find(CodeImportJob): |
340 | job.destroySelf() |
341 | |
342 | def makeApprovedImportWithPendingJob(self): |
343 | diff --git a/lib/lp/code/model/tests/test_codeimportjob.py b/lib/lp/code/model/tests/test_codeimportjob.py |
344 | index 68499dd..c099944 100644 |
345 | --- a/lib/lp/code/model/tests/test_codeimportjob.py |
346 | +++ b/lib/lp/code/model/tests/test_codeimportjob.py |
347 | @@ -1,4 +1,4 @@ |
348 | -# Copyright 2009-2019 Canonical Ltd. This software is licensed under the |
349 | +# Copyright 2009-2020 Canonical Ltd. This software is licensed under the |
350 | # GNU Affero General Public License version 3 (see the file LICENSE). |
351 | |
352 | """Unit tests for CodeImportJob and CodeImportJobWorkflow.""" |
353 | @@ -237,7 +237,7 @@ class TestCodeImportJobSetGetJobForMachine(TestCaseWithFactory): |
354 | # the sample data and set up some objects. |
355 | super(TestCodeImportJobSetGetJobForMachine, self).setUp() |
356 | login_for_code_imports() |
357 | - for job in CodeImportJob.select(): |
358 | + for job in IStore(CodeImportJob).find(CodeImportJob): |
359 | job.destroySelf() |
360 | self.machine = self.factory.makeCodeImportMachine(set_online=True) |
361 | |
362 | @@ -351,7 +351,7 @@ class ReclaimableJobTests(TestCaseWithFactory): |
363 | def setUp(self): |
364 | super(ReclaimableJobTests, self).setUp() |
365 | login_for_code_imports() |
366 | - for job in CodeImportJob.select(): |
367 | + for job in IStore(CodeImportJob).find(CodeImportJob): |
368 | job.destroySelf() |
369 | |
370 | def makeJobWithHeartbeatInPast(self, seconds_in_past): |
371 | diff --git a/lib/lp/code/xmlrpc/codeimportscheduler.py b/lib/lp/code/xmlrpc/codeimportscheduler.py |
372 | index 1cf05cb..35ba346 100644 |
373 | --- a/lib/lp/code/xmlrpc/codeimportscheduler.py |
374 | +++ b/lib/lp/code/xmlrpc/codeimportscheduler.py |
375 | @@ -1,4 +1,4 @@ |
376 | -# Copyright 2009-2018 Canonical Ltd. This software is licensed under the |
377 | +# Copyright 2009-2020 Canonical Ltd. This software is licensed under the |
378 | # GNU Affero General Public License version 3 (see the file LICENSE). |
379 | |
380 | """The code import scheduler XML-RPC API.""" |
381 | @@ -8,6 +8,7 @@ __all__ = [ |
382 | 'CodeImportSchedulerAPI', |
383 | ] |
384 | |
385 | +import six |
386 | from zope.component import getUtility |
387 | from zope.interface import implementer |
388 | from zope.security.proxy import removeSecurityProxy |
389 | @@ -35,7 +36,7 @@ class CodeImportSchedulerAPI(LaunchpadXMLRPCView): |
390 | def getJobForMachine(self, hostname, worker_limit): |
391 | """See `ICodeImportScheduler`.""" |
392 | job = getUtility(ICodeImportJobSet).getJobForMachine( |
393 | - hostname, worker_limit) |
394 | + six.ensure_text(hostname), worker_limit) |
395 | if job is not None: |
396 | return job.id |
397 | else: |
398 | @@ -58,11 +59,13 @@ class CodeImportSchedulerAPI(LaunchpadXMLRPCView): |
399 | |
400 | def updateHeartbeat(self, job_id, log_tail): |
401 | """See `ICodeImportScheduler`.""" |
402 | - return self._updateHeartbeat(job_id, log_tail) |
403 | + return self._updateHeartbeat(job_id, six.ensure_text(log_tail)) |
404 | |
405 | def finishJobID(self, job_id, status_name, log_file_alias_url): |
406 | """See `ICodeImportScheduler`.""" |
407 | - return self._finishJobID(job_id, status_name, log_file_alias_url) |
408 | + return self._finishJobID( |
409 | + job_id, six.ensure_text(status_name), |
410 | + six.ensure_text(log_file_alias_url)) |
411 | |
412 | @return_fault |
413 | def _getImportDataForJobID(self, job_id): |
414 | diff --git a/lib/lp/code/xmlrpc/tests/test_codeimportscheduler.py b/lib/lp/code/xmlrpc/tests/test_codeimportscheduler.py |
415 | index 60ad9ce..91ceae3 100644 |
416 | --- a/lib/lp/code/xmlrpc/tests/test_codeimportscheduler.py |
417 | +++ b/lib/lp/code/xmlrpc/tests/test_codeimportscheduler.py |
418 | @@ -1,4 +1,4 @@ |
419 | -# Copyright 2010-2018 Canonical Ltd. This software is licensed under the |
420 | +# Copyright 2010-2020 Canonical Ltd. This software is licensed under the |
421 | # GNU Affero General Public License version 3 (see the file LICENSE). |
422 | |
423 | """Test for the methods of `ICodeImportScheduler`.""" |
424 | @@ -15,6 +15,7 @@ from lp.code.model.codeimportjob import CodeImportJob |
425 | from lp.code.tests.codeimporthelpers import make_running_import |
426 | from lp.code.xmlrpc.codeimportscheduler import CodeImportSchedulerAPI |
427 | from lp.services.database.constants import UTC_NOW |
428 | +from lp.services.database.interfaces import IStore |
429 | from lp.services.webapp import canonical_url |
430 | from lp.testing import ( |
431 | run_with_login, |
432 | @@ -32,7 +33,7 @@ class TestCodeImportSchedulerAPI(TestCaseWithFactory): |
433 | TestCaseWithFactory.setUp(self) |
434 | self.api = CodeImportSchedulerAPI(None, None) |
435 | self.machine = self.factory.makeCodeImportMachine(set_online=True) |
436 | - for job in CodeImportJob.select(): |
437 | + for job in IStore(CodeImportJob).find(CodeImportJob): |
438 | job.destroySelf() |
439 | |
440 | def makeCodeImportJob(self, running): |
441 | @@ -84,7 +85,7 @@ class TestCodeImportSchedulerAPI(TestCaseWithFactory): |
442 | def test_updateHeartbeat(self): |
443 | # updateHeartbeat calls the updateHeartbeat job workflow method. |
444 | code_import_job = self.makeCodeImportJob(running=True) |
445 | - log_tail = self.factory.getUniqueString() |
446 | + log_tail = self.factory.getUniqueUnicode() |
447 | self.api.updateHeartbeat(code_import_job.id, log_tail) |
448 | self.assertSqlAttributeEqualsDate( |
449 | code_import_job, 'heartbeat', UTC_NOW) |
450 | diff --git a/lib/lp/codehosting/codeimport/tests/test_workermonitor.py b/lib/lp/codehosting/codeimport/tests/test_workermonitor.py |
451 | index 047cbc9..d2422cb 100644 |
452 | --- a/lib/lp/codehosting/codeimport/tests/test_workermonitor.py |
453 | +++ b/lib/lp/codehosting/codeimport/tests/test_workermonitor.py |
454 | @@ -1,4 +1,4 @@ |
455 | -# Copyright 2009-2018 Canonical Ltd. This software is licensed under the |
456 | +# Copyright 2009-2020 Canonical Ltd. This software is licensed under the |
457 | # GNU Affero General Public License version 3 (see the file LICENSE). |
458 | |
459 | """Tests for the CodeImportWorkerMonitor and related classes.""" |
460 | @@ -72,6 +72,7 @@ from lp.services.config.fixture import ( |
461 | ConfigFixture, |
462 | ConfigUseFixture, |
463 | ) |
464 | +from lp.services.database.interfaces import IStore |
465 | from lp.services.log.logger import BufferLogger |
466 | from lp.services.twistedsupport import suppress_stderr |
467 | from lp.services.twistedsupport.tests.test_processmonitor import ( |
468 | @@ -669,7 +670,7 @@ class TestWorkerMonitorRunNoProcess(TestCase): |
469 | |
470 | def nuke_codeimport_sample_data(): |
471 | """Delete all the sample data that might interfere with tests.""" |
472 | - for job in CodeImportJob.select(): |
473 | + for job in IStore(CodeImportJob).find(CodeImportJob): |
474 | job.destroySelf() |
475 | for code_import in CodeImport.select(): |
476 | code_import.destroySelf() |
477 | @@ -796,7 +797,7 @@ class TestWorkerMonitorIntegration(TestCaseInTempDir, TestCase): |
478 | code_import.updateFromData( |
479 | {'review_status': CodeImportReviewStatus.REVIEWED}, |
480 | self.factory.makePerson()) |
481 | - job = getUtility(ICodeImportJobSet).getJobForMachine('machine', 10) |
482 | + job = getUtility(ICodeImportJobSet).getJobForMachine(u'machine', 10) |
483 | self.assertEqual(code_import, job.code_import) |
484 | source_details = CodeImportSourceDetails.fromArguments( |
485 | removeSecurityProxy(job.makeWorkerArguments())) |
486 | diff --git a/lib/lp/codehosting/codeimport/workermonitor.py b/lib/lp/codehosting/codeimport/workermonitor.py |
487 | index 767f0e3..2a2035a 100644 |
488 | --- a/lib/lp/codehosting/codeimport/workermonitor.py |
489 | +++ b/lib/lp/codehosting/codeimport/workermonitor.py |
490 | @@ -1,4 +1,4 @@ |
491 | -# Copyright 2009-2018 Canonical Ltd. This software is licensed under the |
492 | +# Copyright 2009-2020 Canonical Ltd. This software is licensed under the |
493 | # GNU Affero General Public License version 3 (see the file LICENSE). |
494 | |
495 | """Code to talk to the database about what the worker script is doing.""" |
496 | @@ -10,6 +10,7 @@ __all__ = [] |
497 | import os |
498 | import tempfile |
499 | |
500 | +import six |
501 | from twisted.internet import ( |
502 | defer, |
503 | error, |
504 | @@ -57,7 +58,7 @@ class CodeImportWorkerMonitorProtocol(ProcessMonitorProtocolWithTimeout): |
505 | self, deferred, clock=clock, |
506 | timeout=config.codeimport.worker_inactivity_timeout) |
507 | self.worker_monitor = worker_monitor |
508 | - self._tail = '' |
509 | + self._tail = b'' |
510 | self._log_file = log_file |
511 | self._looping_call = task.LoopingCall(self._updateHeartbeat) |
512 | self._looping_call.clock = self._clock |
513 | @@ -91,7 +92,7 @@ class CodeImportWorkerMonitorProtocol(ProcessMonitorProtocolWithTimeout): |
514 | """ |
515 | self.resetTimeout() |
516 | self._log_file.write(data) |
517 | - self._tail = '\n'.join((self._tail + data).split('\n')[-5:]) |
518 | + self._tail = b'\n'.join((self._tail + data).split(b'\n')[-5:]) |
519 | |
520 | errReceived = outReceived |
521 | |
522 | @@ -195,8 +196,12 @@ class CodeImportWorkerMonitor: |
523 | def updateHeartbeat(self, tail): |
524 | """Call the updateHeartbeat method for the job we are working on.""" |
525 | self._logger.debug("Updating heartbeat.") |
526 | + # The log tail is really bytes, but it's stored in the database as a |
527 | + # text column, so it's easiest to convert it to text now; passing |
528 | + # text over XML-RPC requires less boilerplate than bytes anyway. |
529 | deferred = self.codeimport_endpoint.callRemote( |
530 | - 'updateHeartbeat', self._job_id, tail) |
531 | + 'updateHeartbeat', self._job_id, |
532 | + six.ensure_text(tail, errors='replace')) |
533 | return deferred.addErrback(self._trap_nosuchcodeimportjob) |
534 | |
535 | def _createLibrarianFileAlias(self, name, size, file, contentType): |