Merge lp:~rogpeppe/gozk/update-documentation into lp:~juju/gozk/trunk

Proposed by Roger Peppe
Status: Rejected
Rejected by: Gustavo Niemeyer
Proposed branch: lp:~rogpeppe/gozk/update-documentation
Merge into: lp:~juju/gozk/trunk
Diff against target: 3213 lines (+1334/-814)
10 files modified
Makefile (+5/-2)
example/example.go (+22/-23)
reattach_test.go (+202/-0)
retry_test.go (+65/-74)
server.go (+239/-0)
service/Makefile (+20/-0)
service/service.go (+160/-0)
suite_test.go (+45/-122)
zk.go (+318/-341)
zk_test.go (+258/-252)
To merge this branch: bzr merge lp:~rogpeppe/gozk/update-documentation
Reviewer Review Type Date Requested Status
Juju Engineering Pending
Review via email: mp+77980@code.launchpad.net

Description of the change

Update documentation.

Add more complete description of event delivery and comments
on Stat field accessors.

To post a comment you must log in.
Revision history for this message
Gustavo Niemeyer (niemeyer) wrote :

This is a massive change unrelated to documentation.

Revision history for this message
Roger Peppe (rogpeppe) wrote :

this was a while ago. i didn't know how to handle the review system...

i think the intended changes are still worth making. i'll make a CL.
i could also use that proposal to change appropriate methods
to return a time.Time if that seems reasonable (e.g. Stat.CTime etc)

On 10 February 2012 12:27, Gustavo Niemeyer <email address hidden> wrote:
> The proposal to merge lp:~rogpeppe/gozk/update-documentation into lp:gozk has been updated.
>
>    Status: Needs review => Rejected
>
> For more details, see:
> https://code.launchpad.net/~rogpeppe/gozk/update-documentation/+merge/77980
> --
> https://code.launchpad.net/~rogpeppe/gozk/update-documentation/+merge/77980
> You are the owner of lp:~rogpeppe/gozk/update-documentation.

Unmerged revisions

24. By Roger Peppe

Update documentation to include more accurate
description of events and comments on stat field accessors.

23. By Roger Peppe

Factored out server running code into a separate package.

22. By Roger Peppe

Fix Server code.

21. By Gustavo Niemeyer

The package location is now http://launchpad.net/gozk/zk.

The shorter package name makes for significantly more digestible line lengths.

20. By Gustavo Niemeyer

Merged clean-up-interface branch, by Roger Peppe [r=niemeyer]

This branch does a number of incompatible changes that improve
the overall API of gozk.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'Makefile'
--- Makefile 2011-08-03 01:56:58 +0000
+++ Makefile 2011-10-03 17:02:23 +0000
@@ -2,10 +2,13 @@
22
3all: package3all: package
44
5TARG=gozk5TARG=launchpad.net/gozk/zk
66
7GOFILES=\
8 server.go\
9
7CGOFILES=\10CGOFILES=\
8 gozk.go\11 zk.go\
912
10CGO_OFILES=\13CGO_OFILES=\
11 helpers.o\14 helpers.o\
1215
=== added directory 'example'
=== renamed file 'example.go' => 'example/example.go'
--- example.go 2011-08-03 01:47:25 +0000
+++ example/example.go 2011-10-03 17:02:23 +0000
@@ -1,30 +1,29 @@
1package main1package main
22
3import (3import (
4 "gozk"4 "launchpad.net/zookeeper/zookeeper"
5)5)
66
7func main() {7func main() {
8 zk, session, err := gozk.Init("localhost:2181", 5000)8 zk, session, err := zookeeper.Init("localhost:2181", 5000)
9 if err != nil {9 if err != nil {
10 println("Couldn't connect: " + err.String())10 println("Couldn't connect: " + err.String())
11 return11 return
12 }12 }
1313
14 defer zk.Close()14 defer zk.Close()
1515
16 // Wait for connection.16 // Wait for connection.
17 event := <-session17 event := <-session
18 if event.State != gozk.STATE_CONNECTED {18 if event.State != zookeeper.STATE_CONNECTED {
19 println("Couldn't connect")19 println("Couldn't connect")
20 return20 return
21 }21 }
2222
23 _, err = zk.Create("/counter", "0", 0, gozk.WorldACL(gozk.PERM_ALL))23 _, err = zk.Create("/counter", "0", 0, zookeeper.WorldACL(zookeeper.PERM_ALL))
24 if err != nil {24 if err != nil {
25 println(err.String())25 println(err.String())
26 } else {26 } else {
27 println("Created!")27 println("Created!")
28 }28 }
29}29}
30
3130
=== added file 'reattach_test.go'
--- reattach_test.go 1970-01-01 00:00:00 +0000
+++ reattach_test.go 2011-10-03 17:02:23 +0000
@@ -0,0 +1,202 @@
1package zk_test
2
3import (
4 "bufio"
5 . "launchpad.net/gocheck"
6 "launchpad.net/gozk/zk/service"
7 "launchpad.net/gozk/zk"
8 "exec"
9 "flag"
10 "fmt"
11 "os"
12 "strings"
13 "testing"
14 "time"
15)
16
17var reattach = flag.Bool("zktest.reattach", false, "internal flag used for testing")
18var reattachRunDir = flag.String("zktest.rundir", "", "internal flag used for testing")
19var reattachAbnormalStop = flag.Bool("zktest.stop", false, "internal flag used for testing")
20
21// This is the reentrancy point for testing ZooKeeper servers
22// started by processes that are not direct children of the
23// testing process. This test always succeeds - the status
24// will be written to stdout and read by indirectServer.
25func TestStartNonChildServer(t *testing.T) {
26 if !*reattach {
27 // not re-entrant, so ignore this test.
28 return
29 }
30 err := startServer(*reattachRunDir, *reattachAbnormalStop)
31 if err != nil {
32 fmt.Printf("error:%v\n", err)
33 return
34 }
35 fmt.Printf("done\n")
36}
37
38func (s *S) startServer(c *C, abort bool) {
39 err := startServer(s.zkTestRoot, abort)
40 c.Assert(err, IsNil)
41}
42
43// startServerIndirect starts a ZooKeeper server that is not
44// a direct child of the current process. If abort is true,
45// the server will be started and then terminated abnormally.
46func (s *S) startServerIndirect(c *C, abort bool) {
47 if len(os.Args) == 0 {
48 c.Fatal("Cannot find self executable name")
49 }
50 cmd := exec.Command(
51 os.Args[0],
52 "-zktest.reattach",
53 "-zktest.rundir", s.zkTestRoot,
54 "-zktest.stop=", fmt.Sprint(abort),
55 "-test.run", "StartNonChildServer",
56 )
57 r, err := cmd.StdoutPipe()
58 c.Assert(err, IsNil)
59 defer r.Close()
60 if err := cmd.Start(); err != nil {
61 c.Fatalf("cannot start re-entrant gotest process: %v", err)
62 }
63 defer cmd.Wait()
64 bio := bufio.NewReader(r)
65 for {
66 line, err := bio.ReadSlice('\n')
67 if err != nil {
68 c.Fatalf("indirect server status line not found: %v", err)
69 }
70 s := string(line)
71 if strings.HasPrefix(s, "error:") {
72 c.Fatalf("indirect server error: %s", s[len("error:"):])
73 }
74 if s == "done\n" {
75 return
76 }
77 }
78 panic("not reached")
79}
80
81// startServer starts a ZooKeeper server, and terminates it abnormally
82// if abort is true.
83func startServer(runDir string, abort bool) os.Error {
84 s, err := zk.AttachServer(runDir)
85 if err != nil {
86 return fmt.Errorf("cannot attach to server at %q: %v", runDir, err)
87 }
88 srv := service.New(s)
89 if err := srv.Start(); err != nil {
90 return fmt.Errorf("cannot start server: %v", err)
91 }
92 if abort {
93 // Give it time to start up, then kill the server process abnormally,
94 // leaving the pid.txt file behind.
95 time.Sleep(0.5e9)
96 p, err := srv.Process()
97 if err != nil {
98 return fmt.Errorf("cannot get server process: %v", err)
99 }
100 defer p.Release()
101 if err := p.Kill(); err != nil {
102 return fmt.Errorf("cannot kill server process: %v", err)
103 }
104 }
105 return nil
106}
107
108func (s *S) checkCookie(c *C) {
109 conn, watch, err := zk.Dial(s.zkAddr, 5e9)
110 c.Assert(err, IsNil)
111
112 e, ok := <-watch
113 c.Assert(ok, Equals, true)
114 c.Assert(e.Ok(), Equals, true)
115
116 c.Assert(err, IsNil)
117 cookie, _, err := conn.Get("/testAttachCookie")
118 c.Assert(err, IsNil)
119 c.Assert(cookie, Equals, "testAttachCookie")
120 conn.Close()
121}
122
123// cases to test:
124// child server, stopped normally; reattach, start
125// non-direct child server, killed abnormally; reattach, start (->error), remove pid.txt; start
126// non-direct child server, still running; reattach, start (->error), stop, start
127// child server, still running; reattach, start (-> error)
128// child server, still running; reattach, stop, start.
129// non-direct child server, still running; reattach, stop, start.
130func (s *S) TestAttachServer(c *C) {
131 // Create a cookie so that we know we are reattaching to the same instance.
132 conn, _ := s.init(c)
133 _, err := conn.Create("/testAttachCookie", "testAttachCookie", 0, zk.WorldACL(zk.PERM_ALL))
134 c.Assert(err, IsNil)
135 s.checkCookie(c)
136 s.zkServer.Stop()
137 s.zkServer = nil
138
139 s.testAttachServer(c, (*S).startServer)
140 s.testAttachServer(c, (*S).startServerIndirect)
141 s.testAttachServerAbnormalTerminate(c, (*S).startServer)
142 s.testAttachServerAbnormalTerminate(c, (*S).startServerIndirect)
143
144 srv, err := zk.AttachServer(s.zkTestRoot)
145 c.Assert(err, IsNil)
146
147 s.zkServer = service.New(srv)
148 err = s.zkServer.Start()
149 c.Assert(err, IsNil)
150
151 conn, _ = s.init(c)
152 err = conn.Delete("/testAttachCookie", -1)
153 c.Assert(err, IsNil)
154}
155
156func (s *S) testAttachServer(c *C, start func(*S, *C, bool)) {
157 start(s, c, false)
158
159 s.checkCookie(c)
160
161 // try attaching to it while it is still running - it should fail.
162 a, err := zk.AttachServer(s.zkTestRoot)
163 c.Assert(err, IsNil)
164 srv := service.New(a)
165
166 err = srv.Start()
167 c.Assert(err, NotNil)
168
169 // stop it and then start it again - it should succeed.
170 err = srv.Stop()
171 c.Assert(err, IsNil)
172
173 err = srv.Start()
174 c.Assert(err, IsNil)
175
176 s.checkCookie(c)
177
178 err = srv.Stop()
179 c.Assert(err, IsNil)
180}
181
182func (s *S) testAttachServerAbnormalTerminate(c *C, start func(*S, *C, bool)) {
183 start(s, c, true)
184
185 // try attaching to it and starting - it should fail, because pid.txt
186 // won't have been removed.
187 a, err := zk.AttachServer(s.zkTestRoot)
188 c.Assert(err, IsNil)
189 srv := service.New(a)
190 err = srv.Start()
191 c.Assert(err, NotNil)
192
193 // stopping it should bring things back to normal.
194 err = srv.Stop()
195 c.Assert(err, IsNil)
196 err = srv.Start()
197 c.Assert(err, IsNil)
198
199 s.checkCookie(c)
200 err = srv.Stop()
201 c.Assert(err, IsNil)
202}
0\ No newline at end of file203\ No newline at end of file
1204
=== modified file 'retry_test.go'
--- retry_test.go 2011-08-19 01:51:37 +0000
+++ retry_test.go 2011-10-03 17:02:23 +0000
@@ -1,42 +1,41 @@
1package gozk_test1package zk_test
22
3import (3import (
4 . "launchpad.net/gocheck"4 . "launchpad.net/gocheck"
5 "gozk"5 "launchpad.net/gozk/zk"
6 "os"6 "os"
7)7)
88
9func (s *S) TestRetryChangeCreating(c *C) {9func (s *S) TestRetryChangeCreating(c *C) {
10 zk, _ := s.init(c)10 conn, _ := s.init(c)
1111
12 err := zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL),12 err := conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL),
13 func(data string, stat gozk.Stat) (string, os.Error) {13 func(data string, stat *zk.Stat) (string, os.Error) {
14 c.Assert(data, Equals, "")14 c.Assert(data, Equals, "")
15 c.Assert(stat, IsNil)15 c.Assert(stat, IsNil)
16 return "new", nil16 return "new", nil
17 })17 })
18 c.Assert(err, IsNil)18 c.Assert(err, IsNil)
1919
20 data, stat, err := zk.Get("/test")20 data, stat, err := conn.Get("/test")
21 c.Assert(err, IsNil)21 c.Assert(err, IsNil)
22 c.Assert(stat, NotNil)22 c.Assert(stat, NotNil)
23 c.Assert(stat.Version(), Equals, int32(0))23 c.Assert(stat.Version(), Equals, int32(0))
24 c.Assert(data, Equals, "new")24 c.Assert(data, Equals, "new")
2525
26 acl, _, err := zk.ACL("/test")26 acl, _, err := conn.ACL("/test")
27 c.Assert(err, IsNil)27 c.Assert(err, IsNil)
28 c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_ALL))28 c.Assert(acl, Equals, zk.WorldACL(zk.PERM_ALL))
29}29}
3030
31func (s *S) TestRetryChangeSetting(c *C) {31func (s *S) TestRetryChangeSetting(c *C) {
32 zk, _ := s.init(c)32 conn, _ := s.init(c)
3333
34 _, err := zk.Create("/test", "old", gozk.EPHEMERAL,34 _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
35 gozk.WorldACL(gozk.PERM_ALL))
36 c.Assert(err, IsNil)35 c.Assert(err, IsNil)
3736
38 err = zk.RetryChange("/test", gozk.EPHEMERAL, []gozk.ACL{},37 err = conn.RetryChange("/test", zk.EPHEMERAL, []zk.ACL{},
39 func(data string, stat gozk.Stat) (string, os.Error) {38 func(data string, stat *zk.Stat) (string, os.Error) {
40 c.Assert(data, Equals, "old")39 c.Assert(data, Equals, "old")
41 c.Assert(stat, NotNil)40 c.Assert(stat, NotNil)
42 c.Assert(stat.Version(), Equals, int32(0))41 c.Assert(stat.Version(), Equals, int32(0))
@@ -44,27 +43,26 @@
44 })43 })
45 c.Assert(err, IsNil)44 c.Assert(err, IsNil)
4645
47 data, stat, err := zk.Get("/test")46 data, stat, err := conn.Get("/test")
48 c.Assert(err, IsNil)47 c.Assert(err, IsNil)
49 c.Assert(stat, NotNil)48 c.Assert(stat, NotNil)
50 c.Assert(stat.Version(), Equals, int32(1))49 c.Assert(stat.Version(), Equals, int32(1))
51 c.Assert(data, Equals, "brand new")50 c.Assert(data, Equals, "brand new")
5251
53 // ACL was unchanged by RetryChange().52 // ACL was unchanged by RetryChange().
54 acl, _, err := zk.ACL("/test")53 acl, _, err := conn.ACL("/test")
55 c.Assert(err, IsNil)54 c.Assert(err, IsNil)
56 c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_ALL))55 c.Assert(acl, Equals, zk.WorldACL(zk.PERM_ALL))
57}56}
5857
59func (s *S) TestRetryChangeUnchangedValueDoesNothing(c *C) {58func (s *S) TestRetryChangeUnchangedValueDoesNothing(c *C) {
60 zk, _ := s.init(c)59 conn, _ := s.init(c)
6160
62 _, err := zk.Create("/test", "old", gozk.EPHEMERAL,61 _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
63 gozk.WorldACL(gozk.PERM_ALL))
64 c.Assert(err, IsNil)62 c.Assert(err, IsNil)
6563
66 err = zk.RetryChange("/test", gozk.EPHEMERAL, []gozk.ACL{},64 err = conn.RetryChange("/test", zk.EPHEMERAL, []zk.ACL{},
67 func(data string, stat gozk.Stat) (string, os.Error) {65 func(data string, stat *zk.Stat) (string, os.Error) {
68 c.Assert(data, Equals, "old")66 c.Assert(data, Equals, "old")
69 c.Assert(stat, NotNil)67 c.Assert(stat, NotNil)
70 c.Assert(stat.Version(), Equals, int32(0))68 c.Assert(stat.Version(), Equals, int32(0))
@@ -72,7 +70,7 @@
72 })70 })
73 c.Assert(err, IsNil)71 c.Assert(err, IsNil)
7472
75 data, stat, err := zk.Get("/test")73 data, stat, err := conn.Get("/test")
76 c.Assert(err, IsNil)74 c.Assert(err, IsNil)
77 c.Assert(stat, NotNil)75 c.Assert(stat, NotNil)
78 c.Assert(stat.Version(), Equals, int32(0)) // Unchanged!76 c.Assert(stat.Version(), Equals, int32(0)) // Unchanged!
@@ -80,14 +78,14 @@
80}78}
8179
82func (s *S) TestRetryChangeConflictOnCreate(c *C) {80func (s *S) TestRetryChangeConflictOnCreate(c *C) {
83 zk, _ := s.init(c)81 conn, _ := s.init(c)
8482
85 changeFunc := func(data string, stat gozk.Stat) (string, os.Error) {83 changeFunc := func(data string, stat *zk.Stat) (string, os.Error) {
86 switch data {84 switch data {
87 case "":85 case "":
88 c.Assert(stat, IsNil)86 c.Assert(stat, IsNil)
89 _, err := zk.Create("/test", "conflict", gozk.EPHEMERAL,87 _, err := conn.Create("/test", "conflict", zk.EPHEMERAL,
90 gozk.WorldACL(gozk.PERM_ALL))88 zk.WorldACL(zk.PERM_ALL))
91 c.Assert(err, IsNil)89 c.Assert(err, IsNil)
92 return "<none> => conflict", nil90 return "<none> => conflict", nil
93 case "conflict":91 case "conflict":
@@ -100,11 +98,10 @@
100 return "can't happen", nil98 return "can't happen", nil
101 }99 }
102100
103 err := zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL),101 err := conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL), changeFunc)
104 changeFunc)
105 c.Assert(err, IsNil)102 c.Assert(err, IsNil)
106103
107 data, stat, err := zk.Get("/test")104 data, stat, err := conn.Get("/test")
108 c.Assert(err, IsNil)105 c.Assert(err, IsNil)
109 c.Assert(data, Equals, "conflict => new")106 c.Assert(data, Equals, "conflict => new")
110 c.Assert(stat, NotNil)107 c.Assert(stat, NotNil)
@@ -112,18 +109,17 @@
112}109}
113110
114func (s *S) TestRetryChangeConflictOnSetDueToChange(c *C) {111func (s *S) TestRetryChangeConflictOnSetDueToChange(c *C) {
115 zk, _ := s.init(c)112 conn, _ := s.init(c)
116113
117 _, err := zk.Create("/test", "old", gozk.EPHEMERAL,114 _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
118 gozk.WorldACL(gozk.PERM_ALL))
119 c.Assert(err, IsNil)115 c.Assert(err, IsNil)
120116
121 changeFunc := func(data string, stat gozk.Stat) (string, os.Error) {117 changeFunc := func(data string, stat *zk.Stat) (string, os.Error) {
122 switch data {118 switch data {
123 case "old":119 case "old":
124 c.Assert(stat, NotNil)120 c.Assert(stat, NotNil)
125 c.Assert(stat.Version(), Equals, int32(0))121 c.Assert(stat.Version(), Equals, int32(0))
126 _, err := zk.Set("/test", "conflict", 0)122 _, err := conn.Set("/test", "conflict", 0)
127 c.Assert(err, IsNil)123 c.Assert(err, IsNil)
128 return "old => new", nil124 return "old => new", nil
129 case "conflict":125 case "conflict":
@@ -136,10 +132,10 @@
136 return "can't happen", nil132 return "can't happen", nil
137 }133 }
138134
139 err = zk.RetryChange("/test", gozk.EPHEMERAL, []gozk.ACL{}, changeFunc)135 err = conn.RetryChange("/test", zk.EPHEMERAL, []zk.ACL{}, changeFunc)
140 c.Assert(err, IsNil)136 c.Assert(err, IsNil)
141137
142 data, stat, err := zk.Get("/test")138 data, stat, err := conn.Get("/test")
143 c.Assert(err, IsNil)139 c.Assert(err, IsNil)
144 c.Assert(data, Equals, "conflict => new")140 c.Assert(data, Equals, "conflict => new")
145 c.Assert(stat, NotNil)141 c.Assert(stat, NotNil)
@@ -147,18 +143,17 @@
147}143}
148144
149func (s *S) TestRetryChangeConflictOnSetDueToDelete(c *C) {145func (s *S) TestRetryChangeConflictOnSetDueToDelete(c *C) {
150 zk, _ := s.init(c)146 conn, _ := s.init(c)
151147
152 _, err := zk.Create("/test", "old", gozk.EPHEMERAL,148 _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
153 gozk.WorldACL(gozk.PERM_ALL))
154 c.Assert(err, IsNil)149 c.Assert(err, IsNil)
155150
156 changeFunc := func(data string, stat gozk.Stat) (string, os.Error) {151 changeFunc := func(data string, stat *zk.Stat) (string, os.Error) {
157 switch data {152 switch data {
158 case "old":153 case "old":
159 c.Assert(stat, NotNil)154 c.Assert(stat, NotNil)
160 c.Assert(stat.Version(), Equals, int32(0))155 c.Assert(stat.Version(), Equals, int32(0))
161 err := zk.Delete("/test", 0)156 err := conn.Delete("/test", 0)
162 c.Assert(err, IsNil)157 c.Assert(err, IsNil)
163 return "old => <deleted>", nil158 return "old => <deleted>", nil
164 case "":159 case "":
@@ -170,55 +165,53 @@
170 return "can't happen", nil165 return "can't happen", nil
171 }166 }
172167
173 err = zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_READ),168 err = conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_READ), changeFunc)
174 changeFunc)
175 c.Assert(err, IsNil)169 c.Assert(err, IsNil)
176170
177 data, stat, err := zk.Get("/test")171 data, stat, err := conn.Get("/test")
178 c.Assert(err, IsNil)172 c.Assert(err, IsNil)
179 c.Assert(data, Equals, "<deleted> => new")173 c.Assert(data, Equals, "<deleted> => new")
180 c.Assert(stat, NotNil)174 c.Assert(stat, NotNil)
181 c.Assert(stat.Version(), Equals, int32(0))175 c.Assert(stat.Version(), Equals, int32(0))
182176
183 // Should be the new ACL.177 // Should be the new ACL.
184 acl, _, err := zk.ACL("/test")178 acl, _, err := conn.ACL("/test")
185 c.Assert(err, IsNil)179 c.Assert(err, IsNil)
186 c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_READ))180 c.Assert(acl, Equals, zk.WorldACL(zk.PERM_READ))
187}181}
188182
189func (s *S) TestRetryChangeErrorInCallback(c *C) {183func (s *S) TestRetryChangeErrorInCallback(c *C) {
190 zk, _ := s.init(c)184 conn, _ := s.init(c)
191185
192 err := zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL),186 err := conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL),
193 func(data string, stat gozk.Stat) (string, os.Error) {187 func(data string, stat *zk.Stat) (string, os.Error) {
194 return "don't use this", os.NewError("BOOM!")188 return "don't use this", os.NewError("BOOM!")
195 })189 })
196 c.Assert(err, NotNil)190 c.Assert(err, NotNil)
197 c.Assert(err.Code(), Equals, gozk.ZSYSTEMERROR)
198 c.Assert(err.String(), Equals, "BOOM!")191 c.Assert(err.String(), Equals, "BOOM!")
199192
200 stat, err := zk.Exists("/test")193 stat, err := conn.Exists("/test")
201 c.Assert(err, IsNil)194 c.Assert(err, IsNil)
202 c.Assert(stat, IsNil)195 c.Assert(stat, IsNil)
203}196}
204197
205func (s *S) TestRetryChangeFailsReading(c *C) {198func (s *S) TestRetryChangeFailsReading(c *C) {
206 zk, _ := s.init(c)199 conn, _ := s.init(c)
207200
208 _, err := zk.Create("/test", "old", gozk.EPHEMERAL,201 // Write only!
209 gozk.WorldACL(gozk.PERM_WRITE)) // Write only!202 _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_WRITE))
210 c.Assert(err, IsNil)203 c.Assert(err, IsNil)
211204
212 var called bool205 var called bool
213 err = zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL),206 err = conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL),
214 func(data string, stat gozk.Stat) (string, os.Error) {207 func(data string, stat *zk.Stat) (string, os.Error) {
215 called = true208 called = true
216 return "", nil209 return "", nil
217 })210 })
218 c.Assert(err, NotNil)211 c.Assert(err, NotNil)
219 c.Assert(err.Code(), Equals, gozk.ZNOAUTH)212 c.Assert(err, Equals, zk.ZNOAUTH)
220213
221 stat, err := zk.Exists("/test")214 stat, err := conn.Exists("/test")
222 c.Assert(err, IsNil)215 c.Assert(err, IsNil)
223 c.Assert(stat, NotNil)216 c.Assert(stat, NotNil)
224 c.Assert(stat.Version(), Equals, int32(0))217 c.Assert(stat.Version(), Equals, int32(0))
@@ -227,22 +220,21 @@
227}220}
228221
229func (s *S) TestRetryChangeFailsSetting(c *C) {222func (s *S) TestRetryChangeFailsSetting(c *C) {
230 zk, _ := s.init(c)223 conn, _ := s.init(c)
231224
232 _, err := zk.Create("/test", "old", gozk.EPHEMERAL,225 // Read only!
233 gozk.WorldACL(gozk.PERM_READ)) // Read only!226 _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_READ))
234 c.Assert(err, IsNil)227 c.Assert(err, IsNil)
235228
236 var called bool229 var called bool
237 err = zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL),230 err = conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL),
238 func(data string, stat gozk.Stat) (string, os.Error) {231 func(data string, stat *zk.Stat) (string, os.Error) {
239 called = true232 called = true
240 return "", nil233 return "", nil
241 })234 })
242 c.Assert(err, NotNil)235 c.Assert(err, Equals, zk.ZNOAUTH)
243 c.Assert(err.Code(), Equals, gozk.ZNOAUTH)
244236
245 stat, err := zk.Exists("/test")237 stat, err := conn.Exists("/test")
246 c.Assert(err, IsNil)238 c.Assert(err, IsNil)
247 c.Assert(stat, NotNil)239 c.Assert(stat, NotNil)
248 c.Assert(stat.Version(), Equals, int32(0))240 c.Assert(stat.Version(), Equals, int32(0))
@@ -251,23 +243,22 @@
251}243}
252244
253func (s *S) TestRetryChangeFailsCreating(c *C) {245func (s *S) TestRetryChangeFailsCreating(c *C) {
254 zk, _ := s.init(c)246 conn, _ := s.init(c)
255247
256 _, err := zk.Create("/test", "old", gozk.EPHEMERAL,248 // Read only!
257 gozk.WorldACL(gozk.PERM_READ)) // Read only!249 _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_READ))
258 c.Assert(err, IsNil)250 c.Assert(err, IsNil)
259251
260 var called bool252 var called bool
261 err = zk.RetryChange("/test/sub", gozk.EPHEMERAL,253 err = conn.RetryChange("/test/sub", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL),
262 gozk.WorldACL(gozk.PERM_ALL),254 func(data string, stat *zk.Stat) (string, os.Error) {
263 func(data string, stat gozk.Stat) (string, os.Error) {
264 called = true255 called = true
265 return "", nil256 return "", nil
266 })257 })
267 c.Assert(err, NotNil)258 c.Assert(err, NotNil)
268 c.Assert(err.Code(), Equals, gozk.ZNOAUTH)259 c.Assert(err, Equals, zk.ZNOAUTH)
269260
270 stat, err := zk.Exists("/test/sub")261 stat, err := conn.Exists("/test/sub")
271 c.Assert(err, IsNil)262 c.Assert(err, IsNil)
272 c.Assert(stat, IsNil)263 c.Assert(stat, IsNil)
273264
274265
=== added file 'server.go'
--- server.go 1970-01-01 00:00:00 +0000
+++ server.go 2011-10-03 17:02:23 +0000
@@ -0,0 +1,239 @@
1package zk
2
3import (
4 "bufio"
5 "bytes"
6 "fmt"
7 "io/ioutil"
8 "net"
9 "os"
10 "path/filepath"
11 "strings"
12)
13
14// Server represents a ZooKeeper server, its data and configuration files.
15type Server struct {
16 runDir string
17 installDir string
18}
19
20func (srv *Server) Directory() string {
21 return srv.runDir
22}
23
24// CreateServer creates the directory runDir and sets up a ZooKeeper server
25// environment inside it. It is an error if runDir already exists.
26// The server will listen on the specified TCP port.
27//
28// The ZooKeeper installation directory is specified by installDir.
29// If this is empty, a system default will be used.
30//
31// CreateServer does not start the server - the *Server that
32// is returned can be used with the service package, for example:
33// see launchpad.net/gozk/zk/service.
34func CreateServer(port int, runDir, installDir string) (*Server, os.Error) {
35 if err := os.Mkdir(runDir, 0777); err != nil {
36 return nil, err
37 }
38 srv := &Server{runDir: runDir, installDir: installDir}
39 if err := srv.writeLog4JConfig(); err != nil {
40 return nil, err
41 }
42 if err := srv.writeZooKeeperConfig(port); err != nil {
43 return nil, err
44 }
45 if err := srv.writeInstallDir(); err != nil {
46 return nil, err
47 }
48 return srv, nil
49}
50
51// AttachServer creates a new ZooKeeper Server instance
52// to operate inside an existing run directory, runDir.
53// The directory must have been created with CreateServer.
54func AttachServer(runDir string) (*Server, os.Error) {
55 srv := &Server{runDir: runDir}
56 if err := srv.readInstallDir(); err != nil {
57 return nil, fmt.Errorf("cannot read server install directory: %v", err)
58 }
59 return srv, nil
60}
61
62func (srv *Server) path(name string) string {
63 return filepath.Join(srv.runDir, name)
64}
65
66func (srv *Server) CheckAvailability() os.Error {
67 port, err := srv.NetworkPort()
68 if err != nil {
69 return fmt.Errorf("cannot get network port: %v", err)
70 }
71 l, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port))
72 if err != nil {
73 return fmt.Errorf("cannot listen on port %v: %v", port, err)
74 }
75 l.Close()
76 return nil
77}
78
79// ServerCommand returns the command used to start the
80// ZooKeeper server. It is provided for debugging and testing
81// purposes only.
82func (srv *Server) Command() ([]string, os.Error) {
83 cp, err := srv.classPath()
84 if err != nil {
85 return nil, fmt.Errorf("cannot get class path: %v", err)
86 }
87 return []string{
88 "java",
89 "-cp", strings.Join(cp, ":"),
90 "-Dzookeeper.root.logger=INFO,CONSOLE",
91 "-Dlog4j.configuration=file:"+srv.path("log4j.properties"),
92 "org.apache.zookeeper.server.quorum.QuorumPeerMain",
93 srv.path("zoo.cfg"),
94 }, nil
95}
96
97var log4jProperties = `
98log4j.rootLogger=INFO, CONSOLE
99log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
100log4j.appender.CONSOLE.Threshold=INFO
101log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
102log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n
103`
104
105func (srv *Server) writeLog4JConfig() (err os.Error) {
106 return ioutil.WriteFile(srv.path("log4j.properties"), []byte(log4jProperties), 0666)
107}
108
109func (srv *Server) writeZooKeeperConfig(port int) (err os.Error) {
110 return ioutil.WriteFile(srv.path("zoo.cfg"), []byte(fmt.Sprintf(
111 "tickTime=2000\n"+
112 "dataDir=%s\n"+
113 "clientPort=%d\n"+
114 "maxClientCnxns=500\n",
115 srv.runDir, port)), 0666)
116}
117
118// NetworkPort returns the TCP port number that
119// the server is configured for.
120func (srv *Server) NetworkPort() (int, os.Error) {
121 f, err := os.Open(srv.path("zoo.cfg"))
122 if err != nil {
123 return 0, err
124 }
125 r := bufio.NewReader(f)
126 for {
127 line, err := r.ReadSlice('\n')
128 if err != nil {
129 return 0, fmt.Errorf("cannot get port from %q", srv.path("zoo.cfg"))
130 }
131 var port int
132 if n, _ := fmt.Sscanf(string(line), "clientPort=%d\n", &port); n == 1 {
133 return port, nil
134 }
135 }
136 panic("not reached")
137}
138
139func (srv *Server) writeInstallDir() os.Error {
140 return ioutil.WriteFile(srv.path("installdir.txt"), []byte(srv.installDir+"\n"), 0666)
141}
142
143func (srv *Server) readInstallDir() os.Error {
144 data, err := ioutil.ReadFile(srv.path("installdir.txt"))
145 if err != nil {
146 return err
147 }
148 if data[len(data)-1] == '\n' {
149 data = data[0 : len(data)-1]
150 }
151 srv.installDir = string(data)
152 return nil
153}
154
155func (srv *Server) classPath() ([]string, os.Error) {
156 dir := srv.installDir
157 if dir == "" {
158 return systemClassPath()
159 }
160 if err := checkDirectory(dir); err != nil {
161 return nil, err
162 }
163 // Two possibilities, as seen in zkEnv.sh:
164 // 1) locally built binaries (jars are in build directory)
165 // 2) release binaries
166 if build := filepath.Join(dir, "build"); checkDirectory(build) == nil {
167 dir = build
168 }
169 classPath, err := filepath.Glob(filepath.Join(dir, "zookeeper-*.jar"))
170 if err != nil {
171 panic(fmt.Errorf("glob for jar files: %v", err))
172 }
173 more, err := filepath.Glob(filepath.Join(dir, "lib/*.jar"))
174 if err != nil {
175 panic(fmt.Errorf("glob for lib jar files: %v", err))
176 }
177
178 classPath = append(classPath, more...)
179 if len(classPath) == 0 {
180 return nil, fmt.Errorf("zookeeper libraries not found in %q", dir)
181 }
182 return classPath, nil
183}
184
185const zookeeperEnviron = "/etc/zookeeper/conf/environment"
186
187func systemClassPath() ([]string, os.Error) {
188 f, err := os.Open(zookeeperEnviron)
189 if f == nil {
190 return nil, err
191 }
192 r := bufio.NewReader(f)
193 for {
194 line, err := r.ReadSlice('\n')
195 if err != nil {
196 break
197 }
198 if !bytes.HasPrefix(line, []byte("CLASSPATH=")) {
199 continue
200 }
201
202 // remove variable and newline
203 path := string(line[len("CLASSPATH=") : len(line)-1])
204
205 // trim white space
206 path = strings.Trim(path, " \t\r")
207
208 // strip quotes
209 if path[0] == '"' {
210 path = path[1 : len(path)-1]
211 }
212
213 // split on :
214 classPath := strings.Split(path, ":")
215
216 // split off $ZOOCFGDIR
217 if len(classPath) > 0 && classPath[0] == "$ZOOCFGDIR" {
218 classPath = classPath[1:]
219 }
220
221 if len(classPath) == 0 {
222 return nil, fmt.Errorf("empty class path in %q", zookeeperEnviron)
223 }
224 return classPath, nil
225 }
226 return nil, fmt.Errorf("no class path found in %q", zookeeperEnviron)
227}
228
229// checkDirectory returns an error if the given path
230// does not exist or is not a directory.
231func checkDirectory(path string) os.Error {
232 if info, err := os.Stat(path); err != nil || !info.IsDirectory() {
233 if err == nil {
234 err = &os.PathError{Op: "stat", Path: path, Error: os.NewError("is not a directory")}
235 }
236 return err
237 }
238 return nil
239}
0240
=== added directory 'service'
=== added file 'service/Makefile'
--- service/Makefile 1970-01-01 00:00:00 +0000
+++ service/Makefile 2011-10-03 17:02:23 +0000
@@ -0,0 +1,20 @@
1include $(GOROOT)/src/Make.inc
2
3TARG=launchpad.net/gozk/zk/service
4
5GOFILES=\
6 service.go\
7
8GOFMT=gofmt
9BADFMT:=$(shell $(GOFMT) -l $(GOFILES) $(CGOFILES) $(wildcard *_test.go))
10
11gofmt: $(BADFMT)
12 @for F in $(BADFMT); do $(GOFMT) -w $$F && echo $$F; done
13
14ifneq ($(BADFMT),)
15ifneq ($(MAKECMDGOALS),gofmt)
16$(warning WARNING: make gofmt: $(BADFMT))
17endif
18endif
19
20include $(GOROOT)/src/Make.pkg
021
=== added file 'service/service.go'
--- service/service.go 1970-01-01 00:00:00 +0000
+++ service/service.go 2011-10-03 17:02:23 +0000
@@ -0,0 +1,160 @@
1// The service package provides support for long-running services.
2package service
3
4import (
5 "os"
6 "fmt"
7 "io/ioutil"
8 "strconv"
9 "path/filepath"
10 "exec"
11 "strings"
12 "time"
13)
14
15// Interface represents a single-process server.
16type Interface interface {
17 // Directory gives the path to the directory containing
18 // the server's configuration and data files. It is assumed that
19 // this exists and can be modified.
20 Directory() string
21
22 // CheckAvailability should return an error if resources
23 // required by the server are not available; for instance
24 // if the server requires a network port which is not free.
25 CheckAvailability() os.Error
26
27 // Command should return a command that can be
28 // used to run the server. The first element of the returned
29 // slice is the executable name; the rest of the elements are
30 // its arguments.
31 Command() ([]string, os.Error)
32}
33
34// Service represents a possibly running server process.
35type Service struct {
36 srv Interface
37}
38
39// New returns a new Service using the given
40// server interface.
41func New(srv Interface) *Service {
42 return &Service{srv}
43}
44
45// Process returns a reference to the identity
46// of the last running server in the Service's directory.
47// If the server was shut down, it returns (nil, nil).
48// The server process may not still be running
49// (for instance if it was terminated abnormally).
50// This function is provided for debugging and testing
51// purposes only.
52func (s *Service) Process() (*os.Process, os.Error) {
53 data, err := ioutil.ReadFile(s.path("pid.txt"))
54 if err != nil {
55 if err, ok := err.(*os.PathError); ok && err.Error == os.ENOENT {
56 return nil, nil
57 }
58 return nil, err
59 }
60 pid, err := strconv.Atoi(string(data))
61 if err != nil {
62 return nil, os.NewError("bad process id found in pid.txt")
63 }
64 return os.FindProcess(pid)
65}
66
67func (s *Service) path(name string) string {
68 return filepath.Join(s.srv.Directory(), name)
69}
70
71// Start starts the server. It returns an error if the server is already running.
72// It stores the process ID of the server in the file "pid.txt"
73// inside the server's directory. Stdout and stderr of the server
74// are similarly written to "log.txt".
75func (s *Service) Start() os.Error {
76 if err := s.srv.CheckAvailability(); err != nil {
77 return err
78 }
79 p, err := s.Process()
80 if p != nil || err != nil {
81 if p != nil {
82 p.Release()
83 }
84 return fmt.Errorf("server may already be running (remove %q to clear)", s.path("pid.txt"))
85 }
86
87 // create the pid file before starting the process so that if we get two
88 // programs trying to concurrently start a server on the same directory
89 // at the same time, only one should succeed.
90 pidf, err := os.OpenFile(s.path("pid.txt"), os.O_EXCL|os.O_CREATE|os.O_WRONLY, 0666)
91 if err != nil {
92 return fmt.Errorf("cannot create pid.txt: %v", err)
93 }
94 defer pidf.Close()
95
96 args, err := s.srv.Command()
97 if err != nil {
98 return fmt.Errorf("cannot determine command: %v", err)
99 }
100 cmd := exec.Command(args[0], args[1:]...)
101
102 logf, err := os.OpenFile(s.path("log.txt"), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666)
103 if err != nil {
104 return fmt.Errorf("cannot create log file: %v", err)
105 }
106 defer logf.Close()
107 cmd.Stdout = logf
108 cmd.Stderr = logf
109 if err := cmd.Start(); err != nil {
110 return fmt.Errorf("cannot start server: %v", err)
111 }
112 if _, err := fmt.Fprint(pidf, cmd.Process.Pid); err != nil {
113 return fmt.Errorf("cannot write pid file: %v", err)
114 }
115 return nil
116}
117
118// Stop kills the server. It is a no-op if it is already running.
119func (s *Service) Stop() os.Error {
120 p, err := s.Process()
121 if p == nil {
122 if err != nil {
123 return fmt.Errorf("cannot read process ID of server: %v", err)
124 }
125 return nil
126 }
127 defer p.Release()
128 if err := p.Kill(); err != nil && !strings.Contains(err.String(), "no such process") {
129 return fmt.Errorf("cannot kill server process: %v", err)
130 }
131 // ignore the error returned from Wait because there's little
132 // we can do about it - it either means that the process has just exited
133 // anyway or that we can't wait for it for some other reason,
134 // for example because it was originally started by some other process.
135 _, err = p.Wait(0)
136 if err != nil && strings.Contains(err.String(), "no child processes") {
137 // If we can't wait for the server, it's possible that it was running
138 // but not as a child of this process, so give it a little while
139 // to exit. TODO poll with kill(no signal)?
140 time.Sleep(0.5e9)
141 }
142
143 if err := os.Remove(s.path("pid.txt")); err != nil {
144 return fmt.Errorf("cannot remove server process ID file: %v", err)
145 }
146 return nil
147}
148
149// Destroy stops the server, and then removes its
150// directory and all its contents.
151// Warning: this will destroy all data associated with the server.
152func (s *Service) Destroy() os.Error {
153 if err := s.Stop(); err != nil {
154 return err
155 }
156 if err := os.RemoveAll(s.srv.Directory()); err != nil {
157 return err
158 }
159 return nil
160}
0161
=== modified file 'suite_test.go'
--- suite_test.go 2011-08-19 01:43:37 +0000
+++ suite_test.go 2011-10-03 17:02:23 +0000
@@ -1,65 +1,48 @@
1package gozk_test1package zk_test
22
3import (3import (
4 . "launchpad.net/gocheck"4 . "launchpad.net/gocheck"
5 "testing"
6 "io/ioutil"
7 "path"
8 "fmt"5 "fmt"
6 "launchpad.net/gozk/zk/service"
7 "launchpad.net/gozk/zk"
9 "os"8 "os"
10 "gozk"9 "testing"
11 "time"10 "time"
12)11)
1312
14func TestAll(t *testing.T) {13func TestAll(t *testing.T) {
15 TestingT(t)14 if !*reattach {
15 TestingT(t)
16 }
16}17}
1718
18var _ = Suite(&S{})19var _ = Suite(&S{})
1920
20type S struct {21type S struct {
21 zkRoot string22 zkServer *service.Service
22 zkTestRoot string23 zkTestRoot string
23 zkTestPort int24 zkAddr string
24 zkServerSh string
25 zkServerOut *os.File
26 zkAddr string
2725
28 handles []*gozk.ZooKeeper26 handles []*zk.Conn
29 events []*gozk.Event27 events []*zk.Event
30 liveWatches int28 liveWatches int
31 deadWatches chan bool29 deadWatches chan bool
32}30}
3331
34var logLevel = 0 //gozk.LOG_ERROR32var logLevel = 0 //zk.LOG_ERROR
3533
3634func (s *S) init(c *C) (*zk.Conn, chan zk.Event) {
37var testZooCfg = ("dataDir=%s\n" +35 conn, watch, err := zk.Dial(s.zkAddr, 5e9)
38 "clientPort=%d\n" +
39 "tickTime=2000\n" +
40 "initLimit=10\n" +
41 "syncLimit=5\n" +
42 "")
43
44var testLog4jPrp = ("log4j.rootLogger=INFO,CONSOLE\n" +
45 "log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender\n" +
46 "log4j.appender.CONSOLE.Threshold=DEBUG\n" +
47 "log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout\n" +
48 "log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n" +
49 "")
50
51func (s *S) init(c *C) (*gozk.ZooKeeper, chan gozk.Event) {
52 zk, watch, err := gozk.Init(s.zkAddr, 5e9)
53 c.Assert(err, IsNil)36 c.Assert(err, IsNil)
5437
55 s.handles = append(s.handles, zk)38 s.handles = append(s.handles, conn)
5639
57 event := <-watch40 event := <-watch
5841
59 c.Assert(event.Type, Equals, gozk.EVENT_SESSION)42 c.Assert(event.Type, Equals, zk.EVENT_SESSION)
60 c.Assert(event.State, Equals, gozk.STATE_CONNECTED)43 c.Assert(event.State, Equals, zk.STATE_CONNECTED)
6144
62 bufferedWatch := make(chan gozk.Event, 256)45 bufferedWatch := make(chan zk.Event, 256)
63 bufferedWatch <- event46 bufferedWatch <- event
6447
65 s.liveWatches += 148 s.liveWatches += 1
@@ -82,13 +65,13 @@
82 s.deadWatches <- true65 s.deadWatches <- true
83 }()66 }()
8467
85 return zk, bufferedWatch68 return conn, bufferedWatch
86}69}
8770
88func (s *S) SetUpTest(c *C) {71func (s *S) SetUpTest(c *C) {
89 c.Assert(gozk.CountPendingWatches(), Equals, 0,72 c.Assert(zk.CountPendingWatches(), Equals, 0,
90 Bug("Test got a dirty watch state before running!"))73 Bug("Test got a dirty watch state before running!"))
91 gozk.SetLogLevel(logLevel)74 zk.SetLogLevel(logLevel)
92}75}
9376
94func (s *S) TearDownTest(c *C) {77func (s *S) TearDownTest(c *C) {
@@ -108,98 +91,38 @@
108 }91 }
10992
110 // Reset the list of handles.93 // Reset the list of handles.
111 s.handles = make([]*gozk.ZooKeeper, 0)94 s.handles = make([]*zk.Conn, 0)
11295
113 c.Assert(gozk.CountPendingWatches(), Equals, 0,96 c.Assert(zk.CountPendingWatches(), Equals, 0,
114 Bug("Test left live watches behind!"))97 Bug("Test left live watches behind!"))
115}98}
11699
117// We use the suite set up and tear down to manage a custom zookeeper100// We use the suite set up and tear down to manage a custom ZooKeeper
118//101//
119func (s *S) SetUpSuite(c *C) {102func (s *S) SetUpSuite(c *C) {
120103 var err os.Error
121 s.deadWatches = make(chan bool)104 s.deadWatches = make(chan bool)
122105
123 var err os.Error106 // N.B. We meed to create a subdirectory because zk.CreateServer
124107 // insists on creating its own directory.
125 s.zkRoot = os.Getenv("ZKROOT")108 s.zkTestRoot = c.MkDir() + "/zk"
126 if s.zkRoot == "" {109
127 panic("You must define $ZKROOT ($ZKROOT/bin/zkServer.sh is needed)")110 port := 21812
128 }111 s.zkAddr = fmt.Sprint("localhost:", port)
129112
130 s.zkTestRoot = c.MkDir()113 srv, err := zk.CreateServer(port, s.zkTestRoot, "")
131 s.zkTestPort = 21812114 if err != nil {
132115 c.Fatal("Cannot set up server environment: ", err)
133 println("ZooKeeper test server directory:", s.zkTestRoot)116 }
134 println("ZooKeeper test server port:", s.zkTestPort)117 s.zkServer = service.New(srv)
135118 err = s.zkServer.Start()
136 s.zkAddr = fmt.Sprintf("localhost:%d", s.zkTestPort)119 if err != nil {
137120 c.Fatal("Cannot start ZooKeeper server: ", err)
138 s.zkServerSh = path.Join(s.zkRoot, "bin/zkServer.sh")121 }
139 s.zkServerOut, err = os.OpenFile(path.Join(s.zkTestRoot, "stdout.txt"),
140 os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
141 if err != nil {
142 panic("Can't open stdout.txt file for server: " + err.String())
143 }
144
145 dataDir := path.Join(s.zkTestRoot, "data")
146 confDir := path.Join(s.zkTestRoot, "conf")
147
148 os.Mkdir(dataDir, 0755)
149 os.Mkdir(confDir, 0755)
150
151 err = os.Setenv("ZOOCFGDIR", confDir)
152 if err != nil {
153 panic("Can't set $ZOOCFGDIR: " + err.String())
154 }
155
156 zooCfg := []byte(fmt.Sprintf(testZooCfg, dataDir, s.zkTestPort))
157 err = ioutil.WriteFile(path.Join(confDir, "zoo.cfg"), zooCfg, 0644)
158 if err != nil {
159 panic("Can't write zoo.cfg: " + err.String())
160 }
161
162 log4jPrp := []byte(testLog4jPrp)
163 err = ioutil.WriteFile(path.Join(confDir, "log4j.properties"), log4jPrp, 0644)
164 if err != nil {
165 panic("Can't write log4j.properties: " + err.String())
166 }
167
168 s.StartZK()
169}122}
170123
171func (s *S) TearDownSuite(c *C) {124func (s *S) TearDownSuite(c *C) {
172 s.StopZK()125 if s.zkServer != nil {
173 s.zkServerOut.Close()126 s.zkServer.Stop() // TODO Destroy
174}
175
176func (s *S) StartZK() {
177 attr := os.ProcAttr{Files: []*os.File{os.Stdin, s.zkServerOut, os.Stderr}}
178 proc, err := os.StartProcess(s.zkServerSh, []string{s.zkServerSh, "start"}, &attr)
179 if err != nil {
180 panic("Problem executing zkServer.sh start: " + err.String())
181 }
182
183 result, err := proc.Wait(0)
184 if err != nil {
185 panic(err.String())
186 } else if result.ExitStatus() != 0 {
187 panic("'zkServer.sh start' exited with non-zero status")
188 }
189}
190
191func (s *S) StopZK() {
192 attr := os.ProcAttr{Files: []*os.File{os.Stdin, s.zkServerOut, os.Stderr}}
193 proc, err := os.StartProcess(s.zkServerSh, []string{s.zkServerSh, "stop"}, &attr)
194 if err != nil {
195 panic("Problem executing zkServer.sh stop: " + err.String() +
196 " (look for runaway java processes!)")
197 }
198 result, err := proc.Wait(0)
199 if err != nil {
200 panic(err.String())
201 } else if result.ExitStatus() != 0 {
202 panic("'zkServer.sh stop' exited with non-zero status " +
203 "(look for runaway java processes!)")
204 }127 }
205}128}
206129
=== renamed file 'gozk.go' => 'zk.go'
--- gozk.go 2011-08-19 01:56:39 +0000
+++ zk.go 2011-10-03 17:02:23 +0000
@@ -1,4 +1,4 @@
1// gozk - Zookeeper support for the Go language1// gozk - ZooKeeper support for the Go language
2//2//
3// https://wiki.ubuntu.com/gozk3// https://wiki.ubuntu.com/gozk
4//4//
@@ -6,7 +6,7 @@
6//6//
7// Written by Gustavo Niemeyer <gustavo.niemeyer@canonical.com>7// Written by Gustavo Niemeyer <gustavo.niemeyer@canonical.com>
8//8//
9package gozk9package zk
1010
11/*11/*
12#cgo CFLAGS: -I/usr/include/c-client-src12#cgo CFLAGS: -I/usr/include/c-client-src
@@ -27,17 +27,16 @@
27// -----------------------------------------------------------------------27// -----------------------------------------------------------------------
28// Main constants and data types.28// Main constants and data types.
2929
30// The main ZooKeeper object, created through the Init function.30// Conn represents a connection to a set of ZooKeeper nodes.
31// Encapsulates all communication with ZooKeeper.31type Conn struct {
32type ZooKeeper struct {
33 watchChannels map[uintptr]chan Event32 watchChannels map[uintptr]chan Event
34 sessionWatchId uintptr33 sessionWatchId uintptr
35 handle *C.zhandle_t34 handle *C.zhandle_t
36 mutex sync.Mutex35 mutex sync.Mutex
37}36}
3837
39// ClientId represents the established session in ZooKeeper. This is only38// ClientId represents an established ZooKeeper session. It can be
40// useful to be passed back into the ReInit function.39// passed into Redial to reestablish a connection to an existing session.
41type ClientId struct {40type ClientId struct {
42 cId C.clientid_t41 cId C.clientid_t
43}42}
@@ -59,25 +58,21 @@
59// through one of the W-suffixed functions (GetW, ExistsW, etc).58// through one of the W-suffixed functions (GetW, ExistsW, etc).
60// 59//
61// The session channel will only receive session-level events notifying60// The session channel will only receive session-level events notifying
62// about critical and transient changes in the ZooKeeper connection61// about changes in the ZooKeeper connection state (STATE_CONNECTED,
63// state (STATE_CONNECTED, STATE_EXPIRED_SESSION, etc). On long62// STATE_EXPIRED_SESSION, etc). On long running applications the session
64// running applications the session channel must *necessarily* be63// channel must *necessarily* be observed since certain events like session
65// observed since certain events like session expirations require an64// expirations require an explicit reconnection and reestablishment of
66// explicit reconnection and reestablishment of state (or bailing out).65// state (or bailing out). Because of that, the buffer used on the session
67// Because of that, the buffer used on the session channel has a limited66// channel has a limited size, and a panic will occur if too many events
68// size, and a panic will occur if too many events are not collected.67// are not collected.
69//68//
70// Watch channels enable monitoring state for nodes, and the69// Watch channels enable monitoring state for nodes, and the
71// moment they're fired depends on which function was called to70// moment they're fired depends on which function was called to
72// create them. Note that, unlike in other ZooKeeper interfaces,71// create them. Two critical session events, STATE_EXPIRED_SESSION
73// gozk will NOT dispatch unimportant session events such as72// and STATE_EXPIRED_SESSION, will be delivered to all
74// STATE_ASSOCIATING, STATE_CONNECTING and STATE_CONNECTED to73// node watch channels if they occur.
75// watch Event channels, since they are transient and disruptive
76// to the workflow. Critical state changes such as expirations
77// are still delivered to all event channels, though, and the
78// transient events may be obsererved in the session channel.
79//74//
80// Since every watch channel may receive critical session events, events75// Since every watch channel may receive these critical session events, events
81// received must not be handled blindly as if the watch requested has76// received must not be handled blindly as if the watch requested has
82// been fired. To facilitate such tests, Events offer the Ok method,77// been fired. To facilitate such tests, Events offer the Ok method,
83// and they also have a good String method so they may be used as an78// and they also have a good String method so they may be used as an
@@ -89,44 +84,78 @@
89// return84// return
90// }85// }
91//86//
92// Note that closed channels will deliver zeroed Event, which means87// Note that closed channels will deliver a zeroed Event, which means
93// event.Type is set to EVENT_CLOSED and event.State is set to STATE_CLOSED,88// event.Type is set to EVENT_CLOSED and event.State is set to STATE_CLOSED,
94// to facilitate handling.89// to facilitate handling.
95type Event struct {90type Event struct {
96 Type int91 // Type gives the type of event (one of the EVENT_* constants).
97 Path string92 // If Type is EVENT_SESSION, then the events is a session
93 // event.
94 Type int
95
96 // For non-session events, Path gives the path of the node
97 // that was being watched.
98 Path string
99
100 // For session events, State (one of the STATE* constants) gives the session
101 // status.
98 State int102 State int
99}103}
100104
101// Error codes that may be used to verify the result of the105// Error represents a ZooKeeper error.
102// Code method from Error.106type Error int
107
103const (108const (
104 ZOK = C.ZOK109 ZOK Error = C.ZOK
105 ZSYSTEMERROR = C.ZSYSTEMERROR110 ZSYSTEMERROR Error = C.ZSYSTEMERROR
106 ZRUNTIMEINCONSISTENCY = C.ZRUNTIMEINCONSISTENCY111 ZRUNTIMEINCONSISTENCY Error = C.ZRUNTIMEINCONSISTENCY
107 ZDATAINCONSISTENCY = C.ZDATAINCONSISTENCY112 ZDATAINCONSISTENCY Error = C.ZDATAINCONSISTENCY
108 ZCONNECTIONLOSS = C.ZCONNECTIONLOSS113 ZCONNECTIONLOSS Error = C.ZCONNECTIONLOSS
109 ZMARSHALLINGERROR = C.ZMARSHALLINGERROR114 ZMARSHALLINGERROR Error = C.ZMARSHALLINGERROR
110 ZUNIMPLEMENTED = C.ZUNIMPLEMENTED115 ZUNIMPLEMENTED Error = C.ZUNIMPLEMENTED
111 ZOPERATIONTIMEOUT = C.ZOPERATIONTIMEOUT116 ZOPERATIONTIMEOUT Error = C.ZOPERATIONTIMEOUT
112 ZBADARGUMENTS = C.ZBADARGUMENTS117 ZBADARGUMENTS Error = C.ZBADARGUMENTS
113 ZINVALIDSTATE = C.ZINVALIDSTATE118 ZINVALIDSTATE Error = C.ZINVALIDSTATE
114 ZAPIERROR = C.ZAPIERROR119 ZAPIERROR Error = C.ZAPIERROR
115 ZNONODE = C.ZNONODE120 ZNONODE Error = C.ZNONODE
116 ZNOAUTH = C.ZNOAUTH121 ZNOAUTH Error = C.ZNOAUTH
117 ZBADVERSION = C.ZBADVERSION122 ZBADVERSION Error = C.ZBADVERSION
118 ZNOCHILDRENFOREPHEMERALS = C.ZNOCHILDRENFOREPHEMERALS123 ZNOCHILDRENFOREPHEMERALS Error = C.ZNOCHILDRENFOREPHEMERALS
119 ZNODEEXISTS = C.ZNODEEXISTS124 ZNODEEXISTS Error = C.ZNODEEXISTS
120 ZNOTEMPTY = C.ZNOTEMPTY125 ZNOTEMPTY Error = C.ZNOTEMPTY
121 ZSESSIONEXPIRED = C.ZSESSIONEXPIRED126 ZSESSIONEXPIRED Error = C.ZSESSIONEXPIRED
122 ZINVALIDCALLBACK = C.ZINVALIDCALLBACK127 ZINVALIDCALLBACK Error = C.ZINVALIDCALLBACK
123 ZINVALIDACL = C.ZINVALIDACL128 ZINVALIDACL Error = C.ZINVALIDACL
124 ZAUTHFAILED = C.ZAUTHFAILED129 ZAUTHFAILED Error = C.ZAUTHFAILED
125 ZCLOSING = C.ZCLOSING130 ZCLOSING Error = C.ZCLOSING
126 ZNOTHING = C.ZNOTHING131 ZNOTHING Error = C.ZNOTHING
127 ZSESSIONMOVED = C.ZSESSIONMOVED132 ZSESSIONMOVED Error = C.ZSESSIONMOVED
128)133)
129134
135func (error Error) String() string {
136 return C.GoString(C.zerror(C.int(error))) // Static, no need to free it.
137}
138
139// zkError creates an appropriate error return from
140// a ZooKeeper status and the errno return from a C API
141// call.
142func zkError(rc C.int, cerr os.Error) os.Error {
143 code := Error(rc)
144 switch code {
145 case ZOK:
146 return nil
147
148 case ZSYSTEMERROR:
149 // If a ZooKeeper call returns ZSYSTEMERROR, then
150 // errno becomes significant. If errno has not been
151 // set, then we will return ZSYSTEMERROR nonetheless.
152 if cerr != nil {
153 return cerr
154 }
155 }
156 return code
157}
158
130// Constants for SetLogLevel.159// Constants for SetLogLevel.
131const (160const (
132 LOG_ERROR = C.ZOO_LOG_LEVEL_ERROR161 LOG_ERROR = C.ZOO_LOG_LEVEL_ERROR
@@ -141,8 +170,8 @@
141170
142// Constants for Create's flags parameter.171// Constants for Create's flags parameter.
143const (172const (
144 EPHEMERAL = 1 << iota173 EPHEMERAL = 1 << iota // The node will be deleted when the current session ends.
145 SEQUENCE174 SEQUENCE // Append a sequence number to the node name.
146)175)
147176
148// Constants for ACL Perms.177// Constants for ACL Perms.
@@ -273,104 +302,79 @@
273}302}
274303
275// -----------------------------------------------------------------------304// -----------------------------------------------------------------------
276// Error interface which maps onto the ZooKeeper error codes.
277
278type Error interface {
279 String() string
280 Code() int
281}
282
283type errorType struct {
284 zkrc C.int
285 err os.Error
286}
287
288func newError(zkrc C.int, err os.Error) Error {
289 return &errorType{zkrc, err}
290}
291
292func (error *errorType) String() (result string) {
293 if error.zkrc == ZSYSTEMERROR && error.err != nil {
294 result = error.err.String()
295 } else {
296 result = C.GoString(C.zerror(error.zkrc)) // Static, no need to free it.
297 }
298 return
299}
300
301// Code returns the error code that may be compared against one of
302// the gozk.Z* constants.
303func (error *errorType) Code() int {
304 return int(error.zkrc)
305}
306
307// -----------------------------------------------------------------------
308// Stat interface which maps onto the ZooKeeper Stat struct.
309
310// We declare this as an interface rather than an actual struct because
311// this way we don't have to copy data around between the real C struct
312// and the Go one on every call. Most uses will only touch a few elements,
313// or even ignore the stat entirely, so that's a win.
314305
315// Stat contains detailed information about a node.306// Stat contains detailed information about a node.
316type Stat interface {307type Stat struct {
317 Czxid() int64308 c C.struct_Stat
318 Mzxid() int64309}
319 CTime() int64310
320 MTime() int64311func (stat *Stat) String() string {
321 Version() int32312 return fmt.Sprintf(
322 CVersion() int32313 "{Czxid: %d; Mzxid: %d; CTime: %d; MTime: %d; "+
323 AVersion() int32314 "Version: %d; CVersion: %d; AVersion: %d; "+
324 EphemeralOwner() int64315 "EphemeralOwner: %d; DataLength: %d; "+
325 DataLength() int32316 "NumChildren: %d; Pzxid: %d}",
326 NumChildren() int32317 stat.Czxid(), stat.Mzxid(), stat.CTime(), stat.MTime(),
327 Pzxid() int64318 stat.Version(), stat.CVersion(), stat.AVersion(),
328}319 stat.EphemeralOwner(), stat.DataLength(),
329320 stat.NumChildren(), stat.Pzxid(),
330type resultStat C.struct_Stat321 )
331322}
332func (stat *resultStat) Czxid() int64 {323
333 return int64(stat.czxid)324// Czxid returns the zxid of the change that caused the node to be created.
334}325func (stat *Stat) Czxid() int64 {
335326 return int64(stat.c.czxid)
336func (stat *resultStat) Mzxid() int64 {327}
337 return int64(stat.mzxid)328
338}329// Mzxid returns the zxid of the change that last modified the node.
339330func (stat *Stat) Mzxid() int64 {
340func (stat *resultStat) CTime() int64 {331 return int64(stat.c.mzxid)
341 return int64(stat.ctime)332}
342}333
343334// CTime returns the time in milliseconds from epoch when the node was created.
344func (stat *resultStat) MTime() int64 {335func (stat *Stat) CTime() int64 {
345 return int64(stat.mtime)336 return int64(stat.c.ctime)
346}337}
347338
348func (stat *resultStat) Version() int32 {339// MTime returns the time in milliseconds from epoch when the node was last modified.
349 return int32(stat.version)340func (stat *Stat) MTime() int64 {
350}341 return int64(stat.c.mtime)
351342}
352func (stat *resultStat) CVersion() int32 {343
353 return int32(stat.cversion)344// Version returns the number of changes to the data of the node.
354}345func (stat *Stat) Version() int32 {
355346 return int32(stat.c.version)
356func (stat *resultStat) AVersion() int32 {347}
357 return int32(stat.aversion)348
358}349// CVersion returns the number of changes to the children of the node.
359350func (stat *Stat) CVersion() int32 {
360func (stat *resultStat) EphemeralOwner() int64 {351 return int32(stat.c.cversion)
361 return int64(stat.ephemeralOwner)352}
362}353
363354// AVersion returns the number of changes to the ACL of the node.
364func (stat *resultStat) DataLength() int32 {355func (stat *Stat) AVersion() int32 {
365 return int32(stat.dataLength)356 return int32(stat.c.aversion)
366}357}
367358
368func (stat *resultStat) NumChildren() int32 {359// If the node is an ephemeral node, EphemeralOwner returns the session id
369 return int32(stat.numChildren)360// of the owner of the node; otherwise it will return zero.
370}361func (stat *Stat) EphemeralOwner() int64 {
371362 return int64(stat.c.ephemeralOwner)
372func (stat *resultStat) Pzxid() int64 {363}
373 return int64(stat.pzxid)364
365// DataLength returns the length of the data in the node in bytes.
366func (stat *Stat) DataLength() int32 {
367 return int32(stat.c.dataLength)
368}
369
370// NumChildren returns the number of children of the znode.
371func (stat *Stat) NumChildren() int32 {
372 return int32(stat.c.numChildren)
373}
374
375// Pzxid returns the Pzxid of the node, whatever that is.
376func (stat *Stat) Pzxid() int64 {
377 return int64(stat.c.pzxid)
374}378}
375379
376// -----------------------------------------------------------------------380// -----------------------------------------------------------------------
@@ -380,11 +384,13 @@
380384
381// SetLogLevel changes the minimum level of logging output generated385// SetLogLevel changes the minimum level of logging output generated
382// to adjust the amount of information provided.386// to adjust the amount of information provided.
387// The default is LOG_ERROR. If level is zero, no logging output
388// will be generated.
383func SetLogLevel(level int) {389func SetLogLevel(level int) {
384 C.zoo_set_debug_level(C.ZooLogLevel(level))390 C.zoo_set_debug_level(C.ZooLogLevel(level))
385}391}
386392
387// Init initializes the communication with a ZooKeeper cluster. The provided393// Dial initializes the communication with a ZooKeeper cluster. The provided
388// servers parameter may include multiple server addresses, separated394// servers parameter may include multiple server addresses, separated
389// by commas, so that the client will automatically attempt to connect395// by commas, so that the client will automatically attempt to connect
390// to another server if one of them stops working for whatever reason.396// to another server if one of them stops working for whatever reason.
@@ -398,79 +404,73 @@
398// The watch channel receives events of type SESSION_EVENT when any change404// The watch channel receives events of type SESSION_EVENT when any change
399// to the state of the established connection happens. See the documentation405// to the state of the established connection happens. See the documentation
400// for the Event type for more details.406// for the Event type for more details.
401func Init(servers string, recvTimeoutNS int64) (zk *ZooKeeper, watch chan Event, err Error) {407func Dial(servers string, recvTimeoutNS int64) (*Conn, <-chan Event, os.Error) {
402 zk, watch, err = internalInit(servers, recvTimeoutNS, nil)408 return dial(servers, recvTimeoutNS, nil)
403 return
404}409}
405410
406// Equivalent to Init, but attempt to reestablish an existing session411// Redial is equivalent to Dial, but attempts to reestablish an existing session
407// identified via the clientId parameter.412// identified via the clientId parameter.
408func ReInit(servers string, recvTimeoutNS int64, clientId *ClientId) (zk *ZooKeeper, watch chan Event, err Error) {413func Redial(servers string, recvTimeoutNS int64, clientId *ClientId) (*Conn, <-chan Event, os.Error) {
409 zk, watch, err = internalInit(servers, recvTimeoutNS, clientId)414 return dial(servers, recvTimeoutNS, clientId)
410 return
411}415}
412416
413func internalInit(servers string, recvTimeoutNS int64, clientId *ClientId) (*ZooKeeper, chan Event, Error) {417func dial(servers string, recvTimeoutNS int64, clientId *ClientId) (*Conn, <-chan Event, os.Error) {
414418 conn := &Conn{}
415 zk := &ZooKeeper{}419 conn.watchChannels = make(map[uintptr]chan Event)
416 zk.watchChannels = make(map[uintptr]chan Event)
417420
418 var cId *C.clientid_t421 var cId *C.clientid_t
419 if clientId != nil {422 if clientId != nil {
420 cId = &clientId.cId423 cId = &clientId.cId
421 }424 }
422425
423 watchId, watchChannel := zk.createWatch(true)426 watchId, watchChannel := conn.createWatch(true)
424 zk.sessionWatchId = watchId427 conn.sessionWatchId = watchId
425428
426 cservers := C.CString(servers)429 cservers := C.CString(servers)
427 handle, cerr := C.zookeeper_init(cservers, C.watch_handler, C.int(recvTimeoutNS / 1e6), cId, unsafe.Pointer(watchId), 0)430 handle, cerr := C.zookeeper_init(cservers, C.watch_handler, C.int(recvTimeoutNS/1e6), cId, unsafe.Pointer(watchId), 0)
428 C.free(unsafe.Pointer(cservers))431 C.free(unsafe.Pointer(cservers))
429 if handle == nil {432 if handle == nil {
430 zk.closeAllWatches()433 conn.closeAllWatches()
431 return nil, nil, newError(ZSYSTEMERROR, cerr)434 return nil, nil, zkError(C.int(ZSYSTEMERROR), cerr)
432 }435 }
433 zk.handle = handle436 conn.handle = handle
434 runWatchLoop()437 runWatchLoop()
435 return zk, watchChannel, nil438 return conn, watchChannel, nil
436}439}
437440
438// ClientId returns the client ID for the existing session with ZooKeeper.441// ClientId returns the client ID for the existing session with ZooKeeper.
439// This is useful to reestablish an existing session via ReInit.442// This is useful to reestablish an existing session via ReInit.
440func (zk *ZooKeeper) ClientId() *ClientId {443func (conn *Conn) ClientId() *ClientId {
441 return &ClientId{*C.zoo_client_id(zk.handle)}444 return &ClientId{*C.zoo_client_id(conn.handle)}
442}445}
443446
444// Close terminates the ZooKeeper interaction.447// Close terminates the ZooKeeper interaction.
445func (zk *ZooKeeper) Close() Error {448func (conn *Conn) Close() os.Error {
446449
447 // Protect from concurrency around zk.handle change.450 // Protect from concurrency around conn.handle change.
448 zk.mutex.Lock()451 conn.mutex.Lock()
449 defer zk.mutex.Unlock()452 defer conn.mutex.Unlock()
450453
451 if zk.handle == nil {454 if conn.handle == nil {
452 // ZooKeeper may hang indefinitely if a handler is closed twice,455 // ZooKeeper may hang indefinitely if a handler is closed twice,
453 // so we get in the way and prevent it from happening.456 // so we get in the way and prevent it from happening.
454 return newError(ZCLOSING, nil)457 return ZCLOSING
455 }458 }
456 rc, cerr := C.zookeeper_close(zk.handle)459 rc, cerr := C.zookeeper_close(conn.handle)
457460
458 zk.closeAllWatches()461 conn.closeAllWatches()
459 stopWatchLoop()462 stopWatchLoop()
460463
461 // At this point, nothing else should need zk.handle.464 // At this point, nothing else should need conn.handle.
462 zk.handle = nil465 conn.handle = nil
463466
464 if rc != C.ZOK {467 return zkError(rc, cerr)
465 return newError(rc, cerr)
466 }
467 return nil
468}468}
469469
470// Get returns the data and status from an existing node. err will be nil,470// Get returns the data and status from an existing node. err will be nil,
471// unless an error is found. Attempting to retrieve data from a non-existing471// unless an error is found. Attempting to retrieve data from a non-existing
472// node is an error.472// node is an error.
473func (zk *ZooKeeper) Get(path string) (data string, stat Stat, err Error) {473func (conn *Conn) Get(path string) (data string, stat *Stat, err os.Error) {
474474
475 cpath := C.CString(path)475 cpath := C.CString(path)
476 cbuffer := (*C.char)(C.malloc(bufferSize))476 cbuffer := (*C.char)(C.malloc(bufferSize))
@@ -478,22 +478,22 @@
478 defer C.free(unsafe.Pointer(cpath))478 defer C.free(unsafe.Pointer(cpath))
479 defer C.free(unsafe.Pointer(cbuffer))479 defer C.free(unsafe.Pointer(cbuffer))
480480
481 cstat := C.struct_Stat{}481 var cstat Stat
482 rc, cerr := C.zoo_wget(zk.handle, cpath, nil, nil,482 rc, cerr := C.zoo_wget(conn.handle, cpath, nil, nil, cbuffer, &cbufferLen, &cstat.c)
483 cbuffer, &cbufferLen, &cstat)
484 if rc != C.ZOK {483 if rc != C.ZOK {
485 return "", nil, newError(rc, cerr)484 return "", nil, zkError(rc, cerr)
486 }485 }
486
487 result := C.GoStringN(cbuffer, cbufferLen)487 result := C.GoStringN(cbuffer, cbufferLen)
488488 return result, &cstat, nil
489 return result, (*resultStat)(&cstat), nil
490}489}
491490
492// GetW works like Get but also returns a channel that will receive491// GetW works like Get but also returns a channel that will receive
493// a single Event value when the data or existence of the given ZooKeeper492// a single Event value, with Type EVENT_CHANGED or EVENT_DELETED,
494// node changes or when critical session events happen. See the493// when the node contents change or it is deleted.
495// documentation of the Event type for more details.494// It will also receive a critical session event if the server goes down.
496func (zk *ZooKeeper) GetW(path string) (data string, stat Stat, watch chan Event, err Error) {495// See the documentation of the Event type for more details.
496func (conn *Conn) GetW(path string) (data string, stat *Stat, watch <-chan Event, err os.Error) {
497497
498 cpath := C.CString(path)498 cpath := C.CString(path)
499 cbuffer := (*C.char)(C.malloc(bufferSize))499 cbuffer := (*C.char)(C.malloc(bufferSize))
@@ -501,73 +501,69 @@
501 defer C.free(unsafe.Pointer(cpath))501 defer C.free(unsafe.Pointer(cpath))
502 defer C.free(unsafe.Pointer(cbuffer))502 defer C.free(unsafe.Pointer(cbuffer))
503503
504 watchId, watchChannel := zk.createWatch(true)504 watchId, watchChannel := conn.createWatch(true)
505505
506 cstat := C.struct_Stat{}506 var cstat Stat
507 rc, cerr := C.zoo_wget(zk.handle, cpath,507 rc, cerr := C.zoo_wget(conn.handle, cpath, C.watch_handler, unsafe.Pointer(watchId), cbuffer, &cbufferLen, &cstat.c)
508 C.watch_handler, unsafe.Pointer(watchId),
509 cbuffer, &cbufferLen, &cstat)
510 if rc != C.ZOK {508 if rc != C.ZOK {
511 zk.forgetWatch(watchId)509 conn.forgetWatch(watchId)
512 return "", nil, nil, newError(rc, cerr)510 return "", nil, nil, zkError(rc, cerr)
513 }511 }
514512
515 result := C.GoStringN(cbuffer, cbufferLen)513 result := C.GoStringN(cbuffer, cbufferLen)
516 return result, (*resultStat)(&cstat), watchChannel, nil514 return result, &cstat, watchChannel, nil
517}515}
518516
519// Children returns the children list and status from an existing node.517// Children returns the children list and status from an existing node.
520// err will be nil, unless an error is found. Attempting to retrieve the518// Attempting to retrieve the children list from a non-existent node is an error.
521// children list from a non-existent node is an error.519func (conn *Conn) Children(path string) (children []string, stat *Stat, err os.Error) {
522func (zk *ZooKeeper) Children(path string) (children []string, stat Stat, err Error) {
523520
524 cpath := C.CString(path)521 cpath := C.CString(path)
525 defer C.free(unsafe.Pointer(cpath))522 defer C.free(unsafe.Pointer(cpath))
526523
527 cvector := C.struct_String_vector{}524 cvector := C.struct_String_vector{}
528 cstat := C.struct_Stat{}525 var cstat Stat
529 rc, cerr := C.zoo_wget_children2(zk.handle, cpath, nil, nil,526 rc, cerr := C.zoo_wget_children2(conn.handle, cpath, nil, nil, &cvector, &cstat.c)
530 &cvector, &cstat)
531527
532 // Can't happen if rc != 0, but avoid potential memory leaks in the future.528 // Can't happen if rc != 0, but avoid potential memory leaks in the future.
533 if cvector.count != 0 {529 if cvector.count != 0 {
534 children = parseStringVector(&cvector)530 children = parseStringVector(&cvector)
535 }531 }
536 if rc != C.ZOK {532 if rc == C.ZOK {
537 err = newError(rc, cerr)533 stat = &cstat
538 } else {534 } else {
539 stat = (*resultStat)(&cstat)535 err = zkError(rc, cerr)
540 }536 }
541 return537 return
542}538}
543539
544// ChildrenW works like Children but also returns a channel that will540// ChildrenW works like Children but also returns a channel that will
545// receive a single Event value when a node is added or removed under the541// receive a single Event value when a child of the given path is
546// provided path or when critical session events happen. See the documentation542// added or removed (EVENT_CHILD), or when the path node itself is
547// of the Event type for more details.543// deleted (EVENT_DELETED).
548func (zk *ZooKeeper) ChildrenW(path string) (children []string, stat Stat, watch chan Event, err Error) {544// It will also receive a critical session event if the server goes down.
545// See the documentation of the Event type for more details.
546func (conn *Conn) ChildrenW(path string) (children []string, stat *Stat, watch <-chan Event, err os.Error) {
549547
550 cpath := C.CString(path)548 cpath := C.CString(path)
551 defer C.free(unsafe.Pointer(cpath))549 defer C.free(unsafe.Pointer(cpath))
552550
553 watchId, watchChannel := zk.createWatch(true)551 watchId, watchChannel := conn.createWatch(true)
554552
555 cvector := C.struct_String_vector{}553 cvector := C.struct_String_vector{}
556 cstat := C.struct_Stat{}554 var cstat Stat
557 rc, cerr := C.zoo_wget_children2(zk.handle, cpath,555 rc, cerr := C.zoo_wget_children2(conn.handle, cpath, C.watch_handler, unsafe.Pointer(watchId), &cvector, &cstat.c)
558 C.watch_handler, unsafe.Pointer(watchId),
559 &cvector, &cstat)
560556
561 // Can't happen if rc != 0, but avoid potential memory leaks in the future.557 // Can't happen if rc != 0, but avoid potential memory leaks in the future.
562 if cvector.count != 0 {558 if cvector.count != 0 {
563 children = parseStringVector(&cvector)559 children = parseStringVector(&cvector)
564 }560 }
565 if rc != C.ZOK {561 if rc == C.ZOK {
566 zk.forgetWatch(watchId)562 stat = &cstat
567 err = newError(rc, cerr)563 watch = watchChannel
568 } else {564 } else {
569 stat = (*resultStat)(&cstat)565 conn.forgetWatch(watchId)
570 watch = watchChannel566 err = zkError(rc, cerr)
571 }567 }
572 return568 return
573}569}
@@ -588,51 +584,51 @@
588// Exists checks if a node exists at the given path. If it does,584// Exists checks if a node exists at the given path. If it does,
589// stat will contain meta information on the existing node, otherwise585// stat will contain meta information on the existing node, otherwise
590// it will be nil.586// it will be nil.
591func (zk *ZooKeeper) Exists(path string) (stat Stat, err Error) {587func (conn *Conn) Exists(path string) (stat *Stat, err os.Error) {
592 cpath := C.CString(path)588 cpath := C.CString(path)
593 defer C.free(unsafe.Pointer(cpath))589 defer C.free(unsafe.Pointer(cpath))
594590
595 cstat := C.struct_Stat{}591 var cstat Stat
596 rc, cerr := C.zoo_wexists(zk.handle, cpath, nil, nil, &cstat)592 rc, cerr := C.zoo_wexists(conn.handle, cpath, nil, nil, &stat.c)
597593
598 // We diverge a bit from the usual here: a ZNONODE is not an error594 // We diverge a bit from the usual here: a ZNONODE is not an error
599 // for an exists call, otherwise every Exists call would have to check595 // for an exists call, otherwise every Exists call would have to check
600 // for err != nil and err.Code() != ZNONODE.596 // for err != nil and err.Code() != ZNONODE.
601 if rc == C.ZOK {597 if rc == C.ZOK {
602 stat = (*resultStat)(&cstat)598 stat = &cstat
603 } else if rc != C.ZNONODE {599 } else if rc != C.ZNONODE {
604 err = newError(rc, cerr)600 err = zkError(rc, cerr)
605 }601 }
606 return602 return
607}603}
608604
609// ExistsW works like Exists but also returns a channel that will605// ExistsW works like Exists but also returns a channel that will
610// receive an Event value when a node is created in case the returned606// receive a single Event, with Type EVENT_CREATED, EVENT_DELETED
611// stat is nil and the node didn't exist, or when the existing node607// or EVENT_CHANGED, when the node is next created, removed,
612// is removed. It will also receive critical session events. See the608// or its contents change.
613// documentation of the Event type for more details.609// It will also receive a critical session event if the server goes down.
614func (zk *ZooKeeper) ExistsW(path string) (stat Stat, watch chan Event, err Error) {610// See the documentation of the Event type for more details.
611func (conn *Conn) ExistsW(path string) (stat *Stat, watch <-chan Event, err os.Error) {
615 cpath := C.CString(path)612 cpath := C.CString(path)
616 defer C.free(unsafe.Pointer(cpath))613 defer C.free(unsafe.Pointer(cpath))
617614
618 watchId, watchChannel := zk.createWatch(true)615 watchId, watchChannel := conn.createWatch(true)
619616
620 cstat := C.struct_Stat{}617 var cstat Stat
621 rc, cerr := C.zoo_wexists(zk.handle, cpath,618 rc, cerr := C.zoo_wexists(conn.handle, cpath, C.watch_handler, unsafe.Pointer(watchId), &cstat.c)
622 C.watch_handler, unsafe.Pointer(watchId), &cstat)
623619
624 // We diverge a bit from the usual here: a ZNONODE is not an error620 // We diverge a bit from the usual here: a ZNONODE is not an error
625 // for an exists call, otherwise every Exists call would have to check621 // for an exists call, otherwise every Exists call would have to check
626 // for err != nil and err.Code() != ZNONODE.622 // for err != nil and err.Code() != ZNONODE.
627 switch rc {623 switch Error(rc) {
628 case ZOK:624 case ZOK:
629 stat = (*resultStat)(&cstat)625 stat = &cstat
630 watch = watchChannel626 watch = watchChannel
631 case ZNONODE:627 case ZNONODE:
632 watch = watchChannel628 watch = watchChannel
633 default:629 default:
634 zk.forgetWatch(watchId)630 conn.forgetWatch(watchId)
635 err = newError(rc, cerr)631 err = zkError(rc, cerr)
636 }632 }
637 return633 return
638}634}
@@ -646,7 +642,7 @@
646// The returned path is useful in cases where the created path may differ642// The returned path is useful in cases where the created path may differ
647// from the requested one, such as when a sequence number is appended643// from the requested one, such as when a sequence number is appended
648// to it due to the use of the gozk.SEQUENCE flag.644// to it due to the use of the gozk.SEQUENCE flag.
649func (zk *ZooKeeper) Create(path, value string, flags int, aclv []ACL) (pathCreated string, err Error) {645func (conn *Conn) Create(path, value string, flags int, aclv []ACL) (pathCreated string, err os.Error) {
650 cpath := C.CString(path)646 cpath := C.CString(path)
651 cvalue := C.CString(value)647 cvalue := C.CString(value)
652 defer C.free(unsafe.Pointer(cpath))648 defer C.free(unsafe.Pointer(cpath))
@@ -660,13 +656,13 @@
660 cpathCreated := (*C.char)(C.malloc(cpathLen))656 cpathCreated := (*C.char)(C.malloc(cpathLen))
661 defer C.free(unsafe.Pointer(cpathCreated))657 defer C.free(unsafe.Pointer(cpathCreated))
662658
663 rc, cerr := C.zoo_create(zk.handle, cpath, cvalue, C.int(len(value)),659 rc, cerr := C.zoo_create(conn.handle, cpath, cvalue, C.int(len(value)), caclv, C.int(flags), cpathCreated, C.int(cpathLen))
664 caclv, C.int(flags), cpathCreated, C.int(cpathLen))660 if rc == C.ZOK {
665 if rc != C.ZOK {661 pathCreated = C.GoString(cpathCreated)
666 return "", newError(rc, cerr)662 } else {
663 err = zkError(rc, cerr)
667 }664 }
668665 return
669 return C.GoString(cpathCreated), nil
670}666}
671667
672// Set modifies the data for the existing node at the given path, replacing it668// Set modifies the data for the existing node at the given path, replacing it
@@ -677,35 +673,31 @@
677//673//
678// It is an error to attempt to set the data of a non-existing node with674// It is an error to attempt to set the data of a non-existing node with
679// this function. In these cases, use Create instead.675// this function. In these cases, use Create instead.
680func (zk *ZooKeeper) Set(path, value string, version int32) (stat Stat, err Error) {676func (conn *Conn) Set(path, value string, version int32) (stat *Stat, err os.Error) {
681677
682 cpath := C.CString(path)678 cpath := C.CString(path)
683 cvalue := C.CString(value)679 cvalue := C.CString(value)
684 defer C.free(unsafe.Pointer(cpath))680 defer C.free(unsafe.Pointer(cpath))
685 defer C.free(unsafe.Pointer(cvalue))681 defer C.free(unsafe.Pointer(cvalue))
686682
687 cstat := C.struct_Stat{}683 var cstat Stat
688684 rc, cerr := C.zoo_set2(conn.handle, cpath, cvalue, C.int(len(value)), C.int(version), &cstat.c)
689 rc, cerr := C.zoo_set2(zk.handle, cpath, cvalue, C.int(len(value)),685 if rc == C.ZOK {
690 C.int(version), &cstat)686 stat = &cstat
691 if rc != C.ZOK {687 } else {
692 return nil, newError(rc, cerr)688 err = zkError(rc, cerr)
693 }689 }
694690 return
695 return (*resultStat)(&cstat), nil
696}691}
697692
698// Delete removes the node at path. If version is not -1, the operation693// Delete removes the node at path. If version is not -1, the operation
699// will only succeed if the node is still at this version when the694// will only succeed if the node is still at this version when the
700// node is deleted as an atomic operation.695// node is deleted as an atomic operation.
701func (zk *ZooKeeper) Delete(path string, version int32) (err Error) {696func (conn *Conn) Delete(path string, version int32) (err os.Error) {
702 cpath := C.CString(path)697 cpath := C.CString(path)
703 defer C.free(unsafe.Pointer(cpath))698 defer C.free(unsafe.Pointer(cpath))
704 rc, cerr := C.zoo_delete(zk.handle, cpath, C.int(version))699 rc, cerr := C.zoo_delete(conn.handle, cpath, C.int(version))
705 if rc != C.ZOK {700 return zkError(rc, cerr)
706 return newError(rc, cerr)
707 }
708 return nil
709}701}
710702
711// AddAuth adds a new authentication certificate to the ZooKeeper703// AddAuth adds a new authentication certificate to the ZooKeeper
@@ -713,7 +705,7 @@
713// authentication information, while the cert parameter provides the705// authentication information, while the cert parameter provides the
714// identity data itself. For instance, the "digest" scheme requires706// identity data itself. For instance, the "digest" scheme requires
715// a pair like "username:password" to be provided as the certificate.707// a pair like "username:password" to be provided as the certificate.
716func (zk *ZooKeeper) AddAuth(scheme, cert string) Error {708func (conn *Conn) AddAuth(scheme, cert string) os.Error {
717 cscheme := C.CString(scheme)709 cscheme := C.CString(scheme)
718 ccert := C.CString(cert)710 ccert := C.CString(cert)
719 defer C.free(unsafe.Pointer(cscheme))711 defer C.free(unsafe.Pointer(cscheme))
@@ -725,43 +717,38 @@
725 }717 }
726 defer C.destroy_completion_data(data)718 defer C.destroy_completion_data(data)
727719
728 rc, cerr := C.zoo_add_auth(zk.handle, cscheme, ccert, C.int(len(cert)),720 rc, cerr := C.zoo_add_auth(conn.handle, cscheme, ccert, C.int(len(cert)), C.handle_void_completion, unsafe.Pointer(data))
729 C.handle_void_completion, unsafe.Pointer(data))
730 if rc != C.ZOK {721 if rc != C.ZOK {
731 return newError(rc, cerr)722 return zkError(rc, cerr)
732 }723 }
733724
734 C.wait_for_completion(data)725 C.wait_for_completion(data)
735726
736 rc = C.int(uintptr(data.data))727 rc = C.int(uintptr(data.data))
737 if rc != C.ZOK {728 return zkError(rc, nil)
738 return newError(rc, nil)
739 }
740
741 return nil
742}729}
743730
744// ACL returns the access control list for path.731// ACL returns the access control list for path.
745func (zk *ZooKeeper) ACL(path string) ([]ACL, Stat, Error) {732func (conn *Conn) ACL(path string) ([]ACL, *Stat, os.Error) {
746733
747 cpath := C.CString(path)734 cpath := C.CString(path)
748 defer C.free(unsafe.Pointer(cpath))735 defer C.free(unsafe.Pointer(cpath))
749736
750 caclv := C.struct_ACL_vector{}737 caclv := C.struct_ACL_vector{}
751 cstat := C.struct_Stat{}
752738
753 rc, cerr := C.zoo_get_acl(zk.handle, cpath, &caclv, &cstat)739 var cstat Stat
740 rc, cerr := C.zoo_get_acl(conn.handle, cpath, &caclv, &cstat.c)
754 if rc != C.ZOK {741 if rc != C.ZOK {
755 return nil, nil, newError(rc, cerr)742 return nil, nil, zkError(rc, cerr)
756 }743 }
757744
758 aclv := parseACLVector(&caclv)745 aclv := parseACLVector(&caclv)
759746
760 return aclv, (*resultStat)(&cstat), nil747 return aclv, &cstat, nil
761}748}
762749
763// SetACL changes the access control list for path.750// SetACL changes the access control list for path.
764func (zk *ZooKeeper) SetACL(path string, aclv []ACL, version int32) Error {751func (conn *Conn) SetACL(path string, aclv []ACL, version int32) os.Error {
765752
766 cpath := C.CString(path)753 cpath := C.CString(path)
767 defer C.free(unsafe.Pointer(cpath))754 defer C.free(unsafe.Pointer(cpath))
@@ -769,12 +756,8 @@
769 caclv := buildACLVector(aclv)756 caclv := buildACLVector(aclv)
770 defer C.deallocate_ACL_vector(caclv)757 defer C.deallocate_ACL_vector(caclv)
771758
772 rc, cerr := C.zoo_set_acl(zk.handle, cpath, C.int(version), caclv)759 rc, cerr := C.zoo_set_acl(conn.handle, cpath, C.int(version), caclv)
773 if rc != C.ZOK {760 return zkError(rc, cerr)
774 return newError(rc, cerr)
775 }
776
777 return nil
778}761}
779762
780func parseACLVector(caclv *C.struct_ACL_vector) []ACL {763func parseACLVector(caclv *C.struct_ACL_vector) []ACL {
@@ -822,7 +805,7 @@
822// -----------------------------------------------------------------------805// -----------------------------------------------------------------------
823// RetryChange utility method.806// RetryChange utility method.
824807
825type ChangeFunc func(oldValue string, oldStat Stat) (newValue string, err os.Error)808type ChangeFunc func(oldValue string, oldStat *Stat) (newValue string, err os.Error)
826809
827// RetryChange runs changeFunc to attempt to atomically change path810// RetryChange runs changeFunc to attempt to atomically change path
828// in a lock free manner, and retries in case there was another811// in a lock free manner, and retries in case there was another
@@ -831,8 +814,7 @@
831// changeFunc must work correctly if called multiple times in case814// changeFunc must work correctly if called multiple times in case
832// the modification fails due to concurrent changes, and it may return815// the modification fails due to concurrent changes, and it may return
833// an error that will cause the the RetryChange function to stop and816// an error that will cause the the RetryChange function to stop and
834// return an Error with code ZSYSTEMERROR and the same .String() result817// return the same error.
835// as the provided error.
836//818//
837// This mechanism is not suitable for a node that is frequently modified819// This mechanism is not suitable for a node that is frequently modified
838// concurrently. For those cases, consider using a pessimistic locking820// concurrently. For those cases, consider using a pessimistic locking
@@ -845,8 +827,7 @@
845//827//
846// 2. Call the changeFunc with the current node value and stat,828// 2. Call the changeFunc with the current node value and stat,
847// or with an empty string and nil stat, if the node doesn't yet exist.829// or with an empty string and nil stat, if the node doesn't yet exist.
848// If the changeFunc returns an error, stop and return an Error with830// If the changeFunc returns an error, stop and return the same error.
849// ZSYSTEMERROR Code() and the same String() as the changeFunc error.
850//831//
851// 3. If the changeFunc returns no errors, use the string returned as832// 3. If the changeFunc returns no errors, use the string returned as
852// the new candidate value for the node, and attempt to either create833// the new candidate value for the node, and attempt to either create
@@ -855,36 +836,32 @@
855// in the same node), repeat from step 1. If this procedure fails with any836// in the same node), repeat from step 1. If this procedure fails with any
856// other error, stop and return the error found.837// other error, stop and return the error found.
857//838//
858func (zk *ZooKeeper) RetryChange(path string, flags int, acl []ACL, changeFunc ChangeFunc) (err Error) {839func (conn *Conn) RetryChange(path string, flags int, acl []ACL, changeFunc ChangeFunc) os.Error {
859 for {840 for {
860 oldValue, oldStat, getErr := zk.Get(path)841 oldValue, oldStat, err := conn.Get(path)
861 if getErr != nil && getErr.Code() != ZNONODE {842 if err != nil && err != ZNONODE {
862 err = getErr843 return err
863 break844 }
864 }845 newValue, err := changeFunc(oldValue, oldStat)
865 newValue, osErr := changeFunc(oldValue, oldStat)846 if err != nil {
866 if osErr != nil {847 return err
867 return newError(ZSYSTEMERROR, osErr)848 }
868 } else if oldStat == nil {849 if oldStat == nil {
869 _, err = zk.Create(path, newValue, flags, acl)850 _, err := conn.Create(path, newValue, flags, acl)
870 if err == nil || err.Code() != ZNODEEXISTS {851 if err == nil || err != ZNODEEXISTS {
871 break852 return err
872 }853 }
873 } else if newValue == oldValue {854 continue
855 }
856 if newValue == oldValue {
874 return nil // Nothing to do.857 return nil // Nothing to do.
875 } else {858 }
876 _, err = zk.Set(path, newValue, oldStat.Version())859 _, err = conn.Set(path, newValue, oldStat.Version())
877 if err == nil {860 if err == nil || (err != ZBADVERSION && err != ZNONODE) {
878 break861 return err
879 } else {
880 code := err.Code()
881 if code != ZBADVERSION && code != ZNONODE {
882 break
883 }
884 }
885 }862 }
886 }863 }
887 return err864 panic("not reached")
888}865}
889866
890// -----------------------------------------------------------------------867// -----------------------------------------------------------------------
@@ -897,7 +874,7 @@
897// Whenever a *W method is called, it will return a channel which874// Whenever a *W method is called, it will return a channel which
898// outputs Event values. Internally, a map is used to maintain references875// outputs Event values. Internally, a map is used to maintain references
899// between an unique integer key (the watchId), and the event channel. The876// between an unique integer key (the watchId), and the event channel. The
900// watchId is then handed to the C zookeeper library as the watch context,877// watchId is then handed to the C ZooKeeper library as the watch context,
901// so that we get it back when events happen. Using an integer key as the878// so that we get it back when events happen. Using an integer key as the
902// watch context rather than a pointer is needed because there's no guarantee879// watch context rather than a pointer is needed because there's no guarantee
903// that in the future the GC will not move objects around, and also because880// that in the future the GC will not move objects around, and also because
@@ -910,13 +887,13 @@
910// Since Cgo doesn't allow calling back into Go, we actually fire a new887// Since Cgo doesn't allow calling back into Go, we actually fire a new
911// goroutine the very first time Init is called, and allow it to block888// goroutine the very first time Init is called, and allow it to block
912// in a pthread condition variable within a C function. This condition889// in a pthread condition variable within a C function. This condition
913// will only be notified once a zookeeper watch callback appends new890// will only be notified once a ZooKeeper watch callback appends new
914// entries to the event list. When this happens, the C function returns891// entries to the event list. When this happens, the C function returns
915// and we get back into Go land with the pointer to the watch data,892// and we get back into Go land with the pointer to the watch data,
916// including the watchId and other event details such as type and path.893// including the watchId and other event details such as type and path.
917894
918var watchMutex sync.Mutex895var watchMutex sync.Mutex
919var watchZooKeepers = make(map[uintptr]*ZooKeeper)896var watchConns = make(map[uintptr]*Conn)
920var watchCounter uintptr897var watchCounter uintptr
921var watchLoopCounter int898var watchLoopCounter int
922899
@@ -925,14 +902,14 @@
925// mostly as a debugging and testing aid.902// mostly as a debugging and testing aid.
926func CountPendingWatches() int {903func CountPendingWatches() int {
927 watchMutex.Lock()904 watchMutex.Lock()
928 count := len(watchZooKeepers)905 count := len(watchConns)
929 watchMutex.Unlock()906 watchMutex.Unlock()
930 return count907 return count
931}908}
932909
933// createWatch creates and registers a watch, returning the watch id910// createWatch creates and registers a watch, returning the watch id
934// and channel.911// and channel.
935func (zk *ZooKeeper) createWatch(session bool) (watchId uintptr, watchChannel chan Event) {912func (conn *Conn) createWatch(session bool) (watchId uintptr, watchChannel chan Event) {
936 buf := 1 // session/watch event913 buf := 1 // session/watch event
937 if session {914 if session {
938 buf = 32915 buf = 32
@@ -942,8 +919,8 @@
942 defer watchMutex.Unlock()919 defer watchMutex.Unlock()
943 watchId = watchCounter920 watchId = watchCounter
944 watchCounter += 1921 watchCounter += 1
945 zk.watchChannels[watchId] = watchChannel922 conn.watchChannels[watchId] = watchChannel
946 watchZooKeepers[watchId] = zk923 watchConns[watchId] = conn
947 return924 return
948}925}
949926
@@ -951,21 +928,21 @@
951// from ever getting delivered. It shouldn't be used if there's any928// from ever getting delivered. It shouldn't be used if there's any
952// chance the watch channel is still visible and not closed, since929// chance the watch channel is still visible and not closed, since
953// it might mean a goroutine would be blocked forever.930// it might mean a goroutine would be blocked forever.
954func (zk *ZooKeeper) forgetWatch(watchId uintptr) {931func (conn *Conn) forgetWatch(watchId uintptr) {
955 watchMutex.Lock()932 watchMutex.Lock()
956 defer watchMutex.Unlock()933 defer watchMutex.Unlock()
957 zk.watchChannels[watchId] = nil, false934 conn.watchChannels[watchId] = nil, false
958 watchZooKeepers[watchId] = nil, false935 watchConns[watchId] = nil, false
959}936}
960937
961// closeAllWatches closes all watch channels for zk.938// closeAllWatches closes all watch channels for conn.
962func (zk *ZooKeeper) closeAllWatches() {939func (conn *Conn) closeAllWatches() {
963 watchMutex.Lock()940 watchMutex.Lock()
964 defer watchMutex.Unlock()941 defer watchMutex.Unlock()
965 for watchId, ch := range zk.watchChannels {942 for watchId, ch := range conn.watchChannels {
966 close(ch)943 close(ch)
967 zk.watchChannels[watchId] = nil, false944 conn.watchChannels[watchId] = nil, false
968 watchZooKeepers[watchId] = nil, false945 watchConns[watchId] = nil, false
969 }946 }
970}947}
971948
@@ -978,11 +955,11 @@
978 }955 }
979 watchMutex.Lock()956 watchMutex.Lock()
980 defer watchMutex.Unlock()957 defer watchMutex.Unlock()
981 zk, ok := watchZooKeepers[watchId]958 conn, ok := watchConns[watchId]
982 if !ok {959 if !ok {
983 return960 return
984 }961 }
985 if event.Type == EVENT_SESSION && watchId != zk.sessionWatchId {962 if event.Type == EVENT_SESSION && watchId != conn.sessionWatchId {
986 switch event.State {963 switch event.State {
987 case STATE_EXPIRED_SESSION, STATE_AUTH_FAILED:964 case STATE_EXPIRED_SESSION, STATE_AUTH_FAILED:
988 default:965 default:
@@ -990,7 +967,7 @@
990 return967 return
991 }968 }
992 }969 }
993 ch := zk.watchChannels[watchId]970 ch := conn.watchChannels[watchId]
994 if ch == nil {971 if ch == nil {
995 return972 return
996 }973 }
@@ -1002,15 +979,15 @@
1002 // straight to the buffer), and the application isn't paying979 // straight to the buffer), and the application isn't paying
1003 // attention for long enough to have the buffer filled up.980 // attention for long enough to have the buffer filled up.
1004 // Break down now rather than leaking forever.981 // Break down now rather than leaking forever.
1005 if watchId == zk.sessionWatchId {982 if watchId == conn.sessionWatchId {
1006 panic("Session event channel buffer is full")983 panic("Session event channel buffer is full")
1007 } else {984 } else {
1008 panic("Watch event channel buffer is full")985 panic("Watch event channel buffer is full")
1009 }986 }
1010 }987 }
1011 if watchId != zk.sessionWatchId {988 if watchId != conn.sessionWatchId {
1012 zk.watchChannels[watchId] = nil, false989 conn.watchChannels[watchId] = nil, false
1013 watchZooKeepers[watchId] = nil, false990 watchConns[watchId] = nil, false
1014 close(ch)991 close(ch)
1015 }992 }
1016}993}
1017994
=== renamed file 'gozk_test.go' => 'zk_test.go'
--- gozk_test.go 2011-08-19 01:51:37 +0000
+++ zk_test.go 2011-10-03 17:02:23 +0000
@@ -1,17 +1,17 @@
1package gozk_test1package zk_test
22
3import (3import (
4 . "launchpad.net/gocheck"4 . "launchpad.net/gocheck"
5 "gozk"5 "launchpad.net/gozk/zk"
6 "time"6 "time"
7)7)
88
9// This error will be delivered via C errno, since ZK unfortunately9// This error will be delivered via C errno, since ZK unfortunately
10// only provides the handler back from zookeeper_init().10// only provides the handler back from zookeeper_init().
11func (s *S) TestInitErrorThroughErrno(c *C) {11func (s *S) TestInitErrorThroughErrno(c *C) {
12 zk, watch, err := gozk.Init("bad-domain-without-port", 5e9)12 conn, watch, err := zk.Dial("bad-domain-without-port", 5e9)
13 if zk != nil {13 if conn != nil {
14 zk.Close()14 conn.Close()
15 }15 }
16 if watch != nil {16 if watch != nil {
17 go func() {17 go func() {
@@ -23,15 +23,15 @@
23 }23 }
24 }()24 }()
25 }25 }
26 c.Assert(zk, IsNil)26 c.Assert(conn, IsNil)
27 c.Assert(watch, IsNil)27 c.Assert(watch, IsNil)
28 c.Assert(err, Matches, "invalid argument")28 c.Assert(err, Matches, "invalid argument")
29}29}
3030
31func (s *S) TestRecvTimeoutInitParameter(c *C) {31func (s *S) TestRecvTimeoutInitParameter(c *C) {
32 zk, watch, err := gozk.Init(s.zkAddr, 0)32 conn, watch, err := zk.Dial(s.zkAddr, 0)
33 c.Assert(err, IsNil)33 c.Assert(err, IsNil)
34 defer zk.Close()34 defer conn.Close()
3535
36 select {36 select {
37 case <-watch:37 case <-watch:
@@ -40,7 +40,7 @@
40 }40 }
4141
42 for i := 0; i != 1000; i++ {42 for i := 0; i != 1000; i++ {
43 _, _, err := zk.Get("/zookeeper")43 _, _, err := conn.Get("/zookeeper")
44 if err != nil {44 if err != nil {
45 c.Assert(err, Matches, "operation timeout")45 c.Assert(err, Matches, "operation timeout")
46 c.SucceedNow()46 c.SucceedNow()
@@ -51,76 +51,79 @@
51}51}
5252
53func (s *S) TestSessionWatches(c *C) {53func (s *S) TestSessionWatches(c *C) {
54 c.Assert(gozk.CountPendingWatches(), Equals, 0)54 c.Assert(zk.CountPendingWatches(), Equals, 0)
5555
56 zk1, watch1 := s.init(c)56 zk1, watch1 := s.init(c)
57 zk2, watch2 := s.init(c)57 zk2, watch2 := s.init(c)
58 zk3, watch3 := s.init(c)58 zk3, watch3 := s.init(c)
5959
60 c.Assert(gozk.CountPendingWatches(), Equals, 3)60 c.Assert(zk.CountPendingWatches(), Equals, 3)
6161
62 event1 := <-watch162 event1 := <-watch1
63 c.Assert(event1.Type, Equals, gozk.EVENT_SESSION)63 c.Assert(event1.Type, Equals, zk.EVENT_SESSION)
64 c.Assert(event1.State, Equals, gozk.STATE_CONNECTED)64 c.Assert(event1.State, Equals, zk.STATE_CONNECTED)
6565
66 c.Assert(gozk.CountPendingWatches(), Equals, 3)66 c.Assert(zk.CountPendingWatches(), Equals, 3)
6767
68 event2 := <-watch268 event2 := <-watch2
69 c.Assert(event2.Type, Equals, gozk.EVENT_SESSION)69 c.Assert(event2.Type, Equals, zk.EVENT_SESSION)
70 c.Assert(event2.State, Equals, gozk.STATE_CONNECTED)70 c.Assert(event2.State, Equals, zk.STATE_CONNECTED)
7171
72 c.Assert(gozk.CountPendingWatches(), Equals, 3)72 c.Assert(zk.CountPendingWatches(), Equals, 3)
7373
74 event3 := <-watch374 event3 := <-watch3
75 c.Assert(event3.Type, Equals, gozk.EVENT_SESSION)75 c.Assert(event3.Type, Equals, zk.EVENT_SESSION)
76 c.Assert(event3.State, Equals, gozk.STATE_CONNECTED)76 c.Assert(event3.State, Equals, zk.STATE_CONNECTED)
7777
78 c.Assert(gozk.CountPendingWatches(), Equals, 3)78 c.Assert(zk.CountPendingWatches(), Equals, 3)
7979
80 zk1.Close()80 zk1.Close()
81 c.Assert(gozk.CountPendingWatches(), Equals, 2)81 c.Assert(zk.CountPendingWatches(), Equals, 2)
82 zk2.Close()82 zk2.Close()
83 c.Assert(gozk.CountPendingWatches(), Equals, 1)83 c.Assert(zk.CountPendingWatches(), Equals, 1)
84 zk3.Close()84 zk3.Close()
85 c.Assert(gozk.CountPendingWatches(), Equals, 0)85 c.Assert(zk.CountPendingWatches(), Equals, 0)
86}86}
8787
88// Gozk injects a STATE_CLOSED event when zk.Close() is called, right88// Gozk injects a STATE_CLOSED event when conn.Close() is called, right
89// before the channel is closed. Closing the channel injects a nil89// before the channel is closed. Closing the channel injects a nil
90// pointer, as usual for Go, so the STATE_CLOSED gives a chance to90// pointer, as usual for Go, so the STATE_CLOSED gives a chance to
91// know that a nil pointer is coming, and to stop the procedure.91// know that a nil pointer is coming, and to stop the procedure.
92// Hopefully this procedure will avoid some nil-pointer references by92// Hopefully this procedure will avoid some nil-pointer references by
93// mistake.93// mistake.
94func (s *S) TestClosingStateInSessionWatch(c *C) {94func (s *S) TestClosingStateInSessionWatch(c *C) {
95 zk, watch := s.init(c)95 conn, watch := s.init(c)
9696
97 event := <-watch97 event := <-watch
98 c.Assert(event.Type, Equals, gozk.EVENT_SESSION)98 c.Assert(event.Type, Equals, zk.EVENT_SESSION)
99 c.Assert(event.State, Equals, gozk.STATE_CONNECTED)99 c.Assert(event.State, Equals, zk.STATE_CONNECTED)
100100
101 zk.Close()101 conn.Close()
102 event, ok := <-watch102 event, ok := <-watch
103 c.Assert(ok, Equals, false)103 c.Assert(ok, Equals, false)
104 c.Assert(event.Type, Equals, gozk.EVENT_CLOSED)104 c.Assert(event.Type, Equals, zk.EVENT_CLOSED)
105 c.Assert(event.State, Equals, gozk.STATE_CLOSED)105 c.Assert(event.State, Equals, zk.STATE_CLOSED)
106}106}
107107
108func (s *S) TestEventString(c *C) {108func (s *S) TestEventString(c *C) {
109 var event gozk.Event109 var event zk.Event
110 event = gozk.Event{gozk.EVENT_SESSION, "/path", gozk.STATE_CONNECTED}110 event = zk.Event{zk.EVENT_SESSION, "/path", zk.STATE_CONNECTED}
111 c.Assert(event, Matches, "ZooKeeper connected")111 c.Assert(event, Matches, "ZooKeeper connected")
112 event = gozk.Event{gozk.EVENT_CREATED, "/path", gozk.STATE_CONNECTED}112 event = zk.Event{zk.EVENT_CREATED, "/path", zk.STATE_CONNECTED}
113 c.Assert(event, Matches, "ZooKeeper connected; path created: /path")113 c.Assert(event, Matches, "ZooKeeper connected; path created: /path")
114 event = gozk.Event{-1, "/path", gozk.STATE_CLOSED}114 event = zk.Event{-1, "/path", zk.STATE_CLOSED}
115 c.Assert(event, Matches, "ZooKeeper connection closed")115 c.Assert(event, Matches, "ZooKeeper connection closed")
116}116}
117117
118var okTests = []struct{gozk.Event; Ok bool}{118var okTests = []struct {
119 {gozk.Event{gozk.EVENT_SESSION, "", gozk.STATE_CONNECTED}, true},119 zk.Event
120 {gozk.Event{gozk.EVENT_CREATED, "", gozk.STATE_CONNECTED}, true},120 Ok bool
121 {gozk.Event{0, "", gozk.STATE_CLOSED}, false},121}{
122 {gozk.Event{0, "", gozk.STATE_EXPIRED_SESSION}, false},122 {zk.Event{zk.EVENT_SESSION, "", zk.STATE_CONNECTED}, true},
123 {gozk.Event{0, "", gozk.STATE_AUTH_FAILED}, false},123 {zk.Event{zk.EVENT_CREATED, "", zk.STATE_CONNECTED}, true},
124 {zk.Event{0, "", zk.STATE_CLOSED}, false},
125 {zk.Event{0, "", zk.STATE_EXPIRED_SESSION}, false},
126 {zk.Event{0, "", zk.STATE_AUTH_FAILED}, false},
124}127}
125128
126func (s *S) TestEventOk(c *C) {129func (s *S) TestEventOk(c *C) {
@@ -130,9 +133,9 @@
130}133}
131134
132func (s *S) TestGetAndStat(c *C) {135func (s *S) TestGetAndStat(c *C) {
133 zk, _ := s.init(c)136 conn, _ := s.init(c)
134137
135 data, stat, err := zk.Get("/zookeeper")138 data, stat, err := conn.Get("/zookeeper")
136 c.Assert(err, IsNil)139 c.Assert(err, IsNil)
137 c.Assert(data, Equals, "")140 c.Assert(data, Equals, "")
138 c.Assert(stat.Czxid(), Equals, int64(0))141 c.Assert(stat.Czxid(), Equals, int64(0))
@@ -149,58 +152,58 @@
149}152}
150153
151func (s *S) TestGetAndError(c *C) {154func (s *S) TestGetAndError(c *C) {
152 zk, _ := s.init(c)155 conn, _ := s.init(c)
153156
154 data, stat, err := zk.Get("/non-existent")157 data, stat, err := conn.Get("/non-existent")
155158
156 c.Assert(data, Equals, "")159 c.Assert(data, Equals, "")
157 c.Assert(stat, IsNil)160 c.Assert(stat, IsNil)
158 c.Assert(err, Matches, "no node")161 c.Assert(err, Matches, "no node")
159 c.Assert(err.Code(), Equals, gozk.ZNONODE)162 c.Assert(err, Equals, zk.ZNONODE)
160}163}
161164
162func (s *S) TestCreateAndGet(c *C) {165func (s *S) TestCreateAndGet(c *C) {
163 zk, _ := s.init(c)166 conn, _ := s.init(c)
164167
165 path, err := zk.Create("/test-", "bababum", gozk.SEQUENCE|gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))168 path, err := conn.Create("/test-", "bababum", zk.SEQUENCE|zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
166 c.Assert(err, IsNil)169 c.Assert(err, IsNil)
167 c.Assert(path, Matches, "/test-[0-9]+")170 c.Assert(path, Matches, "/test-[0-9]+")
168171
169 // Check the error condition from Create().172 // Check the error condition from Create().
170 _, err = zk.Create(path, "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))173 _, err = conn.Create(path, "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
171 c.Assert(err, Matches, "node exists")174 c.Assert(err, Matches, "node exists")
172175
173 data, _, err := zk.Get(path)176 data, _, err := conn.Get(path)
174 c.Assert(err, IsNil)177 c.Assert(err, IsNil)
175 c.Assert(data, Equals, "bababum")178 c.Assert(data, Equals, "bababum")
176}179}
177180
178func (s *S) TestCreateSetAndGet(c *C) {181func (s *S) TestCreateSetAndGet(c *C) {
179 zk, _ := s.init(c)182 conn, _ := s.init(c)
180183
181 _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))184 _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
182 c.Assert(err, IsNil)185 c.Assert(err, IsNil)
183186
184 stat, err := zk.Set("/test", "bababum", -1) // Any version.187 stat, err := conn.Set("/test", "bababum", -1) // Any version.
185 c.Assert(err, IsNil)188 c.Assert(err, IsNil)
186 c.Assert(stat.Version(), Equals, int32(1))189 c.Assert(stat.Version(), Equals, int32(1))
187190
188 data, _, err := zk.Get("/test")191 data, _, err := conn.Get("/test")
189 c.Assert(err, IsNil)192 c.Assert(err, IsNil)
190 c.Assert(data, Equals, "bababum")193 c.Assert(data, Equals, "bababum")
191}194}
192195
193func (s *S) TestGetAndWatch(c *C) {196func (s *S) TestGetAndWatch(c *C) {
194 c.Check(gozk.CountPendingWatches(), Equals, 0)197 c.Check(zk.CountPendingWatches(), Equals, 0)
195198
196 zk, _ := s.init(c)199 conn, _ := s.init(c)
197200
198 c.Check(gozk.CountPendingWatches(), Equals, 1)201 c.Check(zk.CountPendingWatches(), Equals, 1)
199202
200 _, err := zk.Create("/test", "one", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))203 _, err := conn.Create("/test", "one", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
201 c.Assert(err, IsNil)204 c.Assert(err, IsNil)
202205
203 data, stat, watch, err := zk.GetW("/test")206 data, stat, watch, err := conn.GetW("/test")
204 c.Assert(err, IsNil)207 c.Assert(err, IsNil)
205 c.Assert(data, Equals, "one")208 c.Assert(data, Equals, "one")
206 c.Assert(stat.Version(), Equals, int32(0))209 c.Assert(stat.Version(), Equals, int32(0))
@@ -211,17 +214,17 @@
211 default:214 default:
212 }215 }
213216
214 c.Check(gozk.CountPendingWatches(), Equals, 2)217 c.Check(zk.CountPendingWatches(), Equals, 2)
215218
216 _, err = zk.Set("/test", "two", -1)219 _, err = conn.Set("/test", "two", -1)
217 c.Assert(err, IsNil)220 c.Assert(err, IsNil)
218221
219 event := <-watch222 event := <-watch
220 c.Assert(event.Type, Equals, gozk.EVENT_CHANGED)223 c.Assert(event.Type, Equals, zk.EVENT_CHANGED)
221224
222 c.Check(gozk.CountPendingWatches(), Equals, 1)225 c.Check(zk.CountPendingWatches(), Equals, 1)
223226
224 data, _, watch, err = zk.GetW("/test")227 data, _, watch, err = conn.GetW("/test")
225 c.Assert(err, IsNil)228 c.Assert(err, IsNil)
226 c.Assert(data, Equals, "two")229 c.Assert(data, Equals, "two")
227230
@@ -231,86 +234,86 @@
231 default:234 default:
232 }235 }
233236
234 c.Check(gozk.CountPendingWatches(), Equals, 2)237 c.Check(zk.CountPendingWatches(), Equals, 2)
235238
236 _, err = zk.Set("/test", "three", -1)239 _, err = conn.Set("/test", "three", -1)
237 c.Assert(err, IsNil)240 c.Assert(err, IsNil)
238241
239 event = <-watch242 event = <-watch
240 c.Assert(event.Type, Equals, gozk.EVENT_CHANGED)243 c.Assert(event.Type, Equals, zk.EVENT_CHANGED)
241244
242 c.Check(gozk.CountPendingWatches(), Equals, 1)245 c.Check(zk.CountPendingWatches(), Equals, 1)
243}246}
244247
245func (s *S) TestGetAndWatchWithError(c *C) {248func (s *S) TestGetAndWatchWithError(c *C) {
246 c.Check(gozk.CountPendingWatches(), Equals, 0)249 c.Check(zk.CountPendingWatches(), Equals, 0)
247250
248 zk, _ := s.init(c)251 conn, _ := s.init(c)
249252
250 c.Check(gozk.CountPendingWatches(), Equals, 1)253 c.Check(zk.CountPendingWatches(), Equals, 1)
251254
252 _, _, watch, err := zk.GetW("/test")255 _, _, watch, err := conn.GetW("/test")
253 c.Assert(err, NotNil)256 c.Assert(err, NotNil)
254 c.Assert(err.Code(), Equals, gozk.ZNONODE)257 c.Assert(err, Equals, zk.ZNONODE)
255 c.Assert(watch, IsNil)258 c.Assert(watch, IsNil)
256259
257 c.Check(gozk.CountPendingWatches(), Equals, 1)260 c.Check(zk.CountPendingWatches(), Equals, 1)
258}261}
259262
260func (s *S) TestCloseReleasesWatches(c *C) {263func (s *S) TestCloseReleasesWatches(c *C) {
261 c.Check(gozk.CountPendingWatches(), Equals, 0)264 c.Check(zk.CountPendingWatches(), Equals, 0)
262265
263 zk, _ := s.init(c)266 conn, _ := s.init(c)
264267
265 c.Check(gozk.CountPendingWatches(), Equals, 1)268 c.Check(zk.CountPendingWatches(), Equals, 1)
266269
267 _, err := zk.Create("/test", "one", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))270 _, err := conn.Create("/test", "one", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
268 c.Assert(err, IsNil)271 c.Assert(err, IsNil)
269272
270 _, _, _, err = zk.GetW("/test")273 _, _, _, err = conn.GetW("/test")
271 c.Assert(err, IsNil)274 c.Assert(err, IsNil)
272275
273 c.Assert(gozk.CountPendingWatches(), Equals, 2)276 c.Assert(zk.CountPendingWatches(), Equals, 2)
274277
275 zk.Close()278 conn.Close()
276279
277 c.Assert(gozk.CountPendingWatches(), Equals, 0)280 c.Assert(zk.CountPendingWatches(), Equals, 0)
278}281}
279282
280// By default, the ZooKeeper C client will hang indefinitely if a283// By default, the ZooKeeper C client will hang indefinitely if a
281// handler is closed twice. We get in the way and prevent it.284// handler is closed twice. We get in the way and prevent it.
282func (s *S) TestClosingTwiceDoesntHang(c *C) {285func (s *S) TestClosingTwiceDoesntHang(c *C) {
283 zk, _ := s.init(c)286 conn, _ := s.init(c)
284 err := zk.Close()287 err := conn.Close()
285 c.Assert(err, IsNil)288 c.Assert(err, IsNil)
286 err = zk.Close()289 err = conn.Close()
287 c.Assert(err, NotNil)290 c.Assert(err, NotNil)
288 c.Assert(err.Code(), Equals, gozk.ZCLOSING)291 c.Assert(err, Equals, zk.ZCLOSING)
289}292}
290293
291func (s *S) TestChildren(c *C) {294func (s *S) TestChildren(c *C) {
292 zk, _ := s.init(c)295 conn, _ := s.init(c)
293296
294 children, stat, err := zk.Children("/")297 children, stat, err := conn.Children("/")
295 c.Assert(err, IsNil)298 c.Assert(err, IsNil)
296 c.Assert(children, Equals, []string{"zookeeper"})299 c.Assert(children, Equals, []string{"zookeeper"})
297 c.Assert(stat.NumChildren(), Equals, int32(1))300 c.Assert(stat.NumChildren(), Equals, int32(1))
298301
299 children, stat, err = zk.Children("/non-existent")302 children, stat, err = conn.Children("/non-existent")
300 c.Assert(err, NotNil)303 c.Assert(err, NotNil)
301 c.Assert(err.Code(), Equals, gozk.ZNONODE)304 c.Assert(err, Equals, zk.ZNONODE)
302 c.Assert(children, Equals, []string{})305 c.Assert(children, Equals, []string{})
303 c.Assert(stat, Equals, nil)306 c.Assert(stat, IsNil)
304}307}
305308
306func (s *S) TestChildrenAndWatch(c *C) {309func (s *S) TestChildrenAndWatch(c *C) {
307 c.Check(gozk.CountPendingWatches(), Equals, 0)310 c.Check(zk.CountPendingWatches(), Equals, 0)
308311
309 zk, _ := s.init(c)312 conn, _ := s.init(c)
310313
311 c.Check(gozk.CountPendingWatches(), Equals, 1)314 c.Check(zk.CountPendingWatches(), Equals, 1)
312315
313 children, stat, watch, err := zk.ChildrenW("/")316 children, stat, watch, err := conn.ChildrenW("/")
314 c.Assert(err, IsNil)317 c.Assert(err, IsNil)
315 c.Assert(children, Equals, []string{"zookeeper"})318 c.Assert(children, Equals, []string{"zookeeper"})
316 c.Assert(stat.NumChildren(), Equals, int32(1))319 c.Assert(stat.NumChildren(), Equals, int32(1))
@@ -321,18 +324,18 @@
321 default:324 default:
322 }325 }
323326
324 c.Check(gozk.CountPendingWatches(), Equals, 2)327 c.Check(zk.CountPendingWatches(), Equals, 2)
325328
326 _, err = zk.Create("/test1", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))329 _, err = conn.Create("/test1", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
327 c.Assert(err, IsNil)330 c.Assert(err, IsNil)
328331
329 event := <-watch332 event := <-watch
330 c.Assert(event.Type, Equals, gozk.EVENT_CHILD)333 c.Assert(event.Type, Equals, zk.EVENT_CHILD)
331 c.Assert(event.Path, Equals, "/")334 c.Assert(event.Path, Equals, "/")
332335
333 c.Check(gozk.CountPendingWatches(), Equals, 1)336 c.Check(zk.CountPendingWatches(), Equals, 1)
334337
335 children, stat, watch, err = zk.ChildrenW("/")338 children, stat, watch, err = conn.ChildrenW("/")
336 c.Assert(err, IsNil)339 c.Assert(err, IsNil)
337 c.Assert(stat.NumChildren(), Equals, int32(2))340 c.Assert(stat.NumChildren(), Equals, int32(2))
338341
@@ -345,57 +348,56 @@
345 default:348 default:
346 }349 }
347350
348 c.Check(gozk.CountPendingWatches(), Equals, 2)351 c.Check(zk.CountPendingWatches(), Equals, 2)
349352
350 _, err = zk.Create("/test2", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))353 _, err = conn.Create("/test2", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
351 c.Assert(err, IsNil)354 c.Assert(err, IsNil)
352355
353 event = <-watch356 event = <-watch
354 c.Assert(event.Type, Equals, gozk.EVENT_CHILD)357 c.Assert(event.Type, Equals, zk.EVENT_CHILD)
355358
356 c.Check(gozk.CountPendingWatches(), Equals, 1)359 c.Check(zk.CountPendingWatches(), Equals, 1)
357}360}
358361
359func (s *S) TestChildrenAndWatchWithError(c *C) {362func (s *S) TestChildrenAndWatchWithError(c *C) {
360 c.Check(gozk.CountPendingWatches(), Equals, 0)363 c.Check(zk.CountPendingWatches(), Equals, 0)
361364
362 zk, _ := s.init(c)365 conn, _ := s.init(c)
363366
364 c.Check(gozk.CountPendingWatches(), Equals, 1)367 c.Check(zk.CountPendingWatches(), Equals, 1)
365368
366 _, stat, watch, err := zk.ChildrenW("/test")369 _, stat, watch, err := conn.ChildrenW("/test")
367 c.Assert(err, NotNil)370 c.Assert(err, NotNil)
368 c.Assert(err.Code(), Equals, gozk.ZNONODE)371 c.Assert(err, Equals, zk.ZNONODE)
369 c.Assert(watch, IsNil)372 c.Assert(watch, IsNil)
370 c.Assert(stat, IsNil)373 c.Assert(stat, IsNil)
371374
372 c.Check(gozk.CountPendingWatches(), Equals, 1)375 c.Check(zk.CountPendingWatches(), Equals, 1)
373}376}
374377
375func (s *S) TestExists(c *C) {378func (s *S) TestExists(c *C) {
376 zk, _ := s.init(c)379 conn, _ := s.init(c)
377380
378 stat, err := zk.Exists("/zookeeper")381 stat, err := conn.Exists("/non-existent")
379 c.Assert(err, IsNil)
380 c.Assert(stat.NumChildren(), Equals, int32(1))
381
382 stat, err = zk.Exists("/non-existent")
383 c.Assert(err, IsNil)382 c.Assert(err, IsNil)
384 c.Assert(stat, IsNil)383 c.Assert(stat, IsNil)
384
385 stat, err = conn.Exists("/zookeeper")
386 c.Assert(err, IsNil)
385}387}
386388
387func (s *S) TestExistsAndWatch(c *C) {389func (s *S) TestExistsAndWatch(c *C) {
388 c.Check(gozk.CountPendingWatches(), Equals, 0)390 c.Check(zk.CountPendingWatches(), Equals, 0)
389391
390 zk, _ := s.init(c)392 conn, _ := s.init(c)
391393
392 c.Check(gozk.CountPendingWatches(), Equals, 1)394 c.Check(zk.CountPendingWatches(), Equals, 1)
393395
394 stat, watch, err := zk.ExistsW("/test")396 stat, watch, err := conn.ExistsW("/test")
395 c.Assert(err, IsNil)397 c.Assert(err, IsNil)
396 c.Assert(stat, IsNil)398 c.Assert(stat, IsNil)
397399
398 c.Check(gozk.CountPendingWatches(), Equals, 2)400 c.Check(zk.CountPendingWatches(), Equals, 2)
399401
400 select {402 select {
401 case <-watch:403 case <-watch:
@@ -403,62 +405,62 @@
403 default:405 default:
404 }406 }
405407
406 _, err = zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))408 _, err = conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
407 c.Assert(err, IsNil)409 c.Assert(err, IsNil)
408410
409 event := <-watch411 event := <-watch
410 c.Assert(event.Type, Equals, gozk.EVENT_CREATED)412 c.Assert(event.Type, Equals, zk.EVENT_CREATED)
411 c.Assert(event.Path, Equals, "/test")413 c.Assert(event.Path, Equals, "/test")
412414
413 c.Check(gozk.CountPendingWatches(), Equals, 1)415 c.Check(zk.CountPendingWatches(), Equals, 1)
414416
415 stat, watch, err = zk.ExistsW("/test")417 stat, watch, err = conn.ExistsW("/test")
416 c.Assert(err, IsNil)418 c.Assert(err, IsNil)
417 c.Assert(stat, NotNil)419 c.Assert(stat, NotNil)
418 c.Assert(stat.NumChildren(), Equals, int32(0))420 c.Assert(stat.NumChildren(), Equals, int32(0))
419421
420 c.Check(gozk.CountPendingWatches(), Equals, 2)422 c.Check(zk.CountPendingWatches(), Equals, 2)
421}423}
422424
423func (s *S) TestExistsAndWatchWithError(c *C) {425func (s *S) TestExistsAndWatchWithError(c *C) {
424 c.Check(gozk.CountPendingWatches(), Equals, 0)426 c.Check(zk.CountPendingWatches(), Equals, 0)
425427
426 zk, _ := s.init(c)428 conn, _ := s.init(c)
427429
428 c.Check(gozk.CountPendingWatches(), Equals, 1)430 c.Check(zk.CountPendingWatches(), Equals, 1)
429431
430 stat, watch, err := zk.ExistsW("///")432 stat, watch, err := conn.ExistsW("///")
431 c.Assert(err, NotNil)433 c.Assert(err, NotNil)
432 c.Assert(err.Code(), Equals, gozk.ZBADARGUMENTS)434 c.Assert(err, Equals, zk.ZBADARGUMENTS)
433 c.Assert(stat, IsNil)435 c.Assert(stat, IsNil)
434 c.Assert(watch, IsNil)436 c.Assert(watch, IsNil)
435437
436 c.Check(gozk.CountPendingWatches(), Equals, 1)438 c.Check(zk.CountPendingWatches(), Equals, 1)
437}439}
438440
439func (s *S) TestDelete(c *C) {441func (s *S) TestDelete(c *C) {
440 zk, _ := s.init(c)442 conn, _ := s.init(c)
441443
442 _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))444 _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
443 c.Assert(err, IsNil)445 c.Assert(err, IsNil)
444446
445 err = zk.Delete("/test", 5)447 err = conn.Delete("/test", 5)
446 c.Assert(err, NotNil)448 c.Assert(err, NotNil)
447 c.Assert(err.Code(), Equals, gozk.ZBADVERSION)449 c.Assert(err, Equals, zk.ZBADVERSION)
448450
449 err = zk.Delete("/test", -1)451 err = conn.Delete("/test", -1)
450 c.Assert(err, IsNil)452 c.Assert(err, IsNil)
451453
452 err = zk.Delete("/test", -1)454 err = conn.Delete("/test", -1)
453 c.Assert(err, NotNil)455 c.Assert(err, NotNil)
454 c.Assert(err.Code(), Equals, gozk.ZNONODE)456 c.Assert(err, Equals, zk.ZNONODE)
455}457}
456458
457func (s *S) TestClientIdAndReInit(c *C) {459func (s *S) TestClientIdAndReInit(c *C) {
458 zk1, _ := s.init(c)460 zk1, _ := s.init(c)
459 clientId1 := zk1.ClientId()461 clientId1 := zk1.ClientId()
460462
461 zk2, _, err := gozk.ReInit(s.zkAddr, 5e9, clientId1)463 zk2, _, err := zk.Redial(s.zkAddr, 5e9, clientId1)
462 c.Assert(err, IsNil)464 c.Assert(err, IsNil)
463 defer zk2.Close()465 defer zk2.Close()
464 clientId2 := zk2.ClientId()466 clientId2 := zk2.ClientId()
@@ -469,110 +471,114 @@
469// Surprisingly for some (including myself, initially), the watch471// Surprisingly for some (including myself, initially), the watch
470// returned by the exists method actually fires on data changes too.472// returned by the exists method actually fires on data changes too.
471func (s *S) TestExistsWatchOnDataChange(c *C) {473func (s *S) TestExistsWatchOnDataChange(c *C) {
472 zk, _ := s.init(c)474 conn, _ := s.init(c)
473475
474 _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))476 _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
475 c.Assert(err, IsNil)477 c.Assert(err, IsNil)
476478
477 _, watch, err := zk.ExistsW("/test")479 _, watch, err := conn.ExistsW("/test")
478 c.Assert(err, IsNil)480 c.Assert(err, IsNil)
479481
480 _, err = zk.Set("/test", "new", -1)482 _, err = conn.Set("/test", "new", -1)
481 c.Assert(err, IsNil)483 c.Assert(err, IsNil)
482484
483 event := <-watch485 event := <-watch
484486
485 c.Assert(event.Path, Equals, "/test")487 c.Assert(event.Path, Equals, "/test")
486 c.Assert(event.Type, Equals, gozk.EVENT_CHANGED)488 c.Assert(event.Type, Equals, zk.EVENT_CHANGED)
487}489}
488490
489func (s *S) TestACL(c *C) {491func (s *S) TestACL(c *C) {
490 zk, _ := s.init(c)492 conn, _ := s.init(c)
491493
492 _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))494 _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
493 c.Assert(err, IsNil)495 c.Assert(err, IsNil)
494496
495 acl, stat, err := zk.ACL("/test")497 acl, stat, err := conn.ACL("/test")
496 c.Assert(err, IsNil)498 c.Assert(err, IsNil)
497 c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_ALL))499 c.Assert(acl, Equals, zk.WorldACL(zk.PERM_ALL))
498 c.Assert(stat, NotNil)500 c.Assert(stat, NotNil)
499 c.Assert(stat.Version(), Equals, int32(0))501 c.Assert(stat.Version(), Equals, int32(0))
500502
501 acl, stat, err = zk.ACL("/non-existent")503 acl, stat, err = conn.ACL("/non-existent")
502 c.Assert(err, NotNil)504 c.Assert(err, NotNil)
503 c.Assert(err.Code(), Equals, gozk.ZNONODE)505 c.Assert(err, Equals, zk.ZNONODE)
504 c.Assert(acl, IsNil)506 c.Assert(acl, IsNil)
505 c.Assert(stat, IsNil)507 c.Assert(stat, IsNil)
506}508}
507509
508func (s *S) TestSetACL(c *C) {510func (s *S) TestSetACL(c *C) {
509 zk, _ := s.init(c)511 conn, _ := s.init(c)
510512
511 _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))513 _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
512 c.Assert(err, IsNil)514 c.Assert(err, IsNil)
513515
514 err = zk.SetACL("/test", gozk.WorldACL(gozk.PERM_ALL), 5)516 err = conn.SetACL("/test", zk.WorldACL(zk.PERM_ALL), 5)
515 c.Assert(err, NotNil)517 c.Assert(err, NotNil)
516 c.Assert(err.Code(), Equals, gozk.ZBADVERSION)518 c.Assert(err, Equals, zk.ZBADVERSION)
517519
518 err = zk.SetACL("/test", gozk.WorldACL(gozk.PERM_READ), -1)520 err = conn.SetACL("/test", zk.WorldACL(zk.PERM_READ), -1)
519 c.Assert(err, IsNil)521 c.Assert(err, IsNil)
520522
521 acl, _, err := zk.ACL("/test")523 acl, _, err := conn.ACL("/test")
522 c.Assert(err, IsNil)524 c.Assert(err, IsNil)
523 c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_READ))525 c.Assert(acl, Equals, zk.WorldACL(zk.PERM_READ))
524}526}
525527
526func (s *S) TestAddAuth(c *C) {528func (s *S) TestAddAuth(c *C) {
527 zk, _ := s.init(c)529 conn, _ := s.init(c)
528530
529 acl := []gozk.ACL{{gozk.PERM_READ, "digest", "joe:enQcM3mIEHQx7IrPNStYBc0qfs8="}}531 acl := []zk.ACL{{zk.PERM_READ, "digest", "joe:enQcM3mIEHQx7IrPNStYBc0qfs8="}}
530532
531 _, err := zk.Create("/test", "", gozk.EPHEMERAL, acl)533 _, err := conn.Create("/test", "", zk.EPHEMERAL, acl)
532 c.Assert(err, IsNil)534 c.Assert(err, IsNil)
533535
534 _, _, err = zk.Get("/test")536 _, _, err = conn.Get("/test")
535 c.Assert(err, NotNil)537 c.Assert(err, NotNil)
536 c.Assert(err.Code(), Equals, gozk.ZNOAUTH)538 c.Assert(err, Equals, zk.ZNOAUTH)
537539
538 err = zk.AddAuth("digest", "joe:passwd")540 err = conn.AddAuth("digest", "joe:passwd")
539 c.Assert(err, IsNil)541 c.Assert(err, IsNil)
540542
541 _, _, err = zk.Get("/test")543 _, _, err = conn.Get("/test")
542 c.Assert(err, IsNil)544 c.Assert(err, IsNil)
543}545}
544546
545func (s *S) TestWatchOnReconnection(c *C) {547func (s *S) TestWatchOnReconnection(c *C) {
546 c.Check(gozk.CountPendingWatches(), Equals, 0)548 c.Check(zk.CountPendingWatches(), Equals, 0)
547549
548 zk, session := s.init(c)550 conn, session := s.init(c)
549551
550 event := <-session552 event := <-session
551 c.Assert(event.Type, Equals, gozk.EVENT_SESSION)553 c.Assert(event.Type, Equals, zk.EVENT_SESSION)
552 c.Assert(event.State, Equals, gozk.STATE_CONNECTED)554 c.Assert(event.State, Equals, zk.STATE_CONNECTED)
553555
554 c.Check(gozk.CountPendingWatches(), Equals, 1)556 c.Check(zk.CountPendingWatches(), Equals, 1)
555557
556 stat, watch, err := zk.ExistsW("/test")558 stat, watch, err := conn.ExistsW("/test")
557 c.Assert(err, IsNil)559 c.Assert(err, IsNil)
558 c.Assert(stat, IsNil)560 c.Assert(stat, IsNil)
559561
560 c.Check(gozk.CountPendingWatches(), Equals, 2)562 c.Check(zk.CountPendingWatches(), Equals, 2)
561563
562 s.StopZK()564 err = s.zkServer.Stop()
565 c.Assert(err, IsNil)
566
563 time.Sleep(2e9)567 time.Sleep(2e9)
564 s.StartZK()
565568
566 // The session channel should receive the reconnection notification,569 err = s.zkServer.Start()
570 c.Assert(err, IsNil)
571
572 // The session channel should receive the reconnection notification.
567 select {573 select {
568 case event := <-session:574 case event := <-session:
569 c.Assert(event.State, Equals, gozk.STATE_CONNECTING)575 c.Assert(event.State, Equals, zk.STATE_CONNECTING)
570 case <-time.After(3e9):576 case <-time.After(3e9):
571 c.Fatal("Session watch didn't fire")577 c.Fatal("Session watch didn't fire")
572 }578 }
573 select {579 select {
574 case event := <-session:580 case event := <-session:
575 c.Assert(event.State, Equals, gozk.STATE_CONNECTED)581 c.Assert(event.State, Equals, zk.STATE_CONNECTED)
576 case <-time.After(3e9):582 case <-time.After(3e9):
577 c.Fatal("Session watch didn't fire")583 c.Fatal("Session watch didn't fire")
578 }584 }
@@ -585,40 +591,40 @@
585 }591 }
586592
587 // And it should still work.593 // And it should still work.
588 _, err = zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))594 _, err = conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
589 c.Assert(err, IsNil)595 c.Assert(err, IsNil)
590596
591 event = <-watch597 event = <-watch
592 c.Assert(event.Type, Equals, gozk.EVENT_CREATED)598 c.Assert(event.Type, Equals, zk.EVENT_CREATED)
593 c.Assert(event.Path, Equals, "/test")599 c.Assert(event.Path, Equals, "/test")
594600
595 c.Check(gozk.CountPendingWatches(), Equals, 1)601 c.Check(zk.CountPendingWatches(), Equals, 1)
596}602}
597603
598func (s *S) TestWatchOnSessionExpiration(c *C) {604func (s *S) TestWatchOnSessionExpiration(c *C) {
599 c.Check(gozk.CountPendingWatches(), Equals, 0)605 c.Check(zk.CountPendingWatches(), Equals, 0)
600606
601 zk, session := s.init(c)607 conn, session := s.init(c)
602608
603 event := <-session609 event := <-session
604 c.Assert(event.Type, Equals, gozk.EVENT_SESSION)610 c.Assert(event.Type, Equals, zk.EVENT_SESSION)
605 c.Assert(event.State, Equals, gozk.STATE_CONNECTED)611 c.Assert(event.State, Equals, zk.STATE_CONNECTED)
606612
607 c.Check(gozk.CountPendingWatches(), Equals, 1)613 c.Check(zk.CountPendingWatches(), Equals, 1)
608614
609 stat, watch, err := zk.ExistsW("/test")615 stat, watch, err := conn.ExistsW("/test")
610 c.Assert(err, IsNil)616 c.Assert(err, IsNil)
611 c.Assert(stat, IsNil)617 c.Assert(stat, IsNil)
612618
613 c.Check(gozk.CountPendingWatches(), Equals, 2)619 c.Check(zk.CountPendingWatches(), Equals, 2)
614620
615 // Use expiration trick described in the FAQ.621 // Use expiration trick described in the FAQ.
616 clientId := zk.ClientId()622 clientId := conn.ClientId()
617 zk2, session2, err := gozk.ReInit(s.zkAddr, 5e9, clientId)623 zk2, session2, err := zk.Redial(s.zkAddr, 5e9, clientId)
618624
619 for event := range session2 {625 for event := range session2 {
620 c.Log("Event from overlapping session: ", event)626 c.Log("Event from overlapping session: ", event)
621 if event.State == gozk.STATE_CONNECTED {627 if event.State == zk.STATE_CONNECTED {
622 // Wait for zk to process the connection.628 // Wait for zk to process the connection.
623 // Not reliable without this. :-(629 // Not reliable without this. :-(
624 time.Sleep(1e9)630 time.Sleep(1e9)
@@ -627,21 +633,21 @@
627 }633 }
628 for event := range session {634 for event := range session {
629 c.Log("Event from primary session: ", event)635 c.Log("Event from primary session: ", event)
630 if event.State == gozk.STATE_EXPIRED_SESSION {636 if event.State == zk.STATE_EXPIRED_SESSION {
631 break637 break
632 }638 }
633 }639 }
634640
635 select {641 select {
636 case event := <-watch:642 case event := <-watch:
637 c.Assert(event.State, Equals, gozk.STATE_EXPIRED_SESSION)643 c.Assert(event.State, Equals, zk.STATE_EXPIRED_SESSION)
638 case <-time.After(3e9):644 case <-time.After(3e9):
639 c.Fatal("Watch event didn't fire")645 c.Fatal("Watch event didn't fire")
640 }646 }
641647
642 event = <-watch648 event = <-watch
643 c.Assert(event.Type, Equals, gozk.EVENT_CLOSED)649 c.Assert(event.Type, Equals, zk.EVENT_CLOSED)
644 c.Assert(event.State, Equals, gozk.STATE_CLOSED)650 c.Assert(event.State, Equals, zk.STATE_CLOSED)
645651
646 c.Check(gozk.CountPendingWatches(), Equals, 1)652 c.Check(zk.CountPendingWatches(), Equals, 1)
647}653}

Subscribers

People subscribed via source and target branches

to all changes: