Merge lp:~cjwatson/launchpad/codeimport-git-progress into lp:launchpad

Proposed by Colin Watson
Status: Rejected
Rejected by: Colin Watson
Proposed branch: lp:~cjwatson/launchpad/codeimport-git-progress
Merge into: lp:launchpad
Diff against target: 221 lines (+140/-8)
2 files modified
lib/lp/codehosting/codeimport/tests/test_worker.py (+50/-0)
lib/lp/codehosting/codeimport/worker.py (+90/-8)
To merge this branch: bzr merge lp:~cjwatson/launchpad/codeimport-git-progress
Reviewer | Review Type | Date Requested | Status
Launchpad code reviewers | | | Pending
Review via email: mp+323902@code.launchpad.net

Commit message

Enable throttled progress output from git-to-git import workers.

Description of the change

This may help with bug 1642699, though I'm not sure yet.

Getting things to work with Python 2's disconnect between file objects and the io module is gratuitously inconvenient.

To post a comment you must log in.
Revision history for this message
Colin Watson (cjwatson) wrote :

Unmerged revisions

18374. By Colin Watson

Enable throttled progress output from git-to-git import workers.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'lib/lp/codehosting/codeimport/tests/test_worker.py'
2--- lib/lp/codehosting/codeimport/tests/test_worker.py 2017-01-12 18:01:56 +0000
3+++ lib/lp/codehosting/codeimport/tests/test_worker.py 2017-05-11 12:10:44 +0000
4@@ -50,7 +50,9 @@
5 import subvertpy.client
6 import subvertpy.ra
7 from testtools.matchers import (
8+ ContainsAll,
9 Equals,
10+ LessThan,
11 Matcher,
12 MatchesListwise,
13 Mismatch,
14@@ -87,6 +89,7 @@
15 ForeignTreeStore,
16 get_default_bazaar_branch_store,
17 GitImportWorker,
18+ GitToGitImportWorker,
19 ImportDataStore,
20 ToBzrImportWorker,
21 )
22@@ -1304,6 +1307,53 @@
23 branch.repository.get_revision(branch.last_revision()).committer)
24
25
26+class TestGitToGitImportWorker(TestCase):
27+
28+ def test_throttleProgress(self):
29+ source_details = self.factory.makeCodeImportSourceDetails(
30+ rcstype="git", target_rcstype="git")
31+ logger = BufferLogger()
32+ worker = GitToGitImportWorker(
33+ source_details, logger, AcceptAnythingPolicy())
34+ read_fd, write_fd = os.pipe()
35+ pid = os.fork()
36+ if pid == 0: # child
37+ os.close(read_fd)
38+ with os.fdopen(write_fd, "wb") as write:
39+ write.write(b"Starting\n")
40+ for i in range(50):
41+ time.sleep(0.1)
42+ write.write(("%d ...\r" % i).encode("UTF-8"))
43+ if (i % 10) == 9:
44+ write.write(
45+ ("Interval %d\n" % (i // 10)).encode("UTF-8"))
46+ write.write(b"Finishing\n")
47+ os._exit(0)
48+ else: # parent
49+ os.close(write_fd)
50+ with os.fdopen(read_fd, "rb") as read:
51+ lines = list(worker._throttleProgress(read, timeout=0.5))
52+ os.waitpid(pid, 0)
53+ # Matching the exact sequence of lines would be too brittle, but
54+ # we require some things to be true:
55+ # All the non-progress lines must be present, in the right
56+ # order.
57+ self.assertEqual(
58+ [u"Starting\n", u"Interval 0\n", u"Interval 1\n",
59+ u"Interval 2\n", u"Interval 3\n", u"Interval 4\n",
60+ u"Finishing\n"],
61+ [line for line in lines if not line.endswith(u"\r")])
62+ # No more than 15 progress lines may be present (allowing some
63+ # slack for the child process being slow).
64+ progress_lines = [line for line in lines if line.endswith(u"\r")]
65+ self.assertThat(len(progress_lines), LessThan(16))
66+ # All the progress lines immediately before interval markers
67+ # must be present.
68+ self.assertThat(
69+ progress_lines,
70+ ContainsAll([u"%d ...\r" % i for i in (9, 19, 29, 39, 49)]))
71+
72+
73 class CodeImportBranchOpenPolicyTests(TestCase):
74
75 def setUp(self):
76
77=== modified file 'lib/lp/codehosting/codeimport/worker.py'
78--- lib/lp/codehosting/codeimport/worker.py 2016-11-14 18:29:17 +0000
79+++ lib/lp/codehosting/codeimport/worker.py 2017-05-11 12:10:44 +0000
80@@ -1,4 +1,4 @@
81-# Copyright 2009-2016 Canonical Ltd. This software is licensed under the
82+# Copyright 2009-2017 Canonical Ltd. This software is licensed under the
83 # GNU Affero General Public License version 3 (see the file LICENSE).
84
85 """The code import worker. This imports code from foreign repositories."""
86@@ -19,10 +19,10 @@
87 'get_default_bazaar_branch_store',
88 ]
89
90-
91 import io
92 import os
93 import shutil
94+import signal
95 import subprocess
96 from urlparse import (
97 urlsplit,
98@@ -73,6 +73,7 @@
99 )
100 from pymacaroons import Macaroon
101 import SCM
102+import six
103 from zope.component import getUtility
104 from zope.security.proxy import removeSecurityProxy
105
106@@ -987,14 +988,94 @@
107 class GitToGitImportWorker(ImportWorker):
108 """An import worker for imports from Git to Git."""
109
110+ def _throttleProgress(self, file_obj, timeout=15.0):
111+ """Throttle progress messages from a file object.
112+
113+ git can produce progress output on stderr, but it produces rather a
114+ lot of it and we don't want it all to end up in logs. Throttle this
115+ so that we only produce output every `timeout` seconds, or when we
116+ see a line terminated with a newline rather than a carriage return.
117+
118+ :param file_obj: a file-like object opened in binary mode.
119+ :param timeout: emit progress output only after this many seconds
120+ have elapsed.
121+ :return: an iterator of interesting text lines read from the file.
122+ """
123+ # newline="" requests universal newlines mode, but without
124+ # translation.
125+ if six.PY2 and isinstance(file_obj, file):
126+ # A Python 2 file object can't be used directly to construct an
127+ # io.TextIOWrapper.
128+ class _ReadableFileWrapper:
129+ def __init__(self, raw):
130+ self._raw = raw
131+
132+ def __enter__(self):
133+ return self
134+
135+ def __exit__(self, exc_type, exc_value, exc_tb):
136+ pass
137+
138+ def readable(self):
139+ return True
140+
141+ def writable(self):
142+ return False
143+
144+ def seekable(self):
145+ return True
146+
147+ def __getattr__(self, name):
148+ return getattr(self._raw, name)
149+
150+ with _ReadableFileWrapper(file_obj) as readable:
151+ with io.BufferedReader(readable) as buffered:
152+ for line in self._throttleProgress(
153+ buffered, timeout=timeout):
154+ yield line
155+ return
156+
157+ class ReceivedAlarm(Exception):
158+ pass
159+
160+ def alarm_handler(signum, frame):
161+ raise ReceivedAlarm()
162+
163+ old_alarm = signal.signal(signal.SIGALRM, alarm_handler)
164+ try:
165+ progress_buffer = None
166+ with io.TextIOWrapper(
167+ file_obj, encoding="UTF-8", errors="replace",
168+ newline="") as wrapped_file:
169+ while True:
170+ try:
171+ signal.setitimer(signal.ITIMER_REAL, timeout)
172+ line = next(wrapped_file)
173+ signal.setitimer(signal.ITIMER_REAL, 0)
174+ if line.endswith(u"\r"):
175+ progress_buffer = line
176+ else:
177+ if progress_buffer is not None:
178+ yield progress_buffer
179+ progress_buffer = None
180+ yield line
181+ except ReceivedAlarm:
182+ if progress_buffer is not None:
183+ yield progress_buffer
184+ progress_buffer = None
185+ except StopIteration:
186+ break
187+ finally:
188+ signal.setitimer(signal.ITIMER_REAL, 0)
189+ signal.signal(signal.SIGALRM, old_alarm)
190+
191 def _runGit(self, *args, **kwargs):
192 """Run git with arguments, sending output to the logger."""
193 cmd = ["git"] + list(args)
194 git_process = subprocess.Popen(
195 cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **kwargs)
196- for line in git_process.stdout:
197- line = line.decode("UTF-8", "replace").rstrip("\n")
198- self._logger.info(sanitise_urls(line))
199+ for line in self._throttleProgress(git_process.stdout):
200+ self._logger.info(sanitise_urls(line.rstrip("\r\n")))
201 retcode = git_process.wait()
202 if retcode:
203 raise subprocess.CalledProcessError(retcode, cmd)
204@@ -1127,13 +1208,14 @@
205 # Push the target of HEAD first to ensure that it is always
206 # available.
207 self._runGit(
208- "push", target_url, "+%s:%s" % (new_head, new_head),
209- cwd="repository")
210+ "push", "--progress", target_url,
211+ "+%s:%s" % (new_head, new_head), cwd="repository")
212 try:
213 self._setHead(target_url, new_head)
214 except GitProtocolError as e:
215 self._logger.info("Unable to set default branch: %s" % e)
216- self._runGit("push", "--mirror", target_url, cwd="repository")
217+ self._runGit(
218+ "push", "--progress", "--mirror", target_url, cwd="repository")
219 except subprocess.CalledProcessError as e:
220 self._logger.info(
221 "Unable to push to hosting service: git push exited %s" %