Merge lp:~niemeyer/juju-core/presence-polishing into lp:~juju/juju-core/trunk

Proposed by Gustavo Niemeyer
Status: Merged
Merged at revision: 483
Proposed branch: lp:~niemeyer/juju-core/presence-polishing
Merge into: lp:~juju/juju-core/trunk
Prerequisite: lp:~niemeyer/juju-core/mstate-machine-watcher
Diff against target: 960 lines (+319/-209)
9 files modified
mstate/machine.go (+19/-25)
mstate/machine_test.go (+13/-10)
mstate/open.go (+3/-3)
mstate/presence/presence.go (+88/-74)
mstate/presence/presence_test.go (+137/-37)
mstate/state.go (+19/-19)
mstate/unit.go (+23/-26)
mstate/unit_test.go (+15/-10)
mstate/watcher/watcher_test.go (+2/-5)
To merge this branch: bzr merge lp:~niemeyer/juju-core/presence-polishing
Reviewer: The Go Language Gophers
Status: Pending
Review via email: mp+123627@code.launchpad.net

Description of the change

mstate/presence: bring it in line with mstate/watcher

This mirrors into mstate/presence the improvements made to
the mstate/watcher package, including several points raised in
reviews.

It also addresses a bug in Alive (it could improperly return
false without reporting an error).

https://codereview.appspot.com/6501114/
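The Alive fix changes the method to report the watcher's shutdown instead of silently answering false. A minimal standalone sketch of that pattern (toy types and names, not the real mstate/presence API):

```go
package main

import (
	"errors"
	"fmt"
)

// toyWatcher mimics the shape of the fix: a closed dying channel means
// the request loop will never answer, so Alive must return an error
// rather than a bare false.
type toyWatcher struct {
	alive map[string]bool
	dying chan struct{}
}

// Alive reports liveness, or an error once the watcher is shutting down.
func (w *toyWatcher) Alive(key string) (bool, error) {
	select {
	case <-w.dying:
		return false, errors.New("cannot check liveness: watcher is dying")
	default:
		return w.alive[key], nil
	}
}

func main() {
	w := &toyWatcher{alive: map[string]bool{"a": true}, dying: make(chan struct{})}
	ok, err := w.Alive("a")
	fmt.Println(ok, err) // true <nil>
	close(w.dying)
	_, err = w.Alive("a")
	fmt.Println(err != nil) // true
}
```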

Revision history for this message
Gustavo Niemeyer (niemeyer) wrote :

Reviewers: mp+123627_code.launchpad.net,

Message:
Please take a look.

Description:
mstate/presence: bring it in line with mstate/watcher

This mirrors into mstate/presence the improvements made to
the mstate/watcher package, including several points raised in
reviews.

It also addresses a bug in Alive (it could improperly return
false without reporting an error).

https://code.launchpad.net/~niemeyer/juju-core/presence-polishing/+merge/123627

Requires:
https://code.launchpad.net/~niemeyer/juju-core/mstate-machine-watcher/+merge/123614

(do not edit description out of merge proposal)

Please review this at https://codereview.appspot.com/6501114/

Affected files:
   A [revision details]
   M mstate/machine.go
   M mstate/machine_test.go
   M mstate/open.go
   M mstate/presence/presence.go
   M mstate/presence/presence_test.go
   M mstate/state.go
   M mstate/unit.go
   M mstate/unit_test.go
   M mstate/watcher/watcher_test.go

Revision history for this message
William Reade (fwereade) wrote :

LGTM, only very vague suggestions :).

https://codereview.appspot.com/6501114/diff/1/mstate/machine.go
File mstate/machine.go (left):

https://codereview.appspot.com/6501114/diff/1/mstate/machine.go#oldcode88
mstate/machine.go:88: func (m *Machine) WaitAgentAlive(timeout
time.Duration) error {
Kinda irrelevant, but I still don't know what use case these
WaitAgentAlive methods fulfil.

https://codereview.appspot.com/6501114/diff/1/mstate/machine.go
File mstate/machine.go (right):

https://codereview.appspot.com/6501114/diff/1/mstate/machine.go#newcode114
mstate/machine.go:114: return fmt.Errorf("waiting for agent of machine
%v: watcher is dying", m)
ErrorContextf?

https://codereview.appspot.com/6501114/diff/1/mstate/presence/presence.go
File mstate/presence/presence.go (right):

https://codereview.appspot.com/6501114/diff/1/mstate/presence/presence.go#newcode240
mstate/presence/presence.go:240: // w.pending may get new requests as we
handle other requests.
I'm not sure I quite follow how this works. It seems that it's
specifically *only* while handling requests that pending can change --
and that therefore there's no opportunity for it to change in between a
failed loop test and the closing truncate -- but I had to think about it
for a little bit longer than I would prefer, and it has a little hint of
looking-wrong to it. Would it make sense to expand the comment a little?

https://codereview.appspot.com/6501114/diff/1/mstate/unit.go
File mstate/unit.go (right):

https://codereview.appspot.com/6501114/diff/1/mstate/unit.go#newcode257
mstate/unit.go:257: return fmt.Errorf("waiting for agent of unit %q:
watcher is dying", u)
Similar comments to Machine.WaitAgentAlive

https://codereview.appspot.com/6501114/

Revision history for this message
Gustavo Niemeyer (niemeyer) wrote :

Please take a look.

https://codereview.appspot.com/6501114/diff/1/mstate/machine.go
File mstate/machine.go (left):

https://codereview.appspot.com/6501114/diff/1/mstate/machine.go#oldcode88
mstate/machine.go:88: func (m *Machine) WaitAgentAlive(timeout
time.Duration) error {
On 2012/09/11 12:09:31, fwereade wrote:
> Kinda irrelevant, but I still don't know what use case these
WaitAgentAlive
> methods fulfil.

LOL.. that crossed my mind. It feels like something clever we thought of
but never used.

https://codereview.appspot.com/6501114/diff/1/mstate/machine.go
File mstate/machine.go (right):

https://codereview.appspot.com/6501114/diff/1/mstate/machine.go#newcode114
mstate/machine.go:114: return fmt.Errorf("waiting for agent of machine
%v: watcher is dying", m)
On 2012/09/11 12:09:31, fwereade wrote:
> ErrorContextf?

Good catch, done.

I've also refactored those methods a bit to reduce them.
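For reference, the deferred error-context pattern adopted in the refactor can be sketched standalone (toy errorContextf shown here; the real helper is trivial.ErrorContextf in juju-core's trivial package):

```go
package main

import (
	"errors"
	"fmt"
)

// errorContextf mimics trivial.ErrorContextf: if *err is non-nil when the
// deferred call runs, it prefixes the error with formatted context.
func errorContextf(err *error, format string, args ...interface{}) {
	if *err != nil {
		*err = fmt.Errorf("%s: %v", fmt.Sprintf(format, args...), *err)
	}
}

// waitAgentAlive returns bare errors; the deferred call adds the
// "waiting for agent of machine ..." prefix on every return path at once.
func waitAgentAlive(machine string) (err error) {
	defer errorContextf(&err, "waiting for agent of machine %v", machine)
	return errors.New("still not alive after timeout")
}

func main() {
	// prints: waiting for agent of machine 0: still not alive after timeout
	fmt.Println(waitAgentAlive("0"))
}
```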

https://codereview.appspot.com/6501114/diff/1/mstate/presence/presence.go
File mstate/presence/presence.go (right):

https://codereview.appspot.com/6501114/diff/1/mstate/presence/presence.go#newcode240
mstate/presence/presence.go:240: // w.pending may get new requests as we
handle other requests.
On 2012/09/11 12:09:31, fwereade wrote:
> I'm not sure I quite follow how this works. It seems that it's
specifically
> *only* while handling requests that pending can change --

Events dispatched from the syncing procedure go into pending too. I'm
happy to expand the comment, but do you have any suggestions that would
make it clarify the misunderstanding you have/had?
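For readers following the thread: the subtlety is that draining pending can itself admit new entries, because flush services requests (and sync events) while blocked on a send. A toy model of that shape (assumed simplified names, not the real watcher):

```go
package main

import "fmt"

type event struct{ key string }

var pending []event

// handle may append to pending while flush is mid-iteration, which is
// why flush re-evaluates len(pending) on every step of the loop.
func handle(key string) {
	pending = append(pending, event{key})
}

func flush() []string {
	var sent []string
	for i := 0; i < len(pending); i++ { // len re-checked each iteration
		e := pending[i]
		if e.key == "a" {
			handle("b") // a request arriving mid-flush grows pending
		}
		sent = append(sent, e.key)
	}
	pending = pending[:0] // safe: nothing new arrives once the loop exits
	return sent
}

func main() {
	handle("a")
	fmt.Println(flush()) // [a b]
}
```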

https://codereview.appspot.com/6501114/

Revision history for this message
William Reade (fwereade) wrote :

LGTM

https://codereview.appspot.com/6501114/diff/1/mstate/machine.go
File mstate/machine.go (right):

https://codereview.appspot.com/6501114/diff/1/mstate/machine.go#newcode114
mstate/machine.go:114: return fmt.Errorf("waiting for agent of machine
%v: watcher is dying", m)
On 2012/09/11 13:27:15, niemeyer wrote:
> On 2012/09/11 12:09:31, fwereade wrote:
> > ErrorContextf?

> Good catch, done.

> I've also refactored those methods a bit to reduce them.

Very neat :)

https://codereview.appspot.com/6501114/diff/1/mstate/presence/presence.go
File mstate/presence/presence.go (right):

https://codereview.appspot.com/6501114/diff/1/mstate/presence/presence.go#newcode240
mstate/presence/presence.go:240: // w.pending may get new requests as we
handle other requests.
On 2012/09/11 13:27:15, niemeyer wrote:
> On 2012/09/11 12:09:31, fwereade wrote:
> > I'm not sure I quite follow how this works. It seems that it's
specifically
> > *only* while handling requests that pending can change --

> Events dispatched from the syncing procedure go into pending too. I'm
happy to
> expand the comment, but do you have any suggestions that would make it
clarify
> the misunderstanding you have/had?

Everything I can think of is a more cumbersome rewording of what you
already have -- best just forget I said anything :)

https://codereview.appspot.com/6501114/

Revision history for this message
Gustavo Niemeyer (niemeyer) wrote :

*** Submitted:

mstate/presence: bring it in line with mstate/watcher

This mirrors into mstate/presence the improvements made to
the mstate/watcher package, including several points raised in
reviews.

It also addresses a bug in Alive (it could improperly return
false without reporting an error).

R=fwereade
CC=
https://codereview.appspot.com/6501114

https://codereview.appspot.com/6501114/

Preview Diff

1=== modified file 'mstate/machine.go'
2--- mstate/machine.go 2012-09-10 15:29:01 +0000
3+++ mstate/machine.go 2012-09-11 13:29:53 +0000
4@@ -80,35 +80,29 @@
5 }
6
7 // AgentAlive returns whether the respective remote agent is alive.
8-func (m *Machine) AgentAlive() bool {
9- return m.st.presencew.Alive(m.globalKey())
10+func (m *Machine) AgentAlive() (bool, error) {
11+ return m.st.pwatcher.Alive(m.globalKey())
12 }
13
14 // WaitAgentAlive blocks until the respective agent is alive.
15-func (m *Machine) WaitAgentAlive(timeout time.Duration) error {
16+func (m *Machine) WaitAgentAlive(timeout time.Duration) (err error) {
17+ defer trivial.ErrorContextf(&err, "waiting for agent of machine %v", m)
18 ch := make(chan presence.Change)
19- m.st.presencew.Add(m.globalKey(), ch)
20- defer m.st.presencew.Remove(m.globalKey(), ch)
21- // Initial check.
22- select {
23- case change := <-ch:
24- if change.Alive {
25- return nil
26- }
27- case <-time.After(timeout):
28- return fmt.Errorf("waiting for agent of machine %v: still not alive after timeout", m)
29- }
30- // Hasn't been alive, so now wait for change.
31- select {
32- case change := <-ch:
33- if change.Alive {
34- return nil
35- }
36- panic(fmt.Sprintf("presence reported dead status twice in a row for machine %v", m))
37- case <-time.After(timeout):
38- return fmt.Errorf("waiting for agent of machine %v: still not alive after timeout", m)
39- }
40- panic("unreachable")
41+ m.st.pwatcher.Watch(m.globalKey(), ch)
42+ defer m.st.pwatcher.Unwatch(m.globalKey(), ch)
43+ for i := 0; i < 2; i++ {
44+ select {
45+ case change := <-ch:
46+ if change.Alive {
47+ return nil
48+ }
49+ case <-time.After(timeout):
50+ return fmt.Errorf("still not alive after timeout")
51+ case <-m.st.pwatcher.Dying():
52+ return m.st.pwatcher.Err()
53+ }
54+ }
55+ panic(fmt.Sprintf("presence reported dead status twice in a row for machine %v", m))
56 }
57
58 // SetAgentAlive signals that the agent for machine m is alive.
59
60=== modified file 'mstate/machine_test.go'
61--- mstate/machine_test.go 2012-09-10 15:29:01 +0000
62+++ mstate/machine_test.go 2012-09-11 13:29:53 +0000
63@@ -23,7 +23,8 @@
64 }
65
66 func (s *MachineSuite) TestMachineSetAgentAlive(c *C) {
67- alive := s.machine.AgentAlive()
68+ alive, err := s.machine.AgentAlive()
69+ c.Assert(err, IsNil)
70 c.Assert(alive, Equals, false)
71
72 pinger, err := s.machine.SetAgentAlive()
73@@ -31,37 +32,39 @@
74 c.Assert(pinger, Not(IsNil))
75 defer pinger.Stop()
76
77- s.State.ForcePresenceRefresh()
78- alive = s.machine.AgentAlive()
79+ s.State.Sync()
80+ alive, err = s.machine.AgentAlive()
81+ c.Assert(err, IsNil)
82 c.Assert(alive, Equals, true)
83 }
84
85 func (s *MachineSuite) TestMachineWaitAgentAlive(c *C) {
86 // test -gocheck.f TestMachineWaitAgentAlive
87 timeout := 5 * time.Second
88- alive := s.machine.AgentAlive()
89+ alive, err := s.machine.AgentAlive()
90+ c.Assert(err, IsNil)
91 c.Assert(alive, Equals, false)
92
93- s.State.ForcePresenceRefresh()
94- err := s.machine.WaitAgentAlive(timeout)
95+ s.State.StartSync()
96+ err = s.machine.WaitAgentAlive(timeout)
97 c.Assert(err, ErrorMatches, `waiting for agent of machine 0: still not alive after timeout`)
98
99 pinger, err := s.machine.SetAgentAlive()
100 c.Assert(err, IsNil)
101
102- s.State.ForcePresenceRefresh()
103+ s.State.StartSync()
104 err = s.machine.WaitAgentAlive(timeout)
105 c.Assert(err, IsNil)
106
107- alive = s.machine.AgentAlive()
108+ alive, err = s.machine.AgentAlive()
109 c.Assert(err, IsNil)
110 c.Assert(alive, Equals, true)
111
112 err = pinger.Kill()
113 c.Assert(err, IsNil)
114
115- s.State.ForcePresenceRefresh()
116- alive = s.machine.AgentAlive()
117+ s.State.Sync()
118+ alive, err = s.machine.AgentAlive()
119 c.Assert(err, IsNil)
120 c.Assert(alive, Equals, false)
121 }
122
123=== modified file 'mstate/open.go'
124--- mstate/open.go 2012-09-11 08:42:28 +0000
125+++ mstate/open.go 2012-09-11 13:29:53 +0000
126@@ -50,7 +50,7 @@
127 st.runner = txn.NewRunner(db.C("txns"))
128 st.runner.ChangeLog(db.C("txns.log"))
129 st.watcher = watcher.New(db.C("txns.log"))
130- st.presencew = presence.NewWatcher(pdb.C("presence"))
131+ st.pwatcher = presence.NewWatcher(pdb.C("presence"))
132 for _, index := range indexes {
133 err = st.relations.EnsureIndex(index)
134 if err != nil {
135@@ -61,8 +61,8 @@
136 }
137
138 func (st *State) Close() error {
139- err1 := st.presencew.Stop()
140- err2 := st.watcher.Stop()
141+ err1 := st.watcher.Stop()
142+ err2 := st.pwatcher.Stop()
143 st.db.Session.Close()
144 for _, err := range []error{err1, err2} {
145 if err != nil {
146
147=== modified file 'mstate/presence/presence.go'
148--- mstate/presence/presence.go 2012-09-06 22:12:07 +0000
149+++ mstate/presence/presence.go 2012-09-11 13:29:53 +0000
150@@ -58,7 +58,7 @@
151 beingKey map[int64]string
152 beingSeq map[string]int64
153
154- // watches has the per-key observer channels from Add/Remove.
155+ // watches has the per-key observer channels from Watch/Unwatch.
156 watches map[string][]chan<- Change
157
158 // pending contains all the events to be dispatched to the watcher
159@@ -70,13 +70,12 @@
160 // the the gorotuine loop.
161 request chan interface{}
162
163- // refreshed contains pending ForceRefresh done channels
164- // that are waiting for the completion notice.
165- refreshed []chan bool
166+ // syncDone contains pending done channels from sync requests.
167+ syncDone []chan bool
168
169- // next will dispatch when it's time to refresh the database
170+ // next will dispatch when it's time to sync the database
171 // knowledge. It's maintained here so that ForceRefresh
172- // can manipulate it to force a refresh sooner.
173+ // can manipulate it to force a sync sooner.
174 next <-chan time.Time
175 }
176
177@@ -116,17 +115,31 @@
178 return w.tomb.Wait()
179 }
180
181-type reqAdd struct {
182- key string
183- ch chan<- Change
184-}
185-
186-type reqRemove struct {
187- key string
188- ch chan<- Change
189-}
190-
191-type reqRefresh struct {
192+// Dying returns a channel that is closed when the watcher is stopping
193+// due to an error or because Stop was called explicitly.
194+func (w *Watcher) Dying() <-chan struct{} {
195+ return w.tomb.Dying()
196+}
197+
198+// Err returns the error with which the watcher stopped.
199+// It returns nil if the watcher stopped cleanly, tomb.ErrStillAlive
200+// if the watcher is still running properly, or the respective error
201+// if the watcher is terminating or has terminated with an error.
202+func (w *Watcher) Err() error {
203+ return w.tomb.Err()
204+}
205+
206+type reqWatch struct {
207+ key string
208+ ch chan<- Change
209+}
210+
211+type reqUnwatch struct {
212+ key string
213+ ch chan<- Change
214+}
215+
216+type reqSync struct {
217 done chan bool
218 }
219
220@@ -135,55 +148,55 @@
221 result chan bool
222 }
223
224-// Add includes key into w for liveness monitoring. An event will
225+func (w *Watcher) sendReq(req interface{}) {
226+ select {
227+ case w.request <- req:
228+ case <-w.tomb.Dying():
229+ }
230+}
231+
232+// Watch starts watching the liveness of key. An event will
233 // be sent onto ch to report the initial status for the key, and
234 // from then on a new event will be sent whenever a change is
235 // detected. Change values sent to the channel must be consumed,
236 // or the whole watcher will blocked.
237-func (w *Watcher) Add(key string, ch chan<- Change) {
238- select {
239- case w.request <- reqAdd{key, ch}:
240- case <-w.tomb.Dying():
241- }
242-}
243-
244-// Remove removes key and ch from liveness monitoring.
245-func (w *Watcher) Remove(key string, ch chan<- Change) {
246- select {
247- case w.request <- reqRemove{key, ch}:
248- case <-w.tomb.Dying():
249- }
250-}
251-
252-// ForceRefresh forces a synchronous refresh of the watcher knowledge.
253-// It blocks until the database state has been loaded and the events
254-// have been prepared, but unblocks before changes are sent onto the
255-// registered channels.
256-func (w *Watcher) ForceRefresh() {
257+func (w *Watcher) Watch(key string, ch chan<- Change) {
258+ w.sendReq(reqWatch{key, ch})
259+}
260+
261+// Unwatch stops watching the liveness of key via ch.
262+func (w *Watcher) Unwatch(key string, ch chan<- Change) {
263+ w.sendReq(reqUnwatch{key, ch})
264+}
265+
266+// StartSync forces the watcher to load new events from the database.
267+func (w *Watcher) StartSync() {
268+ w.sendReq(reqSync{nil})
269+}
270+
271+// Sync forces the watcher to load new events from the database and blocks
272+// until all events have been dispatched.
273+func (w *Watcher) Sync() {
274 done := make(chan bool)
275- select {
276- case w.request <- reqRefresh{done}:
277- case <-w.tomb.Dying():
278- }
279+ w.sendReq(reqSync{done})
280 select {
281 case <-done:
282 case <-w.tomb.Dying():
283 }
284 }
285
286-// Alive returns whether the key is currently considered alive by w.
287-func (w *Watcher) Alive(key string) bool {
288+// Alive returns whether the key is currently considered alive by w,
289+// or an error in case the watcher is dying.
290+func (w *Watcher) Alive(key string) (bool, error) {
291 result := make(chan bool, 1)
292- select {
293- case w.request <- reqAlive{key, result}:
294- case <-w.tomb.Dying():
295- }
296+ w.sendReq(reqAlive{key, result})
297 var alive bool
298 select {
299 case alive = <-result:
300 case <-w.tomb.Dying():
301+ return false, fmt.Errorf("cannot check liveness: watcher is dying")
302 }
303- return alive
304+ return alive, nil
305 }
306
307 // period is the length of each time slot in seconds.
308@@ -205,18 +218,19 @@
309 return tomb.ErrDying
310 case <-w.next:
311 w.next = time.After(time.Duration(period) * time.Second)
312- refreshed := w.refreshed
313- w.refreshed = nil
314- if err := w.refresh(); err != nil {
315+ syncDone := w.syncDone
316+ w.syncDone = nil
317+ if err := w.sync(); err != nil {
318 return err
319 }
320- for _, done := range refreshed {
321+ w.flush()
322+ for _, done := range syncDone {
323 close(done)
324 }
325 case req := <-w.request:
326 w.handle(req)
327+ w.flush()
328 }
329- w.flush()
330 }
331 return nil
332 }
333@@ -224,20 +238,18 @@
334 // flush sends all pending events to their respective channels.
335 func (w *Watcher) flush() {
336 // w.pending may get new requests as we handle other requests.
337- i := 0
338- for i < len(w.pending) {
339+ for i := 0; i < len(w.pending); i++ {
340 e := &w.pending[i]
341- if e.ch == nil {
342- i++ // Removed meanwhile.
343- continue
344- }
345- select {
346- case <-w.tomb.Dying():
347- return
348- case req := <-w.request:
349- w.handle(req)
350- case e.ch <- Change{e.key, e.alive}:
351- i++
352+ for e.ch != nil {
353+ select {
354+ case <-w.tomb.Dying():
355+ return
356+ case req := <-w.request:
357+ w.handle(req)
358+ continue
359+ case e.ch <- Change{e.key, e.alive}:
360+ }
361+ break
362 }
363 }
364 w.pending = w.pending[:0]
365@@ -248,10 +260,12 @@
366 func (w *Watcher) handle(req interface{}) {
367 log.Debugf("presence: got request: %#v", req)
368 switch r := req.(type) {
369- case reqRefresh:
370+ case reqSync:
371 w.next = time.After(0)
372- w.refreshed = append(w.refreshed, r.done)
373- case reqAdd:
374+ if r.done != nil {
375+ w.syncDone = append(w.syncDone, r.done)
376+ }
377+ case reqWatch:
378 for _, ch := range w.watches[r.key] {
379 if ch == r.ch {
380 panic("adding channel twice for same key")
381@@ -260,7 +274,7 @@
382 w.watches[r.key] = append(w.watches[r.key], r.ch)
383 _, alive := w.beingSeq[r.key]
384 w.pending = append(w.pending, event{r.ch, r.key, alive})
385- case reqRemove:
386+ case reqUnwatch:
387 watches := w.watches[r.key]
388 for i, ch := range watches {
389 if ch == r.ch {
390@@ -294,11 +308,11 @@
391 Dead map[string]int64 ",omitempty"
392 }
393
394-// refresh updates the watcher knowledge from the database, and
395+// sync updates the watcher knowledge from the database, and
396 // queues events to observing channels. It fetches the last two time
397 // slots and compares the union of both to the in-memory state.
398-func (w *Watcher) refresh() error {
399- log.Debugf("presence: refreshing watcher knowledge from database...")
400+func (w *Watcher) sync() error {
401+ log.Debugf("presence: synchronizing watcher knowledge with database...")
402 slot := timeSlot(time.Now(), w.delta)
403 var ping []pingInfo
404 err := w.pings.Find(bson.D{{"$or", []pingInfo{{Slot: slot}, {Slot: slot - period}}}}).All(&ping)
405
406=== modified file 'mstate/presence/presence_test.go'
407--- mstate/presence/presence_test.go 2012-09-06 22:12:07 +0000
408+++ mstate/presence/presence_test.go 2012-09-11 13:29:53 +0000
409@@ -5,6 +5,7 @@
410 . "launchpad.net/gocheck"
411 "launchpad.net/juju-core/mstate/presence"
412 "launchpad.net/juju-core/testing"
413+ "launchpad.net/tomb"
414 "strconv"
415 stdtesting "testing"
416 "time"
417@@ -71,6 +72,40 @@
418 }
419 }
420
421+func assertAlive(c *C, w *presence.Watcher, key string, alive bool) {
422+ a, err := w.Alive(key)
423+ c.Assert(err, IsNil)
424+ c.Assert(a, Equals, alive)
425+}
426+
427+func (s *PresenceSuite) TestErrAndDying(c *C) {
428+ w := presence.NewWatcher(s.presence)
429+ defer w.Stop()
430+
431+ c.Assert(w.Err(), Equals, tomb.ErrStillAlive)
432+ select {
433+ case <-w.Dying():
434+ c.Fatalf("Dying channel fired unexpectedly")
435+ default:
436+ }
437+ c.Assert(w.Stop(), IsNil)
438+ c.Assert(w.Err(), IsNil)
439+ select {
440+ case <-w.Dying():
441+ default:
442+ c.Fatalf("Dying channel should have fired")
443+ }
444+}
445+
446+func (s *PresenceSuite) TestAliveError(c *C) {
447+ w := presence.NewWatcher(s.presence)
448+ c.Assert(w.Stop(), IsNil)
449+
450+ alive, err := w.Alive("a")
451+ c.Assert(err, ErrorMatches, ".*: watcher is dying")
452+ c.Assert(alive, Equals, false)
453+}
454+
455 func (s *PresenceSuite) TestWorkflow(c *C) {
456 w := presence.NewWatcher(s.presence)
457 pa := presence.NewPinger(s.presence, "a")
458@@ -79,61 +114,61 @@
459 defer pa.Stop()
460 defer pb.Stop()
461
462- c.Assert(w.Alive("a"), Equals, false)
463- c.Assert(w.Alive("b"), Equals, false)
464+ assertAlive(c, w, "a", false)
465+ assertAlive(c, w, "b", false)
466
467 // Buffer one entry to avoid blocking the watcher here.
468 cha := make(chan presence.Change, 1)
469 chb := make(chan presence.Change, 1)
470- w.Add("a", cha)
471- w.Add("b", chb)
472+ w.Watch("a", cha)
473+ w.Watch("b", chb)
474
475 // Initial events with current status.
476 assertChange(c, cha, presence.Change{"a", false})
477 assertChange(c, chb, presence.Change{"b", false})
478
479- w.ForceRefresh()
480+ w.StartSync()
481 assertNoChange(c, cha)
482 assertNoChange(c, chb)
483
484 c.Assert(pa.Start(), IsNil)
485
486- w.ForceRefresh()
487+ w.StartSync()
488 assertChange(c, cha, presence.Change{"a", true})
489 assertNoChange(c, cha)
490 assertNoChange(c, chb)
491
492- c.Assert(w.Alive("a"), Equals, true)
493- c.Assert(w.Alive("b"), Equals, false)
494+ assertAlive(c, w, "a", true)
495+ assertAlive(c, w, "b", false)
496
497 // Changes while the channel is out are not observed.
498- w.Remove("a", cha)
499+ w.Unwatch("a", cha)
500 assertNoChange(c, cha)
501 pa.Kill()
502- w.ForceRefresh()
503+ w.Sync()
504 pa = presence.NewPinger(s.presence, "a")
505 pa.Start()
506- w.ForceRefresh()
507+ w.StartSync()
508 assertNoChange(c, cha)
509
510 // We can still query it manually, though.
511- c.Assert(w.Alive("a"), Equals, true)
512- c.Assert(w.Alive("b"), Equals, false)
513+ assertAlive(c, w, "a", true)
514+ assertAlive(c, w, "b", false)
515
516 // Initial positive event. No refresh needed.
517- w.Add("a", cha)
518+ w.Watch("a", cha)
519 assertChange(c, cha, presence.Change{"a", true})
520
521 c.Assert(pb.Start(), IsNil)
522
523- w.ForceRefresh()
524+ w.StartSync()
525 assertChange(c, chb, presence.Change{"b", true})
526 assertNoChange(c, cha)
527 assertNoChange(c, chb)
528
529 c.Assert(pa.Stop(), IsNil)
530
531- w.ForceRefresh()
532+ w.StartSync()
533 assertNoChange(c, cha)
534 assertNoChange(c, chb)
535
536@@ -141,7 +176,7 @@
537 c.Assert(pa.Kill(), IsNil)
538 c.Assert(pb.Kill(), IsNil)
539
540- w.ForceRefresh()
541+ w.StartSync()
542 assertChange(c, cha, presence.Change{"a", false})
543 assertChange(c, chb, presence.Change{"b", false})
544
545@@ -172,11 +207,11 @@
546 c.Logf("Checking who's still alive...")
547 w := presence.NewWatcher(s.presence)
548 defer w.Stop()
549- w.ForceRefresh()
550+ w.Sync()
551 ch := make(chan presence.Change)
552 for i := 0; i < N; i++ {
553 k := strconv.Itoa(i)
554- w.Add(k, ch)
555+ w.Watch(k, ch)
556 if i%2 == 0 {
557 assertChange(c, ch, presence.Change{k, true})
558 } else {
559@@ -192,26 +227,26 @@
560 defer p.Stop()
561
562 ch := make(chan presence.Change)
563- w.Add("a", ch)
564+ w.Watch("a", ch)
565 assertChange(c, ch, presence.Change{"a", false})
566
567 c.Assert(p.Start(), IsNil)
568- w.ForceRefresh()
569+ w.StartSync()
570 assertChange(c, ch, presence.Change{"a", true})
571
572 // Still alive in previous slot.
573 presence.FakeTimeSlot(1)
574- w.ForceRefresh()
575+ w.StartSync()
576 assertNoChange(c, ch)
577
578 // Two last slots are empty.
579 presence.FakeTimeSlot(2)
580- w.ForceRefresh()
581+ w.StartSync()
582 assertChange(c, ch, presence.Change{"a", false})
583
584 // Already dead so killing isn't noticed.
585 p.Kill()
586- w.ForceRefresh()
587+ w.StartSync()
588 assertNoChange(c, ch)
589 }
590
591@@ -225,7 +260,7 @@
592 defer p.Stop()
593
594 ch := make(chan presence.Change)
595- w.Add("a", ch)
596+ w.Watch("a", ch)
597 assertChange(c, ch, presence.Change{"a", false})
598
599 // A single ping.
600@@ -237,18 +272,18 @@
601 assertChange(c, ch, presence.Change{"a", true})
602 }
603
604-func (s *PresenceSuite) TestAddRemoveOnQueue(c *C) {
605+func (s *PresenceSuite) TestWatchUnwatchOnQueue(c *C) {
606 w := presence.NewWatcher(s.presence)
607 ch := make(chan presence.Change)
608 for i := 0; i < 100; i++ {
609 key := strconv.Itoa(i)
610 c.Logf("Adding %q", key)
611- w.Add(key, ch)
612+ w.Watch(key, ch)
613 }
614 for i := 1; i < 100; i += 2 {
615 key := strconv.Itoa(i)
616 c.Logf("Removing %q", key)
617- w.Remove(key, ch)
618+ w.Unwatch(key, ch)
619 }
620 alive := make(map[string]bool)
621 for i := 0; i < 50; i++ {
622@@ -288,10 +323,10 @@
623 stop := false
624 for !stop {
625 w := presence.NewWatcher(s.presence)
626- w.ForceRefresh()
627- alive := w.Alive("a")
628+ w.Sync()
629+ alive, err := w.Alive("a")
630 c.Check(w.Stop(), IsNil)
631- if !c.Check(alive, Equals, true) {
632+ if !c.Check(err, IsNil) || !c.Check(alive, Equals, true) {
633 break
634 }
635 select {
636@@ -325,22 +360,87 @@
637 // Start p1 and let it go on.
638 c.Assert(p1.Start(), IsNil)
639
640- w.ForceRefresh()
641- c.Assert(w.Alive("a"), Equals, true)
642+ w.Sync()
643+ assertAlive(c, w, "a", true)
644
645 // Start and kill p2, which will temporarily
646 // invalidate p1 and set the key as dead.
647 c.Assert(p2.Start(), IsNil)
648 c.Assert(p2.Kill(), IsNil)
649
650- w.ForceRefresh()
651- c.Assert(w.Alive("a"), Equals, false)
652+ w.Sync()
653+ assertAlive(c, w, "a", false)
654
655 // Wait for two periods, and check again. Since
656 // p1 is still alive, p2's death will expire and
657 // the key will come back.
658 time.Sleep(period * 2 * time.Second)
659
660- w.ForceRefresh()
661- c.Assert(w.Alive("a"), Equals, true)
662+ w.Sync()
663+ assertAlive(c, w, "a", true)
664+}
665+
666+func (s *PresenceSuite) TestStartSync(c *C) {
667+ w := presence.NewWatcher(s.presence)
668+ p := presence.NewPinger(s.presence, "a")
669+ defer w.Stop()
670+ defer p.Stop()
671+
672+ ch := make(chan presence.Change)
673+ w.Watch("a", ch)
674+ assertChange(c, ch, presence.Change{"a", false})
675+
676+ c.Assert(p.Start(), IsNil)
677+
678+ done := make(chan bool)
679+ go func() {
680+ w.StartSync()
681+ w.StartSync()
682+ w.StartSync()
683+ done <- true
684+ }()
685+
686+ select {
687+ case <-done:
688+ case <-time.After(100 * time.Millisecond):
689+ c.Fatalf("StartSync failed to return")
690+ }
691+
692+ assertChange(c, ch, presence.Change{"a", true})
693+}
694+
695+func (s *PresenceSuite) TestSync(c *C) {
696+ w := presence.NewWatcher(s.presence)
697+ p := presence.NewPinger(s.presence, "a")
698+ defer w.Stop()
699+ defer p.Stop()
700+
701+ ch := make(chan presence.Change)
702+ w.Watch("a", ch)
703+ assertChange(c, ch, presence.Change{"a", false})
704+
705+ // Nothing to do here.
706+ w.Sync()
707+
708+ c.Assert(p.Start(), IsNil)
709+
710+ done := make(chan bool)
711+ go func() {
712+ w.Sync()
713+ done <- true
714+ }()
715+
716+ select {
717+ case <-done:
718+ c.Fatalf("Sync returned too early")
719+ case <-time.After(200 * time.Millisecond):
720+ }
721+
722+ assertChange(c, ch, presence.Change{"a", true})
723+
724+ select {
725+ case <-done:
726+ case <-time.After(100 * time.Millisecond):
727+ c.Fatalf("Sync failed to return")
728+ }
729 }
730
731=== modified file 'mstate/state.go'
732--- mstate/state.go 2012-09-10 15:29:01 +0000
733+++ mstate/state.go 2012-09-11 13:29:53 +0000
734@@ -28,17 +28,17 @@
735 // State represents the state of an environment
736 // managed by juju.
737 type State struct {
738- db *mgo.Database
739- charms *mgo.Collection
740- machines *mgo.Collection
741- relations *mgo.Collection
742- services *mgo.Collection
743- settings *mgo.Collection
744- units *mgo.Collection
745- presence *mgo.Collection
746- runner *txn.Runner
747- watcher *watcher.Watcher
748- presencew *presence.Watcher
749+ db *mgo.Database
750+ charms *mgo.Collection
751+ machines *mgo.Collection
752+ relations *mgo.Collection
753+ services *mgo.Collection
754+ settings *mgo.Collection
755+ units *mgo.Collection
756+ presence *mgo.Collection
757+ runner *txn.Runner
758+ watcher *watcher.Watcher
759+ pwatcher *presence.Watcher
760 }
761
762 func deadOnAbort(err error) error {
763@@ -365,16 +365,16 @@
764 return newUnit(s, &doc), nil
765 }
766
767-// ForcePresenceRefresh forces a synchronous refresh of
768-// the presence watcher knowledge
769-func (s *State) ForcePresenceRefresh() {
770- s.presencew.ForceRefresh()
771-}
772-
773 // StartSync forces watchers to resynchronize their state with the
774 // database immediately. This will happen periodically automatically.
775 func (s *State) StartSync() {
776- // TODO Make presence more like watcher, add it here, and
777- // remove ForcePresenceRefresh.
778 s.watcher.StartSync()
779+ s.pwatcher.StartSync()
780+}
781+
782+// Sync forces watchers to resynchronize their state with the
783+// database immediately, and waits until all events are known.
784+func (s *State) Sync() {
785+ s.watcher.Sync()
786+ s.pwatcher.Sync()
787 }
788
789=== modified file 'mstate/unit.go'
790--- mstate/unit.go 2012-09-06 16:54:22 +0000
791+++ mstate/unit.go 2012-09-11 13:29:53 +0000
792@@ -196,7 +196,10 @@
793 case UnitStopped:
794 return UnitStopped, "", nil
795 }
796- alive := u.AgentAlive()
797+ alive, err := u.AgentAlive()
798+ if err != nil {
799+ return "", "", err
800+ }
801 if !alive {
802 s = UnitDown
803 }
804@@ -222,35 +225,29 @@
805 }
806
807 // AgentAlive returns whether the respective remote agent is alive.
808-func (u *Unit) AgentAlive() bool {
809- return u.st.presencew.Alive(u.globalKey())
810+func (u *Unit) AgentAlive() (bool, error) {
811+ return u.st.pwatcher.Alive(u.globalKey())
812 }
813
814 // WaitAgentAlive blocks until the respective agent is alive.
815-func (u *Unit) WaitAgentAlive(timeout time.Duration) error {
816+func (u *Unit) WaitAgentAlive(timeout time.Duration) (err error) {
817+ defer trivial.ErrorContextf(&err, "waiting for agent of unit %q", u)
818 ch := make(chan presence.Change)
819- u.st.presencew.Add(u.globalKey(), ch)
820- defer u.st.presencew.Remove(u.globalKey(), ch)
821- // Initial check.
822- select {
823- case change := <-ch:
824- if change.Alive {
825- return nil
826- }
827- case <-time.After(timeout):
828- return fmt.Errorf("waiting for agent of unit %q: still not alive after timeout", u)
829- }
830- // Hasn't been alive, so now wait for change.
831- select {
832- case change := <-ch:
833- if change.Alive {
834- return nil
835- }
836- panic(fmt.Sprintf("presence reported dead status twice in a row for unit %q", u))
837- case <-time.After(timeout):
838- return fmt.Errorf("waiting for agent of unit %q: still not alive after timeout", u)
839- }
840- panic("unreachable")
841+ u.st.pwatcher.Watch(u.globalKey(), ch)
842+ defer u.st.pwatcher.Unwatch(u.globalKey(), ch)
843+ for i := 0; i < 2; i++ {
844+ select {
845+ case change := <-ch:
846+ if change.Alive {
847+ return nil
848+ }
849+ case <-time.After(timeout):
850+ return fmt.Errorf("still not alive after timeout")
851+ case <-u.st.pwatcher.Dying():
852+ return u.st.pwatcher.Err()
853+ }
854+ }
855+ panic(fmt.Sprintf("presence reported dead status twice in a row for unit %q", u))
856 }
857
858 // SetAgentAlive signals that the agent for unit u is alive.
859
860=== modified file 'mstate/unit_test.go'
861--- mstate/unit_test.go 2012-09-06 17:04:54 +0000
862+++ mstate/unit_test.go 2012-09-11 13:29:53 +0000
863@@ -90,7 +90,7 @@
864 c.Assert(p.Kill(), IsNil)
865 }()
866
867- s.State.ForcePresenceRefresh()
868+ s.State.StartSync()
869 status, info, err = s.unit.Status()
870 c.Assert(err, IsNil)
871 c.Assert(status, Equals, state.UnitStarted)
872@@ -105,7 +105,8 @@
873 }
874
875 func (s *UnitSuite) TestUnitSetAgentAlive(c *C) {
876- alive := s.unit.AgentAlive()
877+ alive, err := s.unit.AgentAlive()
878+ c.Assert(err, IsNil)
879 c.Assert(alive, Equals, false)
880
881 pinger, err := s.unit.SetAgentAlive()
882@@ -113,34 +114,38 @@
883 c.Assert(pinger, Not(IsNil))
884 defer pinger.Stop()
885
886- s.State.ForcePresenceRefresh()
887- alive = s.unit.AgentAlive()
888+ s.State.Sync()
889+ alive, err = s.unit.AgentAlive()
890+ c.Assert(err, IsNil)
891 c.Assert(alive, Equals, true)
892 }
893
894 func (s *UnitSuite) TestUnitWaitAgentAlive(c *C) {
895 timeout := 5 * time.Second
896- alive := s.unit.AgentAlive()
897+ alive, err := s.unit.AgentAlive()
898+ c.Assert(err, IsNil)
899 c.Assert(alive, Equals, false)
900
901- err := s.unit.WaitAgentAlive(timeout)
902+ err = s.unit.WaitAgentAlive(timeout)
903 c.Assert(err, ErrorMatches, `waiting for agent of unit "wordpress/0": still not alive after timeout`)
904
905 pinger, err := s.unit.SetAgentAlive()
906 c.Assert(err, IsNil)
907
908- s.State.ForcePresenceRefresh()
909+ s.State.StartSync()
910 err = s.unit.WaitAgentAlive(timeout)
911 c.Assert(err, IsNil)
912
913- alive = s.unit.AgentAlive()
914+ alive, err = s.unit.AgentAlive()
915+ c.Assert(err, IsNil)
916 c.Assert(alive, Equals, true)
917
918 err = pinger.Kill()
919 c.Assert(err, IsNil)
920
921- s.State.ForcePresenceRefresh()
922- alive = s.unit.AgentAlive()
923+ s.State.Sync()
924+ alive, err = s.unit.AgentAlive()
925+ c.Assert(err, IsNil)
926 c.Assert(alive, Equals, false)
927 }
928
929
930=== modified file 'mstate/watcher/watcher_test.go'
931--- mstate/watcher/watcher_test.go 2012-09-10 16:19:57 +0000
932+++ mstate/watcher/watcher_test.go 2012-09-11 13:29:53 +0000
933@@ -440,9 +440,6 @@
934 func (s *WatcherSuite) TestStartSync(c *C) {
935 s.w.Watch("test", "a", -1, s.ch)
936
937- // Nothing to do here.
938- s.w.StartSync()
939-
940 revno := s.insert(c, "test", "a")
941
942 done := make(chan bool)
943@@ -456,7 +453,7 @@
944 select {
945 case <-done:
946 case <-time.After(100 * time.Millisecond):
947- c.Fatalf("SyncStart failed to return")
948+ c.Fatalf("StartSync failed to return")
949 }
950
951 assertChange(c, s.ch, watcher.Change{"test", "a", revno})
952@@ -479,7 +476,7 @@
953 select {
954 case <-done:
955 c.Fatalf("Sync returned too early")
956- case <-time.After(500 * time.Millisecond):
957+ case <-time.After(200 * time.Millisecond):
958 }
959
960 assertChange(c, s.ch, watcher.Change{"test", "a", revno})
