Merge lp:~chipaca/ubuntu-push/client-session-getting-there into lp:ubuntu-push
Status: | Merged |
---|---|
Approved by: | John Lenton |
Approved revision: | no longer in the source branch. |
Merged at revision: | 42 |
Proposed branch: | lp:~chipaca/ubuntu-push/client-session-getting-there |
Merge into: | lp:ubuntu-push |
Diff against target: | 282 lines (+198/-3), 3 files modified: client/session/session.go (+28/-0), client/session/session_test.go (+158/-2), testing/condition/condition.go (+12/-1) |
To merge this branch: | bzr merge lp:~chipaca/ubuntu-push/client-session-getting-there |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Samuele Pedroni | Approve | ||
Review via email: mp+204365@code.launchpad.net |
This proposal supersedes a proposal from 2014-01-30.
Commit message
Ladies and gentlemen, the client session.
Description of the change
Refactored the whole dance.
Samuele Pedroni (pedronis) wrote : Posted in a previous version of this proposal | # |
98 +func (cs *clientSessionS
it's hard to tell what this is testing concretely? it's not clear it's checking the various bits of channel resetting, for example
maybe run/Run should set a flag on the session or something to use here?
Samuele Pedroni (pedronis) wrote : Posted in a previous version of this proposal | # |
Run and Reset need descriptions
Samuele Pedroni (pedronis) wrote : Posted in a previous version of this proposal | # |
vaguer comments:
bit unsure about the asymmetry that the happy start path has you call Dial and Run,
and Reset does both for you,
also not sure about Run doing "go ..." for you
Samuele Pedroni (pedronis) wrote : | # |
40 + sess.Close()
that one needs testing, calling Dial twice is a bit annoying though, maybe easier if it lives in run() ?
minor:
8 + sess.ErrCh = make(chan error, 1)
9 + sess.MsgCh = make(chan *Notification)
this could be done in run() as well, run is the only place that uses sess.ErrCh
38 + panic("can't Reset() without a protocol constructor.")
should say Dial
114 + func() error { return failure })
could use a channel there given that loop is run in a goroutine
John Lenton (chipaca) wrote : | # |
Oops, yes, calling close was untested (no need to call Dial twice). Fixed that.
Also moved it into run (as a parameter) and tested it there as well (what's the convention to avoid shadowing builtins? gone with close_ for now).
Moved channel creation, good point.
Fixed the panic message.
Added a second channel to the loop test function; note "return failure" goes into the error channel, so it was already doing one. Made the second one more explicit though.
Samuele Pedroni (pedronis) wrote : | # |
> Also moved it into run (as a parameter) and tested it there as well (what's
> the convention to avoid shadowing builtins? gone with close_ for now).
yes the single namespace (for types, modules and variables and builtins) plus the convention for protected/public makes for clunky clashes sometimes, I haven't seen deeper recommendations than to inflict the most clunkiness on variable naming; doClose|closeFunc though perhaps
> Added a second channel to the loop test function; note "return failure" goes
> into the error channel, so it was already doing one. Made the second one more
> explicit though.
I was meaning this, to check that run() is running in a goroutine; given that the MsgCh is also unbuffered it doesn't matter for now (but this is more independent of the details of ErrCh/MsgCh)
--- client/
+++ client/
@@ -584,18 +584,20 @@
// biggie if this stops being true)
c.Check(
c.Check(
- failure := errors.
+ failureCh := make(chan error)
notf := &Notification{}
err = sess.run(
func() {},
func() error { return nil },
func() error { return nil },
- func() error { sess.MsgCh <- notf; return failure })
+ func() error { sess.MsgCh <- notf; return <-failureCh })
c.Check(err, Equals, nil)
// if run doesn't error it sets up the channels
c.Assert(
c.Assert(
c.Check(
+ failure := errors.
+ failureCh <- failure
c.Check(
}
Samuele Pedroni (pedronis) wrote : | # |
there's a data race in TestDialWorks:
https:/
I fear the stuff in condition may need Mutexes or be channel based more
Samuele Pedroni (pedronis) wrote : | # |
74 + ch := make(chan bool, 1)
75 + err = sess.run(
76 + func() { ch <- true },
close isn't called in a goroutine so a flag would work there as well
John Lenton (chipaca) wrote : | # |
> > Also moved it into run (as a parameter) and tested it there as well (what's
> > the convention to avoid shadowing builtins? gone with close_ for now).
>
> yes the single namespace (for types, modules and variables and builtins) plus
> the convention for protected/public makes for clunky clashes sometimes, I
> haven't seen deeper recommendations than to inflict the most clunkiness on
> variable naming; doClose|closeFunc though perhaps
went with using the noun forms.
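To illustrate the naming point discussed above, here is a minimal sketch of why a callback parameter is better not called close. The names shutdown and done are hypothetical and not part of the branch; the sketch only assumes standard Go scoping rules.

```go
package main

import "fmt"

// shutdown takes a callback named closer rather than close. Had the
// parameter been called close, the builtin close() would be shadowed
// inside this function and the call on done would not compile.
func shutdown(closer func(), done chan struct{}) {
	closer()    // the injected callback
	close(done) // the builtin, still reachable
}

func main() {
	closed := false
	done := make(chan struct{})
	shutdown(func() { closed = true }, done)
	<-done // returns immediately because done has been closed
	fmt.Println(closed)
}
```

Using a noun form (closer, connecter, starter, looper) keeps the builtin available and reads naturally at the call site.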
> I was meaning this, to check that run() is running in a goroutine, given that
> the MsgCh is also unbuffered it doesn't matter for now (but this is more
> independent of the details of ErrCh/MsgCh)
now I got it. And stole your patch :)
John Lenton (chipaca) wrote : | # |
> there's a data race in TestDialWorks:
>
> https:/
>
> I fear the stuff in condition may need Mutexes or be channel based more
Yup. Done.
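A minimal sketch of the mutex approach, assuming a cut-down condition type: countdown and its Left method are hypothetical names used for illustration, not the types in testing/condition.

```go
package main

import (
	"fmt"
	"sync"
)

// countdown is a hypothetical cut-down version of a fail-N-times
// condition: OK reports false for the first n calls, then true.
type countdown struct {
	lock sync.Mutex
	left int
}

// OK mutates left, so it holds the lock for the whole check-and-decrement.
func (c *countdown) OK() bool {
	c.lock.Lock()
	defer c.lock.Unlock()
	if c.left > 0 {
		c.left--
		return false
	}
	return true
}

// Left reads under the same lock, so it never races with OK.
func (c *countdown) Left() int {
	c.lock.Lock()
	defer c.lock.Unlock()
	return c.left
}

func main() {
	c := &countdown{left: 10}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.OK()
		}()
	}
	wg.Wait()
	fmt.Println(c.Left()) // 10 minus the 4 calls above
}
```

Running a test built this way with the race detector enabled (go test -race) is one way to confirm the unsynchronized access is gone.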
John Lenton (chipaca) wrote : | # |
> 74 + ch := make(chan bool, 1)
> 75 + err = sess.run(
> 76 + func() { ch <- true },
>
> close isn't called in a goroutine so a flag would work there as well
and the code is more legible as a result. Done.
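As a rough sketch of why a flag is enough here: when the callback is invoked on the caller's own goroutine, a plain bool can be read right after the call without any synchronization. runSync is a hypothetical stand-in for the synchronous part of run().

```go
package main

import "fmt"

// runSync is a hypothetical stand-in for the part of run() that calls
// the closer synchronously, before anything else happens.
func runSync(closer func(), connecter func() error) error {
	closer()
	return connecter()
}

func main() {
	hasClosed := false
	err := runSync(
		func() { hasClosed = true },
		func() error { return nil })
	// closer ran on this goroutine, so reading the flag here is race-free.
	fmt.Println(hasClosed, err)
}
```

A channel is only needed when the callback may run on a different goroutine from the one doing the assertion.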
Samuele Pedroni (pedronis) wrote : | # |
thanks for bearing with me
looks good !
- 42. By John Lenton: [r=pedronis] Ladies and gentlemen, the client session.
Preview Diff
1 | === modified file 'client/session/session.go' |
2 | --- client/session/session.go 2014-01-31 21:47:15 +0000 |
3 | +++ client/session/session.go 2014-02-01 21:04:46 +0000 |
4 | @@ -196,3 +196,31 @@ |
5 | sess.Log.Debugf("Connected %v.", conn.LocalAddr()) |
6 | return nil |
7 | } |
8 | + |
9 | +// run calls connect, and if it works it calls start, and if it works |
10 | +// it runs loop in a goroutine, and ships its return value over ErrCh. |
11 | +func (sess *ClientSession) run(closer func(), connecter, starter, looper func() error) error { |
12 | + closer() |
13 | + err := connecter() |
14 | + if err == nil { |
15 | + err = starter() |
16 | + if err == nil { |
17 | + sess.ErrCh = make(chan error, 1) |
18 | + sess.MsgCh = make(chan *Notification) |
19 | + go func() { sess.ErrCh <- looper() }() |
20 | + } |
21 | + } |
22 | + return err |
23 | +} |
24 | + |
25 | +// Dial takes the session from newly created (or newly disconnected) |
26 | +// to running the main loop. |
27 | +func (sess *ClientSession) Dial() error { |
28 | + if sess.Protocolator == nil { |
29 | + // a missing protocolator means you've willfully overridden |
30 | + // it; returning an error here would prompt AutoRedial to just |
31 | + // keep on trying. |
32 | + panic("can't Dial() without a protocol constructor.") |
33 | + } |
34 | + return sess.run(sess.Close, sess.connect, sess.start, sess.loop) |
35 | +} |
36 | |
37 | === modified file 'client/session/session_test.go' |
38 | --- client/session/session_test.go 2014-01-31 21:47:15 +0000 |
39 | +++ client/session/session_test.go 2014-02-01 21:04:46 +0000 |
40 | @@ -17,6 +17,7 @@ |
41 | package session |
42 | |
43 | import ( |
44 | + "crypto/tls" |
45 | "encoding/json" |
46 | "errors" |
47 | "fmt" |
48 | @@ -39,7 +40,8 @@ |
49 | type clientSessionSuite struct{} |
50 | |
51 | var nullog = logger.NewSimpleLogger(ioutil.Discard, "error") |
52 | -var debuglog = logger.NewSimpleLogger(os.Stderr, "debug") |
53 | +var noisylog = logger.NewSimpleLogger(os.Stderr, "debug") |
54 | +var debuglog = nullog |
55 | var _ = Suite(&clientSessionSuite{}) |
56 | |
57 | // |
58 | @@ -542,5 +544,159 @@ |
59 | } |
60 | // start is now done. |
61 | err = <-errCh |
62 | - c.Assert(err, IsNil) |
63 | + c.Check(err, IsNil) |
64 | +} |
65 | + |
66 | +/**************************************************************** |
67 | + run() tests |
68 | +****************************************************************/ |
69 | + |
70 | +func (cs *clientSessionSuite) TestRunBailsIfConnectFails(c *C) { |
71 | + sess, err := NewSession("", nil, 0, "wah", debuglog) |
72 | + c.Assert(err, IsNil) |
73 | + failure := errors.New("TestRunBailsIfConnectFails") |
74 | + has_closed := false |
75 | + err = sess.run( |
76 | + func() { has_closed = true }, |
77 | + func() error { return failure }, |
78 | + nil, |
79 | + nil) |
80 | + c.Check(err, Equals, failure) |
81 | + c.Check(has_closed, Equals, true) |
82 | +} |
83 | + |
84 | +func (cs *clientSessionSuite) TestRunBailsIfStartFails(c *C) { |
85 | + sess, err := NewSession("", nil, 0, "wah", debuglog) |
86 | + c.Assert(err, IsNil) |
87 | + failure := errors.New("TestRunBailsIfStartFails") |
88 | + err = sess.run( |
89 | + func() {}, |
90 | + func() error { return nil }, |
91 | + func() error { return failure }, |
92 | + nil) |
93 | + c.Check(err, Equals, failure) |
94 | +} |
95 | + |
96 | +func (cs *clientSessionSuite) TestRunRunsEvenIfLoopFails(c *C) { |
97 | + sess, err := NewSession("", nil, 0, "wah", debuglog) |
98 | + c.Assert(err, IsNil) |
99 | + // just to make a point: until here we haven't set ErrCh & MsgCh (no |
100 | + // biggie if this stops being true) |
101 | + c.Check(sess.ErrCh, IsNil) |
102 | + c.Check(sess.MsgCh, IsNil) |
103 | + failureCh := make(chan error) // must be unbuffered |
104 | + notf := &Notification{} |
105 | + err = sess.run( |
106 | + func() {}, |
107 | + func() error { return nil }, |
108 | + func() error { return nil }, |
109 | + func() error { sess.MsgCh <- notf; return <-failureCh }) |
110 | + c.Check(err, Equals, nil) |
111 | + // if run doesn't error it sets up the channels |
112 | + c.Assert(sess.ErrCh, NotNil) |
113 | + c.Assert(sess.MsgCh, NotNil) |
114 | + c.Check(<-sess.MsgCh, Equals, notf) |
115 | + failure := errors.New("TestRunRunsEvenIfLoopFails") |
116 | + failureCh <- failure |
117 | + c.Check(<-sess.ErrCh, Equals, failure) |
118 | + // so now you know it was running in a goroutine :) |
119 | +} |
120 | + |
121 | +/**************************************************************** |
122 | + Dial() tests |
123 | +****************************************************************/ |
124 | + |
125 | +func (cs *clientSessionSuite) TestDialPanics(c *C) { |
126 | + // one last unhappy test |
127 | + sess, err := NewSession("", nil, 0, "wah", debuglog) |
128 | + c.Assert(err, IsNil) |
129 | + sess.Protocolator = nil |
130 | + c.Check(sess.Dial, PanicMatches, ".*protocol constructor.") |
131 | +} |
132 | + |
133 | +func (cs *clientSessionSuite) TestDialWorks(c *C) { |
134 | + // happy path thoughts |
135 | + cert, err := tls.X509KeyPair(helpers.TestCertPEMBlock, helpers.TestKeyPEMBlock) |
136 | + c.Assert(err, IsNil) |
137 | + tlsCfg := &tls.Config{ |
138 | + Certificates: []tls.Certificate{cert}, |
139 | + SessionTicketsDisabled: true, |
140 | + } |
141 | + |
142 | + timeout := 100 * time.Millisecond |
143 | + lst, err := tls.Listen("tcp", "localhost:0", tlsCfg) |
144 | + c.Assert(err, IsNil) |
145 | + sess, err := NewSession(lst.Addr().String(), nil, timeout, "wah", debuglog) |
146 | + c.Assert(err, IsNil) |
147 | + tconn := &testConn{CloseCondition: condition.Fail2Work(10)} |
148 | + sess.Connection = tconn |
149 | + // just to be sure: |
150 | + c.Check(tconn.CloseCondition.String(), Matches, ".* 10 to go.") |
151 | + |
152 | + upCh := make(chan interface{}, 5) |
153 | + downCh := make(chan interface{}, 5) |
154 | + proto := &testProtocol{up: upCh, down: downCh} |
155 | + sess.Protocolator = func(net.Conn) protocol.Protocol { return proto } |
156 | + |
157 | + go sess.Dial() |
158 | + |
159 | + srv, err := lst.Accept() |
160 | + c.Assert(err, IsNil) |
161 | + |
162 | + // connect done |
163 | + |
164 | + // Dial should have had the session's old connection (tconn) closed |
165 | + // before connecting a new one; if that was done, tconn's condition |
166 | + // ticked forward: |
167 | + c.Check(tconn.CloseCondition.String(), Matches, ".* 9 to go.") |
168 | + |
169 | + // now, start: 1. protocol version |
170 | + v, err := protocol.ReadWireFormatVersion(srv, timeout) |
171 | + c.Assert(err, IsNil) |
172 | + c.Assert(v, Equals, protocol.ProtocolWireVersion) |
173 | + |
174 | + // 2. "connect" (but on the fake protocol above! woo) |
175 | + |
176 | + c.Check(takeNext(downCh), Equals, "deadline 100ms") |
177 | + _, ok := takeNext(downCh).(protocol.ConnectMsg) |
178 | + c.Check(ok, Equals, true) |
179 | + upCh <- nil // no error |
180 | + upCh <- protocol.ConnAckMsg{ |
181 | + Type: "connack", |
182 | + Params: protocol.ConnAckParams{(10 * time.Millisecond).String()}, |
183 | + } |
184 | + // start is now done. |
185 | + |
186 | + // 3. "loop" |
187 | + |
188 | + // ping works, |
189 | + c.Check(takeNext(downCh), Equals, "deadline 110ms") |
190 | + upCh <- protocol.PingPongMsg{Type: "ping"} |
191 | + c.Check(takeNext(downCh), Equals, protocol.PingPongMsg{Type: "pong"}) |
192 | + upCh <- nil |
193 | + |
194 | + // and broadcasts... |
195 | + b := &protocol.BroadcastMsg{ |
196 | + Type: "broadcast", |
197 | + AppId: "--ignored--", |
198 | + ChanId: "0", |
199 | + TopLevel: 2, |
200 | + Payloads: []json.RawMessage{json.RawMessage(`{"b":1}`)}, |
201 | + } |
202 | + c.Check(takeNext(downCh), Equals, "deadline 110ms") |
203 | + upCh <- b |
204 | + c.Check(takeNext(downCh), Equals, protocol.AckMsg{"ack"}) |
205 | + upCh <- nil |
206 | + // ...get bubbled up, |
207 | + c.Check(<-sess.MsgCh, NotNil) |
208 | + // and their TopLevel remembered |
209 | + c.Check(sess.Levels.GetAll(), DeepEquals, map[string]int64{"0": 2}) |
210 | + |
211 | + // and ping still work even after that. |
212 | + c.Check(takeNext(downCh), Equals, "deadline 110ms") |
213 | + upCh <- protocol.PingPongMsg{Type: "ping"} |
214 | + c.Check(takeNext(downCh), Equals, protocol.PingPongMsg{Type: "pong"}) |
215 | + failure := errors.New("pongs") |
216 | + upCh <- failure |
217 | + c.Check(<-sess.ErrCh, Equals, failure) |
218 | } |
219 | |
220 | === modified file 'testing/condition/condition.go' |
221 | --- testing/condition/condition.go 2014-01-20 06:23:17 +0000 |
222 | +++ testing/condition/condition.go 2014-02-01 21:04:46 +0000 |
223 | @@ -20,6 +20,7 @@ |
224 | import ( |
225 | "fmt" |
226 | "strings" |
227 | + "sync" |
228 | ) |
229 | |
230 | type Interface interface { |
231 | @@ -62,9 +63,12 @@ |
232 | |
233 | type fail2Work struct { |
234 | Left int32 |
235 | + lock sync.RWMutex |
236 | } |
237 | |
238 | func (c *fail2Work) OK() bool { |
239 | + c.lock.Lock() |
240 | + defer c.lock.Unlock() |
241 | if c.Left > 0 { |
242 | c.Left-- |
243 | return false |
244 | @@ -74,6 +78,8 @@ |
245 | } |
246 | |
247 | func (c *fail2Work) String() string { |
248 | + c.lock.RLock() |
249 | + defer c.lock.RUnlock() |
250 | if c.Left > 0 { |
251 | return fmt.Sprintf("Still Broken, %d to go.", c.Left) |
252 | } else { |
253 | @@ -104,10 +110,13 @@ |
254 | |
255 | type chain struct { |
256 | subs []*_iter |
257 | + lock sync.RWMutex |
258 | } |
259 | |
260 | func (c *chain) OK() bool { |
261 | var sub *_iter |
262 | + c.lock.Lock() |
263 | + defer c.lock.Unlock() |
264 | for _, sub = range c.subs { |
265 | if sub.remaining > 0 { |
266 | sub.remaining-- |
267 | @@ -119,6 +128,8 @@ |
268 | |
269 | func (c *chain) String() string { |
270 | ss := make([]string, len(c.subs)) |
271 | + c.lock.RLock() |
272 | + defer c.lock.RUnlock() |
273 | for i, sub := range c.subs { |
274 | ss[i] = sub.String() |
275 | } |
276 | @@ -138,5 +149,5 @@ |
277 | args = args[2:] |
278 | } |
279 | |
280 | - return &chain{iters} |
281 | + return &chain{subs: iters} |
282 | } |
31 + sess.Log.Errorf("%s", err)
seems that error needs a prefix of some sort