Merge ~cjwatson/launchpad:code-import-worker-librarian into launchpad:master

Proposed by Colin Watson
Status: Merged
Approved by: Colin Watson
Approved revision: 7dc5fd4ab7dc47e41688bfdaaf930949145a4026
Merge reported by: Otto Co-Pilot
Merged at revision: not available
Proposed branch: ~cjwatson/launchpad:code-import-worker-librarian
Merge into: launchpad:master
Diff against target: 338 lines (+23/-178)
2 files modified
lib/lp/codehosting/codeimport/tests/test_workermonitor.py (+16/-106)
lib/lp/codehosting/codeimport/workermonitor.py (+7/-72)
Reviewer Review Type Date Requested Status
Ioana Lasc (community) Approve
Review via email: mp+392369@code.launchpad.net

Commit message

Send code import log file data over XML-RPC

Description of the change

This will make it easier to split out the code import worker from the main Launchpad codebase, as it will no longer need the remote librarian upload client, which currently expects to have direct Launchpad database access.

To post a comment you must log in.
Revision history for this message
Ioana Lasc (ilasc) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1diff --git a/lib/lp/codehosting/codeimport/tests/test_workermonitor.py b/lib/lp/codehosting/codeimport/tests/test_workermonitor.py
2index 2810f86..df206b6 100644
3--- a/lib/lp/codehosting/codeimport/tests/test_workermonitor.py
4+++ b/lib/lp/codehosting/codeimport/tests/test_workermonitor.py
5@@ -20,20 +20,17 @@ from bzrlib.branch import Branch
6 from bzrlib.tests import TestCaseInTempDir
7 from dulwich.repo import Repo as GitRepo
8 import oops_twisted
9-from six.moves.urllib.request import urlopen
10+from six.moves import xmlrpc_client
11 from testtools.twistedsupport import (
12 assert_fails_with,
13 AsynchronousDeferredRunTest,
14- flush_logged_errors,
15 )
16-import transaction
17 from twisted.internet import (
18 defer,
19 error,
20 protocol,
21 reactor,
22 )
23-from twisted.python import log
24 from twisted.web import xmlrpc
25 from zope.component import getUtility
26 from zope.security.proxy import removeSecurityProxy
27@@ -88,7 +85,6 @@ from lp.testing import (
28 TestCase,
29 )
30 from lp.testing.factory import LaunchpadObjectFactory
31-from lp.testing.fakemethod import FakeMethod
32 from lp.testing.layers import (
33 LaunchpadZopelessLayer,
34 ZopelessAppServerLayer,
35@@ -224,20 +220,14 @@ class TestWorkerMonitorUnit(TestCase):
36 layer = LaunchpadZopelessLayer
37 run_tests_with = AsynchronousDeferredRunTest.make_factory(timeout=20)
38
39- class WorkerMonitor(CodeImportWorkerMonitor):
40- """A subclass of CodeImportWorkerMonitor that stubs logging OOPSes."""
41-
42- def _logOopsFromFailure(self, failure):
43- log.err(failure)
44-
45- def makeWorkerMonitorWithJob(self, job_id=1, job_data=()):
46- return self.WorkerMonitor(
47+ def makeWorkerMonitorWithJob(self, job_id=1, job_data={}):
48+ return CodeImportWorkerMonitor(
49 job_id, BufferLogger(),
50 FakeCodeImportScheduleEndpointProxy({job_id: job_data}),
51 "anything")
52
53 def makeWorkerMonitorWithoutJob(self, exception=None):
54- return self.WorkerMonitor(
55+ return CodeImportWorkerMonitor(
56 1, BufferLogger(),
57 FakeCodeImportScheduleEndpointProxy({}, exception),
58 None)
59@@ -247,64 +237,11 @@ class TestWorkerMonitorUnit(TestCase):
60 # 'arguments' part of what getImportDataForJobID returns.
61 args = [self.factory.getUniqueString(),
62 self.factory.getUniqueString()]
63- worker_monitor = self.makeWorkerMonitorWithJob(1, (args, 1, 2))
64- return worker_monitor.getWorkerArguments().addCallback(
65- self.assertEqual, args)
66-
67- def test_getWorkerArguments_dict(self):
68- # getWorkerArguments returns a deferred that fires with the
69- # 'arguments' part of what getImportDataForJobID returns.
70- # (New protocol: data passed as a dict.)
71- args = [self.factory.getUniqueString(),
72- self.factory.getUniqueString()]
73- data = {'arguments': args, 'target_url': 1, 'log_file_name': 2}
74+ data = {'arguments': args}
75 worker_monitor = self.makeWorkerMonitorWithJob(1, data)
76 return worker_monitor.getWorkerArguments().addCallback(
77 self.assertEqual, args)
78
79- def test_getWorkerArguments_sets_target_url_and_logfilename(self):
80- # getWorkerArguments sets the _target_url (for use in oops reports)
81- # and _log_file_name (for upload to the librarian) attributes on the
82- # WorkerMonitor from the data returned by getImportDataForJobID.
83- target_url = self.factory.getUniqueString()
84- log_file_name = self.factory.getUniqueString()
85- worker_monitor = self.makeWorkerMonitorWithJob(
86- 1, (['a'], target_url, log_file_name))
87-
88- def check_branch_log(ignored):
89- # Looking at the _ attributes here is in slightly poor taste, but
90- # much much easier than them by logging and parsing an oops, etc.
91- self.assertEqual(
92- (target_url, log_file_name),
93- (worker_monitor._target_url, worker_monitor._log_file_name))
94-
95- return worker_monitor.getWorkerArguments().addCallback(
96- check_branch_log)
97-
98- def test_getWorkerArguments_sets_target_url_and_logfilename_dict(self):
99- # getWorkerArguments sets the _target_url (for use in oops reports)
100- # and _log_file_name (for upload to the librarian) attributes on the
101- # WorkerMonitor from the data returned by getImportDataForJobID.
102- # (New protocol: data passed as a dict.)
103- target_url = self.factory.getUniqueString()
104- log_file_name = self.factory.getUniqueString()
105- data = {
106- 'arguments': ['a'],
107- 'target_url': target_url,
108- 'log_file_name': log_file_name,
109- }
110- worker_monitor = self.makeWorkerMonitorWithJob(1, data)
111-
112- def check_branch_log(ignored):
113- # Looking at the _ attributes here is in slightly poor taste, but
114- # much much easier than them by logging and parsing an oops, etc.
115- self.assertEqual(
116- (target_url, log_file_name),
117- (worker_monitor._target_url, worker_monitor._log_file_name))
118-
119- return worker_monitor.getWorkerArguments().addCallback(
120- check_branch_log)
121-
122 def test_getWorkerArguments_job_not_found_raises_exit_quietly(self):
123 # When getImportDataForJobID signals a fault indicating that
124 # getWorkerArguments didn't find the supplied job, getWorkerArguments
125@@ -345,61 +282,34 @@ class TestWorkerMonitorUnit(TestCase):
126
127 def test_finishJob_calls_finishJobID_empty_log_file(self):
128 # When the log file is empty, finishJob calls finishJobID with the
129- # name of the status enum and an empty string to indicate that no log
130- # file was uplaoded to the librarian.
131+ # name of the status enum and an empty binary string.
132 job_id = self.factory.getUniqueInteger()
133 worker_monitor = self.makeWorkerMonitorWithJob(job_id)
134 self.assertEqual(worker_monitor._log_file.tell(), 0)
135
136 def check_finishJob_called(result):
137 self.assertEqual(
138- [('finishJobID', job_id, 'SUCCESS', '')],
139+ [('finishJobID', job_id, 'SUCCESS',
140+ xmlrpc_client.Binary(b''))],
141 worker_monitor.codeimport_endpoint.calls)
142
143 return worker_monitor.finishJob(
144 CodeImportResultStatus.SUCCESS).addCallback(
145 check_finishJob_called)
146
147- def test_finishJob_uploads_nonempty_file_to_librarian(self):
148- # finishJob method uploads the log file to the librarian and calls the
149- # finishJobID XML-RPC method with the url of that file.
150- self.layer.force_dirty_database()
151- log_bytes = self.factory.getUniqueBytes()
152- worker_monitor = self.makeWorkerMonitorWithJob()
153- worker_monitor._log_file.write(log_bytes)
154-
155- def check_file_uploaded(result):
156- transaction.abort()
157- url = worker_monitor.codeimport_endpoint.calls[0][3]
158- got_log_bytes = urlopen(url).read()
159- self.assertEqual(log_bytes, got_log_bytes)
160-
161- return worker_monitor.finishJob(
162- CodeImportResultStatus.SUCCESS).addCallback(
163- check_file_uploaded)
164-
165- @suppress_stderr
166- def test_finishJob_still_calls_finishJobID_if_upload_fails(self):
167- # If the upload to the librarian fails for any reason, the worker
168- # monitor still calls the finishJobID XML-RPC method, but logs an
169- # error to indicate there was a problem.
170- class Fail(Exception):
171- """Some arbitrary failure."""
172-
173- # Write some text so that we try to upload the log.
174+ def test_finishJob_sends_nonempty_file_to_scheduler(self):
175+ # finishJob method calls finishJobID with the contents of the log
176+ # file.
177 job_id = self.factory.getUniqueInteger()
178+ log_bytes = self.factory.getUniqueBytes()
179 worker_monitor = self.makeWorkerMonitorWithJob(job_id)
180- worker_monitor._log_file.write(b'some text')
181-
182- # Make _createLibrarianFileAlias fail in a distinctive way.
183- worker_monitor._createLibrarianFileAlias = FakeMethod(failure=Fail())
184+ worker_monitor._log_file.write(log_bytes)
185
186 def check_finishJob_called(result):
187 self.assertEqual(
188- [('finishJobID', job_id, 'SUCCESS', '')],
189+ [('finishJobID', job_id, 'SUCCESS',
190+ xmlrpc_client.Binary(log_bytes))],
191 worker_monitor.codeimport_endpoint.calls)
192- errors = flush_logged_errors(Fail)
193- self.assertEqual(1, len(errors))
194
195 return worker_monitor.finishJob(
196 CodeImportResultStatus.SUCCESS).addCallback(
197@@ -578,7 +488,7 @@ class TestWorkerMonitorRunNoProcess(TestCase):
198
199 def __init__(self, process_deferred, has_job=True):
200 if has_job:
201- job_data = {1: ([], '', '')}
202+ job_data = {1: {'arguments': []}}
203 else:
204 job_data = {}
205 CodeImportWorkerMonitor.__init__(
206diff --git a/lib/lp/codehosting/codeimport/workermonitor.py b/lib/lp/codehosting/codeimport/workermonitor.py
207index 1fc9ec2..6fa304b 100644
208--- a/lib/lp/codehosting/codeimport/workermonitor.py
209+++ b/lib/lp/codehosting/codeimport/workermonitor.py
210@@ -13,6 +13,7 @@ import os
211 import tempfile
212
213 import six
214+from six.moves import xmlrpc_client
215 from twisted.internet import (
216 defer,
217 error,
218@@ -21,16 +22,13 @@ from twisted.internet import (
219 )
220 from twisted.python import failure
221 from twisted.web import xmlrpc
222-from zope.component import getUtility
223
224 from lp.code.enums import CodeImportResultStatus
225 from lp.codehosting.codeimport.worker import CodeImportWorkerExitCode
226 from lp.services.config import config
227-from lp.services.librarian.interfaces.client import IFileUploadClient
228 from lp.services.twistedsupport.processmonitor import (
229 ProcessMonitorProtocolWithTimeout,
230 )
231-from lp.services.webapp import errorlog
232 from lp.xmlrpc.faults import NoSuchCodeImportJob
233
234
235@@ -137,30 +135,8 @@ class CodeImportWorkerMonitor:
236 self.codeimport_endpoint = codeimport_endpoint
237 self._call_finish_job = True
238 self._log_file = tempfile.TemporaryFile()
239- self._target_url = None
240- self._log_file_name = 'no-name-set.txt'
241 self._access_policy = access_policy
242
243- def _logOopsFromFailure(self, failure):
244- config = errorlog.globalErrorUtility._oops_config
245- context = {
246- 'twisted_failure': failure,
247- 'http_request': errorlog.ScriptRequest(
248- [('code_import_job_id', self._job_id)], self._target_url),
249- }
250- report = config.create(context)
251-
252- def log_oops_if_published(ids):
253- if ids:
254- self._logger.info(
255- "Logged OOPS id %s: %s: %s",
256- report['id'], report.get('type', 'No exception type'),
257- report.get('value', 'No exception value'))
258-
259- d = config.publish(report)
260- d.addCallback(log_oops_if_published)
261- return d
262-
263 def _trap_nosuchcodeimportjob(self, failure):
264 failure.trap(xmlrpc.Fault)
265 if failure.value.faultCode == NoSuchCodeImportJob.error_code:
266@@ -170,25 +146,12 @@ class CodeImportWorkerMonitor:
267 raise failure.value
268
269 def getWorkerArguments(self):
270- """Get arguments for the worker for the import we are working on.
271-
272- This also sets the _target_url and _log_file_name attributes for use
273- in the _logOopsFromFailure and finishJob methods respectively.
274- """
275+ """Get arguments for the worker for the import we are working on."""
276 deferred = self.codeimport_endpoint.callRemote(
277 'getImportDataForJobID', self._job_id)
278
279 def _processResult(result):
280- if isinstance(result, dict):
281- code_import_arguments = result['arguments']
282- target_url = result['target_url']
283- log_file_name = result['log_file_name']
284- else:
285- # XXX cjwatson 2018-03-15: Remove once the scheduler always
286- # sends a dict.
287- code_import_arguments, target_url, log_file_name = result
288- self._target_url = target_url
289- self._log_file_name = log_file_name
290+ code_import_arguments = result['arguments']
291 self._logger.info(
292 'Found source details: %s', code_import_arguments)
293 return code_import_arguments
294@@ -206,40 +169,12 @@ class CodeImportWorkerMonitor:
295 six.ensure_text(tail, errors='replace'))
296 return deferred.addErrback(self._trap_nosuchcodeimportjob)
297
298- def _createLibrarianFileAlias(self, name, size, file, contentType):
299- """Call `IFileUploadClient.remoteAddFile` with the given parameters.
300-
301- This is a separate method that exists only to be patched in tests.
302- """
303- # This blocks, but never mind: nothing else is going on in the process
304- # by this point. We could dispatch to a thread if we felt like it, or
305- # even come up with an asynchronous implementation of the librarian
306- # protocol (it's not very complicated).
307- return getUtility(IFileUploadClient).remoteAddFile(
308- name, size, file, contentType)
309-
310 def finishJob(self, status):
311- """Call the finishJobID method for the job we are working on.
312-
313- This method uploads the log file to the librarian first.
314- """
315- log_file_size = self._log_file.tell()
316- if log_file_size > 0:
317- self._log_file.seek(0)
318- try:
319- log_file_alias_url = self._createLibrarianFileAlias(
320- self._log_file_name, log_file_size, self._log_file,
321- 'text/plain')
322- self._logger.info(
323- "Uploaded logs to librarian %s.", log_file_alias_url)
324- except:
325- self._logger.error("Upload to librarian failed.")
326- self._logOopsFromFailure(failure.Failure())
327- log_file_alias_url = ''
328- else:
329- log_file_alias_url = ''
330+ """Call the finishJobID method for the job we are working on."""
331+ self._log_file.seek(0)
332 return self.codeimport_endpoint.callRemote(
333- 'finishJobID', self._job_id, status.name, log_file_alias_url
334+ 'finishJobID', self._job_id, status.name,
335+ xmlrpc_client.Binary(self._log_file.read())
336 ).addErrback(self._trap_nosuchcodeimportjob)
337
338 def _makeProcessProtocol(self, deferred):

Subscribers

People subscribed via source and target branches

to status/vote changes: