Merge lp:~adeuring/lazr.jobrunner/bug1015667-2 into lp:lazr.jobrunner

Proposed by Abel Deuring
Status: Merged
Approved by: Aaron Bentley
Approved revision: 44
Merged at revision: 43
Proposed branch: lp:~adeuring/lazr.jobrunner/bug1015667-2
Merge into: lp:lazr.jobrunner
Diff against target: 199 lines (+61/-37)
5 files modified
src/lazr/jobrunner/celerytask.py (+4/-3)
src/lazr/jobrunner/tests/config1.py (+1/-4)
src/lazr/jobrunner/tests/config_do_not_create_missing_queues.py (+2/-0)
src/lazr/jobrunner/tests/simple_config.py (+4/-0)
src/lazr/jobrunner/tests/test_celerytask.py (+50/-30)
To merge this branch: bzr merge lp:~adeuring/lazr.jobrunner/bug1015667-2
Reviewer Review Type Date Requested Status
Aaron Bentley (community) Approve
Review via email: mp+113962@code.launchpad.net

Description of the change

This branch makes the lazr.jobrunner script usable with the Celery
configuration used by Launchpad.

The problem: LP's Celery configuration sets CELERY_CREATE_MISSING_QUEUES
to False to avoid the creation of arbitrary queues. With this setting,
an attempt to create an instance of app.amqp.Router in the function
drain_queues() fails with the error "queue not found in CELERY_QUEUES".

This error can be avoid by calling app.amqp.Router(create_missing=True).

Attempts to write a test for this change revealed a test isolation problem
in the existing tests of clear_queues(): The est modules simply imported
the function clear_queues() from the module bin.clear_queues, and
clear_queues() does a "late import" of lazr.jobrunner.celerytask, which
in turn imports indirectly a Celery config module.

This meant that the Celery configuration of the first test was used
in all subsequent tests.

I changed the tests so that the tests now invoke the real script in a
subprocess.

This is done via the context manager running(), which tried to call
proc.terminate(). This failed with an exception for the clear-queues
script because the script is already terminated when proc.terminate()
is called. The call of proc.terminate() is now optional in running().

Finally, I refactored to Celery test configurations a bit: We should
always parameters like BROKER_VHOST or CELERY_RESULT_BACKEND, but
overall quite simple the configuration "config1" needs an environment
variable FILE_JOB_DIR, which is unnecessary for the tests of
clear-queues, so I added a quite basic configuration "simple_config",
which is imported by config1 and by the second new configuration
config_do_not_create_missing_queues.

test: make check

no lint

To post a comment you must log in.
44. By Abel Deuring

use Popen() directly in runClearQueues()

Revision history for this message
Aaron Bentley (abentley) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'src/lazr/jobrunner/celerytask.py'
2--- src/lazr/jobrunner/celerytask.py 2012-07-03 14:09:46 +0000
3+++ src/lazr/jobrunner/celerytask.py 2012-07-09 15:57:22 +0000
4@@ -95,7 +95,7 @@
5 if callbacks is None:
6 callbacks = [lambda x, y: None]
7 bindings = []
8- router = app.amqp.Router()
9+ router = app.amqp.Router(create_missing=True)
10 for queue_name in queue_names:
11 destination = router.expand_destination(queue_name)
12 exchange = Exchange(destination['exchange'])
13@@ -111,8 +111,9 @@
14 # This is basically copied from kombu.Queue.declare().
15 # We can't use this method directly because queue_declare()
16 # must be called with passive=True for result queues.
17- # Otherwise, attempts to connect to the queue fail with
18- # AMQPChannelException: (406, u"PRECONDITION_FAILED...", ...)
19+ # Otherwise, attempts to connect to the queue fails with
20+ # celery.exceptions.QueueNotFound: "Queue ... is not defined
21+ # in CELERY_QUEUES".
22 for queue in consumer.queues:
23 if queue.exchange:
24 queue.exchange.declare()
25
26=== modified file 'src/lazr/jobrunner/tests/config1.py'
27--- src/lazr/jobrunner/tests/config1.py 2012-03-21 20:38:50 +0000
28+++ src/lazr/jobrunner/tests/config1.py 2012-07-09 15:57:22 +0000
29@@ -1,7 +1,4 @@
30-BROKER_VHOST = "/"
31-CELERY_RESULT_BACKEND = "amqp"
32-CELERY_IMPORTS = ("lazr.jobrunner.tests.test_celerytask", )
33-CELERYD_CONCURRENCY = 1
34+from simple_config import *
35 import os
36 import oops
37 CELERY_ANNOTATIONS = {
38
39=== added file 'src/lazr/jobrunner/tests/config_do_not_create_missing_queues.py'
40--- src/lazr/jobrunner/tests/config_do_not_create_missing_queues.py 1970-01-01 00:00:00 +0000
41+++ src/lazr/jobrunner/tests/config_do_not_create_missing_queues.py 2012-07-09 15:57:22 +0000
42@@ -0,0 +1,2 @@
43+from simple_config import *
44+CELERY_CREATE_MISSING_QUEUES = False
45
46=== added file 'src/lazr/jobrunner/tests/simple_config.py'
47--- src/lazr/jobrunner/tests/simple_config.py 1970-01-01 00:00:00 +0000
48+++ src/lazr/jobrunner/tests/simple_config.py 2012-07-09 15:57:22 +0000
49@@ -0,0 +1,4 @@
50+BROKER_VHOST = "/"
51+CELERY_RESULT_BACKEND = "amqp"
52+CELERY_IMPORTS = ("lazr.jobrunner.tests.test_celerytask", )
53+CELERYD_CONCURRENCY = 1
54
55=== modified file 'src/lazr/jobrunner/tests/test_celerytask.py'
56--- src/lazr/jobrunner/tests/test_celerytask.py 2012-07-04 16:34:25 +0000
57+++ src/lazr/jobrunner/tests/test_celerytask.py 2012-07-09 15:57:22 +0000
58@@ -18,7 +18,6 @@
59
60
61 import contextlib
62-from cStringIO import StringIO
63 import errno
64 import json
65 import os
66@@ -29,7 +28,6 @@
67 )
68 import shutil
69 import subprocess
70-import sys
71 import tempfile
72 from time import sleep
73 from unittest import TestCase
74@@ -39,7 +37,6 @@
75
76 from celery.exceptions import SoftTimeLimitExceeded
77
78-from lazr.jobrunner.bin.clear_queues import clear_queues
79 from lazr.jobrunner.celerytask import (
80 drain_queues,
81 list_queued,
82@@ -481,30 +478,28 @@
83 self.assertEqual([], list_queued(RunFileJob.app, [self.queue]))
84
85
86-class TestInspectQueues(TestCase):
87+class TestClearQueues(TestCase):
88 """Tests for the script inspect-queues."""
89
90 def queueName(self, task_id):
91 return task_id.replace('-', '')
92
93- def runInspectQueues(self, celery_config, task_ids):
94+ def runClearQueues(self, celery_config, task_ids):
95 """Invoke clear_queues() and catch the data written to stdout
96 and stderr.
97 """
98+ # Simply calling the function clear_queues() from bin.clear_queues()
99+ # leads to a one-time import of the celery config module; the
100+ # config setting from the test that runs first would override
101+ # any different configuration setting in a later test.
102+ # Running the script in a subprocess avoids this problem.
103 queues = [self.queueName(task_id) for task_id in task_ids]
104- real_stdout = sys.stdout
105- real_stderr = sys.stderr
106- try:
107- sys.stdout = StringIO()
108- sys.stderr = StringIO()
109- args = ['program', '-c', celery_config] + queues
110- clear_queues(args)
111- fake_stdout = sys.stdout.getvalue()
112- fake_stderr = sys.stderr.getvalue()
113- finally:
114- sys.stdout = real_stdout
115- sys.stderr = real_stderr
116- return fake_stdout, fake_stderr
117+ args = ['bin/clear-queues', '-c', celery_config] + queues
118+ proc = subprocess.Popen(
119+ args, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
120+ cwd=get_root())
121+ stdout, stderr = proc.communicate()
122+ return stdout, stderr
123
124 def invokeJob(self, celery_config, task, delay=1, job_args={}):
125 """Run the given task.
126@@ -537,15 +532,19 @@
127 but the result is not consumed, the related message can be
128 retrieved with clear_queues().
129 """
130- celery_config = 'lazr.jobrunner.tests.config1'
131- task_id = self.invokeJob(celery_config, RunFileJob)
132- stdout, stderr = self.runInspectQueues(celery_config, [task_id])
133+ celery_config_jobrunner = 'lazr.jobrunner.tests.config1'
134+ task_id = self.invokeJob(celery_config_jobrunner, RunFileJob)
135+ # The script clear_queues does not have to use the same
136+ # Celery configuration as the job runner: clear_config just
137+ # needs to know how to connect to AMQP server.
138+ clear_queue_config = 'lazr.jobrunner.tests.simple_config'
139+ stdout, stderr = self.runClearQueues(clear_queue_config, [task_id])
140 self.assertEqual(self.successMessage(task_id), stdout)
141 self.assertEqual('', stderr)
142
143 # Reading a queue is destructive. An attempt to read again from
144 # a queue results in an error.
145- stdout, stderr = self.runInspectQueues(celery_config, [task_id])
146+ stdout, stderr = self.runClearQueues(clear_queue_config, [task_id])
147 self.assertEqual('', stdout)
148 self.assertEqual(self.noQueueMessage(task_id), stderr)
149
150@@ -553,11 +552,12 @@
151 """More than one queue can be inspected in one call of
152 clear_queue().
153 """
154- celery_config = 'lazr.jobrunner.tests.config1'
155- task_id_1 = self.invokeJob(celery_config, RunFileJob)
156- task_id_2 = self.invokeJob(celery_config, RunFileJob)
157- stdout, stderr = self.runInspectQueues(
158- celery_config, [task_id_1, task_id_2])
159+ celery_config_jobrunner = 'lazr.jobrunner.tests.config1'
160+ task_id_1 = self.invokeJob(celery_config_jobrunner, RunFileJob)
161+ task_id_2 = self.invokeJob(celery_config_jobrunner, RunFileJob)
162+ clear_queue_config = 'lazr.jobrunner.tests.simple_config'
163+ stdout, stderr = self.runClearQueues(
164+ clear_queue_config, [task_id_1, task_id_2])
165 expected_stdout = (
166 self.successMessage(task_id_1) + self.successMessage(task_id_2))
167 self.assertEqual(expected_stdout, stdout)
168@@ -567,8 +567,28 @@
169 """A Celery task which was started so that no result is returned
170 does not write to a task queue.
171 """
172- celery_config = 'lazr.jobrunner.tests.config1'
173- task_id = self.invokeJob(celery_config, RunFileJobNoResult)
174- stdout, stderr = self.runInspectQueues(celery_config, [task_id])
175+ celery_config_jobrunner = 'lazr.jobrunner.tests.config1'
176+ task_id = self.invokeJob(celery_config_jobrunner, RunFileJobNoResult)
177+ clear_queue_config = 'lazr.jobrunner.tests.simple_config'
178+ stdout, stderr = self.runClearQueues(clear_queue_config, [task_id])
179+ self.assertEqual('', stdout)
180+ self.assertEqual(self.noQueueMessage(task_id), stderr)
181+
182+ def test_clear_queues__config_create_missing_queues_false(self):
183+ """If CELERY_CREATE_MISSING_QUEUES is False, drain_queues()
184+ must override this setting by creating the router instance
185+ with the parameter create_missing=True. Otherwise,
186+ app.amqp.Router() will fail with
187+ 'celery.exceptions.QueueNotFound: "Queue ... is not defined in
188+ CELERY_QUEUES"'. Note that this does not mean that a non-existent
189+ queue is actually created, see
190+ assertEqual(self.noQueueMessage(task_id), stderr) below...
191+ """
192+ celery_config = (
193+ 'lazr.jobrunner.tests.config_do_not_create_missing_queues')
194+ # A test isolation problem: Specifying a configuration module does
195+ # not have the desired effect
196+ task_id = 'this-queue-does-not-exist'
197+ stdout, stderr = self.runClearQueues(celery_config, [task_id])
198 self.assertEqual('', stdout)
199 self.assertEqual(self.noQueueMessage(task_id), stderr)

Subscribers

People subscribed via source and target branches

to all changes: