Merge lp:~cjwatson/ubuntu-archive-publishing/parallel-germinate into lp:ubuntu-archive-publishing

Proposed by Colin Watson
Status: Merged
Approved by: Colin Watson
Approved revision: 44
Merged at revision: 44
Proposed branch: lp:~cjwatson/ubuntu-archive-publishing/parallel-germinate
Merge into: lp:ubuntu-archive-publishing
Diff against target: 176 lines (+74/-17)
2 files modified
lib/scripts/generate_extra_overrides.py (+70/-9)
tests/test_generate_extra_overrides.py (+4/-8)
To merge this branch: bzr merge lp:~cjwatson/ubuntu-archive-publishing/parallel-germinate
| Reviewer | Review Type | Date Requested | Status |
| William Grant | code | | Approve |
| Ubuntu Package Archive Administrators | | | Pending |
Review via email: mp+222607@code.launchpad.net

Commit message

Parallelise generate-extra-overrides by architecture, shaving about two minutes off each publisher run.

Description of the change

Parallelise generate-extra-overrides by architecture, shaving about two minutes off each publisher run.

I considered using the multiprocessing module, but gave up after six hours of fighting with obscure errors; it's simple enough to do by hand, by forking one child process per architecture and reading each child's pickled results back over a pipe. Note that attempts to do this using threading instead are unlikely to scale well, as germinate is CPU-heavy and Python threads all have to share the GIL (global interpreter lock).

(This is ported from https://code.launchpad.net/~cjwatson/launchpad/parallel-germinate/+merge/220610, after extracting cron.germinate and friends from Launchpad.)

To post a comment you must log in.
Revision history for this message
William Grant (wgrant) wrote :

This should really use multiprocessing long-term, but it works for now.

review: Approve (code)

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'lib/scripts/generate_extra_overrides.py'
2--- lib/scripts/generate_extra_overrides.py 2014-06-09 13:56:53 +0000
3+++ lib/scripts/generate_extra_overrides.py 2014-06-10 09:19:56 +0000
4@@ -10,6 +10,7 @@
5 'GenerateExtraOverrides',
6 ]
7
8+import copy
9 import errno
10 import fcntl
11 from functools import partial
12@@ -20,9 +21,12 @@
13 OptionValueError,
14 )
15 import os
16+import pickle
17 import re
18+from StringIO import StringIO
19 import sys
20 import time
21+import traceback
22
23 from germinate.archive import TagFile
24 from germinate.germinator import Germinator
25@@ -98,6 +102,21 @@
26 os.rename("%s.new" % self.filename, self.filename)
27
28
29+class BufferHandler(logging.Handler):
30+ """A log handler which stores records for emission by another logger."""
31+
32+ def __init__(self):
33+ super(BufferHandler, self).__init__()
34+ self.records = []
35+
36+ def emit(self, record):
37+ # Record arguments may not be available at the other end.
38+ record_copy = copy.copy(record)
39+ record_copy.msg = record.getMessage()
40+ record_copy.args = None
41+ self.records.append(pickle.dumps(record_copy, -1))
42+
43+
44 def find_operable_series(distribution):
45 """Find all the series we can operate on in this distribution.
46
47@@ -276,9 +295,9 @@
48 self.germinate_logger = logging.getLogger("germinate")
49 self.germinate_logger.setLevel(logging.INFO)
50 self.log_file = os.path.join(self.germinateroot, "germinate.output")
51- handler = logging.FileHandler(self.log_file, mode="w")
52- handler.setFormatter(GerminateFormatter())
53- self.germinate_logger.addHandler(handler)
54+ self.log_handler = logging.FileHandler(self.log_file, mode="w")
55+ self.log_handler.setFormatter(GerminateFormatter())
56+ self.germinate_logger.addHandler(self.log_handler)
57 self.germinate_logger.propagate = False
58
59 def setUp(self):
60@@ -448,9 +467,15 @@
61 if "build-essential" in structure.names and primary_flavour:
62 write_overrides("build-essential", "Build-Essential", "yes")
63
64- def germinateArch(self, override_file, series_name, components, arch,
65- flavours, structures, seed_outputs=None):
66+ def germinateArch(self, series_name, components, arch, flavours,
67+ structures):
68 """Germinate seeds on all flavours for a single architecture."""
69+ # Buffer log output for each architecture so that it appears
70+ # sequential.
71+ self.germinate_logger.removeHandler(self.log_handler)
72+ log_handler = BufferHandler()
73+ self.germinate_logger.addHandler(log_handler)
74+
75 germinator = Germinator(arch)
76
77 # Read archive metadata.
78@@ -459,6 +484,8 @@
79 cleanup=True)
80 germinator.parse_archive(archive)
81
82+ override_file = StringIO()
83+ seed_outputs = set()
84 for flavour in flavours:
85 self.logger.info(
86 "Germinating for %s/%s/%s", flavour, series_name, arch)
87@@ -473,6 +500,20 @@
88 structures[flavour], flavour == flavours[0],
89 seed_outputs=seed_outputs)
90
91+ return log_handler.records, override_file.getvalue(), seed_outputs
92+
93+ def germinateArchChild(self, close_in_child, wfd, *args):
94+ """Helper method to call germinateArch in a forked child process."""
95+ try:
96+ for fd in close_in_child:
97+ os.close(fd)
98+ with os.fdopen(wfd, "wb") as writer:
99+ pickle.dump(self.germinateArch(*args), writer, -1)
100+ return 0
101+ except:
102+ traceback.print_exc()
103+ return 1
104+
105 def removeStaleOutputs(self, series_name, seed_outputs):
106 """Remove stale outputs for a series.
107
108@@ -490,14 +531,34 @@
109 series_name, flavours, seed_bases=seed_bases)
110
111 if structures:
112+ procs = []
113+ close_in_child = []
114+ for arch in architectures:
115+ rfd, wfd = os.pipe()
116+ close_in_child.append(rfd)
117+ pid = os.fork()
118+ if pid == 0: # child
119+ os._exit(self.germinateArchChild(
120+ close_in_child, wfd,
121+ series_name, components, arch, flavours, structures))
122+ else: # parent
123+ os.close(wfd)
124+ reader = os.fdopen(rfd, "rb")
125+ procs.append((pid, reader))
126+
127 seed_outputs = set()
128 override_path = os.path.join(
129 self.miscroot, "more-extra.override.%s.main" % series_name)
130 with AtomicFile(override_path) as override_file:
131- for arch in architectures:
132- self.germinateArch(
133- override_file, series_name, components, arch,
134- flavours, structures, seed_outputs=seed_outputs)
135+ for pid, reader in procs:
136+ log_records, overrides, arch_seed_outputs = pickle.load(
137+ reader)
138+ for log_record in log_records:
139+ self.germinate_logger.handle(pickle.loads(log_record))
140+ override_file.write(overrides)
141+ seed_outputs |= arch_seed_outputs
142+ reader.close()
143+ os.waitpid(pid, 0)
144 self.removeStaleOutputs(series_name, seed_outputs)
145
146 def process(self, seed_bases=None):
147
148=== modified file 'tests/test_generate_extra_overrides.py'
149--- tests/test_generate_extra_overrides.py 2014-06-09 13:56:53 +0000
150+++ tests/test_generate_extra_overrides.py 2014-06-10 09:19:56 +0000
151@@ -11,7 +11,6 @@
152 import logging
153 from optparse import OptionValueError
154 import os
155-import tempfile
156 try:
157 from unittest import mock
158 except ImportError:
159@@ -496,13 +495,10 @@
160 distroseries.name, flavours,
161 seed_bases=["file://%s" % self.seeddir])
162
163- override_fd, override_path = tempfile.mkstemp()
164- with os.fdopen(override_fd, "w") as override_file:
165- script.germinateArch(
166- override_file, distroseries.name,
167- script.getComponents(distroseries), arch, flavours,
168- structures)
169- return file_contents(override_path).splitlines()
170+ _, overrides, _ = script.germinateArch(
171+ distroseries.name, script.getComponents(distroseries), arch,
172+ flavours, structures)
173+ return overrides.splitlines()
174
175 def test_germinate_output(self):
176 # A single call to germinateArch produces output for all flavours on

Subscribers

People subscribed via source and target branches