Merge lp:~pedronis/ubuntu-push/cancel-cancel-cancel into lp:ubuntu-push/automatic

Proposed by Samuele Pedroni
Status: Merged
Approved by: Samuele Pedroni
Approved revision: 369
Merged at revision: 366
Proposed branch: lp:~pedronis/ubuntu-push/cancel-cancel-cancel
Merge into: lp:ubuntu-push/automatic
Diff against target: 846 lines (+235/-101)
11 files modified
bus/connectivity/connectivity.go (+76/-23)
bus/connectivity/connectivity_test.go (+26/-23)
bus/endpoint.go (+10/-5)
bus/networkmanager/networkmanager.go (+10/-10)
bus/networkmanager/networkmanager_test.go (+23/-15)
bus/notifications/raw.go (+1/-1)
bus/testing/testing_endpoint.go (+47/-7)
bus/testing/testing_endpoint_test.go (+28/-9)
client/client.go (+4/-2)
client/client_test.go (+1/-6)
testing/helpers.go (+9/-0)
To merge this branch: bzr merge lp:~pedronis/ubuntu-push/cancel-cancel-cancel
Reviewer Review Type Date Requested Status
John Lenton (community) Approve
Review via email: mp+251240@code.launchpad.net

Commit message

WatchSignal cancelling, and connectivity exposed cancelling, make connectivity start not leave watches behind, fixes deadlock XXX in test

Description of the change

WatchSignal cancelling, and connectivity exposed cancelling,

make connectivity start not leave watches behind, fixes deadlock XXX in test;

don't leave goroutines behind in touched bus packages

helper to dump goroutines to check whether tests leave them around

To post a comment you must log in.
Revision history for this message
John Lenton (chipaca) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'bus/connectivity/connectivity.go'
2--- bus/connectivity/connectivity.go 2015-02-24 14:10:05 +0000
3+++ bus/connectivity/connectivity.go 2015-02-27 10:54:31 +0000
4@@ -24,12 +24,14 @@
5
6 import (
7 "errors"
8+ "sync"
9+ "time"
10+
11 "launchpad.net/ubuntu-push/bus"
12 "launchpad.net/ubuntu-push/bus/networkmanager"
13 "launchpad.net/ubuntu-push/config"
14 "launchpad.net/ubuntu-push/logger"
15 "launchpad.net/ubuntu-push/util"
16- "time"
17 )
18
19 // The configuration for ConnectedState, intended to be populated from a config file.
20@@ -45,7 +47,8 @@
21 ConnectivityCheckMD5 string `json:"connectivity_check_md5"`
22 }
23
24-type connectedState struct {
25+// ConnectedState helps tracking connectivity.
26+type ConnectedState struct {
27 networkStateCh <-chan networkmanager.State
28 networkConCh <-chan string
29 config ConnectivityConfig
30@@ -57,11 +60,43 @@
31 currentState networkmanager.State
32 lastSent bool
33 timer *time.Timer
34+ doneLck sync.Mutex
35+ done chan struct{}
36+ canceled bool
37+ stateWatch bus.Cancellable
38+ conWatch bus.Cancellable
39+}
40+
41+// New makes a ConnectedState for connectivity tracking.
42+//
43+// The endpoint need not be dialed; Track() will Dial() and
44+// Close() it as it sees fit.
45+func New(endp bus.Endpoint, config ConnectivityConfig, log logger.Logger) *ConnectedState {
46+ wg := NewWebchecker(config.ConnectivityCheckURL, config.ConnectivityCheckMD5, 10*time.Second, log)
47+ return &ConnectedState{
48+ config: config,
49+ log: log,
50+ endp: endp,
51+ webget: wg.Webcheck,
52+ done: make(chan struct{}),
53+ }
54+}
55+
56+// cancel watches if any
57+func (cs *ConnectedState) reset() {
58+ if cs.stateWatch != nil {
59+ cs.stateWatch.Cancel()
60+ cs.stateWatch = nil
61+ }
62+ if cs.conWatch != nil {
63+ cs.conWatch.Cancel()
64+ cs.conWatch = nil
65+ }
66 }
67
68 // start connects to the bus, gets the initial NetworkManager state, and sets
69 // up the watch.
70-func (cs *connectedState) start() networkmanager.State {
71+func (cs *ConnectedState) start() networkmanager.State {
72 var initial networkmanager.State
73 var stateCh <-chan networkmanager.State
74 var primary string
75@@ -72,8 +107,9 @@
76 cs.connAttempts += ar.Redial()
77 nm := networkmanager.New(cs.endp, cs.log)
78
79+ cs.reset()
80 // set up the watch
81- stateCh, err = nm.WatchState()
82+ stateCh, cs.stateWatch, err = nm.WatchState()
83 if err != nil {
84 cs.log.Debugf("failed to set up the state watch: %s", err)
85 goto Continue
86@@ -87,7 +123,7 @@
87 }
88 cs.log.Debugf("got initial state of %s", initial)
89
90- conCh, err = nm.WatchPrimaryConnection()
91+ conCh, cs.conWatch, err = nm.WatchPrimaryConnection()
92 if err != nil {
93 cs.log.Debugf("failed to set up the connection watch: %s", err)
94 goto Continue
95@@ -107,9 +143,11 @@
96 }
97 }
98
99-// connectedStateStep takes one step forwards in the “am I connected?”
100+var errCanceled = errors.New("canceled")
101+
102+// step takes one step forwards in the “am I connected?”
103 // answering state machine.
104-func (cs *connectedState) connectedStateStep() (bool, error) {
105+func (cs *ConnectedState) step() (bool, error) {
106 stabilizingTimeout := cs.config.StabilizingTimeout.Duration
107 recheckTimeout := cs.config.RecheckTimeout.Duration
108 log := cs.log
109@@ -117,6 +155,8 @@
110 Loop:
111 for {
112 select {
113+ case <-cs.done:
114+ return false, errCanceled
115 case <-cs.networkConCh:
116 cs.webgetCh = nil
117 cs.timer.Reset(stabilizingTimeout)
118@@ -178,35 +218,48 @@
119 return cs.lastSent, nil
120 }
121
122-// ConnectedState sends the initial NetworkManager state and changes to it
123+// Track sends the initial NetworkManager state and changes to it
124 // over the "out" channel. Sends "false" as soon as it detects trouble, "true"
125 // after checking actual connectivity.
126 //
127-// The endpoint need not be dialed; connectivity will Dial() and Close()
128-// it as it sees fit.
129-func ConnectedState(endp bus.Endpoint, config ConnectivityConfig, log logger.Logger, out chan<- bool) {
130- wg := NewWebchecker(config.ConnectivityCheckURL, config.ConnectivityCheckMD5, 10*time.Second, log)
131- cs := &connectedState{
132- config: config,
133- log: log,
134- endp: endp,
135- webget: wg.Webcheck,
136- }
137+func (cs *ConnectedState) Track(out chan<- bool) {
138
139 Start:
140- log.Debugf("sending initial 'disconnected'.")
141- out <- false
142+ cs.log.Debugf("sending initial 'disconnected'.")
143+ select {
144+ case <-cs.done:
145+ return
146+ case out <- false:
147+ }
148 cs.lastSent = false
149 cs.currentState = cs.start()
150+ defer cs.reset()
151 cs.timer = time.NewTimer(cs.config.StabilizingTimeout.Duration)
152
153 for {
154- v, err := cs.connectedStateStep()
155+ v, err := cs.step()
156+ if err == errCanceled {
157+ return
158+ }
159 if err != nil {
160 // tear it all down and start over
161- log.Errorf("%s", err)
162+ cs.log.Errorf("%s", err)
163 goto Start
164 }
165- out <- v
166+ select {
167+ case <-cs.done:
168+ return
169+ case out <- v:
170+ }
171+ }
172+}
173+
174+// Cancel stops the ConnectedState machinary.
175+func (cs *ConnectedState) Cancel() {
176+ cs.doneLck.Lock()
177+ defer cs.doneLck.Unlock()
178+ if !cs.canceled {
179+ cs.canceled = true
180+ close(cs.done)
181 }
182 }
183
184=== modified file 'bus/connectivity/connectivity_test.go'
185--- bus/connectivity/connectivity_test.go 2015-02-24 14:10:05 +0000
186+++ bus/connectivity/connectivity_test.go 2015-02-27 10:54:31 +0000
187@@ -64,13 +64,13 @@
188 )
189
190 /*
191- tests for connectedState's Start() method
192+ tests for ConnectedState's Start() method
193 */
194
195 // when given a working config and bus, Start() will work
196 func (s *ConnSuite) TestStartWorks(c *C) {
197 endp := testingbus.NewTestingEndpoint(condition.Work(true), condition.Work(true), uint32(networkmanager.Connecting), helloCon)
198- cs := connectedState{config: ConnectivityConfig{}, log: s.log, endp: endp}
199+ cs := ConnectedState{config: ConnectivityConfig{}, log: s.log, endp: endp}
200
201 nopTicker := make(chan []interface{})
202 testingbus.SetWatchSource(endp, "StateChanged", nopTicker)
203@@ -83,7 +83,7 @@
204 // if the bus fails a couple of times, we're still OK
205 func (s *ConnSuite) TestStartRetriesConnect(c *C) {
206 endp := testingbus.NewTestingEndpoint(condition.Fail2Work(2), condition.Work(true), uint32(networkmanager.Connecting), helloCon)
207- cs := connectedState{config: ConnectivityConfig{}, log: s.log, endp: endp}
208+ cs := ConnectedState{config: ConnectivityConfig{}, log: s.log, endp: endp}
209
210 nopTicker := make(chan []interface{})
211 testingbus.SetWatchSource(endp, "StateChanged", nopTicker)
212@@ -97,7 +97,7 @@
213 // when the calls to NetworkManager fails for a bit, we're still OK
214 func (s *ConnSuite) TestStartRetriesCall(c *C) {
215 endp := testingbus.NewTestingEndpoint(condition.Work(true), condition.Fail2Work(5), uint32(networkmanager.Connecting), helloCon)
216- cs := connectedState{config: ConnectivityConfig{}, log: s.log, endp: endp}
217+ cs := ConnectedState{config: ConnectivityConfig{}, log: s.log, endp: endp}
218
219 nopTicker := make(chan []interface{})
220 testingbus.SetWatchSource(endp, "StateChanged", nopTicker)
221@@ -111,14 +111,14 @@
222
223 // when some of the calls to NetworkManager fails for a bit, we're still OK
224 func (s *ConnSuite) TestStartRetriesCall2(c *C) {
225- cond := condition.Chain(3, condition.Work(true), 1, condition.Work(false),
226+ cond := condition.Chain(1, condition.Work(true), 1, condition.Work(false),
227 1, condition.Work(true))
228
229 endp := testingbus.NewTestingEndpoint(condition.Work(true), cond,
230 uint32(networkmanager.Connecting), helloCon,
231 uint32(networkmanager.Connecting), helloCon,
232 )
233- cs := connectedState{config: ConnectivityConfig{}, log: s.log, endp: endp}
234+ cs := ConnectedState{config: ConnectivityConfig{}, log: s.log, endp: endp}
235
236 nopTicker := make(chan []interface{})
237 testingbus.SetWatchSource(endp, "StateChanged", nopTicker)
238@@ -141,7 +141,7 @@
239 uint32(networkmanager.Connecting),
240 helloCon,
241 )
242- cs := connectedState{config: ConnectivityConfig{}, log: s.log, endp: endp}
243+ cs := ConnectedState{config: ConnectivityConfig{}, log: s.log, endp: endp}
244 watchTicker := make(chan []interface{}, 1)
245 nopTicker := make(chan []interface{})
246 testingbus.SetWatchSource(endp, "StateChanged", watchTicker)
247@@ -151,7 +151,6 @@
248
249 c.Check(cs.start(), Equals, networkmanager.Connecting)
250 c.Check(cs.connAttempts, Equals, uint32(2))
251- // XXX this may be stolen by an old watch => dead lock
252 watchTicker <- []interface{}{uint32(networkmanager.ConnectedGlobal)}
253 c.Check(<-cs.networkStateCh, Equals, networkmanager.ConnectedGlobal)
254 }
255@@ -181,7 +180,7 @@
256 }
257 }
258
259-func (rep *racyEndpoint) WatchSignal(member string, f func(...interface{}), d func()) error {
260+func (rep *racyEndpoint) WatchSignal(member string, f func(...interface{}), d func()) (bus.Cancellable, error) {
261 if member == "StateChanged" {
262 // we count never having gotten the state as happening "after" now.
263 rep.lock.RLock()
264@@ -194,7 +193,7 @@
265 d()
266 }()
267 }
268- return nil
269+ return nil, nil
270 }
271
272 func (*racyEndpoint) Close() {}
273@@ -223,7 +222,7 @@
274 func (s *ConnSuite) TestStartAvoidsRace(c *C) {
275 for delta := time.Second; delta > 1; delta /= 2 {
276 rep := &racyEndpoint{delta: delta}
277- cs := connectedState{config: ConnectivityConfig{}, log: s.log, endp: rep}
278+ cs := ConnectedState{config: ConnectivityConfig{}, log: s.log, endp: rep}
279 f := Commentf("when delta=%s", delta)
280 c.Assert(cs.start(), Equals, networkmanager.Connecting, f)
281 c.Assert(takeNext(cs.networkStateCh), Equals, networkmanager.ConnectedGlobal, f)
282@@ -231,7 +230,7 @@
283 }
284
285 /*
286- tests for connectedStateStep()
287+ tests for step()
288 */
289
290 func (s *ConnSuite) TestSteps(c *C) {
291@@ -242,7 +241,7 @@
292 RecheckTimeout: config.ConfigTimeDuration{recheck_timeout},
293 }
294 ch := make(chan networkmanager.State, 10)
295- cs := &connectedState{
296+ cs := &ConnectedState{
297 config: cfg,
298 networkStateCh: ch,
299 timer: time.NewTimer(time.Second),
300@@ -251,15 +250,15 @@
301 lastSent: false,
302 }
303 ch <- networkmanager.ConnectedGlobal
304- f, e := cs.connectedStateStep()
305+ f, e := cs.step()
306 c.Check(e, IsNil)
307 c.Check(f, Equals, true)
308 ch <- networkmanager.Disconnected
309 ch <- networkmanager.ConnectedGlobal
310- f, e = cs.connectedStateStep()
311+ f, e = cs.step()
312 c.Check(e, IsNil)
313 c.Check(f, Equals, false)
314- f, e = cs.connectedStateStep()
315+ f, e = cs.step()
316 c.Check(e, IsNil)
317 c.Check(f, Equals, true)
318
319@@ -267,7 +266,7 @@
320 webget_p = condition.Fail2Work(1)
321 ch <- networkmanager.Disconnected
322 ch <- networkmanager.ConnectedGlobal
323- f, e = cs.connectedStateStep()
324+ f, e = cs.step()
325 c.Check(e, IsNil)
326 c.Check(f, Equals, false) // first false is from the Disconnected
327
328@@ -276,7 +275,7 @@
329 _t := time.NewTimer(recheck_timeout / 2)
330
331 go func() {
332- f, e := cs.connectedStateStep()
333+ f, e := cs.step()
334 c.Check(e, IsNil)
335 _ch <- f
336 }()
337@@ -294,15 +293,15 @@
338 ch <- networkmanager.Disconnected // this should not
339 ch <- networkmanager.ConnectedGlobal // this should trigger a 'true'
340
341- f, e = cs.connectedStateStep()
342+ f, e = cs.step()
343 c.Check(e, IsNil)
344 c.Check(f, Equals, false)
345- f, e = cs.connectedStateStep()
346+ f, e = cs.step()
347 c.Check(e, IsNil)
348 c.Check(f, Equals, true)
349
350 close(ch) // this should make it error out
351- _, e = cs.connectedStateStep()
352+ _, e = cs.step()
353 c.Check(e, NotNil)
354 }
355
356@@ -336,7 +335,9 @@
357 out := make(chan bool)
358 dt := time.Second / 10
359 timer := time.NewTimer(dt)
360- go ConnectedState(endp, cfg, s.log, out)
361+ cs := New(endp, cfg, s.log)
362+ defer cs.Cancel()
363+ go cs.Track(out)
364 var v bool
365 expecteds := []struct {
366 p bool
367@@ -399,7 +400,9 @@
368 out := make(chan bool)
369 dt := time.Second / 10
370 timer := time.NewTimer(dt)
371- go ConnectedState(endp, cfg, s.log, out)
372+ cs := New(endp, cfg, s.log)
373+ defer cs.Cancel()
374+ go cs.Track(out)
375 var v bool
376 expecteds := []struct {
377 p bool
378
379=== modified file 'bus/endpoint.go'
380--- bus/endpoint.go 2015-01-22 09:52:07 +0000
381+++ bus/endpoint.go 2015-02-27 10:54:31 +0000
382@@ -35,10 +35,15 @@
383 type BusMethod func(string, []interface{}, []interface{}) ([]interface{}, error)
384 type DispatchMap map[string]BusMethod
385
386+// Cancellable can be canceled.
387+type Cancellable interface {
388+ Cancel() error
389+}
390+
391 // bus.Endpoint represents the DBus connection itself.
392 type Endpoint interface {
393 GrabName(allowReplacement bool) <-chan error
394- WatchSignal(member string, f func(...interface{}), d func()) error
395+ WatchSignal(member string, f func(...interface{}), d func()) (Cancellable, error)
396 WatchMethod(DispatchMap, string, ...interface{})
397 Signal(string, string, []interface{}) error
398 Call(member string, args []interface{}, rvs ...interface{}) error
399@@ -123,16 +128,16 @@
400 // sends the values over a channel, and d() would close the channel.
401 //
402 // XXX: untested
403-func (endp *endpoint) WatchSignal(member string, f func(...interface{}), d func()) error {
404+func (endp *endpoint) WatchSignal(member string, f func(...interface{}), d func()) (Cancellable, error) {
405 watch, err := endp.proxy.WatchSignal(endp.addr.Interface, member)
406 if err != nil {
407 endp.log.Debugf("failed to set up the watch: %s", err)
408- return err
409+ return nil, err
410 }
411
412 go endp.unpackMessages(watch, f, d, member)
413
414- return nil
415+ return watch, nil
416 }
417
418 // Call() invokes the provided member method (on the name, path and
419@@ -324,6 +329,6 @@
420 }
421 f(endp.unpackOneMsg(msg, member)...)
422 }
423- endp.log.Errorf("got not-OK from %s watch", member)
424+ endp.log.Debugf("got not-OK from %s watch", member)
425 d()
426 }
427
428=== modified file 'bus/networkmanager/networkmanager.go'
429--- bus/networkmanager/networkmanager.go 2015-01-22 09:52:07 +0000
430+++ bus/networkmanager/networkmanager.go 2015-02-27 10:54:31 +0000
431@@ -42,13 +42,13 @@
432 GetState() State
433 // WatchState listens for changes to NetworkManager's state, and sends
434 // them out over the channel returned.
435- WatchState() (<-chan State, error)
436+ WatchState() (<-chan State, bus.Cancellable, error)
437 // GetPrimaryConnection fetches and returns NetworkManager's current
438 // primary connection.
439 GetPrimaryConnection() string
440 // WatchPrimaryConnection listens for changes of NetworkManager's
441 // Primary Connection, and sends it out over the channel returned.
442- WatchPrimaryConnection() (<-chan string, error)
443+ WatchPrimaryConnection() (<-chan string, bus.Cancellable, error)
444 }
445
446 type networkManager struct {
447@@ -85,9 +85,9 @@
448 return State(v)
449 }
450
451-func (nm *networkManager) WatchState() (<-chan State, error) {
452+func (nm *networkManager) WatchState() (<-chan State, bus.Cancellable, error) {
453 ch := make(chan State)
454- err := nm.bus.WatchSignal("StateChanged",
455+ w, err := nm.bus.WatchSignal("StateChanged",
456 func(ns ...interface{}) {
457 stint, ok := ns[0].(uint32)
458 if !ok {
459@@ -101,10 +101,10 @@
460 func() { close(ch) })
461 if err != nil {
462 nm.log.Debugf("Failed to set up the watch: %s", err)
463- return nil, err
464+ return nil, nil, err
465 }
466
467- return ch, nil
468+ return ch, w, nil
469 }
470
471 func (nm *networkManager) GetPrimaryConnection() string {
472@@ -124,9 +124,9 @@
473 return string(v)
474 }
475
476-func (nm *networkManager) WatchPrimaryConnection() (<-chan string, error) {
477+func (nm *networkManager) WatchPrimaryConnection() (<-chan string, bus.Cancellable, error) {
478 ch := make(chan string)
479- err := nm.bus.WatchSignal("PropertiesChanged",
480+ w, err := nm.bus.WatchSignal("PropertiesChanged",
481 func(ppsi ...interface{}) {
482 pps, ok := ppsi[0].(map[string]dbus.Variant)
483 if !ok {
484@@ -147,8 +147,8 @@
485 }, func() { close(ch) })
486 if err != nil {
487 nm.log.Debugf("failed to set up the watch: %s", err)
488- return nil, err
489+ return nil, nil, err
490 }
491
492- return ch, nil
493+ return ch, w, nil
494 }
495
496=== modified file 'bus/networkmanager/networkmanager_test.go'
497--- bus/networkmanager/networkmanager_test.go 2014-04-04 12:01:42 +0000
498+++ bus/networkmanager/networkmanager_test.go 2015-02-27 10:54:31 +0000
499@@ -90,8 +90,9 @@
500 func (s *NMSuite) TestWatchState(c *C) {
501 tc := testingbus.NewTestingEndpoint(nil, condition.Work(true), uint32(Unknown), uint32(Asleep), uint32(ConnectedGlobal))
502 nm := New(tc, s.log)
503- ch, err := nm.WatchState()
504- c.Check(err, IsNil)
505+ ch, w, err := nm.WatchState()
506+ c.Assert(err, IsNil)
507+ defer w.Cancel()
508 l := []State{<-ch, <-ch, <-ch}
509 c.Check(l, DeepEquals, []State{Unknown, Asleep, ConnectedGlobal})
510 }
511@@ -99,7 +100,7 @@
512 // WatchState returns on error if the dbus call fails
513 func (s *NMSuite) TestWatchStateFails(c *C) {
514 nm := New(testingbus.NewTestingEndpoint(nil, condition.Work(false)), s.log)
515- _, err := nm.WatchState()
516+ _, _, err := nm.WatchState()
517 c.Check(err, NotNil)
518 }
519
520@@ -107,8 +108,9 @@
521 func (s *NMSuite) TestWatchStateClosesOnWatchBail(c *C) {
522 tc := testingbus.NewTestingEndpoint(nil, condition.Work(true))
523 nm := New(tc, s.log)
524- ch, err := nm.WatchState()
525- c.Check(err, IsNil)
526+ ch, w, err := nm.WatchState()
527+ c.Assert(err, IsNil)
528+ defer w.Cancel()
529 _, ok := <-ch
530 c.Check(ok, Equals, false)
531 }
532@@ -117,8 +119,9 @@
533 func (s *NMSuite) TestWatchStateSurvivesRubbishValues(c *C) {
534 tc := testingbus.NewTestingEndpoint(nil, condition.Work(true), "a")
535 nm := New(tc, s.log)
536- ch, err := nm.WatchState()
537- c.Check(err, IsNil)
538+ ch, w, err := nm.WatchState()
539+ c.Assert(err, IsNil)
540+ defer w.Cancel()
541 _, ok := <-ch
542 c.Check(ok, Equals, false)
543 }
544@@ -164,8 +167,9 @@
545 mkPriConMap("/b/2"),
546 mkPriConMap("/c/3"))
547 nm := New(tc, s.log)
548- ch, err := nm.WatchPrimaryConnection()
549- c.Check(err, IsNil)
550+ ch, w, err := nm.WatchPrimaryConnection()
551+ c.Assert(err, IsNil)
552+ defer w.Cancel()
553 l := []string{<-ch, <-ch, <-ch}
554 c.Check(l, DeepEquals, []string{"/a/1", "/b/2", "/c/3"})
555 }
556@@ -173,7 +177,7 @@
557 // WatchPrimaryConnection returns on error if the dbus call fails
558 func (s *NMSuite) TestWatchPrimaryConnectionFails(c *C) {
559 nm := New(testingbus.NewTestingEndpoint(nil, condition.Work(false)), s.log)
560- _, err := nm.WatchPrimaryConnection()
561+ _, _, err := nm.WatchPrimaryConnection()
562 c.Check(err, NotNil)
563 }
564
565@@ -181,8 +185,9 @@
566 func (s *NMSuite) TestWatchPrimaryConnectionClosesOnWatchBail(c *C) {
567 tc := testingbus.NewTestingEndpoint(nil, condition.Work(true))
568 nm := New(tc, s.log)
569- ch, err := nm.WatchPrimaryConnection()
570- c.Check(err, IsNil)
571+ ch, w, err := nm.WatchPrimaryConnection()
572+ c.Assert(err, IsNil)
573+ defer w.Cancel()
574 _, ok := <-ch
575 c.Check(ok, Equals, false)
576 }
577@@ -191,8 +196,9 @@
578 func (s *NMSuite) TestWatchPrimaryConnectionSurvivesRubbishValues(c *C) {
579 tc := testingbus.NewTestingEndpoint(nil, condition.Work(true), "a")
580 nm := New(tc, s.log)
581- ch, err := nm.WatchPrimaryConnection()
582+ ch, w, err := nm.WatchPrimaryConnection()
583 c.Assert(err, IsNil)
584+ defer w.Cancel()
585 _, ok := <-ch
586 c.Check(ok, Equals, false)
587 }
588@@ -204,8 +210,9 @@
589 map[string]dbus.Variant{"PrimaryConnection": dbus.Variant{dbus.ObjectPath("42")}},
590 )
591 nm := New(tc, s.log)
592- ch, err := nm.WatchPrimaryConnection()
593+ ch, w, err := nm.WatchPrimaryConnection()
594 c.Assert(err, IsNil)
595+ defer w.Cancel()
596 v, ok := <-ch
597 c.Check(ok, Equals, true)
598 c.Check(v, Equals, "42")
599@@ -218,8 +225,9 @@
600 map[string]dbus.Variant{"PrimaryConnection": dbus.Variant{dbus.ObjectPath("42")}},
601 )
602 nm := New(tc, s.log)
603- ch, err := nm.WatchPrimaryConnection()
604+ ch, w, err := nm.WatchPrimaryConnection()
605 c.Assert(err, IsNil)
606+ defer w.Cancel()
607 v, ok := <-ch
608 c.Check(ok, Equals, true)
609 c.Check(v, Equals, "42")
610
611=== modified file 'bus/notifications/raw.go'
612--- bus/notifications/raw.go 2015-01-22 09:52:07 +0000
613+++ bus/notifications/raw.go 2015-02-27 10:54:31 +0000
614@@ -93,7 +93,7 @@
615 // and sends them over the channel provided
616 func (raw *RawNotifications) WatchActions() (<-chan *RawAction, error) {
617 ch := make(chan *RawAction)
618- err := raw.bus.WatchSignal("ActionInvoked",
619+ _, err := raw.bus.WatchSignal("ActionInvoked",
620 func(ns ...interface{}) {
621 if len(ns) != 2 {
622 raw.log.Debugf("ActionInvoked delivered %d things instead of 2", len(ns))
623
624=== modified file 'bus/testing/testing_endpoint.go'
625--- bus/testing/testing_endpoint.go 2015-02-24 14:10:05 +0000
626+++ bus/testing/testing_endpoint.go 2015-02-27 10:54:31 +0000
627@@ -80,11 +80,32 @@
628 return tc.(*testingEndpoint).callArgs
629 }
630
631+type watchCancel struct {
632+ done chan struct{}
633+ cancelled chan struct{}
634+ lck sync.Mutex
635+ member string
636+}
637+
638+// this waits for actual cancelllation for test convenience
639+func (wc *watchCancel) Cancel() error {
640+ wc.lck.Lock()
641+ defer wc.lck.Unlock()
642+ if wc.cancelled != nil {
643+ close(wc.cancelled)
644+ wc.cancelled = nil
645+ <-wc.done
646+ }
647+ return nil
648+}
649+
650 // See Endpoint's WatchSignal. This WatchSignal will check its condition to
651 // decide whether to return an error, or provide each of its return values
652 // or values from the previously set watchSource for member.
653-func (tc *testingEndpoint) WatchSignal(member string, f func(...interface{}), d func()) error {
654+func (tc *testingEndpoint) WatchSignal(member string, f func(...interface{}), d func()) (bus.Cancellable, error) {
655 if tc.callCond.OK() {
656+ cancelled := make(chan struct{})
657+ done := make(chan struct{})
658 go func() {
659 tc.watchLck.RLock()
660 source := tc.watchSources[member]
661@@ -96,21 +117,40 @@
662 tc.usedLck.Unlock()
663 source = make(chan []interface{})
664 go func() {
665+ Feed:
666 for _, v := range tc.retvals[idx:] {
667- source <- v
668- time.Sleep(10 * time.Millisecond)
669+ select {
670+ case source <- v:
671+ case <-cancelled:
672+ break Feed
673+ }
674+ select {
675+ case <-time.After(10 * time.Millisecond):
676+ case <-cancelled:
677+ break Feed
678+ }
679 }
680 close(source)
681 }()
682 }
683- for v := range source {
684- f(v...)
685+ Receive:
686+ for {
687+ select {
688+ case v, ok := <-source:
689+ if !ok {
690+ break Receive
691+ }
692+ f(v...)
693+ case <-cancelled:
694+ break Receive
695+ }
696 }
697 d()
698+ close(done)
699 }()
700- return nil
701+ return &watchCancel{cancelled: cancelled, done: done, member: member}, nil
702 } else {
703- return errors.New("no way")
704+ return nil, errors.New("no way")
705 }
706 }
707
708
709=== modified file 'bus/testing/testing_endpoint_test.go'
710--- bus/testing/testing_endpoint_test.go 2015-02-20 17:40:57 +0000
711+++ bus/testing/testing_endpoint_test.go 2015-02-27 10:54:31 +0000
712@@ -17,11 +17,13 @@
713 package testing
714
715 import (
716+ "testing"
717+ "time"
718+
719 . "launchpad.net/gocheck"
720+
721 "launchpad.net/ubuntu-push/bus"
722 "launchpad.net/ubuntu-push/testing/condition"
723- "testing"
724- "time"
725 )
726
727 // hook up gocheck
728@@ -100,8 +102,9 @@
729 var m, n uint32 = 42, 17
730 endp := NewTestingEndpoint(nil, condition.Work(true), m, n)
731 ch := make(chan uint32)
732- e := endp.WatchSignal("what", func(us ...interface{}) { ch <- us[0].(uint32) }, func() { close(ch) })
733- c.Check(e, IsNil)
734+ w, e := endp.WatchSignal("which", func(us ...interface{}) { ch <- us[0].(uint32) }, func() { close(ch) })
735+ c.Assert(e, IsNil)
736+ defer w.Cancel()
737 c.Check(<-ch, Equals, m)
738 c.Check(<-ch, Equals, n)
739 }
740@@ -110,8 +113,9 @@
741 func (s *TestingEndpointSuite) TestWatchDestructor(c *C) {
742 endp := NewTestingEndpoint(nil, condition.Work(true))
743 ch := make(chan uint32)
744- e := endp.WatchSignal("what", func(us ...interface{}) {}, func() { close(ch) })
745- c.Check(e, IsNil)
746+ w, e := endp.WatchSignal("what", func(us ...interface{}) {}, func() { close(ch) })
747+ c.Assert(e, IsNil)
748+ defer w.Cancel()
749 _, ok := <-ch
750 c.Check(ok, Equals, false)
751 }
752@@ -130,8 +134,9 @@
753 // Test that WatchSignal() with a negative condition returns an error.
754 func (s *TestingEndpointSuite) TestWatchFails(c *C) {
755 endp := NewTestingEndpoint(nil, condition.Work(false))
756- e := endp.WatchSignal("what", func(us ...interface{}) {}, func() {})
757+ w, e := endp.WatchSignal("what", func(us ...interface{}) {}, func() {})
758 c.Check(e, NotNil)
759+ c.Check(w, IsNil)
760 }
761
762 // Test WatchSignal can use a watchSource instead of a timeout and retvals (if
763@@ -146,8 +151,9 @@
764 endp := NewTestingEndpoint(nil, condition.Work(true), 0, 0)
765 SetWatchSource(endp, "what", watchTicker)
766 ch := make(chan int)
767- e := endp.WatchSignal("what", func(us ...interface{}) {}, func() { close(ch) })
768- c.Check(e, IsNil)
769+ w, e := endp.WatchSignal("what", func(us ...interface{}) {}, func() { close(ch) })
770+ c.Assert(e, IsNil)
771+ defer w.Cancel()
772
773 close(watchTicker)
774 // wait for the destructor to be called
775@@ -161,6 +167,19 @@
776 c.Assert(len(watchTicker), Equals, 0)
777 }
778
779+// Test that WatchSignal() calls the destructor callback when canceled.
780+func (s *TestingEndpointSuite) TestWatchCancel(c *C) {
781+ endp := NewTestingEndpoint(nil, condition.Work(true))
782+ ch := make(chan uint32)
783+ w, e := endp.WatchSignal("what", func(us ...interface{}) {}, func() { close(ch) })
784+ c.Assert(e, IsNil)
785+ defer w.Cancel()
786+ SetWatchSource(endp, "what", make(chan []interface{}))
787+ w.Cancel()
788+ _, ok := <-ch
789+ c.Check(ok, Equals, false)
790+}
791+
792 // Tests that GetProperty() works
793 func (s *TestingEndpointSuite) TestGetProperty(c *C) {
794 var m uint32 = 42
795
796=== modified file 'client/client.go'
797--- client/client.go 2015-01-26 21:00:27 +0000
798+++ client/client.go 2015-02-27 10:54:31 +0000
799@@ -280,8 +280,10 @@
800
801 // takeTheBus starts the connection(s) to D-Bus and sets up associated event channels
802 func (client *PushClient) takeTheBus() error {
803- go connectivity.ConnectedState(client.connectivityEndp,
804- client.config.ConnectivityConfig, client.log, client.connCh)
805+ fmt.Println("FOO")
806+ cs := connectivity.New(client.connectivityEndp,
807+ client.config.ConnectivityConfig, client.log)
808+ go cs.Track(client.connCh)
809 util.NewAutoRedialer(client.systemImageEndp).Redial()
810 sysimg := systemimage.New(client.systemImageEndp, client.log)
811 info, err := sysimg.Info()
812
813=== modified file 'client/client_test.go'
814--- client/client_test.go 2015-02-24 14:10:05 +0000
815+++ client/client_test.go 2015-02-27 10:54:31 +0000
816@@ -206,12 +206,7 @@
817 }
818
819 func (cs *clientSuite) TearDownTest(c *C) {
820- //fmt.Println("GOROUTINE# ", runtime.NumGoroutine())
821- /*
822- var x [16*1024]byte
823- sz := runtime.Stack(x[:], true)
824- fmt.Println(string(x[:sz]))
825- */
826+ //helpers.DumpGoroutines()
827 }
828
829 type sqlientSuite struct{ clientSuite }
830
831=== modified file 'testing/helpers.go'
832--- testing/helpers.go 2015-01-22 09:52:07 +0000
833+++ testing/helpers.go 2015-02-27 10:54:31 +0000
834@@ -170,3 +170,12 @@
835 }
836 return purl
837 }
838+
839+// DumpGoroutines dumps current goroutines.
840+func DumpGoroutines() {
841+ var buf [64 * 1024]byte
842+ sz := runtime.Stack(buf[:], true)
843+ dump := string(buf[:sz])
844+ fmt.Println(dump)
845+ fmt.Println("#goroutines#", strings.Count("\n"+dump, "\ngoroutine "))
846+}

Subscribers

People subscribed via source and target branches