Merge lp:~themue/pyjuju/go-state-more-unit-watchers into lp:pyjuju/go

Proposed by Frank Mueller
Status: Rejected
Rejected by: Gustavo Niemeyer
Proposed branch: lp:~themue/pyjuju/go-state-more-unit-watchers
Merge into: lp:pyjuju/go
Diff against target: 1044 lines (+718/-102)
6 files modified
state/state.go (+6/-0)
state/state_test.go (+1/-1)
state/unit.go (+246/-16)
state/watch_test.go (+191/-19)
state/watcher/watcher.go (+115/-0)
state/watcher/watcher_test.go (+159/-66)
To merge this branch: bzr merge lp:~themue/pyjuju/go-state-more-unit-watchers
Reviewer            Review Type    Date Requested    Status
Juju Engineering                                     Pending
Review via email: mp+101760@code.launchpad.net

Description of the change

More watchers for Unit.

Adds NeedsUpgradeWatcher, ResolvedWatcher and PortsWatcher for
Units. All three are based on the new ExistenceWatcher, which
reports the creation, content changes and deletion of a node.

https://codereview.appspot.com/6011047/

Roger Peppe (rogpeppe) wrote :

looks good.

https://codereview.appspot.com/6011047/diff/1/state/unit.go
File state/unit.go (right):

https://codereview.appspot.com/6011047/diff/1/state/unit.go#newcode489
state/unit.go:489: // NeedsUpgradeWatcher observes changes to a upgrade flag node.
  // NeedsUpgradeWatcher observes changes to a unit's upgrade flag.
?

https://codereview.appspot.com/6011047/diff/1/state/unit.go#newcode531
state/unit.go:531: defer close(w.changeChan)
is this how callers are supposed to know that the upgrade watcher is
shutting down? there's no Dying method, so i guess so. if so, it should
be documented (it's different from the ExistenceWatcher way); if not
then you should add a Dying method and ensure that w.tomb is in a dying
state before closing changeChan.
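The ordering asked for here (the watcher is already dying by the time changeChan is closed) can be sketched as follows. This is a minimal illustration under stated assumptions, not the code from the branch: flagWatcher, newFlagWatcher and drain are hypothetical names, and a plain channel stands in for the tomb.

```go
package main

import "fmt"

// flagWatcher is a hypothetical stand-in for NeedsUpgradeWatcher; it is
// not the type from the branch under review. The dying channel plays the
// role of tomb.Dying().
type flagWatcher struct {
	changes chan bool
	dying   chan struct{}
}

// newFlagWatcher starts a goroutine that delivers events and then shuts down.
func newFlagWatcher(events []bool) *flagWatcher {
	w := &flagWatcher{
		changes: make(chan bool),
		dying:   make(chan struct{}),
	}
	go func() {
		// Deferred calls run last-in, first-out, so dying is closed
		// before changes: the watcher is already dying by the time a
		// receiver can observe the closed changes channel.
		defer close(w.changes)
		defer close(w.dying)
		for _, e := range events {
			w.changes <- e
		}
	}()
	return w
}

// drain reads changes until the channel is closed, then reports whether
// the watcher was already marked as dying at that point.
func drain(w *flagWatcher) (got []bool, dying bool) {
	for change := range w.changes {
		got = append(got, change)
	}
	select {
	case <-w.dying:
		dying = true
	default:
	}
	return got, dying
}

func main() {
	got, dying := drain(newFlagWatcher([]bool{true, false}))
	fmt.Println(got, dying)
}
```

With this ordering a caller that sees the changes channel closed can rely on the dying state being visible as well, whichever of the two signals ends up documented.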

https://codereview.appspot.com/6011047/diff/1/state/unit.go#newcode614
state/unit.go:614: // PortsWatcher observes changes to a ports node.
// PortsWatcher observes changes to a unit's open ports.

https://codereview.appspot.com/6011047/diff/1/state/unit.go#newcode656
state/unit.go:656: defer close(w.changeChan)
same applies here as to NeedsUpgradeWatcher.

https://codereview.appspot.com/6011047/diff/1/state/unit.go#newcode663
state/unit.go:663: ports := &struct{ Open []Port }{}
var ports struct {
     Open []Port
}
would be clearer i think (and would avoid the unnecessary double
indirection).
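A small sketch of the suggested form, for illustration only. It assumes encoding/json in place of goyaml so that it runs with the standard library alone; Port and its field names are hypothetical stand-ins for state.Port, and parsePorts is not a function from the branch.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Port is a hypothetical stand-in for state.Port; the field names are assumed.
type Port struct {
	Proto  string
	Number int
}

// parsePorts declares a plain struct value and passes its address to the
// decoder, so there is a single level of indirection.
func parsePorts(data []byte) ([]Port, error) {
	var ports struct {
		Open []Port
	}
	if err := json.Unmarshal(data, &ports); err != nil {
		return nil, err
	}
	return ports.Open, nil
}

func main() {
	open, err := parsePorts([]byte(`{"open":[{"proto":"tcp","number":80}]}`))
	fmt.Println(open, err)
}
```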

https://codereview.appspot.com/6011047/diff/1/state/watch_test.go
File state/watch_test.go (right):

https://codereview.appspot.com/6011047/diff/1/state/watch_test.go#newcode13
state/watch_test.go:13: return change, ok, false
you could probably just Assert(ok, Equals, true) here
and thus simplify the interface to receiveChange.
That applies to most of the similar tests below - those that don't test
for the changes channel being closed can factor out the closed test.

another possibility is to pass expected values for timeout and closed
into the function:

receiveChange := func(w *state.ConfigWatcher, expectClosed,
expectTimeout bool) *state.ConfigNode {
     etc
}

https://codereview.appspot.com/6011047/diff/1/state/watch_test.go#newcode14
state/watch_test.go:14: case <-time.After(time.Second):
i think a second is too long here. 100 * time.Millisecond?
same applies below, let's not make testing take too long.
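The receive-with-timeout helper under discussion can be sketched on a plain channel as below. This is an illustration under assumptions, not the test code from the branch: it takes a chan bool rather than a watcher, and shortTimeout is a hypothetical constant chosen only to keep the example fast.

```go
package main

import (
	"fmt"
	"time"
)

// shortTimeout is a hypothetical value used only by this example.
const shortTimeout = 10 * time.Millisecond

// receiveChange waits for one value on changes. It reports the value,
// whether the channel was still open, and whether the wait timed out.
func receiveChange(changes <-chan bool, timeout time.Duration) (change, ok, timedOut bool) {
	select {
	case change, ok = <-changes:
		return change, ok, false
	case <-time.After(timeout):
		return false, false, true
	}
}

func main() {
	changes := make(chan bool, 1)
	changes <- true

	// A value is ready, so it is delivered without timing out.
	change, ok, timedOut := receiveChange(changes, shortTimeout)
	fmt.Println(change, ok, timedOut)

	// A closed channel is reported through ok == false.
	close(changes)
	change, ok, timedOut = receiveChange(changes, shortTimeout)
	fmt.Println(change, ok, timedOut)

	// An idle channel is reported through timedOut == true.
	change, ok, timedOut = receiveChange(make(chan bool), shortTimeout)
	fmt.Println(change, ok, timedOut)
}
```

Keeping the three outcomes as separate return values leaves the assertions in the test body; folding expected values into the helper would shorten each call site at the cost of hiding what is being checked.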

https://codereview.appspot.com/6011047/diff/1/state/watcher/watcher.go
File state/watcher/watcher.go (right):

https://codereview.appspot.com/6011047/diff/1/state/watcher/watcher.go#newcode10
state/watcher/watcher.go:10: // of a node and, if it's true, its content.
// ExistenceChange holds information on the existence
// and contents of a node. Content will be empty when the
// node does not exist.
?

https://codereview.appspot.com/6011047/diff/1/state/watcher/watcher.go#newcode17
state/watcher/watcher.go:17: // notification (including the content) when it is created, changed
s/ (including the content)//

it's implied by the rest of the sentence and the ExistenceChange type.

https://codereview.appspot.com/6011047/diff/1/state/watcher/watcher.go#newcode94
state/watcher/watcher.go:94: content, _, watch, err := w.zk.GetW(w.path)
it seems slightly odd that when a node gets deleted, we ignore what
we've just...


113. By Frank Mueller

Changed after review.

Roger Peppe (rogpeppe) wrote :

https://codereview.appspot.com/6011047/diff/1/state/watch_test.go
File state/watch_test.go (right):

https://codereview.appspot.com/6011047/diff/1/state/watch_test.go#newcode13
state/watch_test.go:13: return change, ok, false
On 2012/04/16 15:45:31, TheMue wrote:
> On 2012/04/16 11:43:01, rog wrote:
> > you could probably just Assert(ok, Equals, true) here
> > and thus simplify the interface to receiveChange.
> > That applies to most of the similar tests below - those that don't
> > test for the changes channel being closed can factor out the closed
> > test.
> >
> > another possibility is to pass expected values for timeout and
> > closed into the function:
> >
> > receiveChange := func(w *state.ConfigWatcher, expectClosed,
> > expectTimeout bool) *state.ConfigNode {
> >      etc
> > }

> Thought about it too. But then I decided on a clean separation of the
> simple retrieval with a possible timeout (I had some due to errors
> during development) and the assertions in the test. To me this way is
> more readable, even if the code is a bit more bloated.

fair enough.

https://codereview.appspot.com/6011047/diff/1/state/watcher/watcher.go
File state/watcher/watcher.go (right):

https://codereview.appspot.com/6011047/diff/1/state/watcher/watcher.go#newcode94
state/watcher/watcher.go:94: content, _, watch, err := w.zk.GetW(w.path)
On 2012/04/16 15:45:31, TheMue wrote:
> On 2012/04/16 11:43:01, rog wrote:
> > it seems slightly odd that when a node gets deleted, we ignore what
> > we've just been told and call GetW and then ExistsW on the node
> > anyway. i appreciate that passing the event type into update might
> > complicate the logic a little, but i think it might be worth doing.
> > something like this perhaps?
> >
> > func (w *ExistenceWatcher) update(event int) (nextWatch <-chan zookeeper.Event, err error) {
> >     var content string
> >     var watch <-chan zookeeper.Event
> >     if event != zookeeper.EVENT_DELETED {
> >         content, _, watch, err = w.zk.GetW(w.path)
> >     }
> >     switch {
> >     case event == zookeeper.EVENT_DELETED || zookeeper.IsError(err, zookeeper.ZNONODE):
> >         call existsW
> >         etc
> >     }

> Hmm, ok, which event would you pass for the first call of update()? At
> this moment you're unsure if it exists. And where does err in switch
> come from?

you could pass any event that's not EVENT_DELETED - EVENT_CHANGED would
be fine.

err is defined in the same place it is currently - as a return
parameter.
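The control flow being proposed (skip the read when the event already says the node is gone, and pass a non-delete event on the first call) can be sketched against an in-memory stand-in. Everything here is hypothetical: fakeStore, eventType, errNoNode and this update function only imitate the shape of the suggestion, use none of the gozk API, and leave out re-registering the watch.

```go
package main

import (
	"errors"
	"fmt"
)

// eventType is a hypothetical stand-in for the ZooKeeper event type.
type eventType int

const (
	eventChanged eventType = iota
	eventDeleted
)

// errNoNode plays the role of the ZNONODE error in this sketch.
var errNoNode = errors.New("node does not exist")

// fakeStore is a hypothetical in-memory stand-in for the ZooKeeper
// connection; gets counts how often the content was read.
type fakeStore struct {
	nodes map[string]string
	gets  int
}

func (s *fakeStore) get(path string) (string, error) {
	s.gets++
	content, ok := s.nodes[path]
	if !ok {
		return "", errNoNode
	}
	return content, nil
}

// update reads the node only when the event does not already say it was
// deleted. err is a named result, so it is in scope for the switch.
func update(s *fakeStore, path string, event eventType) (exists bool, content string, err error) {
	if event != eventDeleted {
		content, err = s.get(path)
	}
	switch {
	case event == eventDeleted || errors.Is(err, errNoNode):
		// The node is gone; the real code would set an existence watch here.
		return false, "", nil
	case err != nil:
		return false, "", err
	}
	return true, content, nil
}

func main() {
	s := &fakeStore{nodes: map[string]string{"/watcher": "hello"}}

	// First call: the state is unknown, so pass a non-delete event.
	exists, content, err := update(s, "/watcher", eventChanged)
	fmt.Println(exists, content, err)

	// Delete event: no read is issued at all.
	exists, content, err = update(s, "/watcher", eventDeleted)
	fmt.Println(exists, content, err, s.gets)
}
```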

https://codereview.appspot.com/6011047/

Gustavo Niemeyer (niemeyer) wrote :

As we discussed over IRC, can you please break that down into smaller
chunks?

https://codereview.appspot.com/6011047/diff/6001/state/state.go
File state/state.go (right):

https://codereview.appspot.com/6011047/diff/6001/state/state.go#newcode273
state/state.go:273: func (w *ConfigWatcher) Dying() <-chan struct{} {
Why is this necessary?

https://codereview.appspot.com/6011047/diff/6001/state/unit.go
File state/unit.go (right):

https://codereview.appspot.com/6011047/diff/6001/state/unit.go#newcode316
state/unit.go:316: // WatchResolved creates a watcher for the resolved node of the unit.
There's no such thing as a "resolved node" documented in this API. We
need to document these methods in terms of what they mean semantically
for juju, rather than what they mean for the interaction of these
methods with zookeeper. For example:

// WatchResolved returns a watcher that fires when the unit
// is marked as having had its problems resolved. See
// SetResolved for details.

Please review the documentation for the other methods.

https://codereview.appspot.com/6011047/diff/6001/state/unit.go#newcode480
state/unit.go:480: if none && mode == ResolvedNone {
none? Does it mean acceptNone? Please fix the argument name and document
it.

https://codereview.appspot.com/6011047/

Gustavo Niemeyer (niemeyer) wrote :

More notes on the changes in watcher to get you going tomorrow.

https://codereview.appspot.com/6011047/diff/6001/state/watcher/watcher.go
File state/watcher/watcher.go (right):

https://codereview.appspot.com/6011047/diff/6001/state/watcher/watcher.go#newcode12
state/watcher/watcher.go:12: type ExistenceChange struct {
This isn't watching just existence.. it's watching both existence and
contents. How's that different from the ContentsWatcher, and why do we
need both? Any higher-level watcher that needs contents may be
implemented on top of a watcher that offers existence *and* contents. I
suggest dropping this type, and modifying ContentsWatcher to handle both
problems.

This should be done in an independent branch.

https://codereview.appspot.com/6011047/diff/6001/state/watcher/watcher.go#newcode96
state/watcher/watcher.go:96: case err != nil && !zookeeper.IsError(err, zookeeper.ZNONODE):
This check should be inverted. We care about the case ZNONODE, so let's
be explicit about it:

case zookeeper.IsError(err, zookeeper.ZNONODE):
    ...

https://codereview.appspot.com/6011047/diff/6001/state/watcher/watcher.go#newcode100
state/watcher/watcher.go:100: _, watch, err = w.zk.ExistsW(w.path)
This is bogus. It is assuming that the remote state of the node is known
here, but it's not. The current state of the node is in that parameter
that is being ignored.

The overall logic must be fixed to take that into consideration.

https://codereview.appspot.com/6011047/

Gustavo Niemeyer (niemeyer) wrote :

For the record, this work is being split down into other smaller
branches. Please do keep these reviews in mind, though.

https://codereview.appspot.com/6011047/

Unmerged revisions

113. By Frank Mueller

Changed after review.

112. By Frank Mueller

Extended watcher package by ExistenceWatcher which looks for creation,
content changes and deletions. Used it in NeedsUpgradeWatcher, ResolvedWatcher
and PortsWatcher for Units.

Preview Diff

1=== modified file 'state/state.go'
2--- state/state.go 2012-04-05 13:01:51 +0000
3+++ state/state.go 2012-04-16 15:47:21 +0000
4@@ -268,6 +268,12 @@
5 return w.changeChan
6 }
7
8+// Dying returns a channel that is closed when the
9+// watcher has stopped or is about to stop.
10+func (w *ConfigWatcher) Dying() <-chan struct{} {
11+ return w.tomb.Dying()
12+}
13+
14 // Stop stops the watch and returns any error encountered
15 // while watching. This method should always be called
16 // before discarding the watcher.
17
18=== modified file 'state/state_test.go'
19--- state/state_test.go 2012-03-26 12:47:37 +0000
20+++ state/state_test.go 2012-04-16 15:47:21 +0000
21@@ -778,7 +778,7 @@
22 c.Assert(err, ErrorMatches, "no unused machine found")
23 }
24
25-func (s *StateSuite) TestGetSetClearUnitUpgrate(c *C) {
26+func (s *StateSuite) TestGetSetClearUnitUpgrade(c *C) {
27 // Check that setting and clearing an upgrade flag on a unit works.
28 dummy, _ := addDummyCharm(c, s.st)
29 wordpress, err := s.st.AddService("wordpress", dummy)
30
31=== modified file 'state/unit.go'
32--- state/unit.go 2012-03-20 20:28:54 +0000
33+++ state/unit.go 2012-04-16 15:47:21 +0000
34@@ -11,6 +11,8 @@
35 "launchpad.net/gozk/zookeeper"
36 "launchpad.net/juju/go/charm"
37 "launchpad.net/juju/go/state/presence"
38+ "launchpad.net/juju/go/state/watcher"
39+ "launchpad.net/tomb"
40 "strconv"
41 "strings"
42 "time"
43@@ -261,6 +263,11 @@
44 return err
45 }
46
47+// WatchNeedsUpgrade creates a watcher for the upgrade flag of the unit.
48+func (u *Unit) WatchNeedsUpgrade() *NeedsUpgradeWatcher {
49+ return newNeedsUpgradeWatcher(u)
50+}
51+
52 // Resolved returns the resolved mode for the unit.
53 func (u *Unit) Resolved() (ResolvedMode, error) {
54 yaml, _, err := u.st.zk.Get(u.zkResolvedPath())
55@@ -271,15 +278,7 @@
56 if err != nil {
57 return ResolvedNone, err
58 }
59- setting := &struct{ Retry ResolvedMode }{}
60- if err = goyaml.Unmarshal([]byte(yaml), setting); err != nil {
61- return ResolvedNone, err
62- }
63- mode := setting.Retry
64- if err := validResolvedMode(mode); err != nil {
65- return ResolvedNone, err
66- }
67- return mode, nil
68+ return parseResolvedMode(yaml)
69 }
70
71 // SetResolved marks the unit as having had any previous state
72@@ -289,7 +288,7 @@
73 // reexecute previous failed hooks or to continue as if they had
74 // succeeded before.
75 func (u *Unit) SetResolved(mode ResolvedMode) error {
76- if err := validResolvedMode(mode); err != nil {
77+ if err := validResolvedMode(mode, false); err != nil {
78 return err
79 }
80 setting := &struct{ Retry ResolvedMode }{mode}
81@@ -314,6 +313,11 @@
82 return err
83 }
84
85+// WatchResolved creates a watcher for the resolved node of the unit.
86+func (u *Unit) WatchResolved() *ResolvedWatcher {
87+ return newResolvedWatcher(u)
88+}
89+
90 // OpenPort sets the policy of the port with protocol and number to be opened.
91 func (u *Unit) OpenPort(protocol string, number int) error {
92 openPort := func(oldYaml string, stat *zookeeper.Stat) (string, error) {
93@@ -386,6 +390,11 @@
94 return ports.Open, nil
95 }
96
97+// WatchPorts creates a watcher for the ports node of the unit.
98+func (u *Unit) WatchPorts() *PortsWatcher {
99+ return newPortsWatcher(u)
100+}
101+
102 // AgentAlive returns whether the respective remote agent is alive.
103 func (u *Unit) AgentAlive() (bool, error) {
104 return presence.Alive(u.st.zk, u.zkAgentPath())
105@@ -417,11 +426,6 @@
106 return fmt.Sprintf("/units/%s", u.key)
107 }
108
109-// zkPortsPath returns the ZooKeeper path for the open ports.
110-func (u *Unit) zkPortsPath() string {
111- return fmt.Sprintf("/units/%s/ports", u.key)
112-}
113-
114 // zkAgentPath returns the ZooKeeper path for the unit agent.
115 func (u *Unit) zkAgentPath() string {
116 return fmt.Sprintf("/units/%s/agent", u.key)
117@@ -437,6 +441,11 @@
118 return fmt.Sprintf("/units/%s/resolved", u.key)
119 }
120
121+// zkPortsPath returns the ZooKeeper path for the open ports.
122+func (u *Unit) zkPortsPath() string {
123+ return fmt.Sprintf("/units/%s/ports", u.key)
124+}
125+
126 // parseUnitName parses a unit name like "wordpress/0" into
127 // its service name and sequence number parts.
128 func parseUnitName(name string) (serviceName string, seqNo int, err error) {
129@@ -451,11 +460,232 @@
130 return parts[0], int(sequenceNo), nil
131 }
132
133+// parseResolvedMode parses the given YAML for the resolved mode
134+// and checks if it's valid.
135+func parseResolvedMode(yaml string) (ResolvedMode, error) {
136+ setting := &struct{ Retry ResolvedMode }{}
137+ if err := goyaml.Unmarshal([]byte(yaml), setting); err != nil {
138+ return ResolvedNone, err
139+ }
140+ mode := setting.Retry
141+ if err := validResolvedMode(mode, true); err != nil {
142+ return ResolvedNone, err
143+ }
144+ return mode, nil
145+}
146+
147 // validResolvedMode ensures that only valid values for the
148 // resolved mode are used.
149-func validResolvedMode(mode ResolvedMode) error {
150+func validResolvedMode(mode ResolvedMode, none bool) error {
151+ if none && mode == ResolvedNone {
152+ return nil
153+ }
154 if mode != ResolvedRetryHooks && mode != ResolvedNoHooks {
155 return fmt.Errorf("invalid error resolution mode: %d", mode)
156 }
157 return nil
158 }
159+
160+// NeedsUpgradeWatcher observes changes to a unit's upgrade flag.
161+type NeedsUpgradeWatcher struct {
162+ unit *Unit
163+ tomb tomb.Tomb
164+ watcher *watcher.ExistenceWatcher
165+ changeChan chan bool
166+}
167+
168+// newNeedsUpgradeWatcher creates and starts a new watcher for the
169+// upgrade flag of the given unit.
170+func newNeedsUpgradeWatcher(u *Unit) *NeedsUpgradeWatcher {
171+ w := &NeedsUpgradeWatcher{
172+ unit: u,
173+ changeChan: make(chan bool),
174+ watcher: watcher.NewExistenceWatcher(u.st.zk, u.zkNeedsUpgradePath()),
175+ }
176+ go w.loop()
177+ return w
178+}
179+
180+// Changes returns a channel that will receive the new
181+// upgrade flag state when a change is detected. Note that multiple
182+// changes may be observed as a single event in the channel.
183+func (w *NeedsUpgradeWatcher) Changes() <-chan bool {
184+ return w.changeChan
185+}
186+
187+// Dying returns a channel that is closed when the
188+// watcher has stopped or is about to stop.
189+func (w *NeedsUpgradeWatcher) Dying() <-chan struct{} {
190+ return w.tomb.Dying()
191+}
192+
193+// Stop stops the watch and returns any error encountered
194+// while watching. This method should always be called
195+// before discarding the watcher.
196+func (w *NeedsUpgradeWatcher) Stop() error {
197+ w.tomb.Kill(nil)
198+ if err := w.watcher.Stop(); err != nil {
199+ w.tomb.Wait()
200+ return err
201+ }
202+ return w.tomb.Wait()
203+}
204+
205+// loop is the backend for watching the upgrade flag node.
206+func (w *NeedsUpgradeWatcher) loop() {
207+ defer w.tomb.Done()
208+ defer close(w.changeChan)
209+
210+ for {
211+ select {
212+ case <-w.tomb.Dying():
213+ return
214+ case change := <-w.watcher.Changes():
215+ select {
216+ case <-w.watcher.Dying():
217+ return
218+ case <-w.tomb.Dying():
219+ return
220+ case w.changeChan <- change.Exists:
221+ }
222+ }
223+ }
224+}
225+
226+// ResolvedWatcher observes changes to a resolved flag node.
227+type ResolvedWatcher struct {
228+ unit *Unit
229+ tomb tomb.Tomb
230+ watcher *watcher.ExistenceWatcher
231+ changeChan chan ResolvedMode
232+}
233+
234+// newResolvedWatcher creates and starts a new resolved flag node
235+// watcher for the given unit.
236+func newResolvedWatcher(u *Unit) *ResolvedWatcher {
237+ w := &ResolvedWatcher{
238+ unit: u,
239+ changeChan: make(chan ResolvedMode),
240+ watcher: watcher.NewExistenceWatcher(u.st.zk, u.zkResolvedPath()),
241+ }
242+ go w.loop()
243+ return w
244+}
245+
246+// Changes returns a channel that will receive the new
247+// resolved mode when a change is detected. Note that multiple
248+// changes may be observed as a single event in the channel.
249+func (w *ResolvedWatcher) Changes() <-chan ResolvedMode {
250+ return w.changeChan
251+}
252+
253+// Dying returns a channel that is closed when the
254+// watcher has stopped or is about to stop.
255+func (w *ResolvedWatcher) Dying() <-chan struct{} {
256+ return w.tomb.Dying()
257+}
258+
259+// Stop stops the watch and returns any error encountered
260+// while watching. This method should always be called
261+// before discarding the watcher.
262+func (w *ResolvedWatcher) Stop() error {
263+ w.tomb.Kill(nil)
264+ if err := w.watcher.Stop(); err != nil {
265+ w.tomb.Wait()
266+ return err
267+ }
268+ return w.tomb.Wait()
269+}
270+
271+// loop is the backend for watching the resolved flag node.
272+func (w *ResolvedWatcher) loop() {
273+ defer w.tomb.Done()
274+ defer close(w.changeChan)
275+
276+ for {
277+ select {
278+ case <-w.tomb.Dying():
279+ return
280+ case change := <-w.watcher.Changes():
281+ mode, err := parseResolvedMode(change.Content)
282+ if err != nil {
283+ w.tomb.Kill(err)
284+ return
285+ }
286+ select {
287+ case <-w.watcher.Dying():
288+ return
289+ case <-w.tomb.Dying():
290+ return
291+ case w.changeChan <- mode:
292+ }
293+ }
294+ }
295+}
296+
297+// PortsWatcher observes changes to a unit's open ports.
298+type PortsWatcher struct {
299+ unit *Unit
300+ tomb tomb.Tomb
301+ watcher *watcher.ExistenceWatcher
302+ changeChan chan []Port
303+}
304+
305+// newPortsWatcher creates and starts a new watcher for the
306+// open ports of the given unit.
307+func newPortsWatcher(u *Unit) *PortsWatcher {
308+ w := &PortsWatcher{
309+ unit: u,
310+ changeChan: make(chan []Port),
311+ watcher: watcher.NewExistenceWatcher(u.st.zk, u.zkPortsPath()),
312+ }
313+ go w.loop()
314+ return w
315+}
316+
317+// Changes returns a channel that will receive the new
318+// ports when a change is detected. Note that multiple
319+// changes may be observed as a single event in the channel.
320+func (w *PortsWatcher) Changes() <-chan []Port {
321+ return w.changeChan
322+}
323+
324+// Stop stops the watch and returns any error encountered
325+// while watching. This method should always be called
326+// before discarding the watcher.
327+func (w *PortsWatcher) Stop() error {
328+ w.tomb.Kill(nil)
329+ if err := w.watcher.Stop(); err != nil {
330+ w.tomb.Wait()
331+ return err
332+ }
333+ return w.tomb.Wait()
334+}
335+
336+// loop is the backend for watching the ports node.
337+func (w *PortsWatcher) loop() {
338+ defer w.tomb.Done()
339+ defer close(w.changeChan)
340+
341+ for {
342+ select {
343+ case <-w.tomb.Dying():
344+ return
345+ case change := <-w.watcher.Changes():
346+ var ports struct {
347+ Open []Port
348+ }
349+ if err := goyaml.Unmarshal([]byte(change.Content), &ports); err != nil {
350+ w.tomb.Kill(err)
351+ return
352+ }
353+ select {
354+ case <-w.watcher.Dying():
355+ return
356+ case <-w.tomb.Dying():
357+ return
358+ case w.changeChan <- ports.Open:
359+ }
360+ }
361+ }
362+}
363
364=== modified file 'state/watch_test.go'
365--- state/watch_test.go 2012-04-05 13:01:51 +0000
366+++ state/watch_test.go 2012-04-16 15:47:21 +0000
367@@ -2,10 +2,20 @@
368
369 import (
370 . "launchpad.net/gocheck"
371+ "launchpad.net/juju/go/state"
372 "time"
373 )
374
375 func (s *StateSuite) TestServiceWatchConfig(c *C) {
376+ receiveChange := func(w *state.ConfigWatcher) (*state.ConfigNode, bool, bool) {
377+ select {
378+ case change, ok := <-w.Changes():
379+ return change, ok, false
380+ case <-time.After(100 * time.Millisecond):
381+ return nil, false, true
382+ }
383+ return nil, false, false
384+ }
385 dummy, _ := addDummyCharm(c, s.st)
386 wordpress, err := s.st.AddService("wordpress", dummy)
387 c.Assert(err, IsNil)
388@@ -17,7 +27,9 @@
389 watcher := wordpress.WatchConfig()
390
391 // Recieve initial event after creation.
392- changedConfig := <-watcher.Changes()
393+ changedConfig, ok, timeout := receiveChange(watcher)
394+ c.Assert(timeout, Equals, false)
395+ c.Assert(ok, Equals, true)
396 c.Assert(changedConfig.Keys(), HasLen, 0)
397
398 // Two more change events.
399@@ -25,20 +37,23 @@
400 config.Set("baz", "yadda")
401 _, err = config.Write()
402 c.Assert(err, IsNil)
403-
404 time.Sleep(100 * time.Millisecond)
405 config.Delete("foo")
406 _, err = config.Write()
407 c.Assert(err, IsNil)
408
409 // Receive the two changes.
410- changedConfig = <-watcher.Changes()
411+ changedConfig, ok, timeout = receiveChange(watcher)
412+ c.Assert(timeout, Equals, false)
413+ c.Assert(ok, Equals, true)
414 c.Assert(changedConfig.Map(), DeepEquals, map[string]interface{}{"foo": "bar", "baz": "yadda"})
415 foo, found := changedConfig.Get("foo")
416 c.Assert(found, Equals, true)
417 c.Assert(foo, Equals, "bar")
418
419- changedConfig = <-watcher.Changes()
420+ changedConfig, ok, timeout = receiveChange(watcher)
421+ c.Assert(timeout, Equals, false)
422+ c.Assert(ok, Equals, true)
423 c.Assert(changedConfig.Map(), DeepEquals, map[string]interface{}{"baz": "yadda"})
424 foo, found = changedConfig.Get("foo")
425 c.Assert(found, Equals, false)
426@@ -47,18 +62,23 @@
427 c.Assert(baz, Equals, "yadda")
428
429 // No more changes.
430- select {
431- case <-watcher.Changes():
432- c.Fatalf("no more config changes expected")
433- case <-time.After(200 * time.Millisecond):
434- // The timeout is expected.
435- }
436+ _, _, timeout = receiveChange(watcher)
437+ c.Assert(timeout, Equals, true)
438
439 err = watcher.Stop()
440 c.Assert(err, IsNil)
441 }
442
443-func (s *StateSuite) TestServiceWatchConfigIllegalData(c *C) {
444+func (s *StateSuite) TestServiceWatchConfigIllegalDatal(c *C) {
445+ receiveChange := func(w *state.ConfigWatcher) (*state.ConfigNode, bool, bool) {
446+ select {
447+ case change, ok := <-w.Changes():
448+ return change, ok, false
449+ case <-time.After(100 * time.Millisecond):
450+ return nil, false, true
451+ }
452+ return nil, false, false
453+ }
454 dummy, _ := addDummyCharm(c, s.st)
455 wordpress, err := s.st.AddService("wordpress", dummy)
456 c.Assert(err, IsNil)
457@@ -69,15 +89,167 @@
458 _, err = s.zkConn.Set("/services/service-0000000000/config", "---", -1)
459 c.Assert(err, IsNil)
460
461- // Changes() has to be closed
462- select {
463- case _, ok := <-watcher.Changes():
464- c.Assert(ok, Equals, false)
465- case <-time.After(200 * time.Millisecond):
466- // Timeout should not be needed.
467- c.Fatalf("config change channel should have been closed due to the illegal data")
468- }
469+ // Changes() has to be closed.
470+ _, ok, timeout := receiveChange(watcher)
471+ c.Assert(ok, Equals, false)
472+ c.Assert(timeout, Equals, false)
473
474 err = watcher.Stop()
475 c.Assert(err, ErrorMatches, "YAML error: .*")
476 }
477+
478+func (s *StateSuite) TestUnitWatchNeedsUpgrade(c *C) {
479+ receiveChange := func(w *state.NeedsUpgradeWatcher) (bool, bool, bool) {
480+ select {
481+ case change, ok := <-w.Changes():
482+ return change, ok, false
483+ case <-time.After(100 * time.Millisecond):
484+ return false, false, true
485+ }
486+ return false, false, false
487+ }
488+ dummy, _ := addDummyCharm(c, s.st)
489+ wordpress, err := s.st.AddService("wordpress", dummy)
490+ c.Assert(err, IsNil)
491+ c.Assert(wordpress.Name(), Equals, "wordpress")
492+ unit, err := wordpress.AddUnit()
493+ c.Assert(err, IsNil)
494+ watcher := unit.WatchNeedsUpgrade()
495+
496+ go func() {
497+ time.Sleep(50 * time.Millisecond)
498+ err = unit.SetNeedsUpgrade()
499+ c.Assert(err, IsNil)
500+ time.Sleep(50 * time.Millisecond)
501+ err = unit.ClearNeedsUpgrade()
502+ c.Assert(err, IsNil)
503+ }()
504+
505+ // Receive the changes.
506+ upgrade, ok, timeout := receiveChange(watcher)
507+ c.Assert(timeout, Equals, false)
508+ c.Assert(ok, Equals, true)
509+ c.Assert(upgrade, Equals, true)
510+ upgrade, ok, timeout = receiveChange(watcher)
511+ c.Assert(ok, Equals, true)
512+ c.Assert(timeout, Equals, false)
513+ c.Assert(upgrade, Equals, false)
514+
515+ // No more changes.
516+ _, _, timeout = receiveChange(watcher)
517+ c.Assert(timeout, Equals, true)
518+
519+ err = watcher.Stop()
520+ c.Assert(err, IsNil)
521+}
522+
523+func (s *StateSuite) TestUnitWatchResolved(c *C) {
524+ receiveChange := func(w *state.ResolvedWatcher) (state.ResolvedMode, bool, bool) {
525+ select {
526+ case change, ok := <-w.Changes():
527+ return change, ok, false
528+ case <-time.After(100 * time.Millisecond):
529+ return state.ResolvedNoHooks, false, true
530+ }
531+ return state.ResolvedNoHooks, false, false
532+ }
533+ dummy, _ := addDummyCharm(c, s.st)
534+ wordpress, err := s.st.AddService("wordpress", dummy)
535+ c.Assert(err, IsNil)
536+ c.Assert(wordpress.Name(), Equals, "wordpress")
537+ unit, err := wordpress.AddUnit()
538+ c.Assert(err, IsNil)
539+ watcher := unit.WatchResolved()
540+
541+ go func() {
542+ time.Sleep(50 * time.Millisecond)
543+ err = unit.SetResolved(state.ResolvedRetryHooks)
544+ c.Assert(err, IsNil)
545+ time.Sleep(50 * time.Millisecond)
546+ err = unit.ClearResolved()
547+ c.Assert(err, IsNil)
548+ time.Sleep(50 * time.Millisecond)
549+ err = unit.SetResolved(state.ResolvedNoHooks)
550+ c.Assert(err, IsNil)
551+ }()
552+
553+ // Receive the changes.
554+ resolved, ok, timeout := receiveChange(watcher)
555+ c.Assert(timeout, Equals, false)
556+ c.Assert(ok, Equals, true)
557+ c.Assert(resolved, Equals, state.ResolvedRetryHooks)
558+ resolved, ok, timeout = receiveChange(watcher)
559+ c.Assert(timeout, Equals, false)
560+ c.Assert(ok, Equals, true)
561+ c.Assert(resolved, Equals, state.ResolvedNone)
562+ resolved, ok, timeout = receiveChange(watcher)
563+ c.Assert(timeout, Equals, false)
564+ c.Assert(ok, Equals, true)
565+ c.Assert(resolved, Equals, state.ResolvedNoHooks)
566+
567+ // No more changes.
568+ _, _, timeout = receiveChange(watcher)
569+ c.Assert(timeout, Equals, true)
570+
571+ err = watcher.Stop()
572+ c.Assert(err, IsNil)
573+}
574+
575+func (s *StateSuite) TestUnitWatchPorts(c *C) {
576+ receiveChange := func(w *state.PortsWatcher) ([]state.Port, bool, bool) {
577+ select {
578+ case change, ok := <-w.Changes():
579+ return change, ok, false
580+ case <-time.After(100 * time.Millisecond):
581+ return nil, false, true
582+ }
583+ return nil, false, false
584+ }
585+ dummy, _ := addDummyCharm(c, s.st)
586+ wordpress, err := s.st.AddService("wordpress", dummy)
587+ c.Assert(err, IsNil)
588+ c.Assert(wordpress.Name(), Equals, "wordpress")
589+ unit, err := wordpress.AddUnit()
590+ c.Assert(err, IsNil)
591+ watcher := unit.WatchPorts()
592+
593+ go func() {
594+ time.Sleep(50 * time.Millisecond)
595+ err = unit.OpenPort("tcp", 80)
596+ c.Assert(err, IsNil)
597+ time.Sleep(50 * time.Millisecond)
598+ err = unit.OpenPort("udp", 53)
599+ c.Assert(err, IsNil)
600+ time.Sleep(50 * time.Millisecond)
601+ err = unit.ClosePort("tcp", 80)
602+ c.Assert(err, IsNil)
603+ }()
604+
605+ // Receive the changes.
606+ open, ok, timeout := receiveChange(watcher)
607+ c.Assert(timeout, Equals, false)
608+ c.Assert(ok, Equals, true)
609+ c.Assert(open, DeepEquals, []state.Port{
610+ {"tcp", 80},
611+ })
612+ open, ok, timeout = receiveChange(watcher)
613+ c.Assert(timeout, Equals, false)
614+ c.Assert(ok, Equals, true)
615+ c.Assert(open, DeepEquals, []state.Port{
616+ {"tcp", 80},
617+ {"udp", 53},
618+ })
619+ open, ok, timeout = receiveChange(watcher)
620+ c.Assert(timeout, Equals, false)
621+ c.Assert(ok, Equals, true)
622+ c.Assert(open, DeepEquals, []state.Port{
623+ {"udp", 53},
624+ })
625+
626+ // No more changes.
627+ _, _, timeout = receiveChange(watcher)
628+ c.Assert(timeout, Equals, true)
629+
630+ err = watcher.Stop()
631+ c.Assert(err, IsNil)
632+}
633
634=== modified file 'state/watcher/watcher.go'
635--- state/watcher/watcher.go 2012-04-05 13:01:51 +0000
636+++ state/watcher/watcher.go 2012-04-16 15:47:21 +0000
637@@ -6,6 +6,121 @@
638 "launchpad.net/tomb"
639 )
640
641+// ExistenceChange holds information on the existence
642+// and contents of a node. Content will be empty when the
643+// node does not exist.
644+type ExistenceChange struct {
645+ Exists bool
646+ Content string
647+}
648+
649+// ExistenceWatcher observes a ZooKeeper node and delivers a
650+// notification when it is created, changed or deleted.
651+type ExistenceWatcher struct {
652+ zk *zookeeper.Conn
653+ path string
654+ tomb tomb.Tomb
655+ changeChan chan ExistenceChange
656+ existence ExistenceChange
657+}
658+
659+// NewExistenceWatcher creates an ExistenceWatcher observing
660+// the ZooKeeper node at watchedPath.
661+func NewExistenceWatcher(zk *zookeeper.Conn, watchedPath string) *ExistenceWatcher {
662+ w := &ExistenceWatcher{
663+ zk: zk,
664+ path: watchedPath,
665+ changeChan: make(chan ExistenceChange),
666+ existence: ExistenceChange{},
667+ }
668+ go w.loop()
669+ return w
670+}
671+
672+// Changes returns a channel that will receive the change if a node
673+// is created or deleted. In case of a created node it also contains
674+// the content. Note that multiple changes may be observed as a single
675+// event in the channel.
676+func (w *ExistenceWatcher) Changes() <-chan ExistenceChange {
677+ return w.changeChan
678+}
679+
680+// Dying returns a channel that is closed when the
681+// watcher has stopped or is about to stop.
682+func (w *ExistenceWatcher) Dying() <-chan struct{} {
683+ return w.tomb.Dying()
684+}
685+
686+// Stop stops the watch and returns any error encountered
687+// while watching. This method should always be called before
688+// discarding the watcher.
689+func (w *ExistenceWatcher) Stop() error {
690+ w.tomb.Kill(nil)
691+ return w.tomb.Wait()
692+}
693+
694+// loop is the backend for watching.
695+func (w *ExistenceWatcher) loop() {
696+ defer w.tomb.Done()
697+ defer close(w.changeChan)
698+
699+ watch, err := w.update()
700+ if err != nil {
701+ w.tomb.Kill(err)
702+ return
703+ }
704+
705+ for {
706+ select {
707+ case <-w.tomb.Dying():
708+ return
709+ case evt := <-watch:
710+ if !evt.Ok() {
711+ w.tomb.Killf("watcher: critical session event: %v", evt)
712+ return
713+ }
714+ watch, err = w.update()
715+ if err != nil {
716+ w.tomb.Kill(err)
717+ return
718+ }
719+ }
720+ }
721+}
722+
723+// update checks the node existence and emits changes of that state to the
724+// change channel if it has changed. It returns the next watch.
725+func (w *ExistenceWatcher) update() (nextWatch <-chan zookeeper.Event, err error) {
726+ content, _, watch, err := w.zk.GetW(w.path)
727+ switch {
728+ case err != nil && !zookeeper.IsError(err, zookeeper.ZNONODE):
729+ return nil, fmt.Errorf("watcher: can't get the content of node %q: %v", w.path, err)
730+ case err != nil:
731+ // Need a new watch to signal creation of node.
732+ _, watch, err = w.zk.ExistsW(w.path)
733+ if err != nil {
734+ return nil, fmt.Errorf("watcher: can't check the existence of node %q: %v", w.path, err)
735+ }
736+ if !w.existence.Exists {
737+ return watch, nil
738+ }
739+ w.existence.Exists = false
740+ w.existence.Content = ""
741+ default:
742+ if w.existence.Exists && content == w.existence.Content {
743+ return watch, nil
744+ }
745+ w.existence.Exists = true
746+ w.existence.Content = content
747+ }
748+ select {
749+ case <-w.tomb.Dying():
750+ return nil, tomb.ErrDying
751+ case w.changeChan <- w.existence:
752+ }
753+ return watch, nil
754+}
755+
756 // ContentWatcher observes a ZooKeeper node and delivers a
757 // notification when a content change is detected.
758 type ContentWatcher struct {
759
760=== modified file 'state/watcher/watcher_test.go'
761--- state/watcher/watcher_test.go 2012-04-05 13:01:51 +0000
762+++ state/watcher/watcher_test.go 2012-04-16 15:47:21 +0000
763@@ -39,122 +39,215 @@
764
765 s.zkConn = zk
766 s.path = "/watcher"
767-
768- _, err = s.zkConn.Create(s.path, "", 0, zookeeper.WorldACL(zookeeper.PERM_ALL))
769- c.Assert(err, IsNil)
770 }
771
772 func (s *WatcherSuite) TearDownTest(c *C) {
773- testing.ZkRemoveTree(s.zkConn, s.path)
774+ s.removePath(c)
775 s.zkConn.Close()
776 }
777
778+func (s *WatcherSuite) TestExistenceWatcher(c *C) {
779+ receiveChange := func(w *watcher.ExistenceWatcher) (*watcher.ExistenceChange, bool, bool) {
780+ select {
781+ case change, ok := <-w.Changes():
782+ return &change, ok, false
783+ case <-time.After(200 * time.Millisecond):
784+ return nil, false, true
785+ }
786+ return nil, false, false
787+ }
788+ watcher := watcher.NewExistenceWatcher(s.zkConn, s.path)
789+
790+ go func() {
791+ time.Sleep(50 * time.Millisecond)
792+ s.createPath(c, "init")
793+ time.Sleep(50 * time.Millisecond)
794+ s.changeContent(c, "foo")
795+ time.Sleep(50 * time.Millisecond)
796+ s.removePath(c)
797+ time.Sleep(50 * time.Millisecond)
798+ s.createPath(c, "done")
799+ }()
800+
801+ // Receive the four changes.
802+ change, ok, timeout := receiveChange(watcher)
803+ c.Assert(timeout, Equals, false)
804+ c.Assert(ok, Equals, true)
805+ c.Assert(change.Exists, Equals, true)
806+ c.Assert(change.Content, Equals, "init")
807+
808+ change, ok, timeout = receiveChange(watcher)
809+ c.Assert(timeout, Equals, false)
810+ c.Assert(ok, Equals, true)
811+ c.Assert(change.Exists, Equals, true)
812+ c.Assert(change.Content, Equals, "foo")
813+
814+ change, ok, timeout = receiveChange(watcher)
815+ c.Assert(timeout, Equals, false)
816+ c.Assert(ok, Equals, true)
817+ c.Assert(change.Exists, Equals, false)
818+ c.Assert(change.Content, Equals, "")
819+
820+ change, ok, timeout = receiveChange(watcher)
821+ c.Assert(timeout, Equals, false)
822+ c.Assert(ok, Equals, true)
823+ c.Assert(change.Exists, Equals, true)
824+ c.Assert(change.Content, Equals, "done")
825+
826+ // No more changes.
827+ _, _, timeout = receiveChange(watcher)
828+ c.Assert(timeout, Equals, true)
829+
830+ err := watcher.Stop()
831+ c.Assert(err, IsNil)
832+
833+ // Changes() has to be closed.
834+ _, ok, timeout = receiveChange(watcher)
835+ c.Assert(ok, Equals, false)
836+ c.Assert(timeout, Equals, false)
837+}
838+
839 func (s *WatcherSuite) TestContentWatcher(c *C) {
840+ receiveChange := func(w *watcher.ContentWatcher) (string, bool, bool) {
841+ select {
842+ case change, ok := <-w.Changes():
843+ return change, ok, false
844+ case <-time.After(200 * time.Millisecond):
845+ return "", false, true
846+ }
847+ return "", false, false
848+ }
849+ s.createPath(c, "init")
850 watcher := watcher.NewContentWatcher(s.zkConn, s.path)
851
852 go func() {
853 time.Sleep(50 * time.Millisecond)
854 s.changeContent(c, "foo")
855-
856 time.Sleep(50 * time.Millisecond)
857 s.changeContent(c, "foo")
858-
859 time.Sleep(50 * time.Millisecond)
860 s.changeContent(c, "bar")
861 }()
862
863- // Receive the two changes.
864- change := <-watcher.Changes()
865+ // Receive the three changes. First one is from creation,
866+ // the content watcher needs an existing node.
867+ change, ok, timeout := receiveChange(watcher)
868+ c.Assert(timeout, Equals, false)
869+ c.Assert(ok, Equals, true)
870+ c.Assert(change, Equals, "init")
871+
872+ change, ok, timeout = receiveChange(watcher)
873+ c.Assert(timeout, Equals, false)
874+ c.Assert(ok, Equals, true)
875 c.Assert(change, Equals, "foo")
876
877- change = <-watcher.Changes()
878+ change, ok, timeout = receiveChange(watcher)
879+ c.Assert(timeout, Equals, false)
880+ c.Assert(ok, Equals, true)
881 c.Assert(change, Equals, "bar")
882
883 // No more changes.
884- select {
885- case <-watcher.Changes():
886- c.Fatalf("no more changes expected")
887- case <-time.After(200 * time.Millisecond):
888- // The timeout is expected.
889- }
890+ _, _, timeout = receiveChange(watcher)
891+ c.Assert(timeout, Equals, true)
892
893 err := watcher.Stop()
894 c.Assert(err, IsNil)
895
896- // Changes() has to be closed
897- select {
898- case _, ok := <-watcher.Changes():
899- c.Assert(ok, Equals, false)
900- case <-time.After(200 * time.Millisecond):
901- // Timeout should not be needed.
902- c.Fatalf("timeout should not happen")
903+ // Changes() has to be closed.
904+ _, ok, timeout = receiveChange(watcher)
905+ c.Assert(ok, Equals, false)
906+ c.Assert(timeout, Equals, false)
907+}
908+
909+func (s *WatcherSuite) TestContentWatcherDeletedNode(c *C) {
910+ receiveChange := func(w *watcher.ContentWatcher) (string, bool, bool) {
911+ select {
912+ case change, ok := <-w.Changes():
913+ return change, ok, false
914+ case <-time.After(200 * time.Millisecond):
915+ return "", false, true
916+ }
917+ return "", false, false
918 }
919+ s.createPath(c, "init")
920+ watcher := watcher.NewContentWatcher(s.zkConn, s.path)
921+
922+ // Receive initial creation event.
923+ change := <-watcher.Changes()
924+ c.Assert(change, Equals, "init")
925+
926+ go func() {
927+ time.Sleep(50 * time.Millisecond)
928+ s.removePath(c)
929+ }()
930+
931+ // Changes() has to be closed.
932+ _, ok, timeout := receiveChange(watcher)
933+ c.Assert(ok, Equals, false)
934+ c.Assert(timeout, Equals, false)
935+
936+ err := watcher.Stop()
937+ c.Assert(err, ErrorMatches, `watcher: node "/watcher" has been deleted`)
938 }
939
940 func (s *WatcherSuite) TestChildrenWatcher(c *C) {
941+ receiveChange := func(w *watcher.ChildrenWatcher) (*watcher.ChildrenChange, bool, bool) {
942+ select {
943+ case change, ok := <-w.Changes():
944+ return &change, ok, false
945+ case <-time.After(200 * time.Millisecond):
946+ return nil, false, true
947+ }
948+ return nil, false, false
949+ }
950+ s.createPath(c, "init")
951 watcher := watcher.NewChildrenWatcher(s.zkConn, s.path)
952
953 go func() {
954 time.Sleep(50 * time.Millisecond)
955 s.changeChildren(c, true, "foo")
956-
957 time.Sleep(50 * time.Millisecond)
958 s.changeChildren(c, true, "bar")
959-
960 time.Sleep(50 * time.Millisecond)
961 s.changeChildren(c, false, "foo")
962 }()
963
964 // Receive the three changes.
965- change := <-watcher.Changes()
966+ change, ok, timeout := receiveChange(watcher)
967+ c.Assert(timeout, Equals, false)
968+ c.Assert(ok, Equals, true)
969 c.Assert(change.Added, DeepEquals, []string{"foo"})
970
971- change = <-watcher.Changes()
972+ change, ok, timeout = receiveChange(watcher)
973+ c.Assert(timeout, Equals, false)
974+ c.Assert(ok, Equals, true)
975 c.Assert(change.Added, DeepEquals, []string{"bar"})
976
977- change = <-watcher.Changes()
978+ change, ok, timeout = receiveChange(watcher)
979+ c.Assert(timeout, Equals, false)
980+ c.Assert(ok, Equals, true)
981 c.Assert(change.Deleted, DeepEquals, []string{"foo"})
982
983 // No more changes.
984- select {
985- case <-watcher.Changes():
986- c.Fatalf("no more changes expected")
987- case <-time.After(time.Second):
988- // The timeout is expected.
989- }
990-
991- err := watcher.Stop()
992- c.Assert(err, IsNil)
993-
994- // Changes() has to be closed
995- select {
996- case _, ok := <-watcher.Changes():
997- c.Assert(ok, Equals, false)
998- case <-time.After(200 * time.Millisecond):
999- // Timeout should not be needed.
1000- c.Fatalf("timeout should not happen")
1001- }
1002-}
1003-
1004-func (s *WatcherSuite) TestDeletedNode(c *C) {
1005- watcher := watcher.NewContentWatcher(s.zkConn, s.path)
1006-
1007- go func() {
1008- time.Sleep(50 * time.Millisecond)
1009- testing.ZkRemoveTree(s.zkConn, s.path)
1010- }()
1011-
1012- // Changes() has to be closed
1013- select {
1014- case _, ok := <-watcher.Changes():
1015- c.Assert(ok, Equals, false)
1016- case <-time.After(200 * time.Millisecond):
1017- // Timeout should not be needed.
1018- c.Fatalf("timeout should not happen")
1019- }
1020-
1021- err := watcher.Stop()
1022- c.Assert(err, ErrorMatches, `watcher: node "/watcher" has been deleted`)
1023+ _, _, timeout = receiveChange(watcher)
1024+ c.Assert(timeout, Equals, true)
1025+
1026+ err := watcher.Stop()
1027+ c.Assert(err, IsNil)
1028+
1029+ // Changes() has to be closed.
1030+ _, ok, timeout = receiveChange(watcher)
1031+ c.Assert(ok, Equals, false)
1032+ c.Assert(timeout, Equals, false)
1033+}
1034+
1035+func (s *WatcherSuite) createPath(c *C, content string) {
1036+ _, err := s.zkConn.Create(s.path, content, 0, zookeeper.WorldACL(zookeeper.PERM_ALL))
1037+ c.Assert(err, IsNil)
1038+}
1039+
1040+func (s *WatcherSuite) removePath(c *C) {
1041+ testing.ZkRemoveTree(s.zkConn, s.path)
1042 }
1043
1044 func (s *WatcherSuite) changeContent(c *C, content string) {
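
For readers following the diff, the change-suppression logic in ExistenceWatcher.update() can be sketched without ZooKeeper. This is a minimal illustration, not the code from the branch: the observation type and the dedup function are hypothetical names introduced here, and each observation stands in for the result of one GetW call. The comparison against the last emitted state mirrors the switch in update() above.

```go
package main

import "fmt"

// ExistenceChange mirrors the struct added in state/watcher/watcher.go.
type ExistenceChange struct {
	Exists  bool
	Content string
}

// observation is a hypothetical stand-in for one read of the node:
// whether it was present and, if so, what it contained.
type observation struct {
	exists  bool
	content string
}

// dedup is a hypothetical helper that replays observations and returns
// only those that differ from the last emitted state, following the
// same rules as update() in the diff.
func dedup(observations []observation) []ExistenceChange {
	var last ExistenceChange // zero value: the node is assumed absent
	var out []ExistenceChange
	for _, o := range observations {
		if !o.exists {
			if !last.Exists {
				continue // still absent, nothing to report
			}
			last = ExistenceChange{}
		} else {
			if last.Exists && o.content == last.Content {
				continue // same content as before, nothing to report
			}
			last = ExistenceChange{Exists: true, Content: o.content}
		}
		out = append(out, last)
	}
	return out
}

func main() {
	// Same sequence as TestExistenceWatcher, plus an initial "absent"
	// read and one repeated "init" read that should both be suppressed.
	changes := dedup([]observation{
		{false, ""},
		{true, "init"},
		{true, "init"},
		{true, "foo"},
		{false, ""},
		{true, "done"},
	})
	for _, c := range changes {
		fmt.Printf("%+v\n", c)
	}
}
```

With this input the sketch reports four changes (created with "init", changed to "foo", deleted, created with "done"), which is the sequence TestExistenceWatcher asserts. The real watcher additionally re-arms a ZooKeeper watch on every pass and aborts the send when the tomb is dying; neither is modelled here.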
