Merge lp:~niemeyer/juju-core/mstate-machine-watcher into lp:~juju/juju-core/trunk

Proposed by Gustavo Niemeyer
Status: Merged
Merged at revision: 481
Proposed branch: lp:~niemeyer/juju-core/mstate-machine-watcher
Merge into: lp:~juju/juju-core/trunk
Prerequisite: lp:~niemeyer/juju-core/mstate-watcher-helpers
Diff against target: 400 lines (+246/-21)
7 files modified
mstate/export_test.go (+4/-0)
mstate/machine.go (+6/-0)
mstate/machine_test.go (+101/-0)
mstate/open.go (+39/-17)
mstate/state.go (+11/-2)
mstate/state_test.go (+11/-2)
mstate/watcher.go (+74/-0)
To merge this branch: bzr merge lp:~niemeyer/juju-core/mstate-machine-watcher
Reviewer                 Review Type  Date Requested  Status
The Go Language Gophers                               Pending
Review via email: mp+123614@code.launchpad.net

Description of the change

mstate: add machine watcher

This is the first high-level watcher after the foundation for
watcher is in place, so it adds a few missing pieces onto the
state logic itself too.

https://codereview.appspot.com/6489104/

Gustavo Niemeyer (niemeyer) wrote :

Reviewers: mp+123614_code.launchpad.net,

Message:
Please take a look.

Description:
mstate: add machine watcher

This is the first high-level watcher after the foundation for
watcher is in place, so it adds a few missing pieces onto the
state logic itself too.

https://code.launchpad.net/~niemeyer/juju-core/mstate-machine-watcher/+merge/123614

Requires:
https://code.launchpad.net/~niemeyer/juju-core/mstate-watcher-helpers/+merge/123596

(do not edit description out of merge proposal)

Please review this at https://codereview.appspot.com/6489104/

Affected files:
   A [revision details]
   M mstate/export_test.go
   M mstate/machine.go
   M mstate/machine_test.go
   M mstate/open.go
   M mstate/state.go
   M mstate/state_test.go
   A mstate/watcher.go

William Reade (fwereade) wrote :

LGTM modulo locale worries.

https://codereview.appspot.com/6489104/diff/1/mstate/open.go
File mstate/open.go (right):

https://codereview.appspot.com/6489104/diff/1/mstate/open.go#newcode45
mstate/open.go:45: // Quite unfortunate that the error has no
appropriate code.
Very much so. Can we be screwed by locale settings here?

https://codereview.appspot.com/6489104/

Gustavo Niemeyer (niemeyer) wrote :

*** Submitted:

mstate: add machine watcher

This is the first high-level watcher after the foundation for
watcher is in place, so it adds a few missing pieces onto the
state logic itself too.

R=fwereade
CC=
https://codereview.appspot.com/6489104

https://codereview.appspot.com/6489104/diff/1/mstate/open.go
File mstate/open.go (right):

https://codereview.appspot.com/6489104/diff/1/mstate/open.go#newcode45
mstate/open.go:45: // Quite unfortunate that the error has no
appropriate code.
On 2012/09/11 07:28:07, fwereade wrote:
> Very much so. Can we be screwed by locale settings here?

Not at the moment at least. I've checked the upstream code and it
doesn't translate the message.

I've included in the comment the URL of the upstream bug reported on
this issue.
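
For illustration only (not code from this branch), the idea of treating an "already exists" failure as success by comparing the error message can be sketched as below. The fakeStore type and the ensureCollection helper are hypothetical stand-ins; only the message text and the wrapping error string come from the diff.

```go
package main

import (
	"errors"
	"fmt"
)

// errAlreadyExists mimics a driver error that carries only a message
// and no error code. The text is the one compared in mstate/open.go.
var errAlreadyExists = errors.New("collection already exists")

// fakeStore is a hypothetical stand-in for the database.
type fakeStore struct {
	created map[string]bool
}

// create succeeds the first time and fails with errAlreadyExists afterwards.
func (s *fakeStore) create(name string) error {
	if s.created[name] {
		return errAlreadyExists
	}
	s.created[name] = true
	return nil
}

// ensureCollection makes creation idempotent by ignoring the
// "already exists" failure, matched on its message text.
func ensureCollection(s *fakeStore, name string) error {
	err := s.create(name)
	if err != nil && err.Error() != "collection already exists" {
		return fmt.Errorf("cannot create log collection: %v", err)
	}
	return nil
}

func main() {
	s := &fakeStore{created: map[string]bool{}}
	for i := 0; i < 2; i++ {
		fmt.Println(ensureCollection(s, "txns.log"))
	}
}
```

The comparison only holds while the message text stays stable, which is the locale concern raised above.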

https://codereview.appspot.com/6489104/

Aram Hăvărneanu (aramh) wrote :

https://codereview.appspot.com/6489104/diff/5001/mstate/machine.go
File mstate/machine.go (right):

https://codereview.appspot.com/6489104/diff/5001/mstate/machine.go#newcode23
mstate/machine.go:23: TxnRevno int64 `bson:"txn-revno"`
This complicates tests and watchers because Insert can't set TxnRevno
and DeepEquals will fail. How about reverting this, and create a new
machineDocTxn like this:

type machineDocTxn struct {
 machineDoc `bson:",inline"`
 TxnRevno int64 `bson:"txn-revno"`
}

and use that where we need it?
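
A minimal sketch of how such an embedded struct behaves in plain Go, with no database involved. The field set is trimmed for brevity and the bson tags are kept only to mirror the suggestion; nothing here decodes BSON.

```go
package main

import "fmt"

// machineDoc is a trimmed-down version of the document type.
type machineDoc struct {
	Id         int `bson:"_id"`
	InstanceId string
}

// machineDocTxn embeds machineDoc and adds the revision number.
type machineDocTxn struct {
	machineDoc `bson:",inline"`
	TxnRevno   int64 `bson:"txn-revno"`
}

func main() {
	d := machineDocTxn{
		machineDoc: machineDoc{Id: 1, InstanceId: "m-foo"},
		TxnRevno:   7,
	}
	// Fields of the embedded struct are promoted to the outer one.
	fmt.Println(d.Id, d.InstanceId, d.TxnRevno)
	// The inner document can be compared without looking at TxnRevno.
	fmt.Println(d.machineDoc == machineDoc{Id: 1, InstanceId: "m-foo"})
}
```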

https://codereview.appspot.com/6489104/diff/5001/mstate/watcher.go
File mstate/watcher.go (right):

https://codereview.appspot.com/6489104/diff/5001/mstate/watcher.go#newcode68
mstate/watcher.go:68: case w.changeChan <- m:
Shouldn't this be a default case, and w.changeChan <- m *after* the
select? That way we will always completely drain ch before sending m.
Now there's a chance we will send m before draining ch.

https://codereview.appspot.com/6489104/

Gustavo Niemeyer (niemeyer) wrote :

https://codereview.appspot.com/6489104/diff/5001/mstate/machine.go
File mstate/machine.go (right):

https://codereview.appspot.com/6489104/diff/5001/mstate/machine.go#newcode23
mstate/machine.go:23: TxnRevno int64 `bson:"txn-revno"`
On 2012/09/11 17:40:53, aram wrote:
> This complicates tests and watchers because Insert can't set TxnRevno
> and DeepEquals will fail. How about reverting this, and create a new
> machineDocTxn like this:

The reasoning for doing it doesn't sound great. Doing DeepEquals on such
a structure is most definitely wrong. Private data is private, and by
definition we should be able to cache information freely or change it
without breaking tests that work on the public APIs.

It sounds sensible to have an omitempty setting there, though, so that we
don't try to insert it. Can you please make the change in a follow-up
CL?
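
To illustrate what an omitempty setting does, here is a sketch that uses the standard library's encoding/json as a stand-in. It is an assumption that this is a fair analogy for the bson tag discussed here; the doc type and the encode helper are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// doc is a hypothetical document; json tags stand in for bson tags.
type doc struct {
	Id       int   `json:"_id"`
	TxnRevno int64 `json:"txn-revno,omitempty"`
}

// encode marshals d and returns the result as a string.
func encode(d doc) string {
	b, err := json.Marshal(d)
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	// A zero TxnRevno is left out of the encoded document entirely.
	fmt.Println(encode(doc{Id: 1}))
	// A non-zero TxnRevno is written as usual.
	fmt.Println(encode(doc{Id: 1, TxnRevno: 3}))
}
```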

https://codereview.appspot.com/6489104/diff/5001/mstate/watcher.go
File mstate/watcher.go (right):

https://codereview.appspot.com/6489104/diff/5001/mstate/watcher.go#newcode68
mstate/watcher.go:68: case w.changeChan <- m:
On 2012/09/11 17:40:53, aram wrote:
> Shouldn't this be a default case, and w.changeChan <- m *after* the
> select? That way we will always completely drain ch before sending m.
> Now there's a chance we will send m before draining ch.

No, there's some misunderstanding about how the watcher works. The
channel is never drained, and the construction here is purposefully
operating *while* trying to send.

https://codereview.appspot.com/6489104/

Aram Hăvărneanu (aramh) wrote :

> The reasoning for doing it doesn't sound great. Doing DeepEquals on such
> a structure is most definitely wrong. Private data is private, and by
> definition we should be able to cache information freely or change it
> without breaking tests that work on the public APIs.

It may be wrong, but in that case we have to do lots of work to fix it
because all the documents will have this field.

  white:mstate$ cd state
  /home/aram/src/launchpad.net/juju-core/state
  white:state$ g DeepEquals | wc -l
  89
  white:state$ cd mstate
  /home/aram/src/launchpad.net/juju-core/mstate
  white:mstate$ g DeepEquals | wc -l
  58

To be honest, using DeepEquals in tests always felt wrong to me because
it was checking the implementation details instead of relying on the
interface, but I assumed it was ok since this is the way state tests
were written in the first place.

I will put omitempty, but this doesn't fix the tests. Entities returned
by Insert will have TxnRevno always zero but entities returned by other
querying functions or by Refresh will have a non-zero TxnRevno.

If you are against this change, perhaps an alternative is to call
Refresh() in tests where this might matter.

https://codereview.appspot.com/6489104/

Gustavo Niemeyer (niemeyer) wrote :

On Tue, Sep 11, 2012 at 3:13 PM, <email address hidden> wrote:
> white:mstate$ cd state
> /home/aram/src/launchpad.net/juju-core/state
> white:state$ g DeepEquals | wc -l
> 89
> white:state$ cd mstate
> /home/aram/src/launchpad.net/juju-core/mstate
> white:mstate$ g DeepEquals | wc -l
> 58

This is counting the number of times we've used DeepEquals, which
doesn't really mean much.

How many of those entries are comparing the private fields of machine?

> I will put omitempty, but this doesn't fix the tests. Entities returned
> by Insert will have TxnRevno always zero but entities returned by other
> querying functions or by Refresh will have a non-zero TxnRevno.

That's a good point. How can we fix it?

gustavo @ http://niemeyer.net

Preview Diff

=== modified file 'mstate/export_test.go'
--- mstate/export_test.go 2012-08-16 13:44:09 +0000
+++ mstate/export_test.go 2012-09-10 18:17:20 +0000
@@ -12,3 +12,7 @@
 	m := &Machine{doc: machineDoc(*doc)}
 	return m.String()
 }
+
+func init() {
+	logSize = logSizeTests
+}
=== modified file 'mstate/machine.go'
--- mstate/machine.go 2012-09-06 17:04:54 +0000
+++ mstate/machine.go 2012-09-10 18:17:20 +0000
@@ -20,6 +20,7 @@
 	Id int `bson:"_id"`
 	InstanceId string
 	Life Life
+	TxnRevno int64 `bson:"txn-revno"`
 }
 
 func newMachine(st *State, doc *machineDoc) *Machine {
@@ -73,6 +74,11 @@
 	return nil
 }
 
+// Watch returns a watcher that fires when the machine changes.
+func (m *Machine) Watch() *MachineWatcher {
+	return newMachineWatcher(m)
+}
+
 // AgentAlive returns whether the respective remote agent is alive.
 func (m *Machine) AgentAlive() bool {
 	return m.st.presencew.Alive(m.globalKey())
=== modified file 'mstate/machine_test.go'
--- mstate/machine_test.go 2012-09-06 17:04:54 +0000
+++ mstate/machine_test.go 2012-09-10 18:17:20 +0000
@@ -3,6 +3,7 @@
 import (
 	. "launchpad.net/gocheck"
 	state "launchpad.net/juju-core/mstate"
+	"launchpad.net/juju-core/version"
 	"sort"
 	"time"
 )
@@ -220,3 +221,103 @@
 	sort.Strings(names)
 	return names
 }
+
+type machineInfo struct {
+	tools *state.Tools
+	instanceId string
+}
+
+func tools(tools int, url string) *state.Tools {
+	return &state.Tools{
+		URL: url,
+		Binary: version.Binary{
+			Number: version.Number{0, 0, tools},
+			Series: "series",
+			Arch: "arch",
+		},
+	}
+}
+
+var watchMachineTests = []struct {
+	test func(m *state.Machine) error
+	want machineInfo
+}{
+	{
+		func(m *state.Machine) error {
+			return nil
+		},
+		machineInfo{
+			tools: &state.Tools{},
+		},
+	},
+	{
+		func(m *state.Machine) error {
+			return m.SetInstanceId("m-foo")
+		},
+		machineInfo{
+			tools: &state.Tools{},
+			instanceId: "m-foo",
+		},
+	},
+	{
+		func(m *state.Machine) error {
+			return m.SetInstanceId("")
+		},
+		machineInfo{
+			tools: &state.Tools{},
+			instanceId: "",
+		},
+	},
+	// TODO SetAgentTools is missing.
+	//{
+	//	func(m *state.Machine) error {
+	//		return m.SetAgentTools(tools(3, "baz"))
+	//	},
+	//	machineInfo{
+	//		tools: tools(3, "baz"),
+	//	},
+	//},
+	//{
+	//	func(m *state.Machine) error {
+	//		return m.SetAgentTools(tools(4, "khroomph"))
+	//	},
+	//	machineInfo{
+	//		tools: tools(4, "khroomph"),
+	//	},
+	//},
+}
+
+func (s *MachineSuite) TestWatchMachine(c *C) {
+	w := s.machine.Watch()
+	defer func() {
+		c.Assert(w.Stop(), IsNil)
+	}()
+	for i, test := range watchMachineTests {
+		c.Logf("test %d", i)
+		err := test.test(s.machine)
+		c.Assert(err, IsNil)
+		s.State.StartSync()
+		select {
+		case m, ok := <-w.Changes():
+			c.Assert(ok, Equals, true)
+			c.Assert(m.Id(), Equals, s.machine.Id())
+			var info machineInfo
+			// TODO AgentTools is missing.
+			info.tools = test.want.tools
+			//info.tools, err = m.AgentTools()
+			//c.Assert(err, IsNil)
+			info.instanceId, err = m.InstanceId()
+			if _, ok := err.(*state.NotFoundError); !ok {
+				c.Assert(err, IsNil)
+			}
+			c.Assert(info, DeepEquals, test.want)
+		case <-time.After(500 * time.Millisecond):
+			c.Fatalf("did not get change: %v", test.want)
+		}
+	}
+	select {
+	case got := <-w.Changes():
+		c.Fatalf("got unexpected change: %#v", got)
+	case <-time.After(100 * time.Millisecond):
+	}
+}
=== modified file 'mstate/open.go'
--- mstate/open.go 2012-09-06 22:12:07 +0000
+++ mstate/open.go 2012-09-10 18:17:20 +0000
@@ -1,10 +1,12 @@
 package mstate
 
 import (
+	"fmt"
 	"labix.org/v2/mgo"
 	"labix.org/v2/mgo/txn"
 	"launchpad.net/juju-core/log"
 	"launchpad.net/juju-core/mstate/presence"
+	"launchpad.net/juju-core/mstate/watcher"
 )
 
 var indexes = []mgo.Index{
@@ -12,6 +14,14 @@
 	{Key: []string{"endpoints.servicename"}},
 }
 
+// The capped collection used for transaction logs defaults to 200MB.
+// It's tweaked in export_test.go to 1MB to avoid the overhead of
+// creating and deleting the large file repeatedly.
+var (
+	logSize = 200000000
+	logSizeTests = 1000000
+)
+
 func Dial(servers string) (*State, error) {
 	log.Printf("opening state with servers: %q", servers)
 	session, err := mgo.Dial(servers)
@@ -19,32 +29,44 @@
 		return nil, err
 	}
 	db := session.DB("juju")
-	presencedb := session.DB("presence")
-	txns := db.C("txns")
+	pdb := session.DB("presence")
 	st := &State{
-		db: db,
-		presencedb: presencedb,
-		charms: db.C("charms"),
-		machines: db.C("machines"),
-		relations: db.C("relations"),
-		services: db.C("services"),
-		settings: db.C("settings"),
-		units: db.C("units"),
-		presence: presencedb.C("presence"),
-		runner: txn.NewRunner(txns),
-	}
-	st.presencew = presence.NewWatcher(st.presence)
+		db: db,
+		charms: db.C("charms"),
+		machines: db.C("machines"),
+		relations: db.C("relations"),
+		services: db.C("services"),
+		settings: db.C("settings"),
+		units: db.C("units"),
+		presence: pdb.C("presence"),
+	}
+	log := db.C("txns.log")
+	info := mgo.CollectionInfo{Capped: true, MaxBytes: logSize}
+	// Quite unfortunate that the error has no appropriate code.
+	if err := log.Create(&info); err != nil && err.Error() != "collection already exists" {
+		return nil, fmt.Errorf("cannot create log collection: %v", err)
+	}
+	st.runner = txn.NewRunner(db.C("txns"))
+	st.runner.ChangeLog(db.C("txns.log"))
+	st.watcher = watcher.New(db.C("txns.log"))
+	st.presencew = presence.NewWatcher(pdb.C("presence"))
 	for _, index := range indexes {
 		err = st.relations.EnsureIndex(index)
 		if err != nil {
-			return nil, err
+			return nil, fmt.Errorf("cannot create database index: %v", err)
 		}
 	}
 	return st, nil
 }
 
 func (st *State) Close() error {
-	err := st.presencew.Stop()
+	err1 := st.presencew.Stop()
+	err2 := st.watcher.Stop()
 	st.db.Session.Close()
-	return err
+	for _, err := range []error{err1, err2} {
+		if err != nil {
+			return err
+		}
+	}
+	return nil
 }
=== modified file 'mstate/state.go'
--- mstate/state.go 2012-09-06 16:54:22 +0000
+++ mstate/state.go 2012-09-10 18:17:20 +0000
@@ -11,6 +11,7 @@
 	"launchpad.net/juju-core/charm"
 	"launchpad.net/juju-core/environs/config"
 	"launchpad.net/juju-core/mstate/presence"
+	"launchpad.net/juju-core/mstate/watcher"
 	"launchpad.net/juju-core/trivial"
 	"launchpad.net/juju-core/version"
 	"net/url"
@@ -28,7 +29,6 @@
 // managed by juju.
 type State struct {
 	db *mgo.Database
-	presencedb *mgo.Database
 	charms *mgo.Collection
 	machines *mgo.Collection
 	relations *mgo.Collection
@@ -36,8 +36,9 @@
 	settings *mgo.Collection
 	units *mgo.Collection
 	presence *mgo.Collection
+	runner *txn.Runner
+	watcher *watcher.Watcher
 	presencew *presence.Watcher
-	runner *txn.Runner
 }
 
 func deadOnAbort(err error) error {
@@ -369,3 +370,11 @@
 func (s *State) ForcePresenceRefresh() {
 	s.presencew.ForceRefresh()
 }
+
+// StartSync forces watchers to resynchronize their state with the
+// database immediately. This will happen periodically automatically.
+func (s *State) StartSync() {
+	// TODO Make presence more like watcher, add it here, and
+	// remove ForcePresenceRefresh.
+	s.watcher.StartSync()
+}
=== modified file 'mstate/state_test.go'
--- mstate/state_test.go 2012-09-06 16:54:22 +0000
+++ mstate/state_test.go 2012-09-10 18:17:20 +0000
@@ -7,7 +7,7 @@
 	"launchpad.net/juju-core/charm"
 	"launchpad.net/juju-core/environs/config"
 	state "launchpad.net/juju-core/mstate"
-	coretesting "launchpad.net/juju-core/testing"
+	"launchpad.net/juju-core/testing"
 	"net/url"
 )
 
@@ -19,9 +19,18 @@
 
 var _ = Suite(&StateSuite{})
 
+func (s *StateSuite) TestDialAgain(c *C) {
+	// Ensure idempotent operations on Dial are working fine.
+	for i := 0; i < 2; i++ {
+		st, err := state.Dial(testing.MgoAddr)
+		c.Assert(err, IsNil)
+		c.Assert(st.Close(), IsNil)
+	}
+}
+
 func (s *StateSuite) TestAddCharm(c *C) {
 	// Check that adding charms from scratch works correctly.
-	ch := coretesting.Charms.Dir("dummy")
+	ch := testing.Charms.Dir("dummy")
 	curl := charm.MustParseURL(
 		fmt.Sprintf("local:series/%s-%d", ch.Meta().Name, ch.Revision()),
 	)
=== added file 'mstate/watcher.go'
--- mstate/watcher.go 1970-01-01 00:00:00 +0000
+++ mstate/watcher.go 2012-09-10 18:17:20 +0000
@@ -0,0 +1,74 @@
+package mstate
+
+import (
+	"launchpad.net/juju-core/mstate/watcher"
+	"launchpad.net/tomb"
+)
+
+// MachineWatcher observes changes to the settings of a machine.
+type MachineWatcher struct {
+	changeChan chan *Machine
+	tomb tomb.Tomb
+}
+
+// newMachineWatcher creates and starts a watcher to watch information
+// about the machine.
+func newMachineWatcher(m *Machine) *MachineWatcher {
+	w := &MachineWatcher{changeChan: make(chan *Machine)}
+	go func() {
+		defer w.tomb.Done()
+		defer close(w.changeChan)
+		w.tomb.Kill(w.loop(m))
+	}()
+	return w
+}
+
+// Changes returns a channel that will receive the new
+// *Machine when a change is detected. Note that multiple
+// changes may be observed as a single event in the channel.
+// The first event on the channel holds the initial state
+// as returned by Machine.Info.
+func (w *MachineWatcher) Changes() <-chan *Machine {
+	return w.changeChan
+}
+
+func (w *MachineWatcher) Stop() error {
+	w.tomb.Kill(nil)
+	return w.tomb.Wait()
+}
+
+func (w *MachineWatcher) loop(m *Machine) (err error) {
+	ch := make(chan watcher.Change)
+	id := m.Id()
+	st := m.st
+	st.watcher.Watch(st.machines.Name, id, m.doc.TxnRevno, ch)
+	defer st.watcher.Unwatch(st.machines.Name, id, ch)
+	for {
+		select {
+		case <-st.watcher.Dying():
+			return watcher.MustErr(st.watcher)
+		case <-w.tomb.Dying():
+			return tomb.ErrDying
+		case <-ch:
+		}
+		if m, err = st.Machine(id); err != nil {
+			return err
+		}
+		for {
+			select {
+			case <-st.watcher.Dying():
+				return watcher.MustErr(st.watcher)
+			case <-w.tomb.Dying():
+				return tomb.ErrDying
+			case <-ch:
+				if err := m.Refresh(); err != nil {
+					return err
+				}
+				continue
+			case w.changeChan <- m:
+			}
+			break
+		}
+	}
+	return nil
+}
