Merge lp:~doanac/ubuntu-ci-services-itself/runworker-basic into lp:ubuntu-ci-services-itself

Proposed by Andy Doan
Status: Merged
Approved by: Andy Doan
Approved revision: 343
Merged at revision: 356
Proposed branch: lp:~doanac/ubuntu-ci-services-itself/runworker-basic
Merge into: lp:ubuntu-ci-services-itself
Diff against target: 304 lines (+295/-0)
2 files modified
ci-utils/ci_utils/amqp_worker.py (+124/-0)
ci-utils/ci_utils/tests/test_amqp_worker.py (+171/-0)
To merge this branch: bzr merge lp:~doanac/ubuntu-ci-services-itself/runworker-basic
Reviewer                     Review Type             Date Requested  Status
Vincent Ladeuil (community)                                          Approve
PS Jenkins bot (community)   continuous-integration                  Approve
Review via email: mp+210076@code.launchpad.net

Commit message

run-worker: create a new common run worker

This combines the functionality and error cases of the 3 run-workers
we have into a common base class. It also includes actual test cases
so that it's much easier to work on changes.

Description of the change

Part 1 of a 3 part series:

In preparation for run-worker scripts that can be cancelled, I took a brief detour to create a common base class for run-workers that handles all the dirty details they were dealing with. A nice side effect is that it is written in a way that lets us easily test the hard parts.

I'm trying to keep this in digestible chunks. The main thing I'm hoping people will look at here is the "def _on_message(self, msg)" logic. I think it covers the error cases we know of for the bsbuilder, image-builder, and test-runner, and I *think* I've covered all those corners with tests.

To see what this will look like for the worker scripts, you might want to fast forward to part 3 of my series:

 <https://code.launchpad.net/~doanac/ubuntu-ci-services-itself/run-worker-conversion/+merge/210078>

PS Jenkins bot (ps-jenkins) wrote :

PASSED: Continuous integration, rev:342
http://s-jenkins.ubuntu-ci:8080/job/uci-engine-ci/351/
Executed test runs:

review: Approve (continuous-integration)
Vincent Ladeuil (vila) wrote :

+        if not store or not logger:
+            return

I think we also want to skip storing the log if there is nothing in it. So:

  log_content = logger.getvalue()
  if not log_content:
      return
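
A minimal sketch of that guard, assuming the logger keeps its captured text in a `buffer` attribute holding a StringIO, as `_create_worker_logger` does in this branch. `FakeStore`, `make_logger` and `store_worker_log` are hypothetical stand-ins so the snippet runs on its own, and it uses Python 3's `io.StringIO` rather than the Python 2 `StringIO` module:

```python
import io
import logging


class FakeStore(object):
    """Hypothetical stand-in for the data store; records uploads."""

    def __init__(self):
        self.files = {}

    def put_file(self, name, content, content_type):
        self.files[name] = content
        return 'http://example.invalid/' + name


def make_logger(name):
    """Build a logger that captures INFO output in memory."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.propagate = False
    logger.buffer = io.StringIO()
    logger.addHandler(logging.StreamHandler(logger.buffer))
    return logger


def store_worker_log(store, logger, retval):
    """Upload the captured log, returning early when it is empty."""
    if not store or not logger:
        return
    content = logger.buffer.getvalue()
    if not content:
        return  # nothing was logged, so skip the upload entirely
    name = '{}.log'.format(logger.name)
    url = store.put_file(name, content, 'text/plain')
    retval.setdefault('artifacts', []).append(
        {'name': name, 'reference': url, 'type': 'LOGS'})
```

With the early return, a worker that logs nothing leaves `retval` untouched and never calls `put_file`.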

+            amqp_cb, ret = self.handle_request(logger, params)
+            self._store_worker_log(store, logger, ret)
+            amqp_cb(trigger, ret)

Pfew, that's dense, took me some back and forth to understand:

amqp_cb is the callback for [n]acking the msg
ret is something I can't define by reading the MP ;-p

Can you document that in handle_request()? That's what the AMQPWorker users
will need.
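
For illustration, the contract could look roughly like this from a subclass author's side. This is only a sketch: `EchoWorker` is hypothetical, and the two `progress_*` functions are local stand-ins for the `amqp_utils` callbacks so the snippet runs without `ci_utils` installed. The only thing taken from the MP is that the callback is invoked as `amqp_cb(trigger, ret)`.

```python
import logging


def progress_completed(trigger, payload):
    """Hypothetical stand-in for amqp_utils.progress_completed."""
    return ('completed', trigger, payload)


def progress_failed(trigger, payload):
    """Hypothetical stand-in for amqp_utils.progress_failed."""
    return ('failed', trigger, payload)


class EchoWorker(object):
    """Hypothetical worker; a real one would subclass AMQPWorker."""

    def handle_request(self, logger, params):
        # Return (callback, dict): the callback reports the outcome for
        # the trigger, and the dict is the payload handed to it.
        if 'text' not in params:
            logger.error('missing "text" parameter')
            return progress_failed, {'error_message': 'no text given'}
        logger.info('echoing %s', params['text'])
        return progress_completed, {'echo': params['text']}
```

The worker picks the callback itself, so an expected failure is reported through the returned `progress_failed` rather than by raising.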

The try/except/finally in _on_message() is still big but it makes more sense now. I would love some more comments as this code is quite complex but it's not a blocker.
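
A stripped-down sketch of that control flow, leaving out the data store and log upload. It is not the code in this branch: `on_message` here is a free function, and `handle_request` and `progress_failed` are passed in as parameters purely so the snippet stands alone.

```python
import json
import traceback


def on_message(msg, handle_request, progress_failed):
    """Simplified flow: report the outcome, then always ack."""
    ret = {}
    trigger = None
    try:
        params = json.loads(msg.body)
        trigger = params['progress_trigger']
        callback, ret = handle_request(params)
        callback(trigger, ret)
    except (KeyboardInterrupt, Exception) as e:
        ret['error_message'] = 'Unexpected exception occurred'
        ret['traceback'] = traceback.format_exc()
        if trigger:
            # Without a trigger there is nowhere to report the failure.
            progress_failed(trigger, ret)
        if isinstance(e, KeyboardInterrupt):
            raise  # let the caller's queue loop shut down
    finally:
        # Runs on success, on failure and on re-raise alike, so the
        # message is acked in every case.
        msg.channel.basic_ack(msg.delivery_tag)
```

The `finally` clause is what keeps a bad message from being left unacknowledged: even a body with no `progress_trigger` gets acked, although in that case nothing can be reported.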

And that's all I have to say ?

Wow, I had to re-read a few bits but really the above are nits ;)

Nice.

+1

review: Approve
343. By Andy Doan

review comments

fix some suggestions by vila

Andy Doan (doanac) wrote :

vila - I know you already +1'd this, but do you want to take a look at revno 343 to see if it helps sufficiently?

PS Jenkins bot (ps-jenkins) wrote :

PASSED: Continuous integration, rev:343
http://s-jenkins.ubuntu-ci:8080/job/uci-engine-ci/366/
Executed test runs:

review: Approve (continuous-integration)
Vincent Ladeuil (vila) wrote :

> vila - I know you already +1'd this, but do you want to take a look at
> revno 343 to see if it helps sufficiently?

Definitely.

review: Approve

Preview Diff

=== added file 'ci-utils/ci_utils/amqp_worker.py'
--- ci-utils/ci_utils/amqp_worker.py 1970-01-01 00:00:00 +0000
+++ ci-utils/ci_utils/amqp_worker.py 2014-03-10 23:10:31 +0000
@@ -0,0 +1,124 @@
+# Ubuntu CI Engine
+# Copyright 2013, 2014 Canonical Ltd.
+
+# This program is free software: you can redistribute it and/or modify it
+# under the terms of the GNU Affero General Public License version 3, as
+# published by the Free Software Foundation.
+
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranties of
+# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
+# PURPOSE. See the GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import json
+import logging
+import os
+import StringIO
+import traceback
+import yaml
+
+from ci_utils import amqp_utils, data_store, dump_stack
+
+logging.basicConfig(level=logging.INFO)
+log = logging.getLogger(__name__)
+
+
+class AMQPWorker(object):
+    '''Base class that handles the more complex issues of a rabbit worker.
+
+    This allows a user to subclass this and provide a custom "handle_request"
+    method.
+    '''
+
+    def handle_request(self, logger, params):
+        '''To be implemented by the subclass to do their work
+
+        amqp_cb should be one of:
+         * amqp_utils.progress_failed
+         * amqp_utils.progress_completed
+        :return: tuple(amqp_cb, return_value_dictionary)
+        '''
+        raise NotImplementedError
+
+    def main(self, queue):
+        dump_stack.install_stack_dump_signal()
+        config = amqp_utils.get_config()
+        if not config:
+            exit(1)  # the get_config code prints an error
+        amqp_utils.process_queue(config, queue, self._on_message)
+
+    def _create_worker_logger(self, trigger):
+        '''Create a logger that captures output into StringIO
+
+        This provides an easy way to let workers get free logging that will
+        get added to the ticket.
+        '''
+        log = logging.getLogger(trigger)
+        log.buffer = StringIO.StringIO()
+        logstream = logging.StreamHandler(log.buffer)
+        formatter = logging.Formatter(
+            '[%(asctime)s] %(name)s:%(levelname)s:%(message)s')
+        logstream.setFormatter(formatter)
+        logstream.setLevel(logging.INFO)
+        log.addHandler(logstream)
+        return log
+
+    def _create_data_store(self, ticket):
+        fname = os.path.join(os.path.dirname(__file__), '../../unit_config')
+        with open(fname) as f:
+            config = yaml.safe_load(f)
+        return data_store.create_for_ticket(ticket, config)
+
+    def _store_worker_log(self, store, logger, retval):
+        '''An exception safe mechanism to upload log files
+
+        This needs to catch any exception so that progress complete/fail
+        call can be made on the queue
+        '''
+        if not store or not logger:
+            return
+        content = logger.buffer.getvalue()
+        if not content:
+            log.info('no logging content from action, skipping upload')
+        try:
+            name = '{}.log'.format(logger.name)
+            url = store.put_file(name, content, 'text/plain')
+            retval.setdefault('artifacts', []).append({
+                'name': name,
+                'reference': url,
+                'type': 'LOGS',
+            })
+        except:
+            log.exception('unable to store worker log')
+
+    def _on_message(self, msg):
+        log.info('on_message: %s', msg.body)
+        ret = {}
+        store = logger = trigger = None
+        try:
+            params = json.loads(msg.body)
+            trigger = params['progress_trigger']
+            amqp_utils.progress_update(trigger, params)
+            store = self._create_data_store(params['ticket_id'])
+
+            logger = self._create_worker_logger(trigger)
+            amqp_cb, ret = self.handle_request(logger, params)
+            self._store_worker_log(store, logger, ret)
+            amqp_cb(trigger, ret)
+        except (KeyboardInterrupt, Exception) as e:
+            err = 'Unexpected exception occurred'
+            if isinstance(e, KeyboardInterrupt):
+                err = 'Worker terminated'
+            ret['error_message'] = err
+            ret['traceback'] = traceback.format_exc()
+            self._store_worker_log(store, logger, ret)
+            if trigger:
+                amqp_utils.progress_failed(trigger, ret)
+            log.exception(err)
+            if isinstance(e, KeyboardInterrupt):
+                raise  # re-raise so amqp_utils.process_queue can exit
+        finally:
+            msg.channel.basic_ack(msg.delivery_tag)

=== added file 'ci-utils/ci_utils/tests/test_amqp_worker.py'
--- ci-utils/ci_utils/tests/test_amqp_worker.py 1970-01-01 00:00:00 +0000
+++ ci-utils/ci_utils/tests/test_amqp_worker.py 2014-03-10 23:10:31 +0000
@@ -0,0 +1,171 @@
+# Ubuntu CI Engine
+# Copyright 2013 Canonical Ltd.
+
+# This program is free software: you can redistribute it and/or modify it
+# under the terms of the GNU Affero General Public License version 3, as
+# published by the Free Software Foundation.
+
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranties of
+# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
+# PURPOSE. See the GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import json
+import unittest
+
+import mock
+
+from ci_utils import amqp_worker
+
+
+class _worker(amqp_worker.AMQPWorker):
+    '''simple class to help mock the datastore api used in the worker'''
+    def __init__(self, amqp_utils, retval, exception=None, error_msg=None):
+        super(_worker, self).__init__()
+        self.store = mock.Mock()
+        if amqp_utils:
+            self.amqp_utils = amqp_utils
+        self.retval = retval
+        self.exception = exception
+        self.error_msg = error_msg
+
+    def _create_data_store(self, ticket):
+        return self.store
+
+    def handle_request(self, log, params):
+        log.info('called')
+        self.params = params
+        if self.exception:
+            raise self.exception(self.error_msg)
+        if self.error_msg:
+            return self.amqp_utils.progress_failed, self.retval
+        return self.amqp_utils.progress_completed, self.retval
+
+
+class TestAMQPWorker(unittest.TestCase):
+    @mock.patch('ci_utils.amqp_worker.amqp_utils')
+    def testOnMessageSimple(self, amqp_utils):
+        '''Test simple on message logic'''
+        msg = mock.Mock()
+        msg.delivery_tag = 'foo'
+        msg.body = json.dumps({
+            'ticket_id': 1,
+            'progress_trigger': 'queue-name',
+            'param1': 'foo',
+            'param2': 42,
+        })
+        retval = {'msg': 'this is the worker return value'}
+        worker = _worker(amqp_utils, retval)
+        worker._on_message(msg)
+        self.assertEqual(worker.params['param1'], 'foo')
+        self.assertEqual(worker.params['param2'], 42)
+        self.assertEqual(1, amqp_utils.progress_update.call_count)
+        msg.channel.basic_ack.assert_called_once_with('foo')
+        self.assertEqual(0, amqp_utils.progress_failed.call_count)
+        self.assertEqual(1, worker.store.put_file.call_count)
+        amqp_utils.progress_completed.assert_called_once_with(
+            'queue-name', retval)
+        # assert_called_once_with works, but doesn't check the values
+        # added to retval
+        retval = amqp_utils.progress_completed.call_args[0][1]
+        self.assertEqual(1, len(retval['artifacts']))
+        self.assertEqual('queue-name.log', retval['artifacts'][0]['name'])
+
+    @mock.patch('ci_utils.amqp_worker.amqp_utils')
+    def testOnMessageFail(self, amqp_utils):
+        '''Test on message logic for a failure handled by the worker'''
+        msg = mock.Mock()
+        msg.delivery_tag = 'foo'
+        msg.body = json.dumps({
+            'ticket_id': 1,
+            'progress_trigger': 'queue-name',
+            'param1': 'foo',
+            'param2': 42,
+        })
+        retval = {'msg': 'this is the worker return value'}
+        worker = _worker(amqp_utils, retval, None, True)
+        worker._on_message(msg)
+        self.assertEqual(worker.params['param1'], 'foo')
+        self.assertEqual(worker.params['param2'], 42)
+        self.assertEqual(1, amqp_utils.progress_update.call_count)
+        msg.channel.basic_ack.assert_called_once_with('foo')
+        amqp_utils.progress_failed.assert_called_once_with(
+            'queue-name', retval)
+        self.assertEqual(0, amqp_utils.progress_completed.call_count)
+        # assert_called_once_with works, but doesn't check the values
+        # added to retval
+        retval = amqp_utils.progress_failed.call_args[0][1]
+        self.assertEqual(1, len(retval['artifacts']))
+        self.assertEqual('queue-name.log', retval['artifacts'][0]['name'])
+
+    @mock.patch('ci_utils.amqp_worker.amqp_utils')
+    def testOnMessageUnexpected(self, amqp_utils):
+        '''Test on message logic for an unexpected failure'''
+        msg = mock.Mock()
+        msg.delivery_tag = 'foo'
+        msg.body = json.dumps({
+            'ticket_id': 1,
+            'progress_trigger': 'queue-name',
+        })
+        err = 'this is the worker error'
+        worker = _worker(None, None, RuntimeError, err)
+        worker._on_message(msg)
+        self.assertEqual(1, amqp_utils.progress_update.call_count)
+        msg.channel.basic_ack.assert_called_once_with('foo')
+        self.assertEqual(1, amqp_utils.progress_failed.call_count)
+        self.assertEqual(
+            'queue-name', amqp_utils.progress_failed.call_args[0][0])
+        self.assertIn(
+            err, amqp_utils.progress_failed.call_args[0][1]['traceback'])
+        self.assertEqual(1, worker.store.put_file.call_count)
+
+    @mock.patch('ci_utils.amqp_worker.amqp_utils')
+    def testOnMessageKilled(self, amqp_utils):
+        '''Test on message logic for handling ctrl-c (upstart stopping)'''
+        msg = mock.Mock()
+        msg.delivery_tag = 'foo'
+        msg.body = json.dumps({
+            'ticket_id': 1,
+            'progress_trigger': 'queue-name',
+        })
+        worker = _worker(None, None, KeyboardInterrupt)
+        with self.assertRaises(KeyboardInterrupt):
+            worker._on_message(msg)
+        self.assertEqual(1, amqp_utils.progress_update.call_count)
+        msg.channel.basic_ack.assert_called_once_with('foo')
+        self.assertEqual(1, amqp_utils.progress_failed.call_count)
+        self.assertEqual(
+            'queue-name', amqp_utils.progress_failed.call_args[0][0])
+        self.assertIn('KeyboardInterrupt',
+                      amqp_utils.progress_failed.call_args[0][1]['traceback'])
+        self.assertEqual(1, worker.store.put_file.call_count)
+
+    @mock.patch('ci_utils.amqp_worker.amqp_utils')
+    def testNoTicket(self, amqp_utils):
+        '''Ensure we can gracefully deal with a bad message in the queue'''
+        msg = mock.Mock()
+        msg.delivery_tag = 'foo'
+        msg.body = json.dumps({
+            'progress_trigger': 'queue-name',
+        })
+        worker = _worker(amqp_utils, 1)
+        worker._on_message(msg)
+        msg.channel.basic_ack.assert_called_once_with('foo')
+        self.assertEqual(1, amqp_utils.progress_failed.call_count)
+        self.assertEqual(
+            'queue-name', amqp_utils.progress_failed.call_args[0][0])
+
+    @mock.patch('ci_utils.amqp_worker.amqp_utils')
+    def testNoQueue(self, amqp_utils):
+        '''Ensure we can gracefully deal with a bad message in the queue'''
+        msg = mock.Mock()
+        msg.delivery_tag = 'foo'
+        msg.body = json.dumps({
+        })
+        worker = _worker(amqp_utils, 1)
+        worker._on_message(msg)
+        # all we can check is that the message got acked
+        msg.channel.basic_ack.assert_called_once_with('foo')
