Merge lp:~chipaca/ubuntu-push/no-more-session-autoredial into lp:ubuntu-push/automatic

Proposed by John Lenton
Status: Merged
Approved by: John Lenton
Approved revision: 381
Merged at revision: 375
Proposed branch: lp:~chipaca/ubuntu-push/no-more-session-autoredial
Merge into: lp:ubuntu-push/automatic
Prerequisite: lp:~chipaca/ubuntu-push/session-state-lock
Diff against target: 1169 lines (+401/-196)
4 files modified
client/client.go (+4/-26)
client/client_test.go (+30/-109)
client/session/session.go (+106/-7)
client/session/session_test.go (+261/-54)
To merge this branch: bzr merge lp:~chipaca/ubuntu-push/no-more-session-autoredial
Reviewer Review Type Date Requested Status
Samuele Pedroni Approve
Review via email: mp+252612@code.launchpad.net

Commit message

Remove AutoRedial from ClientSession's interface.

To post a comment you must log in.
378. By John Lenton

removed spurious test

379. By John Lenton

merged from lp

Revision history for this message
Samuele Pedroni (pedronis) wrote :

see comments

Revision history for this message
John Lenton (chipaca) wrote :

Agreed on both points, fixing.

380. By John Lenton

call close from stopCh handler; set state to shutdown before closing stopCh.

381. By John Lenton

repeat myself inside the stopCh handler, because it's not *exactly* doClose what I want, and there isn't yet a clear way of generalizing that function without making a mess down-pipe.

Revision history for this message
Samuele Pedroni (pedronis) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'client/client.go'
--- client/client.go 2015-03-03 10:46:04 +0000
+++ client/client.go 2015-03-17 16:47:16 +0000
@@ -115,7 +115,6 @@
115 systemImageEndp bus.Endpoint115 systemImageEndp bus.Endpoint
116 systemImageInfo *systemimage.InfoResult116 systemImageInfo *systemimage.InfoResult
117 connCh chan bool117 connCh chan bool
118 hasConnectivity bool
119 session session.ClientSession118 session session.ClientSession
120 sessionConnectedCh chan uint32119 sessionConnectedCh chan uint32
121 pushService PushService120 pushService PushService
@@ -126,7 +125,6 @@
126 poller poller.Poller125 poller poller.Poller
127 accountsCh <-chan accounts.Changed126 accountsCh <-chan accounts.Changed
128 // session-side channels127 // session-side channels
129 errCh chan error
130 broadcastCh chan *session.BroadcastNotification128 broadcastCh chan *session.BroadcastNotification
131 notificationsCh chan session.AddressedNotification129 notificationsCh chan session.AddressedNotification
132}130}
@@ -137,7 +135,6 @@
137 return &PushClient{135 return &PushClient{
138 configPath: configPath,136 configPath: configPath,
139 leveldbPath: leveldbPath,137 leveldbPath: leveldbPath,
140 errCh: make(chan error),
141 broadcastCh: make(chan *session.BroadcastNotification),138 broadcastCh: make(chan *session.BroadcastNotification),
142 notificationsCh: make(chan session.AddressedNotification),139 notificationsCh: make(chan session.AddressedNotification),
143 }140 }
@@ -212,7 +209,6 @@
212 AuthGetter: client.getAuthorization,209 AuthGetter: client.getAuthorization,
213 AuthURL: client.config.SessionURL,210 AuthURL: client.config.SessionURL,
214 AddresseeChecker: client,211 AddresseeChecker: client,
215 ErrCh: client.errCh,
216 BroadcastCh: client.broadcastCh,212 BroadcastCh: client.broadcastCh,
217 NotificationsCh: client.notificationsCh,213 NotificationsCh: client.notificationsCh,
218 }214 }
@@ -317,6 +313,7 @@
317 return err313 return err
318 }314 }
319 client.session = sess315 client.session = sess
316 sess.KeepConnection()
320 client.poller = poller.New(client.derivePollerSetup())317 client.poller = poller.New(client.derivePollerSetup())
321 return nil318 return nil
322}319}
@@ -388,24 +385,8 @@
388// handleConnState deals with connectivity events385// handleConnState deals with connectivity events
389func (client *PushClient) handleConnState(hasConnectivity bool) {386func (client *PushClient) handleConnState(hasConnectivity bool) {
390 client.log.Debugf("handleConnState: %v", hasConnectivity)387 client.log.Debugf("handleConnState: %v", hasConnectivity)
391 if client.hasConnectivity == hasConnectivity {388 client.session.HasConnectivity(hasConnectivity)
392 // nothing to do!389 client.log.Debugf("handled.")
393 return
394 }
395 client.hasConnectivity = hasConnectivity
396 client.session.Close()
397 if hasConnectivity {
398 client.session.AutoRedial(client.sessionConnectedCh)
399 }
400}
401
402// handleErr deals with the session erroring out of its loop
403func (client *PushClient) handleErr(err error) {
404 // if we're not connected, we don't really care
405 client.log.Errorf("session exited: %s", err)
406 if client.hasConnectivity {
407 client.session.AutoRedial(client.sessionConnectedCh)
408 }
409}390}
410391
411// filterBroadcastNotification finds out if the notification is about an actual392// filterBroadcastNotification finds out if the notification is about an actual
@@ -487,7 +468,7 @@
487}468}
488469
489// doLoop connects events with their handlers470// doLoop connects events with their handlers
490func (client *PushClient) doLoop(connhandler func(bool), bcasthandler func(*session.BroadcastNotification) error, ucasthandler func(session.AddressedNotification) error, errhandler func(error), unregisterhandler func(*click.AppId), accountshandler func()) {471func (client *PushClient) doLoop(connhandler func(bool), bcasthandler func(*session.BroadcastNotification) error, ucasthandler func(session.AddressedNotification) error, unregisterhandler func(*click.AppId), accountshandler func()) {
491 for {472 for {
492 select {473 select {
493 case <-client.accountsCh:474 case <-client.accountsCh:
@@ -498,8 +479,6 @@
498 bcasthandler(bcast)479 bcasthandler(bcast)
499 case aucast := <-client.notificationsCh:480 case aucast := <-client.notificationsCh:
500 ucasthandler(aucast)481 ucasthandler(aucast)
501 case err := <-client.errCh:
502 errhandler(err)
503 case count := <-client.sessionConnectedCh:482 case count := <-client.sessionConnectedCh:
504 client.log.Debugf("session connected after %d attempts", count)483 client.log.Debugf("session connected after %d attempts", count)
505 case app := <-client.unregisterCh:484 case app := <-client.unregisterCh:
@@ -524,7 +503,6 @@
524 client.doLoop(client.handleConnState,503 client.doLoop(client.handleConnState,
525 client.handleBroadcastNotification,504 client.handleBroadcastNotification,
526 client.handleUnicastNotification,505 client.handleUnicastNotification,
527 client.handleErr,
528 client.handleUnregister,506 client.handleUnregister,
529 client.handleAccountsChange,507 client.handleAccountsChange,
530 )508 )
531509
=== modified file 'client/client_test.go'
--- client/client_test.go 2015-03-03 10:46:04 +0000
+++ client/client_test.go 2015-03-17 16:47:16 +0000
@@ -43,7 +43,6 @@
43 clickhelp "launchpad.net/ubuntu-push/click/testing"43 clickhelp "launchpad.net/ubuntu-push/click/testing"
44 "launchpad.net/ubuntu-push/client/service"44 "launchpad.net/ubuntu-push/client/service"
45 "launchpad.net/ubuntu-push/client/session"45 "launchpad.net/ubuntu-push/client/session"
46 "launchpad.net/ubuntu-push/client/session/seenstate"
47 "launchpad.net/ubuntu-push/config"46 "launchpad.net/ubuntu-push/config"
48 "launchpad.net/ubuntu-push/identifier"47 "launchpad.net/ubuntu-push/identifier"
49 idtesting "launchpad.net/ubuntu-push/identifier/testing"48 idtesting "launchpad.net/ubuntu-push/identifier/testing"
@@ -427,7 +426,6 @@
427 AuthGetter: func(string) string { return "" },426 AuthGetter: func(string) string { return "" },
428 AuthURL: "xyzzy://",427 AuthURL: "xyzzy://",
429 AddresseeChecker: cli,428 AddresseeChecker: cli,
430 ErrCh: make(chan error),
431 BroadcastCh: make(chan *session.BroadcastNotification),429 BroadcastCh: make(chan *session.BroadcastNotification),
432 NotificationsCh: make(chan session.AddressedNotification),430 NotificationsCh: make(chan session.AddressedNotification),
433 }431 }
@@ -444,10 +442,8 @@
444 // compare authGetter by string442 // compare authGetter by string
445 c.Check(fmt.Sprintf("%#v", conf.AuthGetter), Equals, fmt.Sprintf("%#v", cli.getAuthorization))443 c.Check(fmt.Sprintf("%#v", conf.AuthGetter), Equals, fmt.Sprintf("%#v", cli.getAuthorization))
446 // channels are ok as long as non-nil444 // channels are ok as long as non-nil
447 conf.ErrCh = nil
448 conf.BroadcastCh = nil445 conf.BroadcastCh = nil
449 conf.NotificationsCh = nil446 conf.NotificationsCh = nil
450 expected.ErrCh = nil
451 expected.BroadcastCh = nil447 expected.BroadcastCh = nil
452 expected.NotificationsCh = nil448 expected.NotificationsCh = nil
453 // and set it to nil449 // and set it to nil
@@ -534,9 +530,11 @@
534type derivePollerSession struct{}530type derivePollerSession struct{}
535531
536func (s *derivePollerSession) Close() {}532func (s *derivePollerSession) Close() {}
537func (s *derivePollerSession) AutoRedial(ch chan uint32) {}
538func (s *derivePollerSession) ClearCookie() {}533func (s *derivePollerSession) ClearCookie() {}
539func (s *derivePollerSession) State() session.ClientSessionState { return session.Unknown }534func (s *derivePollerSession) State() session.ClientSessionState { return session.Unknown }
535func (s *derivePollerSession) HasConnectivity(bool) error { return nil }
536func (s *derivePollerSession) KeepConnection() error { return nil }
537func (s *derivePollerSession) StopKeepConnection() {}
540538
541func (cs *clientSuite) TestDerivePollerSetup(c *C) {539func (cs *clientSuite) TestDerivePollerSetup(c *C) {
542 cs.writeTestConfig(map[string]interface{}{})540 cs.writeTestConfig(map[string]interface{}{})
@@ -722,22 +720,6 @@
722}720}
723721
724/*****************************************************************722/*****************************************************************
725 handleErr tests
726******************************************************************/
727
728func (cs *clientSuite) TestHandleErr(c *C) {
729 cli := NewPushClient(cs.configPath, cs.leveldbPath)
730 cli.log = cs.log
731 cli.systemImageInfo = siInfoRes
732 c.Assert(cli.initSessionAndPoller(), IsNil)
733 cs.log.ResetCapture()
734 cli.hasConnectivity = true
735 defer cli.session.Close()
736 cli.handleErr(errors.New("bananas"))
737 c.Check(cs.log.Captured(), Matches, ".*session exited.*bananas\n")
738}
739
740/*****************************************************************
741 seenStateFactory tests723 seenStateFactory tests
742******************************************************************/724******************************************************************/
743725
@@ -758,66 +740,6 @@
758}740}
759741
760/*****************************************************************742/*****************************************************************
761 handleConnState tests
762******************************************************************/
763
764type handleConnStateSession struct {
765 connected bool
766}
767
768func (s *handleConnStateSession) AutoRedial(ch chan uint32) { s.connected = true }
769func (s *handleConnStateSession) Close() { s.connected = false }
770func (s *handleConnStateSession) ClearCookie() {}
771func (s *handleConnStateSession) State() session.ClientSessionState { return session.Unknown }
772
773func (cs *clientSuite) TestHandleConnStateD2C(c *C) {
774 cli := NewPushClient(cs.configPath, cs.leveldbPath)
775 cli.log = cs.log
776 sess := &handleConnStateSession{connected: false}
777 cli.session = sess
778
779 c.Assert(cli.hasConnectivity, Equals, false)
780 cli.handleConnState(true)
781 c.Check(cli.hasConnectivity, Equals, true)
782 c.Check(sess.connected, Equals, true)
783}
784
785func (cs *clientSuite) TestHandleConnStateSame(c *C) {
786 cli := NewPushClient(cs.configPath, cs.leveldbPath)
787 cli.log = cs.log
788 // here we want to check that we don't do anything
789 c.Assert(cli.session, IsNil)
790 c.Assert(cli.hasConnectivity, Equals, false)
791 cli.handleConnState(false)
792 c.Check(cli.session, IsNil)
793
794 cli.hasConnectivity = true
795 cli.handleConnState(true)
796 c.Check(cli.session, IsNil)
797}
798
799func (cs *clientSuite) TestHandleConnStateC2D(c *C) {
800 cli := NewPushClient(cs.configPath, cs.leveldbPath)
801 cli.log = cs.log
802 sess := &handleConnStateSession{connected: true}
803 cli.session = sess
804 cli.hasConnectivity = true
805
806 cli.handleConnState(false)
807 c.Check(sess.connected, Equals, false)
808}
809
810func (cs *clientSuite) TestHandleConnStateC2DPending(c *C) {
811 cli := NewPushClient(cs.configPath, cs.leveldbPath)
812 cli.log = cs.log
813 cli.session, _ = session.NewSession(cli.config.Addr, cli.deriveSessionConfig(nil), cli.deviceId, seenstate.NewSeenState, cs.log)
814 cli.hasConnectivity = true
815
816 cli.handleConnState(false)
817 c.Check(cli.session.State(), Equals, session.Disconnected)
818}
819
820/*****************************************************************
821 filterBroadcastNotification tests743 filterBroadcastNotification tests
822******************************************************************/744******************************************************************/
823745
@@ -1035,7 +957,6 @@
1035var nopConn = func(bool) {}957var nopConn = func(bool) {}
1036var nopBcast = func(*session.BroadcastNotification) error { return nil }958var nopBcast = func(*session.BroadcastNotification) error { return nil }
1037var nopUcast = func(session.AddressedNotification) error { return nil }959var nopUcast = func(session.AddressedNotification) error { return nil }
1038var nopError = func(error) {}
1039var nopUnregister = func(*click.AppId) {}960var nopUnregister = func(*click.AppId) {}
1040var nopAcct = func() {}961var nopAcct = func() {}
1041962
@@ -1048,7 +969,7 @@
1048 c.Assert(cli.initSessionAndPoller(), IsNil)969 c.Assert(cli.initSessionAndPoller(), IsNil)
1049970
1050 ch := make(chan bool, 1)971 ch := make(chan bool, 1)
1051 go cli.doLoop(func(bool) { ch <- true }, nopBcast, nopUcast, nopError, nopUnregister, nopAcct)972 go cli.doLoop(func(bool) { ch <- true }, nopBcast, nopUcast, nopUnregister, nopAcct)
1052 c.Check(takeNextBool(ch), Equals, true)973 c.Check(takeNextBool(ch), Equals, true)
1053}974}
1054975
@@ -1061,7 +982,7 @@
1061 cli.broadcastCh <- &session.BroadcastNotification{}982 cli.broadcastCh <- &session.BroadcastNotification{}
1062983
1063 ch := make(chan bool, 1)984 ch := make(chan bool, 1)
1064 go cli.doLoop(nopConn, func(_ *session.BroadcastNotification) error { ch <- true; return nil }, nopUcast, nopError, nopUnregister, nopAcct)985 go cli.doLoop(nopConn, func(_ *session.BroadcastNotification) error { ch <- true; return nil }, nopUcast, nopUnregister, nopAcct)
1065 c.Check(takeNextBool(ch), Equals, true)986 c.Check(takeNextBool(ch), Equals, true)
1066}987}
1067988
@@ -1074,20 +995,7 @@
1074 cli.notificationsCh <- session.AddressedNotification{}995 cli.notificationsCh <- session.AddressedNotification{}
1075996
1076 ch := make(chan bool, 1)997 ch := make(chan bool, 1)
1077 go cli.doLoop(nopConn, nopBcast, func(session.AddressedNotification) error { ch <- true; return nil }, nopError, nopUnregister, nopAcct)998 go cli.doLoop(nopConn, nopBcast, func(session.AddressedNotification) error { ch <- true; return nil }, nopUnregister, nopAcct)
1078 c.Check(takeNextBool(ch), Equals, true)
1079}
1080
1081func (cs *clientSuite) TestDoLoopErr(c *C) {
1082 cli := NewPushClient(cs.configPath, cs.leveldbPath)
1083 cli.log = cs.log
1084 cli.systemImageInfo = siInfoRes
1085 c.Assert(cli.initSessionAndPoller(), IsNil)
1086 cli.errCh = make(chan error, 1)
1087 cli.errCh <- nil
1088
1089 ch := make(chan bool, 1)
1090 go cli.doLoop(nopConn, nopBcast, nopUcast, func(error) { ch <- true }, nopUnregister, nopAcct)
1091 c.Check(takeNextBool(ch), Equals, true)999 c.Check(takeNextBool(ch), Equals, true)
1092}1000}
10931001
@@ -1100,7 +1008,7 @@
1100 cli.unregisterCh <- app11008 cli.unregisterCh <- app1
11011009
1102 ch := make(chan bool, 1)1010 ch := make(chan bool, 1)
1103 go cli.doLoop(nopConn, nopBcast, nopUcast, nopError, func(app *click.AppId) { c.Check(app.Original(), Equals, appId1); ch <- true }, nopAcct)1011 go cli.doLoop(nopConn, nopBcast, nopUcast, func(app *click.AppId) { c.Check(app.Original(), Equals, appId1); ch <- true }, nopAcct)
1104 c.Check(takeNextBool(ch), Equals, true)1012 c.Check(takeNextBool(ch), Equals, true)
1105}1013}
11061014
@@ -1114,7 +1022,7 @@
1114 cli.accountsCh = acctCh1022 cli.accountsCh = acctCh
11151023
1116 ch := make(chan bool, 1)1024 ch := make(chan bool, 1)
1117 go cli.doLoop(nopConn, nopBcast, nopUcast, nopError, nopUnregister, func() { ch <- true })1025 go cli.doLoop(nopConn, nopBcast, nopUcast, nopUnregister, func() { ch <- true })
1118 c.Check(takeNextBool(ch), Equals, true)1026 c.Check(takeNextBool(ch), Equals, true)
1119}1027}
11201028
@@ -1149,6 +1057,21 @@
1149 Loop() tests1057 Loop() tests
1150******************************************************************/1058******************************************************************/
11511059
1060type loopSession struct{ hasConn bool }
1061
1062func (s *loopSession) Close() {}
1063func (s *loopSession) ClearCookie() {}
1064func (s *loopSession) State() session.ClientSessionState {
1065 if s.hasConn {
1066 return session.Connected
1067 } else {
1068 return session.Disconnected
1069 }
1070}
1071func (s *loopSession) HasConnectivity(hasConn bool) error { s.hasConn = hasConn; return nil }
1072func (s *loopSession) KeepConnection() error { return nil }
1073func (s *loopSession) StopKeepConnection() {}
1074
1152func (cs *clientSuite) TestLoop(c *C) {1075func (cs *clientSuite) TestLoop(c *C) {
1153 cli := NewPushClient(cs.configPath, cs.leveldbPath)1076 cli := NewPushClient(cs.configPath, cs.leveldbPath)
1154 cli.connCh = make(chan bool)1077 cli.connCh = make(chan bool)
@@ -1164,7 +1087,6 @@
1164 c.Assert(cli.initSessionAndPoller(), IsNil)1087 c.Assert(cli.initSessionAndPoller(), IsNil)
11651088
1166 cli.broadcastCh = make(chan *session.BroadcastNotification)1089 cli.broadcastCh = make(chan *session.BroadcastNotification)
1167 cli.errCh = make(chan error)
11681090
1169 // we use tick() to make sure things have been through the1091 // we use tick() to make sure things have been through the
1170 // event loop at least once before looking at things;1092 // event loop at least once before looking at things;
@@ -1179,26 +1101,25 @@
1179 tick()1101 tick()
1180 c.Check(cs.log.Captured(), Matches, "(?msi).*Session connected after 42 attempts$")1102 c.Check(cs.log.Captured(), Matches, "(?msi).*Session connected after 42 attempts$")
11811103
1104 c.Assert(cli.session, NotNil)
1105 cli.session.StopKeepConnection()
1106 cli.session = &loopSession{}
1107
1182 // loop() should have connected:1108 // loop() should have connected:
1183 // * connCh to the connectivity checker1109 // * connCh to the connectivity checker
1184 c.Check(cli.hasConnectivity, Equals, false)1110 c.Check(cli.session.State(), Equals, session.Disconnected)
1185 cli.connCh <- true1111 cli.connCh <- true
1186 tick()1112 tick()
1187 c.Check(cli.hasConnectivity, Equals, true)1113 c.Check(cli.session.State(), Equals, session.Connected)
1188 cli.connCh <- false1114 cli.connCh <- false
1189 tick()1115 tick()
1190 c.Check(cli.hasConnectivity, Equals, false)1116 c.Check(cli.session.State(), Equals, session.Disconnected)
11911117
1192 // * session.BroadcastCh to the notifications handler1118 // * session.BroadcastCh to the notifications handler
1193 c.Check(d.bcastCount, Equals, 0)1119 c.Check(d.bcastCount, Equals, 0)
1194 cli.broadcastCh <- positiveBroadcastNotification1120 cli.broadcastCh <- positiveBroadcastNotification
1195 tick()1121 tick()
1196 c.Check(d.bcastCount, Equals, 1)1122 c.Check(d.bcastCount, Equals, 1)
1197
1198 // * session.ErrCh to the error handler
1199 cli.errCh <- nil
1200 tick()
1201 c.Check(cs.log.Captured(), Matches, "(?ms).*session exited.*")
1202}1123}
12031124
1204/*****************************************************************1125/*****************************************************************
12051126
=== modified file 'client/session/session.go'
--- client/session/session.go 2015-03-12 12:05:40 +0000
+++ client/session/session.go 2015-03-17 16:47:16 +0000
@@ -70,10 +70,12 @@
7070
71const (71const (
72 Error ClientSessionState = iota72 Error ClientSessionState = iota
73 Pristine
73 Disconnected74 Disconnected
74 Connected75 Connected
75 Started76 Started
76 Running77 Running
78 Shutdown
77 Unknown79 Unknown
78)80)
7981
@@ -83,10 +85,12 @@
83 }85 }
84 return [Unknown]string{86 return [Unknown]string{
85 "Error",87 "Error",
88 "Pristine",
86 "Disconnected",89 "Disconnected",
87 "Connected",90 "Connected",
88 "Started",91 "Started",
89 "Running",92 "Running",
93 "Shutdown",
90 }[s]94 }[s]
91}95}
9296
@@ -118,7 +122,6 @@
118 AuthGetter func(string) string122 AuthGetter func(string) string
119 AuthURL string123 AuthURL string
120 AddresseeChecker AddresseeChecking124 AddresseeChecker AddresseeChecking
121 ErrCh chan error
122 BroadcastCh chan *BroadcastNotification125 BroadcastCh chan *BroadcastNotification
123 NotificationsCh chan AddressedNotification126 NotificationsCh chan AddressedNotification
124}127}
@@ -126,9 +129,11 @@
126// ClientSession holds a client<->server session and its configuration.129// ClientSession holds a client<->server session and its configuration.
127type ClientSession interface {130type ClientSession interface {
128 Close()131 Close()
129 AutoRedial(doneCh chan uint32)
130 ClearCookie()132 ClearCookie()
131 State() ClientSessionState133 State() ClientSessionState
134 HasConnectivity(bool) error
135 KeepConnection() error
136 StopKeepConnection()
132}137}
133138
134type clientSession struct {139type clientSession struct {
@@ -169,6 +174,20 @@
169 redialJitter func(time.Duration) time.Duration174 redialJitter func(time.Duration) time.Duration
170 redialDelays []time.Duration175 redialDelays []time.Duration
171 redialDelaysIdx int176 redialDelaysIdx int
177 // connection events come in over here
178 connCh chan bool
179 // last seen connection event is here
180 lastConn bool
181 // connection events are handled by this
182 connHandler func(bool)
183 // autoredial goes over here (xxx spurious goroutine involved)
184 doneCh chan uint32
185 // main loop errors out through here (possibly another spurious goroutine)
186 errCh chan error
187 // main loop errors are handled by this
188 errHandler func(error)
189 // look, a stopper!
190 stopCh chan struct{}
172}191}
173192
174func redialDelay(sess *clientSession) time.Duration {193func redialDelay(sess *clientSession) time.Duration {
@@ -207,10 +226,10 @@
207 Protocolator: protocol.NewProtocol0,226 Protocolator: protocol.NewProtocol0,
208 SeenState: seenState,227 SeenState: seenState,
209 TLS: &tls.Config{},228 TLS: &tls.Config{},
210 state: Disconnected,229 state: Pristine,
211 timeSince: time.Since,230 timeSince: time.Since,
212 shouldDelayP: &shouldDelay,231 shouldDelayP: &shouldDelay,
213 redialDelay: redialDelay,232 redialDelay: redialDelay, // NOTE there are tests that use calling sess.redialDelay as an indication of calling autoRedial!
214 redialDelays: util.Timeouts(),233 redialDelays: util.Timeouts(),
215 }234 }
216 sess.redialJitter = sess.Jitter235 sess.redialJitter = sess.Jitter
@@ -222,6 +241,15 @@
222 }241 }
223 sess.TLS.RootCAs = cp242 sess.TLS.RootCAs = cp
224 }243 }
244 sess.doneCh = make(chan uint32, 1)
245 sess.stopCh = make(chan struct{})
246 sess.connCh = make(chan bool, 1)
247 sess.errCh = make(chan error, 1)
248
249 // to be overridden by tests
250 sess.connHandler = sess.handleConn
251 sess.errHandler = sess.handleErr
252
225 return sess, nil253 return sess, nil
226}254}
227255
@@ -383,7 +411,7 @@
383 }411 }
384}412}
385413
386func (sess *clientSession) AutoRedial(doneCh chan uint32) {414func (sess *clientSession) autoRedial() {
387 sess.stopRedial()415 sess.stopRedial()
388 if time.Since(sess.lastAutoRedial) < 2*time.Second {416 if time.Since(sess.lastAutoRedial) < 2*time.Second {
389 sess.setShouldDelay()417 sess.setShouldDelay()
@@ -405,7 +433,7 @@
405 return433 return
406 }434 }
407 sess.Log.Debugf("session autoredialier launching Redial goroutine")435 sess.Log.Debugf("session autoredialier launching Redial goroutine")
408 doneCh <- retrier.Redial()436 sess.doneCh <- retrier.Redial()
409 }()437 }()
410}438}
411439
@@ -657,7 +685,7 @@
657 if err == nil {685 if err == nil {
658 err = starter()686 err = starter()
659 if err == nil {687 if err == nil {
660 go func() { sess.ErrCh <- looper() }()688 go func() { sess.errCh <- looper() }()
661 }689 }
662 }690 }
663 return err691 return err
@@ -684,6 +712,77 @@
684 return sess.run(sess.doClose, sess.addAuthorization, sess.getHosts, sess.connect, sess.start, sess.loop)712 return sess.run(sess.doClose, sess.addAuthorization, sess.getHosts, sess.connect, sess.start, sess.loop)
685}713}
686714
715func (sess *clientSession) doKeepConnection() {
716Loop:
717 for {
718 select {
719 case hasConn := <-sess.connCh:
720 sess.connHandler(hasConn)
721 case <-sess.stopCh:
722 sess.Log.Infof("session shutting down.")
723 sess.connLock.Lock()
724 defer sess.connLock.Unlock()
725 sess.stopRedial()
726 if sess.Connection != nil {
727 sess.Connection.Close()
728 sess.Connection = nil
729 }
730 break Loop
731 case n := <-sess.doneCh:
732 sess.Log.Debugf("connected after %d attempts.", n)
733 case err := <-sess.errCh:
734 sess.errHandler(err)
735 }
736 }
737}
738
739func (sess *clientSession) handleConn(hasConn bool) {
740 sess.lastConn = hasConn
741
742 // Note this does not depend on the current state! That's because Dial
743 // starts with doClose, which gets you to Disconnected even if you're
744 // connected, and you can call Close when Disconnected without it
745 // losing its stuff.
746 if hasConn {
747 sess.autoRedial()
748 } else {
749 sess.Close()
750 }
751}
752
753func (sess *clientSession) handleErr(err error) {
754 sess.Log.Errorf("session error'ed out with %v", err)
755 sess.stateLock.Lock()
756 if sess.state == Disconnected && sess.lastConn {
757 sess.autoRedial()
758 }
759 sess.stateLock.Unlock()
760}
761
762func (sess *clientSession) KeepConnection() error {
763 sess.stateLock.Lock()
764 defer sess.stateLock.Unlock()
765 if sess.state != Pristine {
766 return errors.New("don't call KeepConnection() on a non-pristine session.")
767 }
768 sess.state = Disconnected
769
770 go sess.doKeepConnection()
771
772 return nil
773}
774
775func (sess *clientSession) StopKeepConnection() {
776 sess.setState(Shutdown)
777 close(sess.stopCh)
778}
779
780func (sess *clientSession) HasConnectivity(hasConn bool) error {
781 sess.connCh <- hasConn
782 // XXX throw errors if called from weird state?
783 return nil
784}
785
687func init() {786func init() {
688 rand.Seed(time.Now().Unix()) // good enough for us (we're not using it for crypto)787 rand.Seed(time.Now().Unix()) // good enough for us (we're not using it for crypto)
689}788}
690789
=== modified file 'client/session/session_test.go'
--- client/session/session_test.go 2015-02-13 11:20:41 +0000
+++ client/session/session_test.go 2015-03-17 16:47:16 +0000
@@ -190,6 +190,24 @@
190 cs.lvls = func() (seenstate.SeenState, error) { return seenstate.NewSqliteSeenState(":memory:") }190 cs.lvls = func() (seenstate.SeenState, error) { return seenstate.NewSqliteSeenState(":memory:") }
191}191}
192192
193func (cs *clientSessionSuite) TestStateString(c *C) {
194 for _, i := range []struct {
195 v ClientSessionState
196 s string
197 }{
198 {Error, "Error"},
199 {Pristine, "Pristine"},
200 {Disconnected, "Disconnected"},
201 {Connected, "Connected"},
202 {Started, "Started"},
203 {Running, "Running"},
204 {Shutdown, "Shutdown"},
205 {Unknown, fmt.Sprintf("??? (%d)", Unknown)},
206 } {
207 c.Check(i.v.String(), Equals, i.s)
208 }
209}
210
193/****************************************************************211/****************************************************************
194 parseServerAddrSpec() tests212 parseServerAddrSpec() tests
195****************************************************************/213****************************************************************/
@@ -214,7 +232,6 @@
214232
215func dummyConf() ClientSessionConfig {233func dummyConf() ClientSessionConfig {
216 return ClientSessionConfig{234 return ClientSessionConfig{
217 ErrCh: make(chan error, 1),
218 BroadcastCh: make(chan *BroadcastNotification, 5),235 BroadcastCh: make(chan *BroadcastNotification, 5),
219 NotificationsCh: make(chan AddressedNotification, 5),236 NotificationsCh: make(chan AddressedNotification, 5),
220 }237 }
@@ -231,7 +248,9 @@
231 c.Check(sess.redialDelays, DeepEquals, util.Timeouts())248 c.Check(sess.redialDelays, DeepEquals, util.Timeouts())
232 // but no root CAs set249 // but no root CAs set
233 c.Check(sess.TLS.RootCAs, IsNil)250 c.Check(sess.TLS.RootCAs, IsNil)
234 c.Check(sess.State(), Equals, Disconnected)251 c.Check(sess.State(), Equals, Pristine)
252 c.Check(sess.stopCh, NotNil)
253 c.Check(sess.connCh, NotNil)
235}254}
236255
237func (cs *clientSessionSuite) TestNewSessionHostEndpointWorks(c *C) {256func (cs *clientSessionSuite) TestNewSessionHostEndpointWorks(c *C) {
@@ -569,9 +588,9 @@
569 c.Check(ar.stopped, Equals, true)588 c.Check(ar.stopped, Equals, true)
570}589}
571590
572/****************************************************************591// /****************************************************************
573 AutoRedial() tests592// AutoRedial() tests
574****************************************************************/593// ****************************************************************/
575594
576func (cs *clientSessionSuite) TestAutoRedialWorks(c *C) {595func (cs *clientSessionSuite) TestAutoRedialWorks(c *C) {
577 // checks that AutoRedial sets up a retrier and tries redialing it596 // checks that AutoRedial sets up a retrier and tries redialing it
@@ -580,7 +599,8 @@
580 ar := new(derp)599 ar := new(derp)
581 sess.retrier = ar600 sess.retrier = ar
582 c.Check(ar.stopped, Equals, false)601 c.Check(ar.stopped, Equals, false)
583 sess.AutoRedial(nil)602 sess.autoRedial()
603 defer sess.stopRedial()
584 c.Check(ar.stopped, Equals, true)604 c.Check(ar.stopped, Equals, true)
585}605}
586606
@@ -588,20 +608,21 @@
588 // checks that AutoRedial stops the previous retrier608 // checks that AutoRedial stops the previous retrier
589 sess, err := NewSession("", dummyConf(), "wah", cs.lvls, cs.log)609 sess, err := NewSession("", dummyConf(), "wah", cs.lvls, cs.log)
590 c.Assert(err, IsNil)610 c.Assert(err, IsNil)
591 ch := make(chan uint32)611 sess.doneCh = make(chan uint32)
592 c.Check(sess.retrier, IsNil)612 c.Check(sess.retrier, IsNil)
593 sess.AutoRedial(ch)613 sess.autoRedial()
594 c.Assert(sess.retrier, NotNil)614 c.Assert(sess.retrier, NotNil)
595 sess.retrier.Stop()615 sess.retrier.Stop()
596 c.Check(<-ch, Not(Equals), 0)616 c.Check(<-sess.doneCh, Not(Equals), 0)
597}617}
598618
599func (cs *clientSessionSuite) TestAutoRedialCallsRedialDelay(c *C) {619func (cs *clientSessionSuite) TestAutoRedialCallsRedialDelay(c *C) {
620 // NOTE there are tests that use calling redialDelay as an indication of calling autoRedial!
600 sess, err := NewSession("", dummyConf(), "wah", cs.lvls, cs.log)621 sess, err := NewSession("", dummyConf(), "wah", cs.lvls, cs.log)
601 c.Assert(err, IsNil)622 c.Assert(err, IsNil)
602 flag := false623 flag := false
603 sess.redialDelay = func(sess *clientSession) time.Duration { flag = true; return 0 }624 sess.redialDelay = func(sess *clientSession) time.Duration { flag = true; return 0 }
604 sess.AutoRedial(nil)625 sess.autoRedial()
605 c.Check(flag, Equals, true)626 c.Check(flag, Equals, true)
606}627}
607628
@@ -609,11 +630,11 @@
609 sess, err := NewSession("", dummyConf(), "wah", cs.lvls, cs.log)630 sess, err := NewSession("", dummyConf(), "wah", cs.lvls, cs.log)
610 c.Assert(err, IsNil)631 c.Assert(err, IsNil)
611 sess.redialDelay = func(sess *clientSession) time.Duration { return 0 }632 sess.redialDelay = func(sess *clientSession) time.Duration { return 0 }
612 sess.AutoRedial(nil)633 sess.autoRedial()
613 c.Check(sess.ShouldDelay(), Equals, false)634 c.Check(sess.ShouldDelay(), Equals, false)
614 sess.stopRedial()635 sess.stopRedial()
615 sess.clearShouldDelay()636 sess.clearShouldDelay()
616 sess.AutoRedial(nil)637 sess.autoRedial()
617 c.Check(sess.ShouldDelay(), Equals, true)638 c.Check(sess.ShouldDelay(), Equals, true)
618}639}
619640
@@ -625,7 +646,6 @@
625 sess *clientSession646 sess *clientSession
626 upCh chan interface{}647 upCh chan interface{}
627 downCh chan interface{}648 downCh chan interface{}
628 errCh chan error
629}649}
630650
631var _ = Suite(&msgSuite{})651var _ = Suite(&msgSuite{})
@@ -637,7 +657,6 @@
637 s.sess, err = NewSession("", conf, "wah", seenstate.NewSeenState, helpers.NewTestLogger(c, "debug"))657 s.sess, err = NewSession("", conf, "wah", seenstate.NewSeenState, helpers.NewTestLogger(c, "debug"))
638 c.Assert(err, IsNil)658 c.Assert(err, IsNil)
639 s.sess.Connection = &testConn{Name: "TestHandle*"}659 s.sess.Connection = &testConn{Name: "TestHandle*"}
640 s.errCh = conf.ErrCh
641 s.upCh = make(chan interface{}, 5)660 s.upCh = make(chan interface{}, 5)
642 s.downCh = make(chan interface{}, 5)661 s.downCh = make(chan interface{}, 5)
643 s.sess.proto = &testProtocol{up: s.upCh, down: s.downCh}662 s.sess.proto = &testProtocol{up: s.upCh, down: s.downCh}
@@ -696,10 +715,10 @@
696 json.RawMessage(`{"img1/m1":[102,"tubular"]}`),715 json.RawMessage(`{"img1/m1":[102,"tubular"]}`),
697 },716 },
698 }717 }
699 go func() { s.errCh <- s.sess.handleBroadcast(msg) }()718 go func() { s.sess.errCh <- s.sess.handleBroadcast(msg) }()
700 c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})719 c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})
701 s.upCh <- nil // ack ok720 s.upCh <- nil // ack ok
702 c.Check(<-s.errCh, Equals, nil)721 c.Check(<-s.sess.errCh, Equals, nil)
703 c.Assert(len(s.sess.BroadcastCh), Equals, 1)722 c.Assert(len(s.sess.BroadcastCh), Equals, 1)
704 c.Check(<-s.sess.BroadcastCh, DeepEquals, &BroadcastNotification{723 c.Check(<-s.sess.BroadcastCh, DeepEquals, &BroadcastNotification{
705 TopLevel: 2,724 TopLevel: 2,
@@ -728,11 +747,11 @@
728 TopLevel: 2,747 TopLevel: 2,
729 Payloads: []json.RawMessage{json.RawMessage(`{"b":1}`)},748 Payloads: []json.RawMessage{json.RawMessage(`{"b":1}`)},
730 }749 }
731 go func() { s.errCh <- s.sess.handleBroadcast(msg) }()750 go func() { s.sess.errCh <- s.sess.handleBroadcast(msg) }()
732 c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})751 c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})
733 failure := errors.New("ACK ACK ACK")752 failure := errors.New("ACK ACK ACK")
734 s.upCh <- failure753 s.upCh <- failure
735 c.Assert(<-s.errCh, Equals, failure)754 c.Assert(<-s.sess.errCh, Equals, failure)
736 c.Check(s.sess.State(), Equals, Error)755 c.Check(s.sess.State(), Equals, Error)
737}756}
738757
@@ -746,10 +765,10 @@
746 TopLevel: 2,765 TopLevel: 2,
747 Payloads: []json.RawMessage{json.RawMessage(`{"b":1}`)},766 Payloads: []json.RawMessage{json.RawMessage(`{"b":1}`)},
748 }767 }
749 go func() { s.errCh <- s.sess.handleBroadcast(msg) }()768 go func() { s.sess.errCh <- s.sess.handleBroadcast(msg) }()
750 c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})769 c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})
751 s.upCh <- nil // ack ok770 s.upCh <- nil // ack ok
752 c.Check(<-s.errCh, IsNil)771 c.Check(<-s.sess.errCh, IsNil)
753 c.Check(len(s.sess.BroadcastCh), Equals, 0)772 c.Check(len(s.sess.BroadcastCh), Equals, 0)
754}773}
755774
@@ -764,10 +783,10 @@
764 TopLevel: 2,783 TopLevel: 2,
765 Payloads: []json.RawMessage{json.RawMessage(`{"b":1}`)},784 Payloads: []json.RawMessage{json.RawMessage(`{"b":1}`)},
766 }785 }
767 go func() { s.errCh <- s.sess.handleBroadcast(msg) }()786 go func() { s.sess.errCh <- s.sess.handleBroadcast(msg) }()
768 s.upCh <- nil // ack ok787 s.upCh <- nil // ack ok
769 // start returns with error788 // start returns with error
770 c.Check(<-s.errCh, Not(Equals), nil)789 c.Check(<-s.sess.errCh, Not(Equals), nil)
771 c.Check(s.sess.State(), Equals, Error)790 c.Check(s.sess.State(), Equals, Error)
772 // no message sent out791 // no message sent out
773 c.Check(len(s.sess.BroadcastCh), Equals, 0)792 c.Check(len(s.sess.BroadcastCh), Equals, 0)
@@ -780,10 +799,10 @@
780 s.sess.setShouldDelay()799 s.sess.setShouldDelay()
781800
782 msg := &serverMsg{Type: "broadcast"}801 msg := &serverMsg{Type: "broadcast"}
783 go func() { s.errCh <- s.sess.handleBroadcast(msg) }()802 go func() { s.sess.errCh <- s.sess.handleBroadcast(msg) }()
784 c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})803 c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})
785 s.upCh <- nil // ack ok804 s.upCh <- nil // ack ok
786 c.Check(<-s.errCh, IsNil)805 c.Check(<-s.sess.errCh, IsNil)
787806
788 c.Check(s.sess.ShouldDelay(), Equals, false)807 c.Check(s.sess.ShouldDelay(), Equals, false)
789}808}
@@ -792,10 +811,10 @@
792 s.sess.setShouldDelay()811 s.sess.setShouldDelay()
793812
794 msg := &serverMsg{Type: "broadcast"}813 msg := &serverMsg{Type: "broadcast"}
795 go func() { s.errCh <- s.sess.handleBroadcast(msg) }()814 go func() { s.sess.errCh <- s.sess.handleBroadcast(msg) }()
796 c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})815 c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})
797 s.upCh <- errors.New("bcast")816 s.upCh <- errors.New("bcast")
798 c.Check(<-s.errCh, NotNil)817 c.Check(<-s.sess.errCh, NotNil)
799818
800 c.Check(s.sess.ShouldDelay(), Equals, true)819 c.Check(s.sess.ShouldDelay(), Equals, true)
801}820}
@@ -845,10 +864,10 @@
845 msg.NotificationsMsg = protocol.NotificationsMsg{864 msg.NotificationsMsg = protocol.NotificationsMsg{
846 Notifications: []protocol.Notification{n1, n2},865 Notifications: []protocol.Notification{n1, n2},
847 }866 }
848 go func() { s.errCh <- s.sess.handleNotifications(msg) }()867 go func() { s.sess.errCh <- s.sess.handleNotifications(msg) }()
849 c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})868 c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})
850 s.upCh <- nil // ack ok869 s.upCh <- nil // ack ok
851 c.Check(<-s.errCh, Equals, nil)870 c.Check(<-s.sess.errCh, Equals, nil)
852 c.Check(s.sess.ShouldDelay(), Equals, false)871 c.Check(s.sess.ShouldDelay(), Equals, false)
853 c.Assert(s.sess.NotificationsCh, HasLen, 2)872 c.Assert(s.sess.NotificationsCh, HasLen, 2)
854 app1, err := click.ParseAppId("com.example.app1_app1")873 app1, err := click.ParseAppId("com.example.app1_app1")
@@ -891,10 +910,10 @@
891 msg.NotificationsMsg = protocol.NotificationsMsg{910 msg.NotificationsMsg = protocol.NotificationsMsg{
892 Notifications: []protocol.Notification{n1, n2},911 Notifications: []protocol.Notification{n1, n2},
893 }912 }
894 go func() { s.errCh <- s.sess.handleNotifications(msg) }()913 go func() { s.sess.errCh <- s.sess.handleNotifications(msg) }()
895 c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})914 c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})
896 s.upCh <- nil // ack ok915 s.upCh <- nil // ack ok
897 c.Check(<-s.errCh, Equals, nil)916 c.Check(<-s.sess.errCh, Equals, nil)
898 c.Check(s.sess.ShouldDelay(), Equals, false)917 c.Check(s.sess.ShouldDelay(), Equals, false)
899 c.Assert(s.sess.NotificationsCh, HasLen, 1)918 c.Assert(s.sess.NotificationsCh, HasLen, 1)
900 app2, err := click.ParseAppId("com.example.app2_app2")919 app2, err := click.ParseAppId("com.example.app2_app2")
@@ -926,10 +945,10 @@
926 msg.NotificationsMsg = protocol.NotificationsMsg{945 msg.NotificationsMsg = protocol.NotificationsMsg{
927 Notifications: []protocol.Notification{n1, n2},946 Notifications: []protocol.Notification{n1, n2},
928 }947 }
929 go func() { s.errCh <- s.sess.handleNotifications(msg) }()948 go func() { s.sess.errCh <- s.sess.handleNotifications(msg) }()
930 c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})949 c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})
931 s.upCh <- nil // ack ok950 s.upCh <- nil // ack ok
932 c.Check(<-s.errCh, Equals, nil)951 c.Check(<-s.sess.errCh, Equals, nil)
933 c.Assert(s.sess.NotificationsCh, HasLen, 2)952 c.Assert(s.sess.NotificationsCh, HasLen, 2)
934 app1, err := click.ParseAppId("com.example.app1_app1")953 app1, err := click.ParseAppId("com.example.app1_app1")
935 c.Assert(err, IsNil)954 c.Assert(err, IsNil)
@@ -946,10 +965,10 @@
946 c.Check(ac.ops, HasLen, 3)965 c.Check(ac.ops, HasLen, 3)
947966
948 // second time they get ignored967 // second time they get ignored
949 go func() { s.errCh <- s.sess.handleNotifications(msg) }()968 go func() { s.sess.errCh <- s.sess.handleNotifications(msg) }()
950 c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})969 c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})
951 s.upCh <- nil // ack ok970 s.upCh <- nil // ack ok
952 c.Check(<-s.errCh, Equals, nil)971 c.Check(<-s.sess.errCh, Equals, nil)
953 c.Assert(s.sess.NotificationsCh, HasLen, 0)972 c.Assert(s.sess.NotificationsCh, HasLen, 0)
954 c.Check(ac.ops, HasLen, 4)973 c.Check(ac.ops, HasLen, 4)
955}974}
@@ -966,11 +985,11 @@
966 msg.NotificationsMsg = protocol.NotificationsMsg{985 msg.NotificationsMsg = protocol.NotificationsMsg{
967 Notifications: []protocol.Notification{n1},986 Notifications: []protocol.Notification{n1},
968 }987 }
969 go func() { s.errCh <- s.sess.handleNotifications(msg) }()988 go func() { s.sess.errCh <- s.sess.handleNotifications(msg) }()
970 c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})989 c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})
971 failure := errors.New("ACK ACK ACK")990 failure := errors.New("ACK ACK ACK")
972 s.upCh <- failure991 s.upCh <- failure
973 c.Assert(<-s.errCh, Equals, failure)992 c.Assert(<-s.sess.errCh, Equals, failure)
974 c.Check(s.sess.State(), Equals, Error)993 c.Check(s.sess.State(), Equals, Error)
975 // didn't get to clear994 // didn't get to clear
976 c.Check(s.sess.ShouldDelay(), Equals, true)995 c.Check(s.sess.ShouldDelay(), Equals, true)
@@ -989,10 +1008,10 @@
989 msg.NotificationsMsg = protocol.NotificationsMsg{1008 msg.NotificationsMsg = protocol.NotificationsMsg{
990 Notifications: []protocol.Notification{n1},1009 Notifications: []protocol.Notification{n1},
991 }1010 }
992 go func() { s.errCh <- s.sess.handleNotifications(msg) }()1011 go func() { s.sess.errCh <- s.sess.handleNotifications(msg) }()
993 s.upCh <- nil // ack ok1012 s.upCh <- nil // ack ok
994 // start returns with error1013 // start returns with error
995 c.Check(<-s.errCh, Not(Equals), nil)1014 c.Check(<-s.sess.errCh, Not(Equals), nil)
996 c.Check(s.sess.State(), Equals, Error)1015 c.Check(s.sess.State(), Equals, Error)
997 // no message sent out1016 // no message sent out
998 c.Check(len(s.sess.NotificationsCh), Equals, 0)1017 c.Check(len(s.sess.NotificationsCh), Equals, 0)
@@ -1013,8 +1032,8 @@
1013 msg.ConnBrokenMsg = protocol.ConnBrokenMsg{1032 msg.ConnBrokenMsg = protocol.ConnBrokenMsg{
1014 Reason: "REASON",1033 Reason: "REASON",
1015 }1034 }
1016 go func() { s.errCh <- s.sess.handleConnBroken(msg) }()1035 go func() { s.sess.errCh <- s.sess.handleConnBroken(msg) }()
1017 c.Check(<-s.errCh, ErrorMatches, "server broke connection: REASON")1036 c.Check(<-s.sess.errCh, ErrorMatches, "server broke connection: REASON")
1018 c.Check(s.sess.State(), Equals, Error)1037 c.Check(s.sess.State(), Equals, Error)
1019}1038}
10201039
@@ -1025,8 +1044,8 @@
1025 Reason: protocol.BrokenHostMismatch,1044 Reason: protocol.BrokenHostMismatch,
1026 }1045 }
1027 s.sess.deliveryHosts = []string{"foo:443", "bar:443"}1046 s.sess.deliveryHosts = []string{"foo:443", "bar:443"}
1028 go func() { s.errCh <- s.sess.handleConnBroken(msg) }()1047 go func() { s.sess.errCh <- s.sess.handleConnBroken(msg) }()
1029 c.Check(<-s.errCh, ErrorMatches, "server broke connection: host-mismatch")1048 c.Check(<-s.sess.errCh, ErrorMatches, "server broke connection: host-mismatch")
1030 c.Check(s.sess.State(), Equals, Error)1049 c.Check(s.sess.State(), Equals, Error)
1031 // hosts were reset1050 // hosts were reset
1032 c.Check(s.sess.deliveryHosts, IsNil)1051 c.Check(s.sess.deliveryHosts, IsNil)
@@ -1044,14 +1063,14 @@
1044 (*msgSuite)(s).SetUpTest(c)1063 (*msgSuite)(s).SetUpTest(c)
1045 s.sess.Connection.(*testConn).Name = "TestLoop*"1064 s.sess.Connection.(*testConn).Name = "TestLoop*"
1046 go func() {1065 go func() {
1047 s.errCh <- s.sess.loop()1066 s.sess.errCh <- s.sess.loop()
1048 }()1067 }()
1049}1068}
10501069
1051func (s *loopSuite) TestLoopReadError(c *C) {1070func (s *loopSuite) TestLoopReadError(c *C) {
1052 c.Check(s.sess.State(), Equals, Running)1071 c.Check(s.sess.State(), Equals, Running)
1053 s.upCh <- errors.New("Read")1072 s.upCh <- errors.New("Read")
1054 err := <-s.errCh1073 err := <-s.sess.errCh
1055 c.Check(err, ErrorMatches, "Read")1074 c.Check(err, ErrorMatches, "Read")
1056 c.Check(s.sess.State(), Equals, Error)1075 c.Check(s.sess.State(), Equals, Error)
1057}1076}
@@ -1063,7 +1082,7 @@
1063 c.Check(takeNext(s.downCh), Equals, protocol.PingPongMsg{Type: "pong"})1082 c.Check(takeNext(s.downCh), Equals, protocol.PingPongMsg{Type: "pong"})
1064 failure := errors.New("pong")1083 failure := errors.New("pong")
1065 s.upCh <- failure1084 s.upCh <- failure
1066 c.Check(<-s.errCh, Equals, failure)1085 c.Check(<-s.sess.errCh, Equals, failure)
1067}1086}
10681087
1069func (s *loopSuite) TestLoopLoopsDaLoop(c *C) {1088func (s *loopSuite) TestLoopLoopsDaLoop(c *C) {
@@ -1076,7 +1095,7 @@
1076 }1095 }
1077 failure := errors.New("pong")1096 failure := errors.New("pong")
1078 s.upCh <- failure1097 s.upCh <- failure
1079 c.Check(<-s.errCh, Equals, failure)1098 c.Check(<-s.sess.errCh, Equals, failure)
1080}1099}
10811100
1082func (s *loopSuite) TestLoopBroadcast(c *C) {1101func (s *loopSuite) TestLoopBroadcast(c *C) {
@@ -1093,7 +1112,7 @@
1093 c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})1112 c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})
1094 failure := errors.New("ack")1113 failure := errors.New("ack")
1095 s.upCh <- failure1114 s.upCh <- failure
1096 c.Check(<-s.errCh, Equals, failure)1115 c.Check(<-s.sess.errCh, Equals, failure)
1097}1116}
10981117
1099func (s *loopSuite) TestLoopNotifications(c *C) {1118func (s *loopSuite) TestLoopNotifications(c *C) {
@@ -1113,7 +1132,7 @@
1113 c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})1132 c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})
1114 failure := errors.New("ack")1133 failure := errors.New("ack")
1115 s.upCh <- failure1134 s.upCh <- failure
1116 c.Check(<-s.errCh, Equals, failure)1135 c.Check(<-s.sess.errCh, Equals, failure)
1117}1136}
11181137
1119func (s *loopSuite) TestLoopSetParams(c *C) {1138func (s *loopSuite) TestLoopSetParams(c *C) {
@@ -1126,7 +1145,7 @@
1126 s.upCh <- setParams1145 s.upCh <- setParams
1127 failure := errors.New("fail")1146 failure := errors.New("fail")
1128 s.upCh <- failure1147 s.upCh <- failure
1129 c.Assert(<-s.errCh, Equals, failure)1148 c.Assert(<-s.sess.errCh, Equals, failure)
1130 c.Check(s.sess.getCookie(), Equals, "COOKIE")1149 c.Check(s.sess.getCookie(), Equals, "COOKIE")
1131}1150}
11321151
@@ -1138,7 +1157,7 @@
1138 }1157 }
1139 c.Check(takeNext(s.downCh), Equals, "deadline 1ms")1158 c.Check(takeNext(s.downCh), Equals, "deadline 1ms")
1140 s.upCh <- broken1159 s.upCh <- broken
1141 c.Check(<-s.errCh, NotNil)1160 c.Check(<-s.sess.errCh, NotNil)
1142}1161}
11431162
1144func (s *loopSuite) TestLoopConnWarn(c *C) {1163func (s *loopSuite) TestLoopConnWarn(c *C) {
@@ -1159,7 +1178,7 @@
1159 s.upCh <- warn1178 s.upCh <- warn
1160 s.upCh <- connwarn1179 s.upCh <- connwarn
1161 s.upCh <- failure1180 s.upCh <- failure
1162 c.Check(<-s.errCh, Equals, failure)1181 c.Check(<-s.sess.errCh, Equals, failure)
1163 c.Check(log.Captured(),1182 c.Check(log.Captured(),
1164 Matches, `(?ms).* warning: XXX$.*`)1183 Matches, `(?ms).* warning: XXX$.*`)
1165 c.Check(log.Captured(),1184 c.Check(log.Captured(),
@@ -1426,12 +1445,12 @@
1426 func() error { sess.BroadcastCh <- notf; return <-failureCh })1445 func() error { sess.BroadcastCh <- notf; return <-failureCh })
1427 c.Check(err, Equals, nil)1446 c.Check(err, Equals, nil)
1428 // if run doesn't error it sets up the channels1447 // if run doesn't error it sets up the channels
1429 c.Assert(sess.ErrCh, NotNil)1448 c.Assert(sess.errCh, NotNil)
1430 c.Assert(sess.BroadcastCh, NotNil)1449 c.Assert(sess.BroadcastCh, NotNil)
1431 c.Check(<-sess.BroadcastCh, Equals, notf)1450 c.Check(<-sess.BroadcastCh, Equals, notf)
1432 failure := errors.New("TestRunRunsEvenIfLoopFails")1451 failure := errors.New("TestRunRunsEvenIfLoopFails")
1433 failureCh <- failure1452 failureCh <- failure
1434 c.Check(<-sess.ErrCh, Equals, failure)1453 c.Check(<-sess.errCh, Equals, failure)
1435 // so now you know it was running in a goroutine :)1454 // so now you know it was running in a goroutine :)
1436}1455}
14371456
@@ -1632,7 +1651,7 @@
1632 c.Check(takeNext(downCh), Equals, protocol.PingPongMsg{Type: "pong"})1651 c.Check(takeNext(downCh), Equals, protocol.PingPongMsg{Type: "pong"})
1633 failure := errors.New("pongs")1652 failure := errors.New("pongs")
1634 upCh <- failure1653 upCh <- failure
1635 c.Check(<-sess.ErrCh, Equals, failure)1654 c.Check(<-sess.errCh, Equals, failure)
1636}1655}
16371656
1638func (cs *clientSessionSuite) TestDialWorksDirect(c *C) {1657func (cs *clientSessionSuite) TestDialWorksDirect(c *C) {
@@ -1703,3 +1722,191 @@
1703 sess.ClearCookie()1722 sess.ClearCookie()
1704 c.Check(sess.getCookie(), Equals, "")1723 c.Check(sess.getCookie(), Equals, "")
1705}1724}
1725
1726/****************************************************************
1727 KeepConnection() (and related) tests
1728****************************************************************/
1729
1730func (cs *clientSessionSuite) TestKeepConnectionDoesNothingIfNotConnected(c *C) {
1731 // how do you test "does nothing?"
1732 sess, err := NewSession("foo:443", dummyConf(), "", cs.lvls, cs.log)
1733 c.Assert(err, IsNil)
1734 c.Assert(sess, NotNil)
1735 c.Assert(sess.State(), Equals, Pristine)
1736 c.Assert(sess.KeepConnection(), IsNil)
1737 // stopCh is meant to be used just for closing it, but abusing
1738 // it for testing seems the right thing to do: this ensures
1739 // the thing is ticking along before we check the state of
1740 // stuff.
1741 sess.stopCh <- struct{}{}
1742 c.Check(sess.State(), Equals, Disconnected)
1743}
1744
1745func (cs *clientSessionSuite) TestYouCantCallKeepConnectionTwice(c *C) {
1746 sess, err := NewSession("foo:443", dummyConf(), "", cs.lvls, cs.log)
1747 c.Assert(err, IsNil)
1748 c.Assert(sess, NotNil)
1749 c.Assert(sess.State(), Equals, Pristine)
1750 c.Assert(sess.KeepConnection(), IsNil)
1751 defer sess.StopKeepConnection()
1752 c.Check(sess.KeepConnection(), NotNil)
1753}
1754
1755func (cs *clientSessionSuite) TestStopKeepConnectionShutsdown(c *C) {
1756 sess, err := NewSession("foo:443", dummyConf(), "", cs.lvls, cs.log)
1757 c.Assert(err, IsNil)
1758 c.Assert(sess, NotNil)
1759 sess.StopKeepConnection()
1760 c.Check(sess.State(), Equals, Shutdown)
1761}
1762
1763func (cs *clientSessionSuite) TestHasConnectivityTriggersConnectivityHandler(c *C) {
1764 sess, err := NewSession("foo:443", dummyConf(), "", cs.lvls, cs.log)
1765 c.Assert(err, IsNil)
1766 c.Assert(sess, NotNil)
1767 testCh := make(chan bool)
1768 sess.connHandler = func(p bool) { testCh <- p }
1769 go sess.doKeepConnection()
1770 defer sess.StopKeepConnection()
1771 sess.HasConnectivity(true)
1772 c.Check(<-testCh, Equals, true)
1773 sess.HasConnectivity(false)
1774 c.Check(<-testCh, Equals, false)
1775}
1776
1777func (cs *clientSessionSuite) TestDoneChIsEmptiedAndLogged(c *C) {
1778 sess, err := NewSession("", dummyConf(), "wah", cs.lvls, cs.log)
1779 c.Assert(err, IsNil)
1780 sess.doneCh = make(chan uint32) // unbuffered
1781
1782 sess.KeepConnection()
1783 defer sess.StopKeepConnection()
1784
1785 sess.doneCh <- 23
1786
1787 c.Check(cs.log.Captured(),
1788 Matches, `(?ms).* connected after 23 attempts\.`)
1789}
1790
1791func (cs *clientSessionSuite) TestErrChIsEmptiedAndLoggedAndAutoRedial(c *C) {
1792 sess, err := NewSession("", dummyConf(), "wah", cs.lvls, cs.log)
1793 c.Assert(err, IsNil)
1794 ch := make(chan struct{}, 1)
1795 sess.errCh = make(chan error) // unbuffered
1796 sess.redialDelay = func(sess *clientSession) time.Duration { ch <- struct{}{}; return 0 }
1797 sess.lastConn = true // -> autoRedial, if the session is in Disconnected
1798
1799 sess.KeepConnection()
1800 defer sess.StopKeepConnection()
1801
1802 sess.errCh <- errors.New("potato")
1803 c.Assert(sess.State(), Equals, Disconnected)
1804 select {
1805 case <-ch:
1806 // all ok
1807 case <-time.After(100 * time.Millisecond):
1808 c.Fatalf("redialDelay not called (-> autoRedial not called)?")
1809 }
1810
1811 c.Check(cs.log.Captured(),
1812 Matches, `(?ms).* session error.*potato`)
1813}
1814
1815func (cs *clientSessionSuite) TestErrChIsEmptiedAndLoggedNoAutoRedial(c *C) {
1816 sess, err := NewSession("", dummyConf(), "wah", cs.lvls, cs.log)
1817 c.Assert(err, IsNil)
1818 ch := make(chan struct{}, 1)
1819 sess.errCh = make(chan error) // unbuffered
1820 sess.redialDelay = func(sess *clientSession) time.Duration { ch <- struct{}{}; return 0 }
1821 sess.connHandler = func(bool) {}
1822 sess.lastConn = false // so, no autoredial
1823
1824 sess.KeepConnection()
1825 defer sess.StopKeepConnection()
1826
1827 sess.errCh <- errors.New("potato")
1828 c.Assert(sess.State(), Equals, Disconnected)
1829 select {
1830 case <-ch:
1831 c.Fatalf("redialDelay called (-> autoRedial called) when disconnected?")
1832 case <-time.After(100 * time.Millisecond):
1833 // all ok
1834 }
1835
1836 c.Check(cs.log.Captured(),
1837 Matches, `(?ms).* session error.*potato`)
1838}
1839
1840func (cs *clientSessionSuite) TestHandleConnConnFromConnected(c *C) {
1841 sess, err := NewSession("", dummyConf(), "wah", cs.lvls, cs.log)
1842 c.Assert(err, IsNil)
1843 ch := make(chan struct{}, 1)
1844 sess.redialDelay = func(sess *clientSession) time.Duration { ch <- struct{}{}; return 0 }
1845 sess.state = Connected
1846 sess.lastConn = true
1847 sess.handleConn(true)
1848 c.Check(sess.lastConn, Equals, true)
1849
1850 select {
1851 case <-ch:
1852 // all ok
1853 case <-time.After(100 * time.Millisecond):
1854 c.Fatalf("redialDelay not called (-> autoRedial not called)?")
1855 }
1856}
1857
1858func (cs *clientSessionSuite) TestHandleConnConnFromDisconnected(c *C) {
1859 sess, err := NewSession("", dummyConf(), "wah", cs.lvls, cs.log)
1860 c.Assert(err, IsNil)
1861 ch := make(chan struct{}, 1)
1862 sess.redialDelay = func(sess *clientSession) time.Duration { ch <- struct{}{}; return 0 }
1863 sess.state = Disconnected
1864 sess.lastConn = false
1865 sess.handleConn(true)
1866 c.Check(sess.lastConn, Equals, true)
1867
1868 select {
1869 case <-ch:
1870 // all ok
1871 case <-time.After(100 * time.Millisecond):
1872 c.Fatalf("redialDelay not called (-> autoRedial not called)?")
1873 }
1874}
1875
1876func (cs *clientSessionSuite) TestHandleConnNotConnFromDisconnected(c *C) {
1877 sess, err := NewSession("", dummyConf(), "wah", cs.lvls, cs.log)
1878 c.Assert(err, IsNil)
1879 ch := make(chan struct{}, 1)
1880 sess.redialDelay = func(sess *clientSession) time.Duration { ch <- struct{}{}; return 0 }
1881 sess.state = Disconnected
1882 sess.lastConn = false
1883 sess.handleConn(false)
1884 c.Check(sess.lastConn, Equals, false)
1885
1886 select {
1887 case <-ch:
1888 c.Fatalf("redialDelay called (-> autoRedial called)?")
1889 case <-time.After(100 * time.Millisecond):
1890 // all ok
1891 }
1892 c.Check(cs.log.Captured(), Matches, `(?ms).*-> Disconnected`)
1893}
1894
1895func (cs *clientSessionSuite) TestHandleConnNotConnFromConnected(c *C) {
1896 sess, err := NewSession("", dummyConf(), "wah", cs.lvls, cs.log)
1897 c.Assert(err, IsNil)
1898 ch := make(chan struct{}, 1)
1899 sess.redialDelay = func(sess *clientSession) time.Duration { ch <- struct{}{}; return 0 }
1900 sess.state = Connected
1901 sess.lastConn = true
1902 sess.handleConn(false)
1903 c.Check(sess.lastConn, Equals, false)
1904
1905 select {
1906 case <-ch:
1907 c.Fatalf("redialDelay called (-> autoRedial called)?")
1908 case <-time.After(100 * time.Millisecond):
1909 // all ok
1910 }
1911 c.Check(cs.log.Captured(), Matches, `(?ms).*-> Disconnected`)
1912}

Subscribers

People subscribed via source and target branches