Merge ~cjwatson/launchpad:codeimport-git-progress into launchpad:master

Proposed by Colin Watson on 2019-10-07
Status: Needs review
Proposed branch: ~cjwatson/launchpad:codeimport-git-progress
Merge into: launchpad:master
Diff against target: 217 lines (+141/-7)
2 files modified
lib/lp/codehosting/codeimport/tests/test_worker.py (+52/-0)
lib/lp/codehosting/codeimport/worker.py (+89/-7)
Reviewer | Review Type | Date Requested | Status
Launchpad code reviewers | (none) | 2019-10-07 | Pending
Review via email: mp+373732@code.launchpad.net

Commit message

Enable throttled progress output from git-to-git import workers

Description of the change

This may help with bug 1642699, though I'm not sure yet.

Getting things to work with Python 2's disconnect between file objects and the io module is gratuitously inconvenient.

This is essentially the same as https://code.launchpad.net/~cjwatson/launchpad/codeimport-git-progress/+merge/323902, converted to git and rebased on master.

To post a comment you must log in.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1diff --git a/lib/lp/codehosting/codeimport/tests/test_worker.py b/lib/lp/codehosting/codeimport/tests/test_worker.py
2index 8bb5b7f..1ae4ee7 100644
3--- a/lib/lp/codehosting/codeimport/tests/test_worker.py
4+++ b/lib/lp/codehosting/codeimport/tests/test_worker.py
5@@ -57,6 +57,10 @@ import scandir
6 import subvertpy
7 import subvertpy.client
8 import subvertpy.ra
9+from testtools.matchers import (
10+ ContainsAll,
11+ LessThan,
12+ )
13
14 import lp.codehosting
15 from lp.codehosting.codeimport.tarball import (
16@@ -80,6 +84,7 @@ from lp.codehosting.codeimport.worker import (
17 ForeignTreeStore,
18 get_default_bazaar_branch_store,
19 GitImportWorker,
20+ GitToGitImportWorker,
21 ImportDataStore,
22 ToBzrImportWorker,
23 )
24@@ -1356,6 +1361,53 @@ class TestBzrImport(WorkerTest, TestActualImportMixin,
25 branch.repository.get_revision(branch.last_revision()).committer)
26
27
28+class TestGitToGitImportWorker(TestCase):
29+
30+ def test_throttleProgress(self):
31+ source_details = self.factory.makeCodeImportSourceDetails(
32+ rcstype="git", target_rcstype="git")
33+ logger = BufferLogger()
34+ worker = GitToGitImportWorker(
35+ source_details, logger, AcceptAnythingPolicy())
36+ read_fd, write_fd = os.pipe()
37+ pid = os.fork()
38+ if pid == 0: # child
39+ os.close(read_fd)
40+ with os.fdopen(write_fd, "wb") as write:
41+ write.write(b"Starting\n")
42+ for i in range(50):
43+ time.sleep(0.1)
44+ write.write(("%d ...\r" % i).encode("UTF-8"))
45+ if (i % 10) == 9:
46+ write.write(
47+ ("Interval %d\n" % (i // 10)).encode("UTF-8"))
48+ write.write(b"Finishing\n")
49+ os._exit(0)
50+ else: # parent
51+ os.close(write_fd)
52+ with os.fdopen(read_fd, "rb") as read:
53+ lines = list(worker._throttleProgress(read, timeout=0.5))
54+ os.waitpid(pid, 0)
55+ # Matching the exact sequence of lines would be too brittle, but
56+ # we require some things to be true:
57+ # All the non-progress lines must be present, in the right
58+ # order.
59+ self.assertEqual(
60+ [u"Starting\n", u"Interval 0\n", u"Interval 1\n",
61+ u"Interval 2\n", u"Interval 3\n", u"Interval 4\n",
62+ u"Finishing\n"],
63+ [line for line in lines if not line.endswith(u"\r")])
64+ # No more than 15 progress lines may be present (allowing some
65+ # slack for the child process being slow).
66+ progress_lines = [line for line in lines if line.endswith(u"\r")]
67+ self.assertThat(len(progress_lines), LessThan(16))
68+ # All the progress lines immediately before interval markers
69+ # must be present.
70+ self.assertThat(
71+ progress_lines,
72+ ContainsAll([u"%d ...\r" % i for i in (9, 19, 29, 39, 49)]))
73+
74+
75 class CodeImportBranchOpenPolicyTests(TestCase):
76
77 def setUp(self):
78diff --git a/lib/lp/codehosting/codeimport/worker.py b/lib/lp/codehosting/codeimport/worker.py
79index 8916e8d..371c34d 100644
80--- a/lib/lp/codehosting/codeimport/worker.py
81+++ b/lib/lp/codehosting/codeimport/worker.py
82@@ -19,10 +19,10 @@ __all__ = [
83 'get_default_bazaar_branch_store',
84 ]
85
86-
87 import io
88 import os
89 import shutil
90+import signal
91 import subprocess
92 from urlparse import (
93 urlsplit,
94@@ -78,6 +78,7 @@ from lazr.uri import (
95 )
96 from pymacaroons import Macaroon
97 import SCM
98+import six
99
100 from lp.code.interfaces.branch import get_blacklisted_hostnames
101 from lp.codehosting.codeimport.foreigntree import CVSWorkingTree
102@@ -960,14 +961,94 @@ class BzrImportWorker(PullingImportWorker):
103 class GitToGitImportWorker(ImportWorker):
104 """An import worker for imports from Git to Git."""
105
106+ def _throttleProgress(self, file_obj, timeout=15.0):
107+ """Throttle progress messages from a file object.
108+
109+ git can produce progress output on stderr, but it produces rather a
110+ lot of it and we don't want it all to end up in logs. Throttle this
111+ so that we only produce output every `timeout` seconds, or when we
112+ see a line terminated with a newline rather than a carriage return.
113+
114+ :param file_obj: a file-like object opened in binary mode.
115+ :param timeout: emit progress output only after this many seconds
116+ have elapsed.
117+ :return: an iterator of interesting text lines read from the file.
118+ """
119+ # newline="" requests universal newlines mode, but without
120+ # translation.
121+ if six.PY2 and isinstance(file_obj, file):
122+ # A Python 2 file object can't be used directly to construct an
123+ # io.TextIOWrapper.
124+ class _ReadableFileWrapper:
125+ def __init__(self, raw):
126+ self._raw = raw
127+
128+ def __enter__(self):
129+ return self
130+
131+ def __exit__(self, exc_type, exc_value, exc_tb):
132+ pass
133+
134+ def readable(self):
135+ return True
136+
137+ def writable(self):
138+ return False
139+
140+ def seekable(self):
141+ return True
142+
143+ def __getattr__(self, name):
144+ return getattr(self._raw, name)
145+
146+ with _ReadableFileWrapper(file_obj) as readable:
147+ with io.BufferedReader(readable) as buffered:
148+ for line in self._throttleProgress(
149+ buffered, timeout=timeout):
150+ yield line
151+ return
152+
153+ class ReceivedAlarm(Exception):
154+ pass
155+
156+ def alarm_handler(signum, frame):
157+ raise ReceivedAlarm()
158+
159+ old_alarm = signal.signal(signal.SIGALRM, alarm_handler)
160+ try:
161+ progress_buffer = None
162+ with io.TextIOWrapper(
163+ file_obj, encoding="UTF-8", errors="replace",
164+ newline="") as wrapped_file:
165+ while True:
166+ try:
167+ signal.setitimer(signal.ITIMER_REAL, timeout)
168+ line = next(wrapped_file)
169+ signal.setitimer(signal.ITIMER_REAL, 0)
170+ if line.endswith(u"\r"):
171+ progress_buffer = line
172+ else:
173+ if progress_buffer is not None:
174+ yield progress_buffer
175+ progress_buffer = None
176+ yield line
177+ except ReceivedAlarm:
178+ if progress_buffer is not None:
179+ yield progress_buffer
180+ progress_buffer = None
181+ except StopIteration:
182+ break
183+ finally:
184+ signal.setitimer(signal.ITIMER_REAL, 0)
185+ signal.signal(signal.SIGALRM, old_alarm)
186+
187 def _runGit(self, *args, **kwargs):
188 """Run git with arguments, sending output to the logger."""
189 cmd = ["git"] + list(args)
190 git_process = subprocess.Popen(
191 cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **kwargs)
192- for line in git_process.stdout:
193- line = line.decode("UTF-8", "replace").rstrip("\n")
194- self._logger.info(sanitise_urls(line))
195+ for line in self._throttleProgress(git_process.stdout):
196+ self._logger.info(sanitise_urls(line.rstrip("\r\n")))
197 retcode = git_process.wait()
198 if retcode:
199 raise subprocess.CalledProcessError(retcode, cmd)
200@@ -1102,13 +1183,14 @@ class GitToGitImportWorker(ImportWorker):
201 # Push the target of HEAD first to ensure that it is always
202 # available.
203 self._runGit(
204- "push", target_url, "+%s:%s" % (new_head, new_head),
205- cwd="repository")
206+ "push", "--progress", target_url,
207+ "+%s:%s" % (new_head, new_head), cwd="repository")
208 try:
209 self._setHead(target_url, new_head)
210 except GitProtocolError as e:
211 self._logger.info("Unable to set default branch: %s" % e)
212- self._runGit("push", "--mirror", target_url, cwd="repository")
213+ self._runGit(
214+ "push", "--progress", "--mirror", target_url, cwd="repository")
215 except subprocess.CalledProcessError as e:
216 self._logger.info(
217 "Unable to push to hosting service: git push exited %s" %

Subscribers

People subscribed via source and target branches