Merge lp:~zyga/checkbox/fix-1443911 into lp:checkbox

Proposed by Zygmunt Krynicki
Status: Merged
Approved by: Maciej Kisielewski
Approved revision: 3690
Merged at revision: 3690
Proposed branch: lp:~zyga/checkbox/fix-1443911
Merge into: lp:checkbox
Diff against target: 283 lines (+105/-14)
5 files modified
plainbox/plainbox/impl/commands/inv_run.py (+2/-2)
plainbox/plainbox/impl/providers/manifest/units/manifest.pxu (+1/-1)
plainbox/plainbox/impl/runner.py (+14/-1)
plainbox/plainbox/vendor/extcmd/__init__.py (+66/-6)
plainbox/plainbox/vendor/extcmd/glibc.py (+22/-4)
To merge this branch: bzr merge lp:~zyga/checkbox/fix-1443911
Reviewer Review Type Date Requested Status
Maciej Kisielewski Approve
Review via email: mp+256138@code.launchpad.net
To post a comment you must log in.
Revision history for this message
Maciej Kisielewski (kissiel) wrote :

+1, although a chunk size of 1 somewhat unsettles me.

review: Approve
Revision history for this message
Zygmunt Krynicki (zyga) wrote :

1 is the only chunk size that works as long as we are stuck in blocking mode.

I would like to switch to glibc-based code but we first need to solve
the waiting-for-process-from-a-thread issue.

On Wed, Apr 15, 2015 at 11:58 AM, Maciej Kisielewski
<email address hidden> wrote:
> Review: Approve
>
> +1, although a chunk size of 1 somewhat unsettles me.
>
> --
> https://code.launchpad.net/~zyga/checkbox/fix-1443911/+merge/256138
> You are the owner of lp:~zyga/checkbox/fix-1443911.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'plainbox/plainbox/impl/commands/inv_run.py'
2--- plainbox/plainbox/impl/commands/inv_run.py 2015-03-31 07:26:46 +0000
3+++ plainbox/plainbox/impl/commands/inv_run.py 2015-04-14 13:10:26 +0000
4@@ -197,8 +197,8 @@
5 'stdout': sys.stdout,
6 'stderr': sys.stderr
7 }[stream_name]
8- print(self.C.BLACK(line.decode("UTF-8", "ignore").rstrip('\n')),
9- file=stream)
10+ print(self.C.BLACK(line.decode("UTF-8", "ignore")),
11+ end='', flush=True, file=stream)
12
13 def finished_executing_program(self, returncode):
14 if self.show_cmd_output:
15
16=== modified file 'plainbox/plainbox/impl/providers/manifest/units/manifest.pxu'
17--- plainbox/plainbox/impl/providers/manifest/units/manifest.pxu 2015-03-30 17:36:35 +0000
18+++ plainbox/plainbox/impl/providers/manifest/units/manifest.pxu 2015-04-14 13:10:26 +0000
19@@ -8,7 +8,7 @@
20 plugin: user-interact
21 command: plainbox-manifest-collect
22 estimated_duration: 30
23-flags: preserve-locale
24+flags: preserve-locale use-chunked-io
25
26 unit: job
27 id: manifest
28
29=== modified file 'plainbox/plainbox/impl/runner.py'
30--- plainbox/plainbox/impl/runner.py 2015-04-02 14:44:57 +0000
31+++ plainbox/plainbox/impl/runner.py 2015-04-14 13:10:26 +0000
32@@ -243,6 +243,15 @@
33 if self.ui is not None:
34 self.ui.got_program_output(stream_name, line)
35
36+ def on_chunk(self, stream_name, chunk):
37+ """
38+ Internal method of extcmd.DelegateBase
39+
40+ Called for each chunk of output.
41+ """
42+ if self.ui is not None:
43+ self.ui.got_program_output(stream_name, chunk)
44+
45
46 class JobRunner(IJobRunner):
47 """
48@@ -862,7 +871,11 @@
49 # Create a subprocess.Popen() like object that uses the delegate
50 # system to observe all IO as it occurs in real time.
51 delegate_cls = self._get_delegate_cls(config)
52- extcmd_popen = delegate_cls(delegate)
53+ flags = 0
54+ # Use chunked IO for jobs that explicitly request this
55+ if 'use-chunked-io' in job.get_flag_set():
56+ flags |= extcmd.CHUNKED_IO
57+ extcmd_popen = delegate_cls(delegate, flags=flags)
58 # Stream all IOLogRecord entries to disk
59 record_path = os.path.join(
60 self._jobs_io_log_dir, "{}.record.gz".format(
61
62=== modified file 'plainbox/plainbox/vendor/extcmd/__init__.py'
63--- plainbox/plainbox/vendor/extcmd/__init__.py 2014-12-12 10:13:29 +0000
64+++ plainbox/plainbox/vendor/extcmd/__init__.py 2015-04-14 13:10:26 +0000
65@@ -291,6 +291,18 @@
66 def on_line(self, stream_name, line):
67 """
68 Callback invoked for each line of the output
69+
70+ This method is only called when the ``CHUNKED_IO`` flag is **not**
71+ passed to extcmd. Otherwise :meth:`on_chunk()` will be called instead.
72+ """
73+
74+ @abc.abstractmethod
75+ def on_chunk(self, stream_name, chunk):
76+ """
77+ Callback invoked on each chunk of the input
78+
79+ This method is only called when the ``CHUNKED_IO`` flag is passed to
80+ extcmd. Otherwise :meth:`on_line()` will be called instead.
81 """
82
83 @abc.abstractmethod
84@@ -327,6 +339,11 @@
85 Do nothing
86 """
87
88+ def on_chunk(self, stream_name, chunk):
89+ """
90+ Do nothing
91+ """
92+
93 def on_end(self, returncode):
94 """
95 Do nothing
96@@ -379,6 +396,13 @@
97 if hasattr(self._delegate, "on_line"):
98 self._delegate.on_line(stream_name, line)
99
100+ def on_chunk(self, stream_name, chunk):
101+ """
102+ Call on_chunk() on the wrapped delegate if supported
103+ """
104+ if hasattr(self._delegate, "on_chunk"):
105+ self._delegate.on_chunk(stream_name, chunk)
106+
107 def on_end(self, returncode):
108 """
109 Call on_end() on the wrapped delegate if supported
110@@ -411,6 +435,9 @@
111 return cls(delegate)
112
113
114+CHUNKED_IO = 1
115+
116+
117 class ExternalCommandWithDelegate(ExternalCommand):
118 """
119 The actually interesting subclass of ExternalCommand.
120@@ -426,17 +453,19 @@
121 very heavyweight but (yay) works portably for windows. A unix-specific
122 subclass implementing this with _just_ poll could be provided with the
123 same interface.
124-
125 """
126
127- def __init__(self, delegate, killsig=signal.SIGINT):
128+ def __init__(self, delegate, killsig=signal.SIGINT, flags=0):
129 """
130 Set the delegate helper. Technically it needs to have a 'on_line()'
131 method. For actual example code look at :class:`Tee`.
132 """
133+ _logger.debug("ExternalCommandWithDelegate(%r, killsig=%r, flags=%x)",
134+ delegate, killsig, flags)
135 self._queue = Queue()
136 self._delegate = SafeDelegate.wrap_if_needed(delegate)
137 self._killsig = killsig
138+ self._flags = flags
139
140 def call(self, *args, **kwargs):
141 """
142@@ -567,7 +596,10 @@
143 _logger.debug("_read_stream(%r, %r) entering", stream, stream_name)
144 while True:
145 try:
146- line = stream.readline()
147+ if self._flags & CHUNKED_IO:
148+ data = stream.read(1)
149+ else:
150+ data = stream.readline()
151 except (IOError, ValueError):
152 # Ignore IOError and ValueError that may be raised if
153 # the stream was closed this can happen if the process exits
154@@ -575,9 +607,9 @@
155 # starts to close both of the streams
156 break
157 else:
158- if len(line) == 0:
159+ if len(data) == 0:
160 break
161- cmd = (stream_name, line)
162+ cmd = (stream_name, data)
163 self._queue.put(cmd)
164 _logger.debug("_read_stream(%r, %r) exiting", stream, stream_name)
165
166@@ -587,7 +619,10 @@
167 args = self._queue.get()
168 if args is None:
169 break
170- self._delegate.on_line(*args)
171+ if self._flags & CHUNKED_IO:
172+ self._delegate.on_chunk(*args)
173+ else:
174+ self._delegate.on_line(*args)
175 _logger.debug("_drain_queue() exiting")
176
177
178@@ -627,6 +662,13 @@
179 for delegate in self.delegate_list:
180 delegate.on_line(stream_name, line)
181
182+ def on_chunk(self, stream_name, chunk):
183+ """
184+ Call the on_chunk() method on each delegate in the list
185+ """
186+ for delegate in self.delegate_list:
187+ delegate.on_chunk(stream_name, chunk)
188+
189 def on_end(self, returncode):
190 """
191 Call the on_end() method on each delegate in the list
192@@ -677,6 +719,16 @@
193 else:
194 self._stderr.write(line)
195
196+ def on_chunk(self, stream_name, chunk):
197+ """
198+ Write each chunk, verbatim, to the desired stream.
199+ """
200+ assert stream_name == 'stdout' or stream_name == 'stderr'
201+ if stream_name == 'stdout':
202+ self._stdout.write(chunk)
203+ else:
204+ self._stderr.write(chunk)
205+
206 def on_end(self, returncode):
207 """
208 Close the output streams if requested
209@@ -724,6 +776,14 @@
210 transformed_line = self._callback(stream_name, line)
211 self._delegate.on_line(stream_name, transformed_line)
212
213+ def on_chunk(self, stream_name, chunk):
214+ """
215+ Transform each chunk by calling callback(stream_name, chunk) and pass
216+ it down to the subsequent delegate.
217+ """
218+ transformed_chunk = self._callback(stream_name, chunk)
219+ self._delegate.on_chunk(stream_name, transformed_chunk)
220+
221 def on_begin(self, args, kwargs):
222 self._delegate.on_begin(args, kwargs)
223
224
225=== modified file 'plainbox/plainbox/vendor/extcmd/glibc.py'
226--- plainbox/plainbox/vendor/extcmd/glibc.py 2014-12-15 20:50:24 +0000
227+++ plainbox/plainbox/vendor/extcmd/glibc.py 2015-04-14 13:10:26 +0000
228@@ -46,6 +46,7 @@
229
230 from plainbox.vendor.extcmd import ExternalCommand
231 from plainbox.vendor.extcmd import SafeDelegate
232+from plainbox.vendor.extcmd import CHUNKED_IO
233
234 _logger = logging.getLogger("plainbox.vendor.extcmd")
235 _bug_logger = logging.getLogger("plainbox.bug")
236@@ -79,9 +80,10 @@
237 multi-threaded application.
238 """
239
240- def __init__(self, delegate, killsig=signal.SIGINT):
241+ def __init__(self, delegate, killsig=signal.SIGINT, flags=0):
242 self._delegate = SafeDelegate.wrap_if_needed(delegate)
243 self._killsig = killsig
244+ self._flags = flags
245
246 def call(self, args, bufsize=-1, executable=None,
247 stdin=None, stdout=None, stderr=None,
248@@ -215,7 +217,18 @@
249 _logger.debug("Sending SIGQUIT to process %d", pid)
250 os.kill(pid, signal.SIGQUIT)
251
252- def _read_pipe(self, fd, name, buffer_map, force_last):
253+ def _read_pipe_chunked(self, fd, name, force_last):
254+ assert name in ('stdout', 'stderr')
255+ pipe_size = fcntl.fcntl(fd, F_GETPIPE_SZ)
256+ _logger.debug("Reading at most %d bytes of data from %s pipe",
257+ pipe_size, name)
258+ data = os.read(fd, pipe_size)
259+ done_reading = force_last or len(data) == 0
260+ _logger.debug("Read %d bytes of data from %s", len(data), name)
261+ self._delegate.on_chunk(name, data)
262+ return done_reading
263+
264+ def _read_pipe_lines(self, fd, name, buffer_map, force_last):
265 assert name in ('stdout', 'stderr')
266 pipe_size = fcntl.fcntl(fd, F_GETPIPE_SZ)
267 _logger.debug("Reading at most %d bytes of data from %s pipe",
268@@ -291,8 +304,13 @@
269 # could have written and don't wait forever if the pipe
270 # has leaked.
271 force_last = 'proc' not in waiting_for
272- if self._read_pipe(key.fd, key.data, buffer_map,
273- force_last):
274+ if self._flags & CHUNKED_IO:
275+ is_done = self._read_pipe_chunked(
276+ key.fd, key.data, force_last)
277+ else:
278+ is_done = self._read_pipe_lines(
279+ key.fd, key.data, buffer_map, force_last)
280+ if is_done:
281 _logger.debug(
282 "pipe %s depleted, unregistering and closing",
283 key.data)

Subscribers

People subscribed via source and target branches