Merge lp:~allenap/launchpad/ec2-parry into lp:launchpad
Status: Rejected
Rejected by: Gavin Panella
Proposed branch: lp:~allenap/launchpad/ec2-parry
Merge into: lp:launchpad
Diff against target: 1371 lines (+1043/-96), 8 files modified
- lib/devscripts/ec2test/account.py (+3/-0)
- lib/devscripts/ec2test/builtins.py (+100/-0)
- lib/devscripts/ec2test/instance.py (+11/-2)
- lib/devscripts/ec2test/remote.py (+67/-54)
- lib/devscripts/ec2test/remotenode.py (+539/-0)
- lib/devscripts/ec2test/remotenodekiller.py (+146/-0)
- lib/devscripts/ec2test/testrunner.py (+88/-40)
- utilities/pb-shell (+89/-0)
To merge this branch: bzr merge lp:~allenap/launchpad/ec2-parry
Related bugs: none
Reviewer: Michael Hudson-Doyle (Abstain)
Review via email: mp+14693@code.launchpad.net
Gavin Panella (allenap) wrote:
- 9824. By Gavin Panella: Merge devel.
- 9825. By Gavin Panella: time.time() is UTC already.
Michael Hudson-Doyle (mwhudson) wrote:
Hi Gavin,
This is about a quarter of a review -- sorry time crunch at the sprint
hasn't let me do anything more :(
Excited to see this progressing naturally, a little concerned about
the complexity...
> === modified file 'lib/devscripts
> --- lib/devscripts/
> +++ lib/devscripts/
> @@ -104,6 +104,9 @@
> security_
> security_
> security_
> + # Authorize Perspective Broker and Subunit. XXX: This is way
> + # too permissive.
Er, I'd say so yes.
Given that what I think you _actually_ want is ec2 instances to be
able to connect to each other on this protocol, not external machines,
you don't want to do the authorization like this, but rather use some
other EC2 feature I currently can't remember the name of :-) Read the
security group docs I guess.
> + security_
> for network in demo_networks:
> # Add missing netmask info for single ips.
> if '/' not in network:
> === modified file 'lib/devscripts
> --- lib/devscripts/
> +++ lib/devscripts/
> @@ -306,6 +306,112 @@
> instance.
>
>
> +class cmd_parallel_
> + """Run the test suite in ec2 in parallel."""
> +
> + takes_options = [
> + branch_option,
> + trunk_option,
> + machine_id_option,
> + instance_
> + postmortem_option,
> + include_
> + Option(
> + 'jobs', short_name='j', type=int, argname="NUM",
> + help=('The number of instances to start and distribute the '
> + 'test command across.')),
> + ]
> +
> + takes_args = ['test_branch?']
> +
> + def run(self, test_branch=None, branch=None, trunk=False, machine=None,
> + instance_
> + include_
> + if branch is None:
> + branch = []
> + branches, test_branch = _get_branches_
> + trunk, branch, test_branch)
> +
> + if jobs < 1:
> + raise BzrCommandError(
> + 'The number of instances must be greater than zero.')
> +
> + from twisted.internet import reactor
> + from twisted.
> + from twisted.
> +
> + from devscripts.
I don't think there is any reason these imports can't be at the module
level.
> + # Keep a record of workers as they start.
> + workers = []
> +
> + # This is fired once the supervisor has started.
> + supervisor_startup = DeferredLock()
> +
> + def show_error(f...
- 9826. By Gavin Panella: Move imports to module level.
- 9827. By Gavin Panella: Use DeferredLock.run() where possible.
- 9828. By Gavin Panella: Return the deferred for starting the worker.
- 9829. By Gavin Panella: Just prepend LC_ALL=C to the command.
- 9830. By Gavin Panella: Only configure email when it's needed.
Gavin Panella (allenap) wrote:
On Thu, 12 Nov 2009 09:19:31 -0000
Michael Hudson <email address hidden> wrote:
> Review: Abstain
> Hi Gavin,
>
> This is about a quarter of a review -- sorry time crunch at the sprint
> hasn't let me do anything more :(
>
> Excited to see this progressing naturally, a little concerned about
> the complexity...
Cool :)
Complexity bad, but I know it's there. This branch probably represents
a mid-point in my understanding and approach to the problem. In my
head, some of this can already be removed or refactored down to
something simpler, but that's for a later branch. And maybe someone
else will do it.
>
> > === modified file 'lib/devscripts
> > --- lib/devscripts/
> > +++ lib/devscripts/
> > @@ -104,6 +104,9 @@
> > security_
> > security_
> > security_
> > + # Authorize Perspective Broker and Subunit. XXX: This is way
> > + # too permissive.
>
> Er, I'd say so yes.
>
> Given that what I think you _actually_ want is ec2 instances to be
> able to connect to each other on this protocol, not external machines,
> you don't want to do the authorization like this, but rather use some
> other EC2 feature I currently can't remember the name of :-) Read the
> security group docs I guess.
Yes, I agree totally. This was a hack to get things working. I think
it's possible to grant one security group access to another, but I've
not tried it. Also, machines started together as part of a reservation
or with the same security group might have different properties.
Currently the wrappers in devscripts/ec2test make it difficult to
start up multiple instances as part of a reservation, and security
groups are not exposed in the wrappers iirc, but that's just a matter
of code.
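A rough sketch of that group-to-group idea, expressed as the arguments one might pass to boto's SecurityGroup.authorize() (the group name 'ec2test-cluster' is invented for illustration, and this has not been tried against EC2):

```python
# Hypothetical sketch: restrict the PB/subunit ports (8789-8790) to
# instances in the same EC2 security group, instead of 0.0.0.0/0.

def intra_group_rule(group, from_port=8789, to_port=8790):
    """Build the kwargs for boto's SecurityGroup.authorize().

    Passing src_group instead of cidr_ip authorizes traffic only from
    members of `group`, not from arbitrary external hosts.  In boto,
    src_group is a SecurityGroup object; a name stands in for it here.
    """
    return {
        'ip_protocol': 'tcp',
        'from_port': from_port,
        'to_port': to_port,
        'src_group': group,  # replaces the too-permissive cidr_ip
    }

rule = intra_group_rule('ec2test-cluster')
# With a live connection: security_group.authorize(**rule)
```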
Anyway, this is definitely something I mean to fix. You've probably
seen in the rest of the code that there is quite a lot that could be
improved, simplified, or removed. But I'm keen to get this landed so
that other masochists can dogfood it.
>
> > + security_
> > for network in demo_networks:
> > # Add missing netmask info for single ips.
> > if '/' not in network:
>
> > === modified file 'lib/devscripts
> > --- lib/devscripts/
> > +++ lib/devscripts/
> > @@ -306,6 +306,112 @@
> > instance.
> >
> >
> > +class cmd_parallel_
> > + """Run the test suite in ec2 in parallel."""
> > +
> > + takes_options = [
> > + branch_option,
> > + trunk_option,
> > + machine_id_option,
> > + instance_
> > + postmortem_option,
> > + include_
> > + Option(
> > + 'jobs', short_name='j', type=int, argname="NUM",
> > + help=(...
- 9831. By Gavin Panella: Don't fail if the .bazaar directory already exists.
- 9832. By Gavin Panella: Prepending LC_ALL=C to commands either didn't have a full effect, or broke things. Reverting.
- 9833. By Gavin Panella: Merge devel.
- 9834. By Gavin Panella: twistd on Hardy does not recognize --umask.
- 9835. By Gavin Panella: DeferredLock.run() does not pass any args.
- 9836. By Gavin Panella: Create test parcels in order to reduce layer churn (also because the test suite seems to break a *lot* when run out of order). Keep track of new tests, so that _add_tests() can be called more than once (not that it needs to be, but this behaviour will be useful later on for retrying unrunnable tests), and keep a record of all tests. Remove some other noise from the test list obtained by running bin/test --list.
- 9837. By Gavin Panella: Rename self._tests to self._tests_to_run.
- 9838. By Gavin Panella: Merge devel.
- 9839. By Gavin Panella: Merge devel.
- 9840. By Gavin Panella: Merge devel.
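The parcel arithmetic behind revision 9836 reduces to a ceiling division; this reproduces the calculate_parcel_size helper from remotenode.py with a worked example:

```python
def calculate_parcel_size(total, num_parcels):
    """Smallest parcel size such that size * num_parcels >= total."""
    # Equivalent to ceiling division: -(-total // num_parcels).
    parcel_size = total // num_parcels
    while parcel_size * num_parcels < total:
        parcel_size += 1
    return parcel_size

# e.g. 250 tests split across 4 instances -> parcels of 63 tests each
# (63 * 4 = 252 >= 250).
print(calculate_parcel_size(250, 4))
```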
Gavin Panella (allenap) wrote:
The code has moved on so much from this merge proposal, and moved into a separate branch, so I'm going to reject it.
Unmerged revisions
- 9841. By Gavin Panella: Merge devel, resolving several conflicts.
- 9840. By Gavin Panella: Merge devel.
- 9839. By Gavin Panella: Merge devel.
- 9838. By Gavin Panella: Merge devel.
- 9837. By Gavin Panella: Rename self._tests to self._tests_to_run.
- 9836. By Gavin Panella: Create test parcels in order to reduce layer churn (also because the test suite seems to break a *lot* when run out of order). Keep track of new tests, so that _add_tests() can be called more than once (not that it needs to be, but this behaviour will be useful later on for retrying unrunnable tests), and keep a record of all tests. Remove some other noise from the test list obtained by running bin/test --list.
- 9835. By Gavin Panella: DeferredLock.run() does not pass any args.
- 9834. By Gavin Panella: twistd on Hardy does not recognize --umask.
- 9833. By Gavin Panella: Merge devel.
- 9832. By Gavin Panella: Prepending LC_ALL=C to commands either didn't have a full effect, or broke things. Reverting.
Preview Diff
=== modified file 'lib/devscripts/ec2test/account.py'
--- lib/devscripts/ec2test/account.py 2009-10-08 14:50:19 +0000
+++ lib/devscripts/ec2test/account.py 2009-12-23 15:23:24 +0000
@@ -104,6 +104,9 @@
security_group.authorize('tcp', 22, 22, '%s/32' % ip)
security_group.authorize('tcp', 80, 80, '%s/32' % ip)
security_group.authorize('tcp', 443, 443, '%s/32' % ip)
+ # Authorize Perspective Broker and Subunit. XXX: This is way
+ # too permissive.
+ security_group.authorize('tcp', 8789, 8790, '0.0.0.0/0')
for network in demo_networks:
# Add missing netmask info for single ips.
if '/' not in network:

=== modified file 'lib/devscripts/ec2test/builtins.py'
--- lib/devscripts/ec2test/builtins.py 2009-11-30 16:26:24 +0000
+++ lib/devscripts/ec2test/builtins.py 2009-12-23 15:23:24 +0000
@@ -18,11 +18,16 @@

import socket

+from twisted.internet import reactor
+from twisted.internet.defer import DeferredList, DeferredLock
+from twisted.internet.threads import deferToThread
+
from devscripts import get_launchpad_root

from devscripts.ec2test.credentials import EC2Credentials
from devscripts.ec2test.instance import (
AVAILABLE_INSTANCE_TYPES, DEFAULT_INSTANCE_TYPE, EC2Instance)
+from devscripts.ec2test.remotenode import connect_to_node
from devscripts.ec2test.session import EC2SessionName
from devscripts.ec2test.testrunner import EC2TestRunner, TRUNK_BRANCH

@@ -306,6 +311,101 @@
instance.set_up_and_run(postmortem, not headless, runner.run_tests)


+class cmd_parallel_test(EC2Command):
+ """Run the test suite in ec2 in parallel."""
+
+ takes_options = [
+ branch_option,
+ trunk_option,
+ machine_id_option,
+ instance_type_option,
+ postmortem_option,
+ include_download_cache_changes_option,
+ Option(
+ 'jobs', short_name='j', type=int, argname="NUM",
+ help=('The number of instances to start and distribute the '
+ 'test command across.')),
+ ]
+
+ takes_args = ['test_branch?']
+
+ def run(self, test_branch=None, branch=None, trunk=False, machine=None,
+ instance_type=DEFAULT_INSTANCE_TYPE, postmortem=False,
+ include_download_cache_changes=False, jobs=1):
+ if branch is None:
+ branch = []
+ branches, test_branch = _get_branches_and_test_branch(
+ trunk, branch, test_branch)
+
+ if jobs < 1:
+ raise BzrCommandError(
+ 'The number of instances must be greater than zero.')
+
+ # Keep a record of workers as they start.
+ workers = []
+
+ # This is fired once the supervisor has started.
+ supervisor_startup = DeferredLock()
+
+ def show_error(failure):
+ failure.printTraceback()
+
+ def start_instance_and_node(instance, runner):
+ # Bring up the instance and start the test node service.
+ try:
+ instance.start()
+ runner.run_node()
+ except:
+ instance.shutdown()
+ raise
+ else:
+ return instance
+
+ def get_root(instance):
+ return connect_to_node(instance.hostname)
+
+ def start_tests(root, instance):
+ workers.append((root, instance))
+ if len(workers) == 1:
+ # Start the supervisor.
+ supervisor_startup.run(
+ root.callRemote, 'become_supervisor', jobs)
+ # Start the worker.
+ supervisor_root, supervisor_instance = workers[0]
+ # Wait for the supervisor to start.
+ d = supervisor_startup.run(lambda: None)
+ # Call the node *outside* of the lock.
+ return d.addCallback(lambda _: root.callRemote(
+ 'got_supervisor', supervisor_instance.hostname))
+
+ def create_and_start_instance():
+ session_name = EC2SessionName.make(EC2TestRunner.name)
+ instance = EC2Instance.make(
+ session_name, instance_type, machine)
+ runner = EC2TestRunner(
+ test_branch, branches=branches,
+ include_download_cache_changes=include_download_cache_changes,
+ instance=instance, launchpad_login=instance._launchpad_login)
+ # Do the startup in a thread because boto is
+ # blocking. XXX: Use txAWS here instead?
+ startup = deferToThread(start_instance_and_node, instance, runner)
+ startup.addCallback(get_root)
+ startup.addCallback(start_tests, instance)
+ return instance, startup
+
+ startups = []
+ for job in range(jobs):
+ instance, startup = create_and_start_instance()
+ startup.addErrback(show_error)
+ startups.append(startup)
+
+ # Stop when all the instances have been started.
+ started = DeferredList(startups)
+ started.addBoth(lambda _: reactor.stop())
+
+ reactor.run()
+
+
class cmd_land(EC2Command):
"""Land a merge proposal on Launchpad."""


=== modified file 'lib/devscripts/ec2test/instance.py'
--- lib/devscripts/ec2test/instance.py 2009-11-27 07:24:49 +0000
+++ lib/devscripts/ec2test/instance.py 2009-12-23 15:23:24 +0000
@@ -404,8 +404,6 @@
def set_up_and_run(self, postmortem, shutdown, func, *args, **kw):
"""Start, run `func` and then maybe shut down.

- :param config: A dictionary specifying details of how the instance
- should be run:
:param postmortem: If true, any exceptions will be caught and an
interactive session run to allow debugging the problem.
:param shutdown: If true, shut down the instance after `func` and
@@ -574,6 +572,11 @@
self._ssh = ssh
self._sftp = None

+ def _command_with_locale(self, cmd):
+ # Default the locale to C to stop locale warnings (especially
+ # from bzr) from clogging up the output.
+ return 'export LC_ALL=C; ' + cmd
+
@property
def sftp(self):
if self._sftp is None:
@@ -588,6 +591,7 @@
:param out: A stream to write the output of the remote command to.
:param err: A stream to write the error of the remote command to.
"""
+ cmd = self._command_with_locale(cmd)
if out is None:
out = sys.stdout
if err is None:
@@ -623,12 +627,17 @@
raise RuntimeError('Command failed: %s' % (cmd,))
return res

+ def run_as_daemon(self, cmd):
+ """Start `cmd` as a daemonized process on the server."""
+ return self.perform('(%s </dev/null >/dev/null 2>/dev/null &)' % cmd)
+
def run_with_ssh_agent(self, cmd, ignore_failure=False):
"""Run 'cmd' in a subprocess.

Use this to run commands that require local SSH credentials. For
example, getting private branches from Launchpad.
"""
+ cmd = self._command_with_locale(cmd)
self._instance.log(
'%s@%s$ %s\n'
% (self._username, self._instance._boto_instance.id, cmd))

=== renamed file 'lib/devscripts/ec2test/ec2test-remote.py' => 'lib/devscripts/ec2test/remote.py'
--- lib/devscripts/ec2test/ec2test-remote.py 2009-10-09 15:04:24 +0000
+++ lib/devscripts/ec2test/remote.py 2009-12-23 15:23:24 +0000
@@ -30,7 +30,8 @@
class BaseTestRunner:

def __init__(self, email=None, pqm_message=None, public_branch=None,
- public_branch_revno=None, test_options=None):
+ public_branch_revno=None, test_options=None,
+ prefix=os.path.curdir):
self.email = email
self.pqm_message = pqm_message
self.public_branch = public_branch
@@ -42,7 +43,8 @@
self.test_options = test_options

# Configure paths.
- self.lp_dir = os.path.join(os.path.sep, 'var', 'launchpad')
+ self.prefix_dir = os.path.abspath(prefix)
+ self.lp_dir = os.path.join(self.prefix_dir, 'launchpad')
self.tmp_dir = os.path.join(self.lp_dir, 'tmp')
self.test_dir = os.path.join(self.lp_dir, 'test')
self.sourcecode_dir = os.path.join(self.test_dir, 'sourcecode')
@@ -52,8 +54,8 @@
self.test_dir,
self.public_branch,
self.public_branch_revno,
- self.sourcecode_dir
- )
+ self.sourcecode_dir,
+ os.path.join(self.prefix_dir, 'www'))

# Daemonization options.
self.pid_filename = os.path.join(self.lp_dir, 'ec2test-remote.pid')
@@ -88,7 +90,41 @@
"""
raise NotImplementedError

- def test(self):
+ def run_tests(self):
+ """Run the tests the good old fashioned way.
+
+ :return: A boolean indicating success.
+ """
+ call = self.build_test_command()
+
+ popen = subprocess.Popen(
+ call, bufsize=-1,
+ stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
+ cwd=self.test_dir)
+
+ self._gather_test_output(
+ popen, self.logger.summary_file, self.logger.out_file)
+
+ # Grab the testrunner exit status
+ result = popen.wait()
+
+ if self.pqm_message is not None:
+ subject = self.pqm_message.get('Subject')
+ if result:
+ # failure
+ self.logger.summary_file.write(
+ '\n\n**NOT** submitted to PQM:\n%s\n' % (subject,))
+ else:
+ # success
+ conn = bzrlib.smtp_connection.SMTPConnection(
+ bzrlib.config.GlobalConfig())
+ conn.send_email(self.pqm_message)
+ self.logger.summary_file.write(
+ '\n\nSUBMITTED TO PQM:\n%s\n' % (subject,))
+
+ return (result == 0)
+
+ def run(self):
"""Run the tests, log the results.

Signals the ec2test process and cleans up the logs once all the tests
@@ -99,43 +135,17 @@
# os.fork() may have tried to close them.
self.logger.prepare()

- out_file = self.logger.out_file
+ out_file = self.logger.out_file
summary_file = self.logger.summary_file
- config = bzrlib.config.GlobalConfig()
-
- call = self.build_test_command()

try:
+ success = False
try:
try:
- popen = subprocess.Popen(
- call, bufsize=-1,
- stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
- cwd=self.test_dir)
-
- self._gather_test_output(popen, summary_file, out_file)
-
- # Grab the testrunner exit status
- result = popen.wait()
-
- if self.pqm_message is not None:
- subject = self.pqm_message.get('Subject')
- if result:
- # failure
- summary_file.write(
- '\n\n**NOT** submitted to PQM:\n%s\n' %
- (subject,))
- else:
- # success
- conn = bzrlib.smtp_connection.SMTPConnection(
- config)
- conn.send_email(self.pqm_message)
- summary_file.write('\n\nSUBMITTED TO PQM:\n%s\n' %
- (subject,))
+ success = self.run_tests()
except:
summary_file.write('\n\nERROR IN TESTRUNNER\n\n')
traceback.print_exc(file=summary_file)
- result = 1
raise
finally:
# It probably isn't safe to close the log files ourselves,
@@ -143,10 +153,11 @@
summary_file.close()
if self.email is not None:
subject = 'Test results: %s' % (
- result and 'FAILURE' or 'SUCCESS')
+ success and 'SUCCESS' or 'FAILURE')
summary_file = open(self.logger.summary_filename, 'r')
bzrlib.email_message.EmailMessage.send(
- config, self.email[0], self.email,
+ bzrlib.config.GlobalConfig(),
+ self.email[0], self.email,
subject, summary_file.read())
summary_file.close()
finally:
@@ -187,8 +198,7 @@

def build_test_command(self):
"""See BaseTestRunner.build_test_command()."""
- command = ['make', 'check', 'VERBOSITY=' + self.test_options]
- return command
+ return ['make', 'check', 'VERBOSITY=' + self.test_options]

# Used to filter lines in the summary log. See
# `BaseTestRunner.ignore_line()`.
@@ -211,23 +221,22 @@
# display, and return the exit code. (See the xvfb-run man page for
# details.)
return [
- 'xvfb-run',
- '-s', '-screen 0 1024x768x24',
- 'make', 'jscheck']
+ 'xvfb-run', '-s', '-screen 0 1024x768x24',
+ 'make', 'jscheck', 'VERBOSITY=' + self.test_options]


class WebTestLogger:
"""Logs test output to disk and a simple web page."""

def __init__(self, test_dir, public_branch, public_branch_revno,
- sourcecode_dir):
+ sourcecode_dir, www_dir):
""" Class initialiser """
self.test_dir = test_dir
self.public_branch = public_branch
self.public_branch_revno = public_branch_revno
self.sourcecode_dir = sourcecode_dir
+ self.www_dir = www_dir

- self.www_dir = os.path.join(os.path.sep, 'var', 'www')
self.out_filename = os.path.join(self.www_dir, 'current_test.log')
self.summary_filename = os.path.join(self.www_dir, 'summary.log')
self.index_filename = os.path.join(self.www_dir, 'index.html')
@@ -265,9 +274,9 @@
"""
self.open_logs()

- out_file = self.out_file
+ out_file = self.out_file
summary_file = self.summary_file
- index_file = self.index_file
+ index_file = self.index_file

def write(msg):
msg += '\n'
@@ -432,11 +441,16 @@
parser.add_option(
'--jscheck', dest='jscheck', default=False, action='store_true',
help=('Run the JavaScript integration test suite.'))
+ parser.add_option(
+ '--prefix', dest='prefix', default=os.path.join(os.path.sep, 'var'),
+ help=('The directory in which to start the runner, '
+ '%default by default.'))

options, args = parser.parse_args()

if options.debug:
- import pdb; pdb.set_trace()
+ import pdb
+ pdb.set_trace()
if options.pqm_message is not None:
pqm_message = pickle.loads(
options.pqm_message.decode('string-escape').decode('base64'))
@@ -450,20 +464,19 @@
runner_type = TestOnMergeRunner

runner = runner_type(
- options.email,
- pqm_message,
- options.public_branch,
- options.public_branch_revno,
- ' '.join(args)
- )
+ email=options.email,
+ pqm_message=pqm_message,
+ public_branch=options.public_branch,
+ public_branch_revno=options.public_branch_revno,
+ test_options=' '.join(args),
+ prefix=options.prefix)

try:
try:
if options.daemon:
print 'Starting testrunner daemon...'
runner.daemonize()
-
- runner.test()
+ runner.run()
except:
# Handle exceptions thrown by the test() or daemonize() methods.
if options.email:

=== added file 'lib/devscripts/ec2test/remotenode.py'
--- lib/devscripts/ec2test/remotenode.py 1970-01-01 00:00:00 +0000
+++ lib/devscripts/ec2test/remotenode.py 2009-12-23 15:23:24 +0000
@@ -0,0 +1,539 @@
+# Copyright 2009 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+__metatype__ = type
+
+import os
+import tempfile
+import time
+
+from itertools import count, takewhile
+
+import bzrlib.config
+import bzrlib.email_message
+import bzrlib.smtp_connection
+import subunit
+
+from twisted.application import service, internet
+from twisted.cred import portal, checkers
+from twisted.conch import manhole, manhole_ssh
+from twisted.internet import defer
+from twisted.internet import error
+from twisted.internet import reactor
+from twisted.internet.protocol import (
+ ClientCreator, ProcessProtocol, Protocol, ServerFactory, connectionDone)
+from twisted.internet.threads import deferToThread
+from twisted.internet.utils import getProcessOutputAndValue
+from twisted.protocols.basic import LineOnlyReceiver
+from twisted.spread import pb
+from twisted.trial.reporter import TimingTextReporter
+
+
+# The port that nodes in the cluster listen on for PB commands.
+TEST_NODE_PORT = 8789
+
+# The port that the supervisor node listens on to receive test result data
+# from the workers.
+# XXX: Make this a property of the factory?
+TEST_STREAMING_PORT = 8790
+
+# SSH port for inspecting the running node.
+NODE_INSPECTION_PORT = 8791
+
+# For throwing stuff away.
+DEVNULL = open(os.devnull, 'rwb+')
+
+
+def calculate_parcel_size(total, num_parcels):
+ """Calculate the number of elements that should go into a parcel.
+
+ calculate_parcel_size(total, num_parcels) * num_parcels >= total
+ """
+ parcel_size = total // num_parcels
+ while parcel_size * num_parcels < total:
+ parcel_size += 1
+ return parcel_size
+
+
+def save_list(iterable, delimiter='\n'):
+ """Save an iterable of strings to a file, newline-separated."""
+ fd, filename = tempfile.mkstemp()
+ f = os.fdopen(fd, 'wb')
+ for line in iterable:
+ f.write(line + delimiter)
+ f.close()
+ return filename
+
+
+class CombiningSubunitFactory(ServerFactory):
+ """A factory for listening to subunit streams and collating results."""
+
+ def __init__(self, result, logger):
+ self._result = result
+ self._logger = logger
+
+ def buildProtocol(self, addr):
+ return SubunitProtocol(self._result, self._logger)
+
+
+class SubunitProtocolLogger:
+ """Organize logging for the SubunitProtocol."""
+
+ def __init__(self, log_dir):
+ self._log_dir = log_dir
+ self._log_nums = count(1)
+ self._logs = []
+
+ def open_log(self):
+ log_index = '%02d' % self._log_nums.next()
+ log_filename = os.path.join(
+ self._log_dir, 'subunit%s.log' % log_index)
+ log = open(log_filename, 'wb', buffering=1)
+ self._logs.append(log_filename)
+ return log
+
+ def close_log(self, log):
+ log.close()
+
+ @property
+ def logs(self):
+ return iter(self._logs)
+
+
+class SubunitProtocol(LineOnlyReceiver):
+ """An implementation of subunit in Twisted, yay!"""
+
+ # subunit separates lines with \n.
+ delimiter = '\n'
+
+ def __init__(self, result, logger):
+ self._result = result
+ self._logger = logger
+
+ def connectionMade(self):
+ self._subunit_log = self._logger.open_log()
+ self._subunit = subunit.TestProtocolServer(self._result, DEVNULL)
+
+ def lineReceived(self, line):
+ # subunit.TestProtocolServer expects the line-ending to be
+ # present.
+ line = line + self.delimiter
+ self._subunit_log.write(line)
+ self._subunit.lineReceived(line)
+
+ def connectionLost(self, reason=connectionDone):
+ self._subunit.lostConnection()
+ del self._subunit
+ self._logger.close_log(self._subunit_log)
+ del self._subunit_log
+
+
+class TestProcessProtocol(ProcessProtocol):
+
+ # XXX: TestProcessProtocol isn't a great name for this. It's really an
+ # adapter from a line receiver to a process protocol, plus a deferred that
+ # fires on process end. Doesn't have anything to do with tests really.
+
+ # XXX: Timeouts!
+
+ def __init__(self, deferred, remote_transport):
+ self._fire_when_done = deferred
+ self._remote_transport = remote_transport
+
+ def connectionMade(self):
+ print "Test process started."
+ self._checkpoint = time.time()
+
+ def outReceived(self, data):
+ self._remote_transport.write(data)
+ # Say something periodically to prevent watchdogs from killing us.
+ if time.time() - self._checkpoint > 60:
+ print "Test process running."
+ self._checkpoint = time.time()
+
+ def processEnded(self, reason):
+ if self._fire_when_done is not None:
+ d, self._fire_when_done = self._fire_when_done, None
+ if reason.check(error.ProcessDone):
+ d.callback(None)
+ else:
+ d.callback(reason)
+ self._remote_transport.loseConnection()
+
+
+class AccountingTestResult(TimingTextReporter):
+
+ def __init__(self, callback, stream):
+ self._callback = callback
+ super(AccountingTestResult, self).__init__(stream)
+
+ def stopTest(self, test):
+ self._callback(test)
+ super(AccountingTestResult, self).stopTest(test)
+
+
+def connect_to_node(address, port=TEST_NODE_PORT, reactor=reactor):
+ """Get the PB node at 'address'."""
+ factory = pb.PBClientFactory()
+ reactor.connectTCP(address, port, factory)
+ return factory.getRootObject()
+
+
+def connect_for_writing(reactor, host, port):
+ d = ClientCreator(reactor, Protocol).connectTCP(host, port)
+ return d.addCallback(lambda protocol: protocol.transport)
+
+
+def run_process(executable, args=(), path=None, reactor=None):
+ """Run a process and raise an error if it fails."""
+ print 'Running', executable, args
+
+ d = getProcessOutputAndValue(
+ executable, args=args, env=os.environ,
+ path=path, reactor=reactor)
+
+ def check_result((out, err, code)):
+ print 'Done'
+ if code != 0:
+ # XXX: Is this the best exception to raise?
+ raise RuntimeError(
+ "%s failed with code %d" % (executable, code), out, err)
+ return out
+
+ return d.addCallback(check_result)
+
+
+class TreeBuilder:
+ """Builds the tree once, guarded by a lock."""
+
+ # XXX: This could easily be made more general, i.e. ExecuteOnce. In fact,
+ # there probably is already something similar in Twisted.
+
+ def __init__(self, build_dir):
+ self._lock = defer.DeferredLock()
+ self._build_dir = build_dir
+ self._build_result = None
+
+ def _maybe_build(self):
+ if self._build_result is None:
+ build = run_process('/usr/bin/make', path=self._build_dir)
+ def built(result):
+ self._build_result = result
+ return build.addBoth(built)
+ else:
+ return self._build_result
+
+ def build(self):
+ return self._lock.run(self._maybe_build)
+
+
+class CloseableDeferredQueue(defer.DeferredQueue):
+ """A `DeferredQueue` that can be closed.
+
+ Once it has been closed by `queue.close()`, and once there are no
+ pending items in the queue, `queue.get()` will always return a
+ Deferred that has already been fired with None.
+ """
+
+ def __init__(self, size=None, backlog=None):
+ super(CloseableDeferredQueue, self).__init__(size, backlog)
+ self.closed = False
+
+ def close(self):
+ """Close this queue, and fire None to any waiting processes."""
+ self.closed = True
+ while self.waiting:
+ self.waiting.pop().callback(None)
+
+ def get(self):
+ d = super(CloseableDeferredQueue, self).get()
+ if self.closed and not d.called:
+ d.callback(None)
+ return d
+
+
+class TestNode(pb.Root):
+ """A node of a cluster that runs the Launchpad test suite."""
+
+ def __init__(self, service, branch_dir, log_dir):
+ self._service = service
+ self._branch_dir = branch_dir
+ self._log_dir = log_dir
+ self._is_supervisor = False
+ self._tree_builder = TreeBuilder(self._branch_dir)
+
+ def _run_process(self, executable, args=(), reactor=None):
+ """Run a process and raise an error if it fails.
+
+ Always runs the process in the `branch_dir`.
+ """
+ return run_process(executable, args, self._branch_dir, reactor)
698 | + |
699 | + def _find_tests(self): |
700 | + """Return a list of tests that make up the Launchpad test suite. |
701 | + |
702 | + :return: A Deferred that fires with a list of test ids. |
703 | + """ |
704 | + d = self._tree_builder.build() |
705 | + test_process = os.path.join(self._branch_dir, 'bin', 'test') |
706 | + d.addCallback( |
707 | + lambda ignored: self._run_process(test_process, ['--list'])) |
708 | + # ./bin/test --list dumps out all of the test ids with newline |
709 | + # separation, but it adds a Total: at the end. There can also |
710 | + # be import fascist warnings after that. Drop the total and |
711 | + # everything after it so we don't confuse our callers. |
712 | + p_testline = lambda line: not line.startswith('Total:') |
713 | + return d.addCallback( |
714 | + lambda output: list( |
715 | + takewhile(p_testline, output.splitlines()))) |
716 | + |
717 | + def _add_tests(self, tests, num_parcels): |
718 | + """Keep a record of the tests to be run. |
719 | + |
720 | + The set of tests will be used to keep track of what tests have |
721 | + not yet had results. Also, using a set de-duplicates the list. |
722 | + """ |
723 | + tests = set(tests) |
724 | + tests_new = tests - self._tests_all |
725 | + # Sort the tests to reduce layer setup/teardown churn. The |
726 | + # test suite also breaks a lot when run out of order. |
727 | + tests_new_sorted = sorted(tests_new) |
728 | + # Update state. |
729 | + self._tests_all.update(tests_new) |
730 | + self._tests_to_run.update(tests_new) |
731 | + # Split up new tests into parcels. |
732 | + parcel_size = calculate_parcel_size(len(tests), num_parcels) |
733 | + for index in xrange(0, len(tests_new_sorted), parcel_size): |
734 | + self._test_parcels.put( |
735 | + tests_new_sorted[index:index+parcel_size]) |
736 | + # Tell everyone. |
737 | + print ("%s new tests, split into %d parcels, each of at " |
738 | + "most %d tests" % (len(tests_new), num_parcels, parcel_size)) |
739 | + |
740 | + def _calculate_work(self, num_parcels): |
741 | + print 'Calculating work' |
742 | + d = self._find_tests() |
743 | + d.addCallback(self._add_tests, num_parcels) |
744 | + return d |
745 | + |
746 | + def remote_become_supervisor(self, num_parcels): |
747 | + """Tell this node that it is a supervisor. |
748 | + |
749 | + Note that supervisor nodes can still be workers, i.e. they can still |
750 | + run tests. |
751 | + |
752 | + :param num_parcels: The number of parcels into which to split the |
753 | + work. For optimum sharing - assuming each parcel takes a |
754 | + similar amount of time to process - this should be the same as the |
755 | + number of workers, or a multiple thereof. |
756 | + """ |
757 | + if self._is_supervisor: |
758 | + return |
759 | + self._is_supervisor = True |
760 | + print 'Becoming supervisor' |
761 | + # All tests. |
762 | + self._tests_all = set() |
763 | + # Tests that have not yet been run or attempted. |
764 | + self._tests_to_run = set() |
765 | + # Tests that have been attempted but for which there is no result. |
766 | + self._tests_not_run = set() |
767 | + # Parcels of tests to run on one go, for efficiency. |
768 | + self._test_parcels = CloseableDeferredQueue() |
769 | + # The result object. |
770 | + self._test_result_log = os.path.join(self._log_dir, 'result.log') |
771 | + self._test_result = AccountingTestResult( |
772 | + callback=lambda test: self._tests_to_run.discard(test.id()), |
773 | + stream=open(self._test_result_log, 'wb', buffering=1)) |
774 | + # Listen for subunit, pushing the results into the test |
775 | + # result, and logging everything as we go along. |
776 | + self._test_logger = SubunitProtocolLogger(self._log_dir) |
777 | + result_service = internet.TCPServer( |
778 | + TEST_STREAMING_PORT, CombiningSubunitFactory( |
779 | + self._test_result, self._test_logger)) |
780 | + result_service.setServiceParent(self._service) |
781 | + # At this point, we have been told how many parcels of work there |
782 | + # should be, but we do not necessarily have any workers. Calculate |
783 | + # each parcel of work. Work will be scheduled to workers as they ask |
784 | + # for it. |
785 | + self._calculate_work(num_parcels) |
786 | + |
787 | + def remote_get_work(self): |
788 | + """Called by workers when they are ready for work.""" |
789 | + assert self._is_supervisor |
790 | + return self._test_parcels.get() |
791 | + |
792 | + def remote_done_work(self, parcel): |
793 | + """Called by workers to declare that they have successfully |
794 | + completed a parcel of work. |
795 | + |
796 | + This is used to check that we have at least attempted to run |
797 | + every test, and also to trigger shutdown of the node. |
798 | + |
799 | + Returns another parcel of work if there is any. |
800 | + """ |
801 | + assert self._is_supervisor |
802 | + self._tests_not_run.update( |
803 | + self._tests_to_run.intersection(parcel)) |
804 | + self._tests_to_run.difference_update(self._tests_not_run) |
805 | + if len(self._tests_to_run) == 0: |
806 | + self._done_as_supervisor() |
807 | + print ("Work done; parcel had %d tests; " |
808 | + "%d tests remaining; %d cannot be run." % ( |
809 | + len(parcel), len(self._tests_to_run), |
810 | + len(self._tests_not_run))) |
811 | + return self._test_parcels.get() |
812 | + |
813 | + def _do_work(self, tests): |
814 | + # Connect to the reporting server. |
815 | + connected = connect_for_writing( |
816 | + reactor, self._supervisor_addr, TEST_STREAMING_PORT) |
817 | + def run_tests(transport): |
818 | + tests_finished = defer.Deferred() |
819 | + protocol = TestProcessProtocol(tests_finished, transport) |
820 | + command = ( |
821 | + '/usr/bin/make', '-o', 'clean', '-o', 'build', 'check', |
822 | + 'VERBOSITY=--subunit --load-list %s' % save_list(tests)) |
823 | + print "%s$ %s" % (self._branch_dir, command) |
824 | + reactor.spawnProcess( |
825 | + protocol, command[0], command, |
826 | + env=os.environ, path=self._branch_dir) |
827 | + return tests_finished |
828 | + connected.addCallback(run_tests) |
829 | + connected.addErrback(lambda failure: failure.printTraceback()) |
830 | + connected.addBoth( |
831 | + lambda _: self._supervisor.callRemote('done_work', tests)) |
832 | + connected.addCallback(self._maybe_do_work) |
833 | + |
834 | + def _maybe_do_work(self, tests): |
835 | + if tests is None: |
836 | + return self._done_as_worker() |
837 | + else: |
838 | + return self._do_work(tests) |
839 | + |
840 | + def remote_got_supervisor(self, supervisor_addr): |
841 | + """Called when the supervisor node is up and running.""" |
842 | + print 'Got supervisor', supervisor_addr |
843 | + d = connect_to_node(supervisor_addr) |
844 | + def got_supervisor(supervisor): |
845 | + self._supervisor_addr = supervisor_addr |
846 | + self._supervisor = supervisor |
847 | + d.addCallback(got_supervisor) |
848 | + d.addCallback(lambda _: self._tree_builder.build()) |
849 | + d.addCallback(lambda _: self._supervisor.callRemote('get_work')) |
850 | + d.addCallback(self._maybe_do_work) |
851 | + |
852 | + def _send_summary_email(self): |
853 | + if self._test_result.wasSuccessful(): |
854 | + if len(self._tests_to_run) > 0 or len(self._tests_not_run) > 0: |
855 | + subject = 'Test results: INCONCLUSIVE' |
856 | + else: |
857 | + subject = 'Test results: SUCCESS' |
858 | + else: |
859 | + subject = 'Test results: FAILURE' |
860 | + |
861 | + body = [] |
862 | + |
863 | + # Warn about tests that have not been run at all. |
864 | + if len(self._tests_to_run) > 0: |
865 | + body.append( |
866 | + "Warning: %d tests were not attempted. See untested.txt " |
867 | + "for the full list." % len(self._tests_to_run)) |
868 | + |
869 | + # Warn about tests that could not be run. |
870 | + if len(self._tests_not_run) > 0: |
871 | + body.append( |
872 | + "Warning: %d tests could not be run. See unrunnable.txt " |
873 | + "for the full list." % len(self._tests_not_run)) |
874 | + |
875 | + # Pad a bit. |
876 | + body.extend(['', '--', '']) |
877 | + |
878 | + # XXX: Change the email address :) |
879 | + message = bzrlib.email_message.EmailMessage( |
880 | + "gavin.panella@canonical.com", |
881 | + ["gavin.panella@canonical.com"], |
882 | + subject, "\n".join(body)) |
883 | + |
884 | + # Attach the results log. |
885 | + message.add_inline_attachment( |
886 | + open(self._test_result_log, 'rb').read(), |
887 | + os.path.basename(self._test_result_log)) |
888 | + |
889 | + # Attach a list of tests that were not run. |
890 | + if len(self._tests_to_run) > 0: |
891 | + message.add_inline_attachment( |
892 | + "\n".join(sorted(self._tests_to_run)), "untested.txt") |
893 | + |
894 | + # Attach a list of tests that could not be run. |
895 | + if len(self._tests_not_run) > 0: |
896 | + message.add_inline_attachment( |
897 | + "\n".join(sorted(self._tests_not_run)), "unrunnable.txt") |
898 | + |
899 | + # Attach the subunit logs. |
900 | + for log_filename in self._test_logger.logs: |
901 | + message.add_inline_attachment( |
902 | + open(log_filename, 'rb').read(), |
903 | + os.path.basename(log_filename)) |
904 | + |
905 | + def send(): |
906 | + config = bzrlib.config.GlobalConfig() |
907 | + connection = bzrlib.smtp_connection.SMTPConnection(config) |
908 | + connection.send_email(message) |
909 | + return deferToThread(send) |
910 | + |
911 | + def _done_as_supervisor(self): |
912 | + d = self._send_summary_email() |
913 | + d.addErrback(lambda failure: failure.printTraceback()) |
914 | + d.addBoth(lambda _: self._shutdown()) |
915 | + |
916 | + def _done_as_worker(self): |
917 | + if not self._is_supervisor: |
918 | + self._shutdown() |
919 | + |
920 | + def _shutdown(self): |
921 | + # XXX: There's probably a better way of doing this, but this |
922 | + # seems to work reliably for now. |
923 | + return reactor.callLater(2, reactor.stop) |
924 | + |
925 | + |
926 | +def make_root(service, node_dir): |
927 | + node_dir = os.path.abspath(node_dir) |
928 | + branch_dir = os.path.join(node_dir, 'launchpad', 'test') |
929 | + log_dir = os.path.join(node_dir, 'www') |
930 | + # XXX: Might be better for the node service to actually implement |
931 | + # IService. |
932 | + return TestNode(service, branch_dir, log_dir) |
933 | + |
934 | + |
935 | +def make_node_service(node): |
936 | + return internet.TCPServer(TEST_NODE_PORT, pb.PBServerFactory(node)) |
937 | + |
938 | + |
939 | +def make_inspection_service(**namespace): |
940 | + def getManholeFactory(ns): |
941 | + realm = manhole_ssh.TerminalRealm() |
942 | + def getManhole(_): |
943 | + return manhole.ColoredManhole(ns) |
944 | + realm.chainedProtocolFactory.protocolFactory = getManhole |
945 | + p = portal.Portal(realm) |
946 | + p.registerChecker( |
947 | + checkers.InMemoryUsernamePasswordDatabaseDontUse(admin="admin")) |
948 | + return manhole_ssh.ConchFactory(p) |
949 | + return internet.TCPServer( |
950 | + NODE_INSPECTION_PORT, getManholeFactory(namespace)) |
951 | + |
952 | + |
953 | +# The Launchpad Test node application. |
954 | +application = service.Application("Launchpad Test Node") |
955 | + |
956 | +# The root, controller. It's a bit of a muddle right now. |
957 | +root = make_root(application, os.environ.get('NODE_DIR', '/var')) |
958 | + |
959 | +# The PB node service. |
960 | +node_service = make_node_service(root) |
961 | +node_service.setServiceParent(application) |
962 | + |
963 | +# The inspection service. |
964 | +inspection_service = make_inspection_service( |
965 | + application=application, root=root) |
966 | +inspection_service.setServiceParent(application) |
967 | |
968 | === added file 'lib/devscripts/ec2test/remotenodekiller.py' |
969 | --- lib/devscripts/ec2test/remotenodekiller.py 1970-01-01 00:00:00 +0000 |
970 | +++ lib/devscripts/ec2test/remotenodekiller.py 2009-12-23 15:23:24 +0000 |
971 | @@ -0,0 +1,146 @@ |
972 | +#!/usr/bin/python |
973 | + |
974 | +import optparse |
975 | +import sys |
976 | +import traceback |
977 | + |
978 | +from glob import glob |
979 | +from os import kill |
980 | +from os.path import exists, getmtime |
981 | +from subprocess import call |
982 | +from time import sleep, time |
983 | + |
984 | + |
985 | +def read_pid_from_file(filename): |
986 | + """Read the pid from a file. |
987 | + |
988 | + IO errors are allowed to propagate, but if the first line of the |
989 | + file cannot be converted to an int then None is returned, thus |
990 | + permitting odd looking pid files. |
991 | + """ |
992 | + fd = open(filename, 'rb') |
993 | + try: |
994 | + pid = fd.readline().strip() |
995 | + finally: |
996 | + fd.close() |
997 | + try: |
998 | + return int(pid) |
999 | + except ValueError: |
1000 | + return None |
1001 | + |
1002 | + |
1003 | +def watch(wait, pids, pid_files, log_file_patterns, expiry_age): |
1004 | + """Watch for changes in the given resources. |
1005 | + |
1006 | + This checks: |
1007 | + |
1008 | + * that all the specified processes are running, |
1009 | + |
1010 | + * that all the specified pid files exist, |
1011 | + |
1012 | + * that the process for which the pid file exists is running, if |
1013 | + the first line of the pid file is recognised as a pid (i.e. an |
1014 | + integer), |
1015 | + |
1016 | + * that at least one log file exists for each given pattern, |
1017 | + |
1018 | + * that at least one log file for each given pattern has been |
1019 | + written to recently. |
1020 | + |
1021 | + """ |
1022 | + try: |
1023 | + while True: |
1024 | + # Don't spin round. |
1025 | + sleep(wait) |
1026 | + # Check that all pids are receiving signals. |
1027 | + for pid in pids: |
1028 | + kill(pid, 0) |
1029 | + # If any pid file has gone, return. |
1030 | + for filename in pid_files: |
1031 | + if not exists(filename): |
1032 | + return |
1033 | + # If a pid can be obtained from the file, check it's |
1034 | + # running. |
1035 | + pid = read_pid_from_file(filename) |
1036 | + if pid is not None: |
1037 | + kill(pid, 0) |
1038 | + # Check log files. |
1039 | + expired = time() - expiry_age |
1040 | + for log_file_pattern in log_file_patterns: |
1041 | + log_files = glob(log_file_pattern) |
1042 | + # If no log files exist for this pattern, return. |
1043 | + if len(log_files) == 0: |
1044 | + return |
1045 | + # If none of the log files have been written to |
1046 | + # recently, return. |
1047 | + log_file_max_mtime = max( |
1048 | + getmtime(log_file) for log_file in log_files) |
1049 | + if log_file_max_mtime < expired: |
1050 | + return |
1051 | + except KeyboardInterrupt: |
1052 | + raise |
1053 | + except: |
1054 | + traceback.print_exc() |
1055 | + |
1056 | + |
1057 | +def main(args): |
1058 | + parser = optparse.OptionParser( |
1059 | + usage="%prog [options] command", description=( |
1060 | + "Run an arbitrary command when it appears that a process has " |
1061 | + "exited or stopped running.")) |
1062 | + parser.add_option( |
1063 | + '--pid', action='append', dest='pids', type=int, help=( |
1064 | + "A pid to watch. If the process stops running, " |
1065 | + "the command is run.")) |
1066 | + parser.add_option( |
1067 | + '--pid-file', action='append', dest='pid_files', help=( |
1068 | + "A pid file to watch. If any pid file disappears, " |
1069 | + "the command is run.")) |
1070 | + parser.add_option( |
1071 | + '--log-file-pattern', action='append', dest='log_file_patterns', |
1072 | + help=( |
1073 | + "A log (or other) file to watch. If any log file disappears, " |
1074 | + "the command is run. If any log file gets old (see " |
1075 | + "--expiry-age), the command is run. Values specified here are " |
1076 | + "subject to glob expansion.")) |
1077 | + parser.add_option( |
1078 | + '--expiry-age', action='store', type=int, dest='expiry_age', help=( |
1079 | + "The number of seconds since a log file was modified before " |
1080 | + "it is considered to be expired. Defaults to %default seconds.")) |
1081 | + parser.add_option( |
1082 | + '--wait', action='store', type=int, dest='wait', help=( |
1083 | + "How long to wait between each check of the pids and logs. " |
1084 | + "Defaults to %default seconds.")) |
1085 | + parser.set_defaults(expiry_age=600, wait=10) |
1086 | + |
1087 | + options, command = parser.parse_args(args) |
1088 | + |
1089 | + if options.pids is None: |
1090 | + options.pids = [] |
1091 | + if options.pid_files is None: |
1092 | + options.pid_files = [] |
1093 | + if options.log_file_patterns is None: |
1094 | + options.log_file_patterns = [] |
1095 | + |
1096 | + if (len(options.pids) + len(options.pid_files) + |
1097 | + len(options.log_file_patterns)) == 0: |
1098 | + parser.error("Nothing to watch.") |
1099 | + if len(command) == 0: |
1100 | + parser.error("No command to run.") |
1101 | + if options.expiry_age < 1: |
1102 | + parser.error("The expiry age must be at least 1 second.") |
1103 | + if options.wait < 1: |
1104 | + parser.error("The wait must be at least 1 second.") |
1105 | + |
1106 | + watch( |
1107 | + options.wait, options.pids, options.pid_files, |
1108 | + options.log_file_patterns, options.expiry_age) |
1109 | + |
1110 | + return call(command) |
1111 | + |
1112 | + |
1113 | +if __name__ == '__main__': |
1114 | + try: |
1115 | + sys.exit(main(sys.argv[1:])) |
1116 | + except KeyboardInterrupt: |
1117 | + sys.exit(1) |
1118 | |
1119 | === modified file 'lib/devscripts/ec2test/testrunner.py' |
1120 | --- lib/devscripts/ec2test/testrunner.py 2009-12-01 22:53:47 +0000 |
1121 | +++ lib/devscripts/ec2test/testrunner.py 2009-12-23 15:23:24 +0000 |
1122 | @@ -285,23 +285,10 @@ |
1123 | self.email = email |
1124 | |
1125 | # Email configuration. |
1126 | - if email is not None or pqm_message is not None: |
1127 | - self._smtp_server = config.get_user_option('smtp_server') |
1128 | - if self._smtp_server is None or self._smtp_server == 'localhost': |
1129 | - raise ValueError( |
1130 | - 'To send email, a remotely accessible smtp_server (and ' |
1131 | - 'smtp_username and smtp_password, if necessary) must be ' |
1132 | - 'configured in bzr. See the SMTP server information ' |
1133 | - 'here: https://wiki.canonical.com/EmailSetup .') |
1134 | - self._smtp_username = config.get_user_option('smtp_username') |
1135 | - self._smtp_password = config.get_user_option('smtp_password') |
1136 | - from_email = config.username() |
1137 | - if not from_email: |
1138 | - # XXX: JonathanLange 2009-10-04: Is this strictly true? I |
1139 | - # can't actually see where this is used. |
1140 | - raise ValueError( |
1141 | - 'To send email, your bzr email address must be set ' |
1142 | - '(use ``bzr whoami``).') |
1143 | + self._smtp_server = config.get_user_option('smtp_server') |
1144 | + self._smtp_username = config.get_user_option('smtp_username') |
1145 | + self._smtp_password = config.get_user_option('smtp_password') |
1146 | + self._from_email = config.username() |
1147 | |
1148 | self._instance = instance |
1149 | |
1150 | @@ -314,29 +301,40 @@ |
1151 | |
1152 | def configure_system(self): |
1153 | user_connection = self._instance.connect() |
1154 | - as_user = user_connection.perform |
1155 | - # Set up bazaar.conf with smtp information if necessary |
1156 | - if self.email or self.message: |
1157 | - as_user('mkdir .bazaar') |
1158 | - bazaar_conf_file = user_connection.sftp.open( |
1159 | - ".bazaar/bazaar.conf", 'w') |
1160 | - bazaar_conf_file.write( |
1161 | - 'smtp_server = %s\n' % (self._smtp_server,)) |
1162 | - if self._smtp_username: |
1163 | - bazaar_conf_file.write( |
1164 | - 'smtp_username = %s\n' % (self._smtp_username,)) |
1165 | - if self._smtp_password: |
1166 | - bazaar_conf_file.write( |
1167 | - 'smtp_password = %s\n' % (self._smtp_password,)) |
1168 | - bazaar_conf_file.close() |
1169 | - # Copy remote ec2-remote over |
1170 | - self.log('Copying ec2test-remote.py to remote machine.\n') |
1171 | - user_connection.sftp.put( |
1172 | - os.path.join(os.path.dirname(os.path.realpath(__file__)), |
1173 | - 'ec2test-remote.py'), |
1174 | - '/var/launchpad/ec2test-remote.py') |
1175 | - # Set up launchpad login and email |
1176 | - as_user('bzr launchpad-login %s' % (self._launchpad_login,)) |
1177 | + # Set up launchpad login. |
1178 | + user_connection.perform( |
1179 | + 'bzr launchpad-login %s' % (self._launchpad_login,)) |
1180 | + # Done. |
1181 | + user_connection.close() |
1182 | + |
1183 | + def configure_email(self): |
1184 | + # Sanity checks. |
1185 | + if self._smtp_server is None or self._smtp_server == 'localhost': |
1186 | + raise ValueError( |
1187 | + 'To send email, a remotely accessible smtp_server (and ' |
1188 | + 'smtp_username and smtp_password, if necessary) must be ' |
1189 | + 'configured in bzr. See the SMTP server information ' |
1190 | + 'here: https://wiki.canonical.com/EmailSetup .') |
1191 | + if not self._from_email: |
1192 | + # XXX: JonathanLange 2009-10-04: Is this strictly true? I |
1193 | + # can't actually see where this is used. |
1194 | + raise ValueError( |
1195 | + 'To send email, your bzr email address must be set ' |
1196 | + '(use ``bzr whoami``).') |
1197 | + # Set up bazaar.conf with smtp information. |
1198 | + user_connection = self._instance.connect() |
1199 | + user_connection.perform('mkdir -p .bazaar') |
1200 | + bazaar_conf_file = user_connection.sftp.open( |
1201 | + ".bazaar/bazaar.conf", 'w') |
1202 | + bazaar_conf_file.write( |
1203 | + 'smtp_server = %s\n' % (self._smtp_server,)) |
1204 | + if self._smtp_username: |
1205 | + bazaar_conf_file.write( |
1206 | + 'smtp_username = %s\n' % (self._smtp_username,)) |
1207 | + if self._smtp_password: |
1208 | + bazaar_conf_file.write( |
1209 | + 'smtp_password = %s\n' % (self._smtp_password,)) |
1210 | + bazaar_conf_file.close() |
1211 | user_connection.close() |
1212 | |
1213 | def prepare_tests(self): |
1214 | @@ -434,9 +432,19 @@ |
1215 | |
1216 | def run_tests(self): |
1217 | self.configure_system() |
1218 | + if self.email is not None or self.message is not None: |
1219 | + self.configure_email() |
1220 | self.prepare_tests() |
1221 | user_connection = self._instance.connect() |
1222 | |
1223 | + # Copy remote ec2-remote over. |
1224 | + self.log( |
1225 | + 'Copying remote.py to ec2test-remote.py on remote machine.\n') |
1226 | + user_connection.sftp.put( |
1227 | + os.path.join( |
1228 | + os.path.dirname(os.path.realpath(__file__)), 'remote.py'), |
1229 | + '/var/launchpad/ec2test-remote.py') |
1230 | + |
1231 | # Make sure we activate the failsafe --shutdown feature. This will |
1232 | # make the server shut itself down after the test run completes, or |
1233 | # if the test harness suffers a critical failure. |
1234 | @@ -523,3 +531,43 @@ |
1235 | # ec2test-remote.py wants the extra options to be after a double- |
1236 | # dash. |
1237 | return ('--', self.test_options) |
1238 | + |
1239 | + def run_node(self): |
1240 | + """Start the test node.""" |
1241 | + self.configure_system() |
1242 | + self.configure_email() |
1243 | + self.prepare_tests() |
1244 | + user_connection = self._instance.connect() |
1245 | + # Install Twisted. |
1246 | + user_connection.perform( |
1247 | + 'sudo aptitude -y install' |
1248 | + ' python-twisted-core python-twisted-conch') |
1249 | + # Get Subunit. This is ugly, but I can't find a better way of |
1250 | + # doing it right now. Revision 90 is known to work. |
1251 | + user_connection.run_with_ssh_agent( |
1252 | + 'bzr branch -r90 lp:subunit ~/subunit') |
1253 | + # Copy remotenode.py and remotenodekiller.py over. |
1254 | + self.log('Copying remotenode.py to remote machine.\n') |
1255 | + from_dir = os.path.dirname(os.path.realpath(__file__)) |
1256 | + user_connection.sftp.put( |
1257 | + os.path.join(from_dir, 'remotenode.py'), |
1258 | + '/var/launchpad/remotenode.py') |
1259 | + user_connection.sftp.put( |
1260 | + os.path.join(from_dir, 'remotenodekiller.py'), |
1261 | + '/var/launchpad/remotenodekiller.py') |
1262 | + # Start the remote node. |
1263 | + user_connection.perform( |
1264 | + 'PYTHONPATH=~/subunit/python twistd' |
1265 | + ' --python /var/launchpad/remotenode.py' |
1266 | + ' --pidfile /var/launchpad/remotenode.pid' |
1267 | + ' --logfile /var/www/remotenode.log') |
1268 | + # Daemonize the killer. |
1269 | + user_connection.run_as_daemon( |
1270 | + "python /var/launchpad/remotenodekiller.py" |
1271 | + " --pid-file /var/launchpad/remotenode.pid" |
1272 | + " --log-file-pattern '/var/www/*.log*'" |
1273 | + " -- sudo shutdown -h now") |
1274 | + # Remove the index.html from the www directory so we can see |
1275 | + # the logs being written therein. |
1276 | + user_connection.perform('rm /var/www/index.html') |
1277 | + user_connection.close() |
1278 | |
1279 | === added file 'utilities/pb-shell' |
1280 | --- utilities/pb-shell 1970-01-01 00:00:00 +0000 |
1281 | +++ utilities/pb-shell 2009-12-23 15:23:24 +0000 |
1282 | @@ -0,0 +1,89 @@ |
1283 | +#!/usr/bin/python |
1284 | +"""An interactive PB (Perspective Broker) shell.""" |
1285 | + |
1286 | +__metaclass__ = type |
1287 | + |
1288 | +import code |
1289 | + |
1290 | +from functools import partial |
1291 | + |
1292 | +from twisted.internet import reactor, threads |
1293 | +from twisted.spread import pb |
1294 | + |
1295 | + |
1296 | +def block(func, *args, **kwargs): |
1297 | + """Run a function in the reactor and wait for the result. |
1298 | + |
1299 | + If the result is a `pb.RemoteReference`, wrap it with a |
1300 | + `BlockingRemoteReference`. |
1301 | + """ |
1302 | + result = threads.blockingCallFromThread( |
1303 | + reactor, func, *args, **kwargs) |
1304 | + if isinstance(result, pb.RemoteReference): |
1305 | + result = BlockingRemoteReference(result) |
1306 | + return result |
1307 | + |
1308 | + |
1309 | +def blocking(wrapped): |
1310 | + """A decorator to turn an async PB call into a blocking one.""" |
1311 | + return partial(block, wrapped) |
1312 | + |
1313 | + |
1314 | +class BlockingRemoteReference: |
1315 | + """A blocking version of a `pb.RemoteReference`. |
1316 | + |
1317 | + Accessing any attribute (excluding those starting with an |
1318 | + underscore) returns a blocking wrapper around a `callRemote` for |
1319 | + the given name. |
1320 | + """ |
1321 | + |
1322 | + def __init__(self, reference): |
1323 | + self.reference = reference |
1324 | + |
1325 | + def __getattr__(self, name): |
1326 | + if name.startswith('_'): |
1327 | + raise AttributeError(name) |
1328 | + return blocking( |
1329 | + partial(self.reference.callRemote, name)) |
1330 | + |
1331 | + |
1332 | +def disaster(failure): |
1333 | + failure.printTraceback() |
1334 | + return failure |
1335 | + |
1336 | + |
1337 | +def connect(host, port=8789): |
1338 | + """Connect to a PB server, returning the root object.""" |
1339 | + # This function blocks, and is meant to be called from *outside* |
1340 | + # of the event loop. |
1341 | + def _connect(): |
1342 | + """Connect to a PB server, returning the root object.""" |
1343 | + factory = pb.PBClientFactory() |
1344 | + reactor.connectTCP(host, port, factory) |
1345 | + d = factory.getRootObject() |
1346 | + d.addErrback(disaster) |
1347 | + return d |
1348 | + return block(_connect) |
1349 | + |
1350 | + |
1351 | +def interact(ns): |
1352 | + try: |
1353 | + from IPython.ipapi import launch_new_instance |
1354 | + except ImportError: |
1355 | + return code.interact(banner='', local=ns) |
1356 | + else: |
1357 | + return launch_new_instance(ns) |
1358 | + |
1359 | + |
1360 | +def console(): |
1361 | + """Start the interactive shell.""" |
1362 | + commands = {'connect': connect} |
1363 | + try: |
1364 | + interact(commands) |
1365 | + finally: |
1366 | + reactor.callFromThread(reactor.stop) |
1367 | + |
1368 | + |
1369 | +if __name__ == '__main__': |
1370 | + reactor.callInThread(console) |
1371 | + reactor.run() |
== The new Twisted node daemon, lib/devscripts/ec2test/remotenode.py ==
Diff: http://pastebin.ubuntu.com/315123/
This daemon is meant to make the following possible:
1. Start up N instances (on EC2, but the daemon makes no assumptions
on that front).
2. One of them is told it is the supervisor. It immediately builds the
LP tree and calculates the complete list of tests to run. It puts
these tests into N similarly sized parcels for workers to pick up
later. It also starts the subunit results aggregator service.
3. Every instance is told where the supervisor is. These instances
become workers, and ask the supervisor for work.
4. Each worker runs the tests it has been given, and streams the
results back to the supervisor's result aggregator.
5. When a worker has finished its run, with or without error, it tells
the supervisor that it has attempted to run every test in its
parcel.
6. The supervisor keeps track of which tests it actually has results
for; there seem to be some problems where workers don't actually
run all of the tests they're asked to.
7. The worker asks for more work. If there isn't any, it shuts
down.
8. Once the supervisor has responses for all parcels, it sends a
comprehensive report email, then shuts down.
9. Another process (remotenodekiller.py) takes care of actually
shutting each instance down, when either the node daemon has
exited, or if the logs have not had any activity for 10 minutes.
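The parcel-splitting in step 2 can be sketched in plain Python. This is a simplified, in-process model of the supervisor's behaviour; `calculate_parcel_size` here is an assumed ceiling-division helper, not the daemon's actual implementation:

```python
# An in-process sketch of how the supervisor splits the test list into
# similarly sized parcels (step 2 above). This models the behaviour of
# the daemon's _add_tests; calculate_parcel_size is an assumed helper.

def calculate_parcel_size(num_tests, num_parcels):
    """Smallest parcel size that covers num_tests in num_parcels parcels."""
    # Ceiling division; at least 1 so the slicing loop always advances.
    return max(1, -(-num_tests // num_parcels))

def split_into_parcels(tests, num_parcels):
    # De-duplicate with a set, then sort to reduce layer set-up churn.
    tests = sorted(set(tests))
    size = calculate_parcel_size(len(tests), num_parcels)
    return [tests[i:i + size] for i in range(0, len(tests), size)]

parcels = split_into_parcels(
    ['test_c', 'test_a', 'test_b', 'test_a', 'test_e', 'test_d'], 2)
# Two parcels: ['test_a', 'test_b', 'test_c'] and ['test_d', 'test_e']
```

With 5 unique tests and 2 parcels, each parcel holds at most 3 tests; workers pull these parcels off the supervisor's queue as they become free.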
Currently there is no support for submitting to PQM after a test run,
and the email address is hard-wired to <email address hidden>. I
think I might fix the latter before I land this branch :)
Anyway, the daemon has three services: controller, subunit results
aggregator, and inspection. The first two are needed to run the
testing service; the last helps during development and debugging.
=== Controller ===
The main class here is TestNode. This currently mixes in many
responsibilities that should be split out. It contains the logic for
being a supervisor (including starting the aggregator service and
sending report emails), being a worker, and shutting down. It is also,
in Perspective Broker parlance, a Referenceable object, which means its
remote_* methods can be called remotely.
Ideally I would like to separate out concepts like the branch, the
supervisor and the worker, and then have a Referenceable object to be
the remote control for them.
=== Subunit Aggregator ===
This is simply a socket that receives line-based output from a
remotely running bin/test process that's been asked to produce subunit
output (the --subunit flag). For each connection, it creates a
subunit.TestProtocolServer instance, to which it blindly passes the
input. This TestProtocolServer is given a results object that the
supervisor has created. This way all test results from all workers are
recorded into a single test result object.
The test result object also calls back to the supervisor for every
test that is run so that the supervisor can keep track of the tests
for which a result is available. It wouldn't be good if it submitted
to PQM if 1000 tests had never actually run.
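A minimal sketch of that accounting, using the standard unittest result API. The names here (AccountingResult, the pending set) are illustrative, not the daemon's own classes:

```python
# Sketch of the supervisor's accounting: wrap a TestResult so that every
# finished test's id is discarded from the set of tests still awaiting
# results. Names are illustrative, not the daemon's actual classes.
import unittest

class AccountingResult(unittest.TestResult):
    def __init__(self, callback):
        super(AccountingResult, self).__init__()
        self._callback = callback

    def stopTest(self, test):
        # Tell the supervisor this test now has a result.
        self._callback(test)
        super(AccountingResult, self).stopTest(test)

class Passing(unittest.TestCase):
    def test_ok(self):
        pass

suite = unittest.TestLoader().loadTestsFromTestCase(Passing)
pending = set(test.id() for test in suite)
result = AccountingResult(lambda test: pending.discard(test.id()))
suite.run(result)
# pending is now empty: every test in the parcel has a result.
```

Tests left in the pending set after a worker reports back are exactly the ones that were attempted but never produced a result.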
=== Inspection se...