Merge lp:~chipaca/ubuntu-push/client-v0-p11 into lp:ubuntu-push

Proposed by John Lenton
Status: Merged
Approved by: John Lenton
Approved revision: 60
Merged at revision: 51
Proposed branch: lp:~chipaca/ubuntu-push/client-v0-p11
Merge into: lp:ubuntu-push
Prerequisite: lp:~chipaca/ubuntu-push/client-v0-p9
Diff against target: 387 lines (+118/-18)
3 files modified
client/client.go (+31/-6)
client/client_test.go (+79/-12)
client/session/session.go (+8/-0)
To merge this branch: bzr merge lp:~chipaca/ubuntu-push/client-v0-p11
Reviewer | Review Type | Date Requested | Status
Samuele Pedroni | | | Approve
Review via email: mp+204696@code.launchpad.net

Commit message

Several things:
* features:
  * Client.doLoop, the puppet master.
* fixes and cleanups:
  * added log_level to client config
  * added the mysterious sessionRetryCh, used in doLoop to avoid a rather common starvation scenario.
  * found a way not to panic out in initSession (not that it's much better)
  * unified logging in the client tests a bit
  * added logging to session's start error states.

Description of the change

Several things:
* features:
  * Client.doLoop, the puppet master.
* fixes and cleanups:
  * added log_level to client config
  * added the mysterious sessionRetryCh, used in doLoop to avoid a rather common starvation scenario.
  * found a way not to panic out in initSession (not that it's much better)
  * unified logging in the client tests a bit
  * added logging to session's start error states.

To post a comment you must log in.
Revision history for this message
Samuele Pedroni (pedronis) wrote :

ok but land with todo markers

=== modified file 'client/client.go'
--- client/client.go 2014-02-04 13:10:18 +0000
+++ client/client.go 2014-02-04 17:34:01 +0000
@@ -142,6 +142,7 @@

 // connectSession kicks off the session connection dance
 func (client *Client) connectSession() {
+ // xxx lp:1276199
  if client.sessionRetrierStopper != nil {
   client.sessionRetrierStopper <- true
   client.sessionRetrierStopper = nil
@@ -156,6 +157,7 @@

 // disconnectSession disconnects the session
 func (client *Client) disconnectSession() {
+ // xxx lp:1276199
  if client.sessionRetrierStopper != nil {
   client.sessionRetrierStopper <- true

Revision history for this message
Samuele Pedroni (pedronis) wrote :

// ClientConfig holds the client configuration
type ClientConfig struct {
        connectivity.ConnectivityConfig // q.v.
        // A reasonably large maximum ping time

that's probably more: A reasonably large timeout for receive/answer pairs

Revision history for this message
Samuele Pedroni (pedronis) wrote :

ok, but see bug/markers above and as discussed

review: Approve
60. By John Lenton

added XXX comments, and fixed ExchangeTimeout docstring, as per pedronis suggestions

Revision history for this message
John Lenton (chipaca) wrote :

Done. Also added auto-highlighting of XXX comments to my .emacs... :-)

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'client/client.go'
--- client/client.go 2014-02-04 18:19:28 +0000
+++ client/client.go 2014-02-04 18:19:28 +0000
@@ -39,12 +39,14 @@
39// ClientConfig holds the client configuration39// ClientConfig holds the client configuration
40type ClientConfig struct {40type ClientConfig struct {
41 connectivity.ConnectivityConfig // q.v.41 connectivity.ConnectivityConfig // q.v.
42 // A reasonably large maximum ping time42 // A reasonably large timeout for receive/answer pairs
43 ExchangeTimeout config.ConfigTimeDuration `json:"exchange_timeout"`43 ExchangeTimeout config.ConfigTimeDuration `json:"exchange_timeout"`
44 // The server to connect to44 // The server to connect to
45 Addr config.ConfigHostPort45 Addr config.ConfigHostPort
46 // The PEM-encoded server certificate46 // The PEM-encoded server certificate
47 CertPEMFile string `json:"cert_pem_file"`47 CertPEMFile string `json:"cert_pem_file"`
48 // The logging level (one of "debug", "info", "error")
49 LogLevel string `json:"log_level"`
48}50}
4951
50// Client is the Ubuntu Push Notifications client-side daemon.52// Client is the Ubuntu Push Notifications client-side daemon.
@@ -62,6 +64,7 @@
62 actionsCh <-chan notifications.RawActionReply64 actionsCh <-chan notifications.RawActionReply
63 session *session.ClientSession65 session *session.ClientSession
64 sessionRetrierStopper chan bool66 sessionRetrierStopper chan bool
67 sessionRetryCh chan uint32
65}68}
6669
67// Configure loads the configuration specified in configPath, and sets it up.70// Configure loads the configuration specified in configPath, and sets it up.
@@ -74,8 +77,8 @@
74 if err != nil {77 if err != nil {
75 return fmt.Errorf("reading config: %v", err)78 return fmt.Errorf("reading config: %v", err)
76 }79 }
77 // later, we'll be specifying logging options in the config file80 // later, we'll be specifying more logging options in the config file
78 client.log = logger.NewSimpleLogger(os.Stderr, "error")81 client.log = logger.NewSimpleLogger(os.Stderr, client.config.LogLevel)
7982
80 // overridden for testing83 // overridden for testing
81 client.idder = identifier.New()84 client.idder = identifier.New()
@@ -84,6 +87,7 @@
84 client.connectivityEndp = bus.SystemBus.Endpoint(networkmanager.BusAddress, client.log)87 client.connectivityEndp = bus.SystemBus.Endpoint(networkmanager.BusAddress, client.log)
8588
86 client.connCh = make(chan bool)89 client.connCh = make(chan bool)
90 client.sessionRetryCh = make(chan uint32)
8791
88 if client.config.CertPEMFile != "" {92 if client.config.CertPEMFile != "" {
89 client.pem, err = ioutil.ReadFile(client.config.CertPEMFile)93 client.pem, err = ioutil.ReadFile(client.config.CertPEMFile)
@@ -126,17 +130,19 @@
126}130}
127131
128// initSession creates the session object132// initSession creates the session object
129func (client *Client) initSession() {133func (client *Client) initSession() error {
130 sess, err := session.NewSession(string(client.config.Addr), client.pem,134 sess, err := session.NewSession(string(client.config.Addr), client.pem,
131 client.config.ExchangeTimeout.Duration, client.deviceId, client.log)135 client.config.ExchangeTimeout.Duration, client.deviceId, client.log)
132 if err != nil {136 if err != nil {
133 panic("Don't know how to handle session creation failure.")137 return err
134 }138 }
135 client.session = sess139 client.session = sess
140 return nil
136}141}
137142
138// connectSession kicks off the session connection dance143// connectSession kicks off the session connection dance
139func (client *Client) connectSession() {144func (client *Client) connectSession() {
145 // XXX: lp:1276199
140 if client.sessionRetrierStopper != nil {146 if client.sessionRetrierStopper != nil {
141 client.sessionRetrierStopper <- true147 client.sessionRetrierStopper <- true
142 client.sessionRetrierStopper = nil148 client.sessionRetrierStopper = nil
@@ -146,11 +152,12 @@
146 client.session.Dial,152 client.session.Dial,
147 util.Jitter}153 util.Jitter}
148 client.sessionRetrierStopper = ar.Stop154 client.sessionRetrierStopper = ar.Stop
149 go ar.Retry()155 go func() { client.sessionRetryCh <- ar.Retry() }()
150}156}
151157
152// disconnectSession disconnects the session158// disconnectSession disconnects the session
153func (client *Client) disconnectSession() {159func (client *Client) disconnectSession() {
160 // XXX: lp:1276199
154 if client.sessionRetrierStopper != nil {161 if client.sessionRetrierStopper != nil {
155 client.sessionRetrierStopper <- true162 client.sessionRetrierStopper <- true
156 client.sessionRetrierStopper = nil163 client.sessionRetrierStopper = nil
@@ -212,3 +219,21 @@
212 urld := urldispatcher.New(client.urlDispatcherEndp, client.log)219 urld := urldispatcher.New(client.urlDispatcherEndp, client.log)
213 return urld.DispatchURL("settings:///system/system-update")220 return urld.DispatchURL("settings:///system/system-update")
214}221}
222
223// doLoop connects events with their handlers
224func (client *Client) doLoop(connhandler func(bool), clickhandler, notifhandler func() error, errhandler func(error)) {
225 for {
226 select {
227 case state := <-client.connCh:
228 connhandler(state)
229 case <-client.actionsCh:
230 clickhandler()
231 case <-client.session.MsgCh:
232 notifhandler()
233 case err := <-client.session.ErrCh:
234 errhandler(err)
235 case count := <-client.sessionRetryCh:
236 client.log.Debugf("Session connected after %d attempts", count)
237 }
238 }
239}
215240
=== modified file 'client/client_test.go'
--- client/client_test.go 2014-02-04 18:19:28 +0000
+++ client/client_test.go 2014-02-04 18:19:28 +0000
@@ -18,10 +18,12 @@
1818
19import (19import (
20 "bytes"20 "bytes"
21 "errors"
21 "fmt"22 "fmt"
22 "io/ioutil"23 "io/ioutil"
23 . "launchpad.net/gocheck"24 . "launchpad.net/gocheck"
24 "launchpad.net/ubuntu-push/bus/networkmanager"25 "launchpad.net/ubuntu-push/bus/networkmanager"
26 "launchpad.net/ubuntu-push/bus/notifications"
25 testibus "launchpad.net/ubuntu-push/bus/testing"27 testibus "launchpad.net/ubuntu-push/bus/testing"
26 "launchpad.net/ubuntu-push/client/session"28 "launchpad.net/ubuntu-push/client/session"
27 "launchpad.net/ubuntu-push/logger"29 "launchpad.net/ubuntu-push/logger"
@@ -57,6 +59,7 @@
5759
58var nullog = logger.NewSimpleLogger(ioutil.Discard, "error")60var nullog = logger.NewSimpleLogger(ioutil.Discard, "error")
59var noisylog = logger.NewSimpleLogger(os.Stderr, "debug")61var noisylog = logger.NewSimpleLogger(os.Stderr, "debug")
62var debuglog = noisylog
60var _ = Suite(&clientSuite{})63var _ = Suite(&clientSuite{})
6164
62const (65const (
@@ -82,6 +85,7 @@
82}85}
8386
84func (cs *clientSuite) SetUpTest(c *C) {87func (cs *clientSuite) SetUpTest(c *C) {
88 debuglog.Debugf("---")
85 dir := c.MkDir()89 dir := c.MkDir()
86 cs.configPath = filepath.Join(dir, "config")90 cs.configPath = filepath.Join(dir, "config")
87 cfg := fmt.Sprintf(`91 cfg := fmt.Sprintf(`
@@ -92,7 +96,8 @@
92 "connectivity_check_md5": "",96 "connectivity_check_md5": "",
93 "addr": ":0",97 "addr": ":0",
94 "cert_pem_file": %#v,98 "cert_pem_file": %#v,
95 "recheck_timeout": "3h"99 "recheck_timeout": "3h",
100 "log_level": "debug"
96}`, helpers.SourceRelative("../server/acceptance/config/testing.cert"))101}`, helpers.SourceRelative("../server/acceptance/config/testing.cert"))
97 ioutil.WriteFile(cs.configPath, []byte(cfg), 0600)102 ioutil.WriteFile(cs.configPath, []byte(cfg), 0600)
98}103}
@@ -205,6 +210,7 @@
205210
206func (cs *clientSuite) TestGetDeviceIdWorks(c *C) {211func (cs *clientSuite) TestGetDeviceIdWorks(c *C) {
207 cli := new(Client)212 cli := new(Client)
213 cli.log = debuglog
208 cli.idder = identifier.New()214 cli.idder = identifier.New()
209 c.Check(cli.deviceId, Equals, "")215 c.Check(cli.deviceId, Equals, "")
210 c.Check(cli.getDeviceId(), IsNil)216 c.Check(cli.getDeviceId(), IsNil)
@@ -213,6 +219,7 @@
213219
214func (cs *clientSuite) TestGetDeviceIdCanFail(c *C) {220func (cs *clientSuite) TestGetDeviceIdCanFail(c *C) {
215 cli := new(Client)221 cli := new(Client)
222 cli.log = debuglog
216 cli.idder = idtesting.Failing()223 cli.idder = idtesting.Failing()
217 c.Check(cli.deviceId, Equals, "")224 c.Check(cli.deviceId, Equals, "")
218 c.Check(cli.getDeviceId(), NotNil)225 c.Check(cli.getDeviceId(), NotNil)
@@ -237,8 +244,10 @@
237 cEndp := testibus.NewTestingEndpoint(cCond, condition.Work(true),244 cEndp := testibus.NewTestingEndpoint(cCond, condition.Work(true),
238 uint32(networkmanager.ConnectedGlobal),245 uint32(networkmanager.ConnectedGlobal),
239 )246 )
247 testibus.SetWatchTicker(cEndp, make(chan bool))
240 // ok, create the thing248 // ok, create the thing
241 cli := new(Client)249 cli := new(Client)
250 cli.log = debuglog
242 err := cli.Configure(cs.configPath)251 err := cli.Configure(cs.configPath)
243 c.Assert(err, IsNil)252 c.Assert(err, IsNil)
244 // the user actions channel has not been set up253 // the user actions channel has not been set up
@@ -267,6 +276,7 @@
267func (cs *clientSuite) TestTakeTheBusCanFail(c *C) {276func (cs *clientSuite) TestTakeTheBusCanFail(c *C) {
268 cli := new(Client)277 cli := new(Client)
269 err := cli.Configure(cs.configPath)278 err := cli.Configure(cs.configPath)
279 cli.log = debuglog
270 c.Assert(err, IsNil)280 c.Assert(err, IsNil)
271 // the user actions channel has not been set up281 // the user actions channel has not been set up
272 c.Check(cli.actionsCh, IsNil)282 c.Check(cli.actionsCh, IsNil)
@@ -285,15 +295,13 @@
285******************************************************************/295******************************************************************/
286296
287func (cs *clientSuite) TestHandleErr(c *C) {297func (cs *clientSuite) TestHandleErr(c *C) {
298 buf := &bytes.Buffer{}
288 cli := new(Client)299 cli := new(Client)
289 cli.log = noisylog300 cli.log = logger.NewSimpleLogger(buf, "debug")
290 cli.initSession()301 cli.initSession()
291 cli.hasConnectivity = true302 cli.hasConnectivity = true
292 cli.handleErr(nil)303 cli.handleErr(errors.New("bananas"))
293 c.Assert(cli.session, NotNil)304 c.Check(buf.String(), Matches, ".*session exited.*bananas\n")
294 // let the session connection fail
295 time.Sleep(100 * time.Millisecond)
296 c.Check(cli.session.State(), Equals, session.Error)
297}305}
298306
299/*****************************************************************307/*****************************************************************
@@ -302,19 +310,23 @@
302310
303func (cs *clientSuite) TestHandleConnStateD2C(c *C) {311func (cs *clientSuite) TestHandleConnStateD2C(c *C) {
304 cli := new(Client)312 cli := new(Client)
313 cli.log = debuglog
305 cli.initSession()314 cli.initSession()
306 // let's pretend the client had a previous attempt at connecting still pending315 // let's pretend the client had a previous attempt at connecting still pending
307 // (hard to trigger in real life, but possible)316 // (hard to trigger in real life, but possible)
308 cli.sessionRetrierStopper = make(chan bool, 1)317 ch := make(chan bool, 1)
318 cli.sessionRetrierStopper = ch
309319
310 c.Assert(cli.hasConnectivity, Equals, false)320 c.Assert(cli.hasConnectivity, Equals, false)
311 cli.handleConnState(true)321 cli.handleConnState(true)
322 c.Check(len(ch), Equals, 1)
312 c.Check(cli.hasConnectivity, Equals, true)323 c.Check(cli.hasConnectivity, Equals, true)
313 c.Assert(cli.session, NotNil)324 c.Assert(cli.session, NotNil)
314}325}
315326
316func (cs *clientSuite) TestHandleConnStateSame(c *C) {327func (cs *clientSuite) TestHandleConnStateSame(c *C) {
317 cli := new(Client)328 cli := new(Client)
329 cli.log = debuglog
318 // here we want to check that we don't do anything330 // here we want to check that we don't do anything
319 c.Assert(cli.session, IsNil)331 c.Assert(cli.session, IsNil)
320 c.Assert(cli.hasConnectivity, Equals, false)332 c.Assert(cli.hasConnectivity, Equals, false)
@@ -328,7 +340,8 @@
328340
329func (cs *clientSuite) TestHandleConnStateC2D(c *C) {341func (cs *clientSuite) TestHandleConnStateC2D(c *C) {
330 cli := new(Client)342 cli := new(Client)
331 cli.session, _ = session.NewSession(string(cli.config.Addr), cli.pem, cli.config.ExchangeTimeout.Duration, cli.deviceId, noisylog)343 cli.log = debuglog
344 cli.session, _ = session.NewSession(string(cli.config.Addr), cli.pem, cli.config.ExchangeTimeout.Duration, cli.deviceId, debuglog)
332 cli.session.Dial()345 cli.session.Dial()
333 cli.hasConnectivity = true346 cli.hasConnectivity = true
334347
@@ -340,7 +353,8 @@
340353
341func (cs *clientSuite) TestHandleConnStateC2DPending(c *C) {354func (cs *clientSuite) TestHandleConnStateC2DPending(c *C) {
342 cli := new(Client)355 cli := new(Client)
343 cli.session, _ = session.NewSession(string(cli.config.Addr), cli.pem, cli.config.ExchangeTimeout.Duration, cli.deviceId, noisylog)356 cli.log = debuglog
357 cli.session, _ = session.NewSession(string(cli.config.Addr), cli.pem, cli.config.ExchangeTimeout.Duration, cli.deviceId, debuglog)
344 cli.sessionRetrierStopper = make(chan bool, 1)358 cli.sessionRetrierStopper = make(chan bool, 1)
345 cli.hasConnectivity = true359 cli.hasConnectivity = true
346360
@@ -369,9 +383,9 @@
369383
370func (cs *clientSuite) TestHandleNotificationFail(c *C) {384func (cs *clientSuite) TestHandleNotificationFail(c *C) {
371 cli := new(Client)385 cli := new(Client)
386 cli.log = debuglog
372 endp := testibus.NewTestingEndpoint(nil, condition.Work(false))387 endp := testibus.NewTestingEndpoint(nil, condition.Work(false))
373 cli.notificationsEndp = endp388 cli.notificationsEndp = endp
374 cli.log = noisylog
375 c.Check(cli.handleNotification(), NotNil)389 c.Check(cli.handleNotification(), NotNil)
376}390}
377391
@@ -381,9 +395,9 @@
381395
382func (cs *clientSuite) TestHandleClick(c *C) {396func (cs *clientSuite) TestHandleClick(c *C) {
383 cli := new(Client)397 cli := new(Client)
398 cli.log = debuglog
384 endp := testibus.NewTestingEndpoint(nil, condition.Work(true), nil)399 endp := testibus.NewTestingEndpoint(nil, condition.Work(true), nil)
385 cli.urlDispatcherEndp = endp400 cli.urlDispatcherEndp = endp
386 cli.log = noisylog
387 c.Check(cli.handleClick(), IsNil)401 c.Check(cli.handleClick(), IsNil)
388 // check we sent the notification402 // check we sent the notification
389 args := testibus.GetCallArgs(endp)403 args := testibus.GetCallArgs(endp)
@@ -391,3 +405,56 @@
391 c.Check(args[0].Member, Equals, "DispatchURL")405 c.Check(args[0].Member, Equals, "DispatchURL")
392 c.Check(args[0].Args, DeepEquals, []interface{}{"settings:///system/system-update"})406 c.Check(args[0].Args, DeepEquals, []interface{}{"settings:///system/system-update"})
393}407}
408
409/*****************************************************************
410 doLoop tests
411******************************************************************/
412
413func (cs *clientSuite) TestDoLoopConn(c *C) {
414 cli := new(Client)
415 cli.log = debuglog
416 cli.connCh = make(chan bool, 1)
417 cli.connCh <- true
418 cli.initSession()
419
420 ch := make(chan bool, 1)
421 go cli.doLoop(func(bool) { ch <- true }, func() error { return nil }, func() error { return nil }, func(error) {})
422 c.Check(takeNextBool(ch), Equals, true)
423}
424
425func (cs *clientSuite) TestDoLoopClick(c *C) {
426 cli := new(Client)
427 cli.log = debuglog
428 cli.initSession()
429 aCh := make(chan notifications.RawActionReply, 1)
430 aCh <- notifications.RawActionReply{}
431 cli.actionsCh = aCh
432
433 ch := make(chan bool, 1)
434 go cli.doLoop(func(bool) {}, func() error { ch <- true; return nil }, func() error { return nil }, func(error) {})
435 c.Check(takeNextBool(ch), Equals, true)
436}
437
438func (cs *clientSuite) TestDoLoopNotif(c *C) {
439 cli := new(Client)
440 cli.log = debuglog
441 cli.initSession()
442 cli.session.MsgCh = make(chan *session.Notification, 1)
443 cli.session.MsgCh <- &session.Notification{}
444
445 ch := make(chan bool, 1)
446 go cli.doLoop(func(bool) {}, func() error { return nil }, func() error { ch <- true; return nil }, func(error) {})
447 c.Check(takeNextBool(ch), Equals, true)
448}
449
450func (cs *clientSuite) TestDoLoopErr(c *C) {
451 cli := new(Client)
452 cli.log = debuglog
453 cli.initSession()
454 cli.session.ErrCh = make(chan error, 1)
455 cli.session.ErrCh <- nil
456
457 ch := make(chan bool, 1)
458 go cli.doLoop(func(bool) {}, func() error { return nil }, func() error { return nil }, func(error) { ch <- true })
459 c.Check(takeNextBool(ch), Equals, true)
460}
394461
=== modified file 'client/session/session.go'
--- client/session/session.go 2014-02-04 13:02:38 +0000
+++ client/session/session.go 2014-02-04 18:19:28 +0000
@@ -147,14 +147,17 @@
147 err := sess.proto.WriteMessage(protocol.AckMsg{"ack"})147 err := sess.proto.WriteMessage(protocol.AckMsg{"ack"})
148 if err != nil {148 if err != nil {
149 sess.setState(Error)149 sess.setState(Error)
150 sess.Log.Errorf("unable to ack broadcast: %s", err)
150 return err151 return err
151 }152 }
152 sess.Log.Debugf("broadcast chan:%v app:%v topLevel:%d payloads:%s",153 sess.Log.Debugf("broadcast chan:%v app:%v topLevel:%d payloads:%s",
153 bcast.ChanId, bcast.AppId, bcast.TopLevel, bcast.Payloads)154 bcast.ChanId, bcast.AppId, bcast.TopLevel, bcast.Payloads)
154 if bcast.ChanId == protocol.SystemChannelId {155 if bcast.ChanId == protocol.SystemChannelId {
155 // the system channel id, the only one we care about for now156 // the system channel id, the only one we care about for now
157 sess.Log.Debugf("sending it over")
156 sess.Levels.Set(bcast.ChanId, bcast.TopLevel)158 sess.Levels.Set(bcast.ChanId, bcast.TopLevel)
157 sess.MsgCh <- &Notification{}159 sess.MsgCh <- &Notification{}
160 sess.Log.Debugf("sent it over")
158 } else {161 } else {
159 sess.Log.Debugf("what is this weird channel, %#v?", bcast.ChanId)162 sess.Log.Debugf("what is this weird channel, %#v?", bcast.ChanId)
160 }163 }
@@ -192,6 +195,7 @@
192 err := conn.SetDeadline(time.Now().Add(sess.ExchangeTimeout))195 err := conn.SetDeadline(time.Now().Add(sess.ExchangeTimeout))
193 if err != nil {196 if err != nil {
194 sess.setState(Error)197 sess.setState(Error)
198 sess.Log.Errorf("unable to start: set deadline: %s", err)
195 return err199 return err
196 }200 }
197 _, err = conn.Write(wireVersionBytes)201 _, err = conn.Write(wireVersionBytes)
@@ -199,6 +203,7 @@
199 // n < len(p). So, no need to check number of bytes written, hooray.203 // n < len(p). So, no need to check number of bytes written, hooray.
200 if err != nil {204 if err != nil {
201 sess.setState(Error)205 sess.setState(Error)
206 sess.Log.Errorf("unable to start: write version: %s", err)
202 return err207 return err
203 }208 }
204 proto := sess.Protocolator(conn)209 proto := sess.Protocolator(conn)
@@ -210,12 +215,14 @@
210 })215 })
211 if err != nil {216 if err != nil {
212 sess.setState(Error)217 sess.setState(Error)
218 sess.Log.Errorf("unable to start: connect: %s", err)
213 return err219 return err
214 }220 }
215 var connAck protocol.ConnAckMsg221 var connAck protocol.ConnAckMsg
216 err = proto.ReadMessage(&connAck)222 err = proto.ReadMessage(&connAck)
217 if err != nil {223 if err != nil {
218 sess.setState(Error)224 sess.setState(Error)
225 sess.Log.Errorf("unable to start: connack: %s", err)
219 return err226 return err
220 }227 }
221 if connAck.Type != "connack" {228 if connAck.Type != "connack" {
@@ -225,6 +232,7 @@
225 pingInterval, err := time.ParseDuration(connAck.Params.PingInterval)232 pingInterval, err := time.ParseDuration(connAck.Params.PingInterval)
226 if err != nil {233 if err != nil {
227 sess.setState(Error)234 sess.setState(Error)
235 sess.Log.Errorf("unable to start: parse ping interval: %s", err)
228 return err236 return err
229 }237 }
230 sess.proto = proto238 sess.proto = proto

Subscribers

People subscribed via source and target branches