Merge lp:~wgrant/launchpad/webhook-retries into lp:launchpad

Proposed by William Grant
Status: Merged
Merged at revision: 17650
Proposed branch: lp:~wgrant/launchpad/webhook-retries
Merge into: lp:launchpad
Prerequisite: lp:~wgrant/launchpad/job-scheduled_start-retries
Diff against target: 383 lines (+181/-25)
5 files modified
lib/lp/services/job/model/job.py (+2/-2)
lib/lp/services/webhooks/interfaces.py (+17/-0)
lib/lp/services/webhooks/model.py (+58/-9)
lib/lp/services/webhooks/tests/test_webhookjob.py (+101/-11)
lib/lp/services/webhooks/tests/test_webservice.py (+3/-3)
To merge this branch: bzr merge lp:~wgrant/launchpad/webhook-retries
Reviewer Review Type Date Requested Status
Colin Watson (community) Approve
Review via email: mp+266193@code.launchpad.net

Commit message

Automatically retry webhook deliveries for 24 hours after they are first attempted.

Description of the change

Automatically retry webhook deliveries for 24 hours after they are first attempted.

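Rather than using the normal count-based timeout mechanism, we retry on a time basis: every five minutes during the first hour after the initial attempt, then hourly, giving up on the first failed attempt made once 24 hours have elapsed.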

To post a comment you must log in.
Revision history for this message
Colin Watson (cjwatson) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'lib/lp/services/job/model/job.py'
2--- lib/lp/services/job/model/job.py 2015-07-29 08:38:11 +0000
3+++ lib/lp/services/job/model/job.py 2015-07-29 08:38:11 +0000
4@@ -102,8 +102,8 @@
5 JobStatus.FAILED,
6 JobStatus.SUSPENDED,
7 JobStatus.WAITING),
8- JobStatus.FAILED: (),
9- JobStatus.COMPLETED: (),
10+ JobStatus.FAILED: (JobStatus.WAITING,),
11+ JobStatus.COMPLETED: (JobStatus.WAITING,),
12 JobStatus.SUSPENDED:
13 (JobStatus.WAITING,),
14 }
15
16=== modified file 'lib/lp/services/webhooks/interfaces.py'
17--- lib/lp/services/webhooks/interfaces.py 2015-07-20 07:01:40 +0000
18+++ lib/lp/services/webhooks/interfaces.py 2015-07-29 08:38:11 +0000
19@@ -14,6 +14,8 @@
20 'IWebhookJobSource',
21 'IWebhookSource',
22 'IWebhookTarget',
23+ 'WebhookDeliveryFailure',
24+ 'WebhookDeliveryRetry',
25 'WebhookFeatureDisabled',
26 ]
27
28@@ -70,6 +72,16 @@
29 self, "This webhook feature is not available yet.")
30
31
32+class WebhookDeliveryFailure(Exception):
33+ """A webhook delivery failed and should not be retried."""
34+ pass
35+
36+
37+class WebhookDeliveryRetry(Exception):
38+ """A webhook delivery failed and should be retried."""
39+ pass
40+
41+
42 class IWebhook(Interface):
43
44 export_as_webservice_entry(as_of='beta')
45@@ -200,6 +212,11 @@
46 date_created = exported(Datetime(
47 title=_("Date created"), required=True, readonly=True))
48
49+ date_first_sent = exported(Datetime(
50+ title=_("Date first sent"),
51+ description=_("Timestamp of the first delivery attempt."),
52+ required=False, readonly=True))
53+
54 date_sent = exported(Datetime(
55 title=_("Date sent"),
56 description=_("Timestamp of the last delivery attempt."),
57
58=== modified file 'lib/lp/services/webhooks/model.py'
59--- lib/lp/services/webhooks/model.py 2015-07-20 07:01:40 +0000
60+++ lib/lp/services/webhooks/model.py 2015-07-29 08:38:11 +0000
61@@ -10,7 +10,10 @@
62 'WebhookTargetMixin',
63 ]
64
65-import datetime
66+from datetime import (
67+ datetime,
68+ timedelta,
69+ )
70
71 import iso8601
72 from lazr.delegates import delegate_to
73@@ -18,7 +21,7 @@
74 DBEnumeratedType,
75 DBItem,
76 )
77-import pytz
78+from pytz import utc
79 from storm.properties import (
80 Bool,
81 DateTime,
82@@ -60,6 +63,8 @@
83 IWebhookJob,
84 IWebhookJobSource,
85 IWebhookSource,
86+ WebhookDeliveryFailure,
87+ WebhookDeliveryRetry,
88 WebhookFeatureDisabled,
89 )
90
91@@ -87,8 +92,8 @@
92
93 registrant_id = Int(name='registrant', allow_none=False)
94 registrant = Reference(registrant_id, 'Person.id')
95- date_created = DateTime(tzinfo=pytz.UTC, allow_none=False)
96- date_last_modified = DateTime(tzinfo=pytz.UTC, allow_none=False)
97+ date_created = DateTime(tzinfo=utc, allow_none=False)
98+ date_last_modified = DateTime(tzinfo=utc, allow_none=False)
99
100 delivery_url = Unicode(allow_none=False)
101 active = Bool(default=True, allow_none=False)
102@@ -247,7 +252,7 @@
103 def deleteByIDs(webhookjob_ids):
104 """See `IWebhookJobSource`."""
105 # Assumes that Webhook's PK is its FK to Job.id.
106- webookjob_ids = list(webhookjob_ids)
107+ webhookjob_ids = list(webhookjob_ids)
108 IStore(WebhookJob).find(
109 WebhookJob, WebhookJob.job_id.is_in(webhookjob_ids)).remove()
110 IStore(Job).find(Job, Job.id.is_in(webhookjob_ids)).remove()
111@@ -288,6 +293,12 @@
112
113 class_job_type = WebhookJobType.DELIVERY
114
115+ retry_error_types = (WebhookDeliveryRetry,)
116+
117+ # Effectively infinite, as we give up by checking
118+ # retry_automatically and raising a fatal exception instead.
119+ max_retries = 1000
120+
121 config = config.IWebhookDeliveryJobSource
122
123 @classmethod
124@@ -306,10 +317,25 @@
125 def successful(self):
126 if 'result' not in self.json_data:
127 return None
128- if 'connection_error' in self.json_data['result']:
129- return False
130+ return self.failure_detail is None
131+
132+ @property
133+ def failure_detail(self):
134+ if 'result' not in self.json_data:
135+ return None
136+ connection_error = self.json_data['result'].get('connection_error')
137+ if connection_error is not None:
138+ return 'Connection error: %s' % connection_error
139 status_code = self.json_data['result']['response']['status_code']
140- return 200 <= status_code <= 299
141+ if 200 <= status_code <= 299:
142+ return None
143+ return 'Bad HTTP response: %d' % status_code
144+
145+ @property
146+ def date_first_sent(self):
147+ if 'date_first_sent' not in self.json_data:
148+ return None
149+ return iso8601.parse_date(self.json_data['date_first_sent'])
150
151 @property
152 def date_sent(self):
153@@ -321,6 +347,21 @@
154 def payload(self):
155 return self.json_data['payload']
156
157+ @property
158+ def _time_since_first_attempt(self):
159+ return datetime.now(utc) - (self.date_first_sent or self.date_created)
160+
161+ @property
162+ def retry_automatically(self):
163+ return self._time_since_first_attempt < timedelta(days=1)
164+
165+ @property
166+ def retry_delay(self):
167+ if self._time_since_first_attempt < timedelta(hours=1):
168+ return timedelta(minutes=5)
169+ else:
170+ return timedelta(hours=1)
171+
172 def run(self):
173 result = getUtility(IWebhookClient).deliver(
174 self.webhook.delivery_url, config.webhooks.http_proxy,
175@@ -334,5 +375,13 @@
176 del result[direction][attr]
177 updated_data = self.json_data
178 updated_data['result'] = result
179- updated_data['date_sent'] = datetime.datetime.now(pytz.UTC).isoformat()
180+ updated_data['date_sent'] = datetime.now(utc).isoformat()
181+ if 'date_first_sent' not in updated_data:
182+ updated_data['date_first_sent'] = updated_data['date_sent']
183 self.json_data = updated_data
184+
185+ if not self.successful:
186+ if self.retry_automatically:
187+ raise WebhookDeliveryRetry()
188+ else:
189+ raise WebhookDeliveryFailure(self.failure_detail)
190
191=== modified file 'lib/lp/services/webhooks/tests/test_webhookjob.py'
192--- lib/lp/services/webhooks/tests/test_webhookjob.py 2015-07-17 01:17:05 +0000
193+++ lib/lp/services/webhooks/tests/test_webhookjob.py 2015-07-29 08:38:11 +0000
194@@ -5,6 +5,8 @@
195
196 __metaclass__ = type
197
198+from datetime import timedelta
199+
200 from httmock import (
201 HTTMock,
202 urlmatch,
203@@ -16,6 +18,7 @@
204 Contains,
205 ContainsDict,
206 Equals,
207+ GreaterThan,
208 Is,
209 KeysEqual,
210 MatchesAll,
211@@ -235,15 +238,15 @@
212 self.assertEqual([], oopses.oopses)
213
214 def test_run_404(self):
215- # The job succeeds even if the response is an error. A job only
216- # fails if it was definitely a problem on our end.
217+ # A request that returns a non-2xx response is a failure and
218+ # gets retried.
219 with CaptureOops() as oopses:
220 job, reqs = self.makeAndRunJob(response_status=404)
221 self.assertThat(
222 job,
223 MatchesStructure(
224- status=Equals(JobStatus.COMPLETED),
225- pending=Equals(False),
226+ status=Equals(JobStatus.WAITING),
227+ pending=Equals(True),
228 successful=Equals(False),
229 date_sent=Not(Is(None)),
230 json_data=ContainsDict(
231@@ -256,16 +259,16 @@
232 self.assertEqual([], oopses.oopses)
233
234 def test_run_connection_error(self):
235- # Jobs that fail to connecthave a connection_error rather than a
236- # response.
237+ # Jobs that fail to connect have a connection_error rather than a
238+ # response. They too trigger a retry.
239 with CaptureOops() as oopses:
240 job, reqs = self.makeAndRunJob(
241 raises=requests.ConnectionError('Connection refused'))
242 self.assertThat(
243 job,
244 MatchesStructure(
245- status=Equals(JobStatus.COMPLETED),
246- pending=Equals(False),
247+ status=Equals(JobStatus.WAITING),
248+ pending=Equals(True),
249 successful=Equals(False),
250 date_sent=Not(Is(None)),
251 json_data=ContainsDict(
252@@ -298,6 +301,90 @@
253 self.assertEqual(
254 'No webhook proxy configured.', oopses.oopses[0]['value'])
255
256+ def test_date_first_sent(self):
257+ job, reqs = self.makeAndRunJob(response_status=404)
258+ self.assertEqual(job.date_first_sent, job.date_sent)
259+ orig_first_sent = job.date_first_sent
260+ self.assertEqual(JobStatus.WAITING, job.status)
261+ self.assertEqual(1, job.attempt_count)
262+ job.lease_expires = None
263+ job.scheduled_start = None
264+ with dbuser("webhookrunner"):
265+ JobRunner([job]).runAll()
266+ self.assertEqual(JobStatus.WAITING, job.status)
267+ self.assertEqual(2, job.attempt_count)
268+ self.assertNotEqual(job.date_first_sent, job.date_sent)
269+ self.assertEqual(orig_first_sent, job.date_first_sent)
270+
271+ def test_retry_delay(self):
272+ # Deliveries are retried every 5 minutes for the first hour, and
273+ # every hour thereafter.
274+ job, reqs = self.makeAndRunJob(response_status=404)
275+ self.assertEqual(timedelta(minutes=5), job.retry_delay)
276+ job.json_data['date_first_sent'] = (
277+ job.date_first_sent - timedelta(minutes=30)).isoformat()
278+ self.assertEqual(timedelta(minutes=5), job.retry_delay)
279+ job.json_data['date_first_sent'] = (
280+ job.date_first_sent - timedelta(minutes=30)).isoformat()
281+ self.assertEqual(timedelta(hours=1), job.retry_delay)
282+
283+ def test_retry_automatically(self):
284+ # Deliveries are automatically retried until 24 hours after the
285+ # initial attempt.
286+ job, reqs = self.makeAndRunJob(response_status=404)
287+ self.assertTrue(job.retry_automatically)
288+ job.json_data['date_first_sent'] = (
289+ job.date_first_sent - timedelta(hours=24)).isoformat()
290+ self.assertFalse(job.retry_automatically)
291+
292+ def test_automatic_retries(self):
293+ hook = self.factory.makeWebhook()
294+ job = WebhookDeliveryJob.create(hook, payload={'foo': 'bar'})
295+
296+ client = MockWebhookClient(response_status=404)
297+ self.useFixture(ZopeUtilityFixture(client, IWebhookClient))
298+
299+ def run_job(job):
300+ with dbuser("webhookrunner"):
301+ runner = JobRunner([job])
302+ runner.runAll()
303+ if len(runner.completed_jobs) == 1 and not runner.incomplete_jobs:
304+ return True
305+ if len(runner.incomplete_jobs) == 1 and not runner.completed_jobs:
306+ job.lease_expires = None
307+ return False
308+ if not runner.incomplete_jobs and not runner.completed_jobs:
309+ return None
310+ raise Exception("Unexpected jobs.")
311+
312+ # The first attempt fails but schedules a retry five minutes later.
313+ self.assertEqual(False, run_job(job))
314+ self.assertEqual(JobStatus.WAITING, job.status)
315+ self.assertEqual(False, job.successful)
316+ self.assertTrue(job.pending)
317+ self.assertIsNot(None, job.date_sent)
318+ last_date_sent = job.date_sent
319+
320+ # Pretend we're five minutes in the future and try again. The
321+ # job will be retried again.
322+ job.json_data['date_first_sent'] = (
323+ job.date_first_sent - timedelta(minutes=5)).isoformat()
324+ job.scheduled_start -= timedelta(minutes=5)
325+ self.assertEqual(False, run_job(job))
326+ self.assertEqual(JobStatus.WAITING, job.status)
327+ self.assertEqual(False, job.successful)
328+ self.assertTrue(job.pending)
329+ self.assertThat(job.date_sent, GreaterThan(last_date_sent))
330+
331+ # If the job was first tried a day ago, the next attempt gives up.
332+ job.json_data['date_first_sent'] = (
333+ job.date_first_sent - timedelta(hours=24)).isoformat()
334+ job.scheduled_start -= timedelta(hours=24)
335+ self.assertEqual(False, run_job(job))
336+ self.assertEqual(JobStatus.FAILED, job.status)
337+ self.assertEqual(False, job.successful)
338+ self.assertFalse(job.pending)
339+
340
341 class TestViaCronscript(TestCaseWithFactory):
342
343@@ -313,9 +400,12 @@
344 'cronscripts/process-job-source.py', ['IWebhookDeliveryJobSource'],
345 expect_returncode=0)
346 self.assertEqual('', stdout)
347- self.assertIn('INFO Ran 1 WebhookDeliveryJob jobs.\n', stderr)
348+ self.assertIn(
349+ 'WARNING Scheduling retry due to WebhookDeliveryRetry', stderr)
350+ self.assertIn(
351+ 'INFO 1 WebhookDeliveryJob jobs did not complete.\n', stderr)
352
353- self.assertEqual(JobStatus.COMPLETED, job.status)
354+ self.assertEqual(JobStatus.WAITING, job.status)
355 self.assertIn(
356 'Cannot connect to proxy',
357 job.json_data['result']['connection_error'])
358@@ -335,7 +425,7 @@
359 job = WebhookDeliveryJob.create(hook, payload={'foo': 'bar'})
360 transaction.commit()
361
362- self.assertEqual(JobStatus.COMPLETED, job.status)
363+ self.assertEqual(JobStatus.WAITING, job.status)
364 self.assertIn(
365 'Cannot connect to proxy',
366 job.json_data['result']['connection_error'])
367
368=== modified file 'lib/lp/services/webhooks/tests/test_webservice.py'
369--- lib/lp/services/webhooks/tests/test_webservice.py 2015-07-20 07:01:40 +0000
370+++ lib/lp/services/webhooks/tests/test_webservice.py 2015-07-29 08:38:11 +0000
371@@ -160,9 +160,9 @@
372 representation,
373 MatchesAll(
374 KeysEqual(
375- 'date_created', 'date_sent', 'http_etag', 'payload',
376- 'pending', 'resource_type_link', 'self_link',
377- 'successful', 'web_link', 'webhook_link'),
378+ 'date_created', 'date_first_sent', 'date_sent',
379+ 'http_etag', 'payload', 'pending', 'resource_type_link',
380+ 'self_link', 'successful', 'web_link', 'webhook_link'),
381 ContainsDict(
382 {'payload': Equals({'ping': True}),
383 'pending': Equals(True),