Merge lp:~chipaca/ubuntu-push/client-v0 into lp:ubuntu-push

Proposed by John Lenton
Status: Rejected
Rejected by: John Lenton
Proposed branch: lp:~chipaca/ubuntu-push/client-v0
Merge into: lp:ubuntu-push
Prerequisite: lp:~chipaca/ubuntu-push/redialer
Diff against target: 824 lines (+801/-1)
3 files modified
client/session/session.go (+214/-0)
client/session/session_test.go (+586/-0)
protocol/protocol.go (+1/-1)
To merge this branch: bzr merge lp:~chipaca/ubuntu-push/client-v0
Reviewer Review Type Date Requested Status
Samuele Pedroni Needs Fixing
Review via email: mp+203198@code.launchpad.net

Commit message

The client (session), v0.

Description of the change

The client (session), v0.

A lot of commonality with some of the testing bits of protocol and
server/session; later, let's see if we can refactor.

To post a comment you must log in.
lp:~chipaca/ubuntu-push/client-v0 updated
25. By John Lenton

a few cleanups

26. By John Lenton

Moving it into client/session because that's what it is.

Revision history for this message
Samuele Pedroni (pedronis) wrote :

I think it would be saner for testing if Config were an interface rather than the concrete thing; I don't think it's the task of the session to read PEM files.

Same for run: it should be split into a start and a loop so that tests can poke at simpler bits.

review: Needs Fixing
Revision history for this message
Samuele Pedroni (pedronis) wrote :

Probably run can be split even more:

- in start/setup

- get next message

- acting

- reply

Unmerged revisions

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== added directory 'client'
2=== added directory 'client/session'
3=== added file 'client/session/session.go'
4--- client/session/session.go 1970-01-01 00:00:00 +0000
5+++ client/session/session.go 2014-01-25 02:14:54 +0000
6@@ -0,0 +1,214 @@
7+/*
8+ Copyright 2013-2014 Canonical Ltd.
9+
10+ This program is free software: you can redistribute it and/or modify it
11+ under the terms of the GNU General Public License version 3, as published
12+ by the Free Software Foundation.
13+
14+ This program is distributed in the hope that it will be useful, but
15+ WITHOUT ANY WARRANTY; without even the implied warranties of
16+ MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
17+ PURPOSE. See the GNU General Public License for more details.
18+
19+ You should have received a copy of the GNU General Public License along
20+ with this program. If not, see <http://www.gnu.org/licenses/>.
21+*/
22+
23+package session
24+
25+import (
26+ "crypto/tls"
27+ "crypto/x509"
28+ "errors"
29+ "io/ioutil"
30+ "launchpad.net/ubuntu-push/config"
31+ "launchpad.net/ubuntu-push/logger"
32+ "launchpad.net/ubuntu-push/protocol"
33+ "net"
34+ "time"
35+)
36+
37+var wireVersionBytes = []byte{protocol.ProtocolWireVersion}
38+
39+type Notification struct {
40+ // something something something
41+}
42+
43+type LevelMap interface {
44+ Set(level string, top int64)
45+ GetAll() map[string]int64
46+}
47+
48+type mapLevelMap map[string]int64
49+
50+func (m *mapLevelMap) Set(level string, top int64) {
51+ (*m)[level] = top
52+}
53+func (m *mapLevelMap) GetAll() map[string]int64 {
54+ return map[string]int64(*m)
55+}
56+
57+var _ LevelMap = &mapLevelMap{}
58+
59+type Config struct {
60+ // session configuration
61+ ExchangeTimeout config.ConfigTimeDuration `json:"exchange_timeout"`
62+ // server connection config
63+ Addr config.ConfigHostPort
64+ CertPEMFile string `json:"cert_pem_file"`
65+}
66+
67+// ClientSession holds a client<->server session and its configuration.
68+type ClientSession struct {
69+ // configuration
70+ DeviceId string
71+ ServerAddr string
72+ ExchangeTimeout time.Duration
73+ Levels LevelMap
74+ // connection
75+ Connection net.Conn
76+ Protocolator func(net.Conn) protocol.Protocol
77+ Log logger.Logger
78+ TLS *tls.Config
79+ // status
80+ ErrCh chan error
81+ MsgCh chan *Notification
82+}
83+
84+func NewSession(config Config, log logger.Logger, deviceId string) (*ClientSession, error) {
85+ sess := &ClientSession{
86+ ExchangeTimeout: config.ExchangeTimeout.TimeDuration(),
87+ ServerAddr: config.Addr.HostPort(),
88+ DeviceId: deviceId,
89+ Log: log,
90+ Protocolator: protocol.NewProtocol0,
91+ Levels: &mapLevelMap{},
92+ TLS: &tls.Config{InsecureSkipVerify: true}, // XXX
93+ }
94+ if config.CertPEMFile != "" {
95+ cert, err := ioutil.ReadFile(config.CertPEMFile)
96+ if err != nil {
97+ return nil, err
98+ }
99+ cp := x509.NewCertPool()
100+ ok := cp.AppendCertsFromPEM(cert)
101+ if !ok {
102+ return nil, errors.New("dial: could not parse certificate")
103+ }
104+ sess.TLS.RootCAs = cp
105+ }
106+ return sess, nil
107+}
108+
109+// Dial connects to a server using the configuration in the ClientSession
110+// and sets up the connection.
111+func (sess *ClientSession) Dial() error {
112+ conn, err := net.DialTimeout("tcp", sess.ServerAddr, sess.ExchangeTimeout)
113+ if err != nil {
114+ return err
115+ }
116+ sess.Connection = tls.Client(conn, sess.TLS)
117+ return nil
118+}
119+
120+type serverMsg struct {
121+ Type string `json:"T"`
122+ protocol.BroadcastMsg
123+ protocol.NotificationsMsg
124+}
125+
126+func (sess *ClientSession) Reset() error {
127+ if sess.Protocolator == nil {
128+ return errors.New("Can't Reset() without a protocol constructor.")
129+ }
130+ if sess.Connection != nil {
131+ sess.Connection.Close() // just in case
132+ }
133+ err := sess.Dial()
134+ if err != nil {
135+ sess.Log.Errorf("%s", err)
136+ return err
137+ }
138+ sess.ErrCh = make(chan error, 1)
139+ sess.MsgCh = make(chan *Notification)
140+ sess.Run()
141+ return nil
142+}
143+
144+func (sess *ClientSession) Run() {
145+ go func() { sess.ErrCh <- sess.run() }()
146+}
147+
148+// Run the session with the server, emits a stream of events.
149+func (sess *ClientSession) run() error {
150+ conn := sess.Connection
151+ if conn == nil {
152+ return errors.New("Can't run() disconnected.")
153+ }
154+ if sess.Protocolator == nil {
155+ return errors.New("Can't run() without a protocol constructor.")
156+ }
157+ defer conn.Close()
158+ err := conn.SetDeadline(time.Now().Add(sess.ExchangeTimeout))
159+ if err != nil {
160+ return err
161+ }
162+ _, err = conn.Write(wireVersionBytes)
163+ // The Writer docs: Write must return a non-nil error if it returns
164+ // n < len(p). So, no need to check number of bytes written, hooray.
165+ if err != nil {
166+ return err
167+ }
168+ proto := sess.Protocolator(conn)
169+ err = proto.WriteMessage(protocol.ConnectMsg{
170+ Type: "connect",
171+ DeviceId: sess.DeviceId,
172+ Levels: sess.Levels.GetAll(),
173+ })
174+ if err != nil {
175+ return err
176+ }
177+ var connAck protocol.ConnAckMsg
178+ err = proto.ReadMessage(&connAck)
179+ if err != nil {
180+ return err
181+ }
182+ pingInterval, err := time.ParseDuration(connAck.Params.PingInterval)
183+ if err != nil {
184+ return err
185+ }
186+ sess.Log.Debugf("Connected %v.", conn.LocalAddr())
187+ var recv serverMsg
188+ for {
189+ deadAfter := pingInterval + sess.ExchangeTimeout
190+ conn.SetDeadline(time.Now().Add(deadAfter))
191+ err = proto.ReadMessage(&recv)
192+ if err != nil {
193+ return err
194+ }
195+ switch recv.Type {
196+ case "ping":
197+ conn.SetDeadline(time.Now().Add(sess.ExchangeTimeout))
198+ err := proto.WriteMessage(protocol.PingPongMsg{Type: "pong"})
199+ if err != nil {
200+ return err
201+ }
202+ sess.Log.Debugf("Ping.")
203+ case "broadcast":
204+ conn.SetDeadline(time.Now().Add(sess.ExchangeTimeout))
205+ err := proto.WriteMessage(protocol.PingPongMsg{Type: "ack"})
206+ if err != nil {
207+ return err
208+ }
209+ sess.Log.Debugf("broadcast chan:%v app:%v topLevel:%d payloads:%s",
210+ recv.ChanId, recv.AppId, recv.TopLevel, recv.Payloads)
211+ if recv.ChanId == protocol.SystemChannelId {
212+ // the system channel id, the only one we care about for now
213+ sess.Levels.Set(recv.ChanId, recv.TopLevel)
214+ sess.MsgCh <- &Notification{}
215+ } else {
216+ sess.Log.Debugf("What is this weird channel, %s?", recv.ChanId)
217+ }
218+ }
219+ }
220+}
221
222=== added file 'client/session/session_test.go'
223--- client/session/session_test.go 1970-01-01 00:00:00 +0000
224+++ client/session/session_test.go 2014-01-25 02:14:54 +0000
225@@ -0,0 +1,586 @@
226+/*
227+ Copyright 2013-2014 Canonical Ltd.
228+
229+ This program is free software: you can redistribute it and/or modify it
230+ under the terms of the GNU General Public License version 3, as published
231+ by the Free Software Foundation.
232+
233+ This program is distributed in the hope that it will be useful, but
234+ WITHOUT ANY WARRANTY; without even the implied warranties of
235+ MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
236+ PURPOSE. See the GNU General Public License for more details.
237+
238+ You should have received a copy of the GNU General Public License along
239+ with this program. If not, see <http://www.gnu.org/licenses/>.
240+*/
241+
242+package session
243+
244+import (
245+ "encoding/json"
246+ "errors"
247+ "fmt"
248+ "io"
249+ "io/ioutil"
250+ . "launchpad.net/gocheck"
251+ "launchpad.net/ubuntu-push/logger"
252+ "launchpad.net/ubuntu-push/protocol"
253+ helpers "launchpad.net/ubuntu-push/testing"
254+ "launchpad.net/ubuntu-push/testing/condition"
255+ "net"
256+ "os"
257+ "reflect"
258+ "runtime"
259+ "strings"
260+ "testing"
261+ "time"
262+)
263+
264+func TestSession(t *testing.T) { TestingT(t) }
265+
266+type clientSessionSuite struct{}
267+
268+var nullog = logger.NewSimpleLogger(ioutil.Discard, "error")
269+var debuglog = logger.NewSimpleLogger(os.Stderr, "debug")
270+var _ = Suite(&clientSessionSuite{})
271+
272+/****************************************************************
273+ NewSession() tests
274+****************************************************************/
275+
276+func (cs *clientSessionSuite) TestNewSessionPlainWorks(c *C) {
277+ cfg := Config{}
278+ sess, err := NewSession(cfg, nullog, "wah")
279+ c.Check(sess, NotNil)
280+ c.Check(err, IsNil)
281+}
282+
283+var certfile string = helpers.SourceRelative("../../server/acceptance/config/testing.cert")
284+
285+func (cs *clientSessionSuite) TestNewSessionPEMWorks(c *C) {
286+ cfg := Config{CertPEMFile: certfile}
287+ sess, err := NewSession(cfg, nullog, "wah")
288+ c.Check(sess, NotNil)
289+ c.Assert(err, IsNil)
290+ c.Check(sess.TLS.RootCAs, NotNil)
291+}
292+
293+func (cs *clientSessionSuite) TestNewSessionBadPEMFilePathFails(c *C) {
294+ cfg := Config{CertPEMFile: "/no/such/path"}
295+ sess, err := NewSession(cfg, nullog, "wah")
296+ c.Check(sess, IsNil)
297+ c.Check(err, NotNil)
298+}
299+
300+func (cs *clientSessionSuite) TestNewSessionBadPEMFileContentFails(c *C) {
301+ cfg := Config{CertPEMFile: "/etc/passwd"}
302+ sess, err := NewSession(cfg, nullog, "wah")
303+ c.Check(sess, IsNil)
304+ c.Check(err, NotNil)
305+}
306+
307+/****************************************************************
308+ Run() tests
309+****************************************************************/
310+
311+func testname() string {
312+ pcs := make([]uintptr, 200)
313+ runtime.Callers(0, pcs)
314+ testname := "<unknown>"
315+ for _, pc := range pcs {
316+ me := runtime.FuncForPC(pc)
317+ if me == nil {
318+ break
319+ }
320+ parts := strings.Split(me.Name(), ".")
321+ funcname := parts[len(parts)-1]
322+ if strings.HasPrefix(funcname, "Test") {
323+ testname = funcname
324+ }
325+ }
326+ return testname
327+}
328+
329+type xAddr string
330+
331+func (x xAddr) Network() string { return "<:>" }
332+func (x xAddr) String() string { return string(x) }
333+
334+// testConn (roughly based on the one in protocol_test)
335+
336+type testConn struct {
337+ Name string
338+ Deadlines []time.Duration
339+ Writes [][]byte
340+ WriteCondition condition.Interface
341+ DeadlineCondition condition.Interface
342+}
343+
344+func (tc *testConn) LocalAddr() net.Addr { return xAddr(tc.Name) }
345+
346+func (tc *testConn) RemoteAddr() net.Addr { return xAddr(tc.Name) }
347+
348+func (tc *testConn) Close() error { return nil }
349+
350+func (tc *testConn) SetDeadline(t time.Time) error {
351+ tc.Deadlines = append(tc.Deadlines, t.Sub(time.Now()))
352+ if tc.DeadlineCondition == nil || tc.DeadlineCondition.OK() {
353+ return nil
354+ } else {
355+ return errors.New("deadliner on fire")
356+ }
357+}
358+
359+func (tc *testConn) SetReadDeadline(t time.Time) error { panic("NIH"); return nil }
360+func (tc *testConn) SetWriteDeadline(t time.Time) error { panic("NIH"); return nil }
361+func (tc *testConn) Read(buf []byte) (n int, err error) { panic("NIH"); return -1, nil }
362+
363+func (tc *testConn) Write(buf []byte) (int, error) {
364+ store := make([]byte, len(buf))
365+ copy(store, buf)
366+ tc.Writes = append(tc.Writes, store)
367+ if tc.WriteCondition == nil || tc.WriteCondition.OK() {
368+ return len(store), nil
369+ } else {
370+ return -1, errors.New("writer on fire")
371+ }
372+}
373+
374+// test protocol (from session_test)
375+
376+type testProtocol struct {
377+ up chan interface{}
378+ down chan interface{}
379+}
380+
381+// takeNext takes a value from given channel with a 5s timeout
382+func takeNext(ch <-chan interface{}) interface{} {
383+ select {
384+ case <-time.After(5 * time.Second):
385+ panic("test protocol exchange stuck: too long waiting")
386+ case v := <-ch:
387+ return v
388+ }
389+ return nil
390+}
391+
392+func (c *testProtocol) SetDeadline(t time.Time) {
393+ deadAfter := t.Sub(time.Now())
394+ deadAfter = (deadAfter + time.Millisecond/2) / time.Millisecond * time.Millisecond
395+ c.down <- fmt.Sprintf("deadline %v", deadAfter)
396+}
397+
398+func (c *testProtocol) ReadMessage(dest interface{}) error {
399+ switch v := takeNext(c.up).(type) {
400+ case error:
401+ return v
402+ default:
403+ // make sure JSON.Unmarshal works with dest
404+ var marshalledMsg []byte
405+ marshalledMsg, err := json.Marshal(v)
406+ if err != nil {
407+ return fmt.Errorf("can't jsonify test value %v: %s", v, err)
408+ }
409+ return json.Unmarshal(marshalledMsg, dest)
410+ }
411+ return nil
412+}
413+
414+func (c *testProtocol) WriteMessage(src interface{}) error {
415+ // make sure JSON.Marshal works with src
416+ _, err := json.Marshal(src)
417+ if err != nil {
418+ return err
419+ }
420+ val := reflect.ValueOf(src)
421+ if val.Kind() == reflect.Ptr {
422+ src = val.Elem().Interface()
423+ }
424+ c.down <- src
425+ switch v := takeNext(c.up).(type) {
426+ case error:
427+ return v
428+ }
429+ return nil
430+}
431+
432+/****************************************************************
433+ *
434+ * Go way down to the bottom if you want to see the full, working case.
435+ * This has a rather slow buildup.
436+ *
437+ * TODO: check deadlines
438+ *
439+ ****************************************************************/
440+
441+func (cs *clientSessionSuite) TestRunFailsIfNilConnection(c *C) {
442+ sess, err := NewSession(Config{}, debuglog, "wah")
443+ c.Assert(err, IsNil)
444+ // not connected!
445+ err = sess.run()
446+ c.Assert(err, NotNil)
447+ c.Check(err.Error(), Matches, ".*disconnected.*")
448+}
449+
450+func (cs *clientSessionSuite) TestRunFailsIfNilProtocolator(c *C) {
451+ sess, err := NewSession(Config{}, debuglog, "wah")
452+ c.Assert(err, IsNil)
453+ sess.Connection = &testConn{Name: testname()} // ok, have a constructor
454+ sess.Protocolator = nil // but no protocol, seeficare.
455+ err = sess.run()
456+ c.Assert(err, NotNil)
457+ c.Check(err.Error(), Matches, ".*protocol constructor.*")
458+}
459+
460+func (cs *clientSessionSuite) TestRunFailsIfSetDeadlineFails(c *C) {
461+ sess, err := NewSession(Config{}, debuglog, "wah")
462+ c.Assert(err, IsNil)
463+ sess.Connection = &testConn{Name: testname(),
464+ DeadlineCondition: condition.Work(false)} // setdeadline will fail
465+ err = sess.run()
466+ c.Assert(err, NotNil)
467+ c.Check(err.Error(), Matches, ".*deadline.*")
468+}
469+
470+func (cs *clientSessionSuite) TestRunFailsIfWriteFails(c *C) {
471+ sess, err := NewSession(Config{}, debuglog, "wah")
472+ c.Assert(err, IsNil)
473+ sess.Connection = &testConn{Name: testname(),
474+ WriteCondition: condition.Work(false)} // write will fail
475+ err = sess.run()
476+ c.Assert(err, NotNil)
477+ c.Check(err.Error(), Matches, ".*write.*")
478+}
479+
480+func (cs *clientSessionSuite) TestRunConnectMessageFails(c *C) {
481+ sess, err := NewSession(Config{}, debuglog, "wah")
482+ c.Assert(err, IsNil)
483+ sess.Connection = &testConn{Name: testname()}
484+ errCh := make(chan error, 1)
485+ upCh := make(chan interface{}, 5)
486+ downCh := make(chan interface{}, 5)
487+ proto := &testProtocol{up: upCh, down: downCh}
488+ sess.Protocolator = func(_ net.Conn) protocol.Protocol { return proto }
489+
490+ go func() {
491+ errCh <- sess.run()
492+ }()
493+
494+ c.Check(takeNext(downCh), DeepEquals, protocol.ConnectMsg{
495+ Type: "connect",
496+ DeviceId: sess.DeviceId,
497+ Levels: map[string]int64{},
498+ })
499+ upCh <- errors.New("Overflow error in /dev/null")
500+ err = <-errCh
501+ c.Assert(err, NotNil)
502+ c.Check(err.Error(), Matches, "Overflow.*null")
503+}
504+
505+func (cs *clientSessionSuite) TestRunConnackReadError(c *C) {
506+ sess, err := NewSession(Config{}, debuglog, "wah")
507+ c.Assert(err, IsNil)
508+ sess.Connection = &testConn{Name: testname()}
509+ errCh := make(chan error, 1)
510+ upCh := make(chan interface{}, 5)
511+ downCh := make(chan interface{}, 5)
512+ proto := &testProtocol{up: upCh, down: downCh}
513+ sess.Protocolator = func(_ net.Conn) protocol.Protocol { return proto }
514+
515+ go func() {
516+ errCh <- sess.run()
517+ }()
518+
519+ takeNext(downCh) // connectMsg
520+ upCh <- nil // no error
521+ upCh <- io.EOF
522+ err = <-errCh
523+ c.Assert(err, NotNil)
524+ c.Check(err.Error(), Matches, ".*EOF.*")
525+}
526+
527+func (cs *clientSessionSuite) TestRunBadConnack(c *C) {
528+ sess, err := NewSession(Config{}, debuglog, "wah")
529+ c.Assert(err, IsNil)
530+ sess.Connection = &testConn{Name: testname()}
531+ errCh := make(chan error, 1)
532+ upCh := make(chan interface{}, 5)
533+ downCh := make(chan interface{}, 5)
534+ proto := &testProtocol{up: upCh, down: downCh}
535+ sess.Protocolator = func(_ net.Conn) protocol.Protocol { return proto }
536+
537+ go func() {
538+ errCh <- sess.run()
539+ }()
540+
541+ takeNext(downCh) // connectMsg
542+ upCh <- nil // no error
543+ upCh <- protocol.ConnAckMsg{}
544+ err = <-errCh
545+ c.Assert(err, NotNil)
546+ c.Check(err.Error(), Matches, ".*invalid.*")
547+}
548+
549+func (cs *clientSessionSuite) TestRunMainloopReadError(c *C) {
550+ sess, err := NewSession(Config{}, debuglog, "wah")
551+ c.Assert(err, IsNil)
552+ sess.Connection = &testConn{Name: testname()}
553+ errCh := make(chan error, 1)
554+ upCh := make(chan interface{}, 5)
555+ downCh := make(chan interface{}, 5)
556+ proto := &testProtocol{up: upCh, down: downCh}
557+ sess.Protocolator = func(_ net.Conn) protocol.Protocol { return proto }
558+
559+ go func() {
560+ errCh <- sess.run()
561+ }()
562+
563+ takeNext(downCh) // connectMsg
564+ upCh <- nil // no error
565+ upCh <- protocol.ConnAckMsg{
566+ Type: "connack",
567+ Params: protocol.ConnAckParams{(10 * time.Millisecond).String()},
568+ }
569+ // in the mainloop!
570+ upCh <- errors.New("Read")
571+ err = <-errCh
572+ c.Assert(err, NotNil)
573+ c.Check(err.Error(), Equals, "Read")
574+}
575+
576+func (cs *clientSessionSuite) TestRunPongWriteError(c *C) {
577+ sess, err := NewSession(Config{}, debuglog, "wah")
578+ c.Assert(err, IsNil)
579+ sess.Connection = &testConn{Name: testname()}
580+ errCh := make(chan error, 1)
581+ upCh := make(chan interface{}, 5)
582+ downCh := make(chan interface{}, 5)
583+ proto := &testProtocol{up: upCh, down: downCh}
584+ sess.Protocolator = func(_ net.Conn) protocol.Protocol { return proto }
585+
586+ go func() {
587+ errCh <- sess.run()
588+ }()
589+
590+ takeNext(downCh) // connectMsg
591+ upCh <- nil // no error
592+ upCh <- protocol.ConnAckMsg{
593+ Type: "connack",
594+ Params: protocol.ConnAckParams{(10 * time.Millisecond).String()},
595+ }
596+ // in the mainloop!
597+ upCh <- protocol.PingPongMsg{Type: "ping"}
598+ c.Check(takeNext(downCh), Equals, protocol.PingPongMsg{Type: "pong"})
599+ upCh <- errors.New("Pong")
600+ err = <-errCh
601+ c.Assert(err, NotNil)
602+ c.Check(err.Error(), Equals, "Pong")
603+}
604+
605+func (cs *clientSessionSuite) TestRunPingPong(c *C) {
606+ sess, err := NewSession(Config{}, debuglog, "wah")
607+ c.Assert(err, IsNil)
608+ sess.Connection = &testConn{Name: testname()}
609+ errCh := make(chan error, 1)
610+ upCh := make(chan interface{}, 5)
611+ downCh := make(chan interface{}, 5)
612+ proto := &testProtocol{up: upCh, down: downCh}
613+ sess.Protocolator = func(_ net.Conn) protocol.Protocol { return proto }
614+
615+ go func() {
616+ errCh <- sess.run()
617+ }()
618+
619+ takeNext(downCh) // connectMsg
620+ upCh <- nil // no error
621+ upCh <- protocol.ConnAckMsg{
622+ Type: "connack",
623+ Params: protocol.ConnAckParams{(10 * time.Millisecond).String()},
624+ }
625+ // in the mainloop!
626+ upCh <- protocol.PingPongMsg{Type: "ping"}
627+ c.Check(takeNext(downCh), Equals, protocol.PingPongMsg{Type: "pong"})
628+ upCh <- nil // pong ok
629+ upCh <- io.EOF // close it down
630+ err = <-errCh
631+}
632+
633+func (cs *clientSessionSuite) TestRunBadAckWrite(c *C) {
634+ sess, err := NewSession(Config{}, debuglog, "wah")
635+ c.Assert(err, IsNil)
636+ sess.Connection = &testConn{Name: testname()}
637+ errCh := make(chan error, 1)
638+ upCh := make(chan interface{}, 5)
639+ downCh := make(chan interface{}, 5)
640+ proto := &testProtocol{up: upCh, down: downCh}
641+ sess.Protocolator = func(_ net.Conn) protocol.Protocol { return proto }
642+ sess.MsgCh = make(chan *Notification, 5)
643+
644+ go func() {
645+ errCh <- sess.run()
646+ }()
647+
648+ takeNext(downCh) // connectMsg
649+ upCh <- nil // no error
650+ upCh <- protocol.ConnAckMsg{
651+ Type: "connack",
652+ Params: protocol.ConnAckParams{time.Second.String()},
653+ }
654+ // in the mainloop!
655+
656+ b := &protocol.BroadcastMsg{
657+ Type: "broadcast",
658+ AppId: "APP",
659+ ChanId: "0",
660+ TopLevel: 2,
661+ Payloads: []json.RawMessage{json.RawMessage(`{"b":1}`)},
662+ }
663+ upCh <- b
664+ c.Check(takeNext(downCh), Equals, protocol.PingPongMsg{Type: "ack"})
665+ upCh <- errors.New("ACK ACK ACK")
666+ err = <-errCh
667+ c.Assert(err, NotNil)
668+ c.Check(err.Error(), Equals, "ACK ACK ACK")
669+}
670+
671+func (cs *clientSessionSuite) TestRunBroadcastWrongChannel(c *C) {
672+ sess, err := NewSession(Config{}, debuglog, "wah")
673+ c.Assert(err, IsNil)
674+ sess.Connection = &testConn{Name: testname()}
675+ errCh := make(chan error, 1)
676+ upCh := make(chan interface{}, 5)
677+ downCh := make(chan interface{}, 5)
678+ proto := &testProtocol{up: upCh, down: downCh}
679+ sess.Protocolator = func(_ net.Conn) protocol.Protocol { return proto }
680+ sess.MsgCh = make(chan *Notification, 5)
681+
682+ go func() {
683+ errCh <- sess.run()
684+ }()
685+
686+ takeNext(downCh) // connectMsg
687+ upCh <- nil // no error
688+ upCh <- protocol.ConnAckMsg{
689+ Type: "connack",
690+ Params: protocol.ConnAckParams{time.Second.String()},
691+ }
692+ // in the mainloop!
693+
694+ b := &protocol.BroadcastMsg{
695+ Type: "broadcast",
696+ AppId: "APP",
697+ ChanId: "42",
698+ TopLevel: 2,
699+ Payloads: []json.RawMessage{json.RawMessage(`{"b":1}`)},
700+ }
701+ upCh <- b
702+ c.Check(takeNext(downCh), Equals, protocol.PingPongMsg{Type: "ack"})
703+ upCh <- nil // ack ok
704+ upCh <- io.EOF // close it down
705+ err = <-errCh
706+ c.Check(len(sess.MsgCh), Equals, 0)
707+}
708+
709+func (cs *clientSessionSuite) TestRunBroadcastRightChannel(c *C) {
710+ sess, err := NewSession(Config{}, debuglog, "wah")
711+ c.Assert(err, IsNil)
712+ sess.Connection = &testConn{Name: testname()}
713+ sess.ErrCh = make(chan error, 1)
714+ upCh := make(chan interface{}, 5)
715+ downCh := make(chan interface{}, 5)
716+ proto := &testProtocol{up: upCh, down: downCh}
717+ sess.Protocolator = func(_ net.Conn) protocol.Protocol { return proto }
718+ sess.MsgCh = make(chan *Notification, 5)
719+
720+ sess.Run()
721+
722+ takeNext(downCh) // connectMsg
723+ upCh <- nil // no error
724+ upCh <- protocol.ConnAckMsg{
725+ Type: "connack",
726+ Params: protocol.ConnAckParams{time.Second.String()},
727+ }
728+ // in the mainloop!
729+
730+ b := &protocol.BroadcastMsg{
731+ Type: "broadcast",
732+ AppId: "--ignored--",
733+ ChanId: "0",
734+ TopLevel: 2,
735+ Payloads: []json.RawMessage{json.RawMessage(`{"b":1}`)},
736+ }
737+ upCh <- b
738+ c.Check(takeNext(downCh), Equals, protocol.PingPongMsg{Type: "ack"})
739+ upCh <- nil // ack ok
740+ upCh <- io.EOF // close it down
741+ err = <-sess.ErrCh
742+ c.Assert(len(sess.MsgCh), Equals, 1)
743+ c.Check(<-sess.MsgCh, Equals, &Notification{})
744+ // and finally, the session keeps track of the levels
745+ c.Check(sess.Levels.GetAll(), DeepEquals, map[string]int64{"0": 2})
746+}
747+
748+/*
749+ *
750+ *
751+ *
752+ * breathe in...
753+ */
754+
755+func (cs *clientSessionSuite) TestDialFailsWithNoAddress(c *C) {
756+ sess, err := NewSession(Config{}, debuglog, "wah")
757+ c.Assert(err, IsNil)
758+ err = sess.Dial()
759+ c.Assert(err, NotNil)
760+ c.Check(err.Error(), Matches, ".*dial.*address.*")
761+}
762+
763+func (cs *clientSessionSuite) TestDialConnects(c *C) {
764+ lp, err := net.Listen("tcp", ":0")
765+ c.Assert(err, IsNil)
766+ defer lp.Close()
767+ sess, err := NewSession(Config{}, debuglog, "wah")
768+ c.Assert(err, IsNil)
769+ sess.ServerAddr = lp.Addr().String()
770+ err = sess.Dial()
771+ c.Check(err, IsNil)
772+ c.Check(sess.Connection, NotNil)
773+}
774+
775+func (cs *clientSessionSuite) TestResetFailsWithoutProtocolator(c *C) {
776+ sess, _ := NewSession(Config{}, debuglog, "wah")
777+ sess.Protocolator = nil
778+ err := sess.Reset()
779+ c.Assert(err, NotNil)
780+ c.Check(err.Error(), Matches, ".*protocol constructor\\.")
781+}
782+
783+func (cs *clientSessionSuite) TestResetFailsWithNoAddress(c *C) {
784+ sess, err := NewSession(Config{}, debuglog, "wah")
785+ c.Assert(err, IsNil)
786+ err = sess.Reset()
787+ c.Assert(err, NotNil)
788+ c.Check(err.Error(), Matches, ".*dial.*address.*")
789+}
790+
791+func (cs *clientSessionSuite) TestResets(c *C) {
792+ upCh := make(chan interface{}, 5)
793+ downCh := make(chan interface{}, 5)
794+ proto := &testProtocol{up: upCh, down: downCh}
795+ lp, err := net.Listen("tcp", ":0")
796+ c.Assert(err, IsNil)
797+ defer lp.Close()
798+
799+ sess, err := NewSession(Config{}, debuglog, "wah")
800+ c.Assert(err, IsNil)
801+ sess.ServerAddr = lp.Addr().String()
802+ sess.Connection = &testConn{Name: testname()}
803+ sess.Protocolator = func(_ net.Conn) protocol.Protocol { return proto }
804+
805+ sess.Reset()
806+
807+ // wheee
808+ err = <-sess.ErrCh
809+ c.Assert(err, NotNil) // some random tcp error because
810+ // there's nobody talking to the port
811+}
812
813=== modified file 'protocol/protocol.go'
814--- protocol/protocol.go 2014-01-14 15:35:20 +0000
815+++ protocol/protocol.go 2014-01-25 02:14:54 +0000
816@@ -55,7 +55,7 @@
817 }
818
819 // NewProtocol0 creates and initialises a protocol with wire format version 0.
820-func NewProtocol0(conn net.Conn) *protocol0 {
821+func NewProtocol0(conn net.Conn) Protocol {
822 buf := bytes.NewBuffer(make([]byte, 5000))
823 return &protocol0{
824 buffer: buf,

Subscribers

People subscribed via source and target branches