Merge lp:~frankban/python-shelltoolbox/helpers into lp:python-shelltoolbox

Proposed by Francesco Banconi on 2012-03-06
Status: Merged
Merged at revision: 14
Proposed branch: lp:~frankban/python-shelltoolbox/helpers
Merge into: lp:python-shelltoolbox
Diff against target: 1062 lines (+802/-147)
3 files modified
setup.py (+1/-1)
shelltoolbox/__init__.py (+430/-114)
tests.py (+371/-32)
To merge this branch: bzr merge lp:~frankban/python-shelltoolbox/helpers
Reviewer Review Type Date Requested Status
Gary Poster (community) 2012-03-06 Approve on 2012-03-06
Review via email: mp+96180@code.launchpad.net

Description of the change

== Changes ==

- Added several helper functions:
  - bzr_whois
  - file_append
  - file_prepend
  - generate_ssh_keys
  - get_su_command
  - join_command
  - mkdirs
  - ssh
  - user_exists

- Added `environ` context manager and updated `su` to use it.

- Changed `apt_get_install` helper: now the function uses environ to run dpkg
  in non-interactive mode.

- Changed `get_user_home` to handle non-existent users
  (returning a default home directory)

- Updated the `install_extra_repositories` helper to accept distribution
  placeholders.

- Updated `cd` context manager: yield in a try/finally block.

- Fixed `grep` helper.

== Tests ==

python tests.py
.....................................................
----------------------------------------------------------------------
Ran 53 tests in 0.124s

OK

To post a comment you must log in.
Gary Poster (gary) wrote :

Thank you! Small comments and suggestions follow.

I asked about reordering the functions for alphabetization. Not having run at the top is somewhat surprising to me, but I am ok with it.

I was struck that file_append really expects the line to be added to end with a \n. I suppose we could enforce that one way or another...or not. I'm OK with leaving as is, or you could add a \n if the line does not already end with one (and do this before you check if the line is in the content). On a somewhat related note, it might be better to check if the line to be added is in the output of readlines(), not read(); that would verify that the line, not a line that ends with the line, is in the file. For example, right now a file of "abc\ndef\n" would incorrectly show that the "ef\n" line was in the file.

In file_prepend, I am tempted to suggest that this snippet should be changed:
            if line in lines:
                lines.remove(line)
This iterates through the lines twice, which is mildly annoying to me but almost certainly not a practical concern for us with these use cases. That said, I'd be tempted to change that to the following:
            try:
                lines.remove(line)
            except ValueError:
                pass
You could also verify that the line to be added in this function ended with a \n if you wanted to.

I don't really understand this part of the docstring for get_su_command:
    This can be used together with `run` when the `su` context manager is not
    enough (e.g. an external program uses uid rather than euid).
Oh! You mean that you would use run(*get_su_command(...)). Maybe an example would help.

I'm a bit concerned about what you are doing here in get_value_from_line:
    return line.split('=')[1].strip('"\' ')
would maybe json be a safer approach? No, I tried it, and json.loads('"hi"') is fine but json.loads("'hi'") is not. :-/ I dunno. When do we use this? The logic seems a bit idiosyncratic.

The grep command also seems a bit idiosyncratic. Why is it reasonable to always strip the result? I also would be tempted to rename the funtion: grep does a lot more than that. simply search_file? Hm, I see that you just reordered this file, you didn't actually add grep.... Never mind then, I guess. I should have commented on this sooner, but you shouldn't have to worry about it. If you *do* want to change it, please leave the "grep" alias around for now so that our charms do not suddenly break.

I don't understand why you removed the Serializer. Was it defined twice?

I thought the choices you made for doc examples and for unit tests were good. Thank you.

Great job!

Gary

review: Approve
Francesco Banconi (frankban) wrote :

> I was struck that file_append really expects the line to be added to end with
> a \n. I suppose we could enforce that one way or another...or not. I'm OK
> with leaving as is, or you could add a \n if the line does not already end
> with one (and do this before you check if the line is in the content). On a
> somewhat related note, it might be better to check if the line to be added is
> in the output of readlines(), not read(); that would verify that the line, not
> a line that ends with the line, is in the file. For example, right now a file
> of "abc\ndef\n" would incorrectly show that the "ef\n" line was in the file.

I agree with your suggestions. I've changed the function and removed those
ambiguities. I've added tests for line not ending with '\n' and for line
fragments.

> In file_prepend, I am tempted to suggest that this snippet should be changed:
> if line in lines:
> lines.remove(line)
> This iterates through the lines twice, which is mildly annoying to me but
> almost certainly not a practical concern for us with these use cases. That
> said, I'd be tempted to change that to the following:
> try:
> lines.remove(line)
> except ValueError:
> pass
> You could also verify that the line to be added in this function ended with a
> \n if you wanted to.

Done.

> I'm a bit concerned about what you are doing here in get_value_from_line:
> return line.split('=')[1].strip('"\' ')
> would maybe json be a safer approach? No, I tried it, and json.loads('"hi"')
> is fine but json.loads("'hi'") is not. :-/ I dunno. When do we use this?
> The logic seems a bit idiosyncratic.

I don't remember the origin of that function. Since it is not used in the
charms or in setuplxc/lpsetup, I am going to remove it.

> The grep command also seems a bit idiosyncratic. Why is it reasonable to
> always strip the result? I also would be tempted to rename the funtion: grep
> does a lot more than that. simply search_file? Hm, I see that you just
> reordered this file, you didn't actually add grep.... Never mind then, I
> guess. I should have commented on this sooner, but you shouldn't have to
> worry about it. If you *do* want to change it, please leave the "grep" alias
> around for now so that our charms do not suddenly break.

Actually that helper is not used, and I remember it was before we changed the
way the buildslave is reconfigured. I think it's safe to rename it and change
the returned value.

> I don't understand why you removed the Serializer. Was it defined twice?

Yes it was.

Thanks Gary.

33. By Francesco Banconi on 2012-03-07

Fixed file_append.

34. By Francesco Banconi on 2012-03-07

Fixed file_prepend.

35. By Francesco Banconi on 2012-03-07

Removed get_value_from_line.

36. By Francesco Banconi on 2012-03-07

s/grep/search_file/

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'setup.py'
2--- setup.py 2012-03-01 20:49:48 +0000
3+++ setup.py 2012-03-07 10:19:06 +0000
4@@ -9,7 +9,7 @@
5 ez_setup.use_setuptools()
6
7
8-__version__ = '0.1.1'
9+__version__ = '0.2.0'
10
11 from setuptools import setup
12
13
14=== modified file 'shelltoolbox/__init__.py'
15--- shelltoolbox/__init__.py 2012-03-02 14:59:46 +0000
16+++ shelltoolbox/__init__.py 2012-03-07 10:19:06 +0000
17@@ -19,25 +19,38 @@
18 __metaclass__ = type
19 __all__ = [
20 'apt_get_install',
21+ 'bzr_whois',
22 'cd',
23 'command',
24 'DictDiffer',
25+ 'environ',
26+ 'file_append',
27+ 'file_prepend',
28+ 'generate_ssh_keys',
29+ 'get_su_command',
30+ 'get_user_home',
31 'get_user_ids',
32- 'get_user_home',
33- 'get_value_from_line',
34- 'grep',
35 'install_extra_repositories',
36+ 'join_command',
37+ 'mkdirs',
38 'run',
39 'Serializer',
40 'script_name',
41+ 'search_file',
42+ 'ssh',
43 'su',
44+ 'user_exists',
45+ 'wait_for_page_contents',
46 ]
47
48 from collections import namedtuple
49 from contextlib import contextmanager
50+from email.Utils import parseaddr
51+import errno
52 import json
53 import operator
54 import os
55+import pipes
56 import pwd
57 import re
58 import subprocess
59@@ -50,26 +63,50 @@
60 Env = namedtuple('Env', 'uid gid home')
61
62
63-def run(*args, **kwargs):
64- """Run the command with the given arguments.
65-
66- The first argument is the path to the command to run.
67- Subsequent arguments are command-line arguments to be passed.
68-
69- This function accepts all optional keyword arguments accepted by
70- `subprocess.Popen`.
71- """
72- args = [i for i in args if i is not None]
73- pipe = subprocess.PIPE
74- process = subprocess.Popen(
75- args, stdout=kwargs.pop('stdout', pipe),
76- stderr=kwargs.pop('stderr', pipe),
77- close_fds=kwargs.pop('close_fds', True), **kwargs)
78- stdout, stderr = process.communicate()
79- if process.returncode:
80- raise subprocess.CalledProcessError(
81- process.returncode, repr(args), output=stdout+stderr)
82- return stdout
83+def apt_get_install(*args, **kwargs):
84+ """Install given packages using apt.
85+
86+ It is possible to pass environment variables to be set during install
87+ using keyword arguments.
88+
89+ :raises: subprocess.CalledProcessError
90+ """
91+ debian_frontend = kwargs.pop('DEBIAN_FRONTEND', 'noninteractive')
92+ with environ(DEBIAN_FRONTEND=debian_frontend, **kwargs):
93+ cmd = ('apt-get', '-y', 'install') + args
94+ return run(*cmd)
95+
96+
97+def bzr_whois(user):
98+ """Return full name and email of bzr `user`.
99+
100+ Return None if the given `user` does not have a bzr user id.
101+ """
102+ with su(user):
103+ try:
104+ whoami = run('bzr', 'whoami')
105+ except (subprocess.CalledProcessError, OSError):
106+ return None
107+ return parseaddr(whoami)
108+
109+
110+@contextmanager
111+def cd(directory):
112+ """A context manager to temporarily change current working dir, e.g.::
113+
114+ >>> import os
115+ >>> os.chdir('/tmp')
116+ >>> with cd('/bin'): print os.getcwd()
117+ /bin
118+ >>> print os.getcwd()
119+ /tmp
120+ """
121+ cwd = os.getcwd()
122+ os.chdir(directory)
123+ try:
124+ yield
125+ finally:
126+ os.chdir(cwd)
127
128
129 def command(*base_args):
130@@ -97,37 +134,393 @@
131 return callable_command
132
133
134-apt_get_install = command('apt-get', 'install', '-y', '--force-yes')
135+@contextmanager
136+def environ(**kwargs):
137+ """A context manager to temporarily change environment variables.
138+
139+ If an existing environment variable is changed, it is restored during
140+ context cleanup::
141+
142+ >>> import os
143+ >>> os.environ['MY_VARIABLE'] = 'foo'
144+ >>> with environ(MY_VARIABLE='bar'): print os.getenv('MY_VARIABLE')
145+ bar
146+ >>> print os.getenv('MY_VARIABLE')
147+ foo
148+ >>> del os.environ['MY_VARIABLE']
149+
150+ If we are adding environment variables, they are removed during context
151+ cleanup::
152+
153+ >>> import os
154+ >>> with environ(MY_VAR1='foo', MY_VAR2='bar'):
155+ ... print os.getenv('MY_VAR1'), os.getenv('MY_VAR2')
156+ foo bar
157+ >>> os.getenv('MY_VAR1') == os.getenv('MY_VAR2') == None
158+ True
159+ """
160+ backup = {}
161+ for key, value in kwargs.items():
162+ backup[key] = os.getenv(key)
163+ os.environ[key] = value
164+ try:
165+ yield
166+ finally:
167+ for key, value in backup.items():
168+ if value is None:
169+ del os.environ[key]
170+ else:
171+ os.environ[key] = value
172+
173+
174+def file_append(filename, line):
175+ r"""Append given `line`, if not present, at the end of `filename`.
176+
177+ Usage example::
178+
179+ >>> import tempfile
180+ >>> f = tempfile.NamedTemporaryFile('w', delete=False)
181+ >>> f.write('line1\n')
182+ >>> f.close()
183+ >>> file_append(f.name, 'new line\n')
184+ >>> open(f.name).read()
185+ 'line1\nnew line\n'
186+
187+ Nothing happens if the file already contains the given `line`::
188+
189+ >>> file_append(f.name, 'new line\n')
190+ >>> open(f.name).read()
191+ 'line1\nnew line\n'
192+
193+ A new line is automatically added before the given `line` if it is not
194+ present at the end of current file content::
195+
196+ >>> import tempfile
197+ >>> f = tempfile.NamedTemporaryFile('w', delete=False)
198+ >>> f.write('line1')
199+ >>> f.close()
200+ >>> file_append(f.name, 'new line\n')
201+ >>> open(f.name).read()
202+ 'line1\nnew line\n'
203+
204+ The file is created if it does not exist::
205+
206+ >>> import tempfile
207+ >>> filename = tempfile.mktemp()
208+ >>> file_append(filename, 'line1\n')
209+ >>> open(filename).read()
210+ 'line1\n'
211+ """
212+ if not line.endswith('\n'):
213+ line += '\n'
214+ with open(filename, 'a+') as f:
215+ lines = f.readlines()
216+ if line not in lines:
217+ if not lines or lines[-1].endswith('\n'):
218+ f.write(line)
219+ else:
220+ f.write('\n' + line)
221+
222+
223+def file_prepend(filename, line):
224+ r"""Insert given `line`, if not present, at the beginning of `filename`.
225+
226+ Usage example::
227+
228+ >>> import tempfile
229+ >>> f = tempfile.NamedTemporaryFile('w', delete=False)
230+ >>> f.write('line1\n')
231+ >>> f.close()
232+ >>> file_prepend(f.name, 'line0\n')
233+ >>> open(f.name).read()
234+ 'line0\nline1\n'
235+
236+ If the file starts with the given `line`, nothing happens::
237+
238+ >>> file_prepend(f.name, 'line0\n')
239+ >>> open(f.name).read()
240+ 'line0\nline1\n'
241+
242+ If the file contains the given `line`, but not at the beginning,
243+ the line is moved on top::
244+
245+ >>> file_prepend(f.name, 'line1\n')
246+ >>> open(f.name).read()
247+ 'line1\nline0\n'
248+ """
249+ if not line.endswith('\n'):
250+ line += '\n'
251+ with open(filename, 'r+') as f:
252+ lines = f.readlines()
253+ if lines[0] != line:
254+ try:
255+ lines.remove(line)
256+ except ValueError:
257+ pass
258+ lines.insert(0, line)
259+ f.seek(0)
260+ f.writelines(lines)
261+
262+
263+def generate_ssh_keys(path, passphrase=''):
264+ """Generate ssh key pair, saving them inside the given `directory`.
265+
266+ >>> generate_ssh_keys('/tmp/id_rsa')
267+ 0
268+ >>> open('/tmp/id_rsa').readlines()[0].strip()
269+ '-----BEGIN RSA PRIVATE KEY-----'
270+ >>> open('/tmp/id_rsa.pub').read().startswith('ssh-rsa')
271+ True
272+ >>> os.remove('/tmp/id_rsa')
273+ >>> os.remove('/tmp/id_rsa.pub')
274+ """
275+ return subprocess.call([
276+ 'ssh-keygen', '-q', '-t', 'rsa', '-N', passphrase, '-f', path])
277+
278+
279+def get_su_command(user, args):
280+ """Return a command line as a sequence, prepending "su" if necessary.
281+
282+ This can be used together with `run` when the `su` context manager is not
283+ enough (e.g. an external program uses uid rather than euid).
284+
285+ run(*get_su_command(user, ['bzr', 'whoami']))
286+
287+ If the su is requested as current user, the arguments are returned as
288+ given::
289+
290+ >>> import getpass
291+ >>> current_user = getpass.getuser()
292+
293+ >>> get_su_command(current_user, ('ls', '-l'))
294+ ('ls', '-l')
295+
296+ Otherwise, "su" is prepended::
297+
298+ >>> get_su_command('nobody', ('ls', '-l', 'my file'))
299+ ('su', 'nobody', '-c', "ls -l 'my file'")
300+ """
301+ if get_user_ids(user)[0] != os.getuid():
302+ args = [i for i in args if i is not None]
303+ return ('su', user, '-c', join_command(args))
304+ return args
305+
306+
307+def get_user_home(user):
308+ """Return the home directory of the given `user`.
309+
310+ >>> get_user_home('root')
311+ '/root'
312+
313+ If the user does not exist, return a default /home/[username] home::
314+
315+ >>> get_user_home('_this_user_does_not_exist_')
316+ '/home/_this_user_does_not_exist_'
317+ """
318+ try:
319+ return pwd.getpwnam(user).pw_dir
320+ except KeyError:
321+ return os.path.join(os.path.sep, 'home', user)
322+
323+
324+def get_user_ids(user):
325+ """Return the uid and gid of given `user`, e.g.::
326+
327+ >>> get_user_ids('root')
328+ (0, 0)
329+ """
330+ userdata = pwd.getpwnam(user)
331+ return userdata.pw_uid, userdata.pw_gid
332
333
334 def install_extra_repositories(*repositories):
335 """Install all of the extra repositories and update apt.
336
337+ Given repositories can contain a "{distribution}" placeholder, that will
338+ be replaced by current distribution codename.
339+
340 :raises: subprocess.CalledProcessError
341 """
342 distribution = run('lsb_release', '-cs').strip()
343- # Starting from Oneiric, the `apt-add-repository` is interactive by
344+ # Starting from Oneiric, `apt-add-repository` is interactive by
345 # default, and requires a "-y" flag to be set.
346 assume_yes = None if distribution == 'lucid' else '-y'
347 for repo in repositories:
348- run('apt-add-repository', assume_yes, repo)
349+ repository = repo.format(distribution=distribution)
350+ run('apt-add-repository', assume_yes, repository)
351 run('apt-get', 'clean')
352 run('apt-get', 'update')
353
354
355-def grep(content, filename):
356+def join_command(args):
357+ """Return a valid Unix command line from `args`.
358+
359+ >>> join_command(['ls', '-l'])
360+ 'ls -l'
361+
362+ Arguments containing spaces and empty args are correctly quoted::
363+
364+ >>> join_command(['command', 'arg1', 'arg containing spaces', ''])
365+ "command arg1 'arg containing spaces' ''"
366+ """
367+ return ' '.join(pipes.quote(arg) for arg in args)
368+
369+
370+def mkdirs(*args):
371+ """Create leaf directories (given as `args`) and all intermediate ones.
372+
373+ >>> import tempfile
374+ >>> base_dir = tempfile.mktemp(suffix='/')
375+ >>> dir1 = tempfile.mktemp(prefix=base_dir)
376+ >>> dir2 = tempfile.mktemp(prefix=base_dir)
377+ >>> mkdirs(dir1, dir2)
378+ >>> os.path.isdir(dir1)
379+ True
380+ >>> os.path.isdir(dir2)
381+ True
382+
383+ If the leaf directory already exists the function returns without errors::
384+
385+ >>> mkdirs(dir1)
386+
387+ An `OSError` is raised if the leaf path exists and it is a file::
388+
389+ >>> f = tempfile.NamedTemporaryFile(
390+ ... 'w', delete=False, prefix=base_dir)
391+ >>> f.close()
392+ >>> mkdirs(f.name) # doctest: +ELLIPSIS
393+ Traceback (most recent call last):
394+ OSError: ...
395+ """
396+ for directory in args:
397+ try:
398+ os.makedirs(directory)
399+ except OSError as err:
400+ if err.errno != errno.EEXIST or os.path.isfile(directory):
401+ raise
402+
403+
404+def run(*args, **kwargs):
405+ """Run the command with the given arguments.
406+
407+ The first argument is the path to the command to run.
408+ Subsequent arguments are command-line arguments to be passed.
409+
410+ This function accepts all optional keyword arguments accepted by
411+ `subprocess.Popen`.
412+ """
413+ args = [i for i in args if i is not None]
414+ pipe = subprocess.PIPE
415+ process = subprocess.Popen(
416+ args, stdout=kwargs.pop('stdout', pipe),
417+ stderr=kwargs.pop('stderr', pipe),
418+ close_fds=kwargs.pop('close_fds', True), **kwargs)
419+ stdout, stderr = process.communicate()
420+ if process.returncode:
421+ raise subprocess.CalledProcessError(
422+ process.returncode, repr(args), output=stdout+stderr)
423+ return stdout
424+
425+
426+def script_name():
427+ """Return the name of this script."""
428+ return os.path.basename(sys.argv[0])
429+
430+
431+def search_file(regexp, filename):
432+ """Return the first line in `filename` that matches `regexp`."""
433 with open(filename) as f:
434 for line in f:
435- if re.match(content, line):
436- return line.strip()
437-
438-
439-def get_value_from_line(line):
440- return line.split('=')[1].strip('"\' ')
441-
442-
443-def script_name():
444- return os.path.basename(sys.argv[0])
445+ if re.search(regexp, line):
446+ return line
447+
448+
449+def ssh(location, user=None, key=None, caller=subprocess.call):
450+ """Return a callable that can be used to run ssh shell commands.
451+
452+ The ssh `location` and, optionally, `user` must be given.
453+ If the user is None then the current user is used for the connection.
454+
455+ The callable internally uses the given `caller`::
456+
457+ >>> def caller(cmd):
458+ ... print tuple(cmd)
459+ >>> sshcall = ssh('example.com', 'myuser', caller=caller)
460+ >>> root_sshcall = ssh('example.com', caller=caller)
461+ >>> sshcall('ls -l') # doctest: +ELLIPSIS
462+ ('ssh', '-t', ..., 'myuser@example.com', '--', 'ls -l')
463+ >>> root_sshcall('ls -l') # doctest: +ELLIPSIS
464+ ('ssh', '-t', ..., 'example.com', '--', 'ls -l')
465+
466+ The ssh key path can be optionally provided::
467+
468+ >>> root_sshcall = ssh('example.com', key='/tmp/foo', caller=caller)
469+ >>> root_sshcall('ls -l') # doctest: +ELLIPSIS
470+ ('ssh', '-t', ..., '-i', '/tmp/foo', 'example.com', '--', 'ls -l')
471+
472+ If the ssh command exits with an error code,
473+ a `subprocess.CalledProcessError` is raised::
474+
475+ >>> ssh('loc', caller=lambda cmd: 1)('ls -l') # doctest: +ELLIPSIS
476+ Traceback (most recent call last):
477+ CalledProcessError: ...
478+
479+ If ignore_errors is set to True when executing the command, no error
480+ will be raised, even if the command itself returns an error code.
481+
482+ >>> sshcall = ssh('loc', caller=lambda cmd: 1)
483+ >>> sshcall('ls -l', ignore_errors=True)
484+ """
485+ sshcmd = [
486+ 'ssh',
487+ '-t',
488+ '-t', # Yes, this second -t is deliberate. See `man ssh`.
489+ '-o', 'StrictHostKeyChecking=no',
490+ '-o', 'UserKnownHostsFile=/dev/null',
491+ ]
492+ if key is not None:
493+ sshcmd.extend(['-i', key])
494+ if user is not None:
495+ location = '{}@{}'.format(user, location)
496+ sshcmd.extend([location, '--'])
497+
498+ def _sshcall(cmd, ignore_errors=False):
499+ command = sshcmd + [cmd]
500+ retcode = caller(command)
501+ if retcode and not ignore_errors:
502+ raise subprocess.CalledProcessError(retcode, ' '.join(command))
503+
504+ return _sshcall
505+
506+
507+@contextmanager
508+def su(user):
509+ """A context manager to temporarily run the script as a different user."""
510+ uid, gid = get_user_ids(user)
511+ os.setegid(gid)
512+ os.seteuid(uid)
513+ home = get_user_home(user)
514+ with environ(HOME=home):
515+ try:
516+ yield Env(uid, gid, home)
517+ finally:
518+ os.setegid(os.getgid())
519+ os.seteuid(os.getuid())
520+
521+
522+def user_exists(username):
523+ """Return True if given `username` exists, e.g.::
524+
525+ >>> user_exists('root')
526+ True
527+ >>> user_exists('_this_user_does_not_exist_')
528+ False
529+ """
530+ try:
531+ pwd.getpwnam(username)
532+ except KeyError:
533+ return False
534+ return True
535
536
537 def wait_for_page_contents(url, contents, timeout=120, validate=None):
538@@ -148,83 +541,6 @@
539 time.sleep(0.1)
540
541
542-class Serializer:
543- """Handle JSON (de)serialization."""
544-
545- def __init__(self, path, default=None, serialize=None, deserialize=None):
546- self.path = path
547- self.default = default or {}
548- self.serialize = serialize or json.dump
549- self.deserialize = deserialize or json.load
550-
551- def exists(self):
552- return os.path.exists(self.path)
553-
554- def get(self):
555- if self.exists():
556- with open(self.path) as f:
557- return self.deserialize(f)
558- return self.default
559-
560- def set(self, data):
561- with open(self.path, 'w') as f:
562- self.serialize(data, f)
563-
564-
565-def get_user_ids(user):
566- """Return the uid and gid of given `user`, e.g.::
567-
568- >>> get_user_ids('root')
569- (0, 0)
570- """
571- userdata = pwd.getpwnam(user)
572- return userdata.pw_uid, userdata.pw_gid
573-
574-
575-def get_user_home(user):
576- """Return the home directory of the given `user`.
577-
578- >>> get_user_home('root')
579- '/root'
580- """
581- return pwd.getpwnam(user).pw_dir
582-
583-
584-@contextmanager
585-def cd(directory):
586- """A context manager to temporary change current working dir, e.g.::
587-
588- >>> import os
589- >>> os.chdir('/tmp')
590- >>> with cd('/bin'): print os.getcwd()
591- /bin
592- >>> os.getcwd()
593- '/tmp'
594- """
595- cwd = os.getcwd()
596- os.chdir(directory)
597- yield
598- os.chdir(cwd)
599-
600-
601-@contextmanager
602-def su(user):
603- """A context manager to temporary run the script as a different user."""
604- uid, gid = get_user_ids(user)
605- os.setegid(gid)
606- os.seteuid(uid)
607- current_home = os.getenv('HOME')
608- home = get_user_home(user)
609- os.environ['HOME'] = home
610- try:
611- yield Env(uid, gid, home)
612- finally:
613- os.setegid(os.getgid())
614- os.seteuid(os.getuid())
615- if current_home is not None:
616- os.environ['HOME'] = current_home
617-
618-
619 class DictDiffer:
620 """
621 Calculate the difference between two dictionaries as:
622
623=== modified file 'tests.py'
624--- tests.py 2012-03-02 13:58:36 +0000
625+++ tests.py 2012-03-07 10:19:06 +0000
626@@ -6,44 +6,42 @@
627 __metaclass__ = type
628
629
630+import getpass
631 import os
632 from subprocess import CalledProcessError
633+import tempfile
634 import unittest
635
636 from shelltoolbox import (
637 cd,
638 command,
639 DictDiffer,
640+ environ,
641+ file_append,
642+ file_prepend,
643+ generate_ssh_keys,
644+ get_su_command,
645+ get_user_home,
646+ get_user_ids,
647+ join_command,
648+ mkdirs,
649 run,
650+ search_file,
651+ Serializer,
652+ ssh,
653 su,
654+ user_exists,
655 )
656
657
658-class TestRun(unittest.TestCase):
659-
660- def testSimpleCommand(self):
661- # Running a simple command (ls) works and running the command
662- # produces a string.
663- self.assertIsInstance(run('/bin/ls'), str)
664-
665- def testStdoutReturned(self):
666- # Running a simple command (ls) works and running the command
667- # produces a string.
668- self.assertIn('Usage:', run('/bin/ls', '--help'))
669-
670- def testCalledProcessErrorRaised(self):
671- # If an error occurs a CalledProcessError is raised with the return
672- # code, command executed, and the output of the command.
673- with self.assertRaises(CalledProcessError) as info:
674- run('ls', '--not a valid switch')
675- exception = info.exception
676- self.assertEqual(2, exception.returncode)
677- self.assertEqual("['ls', '--not a valid switch']", exception.cmd)
678- self.assertIn('unrecognized option', exception.output)
679-
680- def testNoneArguments(self):
681- # Ensure None is ignored when passed as positional argument.
682- self.assertIn('Usage:', run('/bin/ls', None, '--help', None))
683+class TestCdContextManager(unittest.TestCase):
684+
685+ def test_cd(self):
686+ curdir = os.getcwd()
687+ self.assertNotEqual('/var', curdir)
688+ with cd('/var'):
689+ self.assertEqual('/var', os.getcwd())
690+ self.assertEqual(curdir, os.getcwd())
691
692
693 class TestCommand(unittest.TestCase):
694@@ -112,6 +110,349 @@
695 self.assertEquals(expected, diff.added_or_changed)
696
697
698+class TestEnviron(unittest.TestCase):
699+
700+ def test_existing(self):
701+ # If an existing environment variable is changed, it is
702+ # restored during context cleanup.
703+ os.environ['MY_VARIABLE'] = 'foo'
704+ with environ(MY_VARIABLE='bar'):
705+ self.assertEqual('bar', os.getenv('MY_VARIABLE'))
706+ self.assertEqual('foo', os.getenv('MY_VARIABLE'))
707+ del os.environ['MY_VARIABLE']
708+
709+ def test_new(self):
710+ # If a new environment variable is added, it is removed during
711+ # context cleanup.
712+ with environ(MY_VAR1='foo', MY_VAR2='bar'):
713+ self.assertEqual('foo', os.getenv('MY_VAR1'))
714+ self.assertEqual('bar', os.getenv('MY_VAR2'))
715+ self.assertIsNone(os.getenv('MY_VAR1'))
716+ self.assertIsNone(os.getenv('MY_VAR2'))
717+
718+
719+class BaseCreateFile(object):
720+
721+ def create_file(self, content):
722+ f = tempfile.NamedTemporaryFile('w', delete=False)
723+ f.write(content)
724+ f.close()
725+ return f
726+
727+
728+class BaseTestFile(BaseCreateFile):
729+
730+ base_content = 'line1\n'
731+ new_content = 'new line\n'
732+
733+ def check_file_content(self, content, filename):
734+ self.assertEqual(content, open(filename).read())
735+
736+
737+class TestFileAppend(unittest.TestCase, BaseTestFile):
738+
739+ def test_append(self):
740+ # Ensure the new line is correctly added at the end of the file.
741+ f = self.create_file(self.base_content)
742+ file_append(f.name, self.new_content)
743+ self.check_file_content(self.base_content + self.new_content, f.name)
744+
745+ def test_existing_content(self):
746+ # Ensure nothing happens if the file already contains the given line.
747+ content = self.base_content + self.new_content
748+ f = self.create_file(content)
749+ file_append(f.name, self.new_content)
750+ self.check_file_content(content, f.name)
751+
752+ def test_new_line_in_file_contents(self):
753+ # A new line is automatically added before the given content if it
754+ # is not present at the end of current file.
755+ f = self.create_file(self.base_content.strip())
756+ file_append(f.name, self.new_content)
757+ self.check_file_content(self.base_content + self.new_content, f.name)
758+
759+ def test_new_line_in_given_line(self):
760+ # A new line is automatically added to the given line if not present.
761+ f = self.create_file(self.base_content)
762+ file_append(f.name, self.new_content.strip())
763+ self.check_file_content(self.base_content + self.new_content, f.name)
764+
765+ def test_non_existent_file(self):
766+ # Ensure the file is created if it does not exist.
767+ filename = tempfile.mktemp()
768+ file_append(filename, self.base_content)
769+ self.check_file_content(self.base_content, filename)
770+
771+ def test_fragment(self):
772+ # Ensure a line fragment is not matched.
773+ f = self.create_file(self.base_content)
774+ fragment = self.base_content[2:]
775+ file_append(f.name, fragment)
776+ self.check_file_content(self.base_content + fragment, f.name)
777+
778+
779+class TestFilePrepend(unittest.TestCase, BaseTestFile):
780+
781+ def test_prpend(self):
782+ # Ensure the new content is correctly prepended at the beginning of
783+ # the file.
784+ f = self.create_file(self.base_content)
785+ file_prepend(f.name, self.new_content)
786+ self.check_file_content(self.new_content + self.base_content, f.name)
787+
788+ def test_existing_content(self):
789+ # Ensure nothing happens if the file already starts with the given
790+ # content.
791+ content = self.base_content + self.new_content
792+ f = self.create_file(content)
793+ file_prepend(f.name, self.base_content)
794+ self.check_file_content(content, f.name)
795+
796+ def test_move_content(self):
797+ # If the file contains the given content, but not at the beginning,
798+ # the content is moved on top.
799+ f = self.create_file(self.base_content + self.new_content)
800+ file_prepend(f.name, self.new_content)
801+ self.check_file_content(self.new_content + self.base_content, f.name)
802+
803+ def test_new_line_in_given_line(self):
804+ # A new line is automatically added to the given line if not present.
805+ f = self.create_file(self.base_content)
806+ file_prepend(f.name, self.new_content.strip())
807+ self.check_file_content(self.new_content + self.base_content, f.name)
808+
809+
810+class TestGenerateSSHKeys(unittest.TestCase):
811+
812+ def test_generation(self):
813+ # Ensure ssh keys are correctly generated.
814+ filename = tempfile.mktemp()
815+ generate_ssh_keys(filename)
816+ first_line = open(filename).readlines()[0].strip()
817+ self.assertEqual('-----BEGIN RSA PRIVATE KEY-----', first_line)
818+ pub_content = open(filename + '.pub').read()
819+ self.assertTrue(pub_content.startswith('ssh-rsa'))
820+
821+
822+class TestGetSuCommand(unittest.TestCase):
823+
824+ def test_current_user(self):
825+ # If the su is requested as current user, the arguments are
826+ # returned as given.
827+ cmd = ('ls', '-l')
828+ command = get_su_command(getpass.getuser(), cmd)
829+ self.assertSequenceEqual(cmd, command)
830+
831+ def test_another_user(self):
832+ # Ensure "su" is prepended and arguments are correctly quoted.
833+ command = get_su_command('nobody', ('ls', '-l', 'my file'))
834+ self.assertSequenceEqual(
835+ ('su', 'nobody', '-c', "ls -l 'my file'"), command)
836+
837+
838+class TestGetUserHome(unittest.TestCase):
839+
840+ def test_existent(self):
841+ # Ensure the real home directory is returned for existing users.
842+ self.assertEqual('/root', get_user_home('root'))
843+
844+ def test_non_existent(self):
845+ # If the user does not exist, return a default /home/[username] home.
846+ user = '_this_user_does_not_exist_'
847+ self.assertEqual('/home/' + user, get_user_home(user))
848+
849+
850+class TestGetUserIds(unittest.TestCase):
851+
852+ def test_get_user_ids(self):
853+ # Ensure the correct uid and gid are returned.
854+ uid, gid = get_user_ids('root')
855+ self.assertEqual(0, uid)
856+ self.assertEqual(0, gid)
857+
858+
859+class TestJoinCommand(unittest.TestCase):
860+
861+ def test_normal(self):
862+ # Ensure a normal command is correctly parsed.
863+ command = 'ls -l'
864+ self.assertEqual(command, join_command(command.split()))
865+
866+ def test_containing_spaces(self):
867+ # Ensure args containing spaces are correctly quoted.
868+ args = ('command', 'arg containig spaces')
869+ self.assertEqual("command 'arg containig spaces'", join_command(args))
870+
871+ def test_empty(self):
872+ # Ensure empty args are correctly quoted.
873+ args = ('command', '')
874+ self.assertEqual("command ''", join_command(args))
875+
876+
877+class TestMkdirs(unittest.TestCase):
878+
879+ def test_intermediate_dirs(self):
880+ # Ensure the leaf directory and all intermediate ones are created.
881+ base_dir = tempfile.mktemp(suffix='/')
882+ dir1 = tempfile.mktemp(prefix=base_dir)
883+ dir2 = tempfile.mktemp(prefix=base_dir)
884+ mkdirs(dir1, dir2)
885+ self.assertTrue(os.path.isdir(dir1))
886+ self.assertTrue(os.path.isdir(dir2))
887+
888+ def test_existing_dir(self):
889+ # If the leaf directory already exists the function returns
890+ # without errors.
891+ mkdirs('/tmp')
892+
893+ def test_existing_file(self):
894+ # An `OSError` is raised if the leaf path exists and it is a file.
895+ f = tempfile.NamedTemporaryFile('w', delete=False)
896+ f.close()
897+ with self.assertRaises(OSError):
898+ mkdirs(f.name)
899+
900+
901+class TestRun(unittest.TestCase):
902+
903+ def testSimpleCommand(self):
904+ # Running a simple command (ls) works and running the command
905+ # produces a string.
906+ self.assertIsInstance(run('/bin/ls'), str)
907+
908+ def testStdoutReturned(self):
909+ # Running a simple command (ls) works and running the command
910+ # produces a string.
911+ self.assertIn('Usage:', run('/bin/ls', '--help'))
912+
913+ def testCalledProcessErrorRaised(self):
914+ # If an error occurs a CalledProcessError is raised with the return
915+ # code, command executed, and the output of the command.
916+ with self.assertRaises(CalledProcessError) as info:
917+ run('ls', '--not a valid switch')
918+ exception = info.exception
919+ self.assertEqual(2, exception.returncode)
920+ self.assertEqual("['ls', '--not a valid switch']", exception.cmd)
921+ self.assertIn('unrecognized option', exception.output)
922+
923+ def testNoneArguments(self):
924+ # Ensure None is ignored when passed as positional argument.
925+ self.assertIn('Usage:', run('/bin/ls', None, '--help', None))
926+
927+
928+class TestSearchFile(unittest.TestCase, BaseCreateFile):
929+
930+ content1 = 'content1\n'
931+ content2 = 'content2\n'
932+
933+ def setUp(self):
934+ self.filename = self.create_file(self.content1 + self.content2).name
935+
936+ def tearDown(self):
937+ os.remove(self.filename)
938+
939+ def test_grep(self):
940+ # Ensure plain text is correctly matched.
941+ self.assertEqual(self.content2, search_file('ent2', self.filename))
942+ self.assertEqual(self.content1, search_file('content', self.filename))
943+
944+ def test_no_match(self):
945+ # Ensure the function does not return false positives.
946+ self.assertIsNone(search_file('no_match', self.filename))
947+
948+ def test_regexp(self):
949+ # Ensure the function works with regular expressions.
950+ self.assertEqual(self.content2, search_file('\w2', self.filename))
951+
952+
953+class TestSerializer(unittest.TestCase):
954+
955+ def setUp(self):
956+ self.path = tempfile.mktemp()
957+ self.data = {'key': 'value'}
958+
959+ def tearDown(self):
960+ if os.path.exists(self.path):
961+ os.remove(self.path)
962+
963+ def test_serializer(self):
964+ # Ensure data is correctly serializied and deserialized.
965+ s = Serializer(self.path)
966+ s.set(self.data)
967+ self.assertEqual(self.data, s.get())
968+
969+ def test_existence(self):
970+ # Ensure the file is created only when needed.
971+ s = Serializer(self.path)
972+ self.assertFalse(s.exists())
973+ s.set(self.data)
974+ self.assertTrue(s.exists())
975+
976+ def test_default_value(self):
977+ # If the file does not exist, the serializer returns a default value.
978+ s = Serializer(self.path)
979+ self.assertEqual({}, s.get())
980+ s = Serializer(self.path, default=47)
981+ self.assertEqual(47, s.get())
982+
983+ def test_another_serializer(self):
984+ # It is possible to use a custom serializer (e.g. pickle).
985+ import pickle
986+ s = Serializer(
987+ self.path, serialize=pickle.dump, deserialize=pickle.load)
988+ s.set(self.data)
989+ self.assertEqual(self.data, s.get())
990+
991+
992+class TestSSH(unittest.TestCase):
993+
994+ def setUp(self):
995+ self.last_command = None
996+
997+ def remove_command_options(self, cmd):
998+ cmd = list(cmd)
999+ del cmd[1:7]
1000+ return cmd
1001+
1002+ def caller(self, cmd):
1003+ self.last_command = self.remove_command_options(cmd)
1004+
1005+ def check_last_command(self, expected):
1006+ self.assertSequenceEqual(expected, self.last_command)
1007+
1008+ def test_current_user(self):
1009+ # Ensure ssh command is correctly generated for current user.
1010+ sshcall = ssh('example.com', caller=self.caller)
1011+ sshcall('ls -l')
1012+ self.check_last_command(['ssh', 'example.com', '--', 'ls -l'])
1013+
1014+ def test_another_user(self):
1015+ # Ensure ssh command is correctly generated for a different user.
1016+ sshcall = ssh('example.com', 'myuser', caller=self.caller)
1017+ sshcall('ls -l')
1018+ self.check_last_command(['ssh', 'myuser@example.com', '--', 'ls -l'])
1019+
1020+ def test_ssh_key(self):
1021+ # The ssh key path can be optionally provided.
1022+ sshcall = ssh('example.com', key='/tmp/foo', caller=self.caller)
1023+ sshcall('ls -l')
1024+ self.check_last_command([
1025+ 'ssh', '-i', '/tmp/foo', 'example.com', '--', 'ls -l'])
1026+
1027+ def test_error(self):
1028+ # If the ssh command exits with an error code, a
1029+ # `subprocess.CalledProcessError` is raised.
1030+ sshcall = ssh('example.com', caller=lambda cmd: 1)
1031+ with self.assertRaises(CalledProcessError):
1032+ sshcall('ls -l')
1033+
1034+ def test_ignore_errors(self):
1035+ # If ignore_errors is set to True when executing the command, no error
1036+ # will be raised, even if the command itself returns an error code.
1037+ sshcall = ssh('example.com', caller=lambda cmd: 1)
1038+ sshcall('ls -l', ignore_errors=True)
1039+
1040+
1041 current_euid = os.geteuid()
1042 current_egid = os.getegid()
1043 current_home = os.environ['HOME']
1044@@ -180,13 +521,11 @@
1045 self.assertEqual(current_home, os.environ['HOME'])
1046
1047
1048-class TestCdContextManager(unittest.TestCase):
1049- def test_cd(self):
1050- curdir = os.getcwd()
1051- self.assertNotEqual('/var', curdir)
1052- with cd('/var'):
1053- self.assertEqual('/var', os.getcwd())
1054- self.assertEqual(curdir, os.getcwd())
1055+class TestUserExists(unittest.TestCase):
1056+
1057+ def test_user_exists(self):
1058+ self.assertTrue(user_exists('root'))
1059+ self.assertFalse(user_exists('_this_user_does_not_exist_'))
1060
1061
1062 if __name__ == '__main__':

Subscribers

People subscribed via source and target branches