Merge lp:~frankban/python-shelltoolbox/helpers into lp:python-shelltoolbox
Status: | Merged |
---|---|
Merged at revision: | 14 |
Proposed branch: | lp:~frankban/python-shelltoolbox/helpers |
Merge into: | lp:python-shelltoolbox |
Diff against target: | 1062 lines (+802/-147), 3 files modified: setup.py (+1/-1), shelltoolbox/__init__.py (+430/-114), tests.py (+371/-32) |
To merge this branch: | bzr merge lp:~frankban/python-shelltoolbox/helpers |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Gary Poster (community) | Approve | ||
Review via email: mp+96180@code.launchpad.net |
Commit message
Description of the change
== Changes ==
- Added several helper functions:
- bzr_whois
- file_append
- file_prepend
- generate_ssh_keys
- get_su_command
- join_command
- mkdirs
- ssh
- user_exists
- Added `environ` context manager and updated `su` to use it.
- Changed `apt_get_install` helper: now the function uses environ to run dpkg
in non-interactive mode.
- Changed `get_user_home` to handle non-existent users
(returning a default home directory)
- Updated the `install_
placeholders.
- Updated `cd` context manager: yield in a try/finally block.
- Fixed `grep` helper.
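The `environ` and `cd` changes listed above rely on the same pattern: state is restored in a `finally` clause, so cleanup runs even when the body of the `with` block raises. A minimal sketch of that pattern follows. It is written in Python 3 syntax (the branch itself targets Python 2), and `temp_environ`, `demo` and the `SHELLTOOLBOX_DEMO` variable are hypothetical names used only for illustration, not the branch's code.

```python
import os
from contextlib import contextmanager


@contextmanager
def temp_environ(**kwargs):
    """Temporarily set environment variables, restoring them on exit."""
    # Remember the previous value of each variable (None if it was unset).
    backup = {key: os.environ.get(key) for key in kwargs}
    os.environ.update(kwargs)
    try:
        yield
    finally:
        # This block runs even if the body of the `with` statement raises.
        for key, value in backup.items():
            if value is None:
                os.environ.pop(key, None)
            else:
                os.environ[key] = value


def demo():
    """Return the variable's value inside the block and after an error."""
    os.environ.pop('SHELLTOOLBOX_DEMO', None)
    try:
        with temp_environ(SHELLTOOLBOX_DEMO='1'):
            inside = os.environ.get('SHELLTOOLBOX_DEMO')
            raise RuntimeError('simulated failure')
    except RuntimeError:
        pass
    return inside, os.environ.get('SHELLTOOLBOX_DEMO')
```

Without the `try`/`finally`, the simulated failure in `demo` would leave the variable set after the block exits.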
== Tests ==
python tests.py
.......
-------
Ran 53 tests in 0.124s
OK
Francesco Banconi (frankban) wrote:
> I was struck that file_append really expects the line to be added to end with
> a \n. I suppose we could enforce that one way or another...or not. I'm OK
> with leaving as is, or you could add a \n if the line does not already end
> with one (and do this before you check if the line is in the content). On a
> somewhat related note, it might be better to check if the line to be added is
> in the output of readlines(), not read(); that would verify that the line, not
> a line that ends with the line, is in the file. For example, right now a file
> of "abc\ndef\n" would incorrectly show that the "ef\n" line was in the file.
I agree with your suggestions. I've changed the function and removed those
ambiguities. I've added tests for line not ending with '\n' and for line
fragments.
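The fragment problem raised in the review can be reproduced in a few lines. This is an illustrative sketch in Python 3 syntax, not code from the branch; `io.StringIO` stands in for the opened file.

```python
import io

content = 'abc\ndef\n'
line = 'ef\n'

# Substring test against read(): matches, because 'ef\n' is the tail of 'def\n'.
found_in_read = line in io.StringIO(content).read()

# Membership test against readlines(): only a whole line can match.
found_in_readlines = line in io.StringIO(content).readlines()

print(found_in_read, found_in_readlines)
```

The first check reports a match for a line that is not in the file, which is the false positive the reviewer describes; the second does not.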
> In file_prepend, I am tempted to suggest that this snippet should be changed:
> if line in lines:
> lines.remove(line)
> This iterates through the lines twice, which is mildly annoying to me but
> almost certainly not a practical concern for us with these use cases. That
> said, I'd be tempted to change that to the following:
> try:
> lines.remove(line)
> except ValueError:
> pass
> You could also verify that the line to be added in this function ended with a
> \n if you wanted to.
Done.
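The suggested `try`/`except ValueError` form can be sketched on a plain list of lines. `move_to_top` is a hypothetical helper written for this illustration (Python 3 syntax); it is not the branch's `file_prepend`. The guard for an empty list is an addition made for the sketch, whereas the function in the diff indexes `lines[0]` directly.

```python
def move_to_top(lines, line):
    """Return a copy of `lines` with `line` moved to, or inserted at, the top."""
    lines = list(lines)
    if lines and lines[0] == line:
        # Already the first line: nothing to do.
        return lines
    try:
        # One scan of the list; removes the first occurrence only.
        lines.remove(line)
    except ValueError:
        # The line was not present, so there is nothing to remove.
        pass
    lines.insert(0, line)
    return lines
```

Compared with `if line in lines: lines.remove(line)`, this scans the list once instead of twice when the line is present.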
> I'm a bit concerned about what you are doing here in get_value_
> return line.split(
> would maybe json be a safer approach? No, I tried it, and json.loads('"hi"')
> is fine but json.loads("'hi'") is not. :-/ I dunno. When do we use this?
> The logic seems a bit idiosyncratic.
I don't remember the origin of that function. Since it is not used in the
charms or in setuplxc/lpsetup, I am going to remove it.
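The JSON observation in the review is easy to check: JSON strings must be double-quoted, so a single-quoted value is rejected. A small sketch in Python 3 syntax, where `try_loads` is a hypothetical wrapper used only for this illustration:

```python
import json


def try_loads(text):
    """Return the decoded JSON value, or None if `text` is not valid JSON."""
    try:
        return json.loads(text)
    except ValueError:
        # In Python 3, json.JSONDecodeError is a subclass of ValueError.
        return None


double_quoted = try_loads('"hi"')   # valid JSON string
single_quoted = try_loads("'hi'")   # invalid: JSON requires double quotes
```

This is why JSON parsing would not have been a drop-in replacement for stripping both quote styles.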
> The grep command also seems a bit idiosyncratic. Why is it reasonable to
> always strip the result? I also would be tempted to rename the function: grep
> does a lot more than that. Simply search_file? Hm, I see that you just
> reordered this file, you didn't actually add grep.... Never mind then, I
> guess. I should have commented on this sooner, but you shouldn't have to
> worry about it. If you *do* want to change it, please leave the "grep" alias
> around for now so that our charms do not suddenly break.
Actually that helper is no longer used; as far as I remember, it was only
needed before we changed the way the buildslave is reconfigured. I think it's
safe to rename it and change the returned value.
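In the preview diff, the renamed helper uses `re.search` where `grep` used `re.match`, and returns the line without stripping it. The difference between the two calls is sketched below in Python 3 syntax; the sample line is made up for illustration.

```python
import re

line = 'PermitRootLogin yes\n'

# re.match only succeeds if the pattern matches at the start of the string.
anchored = re.match('yes', line)

# re.search scans the whole string for the first position that matches.
anywhere = re.search('yes', line)

print(anchored is None, anywhere.group(0))
```

With `re.match`, a caller has to write a pattern such as `'.*yes'` to find text in the middle of a line; with `re.search`, the bare pattern is enough.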
> I don't understand why you removed the Serializer. Was it defined twice?
Yes it was.
Thanks Gary.
- 33. By Francesco Banconi: Fixed file_append.
- 34. By Francesco Banconi: Fixed file_prepend.
- 35. By Francesco Banconi: Removed get_value_from_line.
- 36. By Francesco Banconi: s/grep/search_file/
Preview Diff
1 | === modified file 'setup.py' |
2 | --- setup.py 2012-03-01 20:49:48 +0000 |
3 | +++ setup.py 2012-03-07 10:19:06 +0000 |
4 | @@ -9,7 +9,7 @@ |
5 | ez_setup.use_setuptools() |
6 | |
7 | |
8 | -__version__ = '0.1.1' |
9 | +__version__ = '0.2.0' |
10 | |
11 | from setuptools import setup |
12 | |
13 | |
14 | === modified file 'shelltoolbox/__init__.py' |
15 | --- shelltoolbox/__init__.py 2012-03-02 14:59:46 +0000 |
16 | +++ shelltoolbox/__init__.py 2012-03-07 10:19:06 +0000 |
17 | @@ -19,25 +19,38 @@ |
18 | __metaclass__ = type |
19 | __all__ = [ |
20 | 'apt_get_install', |
21 | + 'bzr_whois', |
22 | 'cd', |
23 | 'command', |
24 | 'DictDiffer', |
25 | + 'environ', |
26 | + 'file_append', |
27 | + 'file_prepend', |
28 | + 'generate_ssh_keys', |
29 | + 'get_su_command', |
30 | + 'get_user_home', |
31 | 'get_user_ids', |
32 | - 'get_user_home', |
33 | - 'get_value_from_line', |
34 | - 'grep', |
35 | 'install_extra_repositories', |
36 | + 'join_command', |
37 | + 'mkdirs', |
38 | 'run', |
39 | 'Serializer', |
40 | 'script_name', |
41 | + 'search_file', |
42 | + 'ssh', |
43 | 'su', |
44 | + 'user_exists', |
45 | + 'wait_for_page_contents', |
46 | ] |
47 | |
48 | from collections import namedtuple |
49 | from contextlib import contextmanager |
50 | +from email.Utils import parseaddr |
51 | +import errno |
52 | import json |
53 | import operator |
54 | import os |
55 | +import pipes |
56 | import pwd |
57 | import re |
58 | import subprocess |
59 | @@ -50,26 +63,50 @@ |
60 | Env = namedtuple('Env', 'uid gid home') |
61 | |
62 | |
63 | -def run(*args, **kwargs): |
64 | - """Run the command with the given arguments. |
65 | - |
66 | - The first argument is the path to the command to run. |
67 | - Subsequent arguments are command-line arguments to be passed. |
68 | - |
69 | - This function accepts all optional keyword arguments accepted by |
70 | - `subprocess.Popen`. |
71 | - """ |
72 | - args = [i for i in args if i is not None] |
73 | - pipe = subprocess.PIPE |
74 | - process = subprocess.Popen( |
75 | - args, stdout=kwargs.pop('stdout', pipe), |
76 | - stderr=kwargs.pop('stderr', pipe), |
77 | - close_fds=kwargs.pop('close_fds', True), **kwargs) |
78 | - stdout, stderr = process.communicate() |
79 | - if process.returncode: |
80 | - raise subprocess.CalledProcessError( |
81 | - process.returncode, repr(args), output=stdout+stderr) |
82 | - return stdout |
83 | +def apt_get_install(*args, **kwargs): |
84 | + """Install given packages using apt. |
85 | + |
86 | + It is possible to pass environment variables to be set during install |
87 | + using keyword arguments. |
88 | + |
89 | + :raises: subprocess.CalledProcessError |
90 | + """ |
91 | + debian_frontend = kwargs.pop('DEBIAN_FRONTEND', 'noninteractive') |
92 | + with environ(DEBIAN_FRONTEND=debian_frontend, **kwargs): |
93 | + cmd = ('apt-get', '-y', 'install') + args |
94 | + return run(*cmd) |
95 | + |
96 | + |
97 | +def bzr_whois(user): |
98 | + """Return full name and email of bzr `user`. |
99 | + |
100 | + Return None if the given `user` does not have a bzr user id. |
101 | + """ |
102 | + with su(user): |
103 | + try: |
104 | + whoami = run('bzr', 'whoami') |
105 | + except (subprocess.CalledProcessError, OSError): |
106 | + return None |
107 | + return parseaddr(whoami) |
108 | + |
109 | + |
110 | +@contextmanager |
111 | +def cd(directory): |
112 | + """A context manager to temporarily change current working dir, e.g.:: |
113 | + |
114 | + >>> import os |
115 | + >>> os.chdir('/tmp') |
116 | + >>> with cd('/bin'): print os.getcwd() |
117 | + /bin |
118 | + >>> print os.getcwd() |
119 | + /tmp |
120 | + """ |
121 | + cwd = os.getcwd() |
122 | + os.chdir(directory) |
123 | + try: |
124 | + yield |
125 | + finally: |
126 | + os.chdir(cwd) |
127 | |
128 | |
129 | def command(*base_args): |
130 | @@ -97,37 +134,393 @@ |
131 | return callable_command |
132 | |
133 | |
134 | -apt_get_install = command('apt-get', 'install', '-y', '--force-yes') |
135 | +@contextmanager |
136 | +def environ(**kwargs): |
137 | + """A context manager to temporarily change environment variables. |
138 | + |
139 | + If an existing environment variable is changed, it is restored during |
140 | + context cleanup:: |
141 | + |
142 | + >>> import os |
143 | + >>> os.environ['MY_VARIABLE'] = 'foo' |
144 | + >>> with environ(MY_VARIABLE='bar'): print os.getenv('MY_VARIABLE') |
145 | + bar |
146 | + >>> print os.getenv('MY_VARIABLE') |
147 | + foo |
148 | + >>> del os.environ['MY_VARIABLE'] |
149 | + |
150 | + If we are adding environment variables, they are removed during context |
151 | + cleanup:: |
152 | + |
153 | + >>> import os |
154 | + >>> with environ(MY_VAR1='foo', MY_VAR2='bar'): |
155 | + ... print os.getenv('MY_VAR1'), os.getenv('MY_VAR2') |
156 | + foo bar |
157 | + >>> os.getenv('MY_VAR1') == os.getenv('MY_VAR2') == None |
158 | + True |
159 | + """ |
160 | + backup = {} |
161 | + for key, value in kwargs.items(): |
162 | + backup[key] = os.getenv(key) |
163 | + os.environ[key] = value |
164 | + try: |
165 | + yield |
166 | + finally: |
167 | + for key, value in backup.items(): |
168 | + if value is None: |
169 | + del os.environ[key] |
170 | + else: |
171 | + os.environ[key] = value |
172 | + |
173 | + |
174 | +def file_append(filename, line): |
175 | + r"""Append given `line`, if not present, at the end of `filename`. |
176 | + |
177 | + Usage example:: |
178 | + |
179 | + >>> import tempfile |
180 | + >>> f = tempfile.NamedTemporaryFile('w', delete=False) |
181 | + >>> f.write('line1\n') |
182 | + >>> f.close() |
183 | + >>> file_append(f.name, 'new line\n') |
184 | + >>> open(f.name).read() |
185 | + 'line1\nnew line\n' |
186 | + |
187 | + Nothing happens if the file already contains the given `line`:: |
188 | + |
189 | + >>> file_append(f.name, 'new line\n') |
190 | + >>> open(f.name).read() |
191 | + 'line1\nnew line\n' |
192 | + |
193 | + A new line is automatically added before the given `line` if it is not |
194 | + present at the end of current file content:: |
195 | + |
196 | + >>> import tempfile |
197 | + >>> f = tempfile.NamedTemporaryFile('w', delete=False) |
198 | + >>> f.write('line1') |
199 | + >>> f.close() |
200 | + >>> file_append(f.name, 'new line\n') |
201 | + >>> open(f.name).read() |
202 | + 'line1\nnew line\n' |
203 | + |
204 | + The file is created if it does not exist:: |
205 | + |
206 | + >>> import tempfile |
207 | + >>> filename = tempfile.mktemp() |
208 | + >>> file_append(filename, 'line1\n') |
209 | + >>> open(filename).read() |
210 | + 'line1\n' |
211 | + """ |
212 | + if not line.endswith('\n'): |
213 | + line += '\n' |
214 | + with open(filename, 'a+') as f: |
215 | + lines = f.readlines() |
216 | + if line not in lines: |
217 | + if not lines or lines[-1].endswith('\n'): |
218 | + f.write(line) |
219 | + else: |
220 | + f.write('\n' + line) |
221 | + |
222 | + |
223 | +def file_prepend(filename, line): |
224 | + r"""Insert given `line`, if not present, at the beginning of `filename`. |
225 | + |
226 | + Usage example:: |
227 | + |
228 | + >>> import tempfile |
229 | + >>> f = tempfile.NamedTemporaryFile('w', delete=False) |
230 | + >>> f.write('line1\n') |
231 | + >>> f.close() |
232 | + >>> file_prepend(f.name, 'line0\n') |
233 | + >>> open(f.name).read() |
234 | + 'line0\nline1\n' |
235 | + |
236 | + If the file starts with the given `line`, nothing happens:: |
237 | + |
238 | + >>> file_prepend(f.name, 'line0\n') |
239 | + >>> open(f.name).read() |
240 | + 'line0\nline1\n' |
241 | + |
242 | + If the file contains the given `line`, but not at the beginning, |
243 | + the line is moved on top:: |
244 | + |
245 | + >>> file_prepend(f.name, 'line1\n') |
246 | + >>> open(f.name).read() |
247 | + 'line1\nline0\n' |
248 | + """ |
249 | + if not line.endswith('\n'): |
250 | + line += '\n' |
251 | + with open(filename, 'r+') as f: |
252 | + lines = f.readlines() |
253 | + if lines[0] != line: |
254 | + try: |
255 | + lines.remove(line) |
256 | + except ValueError: |
257 | + pass |
258 | + lines.insert(0, line) |
259 | + f.seek(0) |
260 | + f.writelines(lines) |
261 | + |
262 | + |
263 | +def generate_ssh_keys(path, passphrase=''): |
264 | + """Generate an ssh key pair, saving it at the given `path`. |
265 | + |
266 | + >>> generate_ssh_keys('/tmp/id_rsa') |
267 | + 0 |
268 | + >>> open('/tmp/id_rsa').readlines()[0].strip() |
269 | + '-----BEGIN RSA PRIVATE KEY-----' |
270 | + >>> open('/tmp/id_rsa.pub').read().startswith('ssh-rsa') |
271 | + True |
272 | + >>> os.remove('/tmp/id_rsa') |
273 | + >>> os.remove('/tmp/id_rsa.pub') |
274 | + """ |
275 | + return subprocess.call([ |
276 | + 'ssh-keygen', '-q', '-t', 'rsa', '-N', passphrase, '-f', path]) |
277 | + |
278 | + |
279 | +def get_su_command(user, args): |
280 | + """Return a command line as a sequence, prepending "su" if necessary. |
281 | + |
282 | + This can be used together with `run` when the `su` context manager is not |
283 | + enough (e.g. an external program uses uid rather than euid). |
284 | + |
285 | + run(*get_su_command(user, ['bzr', 'whoami'])) |
286 | + |
287 | + If the su is requested as current user, the arguments are returned as |
288 | + given:: |
289 | + |
290 | + >>> import getpass |
291 | + >>> current_user = getpass.getuser() |
292 | + |
293 | + >>> get_su_command(current_user, ('ls', '-l')) |
294 | + ('ls', '-l') |
295 | + |
296 | + Otherwise, "su" is prepended:: |
297 | + |
298 | + >>> get_su_command('nobody', ('ls', '-l', 'my file')) |
299 | + ('su', 'nobody', '-c', "ls -l 'my file'") |
300 | + """ |
301 | + if get_user_ids(user)[0] != os.getuid(): |
302 | + args = [i for i in args if i is not None] |
303 | + return ('su', user, '-c', join_command(args)) |
304 | + return args |
305 | + |
306 | + |
307 | +def get_user_home(user): |
308 | + """Return the home directory of the given `user`. |
309 | + |
310 | + >>> get_user_home('root') |
311 | + '/root' |
312 | + |
313 | + If the user does not exist, return a default /home/[username] home:: |
314 | + |
315 | + >>> get_user_home('_this_user_does_not_exist_') |
316 | + '/home/_this_user_does_not_exist_' |
317 | + """ |
318 | + try: |
319 | + return pwd.getpwnam(user).pw_dir |
320 | + except KeyError: |
321 | + return os.path.join(os.path.sep, 'home', user) |
322 | + |
323 | + |
324 | +def get_user_ids(user): |
325 | + """Return the uid and gid of given `user`, e.g.:: |
326 | + |
327 | + >>> get_user_ids('root') |
328 | + (0, 0) |
329 | + """ |
330 | + userdata = pwd.getpwnam(user) |
331 | + return userdata.pw_uid, userdata.pw_gid |
332 | |
333 | |
334 | def install_extra_repositories(*repositories): |
335 | """Install all of the extra repositories and update apt. |
336 | |
337 | + Given repositories can contain a "{distribution}" placeholder, that will |
338 | + be replaced by current distribution codename. |
339 | + |
340 | :raises: subprocess.CalledProcessError |
341 | """ |
342 | distribution = run('lsb_release', '-cs').strip() |
343 | - # Starting from Oneiric, the `apt-add-repository` is interactive by |
344 | + # Starting from Oneiric, `apt-add-repository` is interactive by |
345 | # default, and requires a "-y" flag to be set. |
346 | assume_yes = None if distribution == 'lucid' else '-y' |
347 | for repo in repositories: |
348 | - run('apt-add-repository', assume_yes, repo) |
349 | + repository = repo.format(distribution=distribution) |
350 | + run('apt-add-repository', assume_yes, repository) |
351 | run('apt-get', 'clean') |
352 | run('apt-get', 'update') |
353 | |
354 | |
355 | -def grep(content, filename): |
356 | +def join_command(args): |
357 | + """Return a valid Unix command line from `args`. |
358 | + |
359 | + >>> join_command(['ls', '-l']) |
360 | + 'ls -l' |
361 | + |
362 | + Arguments containing spaces and empty args are correctly quoted:: |
363 | + |
364 | + >>> join_command(['command', 'arg1', 'arg containing spaces', '']) |
365 | + "command arg1 'arg containing spaces' ''" |
366 | + """ |
367 | + return ' '.join(pipes.quote(arg) for arg in args) |
368 | + |
369 | + |
370 | +def mkdirs(*args): |
371 | + """Create leaf directories (given as `args`) and all intermediate ones. |
372 | + |
373 | + >>> import tempfile |
374 | + >>> base_dir = tempfile.mktemp(suffix='/') |
375 | + >>> dir1 = tempfile.mktemp(prefix=base_dir) |
376 | + >>> dir2 = tempfile.mktemp(prefix=base_dir) |
377 | + >>> mkdirs(dir1, dir2) |
378 | + >>> os.path.isdir(dir1) |
379 | + True |
380 | + >>> os.path.isdir(dir2) |
381 | + True |
382 | + |
383 | + If the leaf directory already exists the function returns without errors:: |
384 | + |
385 | + >>> mkdirs(dir1) |
386 | + |
387 | + An `OSError` is raised if the leaf path exists and it is a file:: |
388 | + |
389 | + >>> f = tempfile.NamedTemporaryFile( |
390 | + ... 'w', delete=False, prefix=base_dir) |
391 | + >>> f.close() |
392 | + >>> mkdirs(f.name) # doctest: +ELLIPSIS |
393 | + Traceback (most recent call last): |
394 | + OSError: ... |
395 | + """ |
396 | + for directory in args: |
397 | + try: |
398 | + os.makedirs(directory) |
399 | + except OSError as err: |
400 | + if err.errno != errno.EEXIST or os.path.isfile(directory): |
401 | + raise |
402 | + |
403 | + |
404 | +def run(*args, **kwargs): |
405 | + """Run the command with the given arguments. |
406 | + |
407 | + The first argument is the path to the command to run. |
408 | + Subsequent arguments are command-line arguments to be passed. |
409 | + |
410 | + This function accepts all optional keyword arguments accepted by |
411 | + `subprocess.Popen`. |
412 | + """ |
413 | + args = [i for i in args if i is not None] |
414 | + pipe = subprocess.PIPE |
415 | + process = subprocess.Popen( |
416 | + args, stdout=kwargs.pop('stdout', pipe), |
417 | + stderr=kwargs.pop('stderr', pipe), |
418 | + close_fds=kwargs.pop('close_fds', True), **kwargs) |
419 | + stdout, stderr = process.communicate() |
420 | + if process.returncode: |
421 | + raise subprocess.CalledProcessError( |
422 | + process.returncode, repr(args), output=stdout+stderr) |
423 | + return stdout |
424 | + |
425 | + |
426 | +def script_name(): |
427 | + """Return the name of this script.""" |
428 | + return os.path.basename(sys.argv[0]) |
429 | + |
430 | + |
431 | +def search_file(regexp, filename): |
432 | + """Return the first line in `filename` that matches `regexp`.""" |
433 | with open(filename) as f: |
434 | for line in f: |
435 | - if re.match(content, line): |
436 | - return line.strip() |
437 | - |
438 | - |
439 | -def get_value_from_line(line): |
440 | - return line.split('=')[1].strip('"\' ') |
441 | - |
442 | - |
443 | -def script_name(): |
444 | - return os.path.basename(sys.argv[0]) |
445 | + if re.search(regexp, line): |
446 | + return line |
447 | + |
448 | + |
449 | +def ssh(location, user=None, key=None, caller=subprocess.call): |
450 | + """Return a callable that can be used to run ssh shell commands. |
451 | + |
452 | + The ssh `location` and, optionally, `user` must be given. |
453 | + If the user is None then the current user is used for the connection. |
454 | + |
455 | + The callable internally uses the given `caller`:: |
456 | + |
457 | + >>> def caller(cmd): |
458 | + ... print tuple(cmd) |
459 | + >>> sshcall = ssh('example.com', 'myuser', caller=caller) |
460 | + >>> root_sshcall = ssh('example.com', caller=caller) |
461 | + >>> sshcall('ls -l') # doctest: +ELLIPSIS |
462 | + ('ssh', '-t', ..., 'myuser@example.com', '--', 'ls -l') |
463 | + >>> root_sshcall('ls -l') # doctest: +ELLIPSIS |
464 | + ('ssh', '-t', ..., 'example.com', '--', 'ls -l') |
465 | + |
466 | + The ssh key path can be optionally provided:: |
467 | + |
468 | + >>> root_sshcall = ssh('example.com', key='/tmp/foo', caller=caller) |
469 | + >>> root_sshcall('ls -l') # doctest: +ELLIPSIS |
470 | + ('ssh', '-t', ..., '-i', '/tmp/foo', 'example.com', '--', 'ls -l') |
471 | + |
472 | + If the ssh command exits with an error code, |
473 | + a `subprocess.CalledProcessError` is raised:: |
474 | + |
475 | + >>> ssh('loc', caller=lambda cmd: 1)('ls -l') # doctest: +ELLIPSIS |
476 | + Traceback (most recent call last): |
477 | + CalledProcessError: ... |
478 | + |
479 | + If ignore_errors is set to True when executing the command, no error |
480 | + will be raised, even if the command itself returns an error code. |
481 | + |
482 | + >>> sshcall = ssh('loc', caller=lambda cmd: 1) |
483 | + >>> sshcall('ls -l', ignore_errors=True) |
484 | + """ |
485 | + sshcmd = [ |
486 | + 'ssh', |
487 | + '-t', |
488 | + '-t', # Yes, this second -t is deliberate. See `man ssh`. |
489 | + '-o', 'StrictHostKeyChecking=no', |
490 | + '-o', 'UserKnownHostsFile=/dev/null', |
491 | + ] |
492 | + if key is not None: |
493 | + sshcmd.extend(['-i', key]) |
494 | + if user is not None: |
495 | + location = '{}@{}'.format(user, location) |
496 | + sshcmd.extend([location, '--']) |
497 | + |
498 | + def _sshcall(cmd, ignore_errors=False): |
499 | + command = sshcmd + [cmd] |
500 | + retcode = caller(command) |
501 | + if retcode and not ignore_errors: |
502 | + raise subprocess.CalledProcessError(retcode, ' '.join(command)) |
503 | + |
504 | + return _sshcall |
505 | + |
506 | + |
507 | +@contextmanager |
508 | +def su(user): |
509 | + """A context manager to temporarily run the script as a different user.""" |
510 | + uid, gid = get_user_ids(user) |
511 | + os.setegid(gid) |
512 | + os.seteuid(uid) |
513 | + home = get_user_home(user) |
514 | + with environ(HOME=home): |
515 | + try: |
516 | + yield Env(uid, gid, home) |
517 | + finally: |
518 | + os.setegid(os.getgid()) |
519 | + os.seteuid(os.getuid()) |
520 | + |
521 | + |
522 | +def user_exists(username): |
523 | + """Return True if given `username` exists, e.g.:: |
524 | + |
525 | + >>> user_exists('root') |
526 | + True |
527 | + >>> user_exists('_this_user_does_not_exist_') |
528 | + False |
529 | + """ |
530 | + try: |
531 | + pwd.getpwnam(username) |
532 | + except KeyError: |
533 | + return False |
534 | + return True |
535 | |
536 | |
537 | def wait_for_page_contents(url, contents, timeout=120, validate=None): |
538 | @@ -148,83 +541,6 @@ |
539 | time.sleep(0.1) |
540 | |
541 | |
542 | -class Serializer: |
543 | - """Handle JSON (de)serialization.""" |
544 | - |
545 | - def __init__(self, path, default=None, serialize=None, deserialize=None): |
546 | - self.path = path |
547 | - self.default = default or {} |
548 | - self.serialize = serialize or json.dump |
549 | - self.deserialize = deserialize or json.load |
550 | - |
551 | - def exists(self): |
552 | - return os.path.exists(self.path) |
553 | - |
554 | - def get(self): |
555 | - if self.exists(): |
556 | - with open(self.path) as f: |
557 | - return self.deserialize(f) |
558 | - return self.default |
559 | - |
560 | - def set(self, data): |
561 | - with open(self.path, 'w') as f: |
562 | - self.serialize(data, f) |
563 | - |
564 | - |
565 | -def get_user_ids(user): |
566 | - """Return the uid and gid of given `user`, e.g.:: |
567 | - |
568 | - >>> get_user_ids('root') |
569 | - (0, 0) |
570 | - """ |
571 | - userdata = pwd.getpwnam(user) |
572 | - return userdata.pw_uid, userdata.pw_gid |
573 | - |
574 | - |
575 | -def get_user_home(user): |
576 | - """Return the home directory of the given `user`. |
577 | - |
578 | - >>> get_user_home('root') |
579 | - '/root' |
580 | - """ |
581 | - return pwd.getpwnam(user).pw_dir |
582 | - |
583 | - |
584 | -@contextmanager |
585 | -def cd(directory): |
586 | - """A context manager to temporary change current working dir, e.g.:: |
587 | - |
588 | - >>> import os |
589 | - >>> os.chdir('/tmp') |
590 | - >>> with cd('/bin'): print os.getcwd() |
591 | - /bin |
592 | - >>> os.getcwd() |
593 | - '/tmp' |
594 | - """ |
595 | - cwd = os.getcwd() |
596 | - os.chdir(directory) |
597 | - yield |
598 | - os.chdir(cwd) |
599 | - |
600 | - |
601 | -@contextmanager |
602 | -def su(user): |
603 | - """A context manager to temporary run the script as a different user.""" |
604 | - uid, gid = get_user_ids(user) |
605 | - os.setegid(gid) |
606 | - os.seteuid(uid) |
607 | - current_home = os.getenv('HOME') |
608 | - home = get_user_home(user) |
609 | - os.environ['HOME'] = home |
610 | - try: |
611 | - yield Env(uid, gid, home) |
612 | - finally: |
613 | - os.setegid(os.getgid()) |
614 | - os.seteuid(os.getuid()) |
615 | - if current_home is not None: |
616 | - os.environ['HOME'] = current_home |
617 | - |
618 | - |
619 | class DictDiffer: |
620 | """ |
621 | Calculate the difference between two dictionaries as: |
622 | |
623 | === modified file 'tests.py' |
624 | --- tests.py 2012-03-02 13:58:36 +0000 |
625 | +++ tests.py 2012-03-07 10:19:06 +0000 |
626 | @@ -6,44 +6,42 @@ |
627 | __metaclass__ = type |
628 | |
629 | |
630 | +import getpass |
631 | import os |
632 | from subprocess import CalledProcessError |
633 | +import tempfile |
634 | import unittest |
635 | |
636 | from shelltoolbox import ( |
637 | cd, |
638 | command, |
639 | DictDiffer, |
640 | + environ, |
641 | + file_append, |
642 | + file_prepend, |
643 | + generate_ssh_keys, |
644 | + get_su_command, |
645 | + get_user_home, |
646 | + get_user_ids, |
647 | + join_command, |
648 | + mkdirs, |
649 | run, |
650 | + search_file, |
651 | + Serializer, |
652 | + ssh, |
653 | su, |
654 | + user_exists, |
655 | ) |
656 | |
657 | |
658 | -class TestRun(unittest.TestCase): |
659 | - |
660 | - def testSimpleCommand(self): |
661 | - # Running a simple command (ls) works and running the command |
662 | - # produces a string. |
663 | - self.assertIsInstance(run('/bin/ls'), str) |
664 | - |
665 | - def testStdoutReturned(self): |
666 | - # Running a simple command (ls) works and running the command |
667 | - # produces a string. |
668 | - self.assertIn('Usage:', run('/bin/ls', '--help')) |
669 | - |
670 | - def testCalledProcessErrorRaised(self): |
671 | - # If an error occurs a CalledProcessError is raised with the return |
672 | - # code, command executed, and the output of the command. |
673 | - with self.assertRaises(CalledProcessError) as info: |
674 | - run('ls', '--not a valid switch') |
675 | - exception = info.exception |
676 | - self.assertEqual(2, exception.returncode) |
677 | - self.assertEqual("['ls', '--not a valid switch']", exception.cmd) |
678 | - self.assertIn('unrecognized option', exception.output) |
679 | - |
680 | - def testNoneArguments(self): |
681 | - # Ensure None is ignored when passed as positional argument. |
682 | - self.assertIn('Usage:', run('/bin/ls', None, '--help', None)) |
683 | +class TestCdContextManager(unittest.TestCase): |
684 | + |
685 | + def test_cd(self): |
686 | + curdir = os.getcwd() |
687 | + self.assertNotEqual('/var', curdir) |
688 | + with cd('/var'): |
689 | + self.assertEqual('/var', os.getcwd()) |
690 | + self.assertEqual(curdir, os.getcwd()) |
691 | |
692 | |
693 | class TestCommand(unittest.TestCase): |
694 | @@ -112,6 +110,349 @@ |
695 | self.assertEquals(expected, diff.added_or_changed) |
696 | |
697 | |
698 | +class TestEnviron(unittest.TestCase): |
699 | + |
700 | + def test_existing(self): |
701 | + # If an existing environment variable is changed, it is |
702 | + # restored during context cleanup. |
703 | + os.environ['MY_VARIABLE'] = 'foo' |
704 | + with environ(MY_VARIABLE='bar'): |
705 | + self.assertEqual('bar', os.getenv('MY_VARIABLE')) |
706 | + self.assertEqual('foo', os.getenv('MY_VARIABLE')) |
707 | + del os.environ['MY_VARIABLE'] |
708 | + |
709 | + def test_new(self): |
710 | + # If a new environment variable is added, it is removed during |
711 | + # context cleanup. |
712 | + with environ(MY_VAR1='foo', MY_VAR2='bar'): |
713 | + self.assertEqual('foo', os.getenv('MY_VAR1')) |
714 | + self.assertEqual('bar', os.getenv('MY_VAR2')) |
715 | + self.assertIsNone(os.getenv('MY_VAR1')) |
716 | + self.assertIsNone(os.getenv('MY_VAR2')) |
717 | + |
718 | + |
719 | +class BaseCreateFile(object): |
720 | + |
721 | + def create_file(self, content): |
722 | + f = tempfile.NamedTemporaryFile('w', delete=False) |
723 | + f.write(content) |
724 | + f.close() |
725 | + return f |
726 | + |
727 | + |
728 | +class BaseTestFile(BaseCreateFile): |
729 | + |
730 | + base_content = 'line1\n' |
731 | + new_content = 'new line\n' |
732 | + |
733 | + def check_file_content(self, content, filename): |
734 | + self.assertEqual(content, open(filename).read()) |
735 | + |
736 | + |
737 | +class TestFileAppend(unittest.TestCase, BaseTestFile): |
738 | + |
739 | + def test_append(self): |
740 | + # Ensure the new line is correctly added at the end of the file. |
741 | + f = self.create_file(self.base_content) |
742 | + file_append(f.name, self.new_content) |
743 | + self.check_file_content(self.base_content + self.new_content, f.name) |
744 | + |
745 | + def test_existing_content(self): |
746 | + # Ensure nothing happens if the file already contains the given line. |
747 | + content = self.base_content + self.new_content |
748 | + f = self.create_file(content) |
749 | + file_append(f.name, self.new_content) |
750 | + self.check_file_content(content, f.name) |
751 | + |
752 | + def test_new_line_in_file_contents(self): |
753 | + # A new line is automatically added before the given content if it |
754 | + # is not present at the end of current file. |
755 | + f = self.create_file(self.base_content.strip()) |
756 | + file_append(f.name, self.new_content) |
757 | + self.check_file_content(self.base_content + self.new_content, f.name) |
758 | + |
759 | + def test_new_line_in_given_line(self): |
760 | + # A new line is automatically added to the given line if not present. |
761 | + f = self.create_file(self.base_content) |
762 | + file_append(f.name, self.new_content.strip()) |
763 | + self.check_file_content(self.base_content + self.new_content, f.name) |
764 | + |
765 | + def test_non_existent_file(self): |
766 | + # Ensure the file is created if it does not exist. |
767 | + filename = tempfile.mktemp() |
768 | + file_append(filename, self.base_content) |
769 | + self.check_file_content(self.base_content, filename) |
770 | + |
771 | + def test_fragment(self): |
772 | + # Ensure a line fragment is not matched. |
773 | + f = self.create_file(self.base_content) |
774 | + fragment = self.base_content[2:] |
775 | + file_append(f.name, fragment) |
776 | + self.check_file_content(self.base_content + fragment, f.name) |
777 | + |
778 | + |
779 | +class TestFilePrepend(unittest.TestCase, BaseTestFile): |
780 | + |
781 | + def test_prepend(self): |
782 | + # Ensure the new content is correctly prepended at the beginning of |
783 | + # the file. |
784 | + f = self.create_file(self.base_content) |
785 | + file_prepend(f.name, self.new_content) |
786 | + self.check_file_content(self.new_content + self.base_content, f.name) |
787 | + |
788 | + def test_existing_content(self): |
789 | + # Ensure nothing happens if the file already starts with the given |
790 | + # content. |
791 | + content = self.base_content + self.new_content |
792 | + f = self.create_file(content) |
793 | + file_prepend(f.name, self.base_content) |
794 | + self.check_file_content(content, f.name) |
795 | + |
796 | + def test_move_content(self): |
797 | + # If the file contains the given content, but not at the beginning, |
798 | + # the content is moved on top. |
799 | + f = self.create_file(self.base_content + self.new_content) |
800 | + file_prepend(f.name, self.new_content) |
801 | + self.check_file_content(self.new_content + self.base_content, f.name) |
802 | + |
803 | + def test_new_line_in_given_line(self): |
804 | + # A new line is automatically added to the given line if not present. |
805 | + f = self.create_file(self.base_content) |
806 | + file_prepend(f.name, self.new_content.strip()) |
807 | + self.check_file_content(self.new_content + self.base_content, f.name) |
808 | + |
809 | + |
810 | +class TestGenerateSSHKeys(unittest.TestCase): |
811 | + |
812 | + def test_generation(self): |
813 | + # Ensure ssh keys are correctly generated. |
814 | + filename = tempfile.mktemp() |
815 | + generate_ssh_keys(filename) |
816 | + first_line = open(filename).readlines()[0].strip() |
817 | + self.assertEqual('-----BEGIN RSA PRIVATE KEY-----', first_line) |
818 | + pub_content = open(filename + '.pub').read() |
819 | + self.assertTrue(pub_content.startswith('ssh-rsa')) |
820 | + |
821 | + |
822 | +class TestGetSuCommand(unittest.TestCase): |
823 | + |
824 | + def test_current_user(self): |
825 | + # If the su is requested as current user, the arguments are |
826 | + # returned as given. |
827 | + cmd = ('ls', '-l') |
828 | + command = get_su_command(getpass.getuser(), cmd) |
829 | + self.assertSequenceEqual(cmd, command) |
830 | + |
831 | + def test_another_user(self): |
832 | + # Ensure "su" is prepended and arguments are correctly quoted. |
833 | + command = get_su_command('nobody', ('ls', '-l', 'my file')) |
834 | + self.assertSequenceEqual( |
835 | + ('su', 'nobody', '-c', "ls -l 'my file'"), command) |
836 | + |
837 | + |
838 | +class TestGetUserHome(unittest.TestCase): |
839 | + |
840 | + def test_existent(self): |
841 | + # Ensure the real home directory is returned for existing users. |
842 | + self.assertEqual('/root', get_user_home('root')) |
843 | + |
844 | + def test_non_existent(self): |
845 | + # If the user does not exist, return a default /home/[username] home. |
846 | + user = '_this_user_does_not_exist_' |
847 | + self.assertEqual('/home/' + user, get_user_home(user)) |
848 | + |
849 | + |
850 | +class TestGetUserIds(unittest.TestCase): |
851 | + |
852 | + def test_get_user_ids(self): |
853 | + # Ensure the correct uid and gid are returned. |
854 | + uid, gid = get_user_ids('root') |
855 | + self.assertEqual(0, uid) |
856 | + self.assertEqual(0, gid) |
857 | + |
858 | + |
859 | +class TestJoinCommand(unittest.TestCase): |
860 | + |
861 | + def test_normal(self): |
862 | + # Ensure a normal command is correctly parsed. |
863 | + command = 'ls -l' |
864 | + self.assertEqual(command, join_command(command.split())) |
865 | + |
866 | + def test_containing_spaces(self): |
867 | + # Ensure args containing spaces are correctly quoted. |
868 | + args = ('command', 'arg containing spaces') |
869 | + self.assertEqual("command 'arg containing spaces'", join_command(args)) |
870 | + |
871 | + def test_empty(self): |
872 | + # Ensure empty args are correctly quoted. |
873 | + args = ('command', '') |
874 | + self.assertEqual("command ''", join_command(args)) |
875 | + |
876 | + |
877 | +class TestMkdirs(unittest.TestCase): |
878 | + |
879 | + def test_intermediate_dirs(self): |
880 | + # Ensure the leaf directory and all intermediate ones are created. |
881 | + base_dir = tempfile.mktemp(suffix='/') |
882 | + dir1 = tempfile.mktemp(prefix=base_dir) |
883 | + dir2 = tempfile.mktemp(prefix=base_dir) |
884 | + mkdirs(dir1, dir2) |
885 | + self.assertTrue(os.path.isdir(dir1)) |
886 | + self.assertTrue(os.path.isdir(dir2)) |
887 | + |
888 | + def test_existing_dir(self): |
889 | + # If the leaf directory already exists the function returns |
890 | + # without errors. |
891 | + mkdirs('/tmp') |
892 | + |
893 | + def test_existing_file(self): |
894 | + # An `OSError` is raised if the leaf path exists and it is a file. |
895 | + f = tempfile.NamedTemporaryFile('w', delete=False) |
896 | + f.close() |
897 | + with self.assertRaises(OSError): |
898 | + mkdirs(f.name) |
899 | + |
900 | + |
901 | +class TestRun(unittest.TestCase): |
902 | + |
903 | + def testSimpleCommand(self): |
904 | + # Running a simple command (ls) works and running the command |
905 | + # produces a string. |
906 | + self.assertIsInstance(run('/bin/ls'), str) |
907 | + |
908 | + def testStdoutReturned(self): |
909 | + # Running a command returns its standard output: the help text |
910 | + # of ls contains the usage line. |
911 | + self.assertIn('Usage:', run('/bin/ls', '--help')) |
912 | + |
913 | + def testCalledProcessErrorRaised(self): |
914 | + # If an error occurs a CalledProcessError is raised with the return |
915 | + # code, command executed, and the output of the command. |
916 | + with self.assertRaises(CalledProcessError) as info: |
917 | + run('ls', '--not a valid switch') |
918 | + exception = info.exception |
919 | + self.assertEqual(2, exception.returncode) |
920 | + self.assertEqual("['ls', '--not a valid switch']", exception.cmd) |
921 | + self.assertIn('unrecognized option', exception.output) |
922 | + |
923 | + def testNoneArguments(self): |
924 | + # Ensure None is ignored when passed as positional argument. |
925 | + self.assertIn('Usage:', run('/bin/ls', None, '--help', None)) |
926 | + |
927 | + |
928 | +class TestSearchFile(unittest.TestCase, BaseCreateFile): |
929 | + |
930 | + content1 = 'content1\n' |
931 | + content2 = 'content2\n' |
932 | + |
933 | + def setUp(self): |
934 | + self.filename = self.create_file(self.content1 + self.content2).name |
935 | + |
936 | + def tearDown(self): |
937 | + os.remove(self.filename) |
938 | + |
939 | + def test_grep(self): |
940 | + # Ensure plain text is correctly matched. |
941 | + self.assertEqual(self.content2, search_file('ent2', self.filename)) |
942 | + self.assertEqual(self.content1, search_file('content', self.filename)) |
943 | + |
944 | + def test_no_match(self): |
945 | + # Ensure the function does not return false positives. |
946 | + self.assertIsNone(search_file('no_match', self.filename)) |
947 | + |
948 | + def test_regexp(self): |
949 | + # Ensure the function works with regular expressions. |
950 | + self.assertEqual(self.content2, search_file('\w2', self.filename)) |
951 | + |
952 | + |
953 | +class TestSerializer(unittest.TestCase): |
954 | + |
955 | + def setUp(self): |
956 | + self.path = tempfile.mktemp() |
957 | + self.data = {'key': 'value'} |
958 | + |
959 | + def tearDown(self): |
960 | + if os.path.exists(self.path): |
961 | + os.remove(self.path) |
962 | + |
963 | + def test_serializer(self): |
964 | + # Ensure data is correctly serialized and deserialized. |
965 | + s = Serializer(self.path) |
966 | + s.set(self.data) |
967 | + self.assertEqual(self.data, s.get()) |
968 | + |
969 | + def test_existence(self): |
970 | + # Ensure the file is created only when needed. |
971 | + s = Serializer(self.path) |
972 | + self.assertFalse(s.exists()) |
973 | + s.set(self.data) |
974 | + self.assertTrue(s.exists()) |
975 | + |
976 | + def test_default_value(self): |
977 | + # If the file does not exist, the serializer returns a default value. |
978 | + s = Serializer(self.path) |
979 | + self.assertEqual({}, s.get()) |
980 | + s = Serializer(self.path, default=47) |
981 | + self.assertEqual(47, s.get()) |
982 | + |
983 | + def test_another_serializer(self): |
984 | + # It is possible to use a custom serializer (e.g. pickle). |
985 | + import pickle |
986 | + s = Serializer( |
987 | + self.path, serialize=pickle.dump, deserialize=pickle.load) |
988 | + s.set(self.data) |
989 | + self.assertEqual(self.data, s.get()) |
990 | + |
991 | + |
992 | +class TestSSH(unittest.TestCase): |
993 | + |
994 | + def setUp(self): |
995 | + self.last_command = None |
996 | + |
997 | + def remove_command_options(self, cmd): |
998 | + cmd = list(cmd) |
999 | + del cmd[1:7] |
1000 | + return cmd |
1001 | + |
1002 | + def caller(self, cmd): |
1003 | + self.last_command = self.remove_command_options(cmd) |
1004 | + |
1005 | + def check_last_command(self, expected): |
1006 | + self.assertSequenceEqual(expected, self.last_command) |
1007 | + |
1008 | + def test_current_user(self): |
1009 | + # Ensure ssh command is correctly generated for current user. |
1010 | + sshcall = ssh('example.com', caller=self.caller) |
1011 | + sshcall('ls -l') |
1012 | + self.check_last_command(['ssh', 'example.com', '--', 'ls -l']) |
1013 | + |
1014 | + def test_another_user(self): |
1015 | + # Ensure ssh command is correctly generated for a different user. |
1016 | + sshcall = ssh('example.com', 'myuser', caller=self.caller) |
1017 | + sshcall('ls -l') |
1018 | + self.check_last_command(['ssh', 'myuser@example.com', '--', 'ls -l']) |
1019 | + |
1020 | + def test_ssh_key(self): |
1021 | + # The ssh key path can be optionally provided. |
1022 | + sshcall = ssh('example.com', key='/tmp/foo', caller=self.caller) |
1023 | + sshcall('ls -l') |
1024 | + self.check_last_command([ |
1025 | + 'ssh', '-i', '/tmp/foo', 'example.com', '--', 'ls -l']) |
1026 | + |
1027 | + def test_error(self): |
1028 | + # If the ssh command exits with an error code, a |
1029 | + # `subprocess.CalledProcessError` is raised. |
1030 | + sshcall = ssh('example.com', caller=lambda cmd: 1) |
1031 | + with self.assertRaises(CalledProcessError): |
1032 | + sshcall('ls -l') |
1033 | + |
1034 | + def test_ignore_errors(self): |
1035 | + # If ignore_errors is set to True when executing the command, no error |
1036 | + # will be raised, even if the command itself returns an error code. |
1037 | + sshcall = ssh('example.com', caller=lambda cmd: 1) |
1038 | + sshcall('ls -l', ignore_errors=True) |
1039 | + |
1040 | + |
1041 | current_euid = os.geteuid() |
1042 | current_egid = os.getegid() |
1043 | current_home = os.environ['HOME'] |
1044 | @@ -180,13 +521,11 @@ |
1045 | self.assertEqual(current_home, os.environ['HOME']) |
1046 | |
1047 | |
1048 | -class TestCdContextManager(unittest.TestCase): |
1049 | - def test_cd(self): |
1050 | - curdir = os.getcwd() |
1051 | - self.assertNotEqual('/var', curdir) |
1052 | - with cd('/var'): |
1053 | - self.assertEqual('/var', os.getcwd()) |
1054 | - self.assertEqual(curdir, os.getcwd()) |
1055 | +class TestUserExists(unittest.TestCase): |
1056 | + |
1057 | + def test_user_exists(self): |
1058 | + self.assertTrue(user_exists('root')) |
1059 | + self.assertFalse(user_exists('_this_user_does_not_exist_')) |
1060 | |
1061 | |
1062 | if __name__ == '__main__': |
Thank you! Small comments and suggestions follow.
I asked about reordering the functions for alphabetization. Not having run at the top is somewhat surprising to me, but I am ok with it.
I was struck that file_append really expects the line to be added to end with a \n. I suppose we could enforce that one way or another...or not. I'm OK with leaving as is, or you could add a \n if the line does not already end with one (and do this before you check if the line is in the content). On a somewhat related note, it might be better to check if the line to be added is in the output of readlines(), not read(); that would verify that the line, not a line that ends with the line, is in the file. For example, right now a file of "abc\ndef\n" would incorrectly show that the "ef\n" line was in the file.
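A minimal sketch of the behaviour suggested above, assuming `file_append` takes a filename and a single line and that the file already exists. It is an illustration, not the code in the branch. The newline is added before the membership check, and the check runs against `readlines()`, so a fragment such as "ef\n" no longer matches the line "def\n".

```python
def file_append(filename, line):
    """Append `line` to `filename` unless that exact line is already there.

    Hypothetical sketch of the review suggestion, not the branch's code.
    """
    # Normalize first, so the membership test compares whole lines.
    if not line.endswith('\n'):
        line += '\n'
    with open(filename) as f:
        lines = f.readlines()
    if line in lines:
        return
    with open(filename, 'a') as f:
        # Keep the new line separate if the file lacks a trailing newline.
        if lines and not lines[-1].endswith('\n'):
            f.write('\n')
        f.write(line)
```

With a file containing "abc\ndef\n", appending "ef\n" adds a new line, while appending "def" (with or without the trailing newline) leaves the file untouched.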
In file_prepend, I am tempted to suggest that this snippet should be changed:
    if line in lines:
        lines.remove(line)
This iterates through the lines twice, which is mildly annoying to me but almost certainly not a practical concern for us with these use cases. That said, I'd be tempted to change that to the following:
    try:
        lines.remove(line)
    except ValueError:
        pass
You could also verify that the line to be added in this function ended with a \n if you wanted to.
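For context, here is how the try/except form might sit inside the whole function. This is a sketch under assumptions (a single-line `line` argument, an existing file), with behaviour inferred from the TestFilePrepend cases in the diff rather than copied from the branch.

```python
def file_prepend(filename, line):
    """Ensure `line` is the first line of `filename`.

    Hypothetical sketch of the review suggestion, not the branch's code.
    """
    if not line.endswith('\n'):
        line += '\n'
    with open(filename) as f:
        lines = f.readlines()
    if lines and lines[0] == line:
        return  # Already at the top: nothing to do.
    try:
        # remove() searches and deletes in one call, so there is no
        # separate `in` scan before it.
        lines.remove(line)
    except ValueError:
        pass  # The line was not in the file.
    lines.insert(0, line)
    with open(filename, 'w') as f:
        f.writelines(lines)
```

If the line exists further down the file, it is moved to the top; if it is missing, it is simply added.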
I don't really understand this part of the docstring for get_su_command:
    This can be used together with `run` when the `su` context manager is not
    enough (e.g. an external program uses uid rather than euid).
Oh! You mean that you would use run(*get_su_command(...)). Maybe an example would help.
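An example along those lines, written as a sketch whose behaviour is inferred from TestGetSuCommand and TestJoinCommand in the diff. It targets Python 3 and uses `shlex.quote` for quoting; the `current_user` parameter and the `postgres`/`psql` names are assumptions made for illustration and are not part of the branch.

```python
import getpass
import shlex


def join_command(args):
    """Join arguments into one shell-safe string."""
    return ' '.join(shlex.quote(arg) for arg in args)


def get_su_command(user, args, current_user=None):
    """Return `args` unchanged for the current user, else wrap them in `su`.

    `current_user` is a hypothetical parameter added to keep this sketch
    easy to test; it defaults to the logged-in user.
    """
    if current_user is None:
        current_user = getpass.getuser()
    if user == current_user:
        return tuple(args)
    return ('su', user, '-c', join_command(args))


# Intended pairing with `run`, shown as a comment because it needs root
# privileges and the real shelltoolbox `run` helper:
#     run(*get_su_command('postgres', ('psql', '-l')))
print(get_su_command('nobody', ('ls', '-l', 'my file'), current_user='me'))
```

Unlike the `su` context manager, which (per the docstring quoted above) is not enough for programs that check uid rather than euid, this starts the external program through `su` itself.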
I'm a bit concerned about what you are doing here in get_value_from_line:
    return line.split('=')[1].strip('"\' ')
Would maybe json be a safer approach? No, I tried it, and json.loads('"hi"') is fine but json.loads("'hi'") is not. :-/ I dunno. When do we use this? The logic seems a bit idiosyncratic.
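The difference can be reproduced with a short sketch. The function body is the expression quoted above; the sample lines are made up for illustration.

```python
import json


def get_value_from_line(line):
    """Return the value of a `key=value` line, minus quotes and spaces."""
    return line.split('=')[1].strip('"\' ')


# Both quoting styles are accepted by the strip-based approach.
print(get_value_from_line('name="hi"'))
print(get_value_from_line("name = 'hi'"))

# json accepts only double-quoted strings.
print(json.loads('"hi"'))
try:
    json.loads("'hi'")
except ValueError as error:  # json.JSONDecodeError subclasses ValueError
    print('json rejected single quotes:', error)
```

One more quirk of the expression: `split('=')[1]` keeps only the text between the first and second `=`, so a value that itself contains `=` is cut short.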
The grep command also seems a bit idiosyncratic. Why is it reasonable to always strip the result? I also would be tempted to rename the function: grep does a lot more than that. Simply search_file? Hm, I see that you just reordered this file, you didn't actually add grep.... Never mind then, I guess. I should have commented on this sooner, but you shouldn't have to worry about it. If you *do* want to change it, please leave the "grep" alias around for now so that our charms do not suddenly break.
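A rename with a compatibility alias could look roughly like this. The sketch assumes the behaviour exercised by TestSearchFile in the diff (first matching line returned as read, `None` when nothing matches); it is not the code from the branch.

```python
import re


def search_file(regexp, filename):
    """Return the first line of `filename` matching `regexp`, or None.

    The line is returned as read, trailing newline included.
    """
    pattern = re.compile(regexp)
    with open(filename) as f:
        for line in f:
            if pattern.search(line):
                return line
    return None


# Backward-compatible alias so existing callers of `grep` keep working.
grep = search_file
```

Because the alias is just a second name for the same function object, callers of either name get identical behaviour.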
I don't understand why you removed the Serializer. Was it defined twice?
I thought the choices you made for doc examples and for unit tests were good. Thank you.
Great job!
Gary