Merge lp:~mvo/click/acquire-asyncio into lp:click/devel

Proposed by Michael Vogt
Status: Needs review
Proposed branch: lp:~mvo/click/acquire-asyncio
Merge into: lp:click/devel
Prerequisite: lp:~mvo/click/acquire
Diff against target: 272 lines (+77/-100)
2 files modified
click/acquire.py (+69/-75)
click/tests/test_acquire.py (+8/-25)
To merge this branch: bzr merge lp:~mvo/click/acquire-asyncio
Reviewer Review Type Date Requested Status
PS Jenkins bot (community) continuous-integration Needs Fixing
click hackers Pending
Review via email: mp+236499@code.launchpad.net

Commit message

Use asyncio for the method communication.

Description of the change

Simplify the non-blocking code in the subprocess handling by using the new asyncio. This means we will be python3+ only (unless we use the python-trollius compat module for python2 and older python3).

To post a comment you must log in.
Revision history for this message
PS Jenkins bot (ps-jenkins) wrote :

FAILED: Continuous integration, rev:550
No commit message was specified in the merge proposal. Click on the following link and set the commit message (if you want a jenkins rebuild you need to trigger it yourself):
https://code.launchpad.net/~mvo/click/acquire-asyncio/+merge/236499/+edit-commit-message

http://jenkins.qa.ubuntu.com/job/click-devel-ci/89/
Executed test runs:
    SUCCESS: http://jenkins.qa.ubuntu.com/job/click-devel-utopic-amd64-ci/91
    SUCCESS: http://jenkins.qa.ubuntu.com/job/click-devel-utopic-armhf-ci/89
        deb: http://jenkins.qa.ubuntu.com/job/click-devel-utopic-armhf-ci/89/artifact/work/output/*zip*/output.zip
    SUCCESS: http://jenkins.qa.ubuntu.com/job/click-devel-utopic-i386-ci/89

Click here to trigger a rebuild:
http://s-jenkins.ubuntu-ci:8080/job/click-devel-ci/89/rebuild

review: Needs Fixing (continuous-integration)

Unmerged revisions

550. By Michael Vogt

refactor so that AcquireMethod.run() also uses asyncio

549. By Michael Vogt

port to asyncio

548. By Michael Vogt

add missing python3-dbus build-dependency

547. By Michael Vogt

rename read_messages() -> _read_messages()

546. By Michael Vogt

click/acquire.py: add note about ssl options for curl

545. By Michael Vogt

first round of addressing review points from Colin

544. By Michael Vogt

add python3-pycurl to b-d (needed during the tests

543. By Michael Vogt

add missing python3-pycurl depdendency

542. By Michael Vogt

click/acquire.py: make dbus optional

541. By Michael Vogt

remove debug message

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'click/acquire.py'
2--- click/acquire.py 2014-09-30 12:27:37 +0000
3+++ click/acquire.py 2014-09-30 12:27:37 +0000
4@@ -28,8 +28,8 @@
5 'ClickAcquireStatusText',
6 ]
7
8+import asyncio
9 import os
10-import select
11 import subprocess
12 import sys
13 from textwrap import dedent
14@@ -49,47 +49,32 @@
15 pass
16
17
18-def extract_one_message(lines):
19+# hrm, why is this not in the asyncio directly?
20+def get_asyncio_stream_reader_from_file(loop, file_like):
21+ """Return (yield from) a StreamReader for the given file like object"""
22+ stream_reader = asyncio.streams.StreamReader(loop=loop)
23+ transport, protocol = yield from loop.connect_read_pipe(
24+ lambda: asyncio.streams.StreamReaderProtocol(stream_reader),
25+ file_like)
26+ return stream_reader
27+
28+
29+@asyncio.coroutine
30+def _read_message(input_file):
31 msg = []
32 while True:
33- try:
34- line = lines.pop(0)
35- except IndexError:
36+ raw_line = yield from input_file.readline()
37+ line = raw_line.decode("utf-8")
38+ # consume all top "\n"
39+ if len(msg) == 0 and line == "\n":
40+ continue
41+ if line == "\n":
42 break
43- if os.environ.get("CLICK_DEBUG_ACQUIRE", ""):
44- sys.stderr.write("[%s] raw_line: '%s'\n" % (
45- os.getpid(), line.replace("\n", "\\n")))
46 msg.append(line)
47- if len(msg) > 0 and line == "":
48- # we are done, collect all remaining "\n" and stop
49- while len(lines) > 0 and lines[0] == "":
50- lines.pop(0)
51- break
52 if len(msg) < 2:
53 return -1 , ""
54- if os.environ.get("CLICK_DEBUG_ACQUIRE", ""):
55- sys.stderr.write("[%s] msg: '%s'\n" % (os.getpid(), str(msg)))
56 number = int(msg[0].split()[0])
57- return number, "\n".join(msg)
58-
59-
60-def _read_messages(input_file, timeout=None):
61- infd = input_file.fileno()
62- msgs = []
63- rl, wl, xl = select.select([infd], [], [], timeout)
64- if rl:
65- # FIXME: 16k message limit is arbitrary
66- buf = os.read(infd, 16*1024).decode("utf-8")
67- if os.environ.get("CLICK_DEBUG_ACQUIRE", ""):
68- sys.stderr.write("[%s] read buf: '%s'\n" % (
69- os.getpid(), buf))
70- lines = buf.split("\n")
71- while True:
72- number, msg = extract_one_message(lines)
73- if number < 0:
74- break
75- msgs.append( (number, msg) )
76- return msgs
77+ return number, "".join(msg)
78
79
80 class ClickAcquireStatus:
81@@ -149,44 +134,49 @@
82 Filename: {destfile}
83
84 """).format(uri=uri, destfile=destfile)
85- pipe.write(cmd)
86- pipe.flush()
87+ pipe.write(cmd.encode("utf-8"))
88+ yield from pipe.drain()
89
90 def _redirect(self, pipe, new_uri, destfile):
91 self._uri_acquire(pipe, new_uri, destfile)
92
93+ @asyncio.coroutine
94+ def _communicate_with_method(self, cmd, uri, destfile):
95+ p = yield from asyncio.create_subprocess_exec(cmd,
96+ stdout=subprocess.PIPE, stdin=subprocess.PIPE)
97+ while True:
98+ number, raw_message = yield from _read_message(p.stdout)
99+ message = Deb822(raw_message)
100+ if number == self.M_CAPABILITIES:
101+ yield from self._uri_acquire(p.stdin, uri, destfile)
102+ elif number == self.M_STATUS:
103+ pass
104+ elif number == self.M_REDIRECT:
105+ self._redirect(p.stdin, message.get("New-URI"), destfile)
106+ elif number == self.URI_START:
107+ self._status.uri = message.get("URI", 0)
108+ self._status.total_bytes = int(message.get("Size", 0))
109+ elif number == self.URI_SUCCESS:
110+ self._status.fetched_bytes = int(message.get("Size"))
111+ self._status.done()
112+ return True
113+ elif number == self.URI_FAILURE:
114+ raise ClickAcquireError()
115+
116+ def _report_progress(self, loop, destfile):
117+ if os.path.exists(destfile):
118+ self._status.fetched_bytes = os.path.getsize(destfile)
119+ self._status.pulse()
120+ loop.call_later(self.TIMEOUT, self._report_progress, loop, destfile)
121+
122 def _run_acquire_method(self, uri, destfile):
123 parsed_uri = urlparse(uri)
124+ loop = asyncio.get_event_loop()
125 cmd = os.path.join(acquire_methods_dir, parsed_uri.scheme)
126- p = subprocess.Popen([cmd],
127- stdout=subprocess.PIPE, stdin=subprocess.PIPE,
128- universal_newlines=True)
129- while True:
130- for number, raw_message in _read_messages(p.stdout, self.TIMEOUT):
131- message = Deb822(raw_message)
132- if number == self.M_CAPABILITIES:
133- self._uri_acquire(p.stdin, uri, destfile)
134- elif number == self.M_STATUS:
135- pass
136- elif number == self.M_REDIRECT:
137- self._redirect(p.stdin, message.get("New-URI"), destfile)
138- elif number == self.URI_START:
139- self._status.uri = message.get("URI", 0)
140- self._status.total_bytes = int(message.get("Size", 0))
141- elif number == self.URI_SUCCESS:
142- self._status.fetched_bytes = int(message.get("Size"))
143- self._status.done()
144- p.stdout.close()
145- p.stdin.close()
146- return True
147- elif number == self.URI_FAILURE:
148- p.stdout.close()
149- p.stdin.close()
150- raise ClickAcquireError()
151- # update progress
152- if os.path.exists(destfile):
153- self._status.fetched_bytes = os.path.getsize(destfile)
154- self._status.pulse()
155+ loop.call_later(self.TIMEOUT, self._report_progress, loop, destfile)
156+ loop.run_until_complete(
157+ self._communicate_with_method(cmd, uri, destfile))
158+ loop.close()
159 return False
160
161 def fetch(self, uri, destfile):
162@@ -212,16 +202,20 @@
163 sys.stdout.flush()
164
165 def run(self):
166+ loop = asyncio.get_event_loop()
167+ loop.run_until_complete(self._run(loop))
168+
169+ @asyncio.coroutine
170+ def _run(self, loop):
171+ stdin_reader = yield from get_asyncio_stream_reader_from_file(
172+ loop, sys.stdin)
173 while True:
174- msgs = _read_messages(sys.stdin)
175- if not msgs:
176- break
177- for number, raw_message in msgs:
178- message = Deb822(raw_message)
179- if number == self.M_CONFIGURATION:
180- pass
181- elif number == self.M_FETCH:
182- self.fetch(message.get("URI"), message.get("FileName"))
183+ number, raw_message = yield from _read_message(stdin_reader)
184+ message = Deb822(raw_message)
185+ if number == self.M_CONFIGURATION:
186+ pass
187+ elif number == self.M_FETCH:
188+ self.fetch(message.get("URI"), message.get("FileName"))
189
190 def uri_start(self, uri, filename, size):
191 # note that apt itself does not use "Filename" here because it
192
193=== modified file 'click/tests/test_acquire.py'
194--- click/tests/test_acquire.py 2014-09-30 12:27:37 +0000
195+++ click/tests/test_acquire.py 2014-09-30 12:27:37 +0000
196@@ -24,7 +24,6 @@
197
198 import multiprocessing
199 import os.path
200-import time
201
202 from six.moves import (
203 SimpleHTTPServer,
204@@ -34,7 +33,7 @@
205 from click.acquire import (
206 ClickAcquire,
207 ClickAcquireError,
208- _read_messages,
209+ _read_message,
210 )
211
212 from click.tests.helpers import (
213@@ -108,7 +107,7 @@
214 self.assertEqual(canary_str, data)
215
216
217-class TestClickAcquireReadMessages(TestCase):
218+class TestClickAcquireReadMessage(TestCase):
219
220 def test_forked_read_message(self):
221 read_end, write_end = os.pipe()
222@@ -116,7 +115,7 @@
223 if pid == 0:
224 os.close(write_end)
225 with os.fdopen(read_end) as f:
226- number, msg = _read_messages(f)[0]
227+ number, msg = yield from _read_message(f)
228 self.assertEqual(msg, "102 Status\nFoo: Bar\n")
229 self.assertEqual(number, 102)
230 os._exit(0)
231@@ -131,7 +130,9 @@
232 if pid == 0:
233 os.close(write_end)
234 with os.fdopen(read_end) as f:
235- msgs = _read_messages(f)
236+ msg1 = yield from _read_message(f)
237+ msg2 = yield from _read_message(f)
238+ msgs = [msg1, msg2]
239 self.assertEqual(len(msgs), 2)
240 self.assertEqual(msgs[0][0], 100)
241 self.assertEqual(msgs[1][0], 200)
242@@ -147,28 +148,10 @@
243 if pid == 0:
244 os.close(write_end)
245 with os.fdopen(read_end) as f:
246- self.assertEqual(_read_messages(f), [])
247+ msg = yield from _read_message(f)
248+ self.assertEqual(msg, (-1, []))
249 os._exit(0)
250 os.close(read_end)
251 os.write(write_end, "102 close-before-msg".encode("utf-8"))
252 os.close(write_end)
253 os.waitpid(pid, 0)
254-
255- def test_forked_read_message_with_timeout(self):
256- read_end, write_end = os.pipe()
257- pid = os.fork()
258- if pid == 0:
259- os.close(write_end)
260- with os.fdopen(read_end) as f:
261- self.assertEqual(_read_messages(f, 0.1), [])
262- msgs = _read_messages(f)
263- self.assertEqual(len(msgs), 2)
264- self.assertEqual(msgs[0][1], "100 aaa\n")
265- self.assertEqual(msgs[1][1], "200 bbb\n")
266- os._exit(0)
267- os.close(read_end)
268- # simulate slow method
269- time.sleep(0.2)
270- os.write(write_end, "100 aaa\n\n".encode("utf-8"))
271- os.write(write_end, "200 bbb\n\n".encode("utf-8"))
272- os.waitpid(pid, 0)

Subscribers

People subscribed via source and target branches

to all changes: