Merge lp:~abentley/launchpad/ampoule-0.1.1 into lp:launchpad
Status: Merged
Approved by: Gavin Panella
Approved revision: not available
Merged at revision: not available
Proposed branch: lp:~abentley/launchpad/ampoule-0.1.1
Merge into: lp:launchpad
Diff against target: 239 lines (+36/-69), 5 files modified
  lib/lp/bugs/windmill/tests/test_bug_inline_subscriber.py (+2/-1)
  lib/lp/code/model/branchmergeproposaljob.py (+1/-9)
  lib/lp/services/job/runner.py (+30/-45)
  lib/lp/services/job/tests/test_runner.py (+1/-9)
  versions.cfg (+2/-5)
To merge this branch: bzr merge lp:~abentley/launchpad/ampoule-0.1.1
Related bugs:
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Gavin Panella (community) | Approve | ||
Review via email: mp+18481@code.launchpad.net
Commit message
Description of the change
Aaron Bentley (abentley) wrote:
After submitting this, I remembered about __import__, so I've replaced exec with __import__.
Gavin Panella (allenap) wrote:
Hi Aaron,
I can't pretend to fully understand all of this, but the changes look
good. I have one tiny comment.
Gavin.
> === modified file 'lib/lp/
> --- lib/lp/
> +++ lib/lp/
> @@ -15,6 +15,7 @@
> ]
>
>
> +from calendar import timegm
> import contextlib
> import logging
> import os
> @@ -22,9 +23,8 @@
> import sys
>
> from ampoule import child, pool, main
> -from twisted.internet import defer, error, reactor, stdio
> +from twisted.internet import defer, reactor
> from twisted.protocols import amp
> -from twisted.python import log, reflect
>
> from zope.component import getUtility
> from zope.security.proxy import removeSecurityProxy
> @@ -231,9 +231,25 @@
> class JobRunnerProces
> """Base class for processes that run jobs."""
>
> - def __init__(self):
> + def __init__(self, job_source_name):
> child.AMPChild.
> - self.context_
> + segments = job_source_
> + module = '.'.join(
> + name = segments[-1]
Could you simplify this to:
module, name = job_source_
?
Ah, I guess it wouldn't work if there are no "."s in job_source_name.
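For reference, the two ways of splitting a dotted name behave differently when there is no dot. This is a minimal sketch; the helper names are hypothetical, and the split/join form is only an assumption about what the truncated three-line version in the quoted diff looked like.

```python
def split_with_segments(job_source_name):
    """Assumed shape of the original version: split, then re-join."""
    segments = job_source_name.split('.')
    module = '.'.join(segments[:-1])
    name = segments[-1]
    return module, name


def split_with_rsplit(job_source_name):
    """The suggested one-liner: split once, from the right."""
    module, name = job_source_name.rsplit('.', 1)
    return module, name


dotted = 'lp.code.model.branchmergeproposaljob.UpdatePreviewDiffJob'
print(split_with_segments(dotted))
print(split_with_rsplit(dotted))

# With no dots, the first form yields an empty module name, while the
# rsplit form fails at unpacking because only one item comes back.
print(split_with_segments('UpdatePreviewDiffJob'))
try:
    split_with_rsplit('UpdatePreviewDiffJob')
except ValueError as e:
    print('rsplit form fails early:', e)
```

Either way a dotless name cannot be imported as "module plus attribute"; the rsplit form just fails one step sooner.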
Aaron Bentley (abentley) wrote:
-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA1
Gavin Panella wrote:
>> - def __init__(self):
>> + def __init__(self, job_source_name):
>> child.AMPChild.
>> - self.context_
>> + segments = job_source_
>> + module = '.'.join(
>> + name = segments[-1]
>
> Could you simplify this to:
>
> module, name = job_source_
Done. Thanks for the tip.
> Ah, I guess it wouldn't work if there are no "."s in job_source_name.
If there are no dots, the import will fail anyhow. I can't imagine any
time when it would be sensible to supply a job_source_name with no dots
in it, though.
Aaron
-----BEGIN PGP SIGNATURE-----
Version: GnuPG v1.4.9 (GNU/Linux)
Comment: Using GnuPG with Mozilla - http://
iEYEARECAAYFAkt
QUQAni+
=ieiV
-----END PGP SIGNATURE-----
Preview Diff
=== modified file 'lib/lp/bugs/windmill/tests/test_bug_inline_subscriber.py'
--- lib/lp/bugs/windmill/tests/test_bug_inline_subscriber.py 2010-02-01 18:37:00 +0000
+++ lib/lp/bugs/windmill/tests/test_bug_inline_subscriber.py 2010-02-03 22:24:19 +0000
@@ -18,7 +18,8 @@
     layer = BugsWindmillLayer
     suite_name = 'Inline bug page subscribers test'
 
-    def test_inline_subscriber(self):
+    def DISABLED_test_inline_subscriber(self):
+        # This test fails intermittently. See bug #516781.
         """Test inline subscribing on bugs pages.
 
         This test makes sure that subscribing and unsubscribing

=== modified file 'lib/lp/code/model/branchmergeproposaljob.py'
--- lib/lp/code/model/branchmergeproposaljob.py 2010-01-11 20:07:17 +0000
+++ lib/lp/code/model/branchmergeproposaljob.py 2010-02-03 22:24:19 +0000
@@ -51,7 +51,7 @@
 from lp.codehosting.vfs import get_multi_server, get_scanner_server
 from lp.services.job.model.job import Job
 from lp.services.job.interfaces.job import IRunnableJob
-from lp.services.job.runner import BaseRunnableJob, JobRunnerProcess
+from lp.services.job.runner import BaseRunnableJob
 
 
 class BranchMergeProposalJobType(DBEnumeratedType):
@@ -283,14 +283,6 @@
         self.branch_merge_proposal.preview_diff = preview
 
 
-class UpdatePreviewDiffProcess(JobRunnerProcess):
-    """A process that runs UpdatePreviewDiffJobs"""
-    job_class = UpdatePreviewDiffJob
-
-
-UpdatePreviewDiffJob.amp = UpdatePreviewDiffProcess
-
-
 class CreateMergeProposalJob(BaseRunnableJob):
     """See `ICreateMergeProposalJob` and `ICreateMergeProposalJobSource`."""
 

=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py 2010-02-03 11:17:12 +0000
+++ lib/lp/services/job/runner.py 2010-02-03 22:24:18 +0000
@@ -15,6 +15,7 @@
     ]
 
 
+from calendar import timegm
 import contextlib
 import logging
 import os
@@ -22,9 +23,8 @@
 import sys
 
 from ampoule import child, pool, main
-from twisted.internet import defer, error, reactor, stdio
+from twisted.internet import defer, reactor
 from twisted.protocols import amp
-from twisted.python import log, reflect
 
 from zope.component import getUtility
 from zope.security.proxy import removeSecurityProxy
@@ -242,9 +242,23 @@
 class JobRunnerProcess(child.AMPChild):
     """Base class for processes that run jobs."""
 
-    def __init__(self):
+    def __init__(self, job_source_name):
         child.AMPChild.__init__(self)
-        self.context_manager = self.job_class.contextManager()
+        module, name = job_source_name.rsplit('.', 1)
+        source_module = __import__(module, fromlist=[name])
+        self.job_source = getattr(source_module, name)
+        self.context_manager = self.job_source.contextManager()
+
+    @staticmethod
+    def __enter__():
+        def handler(signum, frame):
+            raise TimeoutError
+        scripts.execute_zcml_for_scripts(use_web_security=False)
+        signal(SIGHUP, handler)
+
+    @staticmethod
+    def __exit__(exc_type, exc_val, exc_tb):
+        pass
 
     def makeConnection(self, transport):
         """The Job context is entered on connect."""
@@ -258,9 +272,9 @@
 
     @RunJobCommand.responder
     def runJobCommand(self, job_id):
-        """Run a job of this job_class according to its job id."""
+        """Run a job from this job_source according to its job id."""
         runner = BaseJobRunner()
-        job = self.job_class.get(job_id)
+        job = self.job_source.get(job_id)
         oops = runner.runJobHandleError(job)
         if oops is None:
             oops_id = ''
@@ -269,28 +283,22 @@
         return {'success': len(runner.completed_jobs), 'oops_id': oops_id}
 
 
-class HUPProcessPool(pool.ProcessPool):
-    """A ProcessPool that kills with HUP."""
-
-    def _handleTimeout(self, child):
-        try:
-            child.transport.signalProcess(SIGHUP)
-        except error.ProcessExitedAlready:
-            pass
-
-
 class TwistedJobRunner(BaseJobRunner):
     """Run Jobs via twisted."""
 
-    def __init__(self, job_source, job_amp, logger=None, error_utility=None):
+    def __init__(self, job_source, logger=None, error_utility=None):
         starter = main.ProcessStarter(
-            bootstrap=BOOTSTRAP, packages=('twisted', 'ampoule'),
+            packages=('twisted', 'ampoule'),
             env={'PYTHONPATH': os.environ['PYTHONPATH'],
                  'PATH': os.environ['PATH'],
                  'LPCONFIG': os.environ['LPCONFIG']})
         super(TwistedJobRunner, self).__init__(logger, error_utility)
         self.job_source = job_source
-        self.pool = HUPProcessPool(job_amp, starter=starter, min=0)
+        import_name = '%s.%s' % (
+            removeSecurityProxy(job_source).__module__, job_source.__name__)
+        self.pool = pool.ProcessPool(
+            JobRunnerProcess, ampChildArgs=[import_name], starter=starter,
+            min=0, timeout_signal=SIGHUP)
 
     def runJobInSubprocess(self, job):
         """Run the job_class with the specified id in the process pool.
@@ -304,12 +312,9 @@
             self.incomplete_jobs.append(job)
             return
         job_id = job.id
-        timeout = job.getTimeout()
-        # work around ampoule bug
-        if timeout == 0:
-            timeout = 0.0000000000001
+        deadline = timegm(job.lease_expires.timetuple())
         deferred = self.pool.doWork(
-            RunJobCommand, job_id = job_id, _timeout=timeout)
+            RunJobCommand, job_id = job_id, _deadline=deadline)
         def update(response):
             if response['success']:
                 self.completed_jobs.append(job)
@@ -362,8 +367,7 @@
     def runFromSource(cls, job_source, logger, error_utility=None):
         """Run all ready jobs provided by the specified source."""
         logger.info("Running through Twisted.")
-        runner = cls(job_source, removeSecurityProxy(job_source).amp, logger,
-                     error_utility)
+        runner = cls(job_source, logger, error_utility)
         reactor.callWhenRunning(runner.runAll)
         handler = getsignal(SIGCHLD)
         try:
@@ -396,22 +400,3 @@
 
     def __init__(self):
         Exception.__init__(self, "Job ran too long.")
-
-
-BOOTSTRAP = """\
-import sys
-from twisted.application import reactors
-reactors.installReactor(sys.argv[-2])
-from lp.services.job.runner import bootstrap
-bootstrap(sys.argv[-1])
-"""
-
-def bootstrap(ampChildPath):
-    def handler(signum, frame):
-        raise TimeoutError
-    signal(SIGHUP, handler)
-    log.startLogging(sys.stderr)
-    ampChild = reflect.namedAny(ampChildPath)
-    stdio.StandardIO(ampChild(), 3, 4)
-    scripts.execute_zcml_for_scripts(use_web_security=False)
-    reactor.run()

=== modified file 'lib/lp/services/job/tests/test_runner.py'
--- lib/lp/services/job/tests/test_runner.py 2010-01-16 10:20:46 +0000
+++ lib/lp/services/job/tests/test_runner.py 2010-02-03 22:24:18 +0000
@@ -18,7 +18,7 @@
 
 from lp.testing.mail_helpers import pop_notifications
 from lp.services.job.runner import (
-    JobRunner, BaseRunnableJob, JobRunnerProcess, TwistedJobRunner
+    JobRunner, BaseRunnableJob, TwistedJobRunner
 )
 from lp.services.job.interfaces.job import JobStatus, IRunnableJob
 from lp.services.job.model.job import Job
@@ -283,14 +283,6 @@
         sleep(30)
 
 
-class StuckJobProcess(JobRunnerProcess):
-
-    job_class = StuckJob
-
-
-StuckJob.amp = StuckJobProcess
-
-
 class ListLogger:
 
     def __init__(self):

=== modified file 'versions.cfg'
--- versions.cfg 2010-02-03 10:57:21 +0000
+++ versions.cfg 2010-02-03 22:24:18 +0000
@@ -4,11 +4,7 @@
 [versions]
 # Alphabetical, case-insensitive, please! :-)
 
-# from -r 3:lp:~launchpad/ampoule/launchpad-tweaked
-# To reproduce:
-# bzr export ampoule-0.1.0-lp-1.tar.gz lp:~launchpad/ampoule/launchpad-tweaked\
-# -r 3
-ampoule = 0.1.0-lp-1
+ampoule = 0.2.0
 # Non-released bzr version from bzr+ssh://bazaar.launchpad.net/~mwhudson/bzr/2.1.0b4-lp2
 bzr = 2.1b4-lp2
 chameleon.core = 1.0b35
@@ -50,6 +46,7 @@
 PasteDeploy = 1.3.3
 pyasn1 = 0.0.9a
 pycrypto = 2.0.1
+pyOpenSSL = 0.10
 python-memcached = 1.45
 python-openid = 2.2.1
 pytz = 2009l
= Summary =
Update to Ampoule 0.2.0.
== Proposed fix ==
This branch updates ampoule and also adds a version for pyOpenSSL, which is a
new Ampoule dependency. This new version of ampoule has many changes that I
suggested, to push as much code upstream as I thought was architecturally
sound.
== Pre-implementation notes ==
None
== Implementation details ==
Instead of having a separate AMPChild subclass for every JobSource, a new
ampoule feature allows us to parameterize the JobRunnerProcess. We supply a
full path to the JobSource. In JobRunnerProcess, we import this and use it. I
used 'exec' for this, because the import module looked crazy complicated.
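As the follow-up comment notes, the exec was later replaced with __import__. A minimal sketch of that pattern follows; the helper name load_job_source is hypothetical (the branch does this inline in JobRunnerProcess.__init__), and a standard-library name stands in for a real job source.

```python
def load_job_source(job_source_name):
    """Import and return the object named by a dotted path."""
    module_name, attribute = job_source_name.rsplit('.', 1)
    # A non-empty fromlist makes __import__ return the leaf module
    # (e.g. os.path) rather than the top-level package (os).
    module = __import__(module_name, fromlist=[attribute])
    return getattr(module, attribute)


# 'os.path.join' is only a stand-in for a job source path.
join = load_job_source('os.path.join')
print(join('lib', 'lp'))
```

On newer Pythons, importlib.import_module(module_name) would be the more usual spelling of the same step.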
Instead of supplying our own BOOTSTRAP code in order to provide a bit more
startup code, a new ampoule feature lets us simply implement the context
manager protocol in JobRunnerProcess.
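The context manager protocol itself is small. The sketch below is not the Launchpad class and makes no claim about how ampoule calls it; StartupContext and JobTimeout are hypothetical names, and the handler mirrors the SIGHUP handler in the diff. It assumes a Unix platform and the main thread, since that is where Python allows signal handlers to be installed.

```python
import signal


class JobTimeout(Exception):
    """Hypothetical stand-in for the TimeoutError class in runner.py."""


class StartupContext:
    """Run extra startup code on entry and undo it on exit."""

    def __enter__(self):
        def handler(signum, frame):
            raise JobTimeout("Job ran too long.")
        # signal.signal() returns the previously installed handler.
        self._previous = signal.signal(signal.SIGHUP, handler)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Restore the old handler if Python knows what it was.
        if self._previous is not None:
            signal.signal(signal.SIGHUP, self._previous)
        return False  # do not swallow exceptions


with StartupContext():
    # While the block runs, SIGHUP is turned into a JobTimeout exception.
    installed = signal.getsignal(signal.SIGHUP)
```

The version in the diff leaves __exit__ empty, which is reasonable for a child process that exits when its work is done; restoring the handler here just keeps the sketch tidy.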
Instead of subclassing pool.ProcessPool to use SIGHUP, ampoule now lets us
parameterize the timeout_signal.
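The design change is "parameter instead of subclass". The toy below shows the shape of it only; SignalingPool and FakeChild are hypothetical and are not ampoule's ProcessPool or its API.

```python
import signal


class FakeChild:
    """Hypothetical stand-in for a worker process handle."""

    def __init__(self):
        self.received = []

    def send_signal(self, signum):
        self.received.append(signum)


class SignalingPool:
    """Toy pool whose timeout signal is a constructor argument."""

    def __init__(self, timeout_signal=signal.SIGKILL):
        self.timeout_signal = timeout_signal

    def handle_timeout(self, child):
        try:
            child.send_signal(self.timeout_signal)
        except ProcessLookupError:
            # The child already exited; nothing left to signal.
            pass


# No HUP-specific subclass is needed any more.
hup_pool = SignalingPool(timeout_signal=signal.SIGHUP)
child = FakeChild()
hup_pool.handle_timeout(child)
```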
Instead of converting our leases, which are datetimes, into timeout durations,
a new ampoule feature lets us supply them as deadlines.
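The conversion in the diff is timegm(job.lease_expires.timetuple()). A small sketch of what that computes, assuming the lease datetime is already expressed in UTC; lease_to_deadline and seconds_remaining are hypothetical helper names.

```python
from calendar import timegm
from datetime import datetime
import time


def lease_to_deadline(lease_expires):
    """Convert a UTC datetime into seconds since the epoch."""
    # timegm() reads the struct_time as UTC; time.mktime() would
    # read it as local time and shift the deadline by the UTC offset.
    return timegm(lease_expires.timetuple())


def seconds_remaining(deadline, now=None):
    """Time left before the deadline, never negative."""
    if now is None:
        now = time.time()
    return max(0.0, deadline - now)


lease = datetime(2010, 2, 3, 22, 24, 19)
deadline = lease_to_deadline(lease)
print(deadline, seconds_remaining(deadline))
```

Passing an absolute deadline means the time spent queueing no longer has to be subtracted by the caller, and it also removes the need for the zero-timeout workaround that the diff deletes.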
job_class was renamed to job_source where this was a better description of its
usage.
== Tests ==
bin/test -vt test_runner -t update_preview_diffs
== Demo and Q/A ==
No visible change.
= Launchpad lint =
Checking for conflicts. and issues in doctests and templates.
Running jslint, xmllint, pyflakes, and pylint.
Using normal rules.
Linting changed files:
  lib/lp/services/job/tests/test_runner.py
  lib/lp/services/job/runner.py
  versions.cfg
  lib/lp/code/model/branchmergeproposaljob.py
== Pyflakes notices ==
lib/lp/services/job/runner.py
240: undefined name 'job_source'
^^^ This is because of the exec.
== Pylint notices ==
lib/lp/services/job/runner.py
35: [F0401] Unable to import 'lazr.delegates' (No module named delegates)
239: [W0122, JobRunnerProcess.__init__] Use of the exec statement
240: [E0602, JobRunnerProcess.__init__] Undefined variable 'job_source'
^^^ This is because of the exec.
lib/lp/code/model/branchmergeproposaljob.py
22: [F0401] Unable to import 'lazr.delegates' (No module named delegates)
23: [F0401] Unable to import 'lazr.enum' (No module named enum)