Merge lp:~mfoord/juju-core/instancepoller-aggregate into lp:~go-bot/juju-core/trunk
Status: Merged
Approved by: Roger Peppe
Approved revision: no longer in the source branch.
Merged at revision: 2428
Proposed branch: lp:~mfoord/juju-core/instancepoller-aggregate
Merge into: lp:~go-bot/juju-core/trunk
Diff against target: 444 lines (+330/-19), 8 files modified:
  container/lxc/lxc.go (+1/-1)
  dependencies.tsv (+2/-1)
  instance/address.go (+8/-0)
  instance/address_test.go (+9/-0)
  worker/instancepoller/aggregate.go (+120/-0)
  worker/instancepoller/aggregate_test.go (+187/-0)
  worker/instancepoller/worker.go (+2/-17)
  worker/instancepoller/worker_test.go (+1/-0)
To merge this branch: bzr merge lp:~mfoord/juju-core/instancepoller-aggregate
Related bugs:
Reviewer: Juju Engineering (review requested, status: Pending)
Review via email: mp+209966@code.launchpad.net
Commit message
worker/
use ratelimit Token Bucket to batch
requests in the instance poller.
Mitigates launchpad issue 1277397
Description of the change
worker/
use ratelimit Token Bucket to batch
requests in the instance poller.
Mitigates launchpad issue 1277397
Michael Foord (mfoord) wrote:
Nate Finch (natefinch) wrote:
Generally LGTM... just a few minor things.
https:/
File worker/
https:/
worker/
Commenting this would make it easier for people to understand what it
does. It's not exported, so it's not strictly necessary, but it's nice
:)
https:/
worker/
Always comment all public fields :)
Actually... why are these public? They seem to be an implementation
detail.
https:/
worker/
ratelimit.
I'm not sure we really need a rate limiter here. A simple timer channel
would work just as well. I don't know that it's worth changing at this
point, but take a look at time.After(), it's a super useful construct
for making a channel that fires after a certain period of time.
https:/
worker/
instance info for the given id
name in comment doesn't match the function
https:/
worker/
instance.Id, inst instance.Instance) (instanceInfo, error) {
Since you're not using the receiver (*aggregator), you can just make
this a bare function and not a method on aggregator.
https:/
worker/
probably a good idea to prepend this error with some identifying
information, like fmt.Errorf("error getting addresses for instance %v:
%v", id, err)
https:/
File worker/
https:/
worker/
Bare returns are generally discouraged except where required for doing
tricks with defer. You can just make this return result, err... it seems
redundant, but it makes it clearer what's actually being returned.
https:/
worker/
gc.DeepEquals, fmt.Errorf(
minor, but better if you can factor out the gotcha error to a global
variable in this file.
https:/
worker/
we have a policy in this case to check for the specific error, even
though it's not exposed by the package at all.
https:/
Roger Peppe (rogpeppe) wrote:
The code and the test coverage looks generally great, thanks. Quite a
few comments here but nothing particularly substantial.
https:/
File worker/
https:/
worker/
On 2014/03/12 19:48:51, nate.finch wrote:
> Always comment all public fields :)
> Actually... why are these public? They seem to be an implementation
detail.
agreed - there's no need to export this, or Capacity.
https:/
worker/
ratelimit.
On 2014/03/12 19:48:51, nate.finch wrote:
> I'm not sure we really need a rate limiter here. A simple timer channel
> would work just as well. I don't know that it's worth changing at this
> point, but take a look at time.After(), it's a super useful construct
> for making a channel that fires after a certain period of time.
We're already using time.After here (well, timer.Reset, which saves us
making a new timer every time). We're using the rate limiter to inform
the time value that gets passed to the timer. We could probably inline
the logic here, but it's not quite as simple as it might seem at first
glance (the original version of this logic just waited for three seconds
from the first request received, but that's not ideal because we don't
really always want to delay sporadic requests by 3 seconds)
https:/
worker/
We never close reqc, so there's no need to check whether it's closed.
case req := <-a.reqc:
should be sufficient.
https:/
worker/
instance.Id, inst instance.Instance) (instanceInfo, error) {
On 2014/03/12 19:48:51, nate.finch wrote:
> Since you're not using the receiver (*aggregator), you can just make
> this a bare function and not a method on aggregator.
Given that it's only used as part of aggregator, it seems reasonable to
make it a method, so it's obviously distinct from the rest of the
package.
https:/
worker/
On 2014/03/12 19:48:51, nate.finch wrote:
> probably a good idea to prepend this error with some identifying
> information, like fmt.Errorf("error getting addresses for instance
> %v: %v", id, err)
That would break logic elsewhere in the package, which checks
errors. The information flow will become more evident.
https:/
File worker/
https:/
Michael Foord (mfoord) wrote:
There's new code (tests especially) that needs reviewing. I may have one
or two more points from these reviews to address. "lbox propose" is
currently failing, so it may be a case of reverting to the diff in the mp
I'm afraid.
Michael Foord (mfoord) wrote:
On 2014/03/13 14:37:40, mfoord wrote:
> There's new code (tests especially) that needs reviewing. I may have
> one or two more points from these reviews to address. "lbox propose"
> is currently failing so it may be a case of reverting to the diff in
> the mp I'm afraid.
I still need to add the NewAddresses utility function and add a comment
about the batching. Possibly also remove some unneeded instance.Id(...)
type conversions. The type I'm using for testInstanceGet
is wrong (it should really be a thread-safe counter, even though that's
strictly unnecessary).
I have removed the loop return on channel dying code, so there are code
changes as well as test changes.
Michael Foord (mfoord) wrote:
Please take a look.
Roger Peppe (rogpeppe) wrote:
LGTM with the timing-based test fixed to make it less potentially flaky.
https:/
File worker/
https:/
worker/
const capacity = 1
or just inline it, possibly with a comment saying
why we choose a capacity of 1 for the bucket.
https:/
File worker/
https:/
worker/
It would still be nice to see a comment here.
https:/
worker/
I'd still suggest that this is []instance. unless there's some
particular reason not to.
Then Instances is just:
func (i *testInstanceGetter) Instances(ids []instance.Id) ([]instance.Instance, error) {
	i.id = ids
	atomic.AddInt32(&i.counter, 1)
	return i.result, i.err
}
https:/
worker/
c.Assert(
I think this is potentially flaky. If this test runs on a heavily loaded
machine, the millisecond sleeps may expand indefinitely.
I think a better approach might be to measure the total time taken
between starting the requests and receiving all the results, then
checking that the number of requests is no greater than that time
divided by 10ms + 1.
Michael Foord (mfoord) wrote:
Please take a look.
Roger Peppe (rogpeppe) wrote:
LGTM with the fix below.
https:/
File worker/
https:/
worker/
atomic.
ha ha, this is wrong, although it will work in the single-threaded case.
The reason AddInt32 takes a pointer is because it does the atomic
increment itself. By assigning to i.counter outside of that, we raise
the possibility that we might undo an add made by another concurrent
call to AddInt32.
just lose the "i.counter = " bit.
Go Bot (go-bot) wrote:
There are additional revisions which have not been approved in review. Please seek review and approval of these new revisions.
Go Bot (go-bot) wrote:
Attempt to merge into lp:juju-core failed due to conflicts:
text conflict in dependencies.tsv
Go Bot (go-bot) wrote:
The attempt to merge lp:~mfoord/juju-core/instancepoller-aggregate into lp:juju-core failed. Below is the output from the failed tests.
worker/
/usr/lib/
/home/
Go Bot (go-bot) wrote:
The attempt to merge lp:~mfoord/juju-core/instancepoller-aggregate into lp:juju-core failed. Below is the output from the failed tests.
/bin/sh: 1: godeps: not found
Preview Diff
=== modified file 'container/lxc/lxc.go'
--- container/lxc/lxc.go 2014-03-13 01:02:49 +0000
+++ container/lxc/lxc.go 2014-03-14 10:52:20 +0000
@@ -59,7 +59,7 @@
 // The filesystem is the second line.
 lines := strings.Split(string(out), "\n")
 if len(lines) < 2 {
- logger.Errorf("unexpected output: ", out)
+ logger.Errorf("unexpected output: %q", out)
 return "", fmt.Errorf("could not determine filesystem type")
 }
 return lines[1], nil

=== modified file 'dependencies.tsv'
--- dependencies.tsv 2014-03-14 04:38:34 +0000
+++ dependencies.tsv 2014-03-14 10:52:20 +0000
@@ -1,9 +1,10 @@
 code.google.com/p/go.crypto hg 6478cc9340cbbe6c04511280c5007722269108e9 184
 code.google.com/p/go.net hg 3591c18acabc99439c783463ef00e6dc277eee39 77
-labix.org/v2/mgo bzr gustavo@niemeyer.net-20131118213720-aralgr4ienh0gdyq 248
 github.com/errgo/errgo git 93d72bf813883d1054cae1c001d3a46603f7f559
 github.com/juju/loggo git fa3acf9ab9ed09aea29030558528e24a254d27af
+github.com/juju/ratelimit git 0025ab75db6c6eaa4ffff0240c2c9e617ad1a0eb
 github.com/juju/testing git 9c0e0686136637876ae659e9056897575236e11f
+labix.org/v2/mgo bzr gustavo@niemeyer.net-20131118213720-aralgr4ienh0gdyq 248
 launchpad.net/gnuflag bzr roger.peppe@canonical.com-20121003093437-zcyyw0lpvj2nifpk 12
 launchpad.net/goamz bzr roger.peppe@canonical.com-20131218155244-hbnkvlkkzy3vmlh9 44
 launchpad.net/gocheck bzr gustavo@niemeyer.net-20130302024745-6ikofwq2c03h7giu 85

=== modified file 'instance/address.go'
--- instance/address.go 2014-01-30 02:43:45 +0000
+++ instance/address.go 2014-03-14 10:52:20 +0000
@@ -64,6 +64,14 @@
 return buf.String()
 }

+// NewAddresses is a convenience function to create addresses from a string slice
+func NewAddresses(inAddresses []string) (outAddresses []Address) {
+ for _, address := range inAddresses {
+ outAddresses = append(outAddresses, NewAddress(address))
+ }
+ return outAddresses
+}
+
 func DeriveAddressType(value string) AddressType {
 ip := net.ParseIP(value)
 if ip != nil {

=== modified file 'instance/address_test.go'
--- instance/address_test.go 2013-10-03 10:38:06 +0000
+++ instance/address_test.go 2014-03-14 10:52:20 +0000
@@ -30,6 +30,15 @@
 c.Check(addr.Type, gc.Equals, Ipv6Address)
 }

+func (s *AddressSuite) TestNewAddresses(c *gc.C) {
+ addresses := NewAddresses(
+ []string{"127.0.0.1", "192.168.1.1", "192.168.178.255"})
+ c.Assert(len(addresses), gc.Equals, 3)
+ c.Assert(addresses[0].Value, gc.Equals, "127.0.0.1")
+ c.Assert(addresses[1].Value, gc.Equals, "192.168.1.1")
+ c.Assert(addresses[2].Value, gc.Equals, "192.168.178.255")
+}
+
 func (s *AddressSuite) TestNewAddressHostname(c *gc.C) {
 addr := NewAddress("localhost")
 c.Check(addr.Value, gc.Equals, "localhost")

=== added file 'worker/instancepoller/aggregate.go'
--- worker/instancepoller/aggregate.go 1970-01-01 00:00:00 +0000
+++ worker/instancepoller/aggregate.go 2014-03-14 10:52:20 +0000
@@ -0,0 +1,120 @@
+// Copyright 2014 Canonical Ltd.
+// Licensed under the AGPLv3, see LICENCE file for details.
+
+package instancepoller
+
+import (
+ "time"
+
+ "github.com/juju/ratelimit"
+ "launchpad.net/tomb"
+
+ "launchpad.net/juju-core/environs"
+ "launchpad.net/juju-core/errors"
+ "launchpad.net/juju-core/instance"
+)
+
+type instanceGetter interface {
+ Instances(ids []instance.Id) ([]instance.Instance, error)
+}
+
+type aggregator struct {
+ environ instanceGetter
+ reqc chan instanceInfoReq
+ tomb tomb.Tomb
+}
+
+func newAggregator(env instanceGetter) *aggregator {
+ a := &aggregator{
+ environ: env,
+ reqc: make(chan instanceInfoReq),
+ }
+ go func() {
+ defer a.tomb.Done()
+ a.tomb.Kill(a.loop())
+ }()
+ return a
+}
+
+type instanceInfoReq struct {
+ instId instance.Id
+ reply chan<- instanceInfoReply
+}
+
+type instanceInfoReply struct {
+ info instanceInfo
+ err error
+}
+
+func (a *aggregator) instanceInfo(id instance.Id) (instanceInfo, error) {
+ reply := make(chan instanceInfoReply)
+ a.reqc <- instanceInfoReq{
+ instId: id,
+ reply: reply,
+ }
+ r := <-reply
+ return r.info, r.err
+}
+
+var gatherTime = 3 * time.Second
+
+func (a *aggregator) loop() error {
+ timer := time.NewTimer(0)
+ timer.Stop()
+ var reqs []instanceInfoReq
+ // We use a capacity of 1 so that sporadic requests will
+ // be serviced immediately without having to wait.
+ bucket := ratelimit.New(gatherTime, 1)
+ for {
+ select {
+ case <-a.tomb.Dying():
+ return tomb.ErrDying
+ case req := <-a.reqc:
+ if len(reqs) == 0 {
+ waitTime := bucket.Take(1)
+ timer.Reset(waitTime)
+ }
+ reqs = append(reqs, req)
+ case <-timer.C:
+ ids := make([]instance.Id, len(reqs))
+ for i, req := range reqs {
+ ids[i] = req.instId
+ }
+ insts, err := a.environ.Instances(ids)
+ for i, req := range reqs {
+ var reply instanceInfoReply
+ if err != nil && err != environs.ErrPartialInstances {
+ reply.err = err
+ } else {
+ reply.info, reply.err = a.instInfo(req.instId, insts[i])
+ }
+ req.reply <- reply
+ }
+ reqs = nil
+ }
+ }
+}
+
+// instInfo returns the instance info for the given id
+// and instance. If inst is nil, it returns a not-found error.
+func (*aggregator) instInfo(id instance.Id, inst instance.Instance) (instanceInfo, error) {
+ if inst == nil {
+ return instanceInfo{}, errors.NotFoundf("instance %v", id)
+ }
+ addr, err := inst.Addresses()
+ if err != nil {
+ return instanceInfo{}, err
+ }
+ return instanceInfo{
+ addr,
+ inst.Status(),
+ }, nil
+}
+
+func (a *aggregator) Kill() {
+ a.tomb.Kill(nil)
+}
+
+func (a *aggregator) Wait() error {
+ return a.tomb.Wait()
+}

=== added file 'worker/instancepoller/aggregate_test.go'
--- worker/instancepoller/aggregate_test.go 1970-01-01 00:00:00 +0000
+++ worker/instancepoller/aggregate_test.go 2014-03-14 10:52:20 +0000
@@ -0,0 +1,187 @@
+// Copyright 2014 Canonical Ltd.
+// Licensed under the AGPLv3, see LICENCE file for details.
+
+package instancepoller
+
+import (
+ "fmt"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ gc "launchpad.net/gocheck"
+
+ "launchpad.net/juju-core/environs"
+ "launchpad.net/juju-core/errors"
+ "launchpad.net/juju-core/instance"
+ jc "launchpad.net/juju-core/testing/checkers"
+ "launchpad.net/juju-core/testing/testbase"
+)
+
+type aggregateSuite struct {
+ testbase.LoggingSuite
+}
+
+var _ = gc.Suite(&aggregateSuite{})
+
+type testInstance struct {
+ instance.Instance
+ addresses []instance.Address
+ status string
+ err error
+}
+
+var _ instance.Instance = (*testInstance)(nil)
+
+func (t *testInstance) Addresses() ([]instance.Address, error) {
+ if t.err != nil {
+ return nil, t.err
+ }
+ return t.addresses, nil
+}
+
+func (t *testInstance) Status() string {
+ return t.status
+}
+
+type testInstanceGetter struct {
+ // ids is set when the Instances method is called.
+ ids []instance.Id
+ results []instance.Instance
+ err error
+ counter int32
+}
+
+func (i *testInstanceGetter) Instances(ids []instance.Id) (result []instance.Instance, err error) {
+ i.ids = ids
+ atomic.AddInt32(&i.counter, 1)
+ return i.results, i.err
+}
+
+func newTestInstance(status string, addresses []string) *testInstance {
+ thisInstance := testInstance{status: status}
+ thisInstance.addresses = instance.NewAddresses(addresses)
+ return &thisInstance
+}
+
+func (s *aggregateSuite) TestSingleRequest(c *gc.C) {
+ testGetter := new(testInstanceGetter)
+ instance1 := newTestInstance("foobar", []string{"127.0.0.1", "192.168.1.1"})
+ testGetter.results = []instance.Instance{instance1}
+ aggregator := newAggregator(testGetter)
+
+ info, err := aggregator.instanceInfo("foo")
+ c.Assert(err, gc.IsNil)
+ c.Assert(info, gc.DeepEquals, instanceInfo{
+ status: "foobar",
+ addresses: instance1.addresses,
+ })
+ c.Assert(testGetter.ids, gc.DeepEquals, []instance.Id{"foo"})
+}
+
+func (s *aggregateSuite) TestMultipleResponseHandling(c *gc.C) {
+ s.PatchValue(&gatherTime, 30*time.Millisecond)
+ testGetter := new(testInstanceGetter)
+
+ instance1 := newTestInstance("foobar", []string{"127.0.0.1", "192.168.1.1"})
+ testGetter.results = []instance.Instance{instance1}
+ aggregator := newAggregator(testGetter)
+
+ replyChan := make(chan instanceInfoReply)
+ req := instanceInfoReq{
+ reply: replyChan,
+ instId: instance.Id("foo"),
+ }
+ aggregator.reqc <- req
+ reply := <-replyChan
+ c.Assert(reply.err, gc.IsNil)
+
+ instance2 := newTestInstance("not foobar", []string{"192.168.1.2"})
+ instance3 := newTestInstance("ok-ish", []string{"192.168.1.3"})
+ testGetter.results = []instance.Instance{instance2, instance3}
+
+ var wg sync.WaitGroup
+ checkInfo := func(id instance.Id, expectStatus string) {
+ info, err := aggregator.instanceInfo(id)
+ c.Check(err, gc.IsNil)
+ c.Check(info.status, gc.Equals, expectStatus)
+ wg.Done()
+ }
+
+ wg.Add(2)
+ go checkInfo("foo2", "not foobar")
+ go checkInfo("foo3", "ok-ish")
+ wg.Wait()
+
+ c.Assert(len(testGetter.ids), gc.DeepEquals, 2)
+}
+
+func (s *aggregateSuite) TestBatching(c *gc.C) {
+ s.PatchValue(&gatherTime, 10*time.Millisecond)
+ testGetter := new(testInstanceGetter)
+
+ aggregator := newAggregator(testGetter)
+ for i := 0; i < 100; i++ {
+ testGetter.results = append(testGetter.results, newTestInstance("foobar", []string{"127.0.0.1", "192.168.1.1"}))
+ }
+ var wg sync.WaitGroup
+ makeRequest := func() {
+ _, err := aggregator.instanceInfo("foo")
+ c.Check(err, gc.IsNil)
+ wg.Done()
+ }
+ startTime := time.Now()
+ wg.Add(100)
+ for i := 0; i < 100; i++ {
+ go makeRequest()
+ time.Sleep(time.Millisecond)
+ }
+ wg.Wait()
+ totalTime := time.Now().Sub(startTime)
+ // +1 because we expect one extra call for the first request
+ expectedMax := int((totalTime / (10 * time.Millisecond)) + 1)
+ c.Assert(testGetter.counter, jc.LessThan, expectedMax+1)
+ c.Assert(testGetter.counter, jc.GreaterThan, 10)
+}
+
+func (s *aggregateSuite) TestError(c *gc.C) {
+ testGetter := new(testInstanceGetter)
+ ourError := fmt.Errorf("Some error")
+ testGetter.err = ourError
+
+ aggregator := newAggregator(testGetter)
+
+ _, err := aggregator.instanceInfo("foo")
+ c.Assert(err, gc.Equals, ourError)
+}
+
+func (s *aggregateSuite) TestPartialErrResponse(c *gc.C) {
+ testGetter := new(testInstanceGetter)
+ testGetter.err = environs.ErrPartialInstances
+ testGetter.results = []instance.Instance{nil}
+
+ aggregator := newAggregator(testGetter)
+ _, err := aggregator.instanceInfo("foo")
+
+ c.Assert(err, gc.DeepEquals, errors.NotFoundf("instance foo"))
+}
+
+func (s *aggregateSuite) TestAddressesError(c *gc.C) {
+ testGetter := new(testInstanceGetter)
+ instance1 := newTestInstance("foobar", []string{"127.0.0.1", "192.168.1.1"})
+ ourError := fmt.Errorf("gotcha")
+ instance1.err = ourError
+ testGetter.results = []instance.Instance{instance1}
+
+ aggregator := newAggregator(testGetter)
+ _, err := aggregator.instanceInfo("foo")
+ c.Assert(err, gc.Equals, ourError)
+}
+
+func (s *aggregateSuite) TestKillAndWait(c *gc.C) {
+ testGetter := new(testInstanceGetter)
+ aggregator := newAggregator(testGetter)
+ aggregator.Kill()
+ err := aggregator.Wait()
+ c.Assert(err, gc.IsNil)
+}

=== modified file 'worker/instancepoller/worker.go'
--- worker/instancepoller/worker.go 2014-01-23 20:30:35 +0000
+++ worker/instancepoller/worker.go 2014-03-14 10:52:20 +0000
@@ -6,7 +6,6 @@
 import (
 "launchpad.net/tomb"

- "launchpad.net/juju-core/instance"
 "launchpad.net/juju-core/state"
 "launchpad.net/juju-core/worker"
 )
@@ -14,6 +13,7 @@
 type updaterWorker struct {
 st *state.State
 tomb tomb.Tomb
+ *aggregator

 observer *environObserver
 }
@@ -46,6 +46,7 @@
 if err != nil {
 return err
 }
+ u.aggregator = newAggregator(u.observer.environ)
 logger.Infof("instance poller received inital environment configuration")
 defer func() {
 obsErr := worker.Stop(u.observer)
@@ -71,19 +72,3 @@
 func (u *updaterWorker) killAll(err error) {
 u.tomb.Kill(err)
 }
-
-func (u *updaterWorker) instanceInfo(id instance.Id) (instanceInfo, error) {
- env := u.observer.Environ()
- insts, err := env.Instances([]instance.Id{id})
- if err != nil {
- return instanceInfo{}, err
- }
- addr, err := insts[0].Addresses()
- if err != nil {
- return instanceInfo{}, err
- }
- return instanceInfo{
- addr,
- insts[0].Status(),
- }, nil
-}

=== modified file 'worker/instancepoller/worker_test.go'
--- worker/instancepoller/worker_test.go 2014-03-13 07:54:56 +0000
+++ worker/instancepoller/worker_test.go 2014-03-14 10:52:20 +0000
@@ -42,6 +42,7 @@
 // correctly.
 s.PatchValue(&ShortPoll, 10*time.Millisecond)
 s.PatchValue(&LongPoll, 10*time.Millisecond)
+ s.PatchValue(&gatherTime, 10*time.Millisecond)
 machines, insts := s.setupScenario(c)
 s.State.StartSync()
 w := NewWorker(s.State)
Reviewers: mp+209966_code.launchpad.net,
Message:
Please take a look.
Description: instancepoller: batch instancepoller
worker/
use ratelimit Token Bucket to batch
requests in the instance poller.
Mitigates launchpad issue 1277397
https://code.launchpad.net/~mfoord/juju-core/instancepoller-aggregate/+merge/209966
(do not edit description out of merge proposal)
Please review this at https://codereview.appspot.com/74900044/
Affected files (+324, -18 lines): instancepoller/aggregate.go instancepoller/aggregate_test.go instancepoller/worker.go instancepoller/worker_test.go
A [revision details]
M dependencies.tsv
A worker/
A worker/
M worker/
M worker/