Merge lp:~pedronis/ubuntu-push/cancel-cancel-cancel into lp:ubuntu-push/automatic
- cancel-cancel-cancel
- Merge into automatic
Proposed by
Samuele Pedroni
Status: | Merged |
---|---|
Approved by: | Samuele Pedroni |
Approved revision: | 369 |
Merged at revision: | 366 |
Proposed branch: | lp:~pedronis/ubuntu-push/cancel-cancel-cancel |
Merge into: | lp:ubuntu-push/automatic |
Diff against target: |
846 lines (+235/-101) 11 files modified
bus/connectivity/connectivity.go (+76/-23) bus/connectivity/connectivity_test.go (+26/-23) bus/endpoint.go (+10/-5) bus/networkmanager/networkmanager.go (+10/-10) bus/networkmanager/networkmanager_test.go (+23/-15) bus/notifications/raw.go (+1/-1) bus/testing/testing_endpoint.go (+47/-7) bus/testing/testing_endpoint_test.go (+28/-9) client/client.go (+4/-2) client/client_test.go (+1/-6) testing/helpers.go (+9/-0) |
To merge this branch: | bzr merge lp:~pedronis/ubuntu-push/cancel-cancel-cancel |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
John Lenton (community) | Approve | ||
Review via email: mp+251240@code.launchpad.net |
Commit message
WatchSignal cancelling, and connectivity exposed cancelling, make connectivity start not leave watches behind, fixes deadlock XXX in test
Description of the change
WatchSignal cancelling, and connectivity exposed cancelling,
make connectivity start not leave watches behind, fixes deadlock XXX in test;
don't leave goroutines behind in touched bus packages
helper to dump goroutines to check whether tests leave them around
To post a comment you must log in.
Revision history for this message
John Lenton (chipaca) : | # |
review:
Approve
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'bus/connectivity/connectivity.go' |
2 | --- bus/connectivity/connectivity.go 2015-02-24 14:10:05 +0000 |
3 | +++ bus/connectivity/connectivity.go 2015-02-27 10:54:31 +0000 |
4 | @@ -24,12 +24,14 @@ |
5 | |
6 | import ( |
7 | "errors" |
8 | + "sync" |
9 | + "time" |
10 | + |
11 | "launchpad.net/ubuntu-push/bus" |
12 | "launchpad.net/ubuntu-push/bus/networkmanager" |
13 | "launchpad.net/ubuntu-push/config" |
14 | "launchpad.net/ubuntu-push/logger" |
15 | "launchpad.net/ubuntu-push/util" |
16 | - "time" |
17 | ) |
18 | |
19 | // The configuration for ConnectedState, intended to be populated from a config file. |
20 | @@ -45,7 +47,8 @@ |
21 | ConnectivityCheckMD5 string `json:"connectivity_check_md5"` |
22 | } |
23 | |
24 | -type connectedState struct { |
25 | +// ConnectedState helps tracking connectivity. |
26 | +type ConnectedState struct { |
27 | networkStateCh <-chan networkmanager.State |
28 | networkConCh <-chan string |
29 | config ConnectivityConfig |
30 | @@ -57,11 +60,43 @@ |
31 | currentState networkmanager.State |
32 | lastSent bool |
33 | timer *time.Timer |
34 | + doneLck sync.Mutex |
35 | + done chan struct{} |
36 | + canceled bool |
37 | + stateWatch bus.Cancellable |
38 | + conWatch bus.Cancellable |
39 | +} |
40 | + |
41 | +// New makes a ConnectedState for connectivity tracking. |
42 | +// |
43 | +// The endpoint need not be dialed; Track() will Dial() and |
44 | +// Close() it as it sees fit. |
45 | +func New(endp bus.Endpoint, config ConnectivityConfig, log logger.Logger) *ConnectedState { |
46 | + wg := NewWebchecker(config.ConnectivityCheckURL, config.ConnectivityCheckMD5, 10*time.Second, log) |
47 | + return &ConnectedState{ |
48 | + config: config, |
49 | + log: log, |
50 | + endp: endp, |
51 | + webget: wg.Webcheck, |
52 | + done: make(chan struct{}), |
53 | + } |
54 | +} |
55 | + |
56 | +// cancel watches if any |
57 | +func (cs *ConnectedState) reset() { |
58 | + if cs.stateWatch != nil { |
59 | + cs.stateWatch.Cancel() |
60 | + cs.stateWatch = nil |
61 | + } |
62 | + if cs.conWatch != nil { |
63 | + cs.conWatch.Cancel() |
64 | + cs.conWatch = nil |
65 | + } |
66 | } |
67 | |
68 | // start connects to the bus, gets the initial NetworkManager state, and sets |
69 | // up the watch. |
70 | -func (cs *connectedState) start() networkmanager.State { |
71 | +func (cs *ConnectedState) start() networkmanager.State { |
72 | var initial networkmanager.State |
73 | var stateCh <-chan networkmanager.State |
74 | var primary string |
75 | @@ -72,8 +107,9 @@ |
76 | cs.connAttempts += ar.Redial() |
77 | nm := networkmanager.New(cs.endp, cs.log) |
78 | |
79 | + cs.reset() |
80 | // set up the watch |
81 | - stateCh, err = nm.WatchState() |
82 | + stateCh, cs.stateWatch, err = nm.WatchState() |
83 | if err != nil { |
84 | cs.log.Debugf("failed to set up the state watch: %s", err) |
85 | goto Continue |
86 | @@ -87,7 +123,7 @@ |
87 | } |
88 | cs.log.Debugf("got initial state of %s", initial) |
89 | |
90 | - conCh, err = nm.WatchPrimaryConnection() |
91 | + conCh, cs.conWatch, err = nm.WatchPrimaryConnection() |
92 | if err != nil { |
93 | cs.log.Debugf("failed to set up the connection watch: %s", err) |
94 | goto Continue |
95 | @@ -107,9 +143,11 @@ |
96 | } |
97 | } |
98 | |
99 | -// connectedStateStep takes one step forwards in the “am I connected?” |
100 | +var errCanceled = errors.New("canceled") |
101 | + |
102 | +// step takes one step forwards in the “am I connected?” |
103 | // answering state machine. |
104 | -func (cs *connectedState) connectedStateStep() (bool, error) { |
105 | +func (cs *ConnectedState) step() (bool, error) { |
106 | stabilizingTimeout := cs.config.StabilizingTimeout.Duration |
107 | recheckTimeout := cs.config.RecheckTimeout.Duration |
108 | log := cs.log |
109 | @@ -117,6 +155,8 @@ |
110 | Loop: |
111 | for { |
112 | select { |
113 | + case <-cs.done: |
114 | + return false, errCanceled |
115 | case <-cs.networkConCh: |
116 | cs.webgetCh = nil |
117 | cs.timer.Reset(stabilizingTimeout) |
118 | @@ -178,35 +218,48 @@ |
119 | return cs.lastSent, nil |
120 | } |
121 | |
122 | -// ConnectedState sends the initial NetworkManager state and changes to it |
123 | +// Track sends the initial NetworkManager state and changes to it |
124 | // over the "out" channel. Sends "false" as soon as it detects trouble, "true" |
125 | // after checking actual connectivity. |
126 | // |
127 | -// The endpoint need not be dialed; connectivity will Dial() and Close() |
128 | -// it as it sees fit. |
129 | -func ConnectedState(endp bus.Endpoint, config ConnectivityConfig, log logger.Logger, out chan<- bool) { |
130 | - wg := NewWebchecker(config.ConnectivityCheckURL, config.ConnectivityCheckMD5, 10*time.Second, log) |
131 | - cs := &connectedState{ |
132 | - config: config, |
133 | - log: log, |
134 | - endp: endp, |
135 | - webget: wg.Webcheck, |
136 | - } |
137 | +func (cs *ConnectedState) Track(out chan<- bool) { |
138 | |
139 | Start: |
140 | - log.Debugf("sending initial 'disconnected'.") |
141 | - out <- false |
142 | + cs.log.Debugf("sending initial 'disconnected'.") |
143 | + select { |
144 | + case <-cs.done: |
145 | + return |
146 | + case out <- false: |
147 | + } |
148 | cs.lastSent = false |
149 | cs.currentState = cs.start() |
150 | + defer cs.reset() |
151 | cs.timer = time.NewTimer(cs.config.StabilizingTimeout.Duration) |
152 | |
153 | for { |
154 | - v, err := cs.connectedStateStep() |
155 | + v, err := cs.step() |
156 | + if err == errCanceled { |
157 | + return |
158 | + } |
159 | if err != nil { |
160 | // tear it all down and start over |
161 | - log.Errorf("%s", err) |
162 | + cs.log.Errorf("%s", err) |
163 | goto Start |
164 | } |
165 | - out <- v |
166 | + select { |
167 | + case <-cs.done: |
168 | + return |
169 | + case out <- v: |
170 | + } |
171 | + } |
172 | +} |
173 | + |
174 | +// Cancel stops the ConnectedState machinary. |
175 | +func (cs *ConnectedState) Cancel() { |
176 | + cs.doneLck.Lock() |
177 | + defer cs.doneLck.Unlock() |
178 | + if !cs.canceled { |
179 | + cs.canceled = true |
180 | + close(cs.done) |
181 | } |
182 | } |
183 | |
184 | === modified file 'bus/connectivity/connectivity_test.go' |
185 | --- bus/connectivity/connectivity_test.go 2015-02-24 14:10:05 +0000 |
186 | +++ bus/connectivity/connectivity_test.go 2015-02-27 10:54:31 +0000 |
187 | @@ -64,13 +64,13 @@ |
188 | ) |
189 | |
190 | /* |
191 | - tests for connectedState's Start() method |
192 | + tests for ConnectedState's Start() method |
193 | */ |
194 | |
195 | // when given a working config and bus, Start() will work |
196 | func (s *ConnSuite) TestStartWorks(c *C) { |
197 | endp := testingbus.NewTestingEndpoint(condition.Work(true), condition.Work(true), uint32(networkmanager.Connecting), helloCon) |
198 | - cs := connectedState{config: ConnectivityConfig{}, log: s.log, endp: endp} |
199 | + cs := ConnectedState{config: ConnectivityConfig{}, log: s.log, endp: endp} |
200 | |
201 | nopTicker := make(chan []interface{}) |
202 | testingbus.SetWatchSource(endp, "StateChanged", nopTicker) |
203 | @@ -83,7 +83,7 @@ |
204 | // if the bus fails a couple of times, we're still OK |
205 | func (s *ConnSuite) TestStartRetriesConnect(c *C) { |
206 | endp := testingbus.NewTestingEndpoint(condition.Fail2Work(2), condition.Work(true), uint32(networkmanager.Connecting), helloCon) |
207 | - cs := connectedState{config: ConnectivityConfig{}, log: s.log, endp: endp} |
208 | + cs := ConnectedState{config: ConnectivityConfig{}, log: s.log, endp: endp} |
209 | |
210 | nopTicker := make(chan []interface{}) |
211 | testingbus.SetWatchSource(endp, "StateChanged", nopTicker) |
212 | @@ -97,7 +97,7 @@ |
213 | // when the calls to NetworkManager fails for a bit, we're still OK |
214 | func (s *ConnSuite) TestStartRetriesCall(c *C) { |
215 | endp := testingbus.NewTestingEndpoint(condition.Work(true), condition.Fail2Work(5), uint32(networkmanager.Connecting), helloCon) |
216 | - cs := connectedState{config: ConnectivityConfig{}, log: s.log, endp: endp} |
217 | + cs := ConnectedState{config: ConnectivityConfig{}, log: s.log, endp: endp} |
218 | |
219 | nopTicker := make(chan []interface{}) |
220 | testingbus.SetWatchSource(endp, "StateChanged", nopTicker) |
221 | @@ -111,14 +111,14 @@ |
222 | |
223 | // when some of the calls to NetworkManager fails for a bit, we're still OK |
224 | func (s *ConnSuite) TestStartRetriesCall2(c *C) { |
225 | - cond := condition.Chain(3, condition.Work(true), 1, condition.Work(false), |
226 | + cond := condition.Chain(1, condition.Work(true), 1, condition.Work(false), |
227 | 1, condition.Work(true)) |
228 | |
229 | endp := testingbus.NewTestingEndpoint(condition.Work(true), cond, |
230 | uint32(networkmanager.Connecting), helloCon, |
231 | uint32(networkmanager.Connecting), helloCon, |
232 | ) |
233 | - cs := connectedState{config: ConnectivityConfig{}, log: s.log, endp: endp} |
234 | + cs := ConnectedState{config: ConnectivityConfig{}, log: s.log, endp: endp} |
235 | |
236 | nopTicker := make(chan []interface{}) |
237 | testingbus.SetWatchSource(endp, "StateChanged", nopTicker) |
238 | @@ -141,7 +141,7 @@ |
239 | uint32(networkmanager.Connecting), |
240 | helloCon, |
241 | ) |
242 | - cs := connectedState{config: ConnectivityConfig{}, log: s.log, endp: endp} |
243 | + cs := ConnectedState{config: ConnectivityConfig{}, log: s.log, endp: endp} |
244 | watchTicker := make(chan []interface{}, 1) |
245 | nopTicker := make(chan []interface{}) |
246 | testingbus.SetWatchSource(endp, "StateChanged", watchTicker) |
247 | @@ -151,7 +151,6 @@ |
248 | |
249 | c.Check(cs.start(), Equals, networkmanager.Connecting) |
250 | c.Check(cs.connAttempts, Equals, uint32(2)) |
251 | - // XXX this may be stolen by an old watch => dead lock |
252 | watchTicker <- []interface{}{uint32(networkmanager.ConnectedGlobal)} |
253 | c.Check(<-cs.networkStateCh, Equals, networkmanager.ConnectedGlobal) |
254 | } |
255 | @@ -181,7 +180,7 @@ |
256 | } |
257 | } |
258 | |
259 | -func (rep *racyEndpoint) WatchSignal(member string, f func(...interface{}), d func()) error { |
260 | +func (rep *racyEndpoint) WatchSignal(member string, f func(...interface{}), d func()) (bus.Cancellable, error) { |
261 | if member == "StateChanged" { |
262 | // we count never having gotten the state as happening "after" now. |
263 | rep.lock.RLock() |
264 | @@ -194,7 +193,7 @@ |
265 | d() |
266 | }() |
267 | } |
268 | - return nil |
269 | + return nil, nil |
270 | } |
271 | |
272 | func (*racyEndpoint) Close() {} |
273 | @@ -223,7 +222,7 @@ |
274 | func (s *ConnSuite) TestStartAvoidsRace(c *C) { |
275 | for delta := time.Second; delta > 1; delta /= 2 { |
276 | rep := &racyEndpoint{delta: delta} |
277 | - cs := connectedState{config: ConnectivityConfig{}, log: s.log, endp: rep} |
278 | + cs := ConnectedState{config: ConnectivityConfig{}, log: s.log, endp: rep} |
279 | f := Commentf("when delta=%s", delta) |
280 | c.Assert(cs.start(), Equals, networkmanager.Connecting, f) |
281 | c.Assert(takeNext(cs.networkStateCh), Equals, networkmanager.ConnectedGlobal, f) |
282 | @@ -231,7 +230,7 @@ |
283 | } |
284 | |
285 | /* |
286 | - tests for connectedStateStep() |
287 | + tests for step() |
288 | */ |
289 | |
290 | func (s *ConnSuite) TestSteps(c *C) { |
291 | @@ -242,7 +241,7 @@ |
292 | RecheckTimeout: config.ConfigTimeDuration{recheck_timeout}, |
293 | } |
294 | ch := make(chan networkmanager.State, 10) |
295 | - cs := &connectedState{ |
296 | + cs := &ConnectedState{ |
297 | config: cfg, |
298 | networkStateCh: ch, |
299 | timer: time.NewTimer(time.Second), |
300 | @@ -251,15 +250,15 @@ |
301 | lastSent: false, |
302 | } |
303 | ch <- networkmanager.ConnectedGlobal |
304 | - f, e := cs.connectedStateStep() |
305 | + f, e := cs.step() |
306 | c.Check(e, IsNil) |
307 | c.Check(f, Equals, true) |
308 | ch <- networkmanager.Disconnected |
309 | ch <- networkmanager.ConnectedGlobal |
310 | - f, e = cs.connectedStateStep() |
311 | + f, e = cs.step() |
312 | c.Check(e, IsNil) |
313 | c.Check(f, Equals, false) |
314 | - f, e = cs.connectedStateStep() |
315 | + f, e = cs.step() |
316 | c.Check(e, IsNil) |
317 | c.Check(f, Equals, true) |
318 | |
319 | @@ -267,7 +266,7 @@ |
320 | webget_p = condition.Fail2Work(1) |
321 | ch <- networkmanager.Disconnected |
322 | ch <- networkmanager.ConnectedGlobal |
323 | - f, e = cs.connectedStateStep() |
324 | + f, e = cs.step() |
325 | c.Check(e, IsNil) |
326 | c.Check(f, Equals, false) // first false is from the Disconnected |
327 | |
328 | @@ -276,7 +275,7 @@ |
329 | _t := time.NewTimer(recheck_timeout / 2) |
330 | |
331 | go func() { |
332 | - f, e := cs.connectedStateStep() |
333 | + f, e := cs.step() |
334 | c.Check(e, IsNil) |
335 | _ch <- f |
336 | }() |
337 | @@ -294,15 +293,15 @@ |
338 | ch <- networkmanager.Disconnected // this should not |
339 | ch <- networkmanager.ConnectedGlobal // this should trigger a 'true' |
340 | |
341 | - f, e = cs.connectedStateStep() |
342 | + f, e = cs.step() |
343 | c.Check(e, IsNil) |
344 | c.Check(f, Equals, false) |
345 | - f, e = cs.connectedStateStep() |
346 | + f, e = cs.step() |
347 | c.Check(e, IsNil) |
348 | c.Check(f, Equals, true) |
349 | |
350 | close(ch) // this should make it error out |
351 | - _, e = cs.connectedStateStep() |
352 | + _, e = cs.step() |
353 | c.Check(e, NotNil) |
354 | } |
355 | |
356 | @@ -336,7 +335,9 @@ |
357 | out := make(chan bool) |
358 | dt := time.Second / 10 |
359 | timer := time.NewTimer(dt) |
360 | - go ConnectedState(endp, cfg, s.log, out) |
361 | + cs := New(endp, cfg, s.log) |
362 | + defer cs.Cancel() |
363 | + go cs.Track(out) |
364 | var v bool |
365 | expecteds := []struct { |
366 | p bool |
367 | @@ -399,7 +400,9 @@ |
368 | out := make(chan bool) |
369 | dt := time.Second / 10 |
370 | timer := time.NewTimer(dt) |
371 | - go ConnectedState(endp, cfg, s.log, out) |
372 | + cs := New(endp, cfg, s.log) |
373 | + defer cs.Cancel() |
374 | + go cs.Track(out) |
375 | var v bool |
376 | expecteds := []struct { |
377 | p bool |
378 | |
379 | === modified file 'bus/endpoint.go' |
380 | --- bus/endpoint.go 2015-01-22 09:52:07 +0000 |
381 | +++ bus/endpoint.go 2015-02-27 10:54:31 +0000 |
382 | @@ -35,10 +35,15 @@ |
383 | type BusMethod func(string, []interface{}, []interface{}) ([]interface{}, error) |
384 | type DispatchMap map[string]BusMethod |
385 | |
386 | +// Cancellable can be canceled. |
387 | +type Cancellable interface { |
388 | + Cancel() error |
389 | +} |
390 | + |
391 | // bus.Endpoint represents the DBus connection itself. |
392 | type Endpoint interface { |
393 | GrabName(allowReplacement bool) <-chan error |
394 | - WatchSignal(member string, f func(...interface{}), d func()) error |
395 | + WatchSignal(member string, f func(...interface{}), d func()) (Cancellable, error) |
396 | WatchMethod(DispatchMap, string, ...interface{}) |
397 | Signal(string, string, []interface{}) error |
398 | Call(member string, args []interface{}, rvs ...interface{}) error |
399 | @@ -123,16 +128,16 @@ |
400 | // sends the values over a channel, and d() would close the channel. |
401 | // |
402 | // XXX: untested |
403 | -func (endp *endpoint) WatchSignal(member string, f func(...interface{}), d func()) error { |
404 | +func (endp *endpoint) WatchSignal(member string, f func(...interface{}), d func()) (Cancellable, error) { |
405 | watch, err := endp.proxy.WatchSignal(endp.addr.Interface, member) |
406 | if err != nil { |
407 | endp.log.Debugf("failed to set up the watch: %s", err) |
408 | - return err |
409 | + return nil, err |
410 | } |
411 | |
412 | go endp.unpackMessages(watch, f, d, member) |
413 | |
414 | - return nil |
415 | + return watch, nil |
416 | } |
417 | |
418 | // Call() invokes the provided member method (on the name, path and |
419 | @@ -324,6 +329,6 @@ |
420 | } |
421 | f(endp.unpackOneMsg(msg, member)...) |
422 | } |
423 | - endp.log.Errorf("got not-OK from %s watch", member) |
424 | + endp.log.Debugf("got not-OK from %s watch", member) |
425 | d() |
426 | } |
427 | |
428 | === modified file 'bus/networkmanager/networkmanager.go' |
429 | --- bus/networkmanager/networkmanager.go 2015-01-22 09:52:07 +0000 |
430 | +++ bus/networkmanager/networkmanager.go 2015-02-27 10:54:31 +0000 |
431 | @@ -42,13 +42,13 @@ |
432 | GetState() State |
433 | // WatchState listens for changes to NetworkManager's state, and sends |
434 | // them out over the channel returned. |
435 | - WatchState() (<-chan State, error) |
436 | + WatchState() (<-chan State, bus.Cancellable, error) |
437 | // GetPrimaryConnection fetches and returns NetworkManager's current |
438 | // primary connection. |
439 | GetPrimaryConnection() string |
440 | // WatchPrimaryConnection listens for changes of NetworkManager's |
441 | // Primary Connection, and sends it out over the channel returned. |
442 | - WatchPrimaryConnection() (<-chan string, error) |
443 | + WatchPrimaryConnection() (<-chan string, bus.Cancellable, error) |
444 | } |
445 | |
446 | type networkManager struct { |
447 | @@ -85,9 +85,9 @@ |
448 | return State(v) |
449 | } |
450 | |
451 | -func (nm *networkManager) WatchState() (<-chan State, error) { |
452 | +func (nm *networkManager) WatchState() (<-chan State, bus.Cancellable, error) { |
453 | ch := make(chan State) |
454 | - err := nm.bus.WatchSignal("StateChanged", |
455 | + w, err := nm.bus.WatchSignal("StateChanged", |
456 | func(ns ...interface{}) { |
457 | stint, ok := ns[0].(uint32) |
458 | if !ok { |
459 | @@ -101,10 +101,10 @@ |
460 | func() { close(ch) }) |
461 | if err != nil { |
462 | nm.log.Debugf("Failed to set up the watch: %s", err) |
463 | - return nil, err |
464 | + return nil, nil, err |
465 | } |
466 | |
467 | - return ch, nil |
468 | + return ch, w, nil |
469 | } |
470 | |
471 | func (nm *networkManager) GetPrimaryConnection() string { |
472 | @@ -124,9 +124,9 @@ |
473 | return string(v) |
474 | } |
475 | |
476 | -func (nm *networkManager) WatchPrimaryConnection() (<-chan string, error) { |
477 | +func (nm *networkManager) WatchPrimaryConnection() (<-chan string, bus.Cancellable, error) { |
478 | ch := make(chan string) |
479 | - err := nm.bus.WatchSignal("PropertiesChanged", |
480 | + w, err := nm.bus.WatchSignal("PropertiesChanged", |
481 | func(ppsi ...interface{}) { |
482 | pps, ok := ppsi[0].(map[string]dbus.Variant) |
483 | if !ok { |
484 | @@ -147,8 +147,8 @@ |
485 | }, func() { close(ch) }) |
486 | if err != nil { |
487 | nm.log.Debugf("failed to set up the watch: %s", err) |
488 | - return nil, err |
489 | + return nil, nil, err |
490 | } |
491 | |
492 | - return ch, nil |
493 | + return ch, w, nil |
494 | } |
495 | |
496 | === modified file 'bus/networkmanager/networkmanager_test.go' |
497 | --- bus/networkmanager/networkmanager_test.go 2014-04-04 12:01:42 +0000 |
498 | +++ bus/networkmanager/networkmanager_test.go 2015-02-27 10:54:31 +0000 |
499 | @@ -90,8 +90,9 @@ |
500 | func (s *NMSuite) TestWatchState(c *C) { |
501 | tc := testingbus.NewTestingEndpoint(nil, condition.Work(true), uint32(Unknown), uint32(Asleep), uint32(ConnectedGlobal)) |
502 | nm := New(tc, s.log) |
503 | - ch, err := nm.WatchState() |
504 | - c.Check(err, IsNil) |
505 | + ch, w, err := nm.WatchState() |
506 | + c.Assert(err, IsNil) |
507 | + defer w.Cancel() |
508 | l := []State{<-ch, <-ch, <-ch} |
509 | c.Check(l, DeepEquals, []State{Unknown, Asleep, ConnectedGlobal}) |
510 | } |
511 | @@ -99,7 +100,7 @@ |
512 | // WatchState returns on error if the dbus call fails |
513 | func (s *NMSuite) TestWatchStateFails(c *C) { |
514 | nm := New(testingbus.NewTestingEndpoint(nil, condition.Work(false)), s.log) |
515 | - _, err := nm.WatchState() |
516 | + _, _, err := nm.WatchState() |
517 | c.Check(err, NotNil) |
518 | } |
519 | |
520 | @@ -107,8 +108,9 @@ |
521 | func (s *NMSuite) TestWatchStateClosesOnWatchBail(c *C) { |
522 | tc := testingbus.NewTestingEndpoint(nil, condition.Work(true)) |
523 | nm := New(tc, s.log) |
524 | - ch, err := nm.WatchState() |
525 | - c.Check(err, IsNil) |
526 | + ch, w, err := nm.WatchState() |
527 | + c.Assert(err, IsNil) |
528 | + defer w.Cancel() |
529 | _, ok := <-ch |
530 | c.Check(ok, Equals, false) |
531 | } |
532 | @@ -117,8 +119,9 @@ |
533 | func (s *NMSuite) TestWatchStateSurvivesRubbishValues(c *C) { |
534 | tc := testingbus.NewTestingEndpoint(nil, condition.Work(true), "a") |
535 | nm := New(tc, s.log) |
536 | - ch, err := nm.WatchState() |
537 | - c.Check(err, IsNil) |
538 | + ch, w, err := nm.WatchState() |
539 | + c.Assert(err, IsNil) |
540 | + defer w.Cancel() |
541 | _, ok := <-ch |
542 | c.Check(ok, Equals, false) |
543 | } |
544 | @@ -164,8 +167,9 @@ |
545 | mkPriConMap("/b/2"), |
546 | mkPriConMap("/c/3")) |
547 | nm := New(tc, s.log) |
548 | - ch, err := nm.WatchPrimaryConnection() |
549 | - c.Check(err, IsNil) |
550 | + ch, w, err := nm.WatchPrimaryConnection() |
551 | + c.Assert(err, IsNil) |
552 | + defer w.Cancel() |
553 | l := []string{<-ch, <-ch, <-ch} |
554 | c.Check(l, DeepEquals, []string{"/a/1", "/b/2", "/c/3"}) |
555 | } |
556 | @@ -173,7 +177,7 @@ |
557 | // WatchPrimaryConnection returns on error if the dbus call fails |
558 | func (s *NMSuite) TestWatchPrimaryConnectionFails(c *C) { |
559 | nm := New(testingbus.NewTestingEndpoint(nil, condition.Work(false)), s.log) |
560 | - _, err := nm.WatchPrimaryConnection() |
561 | + _, _, err := nm.WatchPrimaryConnection() |
562 | c.Check(err, NotNil) |
563 | } |
564 | |
565 | @@ -181,8 +185,9 @@ |
566 | func (s *NMSuite) TestWatchPrimaryConnectionClosesOnWatchBail(c *C) { |
567 | tc := testingbus.NewTestingEndpoint(nil, condition.Work(true)) |
568 | nm := New(tc, s.log) |
569 | - ch, err := nm.WatchPrimaryConnection() |
570 | - c.Check(err, IsNil) |
571 | + ch, w, err := nm.WatchPrimaryConnection() |
572 | + c.Assert(err, IsNil) |
573 | + defer w.Cancel() |
574 | _, ok := <-ch |
575 | c.Check(ok, Equals, false) |
576 | } |
577 | @@ -191,8 +196,9 @@ |
578 | func (s *NMSuite) TestWatchPrimaryConnectionSurvivesRubbishValues(c *C) { |
579 | tc := testingbus.NewTestingEndpoint(nil, condition.Work(true), "a") |
580 | nm := New(tc, s.log) |
581 | - ch, err := nm.WatchPrimaryConnection() |
582 | + ch, w, err := nm.WatchPrimaryConnection() |
583 | c.Assert(err, IsNil) |
584 | + defer w.Cancel() |
585 | _, ok := <-ch |
586 | c.Check(ok, Equals, false) |
587 | } |
588 | @@ -204,8 +210,9 @@ |
589 | map[string]dbus.Variant{"PrimaryConnection": dbus.Variant{dbus.ObjectPath("42")}}, |
590 | ) |
591 | nm := New(tc, s.log) |
592 | - ch, err := nm.WatchPrimaryConnection() |
593 | + ch, w, err := nm.WatchPrimaryConnection() |
594 | c.Assert(err, IsNil) |
595 | + defer w.Cancel() |
596 | v, ok := <-ch |
597 | c.Check(ok, Equals, true) |
598 | c.Check(v, Equals, "42") |
599 | @@ -218,8 +225,9 @@ |
600 | map[string]dbus.Variant{"PrimaryConnection": dbus.Variant{dbus.ObjectPath("42")}}, |
601 | ) |
602 | nm := New(tc, s.log) |
603 | - ch, err := nm.WatchPrimaryConnection() |
604 | + ch, w, err := nm.WatchPrimaryConnection() |
605 | c.Assert(err, IsNil) |
606 | + defer w.Cancel() |
607 | v, ok := <-ch |
608 | c.Check(ok, Equals, true) |
609 | c.Check(v, Equals, "42") |
610 | |
611 | === modified file 'bus/notifications/raw.go' |
612 | --- bus/notifications/raw.go 2015-01-22 09:52:07 +0000 |
613 | +++ bus/notifications/raw.go 2015-02-27 10:54:31 +0000 |
614 | @@ -93,7 +93,7 @@ |
615 | // and sends them over the channel provided |
616 | func (raw *RawNotifications) WatchActions() (<-chan *RawAction, error) { |
617 | ch := make(chan *RawAction) |
618 | - err := raw.bus.WatchSignal("ActionInvoked", |
619 | + _, err := raw.bus.WatchSignal("ActionInvoked", |
620 | func(ns ...interface{}) { |
621 | if len(ns) != 2 { |
622 | raw.log.Debugf("ActionInvoked delivered %d things instead of 2", len(ns)) |
623 | |
624 | === modified file 'bus/testing/testing_endpoint.go' |
625 | --- bus/testing/testing_endpoint.go 2015-02-24 14:10:05 +0000 |
626 | +++ bus/testing/testing_endpoint.go 2015-02-27 10:54:31 +0000 |
627 | @@ -80,11 +80,32 @@ |
628 | return tc.(*testingEndpoint).callArgs |
629 | } |
630 | |
631 | +type watchCancel struct { |
632 | + done chan struct{} |
633 | + cancelled chan struct{} |
634 | + lck sync.Mutex |
635 | + member string |
636 | +} |
637 | + |
638 | +// this waits for actual cancelllation for test convenience |
639 | +func (wc *watchCancel) Cancel() error { |
640 | + wc.lck.Lock() |
641 | + defer wc.lck.Unlock() |
642 | + if wc.cancelled != nil { |
643 | + close(wc.cancelled) |
644 | + wc.cancelled = nil |
645 | + <-wc.done |
646 | + } |
647 | + return nil |
648 | +} |
649 | + |
650 | // See Endpoint's WatchSignal. This WatchSignal will check its condition to |
651 | // decide whether to return an error, or provide each of its return values |
652 | // or values from the previously set watchSource for member. |
653 | -func (tc *testingEndpoint) WatchSignal(member string, f func(...interface{}), d func()) error { |
654 | +func (tc *testingEndpoint) WatchSignal(member string, f func(...interface{}), d func()) (bus.Cancellable, error) { |
655 | if tc.callCond.OK() { |
656 | + cancelled := make(chan struct{}) |
657 | + done := make(chan struct{}) |
658 | go func() { |
659 | tc.watchLck.RLock() |
660 | source := tc.watchSources[member] |
661 | @@ -96,21 +117,40 @@ |
662 | tc.usedLck.Unlock() |
663 | source = make(chan []interface{}) |
664 | go func() { |
665 | + Feed: |
666 | for _, v := range tc.retvals[idx:] { |
667 | - source <- v |
668 | - time.Sleep(10 * time.Millisecond) |
669 | + select { |
670 | + case source <- v: |
671 | + case <-cancelled: |
672 | + break Feed |
673 | + } |
674 | + select { |
675 | + case <-time.After(10 * time.Millisecond): |
676 | + case <-cancelled: |
677 | + break Feed |
678 | + } |
679 | } |
680 | close(source) |
681 | }() |
682 | } |
683 | - for v := range source { |
684 | - f(v...) |
685 | + Receive: |
686 | + for { |
687 | + select { |
688 | + case v, ok := <-source: |
689 | + if !ok { |
690 | + break Receive |
691 | + } |
692 | + f(v...) |
693 | + case <-cancelled: |
694 | + break Receive |
695 | + } |
696 | } |
697 | d() |
698 | + close(done) |
699 | }() |
700 | - return nil |
701 | + return &watchCancel{cancelled: cancelled, done: done, member: member}, nil |
702 | } else { |
703 | - return errors.New("no way") |
704 | + return nil, errors.New("no way") |
705 | } |
706 | } |
707 | |
708 | |
709 | === modified file 'bus/testing/testing_endpoint_test.go' |
710 | --- bus/testing/testing_endpoint_test.go 2015-02-20 17:40:57 +0000 |
711 | +++ bus/testing/testing_endpoint_test.go 2015-02-27 10:54:31 +0000 |
712 | @@ -17,11 +17,13 @@ |
713 | package testing |
714 | |
715 | import ( |
716 | + "testing" |
717 | + "time" |
718 | + |
719 | . "launchpad.net/gocheck" |
720 | + |
721 | "launchpad.net/ubuntu-push/bus" |
722 | "launchpad.net/ubuntu-push/testing/condition" |
723 | - "testing" |
724 | - "time" |
725 | ) |
726 | |
727 | // hook up gocheck |
728 | @@ -100,8 +102,9 @@ |
729 | var m, n uint32 = 42, 17 |
730 | endp := NewTestingEndpoint(nil, condition.Work(true), m, n) |
731 | ch := make(chan uint32) |
732 | - e := endp.WatchSignal("what", func(us ...interface{}) { ch <- us[0].(uint32) }, func() { close(ch) }) |
733 | - c.Check(e, IsNil) |
734 | + w, e := endp.WatchSignal("which", func(us ...interface{}) { ch <- us[0].(uint32) }, func() { close(ch) }) |
735 | + c.Assert(e, IsNil) |
736 | + defer w.Cancel() |
737 | c.Check(<-ch, Equals, m) |
738 | c.Check(<-ch, Equals, n) |
739 | } |
740 | @@ -110,8 +113,9 @@ |
741 | func (s *TestingEndpointSuite) TestWatchDestructor(c *C) { |
742 | endp := NewTestingEndpoint(nil, condition.Work(true)) |
743 | ch := make(chan uint32) |
744 | - e := endp.WatchSignal("what", func(us ...interface{}) {}, func() { close(ch) }) |
745 | - c.Check(e, IsNil) |
746 | + w, e := endp.WatchSignal("what", func(us ...interface{}) {}, func() { close(ch) }) |
747 | + c.Assert(e, IsNil) |
748 | + defer w.Cancel() |
749 | _, ok := <-ch |
750 | c.Check(ok, Equals, false) |
751 | } |
752 | @@ -130,8 +134,9 @@ |
753 | // Test that WatchSignal() with a negative condition returns an error. |
754 | func (s *TestingEndpointSuite) TestWatchFails(c *C) { |
755 | endp := NewTestingEndpoint(nil, condition.Work(false)) |
756 | - e := endp.WatchSignal("what", func(us ...interface{}) {}, func() {}) |
757 | + w, e := endp.WatchSignal("what", func(us ...interface{}) {}, func() {}) |
758 | c.Check(e, NotNil) |
759 | + c.Check(w, IsNil) |
760 | } |
761 | |
762 | // Test WatchSignal can use a watchSource instead of a timeout and retvals (if |
763 | @@ -146,8 +151,9 @@ |
764 | endp := NewTestingEndpoint(nil, condition.Work(true), 0, 0) |
765 | SetWatchSource(endp, "what", watchTicker) |
766 | ch := make(chan int) |
767 | - e := endp.WatchSignal("what", func(us ...interface{}) {}, func() { close(ch) }) |
768 | - c.Check(e, IsNil) |
769 | + w, e := endp.WatchSignal("what", func(us ...interface{}) {}, func() { close(ch) }) |
770 | + c.Assert(e, IsNil) |
771 | + defer w.Cancel() |
772 | |
773 | close(watchTicker) |
774 | // wait for the destructor to be called |
775 | @@ -161,6 +167,19 @@ |
776 | c.Assert(len(watchTicker), Equals, 0) |
777 | } |
778 | |
779 | +// Test that WatchSignal() calls the destructor callback when canceled. |
780 | +func (s *TestingEndpointSuite) TestWatchCancel(c *C) { |
781 | + endp := NewTestingEndpoint(nil, condition.Work(true)) |
782 | + ch := make(chan uint32) |
783 | + w, e := endp.WatchSignal("what", func(us ...interface{}) {}, func() { close(ch) }) |
784 | + c.Assert(e, IsNil) |
785 | + defer w.Cancel() |
786 | + SetWatchSource(endp, "what", make(chan []interface{})) |
787 | + w.Cancel() |
788 | + _, ok := <-ch |
789 | + c.Check(ok, Equals, false) |
790 | +} |
791 | + |
792 | // Tests that GetProperty() works |
793 | func (s *TestingEndpointSuite) TestGetProperty(c *C) { |
794 | var m uint32 = 42 |
795 | |
796 | === modified file 'client/client.go' |
797 | --- client/client.go 2015-01-26 21:00:27 +0000 |
798 | +++ client/client.go 2015-02-27 10:54:31 +0000 |
799 | @@ -280,8 +280,10 @@ |
800 | |
801 | // takeTheBus starts the connection(s) to D-Bus and sets up associated event channels |
802 | func (client *PushClient) takeTheBus() error { |
803 | - go connectivity.ConnectedState(client.connectivityEndp, |
804 | - client.config.ConnectivityConfig, client.log, client.connCh) |
805 | + fmt.Println("FOO") |
806 | + cs := connectivity.New(client.connectivityEndp, |
807 | + client.config.ConnectivityConfig, client.log) |
808 | + go cs.Track(client.connCh) |
809 | util.NewAutoRedialer(client.systemImageEndp).Redial() |
810 | sysimg := systemimage.New(client.systemImageEndp, client.log) |
811 | info, err := sysimg.Info() |
812 | |
813 | === modified file 'client/client_test.go' |
814 | --- client/client_test.go 2015-02-24 14:10:05 +0000 |
815 | +++ client/client_test.go 2015-02-27 10:54:31 +0000 |
816 | @@ -206,12 +206,7 @@ |
817 | } |
818 | |
819 | func (cs *clientSuite) TearDownTest(c *C) { |
820 | - //fmt.Println("GOROUTINE# ", runtime.NumGoroutine()) |
821 | - /* |
822 | - var x [16*1024]byte |
823 | - sz := runtime.Stack(x[:], true) |
824 | - fmt.Println(string(x[:sz])) |
825 | - */ |
826 | + //helpers.DumpGoroutines() |
827 | } |
828 | |
829 | type sqlientSuite struct{ clientSuite } |
830 | |
831 | === modified file 'testing/helpers.go' |
832 | --- testing/helpers.go 2015-01-22 09:52:07 +0000 |
833 | +++ testing/helpers.go 2015-02-27 10:54:31 +0000 |
834 | @@ -170,3 +170,12 @@ |
835 | } |
836 | return purl |
837 | } |
838 | + |
839 | +// DumpGoroutines dumps current goroutines. |
840 | +func DumpGoroutines() { |
841 | + var buf [64 * 1024]byte |
842 | + sz := runtime.Stack(buf[:], true) |
843 | + dump := string(buf[:sz]) |
844 | + fmt.Println(dump) |
845 | + fmt.Println("#goroutines#", strings.Count("\n"+dump, "\ngoroutine ")) |
846 | +} |