Merge lp:~benji/charmworld/even-more-bundles into lp:~juju-jitsu/charmworld/trunk
- even-more-bundles
- Merge into trunk
Proposed by
Benji York
Status: | Merged |
---|---|
Approved by: | Benji York |
Approved revision: | 332 |
Merged at revision: | 320 |
Proposed branch: | lp:~benji/charmworld/even-more-bundles |
Merge into: | lp:~juju-jitsu/charmworld/trunk |
Diff against target: |
734 lines (+233/-163) 13 files modified
charmworld/jobs/ingest.py (+11/-4) charmworld/jobs/lp.py (+4/-3) charmworld/jobs/supervisord.conf (+6/-8) charmworld/jobs/tests/test_ingest.py (+44/-6) charmworld/jobs/tests/test_lp.py (+5/-5) charmworld/jobs/tests/test_worker.py (+92/-38) charmworld/jobs/utils.py (+2/-9) charmworld/jobs/worker.py (+61/-70) charmworld/models.py (+2/-1) charmworld/teams.py (+4/-2) charmworld/tests/test_models.py (+1/-0) docs/index.rst (+1/-1) scripts/worker (+0/-16) |
To merge this branch: | bzr merge lp:~benji/charmworld/even-more-bundles |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Aaron Bentley (community) | Approve | ||
Review via email: mp+176227@code.launchpad.net |
Commit message
Make the queue worker process the basket queue.
Description of the change
Generalize the queue worker to process the basket queue as well as the
charm queue.
To post a comment you must log in.
- 331. By Benji York
-
tweak import style
- 332. By Benji York
-
more import style tweaking
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'charmworld/jobs/ingest.py' |
2 | --- charmworld/jobs/ingest.py 2013-07-18 21:49:30 +0000 |
3 | +++ charmworld/jobs/ingest.py 2013-07-22 15:53:24 +0000 |
4 | @@ -41,6 +41,7 @@ |
5 | getdb, |
6 | getfs, |
7 | options_to_storage, |
8 | + store_bundles, |
9 | ) |
10 | from charmworld.search import ( |
11 | ElasticSearchClient, |
12 | @@ -277,6 +278,7 @@ |
13 | fs = getfs(self.db, collection='hashed-files') |
14 | self.decorate_basket(payload, fs) |
15 | self.db.baskets.save(payload) |
16 | + self.store_bundles(fs, payload, self.db.bundles) |
17 | |
18 | @staticmethod |
19 | def construct_id(basket_data, revno): |
20 | @@ -290,8 +292,13 @@ |
21 | tree = branch.repository.revision_tree(basket_data['commit']) |
22 | basket_data['_id'] = self.construct_id(basket_data, revno) |
23 | basket_data['file_hashes'] = slurp_files(fs, tree) |
24 | - #Basket(basket_data, fs).get_file_contents('') |
25 | - #basket = get_basket(fs, basket_data['file_hashes']) |
26 | + |
27 | + @staticmethod |
28 | + def store_bundles(fs, basket_data, collection): |
29 | + hashes = basket_data['file_hashes'] |
30 | + deployer_config_bytes = fs.get(hashes['bundles.yaml']) |
31 | + deployer_config = yaml.safe_load(deployer_config_bytes) |
32 | + store_bundles(collection, deployer_config, basket_data['_id']) |
33 | |
34 | |
35 | def slurp_files(fs, tree): |
36 | @@ -529,7 +536,7 @@ |
37 | cfile = CharmFileSet.get_by_id( |
38 | fs, files[metadata_file]['fileid']) |
39 | try: |
40 | - metadata = quote_yaml(yaml.load(cfile.read())) |
41 | + metadata = quote_yaml(yaml.safe_load(cfile.read())) |
42 | except Exception, exc: |
43 | raise IngestError( |
44 | 'Invalid charm metadata %s: %s' % ( |
45 | @@ -543,7 +550,7 @@ |
46 | config_raw = cfile.read() |
47 | |
48 | try: |
49 | - config_yaml = yaml.load(config_raw) |
50 | + config_yaml = yaml.safe_load(config_raw) |
51 | if 'options' in config_yaml: |
52 | config_yaml['options'] = options_to_storage( |
53 | config_yaml['options']) |
54 | |
55 | === modified file 'charmworld/jobs/lp.py' |
56 | --- charmworld/jobs/lp.py 2013-07-19 12:51:01 +0000 |
57 | +++ charmworld/jobs/lp.py 2013-07-22 15:53:24 +0000 |
58 | @@ -131,7 +131,7 @@ |
59 | try: |
60 | lease_time = int(settings['script_lease_time']) * 60 |
61 | with lock('ingest-queue', lease_time, db, log): |
62 | - queue = get_queue(CHARM_QUEUE) |
63 | + queue = get_queue(db, CHARM_QUEUE) |
64 | queue_size = queue.size() |
65 | queue.clear() |
66 | log.info("Dequeued %s charms", queue_size) |
67 | @@ -139,6 +139,7 @@ |
68 | log.warn(str(error)) |
69 | |
70 | |
71 | +# XXX bug=1130732: This function does not appear to be tested. |
72 | def main(): |
73 | configure_logging() |
74 | parser = argparse.ArgumentParser() |
75 | @@ -164,8 +165,8 @@ |
76 | try: |
77 | with lock('ingest-queue', int(settings['script_lease_time']) * 60, |
78 | db, log): |
79 | - charm_queue = get_queue(CHARM_QUEUE) |
80 | - basket_queue = get_queue(BASKET_QUEUE) |
81 | + charm_queue = get_queue(db, CHARM_QUEUE) |
82 | + basket_queue = get_queue(db, BASKET_QUEUE) |
83 | queue_from_branches(db, charm_queue, basket_queue, |
84 | charm_import_limit, args.prefix) |
85 | except LockHeld, error: |
86 | |
87 | === modified file 'charmworld/jobs/supervisord.conf' |
88 | --- charmworld/jobs/supervisord.conf 2013-04-01 15:07:06 +0000 |
89 | +++ charmworld/jobs/supervisord.conf 2013-07-22 15:53:24 +0000 |
90 | @@ -29,11 +29,9 @@ |
91 | ; supervisor. |
92 | |
93 | [program:ingest] |
94 | -command=bin/python charmworld/jobs/worker.py ; the program (relative uses PATH, can take args) |
95 | -numprocs=1 ; number of processes copies to start (def 1) |
96 | -;directory=/tmp ; directory to cwd to before exec (def no cwd) |
97 | -;autostart=true ; start at supervisord start (default: true) |
98 | -autorestart=unexpected ; whether/when to restart (default: unexpected) |
99 | -startsecs=1 ; number of secs prog must stay running (def. 1) |
100 | -startretries=3 ; max # of serial start failures (default 3) |
101 | -exitcodes=0,2 ; 'expected' exit codes for process (default 0,2) |
102 | +command=bin/ingest-queued --run-forever |
103 | +numprocs=1 |
104 | +autorestart=unexpected |
105 | +startsecs=1 |
106 | +startretries=3 |
107 | +exitcodes=0,2 |
108 | |
109 | === modified file 'charmworld/jobs/tests/test_ingest.py' |
110 | --- charmworld/jobs/tests/test_ingest.py 2013-07-18 21:49:30 +0000 |
111 | +++ charmworld/jobs/tests/test_ingest.py 2013-07-22 15:53:24 +0000 |
112 | @@ -11,6 +11,7 @@ |
113 | import os.path |
114 | import shutil |
115 | import subprocess |
116 | +from textwrap import dedent |
117 | |
118 | import bzrlib |
119 | from bzrlib.bzrdir import BzrDir |
120 | @@ -237,12 +238,13 @@ |
121 | side_effect=checkout_charm_mock |
122 | ) as checkout_branch: |
123 | with bzr_isolation() as working_dir: |
124 | - try: |
125 | - yield checkout_branch, working_dir |
126 | - finally: |
127 | - # Clean up despite exceptions raised by the context manager |
128 | - # client. |
129 | - shutil.rmtree(CHARM_DIR, ignore_errors=True) |
130 | + with patch('charmworld.jobs.ingest.UpdateBundleJob.store_bundles'): |
131 | + try: |
132 | + yield checkout_branch, working_dir |
133 | + finally: |
134 | + # Clean up despite exceptions raised by the context |
135 | + # manager client. |
136 | + shutil.rmtree(CHARM_DIR, ignore_errors=True) |
137 | |
138 | # TODO complete refactoring of checkout_branch, et al. |
139 | |
140 | @@ -368,6 +370,42 @@ |
141 | basket_id = UpdateBundleJob.construct_id(payload, 5) |
142 | self.assertEqual('~charmers/mysql-5', basket_id) |
143 | |
144 | + def test_job_can_store_bundles_in_the_db(self): |
145 | + basket_data = factory.get_payload_json(name='wordpress') |
146 | + basket_id = UpdateBundleJob.construct_id(basket_data, 5) |
147 | + bundle_id = '%s/%s' % (basket_id, 'stage') |
148 | + basket_data['_id'] = basket_id |
149 | + job = UpdateBundleJob() |
150 | + job.setup(self.db) |
151 | + DEPLOYER_CONFIG_HASH = '31337' |
152 | + basket_data['file_hashes'] = { |
153 | + 'bundles.yaml': DEPLOYER_CONFIG_HASH, |
154 | + } |
155 | + fs = getfs(self.db) |
156 | + deployer_config = dedent("""\ |
157 | + stage: |
158 | + series: precise |
159 | + services: |
160 | + blog: |
161 | + charm: cs:precise/wordpress |
162 | + constraints: mem=2 |
163 | + options: |
164 | + tuning: optimized |
165 | + engine: apache |
166 | + """) |
167 | + fs.put(deployer_config, _id=DEPLOYER_CONFIG_HASH) |
168 | + job.store_bundles(fs, basket_data, self.db.bundles) |
169 | + self.assertIsNotNone(self.db.bundles.find_one(bundle_id)) |
170 | + |
171 | + def test_job_run_stores_bundles_in_the_db(self): |
172 | + basket_data = factory.get_payload_json(name='wordpress') |
173 | + job = UpdateBundleJob() |
174 | + job.setup(self.db) |
175 | + with patch.object(job, 'decorate_basket'): |
176 | + with patch.object(job, 'store_bundles') as store_bundles: |
177 | + job.run(basket_data) |
178 | + self.assertTrue(store_bundles.called) |
179 | + |
180 | |
181 | # TODO add bundles to index |
182 | |
183 | |
184 | === modified file 'charmworld/jobs/tests/test_lp.py' |
185 | --- charmworld/jobs/tests/test_lp.py 2013-07-19 12:51:01 +0000 |
186 | +++ charmworld/jobs/tests/test_lp.py 2013-07-22 15:53:24 +0000 |
187 | @@ -258,8 +258,8 @@ |
188 | |
189 | @patch('charmworld.jobs.lp.all_branch_data', all_branch_data_mock) |
190 | def test_queue_from_branches(self): |
191 | - charm_queue = get_queue('test_charm_queue') |
192 | - basket_queue = get_queue('test_basket_queue') |
193 | + charm_queue = get_queue(self.db, 'test_charm_queue') |
194 | + basket_queue = get_queue(self.db, 'test_basket_queue') |
195 | self.addCleanup(basket_queue.clear) |
196 | queue_from_branches(self.db, charm_queue, basket_queue) |
197 | item = charm_queue.next() |
198 | @@ -269,7 +269,7 @@ |
199 | self.assertEqual({'foo': 'bar'}, item.payload) |
200 | |
201 | def test_dequeue(self): |
202 | - queue = get_queue('test_queue') |
203 | + queue = get_queue(self.db, 'test_queue') |
204 | self.addCleanup(queue.clear) |
205 | charm_data = factory.get_charm_json() |
206 | queue.put(charm_data) |
207 | @@ -281,7 +281,7 @@ |
208 | with patch(lp_get_queue, return_value=queue) as gq: |
209 | dequeue() |
210 | getdb.assert_called_with(ANY, MONGO_DATABASE) |
211 | - gq.assert_called_with(CHARM_QUEUE) |
212 | + gq.assert_called_with(self.db, CHARM_QUEUE) |
213 | self.assertEqual(0, queue.size()) |
214 | |
215 | @patch('requests.get', failing_requests_get) |
216 | @@ -290,7 +290,7 @@ |
217 | # charm branches, no ingest jobs are queued. Otherwise, the |
218 | # promulgated status may be incorrectly reset for promulgated |
219 | # charms that are not returned by all_branch_data(). |
220 | - out_queue = get_queue('test_queue') |
221 | + out_queue = get_queue(self.db, 'test_queue') |
222 | factory.makeCharm(self.db) |
223 | self.assertRaises( |
224 | ConnectionError, queue_from_branches, self.db, |
225 | |
226 | === modified file 'charmworld/jobs/tests/test_worker.py' |
227 | --- charmworld/jobs/tests/test_worker.py 2013-07-17 20:40:55 +0000 |
228 | +++ charmworld/jobs/tests/test_worker.py 2013-07-22 15:53:24 +0000 |
229 | @@ -1,9 +1,10 @@ |
230 | # Copyright 2013 Canonical Ltd. This software is licensed under the |
231 | # GNU Affero General Public License version 3 (see the file LICENSE). |
232 | |
233 | +__metaclass__ = type |
234 | + |
235 | from contextlib import contextmanager |
236 | |
237 | -from mongoqueue import MongoQueue |
238 | from mock import ( |
239 | create_autospec, |
240 | patch, |
241 | @@ -11,14 +12,21 @@ |
242 | from pyelasticsearch import ElasticSearch |
243 | |
244 | from charmworld.testing import logger_buffer |
245 | -from charmworld.jobs.config import settings |
246 | +from charmworld.jobs.config import ( |
247 | + CHARM_QUEUE, |
248 | + BASKET_QUEUE, |
249 | + settings, |
250 | +) |
251 | from charmworld.jobs.worker import ( |
252 | - CharmWorker, |
253 | - DEFAULT_JOBS, |
254 | + QueueWorker, |
255 | main, |
256 | ) |
257 | -from charmworld.jobs.ingest import IngestJob |
258 | +from charmworld.jobs.ingest import ( |
259 | + DBIngestJob, |
260 | + UpdateCharmJob, |
261 | +) |
262 | from charmworld.jobs.tests.test_ingest import charm_update_environment |
263 | +from charmworld.jobs.utils import get_queue |
264 | from charmworld.search import ElasticSearchClient |
265 | from charmworld.testing import ( |
266 | factory, |
267 | @@ -26,13 +34,13 @@ |
268 | ) |
269 | |
270 | |
271 | -class SuccessfulRun(IngestJob): |
272 | +class SuccessfulRun(DBIngestJob): |
273 | |
274 | def run(self, charm_data): |
275 | charm_data['ingested'] = True |
276 | |
277 | |
278 | -class FailureRun(IngestJob): |
279 | +class FailureRun(DBIngestJob): |
280 | |
281 | def run(self, charm_data): |
282 | if charm_data['charm'] == 'foo': |
283 | @@ -41,9 +49,10 @@ |
284 | charm_data['ingested'] = True |
285 | |
286 | |
287 | -class GatheringRun(IngestJob): |
288 | +class GatheringRun(DBIngestJob): |
289 | |
290 | - def setup(self): |
291 | + def setup(self, db=None): |
292 | + super(GatheringRun, self).setup(db) |
293 | self.charms = [] |
294 | |
295 | def run(self, charm_data): |
296 | @@ -54,23 +63,21 @@ |
297 | def main_environment(charm, index_client, queue): |
298 | """An environment in which worker.main() can be run.""" |
299 | with patch('charmworld.jobs.worker.configure_logging'): |
300 | - with patch('sys.argv', ['foo', '--quit-on-empty']): |
301 | - with patch('charmworld.jobs.worker.get_queue', |
302 | - lambda x: queue): |
303 | - with charm_update_environment(charm, index_client): |
304 | - yield |
305 | + with charm_update_environment(charm, index_client): |
306 | + yield |
307 | |
308 | |
309 | class WorkerTest(MongoTestBase): |
310 | |
311 | def setUp(self): |
312 | super(WorkerTest, self).setUp() |
313 | - self.queue = MongoQueue(self.db.queue, 'queue') |
314 | - self.queue.put({'charm': 'foo'}) |
315 | - self.queue.put({'charm': 'bar'}) |
316 | + self.charm_queue = get_queue(self.db, CHARM_QUEUE) |
317 | + self.basket_queue = get_queue(self.db, BASKET_QUEUE) |
318 | + self.charm_queue.put({'charm': 'foo'}) |
319 | + self.charm_queue.put({'charm': 'bar'}) |
320 | |
321 | def tearDown(self): |
322 | - self.queue.clear() |
323 | + self.charm_queue.clear() |
324 | super(WorkerTest, self).tearDown() |
325 | |
326 | def test_basic_running(self): |
327 | @@ -80,10 +87,13 @@ |
328 | SuccessfulRun(), |
329 | GatheringRun(), |
330 | ) |
331 | - worker = CharmWorker(ingest_jobs, self.queue) |
332 | - worker.run(quit_on_empty=True) |
333 | + queue_jobs = { |
334 | + self.charm_queue: ingest_jobs |
335 | + } |
336 | + worker = QueueWorker(self.db, queue_jobs) |
337 | + worker.run() |
338 | |
339 | - self.assertEqual(0, self.queue.size()) |
340 | + self.assertEqual(0, self.charm_queue.size()) |
341 | expected = [ |
342 | {'ingested': True, 'charm': 'foo'}, |
343 | {'ingested': True, 'charm': 'bar'}, |
344 | @@ -93,10 +103,13 @@ |
345 | |
346 | def test_failure_while_running(self): |
347 | ingest_jobs = (FailureRun(), GatheringRun()) |
348 | - worker = CharmWorker(ingest_jobs, self.queue) |
349 | - worker.run(quit_on_empty=True) |
350 | + queue_jobs = { |
351 | + self.charm_queue: ingest_jobs |
352 | + } |
353 | + worker = QueueWorker(self.db, queue_jobs) |
354 | + worker.run() |
355 | |
356 | - self.assertEqual(0, self.queue.size()) |
357 | + self.assertEqual(0, self.charm_queue.size()) |
358 | expected = [ |
359 | {'ingested': True, 'charm': 'bar'}, |
360 | ] |
361 | @@ -122,40 +135,81 @@ |
362 | SuccessfulRun(), |
363 | GatheringRun(), |
364 | ) |
365 | - worker = CharmWorker(ingest_jobs, LockedJobQueue) |
366 | - worker.run(quit_on_empty=True) |
367 | + worker = QueueWorker(self.db, {LockedJobQueue: ingest_jobs}) |
368 | + worker.run() |
369 | |
370 | def test_ingest_job_contract(self): |
371 | # All ingest jobs accept the right inputs. |
372 | - ingest_jobs = [create_autospec(job)() for job in DEFAULT_JOBS] |
373 | - worker = CharmWorker(ingest_jobs, self.queue) |
374 | - worker.run(quit_on_empty=True) |
375 | - self.assertEqual(0, self.queue.size()) |
376 | + ingest_jobs = [create_autospec(UpdateCharmJob)()] |
377 | + worker = QueueWorker(self.db, {self.charm_queue: ingest_jobs}) |
378 | + worker.run() |
379 | + self.assertEqual(0, self.charm_queue.size()) |
380 | |
381 | def test_interval_is_int(self): |
382 | - worker = CharmWorker([], self.queue) |
383 | + worker = QueueWorker(self.db, {self.charm_queue: []}) |
384 | self.assertIs(int, type(worker.interval), |
385 | - 'CharmWorker.interval is a %s, not an int.' % |
386 | + 'QueueWorker.interval is a %s, not an int.' % |
387 | type(worker.interval)) |
388 | |
389 | def run_main_with_exit(self): |
390 | - with self.assertRaises(SystemExit) as e: |
391 | - main() |
392 | + with patch('charmworld.jobs.worker.configure_logging'): |
393 | + with self.assertRaises(SystemExit) as e: |
394 | + main(['ingest-queued'], db=self.db) |
395 | return e.exception.code |
396 | |
397 | def test_elasticsearch_failure(self): |
398 | payload = factory.get_payload_json() |
399 | - self.queue.clear() |
400 | - self.queue.put(payload) |
401 | + self.charm_queue.clear() |
402 | + self.charm_queue.put(payload) |
403 | charm = factory.get_charm_json(payload=payload) |
404 | index_client = ElasticSearchClient( |
405 | ElasticSearch('http://localhost:70'), 'foo') |
406 | with patch.dict(settings, |
407 | values={'es_urls': 'http://localhost:70'}): |
408 | - with main_environment(charm, index_client, self.queue): |
409 | + with main_environment(charm, index_client, self.charm_queue): |
410 | with logger_buffer('charm.worker') as handler: |
411 | exit_code = self.run_main_with_exit() |
412 | self.assertEqual(40, handler.buffer[1].levelno) |
413 | self.assertIn('Could not connect', handler.buffer[1].msg) |
414 | self.assertEqual(1, exit_code) |
415 | - self.assertEqual(0, self.queue.size()) |
416 | + self.assertEqual(0, self.charm_queue.size()) |
417 | + |
418 | + def test_job_for_bundles_is_run_if_bundle_queue_has_entries(self): |
419 | + # Basket queue entries are processed by run_job. |
420 | + queue = get_queue(self.db, BASKET_QUEUE) |
421 | + queue_entry = {'bundle': 'foo'} |
422 | + queue.put(queue_entry) |
423 | + with patch('charmworld.jobs.worker.run_job') as run_job: |
424 | + self.run_main_with_exit() |
425 | + queue_entries_run = [x[0][1] for x in run_job.call_args_list] |
426 | + self.assertIn(queue_entry, queue_entries_run) |
427 | + |
428 | + def test_worker_does_not_run_forever_by_default(self): |
429 | + run_called = [] |
430 | + class FauxWorker: |
431 | + def __init__(self, *args): |
432 | + pass |
433 | + |
434 | + @staticmethod |
435 | + def run(run_forever): |
436 | + self.assertFalse(run_forever) |
437 | + run_called.append(True) |
438 | + |
439 | + with patch('charmworld.jobs.worker.QueueWorker', new=FauxWorker): |
440 | + self.run_main_with_exit() |
441 | + self.assertTrue(run_called) |
442 | + |
443 | + def test_worker_runs_forever_if_command_line_says_to(self): |
444 | + run_called = [] |
445 | + class FauxWorker: |
446 | + def __init__(self, *args): |
447 | + pass |
448 | + |
449 | + @staticmethod |
450 | + def run(run_forever): |
451 | + self.assertFalse(run_forever) |
452 | + run_called.append(True) |
453 | + |
454 | + with patch('charmworld.jobs.worker.QueueWorker', new=FauxWorker): |
455 | + self.run_main_with_exit() |
456 | + self.assertTrue(run_called) |
457 | |
458 | === modified file 'charmworld/jobs/utils.py' |
459 | --- charmworld/jobs/utils.py 2013-06-10 14:24:39 +0000 |
460 | +++ charmworld/jobs/utils.py 2013-07-22 15:53:24 +0000 |
461 | @@ -10,19 +10,12 @@ |
462 | from socket import gethostname |
463 | from time import time |
464 | |
465 | -from charmworld.models import getconnection |
466 | -from charmworld.models import getdb |
467 | -from charmworld.utils import get_ini |
468 | - |
469 | - |
470 | -def get_queue(queue_name): |
471 | + |
472 | +def get_queue(db, queue_name): |
473 | """Get the required mongoqueues for the provided queue name. |
474 | |
475 | As some jobs only require one queue, only one queue name is mandatory. |
476 | """ |
477 | - settings = get_ini() |
478 | - connection = getconnection(settings) |
479 | - db = getdb(connection, settings.get('mongo.database')) |
480 | return MongoQueue(db[queue_name], '%s-queue' % queue_name) |
481 | |
482 | |
483 | |
484 | === modified file 'charmworld/jobs/worker.py' |
485 | --- charmworld/jobs/worker.py 2013-06-14 21:08:36 +0000 |
486 | +++ charmworld/jobs/worker.py 2013-07-22 15:53:24 +0000 |
487 | @@ -6,102 +6,93 @@ |
488 | import sys |
489 | import time |
490 | |
491 | -from charmworld.utils import ( |
492 | - configure_logging, |
493 | - get_ini, |
494 | +from charmworld.jobs.config import ( |
495 | + CHARM_QUEUE, |
496 | + BASKET_QUEUE, |
497 | ) |
498 | -from charmworld.jobs.config import CHARM_QUEUE |
499 | from charmworld.jobs.ingest import ( |
500 | run_job, |
501 | + UpdateBundleJob, |
502 | UpdateCharmJob, |
503 | ) |
504 | from charmworld.jobs.utils import get_queue |
505 | +from charmworld.models import getconnection |
506 | +from charmworld.models import getdb |
507 | from charmworld.search import SearchServiceNotAvailable |
508 | - |
509 | - |
510 | -DEFAULT_JOBS = ( |
511 | - UpdateCharmJob, |
512 | +from charmworld.utils import ( |
513 | + configure_logging, |
514 | + get_ini, |
515 | ) |
516 | |
517 | + |
518 | settings = get_ini() |
519 | INTERVAL = int(settings.get('worker_interval')) |
520 | |
521 | |
522 | -class CharmWorker(object): |
523 | +class QueueWorker(object): |
524 | """Processes raw charm data""" |
525 | |
526 | - def __init__(self, ingest_jobs, queue, interval=INTERVAL): |
527 | - self.ingest_jobs = ingest_jobs |
528 | - self.queue = queue |
529 | + def __init__(self, db, queue_jobs, interval=INTERVAL): |
530 | + self.db = db |
531 | + self.queue_jobs = queue_jobs |
532 | self.log = logging.getLogger('charm.worker') |
533 | self.interval = interval |
534 | |
535 | - def run(self, quit_on_empty=False): |
536 | + def run(self, run_forever=False): |
537 | self.log.info('Starting processing') |
538 | return_code = 0 |
539 | - for job in self.ingest_jobs: |
540 | - job.setup() |
541 | + for jobs in self.queue_jobs.values(): |
542 | + for job in jobs: |
543 | + job.setup(db=self.db) |
544 | while True: |
545 | - item = self.queue.next() |
546 | - if item is None: |
547 | - if quit_on_empty: |
548 | - self.log.info( |
549 | - 'Empty queue with quit_on_empty set. Quitting.') |
550 | - break |
551 | - else: |
552 | - wait_time_in_minutes = self.interval / 60 |
553 | - self.log.info( |
554 | - 'Empty queue. Waiting %s minutes to recheck.' % |
555 | - wait_time_in_minutes) |
556 | - time.sleep(self.interval) |
557 | - continue |
558 | - charm_data = item.payload |
559 | - try: |
560 | - for job in self.ingest_jobs: |
561 | + for queue, jobs in self.queue_jobs.items(): |
562 | + while True: |
563 | + item = queue.next() |
564 | + if item is None: |
565 | + break |
566 | + charm_data = item.payload |
567 | try: |
568 | - if not run_job(job, charm_data, needs_setup=False): |
569 | - break |
570 | - except SearchServiceNotAvailable as e: |
571 | - self.log.exception(str(e)) |
572 | - self.queue.clear() |
573 | - return_code = 1 |
574 | - |
575 | - finally: |
576 | - item.complete() |
577 | + for job in jobs: |
578 | + try: |
579 | + if not run_job( |
580 | + job, charm_data, needs_setup=False): |
581 | + break |
582 | + except SearchServiceNotAvailable as e: |
583 | + self.log.exception(str(e)) |
584 | + queue.clear() |
585 | + return_code = 1 |
586 | + finally: |
587 | + item.complete() |
588 | + |
589 | + if not run_forever: |
590 | + self.log.info('Empty queue and not running forever; quitting.') |
591 | + break |
592 | + |
593 | + wait_time_in_minutes = self.interval / 60 |
594 | + self.log.info( |
595 | + 'Empty queue. Waiting %s minutes to recheck.' % |
596 | + wait_time_in_minutes) |
597 | + time.sleep(self.interval) |
598 | return return_code |
599 | |
600 | |
601 | -# XXX j.c.sackett Feb 21 2013 Bug:1111708 |
602 | -# Functions not currently bound to jobs; probably need new jobs created for |
603 | -# them outside of ingest worker. |
604 | -#def reindex(db, indexer): |
605 | - #log = logging.getLogger("charm.index") |
606 | - #count = 0 |
607 | - #for charm in db.find(): |
608 | - #log.debug("Indexing %s", charm['branch_spec']) |
609 | - #try: |
610 | - #index_charm(indexer, charm) |
611 | - #except: |
612 | - #log.error("Indexing charm %s", charm['branch_spec']) |
613 | - #raise |
614 | - #count += 1 |
615 | - #log.info("Indexed %d Charms" % count) |
616 | - |
617 | - |
618 | -def main(): |
619 | +def main(argv=sys.argv, db=None): |
620 | + if db is None: |
621 | + settings = get_ini() |
622 | + connection = getconnection(settings) |
623 | + db = getdb(connection, settings.get('mongo.database')) |
624 | configure_logging() |
625 | parser = argparse.ArgumentParser() |
626 | parser.add_argument( |
627 | - "--quit-on-empty", |
628 | - help="Shut down the worker if the queue is empty", |
629 | - action="store_true") |
630 | + '--run-forever', |
631 | + help='Keep running after the queues are processed and process more ' |
632 | + 'later.', |
633 | + action='store_true', dest='run_forever') |
634 | # Explicitly pass sys.argv to make monkeypatching more reliable. |
635 | - args = parser.parse_args(sys.argv[1:]) |
636 | - queue = get_queue(CHARM_QUEUE) |
637 | - jobs = [job() for job in DEFAULT_JOBS] |
638 | - worker = CharmWorker(jobs, queue) |
639 | - sys.exit(worker.run(quit_on_empty=args.quit_on_empty)) |
640 | - |
641 | - |
642 | -if __name__ == "__main__": |
643 | - main() |
644 | + args = parser.parse_args(argv[1:]) |
645 | + queue_jobs = { |
646 | + get_queue(db, CHARM_QUEUE): [UpdateCharmJob()], |
647 | + get_queue(db, BASKET_QUEUE): [UpdateBundleJob()], |
648 | + } |
649 | + worker = QueueWorker(db, queue_jobs) |
650 | + sys.exit(worker.run(run_forever=args.run_forever)) |
651 | |
652 | === modified file 'charmworld/models.py' |
653 | --- charmworld/models.py 2013-07-19 12:51:01 +0000 |
654 | +++ charmworld/models.py 2013-07-22 15:53:24 +0000 |
655 | @@ -7,6 +7,7 @@ |
656 | from calendar import timegm |
657 | import logging |
658 | from mimetypes import guess_type |
659 | +import os |
660 | from os.path import dirname |
661 | from os.path import join |
662 | from os.path import normpath |
663 | @@ -36,7 +37,7 @@ |
664 | :param settings: dict of application settings including the mongo.X info. |
665 | |
666 | """ |
667 | - url_or_host = settings.get('mongo.url') |
668 | + url_or_host = os.environ.get('TEST_MONGODB', settings.get('mongo.url')) |
669 | if not url_or_host: |
670 | url_or_host = settings.get('mongo.host') |
671 | port = int(settings.get('mongo.port')) |
672 | |
673 | === modified file 'charmworld/teams.py' |
674 | --- charmworld/teams.py 2013-01-04 16:42:25 +0000 |
675 | +++ charmworld/teams.py 2013-07-22 15:53:24 +0000 |
676 | @@ -65,8 +65,10 @@ |
677 | @since: 2.1.1 |
678 | """ |
679 | |
680 | -from openid.message import registerNamespaceAlias, \ |
681 | - NamespaceAliasRegistrationError |
682 | +from openid.message import ( |
683 | + registerNamespaceAlias, |
684 | + NamespaceAliasRegistrationError, |
685 | +) |
686 | from openid.extension import Extension |
687 | from openid import oidutil |
688 | |
689 | |
690 | === modified file 'charmworld/tests/test_models.py' |
691 | --- charmworld/tests/test_models.py 2013-07-19 12:51:01 +0000 |
692 | +++ charmworld/tests/test_models.py 2013-07-22 15:53:24 +0000 |
693 | @@ -1192,6 +1192,7 @@ |
694 | class TestStoreBundles(MongoTestBase): |
695 | |
696 | def test_store_bundles(self): |
697 | + # The bundles can be stored in the database. |
698 | deployer_config = dedent("""\ |
699 | wordpress-stage: |
700 | series: precise |
701 | |
702 | === modified file 'docs/index.rst' |
703 | --- docs/index.rst 2013-07-15 20:55:10 +0000 |
704 | +++ docs/index.rst 2013-07-22 15:53:24 +0000 |
705 | @@ -59,7 +59,7 @@ |
706 | :: |
707 | |
708 | $ bin/enqueue |
709 | - $ bin/ingest-queued --quit-on-empty |
710 | + $ bin/ingest-queued |
711 | |
712 | Output verbosity is controlled by the logging config in charmworld.ini |
713 | |
714 | |
715 | === removed file 'scripts/worker' |
716 | --- scripts/worker 2013-02-21 19:14:36 +0000 |
717 | +++ scripts/worker 1970-01-01 00:00:00 +0000 |
718 | @@ -1,16 +0,0 @@ |
719 | -#!/bin/bash |
720 | - |
721 | -# Copyright 2012, 2013 Marco Ceppi, Canonical Ltd. This software is |
722 | -# licensed under the GNU Affero General Public License version 3 (see |
723 | -# the file LICENSE). |
724 | - |
725 | -set -e |
726 | - |
727 | -SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" |
728 | -PROJECT_DIR="$( dirname $SCRIPT_DIR )" |
729 | -PYTHON_BIN=$PROJECT_DIR/bin/python |
730 | -: ${LOGDIR:="$( dirname $PROJECT_DIR )/var"} |
731 | -JOBDIR=$PROJECT_DIR/charmworld/jobs |
732 | - |
733 | -echo "Starting worker..." |
734 | -$PYTHON_BIN $JOBDIR/worker.py --quit-on-empty &>> $LOGDIR/ingest-worker.log |
This all looks good. The only thing I see is a style issue. In several places, you have multiple imports on a single line, e.g. "from charmworld.jobs.config import CHARM_QUEUE, BASKET_QUEUE, settings". Charmworld follows Launchpad style, and puts these on multiple lines to reduce merge conflicts, e.g.:
from charmworld.jobs.config import (
    BASKET_QUEUE,
    CHARM_QUEUE,
    settings,
)