Merge lp:~thomir-deactivatedaccount/core-image-publisher/trunk-upload-to-glance into lp:core-image-publisher

Proposed by Thomi Richards
Status: Merged
Approved by: Thomi Richards
Approved revision: 22
Merged at revision: 13
Proposed branch: lp:~thomir-deactivatedaccount/core-image-publisher/trunk-upload-to-glance
Merge into: lp:core-image-publisher
Diff against target: 457 lines (+267/-47)
10 files modified
README.rst (+6/-0)
core-service.conf (+7/-1)
core_image_publisher/__init__.py (+5/-1)
core_image_publisher/cloud.py (+40/-0)
core_image_publisher/constants.py (+2/-0)
core_image_publisher/queue.py (+64/-1)
core_image_publisher/tests/test_queue_integration.py (+2/-3)
core_image_publisher/utils.py (+64/-0)
core_image_publisher/worker.py (+75/-40)
requirements.txt (+2/-1)
To merge this branch: bzr merge lp:~thomir-deactivatedaccount/core-image-publisher/trunk-upload-to-glance
Reviewer: Celso Providelo (community)
Review status: Approve
Review via email: mp+254529@code.launchpad.net

Commit message

Add code to push converted images into glance.

Description of the change

Add code to push converted images into glance.

18. By Thomi Richards

Fix typo.

19. By Thomi Richards

Further separate business logic from kombu objects.

Celso Providelo (cprov) wrote :

Thomi,

It is looking good, but I think we have to adjust maybe_requeue_message() behavior a bit. See the inline comments and let me know if they make sense for our plans.

[]

review: Needs Information
20. By Thomi Richards

Update test cases.

Francis Ginther (fginther) wrote :

Just a few more comments and questions. Nothing that needs to be addressed now.

21. By Thomi Richards

Fix typo.

22. By Thomi Richards

Put messages on the dead letter queue if their retry count is exceeded, rather than rejecting them.
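The retry policy described in this revision can be sketched without any kombu objects. This is a minimal sketch, not the merged code: `plan_redelivery` and its return shape are hypothetical, while the queue names, the `RETRY_COUNT` value and the `<routing_key>_retry_count` payload key follow the diff in this proposal.

```python
API_VERSION = "v1"
RETRY_COUNT = 3


def plan_redelivery(payload, routing_key, max_retries=RETRY_COUNT):
    """Return (queue_name, new_payload) for a message that asked for a retry.

    Hypothetical helper: it only decides where the message should go next,
    it does not talk to a broker.
    """
    # One counter per routing key, so each stage keeps its own retry count.
    retry_key = "{}_retry_count".format(routing_key)
    retry_count = int(payload.get(retry_key, 0))
    new_payload = dict(payload)  # leave the caller's payload untouched
    if retry_count < max_retries:
        new_payload[retry_key] = retry_count + 1
        return "core.image.{}".format(API_VERSION), new_payload
    # Retry budget used up: park the message instead of dropping it.
    return "core.deadletters.{}".format(API_VERSION), new_payload
```

Keeping the decision in a pure function like this makes the boundary case (a message that has already been retried `RETRY_COUNT` times) easy to unit test.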

Thomi Richards (thomir-deactivatedaccount) wrote :

I replied to most comments, and fixed the remaining issues. Please re-review!

Celso Providelo (cprov) wrote :

I am fine with landing this with a comment clarifying the use of the routing_key to get the queue_name dynamically on maybe_requeue().

I just don't see much value in using this on the message producer side if any consumers interested in that particular key will have to hardcode it.

I think we need to balance the predilection for testable/readable code (modules, unit testing, etc.) against the complexity and indirection it brings to the logic flow. It's a highly subjective topic ... let's keep talking about it while the code evolves.

review: Approve

Preview Diff

=== modified file 'README.rst'
--- README.rst 2015-03-25 23:54:45 +0000
+++ README.rst 2015-03-30 20:05:33 +0000
@@ -57,6 +57,12 @@
     [amqp]
     uris = amqp://guest:guest@localhost:5672//

+    [nova]
+    os_username = foo
+    os_password = <redacted>
+    os_tenant_name = foo_project
+    os_auth_url = http://172.20.161.138:5000/v2.0/
+
 ...optionally, add a logstash section, which will turn on the logstash handler::

     [logstash]

=== modified file 'core-service.conf'
--- core-service.conf 2015-03-25 23:40:55 +0000
+++ core-service.conf 2015-03-30 20:05:33 +0000
@@ -1,3 +1,9 @@

 [amqp]
-uris = amqp://guest:guest@localhost:5672//
\ No newline at end of file
+uris = amqp://guest:guest@localhost:5672//
+
+[nova]
+os_username = foo
+os_password = <redacted>
+os_tenant_name = foo_project
+os_auth_url = http://172.20.161.138:5000/v2.0/

=== modified file 'core_image_publisher/__init__.py'
--- core_image_publisher/__init__.py 2015-03-27 04:26:08 +0000
+++ core_image_publisher/__init__.py 2015-03-30 20:05:33 +0000
@@ -52,6 +52,10 @@
     root_logger = logging.getLogger()
     root_logger.setLevel(logging.INFO)

+    # glanceclient & keystoneclient use requests, which logs for every
+    # new HTTP connection, so let's make it a little less chatty:
+    logging.getLogger('requests').setLevel(logging.WARNING)
+
     # If there is no ./logs directory, fallback to stderr.
     log_path = os.path.abspath(
         os.path.join(__file__, '../../logs/core-image-publisher.log'))
@@ -91,7 +95,7 @@
         with connection:
             monitor = CoreImagePublisherQueueMonitor(
                 connection,
-                worker.logging_worker
+                worker.ImagePublisherWorker(config)
             )
             monitor.run()
     except KeyboardInterrupt:

=== added file 'core_image_publisher/cloud.py'
--- core_image_publisher/cloud.py 1970-01-01 00:00:00 +0000
+++ core_image_publisher/cloud.py 2015-03-30 20:05:33 +0000
@@ -0,0 +1,40 @@
+# core-image-publisher
+# Copyright (C) 2015 Canonical
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+"""Functions for cloud interactions."""
+
+import glanceclient
+import keystoneclient.v2_0.client
+
+
+def get_glance_client(config):
+    keystone = keystoneclient.v2_0.client.Client(
+        username=config.get('nova', 'os_username'),
+        password=config.get('nova', 'os_password'),
+        tenant_name=config.get('nova', 'os_tenant_name'),
+        auth_url=config.get('nova', 'os_auth_url')
+    )
+    glance_endpoints = keystone.service_catalog.get_endpoints(
+        service_type='image'
+    )
+    glance_endpoint_url = glance_endpoints['image'][0]['internalURL']
+    glance_client = glanceclient.Client(
+        version='1',
+        endpoint=glance_endpoint_url,
+        token=keystone.auth_token
+    )
+    return glance_client

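The `[nova]` options that `get_glance_client` reads could be validated before any network call is attempted. A minimal sketch, assuming the config object is a standard `configparser.ConfigParser`; `load_nova_credentials` and `NOVA_OPTIONS` are hypothetical names, and the sample values are placeholders in the style of the README.

```python
import configparser

# Option names taken from the [nova] section shown in the README.
NOVA_OPTIONS = ("os_username", "os_password", "os_tenant_name", "os_auth_url")


def load_nova_credentials(config):
    """Return the [nova] options as a dict, failing early if any is missing."""
    missing = [o for o in NOVA_OPTIONS if not config.has_option("nova", o)]
    if missing:
        raise ValueError("missing [nova] options: " + ", ".join(missing))
    return {o: config.get("nova", o) for o in NOVA_OPTIONS}


SAMPLE_CONFIG = """
[nova]
os_username = foo
os_password = example-password
os_tenant_name = foo_project
os_auth_url = http://172.20.161.138:5000/v2.0/
"""

config = configparser.ConfigParser()
config.read_string(SAMPLE_CONFIG)
credentials = load_nova_credentials(config)
```

Failing with one clear error at startup is arguably friendlier than a `NoOptionError` raised from inside the worker on the first upload.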
=== modified file 'core_image_publisher/constants.py'
--- core_image_publisher/constants.py 2015-03-25 21:36:10 +0000
+++ core_image_publisher/constants.py 2015-03-30 20:05:33 +0000
@@ -18,3 +18,5 @@
 """Constants for this service."""

 API_VERSION="v1"
+
+RETRY_COUNT = 3

=== modified file 'core_image_publisher/queue.py'
--- core_image_publisher/queue.py 2015-03-26 21:35:31 +0000
+++ core_image_publisher/queue.py 2015-03-30 20:05:33 +0000
@@ -15,7 +15,9 @@
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 #

+import enum
 import logging
+from pdb import bdb

 import kombu
 from kombu.mixins import ConsumerMixin
@@ -31,6 +33,24 @@
 ]


+class MessageActions(enum.Enum):
+
+    """Actions that the message processing CB can request.
+
+    These enum values can be returned from the message processing callback,
+    and the CoreImagePublisherQueueMonitor will take appropriate action.
+
+    """
+
+    # The message needs to be retried. This might cause the message to be
+    # retried, or it might reject the message if its retry count has been
+    # exceeded.
+    Retry = 1
+
+    # The message was processed properly, and should be acknowledged.
+    Acknowledge = 2
+
+
 class CoreImagePublisherQueueMonitor(ConsumerMixin):

     """A class that watches for incoming messages from a queue, and calls
@@ -70,7 +90,14 @@
         """
         logger.info('Got: %r', body, extra=body)
         try:
-            self.on_message_cb(message)
+            requested_action = self.on_message_cb(message.payload)
+            if requested_action == MessageActions.Retry:
+                self.maybe_requeue_message(message)
+            elif requested_action == MessageActions.Acknowledge:
+                message.ack()
+        # Make it possible to use a debugger inside the worker callback:
+        except bdb.BdbQuit:
+            raise
         except Exception as e:
             logger.error(
                 "Caught unhandled exception while processing message: %s",
@@ -79,6 +106,42 @@
             )
             message.requeue()

+    def maybe_requeue_message(self, message):
+        """Maybe requeue the message, based on its retry count.
+
+        Note: kombu (or maybe rabbitmq) does not allow you to alter a message's
+        payload and then call requeue(), which would be super helpful here. Instead
+        we explicitly queue it (using kombu.SimpleQueue) to the last queue the
+        message was on.
+
+        The key we use to store the retry count is based on the message's routing
+        key, which allows us to have a separate retry count for each stage of the
+        system.
+
+        """
+        retry_key = "{}_retry_count".format(
+            message.delivery_info.get('routing_key', '')
+        )
+        retry_count = int(message.payload.get(retry_key, '0'))
+        if retry_count < constants.RETRY_COUNT:
+            message.payload[retry_key] = retry_count + 1
+            queue = self.connection.SimpleQueue(
+                'core.image.{}'.format(constants.API_VERSION))
+            queue.put(message.payload)
+            queue.close()
+            message.ack()
+        else:
+            logger.error(
+                "Rejecting message due to retry count exceeding maximum. "
+                "Message will reside on the dead letter queue",
+                extra=message.payload
+            )
+            queue = self.connection.SimpleQueue(
+                'core.deadletters.{}'.format(constants.API_VERSION))
+            queue.put(message.payload)
+            queue.close()
+            message.ack()
+

 def create_connection_from_config(config):
     """Create a connection given a config object.

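The contract introduced in queue.py is that the worker callback receives only the payload and returns a `MessageActions` value, while the monitor alone touches the message. A minimal sketch of that dispatch, assuming a stand-in message object: `FakeMessage` and `dispatch` are hypothetical and exist only so the flow can be exercised without a broker.

```python
import enum


class MessageActions(enum.Enum):
    Retry = 1
    Acknowledge = 2


class FakeMessage:
    """Stand-in for a kombu message; records what was done to it."""

    def __init__(self, payload):
        self.payload = payload
        self.calls = []

    def ack(self):
        self.calls.append("ack")

    def requeue(self):
        self.calls.append("requeue")


def dispatch(message, callback, on_retry):
    """Run the callback on the payload and act on the action it returns."""
    try:
        action = callback(message.payload)
        if action is MessageActions.Retry:
            on_retry(message)
        elif action is MessageActions.Acknowledge:
            message.ack()
    except Exception:
        # Unhandled errors put the message back on its queue unchanged.
        message.requeue()
```

With this split, a worker can be tested by calling it with a plain dict and checking the returned enum value, which is what the updated `test_queue_integration.py` callable relies on.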
=== modified file 'core_image_publisher/tests/test_queue_integration.py'
--- core_image_publisher/tests/test_queue_integration.py 2015-03-25 21:36:10 +0000
+++ core_image_publisher/tests/test_queue_integration.py 2015-03-30 20:05:33 +0000
@@ -65,9 +65,8 @@
     def __init__(self):
         self.got_messages = []

-    def __call__(self, message):
-        self.got_messages.append(message.payload)
-        message.ack()
+    def __call__(self, payload):
+        self.got_messages.append(payload)


 def queue_message(conn, message):

=== added file 'core_image_publisher/utils.py'
--- core_image_publisher/utils.py 1970-01-01 00:00:00 +0000
+++ core_image_publisher/utils.py 2015-03-30 20:05:33 +0000
@@ -0,0 +1,64 @@
+# core-image-publisher
+# Copyright (C) 2015 Canonical
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+"Utilities that might be useful elsewhere."
+
+import subprocess
+import textwrap
+
+
+def check_call(*popenargs, **kwargs):
+    """A better version of subprocess.check_call that gives better errors.
+
+    If a command invoked via 'check_call' fails, the error looks like::
+
+        Command '['juju', 'bootstrap']' returned non-zero exit status 2
+
+    While concise, that doesn't help the user of this script see what went
+    wrong. This function invokes a command, but captures stdout and stderr,
+    and includes that information in the exception.
+
+    """
+    if 'stdout' in kwargs or 'stderr' in kwargs:
+        # user did something with stdout or stderr already, so just call
+        # through to check_call directly:
+        return subprocess.check_call(*popenargs, **kwargs)
+    kwargs['stdout'] = kwargs['stderr'] = subprocess.PIPE
+    process = subprocess.Popen(*popenargs, **kwargs)
+    stdout, stderr = process.communicate()
+    if process.returncode != 0:
+        raise BetterCalledProcessError(
+            process.returncode,
+            popenargs[0],
+            textwrap.dedent(
+                '''
+                stdout:
+                """\\
+                {}"""
+                stderr:
+                """\\
+                {}"""
+                ''').format(stdout.decode(), stderr.decode())
+        )
+
+
+class BetterCalledProcessError(subprocess.CalledProcessError):
+
+    """Extend CalledProcessError to show failed command output as well."""
+
+    def __str__(self):
+        return super().__str__() + self.output

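The idea behind `check_call` in utils.py (capture the output and attach it to the exception) can also be expressed with `subprocess.run`. This is an alternative sketch, not the code in this branch: `run_checked` and `CommandFailed` are hypothetical names, and it assumes Python 3.5 or later, where `subprocess.run` and the `stderr` argument of `CalledProcessError` are available.

```python
import subprocess
import sys


class CommandFailed(subprocess.CalledProcessError):
    """CalledProcessError whose message also shows the captured output."""

    def __str__(self):
        return "{}\nstdout:\n{}\nstderr:\n{}".format(
            super().__str__(), self.output, self.stderr)


def run_checked(cmd):
    """Run cmd, raising CommandFailed with stdout/stderr on a non-zero exit."""
    result = subprocess.run(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if result.returncode != 0:
        raise CommandFailed(
            result.returncode,
            cmd,
            output=result.stdout.decode(),
            stderr=result.stderr.decode(),
        )


# A command that writes to stderr and exits with status 2.
failing_cmd = [
    sys.executable, "-c",
    "import sys; sys.stderr.write('boom'); sys.exit(2)",
]
try:
    run_checked(failing_cmd)
except CommandFailed as error:
    failure_message = str(error)
```

Because `CommandFailed` still derives from `subprocess.CalledProcessError`, existing `except subprocess.CalledProcessError` handlers such as those in worker.py would keep working.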
=== modified file 'core_image_publisher/worker.py'
--- core_image_publisher/worker.py 2015-03-29 21:12:41 +0000
+++ core_image_publisher/worker.py 2015-03-30 20:05:33 +0000
@@ -21,46 +21,71 @@
 import os
 import subprocess
 import tempfile
-from core_image_publisher.queue import enqueue_message
+
+from core_image_publisher.queue import (
+    enqueue_message,
+    MessageActions,
+)
+from core_image_publisher.cloud import get_glance_client
+from core_image_publisher.utils import check_call

 logger = logging.getLogger(__name__)


-def logging_worker(message):
-    logger.info("Got %r", message.payload, extra=message.payload)
-    payload = message.payload
-    try:
-        image_name = payload['image_name']
-        channel = payload['channel']
-        device = payload['device']
-    except KeyError as e:
-        logger.error(
-            "Unable to deserialize message payload - rejecting message: %s",
-            e,
-            extra=payload
-        )
-        message.requeue()
-        return
-
-    with tempfile.TemporaryDirectory as tmpdir:
-        try:
-            image_path = download_image(image_name, channel, device, tmpdir)
-        except subprocess.CalledProcessError as e:
-            logger.error("Unable to download core image: %s", e, extra=payload)
-            message.requeue()
-            return
-        try:
-            nova_image_path = convert_nova_image(image_path)
-        except subprocess.CalledProcessError as e:
-            logger.error("Unable to convert core image to qcow2 image: %s", e,
-                         extra=payload)
-            message.requeue()
-            return
-        glance_image_name = upload_image_to_glance(nova_image_path)
-
-    payload['nova_image'] = glance_image_name
-    enqueue_message(payload)
-    message.ack()
+class ImagePublisherWorker(object):
+
+    """A worker callable that contains all our main logic."""
+
+    def __init__(self, config):
+        self._config = config
+
+    def __call__(self, payload):
+        logger.info("Got %r", payload, extra=payload)
+        try:
+            image_name = payload['image_name']
+            channel = payload['channel']
+            device = payload['device']
+        except KeyError as e:
+            logger.error(
+                "Unable to deserialize message payload - rejecting message: %s",
+                e,
+                extra=payload
+            )
+            return MessageActions.Retry
+
+        with tempfile.TemporaryDirectory() as tmpdir:
+            logger.info("Beginning image download.", extra=payload)
+            try:
+                image_path = download_image(image_name, channel, device, tmpdir)
+            except subprocess.CalledProcessError as e:
+                logger.error("Unable to download core image: %s", e, extra=payload)
+                return MessageActions.Retry
+            logger.info("Ubuntu Core image downloaded OK.", extra=payload)
+
+            try:
+                nova_image_path = convert_nova_image(image_path)
+            except subprocess.CalledProcessError as e:
+                logger.error("Unable to convert core image to qcow2 image: %s", e,
+                             extra=payload)
+                return MessageActions.Retry
+            logger.info("Image converted to qcow2 OK.", extra=payload)
+
+            try:
+                logger.info("Beginning image upload to glance...")
+                glance_image_name = upload_image_to_glance(
+                    nova_image_path,
+                    self._config
+                )
+            except Exception as e:
+                logger.error("Unable to upload image to glance: %s", e,
+                             extra=payload)
+                return MessageActions.Retry
+            logger.info("Image uploaded to glance OK.", extra=payload)
+
+        payload['nova_image'] = glance_image_name
+        enqueue_message(payload)
+        logger.info("Processing completed.", extra=payload)
+        return MessageActions.Acknowledge


 def download_image(name, channel, device, tmpdir):
@@ -70,13 +95,13 @@
         'ubuntu-device-flash',
         '--revision', name,
         'core',
-        '--device', generic_amd64,
+        '--device', device,
         '--channel', channel,
         '--size', '3',
         '-o', image_path,
         '--developer-mode',
         '--cloud']
-    subprocess.check_call(cmd)
+    check_call(cmd)
     return image_path


@@ -91,11 +116,21 @@
         '-O', 'qcow2',
         image_path,
         converted_image_path]
-    subprocess.check_call(cmd)
+    check_call(cmd)
     return converted_image_path


-def upload_image_to_glance(nova_image_path):
+def upload_image_to_glance(nova_image_path, config):
     """
     Upload the nova image to glance, returning the name of the image in glance.
     """
+    glance = get_glance_client(config)
+    image_name = os.path.basename(nova_image_path)
+    image = glance.images.create(
+        name=image_name,
+        data=open(nova_image_path, 'rb'),
+        disk_format='qcow2',
+        container_format='bare',
+    )
+    return image_name
+

449--- requirements.txt 2015-03-25 23:51:59 +0000
450+++ requirements.txt 2015-03-30 20:05:33 +0000
451@@ -1,2 +1,3 @@
452 kombu==3.0.24
453-python-logstash==0.4.2
454\ No newline at end of file
455+python-logstash==0.4.2
456+python-glanceclient==0.17.0
457\ No newline at end of file