Merge lp:~thomir-deactivatedaccount/core-image-publisher/trunk-upload-to-glance into lp:core-image-publisher
- trunk-upload-to-glance
- Merge into trunk
Status: | Merged |
---|---|
Approved by: | Thomi Richards |
Approved revision: | 22 |
Merged at revision: | 13 |
Proposed branch: | lp:~thomir-deactivatedaccount/core-image-publisher/trunk-upload-to-glance |
Merge into: | lp:core-image-publisher |
Diff against target: |
457 lines (+267/-47) 10 files modified
README.rst (+6/-0) core-service.conf (+7/-1) core_image_publisher/__init__.py (+5/-1) core_image_publisher/cloud.py (+40/-0) core_image_publisher/constants.py (+2/-0) core_image_publisher/queue.py (+64/-1) core_image_publisher/tests/test_queue_integration.py (+2/-3) core_image_publisher/utils.py (+64/-0) core_image_publisher/worker.py (+75/-40) requirements.txt (+2/-1) |
To merge this branch: | bzr merge lp:~thomir-deactivatedaccount/core-image-publisher/trunk-upload-to-glance |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Celso Providelo (community) | Approve | ||
Review via email: mp+254529@code.launchpad.net |
Commit message
Add code to push converted images into glance.
Description of the change
Add code to push converted images into glance.
- 18. By Thomi Richards
-
Fix typo.
- 19. By Thomi Richards
-
Further separate business logic from kombu objects.
- 20. By Thomi Richards
-
Update test cases.
Francis Ginther (fginther) wrote : | # |
Just a few more comments and questions. Nothing that needs to be addressed now.
- 21. By Thomi Richards
-
Fix typo.
- 22. By Thomi Richards
-
Put messages on the dead letter queue if their retry count is exceeded, rather than rejecting them.
Thomi Richards (thomir-deactivatedaccount) wrote : | # |
I replied to most comments, and fixed the remaining issues. Please re-review!
Celso Providelo (cprov) wrote : | # |
I am fine with landing this with a comment clarifying the use of the routing_key to get the queue_name dynamically on maybe_requeue().
I just don't see much value using this in the message producer-side if any consumers interested in that particular key will have to hardcode it.
I think we to balance the predilection for testable/readable code (modules, unittest-ing, etc) and the complexity and indirection it brings to the logic flow. It's a highly subjective topic ... let's keep talking about it while the code evolves.
Preview Diff
1 | === modified file 'README.rst' |
2 | --- README.rst 2015-03-25 23:54:45 +0000 |
3 | +++ README.rst 2015-03-30 20:05:33 +0000 |
4 | @@ -57,6 +57,12 @@ |
5 | [amqp] |
6 | uris = amqp://guest:guest@localhost:5672// |
7 | |
8 | + [nova] |
9 | + os_username = foo |
10 | + os_password = <redacted> |
11 | + os_tenant_name = foo_project |
12 | + os_auth_url = http://172.20.161.138:5000/v2.0/ |
13 | + |
14 | ...optionally, add a logstash section, which will turn on the logstash handler:: |
15 | |
16 | [logstash] |
17 | |
18 | === modified file 'core-service.conf' |
19 | --- core-service.conf 2015-03-25 23:40:55 +0000 |
20 | +++ core-service.conf 2015-03-30 20:05:33 +0000 |
21 | @@ -1,3 +1,9 @@ |
22 | |
23 | [amqp] |
24 | -uris = amqp://guest:guest@localhost:5672// |
25 | \ No newline at end of file |
26 | +uris = amqp://guest:guest@localhost:5672// |
27 | + |
28 | +[nova] |
29 | +os_username = foo |
30 | +os_password = <redacted> |
31 | +os_tenant_name = foo_project |
32 | +os_auth_url = http://172.20.161.138:5000/v2.0/ |
33 | |
34 | === modified file 'core_image_publisher/__init__.py' |
35 | --- core_image_publisher/__init__.py 2015-03-27 04:26:08 +0000 |
36 | +++ core_image_publisher/__init__.py 2015-03-30 20:05:33 +0000 |
37 | @@ -52,6 +52,10 @@ |
38 | root_logger = logging.getLogger() |
39 | root_logger.setLevel(logging.INFO) |
40 | |
41 | + # glanceclient & keystoneclient use requests, which logs for every |
42 | + # new HTTP connection, so let's make it a little less chatty: |
43 | + logging.getLogger('requests').setLevel(logging.WARNING) |
44 | + |
45 | # If there is no ./logs directory, fallback to stderr. |
46 | log_path = os.path.abspath( |
47 | os.path.join(__file__, '../../logs/core-image-publisher.log')) |
48 | @@ -91,7 +95,7 @@ |
49 | with connection: |
50 | monitor = CoreImagePublisherQueueMonitor( |
51 | connection, |
52 | - worker.logging_worker |
53 | + worker.ImagePublisherWorker(config) |
54 | ) |
55 | monitor.run() |
56 | except KeyboardInterrupt: |
57 | |
58 | === added file 'core_image_publisher/cloud.py' |
59 | --- core_image_publisher/cloud.py 1970-01-01 00:00:00 +0000 |
60 | +++ core_image_publisher/cloud.py 2015-03-30 20:05:33 +0000 |
61 | @@ -0,0 +1,40 @@ |
62 | +# core-image-publisher |
63 | +# Copyright (C) 2015 Canonical |
64 | +# |
65 | +# This program is free software: you can redistribute it and/or modify |
66 | +# it under the terms of the GNU General Public License as published by |
67 | +# the Free Software Foundation, either version 3 of the License, or |
68 | +# (at your option) any later version. |
69 | +# |
70 | +# This program is distributed in the hope that it will be useful, |
71 | +# but WITHOUT ANY WARRANTY; without even the implied warranty of |
72 | +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
73 | +# GNU General Public License for more details. |
74 | +# |
75 | +# You should have received a copy of the GNU General Public License |
76 | +# along with this program. If not, see <http://www.gnu.org/licenses/>. |
77 | +# |
78 | + |
79 | +"""Functions for cloud interactions.""" |
80 | + |
81 | +import glanceclient |
82 | +import keystoneclient.v2_0.client |
83 | + |
84 | + |
85 | +def get_glance_client(config): |
86 | + keystone = keystoneclient.v2_0.client.Client( |
87 | + username=config.get('nova', 'os_username'), |
88 | + password=config.get('nova', 'os_password'), |
89 | + tenant_name=config.get('nova', 'os_tenant_name'), |
90 | + auth_url=config.get('nova', 'os_auth_url') |
91 | + ) |
92 | + glance_endpoints = keystone.service_catalog.get_endpoints( |
93 | + service_type='image' |
94 | + ) |
95 | + glance_endpoint_url = glance_endpoints['image'][0]['internalURL'] |
96 | + glance_client = glanceclient.Client( |
97 | + version='1', |
98 | + endpoint=glance_endpoint_url, |
99 | + token=keystone.auth_token |
100 | + ) |
101 | + return glance_client |
102 | |
103 | === modified file 'core_image_publisher/constants.py' |
104 | --- core_image_publisher/constants.py 2015-03-25 21:36:10 +0000 |
105 | +++ core_image_publisher/constants.py 2015-03-30 20:05:33 +0000 |
106 | @@ -18,3 +18,5 @@ |
107 | """Constants for this service.""" |
108 | |
109 | API_VERSION="v1" |
110 | + |
111 | +RETRY_COUNT = 3 |
112 | |
113 | === modified file 'core_image_publisher/queue.py' |
114 | --- core_image_publisher/queue.py 2015-03-26 21:35:31 +0000 |
115 | +++ core_image_publisher/queue.py 2015-03-30 20:05:33 +0000 |
116 | @@ -15,7 +15,9 @@ |
117 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
118 | # |
119 | |
120 | +import enum |
121 | import logging |
122 | +from pdb import bdb |
123 | |
124 | import kombu |
125 | from kombu.mixins import ConsumerMixin |
126 | @@ -31,6 +33,24 @@ |
127 | ] |
128 | |
129 | |
130 | +class MessageActions(enum.Enum): |
131 | + |
132 | + """Actions that the message processing CB can request. |
133 | + |
134 | + These enum values can be returned from the message processing callback, |
135 | + and the CoreImagePublisherQueueMonitor will take appropriate action. |
136 | + |
137 | + """ |
138 | + |
139 | + # The message needs to be retried. This might cause the message to be |
140 | + # retried, or it might reject the message if it's retry count has been |
141 | + # exceeded. |
142 | + Retry = 1 |
143 | + |
144 | + # The message was processed properly, and should be acknowledged. |
145 | + Acknowledge = 2 |
146 | + |
147 | + |
148 | class CoreImagePublisherQueueMonitor(ConsumerMixin): |
149 | |
150 | """A class that watches for incoming messages from a queue, and calls |
151 | @@ -70,7 +90,14 @@ |
152 | """ |
153 | logger.info('Got: %r', body, extra=body) |
154 | try: |
155 | - self.on_message_cb(message) |
156 | + requested_action = self.on_message_cb(message.payload) |
157 | + if requested_action == MessageActions.Retry: |
158 | + self.maybe_requeue_message(message) |
159 | + elif requested_action == MessageActions.Acknowledge: |
160 | + message.ack() |
161 | + # Make it possible to use a debugger inside the worker callback: |
162 | + except bdb.BdbQuit: |
163 | + raise |
164 | except Exception as e: |
165 | logger.error( |
166 | "Caught unhandled exception while processing message: %s", |
167 | @@ -79,6 +106,42 @@ |
168 | ) |
169 | message.requeue() |
170 | |
171 | + def maybe_requeue_message(self, message): |
172 | + """Maybe requeue the message, based on it's retry count. |
173 | + |
174 | + Note: kombu (or maybe rabbitmq) does not allow you to alter a message's |
175 | + payload and then call requue(), which would be super helpful here. Instead |
176 | + we explicitly queue it (using kombu.SimpleQueue) to the last queue the |
177 | + message was on. |
178 | + |
179 | + The key we use to store the retry count is based on the message's routing |
180 | + key, which allows us to have a separate retry count for each stage of the |
181 | + system. |
182 | + |
183 | + """ |
184 | + retry_key = "{}_retry_count".format( |
185 | + message.delivery_info.get('routing_key', '') |
186 | + ) |
187 | + retry_count = int(message.payload.get(retry_key, '0')) |
188 | + if retry_count < constants.RETRY_COUNT: |
189 | + message.payload[retry_key] = retry_count + 1 |
190 | + queue = self.connection.SimpleQueue( |
191 | + 'core.image.{}'.format(constants.API_VERSION)) |
192 | + queue.put(message.payload) |
193 | + queue.close() |
194 | + message.ack() |
195 | + else: |
196 | + logger.error( |
197 | + "Rejecting message due to retry count exceeding maximum. " |
198 | + "Message will reside on the dead letter queue", |
199 | + extra=message.payload |
200 | + ) |
201 | + queue = self.connection.SimpleQueue( |
202 | + 'core.deadletters.{}'.format(constants.API_VERSION)) |
203 | + queue.put(message.payload) |
204 | + queue.close() |
205 | + message.ack() |
206 | + |
207 | |
208 | def create_connection_from_config(config): |
209 | """Create a connection given a config object. |
210 | |
211 | === modified file 'core_image_publisher/tests/test_queue_integration.py' |
212 | --- core_image_publisher/tests/test_queue_integration.py 2015-03-25 21:36:10 +0000 |
213 | +++ core_image_publisher/tests/test_queue_integration.py 2015-03-30 20:05:33 +0000 |
214 | @@ -65,9 +65,8 @@ |
215 | def __init__(self): |
216 | self.got_messages = [] |
217 | |
218 | - def __call__(self, message): |
219 | - self.got_messages.append(message.payload) |
220 | - message.ack() |
221 | + def __call__(self, payload): |
222 | + self.got_messages.append(payload) |
223 | |
224 | |
225 | def queue_message(conn, message): |
226 | |
227 | === added file 'core_image_publisher/utils.py' |
228 | --- core_image_publisher/utils.py 1970-01-01 00:00:00 +0000 |
229 | +++ core_image_publisher/utils.py 2015-03-30 20:05:33 +0000 |
230 | @@ -0,0 +1,64 @@ |
231 | +# core-image-publisher |
232 | +# Copyright (C) 2015 Canonical |
233 | +# |
234 | +# This program is free software: you can redistribute it and/or modify |
235 | +# it under the terms of the GNU General Public License as published by |
236 | +# the Free Software Foundation, either version 3 of the License, or |
237 | +# (at your option) any later version. |
238 | +# |
239 | +# This program is distributed in the hope that it will be useful, |
240 | +# but WITHOUT ANY WARRANTY; without even the implied warranty of |
241 | +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
242 | +# GNU General Public License for more details. |
243 | +# |
244 | +# You should have received a copy of the GNU General Public License |
245 | +# along with this program. If not, see <http://www.gnu.org/licenses/>. |
246 | +# |
247 | + |
248 | +"Utilities that might be useful elsewhere." |
249 | + |
250 | +import subprocess |
251 | +import textwrap |
252 | + |
253 | + |
254 | +def check_call(*popenargs, **kwargs): |
255 | + """A better version of subprocess.check_call that gives better errors. |
256 | + |
257 | + If a command invoked via 'check_call' fails, the error looks like:: |
258 | + |
259 | + Command '['juju', 'bootstrap']' returned non-zero exit status 2 |
260 | + |
261 | + While concise, that doesn't help the user of this script see what went |
262 | + wrong. This function invokes a command, but captures stdout and stderr, |
263 | + and includes that information in the exception. |
264 | + |
265 | + """ |
266 | + if 'stdout' in kwargs or 'stderr' in kwargs: |
267 | + # user did something with stdout or stderr already, so just call |
268 | + # through to check_call directly: |
269 | + return subprocess.check_call(*popenargs, **kwargs) |
270 | + kwargs['stdout'] = kwargs['stderr'] = subprocess.PIPE |
271 | + process = subprocess.Popen(*popenargs, **kwargs) |
272 | + stdout, stderr = process.communicate() |
273 | + if process.returncode != 0: |
274 | + raise BetterCalledProcessError( |
275 | + process.returncode, |
276 | + popenargs[0], |
277 | + textwrap.dedent( |
278 | + ''' |
279 | + stdout: |
280 | + """\\ |
281 | + {}""" |
282 | + stderr: |
283 | + """\\ |
284 | + {}""" |
285 | + ''').format(stdout.decode(), stderr.decode()) |
286 | + ) |
287 | + |
288 | + |
289 | +class BetterCalledProcessError(subprocess.CalledProcessError): |
290 | + |
291 | + """Extend CalledProcessError to show failed command output as well.""" |
292 | + |
293 | + def __str__(self): |
294 | + return super().__str__() + self.output |
295 | |
296 | === modified file 'core_image_publisher/worker.py' |
297 | --- core_image_publisher/worker.py 2015-03-29 21:12:41 +0000 |
298 | +++ core_image_publisher/worker.py 2015-03-30 20:05:33 +0000 |
299 | @@ -21,46 +21,71 @@ |
300 | import os |
301 | import subprocess |
302 | import tempfile |
303 | -from core_image_publisher.queue import enqueue_message |
304 | + |
305 | +from core_image_publisher.queue import ( |
306 | + enqueue_message, |
307 | + MessageActions, |
308 | +) |
309 | +from core_image_publisher.cloud import get_glance_client |
310 | +from core_image_publisher.utils import check_call |
311 | |
312 | logger = logging.getLogger(__name__) |
313 | |
314 | |
315 | -def logging_worker(message): |
316 | - logger.info("Got %r", message.payload, extra=message.payload) |
317 | - payload = message.payload |
318 | - try: |
319 | - image_name = payload['image_name'] |
320 | - channel = payload['channel'] |
321 | - device = payload['device'] |
322 | - except KeyError as e: |
323 | - logger.error( |
324 | - "Unable to deserialize message payload - rejecting message: %s", |
325 | - e, |
326 | - extra=payload |
327 | - ) |
328 | - message.requeue() |
329 | - return |
330 | - |
331 | - with tempfile.TemporaryDirectory as tmpdir: |
332 | - try: |
333 | - image_path = download_image(image_name, channel, device, tmpdir) |
334 | - except subprocess.CalledProcessError as e: |
335 | - logger.error("Unable to download core image: %s", e, extra=payload) |
336 | - message.requeue() |
337 | - return |
338 | - try: |
339 | - nova_image_path = convert_nova_image(image_path) |
340 | - except subprocess.CalledProcessError as e: |
341 | - logger.error("Unable to convert core image to qcow2 image: %s", e, |
342 | - extra=payload) |
343 | - message.requeue() |
344 | - return |
345 | - glance_image_name = upload_image_to_glance(nova_image_path) |
346 | - |
347 | - payload['nova_image'] = glance_image_name |
348 | - enqueue_message(payload) |
349 | - message.ack() |
350 | +class ImagePublisherWorker(object): |
351 | + |
352 | + """A worker callable that contains all our main logic.""" |
353 | + |
354 | + def __init__(self, config): |
355 | + self._config = config |
356 | + |
357 | + def __call__(self, payload): |
358 | + logger.info("Got %r", payload, extra=payload) |
359 | + try: |
360 | + image_name = payload['image_name'] |
361 | + channel = payload['channel'] |
362 | + device = payload['device'] |
363 | + except KeyError as e: |
364 | + logger.error( |
365 | + "Unable to deserialize message payload - rejecting message: %s", |
366 | + e, |
367 | + extra=payload |
368 | + ) |
369 | + return MessageActions.Retry |
370 | + |
371 | + with tempfile.TemporaryDirectory() as tmpdir: |
372 | + logger.info("Beginning image download.", extra=payload) |
373 | + try: |
374 | + image_path = download_image(image_name, channel, device, tmpdir) |
375 | + except subprocess.CalledProcessError as e: |
376 | + logger.error("Unable to download core image: %s", e, extra=payload) |
377 | + return MessageActions.Retry |
378 | + logger.info("Ubuntu Core image downloaded OK.", extra=payload) |
379 | + |
380 | + try: |
381 | + nova_image_path = convert_nova_image(image_path) |
382 | + except subprocess.CalledProcessError as e: |
383 | + logger.error("Unable to convert core image to qcow2 image: %s", e, |
384 | + extra=payload) |
385 | + return MessageActions.Retry |
386 | + logger.info("Image converted to qcow2 OK.", extra=payload) |
387 | + |
388 | + try: |
389 | + logger.info("Beginning image upload to glance...") |
390 | + glance_image_name = upload_image_to_glance( |
391 | + nova_image_path, |
392 | + self._config |
393 | + ) |
394 | + except Exception as e: |
395 | + logger.error("Unable to upload image to glance: %s", e, |
396 | + extra=payload) |
397 | + return MessageActions.Retry |
398 | + logger.info("Image uploaded to glance OK.", extra=payload) |
399 | + |
400 | + payload['nova_image'] = glance_image_name |
401 | + enqueue_message(payload) |
402 | + logger.info("Processing completed.", extra=payload) |
403 | + return MessageActions.Acknowledge |
404 | |
405 | |
406 | def download_image(name, channel, device, tmpdir): |
407 | @@ -70,13 +95,13 @@ |
408 | 'ubuntu-device-flash', |
409 | '--revision', name, |
410 | 'core', |
411 | - '--device', generic_amd64, |
412 | + '--device', device, |
413 | '--channel', channel, |
414 | '--size', '3', |
415 | '-o', image_path, |
416 | '--developer-mode', |
417 | '--cloud'] |
418 | - subprocess.check_call(cmd) |
419 | + check_call(cmd) |
420 | return image_path |
421 | |
422 | |
423 | @@ -91,11 +116,21 @@ |
424 | '-O', 'qcow2', |
425 | image_path, |
426 | converted_image_path] |
427 | - subprocess.check_call(cmd) |
428 | + check_call(cmd) |
429 | return converted_image_path |
430 | |
431 | |
432 | -def upload_image_to_glance(nova_image_path): |
433 | +def upload_image_to_glance(nova_image_path, config): |
434 | """ |
435 | Upload the nova image to glance, returning the name of the image in glance. |
436 | """ |
437 | + glance = get_glance_client(config) |
438 | + image_name = os.path.basename(nova_image_path) |
439 | + image = glance.images.create( |
440 | + name=image_name, |
441 | + data=open(nova_image_path, 'rb'), |
442 | + disk_format='qcow2', |
443 | + container_format='bare', |
444 | + ) |
445 | + return image_name |
446 | + |
447 | |
448 | === modified file 'requirements.txt' |
449 | --- requirements.txt 2015-03-25 23:51:59 +0000 |
450 | +++ requirements.txt 2015-03-30 20:05:33 +0000 |
451 | @@ -1,2 +1,3 @@ |
452 | kombu==3.0.24 |
453 | -python-logstash==0.4.2 |
454 | \ No newline at end of file |
455 | +python-logstash==0.4.2 |
456 | +python-glanceclient==0.17.0 |
457 | \ No newline at end of file |
Thomi,
It is looking good, but I think we have to adjust maybe_requeue_ message( ) behavior a bit, see the inline comments and let me know if the make sense to our plans.
[]