Merge lp:~rogpeppe/gozk/update-event-interface into lp:~juju/gozk/trunk

Proposed by Roger Peppe
Status: Work in progress
Proposed branch: lp:~rogpeppe/gozk/update-event-interface
Merge into: lp:~juju/gozk/trunk
Diff against target: 3594 lines (+1527/-1001)
12 files modified
Makefile (+6/-2)
example/example.go (+22/-23)
helpers.c (+2/-2)
reattach_test.go (+202/-0)
retry_test.go (+65/-74)
server.go (+239/-0)
service/Makefile (+20/-0)
service/service.go (+160/-0)
status.go (+116/-0)
suite_test.go (+56/-138)
zk.go (+381/-489)
zk_test.go (+258/-273)
To merge this branch: bzr merge lp:~rogpeppe/gozk/update-event-interface
Reviewer Review Type Date Requested Status
Gustavo Niemeyer Disapprove
Review via email: mp+77560@code.launchpad.net

Description of the change

Updated event interface and factored-out server-starting interface.

Event interface
-----------

The various functions returning event channels are changed
to return channels that more accurately reflect the kinds
of notifications that can happen on those channels.
This makes programming with the wait channels less error-prone
(no need to worry about the wrong kind of event arriving
on a wait channel) and more self-documenting.

The Event type is removed:
- the Type field is now redundant, as only one kind of event can arrive through any given channel.
- the Path field was always redundant, as it is always the same as the node that is being watched.
- the value of the State field is now sent to the session watcher.

When the server goes down, watch channels are simply closed.
I can't think of any scenario in which a node watcher would behave
differently depending on how the server has failed, and if it wants
to, it can always interrogate whatever is watching the session notifications.
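
A minimal sketch of how a client might consume such a typed watch channel, assuming a closed channel is the only signal that the server went away. The NodeStatus type, its constants and the describe helper are hypothetical names chosen for illustration; they are not taken from the branch.

```go
package main

import "fmt"

// NodeStatus is a hypothetical stand-in for the value sent on a node
// watch channel; it is not gozk's actual type.
type NodeStatus int

const (
	// Zero is deliberately left unnamed: it is what a receive yields
	// once the channel has been closed.
	NodeChanged NodeStatus = iota + 1
	NodeDeleted
)

// describe waits for one notification and reports what happened.
// The comma-ok receive distinguishes a real event from a closed channel.
func describe(watch <-chan NodeStatus) string {
	status, ok := <-watch
	if !ok {
		return "watch closed"
	}
	switch status {
	case NodeChanged:
		return "node changed"
	case NodeDeleted:
		return "node deleted"
	}
	return fmt.Sprintf("unexpected status %d", int(status))
}

func main() {
	fired := make(chan NodeStatus, 1)
	fired <- NodeChanged
	fmt.Println(describe(fired))

	down := make(chan NodeStatus)
	close(down) // simulates the server going down
	fmt.Println(describe(down))
}
```

The watcher never has to filter out events of the wrong kind, and it learns that the server went away without needing a dedicated constant.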

Likely points of controversy:

- the spelling of the new constants. I chose mixed case rather than
all-caps - YMMV.

- i'd like to think of a better spelling for event_CREATED etc,
now that those constants no longer need exporting.

- use of explicit zero rather than having a zero constant defined
for each type. I *think* that it's less confusing and simpler this
way than having NodeClosed (NodeDown), ChildClosed etc.
Can a session ever go down permanently?

Server-starting interface
-------------------

I realised that server.go had increased significantly in size and
that all the new code was logically independent of ZooKeeper itself.
This inspired me to factor out the code into a new package (currently
gozk/zk/service), which will potentially enable similar server packages
at little cost. There are a number of other enhancements that can now
be made to this package without increasing the zk package's complexity.

I think the new packaging works well, but it's arguable whether ZooKeeper
clients should use the service package directly or whether zk should
return *service.Service itself.

One specific advantage of the former (in addition to reusability and
nicer modularity) is that ZooKeeper clients can query or set aspects
of the zk.Server; its disadvantage, which the latter avoids, is that it
adds a line of code and requires clients to know about another package.

YMMV as always.

21. By Roger Peppe

Rewrite Event comment.

Gustavo Niemeyer (niemeyer) wrote :

2692 - event := Event{
2693 - Type: int(data.event_type),
2694 - Path: C.GoString(data.event_path),
2695 - State: int(data.connection_state),
2696 - }

The key problem with this proposal is the trashing of information coming
from the server. It's discarding not only why e.g. a state is being
trashed, but also details like the path which is related to an event.
This path isn't just being returned from the path that called the watch.
It's actually part of the wire protocol, and comes from the server side.

Trashing both the session state error information and the paths like that
is not a good idea.

review: Disapprove
22. By Roger Peppe

merged ChildStatus into NodeStatus.

23. By Roger Peppe

gofmt

24. By Roger Peppe

Fixed comments. Added LOG_NONE.

Roger Peppe (rogpeppe) wrote :

On 29 September 2011 18:56, Gustavo Niemeyer <email address hidden> wrote:
> Review: Disapprove
>
>
> 2692    -               event := Event{
> 2693    -                       Type:  int(data.event_type),
> 2694    -                       Path:  C.GoString(data.event_path),
> 2695    -                       State: int(data.connection_state),
> 2696    -               }
>
> The key problem with this proposal is the trashing of information coming
> from the server. It's discarding not only why e.g. a state is being
> trashed, but also details like the path which is related to an event.
> This path isn't just being returned from the path that called the watch.
> It's actually part of the wire protocol, and comes from the server side.
>
> Trashing both the session state error information and the paths like that
> is not a good idea.

there is no information in the paths. they are always the same as the path
of the request. if this ever changes, then i'd fully support adding the path,
but as it is Event.Path gives a misleading impression (for instance that Path
might name a child that has been added).

we are trashing no information from the server. if we are then we
should make sure that we panic immediately because the server is doing
something unexpected. [i've just added the requisite check].
the exact specification, that is, which callbacks may be made for a given
API call, is unclear because the explanation seems to be omitted from
the ZooKeeper manual itself. i think we owe it to gozk's users to provide
a better-defined interface: you need to know these details to use the API,
so people will rely on the undocumented behaviour anyway.
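
A sketch of the kind of check described above, assuming the invariant is simply that the path reported with an event equals the path the watch was set on. The function names are hypothetical and this is not the code that was added to the branch.

```go
package main

import "fmt"

// checkEventPath reports an error when the path delivered with an event
// differs from the path the watch was requested on.
// Hypothetical helper, for illustration only.
func checkEventPath(watched, reported string) error {
	if reported != watched {
		return fmt.Errorf("event path %q does not match watched path %q", reported, watched)
	}
	return nil
}

// mustMatchPath panics on a mismatch, so that unexpected server
// behaviour is noticed immediately rather than silently ignored.
func mustMatchPath(watched, reported string) {
	if err := checkEventPath(watched, reported); err != nil {
		panic(err)
	}
}

func main() {
	mustMatchPath("/counter", "/counter") // paths agree, nothing happens
	fmt.Println(checkEventPath("/counter", "/counter/child"))
}
```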

PTAL. the latest version has only two kinds of events - node changed events
and session status events. i really think this interface makes sense.

please reconsider.

Gustavo Niemeyer (niemeyer) wrote :

Let's have a single thread on this topic, please.

As I answered to your email:

>> The point isn't C or Go.. the issue is simply that there's real
>> information coming through the pipe being trashed in the suggested
>> interface.
>
> there really isn't.

The protocol provides three details to the watch. Two of them are
being dropped on the floor in your branch and not reaching the event
listener. If you disagree with this we won't be able to agree on
anything else around that change.
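
For reference, a sketch of an event value that carries all three details to the listener, mirroring the fields of the Event struct removed by the diff lines quoted earlier in this thread. The numeric values and the summarize helper are placeholders for illustration, not gozk's or ZooKeeper's constants.

```go
package main

import "fmt"

// Event mirrors the three fields of the struct removed by the branch:
// the event type, the path reported by the server, and the session state.
type Event struct {
	Type  int
	Path  string
	State int
}

// summarize formats every detail of the event, so that nothing the
// server sent is hidden from the listener.
func summarize(e Event) string {
	return fmt.Sprintf("type=%d path=%s state=%d", e.Type, e.Path, e.State)
}

func main() {
	// Placeholder values, chosen only for the example.
	e := Event{Type: 3, Path: "/counter", State: 3}
	fmt.Println(summarize(e))
}
```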

Unmerged revisions

24. By Roger Peppe

Fixed comments. Added LOG_NONE.

23. By Roger Peppe

gofmt

22. By Roger Peppe

merged ChildStatus into NodeStatus.

21. By Roger Peppe

Rewrite Event comment.

20. By Roger Peppe

merged changes to rename branch.

Preview Diff

=== modified file 'Makefile'
--- Makefile 2011-08-03 01:56:58 +0000
+++ Makefile 2011-09-30 13:35:26 +0000
@@ -2,10 +2,14 @@
 
 all: package
 
-TARG=gozk
+TARG=launchpad.net/gozk/zk
 
+GOFILES=\
+	server.go\
+	status.go\
+
 CGOFILES=\
-	gozk.go\
+	zk.go\
 
 CGO_OFILES=\
 	helpers.o\
=== added directory 'example'
=== renamed file 'example.go' => 'example/example.go'
--- example.go 2011-08-03 01:47:25 +0000
+++ example/example.go 2011-09-30 13:35:26 +0000
@@ -1,30 +1,29 @@
 package main
 
 import (
-	"gozk"
+	"launchpad.net/gozk/zk"
 )
 
 func main() {
-	zk, session, err := gozk.Init("localhost:2181", 5000)
+	conn, session, err := zk.Dial("localhost:2181", 5e9)
 	if err != nil {
 		println("Couldn't connect: " + err.String())
 		return
 	}
 
-	defer zk.Close()
+	defer conn.Close()
 
 	// Wait for connection.
 	event := <-session
-	if event.State != gozk.STATE_CONNECTED {
+	if event != zk.SessionConnected {
 		println("Couldn't connect")
 		return
 	}
 
-	_, err = zk.Create("/counter", "0", 0, gozk.WorldACL(gozk.PERM_ALL))
+	_, err = conn.Create("/counter", "0", 0, zk.WorldACL(zk.PERM_ALL))
 	if err != nil {
 		println(err.String())
 	} else {
 		println("Created!")
 	}
 }
-
=== modified file 'helpers.c'
--- helpers.c 2011-01-11 12:58:26 +0000
+++ helpers.c 2011-09-30 13:35:26 +0000
@@ -4,7 +4,6 @@
 #include <string.h>
 #include "helpers.h"
 
-
 static pthread_mutex_t watch_mutex = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t watch_available = PTHREAD_COND_INITIALIZER;
 
@@ -38,8 +37,9 @@
     pthread_mutex_lock(&watch_mutex);
     {
         watch_data *data = malloc(sizeof(watch_data)); // XXX Check data.
+
+        data->event_type = event_type;
         data->connection_state = connection_state;
-        data->event_type = event_type;
         data->event_path = strdup(event_path); // XXX Check event_path.
         data->watch_context = watch_context;
         data->next = NULL;
=== added file 'reattach_test.go'
--- reattach_test.go 1970-01-01 00:00:00 +0000
+++ reattach_test.go 2011-09-30 13:35:26 +0000
@@ -0,0 +1,202 @@
+package zk_test
+
+import (
+	"bufio"
+	. "launchpad.net/gocheck"
+	"launchpad.net/gozk/zk/service"
+	"launchpad.net/gozk/zk"
+	"exec"
+	"flag"
+	"fmt"
+	"os"
+	"strings"
+	"testing"
+	"time"
+)
+
+var reattach = flag.Bool("zktest.reattach", false, "internal flag used for testing")
+var reattachRunDir = flag.String("zktest.rundir", "", "internal flag used for testing")
+var reattachAbnormalStop = flag.Bool("zktest.stop", false, "internal flag used for testing")
+
+// This is the reentrancy point for testing ZooKeeper servers
+// started by processes that are not direct children of the
+// testing process. This test always succeeds - the status
+// will be written to stdout and read by indirectServer.
+func TestStartNonChildServer(t *testing.T) {
+	if !*reattach {
+		// not re-entrant, so ignore this test.
+		return
+	}
+	err := startServer(*reattachRunDir, *reattachAbnormalStop)
+	if err != nil {
+		fmt.Printf("error:%v\n", err)
+		return
+	}
+	fmt.Printf("done\n")
+}
+
+func (s *S) startServer(c *C, abort bool) {
+	err := startServer(s.zkTestRoot, abort)
+	c.Assert(err, IsNil)
+}
+
+// startServerIndirect starts a ZooKeeper server that is not
+// a direct child of the current process. If abort is true,
+// the server will be started and then terminated abnormally.
+func (s *S) startServerIndirect(c *C, abort bool) {
+	if len(os.Args) == 0 {
+		c.Fatal("Cannot find self executable name")
+	}
+	cmd := exec.Command(
+		os.Args[0],
+		"-zktest.reattach",
+		"-zktest.rundir", s.zkTestRoot,
+		"-zktest.stop=", fmt.Sprint(abort),
+		"-test.run", "StartNonChildServer",
+	)
+	r, err := cmd.StdoutPipe()
+	c.Assert(err, IsNil)
+	defer r.Close()
+	if err := cmd.Start(); err != nil {
+		c.Fatalf("cannot start re-entrant gotest process: %v", err)
+	}
+	defer cmd.Wait()
+	bio := bufio.NewReader(r)
+	for {
+		line, err := bio.ReadSlice('\n')
+		if err != nil {
+			c.Fatalf("indirect server status line not found: %v", err)
+		}
+		s := string(line)
+		if strings.HasPrefix(s, "error:") {
+			c.Fatalf("indirect server error: %s", s[len("error:"):])
+		}
+		if s == "done\n" {
+			return
+		}
+	}
+	panic("not reached")
+}
+
+// startServer starts a ZooKeeper server, and terminates it abnormally
+// if abort is true.
+func startServer(runDir string, abort bool) os.Error {
+	s, err := zk.AttachServer(runDir)
+	if err != nil {
+		return fmt.Errorf("cannot attach to server at %q: %v", runDir, err)
+	}
+	srv := service.New(s)
+	if err := srv.Start(); err != nil {
+		return fmt.Errorf("cannot start server: %v", err)
+	}
+	if abort {
+		// Give it time to start up, then kill the server process abnormally,
+		// leaving the pid.txt file behind.
+		time.Sleep(0.5e9)
+		p, err := srv.Process()
+		if err != nil {
+			return fmt.Errorf("cannot get server process: %v", err)
+		}
+		defer p.Release()
+		if err := p.Kill(); err != nil {
+			return fmt.Errorf("cannot kill server process: %v", err)
+		}
+	}
+	return nil
+}
+
+func (s *S) checkCookie(c *C) {
+	conn, watch, err := zk.Dial(s.zkAddr, 5e9)
+	c.Assert(err, IsNil)
+
+	e, ok := <-watch
+	c.Assert(ok, Equals, true)
+	c.Assert(e.Ok(), Equals, true)
+
+	c.Assert(err, IsNil)
+	cookie, _, err := conn.Get("/testAttachCookie")
+	c.Assert(err, IsNil)
+	c.Assert(cookie, Equals, "testAttachCookie")
+	conn.Close()
+}
+
+// cases to test:
+// child server, stopped normally; reattach, start
+// non-direct child server, killed abnormally; reattach, start (->error), remove pid.txt; start
+// non-direct child server, still running; reattach, start (->error), stop, start
+// child server, still running; reattach, start (-> error)
+// child server, still running; reattach, stop, start.
+// non-direct child server, still running; reattach, stop, start.
+func (s *S) TestAttachServer(c *C) {
+	// Create a cookie so that we know we are reattaching to the same instance.
+	conn, _ := s.init(c)
+	_, err := conn.Create("/testAttachCookie", "testAttachCookie", 0, zk.WorldACL(zk.PERM_ALL))
+	c.Assert(err, IsNil)
+	s.checkCookie(c)
+	s.zkServer.Stop()
+	s.zkServer = nil
+
+	s.testAttachServer(c, (*S).startServer)
+	s.testAttachServer(c, (*S).startServerIndirect)
+	s.testAttachServerAbnormalTerminate(c, (*S).startServer)
+	s.testAttachServerAbnormalTerminate(c, (*S).startServerIndirect)
+
+	srv, err := zk.AttachServer(s.zkTestRoot)
+	c.Assert(err, IsNil)
+
+	s.zkServer = service.New(srv)
+	err = s.zkServer.Start()
+	c.Assert(err, IsNil)
+
+	conn, _ = s.init(c)
+	err = conn.Delete("/testAttachCookie", -1)
+	c.Assert(err, IsNil)
+}
+
+func (s *S) testAttachServer(c *C, start func(*S, *C, bool)) {
+	start(s, c, false)
+
+	s.checkCookie(c)
+
+	// try attaching to it while it is still running - it should fail.
+	a, err := zk.AttachServer(s.zkTestRoot)
+	c.Assert(err, IsNil)
+	srv := service.New(a)
+
+	err = srv.Start()
+	c.Assert(err, NotNil)
+
+	// stop it and then start it again - it should succeed.
+	err = srv.Stop()
+	c.Assert(err, IsNil)
+
+	err = srv.Start()
+	c.Assert(err, IsNil)
+
+	s.checkCookie(c)
+
+	err = srv.Stop()
+	c.Assert(err, IsNil)
+}
+
+func (s *S) testAttachServerAbnormalTerminate(c *C, start func(*S, *C, bool)) {
+	start(s, c, true)
+
+	// try attaching to it and starting - it should fail, because pid.txt
+	// won't have been removed.
+	a, err := zk.AttachServer(s.zkTestRoot)
+	c.Assert(err, IsNil)
+	srv := service.New(a)
+	err = srv.Start()
+	c.Assert(err, NotNil)
+
+	// stopping it should bring things back to normal.
+	err = srv.Stop()
+	c.Assert(err, IsNil)
+	err = srv.Start()
+	c.Assert(err, IsNil)
+
+	s.checkCookie(c)
+	err = srv.Stop()
+	c.Assert(err, IsNil)
+}
=== modified file 'retry_test.go'
--- retry_test.go 2011-08-19 01:51:37 +0000
+++ retry_test.go 2011-09-30 13:35:26 +0000
@@ -1,42 +1,41 @@
-package gozk_test
+package zk_test
 
 import (
 	. "launchpad.net/gocheck"
-	"gozk"
+	"launchpad.net/gozk/zk"
 	"os"
 )
 
 func (s *S) TestRetryChangeCreating(c *C) {
-	zk, _ := s.init(c)
+	conn, _ := s.init(c)
 
-	err := zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL),
-		func(data string, stat gozk.Stat) (string, os.Error) {
+	err := conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL),
+		func(data string, stat *zk.Stat) (string, os.Error) {
 			c.Assert(data, Equals, "")
 			c.Assert(stat, IsNil)
 			return "new", nil
 		})
 	c.Assert(err, IsNil)
 
-	data, stat, err := zk.Get("/test")
+	data, stat, err := conn.Get("/test")
 	c.Assert(err, IsNil)
 	c.Assert(stat, NotNil)
 	c.Assert(stat.Version(), Equals, int32(0))
 	c.Assert(data, Equals, "new")
 
-	acl, _, err := zk.ACL("/test")
+	acl, _, err := conn.ACL("/test")
 	c.Assert(err, IsNil)
-	c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_ALL))
+	c.Assert(acl, Equals, zk.WorldACL(zk.PERM_ALL))
 }
 
 func (s *S) TestRetryChangeSetting(c *C) {
-	zk, _ := s.init(c)
+	conn, _ := s.init(c)
 
-	_, err := zk.Create("/test", "old", gozk.EPHEMERAL,
-		gozk.WorldACL(gozk.PERM_ALL))
+	_, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
 	c.Assert(err, IsNil)
 
-	err = zk.RetryChange("/test", gozk.EPHEMERAL, []gozk.ACL{},
-		func(data string, stat gozk.Stat) (string, os.Error) {
+	err = conn.RetryChange("/test", zk.EPHEMERAL, []zk.ACL{},
+		func(data string, stat *zk.Stat) (string, os.Error) {
 			c.Assert(data, Equals, "old")
 			c.Assert(stat, NotNil)
 			c.Assert(stat.Version(), Equals, int32(0))
@@ -44,27 +43,26 @@
 		})
 	c.Assert(err, IsNil)
 
-	data, stat, err := zk.Get("/test")
+	data, stat, err := conn.Get("/test")
 	c.Assert(err, IsNil)
 	c.Assert(stat, NotNil)
 	c.Assert(stat.Version(), Equals, int32(1))
 	c.Assert(data, Equals, "brand new")
 
 	// ACL was unchanged by RetryChange().
-	acl, _, err := zk.ACL("/test")
+	acl, _, err := conn.ACL("/test")
 	c.Assert(err, IsNil)
-	c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_ALL))
+	c.Assert(acl, Equals, zk.WorldACL(zk.PERM_ALL))
 }
 
 func (s *S) TestRetryChangeUnchangedValueDoesNothing(c *C) {
-	zk, _ := s.init(c)
+	conn, _ := s.init(c)
 
-	_, err := zk.Create("/test", "old", gozk.EPHEMERAL,
-		gozk.WorldACL(gozk.PERM_ALL))
+	_, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
 	c.Assert(err, IsNil)
 
-	err = zk.RetryChange("/test", gozk.EPHEMERAL, []gozk.ACL{},
-		func(data string, stat gozk.Stat) (string, os.Error) {
+	err = conn.RetryChange("/test", zk.EPHEMERAL, []zk.ACL{},
+		func(data string, stat *zk.Stat) (string, os.Error) {
 			c.Assert(data, Equals, "old")
 			c.Assert(stat, NotNil)
 			c.Assert(stat.Version(), Equals, int32(0))
@@ -72,7 +70,7 @@
 		})
 	c.Assert(err, IsNil)
 
-	data, stat, err := zk.Get("/test")
+	data, stat, err := conn.Get("/test")
 	c.Assert(err, IsNil)
 	c.Assert(stat, NotNil)
 	c.Assert(stat.Version(), Equals, int32(0)) // Unchanged!
@@ -80,14 +78,14 @@
 }
 
 func (s *S) TestRetryChangeConflictOnCreate(c *C) {
-	zk, _ := s.init(c)
+	conn, _ := s.init(c)
 
-	changeFunc := func(data string, stat gozk.Stat) (string, os.Error) {
+	changeFunc := func(data string, stat *zk.Stat) (string, os.Error) {
 		switch data {
 		case "":
 			c.Assert(stat, IsNil)
-			_, err := zk.Create("/test", "conflict", gozk.EPHEMERAL,
-				gozk.WorldACL(gozk.PERM_ALL))
+			_, err := conn.Create("/test", "conflict", zk.EPHEMERAL,
+				zk.WorldACL(zk.PERM_ALL))
 			c.Assert(err, IsNil)
 			return "<none> => conflict", nil
 		case "conflict":
@@ -100,11 +98,10 @@
 		return "can't happen", nil
 	}
 
-	err := zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL),
-		changeFunc)
+	err := conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL), changeFunc)
 	c.Assert(err, IsNil)
 
-	data, stat, err := zk.Get("/test")
+	data, stat, err := conn.Get("/test")
 	c.Assert(err, IsNil)
 	c.Assert(data, Equals, "conflict => new")
 	c.Assert(stat, NotNil)
@@ -112,18 +109,17 @@
 }
 
 func (s *S) TestRetryChangeConflictOnSetDueToChange(c *C) {
-	zk, _ := s.init(c)
+	conn, _ := s.init(c)
 
-	_, err := zk.Create("/test", "old", gozk.EPHEMERAL,
-		gozk.WorldACL(gozk.PERM_ALL))
+	_, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
 	c.Assert(err, IsNil)
 
-	changeFunc := func(data string, stat gozk.Stat) (string, os.Error) {
+	changeFunc := func(data string, stat *zk.Stat) (string, os.Error) {
 		switch data {
 		case "old":
 			c.Assert(stat, NotNil)
 			c.Assert(stat.Version(), Equals, int32(0))
-			_, err := zk.Set("/test", "conflict", 0)
+			_, err := conn.Set("/test", "conflict", 0)
 			c.Assert(err, IsNil)
 			return "old => new", nil
 		case "conflict":
@@ -136,10 +132,10 @@
 		return "can't happen", nil
 	}
 
-	err = zk.RetryChange("/test", gozk.EPHEMERAL, []gozk.ACL{}, changeFunc)
+	err = conn.RetryChange("/test", zk.EPHEMERAL, []zk.ACL{}, changeFunc)
 	c.Assert(err, IsNil)
 
-	data, stat, err := zk.Get("/test")
+	data, stat, err := conn.Get("/test")
 	c.Assert(err, IsNil)
 	c.Assert(data, Equals, "conflict => new")
 	c.Assert(stat, NotNil)
@@ -147,18 +143,17 @@
 }
 
 func (s *S) TestRetryChangeConflictOnSetDueToDelete(c *C) {
-	zk, _ := s.init(c)
+	conn, _ := s.init(c)
 
-	_, err := zk.Create("/test", "old", gozk.EPHEMERAL,
-		gozk.WorldACL(gozk.PERM_ALL))
+	_, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
 	c.Assert(err, IsNil)
 
-	changeFunc := func(data string, stat gozk.Stat) (string, os.Error) {
+	changeFunc := func(data string, stat *zk.Stat) (string, os.Error) {
 		switch data {
 		case "old":
 			c.Assert(stat, NotNil)
 			c.Assert(stat.Version(), Equals, int32(0))
-			err := zk.Delete("/test", 0)
+			err := conn.Delete("/test", 0)
 			c.Assert(err, IsNil)
 			return "old => <deleted>", nil
 		case "":
@@ -170,55 +165,53 @@
 		return "can't happen", nil
 	}
 
-	err = zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_READ),
-		changeFunc)
+	err = conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_READ), changeFunc)
 	c.Assert(err, IsNil)
 
-	data, stat, err := zk.Get("/test")
+	data, stat, err := conn.Get("/test")
 	c.Assert(err, IsNil)
 	c.Assert(data, Equals, "<deleted> => new")
 	c.Assert(stat, NotNil)
 	c.Assert(stat.Version(), Equals, int32(0))
 
 	// Should be the new ACL.
-	acl, _, err := zk.ACL("/test")
+	acl, _, err := conn.ACL("/test")
 	c.Assert(err, IsNil)
-	c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_READ))
+	c.Assert(acl, Equals, zk.WorldACL(zk.PERM_READ))
 }
 
 func (s *S) TestRetryChangeErrorInCallback(c *C) {
-	zk, _ := s.init(c)
+	conn, _ := s.init(c)
 
-	err := zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL),
-		func(data string, stat gozk.Stat) (string, os.Error) {
+	err := conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL),
+		func(data string, stat *zk.Stat) (string, os.Error) {
 			return "don't use this", os.NewError("BOOM!")
 		})
 	c.Assert(err, NotNil)
-	c.Assert(err.Code(), Equals, gozk.ZSYSTEMERROR)
 	c.Assert(err.String(), Equals, "BOOM!")
 
-	stat, err := zk.Exists("/test")
+	stat, err := conn.Exists("/test")
 	c.Assert(err, IsNil)
 	c.Assert(stat, IsNil)
 }
 
 func (s *S) TestRetryChangeFailsReading(c *C) {
-	zk, _ := s.init(c)
+	conn, _ := s.init(c)
 
-	_, err := zk.Create("/test", "old", gozk.EPHEMERAL,
-		gozk.WorldACL(gozk.PERM_WRITE)) // Write only!
+	// Write only!
+	_, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_WRITE))
 	c.Assert(err, IsNil)
 
 	var called bool
-	err = zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL),
-		func(data string, stat gozk.Stat) (string, os.Error) {
+	err = conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL),
+		func(data string, stat *zk.Stat) (string, os.Error) {
 			called = true
 			return "", nil
 		})
 	c.Assert(err, NotNil)
-	c.Assert(err.Code(), Equals, gozk.ZNOAUTH)
+	c.Assert(err, Equals, zk.ZNOAUTH)
 
-	stat, err := zk.Exists("/test")
+	stat, err := conn.Exists("/test")
 	c.Assert(err, IsNil)
 	c.Assert(stat, NotNil)
 	c.Assert(stat.Version(), Equals, int32(0))
@@ -227,22 +220,21 @@
 }
 
 func (s *S) TestRetryChangeFailsSetting(c *C) {
-	zk, _ := s.init(c)
+	conn, _ := s.init(c)
 
-	_, err := zk.Create("/test", "old", gozk.EPHEMERAL,
-		gozk.WorldACL(gozk.PERM_READ)) // Read only!
+	// Read only!
+	_, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_READ))
 	c.Assert(err, IsNil)
 
 	var called bool
-	err = zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL),
-		func(data string, stat gozk.Stat) (string, os.Error) {
+	err = conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL),
+		func(data string, stat *zk.Stat) (string, os.Error) {
 			called = true
 			return "", nil
 		})
-	c.Assert(err, NotNil)
-	c.Assert(err.Code(), Equals, gozk.ZNOAUTH)
+	c.Assert(err, Equals, zk.ZNOAUTH)
 
-	stat, err := zk.Exists("/test")
+	stat, err := conn.Exists("/test")
 	c.Assert(err, IsNil)
 	c.Assert(stat, NotNil)
 	c.Assert(stat.Version(), Equals, int32(0))
@@ -251,23 +243,22 @@
 }
 
 func (s *S) TestRetryChangeFailsCreating(c *C) {
-	zk, _ := s.init(c)
+	conn, _ := s.init(c)
 
-	_, err := zk.Create("/test", "old", gozk.EPHEMERAL,
-		gozk.WorldACL(gozk.PERM_READ)) // Read only!
+	// Read only!
+	_, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_READ))
 	c.Assert(err, IsNil)
 
 	var called bool
-	err = zk.RetryChange("/test/sub", gozk.EPHEMERAL,
-		gozk.WorldACL(gozk.PERM_ALL),
-		func(data string, stat gozk.Stat) (string, os.Error) {
+	err = conn.RetryChange("/test/sub", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL),
+		func(data string, stat *zk.Stat) (string, os.Error) {
 			called = true
 			return "", nil
 		})
 	c.Assert(err, NotNil)
-	c.Assert(err.Code(), Equals, gozk.ZNOAUTH)
+	c.Assert(err, Equals, zk.ZNOAUTH)
 
-	stat, err := zk.Exists("/test/sub")
+	stat, err := conn.Exists("/test/sub")
 	c.Assert(err, IsNil)
 	c.Assert(stat, IsNil)
 
=== added file 'server.go'
--- server.go 1970-01-01 00:00:00 +0000
+++ server.go 2011-09-30 13:35:26 +0000
@@ -0,0 +1,239 @@
1package zk
2
3import (
4 "bufio"
5 "bytes"
6 "fmt"
7 "io/ioutil"
8 "net"
9 "os"
10 "path/filepath"
11 "strings"
12)
13
14// Server represents a ZooKeeper server, its data and configuration files.
15type Server struct {
16 runDir string
17 installDir string
18}
19
20func (srv *Server) Directory() string {
21 return srv.runDir
22}
23
24// CreateServer creates the directory runDir and sets up a ZooKeeper server
25// environment inside it. It is an error if runDir already exists.
26// The server will listen on the specified TCP port.
27//
28// The ZooKeeper installation directory is specified by installDir.
29// If this is empty, a system default will be used.
30//
31// CreateServer does not start the server - the *Server that
32// is returned can be used with the service package, for example:
33// see launchpad.net/gozk/zk/service.
34func CreateServer(port int, runDir, installDir string) (*Server, os.Error) {
35 if err := os.Mkdir(runDir, 0777); err != nil {
36 return nil, err
37 }
38 srv := &Server{runDir: runDir, installDir: installDir}
39 if err := srv.writeLog4JConfig(); err != nil {
40 return nil, err
41 }
42 if err := srv.writeZooKeeperConfig(port); err != nil {
43 return nil, err
44 }
45 if err := srv.writeInstallDir(); err != nil {
46 return nil, err
47 }
48 return srv, nil
49}
50
51// AttachServer creates a new ZooKeeper Server instance
52// to operate inside an existing run directory, runDir.
53// The directory must have been created with CreateServer.
54func AttachServer(runDir string) (*Server, os.Error) {
55 srv := &Server{runDir: runDir}
56 if err := srv.readInstallDir(); err != nil {
57 return nil, fmt.Errorf("cannot read server install directory: %v", err)
58 }
59 return srv, nil
60}
61
62func (srv *Server) path(name string) string {
63 return filepath.Join(srv.runDir, name)
64}
65
66func (srv *Server) CheckAvailability() os.Error {
67 port, err := srv.NetworkPort()
68 if err != nil {
69 return fmt.Errorf("cannot get network port: %v", err)
70 }
71 l, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port))
72 if err != nil {
73 return fmt.Errorf("cannot listen on port %v: %v", port, err)
74 }
75 l.Close()
76 return nil
77}
78
79// ServerCommand returns the command used to start the
80// ZooKeeper server. It is provided for debugging and testing
81// purposes only.
82func (srv *Server) Command() ([]string, os.Error) {
83 cp, err := srv.classPath()
84 if err != nil {
85 return nil, fmt.Errorf("cannot get class path: %v", err)
86 }
87 return []string{
88 "java",
89 "-cp", strings.Join(cp, ":"),
90 "-Dzookeeper.root.logger=INFO,CONSOLE",
91 "-Dlog4j.configuration=file:" + srv.path("log4j.properties"),
92 "org.apache.zookeeper.server.quorum.QuorumPeerMain",
93 srv.path("zoo.cfg"),
94 }, nil
95}
96
97var log4jProperties = `
98log4j.rootLogger=INFO, CONSOLE
99log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
100log4j.appender.CONSOLE.Threshold=INFO
101log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
102log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n
103`
104
105func (srv *Server) writeLog4JConfig() (err os.Error) {
106 return ioutil.WriteFile(srv.path("log4j.properties"), []byte(log4jProperties), 0666)
107}
108
109func (srv *Server) writeZooKeeperConfig(port int) (err os.Error) {
110 return ioutil.WriteFile(srv.path("zoo.cfg"), []byte(fmt.Sprintf(
111 "tickTime=2000\n"+
112 "dataDir=%s\n"+
113 "clientPort=%d\n"+
114 "maxClientCnxns=500\n",
115 srv.runDir, port)), 0666)
116}
117
118// NetworkPort returns the TCP port number that
119// the server is configured for.
120func (srv *Server) NetworkPort() (int, os.Error) {
121 f, err := os.Open(srv.path("zoo.cfg"))
122 if err != nil {
123 return 0, err
124 }
125 r := bufio.NewReader(f)
126 for {
127 line, err := r.ReadSlice('\n')
128 if err != nil {
129 return 0, fmt.Errorf("cannot get port from %q", srv.path("zoo.cfg"))
130 }
131 var port int
132 if n, _ := fmt.Sscanf(string(line), "clientPort=%d\n", &port); n == 1 {
133 return port, nil
134 }
135 }
136 panic("not reached")
137}
138
139func (srv *Server) writeInstallDir() os.Error {
140 return ioutil.WriteFile(srv.path("installdir.txt"), []byte(srv.installDir+"\n"), 0666)
141}
142
143func (srv *Server) readInstallDir() os.Error {
144 data, err := ioutil.ReadFile(srv.path("installdir.txt"))
145 if err != nil {
146 return err
147 }
148 if len(data) > 0 && data[len(data)-1] == '\n' {
149 data = data[0 : len(data)-1]
150 }
151 srv.installDir = string(data)
152 return nil
153}
154
155func (srv *Server) classPath() ([]string, os.Error) {
156 dir := srv.installDir
157 if dir == "" {
158 return systemClassPath()
159 }
160 if err := checkDirectory(dir); err != nil {
161 return nil, err
162 }
163 // Two possibilities, as seen in zkEnv.sh:
164 // 1) locally built binaries (jars are in build directory)
165 // 2) release binaries
166 if build := filepath.Join(dir, "build"); checkDirectory(build) == nil {
167 dir = build
168 }
169 classPath, err := filepath.Glob(filepath.Join(dir, "zookeeper-*.jar"))
170 if err != nil {
171 panic(fmt.Errorf("glob for jar files: %v", err))
172 }
173 more, err := filepath.Glob(filepath.Join(dir, "lib/*.jar"))
174 if err != nil {
175 panic(fmt.Errorf("glob for lib jar files: %v", err))
176 }
177
178 classPath = append(classPath, more...)
179 if len(classPath) == 0 {
180 return nil, fmt.Errorf("zookeeper libraries not found in %q", dir)
181 }
182 return classPath, nil
183}
184
185const zookeeperEnviron = "/etc/zookeeper/conf/environment"
186
187func systemClassPath() ([]string, os.Error) {
188 f, err := os.Open(zookeeperEnviron)
189 if err != nil {
190 return nil, err
191 }
192 r := bufio.NewReader(f)
193 for {
194 line, err := r.ReadSlice('\n')
195 if err != nil {
196 break
197 }
198 if !bytes.HasPrefix(line, []byte("CLASSPATH=")) {
199 continue
200 }
201
202 // remove variable and newline
203 path := string(line[len("CLASSPATH=") : len(line)-1])
204
205 // trim white space
206 path = strings.Trim(path, " \t\r")
207
208 // strip quotes
209 if len(path) > 1 && path[0] == '"' {
210 path = path[1 : len(path)-1]
211 }
212
213 // split on :
214 classPath := strings.Split(path, ":")
215
216 // split off $ZOOCFGDIR
217 if len(classPath) > 0 && classPath[0] == "$ZOOCFGDIR" {
218 classPath = classPath[1:]
219 }
220
221 if len(classPath) == 0 {
222 return nil, fmt.Errorf("empty class path in %q", zookeeperEnviron)
223 }
224 return classPath, nil
225 }
226 return nil, fmt.Errorf("no class path found in %q", zookeeperEnviron)
227}
228
229// checkDirectory returns an error if the given path
230// does not exist or is not a directory.
231func checkDirectory(path string) os.Error {
232 if info, err := os.Stat(path); err != nil || !info.IsDirectory() {
233 if err == nil {
234 err = &os.PathError{Op: "stat", Path: path, Error: os.NewError("is not a directory")}
235 }
236 return err
237 }
238 return nil
239}
0240
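
NetworkPort above recovers the port by scanning zoo.cfg for a clientPort line. A minimal sketch of the same idea in present-day Go follows (error instead of os.Error, bufio.Scanner instead of ReadSlice). The name parseClientPort and the choice to take the configuration as a string are assumptions made for illustration; they are not part of this branch.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// parseClientPort is a hypothetical helper: it scans zoo.cfg-style text
// line by line and returns the value of the first clientPort=N entry.
func parseClientPort(cfg string) (int, error) {
	sc := bufio.NewScanner(strings.NewReader(cfg))
	for sc.Scan() {
		var port int
		// Sscanf reports n == 1 only when the literal prefix matches
		// and an integer follows it; other lines are skipped.
		if n, _ := fmt.Sscanf(sc.Text(), "clientPort=%d", &port); n == 1 {
			return port, nil
		}
	}
	if err := sc.Err(); err != nil {
		return 0, fmt.Errorf("cannot read configuration: %w", err)
	}
	return 0, fmt.Errorf("no clientPort entry found")
}
```

Unlike the loop in NetworkPort, this version distinguishes a read failure from a missing entry, which may or may not matter to callers.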
=== added directory 'service'
=== added file 'service/Makefile'
--- service/Makefile 1970-01-01 00:00:00 +0000
+++ service/Makefile 2011-09-30 13:35:26 +0000
@@ -0,0 +1,20 @@
1include $(GOROOT)/src/Make.inc
2
3TARG=launchpad.net/gozk/zk/service
4
5GOFILES=\
6 service.go\
7
8GOFMT=gofmt
9BADFMT:=$(shell $(GOFMT) -l $(GOFILES) $(CGOFILES) $(wildcard *_test.go))
10
11gofmt: $(BADFMT)
12 @for F in $(BADFMT); do $(GOFMT) -w $$F && echo $$F; done
13
14ifneq ($(BADFMT),)
15ifneq ($(MAKECMDGOALS),gofmt)
16$(warning WARNING: make gofmt: $(BADFMT))
17endif
18endif
19
20include $(GOROOT)/src/Make.pkg
021
=== added file 'service/service.go'
--- service/service.go 1970-01-01 00:00:00 +0000
+++ service/service.go 2011-09-30 13:35:26 +0000
@@ -0,0 +1,160 @@
1// The service package provides support for long-running services.
2package service
3
4import (
5 "os"
6 "fmt"
7 "io/ioutil"
8 "strconv"
9 "path/filepath"
10 "exec"
11 "strings"
12 "time"
13)
14
15// Interface represents a single-process server.
16type Interface interface {
17 // Directory gives the path to the directory containing
18 // the server's configuration and data files. It is assumed that
19 // this exists and can be modified.
20 Directory() string
21
22 // CheckAvailability should return an error if resources
23 // required by the server are not available; for instance
24 // if the server requires a network port which is not free.
25 CheckAvailability() os.Error
26
27 // Command should return a command that can be
28 // used to run the server. The first element of the returned
29 // slice is the executable name; the rest of the elements are
30 // its arguments.
31 Command() ([]string, os.Error)
32}
33
34// Service represents a possibly running server process.
35type Service struct {
36 srv Interface
37}
38
39// New returns a new Service using the given
40// server interface.
41func New(srv Interface) *Service {
42 return &Service{srv}
43}
44
45// Process returns a reference to the identity
46// of the last running server in the Service's directory.
47// If the server was shut down, it returns (nil, nil).
48// The server process may not still be running
49// (for instance if it was terminated abnormally).
50// This function is provided for debugging and testing
51// purposes only.
52func (s *Service) Process() (*os.Process, os.Error) {
53 data, err := ioutil.ReadFile(s.path("pid.txt"))
54 if err != nil {
55 if err, ok := err.(*os.PathError); ok && err.Error == os.ENOENT {
56 return nil, nil
57 }
58 return nil, err
59 }
60 pid, err := strconv.Atoi(string(data))
61 if err != nil {
62 return nil, os.NewError("bad process id found in pid.txt")
63 }
64 return os.FindProcess(pid)
65}
66
67func (s *Service) path(name string) string {
68 return filepath.Join(s.srv.Directory(), name)
69}
70
71// Start starts the server. It returns an error if the server is already running.
72// It stores the process ID of the server in the file "pid.txt"
73// inside the server's directory. Stdout and stderr of the server
74// are similarly written to "log.txt".
75func (s *Service) Start() os.Error {
76 if err := s.srv.CheckAvailability(); err != nil {
77 return err
78 }
79 p, err := s.Process()
80 if p != nil || err != nil {
81 if p != nil {
82 p.Release()
83 }
84 return fmt.Errorf("server may already be running (remove %q to clear)", s.path("pid.txt"))
85 }
86
87 // Create the pid file before starting the process so that if two
88 // programs try to start a server in the same directory concurrently,
89 // only one of them succeeds.
90 pidf, err := os.OpenFile(s.path("pid.txt"), os.O_EXCL|os.O_CREATE|os.O_WRONLY, 0666)
91 if err != nil {
92 return fmt.Errorf("cannot create pid.txt: %v", err)
93 }
94 defer pidf.Close()
95
96 args, err := s.srv.Command()
97 if err != nil {
98 return fmt.Errorf("cannot determine command: %v", err)
99 }
100 cmd := exec.Command(args[0], args[1:]...)
101
102 logf, err := os.OpenFile(s.path("log.txt"), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666)
103 if err != nil {
104 return fmt.Errorf("cannot create log file: %v", err)
105 }
106 defer logf.Close()
107 cmd.Stdout = logf
108 cmd.Stderr = logf
109 if err := cmd.Start(); err != nil {
110 return fmt.Errorf("cannot start server: %v", err)
111 }
112 if _, err := fmt.Fprint(pidf, cmd.Process.Pid); err != nil {
113 return fmt.Errorf("cannot write pid file: %v", err)
114 }
115 return nil
116}
117
118// Stop kills the server. It is a no-op if the server is not running.
119func (s *Service) Stop() os.Error {
120 p, err := s.Process()
121 if p == nil {
122 if err != nil {
123 return fmt.Errorf("cannot read process ID of server: %v", err)
124 }
125 return nil
126 }
127 defer p.Release()
128 if err := p.Kill(); err != nil && !strings.Contains(err.String(), "no such process") {
129 return fmt.Errorf("cannot kill server process: %v", err)
130 }
131 // ignore the error returned from Wait because there's little
132 // we can do about it - it either means that the process has just exited
133 // anyway or that we can't wait for it for some other reason,
134 // for example because it was originally started by some other process.
135 _, err = p.Wait(0)
136 if err != nil && strings.Contains(err.String(), "no child processes") {
137 // If we can't wait for the server, it's possible that it was running
138 // but not as a child of this process, so give it a little while
139 // to exit. TODO poll with kill(no signal)?
140 time.Sleep(0.5e9)
141 }
142
143 if err := os.Remove(s.path("pid.txt")); err != nil {
144 return fmt.Errorf("cannot remove server process ID file: %v", err)
145 }
146 return nil
147}
148
149// Destroy stops the server, and then removes its
150// directory and all its contents.
151// Warning: this will destroy all data associated with the server.
152func (s *Service) Destroy() os.Error {
153 if err := s.Stop(); err != nil {
154 return err
155 }
156 if err := os.RemoveAll(s.srv.Directory()); err != nil {
157 return err
158 }
159 return nil
160}
0161
=== added file 'status.go'
--- status.go 1970-01-01 00:00:00 +0000
+++ status.go 2011-09-30 13:35:26 +0000
@@ -0,0 +1,116 @@
1package zk
2
3import (
4 "fmt"
5)
6
7// event types, as defined by libzookeeper
8const (
9 _ = iota
10 event_CREATED
11 event_DELETED
12 event_CHANGED
13 event_CHILD
14 event_SESSION = -1
15 event_NOTWATCHING = -2
16)
17
18// The statusChan interface represents an event channel.
19// It translates from libzookeeper events to Go channel sends
20// of the appropriate type.
21type statusChan interface {
22 // close closes the wait channel.
23 close()
24
25 // send sends an event with the given type and connection
26 // state to the wait channel. It should do a non-blocking
27 // send and return false if the send would have blocked.
28 send(eventType, connectionState int, path string) bool
29}
30
31// SessionStatus represents the status of a ZooKeeper session.
32type SessionStatus int
33
34// Constants that represent the status of a ZooKeeper
35// session.
36const (
37 _ SessionStatus = iota
38 SessionConnecting
39 SessionAssociating
40 SessionConnected
41 SessionExpired SessionStatus = -112
42 SessionAuthFailed SessionStatus = -113
43)
44
45type sessionStatusChan chan SessionStatus
46
47func (c sessionStatusChan) close() {
48 close(c)
49}
50
51func (c sessionStatusChan) send(eventType, eventState int, path string) bool {
52 if eventType != event_SESSION {
53 panic(fmt.Errorf("unexpected event, type %v; status %v", eventType, eventState))
54 }
55 if path != "" {
56 panic(fmt.Errorf("non-empty path received on session event"))
57 }
58 select {
59 case c <- SessionStatus(eventState):
60 default:
61 return false
62 }
63 return true
64}
65
66// NodeStatus represents the status of a node after ExistsW,
67// GetW, or ChildrenW has been called on the node.
68type NodeStatus int
69
70// Constants that represent changes to a ZooKeeper node.
71const (
72 _ NodeStatus = iota
73 NodeDeleted // The node has been deleted.
74 NodeCreated // The node has been created.
75 NodeChanged // The node data has changed.
76 NodeChangedChild // The number of children of the node has changed.
77)
78
79type nodeStatusChan struct {
80 c chan NodeStatus
81 path string // stored so we can check that the server is behaving.
82}
83
84func newNodeStatusChan(path string) *nodeStatusChan {
85 return &nodeStatusChan{path: path, c: make(chan NodeStatus, 1)}
86}
87
88func (c *nodeStatusChan) close() {
89 close(c.c)
90}
91
92func (c *nodeStatusChan) send(eventType, eventState int, path string) bool {
93 if path != c.path {
94 panic(fmt.Errorf("unexpected path %q received on node change event (type %v, status %v)",
95 path, eventType, eventState))
96 }
97 var val NodeStatus
98 switch eventType {
99 case event_CREATED:
100 val = NodeCreated
101 case event_DELETED:
102 val = NodeDeleted
103 case event_CHANGED:
104 val = NodeChanged
105 case event_CHILD:
106 val = NodeChangedChild
107 default:
108 panic(fmt.Errorf("unexpected event, type %v; status %v", eventType, eventState))
109 }
110 select {
111 case c.c <- val:
112 default:
113 return false
114 }
115 return true
116}
0117
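
The send methods in status.go use a non-blocking send, and watch channels are one-shot: one value, then close, or close with no value if the server goes away. The following sketch shows how the two halves fit together. It declares its own NodeStatus type so that it stands alone, and the helper names trySend and await are assumptions made for illustration only.

```go
package main

// NodeStatus mirrors the type declared in status.go so that this sketch
// compiles on its own.
type NodeStatus int

const (
	_ NodeStatus = iota
	NodeDeleted
	NodeCreated
	NodeChanged
	NodeChangedChild
)

// trySend (hypothetical name) performs a non-blocking send and reports
// whether the value was delivered.
func trySend(c chan NodeStatus, v NodeStatus) bool {
	select {
	case c <- v:
		return true
	default:
		return false
	}
}

// await (hypothetical name) receives from a one-shot watch channel.
// ok is false, and status is zero, when the channel was closed without
// a value being sent.
func await(c <-chan NodeStatus) (status NodeStatus, ok bool) {
	status, ok = <-c
	return status, ok
}
```

Because zero is deliberately left unused by the NodeStatus constants, a client that ignores ok can still detect the closed case by comparing the received value against zero.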
=== modified file 'suite_test.go'
--- suite_test.go 2011-08-19 01:43:37 +0000
+++ suite_test.go 2011-09-30 13:35:26 +0000
@@ -1,94 +1,72 @@
1package gozk_test1package zk_test
22
3import (3import (
4 . "launchpad.net/gocheck"4 . "launchpad.net/gocheck"
5 "testing"
6 "io/ioutil"
7 "path"
8 "fmt"5 "fmt"
6 "launchpad.net/gozk/zk/service"
7 "launchpad.net/gozk/zk"
9 "os"8 "os"
10 "gozk"9 "testing"
11 "time"10 "time"
12)11)
1312
14func TestAll(t *testing.T) {13func TestAll(t *testing.T) {
15 TestingT(t)14 if !*reattach {
15 TestingT(t)
16 }
16}17}
1718
18var _ = Suite(&S{})19var _ = Suite(&S{})
1920
20type S struct {21type S struct {
21 zkRoot string22 zkServer *service.Service
22 zkTestRoot string23 zkTestRoot string
23 zkTestPort int24 zkAddr string
24 zkServerSh string
25 zkServerOut *os.File
26 zkAddr string
2725
28 handles []*gozk.ZooKeeper26 handles []*zk.Conn
29 events []*gozk.Event27 events []zk.SessionStatus
30 liveWatches int28 liveWatches int
31 deadWatches chan bool29 deadWatches chan bool
32}30}
3331
34var logLevel = 0 //gozk.LOG_ERROR32var logLevel = 0 // zk.LOG_ERROR
3533
3634func (s *S) init(c *C) (*zk.Conn, <-chan zk.SessionStatus) {
37var testZooCfg = ("dataDir=%s\n" +35 conn, watch, err := zk.Dial(s.zkAddr, 5e9)
38 "clientPort=%d\n" +
39 "tickTime=2000\n" +
40 "initLimit=10\n" +
41 "syncLimit=5\n" +
42 "")
43
44var testLog4jPrp = ("log4j.rootLogger=INFO,CONSOLE\n" +
45 "log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender\n" +
46 "log4j.appender.CONSOLE.Threshold=DEBUG\n" +
47 "log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout\n" +
48 "log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n" +
49 "")
50
51func (s *S) init(c *C) (*gozk.ZooKeeper, chan gozk.Event) {
52 zk, watch, err := gozk.Init(s.zkAddr, 5e9)
53 c.Assert(err, IsNil)36 c.Assert(err, IsNil)
5437
55 s.handles = append(s.handles, zk)38 s.handles = append(s.handles, conn)
5639
57 event := <-watch40 status := <-watch
5841 c.Assert(status == zk.SessionConnected, Equals, true)
59 c.Assert(event.Type, Equals, gozk.EVENT_SESSION)42
60 c.Assert(event.State, Equals, gozk.STATE_CONNECTED)43 bufferedWatch := make(chan zk.SessionStatus, 256)
6144 bufferedWatch <- status
62 bufferedWatch := make(chan gozk.Event, 256)
63 bufferedWatch <- event
6445
65 s.liveWatches += 146 s.liveWatches += 1
66 go func() {47 go func() {
67 loop:
68 for {48 for {
49 status, ok := <-watch
50 if !ok {
51 break
52 }
69 select {53 select {
70 case event, ok := <-watch:54 case bufferedWatch <- status:
71 if !ok {55 default:
72 close(bufferedWatch)56 panic("Too many events in buffered watch!")
73 break loop
74 }
75 select {
76 case bufferedWatch <- event:
77 default:
78 panic("Too many events in buffered watch!")
79 }
80 }57 }
81 }58 }
59 close(bufferedWatch)
82 s.deadWatches <- true60 s.deadWatches <- true
83 }()61 }()
8462
85 return zk, bufferedWatch63 return conn, bufferedWatch
86}64}
8765
88func (s *S) SetUpTest(c *C) {66func (s *S) SetUpTest(c *C) {
89 c.Assert(gozk.CountPendingWatches(), Equals, 0,67 c.Assert(zk.CountPendingWatches(), Equals, 0,
90 Bug("Test got a dirty watch state before running!"))68 Bug("Test got a dirty watch state before running!"))
91 gozk.SetLogLevel(logLevel)69 zk.SetLogLevel(logLevel)
92}70}
9371
94func (s *S) TearDownTest(c *C) {72func (s *S) TearDownTest(c *C) {
@@ -108,98 +86,38 @@
108 }86 }
10987
110 // Reset the list of handles.88 // Reset the list of handles.
111 s.handles = make([]*gozk.ZooKeeper, 0)89 s.handles = make([]*zk.Conn, 0)
11290
113 c.Assert(gozk.CountPendingWatches(), Equals, 0,91 c.Assert(zk.CountPendingWatches(), Equals, 0,
114 Bug("Test left live watches behind!"))92 Bug("Test left live watches behind!"))
115}93}
11694
117// We use the suite set up and tear down to manage a custom zookeeper95// We use the suite set up and tear down to manage a custom ZooKeeper
118//96//
119func (s *S) SetUpSuite(c *C) {97func (s *S) SetUpSuite(c *C) {
12098 var err os.Error
121 s.deadWatches = make(chan bool)99 s.deadWatches = make(chan bool)
122100
123 var err os.Error101 // N.B. We need to create a subdirectory because zk.CreateServer
124102 // insists on creating its own directory.
125 s.zkRoot = os.Getenv("ZKROOT")103 s.zkTestRoot = c.MkDir() + "/zk"
126 if s.zkRoot == "" {104
127 panic("You must define $ZKROOT ($ZKROOT/bin/zkServer.sh is needed)")105 port := 21812
128 }106 s.zkAddr = fmt.Sprint("localhost:", port)
129107
130 s.zkTestRoot = c.MkDir()108 srv, err := zk.CreateServer(port, s.zkTestRoot, "")
131 s.zkTestPort = 21812109 if err != nil {
132110 c.Fatal("Cannot set up server environment: ", err)
133 println("ZooKeeper test server directory:", s.zkTestRoot)111 }
134 println("ZooKeeper test server port:", s.zkTestPort)112 s.zkServer = service.New(srv)
135113 err = s.zkServer.Start()
136 s.zkAddr = fmt.Sprintf("localhost:%d", s.zkTestPort)114 if err != nil {
137115 c.Fatal("Cannot start ZooKeeper server: ", err)
138 s.zkServerSh = path.Join(s.zkRoot, "bin/zkServer.sh")116 }
139 s.zkServerOut, err = os.OpenFile(path.Join(s.zkTestRoot, "stdout.txt"),
140 os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
141 if err != nil {
142 panic("Can't open stdout.txt file for server: " + err.String())
143 }
144
145 dataDir := path.Join(s.zkTestRoot, "data")
146 confDir := path.Join(s.zkTestRoot, "conf")
147
148 os.Mkdir(dataDir, 0755)
149 os.Mkdir(confDir, 0755)
150
151 err = os.Setenv("ZOOCFGDIR", confDir)
152 if err != nil {
153 panic("Can't set $ZOOCFGDIR: " + err.String())
154 }
155
156 zooCfg := []byte(fmt.Sprintf(testZooCfg, dataDir, s.zkTestPort))
157 err = ioutil.WriteFile(path.Join(confDir, "zoo.cfg"), zooCfg, 0644)
158 if err != nil {
159 panic("Can't write zoo.cfg: " + err.String())
160 }
161
162 log4jPrp := []byte(testLog4jPrp)
163 err = ioutil.WriteFile(path.Join(confDir, "log4j.properties"), log4jPrp, 0644)
164 if err != nil {
165 panic("Can't write log4j.properties: " + err.String())
166 }
167
168 s.StartZK()
169}117}
170118
171func (s *S) TearDownSuite(c *C) {119func (s *S) TearDownSuite(c *C) {
172 s.StopZK()120 if s.zkServer != nil {
173 s.zkServerOut.Close()121 s.zkServer.Stop() // TODO Destroy
174}
175
176func (s *S) StartZK() {
177 attr := os.ProcAttr{Files: []*os.File{os.Stdin, s.zkServerOut, os.Stderr}}
178 proc, err := os.StartProcess(s.zkServerSh, []string{s.zkServerSh, "start"}, &attr)
179 if err != nil {
180 panic("Problem executing zkServer.sh start: " + err.String())
181 }
182
183 result, err := proc.Wait(0)
184 if err != nil {
185 panic(err.String())
186 } else if result.ExitStatus() != 0 {
187 panic("'zkServer.sh start' exited with non-zero status")
188 }
189}
190
191func (s *S) StopZK() {
192 attr := os.ProcAttr{Files: []*os.File{os.Stdin, s.zkServerOut, os.Stderr}}
193 proc, err := os.StartProcess(s.zkServerSh, []string{s.zkServerSh, "stop"}, &attr)
194 if err != nil {
195 panic("Problem executing zkServer.sh stop: " + err.String() +
196 " (look for runaway java processes!)")
197 }
198 result, err := proc.Wait(0)
199 if err != nil {
200 panic(err.String())
201 } else if result.ExitStatus() != 0 {
202 panic("'zkServer.sh stop' exited with non-zero status " +
203 "(look for runaway java processes!)")
204 }122 }
205}123}
206124
=== renamed file 'gozk.go' => 'zk.go'
--- gozk.go 2011-08-19 01:56:39 +0000
+++ zk.go 2011-09-30 13:35:26 +0000
@@ -1,12 +1,26 @@
1// gozk - Zookeeper support for the Go language1// gozk - ZooKeeper support for the Go language
2//2//
3// https://wiki.ubuntu.com/gozk3// https://wiki.ubuntu.com/gozk
4//4//
5// Copyright (c) 2010-2011 Canonical Ltd.5// Copyright (c) 2010-2011 Canonical Ltd.
6//
7// Written by Gustavo Niemeyer <gustavo.niemeyer@canonical.com>6// Written by Gustavo Niemeyer <gustavo.niemeyer@canonical.com>
8//7//
9package gozk8// A note about channels as used for change notification in gozk.
9//
10// Event channels are used to provide notifications of changes to ZooKeeper
11// nodes, from GetW, ExistsW, etc. These are invariably one-shot - a single
12// value will be sent on the wait channel to signify the change and then the
13// channel will be closed. If the ZooKeeper server goes down when awaiting
14// a change, the channel will be closed without sending a value. Clients
15// can either check explicitly for the closed channel with:
16//
17// if v, ok := <-wait; !ok {
18// }
19//
20// or just test the received value against zero, which
21// signifies no value for each of the notification types.
22//
23package zk
1024
11/*25/*
12#cgo CFLAGS: -I/usr/include/c-client-src26#cgo CFLAGS: -I/usr/include/c-client-src
@@ -27,17 +41,16 @@
27// -----------------------------------------------------------------------41// -----------------------------------------------------------------------
28// Main constants and data types.42// Main constants and data types.
2943
30// The main ZooKeeper object, created through the Init function.44// Conn represents a connection to a set of ZooKeeper nodes.
31// Encapsulates all communication with ZooKeeper.45type Conn struct {
32type ZooKeeper struct {46 watchChannels map[uintptr]statusChan
33 watchChannels map[uintptr]chan Event
34 sessionWatchId uintptr47 sessionWatchId uintptr
35 handle *C.zhandle_t48 handle *C.zhandle_t
36 mutex sync.Mutex49 mutex sync.Mutex
37}50}
3851
39// ClientId represents the established session in ZooKeeper. This is only52// ClientId represents an established ZooKeeper session. It can be
40// useful to be passed back into the ReInit function.53// passed into Redial to reestablish a connection to an existing session.
41type ClientId struct {54type ClientId struct {
42 cId C.clientid_t55 cId C.clientid_t
43}56}
@@ -51,84 +64,63 @@
51 Id string64 Id string
52}65}
5366
54// Event channels are used to provide notifications of changes in the67// Error represents a ZooKeeper error.
55// ZooKeeper connection state and in specific node aspects.68type Error int
56//
57// There are two sources of events: the session channel obtained during
58// initialization with Init, and any watch channels obtained
59// through one of the W-suffixed functions (GetW, ExistsW, etc).
60//
61// The session channel will only receive session-level events notifying
62// about critical and transient changes in the ZooKeeper connection
63// state (STATE_CONNECTED, STATE_EXPIRED_SESSION, etc). On long
64// running applications the session channel must *necessarily* be
65// observed since certain events like session expirations require an
66// explicit reconnection and reestablishment of state (or bailing out).
67// Because of that, the buffer used on the session channel has a limited
68// size, and a panic will occur if too many events are not collected.
69//
70// Watch channels enable monitoring state for nodes, and the
71// moment they're fired depends on which function was called to
72// create them. Note that, unlike in other ZooKeeper interfaces,
73// gozk will NOT dispatch unimportant session events such as
74// STATE_ASSOCIATING, STATE_CONNECTING and STATE_CONNECTED to
75// watch Event channels, since they are transient and disruptive
76// to the workflow. Critical state changes such as expirations
77// are still delivered to all event channels, though, and the
78// transient events may be observed in the session channel.
79//
80// Since every watch channel may receive critical session events, events
81// received must not be handled blindly as if the watch requested has
82// been fired. To facilitate such tests, Events offer the Ok method,
83// and they also have a good String method so they may be used as an
84// os.Error value if wanted. E.g.:
85//
86// event := <-watch
87// if !event.Ok() {
88// err = event
89// return
90// }
91//
92// Note that closed channels will deliver zeroed Event, which means
93// event.Type is set to EVENT_CLOSED and event.State is set to STATE_CLOSED,
94// to facilitate handling.
95type Event struct {
96 Type int
97 Path string
98 State int
99}
10069
101// Error codes that may be used to verify the result of the
102// Code method from Error.
103const (70const (
104 ZOK = C.ZOK71 ZOK Error = C.ZOK
105 ZSYSTEMERROR = C.ZSYSTEMERROR72 ZSYSTEMERROR Error = C.ZSYSTEMERROR
106 ZRUNTIMEINCONSISTENCY = C.ZRUNTIMEINCONSISTENCY73 ZRUNTIMEINCONSISTENCY Error = C.ZRUNTIMEINCONSISTENCY
107 ZDATAINCONSISTENCY = C.ZDATAINCONSISTENCY74 ZDATAINCONSISTENCY Error = C.ZDATAINCONSISTENCY
108 ZCONNECTIONLOSS = C.ZCONNECTIONLOSS75 ZCONNECTIONLOSS Error = C.ZCONNECTIONLOSS
109 ZMARSHALLINGERROR = C.ZMARSHALLINGERROR76 ZMARSHALLINGERROR Error = C.ZMARSHALLINGERROR
110 ZUNIMPLEMENTED = C.ZUNIMPLEMENTED77 ZUNIMPLEMENTED Error = C.ZUNIMPLEMENTED
111 ZOPERATIONTIMEOUT = C.ZOPERATIONTIMEOUT78 ZOPERATIONTIMEOUT Error = C.ZOPERATIONTIMEOUT
112 ZBADARGUMENTS = C.ZBADARGUMENTS79 ZBADARGUMENTS Error = C.ZBADARGUMENTS
113 ZINVALIDSTATE = C.ZINVALIDSTATE80 ZINVALIDSTATE Error = C.ZINVALIDSTATE
114 ZAPIERROR = C.ZAPIERROR81 ZAPIERROR Error = C.ZAPIERROR
115 ZNONODE = C.ZNONODE82 ZNONODE Error = C.ZNONODE
116 ZNOAUTH = C.ZNOAUTH83 ZNOAUTH Error = C.ZNOAUTH
117 ZBADVERSION = C.ZBADVERSION84 ZBADVERSION Error = C.ZBADVERSION
118 ZNOCHILDRENFOREPHEMERALS = C.ZNOCHILDRENFOREPHEMERALS85 ZNOCHILDRENFOREPHEMERALS Error = C.ZNOCHILDRENFOREPHEMERALS
119 ZNODEEXISTS = C.ZNODEEXISTS86 ZNODEEXISTS Error = C.ZNODEEXISTS
120 ZNOTEMPTY = C.ZNOTEMPTY87 ZNOTEMPTY Error = C.ZNOTEMPTY
121 ZSESSIONEXPIRED = C.ZSESSIONEXPIRED88 ZSESSIONEXPIRED Error = C.ZSESSIONEXPIRED
122 ZINVALIDCALLBACK = C.ZINVALIDCALLBACK89 ZINVALIDCALLBACK Error = C.ZINVALIDCALLBACK
123 ZINVALIDACL = C.ZINVALIDACL90 ZINVALIDACL Error = C.ZINVALIDACL
124 ZAUTHFAILED = C.ZAUTHFAILED91 ZAUTHFAILED Error = C.ZAUTHFAILED
125 ZCLOSING = C.ZCLOSING92 ZCLOSING Error = C.ZCLOSING
126 ZNOTHING = C.ZNOTHING93 ZNOTHING Error = C.ZNOTHING
127 ZSESSIONMOVED = C.ZSESSIONMOVED94 ZSESSIONMOVED Error = C.ZSESSIONMOVED
128)95)
12996
97func (error Error) String() string {
98 return C.GoString(C.zerror(C.int(error))) // Static, no need to free it.
99}
100
101// zkError creates an appropriate error return from
102// a ZooKeeper status and the errno return from a C API
103// call.
104func zkError(rc C.int, cerr os.Error) os.Error {
105 code := Error(rc)
106 switch code {
107 case ZOK:
108 return nil
109
110 case ZSYSTEMERROR:
111 // If a ZooKeeper call returns ZSYSTEMERROR, then
112 // errno becomes significant. If errno has not been
113 // set, then we will return ZSYSTEMERROR nonetheless.
114 if cerr != nil {
115 return cerr
116 }
117 }
118 return code
119}
120
130// Constants for SetLogLevel.121// Constants for SetLogLevel.
131const (122const (
123 LOG_NONE = 0
132 LOG_ERROR = C.ZOO_LOG_LEVEL_ERROR124 LOG_ERROR = C.ZOO_LOG_LEVEL_ERROR
133 LOG_WARN = C.ZOO_LOG_LEVEL_WARN125 LOG_WARN = C.ZOO_LOG_LEVEL_WARN
134 LOG_INFO = C.ZOO_LOG_LEVEL_INFO126 LOG_INFO = C.ZOO_LOG_LEVEL_INFO
@@ -155,33 +147,6 @@
155 PERM_ALL = 0x1f147 PERM_ALL = 0x1f
156)148)
157149
158// Constants for Event Type.
159const (
160 EVENT_CREATED = iota + 1
161 EVENT_DELETED
162 EVENT_CHANGED
163 EVENT_CHILD
164 EVENT_SESSION = -1
165 EVENT_NOTWATCHING = -2
166
167 // Doesn't really exist in zk, but handy for use in zeroed Event
168 // values (e.g. closed channels).
169 EVENT_CLOSED = 0
170)
171
172// Constants for Event State.
173const (
174 STATE_EXPIRED_SESSION = -112
175 STATE_AUTH_FAILED = -113
176 STATE_CONNECTING = 1
177 STATE_ASSOCIATING = 2
178 STATE_CONNECTED = 3
179
180 // Doesn't really exist in zk, but handy for use in zeroed Event
181 // values (e.g. closed channels).
182 STATE_CLOSED = 0
183)
184
185func init() {150func init() {
186 if EPHEMERAL != C.ZOO_EPHEMERAL ||151 if EPHEMERAL != C.ZOO_EPHEMERAL ||
187 SEQUENCE != C.ZOO_SEQUENCE ||152 SEQUENCE != C.ZOO_SEQUENCE ||
@@ -191,17 +156,17 @@
191 PERM_DELETE != C.ZOO_PERM_DELETE ||156 PERM_DELETE != C.ZOO_PERM_DELETE ||
192 PERM_ADMIN != C.ZOO_PERM_ADMIN ||157 PERM_ADMIN != C.ZOO_PERM_ADMIN ||
193 PERM_ALL != C.ZOO_PERM_ALL ||158 PERM_ALL != C.ZOO_PERM_ALL ||
194 EVENT_CREATED != C.ZOO_CREATED_EVENT ||159 event_CREATED != C.ZOO_CREATED_EVENT ||
195 EVENT_DELETED != C.ZOO_DELETED_EVENT ||160 event_DELETED != C.ZOO_DELETED_EVENT ||
196 EVENT_CHANGED != C.ZOO_CHANGED_EVENT ||161 event_CHANGED != C.ZOO_CHANGED_EVENT ||
197 EVENT_CHILD != C.ZOO_CHILD_EVENT ||162 event_CHILD != C.ZOO_CHILD_EVENT ||
198 EVENT_SESSION != C.ZOO_SESSION_EVENT ||163 event_SESSION != C.ZOO_SESSION_EVENT ||
199 EVENT_NOTWATCHING != C.ZOO_NOTWATCHING_EVENT ||164 event_NOTWATCHING != C.ZOO_NOTWATCHING_EVENT ||
200 STATE_EXPIRED_SESSION != C.ZOO_EXPIRED_SESSION_STATE ||165 SessionExpired != SessionStatus(C.ZOO_EXPIRED_SESSION_STATE) ||
201 STATE_AUTH_FAILED != C.ZOO_AUTH_FAILED_STATE ||166 SessionAuthFailed != SessionStatus(C.ZOO_AUTH_FAILED_STATE) ||
202 STATE_CONNECTING != C.ZOO_CONNECTING_STATE ||167 SessionConnecting != SessionStatus(C.ZOO_CONNECTING_STATE) ||
203 STATE_ASSOCIATING != C.ZOO_ASSOCIATING_STATE ||168 SessionAssociating != SessionStatus(C.ZOO_ASSOCIATING_STATE) ||
204 STATE_CONNECTED != C.ZOO_CONNECTED_STATE {169 SessionConnected != SessionStatus(C.ZOO_CONNECTED_STATE) {
205170
206 panic("OOPS: Constants don't match C counterparts")171 panic("OOPS: Constants don't match C counterparts")
207 }172 }
@@ -222,155 +187,97 @@
222}187}
223188
224// -----------------------------------------------------------------------189// -----------------------------------------------------------------------
225// Event methods.190// Session status methods.
226191
227// Ok returns true in case the event reports zk as being in a usable state.192// Ok returns true in case the event reports zk as being in a usable state.
228func (e Event) Ok() bool {193func (status SessionStatus) Ok() bool {
229 // That's really it for now. Anything else seems to mean zk194 // That's really it for now. Anything else seems to mean zk
230 // can't be used at the moment.195 // can't be used at the moment.
231 return e.State == STATE_CONNECTED196 return status == SessionConnected
232}197}
233198
234func (e Event) String() (s string) {199func (status SessionStatus) String() (s string) {
235 switch e.State {200 switch status {
236 case STATE_EXPIRED_SESSION:201 case 0:
202 s = "ZooKeeper connection closed"
203 case SessionExpired:
237 s = "ZooKeeper session expired"204 s = "ZooKeeper session expired"
238 case STATE_AUTH_FAILED:205 case SessionAuthFailed:
239 s = "ZooKeeper authentication failed"206 s = "ZooKeeper authentication failed"
240 case STATE_CONNECTING:207 case SessionConnecting:
241 s = "ZooKeeper connecting"208 s = "ZooKeeper connecting"
242 case STATE_ASSOCIATING:209 case SessionAssociating:
243 s = "ZooKeeper still associating"210 s = "ZooKeeper still associating"
244 case STATE_CONNECTED:211 case SessionConnected:
245 s = "ZooKeeper connected"212 s = "ZooKeeper connected"
246 case STATE_CLOSED:
247 s = "ZooKeeper connection closed"
248 default:213 default:
249 s = fmt.Sprintf("unknown ZooKeeper state %d", e.State)214 s = fmt.Sprintf("unknown ZooKeeper state %d", status)
250 }215 }
251 if e.Type == -1 || e.Type == EVENT_SESSION {216 return s
252 return217}
253 }218
254 if s != "" {219// -----------------------------------------------------------------------
255 s += "; "
256 }
257 switch e.Type {
258 case EVENT_CREATED:
259 s += "path created: "
260 case EVENT_DELETED:
261 s += "path deleted: "
262 case EVENT_CHANGED:
263 s += "path changed: "
264 case EVENT_CHILD:
265 s += "path children changed: "
266 case EVENT_NOTWATCHING:
267 s += "not watching: " // !?
268 case EVENT_SESSION:
269 // nothing
270 }
271 s += e.Path
272 return
273}
274
275// -----------------------------------------------------------------------
276// Error interface which maps onto the ZooKeeper error codes.
277
278type Error interface {
279 String() string
280 Code() int
281}
282
283type errorType struct {
284 zkrc C.int
285 err os.Error
286}
287
288func newError(zkrc C.int, err os.Error) Error {
289 return &errorType{zkrc, err}
290}
291
292func (error *errorType) String() (result string) {
293 if error.zkrc == ZSYSTEMERROR && error.err != nil {
294 result = error.err.String()
295 } else {
296 result = C.GoString(C.zerror(error.zkrc)) // Static, no need to free it.
297 }
298 return
299}
300
301// Code returns the error code that may be compared against one of
302// the gozk.Z* constants.
303func (error *errorType) Code() int {
304 return int(error.zkrc)
305}
306
307// -----------------------------------------------------------------------
308// Stat interface which maps onto the ZooKeeper Stat struct.
309
310// We declare this as an interface rather than an actual struct because
311// this way we don't have to copy data around between the real C struct
312// and the Go one on every call. Most uses will only touch a few elements,
313// or even ignore the stat entirely, so that's a win.
314220
315// Stat contains detailed information about a node.221// Stat contains detailed information about a node.
316type Stat interface {222type Stat struct {
317 Czxid() int64223 c C.struct_Stat
318 Mzxid() int64224}
319 CTime() int64225
320 MTime() int64226func (stat *Stat) String() string {
321 Version() int32227 return fmt.Sprintf(
322 CVersion() int32228 "{Czxid: %d; Mzxid: %d; CTime: %d; MTime: %d; "+
323 AVersion() int32229 "Version: %d; CVersion: %d; AVersion: %d; "+
324 EphemeralOwner() int64230 "EphemeralOwner: %d; DataLength: %d; "+
325 DataLength() int32231 "NumChildren: %d; Pzxid: %d}",
326 NumChildren() int32232 stat.Czxid(), stat.Mzxid(), stat.CTime(), stat.MTime(),
327 Pzxid() int64233 stat.Version(), stat.CVersion(), stat.AVersion(),
328}234 stat.EphemeralOwner(), stat.DataLength(),
329235 stat.NumChildren(), stat.Pzxid(),
330type resultStat C.struct_Stat236 )
331237}
332func (stat *resultStat) Czxid() int64 {238
333 return int64(stat.czxid)239func (stat *Stat) Czxid() int64 {
334}240 return int64(stat.c.czxid)
335241}
336func (stat *resultStat) Mzxid() int64 {242
337 return int64(stat.mzxid)243func (stat *Stat) Mzxid() int64 {
338}244 return int64(stat.c.mzxid)
339245}
340func (stat *resultStat) CTime() int64 {246
341 return int64(stat.ctime)247func (stat *Stat) CTime() int64 {
342}248 return int64(stat.c.ctime)
343249}
344func (stat *resultStat) MTime() int64 {250
345 return int64(stat.mtime)251func (stat *Stat) MTime() int64 {
346}252 return int64(stat.c.mtime)
347253}
348func (stat *resultStat) Version() int32 {254
349 return int32(stat.version)255func (stat *Stat) Version() int32 {
350}256 return int32(stat.c.version)
351257}
352func (stat *resultStat) CVersion() int32 {258
353 return int32(stat.cversion)259func (stat *Stat) CVersion() int32 {
354}260 return int32(stat.c.cversion)
355261}
356func (stat *resultStat) AVersion() int32 {262
357 return int32(stat.aversion)263func (stat *Stat) AVersion() int32 {
358}264 return int32(stat.c.aversion)
359265}
360func (stat *resultStat) EphemeralOwner() int64 {266
361 return int64(stat.ephemeralOwner)267func (stat *Stat) EphemeralOwner() int64 {
362}268 return int64(stat.c.ephemeralOwner)
363269}
364func (stat *resultStat) DataLength() int32 {270
365 return int32(stat.dataLength)271func (stat *Stat) DataLength() int32 {
366}272 return int32(stat.c.dataLength)
367273}
368func (stat *resultStat) NumChildren() int32 {274
369 return int32(stat.numChildren)275func (stat *Stat) NumChildren() int32 {
370}276 return int32(stat.c.numChildren)
371277}
372func (stat *resultStat) Pzxid() int64 {278
373 return int64(stat.pzxid)279func (stat *Stat) Pzxid() int64 {
280 return int64(stat.c.pzxid)
374}281}
375282
376// -----------------------------------------------------------------------283// -----------------------------------------------------------------------
@@ -380,11 +287,12 @@
380287
381// SetLogLevel changes the minimum level of logging output generated288// SetLogLevel changes the minimum level of logging output generated
382// to adjust the amount of information provided.289// to adjust the amount of information provided.
290// The default logging level is LOG_ERROR.
383func SetLogLevel(level int) {291func SetLogLevel(level int) {
384 C.zoo_set_debug_level(C.ZooLogLevel(level))292 C.zoo_set_debug_level(C.ZooLogLevel(level))
385}293}
386294
387// Init initializes the communication with a ZooKeeper cluster. The provided295// Dial initializes the communication with a ZooKeeper cluster. The provided
388// servers parameter may include multiple server addresses, separated296// servers parameter may include multiple server addresses, separated
389// by commas, so that the client will automatically attempt to connect297// by commas, so that the client will automatically attempt to connect
390// to another server if one of them stops working for whatever reason.298// to another server if one of them stops working for whatever reason.
@@ -395,82 +303,80 @@
395//303//
396// Session establishment is asynchronous, meaning that this function304// Session establishment is asynchronous, meaning that this function
397// will return before the communication with ZooKeeper is fully established.305// will return before the communication with ZooKeeper is fully established.
398// The watch channel receives events of type SESSION_EVENT when any change306// The returned channel is used to receive events about the current
399// to the state of the established connection happens. See the documentation307// status of the connection. On long running applications the session channel
400// for the Event type for more details.308// must *necessarily* be observed since certain events like session expirations require an
401func Init(servers string, recvTimeoutNS int64) (zk *ZooKeeper, watch chan Event, err Error) {309// explicit reconnection and reestablishment of state (or bailing out).
402 zk, watch, err = internalInit(servers, recvTimeoutNS, nil)310// Because of that, the buffer used on the session channel has a limited
403 return311// size, and a panic will occur if too many events are not collected.
312func Dial(servers string, recvTimeoutNS int64) (*Conn, <-chan SessionStatus, os.Error) {
313 return dial(servers, recvTimeoutNS, nil)
404}314}
405315
406// Equivalent to Init, but attempt to reestablish an existing session316// Redial is equivalent to Dial, but attempts to reestablish an existing session
407// identified via the clientId parameter.317// identified via the clientId parameter.
408func ReInit(servers string, recvTimeoutNS int64, clientId *ClientId) (zk *ZooKeeper, watch chan Event, err Error) {318func Redial(servers string, recvTimeoutNS int64, clientId *ClientId) (*Conn, <-chan SessionStatus, os.Error) {
409 zk, watch, err = internalInit(servers, recvTimeoutNS, clientId)319 return dial(servers, recvTimeoutNS, clientId)
410 return
411}320}
412321
413func internalInit(servers string, recvTimeoutNS int64, clientId *ClientId) (*ZooKeeper, chan Event, Error) {322func dial(servers string, recvTimeoutNS int64, clientId *ClientId) (*Conn, <-chan SessionStatus, os.Error) {
414323 conn := &Conn{}
415 zk := &ZooKeeper{}324 conn.watchChannels = make(map[uintptr]statusChan)
416 zk.watchChannels = make(map[uintptr]chan Event)
417325
418 var cId *C.clientid_t326 var cId *C.clientid_t
419 if clientId != nil {327 if clientId != nil {
420 cId = &clientId.cId328 cId = &clientId.cId
421 }329 }
422330
423 watchId, watchChannel := zk.createWatch(true)331 watchChannel := make(sessionStatusChan, 32)
424 zk.sessionWatchId = watchId332 watchId := conn.createWatch(watchChannel)
333 conn.sessionWatchId = watchId
425334
426 cservers := C.CString(servers)335 cservers := C.CString(servers)
427 handle, cerr := C.zookeeper_init(cservers, C.watch_handler, C.int(recvTimeoutNS / 1e6), cId, unsafe.Pointer(watchId), 0)336 handle, cerr := C.zookeeper_init(cservers, C.watch_handler, C.int(recvTimeoutNS/1e6), cId, unsafe.Pointer(watchId), 0)
428 C.free(unsafe.Pointer(cservers))337 C.free(unsafe.Pointer(cservers))
429 if handle == nil {338 if handle == nil {
430 zk.closeAllWatches()339 conn.closeAllWatches()
431 return nil, nil, newError(ZSYSTEMERROR, cerr)340 return nil, nil, zkError(C.int(ZSYSTEMERROR), cerr)
432 }341 }
433 zk.handle = handle342 conn.handle = handle
434 runWatchLoop()343 runWatchLoop()
435 return zk, watchChannel, nil344 return conn, watchChannel, nil
436}345}
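
The Dial comment above says the session channel must be observed, because its buffer is limited and an expired session needs explicit handling. A minimal sketch of such an observer follows. It is written in current Go and does not use the gozk package: the `SessionStatus` type and its constant values are hypothetical stand-ins, and `watchSession` is an illustrative helper, not part of this branch.

```go
package main

import "fmt"

// SessionStatus is a hypothetical stand-in for the type added in this branch.
// The values below are illustrative, not the C constants.
type SessionStatus int

const (
	SessionConnecting SessionStatus = iota + 1
	SessionConnected
	SessionExpired
)

// watchSession drains the session channel so that its buffer never fills.
// It calls onChange for every status received and returns true as soon as
// the session expires, or false if the channel is closed first.
func watchSession(session <-chan SessionStatus, onChange func(SessionStatus)) (expired bool) {
	for status := range session {
		onChange(status)
		if status == SessionExpired {
			return true
		}
	}
	return false
}

func main() {
	// Simulate the buffered channel that Dial would hand back.
	session := make(chan SessionStatus, 32)
	session <- SessionConnecting
	session <- SessionConnected
	session <- SessionExpired
	close(session)

	expired := watchSession(session, func(s SessionStatus) {
		fmt.Println("session status:", int(s))
	})
	fmt.Println("expired:", expired)
}
```

In a real program the observer would normally run in its own goroutine for the lifetime of the connection, and an expiry would trigger a fresh Dial plus re-creation of any ephemeral state.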
437346
438// ClientId returns the client ID for the existing session with ZooKeeper.347// ClientId returns the client ID for the existing session with ZooKeeper.
439// This is useful to reestablish an existing session via ReInit.348// This is useful to reestablish an existing session via Redial.
440func (zk *ZooKeeper) ClientId() *ClientId {349func (conn *Conn) ClientId() *ClientId {
441 return &ClientId{*C.zoo_client_id(zk.handle)}350 return &ClientId{*C.zoo_client_id(conn.handle)}
442}351}
443352
444// Close terminates the ZooKeeper interaction.353// Close terminates the ZooKeeper interaction.
445func (zk *ZooKeeper) Close() Error {354func (conn *Conn) Close() os.Error {
446355
447 // Protect from concurrency around zk.handle change.356 // Protect from concurrency around conn.handle change.
448 zk.mutex.Lock()357 conn.mutex.Lock()
449 defer zk.mutex.Unlock()358 defer conn.mutex.Unlock()
450359
451 if zk.handle == nil {360 if conn.handle == nil {
452 // ZooKeeper may hang indefinitely if a handler is closed twice,361 // ZooKeeper may hang indefinitely if a handler is closed twice,
453 // so we get in the way and prevent it from happening.362 // so we get in the way and prevent it from happening.
454 return newError(ZCLOSING, nil)363 return ZCLOSING
455 }364 }
456 rc, cerr := C.zookeeper_close(zk.handle)365 rc, cerr := C.zookeeper_close(conn.handle)
457366
458 zk.closeAllWatches()367 conn.closeAllWatches()
459 stopWatchLoop()368 stopWatchLoop()
460369
461 // At this point, nothing else should need zk.handle.370 // At this point, nothing else should need conn.handle.
462 zk.handle = nil371 conn.handle = nil
463372
464 if rc != C.ZOK {373 return zkError(rc, cerr)
465 return newError(rc, cerr)
466 }
467 return nil
468}374}
469375
470// Get returns the data and status from an existing node. err will be nil,376// Get returns the data and status from an existing node. err will be nil,
471// unless an error is found. Attempting to retrieve data from a non-existing377// unless an error is found. Attempting to retrieve data from a non-existing
472// node is an error.378// node is an error.
473func (zk *ZooKeeper) Get(path string) (data string, stat Stat, err Error) {379func (conn *Conn) Get(path string) (data string, stat *Stat, err os.Error) {
474380
475 cpath := C.CString(path)381 cpath := C.CString(path)
476 cbuffer := (*C.char)(C.malloc(bufferSize))382 cbuffer := (*C.char)(C.malloc(bufferSize))
@@ -478,22 +384,22 @@
478 defer C.free(unsafe.Pointer(cpath))384 defer C.free(unsafe.Pointer(cpath))
479 defer C.free(unsafe.Pointer(cbuffer))385 defer C.free(unsafe.Pointer(cbuffer))
480386
481 cstat := C.struct_Stat{}387 var cstat Stat
482 rc, cerr := C.zoo_wget(zk.handle, cpath, nil, nil,388 rc, cerr := C.zoo_wget(conn.handle, cpath, nil, nil, cbuffer, &cbufferLen, &cstat.c)
483 cbuffer, &cbufferLen, &cstat)
484 if rc != C.ZOK {389 if rc != C.ZOK {
485 return "", nil, newError(rc, cerr)390 return "", nil, zkError(rc, cerr)
486 }391 }
392
487 result := C.GoStringN(cbuffer, cbufferLen)393 result := C.GoStringN(cbuffer, cbufferLen)
488394 return result, &cstat, nil
489 return result, (*resultStat)(&cstat), nil
490}395}
491396
492// GetW works like Get but also returns a channel that will receive397// GetW works like Get but also returns a channel that will receive
493// a single Event value when the data or existence of the given ZooKeeper398// a single value, one of NodeChanged or NodeDeleted, when the
494// node changes or when critical session events happen. See the399// node contents change or it is deleted.
495// documentation of the Event type for more details.400// If the ZooKeeper server goes down before an
496func (zk *ZooKeeper) GetW(path string) (data string, stat Stat, watch chan Event, err Error) {401// event happens, the channel will be closed.
402func (conn *Conn) GetW(path string) (data string, stat *Stat, watch <-chan NodeStatus, err os.Error) {
497403
498 cpath := C.CString(path)404 cpath := C.CString(path)
499 cbuffer := (*C.char)(C.malloc(bufferSize))405 cbuffer := (*C.char)(C.malloc(bufferSize))
@@ -501,82 +407,94 @@
501 defer C.free(unsafe.Pointer(cpath))407 defer C.free(unsafe.Pointer(cpath))
502 defer C.free(unsafe.Pointer(cbuffer))408 defer C.free(unsafe.Pointer(cbuffer))
503409
504 watchId, watchChannel := zk.createWatch(true)410 watchChannel := newNodeStatusChan(path)
411 watchId := conn.createWatch(watchChannel)
505412
506 cstat := C.struct_Stat{}413 var cstat Stat
507 rc, cerr := C.zoo_wget(zk.handle, cpath,414 rc, cerr := C.zoo_wget(conn.handle, cpath, C.watch_handler, unsafe.Pointer(watchId), cbuffer, &cbufferLen, &cstat.c)
508 C.watch_handler, unsafe.Pointer(watchId),
509 cbuffer, &cbufferLen, &cstat)
510 if rc != C.ZOK {415 if rc != C.ZOK {
511 zk.forgetWatch(watchId)416 conn.forgetWatch(watchId)
512 return "", nil, nil, newError(rc, cerr)417 return "", nil, nil, zkError(rc, cerr)
513 }418 }
514419
515 result := C.GoStringN(cbuffer, cbufferLen)420 result := C.GoStringN(cbuffer, cbufferLen)
516 return result, (*resultStat)(&cstat), watchChannel, nil421 return result, &cstat, watchChannel.c, nil
517}422}
518423
519// Children returns the children list and status from an existing node.424// Children returns the children list and status from an existing node.
520// err will be nil, unless an error is found. Attempting to retrieve the425// Attempting to retrieve the children list from a non-existent node is an error.
521// children list from a non-existent node is an error.426func (conn *Conn) Children(path string) (children []string, stat *Stat, err os.Error) {
522func (zk *ZooKeeper) Children(path string) (children []string, stat Stat, err Error) {
523427
524 cpath := C.CString(path)428 cpath := C.CString(path)
525 defer C.free(unsafe.Pointer(cpath))429 defer C.free(unsafe.Pointer(cpath))
526430
527 cvector := C.struct_String_vector{}431 cvector := C.struct_String_vector{}
528 cstat := C.struct_Stat{}432 var cstat Stat
529 rc, cerr := C.zoo_wget_children2(zk.handle, cpath, nil, nil,433 rc, cerr := C.zoo_wget_children2(conn.handle, cpath, nil, nil, &cvector, &cstat.c)
530 &cvector, &cstat)
531434
532 // Can't happen if rc != 0, but avoid potential memory leaks in the future.435 // Can't happen if rc != 0, but avoid potential memory leaks in the future.
533 if cvector.count != 0 {436 if cvector.count != 0 {
534 children = parseStringVector(&cvector)437 children = parseStringVector(&cvector)
535 }438 }
536 if rc != C.ZOK {439 if rc == C.ZOK {
537 err = newError(rc, cerr)440 stat = &cstat
538 } else {441 } else {
539 stat = (*resultStat)(&cstat)442 err = zkError(rc, cerr)
540 }443 }
541 return444 return
542}445}
543446
544// ChildrenW works like Children but also returns a channel that will447// ChildrenW works like Children but also returns a channel that will
545// receive a single Event value when a node is added or removed under the448// receive a single value, one of NodeDeleted or NodeChangedChild,
546// provided path or when critical session events happen. See the documentation449// when a node is added or removed under the
547// of the Event type for more details.450// provided path or when the node itself is deleted. If the ZooKeeper server
548func (zk *ZooKeeper) ChildrenW(path string) (children []string, stat Stat, watch chan Event, err Error) {451// goes down before an event happens, the channel will be closed.
452func (conn *Conn) ChildrenW(path string) (children []string, stat *Stat, watch <-chan NodeStatus, err os.Error) {
549453
550 cpath := C.CString(path)454 cpath := C.CString(path)
551 defer C.free(unsafe.Pointer(cpath))455 defer C.free(unsafe.Pointer(cpath))
552456
553 watchId, watchChannel := zk.createWatch(true)457 watchChannel := newNodeStatusChan(path)
458 watchId := conn.createWatch(watchChannel)
554459
555 cvector := C.struct_String_vector{}460 cvector := C.struct_String_vector{}
556 cstat := C.struct_Stat{}461 var cstat Stat
557 rc, cerr := C.zoo_wget_children2(zk.handle, cpath,462 rc, cerr := C.zoo_wget_children2(conn.handle, cpath, C.watch_handler, unsafe.Pointer(watchId), &cvector, &cstat.c)
558 C.watch_handler, unsafe.Pointer(watchId),
559 &cvector, &cstat)
560463
561 // Can't happen if rc != 0, but avoid potential memory leaks in the future.464 // Can't happen if rc != 0, but avoid potential memory leaks in the future.
562 if cvector.count != 0 {465 if cvector.count != 0 {
563 children = parseStringVector(&cvector)466 children = parseStringVector(&cvector)
564 }467 }
565 if rc != C.ZOK {468 if rc == C.ZOK {
566 zk.forgetWatch(watchId)469 stat = &cstat
567 err = newError(rc, cerr)470 watch = watchChannel.c
568 } else {471 } else {
569 stat = (*resultStat)(&cstat)472 conn.forgetWatch(watchId)
570 watch = watchChannel473 err = zkError(rc, cerr)
571 }474 }
572 return475 return
573}476}
574477
478// an arguably nicer version of parseStringVector (untested)
479//func parseStringVector(c *C.struct_String_vector) []string {
480// // make a Go slice onto the C data.
481// cv := *(*[]*C.char)(&reflect.SliceHeader{
482// Data: uintptr(unsafe.Pointer(c.data)),
483// Len: c.count,
484// Cap: c.count,
485// })
486// v := make([]string, c.count)
487// for i, s := range cv {
488// v[i] = C.GoString(s)
489// }
490// return v
491//}
492
575func parseStringVector(cvector *C.struct_String_vector) []string {493func parseStringVector(cvector *C.struct_String_vector) []string {
576 vector := make([]string, cvector.count)494 vector := make([]string, cvector.count)
577 dataStart := uintptr(unsafe.Pointer(cvector.data))495 dataStart := uintptr(unsafe.Pointer(cvector.data))
578 uintptrSize := unsafe.Sizeof(dataStart)496 uintptrSize := unsafe.Sizeof(dataStart)
579 for i := 0; i != len(vector); i++ {497 for i := range vector {
580 cpathPos := dataStart + uintptr(i)*uintptrSize498 cpathPos := dataStart + uintptr(i)*uintptrSize
581 cpath := *(**C.char)(unsafe.Pointer(cpathPos))499 cpath := *(**C.char)(unsafe.Pointer(cpathPos))
582 vector[i] = C.GoString(cpath)500 vector[i] = C.GoString(cpath)
@@ -588,51 +506,50 @@
588// Exists checks if a node exists at the given path. If it does,506// Exists checks if a node exists at the given path. If it does,
589// stat will contain meta information on the existing node, otherwise507// stat will contain meta information on the existing node, otherwise
590// it will be nil.508// it will be nil.
591func (zk *ZooKeeper) Exists(path string) (stat Stat, err Error) {509func (conn *Conn) Exists(path string) (stat *Stat, err os.Error) {
592 cpath := C.CString(path)510 cpath := C.CString(path)
593 defer C.free(unsafe.Pointer(cpath))511 defer C.free(unsafe.Pointer(cpath))
594512
595 cstat := C.struct_Stat{}513 var cstat Stat
596 rc, cerr := C.zoo_wexists(zk.handle, cpath, nil, nil, &cstat)514 rc, cerr := C.zoo_wexists(conn.handle, cpath, nil, nil, &cstat.c)
597515
598 // We diverge a bit from the usual here: a ZNONODE is not an error516 // We diverge a bit from the usual here: a ZNONODE is not an error
599 // for an exists call, otherwise every Exists call would have to check517 // for an exists call, otherwise every Exists call would have to check
600 // for err != nil and err.Code() != ZNONODE.518 // for err != nil and err.Code() != ZNONODE.
601 if rc == C.ZOK {519 if rc == C.ZOK {
602 stat = (*resultStat)(&cstat)520 stat = &cstat
603 } else if rc != C.ZNONODE {521 } else if rc != C.ZNONODE {
604 err = newError(rc, cerr)522 err = zkError(rc, cerr)
605 }523 }
606 return524 return
607}525}
608526
609// ExistsW works like Exists but also returns a channel that will527// ExistsW works like Exists but also returns a channel that will
610// receive an Event value when a node is created in case the returned528// receive a single value, one of NodeCreated, NodeDeleted or NodeChanged,
611// stat is nil and the node didn't exist, or when the existing node529// when the node is next created, removed, or its contents change.
612// is removed. It will also receive critical session events. See the530// If the ZooKeeper server goes down before an event happens, the channel will be closed.
613// documentation of the Event type for more details.531func (conn *Conn) ExistsW(path string) (stat *Stat, watch <-chan NodeStatus, err os.Error) {
614func (zk *ZooKeeper) ExistsW(path string) (stat Stat, watch chan Event, err Error) {
615 cpath := C.CString(path)532 cpath := C.CString(path)
616 defer C.free(unsafe.Pointer(cpath))533 defer C.free(unsafe.Pointer(cpath))
617534
618 watchId, watchChannel := zk.createWatch(true)535 watchChannel := newNodeStatusChan(path)
536 watchId := conn.createWatch(watchChannel)
619537
620 cstat := C.struct_Stat{}538 var cstat Stat
621 rc, cerr := C.zoo_wexists(zk.handle, cpath,539 rc, cerr := C.zoo_wexists(conn.handle, cpath, C.watch_handler, unsafe.Pointer(watchId), &cstat.c)
622 C.watch_handler, unsafe.Pointer(watchId), &cstat)
623540
624 // We diverge a bit from the usual here: a ZNONODE is not an error541 // We diverge a bit from the usual here: a ZNONODE is not an error
625 // for an exists call, otherwise every Exists call would have to check542 // for an exists call, otherwise every Exists call would have to check
626 // for err != nil and err.Code() != ZNONODE.543 // for err != nil and err.Code() != ZNONODE.
627 switch rc {544 switch Error(rc) {
628 case ZOK:545 case ZOK:
629 stat = (*resultStat)(&cstat)546 stat = &cstat
630 watch = watchChannel547 watch = watchChannel.c
631 case ZNONODE:548 case ZNONODE:
632 watch = watchChannel549 watch = watchChannel.c
633 default:550 default:
634 zk.forgetWatch(watchId)551 conn.forgetWatch(watchId)
635 err = newError(rc, cerr)552 err = zkError(rc, cerr)
636 }553 }
637 return554 return
638}555}
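
The watch channels returned by GetW, ChildrenW and ExistsW deliver a single value, or are closed if the server goes down first. The sketch below shows how a caller might tell the two cases apart with the two-value receive. It is written in current Go without the gozk package: `NodeStatus` and its constants are hypothetical stand-ins with illustrative values, and `describe` is an invented helper.

```go
package main

import "fmt"

// NodeStatus is a hypothetical stand-in for the watch value type in this branch.
type NodeStatus int

const (
	NodeCreated NodeStatus = iota + 1
	NodeDeleted
	NodeChanged
)

// describe blocks until the watch fires or is closed and reports what happened.
// A closed channel (ok == false) means no event arrived before the server went away.
func describe(watch <-chan NodeStatus) string {
	status, ok := <-watch
	if !ok {
		return "watch closed before any event"
	}
	switch status {
	case NodeCreated:
		return "node created"
	case NodeDeleted:
		return "node deleted"
	case NodeChanged:
		return "node changed"
	}
	return fmt.Sprintf("unexpected status %d", int(status))
}

func main() {
	// A watch that fires once.
	fired := make(chan NodeStatus, 1)
	fired <- NodeDeleted
	fmt.Println(describe(fired))

	// A watch that is closed without ever firing.
	closed := make(chan NodeStatus)
	close(closed)
	fmt.Println(describe(closed))
}
```

Because the zero value is what a closed channel yields, checking `ok` rather than the received value keeps the "server went away" case unambiguous.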
@@ -646,7 +563,7 @@
646// The returned path is useful in cases where the created path may differ563// The returned path is useful in cases where the created path may differ
647// from the requested one, such as when a sequence number is appended564// from the requested one, such as when a sequence number is appended
648// to it due to the use of the gozk.SEQUENCE flag.565// to it due to the use of the gozk.SEQUENCE flag.
649func (zk *ZooKeeper) Create(path, value string, flags int, aclv []ACL) (pathCreated string, err Error) {566func (conn *Conn) Create(path, value string, flags int, aclv []ACL) (pathCreated string, err os.Error) {
650 cpath := C.CString(path)567 cpath := C.CString(path)
651 cvalue := C.CString(value)568 cvalue := C.CString(value)
652 defer C.free(unsafe.Pointer(cpath))569 defer C.free(unsafe.Pointer(cpath))
@@ -660,13 +577,13 @@
660 cpathCreated := (*C.char)(C.malloc(cpathLen))577 cpathCreated := (*C.char)(C.malloc(cpathLen))
661 defer C.free(unsafe.Pointer(cpathCreated))578 defer C.free(unsafe.Pointer(cpathCreated))
662579
663 rc, cerr := C.zoo_create(zk.handle, cpath, cvalue, C.int(len(value)),580 rc, cerr := C.zoo_create(conn.handle, cpath, cvalue, C.int(len(value)), caclv, C.int(flags), cpathCreated, C.int(cpathLen))
664 caclv, C.int(flags), cpathCreated, C.int(cpathLen))581 if rc == C.ZOK {
665 if rc != C.ZOK {582 pathCreated = C.GoString(cpathCreated)
666 return "", newError(rc, cerr)583 } else {
584 err = zkError(rc, cerr)
667 }585 }
668586 return
669 return C.GoString(cpathCreated), nil
670}587}
671588
672// Set modifies the data for the existing node at the given path, replacing it589// Set modifies the data for the existing node at the given path, replacing it
@@ -677,35 +594,31 @@
677//594//
678// It is an error to attempt to set the data of a non-existing node with595// It is an error to attempt to set the data of a non-existing node with
679// this function. In these cases, use Create instead.596// this function. In these cases, use Create instead.
680func (zk *ZooKeeper) Set(path, value string, version int32) (stat Stat, err Error) {597func (conn *Conn) Set(path, value string, version int32) (stat *Stat, err os.Error) {
681598
682 cpath := C.CString(path)599 cpath := C.CString(path)
683 cvalue := C.CString(value)600 cvalue := C.CString(value)
684 defer C.free(unsafe.Pointer(cpath))601 defer C.free(unsafe.Pointer(cpath))
685 defer C.free(unsafe.Pointer(cvalue))602 defer C.free(unsafe.Pointer(cvalue))
686603
687 cstat := C.struct_Stat{}604 var cstat Stat
688605 rc, cerr := C.zoo_set2(conn.handle, cpath, cvalue, C.int(len(value)), C.int(version), &cstat.c)
689 rc, cerr := C.zoo_set2(zk.handle, cpath, cvalue, C.int(len(value)),606 if rc == C.ZOK {
690 C.int(version), &cstat)607 stat = &cstat
691 if rc != C.ZOK {608 } else {
692 return nil, newError(rc, cerr)609 err = zkError(rc, cerr)
693 }610 }
694611 return
695 return (*resultStat)(&cstat), nil
696}612}
697613
698// Delete removes the node at path. If version is not -1, the operation614// Delete removes the node at path. If version is not -1, the operation
699// will only succeed if the node is still at this version when the615// will only succeed if the node is still at this version when the
700// node is deleted as an atomic operation.616// node is deleted as an atomic operation.
701func (zk *ZooKeeper) Delete(path string, version int32) (err Error) {617func (conn *Conn) Delete(path string, version int32) (err os.Error) {
702 cpath := C.CString(path)618 cpath := C.CString(path)
703 defer C.free(unsafe.Pointer(cpath))619 defer C.free(unsafe.Pointer(cpath))
704 rc, cerr := C.zoo_delete(zk.handle, cpath, C.int(version))620 rc, cerr := C.zoo_delete(conn.handle, cpath, C.int(version))
705 if rc != C.ZOK {621 return zkError(rc, cerr)
706 return newError(rc, cerr)
707 }
708 return nil
709}622}
710623
711// AddAuth adds a new authentication certificate to the ZooKeeper624// AddAuth adds a new authentication certificate to the ZooKeeper
@@ -713,7 +626,7 @@
713// authentication information, while the cert parameter provides the626// authentication information, while the cert parameter provides the
714// identity data itself. For instance, the "digest" scheme requires627// identity data itself. For instance, the "digest" scheme requires
715// a pair like "username:password" to be provided as the certificate.628// a pair like "username:password" to be provided as the certificate.
716func (zk *ZooKeeper) AddAuth(scheme, cert string) Error {629func (conn *Conn) AddAuth(scheme, cert string) os.Error {
717 cscheme := C.CString(scheme)630 cscheme := C.CString(scheme)
718 ccert := C.CString(cert)631 ccert := C.CString(cert)
719 defer C.free(unsafe.Pointer(cscheme))632 defer C.free(unsafe.Pointer(cscheme))
@@ -725,43 +638,38 @@
725 }638 }
726 defer C.destroy_completion_data(data)639 defer C.destroy_completion_data(data)
727640
728 rc, cerr := C.zoo_add_auth(zk.handle, cscheme, ccert, C.int(len(cert)),641 rc, cerr := C.zoo_add_auth(conn.handle, cscheme, ccert, C.int(len(cert)), C.handle_void_completion, unsafe.Pointer(data))
729 C.handle_void_completion, unsafe.Pointer(data))
730 if rc != C.ZOK {642 if rc != C.ZOK {
731 return newError(rc, cerr)643 return zkError(rc, cerr)
732 }644 }
733645
734 C.wait_for_completion(data)646 C.wait_for_completion(data)
735647
736 rc = C.int(uintptr(data.data))648 rc = C.int(uintptr(data.data))
737 if rc != C.ZOK {649 return zkError(rc, nil)
738 return newError(rc, nil)
739 }
740
741 return nil
742}650}
743651
744// ACL returns the access control list for path.652// ACL returns the access control list for path.
745func (zk *ZooKeeper) ACL(path string) ([]ACL, Stat, Error) {653func (conn *Conn) ACL(path string) ([]ACL, *Stat, os.Error) {
746654
747 cpath := C.CString(path)655 cpath := C.CString(path)
748 defer C.free(unsafe.Pointer(cpath))656 defer C.free(unsafe.Pointer(cpath))
749657
750 caclv := C.struct_ACL_vector{}658 caclv := C.struct_ACL_vector{}
751 cstat := C.struct_Stat{}
752659
753 rc, cerr := C.zoo_get_acl(zk.handle, cpath, &caclv, &cstat)660 var cstat Stat
661 rc, cerr := C.zoo_get_acl(conn.handle, cpath, &caclv, &cstat.c)
754 if rc != C.ZOK {662 if rc != C.ZOK {
755 return nil, nil, newError(rc, cerr)663 return nil, nil, zkError(rc, cerr)
756 }664 }
757665
758 aclv := parseACLVector(&caclv)666 aclv := parseACLVector(&caclv)
759667
760 return aclv, (*resultStat)(&cstat), nil668 return aclv, &cstat, nil
761}669}
762670
763// SetACL changes the access control list for path.671// SetACL changes the access control list for path.
764func (zk *ZooKeeper) SetACL(path string, aclv []ACL, version int32) Error {672func (conn *Conn) SetACL(path string, aclv []ACL, version int32) os.Error {
765673
766 cpath := C.CString(path)674 cpath := C.CString(path)
767 defer C.free(unsafe.Pointer(cpath))675 defer C.free(unsafe.Pointer(cpath))
@@ -769,12 +677,8 @@
769 caclv := buildACLVector(aclv)677 caclv := buildACLVector(aclv)
770 defer C.deallocate_ACL_vector(caclv)678 defer C.deallocate_ACL_vector(caclv)
771679
772 rc, cerr := C.zoo_set_acl(zk.handle, cpath, C.int(version), caclv)680 rc, cerr := C.zoo_set_acl(conn.handle, cpath, C.int(version), caclv)
773 if rc != C.ZOK {681 return zkError(rc, cerr)
774 return newError(rc, cerr)
775 }
776
777 return nil
778}682}
779683
780func parseACLVector(caclv *C.struct_ACL_vector) []ACL {684func parseACLVector(caclv *C.struct_ACL_vector) []ACL {
@@ -822,7 +726,7 @@
822// -----------------------------------------------------------------------726// -----------------------------------------------------------------------
823// RetryChange utility method.727// RetryChange utility method.
824728
825type ChangeFunc func(oldValue string, oldStat Stat) (newValue string, err os.Error)729type ChangeFunc func(oldValue string, oldStat *Stat) (newValue string, err os.Error)
826730
827// RetryChange runs changeFunc to attempt to atomically change path731// RetryChange runs changeFunc to attempt to atomically change path
828// in a lock free manner, and retries in case there was another732// in a lock free manner, and retries in case there was another
@@ -831,8 +735,7 @@
831// changeFunc must work correctly if called multiple times in case735// changeFunc must work correctly if called multiple times in case
832// the modification fails due to concurrent changes, and it may return736// the modification fails due to concurrent changes, and it may return
833// an error that will cause the RetryChange function to stop and737// an error that will cause the RetryChange function to stop and
834// return an Error with code ZSYSTEMERROR and the same .String() result738// return the same error.
835// as the provided error.
836//739//
837// This mechanism is not suitable for a node that is frequently modified740// This mechanism is not suitable for a node that is frequently modified
838// concurrently. For those cases, consider using a pessimistic locking741// concurrently. For those cases, consider using a pessimistic locking
@@ -845,8 +748,7 @@
845//748//
846// 2. Call the changeFunc with the current node value and stat,749// 2. Call the changeFunc with the current node value and stat,
847// or with an empty string and nil stat, if the node doesn't yet exist.750// or with an empty string and nil stat, if the node doesn't yet exist.
848// If the changeFunc returns an error, stop and return an Error with751// If the changeFunc returns an error, stop and return the same error.
849// ZSYSTEMERROR Code() and the same String() as the changeFunc error.
850//752//
851// 3. If the changeFunc returns no errors, use the string returned as753// 3. If the changeFunc returns no errors, use the string returned as
852// the new candidate value for the node, and attempt to either create754// the new candidate value for the node, and attempt to either create
@@ -855,36 +757,32 @@
855// in the same node), repeat from step 1. If this procedure fails with any757// in the same node), repeat from step 1. If this procedure fails with any
856// other error, stop and return the error found.758// other error, stop and return the error found.
857//759//
858func (zk *ZooKeeper) RetryChange(path string, flags int, acl []ACL, changeFunc ChangeFunc) (err Error) {760func (conn *Conn) RetryChange(path string, flags int, acl []ACL, changeFunc ChangeFunc) os.Error {
859 for {761 for {
860 oldValue, oldStat, getErr := zk.Get(path)762 oldValue, oldStat, err := conn.Get(path)
861 if getErr != nil && getErr.Code() != ZNONODE {763 if err != nil && err != ZNONODE {
862 err = getErr764 return err
863 break765 }
864 }766 newValue, err := changeFunc(oldValue, oldStat)
865 newValue, osErr := changeFunc(oldValue, oldStat)767 if err != nil {
866 if osErr != nil {768 return err
867 return newError(ZSYSTEMERROR, osErr)769 }
868 } else if oldStat == nil {770 if oldStat == nil {
869 _, err = zk.Create(path, newValue, flags, acl)771 _, err := conn.Create(path, newValue, flags, acl)
870 if err == nil || err.Code() != ZNODEEXISTS {772 if err == nil || err != ZNODEEXISTS {
871 break773 return err
872 }774 }
873 } else if newValue == oldValue {775 continue
776 }
777 if newValue == oldValue {
874 return nil // Nothing to do.778 return nil // Nothing to do.
875 } else {779 }
876 _, err = zk.Set(path, newValue, oldStat.Version())780 _, err = conn.Set(path, newValue, oldStat.Version())
877 if err == nil {781 if err == nil || (err != ZBADVERSION && err != ZNONODE) {
878 break782 return err
879 } else {
880 code := err.Code()
881 if code != ZBADVERSION && code != ZNONODE {
882 break
883 }
884 }
885 }783 }
886 }784 }
887 return err785 panic("not reached")
888}786}
889787
890// -----------------------------------------------------------------------788// -----------------------------------------------------------------------
@@ -897,7 +795,7 @@
897// Whenever a *W method is called, it will return a channel which795// Whenever a *W method is called, it will return a channel which
898// outputs Event values. Internally, a map is used to maintain references796// outputs Event values. Internally, a map is used to maintain references
899// between a unique integer key (the watchId), and the event channel. The797// between a unique integer key (the watchId), and the event channel. The
900// watchId is then handed to the C zookeeper library as the watch context,798// watchId is then handed to the C ZooKeeper library as the watch context,
901// so that we get it back when events happen. Using an integer key as the799// so that we get it back when events happen. Using an integer key as the
902// watch context rather than a pointer is needed because there's no guarantee800// watch context rather than a pointer is needed because there's no guarantee
903// that in the future the GC will not move objects around, and also because801// that in the future the GC will not move objects around, and also because
@@ -910,13 +808,13 @@
910// Since Cgo doesn't allow calling back into Go, we actually fire a new808// Since Cgo doesn't allow calling back into Go, we actually fire a new
911// goroutine the very first time Init is called, and allow it to block809// goroutine the very first time Init is called, and allow it to block
912// in a pthread condition variable within a C function. This condition810// in a pthread condition variable within a C function. This condition
913// will only be notified once a zookeeper watch callback appends new811// will only be notified once a ZooKeeper watch callback appends new
914// entries to the event list. When this happens, the C function returns812// entries to the event list. When this happens, the C function returns
915// and we get back into Go land with the pointer to the watch data,813// and we get back into Go land with the pointer to the watch data,
916// including the watchId and other event details such as type and path.814// including the watchId and other event details such as type and path.
917815
918var watchMutex sync.Mutex816var watchMutex sync.Mutex
919var watchZooKeepers = make(map[uintptr]*ZooKeeper)817var watchConns = make(map[uintptr]*Conn)
920var watchCounter uintptr818var watchCounter uintptr
921var watchLoopCounter int819var watchLoopCounter int
922820
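Note on the watch bookkeeping described in the comment above: an integer id, rather than a pointer, is what crosses into C, and a mutex-protected map turns that id back into a channel when the callback fires. A minimal sketch of that pattern follows. The registry type, its method names and the string event payload are hypothetical; the branch itself keeps a package-level map plus a per-connection map and uses typed channels.

```go
package main

import (
	"fmt"
	"sync"
)

// registry maps integer watch ids to one-shot event channels.
type registry struct {
	mu      sync.Mutex
	next    uintptr
	watches map[uintptr]chan string
}

func newRegistry() *registry {
	return &registry{watches: make(map[uintptr]chan string)}
}

// create registers a new one-shot watch and returns its id, which is
// the only thing that would be handed to foreign code.
func (r *registry) create() (uintptr, <-chan string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	id := r.next
	r.next++
	ch := make(chan string, 1) // buffer of one: fire never blocks
	r.watches[id] = ch
	return id, ch
}

// fire delivers event to the watch with the given id, then closes and
// forgets it. It reports false if the id is unknown or already fired.
func (r *registry) fire(id uintptr, event string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	ch, ok := r.watches[id]
	if !ok {
		return false
	}
	ch <- event
	close(ch)
	delete(r.watches, id)
	return true
}

// pending reports how many watches are still registered.
func (r *registry) pending() int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return len(r.watches)
}

func main() {
	r := newRegistry()
	id, ch := r.create()
	fmt.Println("pending:", r.pending())
	r.fire(id, "changed")
	event, ok := <-ch
	fmt.Println(event, ok, "pending:", r.pending())
}
```

Because each watch fires at most once into a buffer of one, the send under the mutex cannot block, which is the same reasoning behind the "impossible" panic in sendEvent.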
@@ -925,25 +823,20 @@
925// mostly as a debugging and testing aid.823// mostly as a debugging and testing aid.
926func CountPendingWatches() int {824func CountPendingWatches() int {
927 watchMutex.Lock()825 watchMutex.Lock()
928 count := len(watchZooKeepers)826 count := len(watchConns)
929 watchMutex.Unlock()827 watchMutex.Unlock()
930 return count828 return count
931}829}
932830
933// createWatch creates and registers a watch, returning the watch id831// createWatch creates a watch that will send events down the provided
934// and channel.832// channel, which must be one of the four possible event channel types.
935func (zk *ZooKeeper) createWatch(session bool) (watchId uintptr, watchChannel chan Event) {833func (conn *Conn) createWatch(watchChannel statusChan) (watchId uintptr) {
936 buf := 1 // session/watch event
937 if session {
938 buf = 32
939 }
940 watchChannel = make(chan Event, buf)
941 watchMutex.Lock()834 watchMutex.Lock()
942 defer watchMutex.Unlock()835 defer watchMutex.Unlock()
943 watchId = watchCounter836 watchId = watchCounter
944 watchCounter += 1837 watchCounter += 1
945 zk.watchChannels[watchId] = watchChannel838 conn.watchChannels[watchId] = watchChannel
946 watchZooKeepers[watchId] = zk839 watchConns[watchId] = conn
947 return840 return
948}841}
949842
@@ -951,67 +844,71 @@
951// from ever getting delivered. It shouldn't be used if there's any844// from ever getting delivered. It shouldn't be used if there's any
952// chance the watch channel is still visible and not closed, since845// chance the watch channel is still visible and not closed, since
953// it might mean a goroutine would be blocked forever.846// it might mean a goroutine would be blocked forever.
954func (zk *ZooKeeper) forgetWatch(watchId uintptr) {847func (conn *Conn) forgetWatch(watchId uintptr) {
955 watchMutex.Lock()848 watchMutex.Lock()
956 defer watchMutex.Unlock()849 defer watchMutex.Unlock()
957 zk.watchChannels[watchId] = nil, false850 conn.watchChannels[watchId] = nil, false
958 watchZooKeepers[watchId] = nil, false851 watchConns[watchId] = nil, false
959}852}
960853
961// closeAllWatches closes all watch channels for zk.854// closeAllWatches closes all watch channels for conn.
962func (zk *ZooKeeper) closeAllWatches() {855func (conn *Conn) closeAllWatches() {
963 watchMutex.Lock()856 watchMutex.Lock()
964 defer watchMutex.Unlock()857 defer watchMutex.Unlock()
965 for watchId, ch := range zk.watchChannels {858 for watchId, ch := range conn.watchChannels {
966 close(ch)859 ch.close()
967 zk.watchChannels[watchId] = nil, false860 conn.watchChannels[watchId] = nil, false
968 watchZooKeepers[watchId] = nil, false861 watchConns[watchId] = nil, false
969 }862 }
970}863}
971864
972// sendEvent delivers the event to the watchId event channel. If the865// sendEvent delivers the event to the watchId event channel. If the
973// event channel is a watch event channel, the event is delivered,866// event channel is a watch event channel, the event is delivered,
974// the channel is closed, and resources are freed.867// the channel is closed, and resources are freed.
975func sendEvent(watchId uintptr, event Event) {868func sendEvent(watchId uintptr, eventType, connectionState int, path string) {
976 if event.State == STATE_CLOSED {869 if eventType == event_SESSION && connectionState == 0 {
977 panic("Attempted to send a CLOSED event")870 panic("Attempted to send a CLOSED connection status")
978 }871 }
979 watchMutex.Lock()872 watchMutex.Lock()
980 defer watchMutex.Unlock()873 defer watchMutex.Unlock()
981 zk, ok := watchZooKeepers[watchId]874 conn, ok := watchConns[watchId]
982 if !ok {875 if !ok {
983 return876 return
984 }877 }
985 if event.Type == EVENT_SESSION && watchId != zk.sessionWatchId {878 ch := conn.watchChannels[watchId]
986 switch event.State {879 if ch == nil {
987 case STATE_EXPIRED_SESSION, STATE_AUTH_FAILED:880 return
881 }
882 if eventType == event_SESSION && watchId != conn.sessionWatchId {
883 switch SessionStatus(connectionState) {
884 case SessionExpired, SessionAuthFailed:
885 // The channel will be closed below.
988 default:886 default:
887 // Ignore non-critical session events.
989 // WTF? Feels like TCP saying "dropped a dup packet, ok?"888 // WTF? Feels like TCP saying "dropped a dup packet, ok?"
990 return889 return
991 }890 }
992 }891 } else {
993 ch := zk.watchChannels[watchId]892 if eventType != event_SESSION && watchId == conn.sessionWatchId {
994 if ch == nil {893 panic(fmt.Errorf("unexpected non-session event %v %v", eventType, connectionState))
995 return894 }
996 }895 if !ch.send(eventType, connectionState, path) {
997 select {896 // Channel not available for sending, which means session
998 case ch <- event:897 // events are necessarily involved (trivial events go
999 default:898 // straight to the buffer), and the application isn't paying
1000 // Channel not available for sending, which means session899 // attention for long enough to have the buffer filled up.
1001 // events are necessarily involved (trivial events go900 // Break down now rather than leaking forever.
1002 // straight to the buffer), and the application isn't paying901 if watchId == conn.sessionWatchId {
1003 // attention for long enough to have the buffer filled up.902 panic("Session event channel buffer is full")
1004 // Break down now rather than leaking forever.903 } else {
1005 if watchId == zk.sessionWatchId {904 panic("Watch event channel buffer is full (impossible!)")
1006 panic("Session event channel buffer is full")905 }
1007 } else {906 }
1008 panic("Watch event channel buffer is full")907 }
1009 }908 if watchId != conn.sessionWatchId {
1010 }909 conn.watchChannels[watchId] = nil, false
1011 if watchId != zk.sessionWatchId {910 watchConns[watchId] = nil, false
1012 zk.watchChannels[watchId] = nil, false911 ch.close()
1013 watchZooKeepers[watchId] = nil, false
1014 close(ch)
1015 }912 }
1016}913}
1017914
@@ -1049,13 +946,8 @@
1049 for {946 for {
1050 // This will block until a watch event is available.947 // This will block until a watch event is available.
1051 data := C.wait_for_watch()948 data := C.wait_for_watch()
1052 event := Event{
1053 Type: int(data.event_type),
1054 Path: C.GoString(data.event_path),
1055 State: int(data.connection_state),
1056 }
1057 watchId := uintptr(data.watch_context)949 watchId := uintptr(data.watch_context)
950 sendEvent(watchId, int(data.event_type), int(data.connection_state), C.GoString(data.event_path))
1058 C.destroy_watch_data(data)951 C.destroy_watch_data(data)
1059 sendEvent(watchId, event)
1060 }952 }
1061}953}
1062954
=== renamed file 'gozk_test.go' => 'zk_test.go'
--- gozk_test.go 2011-08-19 01:51:37 +0000
+++ zk_test.go 2011-09-30 13:35:26 +0000
@@ -1,17 +1,17 @@
1package gozk_test1package zk_test
22
3import (3import (
4 . "launchpad.net/gocheck"4 . "launchpad.net/gocheck"
5 "gozk"5 "launchpad.net/gozk/zk"
6 "time"6 "time"
7)7)
88
9// This error will be delivered via C errno, since ZK unfortunately9// This error will be delivered via C errno, since ZK unfortunately
10// only provides the handler back from zookeeper_init().10// only provides the handler back from zookeeper_init().
11func (s *S) TestInitErrorThroughErrno(c *C) {11func (s *S) TestInitErrorThroughErrno(c *C) {
12 zk, watch, err := gozk.Init("bad-domain-without-port", 5e9)12 conn, watch, err := zk.Dial("bad-domain-without-port", 5e9)
13 if zk != nil {13 if conn != nil {
14 zk.Close()14 conn.Close()
15 }15 }
16 if watch != nil {16 if watch != nil {
17 go func() {17 go func() {
@@ -23,15 +23,15 @@
23 }23 }
24 }()24 }()
25 }25 }
26 c.Assert(zk, IsNil)26 c.Assert(conn, IsNil)
27 c.Assert(watch, IsNil)27 c.Assert(watch, IsNil)
28 c.Assert(err, Matches, "invalid argument")28 c.Assert(err, Matches, "invalid argument")
29}29}
3030
31func (s *S) TestRecvTimeoutInitParameter(c *C) {31func (s *S) TestRecvTimeoutInitParameter(c *C) {
32 zk, watch, err := gozk.Init(s.zkAddr, 0)32 conn, watch, err := zk.Dial(s.zkAddr, 0)
33 c.Assert(err, IsNil)33 c.Assert(err, IsNil)
34 defer zk.Close()34 defer conn.Close()
3535
36 select {36 select {
37 case <-watch:37 case <-watch:
@@ -40,7 +40,7 @@
40 }40 }
4141
42 for i := 0; i != 1000; i++ {42 for i := 0; i != 1000; i++ {
43 _, _, err := zk.Get("/zookeeper")43 _, _, err := conn.Get("/zookeeper")
44 if err != nil {44 if err != nil {
45 c.Assert(err, Matches, "operation timeout")45 c.Assert(err, Matches, "operation timeout")
46 c.SucceedNow()46 c.SucceedNow()
@@ -51,88 +51,75 @@
51}51}
5252
53func (s *S) TestSessionWatches(c *C) {53func (s *S) TestSessionWatches(c *C) {
54 c.Assert(gozk.CountPendingWatches(), Equals, 0)54 c.Assert(zk.CountPendingWatches(), Equals, 0)
5555
56 zk1, watch1 := s.init(c)56 zk1, watch1 := s.init(c)
57 zk2, watch2 := s.init(c)57 zk2, watch2 := s.init(c)
58 zk3, watch3 := s.init(c)58 zk3, watch3 := s.init(c)
5959
60 c.Assert(gozk.CountPendingWatches(), Equals, 3)60 c.Assert(zk.CountPendingWatches(), Equals, 3)
6161
62 event1 := <-watch162 event1 := <-watch1
63 c.Assert(event1.Type, Equals, gozk.EVENT_SESSION)63 c.Assert(event1, Equals, zk.SessionConnected)
64 c.Assert(event1.State, Equals, gozk.STATE_CONNECTED)
6564
66 c.Assert(gozk.CountPendingWatches(), Equals, 3)65 c.Assert(zk.CountPendingWatches(), Equals, 3)
6766
68 event2 := <-watch267 event2 := <-watch2
69 c.Assert(event2.Type, Equals, gozk.EVENT_SESSION)68 c.Assert(event2, Equals, zk.SessionConnected)
70 c.Assert(event2.State, Equals, gozk.STATE_CONNECTED)
7169
72 c.Assert(gozk.CountPendingWatches(), Equals, 3)70 c.Assert(zk.CountPendingWatches(), Equals, 3)
7371
74 event3 := <-watch372 event3 := <-watch3
75 c.Assert(event3.Type, Equals, gozk.EVENT_SESSION)73 c.Assert(event3, Equals, zk.SessionConnected)
76 c.Assert(event3.State, Equals, gozk.STATE_CONNECTED)
7774
78 c.Assert(gozk.CountPendingWatches(), Equals, 3)75 c.Assert(zk.CountPendingWatches(), Equals, 3)
7976
80 zk1.Close()77 zk1.Close()
81 c.Assert(gozk.CountPendingWatches(), Equals, 2)78 c.Assert(zk.CountPendingWatches(), Equals, 2)
82 zk2.Close()79 zk2.Close()
83 c.Assert(gozk.CountPendingWatches(), Equals, 1)80 c.Assert(zk.CountPendingWatches(), Equals, 1)
84 zk3.Close()81 zk3.Close()
85 c.Assert(gozk.CountPendingWatches(), Equals, 0)82 c.Assert(zk.CountPendingWatches(), Equals, 0)
86}83}
8784
88// Gozk injects a STATE_CLOSED event when zk.Close() is called, right85// Gozk injects a STATE_CLOSED event when conn.Close() is called, right
89// before the channel is closed. Closing the channel injects a nil86// before the channel is closed. Closing the channel injects a nil
90// pointer, as usual for Go, so the STATE_CLOSED gives a chance to87// pointer, as usual for Go, so the STATE_CLOSED gives a chance to
91// know that a nil pointer is coming, and to stop the procedure.88// know that a nil pointer is coming, and to stop the procedure.
92// Hopefully this procedure will avoid some nil-pointer references by89// Hopefully this procedure will avoid some nil-pointer references by
93// mistake.90// mistake.
94func (s *S) TestClosingStateInSessionWatch(c *C) {91func (s *S) TestClosingStateInSessionWatch(c *C) {
95 zk, watch := s.init(c)92 conn, watch := s.init(c)
9693
97 event := <-watch94 event := <-watch
98 c.Assert(event.Type, Equals, gozk.EVENT_SESSION)95 c.Assert(event, Equals, zk.SessionConnected)
99 c.Assert(event.State, Equals, gozk.STATE_CONNECTED)
10096
101 zk.Close()97 conn.Close()
102 event, ok := <-watch98 event, ok := <-watch
103 c.Assert(ok, Equals, false)99 c.Assert(ok, Equals, false)
104 c.Assert(event.Type, Equals, gozk.EVENT_CLOSED)100 c.Assert(event, Equals, zk.SessionStatus(0))
105 c.Assert(event.State, Equals, gozk.STATE_CLOSED)101}
106}102
107103var okTests = []struct {
108func (s *S) TestEventString(c *C) {104 zk.SessionStatus
109 var event gozk.Event105 Ok bool
110 event = gozk.Event{gozk.EVENT_SESSION, "/path", gozk.STATE_CONNECTED}106}{
111 c.Assert(event, Matches, "ZooKeeper connected")107 {zk.SessionConnected, true},
112 event = gozk.Event{gozk.EVENT_CREATED, "/path", gozk.STATE_CONNECTED}108 {0, false},
113 c.Assert(event, Matches, "ZooKeeper connected; path created: /path")109 {zk.SessionExpired, false},
114 event = gozk.Event{-1, "/path", gozk.STATE_CLOSED}110 {zk.SessionAuthFailed, false},
115 c.Assert(event, Matches, "ZooKeeper connection closed")
116}
117
118var okTests = []struct{gozk.Event; Ok bool}{
119 {gozk.Event{gozk.EVENT_SESSION, "", gozk.STATE_CONNECTED}, true},
120 {gozk.Event{gozk.EVENT_CREATED, "", gozk.STATE_CONNECTED}, true},
121 {gozk.Event{0, "", gozk.STATE_CLOSED}, false},
122 {gozk.Event{0, "", gozk.STATE_EXPIRED_SESSION}, false},
123 {gozk.Event{0, "", gozk.STATE_AUTH_FAILED}, false},
124}111}
125112
126func (s *S) TestEventOk(c *C) {113func (s *S) TestEventOk(c *C) {
127 for _, t := range okTests {114 for _, t := range okTests {
128 c.Assert(t.Event.Ok(), Equals, t.Ok)115 c.Assert(t.SessionStatus.Ok(), Equals, t.Ok)
129 }116 }
130}117}
131118
132func (s *S) TestGetAndStat(c *C) {119func (s *S) TestGetAndStat(c *C) {
133 zk, _ := s.init(c)120 conn, _ := s.init(c)
134121
135 data, stat, err := zk.Get("/zookeeper")122 data, stat, err := conn.Get("/zookeeper")
136 c.Assert(err, IsNil)123 c.Assert(err, IsNil)
137 c.Assert(data, Equals, "")124 c.Assert(data, Equals, "")
138 c.Assert(stat.Czxid(), Equals, int64(0))125 c.Assert(stat.Czxid(), Equals, int64(0))
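Note on TestClosingStateInSessionWatch in the hunk above: the new assertions rely on ordinary Go channel semantics, where a receive from a closed channel yields the zero value of the element type and ok == false, even though the retained comment still describes an injected STATE_CLOSED event. A minimal sketch of that behaviour, assuming a status type whose zero value means "closed"; sessionStatus, its constants and its ok method are hypothetical stand-ins for zk.SessionStatus and are not taken from the branch.

```go
package main

import "fmt"

// sessionStatus is a hypothetical stand-in for zk.SessionStatus.
type sessionStatus int

const (
	statusClosed    sessionStatus = 0 // zero value, what a closed channel yields
	statusConnected sessionStatus = 1 // arbitrary non-zero value for the sketch
)

// ok reports whether the status describes a usable session.
func (s sessionStatus) ok() bool {
	return s == statusConnected
}

func main() {
	watch := make(chan sessionStatus, 1)
	watch <- statusConnected
	close(watch)

	// Buffered values are still delivered after close.
	status, open := <-watch
	fmt.Println(status == statusConnected, open)

	// Once drained, a closed channel yields the zero value and false.
	status, open = <-watch
	fmt.Println(status == statusClosed, open, status.ok())
}
```

A receiver that checks only the status, and not the second result, therefore still sees a value for which ok() is false once the connection has been closed.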
@@ -149,58 +136,58 @@
149}136}
150137
151func (s *S) TestGetAndError(c *C) {138func (s *S) TestGetAndError(c *C) {
152 zk, _ := s.init(c)139 conn, _ := s.init(c)
153140
154 data, stat, err := zk.Get("/non-existent")141 data, stat, err := conn.Get("/non-existent")
155142
156 c.Assert(data, Equals, "")143 c.Assert(data, Equals, "")
157 c.Assert(stat, IsNil)144 c.Assert(stat, IsNil)
158 c.Assert(err, Matches, "no node")145 c.Assert(err, Matches, "no node")
159 c.Assert(err.Code(), Equals, gozk.ZNONODE)146 c.Assert(err, Equals, zk.ZNONODE)
160}147}
161148
162func (s *S) TestCreateAndGet(c *C) {149func (s *S) TestCreateAndGet(c *C) {
163 zk, _ := s.init(c)150 conn, _ := s.init(c)
164151
165 path, err := zk.Create("/test-", "bababum", gozk.SEQUENCE|gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))152 path, err := conn.Create("/test-", "bababum", zk.SEQUENCE|zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
166 c.Assert(err, IsNil)153 c.Assert(err, IsNil)
167 c.Assert(path, Matches, "/test-[0-9]+")154 c.Assert(path, Matches, "/test-[0-9]+")
168155
169 // Check the error condition from Create().156 // Check the error condition from Create().
170 _, err = zk.Create(path, "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))157 _, err = conn.Create(path, "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
171 c.Assert(err, Matches, "node exists")158 c.Assert(err, Matches, "node exists")
172159
173 data, _, err := zk.Get(path)160 data, _, err := conn.Get(path)
174 c.Assert(err, IsNil)161 c.Assert(err, IsNil)
175 c.Assert(data, Equals, "bababum")162 c.Assert(data, Equals, "bababum")
176}163}
177164
178func (s *S) TestCreateSetAndGet(c *C) {165func (s *S) TestCreateSetAndGet(c *C) {
179 zk, _ := s.init(c)166 conn, _ := s.init(c)
180167
181 _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))168 _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
182 c.Assert(err, IsNil)169 c.Assert(err, IsNil)
183170
184 stat, err := zk.Set("/test", "bababum", -1) // Any version.171 stat, err := conn.Set("/test", "bababum", -1) // Any version.
185 c.Assert(err, IsNil)172 c.Assert(err, IsNil)
186 c.Assert(stat.Version(), Equals, int32(1))173 c.Assert(stat.Version(), Equals, int32(1))
187174
188 data, _, err := zk.Get("/test")175 data, _, err := conn.Get("/test")
189 c.Assert(err, IsNil)176 c.Assert(err, IsNil)
190 c.Assert(data, Equals, "bababum")177 c.Assert(data, Equals, "bababum")
191}178}
192179
193func (s *S) TestGetAndWatch(c *C) {180func (s *S) TestGetAndWatch(c *C) {
194 c.Check(gozk.CountPendingWatches(), Equals, 0)181 c.Check(zk.CountPendingWatches(), Equals, 0)
195182
196 zk, _ := s.init(c)183 conn, _ := s.init(c)
197184
198 c.Check(gozk.CountPendingWatches(), Equals, 1)185 c.Check(zk.CountPendingWatches(), Equals, 1)
199186
200 _, err := zk.Create("/test", "one", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))187 _, err := conn.Create("/test", "one", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
201 c.Assert(err, IsNil)188 c.Assert(err, IsNil)
202189
203 data, stat, watch, err := zk.GetW("/test")190 data, stat, watch, err := conn.GetW("/test")
204 c.Assert(err, IsNil)191 c.Assert(err, IsNil)
205 c.Assert(data, Equals, "one")192 c.Assert(data, Equals, "one")
206 c.Assert(stat.Version(), Equals, int32(0))193 c.Assert(stat.Version(), Equals, int32(0))
@@ -211,17 +198,17 @@
211 default:198 default:
212 }199 }
213200
214 c.Check(gozk.CountPendingWatches(), Equals, 2)201 c.Check(zk.CountPendingWatches(), Equals, 2)
215202
216 _, err = zk.Set("/test", "two", -1)203 _, err = conn.Set("/test", "two", -1)
217 c.Assert(err, IsNil)204 c.Assert(err, IsNil)
218205
219 event := <-watch206 event := <-watch
220 c.Assert(event.Type, Equals, gozk.EVENT_CHANGED)207 c.Assert(event, Equals, zk.NodeChanged)
221208
222 c.Check(gozk.CountPendingWatches(), Equals, 1)209 c.Check(zk.CountPendingWatches(), Equals, 1)
223210
224 data, _, watch, err = zk.GetW("/test")211 data, _, watch, err = conn.GetW("/test")
225 c.Assert(err, IsNil)212 c.Assert(err, IsNil)
226 c.Assert(data, Equals, "two")213 c.Assert(data, Equals, "two")
227214
@@ -231,86 +218,86 @@
231 default:218 default:
232 }219 }
233220
234 c.Check(gozk.CountPendingWatches(), Equals, 2)221 c.Check(zk.CountPendingWatches(), Equals, 2)
235222
236 _, err = zk.Set("/test", "three", -1)223 _, err = conn.Set("/test", "three", -1)
237 c.Assert(err, IsNil)224 c.Assert(err, IsNil)
238225
239 event = <-watch226 event = <-watch
240 c.Assert(event.Type, Equals, gozk.EVENT_CHANGED)227 c.Assert(event, Equals, zk.NodeChanged)
241228
242 c.Check(gozk.CountPendingWatches(), Equals, 1)229 c.Check(zk.CountPendingWatches(), Equals, 1)
243}230}
244231
245func (s *S) TestGetAndWatchWithError(c *C) {232func (s *S) TestGetAndWatchWithError(c *C) {
246 c.Check(gozk.CountPendingWatches(), Equals, 0)233 c.Check(zk.CountPendingWatches(), Equals, 0)
247234
248 zk, _ := s.init(c)235 conn, _ := s.init(c)
249236
250 c.Check(gozk.CountPendingWatches(), Equals, 1)237 c.Check(zk.CountPendingWatches(), Equals, 1)
251238
252 _, _, watch, err := zk.GetW("/test")239 _, _, watch, err := conn.GetW("/test")
253 c.Assert(err, NotNil)240 c.Assert(err, NotNil)
254 c.Assert(err.Code(), Equals, gozk.ZNONODE)241 c.Assert(err, Equals, zk.ZNONODE)
255 c.Assert(watch, IsNil)242 c.Assert(watch, IsNil)
256243
257 c.Check(gozk.CountPendingWatches(), Equals, 1)244 c.Check(zk.CountPendingWatches(), Equals, 1)
258}245}
259246
260func (s *S) TestCloseReleasesWatches(c *C) {247func (s *S) TestCloseReleasesWatches(c *C) {
261 c.Check(gozk.CountPendingWatches(), Equals, 0)248 c.Check(zk.CountPendingWatches(), Equals, 0)
262249
263 zk, _ := s.init(c)250 conn, _ := s.init(c)
264251
265 c.Check(gozk.CountPendingWatches(), Equals, 1)252 c.Check(zk.CountPendingWatches(), Equals, 1)
266253
267 _, err := zk.Create("/test", "one", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))254 _, err := conn.Create("/test", "one", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
268 c.Assert(err, IsNil)255 c.Assert(err, IsNil)
269256
270 _, _, _, err = zk.GetW("/test")257 _, _, _, err = conn.GetW("/test")
271 c.Assert(err, IsNil)258 c.Assert(err, IsNil)
272259
273 c.Assert(gozk.CountPendingWatches(), Equals, 2)260 c.Assert(zk.CountPendingWatches(), Equals, 2)
274261
275 zk.Close()262 conn.Close()
276263
277 c.Assert(gozk.CountPendingWatches(), Equals, 0)264 c.Assert(zk.CountPendingWatches(), Equals, 0)
278}265}
279266
280// By default, the ZooKeeper C client will hang indefinitely if a267// By default, the ZooKeeper C client will hang indefinitely if a
281// handler is closed twice. We get in the way and prevent it.268// handler is closed twice. We get in the way and prevent it.
282func (s *S) TestClosingTwiceDoesntHang(c *C) {269func (s *S) TestClosingTwiceDoesntHang(c *C) {
283 zk, _ := s.init(c)270 conn, _ := s.init(c)
284 err := zk.Close()271 err := conn.Close()
285 c.Assert(err, IsNil)272 c.Assert(err, IsNil)
286 err = zk.Close()273 err = conn.Close()
287 c.Assert(err, NotNil)274 c.Assert(err, NotNil)
288 c.Assert(err.Code(), Equals, gozk.ZCLOSING)275 c.Assert(err, Equals, zk.ZCLOSING)
289}276}
290277
291func (s *S) TestChildren(c *C) {278func (s *S) TestChildren(c *C) {
292 zk, _ := s.init(c)279 conn, _ := s.init(c)
293280
294 children, stat, err := zk.Children("/")281 children, stat, err := conn.Children("/")
295 c.Assert(err, IsNil)282 c.Assert(err, IsNil)
296 c.Assert(children, Equals, []string{"zookeeper"})283 c.Assert(children, Equals, []string{"zookeeper"})
297 c.Assert(stat.NumChildren(), Equals, int32(1))284 c.Assert(stat.NumChildren(), Equals, int32(1))
298285
299 children, stat, err = zk.Children("/non-existent")286 children, stat, err = conn.Children("/non-existent")
300 c.Assert(err, NotNil)287 c.Assert(err, NotNil)
301 c.Assert(err.Code(), Equals, gozk.ZNONODE)288 c.Assert(err, Equals, zk.ZNONODE)
302 c.Assert(children, Equals, []string{})289 c.Assert(children, Equals, []string{})
303 c.Assert(stat, Equals, nil)290 c.Assert(stat, IsNil)
304}291}
305292
306func (s *S) TestChildrenAndWatch(c *C) {293func (s *S) TestChildrenAndWatch(c *C) {
307 c.Check(gozk.CountPendingWatches(), Equals, 0)294 c.Check(zk.CountPendingWatches(), Equals, 0)
308295
309 zk, _ := s.init(c)296 conn, _ := s.init(c)
310297
311 c.Check(gozk.CountPendingWatches(), Equals, 1)298 c.Check(zk.CountPendingWatches(), Equals, 1)
312299
313 children, stat, watch, err := zk.ChildrenW("/")300 children, stat, watch, err := conn.ChildrenW("/")
314 c.Assert(err, IsNil)301 c.Assert(err, IsNil)
315 c.Assert(children, Equals, []string{"zookeeper"})302 c.Assert(children, Equals, []string{"zookeeper"})
316 c.Assert(stat.NumChildren(), Equals, int32(1))303 c.Assert(stat.NumChildren(), Equals, int32(1))
@@ -321,18 +308,17 @@
321 default:308 default:
322 }309 }
323310
324 c.Check(gozk.CountPendingWatches(), Equals, 2)311 c.Check(zk.CountPendingWatches(), Equals, 2)
325312
326 _, err = zk.Create("/test1", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))313 _, err = conn.Create("/test1", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
327 c.Assert(err, IsNil)314 c.Assert(err, IsNil)
328315
329 event := <-watch316 event := <-watch
330 c.Assert(event.Type, Equals, gozk.EVENT_CHILD)317 c.Assert(event, Equals, zk.NodeChangedChild)
331 c.Assert(event.Path, Equals, "/")318
332319 c.Check(zk.CountPendingWatches(), Equals, 1)
333 c.Check(gozk.CountPendingWatches(), Equals, 1)320
334321 children, stat, watch, err = conn.ChildrenW("/")
335 children, stat, watch, err = zk.ChildrenW("/")
336 c.Assert(err, IsNil)322 c.Assert(err, IsNil)
337 c.Assert(stat.NumChildren(), Equals, int32(2))323 c.Assert(stat.NumChildren(), Equals, int32(2))
338324
@@ -345,57 +331,56 @@
345 default:331 default:
346 }332 }
347333
348 c.Check(gozk.CountPendingWatches(), Equals, 2)334 c.Check(zk.CountPendingWatches(), Equals, 2)
349335
350 _, err = zk.Create("/test2", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))336 _, err = conn.Create("/test2", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
351 c.Assert(err, IsNil)337 c.Assert(err, IsNil)
352338
353 event = <-watch339 event = <-watch
354 c.Assert(event.Type, Equals, gozk.EVENT_CHILD)340 c.Assert(event, Equals, zk.NodeChangedChild)
355341
356 c.Check(gozk.CountPendingWatches(), Equals, 1)342 c.Check(zk.CountPendingWatches(), Equals, 1)
357}343}
358344
359func (s *S) TestChildrenAndWatchWithError(c *C) {345func (s *S) TestChildrenAndWatchWithError(c *C) {
360 c.Check(gozk.CountPendingWatches(), Equals, 0)346 c.Check(zk.CountPendingWatches(), Equals, 0)
361347
362 zk, _ := s.init(c)348 conn, _ := s.init(c)
363349
364 c.Check(gozk.CountPendingWatches(), Equals, 1)350 c.Check(zk.CountPendingWatches(), Equals, 1)
365351
366 _, stat, watch, err := zk.ChildrenW("/test")352 _, stat, watch, err := conn.ChildrenW("/test")
367 c.Assert(err, NotNil)353 c.Assert(err, NotNil)
368 c.Assert(err.Code(), Equals, gozk.ZNONODE)354 c.Assert(err, Equals, zk.ZNONODE)
369 c.Assert(watch, IsNil)355 c.Assert(watch, IsNil)
370 c.Assert(stat, IsNil)356 c.Assert(stat, IsNil)
371357
372 c.Check(gozk.CountPendingWatches(), Equals, 1)358 c.Check(zk.CountPendingWatches(), Equals, 1)
373}359}
374360
375func (s *S) TestExists(c *C) {361func (s *S) TestExists(c *C) {
376 zk, _ := s.init(c)362 conn, _ := s.init(c)
377363
378 stat, err := zk.Exists("/zookeeper")364 stat, err := conn.Exists("/non-existent")
379 c.Assert(err, IsNil)
380 c.Assert(stat.NumChildren(), Equals, int32(1))
381
382 stat, err = zk.Exists("/non-existent")
383 c.Assert(err, IsNil)365 c.Assert(err, IsNil)
384 c.Assert(stat, IsNil)366 c.Assert(stat, IsNil)
367
368 stat, err = conn.Exists("/zookeeper")
369 c.Assert(err, IsNil)
385}370}
386371
387func (s *S) TestExistsAndWatch(c *C) {372func (s *S) TestExistsAndWatch(c *C) {
388 c.Check(gozk.CountPendingWatches(), Equals, 0)373 c.Check(zk.CountPendingWatches(), Equals, 0)
389374
390 zk, _ := s.init(c)375 conn, _ := s.init(c)
391376
392 c.Check(gozk.CountPendingWatches(), Equals, 1)377 c.Check(zk.CountPendingWatches(), Equals, 1)
393378
394 stat, watch, err := zk.ExistsW("/test")379 stat, watch, err := conn.ExistsW("/test")
395 c.Assert(err, IsNil)380 c.Assert(err, IsNil)
396 c.Assert(stat, IsNil)381 c.Assert(stat, IsNil)
397382
398 c.Check(gozk.CountPendingWatches(), Equals, 2)383 c.Check(zk.CountPendingWatches(), Equals, 2)
399384
400 select {385 select {
401 case <-watch:386 case <-watch:
@@ -403,62 +388,61 @@
 	default:
 	}

-	_, err = zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
+	_, err = conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
 	c.Assert(err, IsNil)

 	event := <-watch
-	c.Assert(event.Type, Equals, gozk.EVENT_CREATED)
-	c.Assert(event.Path, Equals, "/test")
-
-	c.Check(gozk.CountPendingWatches(), Equals, 1)
-
-	stat, watch, err = zk.ExistsW("/test")
+	c.Assert(event, Equals, zk.NodeCreated)
+
+	c.Check(zk.CountPendingWatches(), Equals, 1)
+
+	stat, watch, err = conn.ExistsW("/test")
 	c.Assert(err, IsNil)
 	c.Assert(stat, NotNil)
 	c.Assert(stat.NumChildren(), Equals, int32(0))

-	c.Check(gozk.CountPendingWatches(), Equals, 2)
+	c.Check(zk.CountPendingWatches(), Equals, 2)
 }

 func (s *S) TestExistsAndWatchWithError(c *C) {
-	c.Check(gozk.CountPendingWatches(), Equals, 0)
+	c.Check(zk.CountPendingWatches(), Equals, 0)

-	zk, _ := s.init(c)
+	conn, _ := s.init(c)

-	c.Check(gozk.CountPendingWatches(), Equals, 1)
+	c.Check(zk.CountPendingWatches(), Equals, 1)

-	stat, watch, err := zk.ExistsW("///")
+	stat, watch, err := conn.ExistsW("///")
 	c.Assert(err, NotNil)
-	c.Assert(err.Code(), Equals, gozk.ZBADARGUMENTS)
+	c.Assert(err, Equals, zk.ZBADARGUMENTS)
 	c.Assert(stat, IsNil)
 	c.Assert(watch, IsNil)

-	c.Check(gozk.CountPendingWatches(), Equals, 1)
+	c.Check(zk.CountPendingWatches(), Equals, 1)
 }

 func (s *S) TestDelete(c *C) {
-	zk, _ := s.init(c)
+	conn, _ := s.init(c)

-	_, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
+	_, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
 	c.Assert(err, IsNil)

-	err = zk.Delete("/test", 5)
+	err = conn.Delete("/test", 5)
 	c.Assert(err, NotNil)
-	c.Assert(err.Code(), Equals, gozk.ZBADVERSION)
+	c.Assert(err, Equals, zk.ZBADVERSION)

-	err = zk.Delete("/test", -1)
+	err = conn.Delete("/test", -1)
 	c.Assert(err, IsNil)

-	err = zk.Delete("/test", -1)
+	err = conn.Delete("/test", -1)
 	c.Assert(err, NotNil)
-	c.Assert(err.Code(), Equals, gozk.ZNONODE)
+	c.Assert(err, Equals, zk.ZNONODE)
 }

 func (s *S) TestClientIdAndReInit(c *C) {
 	zk1, _ := s.init(c)
 	clientId1 := zk1.ClientId()

-	zk2, _, err := gozk.ReInit(s.zkAddr, 5e9, clientId1)
+	zk2, _, err := zk.Redial(s.zkAddr, 5e9, clientId1)
 	c.Assert(err, IsNil)
 	defer zk2.Close()
 	clientId2 := zk2.ClientId()
@@ -469,110 +453,113 @@
 // Surprisingly for some (including myself, initially), the watch
 // returned by the exists method actually fires on data changes too.
 func (s *S) TestExistsWatchOnDataChange(c *C) {
-	zk, _ := s.init(c)
+	conn, _ := s.init(c)

-	_, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
+	_, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
 	c.Assert(err, IsNil)

-	_, watch, err := zk.ExistsW("/test")
+	_, watch, err := conn.ExistsW("/test")
 	c.Assert(err, IsNil)

-	_, err = zk.Set("/test", "new", -1)
+	_, err = conn.Set("/test", "new", -1)
 	c.Assert(err, IsNil)

 	event := <-watch

-	c.Assert(event.Path, Equals, "/test")
-	c.Assert(event.Type, Equals, gozk.EVENT_CHANGED)
+	c.Assert(event, Equals, zk.NodeChanged)
 }

 func (s *S) TestACL(c *C) {
-	zk, _ := s.init(c)
+	conn, _ := s.init(c)

-	_, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
+	_, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
 	c.Assert(err, IsNil)

-	acl, stat, err := zk.ACL("/test")
+	acl, stat, err := conn.ACL("/test")
 	c.Assert(err, IsNil)
-	c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_ALL))
+	c.Assert(acl, Equals, zk.WorldACL(zk.PERM_ALL))
 	c.Assert(stat, NotNil)
 	c.Assert(stat.Version(), Equals, int32(0))

-	acl, stat, err = zk.ACL("/non-existent")
+	acl, stat, err = conn.ACL("/non-existent")
 	c.Assert(err, NotNil)
-	c.Assert(err.Code(), Equals, gozk.ZNONODE)
+	c.Assert(err, Equals, zk.ZNONODE)
 	c.Assert(acl, IsNil)
 	c.Assert(stat, IsNil)
+	c.Log("finished TestACL")
 }

 func (s *S) TestSetACL(c *C) {
-	zk, _ := s.init(c)
+	conn, _ := s.init(c)

-	_, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
+	_, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
 	c.Assert(err, IsNil)

-	err = zk.SetACL("/test", gozk.WorldACL(gozk.PERM_ALL), 5)
+	err = conn.SetACL("/test", zk.WorldACL(zk.PERM_ALL), 5)
 	c.Assert(err, NotNil)
-	c.Assert(err.Code(), Equals, gozk.ZBADVERSION)
+	c.Assert(err, Equals, zk.ZBADVERSION)

-	err = zk.SetACL("/test", gozk.WorldACL(gozk.PERM_READ), -1)
+	err = conn.SetACL("/test", zk.WorldACL(zk.PERM_READ), -1)
 	c.Assert(err, IsNil)

-	acl, _, err := zk.ACL("/test")
+	acl, _, err := conn.ACL("/test")
 	c.Assert(err, IsNil)
-	c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_READ))
+	c.Assert(acl, Equals, zk.WorldACL(zk.PERM_READ))
 }

 func (s *S) TestAddAuth(c *C) {
-	zk, _ := s.init(c)
+	conn, _ := s.init(c)

-	acl := []gozk.ACL{{gozk.PERM_READ, "digest", "joe:enQcM3mIEHQx7IrPNStYBc0qfs8="}}
+	acl := []zk.ACL{{zk.PERM_READ, "digest", "joe:enQcM3mIEHQx7IrPNStYBc0qfs8="}}

-	_, err := zk.Create("/test", "", gozk.EPHEMERAL, acl)
+	_, err := conn.Create("/test", "", zk.EPHEMERAL, acl)
 	c.Assert(err, IsNil)

-	_, _, err = zk.Get("/test")
+	_, _, err = conn.Get("/test")
 	c.Assert(err, NotNil)
-	c.Assert(err.Code(), Equals, gozk.ZNOAUTH)
+	c.Assert(err, Equals, zk.ZNOAUTH)

-	err = zk.AddAuth("digest", "joe:passwd")
+	err = conn.AddAuth("digest", "joe:passwd")
 	c.Assert(err, IsNil)

-	_, _, err = zk.Get("/test")
+	_, _, err = conn.Get("/test")
 	c.Assert(err, IsNil)
 }

 func (s *S) TestWatchOnReconnection(c *C) {
-	c.Check(gozk.CountPendingWatches(), Equals, 0)
+	c.Check(zk.CountPendingWatches(), Equals, 0)

-	zk, session := s.init(c)
+	conn, session := s.init(c)

 	event := <-session
-	c.Assert(event.Type, Equals, gozk.EVENT_SESSION)
-	c.Assert(event.State, Equals, gozk.STATE_CONNECTED)
-
-	c.Check(gozk.CountPendingWatches(), Equals, 1)
-
-	stat, watch, err := zk.ExistsW("/test")
+	c.Assert(event, Equals, zk.SessionConnected)
+
+	c.Check(zk.CountPendingWatches(), Equals, 1)
+
+	stat, watch, err := conn.ExistsW("/test")
 	c.Assert(err, IsNil)
 	c.Assert(stat, IsNil)

-	c.Check(gozk.CountPendingWatches(), Equals, 2)
+	c.Check(zk.CountPendingWatches(), Equals, 2)

-	s.StopZK()
+	err = s.zkServer.Stop()
+	c.Assert(err, IsNil)
+
 	time.Sleep(2e9)
-	s.StartZK()
-
-	// The session channel should receive the reconnection notification,
+
+	err = s.zkServer.Start()
+	c.Assert(err, IsNil)
+
+	// The session channel should receive the reconnection notification.
 	select {
 	case event := <-session:
-		c.Assert(event.State, Equals, gozk.STATE_CONNECTING)
+		c.Assert(event, Equals, zk.SessionConnecting)
 	case <-time.After(3e9):
 		c.Fatal("Session watch didn't fire")
 	}
 	select {
 	case event := <-session:
-		c.Assert(event.State, Equals, gozk.STATE_CONNECTED)
+		c.Assert(event, Equals, zk.SessionConnected)
 	case <-time.After(3e9):
 		c.Fatal("Session watch didn't fire")
 	}
@@ -585,40 +572,38 @@
 	}

 	// And it should still work.
-	_, err = zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
+	_, err = conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
 	c.Assert(err, IsNil)

-	event = <-watch
-	c.Assert(event.Type, Equals, gozk.EVENT_CREATED)
-	c.Assert(event.Path, Equals, "/test")
+	nodeStatus := <-watch
+	c.Assert(nodeStatus, Equals, zk.NodeCreated)

-	c.Check(gozk.CountPendingWatches(), Equals, 1)
+	c.Check(zk.CountPendingWatches(), Equals, 1)
 }

 func (s *S) TestWatchOnSessionExpiration(c *C) {
-	c.Check(gozk.CountPendingWatches(), Equals, 0)
+	c.Check(zk.CountPendingWatches(), Equals, 0)

-	zk, session := s.init(c)
+	conn, session := s.init(c)

 	event := <-session
-	c.Assert(event.Type, Equals, gozk.EVENT_SESSION)
-	c.Assert(event.State, Equals, gozk.STATE_CONNECTED)
-
-	c.Check(gozk.CountPendingWatches(), Equals, 1)
-
-	stat, watch, err := zk.ExistsW("/test")
+	c.Assert(event, Equals, zk.SessionConnected)
+
+	c.Check(zk.CountPendingWatches(), Equals, 1)
+
+	stat, watch, err := conn.ExistsW("/test")
 	c.Assert(err, IsNil)
 	c.Assert(stat, IsNil)

-	c.Check(gozk.CountPendingWatches(), Equals, 2)
+	c.Check(zk.CountPendingWatches(), Equals, 2)

 	// Use expiration trick described in the FAQ.
-	clientId := zk.ClientId()
-	zk2, session2, err := gozk.ReInit(s.zkAddr, 5e9, clientId)
+	clientId := conn.ClientId()
+	zk2, session2, err := zk.Redial(s.zkAddr, 5e9, clientId)

 	for event := range session2 {
 		c.Log("Event from overlapping session: ", event)
-		if event.State == gozk.STATE_CONNECTED {
+		if event == zk.SessionConnected {
 			// Wait for zk to process the connection.
 			// Not reliable without this. :-(
 			time.Sleep(1e9)
@@ -627,21 +612,21 @@
 	}
 	for event := range session {
 		c.Log("Event from primary session: ", event)
-		if event.State == gozk.STATE_EXPIRED_SESSION {
+		if event == zk.SessionExpired {
 			break
 		}
 	}

 	select {
-	case event := <-watch:
-		c.Assert(event.State, Equals, gozk.STATE_EXPIRED_SESSION)
+	case event, ok := <-watch:
+		c.Assert(event, Equals, zk.NodeStatus(0))
+		c.Assert(ok, Equals, false)
 	case <-time.After(3e9):
 		c.Fatal("Watch event didn't fire")
 	}

-	event = <-watch
-	c.Assert(event.Type, Equals, gozk.EVENT_CLOSED)
-	c.Assert(event.State, Equals, gozk.STATE_CLOSED)
+	nodeStatus := <-watch
+	c.Assert(nodeStatus, Equals, zk.NodeStatus(0))

-	c.Check(gozk.CountPendingWatches(), Equals, 1)
+	c.Check(zk.CountPendingWatches(), Equals, 1)
 }
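
Note on the closed-watch assertions in TestWatchOnSessionExpiration: the new test relies on plain Go channel semantics, where a receive from a closed channel returns the zero value together with ok == false, and keeps doing so on every later receive. The sketch below shows only that behaviour. It does not use gozk; NodeStatus, its constant values, and waitNode are hypothetical stand-ins defined here for illustration.

```go
package main

import "fmt"

// NodeStatus is a hypothetical stand-in for zk.NodeStatus; the constant
// values below are assumptions made for this sketch, not gozk's own.
type NodeStatus int

const (
	NodeCreated NodeStatus = iota + 1
	NodeChanged
)

// waitNode (hypothetical helper) receives once from a watch channel and
// reports the status plus whether the channel had been closed.
func waitNode(watch <-chan NodeStatus) (NodeStatus, bool) {
	status, ok := <-watch
	return status, !ok
}

func main() {
	// A watch that fires normally delivers a real status.
	fired := make(chan NodeStatus, 1)
	fired <- NodeCreated
	status, closed := waitNode(fired)
	fmt.Println(status, closed) // 1 false

	// A watch closed without a value, as when the session goes away.
	dead := make(chan NodeStatus)
	close(dead)
	status, closed = waitNode(dead)
	fmt.Println(status, closed) // 0 true

	// Further receives on the closed channel behave the same way,
	// which is why the test can read from watch a second time.
	status, closed = waitNode(dead)
	fmt.Println(status, closed) // 0 true
}
```

A caller that needs to know why the watch was closed would, per the description of this change, consult the session channel rather than the node watch.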
