Merge lp:~rogpeppe/gozk/factor-out-service into lp:~juju/gozk/trunk

Proposed by Roger Peppe
Status: Rejected
Rejected by: Gustavo Niemeyer
Proposed branch: lp:~rogpeppe/gozk/factor-out-service
Merge into: lp:~juju/gozk/trunk
Diff against target: 3109 lines (+1281/-784)
10 files modified
Makefile (+5/-2)
example/example.go (+22/-23)
reattach_test.go (+202/-0)
retry_test.go (+65/-74)
server.go (+239/-0)
service/Makefile (+20/-0)
service/service.go (+160/-0)
suite_test.go (+45/-122)
zk.go (+265/-311)
zk_test.go (+258/-252)
To merge this branch: bzr merge lp:~rogpeppe/gozk/factor-out-service
Reviewer Review Type Date Requested Status
Gustavo Niemeyer Disapprove
Review via email: mp+77960@code.launchpad.net

Description of the change

Factor out server starting code into its own package.

It is orthogonal to the ZooKeeper server code,
feels like it might gain complexity over time,
and is potentially re-usable for other servers.
gozk/service should not be the final name,
as it's entirely independent of zk functionality.

Gustavo Niemeyer (niemeyer) wrote :

This is an interesting idea, but at the same time it's full of problems and
not really necessary right now. I suggest we delay it until a future point
when we actually need that abstraction, as we'll be in a better position
to generalize it. At that point, we can even preserve for the internal
implementation of the ZooKeeper server itself, without many problems.

Here is some feedback, regardless, which I suggest you don't work on
right now, except for [1], which should be integrated.

[1]

=== added file 'reattach_test.go'
--- reattach_test.go 1970-01-01 00:00:00 +0000
+++ reattach_test.go 2011-10-05 14:37:11 +0000

This should be in a separate merge proposal as it's important and unrelated
to the strawman changes proposed.

[2]

+type Interface interface {
+ // Directory gives the path to the directory containing
+ // the server's configuration and data files. It is assumed that
+ // this exists and can be modified.
+ Directory() string

s/Directory/Dir/

[3]

+
+ // CheckAvailability should return an error if resources
+ // required by the server are not available; for instance
+ // if the server requires a network port which is not free.
+ CheckAvailability() os.Error

We don't need this. The Command function can return an error if
it knows it cannot start.
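
A rough sketch of what this suggestion could look like, not code from the branch: the interface drops CheckAvailability and Command reports the failure itself. It is written against present-day Go (the built-in error type instead of the os.Error used in the 2011 code), and fakeServer, its ready field and the "server-binary" command line are all hypothetical names invented for the illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Interface is a hypothetical, trimmed-down version of the proposed
// service interface: there is no CheckAvailability method.
type Interface interface {
	// Dir gives the path to the server's directory.
	Dir() string
	// Command returns the command line used to start the server,
	// or an error if it already knows the server cannot start.
	Command() ([]string, error)
}

// fakeServer is an invented implementation used only for illustration.
type fakeServer struct {
	dir   string
	ready bool // stands in for "required resources are available"
}

func (s fakeServer) Dir() string { return s.dir }

func (s fakeServer) Command() ([]string, error) {
	if !s.ready {
		return nil, errors.New("cannot start: required resources are not available")
	}
	return []string{"server-binary", "--dir", s.dir}, nil
}

func main() {
	servers := []Interface{
		fakeServer{dir: "/tmp/a", ready: true},
		fakeServer{dir: "/tmp/b", ready: false},
	}
	for _, s := range servers {
		args, err := s.Command()
		fmt.Println(s.Dir(), args, err)
	}
}
```

The trade-off raised later in the thread still applies: with this shape, asking for the command and checking availability are no longer separate steps.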

[4]

+ l, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port))
+ if err != nil {
+ return fmt.Errorf("cannot listen on port %v: %v", port, err)
+ }
+ l.Close()

This doesn't really guarantee that when the service itself is started,
the port will be available, so we might as well not do it.
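
The gap between the check and the real start can be shown with a small sketch. This is an illustration in present-day Go, not code from the branch: checkPort mirrors the quoted snippet, while probe is a hypothetical helper that binds the port from elsewhere between two checks. It uses 127.0.0.1 rather than localhost so that both listeners are certain to use the same address.

```go
package main

import (
	"fmt"
	"net"
)

// checkPort mirrors the check quoted above: it succeeds only if the
// port can be bound at the moment of the call.
func checkPort(port int) error {
	l, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port))
	if err != nil {
		return fmt.Errorf("cannot listen on port %v: %v", port, err)
	}
	return l.Close()
}

// probe (a hypothetical helper) runs checkPort twice on the same port:
// once while nothing holds it, and once after another listener has
// bound it in the meantime.
func probe() (before, after error) {
	// Let the kernel pick a free port so the example needs no fixed number.
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return err, err
	}
	port := l.Addr().(*net.TCPAddr).Port
	l.Close()

	before = checkPort(port) // normally nil: nothing holds the port

	// Something else binds the port between the check and the real start.
	other, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port))
	if err != nil {
		return before, err
	}
	defer other.Close()

	after = checkPort(port) // fails: the earlier success no longer holds
	return before, after
}

func main() {
	before, after := probe()
	fmt.Println("check before:", before)
	fmt.Println("check after: ", after)
}
```

The first check passing says nothing about the second, which is the reviewer's point; the counter-argument in the reply is that an early, readable error is still useful in practice.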

[5]

+// Process returns a reference to the identity
+// of the last running server in the Service's directory.
+// If the server was shut down, it returns (nil, nil).

This shouldn't be in a generic service API. What if there
are multiple processes handled by the service?

[6]

+// If the server was shut down, it returns (nil, nil).

If we're returning a nil *Process, we necessarily need err != nil.
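
One way to follow this rule is a sentinel error for the "shut down" case, so that a nil process never comes back with a nil error. The sketch below is an assumption about how that could look in present-day Go; ErrNotRunning and findServer are invented names, and a pid of zero or less stands in for "no server recorded".

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// ErrNotRunning is a hypothetical sentinel error for the case the
// quoted comment describes as (nil, nil).
var ErrNotRunning = errors.New("server is not running")

// findServer is an invented stand-in for the Process method under
// review. A pid <= 0 models "no server recorded in the directory".
func findServer(pid int) (*os.Process, error) {
	if pid <= 0 {
		return nil, ErrNotRunning
	}
	return os.FindProcess(pid)
}

func main() {
	p, err := findServer(0)
	if errors.Is(err, ErrNotRunning) {
		fmt.Println("nothing to stop")
		return
	}
	if err != nil {
		fmt.Println("lookup failed:", err)
		return
	}
	defer p.Release()
	fmt.Println("found pid", p.Pid)
}
```

Callers can then test for the specific condition without having to remember a nil check on the returned pointer.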

[7]

+// It stores the process ID of the server in the file "pid.txt"
+// inside the server's directory. Stdout and stderr of the server
+// are similarly written to "log.txt".

That's not nice. Who owns the content of this directory, the
interface implementation, or the Service? We can't write
arbitrary content into a directory of different ownership.

Also, a generic interface would need to do better in terms
of logging. We could have a file somewhere named output.txt, maybe,
and leave logging to the underlying implementation.

[8]

+ if err := p.Kill(); err != nil && !strings.Contains(err.String(), "no such process") {
+ return fmt.Errorf("cannot kill server process: %v", err)
+ }

Different services are stopped in different ways. Some prefer a SIGTERM,
before a SIGKILL, for instance. And what happens if there are multiple
processes?
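
The "SIGTERM before SIGKILL" pattern mentioned here can be sketched as follows. This is an illustration only, in present-day Go on a Unix-like system; stop and stopSleep are hypothetical helpers, the grace period is arbitrary, and the sketch handles a single child process, so it does not answer the multiple-process question.

```go
package main

import (
	"fmt"
	"os/exec"
	"syscall"
	"time"
)

// stop asks the process started by cmd to exit with SIGTERM and
// escalates to SIGKILL if it has not exited within grace.
// cmd must already have been started by the caller.
func stop(cmd *exec.Cmd, grace time.Duration) error {
	done := make(chan error, 1)
	go func() { done <- cmd.Wait() }()

	if err := cmd.Process.Signal(syscall.SIGTERM); err != nil {
		return fmt.Errorf("cannot signal server process: %v", err)
	}
	select {
	case <-done:
		return nil // exited after the polite request
	case <-time.After(grace):
	}
	// Kill may fail if the process exited in the meantime; either way
	// the Wait below observes the final state.
	_ = cmd.Process.Kill()
	<-done
	return nil
}

// stopSleep is a hypothetical demonstration that starts "sleep 60"
// (assumed to be on PATH) and stops it again.
func stopSleep() error {
	cmd := exec.Command("sleep", "60")
	if err := cmd.Start(); err != nil {
		return err
	}
	return stop(cmd, 2*time.Second)
}

func main() {
	fmt.Println("stop result:", stopSleep())
}
```

A generic service API would presumably need the signal and the grace period to be supplied by the implementation, since different services expect different shutdown sequences.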

[9]

+ if err != nil && strings.Contains(err.String(), "no child processes") {
+ // If we can't wait for the server, it's possible that it was running
+ // but not as a child of this process, so give it a little while
+ // to exit. TODO po...


review: Disapprove
Roger Peppe (rogpeppe) wrote :

On 5 October 2011 16:32, Gustavo Niemeyer <email address hidden> wrote:
> Review: Disapprove
>
> This is an interesting idea, but at the same time it's full of problems and
> not really necessary right now.  I suggest we delay it until a future point
> when we actually need that abstraction, as we'll be in a better position
> to generalize it. At that point, we can even preserve for the internal
> implementation of the ZooKeeper server itself, without many problems.

ok, i'll do that if you don't mind.

the thing that was mostly bothering me was the mingling of the code
from Server and Service, and that the complexities of Service were
obscuring the essential simplicity of Server.

BTW many of the comments below are applicable even without
the factored-out service package.

> Here is some feedback, regardless, which I suggest you don't work on
> right now besides for [1] which should be integrated.
>
> [1]
>
> === added file 'reattach_test.go'
> --- reattach_test.go    1970-01-01 00:00:00 +0000
> +++ reattach_test.go    2011-10-05 14:37:11 +0000
>
> This should be in a separate merge proposal as it's important and unrelated
> to the strawman changes proposed.

oh, i thought it was in the earlier merge.

i've added to the update-server-interface, if that's ok.

> +type Interface interface {
> +       // Directory gives the path to the directory containing
> +       // the server's configuration and data files. It is assumed that
> +       // this exists and can be modified.
> +       Directory() string
>
> s/Directory/Dir/

actually maybe RunDir

> [3]
>
> +
> +       // CheckAvailability should return an error if resources
> +       // required by the server are not available; for instance
> +       // if the server requires a network port which is not free.
> +       CheckAvailability() os.Error
>
> We don't need this.  The Command function can return an error if
> it knows it cannot start.

perhaps. i preferred to keep the availability checking separate
so that calling Command didn't imply that the service was
going to be started immediately.

> [4]
>
> +       l, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port))
> +       if err != nil {
> +               return fmt.Errorf("cannot listen on port %v: %v", port, err)
> +       }
> +       l.Close()
>
> This doesn't really guarantee that when the service itself is started,
> the port will be available, so we can as well not do it.

it doesn't guarantee it, but it does mean that we can return
an early error rather than relying on the server to print
the error somewhere at some unspecified later time.
i found it very useful when testing.

i wonder if the Start should print stdout and stderr from
the server for the first half second or so after it's been started.
that's a possible solution, but makes things slower.

> [5]
>
> +// Process returns a reference to the identity
> +// of the last running server in the Service's directory.
> +// If the server was shut down, it returns (nil, nil).
>
> This shouldn't be in a generic service API.  What if there
> are multiple processes handled by the service?

my plan was to change things so the server is started in
its own process group or sess...


Unmerged revisions

23. By Roger Peppe

Factored out server running code into a separate package.

22. By Roger Peppe

Fix Server code.

21. By Gustavo Niemeyer

The package location is now http://launchpad.net/gozk/zk.

The shorter package name makes for significantly more digestible line lengths.

20. By Gustavo Niemeyer

Merged clean-up-interface branch, by Roger Peppe [r=niemeyer]

This branch does a number of incompatible changes that improve
the overall API of gozk.

Preview Diff

=== modified file 'Makefile'
--- Makefile 2011-08-03 01:56:58 +0000
+++ Makefile 2011-10-03 15:02:28 +0000
@@ -2,10 +2,13 @@
 
 all: package
 
-TARG=gozk
+TARG=launchpad.net/gozk/zk
 
+GOFILES=\
+	server.go\
+
 CGOFILES=\
-	gozk.go\
+	zk.go\
 
 CGO_OFILES=\
 	helpers.o\
=== added directory 'example'
=== renamed file 'example.go' => 'example/example.go'
--- example.go 2011-08-03 01:47:25 +0000
+++ example/example.go 2011-10-03 15:02:28 +0000
@@ -1,30 +1,29 @@
 package main
 
 import (
-	"gozk"
+	"launchpad.net/zookeeper/zookeeper"
 )
 
 func main() {
-	zk, session, err := gozk.Init("localhost:2181", 5000)
+	zk, session, err := zookeeper.Init("localhost:2181", 5000)
 	if err != nil {
 		println("Couldn't connect: " + err.String())
 		return
 	}
 
 	defer zk.Close()
 
 	// Wait for connection.
 	event := <-session
-	if event.State != gozk.STATE_CONNECTED {
+	if event.State != zookeeper.STATE_CONNECTED {
 		println("Couldn't connect")
 		return
 	}
 
-	_, err = zk.Create("/counter", "0", 0, gozk.WorldACL(gozk.PERM_ALL))
+	_, err = zk.Create("/counter", "0", 0, zookeeper.WorldACL(zookeeper.PERM_ALL))
 	if err != nil {
 		println(err.String())
 	} else {
 		println("Created!")
 	}
 }
-
=== added file 'reattach_test.go'
--- reattach_test.go 1970-01-01 00:00:00 +0000
+++ reattach_test.go 2011-10-03 15:02:28 +0000
@@ -0,0 +1,202 @@
+package zk_test
+
+import (
+	"bufio"
+	. "launchpad.net/gocheck"
+	"launchpad.net/gozk/zk/service"
+	"launchpad.net/gozk/zk"
+	"exec"
+	"flag"
+	"fmt"
+	"os"
+	"strings"
+	"testing"
+	"time"
+)
+
+var reattach = flag.Bool("zktest.reattach", false, "internal flag used for testing")
+var reattachRunDir = flag.String("zktest.rundir", "", "internal flag used for testing")
+var reattachAbnormalStop = flag.Bool("zktest.stop", false, "internal flag used for testing")
+
+// This is the reentrancy point for testing ZooKeeper servers
+// started by processes that are not direct children of the
+// testing process. This test always succeeds - the status
+// will be written to stdout and read by indirectServer.
+func TestStartNonChildServer(t *testing.T) {
+	if !*reattach {
+		// not re-entrant, so ignore this test.
+		return
+	}
+	err := startServer(*reattachRunDir, *reattachAbnormalStop)
+	if err != nil {
+		fmt.Printf("error:%v\n", err)
+		return
+	}
+	fmt.Printf("done\n")
+}
+
+func (s *S) startServer(c *C, abort bool) {
+	err := startServer(s.zkTestRoot, abort)
+	c.Assert(err, IsNil)
+}
+
+// startServerIndirect starts a ZooKeeper server that is not
+// a direct child of the current process. If abort is true,
+// the server will be started and then terminated abnormally.
+func (s *S) startServerIndirect(c *C, abort bool) {
+	if len(os.Args) == 0 {
+		c.Fatal("Cannot find self executable name")
+	}
+	cmd := exec.Command(
+		os.Args[0],
+		"-zktest.reattach",
+		"-zktest.rundir", s.zkTestRoot,
+		"-zktest.stop=", fmt.Sprint(abort),
+		"-test.run", "StartNonChildServer",
+	)
+	r, err := cmd.StdoutPipe()
+	c.Assert(err, IsNil)
+	defer r.Close()
+	if err := cmd.Start(); err != nil {
+		c.Fatalf("cannot start re-entrant gotest process: %v", err)
+	}
+	defer cmd.Wait()
+	bio := bufio.NewReader(r)
+	for {
+		line, err := bio.ReadSlice('\n')
+		if err != nil {
+			c.Fatalf("indirect server status line not found: %v", err)
+		}
+		s := string(line)
+		if strings.HasPrefix(s, "error:") {
+			c.Fatalf("indirect server error: %s", s[len("error:"):])
+		}
+		if s == "done\n" {
+			return
+		}
+	}
+	panic("not reached")
+}
+
+// startServer starts a ZooKeeper server, and terminates it abnormally
+// if abort is true.
+func startServer(runDir string, abort bool) os.Error {
+	s, err := zk.AttachServer(runDir)
+	if err != nil {
+		return fmt.Errorf("cannot attach to server at %q: %v", runDir, err)
+	}
+	srv := service.New(s)
+	if err := srv.Start(); err != nil {
+		return fmt.Errorf("cannot start server: %v", err)
+	}
+	if abort {
+		// Give it time to start up, then kill the server process abnormally,
+		// leaving the pid.txt file behind.
+		time.Sleep(0.5e9)
+		p, err := srv.Process()
+		if err != nil {
+			return fmt.Errorf("cannot get server process: %v", err)
+		}
+		defer p.Release()
+		if err := p.Kill(); err != nil {
+			return fmt.Errorf("cannot kill server process: %v", err)
+		}
+	}
+	return nil
+}
+
+func (s *S) checkCookie(c *C) {
+	conn, watch, err := zk.Dial(s.zkAddr, 5e9)
+	c.Assert(err, IsNil)
+
+	e, ok := <-watch
+	c.Assert(ok, Equals, true)
+	c.Assert(e.Ok(), Equals, true)
+
+	c.Assert(err, IsNil)
+	cookie, _, err := conn.Get("/testAttachCookie")
+	c.Assert(err, IsNil)
+	c.Assert(cookie, Equals, "testAttachCookie")
+	conn.Close()
+}
+
+// cases to test:
+// child server, stopped normally; reattach, start
+// non-direct child server, killed abnormally; reattach, start (->error), remove pid.txt; start
+// non-direct child server, still running; reattach, start (->error), stop, start
+// child server, still running; reattach, start (-> error)
+// child server, still running; reattach, stop, start.
+// non-direct child server, still running; reattach, stop, start.
+func (s *S) TestAttachServer(c *C) {
+	// Create a cookie so that we know we are reattaching to the same instance.
+	conn, _ := s.init(c)
+	_, err := conn.Create("/testAttachCookie", "testAttachCookie", 0, zk.WorldACL(zk.PERM_ALL))
+	c.Assert(err, IsNil)
+	s.checkCookie(c)
+	s.zkServer.Stop()
+	s.zkServer = nil
+
+	s.testAttachServer(c, (*S).startServer)
+	s.testAttachServer(c, (*S).startServerIndirect)
+	s.testAttachServerAbnormalTerminate(c, (*S).startServer)
+	s.testAttachServerAbnormalTerminate(c, (*S).startServerIndirect)
+
+	srv, err := zk.AttachServer(s.zkTestRoot)
+	c.Assert(err, IsNil)
+
+	s.zkServer = service.New(srv)
+	err = s.zkServer.Start()
+	c.Assert(err, IsNil)
+
+	conn, _ = s.init(c)
+	err = conn.Delete("/testAttachCookie", -1)
+	c.Assert(err, IsNil)
+}
+
+func (s *S) testAttachServer(c *C, start func(*S, *C, bool)) {
+	start(s, c, false)
+
+	s.checkCookie(c)
+
+	// try attaching to it while it is still running - it should fail.
+	a, err := zk.AttachServer(s.zkTestRoot)
+	c.Assert(err, IsNil)
+	srv := service.New(a)
+
+	err = srv.Start()
+	c.Assert(err, NotNil)
+
+	// stop it and then start it again - it should succeed.
+	err = srv.Stop()
+	c.Assert(err, IsNil)
+
+	err = srv.Start()
+	c.Assert(err, IsNil)
+
+	s.checkCookie(c)
+
+	err = srv.Stop()
+	c.Assert(err, IsNil)
+}
+
+func (s *S) testAttachServerAbnormalTerminate(c *C, start func(*S, *C, bool)) {
+	start(s, c, true)
+
+	// try attaching to it and starting - it should fail, because pid.txt
+	// won't have been removed.
+	a, err := zk.AttachServer(s.zkTestRoot)
+	c.Assert(err, IsNil)
+	srv := service.New(a)
+	err = srv.Start()
+	c.Assert(err, NotNil)
+
+	// stopping it should bring things back to normal.
+	err = srv.Stop()
+	c.Assert(err, IsNil)
+	err = srv.Start()
+	c.Assert(err, IsNil)
+
+	s.checkCookie(c)
+	err = srv.Stop()
+	c.Assert(err, IsNil)
+}
\ No newline at end of file
=== modified file 'retry_test.go'
--- retry_test.go 2011-08-19 01:51:37 +0000
+++ retry_test.go 2011-10-03 15:02:28 +0000
@@ -1,42 +1,41 @@
-package gozk_test
+package zk_test
 
 import (
 	. "launchpad.net/gocheck"
-	"gozk"
+	"launchpad.net/gozk/zk"
 	"os"
 )
 
 func (s *S) TestRetryChangeCreating(c *C) {
-	zk, _ := s.init(c)
+	conn, _ := s.init(c)
 
-	err := zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL),
-		func(data string, stat gozk.Stat) (string, os.Error) {
+	err := conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL),
+		func(data string, stat *zk.Stat) (string, os.Error) {
 			c.Assert(data, Equals, "")
 			c.Assert(stat, IsNil)
 			return "new", nil
 		})
 	c.Assert(err, IsNil)
 
-	data, stat, err := zk.Get("/test")
+	data, stat, err := conn.Get("/test")
 	c.Assert(err, IsNil)
 	c.Assert(stat, NotNil)
 	c.Assert(stat.Version(), Equals, int32(0))
 	c.Assert(data, Equals, "new")
 
-	acl, _, err := zk.ACL("/test")
+	acl, _, err := conn.ACL("/test")
 	c.Assert(err, IsNil)
-	c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_ALL))
+	c.Assert(acl, Equals, zk.WorldACL(zk.PERM_ALL))
 }
 
 func (s *S) TestRetryChangeSetting(c *C) {
-	zk, _ := s.init(c)
+	conn, _ := s.init(c)
 
-	_, err := zk.Create("/test", "old", gozk.EPHEMERAL,
-		gozk.WorldACL(gozk.PERM_ALL))
+	_, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
 	c.Assert(err, IsNil)
 
-	err = zk.RetryChange("/test", gozk.EPHEMERAL, []gozk.ACL{},
-		func(data string, stat gozk.Stat) (string, os.Error) {
+	err = conn.RetryChange("/test", zk.EPHEMERAL, []zk.ACL{},
+		func(data string, stat *zk.Stat) (string, os.Error) {
 			c.Assert(data, Equals, "old")
 			c.Assert(stat, NotNil)
 			c.Assert(stat.Version(), Equals, int32(0))
@@ -44,27 +43,26 @@
 		})
 	c.Assert(err, IsNil)
 
-	data, stat, err := zk.Get("/test")
+	data, stat, err := conn.Get("/test")
 	c.Assert(err, IsNil)
 	c.Assert(stat, NotNil)
 	c.Assert(stat.Version(), Equals, int32(1))
 	c.Assert(data, Equals, "brand new")
 
 	// ACL was unchanged by RetryChange().
-	acl, _, err := zk.ACL("/test")
+	acl, _, err := conn.ACL("/test")
 	c.Assert(err, IsNil)
-	c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_ALL))
+	c.Assert(acl, Equals, zk.WorldACL(zk.PERM_ALL))
 }
 
 func (s *S) TestRetryChangeUnchangedValueDoesNothing(c *C) {
-	zk, _ := s.init(c)
+	conn, _ := s.init(c)
 
-	_, err := zk.Create("/test", "old", gozk.EPHEMERAL,
-		gozk.WorldACL(gozk.PERM_ALL))
+	_, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
 	c.Assert(err, IsNil)
 
-	err = zk.RetryChange("/test", gozk.EPHEMERAL, []gozk.ACL{},
-		func(data string, stat gozk.Stat) (string, os.Error) {
+	err = conn.RetryChange("/test", zk.EPHEMERAL, []zk.ACL{},
+		func(data string, stat *zk.Stat) (string, os.Error) {
 			c.Assert(data, Equals, "old")
 			c.Assert(stat, NotNil)
 			c.Assert(stat.Version(), Equals, int32(0))
@@ -72,7 +70,7 @@
 		})
 	c.Assert(err, IsNil)
 
-	data, stat, err := zk.Get("/test")
+	data, stat, err := conn.Get("/test")
 	c.Assert(err, IsNil)
 	c.Assert(stat, NotNil)
 	c.Assert(stat.Version(), Equals, int32(0)) // Unchanged!
@@ -80,14 +78,14 @@
 }
 
 func (s *S) TestRetryChangeConflictOnCreate(c *C) {
-	zk, _ := s.init(c)
+	conn, _ := s.init(c)
 
-	changeFunc := func(data string, stat gozk.Stat) (string, os.Error) {
+	changeFunc := func(data string, stat *zk.Stat) (string, os.Error) {
 		switch data {
 		case "":
 			c.Assert(stat, IsNil)
-			_, err := zk.Create("/test", "conflict", gozk.EPHEMERAL,
-				gozk.WorldACL(gozk.PERM_ALL))
+			_, err := conn.Create("/test", "conflict", zk.EPHEMERAL,
+				zk.WorldACL(zk.PERM_ALL))
 			c.Assert(err, IsNil)
 			return "<none> => conflict", nil
 		case "conflict":
@@ -100,11 +98,10 @@
 		return "can't happen", nil
 	}
 
-	err := zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL),
-		changeFunc)
+	err := conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL), changeFunc)
 	c.Assert(err, IsNil)
 
-	data, stat, err := zk.Get("/test")
+	data, stat, err := conn.Get("/test")
 	c.Assert(err, IsNil)
 	c.Assert(data, Equals, "conflict => new")
 	c.Assert(stat, NotNil)
@@ -112,18 +109,17 @@
 }
 
 func (s *S) TestRetryChangeConflictOnSetDueToChange(c *C) {
-	zk, _ := s.init(c)
+	conn, _ := s.init(c)
 
-	_, err := zk.Create("/test", "old", gozk.EPHEMERAL,
-		gozk.WorldACL(gozk.PERM_ALL))
+	_, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
 	c.Assert(err, IsNil)
 
-	changeFunc := func(data string, stat gozk.Stat) (string, os.Error) {
+	changeFunc := func(data string, stat *zk.Stat) (string, os.Error) {
 		switch data {
 		case "old":
 			c.Assert(stat, NotNil)
 			c.Assert(stat.Version(), Equals, int32(0))
-			_, err := zk.Set("/test", "conflict", 0)
+			_, err := conn.Set("/test", "conflict", 0)
 			c.Assert(err, IsNil)
 			return "old => new", nil
 		case "conflict":
@@ -136,10 +132,10 @@
 		return "can't happen", nil
 	}
 
-	err = zk.RetryChange("/test", gozk.EPHEMERAL, []gozk.ACL{}, changeFunc)
+	err = conn.RetryChange("/test", zk.EPHEMERAL, []zk.ACL{}, changeFunc)
 	c.Assert(err, IsNil)
 
-	data, stat, err := zk.Get("/test")
+	data, stat, err := conn.Get("/test")
 	c.Assert(err, IsNil)
 	c.Assert(data, Equals, "conflict => new")
 	c.Assert(stat, NotNil)
@@ -147,18 +143,17 @@
 }
 
 func (s *S) TestRetryChangeConflictOnSetDueToDelete(c *C) {
-	zk, _ := s.init(c)
+	conn, _ := s.init(c)
 
-	_, err := zk.Create("/test", "old", gozk.EPHEMERAL,
-		gozk.WorldACL(gozk.PERM_ALL))
+	_, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
 	c.Assert(err, IsNil)
 
-	changeFunc := func(data string, stat gozk.Stat) (string, os.Error) {
+	changeFunc := func(data string, stat *zk.Stat) (string, os.Error) {
 		switch data {
 		case "old":
 			c.Assert(stat, NotNil)
 			c.Assert(stat.Version(), Equals, int32(0))
-			err := zk.Delete("/test", 0)
+			err := conn.Delete("/test", 0)
 			c.Assert(err, IsNil)
 			return "old => <deleted>", nil
 		case "":
@@ -170,55 +165,53 @@
 		return "can't happen", nil
 	}
 
-	err = zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_READ),
-		changeFunc)
+	err = conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_READ), changeFunc)
 	c.Assert(err, IsNil)
 
-	data, stat, err := zk.Get("/test")
+	data, stat, err := conn.Get("/test")
 	c.Assert(err, IsNil)
 	c.Assert(data, Equals, "<deleted> => new")
 	c.Assert(stat, NotNil)
 	c.Assert(stat.Version(), Equals, int32(0))
 
 	// Should be the new ACL.
-	acl, _, err := zk.ACL("/test")
+	acl, _, err := conn.ACL("/test")
 	c.Assert(err, IsNil)
-	c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_READ))
+	c.Assert(acl, Equals, zk.WorldACL(zk.PERM_READ))
 }
 
 func (s *S) TestRetryChangeErrorInCallback(c *C) {
-	zk, _ := s.init(c)
+	conn, _ := s.init(c)
 
-	err := zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL),
-		func(data string, stat gozk.Stat) (string, os.Error) {
+	err := conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL),
+		func(data string, stat *zk.Stat) (string, os.Error) {
 			return "don't use this", os.NewError("BOOM!")
 		})
 	c.Assert(err, NotNil)
-	c.Assert(err.Code(), Equals, gozk.ZSYSTEMERROR)
 	c.Assert(err.String(), Equals, "BOOM!")
 
-	stat, err := zk.Exists("/test")
+	stat, err := conn.Exists("/test")
 	c.Assert(err, IsNil)
 	c.Assert(stat, IsNil)
 }
 
 func (s *S) TestRetryChangeFailsReading(c *C) {
-	zk, _ := s.init(c)
+	conn, _ := s.init(c)
 
-	_, err := zk.Create("/test", "old", gozk.EPHEMERAL,
-		gozk.WorldACL(gozk.PERM_WRITE)) // Write only!
+	// Write only!
+	_, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_WRITE))
 	c.Assert(err, IsNil)
 
 	var called bool
-	err = zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL),
-		func(data string, stat gozk.Stat) (string, os.Error) {
+	err = conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL),
+		func(data string, stat *zk.Stat) (string, os.Error) {
 			called = true
 			return "", nil
 		})
 	c.Assert(err, NotNil)
-	c.Assert(err.Code(), Equals, gozk.ZNOAUTH)
+	c.Assert(err, Equals, zk.ZNOAUTH)
 
-	stat, err := zk.Exists("/test")
+	stat, err := conn.Exists("/test")
 	c.Assert(err, IsNil)
 	c.Assert(stat, NotNil)
 	c.Assert(stat.Version(), Equals, int32(0))
@@ -227,22 +220,21 @@
 }
 
 func (s *S) TestRetryChangeFailsSetting(c *C) {
-	zk, _ := s.init(c)
+	conn, _ := s.init(c)
 
-	_, err := zk.Create("/test", "old", gozk.EPHEMERAL,
-		gozk.WorldACL(gozk.PERM_READ)) // Read only!
+	// Read only!
+	_, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_READ))
 	c.Assert(err, IsNil)
 
 	var called bool
-	err = zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL),
-		func(data string, stat gozk.Stat) (string, os.Error) {
+	err = conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL),
+		func(data string, stat *zk.Stat) (string, os.Error) {
 			called = true
 			return "", nil
 		})
-	c.Assert(err, NotNil)
-	c.Assert(err.Code(), Equals, gozk.ZNOAUTH)
+	c.Assert(err, Equals, zk.ZNOAUTH)
 
-	stat, err := zk.Exists("/test")
+	stat, err := conn.Exists("/test")
 	c.Assert(err, IsNil)
 	c.Assert(stat, NotNil)
 	c.Assert(stat.Version(), Equals, int32(0))
@@ -251,23 +243,22 @@
 }
 
 func (s *S) TestRetryChangeFailsCreating(c *C) {
-	zk, _ := s.init(c)
+	conn, _ := s.init(c)
 
-	_, err := zk.Create("/test", "old", gozk.EPHEMERAL,
-		gozk.WorldACL(gozk.PERM_READ)) // Read only!
+	// Read only!
+	_, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_READ))
 	c.Assert(err, IsNil)
 
 	var called bool
-	err = zk.RetryChange("/test/sub", gozk.EPHEMERAL,
-		gozk.WorldACL(gozk.PERM_ALL),
-		func(data string, stat gozk.Stat) (string, os.Error) {
+	err = conn.RetryChange("/test/sub", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL),
+		func(data string, stat *zk.Stat) (string, os.Error) {
 			called = true
 			return "", nil
 		})
 	c.Assert(err, NotNil)
-	c.Assert(err.Code(), Equals, gozk.ZNOAUTH)
+	c.Assert(err, Equals, zk.ZNOAUTH)
 
-	stat, err := zk.Exists("/test/sub")
+	stat, err := conn.Exists("/test/sub")
 	c.Assert(err, IsNil)
 	c.Assert(stat, IsNil)
 
=== added file 'server.go'
--- server.go 1970-01-01 00:00:00 +0000
+++ server.go 2011-10-03 15:02:28 +0000
@@ -0,0 +1,239 @@
+package zk
+
+import (
+	"bufio"
+	"bytes"
+	"fmt"
+	"io/ioutil"
+	"net"
+	"os"
+	"path/filepath"
+	"strings"
+)
+
+// Server represents a ZooKeeper server, its data and configuration files.
+type Server struct {
+	runDir     string
+	installDir string
+}
+
+func (srv *Server) Directory() string {
+	return srv.runDir
+}
+
+// CreateServer creates the directory runDir and sets up a ZooKeeper server
+// environment inside it. It is an error if runDir already exists.
+// The server will listen on the specified TCP port.
+//
+// The ZooKeeper installation directory is specified by installDir.
+// If this is empty, a system default will be used.
+//
+// CreateServer does not start the server - the *Server that
+// is returned can be used with the service package, for example:
+// see launchpad.net/gozk/zk/service.
+func CreateServer(port int, runDir, installDir string) (*Server, os.Error) {
+	if err := os.Mkdir(runDir, 0777); err != nil {
+		return nil, err
+	}
+	srv := &Server{runDir: runDir, installDir: installDir}
+	if err := srv.writeLog4JConfig(); err != nil {
+		return nil, err
+	}
+	if err := srv.writeZooKeeperConfig(port); err != nil {
+		return nil, err
+	}
+	if err := srv.writeInstallDir(); err != nil {
+		return nil, err
+	}
+	return srv, nil
+}
+
+// AttachServer creates a new ZooKeeper Server instance
+// to operate inside an existing run directory, runDir.
+// The directory must have been created with CreateServer.
+func AttachServer(runDir string) (*Server, os.Error) {
+	srv := &Server{runDir: runDir}
+	if err := srv.readInstallDir(); err != nil {
+		return nil, fmt.Errorf("cannot read server install directory: %v", err)
+	}
+	return srv, nil
+}
+
+func (srv *Server) path(name string) string {
+	return filepath.Join(srv.runDir, name)
+}
+
+func (srv *Server) CheckAvailability() os.Error {
+	port, err := srv.NetworkPort()
+	if err != nil {
+		return fmt.Errorf("cannot get network port: %v", err)
+	}
+	l, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port))
+	if err != nil {
+		return fmt.Errorf("cannot listen on port %v: %v", port, err)
+	}
+	l.Close()
+	return nil
+}
+
+// ServerCommand returns the command used to start the
+// ZooKeeper server. It is provided for debugging and testing
+// purposes only.
+func (srv *Server) Command() ([]string, os.Error) {
+	cp, err := srv.classPath()
+	if err != nil {
+		return nil, fmt.Errorf("cannot get class path: %v", err)
+	}
+	return []string{
+		"java",
+		"-cp", strings.Join(cp, ":"),
+		"-Dzookeeper.root.logger=INFO,CONSOLE",
+		"-Dlog4j.configuration=file:"+srv.path("log4j.properties"),
+		"org.apache.zookeeper.server.quorum.QuorumPeerMain",
+		srv.path("zoo.cfg"),
+	}, nil
+}
+
+var log4jProperties = `
+log4j.rootLogger=INFO, CONSOLE
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.Threshold=INFO
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n
+`
+
+func (srv *Server) writeLog4JConfig() (err os.Error) {
+	return ioutil.WriteFile(srv.path("log4j.properties"), []byte(log4jProperties), 0666)
+}
+
+func (srv *Server) writeZooKeeperConfig(port int) (err os.Error) {
+	return ioutil.WriteFile(srv.path("zoo.cfg"), []byte(fmt.Sprintf(
+		"tickTime=2000\n"+
+		"dataDir=%s\n"+
+		"clientPort=%d\n"+
+		"maxClientCnxns=500\n",
+		srv.runDir, port)), 0666)
+}
+
+// NetworkPort returns the TCP port number that
+// the server is configured for.
+func (srv *Server) NetworkPort() (int, os.Error) {
+	f, err := os.Open(srv.path("zoo.cfg"))
+	if err != nil {
+		return 0, err
+	}
+	r := bufio.NewReader(f)
+	for {
+		line, err := r.ReadSlice('\n')
+		if err != nil {
+			return 0, fmt.Errorf("cannot get port from %q", srv.path("zoo.cfg"))
+		}
+		var port int
+		if n, _ := fmt.Sscanf(string(line), "clientPort=%d\n", &port); n == 1 {
+			return port, nil
+		}
+	}
+	panic("not reached")
+}
+
+func (srv *Server) writeInstallDir() os.Error {
+	return ioutil.WriteFile(srv.path("installdir.txt"), []byte(srv.installDir+"\n"), 0666)
+}
+
+func (srv *Server) readInstallDir() os.Error {
144 data, err := ioutil.ReadFile(srv.path("installdir.txt"))
145 if err != nil {
146 return err
147 }
148 if len(data) > 0 && data[len(data)-1] == '\n' {
149 data = data[0 : len(data)-1]
150 }
151 srv.installDir = string(data)
152 return nil
153}
154
155func (srv *Server) classPath() ([]string, os.Error) {
156 dir := srv.installDir
157 if dir == "" {
158 return systemClassPath()
159 }
160 if err := checkDirectory(dir); err != nil {
161 return nil, err
162 }
163 // Two possibilities, as seen in zkEnv.sh:
164 // 1) locally built binaries (jars are in build directory)
165 // 2) release binaries
166 if build := filepath.Join(dir, "build"); checkDirectory(build) == nil {
167 dir = build
168 }
169 classPath, err := filepath.Glob(filepath.Join(dir, "zookeeper-*.jar"))
170 if err != nil {
171 panic(fmt.Errorf("glob for jar files: %v", err))
172 }
173 more, err := filepath.Glob(filepath.Join(dir, "lib/*.jar"))
174 if err != nil {
175 panic(fmt.Errorf("glob for lib jar files: %v", err))
176 }
177
178 classPath = append(classPath, more...)
179 if len(classPath) == 0 {
180 return nil, fmt.Errorf("zookeeper libraries not found in %q", dir)
181 }
182 return classPath, nil
183}
184
185const zookeeperEnviron = "/etc/zookeeper/conf/environment"
186
187func systemClassPath() ([]string, os.Error) {
188 f, err := os.Open(zookeeperEnviron)
189 if f == nil {
190 return nil, err
191 }
192 r := bufio.NewReader(f)
193 for {
194 line, err := r.ReadSlice('\n')
195 if err != nil {
196 break
197 }
198 if !bytes.HasPrefix(line, []byte("CLASSPATH=")) {
199 continue
200 }
201
202 // remove variable and newline
203 path := string(line[len("CLASSPATH=") : len(line)-1])
204
205 // trim white space
206 path = strings.Trim(path, " \t\r")
207
208 // strip quotes
209 if path[0] == '"' {
210 path = path[1 : len(path)-1]
211 }
212
213 // split on :
214 classPath := strings.Split(path, ":")
215
216 // split off $ZOOCFGDIR
217 if len(classPath) > 0 && classPath[0] == "$ZOOCFGDIR" {
218 classPath = classPath[1:]
219 }
220
221 if len(classPath) == 0 {
222 return nil, fmt.Errorf("empty class path in %q", zookeeperEnviron)
223 }
224 return classPath, nil
225 }
226 return nil, fmt.Errorf("no class path found in %q", zookeeperEnviron)
227}
228
229// checkDirectory returns an error if the given path
230// does not exist or is not a directory.
231func checkDirectory(path string) os.Error {
232 if info, err := os.Stat(path); err != nil || !info.IsDirectory() {
233 if err == nil {
234 err = &os.PathError{Op: "stat", Path: path, Error: os.NewError("is not a directory")}
235 }
236 return err
237 }
238 return nil
239}
0240
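The port lookup done by NetworkPort in server.go above can be sketched in present-day Go. This is only an illustration under stated assumptions, not code from the branch: parseClientPort is a hypothetical helper, it takes the configuration text as a string instead of opening zoo.cfg, and it uses bufio.Scanner and the built-in error type rather than the os.Error API that the diff targets.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// parseClientPort is a hypothetical helper (not part of the branch).
// It scans configuration text line by line and returns the value of
// the first "clientPort=N" entry it finds.
func parseClientPort(cfg string) (int, error) {
	sc := bufio.NewScanner(strings.NewReader(cfg))
	for sc.Scan() {
		var port int
		// Sscanf reports one parsed item only when the line starts
		// with the literal prefix and is followed by an integer.
		if n, _ := fmt.Sscanf(sc.Text(), "clientPort=%d", &port); n == 1 {
			return port, nil
		}
	}
	if err := sc.Err(); err != nil {
		return 0, err
	}
	return 0, fmt.Errorf("no clientPort entry found")
}

func main() {
	// Same keys that writeZooKeeperConfig emits; the values are examples.
	cfg := "tickTime=2000\ndataDir=/tmp/zk\nclientPort=21812\nmaxClientCnxns=500\n"
	port, err := parseClientPort(cfg)
	fmt.Println(port, err)
}
```

Unlike the loop in the diff, the sketch distinguishes a read error from a missing entry, which may be worth considering if the function is revisited.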
=== added directory 'service'
=== added file 'service/Makefile'
--- service/Makefile 1970-01-01 00:00:00 +0000
+++ service/Makefile 2011-10-03 15:02:28 +0000
@@ -0,0 +1,20 @@
1include $(GOROOT)/src/Make.inc
2
3TARG=launchpad.net/gozk/zk/service
4
5GOFILES=\
6 service.go\
7
8GOFMT=gofmt
9BADFMT:=$(shell $(GOFMT) -l $(GOFILES) $(CGOFILES) $(wildcard *_test.go))
10
11gofmt: $(BADFMT)
12 @for F in $(BADFMT); do $(GOFMT) -w $$F && echo $$F; done
13
14ifneq ($(BADFMT),)
15ifneq ($(MAKECMDGOALS),gofmt)
16$(warning WARNING: make gofmt: $(BADFMT))
17endif
18endif
19
20include $(GOROOT)/src/Make.pkg
021
=== added file 'service/service.go'
--- service/service.go 1970-01-01 00:00:00 +0000
+++ service/service.go 2011-10-03 15:02:28 +0000
@@ -0,0 +1,160 @@
1// The service package provides support for long-running services.
2package service
3
4import (
5 "os"
6 "fmt"
7 "io/ioutil"
8 "strconv"
9 "path/filepath"
10 "exec"
11 "strings"
12 "time"
13)
14
15// Interface represents a single-process server.
16type Interface interface {
17 // Directory gives the path to the directory containing
18 // the server's configuration and data files. It is assumed that
19 // this exists and can be modified.
20 Directory() string
21
22 // CheckAvailability should return an error if resources
23 // required by the server are not available; for instance
24 // if the server requires a network port which is not free.
25 CheckAvailability() os.Error
26
27 // Command should return a command that can be
28 // used to run the server. The first element of the returned
29 // slice is the executable name; the rest of the elements are
30 // its arguments.
31 Command() ([]string, os.Error)
32}
33
34// Service represents a possibly running server process.
35type Service struct {
36 srv Interface
37}
38
39// New returns a new Service using the given
40// server interface.
41func New(srv Interface) *Service {
42 return &Service{srv}
43}
44
45// Process returns a reference to the identity
46// of the last running server in the Service's directory.
47// If the server was shut down, it returns (nil, nil).
48// The server process may not still be running
49// (for instance if it was terminated abnormally).
50// This function is provided for debugging and testing
51// purposes only.
52func (s *Service) Process() (*os.Process, os.Error) {
53 data, err := ioutil.ReadFile(s.path("pid.txt"))
54 if err != nil {
55 if err, ok := err.(*os.PathError); ok && err.Error == os.ENOENT {
56 return nil, nil
57 }
58 return nil, err
59 }
60 pid, err := strconv.Atoi(string(data))
61 if err != nil {
62 return nil, os.NewError("bad process id found in pid.txt")
63 }
64 return os.FindProcess(pid)
65}
66
67func (s *Service) path(name string) string {
68 return filepath.Join(s.srv.Directory(), name)
69}
70
71// Start starts the server. It returns an error if the server is already running.
72// It stores the process ID of the server in the file "pid.txt"
73// inside the server's directory. Stdout and stderr of the server
74// are similarly written to "log.txt".
75func (s *Service) Start() os.Error {
76 if err := s.srv.CheckAvailability(); err != nil {
77 return err
78 }
79 p, err := s.Process()
80 if p != nil || err != nil {
81 if p != nil {
82 p.Release()
83 }
84 return fmt.Errorf("server may already be running (remove %q to clear)", s.path("pid.txt"))
85 }
86
87 // Create the pid file before starting the process so that if two
88 // programs try to start a server in the same directory at the
89 // same time, only one succeeds.
90 pidf, err := os.OpenFile(s.path("pid.txt"), os.O_EXCL|os.O_CREATE|os.O_WRONLY, 0666)
91 if err != nil {
92 return fmt.Errorf("cannot create pid.txt: %v", err)
93 }
94 defer pidf.Close()
95
96 args, err := s.srv.Command()
97 if err != nil {
98 return fmt.Errorf("cannot determine command: %v", err)
99 }
100 cmd := exec.Command(args[0], args[1:]...)
101
102 logf, err := os.OpenFile(s.path("log.txt"), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666)
103 if err != nil {
104 return fmt.Errorf("cannot create log file: %v", err)
105 }
106 defer logf.Close()
107 cmd.Stdout = logf
108 cmd.Stderr = logf
109 if err := cmd.Start(); err != nil {
110 return fmt.Errorf("cannot start server: %v", err)
111 }
112 if _, err := fmt.Fprint(pidf, cmd.Process.Pid); err != nil {
113 return fmt.Errorf("cannot write pid file: %v", err)
114 }
115 return nil
116}
117
118// Stop kills the server. It is a no-op if the server is not running.
119func (s *Service) Stop() os.Error {
120 p, err := s.Process()
121 if p == nil {
122 if err != nil {
123 return fmt.Errorf("cannot read process ID of server: %v", err)
124 }
125 return nil
126 }
127 defer p.Release()
128 if err := p.Kill(); err != nil && !strings.Contains(err.String(), "no such process") {
129 return fmt.Errorf("cannot kill server process: %v", err)
130 }
131 // ignore the error returned from Wait because there's little
132 // we can do about it - it either means that the process has just exited
133 // anyway or that we can't wait for it for some other reason,
134 // for example because it was originally started by some other process.
135 _, err = p.Wait(0)
136 if err != nil && strings.Contains(err.String(), "no child processes") {
137 // If we can't wait for the server, it's possible that it was running
138 // but not as a child of this process, so give it a little while
139 // to exit. TODO poll with kill(no signal)?
140 time.Sleep(0.5e9)
141 }
142
143 if err := os.Remove(s.path("pid.txt")); err != nil {
144 return fmt.Errorf("cannot remove server process ID file: %v", err)
145 }
146 return nil
147}
148
149// Destroy stops the server, and then removes its
150// directory and all its contents.
151// Warning: this will destroy all data associated with the server.
152func (s *Service) Destroy() os.Error {
153 if err := s.Stop(); err != nil {
154 return err
155 }
156 if err := os.RemoveAll(s.srv.Directory()); err != nil {
157 return err
158 }
159 return nil
160}
0161
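Start in service.go above relies on os.O_EXCL so that only one of two racing callers can create pid.txt. A minimal sketch of that behaviour in present-day Go follows. The helper names claimPidFile and tryClaimTwice are assumptions made for illustration, and the sketch uses a temporary directory rather than a real server directory.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// claimPidFile (hypothetical) creates pid.txt inside dir. O_EXCL makes
// the call fail if the file already exists, so at most one caller wins.
func claimPidFile(dir string) (*os.File, error) {
	return os.OpenFile(filepath.Join(dir, "pid.txt"),
		os.O_EXCL|os.O_CREATE|os.O_WRONLY, 0666)
}

// tryClaimTwice (hypothetical) claims the pid file twice in a fresh
// temporary directory and returns the error from each attempt.
func tryClaimTwice() (error, error) {
	dir, err := os.MkdirTemp("", "service-sketch")
	if err != nil {
		return err, nil
	}
	defer os.RemoveAll(dir)

	f, firstErr := claimPidFile(dir)
	if firstErr != nil {
		return firstErr, nil
	}
	defer f.Close()
	fmt.Fprint(f, os.Getpid()) // record a pid, as Start does

	g, secondErr := claimPidFile(dir)
	if secondErr == nil {
		g.Close()
	}
	return nil, secondErr
}

func main() {
	first, second := tryClaimTwice()
	fmt.Println("first claim error:", first)
	fmt.Println("second claim rejected:", os.IsExist(second))
}
```

The same property explains why Stop must remove pid.txt: until the file is gone, every later Start fails at the exclusive create.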
=== modified file 'suite_test.go'
--- suite_test.go 2011-08-19 01:43:37 +0000
+++ suite_test.go 2011-10-03 15:02:28 +0000
@@ -1,65 +1,48 @@
1package gozk_test1package zk_test
22
3import (3import (
4 . "launchpad.net/gocheck"4 . "launchpad.net/gocheck"
5 "testing"
6 "io/ioutil"
7 "path"
8 "fmt"5 "fmt"
6 "launchpad.net/gozk/zk/service"
7 "launchpad.net/gozk/zk"
9 "os"8 "os"
10 "gozk"9 "testing"
11 "time"10 "time"
12)11)
1312
14func TestAll(t *testing.T) {13func TestAll(t *testing.T) {
15 TestingT(t)14 if !*reattach {
15 TestingT(t)
16 }
16}17}
1718
18var _ = Suite(&S{})19var _ = Suite(&S{})
1920
20type S struct {21type S struct {
21 zkRoot string22 zkServer *service.Service
22 zkTestRoot string23 zkTestRoot string
23 zkTestPort int24 zkAddr string
24 zkServerSh string
25 zkServerOut *os.File
26 zkAddr string
2725
28 handles []*gozk.ZooKeeper26 handles []*zk.Conn
29 events []*gozk.Event27 events []*zk.Event
30 liveWatches int28 liveWatches int
31 deadWatches chan bool29 deadWatches chan bool
32}30}
3331
34var logLevel = 0 //gozk.LOG_ERROR32var logLevel = 0 //zk.LOG_ERROR
3533
3634func (s *S) init(c *C) (*zk.Conn, chan zk.Event) {
37var testZooCfg = ("dataDir=%s\n" +35 conn, watch, err := zk.Dial(s.zkAddr, 5e9)
38 "clientPort=%d\n" +
39 "tickTime=2000\n" +
40 "initLimit=10\n" +
41 "syncLimit=5\n" +
42 "")
43
44var testLog4jPrp = ("log4j.rootLogger=INFO,CONSOLE\n" +
45 "log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender\n" +
46 "log4j.appender.CONSOLE.Threshold=DEBUG\n" +
47 "log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout\n" +
48 "log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n" +
49 "")
50
51func (s *S) init(c *C) (*gozk.ZooKeeper, chan gozk.Event) {
52 zk, watch, err := gozk.Init(s.zkAddr, 5e9)
53 c.Assert(err, IsNil)36 c.Assert(err, IsNil)
5437
55 s.handles = append(s.handles, zk)38 s.handles = append(s.handles, conn)
5639
57 event := <-watch40 event := <-watch
5841
59 c.Assert(event.Type, Equals, gozk.EVENT_SESSION)42 c.Assert(event.Type, Equals, zk.EVENT_SESSION)
60 c.Assert(event.State, Equals, gozk.STATE_CONNECTED)43 c.Assert(event.State, Equals, zk.STATE_CONNECTED)
6144
62 bufferedWatch := make(chan gozk.Event, 256)45 bufferedWatch := make(chan zk.Event, 256)
63 bufferedWatch <- event46 bufferedWatch <- event
6447
65 s.liveWatches += 148 s.liveWatches += 1
@@ -82,13 +65,13 @@
82 s.deadWatches <- true65 s.deadWatches <- true
83 }()66 }()
8467
85 return zk, bufferedWatch68 return conn, bufferedWatch
86}69}
8770
88func (s *S) SetUpTest(c *C) {71func (s *S) SetUpTest(c *C) {
89 c.Assert(gozk.CountPendingWatches(), Equals, 0,72 c.Assert(zk.CountPendingWatches(), Equals, 0,
90 Bug("Test got a dirty watch state before running!"))73 Bug("Test got a dirty watch state before running!"))
91 gozk.SetLogLevel(logLevel)74 zk.SetLogLevel(logLevel)
92}75}
9376
94func (s *S) TearDownTest(c *C) {77func (s *S) TearDownTest(c *C) {
@@ -108,98 +91,38 @@
108 }91 }
10992
110 // Reset the list of handles.93 // Reset the list of handles.
111 s.handles = make([]*gozk.ZooKeeper, 0)94 s.handles = make([]*zk.Conn, 0)
11295
113 c.Assert(gozk.CountPendingWatches(), Equals, 0,96 c.Assert(zk.CountPendingWatches(), Equals, 0,
114 Bug("Test left live watches behind!"))97 Bug("Test left live watches behind!"))
115}98}
11699
117// We use the suite set up and tear down to manage a custom zookeeper100// We use the suite set up and tear down to manage a custom ZooKeeper
118//101//
119func (s *S) SetUpSuite(c *C) {102func (s *S) SetUpSuite(c *C) {
120103 var err os.Error
121 s.deadWatches = make(chan bool)104 s.deadWatches = make(chan bool)
122105
123 var err os.Error106 // N.B. We need to create a subdirectory because zk.CreateServer
124107 // insists on creating its own directory.
125 s.zkRoot = os.Getenv("ZKROOT")108 s.zkTestRoot = c.MkDir() + "/zk"
126 if s.zkRoot == "" {109
127 panic("You must define $ZKROOT ($ZKROOT/bin/zkServer.sh is needed)")110 port := 21812
128 }111 s.zkAddr = fmt.Sprint("localhost:", port)
129112
130 s.zkTestRoot = c.MkDir()113 srv, err := zk.CreateServer(port, s.zkTestRoot, "")
131 s.zkTestPort = 21812114 if err != nil {
132115 c.Fatal("Cannot set up server environment: ", err)
133 println("ZooKeeper test server directory:", s.zkTestRoot)116 }
134 println("ZooKeeper test server port:", s.zkTestPort)117 s.zkServer = service.New(srv)
135118 err = s.zkServer.Start()
136 s.zkAddr = fmt.Sprintf("localhost:%d", s.zkTestPort)119 if err != nil {
137120 c.Fatal("Cannot start ZooKeeper server: ", err)
138 s.zkServerSh = path.Join(s.zkRoot, "bin/zkServer.sh")121 }
139 s.zkServerOut, err = os.OpenFile(path.Join(s.zkTestRoot, "stdout.txt"),
140 os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
141 if err != nil {
142 panic("Can't open stdout.txt file for server: " + err.String())
143 }
144
145 dataDir := path.Join(s.zkTestRoot, "data")
146 confDir := path.Join(s.zkTestRoot, "conf")
147
148 os.Mkdir(dataDir, 0755)
149 os.Mkdir(confDir, 0755)
150
151 err = os.Setenv("ZOOCFGDIR", confDir)
152 if err != nil {
153 panic("Can't set $ZOOCFGDIR: " + err.String())
154 }
155
156 zooCfg := []byte(fmt.Sprintf(testZooCfg, dataDir, s.zkTestPort))
157 err = ioutil.WriteFile(path.Join(confDir, "zoo.cfg"), zooCfg, 0644)
158 if err != nil {
159 panic("Can't write zoo.cfg: " + err.String())
160 }
161
162 log4jPrp := []byte(testLog4jPrp)
163 err = ioutil.WriteFile(path.Join(confDir, "log4j.properties"), log4jPrp, 0644)
164 if err != nil {
165 panic("Can't write log4j.properties: " + err.String())
166 }
167
168 s.StartZK()
169}122}
170123
171func (s *S) TearDownSuite(c *C) {124func (s *S) TearDownSuite(c *C) {
172 s.StopZK()125 if s.zkServer != nil {
173 s.zkServerOut.Close()126 s.zkServer.Stop() // TODO Destroy
174}
175
176func (s *S) StartZK() {
177 attr := os.ProcAttr{Files: []*os.File{os.Stdin, s.zkServerOut, os.Stderr}}
178 proc, err := os.StartProcess(s.zkServerSh, []string{s.zkServerSh, "start"}, &attr)
179 if err != nil {
180 panic("Problem executing zkServer.sh start: " + err.String())
181 }
182
183 result, err := proc.Wait(0)
184 if err != nil {
185 panic(err.String())
186 } else if result.ExitStatus() != 0 {
187 panic("'zkServer.sh start' exited with non-zero status")
188 }
189}
190
191func (s *S) StopZK() {
192 attr := os.ProcAttr{Files: []*os.File{os.Stdin, s.zkServerOut, os.Stderr}}
193 proc, err := os.StartProcess(s.zkServerSh, []string{s.zkServerSh, "stop"}, &attr)
194 if err != nil {
195 panic("Problem executing zkServer.sh stop: " + err.String() +
196 " (look for runaway java processes!)")
197 }
198 result, err := proc.Wait(0)
199 if err != nil {
200 panic(err.String())
201 } else if result.ExitStatus() != 0 {
202 panic("'zkServer.sh stop' exited with non-zero status " +
203 "(look for runaway java processes!)")
204 }127 }
205}128}
206129
=== renamed file 'gozk.go' => 'zk.go'
--- gozk.go 2011-08-19 01:56:39 +0000
+++ zk.go 2011-10-03 15:02:28 +0000
@@ -1,4 +1,4 @@
1// gozk - Zookeeper support for the Go language1// gozk - ZooKeeper support for the Go language
2//2//
3// https://wiki.ubuntu.com/gozk3// https://wiki.ubuntu.com/gozk
4//4//
@@ -6,7 +6,7 @@
6//6//
7// Written by Gustavo Niemeyer <gustavo.niemeyer@canonical.com>7// Written by Gustavo Niemeyer <gustavo.niemeyer@canonical.com>
8//8//
9package gozk9package zk
1010
11/*11/*
12#cgo CFLAGS: -I/usr/include/c-client-src12#cgo CFLAGS: -I/usr/include/c-client-src
@@ -27,17 +27,16 @@
27// -----------------------------------------------------------------------27// -----------------------------------------------------------------------
28// Main constants and data types.28// Main constants and data types.
2929
30// The main ZooKeeper object, created through the Init function.30// Conn represents a connection to a set of ZooKeeper nodes.
31// Encapsulates all communication with ZooKeeper.31type Conn struct {
32type ZooKeeper struct {
33 watchChannels map[uintptr]chan Event32 watchChannels map[uintptr]chan Event
34 sessionWatchId uintptr33 sessionWatchId uintptr
35 handle *C.zhandle_t34 handle *C.zhandle_t
36 mutex sync.Mutex35 mutex sync.Mutex
37}36}
3837
39// ClientId represents the established session in ZooKeeper. This is only38// ClientId represents an established ZooKeeper session. It can be
40// useful to be passed back into the ReInit function.39// passed into Redial to reestablish a connection to an existing session.
41type ClientId struct {40type ClientId struct {
42 cId C.clientid_t41 cId C.clientid_t
43}42}
@@ -98,35 +97,60 @@
98 State int97 State int
99}98}
10099
101// Error codes that may be used to verify the result of the100// Error represents a ZooKeeper error.
102// Code method from Error.101type Error int
102
103const (103const (
104 ZOK = C.ZOK104 ZOK Error = C.ZOK
105 ZSYSTEMERROR = C.ZSYSTEMERROR105 ZSYSTEMERROR Error = C.ZSYSTEMERROR
106 ZRUNTIMEINCONSISTENCY = C.ZRUNTIMEINCONSISTENCY106 ZRUNTIMEINCONSISTENCY Error = C.ZRUNTIMEINCONSISTENCY
107 ZDATAINCONSISTENCY = C.ZDATAINCONSISTENCY107 ZDATAINCONSISTENCY Error = C.ZDATAINCONSISTENCY
108 ZCONNECTIONLOSS = C.ZCONNECTIONLOSS108 ZCONNECTIONLOSS Error = C.ZCONNECTIONLOSS
109 ZMARSHALLINGERROR = C.ZMARSHALLINGERROR109 ZMARSHALLINGERROR Error = C.ZMARSHALLINGERROR
110 ZUNIMPLEMENTED = C.ZUNIMPLEMENTED110 ZUNIMPLEMENTED Error = C.ZUNIMPLEMENTED
111 ZOPERATIONTIMEOUT = C.ZOPERATIONTIMEOUT111 ZOPERATIONTIMEOUT Error = C.ZOPERATIONTIMEOUT
112 ZBADARGUMENTS = C.ZBADARGUMENTS112 ZBADARGUMENTS Error = C.ZBADARGUMENTS
113 ZINVALIDSTATE = C.ZINVALIDSTATE113 ZINVALIDSTATE Error = C.ZINVALIDSTATE
114 ZAPIERROR = C.ZAPIERROR114 ZAPIERROR Error = C.ZAPIERROR
115 ZNONODE = C.ZNONODE115 ZNONODE Error = C.ZNONODE
116 ZNOAUTH = C.ZNOAUTH116 ZNOAUTH Error = C.ZNOAUTH
117 ZBADVERSION = C.ZBADVERSION117 ZBADVERSION Error = C.ZBADVERSION
118 ZNOCHILDRENFOREPHEMERALS = C.ZNOCHILDRENFOREPHEMERALS118 ZNOCHILDRENFOREPHEMERALS Error = C.ZNOCHILDRENFOREPHEMERALS
119 ZNODEEXISTS = C.ZNODEEXISTS119 ZNODEEXISTS Error = C.ZNODEEXISTS
120 ZNOTEMPTY = C.ZNOTEMPTY120 ZNOTEMPTY Error = C.ZNOTEMPTY
121 ZSESSIONEXPIRED = C.ZSESSIONEXPIRED121 ZSESSIONEXPIRED Error = C.ZSESSIONEXPIRED
122 ZINVALIDCALLBACK = C.ZINVALIDCALLBACK122 ZINVALIDCALLBACK Error = C.ZINVALIDCALLBACK
123 ZINVALIDACL = C.ZINVALIDACL123 ZINVALIDACL Error = C.ZINVALIDACL
124 ZAUTHFAILED = C.ZAUTHFAILED124 ZAUTHFAILED Error = C.ZAUTHFAILED
125 ZCLOSING = C.ZCLOSING125 ZCLOSING Error = C.ZCLOSING
126 ZNOTHING = C.ZNOTHING126 ZNOTHING Error = C.ZNOTHING
127 ZSESSIONMOVED = C.ZSESSIONMOVED127 ZSESSIONMOVED Error = C.ZSESSIONMOVED
128)128)
129129
130func (error Error) String() string {
131 return C.GoString(C.zerror(C.int(error))) // Static, no need to free it.
132}
133
134// zkError creates an appropriate error return from
135// a ZooKeeper status and the errno return from a C API
136// call.
137func zkError(rc C.int, cerr os.Error) os.Error {
138 code := Error(rc)
139 switch code {
140 case ZOK:
141 return nil
142
143 case ZSYSTEMERROR:
144 // If a ZooKeeper call returns ZSYSTEMERROR, then
145 // errno becomes significant. If errno has not been
146 // set, then we will return ZSYSTEMERROR nonetheless.
147 if cerr != nil {
148 return cerr
149 }
150 }
151 return code
152}
153
130// Constants for SetLogLevel.154// Constants for SetLogLevel.
131const (155const (
132 LOG_ERROR = C.ZOO_LOG_LEVEL_ERROR156 LOG_ERROR = C.ZOO_LOG_LEVEL_ERROR
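The hunk above replaces the Error interface with an integer type that is itself an error value, plus a zkError helper that maps a return code to nil, to the system error, or to the code. The shape of that pattern can be sketched without cgo. Everything here is an assumption made for illustration: Code, its three constants, their numeric values, and toError are placeholders rather than the zk package API, and the sketch implements the modern Error() method where the diff implements String() for os.Error.

```go
package main

import (
	"errors"
	"fmt"
)

// Code is a hypothetical stand-in for zk.Error: an integer status
// that can be returned directly as an error.
type Code int

// Placeholder values chosen for the sketch only.
const (
	OK          Code = 0
	SystemError Code = -1
	NoNode      Code = -101
)

// Error makes Code satisfy the built-in error interface.
func (c Code) Error() string {
	switch c {
	case OK:
		return "ok"
	case SystemError:
		return "system error"
	case NoNode:
		return "no node"
	}
	return fmt.Sprintf("unknown error code %d", int(c))
}

// toError mirrors the structure of zkError: success becomes nil, a
// system error prefers the underlying OS error when one is available,
// and every other status is returned as the code itself.
func toError(rc int, sysErr error) error {
	code := Code(rc)
	switch code {
	case OK:
		return nil
	case SystemError:
		if sysErr != nil {
			return sysErr
		}
	}
	return code
}

func main() {
	fmt.Println(toError(0, nil))
	fmt.Println(toError(-101, nil) == NoNode)
	fmt.Println(toError(-1, errors.New("connection refused")))
}
```

One consequence of this design is that callers can compare a returned error against a constant directly, instead of calling a Code method as the removed interface required.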
@@ -273,104 +297,67 @@
273}297}
274298
275// -----------------------------------------------------------------------299// -----------------------------------------------------------------------
276// Error interface which maps onto the ZooKeeper error codes.
277
278type Error interface {
279 String() string
280 Code() int
281}
282
283type errorType struct {
284 zkrc C.int
285 err os.Error
286}
287
288func newError(zkrc C.int, err os.Error) Error {
289 return &errorType{zkrc, err}
290}
291
292func (error *errorType) String() (result string) {
293 if error.zkrc == ZSYSTEMERROR && error.err != nil {
294 result = error.err.String()
295 } else {
296 result = C.GoString(C.zerror(error.zkrc)) // Static, no need to free it.
297 }
298 return
299}
300
301// Code returns the error code that may be compared against one of
302// the gozk.Z* constants.
303func (error *errorType) Code() int {
304 return int(error.zkrc)
305}
306
307// -----------------------------------------------------------------------
308// Stat interface which maps onto the ZooKeeper Stat struct.
309
310// We declare this as an interface rather than an actual struct because
311// this way we don't have to copy data around between the real C struct
312// and the Go one on every call. Most uses will only touch a few elements,
313// or even ignore the stat entirely, so that's a win.
314300
315// Stat contains detailed information about a node.301// Stat contains detailed information about a node.
316type Stat interface {302type Stat struct {
317 Czxid() int64303 c C.struct_Stat
318 Mzxid() int64304}
319 CTime() int64305
320 MTime() int64306func (stat *Stat) String() string {
321 Version() int32307 return fmt.Sprintf(
322 CVersion() int32308 "{Czxid: %d; Mzxid: %d; CTime: %d; MTime: %d; "+
323 AVersion() int32309 "Version: %d; CVersion: %d; AVersion: %d; "+
324 EphemeralOwner() int64310 "EphemeralOwner: %d; DataLength: %d; "+
325 DataLength() int32311 "NumChildren: %d; Pzxid: %d}",
326 NumChildren() int32312 stat.Czxid(), stat.Mzxid(), stat.CTime(), stat.MTime(),
327 Pzxid() int64313 stat.Version(), stat.CVersion(), stat.AVersion(),
328}314 stat.EphemeralOwner(), stat.DataLength(),
329315 stat.NumChildren(), stat.Pzxid(),
330type resultStat C.struct_Stat316 )
331317}
332func (stat *resultStat) Czxid() int64 {318
333 return int64(stat.czxid)319func (stat *Stat) Czxid() int64 {
334}320 return int64(stat.c.czxid)
335321}
336func (stat *resultStat) Mzxid() int64 {322
337 return int64(stat.mzxid)323func (stat *Stat) Mzxid() int64 {
338}324 return int64(stat.c.mzxid)
339325}
340func (stat *resultStat) CTime() int64 {326
341 return int64(stat.ctime)327func (stat *Stat) CTime() int64 {
342}328 return int64(stat.c.ctime)
343329}
344func (stat *resultStat) MTime() int64 {330
345 return int64(stat.mtime)331func (stat *Stat) MTime() int64 {
346}332 return int64(stat.c.mtime)
347333}
348func (stat *resultStat) Version() int32 {334
349 return int32(stat.version)335func (stat *Stat) Version() int32 {
350}336 return int32(stat.c.version)
351337}
352func (stat *resultStat) CVersion() int32 {338
353 return int32(stat.cversion)339func (stat *Stat) CVersion() int32 {
354}340 return int32(stat.c.cversion)
355341}
356func (stat *resultStat) AVersion() int32 {342
357 return int32(stat.aversion)343func (stat *Stat) AVersion() int32 {
358}344 return int32(stat.c.aversion)
359345}
360func (stat *resultStat) EphemeralOwner() int64 {346
361 return int64(stat.ephemeralOwner)347func (stat *Stat) EphemeralOwner() int64 {
362}348 return int64(stat.c.ephemeralOwner)
363349}
364func (stat *resultStat) DataLength() int32 {350
365 return int32(stat.dataLength)351func (stat *Stat) DataLength() int32 {
366}352 return int32(stat.c.dataLength)
367353}
368func (stat *resultStat) NumChildren() int32 {354
369 return int32(stat.numChildren)355func (stat *Stat) NumChildren() int32 {
370}356 return int32(stat.c.numChildren)
371357}
372func (stat *resultStat) Pzxid() int64 {358
373 return int64(stat.pzxid)359func (stat *Stat) Pzxid() int64 {
360 return int64(stat.c.pzxid)
374}361}
375362
376// -----------------------------------------------------------------------363// -----------------------------------------------------------------------
@@ -384,7 +371,7 @@
384 C.zoo_set_debug_level(C.ZooLogLevel(level))371 C.zoo_set_debug_level(C.ZooLogLevel(level))
385}372}
386373
387// Init initializes the communication with a ZooKeeper cluster. The provided374// Dial initializes the communication with a ZooKeeper cluster. The provided
388// servers parameter may include multiple server addresses, separated375// servers parameter may include multiple server addresses, separated
389// by commas, so that the client will automatically attempt to connect376// by commas, so that the client will automatically attempt to connect
390// to another server if one of them stops working for whatever reason.377// to another server if one of them stops working for whatever reason.
@@ -398,79 +385,73 @@
398// The watch channel receives events of type SESSION_EVENT when any change385// The watch channel receives events of type SESSION_EVENT when any change
399// to the state of the established connection happens. See the documentation386// to the state of the established connection happens. See the documentation
400// for the Event type for more details.387// for the Event type for more details.
401func Init(servers string, recvTimeoutNS int64) (zk *ZooKeeper, watch chan Event, err Error) {388func Dial(servers string, recvTimeoutNS int64) (*Conn, <-chan Event, os.Error) {
402 zk, watch, err = internalInit(servers, recvTimeoutNS, nil)389 return dial(servers, recvTimeoutNS, nil)
403 return
404}390}
405391
406// Equivalent to Init, but attempt to reestablish an existing session392// Redial is equivalent to Dial, but attempts to reestablish an existing session
407// identified via the clientId parameter.393// identified via the clientId parameter.
408func ReInit(servers string, recvTimeoutNS int64, clientId *ClientId) (zk *ZooKeeper, watch chan Event, err Error) {394func Redial(servers string, recvTimeoutNS int64, clientId *ClientId) (*Conn, <-chan Event, os.Error) {
409 zk, watch, err = internalInit(servers, recvTimeoutNS, clientId)395 return dial(servers, recvTimeoutNS, clientId)
410 return
411}396}
412397
413func internalInit(servers string, recvTimeoutNS int64, clientId *ClientId) (*ZooKeeper, chan Event, Error) {398func dial(servers string, recvTimeoutNS int64, clientId *ClientId) (*Conn, <-chan Event, os.Error) {
414399 conn := &Conn{}
415 zk := &ZooKeeper{}400 conn.watchChannels = make(map[uintptr]chan Event)
416 zk.watchChannels = make(map[uintptr]chan Event)
417401
418 var cId *C.clientid_t402 var cId *C.clientid_t
419 if clientId != nil {403 if clientId != nil {
420 cId = &clientId.cId404 cId = &clientId.cId
421 }405 }
422406
423 watchId, watchChannel := zk.createWatch(true)407 watchId, watchChannel := conn.createWatch(true)
424 zk.sessionWatchId = watchId408 conn.sessionWatchId = watchId
425409
426 cservers := C.CString(servers)410 cservers := C.CString(servers)
427 handle, cerr := C.zookeeper_init(cservers, C.watch_handler, C.int(recvTimeoutNS / 1e6), cId, unsafe.Pointer(watchId), 0)411 handle, cerr := C.zookeeper_init(cservers, C.watch_handler, C.int(recvTimeoutNS/1e6), cId, unsafe.Pointer(watchId), 0)
428 C.free(unsafe.Pointer(cservers))412 C.free(unsafe.Pointer(cservers))
429 if handle == nil {413 if handle == nil {
430 zk.closeAllWatches()414 conn.closeAllWatches()
431 return nil, nil, newError(ZSYSTEMERROR, cerr)415 return nil, nil, zkError(C.int(ZSYSTEMERROR), cerr)
432 }416 }
433 zk.handle = handle417 conn.handle = handle
434 runWatchLoop()418 runWatchLoop()
435 return zk, watchChannel, nil419 return conn, watchChannel, nil
436}420}
437421
438// ClientId returns the client ID for the existing session with ZooKeeper.422// ClientId returns the client ID for the existing session with ZooKeeper.
439// This is useful to reestablish an existing session via ReInit.423// This is useful to reestablish an existing session via ReInit.
440func (zk *ZooKeeper) ClientId() *ClientId {424func (conn *Conn) ClientId() *ClientId {
441 return &ClientId{*C.zoo_client_id(zk.handle)}425 return &ClientId{*C.zoo_client_id(conn.handle)}
442}426}
443427
444// Close terminates the ZooKeeper interaction.428// Close terminates the ZooKeeper interaction.
445func (zk *ZooKeeper) Close() Error {429func (conn *Conn) Close() os.Error {
446430
447 // Protect from concurrency around zk.handle change.431 // Protect from concurrency around conn.handle change.
448 zk.mutex.Lock()432 conn.mutex.Lock()
449 defer zk.mutex.Unlock()433 defer conn.mutex.Unlock()
450434
451 if zk.handle == nil {435 if conn.handle == nil {
452 // ZooKeeper may hang indefinitely if a handler is closed twice,436 // ZooKeeper may hang indefinitely if a handler is closed twice,
453 // so we get in the way and prevent it from happening.437 // so we get in the way and prevent it from happening.
454 return newError(ZCLOSING, nil)438 return ZCLOSING
455 }439 }
456 rc, cerr := C.zookeeper_close(zk.handle)440 rc, cerr := C.zookeeper_close(conn.handle)
457441
458 zk.closeAllWatches()442 conn.closeAllWatches()
459 stopWatchLoop()443 stopWatchLoop()
460444
461 // At this point, nothing else should need zk.handle.445 // At this point, nothing else should need conn.handle.
462 zk.handle = nil446 conn.handle = nil
463447
464 if rc != C.ZOK {448 return zkError(rc, cerr)
465 return newError(rc, cerr)
466 }
467 return nil
468}449}
469450
470// Get returns the data and status from an existing node. err will be nil,451// Get returns the data and status from an existing node. err will be nil,
471// unless an error is found. Attempting to retrieve data from a non-existing452// unless an error is found. Attempting to retrieve data from a non-existing
472// node is an error.453// node is an error.
473func (zk *ZooKeeper) Get(path string) (data string, stat Stat, err Error) {454func (conn *Conn) Get(path string) (data string, stat *Stat, err os.Error) {
474455
475 cpath := C.CString(path)456 cpath := C.CString(path)
476 cbuffer := (*C.char)(C.malloc(bufferSize))457 cbuffer := (*C.char)(C.malloc(bufferSize))
@@ -478,22 +459,21 @@
478 defer C.free(unsafe.Pointer(cpath))459 defer C.free(unsafe.Pointer(cpath))
479 defer C.free(unsafe.Pointer(cbuffer))460 defer C.free(unsafe.Pointer(cbuffer))
480461
481 cstat := C.struct_Stat{}462 var cstat Stat
482 rc, cerr := C.zoo_wget(zk.handle, cpath, nil, nil,463 rc, cerr := C.zoo_wget(conn.handle, cpath, nil, nil, cbuffer, &cbufferLen, &cstat.c)
483 cbuffer, &cbufferLen, &cstat)
484 if rc != C.ZOK {464 if rc != C.ZOK {
485 return "", nil, newError(rc, cerr)465 return "", nil, zkError(rc, cerr)
486 }466 }
467
487 result := C.GoStringN(cbuffer, cbufferLen)468 result := C.GoStringN(cbuffer, cbufferLen)
488469 return result, &cstat, nil
489 return result, (*resultStat)(&cstat), nil
490}470}
491471
492// GetW works like Get but also returns a channel that will receive472// GetW works like Get but also returns a channel that will receive
493// a single Event value when the data or existence of the given ZooKeeper473// a single Event value when the data or existence of the given ZooKeeper
494// node changes or when critical session events happen. See the474// node changes or when critical session events happen. See the
495// documentation of the Event type for more details.475// documentation of the Event type for more details.
496func (zk *ZooKeeper) GetW(path string) (data string, stat Stat, watch chan Event, err Error) {476func (conn *Conn) GetW(path string) (data string, stat *Stat, watch <-chan Event, err os.Error) {
497477
498 cpath := C.CString(path)478 cpath := C.CString(path)
499 cbuffer := (*C.char)(C.malloc(bufferSize))479 cbuffer := (*C.char)(C.malloc(bufferSize))
@@ -501,42 +481,38 @@
501 defer C.free(unsafe.Pointer(cpath))481 defer C.free(unsafe.Pointer(cpath))
502 defer C.free(unsafe.Pointer(cbuffer))482 defer C.free(unsafe.Pointer(cbuffer))
503483
504 watchId, watchChannel := zk.createWatch(true)484 watchId, watchChannel := conn.createWatch(true)
505485
506 cstat := C.struct_Stat{}486 var cstat Stat
507 rc, cerr := C.zoo_wget(zk.handle, cpath,487 rc, cerr := C.zoo_wget(conn.handle, cpath, C.watch_handler, unsafe.Pointer(watchId), cbuffer, &cbufferLen, &cstat.c)
508 C.watch_handler, unsafe.Pointer(watchId),
509 cbuffer, &cbufferLen, &cstat)
510 if rc != C.ZOK {488 if rc != C.ZOK {
511 zk.forgetWatch(watchId)489 conn.forgetWatch(watchId)
512 return "", nil, nil, newError(rc, cerr)490 return "", nil, nil, zkError(rc, cerr)
513 }491 }
514492
515 result := C.GoStringN(cbuffer, cbufferLen)493 result := C.GoStringN(cbuffer, cbufferLen)
516 return result, (*resultStat)(&cstat), watchChannel, nil494 return result, &cstat, watchChannel, nil
517}495}
518496
519// Children returns the children list and status from an existing node.497// Children returns the children list and status from an existing node.
520// err will be nil, unless an error is found. Attempting to retrieve the498// Attempting to retrieve the children list from a non-existent node is an error.
521// children list from a non-existent node is an error.499func (conn *Conn) Children(path string) (children []string, stat *Stat, err os.Error) {
522func (zk *ZooKeeper) Children(path string) (children []string, stat Stat, err Error) {
523500
524 cpath := C.CString(path)501 cpath := C.CString(path)
525 defer C.free(unsafe.Pointer(cpath))502 defer C.free(unsafe.Pointer(cpath))
526503
527 cvector := C.struct_String_vector{}504 cvector := C.struct_String_vector{}
528 cstat := C.struct_Stat{}505 var cstat Stat
529 rc, cerr := C.zoo_wget_children2(zk.handle, cpath, nil, nil,506 rc, cerr := C.zoo_wget_children2(conn.handle, cpath, nil, nil, &cvector, &cstat.c)
530 &cvector, &cstat)
531507
532 // Can't happen if rc != 0, but avoid potential memory leaks in the future.508 // Can't happen if rc != 0, but avoid potential memory leaks in the future.
533 if cvector.count != 0 {509 if cvector.count != 0 {
534 children = parseStringVector(&cvector)510 children = parseStringVector(&cvector)
535 }511 }
536 if rc != C.ZOK {512 if rc == C.ZOK {
537 err = newError(rc, cerr)513 stat = &cstat
538 } else {514 } else {
539 stat = (*resultStat)(&cstat)515 err = zkError(rc, cerr)
540 }516 }
541 return517 return
542}518}
@@ -545,29 +521,27 @@
545// receive a single Event value when a node is added or removed under the521// receive a single Event value when a node is added or removed under the
546// provided path or when critical session events happen. See the documentation522// provided path or when critical session events happen. See the documentation
547// of the Event type for more details.523// of the Event type for more details.
548func (zk *ZooKeeper) ChildrenW(path string) (children []string, stat Stat, watch chan Event, err Error) {524func (conn *Conn) ChildrenW(path string) (children []string, stat *Stat, watch <-chan Event, err os.Error) {
549525
550 cpath := C.CString(path)526 cpath := C.CString(path)
551 defer C.free(unsafe.Pointer(cpath))527 defer C.free(unsafe.Pointer(cpath))
552528
553 watchId, watchChannel := zk.createWatch(true)529 watchId, watchChannel := conn.createWatch(true)
554530
555 cvector := C.struct_String_vector{}531 cvector := C.struct_String_vector{}
556 cstat := C.struct_Stat{}532 var cstat Stat
557 rc, cerr := C.zoo_wget_children2(zk.handle, cpath,533 rc, cerr := C.zoo_wget_children2(conn.handle, cpath, C.watch_handler, unsafe.Pointer(watchId), &cvector, &cstat.c)
558 C.watch_handler, unsafe.Pointer(watchId),
559 &cvector, &cstat)
560534
561 // Can't happen if rc != 0, but avoid potential memory leaks in the future.535 // Can't happen if rc != 0, but avoid potential memory leaks in the future.
562 if cvector.count != 0 {536 if cvector.count != 0 {
563 children = parseStringVector(&cvector)537 children = parseStringVector(&cvector)
564 }538 }
565 if rc != C.ZOK {539 if rc == C.ZOK {
566 zk.forgetWatch(watchId)540 stat = &cstat
567 err = newError(rc, cerr)541 watch = watchChannel
568 } else {542 } else {
569 stat = (*resultStat)(&cstat)543 conn.forgetWatch(watchId)
570 watch = watchChannel544 err = zkError(rc, cerr)
571 }545 }
572 return546 return
573}547}
@@ -588,20 +562,20 @@
588// Exists checks if a node exists at the given path. If it does,562// Exists checks if a node exists at the given path. If it does,
589// stat will contain meta information on the existing node, otherwise563// stat will contain meta information on the existing node, otherwise
590// it will be nil.564// it will be nil.
591func (zk *ZooKeeper) Exists(path string) (stat Stat, err Error) {565func (conn *Conn) Exists(path string) (stat *Stat, err os.Error) {
592 cpath := C.CString(path)566 cpath := C.CString(path)
593 defer C.free(unsafe.Pointer(cpath))567 defer C.free(unsafe.Pointer(cpath))
594568
595 cstat := C.struct_Stat{}569 var cstat Stat
596 rc, cerr := C.zoo_wexists(zk.handle, cpath, nil, nil, &cstat)570 rc, cerr := C.zoo_wexists(conn.handle, cpath, nil, nil, &cstat.c)
597571
598 // We diverge a bit from the usual here: a ZNONODE is not an error572 // We diverge a bit from the usual here: a ZNONODE is not an error
599 // for an exists call, otherwise every Exists call would have to check573 // for an exists call, otherwise every Exists call would have to check
600 // for err != nil and err.Code() != ZNONODE.574 // for err != nil and err.Code() != ZNONODE.
601 if rc == C.ZOK {575 if rc == C.ZOK {
602 stat = (*resultStat)(&cstat)576 stat = &cstat
603 } else if rc != C.ZNONODE {577 } else if rc != C.ZNONODE {
604 err = newError(rc, cerr)578 err = zkError(rc, cerr)
605 }579 }
606 return580 return
607}581}
@@ -611,28 +585,27 @@
611// stat is nil and the node didn't exist, or when the existing node585// stat is nil and the node didn't exist, or when the existing node
612// is removed. It will also receive critical session events. See the586// is removed. It will also receive critical session events. See the
613// documentation of the Event type for more details.587// documentation of the Event type for more details.
614func (zk *ZooKeeper) ExistsW(path string) (stat Stat, watch chan Event, err Error) {588func (conn *Conn) ExistsW(path string) (stat *Stat, watch <-chan Event, err os.Error) {
615 cpath := C.CString(path)589 cpath := C.CString(path)
616 defer C.free(unsafe.Pointer(cpath))590 defer C.free(unsafe.Pointer(cpath))
617591
618 watchId, watchChannel := zk.createWatch(true)592 watchId, watchChannel := conn.createWatch(true)
619593
620 cstat := C.struct_Stat{}594 var cstat Stat
621 rc, cerr := C.zoo_wexists(zk.handle, cpath,595 rc, cerr := C.zoo_wexists(conn.handle, cpath, C.watch_handler, unsafe.Pointer(watchId), &cstat.c)
622 C.watch_handler, unsafe.Pointer(watchId), &cstat)
623596
624 // We diverge a bit from the usual here: a ZNONODE is not an error597 // We diverge a bit from the usual here: a ZNONODE is not an error
625 // for an exists call, otherwise every Exists call would have to check598 // for an exists call, otherwise every Exists call would have to check
626 // for err != nil and err.Code() != ZNONODE.599 // for err != nil and err.Code() != ZNONODE.
627 switch rc {600 switch Error(rc) {
628 case ZOK:601 case ZOK:
629 stat = (*resultStat)(&cstat)602 stat = &cstat
630 watch = watchChannel603 watch = watchChannel
631 case ZNONODE:604 case ZNONODE:
632 watch = watchChannel605 watch = watchChannel
633 default:606 default:
634 zk.forgetWatch(watchId)607 conn.forgetWatch(watchId)
635 err = newError(rc, cerr)608 err = zkError(rc, cerr)
636 }609 }
637 return610 return
638}611}
@@ -646,7 +619,7 @@
646// The returned path is useful in cases where the created path may differ619// The returned path is useful in cases where the created path may differ
647// from the requested one, such as when a sequence number is appended620// from the requested one, such as when a sequence number is appended
648// to it due to the use of the gozk.SEQUENCE flag.621// to it due to the use of the gozk.SEQUENCE flag.
649func (zk *ZooKeeper) Create(path, value string, flags int, aclv []ACL) (pathCreated string, err Error) {622func (conn *Conn) Create(path, value string, flags int, aclv []ACL) (pathCreated string, err os.Error) {
650 cpath := C.CString(path)623 cpath := C.CString(path)
651 cvalue := C.CString(value)624 cvalue := C.CString(value)
652 defer C.free(unsafe.Pointer(cpath))625 defer C.free(unsafe.Pointer(cpath))
@@ -660,13 +633,13 @@
660 cpathCreated := (*C.char)(C.malloc(cpathLen))633 cpathCreated := (*C.char)(C.malloc(cpathLen))
661 defer C.free(unsafe.Pointer(cpathCreated))634 defer C.free(unsafe.Pointer(cpathCreated))
662635
663 rc, cerr := C.zoo_create(zk.handle, cpath, cvalue, C.int(len(value)),636 rc, cerr := C.zoo_create(conn.handle, cpath, cvalue, C.int(len(value)), caclv, C.int(flags), cpathCreated, C.int(cpathLen))
664 caclv, C.int(flags), cpathCreated, C.int(cpathLen))637 if rc == C.ZOK {
665 if rc != C.ZOK {638 pathCreated = C.GoString(cpathCreated)
666 return "", newError(rc, cerr)639 } else {
640 err = zkError(rc, cerr)
667 }641 }
668642 return
669 return C.GoString(cpathCreated), nil
670}643}
671644
672// Set modifies the data for the existing node at the given path, replacing it645// Set modifies the data for the existing node at the given path, replacing it
@@ -677,35 +650,31 @@
677//650//
678// It is an error to attempt to set the data of a non-existing node with651// It is an error to attempt to set the data of a non-existing node with
679// this function. In these cases, use Create instead.652// this function. In these cases, use Create instead.
680func (zk *ZooKeeper) Set(path, value string, version int32) (stat Stat, err Error) {653func (conn *Conn) Set(path, value string, version int32) (stat *Stat, err os.Error) {
681654
682 cpath := C.CString(path)655 cpath := C.CString(path)
683 cvalue := C.CString(value)656 cvalue := C.CString(value)
684 defer C.free(unsafe.Pointer(cpath))657 defer C.free(unsafe.Pointer(cpath))
685 defer C.free(unsafe.Pointer(cvalue))658 defer C.free(unsafe.Pointer(cvalue))
686659
687 cstat := C.struct_Stat{}660 var cstat Stat
688661 rc, cerr := C.zoo_set2(conn.handle, cpath, cvalue, C.int(len(value)), C.int(version), &cstat.c)
689 rc, cerr := C.zoo_set2(zk.handle, cpath, cvalue, C.int(len(value)),662 if rc == C.ZOK {
690 C.int(version), &cstat)663 stat = &cstat
691 if rc != C.ZOK {664 } else {
692 return nil, newError(rc, cerr)665 err = zkError(rc, cerr)
693 }666 }
694667 return
695 return (*resultStat)(&cstat), nil
696}668}
697669
698// Delete removes the node at path. If version is not -1, the operation670// Delete removes the node at path. If version is not -1, the operation
699// will only succeed if the node is still at this version when the671// will only succeed if the node is still at this version when the
700// node is deleted as an atomic operation.672// node is deleted as an atomic operation.
701func (zk *ZooKeeper) Delete(path string, version int32) (err Error) {673func (conn *Conn) Delete(path string, version int32) (err os.Error) {
702 cpath := C.CString(path)674 cpath := C.CString(path)
703 defer C.free(unsafe.Pointer(cpath))675 defer C.free(unsafe.Pointer(cpath))
704 rc, cerr := C.zoo_delete(zk.handle, cpath, C.int(version))676 rc, cerr := C.zoo_delete(conn.handle, cpath, C.int(version))
705 if rc != C.ZOK {677 return zkError(rc, cerr)
706 return newError(rc, cerr)
707 }
708 return nil
709}678}
710679
711// AddAuth adds a new authentication certificate to the ZooKeeper680// AddAuth adds a new authentication certificate to the ZooKeeper
@@ -713,7 +682,7 @@
713// authentication information, while the cert parameter provides the682// authentication information, while the cert parameter provides the
714// identity data itself. For instance, the "digest" scheme requires683// identity data itself. For instance, the "digest" scheme requires
715// a pair like "username:password" to be provided as the certificate.684// a pair like "username:password" to be provided as the certificate.
716func (zk *ZooKeeper) AddAuth(scheme, cert string) Error {685func (conn *Conn) AddAuth(scheme, cert string) os.Error {
717 cscheme := C.CString(scheme)686 cscheme := C.CString(scheme)
718 ccert := C.CString(cert)687 ccert := C.CString(cert)
719 defer C.free(unsafe.Pointer(cscheme))688 defer C.free(unsafe.Pointer(cscheme))
@@ -725,43 +694,38 @@
725 }694 }
726 defer C.destroy_completion_data(data)695 defer C.destroy_completion_data(data)
727696
728 rc, cerr := C.zoo_add_auth(zk.handle, cscheme, ccert, C.int(len(cert)),697 rc, cerr := C.zoo_add_auth(conn.handle, cscheme, ccert, C.int(len(cert)), C.handle_void_completion, unsafe.Pointer(data))
729 C.handle_void_completion, unsafe.Pointer(data))
730 if rc != C.ZOK {698 if rc != C.ZOK {
731 return newError(rc, cerr)699 return zkError(rc, cerr)
732 }700 }
733701
734 C.wait_for_completion(data)702 C.wait_for_completion(data)
735703
736 rc = C.int(uintptr(data.data))704 rc = C.int(uintptr(data.data))
737 if rc != C.ZOK {705 return zkError(rc, nil)
738 return newError(rc, nil)
739 }
740
741 return nil
742}706}
743707
744// ACL returns the access control list for path.708// ACL returns the access control list for path.
745func (zk *ZooKeeper) ACL(path string) ([]ACL, Stat, Error) {709func (conn *Conn) ACL(path string) ([]ACL, *Stat, os.Error) {
746710
747 cpath := C.CString(path)711 cpath := C.CString(path)
748 defer C.free(unsafe.Pointer(cpath))712 defer C.free(unsafe.Pointer(cpath))
749713
750 caclv := C.struct_ACL_vector{}714 caclv := C.struct_ACL_vector{}
751 cstat := C.struct_Stat{}
752715
753 rc, cerr := C.zoo_get_acl(zk.handle, cpath, &caclv, &cstat)716 var cstat Stat
717 rc, cerr := C.zoo_get_acl(conn.handle, cpath, &caclv, &cstat.c)
754 if rc != C.ZOK {718 if rc != C.ZOK {
755 return nil, nil, newError(rc, cerr)719 return nil, nil, zkError(rc, cerr)
756 }720 }
757721
758 aclv := parseACLVector(&caclv)722 aclv := parseACLVector(&caclv)
759723
760 return aclv, (*resultStat)(&cstat), nil724 return aclv, &cstat, nil
761}725}
762726
763// SetACL changes the access control list for path.727// SetACL changes the access control list for path.
764func (zk *ZooKeeper) SetACL(path string, aclv []ACL, version int32) Error {728func (conn *Conn) SetACL(path string, aclv []ACL, version int32) os.Error {
765729
766 cpath := C.CString(path)730 cpath := C.CString(path)
767 defer C.free(unsafe.Pointer(cpath))731 defer C.free(unsafe.Pointer(cpath))
@@ -769,12 +733,8 @@
769 caclv := buildACLVector(aclv)733 caclv := buildACLVector(aclv)
770 defer C.deallocate_ACL_vector(caclv)734 defer C.deallocate_ACL_vector(caclv)
771735
772 rc, cerr := C.zoo_set_acl(zk.handle, cpath, C.int(version), caclv)736 rc, cerr := C.zoo_set_acl(conn.handle, cpath, C.int(version), caclv)
773 if rc != C.ZOK {737 return zkError(rc, cerr)
774 return newError(rc, cerr)
775 }
776
777 return nil
778}738}
779739
780func parseACLVector(caclv *C.struct_ACL_vector) []ACL {740func parseACLVector(caclv *C.struct_ACL_vector) []ACL {
@@ -822,7 +782,7 @@
822// -----------------------------------------------------------------------782// -----------------------------------------------------------------------
823// RetryChange utility method.783// RetryChange utility method.
824784
825type ChangeFunc func(oldValue string, oldStat Stat) (newValue string, err os.Error)785type ChangeFunc func(oldValue string, oldStat *Stat) (newValue string, err os.Error)
826786
827// RetryChange runs changeFunc to attempt to atomically change path787// RetryChange runs changeFunc to attempt to atomically change path
828// in a lock free manner, and retries in case there was another788// in a lock free manner, and retries in case there was another
@@ -831,8 +791,7 @@
831// changeFunc must work correctly if called multiple times in case791// changeFunc must work correctly if called multiple times in case
832// the modification fails due to concurrent changes, and it may return792// the modification fails due to concurrent changes, and it may return
833// an error that will cause the RetryChange function to stop and793// an error that will cause the RetryChange function to stop and
834// return an Error with code ZSYSTEMERROR and the same .String() result794// return the same error.
835// as the provided error.
836//795//
837// This mechanism is not suitable for a node that is frequently modified796// This mechanism is not suitable for a node that is frequently modified
838// concurrently. For those cases, consider using a pessimistic locking797// concurrently. For those cases, consider using a pessimistic locking
@@ -845,8 +804,7 @@
845//804//
846// 2. Call the changeFunc with the current node value and stat,805// 2. Call the changeFunc with the current node value and stat,
847// or with an empty string and nil stat, if the node doesn't yet exist.806// or with an empty string and nil stat, if the node doesn't yet exist.
848// If the changeFunc returns an error, stop and return an Error with807// If the changeFunc returns an error, stop and return the same error.
849// ZSYSTEMERROR Code() and the same String() as the changeFunc error.
850//808//
851// 3. If the changeFunc returns no errors, use the string returned as809// 3. If the changeFunc returns no errors, use the string returned as
852// the new candidate value for the node, and attempt to either create810// the new candidate value for the node, and attempt to either create
@@ -855,36 +813,32 @@
855// in the same node), repeat from step 1. If this procedure fails with any813// in the same node), repeat from step 1. If this procedure fails with any
856// other error, stop and return the error found.814// other error, stop and return the error found.
857//815//
858func (zk *ZooKeeper) RetryChange(path string, flags int, acl []ACL, changeFunc ChangeFunc) (err Error) {816func (conn *Conn) RetryChange(path string, flags int, acl []ACL, changeFunc ChangeFunc) os.Error {
859 for {817 for {
860 oldValue, oldStat, getErr := zk.Get(path)818 oldValue, oldStat, err := conn.Get(path)
861 if getErr != nil && getErr.Code() != ZNONODE {819 if err != nil && err != ZNONODE {
862 err = getErr820 return err
863 break821 }
864 }822 newValue, err := changeFunc(oldValue, oldStat)
865 newValue, osErr := changeFunc(oldValue, oldStat)823 if err != nil {
866 if osErr != nil {824 return err
867 return newError(ZSYSTEMERROR, osErr)825 }
868 } else if oldStat == nil {826 if oldStat == nil {
869 _, err = zk.Create(path, newValue, flags, acl)827 _, err := conn.Create(path, newValue, flags, acl)
870 if err == nil || err.Code() != ZNODEEXISTS {828 if err == nil || err != ZNODEEXISTS {
871 break829 return err
872 }830 }
873 } else if newValue == oldValue {831 continue
832 }
833 if newValue == oldValue {
874 return nil // Nothing to do.834 return nil // Nothing to do.
875 } else {835 }
876 _, err = zk.Set(path, newValue, oldStat.Version())836 _, err = conn.Set(path, newValue, oldStat.Version())
877 if err == nil {837 if err == nil || (err != ZBADVERSION && err != ZNONODE) {
878 break838 return err
879 } else {
880 code := err.Code()
881 if code != ZBADVERSION && code != ZNONODE {
882 break
883 }
884 }
885 }839 }
886 }840 }
887 return err841 panic("not reached")
888}842}
889843
890// -----------------------------------------------------------------------844// -----------------------------------------------------------------------
@@ -897,7 +851,7 @@
897// Whenever a *W method is called, it will return a channel which851// Whenever a *W method is called, it will return a channel which
898// outputs Event values. Internally, a map is used to maintain references852// outputs Event values. Internally, a map is used to maintain references
899// between a unique integer key (the watchId), and the event channel. The853// between a unique integer key (the watchId), and the event channel. The
900// watchId is then handed to the C zookeeper library as the watch context,854// watchId is then handed to the C ZooKeeper library as the watch context,
901// so that we get it back when events happen. Using an integer key as the855// so that we get it back when events happen. Using an integer key as the
902// watch context rather than a pointer is needed because there's no guarantee856// watch context rather than a pointer is needed because there's no guarantee
903// that in the future the GC will not move objects around, and also because857// that in the future the GC will not move objects around, and also because
@@ -910,13 +864,13 @@
910// Since Cgo doesn't allow calling back into Go, we actually fire a new864// Since Cgo doesn't allow calling back into Go, we actually fire a new
911// goroutine the very first time Init is called, and allow it to block865// goroutine the very first time Init is called, and allow it to block
912// in a pthread condition variable within a C function. This condition866// in a pthread condition variable within a C function. This condition
913// will only be notified once a zookeeper watch callback appends new867// will only be notified once a ZooKeeper watch callback appends new
914// entries to the event list. When this happens, the C function returns868// entries to the event list. When this happens, the C function returns
915// and we get back into Go land with the pointer to the watch data,869// and we get back into Go land with the pointer to the watch data,
916// including the watchId and other event details such as type and path.870// including the watchId and other event details such as type and path.
917871
918var watchMutex sync.Mutex872var watchMutex sync.Mutex
919var watchZooKeepers = make(map[uintptr]*ZooKeeper)873var watchConns = make(map[uintptr]*Conn)
920var watchCounter uintptr874var watchCounter uintptr
921var watchLoopCounter int875var watchLoopCounter int
922876
@@ -925,14 +879,14 @@
925// mostly as a debugging and testing aid.879// mostly as a debugging and testing aid.
926func CountPendingWatches() int {880func CountPendingWatches() int {
927 watchMutex.Lock()881 watchMutex.Lock()
928 count := len(watchZooKeepers)882 count := len(watchConns)
929 watchMutex.Unlock()883 watchMutex.Unlock()
930 return count884 return count
931}885}
932886
933// createWatch creates and registers a watch, returning the watch id887// createWatch creates and registers a watch, returning the watch id
934// and channel.888// and channel.
935func (zk *ZooKeeper) createWatch(session bool) (watchId uintptr, watchChannel chan Event) {889func (conn *Conn) createWatch(session bool) (watchId uintptr, watchChannel chan Event) {
936 buf := 1 // session/watch event890 buf := 1 // session/watch event
937 if session {891 if session {
938 buf = 32892 buf = 32
@@ -942,8 +896,8 @@
942 defer watchMutex.Unlock()896 defer watchMutex.Unlock()
943 watchId = watchCounter897 watchId = watchCounter
944 watchCounter += 1898 watchCounter += 1
945 zk.watchChannels[watchId] = watchChannel899 conn.watchChannels[watchId] = watchChannel
946 watchZooKeepers[watchId] = zk900 watchConns[watchId] = conn
947 return901 return
948}902}
949903
@@ -951,21 +905,21 @@
951// from ever getting delivered. It shouldn't be used if there's any905// from ever getting delivered. It shouldn't be used if there's any
952// chance the watch channel is still visible and not closed, since906// chance the watch channel is still visible and not closed, since
953// it might mean a goroutine would be blocked forever.907// it might mean a goroutine would be blocked forever.
954func (zk *ZooKeeper) forgetWatch(watchId uintptr) {908func (conn *Conn) forgetWatch(watchId uintptr) {
955 watchMutex.Lock()909 watchMutex.Lock()
956 defer watchMutex.Unlock()910 defer watchMutex.Unlock()
957 zk.watchChannels[watchId] = nil, false911 conn.watchChannels[watchId] = nil, false
958 watchZooKeepers[watchId] = nil, false912 watchConns[watchId] = nil, false
959}913}
960914
961// closeAllWatches closes all watch channels for zk.915// closeAllWatches closes all watch channels for conn.
962func (zk *ZooKeeper) closeAllWatches() {916func (conn *Conn) closeAllWatches() {
963 watchMutex.Lock()917 watchMutex.Lock()
964 defer watchMutex.Unlock()918 defer watchMutex.Unlock()
965 for watchId, ch := range zk.watchChannels {919 for watchId, ch := range conn.watchChannels {
966 close(ch)920 close(ch)
967 zk.watchChannels[watchId] = nil, false921 conn.watchChannels[watchId] = nil, false
968 watchZooKeepers[watchId] = nil, false922 watchConns[watchId] = nil, false
969 }923 }
970}924}
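The watch bookkeeping in these hunks (createWatch, forgetWatch, closeAllWatches) amounts to an integer-keyed registry guarded by a mutex, where the integer rather than a Go pointer is what gets handed to C. The sketch below isolates that idea in present-day Go. `registry`, `deliver` and `pending` are hypothetical names for illustration. It keeps one map where the branch keeps two (a per-connection map plus the global watchConns), and it leaves out the session-watch special case.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a reduced stand-in for the package's Event type.
type Event struct {
	Type int
	Path string
}

// registry maps integer watch IDs to event channels. An ID, not a pointer,
// is what would be passed to C as the watch context.
type registry struct {
	mu       sync.Mutex
	next     uintptr
	channels map[uintptr]chan Event
}

func newRegistry() *registry {
	return &registry{channels: make(map[uintptr]chan Event)}
}

// create registers a new watch and returns its ID and receive-only channel.
func (r *registry) create(buf int) (uintptr, <-chan Event) {
	r.mu.Lock()
	defer r.mu.Unlock()
	id := r.next
	r.next++
	ch := make(chan Event, buf)
	r.channels[id] = ch
	return id, ch
}

// forget drops a watch without closing its channel. As the diff's comment
// warns, this is only safe if nobody can still be receiving from it.
func (r *registry) forget(id uintptr) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.channels, id)
}

// deliver sends one event to the watch, then unregisters and closes it,
// as a one-shot watch would. It reports whether the ID was registered.
func (r *registry) deliver(id uintptr, ev Event) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	ch, ok := r.channels[id]
	if !ok {
		return false
	}
	select {
	case ch <- ev:
	default:
		// Mirrors the diff: fail loudly rather than block forever.
		panic("watch event channel buffer is full")
	}
	delete(r.channels, id)
	close(ch)
	return true
}

// pending plays the role of CountPendingWatches.
func (r *registry) pending() int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return len(r.channels)
}

func main() {
	r := newRegistry()
	id, ch := r.create(1)
	fmt.Println(r.pending())
	r.deliver(id, Event{Type: 1, Path: "/a"})
	fmt.Println(<-ch, r.pending())
}
```

Returning a receive-only channel from `create` corresponds to the signature change from `chan Event` to `<-chan Event` on the public *W methods in this proposal.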
971925
@@ -978,11 +932,11 @@
978 }932 }
979 watchMutex.Lock()933 watchMutex.Lock()
980 defer watchMutex.Unlock()934 defer watchMutex.Unlock()
981 zk, ok := watchZooKeepers[watchId]935 conn, ok := watchConns[watchId]
982 if !ok {936 if !ok {
983 return937 return
984 }938 }
985 if event.Type == EVENT_SESSION && watchId != zk.sessionWatchId {939 if event.Type == EVENT_SESSION && watchId != conn.sessionWatchId {
986 switch event.State {940 switch event.State {
987 case STATE_EXPIRED_SESSION, STATE_AUTH_FAILED:941 case STATE_EXPIRED_SESSION, STATE_AUTH_FAILED:
988 default:942 default:
@@ -990,7 +944,7 @@
990 return944 return
991 }945 }
992 }946 }
993 ch := zk.watchChannels[watchId]947 ch := conn.watchChannels[watchId]
994 if ch == nil {948 if ch == nil {
995 return949 return
996 }950 }
@@ -1002,15 +956,15 @@
1002 // straight to the buffer), and the application isn't paying956 // straight to the buffer), and the application isn't paying
1003 // attention for long enough to have the buffer filled up.957 // attention for long enough to have the buffer filled up.
1004 // Break down now rather than leaking forever.958 // Break down now rather than leaking forever.
1005 if watchId == zk.sessionWatchId {959 if watchId == conn.sessionWatchId {
1006 panic("Session event channel buffer is full")960 panic("Session event channel buffer is full")
1007 } else {961 } else {
1008 panic("Watch event channel buffer is full")962 panic("Watch event channel buffer is full")
1009 }963 }
1010 }964 }
1011 if watchId != zk.sessionWatchId {965 if watchId != conn.sessionWatchId {
1012 zk.watchChannels[watchId] = nil, false966 conn.watchChannels[watchId] = nil, false
1013 watchZooKeepers[watchId] = nil, false967 watchConns[watchId] = nil, false
1014 close(ch)968 close(ch)
1015 }969 }
1016}970}
1017971
=== renamed file 'gozk_test.go' => 'zk_test.go'
--- gozk_test.go 2011-08-19 01:51:37 +0000
+++ zk_test.go 2011-10-03 15:02:28 +0000
@@ -1,17 +1,17 @@
1package gozk_test1package zk_test
22
3import (3import (
4 . "launchpad.net/gocheck"4 . "launchpad.net/gocheck"
5 "gozk"5 "launchpad.net/gozk/zk"
6 "time"6 "time"
7)7)
88
9// This error will be delivered via C errno, since ZK unfortunately9// This error will be delivered via C errno, since ZK unfortunately
10// only provides the handler back from zookeeper_init().10// only provides the handler back from zookeeper_init().
11func (s *S) TestInitErrorThroughErrno(c *C) {11func (s *S) TestInitErrorThroughErrno(c *C) {
12 zk, watch, err := gozk.Init("bad-domain-without-port", 5e9)12 conn, watch, err := zk.Dial("bad-domain-without-port", 5e9)
13 if zk != nil {13 if conn != nil {
14 zk.Close()14 conn.Close()
15 }15 }
16 if watch != nil {16 if watch != nil {
17 go func() {17 go func() {
@@ -23,15 +23,15 @@
23 }23 }
24 }()24 }()
25 }25 }
26 c.Assert(zk, IsNil)26 c.Assert(conn, IsNil)
27 c.Assert(watch, IsNil)27 c.Assert(watch, IsNil)
28 c.Assert(err, Matches, "invalid argument")28 c.Assert(err, Matches, "invalid argument")
29}29}
3030
31func (s *S) TestRecvTimeoutInitParameter(c *C) {31func (s *S) TestRecvTimeoutInitParameter(c *C) {
32 zk, watch, err := gozk.Init(s.zkAddr, 0)32 conn, watch, err := zk.Dial(s.zkAddr, 0)
33 c.Assert(err, IsNil)33 c.Assert(err, IsNil)
34 defer zk.Close()34 defer conn.Close()
3535
36 select {36 select {
37 case <-watch:37 case <-watch:
@@ -40,7 +40,7 @@
40 }40 }
4141
42 for i := 0; i != 1000; i++ {42 for i := 0; i != 1000; i++ {
43 _, _, err := zk.Get("/zookeeper")43 _, _, err := conn.Get("/zookeeper")
44 if err != nil {44 if err != nil {
45 c.Assert(err, Matches, "operation timeout")45 c.Assert(err, Matches, "operation timeout")
46 c.SucceedNow()46 c.SucceedNow()
@@ -51,76 +51,79 @@
51}51}
5252
53func (s *S) TestSessionWatches(c *C) {53func (s *S) TestSessionWatches(c *C) {
54 c.Assert(gozk.CountPendingWatches(), Equals, 0)54 c.Assert(zk.CountPendingWatches(), Equals, 0)
5555
56 zk1, watch1 := s.init(c)56 zk1, watch1 := s.init(c)
57 zk2, watch2 := s.init(c)57 zk2, watch2 := s.init(c)
58 zk3, watch3 := s.init(c)58 zk3, watch3 := s.init(c)
5959
60 c.Assert(gozk.CountPendingWatches(), Equals, 3)60 c.Assert(zk.CountPendingWatches(), Equals, 3)
6161
62 event1 := <-watch162 event1 := <-watch1
63 c.Assert(event1.Type, Equals, gozk.EVENT_SESSION)63 c.Assert(event1.Type, Equals, zk.EVENT_SESSION)
64 c.Assert(event1.State, Equals, gozk.STATE_CONNECTED)64 c.Assert(event1.State, Equals, zk.STATE_CONNECTED)
6565
66 c.Assert(gozk.CountPendingWatches(), Equals, 3)66 c.Assert(zk.CountPendingWatches(), Equals, 3)
6767
68 event2 := <-watch268 event2 := <-watch2
69 c.Assert(event2.Type, Equals, gozk.EVENT_SESSION)69 c.Assert(event2.Type, Equals, zk.EVENT_SESSION)
70 c.Assert(event2.State, Equals, gozk.STATE_CONNECTED)70 c.Assert(event2.State, Equals, zk.STATE_CONNECTED)
7171
72 c.Assert(gozk.CountPendingWatches(), Equals, 3)72 c.Assert(zk.CountPendingWatches(), Equals, 3)
7373
74 event3 := <-watch374 event3 := <-watch3
75 c.Assert(event3.Type, Equals, gozk.EVENT_SESSION)75 c.Assert(event3.Type, Equals, zk.EVENT_SESSION)
76 c.Assert(event3.State, Equals, gozk.STATE_CONNECTED)76 c.Assert(event3.State, Equals, zk.STATE_CONNECTED)
7777
78 c.Assert(gozk.CountPendingWatches(), Equals, 3)78 c.Assert(zk.CountPendingWatches(), Equals, 3)
7979
80 zk1.Close()80 zk1.Close()
81 c.Assert(gozk.CountPendingWatches(), Equals, 2)81 c.Assert(zk.CountPendingWatches(), Equals, 2)
82 zk2.Close()82 zk2.Close()
83 c.Assert(gozk.CountPendingWatches(), Equals, 1)83 c.Assert(zk.CountPendingWatches(), Equals, 1)
84 zk3.Close()84 zk3.Close()
85 c.Assert(gozk.CountPendingWatches(), Equals, 0)85 c.Assert(zk.CountPendingWatches(), Equals, 0)
86}86}
8787
88// Gozk injects a STATE_CLOSED event when zk.Close() is called, right88// Gozk injects a STATE_CLOSED event when conn.Close() is called, right
89// before the channel is closed. Closing the channel injects a nil89// before the channel is closed. Closing the channel injects a nil
90// pointer, as usual for Go, so the STATE_CLOSED gives a chance to90// pointer, as usual for Go, so the STATE_CLOSED gives a chance to
91// know that a nil pointer is coming, and to stop the procedure.91// know that a nil pointer is coming, and to stop the procedure.
92// Hopefully this procedure will avoid some nil-pointer references by92// Hopefully this procedure will avoid some nil-pointer references by
93// mistake.93// mistake.
94func (s *S) TestClosingStateInSessionWatch(c *C) {94func (s *S) TestClosingStateInSessionWatch(c *C) {
95 zk, watch := s.init(c)95 conn, watch := s.init(c)
9696
97 event := <-watch97 event := <-watch
98 c.Assert(event.Type, Equals, gozk.EVENT_SESSION)98 c.Assert(event.Type, Equals, zk.EVENT_SESSION)
99 c.Assert(event.State, Equals, gozk.STATE_CONNECTED)99 c.Assert(event.State, Equals, zk.STATE_CONNECTED)
100100
101 zk.Close()101 conn.Close()
102 event, ok := <-watch102 event, ok := <-watch
103 c.Assert(ok, Equals, false)103 c.Assert(ok, Equals, false)
104 c.Assert(event.Type, Equals, gozk.EVENT_CLOSED)104 c.Assert(event.Type, Equals, zk.EVENT_CLOSED)
105 c.Assert(event.State, Equals, gozk.STATE_CLOSED)105 c.Assert(event.State, Equals, zk.STATE_CLOSED)
106}106}
107107
108func (s *S) TestEventString(c *C) {108func (s *S) TestEventString(c *C) {
109 var event gozk.Event109 var event zk.Event
110 event = gozk.Event{gozk.EVENT_SESSION, "/path", gozk.STATE_CONNECTED}110 event = zk.Event{zk.EVENT_SESSION, "/path", zk.STATE_CONNECTED}
111 c.Assert(event, Matches, "ZooKeeper connected")111 c.Assert(event, Matches, "ZooKeeper connected")
112 event = gozk.Event{gozk.EVENT_CREATED, "/path", gozk.STATE_CONNECTED}112 event = zk.Event{zk.EVENT_CREATED, "/path", zk.STATE_CONNECTED}
113 c.Assert(event, Matches, "ZooKeeper connected; path created: /path")113 c.Assert(event, Matches, "ZooKeeper connected; path created: /path")
114 event = gozk.Event{-1, "/path", gozk.STATE_CLOSED}114 event = zk.Event{-1, "/path", zk.STATE_CLOSED}
115 c.Assert(event, Matches, "ZooKeeper connection closed")115 c.Assert(event, Matches, "ZooKeeper connection closed")
116}116}
117117
118var okTests = []struct{gozk.Event; Ok bool}{118var okTests = []struct {
119 {gozk.Event{gozk.EVENT_SESSION, "", gozk.STATE_CONNECTED}, true},119 zk.Event
120 {gozk.Event{gozk.EVENT_CREATED, "", gozk.STATE_CONNECTED}, true},120 Ok bool
121 {gozk.Event{0, "", gozk.STATE_CLOSED}, false},121}{
122 {gozk.Event{0, "", gozk.STATE_EXPIRED_SESSION}, false},122 {zk.Event{zk.EVENT_SESSION, "", zk.STATE_CONNECTED}, true},
123 {gozk.Event{0, "", gozk.STATE_AUTH_FAILED}, false},123 {zk.Event{zk.EVENT_CREATED, "", zk.STATE_CONNECTED}, true},
124 {zk.Event{0, "", zk.STATE_CLOSED}, false},
125 {zk.Event{0, "", zk.STATE_EXPIRED_SESSION}, false},
126 {zk.Event{0, "", zk.STATE_AUTH_FAILED}, false},
124}127}
125128
126func (s *S) TestEventOk(c *C) {129func (s *S) TestEventOk(c *C) {
@@ -130,9 +133,9 @@
130}133}
131134
132func (s *S) TestGetAndStat(c *C) {135func (s *S) TestGetAndStat(c *C) {
133 zk, _ := s.init(c)136 conn, _ := s.init(c)
134137
135 data, stat, err := zk.Get("/zookeeper")138 data, stat, err := conn.Get("/zookeeper")
136 c.Assert(err, IsNil)139 c.Assert(err, IsNil)
137 c.Assert(data, Equals, "")140 c.Assert(data, Equals, "")
138 c.Assert(stat.Czxid(), Equals, int64(0))141 c.Assert(stat.Czxid(), Equals, int64(0))
@@ -149,58 +152,58 @@
149}152}
150153
151func (s *S) TestGetAndError(c *C) {154func (s *S) TestGetAndError(c *C) {
152 zk, _ := s.init(c)155 conn, _ := s.init(c)
153156
154 data, stat, err := zk.Get("/non-existent")157 data, stat, err := conn.Get("/non-existent")
155158
156 c.Assert(data, Equals, "")159 c.Assert(data, Equals, "")
157 c.Assert(stat, IsNil)160 c.Assert(stat, IsNil)
158 c.Assert(err, Matches, "no node")161 c.Assert(err, Matches, "no node")
159 c.Assert(err.Code(), Equals, gozk.ZNONODE)162 c.Assert(err, Equals, zk.ZNONODE)
160}163}
161164
162func (s *S) TestCreateAndGet(c *C) {165func (s *S) TestCreateAndGet(c *C) {
163 zk, _ := s.init(c)166 conn, _ := s.init(c)
164167
165 path, err := zk.Create("/test-", "bababum", gozk.SEQUENCE|gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))168 path, err := conn.Create("/test-", "bababum", zk.SEQUENCE|zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
166 c.Assert(err, IsNil)169 c.Assert(err, IsNil)
167 c.Assert(path, Matches, "/test-[0-9]+")170 c.Assert(path, Matches, "/test-[0-9]+")
168171
169 // Check the error condition from Create().172 // Check the error condition from Create().
170 _, err = zk.Create(path, "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))173 _, err = conn.Create(path, "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
171 c.Assert(err, Matches, "node exists")174 c.Assert(err, Matches, "node exists")
172175
173 data, _, err := zk.Get(path)176 data, _, err := conn.Get(path)
174 c.Assert(err, IsNil)177 c.Assert(err, IsNil)
175 c.Assert(data, Equals, "bababum")178 c.Assert(data, Equals, "bababum")
176}179}
177180
178func (s *S) TestCreateSetAndGet(c *C) {181func (s *S) TestCreateSetAndGet(c *C) {
179 zk, _ := s.init(c)182 conn, _ := s.init(c)
180183
181 _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))184 _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
182 c.Assert(err, IsNil)185 c.Assert(err, IsNil)
183186
184 stat, err := zk.Set("/test", "bababum", -1) // Any version.187 stat, err := conn.Set("/test", "bababum", -1) // Any version.
185 c.Assert(err, IsNil)188 c.Assert(err, IsNil)
186 c.Assert(stat.Version(), Equals, int32(1))189 c.Assert(stat.Version(), Equals, int32(1))
187190
188 data, _, err := zk.Get("/test")191 data, _, err := conn.Get("/test")
189 c.Assert(err, IsNil)192 c.Assert(err, IsNil)
190 c.Assert(data, Equals, "bababum")193 c.Assert(data, Equals, "bababum")
191}194}
192195
193func (s *S) TestGetAndWatch(c *C) {196func (s *S) TestGetAndWatch(c *C) {
194 c.Check(gozk.CountPendingWatches(), Equals, 0)197 c.Check(zk.CountPendingWatches(), Equals, 0)
195198
196 zk, _ := s.init(c)199 conn, _ := s.init(c)
197200
198 c.Check(gozk.CountPendingWatches(), Equals, 1)201 c.Check(zk.CountPendingWatches(), Equals, 1)
199202
200 _, err := zk.Create("/test", "one", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))203 _, err := conn.Create("/test", "one", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
201 c.Assert(err, IsNil)204 c.Assert(err, IsNil)
202205
203 data, stat, watch, err := zk.GetW("/test")206 data, stat, watch, err := conn.GetW("/test")
204 c.Assert(err, IsNil)207 c.Assert(err, IsNil)
205 c.Assert(data, Equals, "one")208 c.Assert(data, Equals, "one")
206 c.Assert(stat.Version(), Equals, int32(0))209 c.Assert(stat.Version(), Equals, int32(0))
@@ -211,17 +214,17 @@
211 default:214 default:
212 }215 }
213216
214 c.Check(gozk.CountPendingWatches(), Equals, 2)217 c.Check(zk.CountPendingWatches(), Equals, 2)
215218
216 _, err = zk.Set("/test", "two", -1)219 _, err = conn.Set("/test", "two", -1)
217 c.Assert(err, IsNil)220 c.Assert(err, IsNil)
218221
219 event := <-watch222 event := <-watch
220 c.Assert(event.Type, Equals, gozk.EVENT_CHANGED)223 c.Assert(event.Type, Equals, zk.EVENT_CHANGED)
221224
222 c.Check(gozk.CountPendingWatches(), Equals, 1)225 c.Check(zk.CountPendingWatches(), Equals, 1)
223226
224 data, _, watch, err = zk.GetW("/test")227 data, _, watch, err = conn.GetW("/test")
225 c.Assert(err, IsNil)228 c.Assert(err, IsNil)
226 c.Assert(data, Equals, "two")229 c.Assert(data, Equals, "two")
227230
@@ -231,86 +234,86 @@
231 default:234 default:
232 }235 }
233236
234 c.Check(gozk.CountPendingWatches(), Equals, 2)237 c.Check(zk.CountPendingWatches(), Equals, 2)
235238
236 _, err = zk.Set("/test", "three", -1)239 _, err = conn.Set("/test", "three", -1)
237 c.Assert(err, IsNil)240 c.Assert(err, IsNil)
238241
239 event = <-watch242 event = <-watch
240 c.Assert(event.Type, Equals, gozk.EVENT_CHANGED)243 c.Assert(event.Type, Equals, zk.EVENT_CHANGED)
241244
242 c.Check(gozk.CountPendingWatches(), Equals, 1)245 c.Check(zk.CountPendingWatches(), Equals, 1)
243}246}
244247
245func (s *S) TestGetAndWatchWithError(c *C) {248func (s *S) TestGetAndWatchWithError(c *C) {
246 c.Check(gozk.CountPendingWatches(), Equals, 0)249 c.Check(zk.CountPendingWatches(), Equals, 0)
247250
248 zk, _ := s.init(c)251 conn, _ := s.init(c)
249252
250 c.Check(gozk.CountPendingWatches(), Equals, 1)253 c.Check(zk.CountPendingWatches(), Equals, 1)
251254
252 _, _, watch, err := zk.GetW("/test")255 _, _, watch, err := conn.GetW("/test")
253 c.Assert(err, NotNil)256 c.Assert(err, NotNil)
254 c.Assert(err.Code(), Equals, gozk.ZNONODE)257 c.Assert(err, Equals, zk.ZNONODE)
255 c.Assert(watch, IsNil)258 c.Assert(watch, IsNil)
256259
257 c.Check(gozk.CountPendingWatches(), Equals, 1)260 c.Check(zk.CountPendingWatches(), Equals, 1)
258}261}
259262
260func (s *S) TestCloseReleasesWatches(c *C) {263func (s *S) TestCloseReleasesWatches(c *C) {
261 c.Check(gozk.CountPendingWatches(), Equals, 0)264 c.Check(zk.CountPendingWatches(), Equals, 0)
262265
263 zk, _ := s.init(c)266 conn, _ := s.init(c)
264267
265 c.Check(gozk.CountPendingWatches(), Equals, 1)268 c.Check(zk.CountPendingWatches(), Equals, 1)
266269
267 _, err := zk.Create("/test", "one", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))270 _, err := conn.Create("/test", "one", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
268 c.Assert(err, IsNil)271 c.Assert(err, IsNil)
269272
270 _, _, _, err = zk.GetW("/test")273 _, _, _, err = conn.GetW("/test")
271 c.Assert(err, IsNil)274 c.Assert(err, IsNil)
272275
273 c.Assert(gozk.CountPendingWatches(), Equals, 2)276 c.Assert(zk.CountPendingWatches(), Equals, 2)
274277
275 zk.Close()278 conn.Close()
276279
277 c.Assert(gozk.CountPendingWatches(), Equals, 0)280 c.Assert(zk.CountPendingWatches(), Equals, 0)
278}281}
279282
280// By default, the ZooKeeper C client will hang indefinitely if a283// By default, the ZooKeeper C client will hang indefinitely if a
281// handler is closed twice. We get in the way and prevent it.284// handler is closed twice. We get in the way and prevent it.
282func (s *S) TestClosingTwiceDoesntHang(c *C) {285func (s *S) TestClosingTwiceDoesntHang(c *C) {
283 zk, _ := s.init(c)286 conn, _ := s.init(c)
284 err := zk.Close()287 err := conn.Close()
285 c.Assert(err, IsNil)288 c.Assert(err, IsNil)
286 err = zk.Close()289 err = conn.Close()
287 c.Assert(err, NotNil)290 c.Assert(err, NotNil)
288 c.Assert(err.Code(), Equals, gozk.ZCLOSING)291 c.Assert(err, Equals, zk.ZCLOSING)
289}292}
290293
291func (s *S) TestChildren(c *C) {294func (s *S) TestChildren(c *C) {
292 zk, _ := s.init(c)295 conn, _ := s.init(c)
293296
294 children, stat, err := zk.Children("/")297 children, stat, err := conn.Children("/")
295 c.Assert(err, IsNil)298 c.Assert(err, IsNil)
296 c.Assert(children, Equals, []string{"zookeeper"})299 c.Assert(children, Equals, []string{"zookeeper"})
297 c.Assert(stat.NumChildren(), Equals, int32(1))300 c.Assert(stat.NumChildren(), Equals, int32(1))
298301
299 children, stat, err = zk.Children("/non-existent")302 children, stat, err = conn.Children("/non-existent")
300 c.Assert(err, NotNil)303 c.Assert(err, NotNil)
301 c.Assert(err.Code(), Equals, gozk.ZNONODE)304 c.Assert(err, Equals, zk.ZNONODE)
302 c.Assert(children, Equals, []string{})305 c.Assert(children, Equals, []string{})
303 c.Assert(stat, Equals, nil)306 c.Assert(stat, IsNil)
304}307}
305308
306func (s *S) TestChildrenAndWatch(c *C) {309func (s *S) TestChildrenAndWatch(c *C) {
307 c.Check(gozk.CountPendingWatches(), Equals, 0)310 c.Check(zk.CountPendingWatches(), Equals, 0)
308311
309 zk, _ := s.init(c)312 conn, _ := s.init(c)
310313
311 c.Check(gozk.CountPendingWatches(), Equals, 1)314 c.Check(zk.CountPendingWatches(), Equals, 1)
312315
313 children, stat, watch, err := zk.ChildrenW("/")316 children, stat, watch, err := conn.ChildrenW("/")
314 c.Assert(err, IsNil)317 c.Assert(err, IsNil)
315 c.Assert(children, Equals, []string{"zookeeper"})318 c.Assert(children, Equals, []string{"zookeeper"})
316 c.Assert(stat.NumChildren(), Equals, int32(1))319 c.Assert(stat.NumChildren(), Equals, int32(1))
@@ -321,18 +324,18 @@
321 default:324 default:
322 }325 }
323326
324 c.Check(gozk.CountPendingWatches(), Equals, 2)327 c.Check(zk.CountPendingWatches(), Equals, 2)
325328
326 _, err = zk.Create("/test1", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))329 _, err = conn.Create("/test1", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
327 c.Assert(err, IsNil)330 c.Assert(err, IsNil)
328331
329 event := <-watch332 event := <-watch
330 c.Assert(event.Type, Equals, gozk.EVENT_CHILD)333 c.Assert(event.Type, Equals, zk.EVENT_CHILD)
331 c.Assert(event.Path, Equals, "/")334 c.Assert(event.Path, Equals, "/")
332335
333 c.Check(gozk.CountPendingWatches(), Equals, 1)336 c.Check(zk.CountPendingWatches(), Equals, 1)
334337
335 children, stat, watch, err = zk.ChildrenW("/")338 children, stat, watch, err = conn.ChildrenW("/")
336 c.Assert(err, IsNil)339 c.Assert(err, IsNil)
337 c.Assert(stat.NumChildren(), Equals, int32(2))340 c.Assert(stat.NumChildren(), Equals, int32(2))
338341
@@ -345,57 +348,56 @@
345 default:348 default:
346 }349 }
347350
348 c.Check(gozk.CountPendingWatches(), Equals, 2)351 c.Check(zk.CountPendingWatches(), Equals, 2)
349352
350 _, err = zk.Create("/test2", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))353 _, err = conn.Create("/test2", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
351 c.Assert(err, IsNil)354 c.Assert(err, IsNil)
352355
353 event = <-watch356 event = <-watch
354 c.Assert(event.Type, Equals, gozk.EVENT_CHILD)357 c.Assert(event.Type, Equals, zk.EVENT_CHILD)
355358
356 c.Check(gozk.CountPendingWatches(), Equals, 1)359 c.Check(zk.CountPendingWatches(), Equals, 1)
357}360}
358361
359func (s *S) TestChildrenAndWatchWithError(c *C) {362func (s *S) TestChildrenAndWatchWithError(c *C) {
360 c.Check(gozk.CountPendingWatches(), Equals, 0)363 c.Check(zk.CountPendingWatches(), Equals, 0)
361364
362 zk, _ := s.init(c)365 conn, _ := s.init(c)
363366
364 c.Check(gozk.CountPendingWatches(), Equals, 1)367 c.Check(zk.CountPendingWatches(), Equals, 1)
365368
366 _, stat, watch, err := zk.ChildrenW("/test")369 _, stat, watch, err := conn.ChildrenW("/test")
367 c.Assert(err, NotNil)370 c.Assert(err, NotNil)
368 c.Assert(err.Code(), Equals, gozk.ZNONODE)371 c.Assert(err, Equals, zk.ZNONODE)
369 c.Assert(watch, IsNil)372 c.Assert(watch, IsNil)
370 c.Assert(stat, IsNil)373 c.Assert(stat, IsNil)
371374
372 c.Check(gozk.CountPendingWatches(), Equals, 1)375 c.Check(zk.CountPendingWatches(), Equals, 1)
373}376}
374377
375func (s *S) TestExists(c *C) {378func (s *S) TestExists(c *C) {
376 zk, _ := s.init(c)379 conn, _ := s.init(c)
377380
378 stat, err := zk.Exists("/zookeeper")381 stat, err := conn.Exists("/non-existent")
379 c.Assert(err, IsNil)
380 c.Assert(stat.NumChildren(), Equals, int32(1))
381
382 stat, err = zk.Exists("/non-existent")
383 c.Assert(err, IsNil)382 c.Assert(err, IsNil)
384 c.Assert(stat, IsNil)383 c.Assert(stat, IsNil)
384
385 stat, err = conn.Exists("/zookeeper")
386 c.Assert(err, IsNil)
385}387}
386388
387func (s *S) TestExistsAndWatch(c *C) {389func (s *S) TestExistsAndWatch(c *C) {
388 c.Check(gozk.CountPendingWatches(), Equals, 0)390 c.Check(zk.CountPendingWatches(), Equals, 0)
389391
390 zk, _ := s.init(c)392 conn, _ := s.init(c)
391393
392 c.Check(gozk.CountPendingWatches(), Equals, 1)394 c.Check(zk.CountPendingWatches(), Equals, 1)
393395
394 stat, watch, err := zk.ExistsW("/test")396 stat, watch, err := conn.ExistsW("/test")
395 c.Assert(err, IsNil)397 c.Assert(err, IsNil)
396 c.Assert(stat, IsNil)398 c.Assert(stat, IsNil)
397399
398 c.Check(gozk.CountPendingWatches(), Equals, 2)400 c.Check(zk.CountPendingWatches(), Equals, 2)
399401
400 select {402 select {
401 case <-watch:403 case <-watch:
@@ -403,62 +405,62 @@
403 default:405 default:
404 }406 }
405407
406 _, err = zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))408 _, err = conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
407 c.Assert(err, IsNil)409 c.Assert(err, IsNil)
408410
409 event := <-watch411 event := <-watch
410 c.Assert(event.Type, Equals, gozk.EVENT_CREATED)412 c.Assert(event.Type, Equals, zk.EVENT_CREATED)
411 c.Assert(event.Path, Equals, "/test")413 c.Assert(event.Path, Equals, "/test")
412414
413 c.Check(gozk.CountPendingWatches(), Equals, 1)415 c.Check(zk.CountPendingWatches(), Equals, 1)
414416
415 stat, watch, err = zk.ExistsW("/test")417 stat, watch, err = conn.ExistsW("/test")
416 c.Assert(err, IsNil)418 c.Assert(err, IsNil)
417 c.Assert(stat, NotNil)419 c.Assert(stat, NotNil)
418 c.Assert(stat.NumChildren(), Equals, int32(0))420 c.Assert(stat.NumChildren(), Equals, int32(0))
419421
420 c.Check(gozk.CountPendingWatches(), Equals, 2)422 c.Check(zk.CountPendingWatches(), Equals, 2)
421}423}
422424
423func (s *S) TestExistsAndWatchWithError(c *C) {425func (s *S) TestExistsAndWatchWithError(c *C) {
424 c.Check(gozk.CountPendingWatches(), Equals, 0)426 c.Check(zk.CountPendingWatches(), Equals, 0)
425427
426 zk, _ := s.init(c)428 conn, _ := s.init(c)
427429
428 c.Check(gozk.CountPendingWatches(), Equals, 1)430 c.Check(zk.CountPendingWatches(), Equals, 1)
429431
430 stat, watch, err := zk.ExistsW("///")432 stat, watch, err := conn.ExistsW("///")
431 c.Assert(err, NotNil)433 c.Assert(err, NotNil)
432 c.Assert(err.Code(), Equals, gozk.ZBADARGUMENTS)434 c.Assert(err, Equals, zk.ZBADARGUMENTS)
433 c.Assert(stat, IsNil)435 c.Assert(stat, IsNil)
434 c.Assert(watch, IsNil)436 c.Assert(watch, IsNil)
435437
436 c.Check(gozk.CountPendingWatches(), Equals, 1)438 c.Check(zk.CountPendingWatches(), Equals, 1)
437}439}
438440
439func (s *S) TestDelete(c *C) {441func (s *S) TestDelete(c *C) {
440 zk, _ := s.init(c)442 conn, _ := s.init(c)
441443
442 _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))444 _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
443 c.Assert(err, IsNil)445 c.Assert(err, IsNil)
444446
445 err = zk.Delete("/test", 5)447 err = conn.Delete("/test", 5)
446 c.Assert(err, NotNil)448 c.Assert(err, NotNil)
447 c.Assert(err.Code(), Equals, gozk.ZBADVERSION)449 c.Assert(err, Equals, zk.ZBADVERSION)
448450
449 err = zk.Delete("/test", -1)451 err = conn.Delete("/test", -1)
450 c.Assert(err, IsNil)452 c.Assert(err, IsNil)
451453
452 err = zk.Delete("/test", -1)454 err = conn.Delete("/test", -1)
453 c.Assert(err, NotNil)455 c.Assert(err, NotNil)
454 c.Assert(err.Code(), Equals, gozk.ZNONODE)456 c.Assert(err, Equals, zk.ZNONODE)
455}457}
456458
457func (s *S) TestClientIdAndReInit(c *C) {459func (s *S) TestClientIdAndReInit(c *C) {
458 zk1, _ := s.init(c)460 zk1, _ := s.init(c)
459 clientId1 := zk1.ClientId()461 clientId1 := zk1.ClientId()
460462
461 zk2, _, err := gozk.ReInit(s.zkAddr, 5e9, clientId1)463 zk2, _, err := zk.Redial(s.zkAddr, 5e9, clientId1)
462 c.Assert(err, IsNil)464 c.Assert(err, IsNil)
463 defer zk2.Close()465 defer zk2.Close()
464 clientId2 := zk2.ClientId()466 clientId2 := zk2.ClientId()
@@ -469,110 +471,114 @@
469// Surprisingly for some (including myself, initially), the watch471// Surprisingly for some (including myself, initially), the watch
470// returned by the exists method actually fires on data changes too.472// returned by the exists method actually fires on data changes too.
471func (s *S) TestExistsWatchOnDataChange(c *C) {473func (s *S) TestExistsWatchOnDataChange(c *C) {
472 zk, _ := s.init(c)474 conn, _ := s.init(c)
473475
474 _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))476 _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
475 c.Assert(err, IsNil)477 c.Assert(err, IsNil)
476478
477 _, watch, err := zk.ExistsW("/test")479 _, watch, err := conn.ExistsW("/test")
478 c.Assert(err, IsNil)480 c.Assert(err, IsNil)
479481
480 _, err = zk.Set("/test", "new", -1)482 _, err = conn.Set("/test", "new", -1)
481 c.Assert(err, IsNil)483 c.Assert(err, IsNil)
482484
483 event := <-watch485 event := <-watch
484486
485 c.Assert(event.Path, Equals, "/test")487 c.Assert(event.Path, Equals, "/test")
486 c.Assert(event.Type, Equals, gozk.EVENT_CHANGED)488 c.Assert(event.Type, Equals, zk.EVENT_CHANGED)
487}489}
488490
489func (s *S) TestACL(c *C) {491func (s *S) TestACL(c *C) {
490 zk, _ := s.init(c)492 conn, _ := s.init(c)
491493
492 _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))494 _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
493 c.Assert(err, IsNil)495 c.Assert(err, IsNil)
494496
495 acl, stat, err := zk.ACL("/test")497 acl, stat, err := conn.ACL("/test")
496 c.Assert(err, IsNil)498 c.Assert(err, IsNil)
497 c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_ALL))499 c.Assert(acl, Equals, zk.WorldACL(zk.PERM_ALL))
498 c.Assert(stat, NotNil)500 c.Assert(stat, NotNil)
499 c.Assert(stat.Version(), Equals, int32(0))501 c.Assert(stat.Version(), Equals, int32(0))
500502
501 acl, stat, err = zk.ACL("/non-existent")503 acl, stat, err = conn.ACL("/non-existent")
502 c.Assert(err, NotNil)504 c.Assert(err, NotNil)
503 c.Assert(err.Code(), Equals, gozk.ZNONODE)505 c.Assert(err, Equals, zk.ZNONODE)
504 c.Assert(acl, IsNil)506 c.Assert(acl, IsNil)
505 c.Assert(stat, IsNil)507 c.Assert(stat, IsNil)
506}508}
507509
508func (s *S) TestSetACL(c *C) {510func (s *S) TestSetACL(c *C) {
509 zk, _ := s.init(c)511 conn, _ := s.init(c)
510512
511 _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))513 _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
512 c.Assert(err, IsNil)514 c.Assert(err, IsNil)
513515
514 err = zk.SetACL("/test", gozk.WorldACL(gozk.PERM_ALL), 5)516 err = conn.SetACL("/test", zk.WorldACL(zk.PERM_ALL), 5)
515 c.Assert(err, NotNil)517 c.Assert(err, NotNil)
516 c.Assert(err.Code(), Equals, gozk.ZBADVERSION)518 c.Assert(err, Equals, zk.ZBADVERSION)
517519
518 err = zk.SetACL("/test", gozk.WorldACL(gozk.PERM_READ), -1)520 err = conn.SetACL("/test", zk.WorldACL(zk.PERM_READ), -1)
519 c.Assert(err, IsNil)521 c.Assert(err, IsNil)
520522
521 acl, _, err := zk.ACL("/test")523 acl, _, err := conn.ACL("/test")
522 c.Assert(err, IsNil)524 c.Assert(err, IsNil)
523 c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_READ))525 c.Assert(acl, Equals, zk.WorldACL(zk.PERM_READ))
524}526}
525527
526func (s *S) TestAddAuth(c *C) {528func (s *S) TestAddAuth(c *C) {
527 zk, _ := s.init(c)529 conn, _ := s.init(c)
528530
529 acl := []gozk.ACL{{gozk.PERM_READ, "digest", "joe:enQcM3mIEHQx7IrPNStYBc0qfs8="}}531 acl := []zk.ACL{{zk.PERM_READ, "digest", "joe:enQcM3mIEHQx7IrPNStYBc0qfs8="}}
530532
531 _, err := zk.Create("/test", "", gozk.EPHEMERAL, acl)533 _, err := conn.Create("/test", "", zk.EPHEMERAL, acl)
532 c.Assert(err, IsNil)534 c.Assert(err, IsNil)
533535
534 _, _, err = zk.Get("/test")536 _, _, err = conn.Get("/test")
535 c.Assert(err, NotNil)537 c.Assert(err, NotNil)
536 c.Assert(err.Code(), Equals, gozk.ZNOAUTH)538 c.Assert(err, Equals, zk.ZNOAUTH)
537539
538 err = zk.AddAuth("digest", "joe:passwd")540 err = conn.AddAuth("digest", "joe:passwd")
539 c.Assert(err, IsNil)541 c.Assert(err, IsNil)
540542
541 _, _, err = zk.Get("/test")543 _, _, err = conn.Get("/test")
542 c.Assert(err, IsNil)544 c.Assert(err, IsNil)
543}545}
544546
545func (s *S) TestWatchOnReconnection(c *C) {547func (s *S) TestWatchOnReconnection(c *C) {
546 c.Check(gozk.CountPendingWatches(), Equals, 0)548 c.Check(zk.CountPendingWatches(), Equals, 0)
547549
548 zk, session := s.init(c)550 conn, session := s.init(c)
549551
550 event := <-session552 event := <-session
551 c.Assert(event.Type, Equals, gozk.EVENT_SESSION)553 c.Assert(event.Type, Equals, zk.EVENT_SESSION)
552 c.Assert(event.State, Equals, gozk.STATE_CONNECTED)554 c.Assert(event.State, Equals, zk.STATE_CONNECTED)
553555
554 c.Check(gozk.CountPendingWatches(), Equals, 1)556 c.Check(zk.CountPendingWatches(), Equals, 1)
555557
556 stat, watch, err := zk.ExistsW("/test")558 stat, watch, err := conn.ExistsW("/test")
557 c.Assert(err, IsNil)559 c.Assert(err, IsNil)
558 c.Assert(stat, IsNil)560 c.Assert(stat, IsNil)
559561
560 c.Check(gozk.CountPendingWatches(), Equals, 2)562 c.Check(zk.CountPendingWatches(), Equals, 2)
561563
562 s.StopZK()564 err = s.zkServer.Stop()
565 c.Assert(err, IsNil)
566
563 time.Sleep(2e9)567 time.Sleep(2e9)
564 s.StartZK()
565568
566 // The session channel should receive the reconnection notification,569 err = s.zkServer.Start()
570 c.Assert(err, IsNil)
571
572 // The session channel should receive the reconnection notification.
567 select {573 select {
568 case event := <-session:574 case event := <-session:
569 c.Assert(event.State, Equals, gozk.STATE_CONNECTING)575 c.Assert(event.State, Equals, zk.STATE_CONNECTING)
570 case <-time.After(3e9):576 case <-time.After(3e9):
571 c.Fatal("Session watch didn't fire")577 c.Fatal("Session watch didn't fire")
572 }578 }
573 select {579 select {
574 case event := <-session:580 case event := <-session:
575 c.Assert(event.State, Equals, gozk.STATE_CONNECTED)581 c.Assert(event.State, Equals, zk.STATE_CONNECTED)
576 case <-time.After(3e9):582 case <-time.After(3e9):
577 c.Fatal("Session watch didn't fire")583 c.Fatal("Session watch didn't fire")
578 }584 }
@@ -585,40 +591,40 @@
585 }591 }
586592
587 // And it should still work.593 // And it should still work.
588 _, err = zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))594 _, err = conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
589 c.Assert(err, IsNil)595 c.Assert(err, IsNil)
590596
591 event = <-watch597 event = <-watch
592 c.Assert(event.Type, Equals, gozk.EVENT_CREATED)598 c.Assert(event.Type, Equals, zk.EVENT_CREATED)
593 c.Assert(event.Path, Equals, "/test")599 c.Assert(event.Path, Equals, "/test")
594600
595 c.Check(gozk.CountPendingWatches(), Equals, 1)601 c.Check(zk.CountPendingWatches(), Equals, 1)
596}602}
597603
598func (s *S) TestWatchOnSessionExpiration(c *C) {604func (s *S) TestWatchOnSessionExpiration(c *C) {
599 c.Check(gozk.CountPendingWatches(), Equals, 0)605 c.Check(zk.CountPendingWatches(), Equals, 0)
600606
601 zk, session := s.init(c)607 conn, session := s.init(c)
602608
603 event := <-session609 event := <-session
604 c.Assert(event.Type, Equals, gozk.EVENT_SESSION)610 c.Assert(event.Type, Equals, zk.EVENT_SESSION)
605 c.Assert(event.State, Equals, gozk.STATE_CONNECTED)611 c.Assert(event.State, Equals, zk.STATE_CONNECTED)
606612
607 c.Check(gozk.CountPendingWatches(), Equals, 1)613 c.Check(zk.CountPendingWatches(), Equals, 1)
608614
609 stat, watch, err := zk.ExistsW("/test")615 stat, watch, err := conn.ExistsW("/test")
610 c.Assert(err, IsNil)616 c.Assert(err, IsNil)
611 c.Assert(stat, IsNil)617 c.Assert(stat, IsNil)
612618
613 c.Check(gozk.CountPendingWatches(), Equals, 2)619 c.Check(zk.CountPendingWatches(), Equals, 2)
614620
615 // Use expiration trick described in the FAQ.621 // Use expiration trick described in the FAQ.
616 clientId := zk.ClientId()622 clientId := conn.ClientId()
617 zk2, session2, err := gozk.ReInit(s.zkAddr, 5e9, clientId)623 zk2, session2, err := zk.Redial(s.zkAddr, 5e9, clientId)
618624
619 for event := range session2 {625 for event := range session2 {
620 c.Log("Event from overlapping session: ", event)626 c.Log("Event from overlapping session: ", event)
621 if event.State == gozk.STATE_CONNECTED {627 if event.State == zk.STATE_CONNECTED {
622 // Wait for zk to process the connection.628 // Wait for zk to process the connection.
623 // Not reliable without this. :-(629 // Not reliable without this. :-(
624 time.Sleep(1e9)630 time.Sleep(1e9)
@@ -627,21 +633,21 @@
627 }633 }
628 for event := range session {634 for event := range session {
629 c.Log("Event from primary session: ", event)635 c.Log("Event from primary session: ", event)
630 if event.State == gozk.STATE_EXPIRED_SESSION {636 if event.State == zk.STATE_EXPIRED_SESSION {
631 break637 break
632 }638 }
633 }639 }
634640
635 select {641 select {
636 case event := <-watch:642 case event := <-watch:
637 c.Assert(event.State, Equals, gozk.STATE_EXPIRED_SESSION)643 c.Assert(event.State, Equals, zk.STATE_EXPIRED_SESSION)
638 case <-time.After(3e9):644 case <-time.After(3e9):
639 c.Fatal("Watch event didn't fire")645 c.Fatal("Watch event didn't fire")
640 }646 }
641647
642 event = <-watch648 event = <-watch
643 c.Assert(event.Type, Equals, gozk.EVENT_CLOSED)649 c.Assert(event.Type, Equals, zk.EVENT_CLOSED)
644 c.Assert(event.State, Equals, gozk.STATE_CLOSED)650 c.Assert(event.State, Equals, zk.STATE_CLOSED)
645651
646 c.Check(gozk.CountPendingWatches(), Equals, 1)652 c.Check(zk.CountPendingWatches(), Equals, 1)
647}653}
