Merge lp:~thomir-deactivatedaccount/core-result-checker/trunk-check-swift-better into lp:core-result-checker

Proposed by Thomi Richards
Status: Merged
Approved by: Thomi Richards
Approved revision: 15
Merged at revision: 13
Proposed branch: lp:~thomir-deactivatedaccount/core-result-checker/trunk-check-swift-better
Merge into: lp:core-result-checker
Diff against target: 175 lines (+78/-18)
1 file modified
core_result_checker/__init__.py (+78/-18)
To merge this branch: bzr merge lp:~thomir-deactivatedaccount/core-result-checker/trunk-check-swift-better
Reviewer Review Type Date Requested Status
Celso Providelo (community) Approve
Review via email: mp+255320@code.launchpad.net

Commit message

Wait for swift to be consistent.

Description of the change

Add a check that waits for swift to become consistent before we try to make the container public.

To post a comment you must log in.
Revision history for this message
Celso Providelo (cprov) wrote :

Right, let's see the SwiftManager in action.

I just think that spinning a message for 10 minutes while waiting for swift seems excessive (I can see the logging storm already), but we shall see ...

For situations like this I'd prefer moving jobs quickly (< 30 s) to the dead letter queue and having proper monitoring and replay scripts.

review: Approve
15. By Thomi Richards

Set default timeout to 60 seconds, not 600.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'core_result_checker/__init__.py'
--- core_result_checker/__init__.py 2015-04-07 00:06:54 +0000
+++ core_result_checker/__init__.py 2015-04-07 04:03:38 +0000
@@ -17,6 +17,7 @@
1717
18import argparse18import argparse
19import configparser19import configparser
20from datetime import datetime
20import logging21import logging
21import os22import os
2223
@@ -37,9 +38,9 @@
3738
38class Worker(object):39class Worker(object):
3940
40 def __init__(self, swift_publisher, retry_publisher):41 def __init__(self, swift_manager, retry_publisher):
41 get_logger(__name__).info("Service Started.")42 get_logger(__name__).info("Service Started.")
42 self.swift_publisher = swift_publisher43 self.swift_manager = swift_manager
43 self.retry_publisher = retry_publisher44 self.retry_publisher = retry_publisher
4445
45 def __call__(self, message):46 def __call__(self, message):
@@ -54,6 +55,7 @@
54 logger.error("Unable to unpack incoming message: %s", str(e))55 logger.error("Unable to unpack incoming message: %s", str(e))
55 return MessageActions.Retry56 return MessageActions.Retry
5657
58 # check to see if adt-run reported infrastructure failures:
57 if int(exit_code) in (16, 20, 100):59 if int(exit_code) in (16, 20, 100):
58 logger.info(60 logger.info(
59 "Test run infrastructure failed (exit code %s), retrying.",61 "Test run infrastructure failed (exit code %s), retrying.",
@@ -66,8 +68,18 @@
66 device,68 device,
67 image_name,69 image_name,
68 )70 )
71
72 # check to see if swift is consistent yet:
73 if not self.swift_manager.container_contains_file(
74 container_name,
75 "results.tgz"
76 ):
77 logger.warning("Swift container not yet consistent.")
78 self.retry_publisher.retry_for_swift(message)
79 return MessageActions.Acknowledge
80
69 try:81 try:
70 self.swift_publisher.make_container_public(container_name)82 self.swift_manager.make_container_public(container_name)
71 except RuntimeError as e:83 except RuntimeError as e:
72 logger.error(84 logger.error(
73 "Unable to publish swift container '%s'. Error: %s.",85 "Unable to publish swift container '%s'. Error: %s.",
@@ -105,9 +117,9 @@
105 return config117 return config
106118
107119
108class SwiftPublisher(object):120class SwiftManager(object):
109121
110 """A class that knows how to make a swift container public."""122 """A class that knows how to interact with swift."""
111123
112 def __init__(self, nova_config):124 def __init__(self, nova_config):
113 self.config = nova_config125 self.config = nova_config
@@ -131,6 +143,11 @@
131 )143 )
132 )144 )
133145
146 def container_contains_file(self, container_name, file_name):
147 swift = SwiftService(self.config)
148 result = next(swift.stat(container_name, [file_name]))
149 return result['success']
150
134151
135class RetryPublisher(object):152class RetryPublisher(object):
136153
@@ -139,9 +156,10 @@
139 def __init__(self, connection, max_retries):156 def __init__(self, connection, max_retries):
140 self.connection = connection157 self.connection = connection
141 self.max_retries = max_retries158 self.max_retries = max_retries
159 self.swift_retry_seconds = 60
142160
143 def retry_test_run(self, payload):161 def retry_test_run(self, payload):
144 """Maybe retry a test payload.162 """Maybe retry a test payload. Due to adt-run reported failure.
145163
146 This will look at it's retry count, and either requeue it in the164 This will look at it's retry count, and either requeue it in the
147 test queue, or insert it into the dead letter queue.165 test queue, or insert it into the dead letter queue.
@@ -149,23 +167,65 @@
149 """167 """
150 retry_count = int(payload.get('test_run_retry_count', '0'))168 retry_count = int(payload.get('test_run_retry_count', '0'))
151 if retry_count < self.max_retries:169 if retry_count < self.max_retries:
152 q = self.connection.SimpleQueue(
153 "core.tests.{}".format(constants.API_VERSION)
154 )
155 payload['test_run_retry_count'] = retry_count + 1170 payload['test_run_retry_count'] = retry_count + 1
156 q.put(payload)171 self._insert_payload_into_queue(
157 q.close()172 "core.tests.{}".format(constants.API_VERSION),
173 payload
174 )
158 else:175 else:
159 logger = get_logger(__name__, payload)176 logger = get_logger(__name__, payload)
160 logger.error(177 logger.error(
161 "Test retry count exceeds maximum. Inserting into dead "178 "Test retry count exceeds maximum. Inserting into dead "
162 "letter queue"179 "letter queue"
163 )180 )
164 q = self.connection.SimpleQueue(181 self._insert_payload_into_queue(
165 "core.deadletters.{}".format(constants.API_VERSION)182 "core.deadletters.{}".format(constants.API_VERSION),
166 )183 payload
167 q.put(payload)184 )
168 q.close()185
186 def retry_for_swift(self, payload):
187 """Maybe retry a test payload while waiting for swift.
188
189 This will look at the first time the message was checked, and give a
190 certain grace period. After that period is over, we will put the
191 payload into the dead letter queue.
192
193 """
194 swift_check_key = 'swift_first_check_time'
195 first_swift_check_time = payload.get(swift_check_key)
196 if first_swift_check_time is None:
197 first_swift_check_time = datetime.utcnow()
198 payload[swift_check_key] = str(first_swift_check_time.timestamp())
199 else:
200 first_swift_check_time = datetime.fromtimestamp(
201 float(first_swift_check_time)
202 )
203 now = datetime.utcnow()
204 time_since_first_check = now - first_swift_check_time
205 logger = get_logger(__name__, payload)
206 if time_since_first_check.total_seconds() < self.swift_retry_seconds:
207 logger.info(
208 "Requeueing message since swift is not yet consistent"
209 )
210 self._insert_payload_into_queue(
211 "core.result.{}".format(constants.API_VERSION),
212 payload
213 )
214 else:
215 logger.error(
216 "swift is still not consistent after %d seconds. Inserting "
217 "message into dead letter queue.",
218 self.swift_retry_seconds
219 )
220 self._insert_payload_into_queue(
221 "core.deadletters.{}".format(constants.API_VERSION),
222 payload
223 )
224
225 def _insert_payload_into_queue(self, queue, payload):
226 q = self.connection.SimpleQueue(queue)
227 q.put(payload)
228 q.close()
169229
170230
171def main():231def main():
@@ -184,9 +244,9 @@
184 )244 )
185 try:245 try:
186 with kombu.Connection(amqp_uris) as connection:246 with kombu.Connection(amqp_uris) as connection:
187 swift_publisher = SwiftPublisher(config['nova'])247 swift_manager = SwiftManager(config['nova'])
188 retry_publisher = RetryPublisher(connection, 3)248 retry_publisher = RetryPublisher(connection, 3)
189 worker = Worker(swift_publisher, retry_publisher)249 worker = Worker(swift_manager, retry_publisher)
190 queue_monitor = SimpleRabbitQueueWorker(250 queue_monitor = SimpleRabbitQueueWorker(
191 connection,251 connection,
192 "core.result.{}".format(constants.API_VERSION),252 "core.result.{}".format(constants.API_VERSION),

Subscribers

People subscribed via source and target branches

to all changes: