Merge lp:~niemeyer/juju-core/presence-polishing into lp:~juju/juju-core/trunk
- presence-polishing
- Merge into trunk
Status: | Merged |
---|---|
Merged at revision: | 483 |
Proposed branch: | lp:~niemeyer/juju-core/presence-polishing |
Merge into: | lp:~juju/juju-core/trunk |
Prerequisite: | lp:~niemeyer/juju-core/mstate-machine-watcher |
Diff against target: |
960 lines (+319/-209) 9 files modified
mstate/machine.go (+19/-25)
mstate/machine_test.go (+13/-10)
mstate/open.go (+3/-3)
mstate/presence/presence.go (+88/-74)
mstate/presence/presence_test.go (+137/-37)
mstate/state.go (+19/-19)
mstate/unit.go (+23/-26)
mstate/unit_test.go (+15/-10)
mstate/watcher/watcher_test.go (+2/-5) |
To merge this branch: | bzr merge lp:~niemeyer/juju-core/presence-polishing |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
The Go Language Gophers | Pending | ||
Review via email: mp+123627@code.launchpad.net |
Commit message
Description of the change
mstate/presence: bring it in line with mstate/watcher
This mirrors into mstate/presence the improvements made into
the mstate/watcher package, including several points made in
reviews.
It also addresses a bug in Alive (it could return false
improperly without errors).
Gustavo Niemeyer (niemeyer) wrote : | # |
William Reade (fwereade) wrote : | # |
LGTM, only very vague suggestions :).
File mstate/machine.go (left):
mstate/
time.Duration) error {
Kinda irrelevant, but I still don't know what use case these
WaitAgentAlive methods fulfil.
File mstate/machine.go (right):
mstate/
%v: watcher is dying", m)
ErrorContextf?
File mstate/
mstate/
handle other requests.
I'm not sure I quite follow how this works. It seems that it's
specifically *only* while handling requests that pending can change --
and that therefore there's no opportunity for it to change in between a
failed loop test and the closing truncate -- but I had to think about it
for a little bit longer than I would prefer, and it has a little hint of
looking-wrong to it. Would it make sense to expand the comment a little?
File mstate/unit.go (right):
mstate/unit.go:257: return fmt.Errorf("waiting for agent of unit %q:
watcher is dying", u)
Similar comments to Machine.
Gustavo Niemeyer (niemeyer) wrote : | # |
Please take a look.
File mstate/machine.go (left):
mstate/
time.Duration) error {
On 2012/09/11 12:09:31, fwereade wrote:
> Kinda irrelevant, but I still don't know what use case these
WaitAgentAlive
> methods fulfil.
LOL.. that crossed my mind. It feels like something clever we thought of
but never used.
File mstate/machine.go (right):
mstate/
%v: watcher is dying", m)
On 2012/09/11 12:09:31, fwereade wrote:
> ErrorContextf?
Good catch, done.
I've also refactored those methods a bit to reduce them.
File mstate/
mstate/
handle other requests.
On 2012/09/11 12:09:31, fwereade wrote:
> I'm not sure I quite follow how this works. It seems that it's
specifically
> *only* while handling requests that pending can change --
Events dispatched from the syncing procedure go into pending too. I'm
happy to expand the comment, but do you have any suggestions that would
make it clarify the misunderstanding you have/had?
William Reade (fwereade) wrote : | # |
LGTM
File mstate/machine.go (right):
mstate/
%v: watcher is dying", m)
On 2012/09/11 13:27:15, niemeyer wrote:
> On 2012/09/11 12:09:31, fwereade wrote:
> > ErrorContextf?
> Good catch, done.
> I've also refactored those methods a bit to reduce them.
Very neat :)
File mstate/
mstate/
handle other requests.
On 2012/09/11 13:27:15, niemeyer wrote:
> On 2012/09/11 12:09:31, fwereade wrote:
> > I'm not sure I quite follow how this works. It seems that it's
specifically
> > *only* while handling requests that pending can change --
> Events dispatched from the syncing procedure go into pending too. I'm
happy to
> expand the comment, but do you have any suggestions that would make it
clarify
> the misunderstanding you have/had?
Everything I can think of is a more cumbersome rewording of what you
already have -- best just forget I said anything :)
Gustavo Niemeyer (niemeyer) wrote : | # |
*** Submitted:
mstate/presence: bring it in line with mstate/watcher
This mirrors into mstate/presence the improvements made into
the mstate/watcher package, including several points made in
reviews.
It also addresses a bug in Alive (it could return false
improperly without errors).
R=fwereade
CC=
Preview Diff
1 | === modified file 'mstate/machine.go' |
2 | --- mstate/machine.go 2012-09-10 15:29:01 +0000 |
3 | +++ mstate/machine.go 2012-09-11 13:29:53 +0000 |
4 | @@ -80,35 +80,29 @@ |
5 | } |
6 | |
7 | // AgentAlive returns whether the respective remote agent is alive. |
8 | -func (m *Machine) AgentAlive() bool { |
9 | - return m.st.presencew.Alive(m.globalKey()) |
10 | +func (m *Machine) AgentAlive() (bool, error) { |
11 | + return m.st.pwatcher.Alive(m.globalKey()) |
12 | } |
13 | |
14 | // WaitAgentAlive blocks until the respective agent is alive. |
15 | -func (m *Machine) WaitAgentAlive(timeout time.Duration) error { |
16 | +func (m *Machine) WaitAgentAlive(timeout time.Duration) (err error) { |
17 | + defer trivial.ErrorContextf(&err, "waiting for agent of machine %v", m) |
18 | ch := make(chan presence.Change) |
19 | - m.st.presencew.Add(m.globalKey(), ch) |
20 | - defer m.st.presencew.Remove(m.globalKey(), ch) |
21 | - // Initial check. |
22 | - select { |
23 | - case change := <-ch: |
24 | - if change.Alive { |
25 | - return nil |
26 | - } |
27 | - case <-time.After(timeout): |
28 | - return fmt.Errorf("waiting for agent of machine %v: still not alive after timeout", m) |
29 | - } |
30 | - // Hasn't been alive, so now wait for change. |
31 | - select { |
32 | - case change := <-ch: |
33 | - if change.Alive { |
34 | - return nil |
35 | - } |
36 | - panic(fmt.Sprintf("presence reported dead status twice in a row for machine %v", m)) |
37 | - case <-time.After(timeout): |
38 | - return fmt.Errorf("waiting for agent of machine %v: still not alive after timeout", m) |
39 | - } |
40 | - panic("unreachable") |
41 | + m.st.pwatcher.Watch(m.globalKey(), ch) |
42 | + defer m.st.pwatcher.Unwatch(m.globalKey(), ch) |
43 | + for i := 0; i < 2; i++ { |
44 | + select { |
45 | + case change := <-ch: |
46 | + if change.Alive { |
47 | + return nil |
48 | + } |
49 | + case <-time.After(timeout): |
50 | + return fmt.Errorf("still not alive after timeout") |
51 | + case <-m.st.pwatcher.Dying(): |
52 | + return m.st.pwatcher.Err() |
53 | + } |
54 | + } |
55 | + panic(fmt.Sprintf("presence reported dead status twice in a row for machine %v", m)) |
56 | } |
57 | |
58 | // SetAgentAlive signals that the agent for machine m is alive. |
59 | |
60 | === modified file 'mstate/machine_test.go' |
61 | --- mstate/machine_test.go 2012-09-10 15:29:01 +0000 |
62 | +++ mstate/machine_test.go 2012-09-11 13:29:53 +0000 |
63 | @@ -23,7 +23,8 @@ |
64 | } |
65 | |
66 | func (s *MachineSuite) TestMachineSetAgentAlive(c *C) { |
67 | - alive := s.machine.AgentAlive() |
68 | + alive, err := s.machine.AgentAlive() |
69 | + c.Assert(err, IsNil) |
70 | c.Assert(alive, Equals, false) |
71 | |
72 | pinger, err := s.machine.SetAgentAlive() |
73 | @@ -31,37 +32,39 @@ |
74 | c.Assert(pinger, Not(IsNil)) |
75 | defer pinger.Stop() |
76 | |
77 | - s.State.ForcePresenceRefresh() |
78 | - alive = s.machine.AgentAlive() |
79 | + s.State.Sync() |
80 | + alive, err = s.machine.AgentAlive() |
81 | + c.Assert(err, IsNil) |
82 | c.Assert(alive, Equals, true) |
83 | } |
84 | |
85 | func (s *MachineSuite) TestMachineWaitAgentAlive(c *C) { |
86 | // test -gocheck.f TestMachineWaitAgentAlive |
87 | timeout := 5 * time.Second |
88 | - alive := s.machine.AgentAlive() |
89 | + alive, err := s.machine.AgentAlive() |
90 | + c.Assert(err, IsNil) |
91 | c.Assert(alive, Equals, false) |
92 | |
93 | - s.State.ForcePresenceRefresh() |
94 | - err := s.machine.WaitAgentAlive(timeout) |
95 | + s.State.StartSync() |
96 | + err = s.machine.WaitAgentAlive(timeout) |
97 | c.Assert(err, ErrorMatches, `waiting for agent of machine 0: still not alive after timeout`) |
98 | |
99 | pinger, err := s.machine.SetAgentAlive() |
100 | c.Assert(err, IsNil) |
101 | |
102 | - s.State.ForcePresenceRefresh() |
103 | + s.State.StartSync() |
104 | err = s.machine.WaitAgentAlive(timeout) |
105 | c.Assert(err, IsNil) |
106 | |
107 | - alive = s.machine.AgentAlive() |
108 | + alive, err = s.machine.AgentAlive() |
109 | c.Assert(err, IsNil) |
110 | c.Assert(alive, Equals, true) |
111 | |
112 | err = pinger.Kill() |
113 | c.Assert(err, IsNil) |
114 | |
115 | - s.State.ForcePresenceRefresh() |
116 | - alive = s.machine.AgentAlive() |
117 | + s.State.Sync() |
118 | + alive, err = s.machine.AgentAlive() |
119 | c.Assert(err, IsNil) |
120 | c.Assert(alive, Equals, false) |
121 | } |
122 | |
123 | === modified file 'mstate/open.go' |
124 | --- mstate/open.go 2012-09-11 08:42:28 +0000 |
125 | +++ mstate/open.go 2012-09-11 13:29:53 +0000 |
126 | @@ -50,7 +50,7 @@ |
127 | st.runner = txn.NewRunner(db.C("txns")) |
128 | st.runner.ChangeLog(db.C("txns.log")) |
129 | st.watcher = watcher.New(db.C("txns.log")) |
130 | - st.presencew = presence.NewWatcher(pdb.C("presence")) |
131 | + st.pwatcher = presence.NewWatcher(pdb.C("presence")) |
132 | for _, index := range indexes { |
133 | err = st.relations.EnsureIndex(index) |
134 | if err != nil { |
135 | @@ -61,8 +61,8 @@ |
136 | } |
137 | |
138 | func (st *State) Close() error { |
139 | - err1 := st.presencew.Stop() |
140 | - err2 := st.watcher.Stop() |
141 | + err1 := st.watcher.Stop() |
142 | + err2 := st.pwatcher.Stop() |
143 | st.db.Session.Close() |
144 | for _, err := range []error{err1, err2} { |
145 | if err != nil { |
146 | |
147 | === modified file 'mstate/presence/presence.go' |
148 | --- mstate/presence/presence.go 2012-09-06 22:12:07 +0000 |
149 | +++ mstate/presence/presence.go 2012-09-11 13:29:53 +0000 |
150 | @@ -58,7 +58,7 @@ |
151 | beingKey map[int64]string |
152 | beingSeq map[string]int64 |
153 | |
154 | - // watches has the per-key observer channels from Add/Remove. |
155 | + // watches has the per-key observer channels from Watch/Unwatch. |
156 | watches map[string][]chan<- Change |
157 | |
158 | // pending contains all the events to be dispatched to the watcher |
159 | @@ -70,13 +70,12 @@ |
160 | // the goroutine loop. |
161 | request chan interface{} |
162 | |
163 | - // refreshed contains pending ForceRefresh done channels |
164 | - // that are waiting for the completion notice. |
165 | - refreshed []chan bool |
166 | + // syncDone contains pending done channels from sync requests. |
167 | + syncDone []chan bool |
168 | |
169 | - // next will dispatch when it's time to refresh the database |
170 | + // next will dispatch when it's time to sync the database |
171 | // knowledge. It's maintained here so that ForceRefresh |
172 | - // can manipulate it to force a refresh sooner. |
173 | + // can manipulate it to force a sync sooner. |
174 | next <-chan time.Time |
175 | } |
176 | |
177 | @@ -116,17 +115,31 @@ |
178 | return w.tomb.Wait() |
179 | } |
180 | |
181 | -type reqAdd struct { |
182 | - key string |
183 | - ch chan<- Change |
184 | -} |
185 | - |
186 | -type reqRemove struct { |
187 | - key string |
188 | - ch chan<- Change |
189 | -} |
190 | - |
191 | -type reqRefresh struct { |
192 | +// Dying returns a channel that is closed when the watcher is stopping |
193 | +// due to an error or because Stop was called explicitly. |
194 | +func (w *Watcher) Dying() <-chan struct{} { |
195 | + return w.tomb.Dying() |
196 | +} |
197 | + |
198 | +// Err returns the error with which the watcher stopped. |
199 | +// It returns nil if the watcher stopped cleanly, tomb.ErrStillAlive |
200 | +// if the watcher is still running properly, or the respective error |
201 | +// if the watcher is terminating or has terminated with an error. |
202 | +func (w *Watcher) Err() error { |
203 | + return w.tomb.Err() |
204 | +} |
205 | + |
206 | +type reqWatch struct { |
207 | + key string |
208 | + ch chan<- Change |
209 | +} |
210 | + |
211 | +type reqUnwatch struct { |
212 | + key string |
213 | + ch chan<- Change |
214 | +} |
215 | + |
216 | +type reqSync struct { |
217 | done chan bool |
218 | } |
219 | |
220 | @@ -135,55 +148,55 @@ |
221 | result chan bool |
222 | } |
223 | |
224 | -// Add includes key into w for liveness monitoring. An event will |
225 | +func (w *Watcher) sendReq(req interface{}) { |
226 | + select { |
227 | + case w.request <- req: |
228 | + case <-w.tomb.Dying(): |
229 | + } |
230 | +} |
231 | + |
232 | +// Watch starts watching the liveness of key. An event will |
233 | // be sent onto ch to report the initial status for the key, and |
234 | // from then on a new event will be sent whenever a change is |
235 | // detected. Change values sent to the channel must be consumed, |
236 | // or the whole watcher will be blocked. |
237 | -func (w *Watcher) Add(key string, ch chan<- Change) { |
238 | - select { |
239 | - case w.request <- reqAdd{key, ch}: |
240 | - case <-w.tomb.Dying(): |
241 | - } |
242 | -} |
243 | - |
244 | -// Remove removes key and ch from liveness monitoring. |
245 | -func (w *Watcher) Remove(key string, ch chan<- Change) { |
246 | - select { |
247 | - case w.request <- reqRemove{key, ch}: |
248 | - case <-w.tomb.Dying(): |
249 | - } |
250 | -} |
251 | - |
252 | -// ForceRefresh forces a synchronous refresh of the watcher knowledge. |
253 | -// It blocks until the database state has been loaded and the events |
254 | -// have been prepared, but unblocks before changes are sent onto the |
255 | -// registered channels. |
256 | -func (w *Watcher) ForceRefresh() { |
257 | +func (w *Watcher) Watch(key string, ch chan<- Change) { |
258 | + w.sendReq(reqWatch{key, ch}) |
259 | +} |
260 | + |
261 | +// Unwatch stops watching the liveness of key via ch. |
262 | +func (w *Watcher) Unwatch(key string, ch chan<- Change) { |
263 | + w.sendReq(reqUnwatch{key, ch}) |
264 | +} |
265 | + |
266 | +// StartSync forces the watcher to load new events from the database. |
267 | +func (w *Watcher) StartSync() { |
268 | + w.sendReq(reqSync{nil}) |
269 | +} |
270 | + |
271 | +// Sync forces the watcher to load new events from the database and blocks |
272 | +// until all events have been dispatched. |
273 | +func (w *Watcher) Sync() { |
274 | done := make(chan bool) |
275 | - select { |
276 | - case w.request <- reqRefresh{done}: |
277 | - case <-w.tomb.Dying(): |
278 | - } |
279 | + w.sendReq(reqSync{done}) |
280 | select { |
281 | case <-done: |
282 | case <-w.tomb.Dying(): |
283 | } |
284 | } |
285 | |
286 | -// Alive returns whether the key is currently considered alive by w. |
287 | -func (w *Watcher) Alive(key string) bool { |
288 | +// Alive returns whether the key is currently considered alive by w, |
289 | +// or an error in case the watcher is dying. |
290 | +func (w *Watcher) Alive(key string) (bool, error) { |
291 | result := make(chan bool, 1) |
292 | - select { |
293 | - case w.request <- reqAlive{key, result}: |
294 | - case <-w.tomb.Dying(): |
295 | - } |
296 | + w.sendReq(reqAlive{key, result}) |
297 | var alive bool |
298 | select { |
299 | case alive = <-result: |
300 | case <-w.tomb.Dying(): |
301 | + return false, fmt.Errorf("cannot check liveness: watcher is dying") |
302 | } |
303 | - return alive |
304 | + return alive, nil |
305 | } |
306 | |
307 | // period is the length of each time slot in seconds. |
308 | @@ -205,18 +218,19 @@ |
309 | return tomb.ErrDying |
310 | case <-w.next: |
311 | w.next = time.After(time.Duration(period) * time.Second) |
312 | - refreshed := w.refreshed |
313 | - w.refreshed = nil |
314 | - if err := w.refresh(); err != nil { |
315 | + syncDone := w.syncDone |
316 | + w.syncDone = nil |
317 | + if err := w.sync(); err != nil { |
318 | return err |
319 | } |
320 | - for _, done := range refreshed { |
321 | + w.flush() |
322 | + for _, done := range syncDone { |
323 | close(done) |
324 | } |
325 | case req := <-w.request: |
326 | w.handle(req) |
327 | + w.flush() |
328 | } |
329 | - w.flush() |
330 | } |
331 | return nil |
332 | } |
333 | @@ -224,20 +238,18 @@ |
334 | // flush sends all pending events to their respective channels. |
335 | func (w *Watcher) flush() { |
336 | // w.pending may get new requests as we handle other requests. |
337 | - i := 0 |
338 | - for i < len(w.pending) { |
339 | + for i := 0; i < len(w.pending); i++ { |
340 | e := &w.pending[i] |
341 | - if e.ch == nil { |
342 | - i++ // Removed meanwhile. |
343 | - continue |
344 | - } |
345 | - select { |
346 | - case <-w.tomb.Dying(): |
347 | - return |
348 | - case req := <-w.request: |
349 | - w.handle(req) |
350 | - case e.ch <- Change{e.key, e.alive}: |
351 | - i++ |
352 | + for e.ch != nil { |
353 | + select { |
354 | + case <-w.tomb.Dying(): |
355 | + return |
356 | + case req := <-w.request: |
357 | + w.handle(req) |
358 | + continue |
359 | + case e.ch <- Change{e.key, e.alive}: |
360 | + } |
361 | + break |
362 | } |
363 | } |
364 | w.pending = w.pending[:0] |
365 | @@ -248,10 +260,12 @@ |
366 | func (w *Watcher) handle(req interface{}) { |
367 | log.Debugf("presence: got request: %#v", req) |
368 | switch r := req.(type) { |
369 | - case reqRefresh: |
370 | + case reqSync: |
371 | w.next = time.After(0) |
372 | - w.refreshed = append(w.refreshed, r.done) |
373 | - case reqAdd: |
374 | + if r.done != nil { |
375 | + w.syncDone = append(w.syncDone, r.done) |
376 | + } |
377 | + case reqWatch: |
378 | for _, ch := range w.watches[r.key] { |
379 | if ch == r.ch { |
380 | panic("adding channel twice for same key") |
381 | @@ -260,7 +274,7 @@ |
382 | w.watches[r.key] = append(w.watches[r.key], r.ch) |
383 | _, alive := w.beingSeq[r.key] |
384 | w.pending = append(w.pending, event{r.ch, r.key, alive}) |
385 | - case reqRemove: |
386 | + case reqUnwatch: |
387 | watches := w.watches[r.key] |
388 | for i, ch := range watches { |
389 | if ch == r.ch { |
390 | @@ -294,11 +308,11 @@ |
391 | Dead map[string]int64 ",omitempty" |
392 | } |
393 | |
394 | -// refresh updates the watcher knowledge from the database, and |
395 | +// sync updates the watcher knowledge from the database, and |
396 | // queues events to observing channels. It fetches the last two time |
397 | // slots and compares the union of both to the in-memory state. |
398 | -func (w *Watcher) refresh() error { |
399 | - log.Debugf("presence: refreshing watcher knowledge from database...") |
400 | +func (w *Watcher) sync() error { |
401 | + log.Debugf("presence: synchronizing watcher knowledge with database...") |
402 | slot := timeSlot(time.Now(), w.delta) |
403 | var ping []pingInfo |
404 | err := w.pings.Find(bson.D{{"$or", []pingInfo{{Slot: slot}, {Slot: slot - period}}}}).All(&ping) |
405 | |
406 | === modified file 'mstate/presence/presence_test.go' |
407 | --- mstate/presence/presence_test.go 2012-09-06 22:12:07 +0000 |
408 | +++ mstate/presence/presence_test.go 2012-09-11 13:29:53 +0000 |
409 | @@ -5,6 +5,7 @@ |
410 | . "launchpad.net/gocheck" |
411 | "launchpad.net/juju-core/mstate/presence" |
412 | "launchpad.net/juju-core/testing" |
413 | + "launchpad.net/tomb" |
414 | "strconv" |
415 | stdtesting "testing" |
416 | "time" |
417 | @@ -71,6 +72,40 @@ |
418 | } |
419 | } |
420 | |
421 | +func assertAlive(c *C, w *presence.Watcher, key string, alive bool) { |
422 | + realAlive, err := w.Alive(key) |
423 | + c.Assert(err, IsNil) |
424 | + c.Assert(realAlive, Equals, alive) |
425 | +} |
426 | + |
427 | +func (s *PresenceSuite) TestErrAndDying(c *C) { |
428 | + w := presence.NewWatcher(s.presence) |
429 | + defer w.Stop() |
430 | + |
431 | + c.Assert(w.Err(), Equals, tomb.ErrStillAlive) |
432 | + select { |
433 | + case <-w.Dying(): |
434 | + c.Fatalf("Dying channel fired unexpectedly") |
435 | + default: |
436 | + } |
437 | + c.Assert(w.Stop(), IsNil) |
438 | + c.Assert(w.Err(), IsNil) |
439 | + select { |
440 | + case <-w.Dying(): |
441 | + default: |
442 | + c.Fatalf("Dying channel should have fired") |
443 | + } |
444 | +} |
445 | + |
446 | +func (s *PresenceSuite) TestAliveError(c *C) { |
447 | + w := presence.NewWatcher(s.presence) |
448 | + c.Assert(w.Stop(), IsNil) |
449 | + |
450 | + alive, err := w.Alive("a") |
451 | + c.Assert(err, ErrorMatches, ".*: watcher is dying") |
452 | + c.Assert(alive, Equals, false) |
453 | +} |
454 | + |
455 | func (s *PresenceSuite) TestWorkflow(c *C) { |
456 | w := presence.NewWatcher(s.presence) |
457 | pa := presence.NewPinger(s.presence, "a") |
458 | @@ -79,61 +114,61 @@ |
459 | defer pa.Stop() |
460 | defer pb.Stop() |
461 | |
462 | - c.Assert(w.Alive("a"), Equals, false) |
463 | - c.Assert(w.Alive("b"), Equals, false) |
464 | + assertAlive(c, w, "a", false) |
465 | + assertAlive(c, w, "b", false) |
466 | |
467 | // Buffer one entry to avoid blocking the watcher here. |
468 | cha := make(chan presence.Change, 1) |
469 | chb := make(chan presence.Change, 1) |
470 | - w.Add("a", cha) |
471 | - w.Add("b", chb) |
472 | + w.Watch("a", cha) |
473 | + w.Watch("b", chb) |
474 | |
475 | // Initial events with current status. |
476 | assertChange(c, cha, presence.Change{"a", false}) |
477 | assertChange(c, chb, presence.Change{"b", false}) |
478 | |
479 | - w.ForceRefresh() |
480 | + w.StartSync() |
481 | assertNoChange(c, cha) |
482 | assertNoChange(c, chb) |
483 | |
484 | c.Assert(pa.Start(), IsNil) |
485 | |
486 | - w.ForceRefresh() |
487 | + w.StartSync() |
488 | assertChange(c, cha, presence.Change{"a", true}) |
489 | assertNoChange(c, cha) |
490 | assertNoChange(c, chb) |
491 | |
492 | - c.Assert(w.Alive("a"), Equals, true) |
493 | - c.Assert(w.Alive("b"), Equals, false) |
494 | + assertAlive(c, w, "a", true) |
495 | + assertAlive(c, w, "b", false) |
496 | |
497 | // Changes while the channel is out are not observed. |
498 | - w.Remove("a", cha) |
499 | + w.Unwatch("a", cha) |
500 | assertNoChange(c, cha) |
501 | pa.Kill() |
502 | - w.ForceRefresh() |
503 | + w.Sync() |
504 | pa = presence.NewPinger(s.presence, "a") |
505 | pa.Start() |
506 | - w.ForceRefresh() |
507 | + w.StartSync() |
508 | assertNoChange(c, cha) |
509 | |
510 | // We can still query it manually, though. |
511 | - c.Assert(w.Alive("a"), Equals, true) |
512 | - c.Assert(w.Alive("b"), Equals, false) |
513 | + assertAlive(c, w, "a", true) |
514 | + assertAlive(c, w, "b", false) |
515 | |
516 | // Initial positive event. No refresh needed. |
517 | - w.Add("a", cha) |
518 | + w.Watch("a", cha) |
519 | assertChange(c, cha, presence.Change{"a", true}) |
520 | |
521 | c.Assert(pb.Start(), IsNil) |
522 | |
523 | - w.ForceRefresh() |
524 | + w.StartSync() |
525 | assertChange(c, chb, presence.Change{"b", true}) |
526 | assertNoChange(c, cha) |
527 | assertNoChange(c, chb) |
528 | |
529 | c.Assert(pa.Stop(), IsNil) |
530 | |
531 | - w.ForceRefresh() |
532 | + w.StartSync() |
533 | assertNoChange(c, cha) |
534 | assertNoChange(c, chb) |
535 | |
536 | @@ -141,7 +176,7 @@ |
537 | c.Assert(pa.Kill(), IsNil) |
538 | c.Assert(pb.Kill(), IsNil) |
539 | |
540 | - w.ForceRefresh() |
541 | + w.StartSync() |
542 | assertChange(c, cha, presence.Change{"a", false}) |
543 | assertChange(c, chb, presence.Change{"b", false}) |
544 | |
545 | @@ -172,11 +207,11 @@ |
546 | c.Logf("Checking who's still alive...") |
547 | w := presence.NewWatcher(s.presence) |
548 | defer w.Stop() |
549 | - w.ForceRefresh() |
550 | + w.Sync() |
551 | ch := make(chan presence.Change) |
552 | for i := 0; i < N; i++ { |
553 | k := strconv.Itoa(i) |
554 | - w.Add(k, ch) |
555 | + w.Watch(k, ch) |
556 | if i%2 == 0 { |
557 | assertChange(c, ch, presence.Change{k, true}) |
558 | } else { |
559 | @@ -192,26 +227,26 @@ |
560 | defer p.Stop() |
561 | |
562 | ch := make(chan presence.Change) |
563 | - w.Add("a", ch) |
564 | + w.Watch("a", ch) |
565 | assertChange(c, ch, presence.Change{"a", false}) |
566 | |
567 | c.Assert(p.Start(), IsNil) |
568 | - w.ForceRefresh() |
569 | + w.StartSync() |
570 | assertChange(c, ch, presence.Change{"a", true}) |
571 | |
572 | // Still alive in previous slot. |
573 | presence.FakeTimeSlot(1) |
574 | - w.ForceRefresh() |
575 | + w.StartSync() |
576 | assertNoChange(c, ch) |
577 | |
578 | // Two last slots are empty. |
579 | presence.FakeTimeSlot(2) |
580 | - w.ForceRefresh() |
581 | + w.StartSync() |
582 | assertChange(c, ch, presence.Change{"a", false}) |
583 | |
584 | // Already dead so killing isn't noticed. |
585 | p.Kill() |
586 | - w.ForceRefresh() |
587 | + w.StartSync() |
588 | assertNoChange(c, ch) |
589 | } |
590 | |
591 | @@ -225,7 +260,7 @@ |
592 | defer p.Stop() |
593 | |
594 | ch := make(chan presence.Change) |
595 | - w.Add("a", ch) |
596 | + w.Watch("a", ch) |
597 | assertChange(c, ch, presence.Change{"a", false}) |
598 | |
599 | // A single ping. |
600 | @@ -237,18 +272,18 @@ |
601 | assertChange(c, ch, presence.Change{"a", true}) |
602 | } |
603 | |
604 | -func (s *PresenceSuite) TestAddRemoveOnQueue(c *C) { |
605 | +func (s *PresenceSuite) TestWatchUnwatchOnQueue(c *C) { |
606 | w := presence.NewWatcher(s.presence) |
607 | ch := make(chan presence.Change) |
608 | for i := 0; i < 100; i++ { |
609 | key := strconv.Itoa(i) |
610 | c.Logf("Adding %q", key) |
611 | - w.Add(key, ch) |
612 | + w.Watch(key, ch) |
613 | } |
614 | for i := 1; i < 100; i += 2 { |
615 | key := strconv.Itoa(i) |
616 | c.Logf("Removing %q", key) |
617 | - w.Remove(key, ch) |
618 | + w.Unwatch(key, ch) |
619 | } |
620 | alive := make(map[string]bool) |
621 | for i := 0; i < 50; i++ { |
622 | @@ -288,10 +323,10 @@ |
623 | stop := false |
624 | for !stop { |
625 | w := presence.NewWatcher(s.presence) |
626 | - w.ForceRefresh() |
627 | - alive := w.Alive("a") |
628 | + w.Sync() |
629 | + alive, err := w.Alive("a") |
630 | c.Check(w.Stop(), IsNil) |
631 | - if !c.Check(alive, Equals, true) { |
632 | + if !c.Check(err, IsNil) || !c.Check(alive, Equals, true) { |
633 | break |
634 | } |
635 | select { |
636 | @@ -325,22 +360,87 @@ |
637 | // Start p1 and let it go on. |
638 | c.Assert(p1.Start(), IsNil) |
639 | |
640 | - w.ForceRefresh() |
641 | - c.Assert(w.Alive("a"), Equals, true) |
642 | + w.Sync() |
643 | + assertAlive(c, w, "a", true) |
644 | |
645 | // Start and kill p2, which will temporarily |
646 | // invalidate p1 and set the key as dead. |
647 | c.Assert(p2.Start(), IsNil) |
648 | c.Assert(p2.Kill(), IsNil) |
649 | |
650 | - w.ForceRefresh() |
651 | - c.Assert(w.Alive("a"), Equals, false) |
652 | + w.Sync() |
653 | + assertAlive(c, w, "a", false) |
654 | |
655 | // Wait for two periods, and check again. Since |
656 | // p1 is still alive, p2's death will expire and |
657 | // the key will come back. |
658 | time.Sleep(period * 2 * time.Second) |
659 | |
660 | - w.ForceRefresh() |
661 | - c.Assert(w.Alive("a"), Equals, true) |
662 | + w.Sync() |
663 | + assertAlive(c, w, "a", true) |
664 | +} |
665 | + |
666 | +func (s *PresenceSuite) TestStartSync(c *C) { |
667 | + w := presence.NewWatcher(s.presence) |
668 | + p := presence.NewPinger(s.presence, "a") |
669 | + defer w.Stop() |
670 | + defer p.Stop() |
671 | + |
672 | + ch := make(chan presence.Change) |
673 | + w.Watch("a", ch) |
674 | + assertChange(c, ch, presence.Change{"a", false}) |
675 | + |
676 | + c.Assert(p.Start(), IsNil) |
677 | + |
678 | + done := make(chan bool) |
679 | + go func() { |
680 | + w.StartSync() |
681 | + w.StartSync() |
682 | + w.StartSync() |
683 | + done <- true |
684 | + }() |
685 | + |
686 | + select { |
687 | + case <-done: |
688 | + case <-time.After(100 * time.Millisecond): |
689 | + c.Fatalf("StartSync failed to return") |
690 | + } |
691 | + |
692 | + assertChange(c, ch, presence.Change{"a", true}) |
693 | +} |
694 | + |
695 | +func (s *PresenceSuite) TestSync(c *C) { |
696 | + w := presence.NewWatcher(s.presence) |
697 | + p := presence.NewPinger(s.presence, "a") |
698 | + defer w.Stop() |
699 | + defer p.Stop() |
700 | + |
701 | + ch := make(chan presence.Change) |
702 | + w.Watch("a", ch) |
703 | + assertChange(c, ch, presence.Change{"a", false}) |
704 | + |
705 | + // Nothing to do here. |
706 | + w.Sync() |
707 | + |
708 | + c.Assert(p.Start(), IsNil) |
709 | + |
710 | + done := make(chan bool) |
711 | + go func() { |
712 | + w.Sync() |
713 | + done <- true |
714 | + }() |
715 | + |
716 | + select { |
717 | + case <-done: |
718 | + c.Fatalf("Sync returned too early") |
719 | + case <-time.After(200 * time.Millisecond): |
720 | + } |
721 | + |
722 | + assertChange(c, ch, presence.Change{"a", true}) |
723 | + |
724 | + select { |
725 | + case <-done: |
726 | + case <-time.After(100 * time.Millisecond): |
727 | + c.Fatalf("Sync failed to return") |
728 | + } |
729 | } |
730 | |
731 | === modified file 'mstate/state.go' |
732 | --- mstate/state.go 2012-09-10 15:29:01 +0000 |
733 | +++ mstate/state.go 2012-09-11 13:29:53 +0000 |
734 | @@ -28,17 +28,17 @@ |
735 | // State represents the state of an environment |
736 | // managed by juju. |
737 | type State struct { |
738 | - db *mgo.Database |
739 | - charms *mgo.Collection |
740 | - machines *mgo.Collection |
741 | - relations *mgo.Collection |
742 | - services *mgo.Collection |
743 | - settings *mgo.Collection |
744 | - units *mgo.Collection |
745 | - presence *mgo.Collection |
746 | - runner *txn.Runner |
747 | - watcher *watcher.Watcher |
748 | - presencew *presence.Watcher |
749 | + db *mgo.Database |
750 | + charms *mgo.Collection |
751 | + machines *mgo.Collection |
752 | + relations *mgo.Collection |
753 | + services *mgo.Collection |
754 | + settings *mgo.Collection |
755 | + units *mgo.Collection |
756 | + presence *mgo.Collection |
757 | + runner *txn.Runner |
758 | + watcher *watcher.Watcher |
759 | + pwatcher *presence.Watcher |
760 | } |
761 | |
762 | func deadOnAbort(err error) error { |
763 | @@ -365,16 +365,16 @@ |
764 | return newUnit(s, &doc), nil |
765 | } |
766 | |
767 | -// ForcePresenceRefresh forces a synchronous refresh of |
768 | -// the presence watcher knowledge |
769 | -func (s *State) ForcePresenceRefresh() { |
770 | - s.presencew.ForceRefresh() |
771 | -} |
772 | - |
773 | // StartSync forces watchers to resynchronize their state with the |
774 | // database immediately. This will happen periodically automatically. |
775 | func (s *State) StartSync() { |
776 | - // TODO Make presence more like watcher, add it here, and |
777 | - // remove ForcePresenceRefresh. |
778 | s.watcher.StartSync() |
779 | + s.pwatcher.StartSync() |
780 | +} |
781 | + |
782 | +// Sync forces watchers to resynchronize their state with the |
783 | +// database immediately, and waits until all events are known. |
784 | +func (s *State) Sync() { |
785 | + s.watcher.Sync() |
786 | + s.pwatcher.Sync() |
787 | } |
788 | |
=== modified file 'mstate/unit.go'
--- mstate/unit.go	2012-09-06 16:54:22 +0000
+++ mstate/unit.go	2012-09-11 13:29:53 +0000
@@ -196,7 +196,10 @@
 	case UnitStopped:
 		return UnitStopped, "", nil
 	}
-	alive := u.AgentAlive()
+	alive, err := u.AgentAlive()
+	if err != nil {
+		return "", "", err
+	}
 	if !alive {
 		s = UnitDown
 	}
@@ -222,35 +225,29 @@
 }

 // AgentAlive returns whether the respective remote agent is alive.
-func (u *Unit) AgentAlive() bool {
-	return u.st.presencew.Alive(u.globalKey())
+func (u *Unit) AgentAlive() (bool, error) {
+	return u.st.pwatcher.Alive(u.globalKey())
 }

 // WaitAgentAlive blocks until the respective agent is alive.
-func (u *Unit) WaitAgentAlive(timeout time.Duration) error {
+func (u *Unit) WaitAgentAlive(timeout time.Duration) (err error) {
+	defer trivial.ErrorContextf(&err, "waiting for agent of unit %q", u)
 	ch := make(chan presence.Change)
-	u.st.presencew.Add(u.globalKey(), ch)
-	defer u.st.presencew.Remove(u.globalKey(), ch)
-	// Initial check.
-	select {
-	case change := <-ch:
-		if change.Alive {
-			return nil
-		}
-	case <-time.After(timeout):
-		return fmt.Errorf("waiting for agent of unit %q: still not alive after timeout", u)
-	}
-	// Hasn't been alive, so now wait for change.
-	select {
-	case change := <-ch:
-		if change.Alive {
-			return nil
-		}
-		panic(fmt.Sprintf("presence reported dead status twice in a row for unit %q", u))
-	case <-time.After(timeout):
-		return fmt.Errorf("waiting for agent of unit %q: still not alive after timeout", u)
-	}
-	panic("unreachable")
+	u.st.pwatcher.Watch(u.globalKey(), ch)
+	defer u.st.pwatcher.Unwatch(u.globalKey(), ch)
+	for i := 0; i < 2; i++ {
+		select {
+		case change := <-ch:
+			if change.Alive {
+				return nil
+			}
+		case <-time.After(timeout):
+			return fmt.Errorf("still not alive after timeout")
+		case <-u.st.pwatcher.Dying():
+			return u.st.pwatcher.Err()
+		}
+	}
+	panic(fmt.Sprintf("presence reported dead status twice in a row for unit %q", u))
 }

 // SetAgentAlive signals that the agent for unit u is alive.

=== modified file 'mstate/unit_test.go'
--- mstate/unit_test.go	2012-09-06 17:04:54 +0000
+++ mstate/unit_test.go	2012-09-11 13:29:53 +0000
@@ -90,7 +90,7 @@
 		c.Assert(p.Kill(), IsNil)
 	}()

-	s.State.ForcePresenceRefresh()
+	s.State.StartSync()
 	status, info, err = s.unit.Status()
 	c.Assert(err, IsNil)
 	c.Assert(status, Equals, state.UnitStarted)
@@ -105,7 +105,8 @@
 }

 func (s *UnitSuite) TestUnitSetAgentAlive(c *C) {
-	alive := s.unit.AgentAlive()
+	alive, err := s.unit.AgentAlive()
+	c.Assert(err, IsNil)
 	c.Assert(alive, Equals, false)

 	pinger, err := s.unit.SetAgentAlive()
@@ -113,34 +114,38 @@
 	c.Assert(pinger, Not(IsNil))
 	defer pinger.Stop()

-	s.State.ForcePresenceRefresh()
-	alive = s.unit.AgentAlive()
+	s.State.Sync()
+	alive, err = s.unit.AgentAlive()
+	c.Assert(err, IsNil)
 	c.Assert(alive, Equals, true)
 }

 func (s *UnitSuite) TestUnitWaitAgentAlive(c *C) {
 	timeout := 5 * time.Second
-	alive := s.unit.AgentAlive()
+	alive, err := s.unit.AgentAlive()
+	c.Assert(err, IsNil)
 	c.Assert(alive, Equals, false)

-	err := s.unit.WaitAgentAlive(timeout)
+	err = s.unit.WaitAgentAlive(timeout)
 	c.Assert(err, ErrorMatches, `waiting for agent of unit "wordpress/0": still not alive after timeout`)

 	pinger, err := s.unit.SetAgentAlive()
 	c.Assert(err, IsNil)

-	s.State.ForcePresenceRefresh()
+	s.State.StartSync()
 	err = s.unit.WaitAgentAlive(timeout)
 	c.Assert(err, IsNil)

-	alive = s.unit.AgentAlive()
+	alive, err = s.unit.AgentAlive()
+	c.Assert(err, IsNil)
 	c.Assert(alive, Equals, true)

 	err = pinger.Kill()
 	c.Assert(err, IsNil)

-	s.State.ForcePresenceRefresh()
-	alive = s.unit.AgentAlive()
+	s.State.Sync()
+	alive, err = s.unit.AgentAlive()
+	c.Assert(err, IsNil)
 	c.Assert(alive, Equals, false)
 }


=== modified file 'mstate/watcher/watcher_test.go'
--- mstate/watcher/watcher_test.go	2012-09-10 16:19:57 +0000
+++ mstate/watcher/watcher_test.go	2012-09-11 13:29:53 +0000
@@ -440,9 +440,6 @@
 func (s *WatcherSuite) TestStartSync(c *C) {
 	s.w.Watch("test", "a", -1, s.ch)

-	// Nothing to do here.
-	s.w.StartSync()
-
 	revno := s.insert(c, "test", "a")

 	done := make(chan bool)
@@ -456,7 +453,7 @@
 	select {
 	case <-done:
 	case <-time.After(100 * time.Millisecond):
-		c.Fatalf("SyncStart failed to return")
+		c.Fatalf("StartSync failed to return")
 	}

 	assertChange(c, s.ch, watcher.Change{"test", "a", revno})
@@ -479,7 +476,7 @@
 	select {
 	case <-done:
 		c.Fatalf("Sync returned too early")
-	case <-time.After(500 * time.Millisecond):
+	case <-time.After(200 * time.Millisecond):
 	}

 	assertChange(c, s.ch, watcher.Change{"test", "a", revno})
Reviewers: mp+123627_code.launchpad.net,
Message:
Please take a look.
Description:
mstate/presence: bring it in line with mstate/watcher
This mirrors into mstate/presence the improvements made into
the mstate/watcher package, including several points made in
reviews.
It also addresses a bug in Alive (it could return false
improperly without errors).
https://code.launchpad.net/~niemeyer/juju-core/presence-polishing/+merge/123627
Requires: https://code.launchpad.net/~niemeyer/juju-core/mstate-machine-watcher/+merge/123614
(do not edit description out of merge proposal)
Please review this at https://codereview.appspot.com/6501114/
Affected files:
  A [revision details]
  M mstate/machine.go
  M mstate/machine_test.go
  M mstate/open.go
  M mstate/presence/presence.go
  M mstate/presence/presence_test.go
  M mstate/state.go
  M mstate/unit.go
  M mstate/unit_test.go
  M mstate/watcher/watcher_test.go