Merge lp:~chipaca/ubuntu-push/no-left-over-goroutines-p1 into lp:ubuntu-push/automatic

Proposed by John Lenton
Status: Merged
Approved by: John Lenton
Approved revision: 362
Merged at revision: 361
Proposed branch: lp:~chipaca/ubuntu-push/no-left-over-goroutines-p1
Merge into: lp:ubuntu-push/automatic
Diff against target: 268 lines (+88/-14)
7 files modified
bus/testing/testing_endpoint.go (+5/-1)
client/client_test.go (+17/-1)
client/session/seenstate/seenstate.go (+6/-1)
client/session/seenstate/sqlseenstate.go (+5/-0)
client/session/seenstate/sqlseenstate_test.go (+9/-0)
client/session/session_test.go (+1/-0)
util/redialer.go (+45/-11)
To merge this branch: bzr merge lp:~chipaca/ubuntu-push/no-left-over-goroutines-p1
Reviewer Review Type Date Requested Status
Samuele Pedroni Approve
Review via email: mp+248355@code.launchpad.net

Commit message

pedronis' branch, with a test fixed.

To post a comment you must log in.
Revision history for this message
John Lenton (chipaca) wrote :

Ignore the prereq :)

Revision history for this message
Samuele Pedroni (pedronis) :
review: Approve
Revision history for this message
Ubuntu One Auto Pilot (otto-pilot) wrote :

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'bus/testing/testing_endpoint.go'
--- bus/testing/testing_endpoint.go 2014-07-04 23:00:42 +0000
+++ bus/testing/testing_endpoint.go 2015-02-03 11:03:23 +0000
@@ -89,7 +89,11 @@
89 ticker := tc.watchTicker89 ticker := tc.watchTicker
90 tc.watchLck.RUnlock()90 tc.watchLck.RUnlock()
91 if ticker != nil {91 if ticker != nil {
92 <-ticker92 _, ok := <-ticker
93 if !ok {
94 // bail out
95 return
96 }
93 } else {97 } else {
94 time.Sleep(10 * time.Millisecond)98 time.Sleep(10 * time.Millisecond)
95 }99 }
96100
=== modified file 'client/client_test.go'
--- client/client_test.go 2015-01-22 09:52:07 +0000
+++ client/client_test.go 2015-02-03 11:03:23 +0000
@@ -27,6 +27,7 @@
27 "os"27 "os"
28 "path/filepath"28 "path/filepath"
29 "reflect"29 "reflect"
30 //"runtime"
30 "testing"31 "testing"
31 "time"32 "time"
3233
@@ -203,6 +204,15 @@
203 cs.writeTestConfig(nil)204 cs.writeTestConfig(nil)
204}205}
205206
207func (cs *clientSuite) TearDownTest(c *C) {
208 //fmt.Println("GOROUTINE# ", runtime.NumGoroutine())
209 /*
210 var x [16*1024]byte
211 sz := runtime.Stack(x[:], true)
212 fmt.Println(string(x[:sz]))
213 */
214}
215
206type sqlientSuite struct{ clientSuite }216type sqlientSuite struct{ clientSuite }
207217
208func (s *sqlientSuite) SetUpSuite(c *C) {218func (s *sqlientSuite) SetUpSuite(c *C) {
@@ -651,7 +661,9 @@
651 )661 )
652 siCond := condition.Fail2Work(2)662 siCond := condition.Fail2Work(2)
653 siEndp := testibus.NewMultiValuedTestingEndpoint(siCond, condition.Work(true), []interface{}{int32(101), "mako", "daily", "Unknown", map[string]string{}})663 siEndp := testibus.NewMultiValuedTestingEndpoint(siCond, condition.Work(true), []interface{}{int32(101), "mako", "daily", "Unknown", map[string]string{}})
654 testibus.SetWatchTicker(cEndp, make(chan bool))664 tickerCh := make(chan bool)
665 testibus.SetWatchTicker(cEndp, tickerCh)
666 defer close(tickerCh)
655 // ok, create the thing667 // ok, create the thing
656 cli := NewPushClient(cs.configPath, cs.leveldbPath)668 cli := NewPushClient(cs.configPath, cs.leveldbPath)
657 cli.log = cs.log669 cli.log = cs.log
@@ -700,6 +712,7 @@
700 c.Assert(cli.initSessionAndPoller(), IsNil)712 c.Assert(cli.initSessionAndPoller(), IsNil)
701 cs.log.ResetCapture()713 cs.log.ResetCapture()
702 cli.hasConnectivity = true714 cli.hasConnectivity = true
715 defer cli.session.Close()
703 cli.handleErr(errors.New("bananas"))716 cli.handleErr(errors.New("bananas"))
704 c.Check(cs.log.Captured(), Matches, ".*session exited.*bananas\n")717 c.Check(cs.log.Captured(), Matches, ".*session exited.*bananas\n")
705}718}
@@ -712,6 +725,7 @@
712 cli := NewPushClient(cs.configPath, "")725 cli := NewPushClient(cs.configPath, "")
713 ln, err := cli.seenStateFactory()726 ln, err := cli.seenStateFactory()
714 c.Assert(err, IsNil)727 c.Assert(err, IsNil)
728 defer ln.Close()
715 c.Check(fmt.Sprintf("%T", ln), Equals, "*seenstate.memSeenState")729 c.Check(fmt.Sprintf("%T", ln), Equals, "*seenstate.memSeenState")
716}730}
717731
@@ -719,6 +733,7 @@
719 cli := NewPushClient(cs.configPath, ":memory:")733 cli := NewPushClient(cs.configPath, ":memory:")
720 ln, err := cli.seenStateFactory()734 ln, err := cli.seenStateFactory()
721 c.Assert(err, IsNil)735 c.Assert(err, IsNil)
736 defer ln.Close()
722 c.Check(fmt.Sprintf("%T", ln), Equals, "*seenstate.sqliteSeenState")737 c.Check(fmt.Sprintf("%T", ln), Equals, "*seenstate.sqliteSeenState")
723}738}
724739
@@ -733,6 +748,7 @@
733 c.Assert(cli.initSessionAndPoller(), IsNil)748 c.Assert(cli.initSessionAndPoller(), IsNil)
734749
735 c.Assert(cli.hasConnectivity, Equals, false)750 c.Assert(cli.hasConnectivity, Equals, false)
751 defer cli.session.Close()
736 cli.handleConnState(true)752 cli.handleConnState(true)
737 c.Check(cli.hasConnectivity, Equals, true)753 c.Check(cli.hasConnectivity, Equals, true)
738 c.Assert(cli.session, NotNil)754 c.Assert(cli.session, NotNil)
739755
=== modified file 'client/session/seenstate/seenstate.go'
--- client/session/seenstate/seenstate.go 2014-05-14 17:42:24 +0000
+++ client/session/seenstate/seenstate.go 2015-02-03 11:03:23 +0000
@@ -28,8 +28,10 @@
28 // GetAll() returns a "simple" map of the current levels.28 // GetAll() returns a "simple" map of the current levels.
29 GetAllLevels() (map[string]int64, error)29 GetAllLevels() (map[string]int64, error)
30 // FilterBySeen filters notifications already seen, keep track30 // FilterBySeen filters notifications already seen, keep track
31 // of them as well31 // of them as well.
32 FilterBySeen([]protocol.Notification) ([]protocol.Notification, error)32 FilterBySeen([]protocol.Notification) ([]protocol.Notification, error)
33 // Close closes state.
34 Close()
33}35}
3436
35type memSeenState struct {37type memSeenState struct {
@@ -58,6 +60,9 @@
58 return acc, nil60 return acc, nil
59}61}
6062
63func (m *memSeenState) Close() {
64}
65
61var _ SeenState = (*memSeenState)(nil)66var _ SeenState = (*memSeenState)(nil)
6267
63// NewSeenState returns an implementation of SeenState that is memory-based and68// NewSeenState returns an implementation of SeenState that is memory-based and
6469
=== modified file 'client/session/seenstate/sqlseenstate.go'
--- client/session/seenstate/sqlseenstate.go 2014-05-14 17:42:24 +0000
+++ client/session/seenstate/sqlseenstate.go 2015-02-03 11:03:23 +0000
@@ -48,6 +48,11 @@
48 return &sqliteSeenState{db}, nil48 return &sqliteSeenState{db}, nil
49}49}
5050
51// Close closes the underlying db.
52func (ps *sqliteSeenState) Close() {
53 ps.db.Close()
54}
55
51func (ps *sqliteSeenState) SetLevel(level string, top int64) error {56func (ps *sqliteSeenState) SetLevel(level string, top int64) error {
52 _, err := ps.db.Exec("REPLACE INTO level_map (level, top) VALUES (?, ?)", level, top)57 _, err := ps.db.Exec("REPLACE INTO level_map (level, top) VALUES (?, ?)", level, top)
53 if err != nil {58 if err != nil {
5459
=== modified file 'client/session/seenstate/sqlseenstate_test.go'
--- client/session/seenstate/sqlseenstate_test.go 2014-05-14 17:42:24 +0000
+++ client/session/seenstate/sqlseenstate_test.go 2015-02-03 11:03:23 +0000
@@ -112,6 +112,15 @@
112 c.Check(err, ErrorMatches, "cannot insert .*")112 c.Check(err, ErrorMatches, "cannot insert .*")
113}113}
114114
115func (s *sqlsSuite) TestClose(c *C) {
116 dir := c.MkDir()
117 filename := dir + "test.db"
118 sqls, err := NewSqliteSeenState(filename)
119 c.Check(err, IsNil)
120 c.Assert(sqls, NotNil)
121 sqls.Close()
122}
123
115func (s *sqlsSuite) TestDropPrevThan(c *C) {124func (s *sqlsSuite) TestDropPrevThan(c *C) {
116 dir := c.MkDir()125 dir := c.MkDir()
117 filename := dir + "test.db"126 filename := dir + "test.db"
118127
=== modified file 'client/session/session_test.go'
--- client/session/session_test.go 2014-11-25 17:36:27 +0000
+++ client/session/session_test.go 2015-02-03 11:03:23 +0000
@@ -162,6 +162,7 @@
162162
163func (*brokenSeenState) SetLevel(string, int64) error { return errors.New("broken.") }163func (*brokenSeenState) SetLevel(string, int64) error { return errors.New("broken.") }
164func (*brokenSeenState) GetAllLevels() (map[string]int64, error) { return nil, errors.New("broken.") }164func (*brokenSeenState) GetAllLevels() (map[string]int64, error) { return nil, errors.New("broken.") }
165func (*brokenSeenState) Close() {}
165func (*brokenSeenState) FilterBySeen([]protocol.Notification) ([]protocol.Notification, error) {166func (*brokenSeenState) FilterBySeen([]protocol.Notification) ([]protocol.Notification, error) {
166 return nil, errors.New("broken.")167 return nil, errors.New("broken.")
167}168}
168169
=== modified file 'util/redialer.go'
--- util/redialer.go 2015-01-21 17:55:35 +0000
+++ util/redialer.go 2015-02-03 11:03:23 +0000
@@ -19,7 +19,6 @@
1919
20import (20import (
21 "sync"21 "sync"
22 "sync/atomic"
23 "time"22 "time"
24)23)
2524
@@ -79,26 +78,53 @@
79}78}
8079
81type autoRedialer struct {80type autoRedialer struct {
82 stateP *redialerState81 stateLock sync.RWMutex
83 dial func() error82 stateValue redialerState
84 jitter func(time.Duration) time.Duration83 stopping chan struct{}
84 reallyStopped chan struct{}
85 dial func() error
86 jitter func(time.Duration) time.Duration
85}87}
8688
87func (ar *autoRedialer) state() redialerState {89func (ar *autoRedialer) state() redialerState {
88 return redialerState(atomic.LoadUint32((*uint32)(ar.stateP)))90 ar.stateLock.RLock()
91 defer ar.stateLock.RUnlock()
92 return ar.stateValue
89}93}
9094
91func (ar *autoRedialer) setState(s redialerState) {95func (ar *autoRedialer) setState(s redialerState) {
92 atomic.StoreUint32((*uint32)(ar.stateP), uint32(s))96 ar.stateLock.Lock()
97 defer ar.stateLock.Unlock()
98 ar.stateValue = s
93}99}
94100
95func (ar *autoRedialer) setStateIfEqual(oldState, newState redialerState) bool {101func (ar *autoRedialer) setStateIfEqual(oldState, newState redialerState) bool {
96 return atomic.CompareAndSwapUint32((*uint32)(ar.stateP), uint32(oldState), uint32(newState))102 ar.stateLock.Lock()
103 defer ar.stateLock.Unlock()
104 if ar.stateValue != oldState {
105 return false
106 }
107 ar.stateValue = newState
108 return true
109}
110
111func (ar *autoRedialer) setStateStopped() {
112 ar.stateLock.Lock()
113 defer ar.stateLock.Unlock()
114 switch ar.stateValue {
115 case Stopped:
116 return
117 case Unconfigured:
118 close(ar.reallyStopped)
119 }
120 ar.stateValue = Stopped
121 close(ar.stopping)
97}122}
98123
99func (ar *autoRedialer) Stop() {124func (ar *autoRedialer) Stop() {
100 if ar != nil {125 if ar != nil {
101 ar.setState(Stopped)126 ar.setStateStopped()
127 <-ar.reallyStopped
102 }128 }
103}129}
104130
@@ -113,6 +139,7 @@
113 // XXX log this139 // XXX log this
114 return 0140 return 0
115 }141 }
142 defer close(ar.reallyStopped)
116143
117 var timeout time.Duration144 var timeout time.Duration
118 var dialAttempts uint32 = 0 // unsigned so it can wrap safely ...145 var dialAttempts uint32 = 0 // unsigned so it can wrap safely ...
@@ -134,15 +161,22 @@
134 timeout += ar.jitter(timeout)161 timeout += ar.jitter(timeout)
135 }162 }
136 dialAttempts++163 dialAttempts++
137 time.Sleep(timeout)164 select {
165 case <-ar.stopping:
166 case <-time.After(timeout):
167 }
138 }168 }
139}169}
140170
141// Returns a stoppable AutoRedialer using the provided Dialer. If the Dialer171// Returns a stoppable AutoRedialer using the provided Dialer. If the Dialer
142// is also a Jitterer, the backoff will be jittered.172// is also a Jitterer, the backoff will be jittered.
143func NewAutoRedialer(dialer Dialer) AutoRedialer {173func NewAutoRedialer(dialer Dialer) AutoRedialer {
144 state := Unconfigured174 ar := &autoRedialer{
145 ar := &autoRedialer{stateP: &state, dial: dialer.Dial}175 stateValue: Unconfigured,
176 dial: dialer.Dial,
177 reallyStopped: make(chan struct{}),
178 stopping: make(chan struct{}),
179 }
146 jitterer, ok := dialer.(Jitterer)180 jitterer, ok := dialer.(Jitterer)
147 if ok {181 if ok {
148 ar.jitter = jitterer.Jitter182 ar.jitter = jitterer.Jitter

Subscribers

People subscribed via source and target branches