Merge lp:~abentley/launchpad/re-roll-r15692 into lp:launchpad

Proposed by Aaron Bentley
Status: Merged
Approved by: Aaron Bentley
Approved revision: no longer in the source branch.
Merged at revision: 15704
Proposed branch: lp:~abentley/launchpad/re-roll-r15692
Merge into: lp:launchpad
Prerequisite: lp:~abentley/launchpad/enable-tests
Diff against target: 86 lines (+23/-21)
1 file modified
lib/lp/services/job/tests/test_celeryjob.py (+23/-21)
To merge this branch: bzr merge lp:~abentley/launchpad/re-roll-r15692
Reviewer | Review Type | Date Requested | Status
Abel Deuring (community) | code | | Approve
Review via email: mp+117070@code.launchpad.net

Commit message

Fix waiting for queue length.

Description of the change

= Summary =
Fix spuriously-failing test.

== Proposed fix ==
Change != to ==.

== Pre-implementation notes ==
None

== LOC Rationale ==
I have a LOC credit of 1928

== Implementation details ==
wait_for_queue's logic was inverted. It was waiting when it did not need to, and not waiting when it needed to. But since it didn't indicate when waiting had timed out, this was not apparent in local testing.

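This branch fixes the logic and also makes the helper fail the test when waiting times out, so that similar errors cannot go unnoticed in future. Because it now asserts rather than merely waits, it is renamed to assertQueueSize and becomes a method of the test case.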

== Tests ==
bin/test test_celeryjob

== Demo and Q/A ==
None

= Launchpad lint =

Checking for conflicts and issues in changed files.

Linting changed files:
  lib/lp/services/job/tests/test_celeryjob.py

To post a comment you must log in.
Revision history for this message
Abel Deuring (adeuring) wrote :

thanks for spotting the "inverted logic"

review: Approve (code)

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'lib/lp/services/job/tests/test_celeryjob.py'
--- lib/lp/services/job/tests/test_celeryjob.py 2012-07-27 14:38:25 +0000
+++ lib/lp/services/job/tests/test_celeryjob.py 2012-07-27 14:38:25 +0000
@@ -38,16 +38,34 @@
         self.addCleanup(drain_celery_queues)
         return job
 
+    def assertQueueSize(self, app, queues, expected_len):
+        """Assert the message queue (eventually) reaches the specified size.
+
+        This can be used to avoid race conditions with RabbitMQ's message
+        delivery.
+        """
+        from lazr.jobrunner.celerytask import list_queued
+        for x in range(100):
+            actual_len = len(list_queued(app, queues))
+            if actual_len == expected_len:
+                return
+            sleep(0.1)
+        self.fail('Queue size did not reach %d; still at %d' %
+                  (expected_len, actual_len))
+
     def test_find_missing_ready(self):
         """A job which is ready but not queued is "missing"."""
         job = self.createMissingJob()
-        wait_for_queue(self.CeleryRunJob.app, [BranchScanJob.task_queue], 0)
+        self.assertQueueSize(self.CeleryRunJob.app,
+                             [BranchScanJob.task_queue], 0)
         self.assertEqual([job], self.find_missing_ready(BranchScanJob))
         job.runViaCelery()
-        wait_for_queue(self.CeleryRunJob.app, [BranchScanJob.task_queue], 1)
+        self.assertQueueSize(self.CeleryRunJob.app,
+                             [BranchScanJob.task_queue], 1)
         self.assertEqual([], self.find_missing_ready(BranchScanJob))
         drain_celery_queues()
-        wait_for_queue(self.CeleryRunJob.app, [BranchScanJob.task_queue], 0)
+        self.assertQueueSize(self.CeleryRunJob.app,
+                             [BranchScanJob.task_queue], 0)
         self.assertEqual([job], self.find_missing_ready(BranchScanJob))
 
     def test_run_missing_ready_not_enabled(self):
@@ -73,7 +91,6 @@
     def test_run_missing_ready_does_not_return_results(self):
         """The celerybeat task run_missing_ready does not create a
         result queue."""
-        from lazr.jobrunner.celerytask import list_queued
         from lp.services.job.tests.celery_helpers import noop
         job_queue_name = 'celerybeat'
         request = self.RunMissingReady().apply_async(
@@ -85,9 +102,7 @@
         # This would also happen when "with celeryd()" would do nothing.
         # So let's be sure that a task is queued...
         # Give the system some time to deliver the message
-        wait_for_queue(self.RunMissingReady.app, [job_queue_name], 1)
-        self.assertEqual(
-            1, len(list_queued(self.RunMissingReady.app, [job_queue_name])))
+        self.assertQueueSize(self.RunMissingReady.app, [job_queue_name], 1)
         # Wait at most 60 seconds for celeryd to start and process
         # the task.
         with celeryd(job_queue_name):
@@ -95,8 +110,7 @@
             # RunMissingReady has finished.
             noop.apply_async(queue=job_queue_name).wait(60)
         # But now the message has been consumed by celeryd.
-        self.assertEqual(
-            0, len(list_queued(self.RunMissingReady.app, [job_queue_name])))
+        self.assertQueueSize(self.RunMissingReady.app, [job_queue_name], 0)
         # No result queue was created for the task.
         try:
             real_stdout = sys.stdout
@@ -122,15 +136,3 @@
             "Unexpected output from clear_queues:\n"
             "stdout: %r\n"
             "stderr: %r" % (fake_stdout, fake_stderr))
-
-
-def wait_for_queue(app, queues, count):
-    """Wait until there are a specified number of items in the queue.
-
-    This can be used to avoid race conditions with RabbitMQ's message delivery.
-    """
-    from lazr.jobrunner.celerytask import list_queued
-    for x in range(100):
-        if len(list_queued(app, queues)) != count:
-            break
-        sleep(0.1)