Merge lp:~jimbaker/juju-jitsu/watch-subcommand into lp:juju-jitsu

Proposed by Jim Baker
Status: Merged
Merge reported by: Mark Mims
Merged at revision: not available
Proposed branch: lp:~jimbaker/juju-jitsu/watch-subcommand
Merge into: lp:juju-jitsu
Diff against target: 702 lines (+678/-0)
5 files modified
sub-commands/aiki/cli.py (+108/-0)
sub-commands/aiki/twistutils.py (+20/-0)
sub-commands/aiki/watcher.py (+277/-0)
sub-commands/topodump (+31/-0)
sub-commands/watch (+242/-0)
To merge this branch: bzr merge lp:~jimbaker/juju-jitsu/watch-subcommand
Reviewer: Mark Mims (community)
Status: Approve
Review via email: mp+109438@code.launchpad.net

Description of the change

Adds a new jitsu subcommand, watch. It uses the Juju API directly where possible, and falls back to ZooKeeper (ZK) otherwise, to wait for a set of conditions about a Juju environment to become true. It does not time out on its own; wrap it in the core timeout command if a time limit is desired.
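
As a minimal sketch of the "no built-in timeout" design, a test harness could build the command line itself and prepend the core timeout command only when a limit is wanted. The helper name watch_argv and its parameters are hypothetical (they are not part of this branch), and the sketch is written for Python 3, unlike the Python 2 code in the diff. It only builds the argument list; it does not run jitsu.

```python
def watch_argv(condition_args, timeout=None):
    """Build an argv list for `jitsu watch` (hypothetical helper).

    condition_args: tokens exactly as they would be typed after `jitsu watch`.
    timeout: optional limit in whole seconds; when given, the command is
    wrapped in the coreutils `timeout` command, as the description suggests.
    """
    argv = ["jitsu", "watch"] + list(condition_args)
    if timeout is not None:
        # `timeout 60s CMD...` stops CMD if it is still running after 60s.
        argv = ["timeout", "%ds" % timeout] + argv
    return argv


# Mirrors the `timeout 60s jitsu watch mysql/0 --state=started` example.
print(watch_argv(["mysql/0", "--state=started"], timeout=60))
```

The resulting list can be handed to subprocess.call or similar. GNU timeout reports an expired limit with exit status 124, so a caller can tell "condition never became true in time" apart from other failures.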

The help gives the full flavor of what can be done:

usage: watch [-h] [-e ENVIRONMENT]
           [--loglevel CRITICAL|ERROR|WARNING|INFO|DEBUG] [--verbose]
           [--any] [--number]
           CONDITION [CONDITION ...]

Watch Juju environment for condition(s) to become true, waiting as necessary.

Each condition is specified as follows:

  INSTANCE              Either service, relation, or service unit. For a
                        relation, either its relation id (example: db:0) or
                        descriptors can be used (examples: "mysql wordpress"
                        or "mysql:db wordpress:db").

  -n NUM, --num-units NUM
                        Number of units; only applies to services
  -r RELATION, --relation RELATION
                        Specify relation for unit or service
  --setting SETTING     Relation setting exists
  --state STATE         State of unit or for at least all NUM units of service
  --x-state STATE       Cannot be in this state
  [NAME=SETTING [NAME=SETTING ... ]]
                        These settings must be as specified for the unit
                        or for at least all NUM units of a service

optional arguments:
  -h, --help            show this help message and exit
  -e ENVIRONMENT, --environment ENVIRONMENT
                        Environment to act upon (otherwise uses default)
  --loglevel CRITICAL|ERROR|WARNING|INFO|DEBUG
                        Log level
  --verbose, -v         Enable verbose logging
  --any                 Any of the conditions may be true
  --number              Number output by the corresponding condition

Examples of watches with single conditions:

$ relation_id=`jitsu watch "wordpress mysql"` # waits until relation available and prints id; must be quoted
$ relation_id=`jitsu watch "wordpress:db mysql:app"` # like above, but with fully specified descriptors
$ jitsu watch mysql # service is deployed
$ jitsu watch mysql/0 --state=started # unit is started
$ timeout 60s jitsu watch mysql/0 --state=started # watch up to 60s for mysql/0 to be running, using core timeout command
$ jitsu watch -r db:0 mysql/0 foo=bar # watch for foo to be set to bar on mysql/0 on relation id of db:0
$ jitsu watch mysql/0 -r "wordpress mysql" foo=bar # watch for wordpress<->mysql, then watch for this setting
$ jitsu watch mysql/0 -r db:0 foo= # watch for foo to be unset
$ jitsu watch mysql/0 -r db:0 --setting=foo # watch for foo to be set to *some* value

Multiple conditions can be combined:

$ jitsu watch mysql/0 -r "wordpress mysql" foo= mysql/1 -r "wordpress mysql" foo=bar # all conditions must apply
$ jitsu --any watch ... # any of the conditions may apply

Mark Mims (mark-mims) wrote :

looks great... does exactly what I need for testing

review: Approve

Preview Diff

=== added file 'sub-commands/aiki/cli.py'
--- sub-commands/aiki/cli.py 1970-01-01 00:00:00 +0000
+++ sub-commands/aiki/cli.py 2012-06-08 21:25:21 +0000
@@ -0,0 +1,108 @@
+import argparse
+import logging
+import sys
+import traceback
+
+
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks
+import yaml
+import zookeeper
+
+from aiki.introspect import get_zk_client_connector
+
+
+LOG_LEVELS = dict(
+    CRITICAL=logging.CRITICAL,
+    ERROR=logging.ERROR,
+    WARNING=logging.WARNING,
+    INFO=logging.INFO,
+    DEBUG=logging.DEBUG)
+
+
+def make_arg_parser(*args, **kwargs):
+    parser = argparse.ArgumentParser(*args, **kwargs)
+    parser.add_argument(
+        "-e", "--environment", default=None, help="Environment to act upon (otherwise uses default)", metavar="ENVIRONMENT")
+    parser.add_argument(
+        "--loglevel", default=None, choices=LOG_LEVELS, help="Log level",
+        metavar="CRITICAL|ERROR|WARNING|INFO|DEBUG")
+    parser.add_argument("--verbose", "-v", default=False, action="store_true", help="Enable verbose logging")
+    return parser
+
+
+def setup_logging(options):
+    options.loglevel = LOG_LEVELS.get(options.loglevel)
+    if options.verbose:
+        if options.loglevel is None:
+            options.loglevel = logging.DEBUG
+    else:
+        # Avoid potentially voluminous ZK debug logging
+        zookeeper.set_debug_level(0)
+        if options.loglevel is None:
+            options.loglevel = logging.INFO
+
+    log_options = {
+        "level": options.loglevel,
+        "format": "%(asctime)s %(name)s:%(levelname)s %(message)s"
+        }
+    logging.basicConfig(**log_options)
+
+
+def run_command(command, options):
+    """Connect to ZK, either from admin machine or on a Juju machine, then run command"""
+    # Introspect to find the right client connector; this will be
+    # called, then yielded upon, to get a ZK client once async code is
+    # entered
+    try:
+        connector = get_zk_client_connector(options)
+    except Exception, e:
+        print >> sys.stderr, e
+        sys.exit(1)
+
+    exit_code = [0]
+    reactor.callWhenRunning(_run_command_in_reactor, command, exit_code, connector, options)
+    reactor.run()
+    sys.exit(exit_code[0])
+
+
+@inlineCallbacks
+def _run_command_in_reactor(command, exit_code, connector, options):
+    try:
+        yield _wrapped_command(command, exit_code, connector, options)
+    except Exception, e:
+        exit_code[0] = 1
+        if options.verbose:
+            traceback.print_exc()  # Writes to stderr
+        else:
+            print >> sys.stderr, e
+    finally:
+        reactor.stop()
+
+
+@inlineCallbacks
+def _wrapped_command(command, result, connector, options):
+    client = yield connector()
+    try:
+        yield command(result, client, options)
+    finally:
+        yield client.close()
+
+
+def parse_relation_settings(args):
+    """Given args, return a list of the remaining args along with settings"""
+    settings = {}
+    kv_iter = iter(args)
+    remaining_args = []
+    for kv in kv_iter:
+        if "=" not in kv:
+            remaining_args = [kv]
+            break
+        k, v = kv.split("=", 1)
+        if v.startswith("@"):
+            filename = v[1:]
+            with open(filename, "r") as f:
+                v = f.read()
+        settings[k] = yaml.safe_load(v)
+    remaining_args.extend(kv_iter)
+    return settings, remaining_args

=== added file 'sub-commands/aiki/twistutils.py'
--- sub-commands/aiki/twistutils.py 1970-01-01 00:00:00 +0000
+++ sub-commands/aiki/twistutils.py 2012-06-08 21:25:21 +0000
@@ -0,0 +1,20 @@
+from twisted.internet.defer import Deferred
+
+
+class CountDownLatch(object):
+    # NOTE For now, we only support successful callbacks, not errbacks
+    def __init__(self, count, deferred_list=()):
+        self.count = count
+        self.completed = Deferred()
+        for d in deferred_list:
+            self.add(d)
+
+    def add(self, deferred):
+        deferred.addCallback(self.count_down)
+
+    def count_down(self, *ignored):
+        if self.count > 0:
+            self.count -= 1
+        if self.count == 0:
+            if not self.completed.called:
+                self.completed.callback(None)  # Maybe collect together these results?

=== added file 'sub-commands/aiki/watcher.py'
--- sub-commands/aiki/watcher.py 1970-01-01 00:00:00 +0000
+++ sub-commands/aiki/watcher.py 2012-06-08 21:25:21 +0000
@@ -0,0 +1,277 @@
+import logging
+
+from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
+from zookeeper import NoNodeException
+import yaml
+
+from juju.state.base import StateBase
+from juju.state.relation import RelationStateManager, ServiceRelationState
+from juju.state.service import ServiceStateManager, parse_service_name
+from juju.state.errors import (
+    RelationStateNotFound, ServiceStateNotFound, ServiceUnitStateNotFound, InvalidRelationIdentity,
+    StopWatcher, NoMatchingEndpoints, AmbiguousRelation)
+from juju.unit.workflow import WorkflowStateClient
+
+
+class AnySetting(object):
+
+    def __str__(self):
+        return "<Any setting>"
+
+    __repr__ = __str__
+
+ANY_SETTING = AnySetting()
+
+
+class Watcher(StateBase):
+
+    def __init__(self, client):
+        super(Watcher, self).__init__(client)
+        self.log = logging.getLogger("aiki.watcher")
+        self.relation_state_manager = RelationStateManager(self._client)
+        self.service_state_manager = ServiceStateManager(self._client)
+
+    @inlineCallbacks
+    def wait_for_relation(self, *descriptors):
+        """Wait for a relation described by `descriptors` to exist in the topology"""
+        done = Deferred()
+
+        @inlineCallbacks
+        def expected(*ignored):
+            try:
+                relation_id = yield self.get_relation_id_from_descriptors(*descriptors)
+                relation_ident = yield self.get_relation_ident(descriptors[0], relation_id)
+                done.callback(relation_ident)
+                raise StopWatcher()
+            except (ServiceStateNotFound, RelationStateNotFound):
+                self.log.debug("Topology changed but continuing to wait for relation: %s" % (descriptors,))
+
+        yield self._watch_topology(expected)
+        returnValue((yield done))
+
+    @inlineCallbacks
+    def wait_for_service(self, service_name):
+        """Wait for `service_name` to exist in the topology"""
+        done = Deferred()
+
+        @inlineCallbacks
+        def expected(*ignored):
+            try:
+                yield self.service_state_manager.get_service_state(service_name)
+                done.callback(service_name)
+                raise StopWatcher()
+            except ServiceStateNotFound:
+                self.log.debug("Topology changed, still waiting for existence of service: %s" % service_name)
+
+        yield self._watch_topology(expected)
+        returnValue((yield done))
+
+    @inlineCallbacks
+    def wait_for_unit(self, unit_name):
+        """Wait for `unit_name` to exist in the topology"""
+        done = Deferred()
+
+        @inlineCallbacks
+        def expected(*ignored):
+            try:
+                service_name = parse_service_name(unit_name)
+                service = yield self.service_state_manager.get_service_state(service_name)
+                yield service.get_unit_state(unit_name)
+                done.callback(service_name)
+                raise StopWatcher()
+            except (ServiceUnitStateNotFound, ServiceStateNotFound):
+                self.log.debug("Topology changed, still waiting for existence of unit: %s" % unit_name)
+
+        yield self._watch_topology(expected)
+        returnValue((yield done))
+
+    @inlineCallbacks
+    def watch_new_service_units(self, service_name, cb):
+        """As new service units are added to service_name, calls the `cb` function for each unit."""
+
+        service = yield self.service_state_manager.get_service_state(service_name)
+        watched_units = set()
+
+        @inlineCallbacks
+        def check(*ignored):
+            try:
+                units = yield service.get_all_unit_states()
+                for unit in units:
+                    # NOTE ignore units that have gone missing for the
+                    # time being; from actual testing, this seems
+                    # unlikely, but we might need to do more elaborate
+                    # canceling (StopWatcher). Regardless, that's what
+                    # would do with juju, but this is jitsu.
+                    if unit.unit_name not in watched_units:
+                        watched_units.add(unit.unit_name)
+
+                        from twisted.internet import reactor
+                        reactor.callLater(0, cb, unit.unit_name)
+            except ServiceStateNotFound:
+                self.log.warn("Service is no longer in topology: %s" % service_name)
+                raise StopWatcher()
+
+        yield self._watch_topology(check)
+
+    def setup_zk_watch(self, path, callback):
+        """Returns a Deferred"""
+
+        @inlineCallbacks
+        def manage_callback(*ignored):
+            # Need to guard on the client being connected in the case
+            # 1) a watch is waiting to run (in the reactor);
+            # 2) and the connection is closed.
+            #
+            # It remains the responsibility of `callback` to raise
+            # `StopWatcher`, per above.
+            if not self._client.connected:
+                returnValue(None)
+            exists_d, watch_d = self._client.exists_and_watch(path)
+            stat = yield exists_d
+            exists = bool(stat)
+            if exists:
+                try:
+                    yield callback(exists)
+                except StopWatcher:
+                    returnValue(None)
+                except NoNodeException, e:
+                    # This may occur if the callback is trying to process
+                    # data related to this path, so just ignore
+                    self.log.debug("Ignoring no node exception when watching %s: %s", path, e)
+
+            watch_d.addCallback(manage_callback)
+
+        return manage_callback()
+
+    def get_relation_id(self, relation_ident):
+        """Return the (internal) relation id for `relation_ident`."""
+        # NOTE may want to refactor by adding to juju.state.relation
+        parts = relation_ident.split(":")
+        if len(parts) != 2 or not parts[1].isdigit():
+            raise InvalidRelationIdentity(relation_ident)
+        relation_name, normalized_id = parts
+        relation_id = "%s-%s" % ("relation", normalized_id.zfill(10))
+        if not self.topology.has_relation(relation_id):
+            raise RelationStateNotFound()
+        return relation_id
+
+    @inlineCallbacks
+    def get_relations(self, service_name):
+        """Get the relations associated to `service_name`."""
+        relations = []
+        service_state_manager = ServiceStateManager(self._client)
+        self.topology = yield self._read_topology()
+        service = yield service_state_manager.get_service_state(service_name)
+        internal_service_id = service.internal_id
+        for info in self.topology.get_relations_for_service(
+                internal_service_id):
+            service_info = info["service"]
+            relations.append(
+                ServiceRelationState(
+                    self._client,
+                    internal_service_id,
+                    info["relation_id"],
+                    info["scope"],
+                    **service_info))
+        returnValue(relations)
+
+    @inlineCallbacks
+    def get_relation_ident(self, descriptor, relation_id):
+        parts = descriptor.split(":", 1)
+        service_name = parts[0]
+        relations = yield self.get_relations(service_name)
+        relation_ident = [r.relation_ident for r in relations if r.internal_relation_id == relation_id][0]
+        returnValue(relation_ident)
+
+    @inlineCallbacks
+    def get_relation_id_from_descriptors(self, *descriptors):
+        endpoint_pairs = yield self.service_state_manager.join_descriptors(*descriptors)
+        if len(endpoint_pairs) == 0:
+            raise NoMatchingEndpoints()
+        elif len(endpoint_pairs) > 1:
+            for pair in endpoint_pairs[1:]:
+                if not (pair[0].relation_name.startswith("juju-") or
+                        pair[1].relation_name.startswith("juju-")):
+                    raise AmbiguousRelation(descriptors, endpoint_pairs)
+        endpoints = endpoint_pairs[0]
+        if endpoints[0] == endpoints[1]:
+            endpoints = endpoints[0:1]
+
+        # so this needs to be put in the context of a continual watch until the topology is in effect
+        relation_state = yield self.relation_state_manager.get_relation_state(*endpoints)
+        returnValue(relation_state.internal_id)
+
+    @inlineCallbacks
+    def watch_unit_settings(self, unit_name, relation_ident, expected_settings):
+        """Finally returns when unit_name has the expected settings, if ever"""
+        # NOTE may want to refactor by adding to juju.state.relation
+        # the generic settings path logic
+        self.topology = topology = yield self._read_topology()  # TODO refactor one of these assignments out
+        relation_id = self.get_relation_id(relation_ident)
+        unit_id = topology.get_service_unit_id_from_name(unit_name)
+        container = topology.get_service_unit_container(unit_id)
+        container_info = "%s/" % container[-1] if container else ""
+        settings_path = "/relations/%s/%ssettings/%s" % (relation_id, container_info, unit_id)
+        done = Deferred()
+
+        @inlineCallbacks
+        def expected(*ignored):
+            content_yaml, stat = yield self._client.get(settings_path)
+            content = yaml.load(content_yaml)
+            for K, V in expected_settings.iteritems():
+                if V is None:
+                    # key should *not* have a setting
+                    if K in content:
+                        break
+                elif V is ANY_SETTING:
+                    if K not in content:
+                        break
+                elif content.get(K) != V:
+                    break
+            else:
+                done.callback(content)
+                raise StopWatcher()
+
+        yield self.setup_zk_watch(settings_path, expected)
+        settings = yield done
+        returnValue(settings)
+
+    @inlineCallbacks
+    def get_unit_agent_state(self, unit):
+        # NOTE extracted from juju.control.status
+        unit_workflow_client = WorkflowStateClient(self._client, unit)
+        workflow_state = yield unit_workflow_client.get_state()
+        if not workflow_state:
+            agent_state = "pending"
+        else:
+            unit_connected = yield unit.has_agent()
+            agent_state = workflow_state.replace("_", "-") if unit_connected else "down"
+        returnValue(agent_state)
+
+    @inlineCallbacks
+    def wait_for_unit_state(self, unit_name, expected_states, excluded_states):
+        service_name = parse_service_name(unit_name)
+        service = yield self.service_state_manager.get_service_state(service_name)
+        unit = yield service.get_unit_state(unit_name)
+        agent_path = [None]
+        done = Deferred()
+
+        @inlineCallbacks
+        def expected(*ignored):
+            if done.called:
+                raise StopWatcher()
+            agent_state = yield self.get_unit_agent_state(unit)
+            self.log.debug("%s has state: %s", unit_name, agent_state)
+            if agent_state in expected_states or (excluded_states and agent_state not in excluded_states):
+                if not done.called:
+                    done.callback(agent_state)
+                raise StopWatcher()
+
+        # Watch both the agent ephemeral plus the workflow state - the
+        # agent state is a composite of the two
+        yield self.setup_zk_watch(unit._get_agent_path(), expected)
+        unit_workflow_client = WorkflowStateClient(self._client, unit)
+        yield self.setup_zk_watch(unit_workflow_client.zk_state_path, expected)
+
+        agent_state = yield done
+        returnValue(agent_state)

=== added file 'sub-commands/topodump'
--- sub-commands/topodump 1970-01-01 00:00:00 +0000
+++ sub-commands/topodump 2012-06-08 21:25:21 +0000
@@ -0,0 +1,31 @@
+#!/usr/bin/env python
+
+from twisted.internet.defer import inlineCallbacks
+from aiki.cli import make_arg_parser, setup_logging, run_command
+from juju.state.base import StateBase
+
+
+def main():
+    parser = make_arg_parser()
+    options = parser.parse_args()
+    setup_logging(options)
+    print "options", options
+    run_command(topodump, options)
+
+
+@inlineCallbacks
+def topodump(result, client, options):
+    Dumper = TopoDump(client)
+    yield Dumper.dump()
+
+
+class TopoDump(StateBase):
+
+    @inlineCallbacks
+    def dump(self):
+        topology = yield self._read_topology()
+        print topology.dump()
+
+
+if __name__ == '__main__':
+    main()

=== added file 'sub-commands/watch'
--- sub-commands/watch 1970-01-01 00:00:00 +0000
+++ sub-commands/watch 2012-06-08 21:25:21 +0000
@@ -0,0 +1,242 @@
+#!/usr/bin/env python
+
+import argparse
+import logging
+import textwrap
+
+import yaml
+from twisted.internet.defer import inlineCallbacks, DeferredList, returnValue, succeed
+
+from aiki.cli import make_arg_parser, setup_logging, run_command, parse_relation_settings
+from aiki.twistutils import CountDownLatch
+from aiki.watcher import Watcher, ANY_SETTING
+
+
+log = logging.getLogger("watch")
+
+
+def main():
+    options = make_watch_parser()
+    setup_logging(options)
+    run_command(watch, options)
+
+
+def make_watch_parser():
+    condition_parser = argparse.ArgumentParser(description="Parses each condition")
+    condition_parser.add_argument(
+        "-n", "--num-units", default=None, type=int, metavar="NUM", help="Number of units; only applies to services")
+    condition_parser.add_argument("-r", "--relation", default=None, metavar="RELATION", help="Specify relation")
+    condition_parser.add_argument("--setting", default=[], action="append", metavar="SETTING", help="Relation setting exists")
+    condition_parser.add_argument(
+        "--state", default=[], dest="states", action="append", metavar="STATE", help="State of service or a unit")
+    condition_parser.add_argument(
+        "--x-state", default=[], dest="excluded_states", action="append", metavar="STATE", help="Cannot be in this state")
+    condition_parser.add_argument("rest", nargs=argparse.REMAINDER)
+
+    # If updating condition_parser, it can be useful to do
+    # condition_parser.format_help() to get the usage info for
+    # CONDITION; some manual editing will be required however.
+
+    main_parser = make_arg_parser(
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        usage=textwrap.dedent("""\
+        watch [-h] [-e ENVIRONMENT]
+                   [--loglevel CRITICAL|ERROR|WARNING|INFO|DEBUG] [--verbose]
+                   [--any] [--number]
+                   CONDITION [CONDITION ...]
+
+        Watch Juju environment for condition(s) to become true, waiting as necessary.
+
+        Each condition is specified as follows:
+
+          INSTANCE              Either service, relation, or service unit. For a
+                                relation, either its relation id (example: db:0) or
+                                descriptors can be used (examples: "mysql wordpress"
+                                or "mysql:db wordpress:db").
+
+          -n NUM, --num-units NUM
+                                Number of units; only applies to services
+          -r RELATION, --relation RELATION
+                                Specify relation for unit or service
+          --setting SETTING     Relation setting exists
+          --state STATE         State of unit or for at least all NUM units of service
+          --x-state STATE       Cannot be in this state
+          [NAME=SETTING [NAME=SETTING ... ]]
+                                These settings must be as specified for the unit
+                                or for at least all NUM units of a service
+        """),
+        epilog=textwrap.dedent("""\
+        Examples of watches with single conditions:
+
+        $ relation_id=`jitsu watch "wordpress mysql"` # waits until relation available and prints id; must be quoted
+        $ relation_id=`jitsu watch "wordpress:db mysql:app"` # like above, but with fully specified descriptors
+        $ jitsu watch mysql # service is deployed
+        $ jitsu watch mysql/0 --state=started # unit is started
+        $ timeout 60s jitsu watch mysql/0 --state=started # watch up to 60s for mysql/0 to be running, using core timeout command
+        $ jitsu watch -r db:0 mysql/0 foo=bar # watch for foo to be set to bar on mysql/0 on relation id of db:0
+        $ jitsu watch mysql/0 -r "wordpress mysql" foo=bar # watch for wordpress<->mysql, then watch for this setting
+        $ jitsu watch mysql/0 -r db:0 foo= # watch for foo to be unset
+        $ jitsu watch mysql/0 -r db:0 --setting=foo # watch for foo to be set to *some* value
+
+        Multiple conditions can be combined:
+
+        $ jitsu watch mysql/0 -r "wordpress mysql" foo= mysql/1 -r "wordpress mysql" foo=bar # all conditions must apply
+        $ jitsu --any watch ... # any of the conditions may apply
+        """))
+    main_parser.add_argument("--any", default=False, action="store_true", help="Any of the conditions may be true")
+    main_parser.add_argument("--number", default=False, action="store_true", help="Number output by the corresponding condition")
+
+    options, condition_args = main_parser.parse_known_args()
+
+    # Partially parse (using remainder args) multiple times working
+    # left to right to get conditions until all original args are
+    # consumed
+    options.conditions = []
+    count = 0
+    while condition_args:
+        instance = condition_args.pop(0)
+        condition = condition_parser.parse_args(condition_args)
+        condition.instance = instance
+        condition.count = count
+        options.conditions.append(condition)  # Just collect together
+        condition.settings, condition_args = parse_relation_settings(condition.rest)
+        for setting in condition.setting:
+            condition.settings[setting] = ANY_SETTING
+        condition.states = set(condition.states)
+        condition.excluded_states = set(condition.excluded_states)
+        count += 1
+    return options
+
+
+def wait_for_results(deferreds, fireOnOneCallback):
+    d = DeferredList(deferreds, fireOnOneCallback=fireOnOneCallback, fireOnOneErrback=True, consumeErrors=True)
+    d.addCallback(lambda r: [x[1] for x in r])
+
+    def get_errors(f):
+        # Avoid spurious errors seen in closing connections; we don't care!
+        if not d.called:
+            return f.value.subFailure
+
+    d.addErrback(get_errors)
+    return d
+
+
+@inlineCallbacks
+def watch(result, client, options):
+    wait_for_conditions = []
+    for condition in options.conditions:
+        wait_for_conditions.append(parse_and_wait_on_condition(client, options, condition))
+    yield wait_for_results(wait_for_conditions, options.any)
+
+
+def print_result(options, condition, result):
+    """Given conditions complete asynchronously, the user can specify numbering to disambiguate"""
+    if options.number:
+        print condition.count,
+    print result
+
+
+@inlineCallbacks
+def parse_and_wait_on_condition(client, options, condition):
+    """Given a condition description from argparse, creates a watcher for it"""
+    watcher = Watcher(client)
+
+    # Disambiguate the following cases: relation, unit, or service:
+    # 1. Relation
+    if len(condition.instance.split()) >= 2:
+        # Only handle non-peer relations, since peer relations are automatically established anyway;
+        # also handle error case of a relation between more than 2 services
+        relation_ident = yield wait_for_relation(client, watcher, options, condition, condition.instance)
+        print_result(options, condition, relation_ident)
+        return
+
+    # 2. Service unit
+    if "/" in condition.instance:
+        yield wait_for_unit(client, watcher, options, condition, condition.instance)
+        print_result(options, condition, condition.instance)
+        return
+
+    # 3. Service
+    yield wait_for_service(client, watcher, options, condition)
+    print_result(options, condition, condition.instance)
+
+
+@inlineCallbacks
+def wait_for_unit(client, watcher, options, condition, unit_name, relation_ident=None):
+    log.info("Waiting for unit %s...", unit_name)
+    yield watcher.wait_for_unit(unit_name)
+    if condition.states or condition.excluded_states:
+        log.info("Waiting for unit %s to be in %s and not in %s",
+                 unit_name, sorted(condition.states), sorted(condition.excluded_states))
+        agent_state = yield watcher.wait_for_unit_state(unit_name, condition.states, condition.excluded_states)
+        log.info("Completed waiting for unit %s", unit_name)
+
+    if condition.relation:
+        if relation_ident is None:
+            relation_ident = yield wait_for_relation(client, watcher, options, condition)
+    if condition.settings:
+        log.info("Waiting for %s: settings %s", unit_name, condition.settings)
+        settings = yield watcher.watch_unit_settings(unit_name, relation_ident, condition.settings)
+        log.info("Completed waiting for %s: expected %s, actual %s",
+                 unit_name, condition.settings, settings)
+    returnValue(unit_name)
+
+
+@inlineCallbacks
+def wait_for_relation(client, watcher, options, condition, relation=None):
+    """Return relation ident corresponding to the condition or relation (if specified), waiting as necessary"""
+
+    if relation is None:
+        relation = condition.relation
+
+    if ":" in relation:
+        parts = relation.split(":")
+        if len(parts) == 2 and parts[1].isdigit():
+            relation_ident = relation
+            returnValue(relation_ident)
+
+    # Otherwise wait for it:
+    descriptors = relation.split()
+    if len(descriptors) == 1 or len(descriptors) == 2:
+        log.info("Waiting for %s...", format_descriptors(*descriptors))
+        relation_ident = yield watcher.wait_for_relation(*descriptors)
+        log.info("Completed waiting for %s", format_descriptors(*descriptors))
+        returnValue(relation_ident)
+
+    # log.error("Bad relation: %s" % relation)
+    raise ValueError("Bad relation: %s" % relation)
+
+
+@inlineCallbacks
+def wait_for_service(client, watcher, options, condition):
+    """Return service name when successfully waited"""
+    log.info("Waiting for %s%s service...",
+             "%d unit(s) of " % condition.num_units if condition.num_units else "",
+             condition.instance)
+    yield watcher.wait_for_service(condition.instance)
+    log.info("Completed waiting for %s service", condition.instance)
+    if condition.relation:
+        relation_ident = yield wait_for_relation(client, watcher, options, condition)
+    else:
+        relation_ident = None
+    if condition.num_units:
+        latch = CountDownLatch(condition.num_units)
+
+        def new_unit_cb(unit_name):
+            # NOTE Could also presumably cancel any removed units, consider that for a future refactoring
+            return succeed(latch.add(wait_for_unit(client, watcher, options, condition, unit_name, relation_ident)))
+
+        yield watcher.watch_new_service_units(condition.instance, new_unit_cb)
+        yield latch.completed
+    returnValue(condition.instance)
+
+
+def format_descriptors(*descriptors):
+    if len(descriptors) == 1:
+        return "peer relation %s" % descriptors[0]
+    else:
+        return "relation between " + " and ".join(sorted(descriptors))
+
+
+if __name__ == '__main__':
+    main()
