Merge lp:~rogpeppe/gozk/update-server-interface into lp:~juju/gozk/trunk

Proposed by Roger Peppe
Status: Merged
Merge reported by: Gustavo Niemeyer
Merged at revision: not available
Proposed branch: lp:~rogpeppe/gozk/update-server-interface
Merge into: lp:~juju/gozk/trunk
Diff against target: 3053 lines (+1235/-782)
9 files modified
Makefile (+6/-2)
example/example.go (+22/-23)
reattach_test.go (+198/-0)
retry_test.go (+65/-74)
runserver.go (+168/-0)
server.go (+229/-0)
suite_test.go (+42/-121)
zk.go (+252/-311)
zk_test.go (+253/-251)
To merge this branch: bzr merge lp:~rogpeppe/gozk/update-server-interface
Reviewer Review Type Date Requested Status
Gustavo Niemeyer Needs Fixing
Review via email: mp+77009@code.launchpad.net

Description of the change

Updated server API.

To post a comment you must log in.
24. By Roger Peppe

Fix zookeeper spelling

Revision history for this message
Gustavo Niemeyer (niemeyer) wrote :

Great stuff here Rog, thanks!

Some comments:

[1]

-// The ZooKepeer installation directory is specified by installedDir.
+// The ZooKeeper installation directory is specified by installDir.

installDir feels like a strange name in this context. I agree installedDir
isn't great either, but the new one gives a wrong impression about what
it means.. we're not installing anything there.

Let's use something like zkDir instead.

[2]

+func CreateServer(port int, runDir, installDir string) (*Server, os.Error) {
(...)
+func AttachServer(runDir string) (*Server, os.Error) {

The organization inside these is looking very nice.

[3]

+func (srv *Server) getServerProcess() (*os.Process, os.Error) {
(...)
+ return os.FindProcess(pid)

FindProcess does nothing on Unix. We can run at least a Signal 0 against
the pid to tell if it exists or not.

[4]

+ if p != nil {
+ p.Release()
+ }
+ return fmt.Errorf("ZooKeeper server may already be running (remove %q to clear)", srv.path("pid.txt"))

The recommendation here is a bit strange. If the process is actually running,
removing the pid file will allow a second server to be started in parallel,
which is not nice.

I suggest just saying "ZooKeeper server seems to be running already", which
is a bit more true given that we'll be checking the pid as per the point above.
Then, the developer can run Stop() on it if he feels like stopping it, or
do whatever else out of band if he doesn't want to use the interface, but
then it's his responsibility to check for background processes, etc.

[5]

+// Stop kills the ZooKeeper server. It is a no-op if it is already running.

The second part is reversed, and reads a bit hard.

Suggestion: It does nothing if not running.

Also, please mention that the data is preserved, and that the server can
be restarted. "kill" may feel like otherwise.

[6]

+// Destroy stops the ZooKeeper server, and then removes its run
+// directory and all its contents.

s/and all its contents././

Removing the directory means removing its contents necessarily.

[7]

+func (srv *Server) writeInstallDir() os.Error {
+ return ioutil.WriteFile(srv.path("installdir.txt"), []byte(srv.installDir+"\n"), 0666)

These functions and filename should be renamed in sync to the first point.

[8]

+ if data[len(data)-1] == '\n' {
+ data = data[0 : len(data)-1]
+ }
+ srv.installDir = string(data)

srv.zkDir = string(bytes.TrimSpace(data))

[9]

+ s.zkTestRoot = c.MkDir() + "/zk"
+ port := 21812
+ s.zkAddr = fmt.Sprint("localhost:", port)
+
+ s.zkServer, err = zk.CreateServer(port, s.zkTestRoot, "")
  if err != nil {
   c.Fatal("Cannot set up server environment: ", err)
  }
- s.StartZK(c)
+ err = s.zkServer.Start()
+ if err != nil {
+ c.Fatal("Cannot start ZooKeeper server: ", err)
+ }

This is awesome! Thanks!

review: Approve
Revision history for this message
Gustavo Niemeyer (niemeyer) wrote :

[10]

+// exist, it assumes the server is not running and returns (nil, nil).

This interface is strange, and can easily lead to bugs since it's not an idiom in Go.
I'd never assume p is nil if err == nil. Please return an error if p == nil.

Revision history for this message
Roger Peppe (rogpeppe) wrote :

On 5 October 2011 15:34, Gustavo Niemeyer <email address hidden> wrote:
> Review: Approve
>
> Great stuff here Rog, thanks!
>
> Some comments:
>
>
> [1]
>
> -// The ZooKepeer installation directory is specified by installedDir.
> +// The ZooKeeper installation directory is specified by installDir.
>
> installDir feels like a strange name in this context. I agree installedDir
> isn't great either, but the new one gives a wrong impression about what
> it means.. we're not installing anything there.
>
> Let's use something like zkDir instead.

done.

> +func (srv *Server) getServerProcess() (*os.Process, os.Error) {
> (...)
> +       return os.FindProcess(pid)
>
> FindProcess does nothing on Unix.  We can run at least a Signal 0 against
> the pid to tell if it exists or not.

ok, as discussed i'll just not bother about the race.

> [5]
>
> +// Stop kills the ZooKeeper server. It is a no-op if it is already running.
>
> The second part is reversed, and reads a bit hard.
>
> Suggestion: It does nothing if not running.

better. my text was just wrong. done.
>
> Also, please mention that the data is preserved, and that the server can
> be restarted. "kill" may feel like otherwise.

done.

>
>
> [6]
>
> +// Destroy stops the ZooKeeper server, and then removes its run
> +// directory and all its contents.
>
> s/and all its contents././
>
> Removing the directory means removing its contents necessarily.

done.

> [7]
>
> +func (srv *Server) writeInstallDir() os.Error {
> +       return ioutil.WriteFile(srv.path("installdir.txt"), []byte(srv.installDir+"\n"), 0666)
>
> These functions and filename should be renamed in sync to the first point.

i'm not sure what you mean here.

> +       if data[len(data)-1] == '\n' {
> +               data = data[0 : len(data)-1]
> +       }
> +       srv.installDir = string(data)
>
> srv.zkDir = string(bytes.TrimSpace(data))

really? this code will have written the data inside the file, so there
should be no
extra space added. it's legal (ok it's pretty unusual) for a file name to
end in a space, and calling TrimSpace would break that.

25. By Roger Peppe

Added reattach_test.go to test server reattaching.
Service-related methods now get their own file.

Revision history for this message
Roger Peppe (rogpeppe) wrote :

On 5 October 2011 15:49, Gustavo Niemeyer <email address hidden> wrote:
> [10]
>
>
> +// exist, it assumes the server is not running and returns (nil, nil).
>
> This interface is strange, and can easily lead to bugs since it's not an idiom in Go.
> I'd never assume p is nil if err == nil. Please return an error if p == nil.

i've changed this, but for reference, i've just noticed a similar example
in your code, in Exists:

 // We diverge a bit from the usual here: a ZNONODE is not an error
 // for an exists call, otherwise every Exists call would have to check
 // for err != nil and err.Code() != ZNONODE.

it's for a very similar reason that i returned a nil process and nil error from
Process when there was no process to be found.

Revision history for this message
Gustavo Niemeyer (niemeyer) wrote :
Download full text (3.3 KiB)

Good changes overall, thanks. Some follow ups:

[7]

You've renamed installDir to zkDir. Let's rename functions and filenames
to match that as well (zkdir.txt, writeZkDir, etc)

[8]

Ok, but this logic is wrong because it doesn't check the length and could
panic.

Alright, so instead of doing either of those things, please just don't append
a '\n' at the end, so we can just string(data) straight. This is already
being done for pid.txt, and sounds like a sane approach.

[10]

Indeed, but that feels like a slightly different case because the method
is already a question (does it exist?), so the fact it doesn't exist
is an answer to that question rather than an error.

If the method was named something like Stat, it should definitely be an
error rather than simply an empty stat.

[11]

+// ServerCommand returns the command used to start the
+// ZooKeeper server. It is provided for debugging and testing
+// purposes only.
+func (srv *Server) command() ([]string, os.Error) {

s/ServerCommand/command/
s/It is provided.*//

[12]

+// NetworkPort returns the TCP port number that
+// the server is configured for.

s/NetworkPort/networkPort/

[13]

+// This file defines methods on Server that deal with starting
+// and stopping the ZooKeeper service. They are independent of ZooKeeper
+// itself, and may be factored out at a later date.

Please merge this on server.go. As pointed out in the other review and obvious
through out the code, this is really specific to ZooKeeper. If/when we do
refactor this out to generalize, we'll have to reorganize anyway.

[14]

+var ErrNotRunning = os.NewError("process not running")

I'm not a big fan of the Err* convention. Makes for an ugly name with typing
information embedded on it. Some packages do use it, but thankfully it's
also not strictly followed in the Go standard library either.

I'd appreciate renaming this to NotRunning instead. zk.NotRunning seems
pretty good.

[15]

+// Process returns a Process referring to the running server from
+// where it's been stored in pid.txt. If the file does not
+// exist, or it cannot find the process, it returns the error
+// ErrNotRunning.

Where the pid is stored is an implementation detail that can
change. We don't have to document that for now.

[16]

+// getProcess gets a Process from a pid and check that the

s/check/checks/

[17]

+ if e, ok := err.(*os.SyscallError); ok && e.Errno == os.ECHILD || err == os.ECHILD {

Ugh. I hope we get that cleaned in Go itself with the suggested changes for 1.

[18]

+ // is to poll until it exits. If the process has taken longer than
+ // a second to exit, then it's probably not going to.

This is very optimistic. A process can easily take more than a second to exit. If
this is an edge case and there's a reason to wait at all (dir content, etc), then
let's wait a bit longer, like 10 seconds for instance.

[19]

+// cases to test:
+// child server, stopped normally; reattach, start
+// non-direct child server, killed abnormally; reattach, start (->error), remove pid.txt; start
+// non-direct child server, still running; reattach, start (->error), stop, start
+// child server, still running; reattach,...

Read more...

review: Needs Fixing
Revision history for this message
Roger Peppe (rogpeppe) wrote :
Download full text (3.3 KiB)

PTAL

On 7 October 2011 16:58, Gustavo Niemeyer <email address hidden> wrote:
> [7]
>
> You've renamed installDir to zkDir. Let's rename functions and filenames
> to match that as well (zkdir.txt, writeZkDir, etc)

done

>
> [8]
>
> Ok, but this logic is wrong because it doesn't check the length and could
> panic.
>
> Alright, so instead of doing either of those things, please just don't append
> a '\n' at the end, so we can just string(data) straight. This is already
> being done for pid.txt, and sounds like a sane approach.

done

> [11]
>
> +// ServerCommand returns the command used to start the
> +// ZooKeeper server. It is provided for debugging and testing
> +// purposes only.
> +func (srv *Server) command() ([]string, os.Error) {
>
> s/ServerCommand/command/
> s/It is provided.*//

given that it's not external any more, that "debugging and testing"
remark isn't necessary now.

> [12]
>
> +// NetworkPort returns the TCP port number that
> +// the server is configured for.
>
> s/NetworkPort/networkPort/
>
> [13]
>
> +// This file defines methods on Server that deal with starting
> +// and stopping the ZooKeeper service. They are independent of ZooKeeper
> +// itself, and may be factored out at a later date.
>
> Please merge this on server.go. As pointed out in the other review and obvious
> through out the code, this is really specific to ZooKeeper. If/when we do
> refactor this out to generalize, we'll have to reorganize anyway.

i can do this, but given that the dependencies of the two files
are mostly different (only fmt, ioutil and os in common), and there's
very little interlink between the two, i think it makes sense to keep
them as two different files - i think it makes the code easier to read.
perhaps "runserver.go" might be a better name than "service.go" though.
i've done that for the time being. that said, it's no big deal.

> [14]
>
> +var ErrNotRunning = os.NewError("process not running")
>
> I'm not a big fan of the Err* convention. Makes for an ugly name with typing
> information embedded on it. Some packages do use it, but thankfully it's
> also not strictly followed in the Go standard library either.
>
> I'd appreciate renaming this to NotRunning instead. zk.NotRunning seems
> pretty good.

ok.

>
> [15]
>
> +// Process returns a Process referring to the running server from
> +// where it's been stored in pid.txt. If the file does not
> +// exist, or it cannot find the process, it returns the error
> +// ErrNotRunning.
>
> Where the pid is stored is an implementation detail that can
> change. We don't have to document that for now.
>
> [16]
>
> +// getProcess gets a Process from a pid and check that the
>
> s/check/checks/

done

> [18]
>
> +               // is to poll until it exits. If the process has taken longer than
> +               // a second to exit, then it's probably not going to.
>
> This is very optimistic. A process can easily take more than a second to exit. If
> this is an edge case and there's a reason to wait at all (dir content, etc), then
> let's wait a bit longer, like 10 seconds for instance.

ok. given that we're killing the server with SIGKILL, it's not going to
have any chance to take ages to c...

Read more...

26. By Roger Peppe

Fixes as per review.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'Makefile'
2--- Makefile 2011-08-03 01:56:58 +0000
3+++ Makefile 2011-10-10 13:55:28 +0000
4@@ -2,10 +2,14 @@
5
6 all: package
7
8-TARG=gozk
9+TARG=launchpad.net/gozk/zk
10
11+GOFILES=\
12+ server.go\
13+ runserver.go\
14+
15 CGOFILES=\
16- gozk.go\
17+ zk.go\
18
19 CGO_OFILES=\
20 helpers.o\
21
22=== added directory 'example'
23=== renamed file 'example.go' => 'example/example.go'
24--- example.go 2011-08-03 01:47:25 +0000
25+++ example/example.go 2011-10-10 13:55:28 +0000
26@@ -1,30 +1,29 @@
27 package main
28
29 import (
30- "gozk"
31+ "launchpad.net/zookeeper/zookeeper"
32 )
33
34 func main() {
35- zk, session, err := gozk.Init("localhost:2181", 5000)
36- if err != nil {
37- println("Couldn't connect: " + err.String())
38- return
39- }
40-
41- defer zk.Close()
42-
43- // Wait for connection.
44- event := <-session
45- if event.State != gozk.STATE_CONNECTED {
46- println("Couldn't connect")
47- return
48- }
49-
50- _, err = zk.Create("/counter", "0", 0, gozk.WorldACL(gozk.PERM_ALL))
51- if err != nil {
52- println(err.String())
53- } else {
54- println("Created!")
55- }
56+ zk, session, err := zookeeper.Init("localhost:2181", 5000)
57+ if err != nil {
58+ println("Couldn't connect: " + err.String())
59+ return
60+ }
61+
62+ defer zk.Close()
63+
64+ // Wait for connection.
65+ event := <-session
66+ if event.State != zookeeper.STATE_CONNECTED {
67+ println("Couldn't connect")
68+ return
69+ }
70+
71+ _, err = zk.Create("/counter", "0", 0, zookeeper.WorldACL(zookeeper.PERM_ALL))
72+ if err != nil {
73+ println(err.String())
74+ } else {
75+ println("Created!")
76+ }
77 }
78-
79
80=== added file 'reattach_test.go'
81--- reattach_test.go 1970-01-01 00:00:00 +0000
82+++ reattach_test.go 2011-10-10 13:55:28 +0000
83@@ -0,0 +1,198 @@
84+package zk_test
85+
86+import (
87+ "bufio"
88+ . "launchpad.net/gocheck"
89+ "launchpad.net/gozk/zk"
90+ "exec"
91+ "flag"
92+ "fmt"
93+ "os"
94+ "strings"
95+ "testing"
96+ "time"
97+)
98+
99+var reattach = flag.Bool("zktest.reattach", false, "internal flag used for testing")
100+var reattachRunDir = flag.String("zktest.rundir", "", "internal flag used for testing")
101+var reattachAbnormalStop = flag.Bool("zktest.stop", false, "internal flag used for testing")
102+
103+// This is the reentrancy point for testing ZooKeeper servers
104+// started by processes that are not direct children of the
105+// testing process. This test always succeeds - the status
106+// will be written to stdout and read by indirectServer.
107+func TestStartNonChildServer(t *testing.T) {
108+ if !*reattach {
109+ // not re-entrant, so ignore this test.
110+ return
111+ }
112+ err := startServer(*reattachRunDir, *reattachAbnormalStop)
113+ if err != nil {
114+ fmt.Printf("error:%v\n", err)
115+ return
116+ }
117+ fmt.Printf("done\n")
118+}
119+
120+func (s *S) startServer(c *C, abort bool) {
121+ err := startServer(s.zkTestRoot, abort)
122+ c.Assert(err, IsNil)
123+}
124+
125+// startServerIndirect starts a ZooKeeper server that is not
126+// a direct child of the current process. If abort is true,
127+// the server will be started and then terminated abnormally.
128+func (s *S) startServerIndirect(c *C, abort bool) {
129+ if len(os.Args) == 0 {
130+ c.Fatal("Cannot find self executable name")
131+ }
132+ cmd := exec.Command(
133+ os.Args[0],
134+ "-zktest.reattach",
135+ "-zktest.rundir", s.zkTestRoot,
136+ "-zktest.stop=", fmt.Sprint(abort),
137+ "-test.run", "StartNonChildServer",
138+ )
139+ r, err := cmd.StdoutPipe()
140+ c.Assert(err, IsNil)
141+ defer r.Close()
142+ if err := cmd.Start(); err != nil {
143+ c.Fatalf("cannot start re-entrant gotest process: %v", err)
144+ }
145+ defer cmd.Wait()
146+ bio := bufio.NewReader(r)
147+ for {
148+ line, err := bio.ReadSlice('\n')
149+ if err != nil {
150+ c.Fatalf("indirect server status line not found: %v", err)
151+ }
152+ s := string(line)
153+ if strings.HasPrefix(s, "error:") {
154+ c.Fatalf("indirect server error: %s", s[len("error:"):])
155+ }
156+ if s == "done\n" {
157+ return
158+ }
159+ }
160+ panic("not reached")
161+}
162+
163+// startServer starts a ZooKeeper server, and terminates it abnormally
164+// if abort is true.
165+func startServer(runDir string, abort bool) os.Error {
166+ srv, err := zk.AttachServer(runDir)
167+ if err != nil {
168+ return fmt.Errorf("cannot attach to server at %q: %v", runDir, err)
169+ }
170+ if err := srv.Start(); err != nil {
171+ return fmt.Errorf("cannot start server: %v", err)
172+ }
173+ if abort {
174+ // Give it time to start up, then kill the server process abnormally,
175+ // leaving the pid.txt file behind.
176+ time.Sleep(0.5e9)
177+ p, err := srv.Process()
178+ if err != nil {
179+ return fmt.Errorf("cannot get server process: %v", err)
180+ }
181+ defer p.Release()
182+ if err := p.Kill(); err != nil {
183+ return fmt.Errorf("cannot kill server process: %v", err)
184+ }
185+ }
186+ return nil
187+}
188+
189+func (s *S) checkCookie(c *C) {
190+ conn, watch, err := zk.Dial(s.zkAddr, 5e9)
191+ c.Assert(err, IsNil)
192+
193+ e, ok := <-watch
194+ c.Assert(ok, Equals, true)
195+ c.Assert(e.Ok(), Equals, true)
196+
197+ c.Assert(err, IsNil)
198+ cookie, _, err := conn.Get("/testAttachCookie")
199+ c.Assert(err, IsNil)
200+ c.Assert(cookie, Equals, "testAttachCookie")
201+ conn.Close()
202+}
203+
204+// cases to test:
205+// child server, stopped normally; reattach, start
206+// non-direct child server, killed abnormally; reattach, start (->error), remove pid.txt; start
207+// non-direct child server, still running; reattach, start (->error), stop, start
208+// child server, still running; reattach, start (-> error)
209+// child server, still running; reattach, stop, start.
210+// non-direct child server, still running; reattach, stop, start.
211+func (s *S) TestAttachServer(c *C) {
212+ // Create a cookie so that we know we are reattaching to the same instance.
213+ conn, _ := s.init(c)
214+ _, err := conn.Create("/testAttachCookie", "testAttachCookie", 0, zk.WorldACL(zk.PERM_ALL))
215+ c.Assert(err, IsNil)
216+ s.checkCookie(c)
217+ s.zkServer.Stop()
218+ s.zkServer = nil
219+
220+ s.testAttachServer(c, (*S).startServer)
221+ s.testAttachServer(c, (*S).startServerIndirect)
222+ s.testAttachServerAbnormalTerminate(c, (*S).startServer)
223+ s.testAttachServerAbnormalTerminate(c, (*S).startServerIndirect)
224+
225+ srv, err := zk.AttachServer(s.zkTestRoot)
226+ c.Assert(err, IsNil)
227+
228+ s.zkServer = srv
229+ err = s.zkServer.Start()
230+ c.Assert(err, IsNil)
231+
232+ conn, _ = s.init(c)
233+ err = conn.Delete("/testAttachCookie", -1)
234+ c.Assert(err, IsNil)
235+}
236+
237+func (s *S) testAttachServer(c *C, start func(*S, *C, bool)) {
238+ start(s, c, false)
239+
240+ s.checkCookie(c)
241+
242+ // try attaching to it while it is still running - it should fail.
243+ srv, err := zk.AttachServer(s.zkTestRoot)
244+ c.Assert(err, IsNil)
245+
246+ err = srv.Start()
247+ c.Assert(err, NotNil)
248+
249+ // stop it and then start it again - it should succeed.
250+ err = srv.Stop()
251+ c.Assert(err, IsNil)
252+
253+ err = srv.Start()
254+ c.Assert(err, IsNil)
255+
256+ s.checkCookie(c)
257+
258+ err = srv.Stop()
259+ c.Assert(err, IsNil)
260+}
261+
262+func (s *S) testAttachServerAbnormalTerminate(c *C, start func(*S, *C, bool)) {
263+ start(s, c, true)
264+
265+ // try attaching to it and starting - it should fail, because pid.txt
266+ // won't have been removed.
267+ srv, err := zk.AttachServer(s.zkTestRoot)
268+ c.Assert(err, IsNil)
269+ err = srv.Start()
270+ c.Assert(err, NotNil)
271+
272+ // stopping it should bring things back to normal.
273+ err = srv.Stop()
274+ c.Assert(err, IsNil)
275+ err = srv.Start()
276+ c.Assert(err, IsNil)
277+
278+ s.checkCookie(c)
279+ err = srv.Stop()
280+ c.Assert(err, IsNil)
281+}
282
283=== modified file 'retry_test.go'
284--- retry_test.go 2011-08-19 01:51:37 +0000
285+++ retry_test.go 2011-10-10 13:55:28 +0000
286@@ -1,42 +1,41 @@
287-package gozk_test
288+package zk_test
289
290 import (
291 . "launchpad.net/gocheck"
292- "gozk"
293+ "launchpad.net/gozk/zk"
294 "os"
295 )
296
297 func (s *S) TestRetryChangeCreating(c *C) {
298- zk, _ := s.init(c)
299+ conn, _ := s.init(c)
300
301- err := zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL),
302- func(data string, stat gozk.Stat) (string, os.Error) {
303+ err := conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL),
304+ func(data string, stat *zk.Stat) (string, os.Error) {
305 c.Assert(data, Equals, "")
306 c.Assert(stat, IsNil)
307 return "new", nil
308 })
309 c.Assert(err, IsNil)
310
311- data, stat, err := zk.Get("/test")
312+ data, stat, err := conn.Get("/test")
313 c.Assert(err, IsNil)
314 c.Assert(stat, NotNil)
315 c.Assert(stat.Version(), Equals, int32(0))
316 c.Assert(data, Equals, "new")
317
318- acl, _, err := zk.ACL("/test")
319+ acl, _, err := conn.ACL("/test")
320 c.Assert(err, IsNil)
321- c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_ALL))
322+ c.Assert(acl, Equals, zk.WorldACL(zk.PERM_ALL))
323 }
324
325 func (s *S) TestRetryChangeSetting(c *C) {
326- zk, _ := s.init(c)
327+ conn, _ := s.init(c)
328
329- _, err := zk.Create("/test", "old", gozk.EPHEMERAL,
330- gozk.WorldACL(gozk.PERM_ALL))
331+ _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
332 c.Assert(err, IsNil)
333
334- err = zk.RetryChange("/test", gozk.EPHEMERAL, []gozk.ACL{},
335- func(data string, stat gozk.Stat) (string, os.Error) {
336+ err = conn.RetryChange("/test", zk.EPHEMERAL, []zk.ACL{},
337+ func(data string, stat *zk.Stat) (string, os.Error) {
338 c.Assert(data, Equals, "old")
339 c.Assert(stat, NotNil)
340 c.Assert(stat.Version(), Equals, int32(0))
341@@ -44,27 +43,26 @@
342 })
343 c.Assert(err, IsNil)
344
345- data, stat, err := zk.Get("/test")
346+ data, stat, err := conn.Get("/test")
347 c.Assert(err, IsNil)
348 c.Assert(stat, NotNil)
349 c.Assert(stat.Version(), Equals, int32(1))
350 c.Assert(data, Equals, "brand new")
351
352 // ACL was unchanged by RetryChange().
353- acl, _, err := zk.ACL("/test")
354+ acl, _, err := conn.ACL("/test")
355 c.Assert(err, IsNil)
356- c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_ALL))
357+ c.Assert(acl, Equals, zk.WorldACL(zk.PERM_ALL))
358 }
359
360 func (s *S) TestRetryChangeUnchangedValueDoesNothing(c *C) {
361- zk, _ := s.init(c)
362+ conn, _ := s.init(c)
363
364- _, err := zk.Create("/test", "old", gozk.EPHEMERAL,
365- gozk.WorldACL(gozk.PERM_ALL))
366+ _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
367 c.Assert(err, IsNil)
368
369- err = zk.RetryChange("/test", gozk.EPHEMERAL, []gozk.ACL{},
370- func(data string, stat gozk.Stat) (string, os.Error) {
371+ err = conn.RetryChange("/test", zk.EPHEMERAL, []zk.ACL{},
372+ func(data string, stat *zk.Stat) (string, os.Error) {
373 c.Assert(data, Equals, "old")
374 c.Assert(stat, NotNil)
375 c.Assert(stat.Version(), Equals, int32(0))
376@@ -72,7 +70,7 @@
377 })
378 c.Assert(err, IsNil)
379
380- data, stat, err := zk.Get("/test")
381+ data, stat, err := conn.Get("/test")
382 c.Assert(err, IsNil)
383 c.Assert(stat, NotNil)
384 c.Assert(stat.Version(), Equals, int32(0)) // Unchanged!
385@@ -80,14 +78,14 @@
386 }
387
388 func (s *S) TestRetryChangeConflictOnCreate(c *C) {
389- zk, _ := s.init(c)
390+ conn, _ := s.init(c)
391
392- changeFunc := func(data string, stat gozk.Stat) (string, os.Error) {
393+ changeFunc := func(data string, stat *zk.Stat) (string, os.Error) {
394 switch data {
395 case "":
396 c.Assert(stat, IsNil)
397- _, err := zk.Create("/test", "conflict", gozk.EPHEMERAL,
398- gozk.WorldACL(gozk.PERM_ALL))
399+ _, err := conn.Create("/test", "conflict", zk.EPHEMERAL,
400+ zk.WorldACL(zk.PERM_ALL))
401 c.Assert(err, IsNil)
402 return "<none> => conflict", nil
403 case "conflict":
404@@ -100,11 +98,10 @@
405 return "can't happen", nil
406 }
407
408- err := zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL),
409- changeFunc)
410+ err := conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL), changeFunc)
411 c.Assert(err, IsNil)
412
413- data, stat, err := zk.Get("/test")
414+ data, stat, err := conn.Get("/test")
415 c.Assert(err, IsNil)
416 c.Assert(data, Equals, "conflict => new")
417 c.Assert(stat, NotNil)
418@@ -112,18 +109,17 @@
419 }
420
421 func (s *S) TestRetryChangeConflictOnSetDueToChange(c *C) {
422- zk, _ := s.init(c)
423+ conn, _ := s.init(c)
424
425- _, err := zk.Create("/test", "old", gozk.EPHEMERAL,
426- gozk.WorldACL(gozk.PERM_ALL))
427+ _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
428 c.Assert(err, IsNil)
429
430- changeFunc := func(data string, stat gozk.Stat) (string, os.Error) {
431+ changeFunc := func(data string, stat *zk.Stat) (string, os.Error) {
432 switch data {
433 case "old":
434 c.Assert(stat, NotNil)
435 c.Assert(stat.Version(), Equals, int32(0))
436- _, err := zk.Set("/test", "conflict", 0)
437+ _, err := conn.Set("/test", "conflict", 0)
438 c.Assert(err, IsNil)
439 return "old => new", nil
440 case "conflict":
441@@ -136,10 +132,10 @@
442 return "can't happen", nil
443 }
444
445- err = zk.RetryChange("/test", gozk.EPHEMERAL, []gozk.ACL{}, changeFunc)
446+ err = conn.RetryChange("/test", zk.EPHEMERAL, []zk.ACL{}, changeFunc)
447 c.Assert(err, IsNil)
448
449- data, stat, err := zk.Get("/test")
450+ data, stat, err := conn.Get("/test")
451 c.Assert(err, IsNil)
452 c.Assert(data, Equals, "conflict => new")
453 c.Assert(stat, NotNil)
454@@ -147,18 +143,17 @@
455 }
456
457 func (s *S) TestRetryChangeConflictOnSetDueToDelete(c *C) {
458- zk, _ := s.init(c)
459+ conn, _ := s.init(c)
460
461- _, err := zk.Create("/test", "old", gozk.EPHEMERAL,
462- gozk.WorldACL(gozk.PERM_ALL))
463+ _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
464 c.Assert(err, IsNil)
465
466- changeFunc := func(data string, stat gozk.Stat) (string, os.Error) {
467+ changeFunc := func(data string, stat *zk.Stat) (string, os.Error) {
468 switch data {
469 case "old":
470 c.Assert(stat, NotNil)
471 c.Assert(stat.Version(), Equals, int32(0))
472- err := zk.Delete("/test", 0)
473+ err := conn.Delete("/test", 0)
474 c.Assert(err, IsNil)
475 return "old => <deleted>", nil
476 case "":
477@@ -170,55 +165,53 @@
478 return "can't happen", nil
479 }
480
481- err = zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_READ),
482- changeFunc)
483+ err = conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_READ), changeFunc)
484 c.Assert(err, IsNil)
485
486- data, stat, err := zk.Get("/test")
487+ data, stat, err := conn.Get("/test")
488 c.Assert(err, IsNil)
489 c.Assert(data, Equals, "<deleted> => new")
490 c.Assert(stat, NotNil)
491 c.Assert(stat.Version(), Equals, int32(0))
492
493 // Should be the new ACL.
494- acl, _, err := zk.ACL("/test")
495+ acl, _, err := conn.ACL("/test")
496 c.Assert(err, IsNil)
497- c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_READ))
498+ c.Assert(acl, Equals, zk.WorldACL(zk.PERM_READ))
499 }
500
501 func (s *S) TestRetryChangeErrorInCallback(c *C) {
502- zk, _ := s.init(c)
503+ conn, _ := s.init(c)
504
505- err := zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL),
506- func(data string, stat gozk.Stat) (string, os.Error) {
507+ err := conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL),
508+ func(data string, stat *zk.Stat) (string, os.Error) {
509 return "don't use this", os.NewError("BOOM!")
510 })
511 c.Assert(err, NotNil)
512- c.Assert(err.Code(), Equals, gozk.ZSYSTEMERROR)
513 c.Assert(err.String(), Equals, "BOOM!")
514
515- stat, err := zk.Exists("/test")
516+ stat, err := conn.Exists("/test")
517 c.Assert(err, IsNil)
518 c.Assert(stat, IsNil)
519 }
520
521 func (s *S) TestRetryChangeFailsReading(c *C) {
522- zk, _ := s.init(c)
523+ conn, _ := s.init(c)
524
525- _, err := zk.Create("/test", "old", gozk.EPHEMERAL,
526- gozk.WorldACL(gozk.PERM_WRITE)) // Write only!
527+ // Write only!
528+ _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_WRITE))
529 c.Assert(err, IsNil)
530
531 var called bool
532- err = zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL),
533- func(data string, stat gozk.Stat) (string, os.Error) {
534+ err = conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL),
535+ func(data string, stat *zk.Stat) (string, os.Error) {
536 called = true
537 return "", nil
538 })
539 c.Assert(err, NotNil)
540- c.Assert(err.Code(), Equals, gozk.ZNOAUTH)
541+ c.Assert(err, Equals, zk.ZNOAUTH)
542
543- stat, err := zk.Exists("/test")
544+ stat, err := conn.Exists("/test")
545 c.Assert(err, IsNil)
546 c.Assert(stat, NotNil)
547 c.Assert(stat.Version(), Equals, int32(0))
548@@ -227,22 +220,21 @@
549 }
550
551 func (s *S) TestRetryChangeFailsSetting(c *C) {
552- zk, _ := s.init(c)
553+ conn, _ := s.init(c)
554
555- _, err := zk.Create("/test", "old", gozk.EPHEMERAL,
556- gozk.WorldACL(gozk.PERM_READ)) // Read only!
557+ // Read only!
558+ _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_READ))
559 c.Assert(err, IsNil)
560
561 var called bool
562- err = zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL),
563- func(data string, stat gozk.Stat) (string, os.Error) {
564+ err = conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL),
565+ func(data string, stat *zk.Stat) (string, os.Error) {
566 called = true
567 return "", nil
568 })
569- c.Assert(err, NotNil)
570- c.Assert(err.Code(), Equals, gozk.ZNOAUTH)
571+ c.Assert(err, Equals, zk.ZNOAUTH)
572
573- stat, err := zk.Exists("/test")
574+ stat, err := conn.Exists("/test")
575 c.Assert(err, IsNil)
576 c.Assert(stat, NotNil)
577 c.Assert(stat.Version(), Equals, int32(0))
578@@ -251,23 +243,22 @@
579 }
580
581 func (s *S) TestRetryChangeFailsCreating(c *C) {
582- zk, _ := s.init(c)
583+ conn, _ := s.init(c)
584
585- _, err := zk.Create("/test", "old", gozk.EPHEMERAL,
586- gozk.WorldACL(gozk.PERM_READ)) // Read only!
587+ // Read only!
588+ _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_READ))
589 c.Assert(err, IsNil)
590
591 var called bool
592- err = zk.RetryChange("/test/sub", gozk.EPHEMERAL,
593- gozk.WorldACL(gozk.PERM_ALL),
594- func(data string, stat gozk.Stat) (string, os.Error) {
595+ err = conn.RetryChange("/test/sub", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL),
596+ func(data string, stat *zk.Stat) (string, os.Error) {
597 called = true
598 return "", nil
599 })
600 c.Assert(err, NotNil)
601- c.Assert(err.Code(), Equals, gozk.ZNOAUTH)
602+ c.Assert(err, Equals, zk.ZNOAUTH)
603
604- stat, err := zk.Exists("/test/sub")
605+ stat, err := conn.Exists("/test/sub")
606 c.Assert(err, IsNil)
607 c.Assert(stat, IsNil)
608
609
610=== added file 'runserver.go'
611--- runserver.go 1970-01-01 00:00:00 +0000
612+++ runserver.go 2011-10-10 13:55:28 +0000
613@@ -0,0 +1,168 @@
614+package zk
615+
616+// This file defines methods on Server that deal with starting
617+// and stopping the ZooKeeper service. They are independent of ZooKeeper
618+// itself, and may be factored out at a later date.
619+
620+import (
621+ "exec"
622+ "fmt"
623+ "io/ioutil"
624+ "os"
625+ "strconv"
626+ "time"
627+)
628+
629+var NotRunning = os.NewError("process not running")
630+
631+// Process returns a Process referring to the running server from
632+// where it's been stored in pid.txt. If the file does not
633+// exist, or it cannot find the process, it returns the error
634+// NotRunning.
635+func (srv *Server) Process() (*os.Process, os.Error) {
636+ data, err := ioutil.ReadFile(srv.path("pid.txt"))
637+ if err != nil {
638+ if err, ok := err.(*os.PathError); ok && err.Error == os.ENOENT {
639+ return nil, NotRunning
640+ }
641+ return nil, err
642+ }
643+ pid, err := strconv.Atoi(string(data))
644+ if err != nil {
645+ return nil, os.NewError("bad process id found in pid.txt")
646+ }
647+ return getProcess(pid)
648+}
649+
650+// getProcess gets a Process from a pid and checks that the
651+// process is actually running. If the process
652+// is not running, then getProcess returns a nil
653+// Process and the error NotRunning.
654+func getProcess(pid int) (*os.Process, os.Error) {
655+ p, err := os.FindProcess(pid)
656+ if err != nil {
657+ return nil, err
658+ }
659+
660+ // try to check if the process is actually running by sending
661+ // it signal 0.
662+ err = p.Signal(os.UnixSignal(0))
663+ if err == nil {
664+ return p, nil
665+ }
666+ if err, ok := err.(os.Errno); ok && err == os.ESRCH {
667+ return nil, NotRunning
668+ }
669+ return nil, os.NewError("server running but inaccessible")
670+}
671+
672+// Start starts the ZooKeeper server.
673+// It returns an error if the server is already running.
674+func (srv *Server) Start() os.Error {
675+ if err := srv.checkAvailability(); err != nil {
676+ return err
677+ }
678+ p, err := srv.Process()
679+ if err == nil || err != NotRunning {
680+ if p != nil {
681+ p.Release()
682+ }
683+ return os.NewError("server is already running")
684+ }
685+
686+ if _, err := os.Stat(srv.path("pid.txt")); err == nil {
687+ // Thre pid.txt file still exists although server is not running.
688+ // Remove it so it can be re-created.
689+ // This leads to a race: if two processes are both
690+ // calling Start, one might remove the file the other
691+ // has just created, leading to a situation where
692+ // pid.txt describes the wrong server process.
693+ // We ignore that possibility for now.
694+ // TODO use syscall.Flock?
695+ if err := os.Remove(srv.path("pid.txt")); err != nil {
696+ return fmt.Errorf("cannot remove pid.txt: %v", err)
697+ }
698+ }
699+
700+ // Open the pid file before starting the process so that if we get two
701+ // programs trying to concurrently start a server on the same directory
702+ // at the same time, only one should succeed.
703+ pidf, err := os.OpenFile(srv.path("pid.txt"), os.O_EXCL|os.O_CREATE|os.O_WRONLY, 0666)
704+ if err != nil {
705+ return fmt.Errorf("cannot create pid.txt: %v", err)
706+ }
707+ defer pidf.Close()
708+ args, err := srv.command()
709+ if err != nil {
710+ return fmt.Errorf("cannot determine command: %v", err)
711+ }
712+ cmd := exec.Command(args[0], args[1:]...)
713+
714+ logf, err := os.OpenFile(srv.path("log.txt"), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666)
715+ if err != nil {
716+ return fmt.Errorf("cannot create log file: %v", err)
717+ }
718+ defer logf.Close()
719+ cmd.Stdout = logf
720+ cmd.Stderr = logf
721+ if err := cmd.Start(); err != nil {
722+ return fmt.Errorf("cannot start server: %v", err)
723+ }
724+ if _, err := fmt.Fprint(pidf, cmd.Process.Pid); err != nil {
725+ return fmt.Errorf("cannot write pid file: %v", err)
726+ }
727+ return nil
728+}
729+
730+// Stop kills the ZooKeeper server. It does nothing if it is not running.
731+// Note that Stop does not remove any data from the run directory,
732+// so Start may be called later on the same directory.
733+func (srv *Server) Stop() os.Error {
734+ p, err := srv.Process()
735+ if p == nil {
736+ if err != nil {
737+ return fmt.Errorf("cannot read process ID of server: %v", err)
738+ }
739+ return nil
740+ }
741+ defer p.Release()
742+ if err := p.Kill(); err != nil {
743+ return fmt.Errorf("cannot kill server process: %v", err)
744+ }
745+ // ignore the error returned from Wait because there's little
746+ // we can do about it - it either means that the process has just exited
747+ // anyway or that we can't wait for it for some other reason,
748+ // for example because it was originally started by some other process.
749+ _, err = p.Wait(0)
750+ if e, ok := err.(*os.SyscallError); ok && e.Errno == os.ECHILD || err == os.ECHILD {
751+ // If we can't wait for the server, it's possible that it was running
752+ // but not as a child of this process, so the only thing we can do
753+ // is to poll until it exits. If the process has taken longer than
754+ // a second to exit, then it's probably not going to.
755+ for i := 0; i < 5*4; i++ {
756+ time.Sleep(1e9 / 4)
757+ if np, err := getProcess(p.Pid); err != nil {
758+ break
759+ } else {
760+ np.Release()
761+ }
762+ }
763+ }
764+
765+ if err := os.Remove(srv.path("pid.txt")); err != nil {
766+ return fmt.Errorf("cannot remove server process ID file: %v", err)
767+ }
768+ return nil
769+}
770+
771+// Destroy stops the ZooKeeper server, and then removes its run
772+// directory. Warning: this will destroy all data associated with the server.
773+func (srv *Server) Destroy() os.Error {
774+ if err := srv.Stop(); err != nil {
775+ return err
776+ }
777+ if err := os.RemoveAll(srv.runDir); err != nil {
778+ return err
779+ }
780+ return nil
781+}
782
783=== added file 'server.go'
784--- server.go 1970-01-01 00:00:00 +0000
785+++ server.go 2011-10-10 13:55:28 +0000
786@@ -0,0 +1,229 @@
787+package zk
788+
789+import (
790+ "bufio"
791+ "bytes"
792+ "fmt"
793+ "io/ioutil"
794+ "net"
795+ "os"
796+ "path/filepath"
797+ "strings"
798+)
799+
800+// Server represents a ZooKeeper server, its data and configuration files.
801+type Server struct {
802+ runDir string
803+ zkDir string
804+}
805+
806+// CreateServer creates the directory runDir and sets up a ZooKeeper server
807+// environment inside it. It is an error if runDir already exists.
808+// The server will listen on the specified TCP port.
809+//
810+// The ZooKeeper installation directory is specified by zkDir.
811+// If this is empty, a system default will be used.
812+//
813+// CreateServer does not start the server.
814+func CreateServer(port int, runDir, zkDir string) (*Server, os.Error) {
815+ if err := os.Mkdir(runDir, 0777); err != nil {
816+ return nil, err
817+ }
818+ srv := &Server{runDir: runDir, zkDir: zkDir}
819+ if err := srv.writeLog4JConfig(); err != nil {
820+ return nil, err
821+ }
822+ if err := srv.writeZooKeeperConfig(port); err != nil {
823+ return nil, err
824+ }
825+ if err := srv.writeZkDir(); err != nil {
826+ return nil, err
827+ }
828+ return srv, nil
829+}
830+
831+// AttachServer creates a new ZooKeeper Server instance
832+// to operate inside an existing run directory, runDir.
833+// The directory must have been created with CreateServer.
834+func AttachServer(runDir string) (*Server, os.Error) {
835+ srv := &Server{runDir: runDir}
836+ if err := srv.readZkDir(); err != nil {
837+ return nil, fmt.Errorf("cannot read server install directory: %v", err)
838+ }
839+ return srv, nil
840+}
841+
842+func (srv *Server) checkAvailability() os.Error {
843+ port, err := srv.networkPort()
844+ if err != nil {
845+ return fmt.Errorf("cannot get network port: %v", err)
846+ }
847+ l, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port))
848+ if err != nil {
849+ return fmt.Errorf("cannot listen on port %v: %v", port, err)
850+ }
851+ l.Close()
852+ return nil
853+}
854+
855+// networkPort returns the TCP port number that
856+// the server is configured for.
857+func (srv *Server) networkPort() (int, os.Error) {
858+ f, err := os.Open(srv.path("zoo.cfg"))
859+ if err != nil {
860+ return 0, err
861+ }
862+ r := bufio.NewReader(f)
863+ for {
864+ line, err := r.ReadSlice('\n')
865+ if err != nil {
866+ return 0, fmt.Errorf("cannot get port from %q", srv.path("zoo.cfg"))
867+ }
868+ var port int
869+ if n, _ := fmt.Sscanf(string(line), "clientPort=%d\n", &port); n == 1 {
870+ return port, nil
871+ }
872+ }
873+ panic("not reached")
874+}
875+
876+// command returns the command used to start the
877+// ZooKeeper server.
878+func (srv *Server) command() ([]string, os.Error) {
879+ cp, err := srv.classPath()
880+ if err != nil {
881+ return nil, fmt.Errorf("cannot get class path: %v", err)
882+ }
883+ return []string{
884+ "java",
885+ "-cp", strings.Join(cp, ":"),
886+ "-Dzookeeper.root.logger=INFO,CONSOLE",
887+ "-Dlog4j.configuration=file:" + srv.path("log4j.properties"),
888+ "org.apache.zookeeper.server.quorum.QuorumPeerMain",
889+ srv.path("zoo.cfg"),
890+ }, nil
891+}
892+
893+var log4jProperties = `
894+log4j.rootLogger=INFO, CONSOLE
895+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
896+log4j.appender.CONSOLE.Threshold=INFO
897+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
898+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n
899+`
900+
901+func (srv *Server) writeLog4JConfig() (err os.Error) {
902+ return ioutil.WriteFile(srv.path("log4j.properties"), []byte(log4jProperties), 0666)
903+}
904+
905+func (srv *Server) writeZooKeeperConfig(port int) (err os.Error) {
906+ return ioutil.WriteFile(srv.path("zoo.cfg"), []byte(fmt.Sprintf(
907+ "tickTime=2000\n"+
908+ "dataDir=%s\n"+
909+ "clientPort=%d\n"+
910+ "maxClientCnxns=500\n",
911+ srv.runDir, port)), 0666)
912+}
913+
914+func (srv *Server) writeZkDir() os.Error {
915+ return ioutil.WriteFile(srv.path("zkdir.txt"), []byte(srv.zkDir), 0666)
916+}
917+
918+func (srv *Server) readZkDir() os.Error {
919+ data, err := ioutil.ReadFile(srv.path("zkdir.txt"))
920+ if err != nil {
921+ return err
922+ }
923+ srv.zkDir = string(data)
924+ return nil
925+}
926+
927+func (srv *Server) classPath() ([]string, os.Error) {
928+ dir := srv.zkDir
929+ if dir == "" {
930+ return systemClassPath()
931+ }
932+ if err := checkDirectory(dir); err != nil {
933+ return nil, err
934+ }
935+ // Two possibilities, as seen in zkEnv.sh:
936+ // 1) locally built binaries (jars are in build directory)
937+ // 2) release binaries
938+ if build := filepath.Join(dir, "build"); checkDirectory(build) == nil {
939+ dir = build
940+ }
941+ classPath, err := filepath.Glob(filepath.Join(dir, "zookeeper-*.jar"))
942+ if err != nil {
943+ panic(fmt.Errorf("glob for jar files: %v", err))
944+ }
945+ more, err := filepath.Glob(filepath.Join(dir, "lib/*.jar"))
946+ if err != nil {
947+ panic(fmt.Errorf("glob for lib jar files: %v", err))
948+ }
949+
950+ classPath = append(classPath, more...)
951+ if len(classPath) == 0 {
952+ return nil, fmt.Errorf("zookeeper libraries not found in %q", dir)
953+ }
954+ return classPath, nil
955+}
956+
957+const zookeeperEnviron = "/etc/zookeeper/conf/environment"
958+
959+func systemClassPath() ([]string, os.Error) {
960+ f, err := os.Open(zookeeperEnviron)
961+ if f == nil {
962+ return nil, err
963+ }
964+ r := bufio.NewReader(f)
965+ for {
966+ line, err := r.ReadSlice('\n')
967+ if err != nil {
968+ break
969+ }
970+ if !bytes.HasPrefix(line, []byte("CLASSPATH=")) {
971+ continue
972+ }
973+
974+ // remove variable and newline
975+ path := string(line[len("CLASSPATH=") : len(line)-1])
976+
977+ // trim white space
978+ path = strings.Trim(path, " \t\r")
979+
980+ // strip quotes
981+ if path[0] == '"' {
982+ path = path[1 : len(path)-1]
983+ }
984+
985+ // split on :
986+ classPath := strings.Split(path, ":")
987+
988+ // split off $ZOOCFGDIR
989+ if len(classPath) > 0 && classPath[0] == "$ZOOCFGDIR" {
990+ classPath = classPath[1:]
991+ }
992+
993+ if len(classPath) == 0 {
994+ return nil, fmt.Errorf("empty class path in %q", zookeeperEnviron)
995+ }
996+ return classPath, nil
997+ }
998+ return nil, fmt.Errorf("no class path found in %q", zookeeperEnviron)
999+}
1000+
1001+// checkDirectory returns an error if the given path
1002+// does not exist or is not a directory.
1003+func checkDirectory(path string) os.Error {
1004+ if info, err := os.Stat(path); err != nil || !info.IsDirectory() {
1005+ if err == nil {
1006+ err = &os.PathError{Op: "stat", Path: path, Error: os.NewError("is not a directory")}
1007+ }
1008+ return err
1009+ }
1010+ return nil
1011+}
1012+
1013+func (srv *Server) path(name string) string {
1014+ return filepath.Join(srv.runDir, name)
1015+}
1016
1017=== modified file 'suite_test.go'
1018--- suite_test.go 2011-08-19 01:43:37 +0000
1019+++ suite_test.go 2011-10-10 13:55:28 +0000
1020@@ -1,13 +1,11 @@
1021-package gozk_test
1022+package zk_test
1023
1024 import (
1025 . "launchpad.net/gocheck"
1026- "testing"
1027- "io/ioutil"
1028- "path"
1029 "fmt"
1030+ "launchpad.net/gozk/zk"
1031 "os"
1032- "gozk"
1033+ "testing"
1034 "time"
1035 )
1036
1037@@ -18,48 +16,32 @@
1038 var _ = Suite(&S{})
1039
1040 type S struct {
1041- zkRoot string
1042- zkTestRoot string
1043- zkTestPort int
1044- zkServerSh string
1045- zkServerOut *os.File
1046- zkAddr string
1047+ zkServer *zk.Server
1048+ zkTestRoot string
1049+ zkTestPort int
1050+ zkProcess *os.Process // The running ZooKeeper process
1051+ zkAddr string
1052
1053- handles []*gozk.ZooKeeper
1054- events []*gozk.Event
1055+ handles []*zk.Conn
1056+ events []*zk.Event
1057 liveWatches int
1058 deadWatches chan bool
1059 }
1060
1061-var logLevel = 0 //gozk.LOG_ERROR
1062-
1063-
1064-var testZooCfg = ("dataDir=%s\n" +
1065- "clientPort=%d\n" +
1066- "tickTime=2000\n" +
1067- "initLimit=10\n" +
1068- "syncLimit=5\n" +
1069- "")
1070-
1071-var testLog4jPrp = ("log4j.rootLogger=INFO,CONSOLE\n" +
1072- "log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender\n" +
1073- "log4j.appender.CONSOLE.Threshold=DEBUG\n" +
1074- "log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout\n" +
1075- "log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n" +
1076- "")
1077-
1078-func (s *S) init(c *C) (*gozk.ZooKeeper, chan gozk.Event) {
1079- zk, watch, err := gozk.Init(s.zkAddr, 5e9)
1080+var logLevel = 0 //zk.LOG_ERROR
1081+
1082+func (s *S) init(c *C) (*zk.Conn, chan zk.Event) {
1083+ conn, watch, err := zk.Dial(s.zkAddr, 5e9)
1084 c.Assert(err, IsNil)
1085
1086- s.handles = append(s.handles, zk)
1087+ s.handles = append(s.handles, conn)
1088
1089 event := <-watch
1090
1091- c.Assert(event.Type, Equals, gozk.EVENT_SESSION)
1092- c.Assert(event.State, Equals, gozk.STATE_CONNECTED)
1093+ c.Assert(event.Type, Equals, zk.EVENT_SESSION)
1094+ c.Assert(event.State, Equals, zk.STATE_CONNECTED)
1095
1096- bufferedWatch := make(chan gozk.Event, 256)
1097+ bufferedWatch := make(chan zk.Event, 256)
1098 bufferedWatch <- event
1099
1100 s.liveWatches += 1
1101@@ -82,13 +64,13 @@
1102 s.deadWatches <- true
1103 }()
1104
1105- return zk, bufferedWatch
1106+ return conn, bufferedWatch
1107 }
1108
1109 func (s *S) SetUpTest(c *C) {
1110- c.Assert(gozk.CountPendingWatches(), Equals, 0,
1111+ c.Assert(zk.CountPendingWatches(), Equals, 0,
1112 Bug("Test got a dirty watch state before running!"))
1113- gozk.SetLogLevel(logLevel)
1114+ zk.SetLogLevel(logLevel)
1115 }
1116
1117 func (s *S) TearDownTest(c *C) {
1118@@ -108,98 +90,37 @@
1119 }
1120
1121 // Reset the list of handles.
1122- s.handles = make([]*gozk.ZooKeeper, 0)
1123+ s.handles = make([]*zk.Conn, 0)
1124
1125- c.Assert(gozk.CountPendingWatches(), Equals, 0,
1126+ c.Assert(zk.CountPendingWatches(), Equals, 0,
1127 Bug("Test left live watches behind!"))
1128 }
1129
1130-// We use the suite set up and tear down to manage a custom zookeeper
1131+// We use the suite set up and tear down to manage a custom ZooKeeper
1132 //
1133 func (s *S) SetUpSuite(c *C) {
1134-
1135+ var err os.Error
1136 s.deadWatches = make(chan bool)
1137
1138- var err os.Error
1139-
1140- s.zkRoot = os.Getenv("ZKROOT")
1141- if s.zkRoot == "" {
1142- panic("You must define $ZKROOT ($ZKROOT/bin/zkServer.sh is needed)")
1143- }
1144-
1145- s.zkTestRoot = c.MkDir()
1146- s.zkTestPort = 21812
1147-
1148- println("ZooKeeper test server directory:", s.zkTestRoot)
1149- println("ZooKeeper test server port:", s.zkTestPort)
1150-
1151- s.zkAddr = fmt.Sprintf("localhost:%d", s.zkTestPort)
1152-
1153- s.zkServerSh = path.Join(s.zkRoot, "bin/zkServer.sh")
1154- s.zkServerOut, err = os.OpenFile(path.Join(s.zkTestRoot, "stdout.txt"),
1155- os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
1156- if err != nil {
1157- panic("Can't open stdout.txt file for server: " + err.String())
1158- }
1159-
1160- dataDir := path.Join(s.zkTestRoot, "data")
1161- confDir := path.Join(s.zkTestRoot, "conf")
1162-
1163- os.Mkdir(dataDir, 0755)
1164- os.Mkdir(confDir, 0755)
1165-
1166- err = os.Setenv("ZOOCFGDIR", confDir)
1167- if err != nil {
1168- panic("Can't set $ZOOCFGDIR: " + err.String())
1169- }
1170-
1171- zooCfg := []byte(fmt.Sprintf(testZooCfg, dataDir, s.zkTestPort))
1172- err = ioutil.WriteFile(path.Join(confDir, "zoo.cfg"), zooCfg, 0644)
1173- if err != nil {
1174- panic("Can't write zoo.cfg: " + err.String())
1175- }
1176-
1177- log4jPrp := []byte(testLog4jPrp)
1178- err = ioutil.WriteFile(path.Join(confDir, "log4j.properties"), log4jPrp, 0644)
1179- if err != nil {
1180- panic("Can't write log4j.properties: " + err.String())
1181- }
1182-
1183- s.StartZK()
1184+ // N.B. We meed to create a subdirectory because zk.CreateServer
1185+ // insists on creating its own directory.
1186+
1187+ s.zkTestRoot = c.MkDir() + "/zk"
1188+ port := 21812
1189+ s.zkAddr = fmt.Sprint("localhost:", port)
1190+
1191+ s.zkServer, err = zk.CreateServer(port, s.zkTestRoot, "")
1192+ if err != nil {
1193+ c.Fatal("Cannot set up server environment: ", err)
1194+ }
1195+ err = s.zkServer.Start()
1196+ if err != nil {
1197+ c.Fatal("Cannot start ZooKeeper server: ", err)
1198+ }
1199 }
1200
1201 func (s *S) TearDownSuite(c *C) {
1202- s.StopZK()
1203- s.zkServerOut.Close()
1204-}
1205-
1206-func (s *S) StartZK() {
1207- attr := os.ProcAttr{Files: []*os.File{os.Stdin, s.zkServerOut, os.Stderr}}
1208- proc, err := os.StartProcess(s.zkServerSh, []string{s.zkServerSh, "start"}, &attr)
1209- if err != nil {
1210- panic("Problem executing zkServer.sh start: " + err.String())
1211- }
1212-
1213- result, err := proc.Wait(0)
1214- if err != nil {
1215- panic(err.String())
1216- } else if result.ExitStatus() != 0 {
1217- panic("'zkServer.sh start' exited with non-zero status")
1218- }
1219-}
1220-
1221-func (s *S) StopZK() {
1222- attr := os.ProcAttr{Files: []*os.File{os.Stdin, s.zkServerOut, os.Stderr}}
1223- proc, err := os.StartProcess(s.zkServerSh, []string{s.zkServerSh, "stop"}, &attr)
1224- if err != nil {
1225- panic("Problem executing zkServer.sh stop: " + err.String() +
1226- " (look for runaway java processes!)")
1227- }
1228- result, err := proc.Wait(0)
1229- if err != nil {
1230- panic(err.String())
1231- } else if result.ExitStatus() != 0 {
1232- panic("'zkServer.sh stop' exited with non-zero status " +
1233- "(look for runaway java processes!)")
1234+ if s.zkServer != nil {
1235+ s.zkServer.Destroy()
1236 }
1237 }
1238
1239=== renamed file 'gozk.go' => 'zk.go'
1240--- gozk.go 2011-08-19 01:56:39 +0000
1241+++ zk.go 2011-10-10 13:55:28 +0000
1242@@ -1,4 +1,4 @@
1243-// gozk - Zookeeper support for the Go language
1244+// gozk - ZooKeeper support for the Go language
1245 //
1246 // https://wiki.ubuntu.com/gozk
1247 //
1248@@ -6,7 +6,7 @@
1249 //
1250 // Written by Gustavo Niemeyer <gustavo.niemeyer@canonical.com>
1251 //
1252-package gozk
1253+package zk
1254
1255 /*
1256 #cgo CFLAGS: -I/usr/include/c-client-src
1257@@ -27,17 +27,16 @@
1258 // -----------------------------------------------------------------------
1259 // Main constants and data types.
1260
1261-// The main ZooKeeper object, created through the Init function.
1262-// Encapsulates all communication with ZooKeeper.
1263-type ZooKeeper struct {
1264+// Conn represents a connection to a set of ZooKeeper nodes.
1265+type Conn struct {
1266 watchChannels map[uintptr]chan Event
1267 sessionWatchId uintptr
1268 handle *C.zhandle_t
1269 mutex sync.Mutex
1270 }
1271
1272-// ClientId represents the established session in ZooKeeper. This is only
1273-// useful to be passed back into the ReInit function.
1274+// ClientId represents an established ZooKeeper session. It can be
1275+// passed into Redial to reestablish a connection to an existing session.
1276 type ClientId struct {
1277 cId C.clientid_t
1278 }
1279@@ -98,35 +97,60 @@
1280 State int
1281 }
1282
1283-// Error codes that may be used to verify the result of the
1284-// Code method from Error.
1285+// Error represents a ZooKeeper error.
1286+type Error int
1287+
1288 const (
1289- ZOK = C.ZOK
1290- ZSYSTEMERROR = C.ZSYSTEMERROR
1291- ZRUNTIMEINCONSISTENCY = C.ZRUNTIMEINCONSISTENCY
1292- ZDATAINCONSISTENCY = C.ZDATAINCONSISTENCY
1293- ZCONNECTIONLOSS = C.ZCONNECTIONLOSS
1294- ZMARSHALLINGERROR = C.ZMARSHALLINGERROR
1295- ZUNIMPLEMENTED = C.ZUNIMPLEMENTED
1296- ZOPERATIONTIMEOUT = C.ZOPERATIONTIMEOUT
1297- ZBADARGUMENTS = C.ZBADARGUMENTS
1298- ZINVALIDSTATE = C.ZINVALIDSTATE
1299- ZAPIERROR = C.ZAPIERROR
1300- ZNONODE = C.ZNONODE
1301- ZNOAUTH = C.ZNOAUTH
1302- ZBADVERSION = C.ZBADVERSION
1303- ZNOCHILDRENFOREPHEMERALS = C.ZNOCHILDRENFOREPHEMERALS
1304- ZNODEEXISTS = C.ZNODEEXISTS
1305- ZNOTEMPTY = C.ZNOTEMPTY
1306- ZSESSIONEXPIRED = C.ZSESSIONEXPIRED
1307- ZINVALIDCALLBACK = C.ZINVALIDCALLBACK
1308- ZINVALIDACL = C.ZINVALIDACL
1309- ZAUTHFAILED = C.ZAUTHFAILED
1310- ZCLOSING = C.ZCLOSING
1311- ZNOTHING = C.ZNOTHING
1312- ZSESSIONMOVED = C.ZSESSIONMOVED
1313+ ZOK Error = C.ZOK
1314+ ZSYSTEMERROR Error = C.ZSYSTEMERROR
1315+ ZRUNTIMEINCONSISTENCY Error = C.ZRUNTIMEINCONSISTENCY
1316+ ZDATAINCONSISTENCY Error = C.ZDATAINCONSISTENCY
1317+ ZCONNECTIONLOSS Error = C.ZCONNECTIONLOSS
1318+ ZMARSHALLINGERROR Error = C.ZMARSHALLINGERROR
1319+ ZUNIMPLEMENTED Error = C.ZUNIMPLEMENTED
1320+ ZOPERATIONTIMEOUT Error = C.ZOPERATIONTIMEOUT
1321+ ZBADARGUMENTS Error = C.ZBADARGUMENTS
1322+ ZINVALIDSTATE Error = C.ZINVALIDSTATE
1323+ ZAPIERROR Error = C.ZAPIERROR
1324+ ZNONODE Error = C.ZNONODE
1325+ ZNOAUTH Error = C.ZNOAUTH
1326+ ZBADVERSION Error = C.ZBADVERSION
1327+ ZNOCHILDRENFOREPHEMERALS Error = C.ZNOCHILDRENFOREPHEMERALS
1328+ ZNODEEXISTS Error = C.ZNODEEXISTS
1329+ ZNOTEMPTY Error = C.ZNOTEMPTY
1330+ ZSESSIONEXPIRED Error = C.ZSESSIONEXPIRED
1331+ ZINVALIDCALLBACK Error = C.ZINVALIDCALLBACK
1332+ ZINVALIDACL Error = C.ZINVALIDACL
1333+ ZAUTHFAILED Error = C.ZAUTHFAILED
1334+ ZCLOSING Error = C.ZCLOSING
1335+ ZNOTHING Error = C.ZNOTHING
1336+ ZSESSIONMOVED Error = C.ZSESSIONMOVED
1337 )
1338
1339+func (error Error) String() string {
1340+ return C.GoString(C.zerror(C.int(error))) // Static, no need to free it.
1341+}
1342+
1343+// zkError creates an appropriate error return from
1344+// a ZooKeeper status and the errno return from a C API
1345+// call.
1346+func zkError(rc C.int, cerr os.Error) os.Error {
1347+ code := Error(rc)
1348+ switch code {
1349+ case ZOK:
1350+ return nil
1351+
1352+ case ZSYSTEMERROR:
1353+ // If a ZooKeeper call returns ZSYSTEMERROR, then
1354+ // errno becomes significant. If errno has not been
1355+ // set, then we will return ZSYSTEMERROR nonetheless.
1356+ if cerr != nil {
1357+ return cerr
1358+ }
1359+ }
1360+ return code
1361+}
1362+
1363 // Constants for SetLogLevel.
1364 const (
1365 LOG_ERROR = C.ZOO_LOG_LEVEL_ERROR
1366@@ -273,104 +297,54 @@
1367 }
1368
1369 // -----------------------------------------------------------------------
1370-// Error interface which maps onto the ZooKeeper error codes.
1371-
1372-type Error interface {
1373- String() string
1374- Code() int
1375-}
1376-
1377-type errorType struct {
1378- zkrc C.int
1379- err os.Error
1380-}
1381-
1382-func newError(zkrc C.int, err os.Error) Error {
1383- return &errorType{zkrc, err}
1384-}
1385-
1386-func (error *errorType) String() (result string) {
1387- if error.zkrc == ZSYSTEMERROR && error.err != nil {
1388- result = error.err.String()
1389- } else {
1390- result = C.GoString(C.zerror(error.zkrc)) // Static, no need to free it.
1391- }
1392- return
1393-}
1394-
1395-// Code returns the error code that may be compared against one of
1396-// the gozk.Z* constants.
1397-func (error *errorType) Code() int {
1398- return int(error.zkrc)
1399-}
1400-
1401-// -----------------------------------------------------------------------
1402-// Stat interface which maps onto the ZooKeeper Stat struct.
1403-
1404-// We declare this as an interface rather than an actual struct because
1405-// this way we don't have to copy data around between the real C struct
1406-// and the Go one on every call. Most uses will only touch a few elements,
1407-// or even ignore the stat entirely, so that's a win.
1408
1409 // Stat contains detailed information about a node.
1410-type Stat interface {
1411- Czxid() int64
1412- Mzxid() int64
1413- CTime() int64
1414- MTime() int64
1415- Version() int32
1416- CVersion() int32
1417- AVersion() int32
1418- EphemeralOwner() int64
1419- DataLength() int32
1420- NumChildren() int32
1421- Pzxid() int64
1422-}
1423-
1424-type resultStat C.struct_Stat
1425-
1426-func (stat *resultStat) Czxid() int64 {
1427- return int64(stat.czxid)
1428-}
1429-
1430-func (stat *resultStat) Mzxid() int64 {
1431- return int64(stat.mzxid)
1432-}
1433-
1434-func (stat *resultStat) CTime() int64 {
1435- return int64(stat.ctime)
1436-}
1437-
1438-func (stat *resultStat) MTime() int64 {
1439- return int64(stat.mtime)
1440-}
1441-
1442-func (stat *resultStat) Version() int32 {
1443- return int32(stat.version)
1444-}
1445-
1446-func (stat *resultStat) CVersion() int32 {
1447- return int32(stat.cversion)
1448-}
1449-
1450-func (stat *resultStat) AVersion() int32 {
1451- return int32(stat.aversion)
1452-}
1453-
1454-func (stat *resultStat) EphemeralOwner() int64 {
1455- return int64(stat.ephemeralOwner)
1456-}
1457-
1458-func (stat *resultStat) DataLength() int32 {
1459- return int32(stat.dataLength)
1460-}
1461-
1462-func (stat *resultStat) NumChildren() int32 {
1463- return int32(stat.numChildren)
1464-}
1465-
1466-func (stat *resultStat) Pzxid() int64 {
1467- return int64(stat.pzxid)
1468+type Stat struct {
1469+ c C.struct_Stat
1470+}
1471+
1472+func (stat *Stat) Czxid() int64 {
1473+ return int64(stat.c.czxid)
1474+}
1475+
1476+func (stat *Stat) Mzxid() int64 {
1477+ return int64(stat.c.mzxid)
1478+}
1479+
1480+func (stat *Stat) CTime() int64 {
1481+ return int64(stat.c.ctime)
1482+}
1483+
1484+func (stat *Stat) MTime() int64 {
1485+ return int64(stat.c.mtime)
1486+}
1487+
1488+func (stat *Stat) Version() int32 {
1489+ return int32(stat.c.version)
1490+}
1491+
1492+func (stat *Stat) CVersion() int32 {
1493+ return int32(stat.c.cversion)
1494+}
1495+
1496+func (stat *Stat) AVersion() int32 {
1497+ return int32(stat.c.aversion)
1498+}
1499+
1500+func (stat *Stat) EphemeralOwner() int64 {
1501+ return int64(stat.c.ephemeralOwner)
1502+}
1503+
1504+func (stat *Stat) DataLength() int32 {
1505+ return int32(stat.c.dataLength)
1506+}
1507+
1508+func (stat *Stat) NumChildren() int32 {
1509+ return int32(stat.c.numChildren)
1510+}
1511+
1512+func (stat *Stat) Pzxid() int64 {
1513+ return int64(stat.c.pzxid)
1514 }
1515
1516 // -----------------------------------------------------------------------
1517@@ -384,7 +358,7 @@
1518 C.zoo_set_debug_level(C.ZooLogLevel(level))
1519 }
1520
1521-// Init initializes the communication with a ZooKeeper cluster. The provided
1522+// Dial initializes the communication with a ZooKeeper cluster. The provided
1523 // servers parameter may include multiple server addresses, separated
1524 // by commas, so that the client will automatically attempt to connect
1525 // to another server if one of them stops working for whatever reason.
1526@@ -398,79 +372,73 @@
1527 // The watch channel receives events of type SESSION_EVENT when any change
1528 // to the state of the established connection happens. See the documentation
1529 // for the Event type for more details.
1530-func Init(servers string, recvTimeoutNS int64) (zk *ZooKeeper, watch chan Event, err Error) {
1531- zk, watch, err = internalInit(servers, recvTimeoutNS, nil)
1532- return
1533+func Dial(servers string, recvTimeoutNS int64) (*Conn, <-chan Event, os.Error) {
1534+ return dial(servers, recvTimeoutNS, nil)
1535 }
1536
1537-// Equivalent to Init, but attempt to reestablish an existing session
1538+// Redial is equivalent to Dial, but attempts to reestablish an existing session
1539 // identified via the clientId parameter.
1540-func ReInit(servers string, recvTimeoutNS int64, clientId *ClientId) (zk *ZooKeeper, watch chan Event, err Error) {
1541- zk, watch, err = internalInit(servers, recvTimeoutNS, clientId)
1542- return
1543+func Redial(servers string, recvTimeoutNS int64, clientId *ClientId) (*Conn, <-chan Event, os.Error) {
1544+ return dial(servers, recvTimeoutNS, clientId)
1545 }
1546
1547-func internalInit(servers string, recvTimeoutNS int64, clientId *ClientId) (*ZooKeeper, chan Event, Error) {
1548-
1549- zk := &ZooKeeper{}
1550- zk.watchChannels = make(map[uintptr]chan Event)
1551+func dial(servers string, recvTimeoutNS int64, clientId *ClientId) (*Conn, <-chan Event, os.Error) {
1552+ conn := &Conn{}
1553+ conn.watchChannels = make(map[uintptr]chan Event)
1554
1555 var cId *C.clientid_t
1556 if clientId != nil {
1557 cId = &clientId.cId
1558 }
1559
1560- watchId, watchChannel := zk.createWatch(true)
1561- zk.sessionWatchId = watchId
1562+ watchId, watchChannel := conn.createWatch(true)
1563+ conn.sessionWatchId = watchId
1564
1565 cservers := C.CString(servers)
1566- handle, cerr := C.zookeeper_init(cservers, C.watch_handler, C.int(recvTimeoutNS / 1e6), cId, unsafe.Pointer(watchId), 0)
1567+ handle, cerr := C.zookeeper_init(cservers, C.watch_handler, C.int(recvTimeoutNS/1e6), cId, unsafe.Pointer(watchId), 0)
1568 C.free(unsafe.Pointer(cservers))
1569 if handle == nil {
1570- zk.closeAllWatches()
1571- return nil, nil, newError(ZSYSTEMERROR, cerr)
1572+ conn.closeAllWatches()
1573+ return nil, nil, zkError(C.int(ZSYSTEMERROR), cerr)
1574 }
1575- zk.handle = handle
1576+ conn.handle = handle
1577 runWatchLoop()
1578- return zk, watchChannel, nil
1579+ return conn, watchChannel, nil
1580 }
1581
1582 // ClientId returns the client ID for the existing session with ZooKeeper.
1583 // This is useful to reestablish an existing session via ReInit.
1584-func (zk *ZooKeeper) ClientId() *ClientId {
1585- return &ClientId{*C.zoo_client_id(zk.handle)}
1586+func (conn *Conn) ClientId() *ClientId {
1587+ return &ClientId{*C.zoo_client_id(conn.handle)}
1588 }
1589
1590 // Close terminates the ZooKeeper interaction.
1591-func (zk *ZooKeeper) Close() Error {
1592-
1593- // Protect from concurrency around zk.handle change.
1594- zk.mutex.Lock()
1595- defer zk.mutex.Unlock()
1596-
1597- if zk.handle == nil {
1598+func (conn *Conn) Close() os.Error {
1599+
1600+ // Protect from concurrency around conn.handle change.
1601+ conn.mutex.Lock()
1602+ defer conn.mutex.Unlock()
1603+
1604+ if conn.handle == nil {
1605 // ZooKeeper may hang indefinitely if a handler is closed twice,
1606 // so we get in the way and prevent it from happening.
1607- return newError(ZCLOSING, nil)
1608+ return ZCLOSING
1609 }
1610- rc, cerr := C.zookeeper_close(zk.handle)
1611+ rc, cerr := C.zookeeper_close(conn.handle)
1612
1613- zk.closeAllWatches()
1614+ conn.closeAllWatches()
1615 stopWatchLoop()
1616
1617- // At this point, nothing else should need zk.handle.
1618- zk.handle = nil
1619+ // At this point, nothing else should need conn.handle.
1620+ conn.handle = nil
1621
1622- if rc != C.ZOK {
1623- return newError(rc, cerr)
1624- }
1625- return nil
1626+ return zkError(rc, cerr)
1627 }
1628
1629 // Get returns the data and status from an existing node. err will be nil,
1630 // unless an error is found. Attempting to retrieve data from a non-existing
1631 // node is an error.
1632-func (zk *ZooKeeper) Get(path string) (data string, stat Stat, err Error) {
1633+func (conn *Conn) Get(path string) (data string, stat *Stat, err os.Error) {
1634
1635 cpath := C.CString(path)
1636 cbuffer := (*C.char)(C.malloc(bufferSize))
1637@@ -478,22 +446,21 @@
1638 defer C.free(unsafe.Pointer(cpath))
1639 defer C.free(unsafe.Pointer(cbuffer))
1640
1641- cstat := C.struct_Stat{}
1642- rc, cerr := C.zoo_wget(zk.handle, cpath, nil, nil,
1643- cbuffer, &cbufferLen, &cstat)
1644+ var cstat Stat
1645+ rc, cerr := C.zoo_wget(conn.handle, cpath, nil, nil, cbuffer, &cbufferLen, &cstat.c)
1646 if rc != C.ZOK {
1647- return "", nil, newError(rc, cerr)
1648+ return "", nil, zkError(rc, cerr)
1649 }
1650+
1651 result := C.GoStringN(cbuffer, cbufferLen)
1652-
1653- return result, (*resultStat)(&cstat), nil
1654+ return result, &cstat, nil
1655 }
1656
1657 // GetW works like Get but also returns a channel that will receive
1658 // a single Event value when the data or existence of the given ZooKeeper
1659 // node changes or when critical session events happen. See the
1660 // documentation of the Event type for more details.
1661-func (zk *ZooKeeper) GetW(path string) (data string, stat Stat, watch chan Event, err Error) {
1662+func (conn *Conn) GetW(path string) (data string, stat *Stat, watch <-chan Event, err os.Error) {
1663
1664 cpath := C.CString(path)
1665 cbuffer := (*C.char)(C.malloc(bufferSize))
1666@@ -501,42 +468,38 @@
1667 defer C.free(unsafe.Pointer(cpath))
1668 defer C.free(unsafe.Pointer(cbuffer))
1669
1670- watchId, watchChannel := zk.createWatch(true)
1671+ watchId, watchChannel := conn.createWatch(true)
1672
1673- cstat := C.struct_Stat{}
1674- rc, cerr := C.zoo_wget(zk.handle, cpath,
1675- C.watch_handler, unsafe.Pointer(watchId),
1676- cbuffer, &cbufferLen, &cstat)
1677+ var cstat Stat
1678+ rc, cerr := C.zoo_wget(conn.handle, cpath, C.watch_handler, unsafe.Pointer(watchId), cbuffer, &cbufferLen, &cstat.c)
1679 if rc != C.ZOK {
1680- zk.forgetWatch(watchId)
1681- return "", nil, nil, newError(rc, cerr)
1682+ conn.forgetWatch(watchId)
1683+ return "", nil, nil, zkError(rc, cerr)
1684 }
1685
1686 result := C.GoStringN(cbuffer, cbufferLen)
1687- return result, (*resultStat)(&cstat), watchChannel, nil
1688+ return result, &cstat, watchChannel, nil
1689 }
1690
1691 // Children returns the children list and status from an existing node.
1692-// err will be nil, unless an error is found. Attempting to retrieve the
1693-// children list from a non-existent node is an error.
1694-func (zk *ZooKeeper) Children(path string) (children []string, stat Stat, err Error) {
1695+// Attempting to retrieve the children list from a non-existent node is an error.
1696+func (conn *Conn) Children(path string) (children []string, stat *Stat, err os.Error) {
1697
1698 cpath := C.CString(path)
1699 defer C.free(unsafe.Pointer(cpath))
1700
1701 cvector := C.struct_String_vector{}
1702- cstat := C.struct_Stat{}
1703- rc, cerr := C.zoo_wget_children2(zk.handle, cpath, nil, nil,
1704- &cvector, &cstat)
1705+ var cstat Stat
1706+ rc, cerr := C.zoo_wget_children2(conn.handle, cpath, nil, nil, &cvector, &cstat.c)
1707
1708 // Can't happen if rc != 0, but avoid potential memory leaks in the future.
1709 if cvector.count != 0 {
1710 children = parseStringVector(&cvector)
1711 }
1712- if rc != C.ZOK {
1713- err = newError(rc, cerr)
1714+ if rc == C.ZOK {
1715+ stat = &cstat
1716 } else {
1717- stat = (*resultStat)(&cstat)
1718+ err = zkError(rc, cerr)
1719 }
1720 return
1721 }
1722@@ -545,29 +508,27 @@
1723 // receive a single Event value when a node is added or removed under the
1724 // provided path or when critical session events happen. See the documentation
1725 // of the Event type for more details.
1726-func (zk *ZooKeeper) ChildrenW(path string) (children []string, stat Stat, watch chan Event, err Error) {
1727+func (conn *Conn) ChildrenW(path string) (children []string, stat *Stat, watch <-chan Event, err os.Error) {
1728
1729 cpath := C.CString(path)
1730 defer C.free(unsafe.Pointer(cpath))
1731
1732- watchId, watchChannel := zk.createWatch(true)
1733+ watchId, watchChannel := conn.createWatch(true)
1734
1735 cvector := C.struct_String_vector{}
1736- cstat := C.struct_Stat{}
1737- rc, cerr := C.zoo_wget_children2(zk.handle, cpath,
1738- C.watch_handler, unsafe.Pointer(watchId),
1739- &cvector, &cstat)
1740+ var cstat Stat
1741+ rc, cerr := C.zoo_wget_children2(conn.handle, cpath, C.watch_handler, unsafe.Pointer(watchId), &cvector, &cstat.c)
1742
1743 // Can't happen if rc != 0, but avoid potential memory leaks in the future.
1744 if cvector.count != 0 {
1745 children = parseStringVector(&cvector)
1746 }
1747- if rc != C.ZOK {
1748- zk.forgetWatch(watchId)
1749- err = newError(rc, cerr)
1750+ if rc == C.ZOK {
1751+ stat = &cstat
1752+ watch = watchChannel
1753 } else {
1754- stat = (*resultStat)(&cstat)
1755- watch = watchChannel
1756+ conn.forgetWatch(watchId)
1757+ err = zkError(rc, cerr)
1758 }
1759 return
1760 }
1761@@ -588,20 +549,20 @@
1762 // Exists checks if a node exists at the given path. If it does,
1763 // stat will contain meta information on the existing node, otherwise
1764 // it will be nil.
1765-func (zk *ZooKeeper) Exists(path string) (stat Stat, err Error) {
1766+func (conn *Conn) Exists(path string) (stat *Stat, err os.Error) {
1767 cpath := C.CString(path)
1768 defer C.free(unsafe.Pointer(cpath))
1769
1770- cstat := C.struct_Stat{}
1771- rc, cerr := C.zoo_wexists(zk.handle, cpath, nil, nil, &cstat)
1772+ var cstat Stat
1773+ rc, cerr := C.zoo_wexists(conn.handle, cpath, nil, nil, &stat.c)
1774
1775 // We diverge a bit from the usual here: a ZNONODE is not an error
1776 // for an exists call, otherwise every Exists call would have to check
1777 // for err != nil and err.Code() != ZNONODE.
1778 if rc == C.ZOK {
1779- stat = (*resultStat)(&cstat)
1780+ stat = &cstat
1781 } else if rc != C.ZNONODE {
1782- err = newError(rc, cerr)
1783+ err = zkError(rc, cerr)
1784 }
1785 return
1786 }
1787@@ -611,28 +572,27 @@
1788 // stat is nil and the node didn't exist, or when the existing node
1789 // is removed. It will also receive critical session events. See the
1790 // documentation of the Event type for more details.
1791-func (zk *ZooKeeper) ExistsW(path string) (stat Stat, watch chan Event, err Error) {
1792+func (conn *Conn) ExistsW(path string) (stat *Stat, watch <-chan Event, err os.Error) {
1793 cpath := C.CString(path)
1794 defer C.free(unsafe.Pointer(cpath))
1795
1796- watchId, watchChannel := zk.createWatch(true)
1797+ watchId, watchChannel := conn.createWatch(true)
1798
1799- cstat := C.struct_Stat{}
1800- rc, cerr := C.zoo_wexists(zk.handle, cpath,
1801- C.watch_handler, unsafe.Pointer(watchId), &cstat)
1802+ var cstat Stat
1803+ rc, cerr := C.zoo_wexists(conn.handle, cpath, C.watch_handler, unsafe.Pointer(watchId), &cstat.c)
1804
1805 // We diverge a bit from the usual here: a ZNONODE is not an error
1806 // for an exists call, otherwise every Exists call would have to check
1807 // for err != nil and err.Code() != ZNONODE.
1808- switch rc {
1809+ switch Error(rc) {
1810 case ZOK:
1811- stat = (*resultStat)(&cstat)
1812+ stat = &cstat
1813 watch = watchChannel
1814 case ZNONODE:
1815 watch = watchChannel
1816 default:
1817- zk.forgetWatch(watchId)
1818- err = newError(rc, cerr)
1819+ conn.forgetWatch(watchId)
1820+ err = zkError(rc, cerr)
1821 }
1822 return
1823 }
1824@@ -646,7 +606,7 @@
1825 // The returned path is useful in cases where the created path may differ
1826 // from the requested one, such as when a sequence number is appended
1827 // to it due to the use of the gozk.SEQUENCE flag.
1828-func (zk *ZooKeeper) Create(path, value string, flags int, aclv []ACL) (pathCreated string, err Error) {
1829+func (conn *Conn) Create(path, value string, flags int, aclv []ACL) (pathCreated string, err os.Error) {
1830 cpath := C.CString(path)
1831 cvalue := C.CString(value)
1832 defer C.free(unsafe.Pointer(cpath))
1833@@ -660,13 +620,13 @@
1834 cpathCreated := (*C.char)(C.malloc(cpathLen))
1835 defer C.free(unsafe.Pointer(cpathCreated))
1836
1837- rc, cerr := C.zoo_create(zk.handle, cpath, cvalue, C.int(len(value)),
1838- caclv, C.int(flags), cpathCreated, C.int(cpathLen))
1839- if rc != C.ZOK {
1840- return "", newError(rc, cerr)
1841+ rc, cerr := C.zoo_create(conn.handle, cpath, cvalue, C.int(len(value)), caclv, C.int(flags), cpathCreated, C.int(cpathLen))
1842+ if rc == C.ZOK {
1843+ pathCreated = C.GoString(cpathCreated)
1844+ } else {
1845+ err = zkError(rc, cerr)
1846 }
1847-
1848- return C.GoString(cpathCreated), nil
1849+ return
1850 }
1851
1852 // Set modifies the data for the existing node at the given path, replacing it
1853@@ -677,35 +637,31 @@
1854 //
1855 // It is an error to attempt to set the data of a non-existing node with
1856 // this function. In these cases, use Create instead.
1857-func (zk *ZooKeeper) Set(path, value string, version int32) (stat Stat, err Error) {
1858+func (conn *Conn) Set(path, value string, version int32) (stat *Stat, err os.Error) {
1859
1860 cpath := C.CString(path)
1861 cvalue := C.CString(value)
1862 defer C.free(unsafe.Pointer(cpath))
1863 defer C.free(unsafe.Pointer(cvalue))
1864
1865- cstat := C.struct_Stat{}
1866-
1867- rc, cerr := C.zoo_set2(zk.handle, cpath, cvalue, C.int(len(value)),
1868- C.int(version), &cstat)
1869- if rc != C.ZOK {
1870- return nil, newError(rc, cerr)
1871+ var cstat Stat
1872+ rc, cerr := C.zoo_set2(conn.handle, cpath, cvalue, C.int(len(value)), C.int(version), &cstat.c)
1873+ if rc == C.ZOK {
1874+ stat = &cstat
1875+ } else {
1876+ err = zkError(rc, cerr)
1877 }
1878-
1879- return (*resultStat)(&cstat), nil
1880+ return
1881 }
1882
1883 // Delete removes the node at path. If version is not -1, the operation
1884 // will only succeed if the node is still at this version when the
1885 // node is deleted as an atomic operation.
1886-func (zk *ZooKeeper) Delete(path string, version int32) (err Error) {
1887+func (conn *Conn) Delete(path string, version int32) (err os.Error) {
1888 cpath := C.CString(path)
1889 defer C.free(unsafe.Pointer(cpath))
1890- rc, cerr := C.zoo_delete(zk.handle, cpath, C.int(version))
1891- if rc != C.ZOK {
1892- return newError(rc, cerr)
1893- }
1894- return nil
1895+ rc, cerr := C.zoo_delete(conn.handle, cpath, C.int(version))
1896+ return zkError(rc, cerr)
1897 }
1898
1899 // AddAuth adds a new authentication certificate to the ZooKeeper
1900@@ -713,7 +669,7 @@
1901 // authentication information, while the cert parameter provides the
1902 // identity data itself. For instance, the "digest" scheme requires
1903 // a pair like "username:password" to be provided as the certificate.
1904-func (zk *ZooKeeper) AddAuth(scheme, cert string) Error {
1905+func (conn *Conn) AddAuth(scheme, cert string) os.Error {
1906 cscheme := C.CString(scheme)
1907 ccert := C.CString(cert)
1908 defer C.free(unsafe.Pointer(cscheme))
1909@@ -725,43 +681,38 @@
1910 }
1911 defer C.destroy_completion_data(data)
1912
1913- rc, cerr := C.zoo_add_auth(zk.handle, cscheme, ccert, C.int(len(cert)),
1914- C.handle_void_completion, unsafe.Pointer(data))
1915+ rc, cerr := C.zoo_add_auth(conn.handle, cscheme, ccert, C.int(len(cert)), C.handle_void_completion, unsafe.Pointer(data))
1916 if rc != C.ZOK {
1917- return newError(rc, cerr)
1918+ return zkError(rc, cerr)
1919 }
1920
1921 C.wait_for_completion(data)
1922
1923 rc = C.int(uintptr(data.data))
1924- if rc != C.ZOK {
1925- return newError(rc, nil)
1926- }
1927-
1928- return nil
1929+ return zkError(rc, nil)
1930 }
1931
1932 // ACL returns the access control list for path.
1933-func (zk *ZooKeeper) ACL(path string) ([]ACL, Stat, Error) {
1934+func (conn *Conn) ACL(path string) ([]ACL, *Stat, os.Error) {
1935
1936 cpath := C.CString(path)
1937 defer C.free(unsafe.Pointer(cpath))
1938
1939 caclv := C.struct_ACL_vector{}
1940- cstat := C.struct_Stat{}
1941
1942- rc, cerr := C.zoo_get_acl(zk.handle, cpath, &caclv, &cstat)
1943+ var cstat Stat
1944+ rc, cerr := C.zoo_get_acl(conn.handle, cpath, &caclv, &cstat.c)
1945 if rc != C.ZOK {
1946- return nil, nil, newError(rc, cerr)
1947+ return nil, nil, zkError(rc, cerr)
1948 }
1949
1950 aclv := parseACLVector(&caclv)
1951
1952- return aclv, (*resultStat)(&cstat), nil
1953+ return aclv, &cstat, nil
1954 }
1955
1956 // SetACL changes the access control list for path.
1957-func (zk *ZooKeeper) SetACL(path string, aclv []ACL, version int32) Error {
1958+func (conn *Conn) SetACL(path string, aclv []ACL, version int32) os.Error {
1959
1960 cpath := C.CString(path)
1961 defer C.free(unsafe.Pointer(cpath))
1962@@ -769,12 +720,8 @@
1963 caclv := buildACLVector(aclv)
1964 defer C.deallocate_ACL_vector(caclv)
1965
1966- rc, cerr := C.zoo_set_acl(zk.handle, cpath, C.int(version), caclv)
1967- if rc != C.ZOK {
1968- return newError(rc, cerr)
1969- }
1970-
1971- return nil
1972+ rc, cerr := C.zoo_set_acl(conn.handle, cpath, C.int(version), caclv)
1973+ return zkError(rc, cerr)
1974 }
1975
1976 func parseACLVector(caclv *C.struct_ACL_vector) []ACL {
1977@@ -822,7 +769,7 @@
1978 // -----------------------------------------------------------------------
1979 // RetryChange utility method.
1980
1981-type ChangeFunc func(oldValue string, oldStat Stat) (newValue string, err os.Error)
1982+type ChangeFunc func(oldValue string, oldStat *Stat) (newValue string, err os.Error)
1983
1984 // RetryChange runs changeFunc to attempt to atomically change path
1985 // in a lock free manner, and retries in case there was another
1986@@ -831,8 +778,7 @@
1987 // changeFunc must work correctly if called multiple times in case
1988 // the modification fails due to concurrent changes, and it may return
1989 // an error that will cause the the RetryChange function to stop and
1990-// return an Error with code ZSYSTEMERROR and the same .String() result
1991-// as the provided error.
1992+// return the same error.
1993 //
1994 // This mechanism is not suitable for a node that is frequently modified
1995 // concurrently. For those cases, consider using a pessimistic locking
1996@@ -845,8 +791,7 @@
1997 //
1998 // 2. Call the changeFunc with the current node value and stat,
1999 // or with an empty string and nil stat, if the node doesn't yet exist.
2000-// If the changeFunc returns an error, stop and return an Error with
2001-// ZSYSTEMERROR Code() and the same String() as the changeFunc error.
2002+// If the changeFunc returns an error, stop and return the same error.
2003 //
2004 // 3. If the changeFunc returns no errors, use the string returned as
2005 // the new candidate value for the node, and attempt to either create
2006@@ -855,36 +800,32 @@
2007 // in the same node), repeat from step 1. If this procedure fails with any
2008 // other error, stop and return the error found.
2009 //
2010-func (zk *ZooKeeper) RetryChange(path string, flags int, acl []ACL, changeFunc ChangeFunc) (err Error) {
2011+func (conn *Conn) RetryChange(path string, flags int, acl []ACL, changeFunc ChangeFunc) os.Error {
2012 for {
2013- oldValue, oldStat, getErr := zk.Get(path)
2014- if getErr != nil && getErr.Code() != ZNONODE {
2015- err = getErr
2016- break
2017- }
2018- newValue, osErr := changeFunc(oldValue, oldStat)
2019- if osErr != nil {
2020- return newError(ZSYSTEMERROR, osErr)
2021- } else if oldStat == nil {
2022- _, err = zk.Create(path, newValue, flags, acl)
2023- if err == nil || err.Code() != ZNODEEXISTS {
2024- break
2025+ oldValue, oldStat, err := conn.Get(path)
2026+ if err != nil && err != ZNONODE {
2027+ return err
2028+ }
2029+ newValue, err := changeFunc(oldValue, oldStat)
2030+ if err != nil {
2031+ return err
2032+ }
2033+ if oldStat == nil {
2034+ _, err := conn.Create(path, newValue, flags, acl)
2035+ if err == nil || err != ZNODEEXISTS {
2036+ return err
2037 }
2038- } else if newValue == oldValue {
2039+ continue
2040+ }
2041+ if newValue == oldValue {
2042 return nil // Nothing to do.
2043- } else {
2044- _, err = zk.Set(path, newValue, oldStat.Version())
2045- if err == nil {
2046- break
2047- } else {
2048- code := err.Code()
2049- if code != ZBADVERSION && code != ZNONODE {
2050- break
2051- }
2052- }
2053+ }
2054+ _, err = conn.Set(path, newValue, oldStat.Version())
2055+ if err == nil || (err != ZBADVERSION && err != ZNONODE) {
2056+ return err
2057 }
2058 }
2059- return err
2060+ panic("not reached")
2061 }
2062
2063 // -----------------------------------------------------------------------
2064@@ -897,7 +838,7 @@
2065 // Whenever a *W method is called, it will return a channel which
2066 // outputs Event values. Internally, a map is used to maintain references
2067 // between an unique integer key (the watchId), and the event channel. The
2068-// watchId is then handed to the C zookeeper library as the watch context,
2069+// watchId is then handed to the C ZooKeeper library as the watch context,
2070 // so that we get it back when events happen. Using an integer key as the
2071 // watch context rather than a pointer is needed because there's no guarantee
2072 // that in the future the GC will not move objects around, and also because
2073@@ -910,13 +851,13 @@
2074 // Since Cgo doesn't allow calling back into Go, we actually fire a new
2075 // goroutine the very first time Init is called, and allow it to block
2076 // in a pthread condition variable within a C function. This condition
2077-// will only be notified once a zookeeper watch callback appends new
2078+// will only be notified once a ZooKeeper watch callback appends new
2079 // entries to the event list. When this happens, the C function returns
2080 // and we get back into Go land with the pointer to the watch data,
2081 // including the watchId and other event details such as type and path.
2082
2083 var watchMutex sync.Mutex
2084-var watchZooKeepers = make(map[uintptr]*ZooKeeper)
2085+var watchConns = make(map[uintptr]*Conn)
2086 var watchCounter uintptr
2087 var watchLoopCounter int
2088
2089@@ -925,14 +866,14 @@
2090 // mostly as a debugging and testing aid.
2091 func CountPendingWatches() int {
2092 watchMutex.Lock()
2093- count := len(watchZooKeepers)
2094+ count := len(watchConns)
2095 watchMutex.Unlock()
2096 return count
2097 }
2098
2099 // createWatch creates and registers a watch, returning the watch id
2100 // and channel.
2101-func (zk *ZooKeeper) createWatch(session bool) (watchId uintptr, watchChannel chan Event) {
2102+func (conn *Conn) createWatch(session bool) (watchId uintptr, watchChannel chan Event) {
2103 buf := 1 // session/watch event
2104 if session {
2105 buf = 32
2106@@ -942,8 +883,8 @@
2107 defer watchMutex.Unlock()
2108 watchId = watchCounter
2109 watchCounter += 1
2110- zk.watchChannels[watchId] = watchChannel
2111- watchZooKeepers[watchId] = zk
2112+ conn.watchChannels[watchId] = watchChannel
2113+ watchConns[watchId] = conn
2114 return
2115 }
2116
2117@@ -951,21 +892,21 @@
2118 // from ever getting delivered. It shouldn't be used if there's any
2119 // chance the watch channel is still visible and not closed, since
2120 // it might mean a goroutine would be blocked forever.
2121-func (zk *ZooKeeper) forgetWatch(watchId uintptr) {
2122+func (conn *Conn) forgetWatch(watchId uintptr) {
2123 watchMutex.Lock()
2124 defer watchMutex.Unlock()
2125- zk.watchChannels[watchId] = nil, false
2126- watchZooKeepers[watchId] = nil, false
2127+ conn.watchChannels[watchId] = nil, false
2128+ watchConns[watchId] = nil, false
2129 }
2130
2131-// closeAllWatches closes all watch channels for zk.
2132-func (zk *ZooKeeper) closeAllWatches() {
2133+// closeAllWatches closes all watch channels for conn.
2134+func (conn *Conn) closeAllWatches() {
2135 watchMutex.Lock()
2136 defer watchMutex.Unlock()
2137- for watchId, ch := range zk.watchChannels {
2138+ for watchId, ch := range conn.watchChannels {
2139 close(ch)
2140- zk.watchChannels[watchId] = nil, false
2141- watchZooKeepers[watchId] = nil, false
2142+ conn.watchChannels[watchId] = nil, false
2143+ watchConns[watchId] = nil, false
2144 }
2145 }
2146
2147@@ -978,11 +919,11 @@
2148 }
2149 watchMutex.Lock()
2150 defer watchMutex.Unlock()
2151- zk, ok := watchZooKeepers[watchId]
2152+ conn, ok := watchConns[watchId]
2153 if !ok {
2154 return
2155 }
2156- if event.Type == EVENT_SESSION && watchId != zk.sessionWatchId {
2157+ if event.Type == EVENT_SESSION && watchId != conn.sessionWatchId {
2158 switch event.State {
2159 case STATE_EXPIRED_SESSION, STATE_AUTH_FAILED:
2160 default:
2161@@ -990,7 +931,7 @@
2162 return
2163 }
2164 }
2165- ch := zk.watchChannels[watchId]
2166+ ch := conn.watchChannels[watchId]
2167 if ch == nil {
2168 return
2169 }
2170@@ -1002,15 +943,15 @@
2171 // straight to the buffer), and the application isn't paying
2172 // attention for long enough to have the buffer filled up.
2173 // Break down now rather than leaking forever.
2174- if watchId == zk.sessionWatchId {
2175+ if watchId == conn.sessionWatchId {
2176 panic("Session event channel buffer is full")
2177 } else {
2178 panic("Watch event channel buffer is full")
2179 }
2180 }
2181- if watchId != zk.sessionWatchId {
2182- zk.watchChannels[watchId] = nil, false
2183- watchZooKeepers[watchId] = nil, false
2184+ if watchId != conn.sessionWatchId {
2185+ conn.watchChannels[watchId] = nil, false
2186+ watchConns[watchId] = nil, false
2187 close(ch)
2188 }
2189 }
2190
2191=== renamed file 'gozk_test.go' => 'zk_test.go'
2192--- gozk_test.go 2011-08-19 01:51:37 +0000
2193+++ zk_test.go 2011-10-10 13:55:28 +0000
2194@@ -1,17 +1,17 @@
2195-package gozk_test
2196+package zk_test
2197
2198 import (
2199 . "launchpad.net/gocheck"
2200- "gozk"
2201+ "launchpad.net/gozk/zk"
2202 "time"
2203 )
2204
2205 // This error will be delivered via C errno, since ZK unfortunately
2206 // only provides the handler back from zookeeper_init().
2207 func (s *S) TestInitErrorThroughErrno(c *C) {
2208- zk, watch, err := gozk.Init("bad-domain-without-port", 5e9)
2209- if zk != nil {
2210- zk.Close()
2211+ conn, watch, err := zk.Dial("bad-domain-without-port", 5e9)
2212+ if conn != nil {
2213+ conn.Close()
2214 }
2215 if watch != nil {
2216 go func() {
2217@@ -23,15 +23,15 @@
2218 }
2219 }()
2220 }
2221- c.Assert(zk, IsNil)
2222+ c.Assert(conn, IsNil)
2223 c.Assert(watch, IsNil)
2224 c.Assert(err, Matches, "invalid argument")
2225 }
2226
2227 func (s *S) TestRecvTimeoutInitParameter(c *C) {
2228- zk, watch, err := gozk.Init(s.zkAddr, 0)
2229+ conn, watch, err := zk.Dial(s.zkAddr, 0)
2230 c.Assert(err, IsNil)
2231- defer zk.Close()
2232+ defer conn.Close()
2233
2234 select {
2235 case <-watch:
2236@@ -40,7 +40,7 @@
2237 }
2238
2239 for i := 0; i != 1000; i++ {
2240- _, _, err := zk.Get("/zookeeper")
2241+ _, _, err := conn.Get("/zookeeper")
2242 if err != nil {
2243 c.Assert(err, Matches, "operation timeout")
2244 c.SucceedNow()
2245@@ -51,76 +51,79 @@
2246 }
2247
2248 func (s *S) TestSessionWatches(c *C) {
2249- c.Assert(gozk.CountPendingWatches(), Equals, 0)
2250+ c.Assert(zk.CountPendingWatches(), Equals, 0)
2251
2252 zk1, watch1 := s.init(c)
2253 zk2, watch2 := s.init(c)
2254 zk3, watch3 := s.init(c)
2255
2256- c.Assert(gozk.CountPendingWatches(), Equals, 3)
2257+ c.Assert(zk.CountPendingWatches(), Equals, 3)
2258
2259 event1 := <-watch1
2260- c.Assert(event1.Type, Equals, gozk.EVENT_SESSION)
2261- c.Assert(event1.State, Equals, gozk.STATE_CONNECTED)
2262+ c.Assert(event1.Type, Equals, zk.EVENT_SESSION)
2263+ c.Assert(event1.State, Equals, zk.STATE_CONNECTED)
2264
2265- c.Assert(gozk.CountPendingWatches(), Equals, 3)
2266+ c.Assert(zk.CountPendingWatches(), Equals, 3)
2267
2268 event2 := <-watch2
2269- c.Assert(event2.Type, Equals, gozk.EVENT_SESSION)
2270- c.Assert(event2.State, Equals, gozk.STATE_CONNECTED)
2271+ c.Assert(event2.Type, Equals, zk.EVENT_SESSION)
2272+ c.Assert(event2.State, Equals, zk.STATE_CONNECTED)
2273
2274- c.Assert(gozk.CountPendingWatches(), Equals, 3)
2275+ c.Assert(zk.CountPendingWatches(), Equals, 3)
2276
2277 event3 := <-watch3
2278- c.Assert(event3.Type, Equals, gozk.EVENT_SESSION)
2279- c.Assert(event3.State, Equals, gozk.STATE_CONNECTED)
2280+ c.Assert(event3.Type, Equals, zk.EVENT_SESSION)
2281+ c.Assert(event3.State, Equals, zk.STATE_CONNECTED)
2282
2283- c.Assert(gozk.CountPendingWatches(), Equals, 3)
2284+ c.Assert(zk.CountPendingWatches(), Equals, 3)
2285
2286 zk1.Close()
2287- c.Assert(gozk.CountPendingWatches(), Equals, 2)
2288+ c.Assert(zk.CountPendingWatches(), Equals, 2)
2289 zk2.Close()
2290- c.Assert(gozk.CountPendingWatches(), Equals, 1)
2291+ c.Assert(zk.CountPendingWatches(), Equals, 1)
2292 zk3.Close()
2293- c.Assert(gozk.CountPendingWatches(), Equals, 0)
2294+ c.Assert(zk.CountPendingWatches(), Equals, 0)
2295 }
2296
2297-// Gozk injects a STATE_CLOSED event when zk.Close() is called, right
2298+// Gozk injects a STATE_CLOSED event when conn.Close() is called, right
2299 // before the channel is closed. Closing the channel injects a nil
2300 // pointer, as usual for Go, so the STATE_CLOSED gives a chance to
2301 // know that a nil pointer is coming, and to stop the procedure.
2302 // Hopefully this procedure will avoid some nil-pointer references by
2303 // mistake.
2304 func (s *S) TestClosingStateInSessionWatch(c *C) {
2305- zk, watch := s.init(c)
2306+ conn, watch := s.init(c)
2307
2308 event := <-watch
2309- c.Assert(event.Type, Equals, gozk.EVENT_SESSION)
2310- c.Assert(event.State, Equals, gozk.STATE_CONNECTED)
2311+ c.Assert(event.Type, Equals, zk.EVENT_SESSION)
2312+ c.Assert(event.State, Equals, zk.STATE_CONNECTED)
2313
2314- zk.Close()
2315+ conn.Close()
2316 event, ok := <-watch
2317 c.Assert(ok, Equals, false)
2318- c.Assert(event.Type, Equals, gozk.EVENT_CLOSED)
2319- c.Assert(event.State, Equals, gozk.STATE_CLOSED)
2320+ c.Assert(event.Type, Equals, zk.EVENT_CLOSED)
2321+ c.Assert(event.State, Equals, zk.STATE_CLOSED)
2322 }
2323
2324 func (s *S) TestEventString(c *C) {
2325- var event gozk.Event
2326- event = gozk.Event{gozk.EVENT_SESSION, "/path", gozk.STATE_CONNECTED}
2327+ var event zk.Event
2328+ event = zk.Event{zk.EVENT_SESSION, "/path", zk.STATE_CONNECTED}
2329 c.Assert(event, Matches, "ZooKeeper connected")
2330- event = gozk.Event{gozk.EVENT_CREATED, "/path", gozk.STATE_CONNECTED}
2331+ event = zk.Event{zk.EVENT_CREATED, "/path", zk.STATE_CONNECTED}
2332 c.Assert(event, Matches, "ZooKeeper connected; path created: /path")
2333- event = gozk.Event{-1, "/path", gozk.STATE_CLOSED}
2334+ event = zk.Event{-1, "/path", zk.STATE_CLOSED}
2335 c.Assert(event, Matches, "ZooKeeper connection closed")
2336 }
2337
2338-var okTests = []struct{gozk.Event; Ok bool}{
2339- {gozk.Event{gozk.EVENT_SESSION, "", gozk.STATE_CONNECTED}, true},
2340- {gozk.Event{gozk.EVENT_CREATED, "", gozk.STATE_CONNECTED}, true},
2341- {gozk.Event{0, "", gozk.STATE_CLOSED}, false},
2342- {gozk.Event{0, "", gozk.STATE_EXPIRED_SESSION}, false},
2343- {gozk.Event{0, "", gozk.STATE_AUTH_FAILED}, false},
2344+var okTests = []struct {
2345+ zk.Event
2346+ Ok bool
2347+}{
2348+ {zk.Event{zk.EVENT_SESSION, "", zk.STATE_CONNECTED}, true},
2349+ {zk.Event{zk.EVENT_CREATED, "", zk.STATE_CONNECTED}, true},
2350+ {zk.Event{0, "", zk.STATE_CLOSED}, false},
2351+ {zk.Event{0, "", zk.STATE_EXPIRED_SESSION}, false},
2352+ {zk.Event{0, "", zk.STATE_AUTH_FAILED}, false},
2353 }
2354
2355 func (s *S) TestEventOk(c *C) {
2356@@ -130,9 +133,9 @@
2357 }
2358
2359 func (s *S) TestGetAndStat(c *C) {
2360- zk, _ := s.init(c)
2361+ conn, _ := s.init(c)
2362
2363- data, stat, err := zk.Get("/zookeeper")
2364+ data, stat, err := conn.Get("/zookeeper")
2365 c.Assert(err, IsNil)
2366 c.Assert(data, Equals, "")
2367 c.Assert(stat.Czxid(), Equals, int64(0))
2368@@ -149,58 +152,58 @@
2369 }
2370
2371 func (s *S) TestGetAndError(c *C) {
2372- zk, _ := s.init(c)
2373+ conn, _ := s.init(c)
2374
2375- data, stat, err := zk.Get("/non-existent")
2376+ data, stat, err := conn.Get("/non-existent")
2377
2378 c.Assert(data, Equals, "")
2379 c.Assert(stat, IsNil)
2380 c.Assert(err, Matches, "no node")
2381- c.Assert(err.Code(), Equals, gozk.ZNONODE)
2382+ c.Assert(err, Equals, zk.ZNONODE)
2383 }
2384
2385 func (s *S) TestCreateAndGet(c *C) {
2386- zk, _ := s.init(c)
2387+ conn, _ := s.init(c)
2388
2389- path, err := zk.Create("/test-", "bababum", gozk.SEQUENCE|gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
2390+ path, err := conn.Create("/test-", "bababum", zk.SEQUENCE|zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
2391 c.Assert(err, IsNil)
2392 c.Assert(path, Matches, "/test-[0-9]+")
2393
2394 // Check the error condition from Create().
2395- _, err = zk.Create(path, "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
2396+ _, err = conn.Create(path, "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
2397 c.Assert(err, Matches, "node exists")
2398
2399- data, _, err := zk.Get(path)
2400+ data, _, err := conn.Get(path)
2401 c.Assert(err, IsNil)
2402 c.Assert(data, Equals, "bababum")
2403 }
2404
2405 func (s *S) TestCreateSetAndGet(c *C) {
2406- zk, _ := s.init(c)
2407+ conn, _ := s.init(c)
2408
2409- _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
2410+ _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
2411 c.Assert(err, IsNil)
2412
2413- stat, err := zk.Set("/test", "bababum", -1) // Any version.
2414+ stat, err := conn.Set("/test", "bababum", -1) // Any version.
2415 c.Assert(err, IsNil)
2416 c.Assert(stat.Version(), Equals, int32(1))
2417
2418- data, _, err := zk.Get("/test")
2419+ data, _, err := conn.Get("/test")
2420 c.Assert(err, IsNil)
2421 c.Assert(data, Equals, "bababum")
2422 }
2423
2424 func (s *S) TestGetAndWatch(c *C) {
2425- c.Check(gozk.CountPendingWatches(), Equals, 0)
2426-
2427- zk, _ := s.init(c)
2428-
2429- c.Check(gozk.CountPendingWatches(), Equals, 1)
2430-
2431- _, err := zk.Create("/test", "one", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
2432+ c.Check(zk.CountPendingWatches(), Equals, 0)
2433+
2434+ conn, _ := s.init(c)
2435+
2436+ c.Check(zk.CountPendingWatches(), Equals, 1)
2437+
2438+ _, err := conn.Create("/test", "one", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
2439 c.Assert(err, IsNil)
2440
2441- data, stat, watch, err := zk.GetW("/test")
2442+ data, stat, watch, err := conn.GetW("/test")
2443 c.Assert(err, IsNil)
2444 c.Assert(data, Equals, "one")
2445 c.Assert(stat.Version(), Equals, int32(0))
2446@@ -211,17 +214,17 @@
2447 default:
2448 }
2449
2450- c.Check(gozk.CountPendingWatches(), Equals, 2)
2451+ c.Check(zk.CountPendingWatches(), Equals, 2)
2452
2453- _, err = zk.Set("/test", "two", -1)
2454+ _, err = conn.Set("/test", "two", -1)
2455 c.Assert(err, IsNil)
2456
2457 event := <-watch
2458- c.Assert(event.Type, Equals, gozk.EVENT_CHANGED)
2459-
2460- c.Check(gozk.CountPendingWatches(), Equals, 1)
2461-
2462- data, _, watch, err = zk.GetW("/test")
2463+ c.Assert(event.Type, Equals, zk.EVENT_CHANGED)
2464+
2465+ c.Check(zk.CountPendingWatches(), Equals, 1)
2466+
2467+ data, _, watch, err = conn.GetW("/test")
2468 c.Assert(err, IsNil)
2469 c.Assert(data, Equals, "two")
2470
2471@@ -231,86 +234,86 @@
2472 default:
2473 }
2474
2475- c.Check(gozk.CountPendingWatches(), Equals, 2)
2476+ c.Check(zk.CountPendingWatches(), Equals, 2)
2477
2478- _, err = zk.Set("/test", "three", -1)
2479+ _, err = conn.Set("/test", "three", -1)
2480 c.Assert(err, IsNil)
2481
2482 event = <-watch
2483- c.Assert(event.Type, Equals, gozk.EVENT_CHANGED)
2484+ c.Assert(event.Type, Equals, zk.EVENT_CHANGED)
2485
2486- c.Check(gozk.CountPendingWatches(), Equals, 1)
2487+ c.Check(zk.CountPendingWatches(), Equals, 1)
2488 }
2489
2490 func (s *S) TestGetAndWatchWithError(c *C) {
2491- c.Check(gozk.CountPendingWatches(), Equals, 0)
2492-
2493- zk, _ := s.init(c)
2494-
2495- c.Check(gozk.CountPendingWatches(), Equals, 1)
2496-
2497- _, _, watch, err := zk.GetW("/test")
2498+ c.Check(zk.CountPendingWatches(), Equals, 0)
2499+
2500+ conn, _ := s.init(c)
2501+
2502+ c.Check(zk.CountPendingWatches(), Equals, 1)
2503+
2504+ _, _, watch, err := conn.GetW("/test")
2505 c.Assert(err, NotNil)
2506- c.Assert(err.Code(), Equals, gozk.ZNONODE)
2507+ c.Assert(err, Equals, zk.ZNONODE)
2508 c.Assert(watch, IsNil)
2509
2510- c.Check(gozk.CountPendingWatches(), Equals, 1)
2511+ c.Check(zk.CountPendingWatches(), Equals, 1)
2512 }
2513
2514 func (s *S) TestCloseReleasesWatches(c *C) {
2515- c.Check(gozk.CountPendingWatches(), Equals, 0)
2516-
2517- zk, _ := s.init(c)
2518-
2519- c.Check(gozk.CountPendingWatches(), Equals, 1)
2520-
2521- _, err := zk.Create("/test", "one", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
2522- c.Assert(err, IsNil)
2523-
2524- _, _, _, err = zk.GetW("/test")
2525- c.Assert(err, IsNil)
2526-
2527- c.Assert(gozk.CountPendingWatches(), Equals, 2)
2528-
2529- zk.Close()
2530-
2531- c.Assert(gozk.CountPendingWatches(), Equals, 0)
2532+ c.Check(zk.CountPendingWatches(), Equals, 0)
2533+
2534+ conn, _ := s.init(c)
2535+
2536+ c.Check(zk.CountPendingWatches(), Equals, 1)
2537+
2538+ _, err := conn.Create("/test", "one", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
2539+ c.Assert(err, IsNil)
2540+
2541+ _, _, _, err = conn.GetW("/test")
2542+ c.Assert(err, IsNil)
2543+
2544+ c.Assert(zk.CountPendingWatches(), Equals, 2)
2545+
2546+ conn.Close()
2547+
2548+ c.Assert(zk.CountPendingWatches(), Equals, 0)
2549 }
2550
2551 // By default, the ZooKeeper C client will hang indefinitely if a
2552 // handler is closed twice. We get in the way and prevent it.
2553 func (s *S) TestClosingTwiceDoesntHang(c *C) {
2554- zk, _ := s.init(c)
2555- err := zk.Close()
2556+ conn, _ := s.init(c)
2557+ err := conn.Close()
2558 c.Assert(err, IsNil)
2559- err = zk.Close()
2560+ err = conn.Close()
2561 c.Assert(err, NotNil)
2562- c.Assert(err.Code(), Equals, gozk.ZCLOSING)
2563+ c.Assert(err, Equals, zk.ZCLOSING)
2564 }
2565
2566 func (s *S) TestChildren(c *C) {
2567- zk, _ := s.init(c)
2568+ conn, _ := s.init(c)
2569
2570- children, stat, err := zk.Children("/")
2571+ children, stat, err := conn.Children("/")
2572 c.Assert(err, IsNil)
2573 c.Assert(children, Equals, []string{"zookeeper"})
2574 c.Assert(stat.NumChildren(), Equals, int32(1))
2575
2576- children, stat, err = zk.Children("/non-existent")
2577+ children, stat, err = conn.Children("/non-existent")
2578 c.Assert(err, NotNil)
2579- c.Assert(err.Code(), Equals, gozk.ZNONODE)
2580+ c.Assert(err, Equals, zk.ZNONODE)
2581 c.Assert(children, Equals, []string{})
2582- c.Assert(stat, Equals, nil)
2583+ c.Assert(stat, IsNil)
2584 }
2585
2586 func (s *S) TestChildrenAndWatch(c *C) {
2587- c.Check(gozk.CountPendingWatches(), Equals, 0)
2588-
2589- zk, _ := s.init(c)
2590-
2591- c.Check(gozk.CountPendingWatches(), Equals, 1)
2592-
2593- children, stat, watch, err := zk.ChildrenW("/")
2594+ c.Check(zk.CountPendingWatches(), Equals, 0)
2595+
2596+ conn, _ := s.init(c)
2597+
2598+ c.Check(zk.CountPendingWatches(), Equals, 1)
2599+
2600+ children, stat, watch, err := conn.ChildrenW("/")
2601 c.Assert(err, IsNil)
2602 c.Assert(children, Equals, []string{"zookeeper"})
2603 c.Assert(stat.NumChildren(), Equals, int32(1))
2604@@ -321,18 +324,18 @@
2605 default:
2606 }
2607
2608- c.Check(gozk.CountPendingWatches(), Equals, 2)
2609+ c.Check(zk.CountPendingWatches(), Equals, 2)
2610
2611- _, err = zk.Create("/test1", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
2612+ _, err = conn.Create("/test1", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
2613 c.Assert(err, IsNil)
2614
2615 event := <-watch
2616- c.Assert(event.Type, Equals, gozk.EVENT_CHILD)
2617+ c.Assert(event.Type, Equals, zk.EVENT_CHILD)
2618 c.Assert(event.Path, Equals, "/")
2619
2620- c.Check(gozk.CountPendingWatches(), Equals, 1)
2621+ c.Check(zk.CountPendingWatches(), Equals, 1)
2622
2623- children, stat, watch, err = zk.ChildrenW("/")
2624+ children, stat, watch, err = conn.ChildrenW("/")
2625 c.Assert(err, IsNil)
2626 c.Assert(stat.NumChildren(), Equals, int32(2))
2627
2628@@ -345,57 +348,56 @@
2629 default:
2630 }
2631
2632- c.Check(gozk.CountPendingWatches(), Equals, 2)
2633+ c.Check(zk.CountPendingWatches(), Equals, 2)
2634
2635- _, err = zk.Create("/test2", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
2636+ _, err = conn.Create("/test2", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
2637 c.Assert(err, IsNil)
2638
2639 event = <-watch
2640- c.Assert(event.Type, Equals, gozk.EVENT_CHILD)
2641+ c.Assert(event.Type, Equals, zk.EVENT_CHILD)
2642
2643- c.Check(gozk.CountPendingWatches(), Equals, 1)
2644+ c.Check(zk.CountPendingWatches(), Equals, 1)
2645 }
2646
2647 func (s *S) TestChildrenAndWatchWithError(c *C) {
2648- c.Check(gozk.CountPendingWatches(), Equals, 0)
2649-
2650- zk, _ := s.init(c)
2651-
2652- c.Check(gozk.CountPendingWatches(), Equals, 1)
2653-
2654- _, stat, watch, err := zk.ChildrenW("/test")
2655+ c.Check(zk.CountPendingWatches(), Equals, 0)
2656+
2657+ conn, _ := s.init(c)
2658+
2659+ c.Check(zk.CountPendingWatches(), Equals, 1)
2660+
2661+ _, stat, watch, err := conn.ChildrenW("/test")
2662 c.Assert(err, NotNil)
2663- c.Assert(err.Code(), Equals, gozk.ZNONODE)
2664+ c.Assert(err, Equals, zk.ZNONODE)
2665 c.Assert(watch, IsNil)
2666 c.Assert(stat, IsNil)
2667
2668- c.Check(gozk.CountPendingWatches(), Equals, 1)
2669+ c.Check(zk.CountPendingWatches(), Equals, 1)
2670 }
2671
2672 func (s *S) TestExists(c *C) {
2673- zk, _ := s.init(c)
2674-
2675- stat, err := zk.Exists("/zookeeper")
2676- c.Assert(err, IsNil)
2677- c.Assert(stat.NumChildren(), Equals, int32(1))
2678-
2679- stat, err = zk.Exists("/non-existent")
2680+ conn, _ := s.init(c)
2681+
2682+ stat, err := conn.Exists("/non-existent")
2683 c.Assert(err, IsNil)
2684 c.Assert(stat, IsNil)
2685+
2686+ stat, err = conn.Exists("/zookeeper")
2687+ c.Assert(err, IsNil)
2688 }
2689
2690 func (s *S) TestExistsAndWatch(c *C) {
2691- c.Check(gozk.CountPendingWatches(), Equals, 0)
2692-
2693- zk, _ := s.init(c)
2694-
2695- c.Check(gozk.CountPendingWatches(), Equals, 1)
2696-
2697- stat, watch, err := zk.ExistsW("/test")
2698+ c.Check(zk.CountPendingWatches(), Equals, 0)
2699+
2700+ conn, _ := s.init(c)
2701+
2702+ c.Check(zk.CountPendingWatches(), Equals, 1)
2703+
2704+ stat, watch, err := conn.ExistsW("/test")
2705 c.Assert(err, IsNil)
2706 c.Assert(stat, IsNil)
2707
2708- c.Check(gozk.CountPendingWatches(), Equals, 2)
2709+ c.Check(zk.CountPendingWatches(), Equals, 2)
2710
2711 select {
2712 case <-watch:
2713@@ -403,62 +405,62 @@
2714 default:
2715 }
2716
2717- _, err = zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
2718+ _, err = conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
2719 c.Assert(err, IsNil)
2720
2721 event := <-watch
2722- c.Assert(event.Type, Equals, gozk.EVENT_CREATED)
2723+ c.Assert(event.Type, Equals, zk.EVENT_CREATED)
2724 c.Assert(event.Path, Equals, "/test")
2725
2726- c.Check(gozk.CountPendingWatches(), Equals, 1)
2727+ c.Check(zk.CountPendingWatches(), Equals, 1)
2728
2729- stat, watch, err = zk.ExistsW("/test")
2730+ stat, watch, err = conn.ExistsW("/test")
2731 c.Assert(err, IsNil)
2732 c.Assert(stat, NotNil)
2733 c.Assert(stat.NumChildren(), Equals, int32(0))
2734
2735- c.Check(gozk.CountPendingWatches(), Equals, 2)
2736+ c.Check(zk.CountPendingWatches(), Equals, 2)
2737 }
2738
2739 func (s *S) TestExistsAndWatchWithError(c *C) {
2740- c.Check(gozk.CountPendingWatches(), Equals, 0)
2741-
2742- zk, _ := s.init(c)
2743-
2744- c.Check(gozk.CountPendingWatches(), Equals, 1)
2745-
2746- stat, watch, err := zk.ExistsW("///")
2747+ c.Check(zk.CountPendingWatches(), Equals, 0)
2748+
2749+ conn, _ := s.init(c)
2750+
2751+ c.Check(zk.CountPendingWatches(), Equals, 1)
2752+
2753+ stat, watch, err := conn.ExistsW("///")
2754 c.Assert(err, NotNil)
2755- c.Assert(err.Code(), Equals, gozk.ZBADARGUMENTS)
2756+ c.Assert(err, Equals, zk.ZBADARGUMENTS)
2757 c.Assert(stat, IsNil)
2758 c.Assert(watch, IsNil)
2759
2760- c.Check(gozk.CountPendingWatches(), Equals, 1)
2761+ c.Check(zk.CountPendingWatches(), Equals, 1)
2762 }
2763
2764 func (s *S) TestDelete(c *C) {
2765- zk, _ := s.init(c)
2766-
2767- _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
2768- c.Assert(err, IsNil)
2769-
2770- err = zk.Delete("/test", 5)
2771- c.Assert(err, NotNil)
2772- c.Assert(err.Code(), Equals, gozk.ZBADVERSION)
2773-
2774- err = zk.Delete("/test", -1)
2775- c.Assert(err, IsNil)
2776-
2777- err = zk.Delete("/test", -1)
2778- c.Assert(err, NotNil)
2779- c.Assert(err.Code(), Equals, gozk.ZNONODE)
2780+ conn, _ := s.init(c)
2781+
2782+ _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
2783+ c.Assert(err, IsNil)
2784+
2785+ err = conn.Delete("/test", 5)
2786+ c.Assert(err, NotNil)
2787+ c.Assert(err, Equals, zk.ZBADVERSION)
2788+
2789+ err = conn.Delete("/test", -1)
2790+ c.Assert(err, IsNil)
2791+
2792+ err = conn.Delete("/test", -1)
2793+ c.Assert(err, NotNil)
2794+ c.Assert(err, Equals, zk.ZNONODE)
2795 }
2796
2797 func (s *S) TestClientIdAndReInit(c *C) {
2798 zk1, _ := s.init(c)
2799 clientId1 := zk1.ClientId()
2800
2801- zk2, _, err := gozk.ReInit(s.zkAddr, 5e9, clientId1)
2802+ zk2, _, err := zk.Redial(s.zkAddr, 5e9, clientId1)
2803 c.Assert(err, IsNil)
2804 defer zk2.Close()
2805 clientId2 := zk2.ClientId()
2806@@ -469,110 +471,110 @@
2807 // Surprisingly for some (including myself, initially), the watch
2808 // returned by the exists method actually fires on data changes too.
2809 func (s *S) TestExistsWatchOnDataChange(c *C) {
2810- zk, _ := s.init(c)
2811-
2812- _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
2813- c.Assert(err, IsNil)
2814-
2815- _, watch, err := zk.ExistsW("/test")
2816- c.Assert(err, IsNil)
2817-
2818- _, err = zk.Set("/test", "new", -1)
2819+ conn, _ := s.init(c)
2820+
2821+ _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
2822+ c.Assert(err, IsNil)
2823+
2824+ _, watch, err := conn.ExistsW("/test")
2825+ c.Assert(err, IsNil)
2826+
2827+ _, err = conn.Set("/test", "new", -1)
2828 c.Assert(err, IsNil)
2829
2830 event := <-watch
2831
2832 c.Assert(event.Path, Equals, "/test")
2833- c.Assert(event.Type, Equals, gozk.EVENT_CHANGED)
2834+ c.Assert(event.Type, Equals, zk.EVENT_CHANGED)
2835 }
2836
2837 func (s *S) TestACL(c *C) {
2838- zk, _ := s.init(c)
2839-
2840- _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
2841- c.Assert(err, IsNil)
2842-
2843- acl, stat, err := zk.ACL("/test")
2844- c.Assert(err, IsNil)
2845- c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_ALL))
2846+ conn, _ := s.init(c)
2847+
2848+ _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
2849+ c.Assert(err, IsNil)
2850+
2851+ acl, stat, err := conn.ACL("/test")
2852+ c.Assert(err, IsNil)
2853+ c.Assert(acl, Equals, zk.WorldACL(zk.PERM_ALL))
2854 c.Assert(stat, NotNil)
2855 c.Assert(stat.Version(), Equals, int32(0))
2856
2857- acl, stat, err = zk.ACL("/non-existent")
2858+ acl, stat, err = conn.ACL("/non-existent")
2859 c.Assert(err, NotNil)
2860- c.Assert(err.Code(), Equals, gozk.ZNONODE)
2861+ c.Assert(err, Equals, zk.ZNONODE)
2862 c.Assert(acl, IsNil)
2863 c.Assert(stat, IsNil)
2864 }
2865
2866 func (s *S) TestSetACL(c *C) {
2867- zk, _ := s.init(c)
2868+ conn, _ := s.init(c)
2869
2870- _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
2871+ _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
2872 c.Assert(err, IsNil)
2873
2874- err = zk.SetACL("/test", gozk.WorldACL(gozk.PERM_ALL), 5)
2875+ err = conn.SetACL("/test", zk.WorldACL(zk.PERM_ALL), 5)
2876 c.Assert(err, NotNil)
2877- c.Assert(err.Code(), Equals, gozk.ZBADVERSION)
2878-
2879- err = zk.SetACL("/test", gozk.WorldACL(gozk.PERM_READ), -1)
2880- c.Assert(err, IsNil)
2881-
2882- acl, _, err := zk.ACL("/test")
2883- c.Assert(err, IsNil)
2884- c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_READ))
2885+ c.Assert(err, Equals, zk.ZBADVERSION)
2886+
2887+ err = conn.SetACL("/test", zk.WorldACL(zk.PERM_READ), -1)
2888+ c.Assert(err, IsNil)
2889+
2890+ acl, _, err := conn.ACL("/test")
2891+ c.Assert(err, IsNil)
2892+ c.Assert(acl, Equals, zk.WorldACL(zk.PERM_READ))
2893 }
2894
2895 func (s *S) TestAddAuth(c *C) {
2896- zk, _ := s.init(c)
2897-
2898- acl := []gozk.ACL{{gozk.PERM_READ, "digest", "joe:enQcM3mIEHQx7IrPNStYBc0qfs8="}}
2899-
2900- _, err := zk.Create("/test", "", gozk.EPHEMERAL, acl)
2901+ conn, _ := s.init(c)
2902+
2903+ acl := []zk.ACL{{zk.PERM_READ, "digest", "joe:enQcM3mIEHQx7IrPNStYBc0qfs8="}}
2904+
2905+ _, err := conn.Create("/test", "", zk.EPHEMERAL, acl)
2906 c.Assert(err, IsNil)
2907
2908- _, _, err = zk.Get("/test")
2909+ _, _, err = conn.Get("/test")
2910 c.Assert(err, NotNil)
2911- c.Assert(err.Code(), Equals, gozk.ZNOAUTH)
2912+ c.Assert(err, Equals, zk.ZNOAUTH)
2913
2914- err = zk.AddAuth("digest", "joe:passwd")
2915+ err = conn.AddAuth("digest", "joe:passwd")
2916 c.Assert(err, IsNil)
2917
2918- _, _, err = zk.Get("/test")
2919+ _, _, err = conn.Get("/test")
2920 c.Assert(err, IsNil)
2921 }
2922
2923 func (s *S) TestWatchOnReconnection(c *C) {
2924- c.Check(gozk.CountPendingWatches(), Equals, 0)
2925+ c.Check(zk.CountPendingWatches(), Equals, 0)
2926
2927- zk, session := s.init(c)
2928+ conn, session := s.init(c)
2929
2930 event := <-session
2931- c.Assert(event.Type, Equals, gozk.EVENT_SESSION)
2932- c.Assert(event.State, Equals, gozk.STATE_CONNECTED)
2933-
2934- c.Check(gozk.CountPendingWatches(), Equals, 1)
2935-
2936- stat, watch, err := zk.ExistsW("/test")
2937+ c.Assert(event.Type, Equals, zk.EVENT_SESSION)
2938+ c.Assert(event.State, Equals, zk.STATE_CONNECTED)
2939+
2940+ c.Check(zk.CountPendingWatches(), Equals, 1)
2941+
2942+ stat, watch, err := conn.ExistsW("/test")
2943 c.Assert(err, IsNil)
2944 c.Assert(stat, IsNil)
2945
2946- c.Check(gozk.CountPendingWatches(), Equals, 2)
2947+ c.Check(zk.CountPendingWatches(), Equals, 2)
2948
2949- s.StopZK()
2950+ s.zkServer.Stop()
2951 time.Sleep(2e9)
2952- s.StartZK()
2953+ s.zkServer.Start()
2954
2955- // The session channel should receive the reconnection notification,
2956+ // The session channel should receive the reconnection notification.
2957 select {
2958 case event := <-session:
2959- c.Assert(event.State, Equals, gozk.STATE_CONNECTING)
2960+ c.Assert(event.State, Equals, zk.STATE_CONNECTING)
2961 case <-time.After(3e9):
2962 c.Fatal("Session watch didn't fire")
2963 }
2964 select {
2965 case event := <-session:
2966- c.Assert(event.State, Equals, gozk.STATE_CONNECTED)
2967+ c.Assert(event.State, Equals, zk.STATE_CONNECTED)
2968 case <-time.After(3e9):
2969 c.Fatal("Session watch didn't fire")
2970 }
2971@@ -585,40 +587,40 @@
2972 }
2973
2974 // And it should still work.
2975- _, err = zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
2976+ _, err = conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
2977 c.Assert(err, IsNil)
2978
2979 event = <-watch
2980- c.Assert(event.Type, Equals, gozk.EVENT_CREATED)
2981+ c.Assert(event.Type, Equals, zk.EVENT_CREATED)
2982 c.Assert(event.Path, Equals, "/test")
2983
2984- c.Check(gozk.CountPendingWatches(), Equals, 1)
2985+ c.Check(zk.CountPendingWatches(), Equals, 1)
2986 }
2987
2988 func (s *S) TestWatchOnSessionExpiration(c *C) {
2989- c.Check(gozk.CountPendingWatches(), Equals, 0)
2990+ c.Check(zk.CountPendingWatches(), Equals, 0)
2991
2992- zk, session := s.init(c)
2993+ conn, session := s.init(c)
2994
2995 event := <-session
2996- c.Assert(event.Type, Equals, gozk.EVENT_SESSION)
2997- c.Assert(event.State, Equals, gozk.STATE_CONNECTED)
2998-
2999- c.Check(gozk.CountPendingWatches(), Equals, 1)
3000-
3001- stat, watch, err := zk.ExistsW("/test")
3002+ c.Assert(event.Type, Equals, zk.EVENT_SESSION)
3003+ c.Assert(event.State, Equals, zk.STATE_CONNECTED)
3004+
3005+ c.Check(zk.CountPendingWatches(), Equals, 1)
3006+
3007+ stat, watch, err := conn.ExistsW("/test")
3008 c.Assert(err, IsNil)
3009 c.Assert(stat, IsNil)
3010
3011- c.Check(gozk.CountPendingWatches(), Equals, 2)
3012+ c.Check(zk.CountPendingWatches(), Equals, 2)
3013
3014 // Use expiration trick described in the FAQ.
3015- clientId := zk.ClientId()
3016- zk2, session2, err := gozk.ReInit(s.zkAddr, 5e9, clientId)
3017+ clientId := conn.ClientId()
3018+ zk2, session2, err := zk.Redial(s.zkAddr, 5e9, clientId)
3019
3020 for event := range session2 {
3021 c.Log("Event from overlapping session: ", event)
3022- if event.State == gozk.STATE_CONNECTED {
3023+ if event.State == zk.STATE_CONNECTED {
3024 // Wait for zk to process the connection.
3025 // Not reliable without this. :-(
3026 time.Sleep(1e9)
3027@@ -627,21 +629,21 @@
3028 }
3029 for event := range session {
3030 c.Log("Event from primary session: ", event)
3031- if event.State == gozk.STATE_EXPIRED_SESSION {
3032+ if event.State == zk.STATE_EXPIRED_SESSION {
3033 break
3034 }
3035 }
3036
3037 select {
3038 case event := <-watch:
3039- c.Assert(event.State, Equals, gozk.STATE_EXPIRED_SESSION)
3040+ c.Assert(event.State, Equals, zk.STATE_EXPIRED_SESSION)
3041 case <-time.After(3e9):
3042 c.Fatal("Watch event didn't fire")
3043 }
3044
3045 event = <-watch
3046- c.Assert(event.Type, Equals, gozk.EVENT_CLOSED)
3047- c.Assert(event.State, Equals, gozk.STATE_CLOSED)
3048+ c.Assert(event.Type, Equals, zk.EVENT_CLOSED)
3049+ c.Assert(event.State, Equals, zk.STATE_CLOSED)
3050
3051- c.Check(gozk.CountPendingWatches(), Equals, 1)
3052+ c.Check(zk.CountPendingWatches(), Equals, 1)
3053 }

Subscribers

People subscribed via source and target branches

to all changes: