Merge lp:~bac/charmworld/make-queue-a-job into lp:charmworld

Proposed by Brad Crittenden
Status: Merged
Approved by: Brad Crittenden
Approved revision: 470
Merged at revision: 469
Proposed branch: lp:~bac/charmworld/make-queue-a-job
Merge into: lp:charmworld
Diff against target: 448 lines (+177/-45)
6 files modified
charmworld/jobs/lp.py (+16/-6)
charmworld/jobs/tests/test_lp.py (+47/-0)
charmworld/jobs/tests/test_worker.py (+58/-26)
charmworld/jobs/worker.py (+26/-11)
charmworld/testing/__init__.py (+29/-0)
docs/hacking.rst (+1/-2)
To merge this branch: bzr merge lp:~bac/charmworld/make-queue-a-job
Reviewer               Review Type             Date Requested  Status
Juju Gui Bot           continuous-integration                  Approve
Charmworld Developers                                          Pending
Review via email: mp+200027@code.launchpad.net

Commit message

Run charm/bundle queue and ingest from one script

Originally, queueing was run from a cronjob every fifteen minutes. Items were
added without checking whether they were already queued.

Ingest was run continuously but slept fifteen minutes when it completed before
looking at the queue again.

This change makes the supervisord-run worker do the queue job, then ingest
charms and bundles. At the end, it sleeps for fifteen minutes minus the
amount spent doing the work.
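
The sleep computation described above can be sketched as follows. This is a minimal illustration, not the branch's code: `remaining_wait` and the `INTERVAL` value are names assumed here for the example. It uses the builtin `max`, since Python's `math` module does not provide a `max` function.

```python
INTERVAL = 15 * 60  # assumed for illustration: fifteen minutes, in seconds


def remaining_wait(interval, start, now):
    """Return how long to sleep so that one cycle lasts `interval` seconds.

    The result is never negative: if the work took longer than the
    interval, the next cycle should start immediately.
    """
    elapsed = now - start
    return max(0, interval - elapsed)
```

For example, if a cycle started at t=100 and the work finished at t=400, the worker would sleep for the remaining 600 seconds of a 900-second interval.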

The charmworld charm needs to be fixed so that it no longer creates the cronjob
for queueing. That hasn't been done yet, but the bin/enqueue script called by
the cronjob needs to be disabled before this change lands, or queueing will be
run by two different mechanisms.

https://codereview.appspot.com/45440043/

R=benji


Brad Crittenden (bac) wrote :

Reviewers: mp+200027_code.launchpad.net,

Message:
Please take a look.

Description:
Run charm/bundle queue and ingest from one script

Originally, queueing was run from a cronjob every fifteen minutes. Items were
added without checking whether they were already queued.

Ingest was run continuously but slept fifteen minutes when it completed before
looking at the queue again.

This change makes the supervisord-run worker do the queue job, then ingest
charms and bundles. At the end, it sleeps for fifteen minutes minus the
amount spent doing the work.

The charmworld charm needs to be fixed so that it no longer creates the cronjob
for queueing. That hasn't been done yet, but the bin/enqueue script called by
the cronjob needs to be disabled before this change lands, or queueing will be
run by two different mechanisms.

https://code.launchpad.net/~bac/charmworld/make-queue-a-job/+merge/200027

(do not edit description out of merge proposal)

Please review this at https://codereview.appspot.com/45440043/

Affected files (+147, -43 lines):
   A [revision details]
   M charmworld/jobs/lp.py
   M charmworld/jobs/tests/test_lp.py
   M charmworld/jobs/tests/test_worker.py
   M charmworld/jobs/worker.py
   M charmworld/testing/__init__.py

Brad Crittenden (bac) wrote :

Ignore my comment about the cronjob. It seems bin/enqueue is handy to
keep around so I'll just manually disable the cronjob on staging before
this branch lands and then follow quickly with an updated charm.

https://codereview.appspot.com/45440043/

lp:~bac/charmworld/make-queue-a-job updated
469. By Brad Crittenden

Updated hacking.rst to remove bin/enqueue.

Benji York (benji) wrote :

LGTM -- there is one critical bug that should be easy to fix (and
probably deserves a test so something like that doesn't happen again).

https://codereview.appspot.com/45440043/diff/1/charmworld/jobs/lp.py
File charmworld/jobs/lp.py (right):

https://codereview.appspot.com/45440043/diff/1/charmworld/jobs/lp.py#newcode206
charmworld/jobs/lp.py:206: def main(arglist):
The bin/enqueue script is an entry point wrapper around this function
and doesn't pass any arguments, so if you run bin/enqueue you'll get

     TypeError: main() takes exactly 1 argument (0 given)

I would use a default argument of None and add this to the body of main:

if arglist is None:
     arglist = sys.argv[1:]
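
A self-contained sketch of that suggestion, for illustration only. The `--limit` and `--prefix` options mirror the ones in this branch, but their types and defaults here are simplified assumptions, and returning the parsed arguments is a choice made for the example.

```python
import argparse
import sys


def main(arglist=None):
    # Fall back to the process arguments when none are passed explicitly,
    # so an entry-point wrapper can call main() with no arguments.
    if arglist is None:
        arglist = sys.argv[1:]
    parser = argparse.ArgumentParser()
    # Assumed, simplified option definitions.
    parser.add_argument("--limit", type=int, default=None)
    parser.add_argument("--prefix", type=str, default=None)
    return parser.parse_args(arglist)
```

Using `None` as the default, rather than `sys.argv[1:]` itself, matters because default values are evaluated once when the function is defined; a test that replaces `sys.argv` later would otherwise not be seen.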

https://codereview.appspot.com/45440043/diff/1/charmworld/jobs/tests/test_lp.py
File charmworld/jobs/tests/test_lp.py (right):

https://codereview.appspot.com/45440043/diff/1/charmworld/jobs/tests/test_lp.py#newcode374
charmworld/jobs/tests/test_lp.py:374: namespace = Namespace()
It took a couple of readings for me to understand this test. Perhaps
naming "namespace" "expected" instead would have helped.

https://codereview.appspot.com/45440043/diff/1/charmworld/jobs/tests/test_lp.py#newcode375
charmworld/jobs/tests/test_lp.py:375: setattr(namespace, 'prefix',
'~sinzui')
I'm curious as to why you used setattr instead of "namespace.prefix =
'~sinzui'".
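
For comparison, a small sketch of the equivalent ways to build the expected value. `argparse.Namespace` compares equal by attribute contents, so all three forms are interchangeable in an assertion; the variable names are chosen for the example.

```python
from argparse import Namespace

# Built by plain attribute assignment.
expected = Namespace()
expected.prefix = '~sinzui'
expected.limit = 10

# Built with setattr; mainly useful when the attribute name is dynamic.
via_setattr = Namespace()
setattr(via_setattr, 'prefix', '~sinzui')
setattr(via_setattr, 'limit', 10)

# Built with keyword arguments in a single expression.
via_kwargs = Namespace(prefix='~sinzui', limit=10)

same = expected == via_setattr == via_kwargs
```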

https://codereview.appspot.com/45440043/diff/1/charmworld/jobs/tests/test_lp.py#newcode380
charmworld/jobs/tests/test_lp.py:380:
This looks like an errant newline. Or maybe a space reserved for a
forgotten comment.

https://codereview.appspot.com/45440043/diff/1/charmworld/jobs/tests/test_worker.py
File charmworld/jobs/tests/test_worker.py (right):

https://codereview.appspot.com/45440043/diff/1/charmworld/jobs/tests/test_worker.py#newcode231
charmworld/jobs/tests/test_worker.py:231: def test_lp_run_limit(self):
I'm a big fan of the
first-line-of-a-test-is-a-comment-describing-what-the-test-is-about
pattern. Hint, hint. ;)

https://codereview.appspot.com/45440043/

lp:~bac/charmworld/make-queue-a-job updated
470. By Brad Crittenden

Fixes from review. Allow lp-main to be called with no args since that's what bin/enqueue does. Add tests. Make existing tests more readable and comment intent.
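
A rough sketch of how a bad-argument check like the one added in this revision could be written with only the standard library on Python 3. It is not the branch's test: `parse`, `run_with_bad_args`, and the `prog` name are assumptions for the example, and `contextlib.redirect_stderr` stands in for the helper added to charmworld/testing.

```python
import argparse
import contextlib
import io


def parse(arglist):
    # Assumed parser with the two options used in this branch.
    parser = argparse.ArgumentParser(prog="enqueue")
    parser.add_argument("--limit", type=int)
    parser.add_argument("--prefix", type=str)
    return parser.parse_args(arglist)


def run_with_bad_args():
    """Return (exit_code, stderr_text) for an unrecognized option."""
    stream = io.StringIO()
    with contextlib.redirect_stderr(stream):
        try:
            parse(["--nolimit", "10", "--prefix", "~sinzui"])
        except SystemExit as e:
            # argparse reports the error on stderr and exits with code 2.
            return e.code, stream.getvalue()
    return None, stream.getvalue()
```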

Juju Gui Bot (juju-gui-bot) :
review: Approve (continuous-integration)

Preview Diff

=== modified file 'charmworld/jobs/lp.py'
--- charmworld/jobs/lp.py 2013-10-23 21:53:17 +0000
+++ charmworld/jobs/lp.py 2014-01-02 19:14:28 +0000
@@ -8,6 +8,7 @@
 )
 import logging
 import pymongo
+import sys

 from charmworld.charmstore import (
     CharmStore,
@@ -169,10 +170,7 @@
         log.warn(str(error))


-# XXX bug=1130732: This function does not appear to be tested.
-def main():
-    configure_logging()
-    parser = argparse.ArgumentParser()
+def configure_parser(parser):
     parser.add_argument(
         "--limit",
         help="Maximum number of charms to queue.",
@@ -181,7 +179,9 @@
         "--prefix",
         help="Only branches matching this prefix are queued.",
         type=str, default=unspecified)
-    args = parser.parse_args()
+
+
+def run(args):
     log = logging.getLogger("charm.launchpad")
     settings = get_ini()
     if args.limit is None:
@@ -203,5 +203,15 @@
         log.warn(str(error))


+def main(arglist=None):
+    if arglist is None:
+        arglist = sys.argv[1:]
+    configure_logging()
+    parser = argparse.ArgumentParser()
+    configure_parser(parser)
+    args = parser.parse_args(arglist)
+    run(args)
+
+
 if __name__ == '__main__':
-    main()
+    main(sys.argv[1:])

=== modified file 'charmworld/jobs/tests/test_lp.py'
--- charmworld/jobs/tests/test_lp.py 2013-10-24 17:24:43 +0000
+++ charmworld/jobs/tests/test_lp.py 2014-01-02 19:14:28 +0000
@@ -1,6 +1,7 @@
 # Copyright 2012, 2013 Canonical Ltd. This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).

+from argparse import Namespace
 import json
 from requests.exceptions import ConnectionError

@@ -16,6 +17,8 @@
     JobTestBase,
     MONGO_DATABASE,
     MongoTestBase,
+    redirect_stderr,
+    substitute_argv,
 )

 from charmworld.jobs.lp import (
@@ -24,6 +27,7 @@
     CHARM_QUEUE,
     db_charms,
     dequeue,
+    main,
     queue_from_branches,
     unspecified,
 )
@@ -354,3 +358,46 @@
                 '~charmers/charms/precise'):
             self.queue_from_branches()
         self.assertIs(None, self.charm_queue.next())
+
+    def run_main(self, extra_args=None):
+        with patch('charmworld.jobs.lp.configure_logging') as mock_cl:
+            with patch('charmworld.jobs.lp.run') as mock_run:
+                fake_argv = None
+                if extra_args is not None:
+                    # Listify and add.
+                    fake_argv = extra_args.split()
+                if fake_argv is not None:
+                    main(fake_argv)
+                else:
+                    # We must be able to call main with no arguments.
+                    main()
+        self.assertTrue(mock_cl.called)
+        return mock_run
+
+    def test_main_args(self):
+        # Show arguments are passed and used by main.
+        mock = self.run_main('--limit 10 --prefix ~sinzui')
+        expected = Namespace()
+        expected.prefix = '~sinzui'
+        expected.limit = 10
+        mock.assert_called_with(expected)
+
+    def test_main_bad_args(self):
+        # Show bad arguments passed to main are caught.
+        with redirect_stderr() as stderr:
+            with self.assertRaises(SystemExit) as e:
+                self.run_main('--nolimit 10 --prefix ~sinzui')
+        self.assertEqual(2, e.exception.code)
+        self.assertIn(
+            'error: unrecognized arguments: --nolimit 10', stderr.read())
+
+    def test_main_no_args(self):
+        # Show main can be called with no arguments and in that case uses
+        # sys.argv.
+        test_args = ['ignore', '--prefix', '~benji', '--limit', '101']
+        with substitute_argv(test_args):
+            mock = self.run_main()
+        expected = Namespace()
+        expected.prefix = '~benji'
+        expected.limit = 101
+        mock.assert_called_with(expected)

=== modified file 'charmworld/jobs/tests/test_worker.py'
--- charmworld/jobs/tests/test_worker.py 2013-07-30 19:14:07 +0000
+++ charmworld/jobs/tests/test_worker.py 2014-01-02 19:14:28 +0000
@@ -59,6 +59,10 @@
         self.charms.append(charm_data)


+class FakeArgs(object):
+    run_forever = False
+
+
 @contextmanager
 def main_environment(charm, index_client, queue):
     """An environment in which worker.main() can be run."""
@@ -67,16 +71,18 @@
         yield


-def FauxWorkerFactory(test_case):
+def FauxWorkerFactory(test_case, expected_arg_hash):
     class FauxWorker:
         run_called = False

-        def __init__(self, *args):
+        def __init__(self, *ignored):
             pass

         @classmethod
-        def run(cls, run_forever):
-            test_case.assertFalse(run_forever)
+        def run(cls, args_):
+            for argname, expected in expected_arg_hash.items():
+                actual = args_.__getattribute__(argname)
+                test_case.assertEqual(expected, actual)
             cls.run_called = True

     return FauxWorker
@@ -90,6 +96,7 @@
         self.basket_queue = get_queue(self.db, BASKET_QUEUE)
         self.charm_queue.put({'charm': 'foo'})
         self.charm_queue.put({'charm': 'bar'})
+        self.args = FakeArgs()

     def tearDown(self):
         self.charm_queue.clear()
@@ -103,10 +110,11 @@
             GatheringRun(),
         )
         queue_jobs = {
-            self.charm_queue: ingest_jobs
+            self.charm_queue: ingest_jobs,
         }
-        worker = QueueWorker(self.db, queue_jobs)
-        worker.run()
+        worker = QueueWorker(self.db, None, queue_jobs)
+
+        worker.run(self.args)

         self.assertEqual(0, self.charm_queue.size())
         expected = [
@@ -119,10 +127,10 @@
     def test_failure_while_running(self):
         ingest_jobs = (FailureRun(), GatheringRun())
         queue_jobs = {
-            self.charm_queue: ingest_jobs
+            self.charm_queue: ingest_jobs,
         }
-        worker = QueueWorker(self.db, queue_jobs)
-        worker.run()
+        worker = QueueWorker(self.db, None, queue_jobs)
+        worker.run(self.args)

         self.assertEqual(0, self.charm_queue.size())
         expected = [
@@ -150,26 +158,30 @@
             SuccessfulRun(),
             GatheringRun(),
         )
-        worker = QueueWorker(self.db, {LockedJobQueue: ingest_jobs})
-        worker.run()
+        worker = QueueWorker(self.db, None, {LockedJobQueue: ingest_jobs})
+        worker.run(self.args)

     def test_ingest_job_contract(self):
         # All ingest jobs accept the right inputs.
         ingest_jobs = [create_autospec(UpdateCharmJob)()]
-        worker = QueueWorker(self.db, {self.charm_queue: ingest_jobs})
-        worker.run()
+        worker = QueueWorker(self.db, None, {self.charm_queue: ingest_jobs})
+        worker.run(self.args)
         self.assertEqual(0, self.charm_queue.size())

     def test_interval_is_int(self):
-        worker = QueueWorker(self.db, {self.charm_queue: []})
+        worker = QueueWorker(self.db, None, {self.charm_queue: []})
         self.assertIs(int, type(worker.interval),
                       'QueueWorker.interval is a %s, not an int.' %
                       type(worker.interval))

-    def run_main_with_exit(self):
+    def run_main_with_exit(self, extra_args=None):
         with patch('charmworld.jobs.worker.configure_logging'):
             with self.assertRaises(SystemExit) as e:
-                main(['ingest-queued'], db=self.db)
+                fake_argv = ['ingest-queued']
+                if extra_args is not None:
+                    # Listify and add.
+                    fake_argv.extend(extra_args.split())
+                main(fake_argv, db=self.db)
         return e.exception.code

     def test_elasticsearch_failure(self):
@@ -184,7 +196,8 @@
                 values={'es_urls': 'http://localhost:70'}):
             with main_environment(charm, index_client, self.charm_queue):
                 with logger_buffer('charm.worker') as handler:
-                    exit_code = self.run_main_with_exit()
+                    with patch('charmworld.jobs.worker.lp.run'):
+                        exit_code = self.run_main_with_exit()
         self.assertEqual(40, handler.buffer[1].levelno)
         self.assertIn('Could not connect', handler.buffer[1].msg)
         self.assertEqual(1, exit_code)
@@ -195,19 +208,38 @@
         queue = get_queue(self.db, BASKET_QUEUE)
         queue_entry = {'bundle': 'foo'}
         queue.put(queue_entry)
-        with patch('charmworld.jobs.worker.run_job') as run_job:
-            self.run_main_with_exit()
-        queue_entries_run = [x[0][1] for x in run_job.call_args_list]
-        self.assertIn(queue_entry, queue_entries_run)
+        with patch('charmworld.jobs.worker.lp.run'):
+            with patch('charmworld.jobs.worker.run_job') as run_job:
+                self.run_main_with_exit()
+            queue_entries_run = [x[0][1] for x in run_job.call_args_list]
+            self.assertIn(queue_entry, queue_entries_run)

     def test_worker_does_not_run_forever_by_default(self):
-        FauxWorker = FauxWorkerFactory(self)
+        expected = {'run_forever': False}
+        FauxWorker = FauxWorkerFactory(self, expected)
         with patch('charmworld.jobs.worker.QueueWorker', new=FauxWorker):
             self.run_main_with_exit()
         self.assertTrue(FauxWorker.run_called)

     def test_worker_runs_forever_if_command_line_says_to(self):
-        FauxWorker = FauxWorkerFactory(self)
-        with patch('charmworld.jobs.worker.QueueWorker', new=FauxWorker):
-            self.run_main_with_exit()
+        expected = {'run_forever': True}
+        FauxWorker = FauxWorkerFactory(self, expected)
+        with patch('charmworld.jobs.worker.QueueWorker', new=FauxWorker):
+            self.run_main_with_exit('--run-forever')
+        self.assertTrue(FauxWorker.run_called)
+
+    def test_lp_run_limit(self):
+        # Show that the '--limit' argument is passed and used.
+        expected = {'limit': 5}
+        FauxWorker = FauxWorkerFactory(self, expected)
+        with patch('charmworld.jobs.worker.QueueWorker', new=FauxWorker):
+            self.run_main_with_exit('--limit 5')
+        self.assertTrue(FauxWorker.run_called)
+
+    def test_lp_run_prefix(self):
+        # Show that the '--prefix' argument is passed and used.
+        expected = {'prefix': 'abentley'}
+        FauxWorker = FauxWorkerFactory(self, expected)
+        with patch('charmworld.jobs.worker.QueueWorker', new=FauxWorker):
+            self.run_main_with_exit('--prefix abentley')
         self.assertTrue(FauxWorker.run_called)

=== modified file 'charmworld/jobs/worker.py'
--- charmworld/jobs/worker.py 2013-07-22 15:49:43 +0000
+++ charmworld/jobs/worker.py 2014-01-02 19:14:28 +0000
@@ -3,6 +3,7 @@

 import argparse
 import logging
+import math
 import sys
 import time

@@ -15,6 +16,7 @@
     UpdateBundleJob,
     UpdateCharmJob,
 )
+from charmworld.jobs import lp
 from charmworld.jobs.utils import get_queue
 from charmworld.models import getconnection
 from charmworld.models import getdb
@@ -32,20 +34,29 @@
 class QueueWorker(object):
     """Processes raw charm data"""

-    def __init__(self, db, queue_jobs, interval=INTERVAL):
+    def __init__(self, db, queue_jobs, ingest_jobs, interval=INTERVAL):
         self.db = db
-        self.queue_jobs = queue_jobs
+        self.queue_jobs = queue_jobs or []
+        self.ingest_jobs = ingest_jobs
         self.log = logging.getLogger('charm.worker')
         self.interval = interval

-    def run(self, run_forever=False):
+    @staticmethod
+    def wait_seconds(interval, start, now):
+        remaining = interval - (now - start)
+        return math.max(0, remaining)
+
+    def run(self, args, now=None):
         self.log.info('Starting processing')
+        self.start_time = now if now is not None else time.time()
         return_code = 0
-        for jobs in self.queue_jobs.values():
+        for jobs in self.ingest_jobs.values():
             for job in jobs:
                 job.setup(db=self.db)
         while True:
-            for queue, jobs in self.queue_jobs.items():
+            for job in self.queue_jobs:
+                job(args)
+            for queue, jobs in self.ingest_jobs.items():
                 while True:
                     item = queue.next()
                     if item is None:
@@ -64,15 +75,17 @@
                         finally:
                             item.complete()

-            if not run_forever:
+            if not args.run_forever:
                 self.log.info('Empty queue and not running forever; quitting.')
                 break

-            wait_time_in_minutes = self.interval / 60
+            wait_seconds = self.wait_seconds(
+                self.interval, self.start_time, time.time())
+            wait_time_in_minutes = wait_seconds / 60
             self.log.info(
                 'Empty queue. Waiting %s minutes to recheck.' %
                 wait_time_in_minutes)
-            time.sleep(self.interval)
+            time.sleep(wait_seconds)
         return return_code


@@ -88,11 +101,13 @@
         help='Keep running after the queues are processed and process more '
         'later.',
         action='store_true', dest='run_forever')
+    lp.configure_parser(parser)
     # Explicitly pass sys.argv to make monkeypatching more reliable.
     args = parser.parse_args(argv[1:])
-    queue_jobs = {
+    queue_jobs = (lp.run, )
+    ingest_jobs = {
         get_queue(db, CHARM_QUEUE): [UpdateCharmJob()],
         get_queue(db, BASKET_QUEUE): [UpdateBundleJob()],
     }
-    worker = QueueWorker(db, queue_jobs)
-    sys.exit(worker.run(run_forever=args.run_forever))
+    worker = QueueWorker(db, queue_jobs, ingest_jobs)
+    sys.exit(worker.run(args))

=== modified file 'charmworld/testing/__init__.py'
--- charmworld/testing/__init__.py 2013-10-14 19:36:57 +0000
+++ charmworld/testing/__init__.py 2014-01-02 19:14:28 +0000
@@ -6,6 +6,7 @@
 import os
 import shutil
 from StringIO import StringIO
+import sys
 from tempfile import mkdtemp
 import unittest
 from urllib2 import URLError
@@ -281,3 +282,31 @@
         filename,
     )
     return open(data_path).read()
+
+
+@contextmanager
+def redirect_stderr():
+    """Context manager for redirecting sys.stderr.
+
+    The new stream where sys.stderr is redirected is return.
+    Upon exiting the context manager the stream is set to the first byte
+    written so it can easily be read.
+    """
+    orig_stderr = sys.stderr
+    stream = StringIO()
+    sys.stderr = stream
+    try:
+        yield stream
+    finally:
+        stream.seek(0)
+        sys.stderr = orig_stderr
+
+
+@contextmanager
+def substitute_argv(args):
+    orig_argv = sys.argv
+    sys.argv = args
+    try:
+        yield
+    finally:
+        sys.argv = orig_argv

=== modified file 'docs/hacking.rst'
--- docs/hacking.rst 2013-10-30 18:35:32 +0000
+++ docs/hacking.rst 2014-01-02 19:14:28 +0000
@@ -95,8 +95,7 @@
     $ make sysdeps
     $ make install
     $ make run
-    $ bin/enqueue --prefix=~charmers/charms/precise --limit=100
-    $ bin/ingest-queued
+    $ bin/ingest-queued --prefix=~charmers/charms/precise --limit=100


 Contributing work
