Merge lp:~niemeyer/gozk/watches-die-on-reconnection into lp:gozk/zookeeper

Proposed by Gustavo Niemeyer
Status: Merged
Merged at revision: 35
Proposed branch: lp:~niemeyer/gozk/watches-die-on-reconnection
Merge into: lp:gozk/zookeeper
Diff against target: 89 lines (+26/-19)
2 files modified
zk.go (+6/-5)
zk_test.go (+20/-14)
To merge this branch: bzr merge lp:~niemeyer/gozk/watches-die-on-reconnection
Reviewer Review Type Date Requested Status
The Go Language Gophers Pending
Review via email: mp+108648@code.launchpad.net

Description of the change

Any session event on a watcher now fires and closes the watch.

Also test that closing the connection closes the watch.

https://codereview.appspot.com/6292044/

To post a comment you must log in.
Revision history for this message
Gustavo Niemeyer (niemeyer) wrote :
Download full text (3.2 KiB)

Reviewers: mp+108648_code.launchpad.net,

Message:
Please take a look.

Description:
Any session event on a watcher now fires and closes the watch.

Also test that closing the connection closes the watch.

https://code.launchpad.net/~niemeyer/gozk/watches-die-on-reconnection/+merge/108648

(do not edit description out of merge proposal)

Please review this at https://codereview.appspot.com/6292044/

Affected files:
   A [revision details]
   M zk.go
   M zk_test.go

Index: [revision details]
=== added file '[revision details]'
--- [revision details] 2012-01-01 00:00:00 +0000
+++ [revision details] 2012-01-01 00:00:00 +0000
@@ -0,0 +1,2 @@
+Old revision: <email address hidden>
+New revision: <email address hidden>

Index: zk.go
=== modified file 'zk.go'
--- zk.go 2012-03-15 15:11:23 +0000
+++ zk.go 2012-06-04 22:03:50 +0000
@@ -1043,11 +1043,12 @@
    return
   }
   if event.Type == EVENT_SESSION && watchId != conn.sessionWatchId {
- switch event.State {
- case STATE_EXPIRED_SESSION, STATE_AUTH_FAILED:
- default:
- // WTF? Feels like TCP saying "dropped a dup packet, ok?"
- return
+ if event.State == STATE_CONNECTED {
+ // The watch was established while it was still
+ // connecting, but we take all session events on
+ // non-session watches as fatal.
+ // Change the state code to make the intent clear.
+ event.State = STATE_CONNECTING
    }
   }
   ch := conn.watchChannels[watchId]

Index: zk_test.go
=== modified file 'zk_test.go'
--- zk_test.go 2012-03-15 15:11:53 +0000
+++ zk_test.go 2012-06-04 22:03:50 +0000
@@ -325,7 +325,7 @@
   _, err := conn.Create("/test", "one", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
   c.Assert(err, IsNil)

- _, _, _, err = conn.GetW("/test")
+ _, _, watch, err := conn.GetW("/test")
   c.Assert(err, IsNil)

   c.Assert(zk.CountPendingWatches(), Equals, 2)
@@ -333,6 +333,13 @@
   conn.Close()

   c.Assert(zk.CountPendingWatches(), Equals, 0)
+
+ select {
+ case _, ok := <-watch:
+ c.Assert(ok, Equals, false)
+ case <-time.After(3e9):
+ c.Fatal("Watch didn't fire")
+ }
  }

  // By default, the ZooKeeper C client will hang indefinitely if a
@@ -634,20 +641,19 @@
    c.Fatal("Session watch didn't fire")
   }

- // The watch channel should not, since it's not affected.
+ // The watch channel should receive just the connecting notification.
   select {
   case event := <-watch:
- c.Fatalf("Exists watch fired: %s", event)
- default:
- }
-
- // And it should still work.
- _, err = conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
- c.Assert(err, IsNil)
-
- event = <-watch
- c.Assert(event.Type, Equals, zk.EVENT_CREATED)
- c.Assert(event.Path, Equals, "/test")
+ c.Assert(event.State, Equals, zk.STATE_CONNECTING)
+ case <-time.After(3e9):
+ c.Fatal("Watch didn't fire")
+ }
+ select {
+ case _, ok := <-watch:
+ c.Assert(ok, Equals, false)
+ case <-time.After(3e9):
+ c.Fatal("Watch wasn't closed")
+ }

   c.Check(zk.CountPendingWatches(), Equals, 1)
  }
@@ -691,7 +697,7 @@

   select {
   case event := <-watch:
- c.Assert(event.State, Equals, zk.STATE_EXPIRED_SESSION)
+ c.Assert(event.State, Equals, zk.STATE_CONNECTING)
...

Read more...

Revision history for this message
Gustavo Niemeyer (niemeyer) wrote :

*** Submitted:

Any session event on a watcher now fires and closes the watch.

Also test that closing the connection closes the watch.

R=dfc
CC=
https://codereview.appspot.com/6292044

https://codereview.appspot.com/6292044/diff/1/zk.go
File zk.go (right):

https://codereview.appspot.com/6292044/diff/1/zk.go#newcode1049
zk.go:1049: // non-session watches as fatal.
On 2012/06/04 22:58:59, dfc wrote:
> this comment is a little confusing; does it mean

> // .. but we take all session events that occur before a session is established
> as errors.

Improved comment. Thanks for the poke.

https://codereview.appspot.com/6292044/diff/1/zk_test.go
File zk_test.go (right):

https://codereview.appspot.com/6292044/diff/1/zk_test.go#newcode340
zk_test.go:340: case <-time.After(3e9):
On 2012/06/04 22:58:59, dfc wrote:
> nit: 3 * time.Second

> you've imported time, so you might as well make use of the constant.

I've copy & pasted that. There are tons of uses here, historical. I
wouldn't like to change that right now if that's ok.

https://codereview.appspot.com/6292044/

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'zk.go'
2--- zk.go 2012-03-15 15:11:23 +0000
3+++ zk.go 2012-06-04 22:08:18 +0000
4@@ -1043,11 +1043,12 @@
5 return
6 }
7 if event.Type == EVENT_SESSION && watchId != conn.sessionWatchId {
8- switch event.State {
9- case STATE_EXPIRED_SESSION, STATE_AUTH_FAILED:
10- default:
11- // WTF? Feels like TCP saying "dropped a dup packet, ok?"
12- return
13+ if event.State == STATE_CONNECTED {
14+ // The watch was established while it was still
15+ // connecting, but we take all session events on
16+ // non-session watches as fatal.
17+ // Change the state code to make the intent clear.
18+ event.State = STATE_CONNECTING
19 }
20 }
21 ch := conn.watchChannels[watchId]
22
23=== modified file 'zk_test.go'
24--- zk_test.go 2012-03-15 15:11:53 +0000
25+++ zk_test.go 2012-06-04 22:08:18 +0000
26@@ -325,7 +325,7 @@
27 _, err := conn.Create("/test", "one", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
28 c.Assert(err, IsNil)
29
30- _, _, _, err = conn.GetW("/test")
31+ _, _, watch, err := conn.GetW("/test")
32 c.Assert(err, IsNil)
33
34 c.Assert(zk.CountPendingWatches(), Equals, 2)
35@@ -333,6 +333,13 @@
36 conn.Close()
37
38 c.Assert(zk.CountPendingWatches(), Equals, 0)
39+
40+ select {
41+ case _, ok := <-watch:
42+ c.Assert(ok, Equals, false)
43+ case <-time.After(3e9):
44+ c.Fatal("Watch didn't fire")
45+ }
46 }
47
48 // By default, the ZooKeeper C client will hang indefinitely if a
49@@ -634,20 +641,19 @@
50 c.Fatal("Session watch didn't fire")
51 }
52
53- // The watch channel should not, since it's not affected.
54+ // The watch channel should receive just the connecting notification.
55 select {
56 case event := <-watch:
57- c.Fatalf("Exists watch fired: %s", event)
58- default:
59- }
60-
61- // And it should still work.
62- _, err = conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
63- c.Assert(err, IsNil)
64-
65- event = <-watch
66- c.Assert(event.Type, Equals, zk.EVENT_CREATED)
67- c.Assert(event.Path, Equals, "/test")
68+ c.Assert(event.State, Equals, zk.STATE_CONNECTING)
69+ case <-time.After(3e9):
70+ c.Fatal("Watch didn't fire")
71+ }
72+ select {
73+ case _, ok := <-watch:
74+ c.Assert(ok, Equals, false)
75+ case <-time.After(3e9):
76+ c.Fatal("Watch wasn't closed")
77+ }
78
79 c.Check(zk.CountPendingWatches(), Equals, 1)
80 }
81@@ -691,7 +697,7 @@
82
83 select {
84 case event := <-watch:
85- c.Assert(event.State, Equals, zk.STATE_EXPIRED_SESSION)
86+ c.Assert(event.State, Equals, zk.STATE_CONNECTING)
87 case <-time.After(3e9):
88 c.Fatal("Watch event didn't fire")
89 }

Subscribers

People subscribed via source and target branches