Merge ~cjwatson/launchpad:extract-job-state-flush into launchpad:master

Proposed by Colin Watson
Status: Merged
Approved by: Colin Watson
Approved revision: 3e0bb9e55d00407d2dd1498ed1637f077ad5614f
Merge reported by: Otto Co-Pilot
Merged at revision: not available
Proposed branch: ~cjwatson/launchpad:extract-job-state-flush
Merge into: launchpad:master
Diff against target: 116 lines (+24/-18)
2 files modified
lib/lp/services/job/runner.py (+6/-0)
lib/lp/services/job/tests/test_celery.py (+18/-18)
Reviewer | Review Type | Date Requested | Status
Ines Almeida | | | Approve
Review via email: mp+449126@code.launchpad.net

Commit message

Flush store before extracting job state

Description of the change

Since converting `Job` to Storm in c69d6205ab, the store is no longer automatically flushed when the `id` column is fetched, so we have to be a little more careful to ensure that it's flushed at appropriate times. Some Celery tasks are currently showing up with their job IDs as None, indicating that this isn't always being done properly. This appears to be because the before-commit hook that extracts the job state runs before the Storm hook that flushes the store at the start of a commit. As a result, if the statement that created the `Job` row hasn't already been flushed for some other reason, the job ID is not yet known when the job state is extracted.

The test suite evaded this problem because Celery-based job tests typically use `FeatureFixture`, which causes `BaseRunnableJob.celeryRunOnCommit` to make another database query after creating the job, thus causing an implicit flush.

Flushing the store before extracting the job state should be a complete fix for this class of problem.

To post a comment you must log in.
Revision history for this message
Ines Almeida (ines-almeida) wrote :

Makes sense

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1diff --git a/lib/lp/services/job/runner.py b/lib/lp/services/job/runner.py
2index 9b8f1cf..f66d5eb 100644
3--- a/lib/lp/services/job/runner.py
4+++ b/lib/lp/services/job/runner.py
5@@ -41,6 +41,7 @@ from zope.security.proxy import removeSecurityProxy
6
7 from lp.services import scripts
8 from lp.services.config import config, dbconfig
9+from lp.services.database.interfaces import IStore
10 from lp.services.database.policy import DatabaseBlockedPolicy
11 from lp.services.features import getFeatureFlag
12 from lp.services.job.interfaces.job import IJob, IRunnableJob
13@@ -266,6 +267,11 @@ class BaseRunnableJob(BaseRunnableJobSource):
14
15 def extractJobState(self):
16 """Hook function to call before starting a commit."""
17+ # Before-commit hooks are called before the hook in
18+ # storm.zope.zstorm.StoreDataManager.tpc_begin that flushes the
19+ # store, so we have to flush the store here because we might
20+ # otherwise not know the job ID yet.
21+ IStore(self.job).flush()
22 self.job_state = JobState(self)
23
24 def celeryCommitHook(self, succeeded):
25diff --git a/lib/lp/services/job/tests/test_celery.py b/lib/lp/services/job/tests/test_celery.py
26index 70bf2b6..970f9f5 100644
27--- a/lib/lp/services/job/tests/test_celery.py
28+++ b/lib/lp/services/job/tests/test_celery.py
29@@ -26,7 +26,7 @@ from lp.services.database.interfaces import IStore
30 from lp.services.features.testing import FeatureFixture
31 from lp.services.job.interfaces.job import IJob, IRunnableJob, JobStatus
32 from lp.services.job.model.job import Job
33-from lp.services.job.runner import BaseRunnableJob
34+from lp.services.job.runner import BaseRunnableJob, celery_enabled
35 from lp.services.job.tests import block_on_job, monitor_celery
36 from lp.testing import TestCaseWithFactory
37 from lp.testing.layers import CeleryJobLayer
38@@ -45,7 +45,6 @@ class TestJob(BaseRunnableJob):
39 self.job = store.find(Job, id=job_id)[0]
40 else:
41 self.job = Job(max_retries=2, scheduled_start=scheduled_start)
42- IStore(Job).flush()
43
44 def run(self):
45 pass
46@@ -100,16 +99,25 @@ class TestJobsViaCelery(TestCaseWithFactory):
47
48 layer = CeleryJobLayer
49
50- def test_TestJob(self):
51- # TestJob can be run via Celery.
52+ def enableCeleryClass(self, job_class_name):
53+ """Enable running jobs with a given class name via Celery."""
54 self.useFixture(
55- FeatureFixture({"jobs.celery.enabled_classes": "TestJob"})
56+ FeatureFixture({"jobs.celery.enabled_classes": job_class_name})
57 )
58+ # Prime the feature flag cache so that
59+ # BaseRunnableJob.celeryRunOnCommit doesn't make a database query.
60+ # This lets us more carefully test the flush behaviour in
61+ # BaseRunnableJob.extractJobState.
62+ self.assertTrue(celery_enabled(job_class_name))
63+
64+ def test_TestJob(self):
65+ # TestJob can be run via Celery.
66+ self.enableCeleryClass("TestJob")
67 with block_on_job(self):
68 job = TestJob()
69 job.celeryRunOnCommit()
70- job_id = job.job_id
71 transaction.commit()
72+ job_id = job.job_id
73 store = IStore(Job)
74 dbjob = store.find(Job, id=job_id)[0]
75 self.assertEqual(JobStatus.COMPLETED, dbjob.status)
76@@ -119,9 +127,7 @@ class TestJobsViaCelery(TestCaseWithFactory):
77 # in 10 seconds, and one at any time. Wait up to a minute and
78 # ensure that the correct three have completed, and that they
79 # completed in the expected order.
80- self.useFixture(
81- FeatureFixture({"jobs.celery.enabled_classes": "TestJob"})
82- )
83+ self.enableCeleryClass("TestJob")
84 now = datetime.now(timezone.utc)
85 job_past = TestJob(scheduled_start=now - timedelta(seconds=60))
86 job_past.celeryRunOnCommit()
87@@ -158,11 +164,7 @@ class TestJobsViaCelery(TestCaseWithFactory):
88 def test_jobs_with_retry_exceptions_are_queued_again(self):
89 # A job that raises a retry error is automatically queued
90 # and executed again.
91- self.useFixture(
92- FeatureFixture(
93- {"jobs.celery.enabled_classes": "TestJobWithRetryError"}
94- )
95- )
96+ self.enableCeleryClass("TestJobWithRetryError")
97
98 # Set scheduled_start on the job to ensure that retry delays
99 # override it.
100@@ -214,14 +216,12 @@ class TestJobsViaCelery(TestCaseWithFactory):
101 def test_without_rabbitmq(self):
102 # If no RabbitMQ broker is configured, the job is not run via Celery.
103 self.pushConfig("rabbitmq", broker_urls="none")
104- self.useFixture(
105- FeatureFixture({"jobs.celery.enabled_classes": "TestJob"})
106- )
107+ self.enableCeleryClass("TestJob")
108 with monitor_celery() as responses:
109 job = TestJob()
110 job.celeryRunOnCommit()
111- job_id = job.job_id
112 transaction.commit()
113+ job_id = job.job_id
114 self.assertEqual([], responses)
115 store = IStore(Job)
116 dbjob = store.find(Job, id=job_id)[0]

Subscribers

People subscribed via source and target branches

to status/vote changes: