Merge lp:~themue/pyjuju/go-state-more-unit-watchers into lp:pyjuju/go

Proposed by Frank Mueller
Status: Rejected
Rejected by: Gustavo Niemeyer
Proposed branch: lp:~themue/pyjuju/go-state-more-unit-watchers
Merge into: lp:pyjuju/go
Diff against target: 1044 lines (+718/-102)
6 files modified
state/state.go (+6/-0)
state/state_test.go (+1/-1)
state/unit.go (+246/-16)
state/watch_test.go (+191/-19)
state/watcher/watcher.go (+115/-0)
state/watcher/watcher_test.go (+159/-66)
To merge this branch: bzr merge lp:~themue/pyjuju/go-state-more-unit-watchers
Reviewer          Review Type  Date Requested  Status
Juju Engineering                               Pending
Review via email: mp+101760@code.launchpad.net

Description of the change

More watchers for Unit.

Adding of NeedsUpgradeWatcher, ResolvedWatcher and PortsWatcher
for Units. They all are based on the new ExistenceWatcher for
the creation, content changing and deletion of nodes.

https://codereview.appspot.com/6011047/

Roger Peppe (rogpeppe) wrote :

looks good.

https://codereview.appspot.com/6011047/diff/1/state/unit.go
File state/unit.go (right):

https://codereview.appspot.com/6011047/diff/1/state/unit.go#newcode489
state/unit.go:489: // NeedsUpgradeWatcher observes changes to a upgrade
flag node.
  // NeedsUpgradeWatcher observes changes to a unit's upgrade flag.
?

https://codereview.appspot.com/6011047/diff/1/state/unit.go#newcode531
state/unit.go:531: defer close(w.changeChan)
is this how callers are supposed to know that the upgrade watcher is
shutting down? there's no Dying method, so i guess so. if so, it should
be documented (it's different from the ExistenceWatcher way); if not
then you should add a Dying method and ensure that w.tomb is in a dying
state before closing changeChan.
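
A consumer that copes with either shutdown signal could look roughly like this. It is only a sketch: `demoWatcher` and `next` are hypothetical names, and a plain `chan struct{}` stands in for the channel returned by the tomb's Dying method.

```go
package main

import "fmt"

// demoWatcher is a hypothetical stand-in for a watcher. The dying field
// plays the role of tomb.Dying() and changes the role of Changes().
type demoWatcher struct {
	changes chan bool
	dying   chan struct{}
}

// next waits for the next change. It reports stopped=true when the
// watcher signals shutdown, either through the dying channel or by
// closing the changes channel.
func next(w *demoWatcher) (change bool, stopped bool) {
	select {
	case <-w.dying:
		return false, true
	case c, ok := <-w.changes:
		if !ok {
			// A closed changes channel also means shutdown.
			return false, true
		}
		return c, false
	}
}

func main() {
	w := &demoWatcher{changes: make(chan bool, 1), dying: make(chan struct{})}
	w.changes <- true
	fmt.Println(next(w)) // a pending change is delivered
	close(w.dying)
	fmt.Println(next(w)) // no change pending, shutdown is reported
}
```

Whichever convention a watcher picks, documenting it lets callers write this select once instead of guessing.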

https://codereview.appspot.com/6011047/diff/1/state/unit.go#newcode614
state/unit.go:614: // PortsWatcher observes changes to a ports node.
// PortsWatcher observes changes to a unit's open ports.

https://codereview.appspot.com/6011047/diff/1/state/unit.go#newcode656
state/unit.go:656: defer close(w.changeChan)
same applies here as to NeedsUpgradeWatcher.

https://codereview.appspot.com/6011047/diff/1/state/unit.go#newcode663
state/unit.go:663: ports := &struct{ Open []Port }{}
var ports struct {
     Open []Port
}
would be clearer i think (and would avoid the unnecessary double
indirection).
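
For illustration, the suggested form decodes into a plain struct value and passes its address once. In this sketch `encoding/json` stands in for goyaml purely to keep the example self-contained, and the `Port` fields and `decodeOpenPorts` helper are assumptions, not the branch's code.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Port is a hypothetical stand-in for state.Port.
type Port struct {
	Proto  string
	Number int
}

// decodeOpenPorts decodes the serialized ports document into a struct
// value; only one level of indirection (&ports) reaches the decoder.
func decodeOpenPorts(data []byte) ([]Port, error) {
	var ports struct {
		Open []Port
	}
	if err := json.Unmarshal(data, &ports); err != nil {
		return nil, err
	}
	return ports.Open, nil
}

func main() {
	open, err := decodeOpenPorts([]byte(`{"Open":[{"Proto":"tcp","Number":80}]}`))
	fmt.Println(open, err)
}
```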

https://codereview.appspot.com/6011047/diff/1/state/watch_test.go
File state/watch_test.go (right):

https://codereview.appspot.com/6011047/diff/1/state/watch_test.go#newcode13
state/watch_test.go:13: return change, ok, false
you could probably just Assert(ok, Equals, true) here
and thus simplify the interface to receiveChange.
That applies to most of the similar tests below - those that don't test
for the changes channel being closed can factor out the closed test.

another possibility is to pass expected values for timeout and closed
into the function:

receiveChange := func(w *state.ConfigWatcher, expectClosed,
expectTimeout bool) *state.ConfigNode {
     etc
}
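
The second option can be sketched without gocheck as follows. This is a hedged illustration: `receive` and `expect` are hypothetical helpers over a plain string channel, and in the real tests the mismatches would be reported through `c.Assert` rather than returned as errors.

```go
package main

import (
	"fmt"
	"time"
)

// receive waits up to timeout for a value on ch. It reports whether the
// channel was still open (ok) and whether the wait timed out.
func receive(ch <-chan string, timeout time.Duration) (value string, ok, timedOut bool) {
	select {
	case value, ok = <-ch:
		return value, ok, false
	case <-time.After(timeout):
		return "", false, true
	}
}

// expect wraps receive and returns an error when the outcome does not
// match the expected closed/timeout state.
func expect(ch <-chan string, wantClosed, wantTimeout bool) (string, error) {
	value, ok, timedOut := receive(ch, 100*time.Millisecond)
	if timedOut != wantTimeout {
		return "", fmt.Errorf("timeout: got %v, want %v", timedOut, wantTimeout)
	}
	closed := !ok
	if !timedOut && closed != wantClosed {
		return "", fmt.Errorf("closed: got %v, want %v", closed, wantClosed)
	}
	return value, nil
}

func main() {
	ch := make(chan string, 1)
	ch <- "change"
	v, err := expect(ch, false, false) // a value is pending
	fmt.Println(v, err)
	_, err = expect(ch, false, true) // nothing pending, timeout expected
	fmt.Println(err)
	close(ch)
	_, err = expect(ch, true, false) // closed channel expected
	fmt.Println(err)
}
```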

https://codereview.appspot.com/6011047/diff/1/state/watch_test.go#newcode14
state/watch_test.go:14: case <-time.After(time.Second):
i think a second is too long here. 100 * time.Millisecond?
same applies below, let's not make testing take too long.

https://codereview.appspot.com/6011047/diff/1/state/watcher/watcher.go
File state/watcher/watcher.go (right):

https://codereview.appspot.com/6011047/diff/1/state/watcher/watcher.go#newcode10
state/watcher/watcher.go:10: // of a node and, if it's true, its
content.
// ExistenceChange holds information on the existence
// and contents of a node. Content will be empty when the
// node does not exist.
?

https://codereview.appspot.com/6011047/diff/1/state/watcher/watcher.go#newcode17
state/watcher/watcher.go:17: // notification (including the content)
when it is created, changed
s/ (including the content)//

it's implied by the rest of the sentence and the ExistenceChange type.

https://codereview.appspot.com/6011047/diff/1/state/watcher/watcher.go#newcode94
state/watcher/watcher.go:94: content, _, watch, err := w.zk.GetW(w.path)
it seems slightly odd that when a node gets deleted, we ignore what
we've just...


113. By Frank Mueller

Changed after review.

Roger Peppe (rogpeppe) wrote :

https://codereview.appspot.com/6011047/diff/1/state/watch_test.go
File state/watch_test.go (right):

https://codereview.appspot.com/6011047/diff/1/state/watch_test.go#newcode13
state/watch_test.go:13: return change, ok, false
On 2012/04/16 15:45:31, TheMue wrote:
> On 2012/04/16 11:43:01, rog wrote:
> > you could probably just Assert(ok, Equals, true) here
> > and thus simplify the interface to receiveChange.
> > That applies to most of the similar tests below - those that don't test for
> > the changes channel being closed can factor out the closed test.
> >
> > another possibility is to pass expected values for timeout and closed into
> > the function:
> >
> > receiveChange := func(w *state.ConfigWatcher, expectClosed, expectTimeout bool) *state.ConfigNode {
> >     etc
> > }

> Thought about it too. But then I decided on a clean separation between the
> simple retrieval with a possible timeout (I had some due to errors during
> development) and the assertions in the test. To me this way is more readable,
> even if the code is a bit more bloated.

fair enough.

https://codereview.appspot.com/6011047/diff/1/state/watcher/watcher.go
File state/watcher/watcher.go (right):

https://codereview.appspot.com/6011047/diff/1/state/watcher/watcher.go#newcode94
state/watcher/watcher.go:94: content, _, watch, err := w.zk.GetW(w.path)
On 2012/04/16 15:45:31, TheMue wrote:
> On 2012/04/16 11:43:01, rog wrote:
> > it seems slightly odd that when a node gets deleted, we ignore what we've
> > just been told and call GetW and then ExistsW on the node anyway.
> > i appreciate that passing the event type into update might complicate the
> > logic a little, but i think it might be worth doing. something like this
> > perhaps?
> >
> > func (w *ExistenceWatcher) update(event int) (nextWatch <-chan zookeeper.Event, err error) {
> >     var content string
> >     var watch <-chan zookeeper.Event
> >     if event != zookeeper.EVENT_DELETED {
> >         content, _, watch, err = w.zk.GetW(w.path)
> >     }
> >     switch {
> >     case event == zookeeper.EVENT_DELETED || zookeeper.IsError(err, zookeeper.ZNONODE):
> >         call existsW
> >     etc
> > }

> Hmm, ok, which event would you pass for the first call of update()? At this
> moment you're unsure if it exists. And where does err in switch come from?

you could pass any event that's not EVENT_DELETED - EVENT_CHANGED would
be fine.

err is defined in the same place it is currently - as a return
parameter.
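
The control flow being proposed can be sketched against an in-memory stand-in. Everything here is hypothetical (`store`, `eventType`, `errNoNode`, `change`): it does not use the gozk API and leaves out re-arming the watch, showing only how the event type lets update skip the read for a deleted node and how the first call can pass a non-delete event.

```go
package main

import (
	"errors"
	"fmt"
)

type eventType int

const (
	eventChanged eventType = iota
	eventDeleted
)

// errNoNode is a hypothetical stand-in for the ZNONODE error.
var errNoNode = errors.New("no node")

// store is a hypothetical in-memory stand-in for the ZooKeeper connection.
type store map[string]string

func (s store) get(path string) (string, error) {
	content, ok := s[path]
	if !ok {
		return "", errNoNode
	}
	return content, nil
}

type change struct {
	Exists  bool
	Content string
}

// update skips the read when the event already says the node is gone,
// and treats a "no node" error from the read the same way.
func update(s store, path string, ev eventType) (change, error) {
	var content string
	var err error
	if ev != eventDeleted {
		content, err = s.get(path)
	}
	switch {
	case ev == eventDeleted || errors.Is(err, errNoNode):
		return change{Exists: false}, nil
	case err != nil:
		return change{}, err
	}
	return change{Exists: true, Content: content}, nil
}

func main() {
	s := store{"/units/unit-0/ports": "open: []"}
	// First call: existence is unknown, so any non-delete event will do.
	fmt.Println(update(s, "/units/unit-0/ports", eventChanged))
	fmt.Println(update(s, "/units/unit-0/ports", eventDeleted))
	fmt.Println(update(s, "/units/unit-0/missing", eventChanged))
}
```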

https://codereview.appspot.com/6011047/

Gustavo Niemeyer (niemeyer) wrote :

As we discussed over IRC, can you please break that down into smaller
chunks?

https://codereview.appspot.com/6011047/diff/6001/state/state.go
File state/state.go (right):

https://codereview.appspot.com/6011047/diff/6001/state/state.go#newcode273
state/state.go:273: func (w *ConfigWatcher) Dying() <-chan struct{} {
Why is this necessary?

https://codereview.appspot.com/6011047/diff/6001/state/unit.go
File state/unit.go (right):

https://codereview.appspot.com/6011047/diff/6001/state/unit.go#newcode316
state/unit.go:316: // WatchResolved creates a watcher for the resolved
node of the unit.
There's no such thing as a "resolved node" documented in this API. We
need to document these methods in terms of what they mean semantically
for juju, rather than what they mean for the interaction of these
methods with zookeeper. For example:

// WatchResolved returns a watcher that fires when the unit
// is marked as having had its problems resolved. See
// SetResolved for details.

Please review the documentation for the other methods.

https://codereview.appspot.com/6011047/diff/6001/state/unit.go#newcode480
state/unit.go:480: if none && mode == ResolvedNone {
none? Does it mean acceptNone? Please fix the argument name and document
it.

https://codereview.appspot.com/6011047/

Gustavo Niemeyer (niemeyer) wrote :

More notes on the changes in watcher to get you going tomorrow.

https://codereview.appspot.com/6011047/diff/6001/state/watcher/watcher.go
File state/watcher/watcher.go (right):

https://codereview.appspot.com/6011047/diff/6001/state/watcher/watcher.go#newcode12
state/watcher/watcher.go:12: type ExistenceChange struct {
This isn't watching just existence.. it's watching both existence and
contents. How's that different from the ContentsWatcher, and why do we
need both? Any higher-level watcher that needs contents may be
implemented on top of a watcher that offers existence *and* contents. I
suggest dropping this type, and modifying ContentsWatcher to handle both
problems.

This should be done in an independent branch.

https://codereview.appspot.com/6011047/diff/6001/state/watcher/watcher.go#newcode96
state/watcher/watcher.go:96: case err != nil && !zookeeper.IsError(err,
zookeeper.ZNONODE):
This check should be inverted. We care about the case ZNONODE, so let's
be explicit about it:

case zookeeper.IsError(err, zookeeper.ZNONODE):
    ...

https://codereview.appspot.com/6011047/diff/6001/state/watcher/watcher.go#newcode100
state/watcher/watcher.go:100: _, watch, err = w.zk.ExistsW(w.path)
This is bogus. It is assuming that the remote state of the node is known
here, but it's not. The current state of the node is in that parameter
that is being ignored.

The overall logic must be fixed to take that into consideration.

https://codereview.appspot.com/6011047/

Gustavo Niemeyer (niemeyer) wrote :

For the record, this work is being split down into other smaller
branches. Please do keep these reviews in mind, though.

https://codereview.appspot.com/6011047/

Unmerged revisions

113. By Frank Mueller

Changed after review.

112. By Frank Mueller

Extended watcher package by ExistenceWatcher, which looks for creation,
content changes and deletions. Used it in NeedsUpgradeWatcher, ResolvedWatcher
and PortsWatcher for Units.

Preview Diff

=== modified file 'state/state.go'
--- state/state.go 2012-04-05 13:01:51 +0000
+++ state/state.go 2012-04-16 15:47:21 +0000
@@ -268,6 +268,12 @@
 	return w.changeChan
 }
 
+// Dying returns a channel that is closed when the
+// watcher has stopped or is about to stop.
+func (w *ConfigWatcher) Dying() <-chan struct{} {
+	return w.tomb.Dying()
+}
+
 // Stop stops the watch and returns any error encountered
 // while watching. This method should always be called
 // before discarding the watcher.
=== modified file 'state/state_test.go'
--- state/state_test.go 2012-03-26 12:47:37 +0000
+++ state/state_test.go 2012-04-16 15:47:21 +0000
@@ -778,7 +778,7 @@
 	c.Assert(err, ErrorMatches, "no unused machine found")
 }
 
-func (s *StateSuite) TestGetSetClearUnitUpgrate(c *C) {
+func (s *StateSuite) TestGetSetClearUnitUpgrade(c *C) {
 	// Check that setting and clearing an upgrade flag on a unit works.
 	dummy, _ := addDummyCharm(c, s.st)
 	wordpress, err := s.st.AddService("wordpress", dummy)
=== modified file 'state/unit.go'
--- state/unit.go 2012-03-20 20:28:54 +0000
+++ state/unit.go 2012-04-16 15:47:21 +0000
@@ -11,6 +11,8 @@
 	"launchpad.net/gozk/zookeeper"
 	"launchpad.net/juju/go/charm"
 	"launchpad.net/juju/go/state/presence"
+	"launchpad.net/juju/go/state/watcher"
+	"launchpad.net/tomb"
 	"strconv"
 	"strings"
 	"time"
@@ -261,6 +263,11 @@
 	return err
 }
 
+// WatchNeedsUpgrade creates a watcher for the upgrade flag of the unit.
+func (u *Unit) WatchNeedsUpgrade() *NeedsUpgradeWatcher {
+	return newNeedsUpgradeWatcher(u)
+}
+
 // Resolved returns the resolved mode for the unit.
 func (u *Unit) Resolved() (ResolvedMode, error) {
 	yaml, _, err := u.st.zk.Get(u.zkResolvedPath())
@@ -271,15 +278,7 @@
 	if err != nil {
 		return ResolvedNone, err
 	}
-	setting := &struct{ Retry ResolvedMode }{}
-	if err = goyaml.Unmarshal([]byte(yaml), setting); err != nil {
-		return ResolvedNone, err
-	}
-	mode := setting.Retry
-	if err := validResolvedMode(mode); err != nil {
-		return ResolvedNone, err
-	}
-	return mode, nil
+	return parseResolvedMode(yaml)
 }
 
 // SetResolved marks the unit as having had any previous state
@@ -289,7 +288,7 @@
 // reexecute previous failed hooks or to continue as if they had
 // succeeded before.
 func (u *Unit) SetResolved(mode ResolvedMode) error {
-	if err := validResolvedMode(mode); err != nil {
+	if err := validResolvedMode(mode, false); err != nil {
 		return err
 	}
 	setting := &struct{ Retry ResolvedMode }{mode}
@@ -314,6 +313,11 @@
 	return err
 }
 
+// WatchResolved creates a watcher for the resolved node of the unit.
+func (u *Unit) WatchResolved() *ResolvedWatcher {
+	return newResolvedWatcher(u)
+}
+
 // OpenPort sets the policy of the port with protocol and number to be opened.
 func (u *Unit) OpenPort(protocol string, number int) error {
 	openPort := func(oldYaml string, stat *zookeeper.Stat) (string, error) {
@@ -386,6 +390,11 @@
 	return ports.Open, nil
 }
 
+// WatchPorts creates a watcher for the ports node of the unit.
+func (u *Unit) WatchPorts() *PortsWatcher {
+	return newPortsWatcher(u)
+}
+
 // AgentAlive returns whether the respective remote agent is alive.
 func (u *Unit) AgentAlive() (bool, error) {
 	return presence.Alive(u.st.zk, u.zkAgentPath())
@@ -417,11 +426,6 @@
 	return fmt.Sprintf("/units/%s", u.key)
 }
 
-// zkPortsPath returns the ZooKeeper path for the open ports.
-func (u *Unit) zkPortsPath() string {
-	return fmt.Sprintf("/units/%s/ports", u.key)
-}
-
 // zkAgentPath returns the ZooKeeper path for the unit agent.
 func (u *Unit) zkAgentPath() string {
 	return fmt.Sprintf("/units/%s/agent", u.key)
@@ -437,6 +441,11 @@
 	return fmt.Sprintf("/units/%s/resolved", u.key)
 }
 
+// zkPortsPath returns the ZooKeeper path for the open ports.
+func (u *Unit) zkPortsPath() string {
+	return fmt.Sprintf("/units/%s/ports", u.key)
+}
+
 // parseUnitName parses a unit name like "wordpress/0" into
 // its service name and sequence number parts.
 func parseUnitName(name string) (serviceName string, seqNo int, err error) {
@@ -451,11 +460,232 @@
 	return parts[0], int(sequenceNo), nil
 }
 
+// parseResolveMode parses a given YAML for the resolve mode
+// and checks if it's valid.
+func parseResolvedMode(yaml string) (ResolvedMode, error) {
+	setting := &struct{ Retry ResolvedMode }{}
+	if err := goyaml.Unmarshal([]byte(yaml), setting); err != nil {
+		return ResolvedNone, err
+	}
+	mode := setting.Retry
+	if err := validResolvedMode(mode, true); err != nil {
+		return ResolvedNone, err
+	}
+	return mode, nil
+}
+
 // validResolvedMode ensures that only valid values for the
 // resolved mode are used.
-func validResolvedMode(mode ResolvedMode) error {
+func validResolvedMode(mode ResolvedMode, none bool) error {
+	if none && mode == ResolvedNone {
+		return nil
+	}
 	if mode != ResolvedRetryHooks && mode != ResolvedNoHooks {
 		return fmt.Errorf("invalid error resolution mode: %d", mode)
 	}
 	return nil
 }
+
+// NeedsUpgradeWatcher observes changes to a unit's upgrade flag.
+type NeedsUpgradeWatcher struct {
+	unit       *Unit
+	tomb       tomb.Tomb
+	watcher    *watcher.ExistenceWatcher
+	changeChan chan bool
+}
+
+// newNeedsUpgradeWatcher creates and starts a new resolved flag node
+// watcher for the given path.
+func newNeedsUpgradeWatcher(u *Unit) *NeedsUpgradeWatcher {
+	w := &NeedsUpgradeWatcher{
+		unit:       u,
+		changeChan: make(chan bool),
+		watcher:    watcher.NewExistenceWatcher(u.st.zk, u.zkNeedsUpgradePath()),
+	}
+	go w.loop()
+	return w
+}
+
+// Changes returns a channel that will receive the new
+// resolved mode when a change is detected. Note that multiple
+// changes may be observed as a single event in the channel.
+func (w *NeedsUpgradeWatcher) Changes() <-chan bool {
+	return w.changeChan
+}
+
+// Dying returns a channel that is closed when the
+// watcher has stopped or is about to stop.
+func (w *NeedsUpgradeWatcher) Dying() <-chan struct{} {
+	return w.tomb.Dying()
+}
+
+// Stop stops the watch and returns any error encountered
+// while watching. This method should always be called
+// before discarding the watcher.
+func (w *NeedsUpgradeWatcher) Stop() error {
+	w.tomb.Kill(nil)
+	if err := w.watcher.Stop(); err != nil {
+		w.tomb.Wait()
+		return err
+	}
+	return w.tomb.Wait()
+}
+
+// loop is the backend for watching the resolved flag node.
+func (w *NeedsUpgradeWatcher) loop() {
+	defer w.tomb.Done()
+	defer close(w.changeChan)
+
+	for {
+		select {
+		case <-w.tomb.Dying():
+			return
+		case change := <-w.watcher.Changes():
+			select {
+			case <-w.watcher.Dying():
+				return
+			case <-w.tomb.Dying():
+				return
+			case w.changeChan <- change.Exists:
+			}
+		}
+	}
+}
+
+// ResolvedWatcher observes changes to a resolved flag node.
+type ResolvedWatcher struct {
+	unit       *Unit
+	tomb       tomb.Tomb
+	watcher    *watcher.ExistenceWatcher
+	changeChan chan ResolvedMode
+}
+
+// newResolvedWatcher creates and starts a new resolved flag node
+// watcher for the given path.
+func newResolvedWatcher(u *Unit) *ResolvedWatcher {
+	w := &ResolvedWatcher{
+		unit:       u,
+		changeChan: make(chan ResolvedMode),
+		watcher:    watcher.NewExistenceWatcher(u.st.zk, u.zkResolvedPath()),
+	}
+	go w.loop()
+	return w
+}
+
+// Changes returns a channel that will receive the new
+// resolved mode when a change is detected. Note that multiple
+// changes may be observed as a single event in the channel.
+func (w *ResolvedWatcher) Changes() <-chan ResolvedMode {
+	return w.changeChan
+}
+
+// Dying returns a channel that is closed when the
+// watcher has stopped or is about to stop.
+func (w *ResolvedWatcher) Dying() <-chan struct{} {
+	return w.tomb.Dying()
+}
+
+// Stop stops the watch and returns any error encountered
+// while watching. This method should always be called
+// before discarding the watcher.
+func (w *ResolvedWatcher) Stop() error {
+	w.tomb.Kill(nil)
+	if err := w.watcher.Stop(); err != nil {
+		w.tomb.Wait()
+		return err
+	}
+	return w.tomb.Wait()
+}
+
+// loop is the backend for watching the resolved flag node.
+func (w *ResolvedWatcher) loop() {
+	defer w.tomb.Done()
+	defer close(w.changeChan)
+
+	for {
+		select {
+		case <-w.tomb.Dying():
+			return
+		case change := <-w.watcher.Changes():
+			mode, err := parseResolvedMode(change.Content)
+			if err != nil {
+				w.tomb.Kill(err)
+				return
+			}
+			select {
+			case <-w.watcher.Dying():
+				return
+			case <-w.tomb.Dying():
+				return
+			case w.changeChan <- mode:
+			}
+		}
+	}
+}
+
+// PortsWatcher observes changes to a unit's open ports.
+type PortsWatcher struct {
+	unit       *Unit
+	tomb       tomb.Tomb
+	watcher    *watcher.ExistenceWatcher
+	changeChan chan []Port
+}
+
+// newPortsWatcher creates and starts a new resolved flag node
+// watcher for the given path.
+func newPortsWatcher(u *Unit) *PortsWatcher {
+	w := &PortsWatcher{
+		unit:       u,
+		changeChan: make(chan []Port),
+		watcher:    watcher.NewExistenceWatcher(u.st.zk, u.zkPortsPath()),
+	}
+	go w.loop()
+	return w
+}
+
+// Changes returns a channel that will receive the new
+// ports when a change is detected. Note that multiple
+// changes may be observed as a single event in the channel.
+func (w *PortsWatcher) Changes() <-chan []Port {
+	return w.changeChan
+}
+
+// Stop stops the watch and returns any error encountered
+// while watching. This method should always be called
+// before discarding the watcher.
+func (w *PortsWatcher) Stop() error {
+	w.tomb.Kill(nil)
+	if err := w.watcher.Stop(); err != nil {
+		w.tomb.Wait()
+		return err
+	}
+	return w.tomb.Wait()
+}
+
+// loop is the backend for watching the ports node.
+func (w *PortsWatcher) loop() {
+	defer w.tomb.Done()
+	defer close(w.changeChan)
+
+	for {
+		select {
+		case <-w.tomb.Dying():
+			return
+		case change := <-w.watcher.Changes():
+			var ports struct {
+				Open []Port
+			}
+			if err := goyaml.Unmarshal([]byte(change.Content), &ports); err != nil {
+				w.tomb.Kill(err)
+				return
+			}
+			select {
+			case <-w.watcher.Dying():
+				return
+			case <-w.tomb.Dying():
+				return
+			case w.changeChan <- ports.Open:
+			}
+		}
+	}
+}
=== modified file 'state/watch_test.go'
--- state/watch_test.go 2012-04-05 13:01:51 +0000
+++ state/watch_test.go 2012-04-16 15:47:21 +0000
@@ -2,10 +2,20 @@
 
 import (
 	. "launchpad.net/gocheck"
+	"launchpad.net/juju/go/state"
 	"time"
 )
 
 func (s *StateSuite) TestServiceWatchConfig(c *C) {
+	receiveChange := func(w *state.ConfigWatcher) (*state.ConfigNode, bool, bool) {
+		select {
+		case change, ok := <-w.Changes():
+			return change, ok, false
+		case <-time.After(100 * time.Millisecond):
+			return nil, false, true
+		}
+		return nil, false, false
+	}
 	dummy, _ := addDummyCharm(c, s.st)
 	wordpress, err := s.st.AddService("wordpress", dummy)
 	c.Assert(err, IsNil)
@@ -17,7 +27,9 @@
 	watcher := wordpress.WatchConfig()
 
 	// Recieve initial event after creation.
-	changedConfig := <-watcher.Changes()
+	changedConfig, ok, timeout := receiveChange(watcher)
+	c.Assert(timeout, Equals, false)
+	c.Assert(ok, Equals, true)
 	c.Assert(changedConfig.Keys(), HasLen, 0)
 
 	// Two more change events.
@@ -25,20 +37,23 @@
 	config.Set("baz", "yadda")
 	_, err = config.Write()
 	c.Assert(err, IsNil)
-
 	time.Sleep(100 * time.Millisecond)
 	config.Delete("foo")
 	_, err = config.Write()
 	c.Assert(err, IsNil)
 
 	// Receive the two changes.
-	changedConfig = <-watcher.Changes()
+	changedConfig, ok, timeout = receiveChange(watcher)
+	c.Assert(timeout, Equals, false)
+	c.Assert(ok, Equals, true)
 	c.Assert(changedConfig.Map(), DeepEquals, map[string]interface{}{"foo": "bar", "baz": "yadda"})
 	foo, found := changedConfig.Get("foo")
 	c.Assert(found, Equals, true)
 	c.Assert(foo, Equals, "bar")
 
-	changedConfig = <-watcher.Changes()
+	changedConfig, ok, timeout = receiveChange(watcher)
+	c.Assert(timeout, Equals, false)
+	c.Assert(ok, Equals, true)
 	c.Assert(changedConfig.Map(), DeepEquals, map[string]interface{}{"baz": "yadda"})
 	foo, found = changedConfig.Get("foo")
 	c.Assert(found, Equals, false)
@@ -47,18 +62,23 @@
 	c.Assert(baz, Equals, "yadda")
 
 	// No more changes.
-	select {
-	case <-watcher.Changes():
-		c.Fatalf("no more config changes expected")
-	case <-time.After(200 * time.Millisecond):
-		// The timeout is expected.
-	}
+	_, _, timeout = receiveChange(watcher)
+	c.Assert(timeout, Equals, true)
 
 	err = watcher.Stop()
 	c.Assert(err, IsNil)
 }
 
-func (s *StateSuite) TestServiceWatchConfigIllegalData(c *C) {
+func (s *StateSuite) TestServiceWatchConfigIllegalDatal(c *C) {
+	receiveChange := func(w *state.ConfigWatcher) (*state.ConfigNode, bool, bool) {
+		select {
+		case change, ok := <-w.Changes():
+			return change, ok, false
+		case <-time.After(100 * time.Millisecond):
+			return nil, false, true
+		}
+		return nil, false, false
+	}
 	dummy, _ := addDummyCharm(c, s.st)
 	wordpress, err := s.st.AddService("wordpress", dummy)
 	c.Assert(err, IsNil)
@@ -69,15 +89,167 @@
 	_, err = s.zkConn.Set("/services/service-0000000000/config", "---", -1)
 	c.Assert(err, IsNil)
 
-	// Changes() has to be closed
-	select {
-	case _, ok := <-watcher.Changes():
-		c.Assert(ok, Equals, false)
-	case <-time.After(200 * time.Millisecond):
-		// Timeout should not be needed.
-		c.Fatalf("config change channel should have been closed due to the illegal data")
-	}
+	// Changes() has to be closed.
+	_, ok, timeout := receiveChange(watcher)
+	c.Assert(ok, Equals, false)
+	c.Assert(timeout, Equals, false)
 
 	err = watcher.Stop()
 	c.Assert(err, ErrorMatches, "YAML error: .*")
 }
+
+func (s *StateSuite) TestUnitWatchNeedsUpgrade(c *C) {
+	receiveChange := func(w *state.NeedsUpgradeWatcher) (bool, bool, bool) {
+		select {
+		case change, ok := <-w.Changes():
+			return change, ok, false
+		case <-time.After(100 * time.Millisecond):
+			return false, false, true
+		}
+		return false, false, false
+	}
+	dummy, _ := addDummyCharm(c, s.st)
+	wordpress, err := s.st.AddService("wordpress", dummy)
+	c.Assert(err, IsNil)
+	c.Assert(wordpress.Name(), Equals, "wordpress")
+	unit, err := wordpress.AddUnit()
+	c.Assert(err, IsNil)
+	watcher := unit.WatchNeedsUpgrade()
+
+	go func() {
+		time.Sleep(50 * time.Millisecond)
+		err = unit.SetNeedsUpgrade()
+		c.Assert(err, IsNil)
+		time.Sleep(50 * time.Millisecond)
+		err = unit.ClearNeedsUpgrade()
+		c.Assert(err, IsNil)
+	}()
+
+	// Receive the changes.
+	upgrade, ok, timeout := receiveChange(watcher)
+	c.Assert(timeout, Equals, false)
+	c.Assert(ok, Equals, true)
+	c.Assert(upgrade, Equals, true)
+	upgrade, ok, timeout = receiveChange(watcher)
+	c.Assert(ok, Equals, true)
+	c.Assert(timeout, Equals, false)
+	c.Assert(upgrade, Equals, false)
+
+	// No more changes.
+	_, _, timeout = receiveChange(watcher)
+	c.Assert(timeout, Equals, true)
+
+	err = watcher.Stop()
+	c.Assert(err, IsNil)
+}
+
+func (s *StateSuite) TestUnitWatchResolved(c *C) {
+	receiveChange := func(w *state.ResolvedWatcher) (state.ResolvedMode, bool, bool) {
+		select {
+		case change, ok := <-w.Changes():
+			return change, ok, false
+		case <-time.After(100 * time.Millisecond):
+			return state.ResolvedNoHooks, false, true
+		}
+		return state.ResolvedNoHooks, false, false
+	}
+	dummy, _ := addDummyCharm(c, s.st)
+	wordpress, err := s.st.AddService("wordpress", dummy)
+	c.Assert(err, IsNil)
+	c.Assert(wordpress.Name(), Equals, "wordpress")
+	unit, err := wordpress.AddUnit()
+	c.Assert(err, IsNil)
+	watcher := unit.WatchResolved()
+
+	go func() {
+		time.Sleep(50 * time.Millisecond)
+		err = unit.SetResolved(state.ResolvedRetryHooks)
+		c.Assert(err, IsNil)
+		time.Sleep(50 * time.Millisecond)
+		err = unit.ClearResolved()
+		c.Assert(err, IsNil)
+		time.Sleep(50 * time.Millisecond)
+		err = unit.SetResolved(state.ResolvedNoHooks)
+		c.Assert(err, IsNil)
+	}()
+
+	// Receive the changes.
+	resolved, ok, timeout := receiveChange(watcher)
+	c.Assert(timeout, Equals, false)
+	c.Assert(ok, Equals, true)
+	c.Assert(resolved, Equals, state.ResolvedRetryHooks)
+	resolved, ok, timeout = receiveChange(watcher)
+	c.Assert(timeout, Equals, false)
+	c.Assert(ok, Equals, true)
+	c.Assert(resolved, Equals, state.ResolvedNone)
+	resolved, ok, timeout = receiveChange(watcher)
+	c.Assert(timeout, Equals, false)
+	c.Assert(ok, Equals, true)
+	c.Assert(resolved, Equals, state.ResolvedNoHooks)
+
+	// No more changes.
+	_, _, timeout = receiveChange(watcher)
+	c.Assert(timeout, Equals, true)
+
+	err = watcher.Stop()
+	c.Assert(err, IsNil)
+}
+
+func (s *StateSuite) TestUnitWatchPorts(c *C) {
+	receiveChange := func(w *state.PortsWatcher) ([]state.Port, bool, bool) {
+		select {
+		case change, ok := <-w.Changes():
+			return change, ok, false
+		case <-time.After(100 * time.Millisecond):
+			return nil, false, true
+		}
+		return nil, false, false
+	}
+	dummy, _ := addDummyCharm(c, s.st)
+	wordpress, err := s.st.AddService("wordpress", dummy)
+	c.Assert(err, IsNil)
+	c.Assert(wordpress.Name(), Equals, "wordpress")
+	unit, err := wordpress.AddUnit()
+	c.Assert(err, IsNil)
+	watcher := unit.WatchPorts()
+
+	go func() {
+		time.Sleep(50 * time.Millisecond)
+		err = unit.OpenPort("tcp", 80)
+		c.Assert(err, IsNil)
+		time.Sleep(50 * time.Millisecond)
+		err = unit.OpenPort("udp", 53)
+		c.Assert(err, IsNil)
+		time.Sleep(50 * time.Millisecond)
+		err = unit.ClosePort("tcp", 80)
+		c.Assert(err, IsNil)
+	}()
+
+	// Receive the changes.
+	open, ok, timeout := receiveChange(watcher)
+	c.Assert(timeout, Equals, false)
+	c.Assert(ok, Equals, true)
+	c.Assert(open, DeepEquals, []state.Port{
+		{"tcp", 80},
+	})
+	open, ok, timeout = receiveChange(watcher)
+	c.Assert(timeout, Equals, false)
+	c.Assert(ok, Equals, true)
+	c.Assert(open, DeepEquals, []state.Port{
+		{"tcp", 80},
+		{"udp", 53},
+	})
+	open, ok, timeout = receiveChange(watcher)
+	c.Assert(timeout, Equals, false)
+	c.Assert(ok, Equals, true)
+	c.Assert(open, DeepEquals, []state.Port{
+		{"udp", 53},
+	})
+
+	// No more changes.
+	_, _, timeout = receiveChange(watcher)
+	c.Assert(timeout, Equals, true)
+
+	err = watcher.Stop()
+	c.Assert(err, IsNil)
+}
=== modified file 'state/watcher/watcher.go'
--- state/watcher/watcher.go 2012-04-05 13:01:51 +0000
+++ state/watcher/watcher.go 2012-04-16 15:47:21 +0000
@@ -6,6 +6,121 @@
6 "launchpad.net/tomb"6 "launchpad.net/tomb"
7)7)
88
9// ExistenceChange holds information on the existence
10// and contents of a node. Content will be empty when the
11// node does not exist.
12type ExistenceChange struct {
13 Exists bool
14 Content string
15}
16
17// ExistenceWatcher observes a ZooKeeper node and delivers a
18// notification when it is created, changed or deleted.
19type ExistenceWatcher struct {
20 zk *zookeeper.Conn
21 path string
22 tomb tomb.Tomb
23 changeChan chan ExistenceChange
24 existence ExistenceChange
25}
26
27// NewExistenceWatcher creates an ExistenceWatcher observing
28// the ZooKeeper node at watchedPath.
29func NewExistenceWatcher(zk *zookeeper.Conn, watchedPath string) *ExistenceWatcher {
30 w := &ExistenceWatcher{
31 zk: zk,
32 path: watchedPath,
33 changeChan: make(chan ExistenceChange),
34 existence: ExistenceChange{},
35 }
36 go w.loop()
37 return w
38}
39
40// Changes returns a channel that receives a change whenever the node
41// is created, changed or deleted. For an existing node the change also
42// carries its content. Note that multiple changes may be observed as a
43// single event in the channel.
44func (w *ExistenceWatcher) Changes() <-chan ExistenceChange {
45 return w.changeChan
46}
47
48// Dying returns a channel that is closed when the
49// watcher has stopped or is about to stop.
50func (w *ExistenceWatcher) Dying() <-chan struct{} {
51 return w.tomb.Dying()
52}
53
54// Stop stops the watch and returns any error encountered
55// while watching. This method should always be called before
56// discarding the watcher.
57func (w *ExistenceWatcher) Stop() error {
58 w.tomb.Kill(nil)
59 return w.tomb.Wait()
60}
61
62// loop is the backend for watching.
63func (w *ExistenceWatcher) loop() {
64 defer w.tomb.Done()
65 defer close(w.changeChan)
66
67 watch, err := w.update()
68 if err != nil {
69 w.tomb.Kill(err)
70 return
71 }
72
73 for {
74 select {
75 case <-w.tomb.Dying():
76 return
77 case evt := <-watch:
78 if !evt.Ok() {
79 w.tomb.Killf("watcher: critical session event: %v", evt)
80 return
81 }
82 watch, err = w.update()
83 if err != nil {
84 w.tomb.Kill(err)
85 return
86 }
87 }
88 }
89}
90
91// update checks the node existence and emits changes of that state to the
92// change channel if it has changed. It returns the next watch.
93func (w *ExistenceWatcher) update() (nextWatch <-chan zookeeper.Event, err error) {
94 content, _, watch, err := w.zk.GetW(w.path)
95 switch {
96 case err != nil && !zookeeper.IsError(err, zookeeper.ZNONODE):
97 return nil, fmt.Errorf("watcher: can't get the content of node %q: %v", w.path, err)
98 case err != nil:
99 // Need a new watch to signal creation of node.
100 _, watch, err = w.zk.ExistsW(w.path)
101 if err != nil {
102 return nil, fmt.Errorf("watcher: can't check the existence of node %q: %v", w.path, err)
103 }
104 if !w.existence.Exists {
105 return watch, nil
106 }
107 w.existence.Exists = false
108 w.existence.Content = ""
109 default:
110 if w.existence.Exists && content == w.existence.Content {
111 return watch, nil
112 }
113 w.existence.Exists = true
114 w.existence.Content = content
115 }
116 select {
117 case <-w.tomb.Dying():
118 return nil, tomb.ErrDying
119 case w.changeChan <- w.existence:
120 }
121 return watch, nil
122}
123
9// ContentWatcher observes a ZooKeeper node and delivers a124// ContentWatcher observes a ZooKeeper node and delivers a
10// notification when a content change is detected.125// notification when a content change is detected.
11type ContentWatcher struct {126type ContentWatcher struct {
12127
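The change detection inside update() above can be looked at in isolation. The sketch below is not the branch's code: tracker and observe are hypothetical names, and the ZooKeeper calls are replaced by plain arguments, so only the "emit only when the remembered state differs" logic is shown.

```go
package main

import "fmt"

// ExistenceChange mirrors the struct added in the diff.
type ExistenceChange struct {
	Exists  bool
	Content string
}

// tracker is a hypothetical stand-in for the state that
// ExistenceWatcher keeps in its existence field.
type tracker struct {
	last ExistenceChange
}

// observe records one observation of the node and reports whether
// it differs from the previous one and should therefore be emitted.
func (t *tracker) observe(exists bool, content string) (ExistenceChange, bool) {
	if !exists {
		if !t.last.Exists {
			// Still missing: nothing to report.
			return t.last, false
		}
		// The node was deleted: reset to the zero value.
		t.last = ExistenceChange{}
		return t.last, true
	}
	if t.last.Exists && content == t.last.Content {
		// Same content as before: nothing to report.
		return t.last, false
	}
	// The node was created or its content changed.
	t.last = ExistenceChange{Exists: true, Content: content}
	return t.last, true
}

func main() {
	var t tracker
	steps := []struct {
		exists  bool
		content string
	}{
		{false, ""}, {true, "init"}, {true, "init"}, {true, "foo"}, {false, ""}, {true, "done"},
	}
	for _, s := range steps {
		change, emit := t.observe(s.exists, s.content)
		fmt.Printf("exists=%v content=%q emit=%v change=%+v\n", s.exists, s.content, emit, change)
	}
}
```

The sequence in main follows the one exercised by TestExistenceWatcher, with a repeated "init" observation added to show that unchanged content is suppressed.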
=== modified file 'state/watcher/watcher_test.go'
--- state/watcher/watcher_test.go 2012-04-05 13:01:51 +0000
+++ state/watcher/watcher_test.go 2012-04-16 15:47:21 +0000
@@ -39,122 +39,215 @@
3939
40 s.zkConn = zk40 s.zkConn = zk
41 s.path = "/watcher"41 s.path = "/watcher"
42
43 _, err = s.zkConn.Create(s.path, "", 0, zookeeper.WorldACL(zookeeper.PERM_ALL))
44 c.Assert(err, IsNil)
45}42}
4643
47func (s *WatcherSuite) TearDownTest(c *C) {44func (s *WatcherSuite) TearDownTest(c *C) {
48 testing.ZkRemoveTree(s.zkConn, s.path)45 s.removePath(c)
49 s.zkConn.Close()46 s.zkConn.Close()
50}47}
5148
49func (s *WatcherSuite) TestExistenceWatcher(c *C) {
50 receiveChange := func(w *watcher.ExistenceWatcher) (*watcher.ExistenceChange, bool, bool) {
51 select {
52 case change, ok := <-w.Changes():
53 return &change, ok, false
54 case <-time.After(200 * time.Millisecond):
55 return nil, false, true
56 }
57 return nil, false, false
58 }
59 watcher := watcher.NewExistenceWatcher(s.zkConn, s.path)
60
61 go func() {
62 time.Sleep(50 * time.Millisecond)
63 s.createPath(c, "init")
64 time.Sleep(50 * time.Millisecond)
65 s.changeContent(c, "foo")
66 time.Sleep(50 * time.Millisecond)
67 s.removePath(c)
68 time.Sleep(50 * time.Millisecond)
69 s.createPath(c, "done")
70 }()
71
72 // Receive the four changes.
73 change, ok, timeout := receiveChange(watcher)
74 c.Assert(timeout, Equals, false)
75 c.Assert(ok, Equals, true)
76 c.Assert(change.Exists, Equals, true)
77 c.Assert(change.Content, Equals, "init")
78
79 change, ok, timeout = receiveChange(watcher)
80 c.Assert(timeout, Equals, false)
81 c.Assert(ok, Equals, true)
82 c.Assert(change.Exists, Equals, true)
83 c.Assert(change.Content, Equals, "foo")
84
85 change, ok, timeout = receiveChange(watcher)
86 c.Assert(timeout, Equals, false)
87 c.Assert(ok, Equals, true)
88 c.Assert(change.Exists, Equals, false)
89 c.Assert(change.Content, Equals, "")
90
91 change, ok, timeout = receiveChange(watcher)
92 c.Assert(timeout, Equals, false)
93 c.Assert(ok, Equals, true)
94 c.Assert(change.Exists, Equals, true)
95 c.Assert(change.Content, Equals, "done")
96
97 // No more changes.
98 _, _, timeout = receiveChange(watcher)
99 c.Assert(timeout, Equals, true)
100
101 err := watcher.Stop()
102 c.Assert(err, IsNil)
103
104 // Changes() has to be closed.
105 _, ok, timeout = receiveChange(watcher)
106 c.Assert(ok, Equals, false)
107 c.Assert(timeout, Equals, false)
108}
109
52func (s *WatcherSuite) TestContentWatcher(c *C) {110func (s *WatcherSuite) TestContentWatcher(c *C) {
111 receiveChange := func(w *watcher.ContentWatcher) (string, bool, bool) {
112 select {
113 case change, ok := <-w.Changes():
114 return change, ok, false
115 case <-time.After(200 * time.Millisecond):
116 return "", false, true
117 }
118 return "", false, false
119 }
120 s.createPath(c, "init")
53 watcher := watcher.NewContentWatcher(s.zkConn, s.path)121 watcher := watcher.NewContentWatcher(s.zkConn, s.path)
54122
55 go func() {123 go func() {
56 time.Sleep(50 * time.Millisecond)124 time.Sleep(50 * time.Millisecond)
57 s.changeContent(c, "foo")125 s.changeContent(c, "foo")
58
59 time.Sleep(50 * time.Millisecond)126 time.Sleep(50 * time.Millisecond)
60 s.changeContent(c, "foo")127 s.changeContent(c, "foo")
61
62 time.Sleep(50 * time.Millisecond)128 time.Sleep(50 * time.Millisecond)
63 s.changeContent(c, "bar")129 s.changeContent(c, "bar")
64 }()130 }()
65131
66 // Receive the two changes.132 // Receive the three changes. First one is from creation,
67 change := <-watcher.Changes()133 // the content watcher needs an existing node.
134 change, ok, timeout := receiveChange(watcher)
135 c.Assert(timeout, Equals, false)
136 c.Assert(ok, Equals, true)
137 c.Assert(change, Equals, "init")
138
139 change, ok, timeout = receiveChange(watcher)
140 c.Assert(timeout, Equals, false)
141 c.Assert(ok, Equals, true)
68 c.Assert(change, Equals, "foo")142 c.Assert(change, Equals, "foo")
69143
70 change = <-watcher.Changes()144 change, ok, timeout = receiveChange(watcher)
145 c.Assert(timeout, Equals, false)
146 c.Assert(ok, Equals, true)
71 c.Assert(change, Equals, "bar")147 c.Assert(change, Equals, "bar")
72148
73 // No more changes.149 // No more changes.
74 select {150 _, _, timeout = receiveChange(watcher)
75 case <-watcher.Changes():151 c.Assert(timeout, Equals, true)
76 c.Fatalf("no more changes expected")
77 case <-time.After(200 * time.Millisecond):
78 // The timeout is expected.
79 }
80152
81 err := watcher.Stop()153 err := watcher.Stop()
82 c.Assert(err, IsNil)154 c.Assert(err, IsNil)
83155
84 // Changes() has to be closed156 // Changes() has to be closed.
85 select {157 _, ok, timeout = receiveChange(watcher)
86 case _, ok := <-watcher.Changes():158 c.Assert(ok, Equals, false)
87 c.Assert(ok, Equals, false)159 c.Assert(timeout, Equals, false)
88 case <-time.After(200 * time.Millisecond):160}
89 // Timeout should not be needed.161
90 c.Fatalf("timeout should not happen")162func (s *WatcherSuite) TestContentWatcherDeletedNode(c *C) {
163 receiveChange := func(w *watcher.ContentWatcher) (string, bool, bool) {
164 select {
165 case change, ok := <-w.Changes():
166 return change, ok, false
167 case <-time.After(200 * time.Millisecond):
168 return "", false, true
169 }
170 return "", false, false
91 }171 }
172 s.createPath(c, "init")
173 watcher := watcher.NewContentWatcher(s.zkConn, s.path)
174
175 // Receive the initial creation event.
176 change := <-watcher.Changes()
177 c.Assert(change, Equals, "init")
178
179 go func() {
180 time.Sleep(50 * time.Millisecond)
181 s.removePath(c)
182 }()
183
184 // Changes() has to be closed.
185 _, ok, timeout := receiveChange(watcher)
186 c.Assert(ok, Equals, false)
187 c.Assert(timeout, Equals, false)
188
189 err := watcher.Stop()
190 c.Assert(err, ErrorMatches, `watcher: node "/watcher" has been deleted`)
92}191}
93192
94func (s *WatcherSuite) TestChildrenWatcher(c *C) {193func (s *WatcherSuite) TestChildrenWatcher(c *C) {
194 receiveChange := func(w *watcher.ChildrenWatcher) (*watcher.ChildrenChange, bool, bool) {
195 select {
196 case change, ok := <-w.Changes():
197 return &change, ok, false
198 case <-time.After(200 * time.Millisecond):
199 return nil, false, true
200 }
201 return nil, false, false
202 }
203 s.createPath(c, "init")
95 watcher := watcher.NewChildrenWatcher(s.zkConn, s.path)204 watcher := watcher.NewChildrenWatcher(s.zkConn, s.path)
96205
97 go func() {206 go func() {
98 time.Sleep(50 * time.Millisecond)207 time.Sleep(50 * time.Millisecond)
99 s.changeChildren(c, true, "foo")208 s.changeChildren(c, true, "foo")
100
101 time.Sleep(50 * time.Millisecond)209 time.Sleep(50 * time.Millisecond)
102 s.changeChildren(c, true, "bar")210 s.changeChildren(c, true, "bar")
103
104 time.Sleep(50 * time.Millisecond)211 time.Sleep(50 * time.Millisecond)
105 s.changeChildren(c, false, "foo")212 s.changeChildren(c, false, "foo")
106 }()213 }()
107214
108 // Receive the three changes.215 // Receive the three changes.
109 change := <-watcher.Changes()216 change, ok, timeout := receiveChange(watcher)
217 c.Assert(timeout, Equals, false)
218 c.Assert(ok, Equals, true)
110 c.Assert(change.Added, DeepEquals, []string{"foo"})219 c.Assert(change.Added, DeepEquals, []string{"foo"})
111220
112 change = <-watcher.Changes()221 change, ok, timeout = receiveChange(watcher)
222 c.Assert(timeout, Equals, false)
223 c.Assert(ok, Equals, true)
113 c.Assert(change.Added, DeepEquals, []string{"bar"})224 c.Assert(change.Added, DeepEquals, []string{"bar"})
114225
115 change = <-watcher.Changes()226 change, ok, timeout = receiveChange(watcher)
227 c.Assert(timeout, Equals, false)
228 c.Assert(ok, Equals, true)
116 c.Assert(change.Deleted, DeepEquals, []string{"foo"})229 c.Assert(change.Deleted, DeepEquals, []string{"foo"})
117230
118 // No more changes.231 // No more changes.
119 select {232 _, _, timeout = receiveChange(watcher)
120 case <-watcher.Changes():233 c.Assert(timeout, Equals, true)
121 c.Fatalf("no more changes expected")234
122 case <-time.After(time.Second):235 err := watcher.Stop()
123 // The timeout is expected.236 c.Assert(err, IsNil)
124 }237
125238 // Changes() has to be closed.
126 err := watcher.Stop()239 _, ok, timeout = receiveChange(watcher)
127 c.Assert(err, IsNil)240 c.Assert(ok, Equals, false)
128241 c.Assert(timeout, Equals, false)
129 // Changes() has to be closed242}
130 select {243
131 case _, ok := <-watcher.Changes():244func (s *WatcherSuite) createPath(c *C, content string) {
132 c.Assert(ok, Equals, false)245 _, err := s.zkConn.Create(s.path, content, 0, zookeeper.WorldACL(zookeeper.PERM_ALL))
133 case <-time.After(200 * time.Millisecond):246 c.Assert(err, IsNil)
134 // Timeout should not be needed.247}
135 c.Fatalf("timeout should not happen")248
136 }249func (s *WatcherSuite) removePath(c *C) {
137}250 testing.ZkRemoveTree(s.zkConn, s.path)
138
139func (s *WatcherSuite) TestDeletedNode(c *C) {
140 watcher := watcher.NewContentWatcher(s.zkConn, s.path)
141
142 go func() {
143 time.Sleep(50 * time.Millisecond)
144 testing.ZkRemoveTree(s.zkConn, s.path)
145 }()
146
147 // Changes() has to be closed
148 select {
149 case _, ok := <-watcher.Changes():
150 c.Assert(ok, Equals, false)
151 case <-time.After(200 * time.Millisecond):
152 // Timeout should not be needed.
153 c.Fatalf("timeout should not happen")
154 }
155
156 err := watcher.Stop()
157 c.Assert(err, ErrorMatches, `watcher: node "/watcher" has been deleted`)
158}251}
159252
160func (s *WatcherSuite) changeContent(c *C, content string) {253func (s *WatcherSuite) changeContent(c *C, content string) {
