Merge lp:~allenap/launchpad/ec2-parry into lp:launchpad

Proposed by Gavin Panella
Status: Rejected
Rejected by: Gavin Panella
Proposed branch: lp:~allenap/launchpad/ec2-parry
Merge into: lp:launchpad
Diff against target: 1371 lines (+1043/-96)
8 files modified
lib/devscripts/ec2test/account.py (+3/-0)
lib/devscripts/ec2test/builtins.py (+100/-0)
lib/devscripts/ec2test/instance.py (+11/-2)
lib/devscripts/ec2test/remote.py (+67/-54)
lib/devscripts/ec2test/remotenode.py (+539/-0)
lib/devscripts/ec2test/remotenodekiller.py (+146/-0)
lib/devscripts/ec2test/testrunner.py (+88/-40)
utilities/pb-shell (+89/-0)
To merge this branch: bzr merge lp:~allenap/launchpad/ec2-parry
Reviewer Review Type Date Requested Status
Michael Hudson-Doyle Abstain
Review via email: mp+14693@code.launchpad.net
Revision history for this message
Gavin Panella (allenap) wrote :

== The new Twisted node daemon, lib/devscripts/ec2test/remotenode.py ==

Diff: http://pastebin.ubuntu.com/315123/

This daemon is meant to make the following possible:

1. Start up N instances (on EC2, but the daemon makes no assumptions
   on that front).

2. One of them is told it is the supervisor. It immediately builds the
   LP tree and calculates the complete list of tests to run. It puts
   these tests into N similarly sized parcels for workers to pick up
   later. It also starts the subunit results aggregator service.

3. Every instance is told where the supervisor is. These instances
   become workers, and ask the supervisor for work.

4. Each worker runs the tests it has been given, and streams the
   results back to the supervisor's result aggregator.

5. When a worker has finished its run, with or without error, it tells
   the supervisor that it has attempted to run every test in its
   parcel.

6. The supervisor keeps track of which tests it actually has results
   for; there seem to be some problems where workers don't actually
   run all of the tests they're asked to.

7. The worker asks for more work. If there isn't any, it shuts
   down.

8. Once the supervisor has responses for all parcels, it sends a
   comprehensive report email, then shuts down.

9. Another process (remotenodekiller.py) takes care of actually
   shutting each instance down, when either the node daemon has exited
   or the logs have shown no activity for 10 minutes.
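
For orientation, here is a minimal sketch (not part of the branch) of
the worker side of steps 3 to 7, using the Perspective Broker method
names that remotenode.py exposes (get_work, done_work). run_parcel is
a stand-in for actually invoking bin/test with --subunit.

    from twisted.internet import defer, reactor
    from twisted.spread import pb

    TEST_NODE_PORT = 8789  # the PB port that nodes listen on


    def run_parcel(tests):
        """Stand-in for running one parcel of tests with --subunit."""
        print 'Would run %d tests.' % len(tests)
        return defer.succeed(None)


    @defer.inlineCallbacks
    def work_loop(supervisor):
        # Ask for the first parcel; after each run, done_work reports
        # the attempt and hands back the next parcel, or None when
        # nothing is left, at which point the worker can shut down.
        parcel = yield supervisor.callRemote('get_work')
        while parcel is not None:
            yield run_parcel(parcel)
            parcel = yield supervisor.callRemote('done_work', parcel)


    def main(supervisor_host):
        factory = pb.PBClientFactory()
        reactor.connectTCP(supervisor_host, TEST_NODE_PORT, factory)
        d = factory.getRootObject()
        d.addCallback(work_loop)
        d.addBoth(lambda ignored: reactor.stop())
        reactor.run()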

Currently there is no support for submitting to PQM after a test run,
and the email address is hard-wired to <email address hidden>. I
think I might fix the latter before I land this branch :)

Anyway, the daemon has three services: controller, subunit results
aggregator, and inspection. The first two are needed to run the
testing service; the last helps during development and debugging.
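
As an aside on debugging: besides the SSH inspection service, the
branch adds utilities/pb-shell, which wraps Perspective Broker calls
in a blocking API so that a node's controller can be poked at by hand.
From its prompt, a session might look like this (the hostname is made
up):

    >>> node = connect('ec2-203-0-113-5.compute-1.amazonaws.com')
    >>> node.become_supervisor(4)
    >>> parcel = node.get_work()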

=== Controller ===

The main class here is TestNode. It currently mixes together several
responsibilities that should be split out. It contains the logic for
being a supervisor (including starting the aggregator service and
sending report emails), being a worker, and shutting down. It is also,
in Perspective Broker parlance, a Referenceable object, which means
its remote_* methods can be called remotely.

Ideally I would like to separate out the concepts of the branch, the
supervisor and the worker, and then have a Referenceable object act as
the remote control for them.
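
To illustrate the Perspective Broker pattern being referred to, here
is a toy example (not TestNode): an object whose remote_* methods
clients can invoke with callRemote once they have fetched the
factory's root object.

    from twisted.internet import reactor
    from twisted.spread import pb


    class Echo(pb.Root):
        """Clients call root.callRemote('echo', msg) to reach this."""

        def remote_echo(self, message):
            return message


    if __name__ == '__main__':
        reactor.listenTCP(8789, pb.PBServerFactory(Echo()))
        reactor.run()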

=== Subunit Aggregator ===

This is simply a TCP service that receives line-based output from a
remotely running bin/test process that has been asked to produce
subunit output (via the --subunit flag). For each connection, it
creates a subunit.TestProtocolServer instance, to which it blindly
passes the input. Each TestProtocolServer is given a results object
that the supervisor has created, so that all test results from all
workers are recorded into a single test result object.

The test result object also calls back to the supervisor for every
test that is run, so that the supervisor can keep track of the tests
for which a result is available. It would be bad to submit to PQM
when, say, 1000 tests had never actually run.
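
Here is a condensed, hedged sketch of that arrangement, built from the
same pieces as remotenode.py (a Twisted line receiver feeding
subunit.TestProtocolServer, everything sharing one result object), but
using a plain unittest-style result rather than trial's
TimingTextReporter. The test id at the bottom is made up.

    import unittest

    import subunit
    from twisted.internet import reactor
    from twisted.internet.protocol import ServerFactory
    from twisted.protocols.basic import LineOnlyReceiver


    class AccountingResult(unittest.TestResult):
        """Reports each test id for which a result arrives."""

        def __init__(self, on_test_done):
            unittest.TestResult.__init__(self)
            self._on_test_done = on_test_done

        def stopTest(self, test):
            self._on_test_done(test.id())
            unittest.TestResult.stopTest(self, test)


    class AggregatingProtocol(LineOnlyReceiver):

        delimiter = '\n'

        def connectionMade(self):
            # One subunit parser per worker connection, all of them
            # feeding the single shared result.
            self._parser = subunit.TestProtocolServer(self.factory.result)

        def lineReceived(self, line):
            # TestProtocolServer expects the line ending to be present.
            self._parser.lineReceived(line + self.delimiter)


    class AggregatingFactory(ServerFactory):

        protocol = AggregatingProtocol

        def __init__(self, result):
            self.result = result


    if __name__ == '__main__':
        tests_to_run = set(['lp.example.test_something'])  # made up
        result = AccountingResult(tests_to_run.discard)
        reactor.listenTCP(8790, AggregatingFactory(result))
        reactor.run()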

=== Inspection se...


lp:~allenap/launchpad/ec2-parry updated
9824. By Gavin Panella

Merge devel.

9825. By Gavin Panella

time.time() is UTC already.

Revision history for this message
Michael Hudson-Doyle (mwhudson) wrote :

Hi Gavin,

This is about a quarter of a review -- sorry time crunch at the sprint
hasn't let me do anything more :(

Excited to see this progressing naturally, a little concerned about
the complexity...

> === modified file 'lib/devscripts/ec2test/account.py'
> --- lib/devscripts/ec2test/account.py 2009-10-08 14:50:19 +0000
> +++ lib/devscripts/ec2test/account.py 2009-11-12 08:29:26 +0000
> @@ -104,6 +104,9 @@
> security_group.authorize('tcp', 22, 22, '%s/32' % ip)
> security_group.authorize('tcp', 80, 80, '%s/32' % ip)
> security_group.authorize('tcp', 443, 443, '%s/32' % ip)
> + # Authorize Perspective Broker and Subunit. XXX: This is way
> + # too permissive.

Er, I'd say so yes.

Given that what I think you _actually_ want is ec2 instances to be
able to connect to each other on this protocol, not external machines,
you don't want to do the authorization like this, but rather use some
other EC2 feature I currently can't remember the name of :-) Read the
security group docs I guess.

> + security_group.authorize('tcp', 8789, 8790, '0.0.0.0/0')
> for network in demo_networks:
> # Add missing netmask info for single ips.
> if '/' not in network:

> === modified file 'lib/devscripts/ec2test/builtins.py'
> --- lib/devscripts/ec2test/builtins.py 2009-10-16 11:07:37 +0000
> +++ lib/devscripts/ec2test/builtins.py 2009-11-12 08:29:26 +0000
> @@ -306,6 +306,112 @@
> instance.set_up_and_run(postmortem, not headless, runner.run_tests)
>
>
> +class cmd_parallel_test(EC2Command):
> + """Run the test suite in ec2 in parallel."""
> +
> + takes_options = [
> + branch_option,
> + trunk_option,
> + machine_id_option,
> + instance_type_option,
> + postmortem_option,
> + include_download_cache_changes_option,
> + Option(
> + 'jobs', short_name='j', type=int, argname="NUM",
> + help=('The number of instances to start and distribute the '
> + 'test command across.')),
> + ]
> +
> + takes_args = ['test_branch?']
> +
> + def run(self, test_branch=None, branch=None, trunk=False, machine=None,
> + instance_type=DEFAULT_INSTANCE_TYPE, postmortem=False,
> + include_download_cache_changes=False, jobs=1):
> + if branch is None:
> + branch = []
> + branches, test_branch = _get_branches_and_test_branch(
> + trunk, branch, test_branch)
> +
> + if jobs < 1:
> + raise BzrCommandError(
> + 'The number of instances must be greater than zero.')
> +
> + from twisted.internet import reactor
> + from twisted.internet.defer import DeferredList, DeferredLock
> + from twisted.internet.threads import deferToThread
> +
> + from devscripts.ec2test.remotenode import connect_to_node

I don't think there is any reason these imports can't be at the module
level.

> + # Keep a record of workers as they start.
> + workers = []
> +
> + # This is fired once the supervisor has started.
> + supervisor_startup = DeferredLock()
> +
> + def show_error(f...

review: Abstain
lp:~allenap/launchpad/ec2-parry updated
9826. By Gavin Panella

Move imports to module level.

9827. By Gavin Panella

Use DeferredLock.run() where possible.

9828. By Gavin Panella

Return the deferred for starting the worker.

9829. By Gavin Panella

Just prepend LC_ALL=C to the command.

9830. By Gavin Panella

Only configure email when it's needed.

Revision history for this message
Gavin Panella (allenap) wrote :

On Thu, 12 Nov 2009 09:19:31 -0000
Michael Hudson <email address hidden> wrote:

> Review: Abstain
> Hi Gavin,
>
> This is about a quarter of a review -- sorry time crunch at the sprint
> hasn't let me do anything more :(
>
> Excited to see this progressing naturally, a little concerned about
> the complexity...

Cool :)

Complexity is bad, but I know it's there. This branch probably
represents a mid-point in my understanding of, and approach to, the
problem. In my head, some of this can already be removed or refactored
down to something simpler, but that's for a later branch. And maybe
someone else will do it.

>
> > === modified file 'lib/devscripts/ec2test/account.py'
> > --- lib/devscripts/ec2test/account.py 2009-10-08 14:50:19 +0000
> > +++ lib/devscripts/ec2test/account.py 2009-11-12 08:29:26 +0000
> > @@ -104,6 +104,9 @@
> > security_group.authorize('tcp', 22, 22, '%s/32' % ip)
> > security_group.authorize('tcp', 80, 80, '%s/32' % ip)
> > security_group.authorize('tcp', 443, 443, '%s/32' % ip)
> > + # Authorize Perspective Broker and Subunit. XXX: This is way
> > + # too permissive.
>
> Er, I'd say so yes.
>
> Given that what I think you _actually_ want is ec2 instances to be
> able to connect to each other on this protocol, not external machines,
> you don't want to do the authorization like this, but rather use some
> other EC2 feature I currently can't remember the name of :-) Read the
> security group docs I guess.

Yes, I agree totally. This was a hack to get things working. I think
it's possible to grant one security group access to another, but I've
not tried it. Also, machines started together as part of a reservation
or with the same security group might have different properties.
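
An untested sketch of that, assuming boto's SecurityGroup.authorize()
accepts a src_group argument, with security_group as in account.py:

    # Let instances in this security group talk to each other (which
    # covers the PB and subunit ports) instead of opening those ports
    # to 0.0.0.0/0. Coarser than a per-port rule, but not world-open.
    security_group.authorize(src_group=security_group)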

Currently the wrappers in devscripts/ec2test make it difficult to
start up multiple instances as part of a reservation, and security
groups are not exposed in the wrappers IIRC, but that's just a matter
of code.

Anyway, this is definitely something I mean to fix. You've probably
seen in the rest of the code that there is quite a lot that could be
improved, simplified, or removed. But I'm keen to get this landed so
that other masochists can dogfood it.

>
> > + security_group.authorize('tcp', 8789, 8790, '0.0.0.0/0')
> > for network in demo_networks:
> > # Add missing netmask info for single ips.
> > if '/' not in network:
>
> > === modified file 'lib/devscripts/ec2test/builtins.py'
> > --- lib/devscripts/ec2test/builtins.py 2009-10-16 11:07:37 +0000
> > +++ lib/devscripts/ec2test/builtins.py 2009-11-12 08:29:26 +0000
> > @@ -306,6 +306,112 @@
> > instance.set_up_and_run(postmortem, not headless, runner.run_tests)
> >
> >
> > +class cmd_parallel_test(EC2Command):
> > + """Run the test suite in ec2 in parallel."""
> > +
> > + takes_options = [
> > + branch_option,
> > + trunk_option,
> > + machine_id_option,
> > + instance_type_option,
> > + postmortem_option,
> > + include_download_cache_changes_option,
> > + Option(
> > + 'jobs', short_name='j', type=int, argname="NUM",
> > + help=(...

lp:~allenap/launchpad/ec2-parry updated
9831. By Gavin Panella

Don't fail if the .bazaar directory already exists.

9832. By Gavin Panella

Prepending LC_ALL=C to commands either didn't have a full effect, or broke things. Reverting.

9833. By Gavin Panella

Merge devel.

9834. By Gavin Panella

twistd on Hardy does not recognize --umask.

9835. By Gavin Panella

DeferredLock.run() does not pass any args.

9836. By Gavin Panella

Create test parcels in order to reduce layer churn (also because the test suite seems to break a *lot* when run out of order). Keep track of new tests, so that _add_tests() can be called more than once (not that it needs to be, but this behaviour will be useful later on for retrying unrunnable tests), and keep a record of all tests. Remove some other noise from the test list obtained by running bin/test --list.

9837. By Gavin Panella

Rename self._tests to self._tests_to_run.

9838. By Gavin Panella

Merge devel.

9839. By Gavin Panella

Merge devel.

9840. By Gavin Panella

Merge devel.

Revision history for this message
Gavin Panella (allenap) wrote :

The code has moved on so much from this merge proposal, and has moved into a separate branch, so I'm going to reject it.

Unmerged revisions

9841. By Gavin Panella

Merge devel, resolving several conflicts.

9840. By Gavin Panella

Merge devel.

9839. By Gavin Panella

Merge devel.

9838. By Gavin Panella

Merge devel.

9837. By Gavin Panella

Rename self._tests to self._tests_to_run.

9836. By Gavin Panella

Create test parcels in order to reduce layer churn (also because the test suite seems to break a *lot* when run out of order). Keep track of new tests, so that _add_tests() can be called more than once (not that it needs to be, but this behaviour will be useful later on for retrying unrunnable tests), and keep a record of all tests. Remove some other noise from the test list obtained by running bin/test --list.

9835. By Gavin Panella

DeferredLock.run() does not pass any args.

9834. By Gavin Panella

twistd on Hardy does not recognize --umask.

9833. By Gavin Panella

Merge devel.

9832. By Gavin Panella

Prepending LC_ALL=C to commands either didn't have a full effect, or broke things. Reverting.

Preview Diff

1=== modified file 'lib/devscripts/ec2test/account.py'
2--- lib/devscripts/ec2test/account.py 2009-10-08 14:50:19 +0000
3+++ lib/devscripts/ec2test/account.py 2009-12-23 15:23:24 +0000
4@@ -104,6 +104,9 @@
5 security_group.authorize('tcp', 22, 22, '%s/32' % ip)
6 security_group.authorize('tcp', 80, 80, '%s/32' % ip)
7 security_group.authorize('tcp', 443, 443, '%s/32' % ip)
8+ # Authorize Perspective Broker and Subunit. XXX: This is way
9+ # too permissive.
10+ security_group.authorize('tcp', 8789, 8790, '0.0.0.0/0')
11 for network in demo_networks:
12 # Add missing netmask info for single ips.
13 if '/' not in network:
14
15=== modified file 'lib/devscripts/ec2test/builtins.py'
16--- lib/devscripts/ec2test/builtins.py 2009-11-30 16:26:24 +0000
17+++ lib/devscripts/ec2test/builtins.py 2009-12-23 15:23:24 +0000
18@@ -18,11 +18,16 @@
19
20 import socket
21
22+from twisted.internet import reactor
23+from twisted.internet.defer import DeferredList, DeferredLock
24+from twisted.internet.threads import deferToThread
25+
26 from devscripts import get_launchpad_root
27
28 from devscripts.ec2test.credentials import EC2Credentials
29 from devscripts.ec2test.instance import (
30 AVAILABLE_INSTANCE_TYPES, DEFAULT_INSTANCE_TYPE, EC2Instance)
31+from devscripts.ec2test.remotenode import connect_to_node
32 from devscripts.ec2test.session import EC2SessionName
33 from devscripts.ec2test.testrunner import EC2TestRunner, TRUNK_BRANCH
34
35@@ -306,6 +311,101 @@
36 instance.set_up_and_run(postmortem, not headless, runner.run_tests)
37
38
39+class cmd_parallel_test(EC2Command):
40+ """Run the test suite in ec2 in parallel."""
41+
42+ takes_options = [
43+ branch_option,
44+ trunk_option,
45+ machine_id_option,
46+ instance_type_option,
47+ postmortem_option,
48+ include_download_cache_changes_option,
49+ Option(
50+ 'jobs', short_name='j', type=int, argname="NUM",
51+ help=('The number of instances to start and distribute the '
52+ 'test command across.')),
53+ ]
54+
55+ takes_args = ['test_branch?']
56+
57+ def run(self, test_branch=None, branch=None, trunk=False, machine=None,
58+ instance_type=DEFAULT_INSTANCE_TYPE, postmortem=False,
59+ include_download_cache_changes=False, jobs=1):
60+ if branch is None:
61+ branch = []
62+ branches, test_branch = _get_branches_and_test_branch(
63+ trunk, branch, test_branch)
64+
65+ if jobs < 1:
66+ raise BzrCommandError(
67+ 'The number of instances must be greater than zero.')
68+
69+ # Keep a record of workers as they start.
70+ workers = []
71+
72+ # This is fired once the supervisor has started.
73+ supervisor_startup = DeferredLock()
74+
75+ def show_error(failure):
76+ failure.printTraceback()
77+
78+ def start_instance_and_node(instance, runner):
79+ # Bring up the instance and start the test node service.
80+ try:
81+ instance.start()
82+ runner.run_node()
83+ except:
84+ instance.shutdown()
85+ raise
86+ else:
87+ return instance
88+
89+ def get_root(instance):
90+ return connect_to_node(instance.hostname)
91+
92+ def start_tests(root, instance):
93+ workers.append((root, instance))
94+ if len(workers) == 1:
95+ # Start the supervisor.
96+ supervisor_startup.run(
97+ root.callRemote, 'become_supervisor', jobs)
98+ # Start the worker.
99+ supervisor_root, supervisor_instance = workers[0]
100+ # Wait for the supervisor to start.
101+ d = supervisor_startup.run(lambda: None)
102+ # Call the node *outside* of the lock.
103+ return d.addCallback(lambda _: root.callRemote(
104+ 'got_supervisor', supervisor_instance.hostname))
105+
106+ def create_and_start_instance():
107+ session_name = EC2SessionName.make(EC2TestRunner.name)
108+ instance = EC2Instance.make(
109+ session_name, instance_type, machine)
110+ runner = EC2TestRunner(
111+ test_branch, branches=branches,
112+ include_download_cache_changes=include_download_cache_changes,
113+ instance=instance, launchpad_login=instance._launchpad_login)
114+ # Do the startup in a thread because boto is
115+ # blocking. XXX: Use txAWS here instead?
116+ startup = deferToThread(start_instance_and_node, instance, runner)
117+ startup.addCallback(get_root)
118+ startup.addCallback(start_tests, instance)
119+ return instance, startup
120+
121+ startups = []
122+ for job in range(jobs):
123+ instance, startup = create_and_start_instance()
124+ startup.addErrback(show_error)
125+ startups.append(startup)
126+
127+ # Stop when all the instances have been started.
128+ started = DeferredList(startups)
129+ started.addBoth(lambda _: reactor.stop())
130+
131+ reactor.run()
132+
133+
134 class cmd_land(EC2Command):
135 """Land a merge proposal on Launchpad."""
136
137
138=== modified file 'lib/devscripts/ec2test/instance.py'
139--- lib/devscripts/ec2test/instance.py 2009-11-27 07:24:49 +0000
140+++ lib/devscripts/ec2test/instance.py 2009-12-23 15:23:24 +0000
141@@ -404,8 +404,6 @@
142 def set_up_and_run(self, postmortem, shutdown, func, *args, **kw):
143 """Start, run `func` and then maybe shut down.
144
145- :param config: A dictionary specifying details of how the instance
146- should be run:
147 :param postmortem: If true, any exceptions will be caught and an
148 interactive session run to allow debugging the problem.
149 :param shutdown: If true, shut down the instance after `func` and
150@@ -574,6 +572,11 @@
151 self._ssh = ssh
152 self._sftp = None
153
154+ def _command_with_locale(self, cmd):
155+ # Default the locale to C to stop locale warnings (especially
156+ # from bzr) from clogging up the output.
157+ return 'export LC_ALL=C; ' + cmd
158+
159 @property
160 def sftp(self):
161 if self._sftp is None:
162@@ -588,6 +591,7 @@
163 :param out: A stream to write the output of the remote command to.
164 :param err: A stream to write the error of the remote command to.
165 """
166+ cmd = self._command_with_locale(cmd)
167 if out is None:
168 out = sys.stdout
169 if err is None:
170@@ -623,12 +627,17 @@
171 raise RuntimeError('Command failed: %s' % (cmd,))
172 return res
173
174+ def run_as_daemon(self, cmd):
175+ """Start `cmd` as a daemonized process on the server."""
176+ return self.perform('(%s </dev/null >/dev/null 2>/dev/null &)' % cmd)
177+
178 def run_with_ssh_agent(self, cmd, ignore_failure=False):
179 """Run 'cmd' in a subprocess.
180
181 Use this to run commands that require local SSH credentials. For
182 example, getting private branches from Launchpad.
183 """
184+ cmd = self._command_with_locale(cmd)
185 self._instance.log(
186 '%s@%s$ %s\n'
187 % (self._username, self._instance._boto_instance.id, cmd))
188
189=== renamed file 'lib/devscripts/ec2test/ec2test-remote.py' => 'lib/devscripts/ec2test/remote.py'
190--- lib/devscripts/ec2test/ec2test-remote.py 2009-10-09 15:04:24 +0000
191+++ lib/devscripts/ec2test/remote.py 2009-12-23 15:23:24 +0000
192@@ -30,7 +30,8 @@
193 class BaseTestRunner:
194
195 def __init__(self, email=None, pqm_message=None, public_branch=None,
196- public_branch_revno=None, test_options=None):
197+ public_branch_revno=None, test_options=None,
198+ prefix=os.path.curdir):
199 self.email = email
200 self.pqm_message = pqm_message
201 self.public_branch = public_branch
202@@ -42,7 +43,8 @@
203 self.test_options = test_options
204
205 # Configure paths.
206- self.lp_dir = os.path.join(os.path.sep, 'var', 'launchpad')
207+ self.prefix_dir = os.path.abspath(prefix)
208+ self.lp_dir = os.path.join(self.prefix_dir, 'launchpad')
209 self.tmp_dir = os.path.join(self.lp_dir, 'tmp')
210 self.test_dir = os.path.join(self.lp_dir, 'test')
211 self.sourcecode_dir = os.path.join(self.test_dir, 'sourcecode')
212@@ -52,8 +54,8 @@
213 self.test_dir,
214 self.public_branch,
215 self.public_branch_revno,
216- self.sourcecode_dir
217- )
218+ self.sourcecode_dir,
219+ os.path.join(self.prefix_dir, 'www'))
220
221 # Daemonization options.
222 self.pid_filename = os.path.join(self.lp_dir, 'ec2test-remote.pid')
223@@ -88,7 +90,41 @@
224 """
225 raise NotImplementedError
226
227- def test(self):
228+ def run_tests(self):
229+ """Run the tests the good old fashioned way.
230+
231+ :return: A boolean indicating success.
232+ """
233+ call = self.build_test_command()
234+
235+ popen = subprocess.Popen(
236+ call, bufsize=-1,
237+ stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
238+ cwd=self.test_dir)
239+
240+ self._gather_test_output(
241+ popen, self.logger.summary_file, self.logger.out_file)
242+
243+ # Grab the testrunner exit status
244+ result = popen.wait()
245+
246+ if self.pqm_message is not None:
247+ subject = self.pqm_message.get('Subject')
248+ if result:
249+ # failure
250+ self.logger.summary_file.write(
251+ '\n\n**NOT** submitted to PQM:\n%s\n' % (subject,))
252+ else:
253+ # success
254+ conn = bzrlib.smtp_connection.SMTPConnection(
255+ bzrlib.config.GlobalConfig())
256+ conn.send_email(self.pqm_message)
257+ self.logger.summary_file.write(
258+ '\n\nSUBMITTED TO PQM:\n%s\n' % (subject,))
259+
260+ return (result == 0)
261+
262+ def run(self):
263 """Run the tests, log the results.
264
265 Signals the ec2test process and cleans up the logs once all the tests
266@@ -99,43 +135,17 @@
267 # os.fork() may have tried to close them.
268 self.logger.prepare()
269
270- out_file = self.logger.out_file
271+ out_file = self.logger.out_file
272 summary_file = self.logger.summary_file
273- config = bzrlib.config.GlobalConfig()
274-
275- call = self.build_test_command()
276
277 try:
278+ success = False
279 try:
280 try:
281- popen = subprocess.Popen(
282- call, bufsize=-1,
283- stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
284- cwd=self.test_dir)
285-
286- self._gather_test_output(popen, summary_file, out_file)
287-
288- # Grab the testrunner exit status
289- result = popen.wait()
290-
291- if self.pqm_message is not None:
292- subject = self.pqm_message.get('Subject')
293- if result:
294- # failure
295- summary_file.write(
296- '\n\n**NOT** submitted to PQM:\n%s\n' %
297- (subject,))
298- else:
299- # success
300- conn = bzrlib.smtp_connection.SMTPConnection(
301- config)
302- conn.send_email(self.pqm_message)
303- summary_file.write('\n\nSUBMITTED TO PQM:\n%s\n' %
304- (subject,))
305+ success = self.run_tests()
306 except:
307 summary_file.write('\n\nERROR IN TESTRUNNER\n\n')
308 traceback.print_exc(file=summary_file)
309- result = 1
310 raise
311 finally:
312 # It probably isn't safe to close the log files ourselves,
313@@ -143,10 +153,11 @@
314 summary_file.close()
315 if self.email is not None:
316 subject = 'Test results: %s' % (
317- result and 'FAILURE' or 'SUCCESS')
318+ success and 'SUCCESS' or 'FAILURE')
319 summary_file = open(self.logger.summary_filename, 'r')
320 bzrlib.email_message.EmailMessage.send(
321- config, self.email[0], self.email,
322+ bzrlib.config.GlobalConfig(),
323+ self.email[0], self.email,
324 subject, summary_file.read())
325 summary_file.close()
326 finally:
327@@ -187,8 +198,7 @@
328
329 def build_test_command(self):
330 """See BaseTestRunner.build_test_command()."""
331- command = ['make', 'check', 'VERBOSITY=' + self.test_options]
332- return command
333+ return ['make', 'check', 'VERBOSITY=' + self.test_options]
334
335 # Used to filter lines in the summary log. See
336 # `BaseTestRunner.ignore_line()`.
337@@ -211,23 +221,22 @@
338 # display, and return the exit code. (See the xvfb-run man page for
339 # details.)
340 return [
341- 'xvfb-run',
342- '-s', '-screen 0 1024x768x24',
343- 'make', 'jscheck']
344+ 'xvfb-run', '-s', '-screen 0 1024x768x24',
345+ 'make', 'jscheck', 'VERBOSITY=' + self.test_options]
346
347
348 class WebTestLogger:
349 """Logs test output to disk and a simple web page."""
350
351 def __init__(self, test_dir, public_branch, public_branch_revno,
352- sourcecode_dir):
353+ sourcecode_dir, www_dir):
354 """ Class initialiser """
355 self.test_dir = test_dir
356 self.public_branch = public_branch
357 self.public_branch_revno = public_branch_revno
358 self.sourcecode_dir = sourcecode_dir
359+ self.www_dir = www_dir
360
361- self.www_dir = os.path.join(os.path.sep, 'var', 'www')
362 self.out_filename = os.path.join(self.www_dir, 'current_test.log')
363 self.summary_filename = os.path.join(self.www_dir, 'summary.log')
364 self.index_filename = os.path.join(self.www_dir, 'index.html')
365@@ -265,9 +274,9 @@
366 """
367 self.open_logs()
368
369- out_file = self.out_file
370+ out_file = self.out_file
371 summary_file = self.summary_file
372- index_file = self.index_file
373+ index_file = self.index_file
374
375 def write(msg):
376 msg += '\n'
377@@ -432,11 +441,16 @@
378 parser.add_option(
379 '--jscheck', dest='jscheck', default=False, action='store_true',
380 help=('Run the JavaScript integration test suite.'))
381+ parser.add_option(
382+ '--prefix', dest='prefix', default=os.path.join(os.path.sep, 'var'),
383+ help=('The directory in which to start the runner, '
384+ '%default by default.'))
385
386 options, args = parser.parse_args()
387
388 if options.debug:
389- import pdb; pdb.set_trace()
390+ import pdb
391+ pdb.set_trace()
392 if options.pqm_message is not None:
393 pqm_message = pickle.loads(
394 options.pqm_message.decode('string-escape').decode('base64'))
395@@ -450,20 +464,19 @@
396 runner_type = TestOnMergeRunner
397
398 runner = runner_type(
399- options.email,
400- pqm_message,
401- options.public_branch,
402- options.public_branch_revno,
403- ' '.join(args)
404- )
405+ email=options.email,
406+ pqm_message=pqm_message,
407+ public_branch=options.public_branch,
408+ public_branch_revno=options.public_branch_revno,
409+ test_options=' '.join(args),
410+ prefix=options.prefix)
411
412 try:
413 try:
414 if options.daemon:
415 print 'Starting testrunner daemon...'
416 runner.daemonize()
417-
418- runner.test()
419+ runner.run()
420 except:
421 # Handle exceptions thrown by the test() or daemonize() methods.
422 if options.email:
423
424=== added file 'lib/devscripts/ec2test/remotenode.py'
425--- lib/devscripts/ec2test/remotenode.py 1970-01-01 00:00:00 +0000
426+++ lib/devscripts/ec2test/remotenode.py 2009-12-23 15:23:24 +0000
427@@ -0,0 +1,539 @@
428+# Copyright 2009 Canonical Ltd. This software is licensed under the
429+# GNU Affero General Public License version 3 (see the file LICENSE).
430+
431+__metatype__ = type
432+
433+import os
434+import tempfile
435+import time
436+
437+from itertools import count, takewhile
438+
439+import bzrlib.config
440+import bzrlib.email_message
441+import bzrlib.smtp_connection
442+import subunit
443+
444+from twisted.application import service, internet
445+from twisted.cred import portal, checkers
446+from twisted.conch import manhole, manhole_ssh
447+from twisted.internet import defer
448+from twisted.internet import error
449+from twisted.internet import reactor
450+from twisted.internet.protocol import (
451+ ClientCreator, ProcessProtocol, Protocol, ServerFactory, connectionDone)
452+from twisted.internet.threads import deferToThread
453+from twisted.internet.utils import getProcessOutputAndValue
454+from twisted.protocols.basic import LineOnlyReceiver
455+from twisted.spread import pb
456+from twisted.trial.reporter import TimingTextReporter
457+
458+
459+# The port that nodes in the cluster listen on for PB commands.
460+TEST_NODE_PORT = 8789
461+
462+# The port that the supervisor node listens on to receive test result data
463+# from the workers.
464+# XXX: Make this a property of the factory?
465+TEST_STREAMING_PORT = 8790
466+
467+# SSH port for inspecting the running node.
468+NODE_INSPECTION_PORT = 8791
469+
470+# For throwing stuff away.
471+DEVNULL = open(os.devnull, 'rwb+')
472+
473+
474+def calculate_parcel_size(total, num_parcels):
475+ """Calculate the number of elements that should go into a parcel.
476+
477+ calculate_parcel_size(total, num_parcels) * num_parcels >= total
478+ """
479+ parcel_size = total // num_parcels
480+ while parcel_size * num_parcels < total:
481+ parcel_size += 1
482+ return parcel_size
483+
484+
485+def save_list(iterable, delimiter='\n'):
486+ """Save an iterable of strings to a file, newline-separated."""
487+ fd, filename = tempfile.mkstemp()
488+ f = os.fdopen(fd, 'wb')
489+ for line in iterable:
490+ f.write(line + delimiter)
491+ f.close()
492+ return filename
493+
494+
495+class CombiningSubunitFactory(ServerFactory):
496+ """A factory for listening to subunit streams and collating results."""
497+
498+ def __init__(self, result, logger):
499+ self._result = result
500+ self._logger = logger
501+
502+ def buildProtocol(self, addr):
503+ return SubunitProtocol(self._result, self._logger)
504+
505+
506+class SubunitProtocolLogger:
507+ """Organize logging for the SubunitProtocol."""
508+
509+ def __init__(self, log_dir):
510+ self._log_dir = log_dir
511+ self._log_nums = count(1)
512+ self._logs = []
513+
514+ def open_log(self):
515+ log_index = '%02d' % self._log_nums.next()
516+ log_filename = os.path.join(
517+ self._log_dir, 'subunit%s.log' % log_index)
518+ log = open(log_filename, 'wb', buffering=1)
519+ self._logs.append(log_filename)
520+ return log
521+
522+ def close_log(self, log):
523+ log.close()
524+
525+ @property
526+ def logs(self):
527+ return iter(self._logs)
528+
529+
530+class SubunitProtocol(LineOnlyReceiver):
531+ """An implementation of subunit in Twisted, yay!"""
532+
533+ # subunit separates lines with \n.
534+ delimiter = '\n'
535+
536+ def __init__(self, result, logger):
537+ self._result = result
538+ self._logger = logger
539+
540+ def connectionMade(self):
541+ self._subunit_log = self._logger.open_log()
542+ self._subunit = subunit.TestProtocolServer(self._result, DEVNULL)
543+
544+ def lineReceived(self, line):
545+ # subunit.TestProtocolServer expects the line-ending to be
546+ # present.
547+ line = line + self.delimiter
548+ self._subunit_log.write(line)
549+ self._subunit.lineReceived(line)
550+
551+ def connectionLost(self, reason=connectionDone):
552+ self._subunit.lostConnection()
553+ del self._subunit
554+ self._logger.close_log(self._subunit_log)
555+ del self._subunit_log
556+
557+
558+class TestProcessProtocol(ProcessProtocol):
559+
560+ # XXX: TestProcessProtocol isn't a great name for this. It's really an
561+ # adapter from a line receiver to a process protocol, plus a deferred that
562+ # fires on process end. Doesn't have anything to do with tests really.
563+
564+ # XXX: Timeouts!
565+
566+ def __init__(self, deferred, remote_transport):
567+ self._fire_when_done = deferred
568+ self._remote_transport = remote_transport
569+
570+ def connectionMade(self):
571+ print "Test process started."
572+ self._checkpoint = time.time()
573+
574+ def outReceived(self, data):
575+ self._remote_transport.write(data)
576+ # Say something periodically to prevent watchdogs from killing us.
577+ if time.time() - self._checkpoint > 60:
578+ print "Test process running."
579+ self._checkpoint = time.time()
580+
581+ def processEnded(self, reason):
582+ if self._fire_when_done is not None:
583+ d, self._fire_when_done = self._fire_when_done, None
584+ if reason.check(error.ProcessDone):
585+ d.callback(None)
586+ else:
587+ d.callback(reason)
588+ self._remote_transport.loseConnection()
589+
590+
591+class AccountingTestResult(TimingTextReporter):
592+
593+ def __init__(self, callback, stream):
594+ self._callback = callback
595+ super(AccountingTestResult, self).__init__(stream)
596+
597+ def stopTest(self, test):
598+ self._callback(test)
599+ super(AccountingTestResult, self).stopTest(test)
600+
601+
602+def connect_to_node(address, port=TEST_NODE_PORT, reactor=reactor):
603+ """Get the PB node at 'address'."""
604+ factory = pb.PBClientFactory()
605+ reactor.connectTCP(address, port, factory)
606+ return factory.getRootObject()
607+
608+
609+def connect_for_writing(reactor, host, port):
610+ d = ClientCreator(reactor, Protocol).connectTCP(host, port)
611+ return d.addCallback(lambda protocol: protocol.transport)
612+
613+
614+def run_process(executable, args=(), path=None, reactor=None):
615+ """Run a process and raise an error if it fails."""
616+ print 'Running', executable, args
617+
618+ d = getProcessOutputAndValue(
619+ executable, args=args, env=os.environ,
620+ path=path, reactor=reactor)
621+
622+ def check_result((out, err, code)):
623+ print 'Done'
624+ if code != 0:
625+ # XXX: Is this the best exception to raise?
626+ raise RuntimeError(
627+ "%s failed with code %d" % (executable, code), out, err)
628+ return out
629+
630+ return d.addCallback(check_result)
631+
632+
633+class TreeBuilder:
634+ """Builds the tree once, guarded by a lock."""
635+
636+ # XXX: This could easily be made more general, i.e. ExecuteOnce. In fact,
637+ # there probably is already something similar in Twisted.
638+
639+ def __init__(self, build_dir):
640+ self._lock = defer.DeferredLock()
641+ self._build_dir = build_dir
642+ self._build_result = None
643+
644+ def _maybe_build(self):
645+ if self._build_result is None:
646+ build = run_process('/usr/bin/make', path=self._build_dir)
647+ def built(result):
648+ self._build_result = result
649+ return build.addBoth(built)
650+ else:
651+ return self._build_result
652+
653+ def build(self):
654+ return self._lock.run(self._maybe_build)
655+
656+
657+class CloseableDeferredQueue(defer.DeferredQueue):
658+ """A `DeferredQueue` that can be closed.
659+
660+ Once it has been closed by `queue.close()`, and once there are no
661+ pending items in the queue, `queue.get()` will always return a
662+ Deferred that has already been fired with None.
663+ """
664+
665+ def __init__(self, size=None, backlog=None):
666+ super(CloseableDeferredQueue, self).__init__(size, backlog)
667+ self.closed = False
668+
669+ def close(self):
670+ """Close this queue, and fire None to any waiting processes."""
671+ self.closed = True
672+ while self.waiting:
673+ self.waiting.pop().callback(None)
674+
675+ def get(self):
676+ d = super(CloseableDeferredQueue, self).get()
677+ if self.closed and not d.called:
678+ d.callback(None)
679+ return d
680+
681+
682+class TestNode(pb.Root):
683+ """A node of a cluster that runs the Launchpad test suite."""
684+
685+ def __init__(self, service, branch_dir, log_dir):
686+ self._service = service
687+ self._branch_dir = branch_dir
688+ self._log_dir = log_dir
689+ self._is_supervisor = False
690+ self._tree_builder = TreeBuilder(self._branch_dir)
691+
692+ def _run_process(self, executable, args=(), reactor=None):
693+ """Run a process and raise an error if it fails.
694+
695+ Always runs the process in the `branch_dir`.
696+ """
697+ return run_process(executable, args, self._branch_dir, reactor)
698+
699+ def _find_tests(self):
700+ """Return a list of tests that make up the Launchpad test suite.
701+
702+ :return: A Deferred that fires with a list of test ids.
703+ """
704+ d = self._tree_builder.build()
705+ test_process = os.path.join(self._branch_dir, 'bin', 'test')
706+ d.addCallback(
707+ lambda ignored: self._run_process(test_process, ['--list']))
708+ # ./bin/test --list dumps out all of the test ids with newline
709+ # separation, but it adds a Total: at the end. There can also
710+ # be import fascist warnings after that. Drop the total and
711+ # everything after it so we don't confuse our callers.
712+ p_testline = lambda line: not line.startswith('Total:')
713+ return d.addCallback(
714+ lambda output: list(
715+ takewhile(p_testline, output.splitlines())))
716+
717+ def _add_tests(self, tests, num_parcels):
718+ """Keep a record of the tests to be run.
719+
720+ The set of tests will be used to keep track of what tests have
721+ not yet had results. Also, using a set de-duplicates the list.
722+ """
723+ tests = set(tests)
724+ tests_new = tests - self._tests_all
725+ # Sort the tests to reduce layer setup/teardown churn. The
726+ # test suite also breaks a lot when run out of order.
727+ tests_new_sorted = sorted(tests_new)
728+ # Update state.
729+ self._tests_all.update(tests_new)
730+ self._tests_to_run.update(tests_new)
731+ # Split up new tests into parcels.
732+ parcel_size = calculate_parcel_size(len(tests), num_parcels)
733+ for index in xrange(0, len(tests_new_sorted), parcel_size):
734+ self._test_parcels.put(
735+ tests_new_sorted[index:index+parcel_size])
736+ # Tell everyone.
737+ print ("%s new tests, split into %d parcels, each of at "
738+ "most %d tests" % (len(tests_new), num_parcels, parcel_size))
739+
740+ def _calculate_work(self, num_parcels):
741+ print 'Calculating work'
742+ d = self._find_tests()
743+ d.addCallback(self._add_tests, num_parcels)
744+ return d
745+
746+ def remote_become_supervisor(self, num_parcels):
747+ """Tell this node that it is a supervisor.
748+
749+ Note that supervisor nodes can still be workers, i.e. they can still
750+ run tests.
751+
752+ :param num_parcels: The number of parcels in which to split the work
753+ up into. For optimum sharing - assuming each parcel takes a
754+ similar amount of time to process - this should be the same as the
755+ number of workers, or a multiple thereof.
756+ """
757+ if self._is_supervisor:
758+ return
759+ self._is_supervisor = True
760+ print 'Becoming supervisor'
761+ # All tests.
762+ self._tests_all = set()
763+ # Tests that have not yet been run or attempted.
764+ self._tests_to_run = set()
765+ # Tests that have been attempted but for which there is no result.
766+ self._tests_not_run = set()
767+ # Parcels of tests to run on one go, for efficiency.
768+ self._test_parcels = CloseableDeferredQueue()
769+ # The result object.
770+ self._test_result_log = os.path.join(self._log_dir, 'result.log')
771+ self._test_result = AccountingTestResult(
772+ callback=lambda test: self._tests_to_run.discard(test.id()),
773+ stream=open(self._test_result_log, 'wb', buffering=1))
774+ # Listen for subunit, pushing the results into the test
775+ # result, and logging everything as we go along.
776+ self._test_logger = SubunitProtocolLogger(self._log_dir)
777+ result_service = internet.TCPServer(
778+ TEST_STREAMING_PORT, CombiningSubunitFactory(
779+ self._test_result, self._test_logger))
780+ result_service.setServiceParent(self._service)
781+ # At this point, we have been told how many parcels of work there
782+ # should be, but we do not necessarily have any workers. Calculate
783+ # each parcel of work. Work will be scheduled to workers as they ask
784+ # for it.
785+ self._calculate_work(num_parcels)
786+
787+ def remote_get_work(self):
788+ """Called by workers when they are ready for work."""
789+ assert self._is_supervisor
790+ return self._test_parcels.get()
791+
792+ def remote_done_work(self, parcel):
793+ """Called by workers to declare that they have successfully
794+ completed a parcel of work.
795+
796+ This is used to check that we have at least attempted to run
797+ every test, and also to trigger shutdown of the node.
798+
799+ Returns another parcel of work if there is any.
800+ """
801+ assert self._is_supervisor
802+ self._tests_not_run.update(
803+ self._tests_to_run.intersection(parcel))
804+ self._tests_to_run.difference_update(self._tests_not_run)
805+ if len(self._tests_to_run) == 0:
806+ self._done_as_supervisor()
807+ print ("Work done; parcel had %d tests; "
808+ "%d tests remaining; %d cannot be run." % (
809+ len(parcel), len(self._tests_to_run),
810+ len(self._tests_not_run)))
811+ return self._test_parcels.get()
812+
813+ def _do_work(self, tests):
814+ # Connect to the reporting server.
815+ connected = connect_for_writing(
816+ reactor, self._supervisor_addr, TEST_STREAMING_PORT)
817+ def run_tests(transport):
818+ tests_finished = defer.Deferred()
819+ protocol = TestProcessProtocol(tests_finished, transport)
820+ command = (
821+ '/usr/bin/make', '-o', 'clean', '-o', 'build', 'check',
822+ 'VERBOSITY=--subunit --load-list %s' % save_list(tests))
823+ print "%s$ %s" % (self._branch_dir, command)
824+ reactor.spawnProcess(
825+ protocol, command[0], command,
826+ env=os.environ, path=self._branch_dir)
827+ return tests_finished
828+ connected.addCallback(run_tests)
829+ connected.addErrback(lambda failure: failure.printTraceback())
830+ connected.addBoth(
831+ lambda _: self._supervisor.callRemote('done_work', tests))
832+ connected.addCallback(self._maybe_do_work)
833+
834+ def _maybe_do_work(self, tests):
835+ if tests is None:
836+ return self._done_as_worker()
837+ else:
838+ return self._do_work(tests)
839+
840+ def remote_got_supervisor(self, supervisor_addr):
841+ """Called when the supervisor node is up and running."""
842+ print 'Got supervisor', supervisor_addr
843+ d = connect_to_node(supervisor_addr)
844+ def got_supervisor(supervisor):
845+ self._supervisor_addr = supervisor_addr
846+ self._supervisor = supervisor
847+ d.addCallback(got_supervisor)
848+ d.addCallback(lambda _: self._tree_builder.build())
849+ d.addCallback(lambda _: self._supervisor.callRemote('get_work'))
850+ d.addCallback(self._maybe_do_work)
851+
852+ def _send_summary_email(self):
853+ if self._test_result.wasSuccessful():
854+ if len(self._tests_to_run) > 0 or len(self._tests_not_run) > 0:
855+ subject = 'Test results: INCONCLUSIVE'
856+ else:
857+ subject = 'Test results: SUCCESS'
858+ else:
859+ subject = 'Tests results: FAILURE'
860+
861+ body = []
862+
863+ # Warn about tests that have not been run at all.
864+ if len(self._tests_to_run) > 0:
865+ body.append(
866+ "Warning: %d tests were not attempted. See untested.txt "
867+ "for the full list." % len(self._tests_to_run))
868+
869+ # Warn about tests that could not be run.
870+ if len(self._tests_not_run) > 0:
871+ body.append(
872+ "Warning: %d tests could not be run. See unrunnable.txt "
873+ "for the full list." % len(self._tests_not_run))
874+
875+ # Pad a bit.
876+ body.extend(['', '--', ''])
877+
878+ # XXX: Change the email address :)
879+ message = bzrlib.email_message.EmailMessage(
880+ "gavin.panella@canonical.com",
881+ ["gavin.panella@canonical.com"],
882+ subject, "\n".join(body))
883+
884+ # Attach the results log.
885+ message.add_inline_attachment(
886+ open(self._test_result_log, 'rb').read(),
887+ os.path.basename(self._test_result_log))
888+
889+ # Attach a list of tests that were not run.
890+ if len(self._tests_to_run) > 0:
891+ message.add_inline_attachment(
892+ "\n".join(sorted(self._tests_to_run)), "untested.txt")
893+
894+ # Attach a list of tests that could not be run.
895+ if len(self._tests_not_run) > 0:
896+ message.add_inline_attachment(
897+ "\n".join(sorted(self._tests_not_run)), "unrunnable.txt")
898+
899+ # Attach the subunit logs.
900+ for log_filename in self._test_logger.logs:
901+ message.add_inline_attachment(
902+ open(log_filename, 'rb').read(),
903+ os.path.basename(log_filename))
904+
905+ def send():
906+ config = bzrlib.config.GlobalConfig()
907+ connection = bzrlib.smtp_connection.SMTPConnection(config)
908+ connection.send_email(message)
909+ return deferToThread(send)
910+
911+ def _done_as_supervisor(self):
912+ d = self._send_summary_email()
913+ d.addErrback(lambda failure: failure.printTraceback())
914+ d.addBoth(lambda _: self._shutdown())
915+
916+ def _done_as_worker(self):
917+ if not self._is_supervisor:
918+ self._shutdown()
919+
920+ def _shutdown(self):
921+ # XXX: There's probably a better way of doing this, but this
922+ # seems to work reliably for now.
923+ return reactor.callLater(2, reactor.stop)
924+
925+
926+def make_root(service, node_dir):
927+ node_dir = os.path.abspath(node_dir)
928+ branch_dir = os.path.join(node_dir, 'launchpad', 'test')
929+ log_dir = os.path.join(node_dir, 'www')
930+ # XXX: Might be better for the node service to actually implement
931+ # IService.
932+ return TestNode(service, branch_dir, log_dir)
933+
934+
935+def make_node_service(node):
936+ return internet.TCPServer(TEST_NODE_PORT, pb.PBServerFactory(node))
937+
938+
939+def make_inspection_service(**namespace):
940+ def getManholeFactory(ns):
941+ realm = manhole_ssh.TerminalRealm()
942+ def getManhole(_):
943+ return manhole.ColoredManhole(ns)
944+ realm.chainedProtocolFactory.protocolFactory = getManhole
945+ p = portal.Portal(realm)
946+ p.registerChecker(
947+ checkers.InMemoryUsernamePasswordDatabaseDontUse(admin="admin"))
948+ return manhole_ssh.ConchFactory(p)
949+ return internet.TCPServer(
950+ NODE_INSPECTION_PORT, getManholeFactory(namespace))
951+
952+
953+# The Launchpad Test node application.
954+application = service.Application("Launchpad Test Node")
955+
956+# The root, controller. It's a bit of a muddle right now.
957+root = make_root(application, os.environ.get('NODE_DIR', '/var'))
958+
959+# The PB node service.
960+node_service = make_node_service(root)
961+node_service.setServiceParent(application)
962+
963+# The inspection service.
964+inspection_service = make_inspection_service(
965+ application=application, root=root)
966+inspection_service.setServiceParent(application)
967
968=== added file 'lib/devscripts/ec2test/remotenodekiller.py'
969--- lib/devscripts/ec2test/remotenodekiller.py 1970-01-01 00:00:00 +0000
970+++ lib/devscripts/ec2test/remotenodekiller.py 2009-12-23 15:23:24 +0000
971@@ -0,0 +1,146 @@
972+#!/usr/bin/python
973+
974+import optparse
975+import sys
976+import traceback
977+
978+from glob import glob
979+from os import kill
980+from os.path import exists, getmtime
981+from subprocess import call
982+from time import sleep, time
983+
984+
985+def read_pid_from_file(filename):
986+ """Read the pid from a file.
987+
988+ IO errors are allowed to propagate, but if the first line of the
989+ file cannot converted to an int then None is returned, thus
990+ permitting odd looking pid files.
991+ """
992+ fd = open(filename, 'rb')
993+ try:
994+ pid = fd.readline().strip()
995+ finally:
996+ fd.close()
997+ try:
998+ return int(pid)
999+ except ValueError:
1000+ return None
1001+
1002+
1003+def watch(wait, pids, pid_files, log_file_patterns, expiry_age):
1004+ """Watch for changes in the given resources.
1005+
1006+ This checks:
1007+
1008+ * that all the specified processes are running,
1009+
1010+ * that all the specified pid files exist,
1011+
1012+ * that the process for which the pid file exists is running, if
1013+ the first line of the pid file is recognised as a pid (i.e. an
1014+ integer),
1015+
1016+ * that at least one log file exists for each given pattern,
1017+
1018+ * that at least one log file for each given pattern has been
1019+ written to recently.
1020+
1021+ """
1022+ try:
1023+ while True:
1024+ # Don't spin round.
1025+ sleep(wait)
1026+ # Check that all pids are receiving signals.
1027+ for pid in pids:
1028+ kill(pid, 0)
1029+ # If any pid file has gone, return.
1030+ for filename in pid_files:
1031+ if not exists(filename):
1032+ return
1033+ # If a pid can be obtained from the file, check it's
1034+ # running.
1035+ pid = read_pid_from_file(filename)
1036+ if pid is not None:
1037+ kill(pid, 0)
1038+ # Check log files.
1039+ expired = time() - expiry_age
1040+ for log_file_pattern in log_file_patterns:
1041+ log_files = glob(log_file_pattern)
1042+ # If no log files exist for this pattern, return.
1043+ if len(log_files) == 0:
1044+ return
1045+ # If none of the log files have been written to
1046+ # recently, return.
1047+ log_file_max_mtime = max(
1048+ getmtime(log_file) for log_file in log_files)
1049+ if log_file_max_mtime < expired:
1050+ return
1051+ except KeyboardInterrupt:
1052+ raise
1053+ except:
1054+ traceback.print_exc()
1055+
1056+
1057+def main(args):
1058+ parser = optparse.OptionParser(
1059+ usage="%prog [options] command", description=(
1060+ "Run a arbitrary command when it appears that a process has "
1061+ "exited or stopped running."))
1062+ parser.add_option(
1063+ '--pid', action='append', dest='pids', type=int, help=(
1064+ "A pid to watch. If the process stops running, "
1065+ "the command is run."))
1066+ parser.add_option(
1067+ '--pid-file', action='append', dest='pid_files', help=(
1068+ "A pid file to watch. If any pid file disappears, "
1069+ "the command is run."))
1070+ parser.add_option(
1071+ '--log-file-pattern', action='append', dest='log_file_patterns',
1072+ help=(
1073+ "A log (or other) file to watch. If any log file disappears, "
1074+ "the command is run. If any log file gets old (see "
1075+ "--expiry-age), the command is run. Values specified here are "
1076+ "subject to glob expansion."))
1077+ parser.add_option(
1078+ '--expiry-age', action='store', dest='expiry_age', help=(
1079+ "The number of seconds since a log file was modified before "
1080+ "it is considered to be expired. Defaults to %default seconds."))
1081+ parser.add_option(
1082+ '--wait', action='store', type=int, dest='wait', help=(
1083+ "How long to wait between each check of the pids and logs. "
1084+ "Defaults to %default seconds."))
1085+ parser.set_defaults(expiry_age=600, wait=10)
1086+
1087+ options, command = parser.parse_args(args)
1088+
1089+ if options.pids is None:
1090+ options.pids = []
1091+ if options.pid_files is None:
1092+ options.pid_files = []
1093+ if options.log_file_patterns is None:
1094+ options.log_file_patterns = []
1095+
1096+ if (len(options.pids) + len(options.pid_files) +
1097+ len(options.log_file_patterns)) == 0:
1098+ parser.error("Nothing to watch.")
1099+ if len(command) == 0:
1100+ parser.error("No command to run.")
1101+ if options.expiry_age < 1:
1102+ parser.error("The expiry age must be at least 1 second.")
1103+ if options.wait < 1:
1104+ parser.error("The wait must be at least 1 second.")
1105+
1106+ watch(
1107+ options.wait, options.pids, options.pid_files,
1108+ options.log_file_patterns, options.expiry_age)
1109+
1110+ return call(command)
1111+
1112+
1113+if __name__ == '__main__':
1114+ try:
1115+ sys.exit(main(sys.argv[1:]))
1116+ except KeyboardInterrupt:
1117+ sys.exit(1)
1118
1119=== modified file 'lib/devscripts/ec2test/testrunner.py'
1120--- lib/devscripts/ec2test/testrunner.py 2009-12-01 22:53:47 +0000
1121+++ lib/devscripts/ec2test/testrunner.py 2009-12-23 15:23:24 +0000
1122@@ -285,23 +285,10 @@
1123 self.email = email
1124
1125 # Email configuration.
1126- if email is not None or pqm_message is not None:
1127- self._smtp_server = config.get_user_option('smtp_server')
1128- if self._smtp_server is None or self._smtp_server == 'localhost':
1129- raise ValueError(
1130- 'To send email, a remotely accessible smtp_server (and '
1131- 'smtp_username and smtp_password, if necessary) must be '
1132- 'configured in bzr. See the SMTP server information '
1133- 'here: https://wiki.canonical.com/EmailSetup .')
1134- self._smtp_username = config.get_user_option('smtp_username')
1135- self._smtp_password = config.get_user_option('smtp_password')
1136- from_email = config.username()
1137- if not from_email:
1138- # XXX: JonathanLange 2009-10-04: Is this strictly true? I
1139- # can't actually see where this is used.
1140- raise ValueError(
1141- 'To send email, your bzr email address must be set '
1142- '(use ``bzr whoami``).')
1143+ self._smtp_server = config.get_user_option('smtp_server')
1144+ self._smtp_username = config.get_user_option('smtp_username')
1145+ self._smtp_password = config.get_user_option('smtp_password')
1146+ self._from_email = config.username()
1147
1148 self._instance = instance
1149
1150@@ -314,29 +301,40 @@
1151
1152 def configure_system(self):
1153 user_connection = self._instance.connect()
1154- as_user = user_connection.perform
1155- # Set up bazaar.conf with smtp information if necessary
1156- if self.email or self.message:
1157- as_user('mkdir .bazaar')
1158- bazaar_conf_file = user_connection.sftp.open(
1159- ".bazaar/bazaar.conf", 'w')
1160- bazaar_conf_file.write(
1161- 'smtp_server = %s\n' % (self._smtp_server,))
1162- if self._smtp_username:
1163- bazaar_conf_file.write(
1164- 'smtp_username = %s\n' % (self._smtp_username,))
1165- if self._smtp_password:
1166- bazaar_conf_file.write(
1167- 'smtp_password = %s\n' % (self._smtp_password,))
1168- bazaar_conf_file.close()
1169- # Copy remote ec2-remote over
1170- self.log('Copying ec2test-remote.py to remote machine.\n')
1171- user_connection.sftp.put(
1172- os.path.join(os.path.dirname(os.path.realpath(__file__)),
1173- 'ec2test-remote.py'),
1174- '/var/launchpad/ec2test-remote.py')
1175- # Set up launchpad login and email
1176- as_user('bzr launchpad-login %s' % (self._launchpad_login,))
1177+ # Set up launchpad login.
1178+ user_connection.perform(
1179+ 'bzr launchpad-login %s' % (self._launchpad_login,))
1180+ # Done.
1181+ user_connection.close()
1182+
1183+ def configure_email(self):
1184+ # Sanity checks.
1185+ if self._smtp_server is None or self._smtp_server == 'localhost':
1186+ raise ValueError(
1187+ 'To send email, a remotely accessible smtp_server (and '
1188+ 'smtp_username and smtp_password, if necessary) must be '
1189+ 'configured in bzr. See the SMTP server information '
1190+ 'here: https://wiki.canonical.com/EmailSetup .')
1191+ if not self._from_email:
1192+ # XXX: JonathanLange 2009-10-04: Is this strictly true? I
1193+ # can't actually see where this is used.
1194+ raise ValueError(
1195+ 'To send email, your bzr email address must be set '
1196+ '(use ``bzr whoami``).')
1197+ # Set up bazaar.conf with smtp information.
1198+ user_connection = self._instance.connect()
1199+ user_connection.perform('mkdir -p .bazaar')
1200+ bazaar_conf_file = user_connection.sftp.open(
1201+ ".bazaar/bazaar.conf", 'w')
1202+ bazaar_conf_file.write(
1203+ 'smtp_server = %s\n' % (self._smtp_server,))
1204+ if self._smtp_username:
1205+ bazaar_conf_file.write(
1206+ 'smtp_username = %s\n' % (self._smtp_username,))
1207+ if self._smtp_password:
1208+ bazaar_conf_file.write(
1209+ 'smtp_password = %s\n' % (self._smtp_password,))
1210+ bazaar_conf_file.close()
1211 user_connection.close()
1212
1213 def prepare_tests(self):
1214@@ -434,9 +432,19 @@
1215
1216 def run_tests(self):
1217 self.configure_system()
1218+ if self.email is not None or self.message is not None:
1219+ self.configure_email()
1220 self.prepare_tests()
1221 user_connection = self._instance.connect()
1222
1223+ # Copy remote ec2-remote over.
1224+ self.log(
1225+ 'Copying remote.py to ec2test-remote.py on remote machine.\n')
1226+ user_connection.sftp.put(
1227+ os.path.join(
1228+ os.path.dirname(os.path.realpath(__file__)), 'remote.py'),
1229+ '/var/launchpad/ec2test-remote.py')
1230+
1231 # Make sure we activate the failsafe --shutdown feature. This will
1232 # make the server shut itself down after the test run completes, or
1233 # if the test harness suffers a critical failure.
1234@@ -523,3 +531,43 @@
1235 # ec2test-remote.py wants the extra options to be after a double-
1236 # dash.
1237 return ('--', self.test_options)
1238+
1239+ def run_node(self):
1240+ """Start the test node."""
1241+ self.configure_system()
1242+ self.configure_email()
1243+ self.prepare_tests()
1244+ user_connection = self._instance.connect()
1245+ # Install Twisted.
1246+ user_connection.perform(
1247+ 'sudo aptitude -y install'
1248+ ' python-twisted-core python-twisted-conch')
1249+ # Get Subunit. This is ugly, but I can't find a better way of
1250+ # doing it right now. Revision 90 is known to work.
1251+ user_connection.run_with_ssh_agent(
1252+ 'bzr branch -r90 lp:subunit ~/subunit')
1253+ # Copy remotenode.py and remotenodekiller.py over.
1254+ self.log('Copying remotenode.py to remote machine.\n')
1255+ from_dir = os.path.dirname(os.path.realpath(__file__))
1256+ user_connection.sftp.put(
1257+ os.path.join(from_dir, 'remotenode.py'),
1258+ '/var/launchpad/remotenode.py')
1259+ user_connection.sftp.put(
1260+ os.path.join(from_dir, 'remotenodekiller.py'),
1261+ '/var/launchpad/remotenodekiller.py')
1262+ # Start the remote node.
1263+ user_connection.perform(
1264+ 'PYTHONPATH=~/subunit/python twistd'
1265+ ' --python /var/launchpad/remotenode.py'
1266+ ' --pidfile /var/launchpad/remotenode.pid'
1267+ ' --logfile /var/www/remotenode.log')
1268+ # Daemonize the killer.
1269+ user_connection.run_as_daemon(
1270+ "python /var/launchpad/remotenodekiller.py"
1271+ " --pid-file /var/launchpad/remotenode.pid"
1272+ " --log-file-pattern '/var/www/*.log*'"
1273+ " -- sudo shutdown -h now")
1274+ # Remove the index.html from the www directory so we can see
1275+ # the logs being written therein.
1276+ user_connection.perform('rm /var/www/index.html')
1277+ user_connection.close()
1278
1279=== added file 'utilities/pb-shell'
1280--- utilities/pb-shell 1970-01-01 00:00:00 +0000
1281+++ utilities/pb-shell 2009-12-23 15:23:24 +0000
1282@@ -0,0 +1,89 @@
1283+#!/usr/bin/python
1284+"""A interactive PB (Perspective Broker) shell."""
1285+
1286+__metatype__ = type
1287+
1288+import code
1289+
1290+from functools import partial
1291+
1292+from twisted.internet import reactor, threads
1293+from twisted.spread import pb
1294+
1295+
1296+def block(func, *args, **kwargs):
1297+ """Run a function in the reactor and wait for the result.
1298+
1299+ If the result is a `pb.RemoteReference`, wrap it with a
1300+ `BlockingRemoteReference`.
1301+ """
1302+ result = threads.blockingCallFromThread(
1303+ reactor, func, *args, **kwargs)
1304+ if isinstance(result, pb.RemoteReference):
1305+ result = BlockingRemoteReference(result)
1306+ return result
1307+
1308+
1309+def blocking(wrapped):
1310+ """A decorator to turn an async PB call into a blocking one."""
1311+ return partial(block, wrapped)
1312+
1313+
1314+class BlockingRemoteReference:
1315+ """A blocking version of a `pb.RemoteReference`.
1316+
1317+ Accessing any attribute (excluding those starting with an
1318+ underscore) returns a blocking wrapper around a `callRemote` for
1319+ the given name.
1320+ """
1321+
1322+ def __init__(self, reference):
1323+ self.reference = reference
1324+
1325+ def __getattr__(self, name):
1326+ if name.startswith('_'):
1327+ raise AttributeError(name)
1328+ return blocking(
1329+ partial(self.reference.callRemote, name))
1330+
1331+
1332+def disaster(failure):
1333+ failure.printTraceback()
1334+ return failure
1335+
1336+
1337+def connect(host, port=8789):
1338+ """Connect to a PB server, returning the root object."""
1339+ # This function blocks, and is meant to be called from *outside*
1340+ # of the event loop.
1341+ def _connect():
1342+ """Connect to a PB server, returning the root object."""
1343+ factory = pb.PBClientFactory()
1344+ reactor.connectTCP(host, port, factory)
1345+ d = factory.getRootObject()
1346+ d.addErrback(disaster)
1347+ return d
1348+ return block(_connect)
1349+
1350+
1351+def interact(ns):
1352+ try:
1353+ from IPython.ipapi import launch_new_instance
1354+ except ImportError:
1355+ return code.interact(banner='', local=ns)
1356+ else:
1357+ return launch_new_instance(ns)
1358+
1359+
1360+def console():
1361+ """Start the interactive shell."""
1362+ commands = {'connect': connect}
1363+ try:
1364+ interact(commands)
1365+ finally:
1366+ reactor.callFromThread(reactor.stop)
1367+
1368+
1369+if __name__ == '__main__':
1370+ reactor.callInThread(console)
1371+ reactor.run()