Merge lp:~adeuring/launchpad/lp-lazr.jobrunner into lp:launchpad

Proposed by Abel Deuring
Status: Merged
Approved by: Aaron Bentley
Approved revision: no longer in the source branch.
Merged at revision: 15037
Proposed branch: lp:~adeuring/launchpad/lp-lazr.jobrunner
Merge into: lp:launchpad
Diff against target: 606 lines (+98/-136)
15 files modified
lib/lp/code/scripts/tests/test_create_merge_proposals.py (+3/-3)
lib/lp/code/scripts/tests/test_merge_proposal_jobs.py (+2/-1)
lib/lp/code/scripts/tests/test_reclaim_branch_space.py (+7/-5)
lib/lp/code/scripts/tests/test_sendbranchmail.py (+12/-7)
lib/lp/registry/tests/test_process_job_sources_cronjob.py (+2/-2)
lib/lp/services/job/interfaces/job.py (+3/-6)
lib/lp/services/job/model/job.py (+28/-5)
lib/lp/services/job/runner.py (+17/-89)
lib/lp/services/job/tests/test_runner.py (+11/-12)
lib/lp/soyuz/model/packagecopyjob.py (+1/-1)
lib/lp/soyuz/tests/test_packagecopyjob.py (+1/-1)
lib/lp/testing/factory.py (+1/-1)
lib/lp/translations/scripts/tests/test_packaging_translations.py (+2/-2)
setup.py (+1/-0)
versions.cfg (+7/-1)
To merge this branch: bzr merge lp:~adeuring/launchpad/lp-lazr.jobrunner
Reviewer                   Review Type  Date Requested  Status
Aaron Bentley (community)                               Approve
Abel Deuring (community)                                Needs Resubmitting
Review via email: mp+97458@code.launchpad.net

Commit message

use lazr.jobrunner for basic job management

Description of the change

This branch removes some core methods from class JobRunner, which
are now provided by lazr.jobrunner.

It requires lp:~adeuring/launchpad/lazr.jobrunner-more-tests (not yet
merged into lp:lazr.jobrunner/trunk)

lazr.jobrunner does not do everything which the old job runner did.
Most notably, the now removed method BaseJobRunner.runJob() contained
several calls of transaction.commit() and transaction.abort().

The new module lazr.jobrunner does not make any assumption about where
or how the status of a job is stored, or whether a job makes any other
changes to a database, and hence does not do any of these calls.

Instead, these calls are now done in Job.start(), Job.complete() etc.

Grepping through the Launchpad source code showed that some of these
methods are also called in places that are not related to controlling a
job, for example to create a job in the status "suspended".

Calling transaction.commit() unconditionally could thus cause unwanted
"intermediate" commits; to avoid this, I added the parameter
manage_transaction to Job.start() etc, which is False by default,
but the calls in lazr.jobrunner use manage_transaction=True. This
is somewhat ugly, but the alternative, keeping transaction management
in lazr.jobrunner, would also look odd: The module is supposed to do
job management, regardless of any possible database related activities
of a job.
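
The manage_transaction pattern described above can be sketched as follows. This is a minimal, self-contained illustration rather than the Launchpad code: FakeTransaction and SketchJob are hypothetical stand-ins for the transaction module and for Job, and only start() and fail() are shown.

```python
class FakeTransaction:
    """Hypothetical stand-in for the `transaction` module; records calls."""

    def __init__(self):
        self.calls = []

    def commit(self):
        self.calls.append("commit")

    def abort(self):
        self.calls.append("abort")


class SketchJob:
    """Hypothetical job whose status methods optionally manage transactions."""

    def __init__(self, txn):
        self.txn = txn
        self.status = "WAITING"

    def start(self, manage_transaction=False):
        self.status = "RUNNING"
        if manage_transaction:
            # Make the new status visible before the job body runs.
            self.txn.commit()

    def fail(self, manage_transaction=False):
        if manage_transaction:
            # Discard whatever the failed job body left uncommitted.
            self.txn.abort()
        self.status = "FAILED"
        if manage_transaction:
            # Record the failure.
            self.txn.commit()


# Called from ordinary application code: no intermediate commits happen.
plain_txn = FakeTransaction()
SketchJob(plain_txn).start()

# Called the way a job runner would: the job commits for itself.
runner_txn = FakeTransaction()
runner_job = SketchJob(runner_txn)
runner_job.start(manage_transaction=True)
runner_job.fail(manage_transaction=True)
```

With the default of False, existing callers keep control over when their own transaction is committed; only the runner opts in.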

test: ./bin/test -vvt lp.services.job

no lint

Aaron Bentley (abentley) wrote :

Not landable yet because it assumes lazr.jobrunner is a development egg. "../lazr.jobrunner" should not be listed in buildout.cfg, and a version of lazr.jobrunner should be specified in versions.cfg.

runJob may not need to be overridden in BaseJobRunner. If not, please remove it.

job_str may not need to be overridden in BaseJobRunner. If not, please remove it.

review: Needs Fixing
Abel Deuring (adeuring) wrote :

> Not landable yet because it assumes lazr.jobrunner is a development egg.
> "../lazr.jobrunner" should not be listed in buildout.cfg, and a version
> of lazr.jobrunner should be specified in versions.cfg.

Done.

>
> runJob may not need to be overridden in BaseJobRunner. If not, please
> remove it.

I think that we should keep this method: class runner.BaseRunnableJob
implements some required methods like notifyOops(), but it does not
implement start(), complete() etc. The latter methods are defined in
lp.services.job.model.job.Job -- and all of them are required by
lazr.jobrunner.runner.JobRunner. So, "IRunnableJob(job)" in

    def runJob(self, job):
        super(BaseJobRunner, self).runJob(IRunnableJob(job))

in BaseJobRunner ensures that the Zope machinery adapts the "incomplete"
job to IRunnableJob. OK, IRunnableJob(job) is also called in
JobRunner.runAll() and in TwistedJobRunner.runJobInSubprocess(), so it is
right now not strictly necessary in JobRunner.runJob() -- but keeping
JobRunner.runJob() makes it easier to remove runAll() later.

Alternatively, we could call IRunnableJob(job) in lazr.jobrunner, but I
am not sure if this makes sense.
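
The "adapt, then delegate" override discussed here can be sketched without Zope. Everything below is hypothetical: GenericRunner stands in for the lazr.jobrunner runner, and adapt_to_runnable() stands in for the IRunnableJob(job) call; the real adaptation is done by the Zope component machinery, not by hasattr checks.

```python
class GenericRunner:
    """Hypothetical stand-in for a generic runner; knows nothing of adapters."""

    def runJob(self, job):
        job.start()
        job.run()
        job.complete()


class IncompleteJob:
    """Has run() but lacks start() and complete()."""

    def __init__(self):
        self.log = []

    def run(self):
        self.log.append("run")


class RunnableAdapter:
    """Hypothetical adapter that supplies the missing lifecycle methods."""

    def __init__(self, context):
        self.context = context

    def start(self):
        self.context.log.append("start")

    def run(self):
        self.context.run()

    def complete(self):
        self.context.log.append("complete")


def adapt_to_runnable(job):
    # Stand-in for IRunnableJob(job): return the job unchanged if it
    # already has the full lifecycle, otherwise wrap it.
    if all(hasattr(job, name) for name in ("start", "run", "complete")):
        return job
    return RunnableAdapter(job)


class AdaptingRunner(GenericRunner):
    def runJob(self, job):
        # Adapt first, then defer to the generic implementation.
        super().runJob(adapt_to_runnable(job))


job = IncompleteJob()
AdaptingRunner().runJob(job)
```

Keeping the adaptation in the subclass means the generic runner needs no knowledge of the adapter registry.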

> job_str may not need to be overridden in BaseJobRunner. If not, please
> remove it.

Right, this is no longer needed. Removed.

Aaron Bentley (abentley) wrote :

> > runJob may not need to be overridden in BaseJobRunner. If not, please
> > remove it.
>
> I think that we should keep this method: class runner.BaseRunnableJob
> implements some required methods like notifyOops(), but it does not
> implement start(), complete() etc. The latter methods are defined in
> lp.services.job.model.job.Job -- and all of them are required by
> lazr.jobrunner.runner.JobRunner. So, "IRunnableJob(job)" in
>
>     def runJob(self, job):
>         super(BaseJobRunner, self).runJob(IRunnableJob(job))
>
> in BaseJobRunner ensures that the Zope machinery adapts the "incomplete"
> job to IRunnableJob. OK, IRunnableJob(job) is also called in
> JobRunner.runAll() and in TwistedJobRunner.runJobInSubprocess(), so it is
> right now not strictly necessary in JobRunner.runJob() -- but keeping
> JobRunner.runJob() makes it easier to remove runAll() later.

I think this is fine for now. In the future, I think we might want to make the job source responsible for ensuring that the job is runnable, and have the runner assume the job is runnable.

> Alternatively, we could call IRunnableJob(job) in lazr.jobrunner, but I
> am not sure if this makes sense.

I'd like to keep zope.component out of lazr.jobrunner if we can.

review: Approve
Abel Deuring (adeuring) wrote :

Some additional changes are necessary to use lazr.jobrunner (see also https://code.launchpad.net/~adeuring/lazr.jobrunner/use_job_repr_in_logging/+merge/98821):

The change in lib/lp/registry/tests/test_process_job_sources_cronjob.py reflects the change from logger.debug() in the original BaseJobRunner.runJob() method to logger.info() in lazr.jobrunner.

The second change: I added the method _doOops() again to BaseJobRunner. The reason:

The implementation of _doOops() in lazr.jobrunner calls oops_report.publish(). That makes sense because the module does not make any assumptions about a setup for some sort of automatic OOPS generation: it simply publishes the OOPS report and then logs an error message.

This conflicts with the way Launchpad generates an OOPS in scripts: lp.services.scripts.logger has a class OopsHandler(logging.Handler), which automatically generates an OOPS report when logger.error() is called. So, a call of _doOops() from lazr.jobrunner first publishes an OOPS, then logs the error -- and a log handler publishes the same OOPS report again.
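
The double publication described above can be reproduced with the standard logging module alone. This is a sketch under assumptions: PublishingHandler is a hypothetical analogue of OopsHandler, GenericRunner a hypothetical analogue of the lazr.jobrunner behaviour, and "publishing" is modelled as appending to a list.

```python
import logging


class PublishingHandler(logging.Handler):
    """Hypothetical analogue of OopsHandler: publishes on every error."""

    def __init__(self, published):
        super().__init__(level=logging.ERROR)
        self.published = published

    def emit(self, record):
        self.published.append("handler: " + record.getMessage())


class GenericRunner:
    """Hypothetical generic runner: publishes a report, then logs an error."""

    def __init__(self, logger, published):
        self.logger = logger
        self.published = published

    def _doOops(self, message):
        self.published.append("runner: " + message)
        self.logger.error(message)


class ScriptRunner(GenericRunner):
    """Override that publishes once and logs below ERROR level."""

    def _doOops(self, message):
        self.published.append("runner: " + message)
        self.logger.info("Job resulted in OOPS: %s", message)


def run_with(runner_class):
    published = []
    logger = logging.getLogger("oops-sketch-" + runner_class.__name__)
    logger.setLevel(logging.INFO)
    logger.propagate = False
    logger.addHandler(PublishingHandler(published))
    runner_class(logger, published)._doOops("job failed")
    return published


generic_reports = run_with(GenericRunner)  # two reports for one failure
script_reports = run_with(ScriptRunner)    # one report
```

Overriding _doOops() in the subclass leaves the generic runner usable in environments that have no error-publishing log handler.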

tests:

bin/test -vv \
-t lp.registry.tests.test_process_job_sources_cronjob.ProcessJobSourceGroupsTest.test_processed \
-t lp.registry.tests.test_process_job_sources_cronjob.ProcessJobSourceTest.test_processed \
-t lp.code.scripts.tests.test_create_merge_proposals.TestCreateMergeProposals.test_oops \
-t lp.code.scripts.tests.test_reclaim_branch_space.TestReclaimBranchSpaceScript.test_reclaimbranchspace_script_logs_oops \
-t lp.code.scripts.tests.test_sendbranchmail.TestSendbranchmail.test_sendbranchmail_handles_oops \
-t lp.translations.tests.test_rosetta_branches_script.TestRosettaBranchesScript.test_rosetta_branches_script_oops \
-t lp.registry.tests.test_membership_notification_job.MembershipNotificationJobTest.test_smoke_admining_team

no lint

Abel Deuring (adeuring) :
review: Needs Resubmitting
Aaron Bentley (abentley) wrote :

I believe this would introduce a regression with database error handling. If there is a database error (like a permission failure), runJobHandleError will call Job.fail. But that will cause a database lookup, which fails due to the previous error, e.g. http://pastebin.ubuntu.com/892304/
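
The failure mode can be illustrated with a small simulation. It assumes a database session that rejects every statement after an error until the transaction is rolled back; FakeStore and both fail functions are hypothetical, and the sketch does not claim to show how lazr.jobrunner handles the case.

```python
class FakeStore:
    """Hypothetical session that refuses work after an error until rollback."""

    def __init__(self):
        self.broken = False
        self.status = "RUNNING"

    def execute_bad_statement(self):
        self.broken = True
        raise RuntimeError("permission denied")

    def set_status(self, status):
        if self.broken:
            raise RuntimeError("current transaction is aborted")
        self.status = status

    def rollback(self):
        self.broken = False


def fail_without_rollback(store):
    store.set_status("FAILED")


def fail_with_rollback(store):
    # Clear the broken transaction before touching the database again.
    store.rollback()
    store.set_status("FAILED")


def run_and_fail(fail):
    store = FakeStore()
    try:
        store.execute_bad_statement()
    except RuntimeError:
        try:
            fail(store)
        except RuntimeError as error:
            return "fail() raised: %s" % error
    return store.status
```

In this model, marking the job as failed only succeeds when the broken transaction is rolled back first.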

review: Needs Fixing
Abel Deuring (adeuring) wrote :

The problem you mentioned is fixed in lazr.jobrunner (including tests).

review: Needs Resubmitting
Aaron Bentley (abentley) :
review: Approve

Preview Diff

=== modified file 'lib/lp/code/scripts/tests/test_create_merge_proposals.py'
--- lib/lp/code/scripts/tests/test_create_merge_proposals.py 2012-01-01 02:58:52 +0000
+++ lib/lp/code/scripts/tests/test_create_merge_proposals.py 2012-03-28 11:45:27 +0000
@@ -36,11 +36,11 @@
         retcode, stdout, stderr = run_script(
             'cronscripts/create_merge_proposals.py', [])
         self.assertEqual(0, retcode)
-        self.assertEqual(
+        self.assertTextMatchesExpressionIgnoreWhitespace(
             "INFO Creating lockfile: "
             "/var/lock/launchpad-create_merge_proposals.lock\n"
-            "INFO "
-            "Running CreateMergeProposalJob (ID %d) in status Waiting\n"
+            "INFO Running <.*CreateMergeProposalJob object at .*?> "
+            "\(ID %s\) in status Waiting\n"
             "INFO Ran 1 CreateMergeProposalJobs.\n" % job.job.id, stderr)
         self.assertEqual('', stdout)
         self.assertEqual(1, source.landing_targets.count())
=== modified file 'lib/lp/code/scripts/tests/test_merge_proposal_jobs.py'
--- lib/lp/code/scripts/tests/test_merge_proposal_jobs.py 2012-01-01 02:58:52 +0000
+++ lib/lp/code/scripts/tests/test_merge_proposal_jobs.py 2012-03-28 11:45:27 +0000
@@ -40,7 +40,8 @@
             '\tworkers: 0\n'
             'INFO \tworkers: 0\n'
             '(.|\n)*'
-            'INFO Running GenerateIncrementalDiffJob \(ID %d\).\n'
+            'INFO Running '
+            '<GENERATE_INCREMENTAL_DIFF job for merge .*?> \(ID %d\).\n'
             '(.|\n)*'
             'INFO STOPPING: \'\'\n'
             'Main loop terminated.\n'
=== modified file 'lib/lp/code/scripts/tests/test_reclaim_branch_space.py'
--- lib/lp/code/scripts/tests/test_reclaim_branch_space.py 2012-01-01 02:58:52 +0000
+++ lib/lp/code/scripts/tests/test_reclaim_branch_space.py 2012-03-28 11:45:27 +0000
@@ -46,7 +46,8 @@
             'cronscripts/reclaimbranchspace.py', [])
         self.assertEqual('', stdout)
         self.assertEqual(
-            'INFO Creating lockfile: /var/lock/launchpad-reclaimbranchspace.lock\n'
+            'INFO '
+            'Creating lockfile: /var/lock/launchpad-reclaimbranchspace.lock\n'
             'INFO Reclaimed space for 0 branches.\n', stderr)
         self.assertEqual(0, retcode)
         self.assertTrue(
@@ -63,16 +64,17 @@
         retcode, stdout, stderr = run_script(
             'cronscripts/reclaimbranchspace.py', [])
         self.assertEqual('', stdout)
-        self.assertEqual(
-            'INFO Creating lockfile: /var/lock/launchpad-reclaimbranchspace.lock\n'
-            'INFO Running ReclaimBranchSpaceJob (ID %d) in status Waiting\n'
+        self.assertTextMatchesExpressionIgnoreWhitespace(
+            'INFO '
+            'Creating lockfile: /var/lock/launchpad-reclaimbranchspace.lock\n'
+            'INFO Running <RECLAIM_BRANCH_SPACE branch job \(\d+\) for '
+            '\d+> \(ID %s\) in status Waiting\n'
             'INFO Reclaimed space for 1 branches.\n' % reclaim_job.job.id,
             stderr)
         self.assertEqual(0, retcode)
         self.assertFalse(
             os.path.exists(mirrored_path))
 
-
     def test_reclaimbranchspace_script_logs_oops(self):
         # If the job fails, an oops is logged.
         db_branch = self.factory.makeAnyBranch()
=== modified file 'lib/lp/code/scripts/tests/test_sendbranchmail.py'
--- lib/lp/code/scripts/tests/test_sendbranchmail.py 2012-01-01 02:58:52 +0000
+++ lib/lp/code/scripts/tests/test_sendbranchmail.py 2012-03-28 11:45:27 +0000
@@ -50,9 +50,11 @@
         transaction.commit()
         retcode, stdout, stderr = run_script(
             'cronscripts/sendbranchmail.py', [])
-        self.assertEqual(
-            'INFO Creating lockfile: /var/lock/launchpad-sendbranchmail.lock\n'
-            'INFO Running RevisionMailJob (ID %d) in status Waiting\n'
+        self.assertTextMatchesExpressionIgnoreWhitespace(
+            'INFO '
+            'Creating lockfile: /var/lock/launchpad-sendbranchmail.lock\n'
+            'INFO Running <REVISION_MAIL branch job \(\d+\) for .*?> '
+            '\(ID %d\) in status Waiting\n'
             'INFO Ran 1 RevisionMailJobs.\n' % mail_job.job.id, stderr)
         self.assertEqual('', stdout)
         self.assertEqual(0, retcode)
@@ -67,7 +69,8 @@
         retcode, stdout, stderr = run_script(
             'cronscripts/sendbranchmail.py', [])
         self.assertIn(
-            'INFO Creating lockfile: /var/lock/launchpad-sendbranchmail.lock\n',
+            'INFO '
+            'Creating lockfile: /var/lock/launchpad-sendbranchmail.lock\n',
             stderr)
         self.assertIn('INFO Job resulted in OOPS:', stderr)
         self.assertIn('INFO Ran 0 RevisionMailJobs.\n', stderr)
@@ -88,9 +91,11 @@
         transaction.commit()
         retcode, stdout, stderr = run_script(
             'cronscripts/sendbranchmail.py', [])
-        self.assertEqual(
-            'INFO Creating lockfile: /var/lock/launchpad-sendbranchmail.lock\n'
-            'INFO Running RevisionsAddedJob (ID %d) in status Waiting\n'
+        self.assertTextMatchesExpressionIgnoreWhitespace(
+            'INFO '
+            'Creating lockfile: /var/lock/launchpad-sendbranchmail.lock\n'
+            'INFO Running <REVISIONS_ADDED_MAIL branch job \(\d+\) '
+            'for .*?> \(ID %d\) in status Waiting\n'
             'INFO Ran 1 RevisionMailJobs.\n' % job.job.id,
             stderr)
         self.assertEqual('', stdout)
=== modified file 'lib/lp/registry/tests/test_process_job_sources_cronjob.py'
--- lib/lp/registry/tests/test_process_job_sources_cronjob.py 2012-01-01 02:58:52 +0000
+++ lib/lp/registry/tests/test_process_job_sources_cronjob.py 2012-03-28 11:45:27 +0000
@@ -62,7 +62,7 @@
         returncode, output, error = run_script(
             self.script, ['-v', 'IMembershipNotificationJobSource'])
         self.assertIn(
-            ('DEBUG Running <MembershipNotificationJob '
+            ('INFO Running <MembershipNotificationJob '
              'about ~murdock in ~a-team; status=Waiting>'),
             error)
         self.assertIn('DEBUG MembershipNotificationJob sent email', error)
@@ -126,7 +126,7 @@
         returncode, output, error = run_script(
             self.script, ['-v', '--wait', 'MAIN'])
         self.assertTextMatchesExpressionIgnoreWhitespace(
-            ('DEBUG Running <MembershipNotificationJob '
+            ('INFO Running <MembershipNotificationJob '
              'about ~murdock in ~a-team; status=Waiting>'),
             error)
         self.assertIn('DEBUG MembershipNotificationJob sent email', error)
=== modified file 'lib/lp/services/job/interfaces/job.py'
--- lib/lp/services/job/interfaces/job.py 2011-12-24 16:54:44 +0000
+++ lib/lp/services/job/interfaces/job.py 2012-03-28 11:45:27 +0000
@@ -14,7 +14,6 @@
     'ITwistedJobSource',
     'JobStatus',
     'LeaseHeld',
-    'SuspendJobException',
     ]
 
 
@@ -39,11 +38,6 @@
 from lp.registry.interfaces.person import IPerson
 
 
-class SuspendJobException(Exception):
-    """Raised when a running job wants to suspend itself."""
-    pass
-
-
 class LeaseHeld(Exception):
     """Raised when attempting to acquire a list that is already held."""
 
@@ -88,6 +82,9 @@
 class IJob(Interface):
     """Basic attributes of a job."""
 
+    job_id = Int(title=_(
+        'A unique identifier for this job.'))
+
     scheduled_start = Datetime(
         title=_('Time when the IJob was scheduled to start.'))
 
=== modified file 'lib/lp/services/job/model/job.py'
--- lib/lp/services/job/model/job.py 2012-02-24 03:47:44 +0000
+++ lib/lp/services/job/model/job.py 2012-03-28 11:45:27 +0000
@@ -25,6 +25,7 @@
     Int,
     Reference,
     )
+import transaction
 from zope.interface import implements
 
 from lp.services.database import bulk
@@ -56,6 +57,10 @@
 
     implements(IJob)
 
+    @property
+    def job_id(self):
+        return self.id
+
     scheduled_start = UtcDateTimeCol()
 
     date_created = UtcDateTimeCol()
@@ -144,31 +149,49 @@
         expiry = timegm(self.lease_expires.timetuple())
         return max(0, expiry - time.time())
 
-    def start(self):
+    def start(self, manage_transaction=False):
         """See `IJob`."""
         self._set_status(JobStatus.RUNNING)
         self.date_started = datetime.datetime.now(UTC)
         self.date_finished = None
         self.attempt_count += 1
+        if manage_transaction:
+            transaction.commit()
 
-    def complete(self):
+    def complete(self, manage_transaction=False):
         """See `IJob`."""
+        # Commit the transaction to update the DB time.
+        if manage_transaction:
+            transaction.commit()
         self._set_status(JobStatus.COMPLETED)
         self.date_finished = datetime.datetime.now(UTC)
+        if manage_transaction:
+            transaction.commit()
 
-    def fail(self):
+    def fail(self, manage_transaction=False):
         """See `IJob`."""
+        if manage_transaction:
+            transaction.abort()
         self._set_status(JobStatus.FAILED)
         self.date_finished = datetime.datetime.now(UTC)
+        if manage_transaction:
+            transaction.commit()
 
-    def queue(self):
+    def queue(self, manage_transaction=False):
         """See `IJob`."""
+        # Commit the transaction to update the DB time.
+        if manage_transaction:
+            transaction.commit()
         self._set_status(JobStatus.WAITING)
         self.date_finished = datetime.datetime.now(UTC)
+        if manage_transaction:
+            transaction.commit()
 
-    def suspend(self):
+    def suspend(self, manage_transaction=False):
         """See `IJob`."""
         self._set_status(JobStatus.SUSPENDED)
+        if manage_transaction:
+            transaction.commit()
 
     def resume(self):
         """See `IJob`."""
=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py 2012-01-01 02:58:52 +0000
+++ lib/lp/services/job/runner.py 2012-03-28 11:45:27 +0000
@@ -38,6 +38,7 @@
     pool,
     )
 from lazr.delegates import delegates
+from lazr.jobrunner.jobrunner import JobRunner as LazrJobRunner
 import transaction
 from twisted.internet import reactor
 from twisted.internet.defer import (
@@ -61,7 +62,6 @@
     IJob,
     IRunnableJob,
     LeaseHeld,
-    SuspendJobException,
     )
 from lp.services.mail.sendmail import (
     MailController,
@@ -176,20 +176,24 @@
             return
         ctrl.send()
 
-
-class BaseJobRunner(object):
+    def makeOopsReport(self, oops_config, info):
+        """Generate an OOPS report using the given OOPS configuration."""
+        return oops_config.create(
+            context=dict(exc_info=info))
+
+
+class BaseJobRunner(LazrJobRunner):
     """Runner of Jobs."""
 
     def __init__(self, logger=None, error_utility=None):
-        self.completed_jobs = []
-        self.incomplete_jobs = []
-        if logger is None:
-            logger = logging.getLogger()
-        self.logger = logger
-        self.error_utility = error_utility
         self.oops_ids = []
-        if self.error_utility is None:
+        if error_utility is None:
             self.error_utility = errorlog.globalErrorUtility
+        else:
+            self.error_utility = error_utility
+        super(BaseJobRunner, self).__init__(
+            logger, oops_config=self.error_utility._oops_config,
+            oopsMessage=self.error_utility.oopsMessage)
 
     def acquireLease(self, job):
         self.logger.debug(
@@ -204,83 +208,8 @@
             return False
         return True
 
-    @staticmethod
-    def job_str(job):
-        class_name = job.__class__.__name__
-        ijob_id = removeSecurityProxy(job).job.id
-        return '%s (ID %d)' % (class_name, ijob_id)
-
     def runJob(self, job):
-        """Attempt to run a job, updating its status as appropriate."""
-        job = IRunnableJob(job)
-
-        self.logger.info(
-            'Running %s in status %s' % (
-                self.job_str(job), job.status.title))
-        job.start()
-        transaction.commit()
-        do_retry = False
-        try:
-            try:
-                job.run()
-            except job.retry_error_types, e:
-                if job.attempt_count > job.max_retries:
-                    raise
-                self.logger.exception(
-                    "Scheduling retry due to %s.", e.__class__.__name__)
-                do_retry = True
-        except SuspendJobException:
-            self.logger.debug("Job suspended itself")
-            job.suspend()
-            self.incomplete_jobs.append(job)
-        except Exception:
-            transaction.abort()
-            job.fail()
-            # Record the failure.
-            transaction.commit()
-            self.incomplete_jobs.append(job)
-            raise
-        else:
-            # Commit transaction to update the DB time.
-            transaction.commit()
-            if do_retry:
-                job.queue()
-                self.incomplete_jobs.append(job)
-            else:
-                job.complete()
-                self.completed_jobs.append(job)
-        # Commit transaction to update job status.
-        transaction.commit()
-
-    def runJobHandleError(self, job):
-        """Run the specified job, handling errors.
-
-        Most errors will be logged as Oopses. Jobs in user_error_types won't.
-        The list of complete or incomplete jobs will be updated.
-        """
-        job = IRunnableJob(job)
-        with self.error_utility.oopsMessage(
-            dict(job.getOopsVars())):
-            try:
-                try:
-                    self.logger.debug('Running %r', job)
-                    self.runJob(job)
-                except job.user_error_types, e:
-                    self.logger.info('Job %r failed with user error %r.' %
-                        (job, e))
-                    job.notifyUserError(e)
-                except Exception:
-                    info = sys.exc_info()
-                    return self._doOops(job, info)
-            except Exception:
-                # This only happens if sending attempting to notify users
-                # about errors fails for some reason (like a misconfigured
-                # email server).
-                self.logger.exception(
-                    "Failed to notify users about a failure.")
-                info = sys.exc_info()
-                # Returning the oops says something went wrong.
-                return self.error_utility.raising(info)
+        super(BaseJobRunner, self).runJob(IRunnableJob(job))
 
     def _doOops(self, job, info):
         """Report an OOPS for the provided job and info.
@@ -291,6 +220,7 @@
         """
         oops = self.error_utility.raising(info)
         job.notifyOops(oops)
+        self._logOopsId(oops['id'])
         return oops
 
     def _logOopsId(self, oops_id):
@@ -331,9 +261,7 @@
                 continue
             # Commit transaction to clear the row lock.
             transaction.commit()
-            oops = self.runJobHandleError(job)
-            if oops is not None:
-                self._logOopsId(oops['id'])
+            self.runJobHandleError(job)
 
 
 class RunJobCommand(amp.Command):
=== modified file 'lib/lp/services/job/tests/test_runner.py'
--- lib/lp/services/job/tests/test_runner.py 2012-01-01 02:58:52 +0000
+++ lib/lp/services/job/tests/test_runner.py 2012-03-28 11:45:27 +0000
@@ -8,6 +8,7 @@
 from textwrap import dedent
 from time import sleep
 
+from lazr.jobrunner.jobrunner import SuspendJobException
 from testtools.matchers import MatchesRegex
 from testtools.testcase import ExpectedException
 import transaction
@@ -19,7 +20,6 @@
     IRunnableJob,
     JobStatus,
     LeaseHeld,
-    SuspendJobException,
     )
 from lp.services.job.model.job import Job
 from lp.services.job.runner import (
@@ -411,8 +411,8 @@
         self.job = Job()
 
     def __repr__(self):
-        return '<StuckJob(%r, lease_length=%s, delay=%s)>' % (
-            self.id, self.lease_length, self.delay)
+        return '<%s(%r, lease_length=%s, delay=%s)>' % (
+            self.__class__.__name__, self.id, self.lease_length, self.delay)
 
     def acquireLease(self):
         return self.job.acquireLease(self.lease_length)
@@ -550,15 +550,14 @@
             (1, 1), (len(runner.completed_jobs), len(runner.incomplete_jobs)))
         self.oops_capture.sync()
         oops = self.oopses[0]
-        self.assertEqual(
-            ('TimeoutError', 'Job ran too long.'),
-            (oops['type'], oops['value']))
+        expected_exception = ('TimeoutError', 'Job ran too long.')
+        self.assertEqual(expected_exception, (oops['type'], oops['value']))
         self.assertThat(logger.getLogBuffer(), MatchesRegex(
             dedent("""\
             INFO Running through Twisted.
-            INFO Running StuckJob \(ID .*\).
-            INFO Running StuckJob \(ID .*\).
+            INFO Running <StuckJob.*?> \(ID .*?\).
+            INFO Running <StuckJob.*?> \(ID .*?\).
             INFO Job resulted in OOPS: .*
             """)))
 
     def test_timeout_short(self):
@@ -582,8 +581,8 @@
             logger.getLogBuffer(), MatchesRegex(
                 dedent("""\
                 INFO Running through Twisted.
-                INFO Running ShorterStuckJob \(ID .*\).
-                INFO Running ShorterStuckJob \(ID .*\).
+                INFO Running <ShorterStuckJob.*?> \(ID .*?\).
+                INFO Running <ShorterStuckJob.*?> \(ID .*?\).
                 INFO Job resulted in OOPS: %s
                 """) % oops['id']))
         self.assertEqual(('TimeoutError', 'Job ran too long.'),
=== modified file 'lib/lp/soyuz/model/packagecopyjob.py'
--- lib/lp/soyuz/model/packagecopyjob.py 2012-02-24 03:47:44 +0000
+++ lib/lp/soyuz/model/packagecopyjob.py 2012-03-28 11:45:27 +0000
@@ -11,6 +11,7 @@
 import logging
 
 from lazr.delegates import delegates
+from lazr.jobrunner.jobrunner import SuspendJobException
 from storm.locals import (
     And,
     Int,
@@ -49,7 +50,6 @@
 from lp.services.database.stormbase import StormBase
 from lp.services.job.interfaces.job import (
     JobStatus,
-    SuspendJobException,
     )
 from lp.services.job.model.job import Job
 from lp.services.job.runner import BaseRunnableJob
=== modified file 'lib/lp/soyuz/tests/test_packagecopyjob.py'
--- lib/lp/soyuz/tests/test_packagecopyjob.py 2012-02-01 11:31:20 +0000
+++ lib/lp/soyuz/tests/test_packagecopyjob.py 2012-03-28 11:45:27 +0000
@@ -6,6 +6,7 @@
 import operator
 from textwrap import dedent
 
+from lazr.jobrunner.jobrunner import SuspendJobException
 from storm.store import Store
 from testtools.content import text_content
 from testtools.matchers import MatchesStructure
@@ -25,7 +26,6 @@
 from lp.services.features.testing import FeatureFixture
 from lp.services.job.interfaces.job import (
     JobStatus,
-    SuspendJobException,
     )
 from lp.services.webapp.testing import verifyObject
 from lp.soyuz.adapters.overrides import SourceOverride
=== modified file 'lib/lp/testing/factory.py'
--- lib/lp/testing/factory.py 2012-03-26 14:23:39 +0000
+++ lib/lp/testing/factory.py 2012-03-28 11:45:27 +0000
@@ -48,6 +48,7 @@
 from bzrlib.merge_directive import MergeDirective2
 from bzrlib.plugins.builder.recipe import BaseRecipeBranch
 from bzrlib.revision import Revision as BzrRevision
+from lazr.jobrunner.jobrunner import SuspendJobException
 import pytz
 from pytz import UTC
 import simplejson
@@ -233,7 +234,6 @@
     IEmailAddressSet,
     )
 from lp.services.identity.model.account import Account
-from lp.services.job.interfaces.job import SuspendJobException
 from lp.services.librarian.interfaces import ILibraryFileAliasSet
 from lp.services.mail.signedmessage import SignedMessage
 from lp.services.messages.model.message import (
=== modified file 'lib/lp/translations/scripts/tests/test_packaging_translations.py'
--- lib/lp/translations/scripts/tests/test_packaging_translations.py 2012-01-01 02:58:52 +0000
+++ lib/lp/translations/scripts/tests/test_packaging_translations.py 2012-03-28 11:45:27 +0000
@@ -33,10 +33,10 @@
  matcher = MatchesRegex(dedent("""\
  INFO Creating lockfile: /var/lock/launchpad-jobcronscript.lock
  INFO Running synchronously.
- INFO Running TranslationMergeJob \(ID .*\) in status Waiting
+ INFO Running <.*?TranslationMergeJob.*?> \(ID .*\) in status Waiting
  INFO Merging .* and .* in Ubuntu Distroseries.*
  INFO Deleted POTMsgSets: 1. TranslationMessages: 1.
- INFO Running TranslationSplitJob \(ID .*\) in status Waiting
+ INFO Running <.*?TranslationSplitJob.*?> \(ID .*\) in status Waiting
  INFO Splitting .* and .* in Ubuntu Distroseries.*
  INFO 1 entries split.
  INFO Ran 1 TranslationMergeJob jobs.
=== modified file 'setup.py'
--- setup.py 2012-03-09 04:33:37 +0000
+++ setup.py 2012-03-28 11:45:27 +0000
@@ -47,6 +47,7 @@
  'lazr.enum',
  'lazr.lifecycle',
  'lazr.restful',
+ 'lazr.jobrunner',
  'lazr.smtptest',
  'lazr.testing',
  'lazr.uri',
=== modified file 'versions.cfg'
--- versions.cfg 2012-03-26 22:45:24 +0000
+++ versions.cfg 2012-03-28 11:45:27 +0000
@@ -5,11 +5,13 @@
 # Alphabetical, case-insensitive, please! :-)
 
 ampoule = 0.2.0
-amqplib = 0.6.1
+amqplib = 1.0.2
+anyjson = 0.3.1
 argparse = 1.2.1
 BeautifulSoup = 3.1.0.1
 bson = 0.3.2
 bzr = 2.5.0dev2-r6152
+celery = 2.5.1
 Chameleon = 2.4.0
 ClientForm = 0.2.10
 cssutils = 0.9.7
@@ -26,11 +28,13 @@
 grokcore.component = 1.6
 html5browser = 0.0.9
 httplib2 = 0.6.0
+importlib = 1.0.2
 ipython = 0.9.1
 iso8601 = 0.1.4
 jsautobuild = 0.2
 Jinja2 = 2.2
 keyring = 0.6.2
+kombu = 2.1.1
 launchpadlib = 1.9.12
 lazr.amqp = 0.1
 lazr.authentication = 0.1.1
@@ -38,6 +42,7 @@
 lazr.config = 1.1.3
 lazr.delegates = 1.2.0
 lazr.enum = 1.1.3
+lazr.jobrunner = 0.2
 lazr.lifecycle = 1.1
 lazr.restful = 0.19.6
 lazr.restfulclient = 0.12.0
@@ -78,6 +83,7 @@
 pymongo = 2.1.1
 pyOpenSSL = 0.10
 pystache = 0.3.1
+python-dateutil = 1.5
 python-memcached = 1.48
 # 2.2.1 with the one-liner Expect: 100-continue fix from
 # lp:~wgrant/python-openid/python-openid-2.2.1-fix676372.