Merge lp:~niemeyer/juju-core/mstate-machine-watcher into lp:~juju/juju-core/trunk
Status: | Merged |
---|---|
Merged at revision: | 481 |
Proposed branch: | lp:~niemeyer/juju-core/mstate-machine-watcher |
Merge into: | lp:~juju/juju-core/trunk |
Prerequisite: | lp:~niemeyer/juju-core/mstate-watcher-helpers |
Diff against target: | 400 lines (+246/-21), 7 files modified: mstate/export_test.go (+4/-0), mstate/machine.go (+6/-0), mstate/machine_test.go (+101/-0), mstate/open.go (+39/-17), mstate/state.go (+11/-2), mstate/state_test.go (+11/-2), mstate/watcher.go (+74/-0) |
To merge this branch: | bzr merge lp:~niemeyer/juju-core/mstate-machine-watcher |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
The Go Language Gophers | Pending | ||
Review via email: mp+123614@code.launchpad.net |
Commit message
Description of the change
mstate: add machine watcher
This is the first high-level watcher after the foundation for
watcher is in place, so it adds a few missing pieces onto the
state logic itself too.
Gustavo Niemeyer (niemeyer) wrote : | # |
William Reade (fwereade) wrote : | # |
LGTM modulo locale worries.
https:/
File mstate/open.go (right):
https:/
mstate/open.go:45: // Quite unfortunate that the error has no
appropriate code.
Very much so. Can we be screwed by locale settings here?
Gustavo Niemeyer (niemeyer) wrote : | # |
*** Submitted:
mstate: add machine watcher
This is the first high-level watcher after the foundation for
watcher is in place, so it adds a few missing pieces onto the
state logic itself too.
R=fwereade
CC=
https:/
https:/
File mstate/open.go (right):
https:/
mstate/open.go:45: // Quite unfortunate that the error has no
appropriate code.
On 2012/09/11 07:28:07, fwereade wrote:
> Very much so. Can we be screwed by locale settings here?
Not at the moment at least. I've checked the upstream code and it
doesn't translate the message.
I've included in the comment the URL of the upstream bug reported on
this issue.
Aram Hăvărneanu (aramh) wrote : | # |
https:/
File mstate/machine.go (right):
https:/
mstate/
This complicates tests and watchers because Insert can't set TxnRevno
and DeepEquals will fail. How about reverting this, and create a new
machineDocTxn like this:
    type machineDocTxn struct {
        machineDoc `bson:",inline"`
        TxnRevno   int64 `bson:"txn-revno"`
    }
and use that where we need it?
https:/
File mstate/watcher.go (right):
https:/
mstate/
Shouldn't this be a default case, and w.changeChan <- m *after* the
select? That way we will always completely drain ch before sending m.
Now there's a chance we will send m before draining ch.
Gustavo Niemeyer (niemeyer) wrote : | # |
https:/
File mstate/machine.go (right):
https:/
mstate/
On 2012/09/11 17:40:53, aram wrote:
> This complicates tests and watchers because Insert can't set TxnRevno and
> DeepEquals will fail. How about reverting this, and create a new machineDocTxn
> like this:
The reasoning for doing it doesn't sound great. Doing DeepEquals on such
a structure is most definitely wrong. Private data is private, and by
definition we should be able to cache information freely or change it
without breaking tests that work on the public APIs.
It sounds sensible to have an omitempty setting there, though, so that we
don't try to insert it. Can you please make the change in a follow-up
CL?
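As a rough illustration of what an omitempty setting does, here is a sketch that uses the standard library's encoding/json as a stand-in, since the bson package used by mstate is not in the standard library. The struct names are hypothetical, and the bson encoder's handling of the flag is assumed to be analogous rather than verified here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// withField always serializes the revision number, even when zero.
type withField struct {
	ID       int   `json:"_id"`
	TxnRevno int64 `json:"txn-revno"`
}

// withOmit drops the revision number from the output when it is zero.
type withOmit struct {
	ID       int   `json:"_id"`
	TxnRevno int64 `json:"txn-revno,omitempty"`
}

// encode marshals v and returns the result as a string.
func encode(v interface{}) string {
	b, err := json.Marshal(v)
	if err != nil {
		return "error: " + err.Error()
	}
	return string(b)
}

func main() {
	fmt.Println(encode(withField{ID: 1}))             // zero revno is written out
	fmt.Println(encode(withOmit{ID: 1}))              // zero revno is left out
	fmt.Println(encode(withOmit{ID: 1, TxnRevno: 3})) // non-zero revno is kept
}
```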
https:/
File mstate/watcher.go (right):
https:/
mstate/
On 2012/09/11 17:40:53, aram wrote:
> Shouldn't this be a default case, and w.changeChan <- m *after* the select? That
> way we will always completely drain ch before sending m. Now there's a chance we
> will send m before draining ch.
No, there's some misunderstanding about how the watcher works. The
channel is never drained, and the construction here is purposefully
operating *while* trying to send.
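The "operate while trying to send" construction can be sketched with plain channels. This is a simplified analogue, not the watcher itself: `coalesce` is a hypothetical function, integers stand in for machine states, and replacing the pending value stands in for the Refresh call.

```go
package main

import "fmt"

// coalesce forwards values from in to out. While a send on out is
// pending, any newer value arriving on in replaces the pending one,
// so a slow receiver observes only the most recent state.
func coalesce(in <-chan int, out chan<- int) {
	defer close(out)
	for v := range in {
		pending := v
	sending:
		for {
			select {
			case nv, ok := <-in:
				if !ok {
					// Input is finished: deliver what we have and stop.
					out <- pending
					return
				}
				pending = nv // newer state supersedes the unsent one
			case out <- pending:
				break sending
			}
		}
	}
}

func main() {
	in := make(chan int)
	out := make(chan int)
	go coalesce(in, out)
	// Three changes happen before anyone reads from out.
	in <- 1
	in <- 2
	in <- 3
	fmt.Println(<-out) // only the latest value is delivered
	close(in)
	_, ok := <-out
	fmt.Println(ok) // out is closed once in is closed
}
```

Both the receive and the send sit in the same select, so nothing is ever drained up front. That matches the note in the Changes documentation that multiple changes may be observed as a single event.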
Aram Hăvărneanu (aramh) wrote : | # |
> The reasoning for doing it doesn't sound great. Doing DeepEquals on such
> a structure is most definitely wrong. Private data is private, and by
> definition we should be able to cache information freely or change it
> without breaking tests that work on the public APIs.
It may be wrong, but in that case we have to do lots of work to fix it
because all the documents will have this field.
white:mstate$ cd state
/home/
white:state$ g DeepEquals | wc -l
89
white:state$ cd mstate
/home/
white:mstate$ g DeepEquals | wc -l
58
To be honest, using DeepEquals in tests always felt wrong to me because
it was checking the implementation details instead of relying on the
interface, but I assumed it was ok since this is the way state tests
were written in the first place.
I will add omitempty, but this doesn't fix the tests. Entities returned
by Insert will always have a zero TxnRevno, but entities returned by other
querying functions or by Refresh will have a non-zero TxnRevno.
If you are against this change, perhaps an alternative is to call
Refresh() in tests where this might matter.
Gustavo Niemeyer (niemeyer) wrote : | # |
On Tue, Sep 11, 2012 at 3:13 PM, <email address hidden> wrote:
> white:mstate$ cd state
> /home/aram/
> white:state$ g DeepEquals | wc -l
> 89
> white:state$ cd mstate
> /home/aram/
> white:mstate$ g DeepEquals | wc -l
> 58
This is counting the number of times we've used DeepEquals, which
doesn't really mean much.
How many of those entries are comparing the private fields of machine?
> I will add omitempty, but this doesn't fix the tests. Entities returned
> by Insert will always have a zero TxnRevno, but entities returned by other
> querying functions or by Refresh will have a non-zero TxnRevno.
That's a good point. How can we fix it?
gustavo @ http://
Preview Diff
1 | === modified file 'mstate/export_test.go' |
2 | --- mstate/export_test.go 2012-08-16 13:44:09 +0000 |
3 | +++ mstate/export_test.go 2012-09-10 18:17:20 +0000 |
4 | @@ -12,3 +12,7 @@ |
5 | m := &Machine{doc: machineDoc(*doc)} |
6 | return m.String() |
7 | } |
8 | + |
9 | +func init() { |
10 | + logSize = logSizeTests |
11 | +} |
12 | |
13 | === modified file 'mstate/machine.go' |
14 | --- mstate/machine.go 2012-09-06 17:04:54 +0000 |
15 | +++ mstate/machine.go 2012-09-10 18:17:20 +0000 |
16 | @@ -20,6 +20,7 @@ |
17 | Id int `bson:"_id"` |
18 | InstanceId string |
19 | Life Life |
20 | + TxnRevno int64 `bson:"txn-revno"` |
21 | } |
22 | |
23 | func newMachine(st *State, doc *machineDoc) *Machine { |
24 | @@ -73,6 +74,11 @@ |
25 | return nil |
26 | } |
27 | |
28 | +// Watch returns a watcher that fires when the machine changes. |
29 | +func (m *Machine) Watch() *MachineWatcher { |
30 | + return newMachineWatcher(m) |
31 | +} |
32 | + |
33 | // AgentAlive returns whether the respective remote agent is alive. |
34 | func (m *Machine) AgentAlive() bool { |
35 | return m.st.presencew.Alive(m.globalKey()) |
36 | |
37 | === modified file 'mstate/machine_test.go' |
38 | --- mstate/machine_test.go 2012-09-06 17:04:54 +0000 |
39 | +++ mstate/machine_test.go 2012-09-10 18:17:20 +0000 |
40 | @@ -3,6 +3,7 @@ |
41 | import ( |
42 | . "launchpad.net/gocheck" |
43 | state "launchpad.net/juju-core/mstate" |
44 | + "launchpad.net/juju-core/version" |
45 | "sort" |
46 | "time" |
47 | ) |
48 | @@ -220,3 +221,103 @@ |
49 | sort.Strings(names) |
50 | return names |
51 | } |
52 | + |
53 | +type machineInfo struct { |
54 | + tools *state.Tools |
55 | + instanceId string |
56 | +} |
57 | + |
58 | +func tools(tools int, url string) *state.Tools { |
59 | + return &state.Tools{ |
60 | + URL: url, |
61 | + Binary: version.Binary{ |
62 | + Number: version.Number{0, 0, tools}, |
63 | + Series: "series", |
64 | + Arch: "arch", |
65 | + }, |
66 | + } |
67 | +} |
68 | + |
69 | +var watchMachineTests = []struct { |
70 | + test func(m *state.Machine) error |
71 | + want machineInfo |
72 | +}{ |
73 | + { |
74 | + func(m *state.Machine) error { |
75 | + return nil |
76 | + }, |
77 | + machineInfo{ |
78 | + tools: &state.Tools{}, |
79 | + }, |
80 | + }, |
81 | + { |
82 | + func(m *state.Machine) error { |
83 | + return m.SetInstanceId("m-foo") |
84 | + }, |
85 | + machineInfo{ |
86 | + tools: &state.Tools{}, |
87 | + instanceId: "m-foo", |
88 | + }, |
89 | + }, |
90 | + { |
91 | + func(m *state.Machine) error { |
92 | + return m.SetInstanceId("") |
93 | + }, |
94 | + machineInfo{ |
95 | + tools: &state.Tools{}, |
96 | + instanceId: "", |
97 | + }, |
98 | + }, |
99 | + // TODO SetAgentTools is missing. |
100 | + //{ |
101 | + // func(m *state.Machine) error { |
102 | + // return m.SetAgentTools(tools(3, "baz")) |
103 | + // }, |
104 | + // machineInfo{ |
105 | + // tools: tools(3, "baz"), |
106 | + // }, |
107 | + //}, |
108 | + //{ |
109 | + // func(m *state.Machine) error { |
110 | + // return m.SetAgentTools(tools(4, "khroomph")) |
111 | + // }, |
112 | + // machineInfo{ |
113 | + // tools: tools(4, "khroomph"), |
114 | + // }, |
115 | + //}, |
116 | +} |
117 | + |
118 | +func (s *MachineSuite) TestWatchMachine(c *C) { |
119 | + w := s.machine.Watch() |
120 | + defer func() { |
121 | + c.Assert(w.Stop(), IsNil) |
122 | + }() |
123 | + for i, test := range watchMachineTests { |
124 | + c.Logf("test %d", i) |
125 | + err := test.test(s.machine) |
126 | + c.Assert(err, IsNil) |
127 | + s.State.StartSync() |
128 | + select { |
129 | + case m, ok := <-w.Changes(): |
130 | + c.Assert(ok, Equals, true) |
131 | + c.Assert(m.Id(), Equals, s.machine.Id()) |
132 | + var info machineInfo |
133 | + // TODO AgentTools is missing. |
134 | + info.tools = test.want.tools |
135 | + //info.tools, err = m.AgentTools() |
136 | + //c.Assert(err, IsNil) |
137 | + info.instanceId, err = m.InstanceId() |
138 | + if _, ok := err.(*state.NotFoundError); !ok { |
139 | + c.Assert(err, IsNil) |
140 | + } |
141 | + c.Assert(info, DeepEquals, test.want) |
142 | + case <-time.After(500 * time.Millisecond): |
143 | + c.Fatalf("did not get change: %v", test.want) |
144 | + } |
145 | + } |
146 | + select { |
147 | + case got := <-w.Changes(): |
148 | + c.Fatalf("got unexpected change: %#v", got) |
149 | + case <-time.After(100 * time.Millisecond): |
150 | + } |
151 | +} |
152 | |
153 | === modified file 'mstate/open.go' |
154 | --- mstate/open.go 2012-09-06 22:12:07 +0000 |
155 | +++ mstate/open.go 2012-09-10 18:17:20 +0000 |
156 | @@ -1,10 +1,12 @@ |
157 | package mstate |
158 | |
159 | import ( |
160 | + "fmt" |
161 | "labix.org/v2/mgo" |
162 | "labix.org/v2/mgo/txn" |
163 | "launchpad.net/juju-core/log" |
164 | "launchpad.net/juju-core/mstate/presence" |
165 | + "launchpad.net/juju-core/mstate/watcher" |
166 | ) |
167 | |
168 | var indexes = []mgo.Index{ |
169 | @@ -12,6 +14,14 @@ |
170 | {Key: []string{"endpoints.servicename"}}, |
171 | } |
172 | |
173 | +// The capped collection used for transaction logs defaults to 200MB. |
174 | +// It's tweaked in export_test.go to 1MB to avoid the overhead of |
175 | +// creating and deleting the large file repeatedly. |
176 | +var ( |
177 | + logSize = 200000000 |
178 | + logSizeTests = 1000000 |
179 | +) |
180 | + |
181 | func Dial(servers string) (*State, error) { |
182 | log.Printf("opening state with servers: %q", servers) |
183 | session, err := mgo.Dial(servers) |
184 | @@ -19,32 +29,44 @@ |
185 | return nil, err |
186 | } |
187 | db := session.DB("juju") |
188 | - presencedb := session.DB("presence") |
189 | - txns := db.C("txns") |
190 | + pdb := session.DB("presence") |
191 | st := &State{ |
192 | - db: db, |
193 | - presencedb: presencedb, |
194 | - charms: db.C("charms"), |
195 | - machines: db.C("machines"), |
196 | - relations: db.C("relations"), |
197 | - services: db.C("services"), |
198 | - settings: db.C("settings"), |
199 | - units: db.C("units"), |
200 | - presence: presencedb.C("presence"), |
201 | - runner: txn.NewRunner(txns), |
202 | - } |
203 | - st.presencew = presence.NewWatcher(st.presence) |
204 | + db: db, |
205 | + charms: db.C("charms"), |
206 | + machines: db.C("machines"), |
207 | + relations: db.C("relations"), |
208 | + services: db.C("services"), |
209 | + settings: db.C("settings"), |
210 | + units: db.C("units"), |
211 | + presence: pdb.C("presence"), |
212 | + } |
213 | + log := db.C("txns.log") |
214 | + info := mgo.CollectionInfo{Capped: true, MaxBytes: logSize} |
215 | + // Quite unfortunate that the error has no appropriate code. |
216 | + if err := log.Create(&info); err != nil && err.Error() != "collection already exists" { |
217 | + return nil, fmt.Errorf("cannot create log collection: %v", err) |
218 | + } |
219 | + st.runner = txn.NewRunner(db.C("txns")) |
220 | + st.runner.ChangeLog(db.C("txns.log")) |
221 | + st.watcher = watcher.New(db.C("txns.log")) |
222 | + st.presencew = presence.NewWatcher(pdb.C("presence")) |
223 | for _, index := range indexes { |
224 | err = st.relations.EnsureIndex(index) |
225 | if err != nil { |
226 | - return nil, err |
227 | + return nil, fmt.Errorf("cannot create database index: %v", err) |
228 | } |
229 | } |
230 | return st, nil |
231 | } |
232 | |
233 | func (st *State) Close() error { |
234 | - err := st.presencew.Stop() |
235 | + err1 := st.presencew.Stop() |
236 | + err2 := st.watcher.Stop() |
237 | st.db.Session.Close() |
238 | - return err |
239 | + for _, err := range []error{err1, err2} { |
240 | + if err != nil { |
241 | + return err |
242 | + } |
243 | + } |
244 | + return nil |
245 | } |
246 | |
247 | === modified file 'mstate/state.go' |
248 | --- mstate/state.go 2012-09-06 16:54:22 +0000 |
249 | +++ mstate/state.go 2012-09-10 18:17:20 +0000 |
250 | @@ -11,6 +11,7 @@ |
251 | "launchpad.net/juju-core/charm" |
252 | "launchpad.net/juju-core/environs/config" |
253 | "launchpad.net/juju-core/mstate/presence" |
254 | + "launchpad.net/juju-core/mstate/watcher" |
255 | "launchpad.net/juju-core/trivial" |
256 | "launchpad.net/juju-core/version" |
257 | "net/url" |
258 | @@ -28,7 +29,6 @@ |
259 | // managed by juju. |
260 | type State struct { |
261 | db *mgo.Database |
262 | - presencedb *mgo.Database |
263 | charms *mgo.Collection |
264 | machines *mgo.Collection |
265 | relations *mgo.Collection |
266 | @@ -36,8 +36,9 @@ |
267 | settings *mgo.Collection |
268 | units *mgo.Collection |
269 | presence *mgo.Collection |
270 | + runner *txn.Runner |
271 | + watcher *watcher.Watcher |
272 | presencew *presence.Watcher |
273 | - runner *txn.Runner |
274 | } |
275 | |
276 | func deadOnAbort(err error) error { |
277 | @@ -369,3 +370,11 @@ |
278 | func (s *State) ForcePresenceRefresh() { |
279 | s.presencew.ForceRefresh() |
280 | } |
281 | + |
282 | +// StartSync forces watchers to resynchronize their state with the |
283 | +// database immediately. This will happen periodically automatically. |
284 | +func (s *State) StartSync() { |
285 | + // TODO Make presence more like watcher, add it here, and |
286 | + // remove ForcePresenceRefresh. |
287 | + s.watcher.StartSync() |
288 | +} |
289 | |
290 | === modified file 'mstate/state_test.go' |
291 | --- mstate/state_test.go 2012-09-06 16:54:22 +0000 |
292 | +++ mstate/state_test.go 2012-09-10 18:17:20 +0000 |
293 | @@ -7,7 +7,7 @@ |
294 | "launchpad.net/juju-core/charm" |
295 | "launchpad.net/juju-core/environs/config" |
296 | state "launchpad.net/juju-core/mstate" |
297 | - coretesting "launchpad.net/juju-core/testing" |
298 | + "launchpad.net/juju-core/testing" |
299 | "net/url" |
300 | ) |
301 | |
302 | @@ -19,9 +19,18 @@ |
303 | |
304 | var _ = Suite(&StateSuite{}) |
305 | |
306 | +func (s *StateSuite) TestDialAgain(c *C) { |
307 | + // Ensure idempotent operations on Dial are working fine. |
308 | + for i := 0; i < 2; i++ { |
309 | + st, err := state.Dial(testing.MgoAddr) |
310 | + c.Assert(err, IsNil) |
311 | + c.Assert(st.Close(), IsNil) |
312 | + } |
313 | +} |
314 | + |
315 | func (s *StateSuite) TestAddCharm(c *C) { |
316 | // Check that adding charms from scratch works correctly. |
317 | - ch := coretesting.Charms.Dir("dummy") |
318 | + ch := testing.Charms.Dir("dummy") |
319 | curl := charm.MustParseURL( |
320 | fmt.Sprintf("local:series/%s-%d", ch.Meta().Name, ch.Revision()), |
321 | ) |
322 | |
323 | === added file 'mstate/watcher.go' |
324 | --- mstate/watcher.go 1970-01-01 00:00:00 +0000 |
325 | +++ mstate/watcher.go 2012-09-10 18:17:20 +0000 |
326 | @@ -0,0 +1,74 @@ |
327 | +package mstate |
328 | + |
329 | +import ( |
330 | + "launchpad.net/juju-core/mstate/watcher" |
331 | + "launchpad.net/tomb" |
332 | +) |
333 | + |
334 | +// MachineWatcher observes changes to the settings of a machine. |
335 | +type MachineWatcher struct { |
336 | + changeChan chan *Machine |
337 | + tomb tomb.Tomb |
338 | +} |
339 | + |
340 | +// newMachineWatcher creates and starts a watcher to watch information |
341 | +// about the machine. |
342 | +func newMachineWatcher(m *Machine) *MachineWatcher { |
343 | + w := &MachineWatcher{changeChan: make(chan *Machine)} |
344 | + go func() { |
345 | + defer w.tomb.Done() |
346 | + defer close(w.changeChan) |
347 | + w.tomb.Kill(w.loop(m)) |
348 | + }() |
349 | + return w |
350 | +} |
351 | + |
352 | +// Changes returns a channel that will receive the new |
353 | +// *Machine when a change is detected. Note that multiple |
354 | +// changes may be observed as a single event in the channel. |
355 | +// The first event on the channel holds the initial state |
356 | +// as returned by Machine.Info. |
357 | +func (w *MachineWatcher) Changes() <-chan *Machine { |
358 | + return w.changeChan |
359 | +} |
360 | + |
361 | +func (w *MachineWatcher) Stop() error { |
362 | + w.tomb.Kill(nil) |
363 | + return w.tomb.Wait() |
364 | +} |
365 | + |
366 | +func (w *MachineWatcher) loop(m *Machine) (err error) { |
367 | + ch := make(chan watcher.Change) |
368 | + id := m.Id() |
369 | + st := m.st |
370 | + st.watcher.Watch(st.machines.Name, id, m.doc.TxnRevno, ch) |
371 | + defer st.watcher.Unwatch(st.machines.Name, id, ch) |
372 | + for { |
373 | + select { |
374 | + case <-st.watcher.Dying(): |
375 | + return watcher.MustErr(st.watcher) |
376 | + case <-w.tomb.Dying(): |
377 | + return tomb.ErrDying |
378 | + case <-ch: |
379 | + } |
380 | + if m, err = st.Machine(id); err != nil { |
381 | + return err |
382 | + } |
383 | + for { |
384 | + select { |
385 | + case <-st.watcher.Dying(): |
386 | + return watcher.MustErr(st.watcher) |
387 | + case <-w.tomb.Dying(): |
388 | + return tomb.ErrDying |
389 | + case <-ch: |
390 | + if err := m.Refresh(); err != nil { |
391 | + return err |
392 | + } |
393 | + continue |
394 | + case w.changeChan <- m: |
395 | + } |
396 | + break |
397 | + } |
398 | + } |
399 | + return nil |
400 | +} |
Reviewers: mp+123614@code.launchpad.net,
Message:
Please take a look.
Description:
mstate: add machine watcher
This is the first high-level watcher after the foundation for
watcher is in place, so it adds a few missing pieces onto the
state logic itself too.
https://code.launchpad.net/~niemeyer/juju-core/mstate-machine-watcher/+merge/123614
Requires: /code.launchpad.net/~niemeyer/juju-core/mstate-watcher-helpers/+merge/123596
https:/
(do not edit description out of merge proposal)
Please review this at https://codereview.appspot.com/6489104/
Affected files:
A [revision details]
M mstate/export_test.go
M mstate/machine.go
M mstate/machine_test.go
M mstate/open.go
M mstate/state.go
M mstate/state_test.go
A mstate/watcher.go