Merge lp:~vila/udd/795321-make-tea into lp:udd

Proposed by Vincent Ladeuil
Status: Merged
Merged at revision: 513
Proposed branch: lp:~vila/udd/795321-make-tea
Merge into: lp:udd
Diff against target: 587 lines (+397/-17)
7 files modified
mass_import.py (+107/-13)
selftest.py (+1/-0)
udd/circuit_breaker.dot (+23/-0)
udd/circuit_breaker.py (+109/-0)
udd/icommon.py (+25/-4)
udd/tests/test_circuit_breaker.py (+129/-0)
udd/threads.py (+3/-0)
To merge this branch: bzr merge lp:~vila/udd/795321-make-tea
Reviewer Review Type Date Requested Status
James Westby Approve
Review via email: mp+76058@code.launchpad.net

Description of the change

This implements a circuit breaker to track launchpad down times and avoid
the associated spurious failures.

Strictly speaking, a circuit breaker protects the usage of a resource by
tracking the attempts, successes and failures of these usages. When a
failure occurs, it blocks further attempts for some grace period and then
waits for a success to close the breaker again.

See the udd/circuit_breaker.py and its associated tests for this part.
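
To make the vocabulary concrete, here is a minimal usage sketch of that API
(the names match the diff below; do_lp_query and TransientLpFailure are
hypothetical placeholders for whatever actually uses launchpad):

  from udd import circuit_breaker

  cb = circuit_breaker.CircuitBreaker('launchpad', threshold=0, delay=300)
  # see_attempt() returns False while the breaker is open (grace period)
  if cb.see_attempt():
      try:
          do_lp_query()           # placeholder for the protected usage
          cb.see_success()        # closes the breaker again if it was half open
      except TransientLpFailure:  # placeholder for a known transient failure
          cb.see_failure()        # opens the breaker once past the threshold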

The pattern roughly applies to launchpad with some tweaks: launchpad is used
in many different ways and not all failure scenarios are currently known
(in the importer code base). Additionally, several imports run in
parallel, querying launchpad in different ways and producing successes and
failures that can be seen by the circuit breaker in unexpected orders.

I chose to implement a variant that relies on the existing known transient
failures (the ones recorded in the status database in the 'should_retry'
table) and simply pauses, without starting new imports, as soon as a
launchpad failure is detected. After the grace period, the *same* imports
are retried and should provide a good test that launchpad is back.

The net effect will be to *limit* the number of spurious failures.
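
Concretely, the wiring in mass_import.py boils down to the following
(a condensed sketch of the changes in the diff below, with logging and
error handling stripped):

  class ImportDriver(threads.ThreadDriver):

      def do_one_step(self):
          # Only start a new import if the breaker lets the attempt through.
          if self.circuit_breaker.see_attempt():
              super(ImportDriver, self).do_one_step()

      def collect_terminated_threads(self):
          collected = super(ImportDriver, self).collect_terminated_threads()
          for imp in collected:
              # A success closes the breaker, a known transient failure
              # opens it and requeues the same import with priority.
              self.circuit_breaker.see_import(imp)
          return collected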

The nice property of this table is that we can add more scenarios as we see
them appear with:

  requeue_package.py --auto <package>

This means we can complete our knowledge as we discover new failures.

I made only rough tests locally... but couldn't find an easy way to simulate
a local launchpad, let alone a transiently down one :-/

But given that I've witnessed a storm of transient failures last Friday and
one today, I expect more to come.

Note that I added the failure for today's storm (~600 failures) with:

  requeue --auto --all-of-type libgnomeui

(requeue is provided after sourcing fixit.sh and is really
requeue_package.py) but the package importer still has 165 outstanding jobs
as I write this (i.e. the time needed to recover from an lp downtime is far
longer than the downtime itself).

Note that since all these failures were for the same traceback, it's highly
likely that this is the first query (or one of the first) made against
launchpad during an import, so it *is* a good candidate for retrying imports.

I.e. even if all in-flight imports fail for an unknown reason (or one we
haven't correctly identified), the next one started will likely fail with a
*known* transient failure and stop the storm.

This seems to be a good first step to cope with the actual code base without
requiring a full analysis of all the failure modes :)

If this requires too much manual tweaking and leads to too many false
positives, we can focus on finding a better way to identify launchpad
transient failures, but the circuit breaker will still be there, should be
easy to change and, above all, should avoid the huge number of failures.

Revision history for this message
James Westby (james-w) wrote :

Hi Vincent,

Thanks for working on this, it looks great, and I'm looking forward
to seeing it in production!

77 + """Launcpad circuit breaker.

Missing "h" in Launchpad.

131 + # We want to retry asap ('priority')
132 + self.status_db.retry(package_name, priority=True)

Maybe try with the same priority as before?

I don't think it's that important though, given that there
will only be a few jobs in that state.

303 + self.state = 'open'

I prefer class "constants" for that sort of thing, so that
typos lead to obvious runtime errors, rather than errant
behaviour.

Thanks,

James

review: Approve
lp:~vila/udd/795321-make-tea updated
517. By Vincent Ladeuil

Address James' review comments and add a '.dot' file for the circuit breaker state automaton.

Revision history for this message
Vincent Ladeuil (vila) wrote :

>>>>> James Westby <email address hidden> writes:

    > Review: Approve
    > Hi Vincent,

    > Thanks for working on this,

Thanks for the speedy review !

    > it looks great, and I'm looking forward to seeing it in
    > production!

Me too :)

    > 77 + """Launcpad circuit breaker.

    > Missing "h" in Launchpad.

    > 131 + # We want to retry asap ('priority')
    > 132 + self.status_db.retry(package_name, priority=True)

    > Maybe try with the same priority as before?

    > I don't think it's that important though, given that there
    > will only be a few jobs in that state.

It's a delicate matter and I may change my mind, but the underlying idea
is that if an import is qualified as failing due to lp, it makes it the
ideal candidate to check that lp is back online.

As such, I want this import to be tried *before* any other import as the
others have yet to prove that they qualify as better candidates.

Also, one should keep in mind that *several* imports are always in flight
and all of them can open/close the breaker, so the LaunchpadCircuitBreaker
tries to cope with false positives by essentially retrying the imports that
enhance the S/N ratio: imports may succeed when lp is down and fail when
it's up, so whatever is in flight, we should retry the imports that help
decide whether lp is up or not.
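
As an illustration, here is the kind of event sequence the breaker is
designed to absorb (a sketch against the CircuitBreaker API in the diff,
using threshold=0 and delay=0.0 as in the tests so the transitions are
immediate):

  cb = circuit_breaker.CircuitBreaker('lp', threshold=0, delay=0.0)
  cb.see_failure()   # closed -> open: lp is seen down
  cb.see_success()   # open -> half_open: stray success, not trusted yet
  cb.see_failure()   # half_open -> open: lp is still down
  cb.see_attempt()   # open -> half_open, returns True: probe lp again
  cb.see_success()   # half_open -> closed: lp is back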

success when down:
==================

It's a bit unlikely but conceivable that an import manages to *not*
query lp during the down time. Fair enough (and one reason why I didn't
even try to stop the imports still running as soon as lp is seen down).

Such an unexpected success will wrongly close the breaker. It's a glitch,
but we cope with it because the breaker will open again at the next failure
(and we know we'll have failures if lp is down).

failure when up:
================

As far as the circuit breaker is concerned, what matters is whether the
failure is related to lp; if it is not, the LaunchpadCircuitBreaker will
ignore it.

This is really where failure qualification matters: if a failure is
related to lp but not marked as such, the circuit breaker is just
ineffective and we'll enter a failure "storm".

summary:
========

Finding the Right failures is the real key. Potentially, any lp query
(in the broadest sense) is Right; marking them all as such is HARD.

Fortunately, there is an easy way to find Right failures: every time we
see a failure storm, we know it's caused by the *first* lp query done by
an import (which is also the reason we get storms: being the first query,
it happens early, so the import fails quickly and *another* import is
queued, also failing quickly, etc). Therefore, it's sufficient to mark
this query as transient to make it a Right failure.

We will need to discover other Right failures (all over the place in the
import code path) but they are far less relevant because they won't
trigger during storms (i.e. when we retry an import that has failed this
way, the retry will fail on the *first* lp query, not because of this
other Right failure).

    > 303 + self.state = 'open'

    > I prefer class "constants" for that sort of thing, so that
    > ...

Revision history for this message
James Westby (james-w) wrote :

On Wed, 21 Sep 2011 12:59:52 -0000, Vincent Ladeuil <email address hidden> wrote:
> It's a delicate matter and I may change my mind, but the underlying idea
> is that if an import is qualified as failing due to lp, it makes it the
> ideal candidate to check that lp is back online.

I think that's a good point, and as long as the number of failures is
small it should work well.

I wouldn't want to see 2000 low priority jobs done before high priority
ones to test that LP is back and working, but given that this pattern
should limit that number it should work fine.

Thanks,

James

Revision history for this message
Vincent Ladeuil (vila) wrote :

>>>>> James Westby <email address hidden> writes:

    > On Wed, 21 Sep 2011 12:59:52 -0000, Vincent Ladeuil <email address hidden> wrote:
    >> It's a delicate matter and I may change my mind, but the underlying idea
    >> is that if an import is qualified as failing due to lp, it makes it the
    >> ideal candidate to check that lp is back online.

    > I think that's a good point, and as long as the number of failures is
    > small it should work well.

    > I wouldn't want to see 2000 low priority jobs done before high priority
    > ones to test that LP is back and working, but given that this pattern
    > should limit that number it should work fine.

Indeed, it's limited by the number of parallel imports and applied only
to the transient failures, so either it's effective and it applies to a
limited set, or it's not and it doesn't change any priority.

Preview Diff

1=== modified file 'mass_import.py'
2--- mass_import.py 2011-06-23 21:15:05 +0000
3+++ mass_import.py 2011-09-21 12:39:28 +0000
4@@ -15,10 +15,13 @@
5
6 from launchpadlib.errors import HTTPError
7
8-from udd import icommon
9-from udd import iconfig
10-from udd import paths
11-from udd import threads
12+from udd import (
13+ circuit_breaker,
14+ icommon,
15+ iconfig,
16+ paths,
17+ threads,
18+ )
19
20
21 class WatchedFileHandler(logging.FileHandler):
22@@ -130,6 +133,16 @@
23 class Importer(threads.SubprocessMonitor):
24
25 def __init__(self, args, package_name, job_id):
26+ """Spawn and monitor an import.
27+
28+ :param args: The script and args to be used to do the import (including
29+ the package name).
30+
31+ :param package_name: The name of the package to import (for display
32+ purposes).
33+
34+ :param job_id: The status db job id for this import.
35+ """
36 # FIXME: We should put the max_duration in a config file when it
37 # becomes available. Until then, during the wheezy import the longest
38 # import took more than 2 hours, so 24 is generous enough. On the other
39@@ -141,6 +154,8 @@
40 self.package_name = package_name
41 self.job_id = job_id
42 self.status_db = icommon.StatusDatabase(paths.sqlite_file)
43+ self.success = None
44+ self.failure_sig = None
45
46 def spawn(self):
47 logger.info("Trying %s" % self.package_name)
48@@ -155,8 +170,8 @@
49 output = self.out + self.err
50 unicode_output = output.decode("utf-8", "replace")
51 ascii_output = unicode_output.encode("ascii", "replace")
52- success = self.retcode == 0
53- if success:
54+ self.success = self.retcode == 0
55+ if self.success:
56 logger.info("Success %s: %s"
57 % (self.package_name,
58 ascii_output.replace("\n", " ")))
59@@ -164,8 +179,11 @@
60 logger.warning("Importing %s failed:\n%s" % (self.package_name,
61 ascii_output))
62 self.status_db.finish_job(
63- self.package_name, self.job_id, success,
64+ self.package_name, self.job_id, self.success,
65 unicode_output.encode("utf-8", "replace"))
66+ if not self.success:
67+ reason = self.status_db.failure_reason(self.package_name)
68+ self.failure_sig = self.status_db.failure_signature(reason)
69 logger.info("thread for %s finished" % self.package_name)
70
71
72@@ -220,6 +238,72 @@
73 logger.info("All packages requeued, start again")
74 self.tried = set()
75
76+# Constants for launchpad status
77+LP_UP = 1
78+LP_DOWN = 2
79+LP_COMING_BACK = 3
80+
81+
82+class LaunchpadCircuitBreaker(circuit_breaker.CircuitBreaker):
83+ """Launchpad circuit breaker.
84+
85+ Launchpad is down on a regular basis for deployments.
86+
87+ During the outage, many spurious failures may occur.
88+
89+ As soon as we see a failure known to be retried automatically, we open the
90+ breaker and immediately requeue the job.
91+
92+ We don't start new imports while the breaker is open (grace period for
93+ launchpad to recover, specified with 'delay').
94+
95+ We then start imports again and wait for a first success to close the
96+ breaker or a new failure to start again.
97+ """
98+
99+ def __init__(self, threshold, delay):
100+ super(LaunchpadCircuitBreaker, self).__init__('launchpad', threshold, delay)
101+ self.status_db = icommon.StatusDatabase(paths.sqlite_file)
102+ # For log purposes, not to be confused with the breaker state itself
103+ # which is authoritative.
104+ self.lp_state = LP_UP
105+
106+ def see_attempt(self):
107+ submitted = super(LaunchpadCircuitBreaker, self).see_attempt()
108+ if self.lp_state == LP_UP and not submitted:
109+ # First attempt after a failure, lp is down
110+ logger.info("Launchpad just went down")
111+ self.lp_state = LP_DOWN
112+ elif self.lp_state == LP_DOWN and submitted:
113+ # First attempt after the delay
114+ logger.info("Testing if Launchpad is back")
115+ self.lp_state = LP_COMING_BACK
116+ return submitted
117+
118+ def see_success(self):
119+ super(LaunchpadCircuitBreaker, self).see_success()
120+ if self.lp_state == LP_COMING_BACK:
121+ logger.info("Launchpad *is* back")
122+ self.lp_state = LP_UP
123+
124+ def see_import(self, imp):
125+ # Some imports succeeded, others failed. See if some failures were
126+ # transient (transient failures are most likely caused by lp
127+ # downtimes).
128+ package_name = imp.package_name
129+ if imp.success:
130+ self.see_success()
131+ else:
132+ # We can't blame launchpad downtimes for all failures, we rely on
133+ # the ones declared in RETRY_TABLE (created when
134+ # 'requeue_package.py --auto' is used)
135+ if self.status_db.known_auto_retry(imp.failure_sig) is not None:
136+ self.see_failure()
137+ # We want to retry asap ('priority')
138+ self.status_db.retry(package_name, priority=True)
139+ logger.info("Launchpad is down, re-trying %s"
140+ % (package_name,))
141+
142
143 class ImportDriver(threads.ThreadDriver):
144 """Monitor the ThreadedImporter.
145@@ -230,17 +314,22 @@
146
147 def __init__(self, max_threads):
148 super(ImportDriver, self).__init__(None, max_threads)
149+ # Launchpad downtimes should be less than 5 minutes (300 seconds).
150+ self.circuit_breaker = LaunchpadCircuitBreaker(0, 300)
151
152 def before_start(self):
153 super(ImportDriver, self).before_start()
154 self.queue = AllQueue()
155
156 def do_one_step(self):
157- try:
158- super(ImportDriver, self).do_one_step()
159- except Exception, e:
160- # The Driver must go on !
161- report_exception('Driver', sys.exc_info())
162+ # Don't start a new import while the breaker is open and waiting for
163+ # things to cool down.
164+ if self.circuit_breaker.see_attempt():
165+ try:
166+ super(ImportDriver, self).do_one_step()
167+ except Exception, e:
168+ # The Driver must go on !
169+ report_exception('Driver', sys.exc_info())
170
171 def before_stop(self):
172 try:
173@@ -256,12 +345,16 @@
174
175 def collect_terminated_threads(self):
176 before = len(self.threads)
177- super(ImportDriver, self).collect_terminated_threads()
178+ collected = super(ImportDriver, self).collect_terminated_threads()
179 after = len(self.threads)
180 if before != after:
181 # Only mention the running threads when we add or remove some
182 logger.info("threads for %r still active"
183 % [t.package_name for t in self.threads])
184+ for imp in collected:
185+ # Delegate to the circuit breaker to interpret the import
186+ self.circuit_breaker.see_import(imp)
187+ return collected
188
189 def check_time_quota(self):
190 killed = super(ImportDriver, self).check_time_quota()
191@@ -270,6 +363,7 @@
192 % (t.package_name,
193 datetime.timedelta(seconds=t.max_duration)))
194
195+
196 class Stop(Exception):
197 pass
198
199
200=== modified file 'selftest.py'
201--- selftest.py 2011-06-15 07:26:45 +0000
202+++ selftest.py 2011-09-21 12:39:28 +0000
203@@ -42,6 +42,7 @@
204 suite.addTests(self.testLoader.loadTestsFromModuleNames([
205 'udd.tests',
206 'udd.tests.test_config',
207+ 'udd.tests.test_circuit_breaker',
208 'udd.tests.test_import_list',
209 'udd.tests.test_import_package',
210 'udd.tests.test_package_to_import',
211
212=== added file 'udd/circuit_breaker.dot'
213--- udd/circuit_breaker.dot 1970-01-01 00:00:00 +0000
214+++ udd/circuit_breaker.dot 2011-09-21 12:39:28 +0000
215@@ -0,0 +1,23 @@
216+// Better enjoyed with:
217+// dot circuit_breaker.dot -T png -o circuit_breaker.png
218+digraph G
219+{
220+ node [shape=circle fixedsize=true width=1.5];
221+ // Closed
222+ closed [shape=doublecircle]; // Initial state
223+ closed -> open [label="failure"];
224+ closed:sw -> closed:nw [label="attempt"];
225+ closed:se -> closed:ne [label="success"];
226+ // Open
227+ open -> half_open [label="attempt"];
228+ open -> half_open [label="success", style=dashed];
229+ open:sw -> open:nw [label="failure"];
230+ // Half Open
231+ half_open -> closed [label="success"];
232+ half_open -> open [label="failure"];
233+ half_open:se -> half_open:ne [label="attempt"];
234+ // Align open and half_open
235+ {rank=same;
236+ open half_open
237+ }
238+}
239
240=== added file 'udd/circuit_breaker.py'
241--- udd/circuit_breaker.py 1970-01-01 00:00:00 +0000
242+++ udd/circuit_breaker.py 2011-09-21 12:39:28 +0000
243@@ -0,0 +1,109 @@
244+import time
245+
246+
247+# Constants for the circuit breaker states
248+CLOSED = 1
249+OPEN = 2
250+HALF_OPEN = 3
251+
252+
253+class CircuitBreaker(object):
254+
255+ def __init__(self, name, threshold=None, delay=None):
256+ """A circuit breaker opening when a failure threshold is reached.
257+
258+ It half opens again when an attempt is made after a delay.
259+ It then closes again when a success is seen.
260+
261+ :param threshold: When the number of failures exceeds it, the breaker
262+ is opened.
263+
264+ :param delay: Number of seconds before attempting to close (so-called
265+ half open) the breaker.
266+ """
267+ if threshold is None:
268+ # No threshold
269+ threshold = 0
270+ if delay is None:
271+ delay = 0.0
272+ self.name = name
273+ self.failures = 0
274+ self.threshold = threshold
275+ self.delay = delay # in secs
276+ self.state = None
277+ # Assume it works
278+ self._close()
279+
280+ def closed(self):
281+ """The circuit works if the breaker is closed."""
282+ return self.state == CLOSED
283+
284+ def see_failure(self):
285+ # One more failure
286+ self.failures += 1
287+ if self.state == CLOSED:
288+ self._open()
289+ elif self.state == OPEN:
290+ self._half_open()
291+ elif self.state == HALF_OPEN:
292+ # Fail again
293+ self._open()
294+ else:
295+ raise AssertionError('Unknown circuit breaker state: %s'
296+ % (self.state,))
297+
298+ def see_success(self):
299+ if self.state == CLOSED:
300+ # Everything works
301+ pass
302+ elif self.state == OPEN:
303+ # Weird, this is not expected in the normal workflow. We assume a
304+ # wrong ordering in the events (or too many of them) but move to
305+ # half_open first, waiting for a confirmation.
306+ self._half_open()
307+ elif self.state == HALF_OPEN:
308+ self._close()
309+ else:
310+ raise AssertionError('Unknown circuit breaker state: %s'
311+ % (self.state,))
312+
313+ def see_attempt(self):
314+ """An attempt to use the circuit is made.
315+
316+ :returns: False if the circuit is open, True otherwise. This represents
317+ whether or not the attempt tried to use the circuit.
318+ """
319+ if self.state == CLOSED:
320+ # Everything works.
321+ pass
322+ elif self.state == OPEN:
323+ self._half_open()
324+ elif self.state == HALF_OPEN:
325+ # We're trying. Trying more than once in parallel is allowed.
326+ pass
327+ else:
328+ raise AssertionError('Unknown circuit breaker state: %s'
329+ % (self.state,))
330+ # The attempt is successful if the breaker is not open
331+ return not (self.state == OPEN)
332+
333+ def _close(self):
334+ """The circuit works, close the breaker."""
335+ self.state = CLOSED
336+ self.failures = 0
337+
338+ def _open(self):
339+ """A failure happened, open the breaker when reaching the threshold."""
340+ if self.failures > self.threshold:
341+ # Enough failures, we have a problem
342+ self.state = OPEN
343+ self.opened_at = time.time()
344+
345+ def _half_open(self):
346+ """We're trying to close the breaker after a failure."""
347+ now = time.time()
348+ when = self.opened_at + self.delay
349+ if now >= when:
350+ # We waited long enough, let's try again.
351+ self.state = HALF_OPEN
352+
353
354=== modified file 'udd/icommon.py'
355--- udd/icommon.py 2011-08-17 08:30:51 +0000
356+++ udd/icommon.py 2011-09-21 12:39:28 +0000
357@@ -467,18 +467,23 @@
358
359 def _start_package(self, c, package):
360 ret = None
361+ # Search the active jobs for the given package ordered by job type
362+ # (priority, new, retry)
363 rows = c.execute(self.JOBS_TABLE_FIND, (package, 1)).fetchall()
364 now = datetime.datetime.utcnow()
365 first = now
366 job_id = 0
367 for row in rows:
368+ # Mark all the jobs inactive (and only the first as "started")
369 c.execute(self.JOBS_TABLE_UPDATE,
370 (row[1], 0, row[3], row[4], first, row[6], row[0]))
371 first = None
372 job_id = row[0]
373 if self._has_failed(c, package):
374+ # Don't start it if it has failed
375 job_id = None
376 else:
377+ # Mark the job as one that will be started RSN
378 c.execute('insert into %s values (?, ?, ?, 0)'
379 % self.FAILURES_TABLE,
380 (package, running_sentinel, now))
381@@ -651,6 +656,19 @@
382 sig = exc_type + sig
383 return sig
384
385+ def _known_auto_retry(self, c, sig):
386+ return c.execute(self.RETRY_TABLE_SELECT, (sig,)).fetchone()
387+
388+ def known_auto_retry(self, sig):
389+ c = self.conn.cursor()
390+ try:
391+ return self._known_auto_retry(c, sig)
392+ except:
393+ self.conn.rollback()
394+ raise
395+ finally:
396+ c.close()
397+
398 def retry(self, package, force=False, priority=False, auto=False,
399 all=False):
400 c = self.conn.cursor()
401@@ -669,7 +687,7 @@
402 sig = self.failure_signature(raw_reason)
403 self._retry(c, package, sig, row[2], priority=priority)
404 if auto and sig != None:
405- row = c.execute(self.RETRY_TABLE_SELECT, (sig,)).fetchone()
406+ row = self._known_auto_retry(c, sig)
407 if row is None:
408 c.execute(self.RETRY_TABLE_INSERT, (sig,))
409 if all and sig != None:
410@@ -715,19 +733,22 @@
411 self._add_job(c, package, job_type)
412
413 def _attempt_retry(self, c, info):
414- row = c.execute(self.RETRY_TABLE_SELECT, (info.signature,)).fetchone()
415+ row = self._known_auto_retry(c, info.signature)
416 if row is None:
417+ # A failure that is not auto-retried
418 return False
419 info.auto_retry = True
420- info.auto_retry_time = info.timestamp + datetime.timedelta(0,
421- self.AUTO_RETRY_SECONDS)
422+ info.auto_retry_time = (
423+ info.timestamp + datetime.timedelta(0, self.AUTO_RETRY_SECONDS))
424 if info.auto_retry_time > datetime.datetime.utcnow():
425+ # Too early to retry
426 return False
427 row = c.execute(self.OLD_FAILURES_TABLE_FIND, (info.name,)).fetchone()
428 if row is not None and row[1] == info.signature:
429 info.failure_count += row[4]
430 if row[4] > self.MAX_AUTO_RETRY_COUNT:
431 info.auto_retry_masked = True
432+ # Already retried MAX_AUTO_RETRY_COUNT times, stop retrying
433 return False
434 self._retry(c, info.name, info.signature, info.timestamp)
435
436
437=== added file 'udd/tests/test_circuit_breaker.py'
438--- udd/tests/test_circuit_breaker.py 1970-01-01 00:00:00 +0000
439+++ udd/tests/test_circuit_breaker.py 2011-09-21 12:39:28 +0000
440@@ -0,0 +1,129 @@
441+import time
442+
443+from bzrlib import tests
444+
445+from udd import circuit_breaker
446+
447+
448+class TestCircuitBreaker(tests.TestCase):
449+
450+ threshold = None
451+ delay = None
452+
453+ def setUp(self):
454+ super(TestCircuitBreaker, self).setUp()
455+ self.cb = self.get_cb()
456+
457+ def get_cb(self, threshold=None, delay=None):
458+ if threshold is None:
459+ threshold = self.threshold
460+ if delay is None:
461+ delay = self.delay
462+ return circuit_breaker.CircuitBreaker('test', threshold=threshold,
463+ delay=delay)
464+
465+ def assertClosed(self):
466+ self.assertTrue(self.cb.closed())
467+
468+ def assertOpened(self):
469+ self.assertFalse(self.cb.closed())
470+ self.assertEquals(circuit_breaker.OPEN, self.cb.state)
471+
472+ def assertHalfOpened(self):
473+ self.assertFalse(self.cb.closed())
474+ self.assertEquals(circuit_breaker.HALF_OPEN,
475+ self.cb.state)
476+
477+
478+class TestStateMachine(TestCircuitBreaker):
479+ """Test the three tests and three events with a specific circuit breaker.
480+
481+ We cover all transitions but for a circuit breaker with a 0 threshold and
482+ a 0.0 delay to simplify.
483+
484+ Other threshold/delay combinations are covered elsewhere.
485+ """
486+
487+ threshold = 0 # So we don't have to reach it
488+ delay = 0.0 # So we don't have to wait
489+
490+ def test_initially_closed(self):
491+ self.assertClosed()
492+
493+ # The usual scenario is (state -- event --> state):
494+ # closed -- failure --> open -- attempt --> half_open -- success --> closed
495+ # The following tests cover this scenario.
496+
497+ def test_open_on_failure(self):
498+ self.assertClosed()
499+ self.cb.see_failure()
500+ self.assertOpened()
501+
502+ def test_half_open_on_attempt(self):
503+ self.cb.see_failure()
504+ self.assertOpened()
505+ self.assertTrue(self.cb.see_attempt())
506+ self.assertHalfOpened()
507+
508+ def test_closed_on_success(self):
509+ self.cb.see_failure()
510+ self.assertTrue(self.cb.see_attempt())
511+ self.assertHalfOpened()
512+ self.cb.see_success()
513+ self.assertClosed()
514+
515+ # Special case scenarios
516+
517+ def test_stay_closed(self):
518+ """The other events keep the breaker closed."""
519+ self.assertTrue(self.cb.see_attempt())
520+ self.assertClosed()
521+ self.cb.see_success()
522+ self.assertClosed()
523+
524+ def test_keep_trying(self):
525+ """While half opened, further attempts stay in the same state."""
526+ self.cb.see_failure()
527+ self.assertTrue(self.cb.see_attempt())
528+ self.assertHalfOpened()
529+ self.assertTrue(self.cb.see_attempt())
530+ self.assertHalfOpened()
531+
532+ def test_success_while_opened(self):
533+ self.cb.see_failure()
534+ self.cb.see_success()
535+ self.assertHalfOpened()
536+
537+ def test_no_cigar(self):
538+ """The attempt fails, back to the open state."""
539+ self.cb.see_failure()
540+ self.assertTrue(self.cb.see_attempt())
541+ self.cb.see_failure()
542+ self.assertOpened()
543+
544+
545+class TestThreshold(TestCircuitBreaker):
546+ """Test a real threshold"""
547+
548+ threshold = 1 # We need at least one failure to open
549+
550+ def test_open_on_failure_with_threshold(self):
551+ self.cb.see_failure()
552+ self.assertClosed()
553+ # Reach the threshold
554+ self.cb.see_failure()
555+ self.assertOpened()
556+
557+
558+class TestDelay(TestCircuitBreaker):
559+ """Test a real delay"""
560+
561+ delay = 0.1 # Short enough for not slowing down tests too much
562+
563+ def test_half_open_after_delay(self):
564+ self.cb.see_failure()
565+ self.assertFalse(self.cb.see_attempt())
566+ self.assertOpened()
567+ time.sleep(self.delay)
568+ self.assertTrue(self.cb.see_attempt())
569+ self.assertHalfOpened()
570
571=== modified file 'udd/threads.py'
572--- udd/threads.py 2011-06-15 07:26:45 +0000
573+++ udd/threads.py 2011-09-21 12:39:28 +0000
574@@ -228,10 +228,13 @@
575
576 def collect_terminated_threads(self):
577 # If there are terminated threads, collect them
578+ collected = []
579 for t in self.threads[:]:
580 if not t.isAlive():
581 t.collect()
582 self.threads.remove(t)
583+ collected.append(t)
584+ return collected
585
586 def kill_remaining_threads(self):
587 # Kill all the remaining threads
