Merge lp:~james-w/conn-check/split into lp:conn-check

Proposed by James Westby
Status: Merged
Approved by: James Westby
Approved revision: 16
Merged at revision: 16
Proposed branch: lp:~james-w/conn-check/split
Merge into: lp:conn-check
Diff against target: 2210 lines (+1039/-984)
8 files modified
conn_check/__init__.py (+3/-938)
conn_check/check_impl.py (+325/-0)
conn_check/checks.py (+310/-0)
conn_check/main.py (+181/-0)
conn_check/patterns.py (+153/-0)
demo.yaml (+1/-1)
setup.py (+1/-1)
tests.py (+65/-44)
To merge this branch: bzr merge lp:~james-w/conn-check/split
Reviewer Review Type Date Requested Status
James Westby (community) Approve
Review via email: mp+228217@code.launchpad.net

Commit message

Refactor to split up conn_check.py.

Description of the change

Hi,

Split up the big file.

Thanks,

James

To post a comment you must log in.
Revision history for this message
James Westby (james-w) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'conn_check/__init__.py'
--- conn_check/__init__.py 2014-07-24 21:09:27 +0000
+++ conn_check/__init__.py 2014-07-24 22:10:31 +0000
@@ -3,41 +3,6 @@
3"""3"""
44
5import os5import os
6import re
7import sys
8import time
9import glob
10from pkg_resources import resource_stream
11import traceback
12import urlparse
13import yaml
14
15from argparse import ArgumentParser
16from threading import Thread
17
18from OpenSSL import SSL
19from OpenSSL.crypto import load_certificate, FILETYPE_PEM
20
21from twisted.internet import epollreactor
22epollreactor.install()
23
24from twisted.internet import reactor, ssl
25from twisted.internet.defer import (
26 returnValue,
27 inlineCallbacks,
28 maybeDeferred,
29 DeferredList,
30 Deferred)
31from twisted.internet.error import DNSLookupError, TimeoutError
32from twisted.internet.abstract import isIPAddress
33from twisted.internet.protocol import (
34 DatagramProtocol,
35 Protocol,
36 ClientCreator)
37from twisted.python.failure import Failure
38from twisted.python.threadpool import ThreadPool
39from twisted.web.client import Agent
40
416
42def get_version_string():7def get_version_string():
43 return open(os.path.join(os.path.dirname(__file__),8 return open(os.path.join(os.path.dirname(__file__),
@@ -47,909 +12,9 @@
47def get_version():12def get_version():
48 return get_version_string().split('.')13 return get_version_string().split('.')
4914
15
50__version__ = get_version_string()16__version__ = get_version_string()
5117
5218
53CONNECT_TIMEOUT = 1019from twisted.internet import epollreactor
54CA_CERTS = []20epollreactor.install()
55
56for certFileName in glob.glob("/etc/ssl/certs/*.pem"):
57 # There might be some dead symlinks in there, so let's make sure it's real.
58 if os.path.exists(certFileName):
59 data = open(certFileName).read()
60 x509 = load_certificate(FILETYPE_PEM, data)
61 # Now, de-duplicate in case the same cert has multiple names.
62 CA_CERTS.append(x509)
63
64
65class VerifyingContextFactory(ssl.CertificateOptions):
66
67 def __init__(self, verify, caCerts, verifyCallback=None):
68 ssl.CertificateOptions.__init__(self, verify=verify,
69 caCerts=caCerts,
70 method=SSL.SSLv23_METHOD)
71 self.verifyCallback = verifyCallback
72
73 def _makeContext(self):
74 context = ssl.CertificateOptions._makeContext(self)
75 if self.verifyCallback is not None:
76 context.set_verify(
77 SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
78 self.verifyCallback)
79 return context
80
81
82def maybeDeferToThread(f, *args, **kwargs):
83 """
84 Call the function C{f} using a thread from the given threadpool and return
85 the result as a Deferred.
86
87 @param f: The function to call. May return a deferred.
88 @param *args: positional arguments to pass to f.
89 @param **kwargs: keyword arguments to pass to f.
90
91 @return: A Deferred which fires a callback with the result of f, or an
92 errback with a L{twisted.python.failure.Failure} if f throws an
93 exception.
94 """
95 threadpool = reactor.getThreadPool()
96
97 d = Deferred()
98
99 def realOnResult(result):
100 if not isinstance(result, Failure):
101 reactor.callFromThread(d.callback, result)
102 else:
103 reactor.callFromThread(d.errback, result)
104
105 def onResult(success, result):
106 assert success
107 assert isinstance(result, Deferred)
108 result.addBoth(realOnResult)
109
110 threadpool.callInThreadWithCallback(onResult, maybeDeferred,
111 f, *args, **kwargs)
112
113 return d
114
115
116class Check(object):
117 """Abstract base class for objects embodying connectivity checks."""
118
119 def check(self, pattern, results):
120 """Run this check, if it matches the pattern.
121
122 If the pattern matches, and this is a leaf node in the check tree,
123 implementations of Check.check should call
124 results.notify_start, then either results.notify_success or
125 results.notify_failure.
126 """
127 raise NotImplementedError("%r.check not implemented" % type(self))
128
129 def skip(self, pattern, results):
130 """Indicate that this check has been skipped.
131
132 If the pattern matches and this is a leaf node in the check tree,
133 implementations of Check.skip should call results.notify_skip.
134 """
135 raise NotImplementedError("%r.skip not implemented" % type(self))
136
137
138class Pattern(object):
139 """Abstract base class for patterns used to select subsets of checks."""
140
141 def assume_prefix(self, prefix):
142 """Return an equivalent pattern with the given prefix baked in.
143
144 For example, if self.matches("bar") is True, then
145 self.assume_prefix("foo").matches("foobar") will be True.
146 """
147 return PrefixPattern(prefix, self)
148
149 def failed(self):
150 """Return True if the pattern cannot match any string.
151
152 This is mainly used so we can bail out early when recursing into
153 check trees.
154 """
155 return not self.prefix_matches("")
156
157 def prefix_matches(self, partial_name):
158 """Return True if the partial name (a prefix) is a potential match."""
159 raise NotImplementedError("%r.prefix_matches not implemented" %
160 type(self))
161
162 def matches(self, name):
163 """Return True if the given name matches."""
164 raise NotImplementedError("%r.match not implemented" %
165 type(self))
166
167
168class ResultTracker(object):
169 """Base class for objects which report or record check results."""
170
171 def notify_start(self, name, info):
172 """Register the start of a check."""
173
174 def notify_skip(self, name):
175 """Register a check being skipped."""
176
177 def notify_success(self, name, duration):
178 """Register a successful check."""
179
180 def notify_failure(self, name, info, exc_info, duration):
181 """Register the failure of a check."""
182
183
184class PrefixResultWrapper(ResultTracker):
185 """ResultWrapper wrapper which adds a prefix to recorded results."""
186
187 def __init__(self, wrapped, prefix):
188 """Initialize an instance."""
189 super(PrefixResultWrapper, self).__init__()
190 self.wrapped = wrapped
191 self.prefix = prefix
192
193 def make_name(self, name):
194 """Make a name by prepending the prefix."""
195 return "%s%s" % (self.prefix, name)
196
197 def notify_skip(self, name):
198 """Register a check being skipped."""
199 self.wrapped.notify_skip(self.make_name(name))
200
201 def notify_start(self, name, info):
202 """Register the start of a check."""
203 self.wrapped.notify_start(self.make_name(name), info)
204
205 def notify_success(self, name, duration):
206 """Register success."""
207 self.wrapped.notify_success(self.make_name(name), duration)
208
209 def notify_failure(self, name, info, exc_info, duration):
210 """Register failure."""
211 self.wrapped.notify_failure(self.make_name(name),
212 info, exc_info, duration)
213
214
215class FailureCountingResultWrapper(ResultTracker):
216 """ResultWrapper wrapper which counts failures."""
217
218 def __init__(self, wrapped):
219 """Initialize an instance."""
220 super(FailureCountingResultWrapper, self).__init__()
221 self.wrapped = wrapped
222 self.failure_count = 0
223
224 def notify_skip(self, name):
225 """Register a check being skipped."""
226 self.wrapped.notify_skip(name)
227
228 def notify_start(self, name, info):
229 """Register the start of a check."""
230 self.failure_count += 1
231 self.wrapped.notify_start(name, info)
232
233 def notify_success(self, name, duration):
234 """Register success."""
235 self.failure_count -= 1
236 self.wrapped.notify_success(name, duration)
237
238 def notify_failure(self, name, info, exc_info, duration):
239 """Register failure."""
240 self.wrapped.notify_failure(name, info, exc_info, duration)
241
242 def any_failed(self):
243 """Return True if any checks using this wrapper failed so far."""
244 return self.failure_count > 0
245
246
247class FailedPattern(Pattern):
248 """Patterns that always fail to match."""
249
250 def assume_prefix(self, prefix):
251 """Return an equivalent pattern with the given prefix baked in."""
252 return FAILED_PATTERN
253
254 def prefix_matches(self, partial_name):
255 """Return True if the partial name matches."""
256 return False
257
258 def matches(self, name):
259 """Return True if the complete name matches."""
260 return False
261
262
263FAILED_PATTERN = FailedPattern()
264
265
266PATTERN_TOKEN_RE = re.compile(r'\*|[^*]+')
267
268
269def tokens_to_partial_re(tokens):
270 """Convert tokens to a regular expression for matching prefixes."""
271
272 def token_to_re(token):
273 """Convert tokens to (begin, end, alt_end) triples."""
274 if token == '*':
275 return (r'(?:.*', ')?', ')')
276 else:
277 chars = list(token)
278 begin = "".join(["(?:" + re.escape(c) for c in chars])
279 end = "".join([")?" for c in chars])
280 return (begin, end, end)
281
282 subexprs = map(token_to_re, tokens)
283 if len(subexprs) > 0:
284 # subexpressions like (.*)? aren't accepted, so we may have to use
285 # an alternate closing form for the last (innermost) subexpression
286 (begin, _, alt_end) = subexprs[-1]
287 subexprs[-1] = (begin, alt_end, alt_end)
288 return re.compile("".join([se[0] for se in subexprs] +
289 [se[1] for se in reversed(subexprs)] +
290 [r'\Z']))
291
292
293def tokens_to_re(tokens):
294 """Convert tokens to a regular expression for exact matching."""
295
296 def token_to_re(token):
297 """Convert tokens to simple regular expressions."""
298 if token == '*':
299 return r'.*'
300 else:
301 return re.escape(token)
302
303 return re.compile("".join(map(token_to_re, tokens) + [r'\Z']))
304
305
306class SimplePattern(Pattern):
307 """Pattern that matches according to the given pattern expression."""
308
309 def __init__(self, pattern):
310 """Initialize an instance."""
311 super(SimplePattern, self).__init__()
312 tokens = PATTERN_TOKEN_RE.findall(pattern)
313 self.partial_re = tokens_to_partial_re(tokens)
314 self.full_re = tokens_to_re(tokens)
315
316 def prefix_matches(self, partial_name):
317 """Return True if the partial name matches."""
318 return self.partial_re.match(partial_name) is not None
319
320 def matches(self, name):
321 """Return True if the complete name matches."""
322 return self.full_re.match(name) is not None
323
324
325class PrefixPattern(Pattern):
326 """Pattern that assumes a previously given prefix."""
327
328 def __init__(self, prefix, pattern):
329 """Initialize an instance."""
330 super(PrefixPattern, self).__init__()
331 self.prefix = prefix
332 self.pattern = pattern
333
334 def assume_prefix(self, prefix):
335 """Return an equivalent pattern with the given prefix baked in."""
336 return PrefixPattern(self.prefix + prefix, self.pattern)
337
338 def prefix_matches(self, partial_name):
339 """Return True if the partial name matches."""
340 return self.pattern.prefix_matches(self.prefix + partial_name)
341
342 def matches(self, name):
343 """Return True if the complete name matches."""
344 return self.pattern.matches(self.prefix + name)
345
346
347class SumPattern(Pattern):
348 """Pattern that matches if at least one given pattern matches."""
349
350 def __init__(self, patterns):
351 """Initialize an instance."""
352 super(SumPattern, self).__init__()
353 self.patterns = patterns
354
355 def prefix_matches(self, partial_name):
356 """Return True if the partial name matches."""
357 for pattern in self.patterns:
358 if pattern.prefix_matches(partial_name):
359 return True
360 return False
361
362 def matches(self, name):
363 """Return True if the complete name matches."""
364 for pattern in self.patterns:
365 if pattern.matches(name):
366 return True
367 return False
368
369
370class ConditionalCheck(Check):
371 """A Check that skips unless the given predicate is true at check time."""
372
373 def __init__(self, wrapped, predicate):
374 """Initialize an instance."""
375 super(ConditionalCheck, self).__init__()
376 self.wrapped = wrapped
377 self.predicate = predicate
378
379 def check(self, pattern, result):
380 """Skip the check."""
381 if self.predicate():
382 return self.wrapped.check(pattern, result)
383 else:
384 self.skip(pattern, result)
385
386 def skip(self, pattern, result):
387 """Skip the check."""
388 self.wrapped.skip(pattern, result)
389
390
391class FunctionCheck(Check):
392 """A Check which takes a check function."""
393
394 def __init__(self, name, check, info=None, blocking=False):
395 """Initialize an instance."""
396 super(FunctionCheck, self).__init__()
397 self.name = name
398 self.info = info
399 self.check_fn = check
400 self.blocking = blocking
401
402 @inlineCallbacks
403 def check(self, pattern, results):
404 """Call the check function."""
405 if not pattern.matches(self.name):
406 returnValue(None)
407 results.notify_start(self.name, self.info)
408 start = time.time()
409 try:
410 if self.blocking:
411 result = yield maybeDeferToThread(self.check_fn)
412 else:
413 result = yield maybeDeferred(self.check_fn)
414 results.notify_success(self.name, time.time() - start)
415 returnValue(result)
416 except Exception:
417 results.notify_failure(self.name, self.info,
418 sys.exc_info(), time.time() - start)
419
420 def skip(self, pattern, results):
421 """Record the skip."""
422 if not pattern.matches(self.name):
423 return
424 results.notify_skip(self.name)
425
426
427class MultiCheck(Check):
428 """A composite check comprised of multiple subchecks."""
429
430 def __init__(self, subchecks, strategy):
431 """Initialize an instance."""
432 super(MultiCheck, self).__init__()
433 self.subchecks = list(subchecks)
434 self.strategy = strategy
435
436 def check(self, pattern, results):
437 """Run subchecks using the strategy supplied at creation time."""
438 return self.strategy(self.subchecks, pattern, results)
439
440 def skip(self, pattern, results):
441 """Skip subchecks."""
442 for subcheck in self.subchecks:
443 subcheck.skip(pattern, results)
444
445
446class PrefixCheckWrapper(Check):
447 """Runs a given check, adding a prefix to its name.
448
449 This works by wrapping the pattern and result tracker objects
450 passed to .check and .skip.
451 """
452
453 def __init__(self, wrapped, prefix):
454 """Initialize an instance."""
455 super(PrefixCheckWrapper, self).__init__()
456 self.wrapped = wrapped
457 self.prefix = prefix
458
459 def do_subcheck(self, subcheck, pattern, results):
460 """Do a subcheck if the pattern could still match."""
461 pattern = pattern.assume_prefix(self.prefix)
462 if not pattern.failed():
463 results = PrefixResultWrapper(wrapped=results,
464 prefix=self.prefix)
465 return subcheck(pattern, results)
466
467 def check(self, pattern, results):
468 """Run the check, prefixing results."""
469 return self.do_subcheck(self.wrapped.check, pattern, results)
470
471 def skip(self, pattern, results):
472 """Skip checks, prefixing results."""
473 self.do_subcheck(self.wrapped.skip, pattern, results)
474
475
476@inlineCallbacks
477def sequential_strategy(subchecks, pattern, results):
478 """Run subchecks sequentially, skipping checks after the first failure.
479
480 This is most useful when the failure of one check in the sequence
481 would imply the failure of later checks -- for example, it probably
482 doesn't make sense to run an SSL check if the basic TCP check failed.
483
484 Use sequential_check to create a meta-check using this strategy.
485 """
486 local_results = FailureCountingResultWrapper(wrapped=results)
487 failed = False
488 for subcheck in subchecks:
489 if failed:
490 subcheck.skip(pattern, local_results)
491 else:
492 yield maybeDeferred(subcheck.check, pattern, local_results)
493 if local_results.any_failed():
494 failed = True
495
496
497def parallel_strategy(subchecks, pattern, results):
498 """A strategy which runs the given subchecks in parallel.
499
500 Most checks can potentially block for long periods, and shouldn't have
501 interdependencies, so it makes sense to run them in parallel to
502 shorten the overall run time.
503
504 Use parallel_check to create a meta-check using this strategy.
505 """
506 deferreds = [maybeDeferred(subcheck.check, pattern, results)
507 for subcheck in subchecks]
508 return DeferredList(deferreds)
509
510
511def parallel_check(subchecks):
512 """Return a check that runs the given subchecks in parallel."""
513 return MultiCheck(subchecks=subchecks, strategy=parallel_strategy)
514
515
516def sequential_check(subchecks):
517 """Return a check that runs the given subchecks in sequence."""
518 return MultiCheck(subchecks=subchecks, strategy=sequential_strategy)
519
520
521def add_check_prefix(*args):
522 """Return an equivalent check with the given prefix prepended to its name.
523
524 The final argument should be a check; the remaining arguments are treated
525 as name components and joined with the check name using periods as
526 separators. For example, if the name of a check is "baz", then:
527
528 add_check_prefix("foo", "bar", check)
529
530 ...will return a check with the effective name "foo.bar.baz".
531 """
532 args = list(args)
533 check = args.pop(-1)
534 path = ".".join(args)
535 return PrefixCheckWrapper(wrapped=check, prefix="%s." % (path,))
536
537
538def make_check(name, check, info=None, blocking=False):
539 """Make a check object from a function."""
540 return FunctionCheck(name=name, check=check, info=info, blocking=blocking)
541
542
543def guard_check(check, predicate):
544 """Wrap a check so that it is skipped unless the predicate is true."""
545 return ConditionalCheck(wrapped=check, predicate=predicate)
546
547
548class TCPCheckProtocol(Protocol):
549
550 def connectionMade(self):
551 self.transport.loseConnection()
552
553
554@inlineCallbacks
555def do_tcp_check(host, port, ssl=False, ssl_verify=True):
556 """Generic connection check function."""
557 if not isIPAddress(host):
558 try:
559 ip = yield reactor.resolve(host, timeout=(1, CONNECT_TIMEOUT))
560 except DNSLookupError:
561 raise ValueError("dns resolution failed")
562 else:
563 ip = host
564 creator = ClientCreator(reactor, TCPCheckProtocol)
565 try:
566 if ssl:
567 context = VerifyingContextFactory(ssl_verify, CA_CERTS)
568 yield creator.connectSSL(ip, port, context,
569 timeout=CONNECT_TIMEOUT)
570 else:
571 yield creator.connectTCP(ip, port, timeout=CONNECT_TIMEOUT)
572 except TimeoutError:
573 if ip == host:
574 raise ValueError("timed out")
575 else:
576 raise ValueError("timed out connecting to %s" % ip)
577
578
579def make_tcp_check(host, port, **kwargs):
580 """Return a check for TCP connectivity."""
581 return make_check("tcp.{}:{}".format(host, port), lambda: do_tcp_check(host, port),
582 info="%s:%s" % (host, port))
583
584
585def make_ssl_check(host, port, verify=True, **kwargs):
586 """Return a check for SSL setup."""
587 return make_check("ssl.{}:{}".format(host, port),
588 lambda: do_tcp_check(host, port, ssl=True,
589 ssl_verify=verify),
590 info="%s:%s" % (host, port))
591
592
593class UDPCheckProtocol(DatagramProtocol):
594
595 def __init__(self, host, port, send, expect, deferred=None):
596 self.host = host
597 self.port = port
598 self.send = send
599 self.expect = expect
600 self.deferred = deferred
601
602 def _finish(self, success, result):
603 if not (self.delayed.cancelled or self.delayed.called):
604 self.delayed.cancel()
605 if self.deferred is not None:
606 if success:
607 self.deferred.callback(result)
608 else:
609 self.deferred.errback(result)
610 self.deferred = None
611
612 def startProtocol(self):
613 self.transport.write(self.send, (self.host, self.port))
614 self.delayed = reactor.callLater(CONNECT_TIMEOUT,
615 self._finish,
616 False, TimeoutError())
617
618 def datagramReceived(self, datagram, addr):
619 if datagram == self.expect:
620 self._finish(True, True)
621 else:
622 self._finish(False, ValueError("unexpected reply"))
623
624
625@inlineCallbacks
626def do_udp_check(host, port, send, expect):
627 """Generic connection check function."""
628 if not isIPAddress(host):
629 try:
630 ip = yield reactor.resolve(host, timeout=(1, CONNECT_TIMEOUT))
631 except DNSLookupError:
632 raise ValueError("dns resolution failed")
633 else:
634 ip = host
635 deferred = Deferred()
636 protocol = UDPCheckProtocol(host, port, send, expect, deferred)
637 reactor.listenUDP(0, protocol)
638 try:
639 yield deferred
640 except TimeoutError:
641 if ip == host:
642 raise ValueError("timed out")
643 else:
644 raise ValueError("timed out waiting for %s" % ip)
645
646
647def make_udp_check(host, port, send, expect, **kwargs):
648 """Return a check for UDP connectivity."""
649 return make_check("udp.{}:{}".format(host, port),
650 lambda: do_udp_check(host, port, send, expect),
651 info="%s:%s" % (host, port))
652
653
654def extract_host_port(url):
655 parsed = urlparse.urlparse(url)
656 host = parsed.hostname
657 port = parsed.port
658 scheme = parsed.scheme
659 if not scheme:
660 scheme = 'http'
661 if port is None:
662 if scheme == 'https':
663 port = 443
664 else:
665 port = 80
666 return host, port, scheme
667
668
669def make_http_check(url, method='GET', expected_code=200, **kwargs):
670 subchecks = []
671 host, port, scheme = extract_host_port(url)
672 subchecks.append(make_tcp_check(host, port))
673 if scheme == 'https':
674 subchecks.append(make_ssl_check(host, port))
675
676 @inlineCallbacks
677 def do_request():
678 agent = Agent(reactor)
679 response = yield agent.request(method, url)
680 if response.code != expected_code:
681 raise RuntimeError(
682 "Unexpected response code: {}".format(response.code))
683
684 subchecks.append(make_check('{}.{}'.format(method, url), do_request,
685 info='{} {}'.format(method, url)))
686 return sequential_check(subchecks)
687
688
689def make_amqp_check(host, port, username, password, use_ssl=True, vhost="/", **kwargs):
690 """Return a check for AMQP connectivity."""
691 from txamqp.protocol import AMQClient
692 from txamqp.client import TwistedDelegate
693 from txamqp.spec import load as load_spec
694
695 subchecks = []
696 subchecks.append(make_tcp_check(host, port))
697
698 if use_ssl:
699 subchecks.append(make_ssl_check(host, port, verify=False))
700
701 @inlineCallbacks
702 def do_auth():
703 """Connect and authenticate."""
704 delegate = TwistedDelegate()
705 spec = load_spec(resource_stream('conn_check', 'amqp0-8.xml'))
706 creator = ClientCreator(reactor, AMQClient,
707 delegate, vhost, spec)
708 client = yield creator.connectTCP(host, port, timeout=CONNECT_TIMEOUT)
709 yield client.authenticate(username, password)
710
711 subchecks.append(make_check("auth", do_auth,
712 info="user %s" % (username,),))
713 return sequential_check(subchecks)
714
715
716def make_postgres_check(host, port, username, password, database, **kwargs):
717 """Return a check for Postgres connectivity."""
718
719 import psycopg2
720 subchecks = []
721 connect_kw = {'host': host, 'user': username, 'database': database}
722
723 if host[0] != '/':
724 connect_kw['port'] = port
725 subchecks.append(make_tcp_check(host, port))
726
727 if password is not None:
728 connect_kw['password'] = password
729
730 def check_auth():
731 """Try to establish a postgres connection and log in."""
732 conn = psycopg2.connect(**connect_kw)
733 conn.close()
734
735 subchecks.append(make_check("auth", check_auth,
736 info="user %s" % (username,),
737 blocking=True))
738 return sequential_check(subchecks)
739
740
741def make_redis_check(host, port, password=None, **kwargs):
742 """Make a check for the configured redis server."""
743 import txredis
744 subchecks = []
745 subchecks.append(make_tcp_check(host, port))
746
747 @inlineCallbacks
748 def do_connect():
749 """Connect and authenticate.
750 """
751 client_creator = ClientCreator(reactor, txredis.client.RedisClient)
752 client = yield client_creator.connectTCP(host=host, port=port,
753 timeout=CONNECT_TIMEOUT)
754
755 if password is None:
756 ping = yield client.ping()
757 if not ping:
758 raise RuntimeError("failed to ping redis")
759 else:
760 resp = yield client.auth(password)
761 if resp != 'OK':
762 raise RuntimeError("failed to auth to redis")
763
764 connect_info = "connect with auth" if password is not None else "connect"
765 subchecks.append(make_check(connect_info, do_connect))
766 return add_check_prefix('redis', sequential_check(subchecks))
767
768
769CHECKS = {
770 'tcp': {
771 'fn': make_tcp_check,
772 'args': ['host', 'port'],
773 },
774 'ssl': {
775 'fn': make_ssl_check,
776 'args': ['host', 'port'],
777 },
778 'udp': {
779 'fn': make_udp_check,
780 'args': ['host', 'port', 'send', 'expect'],
781 },
782 'http': {
783 'fn': make_http_check,
784 'args': ['url'],
785 },
786 'amqp': {
787 'fn': make_amqp_check,
788 'args': ['host', 'port', 'username', 'password'],
789 },
790 'postgres': {
791 'fn': make_postgres_check,
792 'args': ['host', 'port', 'username', 'password', 'database'],
793 },
794 'redis': {
795 'fn': make_redis_check,
796 'args': ['host', 'port'],
797 },
798}
799
800
801def check_from_description(check_description):
802 _type = check_description['type']
803 check = CHECKS.get(_type, None)
804 if check is None:
805 raise AssertionError("Unknown check type: {}, available checks: {}".format(
806 _type, CHECKS.keys()))
807 for arg in check['args']:
808 if arg not in check_description:
809 raise AssertionError('{} missing from check: {}'.format(arg,
810 check_description))
811 res = check['fn'](**check_description)
812 return res
813
814
815def build_checks(check_descriptions):
816 subchecks = map(check_from_description, check_descriptions)
817 return parallel_check(subchecks)
818
819
820@inlineCallbacks
821def run_checks(checks, pattern, results):
822 """Make and run all the pertinent checks."""
823 try:
824 yield checks.check(pattern, results)
825 finally:
826 reactor.stop()
827
828
829class TimestampOutput(object):
830
831 def __init__(self, output):
832 self.start = time.time()
833 self.output = output
834
835 def write(self, data):
836 self.output.write("%.3f: %s" % (time.time() - self.start, data))
837
838
839class ConsoleOutput(ResultTracker):
840 """Displays check results."""
841
842 def __init__(self, output, verbose, show_tracebacks, show_duration):
843 """Initialize an instance."""
844 super(ConsoleOutput, self).__init__()
845 self.output = output
846 self.verbose = verbose
847 self.show_tracebacks = show_tracebacks
848 self.show_duration = show_duration
849
850 def format_duration(self, duration):
851 if not self.show_duration:
852 return ""
853 return " (%.3f ms)" % duration
854
855 def notify_start(self, name, info):
856 """Register the start of a check."""
857 if self.verbose:
858 if info:
859 info = " (%s)" % (info,)
860 self.output.write("Starting %s%s...\n" % (name, info or ''))
861
862 def notify_skip(self, name):
863 """Register a check being skipped."""
864 self.output.write("SKIPPING %s\n" % (name,))
865
866 def notify_success(self, name, duration):
867 """Register a success."""
868 self.output.write("OK %s%s\n" % (
869 name, self.format_duration(duration)))
870
871 def notify_failure(self, name, info, exc_info, duration):
872 """Register a failure."""
873 message = str(exc_info[1]).split("\n")[0]
874 if info:
875 message = "(%s): %s" % (info, message)
876 self.output.write("FAILED %s%s: %s\n" % (
877 name, self.format_duration(duration), message))
878 if self.show_tracebacks:
879 formatted = traceback.format_exception(exc_info[0],
880 exc_info[1],
881 exc_info[2],
882 None)
883 lines = "".join(formatted).split("\n")
884 if len(lines) > 0 and len(lines[-1]) == 0:
885 lines.pop()
886 indented = "\n".join([" %s" % (line,) for line in lines])
887 self.output.write("%s\n" % (indented,))
888
889
890def main(*args):
891 """Parse arguments, then build and run checks in a reactor."""
892 parser = ArgumentParser()
893 parser.add_argument("config_file",
894 help="Config file specifying the checks to run.")
895 parser.add_argument("patterns", nargs='*',
896 help="Patterns to filter the checks.")
897 parser.add_argument("-v", "--verbose", dest="verbose",
898 action="store_true", default=False,
899 help="Show additional status")
900 parser.add_argument("-d", "--duration", dest="show_duration",
901 action="store_true", default=False,
902 help="Show duration")
903 parser.add_argument("-t", "--tracebacks", dest="show_tracebacks",
904 action="store_true", default=False,
905 help="Show tracebacks on failure")
906 parser.add_argument("--validate", dest="validate",
907 action="store_true", default=False,
908 help="Only validate the config file, don't run checks.")
909 options = parser.parse_args(list(args))
910
911 if options.patterns:
912 pattern = SumPattern(map(SimplePattern, options.patterns))
913 else:
914 pattern = SimplePattern("*")
915
916 def make_daemon_thread(*args, **kw):
917 """Create a daemon thread."""
918 thread = Thread(*args, **kw)
919 thread.daemon = True
920 return thread
921
922 threadpool = ThreadPool(minthreads=1)
923 threadpool.threadFactory = make_daemon_thread
924 reactor.threadpool = threadpool
925 reactor.callWhenRunning(threadpool.start)
926
927 output = sys.stdout
928 if options.show_duration:
929 output = TimestampOutput(output)
930
931 results = ConsoleOutput(output=output,
932 show_tracebacks=options.show_tracebacks,
933 show_duration=options.show_duration,
934 verbose=options.verbose)
935 results = FailureCountingResultWrapper(results)
936 with open(options.config_file) as f:
937 descriptions = yaml.load(f)
938 checks = build_checks(descriptions)
939 if not options.validate:
940 reactor.callWhenRunning(run_checks, checks, pattern, results)
941
942 reactor.run()
943
944 if results.any_failed():
945 return 1
946 else:
947 return 0
948
949
950def run():
951 exit(main(*sys.argv[1:]))
952
953
954if __name__ == '__main__':
955 run()
95621
=== added file 'conn_check/check_impl.py'
--- conn_check/check_impl.py 1970-01-01 00:00:00 +0000
+++ conn_check/check_impl.py 2014-07-24 22:10:31 +0000
@@ -0,0 +1,325 @@
1import sys
2import time
3
4from twisted.internet import reactor
5from twisted.internet.defer import (
6 returnValue,
7 inlineCallbacks,
8 maybeDeferred,
9 DeferredList,
10 Deferred)
11from twisted.python.failure import Failure
12
13
14def maybeDeferToThread(f, *args, **kwargs):
15 """
16 Call the function C{f} using a thread from the given threadpool and return
17 the result as a Deferred.
18
19 @param f: The function to call. May return a deferred.
20 @param *args: positional arguments to pass to f.
21 @param **kwargs: keyword arguments to pass to f.
22
23 @return: A Deferred which fires a callback with the result of f, or an
24 errback with a L{twisted.python.failure.Failure} if f throws an
25 exception.
26 """
27 threadpool = reactor.getThreadPool()
28
29 d = Deferred()
30
31 def realOnResult(result):
32 if not isinstance(result, Failure):
33 reactor.callFromThread(d.callback, result)
34 else:
35 reactor.callFromThread(d.errback, result)
36
37 def onResult(success, result):
38 assert success
39 assert isinstance(result, Deferred)
40 result.addBoth(realOnResult)
41
42 threadpool.callInThreadWithCallback(onResult, maybeDeferred,
43 f, *args, **kwargs)
44
45 return d
46
47
48class Check(object):
49 """Abstract base class for objects embodying connectivity checks."""
50
51 def check(self, pattern, results):
52 """Run this check, if it matches the pattern.
53
54 If the pattern matches, and this is a leaf node in the check tree,
55 implementations of Check.check should call
56 results.notify_start, then either results.notify_success or
57 results.notify_failure.
58 """
59 raise NotImplementedError("%r.check not implemented" % type(self))
60
61 def skip(self, pattern, results):
62 """Indicate that this check has been skipped.
63
64 If the pattern matches and this is a leaf node in the check tree,
65 implementations of Check.skip should call results.notify_skip.
66 """
67 raise NotImplementedError("%r.skip not implemented" % type(self))
68
69
70class ConditionalCheck(Check):
71 """A Check that skips unless the given predicate is true at check time."""
72
73 def __init__(self, wrapped, predicate):
74 """Initialize an instance."""
75 super(ConditionalCheck, self).__init__()
76 self.wrapped = wrapped
77 self.predicate = predicate
78
79 def check(self, pattern, result):
80 """Skip the check."""
81 if self.predicate():
82 return self.wrapped.check(pattern, result)
83 else:
84 self.skip(pattern, result)
85
86 def skip(self, pattern, result):
87 """Skip the check."""
88 self.wrapped.skip(pattern, result)
89
90
91class ResultTracker(object):
92 """Base class for objects which report or record check results."""
93
94 def notify_start(self, name, info):
95 """Register the start of a check."""
96
97 def notify_skip(self, name):
98 """Register a check being skipped."""
99
100 def notify_success(self, name, duration):
101 """Register a successful check."""
102
103 def notify_failure(self, name, info, exc_info, duration):
104 """Register the failure of a check."""
105
106
107class PrefixResultWrapper(ResultTracker):
108 """ResultWrapper wrapper which adds a prefix to recorded results."""
109
110 def __init__(self, wrapped, prefix):
111 """Initialize an instance."""
112 super(PrefixResultWrapper, self).__init__()
113 self.wrapped = wrapped
114 self.prefix = prefix
115
116 def make_name(self, name):
117 """Make a name by prepending the prefix."""
118 return "%s%s" % (self.prefix, name)
119
120 def notify_skip(self, name):
121 """Register a check being skipped."""
122 self.wrapped.notify_skip(self.make_name(name))
123
124 def notify_start(self, name, info):
125 """Register the start of a check."""
126 self.wrapped.notify_start(self.make_name(name), info)
127
128 def notify_success(self, name, duration):
129 """Register success."""
130 self.wrapped.notify_success(self.make_name(name), duration)
131
132 def notify_failure(self, name, info, exc_info, duration):
133 """Register failure."""
134 self.wrapped.notify_failure(self.make_name(name),
135 info, exc_info, duration)
136
137
138class FailureCountingResultWrapper(ResultTracker):
139 """ResultWrapper wrapper which counts failures."""
140
141 def __init__(self, wrapped):
142 """Initialize an instance."""
143 super(FailureCountingResultWrapper, self).__init__()
144 self.wrapped = wrapped
145 self.failure_count = 0
146
147 def notify_skip(self, name):
148 """Register a check being skipped."""
149 self.wrapped.notify_skip(name)
150
151 def notify_start(self, name, info):
152 """Register the start of a check."""
153 self.failure_count += 1
154 self.wrapped.notify_start(name, info)
155
156 def notify_success(self, name, duration):
157 """Register success."""
158 self.failure_count -= 1
159 self.wrapped.notify_success(name, duration)
160
161 def notify_failure(self, name, info, exc_info, duration):
162 """Register failure."""
163 self.wrapped.notify_failure(name, info, exc_info, duration)
164
165 def any_failed(self):
166 """Return True if any checks using this wrapper failed so far."""
167 return self.failure_count > 0
168
169
170class FunctionCheck(Check):
171 """A Check which takes a check function."""
172
173 def __init__(self, name, check, info=None, blocking=False):
174 """Initialize an instance."""
175 super(FunctionCheck, self).__init__()
176 self.name = name
177 self.info = info
178 self.check_fn = check
179 self.blocking = blocking
180
181 @inlineCallbacks
182 def check(self, pattern, results):
183 """Call the check function."""
184 if not pattern.matches(self.name):
185 returnValue(None)
186 results.notify_start(self.name, self.info)
187 start = time.time()
188 try:
189 if self.blocking:
190 result = yield maybeDeferToThread(self.check_fn)
191 else:
192 result = yield maybeDeferred(self.check_fn)
193 results.notify_success(self.name, time.time() - start)
194 returnValue(result)
195 except Exception:
196 results.notify_failure(self.name, self.info,
197 sys.exc_info(), time.time() - start)
198
199 def skip(self, pattern, results):
200 """Record the skip."""
201 if not pattern.matches(self.name):
202 return
203 results.notify_skip(self.name)
204
205
206class MultiCheck(Check):
207 """A composite check comprised of multiple subchecks."""
208
209 def __init__(self, subchecks, strategy):
210 """Initialize an instance."""
211 super(MultiCheck, self).__init__()
212 self.subchecks = list(subchecks)
213 self.strategy = strategy
214
215 def check(self, pattern, results):
216 """Run subchecks using the strategy supplied at creation time."""
217 return self.strategy(self.subchecks, pattern, results)
218
219 def skip(self, pattern, results):
220 """Skip subchecks."""
221 for subcheck in self.subchecks:
222 subcheck.skip(pattern, results)
223
224
225class PrefixCheckWrapper(Check):
226 """Runs a given check, adding a prefix to its name.
227
228 This works by wrapping the pattern and result tracker objects
229 passed to .check and .skip.
230 """
231
232 def __init__(self, wrapped, prefix):
233 """Initialize an instance."""
234 super(PrefixCheckWrapper, self).__init__()
235 self.wrapped = wrapped
236 self.prefix = prefix
237
238 def do_subcheck(self, subcheck, pattern, results):
239 """Do a subcheck if the pattern could still match."""
240 pattern = pattern.assume_prefix(self.prefix)
241 if not pattern.failed():
242 results = PrefixResultWrapper(wrapped=results,
243 prefix=self.prefix)
244 return subcheck(pattern, results)
245
246 def check(self, pattern, results):
247 """Run the check, prefixing results."""
248 return self.do_subcheck(self.wrapped.check, pattern, results)
249
250 def skip(self, pattern, results):
251 """Skip checks, prefixing results."""
252 self.do_subcheck(self.wrapped.skip, pattern, results)
253
254
255@inlineCallbacks
256def sequential_strategy(subchecks, pattern, results):
257 """Run subchecks sequentially, skipping checks after the first failure.
258
259 This is most useful when the failure of one check in the sequence
260 would imply the failure of later checks -- for example, it probably
261 doesn't make sense to run an SSL check if the basic TCP check failed.
262
263 Use sequential_check to create a meta-check using this strategy.
264 """
265 local_results = FailureCountingResultWrapper(wrapped=results)
266 failed = False
267 for subcheck in subchecks:
268 if failed:
269 subcheck.skip(pattern, local_results)
270 else:
271 yield maybeDeferred(subcheck.check, pattern, local_results)
272 if local_results.any_failed():
273 failed = True
274
275
276def parallel_strategy(subchecks, pattern, results):
277 """A strategy which runs the given subchecks in parallel.
278
279 Most checks can potentially block for long periods, and shouldn't have
280 interdependencies, so it makes sense to run them in parallel to
281 shorten the overall run time.
282
283 Use parallel_check to create a meta-check using this strategy.
284 """
285 deferreds = [maybeDeferred(subcheck.check, pattern, results)
286 for subcheck in subchecks]
287 return DeferredList(deferreds)
288
289
290def parallel_check(subchecks):
291 """Return a check that runs the given subchecks in parallel."""
292 return MultiCheck(subchecks=subchecks, strategy=parallel_strategy)
293
294
295def sequential_check(subchecks):
296 """Return a check that runs the given subchecks in sequence."""
297 return MultiCheck(subchecks=subchecks, strategy=sequential_strategy)
298
299
300def add_check_prefix(*args):
301 """Return an equivalent check with the given prefix prepended to its name.
302
303 The final argument should be a check; the remaining arguments are treated
304 as name components and joined with the check name using periods as
305 separators. For example, if the name of a check is "baz", then:
306
307 add_check_prefix("foo", "bar", check)
308
309 ...will return a check with the effective name "foo.bar.baz".
310 """
311 args = list(args)
312 check = args.pop(-1)
313 path = ".".join(args)
314 return PrefixCheckWrapper(wrapped=check, prefix="%s." % (path,))
315
316
317def make_check(name, check, info=None, blocking=False):
318 """Make a check object from a function."""
319 return FunctionCheck(name=name, check=check, info=info, blocking=blocking)
320
321
322def guard_check(check, predicate):
323 """Wrap a check so that it is skipped unless the predicate is true."""
324 return ConditionalCheck(wrapped=check, predicate=predicate)
325
0326
=== added file 'conn_check/checks.py'
--- conn_check/checks.py 1970-01-01 00:00:00 +0000
+++ conn_check/checks.py 2014-07-24 22:10:31 +0000
@@ -0,0 +1,310 @@
1import glob
2import os
3from pkg_resources import resource_stream
4import urlparse
5
6from OpenSSL import SSL
7from OpenSSL.crypto import load_certificate, FILETYPE_PEM
8
9from twisted.internet import reactor, ssl
10from twisted.internet.error import DNSLookupError, TimeoutError
11from twisted.internet.abstract import isIPAddress
12from twisted.internet.defer import (
13 Deferred,
14 inlineCallbacks,
15 )
16from twisted.internet.protocol import (
17 ClientCreator,
18 DatagramProtocol,
19 Protocol,
20 )
21from twisted.web.client import Agent
22
23from .check_impl import (
24 add_check_prefix,
25 make_check,
26 sequential_check,
27 )
28
29
30CONNECT_TIMEOUT = 10
31CA_CERTS = []
32
33
34for certFileName in glob.glob("/etc/ssl/certs/*.pem"):
35 # There might be some dead symlinks in there, so let's make sure it's real.
36 if os.path.exists(certFileName):
37 data = open(certFileName).read()
38 x509 = load_certificate(FILETYPE_PEM, data)
39 # Now, de-duplicate in case the same cert has multiple names.
40 CA_CERTS.append(x509)
41
42
43class TCPCheckProtocol(Protocol):
44
45 def connectionMade(self):
46 self.transport.loseConnection()
47
48
49class VerifyingContextFactory(ssl.CertificateOptions):
50
51 def __init__(self, verify, caCerts, verifyCallback=None):
52 ssl.CertificateOptions.__init__(self, verify=verify,
53 caCerts=caCerts,
54 method=SSL.SSLv23_METHOD)
55 self.verifyCallback = verifyCallback
56
57 def _makeContext(self):
58 context = ssl.CertificateOptions._makeContext(self)
59 if self.verifyCallback is not None:
60 context.set_verify(
61 SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
62 self.verifyCallback)
63 return context
64
65
66@inlineCallbacks
67def do_tcp_check(host, port, ssl=False, ssl_verify=True):
68 """Generic connection check function."""
69 if not isIPAddress(host):
70 try:
71 ip = yield reactor.resolve(host, timeout=(1, CONNECT_TIMEOUT))
72 except DNSLookupError:
73 raise ValueError("dns resolution failed")
74 else:
75 ip = host
76 creator = ClientCreator(reactor, TCPCheckProtocol)
77 try:
78 if ssl:
79 context = VerifyingContextFactory(ssl_verify, CA_CERTS)
80 yield creator.connectSSL(ip, port, context,
81 timeout=CONNECT_TIMEOUT)
82 else:
83 yield creator.connectTCP(ip, port, timeout=CONNECT_TIMEOUT)
84 except TimeoutError:
85 if ip == host:
86 raise ValueError("timed out")
87 else:
88 raise ValueError("timed out connecting to %s" % ip)
89
90
91def make_tcp_check(host, port, **kwargs):
92 """Return a check for TCP connectivity."""
93 return make_check("tcp.{}:{}".format(host, port), lambda: do_tcp_check(host, port),
94 info="%s:%s" % (host, port))
95
96
97def make_ssl_check(host, port, verify=True, **kwargs):
98 """Return a check for SSL setup."""
99 return make_check("ssl.{}:{}".format(host, port),
100 lambda: do_tcp_check(host, port, ssl=True,
101 ssl_verify=verify),
102 info="%s:%s" % (host, port))
103
104
105class UDPCheckProtocol(DatagramProtocol):
106
107 def __init__(self, host, port, send, expect, deferred=None):
108 self.host = host
109 self.port = port
110 self.send = send
111 self.expect = expect
112 self.deferred = deferred
113
114 def _finish(self, success, result):
115 if not (self.delayed.cancelled or self.delayed.called):
116 self.delayed.cancel()
117 if self.deferred is not None:
118 if success:
119 self.deferred.callback(result)
120 else:
121 self.deferred.errback(result)
122 self.deferred = None
123
124 def startProtocol(self):
125 self.transport.write(self.send, (self.host, self.port))
126 self.delayed = reactor.callLater(CONNECT_TIMEOUT,
127 self._finish,
128 False, TimeoutError())
129
130 def datagramReceived(self, datagram, addr):
131 if datagram == self.expect:
132 self._finish(True, True)
133 else:
134 self._finish(False, ValueError("unexpected reply"))
135
136
137@inlineCallbacks
138def do_udp_check(host, port, send, expect):
139 """Generic connection check function."""
140 if not isIPAddress(host):
141 try:
142 ip = yield reactor.resolve(host, timeout=(1, CONNECT_TIMEOUT))
143 except DNSLookupError:
144 raise ValueError("dns resolution failed")
145 else:
146 ip = host
147 deferred = Deferred()
148 protocol = UDPCheckProtocol(host, port, send, expect, deferred)
149 reactor.listenUDP(0, protocol)
150 try:
151 yield deferred
152 except TimeoutError:
153 if ip == host:
154 raise ValueError("timed out")
155 else:
156 raise ValueError("timed out waiting for %s" % ip)
157
158
159def make_udp_check(host, port, send, expect, **kwargs):
160 """Return a check for UDP connectivity."""
161 return make_check("udp.{}:{}".format(host, port),
162 lambda: do_udp_check(host, port, send, expect),
163 info="%s:%s" % (host, port))
164
165
166def extract_host_port(url):
167 parsed = urlparse.urlparse(url)
168 host = parsed.hostname
169 port = parsed.port
170 scheme = parsed.scheme
171 if not scheme:
172 scheme = 'http'
173 if port is None:
174 if scheme == 'https':
175 port = 443
176 else:
177 port = 80
178 return host, port, scheme
179
180
181def make_http_check(url, method='GET', expected_code=200, **kwargs):
182 subchecks = []
183 host, port, scheme = extract_host_port(url)
184 subchecks.append(make_tcp_check(host, port))
185 if scheme == 'https':
186 subchecks.append(make_ssl_check(host, port))
187
188 @inlineCallbacks
189 def do_request():
190 agent = Agent(reactor)
191 response = yield agent.request(method, url)
192 if response.code != expected_code:
193 raise RuntimeError(
194 "Unexpected response code: {}".format(response.code))
195
196 subchecks.append(make_check('{}.{}'.format(method, url), do_request,
197 info='{} {}'.format(method, url)))
198 return sequential_check(subchecks)
199
200
201def make_amqp_check(host, port, username, password, use_ssl=True, vhost="/", **kwargs):
202 """Return a check for AMQP connectivity."""
203 from txamqp.protocol import AMQClient
204 from txamqp.client import TwistedDelegate
205 from txamqp.spec import load as load_spec
206
207 subchecks = []
208 subchecks.append(make_tcp_check(host, port))
209
210 if use_ssl:
211 subchecks.append(make_ssl_check(host, port, verify=False))
212
213 @inlineCallbacks
214 def do_auth():
215 """Connect and authenticate."""
216 delegate = TwistedDelegate()
217 spec = load_spec(resource_stream('conn_check', 'amqp0-8.xml'))
218 creator = ClientCreator(reactor, AMQClient,
219 delegate, vhost, spec)
220 client = yield creator.connectTCP(host, port, timeout=CONNECT_TIMEOUT)
221 yield client.authenticate(username, password)
222
223 subchecks.append(make_check("auth", do_auth,
224 info="user %s" % (username,),))
225 return sequential_check(subchecks)
226
227
228def make_postgres_check(host, port, username, password, database, **kwargs):
229 """Return a check for Postgres connectivity."""
230
231 import psycopg2
232 subchecks = []
233 connect_kw = {'host': host, 'user': username, 'database': database}
234
235 if host[0] != '/':
236 connect_kw['port'] = port
237 subchecks.append(make_tcp_check(host, port))
238
239 if password is not None:
240 connect_kw['password'] = password
241
242 def check_auth():
243 """Try to establish a postgres connection and log in."""
244 conn = psycopg2.connect(**connect_kw)
245 conn.close()
246
247 subchecks.append(make_check("auth", check_auth,
248 info="user %s" % (username,),
249 blocking=True))
250 return sequential_check(subchecks)
251
252
253def make_redis_check(host, port, password=None, **kwargs):
254 """Make a check for the configured redis server."""
255 import txredis
256 subchecks = []
257 subchecks.append(make_tcp_check(host, port))
258
259 @inlineCallbacks
260 def do_connect():
261 """Connect and authenticate.
262 """
263 client_creator = ClientCreator(reactor, txredis.client.RedisClient)
264 client = yield client_creator.connectTCP(host=host, port=port,
265 timeout=CONNECT_TIMEOUT)
266
267 if password is None:
268 ping = yield client.ping()
269 if not ping:
270 raise RuntimeError("failed to ping redis")
271 else:
272 resp = yield client.auth(password)
273 if resp != 'OK':
274 raise RuntimeError("failed to auth to redis")
275
276 connect_info = "connect with auth" if password is not None else "connect"
277 subchecks.append(make_check(connect_info, do_connect))
278 return add_check_prefix('redis', sequential_check(subchecks))
279
280
281CHECKS = {
282 'tcp': {
283 'fn': make_tcp_check,
284 'args': ['host', 'port'],
285 },
286 'ssl': {
287 'fn': make_ssl_check,
288 'args': ['host', 'port'],
289 },
290 'udp': {
291 'fn': make_udp_check,
292 'args': ['host', 'port', 'send', 'expect'],
293 },
294 'http': {
295 'fn': make_http_check,
296 'args': ['url'],
297 },
298 'amqp': {
299 'fn': make_amqp_check,
300 'args': ['host', 'port', 'username', 'password'],
301 },
302 'postgres': {
303 'fn': make_postgres_check,
304 'args': ['host', 'port', 'username', 'password', 'database'],
305 },
306 'redis': {
307 'fn': make_redis_check,
308 'args': ['host', 'port'],
309 },
310}
0311
=== added file 'conn_check/main.py'
--- conn_check/main.py 1970-01-01 00:00:00 +0000
+++ conn_check/main.py 2014-07-24 22:10:31 +0000
@@ -0,0 +1,181 @@
1from argparse import ArgumentParser
2import sys
3from threading import Thread
4import time
5import traceback
6import yaml
7
8from twisted.internet import reactor
9from twisted.internet.defer import (
10 inlineCallbacks,
11 )
12from twisted.python.threadpool import ThreadPool
13
14from .check_impl import (
15 FailureCountingResultWrapper,
16 parallel_check,
17 ResultTracker,
18 )
19from .checks import CHECKS
20from .patterns import (
21 SimplePattern,
22 SumPattern,
23 )
24
25
26def check_from_description(check_description):
27 _type = check_description['type']
28 check = CHECKS.get(_type, None)
29 if check is None:
30 raise AssertionError("Unknown check type: {}, available checks: {}".format(
31 _type, CHECKS.keys()))
32 for arg in check['args']:
33 if arg not in check_description:
34 raise AssertionError('{} missing from check: {}'.format(arg,
35 check_description))
36 res = check['fn'](**check_description)
37 return res
38
39
40def build_checks(check_descriptions):
41 subchecks = map(check_from_description, check_descriptions)
42 return parallel_check(subchecks)
43
44
45@inlineCallbacks
46def run_checks(checks, pattern, results):
47 """Make and run all the pertinent checks."""
48 try:
49 yield checks.check(pattern, results)
50 finally:
51 reactor.stop()
52
53
54class TimestampOutput(object):
55
56 def __init__(self, output):
57 self.start = time.time()
58 self.output = output
59
60 def write(self, data):
61 self.output.write("%.3f: %s" % (time.time() - self.start, data))
62
63
64class ConsoleOutput(ResultTracker):
65 """Displays check results."""
66
67 def __init__(self, output, verbose, show_tracebacks, show_duration):
68 """Initialize an instance."""
69 super(ConsoleOutput, self).__init__()
70 self.output = output
71 self.verbose = verbose
72 self.show_tracebacks = show_tracebacks
73 self.show_duration = show_duration
74
75 def format_duration(self, duration):
76 if not self.show_duration:
77 return ""
78 return " (%.3f ms)" % duration
79
80 def notify_start(self, name, info):
81 """Register the start of a check."""
82 if self.verbose:
83 if info:
84 info = " (%s)" % (info,)
85 self.output.write("Starting %s%s...\n" % (name, info or ''))
86
87 def notify_skip(self, name):
88 """Register a check being skipped."""
89 self.output.write("SKIPPING %s\n" % (name,))
90
91 def notify_success(self, name, duration):
92 """Register a success."""
93 self.output.write("OK %s%s\n" % (
94 name, self.format_duration(duration)))
95
96 def notify_failure(self, name, info, exc_info, duration):
97 """Register a failure."""
98 message = str(exc_info[1]).split("\n")[0]
99 if info:
100 message = "(%s): %s" % (info, message)
101 self.output.write("FAILED %s%s: %s\n" % (
102 name, self.format_duration(duration), message))
103 if self.show_tracebacks:
104 formatted = traceback.format_exception(exc_info[0],
105 exc_info[1],
106 exc_info[2],
107 None)
108 lines = "".join(formatted).split("\n")
109 if len(lines) > 0 and len(lines[-1]) == 0:
110 lines.pop()
111 indented = "\n".join([" %s" % (line,) for line in lines])
112 self.output.write("%s\n" % (indented,))
113
114
115def main(*args):
116 """Parse arguments, then build and run checks in a reactor."""
117 parser = ArgumentParser()
118 parser.add_argument("config_file",
119 help="Config file specifying the checks to run.")
120 parser.add_argument("patterns", nargs='*',
121 help="Patterns to filter the checks.")
122 parser.add_argument("-v", "--verbose", dest="verbose",
123 action="store_true", default=False,
124 help="Show additional status")
125 parser.add_argument("-d", "--duration", dest="show_duration",
126 action="store_true", default=False,
127 help="Show duration")
128 parser.add_argument("-t", "--tracebacks", dest="show_tracebacks",
129 action="store_true", default=False,
130 help="Show tracebacks on failure")
131 parser.add_argument("--validate", dest="validate",
132 action="store_true", default=False,
133 help="Only validate the config file, don't run checks.")
134 options = parser.parse_args(list(args))
135
136 if options.patterns:
137 pattern = SumPattern(map(SimplePattern, options.patterns))
138 else:
139 pattern = SimplePattern("*")
140
141 def make_daemon_thread(*args, **kw):
142 """Create a daemon thread."""
143 thread = Thread(*args, **kw)
144 thread.daemon = True
145 return thread
146
147 threadpool = ThreadPool(minthreads=1)
148 threadpool.threadFactory = make_daemon_thread
149 reactor.threadpool = threadpool
150 reactor.callWhenRunning(threadpool.start)
151
152 output = sys.stdout
153 if options.show_duration:
154 output = TimestampOutput(output)
155
156 results = ConsoleOutput(output=output,
157 show_tracebacks=options.show_tracebacks,
158 show_duration=options.show_duration,
159 verbose=options.verbose)
160 results = FailureCountingResultWrapper(results)
161 with open(options.config_file) as f:
162 descriptions = yaml.load(f)
163 checks = build_checks(descriptions)
164 if not options.validate:
165 reactor.callWhenRunning(run_checks, checks, pattern, results)
166
167 reactor.run()
168
169 if results.any_failed():
170 return 1
171 else:
172 return 0
173
174
175def run():
176 exit(main(*sys.argv[1:]))
177
178
179if __name__ == '__main__':
180 run()
181
0182
=== added file 'conn_check/patterns.py'
--- conn_check/patterns.py 1970-01-01 00:00:00 +0000
+++ conn_check/patterns.py 2014-07-24 22:10:31 +0000
@@ -0,0 +1,153 @@
1import re
2
3class Pattern(object):
4 """Abstract base class for patterns used to select subsets of checks."""
5
6 def assume_prefix(self, prefix):
7 """Return an equivalent pattern with the given prefix baked in.
8
9 For example, if self.matches("bar") is True, then
10 self.assume_prefix("foo").matches("foobar") will be True.
11 """
12 return PrefixPattern(prefix, self)
13
14 def failed(self):
15 """Return True if the pattern cannot match any string.
16
17 This is mainly used so we can bail out early when recursing into
18 check trees.
19 """
20 return not self.prefix_matches("")
21
22 def prefix_matches(self, partial_name):
23 """Return True if the partial name (a prefix) is a potential match."""
24 raise NotImplementedError("%r.prefix_matches not implemented" %
25 type(self))
26
27 def matches(self, name):
28 """Return True if the given name matches."""
29 raise NotImplementedError("%r.match not implemented" %
30 type(self))
31
32
33class FailedPattern(Pattern):
34 """Patterns that always fail to match."""
35
36 def assume_prefix(self, prefix):
37 """Return an equivalent pattern with the given prefix baked in."""
38 return FAILED_PATTERN
39
40 def prefix_matches(self, partial_name):
41 """Return True if the partial name matches."""
42 return False
43
44 def matches(self, name):
45 """Return True if the complete name matches."""
46 return False
47
48
49FAILED_PATTERN = FailedPattern()
50
51
52PATTERN_TOKEN_RE = re.compile(r'\*|[^*]+')
53
54
55def tokens_to_partial_re(tokens):
56 """Convert tokens to a regular expression for matching prefixes."""
57
58 def token_to_re(token):
59 """Convert tokens to (begin, end, alt_end) triples."""
60 if token == '*':
61 return (r'(?:.*', ')?', ')')
62 else:
63 chars = list(token)
64 begin = "".join(["(?:" + re.escape(c) for c in chars])
65 end = "".join([")?" for c in chars])
66 return (begin, end, end)
67
68 subexprs = map(token_to_re, tokens)
69 if len(subexprs) > 0:
70 # subexpressions like (.*)? aren't accepted, so we may have to use
71 # an alternate closing form for the last (innermost) subexpression
72 (begin, _, alt_end) = subexprs[-1]
73 subexprs[-1] = (begin, alt_end, alt_end)
74 return re.compile("".join([se[0] for se in subexprs] +
75 [se[1] for se in reversed(subexprs)] +
76 [r'\Z']))
77
78
79def tokens_to_re(tokens):
80 """Convert tokens to a regular expression for exact matching."""
81
82 def token_to_re(token):
83 """Convert tokens to simple regular expressions."""
84 if token == '*':
85 return r'.*'
86 else:
87 return re.escape(token)
88
89 return re.compile("".join(map(token_to_re, tokens) + [r'\Z']))
90
91
92class SimplePattern(Pattern):
93 """Pattern that matches according to the given pattern expression."""
94
95 def __init__(self, pattern):
96 """Initialize an instance."""
97 super(SimplePattern, self).__init__()
98 tokens = PATTERN_TOKEN_RE.findall(pattern)
99 self.partial_re = tokens_to_partial_re(tokens)
100 self.full_re = tokens_to_re(tokens)
101
102 def prefix_matches(self, partial_name):
103 """Return True if the partial name matches."""
104 return self.partial_re.match(partial_name) is not None
105
106 def matches(self, name):
107 """Return True if the complete name matches."""
108 return self.full_re.match(name) is not None
109
110
111class PrefixPattern(Pattern):
112 """Pattern that assumes a previously given prefix."""
113
114 def __init__(self, prefix, pattern):
115 """Initialize an instance."""
116 super(PrefixPattern, self).__init__()
117 self.prefix = prefix
118 self.pattern = pattern
119
120 def assume_prefix(self, prefix):
121 """Return an equivalent pattern with the given prefix baked in."""
122 return PrefixPattern(self.prefix + prefix, self.pattern)
123
124 def prefix_matches(self, partial_name):
125 """Return True if the partial name matches."""
126 return self.pattern.prefix_matches(self.prefix + partial_name)
127
128 def matches(self, name):
129 """Return True if the complete name matches."""
130 return self.pattern.matches(self.prefix + name)
131
132
133class SumPattern(Pattern):
134 """Pattern that matches if at least one given pattern matches."""
135
136 def __init__(self, patterns):
137 """Initialize an instance."""
138 super(SumPattern, self).__init__()
139 self.patterns = patterns
140
141 def prefix_matches(self, partial_name):
142 """Return True if the partial name matches."""
143 for pattern in self.patterns:
144 if pattern.prefix_matches(partial_name):
145 return True
146 return False
147
148 def matches(self, name):
149 """Return True if the complete name matches."""
150 for pattern in self.patterns:
151 if pattern.matches(name):
152 return True
153 return False
0154
=== modified file 'demo.yaml'
--- demo.yaml 2014-07-24 19:30:30 +0000
+++ demo.yaml 2014-07-24 22:10:31 +0000
@@ -14,5 +14,5 @@
14 host: 127.0.0.114 host: 127.0.0.1
15 port: 637915 port: 6379
16 password: foobared16 password: foobared
17- type: url17- type: http
18 url: https://login.ubuntu.com/18 url: https://login.ubuntu.com/
1919
=== modified file 'setup.py'
--- setup.py 2014-07-24 15:20:04 +0000
+++ setup.py 2014-07-24 22:10:31 +0000
@@ -27,7 +27,7 @@
27 include_package_data=True,27 include_package_data=True,
28 entry_points={28 entry_points={
29 'console_scripts': [29 'console_scripts': [
30 'conn-check = conn_check:run',30 'conn-check = conn_check.main:run',
31 ],31 ],
32 },32 },
33 license='GPL3',33 license='GPL3',
3434
=== modified file 'tests.py'
--- tests.py 2014-07-24 21:09:27 +0000
+++ tests.py 2014-07-24 22:10:31 +0000
@@ -3,7 +3,28 @@
33
4from testtools import matchers4from testtools import matchers
55
6import conn_check6from conn_check.check_impl import (
7 FunctionCheck,
8 MultiCheck,
9 parallel_strategy,
10 PrefixCheckWrapper,
11 sequential_strategy,
12 )
13from conn_check.checks import (
14 CHECKS,
15 extract_host_port,
16 make_amqp_check,
17 make_http_check,
18 make_postgres_check,
19 make_redis_check,
20 make_ssl_check,
21 make_tcp_check,
22 make_udp_check,
23 )
24from conn_check.main import (
25 build_checks,
26 check_from_description,
27 )
728
829
9class FunctionCheckMatcher(testtools.Matcher):30class FunctionCheckMatcher(testtools.Matcher):
@@ -15,7 +36,7 @@
1536
16 def match(self, matchee):37 def match(self, matchee):
17 checks = []38 checks = []
18 checks.append(matchers.IsInstance(conn_check.FunctionCheck))39 checks.append(matchers.IsInstance(FunctionCheck))
19 checks.append(matchers.Annotate(40 checks.append(matchers.Annotate(
20 "name doesn't match",41 "name doesn't match",
21 matchers.AfterPreprocessing(operator.attrgetter('name'),42 matchers.AfterPreprocessing(operator.attrgetter('name'),
@@ -43,7 +64,7 @@
4364
44 def match(self, matchee):65 def match(self, matchee):
45 checks = []66 checks = []
46 checks.append(matchers.IsInstance(conn_check.MultiCheck))67 checks.append(matchers.IsInstance(MultiCheck))
47 checks.append(matchers.AfterPreprocessing(operator.attrgetter('strategy'),68 checks.append(matchers.AfterPreprocessing(operator.attrgetter('strategy'),
48 matchers.Is(self.strategy)))69 matchers.Is(self.strategy)))
49 checks.append(matchers.AfterPreprocessing(operator.attrgetter('subchecks'),70 checks.append(matchers.AfterPreprocessing(operator.attrgetter('subchecks'),
@@ -58,40 +79,40 @@
58class ExtractHostPortTests(testtools.TestCase):79class ExtractHostPortTests(testtools.TestCase):
5980
60 def test_basic(self):81 def test_basic(self):
61 self.assertEqual(conn_check.extract_host_port('http://localhost:80/'),82 self.assertEqual(extract_host_port('http://localhost:80/'),
62 ('localhost', 80, 'http'))83 ('localhost', 80, 'http'))
6384
64 def test_no_scheme(self):85 def test_no_scheme(self):
65 self.assertEqual(conn_check.extract_host_port('//localhost/'),86 self.assertEqual(extract_host_port('//localhost/'),
66 ('localhost', 80, 'http'))87 ('localhost', 80, 'http'))
6788
68 def test_no_port_http(self):89 def test_no_port_http(self):
69 self.assertEqual(conn_check.extract_host_port('http://localhost/'),90 self.assertEqual(extract_host_port('http://localhost/'),
70 ('localhost', 80, 'http'))91 ('localhost', 80, 'http'))
7192
72 def test_no_port_https(self):93 def test_no_port_https(self):
73 self.assertEqual(conn_check.extract_host_port('https://localhost/'),94 self.assertEqual(extract_host_port('https://localhost/'),
74 ('localhost', 443, 'https'))95 ('localhost', 443, 'https'))
7596
7697
77class ConnCheckTest(testtools.TestCase):98class ConnCheckTest(testtools.TestCase):
7899
79 def test_make_tcp_check(self):100 def test_make_tcp_check(self):
80 result = conn_check.make_tcp_check('localhost', 8080)101 result = make_tcp_check('localhost', 8080)
81 self.assertThat(result, FunctionCheckMatcher('tcp.localhost:8080', 'localhost:8080'))102 self.assertThat(result, FunctionCheckMatcher('tcp.localhost:8080', 'localhost:8080'))
82103
83 def test_make_ssl_check(self):104 def test_make_ssl_check(self):
84 result = conn_check.make_ssl_check('localhost', 8080, verify=True)105 result = make_ssl_check('localhost', 8080, verify=True)
85 self.assertThat(result, FunctionCheckMatcher('ssl.localhost:8080', 'localhost:8080'))106 self.assertThat(result, FunctionCheckMatcher('ssl.localhost:8080', 'localhost:8080'))
86107
87 def test_make_udp_check(self):108 def test_make_udp_check(self):
88 result = conn_check.make_udp_check('localhost', 8080, 'foo', 'bar')109 result = make_udp_check('localhost', 8080, 'foo', 'bar')
89 self.assertThat(result, FunctionCheckMatcher('udp.localhost:8080', 'localhost:8080'))110 self.assertThat(result, FunctionCheckMatcher('udp.localhost:8080', 'localhost:8080'))
90111
91 def test_make_http_check(self):112 def test_make_http_check(self):
92 result = conn_check.make_http_check('http://localhost/')113 result = make_http_check('http://localhost/')
93 self.assertThat(result,114 self.assertThat(result,
94 MultiCheckMatcher(strategy=conn_check.sequential_strategy,115 MultiCheckMatcher(strategy=sequential_strategy,
95 subchecks=[116 subchecks=[
96 FunctionCheckMatcher('tcp.localhost:80', 'localhost:80'),117 FunctionCheckMatcher('tcp.localhost:80', 'localhost:80'),
97 FunctionCheckMatcher('GET.http://localhost/', 'GET http://localhost/')118 FunctionCheckMatcher('GET.http://localhost/', 'GET http://localhost/')
@@ -99,9 +120,9 @@
99 ))120 ))
100121
101 def test_make_http_check_https(self):122 def test_make_http_check_https(self):
102 result = conn_check.make_http_check('https://localhost/')123 result = make_http_check('https://localhost/')
103 self.assertThat(result,124 self.assertThat(result,
104 MultiCheckMatcher(strategy=conn_check.sequential_strategy,125 MultiCheckMatcher(strategy=sequential_strategy,
105 subchecks=[126 subchecks=[
106 FunctionCheckMatcher('tcp.localhost:443', 'localhost:443'),127 FunctionCheckMatcher('tcp.localhost:443', 'localhost:443'),
107 FunctionCheckMatcher('ssl.localhost:443', 'localhost:443'),128 FunctionCheckMatcher('ssl.localhost:443', 'localhost:443'),
@@ -110,10 +131,10 @@
110 ))131 ))
111132
112 def test_make_amqp_check(self):133 def test_make_amqp_check(self):
113 result = conn_check.make_amqp_check('localhost', 8080, 'foo',134 result = make_amqp_check('localhost', 8080, 'foo',
114 'bar', use_ssl=True, vhost='/')135 'bar', use_ssl=True, vhost='/')
115 self.assertIsInstance(result, conn_check.MultiCheck)136 self.assertIsInstance(result, MultiCheck)
116 self.assertIs(result.strategy, conn_check.sequential_strategy)137 self.assertIs(result.strategy, sequential_strategy)
117 self.assertEqual(len(result.subchecks), 3)138 self.assertEqual(len(result.subchecks), 3)
118 self.assertThat(result.subchecks[0],139 self.assertThat(result.subchecks[0],
119 FunctionCheckMatcher('tcp.localhost:8080', 'localhost:8080'))140 FunctionCheckMatcher('tcp.localhost:8080', 'localhost:8080'))
@@ -122,20 +143,20 @@
122 self.assertThat(result.subchecks[2], FunctionCheckMatcher('auth', 'user foo'))143 self.assertThat(result.subchecks[2], FunctionCheckMatcher('auth', 'user foo'))
123144
124 def test_make_amqp_check_no_ssl(self):145 def test_make_amqp_check_no_ssl(self):
125 result = conn_check.make_amqp_check('localhost', 8080, 'foo',146 result = make_amqp_check('localhost', 8080, 'foo',
126 'bar', use_ssl=False, vhost='/')147 'bar', use_ssl=False, vhost='/')
127 self.assertIsInstance(result, conn_check.MultiCheck)148 self.assertIsInstance(result, MultiCheck)
128 self.assertIs(result.strategy, conn_check.sequential_strategy)149 self.assertIs(result.strategy, sequential_strategy)
129 self.assertEqual(len(result.subchecks), 2)150 self.assertEqual(len(result.subchecks), 2)
130 self.assertThat(result.subchecks[0],151 self.assertThat(result.subchecks[0],
131 FunctionCheckMatcher('tcp.localhost:8080', 'localhost:8080'))152 FunctionCheckMatcher('tcp.localhost:8080', 'localhost:8080'))
132 self.assertThat(result.subchecks[1], FunctionCheckMatcher('auth', 'user foo'))153 self.assertThat(result.subchecks[1], FunctionCheckMatcher('auth', 'user foo'))
133154
134 def test_make_postgres_check(self):155 def test_make_postgres_check(self):
135 result = conn_check.make_postgres_check('localhost', 8080,'foo',156 result = make_postgres_check('localhost', 8080,'foo',
136 'bar', 'test')157 'bar', 'test')
137 self.assertIsInstance(result, conn_check.MultiCheck)158 self.assertIsInstance(result, MultiCheck)
138 self.assertIs(result.strategy, conn_check.sequential_strategy)159 self.assertIs(result.strategy, sequential_strategy)
139 self.assertEqual(len(result.subchecks), 2)160 self.assertEqual(len(result.subchecks), 2)
140 self.assertThat(result.subchecks[0],161 self.assertThat(result.subchecks[0],
141 FunctionCheckMatcher('tcp.localhost:8080', 'localhost:8080'))162 FunctionCheckMatcher('tcp.localhost:8080', 'localhost:8080'))
@@ -143,33 +164,33 @@
143 FunctionCheckMatcher('auth', 'user foo', blocking=True))164 FunctionCheckMatcher('auth', 'user foo', blocking=True))
144165
145 def test_make_postgres_check_local_socket(self):166 def test_make_postgres_check_local_socket(self):
146 result = conn_check.make_postgres_check('/local.sock', 8080,'foo',167 result = make_postgres_check('/local.sock', 8080,'foo',
147 'bar', 'test')168 'bar', 'test')
148 self.assertIsInstance(result, conn_check.MultiCheck)169 self.assertIsInstance(result, MultiCheck)
149 self.assertIs(result.strategy, conn_check.sequential_strategy)170 self.assertIs(result.strategy, sequential_strategy)
150 self.assertEqual(len(result.subchecks), 1)171 self.assertEqual(len(result.subchecks), 1)
151 self.assertThat(result.subchecks[0],172 self.assertThat(result.subchecks[0],
152 FunctionCheckMatcher('auth', 'user foo', blocking=True))173 FunctionCheckMatcher('auth', 'user foo', blocking=True))
153174
154 def test_make_redis_check(self):175 def test_make_redis_check(self):
155 result = conn_check.make_redis_check('localhost', 8080)176 result = make_redis_check('localhost', 8080)
156 self.assertIsInstance(result, conn_check.PrefixCheckWrapper)177 self.assertIsInstance(result, PrefixCheckWrapper)
157 self.assertEqual(result.prefix, 'redis.')178 self.assertEqual(result.prefix, 'redis.')
158 wrapped = result.wrapped179 wrapped = result.wrapped
159 self.assertIsInstance(wrapped, conn_check.MultiCheck)180 self.assertIsInstance(wrapped, MultiCheck)
160 self.assertIs(wrapped.strategy, conn_check.sequential_strategy)181 self.assertIs(wrapped.strategy, sequential_strategy)
161 self.assertEqual(len(wrapped.subchecks), 2)182 self.assertEqual(len(wrapped.subchecks), 2)
162 self.assertThat(wrapped.subchecks[0],183 self.assertThat(wrapped.subchecks[0],
163 FunctionCheckMatcher('tcp.localhost:8080', 'localhost:8080'))184 FunctionCheckMatcher('tcp.localhost:8080', 'localhost:8080'))
164 self.assertThat(wrapped.subchecks[1], FunctionCheckMatcher('connect', None))185 self.assertThat(wrapped.subchecks[1], FunctionCheckMatcher('connect', None))
165186
166 def test_make_redis_check_with_password(self):187 def test_make_redis_check_with_password(self):
167 result = conn_check.make_redis_check('localhost', 8080, 'foobar')188 result = make_redis_check('localhost', 8080, 'foobar')
168 self.assertIsInstance(result, conn_check.PrefixCheckWrapper)189 self.assertIsInstance(result, PrefixCheckWrapper)
169 self.assertEqual(result.prefix, 'redis.')190 self.assertEqual(result.prefix, 'redis.')
170 wrapped = result.wrapped191 wrapped = result.wrapped
171 self.assertIsInstance(wrapped, conn_check.MultiCheck)192 self.assertIsInstance(wrapped, MultiCheck)
172 self.assertIs(wrapped.strategy, conn_check.sequential_strategy)193 self.assertIs(wrapped.strategy, sequential_strategy)
173 self.assertEqual(len(wrapped.subchecks), 2)194 self.assertEqual(len(wrapped.subchecks), 2)
174 self.assertThat(wrapped.subchecks[0],195 self.assertThat(wrapped.subchecks[0],
175 FunctionCheckMatcher('tcp.localhost:8080', 'localhost:8080'))196 FunctionCheckMatcher('tcp.localhost:8080', 'localhost:8080'))
@@ -178,28 +199,28 @@
178199
179 def test_check_from_description_unknown_type(self):200 def test_check_from_description_unknown_type(self):
180 e = self.assertRaises(AssertionError,201 e = self.assertRaises(AssertionError,
181 conn_check.check_from_description, {'type': 'foo'})202 check_from_description, {'type': 'foo'})
182 self.assertEqual(203 self.assertEqual(
183 str(e),204 str(e),
184 "Unknown check type: foo, available checks: {}".format(conn_check.CHECKS.keys()))205 "Unknown check type: foo, available checks: {}".format(CHECKS.keys()))
185206
186 def test_check_from_description_missing_arg(self):207 def test_check_from_description_missing_arg(self):
187 description = {'type': 'tcp'}208 description = {'type': 'tcp'}
188 e = self.assertRaises(AssertionError,209 e = self.assertRaises(AssertionError,
189 conn_check.check_from_description, description)210 check_from_description, description)
190 self.assertEqual(211 self.assertEqual(
191 str(e),212 str(e),
192 "host missing from check: {}".format(description))213 "host missing from check: {}".format(description))
193214
194 def test_check_from_description_makes_check(self):215 def test_check_from_description_makes_check(self):
195 description = {'type': 'tcp', 'host': 'localhost', 'port': '8080'}216 description = {'type': 'tcp', 'host': 'localhost', 'port': '8080'}
196 result = conn_check.check_from_description(description)217 result = check_from_description(description)
197 self.assertThat(result,218 self.assertThat(result,
198 FunctionCheckMatcher('tcp.localhost:8080', 'localhost:8080'))219 FunctionCheckMatcher('tcp.localhost:8080', 'localhost:8080'))
199220
200 def test_build_checks(self):221 def test_build_checks(self):
201 description = [{'type': 'tcp', 'host': 'localhost', 'port': '8080'}]222 description = [{'type': 'tcp', 'host': 'localhost', 'port': '8080'}]
202 result = conn_check.build_checks(description)223 result = build_checks(description)
203 self.assertThat(result,224 self.assertThat(result,
204 MultiCheckMatcher(strategy=conn_check.parallel_strategy,225 MultiCheckMatcher(strategy=parallel_strategy,
205 subchecks=[FunctionCheckMatcher('tcp.localhost:8080', 'localhost:8080')]))226 subchecks=[FunctionCheckMatcher('tcp.localhost:8080', 'localhost:8080')]))

Subscribers

People subscribed via source and target branches