Merge lp:~themue/pyjuju/go-state-more-unit-watchers into lp:pyjuju/go
- go-state-more-unit-watchers
- Merge into go
Status: | Rejected |
---|---|
Rejected by: | Gustavo Niemeyer |
Proposed branch: | lp:~themue/pyjuju/go-state-more-unit-watchers |
Merge into: | lp:pyjuju/go |
Diff against target: |
1044 lines (+718/-102) 6 files modified
state/state.go (+6/-0) state/state_test.go (+1/-1) state/unit.go (+246/-16) state/watch_test.go (+191/-19) state/watcher/watcher.go (+115/-0) state/watcher/watcher_test.go (+159/-66) |
To merge this branch: | bzr merge lp:~themue/pyjuju/go-state-more-unit-watchers |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Juju Engineering | Pending | ||
Review via email: mp+101760@code.launchpad.net |
Commit message
Description of the change
More watchers for Unit.
Adding of NeedsUpgradeWat
for Units. They all are based on the new ExistenceWatcher for
the creation, content changing and deletion of nodes.
Roger Peppe (rogpeppe) wrote : | # |
- 113. By Frank Mueller
-
Changed after review.
Roger Peppe (rogpeppe) wrote : | # |
https:/
File state/watch_test.go (right):
https:/
state/watch_
On 2012/04/16 15:45:31, TheMue wrote:
> On 2012/04/16 11:43:01, rog wrote:
> > you could probably just Assert(ok, Equals, true) here
> > and thus simplify the interface to receiveChange.
> > That applies to most of the similar tests below - those that don't
test for
> the
> > changes channel being closed can factor out the closed test.
> >
> > another possibility is to pass expected values for timeout and
closed into the
> > function:
> >
> > receiveChange := func(w *state.
expectTimeout
> bool)
> > *state.ConfigNode {
> > etc
> > }
> Thought about it too. But then I decided for a clean seperation of the
simple
> retrievement with a possible timeout (had some due to errors while
development)
> and the assertions in the test. To me this way is more readable, even
if the
> code is bit more bloated.
fair enough.
https:/
File state/watcher/
https:/
state/watcher/
On 2012/04/16 15:45:31, TheMue wrote:
> On 2012/04/16 11:43:01, rog wrote:
> > it seems slightly odd that when a node gets deleted, we ignore what
we've just
> > been told and call GetW and then ExistsW on the node anyway.
> > i appreciate that passing the event type into update might
complicate the
> logic
> > a little, but i think it might be worth doing. something like this
perhaps?
> >
> > func (w *ExistenceWatcher) update(event int) (nextWatch <-chan
> zookeeper.Event,
> > err error
> > ) {
> > var content string
> > var watch <-chan zookeeper.Event
> > if event != zookeeper.
> > content, _, watch, err = w.zk.GetW(w.path)
> > }
> > switch {
> > case event == zookeeper.
> > zookeeper.ZNONODE):
> > call existsW
> > etc
> > }
> Hmm, ok, which event would you pass for the first call of update()? At
this
> moment you're unsure if it exists. And where does err in switch come
from?
you could pass any event that's not EVENT_DELETED - EVENT_CHANGED would
be fine.
err is defined in the same place it is currently - as a return
parameter.
Gustavo Niemeyer (niemeyer) wrote : | # |
As we discussed over IRC, can you please break that down into smaller
chunks?
https:/
File state/state.go (right):
https:/
state/state.go:273: func (w *ConfigWatcher) Dying() <-chan struct{} {
Why is this necessary?
https:/
File state/unit.go (right):
https:/
state/unit.go:316: // WatchResolved creates a watcher for the resolved
node of the unit.
There's no such thing as a "resolved node" documented in this API. We
need to document these methods in terms of what they mean semantically
for juju, rather than what they mean for the interaction of these
methods with zookeeper. For example:
// WatchResolved returns a watcher that fires when the unit
// is marked as having had its problems resolved. See
// SetResolved for details.
Please review the documentation for the other methods.
https:/
state/unit.go:480: if none && mode == ResolvedNone {
none? Does it mean acceptNone? Please fix the argument name and document
it.
Gustavo Niemeyer (niemeyer) wrote : | # |
More notes on the changes in watcher to get you going tomorrow.
https:/
File state/watcher/
https:/
state/watcher/
This isn't watching just existence.. it's watching both existence and
contents. How's that different from the ContentsWatcher, and why do we
need both? Any higher-level watcher that needs contents may be
implemented on top of a watcher that offers existence *and* contents. I
suggest dropping this type, and modifying ContentsWatcher to handle both
problems.
This should be done in an independent branch.
https:/
state/watcher/
zookeeper.ZNONODE):
This check should be inverted. We care about the case ZNONODE, so let's
be explicit about it:
case zookeeper.
...
https:/
state/watcher/
This is bogus. It is assuming that the remote state of the node is known
here, but it's not. The current state of the node is in that parameter
that is being ignored.
The overall logic must be fixed to take that into consideration.
Gustavo Niemeyer (niemeyer) wrote : | # |
For the record, this work is being split down into other smaller
branches. Please do keep these reviews in mind, though.
Unmerged revisions
- 113. By Frank Mueller
-
Changed after review.
- 112. By Frank Mueller
-
Extended watcher package by ExistenceWatcher with looks for creation,
content changes and deletions. Used it in NeedsUpgradeWatcher, ResolvedWatcher
and PortsWatcher for Units.
Preview Diff
1 | === modified file 'state/state.go' |
2 | --- state/state.go 2012-04-05 13:01:51 +0000 |
3 | +++ state/state.go 2012-04-16 15:47:21 +0000 |
4 | @@ -268,6 +268,12 @@ |
5 | return w.changeChan |
6 | } |
7 | |
8 | +// Dying returns a channel that is closed when the |
9 | +// watcher has stopped or is about to stop. |
10 | +func (w *ConfigWatcher) Dying() <-chan struct{} { |
11 | + return w.tomb.Dying() |
12 | +} |
13 | + |
14 | // Stop stops the watch and returns any error encountered |
15 | // while watching. This method should always be called |
16 | // before discarding the watcher. |
17 | |
18 | === modified file 'state/state_test.go' |
19 | --- state/state_test.go 2012-03-26 12:47:37 +0000 |
20 | +++ state/state_test.go 2012-04-16 15:47:21 +0000 |
21 | @@ -778,7 +778,7 @@ |
22 | c.Assert(err, ErrorMatches, "no unused machine found") |
23 | } |
24 | |
25 | -func (s *StateSuite) TestGetSetClearUnitUpgrate(c *C) { |
26 | +func (s *StateSuite) TestGetSetClearUnitUpgrade(c *C) { |
27 | // Check that setting and clearing an upgrade flag on a unit works. |
28 | dummy, _ := addDummyCharm(c, s.st) |
29 | wordpress, err := s.st.AddService("wordpress", dummy) |
30 | |
31 | === modified file 'state/unit.go' |
32 | --- state/unit.go 2012-03-20 20:28:54 +0000 |
33 | +++ state/unit.go 2012-04-16 15:47:21 +0000 |
34 | @@ -11,6 +11,8 @@ |
35 | "launchpad.net/gozk/zookeeper" |
36 | "launchpad.net/juju/go/charm" |
37 | "launchpad.net/juju/go/state/presence" |
38 | + "launchpad.net/juju/go/state/watcher" |
39 | + "launchpad.net/tomb" |
40 | "strconv" |
41 | "strings" |
42 | "time" |
43 | @@ -261,6 +263,11 @@ |
44 | return err |
45 | } |
46 | |
47 | +// WatchNeedsUpgrade creates a watcher for the upgrade flag of the unit. |
48 | +func (u *Unit) WatchNeedsUpgrade() *NeedsUpgradeWatcher { |
49 | + return newNeedsUpgradeWatcher(u) |
50 | +} |
51 | + |
52 | // Resolved returns the resolved mode for the unit. |
53 | func (u *Unit) Resolved() (ResolvedMode, error) { |
54 | yaml, _, err := u.st.zk.Get(u.zkResolvedPath()) |
55 | @@ -271,15 +278,7 @@ |
56 | if err != nil { |
57 | return ResolvedNone, err |
58 | } |
59 | - setting := &struct{ Retry ResolvedMode }{} |
60 | - if err = goyaml.Unmarshal([]byte(yaml), setting); err != nil { |
61 | - return ResolvedNone, err |
62 | - } |
63 | - mode := setting.Retry |
64 | - if err := validResolvedMode(mode); err != nil { |
65 | - return ResolvedNone, err |
66 | - } |
67 | - return mode, nil |
68 | + return parseResolvedMode(yaml) |
69 | } |
70 | |
71 | // SetResolved marks the unit as having had any previous state |
72 | @@ -289,7 +288,7 @@ |
73 | // reexecute previous failed hooks or to continue as if they had |
74 | // succeeded before. |
75 | func (u *Unit) SetResolved(mode ResolvedMode) error { |
76 | - if err := validResolvedMode(mode); err != nil { |
77 | + if err := validResolvedMode(mode, false); err != nil { |
78 | return err |
79 | } |
80 | setting := &struct{ Retry ResolvedMode }{mode} |
81 | @@ -314,6 +313,11 @@ |
82 | return err |
83 | } |
84 | |
85 | +// WatchResolved creates a watcher for the resolved node of the unit. |
86 | +func (u *Unit) WatchResolved() *ResolvedWatcher { |
87 | + return newResolvedWatcher(u) |
88 | +} |
89 | + |
90 | // OpenPort sets the policy of the port with protocol and number to be opened. |
91 | func (u *Unit) OpenPort(protocol string, number int) error { |
92 | openPort := func(oldYaml string, stat *zookeeper.Stat) (string, error) { |
93 | @@ -386,6 +390,11 @@ |
94 | return ports.Open, nil |
95 | } |
96 | |
97 | +// WatchPorts creates a watcher for the ports node of the unit. |
98 | +func (u *Unit) WatchPorts() *PortsWatcher { |
99 | + return newPortsWatcher(u) |
100 | +} |
101 | + |
102 | // AgentAlive returns whether the respective remote agent is alive. |
103 | func (u *Unit) AgentAlive() (bool, error) { |
104 | return presence.Alive(u.st.zk, u.zkAgentPath()) |
105 | @@ -417,11 +426,6 @@ |
106 | return fmt.Sprintf("/units/%s", u.key) |
107 | } |
108 | |
109 | -// zkPortsPath returns the ZooKeeper path for the open ports. |
110 | -func (u *Unit) zkPortsPath() string { |
111 | - return fmt.Sprintf("/units/%s/ports", u.key) |
112 | -} |
113 | - |
114 | // zkAgentPath returns the ZooKeeper path for the unit agent. |
115 | func (u *Unit) zkAgentPath() string { |
116 | return fmt.Sprintf("/units/%s/agent", u.key) |
117 | @@ -437,6 +441,11 @@ |
118 | return fmt.Sprintf("/units/%s/resolved", u.key) |
119 | } |
120 | |
121 | +// zkPortsPath returns the ZooKeeper path for the open ports. |
122 | +func (u *Unit) zkPortsPath() string { |
123 | + return fmt.Sprintf("/units/%s/ports", u.key) |
124 | +} |
125 | + |
126 | // parseUnitName parses a unit name like "wordpress/0" into |
127 | // its service name and sequence number parts. |
128 | func parseUnitName(name string) (serviceName string, seqNo int, err error) { |
129 | @@ -451,11 +460,232 @@ |
130 | return parts[0], int(sequenceNo), nil |
131 | } |
132 | |
133 | +// parseResolveMode parses a given YAML for the resolve mode |
134 | +// and checks if it's valid. |
135 | +func parseResolvedMode(yaml string) (ResolvedMode, error) { |
136 | + setting := &struct{ Retry ResolvedMode }{} |
137 | + if err := goyaml.Unmarshal([]byte(yaml), setting); err != nil { |
138 | + return ResolvedNone, err |
139 | + } |
140 | + mode := setting.Retry |
141 | + if err := validResolvedMode(mode, true); err != nil { |
142 | + return ResolvedNone, err |
143 | + } |
144 | + return mode, nil |
145 | +} |
146 | + |
147 | // validResolvedMode ensures that only valid values for the |
148 | // resolved mode are used. |
149 | -func validResolvedMode(mode ResolvedMode) error { |
150 | +func validResolvedMode(mode ResolvedMode, none bool) error { |
151 | + if none && mode == ResolvedNone { |
152 | + return nil |
153 | + } |
154 | if mode != ResolvedRetryHooks && mode != ResolvedNoHooks { |
155 | return fmt.Errorf("invalid error resolution mode: %d", mode) |
156 | } |
157 | return nil |
158 | } |
159 | + |
160 | +// NeedsUpgradeWatcher observes changes to a unit's upgrade flag. |
161 | +type NeedsUpgradeWatcher struct { |
162 | + unit *Unit |
163 | + tomb tomb.Tomb |
164 | + watcher *watcher.ExistenceWatcher |
165 | + changeChan chan bool |
166 | +} |
167 | + |
168 | +// newNeedsUpgradeWatcher creates and starts a new resolved flag node |
169 | +// watcher for the given path. |
170 | +func newNeedsUpgradeWatcher(u *Unit) *NeedsUpgradeWatcher { |
171 | + w := &NeedsUpgradeWatcher{ |
172 | + unit: u, |
173 | + changeChan: make(chan bool), |
174 | + watcher: watcher.NewExistenceWatcher(u.st.zk, u.zkNeedsUpgradePath()), |
175 | + } |
176 | + go w.loop() |
177 | + return w |
178 | +} |
179 | + |
180 | +// Changes returns a channel that will receive the new |
181 | +// resolved mode when a change is detected. Note that multiple |
182 | +// changes may be observed as a single event in the channel. |
183 | +func (w *NeedsUpgradeWatcher) Changes() <-chan bool { |
184 | + return w.changeChan |
185 | +} |
186 | + |
187 | +// Dying returns a channel that is closed when the |
188 | +// watcher has stopped or is about to stop. |
189 | +func (w *NeedsUpgradeWatcher) Dying() <-chan struct{} { |
190 | + return w.tomb.Dying() |
191 | +} |
192 | + |
193 | +// Stop stops the watch and returns any error encountered |
194 | +// while watching. This method should always be called |
195 | +// before discarding the watcher. |
196 | +func (w *NeedsUpgradeWatcher) Stop() error { |
197 | + w.tomb.Kill(nil) |
198 | + if err := w.watcher.Stop(); err != nil { |
199 | + w.tomb.Wait() |
200 | + return err |
201 | + } |
202 | + return w.tomb.Wait() |
203 | +} |
204 | + |
205 | +// loop is the backend for watching the resolved flag node. |
206 | +func (w *NeedsUpgradeWatcher) loop() { |
207 | + defer w.tomb.Done() |
208 | + defer close(w.changeChan) |
209 | + |
210 | + for { |
211 | + select { |
212 | + case <-w.tomb.Dying(): |
213 | + return |
214 | + case change := <-w.watcher.Changes(): |
215 | + select { |
216 | + case <-w.watcher.Dying(): |
217 | + return |
218 | + case <-w.tomb.Dying(): |
219 | + return |
220 | + case w.changeChan <- change.Exists: |
221 | + } |
222 | + } |
223 | + } |
224 | +} |
225 | + |
226 | +// ResolvedWatcher observes changes to a resolved flag node. |
227 | +type ResolvedWatcher struct { |
228 | + unit *Unit |
229 | + tomb tomb.Tomb |
230 | + watcher *watcher.ExistenceWatcher |
231 | + changeChan chan ResolvedMode |
232 | +} |
233 | + |
234 | +// newResolvedWatcher creates and starts a new resolved flag node |
235 | +// watcher for the given path. |
236 | +func newResolvedWatcher(u *Unit) *ResolvedWatcher { |
237 | + w := &ResolvedWatcher{ |
238 | + unit: u, |
239 | + changeChan: make(chan ResolvedMode), |
240 | + watcher: watcher.NewExistenceWatcher(u.st.zk, u.zkResolvedPath()), |
241 | + } |
242 | + go w.loop() |
243 | + return w |
244 | +} |
245 | + |
246 | +// Changes returns a channel that will receive the new |
247 | +// resolved mode when a change is detected. Note that multiple |
248 | +// changes may be observed as a single event in the channel. |
249 | +func (w *ResolvedWatcher) Changes() <-chan ResolvedMode { |
250 | + return w.changeChan |
251 | +} |
252 | + |
253 | +// Dying returns a channel that is closed when the |
254 | +// watcher has stopped or is about to stop. |
255 | +func (w *ResolvedWatcher) Dying() <-chan struct{} { |
256 | + return w.tomb.Dying() |
257 | +} |
258 | + |
259 | +// Stop stops the watch and returns any error encountered |
260 | +// while watching. This method should always be called |
261 | +// before discarding the watcher. |
262 | +func (w *ResolvedWatcher) Stop() error { |
263 | + w.tomb.Kill(nil) |
264 | + if err := w.watcher.Stop(); err != nil { |
265 | + w.tomb.Wait() |
266 | + return err |
267 | + } |
268 | + return w.tomb.Wait() |
269 | +} |
270 | + |
271 | +// loop is the backend for watching the resolved flag node. |
272 | +func (w *ResolvedWatcher) loop() { |
273 | + defer w.tomb.Done() |
274 | + defer close(w.changeChan) |
275 | + |
276 | + for { |
277 | + select { |
278 | + case <-w.tomb.Dying(): |
279 | + return |
280 | + case change := <-w.watcher.Changes(): |
281 | + mode, err := parseResolvedMode(change.Content) |
282 | + if err != nil { |
283 | + w.tomb.Kill(err) |
284 | + return |
285 | + } |
286 | + select { |
287 | + case <-w.watcher.Dying(): |
288 | + return |
289 | + case <-w.tomb.Dying(): |
290 | + return |
291 | + case w.changeChan <- mode: |
292 | + } |
293 | + } |
294 | + } |
295 | +} |
296 | + |
297 | +// PortsWatcher observes changes to a unit's open ports. |
298 | +type PortsWatcher struct { |
299 | + unit *Unit |
300 | + tomb tomb.Tomb |
301 | + watcher *watcher.ExistenceWatcher |
302 | + changeChan chan []Port |
303 | +} |
304 | + |
305 | +// newPortsWatcher creates and starts a new resolved flag node |
306 | +// watcher for the given path. |
307 | +func newPortsWatcher(u *Unit) *PortsWatcher { |
308 | + w := &PortsWatcher{ |
309 | + unit: u, |
310 | + changeChan: make(chan []Port), |
311 | + watcher: watcher.NewExistenceWatcher(u.st.zk, u.zkPortsPath()), |
312 | + } |
313 | + go w.loop() |
314 | + return w |
315 | +} |
316 | + |
317 | +// Changes returns a channel that will receive the new |
318 | +// ports when a change is detected. Note that multiple |
319 | +// changes may be observed as a single event in the channel. |
320 | +func (w *PortsWatcher) Changes() <-chan []Port { |
321 | + return w.changeChan |
322 | +} |
323 | + |
324 | +// Stop stops the watch and returns any error encountered |
325 | +// while watching. This method should always be called |
326 | +// before discarding the watcher. |
327 | +func (w *PortsWatcher) Stop() error { |
328 | + w.tomb.Kill(nil) |
329 | + if err := w.watcher.Stop(); err != nil { |
330 | + w.tomb.Wait() |
331 | + return err |
332 | + } |
333 | + return w.tomb.Wait() |
334 | +} |
335 | + |
336 | +// loop is the backend for watching the ports node. |
337 | +func (w *PortsWatcher) loop() { |
338 | + defer w.tomb.Done() |
339 | + defer close(w.changeChan) |
340 | + |
341 | + for { |
342 | + select { |
343 | + case <-w.tomb.Dying(): |
344 | + return |
345 | + case change := <-w.watcher.Changes(): |
346 | + var ports struct { |
347 | + Open []Port |
348 | + } |
349 | + if err := goyaml.Unmarshal([]byte(change.Content), &ports); err != nil { |
350 | + w.tomb.Kill(err) |
351 | + return |
352 | + } |
353 | + select { |
354 | + case <-w.watcher.Dying(): |
355 | + return |
356 | + case <-w.tomb.Dying(): |
357 | + return |
358 | + case w.changeChan <- ports.Open: |
359 | + } |
360 | + } |
361 | + } |
362 | +} |
363 | |
364 | === modified file 'state/watch_test.go' |
365 | --- state/watch_test.go 2012-04-05 13:01:51 +0000 |
366 | +++ state/watch_test.go 2012-04-16 15:47:21 +0000 |
367 | @@ -2,10 +2,20 @@ |
368 | |
369 | import ( |
370 | . "launchpad.net/gocheck" |
371 | + "launchpad.net/juju/go/state" |
372 | "time" |
373 | ) |
374 | |
375 | func (s *StateSuite) TestServiceWatchConfig(c *C) { |
376 | + receiveChange := func(w *state.ConfigWatcher) (*state.ConfigNode, bool, bool) { |
377 | + select { |
378 | + case change, ok := <-w.Changes(): |
379 | + return change, ok, false |
380 | + case <-time.After(100 * time.Millisecond): |
381 | + return nil, false, true |
382 | + } |
383 | + return nil, false, false |
384 | + } |
385 | dummy, _ := addDummyCharm(c, s.st) |
386 | wordpress, err := s.st.AddService("wordpress", dummy) |
387 | c.Assert(err, IsNil) |
388 | @@ -17,7 +27,9 @@ |
389 | watcher := wordpress.WatchConfig() |
390 | |
391 | // Recieve initial event after creation. |
392 | - changedConfig := <-watcher.Changes() |
393 | + changedConfig, ok, timeout := receiveChange(watcher) |
394 | + c.Assert(timeout, Equals, false) |
395 | + c.Assert(ok, Equals, true) |
396 | c.Assert(changedConfig.Keys(), HasLen, 0) |
397 | |
398 | // Two more change events. |
399 | @@ -25,20 +37,23 @@ |
400 | config.Set("baz", "yadda") |
401 | _, err = config.Write() |
402 | c.Assert(err, IsNil) |
403 | - |
404 | time.Sleep(100 * time.Millisecond) |
405 | config.Delete("foo") |
406 | _, err = config.Write() |
407 | c.Assert(err, IsNil) |
408 | |
409 | // Receive the two changes. |
410 | - changedConfig = <-watcher.Changes() |
411 | + changedConfig, ok, timeout = receiveChange(watcher) |
412 | + c.Assert(timeout, Equals, false) |
413 | + c.Assert(ok, Equals, true) |
414 | c.Assert(changedConfig.Map(), DeepEquals, map[string]interface{}{"foo": "bar", "baz": "yadda"}) |
415 | foo, found := changedConfig.Get("foo") |
416 | c.Assert(found, Equals, true) |
417 | c.Assert(foo, Equals, "bar") |
418 | |
419 | - changedConfig = <-watcher.Changes() |
420 | + changedConfig, ok, timeout = receiveChange(watcher) |
421 | + c.Assert(timeout, Equals, false) |
422 | + c.Assert(ok, Equals, true) |
423 | c.Assert(changedConfig.Map(), DeepEquals, map[string]interface{}{"baz": "yadda"}) |
424 | foo, found = changedConfig.Get("foo") |
425 | c.Assert(found, Equals, false) |
426 | @@ -47,18 +62,23 @@ |
427 | c.Assert(baz, Equals, "yadda") |
428 | |
429 | // No more changes. |
430 | - select { |
431 | - case <-watcher.Changes(): |
432 | - c.Fatalf("no more config changes expected") |
433 | - case <-time.After(200 * time.Millisecond): |
434 | - // The timeout is expected. |
435 | - } |
436 | + _, _, timeout = receiveChange(watcher) |
437 | + c.Assert(timeout, Equals, true) |
438 | |
439 | err = watcher.Stop() |
440 | c.Assert(err, IsNil) |
441 | } |
442 | |
443 | -func (s *StateSuite) TestServiceWatchConfigIllegalData(c *C) { |
444 | +func (s *StateSuite) TestServiceWatchConfigIllegalDatal(c *C) { |
445 | + receiveChange := func(w *state.ConfigWatcher) (*state.ConfigNode, bool, bool) { |
446 | + select { |
447 | + case change, ok := <-w.Changes(): |
448 | + return change, ok, false |
449 | + case <-time.After(100 * time.Millisecond): |
450 | + return nil, false, true |
451 | + } |
452 | + return nil, false, false |
453 | + } |
454 | dummy, _ := addDummyCharm(c, s.st) |
455 | wordpress, err := s.st.AddService("wordpress", dummy) |
456 | c.Assert(err, IsNil) |
457 | @@ -69,15 +89,167 @@ |
458 | _, err = s.zkConn.Set("/services/service-0000000000/config", "---", -1) |
459 | c.Assert(err, IsNil) |
460 | |
461 | - // Changes() has to be closed |
462 | - select { |
463 | - case _, ok := <-watcher.Changes(): |
464 | - c.Assert(ok, Equals, false) |
465 | - case <-time.After(200 * time.Millisecond): |
466 | - // Timeout should not be needed. |
467 | - c.Fatalf("config change channel should have been closed due to the illegal data") |
468 | - } |
469 | + // Changes() has to be closed. |
470 | + _, ok, timeout := receiveChange(watcher) |
471 | + c.Assert(ok, Equals, false) |
472 | + c.Assert(timeout, Equals, false) |
473 | |
474 | err = watcher.Stop() |
475 | c.Assert(err, ErrorMatches, "YAML error: .*") |
476 | } |
477 | + |
478 | +func (s *StateSuite) TestUnitWatchNeedsUpgrade(c *C) { |
479 | + receiveChange := func(w *state.NeedsUpgradeWatcher) (bool, bool, bool) { |
480 | + select { |
481 | + case change, ok := <-w.Changes(): |
482 | + return change, ok, false |
483 | + case <-time.After(100 * time.Millisecond): |
484 | + return false, false, true |
485 | + } |
486 | + return false, false, false |
487 | + } |
488 | + dummy, _ := addDummyCharm(c, s.st) |
489 | + wordpress, err := s.st.AddService("wordpress", dummy) |
490 | + c.Assert(err, IsNil) |
491 | + c.Assert(wordpress.Name(), Equals, "wordpress") |
492 | + unit, err := wordpress.AddUnit() |
493 | + c.Assert(err, IsNil) |
494 | + watcher := unit.WatchNeedsUpgrade() |
495 | + |
496 | + go func() { |
497 | + time.Sleep(50 * time.Millisecond) |
498 | + err = unit.SetNeedsUpgrade() |
499 | + c.Assert(err, IsNil) |
500 | + time.Sleep(50 * time.Millisecond) |
501 | + err = unit.ClearNeedsUpgrade() |
502 | + c.Assert(err, IsNil) |
503 | + }() |
504 | + |
505 | + // Receive the changes. |
506 | + upgrade, ok, timeout := receiveChange(watcher) |
507 | + c.Assert(timeout, Equals, false) |
508 | + c.Assert(ok, Equals, true) |
509 | + c.Assert(upgrade, Equals, true) |
510 | + upgrade, ok, timeout = receiveChange(watcher) |
511 | + c.Assert(ok, Equals, true) |
512 | + c.Assert(timeout, Equals, false) |
513 | + c.Assert(upgrade, Equals, false) |
514 | + |
515 | + // No more changes. |
516 | + _, _, timeout = receiveChange(watcher) |
517 | + c.Assert(timeout, Equals, true) |
518 | + |
519 | + err = watcher.Stop() |
520 | + c.Assert(err, IsNil) |
521 | +} |
522 | + |
523 | +func (s *StateSuite) TestUnitWatchResolved(c *C) { |
524 | + receiveChange := func(w *state.ResolvedWatcher) (state.ResolvedMode, bool, bool) { |
525 | + select { |
526 | + case change, ok := <-w.Changes(): |
527 | + return change, ok, false |
528 | + case <-time.After(100 * time.Millisecond): |
529 | + return state.ResolvedNoHooks, false, true |
530 | + } |
531 | + return state.ResolvedNoHooks, false, false |
532 | + } |
533 | + dummy, _ := addDummyCharm(c, s.st) |
534 | + wordpress, err := s.st.AddService("wordpress", dummy) |
535 | + c.Assert(err, IsNil) |
536 | + c.Assert(wordpress.Name(), Equals, "wordpress") |
537 | + unit, err := wordpress.AddUnit() |
538 | + c.Assert(err, IsNil) |
539 | + watcher := unit.WatchResolved() |
540 | + |
541 | + go func() { |
542 | + time.Sleep(50 * time.Millisecond) |
543 | + err = unit.SetResolved(state.ResolvedRetryHooks) |
544 | + c.Assert(err, IsNil) |
545 | + time.Sleep(50 * time.Millisecond) |
546 | + err = unit.ClearResolved() |
547 | + c.Assert(err, IsNil) |
548 | + time.Sleep(50 * time.Millisecond) |
549 | + err = unit.SetResolved(state.ResolvedNoHooks) |
550 | + c.Assert(err, IsNil) |
551 | + }() |
552 | + |
553 | + // Receive the changes. |
554 | + resolved, ok, timeout := receiveChange(watcher) |
555 | + c.Assert(timeout, Equals, false) |
556 | + c.Assert(ok, Equals, true) |
557 | + c.Assert(resolved, Equals, state.ResolvedRetryHooks) |
558 | + resolved, ok, timeout = receiveChange(watcher) |
559 | + c.Assert(timeout, Equals, false) |
560 | + c.Assert(ok, Equals, true) |
561 | + c.Assert(resolved, Equals, state.ResolvedNone) |
562 | + resolved, ok, timeout = receiveChange(watcher) |
563 | + c.Assert(timeout, Equals, false) |
564 | + c.Assert(ok, Equals, true) |
565 | + c.Assert(resolved, Equals, state.ResolvedNoHooks) |
566 | + |
567 | + // No more changes. |
568 | + _, _, timeout = receiveChange(watcher) |
569 | + c.Assert(timeout, Equals, true) |
570 | + |
571 | + err = watcher.Stop() |
572 | + c.Assert(err, IsNil) |
573 | +} |
574 | + |
575 | +func (s *StateSuite) TestUnitWatchPorts(c *C) { |
576 | + receiveChange := func(w *state.PortsWatcher) ([]state.Port, bool, bool) { |
577 | + select { |
578 | + case change, ok := <-w.Changes(): |
579 | + return change, ok, false |
580 | + case <-time.After(100 * time.Millisecond): |
581 | + return nil, false, true |
582 | + } |
583 | + return nil, false, false |
584 | + } |
585 | + dummy, _ := addDummyCharm(c, s.st) |
586 | + wordpress, err := s.st.AddService("wordpress", dummy) |
587 | + c.Assert(err, IsNil) |
588 | + c.Assert(wordpress.Name(), Equals, "wordpress") |
589 | + unit, err := wordpress.AddUnit() |
590 | + c.Assert(err, IsNil) |
591 | + watcher := unit.WatchPorts() |
592 | + |
593 | + go func() { |
594 | + time.Sleep(50 * time.Millisecond) |
595 | + err = unit.OpenPort("tcp", 80) |
596 | + c.Assert(err, IsNil) |
597 | + time.Sleep(50 * time.Millisecond) |
598 | + err = unit.OpenPort("udp", 53) |
599 | + c.Assert(err, IsNil) |
600 | + time.Sleep(50 * time.Millisecond) |
601 | + err = unit.ClosePort("tcp", 80) |
602 | + c.Assert(err, IsNil) |
603 | + }() |
604 | + |
605 | + // Receive the changes. |
606 | + open, ok, timeout := receiveChange(watcher) |
607 | + c.Assert(timeout, Equals, false) |
608 | + c.Assert(ok, Equals, true) |
609 | + c.Assert(open, DeepEquals, []state.Port{ |
610 | + {"tcp", 80}, |
611 | + }) |
612 | + open, ok, timeout = receiveChange(watcher) |
613 | + c.Assert(timeout, Equals, false) |
614 | + c.Assert(ok, Equals, true) |
615 | + c.Assert(open, DeepEquals, []state.Port{ |
616 | + {"tcp", 80}, |
617 | + {"udp", 53}, |
618 | + }) |
619 | + open, ok, timeout = receiveChange(watcher) |
620 | + c.Assert(timeout, Equals, false) |
621 | + c.Assert(ok, Equals, true) |
622 | + c.Assert(open, DeepEquals, []state.Port{ |
623 | + {"udp", 53}, |
624 | + }) |
625 | + |
626 | + // No more changes. |
627 | + _, _, timeout = receiveChange(watcher) |
628 | + c.Assert(timeout, Equals, true) |
629 | + |
630 | + err = watcher.Stop() |
631 | + c.Assert(err, IsNil) |
632 | +} |
633 | |
634 | === modified file 'state/watcher/watcher.go' |
635 | --- state/watcher/watcher.go 2012-04-05 13:01:51 +0000 |
636 | +++ state/watcher/watcher.go 2012-04-16 15:47:21 +0000 |
637 | @@ -6,6 +6,121 @@ |
638 | "launchpad.net/tomb" |
639 | ) |
640 | |
641 | +// ExistenceChange holds information on the existence |
642 | +// and contents of a node. Content will be empty when the |
643 | +// node does not exist. |
644 | +type ExistenceChange struct { |
645 | + Exists bool |
646 | + Content string |
647 | +} |
648 | + |
649 | +// ExistenceWatcher observes a ZooKeeper node and delivers a |
650 | +// notification when it is created, changed or deleted. |
651 | +type ExistenceWatcher struct { |
652 | + zk *zookeeper.Conn |
653 | + path string |
654 | + tomb tomb.Tomb |
655 | + changeChan chan ExistenceChange |
656 | + existence ExistenceChange |
657 | +} |
658 | + |
659 | +// NewExistenceWatcher creates a ExistenceWatcher observing |
660 | +// the ZooKeeper node at watchedPath. |
661 | +func NewExistenceWatcher(zk *zookeeper.Conn, watchedPath string) *ExistenceWatcher { |
662 | + w := &ExistenceWatcher{ |
663 | + zk: zk, |
664 | + path: watchedPath, |
665 | + changeChan: make(chan ExistenceChange), |
666 | + existence: ExistenceChange{}, |
667 | + } |
668 | + go w.loop() |
669 | + return w |
670 | +} |
671 | + |
672 | +// Changes returns a channel that will receive the change if a node |
673 | +// is created or deleted. In case or a created node it also contains |
674 | +// the content. Note that multiple changes may be observed as a single |
675 | +// event in the channel. |
676 | +func (w *ExistenceWatcher) Changes() <-chan ExistenceChange { |
677 | + return w.changeChan |
678 | +} |
679 | + |
680 | +// Dying returns a channel that is closed when the |
681 | +// watcher has stopped or is about to stop. |
682 | +func (w *ExistenceWatcher) Dying() <-chan struct{} { |
683 | + return w.tomb.Dying() |
684 | +} |
685 | + |
686 | +// Stop stops the watch and returns any error encountered |
687 | +// while watching. This method should always be called before |
688 | +// discarding the watcher. |
689 | +func (w *ExistenceWatcher) Stop() error { |
690 | + w.tomb.Kill(nil) |
691 | + return w.tomb.Wait() |
692 | +} |
693 | + |
694 | +// loop is the backend for watching. |
695 | +func (w *ExistenceWatcher) loop() { |
696 | + defer w.tomb.Done() |
697 | + defer close(w.changeChan) |
698 | + |
699 | + watch, err := w.update() |
700 | + if err != nil { |
701 | + w.tomb.Kill(err) |
702 | + return |
703 | + } |
704 | + |
705 | + for { |
706 | + select { |
707 | + case <-w.tomb.Dying(): |
708 | + return |
709 | + case evt := <-watch: |
710 | + if !evt.Ok() { |
711 | + w.tomb.Killf("watcher: critical session event: %v", evt) |
712 | + return |
713 | + } |
714 | + watch, err = w.update() |
715 | + if err != nil { |
716 | + w.tomb.Kill(err) |
717 | + return |
718 | + } |
719 | + } |
720 | + } |
721 | +} |
722 | + |
723 | +// update checks the node existence and emits changes of that state to the |
724 | +// change channel if it has changed. It returns the next watch. |
725 | +func (w *ExistenceWatcher) update() (nextWatch <-chan zookeeper.Event, err error) { |
726 | + content, _, watch, err := w.zk.GetW(w.path) |
727 | + switch { |
728 | + case err != nil && !zookeeper.IsError(err, zookeeper.ZNONODE): |
729 | + return nil, fmt.Errorf("watcher: can't get the content of node %q: %v", w.path, err) |
730 | + case err != nil: |
731 | + // Need a new watch to signal creation of node. |
732 | + _, watch, err = w.zk.ExistsW(w.path) |
733 | + if err != nil { |
734 | + return nil, fmt.Errorf("watcher: can't check the existence of node %q: %v", w.path, err) |
735 | + } |
736 | + if !w.existence.Exists { |
737 | + return watch, nil |
738 | + } |
739 | + w.existence.Exists = false |
740 | + w.existence.Content = "" |
741 | + default: |
742 | + if w.existence.Exists && content == w.existence.Content { |
743 | + return watch, nil |
744 | + } |
745 | + w.existence.Exists = true |
746 | + w.existence.Content = content |
747 | + } |
748 | + select { |
749 | + case <-w.tomb.Dying(): |
750 | + return nil, tomb.ErrDying |
751 | + case w.changeChan <- w.existence: |
752 | + } |
753 | + return watch, nil |
754 | +} |
755 | + |
756 | // ContentWatcher observes a ZooKeeper node and delivers a |
757 | // notification when a content change is detected. |
758 | type ContentWatcher struct { |
759 | |
760 | === modified file 'state/watcher/watcher_test.go' |
761 | --- state/watcher/watcher_test.go 2012-04-05 13:01:51 +0000 |
762 | +++ state/watcher/watcher_test.go 2012-04-16 15:47:21 +0000 |
763 | @@ -39,122 +39,215 @@ |
764 | |
765 | s.zkConn = zk |
766 | s.path = "/watcher" |
767 | - |
768 | - _, err = s.zkConn.Create(s.path, "", 0, zookeeper.WorldACL(zookeeper.PERM_ALL)) |
769 | - c.Assert(err, IsNil) |
770 | } |
771 | |
772 | func (s *WatcherSuite) TearDownTest(c *C) { |
773 | - testing.ZkRemoveTree(s.zkConn, s.path) |
774 | + s.removePath(c) |
775 | s.zkConn.Close() |
776 | } |
777 | |
778 | +func (s *WatcherSuite) TestExistenceWatcher(c *C) { |
779 | + receiveChange := func(w *watcher.ExistenceWatcher) (*watcher.ExistenceChange, bool, bool) { |
780 | + select { |
781 | + case change, ok := <-w.Changes(): |
782 | + return &change, ok, false |
783 | + case <-time.After(200 * time.Millisecond): |
784 | + return nil, false, true |
785 | + } |
786 | + return nil, false, false |
787 | + } |
788 | + watcher := watcher.NewExistenceWatcher(s.zkConn, s.path) |
789 | + |
790 | + go func() { |
791 | + time.Sleep(50 * time.Millisecond) |
792 | + s.createPath(c, "init") |
793 | + time.Sleep(50 * time.Millisecond) |
794 | + s.changeContent(c, "foo") |
795 | + time.Sleep(50 * time.Millisecond) |
796 | + s.removePath(c) |
797 | + time.Sleep(50 * time.Millisecond) |
798 | + s.createPath(c, "done") |
799 | + }() |
800 | + |
801 | + // Receive the four changes. |
802 | + change, ok, timeout := receiveChange(watcher) |
803 | + c.Assert(timeout, Equals, false) |
804 | + c.Assert(ok, Equals, true) |
805 | + c.Assert(change.Exists, Equals, true) |
806 | + c.Assert(change.Content, Equals, "init") |
807 | + |
808 | + change, ok, timeout = receiveChange(watcher) |
809 | + c.Assert(timeout, Equals, false) |
810 | + c.Assert(ok, Equals, true) |
811 | + c.Assert(change.Exists, Equals, true) |
812 | + c.Assert(change.Content, Equals, "foo") |
813 | + |
814 | + change, ok, timeout = receiveChange(watcher) |
815 | + c.Assert(timeout, Equals, false) |
816 | + c.Assert(ok, Equals, true) |
817 | + c.Assert(change.Exists, Equals, false) |
818 | + c.Assert(change.Content, Equals, "") |
819 | + |
820 | + change, ok, timeout = receiveChange(watcher) |
821 | + c.Assert(timeout, Equals, false) |
822 | + c.Assert(ok, Equals, true) |
823 | + c.Assert(change.Exists, Equals, true) |
824 | + c.Assert(change.Content, Equals, "done") |
825 | + |
826 | + // No more changes. |
827 | + _, _, timeout = receiveChange(watcher) |
828 | + c.Assert(timeout, Equals, true) |
829 | + |
830 | + err := watcher.Stop() |
831 | + c.Assert(err, IsNil) |
832 | + |
833 | + // Changes() has to be closed. |
834 | + _, ok, timeout = receiveChange(watcher) |
835 | + c.Assert(ok, Equals, false) |
836 | + c.Assert(timeout, Equals, false) |
837 | +} |
838 | + |
839 | func (s *WatcherSuite) TestContentWatcher(c *C) { |
840 | + receiveChange := func(w *watcher.ContentWatcher) (string, bool, bool) { |
841 | + select { |
842 | + case change, ok := <-w.Changes(): |
843 | + return change, ok, false |
844 | + case <-time.After(200 * time.Millisecond): |
845 | + return "", false, true |
846 | + } |
847 | + return "", false, false |
848 | + } |
849 | + s.createPath(c, "init") |
850 | watcher := watcher.NewContentWatcher(s.zkConn, s.path) |
851 | |
852 | go func() { |
853 | time.Sleep(50 * time.Millisecond) |
854 | s.changeContent(c, "foo") |
855 | - |
856 | time.Sleep(50 * time.Millisecond) |
857 | s.changeContent(c, "foo") |
858 | - |
859 | time.Sleep(50 * time.Millisecond) |
860 | s.changeContent(c, "bar") |
861 | }() |
862 | |
863 | - // Receive the two changes. |
864 | - change := <-watcher.Changes() |
865 | + // Receive the three changes. First one is from creation, |
866 | + // the content watcher needs an existing node. |
867 | + change, ok, timeout := receiveChange(watcher) |
868 | + c.Assert(timeout, Equals, false) |
869 | + c.Assert(ok, Equals, true) |
870 | + c.Assert(change, Equals, "init") |
871 | + |
872 | + change, ok, timeout = receiveChange(watcher) |
873 | + c.Assert(timeout, Equals, false) |
874 | + c.Assert(ok, Equals, true) |
875 | c.Assert(change, Equals, "foo") |
876 | |
877 | - change = <-watcher.Changes() |
878 | + change, ok, timeout = receiveChange(watcher) |
879 | + c.Assert(timeout, Equals, false) |
880 | + c.Assert(ok, Equals, true) |
881 | c.Assert(change, Equals, "bar") |
882 | |
883 | // No more changes. |
884 | - select { |
885 | - case <-watcher.Changes(): |
886 | - c.Fatalf("no more changes expected") |
887 | - case <-time.After(200 * time.Millisecond): |
888 | - // The timeout is expected. |
889 | - } |
890 | + _, _, timeout = receiveChange(watcher) |
891 | + c.Assert(timeout, Equals, true) |
892 | |
893 | err := watcher.Stop() |
894 | c.Assert(err, IsNil) |
895 | |
896 | - // Changes() has to be closed |
897 | - select { |
898 | - case _, ok := <-watcher.Changes(): |
899 | - c.Assert(ok, Equals, false) |
900 | - case <-time.After(200 * time.Millisecond): |
901 | - // Timeout should not be needed. |
902 | - c.Fatalf("timeout should not happen") |
903 | + // Changes() has to be closed. |
904 | + _, ok, timeout = receiveChange(watcher) |
905 | + c.Assert(ok, Equals, false) |
906 | + c.Assert(timeout, Equals, false) |
907 | +} |
908 | + |
909 | +func (s *WatcherSuite) TestContentWatcherDeletedNode(c *C) { |
910 | + receiveChange := func(w *watcher.ContentWatcher) (string, bool, bool) { |
911 | + select { |
912 | + case change, ok := <-w.Changes(): |
913 | + return change, ok, false |
914 | + case <-time.After(200 * time.Millisecond): |
915 | + return "", false, true |
916 | + } |
917 | + return "", false, false |
918 | } |
919 | + s.createPath(c, "init") |
920 | + watcher := watcher.NewContentWatcher(s.zkConn, s.path) |
921 | + |
922 | + // Receive initiol creation event. |
923 | + change := <-watcher.Changes() |
924 | + c.Assert(change, Equals, "init") |
925 | + |
926 | + go func() { |
927 | + time.Sleep(50 * time.Millisecond) |
928 | + s.removePath(c) |
929 | + }() |
930 | + |
931 | + // Changes() has to be closed. |
932 | + _, ok, timeout := receiveChange(watcher) |
933 | + c.Assert(ok, Equals, false) |
934 | + c.Assert(timeout, Equals, false) |
935 | + |
936 | + err := watcher.Stop() |
937 | + c.Assert(err, ErrorMatches, `watcher: node "/watcher" has been deleted`) |
938 | } |
939 | |
940 | func (s *WatcherSuite) TestChildrenWatcher(c *C) { |
941 | + receiveChange := func(w *watcher.ChildrenWatcher) (*watcher.ChildrenChange, bool, bool) { |
942 | + select { |
943 | + case change, ok := <-w.Changes(): |
944 | + return &change, ok, false |
945 | + case <-time.After(200 * time.Millisecond): |
946 | + return nil, false, true |
947 | + } |
948 | + return nil, false, false |
949 | + } |
950 | + s.createPath(c, "init") |
951 | watcher := watcher.NewChildrenWatcher(s.zkConn, s.path) |
952 | |
953 | go func() { |
954 | time.Sleep(50 * time.Millisecond) |
955 | s.changeChildren(c, true, "foo") |
956 | - |
957 | time.Sleep(50 * time.Millisecond) |
958 | s.changeChildren(c, true, "bar") |
959 | - |
960 | time.Sleep(50 * time.Millisecond) |
961 | s.changeChildren(c, false, "foo") |
962 | }() |
963 | |
964 | // Receive the three changes. |
965 | - change := <-watcher.Changes() |
966 | + change, ok, timeout := receiveChange(watcher) |
967 | + c.Assert(timeout, Equals, false) |
968 | + c.Assert(ok, Equals, true) |
969 | c.Assert(change.Added, DeepEquals, []string{"foo"}) |
970 | |
971 | - change = <-watcher.Changes() |
972 | + change, ok, timeout = receiveChange(watcher) |
973 | + c.Assert(timeout, Equals, false) |
974 | + c.Assert(ok, Equals, true) |
975 | c.Assert(change.Added, DeepEquals, []string{"bar"}) |
976 | |
977 | - change = <-watcher.Changes() |
978 | + change, ok, timeout = receiveChange(watcher) |
979 | + c.Assert(timeout, Equals, false) |
980 | + c.Assert(ok, Equals, true) |
981 | c.Assert(change.Deleted, DeepEquals, []string{"foo"}) |
982 | |
983 | // No more changes. |
984 | - select { |
985 | - case <-watcher.Changes(): |
986 | - c.Fatalf("no more changes expected") |
987 | - case <-time.After(time.Second): |
988 | - // The timeout is expected. |
989 | - } |
990 | - |
991 | - err := watcher.Stop() |
992 | - c.Assert(err, IsNil) |
993 | - |
994 | - // Changes() has to be closed |
995 | - select { |
996 | - case _, ok := <-watcher.Changes(): |
997 | - c.Assert(ok, Equals, false) |
998 | - case <-time.After(200 * time.Millisecond): |
999 | - // Timeout should not be needed. |
1000 | - c.Fatalf("timeout should not happen") |
1001 | - } |
1002 | -} |
1003 | - |
1004 | -func (s *WatcherSuite) TestDeletedNode(c *C) { |
1005 | - watcher := watcher.NewContentWatcher(s.zkConn, s.path) |
1006 | - |
1007 | - go func() { |
1008 | - time.Sleep(50 * time.Millisecond) |
1009 | - testing.ZkRemoveTree(s.zkConn, s.path) |
1010 | - }() |
1011 | - |
1012 | - // Changes() has to be closed |
1013 | - select { |
1014 | - case _, ok := <-watcher.Changes(): |
1015 | - c.Assert(ok, Equals, false) |
1016 | - case <-time.After(200 * time.Millisecond): |
1017 | - // Timeout should not be needed. |
1018 | - c.Fatalf("timeout should not happen") |
1019 | - } |
1020 | - |
1021 | - err := watcher.Stop() |
1022 | - c.Assert(err, ErrorMatches, `watcher: node "/watcher" has been deleted`) |
1023 | + _, _, timeout = receiveChange(watcher) |
1024 | + c.Assert(timeout, Equals, true) |
1025 | + |
1026 | + err := watcher.Stop() |
1027 | + c.Assert(err, IsNil) |
1028 | + |
1029 | + // Changes() has to be closed. |
1030 | + _, ok, timeout = receiveChange(watcher) |
1031 | + c.Assert(ok, Equals, false) |
1032 | + c.Assert(timeout, Equals, false) |
1033 | +} |
1034 | + |
1035 | +func (s *WatcherSuite) createPath(c *C, content string) { |
1036 | + _, err := s.zkConn.Create(s.path, content, 0, zookeeper.WorldACL(zookeeper.PERM_ALL)) |
1037 | + c.Assert(err, IsNil) |
1038 | +} |
1039 | + |
1040 | +func (s *WatcherSuite) removePath(c *C) { |
1041 | + testing.ZkRemoveTree(s.zkConn, s.path) |
1042 | } |
1043 | |
1044 | func (s *WatcherSuite) changeContent(c *C, content string) { |
looks good.
https:/ /codereview. appspot. com/6011047/ diff/1/ state/unit. go
File state/unit.go (right):
https:/ /codereview. appspot. com/6011047/ diff/1/ state/unit. go#newcode489
state/unit.go:489: // NeedsUpgradeWatcher observes changes to a upgrade
flag node.
// NeedsUpgradeWatcher observes changes to a unit's upgrade flag.
?
https:/ /codereview. appspot. com/6011047/ diff/1/ state/unit. go#newcode531
state/unit.go:531: defer close(w.changeChan)
is this how callers are supposed to know that the upgrade watcher is
shutting down? there's no Dying method, so i guess so. if so, it should
be documented (it's different from the ExistenceWatcher way); if not
then you should a a Dying method and ensure that w.tomb is in a dying
state before closing changeChan.
https:/ /codereview. appspot. com/6011047/ diff/1/ state/unit. go#newcode614
state/unit.go:614: // PortsWatcher observes changes to a ports node.
// PortsWatcher observes changes to a unit's open ports.
https:/ /codereview. appspot. com/6011047/ diff/1/ state/unit. go#newcode656 cher.
state/unit.go:656: defer close(w.changeChan)
same applies here as to NeedsUpgradeWat
https:/ /codereview. appspot. com/6011047/ diff/1/ state/unit. go#newcode663
state/unit.go:663: ports := &struct{ Open []Port }{}
var ports struct {
Open []Port
}
would be clearer i think (and would avoid the unnecessary double
indirection).
https:/ /codereview. appspot. com/6011047/ diff/1/ state/watch_ test.go
File state/watch_test.go (right):
https:/ /codereview. appspot. com/6011047/ diff/1/ state/watch_ test.go# newcode13 test.go: 13: return change, ok, false
state/watch_
you could probably just Assert(ok, Equals, true) here
and thus simplify the interface to receiveChange.
That applies to most of the similar tests below - those that don't test
for the changes channel being closed can factor out the closed test.
another possibility is to pass expected values for timeout and closed
into the function:
receiveChange := func(w *state. ConfigWatcher, expectClosed,
expectTimeout bool) *state.ConfigNode {
etc
}
https:/ /codereview. appspot. com/6011047/ diff/1/ state/watch_ test.go# newcode14 test.go: 14: case <-time. After(time. Second) :
state/watch_
i think a second is too long here. 100 * time.Millsecond?
same applies below, let's not make testing take too long.
https:/ /codereview. appspot. com/6011047/ diff/1/ state/watcher/ watcher. go watcher. go (right):
File state/watcher/
https:/ /codereview. appspot. com/6011047/ diff/1/ state/watcher/ watcher. go#newcode10 watcher. go:10: // of a node and, if it's true, its
state/watcher/
content.
// ExistenceChange holds information on the existence
// and contents of a node. Content will be empty when the
// node does not exist.
?
https:/ /codereview. appspot. com/6011047/ diff/1/ state/watcher/ watcher. go#newcode17 watcher. go:17: // notification (including the content)
state/watcher/
when it is created, changed
s/ (including the content)//
it's implied by the rest of the sentence and the ExistenceChange type.
https:/ /codereview. appspot. com/6011047/ diff/1/ state/watcher/ watcher. go#newcode94 watcher. go:94: content, _, watch, err := w.zk.GetW(w.path)
state/watcher/
it seems slightly odd that when a node gets deleted, we ignore what
we've just...