Merge lp:~niemeyer/juju-core/mstate-machine-watcher into lp:~juju/juju-core/trunk

Proposed by Gustavo Niemeyer
Status: Merged
Merged at revision: 481
Proposed branch: lp:~niemeyer/juju-core/mstate-machine-watcher
Merge into: lp:~juju/juju-core/trunk
Prerequisite: lp:~niemeyer/juju-core/mstate-watcher-helpers
Diff against target: 400 lines (+246/-21)
7 files modified
mstate/export_test.go (+4/-0)
mstate/machine.go (+6/-0)
mstate/machine_test.go (+101/-0)
mstate/open.go (+39/-17)
mstate/state.go (+11/-2)
mstate/state_test.go (+11/-2)
mstate/watcher.go (+74/-0)
To merge this branch: bzr merge lp:~niemeyer/juju-core/mstate-machine-watcher
Reviewer: The Go Language Gophers (status: Pending)
Review via email: mp+123614@code.launchpad.net

Description of the change

mstate: add machine watcher

This is the first high-level watcher after the foundation for
watcher is in place, so it adds a few missing pieces onto the
state logic itself too.

https://codereview.appspot.com/6489104/

Revision history for this message
Gustavo Niemeyer (niemeyer) wrote :

Reviewers: mp+123614_code.launchpad.net,

Message:
Please take a look.

Description:
mstate: add machine watcher

This is the first high-level watcher after the foundation for
watcher is in place, so it adds a few missing pieces onto the
state logic itself too.

https://code.launchpad.net/~niemeyer/juju-core/mstate-machine-watcher/+merge/123614

Requires:
https://code.launchpad.net/~niemeyer/juju-core/mstate-watcher-helpers/+merge/123596

(do not edit description out of merge proposal)

Please review this at https://codereview.appspot.com/6489104/

Affected files:
   A [revision details]
   M mstate/export_test.go
   M mstate/machine.go
   M mstate/machine_test.go
   M mstate/open.go
   M mstate/state.go
   M mstate/state_test.go
   A mstate/watcher.go

Revision history for this message
William Reade (fwereade) wrote :

LGTM modulo locale worries.

https://codereview.appspot.com/6489104/diff/1/mstate/open.go
File mstate/open.go (right):

https://codereview.appspot.com/6489104/diff/1/mstate/open.go#newcode45
mstate/open.go:45: // Quite unfortunate that the error has no appropriate code.
Very much so. Can we be screwed by locale settings here?

https://codereview.appspot.com/6489104/

Revision history for this message
Gustavo Niemeyer (niemeyer) wrote :

*** Submitted:

mstate: add machine watcher

This is the first high-level watcher after the foundation for
watcher is in place, so it adds a few missing pieces onto the
state logic itself too.

R=fwereade
CC=
https://codereview.appspot.com/6489104

https://codereview.appspot.com/6489104/diff/1/mstate/open.go
File mstate/open.go (right):

https://codereview.appspot.com/6489104/diff/1/mstate/open.go#newcode45
mstate/open.go:45: // Quite unfortunate that the error has no appropriate code.
On 2012/09/11 07:28:07, fwereade wrote:
> Very much so. Can we be screwed by locale settings here?

Not at the moment at least. I've checked the upstream code and it
doesn't translate the message.

I've included in the comment the URL of the upstream bug reported on
this issue.

https://codereview.appspot.com/6489104/

Revision history for this message
Aram Hăvărneanu (aramh) wrote :

https://codereview.appspot.com/6489104/diff/5001/mstate/machine.go
File mstate/machine.go (right):

https://codereview.appspot.com/6489104/diff/5001/mstate/machine.go#newcode23
mstate/machine.go:23: TxnRevno int64 `bson:"txn-revno"`
This complicates tests and watchers because Insert can't set TxnRevno
and DeepEquals will fail. How about reverting this and creating a new
machineDocTxn type like this:

type machineDocTxn struct {
 machineDoc `bson:",inline"`
 TxnRevno int64 `bson:"txn-revno"`
}

and use that where we need it?

https://codereview.appspot.com/6489104/diff/5001/mstate/watcher.go
File mstate/watcher.go (right):

https://codereview.appspot.com/6489104/diff/5001/mstate/watcher.go#newcode68
mstate/watcher.go:68: case w.changeChan <- m:
Shouldn't this be a default case, and w.changeChan <- m *after* the
select? That way we will always completely drain ch before sending m.
Now there's a chance we will send m before draining ch.

https://codereview.appspot.com/6489104/

Revision history for this message
Gustavo Niemeyer (niemeyer) wrote :

https://codereview.appspot.com/6489104/diff/5001/mstate/machine.go
File mstate/machine.go (right):

https://codereview.appspot.com/6489104/diff/5001/mstate/machine.go#newcode23
mstate/machine.go:23: TxnRevno int64 `bson:"txn-revno"`
On 2012/09/11 17:40:53, aram wrote:
> This complicates tests and watchers because Insert can't set TxnRevno and
> DeepEquals will fail. How about reverting this and creating a new
> machineDocTxn type like this:

The reasoning for doing it doesn't sound great. Doing DeepEquals on such
a structure is most definitely wrong. Private data is private, and by
definition we should be able to cache information freely or change it
without breaking tests that work on the public APIs.

It sounds sensible to have an omitempty setting there, though, so that we
don't try to insert it. Can you please make the change in a follow-up
CL?

https://codereview.appspot.com/6489104/diff/5001/mstate/watcher.go
File mstate/watcher.go (right):

https://codereview.appspot.com/6489104/diff/5001/mstate/watcher.go#newcode68
mstate/watcher.go:68: case w.changeChan <- m:
On 2012/09/11 17:40:53, aram wrote:
> Shouldn't this be a default case, and w.changeChan <- m *after* the select? That
> way we will always completely drain ch before sending m. Now there's a chance we
> will send m before draining ch.

No, there's some misunderstanding about how the watcher works. The
channel is never drained, and the construction here is purposefully
operating *while* trying to send.
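The construction described here can be sketched in isolation. Once a change is pending, the inner select keeps accepting new notifications (updating the pending value) in the same statement that tries to deliver it, so the consumer receives the latest state and the notification channel is never drained as a separate step. This is a simplified, hypothetical model of the inner loop in `MachineWatcher.loop`: ints stand in for `*Machine`, receiving a newer value stands in for `m.Refresh()`, and a `done` channel stands in for `tomb.Dying()`.

```go
package main

import "fmt"

// coalesce forwards values from in to out. While a send on out is pending,
// any newer value received from in replaces the pending one, so a slow
// consumer gets the latest state instead of a backlog of stale events.
// It closes out when in is closed (after delivering any pending value)
// or when done is closed.
func coalesce(in <-chan int, out chan<- int, done <-chan struct{}) {
	defer close(out)
	for {
		// Block until the first change arrives.
		var latest int
		select {
		case <-done:
			return
		case v, ok := <-in:
			if !ok {
				return
			}
			latest = v
		}
		// Try to deliver latest while still listening for newer changes.
		for pending := true; pending; {
			select {
			case <-done:
				return
			case v, ok := <-in:
				if !ok {
					// Input finished: deliver what we have, then stop.
					select {
					case out <- latest:
					case <-done:
					}
					return
				}
				latest = v // newer state supersedes the pending one
			case out <- latest:
				pending = false
			}
		}
	}
}

func main() {
	in := make(chan int, 3)
	in <- 1
	in <- 2
	in <- 3
	close(in)
	out := make(chan int)
	go coalesce(in, out, make(chan struct{}))
	for v := range out {
		fmt.Println(v) // an increasing run of values that always ends with 3
	}
}
```

Which intermediate values get delivered depends on scheduling, but the last one delivered is always the most recent, which is the property the watcher relies on.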

https://codereview.appspot.com/6489104/

Revision history for this message
Aram Hăvărneanu (aramh) wrote :

> The reasoning for doing it doesn't sound great. Doing DeepEquals on such
> a structure is most definitely wrong. Private data is private, and by
> definition we should be able to cache information freely or change it
> without breaking tests that work on the public APIs.

It may be wrong, but in that case we have to do lots of work to fix it
because all the documents will have this field.

  white:mstate$ cd state
  /home/aram/src/launchpad.net/juju-core/state
  white:state$ g DeepEquals | wc -l
  89
  white:state$ cd mstate
  /home/aram/src/launchpad.net/juju-core/mstate
  white:mstate$ g DeepEquals | wc -l
  58

To be honest, using DeepEquals in tests always felt wrong to me because
it was checking the implementation details instead of relying on the
interface, but I assumed it was OK since this is the way the state tests
were written in the first place.

I will put omitempty, but this doesn't fix the tests. Entities returned
by Insert will always have a zero TxnRevno, but entities returned by other
querying functions or by Refresh will have a non-zero TxnRevno.

If you are against this change, perhaps an alternative is to call
Refresh() in tests where this might matter.

https://codereview.appspot.com/6489104/

Revision history for this message
Gustavo Niemeyer (niemeyer) wrote :

On Tue, Sep 11, 2012 at 3:13 PM, <email address hidden> wrote:
> white:mstate$ cd state
> /home/aram/src/launchpad.net/juju-core/state
> white:state$ g DeepEquals | wc -l
> 89
> white:state$ cd mstate
> /home/aram/src/launchpad.net/juju-core/mstate
> white:mstate$ g DeepEquals | wc -l
> 58

This is counting the number of times we've used DeepEquals, which
doesn't really mean much.

How many of those entries are comparing the private fields of machine?

> I will put omitempty, but this doesn't fix the tests. Entities returned
> by Insert will always have a zero TxnRevno, but entities returned by other
> querying functions or by Refresh will have a non-zero TxnRevno.

That's a good point. How can we fix it?

gustavo @ http://niemeyer.net

Preview Diff

=== modified file 'mstate/export_test.go'
--- mstate/export_test.go 2012-08-16 13:44:09 +0000
+++ mstate/export_test.go 2012-09-10 18:17:20 +0000
@@ -12,3 +12,7 @@
 	m := &Machine{doc: machineDoc(*doc)}
 	return m.String()
 }
+
+func init() {
+	logSize = logSizeTests
+}

=== modified file 'mstate/machine.go'
--- mstate/machine.go 2012-09-06 17:04:54 +0000
+++ mstate/machine.go 2012-09-10 18:17:20 +0000
@@ -20,6 +20,7 @@
 	Id int `bson:"_id"`
 	InstanceId string
 	Life Life
+	TxnRevno int64 `bson:"txn-revno"`
 }

 func newMachine(st *State, doc *machineDoc) *Machine {
@@ -73,6 +74,11 @@
 	return nil
 }

+// Watch returns a watcher that fires when the machine changes.
+func (m *Machine) Watch() *MachineWatcher {
+	return newMachineWatcher(m)
+}
+
 // AgentAlive returns whether the respective remote agent is alive.
 func (m *Machine) AgentAlive() bool {
 	return m.st.presencew.Alive(m.globalKey())

=== modified file 'mstate/machine_test.go'
--- mstate/machine_test.go 2012-09-06 17:04:54 +0000
+++ mstate/machine_test.go 2012-09-10 18:17:20 +0000
@@ -3,6 +3,7 @@
 import (
 	. "launchpad.net/gocheck"
 	state "launchpad.net/juju-core/mstate"
+	"launchpad.net/juju-core/version"
 	"sort"
 	"time"
 )
@@ -220,3 +221,103 @@
 	sort.Strings(names)
 	return names
 }
+
+type machineInfo struct {
+	tools *state.Tools
+	instanceId string
+}
+
+func tools(tools int, url string) *state.Tools {
+	return &state.Tools{
+		URL: url,
+		Binary: version.Binary{
+			Number: version.Number{0, 0, tools},
+			Series: "series",
+			Arch: "arch",
+		},
+	}
+}
+
+var watchMachineTests = []struct {
+	test func(m *state.Machine) error
+	want machineInfo
+}{
+	{
+		func(m *state.Machine) error {
+			return nil
+		},
+		machineInfo{
+			tools: &state.Tools{},
+		},
+	},
+	{
+		func(m *state.Machine) error {
+			return m.SetInstanceId("m-foo")
+		},
+		machineInfo{
+			tools: &state.Tools{},
+			instanceId: "m-foo",
+		},
+	},
+	{
+		func(m *state.Machine) error {
+			return m.SetInstanceId("")
+		},
+		machineInfo{
+			tools: &state.Tools{},
+			instanceId: "",
+		},
+	},
+	// TODO SetAgentTools is missing.
+	//{
+	//	func(m *state.Machine) error {
+	//		return m.SetAgentTools(tools(3, "baz"))
+	//	},
+	//	machineInfo{
+	//		tools: tools(3, "baz"),
+	//	},
+	//},
+	//{
+	//	func(m *state.Machine) error {
+	//		return m.SetAgentTools(tools(4, "khroomph"))
+	//	},
+	//	machineInfo{
+	//		tools: tools(4, "khroomph"),
+	//	},
+	//},
+}
+
+func (s *MachineSuite) TestWatchMachine(c *C) {
+	w := s.machine.Watch()
+	defer func() {
+		c.Assert(w.Stop(), IsNil)
+	}()
+	for i, test := range watchMachineTests {
+		c.Logf("test %d", i)
+		err := test.test(s.machine)
+		c.Assert(err, IsNil)
+		s.State.StartSync()
+		select {
+		case m, ok := <-w.Changes():
+			c.Assert(ok, Equals, true)
+			c.Assert(m.Id(), Equals, s.machine.Id())
+			var info machineInfo
+			// TODO AgentTools is missing.
+			info.tools = test.want.tools
+			//info.tools, err = m.AgentTools()
+			//c.Assert(err, IsNil)
+			info.instanceId, err = m.InstanceId()
+			if _, ok := err.(*state.NotFoundError); !ok {
+				c.Assert(err, IsNil)
+			}
+			c.Assert(info, DeepEquals, test.want)
+		case <-time.After(500 * time.Millisecond):
+			c.Fatalf("did not get change: %v", test.want)
+		}
+	}
+	select {
+	case got := <-w.Changes():
+		c.Fatalf("got unexpected change: %#v", got)
+	case <-time.After(100 * time.Millisecond):
+	}
+}

=== modified file 'mstate/open.go'
--- mstate/open.go 2012-09-06 22:12:07 +0000
+++ mstate/open.go 2012-09-10 18:17:20 +0000
@@ -1,10 +1,12 @@
 package mstate

 import (
+	"fmt"
 	"labix.org/v2/mgo"
 	"labix.org/v2/mgo/txn"
 	"launchpad.net/juju-core/log"
 	"launchpad.net/juju-core/mstate/presence"
+	"launchpad.net/juju-core/mstate/watcher"
 )

 var indexes = []mgo.Index{
@@ -12,6 +14,14 @@
 	{Key: []string{"endpoints.servicename"}},
 }

+// The capped collection used for transaction logs defaults to 200MB.
+// It's tweaked in export_test.go to 1MB to avoid the overhead of
+// creating and deleting the large file repeatedly.
+var (
+	logSize = 200000000
+	logSizeTests = 1000000
+)
+
 func Dial(servers string) (*State, error) {
 	log.Printf("opening state with servers: %q", servers)
 	session, err := mgo.Dial(servers)
@@ -19,32 +29,44 @@
 		return nil, err
 	}
 	db := session.DB("juju")
-	presencedb := session.DB("presence")
-	txns := db.C("txns")
+	pdb := session.DB("presence")
 	st := &State{
-		db: db,
-		presencedb: presencedb,
-		charms: db.C("charms"),
-		machines: db.C("machines"),
-		relations: db.C("relations"),
-		services: db.C("services"),
-		settings: db.C("settings"),
-		units: db.C("units"),
-		presence: presencedb.C("presence"),
-		runner: txn.NewRunner(txns),
-	}
-	st.presencew = presence.NewWatcher(st.presence)
+		db: db,
+		charms: db.C("charms"),
+		machines: db.C("machines"),
+		relations: db.C("relations"),
+		services: db.C("services"),
+		settings: db.C("settings"),
+		units: db.C("units"),
+		presence: pdb.C("presence"),
+	}
+	log := db.C("txns.log")
+	info := mgo.CollectionInfo{Capped: true, MaxBytes: logSize}
+	// Quite unfortunate that the error has no appropriate code.
+	if err := log.Create(&info); err != nil && err.Error() != "collection already exists" {
+		return nil, fmt.Errorf("cannot create log collection: %v", err)
+	}
+	st.runner = txn.NewRunner(db.C("txns"))
+	st.runner.ChangeLog(db.C("txns.log"))
+	st.watcher = watcher.New(db.C("txns.log"))
+	st.presencew = presence.NewWatcher(pdb.C("presence"))
 	for _, index := range indexes {
 		err = st.relations.EnsureIndex(index)
 		if err != nil {
-			return nil, err
+			return nil, fmt.Errorf("cannot create database index: %v", err)
 		}
 	}
 	return st, nil
 }

 func (st *State) Close() error {
-	err := st.presencew.Stop()
+	err1 := st.presencew.Stop()
+	err2 := st.watcher.Stop()
 	st.db.Session.Close()
-	return err
+	for _, err := range []error{err1, err2} {
+		if err != nil {
+			return err
+		}
+	}
+	return nil
 }

=== modified file 'mstate/state.go'
--- mstate/state.go 2012-09-06 16:54:22 +0000
+++ mstate/state.go 2012-09-10 18:17:20 +0000
@@ -11,6 +11,7 @@
 	"launchpad.net/juju-core/charm"
 	"launchpad.net/juju-core/environs/config"
 	"launchpad.net/juju-core/mstate/presence"
+	"launchpad.net/juju-core/mstate/watcher"
 	"launchpad.net/juju-core/trivial"
 	"launchpad.net/juju-core/version"
 	"net/url"
@@ -28,7 +29,6 @@
 // managed by juju.
 type State struct {
 	db *mgo.Database
-	presencedb *mgo.Database
 	charms *mgo.Collection
 	machines *mgo.Collection
 	relations *mgo.Collection
@@ -36,8 +36,9 @@
 	settings *mgo.Collection
 	units *mgo.Collection
 	presence *mgo.Collection
+	runner *txn.Runner
+	watcher *watcher.Watcher
 	presencew *presence.Watcher
-	runner *txn.Runner
 }

 func deadOnAbort(err error) error {
@@ -369,3 +370,11 @@
 func (s *State) ForcePresenceRefresh() {
 	s.presencew.ForceRefresh()
 }
+
+// StartSync forces watchers to resynchronize their state with the
+// database immediately. This will happen periodically automatically.
+func (s *State) StartSync() {
+	// TODO Make presence more like watcher, add it here, and
+	// remove ForcePresenceRefresh.
+	s.watcher.StartSync()
+}

=== modified file 'mstate/state_test.go'
--- mstate/state_test.go 2012-09-06 16:54:22 +0000
+++ mstate/state_test.go 2012-09-10 18:17:20 +0000
@@ -7,7 +7,7 @@
 	"launchpad.net/juju-core/charm"
 	"launchpad.net/juju-core/environs/config"
 	state "launchpad.net/juju-core/mstate"
-	coretesting "launchpad.net/juju-core/testing"
+	"launchpad.net/juju-core/testing"
 	"net/url"
 )

@@ -19,9 +19,18 @@

 var _ = Suite(&StateSuite{})

+func (s *StateSuite) TestDialAgain(c *C) {
+	// Ensure idempotent operations on Dial are working fine.
+	for i := 0; i < 2; i++ {
+		st, err := state.Dial(testing.MgoAddr)
+		c.Assert(err, IsNil)
+		c.Assert(st.Close(), IsNil)
+	}
+}
+
 func (s *StateSuite) TestAddCharm(c *C) {
 	// Check that adding charms from scratch works correctly.
-	ch := coretesting.Charms.Dir("dummy")
+	ch := testing.Charms.Dir("dummy")
 	curl := charm.MustParseURL(
 		fmt.Sprintf("local:series/%s-%d", ch.Meta().Name, ch.Revision()),
 	)

=== added file 'mstate/watcher.go'
--- mstate/watcher.go 1970-01-01 00:00:00 +0000
+++ mstate/watcher.go 2012-09-10 18:17:20 +0000
@@ -0,0 +1,74 @@
+package mstate
+
+import (
+	"launchpad.net/juju-core/mstate/watcher"
+	"launchpad.net/tomb"
+)
+
+// MachineWatcher observes changes to the settings of a machine.
+type MachineWatcher struct {
+	changeChan chan *Machine
+	tomb tomb.Tomb
+}
+
+// newMachineWatcher creates and starts a watcher to watch information
+// about the machine.
+func newMachineWatcher(m *Machine) *MachineWatcher {
+	w := &MachineWatcher{changeChan: make(chan *Machine)}
+	go func() {
+		defer w.tomb.Done()
+		defer close(w.changeChan)
+		w.tomb.Kill(w.loop(m))
+	}()
+	return w
+}
+
+// Changes returns a channel that will receive the new
+// *Machine when a change is detected. Note that multiple
+// changes may be observed as a single event in the channel.
+// The first event on the channel holds the initial state
+// as returned by Machine.Info.
+func (w *MachineWatcher) Changes() <-chan *Machine {
+	return w.changeChan
+}
+
+func (w *MachineWatcher) Stop() error {
+	w.tomb.Kill(nil)
+	return w.tomb.Wait()
+}
+
+func (w *MachineWatcher) loop(m *Machine) (err error) {
+	ch := make(chan watcher.Change)
+	id := m.Id()
+	st := m.st
+	st.watcher.Watch(st.machines.Name, id, m.doc.TxnRevno, ch)
+	defer st.watcher.Unwatch(st.machines.Name, id, ch)
+	for {
+		select {
+		case <-st.watcher.Dying():
+			return watcher.MustErr(st.watcher)
+		case <-w.tomb.Dying():
+			return tomb.ErrDying
+		case <-ch:
+		}
+		if m, err = st.Machine(id); err != nil {
+			return err
+		}
+		for {
+			select {
+			case <-st.watcher.Dying():
+				return watcher.MustErr(st.watcher)
+			case <-w.tomb.Dying():
+				return tomb.ErrDying
+			case <-ch:
+				if err := m.Refresh(); err != nil {
+					return err
+				}
+				continue
+			case w.changeChan <- m:
+			}
+			break
+		}
+	}
+	return nil
+}
