Merge ~cjwatson/launchpad:code-import-worker-librarian into launchpad:master
- Git
- lp:~cjwatson/launchpad
- code-import-worker-librarian
- Merge into master
Proposed by
Colin Watson
Status: | Merged |
---|---|
Approved by: | Colin Watson |
Approved revision: | 7dc5fd4ab7dc47e41688bfdaaf930949145a4026 |
Merge reported by: | Otto Co-Pilot |
Merged at revision: | not available |
Proposed branch: | ~cjwatson/launchpad:code-import-worker-librarian |
Merge into: | launchpad:master |
Diff against target: |
338 lines (+23/-178) 2 files modified
lib/lp/codehosting/codeimport/tests/test_workermonitor.py (+16/-106); lib/lp/codehosting/codeimport/workermonitor.py (+7/-72) |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Ioana Lasc (community) | Approve | ||
Review via email: mp+392369@code.launchpad.net |
Commit message
Send code import log file data over XML-RPC
Description of the change
This will make it easier to split out the code import worker from the main Launchpad codebase, as it will no longer need the remote librarian upload client, which currently expects to have direct Launchpad database access.
To post a comment you must log in.
Revision history for this message
Ioana Lasc (ilasc) : | # |
review:
Approve
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | diff --git a/lib/lp/codehosting/codeimport/tests/test_workermonitor.py b/lib/lp/codehosting/codeimport/tests/test_workermonitor.py |
2 | index 2810f86..df206b6 100644 |
3 | --- a/lib/lp/codehosting/codeimport/tests/test_workermonitor.py |
4 | +++ b/lib/lp/codehosting/codeimport/tests/test_workermonitor.py |
5 | @@ -20,20 +20,17 @@ from bzrlib.branch import Branch |
6 | from bzrlib.tests import TestCaseInTempDir |
7 | from dulwich.repo import Repo as GitRepo |
8 | import oops_twisted |
9 | -from six.moves.urllib.request import urlopen |
10 | +from six.moves import xmlrpc_client |
11 | from testtools.twistedsupport import ( |
12 | assert_fails_with, |
13 | AsynchronousDeferredRunTest, |
14 | - flush_logged_errors, |
15 | ) |
16 | -import transaction |
17 | from twisted.internet import ( |
18 | defer, |
19 | error, |
20 | protocol, |
21 | reactor, |
22 | ) |
23 | -from twisted.python import log |
24 | from twisted.web import xmlrpc |
25 | from zope.component import getUtility |
26 | from zope.security.proxy import removeSecurityProxy |
27 | @@ -88,7 +85,6 @@ from lp.testing import ( |
28 | TestCase, |
29 | ) |
30 | from lp.testing.factory import LaunchpadObjectFactory |
31 | -from lp.testing.fakemethod import FakeMethod |
32 | from lp.testing.layers import ( |
33 | LaunchpadZopelessLayer, |
34 | ZopelessAppServerLayer, |
35 | @@ -224,20 +220,14 @@ class TestWorkerMonitorUnit(TestCase): |
36 | layer = LaunchpadZopelessLayer |
37 | run_tests_with = AsynchronousDeferredRunTest.make_factory(timeout=20) |
38 | |
39 | - class WorkerMonitor(CodeImportWorkerMonitor): |
40 | - """A subclass of CodeImportWorkerMonitor that stubs logging OOPSes.""" |
41 | - |
42 | - def _logOopsFromFailure(self, failure): |
43 | - log.err(failure) |
44 | - |
45 | - def makeWorkerMonitorWithJob(self, job_id=1, job_data=()): |
46 | - return self.WorkerMonitor( |
47 | + def makeWorkerMonitorWithJob(self, job_id=1, job_data={}): |
48 | + return CodeImportWorkerMonitor( |
49 | job_id, BufferLogger(), |
50 | FakeCodeImportScheduleEndpointProxy({job_id: job_data}), |
51 | "anything") |
52 | |
53 | def makeWorkerMonitorWithoutJob(self, exception=None): |
54 | - return self.WorkerMonitor( |
55 | + return CodeImportWorkerMonitor( |
56 | 1, BufferLogger(), |
57 | FakeCodeImportScheduleEndpointProxy({}, exception), |
58 | None) |
59 | @@ -247,64 +237,11 @@ class TestWorkerMonitorUnit(TestCase): |
60 | # 'arguments' part of what getImportDataForJobID returns. |
61 | args = [self.factory.getUniqueString(), |
62 | self.factory.getUniqueString()] |
63 | - worker_monitor = self.makeWorkerMonitorWithJob(1, (args, 1, 2)) |
64 | - return worker_monitor.getWorkerArguments().addCallback( |
65 | - self.assertEqual, args) |
66 | - |
67 | - def test_getWorkerArguments_dict(self): |
68 | - # getWorkerArguments returns a deferred that fires with the |
69 | - # 'arguments' part of what getImportDataForJobID returns. |
70 | - # (New protocol: data passed as a dict.) |
71 | - args = [self.factory.getUniqueString(), |
72 | - self.factory.getUniqueString()] |
73 | - data = {'arguments': args, 'target_url': 1, 'log_file_name': 2} |
74 | + data = {'arguments': args} |
75 | worker_monitor = self.makeWorkerMonitorWithJob(1, data) |
76 | return worker_monitor.getWorkerArguments().addCallback( |
77 | self.assertEqual, args) |
78 | |
79 | - def test_getWorkerArguments_sets_target_url_and_logfilename(self): |
80 | - # getWorkerArguments sets the _target_url (for use in oops reports) |
81 | - # and _log_file_name (for upload to the librarian) attributes on the |
82 | - # WorkerMonitor from the data returned by getImportDataForJobID. |
83 | - target_url = self.factory.getUniqueString() |
84 | - log_file_name = self.factory.getUniqueString() |
85 | - worker_monitor = self.makeWorkerMonitorWithJob( |
86 | - 1, (['a'], target_url, log_file_name)) |
87 | - |
88 | - def check_branch_log(ignored): |
89 | - # Looking at the _ attributes here is in slightly poor taste, but |
90 | - # much much easier than them by logging and parsing an oops, etc. |
91 | - self.assertEqual( |
92 | - (target_url, log_file_name), |
93 | - (worker_monitor._target_url, worker_monitor._log_file_name)) |
94 | - |
95 | - return worker_monitor.getWorkerArguments().addCallback( |
96 | - check_branch_log) |
97 | - |
98 | - def test_getWorkerArguments_sets_target_url_and_logfilename_dict(self): |
99 | - # getWorkerArguments sets the _target_url (for use in oops reports) |
100 | - # and _log_file_name (for upload to the librarian) attributes on the |
101 | - # WorkerMonitor from the data returned by getImportDataForJobID. |
102 | - # (New protocol: data passed as a dict.) |
103 | - target_url = self.factory.getUniqueString() |
104 | - log_file_name = self.factory.getUniqueString() |
105 | - data = { |
106 | - 'arguments': ['a'], |
107 | - 'target_url': target_url, |
108 | - 'log_file_name': log_file_name, |
109 | - } |
110 | - worker_monitor = self.makeWorkerMonitorWithJob(1, data) |
111 | - |
112 | - def check_branch_log(ignored): |
113 | - # Looking at the _ attributes here is in slightly poor taste, but |
114 | - # much much easier than them by logging and parsing an oops, etc. |
115 | - self.assertEqual( |
116 | - (target_url, log_file_name), |
117 | - (worker_monitor._target_url, worker_monitor._log_file_name)) |
118 | - |
119 | - return worker_monitor.getWorkerArguments().addCallback( |
120 | - check_branch_log) |
121 | - |
122 | def test_getWorkerArguments_job_not_found_raises_exit_quietly(self): |
123 | # When getImportDataForJobID signals a fault indicating that |
124 | # getWorkerArguments didn't find the supplied job, getWorkerArguments |
125 | @@ -345,61 +282,34 @@ class TestWorkerMonitorUnit(TestCase): |
126 | |
127 | def test_finishJob_calls_finishJobID_empty_log_file(self): |
128 | # When the log file is empty, finishJob calls finishJobID with the |
129 | - # name of the status enum and an empty string to indicate that no log |
130 | - # file was uplaoded to the librarian. |
131 | + # name of the status enum and an empty binary string. |
132 | job_id = self.factory.getUniqueInteger() |
133 | worker_monitor = self.makeWorkerMonitorWithJob(job_id) |
134 | self.assertEqual(worker_monitor._log_file.tell(), 0) |
135 | |
136 | def check_finishJob_called(result): |
137 | self.assertEqual( |
138 | - [('finishJobID', job_id, 'SUCCESS', '')], |
139 | + [('finishJobID', job_id, 'SUCCESS', |
140 | + xmlrpc_client.Binary(b''))], |
141 | worker_monitor.codeimport_endpoint.calls) |
142 | |
143 | return worker_monitor.finishJob( |
144 | CodeImportResultStatus.SUCCESS).addCallback( |
145 | check_finishJob_called) |
146 | |
147 | - def test_finishJob_uploads_nonempty_file_to_librarian(self): |
148 | - # finishJob method uploads the log file to the librarian and calls the |
149 | - # finishJobID XML-RPC method with the url of that file. |
150 | - self.layer.force_dirty_database() |
151 | - log_bytes = self.factory.getUniqueBytes() |
152 | - worker_monitor = self.makeWorkerMonitorWithJob() |
153 | - worker_monitor._log_file.write(log_bytes) |
154 | - |
155 | - def check_file_uploaded(result): |
156 | - transaction.abort() |
157 | - url = worker_monitor.codeimport_endpoint.calls[0][3] |
158 | - got_log_bytes = urlopen(url).read() |
159 | - self.assertEqual(log_bytes, got_log_bytes) |
160 | - |
161 | - return worker_monitor.finishJob( |
162 | - CodeImportResultStatus.SUCCESS).addCallback( |
163 | - check_file_uploaded) |
164 | - |
165 | - @suppress_stderr |
166 | - def test_finishJob_still_calls_finishJobID_if_upload_fails(self): |
167 | - # If the upload to the librarian fails for any reason, the worker |
168 | - # monitor still calls the finishJobID XML-RPC method, but logs an |
169 | - # error to indicate there was a problem. |
170 | - class Fail(Exception): |
171 | - """Some arbitrary failure.""" |
172 | - |
173 | - # Write some text so that we try to upload the log. |
174 | + def test_finishJob_sends_nonempty_file_to_scheduler(self): |
175 | + # finishJob method calls finishJobID with the contents of the log |
176 | + # file. |
177 | job_id = self.factory.getUniqueInteger() |
178 | + log_bytes = self.factory.getUniqueBytes() |
179 | worker_monitor = self.makeWorkerMonitorWithJob(job_id) |
180 | - worker_monitor._log_file.write(b'some text') |
181 | - |
182 | - # Make _createLibrarianFileAlias fail in a distinctive way. |
183 | - worker_monitor._createLibrarianFileAlias = FakeMethod(failure=Fail()) |
184 | + worker_monitor._log_file.write(log_bytes) |
185 | |
186 | def check_finishJob_called(result): |
187 | self.assertEqual( |
188 | - [('finishJobID', job_id, 'SUCCESS', '')], |
189 | + [('finishJobID', job_id, 'SUCCESS', |
190 | + xmlrpc_client.Binary(log_bytes))], |
191 | worker_monitor.codeimport_endpoint.calls) |
192 | - errors = flush_logged_errors(Fail) |
193 | - self.assertEqual(1, len(errors)) |
194 | |
195 | return worker_monitor.finishJob( |
196 | CodeImportResultStatus.SUCCESS).addCallback( |
197 | @@ -578,7 +488,7 @@ class TestWorkerMonitorRunNoProcess(TestCase): |
198 | |
199 | def __init__(self, process_deferred, has_job=True): |
200 | if has_job: |
201 | - job_data = {1: ([], '', '')} |
202 | + job_data = {1: {'arguments': []}} |
203 | else: |
204 | job_data = {} |
205 | CodeImportWorkerMonitor.__init__( |
206 | diff --git a/lib/lp/codehosting/codeimport/workermonitor.py b/lib/lp/codehosting/codeimport/workermonitor.py |
207 | index 1fc9ec2..6fa304b 100644 |
208 | --- a/lib/lp/codehosting/codeimport/workermonitor.py |
209 | +++ b/lib/lp/codehosting/codeimport/workermonitor.py |
210 | @@ -13,6 +13,7 @@ import os |
211 | import tempfile |
212 | |
213 | import six |
214 | +from six.moves import xmlrpc_client |
215 | from twisted.internet import ( |
216 | defer, |
217 | error, |
218 | @@ -21,16 +22,13 @@ from twisted.internet import ( |
219 | ) |
220 | from twisted.python import failure |
221 | from twisted.web import xmlrpc |
222 | -from zope.component import getUtility |
223 | |
224 | from lp.code.enums import CodeImportResultStatus |
225 | from lp.codehosting.codeimport.worker import CodeImportWorkerExitCode |
226 | from lp.services.config import config |
227 | -from lp.services.librarian.interfaces.client import IFileUploadClient |
228 | from lp.services.twistedsupport.processmonitor import ( |
229 | ProcessMonitorProtocolWithTimeout, |
230 | ) |
231 | -from lp.services.webapp import errorlog |
232 | from lp.xmlrpc.faults import NoSuchCodeImportJob |
233 | |
234 | |
235 | @@ -137,30 +135,8 @@ class CodeImportWorkerMonitor: |
236 | self.codeimport_endpoint = codeimport_endpoint |
237 | self._call_finish_job = True |
238 | self._log_file = tempfile.TemporaryFile() |
239 | - self._target_url = None |
240 | - self._log_file_name = 'no-name-set.txt' |
241 | self._access_policy = access_policy |
242 | |
243 | - def _logOopsFromFailure(self, failure): |
244 | - config = errorlog.globalErrorUtility._oops_config |
245 | - context = { |
246 | - 'twisted_failure': failure, |
247 | - 'http_request': errorlog.ScriptRequest( |
248 | - [('code_import_job_id', self._job_id)], self._target_url), |
249 | - } |
250 | - report = config.create(context) |
251 | - |
252 | - def log_oops_if_published(ids): |
253 | - if ids: |
254 | - self._logger.info( |
255 | - "Logged OOPS id %s: %s: %s", |
256 | - report['id'], report.get('type', 'No exception type'), |
257 | - report.get('value', 'No exception value')) |
258 | - |
259 | - d = config.publish(report) |
260 | - d.addCallback(log_oops_if_published) |
261 | - return d |
262 | - |
263 | def _trap_nosuchcodeimportjob(self, failure): |
264 | failure.trap(xmlrpc.Fault) |
265 | if failure.value.faultCode == NoSuchCodeImportJob.error_code: |
266 | @@ -170,25 +146,12 @@ class CodeImportWorkerMonitor: |
267 | raise failure.value |
268 | |
269 | def getWorkerArguments(self): |
270 | - """Get arguments for the worker for the import we are working on. |
271 | - |
272 | - This also sets the _target_url and _log_file_name attributes for use |
273 | - in the _logOopsFromFailure and finishJob methods respectively. |
274 | - """ |
275 | + """Get arguments for the worker for the import we are working on.""" |
276 | deferred = self.codeimport_endpoint.callRemote( |
277 | 'getImportDataForJobID', self._job_id) |
278 | |
279 | def _processResult(result): |
280 | - if isinstance(result, dict): |
281 | - code_import_arguments = result['arguments'] |
282 | - target_url = result['target_url'] |
283 | - log_file_name = result['log_file_name'] |
284 | - else: |
285 | - # XXX cjwatson 2018-03-15: Remove once the scheduler always |
286 | - # sends a dict. |
287 | - code_import_arguments, target_url, log_file_name = result |
288 | - self._target_url = target_url |
289 | - self._log_file_name = log_file_name |
290 | + code_import_arguments = result['arguments'] |
291 | self._logger.info( |
292 | 'Found source details: %s', code_import_arguments) |
293 | return code_import_arguments |
294 | @@ -206,40 +169,12 @@ class CodeImportWorkerMonitor: |
295 | six.ensure_text(tail, errors='replace')) |
296 | return deferred.addErrback(self._trap_nosuchcodeimportjob) |
297 | |
298 | - def _createLibrarianFileAlias(self, name, size, file, contentType): |
299 | - """Call `IFileUploadClient.remoteAddFile` with the given parameters. |
300 | - |
301 | - This is a separate method that exists only to be patched in tests. |
302 | - """ |
303 | - # This blocks, but never mind: nothing else is going on in the process |
304 | - # by this point. We could dispatch to a thread if we felt like it, or |
305 | - # even come up with an asynchronous implementation of the librarian |
306 | - # protocol (it's not very complicated). |
307 | - return getUtility(IFileUploadClient).remoteAddFile( |
308 | - name, size, file, contentType) |
309 | - |
310 | def finishJob(self, status): |
311 | - """Call the finishJobID method for the job we are working on. |
312 | - |
313 | - This method uploads the log file to the librarian first. |
314 | - """ |
315 | - log_file_size = self._log_file.tell() |
316 | - if log_file_size > 0: |
317 | - self._log_file.seek(0) |
318 | - try: |
319 | - log_file_alias_url = self._createLibrarianFileAlias( |
320 | - self._log_file_name, log_file_size, self._log_file, |
321 | - 'text/plain') |
322 | - self._logger.info( |
323 | - "Uploaded logs to librarian %s.", log_file_alias_url) |
324 | - except: |
325 | - self._logger.error("Upload to librarian failed.") |
326 | - self._logOopsFromFailure(failure.Failure()) |
327 | - log_file_alias_url = '' |
328 | - else: |
329 | - log_file_alias_url = '' |
330 | + """Call the finishJobID method for the job we are working on.""" |
331 | + self._log_file.seek(0) |
332 | return self.codeimport_endpoint.callRemote( |
333 | - 'finishJobID', self._job_id, status.name, log_file_alias_url |
334 | + 'finishJobID', self._job_id, status.name, |
335 | + xmlrpc_client.Binary(self._log_file.read()) |
336 | ).addErrback(self._trap_nosuchcodeimportjob) |
337 | |
338 | def _makeProcessProtocol(self, deferred): |