Merge lp:~chipaca/ubuntu-push/client-v0 into lp:ubuntu-push
- client-v0
- Merge into trunk
Proposed by
John Lenton
Status: | Rejected |
---|---|
Rejected by: | John Lenton |
Proposed branch: | lp:~chipaca/ubuntu-push/client-v0 |
Merge into: | lp:ubuntu-push |
Prerequisite: | lp:~chipaca/ubuntu-push/redialer |
Diff against target: |
824 lines (+801/-1) 3 files modified
client/session/session.go (+214/-0) client/session/session_test.go (+586/-0) protocol/protocol.go (+1/-1) |
To merge this branch: | bzr merge lp:~chipaca/ubuntu-push/client-v0 |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Samuele Pedroni | Needs Fixing | ||
Review via email:
|
Commit message
The client (session), v0.
Description of the change
The client (session), v0.
There is a lot of commonality with some of the testing bits of protocol and
server/session; later, let's see if we can refactor.
To post a comment you must log in.
- 25. By John Lenton
-
a few cleanups
- 26. By John Lenton
-
Moving it into client/session because that's what it is.
Revision history for this message
![](/+icing/build/overlay/assets/skins/sam/images/close.gif)
Samuele Pedroni (pedronis) wrote : | # |
run can probably be split even more:
- in start/setup
- get next message
- acting
- reply
Unmerged revisions
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === added directory 'client' |
2 | === added directory 'client/session' |
3 | === added file 'client/session/session.go' |
4 | --- client/session/session.go 1970-01-01 00:00:00 +0000 |
5 | +++ client/session/session.go 2014-01-25 02:14:54 +0000 |
6 | @@ -0,0 +1,214 @@ |
7 | +/* |
8 | + Copyright 2013-2014 Canonical Ltd. |
9 | + |
10 | + This program is free software: you can redistribute it and/or modify it |
11 | + under the terms of the GNU General Public License version 3, as published |
12 | + by the Free Software Foundation. |
13 | + |
14 | + This program is distributed in the hope that it will be useful, but |
15 | + WITHOUT ANY WARRANTY; without even the implied warranties of |
16 | + MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR |
17 | + PURPOSE. See the GNU General Public License for more details. |
18 | + |
19 | + You should have received a copy of the GNU General Public License along |
20 | + with this program. If not, see <http://www.gnu.org/licenses/>. |
21 | +*/ |
22 | + |
23 | +package session |
24 | + |
25 | +import ( |
26 | + "crypto/tls" |
27 | + "crypto/x509" |
28 | + "errors" |
29 | + "io/ioutil" |
30 | + "launchpad.net/ubuntu-push/config" |
31 | + "launchpad.net/ubuntu-push/logger" |
32 | + "launchpad.net/ubuntu-push/protocol" |
33 | + "net" |
34 | + "time" |
35 | +) |
36 | + |
37 | +var wireVersionBytes = []byte{protocol.ProtocolWireVersion} |
38 | + |
39 | +type Notification struct { |
40 | + // something something something |
41 | +} |
42 | + |
43 | +type LevelMap interface { |
44 | + Set(level string, top int64) |
45 | + GetAll() map[string]int64 |
46 | +} |
47 | + |
48 | +type mapLevelMap map[string]int64 |
49 | + |
50 | +func (m *mapLevelMap) Set(level string, top int64) { |
51 | + (*m)[level] = top |
52 | +} |
53 | +func (m *mapLevelMap) GetAll() map[string]int64 { |
54 | + return map[string]int64(*m) |
55 | +} |
56 | + |
57 | +var _ LevelMap = &mapLevelMap{} |
58 | + |
59 | +type Config struct { |
60 | + // session configuration |
61 | + ExchangeTimeout config.ConfigTimeDuration `json:"exchange_timeout"` |
62 | + // server connection config |
63 | + Addr config.ConfigHostPort |
64 | + CertPEMFile string `json:"cert_pem_file"` |
65 | +} |
66 | + |
67 | +// ClienSession holds a client<->server session and its configuration. |
68 | +type ClientSession struct { |
69 | + // configuration |
70 | + DeviceId string |
71 | + ServerAddr string |
72 | + ExchangeTimeout time.Duration |
73 | + Levels LevelMap |
74 | + // connection |
75 | + Connection net.Conn |
76 | + Protocolator func(net.Conn) protocol.Protocol |
77 | + Log logger.Logger |
78 | + TLS *tls.Config |
79 | + // status |
80 | + ErrCh chan error |
81 | + MsgCh chan *Notification |
82 | +} |
83 | + |
84 | +func NewSession(config Config, log logger.Logger, deviceId string) (*ClientSession, error) { |
85 | + sess := &ClientSession{ |
86 | + ExchangeTimeout: config.ExchangeTimeout.TimeDuration(), |
87 | + ServerAddr: config.Addr.HostPort(), |
88 | + DeviceId: deviceId, |
89 | + Log: log, |
90 | + Protocolator: protocol.NewProtocol0, |
91 | + Levels: &mapLevelMap{}, |
92 | + TLS: &tls.Config{InsecureSkipVerify: true}, // XXX |
93 | + } |
94 | + if config.CertPEMFile != "" { |
95 | + cert, err := ioutil.ReadFile(config.CertPEMFile) |
96 | + if err != nil { |
97 | + return nil, err |
98 | + } |
99 | + cp := x509.NewCertPool() |
100 | + ok := cp.AppendCertsFromPEM(cert) |
101 | + if !ok { |
102 | + return nil, errors.New("dial: could not parse certificate") |
103 | + } |
104 | + sess.TLS.RootCAs = cp |
105 | + } |
106 | + return sess, nil |
107 | +} |
108 | + |
109 | +// Dial connects to a server using the configuration in the ClientSession |
110 | +// and sets up the connection. |
111 | +func (sess *ClientSession) Dial() error { |
112 | + conn, err := net.DialTimeout("tcp", sess.ServerAddr, sess.ExchangeTimeout) |
113 | + if err != nil { |
114 | + return err |
115 | + } |
116 | + sess.Connection = tls.Client(conn, sess.TLS) |
117 | + return nil |
118 | +} |
119 | + |
120 | +type serverMsg struct { |
121 | + Type string `json:"T"` |
122 | + protocol.BroadcastMsg |
123 | + protocol.NotificationsMsg |
124 | +} |
125 | + |
126 | +func (sess *ClientSession) Reset() error { |
127 | + if sess.Protocolator == nil { |
128 | + return errors.New("Can't Reset() without a protocol constructor.") |
129 | + } |
130 | + if sess.Connection != nil { |
131 | + sess.Connection.Close() // just in case |
132 | + } |
133 | + err := sess.Dial() |
134 | + if err != nil { |
135 | + sess.Log.Errorf("%s", err) |
136 | + return err |
137 | + } |
138 | + sess.ErrCh = make(chan error, 1) |
139 | + sess.MsgCh = make(chan *Notification) |
140 | + sess.Run() |
141 | + return nil |
142 | +} |
143 | + |
144 | +func (sess *ClientSession) Run() { |
145 | + go func() { sess.ErrCh <- sess.run() }() |
146 | +} |
147 | + |
148 | +// Run the session with the server, emits a stream of events. |
149 | +func (sess *ClientSession) run() error { |
150 | + conn := sess.Connection |
151 | + if conn == nil { |
152 | + return errors.New("Can't run() disconnected.") |
153 | + } |
154 | + if sess.Protocolator == nil { |
155 | + return errors.New("Can't run() without a protocol constructor.") |
156 | + } |
157 | + defer conn.Close() |
158 | + err := conn.SetDeadline(time.Now().Add(sess.ExchangeTimeout)) |
159 | + if err != nil { |
160 | + return err |
161 | + } |
162 | + _, err = conn.Write(wireVersionBytes) |
163 | + // The Writer docs: Write must return a non-nil error if it returns |
164 | + // n < len(p). So, no need to check number of bytes written, hooray. |
165 | + if err != nil { |
166 | + return err |
167 | + } |
168 | + proto := sess.Protocolator(conn) |
169 | + err = proto.WriteMessage(protocol.ConnectMsg{ |
170 | + Type: "connect", |
171 | + DeviceId: sess.DeviceId, |
172 | + Levels: sess.Levels.GetAll(), |
173 | + }) |
174 | + if err != nil { |
175 | + return err |
176 | + } |
177 | + var connAck protocol.ConnAckMsg |
178 | + err = proto.ReadMessage(&connAck) |
179 | + if err != nil { |
180 | + return err |
181 | + } |
182 | + pingInterval, err := time.ParseDuration(connAck.Params.PingInterval) |
183 | + if err != nil { |
184 | + return err |
185 | + } |
186 | + sess.Log.Debugf("Connected %v.", conn.LocalAddr()) |
187 | + var recv serverMsg |
188 | + for { |
189 | + deadAfter := pingInterval + sess.ExchangeTimeout |
190 | + conn.SetDeadline(time.Now().Add(deadAfter)) |
191 | + err = proto.ReadMessage(&recv) |
192 | + if err != nil { |
193 | + return err |
194 | + } |
195 | + switch recv.Type { |
196 | + case "ping": |
197 | + conn.SetDeadline(time.Now().Add(sess.ExchangeTimeout)) |
198 | + err := proto.WriteMessage(protocol.PingPongMsg{Type: "pong"}) |
199 | + if err != nil { |
200 | + return err |
201 | + } |
202 | + sess.Log.Debugf("Ping.") |
203 | + case "broadcast": |
204 | + conn.SetDeadline(time.Now().Add(sess.ExchangeTimeout)) |
205 | + err := proto.WriteMessage(protocol.PingPongMsg{Type: "ack"}) |
206 | + if err != nil { |
207 | + return err |
208 | + } |
209 | + sess.Log.Debugf("broadcast chan:%v app:%v topLevel:%d payloads:%s", |
210 | + recv.ChanId, recv.AppId, recv.TopLevel, recv.Payloads) |
211 | + if recv.ChanId == protocol.SystemChannelId { |
212 | + // the system channel id, the only one we care about for now |
213 | + sess.Levels.Set(recv.ChanId, recv.TopLevel) |
214 | + sess.MsgCh <- &Notification{} |
215 | + } else { |
216 | + sess.Log.Debugf("What is this weird channel, %s?", recv.ChanId) |
217 | + } |
218 | + } |
219 | + } |
220 | +} |
221 | |
222 | === added file 'client/session/session_test.go' |
223 | --- client/session/session_test.go 1970-01-01 00:00:00 +0000 |
224 | +++ client/session/session_test.go 2014-01-25 02:14:54 +0000 |
225 | @@ -0,0 +1,586 @@ |
226 | +/* |
227 | + Copyright 2013-2014 Canonical Ltd. |
228 | + |
229 | + This program is free software: you can redistribute it and/or modify it |
230 | + under the terms of the GNU General Public License version 3, as published |
231 | + by the Free Software Foundation. |
232 | + |
233 | + This program is distributed in the hope that it will be useful, but |
234 | + WITHOUT ANY WARRANTY; without even the implied warranties of |
235 | + MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR |
236 | + PURPOSE. See the GNU General Public License for more details. |
237 | + |
238 | + You should have received a copy of the GNU General Public License along |
239 | + with this program. If not, see <http://www.gnu.org/licenses/>. |
240 | +*/ |
241 | + |
242 | +package session |
243 | + |
244 | +import ( |
245 | + "encoding/json" |
246 | + "errors" |
247 | + "fmt" |
248 | + "io" |
249 | + "io/ioutil" |
250 | + . "launchpad.net/gocheck" |
251 | + "launchpad.net/ubuntu-push/logger" |
252 | + "launchpad.net/ubuntu-push/protocol" |
253 | + helpers "launchpad.net/ubuntu-push/testing" |
254 | + "launchpad.net/ubuntu-push/testing/condition" |
255 | + "net" |
256 | + "os" |
257 | + "reflect" |
258 | + "runtime" |
259 | + "strings" |
260 | + "testing" |
261 | + "time" |
262 | +) |
263 | + |
264 | +func TestSession(t *testing.T) { TestingT(t) } |
265 | + |
266 | +type clientSessionSuite struct{} |
267 | + |
268 | +var nullog = logger.NewSimpleLogger(ioutil.Discard, "error") |
269 | +var debuglog = logger.NewSimpleLogger(os.Stderr, "debug") |
270 | +var _ = Suite(&clientSessionSuite{}) |
271 | + |
272 | +/**************************************************************** |
273 | + NewSession() tests |
274 | +****************************************************************/ |
275 | + |
276 | +func (cs *clientSessionSuite) TestNewSessionPlainWorks(c *C) { |
277 | + cfg := Config{} |
278 | + sess, err := NewSession(cfg, nullog, "wah") |
279 | + c.Check(sess, NotNil) |
280 | + c.Check(err, IsNil) |
281 | +} |
282 | + |
283 | +var certfile string = helpers.SourceRelative("../../server/acceptance/config/testing.cert") |
284 | + |
285 | +func (cs *clientSessionSuite) TestNewSessionPEMWorks(c *C) { |
286 | + cfg := Config{CertPEMFile: certfile} |
287 | + sess, err := NewSession(cfg, nullog, "wah") |
288 | + c.Check(sess, NotNil) |
289 | + c.Assert(err, IsNil) |
290 | + c.Check(sess.TLS.RootCAs, NotNil) |
291 | +} |
292 | + |
293 | +func (cs *clientSessionSuite) TestNewSessionBadPEMFilePathFails(c *C) { |
294 | + cfg := Config{CertPEMFile: "/no/such/path"} |
295 | + sess, err := NewSession(cfg, nullog, "wah") |
296 | + c.Check(sess, IsNil) |
297 | + c.Check(err, NotNil) |
298 | +} |
299 | + |
300 | +func (cs *clientSessionSuite) TestNewSessionBadPEMFileContentFails(c *C) { |
301 | + cfg := Config{CertPEMFile: "/etc/passwd"} |
302 | + sess, err := NewSession(cfg, nullog, "wah") |
303 | + c.Check(sess, IsNil) |
304 | + c.Check(err, NotNil) |
305 | +} |
306 | + |
307 | +/**************************************************************** |
308 | + Run() tests |
309 | +****************************************************************/ |
310 | + |
311 | +func testname() string { |
312 | + pcs := make([]uintptr, 200) |
313 | + runtime.Callers(0, pcs) |
314 | + testname := "<unknown>" |
315 | + for _, pc := range pcs { |
316 | + me := runtime.FuncForPC(pc) |
317 | + if me == nil { |
318 | + break |
319 | + } |
320 | + parts := strings.Split(me.Name(), ".") |
321 | + funcname := parts[len(parts)-1] |
322 | + if strings.HasPrefix(funcname, "Test") { |
323 | + testname = funcname |
324 | + } |
325 | + } |
326 | + return testname |
327 | +} |
328 | + |
329 | +type xAddr string |
330 | + |
331 | +func (x xAddr) Network() string { return "<:>" } |
332 | +func (x xAddr) String() string { return string(x) } |
333 | + |
334 | +// testConn (roughly based on the one in protocol_test) |
335 | + |
336 | +type testConn struct { |
337 | + Name string |
338 | + Deadlines []time.Duration |
339 | + Writes [][]byte |
340 | + WriteCondition condition.Interface |
341 | + DeadlineCondition condition.Interface |
342 | +} |
343 | + |
344 | +func (tc *testConn) LocalAddr() net.Addr { return xAddr(tc.Name) } |
345 | + |
346 | +func (tc *testConn) RemoteAddr() net.Addr { return xAddr(tc.Name) } |
347 | + |
348 | +func (tc *testConn) Close() error { return nil } |
349 | + |
350 | +func (tc *testConn) SetDeadline(t time.Time) error { |
351 | + tc.Deadlines = append(tc.Deadlines, t.Sub(time.Now())) |
352 | + if tc.DeadlineCondition == nil || tc.DeadlineCondition.OK() { |
353 | + return nil |
354 | + } else { |
355 | + return errors.New("deadliner on fire") |
356 | + } |
357 | +} |
358 | + |
359 | +func (tc *testConn) SetReadDeadline(t time.Time) error { panic("NIH"); return nil } |
360 | +func (tc *testConn) SetWriteDeadline(t time.Time) error { panic("NIH"); return nil } |
361 | +func (tc *testConn) Read(buf []byte) (n int, err error) { panic("NIH"); return -1, nil } |
362 | + |
363 | +func (tc *testConn) Write(buf []byte) (int, error) { |
364 | + store := make([]byte, len(buf)) |
365 | + copy(store, buf) |
366 | + tc.Writes = append(tc.Writes, store) |
367 | + if tc.WriteCondition == nil || tc.WriteCondition.OK() { |
368 | + return len(store), nil |
369 | + } else { |
370 | + return -1, errors.New("writer on fire") |
371 | + } |
372 | +} |
373 | + |
374 | +// test protocol (from session_test) |
375 | + |
376 | +type testProtocol struct { |
377 | + up chan interface{} |
378 | + down chan interface{} |
379 | +} |
380 | + |
381 | +// takeNext takes a value from given channel with a 5s timeout |
382 | +func takeNext(ch <-chan interface{}) interface{} { |
383 | + select { |
384 | + case <-time.After(5 * time.Second): |
385 | + panic("test protocol exchange stuck: too long waiting") |
386 | + case v := <-ch: |
387 | + return v |
388 | + } |
389 | + return nil |
390 | +} |
391 | + |
392 | +func (c *testProtocol) SetDeadline(t time.Time) { |
393 | + deadAfter := t.Sub(time.Now()) |
394 | + deadAfter = (deadAfter + time.Millisecond/2) / time.Millisecond * time.Millisecond |
395 | + c.down <- fmt.Sprintf("deadline %v", deadAfter) |
396 | +} |
397 | + |
398 | +func (c *testProtocol) ReadMessage(dest interface{}) error { |
399 | + switch v := takeNext(c.up).(type) { |
400 | + case error: |
401 | + return v |
402 | + default: |
403 | + // make sure JSON.Unmarshal works with dest |
404 | + var marshalledMsg []byte |
405 | + marshalledMsg, err := json.Marshal(v) |
406 | + if err != nil { |
407 | + return fmt.Errorf("can't jsonify test value %v: %s", v, err) |
408 | + } |
409 | + return json.Unmarshal(marshalledMsg, dest) |
410 | + } |
411 | + return nil |
412 | +} |
413 | + |
414 | +func (c *testProtocol) WriteMessage(src interface{}) error { |
415 | + // make sure JSON.Marshal works with src |
416 | + _, err := json.Marshal(src) |
417 | + if err != nil { |
418 | + return err |
419 | + } |
420 | + val := reflect.ValueOf(src) |
421 | + if val.Kind() == reflect.Ptr { |
422 | + src = val.Elem().Interface() |
423 | + } |
424 | + c.down <- src |
425 | + switch v := takeNext(c.up).(type) { |
426 | + case error: |
427 | + return v |
428 | + } |
429 | + return nil |
430 | +} |
431 | + |
432 | +/**************************************************************** |
433 | + * |
434 | + * Go way down to the bottom if you want to see the full, working case. |
435 | + * This has a rather slow buildup. |
436 | + * |
437 | + * TODO: check deadlines |
438 | + * |
439 | + ****************************************************************/ |
440 | + |
441 | +func (cs *clientSessionSuite) TestRunFailsIfNilConnection(c *C) { |
442 | + sess, err := NewSession(Config{}, debuglog, "wah") |
443 | + c.Assert(err, IsNil) |
444 | + // not connected! |
445 | + err = sess.run() |
446 | + c.Assert(err, NotNil) |
447 | + c.Check(err.Error(), Matches, ".*disconnected.*") |
448 | +} |
449 | + |
450 | +func (cs *clientSessionSuite) TestRunFailsIfNilProtocolator(c *C) { |
451 | + sess, err := NewSession(Config{}, debuglog, "wah") |
452 | + c.Assert(err, IsNil) |
453 | + sess.Connection = &testConn{Name: testname()} // ok, have a constructor |
454 | + sess.Protocolator = nil // but no protocol, seeficare. |
455 | + err = sess.run() |
456 | + c.Assert(err, NotNil) |
457 | + c.Check(err.Error(), Matches, ".*protocol constructor.*") |
458 | +} |
459 | + |
460 | +func (cs *clientSessionSuite) TestRunFailsIfSetDeadlineFails(c *C) { |
461 | + sess, err := NewSession(Config{}, debuglog, "wah") |
462 | + c.Assert(err, IsNil) |
463 | + sess.Connection = &testConn{Name: testname(), |
464 | + DeadlineCondition: condition.Work(false)} // setdeadline will fail |
465 | + err = sess.run() |
466 | + c.Assert(err, NotNil) |
467 | + c.Check(err.Error(), Matches, ".*deadline.*") |
468 | +} |
469 | + |
470 | +func (cs *clientSessionSuite) TestRunFailsIfWriteFails(c *C) { |
471 | + sess, err := NewSession(Config{}, debuglog, "wah") |
472 | + c.Assert(err, IsNil) |
473 | + sess.Connection = &testConn{Name: testname(), |
474 | + WriteCondition: condition.Work(false)} // write will fail |
475 | + err = sess.run() |
476 | + c.Assert(err, NotNil) |
477 | + c.Check(err.Error(), Matches, ".*write.*") |
478 | +} |
479 | + |
480 | +func (cs *clientSessionSuite) TestRunConnectMessageFails(c *C) { |
481 | + sess, err := NewSession(Config{}, debuglog, "wah") |
482 | + c.Assert(err, IsNil) |
483 | + sess.Connection = &testConn{Name: testname()} |
484 | + errCh := make(chan error, 1) |
485 | + upCh := make(chan interface{}, 5) |
486 | + downCh := make(chan interface{}, 5) |
487 | + proto := &testProtocol{up: upCh, down: downCh} |
488 | + sess.Protocolator = func(_ net.Conn) protocol.Protocol { return proto } |
489 | + |
490 | + go func() { |
491 | + errCh <- sess.run() |
492 | + }() |
493 | + |
494 | + c.Check(takeNext(downCh), DeepEquals, protocol.ConnectMsg{ |
495 | + Type: "connect", |
496 | + DeviceId: sess.DeviceId, |
497 | + Levels: map[string]int64{}, |
498 | + }) |
499 | + upCh <- errors.New("Overflow error in /dev/null") |
500 | + err = <-errCh |
501 | + c.Assert(err, NotNil) |
502 | + c.Check(err.Error(), Matches, "Overflow.*null") |
503 | +} |
504 | + |
505 | +func (cs *clientSessionSuite) TestRunConnackReadError(c *C) { |
506 | + sess, err := NewSession(Config{}, debuglog, "wah") |
507 | + c.Assert(err, IsNil) |
508 | + sess.Connection = &testConn{Name: testname()} |
509 | + errCh := make(chan error, 1) |
510 | + upCh := make(chan interface{}, 5) |
511 | + downCh := make(chan interface{}, 5) |
512 | + proto := &testProtocol{up: upCh, down: downCh} |
513 | + sess.Protocolator = func(_ net.Conn) protocol.Protocol { return proto } |
514 | + |
515 | + go func() { |
516 | + errCh <- sess.run() |
517 | + }() |
518 | + |
519 | + takeNext(downCh) // connectMsg |
520 | + upCh <- nil // no error |
521 | + upCh <- io.EOF |
522 | + err = <-errCh |
523 | + c.Assert(err, NotNil) |
524 | + c.Check(err.Error(), Matches, ".*EOF.*") |
525 | +} |
526 | + |
527 | +func (cs *clientSessionSuite) TestRunBadConnack(c *C) { |
528 | + sess, err := NewSession(Config{}, debuglog, "wah") |
529 | + c.Assert(err, IsNil) |
530 | + sess.Connection = &testConn{Name: testname()} |
531 | + errCh := make(chan error, 1) |
532 | + upCh := make(chan interface{}, 5) |
533 | + downCh := make(chan interface{}, 5) |
534 | + proto := &testProtocol{up: upCh, down: downCh} |
535 | + sess.Protocolator = func(_ net.Conn) protocol.Protocol { return proto } |
536 | + |
537 | + go func() { |
538 | + errCh <- sess.run() |
539 | + }() |
540 | + |
541 | + takeNext(downCh) // connectMsg |
542 | + upCh <- nil // no error |
543 | + upCh <- protocol.ConnAckMsg{} |
544 | + err = <-errCh |
545 | + c.Assert(err, NotNil) |
546 | + c.Check(err.Error(), Matches, ".*invalid.*") |
547 | +} |
548 | + |
549 | +func (cs *clientSessionSuite) TestRunMainloopReadError(c *C) { |
550 | + sess, err := NewSession(Config{}, debuglog, "wah") |
551 | + c.Assert(err, IsNil) |
552 | + sess.Connection = &testConn{Name: testname()} |
553 | + errCh := make(chan error, 1) |
554 | + upCh := make(chan interface{}, 5) |
555 | + downCh := make(chan interface{}, 5) |
556 | + proto := &testProtocol{up: upCh, down: downCh} |
557 | + sess.Protocolator = func(_ net.Conn) protocol.Protocol { return proto } |
558 | + |
559 | + go func() { |
560 | + errCh <- sess.run() |
561 | + }() |
562 | + |
563 | + takeNext(downCh) // connectMsg |
564 | + upCh <- nil // no error |
565 | + upCh <- protocol.ConnAckMsg{ |
566 | + Type: "connack", |
567 | + Params: protocol.ConnAckParams{(10 * time.Millisecond).String()}, |
568 | + } |
569 | + // in the mainloop! |
570 | + upCh <- errors.New("Read") |
571 | + err = <-errCh |
572 | + c.Assert(err, NotNil) |
573 | + c.Check(err.Error(), Equals, "Read") |
574 | +} |
575 | + |
576 | +func (cs *clientSessionSuite) TestRunPongWriteError(c *C) { |
577 | + sess, err := NewSession(Config{}, debuglog, "wah") |
578 | + c.Assert(err, IsNil) |
579 | + sess.Connection = &testConn{Name: testname()} |
580 | + errCh := make(chan error, 1) |
581 | + upCh := make(chan interface{}, 5) |
582 | + downCh := make(chan interface{}, 5) |
583 | + proto := &testProtocol{up: upCh, down: downCh} |
584 | + sess.Protocolator = func(_ net.Conn) protocol.Protocol { return proto } |
585 | + |
586 | + go func() { |
587 | + errCh <- sess.run() |
588 | + }() |
589 | + |
590 | + takeNext(downCh) // connectMsg |
591 | + upCh <- nil // no error |
592 | + upCh <- protocol.ConnAckMsg{ |
593 | + Type: "connack", |
594 | + Params: protocol.ConnAckParams{(10 * time.Millisecond).String()}, |
595 | + } |
596 | + // in the mainloop! |
597 | + upCh <- protocol.PingPongMsg{Type: "ping"} |
598 | + c.Check(takeNext(downCh), Equals, protocol.PingPongMsg{Type: "pong"}) |
599 | + upCh <- errors.New("Pong") |
600 | + err = <-errCh |
601 | + c.Assert(err, NotNil) |
602 | + c.Check(err.Error(), Equals, "Pong") |
603 | +} |
604 | + |
605 | +func (cs *clientSessionSuite) TestRunPingPong(c *C) { |
606 | + sess, err := NewSession(Config{}, debuglog, "wah") |
607 | + c.Assert(err, IsNil) |
608 | + sess.Connection = &testConn{Name: testname()} |
609 | + errCh := make(chan error, 1) |
610 | + upCh := make(chan interface{}, 5) |
611 | + downCh := make(chan interface{}, 5) |
612 | + proto := &testProtocol{up: upCh, down: downCh} |
613 | + sess.Protocolator = func(_ net.Conn) protocol.Protocol { return proto } |
614 | + |
615 | + go func() { |
616 | + errCh <- sess.run() |
617 | + }() |
618 | + |
619 | + takeNext(downCh) // connectMsg |
620 | + upCh <- nil // no error |
621 | + upCh <- protocol.ConnAckMsg{ |
622 | + Type: "connack", |
623 | + Params: protocol.ConnAckParams{(10 * time.Millisecond).String()}, |
624 | + } |
625 | + // in the mainloop! |
626 | + upCh <- protocol.PingPongMsg{Type: "ping"} |
627 | + c.Check(takeNext(downCh), Equals, protocol.PingPongMsg{Type: "pong"}) |
628 | + upCh <- nil // pong ok |
629 | + upCh <- io.EOF // close it down |
630 | + err = <-errCh |
631 | +} |
632 | + |
633 | +func (cs *clientSessionSuite) TestRunBadAckWrite(c *C) { |
634 | + sess, err := NewSession(Config{}, debuglog, "wah") |
635 | + c.Assert(err, IsNil) |
636 | + sess.Connection = &testConn{Name: testname()} |
637 | + errCh := make(chan error, 1) |
638 | + upCh := make(chan interface{}, 5) |
639 | + downCh := make(chan interface{}, 5) |
640 | + proto := &testProtocol{up: upCh, down: downCh} |
641 | + sess.Protocolator = func(_ net.Conn) protocol.Protocol { return proto } |
642 | + sess.MsgCh = make(chan *Notification, 5) |
643 | + |
644 | + go func() { |
645 | + errCh <- sess.run() |
646 | + }() |
647 | + |
648 | + takeNext(downCh) // connectMsg |
649 | + upCh <- nil // no error |
650 | + upCh <- protocol.ConnAckMsg{ |
651 | + Type: "connack", |
652 | + Params: protocol.ConnAckParams{time.Second.String()}, |
653 | + } |
654 | + // in the mainloop! |
655 | + |
656 | + b := &protocol.BroadcastMsg{ |
657 | + Type: "broadcast", |
658 | + AppId: "APP", |
659 | + ChanId: "0", |
660 | + TopLevel: 2, |
661 | + Payloads: []json.RawMessage{json.RawMessage(`{"b":1}`)}, |
662 | + } |
663 | + upCh <- b |
664 | + c.Check(takeNext(downCh), Equals, protocol.PingPongMsg{Type: "ack"}) |
665 | + upCh <- errors.New("ACK ACK ACK") |
666 | + err = <-errCh |
667 | + c.Assert(err, NotNil) |
668 | + c.Check(err.Error(), Equals, "ACK ACK ACK") |
669 | +} |
670 | + |
671 | +func (cs *clientSessionSuite) TestRunBroadcastWrongChannel(c *C) { |
672 | + sess, err := NewSession(Config{}, debuglog, "wah") |
673 | + c.Assert(err, IsNil) |
674 | + sess.Connection = &testConn{Name: testname()} |
675 | + errCh := make(chan error, 1) |
676 | + upCh := make(chan interface{}, 5) |
677 | + downCh := make(chan interface{}, 5) |
678 | + proto := &testProtocol{up: upCh, down: downCh} |
679 | + sess.Protocolator = func(_ net.Conn) protocol.Protocol { return proto } |
680 | + sess.MsgCh = make(chan *Notification, 5) |
681 | + |
682 | + go func() { |
683 | + errCh <- sess.run() |
684 | + }() |
685 | + |
686 | + takeNext(downCh) // connectMsg |
687 | + upCh <- nil // no error |
688 | + upCh <- protocol.ConnAckMsg{ |
689 | + Type: "connack", |
690 | + Params: protocol.ConnAckParams{time.Second.String()}, |
691 | + } |
692 | + // in the mainloop! |
693 | + |
694 | + b := &protocol.BroadcastMsg{ |
695 | + Type: "broadcast", |
696 | + AppId: "APP", |
697 | + ChanId: "42", |
698 | + TopLevel: 2, |
699 | + Payloads: []json.RawMessage{json.RawMessage(`{"b":1}`)}, |
700 | + } |
701 | + upCh <- b |
702 | + c.Check(takeNext(downCh), Equals, protocol.PingPongMsg{Type: "ack"}) |
703 | + upCh <- nil // ack ok |
704 | + upCh <- io.EOF // close it down |
705 | + err = <-errCh |
706 | + c.Check(len(sess.MsgCh), Equals, 0) |
707 | +} |
708 | + |
709 | +func (cs *clientSessionSuite) TestRunBroadcastRightChannel(c *C) { |
710 | + sess, err := NewSession(Config{}, debuglog, "wah") |
711 | + c.Assert(err, IsNil) |
712 | + sess.Connection = &testConn{Name: testname()} |
713 | + sess.ErrCh = make(chan error, 1) |
714 | + upCh := make(chan interface{}, 5) |
715 | + downCh := make(chan interface{}, 5) |
716 | + proto := &testProtocol{up: upCh, down: downCh} |
717 | + sess.Protocolator = func(_ net.Conn) protocol.Protocol { return proto } |
718 | + sess.MsgCh = make(chan *Notification, 5) |
719 | + |
720 | + sess.Run() |
721 | + |
722 | + takeNext(downCh) // connectMsg |
723 | + upCh <- nil // no error |
724 | + upCh <- protocol.ConnAckMsg{ |
725 | + Type: "connack", |
726 | + Params: protocol.ConnAckParams{time.Second.String()}, |
727 | + } |
728 | + // in the mainloop! |
729 | + |
730 | + b := &protocol.BroadcastMsg{ |
731 | + Type: "broadcast", |
732 | + AppId: "--ignored--", |
733 | + ChanId: "0", |
734 | + TopLevel: 2, |
735 | + Payloads: []json.RawMessage{json.RawMessage(`{"b":1}`)}, |
736 | + } |
737 | + upCh <- b |
738 | + c.Check(takeNext(downCh), Equals, protocol.PingPongMsg{Type: "ack"}) |
739 | + upCh <- nil // ack ok |
740 | + upCh <- io.EOF // close it down |
741 | + err = <-sess.ErrCh |
742 | + c.Assert(len(sess.MsgCh), Equals, 1) |
743 | + c.Check(<-sess.MsgCh, Equals, &Notification{}) |
744 | + // and finally, the session keeps track of the levels |
745 | + c.Check(sess.Levels.GetAll(), DeepEquals, map[string]int64{"0": 2}) |
746 | +} |
747 | + |
748 | +/* |
749 | + * |
750 | + * |
751 | + * |
752 | + * breathe in... |
753 | + */ |
754 | + |
755 | +func (cs *clientSessionSuite) TestDialFailsWithNoAddress(c *C) { |
756 | + sess, err := NewSession(Config{}, debuglog, "wah") |
757 | + c.Assert(err, IsNil) |
758 | + err = sess.Dial() |
759 | + c.Assert(err, NotNil) |
760 | + c.Check(err.Error(), Matches, ".*dial.*address.*") |
761 | +} |
762 | + |
763 | +func (cs *clientSessionSuite) TestDialConnects(c *C) { |
764 | + lp, err := net.Listen("tcp", ":0") |
765 | + c.Assert(err, IsNil) |
766 | + defer lp.Close() |
767 | + sess, err := NewSession(Config{}, debuglog, "wah") |
768 | + c.Assert(err, IsNil) |
769 | + sess.ServerAddr = lp.Addr().String() |
770 | + err = sess.Dial() |
771 | + c.Check(err, IsNil) |
772 | + c.Check(sess.Connection, NotNil) |
773 | +} |
774 | + |
775 | +func (cs *clientSessionSuite) TestResetFailsWithoutProtocolator(c *C) { |
776 | + sess, _ := NewSession(Config{}, debuglog, "wah") |
777 | + sess.Protocolator = nil |
778 | + err := sess.Reset() |
779 | + c.Assert(err, NotNil) |
780 | + c.Check(err.Error(), Matches, ".*protocol constructor\\.") |
781 | +} |
782 | + |
783 | +func (cs *clientSessionSuite) TestResetFailsWithNoAddress(c *C) { |
784 | + sess, err := NewSession(Config{}, debuglog, "wah") |
785 | + c.Assert(err, IsNil) |
786 | + err = sess.Reset() |
787 | + c.Assert(err, NotNil) |
788 | + c.Check(err.Error(), Matches, ".*dial.*address.*") |
789 | +} |
790 | + |
791 | +func (cs *clientSessionSuite) TestResets(c *C) { |
792 | + upCh := make(chan interface{}, 5) |
793 | + downCh := make(chan interface{}, 5) |
794 | + proto := &testProtocol{up: upCh, down: downCh} |
795 | + lp, err := net.Listen("tcp", ":0") |
796 | + c.Assert(err, IsNil) |
797 | + defer lp.Close() |
798 | + |
799 | + sess, err := NewSession(Config{}, debuglog, "wah") |
800 | + c.Assert(err, IsNil) |
801 | + sess.ServerAddr = lp.Addr().String() |
802 | + sess.Connection = &testConn{Name: testname()} |
803 | + sess.Protocolator = func(_ net.Conn) protocol.Protocol { return proto } |
804 | + |
805 | + sess.Reset() |
806 | + |
807 | + // wheee |
808 | + err = <-sess.ErrCh |
809 | + c.Assert(err, NotNil) // some random tcp error because |
810 | + // there's nobody talking to the port |
811 | +} |
812 | |
813 | === modified file 'protocol/protocol.go' |
814 | --- protocol/protocol.go 2014-01-14 15:35:20 +0000 |
815 | +++ protocol/protocol.go 2014-01-25 02:14:54 +0000 |
816 | @@ -55,7 +55,7 @@ |
817 | } |
818 | |
819 | // NewProtocol0 creates and initialises a protocol with wire format version 0. |
820 | -func NewProtocol0(conn net.Conn) *protocol0 { |
821 | +func NewProtocol0(conn net.Conn) Protocol { |
822 | buf := bytes.NewBuffer(make([]byte, 5000)) |
823 | return &protocol0{ |
824 | buffer: buf, |
I think it would be saner for testing if Config were an interface rather than the concrete type; I don't think it's the session's job to read PEM files.
The same goes for run: it should be split into a start and a loop so that tests can poke at simpler bits.