Merge lp:~themue/juju-core/go-state-service-relation-watcher into lp:~juju/juju-core/trunk

Proposed by Frank Mueller on 2012-08-20
Status: Work in progress
Proposed branch: lp:~themue/juju-core/go-state-service-relation-watcher
Merge into: lp:~juju/juju-core/trunk
Prerequisite: lp:~themue/juju-core/go-state-lifecycle-watcher
Diff against target: 333 lines (+193/-59)
2 files modified
state/service_test.go (+11/-9)
state/watcher.go (+182/-50)
To merge this branch: bzr merge lp:~themue/juju-core/go-state-service-relation-watcher
Reviewer Review Type Date Requested Status
The Go Language Gophers 2012-08-20 Pending
Review via email: mp+120407@code.launchpad.net

Description of the change

state: reimplemented service relation watcher

The first release of the service relation watcher observed
changes of the topology and returned added and removed relations.
This changes with the introduction of the lifecycle into state.
The first changed entity is the relation. So the new service
relation watcher now has to observe the lifecycles of the
relations too.

https://codereview.appspot.com/6462083/

To post a comment you must log in.
363. By Frank Mueller on 2012-08-21

state: merged trunk to stay up-to-date

Unmerged revisions

363. By Frank Mueller on 2012-08-21

state: merged trunk to stay up-to-date

362. By Frank Mueller on 2012-08-20

state: some cleanup and commenting before propose

361. By Frank Mueller on 2012-08-20

state: reimplemented the service relation watcher to handle lifecycle changes

360. By Frank Mueller on 2012-08-16

state: adopted lifecycle changes to watcher

359. By Frank Mueller on 2012-08-10

state: changes after reviews

358. By Frank Mueller on 2012-08-09

state: added life watcher

357. By Frank Mueller on 2012-08-08

state: changed lifecycle after review, now uses RetryChange and is embedable

356. By Frank Mueller on 2012-08-06

state: merged trunk and worked off the review comments

355. By Frank Mueller on 2012-07-31

state: first lifecycle utils and implementation for relation

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'state/service_test.go'
2--- state/service_test.go 2012-08-21 09:39:19 +0000
3+++ state/service_test.go 2012-08-21 09:39:19 +0000
4@@ -398,16 +398,17 @@
5 w := s.service.WatchRelations()
6
7 // Check initial event, and lack of followup.
8- assertChange := func(adds, removes []int) {
9+ assertChange := func(alive, dying, dead []int) {
10 select {
11 case change := <-w.Changes():
12- assertRelationIds(c, change.Added, adds)
13- assertRelationIds(c, change.Removed, removes)
14+ assertRelationIds(c, change.Alive, alive)
15+ assertRelationIds(c, change.Dying, dying)
16+ assertRelationIds(c, change.Dead, dead)
17 case <-time.After(200 * time.Millisecond):
18 c.Fatalf("expected change, got nothing")
19 }
20 }
21- assertChange(nil, nil)
22+ // assertChange(nil, nil, nil)
23 assertNoChange := func() {
24 select {
25 case change := <-w.Changes():
26@@ -429,24 +430,25 @@
27 wp1ep := state.RelationEndpoint{"wp1", "ifce", "bar", state.RoleRequirer, charm.ScopeGlobal}
28 rel, err := s.State.AddRelation(mysqlep, wp1ep)
29 c.Assert(err, IsNil)
30- assertChange([]int{0}, nil)
31+ assertChange([]int{0}, nil, nil)
32 assertNoChange()
33
34 // Add another relation; check change.
35 wp2ep := state.RelationEndpoint{"wp2", "ifce", "baz", state.RoleRequirer, charm.ScopeGlobal}
36 _, err = s.State.AddRelation(mysqlep, wp2ep)
37 c.Assert(err, IsNil)
38- assertChange([]int{1}, nil)
39+ assertChange([]int{1}, nil, nil)
40 assertNoChange()
41
42 // Remove one of the relations; check change.
43 err = rel.Kill()
44 c.Assert(err, IsNil)
45+ assertChange(nil, []int{0}, nil)
46 err = rel.Die()
47 c.Assert(err, IsNil)
48 err = s.State.RemoveRelation(rel)
49 c.Assert(err, IsNil)
50- assertChange(nil, []int{0})
51+ assertChange(nil, nil, []int{0})
52 assertNoChange()
53
54 // Stop watcher; check change chan is closed.
55@@ -457,7 +459,7 @@
56 case _, ok := <-w.Changes():
57 c.Assert(ok, Equals, false)
58 default:
59- c.Fatalf("Changes not closed")
60+ c.Fatalf("changes not closed")
61 }
62 }
63 assertClosed()
64@@ -466,7 +468,7 @@
65 rel, err = s.State.AddRelation(mysqlep, wp1ep)
66 c.Assert(err, IsNil)
67 w = s.service.WatchRelations()
68- assertChange([]int{1, 2}, nil)
69+ assertChange([]int{1, 2}, nil, nil)
70 assertNoChange()
71
72 // Stop new watcher; check change chan is closed.
73
74=== modified file 'state/watcher.go'
75--- state/watcher.go 2012-08-21 09:39:19 +0000
76+++ state/watcher.go 2012-08-21 09:39:19 +0000
77@@ -737,74 +737,206 @@
78
79 // ServiceRelationsWatcher notifies of changes to a service's relations.
80 type ServiceRelationsWatcher struct {
81- contentWatcher
82- changeChan chan RelationsChange
83+ tomb tomb.Tomb
84+ changeChan chan RelationChange
85 service *Service
86- current map[string]*Relation
87-}
88-
89-type RelationsChange struct {
90- Added, Removed []*Relation
91+ topologyw *watcher.ContentWatcher
92+ lifes map[string]*relationLife
93+ lifeChange chan relationLifeChange
94+}
95+
96+type RelationChange struct {
97+ Alive, Dying, Dead []*Relation
98+}
99+
100+type initialRelationState struct {
101+ relation *Relation
102+ life Life
103 }
104
105 // newServiceRelationsWatcher creates and starts a new service relations watcher.
106 func newServiceRelationsWatcher(s *Service) *ServiceRelationsWatcher {
107 w := &ServiceRelationsWatcher{
108- contentWatcher: newContentWatcher(s.st, zkTopologyPath),
109- changeChan: make(chan RelationsChange),
110- service: s,
111- current: make(map[string]*Relation),
112+ changeChan: make(chan RelationChange),
113+ service: s,
114+ topologyw: watcher.NewContentWatcher(s.st.zk, zkTopologyPath),
115+ lifes: make(map[string]*relationLife),
116+ lifeChange: make(chan relationLifeChange),
117 }
118- go w.loop(w)
119+ go w.loop()
120 return w
121 }
122
123 // Changes returns a channel that will receive changes when
124 // the service enters and leaves relations.
125-// The Added field in the first event on the channel holds the initial
126-// state, corresponding to that returned by service.Relations.
127-func (w *ServiceRelationsWatcher) Changes() <-chan RelationsChange {
128+func (w *ServiceRelationsWatcher) Changes() <-chan RelationChange {
129 return w.changeChan
130 }
131
132-func (w *ServiceRelationsWatcher) update(change watcher.ContentChange) error {
133- t, err := parseTopology(change.Content)
134- if err != nil {
135- return err
136- }
137- relations, err := w.service.relationsFromTopology(t)
138- if err != nil {
139- return err
140- }
141- latest := map[string]*Relation{}
142- for _, rel := range relations {
143- latest[rel.key] = rel
144- }
145- ch := RelationsChange{}
146- for key, rel := range latest {
147- if w.current[key] == nil {
148- ch.Added = append(ch.Added, rel)
149- }
150- }
151- for key, rel := range w.current {
152- if latest[key] == nil {
153- ch.Removed = append(ch.Removed, rel)
154- }
155- }
156- if w.updated && len(ch.Added) == 0 && len(ch.Removed) == 0 {
157- return nil
158- }
159- select {
160- case <-w.tomb.Dying():
161- return tomb.ErrDying
162- case w.changeChan <- ch:
163- w.current = latest
164- }
165- return nil
166+func (w *ServiceRelationsWatcher) loop() {
167+ defer w.finish()
168+ initialized := false
169+ initial := make(map[string]initialRelationState)
170+
171+ for {
172+ select {
173+ case <-w.tomb.Dying():
174+ return
175+ case chg, ok := <-w.topologyw.Changes():
176+ // Topology changed.
177+ if !ok {
178+ w.tomb.Kill(watcher.MustErr(w.topologyw))
179+ return
180+ }
181+ t, err := parseTopology(chg.Content)
182+ if err != nil {
183+ w.tomb.Kill(err)
184+ return
185+ }
186+ relations, err := w.service.relationsFromTopology(t)
187+ if err != nil {
188+ w.tomb.Kill(err)
189+ return
190+ }
191+ current := make(map[string]*Relation)
192+ for _, r := range relations {
193+ current[r.key] = r
194+ if w.lifes[r.key] == nil {
195+ rl := newRelationLife(w, r.key)
196+ w.lifes[r.key] = rl
197+ if !initialized {
198+ life, ok := <-rl.watcher.Changes()
199+ if !ok {
200+ w.tomb.Kill(watcher.MustErr(rl.watcher))
201+ return
202+ }
203+ initial[r.key] = initialRelationState{r, life}
204+ }
205+ go rl.watchLoop()
206+ }
207+ }
208+ if !initialized && len(initial) > 0 {
209+ // Prepare and send initial state.
210+ var rchg RelationChange
211+ for _, irs := range initial {
212+ switch irs.life {
213+ case Alive:
214+ rchg.Alive = append(rchg.Alive, irs.relation)
215+ case Dying:
216+ rchg.Alive = append(rchg.Dying, irs.relation)
217+ case Dead:
218+ rchg.Alive = append(rchg.Dead, irs.relation)
219+ }
220+ }
221+ select {
222+ case w.changeChan <- rchg:
223+ case <-w.tomb.Dying():
224+ w.tomb.Kill(tomb.ErrDying)
225+ return
226+ }
227+ initialized = true
228+ }
229+ for key, rl := range w.lifes {
230+ if current[key] == nil {
231+ if err := rl.stopWatch(); err != nil {
232+ w.tomb.Kill(err)
233+ }
234+ delete(w.lifes, key)
235+ }
236+ }
237+ case chg := <-w.lifeChange:
238+ // Relation lifecycle change.
239+ var rchg RelationChange
240+ switch chg.life {
241+ case Alive:
242+ rchg.Alive = []*Relation{chg.relation}
243+ case Dying:
244+ rchg.Dying = []*Relation{chg.relation}
245+ case Dead:
246+ rchg.Dead = []*Relation{chg.relation}
247+ }
248+ select {
249+ case w.changeChan <- rchg:
250+ case <-w.tomb.Dying():
251+ w.tomb.Kill(tomb.ErrDying)
252+ return
253+ }
254+ }
255+ }
256 }
257
258-func (w *ServiceRelationsWatcher) done() {
259+// finishes cleans up when the watcher is stopping.
260+func (w *ServiceRelationsWatcher) finish() {
261 close(w.changeChan)
262+ for _, rl := range w.lifes {
263+ w.tomb.Kill(rl.stopWatch())
264+ }
265+ watcher.Stop(w.topologyw, &w.tomb)
266+ w.tomb.Done()
267+}
268+
269+// Stop stops the watcher and returns any errors encountered while watching.
270+func (w *ServiceRelationsWatcher) Stop() error {
271+ w.tomb.Kill(nil)
272+ return w.tomb.Wait()
273+}
274+
275+// Err returns any error encountered while stopping the watcher, or
276+// tome.ErrStillAlive if the watcher is still running.
277+func (w *ServiceRelationsWatcher) Err() error {
278+ return w.tomb.Err()
279+}
280+
281+// relationLife holds relations data and watches for lifecycle changes.
282+type relationLife struct {
283+ tomb tomb.Tomb
284+ srw *ServiceRelationsWatcher
285+ relation *Relation
286+ watcher *LifeWatcher
287+}
288+
289+type relationLifeChange struct {
290+ relation *Relation
291+ life Life
292+}
293+
294+// newRelationLife returns the relation data and starts the life watcher.
295+func newRelationLife(srw *ServiceRelationsWatcher, key string) *relationLife {
296+ r := newRelation(srw.service.st, key)
297+ rl := &relationLife{
298+ srw: srw,
299+ relation: r,
300+ watcher: r.WatchLife(),
301+ }
302+ return rl
303+}
304+
305+// watchLoop watches the relations life for changes.
306+func (rl *relationLife) watchLoop() {
307+ defer rl.tomb.Done()
308+ defer rl.watcher.Stop()
309+ for {
310+ select {
311+ case <-rl.tomb.Dying():
312+ return
313+ case change, ok := <-rl.watcher.Changes():
314+ if !ok {
315+ rl.srw.tomb.Kill(watcher.MustErr(rl.watcher))
316+ return
317+ }
318+ select {
319+ case rl.srw.lifeChange <- relationLifeChange{rl.relation, change}:
320+ case <-rl.tomb.Dying():
321+ return
322+ }
323+ }
324+ }
325+}
326+
327+// stopWatch stops the relation watching.
328+func (rl *relationLife) stopWatch() error {
329+ rl.tomb.Kill(nil)
330+ return rl.tomb.Wait()
331 }
332
333 // RelationUnitsWatcher watches the presence and settings of units

Subscribers

People subscribed via source and target branches