Merge lp:~abentley/launchpad/re-roll-r15692 into lp:launchpad

Proposed by Aaron Bentley
Status: Merged
Approved by: Aaron Bentley
Approved revision: no longer in the source branch.
Merged at revision: 15704
Proposed branch: lp:~abentley/launchpad/re-roll-r15692
Merge into: lp:launchpad
Prerequisite: lp:~abentley/launchpad/enable-tests
Diff against target: 86 lines (+23/-21)
1 file modified
lib/lp/services/job/tests/test_celeryjob.py (+23/-21)
To merge this branch: bzr merge lp:~abentley/launchpad/re-roll-r15692
Reviewer Review Type Date Requested Status
Abel Deuring (community) code Approve
Review via email: mp+117070@code.launchpad.net

Commit message

Fix waiting for queue length.

Description of the change

= Summary =
Fix spuriously-failing test.

== Proposed fix ==
Change != to ==.

== Pre-implementation notes ==
None

== LOC Rationale ==
I have a LOC credit of 1928

== Implementation details ==
wait_for_queue's logic was inverted. It was waiting when it did not need to, and not waiting when it needed to. But since it didn't indicate when waiting had timed out, this was not apparent in local testing.

This branch fixes the logic, and also causes it to fail when waiting times out, to prevent similar future errors. So it becomes assertQueueSize.

== Tests ==
bin/test test_celeryjob

== Demo and Q/A ==
None

= Launchpad lint =

Checking for conflicts and issues in changed files.

Linting changed files:
  lib/lp/services/job/tests/test_celeryjob.py

To post a comment you must log in.
Revision history for this message
Abel Deuring (adeuring) wrote :

thanks for spotting the "inverted logic"

review: Approve (code)

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'lib/lp/services/job/tests/test_celeryjob.py'
2--- lib/lp/services/job/tests/test_celeryjob.py 2012-07-27 14:38:25 +0000
3+++ lib/lp/services/job/tests/test_celeryjob.py 2012-07-27 14:38:25 +0000
4@@ -38,16 +38,34 @@
5 self.addCleanup(drain_celery_queues)
6 return job
7
8+ def assertQueueSize(self, app, queues, expected_len):
9+ """Assert the message queue (eventually) reaches the specified size.
10+
11+ This can be used to avoid race conditions with RabbitMQ's message
12+ delivery.
13+ """
14+ from lazr.jobrunner.celerytask import list_queued
15+ for x in range(100):
16+ actual_len = len(list_queued(app, queues))
17+ if actual_len == expected_len:
18+ return
19+ sleep(0.1)
20+ self.fail('Queue size did not reach %d; still at %d' %
21+ (expected_len, actual_len))
22+
23 def test_find_missing_ready(self):
24 """A job which is ready but not queued is "missing"."""
25 job = self.createMissingJob()
26- wait_for_queue(self.CeleryRunJob.app, [BranchScanJob.task_queue], 0)
27+ self.assertQueueSize(self.CeleryRunJob.app,
28+ [BranchScanJob.task_queue], 0)
29 self.assertEqual([job], self.find_missing_ready(BranchScanJob))
30 job.runViaCelery()
31- wait_for_queue(self.CeleryRunJob.app, [BranchScanJob.task_queue], 1)
32+ self.assertQueueSize(self.CeleryRunJob.app,
33+ [BranchScanJob.task_queue], 1)
34 self.assertEqual([], self.find_missing_ready(BranchScanJob))
35 drain_celery_queues()
36- wait_for_queue(self.CeleryRunJob.app, [BranchScanJob.task_queue], 0)
37+ self.assertQueueSize(self.CeleryRunJob.app,
38+ [BranchScanJob.task_queue], 0)
39 self.assertEqual([job], self.find_missing_ready(BranchScanJob))
40
41 def test_run_missing_ready_not_enabled(self):
42@@ -73,7 +91,6 @@
43 def test_run_missing_ready_does_not_return_results(self):
44 """The celerybeat task run_missing_ready does not create a
45 result queue."""
46- from lazr.jobrunner.celerytask import list_queued
47 from lp.services.job.tests.celery_helpers import noop
48 job_queue_name = 'celerybeat'
49 request = self.RunMissingReady().apply_async(
50@@ -85,9 +102,7 @@
51 # This would also happen when "with celeryd()" would do nothing.
52 # So let's be sure that a task is queued...
53 # Give the system some time to deliver the message
54- wait_for_queue(self.RunMissingReady.app, [job_queue_name], 1)
55- self.assertEqual(
56- 1, len(list_queued(self.RunMissingReady.app, [job_queue_name])))
57+ self.assertQueueSize(self.RunMissingReady.app, [job_queue_name], 1)
58 # Wait at most 60 seconds for celeryd to start and process
59 # the task.
60 with celeryd(job_queue_name):
61@@ -95,8 +110,7 @@
62 # RunMissingReady has finished.
63 noop.apply_async(queue=job_queue_name).wait(60)
64 # But now the message has been consumed by celeryd.
65- self.assertEqual(
66- 0, len(list_queued(self.RunMissingReady.app, [job_queue_name])))
67+ self.assertQueueSize(self.RunMissingReady.app, [job_queue_name], 0)
68 # No result queue was created for the task.
69 try:
70 real_stdout = sys.stdout
71@@ -122,15 +136,3 @@
72 "Unexpected output from clear_queues:\n"
73 "stdout: %r\n"
74 "stderr: %r" % (fake_stdout, fake_stderr))
75-
76-
77-def wait_for_queue(app, queues, count):
78- """Wait until there are a specified number of items in the queue.
79-
80- This can be used to avoid race conditions with RabbitMQ's message delivery.
81- """
82- from lazr.jobrunner.celerytask import list_queued
83- for x in range(100):
84- if len(list_queued(app, queues)) != count:
85- break
86- sleep(0.1)