Merge lp:~mwhudson/launchpad/no-codeimportworker-db-access into lp:launchpad
- no-codeimportworker-db-access
- Merge into devel
Proposed by
Michael Hudson-Doyle
Status: | Merged | ||||||||
---|---|---|---|---|---|---|---|---|---|
Approved by: | Tim Penhey | ||||||||
Approved revision: | no longer in the source branch. | ||||||||
Merged at revision: | not available | ||||||||
Proposed branch: | lp:~mwhudson/launchpad/no-codeimportworker-db-access | ||||||||
Merge into: | lp:launchpad | ||||||||
Diff against target: |
1534 lines (+585/-360) 15 files modified
database/schema/security.cfg (+0/-22) lib/canonical/config/schema-lazr.conf (+2/-2) lib/canonical/launchpad/interfaces/librarian.py (+1/-1) lib/canonical/launchpad/xmlrpc/faults.py (+10/-0) lib/lp/code/interfaces/codeimportscheduler.py (+26/-0) lib/lp/code/model/tests/test_codeimportjob.py (+4/-27) lib/lp/code/xmlrpc/codeimportscheduler.py (+66/-5) lib/lp/code/xmlrpc/tests/test_codeimportscheduler.py (+140/-0) lib/lp/codehosting/codeimport/dispatcher.py (+1/-1) lib/lp/codehosting/codeimport/tests/test_workermonitor.py (+244/-179) lib/lp/codehosting/codeimport/workermonitor.py (+58/-111) lib/lp/codehosting/inmemory.py (+12/-2) lib/lp/testing/__init__.py (+1/-1) scripts/code-import-worker-monitor.py (+19/-8) scripts/code-import-worker.py (+1/-1) |
||||||||
To merge this branch: | bzr merge lp:~mwhudson/launchpad/no-codeimportworker-db-access | ||||||||
Related bugs: |
|
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Tim Penhey (community) | Approve | ||
Review via email: mp+21201@code.launchpad.net |
Commit message
Have the code import worker monitor talk to the internal xml-rpc server rather than directly to the database.
Description of the change
Hi there,
this branch removes the need for the code import worker monitor script to talk directly to the database, and makes it talk to the internal xml-rpc server instead.
In some sense this isn't very exciting, although the diff is rather large. Much of the change is in the worker monitor tests -- all the rest is fairly straightforward I think.
Cheers,
mwh
To post a comment you must log in.
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'database/schema/security.cfg' |
2 | --- database/schema/security.cfg 2010-03-10 17:55:59 +0000 |
3 | +++ database/schema/security.cfg 2010-03-14 20:28:32 +0000 |
4 | @@ -586,28 +586,6 @@ |
5 | public.validpersonorteamcache = SELECT |
6 | public.wikiname = SELECT, INSERT |
7 | |
8 | -[codeimportworker] |
9 | -type=user |
10 | -public.branch = SELECT, UPDATE |
11 | -public.branchsubscription = SELECT |
12 | -public.codeimport = SELECT, UPDATE |
13 | -public.codeimportevent = SELECT, INSERT, UPDATE |
14 | -public.codeimporteventdata = SELECT, INSERT |
15 | -public.codeimportmachine = SELECT |
16 | -public.codeimportresult = SELECT, INSERT, UPDATE |
17 | -public.codeimportjob = SELECT, INSERT, UPDATE, DELETE |
18 | -public.distribution = SELECT |
19 | -public.distroseries = SELECT |
20 | -public.emailaddress = SELECT |
21 | -public.libraryfilealias = SELECT, INSERT, UPDATE |
22 | -public.libraryfilecontent = SELECT, INSERT |
23 | -public.person = SELECT |
24 | -public.product = SELECT |
25 | -public.productseries = SELECT |
26 | -public.productseriescodeimport = SELECT |
27 | -public.sourcepackagename = SELECT |
28 | -public.teammembership = SELECT |
29 | - |
30 | [branchscanner] |
31 | type=user |
32 | groups=write, script |
33 | |
34 | === modified file 'lib/canonical/config/schema-lazr.conf' |
35 | --- lib/canonical/config/schema-lazr.conf 2010-02-24 10:18:16 +0000 |
36 | +++ lib/canonical/config/schema-lazr.conf 2010-03-14 20:28:32 +0000 |
37 | @@ -472,8 +472,8 @@ |
38 | |
39 | |
40 | [codeimportworker] |
41 | -# This code is used by the code-import-worker-db which lives in |
42 | -# scripts/code-import-worker-db.py and |
43 | +# This code is used by the code-import-worker-monitor which lives in |
44 | +# scripts/code-import-worker-monitor.py and |
45 | # lib/lp/codehosting/codeimport/worker_monitor.py. |
46 | |
47 | # The interval in seconds the worker should wait between updates to the |
48 | |
49 | === modified file 'lib/canonical/launchpad/interfaces/librarian.py' |
50 | --- lib/canonical/launchpad/interfaces/librarian.py 2009-11-24 15:36:44 +0000 |
51 | +++ lib/canonical/launchpad/interfaces/librarian.py 2010-03-14 20:28:32 +0000 |
52 | @@ -150,7 +150,7 @@ |
53 | from the Librarian at this time. See LibrarianGarbageCollection. |
54 | |
55 | If restricted is True, the file will be created through the |
56 | - IRestricteLibrarianClient utility. |
57 | + IRestrictedLibrarianClient utility. |
58 | """ |
59 | |
60 | def __getitem__(key): |
61 | |
62 | === modified file 'lib/canonical/launchpad/xmlrpc/faults.py' |
63 | --- lib/canonical/launchpad/xmlrpc/faults.py 2010-02-21 22:13:34 +0000 |
64 | +++ lib/canonical/launchpad/xmlrpc/faults.py 2010-03-14 20:28:32 +0000 |
65 | @@ -27,6 +27,7 @@ |
66 | 'NoLinkedBranch', |
67 | 'NoSuchBranch', |
68 | 'NoSuchBug', |
69 | + 'NoSuchCodeImportJob', |
70 | 'NoSuchDistribution', |
71 | 'NoSuchPackage', |
72 | 'NoSuchPerson', |
73 | @@ -474,3 +475,12 @@ |
74 | self.sourcepackagename = sourcepackagename |
75 | LaunchpadFault.__init__(self, sourcepackagename=sourcepackagename) |
76 | |
77 | + |
78 | +class NoSuchCodeImportJob(LaunchpadFault): |
79 | + """Raised by `ICodeImportScheduler` methods when a job is not found.""" |
80 | + |
81 | + error_code = 360 |
82 | + msg_template = 'Job %(job_id)d not found.' |
83 | + |
84 | + def __init__(self, job_id): |
85 | + LaunchpadFault.__init__(self, job_id=job_id) |
86 | |
87 | === modified file 'lib/lp/code/interfaces/codeimportscheduler.py' |
88 | --- lib/lp/code/interfaces/codeimportscheduler.py 2010-02-22 08:10:03 +0000 |
89 | +++ lib/lp/code/interfaces/codeimportscheduler.py 2010-03-14 20:28:32 +0000 |
90 | @@ -16,6 +16,7 @@ |
91 | |
92 | from zope.interface import Interface |
93 | |
94 | + |
95 | class ICodeImportSchedulerApplication(ILaunchpadApplication): |
96 | """Code import scheduler application root.""" |
97 | |
98 | @@ -35,3 +36,28 @@ |
99 | mark it as having started on said machine and return its id, |
100 | or 0 if there are no jobs pending. |
101 | """ |
102 | + |
103 | + def getImportDataForJobID(job_id): |
104 | + """Get data about the import with job id `job_id`. |
105 | + |
106 | + :return: ``(worker_arguments, branch_url, log_file_name)`` where: |
107 | + * ``worker_arguments`` are the arguments to pass to the code |
108 | + import worker subprocess. |
109 | + * ``branch_url`` is the URL of the import branch (only used to put |
110 | + in OOPS reports) |
111 | + * ``log_file_name`` is the name of the log file to create in the |
112 | + librarian. |
113 | + :raise NoSuchCodeImportJob: if no job with id `job_id` exists. |
114 | + """ |
115 | + |
116 | + def updateHeartbeat(job_id, log_tail): |
117 | + """Call `ICodeImportJobWorkflow.updateHeartbeat` for job `job_id`. |
118 | + |
119 | + :raise NoSuchCodeImportJob: if no job with id `job_id` exists. |
120 | + """ |
121 | + |
122 | + def finishJobID(job_id, status_name, log_file_alias_url): |
123 | + """Call `ICodeImportJobWorkflow.finishJob` for job `job_id`. |
124 | + |
125 | + :raise NoSuchCodeImportJob: if no job with id `job_id` exists. |
126 | + """ |
127 | |
128 | === modified file 'lib/lp/code/model/tests/test_codeimportjob.py' |
129 | --- lib/lp/code/model/tests/test_codeimportjob.py 2010-02-24 13:17:01 +0000 |
130 | +++ lib/lp/code/model/tests/test_codeimportjob.py 2010-03-14 20:28:32 +0000 |
131 | @@ -14,6 +14,8 @@ |
132 | |
133 | from sqlobject.sqlbuilder import SQLConstant |
134 | |
135 | +import transaction |
136 | + |
137 | from twisted.python.util import mergeFunctionMetadata |
138 | |
139 | from zope.component import getUtility |
140 | @@ -702,24 +704,13 @@ |
141 | AssertFailureMixin, AssertEventMixin): |
142 | """Unit tests for CodeImportJobWorkflow.finishJob.""" |
143 | |
144 | - layer = LaunchpadZopelessLayer |
145 | + layer = LaunchpadFunctionalLayer |
146 | |
147 | def setUp(self): |
148 | super(TestCodeImportJobWorkflowFinishJob, self).setUp() |
149 | login_for_code_imports() |
150 | self.machine = self.factory.makeCodeImportMachine() |
151 | self.machine.setOnline() |
152 | - self.switchDbUser_called = False |
153 | - |
154 | - def tearDown(self): |
155 | - super(TestCodeImportJobWorkflowFinishJob, self).tearDown() |
156 | - self.assertTrue( |
157 | - self.switchDbUser_called, "switchDbUser() not called!") |
158 | - |
159 | - def switchDbUser(self): |
160 | - self.layer.txn.commit() |
161 | - self.layer.switchDbUser('codeimportworker') |
162 | - self.switchDbUser_called = True |
163 | |
164 | def makeRunningJob(self, code_import=None): |
165 | """Make and return a CodeImportJob object with state==RUNNING. |
166 | @@ -740,7 +731,6 @@ |
167 | # Calling finishJob with a job whose state is not RUNNING is an error. |
168 | code_import = self.factory.makeCodeImport() |
169 | job = self.factory.makeCodeImportJob(code_import) |
170 | - self.switchDbUser() |
171 | self.assertFailure( |
172 | "The CodeImportJob associated with %s is " |
173 | "PENDING." % code_import.branch.unique_name, |
174 | @@ -754,7 +744,6 @@ |
175 | # finishJob() deletes the job it is passed. |
176 | running_job = self.makeRunningJob() |
177 | running_job_id = running_job.id |
178 | - self.switchDbUser() |
179 | getUtility(ICodeImportJobWorkflow).finishJob( |
180 | running_job, CodeImportResultStatus.SUCCESS, None) |
181 | self.assertEqual( |
182 | @@ -765,7 +754,6 @@ |
183 | # scheduled appropriately far in the future. |
184 | running_job = self.makeRunningJob() |
185 | code_import = running_job.code_import |
186 | - self.switchDbUser() |
187 | getUtility(ICodeImportJobWorkflow).finishJob( |
188 | running_job, CodeImportResultStatus.SUCCESS, None) |
189 | new_job = code_import.import_job |
190 | @@ -782,7 +770,6 @@ |
191 | # now. |
192 | running_job = self.makeRunningJob() |
193 | code_import = running_job.code_import |
194 | - self.switchDbUser() |
195 | getUtility(ICodeImportJobWorkflow).finishJob( |
196 | running_job, CodeImportResultStatus.SUCCESS_PARTIAL, None) |
197 | new_job = code_import.import_job |
198 | @@ -800,7 +787,6 @@ |
199 | 'david.allouche@canonical.com') |
200 | code_import.updateFromData( |
201 | {'review_status': CodeImportReviewStatus.SUSPENDED}, ddaa) |
202 | - self.switchDbUser() |
203 | getUtility(ICodeImportJobWorkflow).finishJob( |
204 | running_job, CodeImportResultStatus.SUCCESS, None) |
205 | self.assertTrue(code_import.import_job is None) |
206 | @@ -812,7 +798,6 @@ |
207 | # Before calling finishJob() there are no CodeImportResults for the |
208 | # given import... |
209 | self.assertEqual(len(list(code_import.results)), 0) |
210 | - self.switchDbUser() |
211 | getUtility(ICodeImportJobWorkflow).finishJob( |
212 | running_job, CodeImportResultStatus.SUCCESS, None) |
213 | # ... and after, there is exactly one. |
214 | @@ -841,13 +826,11 @@ |
215 | # methods -- e.g. calling requestJob to set requesting_user -- but |
216 | # using removeSecurityProxy and forcing here is expedient. |
217 | setattr(removeSecurityProxy(job), from_field, value) |
218 | - self.switchDbUser() |
219 | result = self.getResultForJob(job) |
220 | self.assertEqual( |
221 | value, getattr(result, to_field), |
222 | "Value %r in job field %r was not passed through to result field" |
223 | " %r." % (value, from_field, to_field)) |
224 | - self.layer.switchDbUser('launchpad') |
225 | |
226 | def test_resultObjectFields(self): |
227 | # The CodeImportResult object that finishJob creates contains all the |
228 | @@ -898,7 +881,6 @@ |
229 | status_jobs = [] |
230 | for status in CodeImportResultStatus.items: |
231 | status_jobs.append((status, self.makeRunningJob())) |
232 | - self.switchDbUser() |
233 | for status, job in status_jobs: |
234 | result = self.getResultForJob(job, status) |
235 | self.assertEqual(result.status, status) |
236 | @@ -909,12 +891,11 @@ |
237 | |
238 | job = self.makeRunningJob() |
239 | |
240 | - self.switchDbUser() |
241 | log_data = 'several\nlines\nof\nlog data' |
242 | log_alias_id = getUtility(ILibrarianClient).addFile( |
243 | 'import_log.txt', len(log_data), |
244 | StringIO.StringIO(log_data), 'text/plain') |
245 | - self.layer.txn.commit() |
246 | + transaction.commit() |
247 | log_alias = getUtility(ILibraryFileAliasSet)[log_alias_id] |
248 | result = self.getResultForJob(job, log_alias=log_alias) |
249 | |
250 | @@ -927,7 +908,6 @@ |
251 | code_import = running_job.code_import |
252 | machine = running_job.machine |
253 | new_events = NewEvents() |
254 | - self.switchDbUser() |
255 | getUtility(ICodeImportJobWorkflow).finishJob( |
256 | running_job, CodeImportResultStatus.SUCCESS, None) |
257 | [finish_event] = list(new_events) |
258 | @@ -941,7 +921,6 @@ |
259 | status_jobs = [] |
260 | for status in CodeImportResultStatus.items: |
261 | status_jobs.append((status, self.makeRunningJob())) |
262 | - self.switchDbUser() |
263 | for status, job in status_jobs: |
264 | code_import = job.code_import |
265 | self.assertTrue(code_import.date_last_successful is None) |
266 | @@ -958,7 +937,6 @@ |
267 | status_jobs = [] |
268 | for status in CodeImportResultStatus.items: |
269 | status_jobs.append((status, self.makeRunningJob())) |
270 | - self.switchDbUser() |
271 | for status, job in status_jobs: |
272 | code_import = job.code_import |
273 | self.assertTrue(code_import.date_last_successful is None) |
274 | @@ -984,7 +962,6 @@ |
275 | self.assertEqual( |
276 | CodeImportReviewStatus.REVIEWED, code_import.review_status) |
277 | running_job = self.makeRunningJob(code_import) |
278 | - self.switchDbUser() |
279 | getUtility(ICodeImportJobWorkflow).finishJob( |
280 | running_job, CodeImportResultStatus.FAILURE, None) |
281 | self.assertEqual( |
282 | |
283 | === modified file 'lib/lp/code/xmlrpc/codeimportscheduler.py' |
284 | --- lib/lp/code/xmlrpc/codeimportscheduler.py 2010-02-19 03:15:04 +0000 |
285 | +++ lib/lp/code/xmlrpc/codeimportscheduler.py 2010-03-14 20:28:32 +0000 |
286 | @@ -8,13 +8,20 @@ |
287 | 'CodeImportSchedulerAPI', |
288 | ] |
289 | |
290 | - |
291 | -from lp.code.interfaces.codeimportjob import ICodeImportJobSet |
292 | -from lp.code.interfaces.codeimportscheduler import ICodeImportScheduler |
293 | -from canonical.launchpad.webapp import LaunchpadXMLRPCView |
294 | - |
295 | from zope.component import getUtility |
296 | from zope.interface import implements |
297 | +from zope.security.proxy import removeSecurityProxy |
298 | + |
299 | +from canonical.launchpad.interfaces import ILibraryFileAliasSet |
300 | +from canonical.launchpad.webapp import canonical_url, LaunchpadXMLRPCView |
301 | +from canonical.launchpad.xmlrpc.faults import NoSuchCodeImportJob |
302 | +from canonical.launchpad.xmlrpc.helpers import return_fault |
303 | + |
304 | +from lp.code.enums import CodeImportResultStatus |
305 | +from lp.code.interfaces.codeimportjob import ( |
306 | + ICodeImportJobSet, ICodeImportJobWorkflow) |
307 | +from lp.code.interfaces.codeimportscheduler import ICodeImportScheduler |
308 | +from lp.codehosting.codeimport.worker import CodeImportSourceDetails |
309 | |
310 | |
311 | class CodeImportSchedulerAPI(LaunchpadXMLRPCView): |
312 | @@ -30,3 +37,57 @@ |
313 | return job.id |
314 | else: |
315 | return 0 |
316 | + |
317 | + def _getJob(self, job_id): |
318 | + job_set = removeSecurityProxy(getUtility(ICodeImportJobSet)) |
319 | + job = removeSecurityProxy(job_set.getById(job_id)) |
320 | + if job is None: |
321 | + raise NoSuchCodeImportJob(job_id) |
322 | + return job |
323 | + |
324 | + # Because you can't use a decorated function as the implementation of a |
325 | + # method exported over XML-RPC, the implementations just thunk to an |
326 | + # implementation wrapped with @return_fault. |
327 | + |
328 | + def getImportDataForJobID(self, job_id): |
329 | + """See `ICodeImportScheduler`.""" |
330 | + return self._getImportDataForJobID(job_id) |
331 | + |
332 | + def updateHeartbeat(self, job_id, log_tail): |
333 | + """See `ICodeImportScheduler`.""" |
334 | + return self._updateHeartbeat(job_id, log_tail) |
335 | + |
336 | + def finishJobID(self, job_id, status_name, log_file_alias_url): |
337 | + """See `ICodeImportScheduler`.""" |
338 | + return self._finishJobID(job_id, status_name, log_file_alias_url) |
339 | + |
340 | + @return_fault |
341 | + def _getImportDataForJobID(self, job_id): |
342 | + job = self._getJob(job_id) |
343 | + arguments = CodeImportSourceDetails.fromCodeImport( |
344 | + job.code_import).asArguments() |
345 | + branch = job.code_import.branch |
346 | + branch_url = canonical_url(branch) |
347 | + log_file_name = '%s.log' % branch.unique_name[1:].replace('/', '-') |
348 | + return (arguments, branch_url, log_file_name) |
349 | + |
350 | + @return_fault |
351 | + def _updateHeartbeat(self, job_id, log_tail): |
352 | + job = self._getJob(job_id) |
353 | + workflow = removeSecurityProxy(getUtility(ICodeImportJobWorkflow)) |
354 | + workflow.updateHeartbeat(job, log_tail) |
355 | + return 0 |
356 | + |
357 | + @return_fault |
358 | + def _finishJobID(self, job_id, status_name, log_file_alias_url): |
359 | + job = self._getJob(job_id) |
360 | + status = CodeImportResultStatus.items[status_name] |
361 | + workflow = removeSecurityProxy(getUtility(ICodeImportJobWorkflow)) |
362 | + if log_file_alias_url: |
363 | + library_file_alias_set = getUtility(ILibraryFileAliasSet) |
364 | + # XXX This is so so so terrible: |
365 | + log_file_alias_id = int(log_file_alias_url.split('/')[-2]) |
366 | + log_file_alias = library_file_alias_set[log_file_alias_id] |
367 | + else: |
368 | + log_file_alias = None |
369 | + workflow.finishJob(job, status, log_file_alias) |
370 | |
371 | === added file 'lib/lp/code/xmlrpc/tests/test_codeimportscheduler.py' |
372 | --- lib/lp/code/xmlrpc/tests/test_codeimportscheduler.py 1970-01-01 00:00:00 +0000 |
373 | +++ lib/lp/code/xmlrpc/tests/test_codeimportscheduler.py 2010-03-14 20:28:32 +0000 |
374 | @@ -0,0 +1,140 @@ |
375 | +# Copyright 2010 Canonical Ltd. This software is licensed under the |
376 | +# GNU Affero General Public License version 3 (see the file LICENSE). |
377 | + |
378 | +"""Test for the methods of `ICodeImportScheduler`.""" |
379 | + |
380 | +__metaclass__ = type |
381 | + |
382 | +import unittest |
383 | +import xmlrpclib |
384 | + |
385 | +from zope.component import getUtility |
386 | +from zope.security.proxy import removeSecurityProxy |
387 | + |
388 | +from canonical.database.constants import UTC_NOW |
389 | +from canonical.launchpad.interfaces import ILaunchpadCelebrities |
390 | +from canonical.launchpad.webapp import canonical_url |
391 | +from canonical.launchpad.xmlrpc.faults import NoSuchCodeImportJob |
392 | +from canonical.launchpad.testing.codeimporthelpers import make_running_import |
393 | +from canonical.testing.layers import LaunchpadFunctionalLayer |
394 | + |
395 | +from lp.code.enums import CodeImportResultStatus |
396 | +from lp.code.model.codeimportjob import CodeImportJob |
397 | +from lp.code.xmlrpc.codeimportscheduler import CodeImportSchedulerAPI |
398 | +from lp.codehosting.codeimport.worker import CodeImportSourceDetails |
399 | +from lp.testing import run_with_login, TestCaseWithFactory |
400 | + |
401 | + |
402 | +class TestCodeImportSchedulerAPI(TestCaseWithFactory): |
403 | + |
404 | + layer = LaunchpadFunctionalLayer |
405 | + |
406 | + def setUp(self): |
407 | + TestCaseWithFactory.setUp(self) |
408 | + self.api = CodeImportSchedulerAPI(None, None) |
409 | + self.machine = self.factory.makeCodeImportMachine(set_online=True) |
410 | + for job in CodeImportJob.select(): |
411 | + job.destroySelf() |
412 | + |
413 | + def makeCodeImportJob(self, running): |
414 | + person = getUtility(ILaunchpadCelebrities).vcs_imports.teamowner |
415 | + if running: |
416 | + return removeSecurityProxy(run_with_login(person, make_running_import)).import_job |
417 | + else: |
418 | + return run_with_login(person, self.factory.makeCodeImportJob) |
419 | + |
420 | + def test_getJobForMachine_no_job_waiting(self): |
421 | + # If no job is waiting getJobForMachine returns 0. |
422 | + job_id = self.api.getJobForMachine(self.machine.hostname, 10) |
423 | + self.assertEqual(0, job_id) |
424 | + |
425 | + def test_getJobForMachine_job_waiting(self): |
426 | + # If a job is waiting getJobForMachine returns its id. |
427 | + code_import_job = self.makeCodeImportJob(running=False) |
428 | + job_id = self.api.getJobForMachine(self.machine.hostname, 10) |
429 | + self.assertEqual(code_import_job.id, job_id) |
430 | + |
431 | + def test_getImportDataForJobID(self): |
432 | + # getImportDataForJobID returns the worker arguments, branch url and |
433 | + # log file name for an import corresponding to a particular job. |
434 | + code_import_job = self.makeCodeImportJob(running=True) |
435 | + code_import = removeSecurityProxy(code_import_job).code_import |
436 | + code_import_arguments, branch_url, log_file_name = \ |
437 | + self.api.getImportDataForJobID(code_import_job.id) |
438 | + import_as_arguments = CodeImportSourceDetails.fromCodeImport( |
439 | + code_import).asArguments() |
440 | + expected_log_file_name = '%s.log' % ( |
441 | + code_import.branch.unique_name[1:].replace('/', '-')) |
442 | + self.assertEqual( |
443 | + (import_as_arguments, canonical_url(code_import.branch), |
444 | + expected_log_file_name), |
445 | + (code_import_arguments, branch_url, log_file_name)) |
446 | + |
447 | + def test_getImportDataForJobID_not_found(self): |
448 | + # getImportDataForJobID returns a NoSuchCodeImportJob fault when there |
449 | + # is no code import job with the given ID. |
450 | + fault = self.api.getImportDataForJobID(-1) |
451 | + self.assertTrue( |
452 | + isinstance(fault, xmlrpclib.Fault), |
453 | + "getImportDataForJobID(-1) returned %r, not a Fault." |
454 | + % (fault,)) |
455 | + self.assertEqual(NoSuchCodeImportJob, fault.__class__) |
456 | + |
457 | + def test_updateHeartbeat(self): |
458 | + # updateHeartbeat calls the updateHeartbeat job workflow method. |
459 | + code_import_job = self.makeCodeImportJob(running=True) |
460 | + log_tail = self.factory.getUniqueString() |
461 | + self.api.updateHeartbeat(code_import_job.id, log_tail) |
462 | + self.assertSqlAttributeEqualsDate( |
463 | + code_import_job, 'heartbeat', UTC_NOW) |
464 | + self.assertEqual(log_tail, code_import_job.logtail) |
465 | + |
466 | + def test_updateHeartbeat_not_found(self): |
467 | + # updateHeartbeat returns a NoSuchCodeImportJob fault when there is no |
468 | + # code import job with the given ID. |
469 | + fault = self.api.updateHeartbeat(-1, '') |
470 | + self.assertTrue( |
471 | + isinstance(fault, xmlrpclib.Fault), |
472 | + "updateHeartbeat(-1, '') returned %r, not a Fault." |
473 | + % (fault,)) |
474 | + self.assertEqual(NoSuchCodeImportJob, fault.__class__) |
475 | + |
476 | + def test_finishJobID_no_log_file(self): |
477 | + # finishJobID calls the finishJobID job workflow method. Passing '' |
478 | + # means no log file was uploaded to the librarian. |
479 | + code_import_job = self.makeCodeImportJob(running=True) |
480 | + code_import = code_import_job.code_import |
481 | + self.api.finishJobID( |
482 | + code_import_job.id, CodeImportResultStatus.SUCCESS.name, '') |
483 | + # finishJob does many things, we just check one of them: setting |
484 | + # date_last_successful in the case of success. |
485 | + self.assertSqlAttributeEqualsDate( |
486 | + code_import, 'date_last_successful', UTC_NOW) |
487 | + |
488 | + def test_finishJobID_with_log_file(self): |
489 | + # finishJobID calls the finishJobID job workflow method and can parse |
490 | + # a librarian file's http url to figure out its ID. |
491 | + code_import_job = self.makeCodeImportJob(running=True) |
492 | + code_import = code_import_job.code_import |
493 | + log_file_alias = self.factory.makeLibraryFileAlias() |
494 | + self.api.finishJobID( |
495 | + code_import_job.id, CodeImportResultStatus.SUCCESS.name, |
496 | + log_file_alias.http_url) |
497 | + self.assertEqual( |
498 | + log_file_alias, code_import.results[-1].log_file) |
499 | + |
500 | + def test_finishJobID_not_found(self): |
501 | + # getImportDataForJobID returns a NoSuchCodeImportJob fault when there |
502 | + # is no code import job with the given ID. |
503 | + fault = self.api.finishJobID( |
504 | + -1, CodeImportResultStatus.SUCCESS.name, '') |
505 | + self.assertTrue( |
506 | + isinstance(fault, xmlrpclib.Fault), |
507 | + "finishJobID(-1, 'SUCCESS', 0) returned %r, not a Fault." |
508 | + % (fault,)) |
509 | + self.assertEqual(NoSuchCodeImportJob, fault.__class__) |
510 | + |
511 | + |
512 | +def test_suite(): |
513 | + return unittest.TestLoader().loadTestsFromName(__name__) |
514 | + |
515 | |
516 | === modified file 'lib/lp/codehosting/codeimport/dispatcher.py' |
517 | --- lib/lp/codehosting/codeimport/dispatcher.py 2010-02-22 05:07:23 +0000 |
518 | +++ lib/lp/codehosting/codeimport/dispatcher.py 2010-03-14 20:28:32 +0000 |
519 | @@ -31,7 +31,7 @@ |
520 | """ |
521 | |
522 | worker_script = os.path.join( |
523 | - config.root, 'scripts', 'code-import-worker-db.py') |
524 | + config.root, 'scripts', 'code-import-worker-monitor.py') |
525 | |
526 | def __init__(self, logger, worker_limit, _sleep=time.sleep): |
527 | """Initialize an instance. |
528 | |
529 | === modified file 'lib/lp/codehosting/codeimport/tests/test_workermonitor.py' |
530 | --- lib/lp/codehosting/codeimport/tests/test_workermonitor.py 2010-03-08 22:26:29 +0000 |
531 | +++ lib/lp/codehosting/codeimport/tests/test_workermonitor.py 2010-03-14 20:28:32 +0000 |
532 | @@ -13,27 +13,33 @@ |
533 | import StringIO |
534 | import tempfile |
535 | import unittest |
536 | +import urllib |
537 | |
538 | from bzrlib.branch import Branch |
539 | from bzrlib.tests import TestCase as BzrTestCase |
540 | |
541 | from twisted.internet import defer, error, protocol, reactor, task |
542 | +from twisted.python import failure, log |
543 | from twisted.trial.unittest import TestCase as TrialTestCase |
544 | +from twisted.web import xmlrpc |
545 | + |
546 | +import transaction |
547 | |
548 | from zope.component import getUtility |
549 | -from zope.security.proxy import removeSecurityProxy |
550 | |
551 | from canonical.config import config |
552 | from canonical.launchpad.scripts.logger import QuietFakeLogger |
553 | +from canonical.launchpad.xmlrpc.faults import NoSuchCodeImportJob |
554 | from canonical.testing.layers import ( |
555 | - TwistedLayer, TwistedLaunchpadZopelessLayer) |
556 | + TwistedAppServerLayer, TwistedLaunchpadZopelessLayer, TwistedLayer) |
557 | +from canonical.twistedsupport import suppress_stderr |
558 | from canonical.twistedsupport.tests.test_processmonitor import ( |
559 | makeFailure, ProcessTestsMixin) |
560 | + |
561 | from lp.code.enums import ( |
562 | CodeImportResultStatus, CodeImportReviewStatus, RevisionControlSystems) |
563 | from lp.code.interfaces.codeimport import ICodeImportSet |
564 | -from lp.code.interfaces.codeimportjob import ( |
565 | - ICodeImportJobSet, ICodeImportJobWorkflow) |
566 | +from lp.code.interfaces.codeimportjob import ICodeImportJobSet |
567 | from lp.code.model.codeimport import CodeImport |
568 | from lp.code.model.codeimportjob import CodeImportJob |
569 | from lp.codehosting import load_optional_plugin |
570 | @@ -41,13 +47,12 @@ |
571 | CodeImportSourceDetails, CodeImportWorkerExitCode, |
572 | get_default_bazaar_branch_store) |
573 | from lp.codehosting.codeimport.workermonitor import ( |
574 | - CodeImportWorkerMonitor, CodeImportWorkerMonitorProtocol, ExitQuietly, |
575 | - read_only_transaction) |
576 | + CodeImportWorkerMonitor, CodeImportWorkerMonitorProtocol, ExitQuietly) |
577 | from lp.codehosting.codeimport.tests.servers import ( |
578 | CVSServer, GitServer, MercurialServer, SubversionServer) |
579 | from lp.codehosting.codeimport.tests.test_worker import ( |
580 | clean_up_default_stores_for_import) |
581 | -from lp.testing import login, logout |
582 | +from lp.testing import login, logout, TestCase |
583 | from lp.testing.factory import LaunchpadObjectFactory |
584 | |
585 | |
586 | @@ -129,7 +134,45 @@ |
587 | self.protocol._tail, 'line 3\nline 4\nline 5\nline 6\n') |
588 | |
589 | |
590 | -class TestWorkerMonitorUnit(TrialTestCase): |
591 | +class FakeCodeImportScheduleEndpointProxy: |
592 | + """A fake implementation of a proxy to `ICodeImportScheduler`. |
593 | + |
594 | + The constructor takes a dictionary mapping job ids to information that |
595 | + should be returned by getImportDataForJobID and the exception to raise if |
596 | + getImportDataForJobID is called with a job id not in the passed-in |
597 | + dictionary, defaulting to a fault with the same code as |
598 | + NoSuchCodeImportJob (because the class of the exception is lost when you |
599 | + go through XML-RPC serialization). |
600 | + """ |
601 | + |
602 | + def __init__(self, jobs_dict, no_such_job_exception=None): |
603 | + self.calls = [] |
604 | + self.jobs_dict = jobs_dict |
605 | + if no_such_job_exception is None: |
606 | + no_such_job_exception = xmlrpc.Fault( |
607 | + faultCode=NoSuchCodeImportJob.error_code, faultString='') |
608 | + self.no_such_job_exception = no_such_job_exception |
609 | + |
610 | + def callRemote(self, method_name, *args): |
611 | + method = getattr(self, '_remote_%s' % method_name, self._default) |
612 | + deferred = defer.maybeDeferred(method, *args) |
613 | + def append_to_log(pass_through): |
614 | + self.calls.append((method_name,) + tuple(args)) |
615 | + return pass_through |
616 | + deferred.addCallback(append_to_log) |
617 | + return deferred |
618 | + |
619 | + def _default(self, *args): |
620 | + return None |
621 | + |
622 | + def _remote_getImportDataForJobID(self, job_id): |
623 | + if job_id in self.jobs_dict: |
624 | + return self.jobs_dict[job_id] |
625 | + else: |
626 | + raise self.no_such_job_exception |
627 | + |
628 | + |
629 | +class TestWorkerMonitorUnit(TrialTestCase, TestCase): |
630 | """Unit tests for most of the `CodeImportWorkerMonitor` class. |
631 | |
632 | We have to pay attention to the fact that several of the methods of the |
633 | @@ -140,164 +183,172 @@ |
634 | |
635 | layer = TwistedLaunchpadZopelessLayer |
636 | |
637 | + # This works around a clash between the TrialTestCase and our TestCase. |
638 | + skip = None |
639 | + |
640 | class WorkerMonitor(CodeImportWorkerMonitor): |
641 | """A subclass of CodeImportWorkerMonitor that stubs logging OOPSes.""" |
642 | |
643 | def _logOopsFromFailure(self, failure): |
644 | - self._failures.append(failure) |
645 | - |
646 | - def getResultsForOurCodeImport(self): |
647 | - """Return the `CodeImportResult`s for the `CodeImport` we created. |
648 | - """ |
649 | - code_import = getUtility(ICodeImportSet).get(self.code_import_id) |
650 | - return code_import.results |
651 | - |
652 | - def getOneResultForOurCodeImport(self): |
653 | - """Return the only `CodeImportResult` for the `CodeImport` we created. |
654 | - |
655 | - This method fails the test if there is more than one |
656 | - `CodeImportResult` for this `CodeImport`. |
657 | - """ |
658 | - results = list(self.getResultsForOurCodeImport()) |
659 | - self.failUnlessEqual(len(results), 1) |
660 | - return results[0] |
661 | + log.err(failure) |
662 | |
663 | def assertOopsesLogged(self, exc_types): |
664 | - self.assertEqual(len(exc_types), len(self.worker_monitor._failures)) |
665 | - for failure, exc_type in zip(self.worker_monitor._failures, |
666 | - exc_types): |
667 | - self.assert_(failure.check(exc_type)) |
668 | - |
669 | - def setUp(self): |
670 | - login('no-priv@canonical.com') |
671 | - self.factory = LaunchpadObjectFactory() |
672 | - job = self.factory.makeCodeImportJob() |
673 | - self.code_import_id = job.code_import.id |
674 | - getUtility(ICodeImportJobWorkflow).startJob( |
675 | - job, self.factory.makeCodeImportMachine(set_online=True)) |
676 | - self.job_id = job.id |
677 | - self.worker_monitor = self.WorkerMonitor(job.id, QuietFakeLogger()) |
678 | - self.worker_monitor._failures = [] |
679 | - self.layer.txn.commit() |
680 | - self.layer.switchDbUser('codeimportworker') |
681 | - |
682 | - def tearDown(self): |
683 | - logout() |
684 | - |
685 | - def test_getJob(self): |
686 | - # getJob() returns the job whose id we passed to the constructor. |
687 | - return self.assertEqual( |
688 | - self.worker_monitor.getJob().id, self.job_id) |
689 | - |
690 | - def test_getJobWhenJobDeleted(self): |
691 | - # If the job has been deleted, getJob sets _call_finish_job to False |
692 | - # and raises ExitQuietly. |
693 | - job = self.worker_monitor.getJob() |
694 | - removeSecurityProxy(job).destroySelf() |
695 | - self.assertRaises(ExitQuietly, self.worker_monitor.getJob) |
696 | - self.assertNot(self.worker_monitor._call_finish_job) |
697 | - |
698 | - def test_getSourceDetails(self): |
699 | - # getSourceDetails extracts the details from the CodeImport database |
700 | - # object. |
701 | - @read_only_transaction |
702 | - def check_source_details(details): |
703 | - job = self.worker_monitor.getJob() |
704 | - self.assertEqual( |
705 | - details.url, job.code_import.url) |
706 | - self.assertEqual( |
707 | - details.cvs_root, job.code_import.cvs_root) |
708 | - self.assertEqual( |
709 | - details.cvs_module, job.code_import.cvs_module) |
710 | - return self.worker_monitor.getSourceDetails().addCallback( |
711 | - check_source_details) |
712 | + failures = self.flushLoggedErrors() |
713 | + self.assertEqual(len(exc_types), len(failures)) |
714 | + for fail, exc_type in zip(failures, exc_types): |
715 | + self.assert_(fail.check(exc_type)) |
716 | + |
717 | + def makeWorkerMonitorWithJob(self, job_id=1, job_data=()): |
718 | + return self.WorkerMonitor( |
719 | + job_id, QuietFakeLogger(), |
720 | + FakeCodeImportScheduleEndpointProxy({job_id: job_data})) |
721 | + |
722 | + def makeWorkerMonitorWithoutJob(self, exception=None): |
723 | + return self.WorkerMonitor( |
724 | + 1, QuietFakeLogger(), |
725 | + FakeCodeImportScheduleEndpointProxy({}, exception)) |
726 | + |
727 | + def test_getWorkerArguments(self): |
728 | + # getWorkerArguments returns a deferred that fires with the |
729 | + # 'arguments' part of what getImportDataForJobID returns. |
730 | + args = [self.factory.getUniqueString(), |
731 | + self.factory.getUniqueString()] |
732 | + worker_monitor = self.makeWorkerMonitorWithJob(1, (args, 1, 2)) |
733 | + return worker_monitor.getWorkerArguments().addCallback( |
734 | + self.assertEqual, args) |
735 | + |
736 | + def test_getWorkerArguments_sets_branch_url_and_logfilename(self): |
737 | + # getWorkerArguments sets the _branch_url (for use in oops reports) |
738 | + # and _log_file_name (for upload to the librarian) attributes on the |
739 | + # WorkerMonitor from the data returned by getImportDataForJobID. |
740 | + branch_url = self.factory.getUniqueString() |
741 | + log_file_name = self.factory.getUniqueString() |
742 | + worker_monitor = self.makeWorkerMonitorWithJob( |
743 | + 1, (['a'], branch_url, log_file_name)) |
744 | + def check_branch_log(ignored): |
745 | + # Looking at the _ attributes here is in slightly poor taste, but |
746 | + # much much easier than them by logging and parsing an oops, etc. |
747 | + self.assertEqual( |
748 | + (branch_url, log_file_name), |
749 | + (worker_monitor._branch_url, worker_monitor._log_file_name)) |
750 | + return worker_monitor.getWorkerArguments().addCallback( |
751 | + check_branch_log) |
752 | + |
753 | + def test_getWorkerArguments_job_not_found_raises_exit_quietly(self): |
754 | + # When getImportDataForJobID signals a fault indicating that |
755 | + # getWorkerArguments didn't find the supplied job, getWorkerArguments |
756 | + # translates this to an 'ExitQuietly' exception. |
757 | + worker_monitor = self.makeWorkerMonitorWithoutJob() |
758 | + return self.assertFailure( |
759 | + worker_monitor.getWorkerArguments(), ExitQuietly) |
760 | + |
761 | + def test_getWorkerArguments_endpoint_failure_raises(self): |
762 | + # When getImportDataForJobID raises an arbitrary exception, it is not |
763 | + # handled in any special way by getWorkerArguments. |
764 | + worker_monitor = self.makeWorkerMonitorWithoutJob( |
765 | + exception=ZeroDivisionError()) |
766 | + return self.assertFailure( |
767 | + worker_monitor.getWorkerArguments(), ZeroDivisionError) |
768 | + |
769 | + def test_getWorkerArguments_arbitrary_fault_raises(self): |
770 | + # When getImportDataForJobID signals an arbitrary fault, it is not |
771 | + # handled in any special way by getWorkerArguments. |
772 | + worker_monitor = self.makeWorkerMonitorWithoutJob( |
773 | + exception=xmlrpc.Fault(1, '')) |
774 | + return self.assertFailure( |
775 | + worker_monitor.getWorkerArguments(), xmlrpc.Fault) |
776 | |
777 | def test_updateHeartbeat(self): |
778 | - # The worker monitor's updateHeartbeat method calls the |
779 | - # updateHeartbeat job workflow method. |
780 | - @read_only_transaction |
781 | + # updateHeartbeat calls the updateHeartbeat XML-RPC method. |
782 | + log_tail = self.factory.getUniqueString() |
783 | + job_id = self.factory.getUniqueInteger() |
784 | + worker_monitor = self.makeWorkerMonitorWithJob(job_id) |
785 | def check_updated_details(result): |
786 | - job = self.worker_monitor.getJob() |
787 | - self.assertEqual(job.logtail, 'log tail') |
788 | - return self.worker_monitor.updateHeartbeat('log tail').addCallback( |
789 | + self.assertEqual( |
790 | + [('updateHeartbeat', job_id, log_tail)], |
791 | + worker_monitor.codeimport_endpoint.calls) |
792 | + return worker_monitor.updateHeartbeat(log_tail).addCallback( |
793 | check_updated_details) |
794 | |
795 | - def test_finishJobCallsFinishJob(self): |
796 | - # The worker monitor's finishJob method calls the |
797 | - # finishJob job workflow method. |
798 | - @read_only_transaction |
799 | + def test_finishJob_calls_finishJobID_empty_log_file(self): |
800 | + # When the log file is empty, finishJob calls finishJobID with the |
801 | + # name of the status enum and an empty string to indicate that no log |
802 | + # file was uplaoded to the librarian. |
803 | + job_id = self.factory.getUniqueInteger() |
804 | + worker_monitor = self.makeWorkerMonitorWithJob(job_id) |
805 | + self.assertEqual(worker_monitor._log_file.tell(), 0) |
806 | def check_finishJob_called(result): |
807 | - # We take as indication that finishJob was called that a |
808 | - # CodeImportResult was created. |
809 | self.assertEqual( |
810 | - len(list(self.getResultsForOurCodeImport())), 1) |
811 | - return self.worker_monitor.finishJob( |
812 | + [('finishJobID', job_id, 'SUCCESS', '')], |
813 | + worker_monitor.codeimport_endpoint.calls) |
814 | + return worker_monitor.finishJob( |
815 | CodeImportResultStatus.SUCCESS).addCallback( |
816 | check_finishJob_called) |
817 | |
818 | - def test_finishJobDoesntUploadEmptyFileToLibrarian(self): |
819 | - # The worker monitor's finishJob method does not try to upload an |
820 | - # empty log file to the librarian. |
821 | - self.assertEqual(self.worker_monitor._log_file.tell(), 0) |
822 | - @read_only_transaction |
823 | - def check_no_file_uploaded(result): |
824 | - result = self.getOneResultForOurCodeImport() |
825 | - self.assertIdentical(result.log_file, None) |
826 | - return self.worker_monitor.finishJob( |
827 | - CodeImportResultStatus.SUCCESS).addCallback( |
828 | - check_no_file_uploaded) |
829 | - |
830 | - def test_finishJobUploadsNonEmptyFileToLibrarian(self): |
831 | - # The worker monitor's finishJob method uploads the log file to the |
832 | - # librarian. |
833 | - self.worker_monitor._log_file.write('some text') |
834 | - @read_only_transaction |
835 | + def test_finishJob_uploads_nonempty_file_to_librarian(self): |
836 | + # finishJob method uploads the log file to the librarian and calls the |
837 | + # finishJobID XML-RPC method with the url of that file. |
838 | + self.layer.force_dirty_database() |
839 | + log_text = self.factory.getUniqueString() |
840 | + worker_monitor = self.makeWorkerMonitorWithJob() |
841 | + worker_monitor._log_file.write(log_text) |
842 | def check_file_uploaded(result): |
843 | - result = self.getOneResultForOurCodeImport() |
844 | - self.assertNotIdentical(result.log_file, None) |
845 | - self.assertEqual(result.log_file.read(), 'some text') |
846 | - return self.worker_monitor.finishJob( |
847 | + transaction.abort() |
848 | + url = worker_monitor.codeimport_endpoint.calls[0][3] |
849 | + text = urllib.urlopen(url).read() |
850 | + self.assertEqual(log_text, text) |
851 | + return worker_monitor.finishJob( |
852 | CodeImportResultStatus.SUCCESS).addCallback( |
853 | check_file_uploaded) |
854 | |
855 | - def test_finishJobStillCreatesResultWhenLibrarianUploadFails(self): |
856 | - # If the upload to the librarian fails for any reason, the |
857 | - # worker monitor still calls the finishJob workflow method, |
858 | - # but an OOPS is logged to indicate there was a problem. |
859 | + @suppress_stderr |
860 | + def test_finishJob_still_calls_finishJobID_if_upload_fails(self): |
861 | + # If the upload to the librarian fails for any reason, the worker |
862 | + # monitor still calls the finishJobID XML-RPC method, but logs an |
863 | + # error to indicate there was a problem. |
864 | + |
865 | # Write some text so that we try to upload the log. |
866 | - self.worker_monitor._log_file.write('some text') |
867 | + job_id = self.factory.getUniqueInteger() |
868 | + worker_monitor = self.makeWorkerMonitorWithJob(job_id) |
869 | + worker_monitor._log_file.write('some text') |
870 | + |
871 | # Make _createLibrarianFileAlias fail in a distinctive way. |
872 | - self.worker_monitor._createLibrarianFileAlias = lambda *args: 1/0 |
873 | - def check_oops_logged_and_result_created(ignored): |
874 | - self.assertOopsesLogged([ZeroDivisionError]) |
875 | + worker_monitor._createLibrarianFileAlias = lambda *args: 1/0 |
876 | + def check_finishJob_called(result): |
877 | self.assertEqual( |
878 | - len(list(self.getResultsForOurCodeImport())), 1) |
879 | - return self.worker_monitor.finishJob( |
880 | + [('finishJobID', job_id, 'SUCCESS', '')], |
881 | + worker_monitor.codeimport_endpoint.calls) |
882 | + errors = self.flushLoggedErrors(ZeroDivisionError) |
883 | + self.assertEqual(1, len(errors)) |
884 | + return worker_monitor.finishJob( |
885 | CodeImportResultStatus.SUCCESS).addCallback( |
886 | - check_oops_logged_and_result_created) |
887 | + check_finishJob_called) |
888 | |
889 | - def patchOutFinishJob(self): |
890 | + def patchOutFinishJob(self, worker_monitor): |
891 | calls = [] |
892 | def finishJob(status): |
893 | calls.append(status) |
894 | return defer.succeed(None) |
895 | - self.worker_monitor.finishJob = finishJob |
896 | + worker_monitor.finishJob = finishJob |
897 | return calls |
898 | |
899 | def test_callFinishJobCallsFinishJobSuccess(self): |
900 | # callFinishJob calls finishJob with CodeImportResultStatus.SUCCESS if |
901 | # its argument is not a Failure. |
902 | - calls = self.patchOutFinishJob() |
903 | - self.worker_monitor.callFinishJob(None) |
904 | + worker_monitor = self.makeWorkerMonitorWithJob() |
905 | + calls = self.patchOutFinishJob(worker_monitor) |
906 | + worker_monitor.callFinishJob(None) |
907 | self.assertEqual(calls, [CodeImportResultStatus.SUCCESS]) |
908 | |
909 | + @suppress_stderr |
910 | def test_callFinishJobCallsFinishJobFailure(self): |
911 | # callFinishJob calls finishJob with CodeImportResultStatus.FAILURE |
912 | # and swallows the failure if its argument indicates that the |
913 | # subprocess exited with an exit code of |
914 | # CodeImportWorkerExitCode.FAILURE. |
915 | - calls = self.patchOutFinishJob() |
916 | - ret = self.worker_monitor.callFinishJob( |
917 | + worker_monitor = self.makeWorkerMonitorWithJob() |
918 | + calls = self.patchOutFinishJob(worker_monitor) |
919 | + ret = worker_monitor.callFinishJob( |
920 | makeFailure( |
921 | error.ProcessTerminated, |
922 | exitCode=CodeImportWorkerExitCode.FAILURE)) |
923 | @@ -311,8 +362,9 @@ |
924 | # If the argument to callFinishJob indicates that the subprocess |
925 | # exited with a code of CodeImportWorkerExitCode.SUCCESS_NOCHANGE, it |
926 | # calls finishJob with a status of SUCCESS_NOCHANGE. |
927 | - calls = self.patchOutFinishJob() |
928 | - ret = self.worker_monitor.callFinishJob( |
929 | + worker_monitor = self.makeWorkerMonitorWithJob() |
930 | + calls = self.patchOutFinishJob(worker_monitor) |
931 | + ret = worker_monitor.callFinishJob( |
932 | makeFailure( |
933 | error.ProcessTerminated, |
934 | exitCode=CodeImportWorkerExitCode.SUCCESS_NOCHANGE)) |
935 | @@ -322,12 +374,14 @@ |
936 | # callFinishJob did not swallow the error, this will fail the test. |
937 | return ret |
938 | |
939 | + @suppress_stderr |
940 | def test_callFinishJobCallsFinishJobArbitraryFailure(self): |
941 | # If the argument to callFinishJob indicates that there was some other |
942 | # failure that had nothing to do with the subprocess, it records |
943 | # failure. |
944 | - calls = self.patchOutFinishJob() |
945 | - ret = self.worker_monitor.callFinishJob(makeFailure(RuntimeError)) |
946 | + worker_monitor = self.makeWorkerMonitorWithJob() |
947 | + calls = self.patchOutFinishJob(worker_monitor) |
948 | + ret = worker_monitor.callFinishJob(makeFailure(RuntimeError)) |
949 | self.assertEqual(calls, [CodeImportResultStatus.FAILURE]) |
950 | self.assertOopsesLogged([RuntimeError]) |
951 | # We return the deferred that callFinishJob returns -- if |
952 | @@ -338,8 +392,9 @@ |
953 | # If the argument to callFinishJob indicates that the subprocess |
954 | # exited with a code of CodeImportWorkerExitCode.SUCCESS_PARTIAL, it |
955 | # calls finishJob with a status of SUCCESS_PARTIAL. |
956 | - calls = self.patchOutFinishJob() |
957 | - ret = self.worker_monitor.callFinishJob( |
958 | + worker_monitor = self.makeWorkerMonitorWithJob() |
959 | + calls = self.patchOutFinishJob(worker_monitor) |
960 | + ret = worker_monitor.callFinishJob( |
961 | makeFailure( |
962 | error.ProcessTerminated, |
963 | exitCode=CodeImportWorkerExitCode.SUCCESS_PARTIAL)) |
964 | @@ -349,23 +404,31 @@ |
965 | # callFinishJob did not swallow the error, this will fail the test. |
966 | return ret |
967 | |
968 | + @suppress_stderr |
969 | def test_callFinishJobLogsTracebackOnFailure(self): |
970 | # When callFinishJob is called with a failure, it dumps the traceback |
971 | # of the failure into the log file. |
972 | - ret = self.worker_monitor.callFinishJob(makeFailure(RuntimeError)) |
973 | + worker_monitor = self.makeWorkerMonitorWithJob() |
974 | + ret = worker_monitor.callFinishJob(makeFailure(RuntimeError)) |
975 | def check_log_file(ignored): |
976 | - self.worker_monitor._log_file.seek(0) |
977 | - log_text = self.worker_monitor._log_file.read() |
978 | - self.assertIn('RuntimeError', log_text) |
979 | + failures = self.flushLoggedErrors(RuntimeError) |
980 | + self.assertEqual(1, len(failures)) |
981 | + fail = failures[0] |
982 | + traceback_file = StringIO.StringIO() |
983 | + fail.printTraceback(traceback_file) |
984 | + worker_monitor._log_file.seek(0) |
985 | + log_text = worker_monitor._log_file.read() |
986 | + self.assertIn(traceback_file.read(), log_text) |
987 | return ret.addCallback(check_log_file) |
988 | |
989 | def test_callFinishJobRespects_call_finish_job(self): |
990 | # callFinishJob does not call finishJob if _call_finish_job is False. |
991 | # This is to support exiting without fuss when the job we are working |
992 | # on is deleted in the web UI. |
993 | - calls = self.patchOutFinishJob() |
994 | - self.worker_monitor._call_finish_job = False |
995 | - self.worker_monitor.callFinishJob(None) |
996 | + worker_monitor = self.makeWorkerMonitorWithJob() |
997 | + calls = self.patchOutFinishJob(worker_monitor) |
998 | + worker_monitor._call_finish_job = False |
999 | + worker_monitor.callFinishJob(None) |
1000 | self.assertEqual(calls, []) |
1001 | |
1002 | |
1003 | @@ -376,14 +439,29 @@ |
1004 | # This works around a clash between the TrialTestCase and the BzrTestCase. |
1005 | skip = None |
1006 | |
1007 | + layer = TwistedLayer |
1008 | + |
1009 | class WorkerMonitor(CodeImportWorkerMonitor): |
1010 | """See `CodeImportWorkerMonitor`. |
1011 | |
1012 | Override _launchProcess to return a deferred that we can |
1013 | - callback/errback as we choose. |
1014 | + callback/errback as we choose. Passing ``has_job=False`` to the |
1015 | + constructor will cause getWorkerArguments() to raise ExitQuietly (this |
1016 | + bit is tested above). |
1017 | """ |
1018 | |
1019 | - def _launchProcess(self, source_details): |
1020 | + def __init__(self, process_deferred, has_job=True): |
1021 | + if has_job: |
1022 | + job_data = {1: ([], '', '')} |
1023 | + else: |
1024 | + job_data = {} |
1025 | + CodeImportWorkerMonitor.__init__( |
1026 | + self, 1, QuietFakeLogger(), |
1027 | + FakeCodeImportScheduleEndpointProxy(job_data)) |
1028 | + self.result_status = None |
1029 | + self.process_deferred = process_deferred |
1030 | + |
1031 | + def _launchProcess(self, worker_arguments): |
1032 | return self.process_deferred |
1033 | |
1034 | def finishJob(self, status): |
1035 | @@ -391,60 +469,47 @@ |
1036 | self.result_status = status |
1037 | return defer.succeed(None) |
1038 | |
1039 | - layer = TwistedLaunchpadZopelessLayer |
1040 | - |
1041 | - def setUp(self): |
1042 | - self.factory = LaunchpadObjectFactory() |
1043 | - login('no-priv@canonical.com') |
1044 | - job = self.factory.makeCodeImportJob() |
1045 | - self.code_import_id = job.code_import.id |
1046 | - getUtility(ICodeImportJobWorkflow).startJob( |
1047 | - job, self.factory.makeCodeImportMachine(set_online=True)) |
1048 | - self.job_id = job.id |
1049 | - self.worker_monitor = self.WorkerMonitor(job.id, QuietFakeLogger()) |
1050 | - self.worker_monitor.result_status = None |
1051 | - self.layer.txn.commit() |
1052 | - self.layer.switchDbUser('codeimportworker') |
1053 | - |
1054 | - def tearDown(self): |
1055 | - logout() |
1056 | - |
1057 | - @read_only_transaction |
1058 | - def assertFinishJobCalledWithStatus(self, ignored, status): |
1059 | - """Assert that finishJob was called with the given status.""" |
1060 | - self.assertEqual(self.worker_monitor.result_status, status) |
1061 | + def assertFinishJobCalledWithStatus(self, ignored, worker_monitor, status): |
1062 | + """Assert that finishJob was called with the given status.""" |
1063 | + self.assertEqual(worker_monitor.result_status, status) |
1064 | + |
1065 | + def assertFinishJobNotCalled(self, ignored, worker_monitor): |
1066 | + """Assert that finishJob was called with the given status.""" |
1067 | + self.assertFinishJobCalledWithStatus(ignored, worker_monitor, None) |
1068 | |
1069 | def test_success(self): |
1070 | # In the successful case, finishJob is called with |
1071 | # CodeImportResultStatus.SUCCESS. |
1072 | - self.worker_monitor.process_deferred = defer.succeed(None) |
1073 | - return self.worker_monitor.run().addCallback( |
1074 | - self.assertFinishJobCalledWithStatus, |
1075 | + worker_monitor = self.WorkerMonitor(defer.succeed(None)) |
1076 | + return worker_monitor.run().addCallback( |
1077 | + self.assertFinishJobCalledWithStatus, worker_monitor, |
1078 | CodeImportResultStatus.SUCCESS) |
1079 | |
1080 | def test_failure(self): |
1081 | # If the process deferred is fired with a failure, finishJob is called |
1082 | # with CodeImportResultStatus.FAILURE, but the call to run() still |
1083 | # succeeds. |
1084 | - self.worker_monitor.process_deferred = defer.fail(RuntimeError()) |
1085 | - return self.worker_monitor.run().addCallback( |
1086 | - self.assertFinishJobCalledWithStatus, |
1087 | + worker_monitor = self.WorkerMonitor(defer.fail(RuntimeError())) |
1088 | + return worker_monitor.run().addCallback( |
1089 | + self.assertFinishJobCalledWithStatus, worker_monitor, |
1090 | CodeImportResultStatus.FAILURE) |
1091 | |
1092 | def test_quiet_exit(self): |
1093 | # If the process deferred fails with ExitQuietly, the call to run() |
1094 | - # succeeds. |
1095 | - self.worker_monitor.process_deferred = defer.fail(ExitQuietly()) |
1096 | - return self.worker_monitor.run() |
1097 | + # succeeds, and finishJob is not called at all. |
1098 | + worker_monitor = self.WorkerMonitor( |
1099 | + defer.succeed(None), has_job=False) |
1100 | + return worker_monitor.run().addCallback( |
1101 | + self.assertFinishJobNotCalled, worker_monitor) |
1102 | |
1103 | def test_quiet_exit_from_finishJob(self): |
1104 | # If finishJob fails with ExitQuietly, the call to run() still |
1105 | # succeeds. |
1106 | - self.worker_monitor.process_deferred = defer.succeed(None) |
1107 | + worker_monitor = self.WorkerMonitor(defer.succeed(None)) |
1108 | def finishJob(reason): |
1109 | raise ExitQuietly |
1110 | - self.worker_monitor.finishJob = finishJob |
1111 | - return self.worker_monitor.run() |
1112 | + worker_monitor.finishJob = finishJob |
1113 | + return worker_monitor.run() |
1114 | |
1115 | |
1116 | def nuke_codeimport_sample_data(): |
1117 | @@ -488,7 +553,7 @@ |
1118 | |
1119 | class TestWorkerMonitorIntegration(TrialTestCase, BzrTestCase): |
1120 | |
1121 | - layer = TwistedLaunchpadZopelessLayer |
1122 | + layer = TwistedAppServerLayer |
1123 | |
1124 | # This works around a clash between the TrialTestCase and the BzrTestCase. |
1125 | skip = None |
1126 | @@ -599,7 +664,6 @@ |
1127 | self.assertEqual( |
1128 | self.foreign_commit_count, len(branch.revision_history())) |
1129 | |
1130 | - @read_only_transaction |
1131 | def assertImported(self, ignored, code_import_id): |
1132 | """Assert that the `CodeImport` of the given id was imported.""" |
1133 | # In the in-memory tests, check that resetTimeout on the |
1134 | @@ -617,8 +681,9 @@ |
1135 | |
1136 | This implementation does it in-process. |
1137 | """ |
1138 | - self.layer.switchDbUser('codeimportworker') |
1139 | - monitor = CIWorkerMonitorForTesting(job_id, QuietFakeLogger()) |
1140 | + monitor = CIWorkerMonitorForTesting( |
1141 | + job_id, QuietFakeLogger(), |
1142 | + xmlrpc.Proxy(config.codeimportdispatcher.codeimportscheduler_url)) |
1143 | deferred = monitor.run() |
1144 | def save_protocol_object(result): |
1145 | """Save the process protocol object. |
1146 | @@ -704,7 +769,7 @@ |
1147 | This implementation does it in a child process. |
1148 | """ |
1149 | script_path = os.path.join( |
1150 | - config.root, 'scripts', 'code-import-worker-db.py') |
1151 | + config.root, 'scripts', 'code-import-worker-monitor.py') |
1152 | process_end_deferred = defer.Deferred() |
1153 | # The "childFDs={0:0, 1:1, 2:2}" means that any output from the script |
1154 | # goes to the test runner's console rather than to pipes that noone is |
1155 | |
1156 | === modified file 'lib/lp/codehosting/codeimport/workermonitor.py' |
1157 | --- lib/lp/codehosting/codeimport/workermonitor.py 2010-02-17 04:28:48 +0000 |
1158 | +++ lib/lp/codehosting/codeimport/workermonitor.py 2010-03-14 20:28:32 +0000 |
1159 | @@ -14,26 +14,19 @@ |
1160 | |
1161 | from twisted.internet import defer, error, reactor, task |
1162 | from twisted.python import failure |
1163 | -from twisted.python.util import mergeFunctionMetadata |
1164 | +from twisted.web import xmlrpc |
1165 | |
1166 | from zope.component import getUtility |
1167 | |
1168 | from canonical.config import config |
1169 | -from canonical.database.sqlbase import begin, commit, rollback |
1170 | -from canonical.launchpad.interfaces import ILibraryFileAliasSet |
1171 | -from canonical.launchpad.webapp.interaction import Participation |
1172 | -from canonical.launchpad.webapp import canonical_url |
1173 | -from canonical.twistedsupport import defer_to_thread |
1174 | +from canonical.launchpad.xmlrpc.faults import NoSuchCodeImportJob |
1175 | +from canonical.librarian.interfaces import IFileUploadClient |
1176 | from canonical.twistedsupport.loggingsupport import ( |
1177 | log_oops_from_failure) |
1178 | from canonical.twistedsupport.processmonitor import ( |
1179 | ProcessMonitorProtocolWithTimeout) |
1180 | from lp.code.enums import CodeImportResultStatus |
1181 | -from lp.code.interfaces.codeimportjob import ( |
1182 | - ICodeImportJobSet, ICodeImportJobWorkflow) |
1183 | -from lp.codehosting.codeimport.worker import ( |
1184 | - CodeImportSourceDetails, CodeImportWorkerExitCode) |
1185 | -from lp.testing import login, logout, ANONYMOUS |
1186 | +from lp.codehosting.codeimport.worker import CodeImportWorkerExitCode |
1187 | |
1188 | |
1189 | class CodeImportWorkerMonitorProtocol(ProcessMonitorProtocolWithTimeout): |
1190 | @@ -109,49 +102,6 @@ |
1191 | self._looping_call.stop() |
1192 | |
1193 | |
1194 | -def read_only_transaction(function): |
1195 | - """Wrap 'function' in a transaction and Zope session. |
1196 | - |
1197 | - The transaction is always aborted.""" |
1198 | - def transacted(*args, **kwargs): |
1199 | - begin() |
1200 | - # XXX gary 20-Oct-2008 bug 285808 |
1201 | - # We should reconsider using a ftest helper for production code. For |
1202 | - # now, we explicitly keep the code from using a test request by using |
1203 | - # a basic participation. |
1204 | - login(ANONYMOUS, Participation()) |
1205 | - try: |
1206 | - return function(*args, **kwargs) |
1207 | - finally: |
1208 | - logout() |
1209 | - rollback() |
1210 | - return mergeFunctionMetadata(function, transacted) |
1211 | - |
1212 | - |
1213 | -def writing_transaction(function): |
1214 | - """Wrap 'function' in a transaction and Zope session. |
1215 | - |
1216 | - The transaction is committed if 'function' returns normally and |
1217 | - aborted if it raises an exception.""" |
1218 | - def transacted(*args, **kwargs): |
1219 | - begin() |
1220 | - # XXX gary 20-Oct-2008 bug 285808 |
1221 | - # We should reconsider using a ftest helper for production code. For |
1222 | - # now, we explicitly keep the code from using a test request by using |
1223 | - # a basic participation. |
1224 | - login(ANONYMOUS, Participation()) |
1225 | - try: |
1226 | - ret = function(*args, **kwargs) |
1227 | - except: |
1228 | - logout() |
1229 | - rollback() |
1230 | - raise |
1231 | - logout() |
1232 | - commit() |
1233 | - return ret |
1234 | - return mergeFunctionMetadata(function, transacted) |
1235 | - |
1236 | - |
1237 | class ExitQuietly(Exception): |
1238 | """Raised to indicate that we should abort and exit without fuss. |
1239 | |
1240 | @@ -171,113 +121,105 @@ |
1241 | path_to_script = os.path.join( |
1242 | config.root, 'scripts', 'code-import-worker.py') |
1243 | |
1244 | - def __init__(self, job_id, logger): |
1245 | + def __init__(self, job_id, logger, codeimport_endpoint): |
1246 | """Construct an instance. |
1247 | |
1248 | :param job_id: The ID of the CodeImportJob we are to work on. |
1249 | :param logger: A `Logger` object. |
1250 | """ |
1251 | + self._job_id = job_id |
1252 | self._logger = logger |
1253 | - self._job_id = job_id |
1254 | + self.codeimport_endpoint = codeimport_endpoint |
1255 | self._call_finish_job = True |
1256 | self._log_file = tempfile.TemporaryFile() |
1257 | - self._source_details = None |
1258 | - self._code_import_id = None |
1259 | self._branch_url = None |
1260 | + self._log_file_name = 'no-name-set.txt' |
1261 | |
1262 | def _logOopsFromFailure(self, failure): |
1263 | request = log_oops_from_failure( |
1264 | - failure, code_import_job_id=self._job_id, |
1265 | - code_import_id=self._code_import_id, URL=self._branch_url) |
1266 | + failure, code_import_job_id=self._job_id, URL=self._branch_url) |
1267 | self._logger.info( |
1268 | "Logged OOPS id %s: %s: %s", |
1269 | request.oopsid, failure.type.__name__, failure.value) |
1270 | |
1271 | - def getJob(self): |
1272 | - """Fetch the `CodeImportJob` object we are working on from the DB. |
1273 | - |
1274 | - Only call this from defer_to_thread-ed methods! |
1275 | - |
1276 | - :raises ExitQuietly: if the job is not found. |
1277 | - """ |
1278 | - job = getUtility(ICodeImportJobSet).getById(self._job_id) |
1279 | - if job is None: |
1280 | - self._logger.info( |
1281 | - "Job %d not found, exiting quietly.", self._job_id) |
1282 | + def _trap_nosuchcodeimportjob(self, failure): |
1283 | + failure.trap(xmlrpc.Fault) |
1284 | + if failure.value.faultCode == NoSuchCodeImportJob.error_code: |
1285 | self._call_finish_job = False |
1286 | raise ExitQuietly |
1287 | else: |
1288 | - return job |
1289 | - |
1290 | - @defer_to_thread |
1291 | - @read_only_transaction |
1292 | - def getSourceDetails(self): |
1293 | - """Get a `CodeImportSourceDetails` for the job we are working on.""" |
1294 | - code_import = self.getJob().code_import |
1295 | - source_details = CodeImportSourceDetails.fromCodeImport(code_import) |
1296 | - self._logger.info( |
1297 | - 'Found source details: %s', source_details.asArguments()) |
1298 | - self._branch_url = canonical_url(code_import.branch) |
1299 | - self._code_import_id = code_import.id |
1300 | - return source_details |
1301 | - |
1302 | - @defer_to_thread |
1303 | - @writing_transaction |
1304 | + raise failure.value |
1305 | + |
1306 | + def getWorkerArguments(self): |
1307 | + """Get arguments for the worker for the import we are working on. |
1308 | + |
1309 | + This also sets the _branch_url and _log_file_name attributes for use |
1310 | + in the _logOopsFromFailure and finishJob methods respectively. |
1311 | + """ |
1312 | + deferred = self.codeimport_endpoint.callRemote( |
1313 | + 'getImportDataForJobID', self._job_id) |
1314 | + def _processResult(result): |
1315 | + code_import_arguments, branch_url, log_file_name = result |
1316 | + self._branch_url = branch_url |
1317 | + self._log_file_name = log_file_name |
1318 | + self._logger.info( |
1319 | + 'Found source details: %s', code_import_arguments) |
1320 | + return code_import_arguments |
1321 | + return deferred.addCallbacks(_processResult, self._trap_nosuchcodeimportjob) |
1322 | + |
1323 | def updateHeartbeat(self, tail): |
1324 | """Call the updateHeartbeat method for the job we are working on.""" |
1325 | self._logger.debug("Updating heartbeat.") |
1326 | - getUtility(ICodeImportJobWorkflow).updateHeartbeat( |
1327 | - self.getJob(), tail) |
1328 | + deferred = self.codeimport_endpoint.callRemote( |
1329 | + 'updateHeartbeat', self._job_id, tail) |
1330 | + return deferred.addErrback(self._trap_nosuchcodeimportjob) |
1331 | |
1332 | def _createLibrarianFileAlias(self, name, size, file, contentType): |
1333 | - """Call `ILibraryFileAliasSet.create` with the given parameters. |
1334 | + """Call `IFileUploadClient.remoteAddFile` with the given parameters. |
1335 | |
1336 | - This is a separate method that exists only to be patched in |
1337 | - tests. |
1338 | + This is a separate method that exists only to be patched in tests. |
1339 | """ |
1340 | - return getUtility(ILibraryFileAliasSet).create( |
1341 | + # This blocks, but never mind: nothing else is going on in the process |
1342 | + # by this point. We could dispatch to a thread if we felt like it, or |
1343 | + # even come up with an asynchronous implementation of the librarian |
1344 | + # protocol (it's not very complicated). |
1345 | + return getUtility(IFileUploadClient).remoteAddFile( |
1346 | name, size, file, contentType) |
1347 | |
1348 | - @defer_to_thread |
1349 | - @writing_transaction |
1350 | def finishJob(self, status): |
1351 | - """Call the finishJob method for the job we are working on. |
1352 | + """Call the finishJobID method for the job we are working on. |
1353 | |
1354 | This method uploads the log file to the librarian first. |
1355 | """ |
1356 | - job = self.getJob() |
1357 | log_file_size = self._log_file.tell() |
1358 | if log_file_size > 0: |
1359 | self._log_file.seek(0) |
1360 | - branch = job.code_import.branch |
1361 | - log_file_name = '%s-%s-%s-log.txt' % ( |
1362 | - branch.owner.name, branch.product.name, branch.name) |
1363 | try: |
1364 | - log_file_alias = self._createLibrarianFileAlias( |
1365 | - log_file_name, log_file_size, self._log_file, |
1366 | + log_file_alias_url = self._createLibrarianFileAlias( |
1367 | + self._log_file_name, log_file_size, self._log_file, |
1368 | 'text/plain') |
1369 | self._logger.info( |
1370 | - "Uploaded logs to librarian %s.", log_file_alias.getURL()) |
1371 | + "Uploaded logs to librarian %s.", log_file_alias_url) |
1372 | except: |
1373 | self._logger.error("Upload to librarian failed.") |
1374 | self._logOopsFromFailure(failure.Failure()) |
1375 | - log_file_alias = None |
1376 | + log_file_alias_url = '' |
1377 | else: |
1378 | - log_file_alias = None |
1379 | - getUtility(ICodeImportJobWorkflow).finishJob( |
1380 | - job, status, log_file_alias) |
1381 | + log_file_alias_url = '' |
1382 | + return self.codeimport_endpoint.callRemote( |
1383 | + 'finishJobID', self._job_id, status.name, log_file_alias_url |
1384 | + ).addErrback(self._trap_nosuchcodeimportjob) |
1385 | |
1386 | def _makeProcessProtocol(self, deferred): |
1387 | """Make an `CodeImportWorkerMonitorProtocol` for a subprocess.""" |
1388 | return CodeImportWorkerMonitorProtocol(deferred, self, self._log_file) |
1389 | |
1390 | - def _launchProcess(self, source_details): |
1391 | + def _launchProcess(self, worker_arguments): |
1392 | """Launch the code-import-worker.py child process.""" |
1393 | deferred = defer.Deferred() |
1394 | protocol = self._makeProcessProtocol(deferred) |
1395 | interpreter = '%s/bin/py' % config.root |
1396 | - command = [interpreter, self.path_to_script] |
1397 | - command.extend(source_details.asArguments()) |
1398 | + command = [interpreter, self.path_to_script] + worker_arguments |
1399 | self._logger.info( |
1400 | "Launching worker child process %s.", command) |
1401 | reactor.spawnProcess( |
1402 | @@ -286,7 +228,7 @@ |
1403 | |
1404 | def run(self): |
1405 | """Perform the import.""" |
1406 | - return self.getSourceDetails().addCallback( |
1407 | + return self.getWorkerArguments().addCallback( |
1408 | self._launchProcess).addBoth( |
1409 | self.callFinishJob).addErrback( |
1410 | self._silenceQuietExit) |
1411 | @@ -297,6 +239,11 @@ |
1412 | return None |
1413 | |
1414 | def _reasonToStatus(self, reason): |
1415 | + """Translate the 'reason' for process exit into a result status. |
1416 | + |
1417 | + Different exit codes are presumed by Twisted to be errors, but are |
1418 | + different kinds of success for us. |
1419 | + """ |
1420 | if isinstance(reason, failure.Failure): |
1421 | if reason.check(error.ProcessTerminated): |
1422 | if reason.value.exitCode == \ |
1423 | |
1424 | === modified file 'lib/lp/codehosting/inmemory.py' |
1425 | --- lib/lp/codehosting/inmemory.py 2010-02-24 04:24:01 +0000 |
1426 | +++ lib/lp/codehosting/inmemory.py 2010-03-14 20:28:32 +0000 |
1427 | @@ -62,12 +62,22 @@ |
1428 | expected_value = kwargs[attribute] |
1429 | branch = self._object_set.get(branch_id) |
1430 | if branch is None: |
1431 | - return None |
1432 | + return FakeResult(None) |
1433 | if expected_value is getattr(branch, attribute): |
1434 | - return branch |
1435 | + return FakeResult(branch) |
1436 | return None |
1437 | |
1438 | |
1439 | +class FakeResult: |
1440 | + """As with FakeStore, just enough of a result to pass tests.""" |
1441 | + |
1442 | + def __init__(self, branch): |
1443 | + self._branch = branch |
1444 | + |
1445 | + def one(self): |
1446 | + return self._branch |
1447 | + |
1448 | + |
1449 | class FakeDatabaseObject: |
1450 | """Base class for fake database objects.""" |
1451 | |
1452 | |
1453 | === modified file 'lib/lp/testing/__init__.py' |
1454 | --- lib/lp/testing/__init__.py 2010-02-17 20:54:36 +0000 |
1455 | +++ lib/lp/testing/__init__.py 2010-03-14 20:28:32 +0000 |
1456 | @@ -294,7 +294,7 @@ |
1457 | sql_class = type(sql_object) |
1458 | store = Store.of(sql_object) |
1459 | found_object = store.find( |
1460 | - sql_class, **({'id': sql_object.id, attribute_name: date})) |
1461 | + sql_class, **({'id': sql_object.id, attribute_name: date})).one() |
1462 | if found_object is None: |
1463 | self.fail( |
1464 | "Expected %s to be %s, but it was %s." |
1465 | |
1466 | === renamed file 'scripts/code-import-worker-db.py' => 'scripts/code-import-worker-monitor.py' |
1467 | --- scripts/code-import-worker-db.py 2009-10-13 14:38:07 +0000 |
1468 | +++ scripts/code-import-worker-monitor.py 2010-03-14 20:28:32 +0000 |
1469 | @@ -22,11 +22,14 @@ |
1470 | |
1471 | from twisted.internet import defer, reactor |
1472 | from twisted.python import log |
1473 | +from twisted.web import xmlrpc |
1474 | + |
1475 | +from canonical.config import config |
1476 | +from canonical.twistedsupport.loggingsupport import set_up_oops_reporting |
1477 | |
1478 | from lp.codehosting.codeimport.workermonitor import ( |
1479 | CodeImportWorkerMonitor) |
1480 | from lp.services.scripts.base import LaunchpadScript |
1481 | -from canonical.twistedsupport.loggingsupport import set_up_oops_reporting |
1482 | |
1483 | |
1484 | class CodeImportWorker(LaunchpadScript): |
1485 | @@ -35,22 +38,30 @@ |
1486 | LaunchpadScript.__init__(self, name, dbuser, test_args) |
1487 | set_up_oops_reporting(name, mangle_stdout=True) |
1488 | |
1489 | + def _init_db(self, implicit_begin, isolation): |
1490 | + # This script doesn't access the database. |
1491 | + pass |
1492 | + |
1493 | def main(self): |
1494 | + arg, = self.args |
1495 | + job_id = int(arg) |
1496 | # XXX: MichaelHudson 2008-05-07 bug=227586: Setting up the component |
1497 | # architecture overrides $GNUPGHOME to something stupid. |
1498 | os.environ['GNUPGHOME'] = '' |
1499 | - reactor.callWhenRunning(self._run_reactor) |
1500 | + reactor.callWhenRunning(self._do_import, job_id) |
1501 | reactor.run() |
1502 | |
1503 | - def _run_reactor(self): |
1504 | - defer.maybeDeferred(self._main).addErrback( |
1505 | + def _do_import(self, job_id): |
1506 | + defer.maybeDeferred(self._main, job_id).addErrback( |
1507 | log.err).addCallback( |
1508 | lambda ignored: reactor.stop()) |
1509 | |
1510 | - def _main(self): |
1511 | - arg, = self.args |
1512 | - return CodeImportWorkerMonitor(int(arg), self.logger).run() |
1513 | + def _main(self, job_id): |
1514 | + worker = CodeImportWorkerMonitor( |
1515 | + job_id, self.logger, |
1516 | + xmlrpc.Proxy(config.codeimportdispatcher.codeimportscheduler_url)) |
1517 | + return worker.run() |
1518 | |
1519 | if __name__ == '__main__': |
1520 | - script = CodeImportWorker('codeimportworker', dbuser='codeimportworker') |
1521 | + script = CodeImportWorker('codeimportworker') |
1522 | script.run() |
1523 | |
1524 | === modified file 'scripts/code-import-worker.py' |
1525 | --- scripts/code-import-worker.py 2010-02-04 01:12:05 +0000 |
1526 | +++ scripts/code-import-worker.py 2010-03-14 20:28:32 +0000 |
1527 | @@ -8,7 +8,7 @@ |
1528 | By 'processing a code import' we mean importing or updating code from a |
1529 | remote, non-Bazaar, repository. |
1530 | |
1531 | -This script is usually run by the code-import-worker-db.py script that |
1532 | +This script is usually run by the code-import-worker-monitor.py script that |
1533 | communicates progress and results to the database. |
1534 | """ |
1535 |
There is a lot here, but I think it is all pretty good. Very extensive testing, I bet it wasn't a huge amount of fun to write :)