Merge lp:~wgrant/launchpad/webhook-delivery-tweaks into lp:launchpad

Proposed by William Grant
Status: Merged
Merged at revision: 17671
Proposed branch: lp:~wgrant/launchpad/webhook-delivery-tweaks
Merge into: lp:launchpad
Prerequisite: lp:~wgrant/launchpad/job-configurable-lease-duration
Diff against target: 439 lines (+167/-43)
6 files modified
lib/lp/services/job/tests/test_celery_configuration.py (+2/-0)
lib/lp/services/webhooks/client.py (+23/-4)
lib/lp/services/webhooks/interfaces.py (+8/-1)
lib/lp/services/webhooks/model.py (+15/-1)
lib/lp/services/webhooks/tests/test_webhookjob.py (+117/-34)
lib/lp/testing/factory.py (+2/-3)
To merge this branch: bzr merge lp:~wgrant/launchpad/webhook-delivery-tweaks
Reviewer Review Type Date Requested Status
Colin Watson (community) Approve
Review via email: mp+266703@code.launchpad.net

Commit message

Webhook delivery tweaks: 30 second timeout, User-Agent, payload signatures, and faster retries.

Description of the change

Webhook delivery tweaks: 30 second timeout, User-Agent, payload signatures, and faster retries.

To post a comment you must log in.
Revision history for this message
Colin Watson (cjwatson) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'lib/lp/services/job/tests/test_celery_configuration.py'
2--- lib/lp/services/job/tests/test_celery_configuration.py 2015-08-03 10:16:49 +0000
3+++ lib/lp/services/job/tests/test_celery_configuration.py 2015-08-04 00:12:30 +0000
4@@ -4,6 +4,8 @@
5 from contextlib import contextmanager
6
7
8+from testtools.matchers import MatchesRegex
9+
10 from lp.services.config import config
11 from lp.testing import TestCase
12 from lp.testing.layers import RabbitMQLayer
13
14=== modified file 'lib/lp/services/webhooks/client.py'
15--- lib/lp/services/webhooks/client.py 2015-07-12 23:36:55 +0000
16+++ lib/lp/services/webhooks/client.py 2015-08-04 00:12:30 +0000
17@@ -8,16 +8,32 @@
18 'WebhookClient',
19 ]
20
21+import hashlib
22+import hmac
23+import json
24+
25 import requests
26 from zope.interface import implementer
27
28 from lp.services.webhooks.interfaces import IWebhookClient
29
30
31+def create_request(user_agent, secret, payload):
32+ body = json.dumps(payload)
33+ headers = {
34+ 'User-Agent': user_agent,
35+ 'Content-Type': 'application/json',
36+ }
37+ if secret is not None:
38+ hexdigest = hmac.new(secret, body, digestmod=hashlib.sha1).hexdigest()
39+ headers['X-Hub-Signature'] = 'sha1=%s' % hexdigest
40+ return (body, headers)
41+
42+
43 @implementer(IWebhookClient)
44 class WebhookClient:
45
46- def deliver(self, url, proxy, payload):
47+ def deliver(self, url, proxy, user_agent, timeout, secret, payload):
48 """See `IWebhookClient`."""
49 # We never want to execute a job if there's no proxy configured, as
50 # we'd then be sending near-arbitrary requests from a trusted
51@@ -32,8 +48,11 @@
52 session = requests.Session()
53 session.trust_env = False
54 session.headers = {}
55- preq = session.prepare_request(
56- requests.Request('POST', url, json=payload))
57+
58+ body, headers = create_request(user_agent, secret, payload)
59+ preq = session.prepare_request(requests.Request(
60+ 'POST', url, data=body, headers=headers))
61+
62 result = {
63 'request': {
64 'url': url,
65@@ -43,7 +62,7 @@
66 },
67 }
68 try:
69- resp = session.send(preq, proxies=proxies)
70+ resp = session.send(preq, proxies=proxies, timeout=timeout)
71 result['response'] = {
72 'status_code': resp.status_code,
73 'headers': dict(resp.headers),
74
75=== modified file 'lib/lp/services/webhooks/interfaces.py'
76--- lib/lp/services/webhooks/interfaces.py 2015-07-29 08:37:51 +0000
77+++ lib/lp/services/webhooks/interfaces.py 2015-08-04 00:12:30 +0000
78@@ -249,7 +249,7 @@
79
80 class IWebhookClient(Interface):
81
82- def deliver(self, url, proxy, payload):
83+ def deliver(self, url, proxy, user_agent, timeout, secret, payload):
84 """Deliver a payload to a webhook endpoint.
85
86 Returns a dict of request and response details. The 'request' key
87@@ -260,6 +260,13 @@
88 cannot be the fault of the remote endpoint. For example, a 404 will
89 return a response, and a DNS error returns a connection_error, but
90 the proxy being offline will raise an exception.
91+
92+ The timeout is just given to the underlying requests library, so
93+ it only provides connect and inter-read timeouts. A reliable
94+ overall request timeout will require another mechanism.
95+
96+ If secret is not None, a PubSubHubbub-compatible X-Hub-Signature
97+ header will be sent using HMAC-SHA1.
98 """
99
100 patch_collection_property(IWebhook, 'deliveries', IWebhookDeliveryJob)
101
102=== modified file 'lib/lp/services/webhooks/model.py'
103--- lib/lp/services/webhooks/model.py 2015-07-29 08:31:06 +0000
104+++ lib/lp/services/webhooks/model.py 2015-08-04 00:12:30 +0000
105@@ -38,6 +38,7 @@
106 )
107 from zope.security.proxy import removeSecurityProxy
108
109+import lp.app.versioninfo
110 from lp.registry.model.person import Person
111 from lp.services.config import config
112 from lp.services.database.bulk import load_related
113@@ -296,6 +297,13 @@
114 retry_error_types = (WebhookDeliveryRetry,)
115 user_error_types = (WebhookDeliveryFailure,)
116
117+ # The request timeout is 30 seconds, requests timeouts aren't
118+ # totally reliable so we also have a relatively low celery timeout
119+ # as a backup. The celery timeout and lease expiry have a bit of
120+ # slack to cope with slow job start/finish without conflicts.
121+ soft_time_limit = timedelta(seconds=45)
122+ lease_duration = timedelta(seconds=60)
123+
124 # Effectively infinite, as we give up by checking
125 # retry_automatically and raising a fatal exception instead.
126 max_retries = 1000
127@@ -366,14 +374,20 @@
128
129 @property
130 def retry_delay(self):
131- if self._time_since_first_attempt < timedelta(hours=1):
132+ if self._time_since_first_attempt < timedelta(minutes=10):
133+ return timedelta(minutes=1)
134+ elif self._time_since_first_attempt < timedelta(hours=1):
135 return timedelta(minutes=5)
136 else:
137 return timedelta(hours=1)
138
139 def run(self):
140+ user_agent = '%s-Webhooks/r%s' % (
141+ config.vhost.mainsite.hostname, lp.app.versioninfo.revno)
142+ secret = self.webhook.secret
143 result = getUtility(IWebhookClient).deliver(
144 self.webhook.delivery_url, config.webhooks.http_proxy,
145+ user_agent, 30, secret.encode('utf-8') if secret else None,
146 self.payload)
147 # Request and response headers and body may be large, so don't
148 # store them in the frequently-used JSON. We could store them in
149
150=== modified file 'lib/lp/services/webhooks/tests/test_webhookjob.py'
151--- lib/lp/services/webhooks/tests/test_webhookjob.py 2015-07-29 08:31:06 +0000
152+++ lib/lp/services/webhooks/tests/test_webhookjob.py 2015-08-04 00:12:30 +0000
153@@ -5,12 +5,16 @@
154
155 __metaclass__ = type
156
157-from datetime import timedelta
158+from datetime import (
159+ datetime,
160+ timedelta,
161+ )
162
163 from httmock import (
164 HTTMock,
165 urlmatch,
166 )
167+from pytz import utc
168 import requests
169 from storm.store import Store
170 from testtools import TestCase
171@@ -23,18 +27,24 @@
172 KeysEqual,
173 LessThan,
174 MatchesAll,
175+ MatchesDict,
176 MatchesStructure,
177 Not,
178 )
179 import transaction
180 from zope.component import getUtility
181+from zope.security.proxy import removeSecurityProxy
182
183+from lp.app.versioninfo import revno
184 from lp.services.features.testing import FeatureFixture
185 from lp.services.job.interfaces.job import JobStatus
186 from lp.services.job.runner import JobRunner
187 from lp.services.job.tests import block_on_job
188 from lp.services.scripts.tests import run_script
189-from lp.services.webhooks.client import WebhookClient
190+from lp.services.webhooks.client import (
191+ create_request,
192+ WebhookClient,
193+ )
194 from lp.services.webhooks.interfaces import (
195 IWebhookClient,
196 IWebhookDeliveryJob,
197@@ -129,7 +139,7 @@
198 def sendToWebhook(self, response_status=200, raises=None):
199 reqs = []
200
201- @urlmatch(netloc='hookep.com')
202+ @urlmatch(netloc='example.com')
203 def endpoint_mock(url, request):
204 if raises:
205 raise raises
206@@ -138,58 +148,89 @@
207
208 with HTTMock(endpoint_mock):
209 result = WebhookClient().deliver(
210- 'http://hookep.com/foo',
211- {'http': 'http://squid.example.com:3128'},
212- {'foo': 'bar'})
213+ 'http://example.com/ep', 'http://squid.example.com:3128',
214+ 'TestWebhookClient', 30, 'sekrit', {'foo': 'bar'})
215
216 return reqs, result
217
218+ @property
219+ def request_matcher(self):
220+ return MatchesDict({
221+ 'url': Equals('http://example.com/ep'),
222+ 'method': Equals('POST'),
223+ 'headers': Equals(
224+ {'Content-Type': 'application/json',
225+ 'Content-Length': '14',
226+ 'User-Agent': 'TestWebhookClient',
227+ 'X-Hub-Signature':
228+ 'sha1=de75f136c37d89f5eb24834468c1ecd602fa95dd',
229+ }),
230+ 'body': Equals('{"foo": "bar"}'),
231+ })
232+
233 def test_sends_request(self):
234 [request], result = self.sendToWebhook()
235- self.assertEqual(
236- {'Content-Type': 'application/json', 'Content-Length': '14'},
237- result['request']['headers'])
238- self.assertEqual('{"foo": "bar"}', result['request']['body'])
239- self.assertEqual(200, result['response']['status_code'])
240- self.assertEqual({}, result['response']['headers'])
241- self.assertEqual('Content', result['response']['body'])
242+ self.assertThat(
243+ result,
244+ MatchesDict({
245+ 'request': self.request_matcher,
246+ 'response': MatchesDict({
247+ 'status_code': Equals(200),
248+ 'headers': Equals({}),
249+ 'body': Equals('Content'),
250+ }),
251+ }))
252
253 def test_accepts_404(self):
254 [request], result = self.sendToWebhook(response_status=404)
255- self.assertEqual(
256- {'Content-Type': 'application/json', 'Content-Length': '14'},
257- result['request']['headers'])
258- self.assertEqual('{"foo": "bar"}', result['request']['body'])
259- self.assertEqual(404, result['response']['status_code'])
260- self.assertEqual({}, result['response']['headers'])
261- self.assertEqual('Content', result['response']['body'])
262+ self.assertThat(
263+ result,
264+ MatchesDict({
265+ 'request': self.request_matcher,
266+ 'response': MatchesDict({
267+ 'status_code': Equals(404),
268+ 'headers': Equals({}),
269+ 'body': Equals('Content'),
270+ }),
271+ }))
272
273 def test_connection_error(self):
274 # Attempts that fail to connect have a connection_error rather
275 # than a response.
276 reqs, result = self.sendToWebhook(
277 raises=requests.ConnectionError('Connection refused'))
278- self.assertNotIn('response', result)
279- self.assertEqual(
280- 'Connection refused', result['connection_error'])
281+ self.assertThat(
282+ result,
283+ MatchesDict({
284+ 'request': self.request_matcher,
285+ 'connection_error': Equals('Connection refused'),
286+ }))
287 self.assertEqual([], reqs)
288
289
290-class MockWebhookClient:
291+class MockWebhookClient(WebhookClient):
292
293 def __init__(self, response_status=200, raises=None):
294 self.response_status = response_status
295 self.raises = raises
296 self.requests = []
297
298- def deliver(self, url, proxy, payload):
299- result = {'request': {}}
300+ def deliver(self, url, proxy, user_agent, timeout, secret, payload):
301+ body, headers = create_request(user_agent, secret, payload)
302+ result = {
303+ 'request': {
304+ 'url': url,
305+ 'method': 'POST',
306+ 'headers': headers,
307+ 'body': body,
308+ },
309+ }
310 if isinstance(self.raises, requests.ConnectionError):
311 result['connection_error'] = str(self.raises)
312 elif self.raises is not None:
313 raise self.raises
314 else:
315- self.requests.append(('POST', url))
316+ self.requests.append(('POST', url, result['request']['headers']))
317 result['response'] = {'status_code': self.response_status}
318 return result
319
320@@ -199,8 +240,10 @@
321
322 layer = ZopelessDatabaseLayer
323
324- def makeAndRunJob(self, response_status=200, raises=None, mock=True):
325- hook = self.factory.makeWebhook(delivery_url=u'http://hookep.com/foo')
326+ def makeAndRunJob(self, response_status=200, raises=None, mock=True,
327+ secret=None):
328+ hook = self.factory.makeWebhook(
329+ delivery_url=u'http://example.com/ep', secret=secret)
330 job = WebhookDeliveryJob.create(hook, payload={'foo': 'bar'})
331
332 client = MockWebhookClient(
333@@ -217,6 +260,21 @@
334 self.assertProvides(
335 WebhookDeliveryJob.create(hook, payload={}), IWebhookDeliveryJob)
336
337+ def test_short_lease_and_timeout(self):
338+ # Webhook jobs have a request timeout of 30 seconds, a celery
339+ # timeout of 45 seconds, and a lease of 60 seconds, to give
340+ # reasonable time for sluggish things to catch up.
341+ hook = self.factory.makeWebhook()
342+ job = hook.ping()
343+ job.acquireLease()
344+ self.assertThat(
345+ job.lease_expires - datetime.now(utc),
346+ MatchesAll(
347+ GreaterThan(timedelta(seconds=50)),
348+ LessThan(timedelta(seconds=60))))
349+ self.assertEqual(
350+ timedelta(seconds=45), removeSecurityProxy(job).soft_time_limit)
351+
352 def test_run_200(self):
353 # A request that returns 200 is a success.
354 with CaptureOops() as oopses:
355@@ -235,7 +293,26 @@
356 {'response': ContainsDict(
357 {'status_code': Equals(200)})}))})))
358 self.assertEqual(1, len(reqs))
359- self.assertEqual([('POST', 'http://hookep.com/foo')], reqs)
360+ self.assertEqual([
361+ ('POST', 'http://example.com/ep',
362+ {'Content-Type': 'application/json',
363+ 'User-Agent': 'launchpad.dev-Webhooks/r%s' % revno}),
364+ ], reqs)
365+ self.assertEqual([], oopses.oopses)
366+
367+ def test_run_signature(self):
368+ # If the webhook has a secret, the request is signed in a
369+ # PubSubHubbub-compatible way.
370+ with CaptureOops() as oopses:
371+ job, reqs = self.makeAndRunJob(
372+ response_status=200, secret=u'sekrit')
373+ self.assertEqual([
374+ ('POST', 'http://example.com/ep',
375+ {'Content-Type': 'application/json',
376+ 'User-Agent': 'launchpad.dev-Webhooks/r%s' % revno,
377+ 'X-Hub-Signature':
378+ 'sha1=de75f136c37d89f5eb24834468c1ecd602fa95dd'}),
379+ ], reqs)
380 self.assertEqual([], oopses.oopses)
381
382 def test_run_404(self):
383@@ -318,9 +395,15 @@
384 self.assertEqual(orig_first_sent, job.date_first_sent)
385
386 def test_retry_delay(self):
387- # Deliveries are retried every 5 minutes for the first hour, and
388- # every hour thereafter.
389+ # Deliveries are retried every minute for the first 10 minutes,
390+ # every 5 minutes up to an hour, and every hour thereafter.
391 job, reqs = self.makeAndRunJob(response_status=404)
392+ self.assertEqual(timedelta(minutes=1), job.retry_delay)
393+ job.json_data['date_first_sent'] = (
394+ job.date_first_sent - timedelta(minutes=5)).isoformat()
395+ self.assertEqual(timedelta(minutes=1), job.retry_delay)
396+ job.json_data['date_first_sent'] = (
397+ job.date_first_sent - timedelta(minutes=5)).isoformat()
398 self.assertEqual(timedelta(minutes=5), job.retry_delay)
399 job.json_data['date_first_sent'] = (
400 job.date_first_sent - timedelta(minutes=30)).isoformat()
401@@ -436,7 +519,7 @@
402 layer = ZopelessDatabaseLayer
403
404 def test_run_from_cronscript(self):
405- hook = self.factory.makeWebhook(delivery_url=u'http://hookep.com/foo')
406+ hook = self.factory.makeWebhook(delivery_url=u'http://example.com/ep')
407 job = WebhookDeliveryJob.create(hook, payload={'foo': 'bar'})
408 self.assertEqual(JobStatus.WAITING, job.status)
409 transaction.commit()
410@@ -462,7 +545,7 @@
411
412 def test_WebhookDeliveryJob(self):
413 """WebhookDeliveryJob runs under Celery."""
414- hook = self.factory.makeWebhook(delivery_url=u'http://hookep.com/foo')
415+ hook = self.factory.makeWebhook(delivery_url=u'http://example.com/ep')
416
417 self.useFixture(FeatureFixture(
418 {'jobs.celery.enabled_classes': 'WebhookDeliveryJob'}))
419
420=== modified file 'lib/lp/testing/factory.py'
421--- lib/lp/testing/factory.py 2015-07-30 14:57:06 +0000
422+++ lib/lp/testing/factory.py 2015-08-04 00:12:30 +0000
423@@ -4525,14 +4525,13 @@
424 return ProxyFactory(
425 LiveFSFile(livefsbuild=livefsbuild, libraryfile=libraryfile))
426
427- def makeWebhook(self, target=None, delivery_url=None):
428+ def makeWebhook(self, target=None, delivery_url=None, secret=None):
429 if target is None:
430 target = self.makeGitRepository()
431 if delivery_url is None:
432 delivery_url = self.getUniqueURL().decode('utf-8')
433 return getUtility(IWebhookSource).new(
434- target, self.makePerson(), delivery_url, [], True,
435- self.getUniqueUnicode())
436+ target, self.makePerson(), delivery_url, [], True, secret)
437
438 def makeSnap(self, registrant=None, owner=None, distroseries=None,
439 name=None, branch=None, git_ref=None,