Merge lp:~abentley/workspace-runner/robust-connections into lp:workspace-runner

Proposed by Aaron Bentley on 2015-06-25
Status: Merged
Merged at revision: 18
Proposed branch: lp:~abentley/workspace-runner/robust-connections
Merge into: lp:workspace-runner
Diff against target: 285 lines (+133/-25)
2 files modified
workspace_runner/__init__.py (+60/-21)
workspace_runner/tests/__init__.py (+73/-4)
To merge this branch: bzr merge lp:~abentley/workspace-runner/robust-connections
Reviewer | Review Type | Date Requested | Status
Curtis Hovey (community) | code | 2015-06-25 | Approve on 2015-06-25
Review via email: mp+262986@code.launchpad.net

Commit message

Make workspace-runner robust against connection failures.

Description of the change

The main purpose of this branch is adding retry support to workspace-runner.

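This is done by creating a retry_ssh decorator, which makes up to three attempts in total when a call fails with a CalledProcessError whose returncode is 255 (the code that indicates an SSH connection failure), and applying it to the functions that invoke ssh or scp.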

It also tries to improve separation of concerns by extracting call_with_input from run_python, and moving run_python back to SSHPrimitives.

Finally, it adds a --verbose (-v) option that logs each ssh and scp command before it is run and notes each retry.

I experimented with SSH connection sharing, but determined that it does not prevent connection failures. Even after an initial successful connection, subsequent connections could still fail with the error: "Received disconnect from 54.165.231.203: 2: fork failed: Resource temporarily unavailable".

To post a comment you must log in.
Curtis Hovey (sinzui) wrote :

Thank you.

review: Approve (code)

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'workspace_runner/__init__.py'
2--- workspace_runner/__init__.py 2015-06-24 20:07:17 +0000
3+++ workspace_runner/__init__.py 2015-06-25 15:14:56 +0000
4@@ -18,7 +18,9 @@
5 parser = ArgumentParser()
6 parser.add_argument('config', help='Config file to use.')
7 parser.add_argument('host', help='Machine to run the command on.')
8- parser.add_argument('--private-key', '-i', help='Private SSH key to use')
9+ parser.add_argument('--private-key', '-i', help='Private SSH key to use.')
10+ parser.add_argument('--verbose', '-v', help='Verbose output.',
11+ action='store_true')
12 return parser.parse_args(argv)
13
14
15@@ -36,6 +38,25 @@
16 return cls(ssh_connection, tempdir)
17
18
19+def retry_ssh(func):
20+ """Decorator that retries a function if it looks like SSH failed.
21+
22+ SSH connection failures are a CalledProcessError with returncode 255.
23+ """
24+ def decorated(*args, **kwargs):
25+ for x in range(3):
26+ if x != 0:
27+ logging.info('Retrying due to ssh failure.')
28+ try:
29+ return func(*args, **kwargs)
30+ except subprocess.CalledProcessError as e:
31+ if e.returncode != 255:
32+ raise
33+ else:
34+ raise
35+ return decorated
36+
37+
38 class SSHConnection:
39 """Class representing an SSH connection.
40
41@@ -60,36 +81,34 @@
42 command = ['ssh'] + self.get_ssh_options() + [self.host] + args
43 return command
44
45- def run_python(self, script, argv=None):
46- """Run a python script using this connection.
47-
48- :param script: The text of the script.
49- :param argv: The values to provide as sys.argv in the script.
50- """
51- if argv is None:
52- argv = []
53- python_command = ' '.join(['python', '-'] + [quote(x) for x in argv])
54- command = self.get_ssh_cmd([python_command])
55+ @retry_ssh
56+ def call_with_input(self, args, stdin):
57+ """Run a command remotely, accepting stdin."""
58+ remote_command = ' '.join(quote(x) for x in args)
59+ command = self.get_ssh_cmd([remote_command])
60+ logging.info('Running: {}'.format(' '.join(command)))
61 proc = subprocess.Popen(command, stdin=subprocess.PIPE,
62 stdout=subprocess.PIPE)
63- output = proc.communicate(script)
64+ output = proc.communicate(stdin)
65 retcode = proc.wait()
66 if retcode:
67 raise subprocess.CalledProcessError(
68 retcode, command, output=output)
69- return output[0].rstrip('\r\n')
70+ return output[0]
71
72+ @retry_ssh
73 def scp_to_remote(self, sources, dest_path):
74 """Copy files to the specified path on the remote host."""
75 dest_loc = '{}:{}/'.format(self.host, quote(dest_path))
76- subprocess.check_call(['scp'] + self.get_ssh_options() + sources +
77- [dest_loc])
78+ command = ['scp'] + self.get_ssh_options() + sources + [dest_loc]
79+ logging.info('Running: {}'.format(' '.join(command)))
80+ subprocess.check_call(command)
81
82
83 class SSHPrimitives(Primitives):
84
85 def mkdir(self, path):
86- return self.ssh_connection.run_python(dedent("""
87+ return self.run_python(self.ssh_connection, dedent("""
88 import os
89 import sys
90
91@@ -99,24 +118,39 @@
92 @classmethod
93 def mktemp(cls, ssh_connection):
94 """Make a temporary directory on the specified host via ssh."""
95- return ssh_connection.run_python(dedent("""
96+ return cls.run_python(ssh_connection, dedent("""
97 import tempfile
98 print tempfile.mkdtemp()
99 """))
100
101 def destroy(self):
102 """Destroy the workspace."""
103- return self.ssh_connection.run_python(dedent("""
104+ return self.run_python(self.ssh_connection, dedent("""
105 import shutil
106 import sys
107 shutil.rmtree(sys.argv[1])
108 """), [self.workspace])
109
110+ @retry_ssh
111 def run(self, args, out_file):
112 """Run a command in the workspace."""
113- subprocess.check_call(
114- self.ssh_connection.get_ssh_cmd(
115- ['cd', quote(self.workspace), ';'] + args), stdout=out_file)
116+ command = self.ssh_connection.get_ssh_cmd(
117+ ['cd', quote(self.workspace), ';'] + args)
118+ logging.info('Running: {}'.format(' '.join(command)))
119+ subprocess.check_call(command, stdout=out_file)
120+
121+ @staticmethod
122+ def run_python(ssh_connection, script, argv=None):
123+ """Run a python script using this connection.
124+
125+ :param script: The text of the script.
126+ :param argv: The values to provide as sys.argv in the script.
127+ """
128+ if argv is None:
129+ argv = []
130+ python_args = ['python', '-'] + argv
131+ output = ssh_connection.call_with_input(python_args, script)
132+ return output.rstrip('\r\n')
133
134 def install(self, sources, target):
135 """Copy files into the workspace."""
136@@ -149,6 +183,11 @@
137 def workspace_run(argv=None, primitives_factory=SSHPrimitives):
138 """Run an operation in a workspace."""
139 args = parse_args(argv)
140+ if args.verbose:
141+ level = logging.INFO
142+ else:
143+ level = logging.WARNING
144+ logging.basicConfig(level=level)
145 with open(args.config) as config_file:
146 config = safe_load(config_file)
147 with workspace_context(args.host, args.private_key,
148
149=== modified file 'workspace_runner/tests/__init__.py'
150--- workspace_runner/tests/__init__.py 2015-06-24 18:59:56 +0000
151+++ workspace_runner/tests/__init__.py 2015-06-25 15:14:56 +0000
152@@ -1,6 +1,7 @@
153 from argparse import Namespace
154 from contextlib import contextmanager
155 import os
156+import logging
157 from mock import patch
158 from pipes import quote
159 from shutil import (
160@@ -21,6 +22,7 @@
161 from workspace_runner import (
162 parse_args,
163 Primitives,
164+ retry_ssh,
165 SSHConnection,
166 SSHPrimitives,
167 workspace_context,
168@@ -42,7 +44,7 @@
169 def test_minimal(self):
170 args = parse_args(['foo', 'bar'])
171 self.assertEqual(args, Namespace(config='foo', host='bar',
172- private_key=None))
173+ private_key=None, verbose=False))
174
175 def test_private_key(self):
176 args = parse_args(['foo', 'bar', '--private-key', 'key'])
177@@ -50,6 +52,12 @@
178 args = parse_args(['foo', 'bar', '-i', 'key2'])
179 self.assertEqual(args.private_key, 'key2')
180
181+ def test_verbose(self):
182+ args = parse_args(['foo', 'bar', '--verbose'])
183+ self.assertEqual(args.verbose, True)
184+ args = parse_args(['foo', 'bar', '-v'])
185+ self.assertEqual(args.verbose, True)
186+
187
188 class FakePrimitives(Primitives):
189
190@@ -117,6 +125,57 @@
191 return self.last_instance
192
193
194+class TestRetrySSH(TestCase):
195+
196+ def test_retry_until_failure(self):
197+ attempts = []
198+
199+ @retry_ssh
200+ def fail_repeatedly():
201+ attempts.append('attempt')
202+ raise subprocess.CalledProcessError(255, [])
203+
204+ with self.assertRaises(subprocess.CalledProcessError):
205+ fail_repeatedly()
206+ self.assertEqual(len(attempts), 3)
207+
208+ def test_immediate_success(self):
209+ attempts = []
210+
211+ @retry_ssh
212+ def succeed():
213+ attempts.append('success')
214+ return 'success'
215+
216+ self.assertEqual(succeed(), 'success')
217+ self.assertEqual(len(attempts), 1)
218+
219+ def test_no_retry_non_ssh(self):
220+ attempts = []
221+
222+ @retry_ssh
223+ def fail_repeatedly():
224+ attempts.append('attempt')
225+ raise subprocess.CalledProcessError(254, [])
226+
227+ with self.assertRaises(subprocess.CalledProcessError):
228+ fail_repeatedly()
229+ self.assertEqual(len(attempts), 1)
230+
231+ def test_nearly_fail(self):
232+ attempts = []
233+
234+ @retry_ssh
235+ def nearly_fail():
236+ attempts.append('attempt')
237+ if len(attempts) == 3:
238+ return 'success'
239+ raise subprocess.CalledProcessError(255, [])
240+
241+ self.assertEqual(nearly_fail(), 'success')
242+ self.assertEqual(len(attempts), 3)
243+
244+
245 class TestSSHConnection(TestCase):
246
247 def setUp(self):
248@@ -138,12 +197,12 @@
249 '-o', 'UserKnownHostsFile=/dev/null',
250 '-i', 'private-key'])
251
252- def test_run_python_error(self):
253+ def test_call_with_input_error(self):
254 connection = SSHConnection('host', 'key')
255 with patch('subprocess.Popen') as po_mock:
256 po_mock.return_value.wait.return_value = 27
257 with self.assertRaises(subprocess.CalledProcessError):
258- connection.run_python('')
259+ connection.call_with_input([], '')
260
261
262 class TestSSHPrimitives(TestPrimitives, TestCase):
263@@ -311,7 +370,10 @@
264
265 def test_minimal(self):
266 with self.config_file() as config_file:
267- primitives = self.run_primitives([config_file.name, 'bar'])
268+ with patch('logging.root.handlers', []):
269+ primitives = self.run_primitives([config_file.name, 'bar'])
270+ self.assertEqual(logging.getLogger().getEffectiveLevel(),
271+ logging.WARNING)
272 self.assertEqual(primitives.run_calls, [['run', 'this']])
273 self.assertFalse(os.path.exists(primitives.workspace))
274 self.assertEqual(primitives.walk_results,
275@@ -341,3 +403,10 @@
276 (primitives.workspace, ['bin-dir'], []),
277 (bin_path, [], [install_base]),
278 ])
279+
280+ def test_verbose(self):
281+ with self.config_file() as config_file:
282+ with patch('logging.root.handlers', []):
283+ self.run_primitives([config_file.name, 'bar', '-v'])
284+ self.assertEqual(logging.getLogger().getEffectiveLevel(),
285+ logging.INFO)

Subscribers

People subscribed via source and target branches