Merge lp:~themue/juju-core/go-cmd-firewall into lp:~juju/juju-core/trunk

Proposed by Frank Mueller
Status: Work in progress
Proposed branch: lp:~themue/juju-core/go-cmd-firewall
Merge into: lp:~juju/juju-core/trunk
Diff against target: 850 lines (+825/-2)
3 files modified
cmd/cmd_test.go (+5/-2)
cmd/firewall.go (+472/-0)
cmd/firewall_test.go (+348/-0)
To merge this branch: bzr merge lp:~themue/juju-core/go-cmd-firewall
Reviewer                  Review Type   Date Requested   Status
The Go Language Gophers                                  Pending
Review via email: mp+114212@code.launchpad.net

Description of the change

cmd: added port of state/firewall.py

The port of the current Python firewall has been moved to
the cmd package because it is only used by the provisioning
agent. The implementation is split into several subtypes
and is complete; the tests cover services so far.

THIS PROPOSAL IS STILL EVOLVING.

https://codereview.appspot.com/6374047/

Roger Peppe (rogpeppe) wrote :

some initial superficial comments.

i'm still mulling it over.

https://codereview.appspot.com/6374047/diff/1/cmd/firewall.go
File cmd/firewall.go (right):

https://codereview.appspot.com/6374047/diff/1/cmd/firewall.go#newcode1
cmd/firewall.go:1: package cmd
this is the wrong place for this code.
given that it's really just part of the provisioning agent, it should be
alongside the PA code, in cmd/jujud, or the provisioning package,
if/when that happens.

https://codereview.appspot.com/6374047/diff/1/cmd/firewall.go#newcode14
cmd/firewall.go:14: type Ports []state.Port
i'm not sure that this type or its methods deserve export.
not exporting the type means that environs implementors don't
necessarily need to depend on this package.

since we're talking about a set of ports, i wonder if
map[state.Port]bool might be a nicer representation.
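
A minimal sketch of the set representation suggested above, for illustration only. The Port struct is a hypothetical stand-in for state.Port (assumed to be a comparable struct with Protocol and Number fields, as the diff's comparison code implies), and portSet, newPortSet and missing are made-up unexported names, not part of the branch.

```go
package main

import (
	"fmt"
	"sort"
)

// Port is a hypothetical stand-in for state.Port.
type Port struct {
	Protocol string
	Number   int
}

// portSet is an unexported set of ports keyed by value.
type portSet map[Port]bool

// newPortSet builds a set from the given ports.
func newPortSet(ports ...Port) portSet {
	s := make(portSet)
	for _, p := range ports {
		s[p] = true
	}
	return s
}

// missing returns the ports that exist in s but not in other,
// sorted by protocol and then number so the result is stable.
func (s portSet) missing(other portSet) []Port {
	var out []Port
	for p := range s {
		if !other[p] {
			out = append(out, p)
		}
	}
	sort.Slice(out, func(i, j int) bool {
		if out[i].Protocol != out[j].Protocol {
			return out[i].Protocol < out[j].Protocol
		}
		return out[i].Number < out[j].Number
	})
	return out
}

func main() {
	policy := newPortSet(Port{"tcp", 80}, Port{"tcp", 8080})
	current := newPortSet(Port{"tcp", 80}, Port{"tcp", 443})
	fmt.Println("to open: ", policy.missing(current))
	fmt.Println("to close:", current.missing(policy))
}
```

With a map, the membership test is a single lookup instead of the nested loop in Ports.Missing, and duplicate ports from several units collapse automatically. The cost is that iteration order is undefined, hence the explicit sort.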

https://codereview.appspot.com/6374047/diff/1/cmd/firewall.go#newcode55
cmd/firewall.go:55: OpenPorts(instanceId string, machineId int) (Ports, error)
i wonder if just "Ports" might be better here.
the name OpenPorts seems to suggest it's just a version of OpenPort that
opens more than one port.

https://codereview.appspot.com/6374047/diff/1/cmd/firewall.go#newcode106
cmd/firewall.go:106: // startd the watchers of the services exposed flag.
s/startd/starts/

https://codereview.appspot.com/6374047/diff/1/cmd/firewall.go#newcode159
cmd/firewall.go:159: func (fw *Firewall) AddOpenClosePortsOnMachineObserver(observer func(int)) {
i'm not sure i understand the use-case for this function. isn't
observing ports exactly what all this code is about? why do we need
another hook? if it's about testing, i'd have thought that the
PortsManager interface should give us a sufficient interface.

https://codereview.appspot.com/6374047/diff/1/cmd/firewall.go#newcode280
cmd/firewall.go:280: type serviceExposedWatcher struct {
see comment on newServiceUnitsWatcher below.

serviceExposedPortManager, perhaps?

https://codereview.appspot.com/6374047/diff/1/cmd/firewall.go#newcode332
cmd/firewall.go:332: }
i think this may read better:

sew.exposed = exposed
if exposed {
     sew.suw = newServiceUnitsWatcher(sew.fw, sew)
}

https://codereview.appspot.com/6374047/diff/1/cmd/firewall.go#newcode360
cmd/firewall.go:360: // the passed service.
this isn't really a watcher in the sense of the other watchers in state.
more of a serviceUnitPortManager, perhaps?

https://codereview.appspot.com/6374047/diff/1/cmd/firewall.go#newcode414
cmd/firewall.go:414: type unitPortsWatcher struct {
unitPortManager?

https://codereview.appspot.com/6374047/

Gustavo Niemeyer (niemeyer) wrote :

Frank, I'm sure a lot of what is in this branch will be useful, but this
is not the way we should be pushing this kind of feature. 700 lines of
code, apparently in a location that, agreeing with roger, is not the
right one, with 5 lines of tests.

I understand you say it's an evolving proposal, but we've covered this
kind of thing before, and it's going to be painful again but I'll have
to ask you to chunk this down into smaller portions that we can more
easily agree on.

I'm happy to have a run down with you tomorrow to see how to best chunk
this up, and what is the first piece for the puzzle.

https://codereview.appspot.com/6374047/

Unmerged revisions

266. By Frank Mueller

cmd: added port of state/firewall.py

265. By Frank Mueller

state: preparing for trunk merging

264. By Frank Mueller

cmd: prepared firewall testing

263. By Frank Mueller

cmd: added initial version of the firewall

262. By Dave Cheney

state: machine.InstanceId error proposal

machine.InstanceId returns NoInstanceIdError when
there is no instance id set in the machine *ConfigNode.

R=niemeyer
CC=
https://codereview.appspot.com/6297105

261. By Gustavo Niemeyer

mstate: update to the mgo API version r2012.06.22

This was already reviewed in:

https://code.launchpad.net/~niemeyer/juju-core/store-update-mgo/+merge/111546

But Launchpad simply can't serve that branch without errors anymore. :-(

R=
CC=
https://codereview.appspot.com/6336052

260. By Gustavo Niemeyer

store: update mgo API to r2012.06.22

R=rog
CC=
https://codereview.appspot.com/6327049

259. By Frank Mueller

state: adding of FlagWatcher and integration into Service

The port of Firewall (in a different CL) needs the implementation
of the FlagWatcher and its integration into Service to watch the
exposed flag. Both are done in this CL.

R=niemeyer
CC=
https://codereview.appspot.com/6332048

258. By Roger Peppe

cmd/jujud: make provisioner use StateInfo from environment.

This means we can pass in a localhost address to the provisioning
agent and it will nonetheless use the correct address for zookeeper
when firing up clients.

Some rearrangement of testing code was necessary too.

R=niemeyer
CC=
https://codereview.appspot.com/6336047

257. By Dave Cheney

environs/dummy: enhance start/stop operations

This is an alternate proposal to enhance dummy to return
the actual environ.Instance objects created during
Start/StopInstance operations.

This would be used in the PA tests to double check that
m, err := s.st.AddMachine(n); m.InstanceId matches the
Instance.Id() provided by dummy.

R=niemeyer
CC=
https://codereview.appspot.com/6304104

Preview Diff

1=== modified file 'cmd/cmd_test.go'
2--- cmd/cmd_test.go 2012-06-21 20:40:39 +0000
3+++ cmd/cmd_test.go 2012-07-10 15:08:35 +0000
4@@ -3,11 +3,14 @@
5 import (
6 . "launchpad.net/gocheck"
7 "launchpad.net/juju-core/cmd"
8+ "launchpad.net/juju-core/testing"
9 "path/filepath"
10- "testing"
11+ stdtesting "testing"
12 )
13
14-func Test(t *testing.T) { TestingT(t) }
15+func Test(t *stdtesting.T) {
16+ testing.ZkTestPackage(t)
17+}
18
19 type CmdSuite struct{}
20
21
22=== added file 'cmd/firewall.go'
23--- cmd/firewall.go 1970-01-01 00:00:00 +0000
24+++ cmd/firewall.go 2012-07-10 15:08:35 +0000
25@@ -0,0 +1,472 @@
26+package cmd
27+
28+import (
29+ "errors"
30+ "fmt"
31+ "launchpad.net/juju-core/environs"
32+ "launchpad.net/juju-core/log"
33+ "launchpad.net/juju-core/state"
34+ "launchpad.net/juju-core/state/watcher"
35+ "launchpad.net/tomb"
36+)
37+
38+// Ports is a list of ports.
39+type Ports []state.Port
40+
41+// Missing returns the ports that exist in p but not in other.
42+func (p Ports) Missing(other Ports) (missing Ports) {
43+next:
44+ for _, portA := range p {
45+ for _, portB := range other {
46+ if portA.Number == portB.Number && portA.Protocol == portB.Protocol {
47+ continue next
48+ }
49+ }
50+ missing = append(missing, portA)
51+ }
52+ return
53+}
54+
55+// Len is the number of ports.
56+func (p Ports) Len() int {
57+ return len(p)
58+}
59+
60+// Less returns whether the port with index i should sort
61+// before the port with index j.
62+func (p Ports) Less(i, j int) bool {
63+ if p[i].Protocol == p[j].Protocol {
64+ return p[i].Number < p[j].Number
65+ }
66+ return p[i].Protocol < p[j].Protocol
67+}
68+
69+// Swap swaps the ports with indexes i and j.
70+func (p Ports) Swap(i, j int) {
71+ p[i], p[j] = p[j], p[i]
72+}
73+
74+// PortManager defines the interface for the component
75+// responsible for the management of the ports on the
76+// instances and machines.
77+type PortManager interface {
78+ // OpenPorts returns the current open ports of the
79+ // machine on the instance.
80+ OpenPorts(instanceId string, machineId int) (Ports, error)
81+ // OpenPort opens the passed port on the machine and instance.
82+ OpenPort(instanceId string, machineId int, port state.Port) error
83+ // ClosePort closes the passed port on the machine and instance.
84+ ClosePort(instanceId string, machineId int, port state.Port) error
85+}
86+
87+// Firewall manages the opening and closing of ports in the firewall.
88+type Firewall struct {
89+ state *state.State
90+ isRunning func() bool
91+ portManager PortManager
92+ watchedMachines map[int]*machineUnitsWatcher
93+ watchedServices map[string]*serviceExposedWatcher
94+ retryMachinesOnPortError map[int]bool
95+ openClosePortsObservers []func(*state.Unit)
96+ openClosePortsOnMachineObservers []func(int)
97+}
98+
99+// NewFirewall creates a new firewall. is Running is a function
100+// that returns whether the associated agent is running or not.
101+func NewFirewall(st *state.State, isRunning func() bool, portManager PortManager) *Firewall {
102+ return &Firewall{
103+ state: st,
104+ isRunning: isRunning,
105+ portManager: portManager,
106+ watchedMachines: make(map[int]*machineUnitsWatcher),
107+ watchedServices: make(map[string]*serviceExposedWatcher),
108+ retryMachinesOnPortError: make(map[int]bool),
109+ openClosePortsObservers: make([]func(*state.Unit), 0),
110+ openClosePortsOnMachineObservers: make([]func(int), 0),
111+ }
112+}
113+
114+// ProcessMachine ensures that a watch is setup per machine and
115+// performs any necessary retry.
116+func (fw *Firewall) ProcessMachine(machine *state.Machine) error {
117+ machineId := machine.Id()
118+ if fw.retryMachinesOnPortError[machineId] {
119+ delete(fw.retryMachinesOnPortError, machineId)
120+ if err := fw.OpenClosePortsOnMachine(machineId); err != nil {
121+ return err
122+ }
123+ }
124+ if fw.watchedMachines[machineId] == nil {
125+ fw.watchedMachines[machineId] = newMachineUnitsWatcher(fw, machineId)
126+ }
127+ return nil
128+}
129+
130+// WatchServiceChanges is called when services are added or removed and
131+// startd the watchers of the services exposed flag.
132+func (fw *Firewall) WatchServiceChanges(change *state.ServicesChange) error {
133+ for _, service := range change.Removed {
134+ sew, ok := fw.watchedServices[service.Name()]
135+ if ok {
136+ if err := sew.stop(); err != nil {
137+ return err
138+ }
139+ delete(fw.watchedServices, service.Name())
140+ }
141+ }
142+ for _, service := range change.Added {
143+ if _, ok := fw.watchedServices[service.Name()]; ok {
144+ // Already watched.
145+ continue
146+ }
147+ fw.watchedServices[service.Name()] = newServiceExposedWatcher(fw, service)
148+ }
149+ return nil
150+}
151+
152+// AddOpenClosePortsObserver adds an observer for the opening
153+// or closing of ports for a service unit.
154+func (fw *Firewall) AddOpenClosePortsObserver(observer func(*state.Unit)) {
155+ fw.openClosePortsObservers = append(fw.openClosePortsObservers, observer)
156+}
157+
158+// OpenClosePorts is called upon changes that *may* open/close
159+// ports for a service unit.
160+func (fw *Firewall) OpenClosePorts(unit *state.Unit) (err error) {
161+ defer errorContextf(&err, "can't open/close ports for unit %q", unit)
162+ if !fw.isRunning() {
163+ return errors.New("agent associated to firewall is not running")
164+ }
165+ defer func() {
166+ // Ensure that the observations runs after the
167+ // corresponding action completes. In particular, tests
168+ // that use observation depend on this ordering to ensure
169+ // that the action has in fact happened before they can
170+ // proceed.
171+ for _, observer := range fw.openClosePortsObservers {
172+ observer(unit)
173+ }
174+ }()
175+ machineId, err := unit.AssignedMachineId()
176+ if err != nil {
177+ return err
178+ }
179+ return fw.OpenClosePortsOnMachine(machineId)
180+}
181+
182+// AddOpenClosePortsOnMachineObserver adds an observer for the opening
183+// or closing of ports on a machine.
184+func (fw *Firewall) AddOpenClosePortsOnMachineObserver(observer func(int)) {
185+ fw.openClosePortsOnMachineObservers = append(fw.openClosePortsOnMachineObservers, observer)
186+}
187+
188+// OpenClosePortsOnMachine is called upon changes that *may* open/close
189+// ports for a machine.
190+func (fw *Firewall) OpenClosePortsOnMachine(machineId int) (err error) {
191+ defer errorContextf(&err, "can't open/close ports on machine %d", machineId)
192+ if !fw.isRunning() {
193+ return errors.New("agent associated to firewall is not running")
194+ }
195+ defer func() {
196+ if err == environs.ErrNoInstances {
197+ log.Printf("no provisioned instance for machine %d", machineId)
198+ } else if err != nil {
199+ log.Printf("got error in opening/closing ports, will retry")
200+ fw.retryMachinesOnPortError[machineId] = true
201+ }
202+ // Ensure that the observation runs after the corresponding
203+ // action completes. In particular, tests that use
204+ // observation depend on this ordering to ensure that this
205+ // action has happened before they can proceed.
206+ for _, observer := range fw.openClosePortsOnMachineObservers {
207+ observer(machineId)
208+ }
209+
210+ }()
211+ machine, err := fw.state.Machine(machineId)
212+ if err != nil {
213+ return err
214+ }
215+ instanceId, err := machine.InstanceId()
216+ if err != nil {
217+ return err
218+ }
219+ units, err := machine.Units()
220+ if err != nil {
221+ return err
222+ }
223+ policyPorts := Ports{}
224+ for _, unit := range units {
225+ service, err := fw.state.Service(unit.ServiceName())
226+ if err != nil {
227+ return err
228+ }
229+ exposed, err := service.IsExposed()
230+ if err != nil {
231+ return err
232+ }
233+ if exposed {
234+ ports, err := unit.OpenPorts()
235+ if err != nil {
236+ return err
237+ }
238+ policyPorts = append(policyPorts, ports...)
239+ }
240+ }
241+ currentPorts, err := fw.portManager.OpenPorts(instanceId, machineId)
242+ toOpen := policyPorts.Missing(currentPorts)
243+ toClose := currentPorts.Missing(policyPorts)
244+ for _, port := range toOpen {
245+ if err := fw.portManager.OpenPort(instanceId, machineId, port); err != nil {
246+ return err
247+ }
248+ }
249+ for _, port := range toClose {
250+ if err := fw.portManager.ClosePort(instanceId, machineId, port); err != nil {
251+ return err
252+ }
253+ }
254+ return nil
255+}
256+
257+// machineUnitsWatcher manages the opening and closing of ports in the firewall
258+// for one machine.
259+type machineUnitsWatcher struct {
260+ fw *Firewall
261+ machineId int
262+ tomb tomb.Tomb
263+}
264+
265+// newMachineUnitsWatcher creates a new watcher for the passed machine.
266+func newMachineUnitsWatcher(fw *Firewall, machineId int) *machineUnitsWatcher {
267+ muw := &machineUnitsWatcher{
268+ fw: fw,
269+ machineId: machineId,
270+ }
271+ go muw.loop()
272+ return muw
273+}
274+
275+// loop watches assigned units for changes possible require port management.
276+func (muw *machineUnitsWatcher) loop() {
277+ defer muw.tomb.Done()
278+ machine, err := muw.fw.state.Machine(muw.machineId)
279+ if err != nil {
280+ muw.tomb.Kill(err)
281+ return
282+ }
283+ unitsWatcher := machine.WatchUnits()
284+ defer watcher.Stop(unitsWatcher, &muw.tomb)
285+
286+ for {
287+ select {
288+ case <-muw.tomb.Dying():
289+ return
290+ case _, ok := <-unitsWatcher.Changes():
291+ if !ok {
292+ muw.tomb.Kill(watcher.MustErr(unitsWatcher))
293+ return
294+ }
295+ if err := muw.fw.OpenClosePortsOnMachine(muw.machineId); err != nil {
296+ muw.tomb.Kill(err)
297+ return
298+ }
299+ }
300+ }
301+}
302+
303+// serviceExposedWatcher manages the opening and closing of ports in the firewall
304+// for a service.
305+type serviceExposedWatcher struct {
306+ fw *Firewall
307+ service *state.Service
308+ suw *serviceUnitsWatcher
309+ exposed bool
310+ tomb tomb.Tomb
311+}
312+
313+// newServiceExposedWatcher creates a new watcher for the passed service.
314+func newServiceExposedWatcher(fw *Firewall, service *state.Service) *serviceExposedWatcher {
315+ sew := &serviceExposedWatcher{
316+ fw: fw,
317+ service: service,
318+ }
319+ go sew.loop()
320+ return sew
321+}
322+
323+// loop watches if the exposed flag of the service is set.
324+func (sew *serviceExposedWatcher) loop() {
325+ log.Debugf("start watching of %q on changes to being exposed", sew.service)
326+ defer sew.tomb.Done()
327+ exposedWatcher := sew.service.WatchExposed()
328+ defer watcher.Stop(exposedWatcher, &sew.tomb)
329+
330+ for {
331+ select {
332+ case <-sew.tomb.Dying():
333+ return
334+ case exposed, ok := <-exposedWatcher.Changes():
335+ if !ok {
336+ sew.tomb.Kill(watcher.MustErr(exposedWatcher))
337+ return
338+ }
339+ if !sew.fw.isRunning() {
340+ sew.tomb.Kill(fmt.Errorf("agent associated to firewall is not running"))
341+ return
342+
343+ }
344+ if exposed {
345+ log.Debugf("service %q exposed flag is set", sew.service)
346+ } else {
347+ log.Debugf("service %q exposed flag is unset", sew.service)
348+ }
349+ units, err := sew.service.AllUnits()
350+ if err != nil {
351+ log.Debugf("stopping watch on %q: %v", sew.service, err)
352+ sew.tomb.Kill(err)
353+ return
354+ }
355+ for _, unit := range units {
356+ sew.fw.OpenClosePorts(unit)
357+ }
358+ if !exposed {
359+ sew.exposed = false
360+ } else {
361+ sew.exposed = true
362+ sew.suw = newServiceUnitsWatcher(sew.fw, sew)
363+ }
364+ }
365+ }
366+}
367+
368+// stop stops the service exposed watcher and returns any error
369+// encountered while watching.
370+func (sew *serviceExposedWatcher) stop() error {
371+ sew.tomb.Kill(nil)
372+ return sew.tomb.Wait()
373+}
374+
375+// serviceUnitsWatcher manages the opening and closing of ports in the firewall
376+// for the units of one service.
377+type serviceUnitsWatcher struct {
378+ fw *Firewall
379+ sew *serviceExposedWatcher
380+ upws map[string]*unitPortsWatcher
381+ tomb tomb.Tomb
382+}
383+
384+// newServiceUnitsWatcher creates a new watcher for the units of
385+// the passed service.
386+func newServiceUnitsWatcher(fw *Firewall, sew *serviceExposedWatcher) *serviceUnitsWatcher {
387+ suw := &serviceUnitsWatcher{
388+ fw: fw,
389+ sew: sew,
390+ upws: make(map[string]*unitPortsWatcher),
391+ }
392+ go suw.loop()
393+ return suw
394+}
395+
396+// loop watches on the units of its service.
397+func (suw *serviceUnitsWatcher) loop() {
398+ log.Debugf("start watching of service units for service %q", suw.sew.service)
399+ defer suw.tomb.Done()
400+ defer func() {
401+ for _, upw := range suw.upws {
402+ upw.stop()
403+ }
404+ }()
405+ unitsWatcher := suw.sew.service.WatchUnits()
406+ defer watcher.Stop(unitsWatcher, &suw.tomb)
407+
408+ for {
409+ select {
410+ case <-suw.tomb.Dying():
411+ return
412+ case <-suw.sew.tomb.Dying():
413+ return
414+ case changes, ok := <-unitsWatcher.Changes():
415+ if !ok {
416+ suw.tomb.Kill(watcher.MustErr(unitsWatcher))
417+ return
418+ }
419+ if !suw.fw.isRunning() || !suw.sew.exposed {
420+ suw.tomb.Kill(fmt.Errorf("agent associated to firewall is not running or service is not exposed"))
421+ return
422+ }
423+ for _, unit := range changes.Removed {
424+ if !suw.fw.isRunning() {
425+ suw.tomb.Kill(fmt.Errorf("agent associated to firewall is not running"))
426+ }
427+ delete(suw.upws, unit.Name())
428+
429+ suw.fw.OpenClosePorts(unit)
430+ }
431+ for _, unit := range changes.Added {
432+ suw.upws[unit.Name()] = newUnitPortsWatcher(suw.fw, suw, unit)
433+ }
434+ }
435+ }
436+}
437+
438+// unitPortsWatcher watches the opening and closing of the ports for a unit.
439+type unitPortsWatcher struct {
440+ fw *Firewall
441+ suw *serviceUnitsWatcher
442+ unit *state.Unit
443+ tomb tomb.Tomb
444+}
445+
446+// newUnitPortsWatch creates the new watcher for the ports of the passed unit.
447+func newUnitPortsWatcher(fw *Firewall, suw *serviceUnitsWatcher, unit *state.Unit) *unitPortsWatcher {
448+ upw := &unitPortsWatcher{
449+ fw: fw,
450+ suw: suw,
451+ unit: unit,
452+ }
453+ go upw.loop()
454+ return upw
455+}
456+
457+// loop watches on the ports of its unit.
458+func (upw *unitPortsWatcher) loop() {
459+ log.Debugf("start watching of ports for unit %q", upw.unit)
460+ defer upw.tomb.Done()
461+ portsWatcher := upw.unit.WatchPorts()
462+ defer watcher.Stop(portsWatcher, &upw.tomb)
463+
464+ for {
465+ select {
466+ case <-upw.tomb.Dying():
467+ return
468+ case _, ok := <-portsWatcher.Changes():
469+ if !ok {
470+ upw.tomb.Kill(watcher.MustErr(portsWatcher))
471+ return
472+ }
473+ _, unitIsWatched := upw.suw.upws[upw.unit.Name()]
474+ if !upw.fw.isRunning() || !upw.suw.sew.exposed || !unitIsWatched {
475+ upw.tomb.Kill(fmt.Errorf("agent associated to firewall is not running or service is not exposed"))
476+ return
477+ }
478+ upw.fw.OpenClosePorts(upw.unit)
479+ }
480+ }
481+}
482+
483+// stop stops the unit ports watcher and returns any error
484+// encountered while watching.
485+func (upw *unitPortsWatcher) stop() error {
486+ upw.tomb.Kill(nil)
487+ return upw.tomb.Wait()
488+}
489+
490+// errorContextf prefixes any error stored in err with text formatted
491+// according to the format specifier. If err does not contain an error,
492+// errorContextf does nothing.
493+func errorContextf(err *error, format string, args ...interface{}) {
494+ if *err != nil {
495+ *err = errors.New(fmt.Sprintf(format, args...) + ": " + (*err).Error())
496+ }
497+}
498
499=== added file 'cmd/firewall_test.go'
500--- cmd/firewall_test.go 1970-01-01 00:00:00 +0000
501+++ cmd/firewall_test.go 2012-07-10 15:08:35 +0000
502@@ -0,0 +1,348 @@
503+package cmd_test
504+
505+import (
506+ "fmt"
507+ . "launchpad.net/gocheck"
508+ "launchpad.net/juju-core/cmd"
509+ "launchpad.net/juju-core/state"
510+ "launchpad.net/juju-core/state/testing"
511+ "sort"
512+ "time"
513+)
514+
515+type FirewallSuite struct {
516+ testing.StateSuite
517+ charm *state.Charm
518+ running bool
519+ portManager cmd.PortManager
520+ fw *cmd.Firewall
521+ servicesWatcher *state.ServicesWatcher
522+ seenUnitChan chan bool
523+ seenMachineChan chan bool
524+}
525+
526+func (s *FirewallSuite) SetUpTest(c *C) {
527+ s.StateSuite.SetUpTest(c)
528+ s.charm = s.AddTestingCharm(c, "dummy")
529+}
530+
531+func (s *FirewallSuite) TearDownTest(c *C) {
532+ s.StateSuite.TearDownTest(c)
533+}
534+
535+func (s *FirewallSuite) startFirewall() {
536+ s.portManager = newPortManagerMock()
537+ s.fw = cmd.NewFirewall(s.State, func() bool { return s.running }, s.portManager)
538+ s.servicesWatcher = s.State.WatchServices()
539+ go func() {
540+ for change := range s.servicesWatcher.Changes() {
541+ s.fw.WatchServiceChanges(change)
542+ }
543+ }()
544+ s.seenUnitChan = make(chan bool)
545+ s.seenMachineChan = make(chan bool)
546+ s.running = true
547+}
548+
549+func (s *FirewallSuite) stopFirewall() {
550+ s.servicesWatcher.Stop()
551+ s.fw = nil
552+}
553+
554+func (s *FirewallSuite) observeUnits(unitNames ...string) func() error {
555+ seen := map[string]bool{}
556+ s.fw.AddOpenClosePortsObserver(func(unit *state.Unit) {
557+ seen[unit.Name()] = true
558+ for _, unitName := range unitNames {
559+ if seen[unitName] {
560+ select {
561+ case s.seenUnitChan <- true:
562+ default:
563+ }
564+ }
565+ }
566+ })
567+ return func() error {
568+ select {
569+ case <-s.seenUnitChan:
570+ return nil
571+ case <-time.After(2 * time.Second):
572+ return fmt.Errorf("timeout during wait for observed units")
573+ }
574+ return nil
575+ }
576+}
577+
578+func (s *FirewallSuite) observeMachines(machineIds ...int) func() error {
579+ seen := map[int]bool{}
580+ s.fw.AddOpenClosePortsOnMachineObserver(func(machineId int) {
581+ seen[machineId] = true
582+ for _, machineId := range machineIds {
583+ if seen[machineId] {
584+ select {
585+ case s.seenMachineChan <- true:
586+ default:
587+ }
588+ }
589+ }
590+ })
591+ return func() error {
592+ select {
593+ case <-s.seenMachineChan:
594+ return nil
595+ case <-time.After(2 * time.Second):
596+ return fmt.Errorf("timeout during wait for observed machines")
597+ }
598+ return nil
599+ }
600+}
601+
602+func (s *FirewallSuite) providerPorts(c *C, machine *state.Machine) cmd.Ports {
603+ instanceId, err := machine.InstanceId()
604+ c.Assert(err, IsNil)
605+ ports, err := s.portManager.OpenPorts(instanceId, machine.Id())
606+ c.Assert(err, IsNil)
607+ sort.Sort(ports)
608+ return ports
609+}
610+
611+var _ = Suite(&FirewallSuite{})
612+
613+func (s *FirewallSuite) TestWatchServiceChanges(c *C) {
614+ // Verify that a service unit is checked whenever a change
615+ // occurs such that ports may need to be opened and/or closed
616+ // for the machine corresponding to a given service unit.
617+ s.startFirewall()
618+
619+ wait := s.observeUnits("wordpress/0")
620+ wordpress, err := s.State.AddService("wordpress", s.charm)
621+ c.Assert(err, IsNil)
622+ _, err = wordpress.AddUnit()
623+ c.Assert(err, IsNil)
624+ err = wordpress.SetExposed()
625+ c.Assert(err, IsNil)
626+ c.Assert(wait(), IsNil)
627+
628+ err = wordpress.ClearExposed()
629+ c.Assert(err, IsNil)
630+ c.Assert(wait(), IsNil)
631+
632+ err = wordpress.SetExposed()
633+ c.Assert(err, IsNil)
634+ c.Assert(wait(), IsNil)
635+
636+ s.stopFirewall()
637+}
638+
639+func (s *FirewallSuite) TestAddRemoveUnitsForExposedService(c *C) {
640+ // Verify that adding/removing service units for an exposed
641+ // service triggers the appropriate firewall management of
642+ // opening/closing ports on the machines for the corresponding
643+ //service units.
644+ s.startFirewall()
645+
646+ wordpress, err := s.State.AddService("wordpress", s.charm)
647+ c.Assert(err, IsNil)
648+ err = wordpress.SetExposed()
649+ c.Assert(err, IsNil)
650+
651+ wait := s.observeUnits("wordpress/0", "wordpress/1")
652+ wordpress0, err := wordpress.AddUnit()
653+ c.Assert(err, IsNil)
654+ c.Assert(wait(), IsNil)
655+
656+ wait = s.observeUnits("wordpress/2")
657+ err = wordpress.RemoveUnit(wordpress0)
658+ c.Assert(err, IsNil)
659+ _, err = wordpress.AddUnit()
660+ c.Assert(err, IsNil)
661+ c.Assert(wait(), IsNil)
662+
663+ s.stopFirewall()
664+}
665+
666+func (s *FirewallSuite) TestOpenClosePorts(c *C) {
667+ // Verify that opening/closing ports triggers the appropriate
668+ // firewall management for the corresponding service units.
669+ s.startFirewall()
670+
671+ wait := s.observeUnits("wordpress/0")
672+ wordpress, err := s.State.AddService("wordpress", s.charm)
673+ c.Assert(err, IsNil)
674+ err = wordpress.SetExposed()
675+ c.Assert(err, IsNil)
676+ wordpress0, err := wordpress.AddUnit()
677+ _, err = wordpress.AddUnit()
678+ wordpress0.OpenPort("tcp", 443)
679+ wordpress0.OpenPort("tcp", 80)
680+ wordpress0.ClosePort("tcp", 443)
681+ c.Assert(wait(), IsNil)
682+
683+ wait = s.observeUnits("wordpress/1", "wordpress/3")
684+ wordpress3, err := wordpress.AddUnit()
685+ wordpress0.OpenPort("tcp", 53)
686+ wordpress3.OpenPort("tcp", 80)
687+ c.Assert(wait(), IsNil)
688+
689+ wait = s.observeUnits("wordpress/0", "wordpress/1", "wordpress/3")
690+ wordpress.ClearExposed()
691+ c.Assert(wait(), IsNil)
692+
693+ s.stopFirewall()
694+}
695+
696+func (s *FirewallSuite) TestRemoveService(c *C) {
697+ // Verify that firewall for corresponding service units
698+ // is triggered upon the service's removal.
699+ s.startFirewall()
700+
701+ wait := s.observeUnits("wordpress/0", "wordpress/1")
702+ wordpress, err := s.State.AddService("wordpress", s.charm)
703+ c.Assert(err, IsNil)
704+ wordpress.AddUnit()
705+ wordpress.AddUnit()
706+ wordpress.SetExposed()
707+ c.Assert(wait(), IsNil)
708+
709+ // Do not clear the exposed flag prior to removal, triggering
710+ // should still occur as expected
711+ s.State.RemoveService(wordpress)
712+
713+ s.stopFirewall()
714+}
715+
716+func (s *FirewallSuite) TestPortManagementForUnexposedServiceIsANop(c *C) {
717+ // Verify that activity on an unexposed service does NOT
718+ // trigger firewall for the corresponding service unit.
719+ s.startFirewall()
720+
721+ wait := s.observeUnits("not-called")
722+ wordpress, err := s.State.AddService("wordpress", s.charm)
723+ c.Assert(err, IsNil)
724+ wordpress0, err := wordpress.AddUnit()
725+ c.Assert(err, IsNil)
726+ wordpress0.OpenPort("tcp", 53)
727+ c.Assert(wait(), ErrorMatches, `timeout during wait for observed units`)
728+
729+ s.stopFirewall()
730+}
731+
732+func (s *FirewallSuite) TestProvisioningAgentRestart(c *C) {
733+ // Verify that firewall management is correct if the agent restarts.
734+ // In particular, this test verifies that all state relevant for
735+ // firewall management is stored in ZK and not in the agent
736+ // itself.
737+ wordpress, err := s.State.AddService("wordpress", s.charm)
738+ c.Assert(err, IsNil)
739+ wordpress0, err := wordpress.AddUnit()
740+ wordpress1, err := wordpress.AddUnit()
741+ wordpress1.OpenPort("tcp", 443)
742+ wordpress1.OpenPort("tcp", 80)
743+ wordpress.SetExposed()
744+
745+ // Simulate agent start.
746+ s.startFirewall()
747+
748+ wait := s.observeUnits("wordpress/0", "wordpress/1")
749+ wordpress0.OpenPort("tcp", 53)
750+ wordpress1.ClosePort("tcp", 443)
751+ c.Assert(wait(), IsNil)
752+
753+ wait = s.observeUnits("wordpress/1")
754+ wordpress1.ClosePort("tcp", 80)
755+
756+ wait = s.observeUnits("wordpress/0", "wordpress/1")
757+ wordpress.ClearExposed()
758+ c.Assert(wait(), IsNil)
759+
760+ s.stopFirewall()
761+}
762+
763+func (s *FirewallSuite) TestOpenClosePortsOnMachine(c *C) {
764+ // Verify opening/closing ports on a machine works properly.
765+ s.startFirewall()
766+
767+ machine, err := s.State.AddMachine()
768+ c.Assert(err, IsNil)
769+ machine.SetInstanceId("m-0")
770+ s.fw.ProcessMachine(machine)
771+
772+ // Expose a service.
773+ wordpress, err := s.State.AddService("wordpress", s.charm)
774+ c.Assert(err, IsNil)
775+ wordpress.SetExposed()
776+ wordpress0, err := wordpress.AddUnit()
777+ wordpress0.OpenPort("tcp", 80)
778+ wordpress0.OpenPort("tcp", 443)
779+ wordpress0.AssignToMachine(machine)
780+ err = s.fw.OpenClosePortsOnMachine(machine.Id())
781+ c.Assert(err, IsNil)
782+ ports := s.providerPorts(c, machine)
783+ c.Assert(ports, DeepEquals, cmd.Ports{{"tcp", 80}, {"tcp", 443}})
784+
785+ // Change port setup.
786+ wordpress0.OpenPort("tcp", 8080)
787+ wordpress0.ClosePort("tcp", 443)
788+ err = s.fw.OpenClosePortsOnMachine(machine.Id())
789+ c.Assert(err, IsNil)
790+ ports = s.providerPorts(c, machine)
791+ c.Assert(ports, DeepEquals, cmd.Ports{{"tcp", 80}, {"tcp", 8080}})
792+
793+ s.stopFirewall()
794+}
795+
796+type portManagerMock struct {
797+ ports map[string]map[int]map[state.Port]bool
798+}
799+
800+func newPortManagerMock() cmd.PortManager {
801+ return &portManagerMock{make(map[string]map[int]map[state.Port]bool)}
802+}
803+
804+func (m *portManagerMock) OpenPorts(instanceId string, machineId int) (cmd.Ports, error) {
805+ instance, ok := m.ports[instanceId]
806+ if !ok {
807+ instance = make(map[int]map[state.Port]bool)
808+ m.ports[instanceId] = instance
809+ }
810+ machine, ok := instance[machineId]
811+ if !ok {
812+ machine = make(map[state.Port]bool)
813+ instance[machineId] = machine
814+ }
815+ ports := cmd.Ports{}
816+ for port, open := range machine {
817+ if open {
818+ ports = append(ports, port)
819+ }
820+ }
821+ return ports, nil
822+}
823+
824+func (m *portManagerMock) OpenPort(instanceId string, machineId int, port state.Port) error {
825+ instance, ok := m.ports[instanceId]
826+ if !ok {
827+ instance = make(map[int]map[state.Port]bool)
828+ m.ports[instanceId] = instance
829+ }
830+ machine, ok := instance[machineId]
831+ if !ok {
832+ machine = make(map[state.Port]bool)
833+ instance[machineId] = machine
834+ }
835+ machine[port] = true
836+ return nil
837+}
838+
839+func (m *portManagerMock) ClosePort(instanceId string, machineId int, port state.Port) error {
840+ instance, ok := m.ports[instanceId]
841+ if !ok {
842+ return fmt.Errorf("illegal instance %q", instanceId)
843+ }
844+ machine, ok := instance[machineId]
845+ if !ok {
846+ return fmt.Errorf("illegal machine %d", machineId)
847+ }
848+ delete(machine, port)
849+ return nil
850+}
