Merge lp:~rogpeppe/gozk/update-event-interface into lp:~juju/gozk/trunk

Proposed by Roger Peppe
Status: Work in progress
Proposed branch: lp:~rogpeppe/gozk/update-event-interface
Merge into: lp:~juju/gozk/trunk
Diff against target: 3594 lines (+1527/-1001)
12 files modified
Makefile (+6/-2)
example/example.go (+22/-23)
helpers.c (+2/-2)
reattach_test.go (+202/-0)
retry_test.go (+65/-74)
server.go (+239/-0)
service/Makefile (+20/-0)
service/service.go (+160/-0)
status.go (+116/-0)
suite_test.go (+56/-138)
zk.go (+381/-489)
zk_test.go (+258/-273)
To merge this branch: bzr merge lp:~rogpeppe/gozk/update-event-interface
Reviewer Review Type Date Requested Status
Gustavo Niemeyer Disapprove
Review via email: mp+77560@code.launchpad.net

Description of the change

Updated event interface and factored-out server-starting interface.

Event interface
-----------

The various functions returning event channels are changed
to return channels that more accurately reflect the kinds
of notifications that can happen on those channels.
This makes programming with the wait channels less error-prone
(no need to worry about the wrong kind of event arriving
on a wait channel) and more self-documenting.

The Event type is removed:
- the Type field is now redundant, as only one kind of event can arrive through any given channel.
- the Path field was always redundant, as it is always the same as the node that is being watched.
- the value of the State field is now sent to the session watcher.

When the server goes down, watch channels are simply closed.
I can't think of any scenario when a node watcher will do something
differently depending on how the server has failed, and if it wants
to, it can always interrogate whatever is watching the session notifications.
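
As a rough illustration of the closed-channel behaviour described above, here is a minimal sketch in present-day Go. The NodeStatus type, its constants and the waitNode helper are assumptions made for this example and are not taken from the branch. The constants start at 1 so that the zero value received from a closed channel cannot be mistaken for a real notification.

```go
package main

import "fmt"

// NodeStatus is a hypothetical stand-in for a node watch notification.
// The names here are illustrative only, not the branch's actual API.
type NodeStatus int

const (
	NodeCreated NodeStatus = iota + 1 // start at 1 so zero means "no event"
	NodeDeleted
	NodeChanged
)

// waitNode receives the single notification from a node watch channel.
// ok is false when the channel was closed without a notification, which is
// how this sketch models the server going down.
func waitNode(watch <-chan NodeStatus) (status NodeStatus, ok bool) {
	status, ok = <-watch
	return status, ok
}

func main() {
	// A watch that fires normally.
	fired := make(chan NodeStatus, 1)
	fired <- NodeChanged
	close(fired)
	s, ok := waitNode(fired)
	fmt.Println(s == NodeChanged, ok)

	// A watch whose server went away: closed with nothing sent.
	dead := make(chan NodeStatus)
	close(dead)
	s, ok = waitNode(dead)
	fmt.Println(s, ok)
}
```

A caller that needs to know why the channel was closed would, under this design, consult whatever is watching the session channel rather than the node watch itself.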

Likely points of controversy:

- the spelling of the new constants. I chose mixed case rather than
all-caps - YMMV.

- I'd like to think of a better spelling for event_CREATED etc.,
now that those constants no longer need exporting.

- use of explicit zero rather than having a zero constant defined
for each type. I *think* that it's less confusing and simpler this
way than having NodeClosed (NodeDown), ChildClosed etc.
Can a session ever go down permanently?

Server-starting interface
-------------------

I realised that server.go has increased significantly in size and
that all the new code was logically independent of ZooKeeper itself.
This inspired me to factor out the code into a new package (currently
gozk/zk/service), which will potentially enable similar server packages
at little cost. There are a number of other enhancements that can now
be made to this package without increasing the zk package's complexity.

I think the new packaging works well, but it's arguable whether ZooKeeper
clients should use the service package directly or whether zk should
return *service.Service itself.

One specific advantage of the former (in addition to reusability and
nicer modularity) is that ZooKeeper clients can query or set aspects
of the zk.Server; the advantage of the latter is that it saves a line of
code and spares clients from having to know about another package.
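
To make the trade-off concrete, here is a minimal sketch in present-day Go of the first option, where the client wraps the server in a service itself. Runner, Service, CommandLine and the toy Server type are all assumptions made for this example; they only loosely mirror the shapes visible in the diff and are not the branch's actual definitions.

```go
package main

import (
	"fmt"
	"strings"
)

// Runner is a hypothetical interface that a generic service package might
// require of any server it manages.
type Runner interface {
	Directory() string
	Command() ([]string, error)
}

// Service is a hypothetical generic wrapper. A real one would start and
// stop the process; that part is omitted here.
type Service struct{ srv Runner }

// New wraps any Runner in a Service.
func New(srv Runner) *Service { return &Service{srv: srv} }

// CommandLine returns the command the service would run, as one string.
func (s *Service) CommandLine() (string, error) {
	args, err := s.srv.Command()
	if err != nil {
		return "", err
	}
	return strings.Join(args, " "), nil
}

// Server is a toy stand-in for a ZooKeeper server description.
type Server struct {
	runDir string
	port   int
}

func (srv *Server) Directory() string { return srv.runDir }
func (srv *Server) NetworkPort() int  { return srv.port }
func (srv *Server) Command() ([]string, error) {
	return []string{"java", "-cp", "zookeeper.jar", srv.runDir + "/zoo.cfg"}, nil
}

func main() {
	srv := &Server{runDir: "/tmp/zk", port: 2181}
	svc := New(srv) // the extra line the client has to write
	line, err := svc.CommandLine()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(line)
	// The client still holds the concrete server and can query it directly.
	fmt.Println(srv.NetworkPort())
}
```

In the second option the client would receive only the wrapped service, so the call to New disappears from client code, but server-specific methods such as NetworkPort are no longer directly reachable.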

YMMV as always.

21. By Roger Peppe

Rewrite Event comment.

Gustavo Niemeyer (niemeyer) wrote :

2692 - event := Event{
2693 - Type: int(data.event_type),
2694 - Path: C.GoString(data.event_path),
2695 - State: int(data.connection_state),
2696 - }

The key problem with this proposal is the trashing of information coming
from the server. It's discarding not only why e.g. a state is being
trashed, but also details like the path which is related to an event.
This path isn't just being returned from the path that called the watch.
It's actually part of the wire protocol, and comes from the server side.

Trashing both the session state error information and the paths like that
is not a good idea.

review: Disapprove
22. By Roger Peppe

merged ChildStatus into NodeStatus.

23. By Roger Peppe

gofmt

24. By Roger Peppe

Fixed comments. Added LOG_NONE.

Roger Peppe (rogpeppe) wrote :

On 29 September 2011 18:56, Gustavo Niemeyer <email address hidden> wrote:
> Review: Disapprove
>
>
> 2692    -               event := Event{
> 2693    -                       Type:  int(data.event_type),
> 2694    -                       Path:  C.GoString(data.event_path),
> 2695    -                       State: int(data.connection_state),
> 2696    -               }
>
> The key problem with this proposal is the trashing of information coming
> from the server. It's discarding not only why e.g. a state is being
> trashed, but also details like the path which is related to an event.
> This path isn't just being returned from the path that called the watch.
> It's actually part of the wire protocol, and comes from the server side.
>
> Trashing both the session state error information and the paths like that
> is not a good idea.

there is no information in the paths. they are always the same as the path
of the request. if this ever changes, then i'd fully support adding the path,
but as it is Event.Path gives a misleading impression (for instance that Path
might name a child that has been added).

we are trashing no information from the server. if we are then we
should make sure that we panic immediately because the server is doing
something unexpected. [i've just added the requisite check].
the exact specification, that is, what callbacks may be made for a given
API call, is unclear because the explanation seems to be omitted in
the ZooKeeper manual itself. i think we owe it to gozk's users to provide
a better defined interface, as you will need to know the details of
the interface to use the API, and so people will rely on the
undocumented behaviour anyway.
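
A minimal sketch, in present-day Go, of the kind of check mentioned above: if the path delivered with a watch event ever differs from the path that was watched, fail loudly instead of silently dropping it. The rawEvent type and both helper names are assumptions made for this example and are not the code added in the branch.

```go
package main

import "fmt"

// rawEvent mirrors the three fields of the removed Event type.
type rawEvent struct {
	Type  int
	Path  string
	State int
}

// checkWatchPath returns an error if the event names a different path
// from the one the watch was set on.
func checkWatchPath(watched string, ev rawEvent) error {
	if ev.Path != watched {
		return fmt.Errorf("watch on %q delivered event for %q", watched, ev.Path)
	}
	return nil
}

// mustMatchWatchPath panics when the assumption "event path == watched
// path" is violated, so unexpected server behaviour is noticed at once.
func mustMatchWatchPath(watched string, ev rawEvent) {
	if err := checkWatchPath(watched, ev); err != nil {
		panic(err)
	}
}

func main() {
	// Type and State values are arbitrary placeholders.
	ev := rawEvent{Type: 1, Path: "/counter", State: 3}
	mustMatchWatchPath("/counter", ev)
	fmt.Println("event path matches the watched node")
}
```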

PTAL. the latest version has only two kinds of events - node changed events
and session status events. i really think this interface makes sense.

please reconsider.

Gustavo Niemeyer (niemeyer) wrote :

Let's have a single thread on this topic, please.

As I answered to your email:

>> The point isn't C or Go.. the issue is simply that there's real
>> information coming through the pipe being trashed in the suggested
>> interface.
>
> there really isn't.

The protocol provides three details to the watch. Two of them are
being dropped on the floor in your branch and not reaching the event
listener. If you disagree with this we won't be able to agree on
anything else around that change.

Unmerged revisions

24. By Roger Peppe

Fixed comments. Added LOG_NONE.

23. By Roger Peppe

gofmt

22. By Roger Peppe

merged ChildStatus into NodeStatus.

21. By Roger Peppe

Rewrite Event comment.

20. By Roger Peppe

merged changes to rename branch.

Preview Diff

1=== modified file 'Makefile'
2--- Makefile 2011-08-03 01:56:58 +0000
3+++ Makefile 2011-09-30 13:35:26 +0000
4@@ -2,10 +2,14 @@
5
6 all: package
7
8-TARG=gozk
9+TARG=launchpad.net/gozk/zk
10
11+GOFILES=\
12+ server.go\
13+ status.go\
14+
15 CGOFILES=\
16- gozk.go\
17+ zk.go\
18
19 CGO_OFILES=\
20 helpers.o\
21
22=== added directory 'example'
23=== renamed file 'example.go' => 'example/example.go'
24--- example.go 2011-08-03 01:47:25 +0000
25+++ example/example.go 2011-09-30 13:35:26 +0000
26@@ -1,30 +1,29 @@
27 package main
28
29 import (
30- "gozk"
31+ "launchpad.net/gozk/zk"
32 )
33
34 func main() {
35- zk, session, err := gozk.Init("localhost:2181", 5000)
36- if err != nil {
37- println("Couldn't connect: " + err.String())
38- return
39- }
40-
41- defer zk.Close()
42-
43- // Wait for connection.
44- event := <-session
45- if event.State != gozk.STATE_CONNECTED {
46- println("Couldn't connect")
47- return
48- }
49-
50- _, err = zk.Create("/counter", "0", 0, gozk.WorldACL(gozk.PERM_ALL))
51- if err != nil {
52- println(err.String())
53- } else {
54- println("Created!")
55- }
56+ conn, session, err := zk.Dial("localhost:2181", 5e9)
57+ if err != nil {
58+ println("Couldn't connect: " + err.String())
59+ return
60+ }
61+
62+ defer conn.Close()
63+
64+ // Wait for connection.
65+ event := <-session
66+ if event != zk.SessionConnected {
67+ println("Couldn't connect")
68+ return
69+ }
70+
71+ _, err = conn.Create("/counter", "0", 0, zk.WorldACL(zk.PERM_ALL))
72+ if err != nil {
73+ println(err.String())
74+ } else {
75+ println("Created!")
76+ }
77 }
78-
79
80=== modified file 'helpers.c'
81--- helpers.c 2011-01-11 12:58:26 +0000
82+++ helpers.c 2011-09-30 13:35:26 +0000
83@@ -4,7 +4,6 @@
84 #include <string.h>
85 #include "helpers.h"
86
87-
88 static pthread_mutex_t watch_mutex = PTHREAD_MUTEX_INITIALIZER;
89 static pthread_cond_t watch_available = PTHREAD_COND_INITIALIZER;
90
91@@ -38,8 +37,9 @@
92 pthread_mutex_lock(&watch_mutex);
93 {
94 watch_data *data = malloc(sizeof(watch_data)); // XXX Check data.
95+
96+ data->event_type = event_type;
97 data->connection_state = connection_state;
98- data->event_type = event_type;
99 data->event_path = strdup(event_path); // XXX Check event_path.
100 data->watch_context = watch_context;
101 data->next = NULL;
102
103=== added file 'reattach_test.go'
104--- reattach_test.go 1970-01-01 00:00:00 +0000
105+++ reattach_test.go 2011-09-30 13:35:26 +0000
106@@ -0,0 +1,202 @@
107+package zk_test
108+
109+import (
110+ "bufio"
111+ . "launchpad.net/gocheck"
112+ "launchpad.net/gozk/zk/service"
113+ "launchpad.net/gozk/zk"
114+ "exec"
115+ "flag"
116+ "fmt"
117+ "os"
118+ "strings"
119+ "testing"
120+ "time"
121+)
122+
123+var reattach = flag.Bool("zktest.reattach", false, "internal flag used for testing")
124+var reattachRunDir = flag.String("zktest.rundir", "", "internal flag used for testing")
125+var reattachAbnormalStop = flag.Bool("zktest.stop", false, "internal flag used for testing")
126+
127+// This is the reentrancy point for testing ZooKeeper servers
128+// started by processes that are not direct children of the
129+// testing process. This test always succeeds - the status
130+// will be written to stdout and read by indirectServer.
131+func TestStartNonChildServer(t *testing.T) {
132+ if !*reattach {
133+ // not re-entrant, so ignore this test.
134+ return
135+ }
136+ err := startServer(*reattachRunDir, *reattachAbnormalStop)
137+ if err != nil {
138+ fmt.Printf("error:%v\n", err)
139+ return
140+ }
141+ fmt.Printf("done\n")
142+}
143+
144+func (s *S) startServer(c *C, abort bool) {
145+ err := startServer(s.zkTestRoot, abort)
146+ c.Assert(err, IsNil)
147+}
148+
149+// startServerIndirect starts a ZooKeeper server that is not
150+// a direct child of the current process. If abort is true,
151+// the server will be started and then terminated abnormally.
152+func (s *S) startServerIndirect(c *C, abort bool) {
153+ if len(os.Args) == 0 {
154+ c.Fatal("Cannot find self executable name")
155+ }
156+ cmd := exec.Command(
157+ os.Args[0],
158+ "-zktest.reattach",
159+ "-zktest.rundir", s.zkTestRoot,
160+ "-zktest.stop=" + fmt.Sprint(abort),
161+ "-test.run", "StartNonChildServer",
162+ )
163+ r, err := cmd.StdoutPipe()
164+ c.Assert(err, IsNil)
165+ defer r.Close()
166+ if err := cmd.Start(); err != nil {
167+ c.Fatalf("cannot start re-entrant gotest process: %v", err)
168+ }
169+ defer cmd.Wait()
170+ bio := bufio.NewReader(r)
171+ for {
172+ line, err := bio.ReadSlice('\n')
173+ if err != nil {
174+ c.Fatalf("indirect server status line not found: %v", err)
175+ }
176+ s := string(line)
177+ if strings.HasPrefix(s, "error:") {
178+ c.Fatalf("indirect server error: %s", s[len("error:"):])
179+ }
180+ if s == "done\n" {
181+ return
182+ }
183+ }
184+ panic("not reached")
185+}
186+
187+// startServer starts a ZooKeeper server, and terminates it abnormally
188+// if abort is true.
189+func startServer(runDir string, abort bool) os.Error {
190+ s, err := zk.AttachServer(runDir)
191+ if err != nil {
192+ return fmt.Errorf("cannot attach to server at %q: %v", runDir, err)
193+ }
194+ srv := service.New(s)
195+ if err := srv.Start(); err != nil {
196+ return fmt.Errorf("cannot start server: %v", err)
197+ }
198+ if abort {
199+ // Give it time to start up, then kill the server process abnormally,
200+ // leaving the pid.txt file behind.
201+ time.Sleep(0.5e9)
202+ p, err := srv.Process()
203+ if err != nil {
204+ return fmt.Errorf("cannot get server process: %v", err)
205+ }
206+ defer p.Release()
207+ if err := p.Kill(); err != nil {
208+ return fmt.Errorf("cannot kill server process: %v", err)
209+ }
210+ }
211+ return nil
212+}
213+
214+func (s *S) checkCookie(c *C) {
215+ conn, watch, err := zk.Dial(s.zkAddr, 5e9)
216+ c.Assert(err, IsNil)
217+
218+ e, ok := <-watch
219+ c.Assert(ok, Equals, true)
220+ c.Assert(e.Ok(), Equals, true)
221+
222+ c.Assert(err, IsNil)
223+ cookie, _, err := conn.Get("/testAttachCookie")
224+ c.Assert(err, IsNil)
225+ c.Assert(cookie, Equals, "testAttachCookie")
226+ conn.Close()
227+}
228+
229+// cases to test:
230+// child server, stopped normally; reattach, start
231+// non-direct child server, killed abnormally; reattach, start (->error), remove pid.txt; start
232+// non-direct child server, still running; reattach, start (->error), stop, start
233+// child server, still running; reattach, start (-> error)
234+// child server, still running; reattach, stop, start.
235+// non-direct child server, still running; reattach, stop, start.
236+func (s *S) TestAttachServer(c *C) {
237+ // Create a cookie so that we know we are reattaching to the same instance.
238+ conn, _ := s.init(c)
239+ _, err := conn.Create("/testAttachCookie", "testAttachCookie", 0, zk.WorldACL(zk.PERM_ALL))
240+ c.Assert(err, IsNil)
241+ s.checkCookie(c)
242+ s.zkServer.Stop()
243+ s.zkServer = nil
244+
245+ s.testAttachServer(c, (*S).startServer)
246+ s.testAttachServer(c, (*S).startServerIndirect)
247+ s.testAttachServerAbnormalTerminate(c, (*S).startServer)
248+ s.testAttachServerAbnormalTerminate(c, (*S).startServerIndirect)
249+
250+ srv, err := zk.AttachServer(s.zkTestRoot)
251+ c.Assert(err, IsNil)
252+
253+ s.zkServer = service.New(srv)
254+ err = s.zkServer.Start()
255+ c.Assert(err, IsNil)
256+
257+ conn, _ = s.init(c)
258+ err = conn.Delete("/testAttachCookie", -1)
259+ c.Assert(err, IsNil)
260+}
261+
262+func (s *S) testAttachServer(c *C, start func(*S, *C, bool)) {
263+ start(s, c, false)
264+
265+ s.checkCookie(c)
266+
267+ // try attaching to it while it is still running - it should fail.
268+ a, err := zk.AttachServer(s.zkTestRoot)
269+ c.Assert(err, IsNil)
270+ srv := service.New(a)
271+
272+ err = srv.Start()
273+ c.Assert(err, NotNil)
274+
275+ // stop it and then start it again - it should succeed.
276+ err = srv.Stop()
277+ c.Assert(err, IsNil)
278+
279+ err = srv.Start()
280+ c.Assert(err, IsNil)
281+
282+ s.checkCookie(c)
283+
284+ err = srv.Stop()
285+ c.Assert(err, IsNil)
286+}
287+
288+func (s *S) testAttachServerAbnormalTerminate(c *C, start func(*S, *C, bool)) {
289+ start(s, c, true)
290+
291+ // try attaching to it and starting - it should fail, because pid.txt
292+ // won't have been removed.
293+ a, err := zk.AttachServer(s.zkTestRoot)
294+ c.Assert(err, IsNil)
295+ srv := service.New(a)
296+ err = srv.Start()
297+ c.Assert(err, NotNil)
298+
299+ // stopping it should bring things back to normal.
300+ err = srv.Stop()
301+ c.Assert(err, IsNil)
302+ err = srv.Start()
303+ c.Assert(err, IsNil)
304+
305+ s.checkCookie(c)
306+ err = srv.Stop()
307+ c.Assert(err, IsNil)
308+}
309
310=== modified file 'retry_test.go'
311--- retry_test.go 2011-08-19 01:51:37 +0000
312+++ retry_test.go 2011-09-30 13:35:26 +0000
313@@ -1,42 +1,41 @@
314-package gozk_test
315+package zk_test
316
317 import (
318 . "launchpad.net/gocheck"
319- "gozk"
320+ "launchpad.net/gozk/zk"
321 "os"
322 )
323
324 func (s *S) TestRetryChangeCreating(c *C) {
325- zk, _ := s.init(c)
326+ conn, _ := s.init(c)
327
328- err := zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL),
329- func(data string, stat gozk.Stat) (string, os.Error) {
330+ err := conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL),
331+ func(data string, stat *zk.Stat) (string, os.Error) {
332 c.Assert(data, Equals, "")
333 c.Assert(stat, IsNil)
334 return "new", nil
335 })
336 c.Assert(err, IsNil)
337
338- data, stat, err := zk.Get("/test")
339+ data, stat, err := conn.Get("/test")
340 c.Assert(err, IsNil)
341 c.Assert(stat, NotNil)
342 c.Assert(stat.Version(), Equals, int32(0))
343 c.Assert(data, Equals, "new")
344
345- acl, _, err := zk.ACL("/test")
346+ acl, _, err := conn.ACL("/test")
347 c.Assert(err, IsNil)
348- c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_ALL))
349+ c.Assert(acl, Equals, zk.WorldACL(zk.PERM_ALL))
350 }
351
352 func (s *S) TestRetryChangeSetting(c *C) {
353- zk, _ := s.init(c)
354+ conn, _ := s.init(c)
355
356- _, err := zk.Create("/test", "old", gozk.EPHEMERAL,
357- gozk.WorldACL(gozk.PERM_ALL))
358+ _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
359 c.Assert(err, IsNil)
360
361- err = zk.RetryChange("/test", gozk.EPHEMERAL, []gozk.ACL{},
362- func(data string, stat gozk.Stat) (string, os.Error) {
363+ err = conn.RetryChange("/test", zk.EPHEMERAL, []zk.ACL{},
364+ func(data string, stat *zk.Stat) (string, os.Error) {
365 c.Assert(data, Equals, "old")
366 c.Assert(stat, NotNil)
367 c.Assert(stat.Version(), Equals, int32(0))
368@@ -44,27 +43,26 @@
369 })
370 c.Assert(err, IsNil)
371
372- data, stat, err := zk.Get("/test")
373+ data, stat, err := conn.Get("/test")
374 c.Assert(err, IsNil)
375 c.Assert(stat, NotNil)
376 c.Assert(stat.Version(), Equals, int32(1))
377 c.Assert(data, Equals, "brand new")
378
379 // ACL was unchanged by RetryChange().
380- acl, _, err := zk.ACL("/test")
381+ acl, _, err := conn.ACL("/test")
382 c.Assert(err, IsNil)
383- c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_ALL))
384+ c.Assert(acl, Equals, zk.WorldACL(zk.PERM_ALL))
385 }
386
387 func (s *S) TestRetryChangeUnchangedValueDoesNothing(c *C) {
388- zk, _ := s.init(c)
389+ conn, _ := s.init(c)
390
391- _, err := zk.Create("/test", "old", gozk.EPHEMERAL,
392- gozk.WorldACL(gozk.PERM_ALL))
393+ _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
394 c.Assert(err, IsNil)
395
396- err = zk.RetryChange("/test", gozk.EPHEMERAL, []gozk.ACL{},
397- func(data string, stat gozk.Stat) (string, os.Error) {
398+ err = conn.RetryChange("/test", zk.EPHEMERAL, []zk.ACL{},
399+ func(data string, stat *zk.Stat) (string, os.Error) {
400 c.Assert(data, Equals, "old")
401 c.Assert(stat, NotNil)
402 c.Assert(stat.Version(), Equals, int32(0))
403@@ -72,7 +70,7 @@
404 })
405 c.Assert(err, IsNil)
406
407- data, stat, err := zk.Get("/test")
408+ data, stat, err := conn.Get("/test")
409 c.Assert(err, IsNil)
410 c.Assert(stat, NotNil)
411 c.Assert(stat.Version(), Equals, int32(0)) // Unchanged!
412@@ -80,14 +78,14 @@
413 }
414
415 func (s *S) TestRetryChangeConflictOnCreate(c *C) {
416- zk, _ := s.init(c)
417+ conn, _ := s.init(c)
418
419- changeFunc := func(data string, stat gozk.Stat) (string, os.Error) {
420+ changeFunc := func(data string, stat *zk.Stat) (string, os.Error) {
421 switch data {
422 case "":
423 c.Assert(stat, IsNil)
424- _, err := zk.Create("/test", "conflict", gozk.EPHEMERAL,
425- gozk.WorldACL(gozk.PERM_ALL))
426+ _, err := conn.Create("/test", "conflict", zk.EPHEMERAL,
427+ zk.WorldACL(zk.PERM_ALL))
428 c.Assert(err, IsNil)
429 return "<none> => conflict", nil
430 case "conflict":
431@@ -100,11 +98,10 @@
432 return "can't happen", nil
433 }
434
435- err := zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL),
436- changeFunc)
437+ err := conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL), changeFunc)
438 c.Assert(err, IsNil)
439
440- data, stat, err := zk.Get("/test")
441+ data, stat, err := conn.Get("/test")
442 c.Assert(err, IsNil)
443 c.Assert(data, Equals, "conflict => new")
444 c.Assert(stat, NotNil)
445@@ -112,18 +109,17 @@
446 }
447
448 func (s *S) TestRetryChangeConflictOnSetDueToChange(c *C) {
449- zk, _ := s.init(c)
450+ conn, _ := s.init(c)
451
452- _, err := zk.Create("/test", "old", gozk.EPHEMERAL,
453- gozk.WorldACL(gozk.PERM_ALL))
454+ _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
455 c.Assert(err, IsNil)
456
457- changeFunc := func(data string, stat gozk.Stat) (string, os.Error) {
458+ changeFunc := func(data string, stat *zk.Stat) (string, os.Error) {
459 switch data {
460 case "old":
461 c.Assert(stat, NotNil)
462 c.Assert(stat.Version(), Equals, int32(0))
463- _, err := zk.Set("/test", "conflict", 0)
464+ _, err := conn.Set("/test", "conflict", 0)
465 c.Assert(err, IsNil)
466 return "old => new", nil
467 case "conflict":
468@@ -136,10 +132,10 @@
469 return "can't happen", nil
470 }
471
472- err = zk.RetryChange("/test", gozk.EPHEMERAL, []gozk.ACL{}, changeFunc)
473+ err = conn.RetryChange("/test", zk.EPHEMERAL, []zk.ACL{}, changeFunc)
474 c.Assert(err, IsNil)
475
476- data, stat, err := zk.Get("/test")
477+ data, stat, err := conn.Get("/test")
478 c.Assert(err, IsNil)
479 c.Assert(data, Equals, "conflict => new")
480 c.Assert(stat, NotNil)
481@@ -147,18 +143,17 @@
482 }
483
484 func (s *S) TestRetryChangeConflictOnSetDueToDelete(c *C) {
485- zk, _ := s.init(c)
486+ conn, _ := s.init(c)
487
488- _, err := zk.Create("/test", "old", gozk.EPHEMERAL,
489- gozk.WorldACL(gozk.PERM_ALL))
490+ _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
491 c.Assert(err, IsNil)
492
493- changeFunc := func(data string, stat gozk.Stat) (string, os.Error) {
494+ changeFunc := func(data string, stat *zk.Stat) (string, os.Error) {
495 switch data {
496 case "old":
497 c.Assert(stat, NotNil)
498 c.Assert(stat.Version(), Equals, int32(0))
499- err := zk.Delete("/test", 0)
500+ err := conn.Delete("/test", 0)
501 c.Assert(err, IsNil)
502 return "old => <deleted>", nil
503 case "":
504@@ -170,55 +165,53 @@
505 return "can't happen", nil
506 }
507
508- err = zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_READ),
509- changeFunc)
510+ err = conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_READ), changeFunc)
511 c.Assert(err, IsNil)
512
513- data, stat, err := zk.Get("/test")
514+ data, stat, err := conn.Get("/test")
515 c.Assert(err, IsNil)
516 c.Assert(data, Equals, "<deleted> => new")
517 c.Assert(stat, NotNil)
518 c.Assert(stat.Version(), Equals, int32(0))
519
520 // Should be the new ACL.
521- acl, _, err := zk.ACL("/test")
522+ acl, _, err := conn.ACL("/test")
523 c.Assert(err, IsNil)
524- c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_READ))
525+ c.Assert(acl, Equals, zk.WorldACL(zk.PERM_READ))
526 }
527
528 func (s *S) TestRetryChangeErrorInCallback(c *C) {
529- zk, _ := s.init(c)
530+ conn, _ := s.init(c)
531
532- err := zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL),
533- func(data string, stat gozk.Stat) (string, os.Error) {
534+ err := conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL),
535+ func(data string, stat *zk.Stat) (string, os.Error) {
536 return "don't use this", os.NewError("BOOM!")
537 })
538 c.Assert(err, NotNil)
539- c.Assert(err.Code(), Equals, gozk.ZSYSTEMERROR)
540 c.Assert(err.String(), Equals, "BOOM!")
541
542- stat, err := zk.Exists("/test")
543+ stat, err := conn.Exists("/test")
544 c.Assert(err, IsNil)
545 c.Assert(stat, IsNil)
546 }
547
548 func (s *S) TestRetryChangeFailsReading(c *C) {
549- zk, _ := s.init(c)
550+ conn, _ := s.init(c)
551
552- _, err := zk.Create("/test", "old", gozk.EPHEMERAL,
553- gozk.WorldACL(gozk.PERM_WRITE)) // Write only!
554+ // Write only!
555+ _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_WRITE))
556 c.Assert(err, IsNil)
557
558 var called bool
559- err = zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL),
560- func(data string, stat gozk.Stat) (string, os.Error) {
561+ err = conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL),
562+ func(data string, stat *zk.Stat) (string, os.Error) {
563 called = true
564 return "", nil
565 })
566 c.Assert(err, NotNil)
567- c.Assert(err.Code(), Equals, gozk.ZNOAUTH)
568+ c.Assert(err, Equals, zk.ZNOAUTH)
569
570- stat, err := zk.Exists("/test")
571+ stat, err := conn.Exists("/test")
572 c.Assert(err, IsNil)
573 c.Assert(stat, NotNil)
574 c.Assert(stat.Version(), Equals, int32(0))
575@@ -227,22 +220,21 @@
576 }
577
578 func (s *S) TestRetryChangeFailsSetting(c *C) {
579- zk, _ := s.init(c)
580+ conn, _ := s.init(c)
581
582- _, err := zk.Create("/test", "old", gozk.EPHEMERAL,
583- gozk.WorldACL(gozk.PERM_READ)) // Read only!
584+ // Read only!
585+ _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_READ))
586 c.Assert(err, IsNil)
587
588 var called bool
589- err = zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL),
590- func(data string, stat gozk.Stat) (string, os.Error) {
591+ err = conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL),
592+ func(data string, stat *zk.Stat) (string, os.Error) {
593 called = true
594 return "", nil
595 })
596- c.Assert(err, NotNil)
597- c.Assert(err.Code(), Equals, gozk.ZNOAUTH)
598+ c.Assert(err, Equals, zk.ZNOAUTH)
599
600- stat, err := zk.Exists("/test")
601+ stat, err := conn.Exists("/test")
602 c.Assert(err, IsNil)
603 c.Assert(stat, NotNil)
604 c.Assert(stat.Version(), Equals, int32(0))
605@@ -251,23 +243,22 @@
606 }
607
608 func (s *S) TestRetryChangeFailsCreating(c *C) {
609- zk, _ := s.init(c)
610+ conn, _ := s.init(c)
611
612- _, err := zk.Create("/test", "old", gozk.EPHEMERAL,
613- gozk.WorldACL(gozk.PERM_READ)) // Read only!
614+ // Read only!
615+ _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_READ))
616 c.Assert(err, IsNil)
617
618 var called bool
619- err = zk.RetryChange("/test/sub", gozk.EPHEMERAL,
620- gozk.WorldACL(gozk.PERM_ALL),
621- func(data string, stat gozk.Stat) (string, os.Error) {
622+ err = conn.RetryChange("/test/sub", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL),
623+ func(data string, stat *zk.Stat) (string, os.Error) {
624 called = true
625 return "", nil
626 })
627 c.Assert(err, NotNil)
628- c.Assert(err.Code(), Equals, gozk.ZNOAUTH)
629+ c.Assert(err, Equals, zk.ZNOAUTH)
630
631- stat, err := zk.Exists("/test/sub")
632+ stat, err := conn.Exists("/test/sub")
633 c.Assert(err, IsNil)
634 c.Assert(stat, IsNil)
635
636
637=== added file 'server.go'
638--- server.go 1970-01-01 00:00:00 +0000
639+++ server.go 2011-09-30 13:35:26 +0000
640@@ -0,0 +1,239 @@
641+package zk
642+
643+import (
644+ "bufio"
645+ "bytes"
646+ "fmt"
647+ "io/ioutil"
648+ "net"
649+ "os"
650+ "path/filepath"
651+ "strings"
652+)
653+
654+// Server represents a ZooKeeper server, its data and configuration files.
655+type Server struct {
656+ runDir string
657+ installDir string
658+}
659+
660+func (srv *Server) Directory() string {
661+ return srv.runDir
662+}
663+
664+// CreateServer creates the directory runDir and sets up a ZooKeeper server
665+// environment inside it. It is an error if runDir already exists.
666+// The server will listen on the specified TCP port.
667+//
668+// The ZooKeeper installation directory is specified by installDir.
669+// If this is empty, a system default will be used.
670+//
671+// CreateServer does not start the server. The returned *Server
672+// can be started using the service package; see
673+// launchpad.net/gozk/zk/service.
674+func CreateServer(port int, runDir, installDir string) (*Server, os.Error) {
675+ if err := os.Mkdir(runDir, 0777); err != nil {
676+ return nil, err
677+ }
678+ srv := &Server{runDir: runDir, installDir: installDir}
679+ if err := srv.writeLog4JConfig(); err != nil {
680+ return nil, err
681+ }
682+ if err := srv.writeZooKeeperConfig(port); err != nil {
683+ return nil, err
684+ }
685+ if err := srv.writeInstallDir(); err != nil {
686+ return nil, err
687+ }
688+ return srv, nil
689+}
690+
691+// AttachServer creates a new ZooKeeper Server instance
692+// to operate inside an existing run directory, runDir.
693+// The directory must have been created with CreateServer.
694+func AttachServer(runDir string) (*Server, os.Error) {
695+ srv := &Server{runDir: runDir}
696+ if err := srv.readInstallDir(); err != nil {
697+ return nil, fmt.Errorf("cannot read server install directory: %v", err)
698+ }
699+ return srv, nil
700+}
701+
702+func (srv *Server) path(name string) string {
703+ return filepath.Join(srv.runDir, name)
704+}
705+
706+func (srv *Server) CheckAvailability() os.Error {
707+ port, err := srv.NetworkPort()
708+ if err != nil {
709+ return fmt.Errorf("cannot get network port: %v", err)
710+ }
711+ l, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port))
712+ if err != nil {
713+ return fmt.Errorf("cannot listen on port %v: %v", port, err)
714+ }
715+ l.Close()
716+ return nil
717+}
718+
719+// Command returns the command used to start the
720+// ZooKeeper server. It is provided for debugging and testing
721+// purposes only.
722+func (srv *Server) Command() ([]string, os.Error) {
723+ cp, err := srv.classPath()
724+ if err != nil {
725+ return nil, fmt.Errorf("cannot get class path: %v", err)
726+ }
727+ return []string{
728+ "java",
729+ "-cp", strings.Join(cp, ":"),
730+ "-Dzookeeper.root.logger=INFO,CONSOLE",
731+ "-Dlog4j.configuration=file:" + srv.path("log4j.properties"),
732+ "org.apache.zookeeper.server.quorum.QuorumPeerMain",
733+ srv.path("zoo.cfg"),
734+ }, nil
735+}
736+
737+var log4jProperties = `
738+log4j.rootLogger=INFO, CONSOLE
739+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
740+log4j.appender.CONSOLE.Threshold=INFO
741+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
742+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n
743+`
744+
745+func (srv *Server) writeLog4JConfig() (err os.Error) {
746+ return ioutil.WriteFile(srv.path("log4j.properties"), []byte(log4jProperties), 0666)
747+}
748+
749+func (srv *Server) writeZooKeeperConfig(port int) (err os.Error) {
750+ return ioutil.WriteFile(srv.path("zoo.cfg"), []byte(fmt.Sprintf(
751+ "tickTime=2000\n"+
752+ "dataDir=%s\n"+
753+ "clientPort=%d\n"+
754+ "maxClientCnxns=500\n",
755+ srv.runDir, port)), 0666)
756+}
757+
758+// NetworkPort returns the TCP port number that
759+// the server is configured for.
760+func (srv *Server) NetworkPort() (int, os.Error) {
761+ f, err := os.Open(srv.path("zoo.cfg"))
762+ if err != nil {
763+ return 0, err
764+ }
765+ r := bufio.NewReader(f)
766+ for {
767+ line, err := r.ReadSlice('\n')
768+ if err != nil {
769+ return 0, fmt.Errorf("cannot get port from %q", srv.path("zoo.cfg"))
770+ }
771+ var port int
772+ if n, _ := fmt.Sscanf(string(line), "clientPort=%d\n", &port); n == 1 {
773+ return port, nil
774+ }
775+ }
776+ panic("not reached")
777+}
778+
779+func (srv *Server) writeInstallDir() os.Error {
780+ return ioutil.WriteFile(srv.path("installdir.txt"), []byte(srv.installDir+"\n"), 0666)
781+}
782+
783+func (srv *Server) readInstallDir() os.Error {
784+ data, err := ioutil.ReadFile(srv.path("installdir.txt"))
785+ if err != nil {
786+ return err
787+ }
788+ if data[len(data)-1] == '\n' {
789+ data = data[0 : len(data)-1]
790+ }
791+ srv.installDir = string(data)
792+ return nil
793+}
794+
795+func (srv *Server) classPath() ([]string, os.Error) {
796+ dir := srv.installDir
797+ if dir == "" {
798+ return systemClassPath()
799+ }
800+ if err := checkDirectory(dir); err != nil {
801+ return nil, err
802+ }
803+ // Two possibilities, as seen in zkEnv.sh:
804+ // 1) locally built binaries (jars are in build directory)
805+ // 2) release binaries
806+ if build := filepath.Join(dir, "build"); checkDirectory(build) == nil {
807+ dir = build
808+ }
809+ classPath, err := filepath.Glob(filepath.Join(dir, "zookeeper-*.jar"))
810+ if err != nil {
811+ panic(fmt.Errorf("glob for jar files: %v", err))
812+ }
813+ more, err := filepath.Glob(filepath.Join(dir, "lib/*.jar"))
814+ if err != nil {
815+ panic(fmt.Errorf("glob for lib jar files: %v", err))
816+ }
817+
818+ classPath = append(classPath, more...)
819+ if len(classPath) == 0 {
820+ return nil, fmt.Errorf("zookeeper libraries not found in %q", dir)
821+ }
822+ return classPath, nil
823+}
824+
825+const zookeeperEnviron = "/etc/zookeeper/conf/environment"
826+
827+func systemClassPath() ([]string, os.Error) {
828+ f, err := os.Open(zookeeperEnviron)
829+ if f == nil {
830+ return nil, err
831+ }
832+ r := bufio.NewReader(f)
833+ for {
834+ line, err := r.ReadSlice('\n')
835+ if err != nil {
836+ break
837+ }
838+ if !bytes.HasPrefix(line, []byte("CLASSPATH=")) {
839+ continue
840+ }
841+
842+ // remove variable and newline
843+ path := string(line[len("CLASSPATH=") : len(line)-1])
844+
845+ // trim white space
846+ path = strings.Trim(path, " \t\r")
847+
848+ // strip quotes
849+ if len(path) > 1 && path[0] == '"' {
850+ path = path[1 : len(path)-1]
851+ }
852+
853+ // split on :
854+ classPath := strings.Split(path, ":")
855+
856+ // split off $ZOOCFGDIR
857+ if len(classPath) > 0 && classPath[0] == "$ZOOCFGDIR" {
858+ classPath = classPath[1:]
859+ }
860+
861+ if len(classPath) == 0 {
862+ return nil, fmt.Errorf("empty class path in %q", zookeeperEnviron)
863+ }
864+ return classPath, nil
865+ }
866+ return nil, fmt.Errorf("no class path found in %q", zookeeperEnviron)
867+}
868+
869+// checkDirectory returns an error if the given path
870+// does not exist or is not a directory.
871+func checkDirectory(path string) os.Error {
872+ if info, err := os.Stat(path); err != nil || !info.IsDirectory() {
873+ if err == nil {
874+ err = &os.PathError{Op: "stat", Path: path, Error: os.NewError("is not a directory")}
875+ }
876+ return err
877+ }
878+ return nil
879+}
880
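For readers following NetworkPort above: a minimal sketch of the same line-by-line `clientPort=` scan, written against present-day Go rather than the 2011 API used in this branch. The helper name `portFromConfig` is hypothetical and is not part of gozk; it takes the config text as a string so it can run without a file on disk.

```go
package main

import (
	"bufio"
	"errors"
	"fmt"
	"strings"
)

// portFromConfig scans cfg line by line and returns the value of the
// first clientPort=N line. It is an illustrative stand-in for
// NetworkPort, not the function from this branch.
func portFromConfig(cfg string) (int, error) {
	sc := bufio.NewScanner(strings.NewReader(cfg))
	for sc.Scan() {
		var port int
		// Sscanf reports how many values it parsed; lines that do not
		// start with the literal "clientPort=" parse zero values.
		if n, _ := fmt.Sscanf(sc.Text(), "clientPort=%d", &port); n == 1 {
			return port, nil
		}
	}
	return 0, errors.New("no clientPort line found")
}

func main() {
	cfg := "tickTime=2000\ndataDir=/tmp/zk\nclientPort=21812\nmaxClientCnxns=500\n"
	port, err := portFromConfig(cfg)
	fmt.Println(port, err)
}
```

The sample config mirrors the keys written by the config writer above; the dataDir value is made up for the example.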
881=== added directory 'service'
882=== added file 'service/Makefile'
883--- service/Makefile 1970-01-01 00:00:00 +0000
884+++ service/Makefile 2011-09-30 13:35:26 +0000
885@@ -0,0 +1,20 @@
886+include $(GOROOT)/src/Make.inc
887+
888+TARG=launchpad.net/gozk/zk/service
889+
890+GOFILES=\
891+ service.go\
892+
893+GOFMT=gofmt
894+BADFMT:=$(shell $(GOFMT) -l $(GOFILES) $(CGOFILES) $(wildcard *_test.go))
895+
896+gofmt: $(BADFMT)
897+ @for F in $(BADFMT); do $(GOFMT) -w $$F && echo $$F; done
898+
899+ifneq ($(BADFMT),)
900+ifneq ($(MAKECMDGOALS),gofmt)
901+$(warning WARNING: make gofmt: $(BADFMT))
902+endif
903+endif
904+
905+include $(GOROOT)/src/Make.pkg
906
907=== added file 'service/service.go'
908--- service/service.go 1970-01-01 00:00:00 +0000
909+++ service/service.go 2011-09-30 13:35:26 +0000
910@@ -0,0 +1,160 @@
911+// The service package provides support for long-running services.
912+package service
913+
914+import (
915+ "os"
916+ "fmt"
917+ "io/ioutil"
918+ "strconv"
919+ "path/filepath"
920+ "exec"
921+ "strings"
922+ "time"
923+)
924+
925+// Interface represents a single-process server.
926+type Interface interface {
927+ // Directory gives the path to the directory containing
928+ // the server's configuration and data files. It is assumed that
929+ // this exists and can be modified.
930+ Directory() string
931+
932+ // CheckAvailability should return an error if resources
933+ // required by the server are not available; for instance
934+ // if the server requires a network port which is not free.
935+ CheckAvailability() os.Error
936+
937+ // Command should return a command that can be
938+ // used to run the server. The first element of the returned
939+ // slice is the executable name; the rest of the elements are
940+ // its arguments.
941+ Command() ([]string, os.Error)
942+}
943+
944+// Service represents a possibly running server process.
945+type Service struct {
946+ srv Interface
947+}
948+
949+// New returns a new Service using the given
950+// server interface.
951+func New(srv Interface) *Service {
952+ return &Service{srv}
953+}
954+
955+// Process returns a reference to the identity
956+// of the last running server in the Service's directory.
957+// If the server was shut down, it returns (nil, nil).
958+// The server process may not still be running
959+// (for instance if it was terminated abnormally).
960+// This function is provided for debugging and testing
961+// purposes only.
962+func (s *Service) Process() (*os.Process, os.Error) {
963+ data, err := ioutil.ReadFile(s.path("pid.txt"))
964+ if err != nil {
965+ if err, ok := err.(*os.PathError); ok && err.Error == os.ENOENT {
966+ return nil, nil
967+ }
968+ return nil, err
969+ }
970+ pid, err := strconv.Atoi(string(data))
971+ if err != nil {
972+ return nil, os.NewError("bad process id found in pid.txt")
973+ }
974+ return os.FindProcess(pid)
975+}
976+
977+func (s *Service) path(name string) string {
978+ return filepath.Join(s.srv.Directory(), name)
979+}
980+
981+// Start starts the server. It returns an error if the server is already running.
982+// It stores the process ID of the server in the file "pid.txt"
983+// inside the server's directory. Stdout and stderr of the server
984+// are similarly written to "log.txt".
985+func (s *Service) Start() os.Error {
986+ if err := s.srv.CheckAvailability(); err != nil {
987+ return err
988+ }
989+ p, err := s.Process()
990+ if p != nil || err != nil {
991+ if p != nil {
992+ p.Release()
993+ }
994+ return fmt.Errorf("server may already be running (remove %q to clear)", s.path("pid.txt"))
995+ }
996+
997+ // Create the pid file before starting the process so that if two
998+ // programs try to start a server in the same directory
999+ // at the same time, only one of them succeeds.
1000+ pidf, err := os.OpenFile(s.path("pid.txt"), os.O_EXCL|os.O_CREATE|os.O_WRONLY, 0666)
1001+ if err != nil {
1002+ return fmt.Errorf("cannot create pid.txt: %v", err)
1003+ }
1004+ defer pidf.Close()
1005+
1006+ args, err := s.srv.Command()
1007+ if err != nil {
1008+ return fmt.Errorf("cannot determine command: %v", err)
1009+ }
1010+ cmd := exec.Command(args[0], args[1:]...)
1011+
1012+ logf, err := os.OpenFile(s.path("log.txt"), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666)
1013+ if err != nil {
1014+ return fmt.Errorf("cannot create log file: %v", err)
1015+ }
1016+ defer logf.Close()
1017+ cmd.Stdout = logf
1018+ cmd.Stderr = logf
1019+ if err := cmd.Start(); err != nil {
1020+ return fmt.Errorf("cannot start server: %v", err)
1021+ }
1022+ if _, err := fmt.Fprint(pidf, cmd.Process.Pid); err != nil {
1023+ return fmt.Errorf("cannot write pid file: %v", err)
1024+ }
1025+ return nil
1026+}
1027+
1028+// Stop kills the server. It is a no-op if it is not running.
1029+func (s *Service) Stop() os.Error {
1030+ p, err := s.Process()
1031+ if p == nil {
1032+ if err != nil {
1033+ return fmt.Errorf("cannot read process ID of server: %v", err)
1034+ }
1035+ return nil
1036+ }
1037+ defer p.Release()
1038+ if err := p.Kill(); err != nil && !strings.Contains(err.String(), "no such process") {
1039+ return fmt.Errorf("cannot kill server process: %v", err)
1040+ }
1041+ // ignore the error returned from Wait because there's little
1042+ // we can do about it - it either means that the process has just exited
1043+ // anyway or that we can't wait for it for some other reason,
1044+ // for example because it was originally started by some other process.
1045+ _, err = p.Wait(0)
1046+ if err != nil && strings.Contains(err.String(), "no child processes") {
1047+ // If we can't wait for the server, it's possible that it was running
1048+ // but not as a child of this process, so give it a little while
1049+ // to exit. TODO poll with kill(no signal)?
1050+ time.Sleep(0.5e9)
1051+ }
1052+
1053+ if err := os.Remove(s.path("pid.txt")); err != nil {
1054+ return fmt.Errorf("cannot remove server process ID file: %v", err)
1055+ }
1056+ return nil
1057+}
1058+
1059+// Destroy stops the server, and then removes its
1060+// directory and all its contents.
1061+// Warning: this will destroy all data associated with the server.
1062+func (s *Service) Destroy() os.Error {
1063+ if err := s.Stop(); err != nil {
1064+ return err
1065+ }
1066+ if err := os.RemoveAll(s.srv.Directory()); err != nil {
1067+ return err
1068+ }
1069+ return nil
1070+}
1071
1072=== added file 'status.go'
1073--- status.go 1970-01-01 00:00:00 +0000
1074+++ status.go 2011-09-30 13:35:26 +0000
1075@@ -0,0 +1,116 @@
1076+package zk
1077+
1078+import (
1079+ "fmt"
1080+)
1081+
1082+// event types, as defined by libzookeeper
1083+const (
1084+ _ = iota
1085+ event_CREATED
1086+ event_DELETED
1087+ event_CHANGED
1088+ event_CHILD
1089+ event_SESSION = -1
1090+ event_NOTWATCHING = -2
1091+)
1092+
1093+// The statusChan interface represents an event channel.
1094+// It translates from libzookeeper events to Go channel sends
1095+// of the appropriate type.
1096+type statusChan interface {
1097+ // close closes the wait channel.
1098+ close()
1099+
1100+ // send sends an event with the given type and connection
1101+ // state to the wait channel. It should do a non-blocking
1102+ // send and return false if the send would have blocked.
1103+ send(eventType, connectionState int, path string) bool
1104+}
1105+
1106+// SessionStatus represents the status of a ZooKeeper session.
1107+type SessionStatus int
1108+
1109+// Constants that represent the status of a ZooKeeper
1110+// session.
1111+const (
1112+ _ SessionStatus = iota
1113+ SessionConnecting
1114+ SessionAssociating
1115+ SessionConnected
1116+ SessionExpired SessionStatus = -112
1117+ SessionAuthFailed SessionStatus = -113
1118+)
1119+
1120+type sessionStatusChan chan SessionStatus
1121+
1122+func (c sessionStatusChan) close() {
1123+ close(c)
1124+}
1125+
1126+func (c sessionStatusChan) send(eventType, eventState int, path string) bool {
1127+ if eventType != event_SESSION {
1128+ panic(fmt.Errorf("unexpected event, type %v; status %v", eventType, eventState))
1129+ }
1130+ if path != "" {
1131+ panic(fmt.Errorf("non-empty path received on session event"))
1132+ }
1133+ select {
1134+ case c <- SessionStatus(eventState):
1135+ default:
1136+ return false
1137+ }
1138+ return true
1139+}
1140+
1141+// NodeStatus represents the status of a node after ExistsW,
1142+// GetW, or ChildrenW has been called on the node.
1143+type NodeStatus int
1144+
1145+// Constants that represent changes to a ZooKeeper node.
1146+const (
1147+ _ NodeStatus = iota
1148+ NodeDeleted // The node has been deleted.
1149+ NodeCreated // The node has been created.
1150+ NodeChanged // The node data has changed.
1151+ NodeChangedChild // The number of children of the node has changed.
1152+)
1153+
1154+type nodeStatusChan struct {
1155+ c chan NodeStatus
1156+ path string // stored so we can check that the server is behaving.
1157+}
1158+
1159+func newNodeStatusChan(path string) *nodeStatusChan {
1160+ return &nodeStatusChan{path: path, c: make(chan NodeStatus, 1)}
1161+}
1162+
1163+func (c *nodeStatusChan) close() {
1164+ close(c.c)
1165+}
1166+
1167+func (c *nodeStatusChan) send(eventType, eventState int, path string) bool {
1168+ if path != c.path {
1169+ panic(fmt.Errorf("unexpected path %q received on node change event (type %v, status %v)",
1170+ path, eventType, eventState))
1171+ }
1172+ var val NodeStatus
1173+ switch eventType {
1174+ case event_CREATED:
1175+ val = NodeCreated
1176+ case event_DELETED:
1177+ val = NodeDeleted
1178+ case event_CHANGED:
1179+ val = NodeChanged
1180+ case event_CHILD:
1181+ val = NodeChangedChild
1182+ default:
1183+ panic(fmt.Errorf("unexpected event, type %v; status %v", eventType, eventState))
1184+ }
1185+ select {
1186+ case c.c <- val:
1187+ default:
1188+ return false
1189+ }
1190+ return true
1191+}
1192
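The send methods in status.go combine a one-slot buffered channel with a select/default non-blocking send. A minimal sketch of that pattern, and of what a receiver sees once the channel is closed, assuming present-day Go; `trySend` is a hypothetical helper and the NodeStatus constants are copied here only to keep the example self-contained.

```go
package main

import "fmt"

// NodeStatus mirrors the type of the same name in status.go.
type NodeStatus int

const (
	_ NodeStatus = iota
	NodeDeleted
	NodeCreated
)

// trySend performs a non-blocking send: it reports false instead of
// blocking when the channel buffer is full and no receiver is ready.
func trySend(c chan NodeStatus, v NodeStatus) bool {
	select {
	case c <- v:
		return true
	default:
		return false
	}
}

func main() {
	c := make(chan NodeStatus, 1)
	fmt.Println(trySend(c, NodeCreated)) // true: the buffer had room
	fmt.Println(trySend(c, NodeDeleted)) // false: the buffer is full

	close(c)
	v, ok := <-c
	fmt.Println(v == NodeCreated, ok) // true true: buffered value survives close
	v, ok = <-c
	fmt.Println(v, ok) // 0 false: closed and drained yields the zero value
}
```

Because the zero value is reserved (the blank identifier at iota 0), a receiver can tell a real notification from a closed channel by comparing against zero, which is the convention the zk.go package comment describes.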
1193=== modified file 'suite_test.go'
1194--- suite_test.go 2011-08-19 01:43:37 +0000
1195+++ suite_test.go 2011-09-30 13:35:26 +0000
1196@@ -1,94 +1,72 @@
1197-package gozk_test
1198+package zk_test
1199
1200 import (
1201 . "launchpad.net/gocheck"
1202- "testing"
1203- "io/ioutil"
1204- "path"
1205 "fmt"
1206+ "launchpad.net/gozk/zk/service"
1207+ "launchpad.net/gozk/zk"
1208 "os"
1209- "gozk"
1210+ "testing"
1211 "time"
1212 )
1213
1214 func TestAll(t *testing.T) {
1215- TestingT(t)
1216+ if !*reattach {
1217+ TestingT(t)
1218+ }
1219 }
1220
1221 var _ = Suite(&S{})
1222
1223 type S struct {
1224- zkRoot string
1225- zkTestRoot string
1226- zkTestPort int
1227- zkServerSh string
1228- zkServerOut *os.File
1229- zkAddr string
1230+ zkServer *service.Service
1231+ zkTestRoot string
1232+ zkAddr string
1233
1234- handles []*gozk.ZooKeeper
1235- events []*gozk.Event
1236+ handles []*zk.Conn
1237+ events []zk.SessionStatus
1238 liveWatches int
1239 deadWatches chan bool
1240 }
1241
1242-var logLevel = 0 //gozk.LOG_ERROR
1243-
1244-
1245-var testZooCfg = ("dataDir=%s\n" +
1246- "clientPort=%d\n" +
1247- "tickTime=2000\n" +
1248- "initLimit=10\n" +
1249- "syncLimit=5\n" +
1250- "")
1251-
1252-var testLog4jPrp = ("log4j.rootLogger=INFO,CONSOLE\n" +
1253- "log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender\n" +
1254- "log4j.appender.CONSOLE.Threshold=DEBUG\n" +
1255- "log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout\n" +
1256- "log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n" +
1257- "")
1258-
1259-func (s *S) init(c *C) (*gozk.ZooKeeper, chan gozk.Event) {
1260- zk, watch, err := gozk.Init(s.zkAddr, 5e9)
1261+var logLevel = 0 // zk.LOG_ERROR
1262+
1263+func (s *S) init(c *C) (*zk.Conn, <-chan zk.SessionStatus) {
1264+ conn, watch, err := zk.Dial(s.zkAddr, 5e9)
1265 c.Assert(err, IsNil)
1266
1267- s.handles = append(s.handles, zk)
1268-
1269- event := <-watch
1270-
1271- c.Assert(event.Type, Equals, gozk.EVENT_SESSION)
1272- c.Assert(event.State, Equals, gozk.STATE_CONNECTED)
1273-
1274- bufferedWatch := make(chan gozk.Event, 256)
1275- bufferedWatch <- event
1276+ s.handles = append(s.handles, conn)
1277+
1278+ status := <-watch
1279+ c.Assert(status == zk.SessionConnected, Equals, true)
1280+
1281+ bufferedWatch := make(chan zk.SessionStatus, 256)
1282+ bufferedWatch <- status
1283
1284 s.liveWatches += 1
1285 go func() {
1286- loop:
1287 for {
1288+ status, ok := <-watch
1289+ if !ok {
1290+ break
1291+ }
1292 select {
1293- case event, ok := <-watch:
1294- if !ok {
1295- close(bufferedWatch)
1296- break loop
1297- }
1298- select {
1299- case bufferedWatch <- event:
1300- default:
1301- panic("Too many events in buffered watch!")
1302- }
1303+ case bufferedWatch <- status:
1304+ default:
1305+ panic("Too many events in buffered watch!")
1306 }
1307 }
1308+ close(bufferedWatch)
1309 s.deadWatches <- true
1310 }()
1311
1312- return zk, bufferedWatch
1313+ return conn, bufferedWatch
1314 }
1315
1316 func (s *S) SetUpTest(c *C) {
1317- c.Assert(gozk.CountPendingWatches(), Equals, 0,
1318+ c.Assert(zk.CountPendingWatches(), Equals, 0,
1319 Bug("Test got a dirty watch state before running!"))
1320- gozk.SetLogLevel(logLevel)
1321+ zk.SetLogLevel(logLevel)
1322 }
1323
1324 func (s *S) TearDownTest(c *C) {
1325@@ -108,98 +86,38 @@
1326 }
1327
1328 // Reset the list of handles.
1329- s.handles = make([]*gozk.ZooKeeper, 0)
1330+ s.handles = make([]*zk.Conn, 0)
1331
1332- c.Assert(gozk.CountPendingWatches(), Equals, 0,
1333+ c.Assert(zk.CountPendingWatches(), Equals, 0,
1334 Bug("Test left live watches behind!"))
1335 }
1336
1337-// We use the suite set up and tear down to manage a custom zookeeper
1338+// We use the suite set up and tear down to manage a custom ZooKeeper
1339 //
1340 func (s *S) SetUpSuite(c *C) {
1341-
1342+ var err os.Error
1343 s.deadWatches = make(chan bool)
1344
1345- var err os.Error
1346-
1347- s.zkRoot = os.Getenv("ZKROOT")
1348- if s.zkRoot == "" {
1349- panic("You must define $ZKROOT ($ZKROOT/bin/zkServer.sh is needed)")
1350- }
1351-
1352- s.zkTestRoot = c.MkDir()
1353- s.zkTestPort = 21812
1354-
1355- println("ZooKeeper test server directory:", s.zkTestRoot)
1356- println("ZooKeeper test server port:", s.zkTestPort)
1357-
1358- s.zkAddr = fmt.Sprintf("localhost:%d", s.zkTestPort)
1359-
1360- s.zkServerSh = path.Join(s.zkRoot, "bin/zkServer.sh")
1361- s.zkServerOut, err = os.OpenFile(path.Join(s.zkTestRoot, "stdout.txt"),
1362- os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
1363- if err != nil {
1364- panic("Can't open stdout.txt file for server: " + err.String())
1365- }
1366-
1367- dataDir := path.Join(s.zkTestRoot, "data")
1368- confDir := path.Join(s.zkTestRoot, "conf")
1369-
1370- os.Mkdir(dataDir, 0755)
1371- os.Mkdir(confDir, 0755)
1372-
1373- err = os.Setenv("ZOOCFGDIR", confDir)
1374- if err != nil {
1375- panic("Can't set $ZOOCFGDIR: " + err.String())
1376- }
1377-
1378- zooCfg := []byte(fmt.Sprintf(testZooCfg, dataDir, s.zkTestPort))
1379- err = ioutil.WriteFile(path.Join(confDir, "zoo.cfg"), zooCfg, 0644)
1380- if err != nil {
1381- panic("Can't write zoo.cfg: " + err.String())
1382- }
1383-
1384- log4jPrp := []byte(testLog4jPrp)
1385- err = ioutil.WriteFile(path.Join(confDir, "log4j.properties"), log4jPrp, 0644)
1386- if err != nil {
1387- panic("Can't write log4j.properties: " + err.String())
1388- }
1389-
1390- s.StartZK()
1391+ // N.B. We need to create a subdirectory because zk.CreateServer
1392+ // insists on creating its own directory.
1393+ s.zkTestRoot = c.MkDir() + "/zk"
1394+
1395+ port := 21812
1396+ s.zkAddr = fmt.Sprint("localhost:", port)
1397+
1398+ srv, err := zk.CreateServer(port, s.zkTestRoot, "")
1399+ if err != nil {
1400+ c.Fatal("Cannot set up server environment: ", err)
1401+ }
1402+ s.zkServer = service.New(srv)
1403+ err = s.zkServer.Start()
1404+ if err != nil {
1405+ c.Fatal("Cannot start ZooKeeper server: ", err)
1406+ }
1407 }
1408
1409 func (s *S) TearDownSuite(c *C) {
1410- s.StopZK()
1411- s.zkServerOut.Close()
1412-}
1413-
1414-func (s *S) StartZK() {
1415- attr := os.ProcAttr{Files: []*os.File{os.Stdin, s.zkServerOut, os.Stderr}}
1416- proc, err := os.StartProcess(s.zkServerSh, []string{s.zkServerSh, "start"}, &attr)
1417- if err != nil {
1418- panic("Problem executing zkServer.sh start: " + err.String())
1419- }
1420-
1421- result, err := proc.Wait(0)
1422- if err != nil {
1423- panic(err.String())
1424- } else if result.ExitStatus() != 0 {
1425- panic("'zkServer.sh start' exited with non-zero status")
1426- }
1427-}
1428-
1429-func (s *S) StopZK() {
1430- attr := os.ProcAttr{Files: []*os.File{os.Stdin, s.zkServerOut, os.Stderr}}
1431- proc, err := os.StartProcess(s.zkServerSh, []string{s.zkServerSh, "stop"}, &attr)
1432- if err != nil {
1433- panic("Problem executing zkServer.sh stop: " + err.String() +
1434- " (look for runaway java processes!)")
1435- }
1436- result, err := proc.Wait(0)
1437- if err != nil {
1438- panic(err.String())
1439- } else if result.ExitStatus() != 0 {
1440- panic("'zkServer.sh stop' exited with non-zero status " +
1441- "(look for runaway java processes!)")
1442+ if s.zkServer != nil {
1443+ s.zkServer.Stop() // TODO Destroy
1444 }
1445 }
1446
1447=== renamed file 'gozk.go' => 'zk.go'
1448--- gozk.go 2011-08-19 01:56:39 +0000
1449+++ zk.go 2011-09-30 13:35:26 +0000
1450@@ -1,12 +1,26 @@
1451-// gozk - Zookeeper support for the Go language
1452+// gozk - ZooKeeper support for the Go language
1453 //
1454 // https://wiki.ubuntu.com/gozk
1455 //
1456 // Copyright (c) 2010-2011 Canonical Ltd.
1457-//
1458 // Written by Gustavo Niemeyer <gustavo.niemeyer@canonical.com>
1459 //
1460-package gozk
1461+// A note about channels as used for change notification in gozk.
1462+//
1463+// Event channels are used to provide notifications of changes to ZooKeeper
1464+// nodes, from GetW, ExistsW, etc. These are invariably one-shot - a single
1465+// value will be sent on the wait channel to signify the change and then the
1466+// channel will be closed. If the ZooKeeper server goes down when awaiting
1467+// a change, the channel will be closed without sending a value. Clients
1468+// can either check explicitly for the closed channel with:
1469+//
1470+// if v, ok := <-wait; !ok {
1471+// }
1472+//
1473+// or just test the received value against zero, which
1474+// signifies no value for each of the notification types.
1475+//
1476+package zk
1477
1478 /*
1479 #cgo CFLAGS: -I/usr/include/c-client-src
1480@@ -27,17 +41,16 @@
1481 // -----------------------------------------------------------------------
1482 // Main constants and data types.
1483
1484-// The main ZooKeeper object, created through the Init function.
1485-// Encapsulates all communication with ZooKeeper.
1486-type ZooKeeper struct {
1487- watchChannels map[uintptr]chan Event
1488+// Conn represents a connection to a set of ZooKeeper nodes.
1489+type Conn struct {
1490+ watchChannels map[uintptr]statusChan
1491 sessionWatchId uintptr
1492 handle *C.zhandle_t
1493 mutex sync.Mutex
1494 }
1495
1496-// ClientId represents the established session in ZooKeeper. This is only
1497-// useful to be passed back into the ReInit function.
1498+// ClientId represents an established ZooKeeper session. It can be
1499+// passed into Redial to reestablish a connection to an existing session.
1500 type ClientId struct {
1501 cId C.clientid_t
1502 }
1503@@ -51,84 +64,63 @@
1504 Id string
1505 }
1506
1507-// Event channels are used to provide notifications of changes in the
1508-// ZooKeeper connection state and in specific node aspects.
1509-//
1510-// There are two sources of events: the session channel obtained during
1511-// initialization with Init, and any watch channels obtained
1512-// through one of the W-suffixed functions (GetW, ExistsW, etc).
1513-//
1514-// The session channel will only receive session-level events notifying
1515-// about critical and transient changes in the ZooKeeper connection
1516-// state (STATE_CONNECTED, STATE_EXPIRED_SESSION, etc). On long
1517-// running applications the session channel must *necessarily* be
1518-// observed since certain events like session expirations require an
1519-// explicit reconnection and reestablishment of state (or bailing out).
1520-// Because of that, the buffer used on the session channel has a limited
1521-// size, and a panic will occur if too many events are not collected.
1522-//
1523-// Watch channels enable monitoring state for nodes, and the
1524-// moment they're fired depends on which function was called to
1525-// create them. Note that, unlike in other ZooKeeper interfaces,
1526-// gozk will NOT dispatch unimportant session events such as
1527-// STATE_ASSOCIATING, STATE_CONNECTING and STATE_CONNECTED to
1528-// watch Event channels, since they are transient and disruptive
1529-// to the workflow. Critical state changes such as expirations
1530-// are still delivered to all event channels, though, and the
1531-// transient events may be obsererved in the session channel.
1532-//
1533-// Since every watch channel may receive critical session events, events
1534-// received must not be handled blindly as if the watch requested has
1535-// been fired. To facilitate such tests, Events offer the Ok method,
1536-// and they also have a good String method so they may be used as an
1537-// os.Error value if wanted. E.g.:
1538-//
1539-// event := <-watch
1540-// if !event.Ok() {
1541-// err = event
1542-// return
1543-// }
1544-//
1545-// Note that closed channels will deliver zeroed Event, which means
1546-// event.Type is set to EVENT_CLOSED and event.State is set to STATE_CLOSED,
1547-// to facilitate handling.
1548-type Event struct {
1549- Type int
1550- Path string
1551- State int
1552-}
1553+// Error represents a ZooKeeper error.
1554+type Error int
1555
1556-// Error codes that may be used to verify the result of the
1557-// Code method from Error.
1558 const (
1559- ZOK = C.ZOK
1560- ZSYSTEMERROR = C.ZSYSTEMERROR
1561- ZRUNTIMEINCONSISTENCY = C.ZRUNTIMEINCONSISTENCY
1562- ZDATAINCONSISTENCY = C.ZDATAINCONSISTENCY
1563- ZCONNECTIONLOSS = C.ZCONNECTIONLOSS
1564- ZMARSHALLINGERROR = C.ZMARSHALLINGERROR
1565- ZUNIMPLEMENTED = C.ZUNIMPLEMENTED
1566- ZOPERATIONTIMEOUT = C.ZOPERATIONTIMEOUT
1567- ZBADARGUMENTS = C.ZBADARGUMENTS
1568- ZINVALIDSTATE = C.ZINVALIDSTATE
1569- ZAPIERROR = C.ZAPIERROR
1570- ZNONODE = C.ZNONODE
1571- ZNOAUTH = C.ZNOAUTH
1572- ZBADVERSION = C.ZBADVERSION
1573- ZNOCHILDRENFOREPHEMERALS = C.ZNOCHILDRENFOREPHEMERALS
1574- ZNODEEXISTS = C.ZNODEEXISTS
1575- ZNOTEMPTY = C.ZNOTEMPTY
1576- ZSESSIONEXPIRED = C.ZSESSIONEXPIRED
1577- ZINVALIDCALLBACK = C.ZINVALIDCALLBACK
1578- ZINVALIDACL = C.ZINVALIDACL
1579- ZAUTHFAILED = C.ZAUTHFAILED
1580- ZCLOSING = C.ZCLOSING
1581- ZNOTHING = C.ZNOTHING
1582- ZSESSIONMOVED = C.ZSESSIONMOVED
1583+ ZOK Error = C.ZOK
1584+ ZSYSTEMERROR Error = C.ZSYSTEMERROR
1585+ ZRUNTIMEINCONSISTENCY Error = C.ZRUNTIMEINCONSISTENCY
1586+ ZDATAINCONSISTENCY Error = C.ZDATAINCONSISTENCY
1587+ ZCONNECTIONLOSS Error = C.ZCONNECTIONLOSS
1588+ ZMARSHALLINGERROR Error = C.ZMARSHALLINGERROR
1589+ ZUNIMPLEMENTED Error = C.ZUNIMPLEMENTED
1590+ ZOPERATIONTIMEOUT Error = C.ZOPERATIONTIMEOUT
1591+ ZBADARGUMENTS Error = C.ZBADARGUMENTS
1592+ ZINVALIDSTATE Error = C.ZINVALIDSTATE
1593+ ZAPIERROR Error = C.ZAPIERROR
1594+ ZNONODE Error = C.ZNONODE
1595+ ZNOAUTH Error = C.ZNOAUTH
1596+ ZBADVERSION Error = C.ZBADVERSION
1597+ ZNOCHILDRENFOREPHEMERALS Error = C.ZNOCHILDRENFOREPHEMERALS
1598+ ZNODEEXISTS Error = C.ZNODEEXISTS
1599+ ZNOTEMPTY Error = C.ZNOTEMPTY
1600+ ZSESSIONEXPIRED Error = C.ZSESSIONEXPIRED
1601+ ZINVALIDCALLBACK Error = C.ZINVALIDCALLBACK
1602+ ZINVALIDACL Error = C.ZINVALIDACL
1603+ ZAUTHFAILED Error = C.ZAUTHFAILED
1604+ ZCLOSING Error = C.ZCLOSING
1605+ ZNOTHING Error = C.ZNOTHING
1606+ ZSESSIONMOVED Error = C.ZSESSIONMOVED
1607 )
1608
1609+func (error Error) String() string {
1610+ return C.GoString(C.zerror(C.int(error))) // Static, no need to free it.
1611+}
1612+
1613+// zkError creates an appropriate error return from
1614+// a ZooKeeper status and the errno return from a C API
1615+// call.
1616+func zkError(rc C.int, cerr os.Error) os.Error {
1617+ code := Error(rc)
1618+ switch code {
1619+ case ZOK:
1620+ return nil
1621+
1622+ case ZSYSTEMERROR:
1623+ // If a ZooKeeper call returns ZSYSTEMERROR, then
1624+ // errno becomes significant. If errno has not been
1625+ // set, then we will return ZSYSTEMERROR nonetheless.
1626+ if cerr != nil {
1627+ return cerr
1628+ }
1629+ }
1630+ return code
1631+}
1632+
1633 // Constants for SetLogLevel.
1634 const (
1635+ LOG_NONE = 0
1636 LOG_ERROR = C.ZOO_LOG_LEVEL_ERROR
1637 LOG_WARN = C.ZOO_LOG_LEVEL_WARN
1638 LOG_INFO = C.ZOO_LOG_LEVEL_INFO
1639@@ -155,33 +147,6 @@
1640 PERM_ALL = 0x1f
1641 )
1642
1643-// Constants for Event Type.
1644-const (
1645- EVENT_CREATED = iota + 1
1646- EVENT_DELETED
1647- EVENT_CHANGED
1648- EVENT_CHILD
1649- EVENT_SESSION = -1
1650- EVENT_NOTWATCHING = -2
1651-
1652- // Doesn't really exist in zk, but handy for use in zeroed Event
1653- // values (e.g. closed channels).
1654- EVENT_CLOSED = 0
1655-)
1656-
1657-// Constants for Event State.
1658-const (
1659- STATE_EXPIRED_SESSION = -112
1660- STATE_AUTH_FAILED = -113
1661- STATE_CONNECTING = 1
1662- STATE_ASSOCIATING = 2
1663- STATE_CONNECTED = 3
1664-
1665- // Doesn't really exist in zk, but handy for use in zeroed Event
1666- // values (e.g. closed channels).
1667- STATE_CLOSED = 0
1668-)
1669-
1670 func init() {
1671 if EPHEMERAL != C.ZOO_EPHEMERAL ||
1672 SEQUENCE != C.ZOO_SEQUENCE ||
1673@@ -191,17 +156,17 @@
1674 PERM_DELETE != C.ZOO_PERM_DELETE ||
1675 PERM_ADMIN != C.ZOO_PERM_ADMIN ||
1676 PERM_ALL != C.ZOO_PERM_ALL ||
1677- EVENT_CREATED != C.ZOO_CREATED_EVENT ||
1678- EVENT_DELETED != C.ZOO_DELETED_EVENT ||
1679- EVENT_CHANGED != C.ZOO_CHANGED_EVENT ||
1680- EVENT_CHILD != C.ZOO_CHILD_EVENT ||
1681- EVENT_SESSION != C.ZOO_SESSION_EVENT ||
1682- EVENT_NOTWATCHING != C.ZOO_NOTWATCHING_EVENT ||
1683- STATE_EXPIRED_SESSION != C.ZOO_EXPIRED_SESSION_STATE ||
1684- STATE_AUTH_FAILED != C.ZOO_AUTH_FAILED_STATE ||
1685- STATE_CONNECTING != C.ZOO_CONNECTING_STATE ||
1686- STATE_ASSOCIATING != C.ZOO_ASSOCIATING_STATE ||
1687- STATE_CONNECTED != C.ZOO_CONNECTED_STATE {
1688+ event_CREATED != C.ZOO_CREATED_EVENT ||
1689+ event_DELETED != C.ZOO_DELETED_EVENT ||
1690+ event_CHANGED != C.ZOO_CHANGED_EVENT ||
1691+ event_CHILD != C.ZOO_CHILD_EVENT ||
1692+ event_SESSION != C.ZOO_SESSION_EVENT ||
1693+ event_NOTWATCHING != C.ZOO_NOTWATCHING_EVENT ||
1694+ SessionExpired != SessionStatus(C.ZOO_EXPIRED_SESSION_STATE) ||
1695+ SessionAuthFailed != SessionStatus(C.ZOO_AUTH_FAILED_STATE) ||
1696+ SessionConnecting != SessionStatus(C.ZOO_CONNECTING_STATE) ||
1697+ SessionAssociating != SessionStatus(C.ZOO_ASSOCIATING_STATE) ||
1698+ SessionConnected != SessionStatus(C.ZOO_CONNECTED_STATE) {
1699
1700 panic("OOPS: Constants don't match C counterparts")
1701 }
1702@@ -222,155 +187,97 @@
1703 }
1704
1705 // -----------------------------------------------------------------------
1706-// Event methods.
1707+// Session status methods.
1708
1709 // Ok returns true in case the event reports zk as being in a usable state.
1710-func (e Event) Ok() bool {
1711+func (status SessionStatus) Ok() bool {
1712 // That's really it for now. Anything else seems to mean zk
1713 // can't be used at the moment.
1714- return e.State == STATE_CONNECTED
1715+ return status == SessionConnected
1716 }
1717
1718-func (e Event) String() (s string) {
1719- switch e.State {
1720- case STATE_EXPIRED_SESSION:
1721+func (status SessionStatus) String() (s string) {
1722+ switch status {
1723+ case 0:
1724+ s = "ZooKeeper connection closed"
1725+ case SessionExpired:
1726 s = "ZooKeeper session expired"
1727- case STATE_AUTH_FAILED:
1728+ case SessionAuthFailed:
1729 s = "ZooKeeper authentication failed"
1730- case STATE_CONNECTING:
1731+ case SessionConnecting:
1732 s = "ZooKeeper connecting"
1733- case STATE_ASSOCIATING:
1734+ case SessionAssociating:
1735 s = "ZooKeeper still associating"
1736- case STATE_CONNECTED:
1737+ case SessionConnected:
1738 s = "ZooKeeper connected"
1739- case STATE_CLOSED:
1740- s = "ZooKeeper connection closed"
1741 default:
1742- s = fmt.Sprintf("unknown ZooKeeper state %d", e.State)
1743- }
1744- if e.Type == -1 || e.Type == EVENT_SESSION {
1745- return
1746- }
1747- if s != "" {
1748- s += "; "
1749- }
1750- switch e.Type {
1751- case EVENT_CREATED:
1752- s += "path created: "
1753- case EVENT_DELETED:
1754- s += "path deleted: "
1755- case EVENT_CHANGED:
1756- s += "path changed: "
1757- case EVENT_CHILD:
1758- s += "path children changed: "
1759- case EVENT_NOTWATCHING:
1760- s += "not watching: " // !?
1761- case EVENT_SESSION:
1762- // nothing
1763- }
1764- s += e.Path
1765- return
1766-}
1767-
1768-// -----------------------------------------------------------------------
1769-// Error interface which maps onto the ZooKeeper error codes.
1770-
1771-type Error interface {
1772- String() string
1773- Code() int
1774-}
1775-
1776-type errorType struct {
1777- zkrc C.int
1778- err os.Error
1779-}
1780-
1781-func newError(zkrc C.int, err os.Error) Error {
1782- return &errorType{zkrc, err}
1783-}
1784-
1785-func (error *errorType) String() (result string) {
1786- if error.zkrc == ZSYSTEMERROR && error.err != nil {
1787- result = error.err.String()
1788- } else {
1789- result = C.GoString(C.zerror(error.zkrc)) // Static, no need to free it.
1790- }
1791- return
1792-}
1793-
1794-// Code returns the error code that may be compared against one of
1795-// the gozk.Z* constants.
1796-func (error *errorType) Code() int {
1797- return int(error.zkrc)
1798-}
1799-
1800-// -----------------------------------------------------------------------
1801-// Stat interface which maps onto the ZooKeeper Stat struct.
1802-
1803-// We declare this as an interface rather than an actual struct because
1804-// this way we don't have to copy data around between the real C struct
1805-// and the Go one on every call. Most uses will only touch a few elements,
1806-// or even ignore the stat entirely, so that's a win.
1807+ s = fmt.Sprintf("unknown ZooKeeper state %d", status)
1808+ }
1809+ return s
1810+}
1811+
1812+// -----------------------------------------------------------------------
1813
1814 // Stat contains detailed information about a node.
1815-type Stat interface {
1816- Czxid() int64
1817- Mzxid() int64
1818- CTime() int64
1819- MTime() int64
1820- Version() int32
1821- CVersion() int32
1822- AVersion() int32
1823- EphemeralOwner() int64
1824- DataLength() int32
1825- NumChildren() int32
1826- Pzxid() int64
1827-}
1828-
1829-type resultStat C.struct_Stat
1830-
1831-func (stat *resultStat) Czxid() int64 {
1832- return int64(stat.czxid)
1833-}
1834-
1835-func (stat *resultStat) Mzxid() int64 {
1836- return int64(stat.mzxid)
1837-}
1838-
1839-func (stat *resultStat) CTime() int64 {
1840- return int64(stat.ctime)
1841-}
1842-
1843-func (stat *resultStat) MTime() int64 {
1844- return int64(stat.mtime)
1845-}
1846-
1847-func (stat *resultStat) Version() int32 {
1848- return int32(stat.version)
1849-}
1850-
1851-func (stat *resultStat) CVersion() int32 {
1852- return int32(stat.cversion)
1853-}
1854-
1855-func (stat *resultStat) AVersion() int32 {
1856- return int32(stat.aversion)
1857-}
1858-
1859-func (stat *resultStat) EphemeralOwner() int64 {
1860- return int64(stat.ephemeralOwner)
1861-}
1862-
1863-func (stat *resultStat) DataLength() int32 {
1864- return int32(stat.dataLength)
1865-}
1866-
1867-func (stat *resultStat) NumChildren() int32 {
1868- return int32(stat.numChildren)
1869-}
1870-
1871-func (stat *resultStat) Pzxid() int64 {
1872- return int64(stat.pzxid)
1873+type Stat struct {
1874+ c C.struct_Stat
1875+}
1876+
1877+func (stat *Stat) String() string {
1878+ return fmt.Sprintf(
1879+ "{Czxid: %d; Mzxid: %d; CTime: %d; MTime: %d; "+
1880+ "Version: %d; CVersion: %d; AVersion: %d; "+
1881+ "EphemeralOwner: %d; DataLength: %d; "+
1882+ "NumChildren: %d; Pzxid: %d}",
1883+ stat.Czxid(), stat.Mzxid(), stat.CTime(), stat.MTime(),
1884+ stat.Version(), stat.CVersion(), stat.AVersion(),
1885+ stat.EphemeralOwner(), stat.DataLength(),
1886+ stat.NumChildren(), stat.Pzxid(),
1887+ )
1888+}
1889+
1890+func (stat *Stat) Czxid() int64 {
1891+ return int64(stat.c.czxid)
1892+}
1893+
1894+func (stat *Stat) Mzxid() int64 {
1895+ return int64(stat.c.mzxid)
1896+}
1897+
1898+func (stat *Stat) CTime() int64 {
1899+ return int64(stat.c.ctime)
1900+}
1901+
1902+func (stat *Stat) MTime() int64 {
1903+ return int64(stat.c.mtime)
1904+}
1905+
1906+func (stat *Stat) Version() int32 {
1907+ return int32(stat.c.version)
1908+}
1909+
1910+func (stat *Stat) CVersion() int32 {
1911+ return int32(stat.c.cversion)
1912+}
1913+
1914+func (stat *Stat) AVersion() int32 {
1915+ return int32(stat.c.aversion)
1916+}
1917+
1918+func (stat *Stat) EphemeralOwner() int64 {
1919+ return int64(stat.c.ephemeralOwner)
1920+}
1921+
1922+func (stat *Stat) DataLength() int32 {
1923+ return int32(stat.c.dataLength)
1924+}
1925+
1926+func (stat *Stat) NumChildren() int32 {
1927+ return int32(stat.c.numChildren)
1928+}
1929+
1930+func (stat *Stat) Pzxid() int64 {
1931+ return int64(stat.c.pzxid)
1932 }
1933
1934 // -----------------------------------------------------------------------
1935@@ -380,11 +287,12 @@
1936
1937 // SetLogLevel changes the minimum level of logging output generated
1938 // to adjust the amount of information provided.
1939+// The default logging level is LOG_ERROR.
1940 func SetLogLevel(level int) {
1941 C.zoo_set_debug_level(C.ZooLogLevel(level))
1942 }
1943
1944-// Init initializes the communication with a ZooKeeper cluster. The provided
1945+// Dial initializes the communication with a ZooKeeper cluster. The provided
1946 // servers parameter may include multiple server addresses, separated
1947 // by commas, so that the client will automatically attempt to connect
1948 // to another server if one of them stops working for whatever reason.
1949@@ -395,82 +303,80 @@
1950 //
1951 // Session establishment is asynchronous, meaning that this function
1952 // will return before the communication with ZooKeeper is fully established.
1953-// The watch channel receives events of type SESSION_EVENT when any change
1954-// to the state of the established connection happens. See the documentation
1955-// for the Event type for more details.
1956-func Init(servers string, recvTimeoutNS int64) (zk *ZooKeeper, watch chan Event, err Error) {
1957- zk, watch, err = internalInit(servers, recvTimeoutNS, nil)
1958- return
1959+// The returned channel is used to receive events about the current
1960+// status of the connection. On long running applications the session channel
1961+// must *necessarily* be observed since certain events like session expirations require an
1962+// explicit reconnection and reestablishment of state (or bailing out).
1963+// Because of that, the buffer used on the session channel has a limited
1964+// size, and a panic will occur if too many events are not collected.
1965+func Dial(servers string, recvTimeoutNS int64) (*Conn, <-chan SessionStatus, os.Error) {
1966+ return dial(servers, recvTimeoutNS, nil)
1967 }
1968
1969-// Equivalent to Init, but attempt to reestablish an existing session
1970+// Redial is equivalent to Dial, but attempts to reestablish an existing session
1971 // identified via the clientId parameter.
1972-func ReInit(servers string, recvTimeoutNS int64, clientId *ClientId) (zk *ZooKeeper, watch chan Event, err Error) {
1973- zk, watch, err = internalInit(servers, recvTimeoutNS, clientId)
1974- return
1975+func Redial(servers string, recvTimeoutNS int64, clientId *ClientId) (*Conn, <-chan SessionStatus, os.Error) {
1976+ return dial(servers, recvTimeoutNS, clientId)
1977 }
1978
1979-func internalInit(servers string, recvTimeoutNS int64, clientId *ClientId) (*ZooKeeper, chan Event, Error) {
1980-
1981- zk := &ZooKeeper{}
1982- zk.watchChannels = make(map[uintptr]chan Event)
1983+func dial(servers string, recvTimeoutNS int64, clientId *ClientId) (*Conn, <-chan SessionStatus, os.Error) {
1984+ conn := &Conn{}
1985+ conn.watchChannels = make(map[uintptr]statusChan)
1986
1987 var cId *C.clientid_t
1988 if clientId != nil {
1989 cId = &clientId.cId
1990 }
1991
1992- watchId, watchChannel := zk.createWatch(true)
1993- zk.sessionWatchId = watchId
1994+ watchChannel := make(sessionStatusChan, 32)
1995+ watchId := conn.createWatch(watchChannel)
1996+ conn.sessionWatchId = watchId
1997
1998 cservers := C.CString(servers)
1999- handle, cerr := C.zookeeper_init(cservers, C.watch_handler, C.int(recvTimeoutNS / 1e6), cId, unsafe.Pointer(watchId), 0)
2000+ handle, cerr := C.zookeeper_init(cservers, C.watch_handler, C.int(recvTimeoutNS/1e6), cId, unsafe.Pointer(watchId), 0)
2001 C.free(unsafe.Pointer(cservers))
2002 if handle == nil {
2003- zk.closeAllWatches()
2004- return nil, nil, newError(ZSYSTEMERROR, cerr)
2005+ conn.closeAllWatches()
2006+ return nil, nil, zkError(C.int(ZSYSTEMERROR), cerr)
2007 }
2008- zk.handle = handle
2009+ conn.handle = handle
2010 runWatchLoop()
2011- return zk, watchChannel, nil
2012+ return conn, watchChannel, nil
2013 }
2014
2015 // ClientId returns the client ID for the existing session with ZooKeeper.
2016 // This is useful to reestablish an existing session via Redial.
2017-func (zk *ZooKeeper) ClientId() *ClientId {
2018- return &ClientId{*C.zoo_client_id(zk.handle)}
2019+func (conn *Conn) ClientId() *ClientId {
2020+ return &ClientId{*C.zoo_client_id(conn.handle)}
2021 }
2022
2023 // Close terminates the ZooKeeper interaction.
2024-func (zk *ZooKeeper) Close() Error {
2025-
2026- // Protect from concurrency around zk.handle change.
2027- zk.mutex.Lock()
2028- defer zk.mutex.Unlock()
2029-
2030- if zk.handle == nil {
2031+func (conn *Conn) Close() os.Error {
2032+
2033+ // Protect from concurrency around conn.handle change.
2034+ conn.mutex.Lock()
2035+ defer conn.mutex.Unlock()
2036+
2037+ if conn.handle == nil {
2038 // ZooKeeper may hang indefinitely if a handler is closed twice,
2039 // so we get in the way and prevent it from happening.
2040- return newError(ZCLOSING, nil)
2041+ return ZCLOSING
2042 }
2043- rc, cerr := C.zookeeper_close(zk.handle)
2044+ rc, cerr := C.zookeeper_close(conn.handle)
2045
2046- zk.closeAllWatches()
2047+ conn.closeAllWatches()
2048 stopWatchLoop()
2049
2050- // At this point, nothing else should need zk.handle.
2051- zk.handle = nil
2052+ // At this point, nothing else should need conn.handle.
2053+ conn.handle = nil
2054
2055- if rc != C.ZOK {
2056- return newError(rc, cerr)
2057- }
2058- return nil
2059+ return zkError(rc, cerr)
2060 }
2061
2062 // Get returns the data and status from an existing node. err will be nil,
2063 // unless an error is found. Attempting to retrieve data from a non-existing
2064 // node is an error.
2065-func (zk *ZooKeeper) Get(path string) (data string, stat Stat, err Error) {
2066+func (conn *Conn) Get(path string) (data string, stat *Stat, err os.Error) {
2067
2068 cpath := C.CString(path)
2069 cbuffer := (*C.char)(C.malloc(bufferSize))
2070@@ -478,22 +384,22 @@
2071 defer C.free(unsafe.Pointer(cpath))
2072 defer C.free(unsafe.Pointer(cbuffer))
2073
2074- cstat := C.struct_Stat{}
2075- rc, cerr := C.zoo_wget(zk.handle, cpath, nil, nil,
2076- cbuffer, &cbufferLen, &cstat)
2077+ var cstat Stat
2078+ rc, cerr := C.zoo_wget(conn.handle, cpath, nil, nil, cbuffer, &cbufferLen, &cstat.c)
2079 if rc != C.ZOK {
2080- return "", nil, newError(rc, cerr)
2081+ return "", nil, zkError(rc, cerr)
2082 }
2083+
2084 result := C.GoStringN(cbuffer, cbufferLen)
2085-
2086- return result, (*resultStat)(&cstat), nil
2087+ return result, &cstat, nil
2088 }
2089
2090 // GetW works like Get but also returns a channel that will receive
2091-// a single Event value when the data or existence of the given ZooKeeper
2092-// node changes or when critical session events happen. See the
2093-// documentation of the Event type for more details.
2094-func (zk *ZooKeeper) GetW(path string) (data string, stat Stat, watch chan Event, err Error) {
2095+// a single value, one of NodeChanged or NodeDeleted, when the
2096+// node contents change or it is deleted.
2097+// If the ZooKeeper server goes down before an
2098+// event happens, the channel will be closed.
2099+func (conn *Conn) GetW(path string) (data string, stat *Stat, watch <-chan NodeStatus, err os.Error) {
2100
2101 cpath := C.CString(path)
2102 cbuffer := (*C.char)(C.malloc(bufferSize))
2103@@ -501,82 +407,94 @@
2104 defer C.free(unsafe.Pointer(cpath))
2105 defer C.free(unsafe.Pointer(cbuffer))
2106
2107- watchId, watchChannel := zk.createWatch(true)
2108+ watchChannel := newNodeStatusChan(path)
2109+ watchId := conn.createWatch(watchChannel)
2110
2111- cstat := C.struct_Stat{}
2112- rc, cerr := C.zoo_wget(zk.handle, cpath,
2113- C.watch_handler, unsafe.Pointer(watchId),
2114- cbuffer, &cbufferLen, &cstat)
2115+ var cstat Stat
2116+ rc, cerr := C.zoo_wget(conn.handle, cpath, C.watch_handler, unsafe.Pointer(watchId), cbuffer, &cbufferLen, &cstat.c)
2117 if rc != C.ZOK {
2118- zk.forgetWatch(watchId)
2119- return "", nil, nil, newError(rc, cerr)
2120+ conn.forgetWatch(watchId)
2121+ return "", nil, nil, zkError(rc, cerr)
2122 }
2123
2124 result := C.GoStringN(cbuffer, cbufferLen)
2125- return result, (*resultStat)(&cstat), watchChannel, nil
2126+ return result, &cstat, watchChannel.c, nil
2127 }
2128
2129 // Children returns the children list and status from an existing node.
2130-// err will be nil, unless an error is found. Attempting to retrieve the
2131-// children list from a non-existent node is an error.
2132-func (zk *ZooKeeper) Children(path string) (children []string, stat Stat, err Error) {
2133+// Attempting to retrieve the children list from a non-existent node is an error.
2134+func (conn *Conn) Children(path string) (children []string, stat *Stat, err os.Error) {
2135
2136 cpath := C.CString(path)
2137 defer C.free(unsafe.Pointer(cpath))
2138
2139 cvector := C.struct_String_vector{}
2140- cstat := C.struct_Stat{}
2141- rc, cerr := C.zoo_wget_children2(zk.handle, cpath, nil, nil,
2142- &cvector, &cstat)
2143+ var cstat Stat
2144+ rc, cerr := C.zoo_wget_children2(conn.handle, cpath, nil, nil, &cvector, &cstat.c)
2145
2146 // Can't happen if rc != 0, but avoid potential memory leaks in the future.
2147 if cvector.count != 0 {
2148 children = parseStringVector(&cvector)
2149 }
2150- if rc != C.ZOK {
2151- err = newError(rc, cerr)
2152+ if rc == C.ZOK {
2153+ stat = &cstat
2154 } else {
2155- stat = (*resultStat)(&cstat)
2156+ err = zkError(rc, cerr)
2157 }
2158 return
2159 }
2160
2161 // ChildrenW works like Children but also returns a channel that will
2162-// receive a single Event value when a node is added or removed under the
2163-// provided path or when critical session events happen. See the documentation
2164-// of the Event type for more details.
2165-func (zk *ZooKeeper) ChildrenW(path string) (children []string, stat Stat, watch chan Event, err Error) {
2166+// receive a single value, one of NodeDeleted or NodeChangedChild,
2167+// when a node is added or removed under the
2168+// provided path or when the node itself is deleted. If the ZooKeeper server
2169+// goes down before an event happens, the channel will be closed.
2170+func (conn *Conn) ChildrenW(path string) (children []string, stat *Stat, watch <-chan NodeStatus, err os.Error) {
2171
2172 cpath := C.CString(path)
2173 defer C.free(unsafe.Pointer(cpath))
2174
2175- watchId, watchChannel := zk.createWatch(true)
2176+ watchChannel := newNodeStatusChan(path)
2177+ watchId := conn.createWatch(watchChannel)
2178
2179 cvector := C.struct_String_vector{}
2180- cstat := C.struct_Stat{}
2181- rc, cerr := C.zoo_wget_children2(zk.handle, cpath,
2182- C.watch_handler, unsafe.Pointer(watchId),
2183- &cvector, &cstat)
2184+ var cstat Stat
2185+ rc, cerr := C.zoo_wget_children2(conn.handle, cpath, C.watch_handler, unsafe.Pointer(watchId), &cvector, &cstat.c)
2186
2187 // Can't happen if rc != 0, but avoid potential memory leaks in the future.
2188 if cvector.count != 0 {
2189 children = parseStringVector(&cvector)
2190 }
2191- if rc != C.ZOK {
2192- zk.forgetWatch(watchId)
2193- err = newError(rc, cerr)
2194+ if rc == C.ZOK {
2195+ stat = &cstat
2196+ watch = watchChannel.c
2197 } else {
2198- stat = (*resultStat)(&cstat)
2199- watch = watchChannel
2200+ conn.forgetWatch(watchId)
2201+ err = zkError(rc, cerr)
2202 }
2203 return
2204 }
2205
2206+// an arguably nicer version of parseStringVector (untested)
2207+//func parseStringVector(c *C.struct_String_vector) []string {
2208+// // make a Go slice onto the C data.
2209+// cv := *(*[]*C.char)(unsafe.Pointer(&reflect.SliceHeader{
2210+// Data: uintptr(unsafe.Pointer(c.data)),
2211+// Len: int(c.count),
2212+// Cap: int(c.count),
2213+// }))
2214+// v := make([]string, c.count)
2215+// for i, s := range cv {
2216+// v[i] = C.GoString(s)
2217+// }
2218+// return v
2219+//}
2220+
2221 func parseStringVector(cvector *C.struct_String_vector) []string {
2222 vector := make([]string, cvector.count)
2223 dataStart := uintptr(unsafe.Pointer(cvector.data))
2224 uintptrSize := unsafe.Sizeof(dataStart)
2225- for i := 0; i != len(vector); i++ {
2226+ for i := range vector {
2227 cpathPos := dataStart + uintptr(i)*uintptrSize
2228 cpath := *(**C.char)(unsafe.Pointer(cpathPos))
2229 vector[i] = C.GoString(cpath)
2230@@ -588,51 +506,50 @@
2231 // Exists checks if a node exists at the given path. If it does,
2232 // stat will contain meta information on the existing node, otherwise
2233 // it will be nil.
2234-func (zk *ZooKeeper) Exists(path string) (stat Stat, err Error) {
2235+func (conn *Conn) Exists(path string) (stat *Stat, err os.Error) {
2236 cpath := C.CString(path)
2237 defer C.free(unsafe.Pointer(cpath))
2238
2239- cstat := C.struct_Stat{}
2240- rc, cerr := C.zoo_wexists(zk.handle, cpath, nil, nil, &cstat)
2241+ var cstat Stat
2242+ rc, cerr := C.zoo_wexists(conn.handle, cpath, nil, nil, &cstat.c)
2243
2244 // We diverge a bit from the usual here: a ZNONODE is not an error
2245 // for an exists call, otherwise every Exists call would have to check
2246 // for err != nil and err != ZNONODE.
2247 if rc == C.ZOK {
2248- stat = (*resultStat)(&cstat)
2249+ stat = &cstat
2250 } else if rc != C.ZNONODE {
2251- err = newError(rc, cerr)
2252+ err = zkError(rc, cerr)
2253 }
2254 return
2255 }
2256
2257 // ExistsW works like Exists but also returns a channel that will
2258-// receive an Event value when a node is created in case the returned
2259-// stat is nil and the node didn't exist, or when the existing node
2260-// is removed. It will also receive critical session events. See the
2261-// documentation of the Event type for more details.
2262-func (zk *ZooKeeper) ExistsW(path string) (stat Stat, watch chan Event, err Error) {
2263+// receive a single value, one of NodeCreated, NodeDeleted or NodeChanged,
2264+// when the node is next created, removed, or its contents change.
2265+// If the ZooKeeper server goes down before an event happens, the channel will be closed.
2266+func (conn *Conn) ExistsW(path string) (stat *Stat, watch <-chan NodeStatus, err os.Error) {
2267 cpath := C.CString(path)
2268 defer C.free(unsafe.Pointer(cpath))
2269
2270- watchId, watchChannel := zk.createWatch(true)
2271+ watchChannel := newNodeStatusChan(path)
2272+ watchId := conn.createWatch(watchChannel)
2273
2274- cstat := C.struct_Stat{}
2275- rc, cerr := C.zoo_wexists(zk.handle, cpath,
2276- C.watch_handler, unsafe.Pointer(watchId), &cstat)
2277+ var cstat Stat
2278+ rc, cerr := C.zoo_wexists(conn.handle, cpath, C.watch_handler, unsafe.Pointer(watchId), &cstat.c)
2279
2280 // We diverge a bit from the usual here: a ZNONODE is not an error
2281 // for an exists call, otherwise every Exists call would have to check
2282 // for err != nil and err != ZNONODE.
2283- switch rc {
2284+ switch Error(rc) {
2285 case ZOK:
2286- stat = (*resultStat)(&cstat)
2287- watch = watchChannel
2288+ stat = &cstat
2289+ watch = watchChannel.c
2290 case ZNONODE:
2291- watch = watchChannel
2292+ watch = watchChannel.c
2293 default:
2294- zk.forgetWatch(watchId)
2295- err = newError(rc, cerr)
2296+ conn.forgetWatch(watchId)
2297+ err = zkError(rc, cerr)
2298 }
2299 return
2300 }
2301@@ -646,7 +563,7 @@
2302 // The returned path is useful in cases where the created path may differ
2303 // from the requested one, such as when a sequence number is appended
2304 // to it due to the use of the gozk.SEQUENCE flag.
2305-func (zk *ZooKeeper) Create(path, value string, flags int, aclv []ACL) (pathCreated string, err Error) {
2306+func (conn *Conn) Create(path, value string, flags int, aclv []ACL) (pathCreated string, err os.Error) {
2307 cpath := C.CString(path)
2308 cvalue := C.CString(value)
2309 defer C.free(unsafe.Pointer(cpath))
2310@@ -660,13 +577,13 @@
2311 cpathCreated := (*C.char)(C.malloc(cpathLen))
2312 defer C.free(unsafe.Pointer(cpathCreated))
2313
2314- rc, cerr := C.zoo_create(zk.handle, cpath, cvalue, C.int(len(value)),
2315- caclv, C.int(flags), cpathCreated, C.int(cpathLen))
2316- if rc != C.ZOK {
2317- return "", newError(rc, cerr)
2318+ rc, cerr := C.zoo_create(conn.handle, cpath, cvalue, C.int(len(value)), caclv, C.int(flags), cpathCreated, C.int(cpathLen))
2319+ if rc == C.ZOK {
2320+ pathCreated = C.GoString(cpathCreated)
2321+ } else {
2322+ err = zkError(rc, cerr)
2323 }
2324-
2325- return C.GoString(cpathCreated), nil
2326+ return
2327 }
2328
2329 // Set modifies the data for the existing node at the given path, replacing it
2330@@ -677,35 +594,31 @@
2331 //
2332 // It is an error to attempt to set the data of a non-existing node with
2333 // this function. In these cases, use Create instead.
2334-func (zk *ZooKeeper) Set(path, value string, version int32) (stat Stat, err Error) {
2335+func (conn *Conn) Set(path, value string, version int32) (stat *Stat, err os.Error) {
2336
2337 cpath := C.CString(path)
2338 cvalue := C.CString(value)
2339 defer C.free(unsafe.Pointer(cpath))
2340 defer C.free(unsafe.Pointer(cvalue))
2341
2342- cstat := C.struct_Stat{}
2343-
2344- rc, cerr := C.zoo_set2(zk.handle, cpath, cvalue, C.int(len(value)),
2345- C.int(version), &cstat)
2346- if rc != C.ZOK {
2347- return nil, newError(rc, cerr)
2348+ var cstat Stat
2349+ rc, cerr := C.zoo_set2(conn.handle, cpath, cvalue, C.int(len(value)), C.int(version), &cstat.c)
2350+ if rc == C.ZOK {
2351+ stat = &cstat
2352+ } else {
2353+ err = zkError(rc, cerr)
2354 }
2355-
2356- return (*resultStat)(&cstat), nil
2357+ return
2358 }
2359
2360 // Delete removes the node at path. If version is not -1, the operation
2361 // will only succeed if the node is still at this version when the
2362 // node is deleted as an atomic operation.
2363-func (zk *ZooKeeper) Delete(path string, version int32) (err Error) {
2364+func (conn *Conn) Delete(path string, version int32) (err os.Error) {
2365 cpath := C.CString(path)
2366 defer C.free(unsafe.Pointer(cpath))
2367- rc, cerr := C.zoo_delete(zk.handle, cpath, C.int(version))
2368- if rc != C.ZOK {
2369- return newError(rc, cerr)
2370- }
2371- return nil
2372+ rc, cerr := C.zoo_delete(conn.handle, cpath, C.int(version))
2373+ return zkError(rc, cerr)
2374 }
2375
2376 // AddAuth adds a new authentication certificate to the ZooKeeper
2377@@ -713,7 +626,7 @@
2378 // authentication information, while the cert parameter provides the
2379 // identity data itself. For instance, the "digest" scheme requires
2380 // a pair like "username:password" to be provided as the certificate.
2381-func (zk *ZooKeeper) AddAuth(scheme, cert string) Error {
2382+func (conn *Conn) AddAuth(scheme, cert string) os.Error {
2383 cscheme := C.CString(scheme)
2384 ccert := C.CString(cert)
2385 defer C.free(unsafe.Pointer(cscheme))
2386@@ -725,43 +638,38 @@
2387 }
2388 defer C.destroy_completion_data(data)
2389
2390- rc, cerr := C.zoo_add_auth(zk.handle, cscheme, ccert, C.int(len(cert)),
2391- C.handle_void_completion, unsafe.Pointer(data))
2392+ rc, cerr := C.zoo_add_auth(conn.handle, cscheme, ccert, C.int(len(cert)), C.handle_void_completion, unsafe.Pointer(data))
2393 if rc != C.ZOK {
2394- return newError(rc, cerr)
2395+ return zkError(rc, cerr)
2396 }
2397
2398 C.wait_for_completion(data)
2399
2400 rc = C.int(uintptr(data.data))
2401- if rc != C.ZOK {
2402- return newError(rc, nil)
2403- }
2404-
2405- return nil
2406+ return zkError(rc, nil)
2407 }
2408
2409 // ACL returns the access control list for path.
2410-func (zk *ZooKeeper) ACL(path string) ([]ACL, Stat, Error) {
2411+func (conn *Conn) ACL(path string) ([]ACL, *Stat, os.Error) {
2412
2413 cpath := C.CString(path)
2414 defer C.free(unsafe.Pointer(cpath))
2415
2416 caclv := C.struct_ACL_vector{}
2417- cstat := C.struct_Stat{}
2418
2419- rc, cerr := C.zoo_get_acl(zk.handle, cpath, &caclv, &cstat)
2420+ var cstat Stat
2421+ rc, cerr := C.zoo_get_acl(conn.handle, cpath, &caclv, &cstat.c)
2422 if rc != C.ZOK {
2423- return nil, nil, newError(rc, cerr)
2424+ return nil, nil, zkError(rc, cerr)
2425 }
2426
2427 aclv := parseACLVector(&caclv)
2428
2429- return aclv, (*resultStat)(&cstat), nil
2430+ return aclv, &cstat, nil
2431 }
2432
2433 // SetACL changes the access control list for path.
2434-func (zk *ZooKeeper) SetACL(path string, aclv []ACL, version int32) Error {
2435+func (conn *Conn) SetACL(path string, aclv []ACL, version int32) os.Error {
2436
2437 cpath := C.CString(path)
2438 defer C.free(unsafe.Pointer(cpath))
2439@@ -769,12 +677,8 @@
2440 caclv := buildACLVector(aclv)
2441 defer C.deallocate_ACL_vector(caclv)
2442
2443- rc, cerr := C.zoo_set_acl(zk.handle, cpath, C.int(version), caclv)
2444- if rc != C.ZOK {
2445- return newError(rc, cerr)
2446- }
2447-
2448- return nil
2449+ rc, cerr := C.zoo_set_acl(conn.handle, cpath, C.int(version), caclv)
2450+ return zkError(rc, cerr)
2451 }
2452
2453 func parseACLVector(caclv *C.struct_ACL_vector) []ACL {
2454@@ -822,7 +726,7 @@
2455 // -----------------------------------------------------------------------
2456 // RetryChange utility method.
2457
2458-type ChangeFunc func(oldValue string, oldStat Stat) (newValue string, err os.Error)
2459+type ChangeFunc func(oldValue string, oldStat *Stat) (newValue string, err os.Error)
2460
2461 // RetryChange runs changeFunc to attempt to atomically change path
2462 // in a lock free manner, and retries in case there was another
2463@@ -831,8 +735,7 @@
2464 // changeFunc must work correctly if called multiple times in case
2465 // the modification fails due to concurrent changes, and it may return
2466 // an error that will cause the RetryChange function to stop and
2467-// return an Error with code ZSYSTEMERROR and the same .String() result
2468-// as the provided error.
2469+// return the same error.
2470 //
2471 // This mechanism is not suitable for a node that is frequently modified
2472 // concurrently. For those cases, consider using a pessimistic locking
2473@@ -845,8 +748,7 @@
2474 //
2475 // 2. Call the changeFunc with the current node value and stat,
2476 // or with an empty string and nil stat, if the node doesn't yet exist.
2477-// If the changeFunc returns an error, stop and return an Error with
2478-// ZSYSTEMERROR Code() and the same String() as the changeFunc error.
2479+// If the changeFunc returns an error, stop and return the same error.
2480 //
2481 // 3. If the changeFunc returns no errors, use the string returned as
2482 // the new candidate value for the node, and attempt to either create
2483@@ -855,36 +757,32 @@
2484 // in the same node), repeat from step 1. If this procedure fails with any
2485 // other error, stop and return the error found.
2486 //
2487-func (zk *ZooKeeper) RetryChange(path string, flags int, acl []ACL, changeFunc ChangeFunc) (err Error) {
2488+func (conn *Conn) RetryChange(path string, flags int, acl []ACL, changeFunc ChangeFunc) os.Error {
2489 for {
2490- oldValue, oldStat, getErr := zk.Get(path)
2491- if getErr != nil && getErr.Code() != ZNONODE {
2492- err = getErr
2493- break
2494- }
2495- newValue, osErr := changeFunc(oldValue, oldStat)
2496- if osErr != nil {
2497- return newError(ZSYSTEMERROR, osErr)
2498- } else if oldStat == nil {
2499- _, err = zk.Create(path, newValue, flags, acl)
2500- if err == nil || err.Code() != ZNODEEXISTS {
2501- break
2502+ oldValue, oldStat, err := conn.Get(path)
2503+ if err != nil && err != ZNONODE {
2504+ return err
2505+ }
2506+ newValue, err := changeFunc(oldValue, oldStat)
2507+ if err != nil {
2508+ return err
2509+ }
2510+ if oldStat == nil {
2511+ _, err := conn.Create(path, newValue, flags, acl)
2512+ if err == nil || err != ZNODEEXISTS {
2513+ return err
2514 }
2515- } else if newValue == oldValue {
2516+ continue
2517+ }
2518+ if newValue == oldValue {
2519 return nil // Nothing to do.
2520- } else {
2521- _, err = zk.Set(path, newValue, oldStat.Version())
2522- if err == nil {
2523- break
2524- } else {
2525- code := err.Code()
2526- if code != ZBADVERSION && code != ZNONODE {
2527- break
2528- }
2529- }
2530+ }
2531+ _, err = conn.Set(path, newValue, oldStat.Version())
2532+ if err == nil || (err != ZBADVERSION && err != ZNONODE) {
2533+ return err
2534 }
2535 }
2536- return err
2537+ panic("not reached")
2538 }
2539
2540 // -----------------------------------------------------------------------
2541@@ -897,7 +795,7 @@
2542 // Whenever a *W method is called, it will return a channel which
2543 // outputs Event values. Internally, a map is used to maintain references
2544 // between an unique integer key (the watchId), and the event channel. The
2545-// watchId is then handed to the C zookeeper library as the watch context,
2546+// watchId is then handed to the C ZooKeeper library as the watch context,
2547 // so that we get it back when events happen. Using an integer key as the
2548 // watch context rather than a pointer is needed because there's no guarantee
2549 // that in the future the GC will not move objects around, and also because
2550@@ -910,13 +808,13 @@
2551 // Since Cgo doesn't allow calling back into Go, we actually fire a new
2552 // goroutine the very first time Dial is called, and allow it to block
2553 // in a pthread condition variable within a C function. This condition
2554-// will only be notified once a zookeeper watch callback appends new
2555+// will only be notified once a ZooKeeper watch callback appends new
2556 // entries to the event list. When this happens, the C function returns
2557 // and we get back into Go land with the pointer to the watch data,
2558 // including the watchId and other event details such as type and path.
2559
2560 var watchMutex sync.Mutex
2561-var watchZooKeepers = make(map[uintptr]*ZooKeeper)
2562+var watchConns = make(map[uintptr]*Conn)
2563 var watchCounter uintptr
2564 var watchLoopCounter int
2565
2566@@ -925,25 +823,20 @@
2567 // mostly as a debugging and testing aid.
2568 func CountPendingWatches() int {
2569 watchMutex.Lock()
2570- count := len(watchZooKeepers)
2571+ count := len(watchConns)
2572 watchMutex.Unlock()
2573 return count
2574 }
2575
2576-// createWatch creates and registers a watch, returning the watch id
2577-// and channel.
2578-func (zk *ZooKeeper) createWatch(session bool) (watchId uintptr, watchChannel chan Event) {
2579- buf := 1 // session/watch event
2580- if session {
2581- buf = 32
2582- }
2583- watchChannel = make(chan Event, buf)
2584+// createWatch creates a watch that will send events down the provided
2585+// channel, which must be one of the four possible event channel types.
2586+func (conn *Conn) createWatch(watchChannel statusChan) (watchId uintptr) {
2587 watchMutex.Lock()
2588 defer watchMutex.Unlock()
2589 watchId = watchCounter
2590 watchCounter += 1
2591- zk.watchChannels[watchId] = watchChannel
2592- watchZooKeepers[watchId] = zk
2593+ conn.watchChannels[watchId] = watchChannel
2594+ watchConns[watchId] = conn
2595 return
2596 }
2597
2598@@ -951,67 +844,71 @@
2599 // from ever getting delivered. It shouldn't be used if there's any
2600 // chance the watch channel is still visible and not closed, since
2601 // it might mean a goroutine would be blocked forever.
2602-func (zk *ZooKeeper) forgetWatch(watchId uintptr) {
2603+func (conn *Conn) forgetWatch(watchId uintptr) {
2604 watchMutex.Lock()
2605 defer watchMutex.Unlock()
2606- zk.watchChannels[watchId] = nil, false
2607- watchZooKeepers[watchId] = nil, false
2608+ conn.watchChannels[watchId] = nil, false
2609+ watchConns[watchId] = nil, false
2610 }
2611
2612-// closeAllWatches closes all watch channels for zk.
2613-func (zk *ZooKeeper) closeAllWatches() {
2614+// closeAllWatches closes all watch channels for conn.
2615+func (conn *Conn) closeAllWatches() {
2616 watchMutex.Lock()
2617 defer watchMutex.Unlock()
2618- for watchId, ch := range zk.watchChannels {
2619- close(ch)
2620- zk.watchChannels[watchId] = nil, false
2621- watchZooKeepers[watchId] = nil, false
2622+ for watchId, ch := range conn.watchChannels {
2623+ ch.close()
2624+ conn.watchChannels[watchId] = nil, false
2625+ watchConns[watchId] = nil, false
2626 }
2627 }
2628
2629 // sendEvent delivers the event to the watchId event channel. If the
2630 // event channel is a watch event channel, the event is delivered,
2631 // the channel is closed, and resources are freed.
2632-func sendEvent(watchId uintptr, event Event) {
2633- if event.State == STATE_CLOSED {
2634- panic("Attempted to send a CLOSED event")
2635+func sendEvent(watchId uintptr, eventType, connectionState int, path string) {
2636+ if eventType == event_SESSION && connectionState == 0 {
2637+ panic("Attempted to send a CLOSED connection status")
2638 }
2639 watchMutex.Lock()
2640 defer watchMutex.Unlock()
2641- zk, ok := watchZooKeepers[watchId]
2642+ conn, ok := watchConns[watchId]
2643 if !ok {
2644 return
2645 }
2646- if event.Type == EVENT_SESSION && watchId != zk.sessionWatchId {
2647- switch event.State {
2648- case STATE_EXPIRED_SESSION, STATE_AUTH_FAILED:
2649+ ch := conn.watchChannels[watchId]
2650+ if ch == nil {
2651+ return
2652+ }
2653+ if eventType == event_SESSION && watchId != conn.sessionWatchId {
2654+ switch SessionStatus(connectionState) {
2655+ case SessionExpired, SessionAuthFailed:
2656+ // The channel will be closed below.
2657 default:
2658+ // Ignore non-critical session events.
2659 // WTF? Feels like TCP saying "dropped a dup packet, ok?"
2660 return
2661 }
2662- }
2663- ch := zk.watchChannels[watchId]
2664- if ch == nil {
2665- return
2666- }
2667- select {
2668- case ch <- event:
2669- default:
2670- // Channel not available for sending, which means session
2671- // events are necessarily involved (trivial events go
2672- // straight to the buffer), and the application isn't paying
2673- // attention for long enough to have the buffer filled up.
2674- // Break down now rather than leaking forever.
2675- if watchId == zk.sessionWatchId {
2676- panic("Session event channel buffer is full")
2677- } else {
2678- panic("Watch event channel buffer is full")
2679- }
2680- }
2681- if watchId != zk.sessionWatchId {
2682- zk.watchChannels[watchId] = nil, false
2683- watchZooKeepers[watchId] = nil, false
2684- close(ch)
2685+ } else {
2686+ if eventType != event_SESSION && watchId == conn.sessionWatchId {
2687+ panic(fmt.Errorf("unexpected non-session event %v %v", eventType, connectionState))
2688+ }
2689+ if !ch.send(eventType, connectionState, path) {
2690+ // Channel not available for sending, which means session
2691+ // events are necessarily involved (trivial events go
2692+ // straight to the buffer), and the application isn't paying
2693+ // attention for long enough to have the buffer filled up.
2694+ // Break down now rather than leaking forever.
2695+ if watchId == conn.sessionWatchId {
2696+ panic("Session event channel buffer is full")
2697+ } else {
2698+ panic("Watch event channel buffer is full (impossible!)")
2699+ }
2700+ }
2701+ }
2702+ if watchId != conn.sessionWatchId {
2703+ conn.watchChannels[watchId] = nil, false
2704+ watchConns[watchId] = nil, false
2705+ ch.close()
2706 }
2707 }
2708
2709@@ -1049,13 +946,8 @@
2710 for {
2711 // This will block until a watch event is available.
2712 data := C.wait_for_watch()
2713- event := Event{
2714- Type: int(data.event_type),
2715- Path: C.GoString(data.event_path),
2716- State: int(data.connection_state),
2717- }
2718 watchId := uintptr(data.watch_context)
2719+ sendEvent(watchId, int(data.event_type), int(data.connection_state), C.GoString(data.event_path))
2720 C.destroy_watch_data(data)
2721- sendEvent(watchId, event)
2722 }
2723 }
2724
2725=== renamed file 'gozk_test.go' => 'zk_test.go'
2726--- gozk_test.go 2011-08-19 01:51:37 +0000
2727+++ zk_test.go 2011-09-30 13:35:26 +0000
2728@@ -1,17 +1,17 @@
2729-package gozk_test
2730+package zk_test
2731
2732 import (
2733 . "launchpad.net/gocheck"
2734- "gozk"
2735+ "launchpad.net/gozk/zk"
2736 "time"
2737 )
2738
2739 // This error will be delivered via C errno, since ZK unfortunately
2740 // only provides the handler back from zookeeper_init().
2741 func (s *S) TestInitErrorThroughErrno(c *C) {
2742- zk, watch, err := gozk.Init("bad-domain-without-port", 5e9)
2743- if zk != nil {
2744- zk.Close()
2745+ conn, watch, err := zk.Dial("bad-domain-without-port", 5e9)
2746+ if conn != nil {
2747+ conn.Close()
2748 }
2749 if watch != nil {
2750 go func() {
2751@@ -23,15 +23,15 @@
2752 }
2753 }()
2754 }
2755- c.Assert(zk, IsNil)
2756+ c.Assert(conn, IsNil)
2757 c.Assert(watch, IsNil)
2758 c.Assert(err, Matches, "invalid argument")
2759 }
2760
2761 func (s *S) TestRecvTimeoutInitParameter(c *C) {
2762- zk, watch, err := gozk.Init(s.zkAddr, 0)
2763+ conn, watch, err := zk.Dial(s.zkAddr, 0)
2764 c.Assert(err, IsNil)
2765- defer zk.Close()
2766+ defer conn.Close()
2767
2768 select {
2769 case <-watch:
2770@@ -40,7 +40,7 @@
2771 }
2772
2773 for i := 0; i != 1000; i++ {
2774- _, _, err := zk.Get("/zookeeper")
2775+ _, _, err := conn.Get("/zookeeper")
2776 if err != nil {
2777 c.Assert(err, Matches, "operation timeout")
2778 c.SucceedNow()
2779@@ -51,88 +51,75 @@
2780 }
2781
2782 func (s *S) TestSessionWatches(c *C) {
2783- c.Assert(gozk.CountPendingWatches(), Equals, 0)
2784+ c.Assert(zk.CountPendingWatches(), Equals, 0)
2785
2786 zk1, watch1 := s.init(c)
2787 zk2, watch2 := s.init(c)
2788 zk3, watch3 := s.init(c)
2789
2790- c.Assert(gozk.CountPendingWatches(), Equals, 3)
2791+ c.Assert(zk.CountPendingWatches(), Equals, 3)
2792
2793 event1 := <-watch1
2794- c.Assert(event1.Type, Equals, gozk.EVENT_SESSION)
2795- c.Assert(event1.State, Equals, gozk.STATE_CONNECTED)
2796+ c.Assert(event1, Equals, zk.SessionConnected)
2797
2798- c.Assert(gozk.CountPendingWatches(), Equals, 3)
2799+ c.Assert(zk.CountPendingWatches(), Equals, 3)
2800
2801 event2 := <-watch2
2802- c.Assert(event2.Type, Equals, gozk.EVENT_SESSION)
2803- c.Assert(event2.State, Equals, gozk.STATE_CONNECTED)
2804+ c.Assert(event2, Equals, zk.SessionConnected)
2805
2806- c.Assert(gozk.CountPendingWatches(), Equals, 3)
2807+ c.Assert(zk.CountPendingWatches(), Equals, 3)
2808
2809 event3 := <-watch3
2810- c.Assert(event3.Type, Equals, gozk.EVENT_SESSION)
2811- c.Assert(event3.State, Equals, gozk.STATE_CONNECTED)
2812+ c.Assert(event3, Equals, zk.SessionConnected)
2813
2814- c.Assert(gozk.CountPendingWatches(), Equals, 3)
2815+ c.Assert(zk.CountPendingWatches(), Equals, 3)
2816
2817 zk1.Close()
2818- c.Assert(gozk.CountPendingWatches(), Equals, 2)
2819+ c.Assert(zk.CountPendingWatches(), Equals, 2)
2820 zk2.Close()
2821- c.Assert(gozk.CountPendingWatches(), Equals, 1)
2822+ c.Assert(zk.CountPendingWatches(), Equals, 1)
2823 zk3.Close()
2824- c.Assert(gozk.CountPendingWatches(), Equals, 0)
2825+ c.Assert(zk.CountPendingWatches(), Equals, 0)
2826 }
2827
2828-// Gozk injects a STATE_CLOSED event when zk.Close() is called, right
2829+// Gozk injects a STATE_CLOSED event when conn.Close() is called, right
2830 // before the channel is closed. Closing the channel injects a nil
2831 // pointer, as usual for Go, so the STATE_CLOSED gives a chance to
2832 // know that a nil pointer is coming, and to stop the procedure.
2833 // Hopefully this procedure will avoid some nil-pointer references by
2834 // mistake.
2835 func (s *S) TestClosingStateInSessionWatch(c *C) {
2836- zk, watch := s.init(c)
2837+ conn, watch := s.init(c)
2838
2839 event := <-watch
2840- c.Assert(event.Type, Equals, gozk.EVENT_SESSION)
2841- c.Assert(event.State, Equals, gozk.STATE_CONNECTED)
2842+ c.Assert(event, Equals, zk.SessionConnected)
2843
2844- zk.Close()
2845+ conn.Close()
2846 event, ok := <-watch
2847 c.Assert(ok, Equals, false)
2848- c.Assert(event.Type, Equals, gozk.EVENT_CLOSED)
2849- c.Assert(event.State, Equals, gozk.STATE_CLOSED)
2850-}
2851-
2852-func (s *S) TestEventString(c *C) {
2853- var event gozk.Event
2854- event = gozk.Event{gozk.EVENT_SESSION, "/path", gozk.STATE_CONNECTED}
2855- c.Assert(event, Matches, "ZooKeeper connected")
2856- event = gozk.Event{gozk.EVENT_CREATED, "/path", gozk.STATE_CONNECTED}
2857- c.Assert(event, Matches, "ZooKeeper connected; path created: /path")
2858- event = gozk.Event{-1, "/path", gozk.STATE_CLOSED}
2859- c.Assert(event, Matches, "ZooKeeper connection closed")
2860-}
2861-
2862-var okTests = []struct{gozk.Event; Ok bool}{
2863- {gozk.Event{gozk.EVENT_SESSION, "", gozk.STATE_CONNECTED}, true},
2864- {gozk.Event{gozk.EVENT_CREATED, "", gozk.STATE_CONNECTED}, true},
2865- {gozk.Event{0, "", gozk.STATE_CLOSED}, false},
2866- {gozk.Event{0, "", gozk.STATE_EXPIRED_SESSION}, false},
2867- {gozk.Event{0, "", gozk.STATE_AUTH_FAILED}, false},
2868+ c.Assert(event, Equals, zk.SessionStatus(0))
2869+}
2870+
2871+var okTests = []struct {
2872+ zk.SessionStatus
2873+ Ok bool
2874+}{
2875+ {zk.SessionConnected, true},
2876+ {0, false},
2877+ {zk.SessionExpired, false},
2878+ {zk.SessionAuthFailed, false},
2879 }
2880
2881 func (s *S) TestEventOk(c *C) {
2882 for _, t := range okTests {
2883- c.Assert(t.Event.Ok(), Equals, t.Ok)
2884+ c.Assert(t.SessionStatus.Ok(), Equals, t.Ok)
2885 }
2886 }
2887
2888 func (s *S) TestGetAndStat(c *C) {
2889- zk, _ := s.init(c)
2890+ conn, _ := s.init(c)
2891
2892- data, stat, err := zk.Get("/zookeeper")
2893+ data, stat, err := conn.Get("/zookeeper")
2894 c.Assert(err, IsNil)
2895 c.Assert(data, Equals, "")
2896 c.Assert(stat.Czxid(), Equals, int64(0))
2897@@ -149,58 +136,58 @@
2898 }
2899
2900 func (s *S) TestGetAndError(c *C) {
2901- zk, _ := s.init(c)
2902+ conn, _ := s.init(c)
2903
2904- data, stat, err := zk.Get("/non-existent")
2905+ data, stat, err := conn.Get("/non-existent")
2906
2907 c.Assert(data, Equals, "")
2908 c.Assert(stat, IsNil)
2909 c.Assert(err, Matches, "no node")
2910- c.Assert(err.Code(), Equals, gozk.ZNONODE)
2911+ c.Assert(err, Equals, zk.ZNONODE)
2912 }
2913
2914 func (s *S) TestCreateAndGet(c *C) {
2915- zk, _ := s.init(c)
2916+ conn, _ := s.init(c)
2917
2918- path, err := zk.Create("/test-", "bababum", gozk.SEQUENCE|gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
2919+ path, err := conn.Create("/test-", "bababum", zk.SEQUENCE|zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
2920 c.Assert(err, IsNil)
2921 c.Assert(path, Matches, "/test-[0-9]+")
2922
2923 // Check the error condition from Create().
2924- _, err = zk.Create(path, "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
2925+ _, err = conn.Create(path, "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
2926 c.Assert(err, Matches, "node exists")
2927
2928- data, _, err := zk.Get(path)
2929+ data, _, err := conn.Get(path)
2930 c.Assert(err, IsNil)
2931 c.Assert(data, Equals, "bababum")
2932 }
2933
2934 func (s *S) TestCreateSetAndGet(c *C) {
2935- zk, _ := s.init(c)
2936+ conn, _ := s.init(c)
2937
2938- _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
2939+ _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
2940 c.Assert(err, IsNil)
2941
2942- stat, err := zk.Set("/test", "bababum", -1) // Any version.
2943+ stat, err := conn.Set("/test", "bababum", -1) // Any version.
2944 c.Assert(err, IsNil)
2945 c.Assert(stat.Version(), Equals, int32(1))
2946
2947- data, _, err := zk.Get("/test")
2948+ data, _, err := conn.Get("/test")
2949 c.Assert(err, IsNil)
2950 c.Assert(data, Equals, "bababum")
2951 }
2952
2953 func (s *S) TestGetAndWatch(c *C) {
2954- c.Check(gozk.CountPendingWatches(), Equals, 0)
2955-
2956- zk, _ := s.init(c)
2957-
2958- c.Check(gozk.CountPendingWatches(), Equals, 1)
2959-
2960- _, err := zk.Create("/test", "one", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
2961+ c.Check(zk.CountPendingWatches(), Equals, 0)
2962+
2963+ conn, _ := s.init(c)
2964+
2965+ c.Check(zk.CountPendingWatches(), Equals, 1)
2966+
2967+ _, err := conn.Create("/test", "one", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
2968 c.Assert(err, IsNil)
2969
2970- data, stat, watch, err := zk.GetW("/test")
2971+ data, stat, watch, err := conn.GetW("/test")
2972 c.Assert(err, IsNil)
2973 c.Assert(data, Equals, "one")
2974 c.Assert(stat.Version(), Equals, int32(0))
2975@@ -211,17 +198,17 @@
2976 default:
2977 }
2978
2979- c.Check(gozk.CountPendingWatches(), Equals, 2)
2980+ c.Check(zk.CountPendingWatches(), Equals, 2)
2981
2982- _, err = zk.Set("/test", "two", -1)
2983+ _, err = conn.Set("/test", "two", -1)
2984 c.Assert(err, IsNil)
2985
2986 event := <-watch
2987- c.Assert(event.Type, Equals, gozk.EVENT_CHANGED)
2988-
2989- c.Check(gozk.CountPendingWatches(), Equals, 1)
2990-
2991- data, _, watch, err = zk.GetW("/test")
2992+ c.Assert(event, Equals, zk.NodeChanged)
2993+
2994+ c.Check(zk.CountPendingWatches(), Equals, 1)
2995+
2996+ data, _, watch, err = conn.GetW("/test")
2997 c.Assert(err, IsNil)
2998 c.Assert(data, Equals, "two")
2999
3000@@ -231,86 +218,86 @@
3001 default:
3002 }
3003
3004- c.Check(gozk.CountPendingWatches(), Equals, 2)
3005+ c.Check(zk.CountPendingWatches(), Equals, 2)
3006
3007- _, err = zk.Set("/test", "three", -1)
3008+ _, err = conn.Set("/test", "three", -1)
3009 c.Assert(err, IsNil)
3010
3011 event = <-watch
3012- c.Assert(event.Type, Equals, gozk.EVENT_CHANGED)
3013+ c.Assert(event, Equals, zk.NodeChanged)
3014
3015- c.Check(gozk.CountPendingWatches(), Equals, 1)
3016+ c.Check(zk.CountPendingWatches(), Equals, 1)
3017 }
3018
3019 func (s *S) TestGetAndWatchWithError(c *C) {
3020- c.Check(gozk.CountPendingWatches(), Equals, 0)
3021-
3022- zk, _ := s.init(c)
3023-
3024- c.Check(gozk.CountPendingWatches(), Equals, 1)
3025-
3026- _, _, watch, err := zk.GetW("/test")
3027+ c.Check(zk.CountPendingWatches(), Equals, 0)
3028+
3029+ conn, _ := s.init(c)
3030+
3031+ c.Check(zk.CountPendingWatches(), Equals, 1)
3032+
3033+ _, _, watch, err := conn.GetW("/test")
3034 c.Assert(err, NotNil)
3035- c.Assert(err.Code(), Equals, gozk.ZNONODE)
3036+ c.Assert(err, Equals, zk.ZNONODE)
3037 c.Assert(watch, IsNil)
3038
3039- c.Check(gozk.CountPendingWatches(), Equals, 1)
3040+ c.Check(zk.CountPendingWatches(), Equals, 1)
3041 }
3042
3043 func (s *S) TestCloseReleasesWatches(c *C) {
3044- c.Check(gozk.CountPendingWatches(), Equals, 0)
3045-
3046- zk, _ := s.init(c)
3047-
3048- c.Check(gozk.CountPendingWatches(), Equals, 1)
3049-
3050- _, err := zk.Create("/test", "one", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
3051- c.Assert(err, IsNil)
3052-
3053- _, _, _, err = zk.GetW("/test")
3054- c.Assert(err, IsNil)
3055-
3056- c.Assert(gozk.CountPendingWatches(), Equals, 2)
3057-
3058- zk.Close()
3059-
3060- c.Assert(gozk.CountPendingWatches(), Equals, 0)
3061+ c.Check(zk.CountPendingWatches(), Equals, 0)
3062+
3063+ conn, _ := s.init(c)
3064+
3065+ c.Check(zk.CountPendingWatches(), Equals, 1)
3066+
3067+ _, err := conn.Create("/test", "one", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
3068+ c.Assert(err, IsNil)
3069+
3070+ _, _, _, err = conn.GetW("/test")
3071+ c.Assert(err, IsNil)
3072+
3073+ c.Assert(zk.CountPendingWatches(), Equals, 2)
3074+
3075+ conn.Close()
3076+
3077+ c.Assert(zk.CountPendingWatches(), Equals, 0)
3078 }
3079
3080 // By default, the ZooKeeper C client will hang indefinitely if a
3081 // handler is closed twice. We get in the way and prevent it.
3082 func (s *S) TestClosingTwiceDoesntHang(c *C) {
3083- zk, _ := s.init(c)
3084- err := zk.Close()
3085+ conn, _ := s.init(c)
3086+ err := conn.Close()
3087 c.Assert(err, IsNil)
3088- err = zk.Close()
3089+ err = conn.Close()
3090 c.Assert(err, NotNil)
3091- c.Assert(err.Code(), Equals, gozk.ZCLOSING)
3092+ c.Assert(err, Equals, zk.ZCLOSING)
3093 }
3094
3095 func (s *S) TestChildren(c *C) {
3096- zk, _ := s.init(c)
3097+ conn, _ := s.init(c)
3098
3099- children, stat, err := zk.Children("/")
3100+ children, stat, err := conn.Children("/")
3101 c.Assert(err, IsNil)
3102 c.Assert(children, Equals, []string{"zookeeper"})
3103 c.Assert(stat.NumChildren(), Equals, int32(1))
3104
3105- children, stat, err = zk.Children("/non-existent")
3106+ children, stat, err = conn.Children("/non-existent")
3107 c.Assert(err, NotNil)
3108- c.Assert(err.Code(), Equals, gozk.ZNONODE)
3109+ c.Assert(err, Equals, zk.ZNONODE)
3110 c.Assert(children, Equals, []string{})
3111- c.Assert(stat, Equals, nil)
3112+ c.Assert(stat, IsNil)
3113 }
3114
3115 func (s *S) TestChildrenAndWatch(c *C) {
3116- c.Check(gozk.CountPendingWatches(), Equals, 0)
3117-
3118- zk, _ := s.init(c)
3119-
3120- c.Check(gozk.CountPendingWatches(), Equals, 1)
3121-
3122- children, stat, watch, err := zk.ChildrenW("/")
3123+ c.Check(zk.CountPendingWatches(), Equals, 0)
3124+
3125+ conn, _ := s.init(c)
3126+
3127+ c.Check(zk.CountPendingWatches(), Equals, 1)
3128+
3129+ children, stat, watch, err := conn.ChildrenW("/")
3130 c.Assert(err, IsNil)
3131 c.Assert(children, Equals, []string{"zookeeper"})
3132 c.Assert(stat.NumChildren(), Equals, int32(1))
3133@@ -321,18 +308,17 @@
3134 default:
3135 }
3136
3137- c.Check(gozk.CountPendingWatches(), Equals, 2)
3138+ c.Check(zk.CountPendingWatches(), Equals, 2)
3139
3140- _, err = zk.Create("/test1", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
3141+ _, err = conn.Create("/test1", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
3142 c.Assert(err, IsNil)
3143
3144 event := <-watch
3145- c.Assert(event.Type, Equals, gozk.EVENT_CHILD)
3146- c.Assert(event.Path, Equals, "/")
3147-
3148- c.Check(gozk.CountPendingWatches(), Equals, 1)
3149-
3150- children, stat, watch, err = zk.ChildrenW("/")
3151+ c.Assert(event, Equals, zk.NodeChangedChild)
3152+
3153+ c.Check(zk.CountPendingWatches(), Equals, 1)
3154+
3155+ children, stat, watch, err = conn.ChildrenW("/")
3156 c.Assert(err, IsNil)
3157 c.Assert(stat.NumChildren(), Equals, int32(2))
3158
3159@@ -345,57 +331,56 @@
3160 default:
3161 }
3162
3163- c.Check(gozk.CountPendingWatches(), Equals, 2)
3164+ c.Check(zk.CountPendingWatches(), Equals, 2)
3165
3166- _, err = zk.Create("/test2", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
3167+ _, err = conn.Create("/test2", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
3168 c.Assert(err, IsNil)
3169
3170 event = <-watch
3171- c.Assert(event.Type, Equals, gozk.EVENT_CHILD)
3172+ c.Assert(event, Equals, zk.NodeChangedChild)
3173
3174- c.Check(gozk.CountPendingWatches(), Equals, 1)
3175+ c.Check(zk.CountPendingWatches(), Equals, 1)
3176 }
3177
3178 func (s *S) TestChildrenAndWatchWithError(c *C) {
3179- c.Check(gozk.CountPendingWatches(), Equals, 0)
3180-
3181- zk, _ := s.init(c)
3182-
3183- c.Check(gozk.CountPendingWatches(), Equals, 1)
3184-
3185- _, stat, watch, err := zk.ChildrenW("/test")
3186+ c.Check(zk.CountPendingWatches(), Equals, 0)
3187+
3188+ conn, _ := s.init(c)
3189+
3190+ c.Check(zk.CountPendingWatches(), Equals, 1)
3191+
3192+ _, stat, watch, err := conn.ChildrenW("/test")
3193 c.Assert(err, NotNil)
3194- c.Assert(err.Code(), Equals, gozk.ZNONODE)
3195+ c.Assert(err, Equals, zk.ZNONODE)
3196 c.Assert(watch, IsNil)
3197 c.Assert(stat, IsNil)
3198
3199- c.Check(gozk.CountPendingWatches(), Equals, 1)
3200+ c.Check(zk.CountPendingWatches(), Equals, 1)
3201 }
3202
3203 func (s *S) TestExists(c *C) {
3204- zk, _ := s.init(c)
3205-
3206- stat, err := zk.Exists("/zookeeper")
3207- c.Assert(err, IsNil)
3208- c.Assert(stat.NumChildren(), Equals, int32(1))
3209-
3210- stat, err = zk.Exists("/non-existent")
3211+ conn, _ := s.init(c)
3212+
3213+ stat, err := conn.Exists("/non-existent")
3214 c.Assert(err, IsNil)
3215 c.Assert(stat, IsNil)
3216+
3217+ stat, err = conn.Exists("/zookeeper")
3218+ c.Assert(err, IsNil)
3219 }
3220
3221 func (s *S) TestExistsAndWatch(c *C) {
3222- c.Check(gozk.CountPendingWatches(), Equals, 0)
3223-
3224- zk, _ := s.init(c)
3225-
3226- c.Check(gozk.CountPendingWatches(), Equals, 1)
3227-
3228- stat, watch, err := zk.ExistsW("/test")
3229+ c.Check(zk.CountPendingWatches(), Equals, 0)
3230+
3231+ conn, _ := s.init(c)
3232+
3233+ c.Check(zk.CountPendingWatches(), Equals, 1)
3234+
3235+ stat, watch, err := conn.ExistsW("/test")
3236 c.Assert(err, IsNil)
3237 c.Assert(stat, IsNil)
3238
3239- c.Check(gozk.CountPendingWatches(), Equals, 2)
3240+ c.Check(zk.CountPendingWatches(), Equals, 2)
3241
3242 select {
3243 case <-watch:
3244@@ -403,62 +388,61 @@
3245 default:
3246 }
3247
3248- _, err = zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
3249+ _, err = conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
3250 c.Assert(err, IsNil)
3251
3252 event := <-watch
3253- c.Assert(event.Type, Equals, gozk.EVENT_CREATED)
3254- c.Assert(event.Path, Equals, "/test")
3255-
3256- c.Check(gozk.CountPendingWatches(), Equals, 1)
3257-
3258- stat, watch, err = zk.ExistsW("/test")
3259+ c.Assert(event, Equals, zk.NodeCreated)
3260+
3261+ c.Check(zk.CountPendingWatches(), Equals, 1)
3262+
3263+ stat, watch, err = conn.ExistsW("/test")
3264 c.Assert(err, IsNil)
3265 c.Assert(stat, NotNil)
3266 c.Assert(stat.NumChildren(), Equals, int32(0))
3267
3268- c.Check(gozk.CountPendingWatches(), Equals, 2)
3269+ c.Check(zk.CountPendingWatches(), Equals, 2)
3270 }
3271
3272 func (s *S) TestExistsAndWatchWithError(c *C) {
3273- c.Check(gozk.CountPendingWatches(), Equals, 0)
3274-
3275- zk, _ := s.init(c)
3276-
3277- c.Check(gozk.CountPendingWatches(), Equals, 1)
3278-
3279- stat, watch, err := zk.ExistsW("///")
3280+ c.Check(zk.CountPendingWatches(), Equals, 0)
3281+
3282+ conn, _ := s.init(c)
3283+
3284+ c.Check(zk.CountPendingWatches(), Equals, 1)
3285+
3286+ stat, watch, err := conn.ExistsW("///")
3287 c.Assert(err, NotNil)
3288- c.Assert(err.Code(), Equals, gozk.ZBADARGUMENTS)
3289+ c.Assert(err, Equals, zk.ZBADARGUMENTS)
3290 c.Assert(stat, IsNil)
3291 c.Assert(watch, IsNil)
3292
3293- c.Check(gozk.CountPendingWatches(), Equals, 1)
3294+ c.Check(zk.CountPendingWatches(), Equals, 1)
3295 }
3296
3297 func (s *S) TestDelete(c *C) {
3298- zk, _ := s.init(c)
3299-
3300- _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
3301- c.Assert(err, IsNil)
3302-
3303- err = zk.Delete("/test", 5)
3304- c.Assert(err, NotNil)
3305- c.Assert(err.Code(), Equals, gozk.ZBADVERSION)
3306-
3307- err = zk.Delete("/test", -1)
3308- c.Assert(err, IsNil)
3309-
3310- err = zk.Delete("/test", -1)
3311- c.Assert(err, NotNil)
3312- c.Assert(err.Code(), Equals, gozk.ZNONODE)
3313+ conn, _ := s.init(c)
3314+
3315+ _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
3316+ c.Assert(err, IsNil)
3317+
3318+ err = conn.Delete("/test", 5)
3319+ c.Assert(err, NotNil)
3320+ c.Assert(err, Equals, zk.ZBADVERSION)
3321+
3322+ err = conn.Delete("/test", -1)
3323+ c.Assert(err, IsNil)
3324+
3325+ err = conn.Delete("/test", -1)
3326+ c.Assert(err, NotNil)
3327+ c.Assert(err, Equals, zk.ZNONODE)
3328 }
3329
3330 func (s *S) TestClientIdAndReInit(c *C) {
3331 zk1, _ := s.init(c)
3332 clientId1 := zk1.ClientId()
3333
3334- zk2, _, err := gozk.ReInit(s.zkAddr, 5e9, clientId1)
3335+ zk2, _, err := zk.Redial(s.zkAddr, 5e9, clientId1)
3336 c.Assert(err, IsNil)
3337 defer zk2.Close()
3338 clientId2 := zk2.ClientId()
3339@@ -469,110 +453,113 @@
3340 // Surprisingly for some (including myself, initially), the watch
3341 // returned by the exists method actually fires on data changes too.
3342 func (s *S) TestExistsWatchOnDataChange(c *C) {
3343- zk, _ := s.init(c)
3344-
3345- _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
3346- c.Assert(err, IsNil)
3347-
3348- _, watch, err := zk.ExistsW("/test")
3349- c.Assert(err, IsNil)
3350-
3351- _, err = zk.Set("/test", "new", -1)
3352+ conn, _ := s.init(c)
3353+
3354+ _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
3355+ c.Assert(err, IsNil)
3356+
3357+ _, watch, err := conn.ExistsW("/test")
3358+ c.Assert(err, IsNil)
3359+
3360+ _, err = conn.Set("/test", "new", -1)
3361 c.Assert(err, IsNil)
3362
3363 event := <-watch
3364
3365- c.Assert(event.Path, Equals, "/test")
3366- c.Assert(event.Type, Equals, gozk.EVENT_CHANGED)
3367+ c.Assert(event, Equals, zk.NodeChanged)
3368 }
3369
3370 func (s *S) TestACL(c *C) {
3371- zk, _ := s.init(c)
3372-
3373- _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
3374- c.Assert(err, IsNil)
3375-
3376- acl, stat, err := zk.ACL("/test")
3377- c.Assert(err, IsNil)
3378- c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_ALL))
3379+ conn, _ := s.init(c)
3380+
3381+ _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
3382+ c.Assert(err, IsNil)
3383+
3384+ acl, stat, err := conn.ACL("/test")
3385+ c.Assert(err, IsNil)
3386+ c.Assert(acl, Equals, zk.WorldACL(zk.PERM_ALL))
3387 c.Assert(stat, NotNil)
3388 c.Assert(stat.Version(), Equals, int32(0))
3389
3390- acl, stat, err = zk.ACL("/non-existent")
3391+ acl, stat, err = conn.ACL("/non-existent")
3392 c.Assert(err, NotNil)
3393- c.Assert(err.Code(), Equals, gozk.ZNONODE)
3394+ c.Assert(err, Equals, zk.ZNONODE)
3395 c.Assert(acl, IsNil)
3396 c.Assert(stat, IsNil)
3397+ c.Log("finished TestACL")
3398 }
3399
3400 func (s *S) TestSetACL(c *C) {
3401- zk, _ := s.init(c)
3402+ conn, _ := s.init(c)
3403
3404- _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
3405+ _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
3406 c.Assert(err, IsNil)
3407
3408- err = zk.SetACL("/test", gozk.WorldACL(gozk.PERM_ALL), 5)
3409+ err = conn.SetACL("/test", zk.WorldACL(zk.PERM_ALL), 5)
3410 c.Assert(err, NotNil)
3411- c.Assert(err.Code(), Equals, gozk.ZBADVERSION)
3412-
3413- err = zk.SetACL("/test", gozk.WorldACL(gozk.PERM_READ), -1)
3414- c.Assert(err, IsNil)
3415-
3416- acl, _, err := zk.ACL("/test")
3417- c.Assert(err, IsNil)
3418- c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_READ))
3419+ c.Assert(err, Equals, zk.ZBADVERSION)
3420+
3421+ err = conn.SetACL("/test", zk.WorldACL(zk.PERM_READ), -1)
3422+ c.Assert(err, IsNil)
3423+
3424+ acl, _, err := conn.ACL("/test")
3425+ c.Assert(err, IsNil)
3426+ c.Assert(acl, Equals, zk.WorldACL(zk.PERM_READ))
3427 }
3428
3429 func (s *S) TestAddAuth(c *C) {
3430- zk, _ := s.init(c)
3431-
3432- acl := []gozk.ACL{{gozk.PERM_READ, "digest", "joe:enQcM3mIEHQx7IrPNStYBc0qfs8="}}
3433-
3434- _, err := zk.Create("/test", "", gozk.EPHEMERAL, acl)
3435+ conn, _ := s.init(c)
3436+
3437+ acl := []zk.ACL{{zk.PERM_READ, "digest", "joe:enQcM3mIEHQx7IrPNStYBc0qfs8="}}
3438+
3439+ _, err := conn.Create("/test", "", zk.EPHEMERAL, acl)
3440 c.Assert(err, IsNil)
3441
3442- _, _, err = zk.Get("/test")
3443+ _, _, err = conn.Get("/test")
3444 c.Assert(err, NotNil)
3445- c.Assert(err.Code(), Equals, gozk.ZNOAUTH)
3446+ c.Assert(err, Equals, zk.ZNOAUTH)
3447
3448- err = zk.AddAuth("digest", "joe:passwd")
3449+ err = conn.AddAuth("digest", "joe:passwd")
3450 c.Assert(err, IsNil)
3451
3452- _, _, err = zk.Get("/test")
3453+ _, _, err = conn.Get("/test")
3454 c.Assert(err, IsNil)
3455 }
3456
3457 func (s *S) TestWatchOnReconnection(c *C) {
3458- c.Check(gozk.CountPendingWatches(), Equals, 0)
3459+ c.Check(zk.CountPendingWatches(), Equals, 0)
3460
3461- zk, session := s.init(c)
3462+ conn, session := s.init(c)
3463
3464 event := <-session
3465- c.Assert(event.Type, Equals, gozk.EVENT_SESSION)
3466- c.Assert(event.State, Equals, gozk.STATE_CONNECTED)
3467-
3468- c.Check(gozk.CountPendingWatches(), Equals, 1)
3469-
3470- stat, watch, err := zk.ExistsW("/test")
3471+ c.Assert(event, Equals, zk.SessionConnected)
3472+
3473+ c.Check(zk.CountPendingWatches(), Equals, 1)
3474+
3475+ stat, watch, err := conn.ExistsW("/test")
3476 c.Assert(err, IsNil)
3477 c.Assert(stat, IsNil)
3478
3479- c.Check(gozk.CountPendingWatches(), Equals, 2)
3480-
3481- s.StopZK()
3482+ c.Check(zk.CountPendingWatches(), Equals, 2)
3483+
3484+ err = s.zkServer.Stop()
3485+ c.Assert(err, IsNil)
3486+
3487 time.Sleep(2e9)
3488- s.StartZK()
3489-
3490- // The session channel should receive the reconnection notification,
3491+
3492+ err = s.zkServer.Start()
3493+ c.Assert(err, IsNil)
3494+
3495+ // The session channel should receive the reconnection notification.
3496 select {
3497 case event := <-session:
3498- c.Assert(event.State, Equals, gozk.STATE_CONNECTING)
3499+ c.Assert(event, Equals, zk.SessionConnecting)
3500 case <-time.After(3e9):
3501 c.Fatal("Session watch didn't fire")
3502 }
3503 select {
3504 case event := <-session:
3505- c.Assert(event.State, Equals, gozk.STATE_CONNECTED)
3506+ c.Assert(event, Equals, zk.SessionConnected)
3507 case <-time.After(3e9):
3508 c.Fatal("Session watch didn't fire")
3509 }
3510@@ -585,40 +572,38 @@
3511 }
3512
3513 // And it should still work.
3514- _, err = zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
3515+ _, err = conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
3516 c.Assert(err, IsNil)
3517
3518- event = <-watch
3519- c.Assert(event.Type, Equals, gozk.EVENT_CREATED)
3520- c.Assert(event.Path, Equals, "/test")
3521+ nodeStatus := <-watch
3522+ c.Assert(nodeStatus, Equals, zk.NodeCreated)
3523
3524- c.Check(gozk.CountPendingWatches(), Equals, 1)
3525+ c.Check(zk.CountPendingWatches(), Equals, 1)
3526 }
3527
3528 func (s *S) TestWatchOnSessionExpiration(c *C) {
3529- c.Check(gozk.CountPendingWatches(), Equals, 0)
3530+ c.Check(zk.CountPendingWatches(), Equals, 0)
3531
3532- zk, session := s.init(c)
3533+ conn, session := s.init(c)
3534
3535 event := <-session
3536- c.Assert(event.Type, Equals, gozk.EVENT_SESSION)
3537- c.Assert(event.State, Equals, gozk.STATE_CONNECTED)
3538-
3539- c.Check(gozk.CountPendingWatches(), Equals, 1)
3540-
3541- stat, watch, err := zk.ExistsW("/test")
3542+ c.Assert(event, Equals, zk.SessionConnected)
3543+
3544+ c.Check(zk.CountPendingWatches(), Equals, 1)
3545+
3546+ stat, watch, err := conn.ExistsW("/test")
3547 c.Assert(err, IsNil)
3548 c.Assert(stat, IsNil)
3549
3550- c.Check(gozk.CountPendingWatches(), Equals, 2)
3551+ c.Check(zk.CountPendingWatches(), Equals, 2)
3552
3553 // Use expiration trick described in the FAQ.
3554- clientId := zk.ClientId()
3555- zk2, session2, err := gozk.ReInit(s.zkAddr, 5e9, clientId)
3556+ clientId := conn.ClientId()
3557+ zk2, session2, err := zk.Redial(s.zkAddr, 5e9, clientId)
3558
3559 for event := range session2 {
3560 c.Log("Event from overlapping session: ", event)
3561- if event.State == gozk.STATE_CONNECTED {
3562+ if event == zk.SessionConnected {
3563 // Wait for zk to process the connection.
3564 // Not reliable without this. :-(
3565 time.Sleep(1e9)
3566@@ -627,21 +612,21 @@
3567 }
3568 for event := range session {
3569 c.Log("Event from primary session: ", event)
3570- if event.State == gozk.STATE_EXPIRED_SESSION {
3571+ if event == zk.SessionExpired {
3572 break
3573 }
3574 }
3575
3576 select {
3577- case event := <-watch:
3578- c.Assert(event.State, Equals, gozk.STATE_EXPIRED_SESSION)
3579+ case event, ok := <-watch:
3580+ c.Assert(event, Equals, zk.NodeStatus(0))
3581+ c.Assert(ok, Equals, false)
3582 case <-time.After(3e9):
3583 c.Fatal("Watch event didn't fire")
3584 }
3585
3586- event = <-watch
3587- c.Assert(event.Type, Equals, gozk.EVENT_CLOSED)
3588- c.Assert(event.State, Equals, gozk.STATE_CLOSED)
3589+ nodeStatus := <-watch
3590+ c.Assert(nodeStatus, Equals, zk.NodeStatus(0))
3591
3592- c.Check(gozk.CountPendingWatches(), Equals, 1)
3593+ c.Check(zk.CountPendingWatches(), Equals, 1)
3594 }
