Merge lp:~mvo/click/acquire-asyncio into lp:click/devel

Proposed by Michael Vogt
Status: Needs review
Proposed branch: lp:~mvo/click/acquire-asyncio
Merge into: lp:click/devel
Prerequisite: lp:~mvo/click/acquire
Diff against target: 272 lines (+77/-100)
2 files modified
click/acquire.py (+69/-75)
click/tests/test_acquire.py (+8/-25)
To merge this branch: bzr merge lp:~mvo/click/acquire-asyncio
Reviewer Review Type Date Requested Status
PS Jenkins bot (community) continuous-integration Needs Fixing
click hackers Pending
Review via email: mp+236499@code.launchpad.net

Commit message

Use asyncio for the method communication.

Description of the change

Simplify the non-blocking code in the subprocess handling by using the new asyncio. This means we will be python3+ only (unless we use the python-trollius compat module for python2 and older python3).

To post a comment you must log in.
Revision history for this message
PS Jenkins bot (ps-jenkins) wrote :

FAILED: Continuous integration, rev:550
No commit message was specified in the merge proposal. Click on the following link and set the commit message (if you want a jenkins rebuild you need to trigger it yourself):
https://code.launchpad.net/~mvo/click/acquire-asyncio/+merge/236499/+edit-commit-message

http://jenkins.qa.ubuntu.com/job/click-devel-ci/89/
Executed test runs:
    SUCCESS: http://jenkins.qa.ubuntu.com/job/click-devel-utopic-amd64-ci/91
    SUCCESS: http://jenkins.qa.ubuntu.com/job/click-devel-utopic-armhf-ci/89
        deb: http://jenkins.qa.ubuntu.com/job/click-devel-utopic-armhf-ci/89/artifact/work/output/*zip*/output.zip
    SUCCESS: http://jenkins.qa.ubuntu.com/job/click-devel-utopic-i386-ci/89

Click here to trigger a rebuild:
http://s-jenkins.ubuntu-ci:8080/job/click-devel-ci/89/rebuild

review: Needs Fixing (continuous-integration)

Unmerged revisions

550. By Michael Vogt

refactor so that AcquireMethod.run() also uses asyncio

549. By Michael Vogt

port to asyncio

548. By Michael Vogt

add missing python3-dbus build-dependency

547. By Michael Vogt

rename read_messages() -> _read_messages()

546. By Michael Vogt

click/acquire.py: add note about ssl options for curl

545. By Michael Vogt

first round of addressing review points from Colin

544. By Michael Vogt

add python3-pycurl to b-d (needed during the tests

543. By Michael Vogt

add missing python3-pycurl depdendency

542. By Michael Vogt

click/acquire.py: make dbus optional

541. By Michael Vogt

remove debug message

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'click/acquire.py'
--- click/acquire.py 2014-09-30 12:27:37 +0000
+++ click/acquire.py 2014-09-30 12:27:37 +0000
@@ -28,8 +28,8 @@
28 'ClickAcquireStatusText',28 'ClickAcquireStatusText',
29 ]29 ]
3030
31import asyncio
31import os32import os
32import select
33import subprocess33import subprocess
34import sys34import sys
35from textwrap import dedent35from textwrap import dedent
@@ -49,47 +49,32 @@
49 pass49 pass
5050
5151
52def extract_one_message(lines):52# hrm, why is this not in the asyncio directly?
53def get_asyncio_stream_reader_from_file(loop, file_like):
54 """Return (yield from) a StreamReader for the given file like object"""
55 stream_reader = asyncio.streams.StreamReader(loop=loop)
56 transport, protocol = yield from loop.connect_read_pipe(
57 lambda: asyncio.streams.StreamReaderProtocol(stream_reader),
58 file_like)
59 return stream_reader
60
61
62@asyncio.coroutine
63def _read_message(input_file):
53 msg = []64 msg = []
54 while True:65 while True:
55 try:66 raw_line = yield from input_file.readline()
56 line = lines.pop(0)67 line = raw_line.decode("utf-8")
57 except IndexError:68 # consume all top "\n"
69 if len(msg) == 0 and line == "\n":
70 continue
71 if line == "\n":
58 break72 break
59 if os.environ.get("CLICK_DEBUG_ACQUIRE", ""):
60 sys.stderr.write("[%s] raw_line: '%s'\n" % (
61 os.getpid(), line.replace("\n", "\\n")))
62 msg.append(line)73 msg.append(line)
63 if len(msg) > 0 and line == "":
64 # we are done, collect all remaining "\n" and stop
65 while len(lines) > 0 and lines[0] == "":
66 lines.pop(0)
67 break
68 if len(msg) < 2:74 if len(msg) < 2:
69 return -1 , ""75 return -1 , ""
70 if os.environ.get("CLICK_DEBUG_ACQUIRE", ""):
71 sys.stderr.write("[%s] msg: '%s'\n" % (os.getpid(), str(msg)))
72 number = int(msg[0].split()[0])76 number = int(msg[0].split()[0])
73 return number, "\n".join(msg)77 return number, "".join(msg)
74
75
76def _read_messages(input_file, timeout=None):
77 infd = input_file.fileno()
78 msgs = []
79 rl, wl, xl = select.select([infd], [], [], timeout)
80 if rl:
81 # FIXME: 16k message limit is arbitrary
82 buf = os.read(infd, 16*1024).decode("utf-8")
83 if os.environ.get("CLICK_DEBUG_ACQUIRE", ""):
84 sys.stderr.write("[%s] read buf: '%s'\n" % (
85 os.getpid(), buf))
86 lines = buf.split("\n")
87 while True:
88 number, msg = extract_one_message(lines)
89 if number < 0:
90 break
91 msgs.append( (number, msg) )
92 return msgs
9378
9479
95class ClickAcquireStatus:80class ClickAcquireStatus:
@@ -149,44 +134,49 @@
149 Filename: {destfile}134 Filename: {destfile}
150135
151 """).format(uri=uri, destfile=destfile)136 """).format(uri=uri, destfile=destfile)
152 pipe.write(cmd)137 pipe.write(cmd.encode("utf-8"))
153 pipe.flush()138 yield from pipe.drain()
154139
155 def _redirect(self, pipe, new_uri, destfile):140 def _redirect(self, pipe, new_uri, destfile):
156 self._uri_acquire(pipe, new_uri, destfile)141 self._uri_acquire(pipe, new_uri, destfile)
157142
143 @asyncio.coroutine
144 def _communicate_with_method(self, cmd, uri, destfile):
145 p = yield from asyncio.create_subprocess_exec(cmd,
146 stdout=subprocess.PIPE, stdin=subprocess.PIPE)
147 while True:
148 number, raw_message = yield from _read_message(p.stdout)
149 message = Deb822(raw_message)
150 if number == self.M_CAPABILITIES:
151 yield from self._uri_acquire(p.stdin, uri, destfile)
152 elif number == self.M_STATUS:
153 pass
154 elif number == self.M_REDIRECT:
155 self._redirect(p.stdin, message.get("New-URI"), destfile)
156 elif number == self.URI_START:
157 self._status.uri = message.get("URI", 0)
158 self._status.total_bytes = int(message.get("Size", 0))
159 elif number == self.URI_SUCCESS:
160 self._status.fetched_bytes = int(message.get("Size"))
161 self._status.done()
162 return True
163 elif number == self.URI_FAILURE:
164 raise ClickAcquireError()
165
166 def _report_progress(self, loop, destfile):
167 if os.path.exists(destfile):
168 self._status.fetched_bytes = os.path.getsize(destfile)
169 self._status.pulse()
170 loop.call_later(self.TIMEOUT, self._report_progress, loop, destfile)
171
158 def _run_acquire_method(self, uri, destfile):172 def _run_acquire_method(self, uri, destfile):
159 parsed_uri = urlparse(uri)173 parsed_uri = urlparse(uri)
174 loop = asyncio.get_event_loop()
160 cmd = os.path.join(acquire_methods_dir, parsed_uri.scheme)175 cmd = os.path.join(acquire_methods_dir, parsed_uri.scheme)
161 p = subprocess.Popen([cmd],176 loop.call_later(self.TIMEOUT, self._report_progress, loop, destfile)
162 stdout=subprocess.PIPE, stdin=subprocess.PIPE,177 loop.run_until_complete(
163 universal_newlines=True)178 self._communicate_with_method(cmd, uri, destfile))
164 while True:179 loop.close()
165 for number, raw_message in _read_messages(p.stdout, self.TIMEOUT):
166 message = Deb822(raw_message)
167 if number == self.M_CAPABILITIES:
168 self._uri_acquire(p.stdin, uri, destfile)
169 elif number == self.M_STATUS:
170 pass
171 elif number == self.M_REDIRECT:
172 self._redirect(p.stdin, message.get("New-URI"), destfile)
173 elif number == self.URI_START:
174 self._status.uri = message.get("URI", 0)
175 self._status.total_bytes = int(message.get("Size", 0))
176 elif number == self.URI_SUCCESS:
177 self._status.fetched_bytes = int(message.get("Size"))
178 self._status.done()
179 p.stdout.close()
180 p.stdin.close()
181 return True
182 elif number == self.URI_FAILURE:
183 p.stdout.close()
184 p.stdin.close()
185 raise ClickAcquireError()
186 # update progress
187 if os.path.exists(destfile):
188 self._status.fetched_bytes = os.path.getsize(destfile)
189 self._status.pulse()
190 return False180 return False
191181
192 def fetch(self, uri, destfile):182 def fetch(self, uri, destfile):
@@ -212,16 +202,20 @@
212 sys.stdout.flush()202 sys.stdout.flush()
213203
214 def run(self):204 def run(self):
205 loop = asyncio.get_event_loop()
206 loop.run_until_complete(self._run(loop))
207
208 @asyncio.coroutine
209 def _run(self, loop):
210 stdin_reader = yield from get_asyncio_stream_reader_from_file(
211 loop, sys.stdin)
215 while True:212 while True:
216 msgs = _read_messages(sys.stdin)213 number, raw_message = yield from _read_message(stdin_reader)
217 if not msgs:214 message = Deb822(raw_message)
218 break215 if number == self.M_CONFIGURATION:
219 for number, raw_message in msgs:216 pass
220 message = Deb822(raw_message)217 elif number == self.M_FETCH:
221 if number == self.M_CONFIGURATION:218 self.fetch(message.get("URI"), message.get("FileName"))
222 pass
223 elif number == self.M_FETCH:
224 self.fetch(message.get("URI"), message.get("FileName"))
225219
226 def uri_start(self, uri, filename, size):220 def uri_start(self, uri, filename, size):
227 # note that apt itself does not use "Filename" here because it221 # note that apt itself does not use "Filename" here because it
228222
=== modified file 'click/tests/test_acquire.py'
--- click/tests/test_acquire.py 2014-09-30 12:27:37 +0000
+++ click/tests/test_acquire.py 2014-09-30 12:27:37 +0000
@@ -24,7 +24,6 @@
2424
25import multiprocessing25import multiprocessing
26import os.path26import os.path
27import time
2827
29from six.moves import (28from six.moves import (
30 SimpleHTTPServer,29 SimpleHTTPServer,
@@ -34,7 +33,7 @@
34from click.acquire import (33from click.acquire import (
35 ClickAcquire,34 ClickAcquire,
36 ClickAcquireError,35 ClickAcquireError,
37 _read_messages,36 _read_message,
38)37)
3938
40from click.tests.helpers import (39from click.tests.helpers import (
@@ -108,7 +107,7 @@
108 self.assertEqual(canary_str, data)107 self.assertEqual(canary_str, data)
109108
110109
111class TestClickAcquireReadMessages(TestCase):110class TestClickAcquireReadMessage(TestCase):
112111
113 def test_forked_read_message(self):112 def test_forked_read_message(self):
114 read_end, write_end = os.pipe()113 read_end, write_end = os.pipe()
@@ -116,7 +115,7 @@
116 if pid == 0:115 if pid == 0:
117 os.close(write_end)116 os.close(write_end)
118 with os.fdopen(read_end) as f:117 with os.fdopen(read_end) as f:
119 number, msg = _read_messages(f)[0]118 number, msg = yield from _read_message(f)
120 self.assertEqual(msg, "102 Status\nFoo: Bar\n")119 self.assertEqual(msg, "102 Status\nFoo: Bar\n")
121 self.assertEqual(number, 102)120 self.assertEqual(number, 102)
122 os._exit(0)121 os._exit(0)
@@ -131,7 +130,9 @@
131 if pid == 0:130 if pid == 0:
132 os.close(write_end)131 os.close(write_end)
133 with os.fdopen(read_end) as f:132 with os.fdopen(read_end) as f:
134 msgs = _read_messages(f)133 msg1 = yield from _read_message(f)
134 msg2 = yield from _read_message(f)
135 msgs = [msg1, msg2]
135 self.assertEqual(len(msgs), 2)136 self.assertEqual(len(msgs), 2)
136 self.assertEqual(msgs[0][0], 100)137 self.assertEqual(msgs[0][0], 100)
137 self.assertEqual(msgs[1][0], 200)138 self.assertEqual(msgs[1][0], 200)
@@ -147,28 +148,10 @@
147 if pid == 0:148 if pid == 0:
148 os.close(write_end)149 os.close(write_end)
149 with os.fdopen(read_end) as f:150 with os.fdopen(read_end) as f:
150 self.assertEqual(_read_messages(f), [])151 msg = yield from _read_message(f)
152 self.assertEqual(msg, (-1, []))
151 os._exit(0)153 os._exit(0)
152 os.close(read_end)154 os.close(read_end)
153 os.write(write_end, "102 close-before-msg".encode("utf-8"))155 os.write(write_end, "102 close-before-msg".encode("utf-8"))
154 os.close(write_end)156 os.close(write_end)
155 os.waitpid(pid, 0)157 os.waitpid(pid, 0)
156
157 def test_forked_read_message_with_timeout(self):
158 read_end, write_end = os.pipe()
159 pid = os.fork()
160 if pid == 0:
161 os.close(write_end)
162 with os.fdopen(read_end) as f:
163 self.assertEqual(_read_messages(f, 0.1), [])
164 msgs = _read_messages(f)
165 self.assertEqual(len(msgs), 2)
166 self.assertEqual(msgs[0][1], "100 aaa\n")
167 self.assertEqual(msgs[1][1], "200 bbb\n")
168 os._exit(0)
169 os.close(read_end)
170 # simulate slow method
171 time.sleep(0.2)
172 os.write(write_end, "100 aaa\n\n".encode("utf-8"))
173 os.write(write_end, "200 bbb\n\n".encode("utf-8"))
174 os.waitpid(pid, 0)

Subscribers

People subscribed via source and target branches

to all changes: