Merge lp:~frankban/python-shelltoolbox/helpers into lp:python-shelltoolbox
- helpers
- Merge into trunk
Status: | Merged |
---|---|
Merged at revision: | 14 |
Proposed branch: | lp:~frankban/python-shelltoolbox/helpers |
Merge into: | lp:python-shelltoolbox |
Diff against target: |
1062 lines (+802/-147) 3 files modified
setup.py (+1/-1) shelltoolbox/__init__.py (+430/-114) tests.py (+371/-32) |
To merge this branch: | bzr merge lp:~frankban/python-shelltoolbox/helpers |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Gary Poster (community) | Approve | ||
Review via email: mp+96180@code.launchpad.net |
Commit message
Description of the change
== Changes ==
- Added several helper functions:
- bzr_whois
- file_append
- file_prepend
- generate_ssh_keys
- get_su_command
- join_command
- mkdirs
- ssh
- user_exists
- Added `environ` context manager and updated `su` to use it.
- Changed `apt_get_install` helper: now the function uses environ to run dpkg
in non-interactive mode.
- Changed `get_user_home` to handle non-existent users
(returning a default home directory)
- Updated the `install_
placeholders.
- Updated `cd` context manager: yield in a try/finally block.
- Fixed `grep` helper.
== Tests ==
python tests.py
.......
-------
Ran 53 tests in 0.124s
OK
Francesco Banconi (frankban) wrote : | # |
> I was struck that file_append really expects the line to be added to end with
> a \n. I suppose we could enforce that one way or another...or not. I'm OK
> with leaving as is, or you could add a \n if the line does not already end
> with one (and do this before you check if the line is in the content). On a
> somewhat related note, it might be better to check if the line to be added is
> in the output of readlines(), not read(); that would verify that the line, not
> a line that ends with the line, is in the file. For example, right now a file
> of "abc\ndef\n" would incorrectly show that the "ef\n" line was in the file.
I agree with your suggestions. I've changed the function and removed those
ambiguities. I've added tests for line not ending with '\n' and for line
fragments.
> In file_prepend, I am tempted to suggest that this snippet should be changed:
> if line in lines:
> lines.remove(line)
> This iterates through the lines twice, which is mildly annoying to me but
> almost certainly not a practical concern for us with these use cases. That
> said, I'd be tempted to change that to the following:
> try:
> lines.remove(line)
> except ValueError:
> pass
> You could also verify that the line to be added in this function ended with a
> \n if you wanted to.
Done.
> I'm a bit concerned about what you are doing here in get_value_
> return line.split(
> would maybe json be a safer approach? No, I tried it, and json.loads('"hi"')
> is fine but json.loads("'hi'") is not. :-/ I dunno. When do we use this?
> The logic seems a bit idiosyncratic.
I don't remember the origin of that function. Since it is not used in the
charms or in setuplxc/lpsetup, I am going to remove it.
> The grep command also seems a bit idiosyncratic. Why is it reasonable to
> always strip the result? I also would be tempted to rename the funtion: grep
> does a lot more than that. simply search_file? Hm, I see that you just
> reordered this file, you didn't actually add grep.... Never mind then, I
> guess. I should have commented on this sooner, but you shouldn't have to
> worry about it. If you *do* want to change it, please leave the "grep" alias
> around for now so that our charms do not suddenly break.
Actually that helper is not used, and I remember it was before we changed the
way the buildslave is reconfigured. I think it's safe to rename it and change
the returned value.
> I don't understand why you removed the Serializer. Was it defined twice?
Yes it was.
Thanks Gary.
- 33. By Francesco Banconi
-
Fixed file_append.
- 34. By Francesco Banconi
-
Fixed file_prepend.
- 35. By Francesco Banconi
-
Removed get_value_
from_line. - 36. By Francesco Banconi
-
s/grep/search_file/
Preview Diff
1 | === modified file 'setup.py' | |||
2 | --- setup.py 2012-03-01 20:49:48 +0000 | |||
3 | +++ setup.py 2012-03-07 10:19:06 +0000 | |||
4 | @@ -9,7 +9,7 @@ | |||
5 | 9 | ez_setup.use_setuptools() | 9 | ez_setup.use_setuptools() |
6 | 10 | 10 | ||
7 | 11 | 11 | ||
9 | 12 | __version__ = '0.1.1' | 12 | __version__ = '0.2.0' |
10 | 13 | 13 | ||
11 | 14 | from setuptools import setup | 14 | from setuptools import setup |
12 | 15 | 15 | ||
13 | 16 | 16 | ||
14 | === modified file 'shelltoolbox/__init__.py' | |||
15 | --- shelltoolbox/__init__.py 2012-03-02 14:59:46 +0000 | |||
16 | +++ shelltoolbox/__init__.py 2012-03-07 10:19:06 +0000 | |||
17 | @@ -19,25 +19,38 @@ | |||
18 | 19 | __metaclass__ = type | 19 | __metaclass__ = type |
19 | 20 | __all__ = [ | 20 | __all__ = [ |
20 | 21 | 'apt_get_install', | 21 | 'apt_get_install', |
21 | 22 | 'bzr_whois', | ||
22 | 22 | 'cd', | 23 | 'cd', |
23 | 23 | 'command', | 24 | 'command', |
24 | 24 | 'DictDiffer', | 25 | 'DictDiffer', |
25 | 26 | 'environ', | ||
26 | 27 | 'file_append', | ||
27 | 28 | 'file_prepend', | ||
28 | 29 | 'generate_ssh_keys', | ||
29 | 30 | 'get_su_command', | ||
30 | 31 | 'get_user_home', | ||
31 | 25 | 'get_user_ids', | 32 | 'get_user_ids', |
32 | 26 | 'get_user_home', | ||
33 | 27 | 'get_value_from_line', | ||
34 | 28 | 'grep', | ||
35 | 29 | 'install_extra_repositories', | 33 | 'install_extra_repositories', |
36 | 34 | 'join_command', | ||
37 | 35 | 'mkdirs', | ||
38 | 30 | 'run', | 36 | 'run', |
39 | 31 | 'Serializer', | 37 | 'Serializer', |
40 | 32 | 'script_name', | 38 | 'script_name', |
41 | 39 | 'search_file', | ||
42 | 40 | 'ssh', | ||
43 | 33 | 'su', | 41 | 'su', |
44 | 42 | 'user_exists', | ||
45 | 43 | 'wait_for_page_contents', | ||
46 | 34 | ] | 44 | ] |
47 | 35 | 45 | ||
48 | 36 | from collections import namedtuple | 46 | from collections import namedtuple |
49 | 37 | from contextlib import contextmanager | 47 | from contextlib import contextmanager |
50 | 48 | from email.Utils import parseaddr | ||
51 | 49 | import errno | ||
52 | 38 | import json | 50 | import json |
53 | 39 | import operator | 51 | import operator |
54 | 40 | import os | 52 | import os |
55 | 53 | import pipes | ||
56 | 41 | import pwd | 54 | import pwd |
57 | 42 | import re | 55 | import re |
58 | 43 | import subprocess | 56 | import subprocess |
59 | @@ -50,26 +63,50 @@ | |||
60 | 50 | Env = namedtuple('Env', 'uid gid home') | 63 | Env = namedtuple('Env', 'uid gid home') |
61 | 51 | 64 | ||
62 | 52 | 65 | ||
83 | 53 | def run(*args, **kwargs): | 66 | def apt_get_install(*args, **kwargs): |
84 | 54 | """Run the command with the given arguments. | 67 | """Install given packages using apt. |
85 | 55 | 68 | ||
86 | 56 | The first argument is the path to the command to run. | 69 | It is possible to pass environment variables to be set during install |
87 | 57 | Subsequent arguments are command-line arguments to be passed. | 70 | using keyword arguments. |
88 | 58 | 71 | ||
89 | 59 | This function accepts all optional keyword arguments accepted by | 72 | :raises: subprocess.CalledProcessError |
90 | 60 | `subprocess.Popen`. | 73 | """ |
91 | 61 | """ | 74 | debian_frontend = kwargs.pop('DEBIAN_FRONTEND', 'noninteractive') |
92 | 62 | args = [i for i in args if i is not None] | 75 | with environ(DEBIAN_FRONTEND=debian_frontend, **kwargs): |
93 | 63 | pipe = subprocess.PIPE | 76 | cmd = ('apt-get', '-y', 'install') + args |
94 | 64 | process = subprocess.Popen( | 77 | return run(*cmd) |
95 | 65 | args, stdout=kwargs.pop('stdout', pipe), | 78 | |
96 | 66 | stderr=kwargs.pop('stderr', pipe), | 79 | |
97 | 67 | close_fds=kwargs.pop('close_fds', True), **kwargs) | 80 | def bzr_whois(user): |
98 | 68 | stdout, stderr = process.communicate() | 81 | """Return full name and email of bzr `user`. |
99 | 69 | if process.returncode: | 82 | |
100 | 70 | raise subprocess.CalledProcessError( | 83 | Return None if the given `user` does not have a bzr user id. |
101 | 71 | process.returncode, repr(args), output=stdout+stderr) | 84 | """ |
102 | 72 | return stdout | 85 | with su(user): |
103 | 86 | try: | ||
104 | 87 | whoami = run('bzr', 'whoami') | ||
105 | 88 | except (subprocess.CalledProcessError, OSError): | ||
106 | 89 | return None | ||
107 | 90 | return parseaddr(whoami) | ||
108 | 91 | |||
109 | 92 | |||
110 | 93 | @contextmanager | ||
111 | 94 | def cd(directory): | ||
112 | 95 | """A context manager to temporarily change current working dir, e.g.:: | ||
113 | 96 | |||
114 | 97 | >>> import os | ||
115 | 98 | >>> os.chdir('/tmp') | ||
116 | 99 | >>> with cd('/bin'): print os.getcwd() | ||
117 | 100 | /bin | ||
118 | 101 | >>> print os.getcwd() | ||
119 | 102 | /tmp | ||
120 | 103 | """ | ||
121 | 104 | cwd = os.getcwd() | ||
122 | 105 | os.chdir(directory) | ||
123 | 106 | try: | ||
124 | 107 | yield | ||
125 | 108 | finally: | ||
126 | 109 | os.chdir(cwd) | ||
127 | 73 | 110 | ||
128 | 74 | 111 | ||
129 | 75 | def command(*base_args): | 112 | def command(*base_args): |
130 | @@ -97,37 +134,393 @@ | |||
131 | 97 | return callable_command | 134 | return callable_command |
132 | 98 | 135 | ||
133 | 99 | 136 | ||
135 | 100 | apt_get_install = command('apt-get', 'install', '-y', '--force-yes') | 137 | @contextmanager |
136 | 138 | def environ(**kwargs): | ||
137 | 139 | """A context manager to temporarily change environment variables. | ||
138 | 140 | |||
139 | 141 | If an existing environment variable is changed, it is restored during | ||
140 | 142 | context cleanup:: | ||
141 | 143 | |||
142 | 144 | >>> import os | ||
143 | 145 | >>> os.environ['MY_VARIABLE'] = 'foo' | ||
144 | 146 | >>> with environ(MY_VARIABLE='bar'): print os.getenv('MY_VARIABLE') | ||
145 | 147 | bar | ||
146 | 148 | >>> print os.getenv('MY_VARIABLE') | ||
147 | 149 | foo | ||
148 | 150 | >>> del os.environ['MY_VARIABLE'] | ||
149 | 151 | |||
150 | 152 | If we are adding environment variables, they are removed during context | ||
151 | 153 | cleanup:: | ||
152 | 154 | |||
153 | 155 | >>> import os | ||
154 | 156 | >>> with environ(MY_VAR1='foo', MY_VAR2='bar'): | ||
155 | 157 | ... print os.getenv('MY_VAR1'), os.getenv('MY_VAR2') | ||
156 | 158 | foo bar | ||
157 | 159 | >>> os.getenv('MY_VAR1') == os.getenv('MY_VAR2') == None | ||
158 | 160 | True | ||
159 | 161 | """ | ||
160 | 162 | backup = {} | ||
161 | 163 | for key, value in kwargs.items(): | ||
162 | 164 | backup[key] = os.getenv(key) | ||
163 | 165 | os.environ[key] = value | ||
164 | 166 | try: | ||
165 | 167 | yield | ||
166 | 168 | finally: | ||
167 | 169 | for key, value in backup.items(): | ||
168 | 170 | if value is None: | ||
169 | 171 | del os.environ[key] | ||
170 | 172 | else: | ||
171 | 173 | os.environ[key] = value | ||
172 | 174 | |||
173 | 175 | |||
174 | 176 | def file_append(filename, line): | ||
175 | 177 | r"""Append given `line`, if not present, at the end of `filename`. | ||
176 | 178 | |||
177 | 179 | Usage example:: | ||
178 | 180 | |||
179 | 181 | >>> import tempfile | ||
180 | 182 | >>> f = tempfile.NamedTemporaryFile('w', delete=False) | ||
181 | 183 | >>> f.write('line1\n') | ||
182 | 184 | >>> f.close() | ||
183 | 185 | >>> file_append(f.name, 'new line\n') | ||
184 | 186 | >>> open(f.name).read() | ||
185 | 187 | 'line1\nnew line\n' | ||
186 | 188 | |||
187 | 189 | Nothing happens if the file already contains the given `line`:: | ||
188 | 190 | |||
189 | 191 | >>> file_append(f.name, 'new line\n') | ||
190 | 192 | >>> open(f.name).read() | ||
191 | 193 | 'line1\nnew line\n' | ||
192 | 194 | |||
193 | 195 | A new line is automatically added before the given `line` if it is not | ||
194 | 196 | present at the end of current file content:: | ||
195 | 197 | |||
196 | 198 | >>> import tempfile | ||
197 | 199 | >>> f = tempfile.NamedTemporaryFile('w', delete=False) | ||
198 | 200 | >>> f.write('line1') | ||
199 | 201 | >>> f.close() | ||
200 | 202 | >>> file_append(f.name, 'new line\n') | ||
201 | 203 | >>> open(f.name).read() | ||
202 | 204 | 'line1\nnew line\n' | ||
203 | 205 | |||
204 | 206 | The file is created if it does not exist:: | ||
205 | 207 | |||
206 | 208 | >>> import tempfile | ||
207 | 209 | >>> filename = tempfile.mktemp() | ||
208 | 210 | >>> file_append(filename, 'line1\n') | ||
209 | 211 | >>> open(filename).read() | ||
210 | 212 | 'line1\n' | ||
211 | 213 | """ | ||
212 | 214 | if not line.endswith('\n'): | ||
213 | 215 | line += '\n' | ||
214 | 216 | with open(filename, 'a+') as f: | ||
215 | 217 | lines = f.readlines() | ||
216 | 218 | if line not in lines: | ||
217 | 219 | if not lines or lines[-1].endswith('\n'): | ||
218 | 220 | f.write(line) | ||
219 | 221 | else: | ||
220 | 222 | f.write('\n' + line) | ||
221 | 223 | |||
222 | 224 | |||
223 | 225 | def file_prepend(filename, line): | ||
224 | 226 | r"""Insert given `line`, if not present, at the beginning of `filename`. | ||
225 | 227 | |||
226 | 228 | Usage example:: | ||
227 | 229 | |||
228 | 230 | >>> import tempfile | ||
229 | 231 | >>> f = tempfile.NamedTemporaryFile('w', delete=False) | ||
230 | 232 | >>> f.write('line1\n') | ||
231 | 233 | >>> f.close() | ||
232 | 234 | >>> file_prepend(f.name, 'line0\n') | ||
233 | 235 | >>> open(f.name).read() | ||
234 | 236 | 'line0\nline1\n' | ||
235 | 237 | |||
236 | 238 | If the file starts with the given `line`, nothing happens:: | ||
237 | 239 | |||
238 | 240 | >>> file_prepend(f.name, 'line0\n') | ||
239 | 241 | >>> open(f.name).read() | ||
240 | 242 | 'line0\nline1\n' | ||
241 | 243 | |||
242 | 244 | If the file contains the given `line`, but not at the beginning, | ||
243 | 245 | the line is moved on top:: | ||
244 | 246 | |||
245 | 247 | >>> file_prepend(f.name, 'line1\n') | ||
246 | 248 | >>> open(f.name).read() | ||
247 | 249 | 'line1\nline0\n' | ||
248 | 250 | """ | ||
249 | 251 | if not line.endswith('\n'): | ||
250 | 252 | line += '\n' | ||
251 | 253 | with open(filename, 'r+') as f: | ||
252 | 254 | lines = f.readlines() | ||
253 | 255 | if lines[0] != line: | ||
254 | 256 | try: | ||
255 | 257 | lines.remove(line) | ||
256 | 258 | except ValueError: | ||
257 | 259 | pass | ||
258 | 260 | lines.insert(0, line) | ||
259 | 261 | f.seek(0) | ||
260 | 262 | f.writelines(lines) | ||
261 | 263 | |||
262 | 264 | |||
263 | 265 | def generate_ssh_keys(path, passphrase=''): | ||
264 | 266 | """Generate ssh key pair, saving them inside the given `directory`. | ||
265 | 267 | |||
266 | 268 | >>> generate_ssh_keys('/tmp/id_rsa') | ||
267 | 269 | 0 | ||
268 | 270 | >>> open('/tmp/id_rsa').readlines()[0].strip() | ||
269 | 271 | '-----BEGIN RSA PRIVATE KEY-----' | ||
270 | 272 | >>> open('/tmp/id_rsa.pub').read().startswith('ssh-rsa') | ||
271 | 273 | True | ||
272 | 274 | >>> os.remove('/tmp/id_rsa') | ||
273 | 275 | >>> os.remove('/tmp/id_rsa.pub') | ||
274 | 276 | """ | ||
275 | 277 | return subprocess.call([ | ||
276 | 278 | 'ssh-keygen', '-q', '-t', 'rsa', '-N', passphrase, '-f', path]) | ||
277 | 279 | |||
278 | 280 | |||
279 | 281 | def get_su_command(user, args): | ||
280 | 282 | """Return a command line as a sequence, prepending "su" if necessary. | ||
281 | 283 | |||
282 | 284 | This can be used together with `run` when the `su` context manager is not | ||
283 | 285 | enough (e.g. an external program uses uid rather than euid). | ||
284 | 286 | |||
285 | 287 | run(*get_su_command(user, ['bzr', 'whoami'])) | ||
286 | 288 | |||
287 | 289 | If the su is requested as current user, the arguments are returned as | ||
288 | 290 | given:: | ||
289 | 291 | |||
290 | 292 | >>> import getpass | ||
291 | 293 | >>> current_user = getpass.getuser() | ||
292 | 294 | |||
293 | 295 | >>> get_su_command(current_user, ('ls', '-l')) | ||
294 | 296 | ('ls', '-l') | ||
295 | 297 | |||
296 | 298 | Otherwise, "su" is prepended:: | ||
297 | 299 | |||
298 | 300 | >>> get_su_command('nobody', ('ls', '-l', 'my file')) | ||
299 | 301 | ('su', 'nobody', '-c', "ls -l 'my file'") | ||
300 | 302 | """ | ||
301 | 303 | if get_user_ids(user)[0] != os.getuid(): | ||
302 | 304 | args = [i for i in args if i is not None] | ||
303 | 305 | return ('su', user, '-c', join_command(args)) | ||
304 | 306 | return args | ||
305 | 307 | |||
306 | 308 | |||
307 | 309 | def get_user_home(user): | ||
308 | 310 | """Return the home directory of the given `user`. | ||
309 | 311 | |||
310 | 312 | >>> get_user_home('root') | ||
311 | 313 | '/root' | ||
312 | 314 | |||
313 | 315 | If the user does not exist, return a default /home/[username] home:: | ||
314 | 316 | |||
315 | 317 | >>> get_user_home('_this_user_does_not_exist_') | ||
316 | 318 | '/home/_this_user_does_not_exist_' | ||
317 | 319 | """ | ||
318 | 320 | try: | ||
319 | 321 | return pwd.getpwnam(user).pw_dir | ||
320 | 322 | except KeyError: | ||
321 | 323 | return os.path.join(os.path.sep, 'home', user) | ||
322 | 324 | |||
323 | 325 | |||
324 | 326 | def get_user_ids(user): | ||
325 | 327 | """Return the uid and gid of given `user`, e.g.:: | ||
326 | 328 | |||
327 | 329 | >>> get_user_ids('root') | ||
328 | 330 | (0, 0) | ||
329 | 331 | """ | ||
330 | 332 | userdata = pwd.getpwnam(user) | ||
331 | 333 | return userdata.pw_uid, userdata.pw_gid | ||
332 | 101 | 334 | ||
333 | 102 | 335 | ||
334 | 103 | def install_extra_repositories(*repositories): | 336 | def install_extra_repositories(*repositories): |
335 | 104 | """Install all of the extra repositories and update apt. | 337 | """Install all of the extra repositories and update apt. |
336 | 105 | 338 | ||
337 | 339 | Given repositories can contain a "{distribution}" placeholder, that will | ||
338 | 340 | be replaced by current distribution codename. | ||
339 | 341 | |||
340 | 106 | :raises: subprocess.CalledProcessError | 342 | :raises: subprocess.CalledProcessError |
341 | 107 | """ | 343 | """ |
342 | 108 | distribution = run('lsb_release', '-cs').strip() | 344 | distribution = run('lsb_release', '-cs').strip() |
344 | 109 | # Starting from Oneiric, the `apt-add-repository` is interactive by | 345 | # Starting from Oneiric, `apt-add-repository` is interactive by |
345 | 110 | # default, and requires a "-y" flag to be set. | 346 | # default, and requires a "-y" flag to be set. |
346 | 111 | assume_yes = None if distribution == 'lucid' else '-y' | 347 | assume_yes = None if distribution == 'lucid' else '-y' |
347 | 112 | for repo in repositories: | 348 | for repo in repositories: |
349 | 113 | run('apt-add-repository', assume_yes, repo) | 349 | repository = repo.format(distribution=distribution) |
350 | 350 | run('apt-add-repository', assume_yes, repository) | ||
351 | 114 | run('apt-get', 'clean') | 351 | run('apt-get', 'clean') |
352 | 115 | run('apt-get', 'update') | 352 | run('apt-get', 'update') |
353 | 116 | 353 | ||
354 | 117 | 354 | ||
356 | 118 | def grep(content, filename): | 355 | def join_command(args): |
357 | 356 | """Return a valid Unix command line from `args`. | ||
358 | 357 | |||
359 | 358 | >>> join_command(['ls', '-l']) | ||
360 | 359 | 'ls -l' | ||
361 | 360 | |||
362 | 361 | Arguments containing spaces and empty args are correctly quoted:: | ||
363 | 362 | |||
364 | 363 | >>> join_command(['command', 'arg1', 'arg containing spaces', '']) | ||
365 | 364 | "command arg1 'arg containing spaces' ''" | ||
366 | 365 | """ | ||
367 | 366 | return ' '.join(pipes.quote(arg) for arg in args) | ||
368 | 367 | |||
369 | 368 | |||
370 | 369 | def mkdirs(*args): | ||
371 | 370 | """Create leaf directories (given as `args`) and all intermediate ones. | ||
372 | 371 | |||
373 | 372 | >>> import tempfile | ||
374 | 373 | >>> base_dir = tempfile.mktemp(suffix='/') | ||
375 | 374 | >>> dir1 = tempfile.mktemp(prefix=base_dir) | ||
376 | 375 | >>> dir2 = tempfile.mktemp(prefix=base_dir) | ||
377 | 376 | >>> mkdirs(dir1, dir2) | ||
378 | 377 | >>> os.path.isdir(dir1) | ||
379 | 378 | True | ||
380 | 379 | >>> os.path.isdir(dir2) | ||
381 | 380 | True | ||
382 | 381 | |||
383 | 382 | If the leaf directory already exists the function returns without errors:: | ||
384 | 383 | |||
385 | 384 | >>> mkdirs(dir1) | ||
386 | 385 | |||
387 | 386 | An `OSError` is raised if the leaf path exists and it is a file:: | ||
388 | 387 | |||
389 | 388 | >>> f = tempfile.NamedTemporaryFile( | ||
390 | 389 | ... 'w', delete=False, prefix=base_dir) | ||
391 | 390 | >>> f.close() | ||
392 | 391 | >>> mkdirs(f.name) # doctest: +ELLIPSIS | ||
393 | 392 | Traceback (most recent call last): | ||
394 | 393 | OSError: ... | ||
395 | 394 | """ | ||
396 | 395 | for directory in args: | ||
397 | 396 | try: | ||
398 | 397 | os.makedirs(directory) | ||
399 | 398 | except OSError as err: | ||
400 | 399 | if err.errno != errno.EEXIST or os.path.isfile(directory): | ||
401 | 400 | raise | ||
402 | 401 | |||
403 | 402 | |||
404 | 403 | def run(*args, **kwargs): | ||
405 | 404 | """Run the command with the given arguments. | ||
406 | 405 | |||
407 | 406 | The first argument is the path to the command to run. | ||
408 | 407 | Subsequent arguments are command-line arguments to be passed. | ||
409 | 408 | |||
410 | 409 | This function accepts all optional keyword arguments accepted by | ||
411 | 410 | `subprocess.Popen`. | ||
412 | 411 | """ | ||
413 | 412 | args = [i for i in args if i is not None] | ||
414 | 413 | pipe = subprocess.PIPE | ||
415 | 414 | process = subprocess.Popen( | ||
416 | 415 | args, stdout=kwargs.pop('stdout', pipe), | ||
417 | 416 | stderr=kwargs.pop('stderr', pipe), | ||
418 | 417 | close_fds=kwargs.pop('close_fds', True), **kwargs) | ||
419 | 418 | stdout, stderr = process.communicate() | ||
420 | 419 | if process.returncode: | ||
421 | 420 | raise subprocess.CalledProcessError( | ||
422 | 421 | process.returncode, repr(args), output=stdout+stderr) | ||
423 | 422 | return stdout | ||
424 | 423 | |||
425 | 424 | |||
426 | 425 | def script_name(): | ||
427 | 426 | """Return the name of this script.""" | ||
428 | 427 | return os.path.basename(sys.argv[0]) | ||
429 | 428 | |||
430 | 429 | |||
431 | 430 | def search_file(regexp, filename): | ||
432 | 431 | """Return the first line in `filename` that matches `regexp`.""" | ||
433 | 119 | with open(filename) as f: | 432 | with open(filename) as f: |
434 | 120 | for line in f: | 433 | for line in f: |
445 | 121 | if re.match(content, line): | 434 | if re.search(regexp, line): |
446 | 122 | return line.strip() | 435 | return line |
447 | 123 | 436 | ||
448 | 124 | 437 | ||
449 | 125 | def get_value_from_line(line): | 438 | def ssh(location, user=None, key=None, caller=subprocess.call): |
450 | 126 | return line.split('=')[1].strip('"\' ') | 439 | """Return a callable that can be used to run ssh shell commands. |
451 | 127 | 440 | ||
452 | 128 | 441 | The ssh `location` and, optionally, `user` must be given. | |
453 | 129 | def script_name(): | 442 | If the user is None then the current user is used for the connection. |
454 | 130 | return os.path.basename(sys.argv[0]) | 443 | |
455 | 444 | The callable internally uses the given `caller`:: | ||
456 | 445 | |||
457 | 446 | >>> def caller(cmd): | ||
458 | 447 | ... print tuple(cmd) | ||
459 | 448 | >>> sshcall = ssh('example.com', 'myuser', caller=caller) | ||
460 | 449 | >>> root_sshcall = ssh('example.com', caller=caller) | ||
461 | 450 | >>> sshcall('ls -l') # doctest: +ELLIPSIS | ||
462 | 451 | ('ssh', '-t', ..., 'myuser@example.com', '--', 'ls -l') | ||
463 | 452 | >>> root_sshcall('ls -l') # doctest: +ELLIPSIS | ||
464 | 453 | ('ssh', '-t', ..., 'example.com', '--', 'ls -l') | ||
465 | 454 | |||
466 | 455 | The ssh key path can be optionally provided:: | ||
467 | 456 | |||
468 | 457 | >>> root_sshcall = ssh('example.com', key='/tmp/foo', caller=caller) | ||
469 | 458 | >>> root_sshcall('ls -l') # doctest: +ELLIPSIS | ||
470 | 459 | ('ssh', '-t', ..., '-i', '/tmp/foo', 'example.com', '--', 'ls -l') | ||
471 | 460 | |||
472 | 461 | If the ssh command exits with an error code, | ||
473 | 462 | a `subprocess.CalledProcessError` is raised:: | ||
474 | 463 | |||
475 | 464 | >>> ssh('loc', caller=lambda cmd: 1)('ls -l') # doctest: +ELLIPSIS | ||
476 | 465 | Traceback (most recent call last): | ||
477 | 466 | CalledProcessError: ... | ||
478 | 467 | |||
479 | 468 | If ignore_errors is set to True when executing the command, no error | ||
480 | 469 | will be raised, even if the command itself returns an error code. | ||
481 | 470 | |||
482 | 471 | >>> sshcall = ssh('loc', caller=lambda cmd: 1) | ||
483 | 472 | >>> sshcall('ls -l', ignore_errors=True) | ||
484 | 473 | """ | ||
485 | 474 | sshcmd = [ | ||
486 | 475 | 'ssh', | ||
487 | 476 | '-t', | ||
488 | 477 | '-t', # Yes, this second -t is deliberate. See `man ssh`. | ||
489 | 478 | '-o', 'StrictHostKeyChecking=no', | ||
490 | 479 | '-o', 'UserKnownHostsFile=/dev/null', | ||
491 | 480 | ] | ||
492 | 481 | if key is not None: | ||
493 | 482 | sshcmd.extend(['-i', key]) | ||
494 | 483 | if user is not None: | ||
495 | 484 | location = '{}@{}'.format(user, location) | ||
496 | 485 | sshcmd.extend([location, '--']) | ||
497 | 486 | |||
498 | 487 | def _sshcall(cmd, ignore_errors=False): | ||
499 | 488 | command = sshcmd + [cmd] | ||
500 | 489 | retcode = caller(command) | ||
501 | 490 | if retcode and not ignore_errors: | ||
502 | 491 | raise subprocess.CalledProcessError(retcode, ' '.join(command)) | ||
503 | 492 | |||
504 | 493 | return _sshcall | ||
505 | 494 | |||
506 | 495 | |||
507 | 496 | @contextmanager | ||
508 | 497 | def su(user): | ||
509 | 498 | """A context manager to temporarily run the script as a different user.""" | ||
510 | 499 | uid, gid = get_user_ids(user) | ||
511 | 500 | os.setegid(gid) | ||
512 | 501 | os.seteuid(uid) | ||
513 | 502 | home = get_user_home(user) | ||
514 | 503 | with environ(HOME=home): | ||
515 | 504 | try: | ||
516 | 505 | yield Env(uid, gid, home) | ||
517 | 506 | finally: | ||
518 | 507 | os.setegid(os.getgid()) | ||
519 | 508 | os.seteuid(os.getuid()) | ||
520 | 509 | |||
521 | 510 | |||
522 | 511 | def user_exists(username): | ||
523 | 512 | """Return True if given `username` exists, e.g.:: | ||
524 | 513 | |||
525 | 514 | >>> user_exists('root') | ||
526 | 515 | True | ||
527 | 516 | >>> user_exists('_this_user_does_not_exist_') | ||
528 | 517 | False | ||
529 | 518 | """ | ||
530 | 519 | try: | ||
531 | 520 | pwd.getpwnam(username) | ||
532 | 521 | except KeyError: | ||
533 | 522 | return False | ||
534 | 523 | return True | ||
535 | 131 | 524 | ||
536 | 132 | 525 | ||
537 | 133 | def wait_for_page_contents(url, contents, timeout=120, validate=None): | 526 | def wait_for_page_contents(url, contents, timeout=120, validate=None): |
538 | @@ -148,83 +541,6 @@ | |||
539 | 148 | time.sleep(0.1) | 541 | time.sleep(0.1) |
540 | 149 | 542 | ||
541 | 150 | 543 | ||
542 | 151 | class Serializer: | ||
543 | 152 | """Handle JSON (de)serialization.""" | ||
544 | 153 | |||
545 | 154 | def __init__(self, path, default=None, serialize=None, deserialize=None): | ||
546 | 155 | self.path = path | ||
547 | 156 | self.default = default or {} | ||
548 | 157 | self.serialize = serialize or json.dump | ||
549 | 158 | self.deserialize = deserialize or json.load | ||
550 | 159 | |||
551 | 160 | def exists(self): | ||
552 | 161 | return os.path.exists(self.path) | ||
553 | 162 | |||
554 | 163 | def get(self): | ||
555 | 164 | if self.exists(): | ||
556 | 165 | with open(self.path) as f: | ||
557 | 166 | return self.deserialize(f) | ||
558 | 167 | return self.default | ||
559 | 168 | |||
560 | 169 | def set(self, data): | ||
561 | 170 | with open(self.path, 'w') as f: | ||
562 | 171 | self.serialize(data, f) | ||
563 | 172 | |||
564 | 173 | |||
565 | 174 | def get_user_ids(user): | ||
566 | 175 | """Return the uid and gid of given `user`, e.g.:: | ||
567 | 176 | |||
568 | 177 | >>> get_user_ids('root') | ||
569 | 178 | (0, 0) | ||
570 | 179 | """ | ||
571 | 180 | userdata = pwd.getpwnam(user) | ||
572 | 181 | return userdata.pw_uid, userdata.pw_gid | ||
573 | 182 | |||
574 | 183 | |||
575 | 184 | def get_user_home(user): | ||
576 | 185 | """Return the home directory of the given `user`. | ||
577 | 186 | |||
578 | 187 | >>> get_user_home('root') | ||
579 | 188 | '/root' | ||
580 | 189 | """ | ||
581 | 190 | return pwd.getpwnam(user).pw_dir | ||
582 | 191 | |||
583 | 192 | |||
584 | 193 | @contextmanager | ||
585 | 194 | def cd(directory): | ||
586 | 195 | """A context manager to temporary change current working dir, e.g.:: | ||
587 | 196 | |||
588 | 197 | >>> import os | ||
589 | 198 | >>> os.chdir('/tmp') | ||
590 | 199 | >>> with cd('/bin'): print os.getcwd() | ||
591 | 200 | /bin | ||
592 | 201 | >>> os.getcwd() | ||
593 | 202 | '/tmp' | ||
594 | 203 | """ | ||
595 | 204 | cwd = os.getcwd() | ||
596 | 205 | os.chdir(directory) | ||
597 | 206 | yield | ||
598 | 207 | os.chdir(cwd) | ||
599 | 208 | |||
600 | 209 | |||
601 | 210 | @contextmanager | ||
602 | 211 | def su(user): | ||
603 | 212 | """A context manager to temporary run the script as a different user.""" | ||
604 | 213 | uid, gid = get_user_ids(user) | ||
605 | 214 | os.setegid(gid) | ||
606 | 215 | os.seteuid(uid) | ||
607 | 216 | current_home = os.getenv('HOME') | ||
608 | 217 | home = get_user_home(user) | ||
609 | 218 | os.environ['HOME'] = home | ||
610 | 219 | try: | ||
611 | 220 | yield Env(uid, gid, home) | ||
612 | 221 | finally: | ||
613 | 222 | os.setegid(os.getgid()) | ||
614 | 223 | os.seteuid(os.getuid()) | ||
615 | 224 | if current_home is not None: | ||
616 | 225 | os.environ['HOME'] = current_home | ||
617 | 226 | |||
618 | 227 | |||
619 | 228 | class DictDiffer: | 544 | class DictDiffer: |
620 | 229 | """ | 545 | """ |
621 | 230 | Calculate the difference between two dictionaries as: | 546 | Calculate the difference between two dictionaries as: |
622 | 231 | 547 | ||
623 | === modified file 'tests.py' | |||
624 | --- tests.py 2012-03-02 13:58:36 +0000 | |||
625 | +++ tests.py 2012-03-07 10:19:06 +0000 | |||
626 | @@ -6,44 +6,42 @@ | |||
627 | 6 | __metaclass__ = type | 6 | __metaclass__ = type |
628 | 7 | 7 | ||
629 | 8 | 8 | ||
630 | 9 | import getpass | ||
631 | 9 | import os | 10 | import os |
632 | 10 | from subprocess import CalledProcessError | 11 | from subprocess import CalledProcessError |
633 | 12 | import tempfile | ||
634 | 11 | import unittest | 13 | import unittest |
635 | 12 | 14 | ||
636 | 13 | from shelltoolbox import ( | 15 | from shelltoolbox import ( |
637 | 14 | cd, | 16 | cd, |
638 | 15 | command, | 17 | command, |
639 | 16 | DictDiffer, | 18 | DictDiffer, |
640 | 19 | environ, | ||
641 | 20 | file_append, | ||
642 | 21 | file_prepend, | ||
643 | 22 | generate_ssh_keys, | ||
644 | 23 | get_su_command, | ||
645 | 24 | get_user_home, | ||
646 | 25 | get_user_ids, | ||
647 | 26 | join_command, | ||
648 | 27 | mkdirs, | ||
649 | 17 | run, | 28 | run, |
650 | 29 | search_file, | ||
651 | 30 | Serializer, | ||
652 | 31 | ssh, | ||
653 | 18 | su, | 32 | su, |
654 | 33 | user_exists, | ||
655 | 19 | ) | 34 | ) |
656 | 20 | 35 | ||
657 | 21 | 36 | ||
683 | 22 | class TestRun(unittest.TestCase): | 37 | class TestCdContextManager(unittest.TestCase): |
684 | 23 | 38 | ||
685 | 24 | def testSimpleCommand(self): | 39 | def test_cd(self): |
686 | 25 | # Running a simple command (ls) works and running the command | 40 | curdir = os.getcwd() |
687 | 26 | # produces a string. | 41 | self.assertNotEqual('/var', curdir) |
688 | 27 | self.assertIsInstance(run('/bin/ls'), str) | 42 | with cd('/var'): |
689 | 28 | 43 | self.assertEqual('/var', os.getcwd()) | |
690 | 29 | def testStdoutReturned(self): | 44 | self.assertEqual(curdir, os.getcwd()) |
666 | 30 | # Running a simple command (ls) works and running the command | ||
667 | 31 | # produces a string. | ||
668 | 32 | self.assertIn('Usage:', run('/bin/ls', '--help')) | ||
669 | 33 | |||
670 | 34 | def testCalledProcessErrorRaised(self): | ||
671 | 35 | # If an error occurs a CalledProcessError is raised with the return | ||
672 | 36 | # code, command executed, and the output of the command. | ||
673 | 37 | with self.assertRaises(CalledProcessError) as info: | ||
674 | 38 | run('ls', '--not a valid switch') | ||
675 | 39 | exception = info.exception | ||
676 | 40 | self.assertEqual(2, exception.returncode) | ||
677 | 41 | self.assertEqual("['ls', '--not a valid switch']", exception.cmd) | ||
678 | 42 | self.assertIn('unrecognized option', exception.output) | ||
679 | 43 | |||
680 | 44 | def testNoneArguments(self): | ||
681 | 45 | # Ensure None is ignored when passed as positional argument. | ||
682 | 46 | self.assertIn('Usage:', run('/bin/ls', None, '--help', None)) | ||
691 | 47 | 45 | ||
692 | 48 | 46 | ||
693 | 49 | class TestCommand(unittest.TestCase): | 47 | class TestCommand(unittest.TestCase): |
694 | @@ -112,6 +110,349 @@ | |||
695 | 112 | self.assertEquals(expected, diff.added_or_changed) | 110 | self.assertEquals(expected, diff.added_or_changed) |
696 | 113 | 111 | ||
697 | 114 | 112 | ||
698 | 113 | class TestEnviron(unittest.TestCase): | ||
699 | 114 | |||
700 | 115 | def test_existing(self): | ||
701 | 116 | # If an existing environment variable is changed, it is | ||
702 | 117 | # restored during context cleanup. | ||
703 | 118 | os.environ['MY_VARIABLE'] = 'foo' | ||
704 | 119 | with environ(MY_VARIABLE='bar'): | ||
705 | 120 | self.assertEqual('bar', os.getenv('MY_VARIABLE')) | ||
706 | 121 | self.assertEqual('foo', os.getenv('MY_VARIABLE')) | ||
707 | 122 | del os.environ['MY_VARIABLE'] | ||
708 | 123 | |||
709 | 124 | def test_new(self): | ||
710 | 125 | # If a new environment variable is added, it is removed during | ||
711 | 126 | # context cleanup. | ||
712 | 127 | with environ(MY_VAR1='foo', MY_VAR2='bar'): | ||
713 | 128 | self.assertEqual('foo', os.getenv('MY_VAR1')) | ||
714 | 129 | self.assertEqual('bar', os.getenv('MY_VAR2')) | ||
715 | 130 | self.assertIsNone(os.getenv('MY_VAR1')) | ||
716 | 131 | self.assertIsNone(os.getenv('MY_VAR2')) | ||
717 | 132 | |||
718 | 133 | |||
719 | 134 | class BaseCreateFile(object): | ||
720 | 135 | |||
721 | 136 | def create_file(self, content): | ||
722 | 137 | f = tempfile.NamedTemporaryFile('w', delete=False) | ||
723 | 138 | f.write(content) | ||
724 | 139 | f.close() | ||
725 | 140 | return f | ||
726 | 141 | |||
727 | 142 | |||
728 | 143 | class BaseTestFile(BaseCreateFile): | ||
729 | 144 | |||
730 | 145 | base_content = 'line1\n' | ||
731 | 146 | new_content = 'new line\n' | ||
732 | 147 | |||
733 | 148 | def check_file_content(self, content, filename): | ||
734 | 149 | self.assertEqual(content, open(filename).read()) | ||
735 | 150 | |||
736 | 151 | |||
737 | 152 | class TestFileAppend(unittest.TestCase, BaseTestFile): | ||
738 | 153 | |||
739 | 154 | def test_append(self): | ||
740 | 155 | # Ensure the new line is correctly added at the end of the file. | ||
741 | 156 | f = self.create_file(self.base_content) | ||
742 | 157 | file_append(f.name, self.new_content) | ||
743 | 158 | self.check_file_content(self.base_content + self.new_content, f.name) | ||
744 | 159 | |||
745 | 160 | def test_existing_content(self): | ||
746 | 161 | # Ensure nothing happens if the file already contains the given line. | ||
747 | 162 | content = self.base_content + self.new_content | ||
748 | 163 | f = self.create_file(content) | ||
749 | 164 | file_append(f.name, self.new_content) | ||
750 | 165 | self.check_file_content(content, f.name) | ||
751 | 166 | |||
752 | 167 | def test_new_line_in_file_contents(self): | ||
753 | 168 | # A new line is automatically added before the given content if it | ||
754 | 169 | # is not present at the end of current file. | ||
755 | 170 | f = self.create_file(self.base_content.strip()) | ||
756 | 171 | file_append(f.name, self.new_content) | ||
757 | 172 | self.check_file_content(self.base_content + self.new_content, f.name) | ||
758 | 173 | |||
759 | 174 | def test_new_line_in_given_line(self): | ||
760 | 175 | # A new line is automatically added to the given line if not present. | ||
761 | 176 | f = self.create_file(self.base_content) | ||
762 | 177 | file_append(f.name, self.new_content.strip()) | ||
763 | 178 | self.check_file_content(self.base_content + self.new_content, f.name) | ||
764 | 179 | |||
765 | 180 | def test_non_existent_file(self): | ||
766 | 181 | # Ensure the file is created if it does not exist. | ||
767 | 182 | filename = tempfile.mktemp() | ||
768 | 183 | file_append(filename, self.base_content) | ||
769 | 184 | self.check_file_content(self.base_content, filename) | ||
770 | 185 | |||
771 | 186 | def test_fragment(self): | ||
772 | 187 | # Ensure a line fragment is not matched. | ||
773 | 188 | f = self.create_file(self.base_content) | ||
774 | 189 | fragment = self.base_content[2:] | ||
775 | 190 | file_append(f.name, fragment) | ||
776 | 191 | self.check_file_content(self.base_content + fragment, f.name) | ||
777 | 192 | |||
778 | 193 | |||
779 | 194 | class TestFilePrepend(unittest.TestCase, BaseTestFile): | ||
780 | 195 | |||
781 | 196 | def test_prpend(self): | ||
782 | 197 | # Ensure the new content is correctly prepended at the beginning of | ||
783 | 198 | # the file. | ||
784 | 199 | f = self.create_file(self.base_content) | ||
785 | 200 | file_prepend(f.name, self.new_content) | ||
786 | 201 | self.check_file_content(self.new_content + self.base_content, f.name) | ||
787 | 202 | |||
788 | 203 | def test_existing_content(self): | ||
789 | 204 | # Ensure nothing happens if the file already starts with the given | ||
790 | 205 | # content. | ||
791 | 206 | content = self.base_content + self.new_content | ||
792 | 207 | f = self.create_file(content) | ||
793 | 208 | file_prepend(f.name, self.base_content) | ||
794 | 209 | self.check_file_content(content, f.name) | ||
795 | 210 | |||
796 | 211 | def test_move_content(self): | ||
797 | 212 | # If the file contains the given content, but not at the beginning, | ||
798 | 213 | # the content is moved on top. | ||
799 | 214 | f = self.create_file(self.base_content + self.new_content) | ||
800 | 215 | file_prepend(f.name, self.new_content) | ||
801 | 216 | self.check_file_content(self.new_content + self.base_content, f.name) | ||
802 | 217 | |||
803 | 218 | def test_new_line_in_given_line(self): | ||
804 | 219 | # A new line is automatically added to the given line if not present. | ||
805 | 220 | f = self.create_file(self.base_content) | ||
806 | 221 | file_prepend(f.name, self.new_content.strip()) | ||
807 | 222 | self.check_file_content(self.new_content + self.base_content, f.name) | ||
808 | 223 | |||
809 | 224 | |||
810 | 225 | class TestGenerateSSHKeys(unittest.TestCase): | ||
811 | 226 | |||
812 | 227 | def test_generation(self): | ||
813 | 228 | # Ensure ssh keys are correctly generated. | ||
814 | 229 | filename = tempfile.mktemp() | ||
815 | 230 | generate_ssh_keys(filename) | ||
816 | 231 | first_line = open(filename).readlines()[0].strip() | ||
817 | 232 | self.assertEqual('-----BEGIN RSA PRIVATE KEY-----', first_line) | ||
818 | 233 | pub_content = open(filename + '.pub').read() | ||
819 | 234 | self.assertTrue(pub_content.startswith('ssh-rsa')) | ||
820 | 235 | |||
821 | 236 | |||
822 | 237 | class TestGetSuCommand(unittest.TestCase): | ||
823 | 238 | |||
824 | 239 | def test_current_user(self): | ||
825 | 240 | # If the su is requested as current user, the arguments are | ||
826 | 241 | # returned as given. | ||
827 | 242 | cmd = ('ls', '-l') | ||
828 | 243 | command = get_su_command(getpass.getuser(), cmd) | ||
829 | 244 | self.assertSequenceEqual(cmd, command) | ||
830 | 245 | |||
831 | 246 | def test_another_user(self): | ||
832 | 247 | # Ensure "su" is prepended and arguments are correctly quoted. | ||
833 | 248 | command = get_su_command('nobody', ('ls', '-l', 'my file')) | ||
834 | 249 | self.assertSequenceEqual( | ||
835 | 250 | ('su', 'nobody', '-c', "ls -l 'my file'"), command) | ||
836 | 251 | |||
837 | 252 | |||
838 | 253 | class TestGetUserHome(unittest.TestCase): | ||
839 | 254 | |||
840 | 255 | def test_existent(self): | ||
841 | 256 | # Ensure the real home directory is returned for existing users. | ||
842 | 257 | self.assertEqual('/root', get_user_home('root')) | ||
843 | 258 | |||
844 | 259 | def test_non_existent(self): | ||
845 | 260 | # If the user does not exist, return a default /home/[username] home. | ||
846 | 261 | user = '_this_user_does_not_exist_' | ||
847 | 262 | self.assertEqual('/home/' + user, get_user_home(user)) | ||
848 | 263 | |||
849 | 264 | |||
850 | 265 | class TestGetUserIds(unittest.TestCase): | ||
851 | 266 | |||
852 | 267 | def test_get_user_ids(self): | ||
853 | 268 | # Ensure the correct uid and gid are returned. | ||
854 | 269 | uid, gid = get_user_ids('root') | ||
855 | 270 | self.assertEqual(0, uid) | ||
856 | 271 | self.assertEqual(0, gid) | ||
857 | 272 | |||
858 | 273 | |||
859 | 274 | class TestJoinCommand(unittest.TestCase): | ||
860 | 275 | |||
861 | 276 | def test_normal(self): | ||
862 | 277 | # Ensure a normal command is correctly parsed. | ||
863 | 278 | command = 'ls -l' | ||
864 | 279 | self.assertEqual(command, join_command(command.split())) | ||
865 | 280 | |||
866 | 281 | def test_containing_spaces(self): | ||
867 | 282 | # Ensure args containing spaces are correctly quoted. | ||
868 | 283 | args = ('command', 'arg containig spaces') | ||
869 | 284 | self.assertEqual("command 'arg containig spaces'", join_command(args)) | ||
870 | 285 | |||
871 | 286 | def test_empty(self): | ||
872 | 287 | # Ensure empty args are correctly quoted. | ||
873 | 288 | args = ('command', '') | ||
874 | 289 | self.assertEqual("command ''", join_command(args)) | ||
875 | 290 | |||
876 | 291 | |||
877 | 292 | class TestMkdirs(unittest.TestCase): | ||
878 | 293 | |||
879 | 294 | def test_intermediate_dirs(self): | ||
880 | 295 | # Ensure the leaf directory and all intermediate ones are created. | ||
881 | 296 | base_dir = tempfile.mktemp(suffix='/') | ||
882 | 297 | dir1 = tempfile.mktemp(prefix=base_dir) | ||
883 | 298 | dir2 = tempfile.mktemp(prefix=base_dir) | ||
884 | 299 | mkdirs(dir1, dir2) | ||
885 | 300 | self.assertTrue(os.path.isdir(dir1)) | ||
886 | 301 | self.assertTrue(os.path.isdir(dir2)) | ||
887 | 302 | |||
888 | 303 | def test_existing_dir(self): | ||
889 | 304 | # If the leaf directory already exists the function returns | ||
890 | 305 | # without errors. | ||
891 | 306 | mkdirs('/tmp') | ||
892 | 307 | |||
893 | 308 | def test_existing_file(self): | ||
894 | 309 | # An `OSError` is raised if the leaf path exists and it is a file. | ||
895 | 310 | f = tempfile.NamedTemporaryFile('w', delete=False) | ||
896 | 311 | f.close() | ||
897 | 312 | with self.assertRaises(OSError): | ||
898 | 313 | mkdirs(f.name) | ||
899 | 314 | |||
900 | 315 | |||
901 | 316 | class TestRun(unittest.TestCase): | ||
902 | 317 | |||
903 | 318 | def testSimpleCommand(self): | ||
904 | 319 | # Running a simple command (ls) works and running the command | ||
905 | 320 | # produces a string. | ||
906 | 321 | self.assertIsInstance(run('/bin/ls'), str) | ||
907 | 322 | |||
908 | 323 | def testStdoutReturned(self): | ||
909 | 324 | # Running a simple command (ls) works and running the command | ||
910 | 325 | # produces a string. | ||
911 | 326 | self.assertIn('Usage:', run('/bin/ls', '--help')) | ||
912 | 327 | |||
913 | 328 | def testCalledProcessErrorRaised(self): | ||
914 | 329 | # If an error occurs a CalledProcessError is raised with the return | ||
915 | 330 | # code, command executed, and the output of the command. | ||
916 | 331 | with self.assertRaises(CalledProcessError) as info: | ||
917 | 332 | run('ls', '--not a valid switch') | ||
918 | 333 | exception = info.exception | ||
919 | 334 | self.assertEqual(2, exception.returncode) | ||
920 | 335 | self.assertEqual("['ls', '--not a valid switch']", exception.cmd) | ||
921 | 336 | self.assertIn('unrecognized option', exception.output) | ||
922 | 337 | |||
923 | 338 | def testNoneArguments(self): | ||
924 | 339 | # Ensure None is ignored when passed as positional argument. | ||
925 | 340 | self.assertIn('Usage:', run('/bin/ls', None, '--help', None)) | ||
926 | 341 | |||
927 | 342 | |||
928 | 343 | class TestSearchFile(unittest.TestCase, BaseCreateFile): | ||
929 | 344 | |||
930 | 345 | content1 = 'content1\n' | ||
931 | 346 | content2 = 'content2\n' | ||
932 | 347 | |||
933 | 348 | def setUp(self): | ||
934 | 349 | self.filename = self.create_file(self.content1 + self.content2).name | ||
935 | 350 | |||
936 | 351 | def tearDown(self): | ||
937 | 352 | os.remove(self.filename) | ||
938 | 353 | |||
939 | 354 | def test_grep(self): | ||
940 | 355 | # Ensure plain text is correctly matched. | ||
941 | 356 | self.assertEqual(self.content2, search_file('ent2', self.filename)) | ||
942 | 357 | self.assertEqual(self.content1, search_file('content', self.filename)) | ||
943 | 358 | |||
944 | 359 | def test_no_match(self): | ||
945 | 360 | # Ensure the function does not return false positives. | ||
946 | 361 | self.assertIsNone(search_file('no_match', self.filename)) | ||
947 | 362 | |||
948 | 363 | def test_regexp(self): | ||
949 | 364 | # Ensure the function works with regular expressions. | ||
950 | 365 | self.assertEqual(self.content2, search_file('\w2', self.filename)) | ||
951 | 366 | |||
952 | 367 | |||
953 | 368 | class TestSerializer(unittest.TestCase): | ||
954 | 369 | |||
955 | 370 | def setUp(self): | ||
956 | 371 | self.path = tempfile.mktemp() | ||
957 | 372 | self.data = {'key': 'value'} | ||
958 | 373 | |||
959 | 374 | def tearDown(self): | ||
960 | 375 | if os.path.exists(self.path): | ||
961 | 376 | os.remove(self.path) | ||
962 | 377 | |||
963 | 378 | def test_serializer(self): | ||
964 | 379 | # Ensure data is correctly serializied and deserialized. | ||
965 | 380 | s = Serializer(self.path) | ||
966 | 381 | s.set(self.data) | ||
967 | 382 | self.assertEqual(self.data, s.get()) | ||
968 | 383 | |||
969 | 384 | def test_existence(self): | ||
970 | 385 | # Ensure the file is created only when needed. | ||
971 | 386 | s = Serializer(self.path) | ||
972 | 387 | self.assertFalse(s.exists()) | ||
973 | 388 | s.set(self.data) | ||
974 | 389 | self.assertTrue(s.exists()) | ||
975 | 390 | |||
976 | 391 | def test_default_value(self): | ||
977 | 392 | # If the file does not exist, the serializer returns a default value. | ||
978 | 393 | s = Serializer(self.path) | ||
979 | 394 | self.assertEqual({}, s.get()) | ||
980 | 395 | s = Serializer(self.path, default=47) | ||
981 | 396 | self.assertEqual(47, s.get()) | ||
982 | 397 | |||
983 | 398 | def test_another_serializer(self): | ||
984 | 399 | # It is possible to use a custom serializer (e.g. pickle). | ||
985 | 400 | import pickle | ||
986 | 401 | s = Serializer( | ||
987 | 402 | self.path, serialize=pickle.dump, deserialize=pickle.load) | ||
988 | 403 | s.set(self.data) | ||
989 | 404 | self.assertEqual(self.data, s.get()) | ||
990 | 405 | |||
991 | 406 | |||
992 | 407 | class TestSSH(unittest.TestCase): | ||
993 | 408 | |||
994 | 409 | def setUp(self): | ||
995 | 410 | self.last_command = None | ||
996 | 411 | |||
997 | 412 | def remove_command_options(self, cmd): | ||
998 | 413 | cmd = list(cmd) | ||
999 | 414 | del cmd[1:7] | ||
1000 | 415 | return cmd | ||
1001 | 416 | |||
1002 | 417 | def caller(self, cmd): | ||
1003 | 418 | self.last_command = self.remove_command_options(cmd) | ||
1004 | 419 | |||
1005 | 420 | def check_last_command(self, expected): | ||
1006 | 421 | self.assertSequenceEqual(expected, self.last_command) | ||
1007 | 422 | |||
1008 | 423 | def test_current_user(self): | ||
1009 | 424 | # Ensure ssh command is correctly generated for current user. | ||
1010 | 425 | sshcall = ssh('example.com', caller=self.caller) | ||
1011 | 426 | sshcall('ls -l') | ||
1012 | 427 | self.check_last_command(['ssh', 'example.com', '--', 'ls -l']) | ||
1013 | 428 | |||
1014 | 429 | def test_another_user(self): | ||
1015 | 430 | # Ensure ssh command is correctly generated for a different user. | ||
1016 | 431 | sshcall = ssh('example.com', 'myuser', caller=self.caller) | ||
1017 | 432 | sshcall('ls -l') | ||
1018 | 433 | self.check_last_command(['ssh', 'myuser@example.com', '--', 'ls -l']) | ||
1019 | 434 | |||
1020 | 435 | def test_ssh_key(self): | ||
1021 | 436 | # The ssh key path can be optionally provided. | ||
1022 | 437 | sshcall = ssh('example.com', key='/tmp/foo', caller=self.caller) | ||
1023 | 438 | sshcall('ls -l') | ||
1024 | 439 | self.check_last_command([ | ||
1025 | 440 | 'ssh', '-i', '/tmp/foo', 'example.com', '--', 'ls -l']) | ||
1026 | 441 | |||
1027 | 442 | def test_error(self): | ||
1028 | 443 | # If the ssh command exits with an error code, a | ||
1029 | 444 | # `subprocess.CalledProcessError` is raised. | ||
1030 | 445 | sshcall = ssh('example.com', caller=lambda cmd: 1) | ||
1031 | 446 | with self.assertRaises(CalledProcessError): | ||
1032 | 447 | sshcall('ls -l') | ||
1033 | 448 | |||
1034 | 449 | def test_ignore_errors(self): | ||
1035 | 450 | # If ignore_errors is set to True when executing the command, no error | ||
1036 | 451 | # will be raised, even if the command itself returns an error code. | ||
1037 | 452 | sshcall = ssh('example.com', caller=lambda cmd: 1) | ||
1038 | 453 | sshcall('ls -l', ignore_errors=True) | ||
1039 | 454 | |||
1040 | 455 | |||
1041 | 115 | current_euid = os.geteuid() | 456 | current_euid = os.geteuid() |
1042 | 116 | current_egid = os.getegid() | 457 | current_egid = os.getegid() |
1043 | 117 | current_home = os.environ['HOME'] | 458 | current_home = os.environ['HOME'] |
1044 | @@ -180,13 +521,11 @@ | |||
1045 | 180 | self.assertEqual(current_home, os.environ['HOME']) | 521 | self.assertEqual(current_home, os.environ['HOME']) |
1046 | 181 | 522 | ||
1047 | 182 | 523 | ||
1055 | 183 | class TestCdContextManager(unittest.TestCase): | 524 | class TestUserExists(unittest.TestCase): |
1056 | 184 | def test_cd(self): | 525 | |
1057 | 185 | curdir = os.getcwd() | 526 | def test_user_exists(self): |
1058 | 186 | self.assertNotEqual('/var', curdir) | 527 | self.assertTrue(user_exists('root')) |
1059 | 187 | with cd('/var'): | 528 | self.assertFalse(user_exists('_this_user_does_not_exist_')) |
1053 | 188 | self.assertEqual('/var', os.getcwd()) | ||
1054 | 189 | self.assertEqual(curdir, os.getcwd()) | ||
1060 | 190 | 529 | ||
1061 | 191 | 530 | ||
1062 | 192 | if __name__ == '__main__': | 531 | if __name__ == '__main__': |
Thank you! Small comments and suggestions follow.
I asked about reordering the functions for alphabetization. Not having run at the top is somewhat surprising to me, but I am ok with it.
I was struck that file_append really expects the line to be added to end with a \n. I suppose we could enforce that one way or another...or not. I'm OK with leaving as is, or you could add a \n if the line does not already end with one (and do this before you check if the line is in the content). On a somewhat related note, it might be better to check if the line to be added is in the output of readlines(), not read(); that would verify that the line, not a line that ends with the line, is in the file. For example, right now a file of "abc\ndef\n" would incorrectly show that the "ef\n" line was in the file.
In file_prepend, I am tempted to suggest that this snippet should be changed:
lines. remove( line)
lines. remove( line)
pass
if line in lines:
This iterates through the lines twice, which is mildly annoying to me but almost certainly not a practical concern for us with these use cases. That said, I'd be tempted to change that to the following:
try:
except ValueError:
You could also verify that the line to be added in this function ended with a \n if you wanted to.
I don't really understand this part of the docstring for get_su_command: su_command( ...)). Maybe an example would help.
This can be used together with `run` when the `su` context manager is not
enough (e.g. an external program uses uid rather than euid).
Oh! You mean that you would use run(*get_
I'm a bit concerned about what you are doing here in get_value_ from_line: '=')[1] .strip( '"\' ')
return line.split(
would maybe json be a safer approach? No, I tried it, and json.loads('"hi"') is fine but json.loads("'hi'") is not. :-/ I dunno. When do we use this? The logic seems a bit idiosyncratic.
The grep command also seems a bit idiosyncratic. Why is it reasonable to always strip the result? I also would be tempted to rename the funtion: grep does a lot more than that. simply search_file? Hm, I see that you just reordered this file, you didn't actually add grep.... Never mind then, I guess. I should have commented on this sooner, but you shouldn't have to worry about it. If you *do* want to change it, please leave the "grep" alias around for now so that our charms do not suddenly break.
I don't understand why you removed the Serializer. Was it defined twice?
I thought the choices you made for doc examples and for unit tests were good. Thank you.
Great job!
Gary