Merge ~pappacena/launchpad:mirror-prober-untwist-db-calls into launchpad:master

Proposed by Thiago F. Pappacena
Status: Merged
Approved by: Thiago F. Pappacena
Approved revision: b234886a7b278833a26b6c66795897cc596d20ab
Merge reported by: Otto Co-Pilot
Merged at revision: not available
Proposed branch: ~pappacena/launchpad:mirror-prober-untwist-db-calls
Merge into: launchpad:master
Diff against target: 412 lines (+233/-17)
2 files modified
lib/lp/registry/scripts/distributionmirror_prober.py (+101/-16)
lib/lp/registry/tests/test_distributionmirror_prober.py (+132/-1)
Reviewer Review Type Date Requested Status
Colin Watson (community) Approve
Review via email: mp+394494@code.launchpad.net

Commit message

Move response processing out of the reactor in the mirror prober to avoid spurious HTTP timeouts

Description of the change

Ideally, we should keep the reactor free to perform the HTTP requests as fast as possible, and move all other processing (especially database calls) outside the event loop. This refactoring keeps track of the callback calls being made, so that they can be run after the reactor has finished the batch of HTTP requests.

Unfortunately, the archive prober needs the response of an initial HTTP request to perform some database operations, after which it fires off several more HTTP requests. Since the original bug report is not about the archive prober, we leave that part of the refactoring for the future.

To post a comment you must log in.
Revision history for this message
Colin Watson (cjwatson) wrote :

This is pretty difficult to follow, but I'm OK with it if it works.

However, did you consider instead using storm.twisted.transact for this? It's designed for this sort of thing, and it might be possible to use it to make things significantly easier to follow.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1diff --git a/lib/lp/registry/scripts/distributionmirror_prober.py b/lib/lp/registry/scripts/distributionmirror_prober.py
2index ca6c4d0..6d2fbf2 100644
3--- a/lib/lp/registry/scripts/distributionmirror_prober.py
4+++ b/lib/lp/registry/scripts/distributionmirror_prober.py
5@@ -582,6 +582,47 @@ class UnknownURLSchemeAfterRedirect(UnknownURLScheme):
6 "to check this kind of URL." % self.url)
7
8
class CallScheduler:
    """Keep track of calls made directly or as deferred callbacks, so that
    they can be postponed until after the reactor is done.

    Direct calls are recorded when `sched` is invoked; callbacks created by
    `schedCallback` are recorded when their deferred fires. `run` replays
    everything in the order it was recorded.

    The main limitation for deferred callbacks is that we don't deal with
    errors here. You should do error handling synchronously on the methods
    scheduled.
    """

    def __init__(self, mirror, series):
        # Kept as context for the caller; the scheduler itself does not
        # use them.
        self.mirror = mirror
        self.series = series
        # A list of tuples with the format:
        # (is_a_callback, callback_result, method, args, kwargs)
        self.calls = []

    def sched(self, method, *args, **kwargs):
        """Record `method(*args, **kwargs)` to be executed by `run`."""
        self.calls.append((False, None, method, args, kwargs))

    def schedCallback(self, method, *args, **kwargs):
        """Return a callback/errback suitable for a Deferred.

        When the returned function fires with `result`, it records
        `method(result, *args, **kwargs)` to be executed by `run`, and
        returns `result` unchanged so the deferred chain it is attached to
        carries on as if it were not there.
        """
        def callback(result):
            self.calls.append((True, result, method, args, kwargs))
            return result
        return callback

    def run(self):
        """Run all the delayed calls, in the order they were recorded.

        Each delayed callback receives the result that its own deferred
        produced. A single scheduler may be shared by several independent
        deferreds (e.g. one per probed freshness URL), so the return value
        of one delayed callback must not be fed into the next one.
        """
        for is_callback, result, method, args, kwargs in self.calls:
            if is_callback:
                method(result, *args, **kwargs)
            else:
                method(*args, **kwargs)
48+
49+
50 class ArchiveMirrorProberCallbacks(LoggingMixin):
51
52 expected_failures = (
53@@ -592,13 +633,15 @@ class ArchiveMirrorProberCallbacks(LoggingMixin):
54 InvalidHTTPSCertificateSkipped,
55 )
56
57- def __init__(self, mirror, series, pocket, component, url, log_file):
58+ def __init__(self, mirror, series, pocket, component, url, log_file,
59+ call_sched=None):
60 self.mirror = mirror
61 self.series = series
62 self.pocket = pocket
63 self.component = component
64 self.url = url
65 self.log_file = log_file
66+ self.call_sched = call_sched
67 if IDistroArchSeries.providedBy(series):
68 self.mirror_class_name = 'MirrorDistroArchSeries'
69 self.deleteMethod = self.mirror.deleteMirrorDistroArchSeries
70@@ -661,24 +704,29 @@ class ArchiveMirrorProberCallbacks(LoggingMixin):
71 # this host and thus should skip it, so it's better to delete this
72 # MirrorDistroArchSeries/MirrorDistroSeriesSource than to keep
73 # it with an UNKNOWN freshness.
74- self.deleteMethod(self.series, self.pocket, self.component)
75+ self.call_sched.sched(
76+ self.deleteMethod, self.series, self.pocket, self.component)
77 return
78
79 deferredList = []
80 # We start setting the freshness to unknown, and then we move on
81 # trying to find one of the recently published packages mirrored
82 # there.
83- arch_or_source_mirror.freshness = MirrorFreshness.UNKNOWN
84+ self.call_sched.sched(
85+ self.setMirrorFreshnessUnknown, arch_or_source_mirror)
86 for freshness, url in freshness_url_map.items():
87 prober = RedirectAwareProberFactory(url)
88 deferred = request_manager.run(prober.request_host, prober.probe)
89- deferred.addCallback(
90- self.setMirrorFreshness, arch_or_source_mirror, freshness,
91- url)
92 deferred.addErrback(self.logError, url)
93+ deferred.addCallback(self.call_sched.schedCallback(
94+ self.setMirrorFreshness, arch_or_source_mirror,
95+ freshness, url))
96 deferredList.append(deferred)
97 return defer.DeferredList(deferredList)
98
def setMirrorFreshnessUnknown(self, arch_or_source_mirror):
    """Reset the freshness of `arch_or_source_mirror` to UNKNOWN.

    updateMirrorFreshness schedules this instead of assigning directly, so
    the write is deferred until the scheduled calls are run; the later
    scheduled setMirrorFreshness calls may then replace this value.
    """
    unknown = MirrorFreshness.UNKNOWN
    arch_or_source_mirror.freshness = unknown
101+
102 def setMirrorFreshness(
103 self, http_status, arch_or_source_mirror, freshness, url):
104 """Update the freshness of the given arch or source mirror.
105@@ -688,9 +736,11 @@ class ArchiveMirrorProberCallbacks(LoggingMixin):
106 """
107 if freshness < arch_or_source_mirror.freshness:
108 msg = ('Found that %s exists. Updating %s of %s freshness to '
109- '%s.\n' % (url, self.mirror_class_name,
110- self._getSeriesPocketAndComponentDescription(),
111- freshness.title))
112+ '%s.\n')
113+ msg = msg % (
114+ url, self.mirror_class_name,
115+ self._getSeriesPocketAndComponentDescription(),
116+ freshness.title)
117 self.logMessage(msg)
118 arch_or_source_mirror.freshness = freshness
119
120@@ -779,6 +829,15 @@ class MirrorCDImageProberCallbacks(LoggingMixin):
121 "Failed %s: %s\n" % (url, failure.getErrorMessage()))
122 return failure
123
def urlCallback(self, result, url):
    """Delayed per-URL handler: log `url` as missing if its probe failed.

    `result` is whatever the URL's deferred produced; only a Failure
    causes anything to be logged. Always returns None.
    """
    if not isinstance(result, Failure):
        return
    self.logMissingURL(result, url)
128+
def finalResultCallback(self, result):
    """Delayed handler run once every URL of a series/flavour was probed.

    :param result: the value produced by the DeferredList wrapping all of
        that series/flavour's URL probes.
    :return: whatever ensureOrDeleteMirrorCDImageSeries returns.
    """
    outcome = self.ensureOrDeleteMirrorCDImageSeries(result)
    return outcome
132+
133
134 def _get_cdimage_file_list():
135 url = config.distributionmirrorprober.cdimage_file_list_url
136@@ -851,23 +910,34 @@ def probe_archive_mirror(mirror, logfile, unchecked_keys, logger,
137 sources_paths = mirror.getExpectedSourcesPaths()
138 all_paths = itertools.chain(packages_paths, sources_paths)
139 request_manager = RequestManager(max_parallel, max_parallel_per_host)
140+
141+ call_scheds = []
142 for series, pocket, component, path in all_paths:
143+ sched = CallScheduler(mirror, series)
144+ call_scheds.append(sched)
145 url = urljoin(base_url, path)
146 callbacks = ArchiveMirrorProberCallbacks(
147- mirror, series, pocket, component, url, logfile)
148+ mirror, series, pocket, component, url, logfile, sched)
149 unchecked_keys.append(url)
150 # APT has supported redirects since 0.7.21 (2009-04-14), so allow
151 # them here too.
152 prober = RedirectAwareProberFactory(url)
153
154 deferred = request_manager.run(prober.request_host, prober.probe)
155+
156+ # XXX pappacena 2020-11-25: This will do some database operation
157+ # inside reactor, which might cause problems like timeouts when
158+ # running HTTP requests. This should be the next optimization point:
159+ # run {ensure|delete}MirrorSeries and gather all mirror freshness URLs
160+ # synchronously here, and ask reactor to run just the HTTP requests.
161 deferred.addCallbacks(
162 callbacks.ensureMirrorSeries, callbacks.deleteMirrorSeries)
163-
164- deferred.addCallback(callbacks.updateMirrorFreshness, request_manager)
165+ deferred.addCallback(
166+ callbacks.updateMirrorFreshness, request_manager)
167 deferred.addErrback(logger.error)
168
169 deferred.addBoth(checkComplete, url, unchecked_keys)
170+ return call_scheds
171
172
173 def probe_cdimage_mirror(mirror, logfile, unchecked_keys, logger,
174@@ -894,6 +964,7 @@ def probe_cdimage_mirror(mirror, logfile, unchecked_keys, logger,
175 logger.error(e)
176 return
177
178+ call_scheds = []
179 for series, flavour, paths in cdimage_paths:
180 callbacks = MirrorCDImageProberCallbacks(
181 mirror, series, flavour, logfile)
182@@ -906,14 +977,21 @@ def probe_cdimage_mirror(mirror, logfile, unchecked_keys, logger,
183 url = urljoin(base_url, path)
184 # Use a RedirectAwareProberFactory because CD mirrors are allowed
185 # to redirect, and we need to cope with that.
186+ sched = CallScheduler(mirror, series)
187+ call_scheds.append(sched)
188 prober = RedirectAwareProberFactory(url)
189 deferred = request_manager.run(prober.request_host, prober.probe)
190- deferred.addErrback(callbacks.logMissingURL, url)
191+ deferred.addErrback(
192+ sched.schedCallback(callbacks.urlCallback, url))
193 deferredList.append(deferred)
194
195+ sched = CallScheduler(mirror, series)
196+ call_scheds.append(sched)
197 deferredList = defer.DeferredList(deferredList, consumeErrors=True)
198- deferredList.addCallback(callbacks.ensureOrDeleteMirrorCDImageSeries)
199+ deferredList.addCallback(
200+ sched.schedCallback(callbacks.finalResultCallback))
201 deferredList.addCallback(checkComplete, mirror_key, unchecked_keys)
202+ return call_scheds
203
204
205 def should_skip_host(host):
206@@ -1029,6 +1107,7 @@ class DistroMirrorProber:
207 logfiles = {}
208 probed_mirrors = []
209
210+ all_scheduled_calls = []
211 for mirror_id in mirror_ids:
212 mirror = mirror_set[mirror_id]
213 if not self._sanity_check_mirror(mirror):
214@@ -1047,12 +1126,18 @@ class DistroMirrorProber:
215 probed_mirrors.append(mirror)
216 logfile = six.StringIO()
217 logfiles[mirror_id] = logfile
218- probe_function(mirror, logfile, unchecked_keys, self.logger,
219- max_parallel, max_parallel_per_host)
220+ prob_scheduled_calls = probe_function(
221+ mirror, logfile, unchecked_keys, self.logger,
222+ max_parallel, max_parallel_per_host)
223+ all_scheduled_calls += prob_scheduled_calls
224
225 if probed_mirrors:
226 reactor.run()
227 self.logger.info('Probed %d mirrors.' % len(probed_mirrors))
228+ self.logger.info(
229+ 'Starting to update mirrors statuses outside reactor now.')
230+ for sched_calls in all_scheduled_calls:
231+ sched_calls.run()
232 else:
233 self.logger.info('No mirrors to probe.')
234
235diff --git a/lib/lp/registry/tests/test_distributionmirror_prober.py b/lib/lp/registry/tests/test_distributionmirror_prober.py
236index 4e6abab..ee7780f 100644
237--- a/lib/lp/registry/tests/test_distributionmirror_prober.py
238+++ b/lib/lp/registry/tests/test_distributionmirror_prober.py
239@@ -8,6 +8,8 @@ __metaclass__ = type
240 from datetime import datetime
241 import logging
242 import os
243+import re
244+from textwrap import dedent
245
246 from fixtures import MockPatchObject
247 from lazr.uri import URI
248@@ -25,6 +27,7 @@ from testtools.twistedsupport import (
249 AsynchronousDeferredRunTest,
250 AsynchronousDeferredRunTestForBrokenTwisted,
251 )
252+import transaction
253 from twisted.internet import (
254 defer,
255 reactor,
256@@ -37,8 +40,18 @@ from zope.component import getUtility
257 from zope.security.proxy import removeSecurityProxy
258
259 from lp.app.interfaces.launchpad import ILaunchpadCelebrities
260+from lp.registry.interfaces.distributionmirror import (
261+ MirrorContent,
262+ MirrorStatus,
263+ )
264 from lp.registry.interfaces.pocket import PackagePublishingPocket
265-from lp.registry.model.distributionmirror import DistributionMirror
266+from lp.registry.model.distributionmirror import (
267+ DistributionMirror,
268+ MirrorCDImageDistroSeries,
269+ MirrorDistroArchSeries,
270+ MirrorDistroSeriesSource,
271+ MirrorProbeRecord,
272+ )
273 from lp.registry.scripts import distributionmirror_prober
274 from lp.registry.scripts.distributionmirror_prober import (
275 _get_cdimage_file_list,
276@@ -72,14 +85,18 @@ from lp.registry.tests.distributionmirror_http_server import (
277 )
278 from lp.services.config import config
279 from lp.services.daemons.tachandler import TacTestSetup
280+from lp.services.database.interfaces import IStore
281 from lp.services.httpproxy.connect_tunneling import TunnelingAgent
282 from lp.services.timeout import default_timeout
283 from lp.testing import (
284+ admin_logged_in,
285 clean_up_reactor,
286+ run_script,
287 TestCase,
288 TestCaseWithFactory,
289 )
290 from lp.testing.layers import (
291+ LaunchpadZopelessLayer,
292 TwistedLayer,
293 ZopelessDatabaseLayer,
294 )
295@@ -1174,3 +1191,117 @@ class TestLoggingMixin(TestCase):
296 logger.log_file.seek(0)
297 message = logger.log_file.read()
298 self.assertNotEqual(None, message)
299+
300+
class TestDistroMirrorProberFunctional(TestCaseWithFactory):
    """Run the distributionmirror-prober cronscript end to end."""

    layer = LaunchpadZopelessLayer

    def setUp(self):
        super(TestDistroMirrorProberFunctional, self).setUp()
        # Makes a clean distro mirror set, with only the mirrors we want.
        self.removeMirrors()

    def removeMirrors(self):
        """Removes all mirror information from database."""
        store = IStore(DistributionMirror)
        # Rows that presumably reference DistributionMirror are removed
        # before the mirrors themselves.
        store.find(MirrorProbeRecord).remove()
        store.find(MirrorDistroArchSeries).remove()
        store.find(MirrorDistroSeriesSource).remove()
        store.find(MirrorCDImageDistroSeries).remove()
        store.find(DistributionMirror).remove()
        store.flush()

    def makeMirror(self, content_type, distro=None):
        """Create an enabled, official mirror serving `content_type`.

        If `distro` is None, a new distribution that supports mirrors, with
        one series, is created for the mirror.
        """
        with admin_logged_in():
            if distro is None:
                distro = self.factory.makeDistribution()
                distro.supports_mirrors = True
                self.factory.makeDistroSeries(distribution=distro)
            mirror = self.factory.makeMirror(
                distro, http_url="http://fake-url.invalid")
            mirror.enabled = True
            mirror.status = MirrorStatus.OFFICIAL
            mirror.official_candidate = True
            mirror.content = content_type
        return mirror

    def runProber(self, content_type):
        """Run the prober script for `content_type` and return its stderr.

        Fails the test if the script exits with a non-zero status.
        """
        out, err, exit_code = run_script(
            "cronscripts/distributionmirror-prober.py --no-remote-hosts "
            "--content-type=%s --no-owner-notification --force"
            % content_type)
        self.assertEqual(0, exit_code, err)
        return err

    def assertProbedOneMirror(self, err, probing_message):
        """Assert the script's stderr reports probing exactly one mirror.

        :param probing_message: the line announcing which kind of mirrors
            is being probed.
        """
        lock_file = "/var/lock/launchpad-distributionmirror-prober.lock"
        self.assertEqual(dedent("""\
            INFO Creating lockfile: %s
            INFO %s
            INFO Probed 1 mirrors.
            INFO Starting to update mirrors statuses outside reactor now.
            INFO Done.
            """) % (lock_file, probing_message), err)

    def getProbeLog(self, mirror):
        """Return the log contents of `mirror`'s last probe record."""
        with admin_logged_in():
            record = removeSecurityProxy(mirror.last_probe_record)
        return record.log_file.read()

    def test_cdimage_prober(self):
        """Checks that CD image prober works fine, end to end."""
        mirror = self.makeMirror(content_type=MirrorContent.RELEASE)
        transaction.commit()

        err = self.runProber("cdimage")
        self.assertProbedOneMirror(err, "Probing CD Image Mirrors")

        log_lines = self.getProbeLog(mirror)
        self.assertEqual(4, len(log_lines.split("\n")))
        expected_series_and_flavours = (
            ("The Hoary Hedgehog Release", "kubuntu"),
            ("The Hoary Hedgehog Release", "ubuntu"),
            ("The Warty Warthog Release", "ubuntu"),
            )
        for series, flavour in expected_series_and_flavours:
            self.assertIn(
                "Found all ISO images for series %s and flavour %s."
                % (series, flavour), log_lines)

    def test_archive_prober(self):
        """Checks that archive prober works fine, end to end."""
        # Using ubuntu to avoid the need to create all the packages that
        # will be checked by prober.
        ubuntu = getUtility(ILaunchpadCelebrities).ubuntu
        mirror = self.makeMirror(
            content_type=MirrorContent.ARCHIVE, distro=ubuntu)
        transaction.commit()

        err = self.runProber("archive")
        self.assertProbedOneMirror(err, "Probing Archive Mirrors")

        log_lines = self.getProbeLog(mirror)

        # Make sure that prober output seems reasonable.
        self.assertEqual(85, len(log_lines.split("\n")))
        # Escape the URL so that its dots (and other punctuation) match
        # literally instead of acting as regex metacharacters.
        url = re.escape("http://fake-url.invalid/dists/")
        self.assertEqual(40, len(re.findall(
            r"Ensuring MirrorDistroSeries of .* with url %s.* exists in the "
            r"database" % url,
            log_lines)))
        self.assertEqual(40, len(re.findall(
            r"Ensuring MirrorDistroArchSeries of .* with url %s.* exists in "
            r"the database" % url,
            log_lines)))
        self.assertEqual(1, len(re.findall(
            r"Updating MirrorDistroArchSeries of .* freshness to Up to date",
            log_lines)))
        self.assertEqual(3, len(re.findall(
            r"Updating MirrorDistroSeries of .* freshness to Up to date",
            log_lines)))