Merge lp:~themue/juju-core/go-worker-firewaller-machineunits into lp:~juju/juju-core/trunk

Proposed by Frank Mueller
Status: Merged
Approved by: Gustavo Niemeyer
Approved revision: 307
Merged at revision: 327
Proposed branch: lp:~themue/juju-core/go-worker-firewaller-machineunits
Merge into: lp:~juju/juju-core/trunk
Prerequisite: lp:~themue/juju-core/go-worker-firewaller-machines
Diff against target: 383 lines (+212/-90)
2 files modified
worker/firewaller/firewaller.go (+164/-84)
worker/firewaller/firewaller_test.go (+48/-6)
To merge this branch: bzr merge lp:~themue/juju-core/go-worker-firewaller-machineunits
Reviewer Review Type Date Requested Status
The Go Language Gophers Pending
Review via email: mp+115167@code.launchpad.net

Description of the change

firewaller: added handling of machine units changes

The second iteration of the firewaller adds the handling of
changes of the machine units. Here the adding of services is
missing and will be one of the next steps. The next step
itself will be the handling of unit ports changes.

https://codereview.appspot.com/6404051/

To post a comment you must log in.
Revision history for this message
Roger Peppe (rogpeppe) wrote :

LGTM

https://codereview.appspot.com/6404051/diff/1/worker/firewaller/firewaller.go
File worker/firewaller/firewaller.go (right):

https://codereview.appspot.com/6404051/diff/1/worker/firewaller/firewaller.go#newcode101
worker/firewaller/firewaller.go:101: if change.stateChange != nil {
when could change.stateChange be nil?

https://codereview.appspot.com/6404051/diff/1/worker/firewaller/firewaller.go#newcode153
worker/firewaller/firewaller.go:153: func (m *machine) loop(fw
*Firewaller) {
perhaps a comment on this function?

// loop waits for changes to the machine's units
// and funnels them to the firewall's machineUnitsChanges
// channel.

https://codereview.appspot.com/6404051/

Revision history for this message
William Reade (fwereade) wrote :

Essentially, same issues as the prereq.

https://codereview.appspot.com/6404051/diff/1/worker/firewaller/firewaller.go
File worker/firewaller/firewaller.go (right):

https://codereview.appspot.com/6404051/diff/1/worker/firewaller/firewaller.go#newcode98
worker/firewaller/firewaller.go:98: fw.tomb.Killf("aggregation of
machine units changes failed")
This maybe also feels like a panic situation, now I come to look at it.
(I know we don't panic in every analogous situation, but maybe we
should: closure of a locally-owned channel tat should stay open until
the method exits feels like solid evidence of fundamental logic error to
me)

https://codereview.appspot.com/6404051/diff/1/worker/firewaller/firewaller.go#newcode106
worker/firewaller/firewaller.go:106: // a not yet managed unit?
+1

https://codereview.appspot.com/6404051/diff/1/worker/firewaller/firewaller.go#newcode108
worker/firewaller/firewaller.go:108: u.watcher.Stop()
Doesn't guarantee we won't get more events on unitPortsChanges (see
previous review).

https://codereview.appspot.com/6404051/diff/1/worker/firewaller/firewaller.go#newcode198
worker/firewaller/firewaller.go:198: return
fw.tomb.Kill(watcher.MustErr(u.watcher))

(uncertainty as in previous review)

https://codereview.appspot.com/6404051/

Revision history for this message
Gustavo Niemeyer (niemeyer) wrote :

There are unhandled reviews on this. I'll wait for the next round.

https://codereview.appspot.com/6404051/

Revision history for this message
Roger Peppe (rogpeppe) wrote :

LGTM

https://codereview.appspot.com/6404051/diff/11057/worker/firewaller/firewaller.go
File worker/firewaller/firewaller.go (right):

https://codereview.appspot.com/6404051/diff/11057/worker/firewaller/firewaller.go#newcode64
worker/firewaller/firewaller.go:64: case change, ok :=
<-fw.machineUnitsChanges:
we never close machineUnitsChanges (trivially verifiable), so why not
just:

case change := <-fw.machineUnitsChanges:

and lose the test?

https://codereview.appspot.com/6404051/diff/11057/worker/firewaller/firewaller.go#newcode69
worker/firewaller/firewaller.go:69: log.Printf("tracker of machine %d
terminated prematurely", change.machine.id)
we're losing the error message from the machine tracker termination
here. how about:
log.Printf("tracker of machine %d terminated prematurely: %v",
change.machine.stop())
?

https://codereview.appspot.com/6404051/diff/11057/worker/firewaller/firewaller.go#newcode136
worker/firewaller/firewaller.go:136: // newMachineTracker creates a new
machine tracker keeping track of
it occurs to me that this thing doesn't actually keep track of anything
- it simply forwards machine units changes to
the central goroutine.

perhaps:
// newMachineTracker tracks changes unit changes to
// the given machine and sends them to the
// central firewaller loop.

?

https://codereview.appspot.com/6404051/

Revision history for this message
Gustavo Niemeyer (niemeyer) wrote :
Download full text (4.0 KiB)

https://codereview.appspot.com/6404051/diff/13004/worker/firewaller/firewaller.go
File worker/firewaller/firewaller.go (right):

https://codereview.appspot.com/6404051/diff/13004/worker/firewaller/firewaller.go#newcode16
worker/firewaller/firewaller.go:16: machines
map[int]*machineTracker
s/machines/machineTrackers

https://codereview.appspot.com/6404051/diff/13004/worker/firewaller/firewaller.go#newcode18
worker/firewaller/firewaller.go:18: units
map[string]*unitTracker
s/units/unitTrackers/

https://codereview.appspot.com/6404051/diff/13004/worker/firewaller/firewaller.go#newcode20
worker/firewaller/firewaller.go:20: services
map[string]*serviceTracker
s/services/serviceTrackers

https://codereview.appspot.com/6404051/diff/13004/worker/firewaller/firewaller.go#newcode48
worker/firewaller/firewaller.go:48: for _, removedMachine := range
change.Removed {
s/removedMachine/machine/

https://codereview.appspot.com/6404051/diff/13004/worker/firewaller/firewaller.go#newcode49
worker/firewaller/firewaller.go:49: mt, ok :=
fw.machines[removedMachine.Id()]
s/mt/machinet/

https://codereview.appspot.com/6404051/diff/13004/worker/firewaller/firewaller.go#newcode59
worker/firewaller/firewaller.go:59: for _, addedMachine := range
change.Added {
s/addedMachine/machine/

https://codereview.appspot.com/6404051/diff/13004/worker/firewaller/firewaller.go#newcode60
worker/firewaller/firewaller.go:60: mt :=
newMachineTracker(addedMachine, fw)
s/mt/machinet/

https://codereview.appspot.com/6404051/diff/13004/worker/firewaller/firewaller.go#newcode65
worker/firewaller/firewaller.go:65: if change.change == nil {
As we talked, this seems to just create extra churn. I think we can drop
it, and change the sending side so it never sends nil stuff.

https://codereview.appspot.com/6404051/diff/13004/worker/firewaller/firewaller.go#newcode70
worker/firewaller/firewaller.go:70: for _, removedUnit := range
change.change.Removed {
s/removedUnit/unit/

https://codereview.appspot.com/6404051/diff/13004/worker/firewaller/firewaller.go#newcode71
worker/firewaller/firewaller.go:71: ut, ok :=
fw.units[removedUnit.Name()]
s/ut/unitt/

https://codereview.appspot.com/6404051/diff/13004/worker/firewaller/firewaller.go#newcode79
worker/firewaller/firewaller.go:79: log.Debugf("firewaller: stopped
tracking unit %s", removedUnit.Name())
Shouldn't the unit ports in this machine be closed here?

https://codereview.appspot.com/6404051/diff/13004/worker/firewaller/firewaller.go#newcode81
worker/firewaller/firewaller.go:81: for _, addedUnit := range
change.change.Added {
s/addedUnit/unit/

https://codereview.appspot.com/6404051/diff/13004/worker/firewaller/firewaller.go#newcode82
worker/firewaller/firewaller.go:82: ut := newUnitTracker(addedUnit, fw)
s/ut/unitt/

https://codereview.appspot.com/6404051/diff/13004/worker/firewaller/firewaller.go#newcode120
worker/firewaller/firewaller.go:120: machine *machineTracker
s/machine/machinet/

https://codereview.appspot.com/6404051/diff/13004/worker/firewaller/firewaller.go#newcode121
worker/firewaller/firewaller.go:121: change *state.MachineUnitsChange
s/change //; so that you can use the fields of the Change type directly
rather than doing change...

Read more...

Revision history for this message
Gustavo Niemeyer (niemeyer) wrote :

LGTM

https://codereview.appspot.com/6404051/diff/10058/worker/firewaller/firewaller.go
File worker/firewaller/firewaller.go (right):

https://codereview.appspot.com/6404051/diff/10058/worker/firewaller/firewaller.go#newcode16
worker/firewaller/firewaller.go:16: machineDatas
map[int]*machineData
As discussed: machineds, unitds, serviceds

https://codereview.appspot.com/6404051/diff/10058/worker/firewaller/firewaller.go#newcode17
worker/firewaller/firewaller.go:17: machineUnitsChanges chan
*machineUnitsChange
s/machineUnitsChanges/unitsChange/ (both in the type name and in the
field name)

https://codereview.appspot.com/6404051/diff/10058/worker/firewaller/firewaller.go#newcode19
worker/firewaller/firewaller.go:19: unitPortsChanges chan
*unitPortsChange
s/unitPortsChanges/portsChange/ (both in the type and in the field name)

https://codereview.appspot.com/6404051/diff/10058/worker/firewaller/firewaller.go#newcode73
worker/firewaller/firewaller.go:73: log.Printf("unit data %s returned
error when stopping: %v", unit.Name(), err)
s/%s/%q/
s/data/watcher/

https://codereview.appspot.com/6404051/

Revision history for this message
Gustavo Niemeyer (niemeyer) wrote :
308. By Frank Mueller

firewaller: forgot two name changes

309. By Frank Mueller

firewaller: merged trunk

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'worker/firewaller/firewaller.go'
2--- worker/firewaller/firewaller.go 2012-07-19 11:16:40 +0000
3+++ worker/firewaller/firewaller.go 2012-07-25 08:25:37 +0000
4@@ -10,20 +10,26 @@
5 // Firewaller watches the state for ports opened or closed
6 // and reflects those changes onto the backing environment.
7 type Firewaller struct {
8- st *state.State
9- tomb tomb.Tomb
10- machinesWatcher *state.MachinesWatcher
11- machines map[int]*machineTracker
12- machineUnitsChanges chan *machineUnitsChange
13+ st *state.State
14+ tomb tomb.Tomb
15+ machinesWatcher *state.MachinesWatcher
16+ machineds map[int]*machineData
17+ unitsChange chan *machineUnitsChange
18+ unitds map[string]*unitData
19+ portsChange chan *unitPortsChange
20+ serviceds map[string]*serviceData
21 }
22
23 // NewFirewaller returns a new Firewaller.
24 func NewFirewaller(st *state.State) (*Firewaller, error) {
25 fw := &Firewaller{
26- st: st,
27- machinesWatcher: st.WatchMachines(),
28- machines: make(map[int]*machineTracker),
29- machineUnitsChanges: make(chan *machineUnitsChange),
30+ st: st,
31+ machinesWatcher: st.WatchMachines(),
32+ machineds: make(map[int]*machineData),
33+ unitsChange: make(chan *machineUnitsChange),
34+ unitds: make(map[string]*unitData),
35+ portsChange: make(chan *unitPortsChange),
36+ serviceds: make(map[string]*serviceData),
37 }
38 go fw.loop()
39 return fw, nil
40@@ -39,24 +45,45 @@
41 if !ok {
42 return
43 }
44- for _, removedMachine := range change.Removed {
45- m, ok := fw.machines[removedMachine.Id()]
46+ for _, machine := range change.Removed {
47+ machined, ok := fw.machineds[machine.Id()]
48 if !ok {
49 panic("trying to remove machine that wasn't added")
50 }
51- delete(fw.machines, removedMachine.Id())
52- if err := m.stop(); err != nil {
53- log.Printf("machine tracker %d returned error when stopping: %v", removedMachine.Id(), err)
54- }
55- log.Debugf("firewaller: stopped tracking machine %d", removedMachine.Id())
56- }
57- for _, addedMachine := range change.Added {
58- m := newMachineTracker(addedMachine, fw)
59- fw.machines[addedMachine.Id()] = m
60- log.Debugf("firewaller: started tracking machine %d", m.id)
61- }
62- case <-fw.machineUnitsChanges:
63- // TODO(mue) fill with life.
64+ delete(fw.machineds, machine.Id())
65+ if err := machined.stopWatch(); err != nil {
66+ log.Printf("machine data %d returned error when stopping: %v", machine.Id(), err)
67+ }
68+ log.Debugf("firewaller: stopped watching machine %d", machine.Id())
69+ }
70+ for _, machine := range change.Added {
71+ machined := newMachineData(machine, fw)
72+ fw.machineds[machine.Id()] = machined
73+ log.Debugf("firewaller: started watching machine %d", machine.Id())
74+ }
75+ case change := <-fw.unitsChange:
76+ for _, unit := range change.Removed {
77+ unitd, ok := fw.unitds[unit.Name()]
78+ if !ok {
79+ panic("trying to remove unit that wasn't added")
80+ }
81+ delete(fw.unitds, unit.Name())
82+ // TODO(mue) Close ports.
83+ if err := unitd.stopWatch(); err != nil {
84+ log.Printf("unit watcher %q returned error when stopping: %v", unit.Name(), err)
85+ }
86+ log.Debugf("firewaller: stopped watching unit %s", unit.Name())
87+ }
88+ for _, unit := range change.Added {
89+ unitd := newUnitData(unit, fw)
90+ fw.unitds[unit.Name()] = unitd
91+ if fw.serviceds[unit.ServiceName()] == nil {
92+ // TODO(mue) Add service watcher.
93+ }
94+ log.Debugf("firewaller: started watching unit %s", unit.Name())
95+ }
96+ case <-fw.portsChange:
97+ // TODO(mue) Handle changes of ports.
98 }
99 }
100 }
101@@ -64,8 +91,11 @@
102 // finishes cleans up when the firewaller is stopping.
103 func (fw *Firewaller) finish() {
104 watcher.Stop(fw.machinesWatcher, &fw.tomb)
105- for _, m := range fw.machines {
106- fw.tomb.Kill(m.stop())
107+ for _, unitd := range fw.unitds {
108+ fw.tomb.Kill(unitd.stopWatch())
109+ }
110+ for _, machined := range fw.machineds {
111+ fw.tomb.Kill(machined.stopWatch())
112 }
113 fw.tomb.Done()
114 }
115@@ -83,68 +113,118 @@
116
117 // machineUnitsChange contains the changed units for one specific machine.
118 type machineUnitsChange struct {
119- machine *machineTracker
120- change *state.MachineUnitsChange
121+ machined *machineData
122+ *state.MachineUnitsChange
123 }
124
125-// machineTracker keeps track of the unit changes of a machine.
126-type machineTracker struct {
127+// machineData watches the unit changes of a machine and passes them
128+// to the firewaller for handling.
129+type machineData struct {
130 tomb tomb.Tomb
131 firewaller *Firewaller
132- id int
133+ machine *state.Machine
134 watcher *state.MachineUnitsWatcher
135- ports map[state.Port]*unitTracker
136-}
137-
138-// newMachineTracker creates a new machine tracker keeping track of
139-// unit changes of the passed machine.
140-func newMachineTracker(mst *state.Machine, fw *Firewaller) *machineTracker {
141- mt := &machineTracker{
142- firewaller: fw,
143- id: mst.Id(),
144- watcher: mst.WatchUnits(),
145- ports: make(map[state.Port]*unitTracker),
146- }
147- go mt.loop()
148- return mt
149-}
150-
151-// loop is the backend watching for machine units changes.
152-func (mt *machineTracker) loop() {
153- defer mt.tomb.Done()
154- defer mt.watcher.Stop()
155- for {
156- select {
157- case <-mt.tomb.Dying():
158- return
159- case change, ok := <-mt.watcher.Changes():
160- // Send change or nil.
161- select {
162- case mt.firewaller.machineUnitsChanges <- &machineUnitsChange{mt, change}:
163- case <-mt.tomb.Dying():
164- return
165- }
166- // The watcher terminated prematurely, so end the loop.
167- if !ok {
168- mt.firewaller.tomb.Kill(watcher.MustErr(mt.watcher))
169- return
170- }
171- }
172- }
173-}
174-
175-// stop stops the machine tracker.
176-func (mt *machineTracker) stop() error {
177- mt.tomb.Kill(nil)
178- return mt.tomb.Wait()
179-}
180-
181-type unitTracker struct {
182- service *serviceTracker
183- id string
184- ports []state.Port
185-}
186-
187-type serviceTracker struct {
188+}
189+
190+// newMachineData starts the watching of the passed machine.
191+func newMachineData(machine *state.Machine, fw *Firewaller) *machineData {
192+ md := &machineData{
193+ firewaller: fw,
194+ machine: machine,
195+ watcher: machine.WatchUnits(),
196+ }
197+ go md.watchLoop()
198+ return md
199+}
200+
201+// watchLoop is the backend watching for machine units changes.
202+func (md *machineData) watchLoop() {
203+ defer md.tomb.Done()
204+ defer md.watcher.Stop()
205+ for {
206+ select {
207+ case <-md.tomb.Dying():
208+ return
209+ case change, ok := <-md.watcher.Changes():
210+ if !ok {
211+ md.firewaller.tomb.Kill(watcher.MustErr(md.watcher))
212+ return
213+ }
214+ select {
215+ case md.firewaller.unitsChange <- &machineUnitsChange{md, change}:
216+ case <-md.tomb.Dying():
217+ return
218+ }
219+ }
220+ }
221+}
222+
223+// stopWatch stops the machine watching.
224+func (md *machineData) stopWatch() error {
225+ md.tomb.Kill(nil)
226+ return md.tomb.Wait()
227+}
228+
229+// unitPortsChange contains the changed ports for one specific unit.
230+type unitPortsChange struct {
231+ unitd *unitData
232+ ports []state.Port
233+}
234+
235+// unitData watches the port changes of a unit and passes them
236+// to the firewaller for handling.
237+type unitData struct {
238+ tomb tomb.Tomb
239+ firewaller *Firewaller
240+ unit *state.Unit
241+ watcher *state.PortsWatcher
242+ service *serviceData
243+ ports []state.Port
244+}
245+
246+// newMachineData starts the watching of the passed unit.
247+func newUnitData(unit *state.Unit, fw *Firewaller) *unitData {
248+ ud := &unitData{
249+ firewaller: fw,
250+ unit: unit,
251+ watcher: unit.WatchPorts(),
252+ ports: make([]state.Port, 0),
253+ }
254+ go ud.watchLoop()
255+ return ud
256+}
257+
258+func (ud *unitData) watchLoop() {
259+ defer ud.tomb.Done()
260+ defer ud.watcher.Stop()
261+ for {
262+ select {
263+ case <-ud.tomb.Dying():
264+ return
265+ case change, ok := <-ud.watcher.Changes():
266+ if !ok {
267+ ud.firewaller.tomb.Kill(watcher.MustErr(ud.watcher))
268+ return
269+ }
270+ select {
271+ case ud.firewaller.portsChange <- &unitPortsChange{ud, change}:
272+ case <-ud.tomb.Dying():
273+ return
274+ }
275+ }
276+ }
277+}
278+
279+// stopWatch stops the unit watching.
280+func (ud *unitData) stopWatch() error {
281+ ud.tomb.Kill(nil)
282+ return ud.tomb.Wait()
283+}
284+
285+// serviceData watches the exposed flag changes of a service and passes them
286+// to the firewaller for handling.
287+type serviceData struct {
288+ // TODO(mue) Fill with life.
289+ service *state.Service
290 exposed bool
291 }
292
293=== modified file 'worker/firewaller/firewaller_test.go'
294--- worker/firewaller/firewaller_test.go 2012-07-19 11:16:40 +0000
295+++ worker/firewaller/firewaller_test.go 2012-07-25 08:25:37 +0000
296@@ -5,6 +5,7 @@
297 . "launchpad.net/gocheck"
298 "launchpad.net/juju-core/environs/dummy"
299 "launchpad.net/juju-core/log"
300+ "launchpad.net/juju-core/state"
301 "launchpad.net/juju-core/state/testing"
302 coretesting "launchpad.net/juju-core/testing"
303 "launchpad.net/juju-core/worker/firewaller"
304@@ -58,7 +59,7 @@
305 case e := <-logHook.event:
306 got = append(got, e)
307 case <-time.After(500 * time.Millisecond):
308- c.Fatalf("expected %q; timed out after %q", expect, got)
309+ c.Fatalf("expected %q; timed out, got %q", expect, got)
310 }
311 }
312 select {
313@@ -75,7 +76,8 @@
314 type FirewallerSuite struct {
315 coretesting.LoggingSuite
316 testing.StateSuite
317- op <-chan dummy.Operation
318+ op <-chan dummy.Operation
319+ charm *state.Charm
320 }
321
322 var _ = Suite(&FirewallerSuite{})
323@@ -116,16 +118,56 @@
324 c.Assert(err, IsNil)
325
326 assertEvents(c, []string{
327- fmt.Sprint("started tracking machine ", m1.Id()),
328- fmt.Sprint("started tracking machine ", m2.Id()),
329- fmt.Sprint("started tracking machine ", m3.Id()),
330+ fmt.Sprint("started watching machine ", m1.Id()),
331+ fmt.Sprint("started watching machine ", m2.Id()),
332+ fmt.Sprint("started watching machine ", m3.Id()),
333 })
334
335 err = s.State.RemoveMachine(m2.Id())
336 c.Assert(err, IsNil)
337
338 assertEvents(c, []string{
339- fmt.Sprint("stopped tracking machine ", m2.Id()),
340+ fmt.Sprint("stopped watching machine ", m2.Id()),
341+ })
342+
343+ c.Assert(fw.Stop(), IsNil)
344+}
345+
346+func (s *FirewallerSuite) TestAssignUnassignUnit(c *C) {
347+ fw, err := firewaller.NewFirewaller(s.State)
348+ c.Assert(err, IsNil)
349+
350+ setUpLogHook()
351+ defer tearDownLogHook()
352+
353+ m1, err := s.State.AddMachine()
354+ c.Assert(err, IsNil)
355+ m2, err := s.State.AddMachine()
356+ c.Assert(err, IsNil)
357+ s.charm = s.AddTestingCharm(c, "dummy")
358+ s1, err := s.State.AddService("wordpress", s.charm)
359+ c.Assert(err, IsNil)
360+ u1, err := s1.AddUnit()
361+ c.Assert(err, IsNil)
362+ err = u1.AssignToMachine(m1)
363+ c.Assert(err, IsNil)
364+ u2, err := s1.AddUnit()
365+ c.Assert(err, IsNil)
366+ err = u2.AssignToMachine(m2)
367+ c.Assert(err, IsNil)
368+
369+ assertEvents(c, []string{
370+ fmt.Sprint("started watching machine ", m1.Id()),
371+ fmt.Sprint("started watching machine ", m2.Id()),
372+ fmt.Sprint("started watching unit ", u1.Name()),
373+ fmt.Sprint("started watching unit ", u2.Name()),
374+ })
375+
376+ err = u1.UnassignFromMachine()
377+ c.Assert(err, IsNil)
378+
379+ assertEvents(c, []string{
380+ fmt.Sprint("stopped watching unit ", u1.Name()),
381 })
382
383 c.Assert(fw.Stop(), IsNil)

Subscribers

People subscribed via source and target branches