Merge lp:~rogpeppe/gozk/clean-up-interface into lp:~juju/gozk/trunk

Proposed by Roger Peppe
Status: Merged
Merge reported by: Gustavo Niemeyer
Merged at revision: not available
Proposed branch: lp:~rogpeppe/gozk/clean-up-interface
Merge into: lp:~juju/gozk/trunk
Diff against target: 2278 lines (+648/-578)
7 files modified
Makefile (+5/-2)
example/example.go (+22/-23)
retry_test.go (+45/-47)
server.go (+166/-0)
suite_test.go (+65/-114)
zookeeper.go (+216/-265)
zookeeper_test.go (+129/-127)
To merge this branch: bzr merge lp:~rogpeppe/gozk/clean-up-interface
Reviewer Review Type Date Requested Status
Gustavo Niemeyer Approve
Review via email: mp+76766@code.launchpad.net

Description of the change

Clean up gozk interface. Add Server function.

Gustavo Niemeyer (niemeyer) wrote :

This is looking fantastic. Thanks Roger.

Some details to sort out:

[1]

=== renamed file 'gozk.go' => 'zookeeper/gozk.go'

Please keep the files at the root of the branch. We'll put it in a
new series in launchpad so that the gozk/zookeeper reference is
correct.

[2]

+// Conn represents a connection to a set of Zookeeper nodes.

The project name is "ZooKeeper". Please use this casing consistently
across the code base.

[3]

+// ClientId represents an established ZooKeeper session. It can be
+// passed into New to reestablish a connection to an existing session.

There's no New function.

[4]

+ ZOK Error = C.ZOK
+ ZSYSTEMERROR Error = C.ZSYSTEMERROR
+ ZRUNTIMEINCONSISTENCY Error = C.ZRUNTIMEINCONSISTENCY

Much better!

[5]

+func (zk *Conn) RetryChange(path string, flags int, acl []ACL, changeFunc ChangeFunc) os.Error {

Very nice changes in RetryChange too, thanks!

[6]

+ path, err := zk.Create("/test-", "bababum", zookeeper.SEQUENCE|zookeeper.EPHEMERAL, zookeeper.WorldACL(zookeeper.PERM_ALL))

These lines make me unhappy about the length of "zookeeper". I'm thinking we
might well rename it to "zk" in a follow up branch, but please keep it like
this for now. Just for comparison:

path, err := conn.Create("/test-", "bababum", zk.SEQUENCE|zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))

[7]

+ c.Assert(stat, Equals, (*zookeeper.Stat)(nil))

c.Assert(stat, IsNil)

[8]

+// Server sets up a zookeeper server environment inside dataDir

ZooKeeper in this file as well.

[9]

+// for a server that listening on the specified TCP port, sending

s/listening/listens/

[10]

+// Server does not actually start the server. Instead it returns
+// a command line, suitable for passing to exec.Command,
+// for example.

Let's do a bit better than this. There's no reason to keep the
responsibility of creating the directory, starting, stopping,
cleaning, etc, with the caller. We need this for real, not just
due to testing, but in juju for the local deployment case.

I suggest we define a Server type as:

  type Server struct {
      runDir, installDir string
      cmd *exec.Cmd
  }

and:

  func zookeeper.NewServer(runDir) *Server

Then, we should have this as well:

  func (s *Server) Create(installDir string) os.Error
  func (s *Server) Start() os.Error
  func (s *Server) Stop() os.Error
  func (s *Server) Destroy() os.Error

Please push this in a separate merge proposal. I'm happy to
merge the current version as is for the moment.

[11]

+var log4jProperties = []byte(`

No need to have two copies of that file in memory at all times. Use a const
and []byte() it when necessary.

[12]

+ classPath, _ := filepath.Glob(filepath.Join(dir, "zookeeper-*.jar"))
+ more, _ := filepath.Glob(filepath.Join(dir, "lib/*.jar"))

Please check the errors.

[13]

+ fd, err := os.Open(zookeeperEnviron)

That's a file, or an f, not an fd.

[14]

+ line, err := r.ReadSlice('\n')

ReadLine()

[15]

+ // strip quotes
+ if path[0] == '"' {
+ path = path[1 : len(path)-1]
+ }

Please use Trim. This logic assumes it...


review: Needs Fixing
Roger Peppe (rogpeppe) wrote :

On 23 September 2011 18:21, Gustavo Niemeyer <email address hidden> wrote:
> Review: Needs Fixing
>
> This is looking fantastic. Thanks Roger.
>
> Some details to sort out:
>
>
> [1]
>
> === renamed file 'gozk.go' => 'zookeeper/gozk.go'
>
> Please keep the files at the root of the branch. We'll put it in a
> new series in launchpad so that the gozk/zookeeper reference is
> correct.

ok. i moved the example file into an "example" directory
so that goinstall won't get confused.

>
>
> [2]
>
> +// Conn represents a connection to a set of Zookeeper nodes.
>
> The project name is "ZooKeeper". Please use this casing consistently
> across the code base.

done. + a few original discrepancies in this respect.

> [3]
>
> +// ClientId represents an established ZooKeeper session.  It can be
> +// passed into New to reestablish a connection to an existing session.
>
> There's no New function.

i'd already fixed this.

>
> +func (zk *Conn) RetryChange(path string, flags int, acl []ACL, changeFunc ChangeFunc) os.Error {
>
> Very nice changes in RetryChange too, thanks!

good. worth the bug, i'd say!

> [6]
>
> +       path, err := zk.Create("/test-", "bababum", zookeeper.SEQUENCE|zookeeper.EPHEMERAL, zookeeper.WorldACL(zookeeper.PERM_ALL))
>
> These lines make me unhappy about the length of "zookeeper". I'm thinking we
> might well rename it to "zk" in a follow up branch, but please keep it like
> this for now. Just for comparison:
>
> path, err := conn.Create("/test-", "bababum", zk.SEQUENCE|zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))

my inclination would be to keep the name as "zookeeper", but
people can import as zk if they want. then strangers to code that's
using it will see "launchpad.net/zookeeper" and know instantly
what the package is about.

also, people can easily define constants for this kind of thing if they wish.

> +       c.Assert(stat, Equals, (*zookeeper.Stat)(nil))
>
> c.Assert(stat, IsNil)

done.

> [8]
>
> +// Server sets up a zookeeper server environment inside dataDir
>
> ZooKeeper in this file as well.

done.

> [9]
>
> +// for a server that listening on the specified TCP port, sending
>
> s/listening/listens/

done

> [10]
>
> +// Server does not actually start the server. Instead it returns
> +// a command line, suitable for passing to exec.Command,
> +// for example.
>
> Let's do a bit better than this. There's no reason to keep the
> responsibility of creating the directory, starting, stopping,
> cleaning, etc, with the caller. We need this for real, not just
> due to testing, but in juju for the local deployment case.

i thought quite a bit about this.
the problem with doing the starting ourselves is that
there are many things the caller might wish to do
with the running executable. for example:
- logging (see the custom logging in suite_test.go)
- running as a different user
- tweaking the config file before starting
- stashing the pid for long-term killability
- starting the same server several times on the same
  directory (as is done in this test suite). ok, your proposal
  does deal with this.

-
>
> I suggest we define a Server type as:
>
>  type Server struct {
>      runDir, installDir string
>      cmd *exec.Cmd
>  }
>
> ...


lp:~rogpeppe/gozk/clean-up-interface updated
24. By Roger Peppe

fixes after code review.

Gustavo Niemeyer (niemeyer) wrote :

Thanks for the changes. Some additional feedback:

[10]

> i thought quite a bit about this.
> the problem with doing the starting ourselves is that
> there are many things the caller might wish to do

Yeah, we talked precisely about this on IRC. None of the
points you mention are a good reason to not provide the
suggested interface.

[12]

> the docs say that the only error that glob can return is if the
> pattern is malformed. those patterns are evidently not.

Until someone changes them by mistake and they become malformed,
and rather than being warned about that we get crazy behavior.

A panic is fine in that case, but please do not leave errors unchecked.

[14]

Sounds good.

lp:~rogpeppe/gozk/clean-up-interface updated
25. By Roger Peppe

add error checks to Glob call

Roger Peppe (rogpeppe) wrote :

On 23 September 2011 19:41, Gustavo Niemeyer <email address hidden> wrote:
>> the docs say that the only error that glob can return is if the
>> pattern is malformed. those patterns are evidently not.
>
> Until someone changes them by mistake and they become malformed,
> and rather than being warned about that we get crazy behavior.
>
> A panic is fine in that case, but please do not leave errors unchecked.

i will do this since you ask. but i really think this is unnecessary.
the worst thing that will happen if the pattern is bad is that we'll
get no files.
this will also happen if the file name is misspelled, a much more likely
case (and one incidentally that i think might apply currently - i can't
test it), and one which the error check won't catch. it seems ok to me
that they're both debugged in the same way.

[the only time a pattern can be bad is if it ends in a backslash
or it has a malformed character class. the former is easy to see
and we're unlikely to add a character class match here.]

i've pushed a new version. i'll leave the Server changes for a separate
merge, as suggested.

Gustavo Niemeyer (niemeyer) wrote :

That's merged, and pushed to a new package location, and fixed accordingly:

    http://launchpad.net/gozk/zk

Once the Server implementation is finished, we can announce the new location
and the changes performed.

The new location also enables us to wait a bit for people to migrate, instead
of removing the old location immediately.

On the next merge proposal, please remember to target lp:~juju/juju/zk instead of trunk.

review: Approve
Roger Peppe (rogpeppe) wrote :

thanks, but...

one last plea to have the word "zookeeper" in the package path at least,
but preferably as the pkg identifier. people can import as zk if they wish
(or even as both!). i'm not keen on acronyms and abbreviations - can you
tell? :-)

On 24 September 2011 15:08, Gustavo Niemeyer <email address hidden> wrote:
> Review: Approve
>
> That's merged, and pushed to a new package location, and fixed accordingly:
>
>    http://launchpad.net/gozk/zk
>
> Once the Server implementation is finished, we can announce the new location
> and the changes performed.
>
> The new location also enables us to wait a bit for people to migrate, instead
> of removing the old location immediately.
>
> On the next merge proposal, please remember to target lp:~juju/juju/zk instead of trunk.
> --
> https://code.launchpad.net/~rogpeppe/gozk/clean-up-interface/+merge/76766
> You are the owner of lp:~rogpeppe/gozk/clean-up-interface.
>

Gustavo Niemeyer (niemeyer) wrote :

> one last plea to have the word "zookeeper" in the package path at least,
> but preferably as the pkg identifier. people can import as zk if they wish
> (or even as both!). i'm not keen on acronyms and abbreviations - can you
> tell? :-)

I can tell, but I'm not keen on the very long lines that the new package name
has created, so being a matter of personal choice only I'll reserve the right
to keep the existing convention unchanged in that one case.

I do accept taking the "go" prefix out, though.

Preview Diff

=== modified file 'Makefile'
--- Makefile 2011-08-03 01:56:58 +0000
+++ Makefile 2011-09-24 08:31:23 +0000
@@ -2,10 +2,13 @@
 
 all: package
 
-TARG=gozk
+TARG=launchpad.net/zookeeper
 
+GOFILES=\
+	server.go\
+
 CGOFILES=\
-	gozk.go\
+	zookeeper.go\
 
 CGO_OFILES=\
 	helpers.o\
=== added directory 'example'
=== renamed file 'example.go' => 'example/example.go'
--- example.go 2011-08-03 01:47:25 +0000
+++ example/example.go 2011-09-24 08:31:23 +0000
@@ -1,30 +1,29 @@
 package main
 
 import (
-	"gozk"
+	"launchpad.net/zookeeper/zookeeper"
 )
 
 func main() {
-	zk, session, err := gozk.Init("localhost:2181", 5000)
+	zk, session, err := zookeeper.Init("localhost:2181", 5000)
 	if err != nil {
 		println("Couldn't connect: " + err.String())
 		return
 	}
 
 	defer zk.Close()
 
 	// Wait for connection.
 	event := <-session
-	if event.State != gozk.STATE_CONNECTED {
+	if event.State != zookeeper.STATE_CONNECTED {
 		println("Couldn't connect")
 		return
 	}
 
-	_, err = zk.Create("/counter", "0", 0, gozk.WorldACL(gozk.PERM_ALL))
+	_, err = zk.Create("/counter", "0", 0, zookeeper.WorldACL(zookeeper.PERM_ALL))
 	if err != nil {
 		println(err.String())
 	} else {
 		println("Created!")
 	}
 }
-
=== modified file 'retry_test.go'
--- retry_test.go 2011-08-19 01:51:37 +0000
+++ retry_test.go 2011-09-24 08:31:23 +0000
@@ -1,16 +1,16 @@
-package gozk_test
+package zookeeper_test
 
 import (
 	. "launchpad.net/gocheck"
-	"gozk"
+	"launchpad.net/zookeeper"
 	"os"
 )
 
 func (s *S) TestRetryChangeCreating(c *C) {
 	zk, _ := s.init(c)
 
-	err := zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL),
-		func(data string, stat gozk.Stat) (string, os.Error) {
+	err := zk.RetryChange("/test", zookeeper.EPHEMERAL, zookeeper.WorldACL(zookeeper.PERM_ALL),
+		func(data string, stat *zookeeper.Stat) (string, os.Error) {
 			c.Assert(data, Equals, "")
 			c.Assert(stat, IsNil)
 			return "new", nil
@@ -25,18 +25,18 @@
 
 	acl, _, err := zk.ACL("/test")
 	c.Assert(err, IsNil)
-	c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_ALL))
+	c.Assert(acl, Equals, zookeeper.WorldACL(zookeeper.PERM_ALL))
 }
 
 func (s *S) TestRetryChangeSetting(c *C) {
 	zk, _ := s.init(c)
 
-	_, err := zk.Create("/test", "old", gozk.EPHEMERAL,
-		gozk.WorldACL(gozk.PERM_ALL))
+	_, err := zk.Create("/test", "old", zookeeper.EPHEMERAL,
+		zookeeper.WorldACL(zookeeper.PERM_ALL))
 	c.Assert(err, IsNil)
 
-	err = zk.RetryChange("/test", gozk.EPHEMERAL, []gozk.ACL{},
-		func(data string, stat gozk.Stat) (string, os.Error) {
+	err = zk.RetryChange("/test", zookeeper.EPHEMERAL, []zookeeper.ACL{},
+		func(data string, stat *zookeeper.Stat) (string, os.Error) {
 			c.Assert(data, Equals, "old")
 			c.Assert(stat, NotNil)
 			c.Assert(stat.Version(), Equals, int32(0))
@@ -53,18 +53,18 @@
 	// ACL was unchanged by RetryChange().
 	acl, _, err := zk.ACL("/test")
 	c.Assert(err, IsNil)
-	c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_ALL))
+	c.Assert(acl, Equals, zookeeper.WorldACL(zookeeper.PERM_ALL))
 }
 
 func (s *S) TestRetryChangeUnchangedValueDoesNothing(c *C) {
 	zk, _ := s.init(c)
 
-	_, err := zk.Create("/test", "old", gozk.EPHEMERAL,
-		gozk.WorldACL(gozk.PERM_ALL))
+	_, err := zk.Create("/test", "old", zookeeper.EPHEMERAL,
+		zookeeper.WorldACL(zookeeper.PERM_ALL))
 	c.Assert(err, IsNil)
 
-	err = zk.RetryChange("/test", gozk.EPHEMERAL, []gozk.ACL{},
-		func(data string, stat gozk.Stat) (string, os.Error) {
+	err = zk.RetryChange("/test", zookeeper.EPHEMERAL, []zookeeper.ACL{},
+		func(data string, stat *zookeeper.Stat) (string, os.Error) {
 			c.Assert(data, Equals, "old")
 			c.Assert(stat, NotNil)
 			c.Assert(stat.Version(), Equals, int32(0))
@@ -82,12 +82,12 @@
 func (s *S) TestRetryChangeConflictOnCreate(c *C) {
 	zk, _ := s.init(c)
 
-	changeFunc := func(data string, stat gozk.Stat) (string, os.Error) {
+	changeFunc := func(data string, stat *zookeeper.Stat) (string, os.Error) {
 		switch data {
 		case "":
 			c.Assert(stat, IsNil)
-			_, err := zk.Create("/test", "conflict", gozk.EPHEMERAL,
-				gozk.WorldACL(gozk.PERM_ALL))
+			_, err := zk.Create("/test", "conflict", zookeeper.EPHEMERAL,
+				zookeeper.WorldACL(zookeeper.PERM_ALL))
 			c.Assert(err, IsNil)
 			return "<none> => conflict", nil
 		case "conflict":
@@ -100,7 +100,7 @@
 		return "can't happen", nil
 	}
 
-	err := zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL),
+	err := zk.RetryChange("/test", zookeeper.EPHEMERAL, zookeeper.WorldACL(zookeeper.PERM_ALL),
 		changeFunc)
 	c.Assert(err, IsNil)
 
@@ -114,11 +114,11 @@
 func (s *S) TestRetryChangeConflictOnSetDueToChange(c *C) {
 	zk, _ := s.init(c)
 
-	_, err := zk.Create("/test", "old", gozk.EPHEMERAL,
-		gozk.WorldACL(gozk.PERM_ALL))
+	_, err := zk.Create("/test", "old", zookeeper.EPHEMERAL,
+		zookeeper.WorldACL(zookeeper.PERM_ALL))
 	c.Assert(err, IsNil)
 
-	changeFunc := func(data string, stat gozk.Stat) (string, os.Error) {
+	changeFunc := func(data string, stat *zookeeper.Stat) (string, os.Error) {
 		switch data {
 		case "old":
 			c.Assert(stat, NotNil)
@@ -136,7 +136,7 @@
 		return "can't happen", nil
 	}
 
-	err = zk.RetryChange("/test", gozk.EPHEMERAL, []gozk.ACL{}, changeFunc)
+	err = zk.RetryChange("/test", zookeeper.EPHEMERAL, []zookeeper.ACL{}, changeFunc)
 	c.Assert(err, IsNil)
 
 	data, stat, err := zk.Get("/test")
@@ -149,11 +149,11 @@
 func (s *S) TestRetryChangeConflictOnSetDueToDelete(c *C) {
 	zk, _ := s.init(c)
 
-	_, err := zk.Create("/test", "old", gozk.EPHEMERAL,
-		gozk.WorldACL(gozk.PERM_ALL))
+	_, err := zk.Create("/test", "old", zookeeper.EPHEMERAL,
+		zookeeper.WorldACL(zookeeper.PERM_ALL))
 	c.Assert(err, IsNil)
 
-	changeFunc := func(data string, stat gozk.Stat) (string, os.Error) {
+	changeFunc := func(data string, stat *zookeeper.Stat) (string, os.Error) {
 		switch data {
 		case "old":
 			c.Assert(stat, NotNil)
@@ -170,7 +170,7 @@
 		return "can't happen", nil
 	}
 
-	err = zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_READ),
+	err = zk.RetryChange("/test", zookeeper.EPHEMERAL, zookeeper.WorldACL(zookeeper.PERM_READ),
 		changeFunc)
 	c.Assert(err, IsNil)
 
@@ -183,18 +183,17 @@
 	// Should be the new ACL.
 	acl, _, err := zk.ACL("/test")
 	c.Assert(err, IsNil)
-	c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_READ))
+	c.Assert(acl, Equals, zookeeper.WorldACL(zookeeper.PERM_READ))
 }
 
 func (s *S) TestRetryChangeErrorInCallback(c *C) {
 	zk, _ := s.init(c)
 
-	err := zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL),
-		func(data string, stat gozk.Stat) (string, os.Error) {
+	err := zk.RetryChange("/test", zookeeper.EPHEMERAL, zookeeper.WorldACL(zookeeper.PERM_ALL),
+		func(data string, stat *zookeeper.Stat) (string, os.Error) {
 			return "don't use this", os.NewError("BOOM!")
 		})
 	c.Assert(err, NotNil)
-	c.Assert(err.Code(), Equals, gozk.ZSYSTEMERROR)
 	c.Assert(err.String(), Equals, "BOOM!")
 
 	stat, err := zk.Exists("/test")
@@ -205,18 +204,18 @@
 func (s *S) TestRetryChangeFailsReading(c *C) {
 	zk, _ := s.init(c)
 
-	_, err := zk.Create("/test", "old", gozk.EPHEMERAL,
-		gozk.WorldACL(gozk.PERM_WRITE)) // Write only!
+	_, err := zk.Create("/test", "old", zookeeper.EPHEMERAL,
+		zookeeper.WorldACL(zookeeper.PERM_WRITE)) // Write only!
 	c.Assert(err, IsNil)
 
 	var called bool
-	err = zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL),
-		func(data string, stat gozk.Stat) (string, os.Error) {
+	err = zk.RetryChange("/test", zookeeper.EPHEMERAL, zookeeper.WorldACL(zookeeper.PERM_ALL),
+		func(data string, stat *zookeeper.Stat) (string, os.Error) {
 			called = true
 			return "", nil
 		})
 	c.Assert(err, NotNil)
-	c.Assert(err.Code(), Equals, gozk.ZNOAUTH)
+	c.Assert(err, Equals, zookeeper.ZNOAUTH)
 
 	stat, err := zk.Exists("/test")
 	c.Assert(err, IsNil)
@@ -229,18 +228,17 @@
 func (s *S) TestRetryChangeFailsSetting(c *C) {
 	zk, _ := s.init(c)
 
-	_, err := zk.Create("/test", "old", gozk.EPHEMERAL,
-		gozk.WorldACL(gozk.PERM_READ)) // Read only!
+	_, err := zk.Create("/test", "old", zookeeper.EPHEMERAL,
+		zookeeper.WorldACL(zookeeper.PERM_READ)) // Read only!
 	c.Assert(err, IsNil)
 
 	var called bool
-	err = zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL),
-		func(data string, stat gozk.Stat) (string, os.Error) {
+	err = zk.RetryChange("/test", zookeeper.EPHEMERAL, zookeeper.WorldACL(zookeeper.PERM_ALL),
+		func(data string, stat *zookeeper.Stat) (string, os.Error) {
 			called = true
 			return "", nil
 		})
-	c.Assert(err, NotNil)
-	c.Assert(err.Code(), Equals, gozk.ZNOAUTH)
+	c.Assert(err, Equals, zookeeper.ZNOAUTH)
 
 	stat, err := zk.Exists("/test")
 	c.Assert(err, IsNil)
@@ -253,19 +251,19 @@
 func (s *S) TestRetryChangeFailsCreating(c *C) {
 	zk, _ := s.init(c)
 
-	_, err := zk.Create("/test", "old", gozk.EPHEMERAL,
-		gozk.WorldACL(gozk.PERM_READ)) // Read only!
+	_, err := zk.Create("/test", "old", zookeeper.EPHEMERAL,
+		zookeeper.WorldACL(zookeeper.PERM_READ)) // Read only!
 	c.Assert(err, IsNil)
 
 	var called bool
-	err = zk.RetryChange("/test/sub", gozk.EPHEMERAL,
-		gozk.WorldACL(gozk.PERM_ALL),
-		func(data string, stat gozk.Stat) (string, os.Error) {
+	err = zk.RetryChange("/test/sub", zookeeper.EPHEMERAL,
+		zookeeper.WorldACL(zookeeper.PERM_ALL),
+		func(data string, stat *zookeeper.Stat) (string, os.Error) {
 			called = true
 			return "", nil
 		})
 	c.Assert(err, NotNil)
-	c.Assert(err.Code(), Equals, gozk.ZNOAUTH)
+	c.Assert(err, Equals, zookeeper.ZNOAUTH)
 
 	stat, err := zk.Exists("/test/sub")
 	c.Assert(err, IsNil)
=== added file 'server.go'
--- server.go 1970-01-01 00:00:00 +0000
+++ server.go 2011-09-24 08:31:23 +0000
@@ -0,0 +1,166 @@
+package zookeeper
+
+import (
+	"bufio"
+	"bytes"
+	"exec"
+	"fmt"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"strings"
+)
+
+const zookeeperEnviron = "/etc/zookeeper/conf/environment"
+
+// Server sets up a zookeeper server environment inside dataDir
+// for a server that listens on the specified TCP port, sending
+// all log messages to standard output.
+// The dataDir directory must exist already.
+//
+// The zookeeper installation directory is specified by installedDir.
+// If this is empty, a system default will be used.
+//
+// Server does not actually start the server. Instead it returns
+// a command line, suitable for passing to exec.Command,
+// for example.
+func Server(port int, dataDir, installedDir string) ([]string, os.Error) {
+	cp, err := classPath(installedDir)
+	if err != nil {
+		return nil, err
+	}
+	logDir := filepath.Join(dataDir, "log")
+	if err = os.Mkdir(logDir, 0777); err != nil && err.(*os.PathError).Error != os.EEXIST {
+		return nil, err
+	}
+	logConfigPath, err := writeLog4JConfig(dataDir)
+	if err != nil {
+		return nil, err
+	}
+	configPath, err := writeZooKeeperConfig(dataDir, port)
+	if err != nil {
+		return nil, err
+	}
+	exe, err := exec.LookPath("java")
+	if err != nil {
+		return nil, err
+	}
+	cmd := []string{
+		exe,
+		"-cp", strings.Join(cp, ":"),
+		"-Dzookeeper.log.dir=" + logDir,
+		"-Dzookeeper.root.logger=INFO,CONSOLE",
+		"-Dlog4j.configuration=file:" + logConfigPath,
+		"org.apache.zookeeper.server.quorum.QuorumPeerMain",
+		configPath,
+	}
+	return cmd, nil
+}
+
+var log4jProperties = `
+log4j.rootLogger=INFO, CONSOLE
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.Threshold=INFO
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n
+`
+
+func writeLog4JConfig(dir string) (path string, err os.Error) {
+	path = filepath.Join(dir, "log4j.properties")
+	err = ioutil.WriteFile(path, []byte(log4jProperties), 0666)
+	return
+}
+
+func writeZooKeeperConfig(dir string, port int) (path string, err os.Error) {
+	path = filepath.Join(dir, "zoo.cfg")
+	err = ioutil.WriteFile(path, []byte(fmt.Sprintf(
+		"tickTime=2000\n"+
+			"dataDir=%s\n"+
+			"clientPort=%d\n"+
+			"maxClientCnxns=500\n",
+		dir, port)), 0666)
+	return
+}
+
+func classPath(dir string) ([]string, os.Error) {
+	if dir == "" {
+		return systemClassPath()
+	}
+	if err := checkDirectory(dir); err != nil {
+		return nil, err
+	}
+	// Two possibilities, as seen in zkEnv.sh:
+	// 1) locally built binaries (jars are in build directory)
+	// 2) release binaries
+	if build := filepath.Join(dir, "build"); checkDirectory(build) == nil {
+		dir = build
+	}
+	classPath, err := filepath.Glob(filepath.Join(dir, "zookeeper-*.jar"))
+	if err != nil {
+		panic(fmt.Errorf("glob for jar files: %v", err))
+	}
+	more, err := filepath.Glob(filepath.Join(dir, "lib/*.jar"))
+	if err != nil {
+		panic(fmt.Errorf("glob for lib jar files: %v", err))
+	}
+
+	classPath = append(classPath, more...)
+	if len(classPath) == 0 {
+		return nil, fmt.Errorf("zookeeper libraries not found in %q", dir)
+	}
+	return classPath, nil
+}
+
+func systemClassPath() ([]string, os.Error) {
+	f, err := os.Open(zookeeperEnviron)
+	if f == nil {
+		return nil, err
+	}
+	r := bufio.NewReader(f)
+	for {
+		line, err := r.ReadSlice('\n')
+		if err != nil {
+			break
+		}
+		if !bytes.HasPrefix(line, []byte("CLASSPATH=")) {
+			continue
+		}
+
+		// remove variable and newline
+		path := string(line[len("CLASSPATH=") : len(line)-1])
+
+		// trim white space
+		path = strings.Trim(path, " \t\r")
+
+		// strip quotes
+		if path[0] == '"' {
+			path = path[1 : len(path)-1]
+		}
+
+		// split on :
+		classPath := strings.Split(path, ":")
+
+		// split off $ZOOCFGDIR
+		if len(classPath) > 0 && classPath[0] == "$ZOOCFGDIR" {
+			classPath = classPath[1:]
+		}
+
+		if len(classPath) == 0 {
+			return nil, fmt.Errorf("empty class path in %q", zookeeperEnviron)
+		}
+		return classPath, nil
+	}
+	return nil, fmt.Errorf("no class path found in %q", zookeeperEnviron)
+}
+
+// checkDirectory returns an error if the given path
+// does not exist or is not a directory.
+func checkDirectory(path string) os.Error {
+	if info, err := os.Stat(path); err != nil || !info.IsDirectory() {
+		if err == nil {
+			err = &os.PathError{Op: "stat", Path: path, Error: os.NewError("is not a directory")}
+		}
+		return err
+	}
+	return nil
+}
=== modified file 'suite_test.go'
--- suite_test.go 2011-08-19 01:43:37 +0000
+++ suite_test.go 2011-09-24 08:31:23 +0000
@@ -1,13 +1,13 @@
-package gozk_test
+package zookeeper_test
 
 import (
 	. "launchpad.net/gocheck"
-	"testing"
-	"io/ioutil"
-	"path"
+	"bufio"
+	"exec"
 	"fmt"
+	"launchpad.net/zookeeper"
 	"os"
-	"gozk"
+	"testing"
 	"time"
 )
 
@@ -18,48 +18,32 @@
 var _ = Suite(&S{})
 
 type S struct {
-	zkRoot string
+	zkArgs []string
 	zkTestRoot string
 	zkTestPort int
-	zkServerSh string
-	zkServerOut *os.File
-	zkAddr string
+	zkProcess *os.Process // The running ZooKeeper process
+	zkAddr string
 
-	handles []*gozk.ZooKeeper
-	events []*gozk.Event
+	handles []*zookeeper.Conn
+	events []*zookeeper.Event
 	liveWatches int
 	deadWatches chan bool
 }
 
-var logLevel = 0 //gozk.LOG_ERROR
+var logLevel = 0 //zookeeper.LOG_ERROR
 
-
-var testZooCfg = ("dataDir=%s\n" +
-	"clientPort=%d\n" +
-	"tickTime=2000\n" +
-	"initLimit=10\n" +
-	"syncLimit=5\n" +
-	"")
-
-var testLog4jPrp = ("log4j.rootLogger=INFO,CONSOLE\n" +
-	"log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender\n" +
-	"log4j.appender.CONSOLE.Threshold=DEBUG\n" +
-	"log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout\n" +
-	"log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n" +
-	"")
-
-func (s *S) init(c *C) (*gozk.ZooKeeper, chan gozk.Event) {
-	zk, watch, err := gozk.Init(s.zkAddr, 5e9)
+func (s *S) init(c *C) (*zookeeper.Conn, chan zookeeper.Event) {
+	zk, watch, err := zookeeper.Dial(s.zkAddr, 5e9)
 	c.Assert(err, IsNil)
 
 	s.handles = append(s.handles, zk)
 
 	event := <-watch
 
-	c.Assert(event.Type, Equals, gozk.EVENT_SESSION)
-	c.Assert(event.State, Equals, gozk.STATE_CONNECTED)
+	c.Assert(event.Type, Equals, zookeeper.EVENT_SESSION)
+	c.Assert(event.State, Equals, zookeeper.STATE_CONNECTED)
 
-	bufferedWatch := make(chan gozk.Event, 256)
+	bufferedWatch := make(chan zookeeper.Event, 256)
 	bufferedWatch <- event
 
 	s.liveWatches += 1
@@ -86,9 +70,9 @@
 }
 
 func (s *S) SetUpTest(c *C) {
-	c.Assert(gozk.CountPendingWatches(), Equals, 0,
+	c.Assert(zookeeper.CountPendingWatches(), Equals, 0,
 		Bug("Test got a dirty watch state before running!"))
-	gozk.SetLogLevel(logLevel)
+	zookeeper.SetLogLevel(logLevel)
 }
 
 func (s *S) TearDownTest(c *C) {
@@ -108,98 +92,65 @@
 	}
 
 	// Reset the list of handles.
-	s.handles = make([]*gozk.ZooKeeper, 0)
+	s.handles = make([]*zookeeper.Conn, 0)
 
-	c.Assert(gozk.CountPendingWatches(), Equals, 0,
+	c.Assert(zookeeper.CountPendingWatches(), Equals, 0,
 		Bug("Test left live watches behind!"))
 }
 
-// We use the suite set up and tear down to manage a custom zookeeper
+// We use the suite set up and tear down to manage a custom ZooKeeper
 //
 func (s *S) SetUpSuite(c *C) {
-
+	var err os.Error
 	s.deadWatches = make(chan bool)
 
-	var err os.Error
-
-	s.zkRoot = os.Getenv("ZKROOT")
-	if s.zkRoot == "" {
-		panic("You must define $ZKROOT ($ZKROOT/bin/zkServer.sh is needed)")
-	}
-
 	s.zkTestRoot = c.MkDir()
 	s.zkTestPort = 21812
-
-	println("ZooKeeper test server directory:", s.zkTestRoot)
-	println("ZooKeeper test server port:", s.zkTestPort)
-
-	s.zkAddr = fmt.Sprintf("localhost:%d", s.zkTestPort)
-
-	s.zkServerSh = path.Join(s.zkRoot, "bin/zkServer.sh")
+	s.zkAddr = fmt.Sprint("localhost:", s.zkTestPort)
+
+	s.zkArgs, err = zookeeper.Server(s.zkTestPort, s.zkTestRoot, "")
+	if err != nil {
+		c.Fatal("Cannot set up server environment: ", err)
+	}
+	s.StartZK(c)
-	s.zkServerOut, err = os.OpenFile(path.Join(s.zkTestRoot, "stdout.txt"),
-		os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
-	if err != nil {
-		panic("Can't open stdout.txt file for server: " + err.String())
-	}
-
-	dataDir := path.Join(s.zkTestRoot, "data")
-	confDir := path.Join(s.zkTestRoot, "conf")
-
-	os.Mkdir(dataDir, 0755)
-	os.Mkdir(confDir, 0755)
-
-	err = os.Setenv("ZOOCFGDIR", confDir)
-	if err != nil {
-		panic("Can't set $ZOOCFGDIR: " + err.String())
-	}
-
-	zooCfg := []byte(fmt.Sprintf(testZooCfg, dataDir, s.zkTestPort))
-	err = ioutil.WriteFile(path.Join(confDir, "zoo.cfg"), zooCfg, 0644)
158 if err != nil {
159 panic("Can't write zoo.cfg: " + err.String())
160 }
161
162 log4jPrp := []byte(testLog4jPrp)
163 err = ioutil.WriteFile(path.Join(confDir, "log4j.properties"), log4jPrp, 0644)
164 if err != nil {
165 panic("Can't write log4j.properties: " + err.String())
166 }
167
168 s.StartZK()
169}116}
170117
171func (s *S) TearDownSuite(c *C) {118func (s *S) TearDownSuite(c *C) {
172 s.StopZK()119 s.StopZK()
173 s.zkServerOut.Close()120}
174}121
175122func startLogger(c *C, cmd *exec.Cmd) {
176func (s *S) StartZK() {123 r, err := cmd.StdoutPipe()
177 attr := os.ProcAttr{Files: []*os.File{os.Stdin, s.zkServerOut, os.Stderr}}124 if err != nil {
178 proc, err := os.StartProcess(s.zkServerSh, []string{s.zkServerSh, "start"}, &attr)125 c.Fatal("cannot make output pipe:", err)
179 if err != nil {126 }
180 panic("Problem executing zkServer.sh start: " + err.String())127 cmd.Stderr = cmd.Stdout
181 }128 bio := bufio.NewReader(r)
182129 go func() {
183 result, err := proc.Wait(0)130 for {
184 if err != nil {131 line, err := bio.ReadSlice('\n')
185 panic(err.String())132 if err != nil {
186 } else if result.ExitStatus() != 0 {133 break
187 panic("'zkServer.sh start' exited with non-zero status")134 }
188 }135 c.Log(line[0 : len(line)-1])
136 }
137 }()
138}
139
140func (s *S) StartZK(c *C) {
141 cmd := exec.Command(s.zkArgs[0], s.zkArgs[1:]...)
142 startLogger(c, cmd)
143 err := cmd.Start()
144 if err != nil {
145 c.Fatal("Error starting ZooKeeper server: ", err)
146 }
147 s.zkProcess = cmd.Process
189}148}
190149
191func (s *S) StopZK() {150func (s *S) StopZK() {
192 attr := os.ProcAttr{Files: []*os.File{os.Stdin, s.zkServerOut, os.Stderr}}151 if s.zkProcess != nil {
193 proc, err := os.StartProcess(s.zkServerSh, []string{s.zkServerSh, "stop"}, &attr)152 s.zkProcess.Kill()
194 if err != nil {153 s.zkProcess.Wait(0)
195 panic("Problem executing zkServer.sh stop: " + err.String() +154 }
196 " (look for runaway java processes!)")155 s.zkProcess = nil
197 }
198 result, err := proc.Wait(0)
199 if err != nil {
200 panic(err.String())
201 } else if result.ExitStatus() != 0 {
202 panic("'zkServer.sh stop' exited with non-zero status " +
203 "(look for runaway java processes!)")
204 }
205}156}
206157
=== added directory 'zookeeper'
=== renamed file 'gozk.go' => 'zookeeper.go'
--- gozk.go 2011-08-19 01:56:39 +0000
+++ zookeeper.go 2011-09-24 08:31:23 +0000
@@ -1,4 +1,4 @@
1// gozk - Zookeeper support for the Go language1// gozk - ZooKeeper support for the Go language
2//2//
3// https://wiki.ubuntu.com/gozk3// https://wiki.ubuntu.com/gozk
4//4//
@@ -6,7 +6,7 @@
6//6//
7// Written by Gustavo Niemeyer <gustavo.niemeyer@canonical.com>7// Written by Gustavo Niemeyer <gustavo.niemeyer@canonical.com>
8//8//
9package gozk9package zookeeper
1010
11/*11/*
12#cgo CFLAGS: -I/usr/include/c-client-src12#cgo CFLAGS: -I/usr/include/c-client-src
@@ -27,17 +27,16 @@
27// -----------------------------------------------------------------------27// -----------------------------------------------------------------------
28// Main constants and data types.28// Main constants and data types.
2929
30// The main ZooKeeper object, created through the Init function.30// Conn represents a connection to a set of ZooKeeper nodes.
31// Encapsulates all communication with ZooKeeper.31type Conn struct {
32type ZooKeeper struct {
33 watchChannels map[uintptr]chan Event32 watchChannels map[uintptr]chan Event
34 sessionWatchId uintptr33 sessionWatchId uintptr
35 handle *C.zhandle_t34 handle *C.zhandle_t
36 mutex sync.Mutex35 mutex sync.Mutex
37}36}
3837
39// ClientId represents the established session in ZooKeeper. This is only38// ClientId represents an established ZooKeeper session. It can be
40// useful to be passed back into the ReInit function.39// passed into Redial to reestablish a connection to an existing session.
41type ClientId struct {40type ClientId struct {
42 cId C.clientid_t41 cId C.clientid_t
43}42}
@@ -98,35 +97,60 @@
98 State int97 State int
99}98}
10099
101// Error codes that may be used to verify the result of the100// Error represents a ZooKeeper error.
102// Code method from Error.101type Error int
102
103const (103const (
104 ZOK = C.ZOK104 ZOK Error = C.ZOK
105 ZSYSTEMERROR = C.ZSYSTEMERROR105 ZSYSTEMERROR Error = C.ZSYSTEMERROR
106 ZRUNTIMEINCONSISTENCY = C.ZRUNTIMEINCONSISTENCY106 ZRUNTIMEINCONSISTENCY Error = C.ZRUNTIMEINCONSISTENCY
107 ZDATAINCONSISTENCY = C.ZDATAINCONSISTENCY107 ZDATAINCONSISTENCY Error = C.ZDATAINCONSISTENCY
108 ZCONNECTIONLOSS = C.ZCONNECTIONLOSS108 ZCONNECTIONLOSS Error = C.ZCONNECTIONLOSS
109 ZMARSHALLINGERROR = C.ZMARSHALLINGERROR109 ZMARSHALLINGERROR Error = C.ZMARSHALLINGERROR
110 ZUNIMPLEMENTED = C.ZUNIMPLEMENTED110 ZUNIMPLEMENTED Error = C.ZUNIMPLEMENTED
111 ZOPERATIONTIMEOUT = C.ZOPERATIONTIMEOUT111 ZOPERATIONTIMEOUT Error = C.ZOPERATIONTIMEOUT
112 ZBADARGUMENTS = C.ZBADARGUMENTS112 ZBADARGUMENTS Error = C.ZBADARGUMENTS
113 ZINVALIDSTATE = C.ZINVALIDSTATE113 ZINVALIDSTATE Error = C.ZINVALIDSTATE
114 ZAPIERROR = C.ZAPIERROR114 ZAPIERROR Error = C.ZAPIERROR
115 ZNONODE = C.ZNONODE115 ZNONODE Error = C.ZNONODE
116 ZNOAUTH = C.ZNOAUTH116 ZNOAUTH Error = C.ZNOAUTH
117 ZBADVERSION = C.ZBADVERSION117 ZBADVERSION Error = C.ZBADVERSION
118 ZNOCHILDRENFOREPHEMERALS = C.ZNOCHILDRENFOREPHEMERALS118 ZNOCHILDRENFOREPHEMERALS Error = C.ZNOCHILDRENFOREPHEMERALS
119 ZNODEEXISTS = C.ZNODEEXISTS119 ZNODEEXISTS Error = C.ZNODEEXISTS
120 ZNOTEMPTY = C.ZNOTEMPTY120 ZNOTEMPTY Error = C.ZNOTEMPTY
121 ZSESSIONEXPIRED = C.ZSESSIONEXPIRED121 ZSESSIONEXPIRED Error = C.ZSESSIONEXPIRED
122 ZINVALIDCALLBACK = C.ZINVALIDCALLBACK122 ZINVALIDCALLBACK Error = C.ZINVALIDCALLBACK
123 ZINVALIDACL = C.ZINVALIDACL123 ZINVALIDACL Error = C.ZINVALIDACL
124 ZAUTHFAILED = C.ZAUTHFAILED124 ZAUTHFAILED Error = C.ZAUTHFAILED
125 ZCLOSING = C.ZCLOSING125 ZCLOSING Error = C.ZCLOSING
126 ZNOTHING = C.ZNOTHING126 ZNOTHING Error = C.ZNOTHING
127 ZSESSIONMOVED = C.ZSESSIONMOVED127 ZSESSIONMOVED Error = C.ZSESSIONMOVED
128)128)
129129
130func (error Error) String() string {
131 return C.GoString(C.zerror(C.int(error))) // Static, no need to free it.
132}
133
134// zkError creates an appropriate error return from
135// a ZooKeeper status and the errno return from a C API
136// call.
137func zkError(rc C.int, cerr os.Error) os.Error {
138 code := Error(rc)
139 switch code {
140 case ZOK:
141 return nil
142
143 case ZSYSTEMERROR:
144 // If a ZooKeeper call returns ZSYSTEMERROR, then
145 // errno becomes significant. If errno has not been
146 // set, then we will return ZSYSTEMERROR nonetheless.
147 if cerr != nil {
148 return cerr
149 }
150 }
151 return code
152}
153
130// Constants for SetLogLevel.154// Constants for SetLogLevel.
131const (155const (
132 LOG_ERROR = C.ZOO_LOG_LEVEL_ERROR156 LOG_ERROR = C.ZOO_LOG_LEVEL_ERROR
@@ -273,104 +297,54 @@
273}297}
274298
275// -----------------------------------------------------------------------299// -----------------------------------------------------------------------
276// Error interface which maps onto the ZooKeeper error codes.
277
278type Error interface {
279 String() string
280 Code() int
281}
282
283type errorType struct {
284 zkrc C.int
285 err os.Error
286}
287
288func newError(zkrc C.int, err os.Error) Error {
289 return &errorType{zkrc, err}
290}
291
292func (error *errorType) String() (result string) {
293 if error.zkrc == ZSYSTEMERROR && error.err != nil {
294 result = error.err.String()
295 } else {
296 result = C.GoString(C.zerror(error.zkrc)) // Static, no need to free it.
297 }
298 return
299}
300
301// Code returns the error code that may be compared against one of
302// the gozk.Z* constants.
303func (error *errorType) Code() int {
304 return int(error.zkrc)
305}
306
307// -----------------------------------------------------------------------
308// Stat interface which maps onto the ZooKeeper Stat struct.
309
310// We declare this as an interface rather than an actual struct because
311// this way we don't have to copy data around between the real C struct
312// and the Go one on every call. Most uses will only touch a few elements,
313// or even ignore the stat entirely, so that's a win.
314300
315// Stat contains detailed information about a node.301// Stat contains detailed information about a node.
316type Stat interface {302type Stat struct {
317 Czxid() int64303 c C.struct_Stat
318 Mzxid() int64304}
319 CTime() int64305
320 MTime() int64306func (stat *Stat) Czxid() int64 {
321 Version() int32307 return int64(stat.c.czxid)
322 CVersion() int32308}
323 AVersion() int32309
324 EphemeralOwner() int64310func (stat *Stat) Mzxid() int64 {
325 DataLength() int32311 return int64(stat.c.mzxid)
326 NumChildren() int32312}
327 Pzxid() int64313
328}314func (stat *Stat) CTime() int64 {
329315 return int64(stat.c.ctime)
330type resultStat C.struct_Stat316}
331317
332func (stat *resultStat) Czxid() int64 {318func (stat *Stat) MTime() int64 {
333 return int64(stat.czxid)319 return int64(stat.c.mtime)
334}320}
335321
336func (stat *resultStat) Mzxid() int64 {322func (stat *Stat) Version() int32 {
337 return int64(stat.mzxid)323 return int32(stat.c.version)
338}324}
339325
340func (stat *resultStat) CTime() int64 {326func (stat *Stat) CVersion() int32 {
341 return int64(stat.ctime)327 return int32(stat.c.cversion)
342}328}
343329
344func (stat *resultStat) MTime() int64 {330func (stat *Stat) AVersion() int32 {
345 return int64(stat.mtime)331 return int32(stat.c.aversion)
346}332}
347333
348func (stat *resultStat) Version() int32 {334func (stat *Stat) EphemeralOwner() int64 {
349 return int32(stat.version)335 return int64(stat.c.ephemeralOwner)
350}336}
351337
352func (stat *resultStat) CVersion() int32 {338func (stat *Stat) DataLength() int32 {
353 return int32(stat.cversion)339 return int32(stat.c.dataLength)
354}340}
355341
356func (stat *resultStat) AVersion() int32 {342func (stat *Stat) NumChildren() int32 {
357 return int32(stat.aversion)343 return int32(stat.c.numChildren)
358}344}
359345
360func (stat *resultStat) EphemeralOwner() int64 {346func (stat *Stat) Pzxid() int64 {
361 return int64(stat.ephemeralOwner)347 return int64(stat.c.pzxid)
362}
363
364func (stat *resultStat) DataLength() int32 {
365 return int32(stat.dataLength)
366}
367
368func (stat *resultStat) NumChildren() int32 {
369 return int32(stat.numChildren)
370}
371
372func (stat *resultStat) Pzxid() int64 {
373 return int64(stat.pzxid)
374}348}
375349
376// -----------------------------------------------------------------------350// -----------------------------------------------------------------------
@@ -384,7 +358,7 @@
384 C.zoo_set_debug_level(C.ZooLogLevel(level))358 C.zoo_set_debug_level(C.ZooLogLevel(level))
385}359}
386360
387// Init initializes the communication with a ZooKeeper cluster. The provided361// Dial initializes the communication with a ZooKeeper cluster. The provided
388// servers parameter may include multiple server addresses, separated362// servers parameter may include multiple server addresses, separated
389// by commas, so that the client will automatically attempt to connect363// by commas, so that the client will automatically attempt to connect
390// to another server if one of them stops working for whatever reason.364// to another server if one of them stops working for whatever reason.
@@ -398,21 +372,18 @@
398// The watch channel receives events of type SESSION_EVENT when any change372// The watch channel receives events of type EVENT_SESSION when any change
399// to the state of the established connection happens. See the documentation373// to the state of the established connection happens. See the documentation
400// for the Event type for more details.374// for the Event type for more details.
401func Init(servers string, recvTimeoutNS int64) (zk *ZooKeeper, watch chan Event, err Error) {375func Dial(servers string, recvTimeoutNS int64) (*Conn, <-chan Event, os.Error) {
402 zk, watch, err = internalInit(servers, recvTimeoutNS, nil)376 return dial(servers, recvTimeoutNS, nil)
403 return
404}377}
405378
406// Equivalent to Init, but attempt to reestablish an existing session379// Redial is equivalent to Dial, but attempts to reestablish an existing session
407// identified via the clientId parameter.380// identified via the clientId parameter.
408func ReInit(servers string, recvTimeoutNS int64, clientId *ClientId) (zk *ZooKeeper, watch chan Event, err Error) {381func Redial(servers string, recvTimeoutNS int64, clientId *ClientId) (*Conn, <-chan Event, os.Error) {
409 zk, watch, err = internalInit(servers, recvTimeoutNS, clientId)382 return dial(servers, recvTimeoutNS, clientId)
410 return
411}383}
412384
413func internalInit(servers string, recvTimeoutNS int64, clientId *ClientId) (*ZooKeeper, chan Event, Error) {385func dial(servers string, recvTimeoutNS int64, clientId *ClientId) (*Conn, <-chan Event, os.Error) {
414386 zk := &Conn{}
415 zk := &ZooKeeper{}
416 zk.watchChannels = make(map[uintptr]chan Event)387 zk.watchChannels = make(map[uintptr]chan Event)
417388
418 var cId *C.clientid_t389 var cId *C.clientid_t
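Dial and Redial now return the session watch as a receive-only channel (<-chan Event), so callers can read events but cannot send on or close the channel. A minimal sketch of that design in present-day Go is below. The conn type, the dial function, and the event values 1 and 3 are hypothetical placeholders and do not correspond to the package's real constants.

```go
package main

import "fmt"

// Event is a hypothetical, cut-down stand-in for the package's Event type.
type Event struct {
	Type  int
	State int
}

// conn is a hypothetical connection that owns the sending side of its
// session watch channel.
type conn struct {
	events chan Event
}

// dial returns the connection together with a receive-only view of its
// session channel.
func dial() (*conn, <-chan Event) {
	c := &conn{events: make(chan Event, 1)}
	// Simulate the first session event; the values are arbitrary.
	c.events <- Event{Type: 1, State: 3}
	return c, c.events
}

func main() {
	_, watch := dial()
	ev := <-watch
	fmt.Println("first session event:", ev.Type, ev.State)
	// watch <- Event{} would not compile: watch is receive-only.
}
```

The bidirectional channel converts implicitly to the receive-only type at the return statement, so the restriction costs nothing at run time.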
@@ -424,11 +395,11 @@
424 zk.sessionWatchId = watchId395 zk.sessionWatchId = watchId
425396
426 cservers := C.CString(servers)397 cservers := C.CString(servers)
427 handle, cerr := C.zookeeper_init(cservers, C.watch_handler, C.int(recvTimeoutNS / 1e6), cId, unsafe.Pointer(watchId), 0)398 handle, cerr := C.zookeeper_init(cservers, C.watch_handler, C.int(recvTimeoutNS/1e6), cId, unsafe.Pointer(watchId), 0)
428 C.free(unsafe.Pointer(cservers))399 C.free(unsafe.Pointer(cservers))
429 if handle == nil {400 if handle == nil {
430 zk.closeAllWatches()401 zk.closeAllWatches()
431 return nil, nil, newError(ZSYSTEMERROR, cerr)402 return nil, nil, zkError(C.int(ZSYSTEMERROR), cerr)
432 }403 }
433 zk.handle = handle404 zk.handle = handle
434 runWatchLoop()405 runWatchLoop()
@@ -437,12 +408,12 @@
437408
438// ClientId returns the client ID for the existing session with ZooKeeper.409// ClientId returns the client ID for the existing session with ZooKeeper.
439// This is useful to reestablish an existing session via ReInit.410// This is useful to reestablish an existing session via Redial.
440func (zk *ZooKeeper) ClientId() *ClientId {411func (zk *Conn) ClientId() *ClientId {
441 return &ClientId{*C.zoo_client_id(zk.handle)}412 return &ClientId{*C.zoo_client_id(zk.handle)}
442}413}
443414
444// Close terminates the ZooKeeper interaction.415// Close terminates the ZooKeeper interaction.
445func (zk *ZooKeeper) Close() Error {416func (zk *Conn) Close() os.Error {
446417
447 // Protect from concurrency around zk.handle change.418 // Protect from concurrency around zk.handle change.
448 zk.mutex.Lock()419 zk.mutex.Lock()
@@ -451,7 +422,7 @@
451 if zk.handle == nil {422 if zk.handle == nil {
452 // ZooKeeper may hang indefinitely if a handler is closed twice,423 // ZooKeeper may hang indefinitely if a handler is closed twice,
453 // so we get in the way and prevent it from happening.424 // so we get in the way and prevent it from happening.
454 return newError(ZCLOSING, nil)425 return ZCLOSING
455 }426 }
456 rc, cerr := C.zookeeper_close(zk.handle)427 rc, cerr := C.zookeeper_close(zk.handle)
457428
@@ -461,16 +432,13 @@
461 // At this point, nothing else should need zk.handle.432 // At this point, nothing else should need zk.handle.
462 zk.handle = nil433 zk.handle = nil
463434
464 if rc != C.ZOK {435 return zkError(rc, cerr)
465 return newError(rc, cerr)
466 }
467 return nil
468}436}
469437
470// Get returns the data and status from an existing node. err will be nil,438// Get returns the data and status from an existing node. err will be nil,
471// unless an error is found. Attempting to retrieve data from a non-existing439// unless an error is found. Attempting to retrieve data from a non-existing
472// node is an error.440// node is an error.
473func (zk *ZooKeeper) Get(path string) (data string, stat Stat, err Error) {441func (zk *Conn) Get(path string) (data string, stat *Stat, err os.Error) {
474442
475 cpath := C.CString(path)443 cpath := C.CString(path)
476 cbuffer := (*C.char)(C.malloc(bufferSize))444 cbuffer := (*C.char)(C.malloc(bufferSize))
@@ -478,22 +446,22 @@
478 defer C.free(unsafe.Pointer(cpath))446 defer C.free(unsafe.Pointer(cpath))
479 defer C.free(unsafe.Pointer(cbuffer))447 defer C.free(unsafe.Pointer(cbuffer))
480448
481 cstat := C.struct_Stat{}449 var cstat Stat
482 rc, cerr := C.zoo_wget(zk.handle, cpath, nil, nil,450 rc, cerr := C.zoo_wget(zk.handle, cpath, nil, nil,
483 cbuffer, &cbufferLen, &cstat)451 cbuffer, &cbufferLen, &cstat.c)
484 if rc != C.ZOK {452 if rc != C.ZOK {
485 return "", nil, newError(rc, cerr)453 return "", nil, zkError(rc, cerr)
486 }454 }
455
487 result := C.GoStringN(cbuffer, cbufferLen)456 result := C.GoStringN(cbuffer, cbufferLen)
488457 return result, &cstat, nil
489 return result, (*resultStat)(&cstat), nil
490}458}
491459
492// GetW works like Get but also returns a channel that will receive460// GetW works like Get but also returns a channel that will receive
493// a single Event value when the data or existence of the given ZooKeeper461// a single Event value when the data or existence of the given ZooKeeper
494// node changes or when critical session events happen. See the462// node changes or when critical session events happen. See the
495// documentation of the Event type for more details.463// documentation of the Event type for more details.
496func (zk *ZooKeeper) GetW(path string) (data string, stat Stat, watch chan Event, err Error) {464func (zk *Conn) GetW(path string) (data string, stat *Stat, watch <-chan Event, err os.Error) {
497465
498 cpath := C.CString(path)466 cpath := C.CString(path)
499 cbuffer := (*C.char)(C.malloc(bufferSize))467 cbuffer := (*C.char)(C.malloc(bufferSize))
@@ -503,40 +471,39 @@
503471
504 watchId, watchChannel := zk.createWatch(true)472 watchId, watchChannel := zk.createWatch(true)
505473
506 cstat := C.struct_Stat{}474 var cstat Stat
507 rc, cerr := C.zoo_wget(zk.handle, cpath,475 rc, cerr := C.zoo_wget(zk.handle, cpath,
508 C.watch_handler, unsafe.Pointer(watchId),476 C.watch_handler, unsafe.Pointer(watchId),
509 cbuffer, &cbufferLen, &cstat)477 cbuffer, &cbufferLen, &cstat.c)
510 if rc != C.ZOK {478 if rc != C.ZOK {
511 zk.forgetWatch(watchId)479 zk.forgetWatch(watchId)
512 return "", nil, nil, newError(rc, cerr)480 return "", nil, nil, zkError(rc, cerr)
513 }481 }
514482
515 result := C.GoStringN(cbuffer, cbufferLen)483 result := C.GoStringN(cbuffer, cbufferLen)
516 return result, (*resultStat)(&cstat), watchChannel, nil484 return result, &cstat, watchChannel, nil
517}485}
518486
519// Children returns the children list and status from an existing node.487// Children returns the children list and status from an existing node.
520// err will be nil, unless an error is found. Attempting to retrieve the488// Attempting to retrieve the children list from a non-existent node is an error.
521// children list from a non-existent node is an error.489func (zk *Conn) Children(path string) (children []string, stat *Stat, err os.Error) {
522func (zk *ZooKeeper) Children(path string) (children []string, stat Stat, err Error) {
523490
524 cpath := C.CString(path)491 cpath := C.CString(path)
525 defer C.free(unsafe.Pointer(cpath))492 defer C.free(unsafe.Pointer(cpath))
526493
527 cvector := C.struct_String_vector{}494 cvector := C.struct_String_vector{}
528 cstat := C.struct_Stat{}495 var cstat Stat
529 rc, cerr := C.zoo_wget_children2(zk.handle, cpath, nil, nil,496 rc, cerr := C.zoo_wget_children2(zk.handle, cpath, nil, nil,
530 &cvector, &cstat)497 &cvector, &cstat.c)
531498
532 // Can't happen if rc != 0, but avoid potential memory leaks in the future.499 // Can't happen if rc != 0, but avoid potential memory leaks in the future.
533 if cvector.count != 0 {500 if cvector.count != 0 {
534 children = parseStringVector(&cvector)501 children = parseStringVector(&cvector)
535 }502 }
536 if rc != C.ZOK {503 if rc == C.ZOK {
537 err = newError(rc, cerr)504 stat = &cstat
538 } else {505 } else {
539 stat = (*resultStat)(&cstat)506 err = zkError(rc, cerr)
540 }507 }
541 return508 return
542}509}
@@ -545,7 +512,7 @@
545// receive a single Event value when a node is added or removed under the512// receive a single Event value when a node is added or removed under the
546// provided path or when critical session events happen. See the documentation513// provided path or when critical session events happen. See the documentation
547// of the Event type for more details.514// of the Event type for more details.
548func (zk *ZooKeeper) ChildrenW(path string) (children []string, stat Stat, watch chan Event, err Error) {515func (zk *Conn) ChildrenW(path string) (children []string, stat *Stat, watch <-chan Event, err os.Error) {
549516
550 cpath := C.CString(path)517 cpath := C.CString(path)
551 defer C.free(unsafe.Pointer(cpath))518 defer C.free(unsafe.Pointer(cpath))
@@ -553,21 +520,21 @@
553 watchId, watchChannel := zk.createWatch(true)520 watchId, watchChannel := zk.createWatch(true)
554521
555 cvector := C.struct_String_vector{}522 cvector := C.struct_String_vector{}
556 cstat := C.struct_Stat{}523 var cstat Stat
557 rc, cerr := C.zoo_wget_children2(zk.handle, cpath,524 rc, cerr := C.zoo_wget_children2(zk.handle, cpath,
558 C.watch_handler, unsafe.Pointer(watchId),525 C.watch_handler, unsafe.Pointer(watchId),
559 &cvector, &cstat)526 &cvector, &cstat.c)
560527
561 // Can't happen if rc != 0, but avoid potential memory leaks in the future.528 // Can't happen if rc != 0, but avoid potential memory leaks in the future.
562 if cvector.count != 0 {529 if cvector.count != 0 {
563 children = parseStringVector(&cvector)530 children = parseStringVector(&cvector)
564 }531 }
565 if rc != C.ZOK {532 if rc == C.ZOK {
533 stat = &cstat
534 watch = watchChannel
535 } else {
566 zk.forgetWatch(watchId)536 zk.forgetWatch(watchId)
567 err = newError(rc, cerr)537 err = zkError(rc, cerr)
568 } else {
569 stat = (*resultStat)(&cstat)
570 watch = watchChannel
571 }538 }
572 return539 return
573}540}
@@ -588,20 +555,20 @@
588// Exists checks if a node exists at the given path. If it does,555// Exists checks if a node exists at the given path. If it does,
589// stat will contain meta information on the existing node, otherwise556// stat will contain meta information on the existing node, otherwise
590// it will be nil.557// it will be nil.
591func (zk *ZooKeeper) Exists(path string) (stat Stat, err Error) {558func (zk *Conn) Exists(path string) (stat *Stat, err os.Error) {
592 cpath := C.CString(path)559 cpath := C.CString(path)
593 defer C.free(unsafe.Pointer(cpath))560 defer C.free(unsafe.Pointer(cpath))
594561
595 cstat := C.struct_Stat{}562 var cstat Stat
596 rc, cerr := C.zoo_wexists(zk.handle, cpath, nil, nil, &cstat)563 rc, cerr := C.zoo_wexists(zk.handle, cpath, nil, nil, &cstat.c)
597564
598 // We diverge a bit from the usual here: a ZNONODE is not an error565 // We diverge a bit from the usual here: a ZNONODE is not an error
599 // for an exists call, otherwise every Exists call would have to check566 // for an exists call, otherwise every Exists call would have to check
600 // for err != nil and err.Code() != ZNONODE.567 // for err != nil and err != ZNONODE.
601 if rc == C.ZOK {568 if rc == C.ZOK {
602 stat = (*resultStat)(&cstat)569 stat = &cstat
603 } else if rc != C.ZNONODE {570 } else if rc != C.ZNONODE {
604 err = newError(rc, cerr)571 err = zkError(rc, cerr)
605 }572 }
606 return573 return
607}574}
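The comment in Exists explains that a missing node is reported as a nil stat with a nil error, so callers do not have to special-case ZNONODE. The following sketch illustrates that convention in present-day Go. Everything in it is a stand-in: the map-backed lookup replaces the C call, errNoNode replaces ZNONODE, and Stat is reduced to a single field.

```go
package main

import (
	"errors"
	"fmt"
)

// Stat is a hypothetical, cut-down stand-in for the package's Stat type.
type Stat struct {
	Version int32
}

// errNoNode stands in for the ZNONODE error code.
var errNoNode = errors.New("no node")

// lookup stands in for the underlying C call: it reports errNoNode for
// a path that is not present.
func lookup(nodes map[string]Stat, path string) (Stat, error) {
	st, ok := nodes[path]
	if !ok {
		return Stat{}, errNoNode
	}
	return st, nil
}

// exists follows the convention from the comment: a missing node yields
// (nil, nil), so only genuine failures surface as errors.
func exists(nodes map[string]Stat, path string) (*Stat, error) {
	st, err := lookup(nodes, path)
	switch {
	case err == nil:
		return &st, nil
	case errors.Is(err, errNoNode):
		return nil, nil
	default:
		return nil, err
	}
}

func main() {
	nodes := map[string]Stat{"/config": {Version: 3}}
	for _, p := range []string{"/config", "/missing"} {
		st, err := exists(nodes, p)
		switch {
		case err != nil:
			fmt.Println("error:", err)
		case st == nil:
			fmt.Println(p, "does not exist")
		default:
			fmt.Println(p, "exists at version", st.Version)
		}
	}
}
```

The trade-off is that callers must check the stat for nil before using it, in exchange for a simpler error path.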
@@ -611,28 +578,28 @@
611// stat is nil and the node didn't exist, or when the existing node578// stat is nil and the node didn't exist, or when the existing node
612// is removed. It will also receive critical session events. See the579// is removed. It will also receive critical session events. See the
613// documentation of the Event type for more details.580// documentation of the Event type for more details.
614func (zk *ZooKeeper) ExistsW(path string) (stat Stat, watch chan Event, err Error) {581func (zk *Conn) ExistsW(path string) (stat *Stat, watch <-chan Event, err os.Error) {
615 cpath := C.CString(path)582 cpath := C.CString(path)
616 defer C.free(unsafe.Pointer(cpath))583 defer C.free(unsafe.Pointer(cpath))
617584
618 watchId, watchChannel := zk.createWatch(true)585 watchId, watchChannel := zk.createWatch(true)
619586
620 cstat := C.struct_Stat{}587 var cstat Stat
621 rc, cerr := C.zoo_wexists(zk.handle, cpath,588 rc, cerr := C.zoo_wexists(zk.handle, cpath,
622 C.watch_handler, unsafe.Pointer(watchId), &cstat)589 C.watch_handler, unsafe.Pointer(watchId), &cstat.c)
623590
624 // We diverge a bit from the usual here: a ZNONODE is not an error591 // We diverge a bit from the usual here: a ZNONODE is not an error
625 // for an exists call, otherwise every Exists call would have to check592 // for an exists call, otherwise every Exists call would have to check
626 // for err != nil and err.Code() != ZNONODE.593 // for err != nil and err != ZNONODE.
627 switch rc {594 switch Error(rc) {
628 case ZOK:595 case ZOK:
629 stat = (*resultStat)(&cstat)596 stat = &cstat
630 watch = watchChannel597 watch = watchChannel
631 case ZNONODE:598 case ZNONODE:
632 watch = watchChannel599 watch = watchChannel
633 default:600 default:
634 zk.forgetWatch(watchId)601 zk.forgetWatch(watchId)
635 err = newError(rc, cerr)602 err = zkError(rc, cerr)
636 }603 }
637 return604 return
638}605}
@@ -646,7 +613,7 @@
646// The returned path is useful in cases where the created path may differ613// The returned path is useful in cases where the created path may differ
647// from the requested one, such as when a sequence number is appended614// from the requested one, such as when a sequence number is appended
648// to it due to the use of the gozk.SEQUENCE flag.615// to it due to the use of the zookeeper.SEQUENCE flag.
649func (zk *ZooKeeper) Create(path, value string, flags int, aclv []ACL) (pathCreated string, err Error) {616func (zk *Conn) Create(path, value string, flags int, aclv []ACL) (pathCreated string, err os.Error) {
650 cpath := C.CString(path)617 cpath := C.CString(path)
651 cvalue := C.CString(value)618 cvalue := C.CString(value)
652 defer C.free(unsafe.Pointer(cpath))619 defer C.free(unsafe.Pointer(cpath))
@@ -662,11 +629,12 @@
662629
663 rc, cerr := C.zoo_create(zk.handle, cpath, cvalue, C.int(len(value)),630 rc, cerr := C.zoo_create(zk.handle, cpath, cvalue, C.int(len(value)),
664 caclv, C.int(flags), cpathCreated, C.int(cpathLen))631 caclv, C.int(flags), cpathCreated, C.int(cpathLen))
665 if rc != C.ZOK {632 if rc == C.ZOK {
666 return "", newError(rc, cerr)633 pathCreated = C.GoString(cpathCreated)
634 } else {
635 err = zkError(rc, cerr)
667 }636 }
668637 return
669 return C.GoString(cpathCreated), nil
670}638}
671639
672// Set modifies the data for the existing node at the given path, replacing it640// Set modifies the data for the existing node at the given path, replacing it
@@ -677,35 +645,32 @@
677//645//
678// It is an error to attempt to set the data of a non-existing node with646// It is an error to attempt to set the data of a non-existing node with
679// this function. In these cases, use Create instead.647// this function. In these cases, use Create instead.
680func (zk *ZooKeeper) Set(path, value string, version int32) (stat Stat, err Error) {648func (zk *Conn) Set(path, value string, version int32) (stat *Stat, err os.Error) {
681649
682 cpath := C.CString(path)650 cpath := C.CString(path)
683 cvalue := C.CString(value)651 cvalue := C.CString(value)
684 defer C.free(unsafe.Pointer(cpath))652 defer C.free(unsafe.Pointer(cpath))
685 defer C.free(unsafe.Pointer(cvalue))653 defer C.free(unsafe.Pointer(cvalue))
686654
687 cstat := C.struct_Stat{}655 var cstat Stat
688
689 rc, cerr := C.zoo_set2(zk.handle, cpath, cvalue, C.int(len(value)),656 rc, cerr := C.zoo_set2(zk.handle, cpath, cvalue, C.int(len(value)),
690 C.int(version), &cstat)657 C.int(version), &cstat.c)
691 if rc != C.ZOK {658 if rc == C.ZOK {
692 return nil, newError(rc, cerr)659 stat = &cstat
660 } else {
661 err = zkError(rc, cerr)
693 }662 }
694663 return
695 return (*resultStat)(&cstat), nil
696}664}
697665
698// Delete removes the node at path. If version is not -1, the operation666// Delete removes the node at path. If version is not -1, the operation
699// will only succeed if the node is still at this version when the667// will only succeed if the node is still at this version when the
700// node is deleted as an atomic operation.668// node is deleted as an atomic operation.
701func (zk *ZooKeeper) Delete(path string, version int32) (err Error) {669func (zk *Conn) Delete(path string, version int32) (err os.Error) {
702 cpath := C.CString(path)670 cpath := C.CString(path)
703 defer C.free(unsafe.Pointer(cpath))671 defer C.free(unsafe.Pointer(cpath))
704 rc, cerr := C.zoo_delete(zk.handle, cpath, C.int(version))672 rc, cerr := C.zoo_delete(zk.handle, cpath, C.int(version))
705 if rc != C.ZOK {673 return zkError(rc, cerr)
706 return newError(rc, cerr)
707 }
708 return nil
709}674}
710675
711// AddAuth adds a new authentication certificate to the ZooKeeper676// AddAuth adds a new authentication certificate to the ZooKeeper
@@ -713,7 +678,7 @@
713// authentication information, while the cert parameter provides the678// authentication information, while the cert parameter provides the
714// identity data itself. For instance, the "digest" scheme requires679// identity data itself. For instance, the "digest" scheme requires
715// a pair like "username:password" to be provided as the certificate.680// a pair like "username:password" to be provided as the certificate.
716func (zk *ZooKeeper) AddAuth(scheme, cert string) Error {681func (zk *Conn) AddAuth(scheme, cert string) os.Error {
717 cscheme := C.CString(scheme)682 cscheme := C.CString(scheme)
718 ccert := C.CString(cert)683 ccert := C.CString(cert)
719 defer C.free(unsafe.Pointer(cscheme))684 defer C.free(unsafe.Pointer(cscheme))
@@ -728,40 +693,36 @@
 	rc, cerr := C.zoo_add_auth(zk.handle, cscheme, ccert, C.int(len(cert)),
 		C.handle_void_completion, unsafe.Pointer(data))
 	if rc != C.ZOK {
-		return newError(rc, cerr)
+		return zkError(rc, cerr)
 	}

 	C.wait_for_completion(data)

 	rc = C.int(uintptr(data.data))
-	if rc != C.ZOK {
-		return newError(rc, nil)
-	}
-
-	return nil
+	return zkError(rc, nil)
 }

 // ACL returns the access control list for path.
-func (zk *ZooKeeper) ACL(path string) ([]ACL, Stat, Error) {
+func (zk *Conn) ACL(path string) ([]ACL, *Stat, os.Error) {

 	cpath := C.CString(path)
 	defer C.free(unsafe.Pointer(cpath))

 	caclv := C.struct_ACL_vector{}
-	cstat := C.struct_Stat{}

-	rc, cerr := C.zoo_get_acl(zk.handle, cpath, &caclv, &cstat)
+	var cstat Stat
+	rc, cerr := C.zoo_get_acl(zk.handle, cpath, &caclv, &cstat.c)
 	if rc != C.ZOK {
-		return nil, nil, newError(rc, cerr)
+		return nil, nil, zkError(rc, cerr)
 	}

 	aclv := parseACLVector(&caclv)

-	return aclv, (*resultStat)(&cstat), nil
+	return aclv, &cstat, nil
 }

 // SetACL changes the access control list for path.
-func (zk *ZooKeeper) SetACL(path string, aclv []ACL, version int32) Error {
+func (zk *Conn) SetACL(path string, aclv []ACL, version int32) os.Error {

 	cpath := C.CString(path)
 	defer C.free(unsafe.Pointer(cpath))
@@ -770,11 +731,7 @@
 	defer C.deallocate_ACL_vector(caclv)

 	rc, cerr := C.zoo_set_acl(zk.handle, cpath, C.int(version), caclv)
-	if rc != C.ZOK {
-		return newError(rc, cerr)
-	}
-
-	return nil
+	return zkError(rc, cerr)
 }

 func parseACLVector(caclv *C.struct_ACL_vector) []ACL {
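The shortened returns in the hunks above (`return zkError(rc, cerr)` with no preceding `rc != C.ZOK` check) only behave like the old code if the helper maps the success code to a nil error. The body of `zkError` is not shown in this part of the diff, so the following is just a sketch of that pattern in present-day Go. The names `Code`, `codeOK`, `codeNoNode`, `codeBadVersion` and `toError` are hypothetical, and the numeric values are illustrative.

```go
package main

import "fmt"

// Code is a hypothetical integer result code standing in for the
// C client's return values. It is not gozk's actual Error type.
type Code int

const (
	codeOK         Code = 0
	codeNoNode     Code = -101 // illustrative value
	codeBadVersion Code = -103 // illustrative value
)

// Error makes Code usable as an error value, so callers can compare a
// returned error directly against a constant such as codeNoNode.
func (c Code) Error() string {
	switch c {
	case codeNoNode:
		return "no node"
	case codeBadVersion:
		return "bad version"
	}
	return fmt.Sprintf("unknown error code %d", int(c))
}

// toError converts a result code into an error. On success it returns
// a plain nil, so the result compares equal to nil. Returning codeOK
// itself would produce a non-nil interface value.
func toError(rc Code) error {
	if rc == codeOK {
		return nil
	}
	return rc
}
```

With a helper of this shape, a wrapper can end in `return toError(rc)`, and a caller can write `err == codeNoNode` instead of calling a `Code()` accessor, which is the style the updated tests in this proposal use.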
@@ -822,7 +779,7 @@
 // -----------------------------------------------------------------------
 // RetryChange utility method.

-type ChangeFunc func(oldValue string, oldStat Stat) (newValue string, err os.Error)
+type ChangeFunc func(oldValue string, oldStat *Stat) (newValue string, err os.Error)

 // RetryChange runs changeFunc to attempt to atomically change path
 // in a lock free manner, and retries in case there was another
@@ -831,8 +788,7 @@
 // changeFunc must work correctly if called multiple times in case
 // the modification fails due to concurrent changes, and it may return
 // an error that will cause the the RetryChange function to stop and
-// return an Error with code ZSYSTEMERROR and the same .String() result
-// as the provided error.
+// return the same error.
 //
 // This mechanism is not suitable for a node that is frequently modified
 // concurrently. For those cases, consider using a pessimistic locking
@@ -845,8 +801,7 @@
 //
 // 2. Call the changeFunc with the current node value and stat,
 // or with an empty string and nil stat, if the node doesn't yet exist.
-// If the changeFunc returns an error, stop and return an Error with
-// ZSYSTEMERROR Code() and the same String() as the changeFunc error.
+// If the changeFunc returns an error, stop and return the same error.
 //
 // 3. If the changeFunc returns no errors, use the string returned as
 // the new candidate value for the node, and attempt to either create
@@ -855,36 +810,32 @@
 // in the same node), repeat from step 1. If this procedure fails with any
 // other error, stop and return the error found.
 //
-func (zk *ZooKeeper) RetryChange(path string, flags int, acl []ACL, changeFunc ChangeFunc) (err Error) {
+func (zk *Conn) RetryChange(path string, flags int, acl []ACL, changeFunc ChangeFunc) os.Error {
 	for {
-		oldValue, oldStat, getErr := zk.Get(path)
-		if getErr != nil && getErr.Code() != ZNONODE {
-			err = getErr
-			break
-		}
-		newValue, osErr := changeFunc(oldValue, oldStat)
-		if osErr != nil {
-			return newError(ZSYSTEMERROR, osErr)
-		} else if oldStat == nil {
-			_, err = zk.Create(path, newValue, flags, acl)
-			if err == nil || err.Code() != ZNODEEXISTS {
-				break
-			}
-		} else if newValue == oldValue {
+		oldValue, oldStat, err := zk.Get(path)
+		if err != nil && err != ZNONODE {
+			return err
+		}
+		newValue, err := changeFunc(oldValue, oldStat)
+		if err != nil {
+			return err
+		}
+		if oldStat == nil {
+			_, err := zk.Create(path, newValue, flags, acl)
+			if err == nil || err != ZNODEEXISTS {
+				return err
+			}
+			continue
+		}
+		if newValue == oldValue {
 			return nil // Nothing to do.
-		} else {
-			_, err = zk.Set(path, newValue, oldStat.Version())
-			if err == nil {
-				break
-			} else {
-				code := err.Code()
-				if code != ZBADVERSION && code != ZNONODE {
-					break
-				}
-			}
+		}
+		_, err = zk.Set(path, newValue, oldStat.Version())
+		if err == nil || (err != ZBADVERSION && err != ZNONODE) {
+			return err
 		}
 	}
-	return err
+	panic("not reached")
 }

 // -----------------------------------------------------------------------
@@ -897,7 +848,7 @@
 // Whenever a *W method is called, it will return a channel which
 // outputs Event values. Internally, a map is used to maintain references
 // between an unique integer key (the watchId), and the event channel. The
-// watchId is then handed to the C zookeeper library as the watch context,
+// watchId is then handed to the C ZooKeeper library as the watch context,
 // so that we get it back when events happen. Using an integer key as the
 // watch context rather than a pointer is needed because there's no guarantee
 // that in the future the GC will not move objects around, and also because
@@ -910,13 +861,13 @@
 // Since Cgo doesn't allow calling back into Go, we actually fire a new
 // goroutine the very first time Init is called, and allow it to block
 // in a pthread condition variable within a C function. This condition
-// will only be notified once a zookeeper watch callback appends new
+// will only be notified once a ZooKeeper watch callback appends new
 // entries to the event list. When this happens, the C function returns
 // and we get back into Go land with the pointer to the watch data,
 // including the watchId and other event details such as type and path.

 var watchMutex sync.Mutex
-var watchZooKeepers = make(map[uintptr]*ZooKeeper)
+var watchConns = make(map[uintptr]*Conn)
 var watchCounter uintptr
 var watchLoopCounter int

@@ -925,14 +876,14 @@
 // mostly as a debugging and testing aid.
 func CountPendingWatches() int {
 	watchMutex.Lock()
-	count := len(watchZooKeepers)
+	count := len(watchConns)
 	watchMutex.Unlock()
 	return count
 }

 // createWatch creates and registers a watch, returning the watch id
 // and channel.
-func (zk *ZooKeeper) createWatch(session bool) (watchId uintptr, watchChannel chan Event) {
+func (zk *Conn) createWatch(session bool) (watchId uintptr, watchChannel chan Event) {
 	buf := 1 // session/watch event
 	if session {
 		buf = 32
@@ -943,7 +894,7 @@
 	watchId = watchCounter
 	watchCounter += 1
 	zk.watchChannels[watchId] = watchChannel
-	watchZooKeepers[watchId] = zk
+	watchConns[watchId] = zk
 	return
 }

@@ -951,21 +902,21 @@
 // from ever getting delivered. It shouldn't be used if there's any
 // chance the watch channel is still visible and not closed, since
 // it might mean a goroutine would be blocked forever.
-func (zk *ZooKeeper) forgetWatch(watchId uintptr) {
+func (zk *Conn) forgetWatch(watchId uintptr) {
 	watchMutex.Lock()
 	defer watchMutex.Unlock()
 	zk.watchChannels[watchId] = nil, false
-	watchZooKeepers[watchId] = nil, false
+	watchConns[watchId] = nil, false
 }

 // closeAllWatches closes all watch channels for zk.
-func (zk *ZooKeeper) closeAllWatches() {
+func (zk *Conn) closeAllWatches() {
 	watchMutex.Lock()
 	defer watchMutex.Unlock()
 	for watchId, ch := range zk.watchChannels {
 		close(ch)
 		zk.watchChannels[watchId] = nil, false
-		watchZooKeepers[watchId] = nil, false
+		watchConns[watchId] = nil, false
 	}
 }

@@ -978,7 +929,7 @@
 	}
 	watchMutex.Lock()
 	defer watchMutex.Unlock()
-	zk, ok := watchZooKeepers[watchId]
+	zk, ok := watchConns[watchId]
 	if !ok {
 		return
 	}
@@ -1010,7 +961,7 @@
 	}
 	if watchId != zk.sessionWatchId {
 		zk.watchChannels[watchId] = nil, false
-		watchZooKeepers[watchId] = nil, false
+		watchConns[watchId] = nil, false
 		close(ch)
 	}
 }

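The comments in the zookeeper.go hunks above explain that watches are tracked through a map from an integer watch id to an event channel, so that only the integer, never a Go pointer, is handed to C as the watch context. The sketch below shows that bookkeeping in isolation, in present-day Go and without cgo. The `registry` and `event` types and their methods are hypothetical names for illustration, and it models only one-shot watches, not the session watch.

```go
package main

import "sync"

// event is a hypothetical stand-in for a watch notification.
type event struct {
	Path string
}

// registry hands out integer ids for pending watches. Only the id
// would cross into C, which avoids sharing Go pointers with C code.
type registry struct {
	mu      sync.Mutex
	next    uintptr
	pending map[uintptr]chan event
}

func newRegistry() *registry {
	return &registry{pending: make(map[uintptr]chan event)}
}

// create registers a one-shot watch and returns its id and channel.
func (r *registry) create() (uintptr, <-chan event) {
	r.mu.Lock()
	defer r.mu.Unlock()
	id := r.next
	r.next++
	ch := make(chan event, 1) // Buffered so deliver never blocks.
	r.pending[id] = ch
	return id, ch
}

// deliver sends ev to the watch registered under id, then forgets and
// closes it. It reports false if the id is unknown or already fired.
func (r *registry) deliver(id uintptr, ev event) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	ch, ok := r.pending[id]
	if !ok {
		return false
	}
	delete(r.pending, id)
	ch <- ev
	close(ch)
	return true
}

// count reports how many watches are still pending.
func (r *registry) count() int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return len(r.pending)
}
```

Closing the channel after the single send lets a receiver tell a fired watch from a live one, and a stale id arriving from the C side is simply ignored instead of dereferencing freed state.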
=== renamed file 'gozk_test.go' => 'zookeeper_test.go'
--- gozk_test.go 2011-08-19 01:51:37 +0000
+++ zookeeper_test.go 2011-09-24 08:31:23 +0000
@@ -1,15 +1,15 @@
-package gozk_test
+package zookeeper_test

 import (
 	. "launchpad.net/gocheck"
-	"gozk"
+	"launchpad.net/zookeeper"
 	"time"
 )

 // This error will be delivered via C errno, since ZK unfortunately
 // only provides the handler back from zookeeper_init().
 func (s *S) TestInitErrorThroughErrno(c *C) {
-	zk, watch, err := gozk.Init("bad-domain-without-port", 5e9)
+	zk, watch, err := zookeeper.Dial("bad-domain-without-port", 5e9)
 	if zk != nil {
 		zk.Close()
 	}
@@ -29,7 +29,7 @@
 }

 func (s *S) TestRecvTimeoutInitParameter(c *C) {
-	zk, watch, err := gozk.Init(s.zkAddr, 0)
+	zk, watch, err := zookeeper.Dial(s.zkAddr, 0)
 	c.Assert(err, IsNil)
 	defer zk.Close()

@@ -51,38 +51,38 @@
 }

 func (s *S) TestSessionWatches(c *C) {
-	c.Assert(gozk.CountPendingWatches(), Equals, 0)
+	c.Assert(zookeeper.CountPendingWatches(), Equals, 0)

 	zk1, watch1 := s.init(c)
 	zk2, watch2 := s.init(c)
 	zk3, watch3 := s.init(c)

-	c.Assert(gozk.CountPendingWatches(), Equals, 3)
+	c.Assert(zookeeper.CountPendingWatches(), Equals, 3)

 	event1 := <-watch1
-	c.Assert(event1.Type, Equals, gozk.EVENT_SESSION)
-	c.Assert(event1.State, Equals, gozk.STATE_CONNECTED)
+	c.Assert(event1.Type, Equals, zookeeper.EVENT_SESSION)
+	c.Assert(event1.State, Equals, zookeeper.STATE_CONNECTED)

-	c.Assert(gozk.CountPendingWatches(), Equals, 3)
+	c.Assert(zookeeper.CountPendingWatches(), Equals, 3)

 	event2 := <-watch2
-	c.Assert(event2.Type, Equals, gozk.EVENT_SESSION)
-	c.Assert(event2.State, Equals, gozk.STATE_CONNECTED)
+	c.Assert(event2.Type, Equals, zookeeper.EVENT_SESSION)
+	c.Assert(event2.State, Equals, zookeeper.STATE_CONNECTED)

-	c.Assert(gozk.CountPendingWatches(), Equals, 3)
+	c.Assert(zookeeper.CountPendingWatches(), Equals, 3)

 	event3 := <-watch3
-	c.Assert(event3.Type, Equals, gozk.EVENT_SESSION)
-	c.Assert(event3.State, Equals, gozk.STATE_CONNECTED)
+	c.Assert(event3.Type, Equals, zookeeper.EVENT_SESSION)
+	c.Assert(event3.State, Equals, zookeeper.STATE_CONNECTED)

-	c.Assert(gozk.CountPendingWatches(), Equals, 3)
+	c.Assert(zookeeper.CountPendingWatches(), Equals, 3)

 	zk1.Close()
-	c.Assert(gozk.CountPendingWatches(), Equals, 2)
+	c.Assert(zookeeper.CountPendingWatches(), Equals, 2)
 	zk2.Close()
-	c.Assert(gozk.CountPendingWatches(), Equals, 1)
+	c.Assert(zookeeper.CountPendingWatches(), Equals, 1)
 	zk3.Close()
-	c.Assert(gozk.CountPendingWatches(), Equals, 0)
+	c.Assert(zookeeper.CountPendingWatches(), Equals, 0)
 }

 // Gozk injects a STATE_CLOSED event when zk.Close() is called, right
@@ -95,32 +95,35 @@
 	zk, watch := s.init(c)

 	event := <-watch
-	c.Assert(event.Type, Equals, gozk.EVENT_SESSION)
-	c.Assert(event.State, Equals, gozk.STATE_CONNECTED)
+	c.Assert(event.Type, Equals, zookeeper.EVENT_SESSION)
+	c.Assert(event.State, Equals, zookeeper.STATE_CONNECTED)

 	zk.Close()
 	event, ok := <-watch
 	c.Assert(ok, Equals, false)
-	c.Assert(event.Type, Equals, gozk.EVENT_CLOSED)
-	c.Assert(event.State, Equals, gozk.STATE_CLOSED)
+	c.Assert(event.Type, Equals, zookeeper.EVENT_CLOSED)
+	c.Assert(event.State, Equals, zookeeper.STATE_CLOSED)
 }

 func (s *S) TestEventString(c *C) {
-	var event gozk.Event
-	event = gozk.Event{gozk.EVENT_SESSION, "/path", gozk.STATE_CONNECTED}
+	var event zookeeper.Event
+	event = zookeeper.Event{zookeeper.EVENT_SESSION, "/path", zookeeper.STATE_CONNECTED}
 	c.Assert(event, Matches, "ZooKeeper connected")
-	event = gozk.Event{gozk.EVENT_CREATED, "/path", gozk.STATE_CONNECTED}
+	event = zookeeper.Event{zookeeper.EVENT_CREATED, "/path", zookeeper.STATE_CONNECTED}
 	c.Assert(event, Matches, "ZooKeeper connected; path created: /path")
-	event = gozk.Event{-1, "/path", gozk.STATE_CLOSED}
+	event = zookeeper.Event{-1, "/path", zookeeper.STATE_CLOSED}
 	c.Assert(event, Matches, "ZooKeeper connection closed")
 }

-var okTests = []struct{gozk.Event; Ok bool}{
-	{gozk.Event{gozk.EVENT_SESSION, "", gozk.STATE_CONNECTED}, true},
-	{gozk.Event{gozk.EVENT_CREATED, "", gozk.STATE_CONNECTED}, true},
-	{gozk.Event{0, "", gozk.STATE_CLOSED}, false},
-	{gozk.Event{0, "", gozk.STATE_EXPIRED_SESSION}, false},
-	{gozk.Event{0, "", gozk.STATE_AUTH_FAILED}, false},
+var okTests = []struct {
+	zookeeper.Event
+	Ok bool
+}{
+	{zookeeper.Event{zookeeper.EVENT_SESSION, "", zookeeper.STATE_CONNECTED}, true},
+	{zookeeper.Event{zookeeper.EVENT_CREATED, "", zookeeper.STATE_CONNECTED}, true},
+	{zookeeper.Event{0, "", zookeeper.STATE_CLOSED}, false},
+	{zookeeper.Event{0, "", zookeeper.STATE_EXPIRED_SESSION}, false},
+	{zookeeper.Event{0, "", zookeeper.STATE_AUTH_FAILED}, false},
 }

 func (s *S) TestEventOk(c *C) {
@@ -156,18 +159,18 @@
 	c.Assert(data, Equals, "")
 	c.Assert(stat, IsNil)
 	c.Assert(err, Matches, "no node")
-	c.Assert(err.Code(), Equals, gozk.ZNONODE)
+	c.Assert(err, Equals, zookeeper.ZNONODE)
 }

 func (s *S) TestCreateAndGet(c *C) {
 	zk, _ := s.init(c)

-	path, err := zk.Create("/test-", "bababum", gozk.SEQUENCE|gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
+	path, err := zk.Create("/test-", "bababum", zookeeper.SEQUENCE|zookeeper.EPHEMERAL, zookeeper.WorldACL(zookeeper.PERM_ALL))
 	c.Assert(err, IsNil)
 	c.Assert(path, Matches, "/test-[0-9]+")

 	// Check the error condition from Create().
-	_, err = zk.Create(path, "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
+	_, err = zk.Create(path, "", zookeeper.EPHEMERAL, zookeeper.WorldACL(zookeeper.PERM_ALL))
 	c.Assert(err, Matches, "node exists")

 	data, _, err := zk.Get(path)
@@ -178,7 +181,7 @@
 func (s *S) TestCreateSetAndGet(c *C) {
 	zk, _ := s.init(c)

-	_, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
+	_, err := zk.Create("/test", "", zookeeper.EPHEMERAL, zookeeper.WorldACL(zookeeper.PERM_ALL))
 	c.Assert(err, IsNil)

 	stat, err := zk.Set("/test", "bababum", -1) // Any version.
@@ -191,13 +194,13 @@
 }

 func (s *S) TestGetAndWatch(c *C) {
-	c.Check(gozk.CountPendingWatches(), Equals, 0)
+	c.Check(zookeeper.CountPendingWatches(), Equals, 0)

 	zk, _ := s.init(c)

-	c.Check(gozk.CountPendingWatches(), Equals, 1)
+	c.Check(zookeeper.CountPendingWatches(), Equals, 1)

-	_, err := zk.Create("/test", "one", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
+	_, err := zk.Create("/test", "one", zookeeper.EPHEMERAL, zookeeper.WorldACL(zookeeper.PERM_ALL))
 	c.Assert(err, IsNil)

 	data, stat, watch, err := zk.GetW("/test")
@@ -211,15 +214,15 @@
 	default:
 	}

-	c.Check(gozk.CountPendingWatches(), Equals, 2)
+	c.Check(zookeeper.CountPendingWatches(), Equals, 2)

 	_, err = zk.Set("/test", "two", -1)
 	c.Assert(err, IsNil)

 	event := <-watch
-	c.Assert(event.Type, Equals, gozk.EVENT_CHANGED)
+	c.Assert(event.Type, Equals, zookeeper.EVENT_CHANGED)

-	c.Check(gozk.CountPendingWatches(), Equals, 1)
+	c.Check(zookeeper.CountPendingWatches(), Equals, 1)

 	data, _, watch, err = zk.GetW("/test")
 	c.Assert(err, IsNil)
@@ -231,50 +234,50 @@
 	default:
 	}

-	c.Check(gozk.CountPendingWatches(), Equals, 2)
+	c.Check(zookeeper.CountPendingWatches(), Equals, 2)

 	_, err = zk.Set("/test", "three", -1)
 	c.Assert(err, IsNil)

 	event = <-watch
-	c.Assert(event.Type, Equals, gozk.EVENT_CHANGED)
+	c.Assert(event.Type, Equals, zookeeper.EVENT_CHANGED)

-	c.Check(gozk.CountPendingWatches(), Equals, 1)
+	c.Check(zookeeper.CountPendingWatches(), Equals, 1)
 }

 func (s *S) TestGetAndWatchWithError(c *C) {
-	c.Check(gozk.CountPendingWatches(), Equals, 0)
+	c.Check(zookeeper.CountPendingWatches(), Equals, 0)

 	zk, _ := s.init(c)

-	c.Check(gozk.CountPendingWatches(), Equals, 1)
+	c.Check(zookeeper.CountPendingWatches(), Equals, 1)

 	_, _, watch, err := zk.GetW("/test")
 	c.Assert(err, NotNil)
-	c.Assert(err.Code(), Equals, gozk.ZNONODE)
+	c.Assert(err, Equals, zookeeper.ZNONODE)
 	c.Assert(watch, IsNil)

-	c.Check(gozk.CountPendingWatches(), Equals, 1)
+	c.Check(zookeeper.CountPendingWatches(), Equals, 1)
 }

 func (s *S) TestCloseReleasesWatches(c *C) {
-	c.Check(gozk.CountPendingWatches(), Equals, 0)
+	c.Check(zookeeper.CountPendingWatches(), Equals, 0)

 	zk, _ := s.init(c)

-	c.Check(gozk.CountPendingWatches(), Equals, 1)
+	c.Check(zookeeper.CountPendingWatches(), Equals, 1)

-	_, err := zk.Create("/test", "one", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
+	_, err := zk.Create("/test", "one", zookeeper.EPHEMERAL, zookeeper.WorldACL(zookeeper.PERM_ALL))
 	c.Assert(err, IsNil)

 	_, _, _, err = zk.GetW("/test")
 	c.Assert(err, IsNil)

-	c.Assert(gozk.CountPendingWatches(), Equals, 2)
+	c.Assert(zookeeper.CountPendingWatches(), Equals, 2)

 	zk.Close()

-	c.Assert(gozk.CountPendingWatches(), Equals, 0)
+	c.Assert(zookeeper.CountPendingWatches(), Equals, 0)
 }

 // By default, the ZooKeeper C client will hang indefinitely if a
@@ -285,7 +288,7 @@
 	c.Assert(err, IsNil)
 	err = zk.Close()
 	c.Assert(err, NotNil)
-	c.Assert(err.Code(), Equals, gozk.ZCLOSING)
+	c.Assert(err, Equals, zookeeper.ZCLOSING)
 }

 func (s *S) TestChildren(c *C) {
@@ -298,17 +301,17 @@

 	children, stat, err = zk.Children("/non-existent")
 	c.Assert(err, NotNil)
-	c.Assert(err.Code(), Equals, gozk.ZNONODE)
+	c.Assert(err, Equals, zookeeper.ZNONODE)
 	c.Assert(children, Equals, []string{})
-	c.Assert(stat, Equals, nil)
+	c.Assert(stat, IsNil)
 }

 func (s *S) TestChildrenAndWatch(c *C) {
-	c.Check(gozk.CountPendingWatches(), Equals, 0)
+	c.Check(zookeeper.CountPendingWatches(), Equals, 0)

 	zk, _ := s.init(c)

-	c.Check(gozk.CountPendingWatches(), Equals, 1)
+	c.Check(zookeeper.CountPendingWatches(), Equals, 1)

 	children, stat, watch, err := zk.ChildrenW("/")
 	c.Assert(err, IsNil)
@@ -321,16 +324,16 @@
 	default:
 	}

-	c.Check(gozk.CountPendingWatches(), Equals, 2)
+	c.Check(zookeeper.CountPendingWatches(), Equals, 2)

-	_, err = zk.Create("/test1", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
+	_, err = zk.Create("/test1", "", zookeeper.EPHEMERAL, zookeeper.WorldACL(zookeeper.PERM_ALL))
 	c.Assert(err, IsNil)

 	event := <-watch
-	c.Assert(event.Type, Equals, gozk.EVENT_CHILD)
+	c.Assert(event.Type, Equals, zookeeper.EVENT_CHILD)
 	c.Assert(event.Path, Equals, "/")

-	c.Check(gozk.CountPendingWatches(), Equals, 1)
+	c.Check(zookeeper.CountPendingWatches(), Equals, 1)

 	children, stat, watch, err = zk.ChildrenW("/")
 	c.Assert(err, IsNil)
@@ -345,57 +348,56 @@
 	default:
 	}

-	c.Check(gozk.CountPendingWatches(), Equals, 2)
+	c.Check(zookeeper.CountPendingWatches(), Equals, 2)

-	_, err = zk.Create("/test2", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
+	_, err = zk.Create("/test2", "", zookeeper.EPHEMERAL, zookeeper.WorldACL(zookeeper.PERM_ALL))
 	c.Assert(err, IsNil)

 	event = <-watch
-	c.Assert(event.Type, Equals, gozk.EVENT_CHILD)
+	c.Assert(event.Type, Equals, zookeeper.EVENT_CHILD)

-	c.Check(gozk.CountPendingWatches(), Equals, 1)
+	c.Check(zookeeper.CountPendingWatches(), Equals, 1)
 }

 func (s *S) TestChildrenAndWatchWithError(c *C) {
-	c.Check(gozk.CountPendingWatches(), Equals, 0)
+	c.Check(zookeeper.CountPendingWatches(), Equals, 0)

 	zk, _ := s.init(c)

-	c.Check(gozk.CountPendingWatches(), Equals, 1)
+	c.Check(zookeeper.CountPendingWatches(), Equals, 1)

 	_, stat, watch, err := zk.ChildrenW("/test")
 	c.Assert(err, NotNil)
-	c.Assert(err.Code(), Equals, gozk.ZNONODE)
+	c.Assert(err, Equals, zookeeper.ZNONODE)
 	c.Assert(watch, IsNil)
 	c.Assert(stat, IsNil)

-	c.Check(gozk.CountPendingWatches(), Equals, 1)
+	c.Check(zookeeper.CountPendingWatches(), Equals, 1)
 }

 func (s *S) TestExists(c *C) {
 	zk, _ := s.init(c)

-	stat, err := zk.Exists("/zookeeper")
-	c.Assert(err, IsNil)
-	c.Assert(stat.NumChildren(), Equals, int32(1))
-
-	stat, err = zk.Exists("/non-existent")
+	stat, err := zk.Exists("/non-existent")
 	c.Assert(err, IsNil)
 	c.Assert(stat, IsNil)
+
+	stat, err = zk.Exists("/zookeeper")
+	c.Assert(err, IsNil)
 }

 func (s *S) TestExistsAndWatch(c *C) {
-	c.Check(gozk.CountPendingWatches(), Equals, 0)
+	c.Check(zookeeper.CountPendingWatches(), Equals, 0)

 	zk, _ := s.init(c)

-	c.Check(gozk.CountPendingWatches(), Equals, 1)
+	c.Check(zookeeper.CountPendingWatches(), Equals, 1)

 	stat, watch, err := zk.ExistsW("/test")
 	c.Assert(err, IsNil)
 	c.Assert(stat, IsNil)

-	c.Check(gozk.CountPendingWatches(), Equals, 2)
+	c.Check(zookeeper.CountPendingWatches(), Equals, 2)

 	select {
 	case <-watch:
@@ -403,62 +405,62 @@
 	default:
 	}

-	_, err = zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
+	_, err = zk.Create("/test", "", zookeeper.EPHEMERAL, zookeeper.WorldACL(zookeeper.PERM_ALL))
 	c.Assert(err, IsNil)

 	event := <-watch
-	c.Assert(event.Type, Equals, gozk.EVENT_CREATED)
+	c.Assert(event.Type, Equals, zookeeper.EVENT_CREATED)
 	c.Assert(event.Path, Equals, "/test")

-	c.Check(gozk.CountPendingWatches(), Equals, 1)
+	c.Check(zookeeper.CountPendingWatches(), Equals, 1)

 	stat, watch, err = zk.ExistsW("/test")
 	c.Assert(err, IsNil)
 	c.Assert(stat, NotNil)
 	c.Assert(stat.NumChildren(), Equals, int32(0))

-	c.Check(gozk.CountPendingWatches(), Equals, 2)
+	c.Check(zookeeper.CountPendingWatches(), Equals, 2)
 }

 func (s *S) TestExistsAndWatchWithError(c *C) {
-	c.Check(gozk.CountPendingWatches(), Equals, 0)
+	c.Check(zookeeper.CountPendingWatches(), Equals, 0)

 	zk, _ := s.init(c)

-	c.Check(gozk.CountPendingWatches(), Equals, 1)
+	c.Check(zookeeper.CountPendingWatches(), Equals, 1)

 	stat, watch, err := zk.ExistsW("///")
 	c.Assert(err, NotNil)
-	c.Assert(err.Code(), Equals, gozk.ZBADARGUMENTS)
+	c.Assert(err, Equals, zookeeper.ZBADARGUMENTS)
 	c.Assert(stat, IsNil)
 	c.Assert(watch, IsNil)

-	c.Check(gozk.CountPendingWatches(), Equals, 1)
+	c.Check(zookeeper.CountPendingWatches(), Equals, 1)
 }

 func (s *S) TestDelete(c *C) {
 	zk, _ := s.init(c)

-	_, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
+	_, err := zk.Create("/test", "", zookeeper.EPHEMERAL, zookeeper.WorldACL(zookeeper.PERM_ALL))
 	c.Assert(err, IsNil)

 	err = zk.Delete("/test", 5)
 	c.Assert(err, NotNil)
-	c.Assert(err.Code(), Equals, gozk.ZBADVERSION)
+	c.Assert(err, Equals, zookeeper.ZBADVERSION)

 	err = zk.Delete("/test", -1)
 	c.Assert(err, IsNil)

 	err = zk.Delete("/test", -1)
 	c.Assert(err, NotNil)
-	c.Assert(err.Code(), Equals, gozk.ZNONODE)
+	c.Assert(err, Equals, zookeeper.ZNONODE)
 }

 func (s *S) TestClientIdAndReInit(c *C) {
 	zk1, _ := s.init(c)
 	clientId1 := zk1.ClientId()

-	zk2, _, err := gozk.ReInit(s.zkAddr, 5e9, clientId1)
+	zk2, _, err := zookeeper.Redial(s.zkAddr, 5e9, clientId1)
 	c.Assert(err, IsNil)
 	defer zk2.Close()
 	clientId2 := zk2.ClientId()
@@ -471,7 +473,7 @@
 func (s *S) TestExistsWatchOnDataChange(c *C) {
 	zk, _ := s.init(c)
 
-	_, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
+	_, err := zk.Create("/test", "", zookeeper.EPHEMERAL, zookeeper.WorldACL(zookeeper.PERM_ALL))
 	c.Assert(err, IsNil)
 
 	_, watch, err := zk.ExistsW("/test")
@@ -483,24 +485,24 @@
 	event := <-watch
 
 	c.Assert(event.Path, Equals, "/test")
-	c.Assert(event.Type, Equals, gozk.EVENT_CHANGED)
+	c.Assert(event.Type, Equals, zookeeper.EVENT_CHANGED)
 }
 
 func (s *S) TestACL(c *C) {
 	zk, _ := s.init(c)
 
-	_, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
+	_, err := zk.Create("/test", "", zookeeper.EPHEMERAL, zookeeper.WorldACL(zookeeper.PERM_ALL))
 	c.Assert(err, IsNil)
 
 	acl, stat, err := zk.ACL("/test")
 	c.Assert(err, IsNil)
-	c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_ALL))
+	c.Assert(acl, Equals, zookeeper.WorldACL(zookeeper.PERM_ALL))
 	c.Assert(stat, NotNil)
 	c.Assert(stat.Version(), Equals, int32(0))
 
 	acl, stat, err = zk.ACL("/non-existent")
 	c.Assert(err, NotNil)
-	c.Assert(err.Code(), Equals, gozk.ZNONODE)
+	c.Assert(err, Equals, zookeeper.ZNONODE)
 	c.Assert(acl, IsNil)
 	c.Assert(stat, IsNil)
 }
@@ -508,32 +510,32 @@
 func (s *S) TestSetACL(c *C) {
 	zk, _ := s.init(c)
 
-	_, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
+	_, err := zk.Create("/test", "", zookeeper.EPHEMERAL, zookeeper.WorldACL(zookeeper.PERM_ALL))
 	c.Assert(err, IsNil)
 
-	err = zk.SetACL("/test", gozk.WorldACL(gozk.PERM_ALL), 5)
+	err = zk.SetACL("/test", zookeeper.WorldACL(zookeeper.PERM_ALL), 5)
 	c.Assert(err, NotNil)
-	c.Assert(err.Code(), Equals, gozk.ZBADVERSION)
+	c.Assert(err, Equals, zookeeper.ZBADVERSION)
 
-	err = zk.SetACL("/test", gozk.WorldACL(gozk.PERM_READ), -1)
+	err = zk.SetACL("/test", zookeeper.WorldACL(zookeeper.PERM_READ), -1)
 	c.Assert(err, IsNil)
 
 	acl, _, err := zk.ACL("/test")
 	c.Assert(err, IsNil)
-	c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_READ))
+	c.Assert(acl, Equals, zookeeper.WorldACL(zookeeper.PERM_READ))
 }
 
 func (s *S) TestAddAuth(c *C) {
 	zk, _ := s.init(c)
 
-	acl := []gozk.ACL{{gozk.PERM_READ, "digest", "joe:enQcM3mIEHQx7IrPNStYBc0qfs8="}}
+	acl := []zookeeper.ACL{{zookeeper.PERM_READ, "digest", "joe:enQcM3mIEHQx7IrPNStYBc0qfs8="}}
 
-	_, err := zk.Create("/test", "", gozk.EPHEMERAL, acl)
+	_, err := zk.Create("/test", "", zookeeper.EPHEMERAL, acl)
 	c.Assert(err, IsNil)
 
 	_, _, err = zk.Get("/test")
 	c.Assert(err, NotNil)
-	c.Assert(err.Code(), Equals, gozk.ZNOAUTH)
+	c.Assert(err, Equals, zookeeper.ZNOAUTH)
 
 	err = zk.AddAuth("digest", "joe:passwd")
 	c.Assert(err, IsNil)
@@ -543,36 +545,36 @@
 }
 
 func (s *S) TestWatchOnReconnection(c *C) {
-	c.Check(gozk.CountPendingWatches(), Equals, 0)
+	c.Check(zookeeper.CountPendingWatches(), Equals, 0)
 
 	zk, session := s.init(c)
 
 	event := <-session
-	c.Assert(event.Type, Equals, gozk.EVENT_SESSION)
-	c.Assert(event.State, Equals, gozk.STATE_CONNECTED)
+	c.Assert(event.Type, Equals, zookeeper.EVENT_SESSION)
+	c.Assert(event.State, Equals, zookeeper.STATE_CONNECTED)
 
-	c.Check(gozk.CountPendingWatches(), Equals, 1)
+	c.Check(zookeeper.CountPendingWatches(), Equals, 1)
 
 	stat, watch, err := zk.ExistsW("/test")
 	c.Assert(err, IsNil)
 	c.Assert(stat, IsNil)
 
-	c.Check(gozk.CountPendingWatches(), Equals, 2)
+	c.Check(zookeeper.CountPendingWatches(), Equals, 2)
 
 	s.StopZK()
 	time.Sleep(2e9)
-	s.StartZK()
+	s.StartZK(c)
 
 	// The session channel should receive the reconnection notification,
 	select {
 	case event := <-session:
-		c.Assert(event.State, Equals, gozk.STATE_CONNECTING)
+		c.Assert(event.State, Equals, zookeeper.STATE_CONNECTING)
 	case <-time.After(3e9):
 		c.Fatal("Session watch didn't fire")
 	}
 	select {
 	case event := <-session:
-		c.Assert(event.State, Equals, gozk.STATE_CONNECTED)
+		c.Assert(event.State, Equals, zookeeper.STATE_CONNECTED)
 	case <-time.After(3e9):
 		c.Fatal("Session watch didn't fire")
 	}
@@ -585,40 +587,40 @@
 	}
 
 	// And it should still work.
-	_, err = zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
+	_, err = zk.Create("/test", "", zookeeper.EPHEMERAL, zookeeper.WorldACL(zookeeper.PERM_ALL))
 	c.Assert(err, IsNil)
 
 	event = <-watch
-	c.Assert(event.Type, Equals, gozk.EVENT_CREATED)
+	c.Assert(event.Type, Equals, zookeeper.EVENT_CREATED)
 	c.Assert(event.Path, Equals, "/test")
 
-	c.Check(gozk.CountPendingWatches(), Equals, 1)
+	c.Check(zookeeper.CountPendingWatches(), Equals, 1)
 }
 
 func (s *S) TestWatchOnSessionExpiration(c *C) {
-	c.Check(gozk.CountPendingWatches(), Equals, 0)
+	c.Check(zookeeper.CountPendingWatches(), Equals, 0)
 
 	zk, session := s.init(c)
 
 	event := <-session
-	c.Assert(event.Type, Equals, gozk.EVENT_SESSION)
-	c.Assert(event.State, Equals, gozk.STATE_CONNECTED)
+	c.Assert(event.Type, Equals, zookeeper.EVENT_SESSION)
+	c.Assert(event.State, Equals, zookeeper.STATE_CONNECTED)
 
-	c.Check(gozk.CountPendingWatches(), Equals, 1)
+	c.Check(zookeeper.CountPendingWatches(), Equals, 1)
 
 	stat, watch, err := zk.ExistsW("/test")
 	c.Assert(err, IsNil)
 	c.Assert(stat, IsNil)
 
-	c.Check(gozk.CountPendingWatches(), Equals, 2)
+	c.Check(zookeeper.CountPendingWatches(), Equals, 2)
 
 	// Use expiration trick described in the FAQ.
 	clientId := zk.ClientId()
-	zk2, session2, err := gozk.ReInit(s.zkAddr, 5e9, clientId)
+	zk2, session2, err := zookeeper.Redial(s.zkAddr, 5e9, clientId)
 
 	for event := range session2 {
 		c.Log("Event from overlapping session: ", event)
-		if event.State == gozk.STATE_CONNECTED {
+		if event.State == zookeeper.STATE_CONNECTED {
 			// Wait for zk to process the connection.
 			// Not reliable without this. :-(
 			time.Sleep(1e9)
@@ -627,21 +629,21 @@
 	}
 	for event := range session {
 		c.Log("Event from primary session: ", event)
-		if event.State == gozk.STATE_EXPIRED_SESSION {
+		if event.State == zookeeper.STATE_EXPIRED_SESSION {
 			break
 		}
 	}
 
 	select {
 	case event := <-watch:
-		c.Assert(event.State, Equals, gozk.STATE_EXPIRED_SESSION)
+		c.Assert(event.State, Equals, zookeeper.STATE_EXPIRED_SESSION)
 	case <-time.After(3e9):
 		c.Fatal("Watch event didn't fire")
 	}
 
 	event = <-watch
-	c.Assert(event.Type, Equals, gozk.EVENT_CLOSED)
-	c.Assert(event.State, Equals, gozk.STATE_CLOSED)
+	c.Assert(event.Type, Equals, zookeeper.EVENT_CLOSED)
+	c.Assert(event.State, Equals, zookeeper.STATE_CLOSED)
 
-	c.Check(gozk.CountPendingWatches(), Equals, 1)
+	c.Check(zookeeper.CountPendingWatches(), Equals, 1)
 }
