Merge lp:~chipaca/ubuntu-push/no-left-over-goroutines-p1 into lp:ubuntu-push/automatic

Proposed by John Lenton
Status: Merged
Approved by: John Lenton
Approved revision: 362
Merged at revision: 361
Proposed branch: lp:~chipaca/ubuntu-push/no-left-over-goroutines-p1
Merge into: lp:ubuntu-push/automatic
Diff against target: 268 lines (+88/-14)
7 files modified
bus/testing/testing_endpoint.go (+5/-1)
client/client_test.go (+17/-1)
client/session/seenstate/seenstate.go (+6/-1)
client/session/seenstate/sqlseenstate.go (+5/-0)
client/session/seenstate/sqlseenstate_test.go (+9/-0)
client/session/session_test.go (+1/-0)
util/redialer.go (+45/-11)
To merge this branch: bzr merge lp:~chipaca/ubuntu-push/no-left-over-goroutines-p1
Reviewer Review Type Date Requested Status
Samuele Pedroni Approve
Review via email: mp+248355@code.launchpad.net

Commit message

pedronis' branch, with a test fixed.

To post a comment you must log in.
Revision history for this message
John Lenton (chipaca) wrote :

Ignore the prereq :)

Revision history for this message
Samuele Pedroni (pedronis) :
review: Approve
Revision history for this message
Ubuntu One Auto Pilot (otto-pilot) wrote :

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'bus/testing/testing_endpoint.go'
2--- bus/testing/testing_endpoint.go 2014-07-04 23:00:42 +0000
3+++ bus/testing/testing_endpoint.go 2015-02-03 11:03:23 +0000
4@@ -89,7 +89,11 @@
5 ticker := tc.watchTicker
6 tc.watchLck.RUnlock()
7 if ticker != nil {
8- <-ticker
9+ _, ok := <-ticker
10+ if !ok {
11+ // bail out
12+ return
13+ }
14 } else {
15 time.Sleep(10 * time.Millisecond)
16 }
17
18=== modified file 'client/client_test.go'
19--- client/client_test.go 2015-01-22 09:52:07 +0000
20+++ client/client_test.go 2015-02-03 11:03:23 +0000
21@@ -27,6 +27,7 @@
22 "os"
23 "path/filepath"
24 "reflect"
25+ //"runtime"
26 "testing"
27 "time"
28
29@@ -203,6 +204,15 @@
30 cs.writeTestConfig(nil)
31 }
32
33+func (cs *clientSuite) TearDownTest(c *C) {
34+ //fmt.Println("GOROUTINE# ", runtime.NumGoroutine())
35+ /*
36+ var x [16*1024]byte
37+ sz := runtime.Stack(x[:], true)
38+ fmt.Println(string(x[:sz]))
39+ */
40+}
41+
42 type sqlientSuite struct{ clientSuite }
43
44 func (s *sqlientSuite) SetUpSuite(c *C) {
45@@ -651,7 +661,9 @@
46 )
47 siCond := condition.Fail2Work(2)
48 siEndp := testibus.NewMultiValuedTestingEndpoint(siCond, condition.Work(true), []interface{}{int32(101), "mako", "daily", "Unknown", map[string]string{}})
49- testibus.SetWatchTicker(cEndp, make(chan bool))
50+ tickerCh := make(chan bool)
51+ testibus.SetWatchTicker(cEndp, tickerCh)
52+ defer close(tickerCh)
53 // ok, create the thing
54 cli := NewPushClient(cs.configPath, cs.leveldbPath)
55 cli.log = cs.log
56@@ -700,6 +712,7 @@
57 c.Assert(cli.initSessionAndPoller(), IsNil)
58 cs.log.ResetCapture()
59 cli.hasConnectivity = true
60+ defer cli.session.Close()
61 cli.handleErr(errors.New("bananas"))
62 c.Check(cs.log.Captured(), Matches, ".*session exited.*bananas\n")
63 }
64@@ -712,6 +725,7 @@
65 cli := NewPushClient(cs.configPath, "")
66 ln, err := cli.seenStateFactory()
67 c.Assert(err, IsNil)
68+ defer ln.Close()
69 c.Check(fmt.Sprintf("%T", ln), Equals, "*seenstate.memSeenState")
70 }
71
72@@ -719,6 +733,7 @@
73 cli := NewPushClient(cs.configPath, ":memory:")
74 ln, err := cli.seenStateFactory()
75 c.Assert(err, IsNil)
76+ defer ln.Close()
77 c.Check(fmt.Sprintf("%T", ln), Equals, "*seenstate.sqliteSeenState")
78 }
79
80@@ -733,6 +748,7 @@
81 c.Assert(cli.initSessionAndPoller(), IsNil)
82
83 c.Assert(cli.hasConnectivity, Equals, false)
84+ defer cli.session.Close()
85 cli.handleConnState(true)
86 c.Check(cli.hasConnectivity, Equals, true)
87 c.Assert(cli.session, NotNil)
88
89=== modified file 'client/session/seenstate/seenstate.go'
90--- client/session/seenstate/seenstate.go 2014-05-14 17:42:24 +0000
91+++ client/session/seenstate/seenstate.go 2015-02-03 11:03:23 +0000
92@@ -28,8 +28,10 @@
93 // GetAll() returns a "simple" map of the current levels.
94 GetAllLevels() (map[string]int64, error)
95 // FilterBySeen filters notifications already seen, keep track
96- // of them as well
97+ // of them as well.
98 FilterBySeen([]protocol.Notification) ([]protocol.Notification, error)
99+ // Close closes state.
100+ Close()
101 }
102
103 type memSeenState struct {
104@@ -58,6 +60,9 @@
105 return acc, nil
106 }
107
108+func (m *memSeenState) Close() {
109+}
110+
111 var _ SeenState = (*memSeenState)(nil)
112
113 // NewSeenState returns an implementation of SeenState that is memory-based and
114
115=== modified file 'client/session/seenstate/sqlseenstate.go'
116--- client/session/seenstate/sqlseenstate.go 2014-05-14 17:42:24 +0000
117+++ client/session/seenstate/sqlseenstate.go 2015-02-03 11:03:23 +0000
118@@ -48,6 +48,11 @@
119 return &sqliteSeenState{db}, nil
120 }
121
122+// Closes closes the underlying db.
123+func (ps *sqliteSeenState) Close() {
124+ ps.db.Close()
125+}
126+
127 func (ps *sqliteSeenState) SetLevel(level string, top int64) error {
128 _, err := ps.db.Exec("REPLACE INTO level_map (level, top) VALUES (?, ?)", level, top)
129 if err != nil {
130
131=== modified file 'client/session/seenstate/sqlseenstate_test.go'
132--- client/session/seenstate/sqlseenstate_test.go 2014-05-14 17:42:24 +0000
133+++ client/session/seenstate/sqlseenstate_test.go 2015-02-03 11:03:23 +0000
134@@ -112,6 +112,15 @@
135 c.Check(err, ErrorMatches, "cannot insert .*")
136 }
137
138+func (s *sqlsSuite) TestClose(c *C) {
139+ dir := c.MkDir()
140+ filename := dir + "test.db"
141+ sqls, err := NewSqliteSeenState(filename)
142+ c.Check(err, IsNil)
143+ c.Assert(sqls, NotNil)
144+ sqls.Close()
145+}
146+
147 func (s *sqlsSuite) TestDropPrevThan(c *C) {
148 dir := c.MkDir()
149 filename := dir + "test.db"
150
151=== modified file 'client/session/session_test.go'
152--- client/session/session_test.go 2014-11-25 17:36:27 +0000
153+++ client/session/session_test.go 2015-02-03 11:03:23 +0000
154@@ -162,6 +162,7 @@
155
156 func (*brokenSeenState) SetLevel(string, int64) error { return errors.New("broken.") }
157 func (*brokenSeenState) GetAllLevels() (map[string]int64, error) { return nil, errors.New("broken.") }
158+func (*brokenSeenState) Close() {}
159 func (*brokenSeenState) FilterBySeen([]protocol.Notification) ([]protocol.Notification, error) {
160 return nil, errors.New("broken.")
161 }
162
163=== modified file 'util/redialer.go'
164--- util/redialer.go 2015-01-21 17:55:35 +0000
165+++ util/redialer.go 2015-02-03 11:03:23 +0000
166@@ -19,7 +19,6 @@
167
168 import (
169 "sync"
170- "sync/atomic"
171 "time"
172 )
173
174@@ -79,26 +78,53 @@
175 }
176
177 type autoRedialer struct {
178- stateP *redialerState
179- dial func() error
180- jitter func(time.Duration) time.Duration
181+ stateLock sync.RWMutex
182+ stateValue redialerState
183+ stopping chan struct{}
184+ reallyStopped chan struct{}
185+ dial func() error
186+ jitter func(time.Duration) time.Duration
187 }
188
189 func (ar *autoRedialer) state() redialerState {
190- return redialerState(atomic.LoadUint32((*uint32)(ar.stateP)))
191+ ar.stateLock.RLock()
192+ defer ar.stateLock.RUnlock()
193+ return ar.stateValue
194 }
195
196 func (ar *autoRedialer) setState(s redialerState) {
197- atomic.StoreUint32((*uint32)(ar.stateP), uint32(s))
198+ ar.stateLock.Lock()
199+ defer ar.stateLock.Unlock()
200+ ar.stateValue = s
201 }
202
203 func (ar *autoRedialer) setStateIfEqual(oldState, newState redialerState) bool {
204- return atomic.CompareAndSwapUint32((*uint32)(ar.stateP), uint32(oldState), uint32(newState))
205+ ar.stateLock.Lock()
206+ defer ar.stateLock.Unlock()
207+ if ar.stateValue != oldState {
208+ return false
209+ }
210+ ar.stateValue = newState
211+ return true
212+}
213+
214+func (ar *autoRedialer) setStateStopped() {
215+ ar.stateLock.Lock()
216+ defer ar.stateLock.Unlock()
217+ switch ar.stateValue {
218+ case Stopped:
219+ return
220+ case Unconfigured:
221+ close(ar.reallyStopped)
222+ }
223+ ar.stateValue = Stopped
224+ close(ar.stopping)
225 }
226
227 func (ar *autoRedialer) Stop() {
228 if ar != nil {
229- ar.setState(Stopped)
230+ ar.setStateStopped()
231+ <-ar.reallyStopped
232 }
233 }
234
235@@ -113,6 +139,7 @@
236 // XXX log this
237 return 0
238 }
239+ defer close(ar.reallyStopped)
240
241 var timeout time.Duration
242 var dialAttempts uint32 = 0 // unsigned so it can wrap safely ...
243@@ -134,15 +161,22 @@
244 timeout += ar.jitter(timeout)
245 }
246 dialAttempts++
247- time.Sleep(timeout)
248+ select {
249+ case <-ar.stopping:
250+ case <-time.After(timeout):
251+ }
252 }
253 }
254
255 // Returns a stoppable AutoRedialer using the provided Dialer. If the Dialer
256 // is also a Jitterer, the backoff will be jittered.
257 func NewAutoRedialer(dialer Dialer) AutoRedialer {
258- state := Unconfigured
259- ar := &autoRedialer{stateP: &state, dial: dialer.Dial}
260+ ar := &autoRedialer{
261+ stateValue: Unconfigured,
262+ dial: dialer.Dial,
263+ reallyStopped: make(chan struct{}),
264+ stopping: make(chan struct{}),
265+ }
266 jitterer, ok := dialer.(Jitterer)
267 if ok {
268 ar.jitter = jitterer.Jitter

Subscribers

People subscribed via source and target branches