Merge lp:~mwhudson/launchpad/no-codeimportworker-db-access into lp:launchpad

Proposed by Michael Hudson-Doyle
Status: Merged
Approved by: Tim Penhey
Approved revision: no longer in the source branch.
Merged at revision: not available
Proposed branch: lp:~mwhudson/launchpad/no-codeimportworker-db-access
Merge into: lp:launchpad
Diff against target: 1534 lines (+585/-360)
15 files modified
database/schema/security.cfg (+0/-22)
lib/canonical/config/schema-lazr.conf (+2/-2)
lib/canonical/launchpad/interfaces/librarian.py (+1/-1)
lib/canonical/launchpad/xmlrpc/faults.py (+10/-0)
lib/lp/code/interfaces/codeimportscheduler.py (+26/-0)
lib/lp/code/model/tests/test_codeimportjob.py (+4/-27)
lib/lp/code/xmlrpc/codeimportscheduler.py (+66/-5)
lib/lp/code/xmlrpc/tests/test_codeimportscheduler.py (+140/-0)
lib/lp/codehosting/codeimport/dispatcher.py (+1/-1)
lib/lp/codehosting/codeimport/tests/test_workermonitor.py (+244/-179)
lib/lp/codehosting/codeimport/workermonitor.py (+58/-111)
lib/lp/codehosting/inmemory.py (+12/-2)
lib/lp/testing/__init__.py (+1/-1)
scripts/code-import-worker-monitor.py (+19/-8)
scripts/code-import-worker.py (+1/-1)
To merge this branch: bzr merge lp:~mwhudson/launchpad/no-codeimportworker-db-access
Reviewer Review Type Date Requested Status
Tim Penhey (community) Approve
Review via email: mp+21201@code.launchpad.net

Commit message

Have the code import worker monitor talk to the internal xml-rpc server rather than directly to the database.

Description of the change

Hi there,

This branch removes the need for the code import worker monitor script to talk directly to the database, and makes it talk to the internal xml-rpc server instead.

In some sense this isn't very exciting, although the diff is rather large. Much of the change is in the worker monitor tests -- all the rest is fairly straightforward, I think.

Cheers,
mwh

To post a comment you must log in.
Revision history for this message
Tim Penhey (thumper) wrote :

There is a lot here, but I think it is all pretty good. Very extensive testing -- I bet it wasn't a huge amount of fun to write :)

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'database/schema/security.cfg'
2--- database/schema/security.cfg 2010-03-10 17:55:59 +0000
3+++ database/schema/security.cfg 2010-03-14 20:28:32 +0000
4@@ -586,28 +586,6 @@
5 public.validpersonorteamcache = SELECT
6 public.wikiname = SELECT, INSERT
7
8-[codeimportworker]
9-type=user
10-public.branch = SELECT, UPDATE
11-public.branchsubscription = SELECT
12-public.codeimport = SELECT, UPDATE
13-public.codeimportevent = SELECT, INSERT, UPDATE
14-public.codeimporteventdata = SELECT, INSERT
15-public.codeimportmachine = SELECT
16-public.codeimportresult = SELECT, INSERT, UPDATE
17-public.codeimportjob = SELECT, INSERT, UPDATE, DELETE
18-public.distribution = SELECT
19-public.distroseries = SELECT
20-public.emailaddress = SELECT
21-public.libraryfilealias = SELECT, INSERT, UPDATE
22-public.libraryfilecontent = SELECT, INSERT
23-public.person = SELECT
24-public.product = SELECT
25-public.productseries = SELECT
26-public.productseriescodeimport = SELECT
27-public.sourcepackagename = SELECT
28-public.teammembership = SELECT
29-
30 [branchscanner]
31 type=user
32 groups=write, script
33
34=== modified file 'lib/canonical/config/schema-lazr.conf'
35--- lib/canonical/config/schema-lazr.conf 2010-02-24 10:18:16 +0000
36+++ lib/canonical/config/schema-lazr.conf 2010-03-14 20:28:32 +0000
37@@ -472,8 +472,8 @@
38
39
40 [codeimportworker]
41-# This code is used by the code-import-worker-db which lives in
42-# scripts/code-import-worker-db.py and
43+# This code is used by the code-import-worker-monitor which lives in
44+# scripts/code-import-worker-monitor.py and
45 # lib/lp/codehosting/codeimport/worker_monitor.py.
46
47 # The interval in seconds the worker should wait between updates to the
48
49=== modified file 'lib/canonical/launchpad/interfaces/librarian.py'
50--- lib/canonical/launchpad/interfaces/librarian.py 2009-11-24 15:36:44 +0000
51+++ lib/canonical/launchpad/interfaces/librarian.py 2010-03-14 20:28:32 +0000
52@@ -150,7 +150,7 @@
53 from the Librarian at this time. See LibrarianGarbageCollection.
54
55 If restricted is True, the file will be created through the
56- IRestricteLibrarianClient utility.
57+ IRestrictedLibrarianClient utility.
58 """
59
60 def __getitem__(key):
61
62=== modified file 'lib/canonical/launchpad/xmlrpc/faults.py'
63--- lib/canonical/launchpad/xmlrpc/faults.py 2010-02-21 22:13:34 +0000
64+++ lib/canonical/launchpad/xmlrpc/faults.py 2010-03-14 20:28:32 +0000
65@@ -27,6 +27,7 @@
66 'NoLinkedBranch',
67 'NoSuchBranch',
68 'NoSuchBug',
69+ 'NoSuchCodeImportJob',
70 'NoSuchDistribution',
71 'NoSuchPackage',
72 'NoSuchPerson',
73@@ -474,3 +475,12 @@
74 self.sourcepackagename = sourcepackagename
75 LaunchpadFault.__init__(self, sourcepackagename=sourcepackagename)
76
77+
78+class NoSuchCodeImportJob(LaunchpadFault):
79+ """Raised by `ICodeImportScheduler` methods when a job is not found."""
80+
81+ error_code = 360
82+ msg_template = 'Job %(job_id)d not found.'
83+
84+ def __init__(self, job_id):
85+ LaunchpadFault.__init__(self, job_id=job_id)
86
87=== modified file 'lib/lp/code/interfaces/codeimportscheduler.py'
88--- lib/lp/code/interfaces/codeimportscheduler.py 2010-02-22 08:10:03 +0000
89+++ lib/lp/code/interfaces/codeimportscheduler.py 2010-03-14 20:28:32 +0000
90@@ -16,6 +16,7 @@
91
92 from zope.interface import Interface
93
94+
95 class ICodeImportSchedulerApplication(ILaunchpadApplication):
96 """Code import scheduler application root."""
97
98@@ -35,3 +36,28 @@
99 mark it as having started on said machine and return its id,
100 or 0 if there are no jobs pending.
101 """
102+
103+ def getImportDataForJobID(job_id):
104+ """Get data about the import with job id `job_id`.
105+
106+ :return: ``(worker_arguments, branch_url, log_file_name)`` where:
107+ * ``worker_arguments`` are the arguments to pass to the code
108+ import worker subprocess.
109+ * ``branch_url`` is the URL of the import branch (only used to put
110+ in OOPS reports)
111+ * ``log_file_name`` is the name of the log file to create in the
112+ librarian.
113+ :raise NoSuchCodeImportJob: if no job with id `job_id` exists.
114+ """
115+
116+ def updateHeartbeat(job_id, log_tail):
117+ """Call `ICodeImportJobWorkflow.updateHeartbeat` for job `job_id`.
118+
119+ :raise NoSuchCodeImportJob: if no job with id `job_id` exists.
120+ """
121+
122+ def finishJobID(job_id, status_name, log_file_alias_url):
123+ """Call `ICodeImportJobWorkflow.finishJob` for job `job_id`.
124+
125+ :raise NoSuchCodeImportJob: if no job with id `job_id` exists.
126+ """
127
128=== modified file 'lib/lp/code/model/tests/test_codeimportjob.py'
129--- lib/lp/code/model/tests/test_codeimportjob.py 2010-02-24 13:17:01 +0000
130+++ lib/lp/code/model/tests/test_codeimportjob.py 2010-03-14 20:28:32 +0000
131@@ -14,6 +14,8 @@
132
133 from sqlobject.sqlbuilder import SQLConstant
134
135+import transaction
136+
137 from twisted.python.util import mergeFunctionMetadata
138
139 from zope.component import getUtility
140@@ -702,24 +704,13 @@
141 AssertFailureMixin, AssertEventMixin):
142 """Unit tests for CodeImportJobWorkflow.finishJob."""
143
144- layer = LaunchpadZopelessLayer
145+ layer = LaunchpadFunctionalLayer
146
147 def setUp(self):
148 super(TestCodeImportJobWorkflowFinishJob, self).setUp()
149 login_for_code_imports()
150 self.machine = self.factory.makeCodeImportMachine()
151 self.machine.setOnline()
152- self.switchDbUser_called = False
153-
154- def tearDown(self):
155- super(TestCodeImportJobWorkflowFinishJob, self).tearDown()
156- self.assertTrue(
157- self.switchDbUser_called, "switchDbUser() not called!")
158-
159- def switchDbUser(self):
160- self.layer.txn.commit()
161- self.layer.switchDbUser('codeimportworker')
162- self.switchDbUser_called = True
163
164 def makeRunningJob(self, code_import=None):
165 """Make and return a CodeImportJob object with state==RUNNING.
166@@ -740,7 +731,6 @@
167 # Calling finishJob with a job whose state is not RUNNING is an error.
168 code_import = self.factory.makeCodeImport()
169 job = self.factory.makeCodeImportJob(code_import)
170- self.switchDbUser()
171 self.assertFailure(
172 "The CodeImportJob associated with %s is "
173 "PENDING." % code_import.branch.unique_name,
174@@ -754,7 +744,6 @@
175 # finishJob() deletes the job it is passed.
176 running_job = self.makeRunningJob()
177 running_job_id = running_job.id
178- self.switchDbUser()
179 getUtility(ICodeImportJobWorkflow).finishJob(
180 running_job, CodeImportResultStatus.SUCCESS, None)
181 self.assertEqual(
182@@ -765,7 +754,6 @@
183 # scheduled appropriately far in the future.
184 running_job = self.makeRunningJob()
185 code_import = running_job.code_import
186- self.switchDbUser()
187 getUtility(ICodeImportJobWorkflow).finishJob(
188 running_job, CodeImportResultStatus.SUCCESS, None)
189 new_job = code_import.import_job
190@@ -782,7 +770,6 @@
191 # now.
192 running_job = self.makeRunningJob()
193 code_import = running_job.code_import
194- self.switchDbUser()
195 getUtility(ICodeImportJobWorkflow).finishJob(
196 running_job, CodeImportResultStatus.SUCCESS_PARTIAL, None)
197 new_job = code_import.import_job
198@@ -800,7 +787,6 @@
199 'david.allouche@canonical.com')
200 code_import.updateFromData(
201 {'review_status': CodeImportReviewStatus.SUSPENDED}, ddaa)
202- self.switchDbUser()
203 getUtility(ICodeImportJobWorkflow).finishJob(
204 running_job, CodeImportResultStatus.SUCCESS, None)
205 self.assertTrue(code_import.import_job is None)
206@@ -812,7 +798,6 @@
207 # Before calling finishJob() there are no CodeImportResults for the
208 # given import...
209 self.assertEqual(len(list(code_import.results)), 0)
210- self.switchDbUser()
211 getUtility(ICodeImportJobWorkflow).finishJob(
212 running_job, CodeImportResultStatus.SUCCESS, None)
213 # ... and after, there is exactly one.
214@@ -841,13 +826,11 @@
215 # methods -- e.g. calling requestJob to set requesting_user -- but
216 # using removeSecurityProxy and forcing here is expedient.
217 setattr(removeSecurityProxy(job), from_field, value)
218- self.switchDbUser()
219 result = self.getResultForJob(job)
220 self.assertEqual(
221 value, getattr(result, to_field),
222 "Value %r in job field %r was not passed through to result field"
223 " %r." % (value, from_field, to_field))
224- self.layer.switchDbUser('launchpad')
225
226 def test_resultObjectFields(self):
227 # The CodeImportResult object that finishJob creates contains all the
228@@ -898,7 +881,6 @@
229 status_jobs = []
230 for status in CodeImportResultStatus.items:
231 status_jobs.append((status, self.makeRunningJob()))
232- self.switchDbUser()
233 for status, job in status_jobs:
234 result = self.getResultForJob(job, status)
235 self.assertEqual(result.status, status)
236@@ -909,12 +891,11 @@
237
238 job = self.makeRunningJob()
239
240- self.switchDbUser()
241 log_data = 'several\nlines\nof\nlog data'
242 log_alias_id = getUtility(ILibrarianClient).addFile(
243 'import_log.txt', len(log_data),
244 StringIO.StringIO(log_data), 'text/plain')
245- self.layer.txn.commit()
246+ transaction.commit()
247 log_alias = getUtility(ILibraryFileAliasSet)[log_alias_id]
248 result = self.getResultForJob(job, log_alias=log_alias)
249
250@@ -927,7 +908,6 @@
251 code_import = running_job.code_import
252 machine = running_job.machine
253 new_events = NewEvents()
254- self.switchDbUser()
255 getUtility(ICodeImportJobWorkflow).finishJob(
256 running_job, CodeImportResultStatus.SUCCESS, None)
257 [finish_event] = list(new_events)
258@@ -941,7 +921,6 @@
259 status_jobs = []
260 for status in CodeImportResultStatus.items:
261 status_jobs.append((status, self.makeRunningJob()))
262- self.switchDbUser()
263 for status, job in status_jobs:
264 code_import = job.code_import
265 self.assertTrue(code_import.date_last_successful is None)
266@@ -958,7 +937,6 @@
267 status_jobs = []
268 for status in CodeImportResultStatus.items:
269 status_jobs.append((status, self.makeRunningJob()))
270- self.switchDbUser()
271 for status, job in status_jobs:
272 code_import = job.code_import
273 self.assertTrue(code_import.date_last_successful is None)
274@@ -984,7 +962,6 @@
275 self.assertEqual(
276 CodeImportReviewStatus.REVIEWED, code_import.review_status)
277 running_job = self.makeRunningJob(code_import)
278- self.switchDbUser()
279 getUtility(ICodeImportJobWorkflow).finishJob(
280 running_job, CodeImportResultStatus.FAILURE, None)
281 self.assertEqual(
282
283=== modified file 'lib/lp/code/xmlrpc/codeimportscheduler.py'
284--- lib/lp/code/xmlrpc/codeimportscheduler.py 2010-02-19 03:15:04 +0000
285+++ lib/lp/code/xmlrpc/codeimportscheduler.py 2010-03-14 20:28:32 +0000
286@@ -8,13 +8,20 @@
287 'CodeImportSchedulerAPI',
288 ]
289
290-
291-from lp.code.interfaces.codeimportjob import ICodeImportJobSet
292-from lp.code.interfaces.codeimportscheduler import ICodeImportScheduler
293-from canonical.launchpad.webapp import LaunchpadXMLRPCView
294-
295 from zope.component import getUtility
296 from zope.interface import implements
297+from zope.security.proxy import removeSecurityProxy
298+
299+from canonical.launchpad.interfaces import ILibraryFileAliasSet
300+from canonical.launchpad.webapp import canonical_url, LaunchpadXMLRPCView
301+from canonical.launchpad.xmlrpc.faults import NoSuchCodeImportJob
302+from canonical.launchpad.xmlrpc.helpers import return_fault
303+
304+from lp.code.enums import CodeImportResultStatus
305+from lp.code.interfaces.codeimportjob import (
306+ ICodeImportJobSet, ICodeImportJobWorkflow)
307+from lp.code.interfaces.codeimportscheduler import ICodeImportScheduler
308+from lp.codehosting.codeimport.worker import CodeImportSourceDetails
309
310
311 class CodeImportSchedulerAPI(LaunchpadXMLRPCView):
312@@ -30,3 +37,57 @@
313 return job.id
314 else:
315 return 0
316+
317+ def _getJob(self, job_id):
318+ job_set = removeSecurityProxy(getUtility(ICodeImportJobSet))
319+ job = removeSecurityProxy(job_set.getById(job_id))
320+ if job is None:
321+ raise NoSuchCodeImportJob(job_id)
322+ return job
323+
324+ # Because you can't use a decorated function as the implementation of a
325+ # method exported over XML-RPC, the implementations just thunk to an
326+ # implementation wrapped with @return_fault.
327+
328+ def getImportDataForJobID(self, job_id):
329+ """See `ICodeImportScheduler`."""
330+ return self._getImportDataForJobID(job_id)
331+
332+ def updateHeartbeat(self, job_id, log_tail):
333+ """See `ICodeImportScheduler`."""
334+ return self._updateHeartbeat(job_id, log_tail)
335+
336+ def finishJobID(self, job_id, status_name, log_file_alias_url):
337+ """See `ICodeImportScheduler`."""
338+ return self._finishJobID(job_id, status_name, log_file_alias_url)
339+
340+ @return_fault
341+ def _getImportDataForJobID(self, job_id):
342+ job = self._getJob(job_id)
343+ arguments = CodeImportSourceDetails.fromCodeImport(
344+ job.code_import).asArguments()
345+ branch = job.code_import.branch
346+ branch_url = canonical_url(branch)
347+ log_file_name = '%s.log' % branch.unique_name[1:].replace('/', '-')
348+ return (arguments, branch_url, log_file_name)
349+
350+ @return_fault
351+ def _updateHeartbeat(self, job_id, log_tail):
352+ job = self._getJob(job_id)
353+ workflow = removeSecurityProxy(getUtility(ICodeImportJobWorkflow))
354+ workflow.updateHeartbeat(job, log_tail)
355+ return 0
356+
357+ @return_fault
358+ def _finishJobID(self, job_id, status_name, log_file_alias_url):
359+ job = self._getJob(job_id)
360+ status = CodeImportResultStatus.items[status_name]
361+ workflow = removeSecurityProxy(getUtility(ICodeImportJobWorkflow))
362+ if log_file_alias_url:
363+ library_file_alias_set = getUtility(ILibraryFileAliasSet)
364+ # XXX This is so so so terrible:
365+ log_file_alias_id = int(log_file_alias_url.split('/')[-2])
366+ log_file_alias = library_file_alias_set[log_file_alias_id]
367+ else:
368+ log_file_alias = None
369+ workflow.finishJob(job, status, log_file_alias)
370
371=== added file 'lib/lp/code/xmlrpc/tests/test_codeimportscheduler.py'
372--- lib/lp/code/xmlrpc/tests/test_codeimportscheduler.py 1970-01-01 00:00:00 +0000
373+++ lib/lp/code/xmlrpc/tests/test_codeimportscheduler.py 2010-03-14 20:28:32 +0000
374@@ -0,0 +1,140 @@
375+# Copyright 2010 Canonical Ltd. This software is licensed under the
376+# GNU Affero General Public License version 3 (see the file LICENSE).
377+
378+"""Test for the methods of `ICodeImportScheduler`."""
379+
380+__metaclass__ = type
381+
382+import unittest
383+import xmlrpclib
384+
385+from zope.component import getUtility
386+from zope.security.proxy import removeSecurityProxy
387+
388+from canonical.database.constants import UTC_NOW
389+from canonical.launchpad.interfaces import ILaunchpadCelebrities
390+from canonical.launchpad.webapp import canonical_url
391+from canonical.launchpad.xmlrpc.faults import NoSuchCodeImportJob
392+from canonical.launchpad.testing.codeimporthelpers import make_running_import
393+from canonical.testing.layers import LaunchpadFunctionalLayer
394+
395+from lp.code.enums import CodeImportResultStatus
396+from lp.code.model.codeimportjob import CodeImportJob
397+from lp.code.xmlrpc.codeimportscheduler import CodeImportSchedulerAPI
398+from lp.codehosting.codeimport.worker import CodeImportSourceDetails
399+from lp.testing import run_with_login, TestCaseWithFactory
400+
401+
402+class TestCodeImportSchedulerAPI(TestCaseWithFactory):
403+
404+ layer = LaunchpadFunctionalLayer
405+
406+ def setUp(self):
407+ TestCaseWithFactory.setUp(self)
408+ self.api = CodeImportSchedulerAPI(None, None)
409+ self.machine = self.factory.makeCodeImportMachine(set_online=True)
410+ for job in CodeImportJob.select():
411+ job.destroySelf()
412+
413+ def makeCodeImportJob(self, running):
414+ person = getUtility(ILaunchpadCelebrities).vcs_imports.teamowner
415+ if running:
416+ return removeSecurityProxy(run_with_login(person, make_running_import)).import_job
417+ else:
418+ return run_with_login(person, self.factory.makeCodeImportJob)
419+
420+ def test_getJobForMachine_no_job_waiting(self):
421+ # If no job is waiting getJobForMachine returns 0.
422+ job_id = self.api.getJobForMachine(self.machine.hostname, 10)
423+ self.assertEqual(0, job_id)
424+
425+ def test_getJobForMachine_job_waiting(self):
426+ # If a job is waiting getJobForMachine returns its id.
427+ code_import_job = self.makeCodeImportJob(running=False)
428+ job_id = self.api.getJobForMachine(self.machine.hostname, 10)
429+ self.assertEqual(code_import_job.id, job_id)
430+
431+ def test_getImportDataForJobID(self):
432+ # getImportDataForJobID returns the worker arguments, branch url and
433+ # log file name for an import corresponding to a particular job.
434+ code_import_job = self.makeCodeImportJob(running=True)
435+ code_import = removeSecurityProxy(code_import_job).code_import
436+ code_import_arguments, branch_url, log_file_name = \
437+ self.api.getImportDataForJobID(code_import_job.id)
438+ import_as_arguments = CodeImportSourceDetails.fromCodeImport(
439+ code_import).asArguments()
440+ expected_log_file_name = '%s.log' % (
441+ code_import.branch.unique_name[1:].replace('/', '-'))
442+ self.assertEqual(
443+ (import_as_arguments, canonical_url(code_import.branch),
444+ expected_log_file_name),
445+ (code_import_arguments, branch_url, log_file_name))
446+
447+ def test_getImportDataForJobID_not_found(self):
448+ # getImportDataForJobID returns a NoSuchCodeImportJob fault when there
449+ # is no code import job with the given ID.
450+ fault = self.api.getImportDataForJobID(-1)
451+ self.assertTrue(
452+ isinstance(fault, xmlrpclib.Fault),
453+ "getImportDataForJobID(-1) returned %r, not a Fault."
454+ % (fault,))
455+ self.assertEqual(NoSuchCodeImportJob, fault.__class__)
456+
457+ def test_updateHeartbeat(self):
458+ # updateHeartbeat calls the updateHeartbeat job workflow method.
459+ code_import_job = self.makeCodeImportJob(running=True)
460+ log_tail = self.factory.getUniqueString()
461+ self.api.updateHeartbeat(code_import_job.id, log_tail)
462+ self.assertSqlAttributeEqualsDate(
463+ code_import_job, 'heartbeat', UTC_NOW)
464+ self.assertEqual(log_tail, code_import_job.logtail)
465+
466+ def test_updateHeartbeat_not_found(self):
467+ # updateHeartbeat returns a NoSuchCodeImportJob fault when there is no
468+ # code import job with the given ID.
469+ fault = self.api.updateHeartbeat(-1, '')
470+ self.assertTrue(
471+ isinstance(fault, xmlrpclib.Fault),
472+ "updateHeartbeat(-1, '') returned %r, not a Fault."
473+ % (fault,))
474+ self.assertEqual(NoSuchCodeImportJob, fault.__class__)
475+
476+ def test_finishJobID_no_log_file(self):
477+ # finishJobID calls the finishJobID job workflow method. Passing ''
478+ # means no log file was uploaded to the librarian.
479+ code_import_job = self.makeCodeImportJob(running=True)
480+ code_import = code_import_job.code_import
481+ self.api.finishJobID(
482+ code_import_job.id, CodeImportResultStatus.SUCCESS.name, '')
483+ # finishJob does many things, we just check one of them: setting
484+ # date_last_successful in the case of success.
485+ self.assertSqlAttributeEqualsDate(
486+ code_import, 'date_last_successful', UTC_NOW)
487+
488+ def test_finishJobID_with_log_file(self):
489+ # finishJobID calls the finishJobID job workflow method and can parse
490+ # a librarian file's http url to figure out its ID.
491+ code_import_job = self.makeCodeImportJob(running=True)
492+ code_import = code_import_job.code_import
493+ log_file_alias = self.factory.makeLibraryFileAlias()
494+ self.api.finishJobID(
495+ code_import_job.id, CodeImportResultStatus.SUCCESS.name,
496+ log_file_alias.http_url)
497+ self.assertEqual(
498+ log_file_alias, code_import.results[-1].log_file)
499+
500+ def test_finishJobID_not_found(self):
501+ # getImportDataForJobID returns a NoSuchCodeImportJob fault when there
502+ # is no code import job with the given ID.
503+ fault = self.api.finishJobID(
504+ -1, CodeImportResultStatus.SUCCESS.name, '')
505+ self.assertTrue(
506+ isinstance(fault, xmlrpclib.Fault),
507+ "finishJobID(-1, 'SUCCESS', 0) returned %r, not a Fault."
508+ % (fault,))
509+ self.assertEqual(NoSuchCodeImportJob, fault.__class__)
510+
511+
512+def test_suite():
513+ return unittest.TestLoader().loadTestsFromName(__name__)
514+
515
516=== modified file 'lib/lp/codehosting/codeimport/dispatcher.py'
517--- lib/lp/codehosting/codeimport/dispatcher.py 2010-02-22 05:07:23 +0000
518+++ lib/lp/codehosting/codeimport/dispatcher.py 2010-03-14 20:28:32 +0000
519@@ -31,7 +31,7 @@
520 """
521
522 worker_script = os.path.join(
523- config.root, 'scripts', 'code-import-worker-db.py')
524+ config.root, 'scripts', 'code-import-worker-monitor.py')
525
526 def __init__(self, logger, worker_limit, _sleep=time.sleep):
527 """Initialize an instance.
528
529=== modified file 'lib/lp/codehosting/codeimport/tests/test_workermonitor.py'
530--- lib/lp/codehosting/codeimport/tests/test_workermonitor.py 2010-03-08 22:26:29 +0000
531+++ lib/lp/codehosting/codeimport/tests/test_workermonitor.py 2010-03-14 20:28:32 +0000
532@@ -13,27 +13,33 @@
533 import StringIO
534 import tempfile
535 import unittest
536+import urllib
537
538 from bzrlib.branch import Branch
539 from bzrlib.tests import TestCase as BzrTestCase
540
541 from twisted.internet import defer, error, protocol, reactor, task
542+from twisted.python import failure, log
543 from twisted.trial.unittest import TestCase as TrialTestCase
544+from twisted.web import xmlrpc
545+
546+import transaction
547
548 from zope.component import getUtility
549-from zope.security.proxy import removeSecurityProxy
550
551 from canonical.config import config
552 from canonical.launchpad.scripts.logger import QuietFakeLogger
553+from canonical.launchpad.xmlrpc.faults import NoSuchCodeImportJob
554 from canonical.testing.layers import (
555- TwistedLayer, TwistedLaunchpadZopelessLayer)
556+ TwistedAppServerLayer, TwistedLaunchpadZopelessLayer, TwistedLayer)
557+from canonical.twistedsupport import suppress_stderr
558 from canonical.twistedsupport.tests.test_processmonitor import (
559 makeFailure, ProcessTestsMixin)
560+
561 from lp.code.enums import (
562 CodeImportResultStatus, CodeImportReviewStatus, RevisionControlSystems)
563 from lp.code.interfaces.codeimport import ICodeImportSet
564-from lp.code.interfaces.codeimportjob import (
565- ICodeImportJobSet, ICodeImportJobWorkflow)
566+from lp.code.interfaces.codeimportjob import ICodeImportJobSet
567 from lp.code.model.codeimport import CodeImport
568 from lp.code.model.codeimportjob import CodeImportJob
569 from lp.codehosting import load_optional_plugin
570@@ -41,13 +47,12 @@
571 CodeImportSourceDetails, CodeImportWorkerExitCode,
572 get_default_bazaar_branch_store)
573 from lp.codehosting.codeimport.workermonitor import (
574- CodeImportWorkerMonitor, CodeImportWorkerMonitorProtocol, ExitQuietly,
575- read_only_transaction)
576+ CodeImportWorkerMonitor, CodeImportWorkerMonitorProtocol, ExitQuietly)
577 from lp.codehosting.codeimport.tests.servers import (
578 CVSServer, GitServer, MercurialServer, SubversionServer)
579 from lp.codehosting.codeimport.tests.test_worker import (
580 clean_up_default_stores_for_import)
581-from lp.testing import login, logout
582+from lp.testing import login, logout, TestCase
583 from lp.testing.factory import LaunchpadObjectFactory
584
585
586@@ -129,7 +134,45 @@
587 self.protocol._tail, 'line 3\nline 4\nline 5\nline 6\n')
588
589
590-class TestWorkerMonitorUnit(TrialTestCase):
591+class FakeCodeImportScheduleEndpointProxy:
592+ """A fake implementation of a proxy to `ICodeImportScheduler`.
593+
594+ The constructor takes a dictionary mapping job ids to information that
595+ should be returned by getImportDataForJobID and the exception to raise if
596+ getImportDataForJobID is called with a job id not in the passed-in
597+ dictionary, defaulting to a fault with the same code as
598+ NoSuchCodeImportJob (because the class of the exception is lost when you
599+ go through XML-RPC serialization).
600+ """
601+
602+ def __init__(self, jobs_dict, no_such_job_exception=None):
603+ self.calls = []
604+ self.jobs_dict = jobs_dict
605+ if no_such_job_exception is None:
606+ no_such_job_exception = xmlrpc.Fault(
607+ faultCode=NoSuchCodeImportJob.error_code, faultString='')
608+ self.no_such_job_exception = no_such_job_exception
609+
610+ def callRemote(self, method_name, *args):
611+ method = getattr(self, '_remote_%s' % method_name, self._default)
612+ deferred = defer.maybeDeferred(method, *args)
613+ def append_to_log(pass_through):
614+ self.calls.append((method_name,) + tuple(args))
615+ return pass_through
616+ deferred.addCallback(append_to_log)
617+ return deferred
618+
619+ def _default(self, *args):
620+ return None
621+
622+ def _remote_getImportDataForJobID(self, job_id):
623+ if job_id in self.jobs_dict:
624+ return self.jobs_dict[job_id]
625+ else:
626+ raise self.no_such_job_exception
627+
628+
629+class TestWorkerMonitorUnit(TrialTestCase, TestCase):
630 """Unit tests for most of the `CodeImportWorkerMonitor` class.
631
632 We have to pay attention to the fact that several of the methods of the
633@@ -140,164 +183,172 @@
634
635 layer = TwistedLaunchpadZopelessLayer
636
637+ # This works around a clash between the TrialTestCase and our TestCase.
638+ skip = None
639+
640 class WorkerMonitor(CodeImportWorkerMonitor):
641 """A subclass of CodeImportWorkerMonitor that stubs logging OOPSes."""
642
643 def _logOopsFromFailure(self, failure):
644- self._failures.append(failure)
645-
646- def getResultsForOurCodeImport(self):
647- """Return the `CodeImportResult`s for the `CodeImport` we created.
648- """
649- code_import = getUtility(ICodeImportSet).get(self.code_import_id)
650- return code_import.results
651-
652- def getOneResultForOurCodeImport(self):
653- """Return the only `CodeImportResult` for the `CodeImport` we created.
654-
655- This method fails the test if there is more than one
656- `CodeImportResult` for this `CodeImport`.
657- """
658- results = list(self.getResultsForOurCodeImport())
659- self.failUnlessEqual(len(results), 1)
660- return results[0]
661+ log.err(failure)
662
663 def assertOopsesLogged(self, exc_types):
664- self.assertEqual(len(exc_types), len(self.worker_monitor._failures))
665- for failure, exc_type in zip(self.worker_monitor._failures,
666- exc_types):
667- self.assert_(failure.check(exc_type))
668-
669- def setUp(self):
670- login('no-priv@canonical.com')
671- self.factory = LaunchpadObjectFactory()
672- job = self.factory.makeCodeImportJob()
673- self.code_import_id = job.code_import.id
674- getUtility(ICodeImportJobWorkflow).startJob(
675- job, self.factory.makeCodeImportMachine(set_online=True))
676- self.job_id = job.id
677- self.worker_monitor = self.WorkerMonitor(job.id, QuietFakeLogger())
678- self.worker_monitor._failures = []
679- self.layer.txn.commit()
680- self.layer.switchDbUser('codeimportworker')
681-
682- def tearDown(self):
683- logout()
684-
685- def test_getJob(self):
686- # getJob() returns the job whose id we passed to the constructor.
687- return self.assertEqual(
688- self.worker_monitor.getJob().id, self.job_id)
689-
690- def test_getJobWhenJobDeleted(self):
691- # If the job has been deleted, getJob sets _call_finish_job to False
692- # and raises ExitQuietly.
693- job = self.worker_monitor.getJob()
694- removeSecurityProxy(job).destroySelf()
695- self.assertRaises(ExitQuietly, self.worker_monitor.getJob)
696- self.assertNot(self.worker_monitor._call_finish_job)
697-
698- def test_getSourceDetails(self):
699- # getSourceDetails extracts the details from the CodeImport database
700- # object.
701- @read_only_transaction
702- def check_source_details(details):
703- job = self.worker_monitor.getJob()
704- self.assertEqual(
705- details.url, job.code_import.url)
706- self.assertEqual(
707- details.cvs_root, job.code_import.cvs_root)
708- self.assertEqual(
709- details.cvs_module, job.code_import.cvs_module)
710- return self.worker_monitor.getSourceDetails().addCallback(
711- check_source_details)
712+ failures = self.flushLoggedErrors()
713+ self.assertEqual(len(exc_types), len(failures))
714+ for fail, exc_type in zip(failures, exc_types):
715+ self.assert_(fail.check(exc_type))
716+
717+ def makeWorkerMonitorWithJob(self, job_id=1, job_data=()):
718+ return self.WorkerMonitor(
719+ job_id, QuietFakeLogger(),
720+ FakeCodeImportScheduleEndpointProxy({job_id: job_data}))
721+
722+ def makeWorkerMonitorWithoutJob(self, exception=None):
723+ return self.WorkerMonitor(
724+ 1, QuietFakeLogger(),
725+ FakeCodeImportScheduleEndpointProxy({}, exception))
726+
727+ def test_getWorkerArguments(self):
728+ # getWorkerArguments returns a deferred that fires with the
729+ # 'arguments' part of what getImportDataForJobID returns.
730+ args = [self.factory.getUniqueString(),
731+ self.factory.getUniqueString()]
732+ worker_monitor = self.makeWorkerMonitorWithJob(1, (args, 1, 2))
733+ return worker_monitor.getWorkerArguments().addCallback(
734+ self.assertEqual, args)
735+
736+ def test_getWorkerArguments_sets_branch_url_and_logfilename(self):
737+ # getWorkerArguments sets the _branch_url (for use in oops reports)
738+ # and _log_file_name (for upload to the librarian) attributes on the
739+ # WorkerMonitor from the data returned by getImportDataForJobID.
740+ branch_url = self.factory.getUniqueString()
741+ log_file_name = self.factory.getUniqueString()
742+ worker_monitor = self.makeWorkerMonitorWithJob(
743+ 1, (['a'], branch_url, log_file_name))
744+ def check_branch_log(ignored):
745+ # Looking at the _ attributes here is in slightly poor taste, but
746+ # much much easier than them by logging and parsing an oops, etc.
747+ self.assertEqual(
748+ (branch_url, log_file_name),
749+ (worker_monitor._branch_url, worker_monitor._log_file_name))
750+ return worker_monitor.getWorkerArguments().addCallback(
751+ check_branch_log)
752+
753+ def test_getWorkerArguments_job_not_found_raises_exit_quietly(self):
754+ # When getImportDataForJobID signals a fault indicating that it
755+ # didn't find the supplied job, getWorkerArguments translates this
756+ # to an 'ExitQuietly' exception.
757+ worker_monitor = self.makeWorkerMonitorWithoutJob()
758+ return self.assertFailure(
759+ worker_monitor.getWorkerArguments(), ExitQuietly)
760+
761+ def test_getWorkerArguments_endpoint_failure_raises(self):
762+ # When getImportDataForJobID raises an arbitrary exception, it is not
763+ # handled in any special way by getWorkerArguments.
764+ worker_monitor = self.makeWorkerMonitorWithoutJob(
765+ exception=ZeroDivisionError())
766+ return self.assertFailure(
767+ worker_monitor.getWorkerArguments(), ZeroDivisionError)
768+
769+ def test_getWorkerArguments_arbitrary_fault_raises(self):
770+ # When getImportDataForJobID signals an arbitrary fault, it is not
771+ # handled in any special way by getWorkerArguments.
772+ worker_monitor = self.makeWorkerMonitorWithoutJob(
773+ exception=xmlrpc.Fault(1, ''))
774+ return self.assertFailure(
775+ worker_monitor.getWorkerArguments(), xmlrpc.Fault)
776
777 def test_updateHeartbeat(self):
778- # The worker monitor's updateHeartbeat method calls the
779- # updateHeartbeat job workflow method.
780- @read_only_transaction
781+ # updateHeartbeat calls the updateHeartbeat XML-RPC method.
782+ log_tail = self.factory.getUniqueString()
783+ job_id = self.factory.getUniqueInteger()
784+ worker_monitor = self.makeWorkerMonitorWithJob(job_id)
785 def check_updated_details(result):
786- job = self.worker_monitor.getJob()
787- self.assertEqual(job.logtail, 'log tail')
788- return self.worker_monitor.updateHeartbeat('log tail').addCallback(
789+ self.assertEqual(
790+ [('updateHeartbeat', job_id, log_tail)],
791+ worker_monitor.codeimport_endpoint.calls)
792+ return worker_monitor.updateHeartbeat(log_tail).addCallback(
793 check_updated_details)
794
795- def test_finishJobCallsFinishJob(self):
796- # The worker monitor's finishJob method calls the
797- # finishJob job workflow method.
798- @read_only_transaction
799+ def test_finishJob_calls_finishJobID_empty_log_file(self):
800+ # When the log file is empty, finishJob calls finishJobID with the
801+ # name of the status enum and an empty string to indicate that no log
802+ # file was uploaded to the librarian.
803+ job_id = self.factory.getUniqueInteger()
804+ worker_monitor = self.makeWorkerMonitorWithJob(job_id)
805+ self.assertEqual(worker_monitor._log_file.tell(), 0)
806 def check_finishJob_called(result):
807- # We take as indication that finishJob was called that a
808- # CodeImportResult was created.
809 self.assertEqual(
810- len(list(self.getResultsForOurCodeImport())), 1)
811- return self.worker_monitor.finishJob(
812+ [('finishJobID', job_id, 'SUCCESS', '')],
813+ worker_monitor.codeimport_endpoint.calls)
814+ return worker_monitor.finishJob(
815 CodeImportResultStatus.SUCCESS).addCallback(
816 check_finishJob_called)
817
818- def test_finishJobDoesntUploadEmptyFileToLibrarian(self):
819- # The worker monitor's finishJob method does not try to upload an
820- # empty log file to the librarian.
821- self.assertEqual(self.worker_monitor._log_file.tell(), 0)
822- @read_only_transaction
823- def check_no_file_uploaded(result):
824- result = self.getOneResultForOurCodeImport()
825- self.assertIdentical(result.log_file, None)
826- return self.worker_monitor.finishJob(
827- CodeImportResultStatus.SUCCESS).addCallback(
828- check_no_file_uploaded)
829-
830- def test_finishJobUploadsNonEmptyFileToLibrarian(self):
831- # The worker monitor's finishJob method uploads the log file to the
832- # librarian.
833- self.worker_monitor._log_file.write('some text')
834- @read_only_transaction
835+ def test_finishJob_uploads_nonempty_file_to_librarian(self):
836+ # finishJob method uploads the log file to the librarian and calls the
837+ # finishJobID XML-RPC method with the url of that file.
838+ self.layer.force_dirty_database()
839+ log_text = self.factory.getUniqueString()
840+ worker_monitor = self.makeWorkerMonitorWithJob()
841+ worker_monitor._log_file.write(log_text)
842 def check_file_uploaded(result):
843- result = self.getOneResultForOurCodeImport()
844- self.assertNotIdentical(result.log_file, None)
845- self.assertEqual(result.log_file.read(), 'some text')
846- return self.worker_monitor.finishJob(
847+ transaction.abort()
848+ url = worker_monitor.codeimport_endpoint.calls[0][3]
849+ text = urllib.urlopen(url).read()
850+ self.assertEqual(log_text, text)
851+ return worker_monitor.finishJob(
852 CodeImportResultStatus.SUCCESS).addCallback(
853 check_file_uploaded)
854
855- def test_finishJobStillCreatesResultWhenLibrarianUploadFails(self):
856- # If the upload to the librarian fails for any reason, the
857- # worker monitor still calls the finishJob workflow method,
858- # but an OOPS is logged to indicate there was a problem.
859+ @suppress_stderr
860+ def test_finishJob_still_calls_finishJobID_if_upload_fails(self):
861+ # If the upload to the librarian fails for any reason, the worker
862+ # monitor still calls the finishJobID XML-RPC method, but logs an
863+ # error to indicate there was a problem.
864+
865 # Write some text so that we try to upload the log.
866- self.worker_monitor._log_file.write('some text')
867+ job_id = self.factory.getUniqueInteger()
868+ worker_monitor = self.makeWorkerMonitorWithJob(job_id)
869+ worker_monitor._log_file.write('some text')
870+
871 # Make _createLibrarianFileAlias fail in a distinctive way.
872- self.worker_monitor._createLibrarianFileAlias = lambda *args: 1/0
873- def check_oops_logged_and_result_created(ignored):
874- self.assertOopsesLogged([ZeroDivisionError])
875+ worker_monitor._createLibrarianFileAlias = lambda *args: 1/0
876+ def check_finishJob_called(result):
877 self.assertEqual(
878- len(list(self.getResultsForOurCodeImport())), 1)
879- return self.worker_monitor.finishJob(
880+ [('finishJobID', job_id, 'SUCCESS', '')],
881+ worker_monitor.codeimport_endpoint.calls)
882+ errors = self.flushLoggedErrors(ZeroDivisionError)
883+ self.assertEqual(1, len(errors))
884+ return worker_monitor.finishJob(
885 CodeImportResultStatus.SUCCESS).addCallback(
886- check_oops_logged_and_result_created)
887+ check_finishJob_called)
888
889- def patchOutFinishJob(self):
890+ def patchOutFinishJob(self, worker_monitor):
891 calls = []
892 def finishJob(status):
893 calls.append(status)
894 return defer.succeed(None)
895- self.worker_monitor.finishJob = finishJob
896+ worker_monitor.finishJob = finishJob
897 return calls
898
899 def test_callFinishJobCallsFinishJobSuccess(self):
900 # callFinishJob calls finishJob with CodeImportResultStatus.SUCCESS if
901 # its argument is not a Failure.
902- calls = self.patchOutFinishJob()
903- self.worker_monitor.callFinishJob(None)
904+ worker_monitor = self.makeWorkerMonitorWithJob()
905+ calls = self.patchOutFinishJob(worker_monitor)
906+ worker_monitor.callFinishJob(None)
907 self.assertEqual(calls, [CodeImportResultStatus.SUCCESS])
908
909+ @suppress_stderr
910 def test_callFinishJobCallsFinishJobFailure(self):
911 # callFinishJob calls finishJob with CodeImportResultStatus.FAILURE
912 # and swallows the failure if its argument indicates that the
913 # subprocess exited with an exit code of
914 # CodeImportWorkerExitCode.FAILURE.
915- calls = self.patchOutFinishJob()
916- ret = self.worker_monitor.callFinishJob(
917+ worker_monitor = self.makeWorkerMonitorWithJob()
918+ calls = self.patchOutFinishJob(worker_monitor)
919+ ret = worker_monitor.callFinishJob(
920 makeFailure(
921 error.ProcessTerminated,
922 exitCode=CodeImportWorkerExitCode.FAILURE))
923@@ -311,8 +362,9 @@
924 # If the argument to callFinishJob indicates that the subprocess
925 # exited with a code of CodeImportWorkerExitCode.SUCCESS_NOCHANGE, it
926 # calls finishJob with a status of SUCCESS_NOCHANGE.
927- calls = self.patchOutFinishJob()
928- ret = self.worker_monitor.callFinishJob(
929+ worker_monitor = self.makeWorkerMonitorWithJob()
930+ calls = self.patchOutFinishJob(worker_monitor)
931+ ret = worker_monitor.callFinishJob(
932 makeFailure(
933 error.ProcessTerminated,
934 exitCode=CodeImportWorkerExitCode.SUCCESS_NOCHANGE))
935@@ -322,12 +374,14 @@
936 # callFinishJob did not swallow the error, this will fail the test.
937 return ret
938
939+ @suppress_stderr
940 def test_callFinishJobCallsFinishJobArbitraryFailure(self):
941 # If the argument to callFinishJob indicates that there was some other
942 # failure that had nothing to do with the subprocess, it records
943 # failure.
944- calls = self.patchOutFinishJob()
945- ret = self.worker_monitor.callFinishJob(makeFailure(RuntimeError))
946+ worker_monitor = self.makeWorkerMonitorWithJob()
947+ calls = self.patchOutFinishJob(worker_monitor)
948+ ret = worker_monitor.callFinishJob(makeFailure(RuntimeError))
949 self.assertEqual(calls, [CodeImportResultStatus.FAILURE])
950 self.assertOopsesLogged([RuntimeError])
951 # We return the deferred that callFinishJob returns -- if
952@@ -338,8 +392,9 @@
953 # If the argument to callFinishJob indicates that the subprocess
954 # exited with a code of CodeImportWorkerExitCode.SUCCESS_PARTIAL, it
955 # calls finishJob with a status of SUCCESS_PARTIAL.
956- calls = self.patchOutFinishJob()
957- ret = self.worker_monitor.callFinishJob(
958+ worker_monitor = self.makeWorkerMonitorWithJob()
959+ calls = self.patchOutFinishJob(worker_monitor)
960+ ret = worker_monitor.callFinishJob(
961 makeFailure(
962 error.ProcessTerminated,
963 exitCode=CodeImportWorkerExitCode.SUCCESS_PARTIAL))
964@@ -349,23 +404,31 @@
965 # callFinishJob did not swallow the error, this will fail the test.
966 return ret
967
968+ @suppress_stderr
969 def test_callFinishJobLogsTracebackOnFailure(self):
970 # When callFinishJob is called with a failure, it dumps the traceback
971 # of the failure into the log file.
972- ret = self.worker_monitor.callFinishJob(makeFailure(RuntimeError))
973+ worker_monitor = self.makeWorkerMonitorWithJob()
974+ ret = worker_monitor.callFinishJob(makeFailure(RuntimeError))
975 def check_log_file(ignored):
976- self.worker_monitor._log_file.seek(0)
977- log_text = self.worker_monitor._log_file.read()
978- self.assertIn('RuntimeError', log_text)
979+ failures = self.flushLoggedErrors(RuntimeError)
980+ self.assertEqual(1, len(failures))
981+ fail = failures[0]
982+ traceback_file = StringIO.StringIO()
983+ fail.printTraceback(traceback_file)
984+ worker_monitor._log_file.seek(0)
985+ log_text = worker_monitor._log_file.read()
986+ self.assertIn(traceback_file.read(), log_text)
987 return ret.addCallback(check_log_file)
988
989 def test_callFinishJobRespects_call_finish_job(self):
990 # callFinishJob does not call finishJob if _call_finish_job is False.
991 # This is to support exiting without fuss when the job we are working
992 # on is deleted in the web UI.
993- calls = self.patchOutFinishJob()
994- self.worker_monitor._call_finish_job = False
995- self.worker_monitor.callFinishJob(None)
996+ worker_monitor = self.makeWorkerMonitorWithJob()
997+ calls = self.patchOutFinishJob(worker_monitor)
998+ worker_monitor._call_finish_job = False
999+ worker_monitor.callFinishJob(None)
1000 self.assertEqual(calls, [])
1001
1002
1003@@ -376,14 +439,29 @@
1004 # This works around a clash between the TrialTestCase and the BzrTestCase.
1005 skip = None
1006
1007+ layer = TwistedLayer
1008+
1009 class WorkerMonitor(CodeImportWorkerMonitor):
1010 """See `CodeImportWorkerMonitor`.
1011
1012 Override _launchProcess to return a deferred that we can
1013- callback/errback as we choose.
1014+ callback/errback as we choose. Passing ``has_job=False`` to the
1015+ constructor will cause getWorkerArguments() to raise ExitQuietly (this
1016+ bit is tested above).
1017 """
1018
1019- def _launchProcess(self, source_details):
1020+ def __init__(self, process_deferred, has_job=True):
1021+ if has_job:
1022+ job_data = {1: ([], '', '')}
1023+ else:
1024+ job_data = {}
1025+ CodeImportWorkerMonitor.__init__(
1026+ self, 1, QuietFakeLogger(),
1027+ FakeCodeImportScheduleEndpointProxy(job_data))
1028+ self.result_status = None
1029+ self.process_deferred = process_deferred
1030+
1031+ def _launchProcess(self, worker_arguments):
1032 return self.process_deferred
1033
1034 def finishJob(self, status):
1035@@ -391,60 +469,47 @@
1036 self.result_status = status
1037 return defer.succeed(None)
1038
1039- layer = TwistedLaunchpadZopelessLayer
1040-
1041- def setUp(self):
1042- self.factory = LaunchpadObjectFactory()
1043- login('no-priv@canonical.com')
1044- job = self.factory.makeCodeImportJob()
1045- self.code_import_id = job.code_import.id
1046- getUtility(ICodeImportJobWorkflow).startJob(
1047- job, self.factory.makeCodeImportMachine(set_online=True))
1048- self.job_id = job.id
1049- self.worker_monitor = self.WorkerMonitor(job.id, QuietFakeLogger())
1050- self.worker_monitor.result_status = None
1051- self.layer.txn.commit()
1052- self.layer.switchDbUser('codeimportworker')
1053-
1054- def tearDown(self):
1055- logout()
1056-
1057- @read_only_transaction
1058- def assertFinishJobCalledWithStatus(self, ignored, status):
1059- """Assert that finishJob was called with the given status."""
1060- self.assertEqual(self.worker_monitor.result_status, status)
1061+ def assertFinishJobCalledWithStatus(self, ignored, worker_monitor, status):
1062+ """Assert that finishJob was called with the given status."""
1063+ self.assertEqual(worker_monitor.result_status, status)
1064+
1065+ def assertFinishJobNotCalled(self, ignored, worker_monitor):
1066+ """Assert that finishJob was not called at all."""
1067+ self.assertFinishJobCalledWithStatus(ignored, worker_monitor, None)
1068
1069 def test_success(self):
1070 # In the successful case, finishJob is called with
1071 # CodeImportResultStatus.SUCCESS.
1072- self.worker_monitor.process_deferred = defer.succeed(None)
1073- return self.worker_monitor.run().addCallback(
1074- self.assertFinishJobCalledWithStatus,
1075+ worker_monitor = self.WorkerMonitor(defer.succeed(None))
1076+ return worker_monitor.run().addCallback(
1077+ self.assertFinishJobCalledWithStatus, worker_monitor,
1078 CodeImportResultStatus.SUCCESS)
1079
1080 def test_failure(self):
1081 # If the process deferred is fired with a failure, finishJob is called
1082 # with CodeImportResultStatus.FAILURE, but the call to run() still
1083 # succeeds.
1084- self.worker_monitor.process_deferred = defer.fail(RuntimeError())
1085- return self.worker_monitor.run().addCallback(
1086- self.assertFinishJobCalledWithStatus,
1087+ worker_monitor = self.WorkerMonitor(defer.fail(RuntimeError()))
1088+ return worker_monitor.run().addCallback(
1089+ self.assertFinishJobCalledWithStatus, worker_monitor,
1090 CodeImportResultStatus.FAILURE)
1091
1092 def test_quiet_exit(self):
1093 # If the process deferred fails with ExitQuietly, the call to run()
1094- # succeeds.
1095- self.worker_monitor.process_deferred = defer.fail(ExitQuietly())
1096- return self.worker_monitor.run()
1097+ # succeeds, and finishJob is not called at all.
1098+ worker_monitor = self.WorkerMonitor(
1099+ defer.succeed(None), has_job=False)
1100+ return worker_monitor.run().addCallback(
1101+ self.assertFinishJobNotCalled, worker_monitor)
1102
1103 def test_quiet_exit_from_finishJob(self):
1104 # If finishJob fails with ExitQuietly, the call to run() still
1105 # succeeds.
1106- self.worker_monitor.process_deferred = defer.succeed(None)
1107+ worker_monitor = self.WorkerMonitor(defer.succeed(None))
1108 def finishJob(reason):
1109 raise ExitQuietly
1110- self.worker_monitor.finishJob = finishJob
1111- return self.worker_monitor.run()
1112+ worker_monitor.finishJob = finishJob
1113+ return worker_monitor.run()
1114
1115
1116 def nuke_codeimport_sample_data():
1117@@ -488,7 +553,7 @@
1118
1119 class TestWorkerMonitorIntegration(TrialTestCase, BzrTestCase):
1120
1121- layer = TwistedLaunchpadZopelessLayer
1122+ layer = TwistedAppServerLayer
1123
1124 # This works around a clash between the TrialTestCase and the BzrTestCase.
1125 skip = None
1126@@ -599,7 +664,6 @@
1127 self.assertEqual(
1128 self.foreign_commit_count, len(branch.revision_history()))
1129
1130- @read_only_transaction
1131 def assertImported(self, ignored, code_import_id):
1132 """Assert that the `CodeImport` of the given id was imported."""
1133 # In the in-memory tests, check that resetTimeout on the
1134@@ -617,8 +681,9 @@
1135
1136 This implementation does it in-process.
1137 """
1138- self.layer.switchDbUser('codeimportworker')
1139- monitor = CIWorkerMonitorForTesting(job_id, QuietFakeLogger())
1140+ monitor = CIWorkerMonitorForTesting(
1141+ job_id, QuietFakeLogger(),
1142+ xmlrpc.Proxy(config.codeimportdispatcher.codeimportscheduler_url))
1143 deferred = monitor.run()
1144 def save_protocol_object(result):
1145 """Save the process protocol object.
1146@@ -704,7 +769,7 @@
1147 This implementation does it in a child process.
1148 """
1149 script_path = os.path.join(
1150- config.root, 'scripts', 'code-import-worker-db.py')
1151+ config.root, 'scripts', 'code-import-worker-monitor.py')
1152 process_end_deferred = defer.Deferred()
1153 # The "childFDs={0:0, 1:1, 2:2}" means that any output from the script
1154 # goes to the test runner's console rather than to pipes that noone is
1155
1156=== modified file 'lib/lp/codehosting/codeimport/workermonitor.py'
1157--- lib/lp/codehosting/codeimport/workermonitor.py 2010-02-17 04:28:48 +0000
1158+++ lib/lp/codehosting/codeimport/workermonitor.py 2010-03-14 20:28:32 +0000
1159@@ -14,26 +14,19 @@
1160
1161 from twisted.internet import defer, error, reactor, task
1162 from twisted.python import failure
1163-from twisted.python.util import mergeFunctionMetadata
1164+from twisted.web import xmlrpc
1165
1166 from zope.component import getUtility
1167
1168 from canonical.config import config
1169-from canonical.database.sqlbase import begin, commit, rollback
1170-from canonical.launchpad.interfaces import ILibraryFileAliasSet
1171-from canonical.launchpad.webapp.interaction import Participation
1172-from canonical.launchpad.webapp import canonical_url
1173-from canonical.twistedsupport import defer_to_thread
1174+from canonical.launchpad.xmlrpc.faults import NoSuchCodeImportJob
1175+from canonical.librarian.interfaces import IFileUploadClient
1176 from canonical.twistedsupport.loggingsupport import (
1177 log_oops_from_failure)
1178 from canonical.twistedsupport.processmonitor import (
1179 ProcessMonitorProtocolWithTimeout)
1180 from lp.code.enums import CodeImportResultStatus
1181-from lp.code.interfaces.codeimportjob import (
1182- ICodeImportJobSet, ICodeImportJobWorkflow)
1183-from lp.codehosting.codeimport.worker import (
1184- CodeImportSourceDetails, CodeImportWorkerExitCode)
1185-from lp.testing import login, logout, ANONYMOUS
1186+from lp.codehosting.codeimport.worker import CodeImportWorkerExitCode
1187
1188
1189 class CodeImportWorkerMonitorProtocol(ProcessMonitorProtocolWithTimeout):
1190@@ -109,49 +102,6 @@
1191 self._looping_call.stop()
1192
1193
1194-def read_only_transaction(function):
1195- """Wrap 'function' in a transaction and Zope session.
1196-
1197- The transaction is always aborted."""
1198- def transacted(*args, **kwargs):
1199- begin()
1200- # XXX gary 20-Oct-2008 bug 285808
1201- # We should reconsider using a ftest helper for production code. For
1202- # now, we explicitly keep the code from using a test request by using
1203- # a basic participation.
1204- login(ANONYMOUS, Participation())
1205- try:
1206- return function(*args, **kwargs)
1207- finally:
1208- logout()
1209- rollback()
1210- return mergeFunctionMetadata(function, transacted)
1211-
1212-
1213-def writing_transaction(function):
1214- """Wrap 'function' in a transaction and Zope session.
1215-
1216- The transaction is committed if 'function' returns normally and
1217- aborted if it raises an exception."""
1218- def transacted(*args, **kwargs):
1219- begin()
1220- # XXX gary 20-Oct-2008 bug 285808
1221- # We should reconsider using a ftest helper for production code. For
1222- # now, we explicitly keep the code from using a test request by using
1223- # a basic participation.
1224- login(ANONYMOUS, Participation())
1225- try:
1226- ret = function(*args, **kwargs)
1227- except:
1228- logout()
1229- rollback()
1230- raise
1231- logout()
1232- commit()
1233- return ret
1234- return mergeFunctionMetadata(function, transacted)
1235-
1236-
1237 class ExitQuietly(Exception):
1238 """Raised to indicate that we should abort and exit without fuss.
1239
1240@@ -171,113 +121,105 @@
1241 path_to_script = os.path.join(
1242 config.root, 'scripts', 'code-import-worker.py')
1243
1244- def __init__(self, job_id, logger):
1245+ def __init__(self, job_id, logger, codeimport_endpoint):
1246 """Construct an instance.
1247
1248 :param job_id: The ID of the CodeImportJob we are to work on.
1249 :param logger: A `Logger` object.
1250 """
1251+ self._job_id = job_id
1252 self._logger = logger
1253- self._job_id = job_id
1254+ self.codeimport_endpoint = codeimport_endpoint
1255 self._call_finish_job = True
1256 self._log_file = tempfile.TemporaryFile()
1257- self._source_details = None
1258- self._code_import_id = None
1259 self._branch_url = None
1260+ self._log_file_name = 'no-name-set.txt'
1261
1262 def _logOopsFromFailure(self, failure):
1263 request = log_oops_from_failure(
1264- failure, code_import_job_id=self._job_id,
1265- code_import_id=self._code_import_id, URL=self._branch_url)
1266+ failure, code_import_job_id=self._job_id, URL=self._branch_url)
1267 self._logger.info(
1268 "Logged OOPS id %s: %s: %s",
1269 request.oopsid, failure.type.__name__, failure.value)
1270
1271- def getJob(self):
1272- """Fetch the `CodeImportJob` object we are working on from the DB.
1273-
1274- Only call this from defer_to_thread-ed methods!
1275-
1276- :raises ExitQuietly: if the job is not found.
1277- """
1278- job = getUtility(ICodeImportJobSet).getById(self._job_id)
1279- if job is None:
1280- self._logger.info(
1281- "Job %d not found, exiting quietly.", self._job_id)
1282+ def _trap_nosuchcodeimportjob(self, failure):
1283+ failure.trap(xmlrpc.Fault)
1284+ if failure.value.faultCode == NoSuchCodeImportJob.error_code:
1285 self._call_finish_job = False
1286 raise ExitQuietly
1287 else:
1288- return job
1289-
1290- @defer_to_thread
1291- @read_only_transaction
1292- def getSourceDetails(self):
1293- """Get a `CodeImportSourceDetails` for the job we are working on."""
1294- code_import = self.getJob().code_import
1295- source_details = CodeImportSourceDetails.fromCodeImport(code_import)
1296- self._logger.info(
1297- 'Found source details: %s', source_details.asArguments())
1298- self._branch_url = canonical_url(code_import.branch)
1299- self._code_import_id = code_import.id
1300- return source_details
1301-
1302- @defer_to_thread
1303- @writing_transaction
1304+ raise failure.value
1305+
1306+ def getWorkerArguments(self):
1307+ """Get arguments for the worker for the import we are working on.
1308+
1309+ This also sets the _branch_url and _log_file_name attributes for use
1310+ in the _logOopsFromFailure and finishJob methods respectively.
1311+ """
1312+ deferred = self.codeimport_endpoint.callRemote(
1313+ 'getImportDataForJobID', self._job_id)
1314+ def _processResult(result):
1315+ code_import_arguments, branch_url, log_file_name = result
1316+ self._branch_url = branch_url
1317+ self._log_file_name = log_file_name
1318+ self._logger.info(
1319+ 'Found source details: %s', code_import_arguments)
1320+ return code_import_arguments
1321+ return deferred.addCallbacks(_processResult, self._trap_nosuchcodeimportjob)
1322+
1323 def updateHeartbeat(self, tail):
1324 """Call the updateHeartbeat method for the job we are working on."""
1325 self._logger.debug("Updating heartbeat.")
1326- getUtility(ICodeImportJobWorkflow).updateHeartbeat(
1327- self.getJob(), tail)
1328+ deferred = self.codeimport_endpoint.callRemote(
1329+ 'updateHeartbeat', self._job_id, tail)
1330+ return deferred.addErrback(self._trap_nosuchcodeimportjob)
1331
1332 def _createLibrarianFileAlias(self, name, size, file, contentType):
1333- """Call `ILibraryFileAliasSet.create` with the given parameters.
1334+ """Call `IFileUploadClient.remoteAddFile` with the given parameters.
1335
1336- This is a separate method that exists only to be patched in
1337- tests.
1338+ This is a separate method that exists only to be patched in tests.
1339 """
1340- return getUtility(ILibraryFileAliasSet).create(
1341+ # This blocks, but never mind: nothing else is going on in the process
1342+ # by this point. We could dispatch to a thread if we felt like it, or
1343+ # even come up with an asynchronous implementation of the librarian
1344+ # protocol (it's not very complicated).
1345+ return getUtility(IFileUploadClient).remoteAddFile(
1346 name, size, file, contentType)
1347
1348- @defer_to_thread
1349- @writing_transaction
1350 def finishJob(self, status):
1351- """Call the finishJob method for the job we are working on.
1352+ """Call the finishJobID method for the job we are working on.
1353
1354 This method uploads the log file to the librarian first.
1355 """
1356- job = self.getJob()
1357 log_file_size = self._log_file.tell()
1358 if log_file_size > 0:
1359 self._log_file.seek(0)
1360- branch = job.code_import.branch
1361- log_file_name = '%s-%s-%s-log.txt' % (
1362- branch.owner.name, branch.product.name, branch.name)
1363 try:
1364- log_file_alias = self._createLibrarianFileAlias(
1365- log_file_name, log_file_size, self._log_file,
1366+ log_file_alias_url = self._createLibrarianFileAlias(
1367+ self._log_file_name, log_file_size, self._log_file,
1368 'text/plain')
1369 self._logger.info(
1370- "Uploaded logs to librarian %s.", log_file_alias.getURL())
1371+ "Uploaded logs to librarian %s.", log_file_alias_url)
1372 except:
1373 self._logger.error("Upload to librarian failed.")
1374 self._logOopsFromFailure(failure.Failure())
1375- log_file_alias = None
1376+ log_file_alias_url = ''
1377 else:
1378- log_file_alias = None
1379- getUtility(ICodeImportJobWorkflow).finishJob(
1380- job, status, log_file_alias)
1381+ log_file_alias_url = ''
1382+ return self.codeimport_endpoint.callRemote(
1383+ 'finishJobID', self._job_id, status.name, log_file_alias_url
1384+ ).addErrback(self._trap_nosuchcodeimportjob)
1385
1386 def _makeProcessProtocol(self, deferred):
1387 """Make an `CodeImportWorkerMonitorProtocol` for a subprocess."""
1388 return CodeImportWorkerMonitorProtocol(deferred, self, self._log_file)
1389
1390- def _launchProcess(self, source_details):
1391+ def _launchProcess(self, worker_arguments):
1392 """Launch the code-import-worker.py child process."""
1393 deferred = defer.Deferred()
1394 protocol = self._makeProcessProtocol(deferred)
1395 interpreter = '%s/bin/py' % config.root
1396- command = [interpreter, self.path_to_script]
1397- command.extend(source_details.asArguments())
1398+ command = [interpreter, self.path_to_script] + worker_arguments
1399 self._logger.info(
1400 "Launching worker child process %s.", command)
1401 reactor.spawnProcess(
1402@@ -286,7 +228,7 @@
1403
1404 def run(self):
1405 """Perform the import."""
1406- return self.getSourceDetails().addCallback(
1407+ return self.getWorkerArguments().addCallback(
1408 self._launchProcess).addBoth(
1409 self.callFinishJob).addErrback(
1410 self._silenceQuietExit)
1411@@ -297,6 +239,11 @@
1412 return None
1413
1414 def _reasonToStatus(self, reason):
1415+ """Translate the 'reason' for process exit into a result status.
1416+
1417+ Different exit codes are presumed by Twisted to be errors, but are
1418+ different kinds of success for us.
1419+ """
1420 if isinstance(reason, failure.Failure):
1421 if reason.check(error.ProcessTerminated):
1422 if reason.value.exitCode == \
1423
1424=== modified file 'lib/lp/codehosting/inmemory.py'
1425--- lib/lp/codehosting/inmemory.py 2010-02-24 04:24:01 +0000
1426+++ lib/lp/codehosting/inmemory.py 2010-03-14 20:28:32 +0000
1427@@ -62,12 +62,22 @@
1428 expected_value = kwargs[attribute]
1429 branch = self._object_set.get(branch_id)
1430 if branch is None:
1431- return None
1432+ return FakeResult(None)
1433 if expected_value is getattr(branch, attribute):
1434- return branch
1435+ return FakeResult(branch)
1436 return None
1437
1438
1439+class FakeResult:
1440+ """As with FakeStore, just enough of a result to pass tests."""
1441+
1442+ def __init__(self, branch):
1443+ self._branch = branch
1444+
1445+ def one(self):
1446+ return self._branch
1447+
1448+
1449 class FakeDatabaseObject:
1450 """Base class for fake database objects."""
1451
1452
1453=== modified file 'lib/lp/testing/__init__.py'
1454--- lib/lp/testing/__init__.py 2010-02-17 20:54:36 +0000
1455+++ lib/lp/testing/__init__.py 2010-03-14 20:28:32 +0000
1456@@ -294,7 +294,7 @@
1457 sql_class = type(sql_object)
1458 store = Store.of(sql_object)
1459 found_object = store.find(
1460- sql_class, **({'id': sql_object.id, attribute_name: date}))
1461+ sql_class, **({'id': sql_object.id, attribute_name: date})).one()
1462 if found_object is None:
1463 self.fail(
1464 "Expected %s to be %s, but it was %s."
1465
1466=== renamed file 'scripts/code-import-worker-db.py' => 'scripts/code-import-worker-monitor.py'
1467--- scripts/code-import-worker-db.py 2009-10-13 14:38:07 +0000
1468+++ scripts/code-import-worker-monitor.py 2010-03-14 20:28:32 +0000
1469@@ -22,11 +22,14 @@
1470
1471 from twisted.internet import defer, reactor
1472 from twisted.python import log
1473+from twisted.web import xmlrpc
1474+
1475+from canonical.config import config
1476+from canonical.twistedsupport.loggingsupport import set_up_oops_reporting
1477
1478 from lp.codehosting.codeimport.workermonitor import (
1479 CodeImportWorkerMonitor)
1480 from lp.services.scripts.base import LaunchpadScript
1481-from canonical.twistedsupport.loggingsupport import set_up_oops_reporting
1482
1483
1484 class CodeImportWorker(LaunchpadScript):
1485@@ -35,22 +38,30 @@
1486 LaunchpadScript.__init__(self, name, dbuser, test_args)
1487 set_up_oops_reporting(name, mangle_stdout=True)
1488
1489+ def _init_db(self, implicit_begin, isolation):
1490+ # This script doesn't access the database.
1491+ pass
1492+
1493 def main(self):
1494+ arg, = self.args
1495+ job_id = int(arg)
1496 # XXX: MichaelHudson 2008-05-07 bug=227586: Setting up the component
1497 # architecture overrides $GNUPGHOME to something stupid.
1498 os.environ['GNUPGHOME'] = ''
1499- reactor.callWhenRunning(self._run_reactor)
1500+ reactor.callWhenRunning(self._do_import, job_id)
1501 reactor.run()
1502
1503- def _run_reactor(self):
1504- defer.maybeDeferred(self._main).addErrback(
1505+ def _do_import(self, job_id):
1506+ defer.maybeDeferred(self._main, job_id).addErrback(
1507 log.err).addCallback(
1508 lambda ignored: reactor.stop())
1509
1510- def _main(self):
1511- arg, = self.args
1512- return CodeImportWorkerMonitor(int(arg), self.logger).run()
1513+ def _main(self, job_id):
1514+ worker = CodeImportWorkerMonitor(
1515+ job_id, self.logger,
1516+ xmlrpc.Proxy(config.codeimportdispatcher.codeimportscheduler_url))
1517+ return worker.run()
1518
1519 if __name__ == '__main__':
1520- script = CodeImportWorker('codeimportworker', dbuser='codeimportworker')
1521+ script = CodeImportWorker('codeimportworker')
1522 script.run()
1523
1524=== modified file 'scripts/code-import-worker.py'
1525--- scripts/code-import-worker.py 2010-02-04 01:12:05 +0000
1526+++ scripts/code-import-worker.py 2010-03-14 20:28:32 +0000
1527@@ -8,7 +8,7 @@
1528 By 'processing a code import' we mean importing or updating code from a
1529 remote, non-Bazaar, repository.
1530
1531-This script is usually run by the code-import-worker-db.py script that
1532+This script is usually run by the code-import-worker-monitor.py script that
1533 communicates progress and results to the database.
1534 """
1535