Code review comment for lp:~niemeyer/gozk/watches-die-on-reconnection

Revision history for this message
Gustavo Niemeyer (niemeyer) wrote :

Reviewers: mp+108648_code.launchpad.net,

Message:
Please take a look.

Description:
Any session event delivered to a non-session watch now fires that watch and then closes it.

Also test that closing the connection closes the watch.

https://code.launchpad.net/~niemeyer/gozk/watches-die-on-reconnection/+merge/108648

(do not edit description out of merge proposal)

Please review this at https://codereview.appspot.com/6292044/

Affected files:
   A [revision details]
   M zk.go
   M zk_test.go

Index: [revision details]
=== added file '[revision details]'
--- [revision details] 2012-01-01 00:00:00 +0000
+++ [revision details] 2012-01-01 00:00:00 +0000
@@ -0,0 +1,2 @@
+Old revision: <email address hidden>
+New revision: <email address hidden>

Index: zk.go
=== modified file 'zk.go'
--- zk.go 2012-03-15 15:11:23 +0000
+++ zk.go 2012-06-04 22:03:50 +0000
@@ -1043,11 +1043,12 @@
    return
   }
   if event.Type == EVENT_SESSION && watchId != conn.sessionWatchId {
- switch event.State {
- case STATE_EXPIRED_SESSION, STATE_AUTH_FAILED:
- default:
- // WTF? Feels like TCP saying "dropped a dup packet, ok?"
- return
+ if event.State == STATE_CONNECTED {
+ // The watch was established while it was still
+ // connecting, but we take all session events on
+ // non-session watches as fatal.
+ // Change the state code to make the intent clear.
+ event.State = STATE_CONNECTING
    }
   }
   ch := conn.watchChannels[watchId]

Index: zk_test.go
=== modified file 'zk_test.go'
--- zk_test.go 2012-03-15 15:11:53 +0000
+++ zk_test.go 2012-06-04 22:03:50 +0000
@@ -325,7 +325,7 @@
   _, err := conn.Create("/test", "one", zk.EPHEMERAL,
zk.WorldACL(zk.PERM_ALL))
   c.Assert(err, IsNil)

- _, _, _, err = conn.GetW("/test")
+ _, _, watch, err := conn.GetW("/test")
   c.Assert(err, IsNil)

   c.Assert(zk.CountPendingWatches(), Equals, 2)
@@ -333,6 +333,13 @@
   conn.Close()

   c.Assert(zk.CountPendingWatches(), Equals, 0)
+
+ select {
+ case _, ok := <-watch:
+ c.Assert(ok, Equals, false)
+ case <-time.After(3e9):
+ c.Fatal("Watch didn't fire")
+ }
  }

  // By default, the ZooKeeper C client will hang indefinitely if a
@@ -634,20 +641,19 @@
    c.Fatal("Session watch didn't fire")
   }

- // The watch channel should not, since it's not affected.
+ // The watch channel should receive just the connecting notification.
   select {
   case event := <-watch:
- c.Fatalf("Exists watch fired: %s", event)
- default:
- }
-
- // And it should still work.
- _, err = conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
- c.Assert(err, IsNil)
-
- event = <-watch
- c.Assert(event.Type, Equals, zk.EVENT_CREATED)
- c.Assert(event.Path, Equals, "/test")
+ c.Assert(event.State, Equals, zk.STATE_CONNECTING)
+ case <-time.After(3e9):
+ c.Fatal("Watch didn't fire")
+ }
+ select {
+ case _, ok := <-watch:
+ c.Assert(ok, Equals, false)
+ case <-time.After(3e9):
+ c.Fatal("Watch wasn't closed")
+ }

   c.Check(zk.CountPendingWatches(), Equals, 1)
  }
@@ -691,7 +697,7 @@

   select {
   case event := <-watch:
- c.Assert(event.State, Equals, zk.STATE_EXPIRED_SESSION)
+ c.Assert(event.State, Equals, zk.STATE_CONNECTING)
   case <-time.After(3e9):
    c.Fatal("Watch event didn't fire")
   }

« Back to merge proposal