Merge lp:~aramh/juju-core/121-state-machiner-watchers-machine-principals7 into lp:~juju/juju-core/trunk

Proposed by Aram Hăvărneanu
Status: Work in progress
Proposed branch: lp:~aramh/juju-core/121-state-machiner-watchers-machine-principals7
Merge into: lp:~juju/juju-core/trunk
Prerequisite: lp:~aramh/juju-core/120-firewaller-new-watcher-units7
Diff against target: 764 lines (+278/-256)
7 files modified
cmd/jujud/machine.go (+1/-1)
state/machine_test.go (+92/-100)
state/unit.go (+1/-1)
state/watcher.go (+132/-140)
worker/machiner/export_test.go (+2/-2)
worker/machiner/machiner.go (+31/-10)
worker/machiner/machiner_test.go (+19/-2)
To merge this branch: bzr merge lp:~aramh/juju-core/121-state-machiner-watchers-machine-principals7
Reviewer Review Type Date Requested Status
The Go Language Gophers Pending
Review via email: mp+133504@code.launchpad.net

Description of the change

state, machiner: new style principal units watcher

Convert the machine principal units watcher to the new, id-only model
and refactor the machiner to make use of it.

https://codereview.appspot.com/6814108/
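In the id-only model, a watcher event carries nothing but unit names, and the consumer re-reads whatever state it needs for each name. The Go sketch below shows that consumer side under stated assumptions: a plain map stands in for state.State, life states are plain strings, and the handle function is hypothetical, not part of juju-core.

```go
package main

import "fmt"

// handle processes one id-only event. For every reported name it re-reads
// the current state (here a map lookup standing in for state.State) and
// records what it found. A name that can no longer be found is reported
// as removed. The same name may legitimately appear in several events.
func handle(ids []string, lookup map[string]string) []string {
	var actions []string
	for _, id := range ids {
		life, ok := lookup[id]
		if !ok {
			life = "removed"
		}
		actions = append(actions, id+": "+life)
	}
	return actions
}

func main() {
	// A buffered channel stands in for the watcher's Changes() channel.
	changes := make(chan []string, 2)
	changes <- []string{"mysql/0", "mysql/1"}
	changes <- []string{"mysql/1"}
	close(changes)

	lookup := map[string]string{"mysql/0": "alive", "mysql/1": "dying"}
	for ids := range changes {
		fmt.Println(handle(ids, lookup))
	}
}
```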

Revision history for this message
Aram Hăvărneanu (aramh) wrote :

Reviewers: mp+133504_code.launchpad.net,

Message:
Please take a look.

Description:
state, machiner: new style principal units watcher

Convert the machine principal units watcher to the new, id-only model
and refactor the machiner to make use of it.

https://code.launchpad.net/~aramh/juju-core/121-state-machiner-watchers-machine-principals7/+merge/133504

Requires:
https://code.launchpad.net/~aramh/juju-core/120-firewaller-new-watcher-units7/+merge/133269

(do not edit description out of merge proposal)

Please review this at https://codereview.appspot.com/6814108/

Affected files:
   A [revision details]
   M cmd/jujud/machine.go
   M state/machine_test.go
   M state/unit.go
   M state/watcher.go
   M worker/machiner/export_test.go
   M worker/machiner/machiner.go
   M worker/machiner/machiner_test.go

Revision history for this message
Roger Peppe (rogpeppe) wrote :

mostly looks good. all superficial comments, other than the logic and
tests in worker/machiner.

https://codereview.appspot.com/6814108/diff/2001/cmd/jujud/machine.go
File cmd/jujud/machine.go (right):

https://codereview.appspot.com/6814108/diff/2001/cmd/jujud/machine.go#newcode107
cmd/jujud/machine.go:107: t = machiner.NewMachiner(m, st, &a.Conf.StateInfo, a.Conf.DataDir)
i think st should probably be the first argument here, so the workers
are consistent in that respect.

https://codereview.appspot.com/6814108/diff/2001/state/machine_test.go
File state/machine_test.go (right):

https://codereview.appspot.com/6814108/diff/2001/state/machine_test.go#newcode763
state/machine_test.go:763: var machinePrincipalsWatchTests = []struct {
can we please leave this in the same place, please, so i can easily see
what has remained the same?

https://codereview.appspot.com/6814108/diff/2001/state/machine_test.go#newcode771
state/machine_test.go:771: []string(nil),
just nil?

https://codereview.appspot.com/6814108/diff/2001/state/unit.go
File state/unit.go (right):

https://codereview.appspot.com/6814108/diff/2001/state/unit.go#newcode409
state/unit.go:409: func (u *Unit) IsAssignedTo(m *Machine) bool {
is this necessary? can't we just use AssignedMachineId instead?

https://codereview.appspot.com/6814108/diff/2001/state/watcher.go
File state/watcher.go (right):

https://codereview.appspot.com/6814108/diff/2001/state/watcher.go#newcode1259
state/watcher.go:1259: type MachinePrincipalUnitsWatcher struct {
please can we put code block moves in their own CL? i don't disagree on
principle (it is indeed better next to the MachineUnitsWatcher) but it
means we can't easily see the diffs.

https://codereview.appspot.com/6814108/diff/2001/state/watcher.go#newcode1278
state/watcher.go:1278: machine: &Machine{m.st, m.doc}, // Copy so it may be freely refreshed
i wonder if we should have a Clone or Copy method on Machine, Unit, etc.

https://codereview.appspot.com/6814108/diff/2001/state/watcher.go#newcode1293
state/watcher.go:1293: func (w *MachinePrincipalUnitsWatcher) updateMachine(pending []string) (new []string, err error) {
i found the name "pending" slightly confusing here. how about using
"changed" and "newChanged" throughout (replacing "pending" and "new")?
then the relationship between updateMachine, merge and the main loop
becomes clearer, i think.

https://codereview.appspot.com/6814108/diff/2001/state/watcher.go#newcode1310
state/watcher.go:1310: doc := unitDoc{}
could we just use w.st.Unit(unit) here? then the logic below could just
use the normal Unit methods (e.g. IsPrincipal, AssignedMachineId).

https://codereview.appspot.com/6814108/diff/2001/state/watcher.go#newcode1344
state/watcher.go:1344: w.st.watcher.Watch(w.st.machines.Name, w.machine.doc.Id, w.machine.doc.TxnRevno, machineCh)
could we just use MachineWatcher here?

https://codereview.appspot.com/6814108/diff/2001/state/watcher.go#newcode1346
state/watcher.go:1346: changes, err := w.updateMachine([]string(nil))
s/[]string(nil)/nil/

https://codereview.appspot.com/6814108/diff/2001/worker/machiner/export_test.go
File worker/machiner/export_test.go (right):

https://codereview.appspot.c...

722. By Aram Hăvărneanu

all: merge lp:~aramh/juju-core/120-firewaller-new-watcher-units7

723. By Aram Hăvărneanu

firewaller: change NewMachiner signature so that state.State is the first argument

724. By Aram Hăvărneanu

machiner: use new style Unit.AssignedMachineId instead of Unit.IsAssignedTo(*state.Machine)

725. By Aram Hăvărneanu

state: move principal units watcher tests to original position

726. By Aram Hăvărneanu

machiner: make sure we don't redeploy units when they change lifecycle

727. By Aram Hăvărneanu

machiner: add lifecycle tests to machiner

Revision history for this message
Aram Hăvărneanu (aramh) wrote :

Please take a look.

https://codereview.appspot.com/6814108/diff/2001/cmd/jujud/machine.go
File cmd/jujud/machine.go (right):

https://codereview.appspot.com/6814108/diff/2001/cmd/jujud/machine.go#newcode107
cmd/jujud/machine.go:107: t = machiner.NewMachiner(m, st, &a.Conf.StateInfo, a.Conf.DataDir)
> i think st should probably be the first argument here, so the workers are
> consistent in that respect.

I agree. Done.

https://codereview.appspot.com/6814108/diff/2001/state/machine_test.go
File state/machine_test.go (right):

https://codereview.appspot.com/6814108/diff/2001/state/machine_test.go#newcode763
state/machine_test.go:763: var machinePrincipalsWatchTests = []struct {
On 2012/11/09 09:08:08, rog wrote:
> can we please leave this in the same place, please, so i can easily see what has
> remained the same?

Done.

https://codereview.appspot.com/6814108/diff/2001/state/machine_test.go#newcode771
state/machine_test.go:771: []string(nil),
On 2012/11/09 09:08:08, rog wrote:
> just nil?

No, DeepEquals fails with just nil.

https://codereview.appspot.com/6814108/diff/2001/state/unit.go
File state/unit.go (right):

https://codereview.appspot.com/6814108/diff/2001/state/unit.go#newcode409
state/unit.go:409: func (u *Unit) IsAssignedTo(m *Machine) bool {
On 2012/11/09 09:08:08, rog wrote:
> is this necessary? can't we just use AssignedMachineId instead?

It was impossible to use the previous version of AssignedMachineId, but
I have instead fixed that function to return usable errors, and deleted
this one.

https://codereview.appspot.com/6814108/diff/2001/state/watcher.go
File state/watcher.go (right):

https://codereview.appspot.com/6814108/diff/2001/state/watcher.go#newcode1259
state/watcher.go:1259: type MachinePrincipalUnitsWatcher struct {
> please can we put code block moves in their own CL? i don't
> disagree on principle (it is indeed better next to the
> MachineUnitsWatcher) but it means we can't easily see the diffs.

Sorry, no, it was intentional. This is 100% new code; it's not
a variation of the old code but of the current
MachineUnitsWatcher code.

https://codereview.appspot.com/6814108/diff/2001/state/watcher.go#newcode1278
state/watcher.go:1278: machine: &Machine{m.st, m.doc}, // Copy so it may be freely refreshed
> i wonder if we should have a Clone or Copy method on Machine, Unit, etc.

I wish we had, but niemeyer ruled against it at one of the Lisbon
sprints, didn't he?
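The line under discussion copies the Machine struct so that the watcher's Refresh calls do not disturb the caller's value. A Go sketch of that idea follows; the store, machineDoc and Machine types and the clone method are simplified, hypothetical stand-ins, not the state package's types.

```go
package main

import "fmt"

// machineDoc is a simplified, hypothetical machine document.
type machineDoc struct {
	Id         string
	Principals []string
}

// store maps machine ids to their current documents.
type store map[string]machineDoc

// Machine is a hypothetical stand-in holding a store and a doc copy.
type Machine struct {
	st  store
	doc machineDoc
}

// Refresh replaces this value's doc with the stored one.
func (m *Machine) Refresh() {
	m.doc = m.st[m.doc.Id]
}

// clone is a hypothetical Copy/Clone helper. It copies doc by value, so
// refreshing the clone leaves the original untouched. The copy is shallow:
// slice fields share backing arrays, which is safe here because Refresh
// replaces the whole doc instead of mutating it.
func (m *Machine) clone() *Machine {
	return &Machine{m.st, m.doc}
}

func main() {
	st := store{"0": {Id: "0", Principals: []string{"mysql/0"}}}
	m := &Machine{st, st["0"]}
	w := m.clone()

	// Another writer assigns a second unit, then only the clone refreshes.
	st["0"] = machineDoc{Id: "0", Principals: []string{"mysql/0", "mysql/1"}}
	w.Refresh()

	fmt.Println(len(m.doc.Principals), len(w.doc.Principals)) // 1 2
}
```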

https://codereview.appspot.com/6814108/diff/2001/state/watcher.go#newcode1293
state/watcher.go:1293: func (w *MachinePrincipalUnitsWatcher) updateMachine(pending []string) (new []string, err error) {
> i found the name "pending" slightly confusing here. how about using "changed"
> and "newChanged" throughout (replacing "pending" and "new")? then the
> relationship between updateMachine, merge and the main loop becomes clearer, i
> think.

I agree, but what about the MachineUnitsWatcher? This code is a proper
subset of that. If we change anything here, we should change it
everywhere.
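For reference, a Go sketch of the accumulate-and-deduplicate step that this naming discussion is about, written with the suggested changed/newChanged names. hasString's signature and loop header appear in the diff's state/watcher.go; its remaining body and the merge function are assumptions, not the CL's code.

```go
package main

import "fmt"

// hasString reports whether name is present in changes. The signature and
// loop header match state/watcher.go in the diff; the rest is assumed.
func hasString(changes []string, name string) bool {
	for _, v := range changes {
		if v == name {
			return true
		}
	}
	return false
}

// merge folds newChanged into changed, keeping each id at most once, so a
// pending event lists a unit once however many times it changed before
// the event was delivered.
func merge(changed, newChanged []string) []string {
	for _, name := range newChanged {
		if !hasString(changed, name) {
			changed = append(changed, name)
		}
	}
	return changed
}

func main() {
	var changed []string
	changed = merge(changed, []string{"mysql/0", "mysql/1"})
	changed = merge(changed, []string{"mysql/1", "mysql/2"})
	fmt.Println(changed) // [mysql/0 mysql/1 mysql/2]
}
```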

https://codereview.appspot.com/6814108/diff/2001/state/watcher.go#newcode1310
state/watcher.go:1310: doc := unitDoc{}
On 2012/11/09 09:08:08, rog wrote:
> could we j...

Revision history for this message
Gustavo Niemeyer (niemeyer) wrote :

That's great. LGTM assuming the following:

https://codereview.appspot.com/6814108/diff/8001/state/machine_test.go
File state/machine_test.go (right):

https://codereview.appspot.com/6814108/diff/8001/state/machine_test.go#newcode325
state/machine_test.go:325: "check initial empty event",
Thanks for the summaries.

https://codereview.appspot.com/6814108/diff/8001/state/unit.go
File state/unit.go (right):

https://codereview.appspot.com/6814108/diff/8001/state/unit.go#newcode417
state/unit.go:417: return &notAssignedError{fmt.Sprintf(format+" is not
assigned to a machine", args...)}
Pre-req has a missing test? It should have a test actually expecting
the NotAssignedError suggested there.

https://codereview.appspot.com/6814108/diff/8001/worker/machiner/machiner.go
File worker/machiner/machiner.go (right):

https://codereview.appspot.com/6814108/diff/8001/worker/machiner/machiner.go#newcode82
worker/machiner/machiner.go:82: assigned = machineId == m.machine.Id()
New logic was added here verifying that it is indeed assigned to the
machine we think it is. This is awesome, but deserves proper testing.

https://codereview.appspot.com/6814108/

Revision history for this message
Gustavo Niemeyer (niemeyer) wrote :

There are open points and the last review is from mid-last-week. Can you
please see what needs to be done to move this forward?

I'm marking this as WIP.

https://codereview.appspot.com/6814108/

Revision history for this message
Aram Hăvărneanu (aramh) wrote :

I'm on vacation for two weeks.

https://codereview.appspot.com/6814108/

Revision history for this message
Gustavo Niemeyer (niemeyer) wrote :

On Mon, Nov 19, 2012 at 10:34 AM, <email address hidden> wrote:
> I'm on vacation for two weeks.

Have fun there. That'd be a good note to send to canonical-juju or similar.

As far as this CL is concerned, I suppose it can wait for you to be back.

--
gustavo @ http://niemeyer.net

Revision history for this message
William Reade (fwereade) wrote :

I have serious reservations about the Machiner, but I don't think it's
any less good than it was before. FWIW the watcher essentially LGTM;
since Aram's on holiday, I'll be branching from here and trying to make
sure all other comments have been addressed, and will worry about the
machiner in a future CL.

https://codereview.appspot.com/6814108/diff/8001/state/watcher.go
File state/watcher.go (right):

https://codereview.appspot.com/6814108/diff/8001/state/watcher.go#newcode1316
state/watcher.go:1316: if err == mgo.ErrNotFound || doc.Principal == "" && (doc.MachineId == nil || *doc.MachineId != w.machine.doc.Id) {
I think the principal check is redundant; I see no way for a subordinate
to get in here (and if it does, it's a bug, not just a thing-to-ignore).
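Because && binds more tightly than || in Go, the condition above reads as notFound || (isPrincipal && notAssignedHere). The sketch below, using a hypothetical trimmed-down unitDoc, checks that dropping the principal test changes the result only for a subordinate unit, which is the case argued here never to occur.

```go
package main

import "fmt"

// unitDoc is a hypothetical, trimmed-down unit document: Principal is
// empty for principal units, MachineId is nil when unassigned.
type unitDoc struct {
	Principal string
	MachineId *string
}

// asWritten mirrors the condition in the diff. Since && binds more
// tightly than ||, it means: notFound || (isPrincipal && notHere).
func asWritten(notFound bool, doc unitDoc, machineId string) bool {
	return notFound || doc.Principal == "" && (doc.MachineId == nil || *doc.MachineId != machineId)
}

// simplified drops the principal test, as suggested in review.
func simplified(notFound bool, doc unitDoc, machineId string) bool {
	return notFound || doc.MachineId == nil || *doc.MachineId != machineId
}

func main() {
	here, other := "0", "1"
	docs := []unitDoc{
		{"", nil},           // principal, unassigned
		{"", &here},         // principal, on this machine
		{"", &other},        // principal, elsewhere
		{"mysql/0", &other}, // subordinate, elsewhere: the only difference
	}
	for _, d := range docs {
		fmt.Println(asWritten(false, d, here), simplified(false, d, here))
	}
}
```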

https://codereview.appspot.com/6814108/diff/8001/state/watcher.go#newcode1339
state/watcher.go:1339: for _, unit := range w.known {
s/_, //

(surely..?)
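Per the diff, w.known is a map[string]Life, so the two-variable form binds unit to a Life value, while the single-variable form yields the unit names. A small Go sketch with a simplified, hypothetical Life type:

```go
package main

import (
	"fmt"
	"sort"
)

// Life is a simplified stand-in for the state package's Life type.
type Life int

const (
	Alive Life = iota
	Dying
	Dead
)

// unitNames returns the unit names tracked in known, sorted because map
// iteration order is unspecified.
func unitNames(known map[string]Life) []string {
	names := make([]string, 0, len(known))
	for unit := range known { // single variable: unit is the key
		names = append(names, unit)
	}
	sort.Strings(names)
	return names
}

func main() {
	known := map[string]Life{"mysql/3": Dying, "mysql/0": Alive}
	fmt.Println(unitNames(known)) // [mysql/0 mysql/3]

	// With "for _, unit := range known", unit would be each Life value
	// (here Alive or Dying), not a name.
	for _, life := range known {
		fmt.Println(life == Alive || life == Dying) // true
	}
}
```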

https://codereview.appspot.com/6814108/diff/8001/worker/machiner/machiner.go
File worker/machiner/machiner.go (left):

https://codereview.appspot.com/6814108/diff/8001/worker/machiner/machiner.go#oldcode52
worker/machiner/machiner.go:52: // and restart them if not. Also track units so
Yeah, this is extremely important. If we're now tracking units to figure
out what to do, we should *surely* be doing so persistently.

https://codereview.appspot.com/6814108/diff/8001/worker/machiner/machiner.go#oldcode72
worker/machiner/machiner.go:72: if err := m.localContainer.Deploy(u, m.stateInfo, m.tools); err != nil {
Isn't this actually a serious problem that should cause us to return an
error? If the agent is doing its job, we'll get retried anyway...
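A Go sketch of the alternative suggested here: stop at the first failed deployment and return the error, relying on the agent to restart the worker and retry. The deployer interface, deployAll and the flaky test double are hypothetical; the real call is m.localContainer.Deploy(u, m.stateInfo, m.tools).

```go
package main

import (
	"errors"
	"fmt"
)

// deployer is a hypothetical stand-in for the machiner's local container.
type deployer interface {
	Deploy(unit string) error
}

// deployAll returns the first deployment error instead of logging it and
// continuing, so a supervising agent can restart the worker and retry.
func deployAll(d deployer, units []string) error {
	for _, u := range units {
		if err := d.Deploy(u); err != nil {
			return fmt.Errorf("cannot deploy unit %q: %v", u, err)
		}
	}
	return nil
}

// flaky is a test double that fails for one named unit.
type flaky struct {
	fail     string
	deployed []string
}

func (f *flaky) Deploy(unit string) error {
	if unit == f.fail {
		return errors.New("container setup failed")
	}
	f.deployed = append(f.deployed, unit)
	return nil
}

func main() {
	f := &flaky{fail: "mysql/1"}
	err := deployAll(f, []string{"mysql/0", "mysql/1", "mysql/2"})
	fmt.Println(err)        // cannot deploy unit "mysql/1": container setup failed
	fmt.Println(f.deployed) // [mysql/0]
}
```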

https://codereview.appspot.com/6814108/diff/8001/worker/machiner/machiner.go
File worker/machiner/machiner.go (right):

https://codereview.appspot.com/6814108/diff/8001/worker/machiner/machiner.go#newcode92
worker/machiner/machiner.go:92: } else if !assigned && known {
I can accept handling reassignments that never happen in practice, but
we don't seem to be destroying dead units; surely that's necessary?
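The deploy/recall decision, including the Dead case raised here, can be sketched as a small pure function. decide, action and the simplified Life type are hypothetical, and the sketch assumes that only Alive units are newly deployed; it is one possible policy, not the machiner's actual logic.

```go
package main

import "fmt"

// Life is a simplified stand-in for the state package's Life type.
type Life int

const (
	Alive Life = iota
	Dying
	Dead
)

// action is what a hypothetical machiner would do for one reported unit.
type action string

const (
	none   action = "none"
	deploy action = "deploy"
	recall action = "recall"
)

// decide assumes a policy in which only Alive units are newly deployed,
// known units are recalled once they are Dead or no longer assigned here,
// and a lifecycle change of a deployed unit never causes a redeploy.
func decide(assigned, known bool, life Life) action {
	switch {
	case known && (!assigned || life == Dead):
		return recall
	case !known && assigned && life == Alive:
		return deploy
	}
	return none
}

func main() {
	fmt.Println(decide(true, false, Alive)) // deploy
	fmt.Println(decide(true, true, Dying))  // none
	fmt.Println(decide(true, true, Dead))   // recall
	fmt.Println(decide(false, true, Alive)) // recall
}
```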

https://codereview.appspot.com/6814108/

Unmerged revisions

727. By Aram Hăvărneanu

machiner: add lifecycle tests to machiner

726. By Aram Hăvărneanu

machiner: make sure we don't redeploy units when they change lifecycle

725. By Aram Hăvărneanu

state: move principal units watcher tests to original position

724. By Aram Hăvărneanu

machiner: use new style Unit.AssignedMachineId instead of Unit.IsAssignedTo(*state.Machine)

723. By Aram Hăvărneanu

firewaller: change NewMachiner signature so that state.State is the first argument

722. By Aram Hăvărneanu

all: merge lp:~aramh/juju-core/120-firewaller-new-watcher-units7

721. By Aram Hăvărneanu

machiner: convert to new style principal units watcher

720. By Aram Hăvărneanu

machiner: change NewMachiner signature to take a *state.State

719. By Aram Hăvărneanu

state: convert MachinePrincipalUnitsWatcher to a new style watcher

718. By Aram Hăvărneanu

state: update machine principal units watcher table driven tests to new watcher model

Preview Diff

=== modified file 'cmd/jujud/machine.go'
--- cmd/jujud/machine.go 2012-10-12 07:40:16 +0000
+++ cmd/jujud/machine.go 2012-11-09 16:51:25 +0000
@@ -104,7 +104,7 @@
 var t task
 switch w {
 case state.MachinerWorker:
- t = machiner.NewMachiner(m, &a.Conf.StateInfo, a.Conf.DataDir)
+ t = machiner.NewMachiner(st, m, &a.Conf.StateInfo, a.Conf.DataDir)
 case state.ProvisionerWorker:
 t = provisioner.NewProvisioner(st)
 case state.FirewallerWorker:

=== modified file 'state/machine_test.go'
--- state/machine_test.go 2012-11-02 12:25:01 +0000
+++ state/machine_test.go 2012-11-09 16:51:25 +0000
@@ -317,34 +317,39 @@
 }

 var machinePrincipalsWatchTests = []struct {
+ summary string
 test func(*C, *MachineSuite, *state.Service)
- added []string
- removed []string
+ changes []string
 }{
 {
- test: func(_ *C, _ *MachineSuite, _ *state.Service) {},
- added: []string{},
- },
- {
- test: func(c *C, s *MachineSuite, service *state.Service) {
- unit, err := service.AddUnit()
- c.Assert(err, IsNil)
- err = unit.AssignToMachine(s.machine)
- c.Assert(err, IsNil)
- },
- added: []string{"mysql/0"},
- },
- {
- test: func(c *C, s *MachineSuite, service *state.Service) {
- unit, err := service.AddUnit()
- c.Assert(err, IsNil)
- err = unit.AssignToMachine(s.machine)
- c.Assert(err, IsNil)
- },
- added: []string{"mysql/1"},
- },
- {
- test: func(c *C, s *MachineSuite, service *state.Service) {
+ "check initial empty event",
+ func(_ *C, _ *MachineSuite, _ *state.Service) {},
+ []string(nil),
+ }, {
+ "add a unit",
+ func(c *C, s *MachineSuite, service *state.Service) {
+ unit, err := service.AddUnit()
+ c.Assert(err, IsNil)
+ err = unit.AssignToMachine(s.machine)
+ c.Assert(err, IsNil)
+ },
+ []string{"mysql/0"},
+ }, {
+ "ignore unrelated change",
+ func(c *C, s *MachineSuite, service *state.Service) {
+ unit, err := service.Unit("mysql/0")
+ c.Assert(err, IsNil)
+ err = unit.SetPrivateAddress("what.ever")
+ c.Assert(err, IsNil)
+ unit, err = service.AddUnit()
+ c.Assert(err, IsNil)
+ err = unit.AssignToMachine(s.machine)
+ c.Assert(err, IsNil)
+ },
+ []string{"mysql/1"},
+ }, {
+ "add two units",
+ func(c *C, s *MachineSuite, service *state.Service) {
 unit2, err := service.AddUnit()
 c.Assert(err, IsNil)
 err = unit2.AssignToMachine(s.machine)
@@ -354,38 +359,45 @@
 err = unit3.AssignToMachine(s.machine)
 c.Assert(err, IsNil)
 },
- added: []string{"mysql/2", "mysql/3"},
- },
- {
- test: func(c *C, s *MachineSuite, service *state.Service) {
+ []string{"mysql/2", "mysql/3"},
+ }, {
+ "set unit to dying",
+ func(c *C, s *MachineSuite, service *state.Service) {
+ unit3, err := service.Unit("mysql/3")
+ c.Assert(err, IsNil)
+ err = unit3.EnsureDying()
+ c.Assert(err, IsNil)
+ },
+ []string{"mysql/3"},
+ }, {
+ "set unit to dead",
+ func(c *C, s *MachineSuite, service *state.Service) {
 unit3, err := service.Unit("mysql/3")
 c.Assert(err, IsNil)
 err = unit3.EnsureDead()
 c.Assert(err, IsNil)
- err = service.RemoveUnit(unit3)
- c.Assert(err, IsNil)
 },
- removed: []string{"mysql/3"},
- },
- {
- test: func(c *C, s *MachineSuite, service *state.Service) {
+ []string{"mysql/3"},
+ }, {
+ "set multiple units to dying, dead, and remove already reported dead units",
+ func(c *C, s *MachineSuite, service *state.Service) {
 unit0, err := service.Unit("mysql/0")
 c.Assert(err, IsNil)
- err = unit0.EnsureDead()
- c.Assert(err, IsNil)
- err = service.RemoveUnit(unit0)
+ err = unit0.EnsureDying()
 c.Assert(err, IsNil)
 unit2, err := service.Unit("mysql/2")
 c.Assert(err, IsNil)
 err = unit2.EnsureDead()
 c.Assert(err, IsNil)
- err = service.RemoveUnit(unit2)
+ unit3, err := service.Unit("mysql/3")
+ c.Assert(err, IsNil)
+ err = service.RemoveUnit(unit3)
 c.Assert(err, IsNil)
 },
- removed: []string{"mysql/0", "mysql/2"},
- },
- {
- test: func(c *C, s *MachineSuite, service *state.Service) {
+ []string{"mysql/0", "mysql/2"},
+ }, {
+ "add unit and remove unit at the same time",
+ func(c *C, s *MachineSuite, service *state.Service) {
 unit4, err := service.AddUnit()
 c.Assert(err, IsNil)
 err = unit4.AssignToMachine(s.machine)
@@ -397,11 +409,10 @@
 err = service.RemoveUnit(unit1)
 c.Assert(err, IsNil)
 },
- added: []string{"mysql/4"},
- removed: []string{"mysql/1"},
- },
- {
- test: func(c *C, s *MachineSuite, service *state.Service) {
+ []string{"mysql/1", "mysql/4"},
+ }, {
+ "add and remove many units at once",
+ func(c *C, s *MachineSuite, service *state.Service) {
 units := [20]*state.Unit{}
 var err error
 for i := 0; i < len(units); i++ {
@@ -417,10 +428,10 @@
 c.Assert(err, IsNil)
 }
 },
- added: []string{"mysql/10", "mysql/11", "mysql/12", "mysql/13", "mysql/14", "mysql/5", "mysql/6", "mysql/7", "mysql/8", "mysql/9"},
- },
- {
- test: func(c *C, s *MachineSuite, service *state.Service) {
+ []string{"mysql/10", "mysql/11", "mysql/12", "mysql/13", "mysql/14", "mysql/5", "mysql/6", "mysql/7", "mysql/8", "mysql/9"},
+ }, {
+ "report dead when first seen and also add a new unit",
+ func(c *C, s *MachineSuite, service *state.Service) {
 unit25, err := service.AddUnit()
 c.Assert(err, IsNil)
 err = unit25.AssignToMachine(s.machine)
@@ -432,11 +443,10 @@
 err = service.RemoveUnit(unit9)
 c.Assert(err, IsNil)
 },
- added: []string{"mysql/25"},
- removed: []string{"mysql/9"},
- },
- {
- test: func(c *C, s *MachineSuite, service *state.Service) {
+ []string{"mysql/9", "mysql/25"},
+ }, {
+ "ignore units not assigned to this machine",
+ func(c *C, s *MachineSuite, service *state.Service) {
 unit26, err := service.AddUnit()
 c.Assert(err, IsNil)
 err = unit26.AssignToMachine(s.machine)
@@ -479,11 +489,10 @@
 err = service.RemoveUnit(unit14)
 c.Assert(err, IsNil)
 },
- added: []string{"bacon/0", "bacon/1", "mysql/26", "mysql/27"},
- removed: []string{"mysql/14"},
- },
- {
- test: func(c *C, s *MachineSuite, service *state.Service) {
+ []string{"bacon/0", "bacon/1", "mysql/26", "mysql/27", "mysql/14"},
+ }, {
+ "ignore subordinate units",
+ func(c *C, s *MachineSuite, service *state.Service) {
 unit28, err := service.AddUnit()
 c.Assert(err, IsNil)
 err = unit28.AssignToMachine(s.machine)
@@ -502,7 +511,19 @@
 _, err = logService.AddUnitSubordinateTo(unit29)
 c.Assert(err, IsNil)
 },
- added: []string{"mysql/28", "mysql/29"},
+ []string{"mysql/28", "mysql/29"},
+ }, {
+ "unassign from machine",
+ func(c *C, s *MachineSuite, service *state.Service) {
+ unit28, err := service.Unit("mysql/28")
+ c.Assert(err, IsNil)
+ err = unit28.UnassignFromMachine()
+ c.Assert(err, IsNil)
+ unit29, err := service.Unit("mysql/29")
+ c.Assert(err, IsNil)
+ err = unit29.UnassignFromMachine()
+ },
+ []string{"mysql/28", "mysql/29"},
 },
 }

@@ -515,21 +536,23 @@
 c.Assert(unitWatcher.Stop(), IsNil)
 }()
 for i, test := range machinePrincipalsWatchTests {
- c.Logf("test %d", i)
+ c.Logf("test %d: %s", i, test.summary)
 test.test(c, s, service)
 s.State.StartSync()
- got := &state.MachinePrincipalUnitsChange{}
+ var got []string
 for {
 select {
 case new, ok := <-unitWatcher.Changes():
 c.Assert(ok, Equals, true)
- addMachineUnitChanges(got, new)
- if moreMachinePrincipalUnitsRequired(got, test.added, test.removed) {
+ got = append(got, new...)
+ if len(got) < len(test.changes) {
 continue
 }
- assertSameMachinePrincipalUnits(c, got, test.added, test.removed)
+ sort.Strings(got)
+ sort.Strings(test.changes)
+ c.Assert(got, DeepEquals, test.changes)
 case <-time.After(500 * time.Millisecond):
- c.Fatalf("did not get change, want: added: %#v, removed: %#v, got: %#v", test.added, test.removed, got)
+ c.Fatalf("did not get change, want: %#v, got: %#v", test.changes, got)
 }
 break
 }
@@ -541,37 +564,6 @@
 }
 }

-func moreMachinePrincipalUnitsRequired(got *state.MachinePrincipalUnitsChange, added, removed []string) bool {
- return len(got.Added)+len(got.Removed) < len(added)+len(removed)
-}
-
-func addMachineUnitChanges(changes *state.MachinePrincipalUnitsChange, more *state.MachinePrincipalUnitsChange) {
- changes.Added = append(changes.Added, more.Added...)
- changes.Removed = append(changes.Removed, more.Removed...)
-}
-
-func assertSameMachinePrincipalUnits(c *C, change *state.MachinePrincipalUnitsChange, added, removed []string) {
- c.Assert(change, NotNil)
- if len(added) == 0 {
- added = nil
- }
- if len(removed) == 0 {
- removed = nil
- }
- sort.Sort(unitSlice(change.Added))
- sort.Sort(unitSlice(change.Removed))
- var got []string
- for _, g := range change.Added {
- got = append(got, g.Name())
- }
- c.Assert(got, DeepEquals, added)
- got = nil
- for _, g := range change.Removed {
- got = append(got, g.Name())
- }
- c.Assert(got, DeepEquals, removed)
-}
-
 var machineUnitsWatchTests = []struct {
 summary string
 test func(*C, *MachineSuite, *state.Unit, *state.Charm)

=== modified file 'state/unit.go'
--- state/unit.go 2012-11-09 16:51:25 +0000
+++ state/unit.go 2012-11-09 16:51:25 +0000
@@ -414,7 +414,7 @@
 }

 func notAssigned(format string, args ...interface{}) error {
- return &NotFoundError{fmt.Sprintf(format+" is not assigned to a machine", args...)}
+ return &notAssignedError{fmt.Sprintf(format+" is not assigned to a machine", args...)}
 }

 func IsNotAssigned(err error) bool {

312=== modified file 'state/watcher.go'
313--- state/watcher.go 2012-11-02 12:25:01 +0000
314+++ state/watcher.go 2012-11-09 16:51:25 +0000
315@@ -44,22 +44,6 @@
316 Left []string
317 }
318
319-// MachinePrincipalUnitsWatcher observes the assignment and removal of units
320-// to and from a machine.
321-type MachinePrincipalUnitsWatcher struct {
322- commonWatcher
323- machine *Machine
324- changeChan chan *MachinePrincipalUnitsChange
325- knownUnits map[string]*Unit
326-}
327-
328-// MachinePrincipalUnitsChange contains information about units that have been
329-// assigned to or removed from the machine.
330-type MachinePrincipalUnitsChange struct {
331- Added []*Unit
332- Removed []*Unit
333-}
334-
335 func hasString(changes []string, name string) bool {
336 for _, v := range changes {
337 if v == name {
338@@ -575,130 +559,6 @@
339 return nil
340 }
341
342-// WatchPrincipalUnits returns a watcher for observing units being
343-// added to or removed from the machine.
344-func (m *Machine) WatchPrincipalUnits() *MachinePrincipalUnitsWatcher {
345- return newMachinePrincipalUnitsWatcher(m)
346-}
347-
348-// newMachinePrincipalUnitsWatcher creates and starts a watcher to watch information
349-// about units being added to or deleted from the machine.
350-func newMachinePrincipalUnitsWatcher(m *Machine) *MachinePrincipalUnitsWatcher {
351- w := &MachinePrincipalUnitsWatcher{
352- changeChan: make(chan *MachinePrincipalUnitsChange),
353- machine: m,
354- knownUnits: make(map[string]*Unit),
355- commonWatcher: commonWatcher{st: m.st},
356- }
357- go func() {
358- defer w.tomb.Done()
359- defer close(w.changeChan)
360- w.tomb.Kill(w.loop())
361- }()
362- return w
363-}
364-
365-// Changes returns a channel that will receive changes when units are
366-// added or deleted. The Added field in the first event on the channel
367-// holds the initial state as returned by Machine.Units.
368-func (w *MachinePrincipalUnitsWatcher) Changes() <-chan *MachinePrincipalUnitsChange {
369- return w.changeChan
370-}
371-
372-func (w *MachinePrincipalUnitsWatcher) mergeChange(changes *MachinePrincipalUnitsChange, ch watcher.Change) (err error) {
373- err = w.machine.Refresh()
374- if err != nil {
375- return err
376- }
377- units := make(map[string]*Unit)
378- for _, name := range w.machine.doc.Principals {
379- var unit *Unit
380- doc := &unitDoc{}
381- if _, ok := w.knownUnits[name]; !ok {
382- err = w.st.units.FindId(name).One(doc)
383- if err == mgo.ErrNotFound {
384- continue
385- }
386- if err != nil {
387- return err
388- }
389- unit = newUnit(w.st, doc)
390- changes.Added = append(changes.Added, unit)
391- w.knownUnits[name] = unit
392- }
393- units[name] = unit
394- }
395- for name, unit := range w.knownUnits {
396- if _, ok := units[name]; !ok {
397- changes.Removed = append(changes.Removed, unit)
398- delete(w.knownUnits, name)
399- }
400- }
401- return nil
402-}
403-
404-func (changes *MachinePrincipalUnitsChange) isEmpty() bool {
405- return len(changes.Added)+len(changes.Removed) == 0
406-}
407-
408-func (w *MachinePrincipalUnitsWatcher) getInitialEvent() (initial *MachinePrincipalUnitsChange, err error) {
409- changes := &MachinePrincipalUnitsChange{}
410- docs := []unitDoc{}
411- err = w.st.units.Find(D{{"_id", D{{"$in", w.machine.doc.Principals}}}}).All(&docs)
412- if err != nil {
413- return nil, err
414- }
415- for _, doc := range docs {
416- unit := newUnit(w.st, &doc)
417- w.knownUnits[doc.Name] = unit
418- changes.Added = append(changes.Added, unit)
419- }
420- return changes, nil
421-}
422-
423-func (w *MachinePrincipalUnitsWatcher) loop() (err error) {
424- ch := make(chan watcher.Change)
425- w.st.watcher.Watch(w.st.machines.Name, w.machine.doc.Id, w.machine.doc.TxnRevno, ch)
426- defer w.st.watcher.Unwatch(w.st.machines.Name, w.machine.doc.Id, ch)
427- changes, err := w.getInitialEvent()
428- if err != nil {
429- return err
430- }
431- for {
432- for changes != nil {
433- select {
434- case <-w.st.watcher.Dead():
435- return watcher.MustErr(w.st.watcher)
436- case <-w.tomb.Dying():
437- return tomb.ErrDying
438- case c := <-ch:
439- err := w.mergeChange(changes, c)
440- if err != nil {
441- return err
442- }
443- case w.changeChan <- changes:
444- changes = nil
445- }
446- }
447- select {
448- case <-w.st.watcher.Dead():
449- return watcher.MustErr(w.st.watcher)
450- case <-w.tomb.Dying():
451- return tomb.ErrDying
452- case c := <-ch:
453- changes = &MachinePrincipalUnitsChange{}
454- err := w.mergeChange(changes, c)
455- if err != nil {
456- return err
457- }
458- if changes.isEmpty() {
459- changes = nil
460- }
461- }
462- }
463- return nil
464-}
465-
466 func newRelationScopeWatcher(st *State, scope, ignore string) *RelationScopeWatcher {
467 w := &RelationScopeWatcher{
468 commonWatcher: commonWatcher{st: st},
469@@ -1385,3 +1245,135 @@
470 }
471 panic("unreachable")
472 }
473+
474+// MachinePrincipalUnitsWatcher notifies about assignments and lifecycle
475+// changes for all principal units assigned to a machine.
476+//
477+// The first event emitted contains the unit names of all principal units
478+// currently assigned to the machine, irrespective of their life state. From
479+// then on, a new event is emitted whenever a principal unit is assigned
480+// to or unassigned from the machine, or the lifecycle of a unit that is
481+// currently assigned to the machine changes.
482+//
483+// After a unit is found to be Dead, no further event will include it.
484+type MachinePrincipalUnitsWatcher struct {
485+ commonWatcher
486+ machine *Machine
487+ out chan []string
488+ in chan watcher.Change
489+ known map[string]Life
490+}
491+
492+// WatchPrincipalUnits returns a new MachinePrincipalUnitsWatcher for m.
493+func (m *Machine) WatchPrincipalUnits() *MachinePrincipalUnitsWatcher {
494+ return newMachinePrincipalUnitsWatcher(m)
495+}
496+
497+func newMachinePrincipalUnitsWatcher(m *Machine) *MachinePrincipalUnitsWatcher {
498+ w := &MachinePrincipalUnitsWatcher{
499+ commonWatcher: commonWatcher{st: m.st},
500+ out: make(chan []string),
501+ in: make(chan watcher.Change),
502+ known: make(map[string]Life),
503+ machine: &Machine{m.st, m.doc}, // Copy so it may be freely refreshed
504+ }
505+ go func() {
506+ defer w.tomb.Done()
507+ defer close(w.out)
508+ w.tomb.Kill(w.loop())
509+ }()
510+ return w
511+}
512+
513+// Changes returns the event channel for w.
514+func (w *MachinePrincipalUnitsWatcher) Changes() <-chan []string {
515+ return w.out
516+}
517+
518+func (w *MachinePrincipalUnitsWatcher) updateMachine(pending []string) (new []string, err error) {
519+ err = w.machine.Refresh()
520+ if err != nil {
521+ return nil, err
522+ }
523+ for _, unit := range w.machine.doc.Principals {
524+ if _, ok := w.known[unit]; !ok {
525+ pending, err = w.merge(pending, unit)
526+ if err != nil {
527+ return nil, err
528+ }
529+ }
530+ }
531+ return pending, nil
532+}
533+
534+func (w *MachinePrincipalUnitsWatcher) merge(pending []string, unit string) (new []string, err error) {
535+ doc := unitDoc{}
536+ err = w.st.units.FindId(unit).One(&doc)
537+ if err != nil && err != mgo.ErrNotFound {
538+ return nil, err
539+ }
540+ life, known := w.known[unit]
541+ if err == mgo.ErrNotFound || doc.Principal == "" && (doc.MachineId == nil || *doc.MachineId != w.machine.doc.Id) {
542+ // Unit was removed or unassigned from w.machine.
543+ if known {
544+ delete(w.known, unit)
545+ w.st.watcher.Unwatch(w.st.units.Name, unit, w.in)
546+ if life != Dead && !hasString(pending, unit) {
547+ pending = append(pending, unit)
548+ }
549+ }
550+ return pending, nil
551+ }
552+ if !known {
553+ w.st.watcher.Watch(w.st.units.Name, unit, doc.TxnRevno, w.in)
554+ pending = append(pending, unit)
555+ } else if life != doc.Life && !hasString(pending, unit) {
556+ pending = append(pending, unit)
557+ }
558+ w.known[unit] = doc.Life
559+ return pending, nil
560+}
561+
562+func (w *MachinePrincipalUnitsWatcher) loop() (err error) {
563+ defer func() {
564+ for unit := range w.known {
565+ w.st.watcher.Unwatch(w.st.units.Name, unit, w.in)
566+ }
567+ }()
568+ machineCh := make(chan watcher.Change)
569+ w.st.watcher.Watch(w.st.machines.Name, w.machine.doc.Id, w.machine.doc.TxnRevno, machineCh)
570+ defer w.st.watcher.Unwatch(w.st.machines.Name, w.machine.doc.Id, machineCh)
571+ changes, err := w.updateMachine([]string(nil))
572+ if err != nil {
573+ return err
574+ }
575+ out := w.out
576+ for {
577+ select {
578+ case <-w.st.watcher.Dead():
579+ return watcher.MustErr(w.st.watcher)
580+ case <-w.tomb.Dying():
581+ return tomb.ErrDying
582+ case <-machineCh:
583+ changes, err = w.updateMachine(changes)
584+ if err != nil {
585+ return err
586+ }
587+ if len(changes) > 0 {
588+ out = w.out
589+ }
590+ case c := <-w.in:
591+ changes, err = w.merge(changes, c.Id.(string))
592+ if err != nil {
593+ return err
594+ }
595+ if len(changes) > 0 {
596+ out = w.out
597+ }
598+ case out <- changes:
599+ out = nil
600+ changes = nil
601+ }
602+ }
603+ panic("unreachable")
604+}
605
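The event rules in the `MachinePrincipalUnitsWatcher` doc comment can be modelled without MongoDB. Each assignment, unassignment, or life change is reported at most once per batch, and a unit that has been seen Dead is never reported again. The Go sketch below is a simplified, hypothetical model, not the watcher itself. `unitState`, `tracker`, `update`, and `run` are illustrative names, `exists: false` stands in for `mgo.ErrNotFound`, and `hasString` is reimplemented locally. Unlike `merge` in the diff, the sketch also checks `hasString` on a first sighting, so a unit that is removed and re-added before delivery appears only once. `run` shows the nil-channel idiom that `loop` relies on: the local `out` stays nil until there is something to report, which disables that `select` case. This model skips the initial event.

```go
package main

// Life models the three lifecycle states the watcher tracks.
type Life int

const (
	Alive Life = iota
	Dying
	Dead
)

// unitState is a hypothetical snapshot of one unit as the watcher would read it.
// exists=false stands in for mgo.ErrNotFound; assignedHere stands in for
// "principal unit whose MachineId is the watched machine".
type unitState struct {
	exists       bool
	assignedHere bool
	life         Life
}

// tracker models the watcher's known map: unit name -> last observed life.
type tracker struct {
	known map[string]Life
}

// hasString reports whether s is already in xs.
func hasString(xs []string, s string) bool {
	for _, x := range xs {
		if x == s {
			return true
		}
	}
	return false
}

// merge folds one observation of unit into the pending batch of names.
func (t *tracker) merge(pending []string, unit string, st unitState) []string {
	life, known := t.known[unit]
	if !st.exists || !st.assignedHere {
		// Removed or unassigned: report once, unless it was already seen Dead.
		if known {
			delete(t.known, unit)
			if life != Dead && !hasString(pending, unit) {
				pending = append(pending, unit)
			}
		}
		return pending
	}
	// A first sighting or a life change is reported, at most once per batch.
	if (!known || life != st.life) && !hasString(pending, unit) {
		pending = append(pending, unit)
	}
	t.known[unit] = st.life
	return pending
}

// update is one hypothetical observation delivered to the loop.
type update struct {
	unit  string
	state unitState
}

// run coalesces updates into pending and offers the batch on events.
// out is nil while there is nothing to send, which disables that select case.
func (t *tracker) run(in <-chan update, events chan<- []string, done <-chan struct{}) {
	var pending []string
	var out chan<- []string
	for {
		select {
		case <-done:
			return
		case u := <-in:
			pending = t.merge(pending, u.unit, u.state)
			if len(pending) > 0 {
				out = events
			}
		case out <- pending:
			out = nil
			pending = nil
		}
	}
}
```

Because `in` is unbuffered and nobody reads `events` until all updates have been sent, the loop in this model folds several updates into one batch. The `Dying` update for a unit that is already pending adds nothing new.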
606=== modified file 'worker/machiner/export_test.go'
607--- worker/machiner/export_test.go 2012-09-12 17:06:25 +0000
608+++ worker/machiner/export_test.go 2012-11-09 16:51:25 +0000
609@@ -5,6 +5,6 @@
610 "launchpad.net/juju-core/state"
611 )
612
613-func NewMachinerWithContainer(m *state.Machine, info *state.Info, dataDir string, cont container.Container) *Machiner {
614- return newMachiner(m, info, dataDir, cont)
615+func NewMachinerWithContainer(st *state.State, m *state.Machine, info *state.Info, dataDir string, cont container.Container) *Machiner {
616+ return newMachiner(st, m, info, dataDir, cont)
617 }
618
619=== modified file 'worker/machiner/machiner.go'
620--- worker/machiner/machiner.go 2012-10-11 14:52:21 +0000
621+++ worker/machiner/machiner.go 2012-11-09 16:51:25 +0000
622@@ -13,6 +13,7 @@
623
624 // Machiner represents a running machine agent.
625 type Machiner struct {
626+ st *state.State
627 tomb tomb.Tomb
628 machine *state.Machine
629 localContainer container.Container
630@@ -23,17 +24,18 @@
631 // NewMachiner starts a machine agent running that
632 // deploys agents in the given directory.
633 // The Machiner dies when it encounters an error.
634-func NewMachiner(machine *state.Machine, info *state.Info, dataDir string) *Machiner {
635+func NewMachiner(st *state.State, machine *state.Machine, info *state.Info, dataDir string) *Machiner {
636 cont := &container.Simple{DataDir: dataDir}
637- return newMachiner(machine, info, dataDir, cont)
638+ return newMachiner(st, machine, info, dataDir, cont)
639 }
640
641-func newMachiner(machine *state.Machine, info *state.Info, dataDir string, cont container.Container) *Machiner {
642+func newMachiner(st *state.State, machine *state.Machine, info *state.Info, dataDir string, cont container.Container) *Machiner {
643 tools, err := environs.ReadTools(dataDir, version.Current)
644 if err != nil {
645 tools = &state.Tools{Binary: version.Current}
646 }
647 m := &Machiner{
648+ st: st,
649 machine: machine,
650 stateInfo: info,
651 tools: tools,
652@@ -47,6 +49,7 @@
653 defer m.tomb.Done()
654 w := m.machine.WatchPrincipalUnits()
655 defer watcher.Stop(w, &m.tomb)
656+ units := map[string]*state.Unit{}
657
658 // TODO read initial units, check if they're running
659 // and restart them if not. Also track units so
660@@ -60,19 +63,37 @@
661 m.tomb.Kill(watcher.MustErr(w))
662 return
663 }
664- for _, u := range change.Removed {
665- if u.IsPrincipal() {
666- if err := m.localContainer.Destroy(u); err != nil {
667- log.Printf("worker/machiner: cannot destroy unit %s: %v", u.Name(), err)
668+ for _, uname := range change {
669+ assigned := false
670+ u, err := m.st.Unit(uname)
671+ if err != nil {
672+ if !state.IsNotFound(err) {
673+ m.tomb.Kill(err)
674+ return
675+ }
676+ } else {
677+ machineId, err := u.AssignedMachineId()
678+ if err != nil {
679+ if !state.IsNotAssigned(err) {
680+ m.tomb.Kill(err)
681+ return
682+ }
683+ } else {
684+ assigned = machineId == m.machine.Id()
685 }
686 }
687- }
688- for _, u := range change.Added {
689- if u.IsPrincipal() {
690+ _, known := units[uname]
691+ if assigned && !known {
692 if err := m.localContainer.Deploy(u, m.stateInfo, m.tools); err != nil {
693 // TODO put unit into a queue to retry the deploy.
694 log.Printf("worker/machiner: cannot deploy unit %s: %v", u.Name(), err)
695 }
696+ units[uname] = u
697+ } else if !assigned && known {
698+ if err := m.localContainer.Destroy(units[uname]); err != nil {
699+ log.Printf("worker/machiner: cannot destroy unit %s: %v", uname, err)
700+ }
701+ delete(units, uname)
702 }
703 }
704 }
705
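The machiner's new handling of a batch of unit names amounts to a set reconciliation. It deploys a name that is assigned to this machine but not yet tracked, destroys one that is tracked but no longer assigned, and ignores everything else. The `EnsureDying` step in the test falls into the last group and expects no action. The sketch below is minimal and makes several assumptions. A hypothetical `lookup` function replaces `st.Unit` plus `AssignedMachineId`. Machine ids are plain strings. The sketch also records actions as strings in the style of the `+d1/0` values the test expects, rather than calling `Deploy`/`Destroy`.

```go
package main

// lookup is a hypothetical stand-in for st.Unit followed by AssignedMachineId:
// found=false models a not-found unit; machineId "" models an unassigned one.
type lookup func(name string) (found bool, machineId string)

// reconciler tracks which units this (hypothetical) machine has deployed.
type reconciler struct {
	machineId string
	deployed  map[string]bool
}

// handle processes one batch of unit names from the watcher and returns the
// actions taken as "+name" (deploy) or "-name" (destroy) strings.
func (r *reconciler) handle(names []string, look lookup) []string {
	var actions []string
	for _, name := range names {
		found, mid := look(name)
		assigned := found && mid == r.machineId
		known := r.deployed[name]
		switch {
		case assigned && !known:
			r.deployed[name] = true
			actions = append(actions, "+"+name)
		case !assigned && known:
			delete(r.deployed, name)
			actions = append(actions, "-"+name)
		}
		// Life changes that do not affect assignment produce no action.
	}
	return actions
}
```

Keying the tracked set by name, rather than by `*state.Unit`, means a unit that has already been removed from state can still be destroyed using only the name carried in the event.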
706=== modified file 'worker/machiner/machiner_test.go'
707--- worker/machiner/machiner_test.go 2012-09-28 07:21:43 +0000
708+++ worker/machiner/machiner_test.go 2012-11-09 16:51:25 +0000
709@@ -26,7 +26,7 @@
710 m, err := s.State.AddMachine(state.MachinerWorker)
711 c.Assert(err, IsNil)
712
713- p := machiner.NewMachiner(m, &state.Info{}, c.MkDir())
714+ p := machiner.NewMachiner(s.State, m, &state.Info{}, c.MkDir())
715 c.Assert(p.Stop(), IsNil)
716 }
717
718@@ -38,6 +38,8 @@
719 c.Assert(err, IsNil)
720 d1, err := s.State.AddService("d1", dummyCharm)
721 c.Assert(err, IsNil)
722+ d2, err := s.State.AddService("d2", dummyCharm)
723+ c.Assert(err, IsNil)
724 sub0, err := s.State.AddService("sub0", loggingCharm)
725 c.Assert(err, IsNil)
726
727@@ -50,6 +52,9 @@
728 ud1, err := d1.AddUnit()
729 c.Assert(err, IsNil)
730
731+ ud2, err := d2.AddUnit()
732+ c.Assert(err, IsNil)
733+
734 m0, err := s.State.AddMachine(state.MachinerWorker)
735 c.Assert(err, IsNil)
736
737@@ -67,7 +72,7 @@
738 expectedInfo: stateInfo,
739 action: make(chan string, 5),
740 }
741- machiner := machiner.NewMachinerWithContainer(m0, stateInfo, s.DataDir(), dcontainer)
742+ machiner := machiner.NewMachinerWithContainer(s.State, m0, stateInfo, s.DataDir(), dcontainer)
743 defer func() {
744 err := machiner.Stop()
745 c.Assert(err, IsNil)
746@@ -88,6 +93,18 @@
747 []string{"+d1/0"},
748 }, {
749 func() {
750+ err := ud2.AssignToMachine(m0)
751+ c.Assert(err, IsNil)
752+ },
753+ []string{"+d2/0"},
754+ }, {
755+ func() {
756+ err := ud2.EnsureDying()
757+ c.Assert(err, IsNil)
758+ },
759+ nil,
760+ }, {
761+ func() {
762 err := ud0.UnassignFromMachine()
763 c.Assert(err, IsNil)
764 },
