Merge lp:~benji/charmworld/even-more-bundles into lp:~juju-jitsu/charmworld/trunk

Proposed by Benji York
Status: Merged
Approved by: Benji York
Approved revision: 332
Merged at revision: 320
Proposed branch: lp:~benji/charmworld/even-more-bundles
Merge into: lp:~juju-jitsu/charmworld/trunk
Diff against target: 734 lines (+233/-163)
13 files modified
charmworld/jobs/ingest.py (+11/-4)
charmworld/jobs/lp.py (+4/-3)
charmworld/jobs/supervisord.conf (+6/-8)
charmworld/jobs/tests/test_ingest.py (+44/-6)
charmworld/jobs/tests/test_lp.py (+5/-5)
charmworld/jobs/tests/test_worker.py (+92/-38)
charmworld/jobs/utils.py (+2/-9)
charmworld/jobs/worker.py (+61/-70)
charmworld/models.py (+2/-1)
charmworld/teams.py (+4/-2)
charmworld/tests/test_models.py (+1/-0)
docs/index.rst (+1/-1)
scripts/worker (+0/-16)
To merge this branch: bzr merge lp:~benji/charmworld/even-more-bundles
Reviewer Review Type Date Requested Status
Aaron Bentley (community) Approve
Review via email: mp+176227@code.launchpad.net

Commit message

Make the queue worker process the basket queue.

Description of the change

Generalize the queue worker to process the basket queue as well as the
charm queue.

To post a comment you must log in.
Revision history for this message
Aaron Bentley (abentley) wrote :

This all looks good. The only thing I see is a style issue. In several places, you have multiple imports on a single line, e.g. "from charmworld.jobs.config import CHARM_QUEUE, BASKET_QUEUE, settings". Charmworld follows Launchpad style and puts these on multiple lines to reduce merge conflicts, e.g.:

from charmworld.jobs.config import (
    BASKET_QUEUE,
    CHARM_QUEUE,
    settings,
)

review: Approve
331. By Benji York

tweak import style

332. By Benji York

more import style tweaking

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'charmworld/jobs/ingest.py'
2--- charmworld/jobs/ingest.py 2013-07-18 21:49:30 +0000
3+++ charmworld/jobs/ingest.py 2013-07-22 15:53:24 +0000
4@@ -41,6 +41,7 @@
5 getdb,
6 getfs,
7 options_to_storage,
8+ store_bundles,
9 )
10 from charmworld.search import (
11 ElasticSearchClient,
12@@ -277,6 +278,7 @@
13 fs = getfs(self.db, collection='hashed-files')
14 self.decorate_basket(payload, fs)
15 self.db.baskets.save(payload)
16+ self.store_bundles(fs, payload, self.db.bundles)
17
18 @staticmethod
19 def construct_id(basket_data, revno):
20@@ -290,8 +292,13 @@
21 tree = branch.repository.revision_tree(basket_data['commit'])
22 basket_data['_id'] = self.construct_id(basket_data, revno)
23 basket_data['file_hashes'] = slurp_files(fs, tree)
24- #Basket(basket_data, fs).get_file_contents('')
25- #basket = get_basket(fs, basket_data['file_hashes'])
26+
27+ @staticmethod
28+ def store_bundles(fs, basket_data, collection):
29+ hashes = basket_data['file_hashes']
30+ deployer_config_bytes = fs.get(hashes['bundles.yaml'])
31+ deployer_config = yaml.safe_load(deployer_config_bytes)
32+ store_bundles(collection, deployer_config, basket_data['_id'])
33
34
35 def slurp_files(fs, tree):
36@@ -529,7 +536,7 @@
37 cfile = CharmFileSet.get_by_id(
38 fs, files[metadata_file]['fileid'])
39 try:
40- metadata = quote_yaml(yaml.load(cfile.read()))
41+ metadata = quote_yaml(yaml.safe_load(cfile.read()))
42 except Exception, exc:
43 raise IngestError(
44 'Invalid charm metadata %s: %s' % (
45@@ -543,7 +550,7 @@
46 config_raw = cfile.read()
47
48 try:
49- config_yaml = yaml.load(config_raw)
50+ config_yaml = yaml.safe_load(config_raw)
51 if 'options' in config_yaml:
52 config_yaml['options'] = options_to_storage(
53 config_yaml['options'])
54
55=== modified file 'charmworld/jobs/lp.py'
56--- charmworld/jobs/lp.py 2013-07-19 12:51:01 +0000
57+++ charmworld/jobs/lp.py 2013-07-22 15:53:24 +0000
58@@ -131,7 +131,7 @@
59 try:
60 lease_time = int(settings['script_lease_time']) * 60
61 with lock('ingest-queue', lease_time, db, log):
62- queue = get_queue(CHARM_QUEUE)
63+ queue = get_queue(db, CHARM_QUEUE)
64 queue_size = queue.size()
65 queue.clear()
66 log.info("Dequeued %s charms", queue_size)
67@@ -139,6 +139,7 @@
68 log.warn(str(error))
69
70
71+# XXX bug=1130732: This function does not appear to be tested.
72 def main():
73 configure_logging()
74 parser = argparse.ArgumentParser()
75@@ -164,8 +165,8 @@
76 try:
77 with lock('ingest-queue', int(settings['script_lease_time']) * 60,
78 db, log):
79- charm_queue = get_queue(CHARM_QUEUE)
80- basket_queue = get_queue(BASKET_QUEUE)
81+ charm_queue = get_queue(db, CHARM_QUEUE)
82+ basket_queue = get_queue(db, BASKET_QUEUE)
83 queue_from_branches(db, charm_queue, basket_queue,
84 charm_import_limit, args.prefix)
85 except LockHeld, error:
86
87=== modified file 'charmworld/jobs/supervisord.conf'
88--- charmworld/jobs/supervisord.conf 2013-04-01 15:07:06 +0000
89+++ charmworld/jobs/supervisord.conf 2013-07-22 15:53:24 +0000
90@@ -29,11 +29,9 @@
91 ; supervisor.
92
93 [program:ingest]
94-command=bin/python charmworld/jobs/worker.py ; the program (relative uses PATH, can take args)
95-numprocs=1 ; number of processes copies to start (def 1)
96-;directory=/tmp ; directory to cwd to before exec (def no cwd)
97-;autostart=true ; start at supervisord start (default: true)
98-autorestart=unexpected ; whether/when to restart (default: unexpected)
99-startsecs=1 ; number of secs prog must stay running (def. 1)
100-startretries=3 ; max # of serial start failures (default 3)
101-exitcodes=0,2 ; 'expected' exit codes for process (default 0,2)
102+command=bin/ingest-queued --run-forever
103+numprocs=1
104+autorestart=unexpected
105+startsecs=1
106+startretries=3
107+exitcodes=0,2
108
109=== modified file 'charmworld/jobs/tests/test_ingest.py'
110--- charmworld/jobs/tests/test_ingest.py 2013-07-18 21:49:30 +0000
111+++ charmworld/jobs/tests/test_ingest.py 2013-07-22 15:53:24 +0000
112@@ -11,6 +11,7 @@
113 import os.path
114 import shutil
115 import subprocess
116+from textwrap import dedent
117
118 import bzrlib
119 from bzrlib.bzrdir import BzrDir
120@@ -237,12 +238,13 @@
121 side_effect=checkout_charm_mock
122 ) as checkout_branch:
123 with bzr_isolation() as working_dir:
124- try:
125- yield checkout_branch, working_dir
126- finally:
127- # Clean up despite exceptions raised by the context manager
128- # client.
129- shutil.rmtree(CHARM_DIR, ignore_errors=True)
130+ with patch('charmworld.jobs.ingest.UpdateBundleJob.store_bundles'):
131+ try:
132+ yield checkout_branch, working_dir
133+ finally:
134+ # Clean up despite exceptions raised by the context
135+ # manager client.
136+ shutil.rmtree(CHARM_DIR, ignore_errors=True)
137
138 # TODO complete refactoring of checkout_branch, et al.
139
140@@ -368,6 +370,42 @@
141 basket_id = UpdateBundleJob.construct_id(payload, 5)
142 self.assertEqual('~charmers/mysql-5', basket_id)
143
144+ def test_job_can_store_bundles_in_the_db(self):
145+ basket_data = factory.get_payload_json(name='wordpress')
146+ basket_id = UpdateBundleJob.construct_id(basket_data, 5)
147+ bundle_id = '%s/%s' % (basket_id, 'stage')
148+ basket_data['_id'] = basket_id
149+ job = UpdateBundleJob()
150+ job.setup(self.db)
151+ DEPLOYER_CONFIG_HASH = '31337'
152+ basket_data['file_hashes'] = {
153+ 'bundles.yaml': DEPLOYER_CONFIG_HASH,
154+ }
155+ fs = getfs(self.db)
156+ deployer_config = dedent("""\
157+ stage:
158+ series: precise
159+ services:
160+ blog:
161+ charm: cs:precise/wordpress
162+ constraints: mem=2
163+ options:
164+ tuning: optimized
165+ engine: apache
166+ """)
167+ fs.put(deployer_config, _id=DEPLOYER_CONFIG_HASH)
168+ job.store_bundles(fs, basket_data, self.db.bundles)
169+ self.assertIsNotNone(self.db.bundles.find_one(bundle_id))
170+
171+ def test_job_run_stores_bundles_in_the_db(self):
172+ basket_data = factory.get_payload_json(name='wordpress')
173+ job = UpdateBundleJob()
174+ job.setup(self.db)
175+ with patch.object(job, 'decorate_basket'):
176+ with patch.object(job, 'store_bundles') as store_bundles:
177+ job.run(basket_data)
178+ self.assertTrue(store_bundles.called)
179+
180
181 # TODO add bundles to index
182
183
184=== modified file 'charmworld/jobs/tests/test_lp.py'
185--- charmworld/jobs/tests/test_lp.py 2013-07-19 12:51:01 +0000
186+++ charmworld/jobs/tests/test_lp.py 2013-07-22 15:53:24 +0000
187@@ -258,8 +258,8 @@
188
189 @patch('charmworld.jobs.lp.all_branch_data', all_branch_data_mock)
190 def test_queue_from_branches(self):
191- charm_queue = get_queue('test_charm_queue')
192- basket_queue = get_queue('test_basket_queue')
193+ charm_queue = get_queue(self.db, 'test_charm_queue')
194+ basket_queue = get_queue(self.db, 'test_basket_queue')
195 self.addCleanup(basket_queue.clear)
196 queue_from_branches(self.db, charm_queue, basket_queue)
197 item = charm_queue.next()
198@@ -269,7 +269,7 @@
199 self.assertEqual({'foo': 'bar'}, item.payload)
200
201 def test_dequeue(self):
202- queue = get_queue('test_queue')
203+ queue = get_queue(self.db, 'test_queue')
204 self.addCleanup(queue.clear)
205 charm_data = factory.get_charm_json()
206 queue.put(charm_data)
207@@ -281,7 +281,7 @@
208 with patch(lp_get_queue, return_value=queue) as gq:
209 dequeue()
210 getdb.assert_called_with(ANY, MONGO_DATABASE)
211- gq.assert_called_with(CHARM_QUEUE)
212+ gq.assert_called_with(self.db, CHARM_QUEUE)
213 self.assertEqual(0, queue.size())
214
215 @patch('requests.get', failing_requests_get)
216@@ -290,7 +290,7 @@
217 # charm branches, no ingest jobs are queued. Otherwise, the
218 # promulgated status may be incorrectly reset for promulgated
219 # charms that are not returned by all_branch_data().
220- out_queue = get_queue('test_queue')
221+ out_queue = get_queue(self.db, 'test_queue')
222 factory.makeCharm(self.db)
223 self.assertRaises(
224 ConnectionError, queue_from_branches, self.db,
225
226=== modified file 'charmworld/jobs/tests/test_worker.py'
227--- charmworld/jobs/tests/test_worker.py 2013-07-17 20:40:55 +0000
228+++ charmworld/jobs/tests/test_worker.py 2013-07-22 15:53:24 +0000
229@@ -1,9 +1,10 @@
230 # Copyright 2013 Canonical Ltd. This software is licensed under the
231 # GNU Affero General Public License version 3 (see the file LICENSE).
232
233+__metaclass__ = type
234+
235 from contextlib import contextmanager
236
237-from mongoqueue import MongoQueue
238 from mock import (
239 create_autospec,
240 patch,
241@@ -11,14 +12,21 @@
242 from pyelasticsearch import ElasticSearch
243
244 from charmworld.testing import logger_buffer
245-from charmworld.jobs.config import settings
246+from charmworld.jobs.config import (
247+ CHARM_QUEUE,
248+ BASKET_QUEUE,
249+ settings,
250+)
251 from charmworld.jobs.worker import (
252- CharmWorker,
253- DEFAULT_JOBS,
254+ QueueWorker,
255 main,
256 )
257-from charmworld.jobs.ingest import IngestJob
258+from charmworld.jobs.ingest import (
259+ DBIngestJob,
260+ UpdateCharmJob,
261+)
262 from charmworld.jobs.tests.test_ingest import charm_update_environment
263+from charmworld.jobs.utils import get_queue
264 from charmworld.search import ElasticSearchClient
265 from charmworld.testing import (
266 factory,
267@@ -26,13 +34,13 @@
268 )
269
270
271-class SuccessfulRun(IngestJob):
272+class SuccessfulRun(DBIngestJob):
273
274 def run(self, charm_data):
275 charm_data['ingested'] = True
276
277
278-class FailureRun(IngestJob):
279+class FailureRun(DBIngestJob):
280
281 def run(self, charm_data):
282 if charm_data['charm'] == 'foo':
283@@ -41,9 +49,10 @@
284 charm_data['ingested'] = True
285
286
287-class GatheringRun(IngestJob):
288+class GatheringRun(DBIngestJob):
289
290- def setup(self):
291+ def setup(self, db=None):
292+ super(GatheringRun, self).setup(db)
293 self.charms = []
294
295 def run(self, charm_data):
296@@ -54,23 +63,21 @@
297 def main_environment(charm, index_client, queue):
298 """An environment in which worker.main() can be run."""
299 with patch('charmworld.jobs.worker.configure_logging'):
300- with patch('sys.argv', ['foo', '--quit-on-empty']):
301- with patch('charmworld.jobs.worker.get_queue',
302- lambda x: queue):
303- with charm_update_environment(charm, index_client):
304- yield
305+ with charm_update_environment(charm, index_client):
306+ yield
307
308
309 class WorkerTest(MongoTestBase):
310
311 def setUp(self):
312 super(WorkerTest, self).setUp()
313- self.queue = MongoQueue(self.db.queue, 'queue')
314- self.queue.put({'charm': 'foo'})
315- self.queue.put({'charm': 'bar'})
316+ self.charm_queue = get_queue(self.db, CHARM_QUEUE)
317+ self.basket_queue = get_queue(self.db, BASKET_QUEUE)
318+ self.charm_queue.put({'charm': 'foo'})
319+ self.charm_queue.put({'charm': 'bar'})
320
321 def tearDown(self):
322- self.queue.clear()
323+ self.charm_queue.clear()
324 super(WorkerTest, self).tearDown()
325
326 def test_basic_running(self):
327@@ -80,10 +87,13 @@
328 SuccessfulRun(),
329 GatheringRun(),
330 )
331- worker = CharmWorker(ingest_jobs, self.queue)
332- worker.run(quit_on_empty=True)
333+ queue_jobs = {
334+ self.charm_queue: ingest_jobs
335+ }
336+ worker = QueueWorker(self.db, queue_jobs)
337+ worker.run()
338
339- self.assertEqual(0, self.queue.size())
340+ self.assertEqual(0, self.charm_queue.size())
341 expected = [
342 {'ingested': True, 'charm': 'foo'},
343 {'ingested': True, 'charm': 'bar'},
344@@ -93,10 +103,13 @@
345
346 def test_failure_while_running(self):
347 ingest_jobs = (FailureRun(), GatheringRun())
348- worker = CharmWorker(ingest_jobs, self.queue)
349- worker.run(quit_on_empty=True)
350+ queue_jobs = {
351+ self.charm_queue: ingest_jobs
352+ }
353+ worker = QueueWorker(self.db, queue_jobs)
354+ worker.run()
355
356- self.assertEqual(0, self.queue.size())
357+ self.assertEqual(0, self.charm_queue.size())
358 expected = [
359 {'ingested': True, 'charm': 'bar'},
360 ]
361@@ -122,40 +135,81 @@
362 SuccessfulRun(),
363 GatheringRun(),
364 )
365- worker = CharmWorker(ingest_jobs, LockedJobQueue)
366- worker.run(quit_on_empty=True)
367+ worker = QueueWorker(self.db, {LockedJobQueue: ingest_jobs})
368+ worker.run()
369
370 def test_ingest_job_contract(self):
371 # All ingest jobs accept the right inputs.
372- ingest_jobs = [create_autospec(job)() for job in DEFAULT_JOBS]
373- worker = CharmWorker(ingest_jobs, self.queue)
374- worker.run(quit_on_empty=True)
375- self.assertEqual(0, self.queue.size())
376+ ingest_jobs = [create_autospec(UpdateCharmJob)()]
377+ worker = QueueWorker(self.db, {self.charm_queue: ingest_jobs})
378+ worker.run()
379+ self.assertEqual(0, self.charm_queue.size())
380
381 def test_interval_is_int(self):
382- worker = CharmWorker([], self.queue)
383+ worker = QueueWorker(self.db, {self.charm_queue: []})
384 self.assertIs(int, type(worker.interval),
385- 'CharmWorker.interval is a %s, not an int.' %
386+ 'QueueWorker.interval is a %s, not an int.' %
387 type(worker.interval))
388
389 def run_main_with_exit(self):
390- with self.assertRaises(SystemExit) as e:
391- main()
392+ with patch('charmworld.jobs.worker.configure_logging'):
393+ with self.assertRaises(SystemExit) as e:
394+ main(['ingest-queued'], db=self.db)
395 return e.exception.code
396
397 def test_elasticsearch_failure(self):
398 payload = factory.get_payload_json()
399- self.queue.clear()
400- self.queue.put(payload)
401+ self.charm_queue.clear()
402+ self.charm_queue.put(payload)
403 charm = factory.get_charm_json(payload=payload)
404 index_client = ElasticSearchClient(
405 ElasticSearch('http://localhost:70'), 'foo')
406 with patch.dict(settings,
407 values={'es_urls': 'http://localhost:70'}):
408- with main_environment(charm, index_client, self.queue):
409+ with main_environment(charm, index_client, self.charm_queue):
410 with logger_buffer('charm.worker') as handler:
411 exit_code = self.run_main_with_exit()
412 self.assertEqual(40, handler.buffer[1].levelno)
413 self.assertIn('Could not connect', handler.buffer[1].msg)
414 self.assertEqual(1, exit_code)
415- self.assertEqual(0, self.queue.size())
416+ self.assertEqual(0, self.charm_queue.size())
417+
418+ def test_job_for_bundles_is_run_if_bundle_queue_has_entries(self):
419+ # Basket queue entries are processed by run_job.
420+ queue = get_queue(self.db, BASKET_QUEUE)
421+ queue_entry = {'bundle': 'foo'}
422+ queue.put(queue_entry)
423+ with patch('charmworld.jobs.worker.run_job') as run_job:
424+ self.run_main_with_exit()
425+ queue_entries_run = [x[0][1] for x in run_job.call_args_list]
426+ self.assertIn(queue_entry, queue_entries_run)
427+
428+ def test_worker_does_not_run_forever_by_default(self):
429+ run_called = []
430+ class FauxWorker:
431+ def __init__(self, *args):
432+ pass
433+
434+ @staticmethod
435+ def run(run_forever):
436+ self.assertFalse(run_forever)
437+ run_called.append(True)
438+
439+ with patch('charmworld.jobs.worker.QueueWorker', new=FauxWorker):
440+ self.run_main_with_exit()
441+ self.assertTrue(run_called)
442+
443+ def test_worker_runs_forever_if_command_line_says_to(self):
444+ run_called = []
445+ class FauxWorker:
446+ def __init__(self, *args):
447+ pass
448+
449+ @staticmethod
450+ def run(run_forever):
451+ self.assertFalse(run_forever)
452+ run_called.append(True)
453+
454+ with patch('charmworld.jobs.worker.QueueWorker', new=FauxWorker):
455+ self.run_main_with_exit()
456+ self.assertTrue(run_called)
457
458=== modified file 'charmworld/jobs/utils.py'
459--- charmworld/jobs/utils.py 2013-06-10 14:24:39 +0000
460+++ charmworld/jobs/utils.py 2013-07-22 15:53:24 +0000
461@@ -10,19 +10,12 @@
462 from socket import gethostname
463 from time import time
464
465-from charmworld.models import getconnection
466-from charmworld.models import getdb
467-from charmworld.utils import get_ini
468-
469-
470-def get_queue(queue_name):
471+
472+def get_queue(db, queue_name):
473 """Get the required mongoqueues for the provided queue name.
474
475 As some jobs only require one queue, only one queue name is mandatory.
476 """
477- settings = get_ini()
478- connection = getconnection(settings)
479- db = getdb(connection, settings.get('mongo.database'))
480 return MongoQueue(db[queue_name], '%s-queue' % queue_name)
481
482
483
484=== modified file 'charmworld/jobs/worker.py'
485--- charmworld/jobs/worker.py 2013-06-14 21:08:36 +0000
486+++ charmworld/jobs/worker.py 2013-07-22 15:53:24 +0000
487@@ -6,102 +6,93 @@
488 import sys
489 import time
490
491-from charmworld.utils import (
492- configure_logging,
493- get_ini,
494+from charmworld.jobs.config import (
495+ CHARM_QUEUE,
496+ BASKET_QUEUE,
497 )
498-from charmworld.jobs.config import CHARM_QUEUE
499 from charmworld.jobs.ingest import (
500 run_job,
501+ UpdateBundleJob,
502 UpdateCharmJob,
503 )
504 from charmworld.jobs.utils import get_queue
505+from charmworld.models import getconnection
506+from charmworld.models import getdb
507 from charmworld.search import SearchServiceNotAvailable
508-
509-
510-DEFAULT_JOBS = (
511- UpdateCharmJob,
512+from charmworld.utils import (
513+ configure_logging,
514+ get_ini,
515 )
516
517+
518 settings = get_ini()
519 INTERVAL = int(settings.get('worker_interval'))
520
521
522-class CharmWorker(object):
523+class QueueWorker(object):
524 """Processes raw charm data"""
525
526- def __init__(self, ingest_jobs, queue, interval=INTERVAL):
527- self.ingest_jobs = ingest_jobs
528- self.queue = queue
529+ def __init__(self, db, queue_jobs, interval=INTERVAL):
530+ self.db = db
531+ self.queue_jobs = queue_jobs
532 self.log = logging.getLogger('charm.worker')
533 self.interval = interval
534
535- def run(self, quit_on_empty=False):
536+ def run(self, run_forever=False):
537 self.log.info('Starting processing')
538 return_code = 0
539- for job in self.ingest_jobs:
540- job.setup()
541+ for jobs in self.queue_jobs.values():
542+ for job in jobs:
543+ job.setup(db=self.db)
544 while True:
545- item = self.queue.next()
546- if item is None:
547- if quit_on_empty:
548- self.log.info(
549- 'Empty queue with quit_on_empty set. Quitting.')
550- break
551- else:
552- wait_time_in_minutes = self.interval / 60
553- self.log.info(
554- 'Empty queue. Waiting %s minutes to recheck.' %
555- wait_time_in_minutes)
556- time.sleep(self.interval)
557- continue
558- charm_data = item.payload
559- try:
560- for job in self.ingest_jobs:
561+ for queue, jobs in self.queue_jobs.items():
562+ while True:
563+ item = queue.next()
564+ if item is None:
565+ break
566+ charm_data = item.payload
567 try:
568- if not run_job(job, charm_data, needs_setup=False):
569- break
570- except SearchServiceNotAvailable as e:
571- self.log.exception(str(e))
572- self.queue.clear()
573- return_code = 1
574-
575- finally:
576- item.complete()
577+ for job in jobs:
578+ try:
579+ if not run_job(
580+ job, charm_data, needs_setup=False):
581+ break
582+ except SearchServiceNotAvailable as e:
583+ self.log.exception(str(e))
584+ queue.clear()
585+ return_code = 1
586+ finally:
587+ item.complete()
588+
589+ if not run_forever:
590+ self.log.info('Empty queue and not running forever; quitting.')
591+ break
592+
593+ wait_time_in_minutes = self.interval / 60
594+ self.log.info(
595+ 'Empty queue. Waiting %s minutes to recheck.' %
596+ wait_time_in_minutes)
597+ time.sleep(self.interval)
598 return return_code
599
600
601-# XXX j.c.sackett Feb 21 2013 Bug:1111708
602-# Functions not currently bound to jobs; probably need new jobs created for
603-# them outside of ingest worker.
604-#def reindex(db, indexer):
605- #log = logging.getLogger("charm.index")
606- #count = 0
607- #for charm in db.find():
608- #log.debug("Indexing %s", charm['branch_spec'])
609- #try:
610- #index_charm(indexer, charm)
611- #except:
612- #log.error("Indexing charm %s", charm['branch_spec'])
613- #raise
614- #count += 1
615- #log.info("Indexed %d Charms" % count)
616-
617-
618-def main():
619+def main(argv=sys.argv, db=None):
620+ if db is None:
621+ settings = get_ini()
622+ connection = getconnection(settings)
623+ db = getdb(connection, settings.get('mongo.database'))
624 configure_logging()
625 parser = argparse.ArgumentParser()
626 parser.add_argument(
627- "--quit-on-empty",
628- help="Shut down the worker if the queue is empty",
629- action="store_true")
630+ '--run-forever',
631+ help='Keep running after the queues are processed and process more '
632+ 'later.',
633+ action='store_true', dest='run_forever')
634 # Explicitly pass sys.argv to make monkeypatching more reliable.
635- args = parser.parse_args(sys.argv[1:])
636- queue = get_queue(CHARM_QUEUE)
637- jobs = [job() for job in DEFAULT_JOBS]
638- worker = CharmWorker(jobs, queue)
639- sys.exit(worker.run(quit_on_empty=args.quit_on_empty))
640-
641-
642-if __name__ == "__main__":
643- main()
644+ args = parser.parse_args(argv[1:])
645+ queue_jobs = {
646+ get_queue(db, CHARM_QUEUE): [UpdateCharmJob()],
647+ get_queue(db, BASKET_QUEUE): [UpdateBundleJob()],
648+ }
649+ worker = QueueWorker(db, queue_jobs)
650+ sys.exit(worker.run(run_forever=args.run_forever))
651
652=== modified file 'charmworld/models.py'
653--- charmworld/models.py 2013-07-19 12:51:01 +0000
654+++ charmworld/models.py 2013-07-22 15:53:24 +0000
655@@ -7,6 +7,7 @@
656 from calendar import timegm
657 import logging
658 from mimetypes import guess_type
659+import os
660 from os.path import dirname
661 from os.path import join
662 from os.path import normpath
663@@ -36,7 +37,7 @@
664 :param settings: dict of application settings including the mongo.X info.
665
666 """
667- url_or_host = settings.get('mongo.url')
668+ url_or_host = os.environ.get('TEST_MONGODB', settings.get('mongo.url'))
669 if not url_or_host:
670 url_or_host = settings.get('mongo.host')
671 port = int(settings.get('mongo.port'))
672
673=== modified file 'charmworld/teams.py'
674--- charmworld/teams.py 2013-01-04 16:42:25 +0000
675+++ charmworld/teams.py 2013-07-22 15:53:24 +0000
676@@ -65,8 +65,10 @@
677 @since: 2.1.1
678 """
679
680-from openid.message import registerNamespaceAlias, \
681- NamespaceAliasRegistrationError
682+from openid.message import (
683+ registerNamespaceAlias,
684+ NamespaceAliasRegistrationError,
685+)
686 from openid.extension import Extension
687 from openid import oidutil
688
689
690=== modified file 'charmworld/tests/test_models.py'
691--- charmworld/tests/test_models.py 2013-07-19 12:51:01 +0000
692+++ charmworld/tests/test_models.py 2013-07-22 15:53:24 +0000
693@@ -1192,6 +1192,7 @@
694 class TestStoreBundles(MongoTestBase):
695
696 def test_store_bundles(self):
697+ # The bundles can be stored in the database.
698 deployer_config = dedent("""\
699 wordpress-stage:
700 series: precise
701
702=== modified file 'docs/index.rst'
703--- docs/index.rst 2013-07-15 20:55:10 +0000
704+++ docs/index.rst 2013-07-22 15:53:24 +0000
705@@ -59,7 +59,7 @@
706 ::
707
708 $ bin/enqueue
709- $ bin/ingest-queued --quit-on-empty
710+ $ bin/ingest-queued
711
712 Output verbosity is controlled by the logging config in charmworld.ini
713
714
715=== removed file 'scripts/worker'
716--- scripts/worker 2013-02-21 19:14:36 +0000
717+++ scripts/worker 1970-01-01 00:00:00 +0000
718@@ -1,16 +0,0 @@
719-#!/bin/bash
720-
721-# Copyright 2012, 2013 Marco Ceppi, Canonical Ltd. This software is
722-# licensed under the GNU Affero General Public License version 3 (see
723-# the file LICENSE).
724-
725-set -e
726-
727-SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
728-PROJECT_DIR="$( dirname $SCRIPT_DIR )"
729-PYTHON_BIN=$PROJECT_DIR/bin/python
730-: ${LOGDIR:="$( dirname $PROJECT_DIR )/var"}
731-JOBDIR=$PROJECT_DIR/charmworld/jobs
732-
733-echo "Starting worker..."
734-$PYTHON_BIN $JOBDIR/worker.py --quit-on-empty &>> $LOGDIR/ingest-worker.log

Subscribers

People subscribed via source and target branches