Merge lp:~chipaca/ubuntu-push/client-session-getting-there into lp:ubuntu-push

Proposed by John Lenton
Status: Merged
Approved by: John Lenton
Approved revision: no longer in the source branch.
Merged at revision: 42
Proposed branch: lp:~chipaca/ubuntu-push/client-session-getting-there
Merge into: lp:ubuntu-push
Diff against target: 282 lines (+198/-3)
3 files modified
client/session/session.go (+28/-0)
client/session/session_test.go (+158/-2)
testing/condition/condition.go (+12/-1)
To merge this branch: bzr merge lp:~chipaca/ubuntu-push/client-session-getting-there
Reviewer          Review Type   Date Requested   Status
Samuele Pedroni                                  Approve
Review via email: mp+204365@code.launchpad.net

This proposal supersedes a proposal from 2014-01-30.

Commit message

Ladies and gentlemen, the client session.

Description of the change

Refactored the whole dance.

Samuele Pedroni (pedronis) wrote : Posted in a previous version of this proposal

31 + sess.Log.Errorf("%s", err)

seems that error needs a prefix of some sort

Samuele Pedroni (pedronis) wrote : Posted in a previous version of this proposal

98 +func (cs *clientSessionSuite) TestResets(c *C) {

it's hard to tell what this is testing concretely. it's not clear that it's checking the various bits of channel resetting, for example

maybe run/Run should set a flag on the session or something to use here?

Samuele Pedroni (pedronis) wrote : Posted in a previous version of this proposal

Run and Reset need descriptions

Samuele Pedroni (pedronis) wrote : Posted in a previous version of this proposal

vaguer comments:

bit unsure about the asymmetry that the happy start path has you call Dial and Run,

and Reset does both for you,

also not sure about Run doing "go ..." for you

Samuele Pedroni (pedronis) wrote :

40 + sess.Close()

that one needs testing, calling Dial twice is a bit annoying though, maybe easier if it lives in run() ?

minor:
8 + sess.ErrCh = make(chan error, 1)
9 + sess.MsgCh = make(chan *Notification)

this could be done in run() as well, run is the only place that uses sess.ErrCh

38 + panic("can't Reset() without a protocol constructor.")

should say Dial

114 + func() error { return failure })

could use a channel there given that loop is run in a goroutine

John Lenton (chipaca) wrote :

Oops, yes, calling close was untested (no need to call Dial twice). Fixed that.
Also moved it into run (as a parameter) and tested it there as well (what's the convention to avoid shadowing builtins? gone with close_ for now).

Moved channel creation, good point.

Fixed the panic message.

Added a second channel to the loop test function; note "return failure" goes into the error channel, so it was already doing one. Made the second one more explicit though.

Samuele Pedroni (pedronis) wrote :

> Also moved it into run (as a parameter) and tested it there as well (what's
> the convention to avoid shadowing builtins? gone with close_ for now).

yes the single namespace (for types, modules and variables and builtins) plus the convention for protected/public makes for clunky clashes sometimes, I haven't seen deeper recommendations than to inflict the most clunkiness on variable naming; doClose|closeFunc though perhaps

> Added a second channel to the loop test function; note "return failure" goes
> into the error channel, so it was already doing one. Made the second one more
> explicit though.

I was meaning this, to check that run() is running in a goroutine, given that the MsgCh is also unbuffered it doesn't matter for now (but this is more independent of the details of ErrCh/MsgCh)

--- client/session/session_test.go 2014-02-01 09:42:43 +0000
+++ client/session/session_test.go 2014-02-01 14:00:10 +0000
@@ -584,18 +584,20 @@
  // biggie if this stops being true)
  c.Check(sess.ErrCh, IsNil)
  c.Check(sess.MsgCh, IsNil)
- failure := errors.New("TestRunRunsEvenIfLoopFails")
+ failureCh := make(chan error)
  notf := &Notification{}
  err = sess.run(
   func() {},
   func() error { return nil },
   func() error { return nil },
- func() error { sess.MsgCh <- notf; return failure })
+ func() error { sess.MsgCh <- notf; return <-failureCh })
  c.Check(err, Equals, nil)
  // if run doesn't error it sets up the channels
  c.Assert(sess.ErrCh, NotNil)
  c.Assert(sess.MsgCh, NotNil)
  c.Check(<-sess.MsgCh, Equals, notf)
+ failure := errors.New("TestRunRunsEvenIfLoopFails")
+ failureCh <- failure
  c.Check(<-sess.ErrCh, Equals, failure)
 }

Samuele Pedroni (pedronis) wrote :

there's a data race in TestDialsWorks:

https://pastebin.canonical.com/104078/

I fear the stuff in condition may need mutexes, or to be more channel-based

Samuele Pedroni (pedronis) wrote :

74 + ch := make(chan bool, 1)
75 + err = sess.run(
76 + func() { ch <- true },

close isn't called in a goroutine so a flag would work there as well

John Lenton (chipaca) wrote :

> > Also moved it into run (as a parameter) and tested it there as well (what's
> > the convention to avoid shadowing builtins? gone with close_ for now).
>
> yes the single namespace (for types, modules and variables and builtins) plus
> the convention for protected/public makes for clunky clashes sometimes, I
> haven't seen deeper recommendations than to inflict the most clunkiness on
> variable naming; doClose|closeFunc though perhaps

went with using the noun forms.
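
To illustrate the naming point, here is a small sketch with a hypothetical helper, runSteps, which is not the real run(). With noun-form parameter names such as closer and connecter, the builtin close stays usable inside the function; a parameter named close would shadow it.

```go
package main

import "fmt"

// runSteps is a hypothetical helper. Its hooks use noun-form names,
// so nothing in its scope shadows the builtin close.
func runSteps(closer func(), connecter func() error) error {
	done := make(chan struct{})
	closer()
	err := connecter()
	// This is the builtin close. If the first parameter were named
	// close (type func()), this call would not compile.
	close(done)
	<-done // a receive on a closed channel returns immediately
	return err
}

func main() {
	closed := false
	err := runSteps(func() { closed = true }, func() error { return nil })
	fmt.Println(closed, err) // prints "true <nil>"
}
```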

> I was meaning this, to check that run() is running in a goroutine, given that
> the MsgCh is also unbuffered it doesn't matter for now (but this is more
> independent of the details of ErrCh/MsgCh)

now I got it. And stole your patch :)

John Lenton (chipaca) wrote :

> there's a data race in TestDialsWorks:
>
> https://pastebin.canonical.com/104078/
>
> I fear the stuff in condition may need mutexes, or to be more channel-based

Yup. Done.

John Lenton (chipaca) wrote :

> 74 + ch := make(chan bool, 1)
> 75 + err = sess.run(
> 76 + func() { ch <- true },
>
> close isn't called in a goroutine so a flag would work there as well

and the code is more legible as a result. Done.
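
For comparison, a small sketch of both styles, assuming a hypothetical callNow that invokes its callback synchronously the way run() invokes closer. Since the callback has already run by the time callNow returns, a plain bool is enough; the channel version works too but adds ceremony.

```go
package main

import "fmt"

// callNow is a hypothetical stand-in for the part of run() that calls
// closer synchronously, before returning.
func callNow(closer func()) {
	closer()
}

// closedViaFlag records the call in a plain bool. This is safe because
// the callback runs in the caller's goroutine.
func closedViaFlag() bool {
	hasClosed := false
	callNow(func() { hasClosed = true })
	return hasClosed
}

// closedViaChannel records the call in a buffered channel, which is only
// needed when the callback may run in another goroutine.
func closedViaChannel() bool {
	ch := make(chan bool, 1)
	callNow(func() { ch <- true })
	select {
	case v := <-ch:
		return v
	default:
		return false
	}
}

func main() {
	fmt.Println(closedViaFlag(), closedViaChannel()) // prints "true true"
}
```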

Samuele Pedroni (pedronis) wrote :

thanks for bearing with me

looks good !

review: Approve
42. By John Lenton

[r=pedronis] Ladies and gentlemen, the client session.

Preview Diff

1=== modified file 'client/session/session.go'
2--- client/session/session.go 2014-01-31 21:47:15 +0000
3+++ client/session/session.go 2014-02-01 21:04:46 +0000
4@@ -196,3 +196,31 @@
5 sess.Log.Debugf("Connected %v.", conn.LocalAddr())
6 return nil
7 }
8+
9+// run calls connect, and if it works it calls start, and if it works
10+// it runs loop in a goroutine, and ships its return value over ErrCh.
11+func (sess *ClientSession) run(closer func(), connecter, starter, looper func() error) error {
12+ closer()
13+ err := connecter()
14+ if err == nil {
15+ err = starter()
16+ if err == nil {
17+ sess.ErrCh = make(chan error, 1)
18+ sess.MsgCh = make(chan *Notification)
19+ go func() { sess.ErrCh <- looper() }()
20+ }
21+ }
22+ return err
23+}
24+
25+// Dial takes the session from newly created (or newly disconnected)
26+// to running the main loop.
27+func (sess *ClientSession) Dial() error {
28+ if sess.Protocolator == nil {
29+ // a missing protocolator means you've willfully overridden
30+ // it; returning an error here would prompt AutoRedial to just
31+ // keep on trying.
32+ panic("can't Dial() without a protocol constructor.")
33+ }
34+ return sess.run(sess.Close, sess.connect, sess.start, sess.loop)
35+}
36
37=== modified file 'client/session/session_test.go'
38--- client/session/session_test.go 2014-01-31 21:47:15 +0000
39+++ client/session/session_test.go 2014-02-01 21:04:46 +0000
40@@ -17,6 +17,7 @@
41 package session
42
43 import (
44+ "crypto/tls"
45 "encoding/json"
46 "errors"
47 "fmt"
48@@ -39,7 +40,8 @@
49 type clientSessionSuite struct{}
50
51 var nullog = logger.NewSimpleLogger(ioutil.Discard, "error")
52-var debuglog = logger.NewSimpleLogger(os.Stderr, "debug")
53+var noisylog = logger.NewSimpleLogger(os.Stderr, "debug")
54+var debuglog = nullog
55 var _ = Suite(&clientSessionSuite{})
56
57 //
58@@ -542,5 +544,159 @@
59 }
60 // start is now done.
61 err = <-errCh
62- c.Assert(err, IsNil)
63+ c.Check(err, IsNil)
64+}
65+
66+/****************************************************************
67+ run() tests
68+****************************************************************/
69+
70+func (cs *clientSessionSuite) TestRunBailsIfConnectFails(c *C) {
71+ sess, err := NewSession("", nil, 0, "wah", debuglog)
72+ c.Assert(err, IsNil)
73+ failure := errors.New("TestRunBailsIfConnectFails")
74+ has_closed := false
75+ err = sess.run(
76+ func() { has_closed = true },
77+ func() error { return failure },
78+ nil,
79+ nil)
80+ c.Check(err, Equals, failure)
81+ c.Check(has_closed, Equals, true)
82+}
83+
84+func (cs *clientSessionSuite) TestRunBailsIfStartFails(c *C) {
85+ sess, err := NewSession("", nil, 0, "wah", debuglog)
86+ c.Assert(err, IsNil)
87+ failure := errors.New("TestRunBailsIfStartFails")
88+ err = sess.run(
89+ func() {},
90+ func() error { return nil },
91+ func() error { return failure },
92+ nil)
93+ c.Check(err, Equals, failure)
94+}
95+
96+func (cs *clientSessionSuite) TestRunRunsEvenIfLoopFails(c *C) {
97+ sess, err := NewSession("", nil, 0, "wah", debuglog)
98+ c.Assert(err, IsNil)
99+ // just to make a point: until here we haven't set ErrCh & MsgCh (no
100+ // biggie if this stops being true)
101+ c.Check(sess.ErrCh, IsNil)
102+ c.Check(sess.MsgCh, IsNil)
103+ failureCh := make(chan error) // must be unbuffered
104+ notf := &Notification{}
105+ err = sess.run(
106+ func() {},
107+ func() error { return nil },
108+ func() error { return nil },
109+ func() error { sess.MsgCh <- notf; return <-failureCh })
110+ c.Check(err, Equals, nil)
111+ // if run doesn't error it sets up the channels
112+ c.Assert(sess.ErrCh, NotNil)
113+ c.Assert(sess.MsgCh, NotNil)
114+ c.Check(<-sess.MsgCh, Equals, notf)
115+ failure := errors.New("TestRunRunsEvenIfLoopFails")
116+ failureCh <- failure
117+ c.Check(<-sess.ErrCh, Equals, failure)
118+ // so now you know it was running in a goroutine :)
119+}
120+
121+/****************************************************************
122+ Dial() tests
123+****************************************************************/
124+
125+func (cs *clientSessionSuite) TestDialPanics(c *C) {
126+ // one last unhappy test
127+ sess, err := NewSession("", nil, 0, "wah", debuglog)
128+ c.Assert(err, IsNil)
129+ sess.Protocolator = nil
130+ c.Check(sess.Dial, PanicMatches, ".*protocol constructor.")
131+}
132+
133+func (cs *clientSessionSuite) TestDialWorks(c *C) {
134+ // happy path thoughts
135+ cert, err := tls.X509KeyPair(helpers.TestCertPEMBlock, helpers.TestKeyPEMBlock)
136+ c.Assert(err, IsNil)
137+ tlsCfg := &tls.Config{
138+ Certificates: []tls.Certificate{cert},
139+ SessionTicketsDisabled: true,
140+ }
141+
142+ timeout := 100 * time.Millisecond
143+ lst, err := tls.Listen("tcp", "localhost:0", tlsCfg)
144+ c.Assert(err, IsNil)
145+ sess, err := NewSession(lst.Addr().String(), nil, timeout, "wah", debuglog)
146+ c.Assert(err, IsNil)
147+ tconn := &testConn{CloseCondition: condition.Fail2Work(10)}
148+ sess.Connection = tconn
149+ // just to be sure:
150+ c.Check(tconn.CloseCondition.String(), Matches, ".* 10 to go.")
151+
152+ upCh := make(chan interface{}, 5)
153+ downCh := make(chan interface{}, 5)
154+ proto := &testProtocol{up: upCh, down: downCh}
155+ sess.Protocolator = func(net.Conn) protocol.Protocol { return proto }
156+
157+ go sess.Dial()
158+
159+ srv, err := lst.Accept()
160+ c.Assert(err, IsNil)
161+
162+ // connect done
163+
164+ // Dial should have had the session's old connection (tconn) closed
165+ // before connecting a new one; if that was done, tconn's condition
166+ // ticked forward:
167+ c.Check(tconn.CloseCondition.String(), Matches, ".* 9 to go.")
168+
169+ // now, start: 1. protocol version
170+ v, err := protocol.ReadWireFormatVersion(srv, timeout)
171+ c.Assert(err, IsNil)
172+ c.Assert(v, Equals, protocol.ProtocolWireVersion)
173+
174+ // 2. "connect" (but on the fake protocol above! woo)
175+
176+ c.Check(takeNext(downCh), Equals, "deadline 100ms")
177+ _, ok := takeNext(downCh).(protocol.ConnectMsg)
178+ c.Check(ok, Equals, true)
179+ upCh <- nil // no error
180+ upCh <- protocol.ConnAckMsg{
181+ Type: "connack",
182+ Params: protocol.ConnAckParams{(10 * time.Millisecond).String()},
183+ }
184+ // start is now done.
185+
186+ // 3. "loop"
187+
188+ // ping works,
189+ c.Check(takeNext(downCh), Equals, "deadline 110ms")
190+ upCh <- protocol.PingPongMsg{Type: "ping"}
191+ c.Check(takeNext(downCh), Equals, protocol.PingPongMsg{Type: "pong"})
192+ upCh <- nil
193+
194+ // and broadcasts...
195+ b := &protocol.BroadcastMsg{
196+ Type: "broadcast",
197+ AppId: "--ignored--",
198+ ChanId: "0",
199+ TopLevel: 2,
200+ Payloads: []json.RawMessage{json.RawMessage(`{"b":1}`)},
201+ }
202+ c.Check(takeNext(downCh), Equals, "deadline 110ms")
203+ upCh <- b
204+ c.Check(takeNext(downCh), Equals, protocol.AckMsg{"ack"})
205+ upCh <- nil
206+ // ...get bubbled up,
207+ c.Check(<-sess.MsgCh, NotNil)
208+ // and their TopLevel remembered
209+ c.Check(sess.Levels.GetAll(), DeepEquals, map[string]int64{"0": 2})
210+
211+ // and ping still work even after that.
212+ c.Check(takeNext(downCh), Equals, "deadline 110ms")
213+ upCh <- protocol.PingPongMsg{Type: "ping"}
214+ c.Check(takeNext(downCh), Equals, protocol.PingPongMsg{Type: "pong"})
215+ failure := errors.New("pongs")
216+ upCh <- failure
217+ c.Check(<-sess.ErrCh, Equals, failure)
218 }
219
220=== modified file 'testing/condition/condition.go'
221--- testing/condition/condition.go 2014-01-20 06:23:17 +0000
222+++ testing/condition/condition.go 2014-02-01 21:04:46 +0000
223@@ -20,6 +20,7 @@
224 import (
225 "fmt"
226 "strings"
227+ "sync"
228 )
229
230 type Interface interface {
231@@ -62,9 +63,12 @@
232
233 type fail2Work struct {
234 Left int32
235+ lock sync.RWMutex
236 }
237
238 func (c *fail2Work) OK() bool {
239+ c.lock.Lock()
240+ defer c.lock.Unlock()
241 if c.Left > 0 {
242 c.Left--
243 return false
244@@ -74,6 +78,8 @@
245 }
246
247 func (c *fail2Work) String() string {
248+ c.lock.RLock()
249+ defer c.lock.RUnlock()
250 if c.Left > 0 {
251 return fmt.Sprintf("Still Broken, %d to go.", c.Left)
252 } else {
253@@ -104,10 +110,13 @@
254
255 type chain struct {
256 subs []*_iter
257+ lock sync.RWMutex
258 }
259
260 func (c *chain) OK() bool {
261 var sub *_iter
262+ c.lock.Lock()
263+ defer c.lock.Unlock()
264 for _, sub = range c.subs {
265 if sub.remaining > 0 {
266 sub.remaining--
267@@ -119,6 +128,8 @@
268
269 func (c *chain) String() string {
270 ss := make([]string, len(c.subs))
271+ c.lock.RLock()
272+ defer c.lock.RUnlock()
273 for i, sub := range c.subs {
274 ss[i] = sub.String()
275 }
276@@ -138,5 +149,5 @@
277 args = args[2:]
278 }
279
280- return &chain{iters}
281+ return &chain{subs: iters}
282 }

Subscribers

People subscribed via source and target branches