Merge lp:~abentley/workspace-runner/robust-connections into lp:workspace-runner

Proposed by Aaron Bentley
Status: Merged
Merged at revision: 18
Proposed branch: lp:~abentley/workspace-runner/robust-connections
Merge into: lp:workspace-runner
Diff against target: 285 lines (+133/-25)
2 files modified
workspace_runner/__init__.py (+60/-21)
workspace_runner/tests/__init__.py (+73/-4)
To merge this branch: bzr merge lp:~abentley/workspace-runner/robust-connections
Reviewer | Review Type | Date Requested | Status
Curtis Hovey (community) | code | | Approve
Review via email: mp+262986@code.launchpad.net

Commit message

Make workspace-runner robust against connection failures.

Description of the change

The main purpose of this branch is adding retry support to workspace-runner.

This is done by creating a retry_ssh decorator, and decorating all relevant functions.

It also tries to improve separation of concerns by extracting call_with_input from run_python, and moving run_python back to SSHPrimitives.

Finally, it adds a --verbose option that causes each ssh command, and each retry, to be logged.

I experimented with SSH connection sharing, but determined that it does not prevent connection failures. Even after an initial successful connection, subsequent connections could fail with "Received disconnect from 54.165.231.203: 2: fork failed: Resource temporarily unavailable"

To post a comment you must log in.
Revision history for this message
Curtis Hovey (sinzui) wrote :

Thank you.

review: Approve (code)

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'workspace_runner/__init__.py'
--- workspace_runner/__init__.py 2015-06-24 20:07:17 +0000
+++ workspace_runner/__init__.py 2015-06-25 15:14:56 +0000
@@ -18,7 +18,9 @@
18 parser = ArgumentParser()18 parser = ArgumentParser()
19 parser.add_argument('config', help='Config file to use.')19 parser.add_argument('config', help='Config file to use.')
20 parser.add_argument('host', help='Machine to run the command on.')20 parser.add_argument('host', help='Machine to run the command on.')
21 parser.add_argument('--private-key', '-i', help='Private SSH key to use')21 parser.add_argument('--private-key', '-i', help='Private SSH key to use.')
22 parser.add_argument('--verbose', '-v', help='Verbose output.',
23 action='store_true')
22 return parser.parse_args(argv)24 return parser.parse_args(argv)
2325
2426
@@ -36,6 +38,25 @@
36 return cls(ssh_connection, tempdir)38 return cls(ssh_connection, tempdir)
3739
3840
41def retry_ssh(func):
42 """Decorator that retries a function if it looks like SSH failed.
43
44 SSH connection failures are a CalledProcessError with returncode 255.
45 """
46 def decorated(*args, **kwargs):
47 for x in range(3):
48 if x != 0:
49 logging.info('Retrying due to ssh failure.')
50 try:
51 return func(*args, **kwargs)
52 except subprocess.CalledProcessError as e:
53 if e.returncode != 255:
54 raise
55 else:
56 raise
57 return decorated
58
59
39class SSHConnection:60class SSHConnection:
40 """Class representing an SSH connection.61 """Class representing an SSH connection.
4162
@@ -60,36 +81,34 @@
60 command = ['ssh'] + self.get_ssh_options() + [self.host] + args81 command = ['ssh'] + self.get_ssh_options() + [self.host] + args
61 return command82 return command
6283
63 def run_python(self, script, argv=None):84 @retry_ssh
64 """Run a python script using this connection.85 def call_with_input(self, args, stdin):
6586 """Run a command remotely, accepting stdin."""
66 :param script: The text of the script.87 remote_command = ' '.join(quote(x) for x in args)
67 :param argv: The values to provide as sys.argv in the script.88 command = self.get_ssh_cmd([remote_command])
68 """89 logging.info('Running: {}'.format(' '.join(command)))
69 if argv is None:
70 argv = []
71 python_command = ' '.join(['python', '-'] + [quote(x) for x in argv])
72 command = self.get_ssh_cmd([python_command])
73 proc = subprocess.Popen(command, stdin=subprocess.PIPE,90 proc = subprocess.Popen(command, stdin=subprocess.PIPE,
74 stdout=subprocess.PIPE)91 stdout=subprocess.PIPE)
75 output = proc.communicate(script)92 output = proc.communicate(stdin)
76 retcode = proc.wait()93 retcode = proc.wait()
77 if retcode:94 if retcode:
78 raise subprocess.CalledProcessError(95 raise subprocess.CalledProcessError(
79 retcode, command, output=output)96 retcode, command, output=output)
80 return output[0].rstrip('\r\n')97 return output[0]
8198
99 @retry_ssh
82 def scp_to_remote(self, sources, dest_path):100 def scp_to_remote(self, sources, dest_path):
83 """Copy files to the specified path on the remote host."""101 """Copy files to the specified path on the remote host."""
84 dest_loc = '{}:{}/'.format(self.host, quote(dest_path))102 dest_loc = '{}:{}/'.format(self.host, quote(dest_path))
85 subprocess.check_call(['scp'] + self.get_ssh_options() + sources +103 command = ['scp'] + self.get_ssh_options() + sources + [dest_loc]
86 [dest_loc])104 logging.info('Running: {}'.format(' '.join(command)))
105 subprocess.check_call(command)
87106
88107
89class SSHPrimitives(Primitives):108class SSHPrimitives(Primitives):
90109
91 def mkdir(self, path):110 def mkdir(self, path):
92 return self.ssh_connection.run_python(dedent("""111 return self.run_python(self.ssh_connection, dedent("""
93 import os112 import os
94 import sys113 import sys
95114
@@ -99,24 +118,39 @@
99 @classmethod118 @classmethod
100 def mktemp(cls, ssh_connection):119 def mktemp(cls, ssh_connection):
101 """Make a temporary directory on the specified host via ssh."""120 """Make a temporary directory on the specified host via ssh."""
102 return ssh_connection.run_python(dedent("""121 return cls.run_python(ssh_connection, dedent("""
103 import tempfile122 import tempfile
104 print tempfile.mkdtemp()123 print tempfile.mkdtemp()
105 """))124 """))
106125
107 def destroy(self):126 def destroy(self):
108 """Destroy the workspace."""127 """Destroy the workspace."""
109 return self.ssh_connection.run_python(dedent("""128 return self.run_python(self.ssh_connection, dedent("""
110 import shutil129 import shutil
111 import sys130 import sys
112 shutil.rmtree(sys.argv[1])131 shutil.rmtree(sys.argv[1])
113 """), [self.workspace])132 """), [self.workspace])
114133
134 @retry_ssh
115 def run(self, args, out_file):135 def run(self, args, out_file):
116 """Run a command in the workspace."""136 """Run a command in the workspace."""
117 subprocess.check_call(137 command = self.ssh_connection.get_ssh_cmd(
118 self.ssh_connection.get_ssh_cmd(138 ['cd', quote(self.workspace), ';'] + args)
119 ['cd', quote(self.workspace), ';'] + args), stdout=out_file)139 logging.info('Running: {}'.format(' '.join(command)))
140 subprocess.check_call(command, stdout=out_file)
141
142 @staticmethod
143 def run_python(ssh_connection, script, argv=None):
144 """Run a python script using this connection.
145
146 :param script: The text of the script.
147 :param argv: The values to provide as sys.argv in the script.
148 """
149 if argv is None:
150 argv = []
151 python_args = ['python', '-'] + argv
152 output = ssh_connection.call_with_input(python_args, script)
153 return output.rstrip('\r\n')
120154
121 def install(self, sources, target):155 def install(self, sources, target):
122 """Copy files into the workspace."""156 """Copy files into the workspace."""
@@ -149,6 +183,11 @@
149def workspace_run(argv=None, primitives_factory=SSHPrimitives):183def workspace_run(argv=None, primitives_factory=SSHPrimitives):
150 """Run an operation in a workspace."""184 """Run an operation in a workspace."""
151 args = parse_args(argv)185 args = parse_args(argv)
186 if args.verbose:
187 level = logging.INFO
188 else:
189 level = logging.WARNING
190 logging.basicConfig(level=level)
152 with open(args.config) as config_file:191 with open(args.config) as config_file:
153 config = safe_load(config_file)192 config = safe_load(config_file)
154 with workspace_context(args.host, args.private_key,193 with workspace_context(args.host, args.private_key,
155194
=== modified file 'workspace_runner/tests/__init__.py'
--- workspace_runner/tests/__init__.py 2015-06-24 18:59:56 +0000
+++ workspace_runner/tests/__init__.py 2015-06-25 15:14:56 +0000
@@ -1,6 +1,7 @@
1from argparse import Namespace1from argparse import Namespace
2from contextlib import contextmanager2from contextlib import contextmanager
3import os3import os
4import logging
4from mock import patch5from mock import patch
5from pipes import quote6from pipes import quote
6from shutil import (7from shutil import (
@@ -21,6 +22,7 @@
21from workspace_runner import (22from workspace_runner import (
22 parse_args,23 parse_args,
23 Primitives,24 Primitives,
25 retry_ssh,
24 SSHConnection,26 SSHConnection,
25 SSHPrimitives,27 SSHPrimitives,
26 workspace_context,28 workspace_context,
@@ -42,7 +44,7 @@
42 def test_minimal(self):44 def test_minimal(self):
43 args = parse_args(['foo', 'bar'])45 args = parse_args(['foo', 'bar'])
44 self.assertEqual(args, Namespace(config='foo', host='bar',46 self.assertEqual(args, Namespace(config='foo', host='bar',
45 private_key=None))47 private_key=None, verbose=False))
4648
47 def test_private_key(self):49 def test_private_key(self):
48 args = parse_args(['foo', 'bar', '--private-key', 'key'])50 args = parse_args(['foo', 'bar', '--private-key', 'key'])
@@ -50,6 +52,12 @@
50 args = parse_args(['foo', 'bar', '-i', 'key2'])52 args = parse_args(['foo', 'bar', '-i', 'key2'])
51 self.assertEqual(args.private_key, 'key2')53 self.assertEqual(args.private_key, 'key2')
5254
55 def test_verbose(self):
56 args = parse_args(['foo', 'bar', '--verbose'])
57 self.assertEqual(args.verbose, True)
58 args = parse_args(['foo', 'bar', '-v'])
59 self.assertEqual(args.verbose, True)
60
5361
54class FakePrimitives(Primitives):62class FakePrimitives(Primitives):
5563
@@ -117,6 +125,57 @@
117 return self.last_instance125 return self.last_instance
118126
119127
128class TestRetrySSH(TestCase):
129
130 def test_retry_until_failure(self):
131 attempts = []
132
133 @retry_ssh
134 def fail_repeatedly():
135 attempts.append('attempt')
136 raise subprocess.CalledProcessError(255, [])
137
138 with self.assertRaises(subprocess.CalledProcessError):
139 fail_repeatedly()
140 self.assertEqual(len(attempts), 3)
141
142 def test_immediate_success(self):
143 attempts = []
144
145 @retry_ssh
146 def succeed():
147 attempts.append('success')
148 return 'success'
149
150 self.assertEqual(succeed(), 'success')
151 self.assertEqual(len(attempts), 1)
152
153 def test_no_retry_non_ssh(self):
154 attempts = []
155
156 @retry_ssh
157 def fail_repeatedly():
158 attempts.append('attempt')
159 raise subprocess.CalledProcessError(254, [])
160
161 with self.assertRaises(subprocess.CalledProcessError):
162 fail_repeatedly()
163 self.assertEqual(len(attempts), 1)
164
165 def test_nearly_fail(self):
166 attempts = []
167
168 @retry_ssh
169 def nearly_fail():
170 attempts.append('attempt')
171 if len(attempts) == 3:
172 return 'success'
173 raise subprocess.CalledProcessError(255, [])
174
175 self.assertEqual(nearly_fail(), 'success')
176 self.assertEqual(len(attempts), 3)
177
178
120class TestSSHConnection(TestCase):179class TestSSHConnection(TestCase):
121180
122 def setUp(self):181 def setUp(self):
@@ -138,12 +197,12 @@
138 '-o', 'UserKnownHostsFile=/dev/null',197 '-o', 'UserKnownHostsFile=/dev/null',
139 '-i', 'private-key'])198 '-i', 'private-key'])
140199
141 def test_run_python_error(self):200 def test_call_with_input_error(self):
142 connection = SSHConnection('host', 'key')201 connection = SSHConnection('host', 'key')
143 with patch('subprocess.Popen') as po_mock:202 with patch('subprocess.Popen') as po_mock:
144 po_mock.return_value.wait.return_value = 27203 po_mock.return_value.wait.return_value = 27
145 with self.assertRaises(subprocess.CalledProcessError):204 with self.assertRaises(subprocess.CalledProcessError):
146 connection.run_python('')205 connection.call_with_input([], '')
147206
148207
149class TestSSHPrimitives(TestPrimitives, TestCase):208class TestSSHPrimitives(TestPrimitives, TestCase):
@@ -311,7 +370,10 @@
311370
312 def test_minimal(self):371 def test_minimal(self):
313 with self.config_file() as config_file:372 with self.config_file() as config_file:
314 primitives = self.run_primitives([config_file.name, 'bar'])373 with patch('logging.root.handlers', []):
374 primitives = self.run_primitives([config_file.name, 'bar'])
375 self.assertEqual(logging.getLogger().getEffectiveLevel(),
376 logging.WARNING)
315 self.assertEqual(primitives.run_calls, [['run', 'this']])377 self.assertEqual(primitives.run_calls, [['run', 'this']])
316 self.assertFalse(os.path.exists(primitives.workspace))378 self.assertFalse(os.path.exists(primitives.workspace))
317 self.assertEqual(primitives.walk_results,379 self.assertEqual(primitives.walk_results,
@@ -341,3 +403,10 @@
341 (primitives.workspace, ['bin-dir'], []),403 (primitives.workspace, ['bin-dir'], []),
342 (bin_path, [], [install_base]),404 (bin_path, [], [install_base]),
343 ])405 ])
406
407 def test_verbose(self):
408 with self.config_file() as config_file:
409 with patch('logging.root.handlers', []):
410 self.run_primitives([config_file.name, 'bar', '-v'])
411 self.assertEqual(logging.getLogger().getEffectiveLevel(),
412 logging.INFO)

Subscribers

People subscribed via source and target branches