Merge lp:~aramh/juju-core/121-state-machiner-watchers-machine-principals7 into lp:~juju/juju-core/trunk
- 121-state-machiner-watchers-machine-principals7
- Merge into trunk
Status: | Work in progress |
---|---|
Proposed branch: | lp:~aramh/juju-core/121-state-machiner-watchers-machine-principals7 |
Merge into: | lp:~juju/juju-core/trunk |
Prerequisite: | lp:~aramh/juju-core/120-firewaller-new-watcher-units7 |
Diff against target: |
764 lines (+278/-256) 7 files modified
cmd/jujud/machine.go (+1/-1) state/machine_test.go (+92/-100) state/unit.go (+1/-1) state/watcher.go (+132/-140) worker/machiner/export_test.go (+2/-2) worker/machiner/machiner.go (+31/-10) worker/machiner/machiner_test.go (+19/-2) |
To merge this branch: | bzr merge lp:~aramh/juju-core/121-state-machiner-watchers-machine-principals7 |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
The Go Language Gophers | Pending | ||
Review via email: mp+133504@code.launchpad.net |
Commit message
Description of the change
state, machiner: new style principal units watcher
Convert the machine principal units watcher to the new, id only model
and refactor the machiner to make use of it.
Roger Peppe (rogpeppe) wrote : | # |
mostly looks good. all superficial comments, other than the logic and
tests in worker/machiner.
https:/
File cmd/jujud/
https:/
cmd/jujud/
&a.Conf.StateInfo, a.Conf.DataDir)
i think st should probably be the first argument here, so the workers
are consistent in that respect.
https:/
File state/machine_
https:/
state/machine_
can we please leave this in the same place, please, so i can easily see
what has remained the same?
https:/
state/machine_
just nil?
https:/
File state/unit.go (right):
https:/
state/unit.go:409: func (u *Unit) IsAssignedTo(m *Machine) bool {
is this necessary? can't we just use AssignedMachineId instead?
https:/
File state/watcher.go (right):
https:/
state/watcher.
please can we put code block moves in their own CL? i don't disagree on
principle (it is indeed better next to the MachineUnitsWat
means we can't easily see the diffs.
https:/
state/watcher.
it may be freely refreshed
i wonder if we should have a Clone or Copy method on Machine, Unit, etc.
https:/
state/watcher.
updateMachine(
i found the name "pending" slightly confusing here. how about using
"changed" and "newChanged" throughout (replacing "pending" and "new")?
then the relationship between updateMachine, merge and the main loop
becomes clearer, i think.
https:/
state/watcher.
could we just use w.st.Unit(unit) here? then the logic below could just
use the normal Unit methods (e.g. IsPrincipal, AssignedMachineId).
https:/
state/watcher.
w.machine.doc.Id, w.machine.
could we just use MachineWatcher here?
https:/
state/watcher.
s/[]string(
https:/
File worker/
- 722. By Aram Hăvărneanu
-
all: merge lp:~aramh/juju-core/120-firewaller-new-watcher-units7
- 723. By Aram Hăvărneanu
-
firewaller: change NewMachiner signature so that state.State is the first argument
- 724. By Aram Hăvărneanu
-
machiner: use new style Unit.AssignedMa
chineId instead of Unit.IsAssigned To(*state. Machine) - 725. By Aram Hăvărneanu
-
state: more principal units watcher tests to original position
- 726. By Aram Hăvărneanu
-
machiner: make sure we don't redeploy units when they change lifecycle
- 727. By Aram Hăvărneanu
-
machiner: add lifecycle tests to machiner
Aram Hăvărneanu (aramh) wrote : | # |
Please take a look.
https:/
File cmd/jujud/
https:/
cmd/jujud/
&a.Conf.StateInfo, a.Conf.DataDir)
> i think st should probably be the first argument here, so the workers
are
> consistent in that respect.
I agree. Done.
https:/
File state/machine_
https:/
state/machine_
On 2012/11/09 09:08:08, rog wrote:
> can we please leave this in the same place, please, so i can easily
see what has
> remained the same?
Done.
https:/
state/machine_
On 2012/11/09 09:08:08, rog wrote:
> just nil?
No, DeepEquals fails with just nil.
https:/
File state/unit.go (right):
https:/
state/unit.go:409: func (u *Unit) IsAssignedTo(m *Machine) bool {
On 2012/11/09 09:08:08, rog wrote:
> is this necessary? can't we just use AssignedMachineId instead?
It was impossible to use the previous version of AssignedMachineId, but
I fixed that function instead to return usable errors and I deleted
this.
https:/
File state/watcher.go (right):
https:/
state/watcher.
> please can we put code block moves in their own CL? i don't
> disagree on principle (it is indeed better next to the
> MachineUnitsWat
Sorry, no, it was intentional. This is 100% new code, it's not
a variation of the old code. It's a variation of the current
MachineUnitsWatcher code.
https:/
state/watcher.
it may be freely refreshed
> i wonder if we should have a Clone or Copy method on Machine, Unit,
etc.
I wish we had, but niemeyer rules against it in one of the Lisbon
sprints, didn't he?
https:/
state/watcher.
updateMachine(
> i found the name "pending" slightly confusing here. how about using
"changed"
> and "newChanged" throughout (replacing "pending" and "new")? then the
> relationship between updateMachine, merge and the main loop becomes
clearer, i
> think.
I agree, but what about the MachineUnitsWat
subset of that. If we change anything here, we should change it
everywhere.
https:/
state/watcher.
On 2012/11/09 09:08:08, rog wrote:
> could we j...
Aram Hăvărneanu (aramh) wrote : | # |
https:/
File cmd/jujud/
https:/
cmd/jujud/
&a.Conf.StateInfo, a.Conf.DataDir)
> i think st should probably be the first argument here, so the workers
are
> consistent in that respect.
I agree. Done.
https:/
File state/machine_
https:/
state/machine_
On 2012/11/09 09:08:08, rog wrote:
> can we please leave this in the same place, please, so i can easily
see what has
> remained the same?
Done.
https:/
state/machine_
On 2012/11/09 09:08:08, rog wrote:
> just nil?
No, DeepEquals fails with just nil.
https:/
File state/unit.go (right):
https:/
state/unit.go:409: func (u *Unit) IsAssignedTo(m *Machine) bool {
On 2012/11/09 09:08:08, rog wrote:
> is this necessary? can't we just use AssignedMachineId instead?
It was impossible to use the previous version of AssignedMachineId, but
I fixed that function instead to return usable errors and I deleted
this.
https:/
File state/watcher.go (right):
https:/
state/watcher.
> please can we put code block moves in their own CL? i don't
> disagree on principle (it is indeed better next to the
> MachineUnitsWat
Sorry, no, it was intentional. This is 100% new code, it's not
a variation of the old code. It's a variation of the current
MachineUnitsWatcher code.
https:/
state/watcher.
it may be freely refreshed
> i wonder if we should have a Clone or Copy method on Machine, Unit,
etc.
I wish we had, but niemeyer rules against it in one of the Lisbon
sprints, didn't he?
https:/
state/watcher.
updateMachine(
> i found the name "pending" slightly confusing here. how about using
"changed"
> and "newChanged" throughout (replacing "pending" and "new")? then the
> relationship between updateMachine, merge and the main loop becomes
clearer, i
> think.
I agree, but what about the MachineUnitsWat
subset of that. If we change anything here, we should change it
everywhere.
https:/
state/watcher.
On 2012/11/09 09:08:08, rog wrote:
> could we just use w.st.Unit(uni...
Gustavo Niemeyer (niemeyer) wrote : | # |
That's great. LGTM assuming the following:
https:/
File state/machine_
https:/
state/machine_
Thanks for the summaries.
https:/
File state/unit.go (right):
https:/
state/unit.go:417: return ¬AssignedErr
assigned to a machine", args...)}
Pre-req has a missing test? It should have a test actually expecting
the NotAssignedError suggested there.
https:/
File worker/
https:/
worker/
New logic was added here verifying that it is indeed assigned to the
machine we think it is. This is awesome, but deserves proper testing.
Gustavo Niemeyer (niemeyer) wrote : | # |
There are open points and the last review is from mid-last-week. Can you
please see what needs to be done to move this forward?
I'm marking this as WIP.
Aram Hăvărneanu (aramh) wrote : | # |
I'm on vacation for two weeks.
Gustavo Niemeyer (niemeyer) wrote : | # |
On Mon, Nov 19, 2012 at 10:34 AM, <email address hidden> wrote:
> I'm on vacation for two weeks.
Have fun there. That'd be a good note to send to canonical-juju or similar.
As far as this CL is concerned, I suppose it can wait for you to be back.
--
gustavo @ http://
William Reade (fwereade) wrote : | # |
I have serious reservations about the Machiner, but I don't think it's
any less good than it was before. FWIW the watcher essentially LGTM;
since Aram's on holiday, I'll be branching from here and trying to make
sure all other comments have been addressed, and will worry about the
machiner in a future CL.
https:/
File state/watcher.go (right):
https:/
state/watcher.
&& (doc.MachineId == nil || *doc.MachineId != w.machine.doc.Id) {
I think the principal check is redundant; I see no way for a subordinate
to get in here (and if it does, it's a bug, not just a thing-to-ignore).
https:/
state/watcher.
s/_, //
(surely..?)
https:/
File worker/
https:/
worker/
units so
Yeah, this is extremely important. If we're now tracking units to figure
out what to do, we should *surely* be doing so persistently.
https:/
worker/
m.stateInfo, m.tools); err != nil {
Isn't this actually a serious problem that should cause us to return an
error? If the agent is doing its job, we'll get retried anyway...
https:/
File worker/
https:/
worker/
I can accept handling reassignments that never happen in practice, but
we don't seem to be destroying dead units; surely that's necessary?
Unmerged revisions
- 727. By Aram Hăvărneanu
-
machiner: add lifecycle tests to machiner
- 726. By Aram Hăvărneanu
-
machiner: make sure we don't redeploy units when they change lifecycle
- 725. By Aram Hăvărneanu
-
state: more principal units watcher tests to original position
- 724. By Aram Hăvărneanu
-
machiner: use new style Unit.AssignedMa
chineId instead of Unit.IsAssigned To(*state. Machine) - 723. By Aram Hăvărneanu
-
firewaller: change NewMachiner signature so that state.State is the first argument
- 722. By Aram Hăvărneanu
-
all: merge lp:~aramh/juju-core/120-firewaller-new-watcher-units7
- 721. By Aram Hăvărneanu
-
machiner: convert to new style principal units watcher
- 720. By Aram Hăvărneanu
-
machiner: change NewMachiner signature to take a *state.State
- 719. By Aram Hăvărneanu
-
state: convert MachinePrincipa
lUnitsWatcher to a new style watcher - 718. By Aram Hăvărneanu
-
state: update machine principal units watcher table driven tests to new watcher model
Preview Diff
1 | === modified file 'cmd/jujud/machine.go' |
2 | --- cmd/jujud/machine.go 2012-10-12 07:40:16 +0000 |
3 | +++ cmd/jujud/machine.go 2012-11-09 16:51:25 +0000 |
4 | @@ -104,7 +104,7 @@ |
5 | var t task |
6 | switch w { |
7 | case state.MachinerWorker: |
8 | - t = machiner.NewMachiner(m, &a.Conf.StateInfo, a.Conf.DataDir) |
9 | + t = machiner.NewMachiner(st, m, &a.Conf.StateInfo, a.Conf.DataDir) |
10 | case state.ProvisionerWorker: |
11 | t = provisioner.NewProvisioner(st) |
12 | case state.FirewallerWorker: |
13 | |
14 | === modified file 'state/machine_test.go' |
15 | --- state/machine_test.go 2012-11-02 12:25:01 +0000 |
16 | +++ state/machine_test.go 2012-11-09 16:51:25 +0000 |
17 | @@ -317,34 +317,39 @@ |
18 | } |
19 | |
20 | var machinePrincipalsWatchTests = []struct { |
21 | + summary string |
22 | test func(*C, *MachineSuite, *state.Service) |
23 | - added []string |
24 | - removed []string |
25 | + changes []string |
26 | }{ |
27 | { |
28 | - test: func(_ *C, _ *MachineSuite, _ *state.Service) {}, |
29 | - added: []string{}, |
30 | - }, |
31 | - { |
32 | - test: func(c *C, s *MachineSuite, service *state.Service) { |
33 | - unit, err := service.AddUnit() |
34 | - c.Assert(err, IsNil) |
35 | - err = unit.AssignToMachine(s.machine) |
36 | - c.Assert(err, IsNil) |
37 | - }, |
38 | - added: []string{"mysql/0"}, |
39 | - }, |
40 | - { |
41 | - test: func(c *C, s *MachineSuite, service *state.Service) { |
42 | - unit, err := service.AddUnit() |
43 | - c.Assert(err, IsNil) |
44 | - err = unit.AssignToMachine(s.machine) |
45 | - c.Assert(err, IsNil) |
46 | - }, |
47 | - added: []string{"mysql/1"}, |
48 | - }, |
49 | - { |
50 | - test: func(c *C, s *MachineSuite, service *state.Service) { |
51 | + "check initial empty event", |
52 | + func(_ *C, _ *MachineSuite, _ *state.Service) {}, |
53 | + []string(nil), |
54 | + }, { |
55 | + "add a unit", |
56 | + func(c *C, s *MachineSuite, service *state.Service) { |
57 | + unit, err := service.AddUnit() |
58 | + c.Assert(err, IsNil) |
59 | + err = unit.AssignToMachine(s.machine) |
60 | + c.Assert(err, IsNil) |
61 | + }, |
62 | + []string{"mysql/0"}, |
63 | + }, { |
64 | + "ignore unrelated change", |
65 | + func(c *C, s *MachineSuite, service *state.Service) { |
66 | + unit, err := service.Unit("mysql/0") |
67 | + c.Assert(err, IsNil) |
68 | + err = unit.SetPrivateAddress("what.ever") |
69 | + c.Assert(err, IsNil) |
70 | + unit, err = service.AddUnit() |
71 | + c.Assert(err, IsNil) |
72 | + err = unit.AssignToMachine(s.machine) |
73 | + c.Assert(err, IsNil) |
74 | + }, |
75 | + []string{"mysql/1"}, |
76 | + }, { |
77 | + "add two units", |
78 | + func(c *C, s *MachineSuite, service *state.Service) { |
79 | unit2, err := service.AddUnit() |
80 | c.Assert(err, IsNil) |
81 | err = unit2.AssignToMachine(s.machine) |
82 | @@ -354,38 +359,45 @@ |
83 | err = unit3.AssignToMachine(s.machine) |
84 | c.Assert(err, IsNil) |
85 | }, |
86 | - added: []string{"mysql/2", "mysql/3"}, |
87 | - }, |
88 | - { |
89 | - test: func(c *C, s *MachineSuite, service *state.Service) { |
90 | + []string{"mysql/2", "mysql/3"}, |
91 | + }, { |
92 | + "set unit to dying", |
93 | + func(c *C, s *MachineSuite, service *state.Service) { |
94 | + unit3, err := service.Unit("mysql/3") |
95 | + c.Assert(err, IsNil) |
96 | + err = unit3.EnsureDying() |
97 | + c.Assert(err, IsNil) |
98 | + }, |
99 | + []string{"mysql/3"}, |
100 | + }, { |
101 | + "set unit to dead", |
102 | + func(c *C, s *MachineSuite, service *state.Service) { |
103 | unit3, err := service.Unit("mysql/3") |
104 | c.Assert(err, IsNil) |
105 | err = unit3.EnsureDead() |
106 | c.Assert(err, IsNil) |
107 | - err = service.RemoveUnit(unit3) |
108 | - c.Assert(err, IsNil) |
109 | }, |
110 | - removed: []string{"mysql/3"}, |
111 | - }, |
112 | - { |
113 | - test: func(c *C, s *MachineSuite, service *state.Service) { |
114 | + []string{"mysql/3"}, |
115 | + }, { |
116 | + "set multiple units to dying, dead, and remove already reported dead units", |
117 | + func(c *C, s *MachineSuite, service *state.Service) { |
118 | unit0, err := service.Unit("mysql/0") |
119 | c.Assert(err, IsNil) |
120 | - err = unit0.EnsureDead() |
121 | - c.Assert(err, IsNil) |
122 | - err = service.RemoveUnit(unit0) |
123 | + err = unit0.EnsureDying() |
124 | c.Assert(err, IsNil) |
125 | unit2, err := service.Unit("mysql/2") |
126 | c.Assert(err, IsNil) |
127 | err = unit2.EnsureDead() |
128 | c.Assert(err, IsNil) |
129 | - err = service.RemoveUnit(unit2) |
130 | + unit3, err := service.Unit("mysql/3") |
131 | + c.Assert(err, IsNil) |
132 | + err = service.RemoveUnit(unit3) |
133 | c.Assert(err, IsNil) |
134 | }, |
135 | - removed: []string{"mysql/0", "mysql/2"}, |
136 | - }, |
137 | - { |
138 | - test: func(c *C, s *MachineSuite, service *state.Service) { |
139 | + []string{"mysql/0", "mysql/2"}, |
140 | + }, { |
141 | + "add unit and remove unit at the same time", |
142 | + func(c *C, s *MachineSuite, service *state.Service) { |
143 | unit4, err := service.AddUnit() |
144 | c.Assert(err, IsNil) |
145 | err = unit4.AssignToMachine(s.machine) |
146 | @@ -397,11 +409,10 @@ |
147 | err = service.RemoveUnit(unit1) |
148 | c.Assert(err, IsNil) |
149 | }, |
150 | - added: []string{"mysql/4"}, |
151 | - removed: []string{"mysql/1"}, |
152 | - }, |
153 | - { |
154 | - test: func(c *C, s *MachineSuite, service *state.Service) { |
155 | + []string{"mysql/1", "mysql/4"}, |
156 | + }, { |
157 | + "add and remove many units at once", |
158 | + func(c *C, s *MachineSuite, service *state.Service) { |
159 | units := [20]*state.Unit{} |
160 | var err error |
161 | for i := 0; i < len(units); i++ { |
162 | @@ -417,10 +428,10 @@ |
163 | c.Assert(err, IsNil) |
164 | } |
165 | }, |
166 | - added: []string{"mysql/10", "mysql/11", "mysql/12", "mysql/13", "mysql/14", "mysql/5", "mysql/6", "mysql/7", "mysql/8", "mysql/9"}, |
167 | - }, |
168 | - { |
169 | - test: func(c *C, s *MachineSuite, service *state.Service) { |
170 | + []string{"mysql/10", "mysql/11", "mysql/12", "mysql/13", "mysql/14", "mysql/5", "mysql/6", "mysql/7", "mysql/8", "mysql/9"}, |
171 | + }, { |
172 | + "report dead when first seen and also add a new unit", |
173 | + func(c *C, s *MachineSuite, service *state.Service) { |
174 | unit25, err := service.AddUnit() |
175 | c.Assert(err, IsNil) |
176 | err = unit25.AssignToMachine(s.machine) |
177 | @@ -432,11 +443,10 @@ |
178 | err = service.RemoveUnit(unit9) |
179 | c.Assert(err, IsNil) |
180 | }, |
181 | - added: []string{"mysql/25"}, |
182 | - removed: []string{"mysql/9"}, |
183 | - }, |
184 | - { |
185 | - test: func(c *C, s *MachineSuite, service *state.Service) { |
186 | + []string{"mysql/9", "mysql/25"}, |
187 | + }, { |
188 | + "ignore units not assigned to this machine", |
189 | + func(c *C, s *MachineSuite, service *state.Service) { |
190 | unit26, err := service.AddUnit() |
191 | c.Assert(err, IsNil) |
192 | err = unit26.AssignToMachine(s.machine) |
193 | @@ -479,11 +489,10 @@ |
194 | err = service.RemoveUnit(unit14) |
195 | c.Assert(err, IsNil) |
196 | }, |
197 | - added: []string{"bacon/0", "bacon/1", "mysql/26", "mysql/27"}, |
198 | - removed: []string{"mysql/14"}, |
199 | - }, |
200 | - { |
201 | - test: func(c *C, s *MachineSuite, service *state.Service) { |
202 | + []string{"bacon/0", "bacon/1", "mysql/26", "mysql/27", "mysql/14"}, |
203 | + }, { |
204 | + "ignore subordinate units", |
205 | + func(c *C, s *MachineSuite, service *state.Service) { |
206 | unit28, err := service.AddUnit() |
207 | c.Assert(err, IsNil) |
208 | err = unit28.AssignToMachine(s.machine) |
209 | @@ -502,7 +511,19 @@ |
210 | _, err = logService.AddUnitSubordinateTo(unit29) |
211 | c.Assert(err, IsNil) |
212 | }, |
213 | - added: []string{"mysql/28", "mysql/29"}, |
214 | + []string{"mysql/28", "mysql/29"}, |
215 | + }, { |
216 | + "unassign from machine", |
217 | + func(c *C, s *MachineSuite, service *state.Service) { |
218 | + unit28, err := service.Unit("mysql/28") |
219 | + c.Assert(err, IsNil) |
220 | + err = unit28.UnassignFromMachine() |
221 | + c.Assert(err, IsNil) |
222 | + unit29, err := service.Unit("mysql/29") |
223 | + c.Assert(err, IsNil) |
224 | + err = unit29.UnassignFromMachine() |
225 | + }, |
226 | + []string{"mysql/28", "mysql/29"}, |
227 | }, |
228 | } |
229 | |
230 | @@ -515,21 +536,23 @@ |
231 | c.Assert(unitWatcher.Stop(), IsNil) |
232 | }() |
233 | for i, test := range machinePrincipalsWatchTests { |
234 | - c.Logf("test %d", i) |
235 | + c.Logf("test %d: %s", i, test.summary) |
236 | test.test(c, s, service) |
237 | s.State.StartSync() |
238 | - got := &state.MachinePrincipalUnitsChange{} |
239 | + var got []string |
240 | for { |
241 | select { |
242 | case new, ok := <-unitWatcher.Changes(): |
243 | c.Assert(ok, Equals, true) |
244 | - addMachineUnitChanges(got, new) |
245 | - if moreMachinePrincipalUnitsRequired(got, test.added, test.removed) { |
246 | + got = append(got, new...) |
247 | + if len(got) < len(test.changes) { |
248 | continue |
249 | } |
250 | - assertSameMachinePrincipalUnits(c, got, test.added, test.removed) |
251 | + sort.Strings(got) |
252 | + sort.Strings(test.changes) |
253 | + c.Assert(got, DeepEquals, test.changes) |
254 | case <-time.After(500 * time.Millisecond): |
255 | - c.Fatalf("did not get change, want: added: %#v, removed: %#v, got: %#v", test.added, test.removed, got) |
256 | + c.Fatalf("did not get change, want: %#v, got: %#v", test.changes, got) |
257 | } |
258 | break |
259 | } |
260 | @@ -541,37 +564,6 @@ |
261 | } |
262 | } |
263 | |
264 | -func moreMachinePrincipalUnitsRequired(got *state.MachinePrincipalUnitsChange, added, removed []string) bool { |
265 | - return len(got.Added)+len(got.Removed) < len(added)+len(removed) |
266 | -} |
267 | - |
268 | -func addMachineUnitChanges(changes *state.MachinePrincipalUnitsChange, more *state.MachinePrincipalUnitsChange) { |
269 | - changes.Added = append(changes.Added, more.Added...) |
270 | - changes.Removed = append(changes.Removed, more.Removed...) |
271 | -} |
272 | - |
273 | -func assertSameMachinePrincipalUnits(c *C, change *state.MachinePrincipalUnitsChange, added, removed []string) { |
274 | - c.Assert(change, NotNil) |
275 | - if len(added) == 0 { |
276 | - added = nil |
277 | - } |
278 | - if len(removed) == 0 { |
279 | - removed = nil |
280 | - } |
281 | - sort.Sort(unitSlice(change.Added)) |
282 | - sort.Sort(unitSlice(change.Removed)) |
283 | - var got []string |
284 | - for _, g := range change.Added { |
285 | - got = append(got, g.Name()) |
286 | - } |
287 | - c.Assert(got, DeepEquals, added) |
288 | - got = nil |
289 | - for _, g := range change.Removed { |
290 | - got = append(got, g.Name()) |
291 | - } |
292 | - c.Assert(got, DeepEquals, removed) |
293 | -} |
294 | - |
295 | var machineUnitsWatchTests = []struct { |
296 | summary string |
297 | test func(*C, *MachineSuite, *state.Unit, *state.Charm) |
298 | |
299 | === modified file 'state/unit.go' |
300 | --- state/unit.go 2012-11-09 16:51:25 +0000 |
301 | +++ state/unit.go 2012-11-09 16:51:25 +0000 |
302 | @@ -414,7 +414,7 @@ |
303 | } |
304 | |
305 | func notAssigned(format string, args ...interface{}) error { |
306 | - return &NotFoundError{fmt.Sprintf(format+" is not assigned to a machine", args...)} |
307 | + return ¬AssignedError{fmt.Sprintf(format+" is not assigned to a machine", args...)} |
308 | } |
309 | |
310 | func IsNotAssigned(err error) bool { |
311 | |
312 | === modified file 'state/watcher.go' |
313 | --- state/watcher.go 2012-11-02 12:25:01 +0000 |
314 | +++ state/watcher.go 2012-11-09 16:51:25 +0000 |
315 | @@ -44,22 +44,6 @@ |
316 | Left []string |
317 | } |
318 | |
319 | -// MachinePrincipalUnitsWatcher observes the assignment and removal of units |
320 | -// to and from a machine. |
321 | -type MachinePrincipalUnitsWatcher struct { |
322 | - commonWatcher |
323 | - machine *Machine |
324 | - changeChan chan *MachinePrincipalUnitsChange |
325 | - knownUnits map[string]*Unit |
326 | -} |
327 | - |
328 | -// MachinePrincipalUnitsChange contains information about units that have been |
329 | -// assigned to or removed from the machine. |
330 | -type MachinePrincipalUnitsChange struct { |
331 | - Added []*Unit |
332 | - Removed []*Unit |
333 | -} |
334 | - |
335 | func hasString(changes []string, name string) bool { |
336 | for _, v := range changes { |
337 | if v == name { |
338 | @@ -575,130 +559,6 @@ |
339 | return nil |
340 | } |
341 | |
342 | -// WatchPrincipalUnits returns a watcher for observing units being |
343 | -// added to or removed from the machine. |
344 | -func (m *Machine) WatchPrincipalUnits() *MachinePrincipalUnitsWatcher { |
345 | - return newMachinePrincipalUnitsWatcher(m) |
346 | -} |
347 | - |
348 | -// newMachinePrincipalUnitsWatcher creates and starts a watcher to watch information |
349 | -// about units being added to or deleted from the machine. |
350 | -func newMachinePrincipalUnitsWatcher(m *Machine) *MachinePrincipalUnitsWatcher { |
351 | - w := &MachinePrincipalUnitsWatcher{ |
352 | - changeChan: make(chan *MachinePrincipalUnitsChange), |
353 | - machine: m, |
354 | - knownUnits: make(map[string]*Unit), |
355 | - commonWatcher: commonWatcher{st: m.st}, |
356 | - } |
357 | - go func() { |
358 | - defer w.tomb.Done() |
359 | - defer close(w.changeChan) |
360 | - w.tomb.Kill(w.loop()) |
361 | - }() |
362 | - return w |
363 | -} |
364 | - |
365 | -// Changes returns a channel that will receive changes when units are |
366 | -// added or deleted. The Added field in the first event on the channel |
367 | -// holds the initial state as returned by Machine.Units. |
368 | -func (w *MachinePrincipalUnitsWatcher) Changes() <-chan *MachinePrincipalUnitsChange { |
369 | - return w.changeChan |
370 | -} |
371 | - |
372 | -func (w *MachinePrincipalUnitsWatcher) mergeChange(changes *MachinePrincipalUnitsChange, ch watcher.Change) (err error) { |
373 | - err = w.machine.Refresh() |
374 | - if err != nil { |
375 | - return err |
376 | - } |
377 | - units := make(map[string]*Unit) |
378 | - for _, name := range w.machine.doc.Principals { |
379 | - var unit *Unit |
380 | - doc := &unitDoc{} |
381 | - if _, ok := w.knownUnits[name]; !ok { |
382 | - err = w.st.units.FindId(name).One(doc) |
383 | - if err == mgo.ErrNotFound { |
384 | - continue |
385 | - } |
386 | - if err != nil { |
387 | - return err |
388 | - } |
389 | - unit = newUnit(w.st, doc) |
390 | - changes.Added = append(changes.Added, unit) |
391 | - w.knownUnits[name] = unit |
392 | - } |
393 | - units[name] = unit |
394 | - } |
395 | - for name, unit := range w.knownUnits { |
396 | - if _, ok := units[name]; !ok { |
397 | - changes.Removed = append(changes.Removed, unit) |
398 | - delete(w.knownUnits, name) |
399 | - } |
400 | - } |
401 | - return nil |
402 | -} |
403 | - |
404 | -func (changes *MachinePrincipalUnitsChange) isEmpty() bool { |
405 | - return len(changes.Added)+len(changes.Removed) == 0 |
406 | -} |
407 | - |
408 | -func (w *MachinePrincipalUnitsWatcher) getInitialEvent() (initial *MachinePrincipalUnitsChange, err error) { |
409 | - changes := &MachinePrincipalUnitsChange{} |
410 | - docs := []unitDoc{} |
411 | - err = w.st.units.Find(D{{"_id", D{{"$in", w.machine.doc.Principals}}}}).All(&docs) |
412 | - if err != nil { |
413 | - return nil, err |
414 | - } |
415 | - for _, doc := range docs { |
416 | - unit := newUnit(w.st, &doc) |
417 | - w.knownUnits[doc.Name] = unit |
418 | - changes.Added = append(changes.Added, unit) |
419 | - } |
420 | - return changes, nil |
421 | -} |
422 | - |
423 | -func (w *MachinePrincipalUnitsWatcher) loop() (err error) { |
424 | - ch := make(chan watcher.Change) |
425 | - w.st.watcher.Watch(w.st.machines.Name, w.machine.doc.Id, w.machine.doc.TxnRevno, ch) |
426 | - defer w.st.watcher.Unwatch(w.st.machines.Name, w.machine.doc.Id, ch) |
427 | - changes, err := w.getInitialEvent() |
428 | - if err != nil { |
429 | - return err |
430 | - } |
431 | - for { |
432 | - for changes != nil { |
433 | - select { |
434 | - case <-w.st.watcher.Dead(): |
435 | - return watcher.MustErr(w.st.watcher) |
436 | - case <-w.tomb.Dying(): |
437 | - return tomb.ErrDying |
438 | - case c := <-ch: |
439 | - err := w.mergeChange(changes, c) |
440 | - if err != nil { |
441 | - return err |
442 | - } |
443 | - case w.changeChan <- changes: |
444 | - changes = nil |
445 | - } |
446 | - } |
447 | - select { |
448 | - case <-w.st.watcher.Dead(): |
449 | - return watcher.MustErr(w.st.watcher) |
450 | - case <-w.tomb.Dying(): |
451 | - return tomb.ErrDying |
452 | - case c := <-ch: |
453 | - changes = &MachinePrincipalUnitsChange{} |
454 | - err := w.mergeChange(changes, c) |
455 | - if err != nil { |
456 | - return err |
457 | - } |
458 | - if changes.isEmpty() { |
459 | - changes = nil |
460 | - } |
461 | - } |
462 | - } |
463 | - return nil |
464 | -} |
465 | - |
466 | func newRelationScopeWatcher(st *State, scope, ignore string) *RelationScopeWatcher { |
467 | w := &RelationScopeWatcher{ |
468 | commonWatcher: commonWatcher{st: st}, |
469 | @@ -1385,3 +1245,135 @@ |
470 | } |
471 | panic("unreachable") |
472 | } |
473 | + |
474 | +// MachinePrincipalUnitsWatcher notifies about assignments and lifecycle |
475 | +// changes for all principal units assigned to a machine. |
476 | +// |
477 | +// The first event emitted contains the unit names of all principal units |
478 | +// currently assigned to the machine, irrespective of their life state. From |
479 | +// then on, a new event is emitted whenever a principal unit is assigned |
480 | +// to or unassigned from the machine, or the lifecycle of a unit that is |
481 | +// currently assigned to the machine changes. |
482 | +// |
483 | +// After a unit is found to be Dead, no further event will include it. |
484 | +type MachinePrincipalUnitsWatcher struct { |
485 | + commonWatcher |
486 | + machine *Machine |
487 | + out chan []string |
488 | + in chan watcher.Change |
489 | + known map[string]Life |
490 | +} |
491 | + |
492 | +// WatchUnits returns a new MachinePrincipalUnitsWatcher for m. |
493 | +func (m *Machine) WatchPrincipalUnits() *MachinePrincipalUnitsWatcher { |
494 | + return newMachinePrincipalUnitsWatcher(m) |
495 | +} |
496 | + |
497 | +func newMachinePrincipalUnitsWatcher(m *Machine) *MachinePrincipalUnitsWatcher { |
498 | + w := &MachinePrincipalUnitsWatcher{ |
499 | + commonWatcher: commonWatcher{st: m.st}, |
500 | + out: make(chan []string), |
501 | + in: make(chan watcher.Change), |
502 | + known: make(map[string]Life), |
503 | + machine: &Machine{m.st, m.doc}, // Copy so it may be freely refreshed |
504 | + } |
505 | + go func() { |
506 | + defer w.tomb.Done() |
507 | + defer close(w.out) |
508 | + w.tomb.Kill(w.loop()) |
509 | + }() |
510 | + return w |
511 | +} |
512 | + |
513 | +// Changes returns the event channel for w. |
514 | +func (w *MachinePrincipalUnitsWatcher) Changes() <-chan []string { |
515 | + return w.out |
516 | +} |
517 | + |
518 | +func (w *MachinePrincipalUnitsWatcher) updateMachine(pending []string) (new []string, err error) { |
519 | + err = w.machine.Refresh() |
520 | + if err != nil { |
521 | + return nil, err |
522 | + } |
523 | + for _, unit := range w.machine.doc.Principals { |
524 | + if _, ok := w.known[unit]; !ok { |
525 | + pending, err = w.merge(pending, unit) |
526 | + if err != nil { |
527 | + return nil, err |
528 | + } |
529 | + } |
530 | + } |
531 | + return pending, nil |
532 | +} |
533 | + |
534 | +func (w *MachinePrincipalUnitsWatcher) merge(pending []string, unit string) (new []string, err error) { |
535 | + doc := unitDoc{} |
536 | + err = w.st.units.FindId(unit).One(&doc) |
537 | + if err != nil && err != mgo.ErrNotFound { |
538 | + return nil, err |
539 | + } |
540 | + life, known := w.known[unit] |
541 | + if err == mgo.ErrNotFound || doc.Principal == "" && (doc.MachineId == nil || *doc.MachineId != w.machine.doc.Id) { |
542 | + // Unit was removed or unassigned from w.machine. |
543 | + if known { |
544 | + delete(w.known, unit) |
545 | + w.st.watcher.Unwatch(w.st.units.Name, unit, w.in) |
546 | + if life != Dead && !hasString(pending, unit) { |
547 | + pending = append(pending, unit) |
548 | + } |
549 | + } |
550 | + return pending, nil |
551 | + } |
552 | + if !known { |
553 | + w.st.watcher.Watch(w.st.units.Name, unit, doc.TxnRevno, w.in) |
554 | + pending = append(pending, unit) |
555 | + } else if life != doc.Life && !hasString(pending, unit) { |
556 | + pending = append(pending, unit) |
557 | + } |
558 | + w.known[unit] = doc.Life |
559 | + return pending, nil |
560 | +} |
561 | + |
562 | +func (w *MachinePrincipalUnitsWatcher) loop() (err error) { |
563 | + defer func() { |
564 | + for _, unit := range w.known { |
565 | + w.st.watcher.Unwatch(w.st.units.Name, unit, w.in) |
566 | + } |
567 | + }() |
568 | + machineCh := make(chan watcher.Change) |
569 | + w.st.watcher.Watch(w.st.machines.Name, w.machine.doc.Id, w.machine.doc.TxnRevno, machineCh) |
570 | + defer w.st.watcher.Unwatch(w.st.machines.Name, w.machine.doc.Id, machineCh) |
571 | + changes, err := w.updateMachine([]string(nil)) |
572 | + if err != nil { |
573 | + return err |
574 | + } |
575 | + out := w.out |
576 | + for { |
577 | + select { |
578 | + case <-w.st.watcher.Dead(): |
579 | + return watcher.MustErr(w.st.watcher) |
580 | + case <-w.tomb.Dying(): |
581 | + return tomb.ErrDying |
582 | + case <-machineCh: |
583 | + changes, err = w.updateMachine(changes) |
584 | + if err != nil { |
585 | + return err |
586 | + } |
587 | + if len(changes) > 0 { |
588 | + out = w.out |
589 | + } |
590 | + case c := <-w.in: |
591 | + changes, err = w.merge(changes, c.Id.(string)) |
592 | + if err != nil { |
593 | + return err |
594 | + } |
595 | + if len(changes) > 0 { |
596 | + out = w.out |
597 | + } |
598 | + case out <- changes: |
599 | + out = nil |
600 | + changes = nil |
601 | + } |
602 | + } |
603 | + panic("unreachable") |
604 | +} |
605 | |
606 | === modified file 'worker/machiner/export_test.go' |
607 | --- worker/machiner/export_test.go 2012-09-12 17:06:25 +0000 |
608 | +++ worker/machiner/export_test.go 2012-11-09 16:51:25 +0000 |
609 | @@ -5,6 +5,6 @@ |
610 | "launchpad.net/juju-core/state" |
611 | ) |
612 | |
613 | -func NewMachinerWithContainer(m *state.Machine, info *state.Info, dataDir string, cont container.Container) *Machiner { |
614 | - return newMachiner(m, info, dataDir, cont) |
615 | +func NewMachinerWithContainer(st *state.State, m *state.Machine, info *state.Info, dataDir string, cont container.Container) *Machiner { |
616 | + return newMachiner(st, m, info, dataDir, cont) |
617 | } |
618 | |
619 | === modified file 'worker/machiner/machiner.go' |
620 | --- worker/machiner/machiner.go 2012-10-11 14:52:21 +0000 |
621 | +++ worker/machiner/machiner.go 2012-11-09 16:51:25 +0000 |
622 | @@ -13,6 +13,7 @@ |
623 | |
624 | // Machiner represents a running machine agent. |
625 | type Machiner struct { |
626 | + st *state.State |
627 | tomb tomb.Tomb |
628 | machine *state.Machine |
629 | localContainer container.Container |
630 | @@ -23,17 +24,18 @@ |
631 | // NewMachiner starts a machine agent running that |
632 | // deploys agents in the given directory. |
633 | // The Machiner dies when it encounters an error. |
634 | -func NewMachiner(machine *state.Machine, info *state.Info, dataDir string) *Machiner { |
635 | +func NewMachiner(st *state.State, machine *state.Machine, info *state.Info, dataDir string) *Machiner { |
636 | cont := &container.Simple{DataDir: dataDir} |
637 | - return newMachiner(machine, info, dataDir, cont) |
638 | + return newMachiner(st, machine, info, dataDir, cont) |
639 | } |
640 | |
641 | -func newMachiner(machine *state.Machine, info *state.Info, dataDir string, cont container.Container) *Machiner { |
642 | +func newMachiner(st *state.State, machine *state.Machine, info *state.Info, dataDir string, cont container.Container) *Machiner { |
643 | tools, err := environs.ReadTools(dataDir, version.Current) |
644 | if err != nil { |
645 | tools = &state.Tools{Binary: version.Current} |
646 | } |
647 | m := &Machiner{ |
648 | + st: st, |
649 | machine: machine, |
650 | stateInfo: info, |
651 | tools: tools, |
652 | @@ -47,6 +49,7 @@ |
653 | defer m.tomb.Done() |
654 | w := m.machine.WatchPrincipalUnits() |
655 | defer watcher.Stop(w, &m.tomb) |
656 | + units := map[string]*state.Unit{} |
657 | |
658 | // TODO read initial units, check if they're running |
659 | // and restart them if not. Also track units so |
660 | @@ -60,19 +63,37 @@ |
661 | m.tomb.Kill(watcher.MustErr(w)) |
662 | return |
663 | } |
664 | - for _, u := range change.Removed { |
665 | - if u.IsPrincipal() { |
666 | - if err := m.localContainer.Destroy(u); err != nil { |
667 | - log.Printf("worker/machiner: cannot destroy unit %s: %v", u.Name(), err) |
668 | + for _, uname := range change { |
669 | + assigned := false |
670 | + u, err := m.st.Unit(uname) |
671 | + if err != nil { |
672 | + if !state.IsNotFound(err) { |
673 | + m.tomb.Kill(err) |
674 | + return |
675 | + } |
676 | + } else { |
677 | + machineId, err := u.AssignedMachineId() |
678 | + if err != nil { |
679 | + if !state.IsNotAssigned(err) { |
680 | + m.tomb.Kill(err) |
681 | + return |
682 | + } |
683 | + } else { |
684 | + assigned = machineId == m.machine.Id() |
685 | } |
686 | } |
687 | - } |
688 | - for _, u := range change.Added { |
689 | - if u.IsPrincipal() { |
690 | + _, known := units[uname] |
691 | + if assigned && !known { |
692 | if err := m.localContainer.Deploy(u, m.stateInfo, m.tools); err != nil { |
693 | // TODO put unit into a queue to retry the deploy. |
694 | log.Printf("worker/machiner: cannot deploy unit %s: %v", u.Name(), err) |
695 | } |
696 | + units[uname] = u |
697 | + } else if !assigned && known { |
698 | + if err := m.localContainer.Destroy(units[uname]); err != nil { |
699 | + log.Printf("worker/machiner: cannot destroy unit %s: %v", u.Name(), err) |
700 | + } |
701 | + delete(units, uname) |
702 | } |
703 | } |
704 | } |
705 | |
706 | === modified file 'worker/machiner/machiner_test.go' |
707 | --- worker/machiner/machiner_test.go 2012-09-28 07:21:43 +0000 |
708 | +++ worker/machiner/machiner_test.go 2012-11-09 16:51:25 +0000 |
709 | @@ -26,7 +26,7 @@ |
710 | m, err := s.State.AddMachine(state.MachinerWorker) |
711 | c.Assert(err, IsNil) |
712 | |
713 | - p := machiner.NewMachiner(m, &state.Info{}, c.MkDir()) |
714 | + p := machiner.NewMachiner(s.State, m, &state.Info{}, c.MkDir()) |
715 | c.Assert(p.Stop(), IsNil) |
716 | } |
717 | |
718 | @@ -38,6 +38,8 @@ |
719 | c.Assert(err, IsNil) |
720 | d1, err := s.State.AddService("d1", dummyCharm) |
721 | c.Assert(err, IsNil) |
722 | + d2, err := s.State.AddService("d2", dummyCharm) |
723 | + c.Assert(err, IsNil) |
724 | sub0, err := s.State.AddService("sub0", loggingCharm) |
725 | c.Assert(err, IsNil) |
726 | |
727 | @@ -50,6 +52,9 @@ |
728 | ud1, err := d1.AddUnit() |
729 | c.Assert(err, IsNil) |
730 | |
731 | + ud2, err := d2.AddUnit() |
732 | + c.Assert(err, IsNil) |
733 | + |
734 | m0, err := s.State.AddMachine(state.MachinerWorker) |
735 | c.Assert(err, IsNil) |
736 | |
737 | @@ -67,7 +72,7 @@ |
738 | expectedInfo: stateInfo, |
739 | action: make(chan string, 5), |
740 | } |
741 | - machiner := machiner.NewMachinerWithContainer(m0, stateInfo, s.DataDir(), dcontainer) |
742 | + machiner := machiner.NewMachinerWithContainer(s.State, m0, stateInfo, s.DataDir(), dcontainer) |
743 | defer func() { |
744 | err := machiner.Stop() |
745 | c.Assert(err, IsNil) |
746 | @@ -88,6 +93,18 @@ |
747 | []string{"+d1/0"}, |
748 | }, { |
749 | func() { |
750 | + err := ud2.AssignToMachine(m0) |
751 | + c.Assert(err, IsNil) |
752 | + }, |
753 | + []string{"+d2/0"}, |
754 | + }, { |
755 | + func() { |
756 | + err := ud2.EnsureDying() |
757 | + c.Assert(err, IsNil) |
758 | + }, |
759 | + nil, |
760 | + }, { |
761 | + func() { |
762 | err := ud0.UnassignFromMachine() |
763 | c.Assert(err, IsNil) |
764 | }, |
Reviewers: mp+133504_ code.launchpad. net,
Message:
Please take a look.
Description:
state, machiner: new style principal units watcher
Convert the machine principal units watcher to the new, id only model
and refactor the machiner to make use of it.
https:/ /code.launchpad .net/~aramh/ juju-core/ 121-state- machiner- watchers- machine- principals7/ +merge/ 133504
Requires: /code.launchpad .net/~aramh/ juju-core/ 120-firewaller- new-watcher- units7/ +merge/ 133269
https:/
(do not edit description out of merge proposal)
Please review this at https:/ /codereview. appspot. com/6814108/
Affected files: machine. go test.go machiner/ export_ test.go machiner/ machiner. go machiner/ machiner_ test.go
A [revision details]
M cmd/jujud/
M state/machine_
M state/unit.go
M state/watcher.go
M worker/
M worker/
M worker/