Merge ~pappacena/launchpad:mirror-prober-parallelism-control into launchpad:master

Proposed by Thiago F. Pappacena
Status: Merged
Approved by: Thiago F. Pappacena
Approved revision: 7def96a144fc2cc88b7e62deab41472e86ac8156
Merge reported by: Otto Co-Pilot
Merged at revision: not available
Proposed branch: ~pappacena/launchpad:mirror-prober-parallelism-control
Merge into: launchpad:master
Diff against target: 274 lines (+63/-38)
3 files modified
cronscripts/distributionmirror-prober.py (+20/-2)
lib/lp/registry/scripts/distributionmirror_prober.py (+30/-23)
lib/lp/registry/tests/test_distributionmirror_prober.py (+13/-13)
Reviewer Review Type Date Requested Status
Colin Watson (community) Approve
Review via email: mp+380777@code.launchpad.net

Commit message

Add options to the distribution mirror prober so we can easily control the maximum number of parallel requests it issues.

Description of the change

This change removes the two hard-coded constants that limit the number of parallel requests (overall and per host) and replaces them with two script options, --max-parallel and --max-parallel-per-host.

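The main change is that RequestManager now creates its overall semaphore in __init__ instead of using a single class-level one, and probe_archive_mirror passes its RequestManager to updateMirrorFreshness instead of the latter creating its own. Note that the per-host locks are still kept in the class-level host_locks dict, so they remain shared across all RequestManager instances.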

To post a comment you must log in.
Revision history for this message
Colin Watson (cjwatson) :
review: Approve
Revision history for this message
Thiago F. Pappacena (pappacena) wrote :

Pushed the requested change. I'll top-approve the MP now.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1diff --git a/cronscripts/distributionmirror-prober.py b/cronscripts/distributionmirror-prober.py
2index 6d38880..513ba38 100755
3--- a/cronscripts/distributionmirror-prober.py
4+++ b/cronscripts/distributionmirror-prober.py
5@@ -1,6 +1,6 @@
6 #!/usr/bin/python -S
7 #
8-# Copyright 2009-2018 Canonical Ltd. This software is licensed under the
9+# Copyright 2009-2020 Canonical Ltd. This software is licensed under the
10 # GNU Affero General Public License version 3 (see the file LICENSE).
11
12 """Script to probe distribution mirrors and check how up-to-date they are."""
13@@ -38,6 +38,23 @@ class DistroMirrorProberScript(LaunchpadCronScript):
14 dest='max_mirrors', default=None, action='store', type="int",
15 help='Only probe N mirrors.')
16
17+ # IMPORTANT: Don't change this unless you really know what you're
18+ # doing. Using a too big value can cause spurious failures on lots of
19+ # mirrors and a too small one can cause the prober to run for hours.
20+ self.parser.add_option('--max-parallel-per-host',
21+ dest='max_parallel_per_host', default=2,
22+ action='store', type="int",
23+ help='Keep maximum N parallel requests per host at a time.'
24+ ' (default=2)')
25+
26+ # We limit the overall number of simultaneous requests as well to
27+ # prevent them from stalling and timing out before they even get a
28+ # chance to start connecting.
29+ self.parser.add_option('--max-parallel',
30+ dest='max_parallel', default=100,
31+ action='store', type="int",
32+ help='Keep maximum N parallel requests at a time (default=100).')
33+
34 def main(self):
35 if self.options.content_type == 'archive':
36 content_type = MirrorContent.ARCHIVE
37@@ -52,7 +69,8 @@ class DistroMirrorProberScript(LaunchpadCronScript):
38 lambda: config.distributionmirrorprober.timeout)
39 DistroMirrorProber(self.txn, self.logger).probe(
40 content_type, self.options.no_remote_hosts, self.options.force,
41- self.options.max_mirrors, not self.options.no_owner_notification)
42+ self.options.max_mirrors, not self.options.no_owner_notification,
43+ self.options.max_parallel, self.options.max_parallel_per_host)
44
45
46 if __name__ == '__main__':
47diff --git a/lib/lp/registry/scripts/distributionmirror_prober.py b/lib/lp/registry/scripts/distributionmirror_prober.py
48index 5b5d424..a6350ca 100644
49--- a/lib/lp/registry/scripts/distributionmirror_prober.py
50+++ b/lib/lp/registry/scripts/distributionmirror_prober.py
51@@ -81,17 +81,6 @@ invalid_certificate_hosts = set()
52
53 MAX_REDIRECTS = 3
54
55-# Number of simultaneous requests we issue on a given host.
56-# IMPORTANT: Don't change this unless you really know what you're doing. Using
57-# a too big value can cause spurious failures on lots of mirrors and a too
58-# small one can cause the prober to run for hours.
59-PER_HOST_REQUESTS = 2
60-
61-# We limit the overall number of simultaneous requests as well to prevent
62-# them from stalling and timing out before they even get a chance to
63-# start connecting.
64-OVERALL_REQUESTS = 100
65-
66
67 class LoggingMixin:
68 """Common logging class for archive and releases mirror messages."""
69@@ -110,12 +99,15 @@ class LoggingMixin:
70
71 class RequestManager:
72
73- overall_semaphore = DeferredSemaphore(OVERALL_REQUESTS)
74-
75 # Yes, I want a mutable class attribute because I want changes done in an
76 # instance to be visible in other instances as well.
77 host_locks = {}
78
79+ def __init__(self, max_parallel, max_parallel_per_host):
80+ self.max_parallel = max_parallel
81+ self.max_parallel_per_host = max_parallel_per_host
82+ self.overall_semaphore = DeferredSemaphore(max_parallel)
83+
84 def run(self, host, probe_func):
85 # Use a MultiLock with one semaphore limiting the overall
86 # connections and another limiting the per-host connections.
87@@ -123,7 +115,8 @@ class RequestManager:
88 multi_lock = self.host_locks[host]
89 else:
90 multi_lock = MultiLock(
91- self.overall_semaphore, DeferredSemaphore(PER_HOST_REQUESTS))
92+ self.overall_semaphore,
93+ DeferredSemaphore(self.max_parallel_per_host))
94 self.host_locks[host] = multi_lock
95 return multi_lock.run(probe_func)
96
97@@ -639,7 +632,7 @@ class ArchiveMirrorProberCallbacks(LoggingMixin):
98 self.logMessage(msg)
99 return mirror
100
101- def updateMirrorFreshness(self, arch_or_source_mirror):
102+ def updateMirrorFreshness(self, arch_or_source_mirror, request_manager):
103 """Update the freshness of this MirrorDistro{ArchSeries,SeriesSource}.
104
105 This is done by issuing HTTP HEAD requests on that mirror looking for
106@@ -664,7 +657,6 @@ class ArchiveMirrorProberCallbacks(LoggingMixin):
107 self.deleteMethod(self.series, self.pocket, self.component)
108 return
109
110- request_manager = RequestManager()
111 deferredList = []
112 # We start setting the freshness to unknown, and then we move on
113 # trying to find one of the recently published packages mirrored
114@@ -836,7 +828,8 @@ def checkComplete(result, key, unchecked_keys):
115 return result
116
117
118-def probe_archive_mirror(mirror, logfile, unchecked_keys, logger):
119+def probe_archive_mirror(mirror, logfile, unchecked_keys, logger,
120+ max_parallel, max_parallel_per_host):
121 """Probe an archive mirror for its contents and freshness.
122
123 First we issue a set of HTTP HEAD requests on some key files to find out
124@@ -850,7 +843,7 @@ def probe_archive_mirror(mirror, logfile, unchecked_keys, logger):
125 packages_paths = mirror.getExpectedPackagesPaths()
126 sources_paths = mirror.getExpectedSourcesPaths()
127 all_paths = itertools.chain(packages_paths, sources_paths)
128- request_manager = RequestManager()
129+ request_manager = RequestManager(max_parallel, max_parallel_per_host)
130 for series, pocket, component, path in all_paths:
131 url = urljoin(base_url, path)
132 callbacks = ArchiveMirrorProberCallbacks(
133@@ -864,13 +857,14 @@ def probe_archive_mirror(mirror, logfile, unchecked_keys, logger):
134 deferred.addCallbacks(
135 callbacks.ensureMirrorSeries, callbacks.deleteMirrorSeries)
136
137- deferred.addCallback(callbacks.updateMirrorFreshness)
138+ deferred.addCallback(callbacks.updateMirrorFreshness, request_manager)
139 deferred.addErrback(logger.error)
140
141 deferred.addBoth(checkComplete, url, unchecked_keys)
142
143
144-def probe_cdimage_mirror(mirror, logfile, unchecked_keys, logger):
145+def probe_cdimage_mirror(mirror, logfile, unchecked_keys, logger,
146+ max_parallel, max_parallel_per_host):
147 """Probe a cdimage mirror for its contents.
148
149 This is done by checking the list of files for each flavour and series
150@@ -900,7 +894,7 @@ def probe_cdimage_mirror(mirror, logfile, unchecked_keys, logger):
151 mirror_key = (series, flavour)
152 unchecked_keys.append(mirror_key)
153 deferredList = []
154- request_manager = RequestManager()
155+ request_manager = RequestManager(max_parallel, max_parallel_per_host)
156 for path in paths:
157 url = urljoin(base_url, path)
158 # Use a RedirectAwareProberFactory because CD mirrors are allowed
159@@ -971,9 +965,17 @@ class DistroMirrorProber:
160 mirror.newProbeRecord(log_file)
161
162 def probe(self, content_type, no_remote_hosts, ignore_last_probe,
163- max_mirrors, notify_owner):
164+ max_mirrors, notify_owner, max_parallel=100,
165+ max_parallel_per_host=2):
166 """Probe distribution mirrors.
167
168+ You should control carefully the parallelism here. Increasing too
169+ much the number of max_parallel_per_host could make the mirrors take
170+ too much to answer or deny our requests.
171+
172+ If we increase too much the max_parallel, we might experience timeouts
173+ because of our production proxy or internet bandwidth.
174+
175 :param content_type: The type of mirrored content, as a
176 `MirrorContent`.
177 :param no_remote_hosts: If True, restrict access to localhost.
178@@ -983,6 +985,10 @@ class DistroMirrorProber:
179 no maximum.
180 :param notify_owner: Send failure notification to the owners of the
181 mirrors.
182+ :param max_parallel: Maximum number of requests happening
183+ simultaneously.
184+ :param max_parallel_per_host: Maximum number of requests to the same
185+ host happening simultaneously.
186 """
187 if content_type == MirrorContent.ARCHIVE:
188 probe_function = probe_archive_mirror
189@@ -1033,7 +1039,8 @@ class DistroMirrorProber:
190 probed_mirrors.append(mirror)
191 logfile = StringIO()
192 logfiles[mirror_id] = logfile
193- probe_function(mirror, logfile, unchecked_keys, self.logger)
194+ probe_function(mirror, logfile, unchecked_keys, self.logger,
195+ max_parallel, max_parallel_per_host)
196
197 if probed_mirrors:
198 reactor.run()
199diff --git a/lib/lp/registry/tests/test_distributionmirror_prober.py b/lib/lp/registry/tests/test_distributionmirror_prober.py
200index 82db551..c674ac0 100644
201--- a/lib/lp/registry/tests/test_distributionmirror_prober.py
202+++ b/lib/lp/registry/tests/test_distributionmirror_prober.py
203@@ -56,8 +56,6 @@ from lp.registry.scripts.distributionmirror_prober import (
204 MIN_REQUESTS_TO_CONSIDER_RATIO,
205 MirrorCDImageProberCallbacks,
206 MultiLock,
207- OVERALL_REQUESTS,
208- PER_HOST_REQUESTS,
209 probe_archive_mirror,
210 probe_cdimage_mirror,
211 ProberFactory,
212@@ -1052,7 +1050,7 @@ class TestProbeFunctionSemaphores(TestCase):
213 # Note that calling this function won't actually probe any mirrors; we
214 # need to call reactor.run() to actually start the probing.
215 with default_timeout(15.0):
216- probe_cdimage_mirror(mirror, StringIO(), [], logging)
217+ probe_cdimage_mirror(mirror, StringIO(), [], logging, 100, 2)
218 self.assertEqual(0, len(mirror.cdimage_series))
219
220 def test_archive_mirror_probe_function(self):
221@@ -1081,43 +1079,45 @@ class TestProbeFunctionSemaphores(TestCase):
222 The given probe_function must be either probe_cdimage_mirror or
223 probe_archive_mirror.
224 """
225- request_manager = RequestManager()
226+ max_per_host_requests = 2
227+ max_requests = 100
228+ request_manager = RequestManager(max_requests, max_per_host_requests)
229 mirror1_host = URI(mirror1.base_url).host
230 mirror2_host = URI(mirror2.base_url).host
231 mirror3_host = URI(mirror3.base_url).host
232
233- probe_function(mirror1, StringIO(), [], logging)
234+ probe_function(mirror1, StringIO(), [], logging, 100, 2)
235 # Since we have a single mirror to probe we need to have a single
236- # DeferredSemaphore with a limit of PER_HOST_REQUESTS, to ensure we
237+ # DeferredSemaphore with a limit of max_per_host_requests, to ensure we
238 # don't issue too many simultaneous connections on that host.
239 self.assertEqual(len(request_manager.host_locks), 1)
240 multi_lock = request_manager.host_locks[mirror1_host]
241- self.assertEqual(multi_lock.host_lock.limit, PER_HOST_REQUESTS)
242+ self.assertEqual(multi_lock.host_lock.limit, max_per_host_requests)
243 # Note that our multi_lock contains another semaphore to control the
244 # overall number of requests.
245- self.assertEqual(multi_lock.overall_lock.limit, OVERALL_REQUESTS)
246+ self.assertEqual(multi_lock.overall_lock.limit, max_requests)
247
248- probe_function(mirror2, StringIO(), [], logging)
249+ probe_function(mirror2, StringIO(), [], logging, 100, 2)
250 # Now we have two mirrors to probe, but they have the same hostname,
251 # so we'll still have a single semaphore in host_semaphores.
252 self.assertEqual(mirror2_host, mirror1_host)
253 self.assertEqual(len(request_manager.host_locks), 1)
254 multi_lock = request_manager.host_locks[mirror2_host]
255- self.assertEqual(multi_lock.host_lock.limit, PER_HOST_REQUESTS)
256+ self.assertEqual(multi_lock.host_lock.limit, max_per_host_requests)
257
258- probe_function(mirror3, StringIO(), [], logging)
259+ probe_function(mirror3, StringIO(), [], logging, 100, 2)
260 # This third mirror is on a separate host, so we'll have a second
261 # semaphore added to host_semaphores.
262 self.assertTrue(mirror3_host != mirror1_host)
263 self.assertEqual(len(request_manager.host_locks), 2)
264 multi_lock = request_manager.host_locks[mirror3_host]
265- self.assertEqual(multi_lock.host_lock.limit, PER_HOST_REQUESTS)
266+ self.assertEqual(multi_lock.host_lock.limit, max_per_host_requests)
267
268 # When using an http_proxy, even though we'll actually connect to the
269 # proxy, we'll use the mirror's host as the key to find the semaphore
270 # that should be used
271 self.pushConfig('launchpad', http_proxy='http://squid.internal:3128/')
272- probe_function(mirror3, StringIO(), [], logging)
273+ probe_function(mirror3, StringIO(), [], logging, 100, 2)
274 self.assertEqual(len(request_manager.host_locks), 2)
275
276