Merge lp:~frankban/python-shelltoolbox/helpers into lp:python-shelltoolbox

Proposed by Francesco Banconi
Status: Merged
Merged at revision: 14
Proposed branch: lp:~frankban/python-shelltoolbox/helpers
Merge into: lp:python-shelltoolbox
Diff against target: 1062 lines (+802/-147)
3 files modified
setup.py (+1/-1)
shelltoolbox/__init__.py (+430/-114)
tests.py (+371/-32)
To merge this branch: bzr merge lp:~frankban/python-shelltoolbox/helpers
Reviewer Review Type Date Requested Status
Gary Poster (community) Approve
Review via email: mp+96180@code.launchpad.net

Description of the change

== Changes ==

- Added several helper functions:
  - bzr_whois
  - file_append
  - file_prepend
  - generate_ssh_keys
  - get_su_command
  - join_command
  - mkdirs
  - ssh
  - user_exists

- Added `environ` context manager and updated `su` to use it.

- Changed `apt_get_install` helper: now the function uses environ to run dpkg
  in non-interactive mode.

- Changed `get_user_home` to handle non-existent users
  (returning a default home directory)

- Updated the `install_extra_repositories` helper to accept distribution
  placeholders.

- Updated `cd` context manager: yield in a try/finally block.

- Fixed `grep` helper.

== Tests ==

python tests.py
.....................................................
----------------------------------------------------------------------
Ran 53 tests in 0.124s

OK

To post a comment you must log in.
Revision history for this message
Gary Poster (gary) wrote :

Thank you! Small comments and suggestions follow.

I asked about reordering the functions for alphabetization. Not having run at the top is somewhat surprising to me, but I am ok with it.

I was struck that file_append really expects the line to be added to end with a \n. I suppose we could enforce that one way or another...or not. I'm OK with leaving as is, or you could add a \n if the line does not already end with one (and do this before you check if the line is in the content). On a somewhat related note, it might be better to check if the line to be added is in the output of readlines(), not read(); that would verify that the line, not a line that ends with the line, is in the file. For example, right now a file of "abc\ndef\n" would incorrectly show that the "ef\n" line was in the file.

In file_prepend, I am tempted to suggest that this snippet should be changed:
            if line in lines:
                lines.remove(line)
This iterates through the lines twice, which is mildly annoying to me but almost certainly not a practical concern for us with these use cases. That said, I'd be tempted to change that to the following:
            try:
                lines.remove(line)
            except ValueError:
                pass
You could also verify that the line to be added in this function ended with a \n if you wanted to.

I don't really understand this part of the docstring for get_su_command:
    This can be used together with `run` when the `su` context manager is not
    enough (e.g. an external program uses uid rather than euid).
Oh! You mean that you would use run(*get_su_command(...)). Maybe an example would help.

I'm a bit concerned about what you are doing here in get_value_from_line:
    return line.split('=')[1].strip('"\' ')
would maybe json be a safer approach? No, I tried it, and json.loads('"hi"') is fine but json.loads("'hi'") is not. :-/ I dunno. When do we use this? The logic seems a bit idiosyncratic.

The grep command also seems a bit idiosyncratic. Why is it reasonable to always strip the result? I also would be tempted to rename the funtion: grep does a lot more than that. simply search_file? Hm, I see that you just reordered this file, you didn't actually add grep.... Never mind then, I guess. I should have commented on this sooner, but you shouldn't have to worry about it. If you *do* want to change it, please leave the "grep" alias around for now so that our charms do not suddenly break.

I don't understand why you removed the Serializer. Was it defined twice?

I thought the choices you made for doc examples and for unit tests were good. Thank you.

Great job!

Gary

review: Approve
Revision history for this message
Francesco Banconi (frankban) wrote :

> I was struck that file_append really expects the line to be added to end with
> a \n. I suppose we could enforce that one way or another...or not. I'm OK
> with leaving as is, or you could add a \n if the line does not already end
> with one (and do this before you check if the line is in the content). On a
> somewhat related note, it might be better to check if the line to be added is
> in the output of readlines(), not read(); that would verify that the line, not
> a line that ends with the line, is in the file. For example, right now a file
> of "abc\ndef\n" would incorrectly show that the "ef\n" line was in the file.

I agree with your suggestions. I've changed the function and removed those
ambiguities. I've added tests for line not ending with '\n' and for line
fragments.

> In file_prepend, I am tempted to suggest that this snippet should be changed:
> if line in lines:
> lines.remove(line)
> This iterates through the lines twice, which is mildly annoying to me but
> almost certainly not a practical concern for us with these use cases. That
> said, I'd be tempted to change that to the following:
> try:
> lines.remove(line)
> except ValueError:
> pass
> You could also verify that the line to be added in this function ended with a
> \n if you wanted to.

Done.

> I'm a bit concerned about what you are doing here in get_value_from_line:
> return line.split('=')[1].strip('"\' ')
> would maybe json be a safer approach? No, I tried it, and json.loads('"hi"')
> is fine but json.loads("'hi'") is not. :-/ I dunno. When do we use this?
> The logic seems a bit idiosyncratic.

I don't remember the origin of that function. Since it is not used in the
charms or in setuplxc/lpsetup, I am going to remove it.

> The grep command also seems a bit idiosyncratic. Why is it reasonable to
> always strip the result? I also would be tempted to rename the funtion: grep
> does a lot more than that. simply search_file? Hm, I see that you just
> reordered this file, you didn't actually add grep.... Never mind then, I
> guess. I should have commented on this sooner, but you shouldn't have to
> worry about it. If you *do* want to change it, please leave the "grep" alias
> around for now so that our charms do not suddenly break.

Actually that helper is not used, and I remember it was before we changed the
way the buildslave is reconfigured. I think it's safe to rename it and change
the returned value.

> I don't understand why you removed the Serializer. Was it defined twice?

Yes it was.

Thanks Gary.

33. By Francesco Banconi

Fixed file_append.

34. By Francesco Banconi

Fixed file_prepend.

35. By Francesco Banconi

Removed get_value_from_line.

36. By Francesco Banconi

s/grep/search_file/

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'setup.py'
--- setup.py 2012-03-01 20:49:48 +0000
+++ setup.py 2012-03-07 10:19:06 +0000
@@ -9,7 +9,7 @@
9ez_setup.use_setuptools()9ez_setup.use_setuptools()
1010
1111
12__version__ = '0.1.1'12__version__ = '0.2.0'
1313
14from setuptools import setup14from setuptools import setup
1515
1616
=== modified file 'shelltoolbox/__init__.py'
--- shelltoolbox/__init__.py 2012-03-02 14:59:46 +0000
+++ shelltoolbox/__init__.py 2012-03-07 10:19:06 +0000
@@ -19,25 +19,38 @@
19__metaclass__ = type19__metaclass__ = type
20__all__ = [20__all__ = [
21 'apt_get_install',21 'apt_get_install',
22 'bzr_whois',
22 'cd',23 'cd',
23 'command',24 'command',
24 'DictDiffer',25 'DictDiffer',
26 'environ',
27 'file_append',
28 'file_prepend',
29 'generate_ssh_keys',
30 'get_su_command',
31 'get_user_home',
25 'get_user_ids',32 'get_user_ids',
26 'get_user_home',
27 'get_value_from_line',
28 'grep',
29 'install_extra_repositories',33 'install_extra_repositories',
34 'join_command',
35 'mkdirs',
30 'run',36 'run',
31 'Serializer',37 'Serializer',
32 'script_name',38 'script_name',
39 'search_file',
40 'ssh',
33 'su',41 'su',
42 'user_exists',
43 'wait_for_page_contents',
34 ]44 ]
3545
36from collections import namedtuple46from collections import namedtuple
37from contextlib import contextmanager47from contextlib import contextmanager
48from email.Utils import parseaddr
49import errno
38import json50import json
39import operator51import operator
40import os52import os
53import pipes
41import pwd54import pwd
42import re55import re
43import subprocess56import subprocess
@@ -50,26 +63,50 @@
50Env = namedtuple('Env', 'uid gid home')63Env = namedtuple('Env', 'uid gid home')
5164
5265
53def run(*args, **kwargs):66def apt_get_install(*args, **kwargs):
54 """Run the command with the given arguments.67 """Install given packages using apt.
5568
56 The first argument is the path to the command to run.69 It is possible to pass environment variables to be set during install
57 Subsequent arguments are command-line arguments to be passed.70 using keyword arguments.
5871
59 This function accepts all optional keyword arguments accepted by72 :raises: subprocess.CalledProcessError
60 `subprocess.Popen`.73 """
61 """74 debian_frontend = kwargs.pop('DEBIAN_FRONTEND', 'noninteractive')
62 args = [i for i in args if i is not None]75 with environ(DEBIAN_FRONTEND=debian_frontend, **kwargs):
63 pipe = subprocess.PIPE76 cmd = ('apt-get', '-y', 'install') + args
64 process = subprocess.Popen(77 return run(*cmd)
65 args, stdout=kwargs.pop('stdout', pipe),78
66 stderr=kwargs.pop('stderr', pipe),79
67 close_fds=kwargs.pop('close_fds', True), **kwargs)80def bzr_whois(user):
68 stdout, stderr = process.communicate()81 """Return full name and email of bzr `user`.
69 if process.returncode:82
70 raise subprocess.CalledProcessError(83 Return None if the given `user` does not have a bzr user id.
71 process.returncode, repr(args), output=stdout+stderr)84 """
72 return stdout85 with su(user):
86 try:
87 whoami = run('bzr', 'whoami')
88 except (subprocess.CalledProcessError, OSError):
89 return None
90 return parseaddr(whoami)
91
92
93@contextmanager
94def cd(directory):
95 """A context manager to temporarily change current working dir, e.g.::
96
97 >>> import os
98 >>> os.chdir('/tmp')
99 >>> with cd('/bin'): print os.getcwd()
100 /bin
101 >>> print os.getcwd()
102 /tmp
103 """
104 cwd = os.getcwd()
105 os.chdir(directory)
106 try:
107 yield
108 finally:
109 os.chdir(cwd)
73110
74111
75def command(*base_args):112def command(*base_args):
@@ -97,37 +134,393 @@
97 return callable_command134 return callable_command
98135
99136
100apt_get_install = command('apt-get', 'install', '-y', '--force-yes')137@contextmanager
138def environ(**kwargs):
139 """A context manager to temporarily change environment variables.
140
141 If an existing environment variable is changed, it is restored during
142 context cleanup::
143
144 >>> import os
145 >>> os.environ['MY_VARIABLE'] = 'foo'
146 >>> with environ(MY_VARIABLE='bar'): print os.getenv('MY_VARIABLE')
147 bar
148 >>> print os.getenv('MY_VARIABLE')
149 foo
150 >>> del os.environ['MY_VARIABLE']
151
152 If we are adding environment variables, they are removed during context
153 cleanup::
154
155 >>> import os
156 >>> with environ(MY_VAR1='foo', MY_VAR2='bar'):
157 ... print os.getenv('MY_VAR1'), os.getenv('MY_VAR2')
158 foo bar
159 >>> os.getenv('MY_VAR1') == os.getenv('MY_VAR2') == None
160 True
161 """
162 backup = {}
163 for key, value in kwargs.items():
164 backup[key] = os.getenv(key)
165 os.environ[key] = value
166 try:
167 yield
168 finally:
169 for key, value in backup.items():
170 if value is None:
171 del os.environ[key]
172 else:
173 os.environ[key] = value
174
175
176def file_append(filename, line):
177 r"""Append given `line`, if not present, at the end of `filename`.
178
179 Usage example::
180
181 >>> import tempfile
182 >>> f = tempfile.NamedTemporaryFile('w', delete=False)
183 >>> f.write('line1\n')
184 >>> f.close()
185 >>> file_append(f.name, 'new line\n')
186 >>> open(f.name).read()
187 'line1\nnew line\n'
188
189 Nothing happens if the file already contains the given `line`::
190
191 >>> file_append(f.name, 'new line\n')
192 >>> open(f.name).read()
193 'line1\nnew line\n'
194
195 A new line is automatically added before the given `line` if it is not
196 present at the end of current file content::
197
198 >>> import tempfile
199 >>> f = tempfile.NamedTemporaryFile('w', delete=False)
200 >>> f.write('line1')
201 >>> f.close()
202 >>> file_append(f.name, 'new line\n')
203 >>> open(f.name).read()
204 'line1\nnew line\n'
205
206 The file is created if it does not exist::
207
208 >>> import tempfile
209 >>> filename = tempfile.mktemp()
210 >>> file_append(filename, 'line1\n')
211 >>> open(filename).read()
212 'line1\n'
213 """
214 if not line.endswith('\n'):
215 line += '\n'
216 with open(filename, 'a+') as f:
217 lines = f.readlines()
218 if line not in lines:
219 if not lines or lines[-1].endswith('\n'):
220 f.write(line)
221 else:
222 f.write('\n' + line)
223
224
225def file_prepend(filename, line):
226 r"""Insert given `line`, if not present, at the beginning of `filename`.
227
228 Usage example::
229
230 >>> import tempfile
231 >>> f = tempfile.NamedTemporaryFile('w', delete=False)
232 >>> f.write('line1\n')
233 >>> f.close()
234 >>> file_prepend(f.name, 'line0\n')
235 >>> open(f.name).read()
236 'line0\nline1\n'
237
238 If the file starts with the given `line`, nothing happens::
239
240 >>> file_prepend(f.name, 'line0\n')
241 >>> open(f.name).read()
242 'line0\nline1\n'
243
244 If the file contains the given `line`, but not at the beginning,
245 the line is moved on top::
246
247 >>> file_prepend(f.name, 'line1\n')
248 >>> open(f.name).read()
249 'line1\nline0\n'
250 """
251 if not line.endswith('\n'):
252 line += '\n'
253 with open(filename, 'r+') as f:
254 lines = f.readlines()
255 if lines[0] != line:
256 try:
257 lines.remove(line)
258 except ValueError:
259 pass
260 lines.insert(0, line)
261 f.seek(0)
262 f.writelines(lines)
263
264
265def generate_ssh_keys(path, passphrase=''):
266 """Generate ssh key pair, saving them inside the given `directory`.
267
268 >>> generate_ssh_keys('/tmp/id_rsa')
269 0
270 >>> open('/tmp/id_rsa').readlines()[0].strip()
271 '-----BEGIN RSA PRIVATE KEY-----'
272 >>> open('/tmp/id_rsa.pub').read().startswith('ssh-rsa')
273 True
274 >>> os.remove('/tmp/id_rsa')
275 >>> os.remove('/tmp/id_rsa.pub')
276 """
277 return subprocess.call([
278 'ssh-keygen', '-q', '-t', 'rsa', '-N', passphrase, '-f', path])
279
280
281def get_su_command(user, args):
282 """Return a command line as a sequence, prepending "su" if necessary.
283
284 This can be used together with `run` when the `su` context manager is not
285 enough (e.g. an external program uses uid rather than euid).
286
287 run(*get_su_command(user, ['bzr', 'whoami']))
288
289 If the su is requested as current user, the arguments are returned as
290 given::
291
292 >>> import getpass
293 >>> current_user = getpass.getuser()
294
295 >>> get_su_command(current_user, ('ls', '-l'))
296 ('ls', '-l')
297
298 Otherwise, "su" is prepended::
299
300 >>> get_su_command('nobody', ('ls', '-l', 'my file'))
301 ('su', 'nobody', '-c', "ls -l 'my file'")
302 """
303 if get_user_ids(user)[0] != os.getuid():
304 args = [i for i in args if i is not None]
305 return ('su', user, '-c', join_command(args))
306 return args
307
308
309def get_user_home(user):
310 """Return the home directory of the given `user`.
311
312 >>> get_user_home('root')
313 '/root'
314
315 If the user does not exist, return a default /home/[username] home::
316
317 >>> get_user_home('_this_user_does_not_exist_')
318 '/home/_this_user_does_not_exist_'
319 """
320 try:
321 return pwd.getpwnam(user).pw_dir
322 except KeyError:
323 return os.path.join(os.path.sep, 'home', user)
324
325
326def get_user_ids(user):
327 """Return the uid and gid of given `user`, e.g.::
328
329 >>> get_user_ids('root')
330 (0, 0)
331 """
332 userdata = pwd.getpwnam(user)
333 return userdata.pw_uid, userdata.pw_gid
101334
102335
103def install_extra_repositories(*repositories):336def install_extra_repositories(*repositories):
104 """Install all of the extra repositories and update apt.337 """Install all of the extra repositories and update apt.
105338
339 Given repositories can contain a "{distribution}" placeholder, that will
340 be replaced by current distribution codename.
341
106 :raises: subprocess.CalledProcessError342 :raises: subprocess.CalledProcessError
107 """343 """
108 distribution = run('lsb_release', '-cs').strip()344 distribution = run('lsb_release', '-cs').strip()
109 # Starting from Oneiric, the `apt-add-repository` is interactive by345 # Starting from Oneiric, `apt-add-repository` is interactive by
110 # default, and requires a "-y" flag to be set.346 # default, and requires a "-y" flag to be set.
111 assume_yes = None if distribution == 'lucid' else '-y'347 assume_yes = None if distribution == 'lucid' else '-y'
112 for repo in repositories:348 for repo in repositories:
113 run('apt-add-repository', assume_yes, repo)349 repository = repo.format(distribution=distribution)
350 run('apt-add-repository', assume_yes, repository)
114 run('apt-get', 'clean')351 run('apt-get', 'clean')
115 run('apt-get', 'update')352 run('apt-get', 'update')
116353
117354
118def grep(content, filename):355def join_command(args):
356 """Return a valid Unix command line from `args`.
357
358 >>> join_command(['ls', '-l'])
359 'ls -l'
360
361 Arguments containing spaces and empty args are correctly quoted::
362
363 >>> join_command(['command', 'arg1', 'arg containing spaces', ''])
364 "command arg1 'arg containing spaces' ''"
365 """
366 return ' '.join(pipes.quote(arg) for arg in args)
367
368
369def mkdirs(*args):
370 """Create leaf directories (given as `args`) and all intermediate ones.
371
372 >>> import tempfile
373 >>> base_dir = tempfile.mktemp(suffix='/')
374 >>> dir1 = tempfile.mktemp(prefix=base_dir)
375 >>> dir2 = tempfile.mktemp(prefix=base_dir)
376 >>> mkdirs(dir1, dir2)
377 >>> os.path.isdir(dir1)
378 True
379 >>> os.path.isdir(dir2)
380 True
381
382 If the leaf directory already exists the function returns without errors::
383
384 >>> mkdirs(dir1)
385
386 An `OSError` is raised if the leaf path exists and it is a file::
387
388 >>> f = tempfile.NamedTemporaryFile(
389 ... 'w', delete=False, prefix=base_dir)
390 >>> f.close()
391 >>> mkdirs(f.name) # doctest: +ELLIPSIS
392 Traceback (most recent call last):
393 OSError: ...
394 """
395 for directory in args:
396 try:
397 os.makedirs(directory)
398 except OSError as err:
399 if err.errno != errno.EEXIST or os.path.isfile(directory):
400 raise
401
402
403def run(*args, **kwargs):
404 """Run the command with the given arguments.
405
406 The first argument is the path to the command to run.
407 Subsequent arguments are command-line arguments to be passed.
408
409 This function accepts all optional keyword arguments accepted by
410 `subprocess.Popen`.
411 """
412 args = [i for i in args if i is not None]
413 pipe = subprocess.PIPE
414 process = subprocess.Popen(
415 args, stdout=kwargs.pop('stdout', pipe),
416 stderr=kwargs.pop('stderr', pipe),
417 close_fds=kwargs.pop('close_fds', True), **kwargs)
418 stdout, stderr = process.communicate()
419 if process.returncode:
420 raise subprocess.CalledProcessError(
421 process.returncode, repr(args), output=stdout+stderr)
422 return stdout
423
424
425def script_name():
426 """Return the name of this script."""
427 return os.path.basename(sys.argv[0])
428
429
430def search_file(regexp, filename):
431 """Return the first line in `filename` that matches `regexp`."""
119 with open(filename) as f:432 with open(filename) as f:
120 for line in f:433 for line in f:
121 if re.match(content, line):434 if re.search(regexp, line):
122 return line.strip()435 return line
123436
124437
125def get_value_from_line(line):438def ssh(location, user=None, key=None, caller=subprocess.call):
126 return line.split('=')[1].strip('"\' ')439 """Return a callable that can be used to run ssh shell commands.
127440
128441 The ssh `location` and, optionally, `user` must be given.
129def script_name():442 If the user is None then the current user is used for the connection.
130 return os.path.basename(sys.argv[0])443
444 The callable internally uses the given `caller`::
445
446 >>> def caller(cmd):
447 ... print tuple(cmd)
448 >>> sshcall = ssh('example.com', 'myuser', caller=caller)
449 >>> root_sshcall = ssh('example.com', caller=caller)
450 >>> sshcall('ls -l') # doctest: +ELLIPSIS
451 ('ssh', '-t', ..., 'myuser@example.com', '--', 'ls -l')
452 >>> root_sshcall('ls -l') # doctest: +ELLIPSIS
453 ('ssh', '-t', ..., 'example.com', '--', 'ls -l')
454
455 The ssh key path can be optionally provided::
456
457 >>> root_sshcall = ssh('example.com', key='/tmp/foo', caller=caller)
458 >>> root_sshcall('ls -l') # doctest: +ELLIPSIS
459 ('ssh', '-t', ..., '-i', '/tmp/foo', 'example.com', '--', 'ls -l')
460
461 If the ssh command exits with an error code,
462 a `subprocess.CalledProcessError` is raised::
463
464 >>> ssh('loc', caller=lambda cmd: 1)('ls -l') # doctest: +ELLIPSIS
465 Traceback (most recent call last):
466 CalledProcessError: ...
467
468 If ignore_errors is set to True when executing the command, no error
469 will be raised, even if the command itself returns an error code.
470
471 >>> sshcall = ssh('loc', caller=lambda cmd: 1)
472 >>> sshcall('ls -l', ignore_errors=True)
473 """
474 sshcmd = [
475 'ssh',
476 '-t',
477 '-t', # Yes, this second -t is deliberate. See `man ssh`.
478 '-o', 'StrictHostKeyChecking=no',
479 '-o', 'UserKnownHostsFile=/dev/null',
480 ]
481 if key is not None:
482 sshcmd.extend(['-i', key])
483 if user is not None:
484 location = '{}@{}'.format(user, location)
485 sshcmd.extend([location, '--'])
486
487 def _sshcall(cmd, ignore_errors=False):
488 command = sshcmd + [cmd]
489 retcode = caller(command)
490 if retcode and not ignore_errors:
491 raise subprocess.CalledProcessError(retcode, ' '.join(command))
492
493 return _sshcall
494
495
496@contextmanager
497def su(user):
498 """A context manager to temporarily run the script as a different user."""
499 uid, gid = get_user_ids(user)
500 os.setegid(gid)
501 os.seteuid(uid)
502 home = get_user_home(user)
503 with environ(HOME=home):
504 try:
505 yield Env(uid, gid, home)
506 finally:
507 os.setegid(os.getgid())
508 os.seteuid(os.getuid())
509
510
511def user_exists(username):
512 """Return True if given `username` exists, e.g.::
513
514 >>> user_exists('root')
515 True
516 >>> user_exists('_this_user_does_not_exist_')
517 False
518 """
519 try:
520 pwd.getpwnam(username)
521 except KeyError:
522 return False
523 return True
131524
132525
133def wait_for_page_contents(url, contents, timeout=120, validate=None):526def wait_for_page_contents(url, contents, timeout=120, validate=None):
@@ -148,83 +541,6 @@
148 time.sleep(0.1)541 time.sleep(0.1)
149542
150543
151class Serializer:
152 """Handle JSON (de)serialization."""
153
154 def __init__(self, path, default=None, serialize=None, deserialize=None):
155 self.path = path
156 self.default = default or {}
157 self.serialize = serialize or json.dump
158 self.deserialize = deserialize or json.load
159
160 def exists(self):
161 return os.path.exists(self.path)
162
163 def get(self):
164 if self.exists():
165 with open(self.path) as f:
166 return self.deserialize(f)
167 return self.default
168
169 def set(self, data):
170 with open(self.path, 'w') as f:
171 self.serialize(data, f)
172
173
174def get_user_ids(user):
175 """Return the uid and gid of given `user`, e.g.::
176
177 >>> get_user_ids('root')
178 (0, 0)
179 """
180 userdata = pwd.getpwnam(user)
181 return userdata.pw_uid, userdata.pw_gid
182
183
184def get_user_home(user):
185 """Return the home directory of the given `user`.
186
187 >>> get_user_home('root')
188 '/root'
189 """
190 return pwd.getpwnam(user).pw_dir
191
192
193@contextmanager
194def cd(directory):
195 """A context manager to temporary change current working dir, e.g.::
196
197 >>> import os
198 >>> os.chdir('/tmp')
199 >>> with cd('/bin'): print os.getcwd()
200 /bin
201 >>> os.getcwd()
202 '/tmp'
203 """
204 cwd = os.getcwd()
205 os.chdir(directory)
206 yield
207 os.chdir(cwd)
208
209
210@contextmanager
211def su(user):
212 """A context manager to temporary run the script as a different user."""
213 uid, gid = get_user_ids(user)
214 os.setegid(gid)
215 os.seteuid(uid)
216 current_home = os.getenv('HOME')
217 home = get_user_home(user)
218 os.environ['HOME'] = home
219 try:
220 yield Env(uid, gid, home)
221 finally:
222 os.setegid(os.getgid())
223 os.seteuid(os.getuid())
224 if current_home is not None:
225 os.environ['HOME'] = current_home
226
227
228class DictDiffer:544class DictDiffer:
229 """545 """
230 Calculate the difference between two dictionaries as:546 Calculate the difference between two dictionaries as:
231547
=== modified file 'tests.py'
--- tests.py 2012-03-02 13:58:36 +0000
+++ tests.py 2012-03-07 10:19:06 +0000
@@ -6,44 +6,42 @@
6__metaclass__ = type6__metaclass__ = type
77
88
9import getpass
9import os10import os
10from subprocess import CalledProcessError11from subprocess import CalledProcessError
12import tempfile
11import unittest13import unittest
1214
13from shelltoolbox import (15from shelltoolbox import (
14 cd,16 cd,
15 command,17 command,
16 DictDiffer,18 DictDiffer,
19 environ,
20 file_append,
21 file_prepend,
22 generate_ssh_keys,
23 get_su_command,
24 get_user_home,
25 get_user_ids,
26 join_command,
27 mkdirs,
17 run,28 run,
29 search_file,
30 Serializer,
31 ssh,
18 su,32 su,
33 user_exists,
19 )34 )
2035
2136
22class TestRun(unittest.TestCase):37class TestCdContextManager(unittest.TestCase):
2338
24 def testSimpleCommand(self):39 def test_cd(self):
25 # Running a simple command (ls) works and running the command40 curdir = os.getcwd()
26 # produces a string.41 self.assertNotEqual('/var', curdir)
27 self.assertIsInstance(run('/bin/ls'), str)42 with cd('/var'):
2843 self.assertEqual('/var', os.getcwd())
29 def testStdoutReturned(self):44 self.assertEqual(curdir, os.getcwd())
30 # Running a simple command (ls) works and running the command
31 # produces a string.
32 self.assertIn('Usage:', run('/bin/ls', '--help'))
33
34 def testCalledProcessErrorRaised(self):
35 # If an error occurs a CalledProcessError is raised with the return
36 # code, command executed, and the output of the command.
37 with self.assertRaises(CalledProcessError) as info:
38 run('ls', '--not a valid switch')
39 exception = info.exception
40 self.assertEqual(2, exception.returncode)
41 self.assertEqual("['ls', '--not a valid switch']", exception.cmd)
42 self.assertIn('unrecognized option', exception.output)
43
44 def testNoneArguments(self):
45 # Ensure None is ignored when passed as positional argument.
46 self.assertIn('Usage:', run('/bin/ls', None, '--help', None))
4745
4846
49class TestCommand(unittest.TestCase):47class TestCommand(unittest.TestCase):
@@ -112,6 +110,349 @@
112 self.assertEquals(expected, diff.added_or_changed)110 self.assertEquals(expected, diff.added_or_changed)
113111
114112
113class TestEnviron(unittest.TestCase):
114
115 def test_existing(self):
116 # If an existing environment variable is changed, it is
117 # restored during context cleanup.
118 os.environ['MY_VARIABLE'] = 'foo'
119 with environ(MY_VARIABLE='bar'):
120 self.assertEqual('bar', os.getenv('MY_VARIABLE'))
121 self.assertEqual('foo', os.getenv('MY_VARIABLE'))
122 del os.environ['MY_VARIABLE']
123
124 def test_new(self):
125 # If a new environment variable is added, it is removed during
126 # context cleanup.
127 with environ(MY_VAR1='foo', MY_VAR2='bar'):
128 self.assertEqual('foo', os.getenv('MY_VAR1'))
129 self.assertEqual('bar', os.getenv('MY_VAR2'))
130 self.assertIsNone(os.getenv('MY_VAR1'))
131 self.assertIsNone(os.getenv('MY_VAR2'))
132
133
134class BaseCreateFile(object):
135
136 def create_file(self, content):
137 f = tempfile.NamedTemporaryFile('w', delete=False)
138 f.write(content)
139 f.close()
140 return f
141
142
143class BaseTestFile(BaseCreateFile):
144
145 base_content = 'line1\n'
146 new_content = 'new line\n'
147
148 def check_file_content(self, content, filename):
149 self.assertEqual(content, open(filename).read())
150
151
152class TestFileAppend(unittest.TestCase, BaseTestFile):
153
154 def test_append(self):
155 # Ensure the new line is correctly added at the end of the file.
156 f = self.create_file(self.base_content)
157 file_append(f.name, self.new_content)
158 self.check_file_content(self.base_content + self.new_content, f.name)
159
160 def test_existing_content(self):
161 # Ensure nothing happens if the file already contains the given line.
162 content = self.base_content + self.new_content
163 f = self.create_file(content)
164 file_append(f.name, self.new_content)
165 self.check_file_content(content, f.name)
166
167 def test_new_line_in_file_contents(self):
168 # A new line is automatically added before the given content if it
169 # is not present at the end of current file.
170 f = self.create_file(self.base_content.strip())
171 file_append(f.name, self.new_content)
172 self.check_file_content(self.base_content + self.new_content, f.name)
173
174 def test_new_line_in_given_line(self):
175 # A new line is automatically added to the given line if not present.
176 f = self.create_file(self.base_content)
177 file_append(f.name, self.new_content.strip())
178 self.check_file_content(self.base_content + self.new_content, f.name)
179
180 def test_non_existent_file(self):
181 # Ensure the file is created if it does not exist.
182 filename = tempfile.mktemp()
183 file_append(filename, self.base_content)
184 self.check_file_content(self.base_content, filename)
185
186 def test_fragment(self):
187 # Ensure a line fragment is not matched.
188 f = self.create_file(self.base_content)
189 fragment = self.base_content[2:]
190 file_append(f.name, fragment)
191 self.check_file_content(self.base_content + fragment, f.name)
192
193
194class TestFilePrepend(unittest.TestCase, BaseTestFile):
195
196 def test_prpend(self):
197 # Ensure the new content is correctly prepended at the beginning of
198 # the file.
199 f = self.create_file(self.base_content)
200 file_prepend(f.name, self.new_content)
201 self.check_file_content(self.new_content + self.base_content, f.name)
202
203 def test_existing_content(self):
204 # Ensure nothing happens if the file already starts with the given
205 # content.
206 content = self.base_content + self.new_content
207 f = self.create_file(content)
208 file_prepend(f.name, self.base_content)
209 self.check_file_content(content, f.name)
210
211 def test_move_content(self):
212 # If the file contains the given content, but not at the beginning,
213 # the content is moved on top.
214 f = self.create_file(self.base_content + self.new_content)
215 file_prepend(f.name, self.new_content)
216 self.check_file_content(self.new_content + self.base_content, f.name)
217
218 def test_new_line_in_given_line(self):
219 # A new line is automatically added to the given line if not present.
220 f = self.create_file(self.base_content)
221 file_prepend(f.name, self.new_content.strip())
222 self.check_file_content(self.new_content + self.base_content, f.name)
223
224
225class TestGenerateSSHKeys(unittest.TestCase):
226
227 def test_generation(self):
228 # Ensure ssh keys are correctly generated.
229 filename = tempfile.mktemp()
230 generate_ssh_keys(filename)
231 first_line = open(filename).readlines()[0].strip()
232 self.assertEqual('-----BEGIN RSA PRIVATE KEY-----', first_line)
233 pub_content = open(filename + '.pub').read()
234 self.assertTrue(pub_content.startswith('ssh-rsa'))
235
236
237class TestGetSuCommand(unittest.TestCase):
238
239 def test_current_user(self):
240 # If the su is requested as current user, the arguments are
241 # returned as given.
242 cmd = ('ls', '-l')
243 command = get_su_command(getpass.getuser(), cmd)
244 self.assertSequenceEqual(cmd, command)
245
246 def test_another_user(self):
247 # Ensure "su" is prepended and arguments are correctly quoted.
248 command = get_su_command('nobody', ('ls', '-l', 'my file'))
249 self.assertSequenceEqual(
250 ('su', 'nobody', '-c', "ls -l 'my file'"), command)
251
252
253class TestGetUserHome(unittest.TestCase):
254
255 def test_existent(self):
256 # Ensure the real home directory is returned for existing users.
257 self.assertEqual('/root', get_user_home('root'))
258
259 def test_non_existent(self):
260 # If the user does not exist, return a default /home/[username] home.
261 user = '_this_user_does_not_exist_'
262 self.assertEqual('/home/' + user, get_user_home(user))
263
264
265class TestGetUserIds(unittest.TestCase):
266
267 def test_get_user_ids(self):
268 # Ensure the correct uid and gid are returned.
269 uid, gid = get_user_ids('root')
270 self.assertEqual(0, uid)
271 self.assertEqual(0, gid)
272
273
274class TestJoinCommand(unittest.TestCase):
275
276 def test_normal(self):
277 # Ensure a normal command is correctly parsed.
278 command = 'ls -l'
279 self.assertEqual(command, join_command(command.split()))
280
281 def test_containing_spaces(self):
282 # Ensure args containing spaces are correctly quoted.
283 args = ('command', 'arg containig spaces')
284 self.assertEqual("command 'arg containig spaces'", join_command(args))
285
286 def test_empty(self):
287 # Ensure empty args are correctly quoted.
288 args = ('command', '')
289 self.assertEqual("command ''", join_command(args))
290
291
292class TestMkdirs(unittest.TestCase):
293
294 def test_intermediate_dirs(self):
295 # Ensure the leaf directory and all intermediate ones are created.
296 base_dir = tempfile.mktemp(suffix='/')
297 dir1 = tempfile.mktemp(prefix=base_dir)
298 dir2 = tempfile.mktemp(prefix=base_dir)
299 mkdirs(dir1, dir2)
300 self.assertTrue(os.path.isdir(dir1))
301 self.assertTrue(os.path.isdir(dir2))
302
303 def test_existing_dir(self):
304 # If the leaf directory already exists the function returns
305 # without errors.
306 mkdirs('/tmp')
307
308 def test_existing_file(self):
309 # An `OSError` is raised if the leaf path exists and it is a file.
310 f = tempfile.NamedTemporaryFile('w', delete=False)
311 f.close()
312 with self.assertRaises(OSError):
313 mkdirs(f.name)
314
315
316class TestRun(unittest.TestCase):
317
318 def testSimpleCommand(self):
319 # Running a simple command (ls) works and running the command
320 # produces a string.
321 self.assertIsInstance(run('/bin/ls'), str)
322
323 def testStdoutReturned(self):
324 # Running a simple command (ls) works and running the command
325 # produces a string.
326 self.assertIn('Usage:', run('/bin/ls', '--help'))
327
328 def testCalledProcessErrorRaised(self):
329 # If an error occurs a CalledProcessError is raised with the return
330 # code, command executed, and the output of the command.
331 with self.assertRaises(CalledProcessError) as info:
332 run('ls', '--not a valid switch')
333 exception = info.exception
334 self.assertEqual(2, exception.returncode)
335 self.assertEqual("['ls', '--not a valid switch']", exception.cmd)
336 self.assertIn('unrecognized option', exception.output)
337
338 def testNoneArguments(self):
339 # Ensure None is ignored when passed as positional argument.
340 self.assertIn('Usage:', run('/bin/ls', None, '--help', None))
341
342
343class TestSearchFile(unittest.TestCase, BaseCreateFile):
344
345 content1 = 'content1\n'
346 content2 = 'content2\n'
347
348 def setUp(self):
349 self.filename = self.create_file(self.content1 + self.content2).name
350
351 def tearDown(self):
352 os.remove(self.filename)
353
354 def test_grep(self):
355 # Ensure plain text is correctly matched.
356 self.assertEqual(self.content2, search_file('ent2', self.filename))
357 self.assertEqual(self.content1, search_file('content', self.filename))
358
359 def test_no_match(self):
360 # Ensure the function does not return false positives.
361 self.assertIsNone(search_file('no_match', self.filename))
362
363 def test_regexp(self):
364 # Ensure the function works with regular expressions.
365 self.assertEqual(self.content2, search_file('\w2', self.filename))
366
367
368class TestSerializer(unittest.TestCase):
369
370 def setUp(self):
371 self.path = tempfile.mktemp()
372 self.data = {'key': 'value'}
373
374 def tearDown(self):
375 if os.path.exists(self.path):
376 os.remove(self.path)
377
378 def test_serializer(self):
379 # Ensure data is correctly serializied and deserialized.
380 s = Serializer(self.path)
381 s.set(self.data)
382 self.assertEqual(self.data, s.get())
383
384 def test_existence(self):
385 # Ensure the file is created only when needed.
386 s = Serializer(self.path)
387 self.assertFalse(s.exists())
388 s.set(self.data)
389 self.assertTrue(s.exists())
390
391 def test_default_value(self):
392 # If the file does not exist, the serializer returns a default value.
393 s = Serializer(self.path)
394 self.assertEqual({}, s.get())
395 s = Serializer(self.path, default=47)
396 self.assertEqual(47, s.get())
397
398 def test_another_serializer(self):
399 # It is possible to use a custom serializer (e.g. pickle).
400 import pickle
401 s = Serializer(
402 self.path, serialize=pickle.dump, deserialize=pickle.load)
403 s.set(self.data)
404 self.assertEqual(self.data, s.get())
405
406
407class TestSSH(unittest.TestCase):
408
409 def setUp(self):
410 self.last_command = None
411
412 def remove_command_options(self, cmd):
413 cmd = list(cmd)
414 del cmd[1:7]
415 return cmd
416
417 def caller(self, cmd):
418 self.last_command = self.remove_command_options(cmd)
419
420 def check_last_command(self, expected):
421 self.assertSequenceEqual(expected, self.last_command)
422
423 def test_current_user(self):
424 # Ensure ssh command is correctly generated for current user.
425 sshcall = ssh('example.com', caller=self.caller)
426 sshcall('ls -l')
427 self.check_last_command(['ssh', 'example.com', '--', 'ls -l'])
428
429 def test_another_user(self):
430 # Ensure ssh command is correctly generated for a different user.
431 sshcall = ssh('example.com', 'myuser', caller=self.caller)
432 sshcall('ls -l')
433 self.check_last_command(['ssh', 'myuser@example.com', '--', 'ls -l'])
434
435 def test_ssh_key(self):
436 # The ssh key path can be optionally provided.
437 sshcall = ssh('example.com', key='/tmp/foo', caller=self.caller)
438 sshcall('ls -l')
439 self.check_last_command([
440 'ssh', '-i', '/tmp/foo', 'example.com', '--', 'ls -l'])
441
442 def test_error(self):
443 # If the ssh command exits with an error code, a
444 # `subprocess.CalledProcessError` is raised.
445 sshcall = ssh('example.com', caller=lambda cmd: 1)
446 with self.assertRaises(CalledProcessError):
447 sshcall('ls -l')
448
449 def test_ignore_errors(self):
450 # If ignore_errors is set to True when executing the command, no error
451 # will be raised, even if the command itself returns an error code.
452 sshcall = ssh('example.com', caller=lambda cmd: 1)
453 sshcall('ls -l', ignore_errors=True)
454
455
115current_euid = os.geteuid()456current_euid = os.geteuid()
116current_egid = os.getegid()457current_egid = os.getegid()
117current_home = os.environ['HOME']458current_home = os.environ['HOME']
@@ -180,13 +521,11 @@
180 self.assertEqual(current_home, os.environ['HOME'])521 self.assertEqual(current_home, os.environ['HOME'])
181522
182523
183class TestCdContextManager(unittest.TestCase):524class TestUserExists(unittest.TestCase):
184 def test_cd(self):525
185 curdir = os.getcwd()526 def test_user_exists(self):
186 self.assertNotEqual('/var', curdir)527 self.assertTrue(user_exists('root'))
187 with cd('/var'):528 self.assertFalse(user_exists('_this_user_does_not_exist_'))
188 self.assertEqual('/var', os.getcwd())
189 self.assertEqual(curdir, os.getcwd())
190529
191530
192if __name__ == '__main__':531if __name__ == '__main__':

Subscribers

People subscribed via source and target branches