Merge lp:~doanac/ubuntu-ci-services-itself/runworker-cancellable into lp:ubuntu-ci-services-itself

Proposed by Andy Doan
Status: Merged
Approved by: Andy Doan
Approved revision: 343
Merged at revision: 357
Proposed branch: lp:~doanac/ubuntu-ci-services-itself/runworker-cancellable
Merge into: lp:ubuntu-ci-services-itself
Prerequisite: lp:~doanac/ubuntu-ci-services-itself/runworker-basic
Diff against target: 177 lines (+109/-1)
2 files modified
ci-utils/ci_utils/amqp_worker.py (+62/-1)
ci-utils/ci_utils/tests/test_amqp_worker.py (+47/-0)
To merge this branch: bzr merge lp:~doanac/ubuntu-ci-services-itself/runworker-cancellable
Reviewer                     Review Type             Date Requested  Status
Vincent Ladeuil (community)                                          Approve
PS Jenkins bot (community)   continuous-integration                  Approve
Review via email: mp+210077@code.launchpad.net

Commit message

run-worker: add cancelling logic

This provides "advisory" cancelling logic to the run-worker base. If
subclasses choose, they may periodically check the state of

   params.get('cancelled')

to see if they should clean up and exit.
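
As an illustration of the advisory check described above, here is a minimal sketch of a worker body that polls the flag between units of work. The function name `do_work` and the `steps` and `step_delay` parameters are hypothetical and not part of this branch; only the `params.get('cancelled')` check comes from the commit message.

```python
import logging
import time


def do_work(logger, params, steps=5, step_delay=0.01):
    """Hypothetical worker body that honors the advisory 'cancelled' flag.

    `steps` and `step_delay` are illustrative stand-ins for real work.
    """
    done = 0
    for _ in range(steps):
        # The base class may set params['cancelled'] = True from its
        # timer thread; the worker decides whether and when to look.
        if params.get('cancelled'):
            logger.info('cancelled after %d steps', done)
            return {'cancelled': True, 'steps_done': done}
        time.sleep(step_delay)  # stand-in for one unit of real work
        done += 1
    return {'cancelled': False, 'steps_done': done}
```

A worker that never checks the flag simply runs to completion, which is what "advisory" means here.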

Description of the change

Part 2 of my 3 part series.

Adds logic for determining whether a worker's corresponding Jenkins job has been cancelled. The mechanism is purely advisory for now (i.e., the worker can choose whether to honor it).

We could enhance this to let workers request a more forceful cancel option (e.g., sending SIGTERM to ourselves). However, I think this is a better first step and keeps things easier to review.

I'm not a huge fan of tests that use time.sleep, but I wanted to check that this really works without overly mocking it.

PS Jenkins bot (ps-jenkins) wrote :

PASSED: Continuous integration, rev:343
http://s-jenkins.ubuntu-ci:8080/job/uci-engine-ci/350/
Executed test runs:

Click here to trigger a rebuild:
http://s-jenkins.ubuntu-ci:8080/job/uci-engine-ci/350/rebuild

review: Approve (continuous-integration)
Vincent Ladeuil (vila) wrote :

57 + # how often to check if the current task should be cancelled
58 + cancel_interval = 120

We'll probably want a config option there.
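
One possible shape for such a config option, as a sketch only: read the interval from the environment and fall back to the hard-coded default of 120 seconds. The variable name `CI_CANCEL_INTERVAL` and the helper `get_cancel_interval` are assumptions made for illustration; the branch itself defines neither.

```python
import os


def get_cancel_interval(environ=None, default=120):
    """Return the cancel polling interval in seconds.

    CI_CANCEL_INTERVAL is a hypothetical variable name, not one the
    project is known to use.
    """
    environ = os.environ if environ is None else environ
    raw = environ.get('CI_CANCEL_INTERVAL')
    if raw is None:
        return default
    try:
        value = float(raw)
    except ValueError:
        return default  # ignore unparsable values
    # Reject zero, negative and NaN intervals.
    return value if value > 0 else default
```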

84 + t.start()
85 + return t

You don't check that the thread has really started (you would need an
additional event for that), but we don't really care here.

158 + self.assertTrue(retval['cancelled'])

But you have a race here. That test should sleep at least as long as the
cancel_interval, or it could fail because the thread doesn't get enough time to
reach the code that sets cancelled to True.

Overall, I'm not a big fan of that approach; I would rather see
SIGALRM used to raise an exception that could interrupt
handle_message.
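
A rough sketch of what the signal-based alternative could look like, written for Python 3 on Unix and assuming the handler runs in the main thread. The names `Cancelled`, `run_cancellable` and `should_cancel` are hypothetical; `should_cancel` stands in for whatever check (such as polling the cancel URL) decides the job is gone.

```python
import signal
import time


class Cancelled(Exception):
    """Raised in the main thread when the periodic check says stop."""


def run_cancellable(func, should_cancel, interval):
    """Run func(), checking should_cancel() every `interval` seconds.

    Unix only; must be called from the main thread, because that is
    where Python delivers signal handlers.
    """
    def on_alarm(signum, frame):
        # Raising here interrupts whatever func() is doing.
        if should_cancel():
            raise Cancelled()

    previous = signal.signal(signal.SIGALRM, on_alarm)
    # Fire after `interval` seconds, then repeat at the same period.
    signal.setitimer(signal.ITIMER_REAL, interval, interval)
    try:
        return func()
    finally:
        signal.setitimer(signal.ITIMER_REAL, 0)  # disarm the timer
        signal.signal(signal.SIGALRM, previous)  # restore old handler
```

Unlike the advisory flag, this needs no cooperation from the handler, but the exception can arrive at any point, so the handler's cleanup code has to tolerate that.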

Here, we're requiring collaboration from the request handler and delegating
the control to the url.

I approve nevertheless, in case I'm misunderstanding the use cases and advisory is enough; happy to discuss further otherwise.

review: Approve
Andy Doan (doanac) wrote :

On 03/10/2014 01:14 PM, Vincent Ladeuil wrote:
> 158 + self.assertTrue(retval['cancelled'])
>
> But you have a race here. That test should sleep at least as long as the
> cancel_interval, or it could fail because the thread doesn't get enough time to
> reach the code that sets cancelled to True.

Taking a more complete context:

153 + worker._on_message(msg)
154 + msg.channel.basic_ack.assert_called_once_with('foo')
155 + amqp_utils.progress_completed.assert_called_once_with(
156 + 'queue-name', retval)
157 + retval = amqp_utils.progress_completed.call_args[0][1]
158 + self.assertTrue(retval['cancelled'])

_on_message is actually a blocking call that deals with the
threading itself. This test just checks that we send a "cancelled" value
back to the lander, so that anyone viewing the error logs can see it.
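
To make the flow concrete, here is a simplified Python 3 sketch of a blocking call that polls in the background while the handler runs. `RepeatingTimer` and `on_message` are illustrative stand-ins, not the `_timer` and `_on_message` code from the diff, and `is_cancelled` replaces the URL check. The flag is only set if the handler runs longer than the interval, which is why the test's worker sleeps for 0.1 seconds against a `cancel_interval` of 0.01.

```python
import threading
import time


class RepeatingTimer(threading.Thread):
    """Call cb every `interval` seconds until cancel() is called."""

    def __init__(self, interval, cb):
        super().__init__()
        self.daemon = True
        self.interval = interval
        self.cb = cb
        self.finished = threading.Event()

    def cancel(self):
        self.finished.set()

    def run(self):
        # Event.wait returns False on timeout and True once the
        # event has been set by cancel().
        while not self.finished.wait(self.interval):
            self.cb()


def on_message(params, handler, is_cancelled, interval):
    """Blocking call: poll in the background while handler runs."""
    def check():
        if is_cancelled():
            params['cancelled'] = True

    timer = RepeatingTimer(interval, check)
    timer.start()
    try:
        return handler(params)
    finally:
        timer.cancel()
        timer.join()
```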

Preview Diff

1=== modified file 'ci-utils/ci_utils/amqp_worker.py'
2--- ci-utils/ci_utils/amqp_worker.py 2014-03-09 06:27:58 +0000
3+++ ci-utils/ci_utils/amqp_worker.py 2014-03-09 06:27:58 +0000
4@@ -17,7 +17,9 @@
5 import logging
6 import os
7 import StringIO
8+import threading
9 import traceback
10+import urllib2
11 import yaml
12
13 from ci_utils import amqp_utils, data_store, dump_stack
14@@ -26,6 +28,38 @@
15 log = logging.getLogger(__name__)
16
17
18+def _get(url):
19+ try:
20+ resp = urllib2.urlopen(url)
21+ resp = resp.read()
22+ return json.loads(resp)
23+ except:
24+ logging.exception('error checking url')
25+ return {}
26+
27+
28+class _timer(threading.Thread):
29+ '''similar to threading.Timer, but repeats until cancel is called'''
30+ def __init__(self, interval, cb):
31+ super(_timer, self).__init__()
32+ self.interval = interval
33+ self.cb = cb
34+ self.finished = threading.Event()
35+ self.exit = False
36+
37+ def cancel(self):
38+ """Stop the timer if it hasn't finished yet"""
39+ self.finished.set()
40+ self.exit = True
41+
42+ def run(self):
43+ while not self.exit:
44+ self.finished.wait(self.interval)
45+ if not self.finished.is_set():
46+ self.cb()
47+ self.finished.set()
48+
49+
50 class AMQPWorker(object):
51 '''Base class that handles the more complex issues of a rabbit worker.
52
53@@ -33,6 +67,9 @@
54 method.
55 '''
56
57+ # how often to check if the current task should be cancelled
58+ cancel_interval = 120
59+
60 def handle_request(self, logger, params):
61 raise NotImplementedError
62
63@@ -84,16 +121,38 @@
64 except:
65 log.exception('unable to store worker log')
66
67+ def _handle_cancel(self, params):
68+ '''provide an advisory mechansim to inform the worker to stop
69+
70+ This checks a URL to determine if the job has been cancelled. If so,
71+ it updates the "params" object with a params['cancelled'] = True. Thus
72+ this is an advisory mechansim that the worker can choose how/if to
73+ deal with.
74+ '''
75+ url = params.get('cancel_url')
76+ if not url:
77+ return None
78+
79+ def check_url():
80+ data = _get(url)
81+ if not data.get('building', True):
82+ params['cancelled'] = True
83+ t = _timer(self.cancel_interval, check_url)
84+ t.start()
85+ return t
86+
87 def _on_message(self, msg):
88 log.info('on_message: %s', msg.body)
89 ret = {}
90- store = logger = trigger = None
91+ store = logger = trigger = cancel_thread = None
92 try:
93 params = json.loads(msg.body)
94 trigger = params['progress_trigger']
95 amqp_utils.progress_update(trigger, params)
96 store = self._create_data_store(params['ticket_id'])
97
98+ cancel_thread = self._handle_cancel(params)
99+
100 logger = self._create_worker_logger(trigger)
101 amqp_cb, ret = self.handle_request(logger, params)
102 self._store_worker_log(store, logger, ret)
103@@ -112,3 +171,5 @@
104 raise # re-raise so amqp_utils.process_queue can exit
105 finally:
106 msg.channel.basic_ack(msg.delivery_tag)
107+ if cancel_thread:
108+ cancel_thread.cancel()
109
110=== modified file 'ci-utils/ci_utils/tests/test_amqp_worker.py'
111--- ci-utils/ci_utils/tests/test_amqp_worker.py 2014-03-09 06:27:58 +0000
112+++ ci-utils/ci_utils/tests/test_amqp_worker.py 2014-03-09 06:27:58 +0000
113@@ -14,6 +14,7 @@
114 # along with this program. If not, see <http://www.gnu.org/licenses/>.
115
116 import json
117+import time
118 import unittest
119
120 import mock
121@@ -42,6 +43,9 @@
122 raise self.exception(self.error_msg)
123 if self.error_msg:
124 return self.amqp_utils.progress_failed, self.retval
125+ if params.get('cancel_url'):
126+ time.sleep(0.1)
127+ self.retval['cancelled'] = params['cancelled']
128 return self.amqp_utils.progress_completed, self.retval
129
130
131@@ -169,3 +173,46 @@
132 worker._on_message(msg)
133 # all we can check is that the message got acked
134 msg.channel.basic_ack.assert_called_once_with('foo')
135+
136+ @mock.patch('ci_utils.amqp_worker.amqp_utils')
137+ @mock.patch('ci_utils.amqp_worker._get')
138+ def testCancel(self, get, amqp_utils):
139+ '''Ensure workers get the "cancel" message'''
140+ msg = mock.Mock()
141+ msg.delivery_tag = 'foo'
142+ msg.body = json.dumps({
143+ 'ticket_id': 1,
144+ 'progress_trigger': 'queue-name',
145+ 'param1': 'foo',
146+ 'param2': 42,
147+ 'cancel_url': 'foo'
148+ })
149+ get.return_value = {'building': False}
150+ retval = {'msg': 'this is the worker return value'}
151+ worker = _worker(amqp_utils, retval)
152+ worker.cancel_interval = 0.01
153+ worker._on_message(msg)
154+ msg.channel.basic_ack.assert_called_once_with('foo')
155+ amqp_utils.progress_completed.assert_called_once_with(
156+ 'queue-name', retval)
157+ retval = amqp_utils.progress_completed.call_args[0][1]
158+ self.assertTrue(retval['cancelled'])
159+
160+
161+class TestTimer(unittest.TestCase):
162+ def testCanCancel(self):
163+ def cb():
164+ raise RuntimeError('cancel failed')
165+ t = amqp_worker._timer(1, cb)
166+ t.start()
167+ t.cancel()
168+
169+ def testCBRuns(self):
170+ def cb():
171+ self.run = True
172+ self.run = False
173+ t = amqp_worker._timer(0.01, cb)
174+ t.start()
175+ time.sleep(.02)
176+ t.cancel()
177+ self.assertTrue(self.run)
