Merge ~cjwatson/launchpad:code-import-worker-librarian into launchpad:master

Proposed by Colin Watson
Status: Merged
Approved by: Colin Watson
Approved revision: 7dc5fd4ab7dc47e41688bfdaaf930949145a4026
Merge reported by: Otto Co-Pilot
Merged at revision: not available
Proposed branch: ~cjwatson/launchpad:code-import-worker-librarian
Merge into: launchpad:master
Diff against target: 338 lines (+23/-178)
2 files modified
lib/lp/codehosting/codeimport/tests/test_workermonitor.py (+16/-106)
lib/lp/codehosting/codeimport/workermonitor.py (+7/-72)
| Reviewer | Review Type | Date Requested | Status |
| --- | --- | --- | --- |
| Ioana Lasc (community) | | | Approve |
Review via email: mp+392369@code.launchpad.net

Commit message

Send code import log file data over XML-RPC

Description of the change

This will make it easier to split the code import worker out of the main Launchpad codebase, as it will no longer need the remote librarian upload client, which currently expects to have direct Launchpad database access.

To post a comment you must log in.
Revision history for this message
Ioana Lasc (ilasc):
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
diff --git a/lib/lp/codehosting/codeimport/tests/test_workermonitor.py b/lib/lp/codehosting/codeimport/tests/test_workermonitor.py
index 2810f86..df206b6 100644
--- a/lib/lp/codehosting/codeimport/tests/test_workermonitor.py
+++ b/lib/lp/codehosting/codeimport/tests/test_workermonitor.py
@@ -20,20 +20,17 @@ from bzrlib.branch import Branch
20from bzrlib.tests import TestCaseInTempDir20from bzrlib.tests import TestCaseInTempDir
21from dulwich.repo import Repo as GitRepo21from dulwich.repo import Repo as GitRepo
22import oops_twisted22import oops_twisted
23from six.moves.urllib.request import urlopen23from six.moves import xmlrpc_client
24from testtools.twistedsupport import (24from testtools.twistedsupport import (
25 assert_fails_with,25 assert_fails_with,
26 AsynchronousDeferredRunTest,26 AsynchronousDeferredRunTest,
27 flush_logged_errors,
28 )27 )
29import transaction
30from twisted.internet import (28from twisted.internet import (
31 defer,29 defer,
32 error,30 error,
33 protocol,31 protocol,
34 reactor,32 reactor,
35 )33 )
36from twisted.python import log
37from twisted.web import xmlrpc34from twisted.web import xmlrpc
38from zope.component import getUtility35from zope.component import getUtility
39from zope.security.proxy import removeSecurityProxy36from zope.security.proxy import removeSecurityProxy
@@ -88,7 +85,6 @@ from lp.testing import (
88 TestCase,85 TestCase,
89 )86 )
90from lp.testing.factory import LaunchpadObjectFactory87from lp.testing.factory import LaunchpadObjectFactory
91from lp.testing.fakemethod import FakeMethod
92from lp.testing.layers import (88from lp.testing.layers import (
93 LaunchpadZopelessLayer,89 LaunchpadZopelessLayer,
94 ZopelessAppServerLayer,90 ZopelessAppServerLayer,
@@ -224,20 +220,14 @@ class TestWorkerMonitorUnit(TestCase):
224 layer = LaunchpadZopelessLayer220 layer = LaunchpadZopelessLayer
225 run_tests_with = AsynchronousDeferredRunTest.make_factory(timeout=20)221 run_tests_with = AsynchronousDeferredRunTest.make_factory(timeout=20)
226222
227 class WorkerMonitor(CodeImportWorkerMonitor):223 def makeWorkerMonitorWithJob(self, job_id=1, job_data={}):
228 """A subclass of CodeImportWorkerMonitor that stubs logging OOPSes."""224 return CodeImportWorkerMonitor(
229
230 def _logOopsFromFailure(self, failure):
231 log.err(failure)
232
233 def makeWorkerMonitorWithJob(self, job_id=1, job_data=()):
234 return self.WorkerMonitor(
235 job_id, BufferLogger(),225 job_id, BufferLogger(),
236 FakeCodeImportScheduleEndpointProxy({job_id: job_data}),226 FakeCodeImportScheduleEndpointProxy({job_id: job_data}),
237 "anything")227 "anything")
238228
239 def makeWorkerMonitorWithoutJob(self, exception=None):229 def makeWorkerMonitorWithoutJob(self, exception=None):
240 return self.WorkerMonitor(230 return CodeImportWorkerMonitor(
241 1, BufferLogger(),231 1, BufferLogger(),
242 FakeCodeImportScheduleEndpointProxy({}, exception),232 FakeCodeImportScheduleEndpointProxy({}, exception),
243 None)233 None)
@@ -247,64 +237,11 @@ class TestWorkerMonitorUnit(TestCase):
247 # 'arguments' part of what getImportDataForJobID returns.237 # 'arguments' part of what getImportDataForJobID returns.
248 args = [self.factory.getUniqueString(),238 args = [self.factory.getUniqueString(),
249 self.factory.getUniqueString()]239 self.factory.getUniqueString()]
250 worker_monitor = self.makeWorkerMonitorWithJob(1, (args, 1, 2))240 data = {'arguments': args}
251 return worker_monitor.getWorkerArguments().addCallback(
252 self.assertEqual, args)
253
254 def test_getWorkerArguments_dict(self):
255 # getWorkerArguments returns a deferred that fires with the
256 # 'arguments' part of what getImportDataForJobID returns.
257 # (New protocol: data passed as a dict.)
258 args = [self.factory.getUniqueString(),
259 self.factory.getUniqueString()]
260 data = {'arguments': args, 'target_url': 1, 'log_file_name': 2}
261 worker_monitor = self.makeWorkerMonitorWithJob(1, data)241 worker_monitor = self.makeWorkerMonitorWithJob(1, data)
262 return worker_monitor.getWorkerArguments().addCallback(242 return worker_monitor.getWorkerArguments().addCallback(
263 self.assertEqual, args)243 self.assertEqual, args)
264244
265 def test_getWorkerArguments_sets_target_url_and_logfilename(self):
266 # getWorkerArguments sets the _target_url (for use in oops reports)
267 # and _log_file_name (for upload to the librarian) attributes on the
268 # WorkerMonitor from the data returned by getImportDataForJobID.
269 target_url = self.factory.getUniqueString()
270 log_file_name = self.factory.getUniqueString()
271 worker_monitor = self.makeWorkerMonitorWithJob(
272 1, (['a'], target_url, log_file_name))
273
274 def check_branch_log(ignored):
275 # Looking at the _ attributes here is in slightly poor taste, but
276 # much much easier than them by logging and parsing an oops, etc.
277 self.assertEqual(
278 (target_url, log_file_name),
279 (worker_monitor._target_url, worker_monitor._log_file_name))
280
281 return worker_monitor.getWorkerArguments().addCallback(
282 check_branch_log)
283
284 def test_getWorkerArguments_sets_target_url_and_logfilename_dict(self):
285 # getWorkerArguments sets the _target_url (for use in oops reports)
286 # and _log_file_name (for upload to the librarian) attributes on the
287 # WorkerMonitor from the data returned by getImportDataForJobID.
288 # (New protocol: data passed as a dict.)
289 target_url = self.factory.getUniqueString()
290 log_file_name = self.factory.getUniqueString()
291 data = {
292 'arguments': ['a'],
293 'target_url': target_url,
294 'log_file_name': log_file_name,
295 }
296 worker_monitor = self.makeWorkerMonitorWithJob(1, data)
297
298 def check_branch_log(ignored):
299 # Looking at the _ attributes here is in slightly poor taste, but
300 # much much easier than them by logging and parsing an oops, etc.
301 self.assertEqual(
302 (target_url, log_file_name),
303 (worker_monitor._target_url, worker_monitor._log_file_name))
304
305 return worker_monitor.getWorkerArguments().addCallback(
306 check_branch_log)
307
308 def test_getWorkerArguments_job_not_found_raises_exit_quietly(self):245 def test_getWorkerArguments_job_not_found_raises_exit_quietly(self):
309 # When getImportDataForJobID signals a fault indicating that246 # When getImportDataForJobID signals a fault indicating that
310 # getWorkerArguments didn't find the supplied job, getWorkerArguments247 # getWorkerArguments didn't find the supplied job, getWorkerArguments
@@ -345,61 +282,34 @@ class TestWorkerMonitorUnit(TestCase):
345282
346 def test_finishJob_calls_finishJobID_empty_log_file(self):283 def test_finishJob_calls_finishJobID_empty_log_file(self):
347 # When the log file is empty, finishJob calls finishJobID with the284 # When the log file is empty, finishJob calls finishJobID with the
348 # name of the status enum and an empty string to indicate that no log285 # name of the status enum and an empty binary string.
349 # file was uplaoded to the librarian.
350 job_id = self.factory.getUniqueInteger()286 job_id = self.factory.getUniqueInteger()
351 worker_monitor = self.makeWorkerMonitorWithJob(job_id)287 worker_monitor = self.makeWorkerMonitorWithJob(job_id)
352 self.assertEqual(worker_monitor._log_file.tell(), 0)288 self.assertEqual(worker_monitor._log_file.tell(), 0)
353289
354 def check_finishJob_called(result):290 def check_finishJob_called(result):
355 self.assertEqual(291 self.assertEqual(
356 [('finishJobID', job_id, 'SUCCESS', '')],292 [('finishJobID', job_id, 'SUCCESS',
293 xmlrpc_client.Binary(b''))],
357 worker_monitor.codeimport_endpoint.calls)294 worker_monitor.codeimport_endpoint.calls)
358295
359 return worker_monitor.finishJob(296 return worker_monitor.finishJob(
360 CodeImportResultStatus.SUCCESS).addCallback(297 CodeImportResultStatus.SUCCESS).addCallback(
361 check_finishJob_called)298 check_finishJob_called)
362299
363 def test_finishJob_uploads_nonempty_file_to_librarian(self):300 def test_finishJob_sends_nonempty_file_to_scheduler(self):
364 # finishJob method uploads the log file to the librarian and calls the301 # finishJob method calls finishJobID with the contents of the log
365 # finishJobID XML-RPC method with the url of that file.302 # file.
366 self.layer.force_dirty_database()
367 log_bytes = self.factory.getUniqueBytes()
368 worker_monitor = self.makeWorkerMonitorWithJob()
369 worker_monitor._log_file.write(log_bytes)
370
371 def check_file_uploaded(result):
372 transaction.abort()
373 url = worker_monitor.codeimport_endpoint.calls[0][3]
374 got_log_bytes = urlopen(url).read()
375 self.assertEqual(log_bytes, got_log_bytes)
376
377 return worker_monitor.finishJob(
378 CodeImportResultStatus.SUCCESS).addCallback(
379 check_file_uploaded)
380
381 @suppress_stderr
382 def test_finishJob_still_calls_finishJobID_if_upload_fails(self):
383 # If the upload to the librarian fails for any reason, the worker
384 # monitor still calls the finishJobID XML-RPC method, but logs an
385 # error to indicate there was a problem.
386 class Fail(Exception):
387 """Some arbitrary failure."""
388
389 # Write some text so that we try to upload the log.
390 job_id = self.factory.getUniqueInteger()303 job_id = self.factory.getUniqueInteger()
304 log_bytes = self.factory.getUniqueBytes()
391 worker_monitor = self.makeWorkerMonitorWithJob(job_id)305 worker_monitor = self.makeWorkerMonitorWithJob(job_id)
392 worker_monitor._log_file.write(b'some text')306 worker_monitor._log_file.write(log_bytes)
393
394 # Make _createLibrarianFileAlias fail in a distinctive way.
395 worker_monitor._createLibrarianFileAlias = FakeMethod(failure=Fail())
396307
397 def check_finishJob_called(result):308 def check_finishJob_called(result):
398 self.assertEqual(309 self.assertEqual(
399 [('finishJobID', job_id, 'SUCCESS', '')],310 [('finishJobID', job_id, 'SUCCESS',
311 xmlrpc_client.Binary(log_bytes))],
400 worker_monitor.codeimport_endpoint.calls)312 worker_monitor.codeimport_endpoint.calls)
401 errors = flush_logged_errors(Fail)
402 self.assertEqual(1, len(errors))
403313
404 return worker_monitor.finishJob(314 return worker_monitor.finishJob(
405 CodeImportResultStatus.SUCCESS).addCallback(315 CodeImportResultStatus.SUCCESS).addCallback(
@@ -578,7 +488,7 @@ class TestWorkerMonitorRunNoProcess(TestCase):
578488
579 def __init__(self, process_deferred, has_job=True):489 def __init__(self, process_deferred, has_job=True):
580 if has_job:490 if has_job:
581 job_data = {1: ([], '', '')}491 job_data = {1: {'arguments': []}}
582 else:492 else:
583 job_data = {}493 job_data = {}
584 CodeImportWorkerMonitor.__init__(494 CodeImportWorkerMonitor.__init__(
diff --git a/lib/lp/codehosting/codeimport/workermonitor.py b/lib/lp/codehosting/codeimport/workermonitor.py
index 1fc9ec2..6fa304b 100644
--- a/lib/lp/codehosting/codeimport/workermonitor.py
+++ b/lib/lp/codehosting/codeimport/workermonitor.py
@@ -13,6 +13,7 @@ import os
13import tempfile13import tempfile
1414
15import six15import six
16from six.moves import xmlrpc_client
16from twisted.internet import (17from twisted.internet import (
17 defer,18 defer,
18 error,19 error,
@@ -21,16 +22,13 @@ from twisted.internet import (
21 )22 )
22from twisted.python import failure23from twisted.python import failure
23from twisted.web import xmlrpc24from twisted.web import xmlrpc
24from zope.component import getUtility
2525
26from lp.code.enums import CodeImportResultStatus26from lp.code.enums import CodeImportResultStatus
27from lp.codehosting.codeimport.worker import CodeImportWorkerExitCode27from lp.codehosting.codeimport.worker import CodeImportWorkerExitCode
28from lp.services.config import config28from lp.services.config import config
29from lp.services.librarian.interfaces.client import IFileUploadClient
30from lp.services.twistedsupport.processmonitor import (29from lp.services.twistedsupport.processmonitor import (
31 ProcessMonitorProtocolWithTimeout,30 ProcessMonitorProtocolWithTimeout,
32 )31 )
33from lp.services.webapp import errorlog
34from lp.xmlrpc.faults import NoSuchCodeImportJob32from lp.xmlrpc.faults import NoSuchCodeImportJob
3533
3634
@@ -137,30 +135,8 @@ class CodeImportWorkerMonitor:
137 self.codeimport_endpoint = codeimport_endpoint135 self.codeimport_endpoint = codeimport_endpoint
138 self._call_finish_job = True136 self._call_finish_job = True
139 self._log_file = tempfile.TemporaryFile()137 self._log_file = tempfile.TemporaryFile()
140 self._target_url = None
141 self._log_file_name = 'no-name-set.txt'
142 self._access_policy = access_policy138 self._access_policy = access_policy
143139
144 def _logOopsFromFailure(self, failure):
145 config = errorlog.globalErrorUtility._oops_config
146 context = {
147 'twisted_failure': failure,
148 'http_request': errorlog.ScriptRequest(
149 [('code_import_job_id', self._job_id)], self._target_url),
150 }
151 report = config.create(context)
152
153 def log_oops_if_published(ids):
154 if ids:
155 self._logger.info(
156 "Logged OOPS id %s: %s: %s",
157 report['id'], report.get('type', 'No exception type'),
158 report.get('value', 'No exception value'))
159
160 d = config.publish(report)
161 d.addCallback(log_oops_if_published)
162 return d
163
164 def _trap_nosuchcodeimportjob(self, failure):140 def _trap_nosuchcodeimportjob(self, failure):
165 failure.trap(xmlrpc.Fault)141 failure.trap(xmlrpc.Fault)
166 if failure.value.faultCode == NoSuchCodeImportJob.error_code:142 if failure.value.faultCode == NoSuchCodeImportJob.error_code:
@@ -170,25 +146,12 @@ class CodeImportWorkerMonitor:
170 raise failure.value146 raise failure.value
171147
172 def getWorkerArguments(self):148 def getWorkerArguments(self):
173 """Get arguments for the worker for the import we are working on.149 """Get arguments for the worker for the import we are working on."""
174
175 This also sets the _target_url and _log_file_name attributes for use
176 in the _logOopsFromFailure and finishJob methods respectively.
177 """
178 deferred = self.codeimport_endpoint.callRemote(150 deferred = self.codeimport_endpoint.callRemote(
179 'getImportDataForJobID', self._job_id)151 'getImportDataForJobID', self._job_id)
180152
181 def _processResult(result):153 def _processResult(result):
182 if isinstance(result, dict):154 code_import_arguments = result['arguments']
183 code_import_arguments = result['arguments']
184 target_url = result['target_url']
185 log_file_name = result['log_file_name']
186 else:
187 # XXX cjwatson 2018-03-15: Remove once the scheduler always
188 # sends a dict.
189 code_import_arguments, target_url, log_file_name = result
190 self._target_url = target_url
191 self._log_file_name = log_file_name
192 self._logger.info(155 self._logger.info(
193 'Found source details: %s', code_import_arguments)156 'Found source details: %s', code_import_arguments)
194 return code_import_arguments157 return code_import_arguments
@@ -206,40 +169,12 @@ class CodeImportWorkerMonitor:
206 six.ensure_text(tail, errors='replace'))169 six.ensure_text(tail, errors='replace'))
207 return deferred.addErrback(self._trap_nosuchcodeimportjob)170 return deferred.addErrback(self._trap_nosuchcodeimportjob)
208171
209 def _createLibrarianFileAlias(self, name, size, file, contentType):
210 """Call `IFileUploadClient.remoteAddFile` with the given parameters.
211
212 This is a separate method that exists only to be patched in tests.
213 """
214 # This blocks, but never mind: nothing else is going on in the process
215 # by this point. We could dispatch to a thread if we felt like it, or
216 # even come up with an asynchronous implementation of the librarian
217 # protocol (it's not very complicated).
218 return getUtility(IFileUploadClient).remoteAddFile(
219 name, size, file, contentType)
220
221 def finishJob(self, status):172 def finishJob(self, status):
222 """Call the finishJobID method for the job we are working on.173 """Call the finishJobID method for the job we are working on."""
223174 self._log_file.seek(0)
224 This method uploads the log file to the librarian first.
225 """
226 log_file_size = self._log_file.tell()
227 if log_file_size > 0:
228 self._log_file.seek(0)
229 try:
230 log_file_alias_url = self._createLibrarianFileAlias(
231 self._log_file_name, log_file_size, self._log_file,
232 'text/plain')
233 self._logger.info(
234 "Uploaded logs to librarian %s.", log_file_alias_url)
235 except:
236 self._logger.error("Upload to librarian failed.")
237 self._logOopsFromFailure(failure.Failure())
238 log_file_alias_url = ''
239 else:
240 log_file_alias_url = ''
241 return self.codeimport_endpoint.callRemote(175 return self.codeimport_endpoint.callRemote(
242 'finishJobID', self._job_id, status.name, log_file_alias_url176 'finishJobID', self._job_id, status.name,
177 xmlrpc_client.Binary(self._log_file.read())
243 ).addErrback(self._trap_nosuchcodeimportjob)178 ).addErrback(self._trap_nosuchcodeimportjob)
244179
245 def _makeProcessProtocol(self, deferred):180 def _makeProcessProtocol(self, deferred):

Subscribers

People subscribed via source and target branches

to status/vote changes: