Merge lp:~pwlars/snappy-proposed-selftest-agent/selftest-agent-retry into lp:snappy-proposed-selftest-agent

Proposed by Paul Larson
Status: Merged
Approved by: Paul Larson
Approved revision: 16
Merged at revision: 15
Proposed branch: lp:~pwlars/snappy-proposed-selftest-agent/selftest-agent-retry
Merge into: lp:snappy-proposed-selftest-agent
Diff against target: 85 lines (+44/-2)
2 files modified
snappy_proposed_selftest_agent/tests/test_worker.py (+21/-0)
snappy_proposed_selftest_agent/worker.py (+23/-2)
To merge this branch: bzr merge lp:~pwlars/snappy-proposed-selftest-agent/selftest-agent-retry
Reviewer Review Type Date Requested Status
Joe Talbott (community) Approve
Paul Larson Needs Resubmitting
Review via email: mp+261128@code.launchpad.net

Commit message

Retry incoming messages if they have a bad payload a few times, but deadletter them if they still fail after a few attempts

Description of the change

Retry incoming messages if they have a bad payload a few times, but deadletter them if they still fail after a few attempts

To post a comment you must log in.
Revision history for this message
Joe Talbott (joetalbott) wrote :

Take a look at my inline suggestion and let me know what you think.

Thanks for doing this work.

review: Needs Information
16. By Paul Larson

restructure the try block to make things more clear

Revision history for this message
Paul Larson (pwlars) wrote :

Good idea, I think it's easier to follow like that. Fixed.

review: Needs Resubmitting
Revision history for this message
Joe Talbott (joetalbott) wrote :

Thanks, looks great.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'snappy_proposed_selftest_agent/tests/test_worker.py'
2--- snappy_proposed_selftest_agent/tests/test_worker.py 2015-05-29 13:41:30 +0000
3+++ snappy_proposed_selftest_agent/tests/test_worker.py 2015-06-04 18:36:09 +0000
4@@ -70,6 +70,27 @@
5 ['foo', 'bar'],
6 build_worker.messages_seen[0].get('image_binaries'))
7
8+ def test_bad_candidates_message(self):
9+ """ Insert a bad message and make sure it fails properly """
10+ conn = kombu.Connection('memory:///')
11+ queue_message(conn, {'test': 'value'})
12+
13+ build_worker = LoggingConsumer()
14+ result_worker = LoggingConsumer()
15+ config = read_config()
16+ q = CoreSelftestAgentWorker(conn, config,
17+ build_worker, result_worker)
18+ # The message will be retried 3 times, and the 4th time we call
19+ # consume it will go to the dead letter queue
20+ max_retries = 3
21+ for retry in range(max_retries+1):
22+ next(q.consume(limit=1, timeout=5.0))
23+ self.assertEqual(build_worker.messages_seen, [])
24+ self.assertEqual(result_worker.messages_seen, [])
25+ with conn.SimpleQueue(constants.DEAD_LETTER_QUEUE) as q:
26+ message = q.get(block=False, timeout=5.0)
27+ self.assertEqual(message.payload.get('test'), 'value')
28+
29
30 class LoggingConsumer(object):
31 """A consumer callback object that acks and logs all received payloads."""
32
33=== modified file 'snappy_proposed_selftest_agent/worker.py'
34--- snappy_proposed_selftest_agent/worker.py 2015-05-29 13:41:30 +0000
35+++ snappy_proposed_selftest_agent/worker.py 2015-06-04 18:36:09 +0000
36@@ -37,6 +37,7 @@
37 exchange = kombu.Exchange(
38 constants.INPUT_EXCHANGE, type='fanout')
39 self.queue = kombu.Queue(constants.INPUT_QUEUE, exchange)
40+ self.max_retries = 3
41
42 def get_consumers(self, Consumer, channel):
43 """Return consumers instances for all configured queues."""
44@@ -71,10 +72,29 @@
45
46 def process(self, body, message):
47 """ Process incoming messages about candidate packages """
48+ try:
49+ series = body['series']
50+ source_binaries = body['source_binaries']
51+ except KeyError:
52+ retry_count = int(body.get('candidates_retry_count', '0'))
53+ if retry_count < self.max_retries:
54+ body['candidates_retry_count'] = retry_count + 1
55+ with self.connection.SimpleQueue(self.queue) as q:
56+ q.put(body)
57+ else:
58+ logger.error(
59+ "Rejecting message due to retry count exceeding maximum. "
60+ "Message will reside on the dead letter queue",
61+ )
62+ with self.connection.SimpleQueue(
63+ constants.DEAD_LETTER_QUEUE) as q:
64+ q.put(body)
65+ message.ack()
66+ return
67 # Dismiss if it's not the series (derived from ubuntu 'release')
68 # this agent cares about.
69 context_series = constants.RELEASE_MAP[constants.RELEASE]
70- if context_series != body['series']:
71+ if context_series != series:
72 message.ack()
73 return
74
75@@ -82,8 +102,9 @@
76 # in the image manifest, it does not need testing.
77 image_binaries = list(
78 self.get_manifest_binaries(
79- body['source_binaries'], context_series, constants.ARCH)
80+ source_binaries, context_series, constants.ARCH)
81 )
82+
83 if not image_binaries:
84 message.ack()
85 return

Subscribers

People subscribed via source and target branches