Merge lp:~themue/juju-core/go-worker-firewaller-machineunits into lp:~juju/juju-core/trunk
Status: | Merged |
---|---|
Approved by: | Gustavo Niemeyer |
Approved revision: | 307 |
Merged at revision: | 327 |
Proposed branch: | lp:~themue/juju-core/go-worker-firewaller-machineunits |
Merge into: | lp:~juju/juju-core/trunk |
Prerequisite: | lp:~themue/juju-core/go-worker-firewaller-machines |
Diff against target: |
383 lines (+212/-90) 2 files modified
worker/firewaller/firewaller.go (+164/-84) worker/firewaller/firewaller_test.go (+48/-6) |
To merge this branch: | bzr merge lp:~themue/juju-core/go-worker-firewaller-machineunits |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
The Go Language Gophers | Pending | ||
Review via email: mp+115167@code.launchpad.net |
Commit message
Description of the change
firewaller: added handling of machine units changes
The second iteration of the firewaller adds handling of machine unit
changes. The handling of added services is still missing and will be
one of the next steps; the immediate next step is the handling of unit
port changes.
Roger Peppe (rogpeppe) wrote:
William Reade (fwereade) wrote:
Essentially, same issues as the prereq.
File worker/firewaller/firewaller.go:

worker/firewaller/firewaller.go: …machine units changes failed")
This maybe also feels like a panic situation, now I come to look at it.
(I know we don't panic in every analogous situation, but maybe we
should: closure of a locally-owned channel that should stay open until
the method exits feels like solid evidence of a fundamental logic error
to me.)
worker/firewaller/firewaller.go:
+1
worker/firewaller/firewaller.go:
Doesn't guarantee we won't get more events on unitPortsChanges (see
previous review).
worker/firewaller/firewaller.go: fw.tomb.…
(uncertainty as in previous review)
Gustavo Niemeyer (niemeyer) wrote:
There are unhandled reviews on this. I'll wait for the next round.
Roger Peppe (rogpeppe) wrote:
LGTM
File worker/firewaller/firewaller.go:

worker/firewaller/firewaller.go: <-fw.machineUnitsChanges:
we never close machineUnitsChanges (trivially verifiable), so why not
just:
    case change := <-fw.machineUnitsChanges:
and lose the test?
worker/firewaller/firewaller.go: …"tracker of machine %d
terminated prematurely", change.machine.id)
we're losing the error message from the machine tracker termination
here. how about:
    log.Printf("tracker of machine %d terminated prematurely: %v",
        change.machine.id, …)
?
worker/firewaller/firewaller.go: …machine tracker keeping track of
it occurs to me that this thing doesn't actually keep track of anything
- it simply forwards machine units changes to the central goroutine.
perhaps:
    // newMachineTracker tracks unit changes to
    // the given machine and sends them to the
    // central firewaller loop.
?
Gustavo Niemeyer (niemeyer) wrote:
File worker/firewaller/firewaller.go:

map[int]…
s/machines/machineds/

map[string]…
s/units/unitds/

map[string]…
s/services/serviceds/

change.Removed {
s/removedMachine/machine/

fw.machines[…
s/mt/machinet/

change.Added {
s/addedMachine/machine/

newMachineTracker…
s/mt/machinet/

As we talked, this seems to just create extra churn. I think we can drop
it, and change the sending side so it never sends nil stuff.
change.…
s/removedUnit/unit/

fw.units[…
s/ut/unitt/

…tracking unit %s", removedUnit.Name())
Shouldn't the unit ports in this machine be closed here?

change.change.Added {
s/addedUnit/unit/

s/ut/unitt/

s/machine/machinet/

s/change //; so that you can use the fields of the Change type directly
rather than doing change.change…
Gustavo Niemeyer (niemeyer) wrote:
LGTM
File worker/firewaller/firewaller.go:

map[int]…
As discussed: machineds, unitds, serviceds

*machineUnitsChange
s/machineUnitsChanges/unitsChange/ (the field name)

*unitPortsChange
s/unitPortsChanges/portsChange/

…error when stopping: %v", unit.Name(), err)
s/%s/%q/
s/data/watcher/
Gustavo Niemeyer (niemeyer) wrote:
- 308. By Frank Mueller: firewaller: forgot two name changes
- 309. By Frank Mueller: firewaller: merged trunk
Preview Diff
1 | === modified file 'worker/firewaller/firewaller.go' |
2 | --- worker/firewaller/firewaller.go 2012-07-19 11:16:40 +0000 |
3 | +++ worker/firewaller/firewaller.go 2012-07-25 08:25:37 +0000 |
4 | @@ -10,20 +10,26 @@ |
5 | // Firewaller watches the state for ports opened or closed |
6 | // and reflects those changes onto the backing environment. |
7 | type Firewaller struct { |
8 | - st *state.State |
9 | - tomb tomb.Tomb |
10 | - machinesWatcher *state.MachinesWatcher |
11 | - machines map[int]*machineTracker |
12 | - machineUnitsChanges chan *machineUnitsChange |
13 | + st *state.State |
14 | + tomb tomb.Tomb |
15 | + machinesWatcher *state.MachinesWatcher |
16 | + machineds map[int]*machineData |
17 | + unitsChange chan *machineUnitsChange |
18 | + unitds map[string]*unitData |
19 | + portsChange chan *unitPortsChange |
20 | + serviceds map[string]*serviceData |
21 | } |
22 | |
23 | // NewFirewaller returns a new Firewaller. |
24 | func NewFirewaller(st *state.State) (*Firewaller, error) { |
25 | fw := &Firewaller{ |
26 | - st: st, |
27 | - machinesWatcher: st.WatchMachines(), |
28 | - machines: make(map[int]*machineTracker), |
29 | - machineUnitsChanges: make(chan *machineUnitsChange), |
30 | + st: st, |
31 | + machinesWatcher: st.WatchMachines(), |
32 | + machineds: make(map[int]*machineData), |
33 | + unitsChange: make(chan *machineUnitsChange), |
34 | + unitds: make(map[string]*unitData), |
35 | + portsChange: make(chan *unitPortsChange), |
36 | + serviceds: make(map[string]*serviceData), |
37 | } |
38 | go fw.loop() |
39 | return fw, nil |
40 | @@ -39,24 +45,45 @@ |
41 | if !ok { |
42 | return |
43 | } |
44 | - for _, removedMachine := range change.Removed { |
45 | - m, ok := fw.machines[removedMachine.Id()] |
46 | + for _, machine := range change.Removed { |
47 | + machined, ok := fw.machineds[machine.Id()] |
48 | if !ok { |
49 | panic("trying to remove machine that wasn't added") |
50 | } |
51 | - delete(fw.machines, removedMachine.Id()) |
52 | - if err := m.stop(); err != nil { |
53 | - log.Printf("machine tracker %d returned error when stopping: %v", removedMachine.Id(), err) |
54 | - } |
55 | - log.Debugf("firewaller: stopped tracking machine %d", removedMachine.Id()) |
56 | - } |
57 | - for _, addedMachine := range change.Added { |
58 | - m := newMachineTracker(addedMachine, fw) |
59 | - fw.machines[addedMachine.Id()] = m |
60 | - log.Debugf("firewaller: started tracking machine %d", m.id) |
61 | - } |
62 | - case <-fw.machineUnitsChanges: |
63 | - // TODO(mue) fill with life. |
64 | + delete(fw.machineds, machine.Id()) |
65 | + if err := machined.stopWatch(); err != nil { |
66 | + log.Printf("machine data %d returned error when stopping: %v", machine.Id(), err) |
67 | + } |
68 | + log.Debugf("firewaller: stopped watching machine %d", machine.Id()) |
69 | + } |
70 | + for _, machine := range change.Added { |
71 | + machined := newMachineData(machine, fw) |
72 | + fw.machineds[machine.Id()] = machined |
73 | + log.Debugf("firewaller: started watching machine %d", machine.Id()) |
74 | + } |
75 | + case change := <-fw.unitsChange: |
76 | + for _, unit := range change.Removed { |
77 | + unitd, ok := fw.unitds[unit.Name()] |
78 | + if !ok { |
79 | + panic("trying to remove unit that wasn't added") |
80 | + } |
81 | + delete(fw.unitds, unit.Name()) |
82 | + // TODO(mue) Close ports. |
83 | + if err := unitd.stopWatch(); err != nil { |
84 | + log.Printf("unit watcher %q returned error when stopping: %v", unit.Name(), err) |
85 | + } |
86 | + log.Debugf("firewaller: stopped watching unit %s", unit.Name()) |
87 | + } |
88 | + for _, unit := range change.Added { |
89 | + unitd := newUnitData(unit, fw) |
90 | + fw.unitds[unit.Name()] = unitd |
91 | + if fw.serviceds[unit.ServiceName()] == nil { |
92 | + // TODO(mue) Add service watcher. |
93 | + } |
94 | + log.Debugf("firewaller: started watching unit %s", unit.Name()) |
95 | + } |
96 | + case <-fw.portsChange: |
97 | + // TODO(mue) Handle changes of ports. |
98 | } |
99 | } |
100 | } |
101 | @@ -64,8 +91,11 @@ |
102 | // finishes cleans up when the firewaller is stopping. |
103 | func (fw *Firewaller) finish() { |
104 | watcher.Stop(fw.machinesWatcher, &fw.tomb) |
105 | - for _, m := range fw.machines { |
106 | - fw.tomb.Kill(m.stop()) |
107 | + for _, unitd := range fw.unitds { |
108 | + fw.tomb.Kill(unitd.stopWatch()) |
109 | + } |
110 | + for _, machined := range fw.machineds { |
111 | + fw.tomb.Kill(machined.stopWatch()) |
112 | } |
113 | fw.tomb.Done() |
114 | } |
115 | @@ -83,68 +113,118 @@ |
116 | |
117 | // machineUnitsChange contains the changed units for one specific machine. |
118 | type machineUnitsChange struct { |
119 | - machine *machineTracker |
120 | - change *state.MachineUnitsChange |
121 | + machined *machineData |
122 | + *state.MachineUnitsChange |
123 | } |
124 | |
125 | -// machineTracker keeps track of the unit changes of a machine. |
126 | -type machineTracker struct { |
127 | +// machineData watches the unit changes of a machine and passes them |
128 | +// to the firewaller for handling. |
129 | +type machineData struct { |
130 | tomb tomb.Tomb |
131 | firewaller *Firewaller |
132 | - id int |
133 | + machine *state.Machine |
134 | watcher *state.MachineUnitsWatcher |
135 | - ports map[state.Port]*unitTracker |
136 | -} |
137 | - |
138 | -// newMachineTracker creates a new machine tracker keeping track of |
139 | -// unit changes of the passed machine. |
140 | -func newMachineTracker(mst *state.Machine, fw *Firewaller) *machineTracker { |
141 | - mt := &machineTracker{ |
142 | - firewaller: fw, |
143 | - id: mst.Id(), |
144 | - watcher: mst.WatchUnits(), |
145 | - ports: make(map[state.Port]*unitTracker), |
146 | - } |
147 | - go mt.loop() |
148 | - return mt |
149 | -} |
150 | - |
151 | -// loop is the backend watching for machine units changes. |
152 | -func (mt *machineTracker) loop() { |
153 | - defer mt.tomb.Done() |
154 | - defer mt.watcher.Stop() |
155 | - for { |
156 | - select { |
157 | - case <-mt.tomb.Dying(): |
158 | - return |
159 | - case change, ok := <-mt.watcher.Changes(): |
160 | - // Send change or nil. |
161 | - select { |
162 | - case mt.firewaller.machineUnitsChanges <- &machineUnitsChange{mt, change}: |
163 | - case <-mt.tomb.Dying(): |
164 | - return |
165 | - } |
166 | - // The watcher terminated prematurely, so end the loop. |
167 | - if !ok { |
168 | - mt.firewaller.tomb.Kill(watcher.MustErr(mt.watcher)) |
169 | - return |
170 | - } |
171 | - } |
172 | - } |
173 | -} |
174 | - |
175 | -// stop stops the machine tracker. |
176 | -func (mt *machineTracker) stop() error { |
177 | - mt.tomb.Kill(nil) |
178 | - return mt.tomb.Wait() |
179 | -} |
180 | - |
181 | -type unitTracker struct { |
182 | - service *serviceTracker |
183 | - id string |
184 | - ports []state.Port |
185 | -} |
186 | - |
187 | -type serviceTracker struct { |
188 | +} |
189 | + |
190 | +// newMachineData starts the watching of the passed machine. |
191 | +func newMachineData(machine *state.Machine, fw *Firewaller) *machineData { |
192 | + md := &machineData{ |
193 | + firewaller: fw, |
194 | + machine: machine, |
195 | + watcher: machine.WatchUnits(), |
196 | + } |
197 | + go md.watchLoop() |
198 | + return md |
199 | +} |
200 | + |
201 | +// watchLoop is the backend watching for machine units changes. |
202 | +func (md *machineData) watchLoop() { |
203 | + defer md.tomb.Done() |
204 | + defer md.watcher.Stop() |
205 | + for { |
206 | + select { |
207 | + case <-md.tomb.Dying(): |
208 | + return |
209 | + case change, ok := <-md.watcher.Changes(): |
210 | + if !ok { |
211 | + md.firewaller.tomb.Kill(watcher.MustErr(md.watcher)) |
212 | + return |
213 | + } |
214 | + select { |
215 | + case md.firewaller.unitsChange <- &machineUnitsChange{md, change}: |
216 | + case <-md.tomb.Dying(): |
217 | + return |
218 | + } |
219 | + } |
220 | + } |
221 | +} |
222 | + |
223 | +// stopWatch stops the machine watching. |
224 | +func (md *machineData) stopWatch() error { |
225 | + md.tomb.Kill(nil) |
226 | + return md.tomb.Wait() |
227 | +} |
228 | + |
229 | +// unitPortsChange contains the changed ports for one specific unit. |
230 | +type unitPortsChange struct { |
231 | + unitd *unitData |
232 | + ports []state.Port |
233 | +} |
234 | + |
235 | +// unitData watches the port changes of a unit and passes them |
236 | +// to the firewaller for handling. |
237 | +type unitData struct { |
238 | + tomb tomb.Tomb |
239 | + firewaller *Firewaller |
240 | + unit *state.Unit |
241 | + watcher *state.PortsWatcher |
242 | + service *serviceData |
243 | + ports []state.Port |
244 | +} |
245 | + |
246 | +// newMachineData starts the watching of the passed unit. |
247 | +func newUnitData(unit *state.Unit, fw *Firewaller) *unitData { |
248 | + ud := &unitData{ |
249 | + firewaller: fw, |
250 | + unit: unit, |
251 | + watcher: unit.WatchPorts(), |
252 | + ports: make([]state.Port, 0), |
253 | + } |
254 | + go ud.watchLoop() |
255 | + return ud |
256 | +} |
257 | + |
258 | +func (ud *unitData) watchLoop() { |
259 | + defer ud.tomb.Done() |
260 | + defer ud.watcher.Stop() |
261 | + for { |
262 | + select { |
263 | + case <-ud.tomb.Dying(): |
264 | + return |
265 | + case change, ok := <-ud.watcher.Changes(): |
266 | + if !ok { |
267 | + ud.firewaller.tomb.Kill(watcher.MustErr(ud.watcher)) |
268 | + return |
269 | + } |
270 | + select { |
271 | + case ud.firewaller.portsChange <- &unitPortsChange{ud, change}: |
272 | + case <-ud.tomb.Dying(): |
273 | + return |
274 | + } |
275 | + } |
276 | + } |
277 | +} |
278 | + |
279 | +// stopWatch stops the unit watching. |
280 | +func (ud *unitData) stopWatch() error { |
281 | + ud.tomb.Kill(nil) |
282 | + return ud.tomb.Wait() |
283 | +} |
284 | + |
285 | +// serviceData watches the exposed flag changes of a service and passes them |
286 | +// to the firewaller for handling. |
287 | +type serviceData struct { |
288 | + // TODO(mue) Fill with life. |
289 | + service *state.Service |
290 | exposed bool |
291 | } |
292 | |
293 | === modified file 'worker/firewaller/firewaller_test.go' |
294 | --- worker/firewaller/firewaller_test.go 2012-07-19 11:16:40 +0000 |
295 | +++ worker/firewaller/firewaller_test.go 2012-07-25 08:25:37 +0000 |
296 | @@ -5,6 +5,7 @@ |
297 | . "launchpad.net/gocheck" |
298 | "launchpad.net/juju-core/environs/dummy" |
299 | "launchpad.net/juju-core/log" |
300 | + "launchpad.net/juju-core/state" |
301 | "launchpad.net/juju-core/state/testing" |
302 | coretesting "launchpad.net/juju-core/testing" |
303 | "launchpad.net/juju-core/worker/firewaller" |
304 | @@ -58,7 +59,7 @@ |
305 | case e := <-logHook.event: |
306 | got = append(got, e) |
307 | case <-time.After(500 * time.Millisecond): |
308 | - c.Fatalf("expected %q; timed out after %q", expect, got) |
309 | + c.Fatalf("expected %q; timed out, got %q", expect, got) |
310 | } |
311 | } |
312 | select { |
313 | @@ -75,7 +76,8 @@ |
314 | type FirewallerSuite struct { |
315 | coretesting.LoggingSuite |
316 | testing.StateSuite |
317 | - op <-chan dummy.Operation |
318 | + op <-chan dummy.Operation |
319 | + charm *state.Charm |
320 | } |
321 | |
322 | var _ = Suite(&FirewallerSuite{}) |
323 | @@ -116,16 +118,56 @@ |
324 | c.Assert(err, IsNil) |
325 | |
326 | assertEvents(c, []string{ |
327 | - fmt.Sprint("started tracking machine ", m1.Id()), |
328 | - fmt.Sprint("started tracking machine ", m2.Id()), |
329 | - fmt.Sprint("started tracking machine ", m3.Id()), |
330 | + fmt.Sprint("started watching machine ", m1.Id()), |
331 | + fmt.Sprint("started watching machine ", m2.Id()), |
332 | + fmt.Sprint("started watching machine ", m3.Id()), |
333 | }) |
334 | |
335 | err = s.State.RemoveMachine(m2.Id()) |
336 | c.Assert(err, IsNil) |
337 | |
338 | assertEvents(c, []string{ |
339 | - fmt.Sprint("stopped tracking machine ", m2.Id()), |
340 | + fmt.Sprint("stopped watching machine ", m2.Id()), |
341 | + }) |
342 | + |
343 | + c.Assert(fw.Stop(), IsNil) |
344 | +} |
345 | + |
346 | +func (s *FirewallerSuite) TestAssignUnassignUnit(c *C) { |
347 | + fw, err := firewaller.NewFirewaller(s.State) |
348 | + c.Assert(err, IsNil) |
349 | + |
350 | + setUpLogHook() |
351 | + defer tearDownLogHook() |
352 | + |
353 | + m1, err := s.State.AddMachine() |
354 | + c.Assert(err, IsNil) |
355 | + m2, err := s.State.AddMachine() |
356 | + c.Assert(err, IsNil) |
357 | + s.charm = s.AddTestingCharm(c, "dummy") |
358 | + s1, err := s.State.AddService("wordpress", s.charm) |
359 | + c.Assert(err, IsNil) |
360 | + u1, err := s1.AddUnit() |
361 | + c.Assert(err, IsNil) |
362 | + err = u1.AssignToMachine(m1) |
363 | + c.Assert(err, IsNil) |
364 | + u2, err := s1.AddUnit() |
365 | + c.Assert(err, IsNil) |
366 | + err = u2.AssignToMachine(m2) |
367 | + c.Assert(err, IsNil) |
368 | + |
369 | + assertEvents(c, []string{ |
370 | + fmt.Sprint("started watching machine ", m1.Id()), |
371 | + fmt.Sprint("started watching machine ", m2.Id()), |
372 | + fmt.Sprint("started watching unit ", u1.Name()), |
373 | + fmt.Sprint("started watching unit ", u2.Name()), |
374 | + }) |
375 | + |
376 | + err = u1.UnassignFromMachine() |
377 | + c.Assert(err, IsNil) |
378 | + |
379 | + assertEvents(c, []string{ |
380 | + fmt.Sprint("stopped watching unit ", u1.Name()), |
381 | }) |
382 | |
383 | c.Assert(fw.Stop(), IsNil) |
LGTM

https://codereview.appspot.com/6404051/diff/1/worker/firewaller/firewaller.go
File worker/firewaller/firewaller.go (right):

https://codereview.appspot.com/6404051/diff/1/worker/firewaller/firewaller.go#newcode101
worker/firewaller/firewaller.go:101: if change.stateChange != nil {
when could change.stateChange be nil?

https://codereview.appspot.com/6404051/diff/1/worker/firewaller/firewaller.go#newcode153
worker/firewaller/firewaller.go:153: func (m *machine) loop(fw *Firewaller) {
perhaps a comment on this function?
// loop waits for changes to the machine's units
// and funnels them to the firewall's machineUnitsChanges
// channel.

https://codereview.appspot.com/6404051/