Merge lp:~wgrant/launchpad/webhook-delivery-tweaks into lp:launchpad
- webhook-delivery-tweaks
- Merge into devel
Proposed by
William Grant
Status: Merged
Merged at revision: 17671
Proposed branch: lp:~wgrant/launchpad/webhook-delivery-tweaks
Merge into: lp:launchpad
Prerequisite: lp:~wgrant/launchpad/job-configurable-lease-duration
Diff against target: 439 lines (+167/-43), 6 files modified:
- lib/lp/services/job/tests/test_celery_configuration.py (+2/-0)
- lib/lp/services/webhooks/client.py (+23/-4)
- lib/lp/services/webhooks/interfaces.py (+8/-1)
- lib/lp/services/webhooks/model.py (+15/-1)
- lib/lp/services/webhooks/tests/test_webhookjob.py (+117/-34)
- lib/lp/testing/factory.py (+2/-3)
To merge this branch: bzr merge lp:~wgrant/launchpad/webhook-delivery-tweaks
Related bugs: |
|
Reviewer: Colin Watson (community) — Approve
Review via email: mp+266703@code.launchpad.net |
Commit message
Webhook delivery tweaks: 30-second timeout, User-Agent header, payload signatures, and faster retries.
Description of the change
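Webhook delivery tweaks:
- A 30-second request timeout. Because requests only enforces connect and inter-read timeouts, the job also gets a 45-second Celery soft time limit and a 60-second lease as a backup.
- A User-Agent header of the form "<mainsite hostname>-Webhooks/r<revno>".
- Payload signatures: when the webhook has a secret, a PubSubHubbub-compatible X-Hub-Signature header is sent. Its value is "sha1=" followed by the HMAC-SHA1 hex digest of the JSON body.
- Faster retries: every minute for the first 10 minutes, then every 5 minutes up to an hour, then hourly.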
To post a comment you must log in.
Revision history for this message
Colin Watson (cjwatson) — review: Approve
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'lib/lp/services/job/tests/test_celery_configuration.py' |
2 | --- lib/lp/services/job/tests/test_celery_configuration.py 2015-08-03 10:16:49 +0000 |
3 | +++ lib/lp/services/job/tests/test_celery_configuration.py 2015-08-04 00:12:30 +0000 |
4 | @@ -4,6 +4,8 @@ |
5 | from contextlib import contextmanager |
6 | from testtools.matchers import MatchesRegex |
7 | |
8 | +from testtools.matchers import MatchesRegex |
9 | + |
10 | from lp.services.config import config |
11 | from lp.testing import TestCase |
12 | from lp.testing.layers import RabbitMQLayer |
13 | |
14 | === modified file 'lib/lp/services/webhooks/client.py' |
15 | --- lib/lp/services/webhooks/client.py 2015-07-12 23:36:55 +0000 |
16 | +++ lib/lp/services/webhooks/client.py 2015-08-04 00:12:30 +0000 |
17 | @@ -8,16 +8,32 @@ |
18 | 'WebhookClient', |
19 | ] |
20 | |
21 | +import hashlib |
22 | +import hmac |
23 | +import json |
24 | + |
25 | import requests |
26 | from zope.interface import implementer |
27 | |
28 | from lp.services.webhooks.interfaces import IWebhookClient |
29 | |
30 | |
31 | +def create_request(user_agent, secret, payload): |
32 | + body = json.dumps(payload) |
33 | + headers = { |
34 | + 'User-Agent': user_agent, |
35 | + 'Content-Type': 'application/json', |
36 | + } |
37 | + if secret is not None: |
38 | + hexdigest = hmac.new(secret, body, digestmod=hashlib.sha1).hexdigest() |
39 | + headers['X-Hub-Signature'] = 'sha1=%s' % hexdigest |
40 | + return (body, headers) |
41 | + |
42 | + |
43 | @implementer(IWebhookClient) |
44 | class WebhookClient: |
45 | |
46 | - def deliver(self, url, proxy, payload): |
47 | + def deliver(self, url, proxy, user_agent, timeout, secret, payload): |
48 | """See `IWebhookClient`.""" |
49 | # We never want to execute a job if there's no proxy configured, as |
50 | # we'd then be sending near-arbitrary requests from a trusted |
51 | @@ -32,8 +48,11 @@ |
52 | session = requests.Session() |
53 | session.trust_env = False |
54 | session.headers = {} |
55 | - preq = session.prepare_request( |
56 | - requests.Request('POST', url, json=payload)) |
57 | + |
58 | + body, headers = create_request(user_agent, secret, payload) |
59 | + preq = session.prepare_request(requests.Request( |
60 | + 'POST', url, data=body, headers=headers)) |
61 | + |
62 | result = { |
63 | 'request': { |
64 | 'url': url, |
65 | @@ -43,7 +62,7 @@ |
66 | }, |
67 | } |
68 | try: |
69 | - resp = session.send(preq, proxies=proxies) |
70 | + resp = session.send(preq, proxies=proxies, timeout=timeout) |
71 | result['response'] = { |
72 | 'status_code': resp.status_code, |
73 | 'headers': dict(resp.headers), |
74 | |
75 | === modified file 'lib/lp/services/webhooks/interfaces.py' |
76 | --- lib/lp/services/webhooks/interfaces.py 2015-07-29 08:37:51 +0000 |
77 | +++ lib/lp/services/webhooks/interfaces.py 2015-08-04 00:12:30 +0000 |
78 | @@ -249,7 +249,7 @@ |
79 | |
80 | class IWebhookClient(Interface): |
81 | |
82 | - def deliver(self, url, proxy, payload): |
83 | + def deliver(self, url, proxy, user_agent, timeout, secret, payload): |
84 | """Deliver a payload to a webhook endpoint. |
85 | |
86 | Returns a dict of request and response details. The 'request' key |
87 | @@ -260,6 +260,13 @@ |
88 | cannot be the fault of the remote endpoint. For example, a 404 will |
89 | return a response, and a DNS error returns a connection_error, but |
90 | the proxy being offline will raise an exception. |
91 | + |
92 | + The timeout is just given to the underlying requests library, so |
93 | + it only provides connect and inter-read timeouts. A reliable |
94 | + overall request timeout will require another mechanism. |
95 | + |
96 | + If secret is not None, a PubSubHubbub-compatible X-Hub-Signature |
97 | + header will be sent using HMAC-SHA1. |
98 | """ |
99 | |
100 | patch_collection_property(IWebhook, 'deliveries', IWebhookDeliveryJob) |
101 | |
102 | === modified file 'lib/lp/services/webhooks/model.py' |
103 | --- lib/lp/services/webhooks/model.py 2015-07-29 08:31:06 +0000 |
104 | +++ lib/lp/services/webhooks/model.py 2015-08-04 00:12:30 +0000 |
105 | @@ -38,6 +38,7 @@ |
106 | ) |
107 | from zope.security.proxy import removeSecurityProxy |
108 | |
109 | +import lp.app.versioninfo |
110 | from lp.registry.model.person import Person |
111 | from lp.services.config import config |
112 | from lp.services.database.bulk import load_related |
113 | @@ -296,6 +297,13 @@ |
114 | retry_error_types = (WebhookDeliveryRetry,) |
115 | user_error_types = (WebhookDeliveryFailure,) |
116 | |
117 | + # The request timeout is 30 seconds, requests timeouts aren't |
118 | + # totally reliable so we also have a relatively low celery timeout |
119 | + # as a backup. The celery timeout and lease expiry have a bit of |
120 | + # slack to cope with slow job start/finish without conflicts. |
121 | + soft_time_limit = timedelta(seconds=45) |
122 | + lease_duration = timedelta(seconds=60) |
123 | + |
124 | # Effectively infinite, as we give up by checking |
125 | # retry_automatically and raising a fatal exception instead. |
126 | max_retries = 1000 |
127 | @@ -366,14 +374,20 @@ |
128 | |
129 | @property |
130 | def retry_delay(self): |
131 | - if self._time_since_first_attempt < timedelta(hours=1): |
132 | + if self._time_since_first_attempt < timedelta(minutes=10): |
133 | + return timedelta(minutes=1) |
134 | + elif self._time_since_first_attempt < timedelta(hours=1): |
135 | return timedelta(minutes=5) |
136 | else: |
137 | return timedelta(hours=1) |
138 | |
139 | def run(self): |
140 | + user_agent = '%s-Webhooks/r%s' % ( |
141 | + config.vhost.mainsite.hostname, lp.app.versioninfo.revno) |
142 | + secret = self.webhook.secret |
143 | result = getUtility(IWebhookClient).deliver( |
144 | self.webhook.delivery_url, config.webhooks.http_proxy, |
145 | + user_agent, 30, secret.encode('utf-8') if secret else None, |
146 | self.payload) |
147 | # Request and response headers and body may be large, so don't |
148 | # store them in the frequently-used JSON. We could store them in |
149 | |
150 | === modified file 'lib/lp/services/webhooks/tests/test_webhookjob.py' |
151 | --- lib/lp/services/webhooks/tests/test_webhookjob.py 2015-07-29 08:31:06 +0000 |
152 | +++ lib/lp/services/webhooks/tests/test_webhookjob.py 2015-08-04 00:12:30 +0000 |
153 | @@ -5,12 +5,16 @@ |
154 | |
155 | __metaclass__ = type |
156 | |
157 | -from datetime import timedelta |
158 | +from datetime import ( |
159 | + datetime, |
160 | + timedelta, |
161 | + ) |
162 | |
163 | from httmock import ( |
164 | HTTMock, |
165 | urlmatch, |
166 | ) |
167 | +from pytz import utc |
168 | import requests |
169 | from storm.store import Store |
170 | from testtools import TestCase |
171 | @@ -23,18 +27,24 @@ |
172 | KeysEqual, |
173 | LessThan, |
174 | MatchesAll, |
175 | + MatchesDict, |
176 | MatchesStructure, |
177 | Not, |
178 | ) |
179 | import transaction |
180 | from zope.component import getUtility |
181 | +from zope.security.proxy import removeSecurityProxy |
182 | |
183 | +from lp.app.versioninfo import revno |
184 | from lp.services.features.testing import FeatureFixture |
185 | from lp.services.job.interfaces.job import JobStatus |
186 | from lp.services.job.runner import JobRunner |
187 | from lp.services.job.tests import block_on_job |
188 | from lp.services.scripts.tests import run_script |
189 | -from lp.services.webhooks.client import WebhookClient |
190 | +from lp.services.webhooks.client import ( |
191 | + create_request, |
192 | + WebhookClient, |
193 | + ) |
194 | from lp.services.webhooks.interfaces import ( |
195 | IWebhookClient, |
196 | IWebhookDeliveryJob, |
197 | @@ -129,7 +139,7 @@ |
198 | def sendToWebhook(self, response_status=200, raises=None): |
199 | reqs = [] |
200 | |
201 | - @urlmatch(netloc='hookep.com') |
202 | + @urlmatch(netloc='example.com') |
203 | def endpoint_mock(url, request): |
204 | if raises: |
205 | raise raises |
206 | @@ -138,58 +148,89 @@ |
207 | |
208 | with HTTMock(endpoint_mock): |
209 | result = WebhookClient().deliver( |
210 | - 'http://hookep.com/foo', |
211 | - {'http': 'http://squid.example.com:3128'}, |
212 | - {'foo': 'bar'}) |
213 | + 'http://example.com/ep', 'http://squid.example.com:3128', |
214 | + 'TestWebhookClient', 30, 'sekrit', {'foo': 'bar'}) |
215 | |
216 | return reqs, result |
217 | |
218 | + @property |
219 | + def request_matcher(self): |
220 | + return MatchesDict({ |
221 | + 'url': Equals('http://example.com/ep'), |
222 | + 'method': Equals('POST'), |
223 | + 'headers': Equals( |
224 | + {'Content-Type': 'application/json', |
225 | + 'Content-Length': '14', |
226 | + 'User-Agent': 'TestWebhookClient', |
227 | + 'X-Hub-Signature': |
228 | + 'sha1=de75f136c37d89f5eb24834468c1ecd602fa95dd', |
229 | + }), |
230 | + 'body': Equals('{"foo": "bar"}'), |
231 | + }) |
232 | + |
233 | def test_sends_request(self): |
234 | [request], result = self.sendToWebhook() |
235 | - self.assertEqual( |
236 | - {'Content-Type': 'application/json', 'Content-Length': '14'}, |
237 | - result['request']['headers']) |
238 | - self.assertEqual('{"foo": "bar"}', result['request']['body']) |
239 | - self.assertEqual(200, result['response']['status_code']) |
240 | - self.assertEqual({}, result['response']['headers']) |
241 | - self.assertEqual('Content', result['response']['body']) |
242 | + self.assertThat( |
243 | + result, |
244 | + MatchesDict({ |
245 | + 'request': self.request_matcher, |
246 | + 'response': MatchesDict({ |
247 | + 'status_code': Equals(200), |
248 | + 'headers': Equals({}), |
249 | + 'body': Equals('Content'), |
250 | + }), |
251 | + })) |
252 | |
253 | def test_accepts_404(self): |
254 | [request], result = self.sendToWebhook(response_status=404) |
255 | - self.assertEqual( |
256 | - {'Content-Type': 'application/json', 'Content-Length': '14'}, |
257 | - result['request']['headers']) |
258 | - self.assertEqual('{"foo": "bar"}', result['request']['body']) |
259 | - self.assertEqual(404, result['response']['status_code']) |
260 | - self.assertEqual({}, result['response']['headers']) |
261 | - self.assertEqual('Content', result['response']['body']) |
262 | + self.assertThat( |
263 | + result, |
264 | + MatchesDict({ |
265 | + 'request': self.request_matcher, |
266 | + 'response': MatchesDict({ |
267 | + 'status_code': Equals(404), |
268 | + 'headers': Equals({}), |
269 | + 'body': Equals('Content'), |
270 | + }), |
271 | + })) |
272 | |
273 | def test_connection_error(self): |
274 | # Attempts that fail to connect have a connection_error rather |
275 | # than a response. |
276 | reqs, result = self.sendToWebhook( |
277 | raises=requests.ConnectionError('Connection refused')) |
278 | - self.assertNotIn('response', result) |
279 | - self.assertEqual( |
280 | - 'Connection refused', result['connection_error']) |
281 | + self.assertThat( |
282 | + result, |
283 | + MatchesDict({ |
284 | + 'request': self.request_matcher, |
285 | + 'connection_error': Equals('Connection refused'), |
286 | + })) |
287 | self.assertEqual([], reqs) |
288 | |
289 | |
290 | -class MockWebhookClient: |
291 | +class MockWebhookClient(WebhookClient): |
292 | |
293 | def __init__(self, response_status=200, raises=None): |
294 | self.response_status = response_status |
295 | self.raises = raises |
296 | self.requests = [] |
297 | |
298 | - def deliver(self, url, proxy, payload): |
299 | - result = {'request': {}} |
300 | + def deliver(self, url, proxy, user_agent, timeout, secret, payload): |
301 | + body, headers = create_request(user_agent, secret, payload) |
302 | + result = { |
303 | + 'request': { |
304 | + 'url': url, |
305 | + 'method': 'POST', |
306 | + 'headers': headers, |
307 | + 'body': body, |
308 | + }, |
309 | + } |
310 | if isinstance(self.raises, requests.ConnectionError): |
311 | result['connection_error'] = str(self.raises) |
312 | elif self.raises is not None: |
313 | raise self.raises |
314 | else: |
315 | - self.requests.append(('POST', url)) |
316 | + self.requests.append(('POST', url, result['request']['headers'])) |
317 | result['response'] = {'status_code': self.response_status} |
318 | return result |
319 | |
320 | @@ -199,8 +240,10 @@ |
321 | |
322 | layer = ZopelessDatabaseLayer |
323 | |
324 | - def makeAndRunJob(self, response_status=200, raises=None, mock=True): |
325 | - hook = self.factory.makeWebhook(delivery_url=u'http://hookep.com/foo') |
326 | + def makeAndRunJob(self, response_status=200, raises=None, mock=True, |
327 | + secret=None): |
328 | + hook = self.factory.makeWebhook( |
329 | + delivery_url=u'http://example.com/ep', secret=secret) |
330 | job = WebhookDeliveryJob.create(hook, payload={'foo': 'bar'}) |
331 | |
332 | client = MockWebhookClient( |
333 | @@ -217,6 +260,21 @@ |
334 | self.assertProvides( |
335 | WebhookDeliveryJob.create(hook, payload={}), IWebhookDeliveryJob) |
336 | |
337 | + def test_short_lease_and_timeout(self): |
338 | + # Webhook jobs have a request timeout of 30 seconds, a celery |
339 | + # timeout of 45 seconds, and a lease of 60 seconds, to give |
340 | + # reasonable time for sluggish things to catch up. |
341 | + hook = self.factory.makeWebhook() |
342 | + job = hook.ping() |
343 | + job.acquireLease() |
344 | + self.assertThat( |
345 | + job.lease_expires - datetime.now(utc), |
346 | + MatchesAll( |
347 | + GreaterThan(timedelta(seconds=50)), |
348 | + LessThan(timedelta(seconds=60)))) |
349 | + self.assertEqual( |
350 | + timedelta(seconds=45), removeSecurityProxy(job).soft_time_limit) |
351 | + |
352 | def test_run_200(self): |
353 | # A request that returns 200 is a success. |
354 | with CaptureOops() as oopses: |
355 | @@ -235,7 +293,26 @@ |
356 | {'response': ContainsDict( |
357 | {'status_code': Equals(200)})}))}))) |
358 | self.assertEqual(1, len(reqs)) |
359 | - self.assertEqual([('POST', 'http://hookep.com/foo')], reqs) |
360 | + self.assertEqual([ |
361 | + ('POST', 'http://example.com/ep', |
362 | + {'Content-Type': 'application/json', |
363 | + 'User-Agent': 'launchpad.dev-Webhooks/r%s' % revno}), |
364 | + ], reqs) |
365 | + self.assertEqual([], oopses.oopses) |
366 | + |
367 | + def test_run_signature(self): |
368 | + # If the webhook has a secret, the request is signed in a |
369 | + # PubSubHubbub-compatible way. |
370 | + with CaptureOops() as oopses: |
371 | + job, reqs = self.makeAndRunJob( |
372 | + response_status=200, secret=u'sekrit') |
373 | + self.assertEqual([ |
374 | + ('POST', 'http://example.com/ep', |
375 | + {'Content-Type': 'application/json', |
376 | + 'User-Agent': 'launchpad.dev-Webhooks/r%s' % revno, |
377 | + 'X-Hub-Signature': |
378 | + 'sha1=de75f136c37d89f5eb24834468c1ecd602fa95dd'}), |
379 | + ], reqs) |
380 | self.assertEqual([], oopses.oopses) |
381 | |
382 | def test_run_404(self): |
383 | @@ -318,9 +395,15 @@ |
384 | self.assertEqual(orig_first_sent, job.date_first_sent) |
385 | |
386 | def test_retry_delay(self): |
387 | - # Deliveries are retried every 5 minutes for the first hour, and |
388 | - # every hour thereafter. |
389 | + # Deliveries are retried every minute for the first 10 minutes, |
390 | + # every 5 minutes up to an hour, and every hour thereafter. |
391 | job, reqs = self.makeAndRunJob(response_status=404) |
392 | + self.assertEqual(timedelta(minutes=1), job.retry_delay) |
393 | + job.json_data['date_first_sent'] = ( |
394 | + job.date_first_sent - timedelta(minutes=5)).isoformat() |
395 | + self.assertEqual(timedelta(minutes=1), job.retry_delay) |
396 | + job.json_data['date_first_sent'] = ( |
397 | + job.date_first_sent - timedelta(minutes=5)).isoformat() |
398 | self.assertEqual(timedelta(minutes=5), job.retry_delay) |
399 | job.json_data['date_first_sent'] = ( |
400 | job.date_first_sent - timedelta(minutes=30)).isoformat() |
401 | @@ -436,7 +519,7 @@ |
402 | layer = ZopelessDatabaseLayer |
403 | |
404 | def test_run_from_cronscript(self): |
405 | - hook = self.factory.makeWebhook(delivery_url=u'http://hookep.com/foo') |
406 | + hook = self.factory.makeWebhook(delivery_url=u'http://example.com/ep') |
407 | job = WebhookDeliveryJob.create(hook, payload={'foo': 'bar'}) |
408 | self.assertEqual(JobStatus.WAITING, job.status) |
409 | transaction.commit() |
410 | @@ -462,7 +545,7 @@ |
411 | |
412 | def test_WebhookDeliveryJob(self): |
413 | """WebhookDeliveryJob runs under Celery.""" |
414 | - hook = self.factory.makeWebhook(delivery_url=u'http://hookep.com/foo') |
415 | + hook = self.factory.makeWebhook(delivery_url=u'http://example.com/ep') |
416 | |
417 | self.useFixture(FeatureFixture( |
418 | {'jobs.celery.enabled_classes': 'WebhookDeliveryJob'})) |
419 | |
420 | === modified file 'lib/lp/testing/factory.py' |
421 | --- lib/lp/testing/factory.py 2015-07-30 14:57:06 +0000 |
422 | +++ lib/lp/testing/factory.py 2015-08-04 00:12:30 +0000 |
423 | @@ -4525,14 +4525,13 @@ |
424 | return ProxyFactory( |
425 | LiveFSFile(livefsbuild=livefsbuild, libraryfile=libraryfile)) |
426 | |
427 | - def makeWebhook(self, target=None, delivery_url=None): |
428 | + def makeWebhook(self, target=None, delivery_url=None, secret=None): |
429 | if target is None: |
430 | target = self.makeGitRepository() |
431 | if delivery_url is None: |
432 | delivery_url = self.getUniqueURL().decode('utf-8') |
433 | return getUtility(IWebhookSource).new( |
434 | - target, self.makePerson(), delivery_url, [], True, |
435 | - self.getUniqueUnicode()) |
436 | + target, self.makePerson(), delivery_url, [], True, secret) |
437 | |
438 | def makeSnap(self, registrant=None, owner=None, distroseries=None, |
439 | name=None, branch=None, git_ref=None, |