Merge lp:~cjwatson/ubuntu-archive-publishing/parallel-germinate into lp:ubuntu-archive-publishing

Proposed by Colin Watson
Status: Merged
Approved by: Colin Watson
Approved revision: 44
Merged at revision: 44
Proposed branch: lp:~cjwatson/ubuntu-archive-publishing/parallel-germinate
Merge into: lp:ubuntu-archive-publishing
Diff against target: 176 lines (+74/-17)
2 files modified
lib/scripts/generate_extra_overrides.py (+70/-9)
tests/test_generate_extra_overrides.py (+4/-8)
To merge this branch: bzr merge lp:~cjwatson/ubuntu-archive-publishing/parallel-germinate
Reviewer | Review Type | Date Requested | Status
William Grant | code | | Approve
Ubuntu Package Archive Administrators | | | Pending
Review via email: mp+222607@code.launchpad.net

Commit message

Parallelise generate-extra-overrides by architecture, shaving about two minutes off each publisher run.

Description of the change

Parallelise generate-extra-overrides by architecture, shaving about two minutes off each publisher run.

I considered using the multiprocessing module, but gave up after six hours of fighting with obscure errors; forking one child process per architecture and reading its pickled results back over a pipe is simple enough to do by hand. Note that attempts to do this using threading instead are unlikely to scale well, as germinate is CPU-heavy and Python threads all have to share the global interpreter lock (GIL).

(This is ported from https://code.launchpad.net/~cjwatson/launchpad/parallel-germinate/+merge/220610, after extracting cron.germinate and friends from Launchpad.)

To post a comment you must log in.
Revision history for this message
William Grant (wgrant) wrote :

This should really use multiprocessing long-term, but it works for now.

review: Approve (code)

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'lib/scripts/generate_extra_overrides.py'
--- lib/scripts/generate_extra_overrides.py 2014-06-09 13:56:53 +0000
+++ lib/scripts/generate_extra_overrides.py 2014-06-10 09:19:56 +0000
@@ -10,6 +10,7 @@
10 'GenerateExtraOverrides',10 'GenerateExtraOverrides',
11 ]11 ]
1212
13import copy
13import errno14import errno
14import fcntl15import fcntl
15from functools import partial16from functools import partial
@@ -20,9 +21,12 @@
20 OptionValueError,21 OptionValueError,
21 )22 )
22import os23import os
24import pickle
23import re25import re
26from StringIO import StringIO
24import sys27import sys
25import time28import time
29import traceback
2630
27from germinate.archive import TagFile31from germinate.archive import TagFile
28from germinate.germinator import Germinator32from germinate.germinator import Germinator
@@ -98,6 +102,21 @@
98 os.rename("%s.new" % self.filename, self.filename)102 os.rename("%s.new" % self.filename, self.filename)
99103
100104
105class BufferHandler(logging.Handler):
106 """A log handler which stores records for emission by another logger."""
107
108 def __init__(self):
109 super(BufferHandler, self).__init__()
110 self.records = []
111
112 def emit(self, record):
113 # Record arguments may not be available at the other end.
114 record_copy = copy.copy(record)
115 record_copy.msg = record.getMessage()
116 record_copy.args = None
117 self.records.append(pickle.dumps(record_copy, -1))
118
119
101def find_operable_series(distribution):120def find_operable_series(distribution):
102 """Find all the series we can operate on in this distribution.121 """Find all the series we can operate on in this distribution.
103122
@@ -276,9 +295,9 @@
276 self.germinate_logger = logging.getLogger("germinate")295 self.germinate_logger = logging.getLogger("germinate")
277 self.germinate_logger.setLevel(logging.INFO)296 self.germinate_logger.setLevel(logging.INFO)
278 self.log_file = os.path.join(self.germinateroot, "germinate.output")297 self.log_file = os.path.join(self.germinateroot, "germinate.output")
279 handler = logging.FileHandler(self.log_file, mode="w")298 self.log_handler = logging.FileHandler(self.log_file, mode="w")
280 handler.setFormatter(GerminateFormatter())299 self.log_handler.setFormatter(GerminateFormatter())
281 self.germinate_logger.addHandler(handler)300 self.germinate_logger.addHandler(self.log_handler)
282 self.germinate_logger.propagate = False301 self.germinate_logger.propagate = False
283302
284 def setUp(self):303 def setUp(self):
@@ -448,9 +467,15 @@
448 if "build-essential" in structure.names and primary_flavour:467 if "build-essential" in structure.names and primary_flavour:
449 write_overrides("build-essential", "Build-Essential", "yes")468 write_overrides("build-essential", "Build-Essential", "yes")
450469
451 def germinateArch(self, override_file, series_name, components, arch,470 def germinateArch(self, series_name, components, arch, flavours,
452 flavours, structures, seed_outputs=None):471 structures):
453 """Germinate seeds on all flavours for a single architecture."""472 """Germinate seeds on all flavours for a single architecture."""
473 # Buffer log output for each architecture so that it appears
474 # sequential.
475 self.germinate_logger.removeHandler(self.log_handler)
476 log_handler = BufferHandler()
477 self.germinate_logger.addHandler(log_handler)
478
454 germinator = Germinator(arch)479 germinator = Germinator(arch)
455480
456 # Read archive metadata.481 # Read archive metadata.
@@ -459,6 +484,8 @@
459 cleanup=True)484 cleanup=True)
460 germinator.parse_archive(archive)485 germinator.parse_archive(archive)
461486
487 override_file = StringIO()
488 seed_outputs = set()
462 for flavour in flavours:489 for flavour in flavours:
463 self.logger.info(490 self.logger.info(
464 "Germinating for %s/%s/%s", flavour, series_name, arch)491 "Germinating for %s/%s/%s", flavour, series_name, arch)
@@ -473,6 +500,20 @@
473 structures[flavour], flavour == flavours[0],500 structures[flavour], flavour == flavours[0],
474 seed_outputs=seed_outputs)501 seed_outputs=seed_outputs)
475502
503 return log_handler.records, override_file.getvalue(), seed_outputs
504
505 def germinateArchChild(self, close_in_child, wfd, *args):
506 """Helper method to call germinateArch in a forked child process."""
507 try:
508 for fd in close_in_child:
509 os.close(fd)
510 with os.fdopen(wfd, "wb") as writer:
511 pickle.dump(self.germinateArch(*args), writer, -1)
512 return 0
513 except:
514 traceback.print_exc()
515 return 1
516
476 def removeStaleOutputs(self, series_name, seed_outputs):517 def removeStaleOutputs(self, series_name, seed_outputs):
477 """Remove stale outputs for a series.518 """Remove stale outputs for a series.
478519
@@ -490,14 +531,34 @@
490 series_name, flavours, seed_bases=seed_bases)531 series_name, flavours, seed_bases=seed_bases)
491532
492 if structures:533 if structures:
534 procs = []
535 close_in_child = []
536 for arch in architectures:
537 rfd, wfd = os.pipe()
538 close_in_child.append(rfd)
539 pid = os.fork()
540 if pid == 0: # child
541 os._exit(self.germinateArchChild(
542 close_in_child, wfd,
543 series_name, components, arch, flavours, structures))
544 else: # parent
545 os.close(wfd)
546 reader = os.fdopen(rfd, "rb")
547 procs.append((pid, reader))
548
493 seed_outputs = set()549 seed_outputs = set()
494 override_path = os.path.join(550 override_path = os.path.join(
495 self.miscroot, "more-extra.override.%s.main" % series_name)551 self.miscroot, "more-extra.override.%s.main" % series_name)
496 with AtomicFile(override_path) as override_file:552 with AtomicFile(override_path) as override_file:
497 for arch in architectures:553 for pid, reader in procs:
498 self.germinateArch(554 log_records, overrides, arch_seed_outputs = pickle.load(
499 override_file, series_name, components, arch,555 reader)
500 flavours, structures, seed_outputs=seed_outputs)556 for log_record in log_records:
557 self.germinate_logger.handle(pickle.loads(log_record))
558 override_file.write(overrides)
559 seed_outputs |= arch_seed_outputs
560 reader.close()
561 os.waitpid(pid, 0)
501 self.removeStaleOutputs(series_name, seed_outputs)562 self.removeStaleOutputs(series_name, seed_outputs)
502563
503 def process(self, seed_bases=None):564 def process(self, seed_bases=None):
504565
=== modified file 'tests/test_generate_extra_overrides.py'
--- tests/test_generate_extra_overrides.py 2014-06-09 13:56:53 +0000
+++ tests/test_generate_extra_overrides.py 2014-06-10 09:19:56 +0000
@@ -11,7 +11,6 @@
11import logging11import logging
12from optparse import OptionValueError12from optparse import OptionValueError
13import os13import os
14import tempfile
15try:14try:
16 from unittest import mock15 from unittest import mock
17except ImportError:16except ImportError:
@@ -496,13 +495,10 @@
496 distroseries.name, flavours,495 distroseries.name, flavours,
497 seed_bases=["file://%s" % self.seeddir])496 seed_bases=["file://%s" % self.seeddir])
498497
499 override_fd, override_path = tempfile.mkstemp()498 _, overrides, _ = script.germinateArch(
500 with os.fdopen(override_fd, "w") as override_file:499 distroseries.name, script.getComponents(distroseries), arch,
501 script.germinateArch(500 flavours, structures)
502 override_file, distroseries.name,501 return overrides.splitlines()
503 script.getComponents(distroseries), arch, flavours,
504 structures)
505 return file_contents(override_path).splitlines()
506502
507 def test_germinate_output(self):503 def test_germinate_output(self):
508 # A single call to germinateArch produces output for all flavours on504 # A single call to germinateArch produces output for all flavours on

Subscribers

People subscribed via source and target branches