Merge lp:~rogpeppe/gozk/update-server-interface into lp:~juju/gozk/trunk

Proposed by Roger Peppe
Status: Merged
Merge reported by: Gustavo Niemeyer
Merged at revision: not available
Proposed branch: lp:~rogpeppe/gozk/update-server-interface
Merge into: lp:~juju/gozk/trunk
Diff against target: 3053 lines (+1235/-782)
9 files modified
Makefile (+6/-2)
example/example.go (+22/-23)
reattach_test.go (+198/-0)
retry_test.go (+65/-74)
runserver.go (+168/-0)
server.go (+229/-0)
suite_test.go (+42/-121)
zk.go (+252/-311)
zk_test.go (+253/-251)
To merge this branch: bzr merge lp:~rogpeppe/gozk/update-server-interface
Reviewer Review Type Date Requested Status
Gustavo Niemeyer Needs Fixing
Review via email: mp+77009@code.launchpad.net

Description of the change

Updated server API.

To post a comment you must log in.
24. By Roger Peppe

Fix zookeeper spelling

Revision history for this message
Gustavo Niemeyer (niemeyer) wrote :

Great stuff here Rog, thanks!

Some comments:

[1]

-// The ZooKepeer installation directory is specified by installedDir.
+// The ZooKeeper installation directory is specified by installDir.

installDir feels like a strange name in this context. I agree installedDir
isn't great either, but the new one gives a wrong impression about what
it means.. we're not installing anything there.

Let's use something like zkDir instead.

[2]

+func CreateServer(port int, runDir, installDir string) (*Server, os.Error) {
(...)
+func AttachServer(runDir string) (*Server, os.Error) {

The organization inside these is looking very nice.

[3]

+func (srv *Server) getServerProcess() (*os.Process, os.Error) {
(...)
+ return os.FindProcess(pid)

FindProcess does nothing on Unix. We can run at least a Signal 0 against
the pid to tell if it exists or not.

[4]

+ if p != nil {
+ p.Release()
+ }
+ return fmt.Errorf("ZooKeeper server may already be running (remove %q to clear)", srv.path("pid.txt"))

The recommendation here is a bit strange. If the process is actually running,
removing the pid file will allow a second server to be started in parallel,
which is not nice.

I suggest just saying "ZooKeeper server seems to be running already", which
is a bit more true given that we'll be checking the pid as per the point above.
Then, the developer can run Stop() on it if he feels like stopping it, or
do whatever else out of band if he doesn't want to use the interface, but
then it's his responsibility to check for background processes, etc.

[5]

+// Stop kills the ZooKeeper server. It is a no-op if it is already running.

The second part is reversed, and reads a bit hard.

Suggestion: It does nothing if not running.

Also, please mention that the data is preserved, and that the server can
be restarted. "kill" may feel like otherwise.

[6]

+// Destroy stops the ZooKeeper server, and then removes its run
+// directory and all its contents.

s/and all its contents././

Removing the directory means removing its contents necessarily.

[7]

+func (srv *Server) writeInstallDir() os.Error {
+ return ioutil.WriteFile(srv.path("installdir.txt"), []byte(srv.installDir+"\n"), 0666)

These functions and filename should be renamed in sync to the first point.

[8]

+ if data[len(data)-1] == '\n' {
+ data = data[0 : len(data)-1]
+ }
+ srv.installDir = string(data)

srv.zkDir = string(bytes.TrimSpace(data))

[9]

+ s.zkTestRoot = c.MkDir() + "/zk"
+ port := 21812
+ s.zkAddr = fmt.Sprint("localhost:", port)
+
+ s.zkServer, err = zk.CreateServer(port, s.zkTestRoot, "")
  if err != nil {
   c.Fatal("Cannot set up server environment: ", err)
  }
- s.StartZK(c)
+ err = s.zkServer.Start()
+ if err != nil {
+ c.Fatal("Cannot start ZooKeeper server: ", err)
+ }

This is awesome! Thanks!

review: Approve
Revision history for this message
Gustavo Niemeyer (niemeyer) wrote :

[10]

+// exist, it assumes the server is not running and returns (nil, nil).

This interface is strange, and can easily lead to bugs since it's not an idiom in Go.
I'd never assume p is nil if err == nil. Please return an error if p == nil.

Revision history for this message
Roger Peppe (rogpeppe) wrote :

On 5 October 2011 15:34, Gustavo Niemeyer <email address hidden> wrote:
> Review: Approve
>
> Great stuff here Rog, thanks!
>
> Some comments:
>
>
> [1]
>
> -// The ZooKepeer installation directory is specified by installedDir.
> +// The ZooKeeper installation directory is specified by installDir.
>
> installDir feels like a strange name in this context. I agree installedDir
> isn't great either, but the new one gives a wrong impression about what
> it means.. we're not installing anything there.
>
> Let's use something like zkDir instead.

done.

> +func (srv *Server) getServerProcess() (*os.Process, os.Error) {
> (...)
> +       return os.FindProcess(pid)
>
> FindProcess does nothing on Unix.  We can run at least a Signal 0 against
> the pid to tell if it exists or not.

ok, as discussed i'll just not bother about the race.

> [5]
>
> +// Stop kills the ZooKeeper server. It is a no-op if it is already running.
>
> The second part is reversed, and reads a bit hard.
>
> Suggestion: It does nothing if not running.

better. my text was just wrong. done.
>
> Also, please mention that the data is preserved, and that the server can
> be restarted. "kill" may feel like otherwise.

done.

>
>
> [6]
>
> +// Destroy stops the ZooKeeper server, and then removes its run
> +// directory and all its contents.
>
> s/and all its contents././
>
> Removing the directory means removing its contents necessarily.

done.

> [7]
>
> +func (srv *Server) writeInstallDir() os.Error {
> +       return ioutil.WriteFile(srv.path("installdir.txt"), []byte(srv.installDir+"\n"), 0666)
>
> These functions and filename should be renamed in sync to the first point.

i'm not sure what you mean here.

> +       if data[len(data)-1] == '\n' {
> +               data = data[0 : len(data)-1]
> +       }
> +       srv.installDir = string(data)
>
> srv.zkDir = string(bytes.TrimSpace(data))

really? this code will have written the data inside the file, so there
should be no
extra space added. it's legal (ok it's pretty unusual) for a file name to
end in a space, and calling TrimSpace would break that.

25. By Roger Peppe

Added reattach_test.go to test server reattaching.
Service-related methods now get their own file.

Revision history for this message
Roger Peppe (rogpeppe) wrote :

On 5 October 2011 15:49, Gustavo Niemeyer <email address hidden> wrote:
> [10]
>
>
> +// exist, it assumes the server is not running and returns (nil, nil).
>
> This interface is strange, and can easily lead to bugs since it's not an idiom in Go.
> I'd never assume p is nil if err == nil. Please return an error if p == nil.

i've changed this, but for reference, i've just noticed a similar example
in your code, in Exists:

 // We diverge a bit from the usual here: a ZNONODE is not an error
 // for an exists call, otherwise every Exists call would have to check
 // for err != nil and err.Code() != ZNONODE.

it's for a very similar reason that i returned a nil process and nil error from
Process when there was no process to be found.

Revision history for this message
Gustavo Niemeyer (niemeyer) wrote :
Download full text (3.3 KiB)

Good changes overall, thanks. Some follow ups:

[7]

You've renamed installDir to zkDir. Let's rename functions and filenames
to match that as well (zkdir.txt, writeZkDir, etc)

[8]

Ok, but this logic is wrong because it doesn't check the length and could
panic.

Alright, so instead of doing either of those things, please just don't append
a '\n' at the end, so we can just string(data) straight. This is already
being done for pid.txt, and sounds like a sane approach.

[10]

Indeed, but that feels like a slightly different case because the method
is already a question (does it exist?), so the fact it doesn't exist
is an answer to that question rather than an error.

If the method was named something like Stat, it should definitely be an
error rather than simply an empty stat.

[11]

+// ServerCommand returns the command used to start the
+// ZooKeeper server. It is provided for debugging and testing
+// purposes only.
+func (srv *Server) command() ([]string, os.Error) {

s/ServerCommand/command/
s/It is provided.*//

[12]

+// NetworkPort returns the TCP port number that
+// the server is configured for.

s/NetworkPort/networkPort/

[13]

+// This file defines methods on Server that deal with starting
+// and stopping the ZooKeeper service. They are independent of ZooKeeper
+// itself, and may be factored out at a later date.

Please merge this on server.go. As pointed out in the other review and obvious
through out the code, this is really specific to ZooKeeper. If/when we do
refactor this out to generalize, we'll have to reorganize anyway.

[14]

+var ErrNotRunning = os.NewError("process not running")

I'm not a big fan of the Err* convention. Makes for an ugly name with typing
information embedded on it. Some packages do use it, but thankfully it's
also not strictly followed in the Go standard library either.

I'd appreciate renaming this to NotRunning instead. zk.NotRunning seems
pretty good.

[15]

+// Process returns a Process referring to the running server from
+// where it's been stored in pid.txt. If the file does not
+// exist, or it cannot find the process, it returns the error
+// ErrNotRunning.

Where the pid is stored is an implementation detail that can
change. We don't have to document that for now.

[16]

+// getProcess gets a Process from a pid and check that the

s/check/checks/

[17]

+ if e, ok := err.(*os.SyscallError); ok && e.Errno == os.ECHILD || err == os.ECHILD {

Ugh. I hope we get that cleaned in Go itself with the suggested changes for 1.

[18]

+ // is to poll until it exits. If the process has taken longer than
+ // a second to exit, then it's probably not going to.

This is very optimistic. A process can easily take more than a second to exit. If
this is an edge case and there's a reason to wait at all (dir content, etc), then
let's wait a bit longer, like 10 seconds for instance.

[19]

+// cases to test:
+// child server, stopped normally; reattach, start
+// non-direct child server, killed abnormally; reattach, start (->error), remove pid.txt; start
+// non-direct child server, still running; reattach, start (->error), stop, start
+// child server, still running; reattach,...

Read more...

review: Needs Fixing
Revision history for this message
Roger Peppe (rogpeppe) wrote :
Download full text (3.3 KiB)

PTAL

On 7 October 2011 16:58, Gustavo Niemeyer <email address hidden> wrote:
> [7]
>
> You've renamed installDir to zkDir. Let's rename functions and filenames
> to match that as well (zkdir.txt, writeZkDir, etc)

done

>
> [8]
>
> Ok, but this logic is wrong because it doesn't check the length and could
> panic.
>
> Alright, so instead of doing either of those things, please just don't append
> a '\n' at the end, so we can just string(data) straight. This is already
> being done for pid.txt, and sounds like a sane approach.

done

> [11]
>
> +// ServerCommand returns the command used to start the
> +// ZooKeeper server. It is provided for debugging and testing
> +// purposes only.
> +func (srv *Server) command() ([]string, os.Error) {
>
> s/ServerCommand/command/
> s/It is provided.*//

given that it's not external any more, that "debugging and testing"
remark isn't necessary now.

> [12]
>
> +// NetworkPort returns the TCP port number that
> +// the server is configured for.
>
> s/NetworkPort/networkPort/
>
> [13]
>
> +// This file defines methods on Server that deal with starting
> +// and stopping the ZooKeeper service. They are independent of ZooKeeper
> +// itself, and may be factored out at a later date.
>
> Please merge this on server.go. As pointed out in the other review and obvious
> through out the code, this is really specific to ZooKeeper. If/when we do
> refactor this out to generalize, we'll have to reorganize anyway.

i can do this, but given that the dependencies of the two files
are mostly different (only fmt, ioutil and os in common), and there's
very little interlink between the two, i think it makes sense to keep
them as two different files - i think it makes the code easier to read.
perhaps "runserver.go" might be a better name than "service.go" though.
i've done that for the time being. that said, it's no big deal.

> [14]
>
> +var ErrNotRunning = os.NewError("process not running")
>
> I'm not a big fan of the Err* convention. Makes for an ugly name with typing
> information embedded on it. Some packages do use it, but thankfully it's
> also not strictly followed in the Go standard library either.
>
> I'd appreciate renaming this to NotRunning instead. zk.NotRunning seems
> pretty good.

ok.

>
> [15]
>
> +// Process returns a Process referring to the running server from
> +// where it's been stored in pid.txt. If the file does not
> +// exist, or it cannot find the process, it returns the error
> +// ErrNotRunning.
>
> Where the pid is stored is an implementation detail that can
> change. We don't have to document that for now.
>
> [16]
>
> +// getProcess gets a Process from a pid and check that the
>
> s/check/checks/

done

> [18]
>
> +               // is to poll until it exits. If the process has taken longer than
> +               // a second to exit, then it's probably not going to.
>
> This is very optimistic. A process can easily take more than a second to exit. If
> this is an edge case and there's a reason to wait at all (dir content, etc), then
> let's wait a bit longer, like 10 seconds for instance.

ok. given that we're killing the server with SIGKILL, it's not going to
have any chance to take ages to c...

Read more...

26. By Roger Peppe

Fixes as per review.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'Makefile'
--- Makefile 2011-08-03 01:56:58 +0000
+++ Makefile 2011-10-10 13:55:28 +0000
@@ -2,10 +2,14 @@
22
3all: package3all: package
44
5TARG=gozk5TARG=launchpad.net/gozk/zk
66
7GOFILES=\
8 server.go\
9 runserver.go\
10
7CGOFILES=\11CGOFILES=\
8 gozk.go\12 zk.go\
913
10CGO_OFILES=\14CGO_OFILES=\
11 helpers.o\15 helpers.o\
1216
=== added directory 'example'
=== renamed file 'example.go' => 'example/example.go'
--- example.go 2011-08-03 01:47:25 +0000
+++ example/example.go 2011-10-10 13:55:28 +0000
@@ -1,30 +1,29 @@
1package main1package main
22
3import (3import (
4 "gozk"4 "launchpad.net/zookeeper/zookeeper"
5)5)
66
7func main() {7func main() {
8 zk, session, err := gozk.Init("localhost:2181", 5000)8 zk, session, err := zookeeper.Init("localhost:2181", 5000)
9 if err != nil {9 if err != nil {
10 println("Couldn't connect: " + err.String())10 println("Couldn't connect: " + err.String())
11 return11 return
12 }12 }
1313
14 defer zk.Close()14 defer zk.Close()
1515
16 // Wait for connection.16 // Wait for connection.
17 event := <-session17 event := <-session
18 if event.State != gozk.STATE_CONNECTED {18 if event.State != zookeeper.STATE_CONNECTED {
19 println("Couldn't connect")19 println("Couldn't connect")
20 return20 return
21 }21 }
2222
23 _, err = zk.Create("/counter", "0", 0, gozk.WorldACL(gozk.PERM_ALL))23 _, err = zk.Create("/counter", "0", 0, zookeeper.WorldACL(zookeeper.PERM_ALL))
24 if err != nil {24 if err != nil {
25 println(err.String())25 println(err.String())
26 } else {26 } else {
27 println("Created!")27 println("Created!")
28 }28 }
29}29}
30
3130
=== added file 'reattach_test.go'
--- reattach_test.go 1970-01-01 00:00:00 +0000
+++ reattach_test.go 2011-10-10 13:55:28 +0000
@@ -0,0 +1,198 @@
1package zk_test
2
3import (
4 "bufio"
5 . "launchpad.net/gocheck"
6 "launchpad.net/gozk/zk"
7 "exec"
8 "flag"
9 "fmt"
10 "os"
11 "strings"
12 "testing"
13 "time"
14)
15
16var reattach = flag.Bool("zktest.reattach", false, "internal flag used for testing")
17var reattachRunDir = flag.String("zktest.rundir", "", "internal flag used for testing")
18var reattachAbnormalStop = flag.Bool("zktest.stop", false, "internal flag used for testing")
19
20// This is the reentrancy point for testing ZooKeeper servers
21// started by processes that are not direct children of the
22// testing process. This test always succeeds - the status
23// will be written to stdout and read by indirectServer.
24func TestStartNonChildServer(t *testing.T) {
25 if !*reattach {
26 // not re-entrant, so ignore this test.
27 return
28 }
29 err := startServer(*reattachRunDir, *reattachAbnormalStop)
30 if err != nil {
31 fmt.Printf("error:%v\n", err)
32 return
33 }
34 fmt.Printf("done\n")
35}
36
37func (s *S) startServer(c *C, abort bool) {
38 err := startServer(s.zkTestRoot, abort)
39 c.Assert(err, IsNil)
40}
41
42// startServerIndirect starts a ZooKeeper server that is not
43// a direct child of the current process. If abort is true,
44// the server will be started and then terminated abnormally.
45func (s *S) startServerIndirect(c *C, abort bool) {
46 if len(os.Args) == 0 {
47 c.Fatal("Cannot find self executable name")
48 }
49 cmd := exec.Command(
50 os.Args[0],
51 "-zktest.reattach",
52 "-zktest.rundir", s.zkTestRoot,
53 "-zktest.stop=", fmt.Sprint(abort),
54 "-test.run", "StartNonChildServer",
55 )
56 r, err := cmd.StdoutPipe()
57 c.Assert(err, IsNil)
58 defer r.Close()
59 if err := cmd.Start(); err != nil {
60 c.Fatalf("cannot start re-entrant gotest process: %v", err)
61 }
62 defer cmd.Wait()
63 bio := bufio.NewReader(r)
64 for {
65 line, err := bio.ReadSlice('\n')
66 if err != nil {
67 c.Fatalf("indirect server status line not found: %v", err)
68 }
69 s := string(line)
70 if strings.HasPrefix(s, "error:") {
71 c.Fatalf("indirect server error: %s", s[len("error:"):])
72 }
73 if s == "done\n" {
74 return
75 }
76 }
77 panic("not reached")
78}
79
80// startServer starts a ZooKeeper server, and terminates it abnormally
81// if abort is true.
82func startServer(runDir string, abort bool) os.Error {
83 srv, err := zk.AttachServer(runDir)
84 if err != nil {
85 return fmt.Errorf("cannot attach to server at %q: %v", runDir, err)
86 }
87 if err := srv.Start(); err != nil {
88 return fmt.Errorf("cannot start server: %v", err)
89 }
90 if abort {
91 // Give it time to start up, then kill the server process abnormally,
92 // leaving the pid.txt file behind.
93 time.Sleep(0.5e9)
94 p, err := srv.Process()
95 if err != nil {
96 return fmt.Errorf("cannot get server process: %v", err)
97 }
98 defer p.Release()
99 if err := p.Kill(); err != nil {
100 return fmt.Errorf("cannot kill server process: %v", err)
101 }
102 }
103 return nil
104}
105
106func (s *S) checkCookie(c *C) {
107 conn, watch, err := zk.Dial(s.zkAddr, 5e9)
108 c.Assert(err, IsNil)
109
110 e, ok := <-watch
111 c.Assert(ok, Equals, true)
112 c.Assert(e.Ok(), Equals, true)
113
114 c.Assert(err, IsNil)
115 cookie, _, err := conn.Get("/testAttachCookie")
116 c.Assert(err, IsNil)
117 c.Assert(cookie, Equals, "testAttachCookie")
118 conn.Close()
119}
120
121// cases to test:
122// child server, stopped normally; reattach, start
123// non-direct child server, killed abnormally; reattach, start (->error), remove pid.txt; start
124// non-direct child server, still running; reattach, start (->error), stop, start
125// child server, still running; reattach, start (-> error)
126// child server, still running; reattach, stop, start.
127// non-direct child server, still running; reattach, stop, start.
128func (s *S) TestAttachServer(c *C) {
129 // Create a cookie so that we know we are reattaching to the same instance.
130 conn, _ := s.init(c)
131 _, err := conn.Create("/testAttachCookie", "testAttachCookie", 0, zk.WorldACL(zk.PERM_ALL))
132 c.Assert(err, IsNil)
133 s.checkCookie(c)
134 s.zkServer.Stop()
135 s.zkServer = nil
136
137 s.testAttachServer(c, (*S).startServer)
138 s.testAttachServer(c, (*S).startServerIndirect)
139 s.testAttachServerAbnormalTerminate(c, (*S).startServer)
140 s.testAttachServerAbnormalTerminate(c, (*S).startServerIndirect)
141
142 srv, err := zk.AttachServer(s.zkTestRoot)
143 c.Assert(err, IsNil)
144
145 s.zkServer = srv
146 err = s.zkServer.Start()
147 c.Assert(err, IsNil)
148
149 conn, _ = s.init(c)
150 err = conn.Delete("/testAttachCookie", -1)
151 c.Assert(err, IsNil)
152}
153
154func (s *S) testAttachServer(c *C, start func(*S, *C, bool)) {
155 start(s, c, false)
156
157 s.checkCookie(c)
158
159 // try attaching to it while it is still running - it should fail.
160 srv, err := zk.AttachServer(s.zkTestRoot)
161 c.Assert(err, IsNil)
162
163 err = srv.Start()
164 c.Assert(err, NotNil)
165
166 // stop it and then start it again - it should succeed.
167 err = srv.Stop()
168 c.Assert(err, IsNil)
169
170 err = srv.Start()
171 c.Assert(err, IsNil)
172
173 s.checkCookie(c)
174
175 err = srv.Stop()
176 c.Assert(err, IsNil)
177}
178
179func (s *S) testAttachServerAbnormalTerminate(c *C, start func(*S, *C, bool)) {
180 start(s, c, true)
181
182 // try attaching to it and starting - it should fail, because pid.txt
183 // won't have been removed.
184 srv, err := zk.AttachServer(s.zkTestRoot)
185 c.Assert(err, IsNil)
186 err = srv.Start()
187 c.Assert(err, NotNil)
188
189 // stopping it should bring things back to normal.
190 err = srv.Stop()
191 c.Assert(err, IsNil)
192 err = srv.Start()
193 c.Assert(err, IsNil)
194
195 s.checkCookie(c)
196 err = srv.Stop()
197 c.Assert(err, IsNil)
198}
0199
=== modified file 'retry_test.go'
--- retry_test.go 2011-08-19 01:51:37 +0000
+++ retry_test.go 2011-10-10 13:55:28 +0000
@@ -1,42 +1,41 @@
1package gozk_test1package zk_test
22
3import (3import (
4 . "launchpad.net/gocheck"4 . "launchpad.net/gocheck"
5 "gozk"5 "launchpad.net/gozk/zk"
6 "os"6 "os"
7)7)
88
9func (s *S) TestRetryChangeCreating(c *C) {9func (s *S) TestRetryChangeCreating(c *C) {
10 zk, _ := s.init(c)10 conn, _ := s.init(c)
1111
12 err := zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL),12 err := conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL),
13 func(data string, stat gozk.Stat) (string, os.Error) {13 func(data string, stat *zk.Stat) (string, os.Error) {
14 c.Assert(data, Equals, "")14 c.Assert(data, Equals, "")
15 c.Assert(stat, IsNil)15 c.Assert(stat, IsNil)
16 return "new", nil16 return "new", nil
17 })17 })
18 c.Assert(err, IsNil)18 c.Assert(err, IsNil)
1919
20 data, stat, err := zk.Get("/test")20 data, stat, err := conn.Get("/test")
21 c.Assert(err, IsNil)21 c.Assert(err, IsNil)
22 c.Assert(stat, NotNil)22 c.Assert(stat, NotNil)
23 c.Assert(stat.Version(), Equals, int32(0))23 c.Assert(stat.Version(), Equals, int32(0))
24 c.Assert(data, Equals, "new")24 c.Assert(data, Equals, "new")
2525
26 acl, _, err := zk.ACL("/test")26 acl, _, err := conn.ACL("/test")
27 c.Assert(err, IsNil)27 c.Assert(err, IsNil)
28 c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_ALL))28 c.Assert(acl, Equals, zk.WorldACL(zk.PERM_ALL))
29}29}
3030
31func (s *S) TestRetryChangeSetting(c *C) {31func (s *S) TestRetryChangeSetting(c *C) {
32 zk, _ := s.init(c)32 conn, _ := s.init(c)
3333
34 _, err := zk.Create("/test", "old", gozk.EPHEMERAL,34 _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
35 gozk.WorldACL(gozk.PERM_ALL))
36 c.Assert(err, IsNil)35 c.Assert(err, IsNil)
3736
38 err = zk.RetryChange("/test", gozk.EPHEMERAL, []gozk.ACL{},37 err = conn.RetryChange("/test", zk.EPHEMERAL, []zk.ACL{},
39 func(data string, stat gozk.Stat) (string, os.Error) {38 func(data string, stat *zk.Stat) (string, os.Error) {
40 c.Assert(data, Equals, "old")39 c.Assert(data, Equals, "old")
41 c.Assert(stat, NotNil)40 c.Assert(stat, NotNil)
42 c.Assert(stat.Version(), Equals, int32(0))41 c.Assert(stat.Version(), Equals, int32(0))
@@ -44,27 +43,26 @@
44 })43 })
45 c.Assert(err, IsNil)44 c.Assert(err, IsNil)
4645
47 data, stat, err := zk.Get("/test")46 data, stat, err := conn.Get("/test")
48 c.Assert(err, IsNil)47 c.Assert(err, IsNil)
49 c.Assert(stat, NotNil)48 c.Assert(stat, NotNil)
50 c.Assert(stat.Version(), Equals, int32(1))49 c.Assert(stat.Version(), Equals, int32(1))
51 c.Assert(data, Equals, "brand new")50 c.Assert(data, Equals, "brand new")
5251
53 // ACL was unchanged by RetryChange().52 // ACL was unchanged by RetryChange().
54 acl, _, err := zk.ACL("/test")53 acl, _, err := conn.ACL("/test")
55 c.Assert(err, IsNil)54 c.Assert(err, IsNil)
56 c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_ALL))55 c.Assert(acl, Equals, zk.WorldACL(zk.PERM_ALL))
57}56}
5857
59func (s *S) TestRetryChangeUnchangedValueDoesNothing(c *C) {58func (s *S) TestRetryChangeUnchangedValueDoesNothing(c *C) {
60 zk, _ := s.init(c)59 conn, _ := s.init(c)
6160
62 _, err := zk.Create("/test", "old", gozk.EPHEMERAL,61 _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
63 gozk.WorldACL(gozk.PERM_ALL))
64 c.Assert(err, IsNil)62 c.Assert(err, IsNil)
6563
66 err = zk.RetryChange("/test", gozk.EPHEMERAL, []gozk.ACL{},64 err = conn.RetryChange("/test", zk.EPHEMERAL, []zk.ACL{},
67 func(data string, stat gozk.Stat) (string, os.Error) {65 func(data string, stat *zk.Stat) (string, os.Error) {
68 c.Assert(data, Equals, "old")66 c.Assert(data, Equals, "old")
69 c.Assert(stat, NotNil)67 c.Assert(stat, NotNil)
70 c.Assert(stat.Version(), Equals, int32(0))68 c.Assert(stat.Version(), Equals, int32(0))
@@ -72,7 +70,7 @@
72 })70 })
73 c.Assert(err, IsNil)71 c.Assert(err, IsNil)
7472
75 data, stat, err := zk.Get("/test")73 data, stat, err := conn.Get("/test")
76 c.Assert(err, IsNil)74 c.Assert(err, IsNil)
77 c.Assert(stat, NotNil)75 c.Assert(stat, NotNil)
78 c.Assert(stat.Version(), Equals, int32(0)) // Unchanged!76 c.Assert(stat.Version(), Equals, int32(0)) // Unchanged!
@@ -80,14 +78,14 @@
80}78}
8179
82func (s *S) TestRetryChangeConflictOnCreate(c *C) {80func (s *S) TestRetryChangeConflictOnCreate(c *C) {
83 zk, _ := s.init(c)81 conn, _ := s.init(c)
8482
85 changeFunc := func(data string, stat gozk.Stat) (string, os.Error) {83 changeFunc := func(data string, stat *zk.Stat) (string, os.Error) {
86 switch data {84 switch data {
87 case "":85 case "":
88 c.Assert(stat, IsNil)86 c.Assert(stat, IsNil)
89 _, err := zk.Create("/test", "conflict", gozk.EPHEMERAL,87 _, err := conn.Create("/test", "conflict", zk.EPHEMERAL,
90 gozk.WorldACL(gozk.PERM_ALL))88 zk.WorldACL(zk.PERM_ALL))
91 c.Assert(err, IsNil)89 c.Assert(err, IsNil)
92 return "<none> => conflict", nil90 return "<none> => conflict", nil
93 case "conflict":91 case "conflict":
@@ -100,11 +98,10 @@
100 return "can't happen", nil98 return "can't happen", nil
101 }99 }
102100
103 err := zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL),101 err := conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL), changeFunc)
104 changeFunc)
105 c.Assert(err, IsNil)102 c.Assert(err, IsNil)
106103
107 data, stat, err := zk.Get("/test")104 data, stat, err := conn.Get("/test")
108 c.Assert(err, IsNil)105 c.Assert(err, IsNil)
109 c.Assert(data, Equals, "conflict => new")106 c.Assert(data, Equals, "conflict => new")
110 c.Assert(stat, NotNil)107 c.Assert(stat, NotNil)
@@ -112,18 +109,17 @@
112}109}
113110
114func (s *S) TestRetryChangeConflictOnSetDueToChange(c *C) {111func (s *S) TestRetryChangeConflictOnSetDueToChange(c *C) {
115 zk, _ := s.init(c)112 conn, _ := s.init(c)
116113
117 _, err := zk.Create("/test", "old", gozk.EPHEMERAL,114 _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
118 gozk.WorldACL(gozk.PERM_ALL))
119 c.Assert(err, IsNil)115 c.Assert(err, IsNil)
120116
121 changeFunc := func(data string, stat gozk.Stat) (string, os.Error) {117 changeFunc := func(data string, stat *zk.Stat) (string, os.Error) {
122 switch data {118 switch data {
123 case "old":119 case "old":
124 c.Assert(stat, NotNil)120 c.Assert(stat, NotNil)
125 c.Assert(stat.Version(), Equals, int32(0))121 c.Assert(stat.Version(), Equals, int32(0))
126 _, err := zk.Set("/test", "conflict", 0)122 _, err := conn.Set("/test", "conflict", 0)
127 c.Assert(err, IsNil)123 c.Assert(err, IsNil)
128 return "old => new", nil124 return "old => new", nil
129 case "conflict":125 case "conflict":
@@ -136,10 +132,10 @@
136 return "can't happen", nil132 return "can't happen", nil
137 }133 }
138134
139 err = zk.RetryChange("/test", gozk.EPHEMERAL, []gozk.ACL{}, changeFunc)135 err = conn.RetryChange("/test", zk.EPHEMERAL, []zk.ACL{}, changeFunc)
140 c.Assert(err, IsNil)136 c.Assert(err, IsNil)
141137
142 data, stat, err := zk.Get("/test")138 data, stat, err := conn.Get("/test")
143 c.Assert(err, IsNil)139 c.Assert(err, IsNil)
144 c.Assert(data, Equals, "conflict => new")140 c.Assert(data, Equals, "conflict => new")
145 c.Assert(stat, NotNil)141 c.Assert(stat, NotNil)
@@ -147,18 +143,17 @@
147}143}
148144
149func (s *S) TestRetryChangeConflictOnSetDueToDelete(c *C) {145func (s *S) TestRetryChangeConflictOnSetDueToDelete(c *C) {
150 zk, _ := s.init(c)146 conn, _ := s.init(c)
151147
152 _, err := zk.Create("/test", "old", gozk.EPHEMERAL,148 _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
153 gozk.WorldACL(gozk.PERM_ALL))
154 c.Assert(err, IsNil)149 c.Assert(err, IsNil)
155150
156 changeFunc := func(data string, stat gozk.Stat) (string, os.Error) {151 changeFunc := func(data string, stat *zk.Stat) (string, os.Error) {
157 switch data {152 switch data {
158 case "old":153 case "old":
159 c.Assert(stat, NotNil)154 c.Assert(stat, NotNil)
160 c.Assert(stat.Version(), Equals, int32(0))155 c.Assert(stat.Version(), Equals, int32(0))
161 err := zk.Delete("/test", 0)156 err := conn.Delete("/test", 0)
162 c.Assert(err, IsNil)157 c.Assert(err, IsNil)
163 return "old => <deleted>", nil158 return "old => <deleted>", nil
164 case "":159 case "":
@@ -170,55 +165,53 @@
170 return "can't happen", nil165 return "can't happen", nil
171 }166 }
172167
173 err = zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_READ),168 err = conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_READ), changeFunc)
174 changeFunc)
175 c.Assert(err, IsNil)169 c.Assert(err, IsNil)
176170
177 data, stat, err := zk.Get("/test")171 data, stat, err := conn.Get("/test")
178 c.Assert(err, IsNil)172 c.Assert(err, IsNil)
179 c.Assert(data, Equals, "<deleted> => new")173 c.Assert(data, Equals, "<deleted> => new")
180 c.Assert(stat, NotNil)174 c.Assert(stat, NotNil)
181 c.Assert(stat.Version(), Equals, int32(0))175 c.Assert(stat.Version(), Equals, int32(0))
182176
183 // Should be the new ACL.177 // Should be the new ACL.
184 acl, _, err := zk.ACL("/test")178 acl, _, err := conn.ACL("/test")
185 c.Assert(err, IsNil)179 c.Assert(err, IsNil)
186 c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_READ))180 c.Assert(acl, Equals, zk.WorldACL(zk.PERM_READ))
187}181}
188182
189func (s *S) TestRetryChangeErrorInCallback(c *C) {183func (s *S) TestRetryChangeErrorInCallback(c *C) {
190 zk, _ := s.init(c)184 conn, _ := s.init(c)
191185
192 err := zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL),186 err := conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL),
193 func(data string, stat gozk.Stat) (string, os.Error) {187 func(data string, stat *zk.Stat) (string, os.Error) {
194 return "don't use this", os.NewError("BOOM!")188 return "don't use this", os.NewError("BOOM!")
195 })189 })
196 c.Assert(err, NotNil)190 c.Assert(err, NotNil)
197 c.Assert(err.Code(), Equals, gozk.ZSYSTEMERROR)
198 c.Assert(err.String(), Equals, "BOOM!")191 c.Assert(err.String(), Equals, "BOOM!")
199192
200 stat, err := zk.Exists("/test")193 stat, err := conn.Exists("/test")
201 c.Assert(err, IsNil)194 c.Assert(err, IsNil)
202 c.Assert(stat, IsNil)195 c.Assert(stat, IsNil)
203}196}
204197
205func (s *S) TestRetryChangeFailsReading(c *C) {198func (s *S) TestRetryChangeFailsReading(c *C) {
206 zk, _ := s.init(c)199 conn, _ := s.init(c)
207200
208 _, err := zk.Create("/test", "old", gozk.EPHEMERAL,201 // Write only!
209 gozk.WorldACL(gozk.PERM_WRITE)) // Write only!202 _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_WRITE))
210 c.Assert(err, IsNil)203 c.Assert(err, IsNil)
211204
212 var called bool205 var called bool
213 err = zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL),206 err = conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL),
214 func(data string, stat gozk.Stat) (string, os.Error) {207 func(data string, stat *zk.Stat) (string, os.Error) {
215 called = true208 called = true
216 return "", nil209 return "", nil
217 })210 })
218 c.Assert(err, NotNil)211 c.Assert(err, NotNil)
219 c.Assert(err.Code(), Equals, gozk.ZNOAUTH)212 c.Assert(err, Equals, zk.ZNOAUTH)
220213
221 stat, err := zk.Exists("/test")214 stat, err := conn.Exists("/test")
222 c.Assert(err, IsNil)215 c.Assert(err, IsNil)
223 c.Assert(stat, NotNil)216 c.Assert(stat, NotNil)
224 c.Assert(stat.Version(), Equals, int32(0))217 c.Assert(stat.Version(), Equals, int32(0))
@@ -227,22 +220,21 @@
227}220}
228221
229func (s *S) TestRetryChangeFailsSetting(c *C) {222func (s *S) TestRetryChangeFailsSetting(c *C) {
230 zk, _ := s.init(c)223 conn, _ := s.init(c)
231224
232 _, err := zk.Create("/test", "old", gozk.EPHEMERAL,225 // Read only!
233 gozk.WorldACL(gozk.PERM_READ)) // Read only!226 _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_READ))
234 c.Assert(err, IsNil)227 c.Assert(err, IsNil)
235228
236 var called bool229 var called bool
237 err = zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL),230 err = conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL),
238 func(data string, stat gozk.Stat) (string, os.Error) {231 func(data string, stat *zk.Stat) (string, os.Error) {
239 called = true232 called = true
240 return "", nil233 return "", nil
241 })234 })
242 c.Assert(err, NotNil)235 c.Assert(err, Equals, zk.ZNOAUTH)
243 c.Assert(err.Code(), Equals, gozk.ZNOAUTH)
244236
245 stat, err := zk.Exists("/test")237 stat, err := conn.Exists("/test")
246 c.Assert(err, IsNil)238 c.Assert(err, IsNil)
247 c.Assert(stat, NotNil)239 c.Assert(stat, NotNil)
248 c.Assert(stat.Version(), Equals, int32(0))240 c.Assert(stat.Version(), Equals, int32(0))
@@ -251,23 +243,22 @@
251}243}
252244
253func (s *S) TestRetryChangeFailsCreating(c *C) {245func (s *S) TestRetryChangeFailsCreating(c *C) {
254 zk, _ := s.init(c)246 conn, _ := s.init(c)
255247
256 _, err := zk.Create("/test", "old", gozk.EPHEMERAL,248 // Read only!
257 gozk.WorldACL(gozk.PERM_READ)) // Read only!249 _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_READ))
258 c.Assert(err, IsNil)250 c.Assert(err, IsNil)
259251
260 var called bool252 var called bool
261 err = zk.RetryChange("/test/sub", gozk.EPHEMERAL,253 err = conn.RetryChange("/test/sub", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL),
262 gozk.WorldACL(gozk.PERM_ALL),254 func(data string, stat *zk.Stat) (string, os.Error) {
263 func(data string, stat gozk.Stat) (string, os.Error) {
264 called = true255 called = true
265 return "", nil256 return "", nil
266 })257 })
267 c.Assert(err, NotNil)258 c.Assert(err, NotNil)
268 c.Assert(err.Code(), Equals, gozk.ZNOAUTH)259 c.Assert(err, Equals, zk.ZNOAUTH)
269260
270 stat, err := zk.Exists("/test/sub")261 stat, err := conn.Exists("/test/sub")
271 c.Assert(err, IsNil)262 c.Assert(err, IsNil)
272 c.Assert(stat, IsNil)263 c.Assert(stat, IsNil)
273264
274265
=== added file 'runserver.go'
--- runserver.go 1970-01-01 00:00:00 +0000
+++ runserver.go 2011-10-10 13:55:28 +0000
@@ -0,0 +1,168 @@
1package zk
2
3// This file defines methods on Server that deal with starting
4// and stopping the ZooKeeper service. They are independent of ZooKeeper
5// itself, and may be factored out at a later date.
6
7import (
8 "exec"
9 "fmt"
10 "io/ioutil"
11 "os"
12 "strconv"
13 "time"
14)
15
16var NotRunning = os.NewError("process not running")
17
18// Process returns a Process referring to the running server from
19// where it's been stored in pid.txt. If the file does not
20// exist, or it cannot find the process, it returns the error
21// NotRunning.
22func (srv *Server) Process() (*os.Process, os.Error) {
23 data, err := ioutil.ReadFile(srv.path("pid.txt"))
24 if err != nil {
25 if err, ok := err.(*os.PathError); ok && err.Error == os.ENOENT {
26 return nil, NotRunning
27 }
28 return nil, err
29 }
30 pid, err := strconv.Atoi(string(data))
31 if err != nil {
32 return nil, os.NewError("bad process id found in pid.txt")
33 }
34 return getProcess(pid)
35}
36
37// getProcess gets a Process from a pid and checks that the
38// process is actually running. If the process
39// is not running, then getProcess returns a nil
40// Process and the error NotRunning.
41func getProcess(pid int) (*os.Process, os.Error) {
42 p, err := os.FindProcess(pid)
43 if err != nil {
44 return nil, err
45 }
46
47 // try to check if the process is actually running by sending
48 // it signal 0.
49 err = p.Signal(os.UnixSignal(0))
50 if err == nil {
51 return p, nil
52 }
53 if err, ok := err.(os.Errno); ok && err == os.ESRCH {
54 return nil, NotRunning
55 }
56 return nil, os.NewError("server running but inaccessible")
57}
58
59// Start starts the ZooKeeper server.
60// It returns an error if the server is already running.
61func (srv *Server) Start() os.Error {
62 if err := srv.checkAvailability(); err != nil {
63 return err
64 }
65 p, err := srv.Process()
66 if err == nil || err != NotRunning {
67 if p != nil {
68 p.Release()
69 }
70 return os.NewError("server is already running")
71 }
72
73 if _, err := os.Stat(srv.path("pid.txt")); err == nil {
74 // Thre pid.txt file still exists although server is not running.
75 // Remove it so it can be re-created.
76 // This leads to a race: if two processes are both
77 // calling Start, one might remove the file the other
78 // has just created, leading to a situation where
79 // pid.txt describes the wrong server process.
80 // We ignore that possibility for now.
81 // TODO use syscall.Flock?
82 if err := os.Remove(srv.path("pid.txt")); err != nil {
83 return fmt.Errorf("cannot remove pid.txt: %v", err)
84 }
85 }
86
87 // Open the pid file before starting the process so that if we get two
88 // programs trying to concurrently start a server on the same directory
89 // at the same time, only one should succeed.
90 pidf, err := os.OpenFile(srv.path("pid.txt"), os.O_EXCL|os.O_CREATE|os.O_WRONLY, 0666)
91 if err != nil {
92 return fmt.Errorf("cannot create pid.txt: %v", err)
93 }
94 defer pidf.Close()
95 args, err := srv.command()
96 if err != nil {
97 return fmt.Errorf("cannot determine command: %v", err)
98 }
99 cmd := exec.Command(args[0], args[1:]...)
100
101 logf, err := os.OpenFile(srv.path("log.txt"), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666)
102 if err != nil {
103 return fmt.Errorf("cannot create log file: %v", err)
104 }
105 defer logf.Close()
106 cmd.Stdout = logf
107 cmd.Stderr = logf
108 if err := cmd.Start(); err != nil {
109 return fmt.Errorf("cannot start server: %v", err)
110 }
111 if _, err := fmt.Fprint(pidf, cmd.Process.Pid); err != nil {
112 return fmt.Errorf("cannot write pid file: %v", err)
113 }
114 return nil
115}
116
117// Stop kills the ZooKeeper server. It does nothing if it is not running.
118// Note that Stop does not remove any data from the run directory,
119// so Start may be called later on the same directory.
120func (srv *Server) Stop() os.Error {
121 p, err := srv.Process()
122 if p == nil {
123 if err != nil {
124 return fmt.Errorf("cannot read process ID of server: %v", err)
125 }
126 return nil
127 }
128 defer p.Release()
129 if err := p.Kill(); err != nil {
130 return fmt.Errorf("cannot kill server process: %v", err)
131 }
132 // ignore the error returned from Wait because there's little
133 // we can do about it - it either means that the process has just exited
134 // anyway or that we can't wait for it for some other reason,
135 // for example because it was originally started by some other process.
136 _, err = p.Wait(0)
137 if e, ok := err.(*os.SyscallError); ok && e.Errno == os.ECHILD || err == os.ECHILD {
138 // If we can't wait for the server, it's possible that it was running
139 // but not as a child of this process, so the only thing we can do
140 // is to poll until it exits. If the process has taken longer than
141 // a second to exit, then it's probably not going to.
142 for i := 0; i < 5*4; i++ {
143 time.Sleep(1e9 / 4)
144 if np, err := getProcess(p.Pid); err != nil {
145 break
146 } else {
147 np.Release()
148 }
149 }
150 }
151
152 if err := os.Remove(srv.path("pid.txt")); err != nil {
153 return fmt.Errorf("cannot remove server process ID file: %v", err)
154 }
155 return nil
156}
157
158// Destroy stops the ZooKeeper server, and then removes its run
159// directory. Warning: this will destroy all data associated with the server.
160func (srv *Server) Destroy() os.Error {
161 if err := srv.Stop(); err != nil {
162 return err
163 }
164 if err := os.RemoveAll(srv.runDir); err != nil {
165 return err
166 }
167 return nil
168}
0169
=== added file 'server.go'
--- server.go 1970-01-01 00:00:00 +0000
+++ server.go 2011-10-10 13:55:28 +0000
@@ -0,0 +1,229 @@
1package zk
2
3import (
4 "bufio"
5 "bytes"
6 "fmt"
7 "io/ioutil"
8 "net"
9 "os"
10 "path/filepath"
11 "strings"
12)
13
14// Server represents a ZooKeeper server, its data and configuration files.
15type Server struct {
16 runDir string
17 zkDir string
18}
19
20// CreateServer creates the directory runDir and sets up a ZooKeeper server
21// environment inside it. It is an error if runDir already exists.
22// The server will listen on the specified TCP port.
23//
24// The ZooKeeper installation directory is specified by zkDir.
25// If this is empty, a system default will be used.
26//
27// CreateServer does not start the server.
28func CreateServer(port int, runDir, zkDir string) (*Server, os.Error) {
29 if err := os.Mkdir(runDir, 0777); err != nil {
30 return nil, err
31 }
32 srv := &Server{runDir: runDir, zkDir: zkDir}
33 if err := srv.writeLog4JConfig(); err != nil {
34 return nil, err
35 }
36 if err := srv.writeZooKeeperConfig(port); err != nil {
37 return nil, err
38 }
39 if err := srv.writeZkDir(); err != nil {
40 return nil, err
41 }
42 return srv, nil
43}
44
45// AttachServer creates a new ZooKeeper Server instance
46// to operate inside an existing run directory, runDir.
47// The directory must have been created with CreateServer.
48func AttachServer(runDir string) (*Server, os.Error) {
49 srv := &Server{runDir: runDir}
50 if err := srv.readZkDir(); err != nil {
51 return nil, fmt.Errorf("cannot read server install directory: %v", err)
52 }
53 return srv, nil
54}
55
56func (srv *Server) checkAvailability() os.Error {
57 port, err := srv.networkPort()
58 if err != nil {
59 return fmt.Errorf("cannot get network port: %v", err)
60 }
61 l, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port))
62 if err != nil {
63 return fmt.Errorf("cannot listen on port %v: %v", port, err)
64 }
65 l.Close()
66 return nil
67}
68
69// networkPort returns the TCP port number that
70// the server is configured for.
71func (srv *Server) networkPort() (int, os.Error) {
72 f, err := os.Open(srv.path("zoo.cfg"))
73 if err != nil {
74 return 0, err
75 }
76 r := bufio.NewReader(f)
77 for {
78 line, err := r.ReadSlice('\n')
79 if err != nil {
80 return 0, fmt.Errorf("cannot get port from %q", srv.path("zoo.cfg"))
81 }
82 var port int
83 if n, _ := fmt.Sscanf(string(line), "clientPort=%d\n", &port); n == 1 {
84 return port, nil
85 }
86 }
87 panic("not reached")
88}
89
90// command returns the command used to start the
91// ZooKeeper server.
92func (srv *Server) command() ([]string, os.Error) {
93 cp, err := srv.classPath()
94 if err != nil {
95 return nil, fmt.Errorf("cannot get class path: %v", err)
96 }
97 return []string{
98 "java",
99 "-cp", strings.Join(cp, ":"),
100 "-Dzookeeper.root.logger=INFO,CONSOLE",
101 "-Dlog4j.configuration=file:" + srv.path("log4j.properties"),
102 "org.apache.zookeeper.server.quorum.QuorumPeerMain",
103 srv.path("zoo.cfg"),
104 }, nil
105}
106
107var log4jProperties = `
108log4j.rootLogger=INFO, CONSOLE
109log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
110log4j.appender.CONSOLE.Threshold=INFO
111log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
112log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n
113`
114
115func (srv *Server) writeLog4JConfig() (err os.Error) {
116 return ioutil.WriteFile(srv.path("log4j.properties"), []byte(log4jProperties), 0666)
117}
118
119func (srv *Server) writeZooKeeperConfig(port int) (err os.Error) {
120 return ioutil.WriteFile(srv.path("zoo.cfg"), []byte(fmt.Sprintf(
121 "tickTime=2000\n"+
122 "dataDir=%s\n"+
123 "clientPort=%d\n"+
124 "maxClientCnxns=500\n",
125 srv.runDir, port)), 0666)
126}
127
128func (srv *Server) writeZkDir() os.Error {
129 return ioutil.WriteFile(srv.path("zkdir.txt"), []byte(srv.zkDir), 0666)
130}
131
132func (srv *Server) readZkDir() os.Error {
133 data, err := ioutil.ReadFile(srv.path("zkdir.txt"))
134 if err != nil {
135 return err
136 }
137 srv.zkDir = string(data)
138 return nil
139}
140
141func (srv *Server) classPath() ([]string, os.Error) {
142 dir := srv.zkDir
143 if dir == "" {
144 return systemClassPath()
145 }
146 if err := checkDirectory(dir); err != nil {
147 return nil, err
148 }
149 // Two possibilities, as seen in zkEnv.sh:
150 // 1) locally built binaries (jars are in build directory)
151 // 2) release binaries
152 if build := filepath.Join(dir, "build"); checkDirectory(build) == nil {
153 dir = build
154 }
155 classPath, err := filepath.Glob(filepath.Join(dir, "zookeeper-*.jar"))
156 if err != nil {
157 panic(fmt.Errorf("glob for jar files: %v", err))
158 }
159 more, err := filepath.Glob(filepath.Join(dir, "lib/*.jar"))
160 if err != nil {
161 panic(fmt.Errorf("glob for lib jar files: %v", err))
162 }
163
164 classPath = append(classPath, more...)
165 if len(classPath) == 0 {
166 return nil, fmt.Errorf("zookeeper libraries not found in %q", dir)
167 }
168 return classPath, nil
169}
170
171const zookeeperEnviron = "/etc/zookeeper/conf/environment"
172
173func systemClassPath() ([]string, os.Error) {
174 f, err := os.Open(zookeeperEnviron)
175 if f == nil {
176 return nil, err
177 }
178 r := bufio.NewReader(f)
179 for {
180 line, err := r.ReadSlice('\n')
181 if err != nil {
182 break
183 }
184 if !bytes.HasPrefix(line, []byte("CLASSPATH=")) {
185 continue
186 }
187
188 // remove variable and newline
189 path := string(line[len("CLASSPATH=") : len(line)-1])
190
191 // trim white space
192 path = strings.Trim(path, " \t\r")
193
194 // strip quotes
195 if path[0] == '"' {
196 path = path[1 : len(path)-1]
197 }
198
199 // split on :
200 classPath := strings.Split(path, ":")
201
202 // split off $ZOOCFGDIR
203 if len(classPath) > 0 && classPath[0] == "$ZOOCFGDIR" {
204 classPath = classPath[1:]
205 }
206
207 if len(classPath) == 0 {
208 return nil, fmt.Errorf("empty class path in %q", zookeeperEnviron)
209 }
210 return classPath, nil
211 }
212 return nil, fmt.Errorf("no class path found in %q", zookeeperEnviron)
213}
214
215// checkDirectory returns an error if the given path
216// does not exist or is not a directory.
217func checkDirectory(path string) os.Error {
218 if info, err := os.Stat(path); err != nil || !info.IsDirectory() {
219 if err == nil {
220 err = &os.PathError{Op: "stat", Path: path, Error: os.NewError("is not a directory")}
221 }
222 return err
223 }
224 return nil
225}
226
227func (srv *Server) path(name string) string {
228 return filepath.Join(srv.runDir, name)
229}
0230
=== modified file 'suite_test.go'
--- suite_test.go 2011-08-19 01:43:37 +0000
+++ suite_test.go 2011-10-10 13:55:28 +0000
@@ -1,13 +1,11 @@
1package gozk_test1package zk_test
22
3import (3import (
4 . "launchpad.net/gocheck"4 . "launchpad.net/gocheck"
5 "testing"
6 "io/ioutil"
7 "path"
8 "fmt"5 "fmt"
6 "launchpad.net/gozk/zk"
9 "os"7 "os"
10 "gozk"8 "testing"
11 "time"9 "time"
12)10)
1311
@@ -18,48 +16,32 @@
18var _ = Suite(&S{})16var _ = Suite(&S{})
1917
20type S struct {18type S struct {
21 zkRoot string19 zkServer *zk.Server
22 zkTestRoot string20 zkTestRoot string
23 zkTestPort int21 zkTestPort int
24 zkServerSh string22 zkProcess *os.Process // The running ZooKeeper process
25 zkServerOut *os.File23 zkAddr string
26 zkAddr string
2724
28 handles []*gozk.ZooKeeper25 handles []*zk.Conn
29 events []*gozk.Event26 events []*zk.Event
30 liveWatches int27 liveWatches int
31 deadWatches chan bool28 deadWatches chan bool
32}29}
3330
34var logLevel = 0 //gozk.LOG_ERROR31var logLevel = 0 //zk.LOG_ERROR
3532
3633func (s *S) init(c *C) (*zk.Conn, chan zk.Event) {
37var testZooCfg = ("dataDir=%s\n" +34 conn, watch, err := zk.Dial(s.zkAddr, 5e9)
38 "clientPort=%d\n" +
39 "tickTime=2000\n" +
40 "initLimit=10\n" +
41 "syncLimit=5\n" +
42 "")
43
44var testLog4jPrp = ("log4j.rootLogger=INFO,CONSOLE\n" +
45 "log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender\n" +
46 "log4j.appender.CONSOLE.Threshold=DEBUG\n" +
47 "log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout\n" +
48 "log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n" +
49 "")
50
51func (s *S) init(c *C) (*gozk.ZooKeeper, chan gozk.Event) {
52 zk, watch, err := gozk.Init(s.zkAddr, 5e9)
53 c.Assert(err, IsNil)35 c.Assert(err, IsNil)
5436
55 s.handles = append(s.handles, zk)37 s.handles = append(s.handles, conn)
5638
57 event := <-watch39 event := <-watch
5840
59 c.Assert(event.Type, Equals, gozk.EVENT_SESSION)41 c.Assert(event.Type, Equals, zk.EVENT_SESSION)
60 c.Assert(event.State, Equals, gozk.STATE_CONNECTED)42 c.Assert(event.State, Equals, zk.STATE_CONNECTED)
6143
62 bufferedWatch := make(chan gozk.Event, 256)44 bufferedWatch := make(chan zk.Event, 256)
63 bufferedWatch <- event45 bufferedWatch <- event
6446
65 s.liveWatches += 147 s.liveWatches += 1
@@ -82,13 +64,13 @@
82 s.deadWatches <- true64 s.deadWatches <- true
83 }()65 }()
8466
85 return zk, bufferedWatch67 return conn, bufferedWatch
86}68}
8769
88func (s *S) SetUpTest(c *C) {70func (s *S) SetUpTest(c *C) {
89 c.Assert(gozk.CountPendingWatches(), Equals, 0,71 c.Assert(zk.CountPendingWatches(), Equals, 0,
90 Bug("Test got a dirty watch state before running!"))72 Bug("Test got a dirty watch state before running!"))
91 gozk.SetLogLevel(logLevel)73 zk.SetLogLevel(logLevel)
92}74}
9375
94func (s *S) TearDownTest(c *C) {76func (s *S) TearDownTest(c *C) {
@@ -108,98 +90,37 @@
108 }90 }
10991
110 // Reset the list of handles.92 // Reset the list of handles.
111 s.handles = make([]*gozk.ZooKeeper, 0)93 s.handles = make([]*zk.Conn, 0)
11294
113 c.Assert(gozk.CountPendingWatches(), Equals, 0,95 c.Assert(zk.CountPendingWatches(), Equals, 0,
114 Bug("Test left live watches behind!"))96 Bug("Test left live watches behind!"))
115}97}
11698
117// We use the suite set up and tear down to manage a custom zookeeper99// We use the suite set up and tear down to manage a custom ZooKeeper
118//100//
119func (s *S) SetUpSuite(c *C) {101func (s *S) SetUpSuite(c *C) {
120102 var err os.Error
121 s.deadWatches = make(chan bool)103 s.deadWatches = make(chan bool)
122104
123 var err os.Error105 // N.B. We meed to create a subdirectory because zk.CreateServer
124106 // insists on creating its own directory.
125 s.zkRoot = os.Getenv("ZKROOT")107
126 if s.zkRoot == "" {108 s.zkTestRoot = c.MkDir() + "/zk"
127 panic("You must define $ZKROOT ($ZKROOT/bin/zkServer.sh is needed)")109 port := 21812
128 }110 s.zkAddr = fmt.Sprint("localhost:", port)
129111
130 s.zkTestRoot = c.MkDir()112 s.zkServer, err = zk.CreateServer(port, s.zkTestRoot, "")
131 s.zkTestPort = 21812113 if err != nil {
132114 c.Fatal("Cannot set up server environment: ", err)
133 println("ZooKeeper test server directory:", s.zkTestRoot)115 }
134 println("ZooKeeper test server port:", s.zkTestPort)116 err = s.zkServer.Start()
135117 if err != nil {
136 s.zkAddr = fmt.Sprintf("localhost:%d", s.zkTestPort)118 c.Fatal("Cannot start ZooKeeper server: ", err)
137119 }
138 s.zkServerSh = path.Join(s.zkRoot, "bin/zkServer.sh")
139 s.zkServerOut, err = os.OpenFile(path.Join(s.zkTestRoot, "stdout.txt"),
140 os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
141 if err != nil {
142 panic("Can't open stdout.txt file for server: " + err.String())
143 }
144
145 dataDir := path.Join(s.zkTestRoot, "data")
146 confDir := path.Join(s.zkTestRoot, "conf")
147
148 os.Mkdir(dataDir, 0755)
149 os.Mkdir(confDir, 0755)
150
151 err = os.Setenv("ZOOCFGDIR", confDir)
152 if err != nil {
153 panic("Can't set $ZOOCFGDIR: " + err.String())
154 }
155
156 zooCfg := []byte(fmt.Sprintf(testZooCfg, dataDir, s.zkTestPort))
157 err = ioutil.WriteFile(path.Join(confDir, "zoo.cfg"), zooCfg, 0644)
158 if err != nil {
159 panic("Can't write zoo.cfg: " + err.String())
160 }
161
162 log4jPrp := []byte(testLog4jPrp)
163 err = ioutil.WriteFile(path.Join(confDir, "log4j.properties"), log4jPrp, 0644)
164 if err != nil {
165 panic("Can't write log4j.properties: " + err.String())
166 }
167
168 s.StartZK()
169}120}
170121
171func (s *S) TearDownSuite(c *C) {122func (s *S) TearDownSuite(c *C) {
172 s.StopZK()123 if s.zkServer != nil {
173 s.zkServerOut.Close()124 s.zkServer.Destroy()
174}
175
176func (s *S) StartZK() {
177 attr := os.ProcAttr{Files: []*os.File{os.Stdin, s.zkServerOut, os.Stderr}}
178 proc, err := os.StartProcess(s.zkServerSh, []string{s.zkServerSh, "start"}, &attr)
179 if err != nil {
180 panic("Problem executing zkServer.sh start: " + err.String())
181 }
182
183 result, err := proc.Wait(0)
184 if err != nil {
185 panic(err.String())
186 } else if result.ExitStatus() != 0 {
187 panic("'zkServer.sh start' exited with non-zero status")
188 }
189}
190
191func (s *S) StopZK() {
192 attr := os.ProcAttr{Files: []*os.File{os.Stdin, s.zkServerOut, os.Stderr}}
193 proc, err := os.StartProcess(s.zkServerSh, []string{s.zkServerSh, "stop"}, &attr)
194 if err != nil {
195 panic("Problem executing zkServer.sh stop: " + err.String() +
196 " (look for runaway java processes!)")
197 }
198 result, err := proc.Wait(0)
199 if err != nil {
200 panic(err.String())
201 } else if result.ExitStatus() != 0 {
202 panic("'zkServer.sh stop' exited with non-zero status " +
203 "(look for runaway java processes!)")
204 }125 }
205}126}
206127
=== renamed file 'gozk.go' => 'zk.go'
--- gozk.go 2011-08-19 01:56:39 +0000
+++ zk.go 2011-10-10 13:55:28 +0000
@@ -1,4 +1,4 @@
1// gozk - Zookeeper support for the Go language1// gozk - ZooKeeper support for the Go language
2//2//
3// https://wiki.ubuntu.com/gozk3// https://wiki.ubuntu.com/gozk
4//4//
@@ -6,7 +6,7 @@
6//6//
7// Written by Gustavo Niemeyer <gustavo.niemeyer@canonical.com>7// Written by Gustavo Niemeyer <gustavo.niemeyer@canonical.com>
8//8//
9package gozk9package zk
1010
11/*11/*
12#cgo CFLAGS: -I/usr/include/c-client-src12#cgo CFLAGS: -I/usr/include/c-client-src
@@ -27,17 +27,16 @@
27// -----------------------------------------------------------------------27// -----------------------------------------------------------------------
28// Main constants and data types.28// Main constants and data types.
2929
30// The main ZooKeeper object, created through the Init function.30// Conn represents a connection to a set of ZooKeeper nodes.
31// Encapsulates all communication with ZooKeeper.31type Conn struct {
32type ZooKeeper struct {
33 watchChannels map[uintptr]chan Event32 watchChannels map[uintptr]chan Event
34 sessionWatchId uintptr33 sessionWatchId uintptr
35 handle *C.zhandle_t34 handle *C.zhandle_t
36 mutex sync.Mutex35 mutex sync.Mutex
37}36}
3837
39// ClientId represents the established session in ZooKeeper. This is only38// ClientId represents an established ZooKeeper session. It can be
40// useful to be passed back into the ReInit function.39// passed into Redial to reestablish a connection to an existing session.
41type ClientId struct {40type ClientId struct {
42 cId C.clientid_t41 cId C.clientid_t
43}42}
@@ -98,35 +97,60 @@
98 State int97 State int
99}98}
10099
101// Error codes that may be used to verify the result of the100// Error represents a ZooKeeper error.
102// Code method from Error.101type Error int
102
103const (103const (
104 ZOK = C.ZOK104 ZOK Error = C.ZOK
105 ZSYSTEMERROR = C.ZSYSTEMERROR105 ZSYSTEMERROR Error = C.ZSYSTEMERROR
106 ZRUNTIMEINCONSISTENCY = C.ZRUNTIMEINCONSISTENCY106 ZRUNTIMEINCONSISTENCY Error = C.ZRUNTIMEINCONSISTENCY
107 ZDATAINCONSISTENCY = C.ZDATAINCONSISTENCY107 ZDATAINCONSISTENCY Error = C.ZDATAINCONSISTENCY
108 ZCONNECTIONLOSS = C.ZCONNECTIONLOSS108 ZCONNECTIONLOSS Error = C.ZCONNECTIONLOSS
109 ZMARSHALLINGERROR = C.ZMARSHALLINGERROR109 ZMARSHALLINGERROR Error = C.ZMARSHALLINGERROR
110 ZUNIMPLEMENTED = C.ZUNIMPLEMENTED110 ZUNIMPLEMENTED Error = C.ZUNIMPLEMENTED
111 ZOPERATIONTIMEOUT = C.ZOPERATIONTIMEOUT111 ZOPERATIONTIMEOUT Error = C.ZOPERATIONTIMEOUT
112 ZBADARGUMENTS = C.ZBADARGUMENTS112 ZBADARGUMENTS Error = C.ZBADARGUMENTS
113 ZINVALIDSTATE = C.ZINVALIDSTATE113 ZINVALIDSTATE Error = C.ZINVALIDSTATE
114 ZAPIERROR = C.ZAPIERROR114 ZAPIERROR Error = C.ZAPIERROR
115 ZNONODE = C.ZNONODE115 ZNONODE Error = C.ZNONODE
116 ZNOAUTH = C.ZNOAUTH116 ZNOAUTH Error = C.ZNOAUTH
117 ZBADVERSION = C.ZBADVERSION117 ZBADVERSION Error = C.ZBADVERSION
118 ZNOCHILDRENFOREPHEMERALS = C.ZNOCHILDRENFOREPHEMERALS118 ZNOCHILDRENFOREPHEMERALS Error = C.ZNOCHILDRENFOREPHEMERALS
119 ZNODEEXISTS = C.ZNODEEXISTS119 ZNODEEXISTS Error = C.ZNODEEXISTS
120 ZNOTEMPTY = C.ZNOTEMPTY120 ZNOTEMPTY Error = C.ZNOTEMPTY
121 ZSESSIONEXPIRED = C.ZSESSIONEXPIRED121 ZSESSIONEXPIRED Error = C.ZSESSIONEXPIRED
122 ZINVALIDCALLBACK = C.ZINVALIDCALLBACK122 ZINVALIDCALLBACK Error = C.ZINVALIDCALLBACK
123 ZINVALIDACL = C.ZINVALIDACL123 ZINVALIDACL Error = C.ZINVALIDACL
124 ZAUTHFAILED = C.ZAUTHFAILED124 ZAUTHFAILED Error = C.ZAUTHFAILED
125 ZCLOSING = C.ZCLOSING125 ZCLOSING Error = C.ZCLOSING
126 ZNOTHING = C.ZNOTHING126 ZNOTHING Error = C.ZNOTHING
127 ZSESSIONMOVED = C.ZSESSIONMOVED127 ZSESSIONMOVED Error = C.ZSESSIONMOVED
128)128)
129129
130func (error Error) String() string {
131 return C.GoString(C.zerror(C.int(error))) // Static, no need to free it.
132}
133
134// zkError creates an appropriate error return from
135// a ZooKeeper status and the errno return from a C API
136// call.
137func zkError(rc C.int, cerr os.Error) os.Error {
138 code := Error(rc)
139 switch code {
140 case ZOK:
141 return nil
142
143 case ZSYSTEMERROR:
144 // If a ZooKeeper call returns ZSYSTEMERROR, then
145 // errno becomes significant. If errno has not been
146 // set, then we will return ZSYSTEMERROR nonetheless.
147 if cerr != nil {
148 return cerr
149 }
150 }
151 return code
152}
153
130// Constants for SetLogLevel.154// Constants for SetLogLevel.
131const (155const (
132 LOG_ERROR = C.ZOO_LOG_LEVEL_ERROR156 LOG_ERROR = C.ZOO_LOG_LEVEL_ERROR
@@ -273,104 +297,54 @@
273}297}
274298
275// -----------------------------------------------------------------------299// -----------------------------------------------------------------------
276// Error interface which maps onto the ZooKeeper error codes.
277
278type Error interface {
279 String() string
280 Code() int
281}
282
283type errorType struct {
284 zkrc C.int
285 err os.Error
286}
287
288func newError(zkrc C.int, err os.Error) Error {
289 return &errorType{zkrc, err}
290}
291
292func (error *errorType) String() (result string) {
293 if error.zkrc == ZSYSTEMERROR && error.err != nil {
294 result = error.err.String()
295 } else {
296 result = C.GoString(C.zerror(error.zkrc)) // Static, no need to free it.
297 }
298 return
299}
300
301// Code returns the error code that may be compared against one of
302// the gozk.Z* constants.
303func (error *errorType) Code() int {
304 return int(error.zkrc)
305}
306
307// -----------------------------------------------------------------------
308// Stat interface which maps onto the ZooKeeper Stat struct.
309
310// We declare this as an interface rather than an actual struct because
311// this way we don't have to copy data around between the real C struct
312// and the Go one on every call. Most uses will only touch a few elements,
313// or even ignore the stat entirely, so that's a win.
314300
315// Stat contains detailed information about a node.301// Stat contains detailed information about a node.
316type Stat interface {302type Stat struct {
317 Czxid() int64303 c C.struct_Stat
318 Mzxid() int64304}
319 CTime() int64305
320 MTime() int64306func (stat *Stat) Czxid() int64 {
321 Version() int32307 return int64(stat.c.czxid)
322 CVersion() int32308}
323 AVersion() int32309
324 EphemeralOwner() int64310func (stat *Stat) Mzxid() int64 {
325 DataLength() int32311 return int64(stat.c.mzxid)
326 NumChildren() int32312}
327 Pzxid() int64313
328}314func (stat *Stat) CTime() int64 {
329315 return int64(stat.c.ctime)
330type resultStat C.struct_Stat316}
331317
332func (stat *resultStat) Czxid() int64 {318func (stat *Stat) MTime() int64 {
333 return int64(stat.czxid)319 return int64(stat.c.mtime)
334}320}
335321
336func (stat *resultStat) Mzxid() int64 {322func (stat *Stat) Version() int32 {
337 return int64(stat.mzxid)323 return int32(stat.c.version)
338}324}
339325
340func (stat *resultStat) CTime() int64 {326func (stat *Stat) CVersion() int32 {
341 return int64(stat.ctime)327 return int32(stat.c.cversion)
342}328}
343329
344func (stat *resultStat) MTime() int64 {330func (stat *Stat) AVersion() int32 {
345 return int64(stat.mtime)331 return int32(stat.c.aversion)
346}332}
347333
348func (stat *resultStat) Version() int32 {334func (stat *Stat) EphemeralOwner() int64 {
349 return int32(stat.version)335 return int64(stat.c.ephemeralOwner)
350}336}
351337
352func (stat *resultStat) CVersion() int32 {338func (stat *Stat) DataLength() int32 {
353 return int32(stat.cversion)339 return int32(stat.c.dataLength)
354}340}
355341
356func (stat *resultStat) AVersion() int32 {342func (stat *Stat) NumChildren() int32 {
357 return int32(stat.aversion)343 return int32(stat.c.numChildren)
358}344}
359345
360func (stat *resultStat) EphemeralOwner() int64 {346func (stat *Stat) Pzxid() int64 {
361 return int64(stat.ephemeralOwner)347 return int64(stat.c.pzxid)
362}
363
364func (stat *resultStat) DataLength() int32 {
365 return int32(stat.dataLength)
366}
367
368func (stat *resultStat) NumChildren() int32 {
369 return int32(stat.numChildren)
370}
371
372func (stat *resultStat) Pzxid() int64 {
373 return int64(stat.pzxid)
374}348}
375349
376// -----------------------------------------------------------------------350// -----------------------------------------------------------------------
@@ -384,7 +358,7 @@
384 C.zoo_set_debug_level(C.ZooLogLevel(level))358 C.zoo_set_debug_level(C.ZooLogLevel(level))
385}359}
386360
387// Init initializes the communication with a ZooKeeper cluster. The provided361// Dial initializes the communication with a ZooKeeper cluster. The provided
388// servers parameter may include multiple server addresses, separated362// servers parameter may include multiple server addresses, separated
389// by commas, so that the client will automatically attempt to connect363// by commas, so that the client will automatically attempt to connect
390// to another server if one of them stops working for whatever reason.364// to another server if one of them stops working for whatever reason.
@@ -398,79 +372,73 @@
398// The watch channel receives events of type SESSION_EVENT when any change372// The watch channel receives events of type SESSION_EVENT when any change
399// to the state of the established connection happens. See the documentation373// to the state of the established connection happens. See the documentation
400// for the Event type for more details.374// for the Event type for more details.
401func Init(servers string, recvTimeoutNS int64) (zk *ZooKeeper, watch chan Event, err Error) {375func Dial(servers string, recvTimeoutNS int64) (*Conn, <-chan Event, os.Error) {
402 zk, watch, err = internalInit(servers, recvTimeoutNS, nil)376 return dial(servers, recvTimeoutNS, nil)
403 return
404}377}
405378
406// Equivalent to Init, but attempt to reestablish an existing session379// Redial is equivalent to Dial, but attempts to reestablish an existing session
407// identified via the clientId parameter.380// identified via the clientId parameter.
408func ReInit(servers string, recvTimeoutNS int64, clientId *ClientId) (zk *ZooKeeper, watch chan Event, err Error) {381func Redial(servers string, recvTimeoutNS int64, clientId *ClientId) (*Conn, <-chan Event, os.Error) {
409 zk, watch, err = internalInit(servers, recvTimeoutNS, clientId)382 return dial(servers, recvTimeoutNS, clientId)
410 return
411}383}
412384
413func internalInit(servers string, recvTimeoutNS int64, clientId *ClientId) (*ZooKeeper, chan Event, Error) {385func dial(servers string, recvTimeoutNS int64, clientId *ClientId) (*Conn, <-chan Event, os.Error) {
414386 conn := &Conn{}
415 zk := &ZooKeeper{}387 conn.watchChannels = make(map[uintptr]chan Event)
416 zk.watchChannels = make(map[uintptr]chan Event)
417388
418 var cId *C.clientid_t389 var cId *C.clientid_t
419 if clientId != nil {390 if clientId != nil {
420 cId = &clientId.cId391 cId = &clientId.cId
421 }392 }
422393
423 watchId, watchChannel := zk.createWatch(true)394 watchId, watchChannel := conn.createWatch(true)
424 zk.sessionWatchId = watchId395 conn.sessionWatchId = watchId
425396
426 cservers := C.CString(servers)397 cservers := C.CString(servers)
427 handle, cerr := C.zookeeper_init(cservers, C.watch_handler, C.int(recvTimeoutNS / 1e6), cId, unsafe.Pointer(watchId), 0)398 handle, cerr := C.zookeeper_init(cservers, C.watch_handler, C.int(recvTimeoutNS/1e6), cId, unsafe.Pointer(watchId), 0)
428 C.free(unsafe.Pointer(cservers))399 C.free(unsafe.Pointer(cservers))
429 if handle == nil {400 if handle == nil {
430 zk.closeAllWatches()401 conn.closeAllWatches()
431 return nil, nil, newError(ZSYSTEMERROR, cerr)402 return nil, nil, zkError(C.int(ZSYSTEMERROR), cerr)
432 }403 }
433 zk.handle = handle404 conn.handle = handle
434 runWatchLoop()405 runWatchLoop()
435 return zk, watchChannel, nil406 return conn, watchChannel, nil
436}407}
437408
438// ClientId returns the client ID for the existing session with ZooKeeper.409// ClientId returns the client ID for the existing session with ZooKeeper.
439// This is useful to reestablish an existing session via ReInit.410// This is useful to reestablish an existing session via ReInit.
440func (zk *ZooKeeper) ClientId() *ClientId {411func (conn *Conn) ClientId() *ClientId {
441 return &ClientId{*C.zoo_client_id(zk.handle)}412 return &ClientId{*C.zoo_client_id(conn.handle)}
442}413}
443414
444// Close terminates the ZooKeeper interaction.415// Close terminates the ZooKeeper interaction.
445func (zk *ZooKeeper) Close() Error {416func (conn *Conn) Close() os.Error {
446417
447 // Protect from concurrency around zk.handle change.418 // Protect from concurrency around conn.handle change.
448 zk.mutex.Lock()419 conn.mutex.Lock()
449 defer zk.mutex.Unlock()420 defer conn.mutex.Unlock()
450421
451 if zk.handle == nil {422 if conn.handle == nil {
452 // ZooKeeper may hang indefinitely if a handler is closed twice,423 // ZooKeeper may hang indefinitely if a handler is closed twice,
453 // so we get in the way and prevent it from happening.424 // so we get in the way and prevent it from happening.
454 return newError(ZCLOSING, nil)425 return ZCLOSING
455 }426 }
456 rc, cerr := C.zookeeper_close(zk.handle)427 rc, cerr := C.zookeeper_close(conn.handle)
457428
458 zk.closeAllWatches()429 conn.closeAllWatches()
459 stopWatchLoop()430 stopWatchLoop()
460431
461 // At this point, nothing else should need zk.handle.432 // At this point, nothing else should need conn.handle.
462 zk.handle = nil433 conn.handle = nil
463434
464 if rc != C.ZOK {435 return zkError(rc, cerr)
465 return newError(rc, cerr)
466 }
467 return nil
468}436}
469437
470// Get returns the data and status from an existing node. err will be nil,438// Get returns the data and status from an existing node. err will be nil,
471// unless an error is found. Attempting to retrieve data from a non-existing439// unless an error is found. Attempting to retrieve data from a non-existing
472// node is an error.440// node is an error.
473func (zk *ZooKeeper) Get(path string) (data string, stat Stat, err Error) {441func (conn *Conn) Get(path string) (data string, stat *Stat, err os.Error) {
474442
475 cpath := C.CString(path)443 cpath := C.CString(path)
476 cbuffer := (*C.char)(C.malloc(bufferSize))444 cbuffer := (*C.char)(C.malloc(bufferSize))
@@ -478,22 +446,21 @@
478 defer C.free(unsafe.Pointer(cpath))446 defer C.free(unsafe.Pointer(cpath))
479 defer C.free(unsafe.Pointer(cbuffer))447 defer C.free(unsafe.Pointer(cbuffer))
480448
481 cstat := C.struct_Stat{}449 var cstat Stat
482 rc, cerr := C.zoo_wget(zk.handle, cpath, nil, nil,450 rc, cerr := C.zoo_wget(conn.handle, cpath, nil, nil, cbuffer, &cbufferLen, &cstat.c)
483 cbuffer, &cbufferLen, &cstat)
484 if rc != C.ZOK {451 if rc != C.ZOK {
485 return "", nil, newError(rc, cerr)452 return "", nil, zkError(rc, cerr)
486 }453 }
454
487 result := C.GoStringN(cbuffer, cbufferLen)455 result := C.GoStringN(cbuffer, cbufferLen)
488456 return result, &cstat, nil
489 return result, (*resultStat)(&cstat), nil
490}457}
491458
492// GetW works like Get but also returns a channel that will receive459// GetW works like Get but also returns a channel that will receive
493// a single Event value when the data or existence of the given ZooKeeper460// a single Event value when the data or existence of the given ZooKeeper
494// node changes or when critical session events happen. See the461// node changes or when critical session events happen. See the
495// documentation of the Event type for more details.462// documentation of the Event type for more details.
496func (zk *ZooKeeper) GetW(path string) (data string, stat Stat, watch chan Event, err Error) {463func (conn *Conn) GetW(path string) (data string, stat *Stat, watch <-chan Event, err os.Error) {
497464
498 cpath := C.CString(path)465 cpath := C.CString(path)
499 cbuffer := (*C.char)(C.malloc(bufferSize))466 cbuffer := (*C.char)(C.malloc(bufferSize))
@@ -501,42 +468,38 @@
501 defer C.free(unsafe.Pointer(cpath))468 defer C.free(unsafe.Pointer(cpath))
502 defer C.free(unsafe.Pointer(cbuffer))469 defer C.free(unsafe.Pointer(cbuffer))
503470
504 watchId, watchChannel := zk.createWatch(true)471 watchId, watchChannel := conn.createWatch(true)
505472
506 cstat := C.struct_Stat{}473 var cstat Stat
507 rc, cerr := C.zoo_wget(zk.handle, cpath,474 rc, cerr := C.zoo_wget(conn.handle, cpath, C.watch_handler, unsafe.Pointer(watchId), cbuffer, &cbufferLen, &cstat.c)
508 C.watch_handler, unsafe.Pointer(watchId),
509 cbuffer, &cbufferLen, &cstat)
510 if rc != C.ZOK {475 if rc != C.ZOK {
511 zk.forgetWatch(watchId)476 conn.forgetWatch(watchId)
512 return "", nil, nil, newError(rc, cerr)477 return "", nil, nil, zkError(rc, cerr)
513 }478 }
514479
515 result := C.GoStringN(cbuffer, cbufferLen)480 result := C.GoStringN(cbuffer, cbufferLen)
516 return result, (*resultStat)(&cstat), watchChannel, nil481 return result, &cstat, watchChannel, nil
517}482}
518483
519// Children returns the children list and status from an existing node.484// Children returns the children list and status from an existing node.
520// err will be nil, unless an error is found. Attempting to retrieve the485// Attempting to retrieve the children list from a non-existent node is an error.
521// children list from a non-existent node is an error.486func (conn *Conn) Children(path string) (children []string, stat *Stat, err os.Error) {
522func (zk *ZooKeeper) Children(path string) (children []string, stat Stat, err Error) {
523487
524 cpath := C.CString(path)488 cpath := C.CString(path)
525 defer C.free(unsafe.Pointer(cpath))489 defer C.free(unsafe.Pointer(cpath))
526490
527 cvector := C.struct_String_vector{}491 cvector := C.struct_String_vector{}
528 cstat := C.struct_Stat{}492 var cstat Stat
529 rc, cerr := C.zoo_wget_children2(zk.handle, cpath, nil, nil,493 rc, cerr := C.zoo_wget_children2(conn.handle, cpath, nil, nil, &cvector, &cstat.c)
530 &cvector, &cstat)
531494
532 // Can't happen if rc != 0, but avoid potential memory leaks in the future.495 // Can't happen if rc != 0, but avoid potential memory leaks in the future.
533 if cvector.count != 0 {496 if cvector.count != 0 {
534 children = parseStringVector(&cvector)497 children = parseStringVector(&cvector)
535 }498 }
536 if rc != C.ZOK {499 if rc == C.ZOK {
537 err = newError(rc, cerr)500 stat = &cstat
538 } else {501 } else {
539 stat = (*resultStat)(&cstat)502 err = zkError(rc, cerr)
540 }503 }
541 return504 return
542}505}
@@ -545,29 +508,27 @@
545// receive a single Event value when a node is added or removed under the508// receive a single Event value when a node is added or removed under the
546// provided path or when critical session events happen. See the documentation509// provided path or when critical session events happen. See the documentation
547// of the Event type for more details.510// of the Event type for more details.
548func (zk *ZooKeeper) ChildrenW(path string) (children []string, stat Stat, watch chan Event, err Error) {511func (conn *Conn) ChildrenW(path string) (children []string, stat *Stat, watch <-chan Event, err os.Error) {
549512
550 cpath := C.CString(path)513 cpath := C.CString(path)
551 defer C.free(unsafe.Pointer(cpath))514 defer C.free(unsafe.Pointer(cpath))
552515
553 watchId, watchChannel := zk.createWatch(true)516 watchId, watchChannel := conn.createWatch(true)
554517
555 cvector := C.struct_String_vector{}518 cvector := C.struct_String_vector{}
556 cstat := C.struct_Stat{}519 var cstat Stat
557 rc, cerr := C.zoo_wget_children2(zk.handle, cpath,520 rc, cerr := C.zoo_wget_children2(conn.handle, cpath, C.watch_handler, unsafe.Pointer(watchId), &cvector, &cstat.c)
558 C.watch_handler, unsafe.Pointer(watchId),
559 &cvector, &cstat)
560521
561 // Can't happen if rc != 0, but avoid potential memory leaks in the future.522 // Can't happen if rc != 0, but avoid potential memory leaks in the future.
562 if cvector.count != 0 {523 if cvector.count != 0 {
563 children = parseStringVector(&cvector)524 children = parseStringVector(&cvector)
564 }525 }
565 if rc != C.ZOK {526 if rc == C.ZOK {
566 zk.forgetWatch(watchId)527 stat = &cstat
567 err = newError(rc, cerr)528 watch = watchChannel
568 } else {529 } else {
569 stat = (*resultStat)(&cstat)530 conn.forgetWatch(watchId)
570 watch = watchChannel531 err = zkError(rc, cerr)
571 }532 }
572 return533 return
573}534}
@@ -588,20 +549,20 @@
588// Exists checks if a node exists at the given path. If it does,549// Exists checks if a node exists at the given path. If it does,
589// stat will contain meta information on the existing node, otherwise550// stat will contain meta information on the existing node, otherwise
590// it will be nil.551// it will be nil.
591func (zk *ZooKeeper) Exists(path string) (stat Stat, err Error) {552func (conn *Conn) Exists(path string) (stat *Stat, err os.Error) {
592 cpath := C.CString(path)553 cpath := C.CString(path)
593 defer C.free(unsafe.Pointer(cpath))554 defer C.free(unsafe.Pointer(cpath))
594555
595 cstat := C.struct_Stat{}556 var cstat Stat
596 rc, cerr := C.zoo_wexists(zk.handle, cpath, nil, nil, &cstat)557 rc, cerr := C.zoo_wexists(conn.handle, cpath, nil, nil, &stat.c)
597558
598 // We diverge a bit from the usual here: a ZNONODE is not an error559 // We diverge a bit from the usual here: a ZNONODE is not an error
599 // for an exists call, otherwise every Exists call would have to check560 // for an exists call, otherwise every Exists call would have to check
600 // for err != nil and err.Code() != ZNONODE.561 // for err != nil and err.Code() != ZNONODE.
601 if rc == C.ZOK {562 if rc == C.ZOK {
602 stat = (*resultStat)(&cstat)563 stat = &cstat
603 } else if rc != C.ZNONODE {564 } else if rc != C.ZNONODE {
604 err = newError(rc, cerr)565 err = zkError(rc, cerr)
605 }566 }
606 return567 return
607}568}
@@ -611,28 +572,27 @@
611// stat is nil and the node didn't exist, or when the existing node572// stat is nil and the node didn't exist, or when the existing node
612// is removed. It will also receive critical session events. See the573// is removed. It will also receive critical session events. See the
613// documentation of the Event type for more details.574// documentation of the Event type for more details.
614func (zk *ZooKeeper) ExistsW(path string) (stat Stat, watch chan Event, err Error) {575func (conn *Conn) ExistsW(path string) (stat *Stat, watch <-chan Event, err os.Error) {
615 cpath := C.CString(path)576 cpath := C.CString(path)
616 defer C.free(unsafe.Pointer(cpath))577 defer C.free(unsafe.Pointer(cpath))
617578
618 watchId, watchChannel := zk.createWatch(true)579 watchId, watchChannel := conn.createWatch(true)
619580
620 cstat := C.struct_Stat{}581 var cstat Stat
621 rc, cerr := C.zoo_wexists(zk.handle, cpath,582 rc, cerr := C.zoo_wexists(conn.handle, cpath, C.watch_handler, unsafe.Pointer(watchId), &cstat.c)
622 C.watch_handler, unsafe.Pointer(watchId), &cstat)
623583
624 // We diverge a bit from the usual here: a ZNONODE is not an error584 // We diverge a bit from the usual here: a ZNONODE is not an error
625 // for an exists call, otherwise every Exists call would have to check585 // for an exists call, otherwise every Exists call would have to check
626 // for err != nil and err.Code() != ZNONODE.586 // for err != nil and err.Code() != ZNONODE.
627 switch rc {587 switch Error(rc) {
628 case ZOK:588 case ZOK:
629 stat = (*resultStat)(&cstat)589 stat = &cstat
630 watch = watchChannel590 watch = watchChannel
631 case ZNONODE:591 case ZNONODE:
632 watch = watchChannel592 watch = watchChannel
633 default:593 default:
634 zk.forgetWatch(watchId)594 conn.forgetWatch(watchId)
635 err = newError(rc, cerr)595 err = zkError(rc, cerr)
636 }596 }
637 return597 return
638}598}
@@ -646,7 +606,7 @@
646// The returned path is useful in cases where the created path may differ606// The returned path is useful in cases where the created path may differ
647// from the requested one, such as when a sequence number is appended607// from the requested one, such as when a sequence number is appended
648// to it due to the use of the gozk.SEQUENCE flag.608// to it due to the use of the gozk.SEQUENCE flag.
649func (zk *ZooKeeper) Create(path, value string, flags int, aclv []ACL) (pathCreated string, err Error) {609func (conn *Conn) Create(path, value string, flags int, aclv []ACL) (pathCreated string, err os.Error) {
650 cpath := C.CString(path)610 cpath := C.CString(path)
651 cvalue := C.CString(value)611 cvalue := C.CString(value)
652 defer C.free(unsafe.Pointer(cpath))612 defer C.free(unsafe.Pointer(cpath))
@@ -660,13 +620,13 @@
660 cpathCreated := (*C.char)(C.malloc(cpathLen))620 cpathCreated := (*C.char)(C.malloc(cpathLen))
661 defer C.free(unsafe.Pointer(cpathCreated))621 defer C.free(unsafe.Pointer(cpathCreated))
662622
663 rc, cerr := C.zoo_create(zk.handle, cpath, cvalue, C.int(len(value)),623 rc, cerr := C.zoo_create(conn.handle, cpath, cvalue, C.int(len(value)), caclv, C.int(flags), cpathCreated, C.int(cpathLen))
664 caclv, C.int(flags), cpathCreated, C.int(cpathLen))624 if rc == C.ZOK {
665 if rc != C.ZOK {625 pathCreated = C.GoString(cpathCreated)
666 return "", newError(rc, cerr)626 } else {
627 err = zkError(rc, cerr)
667 }628 }
668629 return
669 return C.GoString(cpathCreated), nil
670}630}
671631
672// Set modifies the data for the existing node at the given path, replacing it632// Set modifies the data for the existing node at the given path, replacing it
@@ -677,35 +637,31 @@
677//637//
678// It is an error to attempt to set the data of a non-existing node with638// It is an error to attempt to set the data of a non-existing node with
679// this function. In these cases, use Create instead.639// this function. In these cases, use Create instead.
680func (zk *ZooKeeper) Set(path, value string, version int32) (stat Stat, err Error) {640func (conn *Conn) Set(path, value string, version int32) (stat *Stat, err os.Error) {
681641
682 cpath := C.CString(path)642 cpath := C.CString(path)
683 cvalue := C.CString(value)643 cvalue := C.CString(value)
684 defer C.free(unsafe.Pointer(cpath))644 defer C.free(unsafe.Pointer(cpath))
685 defer C.free(unsafe.Pointer(cvalue))645 defer C.free(unsafe.Pointer(cvalue))
686646
687 cstat := C.struct_Stat{}647 var cstat Stat
688648 rc, cerr := C.zoo_set2(conn.handle, cpath, cvalue, C.int(len(value)), C.int(version), &cstat.c)
689 rc, cerr := C.zoo_set2(zk.handle, cpath, cvalue, C.int(len(value)),649 if rc == C.ZOK {
690 C.int(version), &cstat)650 stat = &cstat
691 if rc != C.ZOK {651 } else {
692 return nil, newError(rc, cerr)652 err = zkError(rc, cerr)
693 }653 }
694654 return
695 return (*resultStat)(&cstat), nil
696}655}
697656
698// Delete removes the node at path. If version is not -1, the operation657// Delete removes the node at path. If version is not -1, the operation
699// will only succeed if the node is still at this version when the658// will only succeed if the node is still at this version when the
700// node is deleted as an atomic operation.659// node is deleted as an atomic operation.
701func (zk *ZooKeeper) Delete(path string, version int32) (err Error) {660func (conn *Conn) Delete(path string, version int32) (err os.Error) {
702 cpath := C.CString(path)661 cpath := C.CString(path)
703 defer C.free(unsafe.Pointer(cpath))662 defer C.free(unsafe.Pointer(cpath))
704 rc, cerr := C.zoo_delete(zk.handle, cpath, C.int(version))663 rc, cerr := C.zoo_delete(conn.handle, cpath, C.int(version))
705 if rc != C.ZOK {664 return zkError(rc, cerr)
706 return newError(rc, cerr)
707 }
708 return nil
709}665}
710666
711// AddAuth adds a new authentication certificate to the ZooKeeper667// AddAuth adds a new authentication certificate to the ZooKeeper
@@ -713,7 +669,7 @@
713// authentication information, while the cert parameter provides the669// authentication information, while the cert parameter provides the
714// identity data itself. For instance, the "digest" scheme requires670// identity data itself. For instance, the "digest" scheme requires
715// a pair like "username:password" to be provided as the certificate.671// a pair like "username:password" to be provided as the certificate.
716func (zk *ZooKeeper) AddAuth(scheme, cert string) Error {672func (conn *Conn) AddAuth(scheme, cert string) os.Error {
717 cscheme := C.CString(scheme)673 cscheme := C.CString(scheme)
718 ccert := C.CString(cert)674 ccert := C.CString(cert)
719 defer C.free(unsafe.Pointer(cscheme))675 defer C.free(unsafe.Pointer(cscheme))
@@ -725,43 +681,38 @@
725 }681 }
726 defer C.destroy_completion_data(data)682 defer C.destroy_completion_data(data)
727683
728 rc, cerr := C.zoo_add_auth(zk.handle, cscheme, ccert, C.int(len(cert)),684 rc, cerr := C.zoo_add_auth(conn.handle, cscheme, ccert, C.int(len(cert)), C.handle_void_completion, unsafe.Pointer(data))
729 C.handle_void_completion, unsafe.Pointer(data))
730 if rc != C.ZOK {685 if rc != C.ZOK {
731 return newError(rc, cerr)686 return zkError(rc, cerr)
732 }687 }
733688
734 C.wait_for_completion(data)689 C.wait_for_completion(data)
735690
736 rc = C.int(uintptr(data.data))691 rc = C.int(uintptr(data.data))
737 if rc != C.ZOK {692 return zkError(rc, nil)
738 return newError(rc, nil)
739 }
740
741 return nil
742}693}
743694
744// ACL returns the access control list for path.695// ACL returns the access control list for path.
745func (zk *ZooKeeper) ACL(path string) ([]ACL, Stat, Error) {696func (conn *Conn) ACL(path string) ([]ACL, *Stat, os.Error) {
746697
747 cpath := C.CString(path)698 cpath := C.CString(path)
748 defer C.free(unsafe.Pointer(cpath))699 defer C.free(unsafe.Pointer(cpath))
749700
750 caclv := C.struct_ACL_vector{}701 caclv := C.struct_ACL_vector{}
751 cstat := C.struct_Stat{}
752702
753 rc, cerr := C.zoo_get_acl(zk.handle, cpath, &caclv, &cstat)703 var cstat Stat
704 rc, cerr := C.zoo_get_acl(conn.handle, cpath, &caclv, &cstat.c)
754 if rc != C.ZOK {705 if rc != C.ZOK {
755 return nil, nil, newError(rc, cerr)706 return nil, nil, zkError(rc, cerr)
756 }707 }
757708
758 aclv := parseACLVector(&caclv)709 aclv := parseACLVector(&caclv)
759710
760 return aclv, (*resultStat)(&cstat), nil711 return aclv, &cstat, nil
761}712}
762713
763// SetACL changes the access control list for path.714// SetACL changes the access control list for path.
764func (zk *ZooKeeper) SetACL(path string, aclv []ACL, version int32) Error {715func (conn *Conn) SetACL(path string, aclv []ACL, version int32) os.Error {
765716
766 cpath := C.CString(path)717 cpath := C.CString(path)
767 defer C.free(unsafe.Pointer(cpath))718 defer C.free(unsafe.Pointer(cpath))
@@ -769,12 +720,8 @@
769 caclv := buildACLVector(aclv)720 caclv := buildACLVector(aclv)
770 defer C.deallocate_ACL_vector(caclv)721 defer C.deallocate_ACL_vector(caclv)
771722
772 rc, cerr := C.zoo_set_acl(zk.handle, cpath, C.int(version), caclv)723 rc, cerr := C.zoo_set_acl(conn.handle, cpath, C.int(version), caclv)
773 if rc != C.ZOK {724 return zkError(rc, cerr)
774 return newError(rc, cerr)
775 }
776
777 return nil
778}725}
779726
780func parseACLVector(caclv *C.struct_ACL_vector) []ACL {727func parseACLVector(caclv *C.struct_ACL_vector) []ACL {
@@ -822,7 +769,7 @@
822// -----------------------------------------------------------------------769// -----------------------------------------------------------------------
823// RetryChange utility method.770// RetryChange utility method.
824771
825type ChangeFunc func(oldValue string, oldStat Stat) (newValue string, err os.Error)772type ChangeFunc func(oldValue string, oldStat *Stat) (newValue string, err os.Error)
826773
827// RetryChange runs changeFunc to attempt to atomically change path774// RetryChange runs changeFunc to attempt to atomically change path
828// in a lock free manner, and retries in case there was another775// in a lock free manner, and retries in case there was another
@@ -831,8 +778,7 @@
831// changeFunc must work correctly if called multiple times in case778// changeFunc must work correctly if called multiple times in case
832// the modification fails due to concurrent changes, and it may return779// the modification fails due to concurrent changes, and it may return
833// an error that will cause the the RetryChange function to stop and780// an error that will cause the the RetryChange function to stop and
834// return an Error with code ZSYSTEMERROR and the same .String() result781// return the same error.
835// as the provided error.
836//782//
837// This mechanism is not suitable for a node that is frequently modified783// This mechanism is not suitable for a node that is frequently modified
838// concurrently. For those cases, consider using a pessimistic locking784// concurrently. For those cases, consider using a pessimistic locking
@@ -845,8 +791,7 @@
845//791//
846// 2. Call the changeFunc with the current node value and stat,792// 2. Call the changeFunc with the current node value and stat,
847// or with an empty string and nil stat, if the node doesn't yet exist.793// or with an empty string and nil stat, if the node doesn't yet exist.
848// If the changeFunc returns an error, stop and return an Error with794// If the changeFunc returns an error, stop and return the same error.
849// ZSYSTEMERROR Code() and the same String() as the changeFunc error.
850//795//
851// 3. If the changeFunc returns no errors, use the string returned as796// 3. If the changeFunc returns no errors, use the string returned as
852// the new candidate value for the node, and attempt to either create797// the new candidate value for the node, and attempt to either create
@@ -855,36 +800,32 @@
855// in the same node), repeat from step 1. If this procedure fails with any800// in the same node), repeat from step 1. If this procedure fails with any
856// other error, stop and return the error found.801// other error, stop and return the error found.
857//802//
858func (zk *ZooKeeper) RetryChange(path string, flags int, acl []ACL, changeFunc ChangeFunc) (err Error) {803func (conn *Conn) RetryChange(path string, flags int, acl []ACL, changeFunc ChangeFunc) os.Error {
859 for {804 for {
860 oldValue, oldStat, getErr := zk.Get(path)805 oldValue, oldStat, err := conn.Get(path)
861 if getErr != nil && getErr.Code() != ZNONODE {806 if err != nil && err != ZNONODE {
862 err = getErr807 return err
863 break808 }
864 }809 newValue, err := changeFunc(oldValue, oldStat)
865 newValue, osErr := changeFunc(oldValue, oldStat)810 if err != nil {
866 if osErr != nil {811 return err
867 return newError(ZSYSTEMERROR, osErr)812 }
868 } else if oldStat == nil {813 if oldStat == nil {
869 _, err = zk.Create(path, newValue, flags, acl)814 _, err := conn.Create(path, newValue, flags, acl)
870 if err == nil || err.Code() != ZNODEEXISTS {815 if err == nil || err != ZNODEEXISTS {
871 break816 return err
872 }817 }
873 } else if newValue == oldValue {818 continue
819 }
820 if newValue == oldValue {
874 return nil // Nothing to do.821 return nil // Nothing to do.
875 } else {822 }
876 _, err = zk.Set(path, newValue, oldStat.Version())823 _, err = conn.Set(path, newValue, oldStat.Version())
877 if err == nil {824 if err == nil || (err != ZBADVERSION && err != ZNONODE) {
878 break825 return err
879 } else {
880 code := err.Code()
881 if code != ZBADVERSION && code != ZNONODE {
882 break
883 }
884 }
885 }826 }
886 }827 }
887 return err828 panic("not reached")
888}829}
889830
890// -----------------------------------------------------------------------831// -----------------------------------------------------------------------
@@ -897,7 +838,7 @@
897// Whenever a *W method is called, it will return a channel which838// Whenever a *W method is called, it will return a channel which
898// outputs Event values. Internally, a map is used to maintain references839// outputs Event values. Internally, a map is used to maintain references
899// between an unique integer key (the watchId), and the event channel. The840// between an unique integer key (the watchId), and the event channel. The
900// watchId is then handed to the C zookeeper library as the watch context,841// watchId is then handed to the C ZooKeeper library as the watch context,
901// so that we get it back when events happen. Using an integer key as the842// so that we get it back when events happen. Using an integer key as the
902// watch context rather than a pointer is needed because there's no guarantee843// watch context rather than a pointer is needed because there's no guarantee
903// that in the future the GC will not move objects around, and also because844// that in the future the GC will not move objects around, and also because
@@ -910,13 +851,13 @@
910// Since Cgo doesn't allow calling back into Go, we actually fire a new851// Since Cgo doesn't allow calling back into Go, we actually fire a new
911// goroutine the very first time Init is called, and allow it to block852// goroutine the very first time Init is called, and allow it to block
912// in a pthread condition variable within a C function. This condition853// in a pthread condition variable within a C function. This condition
913// will only be notified once a zookeeper watch callback appends new854// will only be notified once a ZooKeeper watch callback appends new
914// entries to the event list. When this happens, the C function returns855// entries to the event list. When this happens, the C function returns
915// and we get back into Go land with the pointer to the watch data,856// and we get back into Go land with the pointer to the watch data,
916// including the watchId and other event details such as type and path.857// including the watchId and other event details such as type and path.
917858
918var watchMutex sync.Mutex859var watchMutex sync.Mutex
919var watchZooKeepers = make(map[uintptr]*ZooKeeper)860var watchConns = make(map[uintptr]*Conn)
920var watchCounter uintptr861var watchCounter uintptr
921var watchLoopCounter int862var watchLoopCounter int
922863
@@ -925,14 +866,14 @@
925// mostly as a debugging and testing aid.866// mostly as a debugging and testing aid.
926func CountPendingWatches() int {867func CountPendingWatches() int {
927 watchMutex.Lock()868 watchMutex.Lock()
928 count := len(watchZooKeepers)869 count := len(watchConns)
929 watchMutex.Unlock()870 watchMutex.Unlock()
930 return count871 return count
931}872}
932873
933// createWatch creates and registers a watch, returning the watch id874// createWatch creates and registers a watch, returning the watch id
934// and channel.875// and channel.
935func (zk *ZooKeeper) createWatch(session bool) (watchId uintptr, watchChannel chan Event) {876func (conn *Conn) createWatch(session bool) (watchId uintptr, watchChannel chan Event) {
936 buf := 1 // session/watch event877 buf := 1 // session/watch event
937 if session {878 if session {
938 buf = 32879 buf = 32
@@ -942,8 +883,8 @@
942 defer watchMutex.Unlock()883 defer watchMutex.Unlock()
943 watchId = watchCounter884 watchId = watchCounter
944 watchCounter += 1885 watchCounter += 1
945 zk.watchChannels[watchId] = watchChannel886 conn.watchChannels[watchId] = watchChannel
946 watchZooKeepers[watchId] = zk887 watchConns[watchId] = conn
947 return888 return
948}889}
949890
@@ -951,21 +892,21 @@
951// from ever getting delivered. It shouldn't be used if there's any892// from ever getting delivered. It shouldn't be used if there's any
952// chance the watch channel is still visible and not closed, since893// chance the watch channel is still visible and not closed, since
953// it might mean a goroutine would be blocked forever.894// it might mean a goroutine would be blocked forever.
954func (zk *ZooKeeper) forgetWatch(watchId uintptr) {895func (conn *Conn) forgetWatch(watchId uintptr) {
955 watchMutex.Lock()896 watchMutex.Lock()
956 defer watchMutex.Unlock()897 defer watchMutex.Unlock()
957 zk.watchChannels[watchId] = nil, false898 conn.watchChannels[watchId] = nil, false
958 watchZooKeepers[watchId] = nil, false899 watchConns[watchId] = nil, false
959}900}
960901
961// closeAllWatches closes all watch channels for zk.902// closeAllWatches closes all watch channels for conn.
962func (zk *ZooKeeper) closeAllWatches() {903func (conn *Conn) closeAllWatches() {
963 watchMutex.Lock()904 watchMutex.Lock()
964 defer watchMutex.Unlock()905 defer watchMutex.Unlock()
965 for watchId, ch := range zk.watchChannels {906 for watchId, ch := range conn.watchChannels {
966 close(ch)907 close(ch)
967 zk.watchChannels[watchId] = nil, false908 conn.watchChannels[watchId] = nil, false
968 watchZooKeepers[watchId] = nil, false909 watchConns[watchId] = nil, false
969 }910 }
970}911}
971912
@@ -978,11 +919,11 @@
978 }919 }
979 watchMutex.Lock()920 watchMutex.Lock()
980 defer watchMutex.Unlock()921 defer watchMutex.Unlock()
981 zk, ok := watchZooKeepers[watchId]922 conn, ok := watchConns[watchId]
982 if !ok {923 if !ok {
983 return924 return
984 }925 }
985 if event.Type == EVENT_SESSION && watchId != zk.sessionWatchId {926 if event.Type == EVENT_SESSION && watchId != conn.sessionWatchId {
986 switch event.State {927 switch event.State {
987 case STATE_EXPIRED_SESSION, STATE_AUTH_FAILED:928 case STATE_EXPIRED_SESSION, STATE_AUTH_FAILED:
988 default:929 default:
@@ -990,7 +931,7 @@
990 return931 return
991 }932 }
992 }933 }
993 ch := zk.watchChannels[watchId]934 ch := conn.watchChannels[watchId]
994 if ch == nil {935 if ch == nil {
995 return936 return
996 }937 }
@@ -1002,15 +943,15 @@
1002 // straight to the buffer), and the application isn't paying943 // straight to the buffer), and the application isn't paying
1003 // attention for long enough to have the buffer filled up.944 // attention for long enough to have the buffer filled up.
1004 // Break down now rather than leaking forever.945 // Break down now rather than leaking forever.
1005 if watchId == zk.sessionWatchId {946 if watchId == conn.sessionWatchId {
1006 panic("Session event channel buffer is full")947 panic("Session event channel buffer is full")
1007 } else {948 } else {
1008 panic("Watch event channel buffer is full")949 panic("Watch event channel buffer is full")
1009 }950 }
1010 }951 }
1011 if watchId != zk.sessionWatchId {952 if watchId != conn.sessionWatchId {
1012 zk.watchChannels[watchId] = nil, false953 conn.watchChannels[watchId] = nil, false
1013 watchZooKeepers[watchId] = nil, false954 watchConns[watchId] = nil, false
1014 close(ch)955 close(ch)
1015 }956 }
1016}957}
1017958
=== renamed file 'gozk_test.go' => 'zk_test.go'
--- gozk_test.go 2011-08-19 01:51:37 +0000
+++ zk_test.go 2011-10-10 13:55:28 +0000
@@ -1,17 +1,17 @@
1package gozk_test1package zk_test
22
3import (3import (
4 . "launchpad.net/gocheck"4 . "launchpad.net/gocheck"
5 "gozk"5 "launchpad.net/gozk/zk"
6 "time"6 "time"
7)7)
88
9// This error will be delivered via C errno, since ZK unfortunately9// This error will be delivered via C errno, since ZK unfortunately
10// only provides the handler back from zookeeper_init().10// only provides the handler back from zookeeper_init().
11func (s *S) TestInitErrorThroughErrno(c *C) {11func (s *S) TestInitErrorThroughErrno(c *C) {
12 zk, watch, err := gozk.Init("bad-domain-without-port", 5e9)12 conn, watch, err := zk.Dial("bad-domain-without-port", 5e9)
13 if zk != nil {13 if conn != nil {
14 zk.Close()14 conn.Close()
15 }15 }
16 if watch != nil {16 if watch != nil {
17 go func() {17 go func() {
@@ -23,15 +23,15 @@
23 }23 }
24 }()24 }()
25 }25 }
26 c.Assert(zk, IsNil)26 c.Assert(conn, IsNil)
27 c.Assert(watch, IsNil)27 c.Assert(watch, IsNil)
28 c.Assert(err, Matches, "invalid argument")28 c.Assert(err, Matches, "invalid argument")
29}29}
3030
31func (s *S) TestRecvTimeoutInitParameter(c *C) {31func (s *S) TestRecvTimeoutInitParameter(c *C) {
32 zk, watch, err := gozk.Init(s.zkAddr, 0)32 conn, watch, err := zk.Dial(s.zkAddr, 0)
33 c.Assert(err, IsNil)33 c.Assert(err, IsNil)
34 defer zk.Close()34 defer conn.Close()
3535
36 select {36 select {
37 case <-watch:37 case <-watch:
@@ -40,7 +40,7 @@
40 }40 }
4141
42 for i := 0; i != 1000; i++ {42 for i := 0; i != 1000; i++ {
43 _, _, err := zk.Get("/zookeeper")43 _, _, err := conn.Get("/zookeeper")
44 if err != nil {44 if err != nil {
45 c.Assert(err, Matches, "operation timeout")45 c.Assert(err, Matches, "operation timeout")
46 c.SucceedNow()46 c.SucceedNow()
@@ -51,76 +51,79 @@
51}51}
5252
53func (s *S) TestSessionWatches(c *C) {53func (s *S) TestSessionWatches(c *C) {
54 c.Assert(gozk.CountPendingWatches(), Equals, 0)54 c.Assert(zk.CountPendingWatches(), Equals, 0)
5555
56 zk1, watch1 := s.init(c)56 zk1, watch1 := s.init(c)
57 zk2, watch2 := s.init(c)57 zk2, watch2 := s.init(c)
58 zk3, watch3 := s.init(c)58 zk3, watch3 := s.init(c)
5959
60 c.Assert(gozk.CountPendingWatches(), Equals, 3)60 c.Assert(zk.CountPendingWatches(), Equals, 3)
6161
62 event1 := <-watch162 event1 := <-watch1
63 c.Assert(event1.Type, Equals, gozk.EVENT_SESSION)63 c.Assert(event1.Type, Equals, zk.EVENT_SESSION)
64 c.Assert(event1.State, Equals, gozk.STATE_CONNECTED)64 c.Assert(event1.State, Equals, zk.STATE_CONNECTED)
6565
66 c.Assert(gozk.CountPendingWatches(), Equals, 3)66 c.Assert(zk.CountPendingWatches(), Equals, 3)
6767
68 event2 := <-watch268 event2 := <-watch2
69 c.Assert(event2.Type, Equals, gozk.EVENT_SESSION)69 c.Assert(event2.Type, Equals, zk.EVENT_SESSION)
70 c.Assert(event2.State, Equals, gozk.STATE_CONNECTED)70 c.Assert(event2.State, Equals, zk.STATE_CONNECTED)
7171
72 c.Assert(gozk.CountPendingWatches(), Equals, 3)72 c.Assert(zk.CountPendingWatches(), Equals, 3)
7373
74 event3 := <-watch374 event3 := <-watch3
75 c.Assert(event3.Type, Equals, gozk.EVENT_SESSION)75 c.Assert(event3.Type, Equals, zk.EVENT_SESSION)
76 c.Assert(event3.State, Equals, gozk.STATE_CONNECTED)76 c.Assert(event3.State, Equals, zk.STATE_CONNECTED)
7777
78 c.Assert(gozk.CountPendingWatches(), Equals, 3)78 c.Assert(zk.CountPendingWatches(), Equals, 3)
7979
80 zk1.Close()80 zk1.Close()
81 c.Assert(gozk.CountPendingWatches(), Equals, 2)81 c.Assert(zk.CountPendingWatches(), Equals, 2)
82 zk2.Close()82 zk2.Close()
83 c.Assert(gozk.CountPendingWatches(), Equals, 1)83 c.Assert(zk.CountPendingWatches(), Equals, 1)
84 zk3.Close()84 zk3.Close()
85 c.Assert(gozk.CountPendingWatches(), Equals, 0)85 c.Assert(zk.CountPendingWatches(), Equals, 0)
86}86}
8787
88// Gozk injects a STATE_CLOSED event when zk.Close() is called, right88// Gozk injects a STATE_CLOSED event when conn.Close() is called, right
89// before the channel is closed. Closing the channel injects a nil89// before the channel is closed. Closing the channel injects a nil
90// pointer, as usual for Go, so the STATE_CLOSED gives a chance to90// pointer, as usual for Go, so the STATE_CLOSED gives a chance to
91// know that a nil pointer is coming, and to stop the procedure.91// know that a nil pointer is coming, and to stop the procedure.
92// Hopefully this procedure will avoid some nil-pointer references by92// Hopefully this procedure will avoid some nil-pointer references by
93// mistake.93// mistake.
94func (s *S) TestClosingStateInSessionWatch(c *C) {94func (s *S) TestClosingStateInSessionWatch(c *C) {
95 zk, watch := s.init(c)95 conn, watch := s.init(c)
9696
97 event := <-watch97 event := <-watch
98 c.Assert(event.Type, Equals, gozk.EVENT_SESSION)98 c.Assert(event.Type, Equals, zk.EVENT_SESSION)
99 c.Assert(event.State, Equals, gozk.STATE_CONNECTED)99 c.Assert(event.State, Equals, zk.STATE_CONNECTED)
100100
101 zk.Close()101 conn.Close()
102 event, ok := <-watch102 event, ok := <-watch
103 c.Assert(ok, Equals, false)103 c.Assert(ok, Equals, false)
104 c.Assert(event.Type, Equals, gozk.EVENT_CLOSED)104 c.Assert(event.Type, Equals, zk.EVENT_CLOSED)
105 c.Assert(event.State, Equals, gozk.STATE_CLOSED)105 c.Assert(event.State, Equals, zk.STATE_CLOSED)
106}106}
107107
108func (s *S) TestEventString(c *C) {108func (s *S) TestEventString(c *C) {
109 var event gozk.Event109 var event zk.Event
110 event = gozk.Event{gozk.EVENT_SESSION, "/path", gozk.STATE_CONNECTED}110 event = zk.Event{zk.EVENT_SESSION, "/path", zk.STATE_CONNECTED}
111 c.Assert(event, Matches, "ZooKeeper connected")111 c.Assert(event, Matches, "ZooKeeper connected")
112 event = gozk.Event{gozk.EVENT_CREATED, "/path", gozk.STATE_CONNECTED}112 event = zk.Event{zk.EVENT_CREATED, "/path", zk.STATE_CONNECTED}
113 c.Assert(event, Matches, "ZooKeeper connected; path created: /path")113 c.Assert(event, Matches, "ZooKeeper connected; path created: /path")
114 event = gozk.Event{-1, "/path", gozk.STATE_CLOSED}114 event = zk.Event{-1, "/path", zk.STATE_CLOSED}
115 c.Assert(event, Matches, "ZooKeeper connection closed")115 c.Assert(event, Matches, "ZooKeeper connection closed")
116}116}
117117
118var okTests = []struct{gozk.Event; Ok bool}{118var okTests = []struct {
119 {gozk.Event{gozk.EVENT_SESSION, "", gozk.STATE_CONNECTED}, true},119 zk.Event
120 {gozk.Event{gozk.EVENT_CREATED, "", gozk.STATE_CONNECTED}, true},120 Ok bool
121 {gozk.Event{0, "", gozk.STATE_CLOSED}, false},121}{
122 {gozk.Event{0, "", gozk.STATE_EXPIRED_SESSION}, false},122 {zk.Event{zk.EVENT_SESSION, "", zk.STATE_CONNECTED}, true},
123 {gozk.Event{0, "", gozk.STATE_AUTH_FAILED}, false},123 {zk.Event{zk.EVENT_CREATED, "", zk.STATE_CONNECTED}, true},
124 {zk.Event{0, "", zk.STATE_CLOSED}, false},
125 {zk.Event{0, "", zk.STATE_EXPIRED_SESSION}, false},
126 {zk.Event{0, "", zk.STATE_AUTH_FAILED}, false},
124}127}
125128
126func (s *S) TestEventOk(c *C) {129func (s *S) TestEventOk(c *C) {
@@ -130,9 +133,9 @@
130}133}
131134
132func (s *S) TestGetAndStat(c *C) {135func (s *S) TestGetAndStat(c *C) {
133 zk, _ := s.init(c)136 conn, _ := s.init(c)
134137
135 data, stat, err := zk.Get("/zookeeper")138 data, stat, err := conn.Get("/zookeeper")
136 c.Assert(err, IsNil)139 c.Assert(err, IsNil)
137 c.Assert(data, Equals, "")140 c.Assert(data, Equals, "")
138 c.Assert(stat.Czxid(), Equals, int64(0))141 c.Assert(stat.Czxid(), Equals, int64(0))
@@ -149,58 +152,58 @@
149}152}
150153
151func (s *S) TestGetAndError(c *C) {154func (s *S) TestGetAndError(c *C) {
152 zk, _ := s.init(c)155 conn, _ := s.init(c)
153156
154 data, stat, err := zk.Get("/non-existent")157 data, stat, err := conn.Get("/non-existent")
155158
156 c.Assert(data, Equals, "")159 c.Assert(data, Equals, "")
157 c.Assert(stat, IsNil)160 c.Assert(stat, IsNil)
158 c.Assert(err, Matches, "no node")161 c.Assert(err, Matches, "no node")
159 c.Assert(err.Code(), Equals, gozk.ZNONODE)162 c.Assert(err, Equals, zk.ZNONODE)
160}163}
161164
162func (s *S) TestCreateAndGet(c *C) {165func (s *S) TestCreateAndGet(c *C) {
163 zk, _ := s.init(c)166 conn, _ := s.init(c)
164167
165 path, err := zk.Create("/test-", "bababum", gozk.SEQUENCE|gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))168 path, err := conn.Create("/test-", "bababum", zk.SEQUENCE|zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
166 c.Assert(err, IsNil)169 c.Assert(err, IsNil)
167 c.Assert(path, Matches, "/test-[0-9]+")170 c.Assert(path, Matches, "/test-[0-9]+")
168171
169 // Check the error condition from Create().172 // Check the error condition from Create().
170 _, err = zk.Create(path, "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))173 _, err = conn.Create(path, "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
171 c.Assert(err, Matches, "node exists")174 c.Assert(err, Matches, "node exists")
172175
173 data, _, err := zk.Get(path)176 data, _, err := conn.Get(path)
174 c.Assert(err, IsNil)177 c.Assert(err, IsNil)
175 c.Assert(data, Equals, "bababum")178 c.Assert(data, Equals, "bababum")
176}179}
177180
178func (s *S) TestCreateSetAndGet(c *C) {181func (s *S) TestCreateSetAndGet(c *C) {
179 zk, _ := s.init(c)182 conn, _ := s.init(c)
180183
181 _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))184 _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
182 c.Assert(err, IsNil)185 c.Assert(err, IsNil)
183186
184 stat, err := zk.Set("/test", "bababum", -1) // Any version.187 stat, err := conn.Set("/test", "bababum", -1) // Any version.
185 c.Assert(err, IsNil)188 c.Assert(err, IsNil)
186 c.Assert(stat.Version(), Equals, int32(1))189 c.Assert(stat.Version(), Equals, int32(1))
187190
188 data, _, err := zk.Get("/test")191 data, _, err := conn.Get("/test")
189 c.Assert(err, IsNil)192 c.Assert(err, IsNil)
190 c.Assert(data, Equals, "bababum")193 c.Assert(data, Equals, "bababum")
191}194}
192195
193func (s *S) TestGetAndWatch(c *C) {196func (s *S) TestGetAndWatch(c *C) {
194 c.Check(gozk.CountPendingWatches(), Equals, 0)197 c.Check(zk.CountPendingWatches(), Equals, 0)
195198
196 zk, _ := s.init(c)199 conn, _ := s.init(c)
197200
198 c.Check(gozk.CountPendingWatches(), Equals, 1)201 c.Check(zk.CountPendingWatches(), Equals, 1)
199202
200 _, err := zk.Create("/test", "one", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))203 _, err := conn.Create("/test", "one", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
201 c.Assert(err, IsNil)204 c.Assert(err, IsNil)
202205
203 data, stat, watch, err := zk.GetW("/test")206 data, stat, watch, err := conn.GetW("/test")
204 c.Assert(err, IsNil)207 c.Assert(err, IsNil)
205 c.Assert(data, Equals, "one")208 c.Assert(data, Equals, "one")
206 c.Assert(stat.Version(), Equals, int32(0))209 c.Assert(stat.Version(), Equals, int32(0))
@@ -211,17 +214,17 @@
211 default:214 default:
212 }215 }
213216
214 c.Check(gozk.CountPendingWatches(), Equals, 2)217 c.Check(zk.CountPendingWatches(), Equals, 2)
215218
216 _, err = zk.Set("/test", "two", -1)219 _, err = conn.Set("/test", "two", -1)
217 c.Assert(err, IsNil)220 c.Assert(err, IsNil)
218221
219 event := <-watch222 event := <-watch
220 c.Assert(event.Type, Equals, gozk.EVENT_CHANGED)223 c.Assert(event.Type, Equals, zk.EVENT_CHANGED)
221224
222 c.Check(gozk.CountPendingWatches(), Equals, 1)225 c.Check(zk.CountPendingWatches(), Equals, 1)
223226
224 data, _, watch, err = zk.GetW("/test")227 data, _, watch, err = conn.GetW("/test")
225 c.Assert(err, IsNil)228 c.Assert(err, IsNil)
226 c.Assert(data, Equals, "two")229 c.Assert(data, Equals, "two")
227230
@@ -231,86 +234,86 @@
231 default:234 default:
232 }235 }
233236
234 c.Check(gozk.CountPendingWatches(), Equals, 2)237 c.Check(zk.CountPendingWatches(), Equals, 2)
235238
236 _, err = zk.Set("/test", "three", -1)239 _, err = conn.Set("/test", "three", -1)
237 c.Assert(err, IsNil)240 c.Assert(err, IsNil)
238241
239 event = <-watch242 event = <-watch
240 c.Assert(event.Type, Equals, gozk.EVENT_CHANGED)243 c.Assert(event.Type, Equals, zk.EVENT_CHANGED)
241244
242 c.Check(gozk.CountPendingWatches(), Equals, 1)245 c.Check(zk.CountPendingWatches(), Equals, 1)
243}246}
244247
245func (s *S) TestGetAndWatchWithError(c *C) {248func (s *S) TestGetAndWatchWithError(c *C) {
246 c.Check(gozk.CountPendingWatches(), Equals, 0)249 c.Check(zk.CountPendingWatches(), Equals, 0)
247250
248 zk, _ := s.init(c)251 conn, _ := s.init(c)
249252
250 c.Check(gozk.CountPendingWatches(), Equals, 1)253 c.Check(zk.CountPendingWatches(), Equals, 1)
251254
252 _, _, watch, err := zk.GetW("/test")255 _, _, watch, err := conn.GetW("/test")
253 c.Assert(err, NotNil)256 c.Assert(err, NotNil)
254 c.Assert(err.Code(), Equals, gozk.ZNONODE)257 c.Assert(err, Equals, zk.ZNONODE)
255 c.Assert(watch, IsNil)258 c.Assert(watch, IsNil)
256259
257 c.Check(gozk.CountPendingWatches(), Equals, 1)260 c.Check(zk.CountPendingWatches(), Equals, 1)
258}261}
259262
260func (s *S) TestCloseReleasesWatches(c *C) {263func (s *S) TestCloseReleasesWatches(c *C) {
261 c.Check(gozk.CountPendingWatches(), Equals, 0)264 c.Check(zk.CountPendingWatches(), Equals, 0)
262265
263 zk, _ := s.init(c)266 conn, _ := s.init(c)
264267
265 c.Check(gozk.CountPendingWatches(), Equals, 1)268 c.Check(zk.CountPendingWatches(), Equals, 1)
266269
267 _, err := zk.Create("/test", "one", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))270 _, err := conn.Create("/test", "one", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
268 c.Assert(err, IsNil)271 c.Assert(err, IsNil)
269272
270 _, _, _, err = zk.GetW("/test")273 _, _, _, err = conn.GetW("/test")
271 c.Assert(err, IsNil)274 c.Assert(err, IsNil)
272275
273 c.Assert(gozk.CountPendingWatches(), Equals, 2)276 c.Assert(zk.CountPendingWatches(), Equals, 2)
274277
275 zk.Close()278 conn.Close()
276279
277 c.Assert(gozk.CountPendingWatches(), Equals, 0)280 c.Assert(zk.CountPendingWatches(), Equals, 0)
278}281}
279282
280// By default, the ZooKeeper C client will hang indefinitely if a283// By default, the ZooKeeper C client will hang indefinitely if a
281// handler is closed twice. We get in the way and prevent it.284// handler is closed twice. We get in the way and prevent it.
282func (s *S) TestClosingTwiceDoesntHang(c *C) {285func (s *S) TestClosingTwiceDoesntHang(c *C) {
283 zk, _ := s.init(c)286 conn, _ := s.init(c)
284 err := zk.Close()287 err := conn.Close()
285 c.Assert(err, IsNil)288 c.Assert(err, IsNil)
286 err = zk.Close()289 err = conn.Close()
287 c.Assert(err, NotNil)290 c.Assert(err, NotNil)
288 c.Assert(err.Code(), Equals, gozk.ZCLOSING)291 c.Assert(err, Equals, zk.ZCLOSING)
289}292}
290293
291func (s *S) TestChildren(c *C) {294func (s *S) TestChildren(c *C) {
292 zk, _ := s.init(c)295 conn, _ := s.init(c)
293296
294 children, stat, err := zk.Children("/")297 children, stat, err := conn.Children("/")
295 c.Assert(err, IsNil)298 c.Assert(err, IsNil)
296 c.Assert(children, Equals, []string{"zookeeper"})299 c.Assert(children, Equals, []string{"zookeeper"})
297 c.Assert(stat.NumChildren(), Equals, int32(1))300 c.Assert(stat.NumChildren(), Equals, int32(1))
298301
299 children, stat, err = zk.Children("/non-existent")302 children, stat, err = conn.Children("/non-existent")
300 c.Assert(err, NotNil)303 c.Assert(err, NotNil)
301 c.Assert(err.Code(), Equals, gozk.ZNONODE)304 c.Assert(err, Equals, zk.ZNONODE)
302 c.Assert(children, Equals, []string{})305 c.Assert(children, Equals, []string{})
303 c.Assert(stat, Equals, nil)306 c.Assert(stat, IsNil)
304}307}
305308
306func (s *S) TestChildrenAndWatch(c *C) {309func (s *S) TestChildrenAndWatch(c *C) {
307 c.Check(gozk.CountPendingWatches(), Equals, 0)310 c.Check(zk.CountPendingWatches(), Equals, 0)
308311
309 zk, _ := s.init(c)312 conn, _ := s.init(c)
310313
311 c.Check(gozk.CountPendingWatches(), Equals, 1)314 c.Check(zk.CountPendingWatches(), Equals, 1)
312315
313 children, stat, watch, err := zk.ChildrenW("/")316 children, stat, watch, err := conn.ChildrenW("/")
314 c.Assert(err, IsNil)317 c.Assert(err, IsNil)
315 c.Assert(children, Equals, []string{"zookeeper"})318 c.Assert(children, Equals, []string{"zookeeper"})
316 c.Assert(stat.NumChildren(), Equals, int32(1))319 c.Assert(stat.NumChildren(), Equals, int32(1))
@@ -321,18 +324,18 @@
321 default:324 default:
322 }325 }
323326
324 c.Check(gozk.CountPendingWatches(), Equals, 2)327 c.Check(zk.CountPendingWatches(), Equals, 2)
325328
326 _, err = zk.Create("/test1", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))329 _, err = conn.Create("/test1", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
327 c.Assert(err, IsNil)330 c.Assert(err, IsNil)
328331
329 event := <-watch332 event := <-watch
330 c.Assert(event.Type, Equals, gozk.EVENT_CHILD)333 c.Assert(event.Type, Equals, zk.EVENT_CHILD)
331 c.Assert(event.Path, Equals, "/")334 c.Assert(event.Path, Equals, "/")
332335
333 c.Check(gozk.CountPendingWatches(), Equals, 1)336 c.Check(zk.CountPendingWatches(), Equals, 1)
334337
335 children, stat, watch, err = zk.ChildrenW("/")338 children, stat, watch, err = conn.ChildrenW("/")
336 c.Assert(err, IsNil)339 c.Assert(err, IsNil)
337 c.Assert(stat.NumChildren(), Equals, int32(2))340 c.Assert(stat.NumChildren(), Equals, int32(2))
338341
@@ -345,57 +348,56 @@
345 default:348 default:
346 }349 }
347350
348 c.Check(gozk.CountPendingWatches(), Equals, 2)351 c.Check(zk.CountPendingWatches(), Equals, 2)
349352
350 _, err = zk.Create("/test2", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))353 _, err = conn.Create("/test2", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
351 c.Assert(err, IsNil)354 c.Assert(err, IsNil)
352355
353 event = <-watch356 event = <-watch
354 c.Assert(event.Type, Equals, gozk.EVENT_CHILD)357 c.Assert(event.Type, Equals, zk.EVENT_CHILD)
355358
356 c.Check(gozk.CountPendingWatches(), Equals, 1)359 c.Check(zk.CountPendingWatches(), Equals, 1)
357}360}
358361
359func (s *S) TestChildrenAndWatchWithError(c *C) {362func (s *S) TestChildrenAndWatchWithError(c *C) {
360 c.Check(gozk.CountPendingWatches(), Equals, 0)363 c.Check(zk.CountPendingWatches(), Equals, 0)
361364
362 zk, _ := s.init(c)365 conn, _ := s.init(c)
363366
364 c.Check(gozk.CountPendingWatches(), Equals, 1)367 c.Check(zk.CountPendingWatches(), Equals, 1)
365368
366 _, stat, watch, err := zk.ChildrenW("/test")369 _, stat, watch, err := conn.ChildrenW("/test")
367 c.Assert(err, NotNil)370 c.Assert(err, NotNil)
368 c.Assert(err.Code(), Equals, gozk.ZNONODE)371 c.Assert(err, Equals, zk.ZNONODE)
369 c.Assert(watch, IsNil)372 c.Assert(watch, IsNil)
370 c.Assert(stat, IsNil)373 c.Assert(stat, IsNil)
371374
372 c.Check(gozk.CountPendingWatches(), Equals, 1)375 c.Check(zk.CountPendingWatches(), Equals, 1)
373}376}
374377
375func (s *S) TestExists(c *C) {378func (s *S) TestExists(c *C) {
376 zk, _ := s.init(c)379 conn, _ := s.init(c)
377380
378 stat, err := zk.Exists("/zookeeper")381 stat, err := conn.Exists("/non-existent")
379 c.Assert(err, IsNil)
380 c.Assert(stat.NumChildren(), Equals, int32(1))
381
382 stat, err = zk.Exists("/non-existent")
383 c.Assert(err, IsNil)382 c.Assert(err, IsNil)
384 c.Assert(stat, IsNil)383 c.Assert(stat, IsNil)
384
385 stat, err = conn.Exists("/zookeeper")
386 c.Assert(err, IsNil)
385}387}
386388
387func (s *S) TestExistsAndWatch(c *C) {389func (s *S) TestExistsAndWatch(c *C) {
388 c.Check(gozk.CountPendingWatches(), Equals, 0)390 c.Check(zk.CountPendingWatches(), Equals, 0)
389391
390 zk, _ := s.init(c)392 conn, _ := s.init(c)
391393
392 c.Check(gozk.CountPendingWatches(), Equals, 1)394 c.Check(zk.CountPendingWatches(), Equals, 1)
393395
394 stat, watch, err := zk.ExistsW("/test")396 stat, watch, err := conn.ExistsW("/test")
395 c.Assert(err, IsNil)397 c.Assert(err, IsNil)
396 c.Assert(stat, IsNil)398 c.Assert(stat, IsNil)
397399
398 c.Check(gozk.CountPendingWatches(), Equals, 2)400 c.Check(zk.CountPendingWatches(), Equals, 2)
399401
400 select {402 select {
401 case <-watch:403 case <-watch:
@@ -403,62 +405,62 @@
403 default:405 default:
404 }406 }
405407
406 _, err = zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))408 _, err = conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
407 c.Assert(err, IsNil)409 c.Assert(err, IsNil)
408410
409 event := <-watch411 event := <-watch
410 c.Assert(event.Type, Equals, gozk.EVENT_CREATED)412 c.Assert(event.Type, Equals, zk.EVENT_CREATED)
411 c.Assert(event.Path, Equals, "/test")413 c.Assert(event.Path, Equals, "/test")
412414
413 c.Check(gozk.CountPendingWatches(), Equals, 1)415 c.Check(zk.CountPendingWatches(), Equals, 1)
414416
415 stat, watch, err = zk.ExistsW("/test")417 stat, watch, err = conn.ExistsW("/test")
416 c.Assert(err, IsNil)418 c.Assert(err, IsNil)
417 c.Assert(stat, NotNil)419 c.Assert(stat, NotNil)
418 c.Assert(stat.NumChildren(), Equals, int32(0))420 c.Assert(stat.NumChildren(), Equals, int32(0))
419421
420 c.Check(gozk.CountPendingWatches(), Equals, 2)422 c.Check(zk.CountPendingWatches(), Equals, 2)
421}423}
422424
423func (s *S) TestExistsAndWatchWithError(c *C) {425func (s *S) TestExistsAndWatchWithError(c *C) {
424 c.Check(gozk.CountPendingWatches(), Equals, 0)426 c.Check(zk.CountPendingWatches(), Equals, 0)
425427
426 zk, _ := s.init(c)428 conn, _ := s.init(c)
427429
428 c.Check(gozk.CountPendingWatches(), Equals, 1)430 c.Check(zk.CountPendingWatches(), Equals, 1)
429431
430 stat, watch, err := zk.ExistsW("///")432 stat, watch, err := conn.ExistsW("///")
431 c.Assert(err, NotNil)433 c.Assert(err, NotNil)
432 c.Assert(err.Code(), Equals, gozk.ZBADARGUMENTS)434 c.Assert(err, Equals, zk.ZBADARGUMENTS)
433 c.Assert(stat, IsNil)435 c.Assert(stat, IsNil)
434 c.Assert(watch, IsNil)436 c.Assert(watch, IsNil)
435437
436 c.Check(gozk.CountPendingWatches(), Equals, 1)438 c.Check(zk.CountPendingWatches(), Equals, 1)
437}439}
438440
439func (s *S) TestDelete(c *C) {441func (s *S) TestDelete(c *C) {
440 zk, _ := s.init(c)442 conn, _ := s.init(c)
441443
442 _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))444 _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
443 c.Assert(err, IsNil)445 c.Assert(err, IsNil)
444446
445 err = zk.Delete("/test", 5)447 err = conn.Delete("/test", 5)
446 c.Assert(err, NotNil)448 c.Assert(err, NotNil)
447 c.Assert(err.Code(), Equals, gozk.ZBADVERSION)449 c.Assert(err, Equals, zk.ZBADVERSION)
448450
449 err = zk.Delete("/test", -1)451 err = conn.Delete("/test", -1)
450 c.Assert(err, IsNil)452 c.Assert(err, IsNil)
451453
452 err = zk.Delete("/test", -1)454 err = conn.Delete("/test", -1)
453 c.Assert(err, NotNil)455 c.Assert(err, NotNil)
454 c.Assert(err.Code(), Equals, gozk.ZNONODE)456 c.Assert(err, Equals, zk.ZNONODE)
455}457}
456458
457func (s *S) TestClientIdAndReInit(c *C) {459func (s *S) TestClientIdAndReInit(c *C) {
458 zk1, _ := s.init(c)460 zk1, _ := s.init(c)
459 clientId1 := zk1.ClientId()461 clientId1 := zk1.ClientId()
460462
461 zk2, _, err := gozk.ReInit(s.zkAddr, 5e9, clientId1)463 zk2, _, err := zk.Redial(s.zkAddr, 5e9, clientId1)
462 c.Assert(err, IsNil)464 c.Assert(err, IsNil)
463 defer zk2.Close()465 defer zk2.Close()
464 clientId2 := zk2.ClientId()466 clientId2 := zk2.ClientId()
@@ -469,110 +471,110 @@
469// Surprisingly for some (including myself, initially), the watch471// Surprisingly for some (including myself, initially), the watch
470// returned by the exists method actually fires on data changes too.472// returned by the exists method actually fires on data changes too.
471func (s *S) TestExistsWatchOnDataChange(c *C) {473func (s *S) TestExistsWatchOnDataChange(c *C) {
472 zk, _ := s.init(c)474 conn, _ := s.init(c)
473475
474 _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))476 _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
475 c.Assert(err, IsNil)477 c.Assert(err, IsNil)
476478
477 _, watch, err := zk.ExistsW("/test")479 _, watch, err := conn.ExistsW("/test")
478 c.Assert(err, IsNil)480 c.Assert(err, IsNil)
479481
480 _, err = zk.Set("/test", "new", -1)482 _, err = conn.Set("/test", "new", -1)
481 c.Assert(err, IsNil)483 c.Assert(err, IsNil)
482484
483 event := <-watch485 event := <-watch
484486
485 c.Assert(event.Path, Equals, "/test")487 c.Assert(event.Path, Equals, "/test")
486 c.Assert(event.Type, Equals, gozk.EVENT_CHANGED)488 c.Assert(event.Type, Equals, zk.EVENT_CHANGED)
487}489}
488490
489func (s *S) TestACL(c *C) {491func (s *S) TestACL(c *C) {
490 zk, _ := s.init(c)492 conn, _ := s.init(c)
491493
492 _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))494 _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
493 c.Assert(err, IsNil)495 c.Assert(err, IsNil)
494496
495 acl, stat, err := zk.ACL("/test")497 acl, stat, err := conn.ACL("/test")
496 c.Assert(err, IsNil)498 c.Assert(err, IsNil)
497 c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_ALL))499 c.Assert(acl, Equals, zk.WorldACL(zk.PERM_ALL))
498 c.Assert(stat, NotNil)500 c.Assert(stat, NotNil)
499 c.Assert(stat.Version(), Equals, int32(0))501 c.Assert(stat.Version(), Equals, int32(0))
500502
501 acl, stat, err = zk.ACL("/non-existent")503 acl, stat, err = conn.ACL("/non-existent")
502 c.Assert(err, NotNil)504 c.Assert(err, NotNil)
503 c.Assert(err.Code(), Equals, gozk.ZNONODE)505 c.Assert(err, Equals, zk.ZNONODE)
504 c.Assert(acl, IsNil)506 c.Assert(acl, IsNil)
505 c.Assert(stat, IsNil)507 c.Assert(stat, IsNil)
506}508}
507509
508func (s *S) TestSetACL(c *C) {510func (s *S) TestSetACL(c *C) {
509 zk, _ := s.init(c)511 conn, _ := s.init(c)
510512
511 _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))513 _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
512 c.Assert(err, IsNil)514 c.Assert(err, IsNil)
513515
514 err = zk.SetACL("/test", gozk.WorldACL(gozk.PERM_ALL), 5)516 err = conn.SetACL("/test", zk.WorldACL(zk.PERM_ALL), 5)
515 c.Assert(err, NotNil)517 c.Assert(err, NotNil)
516 c.Assert(err.Code(), Equals, gozk.ZBADVERSION)518 c.Assert(err, Equals, zk.ZBADVERSION)
517519
518 err = zk.SetACL("/test", gozk.WorldACL(gozk.PERM_READ), -1)520 err = conn.SetACL("/test", zk.WorldACL(zk.PERM_READ), -1)
519 c.Assert(err, IsNil)521 c.Assert(err, IsNil)
520522
521 acl, _, err := zk.ACL("/test")523 acl, _, err := conn.ACL("/test")
522 c.Assert(err, IsNil)524 c.Assert(err, IsNil)
523 c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_READ))525 c.Assert(acl, Equals, zk.WorldACL(zk.PERM_READ))
524}526}
525527
526func (s *S) TestAddAuth(c *C) {528func (s *S) TestAddAuth(c *C) {
527 zk, _ := s.init(c)529 conn, _ := s.init(c)
528530
529 acl := []gozk.ACL{{gozk.PERM_READ, "digest", "joe:enQcM3mIEHQx7IrPNStYBc0qfs8="}}531 acl := []zk.ACL{{zk.PERM_READ, "digest", "joe:enQcM3mIEHQx7IrPNStYBc0qfs8="}}
530532
531 _, err := zk.Create("/test", "", gozk.EPHEMERAL, acl)533 _, err := conn.Create("/test", "", zk.EPHEMERAL, acl)
532 c.Assert(err, IsNil)534 c.Assert(err, IsNil)
533535
534 _, _, err = zk.Get("/test")536 _, _, err = conn.Get("/test")
535 c.Assert(err, NotNil)537 c.Assert(err, NotNil)
536 c.Assert(err.Code(), Equals, gozk.ZNOAUTH)538 c.Assert(err, Equals, zk.ZNOAUTH)
537539
538 err = zk.AddAuth("digest", "joe:passwd")540 err = conn.AddAuth("digest", "joe:passwd")
539 c.Assert(err, IsNil)541 c.Assert(err, IsNil)
540542
541 _, _, err = zk.Get("/test")543 _, _, err = conn.Get("/test")
542 c.Assert(err, IsNil)544 c.Assert(err, IsNil)
543}545}
544546
545func (s *S) TestWatchOnReconnection(c *C) {547func (s *S) TestWatchOnReconnection(c *C) {
546 c.Check(gozk.CountPendingWatches(), Equals, 0)548 c.Check(zk.CountPendingWatches(), Equals, 0)
547549
548 zk, session := s.init(c)550 conn, session := s.init(c)
549551
550 event := <-session552 event := <-session
551 c.Assert(event.Type, Equals, gozk.EVENT_SESSION)553 c.Assert(event.Type, Equals, zk.EVENT_SESSION)
552 c.Assert(event.State, Equals, gozk.STATE_CONNECTED)554 c.Assert(event.State, Equals, zk.STATE_CONNECTED)
553555
554 c.Check(gozk.CountPendingWatches(), Equals, 1)556 c.Check(zk.CountPendingWatches(), Equals, 1)
555557
556 stat, watch, err := zk.ExistsW("/test")558 stat, watch, err := conn.ExistsW("/test")
557 c.Assert(err, IsNil)559 c.Assert(err, IsNil)
558 c.Assert(stat, IsNil)560 c.Assert(stat, IsNil)
559561
560 c.Check(gozk.CountPendingWatches(), Equals, 2)562 c.Check(zk.CountPendingWatches(), Equals, 2)
561563
562 s.StopZK()564 s.zkServer.Stop()
563 time.Sleep(2e9)565 time.Sleep(2e9)
564 s.StartZK()566 s.zkServer.Start()
565567
566 // The session channel should receive the reconnection notification,568 // The session channel should receive the reconnection notification.
567 select {569 select {
568 case event := <-session:570 case event := <-session:
569 c.Assert(event.State, Equals, gozk.STATE_CONNECTING)571 c.Assert(event.State, Equals, zk.STATE_CONNECTING)
570 case <-time.After(3e9):572 case <-time.After(3e9):
571 c.Fatal("Session watch didn't fire")573 c.Fatal("Session watch didn't fire")
572 }574 }
573 select {575 select {
574 case event := <-session:576 case event := <-session:
575 c.Assert(event.State, Equals, gozk.STATE_CONNECTED)577 c.Assert(event.State, Equals, zk.STATE_CONNECTED)
576 case <-time.After(3e9):578 case <-time.After(3e9):
577 c.Fatal("Session watch didn't fire")579 c.Fatal("Session watch didn't fire")
578 }580 }
@@ -585,40 +587,40 @@
585 }587 }
586588
587 // And it should still work.589 // And it should still work.
588 _, err = zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))590 _, err = conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
589 c.Assert(err, IsNil)591 c.Assert(err, IsNil)
590592
591 event = <-watch593 event = <-watch
592 c.Assert(event.Type, Equals, gozk.EVENT_CREATED)594 c.Assert(event.Type, Equals, zk.EVENT_CREATED)
593 c.Assert(event.Path, Equals, "/test")595 c.Assert(event.Path, Equals, "/test")
594596
595 c.Check(gozk.CountPendingWatches(), Equals, 1)597 c.Check(zk.CountPendingWatches(), Equals, 1)
596}598}
597599
598func (s *S) TestWatchOnSessionExpiration(c *C) {600func (s *S) TestWatchOnSessionExpiration(c *C) {
599 c.Check(gozk.CountPendingWatches(), Equals, 0)601 c.Check(zk.CountPendingWatches(), Equals, 0)
600602
601 zk, session := s.init(c)603 conn, session := s.init(c)
602604
603 event := <-session605 event := <-session
604 c.Assert(event.Type, Equals, gozk.EVENT_SESSION)606 c.Assert(event.Type, Equals, zk.EVENT_SESSION)
605 c.Assert(event.State, Equals, gozk.STATE_CONNECTED)607 c.Assert(event.State, Equals, zk.STATE_CONNECTED)
606608
607 c.Check(gozk.CountPendingWatches(), Equals, 1)609 c.Check(zk.CountPendingWatches(), Equals, 1)
608610
609 stat, watch, err := zk.ExistsW("/test")611 stat, watch, err := conn.ExistsW("/test")
610 c.Assert(err, IsNil)612 c.Assert(err, IsNil)
611 c.Assert(stat, IsNil)613 c.Assert(stat, IsNil)
612614
613 c.Check(gozk.CountPendingWatches(), Equals, 2)615 c.Check(zk.CountPendingWatches(), Equals, 2)
614616
615 // Use expiration trick described in the FAQ.617 // Use expiration trick described in the FAQ.
616 clientId := zk.ClientId()618 clientId := conn.ClientId()
617 zk2, session2, err := gozk.ReInit(s.zkAddr, 5e9, clientId)619 zk2, session2, err := zk.Redial(s.zkAddr, 5e9, clientId)
618620
619 for event := range session2 {621 for event := range session2 {
620 c.Log("Event from overlapping session: ", event)622 c.Log("Event from overlapping session: ", event)
621 if event.State == gozk.STATE_CONNECTED {623 if event.State == zk.STATE_CONNECTED {
622 // Wait for zk to process the connection.624 // Wait for zk to process the connection.
623 // Not reliable without this. :-(625 // Not reliable without this. :-(
624 time.Sleep(1e9)626 time.Sleep(1e9)
@@ -627,21 +629,21 @@
627 }629 }
628 for event := range session {630 for event := range session {
629 c.Log("Event from primary session: ", event)631 c.Log("Event from primary session: ", event)
630 if event.State == gozk.STATE_EXPIRED_SESSION {632 if event.State == zk.STATE_EXPIRED_SESSION {
631 break633 break
632 }634 }
633 }635 }
634636
635 select {637 select {
636 case event := <-watch:638 case event := <-watch:
637 c.Assert(event.State, Equals, gozk.STATE_EXPIRED_SESSION)639 c.Assert(event.State, Equals, zk.STATE_EXPIRED_SESSION)
638 case <-time.After(3e9):640 case <-time.After(3e9):
639 c.Fatal("Watch event didn't fire")641 c.Fatal("Watch event didn't fire")
640 }642 }
641643
642 event = <-watch644 event = <-watch
643 c.Assert(event.Type, Equals, gozk.EVENT_CLOSED)645 c.Assert(event.Type, Equals, zk.EVENT_CLOSED)
644 c.Assert(event.State, Equals, gozk.STATE_CLOSED)646 c.Assert(event.State, Equals, zk.STATE_CLOSED)
645647
646 c.Check(gozk.CountPendingWatches(), Equals, 1)648 c.Check(zk.CountPendingWatches(), Equals, 1)
647}649}

Subscribers

People subscribed via source and target branches

to all changes: