Merge lp:~mfoord/juju-core/instancepoller-aggregate into lp:~go-bot/juju-core/trunk

Proposed by Michael Foord
Status: Merged
Approved by: Roger Peppe
Approved revision: no longer in the source branch.
Merged at revision: 2428
Proposed branch: lp:~mfoord/juju-core/instancepoller-aggregate
Merge into: lp:~go-bot/juju-core/trunk
Diff against target: 444 lines (+330/-19)
8 files modified
container/lxc/lxc.go (+1/-1)
dependencies.tsv (+2/-1)
instance/address.go (+8/-0)
instance/address_test.go (+9/-0)
worker/instancepoller/aggregate.go (+120/-0)
worker/instancepoller/aggregate_test.go (+187/-0)
worker/instancepoller/worker.go (+2/-17)
worker/instancepoller/worker_test.go (+1/-0)
To merge this branch: bzr merge lp:~mfoord/juju-core/instancepoller-aggregate
Reviewer Review Type Date Requested Status
Juju Engineering Pending
Review via email: mp+209966@code.launchpad.net

Commit message

worker/instancepoller: batch instancepoller

use ratelimit Token Bucket to batch
requests in the instance poller.

Mitigates launchpad issue 1277397

https://codereview.appspot.com/74900044/

Michael Foord (mfoord) wrote :

Reviewers: mp+209966_code.launchpad.net,

Message:
Please take a look.

Description:
worker/instancepoller: batch instancepoller

use ratelimit Token Bucket to batch
requests in the instance poller.

Mitigates launchpad issue 1277397

https://code.launchpad.net/~mfoord/juju-core/instancepoller-aggregate/+merge/209966

(do not edit description out of merge proposal)

Please review this at https://codereview.appspot.com/74900044/

Affected files (+324, -18 lines):
   A [revision details]
   M dependencies.tsv
   A worker/instancepoller/aggregate.go
   A worker/instancepoller/aggregate_test.go
   M worker/instancepoller/worker.go
   M worker/instancepoller/worker_test.go

Nate Finch (natefinch) wrote :

Generally LGTM... just a few minor things.

https://codereview.appspot.com/74900044/diff/1/worker/instancepoller/aggregate.go
File worker/instancepoller/aggregate.go (right):

https://codereview.appspot.com/74900044/diff/1/worker/instancepoller/aggregate.go#newcode22
worker/instancepoller/aggregate.go:22: type aggregator struct {
Commenting this would make it easier for people to understand what it
does. It's not exported, so it's not strictly necessary, but it's nice
:)

https://codereview.appspot.com/74900044/diff/1/worker/instancepoller/aggregate.go#newcode60
worker/instancepoller/aggregate.go:60: var GatherTime = 3 * time.Second
Always comment all public fields :)

Actually... why are these public? They seem to be an implementation
detail.

https://codereview.appspot.com/74900044/diff/1/worker/instancepoller/aggregate.go#newcode67
worker/instancepoller/aggregate.go:67: bucket := ratelimit.New(GatherTime, Capacity)
I'm not sure we really need a rate limiter here. A simple timer channel
would work just as well. I don't know that it's worth changing at this
point, but take a look at time.After(), it's a super useful construct
for making a channel that fires after a certain period of time.
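
For readers unfamiliar with the construct mentioned above, here is a minimal sketch of batching with time.After. It only illustrates the "fixed wait from the first request" idea and is not the code in this branch; the names collect and demo are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// collect gathers values from in until window has elapsed since the
// first value arrived, then returns the batch. Hypothetical helper.
func collect(in <-chan string, window time.Duration) []string {
	var batch []string
	// deadline stays nil until the first value arrives; receiving
	// from a nil channel blocks forever, so that case cannot fire early.
	var deadline <-chan time.Time
	for {
		select {
		case v := <-in:
			if deadline == nil {
				deadline = time.After(window)
			}
			batch = append(batch, v)
		case <-deadline:
			return batch
		}
	}
}

// demo feeds three values into collect and returns the resulting batch.
func demo() []string {
	in := make(chan string, 3)
	in <- "a"
	in <- "b"
	in <- "c"
	return collect(in, 50*time.Millisecond)
}

func main() {
	fmt.Println(demo())
}
```

With this shape every batch, even one containing a single request, is delayed by the full window.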

https://codereview.appspot.com/74900044/diff/1/worker/instancepoller/aggregate.go#newcode101
worker/instancepoller/aggregate.go:101: // instanceInfo returns the instance info for the given id
name in comment doesn't match the function

https://codereview.appspot.com/74900044/diff/1/worker/instancepoller/aggregate.go#newcode103
worker/instancepoller/aggregate.go:103: func (*aggregator) instInfo(id instance.Id, inst instance.Instance) (instanceInfo, error) {
Since you're not using the receiver (*aggregator), you can just make
this a bare function and not a method on aggregator.

https://codereview.appspot.com/74900044/diff/1/worker/instancepoller/aggregate.go#newcode109
worker/instancepoller/aggregate.go:109: return instanceInfo{}, err
probably a good idea to prepend this error with some identifying information, like
fmt.Errorf("error getting addresses for instance %v: %v", id, err)

https://codereview.appspot.com/74900044/diff/1/worker/instancepoller/aggregate_test.go
File worker/instancepoller/aggregate_test.go (right):

https://codereview.appspot.com/74900044/diff/1/worker/instancepoller/aggregate_test.go#newcode60
worker/instancepoller/aggregate_test.go:60: return
bare returns are generally discouraged except where required for doing
tricks with defer. you can just make this return result, err... it seems
redundant, but it makes it more clear what's actually being returned.
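
A small sketch of the difference being described, using hypothetical lookup functions that are not part of this branch. Both behave the same; the second states at each return site what is handed back.

```go
package main

import "fmt"

// lookupBare uses named results and a bare return: the reader has to
// scan the body to learn which values are returned.
func lookupBare(m map[string]int, k string) (v int, err error) {
	var ok bool
	v, ok = m[k]
	if !ok {
		err = fmt.Errorf("key %q not found", k)
	}
	return
}

// lookupExplicit returns its values explicitly at every return site.
func lookupExplicit(m map[string]int, k string) (int, error) {
	v, ok := m[k]
	if !ok {
		return 0, fmt.Errorf("key %q not found", k)
	}
	return v, nil
}

func main() {
	m := map[string]int{"a": 1}
	fmt.Println(lookupBare(m, "a"))
	fmt.Println(lookupExplicit(m, "missing"))
}
```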

https://codereview.appspot.com/74900044/diff/1/worker/instancepoller/aggregate_test.go#newcode177
worker/instancepoller/aggregate_test.go:177: c.Assert(reply.err, gc.DeepEquals, fmt.Errorf("gotcha"))
minor, but better if you can factor out the gotcha error to a global
variable in this file.

https://codereview.appspot.com/74900044/diff/1/worker/instancepoller/aggregate_test.go#newcode193
worker/instancepoller/aggregate_test.go:193: c.Assert(err, gc.NotNil)
we have a policy in this case to check for the specific error, even
though it's not exposed by the package at all.

https://c...


Roger Peppe (rogpeppe) wrote :

The code and the test coverage look generally great, thanks. Quite a
few comments here but nothing particularly substantial.

https://codereview.appspot.com/74900044/diff/1/worker/instancepoller/aggregate.go
File worker/instancepoller/aggregate.go (right):

https://codereview.appspot.com/74900044/diff/1/worker/instancepoller/aggregate.go#newcode60
worker/instancepoller/aggregate.go:60: var GatherTime = 3 * time.Second
On 2014/03/12 19:48:51, nate.finch wrote:
> Always comment all public fields :)

> Actually... why are these public? They seem to be an implementation
> detail.

agreed - there's no need to export this, or Capacity.

https://codereview.appspot.com/74900044/diff/1/worker/instancepoller/aggregate.go#newcode67
worker/instancepoller/aggregate.go:67: bucket := ratelimit.New(GatherTime, Capacity)
On 2014/03/12 19:48:51, nate.finch wrote:
> I'm not sure we really need a rate limiter here. A simple timer channel would
> work just as well. I don't know that it's worth changing at this point, but
> take a look at time.After(), it's a super useful construct for making a channel
> that fires after a certain period of time.

We're already using time.After here (well, timer.Reset, which saves us
making a new timer every time). We're using the rate limiter to inform
the time value that gets passed to the timer. We could probably inline
the logic here, but it's not quite as simple as it might seem at first
glance (the original version of this logic just waited for three seconds
from the first request received, but that's not ideal because we don't
really always want to delay sporadic requests by 3 seconds)
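
To make the role of the rate limiter concrete, here is a minimal sketch of a capacity-1 token bucket whose take method returns the wait that would be passed to the timer. This is a hypothetical stand-in written only with the standard library; it is not the github.com/juju/ratelimit implementation, and the names bucket, take and demoWaits are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// bucket is a hypothetical, single-goroutine token bucket with a
// capacity of one token. It is not the juju/ratelimit type.
type bucket struct {
	interval time.Duration // time needed to refill one token
	nextFree time.Time     // earliest time the next token is available
}

// take reserves one token and reports how long the caller must wait
// before using it. now is passed in so the example is deterministic.
func (b *bucket) take(now time.Time) time.Duration {
	if now.After(b.nextFree) {
		// The bucket has been idle long enough to be full again.
		b.nextFree = now
	}
	wait := b.nextFree.Sub(now)
	b.nextFree = b.nextFree.Add(b.interval)
	return wait
}

// demoWaits shows the waits for a sporadic request, a request that
// follows one second later, and a request after a long idle period.
func demoWaits() []time.Duration {
	start := time.Unix(0, 0)
	b := &bucket{interval: 3 * time.Second, nextFree: start}
	return []time.Duration{
		b.take(start),
		b.take(start.Add(1 * time.Second)),
		b.take(start.Add(10 * time.Second)),
	}
}

func main() {
	fmt.Println(demoWaits())
}
```

Under these assumptions the first and third requests are not delayed at all, while the second waits only for the remainder of the interval. That is the behaviour a fixed three-second wait from the first request cannot give.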

https://codereview.appspot.com/74900044/diff/1/worker/instancepoller/aggregate.go#newcode72
worker/instancepoller/aggregate.go:72: case req, ok := <-a.reqc:
We never close reqc, so there's no need to check whether it's closed.

case req := <-a.reqc:

should be sufficient.
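
For context, a short sketch of when the two-value receive form does matter. The drain function is hypothetical; it relies on the sender closing the channel, which the comment above notes never happens for reqc.

```go
package main

import "fmt"

// drain sums values from c until c is closed, using the two-value
// receive form to detect closure.
func drain(c <-chan int) int {
	sum := 0
	for {
		v, ok := <-c
		if !ok {
			// The channel is closed and has no buffered values left.
			return sum
		}
		sum += v
	}
}

func main() {
	c := make(chan int, 2)
	c <- 1
	c <- 2
	close(c)
	fmt.Println(drain(c))
}
```

On a channel that is never closed, ok is always true, so the single-value form carries the same information.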

https://codereview.appspot.com/74900044/diff/1/worker/instancepoller/aggregate.go#newcode103
worker/instancepoller/aggregate.go:103: func (*aggregator) instInfo(id instance.Id, inst instance.Instance) (instanceInfo, error) {
On 2014/03/12 19:48:51, nate.finch wrote:
> Since you're not using the receiver (*aggregator), you can just make this a bare
> function and not a method on aggregator.

Given that it's only used as part of aggregator, it seems reasonable to
make it a method, so it's obviously distinct from the rest of the
package.

https://codereview.appspot.com/74900044/diff/1/worker/instancepoller/aggregate.go#newcode109
worker/instancepoller/aggregate.go:109: return instanceInfo{}, err
On 2014/03/12 19:48:51, nate.finch wrote:
> probably a good idea to prepend this error with some identifying information,
> like fmt.Errorf("error getting addresses for instance %v: %v", id, err)

That would break logic elsewhere in the package, which checks
errors.NotImplementedError. When we start using errgo, that
information flow will become more evident.
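
A minimal sketch of why annotating with fmt.Errorf and %v hides the original error from callers. The sentinel errNotImplemented and the functions below are hypothetical stand-ins; the real check in the package may be type-based rather than an equality test.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotImplemented stands in for an error that callers test for.
var errNotImplemented = errors.New("not implemented")

// addresses simulates a provider that does not support addresses.
func addresses() error { return errNotImplemented }

// plain passes the underlying error through unchanged.
func plain() error { return addresses() }

// annotated formats the error with %v, which produces a brand new
// error value that only shares the message text with the original.
func annotated() error {
	return fmt.Errorf("error getting addresses for instance %v: %v", "i-123", addresses())
}

func main() {
	fmt.Println(plain() == errNotImplemented)
	fmt.Println(annotated() == errNotImplemented)
}
```

Go 1.13 and later offer the %w verb together with errors.Is for keeping the chain intact, but that was not available when this review was written.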

https://codereview.appspot.com/74900044/diff/1/worker/instancepoller/aggregate_test.go
File worker/instancepoller/aggregate_test.go (right):

https://codereview.appspot.com/74900044/diff/1/worker/instancepoller/aggregate_test.g...


Michael Foord (mfoord) wrote :

There's new code (tests especially) that needs reviewing. I may have one
or two more points from these reviews to address. "lbox propose" is
currently failing so it may be a case of reverting to the diff in the mp
I'm afraid.

https://codereview.appspot.com/74900044/

Michael Foord (mfoord) wrote :

On 2014/03/13 14:37:40, mfoord wrote:
> There's new code (tests especially) that needs reviewing. I may have one or two
> more points from these reviews to address. "lbox propose" is currently failing
> so it may be a case of reverting to the diff in the mp I'm afraid.

I still need to add the NewAddresses utility function and add a comment
about the batching. Possibly also remove some unneeded instance.Id(...)
type conversions. The type I'm using for testInstanceGetter.counter is
wrong (should really be a thread safe counter even though it's strictly
unnecessary).

I have removed the loop return on channel dying code, so there are code
changes as well as test changes.

https://codereview.appspot.com/74900044/

Michael Foord (mfoord) wrote :
Roger Peppe (rogpeppe) wrote :

LGTM with the timing-based test fixed to make it less potentially flaky.

https://codereview.appspot.com/74900044/diff/20001/worker/instancepoller/aggregate.go
File worker/instancepoller/aggregate.go (right):

https://codereview.appspot.com/74900044/diff/20001/worker/instancepoller/aggregate.go#newcode60
worker/instancepoller/aggregate.go:60: var capacity int64 = 1
const capacity = 1

or just inline it, possibly with a comment saying
why we choose a capacity of 1 for the bucket.

https://codereview.appspot.com/74900044/diff/20001/worker/instancepoller/aggregate_test.go
File worker/instancepoller/aggregate_test.go (right):

https://codereview.appspot.com/74900044/diff/20001/worker/instancepoller/aggregate_test.go#newcode47
worker/instancepoller/aggregate_test.go:47: ids []instance.Id
It would still be nice to see a comment here.

https://codereview.appspot.com/74900044/diff/20001/worker/instancepoller/aggregate_test.go#newcode48
worker/instancepoller/aggregate_test.go:48: results []*testInstance
I'd still suggest that this is []instance.Instance, unless there's a
particular reason not to.

Then Instances is just:

func (i *testInstanceGetter) Instances(ids []instance.Id) (result []instance.Instance, err error) {
      i.ids = ids
      atomic.AddInt32(i.counter, 1)
      return i.results, i.err
}

https://codereview.appspot.com/74900044/diff/20001/worker/instancepoller/aggregate_test.go#newcode148
worker/instancepoller/aggregate_test.go:148: c.Assert(testGetter.counter, jc.GreaterThan, 10)
I think this is potentially flaky. If this test runs on a heavily loaded
machine, the millisecond sleeps may expand indefinitely.

I think a better approach might be to measure the total time taken
between starting the requests and receiving all the results, then
checking that the number of requests is no greater than that time
divided by 10ms + 1.
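
The bound suggested above can be sketched as a small helper. The name maxBatches is hypothetical; the arithmetic is elapsed time divided by the gather interval, plus one for the first request that is serviced immediately.

```go
package main

import (
	"fmt"
	"time"
)

// maxBatches returns the largest number of backend calls a batcher
// with the given gather interval could have made in the elapsed time:
// one immediate call plus one per full interval.
func maxBatches(elapsed, interval time.Duration) int {
	return int(elapsed/interval) + 1
}

func main() {
	// A run that took 105ms with a 10ms gather interval allows
	// at most 10 full intervals plus the initial call.
	fmt.Println(maxBatches(105*time.Millisecond, 10*time.Millisecond))
}
```

Because the bound scales with the measured time, a slow or heavily loaded machine raises the limit instead of failing the assertion.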

https://codereview.appspot.com/74900044/

Michael Foord (mfoord) wrote :
Roger Peppe (rogpeppe) wrote :

LGTM with the fix below.

https://codereview.appspot.com/74900044/diff/40001/worker/instancepoller/aggregate_test.go
File worker/instancepoller/aggregate_test.go (right):

https://codereview.appspot.com/74900044/diff/40001/worker/instancepoller/aggregate_test.go#newcode57
worker/instancepoller/aggregate_test.go:57: i.counter = atomic.AddInt32(&i.counter, 1)
ha ha, this is wrong, although it will work in the single-threaded case.
The reason AddInt32 takes a pointer is because it does the atomic
increment itself. By assigning to i.counter outside of that, we raise
the possibility that we might undo an add made by another concurrent
call to AddInt32.

just lose the "i.counter = " bit.
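
A minimal sketch of the corrected pattern, using a hypothetical countCalls helper rather than the test type from this branch. The increment happens entirely inside AddInt32, and the final value is read with LoadInt32.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// countCalls increments a shared counter from n goroutines and
// returns the final value.
func countCalls(n int) int32 {
	var counter int32
	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			// AddInt32 performs the read-modify-write atomically
			// through the pointer. Storing its return value back
			// into counter would be a separate, non-atomic write.
			atomic.AddInt32(&counter, 1)
		}()
	}
	wg.Wait()
	return atomic.LoadInt32(&counter)
}

func main() {
	fmt.Println(countCalls(100))
}
```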

https://codereview.appspot.com/74900044/

Go Bot (go-bot) wrote :

There are additional revisions which have not been approved in review. Please seek review and approval of these new revisions.

Go Bot (go-bot) wrote :

Attempt to merge into lp:juju-core failed due to conflicts:

text conflict in dependencies.tsv

Go Bot (go-bot) wrote :

The attempt to merge lp:~mfoord/juju-core/instancepoller-aggregate into lp:juju-core failed. Below is the output from the failed tests.

worker/instancepoller/aggregate.go:9:2: cannot find package "github.com/juju/ratelimit" in any of:
 /usr/lib/go/src/pkg/github.com/juju/ratelimit (from $GOROOT)
 /home/tarmac/trees/src/github.com/juju/ratelimit (from $GOPATH)

Go Bot (go-bot) wrote :

The attempt to merge lp:~mfoord/juju-core/instancepoller-aggregate into lp:juju-core failed. Below is the output from the failed tests.

/bin/sh: 1: godeps: not found

Preview Diff

=== modified file 'container/lxc/lxc.go'
--- container/lxc/lxc.go 2014-03-13 01:02:49 +0000
+++ container/lxc/lxc.go 2014-03-14 10:52:20 +0000
@@ -59,7 +59,7 @@
 	// The filesystem is the second line.
 	lines := strings.Split(string(out), "\n")
 	if len(lines) < 2 {
-		logger.Errorf("unexpected output: ", out)
+		logger.Errorf("unexpected output: %q", out)
 		return "", fmt.Errorf("could not determine filesystem type")
 	}
 	return lines[1], nil

=== modified file 'dependencies.tsv'
--- dependencies.tsv 2014-03-14 04:38:34 +0000
+++ dependencies.tsv 2014-03-14 10:52:20 +0000
@@ -1,9 +1,10 @@
 code.google.com/p/go.crypto hg 6478cc9340cbbe6c04511280c5007722269108e9 184
 code.google.com/p/go.net hg 3591c18acabc99439c783463ef00e6dc277eee39 77
-labix.org/v2/mgo bzr gustavo@niemeyer.net-20131118213720-aralgr4ienh0gdyq 248
 github.com/errgo/errgo git 93d72bf813883d1054cae1c001d3a46603f7f559
 github.com/juju/loggo git fa3acf9ab9ed09aea29030558528e24a254d27af
+github.com/juju/ratelimit git 0025ab75db6c6eaa4ffff0240c2c9e617ad1a0eb
 github.com/juju/testing git 9c0e0686136637876ae659e9056897575236e11f
+labix.org/v2/mgo bzr gustavo@niemeyer.net-20131118213720-aralgr4ienh0gdyq 248
 launchpad.net/gnuflag bzr roger.peppe@canonical.com-20121003093437-zcyyw0lpvj2nifpk 12
 launchpad.net/goamz bzr roger.peppe@canonical.com-20131218155244-hbnkvlkkzy3vmlh9 44
 launchpad.net/gocheck bzr gustavo@niemeyer.net-20130302024745-6ikofwq2c03h7giu 85

=== modified file 'instance/address.go'
--- instance/address.go 2014-01-30 02:43:45 +0000
+++ instance/address.go 2014-03-14 10:52:20 +0000
@@ -64,6 +64,14 @@
 	return buf.String()
 }
 
+// NewAddresses is a convenience function to create addresses from a string slice
+func NewAddresses(inAddresses []string) (outAddresses []Address) {
+	for _, address := range inAddresses {
+		outAddresses = append(outAddresses, NewAddress(address))
+	}
+	return outAddresses
+}
+
 func DeriveAddressType(value string) AddressType {
 	ip := net.ParseIP(value)
 	if ip != nil {

=== modified file 'instance/address_test.go'
--- instance/address_test.go 2013-10-03 10:38:06 +0000
+++ instance/address_test.go 2014-03-14 10:52:20 +0000
@@ -30,6 +30,15 @@
 	c.Check(addr.Type, gc.Equals, Ipv6Address)
 }
 
+func (s *AddressSuite) TestNewAddresses(c *gc.C) {
+	addresses := NewAddresses(
+		[]string{"127.0.0.1", "192.168.1.1", "192.168.178.255"})
+	c.Assert(len(addresses), gc.Equals, 3)
+	c.Assert(addresses[0].Value, gc.Equals, "127.0.0.1")
+	c.Assert(addresses[1].Value, gc.Equals, "192.168.1.1")
+	c.Assert(addresses[2].Value, gc.Equals, "192.168.178.255")
+}
+
 func (s *AddressSuite) TestNewAddressHostname(c *gc.C) {
 	addr := NewAddress("localhost")
 	c.Check(addr.Value, gc.Equals, "localhost")

=== added file 'worker/instancepoller/aggregate.go'
--- worker/instancepoller/aggregate.go 1970-01-01 00:00:00 +0000
+++ worker/instancepoller/aggregate.go 2014-03-14 10:52:20 +0000
@@ -0,0 +1,120 @@
+// Copyright 2014 Canonical Ltd.
+// Licensed under the AGPLv3, see LICENCE file for details.
+
+package instancepoller
+
+import (
+	"time"
+
+	"github.com/juju/ratelimit"
+	"launchpad.net/tomb"
+
+	"launchpad.net/juju-core/environs"
+	"launchpad.net/juju-core/errors"
+	"launchpad.net/juju-core/instance"
+)
+
+type instanceGetter interface {
+	Instances(ids []instance.Id) ([]instance.Instance, error)
+}
+
+type aggregator struct {
+	environ instanceGetter
+	reqc    chan instanceInfoReq
+	tomb    tomb.Tomb
+}
+
+func newAggregator(env instanceGetter) *aggregator {
+	a := &aggregator{
+		environ: env,
+		reqc:    make(chan instanceInfoReq),
+	}
+	go func() {
+		defer a.tomb.Done()
+		a.tomb.Kill(a.loop())
+	}()
+	return a
+}
+
+type instanceInfoReq struct {
+	instId instance.Id
+	reply  chan<- instanceInfoReply
+}
+
+type instanceInfoReply struct {
+	info instanceInfo
+	err  error
+}
+
+func (a *aggregator) instanceInfo(id instance.Id) (instanceInfo, error) {
+	reply := make(chan instanceInfoReply)
+	a.reqc <- instanceInfoReq{
+		instId: id,
+		reply:  reply,
+	}
+	r := <-reply
+	return r.info, r.err
+}
+
+var gatherTime = 3 * time.Second
+
+func (a *aggregator) loop() error {
+	timer := time.NewTimer(0)
+	timer.Stop()
+	var reqs []instanceInfoReq
+	// We use a capacity of 1 so that sporadic requests will
+	// be serviced immediately without having to wait.
+	bucket := ratelimit.New(gatherTime, 1)
+	for {
+		select {
+		case <-a.tomb.Dying():
+			return tomb.ErrDying
+		case req := <-a.reqc:
+			if len(reqs) == 0 {
+				waitTime := bucket.Take(1)
+				timer.Reset(waitTime)
+			}
+			reqs = append(reqs, req)
+		case <-timer.C:
+			ids := make([]instance.Id, len(reqs))
+			for i, req := range reqs {
+				ids[i] = req.instId
+			}
+			insts, err := a.environ.Instances(ids)
+			for i, req := range reqs {
+				var reply instanceInfoReply
+				if err != nil && err != environs.ErrPartialInstances {
+					reply.err = err
+				} else {
+					reply.info, reply.err = a.instInfo(req.instId, insts[i])
+				}
+				req.reply <- reply
+			}
+			reqs = nil
+		}
+	}
+}
+
+// instInfo returns the instance info for the given id
+// and instance. If inst is nil, it returns a not-found error.
+func (*aggregator) instInfo(id instance.Id, inst instance.Instance) (instanceInfo, error) {
+	if inst == nil {
+		return instanceInfo{}, errors.NotFoundf("instance %v", id)
+	}
+	addr, err := inst.Addresses()
+	if err != nil {
+		return instanceInfo{}, err
+	}
+	return instanceInfo{
+		addr,
+		inst.Status(),
+	}, nil
+}
+
+func (a *aggregator) Kill() {
+	a.tomb.Kill(nil)
+}
+
+func (a *aggregator) Wait() error {
+	return a.tomb.Wait()
+}

=== added file 'worker/instancepoller/aggregate_test.go'
--- worker/instancepoller/aggregate_test.go 1970-01-01 00:00:00 +0000
+++ worker/instancepoller/aggregate_test.go 2014-03-14 10:52:20 +0000
@@ -0,0 +1,187 @@
+// Copyright 2014 Canonical Ltd.
+// Licensed under the AGPLv3, see LICENCE file for details.
+
+package instancepoller
+
+import (
+	"fmt"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	gc "launchpad.net/gocheck"
+
+	"launchpad.net/juju-core/environs"
+	"launchpad.net/juju-core/errors"
+	"launchpad.net/juju-core/instance"
+	jc "launchpad.net/juju-core/testing/checkers"
+	"launchpad.net/juju-core/testing/testbase"
+)
+
+type aggregateSuite struct {
+	testbase.LoggingSuite
+}
+
+var _ = gc.Suite(&aggregateSuite{})
+
+type testInstance struct {
+	instance.Instance
+	addresses []instance.Address
+	status    string
+	err       error
+}
+
+var _ instance.Instance = (*testInstance)(nil)
+
+func (t *testInstance) Addresses() ([]instance.Address, error) {
+	if t.err != nil {
+		return nil, t.err
+	}
+	return t.addresses, nil
+}
+
+func (t *testInstance) Status() string {
+	return t.status
+}
+
+type testInstanceGetter struct {
+	// ids is set when the Instances method is called.
+	ids     []instance.Id
+	results []instance.Instance
+	err     error
+	counter int32
+}
+
+func (i *testInstanceGetter) Instances(ids []instance.Id) (result []instance.Instance, err error) {
+	i.ids = ids
+	atomic.AddInt32(&i.counter, 1)
+	return i.results, i.err
+}
+
+func newTestInstance(status string, addresses []string) *testInstance {
+	thisInstance := testInstance{status: status}
+	thisInstance.addresses = instance.NewAddresses(addresses)
+	return &thisInstance
+}
+
+func (s *aggregateSuite) TestSingleRequest(c *gc.C) {
+	testGetter := new(testInstanceGetter)
+	instance1 := newTestInstance("foobar", []string{"127.0.0.1", "192.168.1.1"})
+	testGetter.results = []instance.Instance{instance1}
+	aggregator := newAggregator(testGetter)
+
+	info, err := aggregator.instanceInfo("foo")
+	c.Assert(err, gc.IsNil)
+	c.Assert(info, gc.DeepEquals, instanceInfo{
+		status:    "foobar",
+		addresses: instance1.addresses,
+	})
+	c.Assert(testGetter.ids, gc.DeepEquals, []instance.Id{"foo"})
+}
+
+func (s *aggregateSuite) TestMultipleResponseHandling(c *gc.C) {
+	s.PatchValue(&gatherTime, 30*time.Millisecond)
+	testGetter := new(testInstanceGetter)
+
+	instance1 := newTestInstance("foobar", []string{"127.0.0.1", "192.168.1.1"})
+	testGetter.results = []instance.Instance{instance1}
+	aggregator := newAggregator(testGetter)
+
+	replyChan := make(chan instanceInfoReply)
+	req := instanceInfoReq{
+		reply:  replyChan,
+		instId: instance.Id("foo"),
+	}
+	aggregator.reqc <- req
+	reply := <-replyChan
+	c.Assert(reply.err, gc.IsNil)
+
+	instance2 := newTestInstance("not foobar", []string{"192.168.1.2"})
+	instance3 := newTestInstance("ok-ish", []string{"192.168.1.3"})
+	testGetter.results = []instance.Instance{instance2, instance3}
+
+	var wg sync.WaitGroup
+	checkInfo := func(id instance.Id, expectStatus string) {
+		info, err := aggregator.instanceInfo(id)
+		c.Check(err, gc.IsNil)
+		c.Check(info.status, gc.Equals, expectStatus)
+		wg.Done()
+	}
+
+	wg.Add(2)
+	go checkInfo("foo2", "not foobar")
+	go checkInfo("foo3", "ok-ish")
+	wg.Wait()
+
+	c.Assert(len(testGetter.ids), gc.DeepEquals, 2)
+}
+
+func (s *aggregateSuite) TestBatching(c *gc.C) {
+	s.PatchValue(&gatherTime, 10*time.Millisecond)
+	testGetter := new(testInstanceGetter)
+
+	aggregator := newAggregator(testGetter)
+	for i := 0; i < 100; i++ {
+		testGetter.results = append(testGetter.results, newTestInstance("foobar", []string{"127.0.0.1", "192.168.1.1"}))
+	}
+	var wg sync.WaitGroup
+	makeRequest := func() {
+		_, err := aggregator.instanceInfo("foo")
+		c.Check(err, gc.IsNil)
+		wg.Done()
+	}
+	startTime := time.Now()
+	wg.Add(100)
+	for i := 0; i < 100; i++ {
+		go makeRequest()
+		time.Sleep(time.Millisecond)
+	}
+	wg.Wait()
+	totalTime := time.Now().Sub(startTime)
+	// +1 because we expect one extra call for the first request
+	expectedMax := int((totalTime / (10 * time.Millisecond)) + 1)
+	c.Assert(testGetter.counter, jc.LessThan, expectedMax+1)
+	c.Assert(testGetter.counter, jc.GreaterThan, 10)
+}
+
+func (s *aggregateSuite) TestError(c *gc.C) {
+	testGetter := new(testInstanceGetter)
+	ourError := fmt.Errorf("Some error")
+	testGetter.err = ourError
+
+	aggregator := newAggregator(testGetter)
+
+	_, err := aggregator.instanceInfo("foo")
+	c.Assert(err, gc.Equals, ourError)
+}
+
+func (s *aggregateSuite) TestPartialErrResponse(c *gc.C) {
+	testGetter := new(testInstanceGetter)
+	testGetter.err = environs.ErrPartialInstances
+	testGetter.results = []instance.Instance{nil}
+
+	aggregator := newAggregator(testGetter)
+	_, err := aggregator.instanceInfo("foo")
+
+	c.Assert(err, gc.DeepEquals, errors.NotFoundf("instance foo"))
+}
+
+func (s *aggregateSuite) TestAddressesError(c *gc.C) {
+	testGetter := new(testInstanceGetter)
+	instance1 := newTestInstance("foobar", []string{"127.0.0.1", "192.168.1.1"})
+	ourError := fmt.Errorf("gotcha")
+	instance1.err = ourError
+	testGetter.results = []instance.Instance{instance1}
+
+	aggregator := newAggregator(testGetter)
+	_, err := aggregator.instanceInfo("foo")
+	c.Assert(err, gc.Equals, ourError)
+}
+
+func (s *aggregateSuite) TestKillAndWait(c *gc.C) {
+	testGetter := new(testInstanceGetter)
+	aggregator := newAggregator(testGetter)
+	aggregator.Kill()
+	err := aggregator.Wait()
+	c.Assert(err, gc.IsNil)
+}

=== modified file 'worker/instancepoller/worker.go'
--- worker/instancepoller/worker.go 2014-01-23 20:30:35 +0000
+++ worker/instancepoller/worker.go 2014-03-14 10:52:20 +0000
@@ -6,7 +6,6 @@
 import (
 	"launchpad.net/tomb"
 
-	"launchpad.net/juju-core/instance"
 	"launchpad.net/juju-core/state"
 	"launchpad.net/juju-core/worker"
 )
@@ -14,6 +13,7 @@
 type updaterWorker struct {
 	st   *state.State
 	tomb tomb.Tomb
+	*aggregator
 
 	observer *environObserver
 }
@@ -46,6 +46,7 @@
 	if err != nil {
 		return err
 	}
+	u.aggregator = newAggregator(u.observer.environ)
 	logger.Infof("instance poller received inital environment configuration")
 	defer func() {
 		obsErr := worker.Stop(u.observer)
@@ -71,19 +72,3 @@
 func (u *updaterWorker) killAll(err error) {
 	u.tomb.Kill(err)
 }
-
-func (u *updaterWorker) instanceInfo(id instance.Id) (instanceInfo, error) {
-	env := u.observer.Environ()
-	insts, err := env.Instances([]instance.Id{id})
-	if err != nil {
-		return instanceInfo{}, err
-	}
-	addr, err := insts[0].Addresses()
-	if err != nil {
-		return instanceInfo{}, err
-	}
-	return instanceInfo{
-		addr,
-		insts[0].Status(),
-	}, nil
-}

=== modified file 'worker/instancepoller/worker_test.go'
--- worker/instancepoller/worker_test.go 2014-03-13 07:54:56 +0000
+++ worker/instancepoller/worker_test.go 2014-03-14 10:52:20 +0000
@@ -42,6 +42,7 @@
 	// correctly.
 	s.PatchValue(&ShortPoll, 10*time.Millisecond)
 	s.PatchValue(&LongPoll, 10*time.Millisecond)
+	s.PatchValue(&gatherTime, 10*time.Millisecond)
 	machines, insts := s.setupScenario(c)
 	s.State.StartSync()
 	w := NewWorker(s.State)
