Merge lp:~niemeyer/juju-core/presence-polishing into lp:~juju/juju-core/trunk

Proposed by Gustavo Niemeyer
Status: Merged
Merged at revision: 483
Proposed branch: lp:~niemeyer/juju-core/presence-polishing
Merge into: lp:~juju/juju-core/trunk
Prerequisite: lp:~niemeyer/juju-core/mstate-machine-watcher
Diff against target: 960 lines (+319/-209)
9 files modified
mstate/machine.go (+19/-25)
mstate/machine_test.go (+13/-10)
mstate/open.go (+3/-3)
mstate/presence/presence.go (+88/-74)
mstate/presence/presence_test.go (+137/-37)
mstate/state.go (+19/-19)
mstate/unit.go (+23/-26)
mstate/unit_test.go (+15/-10)
mstate/watcher/watcher_test.go (+2/-5)
To merge this branch: bzr merge lp:~niemeyer/juju-core/presence-polishing
Reviewer: The Go Language Gophers
Review status: Pending
Review via email: mp+123627@code.launchpad.net

Description of the change

mstate/presence: bring it in line with mstate/watcher

This mirrors into mstate/presence the improvements made into
the mstate/watcher package, including several points made in
reviews.

It also addresses a bug in Alive (it could return false
improperly without errors).
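
As an illustration of the Alive fix described above, here is a minimal sketch. The names miniWatcher, newMiniWatcher and errDying are hypothetical stand-ins and are not part of mstate/presence; the sketch only assumes that a query which cannot be answered because the watcher is stopping should surface an error instead of the zero value false.

```go
package main

import (
	"errors"
	"fmt"
)

// reqAlive carries a liveness query and the channel for its answer.
type reqAlive struct {
	key    string
	result chan bool
}

// miniWatcher is a hypothetical stand-in for the presence watcher.
type miniWatcher struct {
	request chan reqAlive
	dying   chan struct{}
	alive   map[string]bool
}

func newMiniWatcher(alive map[string]bool) *miniWatcher {
	w := &miniWatcher{
		request: make(chan reqAlive),
		dying:   make(chan struct{}),
		alive:   alive,
	}
	go w.loop()
	return w
}

// loop answers queries until the watcher is stopped.
func (w *miniWatcher) loop() {
	for {
		select {
		case <-w.dying:
			return
		case req := <-w.request:
			req.result <- w.alive[req.key]
		}
	}
}

func (w *miniWatcher) stop() { close(w.dying) }

var errDying = errors.New("cannot check liveness: watcher is dying")

// Alive reports liveness, or an error when the watcher cannot answer.
// Returning a bare false in the dying case would be indistinguishable
// from a genuine "not alive" answer.
func (w *miniWatcher) Alive(key string) (bool, error) {
	result := make(chan bool, 1)
	select {
	case w.request <- reqAlive{key, result}:
	case <-w.dying:
		return false, errDying
	}
	select {
	case alive := <-result:
		return alive, nil
	case <-w.dying:
		return false, errDying
	}
}

func main() {
	w := newMiniWatcher(map[string]bool{"a": true})
	fmt.Println(w.Alive("a"))
	w.stop()
	fmt.Println(w.Alive("a"))
}
```

Callers can then tell "the key is dead" apart from "the watcher could not say", which is the distinction the description refers to.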

https://codereview.appspot.com/6501114/

Gustavo Niemeyer (niemeyer) wrote :

Reviewers: mp+123627_code.launchpad.net,

Message:
Please take a look.

Description:
mstate/presence: bring it in line with mstate/watcher

This mirrors into mstate/presence the improvements made into
the mstate/watcher package, including several points made in
reviews.

It also addresses a bug in Alive (it could return false
improperly without errors).

https://code.launchpad.net/~niemeyer/juju-core/presence-polishing/+merge/123627

Requires:
https://code.launchpad.net/~niemeyer/juju-core/mstate-machine-watcher/+merge/123614

(do not edit description out of merge proposal)

Please review this at https://codereview.appspot.com/6501114/

Affected files:
   A [revision details]
   M mstate/machine.go
   M mstate/machine_test.go
   M mstate/open.go
   M mstate/presence/presence.go
   M mstate/presence/presence_test.go
   M mstate/state.go
   M mstate/unit.go
   M mstate/unit_test.go
   M mstate/watcher/watcher_test.go

William Reade (fwereade) wrote :

LGTM, only very vague suggestions :).

https://codereview.appspot.com/6501114/diff/1/mstate/machine.go
File mstate/machine.go (left):

https://codereview.appspot.com/6501114/diff/1/mstate/machine.go#oldcode88
mstate/machine.go:88: func (m *Machine) WaitAgentAlive(timeout
time.Duration) error {
Kinda irrelevant, but I still don't know what use case these
WaitAgentAlive methods fulfil.

https://codereview.appspot.com/6501114/diff/1/mstate/machine.go
File mstate/machine.go (right):

https://codereview.appspot.com/6501114/diff/1/mstate/machine.go#newcode114
mstate/machine.go:114: return fmt.Errorf("waiting for agent of machine
%v: watcher is dying", m)
ErrorContextf?
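
For readers unfamiliar with the pattern being suggested, the following is a rough sketch of a deferred error-context helper combined with a named return value. errorContextf here is a local, hypothetical stand-in written for this example; it is assumed to behave like the trivial.ErrorContextf call that appears in the diff, whose implementation is not shown on this page.

```go
package main

import (
	"errors"
	"fmt"
)

// errorContextf is a hypothetical stand-in: if *err is non-nil, it
// prefixes the error with a formatted context message.
func errorContextf(err *error, format string, args ...interface{}) {
	if *err != nil {
		*err = fmt.Errorf("%s: %v", fmt.Sprintf(format, args...), *err)
	}
}

// waitAlive returns short errors; the deferred helper adds the context
// once, so each return site does not need to repeat it.
func waitAlive(machine int, timedOut bool) (err error) {
	defer errorContextf(&err, "waiting for agent of machine %v", machine)
	if timedOut {
		return errors.New("still not alive after timeout")
	}
	return nil
}

func main() {
	fmt.Println(waitAlive(0, true))
	fmt.Println(waitAlive(0, false))
}
```

The named result is what makes this work: the deferred call receives a pointer to the value that is actually returned.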

https://codereview.appspot.com/6501114/diff/1/mstate/presence/presence.go
File mstate/presence/presence.go (right):

https://codereview.appspot.com/6501114/diff/1/mstate/presence/presence.go#newcode240
mstate/presence/presence.go:240: // w.pending may get new requests as we
handle other requests.
I'm not sure I quite follow how this works. It seems that it's
specifically *only* while handling requests that pending can change --
and that therefore there's no opportunity for it to change in between a
failed loop test and the closing truncate -- but I had to think about it
for a little bit longer than I would prefer, and it has a little hint of
looking-wrong to it. Would it make sense to expand the comment a little?
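
The behaviour being asked about can be sketched in isolation. This is a simplified model, not the presence code: queue, event and the deliver callback are hypothetical, and it assumes that pending is only ever appended to by the goroutine that runs flush.

```go
package main

import "fmt"

// event is a hypothetical stand-in for a queued presence notification.
type event struct {
	key   string
	alive bool
}

// queue is owned by a single goroutine, so nothing else can append to
// pending between the loop's final length check and the truncation.
type queue struct {
	pending []event
	sent    []string
}

// flush delivers every pending event. deliver may append further events
// (as handling a request would), and those are delivered in the same
// pass because len(q.pending) is re-evaluated on every iteration.
func (q *queue) flush(deliver func(e event)) {
	for i := 0; i < len(q.pending); i++ {
		e := q.pending[i] // copy: append may reallocate the backing array
		deliver(e)
		q.sent = append(q.sent, e.key)
	}
	q.pending = q.pending[:0]
}

func main() {
	q := &queue{pending: []event{{"a", true}}}
	q.flush(func(e event) {
		if e.key == "a" {
			// Simulates a request arriving while "a" is being delivered.
			q.pending = append(q.pending, event{"b", false})
		}
	})
	fmt.Println(q.sent, len(q.pending))
}
```

Under that single-owner assumption, the truncation after the loop cannot drop an undelivered event, because any append happens inside an iteration and is seen by the next length check.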

https://codereview.appspot.com/6501114/diff/1/mstate/unit.go
File mstate/unit.go (right):

https://codereview.appspot.com/6501114/diff/1/mstate/unit.go#newcode257
mstate/unit.go:257: return fmt.Errorf("waiting for agent of unit %q:
watcher is dying", u)
Similar comments to Machine.WaitAgentAlive

https://codereview.appspot.com/6501114/

Gustavo Niemeyer (niemeyer) wrote :

Please take a look.

https://codereview.appspot.com/6501114/diff/1/mstate/machine.go
File mstate/machine.go (left):

https://codereview.appspot.com/6501114/diff/1/mstate/machine.go#oldcode88
mstate/machine.go:88: func (m *Machine) WaitAgentAlive(timeout
time.Duration) error {
On 2012/09/11 12:09:31, fwereade wrote:
> Kinda irrelevant, but I still don't know what use case these WaitAgentAlive
> methods fulfil.

LOL.. that crossed my mind. It feels like something clever we thought of
but never used.

https://codereview.appspot.com/6501114/diff/1/mstate/machine.go
File mstate/machine.go (right):

https://codereview.appspot.com/6501114/diff/1/mstate/machine.go#newcode114
mstate/machine.go:114: return fmt.Errorf("waiting for agent of machine
%v: watcher is dying", m)
On 2012/09/11 12:09:31, fwereade wrote:
> ErrorContextf?

Good catch, done.

I've also refactored those methods a bit to reduce them.

https://codereview.appspot.com/6501114/diff/1/mstate/presence/presence.go
File mstate/presence/presence.go (right):

https://codereview.appspot.com/6501114/diff/1/mstate/presence/presence.go#newcode240
mstate/presence/presence.go:240: // w.pending may get new requests as we
handle other requests.
On 2012/09/11 12:09:31, fwereade wrote:
> I'm not sure I quite follow how this works. It seems that it's specifically
> *only* while handling requests that pending can change --

Events dispatched from the syncing procedure go into pending too. I'm
happy to expand the comment, but do you have any suggestions that would
make it clarify the misunderstanding you have/had?

https://codereview.appspot.com/6501114/

William Reade (fwereade) wrote :

LGTM

https://codereview.appspot.com/6501114/diff/1/mstate/machine.go
File mstate/machine.go (right):

https://codereview.appspot.com/6501114/diff/1/mstate/machine.go#newcode114
mstate/machine.go:114: return fmt.Errorf("waiting for agent of machine
%v: watcher is dying", m)
On 2012/09/11 13:27:15, niemeyer wrote:
> On 2012/09/11 12:09:31, fwereade wrote:
> > ErrorContextf?

> Good catch, done.

> I've also refactored those methods a bit to reduce them.

Very neat :)

https://codereview.appspot.com/6501114/diff/1/mstate/presence/presence.go
File mstate/presence/presence.go (right):

https://codereview.appspot.com/6501114/diff/1/mstate/presence/presence.go#newcode240
mstate/presence/presence.go:240: // w.pending may get new requests as we
handle other requests.
On 2012/09/11 13:27:15, niemeyer wrote:
> On 2012/09/11 12:09:31, fwereade wrote:
> > I'm not sure I quite follow how this works. It seems that it's specifically
> > *only* while handling requests that pending can change --

> Events dispatched from the syncing procedure go into pending too. I'm happy to
> expand the comment, but do you have any suggestions that would make it clarify
> the misunderstanding you have/had?

Everything I can think of is a more cumbersome rewording of what you
already have -- best just forget I said anything :)

https://codereview.appspot.com/6501114/

Gustavo Niemeyer (niemeyer) wrote :

*** Submitted:

mstate/presence: bring it in line with mstate/watcher

This mirrors into mstate/presence the improvements made into
the mstate/watcher package, including several points made in
reviews.

It also addresses a bug in Alive (it could return false
improperly without errors).

R=fwereade
CC=
https://codereview.appspot.com/6501114

https://codereview.appspot.com/6501114/

Preview Diff

=== modified file 'mstate/machine.go'
--- mstate/machine.go 2012-09-10 15:29:01 +0000
+++ mstate/machine.go 2012-09-11 13:29:53 +0000
@@ -80,35 +80,29 @@
 }
 
 // AgentAlive returns whether the respective remote agent is alive.
-func (m *Machine) AgentAlive() bool {
-	return m.st.presencew.Alive(m.globalKey())
+func (m *Machine) AgentAlive() (bool, error) {
+	return m.st.pwatcher.Alive(m.globalKey())
 }
 
 // WaitAgentAlive blocks until the respective agent is alive.
-func (m *Machine) WaitAgentAlive(timeout time.Duration) error {
+func (m *Machine) WaitAgentAlive(timeout time.Duration) (err error) {
+	defer trivial.ErrorContextf(&err, "waiting for agent of machine %v", m)
 	ch := make(chan presence.Change)
-	m.st.presencew.Add(m.globalKey(), ch)
-	defer m.st.presencew.Remove(m.globalKey(), ch)
-	// Initial check.
-	select {
-	case change := <-ch:
-		if change.Alive {
-			return nil
-		}
-	case <-time.After(timeout):
-		return fmt.Errorf("waiting for agent of machine %v: still not alive after timeout", m)
-	}
-	// Hasn't been alive, so now wait for change.
-	select {
-	case change := <-ch:
-		if change.Alive {
-			return nil
-		}
-		panic(fmt.Sprintf("presence reported dead status twice in a row for machine %v", m))
-	case <-time.After(timeout):
-		return fmt.Errorf("waiting for agent of machine %v: still not alive after timeout", m)
-	}
-	panic("unreachable")
+	m.st.pwatcher.Watch(m.globalKey(), ch)
+	defer m.st.pwatcher.Unwatch(m.globalKey(), ch)
+	for i := 0; i < 2; i++ {
+		select {
+		case change := <-ch:
+			if change.Alive {
+				return nil
+			}
+		case <-time.After(timeout):
+			return fmt.Errorf("still not alive after timeout")
+		case <-m.st.pwatcher.Dying():
+			return m.st.pwatcher.Err()
+		}
+	}
+	panic(fmt.Sprintf("presence reported dead status twice in a row for machine %v", m))
 }
 
 // SetAgentAlive signals that the agent for machine m is alive.
=== modified file 'mstate/machine_test.go'
--- mstate/machine_test.go 2012-09-10 15:29:01 +0000
+++ mstate/machine_test.go 2012-09-11 13:29:53 +0000
@@ -23,7 +23,8 @@
 }
 
 func (s *MachineSuite) TestMachineSetAgentAlive(c *C) {
-	alive := s.machine.AgentAlive()
+	alive, err := s.machine.AgentAlive()
+	c.Assert(err, IsNil)
 	c.Assert(alive, Equals, false)
 
 	pinger, err := s.machine.SetAgentAlive()
@@ -31,37 +32,39 @@
 	c.Assert(pinger, Not(IsNil))
 	defer pinger.Stop()
 
-	s.State.ForcePresenceRefresh()
-	alive = s.machine.AgentAlive()
+	s.State.Sync()
+	alive, err = s.machine.AgentAlive()
+	c.Assert(err, IsNil)
 	c.Assert(alive, Equals, true)
 }
 
 func (s *MachineSuite) TestMachineWaitAgentAlive(c *C) {
 	// test -gocheck.f TestMachineWaitAgentAlive
 	timeout := 5 * time.Second
-	alive := s.machine.AgentAlive()
+	alive, err := s.machine.AgentAlive()
+	c.Assert(err, IsNil)
 	c.Assert(alive, Equals, false)
 
-	s.State.ForcePresenceRefresh()
-	err := s.machine.WaitAgentAlive(timeout)
+	s.State.StartSync()
+	err = s.machine.WaitAgentAlive(timeout)
 	c.Assert(err, ErrorMatches, `waiting for agent of machine 0: still not alive after timeout`)
 
 	pinger, err := s.machine.SetAgentAlive()
 	c.Assert(err, IsNil)
 
-	s.State.ForcePresenceRefresh()
+	s.State.StartSync()
 	err = s.machine.WaitAgentAlive(timeout)
 	c.Assert(err, IsNil)
 
-	alive = s.machine.AgentAlive()
+	alive, err = s.machine.AgentAlive()
 	c.Assert(err, IsNil)
 	c.Assert(alive, Equals, true)
 
 	err = pinger.Kill()
 	c.Assert(err, IsNil)
 
-	s.State.ForcePresenceRefresh()
-	alive = s.machine.AgentAlive()
+	s.State.Sync()
+	alive, err = s.machine.AgentAlive()
 	c.Assert(err, IsNil)
 	c.Assert(alive, Equals, false)
 }
=== modified file 'mstate/open.go'
--- mstate/open.go 2012-09-11 08:42:28 +0000
+++ mstate/open.go 2012-09-11 13:29:53 +0000
@@ -50,7 +50,7 @@
 	st.runner = txn.NewRunner(db.C("txns"))
 	st.runner.ChangeLog(db.C("txns.log"))
 	st.watcher = watcher.New(db.C("txns.log"))
-	st.presencew = presence.NewWatcher(pdb.C("presence"))
+	st.pwatcher = presence.NewWatcher(pdb.C("presence"))
 	for _, index := range indexes {
 		err = st.relations.EnsureIndex(index)
 		if err != nil {
@@ -61,8 +61,8 @@
 }
 
 func (st *State) Close() error {
-	err1 := st.presencew.Stop()
-	err2 := st.watcher.Stop()
+	err1 := st.watcher.Stop()
+	err2 := st.pwatcher.Stop()
 	st.db.Session.Close()
 	for _, err := range []error{err1, err2} {
 		if err != nil {
=== modified file 'mstate/presence/presence.go'
--- mstate/presence/presence.go 2012-09-06 22:12:07 +0000
+++ mstate/presence/presence.go 2012-09-11 13:29:53 +0000
@@ -58,7 +58,7 @@
 	beingKey map[int64]string
 	beingSeq map[string]int64
 
-	// watches has the per-key observer channels from Add/Remove.
+	// watches has the per-key observer channels from Watch/Unwatch.
 	watches map[string][]chan<- Change
 
 	// pending contains all the events to be dispatched to the watcher
@@ -70,13 +70,12 @@
 	// the the gorotuine loop.
 	request chan interface{}
 
-	// refreshed contains pending ForceRefresh done channels
-	// that are waiting for the completion notice.
-	refreshed []chan bool
+	// syncDone contains pending done channels from sync requests.
+	syncDone []chan bool
 
-	// next will dispatch when it's time to refresh the database
+	// next will dispatch when it's time to sync the database
 	// knowledge. It's maintained here so that ForceRefresh
-	// can manipulate it to force a refresh sooner.
+	// can manipulate it to force a sync sooner.
 	next <-chan time.Time
 }
 
@@ -116,17 +115,31 @@
 	return w.tomb.Wait()
 }
 
-type reqAdd struct {
-	key string
-	ch  chan<- Change
-}
-
-type reqRemove struct {
-	key string
-	ch  chan<- Change
-}
-
-type reqRefresh struct {
+// Dying returns a channel that is closed when the watcher is stopping
+// due to an error or because Stop was called explicitly.
+func (w *Watcher) Dying() <-chan struct{} {
+	return w.tomb.Dying()
+}
+
+// Err returns the error with which the watcher stopped.
+// It returns nil if the watcher stopped cleanly, tomb.ErrStillAlive
+// if the watcher is still running properly, or the respective error
+// if the watcher is terminating or has terminated with an error.
+func (w *Watcher) Err() error {
+	return w.tomb.Err()
+}
+
+type reqWatch struct {
+	key string
+	ch  chan<- Change
+}
+
+type reqUnwatch struct {
+	key string
+	ch  chan<- Change
+}
+
+type reqSync struct {
 	done chan bool
 }
 
@@ -135,55 +148,55 @@
 	result chan bool
 }
 
-// Add includes key into w for liveness monitoring. An event will
+func (w *Watcher) sendReq(req interface{}) {
+	select {
+	case w.request <- req:
+	case <-w.tomb.Dying():
+	}
+}
+
+// Watch starts watching the liveness of key. An event will
 // be sent onto ch to report the initial status for the key, and
 // from then on a new event will be sent whenever a change is
 // detected. Change values sent to the channel must be consumed,
 // or the whole watcher will blocked.
-func (w *Watcher) Add(key string, ch chan<- Change) {
-	select {
-	case w.request <- reqAdd{key, ch}:
-	case <-w.tomb.Dying():
-	}
-}
-
-// Remove removes key and ch from liveness monitoring.
-func (w *Watcher) Remove(key string, ch chan<- Change) {
-	select {
-	case w.request <- reqRemove{key, ch}:
-	case <-w.tomb.Dying():
-	}
-}
-
-// ForceRefresh forces a synchronous refresh of the watcher knowledge.
-// It blocks until the database state has been loaded and the events
-// have been prepared, but unblocks before changes are sent onto the
-// registered channels.
-func (w *Watcher) ForceRefresh() {
+func (w *Watcher) Watch(key string, ch chan<- Change) {
+	w.sendReq(reqWatch{key, ch})
+}
+
+// Unwatch stops watching the liveness of key via ch.
+func (w *Watcher) Unwatch(key string, ch chan<- Change) {
+	w.sendReq(reqUnwatch{key, ch})
+}
+
+// StartSync forces the watcher to load new events from the database.
+func (w *Watcher) StartSync() {
+	w.sendReq(reqSync{nil})
+}
+
+// Sync forces the watcher to load new events from the database and blocks
+// until all events have been dispatched.
+func (w *Watcher) Sync() {
 	done := make(chan bool)
-	select {
-	case w.request <- reqRefresh{done}:
-	case <-w.tomb.Dying():
-	}
+	w.sendReq(reqSync{done})
 	select {
 	case <-done:
 	case <-w.tomb.Dying():
 	}
 }
 
-// Alive returns whether the key is currently considered alive by w.
-func (w *Watcher) Alive(key string) bool {
+// Alive returns whether the key is currently considered alive by w,
+// or an error in case the watcher is dying.
+func (w *Watcher) Alive(key string) (bool, error) {
 	result := make(chan bool, 1)
-	select {
-	case w.request <- reqAlive{key, result}:
-	case <-w.tomb.Dying():
-	}
+	w.sendReq(reqAlive{key, result})
 	var alive bool
 	select {
 	case alive = <-result:
 	case <-w.tomb.Dying():
+		return false, fmt.Errorf("cannot check liveness: watcher is dying")
 	}
-	return alive
+	return alive, nil
 }
 
 // period is the length of each time slot in seconds.
@@ -205,18 +218,19 @@
 			return tomb.ErrDying
 		case <-w.next:
 			w.next = time.After(time.Duration(period) * time.Second)
-			refreshed := w.refreshed
-			w.refreshed = nil
-			if err := w.refresh(); err != nil {
+			syncDone := w.syncDone
+			w.syncDone = nil
+			if err := w.sync(); err != nil {
 				return err
 			}
-			for _, done := range refreshed {
+			w.flush()
+			for _, done := range syncDone {
 				close(done)
 			}
 		case req := <-w.request:
 			w.handle(req)
+			w.flush()
 		}
-		w.flush()
 	}
 	return nil
 }
@@ -224,20 +238,18 @@
 // flush sends all pending events to their respective channels.
 func (w *Watcher) flush() {
 	// w.pending may get new requests as we handle other requests.
-	i := 0
-	for i < len(w.pending) {
+	for i := 0; i < len(w.pending); i++ {
 		e := &w.pending[i]
-		if e.ch == nil {
-			i++ // Removed meanwhile.
-			continue
-		}
-		select {
-		case <-w.tomb.Dying():
-			return
-		case req := <-w.request:
-			w.handle(req)
-		case e.ch <- Change{e.key, e.alive}:
-			i++
+		for e.ch != nil {
+			select {
+			case <-w.tomb.Dying():
+				return
+			case req := <-w.request:
+				w.handle(req)
+				continue
+			case e.ch <- Change{e.key, e.alive}:
+			}
+			break
 		}
 	}
 	w.pending = w.pending[:0]
@@ -248,10 +260,12 @@
 func (w *Watcher) handle(req interface{}) {
 	log.Debugf("presence: got request: %#v", req)
 	switch r := req.(type) {
-	case reqRefresh:
+	case reqSync:
 		w.next = time.After(0)
-		w.refreshed = append(w.refreshed, r.done)
-	case reqAdd:
+		if r.done != nil {
+			w.syncDone = append(w.syncDone, r.done)
+		}
+	case reqWatch:
 		for _, ch := range w.watches[r.key] {
 			if ch == r.ch {
 				panic("adding channel twice for same key")
@@ -260,7 +274,7 @@
 		w.watches[r.key] = append(w.watches[r.key], r.ch)
 		_, alive := w.beingSeq[r.key]
 		w.pending = append(w.pending, event{r.ch, r.key, alive})
-	case reqRemove:
+	case reqUnwatch:
 		watches := w.watches[r.key]
 		for i, ch := range watches {
 			if ch == r.ch {
@@ -294,11 +308,11 @@
 	Dead map[string]int64 ",omitempty"
 }
 
-// refresh updates the watcher knowledge from the database, and
+// sync updates the watcher knowledge from the database, and
 // queues events to observing channels. It fetches the last two time
 // slots and compares the union of both to the in-memory state.
-func (w *Watcher) refresh() error {
-	log.Debugf("presence: refreshing watcher knowledge from database...")
+func (w *Watcher) sync() error {
+	log.Debugf("presence: synchronizing watcher knowledge with database...")
 	slot := timeSlot(time.Now(), w.delta)
 	var ping []pingInfo
 	err := w.pings.Find(bson.D{{"$or", []pingInfo{{Slot: slot}, {Slot: slot - period}}}}).All(&ping)
=== modified file 'mstate/presence/presence_test.go'
--- mstate/presence/presence_test.go 2012-09-06 22:12:07 +0000
+++ mstate/presence/presence_test.go 2012-09-11 13:29:53 +0000
@@ -5,6 +5,7 @@
 	. "launchpad.net/gocheck"
 	"launchpad.net/juju-core/mstate/presence"
 	"launchpad.net/juju-core/testing"
+	"launchpad.net/tomb"
 	"strconv"
 	stdtesting "testing"
 	"time"
@@ -71,6 +72,40 @@
 	}
 }
 
+func assertAlive(c *C, w *presence.Watcher, key string, alive bool) {
+	alive, err := w.Alive("a")
+	c.Assert(err, IsNil)
+	c.Assert(alive, Equals, alive)
+}
+
+func (s *PresenceSuite) TestErrAndDying(c *C) {
+	w := presence.NewWatcher(s.presence)
+	defer w.Stop()
+
+	c.Assert(w.Err(), Equals, tomb.ErrStillAlive)
+	select {
+	case <-w.Dying():
+		c.Fatalf("Dying channel fired unexpectedly")
+	default:
+	}
+	c.Assert(w.Stop(), IsNil)
+	c.Assert(w.Err(), IsNil)
+	select {
+	case <-w.Dying():
+	default:
+		c.Fatalf("Dying channel should have fired")
+	}
+}
+
+func (s *PresenceSuite) TestAliveError(c *C) {
+	w := presence.NewWatcher(s.presence)
+	c.Assert(w.Stop(), IsNil)
+
+	alive, err := w.Alive("a")
+	c.Assert(err, ErrorMatches, ".*: watcher is dying")
+	c.Assert(alive, Equals, false)
+}
+
 func (s *PresenceSuite) TestWorkflow(c *C) {
 	w := presence.NewWatcher(s.presence)
 	pa := presence.NewPinger(s.presence, "a")
@@ -79,61 +114,61 @@
 	defer pa.Stop()
 	defer pb.Stop()
 
-	c.Assert(w.Alive("a"), Equals, false)
-	c.Assert(w.Alive("b"), Equals, false)
+	assertAlive(c, w, "a", false)
+	assertAlive(c, w, "b", false)
 
 	// Buffer one entry to avoid blocking the watcher here.
 	cha := make(chan presence.Change, 1)
 	chb := make(chan presence.Change, 1)
-	w.Add("a", cha)
-	w.Add("b", chb)
+	w.Watch("a", cha)
+	w.Watch("b", chb)
 
 	// Initial events with current status.
 	assertChange(c, cha, presence.Change{"a", false})
 	assertChange(c, chb, presence.Change{"b", false})
 
-	w.ForceRefresh()
+	w.StartSync()
 	assertNoChange(c, cha)
 	assertNoChange(c, chb)
 
 	c.Assert(pa.Start(), IsNil)
 
-	w.ForceRefresh()
+	w.StartSync()
 	assertChange(c, cha, presence.Change{"a", true})
 	assertNoChange(c, cha)
 	assertNoChange(c, chb)
 
-	c.Assert(w.Alive("a"), Equals, true)
-	c.Assert(w.Alive("b"), Equals, false)
+	assertAlive(c, w, "a", true)
+	assertAlive(c, w, "b", false)
 
 	// Changes while the channel is out are not observed.
-	w.Remove("a", cha)
+	w.Unwatch("a", cha)
 	assertNoChange(c, cha)
 	pa.Kill()
-	w.ForceRefresh()
+	w.Sync()
 	pa = presence.NewPinger(s.presence, "a")
 	pa.Start()
-	w.ForceRefresh()
+	w.StartSync()
 	assertNoChange(c, cha)
 
 	// We can still query it manually, though.
-	c.Assert(w.Alive("a"), Equals, true)
-	c.Assert(w.Alive("b"), Equals, false)
+	assertAlive(c, w, "a", true)
+	assertAlive(c, w, "b", false)
 
 	// Initial positive event. No refresh needed.
-	w.Add("a", cha)
+	w.Watch("a", cha)
 	assertChange(c, cha, presence.Change{"a", true})
 
 	c.Assert(pb.Start(), IsNil)
 
-	w.ForceRefresh()
+	w.StartSync()
 	assertChange(c, chb, presence.Change{"b", true})
 	assertNoChange(c, cha)
 	assertNoChange(c, chb)
 
 	c.Assert(pa.Stop(), IsNil)
 
-	w.ForceRefresh()
+	w.StartSync()
 	assertNoChange(c, cha)
 	assertNoChange(c, chb)
 
@@ -141,7 +176,7 @@
 	c.Assert(pa.Kill(), IsNil)
 	c.Assert(pb.Kill(), IsNil)
 
-	w.ForceRefresh()
+	w.StartSync()
 	assertChange(c, cha, presence.Change{"a", false})
 	assertChange(c, chb, presence.Change{"b", false})
 
@@ -172,11 +207,11 @@
 	c.Logf("Checking who's still alive...")
 	w := presence.NewWatcher(s.presence)
 	defer w.Stop()
-	w.ForceRefresh()
+	w.Sync()
 	ch := make(chan presence.Change)
 	for i := 0; i < N; i++ {
 		k := strconv.Itoa(i)
-		w.Add(k, ch)
+		w.Watch(k, ch)
 		if i%2 == 0 {
 			assertChange(c, ch, presence.Change{k, true})
 		} else {
@@ -192,26 +227,26 @@
 	defer p.Stop()
 
 	ch := make(chan presence.Change)
-	w.Add("a", ch)
+	w.Watch("a", ch)
 	assertChange(c, ch, presence.Change{"a", false})
 
 	c.Assert(p.Start(), IsNil)
-	w.ForceRefresh()
+	w.StartSync()
 	assertChange(c, ch, presence.Change{"a", true})
 
 	// Still alive in previous slot.
 	presence.FakeTimeSlot(1)
-	w.ForceRefresh()
+	w.StartSync()
 	assertNoChange(c, ch)
 
 	// Two last slots are empty.
 	presence.FakeTimeSlot(2)
-	w.ForceRefresh()
+	w.StartSync()
 	assertChange(c, ch, presence.Change{"a", false})
 
 	// Already dead so killing isn't noticed.
 	p.Kill()
-	w.ForceRefresh()
+	w.StartSync()
 	assertNoChange(c, ch)
 }
 
@@ -225,7 +260,7 @@
 	defer p.Stop()
 
 	ch := make(chan presence.Change)
-	w.Add("a", ch)
+	w.Watch("a", ch)
 	assertChange(c, ch, presence.Change{"a", false})
 
 	// A single ping.
@@ -237,18 +272,18 @@
 	assertChange(c, ch, presence.Change{"a", true})
 }
 
-func (s *PresenceSuite) TestAddRemoveOnQueue(c *C) {
+func (s *PresenceSuite) TestWatchUnwatchOnQueue(c *C) {
 	w := presence.NewWatcher(s.presence)
 	ch := make(chan presence.Change)
 	for i := 0; i < 100; i++ {
 		key := strconv.Itoa(i)
 		c.Logf("Adding %q", key)
-		w.Add(key, ch)
+		w.Watch(key, ch)
 	}
 	for i := 1; i < 100; i += 2 {
 		key := strconv.Itoa(i)
 		c.Logf("Removing %q", key)
-		w.Remove(key, ch)
+		w.Unwatch(key, ch)
 	}
 	alive := make(map[string]bool)
 	for i := 0; i < 50; i++ {
@@ -288,10 +323,10 @@
 	stop := false
 	for !stop {
 		w := presence.NewWatcher(s.presence)
-		w.ForceRefresh()
-		alive := w.Alive("a")
+		w.Sync()
+		alive, err := w.Alive("a")
 		c.Check(w.Stop(), IsNil)
-		if !c.Check(alive, Equals, true) {
+		if !c.Check(err, IsNil) || !c.Check(alive, Equals, true) {
 			break
 		}
 		select {
@@ -325,22 +360,87 @@
325 // Start p1 and let it go on.360 // Start p1 and let it go on.
326 c.Assert(p1.Start(), IsNil)361 c.Assert(p1.Start(), IsNil)
327362
328 w.ForceRefresh()363 w.Sync()
329 c.Assert(w.Alive("a"), Equals, true)364 assertAlive(c, w, "a", true)
330365
331 // Start and kill p2, which will temporarily366 // Start and kill p2, which will temporarily
332 // invalidate p1 and set the key as dead.367 // invalidate p1 and set the key as dead.
333 c.Assert(p2.Start(), IsNil)368 c.Assert(p2.Start(), IsNil)
334 c.Assert(p2.Kill(), IsNil)369 c.Assert(p2.Kill(), IsNil)
335370
336 w.ForceRefresh()371 w.Sync()
337 c.Assert(w.Alive("a"), Equals, false)372 assertAlive(c, w, "a", false)
338373
339 // Wait for two periods, and check again. Since374 // Wait for two periods, and check again. Since
340 // p1 is still alive, p2's death will expire and375 // p1 is still alive, p2's death will expire and
341 // the key will come back.376 // the key will come back.
342 time.Sleep(period * 2 * time.Second)377 time.Sleep(period * 2 * time.Second)
343378
344 w.ForceRefresh()379 w.Sync()
345 c.Assert(w.Alive("a"), Equals, true)380 assertAlive(c, w, "a", true)
381}
382
383func (s *PresenceSuite) TestStartSync(c *C) {
384 w := presence.NewWatcher(s.presence)
385 p := presence.NewPinger(s.presence, "a")
386 defer w.Stop()
387 defer p.Stop()
388
389 ch := make(chan presence.Change)
390 w.Watch("a", ch)
391 assertChange(c, ch, presence.Change{"a", false})
392
393 c.Assert(p.Start(), IsNil)
394
395 done := make(chan bool)
396 go func() {
397 w.StartSync()
398 w.StartSync()
399 w.StartSync()
400 done <- true
401 }()
402
403 select {
404 case <-done:
405 case <-time.After(100 * time.Millisecond):
406 c.Fatalf("StartSync failed to return")
407 }
408
409 assertChange(c, ch, presence.Change{"a", true})
410}
411
412func (s *PresenceSuite) TestSync(c *C) {
413 w := presence.NewWatcher(s.presence)
414 p := presence.NewPinger(s.presence, "a")
415 defer w.Stop()
416 defer p.Stop()
417
418 ch := make(chan presence.Change)
419 w.Watch("a", ch)
420 assertChange(c, ch, presence.Change{"a", false})
421
422 // Nothing to do here.
423 w.Sync()
424
425 c.Assert(p.Start(), IsNil)
426
427 done := make(chan bool)
428 go func() {
429 w.Sync()
430 done <- true
431 }()
432
433 select {
434 case <-done:
435 c.Fatalf("Sync returned too early")
436 case <-time.After(200 * time.Millisecond):
437 }
438
439 assertChange(c, ch, presence.Change{"a", true})
440
441 select {
442 case <-done:
443 case <-time.After(100 * time.Millisecond):
444 c.Fatalf("Sync failed to return")
445 }
346}446}
347447
=== modified file 'mstate/state.go'
--- mstate/state.go 2012-09-10 15:29:01 +0000
+++ mstate/state.go 2012-09-11 13:29:53 +0000
@@ -28,17 +28,17 @@
28// State represents the state of an environment28// State represents the state of an environment
29// managed by juju.29// managed by juju.
30type State struct {30type State struct {
31 db *mgo.Database31 db *mgo.Database
32 charms *mgo.Collection32 charms *mgo.Collection
33 machines *mgo.Collection33 machines *mgo.Collection
34 relations *mgo.Collection34 relations *mgo.Collection
35 services *mgo.Collection35 services *mgo.Collection
36 settings *mgo.Collection36 settings *mgo.Collection
37 units *mgo.Collection37 units *mgo.Collection
38 presence *mgo.Collection38 presence *mgo.Collection
39 runner *txn.Runner39 runner *txn.Runner
40 watcher *watcher.Watcher40 watcher *watcher.Watcher
41 presencew *presence.Watcher41 pwatcher *presence.Watcher
42}42}
4343
44func deadOnAbort(err error) error {44func deadOnAbort(err error) error {
@@ -365,16 +365,16 @@
365 return newUnit(s, &doc), nil365 return newUnit(s, &doc), nil
366}366}
367367
368// ForcePresenceRefresh forces a synchronous refresh of
369// the presence watcher knowledge
370func (s *State) ForcePresenceRefresh() {
371 s.presencew.ForceRefresh()
372}
373
374// StartSync forces watchers to resynchronize their state with the368// StartSync forces watchers to resynchronize their state with the
375// database immediately. This will happen periodically automatically.369// database immediately. This will happen periodically automatically.
376func (s *State) StartSync() {370func (s *State) StartSync() {
377 // TODO Make presence more like watcher, add it here, and
378 // remove ForcePresenceRefresh.
379 s.watcher.StartSync()371 s.watcher.StartSync()
372 s.pwatcher.StartSync()
373}
374
375// Sync forces watchers to resynchronize their state with the
376// database immediately, and waits until all events are known.
377func (s *State) Sync() {
378 s.watcher.Sync()
379 s.pwatcher.Sync()
380}380}
381381
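Note on the StartSync/Sync split above: the doc comments say StartSync triggers a resynchronization while Sync also waits until all events are known. The following is a minimal sketch of that distinction only, not the mstate/watcher or mstate/presence implementation. The names toyWatcher and newToyWatcher, and the refresh callback, are hypothetical and introduced purely for illustration.

```go
package main

import "fmt"

// toyWatcher is a hypothetical stand-in for a watcher with a background
// loop. It is NOT the real presence.Watcher.
type toyWatcher struct {
	// Each request optionally carries a channel that the loop closes
	// once the refresh triggered by that request has finished.
	requests chan chan struct{}
}

// newToyWatcher starts a loop that runs refresh once per request.
func newToyWatcher(refresh func()) *toyWatcher {
	w := &toyWatcher{requests: make(chan chan struct{})}
	go func() {
		for done := range w.requests {
			refresh()
			if done != nil {
				close(done)
			}
		}
	}()
	return w
}

// StartSync hands a refresh request to the loop and returns as soon as
// the loop has accepted it, without waiting for the refresh itself.
func (w *toyWatcher) StartSync() { w.requests <- nil }

// Sync hands a refresh request to the loop and blocks until that
// refresh has completed.
func (w *toyWatcher) Sync() {
	done := make(chan struct{})
	w.requests <- done
	<-done
}

func main() {
	refreshes := 0
	w := newToyWatcher(func() { refreshes++ })
	w.StartSync() // fire and forget
	w.Sync()      // returns only after its own refresh is done
	fmt.Println("refreshes seen after Sync:", refreshes)
}
```

Because the loop handles requests one at a time, anything requested through StartSync before a Sync call has been processed by the time Sync returns. That ordering is a property of this sketch and is only assumed, not confirmed, for the real watchers.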
=== modified file 'mstate/unit.go'
--- mstate/unit.go 2012-09-06 16:54:22 +0000
+++ mstate/unit.go 2012-09-11 13:29:53 +0000
@@ -196,7 +196,10 @@
196 case UnitStopped:196 case UnitStopped:
197 return UnitStopped, "", nil197 return UnitStopped, "", nil
198 }198 }
199 alive := u.AgentAlive()199 alive, err := u.AgentAlive()
200 if err != nil {
201 return "", "", err
202 }
200 if !alive {203 if !alive {
201 s = UnitDown204 s = UnitDown
202 }205 }
@@ -222,35 +225,29 @@
222}225}
223226
224// AgentAlive returns whether the respective remote agent is alive.227// AgentAlive returns whether the respective remote agent is alive.
225func (u *Unit) AgentAlive() bool {228func (u *Unit) AgentAlive() (bool, error) {
226 return u.st.presencew.Alive(u.globalKey())229 return u.st.pwatcher.Alive(u.globalKey())
227}230}
228231
229// WaitAgentAlive blocks until the respective agent is alive.232// WaitAgentAlive blocks until the respective agent is alive.
230func (u *Unit) WaitAgentAlive(timeout time.Duration) error {233func (u *Unit) WaitAgentAlive(timeout time.Duration) (err error) {
234 defer trivial.ErrorContextf(&err, "waiting for agent of unit %q", u)
231 ch := make(chan presence.Change)235 ch := make(chan presence.Change)
232 u.st.presencew.Add(u.globalKey(), ch)236 u.st.pwatcher.Watch(u.globalKey(), ch)
233 defer u.st.presencew.Remove(u.globalKey(), ch)237 defer u.st.pwatcher.Unwatch(u.globalKey(), ch)
234 // Initial check.238 for i := 0; i < 2; i++ {
235 select {239 select {
236 case change := <-ch:240 case change := <-ch:
237 if change.Alive {241 if change.Alive {
238 return nil242 return nil
239 }243 }
240 case <-time.After(timeout):244 case <-time.After(timeout):
241 return fmt.Errorf("waiting for agent of unit %q: still not alive after timeout", u)245 return fmt.Errorf("still not alive after timeout")
242 }246 case <-u.st.pwatcher.Dying():
243 // Hasn't been alive, so now wait for change.247 return u.st.pwatcher.Err()
244 select {248 }
245 case change := <-ch:249 }
246 if change.Alive {250 panic(fmt.Sprintf("presence reported dead status twice in a row for unit %q", u))
247 return nil
248 }
249 panic(fmt.Sprintf("presence reported dead status twice in a row for unit %q", u))
250 case <-time.After(timeout):
251 return fmt.Errorf("waiting for agent of unit %q: still not alive after timeout", u)
252 }
253 panic("unreachable")
254}251}
255252
256// SetAgentAlive signals that the agent for unit u is alive. 253// SetAgentAlive signals that the agent for unit u is alive.
257254
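Note on the reworked WaitAgentAlive above: the loop reads at most two events, the first being the current state and the second the first transition, and it can also give up on a timeout or when the watcher is dying. Below is a standalone sketch of that control flow, assuming a plain change struct and a dying channel in place of presence.Change and pwatcher.Dying(). The names waitAlive, errTimeout, errStopped and shortTimeout are hypothetical. Unlike the diff, the sketch returns an error instead of panicking when two dead reports arrive in a row.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// change mimics the shape of a presence change event (hypothetical type).
type change struct {
	Key   string
	Alive bool
}

var (
	errTimeout = errors.New("still not alive after timeout")
	errStopped = errors.New("watcher stopped")
)

// shortTimeout keeps the demonstration fast.
const shortTimeout = 50 * time.Millisecond

// waitAlive returns nil once an alive event arrives on ch. It inspects at
// most two events: the initial state and the first transition.
func waitAlive(ch <-chan change, dying <-chan struct{}, timeout time.Duration) error {
	for i := 0; i < 2; i++ {
		select {
		case c := <-ch:
			if c.Alive {
				return nil
			}
		case <-time.After(timeout): // the timer restarts on each iteration
			return errTimeout
		case <-dying:
			return errStopped
		}
	}
	return errors.New("dead status reported twice in a row")
}

func main() {
	ch := make(chan change, 2)
	ch <- change{"unit-a", false} // initial state: not alive
	ch <- change{"unit-a", true}  // transition: alive
	fmt.Println("dead then alive:", waitAlive(ch, nil, shortTimeout))
	fmt.Println("no events:", waitAlive(make(chan change), nil, shortTimeout))
}
```

As in the diff, time.After is created inside the loop, so the worst-case wait in this sketch is up to twice the timeout: one full period for the initial event and another for the transition.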
=== modified file 'mstate/unit_test.go'
--- mstate/unit_test.go 2012-09-06 17:04:54 +0000
+++ mstate/unit_test.go 2012-09-11 13:29:53 +0000
@@ -90,7 +90,7 @@
90 c.Assert(p.Kill(), IsNil)90 c.Assert(p.Kill(), IsNil)
91 }()91 }()
9292
93 s.State.ForcePresenceRefresh()93 s.State.StartSync()
94 status, info, err = s.unit.Status()94 status, info, err = s.unit.Status()
95 c.Assert(err, IsNil)95 c.Assert(err, IsNil)
96 c.Assert(status, Equals, state.UnitStarted)96 c.Assert(status, Equals, state.UnitStarted)
@@ -105,7 +105,8 @@
105}105}
106106
107func (s *UnitSuite) TestUnitSetAgentAlive(c *C) {107func (s *UnitSuite) TestUnitSetAgentAlive(c *C) {
108 alive := s.unit.AgentAlive()108 alive, err := s.unit.AgentAlive()
109 c.Assert(err, IsNil)
109 c.Assert(alive, Equals, false)110 c.Assert(alive, Equals, false)
110111
111 pinger, err := s.unit.SetAgentAlive()112 pinger, err := s.unit.SetAgentAlive()
@@ -113,34 +114,38 @@
113 c.Assert(pinger, Not(IsNil))114 c.Assert(pinger, Not(IsNil))
114 defer pinger.Stop()115 defer pinger.Stop()
115116
116 s.State.ForcePresenceRefresh()117 s.State.Sync()
117 alive = s.unit.AgentAlive()118 alive, err = s.unit.AgentAlive()
119 c.Assert(err, IsNil)
118 c.Assert(alive, Equals, true)120 c.Assert(alive, Equals, true)
119}121}
120122
121func (s *UnitSuite) TestUnitWaitAgentAlive(c *C) {123func (s *UnitSuite) TestUnitWaitAgentAlive(c *C) {
122 timeout := 5 * time.Second124 timeout := 5 * time.Second
123 alive := s.unit.AgentAlive()125 alive, err := s.unit.AgentAlive()
126 c.Assert(err, IsNil)
124 c.Assert(alive, Equals, false)127 c.Assert(alive, Equals, false)
125128
126 err := s.unit.WaitAgentAlive(timeout)129 err = s.unit.WaitAgentAlive(timeout)
127 c.Assert(err, ErrorMatches, `waiting for agent of unit "wordpress/0": still not alive after timeout`)130 c.Assert(err, ErrorMatches, `waiting for agent of unit "wordpress/0": still not alive after timeout`)
128131
129 pinger, err := s.unit.SetAgentAlive()132 pinger, err := s.unit.SetAgentAlive()
130 c.Assert(err, IsNil)133 c.Assert(err, IsNil)
131134
132 s.State.ForcePresenceRefresh()135 s.State.StartSync()
133 err = s.unit.WaitAgentAlive(timeout)136 err = s.unit.WaitAgentAlive(timeout)
134 c.Assert(err, IsNil)137 c.Assert(err, IsNil)
135138
136 alive = s.unit.AgentAlive()139 alive, err = s.unit.AgentAlive()
140 c.Assert(err, IsNil)
137 c.Assert(alive, Equals, true)141 c.Assert(alive, Equals, true)
138142
139 err = pinger.Kill()143 err = pinger.Kill()
140 c.Assert(err, IsNil)144 c.Assert(err, IsNil)
141145
142 s.State.ForcePresenceRefresh()146 s.State.Sync()
143 alive = s.unit.AgentAlive()147 alive, err = s.unit.AgentAlive()
148 c.Assert(err, IsNil)
144 c.Assert(alive, Equals, false)149 c.Assert(alive, Equals, false)
145}150}
146151
147152
=== modified file 'mstate/watcher/watcher_test.go'
--- mstate/watcher/watcher_test.go 2012-09-10 16:19:57 +0000
+++ mstate/watcher/watcher_test.go 2012-09-11 13:29:53 +0000
@@ -440,9 +440,6 @@
440func (s *WatcherSuite) TestStartSync(c *C) {440func (s *WatcherSuite) TestStartSync(c *C) {
441 s.w.Watch("test", "a", -1, s.ch)441 s.w.Watch("test", "a", -1, s.ch)
442442
443 // Nothing to do here.
444 s.w.StartSync()
445
446 revno := s.insert(c, "test", "a")443 revno := s.insert(c, "test", "a")
447444
448 done := make(chan bool)445 done := make(chan bool)
@@ -456,7 +453,7 @@
456 select {453 select {
457 case <-done:454 case <-done:
458 case <-time.After(100 * time.Millisecond):455 case <-time.After(100 * time.Millisecond):
459 c.Fatalf("SyncStart failed to return")456 c.Fatalf("StartSync failed to return")
460 }457 }
461458
462 assertChange(c, s.ch, watcher.Change{"test", "a", revno})459 assertChange(c, s.ch, watcher.Change{"test", "a", revno})
@@ -479,7 +476,7 @@
479 select {476 select {
480 case <-done:477 case <-done:
481 c.Fatalf("Sync returned too early")478 c.Fatalf("Sync returned too early")
482 case <-time.After(500 * time.Millisecond):479 case <-time.After(200 * time.Millisecond):
483 }480 }
484481
485 assertChange(c, s.ch, watcher.Change{"test", "a", revno})482 assertChange(c, s.ch, watcher.Change{"test", "a", revno})
