Merge lp:~rogpeppe/gozk/factor-out-service into lp:~juju/gozk/trunk

Proposed by Roger Peppe
Status: Rejected
Rejected by: Gustavo Niemeyer
Proposed branch: lp:~rogpeppe/gozk/factor-out-service
Merge into: lp:~juju/gozk/trunk
Diff against target: 3109 lines (+1281/-784)
10 files modified
Makefile (+5/-2)
example/example.go (+22/-23)
reattach_test.go (+202/-0)
retry_test.go (+65/-74)
server.go (+239/-0)
service/Makefile (+20/-0)
service/service.go (+160/-0)
suite_test.go (+45/-122)
zk.go (+265/-311)
zk_test.go (+258/-252)
To merge this branch: bzr merge lp:~rogpeppe/gozk/factor-out-service
Reviewer: Gustavo Niemeyer
Review status: Disapprove
Review via email: mp+77960@code.launchpad.net

Description of the change

Factor out server starting code into its own package.

It was orthogonal to the ZooKeeper server code,
feels like it might gain complexity over time,
and it is potentially re-usable for other servers.
gozk/service should not be the final name,
as it's entirely independent of zk functionality.

Gustavo Niemeyer (niemeyer) wrote :

This is an interesting idea, but at the same time it's full of problems and
not really necessary right now. I suggest we delay it until a future point
when we actually need that abstraction, as we'll be in a better position
to generalize it. At that point, we can even preserve it for the internal
implementation of the ZooKeeper server itself, without many problems.

Here is some feedback, regardless, which I suggest you don't work on
right now, except for [1], which should be integrated.

[1]

=== added file 'reattach_test.go'
--- reattach_test.go 1970-01-01 00:00:00 +0000
+++ reattach_test.go 2011-10-05 14:37:11 +0000

This should be in a separate merge proposal as it's important and unrelated
to the strawman changes proposed.

[2]

+type Interface interface {
+ // Directory gives the path to the directory containing
+ // the server's configuration and data files. It is assumed that
+ // this exists and can be modified.
+ Directory() string

s/Directory/Dir/

[3]

+
+ // CheckAvailability should return an error if resources
+ // required by the server are not available; for instance
+ // if the server requires a network port which is not free.
+ CheckAvailability() os.Error

We don't need this. The Command function can return an error if
it knows it cannot start.

[4]

+ l, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port))
+ if err != nil {
+ return fmt.Errorf("cannot listen on port %v: %v", port, err)
+ }
+ l.Close()

This doesn't really guarantee that when the service itself is started,
the port will be available, so we might as well not do it.

[5]

+// Process returns a reference to the identity
+// of the last running server in the Service's directory.
+// If the server was shut down, it returns (nil, nil).

This shouldn't be in a generic service API. What if there
are multiple processes handled by the service?

[6]

+// If the server was shut down, it returns (nil, nil).

If we're returning a nil *Process, we necessarily need err != nil.

[7]

+// It stores the process ID of the server in the file "pid.txt"
+// inside the server's directory. Stdout and stderr of the server
+// are similarly written to "log.txt".

That's not nice. Who owns the content of this directory, the
interface implementation, or the Service? We can't write
arbitrary content into a directory of different ownership.

Also, a generic interface would need to do better in terms
of logging. We could have a file somewhere named output.txt, maybe,
and leave logging to the underlying implementation.

[8]

+ if err := p.Kill(); err != nil && !strings.Contains(err.String(), "no such process") {
+ return fmt.Errorf("cannot kill server process: %v", err)
+ }

Different services are stopped in different ways. Some prefer a SIGTERM,
before a SIGKILL, for instance. And what happens if there are multiple
processes?

[9]

+ if err != nil && strings.Contains(err.String(), "no child processes") {
+ // If we can't wait for the server, it's possible that it was running
+ // but not as a child of this process, so give it a little while
+ // to exit. TODO po...


review: Disapprove
Roger Peppe (rogpeppe) wrote :

On 5 October 2011 16:32, Gustavo Niemeyer <email address hidden> wrote:
> Review: Disapprove
>
> This is an interesting idea, but at the same time it's full of problems and
> not really necessary right now.  I suggest we delay it until a future point
> when we actually need that abstraction, as we'll be in a better position
> to generalize it. At that point, we can even preserve it for the internal
> implementation of the ZooKeeper server itself, without many problems.

ok, i'll do that if you don't mind.

the thing that was mostly bothering me was the mingling of the code
from Server and Service, and that the complexities of Service were
obscuring the essential simplicity of Server.

BTW many of the comments below are applicable even without
the factored-out service package.

> Here is some feedback, regardless, which I suggest you don't work on
> right now besides for [1] which should be integrated.
>
> [1]
>
> === added file 'reattach_test.go'
> --- reattach_test.go    1970-01-01 00:00:00 +0000
> +++ reattach_test.go    2011-10-05 14:37:11 +0000
>
> This should be in a separate merge proposal as it's important and unrelated
> to the strawman changes proposed.

oh, i thought it was in the earlier merge.

i've added it to the update-server-interface branch, if that's ok.

> +type Interface interface {
> +       // Directory gives the path to the directory containing
> +       // the server's configuration and data files. It is assumed that
> +       // this exists and can be modified.
> +       Directory() string
>
> s/Directory/Dir/

actually maybe RunDir

> [3]
>
> +
> +       // CheckAvailability should return an error if resources
> +       // required by the server are not available; for instance
> +       // if the server requires a network port which is not free.
> +       CheckAvailability() os.Error
>
> We don't need this.  The Command function can return an error if
> it knows it cannot start.

perhaps. i preferred to keep the availability checking separate
so that calling Command didn't imply that the service was
going to be started immediately.

> [4]
>
> +       l, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port))
> +       if err != nil {
> +               return fmt.Errorf("cannot listen on port %v: %v", port, err)
> +       }
> +       l.Close()
>
> This doesn't really guarantee that when the service itself is started,
> the port will be available, so we can as well not do it.

it doesn't guarantee it, but it does mean that we can return
an early error rather than relying on the server to print
the error somewhere at some unspecified later time.
i found it very useful when testing.

i wonder if the Start should print stdout and stderr from
the server for the first half second or so after it's been started.
that's a possible solution, but makes things slower.

> [5]
>
> +// Process returns a reference to the identity
> +// of the last running server in the Service's directory.
> +// If the server was shut down, it returns (nil, nil).
>
> This shouldn't be in a generic service API.  What if there
> are multiple processes handled by the service?

my plan was to change things so the server is started in
its own process group or sess...


Unmerged revisions

23. By Roger Peppe

Factored out server running code into a separate package.

22. By Roger Peppe

Fix Server code.

21. By Gustavo Niemeyer

The package location is now http://launchpad.net/gozk/zk.

The shorter package name makes for significantly more digestible line lengths.

20. By Gustavo Niemeyer

Merged clean-up-interface branch, by Roger Peppe [r=niemeyer]

This branch does a number of incompatible changes that improve
the overall API of gozk.

Preview Diff

1=== modified file 'Makefile'
2--- Makefile 2011-08-03 01:56:58 +0000
3+++ Makefile 2011-10-03 15:02:28 +0000
4@@ -2,10 +2,13 @@
5
6 all: package
7
8-TARG=gozk
9+TARG=launchpad.net/gozk/zk
10
11+GOFILES=\
12+ server.go\
13+
14 CGOFILES=\
15- gozk.go\
16+ zk.go\
17
18 CGO_OFILES=\
19 helpers.o\
20
21=== added directory 'example'
22=== renamed file 'example.go' => 'example/example.go'
23--- example.go 2011-08-03 01:47:25 +0000
24+++ example/example.go 2011-10-03 15:02:28 +0000
25@@ -1,30 +1,29 @@
26 package main
27
28 import (
29- "gozk"
30+ "launchpad.net/zookeeper/zookeeper"
31 )
32
33 func main() {
34- zk, session, err := gozk.Init("localhost:2181", 5000)
35- if err != nil {
36- println("Couldn't connect: " + err.String())
37- return
38- }
39-
40- defer zk.Close()
41-
42- // Wait for connection.
43- event := <-session
44- if event.State != gozk.STATE_CONNECTED {
45- println("Couldn't connect")
46- return
47- }
48-
49- _, err = zk.Create("/counter", "0", 0, gozk.WorldACL(gozk.PERM_ALL))
50- if err != nil {
51- println(err.String())
52- } else {
53- println("Created!")
54- }
55+ zk, session, err := zookeeper.Init("localhost:2181", 5000)
56+ if err != nil {
57+ println("Couldn't connect: " + err.String())
58+ return
59+ }
60+
61+ defer zk.Close()
62+
63+ // Wait for connection.
64+ event := <-session
65+ if event.State != zookeeper.STATE_CONNECTED {
66+ println("Couldn't connect")
67+ return
68+ }
69+
70+ _, err = zk.Create("/counter", "0", 0, zookeeper.WorldACL(zookeeper.PERM_ALL))
71+ if err != nil {
72+ println(err.String())
73+ } else {
74+ println("Created!")
75+ }
76 }
77-
78
79=== added file 'reattach_test.go'
80--- reattach_test.go 1970-01-01 00:00:00 +0000
81+++ reattach_test.go 2011-10-03 15:02:28 +0000
82@@ -0,0 +1,202 @@
83+package zk_test
84+
85+import (
86+ "bufio"
87+ . "launchpad.net/gocheck"
88+ "launchpad.net/gozk/zk/service"
89+ "launchpad.net/gozk/zk"
90+ "exec"
91+ "flag"
92+ "fmt"
93+ "os"
94+ "strings"
95+ "testing"
96+ "time"
97+)
98+
99+var reattach = flag.Bool("zktest.reattach", false, "internal flag used for testing")
100+var reattachRunDir = flag.String("zktest.rundir", "", "internal flag used for testing")
101+var reattachAbnormalStop = flag.Bool("zktest.stop", false, "internal flag used for testing")
102+
103+// This is the reentrancy point for testing ZooKeeper servers
104+// started by processes that are not direct children of the
105+// testing process. This test always succeeds - the status
106+// will be written to stdout and read by indirectServer.
107+func TestStartNonChildServer(t *testing.T) {
108+ if !*reattach {
109+ // not re-entrant, so ignore this test.
110+ return
111+ }
112+ err := startServer(*reattachRunDir, *reattachAbnormalStop)
113+ if err != nil {
114+ fmt.Printf("error:%v\n", err)
115+ return
116+ }
117+ fmt.Printf("done\n")
118+}
119+
120+func (s *S) startServer(c *C, abort bool) {
121+ err := startServer(s.zkTestRoot, abort)
122+ c.Assert(err, IsNil)
123+}
124+
125+// startServerIndirect starts a ZooKeeper server that is not
126+// a direct child of the current process. If abort is true,
127+// the server will be started and then terminated abnormally.
128+func (s *S) startServerIndirect(c *C, abort bool) {
129+ if len(os.Args) == 0 {
130+ c.Fatal("Cannot find self executable name")
131+ }
132+ cmd := exec.Command(
133+ os.Args[0],
134+ "-zktest.reattach",
135+ "-zktest.rundir", s.zkTestRoot,
136+ "-zktest.stop=", fmt.Sprint(abort),
137+ "-test.run", "StartNonChildServer",
138+ )
139+ r, err := cmd.StdoutPipe()
140+ c.Assert(err, IsNil)
141+ defer r.Close()
142+ if err := cmd.Start(); err != nil {
143+ c.Fatalf("cannot start re-entrant gotest process: %v", err)
144+ }
145+ defer cmd.Wait()
146+ bio := bufio.NewReader(r)
147+ for {
148+ line, err := bio.ReadSlice('\n')
149+ if err != nil {
150+ c.Fatalf("indirect server status line not found: %v", err)
151+ }
152+ s := string(line)
153+ if strings.HasPrefix(s, "error:") {
154+ c.Fatalf("indirect server error: %s", s[len("error:"):])
155+ }
156+ if s == "done\n" {
157+ return
158+ }
159+ }
160+ panic("not reached")
161+}
162+
163+// startServer starts a ZooKeeper server, and terminates it abnormally
164+// if abort is true.
165+func startServer(runDir string, abort bool) os.Error {
166+ s, err := zk.AttachServer(runDir)
167+ if err != nil {
168+ return fmt.Errorf("cannot attach to server at %q: %v", runDir, err)
169+ }
170+ srv := service.New(s)
171+ if err := srv.Start(); err != nil {
172+ return fmt.Errorf("cannot start server: %v", err)
173+ }
174+ if abort {
175+ // Give it time to start up, then kill the server process abnormally,
176+ // leaving the pid.txt file behind.
177+ time.Sleep(0.5e9)
178+ p, err := srv.Process()
179+ if err != nil {
180+ return fmt.Errorf("cannot get server process: %v", err)
181+ }
182+ defer p.Release()
183+ if err := p.Kill(); err != nil {
184+ return fmt.Errorf("cannot kill server process: %v", err)
185+ }
186+ }
187+ return nil
188+}
189+
190+func (s *S) checkCookie(c *C) {
191+ conn, watch, err := zk.Dial(s.zkAddr, 5e9)
192+ c.Assert(err, IsNil)
193+
194+ e, ok := <-watch
195+ c.Assert(ok, Equals, true)
196+ c.Assert(e.Ok(), Equals, true)
197+
198+ c.Assert(err, IsNil)
199+ cookie, _, err := conn.Get("/testAttachCookie")
200+ c.Assert(err, IsNil)
201+ c.Assert(cookie, Equals, "testAttachCookie")
202+ conn.Close()
203+}
204+
205+// cases to test:
206+// child server, stopped normally; reattach, start
207+// non-direct child server, killed abnormally; reattach, start (->error), remove pid.txt; start
208+// non-direct child server, still running; reattach, start (->error), stop, start
209+// child server, still running; reattach, start (-> error)
210+// child server, still running; reattach, stop, start.
211+// non-direct child server, still running; reattach, stop, start.
212+func (s *S) TestAttachServer(c *C) {
213+ // Create a cookie so that we know we are reattaching to the same instance.
214+ conn, _ := s.init(c)
215+ _, err := conn.Create("/testAttachCookie", "testAttachCookie", 0, zk.WorldACL(zk.PERM_ALL))
216+ c.Assert(err, IsNil)
217+ s.checkCookie(c)
218+ s.zkServer.Stop()
219+ s.zkServer = nil
220+
221+ s.testAttachServer(c, (*S).startServer)
222+ s.testAttachServer(c, (*S).startServerIndirect)
223+ s.testAttachServerAbnormalTerminate(c, (*S).startServer)
224+ s.testAttachServerAbnormalTerminate(c, (*S).startServerIndirect)
225+
226+ srv, err := zk.AttachServer(s.zkTestRoot)
227+ c.Assert(err, IsNil)
228+
229+ s.zkServer = service.New(srv)
230+ err = s.zkServer.Start()
231+ c.Assert(err, IsNil)
232+
233+ conn, _ = s.init(c)
234+ err = conn.Delete("/testAttachCookie", -1)
235+ c.Assert(err, IsNil)
236+}
237+
238+func (s *S) testAttachServer(c *C, start func(*S, *C, bool)) {
239+ start(s, c, false)
240+
241+ s.checkCookie(c)
242+
243+ // try attaching to it while it is still running - it should fail.
244+ a, err := zk.AttachServer(s.zkTestRoot)
245+ c.Assert(err, IsNil)
246+ srv := service.New(a)
247+
248+ err = srv.Start()
249+ c.Assert(err, NotNil)
250+
251+ // stop it and then start it again - it should succeed.
252+ err = srv.Stop()
253+ c.Assert(err, IsNil)
254+
255+ err = srv.Start()
256+ c.Assert(err, IsNil)
257+
258+ s.checkCookie(c)
259+
260+ err = srv.Stop()
261+ c.Assert(err, IsNil)
262+}
263+
264+func (s *S) testAttachServerAbnormalTerminate(c *C, start func(*S, *C, bool)) {
265+ start(s, c, true)
266+
267+ // try attaching to it and starting - it should fail, because pid.txt
268+ // won't have been removed.
269+ a, err := zk.AttachServer(s.zkTestRoot)
270+ c.Assert(err, IsNil)
271+ srv := service.New(a)
272+ err = srv.Start()
273+ c.Assert(err, NotNil)
274+
275+ // stopping it should bring things back to normal.
276+ err = srv.Stop()
277+ c.Assert(err, IsNil)
278+ err = srv.Start()
279+ c.Assert(err, IsNil)
280+
281+ s.checkCookie(c)
282+ err = srv.Stop()
283+ c.Assert(err, IsNil)
284+}
285\ No newline at end of file
286
287=== modified file 'retry_test.go'
288--- retry_test.go 2011-08-19 01:51:37 +0000
289+++ retry_test.go 2011-10-03 15:02:28 +0000
290@@ -1,42 +1,41 @@
291-package gozk_test
292+package zk_test
293
294 import (
295 . "launchpad.net/gocheck"
296- "gozk"
297+ "launchpad.net/gozk/zk"
298 "os"
299 )
300
301 func (s *S) TestRetryChangeCreating(c *C) {
302- zk, _ := s.init(c)
303+ conn, _ := s.init(c)
304
305- err := zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL),
306- func(data string, stat gozk.Stat) (string, os.Error) {
307+ err := conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL),
308+ func(data string, stat *zk.Stat) (string, os.Error) {
309 c.Assert(data, Equals, "")
310 c.Assert(stat, IsNil)
311 return "new", nil
312 })
313 c.Assert(err, IsNil)
314
315- data, stat, err := zk.Get("/test")
316+ data, stat, err := conn.Get("/test")
317 c.Assert(err, IsNil)
318 c.Assert(stat, NotNil)
319 c.Assert(stat.Version(), Equals, int32(0))
320 c.Assert(data, Equals, "new")
321
322- acl, _, err := zk.ACL("/test")
323+ acl, _, err := conn.ACL("/test")
324 c.Assert(err, IsNil)
325- c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_ALL))
326+ c.Assert(acl, Equals, zk.WorldACL(zk.PERM_ALL))
327 }
328
329 func (s *S) TestRetryChangeSetting(c *C) {
330- zk, _ := s.init(c)
331+ conn, _ := s.init(c)
332
333- _, err := zk.Create("/test", "old", gozk.EPHEMERAL,
334- gozk.WorldACL(gozk.PERM_ALL))
335+ _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
336 c.Assert(err, IsNil)
337
338- err = zk.RetryChange("/test", gozk.EPHEMERAL, []gozk.ACL{},
339- func(data string, stat gozk.Stat) (string, os.Error) {
340+ err = conn.RetryChange("/test", zk.EPHEMERAL, []zk.ACL{},
341+ func(data string, stat *zk.Stat) (string, os.Error) {
342 c.Assert(data, Equals, "old")
343 c.Assert(stat, NotNil)
344 c.Assert(stat.Version(), Equals, int32(0))
345@@ -44,27 +43,26 @@
346 })
347 c.Assert(err, IsNil)
348
349- data, stat, err := zk.Get("/test")
350+ data, stat, err := conn.Get("/test")
351 c.Assert(err, IsNil)
352 c.Assert(stat, NotNil)
353 c.Assert(stat.Version(), Equals, int32(1))
354 c.Assert(data, Equals, "brand new")
355
356 // ACL was unchanged by RetryChange().
357- acl, _, err := zk.ACL("/test")
358+ acl, _, err := conn.ACL("/test")
359 c.Assert(err, IsNil)
360- c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_ALL))
361+ c.Assert(acl, Equals, zk.WorldACL(zk.PERM_ALL))
362 }
363
364 func (s *S) TestRetryChangeUnchangedValueDoesNothing(c *C) {
365- zk, _ := s.init(c)
366+ conn, _ := s.init(c)
367
368- _, err := zk.Create("/test", "old", gozk.EPHEMERAL,
369- gozk.WorldACL(gozk.PERM_ALL))
370+ _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
371 c.Assert(err, IsNil)
372
373- err = zk.RetryChange("/test", gozk.EPHEMERAL, []gozk.ACL{},
374- func(data string, stat gozk.Stat) (string, os.Error) {
375+ err = conn.RetryChange("/test", zk.EPHEMERAL, []zk.ACL{},
376+ func(data string, stat *zk.Stat) (string, os.Error) {
377 c.Assert(data, Equals, "old")
378 c.Assert(stat, NotNil)
379 c.Assert(stat.Version(), Equals, int32(0))
380@@ -72,7 +70,7 @@
381 })
382 c.Assert(err, IsNil)
383
384- data, stat, err := zk.Get("/test")
385+ data, stat, err := conn.Get("/test")
386 c.Assert(err, IsNil)
387 c.Assert(stat, NotNil)
388 c.Assert(stat.Version(), Equals, int32(0)) // Unchanged!
389@@ -80,14 +78,14 @@
390 }
391
392 func (s *S) TestRetryChangeConflictOnCreate(c *C) {
393- zk, _ := s.init(c)
394+ conn, _ := s.init(c)
395
396- changeFunc := func(data string, stat gozk.Stat) (string, os.Error) {
397+ changeFunc := func(data string, stat *zk.Stat) (string, os.Error) {
398 switch data {
399 case "":
400 c.Assert(stat, IsNil)
401- _, err := zk.Create("/test", "conflict", gozk.EPHEMERAL,
402- gozk.WorldACL(gozk.PERM_ALL))
403+ _, err := conn.Create("/test", "conflict", zk.EPHEMERAL,
404+ zk.WorldACL(zk.PERM_ALL))
405 c.Assert(err, IsNil)
406 return "<none> => conflict", nil
407 case "conflict":
408@@ -100,11 +98,10 @@
409 return "can't happen", nil
410 }
411
412- err := zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL),
413- changeFunc)
414+ err := conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL), changeFunc)
415 c.Assert(err, IsNil)
416
417- data, stat, err := zk.Get("/test")
418+ data, stat, err := conn.Get("/test")
419 c.Assert(err, IsNil)
420 c.Assert(data, Equals, "conflict => new")
421 c.Assert(stat, NotNil)
422@@ -112,18 +109,17 @@
423 }
424
425 func (s *S) TestRetryChangeConflictOnSetDueToChange(c *C) {
426- zk, _ := s.init(c)
427+ conn, _ := s.init(c)
428
429- _, err := zk.Create("/test", "old", gozk.EPHEMERAL,
430- gozk.WorldACL(gozk.PERM_ALL))
431+ _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
432 c.Assert(err, IsNil)
433
434- changeFunc := func(data string, stat gozk.Stat) (string, os.Error) {
435+ changeFunc := func(data string, stat *zk.Stat) (string, os.Error) {
436 switch data {
437 case "old":
438 c.Assert(stat, NotNil)
439 c.Assert(stat.Version(), Equals, int32(0))
440- _, err := zk.Set("/test", "conflict", 0)
441+ _, err := conn.Set("/test", "conflict", 0)
442 c.Assert(err, IsNil)
443 return "old => new", nil
444 case "conflict":
445@@ -136,10 +132,10 @@
446 return "can't happen", nil
447 }
448
449- err = zk.RetryChange("/test", gozk.EPHEMERAL, []gozk.ACL{}, changeFunc)
450+ err = conn.RetryChange("/test", zk.EPHEMERAL, []zk.ACL{}, changeFunc)
451 c.Assert(err, IsNil)
452
453- data, stat, err := zk.Get("/test")
454+ data, stat, err := conn.Get("/test")
455 c.Assert(err, IsNil)
456 c.Assert(data, Equals, "conflict => new")
457 c.Assert(stat, NotNil)
458@@ -147,18 +143,17 @@
459 }
460
461 func (s *S) TestRetryChangeConflictOnSetDueToDelete(c *C) {
462- zk, _ := s.init(c)
463+ conn, _ := s.init(c)
464
465- _, err := zk.Create("/test", "old", gozk.EPHEMERAL,
466- gozk.WorldACL(gozk.PERM_ALL))
467+ _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
468 c.Assert(err, IsNil)
469
470- changeFunc := func(data string, stat gozk.Stat) (string, os.Error) {
471+ changeFunc := func(data string, stat *zk.Stat) (string, os.Error) {
472 switch data {
473 case "old":
474 c.Assert(stat, NotNil)
475 c.Assert(stat.Version(), Equals, int32(0))
476- err := zk.Delete("/test", 0)
477+ err := conn.Delete("/test", 0)
478 c.Assert(err, IsNil)
479 return "old => <deleted>", nil
480 case "":
481@@ -170,55 +165,53 @@
482 return "can't happen", nil
483 }
484
485- err = zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_READ),
486- changeFunc)
487+ err = conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_READ), changeFunc)
488 c.Assert(err, IsNil)
489
490- data, stat, err := zk.Get("/test")
491+ data, stat, err := conn.Get("/test")
492 c.Assert(err, IsNil)
493 c.Assert(data, Equals, "<deleted> => new")
494 c.Assert(stat, NotNil)
495 c.Assert(stat.Version(), Equals, int32(0))
496
497 // Should be the new ACL.
498- acl, _, err := zk.ACL("/test")
499+ acl, _, err := conn.ACL("/test")
500 c.Assert(err, IsNil)
501- c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_READ))
502+ c.Assert(acl, Equals, zk.WorldACL(zk.PERM_READ))
503 }
504
505 func (s *S) TestRetryChangeErrorInCallback(c *C) {
506- zk, _ := s.init(c)
507+ conn, _ := s.init(c)
508
509- err := zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL),
510- func(data string, stat gozk.Stat) (string, os.Error) {
511+ err := conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL),
512+ func(data string, stat *zk.Stat) (string, os.Error) {
513 return "don't use this", os.NewError("BOOM!")
514 })
515 c.Assert(err, NotNil)
516- c.Assert(err.Code(), Equals, gozk.ZSYSTEMERROR)
517 c.Assert(err.String(), Equals, "BOOM!")
518
519- stat, err := zk.Exists("/test")
520+ stat, err := conn.Exists("/test")
521 c.Assert(err, IsNil)
522 c.Assert(stat, IsNil)
523 }
524
525 func (s *S) TestRetryChangeFailsReading(c *C) {
526- zk, _ := s.init(c)
527+ conn, _ := s.init(c)
528
529- _, err := zk.Create("/test", "old", gozk.EPHEMERAL,
530- gozk.WorldACL(gozk.PERM_WRITE)) // Write only!
531+ // Write only!
532+ _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_WRITE))
533 c.Assert(err, IsNil)
534
535 var called bool
536- err = zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL),
537- func(data string, stat gozk.Stat) (string, os.Error) {
538+ err = conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL),
539+ func(data string, stat *zk.Stat) (string, os.Error) {
540 called = true
541 return "", nil
542 })
543 c.Assert(err, NotNil)
544- c.Assert(err.Code(), Equals, gozk.ZNOAUTH)
545+ c.Assert(err, Equals, zk.ZNOAUTH)
546
547- stat, err := zk.Exists("/test")
548+ stat, err := conn.Exists("/test")
549 c.Assert(err, IsNil)
550 c.Assert(stat, NotNil)
551 c.Assert(stat.Version(), Equals, int32(0))
552@@ -227,22 +220,21 @@
553 }
554
555 func (s *S) TestRetryChangeFailsSetting(c *C) {
556- zk, _ := s.init(c)
557+ conn, _ := s.init(c)
558
559- _, err := zk.Create("/test", "old", gozk.EPHEMERAL,
560- gozk.WorldACL(gozk.PERM_READ)) // Read only!
561+ // Read only!
562+ _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_READ))
563 c.Assert(err, IsNil)
564
565 var called bool
566- err = zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL),
567- func(data string, stat gozk.Stat) (string, os.Error) {
568+ err = conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL),
569+ func(data string, stat *zk.Stat) (string, os.Error) {
570 called = true
571 return "", nil
572 })
573- c.Assert(err, NotNil)
574- c.Assert(err.Code(), Equals, gozk.ZNOAUTH)
575+ c.Assert(err, Equals, zk.ZNOAUTH)
576
577- stat, err := zk.Exists("/test")
578+ stat, err := conn.Exists("/test")
579 c.Assert(err, IsNil)
580 c.Assert(stat, NotNil)
581 c.Assert(stat.Version(), Equals, int32(0))
582@@ -251,23 +243,22 @@
583 }
584
585 func (s *S) TestRetryChangeFailsCreating(c *C) {
586- zk, _ := s.init(c)
587+ conn, _ := s.init(c)
588
589- _, err := zk.Create("/test", "old", gozk.EPHEMERAL,
590- gozk.WorldACL(gozk.PERM_READ)) // Read only!
591+ // Read only!
592+ _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_READ))
593 c.Assert(err, IsNil)
594
595 var called bool
596- err = zk.RetryChange("/test/sub", gozk.EPHEMERAL,
597- gozk.WorldACL(gozk.PERM_ALL),
598- func(data string, stat gozk.Stat) (string, os.Error) {
599+ err = conn.RetryChange("/test/sub", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL),
600+ func(data string, stat *zk.Stat) (string, os.Error) {
601 called = true
602 return "", nil
603 })
604 c.Assert(err, NotNil)
605- c.Assert(err.Code(), Equals, gozk.ZNOAUTH)
606+ c.Assert(err, Equals, zk.ZNOAUTH)
607
608- stat, err := zk.Exists("/test/sub")
609+ stat, err := conn.Exists("/test/sub")
610 c.Assert(err, IsNil)
611 c.Assert(stat, IsNil)
612
613
614=== added file 'server.go'
615--- server.go 1970-01-01 00:00:00 +0000
616+++ server.go 2011-10-03 15:02:28 +0000
617@@ -0,0 +1,239 @@
618+package zk
619+
620+import (
621+ "bufio"
622+ "bytes"
623+ "fmt"
624+ "io/ioutil"
625+ "net"
626+ "os"
627+ "path/filepath"
628+ "strings"
629+)
630+
631+// Server represents a ZooKeeper server, its data and configuration files.
632+type Server struct {
633+ runDir string
634+ installDir string
635+}
636+
637+func (srv *Server) Directory() string {
638+ return srv.runDir
639+}
640+
641+// CreateServer creates the directory runDir and sets up a ZooKeeper server
642+// environment inside it. It is an error if runDir already exists.
643+// The server will listen on the specified TCP port.
644+//
645+// The ZooKeeper installation directory is specified by installDir.
646+// If this is empty, a system default will be used.
647+//
648+// CreateServer does not start the server - the *Server that
649+// is returned can be used with the service package, for example:
650+// see launchpad.net/gozk/zk/service.
651+func CreateServer(port int, runDir, installDir string) (*Server, os.Error) {
652+ if err := os.Mkdir(runDir, 0777); err != nil {
653+ return nil, err
654+ }
655+ srv := &Server{runDir: runDir, installDir: installDir}
656+ if err := srv.writeLog4JConfig(); err != nil {
657+ return nil, err
658+ }
659+ if err := srv.writeZooKeeperConfig(port); err != nil {
660+ return nil, err
661+ }
662+ if err := srv.writeInstallDir(); err != nil {
663+ return nil, err
664+ }
665+ return srv, nil
666+}
667+
668+// AttachServer creates a new ZooKeeper Server instance
669+// to operate inside an existing run directory, runDir.
670+// The directory must have been created with CreateServer.
671+func AttachServer(runDir string) (*Server, os.Error) {
672+ srv := &Server{runDir: runDir}
673+ if err := srv.readInstallDir(); err != nil {
674+ return nil, fmt.Errorf("cannot read server install directory: %v", err)
675+ }
676+ return srv, nil
677+}
678+
679+func (srv *Server) path(name string) string {
680+ return filepath.Join(srv.runDir, name)
681+}
682+
683+func (srv *Server) CheckAvailability() os.Error {
684+ port, err := srv.NetworkPort()
685+ if err != nil {
686+ return fmt.Errorf("cannot get network port: %v", err)
687+ }
688+ l, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port))
689+ if err != nil {
690+ return fmt.Errorf("cannot listen on port %v: %v", port, err)
691+ }
692+ l.Close()
693+ return nil
694+}
695+
696+// Command returns the command used to start the
697+// ZooKeeper server. It is provided for debugging and testing
698+// purposes only.
699+func (srv *Server) Command() ([]string, os.Error) {
700+ cp, err := srv.classPath()
701+ if err != nil {
702+ return nil, fmt.Errorf("cannot get class path: %v", err)
703+ }
704+ return []string{
705+ "java",
706+ "-cp", strings.Join(cp, ":"),
707+ "-Dzookeeper.root.logger=INFO,CONSOLE",
708+ "-Dlog4j.configuration=file:"+srv.path("log4j.properties"),
709+ "org.apache.zookeeper.server.quorum.QuorumPeerMain",
710+ srv.path("zoo.cfg"),
711+ }, nil
712+}
713+
714+var log4jProperties = `
715+log4j.rootLogger=INFO, CONSOLE
716+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
717+log4j.appender.CONSOLE.Threshold=INFO
718+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
719+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n
720+`
721+
722+func (srv *Server) writeLog4JConfig() (err os.Error) {
723+ return ioutil.WriteFile(srv.path("log4j.properties"), []byte(log4jProperties), 0666)
724+}
725+
726+func (srv *Server) writeZooKeeperConfig(port int) (err os.Error) {
727+ return ioutil.WriteFile(srv.path("zoo.cfg"), []byte(fmt.Sprintf(
728+ "tickTime=2000\n"+
729+ "dataDir=%s\n"+
730+ "clientPort=%d\n"+
731+ "maxClientCnxns=500\n",
732+ srv.runDir, port)), 0666)
733+}
734+
735+// NetworkPort returns the TCP port number that
736+// the server is configured for.
737+func (srv *Server) NetworkPort() (int, os.Error) {
738+ f, err := os.Open(srv.path("zoo.cfg"))
739+ if err != nil {
740+ return 0, err
741+ }
742+ r := bufio.NewReader(f)
743+ for {
744+ line, err := r.ReadSlice('\n')
745+ if err != nil {
746+ return 0, fmt.Errorf("cannot get port from %q", srv.path("zoo.cfg"))
747+ }
748+ var port int
749+ if n, _ := fmt.Sscanf(string(line), "clientPort=%d\n", &port); n == 1 {
750+ return port, nil
751+ }
752+ }
753+ panic("not reached")
754+}
755+
756+func (srv *Server) writeInstallDir() os.Error {
757+ return ioutil.WriteFile(srv.path("installdir.txt"), []byte(srv.installDir+"\n"), 0666)
758+}
759+
760+func (srv *Server) readInstallDir() os.Error {
761+ data, err := ioutil.ReadFile(srv.path("installdir.txt"))
762+ if err != nil {
763+ return err
764+ }
765+ if data[len(data)-1] == '\n' {
766+ data = data[0 : len(data)-1]
767+ }
768+ srv.installDir = string(data)
769+ return nil
770+}
771+
772+func (srv *Server) classPath() ([]string, os.Error) {
773+ dir := srv.installDir
774+ if dir == "" {
775+ return systemClassPath()
776+ }
777+ if err := checkDirectory(dir); err != nil {
778+ return nil, err
779+ }
780+ // Two possibilities, as seen in zkEnv.sh:
781+ // 1) locally built binaries (jars are in build directory)
782+ // 2) release binaries
783+ if build := filepath.Join(dir, "build"); checkDirectory(build) == nil {
784+ dir = build
785+ }
786+ classPath, err := filepath.Glob(filepath.Join(dir, "zookeeper-*.jar"))
787+ if err != nil {
788+ panic(fmt.Errorf("glob for jar files: %v", err))
789+ }
790+ more, err := filepath.Glob(filepath.Join(dir, "lib/*.jar"))
791+ if err != nil {
792+ panic(fmt.Errorf("glob for lib jar files: %v", err))
793+ }
794+
795+ classPath = append(classPath, more...)
796+ if len(classPath) == 0 {
797+ return nil, fmt.Errorf("zookeeper libraries not found in %q", dir)
798+ }
799+ return classPath, nil
800+}
801+
802+const zookeeperEnviron = "/etc/zookeeper/conf/environment"
803+
804+func systemClassPath() ([]string, os.Error) {
805+ f, err := os.Open(zookeeperEnviron)
806+ if f == nil {
807+ return nil, err
808+ }
809+ r := bufio.NewReader(f)
810+ for {
811+ line, err := r.ReadSlice('\n')
812+ if err != nil {
813+ break
814+ }
815+ if !bytes.HasPrefix(line, []byte("CLASSPATH=")) {
816+ continue
817+ }
818+
819+ // remove variable and newline
820+ path := string(line[len("CLASSPATH=") : len(line)-1])
821+
822+ // trim white space
823+ path = strings.Trim(path, " \t\r")
824+
825+ // strip quotes
826+ if len(path) > 1 && path[0] == '"' {
827+ path = path[1 : len(path)-1]
828+ }
829+
830+ // split on :
831+ classPath := strings.Split(path, ":")
832+
833+ // split off $ZOOCFGDIR
834+ if len(classPath) > 0 && classPath[0] == "$ZOOCFGDIR" {
835+ classPath = classPath[1:]
836+ }
837+
838+ if len(classPath) == 0 {
839+ return nil, fmt.Errorf("empty class path in %q", zookeeperEnviron)
840+ }
841+ return classPath, nil
842+ }
843+ return nil, fmt.Errorf("no class path found in %q", zookeeperEnviron)
844+}
845+
846+// checkDirectory returns an error if the given path
847+// does not exist or is not a directory.
848+func checkDirectory(path string) os.Error {
849+ if info, err := os.Stat(path); err != nil || !info.IsDirectory() {
850+ if err == nil {
851+ err = &os.PathError{Op: "stat", Path: path, Error: os.NewError("is not a directory")}
852+ }
853+ return err
854+ }
855+ return nil
856+}
857
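The class path lookup in systemClassPath above can be sketched in isolation. This is a minimal illustration written in present-day Go rather than the pre-Go 1 dialect of the diff, and it is not the branch's code: the name parseClassPath and the sample environment text are assumptions made for the example. It follows the same steps (find the CLASSPATH= line, trim white space, strip quotes, split on ":", drop a leading $ZOOCFGDIR) and rejects an empty value explicitly instead of indexing into it.

```go
package main

import (
	"bufio"
	"errors"
	"fmt"
	"strings"
)

// parseClassPath is a hypothetical helper (not part of gozk). It scans
// environment-file text for a CLASSPATH= line and returns the
// colon-separated entries, dropping a leading $ZOOCFGDIR entry.
func parseClassPath(env string) ([]string, error) {
	sc := bufio.NewScanner(strings.NewReader(env))
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		if !strings.HasPrefix(line, "CLASSPATH=") {
			continue
		}
		// Remove the variable name, surrounding white space and quotes.
		value := strings.TrimPrefix(line, "CLASSPATH=")
		value = strings.Trim(strings.TrimSpace(value), `"`)
		if value == "" {
			return nil, errors.New("empty class path")
		}
		entries := strings.Split(value, ":")
		if entries[0] == "$ZOOCFGDIR" {
			entries = entries[1:]
		}
		if len(entries) == 0 {
			return nil, errors.New("empty class path")
		}
		return entries, nil
	}
	if err := sc.Err(); err != nil {
		return nil, err
	}
	return nil, errors.New("no class path found")
}

func main() {
	// Sample input invented for the example.
	env := "JAVA=/usr/bin/java\n" +
		"CLASSPATH=\"$ZOOCFGDIR:/usr/share/java/zookeeper.jar:/usr/share/java/log4j.jar\"\n"
	fmt.Println(parseClassPath(env))
}
```

Checking for the empty string before splitting matters because strings.Split on an empty string returns a one-element slice, so a length check after the split would not catch that case.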
858=== added directory 'service'
859=== added file 'service/Makefile'
860--- service/Makefile 1970-01-01 00:00:00 +0000
861+++ service/Makefile 2011-10-03 15:02:28 +0000
862@@ -0,0 +1,20 @@
863+include $(GOROOT)/src/Make.inc
864+
865+TARG=launchpad.net/gozk/zk/service
866+
867+GOFILES=\
868+ service.go\
869+
870+GOFMT=gofmt
871+BADFMT:=$(shell $(GOFMT) -l $(GOFILES) $(CGOFILES) $(wildcard *_test.go))
872+
873+gofmt: $(BADFMT)
874+ @for F in $(BADFMT); do $(GOFMT) -w $$F && echo $$F; done
875+
876+ifneq ($(BADFMT),)
877+ifneq ($(MAKECMDGOALS),gofmt)
878+$(warning WARNING: make gofmt: $(BADFMT))
879+endif
880+endif
881+
882+include $(GOROOT)/src/Make.pkg
883
884=== added file 'service/service.go'
885--- service/service.go 1970-01-01 00:00:00 +0000
886+++ service/service.go 2011-10-03 15:02:28 +0000
887@@ -0,0 +1,160 @@
888+// Package service provides support for long-running services.
889+package service
890+
891+import (
892+ "os"
893+ "fmt"
894+ "io/ioutil"
895+ "strconv"
896+ "path/filepath"
897+ "exec"
898+ "strings"
899+ "time"
900+)
901+
902+// Interface represents a single-process server.
903+type Interface interface {
904+ // Directory gives the path to the directory containing
905+ // the server's configuration and data files. It is assumed that
906+ // this exists and can be modified.
907+ Directory() string
908+
909+ // CheckAvailability should return an error if resources
910+ // required by the server are not available; for instance
911+ // if the server requires a network port which is not free.
912+ CheckAvailability() os.Error
913+
914+ // Command should return a command that can be
915+ // used to run the server. The first element of the returned
916+ // slice is the executable name; the rest of the elements are
917+ // its arguments.
918+ Command() ([]string, os.Error)
919+}
920+
921+// Service represents a possibly running server process.
922+type Service struct {
923+ srv Interface
924+}
925+
926+// New returns a new Service using the given
927+// server interface.
928+func New(srv Interface) *Service {
929+ return &Service{srv}
930+}
931+
932+// Process returns a reference to the identity
933+// of the last running server in the Service's directory.
934+// If the server was shut down, it returns (nil, nil).
935+// The server process may not still be running
936+// (for instance if it was terminated abnormally).
937+// This function is provided for debugging and testing
938+// purposes only.
939+func (s *Service) Process() (*os.Process, os.Error) {
940+ data, err := ioutil.ReadFile(s.path("pid.txt"))
941+ if err != nil {
942+ if err, ok := err.(*os.PathError); ok && err.Error == os.ENOENT {
943+ return nil, nil
944+ }
945+ return nil, err
946+ }
947+ pid, err := strconv.Atoi(string(data))
948+ if err != nil {
949+ return nil, os.NewError("bad process id found in pid.txt")
950+ }
951+ return os.FindProcess(pid)
952+}
953+
954+func (s *Service) path(name string) string {
955+ return filepath.Join(s.srv.Directory(), name)
956+}
957+
958+// Start starts the server. It returns an error if the server is already running.
959+// It stores the process ID of the server in the file "pid.txt"
960+// inside the server's directory. Stdout and stderr of the server
961+// are similarly written to "log.txt".
962+func (s *Service) Start() os.Error {
963+ if err := s.srv.CheckAvailability(); err != nil {
964+ return err
965+ }
966+ p, err := s.Process()
967+ if p != nil || err != nil {
968+ if p != nil {
969+ p.Release()
970+ }
971+ return fmt.Errorf("server may already be running (remove %q to clear)", s.path("pid.txt"))
972+ }
973+
974+ // Create the pid file before starting the process so that if two
975+ // programs try to start a server in the same directory at the
976+ // same time, only one succeeds.
977+ pidf, err := os.OpenFile(s.path("pid.txt"), os.O_EXCL|os.O_CREATE|os.O_WRONLY, 0666)
978+ if err != nil {
979+ return fmt.Errorf("cannot create pid.txt: %v", err)
980+ }
981+ defer pidf.Close()
982+
983+ args, err := s.srv.Command()
984+ if err != nil {
985+ return fmt.Errorf("cannot determine command: %v", err)
986+ }
987+ cmd := exec.Command(args[0], args[1:]...)
988+
989+ logf, err := os.OpenFile(s.path("log.txt"), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666)
990+ if err != nil {
991+ return fmt.Errorf("cannot create log file: %v", err)
992+ }
993+ defer logf.Close()
994+ cmd.Stdout = logf
995+ cmd.Stderr = logf
996+ if err := cmd.Start(); err != nil {
997+ return fmt.Errorf("cannot start server: %v", err)
998+ }
999+ if _, err := fmt.Fprint(pidf, cmd.Process.Pid); err != nil {
1000+ return fmt.Errorf("cannot write pid file: %v", err)
1001+ }
1002+ return nil
1003+}
1004+
1005+// Stop kills the server. It is a no-op if the server is not running.
1006+func (s *Service) Stop() os.Error {
1007+ p, err := s.Process()
1008+ if p == nil {
1009+ if err != nil {
1010+ return fmt.Errorf("cannot read process ID of server: %v", err)
1011+ }
1012+ return nil
1013+ }
1014+ defer p.Release()
1015+ if err := p.Kill(); err != nil && !strings.Contains(err.String(), "no such process") {
1016+ return fmt.Errorf("cannot kill server process: %v", err)
1017+ }
1018+ // ignore the error returned from Wait because there's little
1019+ // we can do about it - it either means that the process has just exited
1020+ // anyway or that we can't wait for it for some other reason,
1021+ // for example because it was originally started by some other process.
1022+ _, err = p.Wait(0)
1023+ if err != nil && strings.Contains(err.String(), "no child processes") {
1024+ // If we can't wait for the server, it's possible that it was running
1025+ // but not as a child of this process, so give it a little while
1026+ // to exit. TODO poll with kill(no signal)?
1027+ time.Sleep(0.5e9)
1028+ }
1029+
1030+ if err := os.Remove(s.path("pid.txt")); err != nil {
1031+ return fmt.Errorf("cannot remove server process ID file: %v", err)
1032+ }
1033+ return nil
1034+}
1035+
1036+// Destroy stops the server, and then removes its
1037+// directory and all its contents.
1038+// Warning: this will destroy all data associated with the server.
1039+func (s *Service) Destroy() os.Error {
1040+ if err := s.Stop(); err != nil {
1041+ return err
1042+ }
1043+ if err := os.RemoveAll(s.srv.Directory()); err != nil {
1044+ return err
1045+ }
1046+ return nil
1047+}
1048
1049=== modified file 'suite_test.go'
1050--- suite_test.go 2011-08-19 01:43:37 +0000
1051+++ suite_test.go 2011-10-03 15:02:28 +0000
1052@@ -1,65 +1,48 @@
1053-package gozk_test
1054+package zk_test
1055
1056 import (
1057 . "launchpad.net/gocheck"
1058- "testing"
1059- "io/ioutil"
1060- "path"
1061 "fmt"
1062+ "launchpad.net/gozk/zk/service"
1063+ "launchpad.net/gozk/zk"
1064 "os"
1065- "gozk"
1066+ "testing"
1067 "time"
1068 )
1069
1070 func TestAll(t *testing.T) {
1071- TestingT(t)
1072+ if !*reattach {
1073+ TestingT(t)
1074+ }
1075 }
1076
1077 var _ = Suite(&S{})
1078
1079 type S struct {
1080- zkRoot string
1081- zkTestRoot string
1082- zkTestPort int
1083- zkServerSh string
1084- zkServerOut *os.File
1085- zkAddr string
1086+ zkServer *service.Service
1087+ zkTestRoot string
1088+ zkAddr string
1089
1090- handles []*gozk.ZooKeeper
1091- events []*gozk.Event
1092+ handles []*zk.Conn
1093+ events []*zk.Event
1094 liveWatches int
1095 deadWatches chan bool
1096 }
1097
1098-var logLevel = 0 //gozk.LOG_ERROR
1099-
1100-
1101-var testZooCfg = ("dataDir=%s\n" +
1102- "clientPort=%d\n" +
1103- "tickTime=2000\n" +
1104- "initLimit=10\n" +
1105- "syncLimit=5\n" +
1106- "")
1107-
1108-var testLog4jPrp = ("log4j.rootLogger=INFO,CONSOLE\n" +
1109- "log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender\n" +
1110- "log4j.appender.CONSOLE.Threshold=DEBUG\n" +
1111- "log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout\n" +
1112- "log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n" +
1113- "")
1114-
1115-func (s *S) init(c *C) (*gozk.ZooKeeper, chan gozk.Event) {
1116- zk, watch, err := gozk.Init(s.zkAddr, 5e9)
1117+var logLevel = 0 //zk.LOG_ERROR
1118+
1119+func (s *S) init(c *C) (*zk.Conn, chan zk.Event) {
1120+ conn, watch, err := zk.Dial(s.zkAddr, 5e9)
1121 c.Assert(err, IsNil)
1122
1123- s.handles = append(s.handles, zk)
1124+ s.handles = append(s.handles, conn)
1125
1126 event := <-watch
1127
1128- c.Assert(event.Type, Equals, gozk.EVENT_SESSION)
1129- c.Assert(event.State, Equals, gozk.STATE_CONNECTED)
1130+ c.Assert(event.Type, Equals, zk.EVENT_SESSION)
1131+ c.Assert(event.State, Equals, zk.STATE_CONNECTED)
1132
1133- bufferedWatch := make(chan gozk.Event, 256)
1134+ bufferedWatch := make(chan zk.Event, 256)
1135 bufferedWatch <- event
1136
1137 s.liveWatches += 1
1138@@ -82,13 +65,13 @@
1139 s.deadWatches <- true
1140 }()
1141
1142- return zk, bufferedWatch
1143+ return conn, bufferedWatch
1144 }
1145
1146 func (s *S) SetUpTest(c *C) {
1147- c.Assert(gozk.CountPendingWatches(), Equals, 0,
1148+ c.Assert(zk.CountPendingWatches(), Equals, 0,
1149 Bug("Test got a dirty watch state before running!"))
1150- gozk.SetLogLevel(logLevel)
1151+ zk.SetLogLevel(logLevel)
1152 }
1153
1154 func (s *S) TearDownTest(c *C) {
1155@@ -108,98 +91,38 @@
1156 }
1157
1158 // Reset the list of handles.
1159- s.handles = make([]*gozk.ZooKeeper, 0)
1160+ s.handles = make([]*zk.Conn, 0)
1161
1162- c.Assert(gozk.CountPendingWatches(), Equals, 0,
1163+ c.Assert(zk.CountPendingWatches(), Equals, 0,
1164 Bug("Test left live watches behind!"))
1165 }
1166
1167-// We use the suite set up and tear down to manage a custom zookeeper
1168+// We use the suite set up and tear down to manage a custom ZooKeeper
1169 //
1170 func (s *S) SetUpSuite(c *C) {
1171-
1172+ var err os.Error
1173 s.deadWatches = make(chan bool)
1174
1175- var err os.Error
1176-
1177- s.zkRoot = os.Getenv("ZKROOT")
1178- if s.zkRoot == "" {
1179- panic("You must define $ZKROOT ($ZKROOT/bin/zkServer.sh is needed)")
1180- }
1181-
1182- s.zkTestRoot = c.MkDir()
1183- s.zkTestPort = 21812
1184-
1185- println("ZooKeeper test server directory:", s.zkTestRoot)
1186- println("ZooKeeper test server port:", s.zkTestPort)
1187-
1188- s.zkAddr = fmt.Sprintf("localhost:%d", s.zkTestPort)
1189-
1190- s.zkServerSh = path.Join(s.zkRoot, "bin/zkServer.sh")
1191- s.zkServerOut, err = os.OpenFile(path.Join(s.zkTestRoot, "stdout.txt"),
1192- os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
1193- if err != nil {
1194- panic("Can't open stdout.txt file for server: " + err.String())
1195- }
1196-
1197- dataDir := path.Join(s.zkTestRoot, "data")
1198- confDir := path.Join(s.zkTestRoot, "conf")
1199-
1200- os.Mkdir(dataDir, 0755)
1201- os.Mkdir(confDir, 0755)
1202-
1203- err = os.Setenv("ZOOCFGDIR", confDir)
1204- if err != nil {
1205- panic("Can't set $ZOOCFGDIR: " + err.String())
1206- }
1207-
1208- zooCfg := []byte(fmt.Sprintf(testZooCfg, dataDir, s.zkTestPort))
1209- err = ioutil.WriteFile(path.Join(confDir, "zoo.cfg"), zooCfg, 0644)
1210- if err != nil {
1211- panic("Can't write zoo.cfg: " + err.String())
1212- }
1213-
1214- log4jPrp := []byte(testLog4jPrp)
1215- err = ioutil.WriteFile(path.Join(confDir, "log4j.properties"), log4jPrp, 0644)
1216- if err != nil {
1217- panic("Can't write log4j.properties: " + err.String())
1218- }
1219-
1220- s.StartZK()
1221+ // N.B. We need to create a subdirectory because zk.CreateServer
1222+ // insists on creating its own directory.
1223+ s.zkTestRoot = c.MkDir() + "/zk"
1224+
1225+ port := 21812
1226+ s.zkAddr = fmt.Sprint("localhost:", port)
1227+
1228+ srv, err := zk.CreateServer(port, s.zkTestRoot, "")
1229+ if err != nil {
1230+ c.Fatal("Cannot set up server environment: ", err)
1231+ }
1232+ s.zkServer = service.New(srv)
1233+ err = s.zkServer.Start()
1234+ if err != nil {
1235+ c.Fatal("Cannot start ZooKeeper server: ", err)
1236+ }
1237 }
1238
1239 func (s *S) TearDownSuite(c *C) {
1240- s.StopZK()
1241- s.zkServerOut.Close()
1242-}
1243-
1244-func (s *S) StartZK() {
1245- attr := os.ProcAttr{Files: []*os.File{os.Stdin, s.zkServerOut, os.Stderr}}
1246- proc, err := os.StartProcess(s.zkServerSh, []string{s.zkServerSh, "start"}, &attr)
1247- if err != nil {
1248- panic("Problem executing zkServer.sh start: " + err.String())
1249- }
1250-
1251- result, err := proc.Wait(0)
1252- if err != nil {
1253- panic(err.String())
1254- } else if result.ExitStatus() != 0 {
1255- panic("'zkServer.sh start' exited with non-zero status")
1256- }
1257-}
1258-
1259-func (s *S) StopZK() {
1260- attr := os.ProcAttr{Files: []*os.File{os.Stdin, s.zkServerOut, os.Stderr}}
1261- proc, err := os.StartProcess(s.zkServerSh, []string{s.zkServerSh, "stop"}, &attr)
1262- if err != nil {
1263- panic("Problem executing zkServer.sh stop: " + err.String() +
1264- " (look for runaway java processes!)")
1265- }
1266- result, err := proc.Wait(0)
1267- if err != nil {
1268- panic(err.String())
1269- } else if result.ExitStatus() != 0 {
1270- panic("'zkServer.sh stop' exited with non-zero status " +
1271- "(look for runaway java processes!)")
1272+ if s.zkServer != nil {
1273+ s.zkServer.Stop() // TODO Destroy
1274 }
1275 }
1276
1277=== renamed file 'gozk.go' => 'zk.go'
1278--- gozk.go 2011-08-19 01:56:39 +0000
1279+++ zk.go 2011-10-03 15:02:28 +0000
1280@@ -1,4 +1,4 @@
1281-// gozk - Zookeeper support for the Go language
1282+// gozk - ZooKeeper support for the Go language
1283 //
1284 // https://wiki.ubuntu.com/gozk
1285 //
1286@@ -6,7 +6,7 @@
1287 //
1288 // Written by Gustavo Niemeyer <gustavo.niemeyer@canonical.com>
1289 //
1290-package gozk
1291+package zk
1292
1293 /*
1294 #cgo CFLAGS: -I/usr/include/c-client-src
1295@@ -27,17 +27,16 @@
1296 // -----------------------------------------------------------------------
1297 // Main constants and data types.
1298
1299-// The main ZooKeeper object, created through the Init function.
1300-// Encapsulates all communication with ZooKeeper.
1301-type ZooKeeper struct {
1302+// Conn represents a connection to a set of ZooKeeper nodes.
1303+type Conn struct {
1304 watchChannels map[uintptr]chan Event
1305 sessionWatchId uintptr
1306 handle *C.zhandle_t
1307 mutex sync.Mutex
1308 }
1309
1310-// ClientId represents the established session in ZooKeeper. This is only
1311-// useful to be passed back into the ReInit function.
1312+// ClientId represents an established ZooKeeper session. It can be
1313+// passed into Redial to reestablish a connection to an existing session.
1314 type ClientId struct {
1315 cId C.clientid_t
1316 }
1317@@ -98,35 +97,60 @@
1318 State int
1319 }
1320
1321-// Error codes that may be used to verify the result of the
1322-// Code method from Error.
1323+// Error represents a ZooKeeper error.
1324+type Error int
1325+
1326 const (
1327- ZOK = C.ZOK
1328- ZSYSTEMERROR = C.ZSYSTEMERROR
1329- ZRUNTIMEINCONSISTENCY = C.ZRUNTIMEINCONSISTENCY
1330- ZDATAINCONSISTENCY = C.ZDATAINCONSISTENCY
1331- ZCONNECTIONLOSS = C.ZCONNECTIONLOSS
1332- ZMARSHALLINGERROR = C.ZMARSHALLINGERROR
1333- ZUNIMPLEMENTED = C.ZUNIMPLEMENTED
1334- ZOPERATIONTIMEOUT = C.ZOPERATIONTIMEOUT
1335- ZBADARGUMENTS = C.ZBADARGUMENTS
1336- ZINVALIDSTATE = C.ZINVALIDSTATE
1337- ZAPIERROR = C.ZAPIERROR
1338- ZNONODE = C.ZNONODE
1339- ZNOAUTH = C.ZNOAUTH
1340- ZBADVERSION = C.ZBADVERSION
1341- ZNOCHILDRENFOREPHEMERALS = C.ZNOCHILDRENFOREPHEMERALS
1342- ZNODEEXISTS = C.ZNODEEXISTS
1343- ZNOTEMPTY = C.ZNOTEMPTY
1344- ZSESSIONEXPIRED = C.ZSESSIONEXPIRED
1345- ZINVALIDCALLBACK = C.ZINVALIDCALLBACK
1346- ZINVALIDACL = C.ZINVALIDACL
1347- ZAUTHFAILED = C.ZAUTHFAILED
1348- ZCLOSING = C.ZCLOSING
1349- ZNOTHING = C.ZNOTHING
1350- ZSESSIONMOVED = C.ZSESSIONMOVED
1351+ ZOK Error = C.ZOK
1352+ ZSYSTEMERROR Error = C.ZSYSTEMERROR
1353+ ZRUNTIMEINCONSISTENCY Error = C.ZRUNTIMEINCONSISTENCY
1354+ ZDATAINCONSISTENCY Error = C.ZDATAINCONSISTENCY
1355+ ZCONNECTIONLOSS Error = C.ZCONNECTIONLOSS
1356+ ZMARSHALLINGERROR Error = C.ZMARSHALLINGERROR
1357+ ZUNIMPLEMENTED Error = C.ZUNIMPLEMENTED
1358+ ZOPERATIONTIMEOUT Error = C.ZOPERATIONTIMEOUT
1359+ ZBADARGUMENTS Error = C.ZBADARGUMENTS
1360+ ZINVALIDSTATE Error = C.ZINVALIDSTATE
1361+ ZAPIERROR Error = C.ZAPIERROR
1362+ ZNONODE Error = C.ZNONODE
1363+ ZNOAUTH Error = C.ZNOAUTH
1364+ ZBADVERSION Error = C.ZBADVERSION
1365+ ZNOCHILDRENFOREPHEMERALS Error = C.ZNOCHILDRENFOREPHEMERALS
1366+ ZNODEEXISTS Error = C.ZNODEEXISTS
1367+ ZNOTEMPTY Error = C.ZNOTEMPTY
1368+ ZSESSIONEXPIRED Error = C.ZSESSIONEXPIRED
1369+ ZINVALIDCALLBACK Error = C.ZINVALIDCALLBACK
1370+ ZINVALIDACL Error = C.ZINVALIDACL
1371+ ZAUTHFAILED Error = C.ZAUTHFAILED
1372+ ZCLOSING Error = C.ZCLOSING
1373+ ZNOTHING Error = C.ZNOTHING
1374+ ZSESSIONMOVED Error = C.ZSESSIONMOVED
1375 )
1376
1377+func (error Error) String() string {
1378+ return C.GoString(C.zerror(C.int(error))) // Static, no need to free it.
1379+}
1380+
1381+// zkError creates an appropriate error return from
1382+// a ZooKeeper status and the errno return from a C API
1383+// call.
1384+func zkError(rc C.int, cerr os.Error) os.Error {
1385+ code := Error(rc)
1386+ switch code {
1387+ case ZOK:
1388+ return nil
1389+
1390+ case ZSYSTEMERROR:
1391+ // If a ZooKeeper call returns ZSYSTEMERROR, then
1392+ // errno becomes significant. If errno has not been
1393+ // set, then we will return ZSYSTEMERROR nonetheless.
1394+ if cerr != nil {
1395+ return cerr
1396+ }
1397+ }
1398+ return code
1399+}
1400+
1401 // Constants for SetLogLevel.
1402 const (
1403 LOG_ERROR = C.ZOO_LOG_LEVEL_ERROR
1404@@ -273,104 +297,67 @@
1405 }
1406
1407 // -----------------------------------------------------------------------
1408-// Error interface which maps onto the ZooKeeper error codes.
1409-
1410-type Error interface {
1411- String() string
1412- Code() int
1413-}
1414-
1415-type errorType struct {
1416- zkrc C.int
1417- err os.Error
1418-}
1419-
1420-func newError(zkrc C.int, err os.Error) Error {
1421- return &errorType{zkrc, err}
1422-}
1423-
1424-func (error *errorType) String() (result string) {
1425- if error.zkrc == ZSYSTEMERROR && error.err != nil {
1426- result = error.err.String()
1427- } else {
1428- result = C.GoString(C.zerror(error.zkrc)) // Static, no need to free it.
1429- }
1430- return
1431-}
1432-
1433-// Code returns the error code that may be compared against one of
1434-// the gozk.Z* constants.
1435-func (error *errorType) Code() int {
1436- return int(error.zkrc)
1437-}
1438-
1439-// -----------------------------------------------------------------------
1440-// Stat interface which maps onto the ZooKeeper Stat struct.
1441-
1442-// We declare this as an interface rather than an actual struct because
1443-// this way we don't have to copy data around between the real C struct
1444-// and the Go one on every call. Most uses will only touch a few elements,
1445-// or even ignore the stat entirely, so that's a win.
1446
1447 // Stat contains detailed information about a node.
1448-type Stat interface {
1449- Czxid() int64
1450- Mzxid() int64
1451- CTime() int64
1452- MTime() int64
1453- Version() int32
1454- CVersion() int32
1455- AVersion() int32
1456- EphemeralOwner() int64
1457- DataLength() int32
1458- NumChildren() int32
1459- Pzxid() int64
1460-}
1461-
1462-type resultStat C.struct_Stat
1463-
1464-func (stat *resultStat) Czxid() int64 {
1465- return int64(stat.czxid)
1466-}
1467-
1468-func (stat *resultStat) Mzxid() int64 {
1469- return int64(stat.mzxid)
1470-}
1471-
1472-func (stat *resultStat) CTime() int64 {
1473- return int64(stat.ctime)
1474-}
1475-
1476-func (stat *resultStat) MTime() int64 {
1477- return int64(stat.mtime)
1478-}
1479-
1480-func (stat *resultStat) Version() int32 {
1481- return int32(stat.version)
1482-}
1483-
1484-func (stat *resultStat) CVersion() int32 {
1485- return int32(stat.cversion)
1486-}
1487-
1488-func (stat *resultStat) AVersion() int32 {
1489- return int32(stat.aversion)
1490-}
1491-
1492-func (stat *resultStat) EphemeralOwner() int64 {
1493- return int64(stat.ephemeralOwner)
1494-}
1495-
1496-func (stat *resultStat) DataLength() int32 {
1497- return int32(stat.dataLength)
1498-}
1499-
1500-func (stat *resultStat) NumChildren() int32 {
1501- return int32(stat.numChildren)
1502-}
1503-
1504-func (stat *resultStat) Pzxid() int64 {
1505- return int64(stat.pzxid)
1506+type Stat struct {
1507+ c C.struct_Stat
1508+}
1509+
1510+func (stat *Stat) String() string {
1511+ return fmt.Sprintf(
1512+ "{Czxid: %d; Mzxid: %d; CTime: %d; MTime: %d; "+
1513+ "Version: %d; CVersion: %d; AVersion: %d; "+
1514+ "EphemeralOwner: %d; DataLength: %d; "+
1515+ "NumChildren: %d; Pzxid: %d}",
1516+ stat.Czxid(), stat.Mzxid(), stat.CTime(), stat.MTime(),
1517+ stat.Version(), stat.CVersion(), stat.AVersion(),
1518+ stat.EphemeralOwner(), stat.DataLength(),
1519+ stat.NumChildren(), stat.Pzxid(),
1520+ )
1521+}
1522+
1523+func (stat *Stat) Czxid() int64 {
1524+ return int64(stat.c.czxid)
1525+}
1526+
1527+func (stat *Stat) Mzxid() int64 {
1528+ return int64(stat.c.mzxid)
1529+}
1530+
1531+func (stat *Stat) CTime() int64 {
1532+ return int64(stat.c.ctime)
1533+}
1534+
1535+func (stat *Stat) MTime() int64 {
1536+ return int64(stat.c.mtime)
1537+}
1538+
1539+func (stat *Stat) Version() int32 {
1540+ return int32(stat.c.version)
1541+}
1542+
1543+func (stat *Stat) CVersion() int32 {
1544+ return int32(stat.c.cversion)
1545+}
1546+
1547+func (stat *Stat) AVersion() int32 {
1548+ return int32(stat.c.aversion)
1549+}
1550+
1551+func (stat *Stat) EphemeralOwner() int64 {
1552+ return int64(stat.c.ephemeralOwner)
1553+}
1554+
1555+func (stat *Stat) DataLength() int32 {
1556+ return int32(stat.c.dataLength)
1557+}
1558+
1559+func (stat *Stat) NumChildren() int32 {
1560+ return int32(stat.c.numChildren)
1561+}
1562+
1563+func (stat *Stat) Pzxid() int64 {
1564+ return int64(stat.c.pzxid)
1565 }
1566
1567 // -----------------------------------------------------------------------
1568@@ -384,7 +371,7 @@
1569 C.zoo_set_debug_level(C.ZooLogLevel(level))
1570 }
1571
1572-// Init initializes the communication with a ZooKeeper cluster. The provided
1573+// Dial initializes the communication with a ZooKeeper cluster. The provided
1574 // servers parameter may include multiple server addresses, separated
1575 // by commas, so that the client will automatically attempt to connect
1576 // to another server if one of them stops working for whatever reason.
1577@@ -398,79 +385,73 @@
1578 // The watch channel receives events of type SESSION_EVENT when any change
1579 // to the state of the established connection happens. See the documentation
1580 // for the Event type for more details.
1581-func Init(servers string, recvTimeoutNS int64) (zk *ZooKeeper, watch chan Event, err Error) {
1582- zk, watch, err = internalInit(servers, recvTimeoutNS, nil)
1583- return
1584+func Dial(servers string, recvTimeoutNS int64) (*Conn, <-chan Event, os.Error) {
1585+ return dial(servers, recvTimeoutNS, nil)
1586 }
1587
1588-// Equivalent to Init, but attempt to reestablish an existing session
1589+// Redial is equivalent to Dial, but attempts to reestablish an existing session
1590 // identified via the clientId parameter.
1591-func ReInit(servers string, recvTimeoutNS int64, clientId *ClientId) (zk *ZooKeeper, watch chan Event, err Error) {
1592- zk, watch, err = internalInit(servers, recvTimeoutNS, clientId)
1593- return
1594+func Redial(servers string, recvTimeoutNS int64, clientId *ClientId) (*Conn, <-chan Event, os.Error) {
1595+ return dial(servers, recvTimeoutNS, clientId)
1596 }
1597
1598-func internalInit(servers string, recvTimeoutNS int64, clientId *ClientId) (*ZooKeeper, chan Event, Error) {
1599-
1600- zk := &ZooKeeper{}
1601- zk.watchChannels = make(map[uintptr]chan Event)
1602+func dial(servers string, recvTimeoutNS int64, clientId *ClientId) (*Conn, <-chan Event, os.Error) {
1603+ conn := &Conn{}
1604+ conn.watchChannels = make(map[uintptr]chan Event)
1605
1606 var cId *C.clientid_t
1607 if clientId != nil {
1608 cId = &clientId.cId
1609 }
1610
1611- watchId, watchChannel := zk.createWatch(true)
1612- zk.sessionWatchId = watchId
1613+ watchId, watchChannel := conn.createWatch(true)
1614+ conn.sessionWatchId = watchId
1615
1616 cservers := C.CString(servers)
1617- handle, cerr := C.zookeeper_init(cservers, C.watch_handler, C.int(recvTimeoutNS / 1e6), cId, unsafe.Pointer(watchId), 0)
1618+ handle, cerr := C.zookeeper_init(cservers, C.watch_handler, C.int(recvTimeoutNS/1e6), cId, unsafe.Pointer(watchId), 0)
1619 C.free(unsafe.Pointer(cservers))
1620 if handle == nil {
1621- zk.closeAllWatches()
1622- return nil, nil, newError(ZSYSTEMERROR, cerr)
1623+ conn.closeAllWatches()
1624+ return nil, nil, zkError(C.int(ZSYSTEMERROR), cerr)
1625 }
1626- zk.handle = handle
1627+ conn.handle = handle
1628 runWatchLoop()
1629- return zk, watchChannel, nil
1630+ return conn, watchChannel, nil
1631 }
1632
1633 // ClientId returns the client ID for the existing session with ZooKeeper.
1634 // This is useful to reestablish an existing session via ReInit.
1635-func (zk *ZooKeeper) ClientId() *ClientId {
1636- return &ClientId{*C.zoo_client_id(zk.handle)}
1637+func (conn *Conn) ClientId() *ClientId {
1638+ return &ClientId{*C.zoo_client_id(conn.handle)}
1639 }
1640
1641 // Close terminates the ZooKeeper interaction.
1642-func (zk *ZooKeeper) Close() Error {
1643-
1644- // Protect from concurrency around zk.handle change.
1645- zk.mutex.Lock()
1646- defer zk.mutex.Unlock()
1647-
1648- if zk.handle == nil {
1649+func (conn *Conn) Close() os.Error {
1650+
1651+ // Protect from concurrency around conn.handle change.
1652+ conn.mutex.Lock()
1653+ defer conn.mutex.Unlock()
1654+
1655+ if conn.handle == nil {
1656 // ZooKeeper may hang indefinitely if a handler is closed twice,
1657 // so we get in the way and prevent it from happening.
1658- return newError(ZCLOSING, nil)
1659+ return ZCLOSING
1660 }
1661- rc, cerr := C.zookeeper_close(zk.handle)
1662+ rc, cerr := C.zookeeper_close(conn.handle)
1663
1664- zk.closeAllWatches()
1665+ conn.closeAllWatches()
1666 stopWatchLoop()
1667
1668- // At this point, nothing else should need zk.handle.
1669- zk.handle = nil
1670+ // At this point, nothing else should need conn.handle.
1671+ conn.handle = nil
1672
1673- if rc != C.ZOK {
1674- return newError(rc, cerr)
1675- }
1676- return nil
1677+ return zkError(rc, cerr)
1678 }
1679
1680 // Get returns the data and status from an existing node. err will be nil,
1681 // unless an error is found. Attempting to retrieve data from a non-existing
1682 // node is an error.
1683-func (zk *ZooKeeper) Get(path string) (data string, stat Stat, err Error) {
1684+func (conn *Conn) Get(path string) (data string, stat *Stat, err os.Error) {
1685
1686 cpath := C.CString(path)
1687 cbuffer := (*C.char)(C.malloc(bufferSize))
1688@@ -478,22 +459,21 @@
1689 defer C.free(unsafe.Pointer(cpath))
1690 defer C.free(unsafe.Pointer(cbuffer))
1691
1692- cstat := C.struct_Stat{}
1693- rc, cerr := C.zoo_wget(zk.handle, cpath, nil, nil,
1694- cbuffer, &cbufferLen, &cstat)
1695+ var cstat Stat
1696+ rc, cerr := C.zoo_wget(conn.handle, cpath, nil, nil, cbuffer, &cbufferLen, &cstat.c)
1697 if rc != C.ZOK {
1698- return "", nil, newError(rc, cerr)
1699+ return "", nil, zkError(rc, cerr)
1700 }
1701+
1702 result := C.GoStringN(cbuffer, cbufferLen)
1703-
1704- return result, (*resultStat)(&cstat), nil
1705+ return result, &cstat, nil
1706 }
1707
1708 // GetW works like Get but also returns a channel that will receive
1709 // a single Event value when the data or existence of the given ZooKeeper
1710 // node changes or when critical session events happen. See the
1711 // documentation of the Event type for more details.
1712-func (zk *ZooKeeper) GetW(path string) (data string, stat Stat, watch chan Event, err Error) {
1713+func (conn *Conn) GetW(path string) (data string, stat *Stat, watch <-chan Event, err os.Error) {
1714
1715 cpath := C.CString(path)
1716 cbuffer := (*C.char)(C.malloc(bufferSize))
1717@@ -501,42 +481,38 @@
1718 defer C.free(unsafe.Pointer(cpath))
1719 defer C.free(unsafe.Pointer(cbuffer))
1720
1721- watchId, watchChannel := zk.createWatch(true)
1722+ watchId, watchChannel := conn.createWatch(true)
1723
1724- cstat := C.struct_Stat{}
1725- rc, cerr := C.zoo_wget(zk.handle, cpath,
1726- C.watch_handler, unsafe.Pointer(watchId),
1727- cbuffer, &cbufferLen, &cstat)
1728+ var cstat Stat
1729+ rc, cerr := C.zoo_wget(conn.handle, cpath, C.watch_handler, unsafe.Pointer(watchId), cbuffer, &cbufferLen, &cstat.c)
1730 if rc != C.ZOK {
1731- zk.forgetWatch(watchId)
1732- return "", nil, nil, newError(rc, cerr)
1733+ conn.forgetWatch(watchId)
1734+ return "", nil, nil, zkError(rc, cerr)
1735 }
1736
1737 result := C.GoStringN(cbuffer, cbufferLen)
1738- return result, (*resultStat)(&cstat), watchChannel, nil
1739+ return result, &cstat, watchChannel, nil
1740 }
1741
1742 // Children returns the children list and status from an existing node.
1743-// err will be nil, unless an error is found. Attempting to retrieve the
1744-// children list from a non-existent node is an error.
1745-func (zk *ZooKeeper) Children(path string) (children []string, stat Stat, err Error) {
1746+// Attempting to retrieve the children list from a non-existent node is an error.
1747+func (conn *Conn) Children(path string) (children []string, stat *Stat, err os.Error) {
1748
1749 cpath := C.CString(path)
1750 defer C.free(unsafe.Pointer(cpath))
1751
1752 cvector := C.struct_String_vector{}
1753- cstat := C.struct_Stat{}
1754- rc, cerr := C.zoo_wget_children2(zk.handle, cpath, nil, nil,
1755- &cvector, &cstat)
1756+ var cstat Stat
1757+ rc, cerr := C.zoo_wget_children2(conn.handle, cpath, nil, nil, &cvector, &cstat.c)
1758
1759 // Can't happen if rc != 0, but avoid potential memory leaks in the future.
1760 if cvector.count != 0 {
1761 children = parseStringVector(&cvector)
1762 }
1763- if rc != C.ZOK {
1764- err = newError(rc, cerr)
1765+ if rc == C.ZOK {
1766+ stat = &cstat
1767 } else {
1768- stat = (*resultStat)(&cstat)
1769+ err = zkError(rc, cerr)
1770 }
1771 return
1772 }
1773@@ -545,29 +521,27 @@
1774 // receive a single Event value when a node is added or removed under the
1775 // provided path or when critical session events happen. See the documentation
1776 // of the Event type for more details.
1777-func (zk *ZooKeeper) ChildrenW(path string) (children []string, stat Stat, watch chan Event, err Error) {
1778+func (conn *Conn) ChildrenW(path string) (children []string, stat *Stat, watch <-chan Event, err os.Error) {
1779
1780 cpath := C.CString(path)
1781 defer C.free(unsafe.Pointer(cpath))
1782
1783- watchId, watchChannel := zk.createWatch(true)
1784+ watchId, watchChannel := conn.createWatch(true)
1785
1786 cvector := C.struct_String_vector{}
1787- cstat := C.struct_Stat{}
1788- rc, cerr := C.zoo_wget_children2(zk.handle, cpath,
1789- C.watch_handler, unsafe.Pointer(watchId),
1790- &cvector, &cstat)
1791+ var cstat Stat
1792+ rc, cerr := C.zoo_wget_children2(conn.handle, cpath, C.watch_handler, unsafe.Pointer(watchId), &cvector, &cstat.c)
1793
1794 // Can't happen if rc != 0, but avoid potential memory leaks in the future.
1795 if cvector.count != 0 {
1796 children = parseStringVector(&cvector)
1797 }
1798- if rc != C.ZOK {
1799- zk.forgetWatch(watchId)
1800- err = newError(rc, cerr)
1801+ if rc == C.ZOK {
1802+ stat = &cstat
1803+ watch = watchChannel
1804 } else {
1805- stat = (*resultStat)(&cstat)
1806- watch = watchChannel
1807+ conn.forgetWatch(watchId)
1808+ err = zkError(rc, cerr)
1809 }
1810 return
1811 }
1812@@ -588,20 +562,20 @@
1813 // Exists checks if a node exists at the given path. If it does,
1814 // stat will contain meta information on the existing node, otherwise
1815 // it will be nil.
1816-func (zk *ZooKeeper) Exists(path string) (stat Stat, err Error) {
1817+func (conn *Conn) Exists(path string) (stat *Stat, err os.Error) {
1818 cpath := C.CString(path)
1819 defer C.free(unsafe.Pointer(cpath))
1820
1821- cstat := C.struct_Stat{}
1822- rc, cerr := C.zoo_wexists(zk.handle, cpath, nil, nil, &cstat)
1823+ var cstat Stat
1824+ rc, cerr := C.zoo_wexists(conn.handle, cpath, nil, nil, &cstat.c)
1825
1826 // We diverge a bit from the usual here: a ZNONODE is not an error
1827 // for an exists call, otherwise every Exists call would have to check
1828 // for err != nil and err != ZNONODE.
1829 if rc == C.ZOK {
1830- stat = (*resultStat)(&cstat)
1831+ stat = &cstat
1832 } else if rc != C.ZNONODE {
1833- err = newError(rc, cerr)
1834+ err = zkError(rc, cerr)
1835 }
1836 return
1837 }
1838@@ -611,28 +585,27 @@
1839 // stat is nil and the node didn't exist, or when the existing node
1840 // is removed. It will also receive critical session events. See the
1841 // documentation of the Event type for more details.
1842-func (zk *ZooKeeper) ExistsW(path string) (stat Stat, watch chan Event, err Error) {
1843+func (conn *Conn) ExistsW(path string) (stat *Stat, watch <-chan Event, err os.Error) {
1844 cpath := C.CString(path)
1845 defer C.free(unsafe.Pointer(cpath))
1846
1847- watchId, watchChannel := zk.createWatch(true)
1848+ watchId, watchChannel := conn.createWatch(true)
1849
1850- cstat := C.struct_Stat{}
1851- rc, cerr := C.zoo_wexists(zk.handle, cpath,
1852- C.watch_handler, unsafe.Pointer(watchId), &cstat)
1853+ var cstat Stat
1854+ rc, cerr := C.zoo_wexists(conn.handle, cpath, C.watch_handler, unsafe.Pointer(watchId), &cstat.c)
1855
1856 // We diverge a bit from the usual here: a ZNONODE is not an error
1857 // for an exists call, otherwise every Exists call would have to check
1858 // for err != nil and err != ZNONODE.
1859- switch rc {
1860+ switch Error(rc) {
1861 case ZOK:
1862- stat = (*resultStat)(&cstat)
1863+ stat = &cstat
1864 watch = watchChannel
1865 case ZNONODE:
1866 watch = watchChannel
1867 default:
1868- zk.forgetWatch(watchId)
1869- err = newError(rc, cerr)
1870+ conn.forgetWatch(watchId)
1871+ err = zkError(rc, cerr)
1872 }
1873 return
1874 }
1875@@ -646,7 +619,7 @@
1876 // The returned path is useful in cases where the created path may differ
1877 // from the requested one, such as when a sequence number is appended
1878 // to it due to the use of the gozk.SEQUENCE flag.
1879-func (zk *ZooKeeper) Create(path, value string, flags int, aclv []ACL) (pathCreated string, err Error) {
1880+func (conn *Conn) Create(path, value string, flags int, aclv []ACL) (pathCreated string, err os.Error) {
1881 cpath := C.CString(path)
1882 cvalue := C.CString(value)
1883 defer C.free(unsafe.Pointer(cpath))
1884@@ -660,13 +633,13 @@
1885 cpathCreated := (*C.char)(C.malloc(cpathLen))
1886 defer C.free(unsafe.Pointer(cpathCreated))
1887
1888- rc, cerr := C.zoo_create(zk.handle, cpath, cvalue, C.int(len(value)),
1889- caclv, C.int(flags), cpathCreated, C.int(cpathLen))
1890- if rc != C.ZOK {
1891- return "", newError(rc, cerr)
1892+ rc, cerr := C.zoo_create(conn.handle, cpath, cvalue, C.int(len(value)), caclv, C.int(flags), cpathCreated, C.int(cpathLen))
1893+ if rc == C.ZOK {
1894+ pathCreated = C.GoString(cpathCreated)
1895+ } else {
1896+ err = zkError(rc, cerr)
1897 }
1898-
1899- return C.GoString(cpathCreated), nil
1900+ return
1901 }
1902
1903 // Set modifies the data for the existing node at the given path, replacing it
1904@@ -677,35 +650,31 @@
1905 //
1906 // It is an error to attempt to set the data of a non-existing node with
1907 // this function. In these cases, use Create instead.
1908-func (zk *ZooKeeper) Set(path, value string, version int32) (stat Stat, err Error) {
1909+func (conn *Conn) Set(path, value string, version int32) (stat *Stat, err os.Error) {
1910
1911 cpath := C.CString(path)
1912 cvalue := C.CString(value)
1913 defer C.free(unsafe.Pointer(cpath))
1914 defer C.free(unsafe.Pointer(cvalue))
1915
1916- cstat := C.struct_Stat{}
1917-
1918- rc, cerr := C.zoo_set2(zk.handle, cpath, cvalue, C.int(len(value)),
1919- C.int(version), &cstat)
1920- if rc != C.ZOK {
1921- return nil, newError(rc, cerr)
1922+ var cstat Stat
1923+ rc, cerr := C.zoo_set2(conn.handle, cpath, cvalue, C.int(len(value)), C.int(version), &cstat.c)
1924+ if rc == C.ZOK {
1925+ stat = &cstat
1926+ } else {
1927+ err = zkError(rc, cerr)
1928 }
1929-
1930- return (*resultStat)(&cstat), nil
1931+ return
1932 }
1933
1934 // Delete removes the node at path. If version is not -1, the operation
1935 // will only succeed if the node is still at this version when the
1936 // node is deleted as an atomic operation.
1937-func (zk *ZooKeeper) Delete(path string, version int32) (err Error) {
1938+func (conn *Conn) Delete(path string, version int32) (err os.Error) {
1939 cpath := C.CString(path)
1940 defer C.free(unsafe.Pointer(cpath))
1941- rc, cerr := C.zoo_delete(zk.handle, cpath, C.int(version))
1942- if rc != C.ZOK {
1943- return newError(rc, cerr)
1944- }
1945- return nil
1946+ rc, cerr := C.zoo_delete(conn.handle, cpath, C.int(version))
1947+ return zkError(rc, cerr)
1948 }
1949
1950 // AddAuth adds a new authentication certificate to the ZooKeeper
1951@@ -713,7 +682,7 @@
1952 // authentication information, while the cert parameter provides the
1953 // identity data itself. For instance, the "digest" scheme requires
1954 // a pair like "username:password" to be provided as the certificate.
1955-func (zk *ZooKeeper) AddAuth(scheme, cert string) Error {
1956+func (conn *Conn) AddAuth(scheme, cert string) os.Error {
1957 cscheme := C.CString(scheme)
1958 ccert := C.CString(cert)
1959 defer C.free(unsafe.Pointer(cscheme))
1960@@ -725,43 +694,38 @@
1961 }
1962 defer C.destroy_completion_data(data)
1963
1964- rc, cerr := C.zoo_add_auth(zk.handle, cscheme, ccert, C.int(len(cert)),
1965- C.handle_void_completion, unsafe.Pointer(data))
1966+ rc, cerr := C.zoo_add_auth(conn.handle, cscheme, ccert, C.int(len(cert)), C.handle_void_completion, unsafe.Pointer(data))
1967 if rc != C.ZOK {
1968- return newError(rc, cerr)
1969+ return zkError(rc, cerr)
1970 }
1971
1972 C.wait_for_completion(data)
1973
1974 rc = C.int(uintptr(data.data))
1975- if rc != C.ZOK {
1976- return newError(rc, nil)
1977- }
1978-
1979- return nil
1980+ return zkError(rc, nil)
1981 }
1982
1983 // ACL returns the access control list for path.
1984-func (zk *ZooKeeper) ACL(path string) ([]ACL, Stat, Error) {
1985+func (conn *Conn) ACL(path string) ([]ACL, *Stat, os.Error) {
1986
1987 cpath := C.CString(path)
1988 defer C.free(unsafe.Pointer(cpath))
1989
1990 caclv := C.struct_ACL_vector{}
1991- cstat := C.struct_Stat{}
1992
1993- rc, cerr := C.zoo_get_acl(zk.handle, cpath, &caclv, &cstat)
1994+ var cstat Stat
1995+ rc, cerr := C.zoo_get_acl(conn.handle, cpath, &caclv, &cstat.c)
1996 if rc != C.ZOK {
1997- return nil, nil, newError(rc, cerr)
1998+ return nil, nil, zkError(rc, cerr)
1999 }
2000
2001 aclv := parseACLVector(&caclv)
2002
2003- return aclv, (*resultStat)(&cstat), nil
2004+ return aclv, &cstat, nil
2005 }
2006
2007 // SetACL changes the access control list for path.
2008-func (zk *ZooKeeper) SetACL(path string, aclv []ACL, version int32) Error {
2009+func (conn *Conn) SetACL(path string, aclv []ACL, version int32) os.Error {
2010
2011 cpath := C.CString(path)
2012 defer C.free(unsafe.Pointer(cpath))
2013@@ -769,12 +733,8 @@
2014 caclv := buildACLVector(aclv)
2015 defer C.deallocate_ACL_vector(caclv)
2016
2017- rc, cerr := C.zoo_set_acl(zk.handle, cpath, C.int(version), caclv)
2018- if rc != C.ZOK {
2019- return newError(rc, cerr)
2020- }
2021-
2022- return nil
2023+ rc, cerr := C.zoo_set_acl(conn.handle, cpath, C.int(version), caclv)
2024+ return zkError(rc, cerr)
2025 }
2026
2027 func parseACLVector(caclv *C.struct_ACL_vector) []ACL {
2028@@ -822,7 +782,7 @@
2029 // -----------------------------------------------------------------------
2030 // RetryChange utility method.
2031
2032-type ChangeFunc func(oldValue string, oldStat Stat) (newValue string, err os.Error)
2033+type ChangeFunc func(oldValue string, oldStat *Stat) (newValue string, err os.Error)
2034
2035 // RetryChange runs changeFunc to attempt to atomically change path
2036 // in a lock free manner, and retries in case there was another
2037@@ -831,8 +791,7 @@
2038 // changeFunc must work correctly if called multiple times in case
2039 // the modification fails due to concurrent changes, and it may return
2040 // an error that will cause the RetryChange function to stop and
2041-// return an Error with code ZSYSTEMERROR and the same .String() result
2042-// as the provided error.
2043+// return the same error.
2044 //
2045 // This mechanism is not suitable for a node that is frequently modified
2046 // concurrently. For those cases, consider using a pessimistic locking
2047@@ -845,8 +804,7 @@
2048 //
2049 // 2. Call the changeFunc with the current node value and stat,
2050 // or with an empty string and nil stat, if the node doesn't yet exist.
2051-// If the changeFunc returns an error, stop and return an Error with
2052-// ZSYSTEMERROR Code() and the same String() as the changeFunc error.
2053+// If the changeFunc returns an error, stop and return the same error.
2054 //
2055 // 3. If the changeFunc returns no errors, use the string returned as
2056 // the new candidate value for the node, and attempt to either create
2057@@ -855,36 +813,32 @@
2058 // in the same node), repeat from step 1. If this procedure fails with any
2059 // other error, stop and return the error found.
2060 //
2061-func (zk *ZooKeeper) RetryChange(path string, flags int, acl []ACL, changeFunc ChangeFunc) (err Error) {
2062+func (conn *Conn) RetryChange(path string, flags int, acl []ACL, changeFunc ChangeFunc) os.Error {
2063 for {
2064- oldValue, oldStat, getErr := zk.Get(path)
2065- if getErr != nil && getErr.Code() != ZNONODE {
2066- err = getErr
2067- break
2068- }
2069- newValue, osErr := changeFunc(oldValue, oldStat)
2070- if osErr != nil {
2071- return newError(ZSYSTEMERROR, osErr)
2072- } else if oldStat == nil {
2073- _, err = zk.Create(path, newValue, flags, acl)
2074- if err == nil || err.Code() != ZNODEEXISTS {
2075- break
2076+ oldValue, oldStat, err := conn.Get(path)
2077+ if err != nil && err != ZNONODE {
2078+ return err
2079+ }
2080+ newValue, err := changeFunc(oldValue, oldStat)
2081+ if err != nil {
2082+ return err
2083+ }
2084+ if oldStat == nil {
2085+ _, err := conn.Create(path, newValue, flags, acl)
2086+ if err == nil || err != ZNODEEXISTS {
2087+ return err
2088 }
2089- } else if newValue == oldValue {
2090+ continue
2091+ }
2092+ if newValue == oldValue {
2093 return nil // Nothing to do.
2094- } else {
2095- _, err = zk.Set(path, newValue, oldStat.Version())
2096- if err == nil {
2097- break
2098- } else {
2099- code := err.Code()
2100- if code != ZBADVERSION && code != ZNONODE {
2101- break
2102- }
2103- }
2104+ }
2105+ _, err = conn.Set(path, newValue, oldStat.Version())
2106+ if err == nil || (err != ZBADVERSION && err != ZNONODE) {
2107+ return err
2108 }
2109 }
2110- return err
2111+ panic("not reached")
2112 }
2113
2114 // -----------------------------------------------------------------------
2115@@ -897,7 +851,7 @@
2116 // Whenever a *W method is called, it will return a channel which
2117 // outputs Event values. Internally, a map is used to maintain references
2118 // between a unique integer key (the watchId), and the event channel. The
2119-// watchId is then handed to the C zookeeper library as the watch context,
2120+// watchId is then handed to the C ZooKeeper library as the watch context,
2121 // so that we get it back when events happen. Using an integer key as the
2122 // watch context rather than a pointer is needed because there's no guarantee
2123 // that in the future the GC will not move objects around, and also because
2124@@ -910,13 +864,13 @@
2125 // Since Cgo doesn't allow calling back into Go, we actually fire a new
2126 // goroutine the very first time Init is called, and allow it to block
2127 // in a pthread condition variable within a C function. This condition
2128-// will only be notified once a zookeeper watch callback appends new
2129+// will only be notified once a ZooKeeper watch callback appends new
2130 // entries to the event list. When this happens, the C function returns
2131 // and we get back into Go land with the pointer to the watch data,
2132 // including the watchId and other event details such as type and path.
2133
2134 var watchMutex sync.Mutex
2135-var watchZooKeepers = make(map[uintptr]*ZooKeeper)
2136+var watchConns = make(map[uintptr]*Conn)
2137 var watchCounter uintptr
2138 var watchLoopCounter int
2139
2140@@ -925,14 +879,14 @@
2141 // mostly as a debugging and testing aid.
2142 func CountPendingWatches() int {
2143 watchMutex.Lock()
2144- count := len(watchZooKeepers)
2145+ count := len(watchConns)
2146 watchMutex.Unlock()
2147 return count
2148 }
2149
2150 // createWatch creates and registers a watch, returning the watch id
2151 // and channel.
2152-func (zk *ZooKeeper) createWatch(session bool) (watchId uintptr, watchChannel chan Event) {
2153+func (conn *Conn) createWatch(session bool) (watchId uintptr, watchChannel chan Event) {
2154 buf := 1 // session/watch event
2155 if session {
2156 buf = 32
2157@@ -942,8 +896,8 @@
2158 defer watchMutex.Unlock()
2159 watchId = watchCounter
2160 watchCounter += 1
2161- zk.watchChannels[watchId] = watchChannel
2162- watchZooKeepers[watchId] = zk
2163+ conn.watchChannels[watchId] = watchChannel
2164+ watchConns[watchId] = conn
2165 return
2166 }
2167
2168@@ -951,21 +905,21 @@
2169 // from ever getting delivered. It shouldn't be used if there's any
2170 // chance the watch channel is still visible and not closed, since
2171 // it might mean a goroutine would be blocked forever.
2172-func (zk *ZooKeeper) forgetWatch(watchId uintptr) {
2173+func (conn *Conn) forgetWatch(watchId uintptr) {
2174 watchMutex.Lock()
2175 defer watchMutex.Unlock()
2176- zk.watchChannels[watchId] = nil, false
2177- watchZooKeepers[watchId] = nil, false
2178+ conn.watchChannels[watchId] = nil, false
2179+ watchConns[watchId] = nil, false
2180 }
2181
2182-// closeAllWatches closes all watch channels for zk.
2183-func (zk *ZooKeeper) closeAllWatches() {
2184+// closeAllWatches closes all watch channels for conn.
2185+func (conn *Conn) closeAllWatches() {
2186 watchMutex.Lock()
2187 defer watchMutex.Unlock()
2188- for watchId, ch := range zk.watchChannels {
2189+ for watchId, ch := range conn.watchChannels {
2190 close(ch)
2191- zk.watchChannels[watchId] = nil, false
2192- watchZooKeepers[watchId] = nil, false
2193+ conn.watchChannels[watchId] = nil, false
2194+ watchConns[watchId] = nil, false
2195 }
2196 }
2197
2198@@ -978,11 +932,11 @@
2199 }
2200 watchMutex.Lock()
2201 defer watchMutex.Unlock()
2202- zk, ok := watchZooKeepers[watchId]
2203+ conn, ok := watchConns[watchId]
2204 if !ok {
2205 return
2206 }
2207- if event.Type == EVENT_SESSION && watchId != zk.sessionWatchId {
2208+ if event.Type == EVENT_SESSION && watchId != conn.sessionWatchId {
2209 switch event.State {
2210 case STATE_EXPIRED_SESSION, STATE_AUTH_FAILED:
2211 default:
2212@@ -990,7 +944,7 @@
2213 return
2214 }
2215 }
2216- ch := zk.watchChannels[watchId]
2217+ ch := conn.watchChannels[watchId]
2218 if ch == nil {
2219 return
2220 }
2221@@ -1002,15 +956,15 @@
2222 // straight to the buffer), and the application isn't paying
2223 // attention for long enough to have the buffer filled up.
2224 // Break down now rather than leaking forever.
2225- if watchId == zk.sessionWatchId {
2226+ if watchId == conn.sessionWatchId {
2227 panic("Session event channel buffer is full")
2228 } else {
2229 panic("Watch event channel buffer is full")
2230 }
2231 }
2232- if watchId != zk.sessionWatchId {
2233- zk.watchChannels[watchId] = nil, false
2234- watchZooKeepers[watchId] = nil, false
2235+ if watchId != conn.sessionWatchId {
2236+ conn.watchChannels[watchId] = nil, false
2237+ watchConns[watchId] = nil, false
2238 close(ch)
2239 }
2240 }
2241
2242=== renamed file 'gozk_test.go' => 'zk_test.go'
2243--- gozk_test.go 2011-08-19 01:51:37 +0000
2244+++ zk_test.go 2011-10-03 15:02:28 +0000
2245@@ -1,17 +1,17 @@
2246-package gozk_test
2247+package zk_test
2248
2249 import (
2250 . "launchpad.net/gocheck"
2251- "gozk"
2252+ "launchpad.net/gozk/zk"
2253 "time"
2254 )
2255
2256 // This error will be delivered via C errno, since ZK unfortunately
2257 // only provides the handler back from zookeeper_init().
2258 func (s *S) TestInitErrorThroughErrno(c *C) {
2259- zk, watch, err := gozk.Init("bad-domain-without-port", 5e9)
2260- if zk != nil {
2261- zk.Close()
2262+ conn, watch, err := zk.Dial("bad-domain-without-port", 5e9)
2263+ if conn != nil {
2264+ conn.Close()
2265 }
2266 if watch != nil {
2267 go func() {
2268@@ -23,15 +23,15 @@
2269 }
2270 }()
2271 }
2272- c.Assert(zk, IsNil)
2273+ c.Assert(conn, IsNil)
2274 c.Assert(watch, IsNil)
2275 c.Assert(err, Matches, "invalid argument")
2276 }
2277
2278 func (s *S) TestRecvTimeoutInitParameter(c *C) {
2279- zk, watch, err := gozk.Init(s.zkAddr, 0)
2280+ conn, watch, err := zk.Dial(s.zkAddr, 0)
2281 c.Assert(err, IsNil)
2282- defer zk.Close()
2283+ defer conn.Close()
2284
2285 select {
2286 case <-watch:
2287@@ -40,7 +40,7 @@
2288 }
2289
2290 for i := 0; i != 1000; i++ {
2291- _, _, err := zk.Get("/zookeeper")
2292+ _, _, err := conn.Get("/zookeeper")
2293 if err != nil {
2294 c.Assert(err, Matches, "operation timeout")
2295 c.SucceedNow()
2296@@ -51,76 +51,79 @@
2297 }
2298
2299 func (s *S) TestSessionWatches(c *C) {
2300- c.Assert(gozk.CountPendingWatches(), Equals, 0)
2301+ c.Assert(zk.CountPendingWatches(), Equals, 0)
2302
2303 zk1, watch1 := s.init(c)
2304 zk2, watch2 := s.init(c)
2305 zk3, watch3 := s.init(c)
2306
2307- c.Assert(gozk.CountPendingWatches(), Equals, 3)
2308+ c.Assert(zk.CountPendingWatches(), Equals, 3)
2309
2310 event1 := <-watch1
2311- c.Assert(event1.Type, Equals, gozk.EVENT_SESSION)
2312- c.Assert(event1.State, Equals, gozk.STATE_CONNECTED)
2313+ c.Assert(event1.Type, Equals, zk.EVENT_SESSION)
2314+ c.Assert(event1.State, Equals, zk.STATE_CONNECTED)
2315
2316- c.Assert(gozk.CountPendingWatches(), Equals, 3)
2317+ c.Assert(zk.CountPendingWatches(), Equals, 3)
2318
2319 event2 := <-watch2
2320- c.Assert(event2.Type, Equals, gozk.EVENT_SESSION)
2321- c.Assert(event2.State, Equals, gozk.STATE_CONNECTED)
2322+ c.Assert(event2.Type, Equals, zk.EVENT_SESSION)
2323+ c.Assert(event2.State, Equals, zk.STATE_CONNECTED)
2324
2325- c.Assert(gozk.CountPendingWatches(), Equals, 3)
2326+ c.Assert(zk.CountPendingWatches(), Equals, 3)
2327
2328 event3 := <-watch3
2329- c.Assert(event3.Type, Equals, gozk.EVENT_SESSION)
2330- c.Assert(event3.State, Equals, gozk.STATE_CONNECTED)
2331+ c.Assert(event3.Type, Equals, zk.EVENT_SESSION)
2332+ c.Assert(event3.State, Equals, zk.STATE_CONNECTED)
2333
2334- c.Assert(gozk.CountPendingWatches(), Equals, 3)
2335+ c.Assert(zk.CountPendingWatches(), Equals, 3)
2336
2337 zk1.Close()
2338- c.Assert(gozk.CountPendingWatches(), Equals, 2)
2339+ c.Assert(zk.CountPendingWatches(), Equals, 2)
2340 zk2.Close()
2341- c.Assert(gozk.CountPendingWatches(), Equals, 1)
2342+ c.Assert(zk.CountPendingWatches(), Equals, 1)
2343 zk3.Close()
2344- c.Assert(gozk.CountPendingWatches(), Equals, 0)
2345+ c.Assert(zk.CountPendingWatches(), Equals, 0)
2346 }
2347
2348-// Gozk injects a STATE_CLOSED event when zk.Close() is called, right
2349+// Gozk injects a STATE_CLOSED event when conn.Close() is called, right
2350 // before the channel is closed. Closing the channel injects a nil
2351 // pointer, as usual for Go, so the STATE_CLOSED gives a chance to
2352 // know that a nil pointer is coming, and to stop the procedure.
2353 // Hopefully this procedure will avoid some nil-pointer references by
2354 // mistake.
2355 func (s *S) TestClosingStateInSessionWatch(c *C) {
2356- zk, watch := s.init(c)
2357+ conn, watch := s.init(c)
2358
2359 event := <-watch
2360- c.Assert(event.Type, Equals, gozk.EVENT_SESSION)
2361- c.Assert(event.State, Equals, gozk.STATE_CONNECTED)
2362+ c.Assert(event.Type, Equals, zk.EVENT_SESSION)
2363+ c.Assert(event.State, Equals, zk.STATE_CONNECTED)
2364
2365- zk.Close()
2366+ conn.Close()
2367 event, ok := <-watch
2368 c.Assert(ok, Equals, false)
2369- c.Assert(event.Type, Equals, gozk.EVENT_CLOSED)
2370- c.Assert(event.State, Equals, gozk.STATE_CLOSED)
2371+ c.Assert(event.Type, Equals, zk.EVENT_CLOSED)
2372+ c.Assert(event.State, Equals, zk.STATE_CLOSED)
2373 }
2374
2375 func (s *S) TestEventString(c *C) {
2376- var event gozk.Event
2377- event = gozk.Event{gozk.EVENT_SESSION, "/path", gozk.STATE_CONNECTED}
2378+ var event zk.Event
2379+ event = zk.Event{zk.EVENT_SESSION, "/path", zk.STATE_CONNECTED}
2380 c.Assert(event, Matches, "ZooKeeper connected")
2381- event = gozk.Event{gozk.EVENT_CREATED, "/path", gozk.STATE_CONNECTED}
2382+ event = zk.Event{zk.EVENT_CREATED, "/path", zk.STATE_CONNECTED}
2383 c.Assert(event, Matches, "ZooKeeper connected; path created: /path")
2384- event = gozk.Event{-1, "/path", gozk.STATE_CLOSED}
2385+ event = zk.Event{-1, "/path", zk.STATE_CLOSED}
2386 c.Assert(event, Matches, "ZooKeeper connection closed")
2387 }
2388
2389-var okTests = []struct{gozk.Event; Ok bool}{
2390- {gozk.Event{gozk.EVENT_SESSION, "", gozk.STATE_CONNECTED}, true},
2391- {gozk.Event{gozk.EVENT_CREATED, "", gozk.STATE_CONNECTED}, true},
2392- {gozk.Event{0, "", gozk.STATE_CLOSED}, false},
2393- {gozk.Event{0, "", gozk.STATE_EXPIRED_SESSION}, false},
2394- {gozk.Event{0, "", gozk.STATE_AUTH_FAILED}, false},
2395+var okTests = []struct {
2396+ zk.Event
2397+ Ok bool
2398+}{
2399+ {zk.Event{zk.EVENT_SESSION, "", zk.STATE_CONNECTED}, true},
2400+ {zk.Event{zk.EVENT_CREATED, "", zk.STATE_CONNECTED}, true},
2401+ {zk.Event{0, "", zk.STATE_CLOSED}, false},
2402+ {zk.Event{0, "", zk.STATE_EXPIRED_SESSION}, false},
2403+ {zk.Event{0, "", zk.STATE_AUTH_FAILED}, false},
2404 }
2405
2406 func (s *S) TestEventOk(c *C) {
2407@@ -130,9 +133,9 @@
2408 }
2409
2410 func (s *S) TestGetAndStat(c *C) {
2411- zk, _ := s.init(c)
2412+ conn, _ := s.init(c)
2413
2414- data, stat, err := zk.Get("/zookeeper")
2415+ data, stat, err := conn.Get("/zookeeper")
2416 c.Assert(err, IsNil)
2417 c.Assert(data, Equals, "")
2418 c.Assert(stat.Czxid(), Equals, int64(0))
2419@@ -149,58 +152,58 @@
2420 }
2421
2422 func (s *S) TestGetAndError(c *C) {
2423- zk, _ := s.init(c)
2424+ conn, _ := s.init(c)
2425
2426- data, stat, err := zk.Get("/non-existent")
2427+ data, stat, err := conn.Get("/non-existent")
2428
2429 c.Assert(data, Equals, "")
2430 c.Assert(stat, IsNil)
2431 c.Assert(err, Matches, "no node")
2432- c.Assert(err.Code(), Equals, gozk.ZNONODE)
2433+ c.Assert(err, Equals, zk.ZNONODE)
2434 }
2435
2436 func (s *S) TestCreateAndGet(c *C) {
2437- zk, _ := s.init(c)
2438+ conn, _ := s.init(c)
2439
2440- path, err := zk.Create("/test-", "bababum", gozk.SEQUENCE|gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
2441+ path, err := conn.Create("/test-", "bababum", zk.SEQUENCE|zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
2442 c.Assert(err, IsNil)
2443 c.Assert(path, Matches, "/test-[0-9]+")
2444
2445 // Check the error condition from Create().
2446- _, err = zk.Create(path, "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
2447+ _, err = conn.Create(path, "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
2448 c.Assert(err, Matches, "node exists")
2449
2450- data, _, err := zk.Get(path)
2451+ data, _, err := conn.Get(path)
2452 c.Assert(err, IsNil)
2453 c.Assert(data, Equals, "bababum")
2454 }
2455
2456 func (s *S) TestCreateSetAndGet(c *C) {
2457- zk, _ := s.init(c)
2458+ conn, _ := s.init(c)
2459
2460- _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
2461+ _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
2462 c.Assert(err, IsNil)
2463
2464- stat, err := zk.Set("/test", "bababum", -1) // Any version.
2465+ stat, err := conn.Set("/test", "bababum", -1) // Any version.
2466 c.Assert(err, IsNil)
2467 c.Assert(stat.Version(), Equals, int32(1))
2468
2469- data, _, err := zk.Get("/test")
2470+ data, _, err := conn.Get("/test")
2471 c.Assert(err, IsNil)
2472 c.Assert(data, Equals, "bababum")
2473 }
2474
2475 func (s *S) TestGetAndWatch(c *C) {
2476- c.Check(gozk.CountPendingWatches(), Equals, 0)
2477-
2478- zk, _ := s.init(c)
2479-
2480- c.Check(gozk.CountPendingWatches(), Equals, 1)
2481-
2482- _, err := zk.Create("/test", "one", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
2483+ c.Check(zk.CountPendingWatches(), Equals, 0)
2484+
2485+ conn, _ := s.init(c)
2486+
2487+ c.Check(zk.CountPendingWatches(), Equals, 1)
2488+
2489+ _, err := conn.Create("/test", "one", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
2490 c.Assert(err, IsNil)
2491
2492- data, stat, watch, err := zk.GetW("/test")
2493+ data, stat, watch, err := conn.GetW("/test")
2494 c.Assert(err, IsNil)
2495 c.Assert(data, Equals, "one")
2496 c.Assert(stat.Version(), Equals, int32(0))
2497@@ -211,17 +214,17 @@
2498 default:
2499 }
2500
2501- c.Check(gozk.CountPendingWatches(), Equals, 2)
2502+ c.Check(zk.CountPendingWatches(), Equals, 2)
2503
2504- _, err = zk.Set("/test", "two", -1)
2505+ _, err = conn.Set("/test", "two", -1)
2506 c.Assert(err, IsNil)
2507
2508 event := <-watch
2509- c.Assert(event.Type, Equals, gozk.EVENT_CHANGED)
2510-
2511- c.Check(gozk.CountPendingWatches(), Equals, 1)
2512-
2513- data, _, watch, err = zk.GetW("/test")
2514+ c.Assert(event.Type, Equals, zk.EVENT_CHANGED)
2515+
2516+ c.Check(zk.CountPendingWatches(), Equals, 1)
2517+
2518+ data, _, watch, err = conn.GetW("/test")
2519 c.Assert(err, IsNil)
2520 c.Assert(data, Equals, "two")
2521
2522@@ -231,86 +234,86 @@
2523 default:
2524 }
2525
2526- c.Check(gozk.CountPendingWatches(), Equals, 2)
2527+ c.Check(zk.CountPendingWatches(), Equals, 2)
2528
2529- _, err = zk.Set("/test", "three", -1)
2530+ _, err = conn.Set("/test", "three", -1)
2531 c.Assert(err, IsNil)
2532
2533 event = <-watch
2534- c.Assert(event.Type, Equals, gozk.EVENT_CHANGED)
2535+ c.Assert(event.Type, Equals, zk.EVENT_CHANGED)
2536
2537- c.Check(gozk.CountPendingWatches(), Equals, 1)
2538+ c.Check(zk.CountPendingWatches(), Equals, 1)
2539 }
2540
2541 func (s *S) TestGetAndWatchWithError(c *C) {
2542- c.Check(gozk.CountPendingWatches(), Equals, 0)
2543-
2544- zk, _ := s.init(c)
2545-
2546- c.Check(gozk.CountPendingWatches(), Equals, 1)
2547-
2548- _, _, watch, err := zk.GetW("/test")
2549+ c.Check(zk.CountPendingWatches(), Equals, 0)
2550+
2551+ conn, _ := s.init(c)
2552+
2553+ c.Check(zk.CountPendingWatches(), Equals, 1)
2554+
2555+ _, _, watch, err := conn.GetW("/test")
2556 c.Assert(err, NotNil)
2557- c.Assert(err.Code(), Equals, gozk.ZNONODE)
2558+ c.Assert(err, Equals, zk.ZNONODE)
2559 c.Assert(watch, IsNil)
2560
2561- c.Check(gozk.CountPendingWatches(), Equals, 1)
2562+ c.Check(zk.CountPendingWatches(), Equals, 1)
2563 }
2564
2565 func (s *S) TestCloseReleasesWatches(c *C) {
2566- c.Check(gozk.CountPendingWatches(), Equals, 0)
2567-
2568- zk, _ := s.init(c)
2569-
2570- c.Check(gozk.CountPendingWatches(), Equals, 1)
2571-
2572- _, err := zk.Create("/test", "one", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
2573- c.Assert(err, IsNil)
2574-
2575- _, _, _, err = zk.GetW("/test")
2576- c.Assert(err, IsNil)
2577-
2578- c.Assert(gozk.CountPendingWatches(), Equals, 2)
2579-
2580- zk.Close()
2581-
2582- c.Assert(gozk.CountPendingWatches(), Equals, 0)
2583+ c.Check(zk.CountPendingWatches(), Equals, 0)
2584+
2585+ conn, _ := s.init(c)
2586+
2587+ c.Check(zk.CountPendingWatches(), Equals, 1)
2588+
2589+ _, err := conn.Create("/test", "one", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
2590+ c.Assert(err, IsNil)
2591+
2592+ _, _, _, err = conn.GetW("/test")
2593+ c.Assert(err, IsNil)
2594+
2595+ c.Assert(zk.CountPendingWatches(), Equals, 2)
2596+
2597+ conn.Close()
2598+
2599+ c.Assert(zk.CountPendingWatches(), Equals, 0)
2600 }
2601
2602 // By default, the ZooKeeper C client will hang indefinitely if a
2603 // handler is closed twice. We get in the way and prevent it.
2604 func (s *S) TestClosingTwiceDoesntHang(c *C) {
2605- zk, _ := s.init(c)
2606- err := zk.Close()
2607+ conn, _ := s.init(c)
2608+ err := conn.Close()
2609 c.Assert(err, IsNil)
2610- err = zk.Close()
2611+ err = conn.Close()
2612 c.Assert(err, NotNil)
2613- c.Assert(err.Code(), Equals, gozk.ZCLOSING)
2614+ c.Assert(err, Equals, zk.ZCLOSING)
2615 }
2616
2617 func (s *S) TestChildren(c *C) {
2618- zk, _ := s.init(c)
2619+ conn, _ := s.init(c)
2620
2621- children, stat, err := zk.Children("/")
2622+ children, stat, err := conn.Children("/")
2623 c.Assert(err, IsNil)
2624 c.Assert(children, Equals, []string{"zookeeper"})
2625 c.Assert(stat.NumChildren(), Equals, int32(1))
2626
2627- children, stat, err = zk.Children("/non-existent")
2628+ children, stat, err = conn.Children("/non-existent")
2629 c.Assert(err, NotNil)
2630- c.Assert(err.Code(), Equals, gozk.ZNONODE)
2631+ c.Assert(err, Equals, zk.ZNONODE)
2632 c.Assert(children, Equals, []string{})
2633- c.Assert(stat, Equals, nil)
2634+ c.Assert(stat, IsNil)
2635 }
2636
2637 func (s *S) TestChildrenAndWatch(c *C) {
2638- c.Check(gozk.CountPendingWatches(), Equals, 0)
2639-
2640- zk, _ := s.init(c)
2641-
2642- c.Check(gozk.CountPendingWatches(), Equals, 1)
2643-
2644- children, stat, watch, err := zk.ChildrenW("/")
2645+ c.Check(zk.CountPendingWatches(), Equals, 0)
2646+
2647+ conn, _ := s.init(c)
2648+
2649+ c.Check(zk.CountPendingWatches(), Equals, 1)
2650+
2651+ children, stat, watch, err := conn.ChildrenW("/")
2652 c.Assert(err, IsNil)
2653 c.Assert(children, Equals, []string{"zookeeper"})
2654 c.Assert(stat.NumChildren(), Equals, int32(1))
2655@@ -321,18 +324,18 @@
2656 default:
2657 }
2658
2659- c.Check(gozk.CountPendingWatches(), Equals, 2)
2660+ c.Check(zk.CountPendingWatches(), Equals, 2)
2661
2662- _, err = zk.Create("/test1", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
2663+ _, err = conn.Create("/test1", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
2664 c.Assert(err, IsNil)
2665
2666 event := <-watch
2667- c.Assert(event.Type, Equals, gozk.EVENT_CHILD)
2668+ c.Assert(event.Type, Equals, zk.EVENT_CHILD)
2669 c.Assert(event.Path, Equals, "/")
2670
2671- c.Check(gozk.CountPendingWatches(), Equals, 1)
2672+ c.Check(zk.CountPendingWatches(), Equals, 1)
2673
2674- children, stat, watch, err = zk.ChildrenW("/")
2675+ children, stat, watch, err = conn.ChildrenW("/")
2676 c.Assert(err, IsNil)
2677 c.Assert(stat.NumChildren(), Equals, int32(2))
2678
2679@@ -345,57 +348,56 @@
2680 default:
2681 }
2682
2683- c.Check(gozk.CountPendingWatches(), Equals, 2)
2684+ c.Check(zk.CountPendingWatches(), Equals, 2)
2685
2686- _, err = zk.Create("/test2", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
2687+ _, err = conn.Create("/test2", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
2688 c.Assert(err, IsNil)
2689
2690 event = <-watch
2691- c.Assert(event.Type, Equals, gozk.EVENT_CHILD)
2692+ c.Assert(event.Type, Equals, zk.EVENT_CHILD)
2693
2694- c.Check(gozk.CountPendingWatches(), Equals, 1)
2695+ c.Check(zk.CountPendingWatches(), Equals, 1)
2696 }
2697
2698 func (s *S) TestChildrenAndWatchWithError(c *C) {
2699- c.Check(gozk.CountPendingWatches(), Equals, 0)
2700-
2701- zk, _ := s.init(c)
2702-
2703- c.Check(gozk.CountPendingWatches(), Equals, 1)
2704-
2705- _, stat, watch, err := zk.ChildrenW("/test")
2706+ c.Check(zk.CountPendingWatches(), Equals, 0)
2707+
2708+ conn, _ := s.init(c)
2709+
2710+ c.Check(zk.CountPendingWatches(), Equals, 1)
2711+
2712+ _, stat, watch, err := conn.ChildrenW("/test")
2713 c.Assert(err, NotNil)
2714- c.Assert(err.Code(), Equals, gozk.ZNONODE)
2715+ c.Assert(err, Equals, zk.ZNONODE)
2716 c.Assert(watch, IsNil)
2717 c.Assert(stat, IsNil)
2718
2719- c.Check(gozk.CountPendingWatches(), Equals, 1)
2720+ c.Check(zk.CountPendingWatches(), Equals, 1)
2721 }
2722
2723 func (s *S) TestExists(c *C) {
2724- zk, _ := s.init(c)
2725-
2726- stat, err := zk.Exists("/zookeeper")
2727- c.Assert(err, IsNil)
2728- c.Assert(stat.NumChildren(), Equals, int32(1))
2729-
2730- stat, err = zk.Exists("/non-existent")
2731+ conn, _ := s.init(c)
2732+
2733+ stat, err := conn.Exists("/non-existent")
2734 c.Assert(err, IsNil)
2735 c.Assert(stat, IsNil)
2736+
2737+ stat, err = conn.Exists("/zookeeper")
2738+ c.Assert(err, IsNil)
2739 }
2740
2741 func (s *S) TestExistsAndWatch(c *C) {
2742- c.Check(gozk.CountPendingWatches(), Equals, 0)
2743-
2744- zk, _ := s.init(c)
2745-
2746- c.Check(gozk.CountPendingWatches(), Equals, 1)
2747-
2748- stat, watch, err := zk.ExistsW("/test")
2749+ c.Check(zk.CountPendingWatches(), Equals, 0)
2750+
2751+ conn, _ := s.init(c)
2752+
2753+ c.Check(zk.CountPendingWatches(), Equals, 1)
2754+
2755+ stat, watch, err := conn.ExistsW("/test")
2756 c.Assert(err, IsNil)
2757 c.Assert(stat, IsNil)
2758
2759- c.Check(gozk.CountPendingWatches(), Equals, 2)
2760+ c.Check(zk.CountPendingWatches(), Equals, 2)
2761
2762 select {
2763 case <-watch:
2764@@ -403,62 +405,62 @@
2765 default:
2766 }
2767
2768- _, err = zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
2769+ _, err = conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
2770 c.Assert(err, IsNil)
2771
2772 event := <-watch
2773- c.Assert(event.Type, Equals, gozk.EVENT_CREATED)
2774+ c.Assert(event.Type, Equals, zk.EVENT_CREATED)
2775 c.Assert(event.Path, Equals, "/test")
2776
2777- c.Check(gozk.CountPendingWatches(), Equals, 1)
2778+ c.Check(zk.CountPendingWatches(), Equals, 1)
2779
2780- stat, watch, err = zk.ExistsW("/test")
2781+ stat, watch, err = conn.ExistsW("/test")
2782 c.Assert(err, IsNil)
2783 c.Assert(stat, NotNil)
2784 c.Assert(stat.NumChildren(), Equals, int32(0))
2785
2786- c.Check(gozk.CountPendingWatches(), Equals, 2)
2787+ c.Check(zk.CountPendingWatches(), Equals, 2)
2788 }
2789
2790 func (s *S) TestExistsAndWatchWithError(c *C) {
2791- c.Check(gozk.CountPendingWatches(), Equals, 0)
2792-
2793- zk, _ := s.init(c)
2794-
2795- c.Check(gozk.CountPendingWatches(), Equals, 1)
2796-
2797- stat, watch, err := zk.ExistsW("///")
2798+ c.Check(zk.CountPendingWatches(), Equals, 0)
2799+
2800+ conn, _ := s.init(c)
2801+
2802+ c.Check(zk.CountPendingWatches(), Equals, 1)
2803+
2804+ stat, watch, err := conn.ExistsW("///")
2805 c.Assert(err, NotNil)
2806- c.Assert(err.Code(), Equals, gozk.ZBADARGUMENTS)
2807+ c.Assert(err, Equals, zk.ZBADARGUMENTS)
2808 c.Assert(stat, IsNil)
2809 c.Assert(watch, IsNil)
2810
2811- c.Check(gozk.CountPendingWatches(), Equals, 1)
2812+ c.Check(zk.CountPendingWatches(), Equals, 1)
2813 }
2814
2815 func (s *S) TestDelete(c *C) {
2816- zk, _ := s.init(c)
2817-
2818- _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
2819- c.Assert(err, IsNil)
2820-
2821- err = zk.Delete("/test", 5)
2822- c.Assert(err, NotNil)
2823- c.Assert(err.Code(), Equals, gozk.ZBADVERSION)
2824-
2825- err = zk.Delete("/test", -1)
2826- c.Assert(err, IsNil)
2827-
2828- err = zk.Delete("/test", -1)
2829- c.Assert(err, NotNil)
2830- c.Assert(err.Code(), Equals, gozk.ZNONODE)
2831+ conn, _ := s.init(c)
2832+
2833+ _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
2834+ c.Assert(err, IsNil)
2835+
2836+ err = conn.Delete("/test", 5)
2837+ c.Assert(err, NotNil)
2838+ c.Assert(err, Equals, zk.ZBADVERSION)
2839+
2840+ err = conn.Delete("/test", -1)
2841+ c.Assert(err, IsNil)
2842+
2843+ err = conn.Delete("/test", -1)
2844+ c.Assert(err, NotNil)
2845+ c.Assert(err, Equals, zk.ZNONODE)
2846 }
2847
2848 func (s *S) TestClientIdAndReInit(c *C) {
2849 zk1, _ := s.init(c)
2850 clientId1 := zk1.ClientId()
2851
2852- zk2, _, err := gozk.ReInit(s.zkAddr, 5e9, clientId1)
2853+ zk2, _, err := zk.Redial(s.zkAddr, 5e9, clientId1)
2854 c.Assert(err, IsNil)
2855 defer zk2.Close()
2856 clientId2 := zk2.ClientId()
2857@@ -469,110 +471,114 @@
2858 // Surprisingly for some (including myself, initially), the watch
2859 // returned by the exists method actually fires on data changes too.
2860 func (s *S) TestExistsWatchOnDataChange(c *C) {
2861- zk, _ := s.init(c)
2862-
2863- _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
2864- c.Assert(err, IsNil)
2865-
2866- _, watch, err := zk.ExistsW("/test")
2867- c.Assert(err, IsNil)
2868-
2869- _, err = zk.Set("/test", "new", -1)
2870+ conn, _ := s.init(c)
2871+
2872+ _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
2873+ c.Assert(err, IsNil)
2874+
2875+ _, watch, err := conn.ExistsW("/test")
2876+ c.Assert(err, IsNil)
2877+
2878+ _, err = conn.Set("/test", "new", -1)
2879 c.Assert(err, IsNil)
2880
2881 event := <-watch
2882
2883 c.Assert(event.Path, Equals, "/test")
2884- c.Assert(event.Type, Equals, gozk.EVENT_CHANGED)
2885+ c.Assert(event.Type, Equals, zk.EVENT_CHANGED)
2886 }
2887
2888 func (s *S) TestACL(c *C) {
2889- zk, _ := s.init(c)
2890-
2891- _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
2892- c.Assert(err, IsNil)
2893-
2894- acl, stat, err := zk.ACL("/test")
2895- c.Assert(err, IsNil)
2896- c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_ALL))
2897+ conn, _ := s.init(c)
2898+
2899+ _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
2900+ c.Assert(err, IsNil)
2901+
2902+ acl, stat, err := conn.ACL("/test")
2903+ c.Assert(err, IsNil)
2904+ c.Assert(acl, Equals, zk.WorldACL(zk.PERM_ALL))
2905 c.Assert(stat, NotNil)
2906 c.Assert(stat.Version(), Equals, int32(0))
2907
2908- acl, stat, err = zk.ACL("/non-existent")
2909+ acl, stat, err = conn.ACL("/non-existent")
2910 c.Assert(err, NotNil)
2911- c.Assert(err.Code(), Equals, gozk.ZNONODE)
2912+ c.Assert(err, Equals, zk.ZNONODE)
2913 c.Assert(acl, IsNil)
2914 c.Assert(stat, IsNil)
2915 }
2916
2917 func (s *S) TestSetACL(c *C) {
2918- zk, _ := s.init(c)
2919+ conn, _ := s.init(c)
2920
2921- _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
2922+ _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
2923 c.Assert(err, IsNil)
2924
2925- err = zk.SetACL("/test", gozk.WorldACL(gozk.PERM_ALL), 5)
2926+ err = conn.SetACL("/test", zk.WorldACL(zk.PERM_ALL), 5)
2927 c.Assert(err, NotNil)
2928- c.Assert(err.Code(), Equals, gozk.ZBADVERSION)
2929-
2930- err = zk.SetACL("/test", gozk.WorldACL(gozk.PERM_READ), -1)
2931- c.Assert(err, IsNil)
2932-
2933- acl, _, err := zk.ACL("/test")
2934- c.Assert(err, IsNil)
2935- c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_READ))
2936+ c.Assert(err, Equals, zk.ZBADVERSION)
2937+
2938+ err = conn.SetACL("/test", zk.WorldACL(zk.PERM_READ), -1)
2939+ c.Assert(err, IsNil)
2940+
2941+ acl, _, err := conn.ACL("/test")
2942+ c.Assert(err, IsNil)
2943+ c.Assert(acl, Equals, zk.WorldACL(zk.PERM_READ))
2944 }
2945
2946 func (s *S) TestAddAuth(c *C) {
2947- zk, _ := s.init(c)
2948-
2949- acl := []gozk.ACL{{gozk.PERM_READ, "digest", "joe:enQcM3mIEHQx7IrPNStYBc0qfs8="}}
2950-
2951- _, err := zk.Create("/test", "", gozk.EPHEMERAL, acl)
2952+ conn, _ := s.init(c)
2953+
2954+ acl := []zk.ACL{{zk.PERM_READ, "digest", "joe:enQcM3mIEHQx7IrPNStYBc0qfs8="}}
2955+
2956+ _, err := conn.Create("/test", "", zk.EPHEMERAL, acl)
2957 c.Assert(err, IsNil)
2958
2959- _, _, err = zk.Get("/test")
2960+ _, _, err = conn.Get("/test")
2961 c.Assert(err, NotNil)
2962- c.Assert(err.Code(), Equals, gozk.ZNOAUTH)
2963+ c.Assert(err, Equals, zk.ZNOAUTH)
2964
2965- err = zk.AddAuth("digest", "joe:passwd")
2966+ err = conn.AddAuth("digest", "joe:passwd")
2967 c.Assert(err, IsNil)
2968
2969- _, _, err = zk.Get("/test")
2970+ _, _, err = conn.Get("/test")
2971 c.Assert(err, IsNil)
2972 }
2973
2974 func (s *S) TestWatchOnReconnection(c *C) {
2975- c.Check(gozk.CountPendingWatches(), Equals, 0)
2976+ c.Check(zk.CountPendingWatches(), Equals, 0)
2977
2978- zk, session := s.init(c)
2979+ conn, session := s.init(c)
2980
2981 event := <-session
2982- c.Assert(event.Type, Equals, gozk.EVENT_SESSION)
2983- c.Assert(event.State, Equals, gozk.STATE_CONNECTED)
2984-
2985- c.Check(gozk.CountPendingWatches(), Equals, 1)
2986-
2987- stat, watch, err := zk.ExistsW("/test")
2988+ c.Assert(event.Type, Equals, zk.EVENT_SESSION)
2989+ c.Assert(event.State, Equals, zk.STATE_CONNECTED)
2990+
2991+ c.Check(zk.CountPendingWatches(), Equals, 1)
2992+
2993+ stat, watch, err := conn.ExistsW("/test")
2994 c.Assert(err, IsNil)
2995 c.Assert(stat, IsNil)
2996
2997- c.Check(gozk.CountPendingWatches(), Equals, 2)
2998-
2999- s.StopZK()
3000+ c.Check(zk.CountPendingWatches(), Equals, 2)
3001+
3002+ err = s.zkServer.Stop()
3003+ c.Assert(err, IsNil)
3004+
3005 time.Sleep(2e9)
3006- s.StartZK()
3007
3008- // The session channel should receive the reconnection notification,
3009+ err = s.zkServer.Start()
3010+ c.Assert(err, IsNil)
3011+
3012+ // The session channel should receive the reconnection notification.
3013 select {
3014 case event := <-session:
3015- c.Assert(event.State, Equals, gozk.STATE_CONNECTING)
3016+ c.Assert(event.State, Equals, zk.STATE_CONNECTING)
3017 case <-time.After(3e9):
3018 c.Fatal("Session watch didn't fire")
3019 }
3020 select {
3021 case event := <-session:
3022- c.Assert(event.State, Equals, gozk.STATE_CONNECTED)
3023+ c.Assert(event.State, Equals, zk.STATE_CONNECTED)
3024 case <-time.After(3e9):
3025 c.Fatal("Session watch didn't fire")
3026 }
3027@@ -585,40 +591,40 @@
3028 }
3029
3030 // And it should still work.
3031- _, err = zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
3032+ _, err = conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
3033 c.Assert(err, IsNil)
3034
3035 event = <-watch
3036- c.Assert(event.Type, Equals, gozk.EVENT_CREATED)
3037+ c.Assert(event.Type, Equals, zk.EVENT_CREATED)
3038 c.Assert(event.Path, Equals, "/test")
3039
3040- c.Check(gozk.CountPendingWatches(), Equals, 1)
3041+ c.Check(zk.CountPendingWatches(), Equals, 1)
3042 }
3043
3044 func (s *S) TestWatchOnSessionExpiration(c *C) {
3045- c.Check(gozk.CountPendingWatches(), Equals, 0)
3046+ c.Check(zk.CountPendingWatches(), Equals, 0)
3047
3048- zk, session := s.init(c)
3049+ conn, session := s.init(c)
3050
3051 event := <-session
3052- c.Assert(event.Type, Equals, gozk.EVENT_SESSION)
3053- c.Assert(event.State, Equals, gozk.STATE_CONNECTED)
3054-
3055- c.Check(gozk.CountPendingWatches(), Equals, 1)
3056-
3057- stat, watch, err := zk.ExistsW("/test")
3058+ c.Assert(event.Type, Equals, zk.EVENT_SESSION)
3059+ c.Assert(event.State, Equals, zk.STATE_CONNECTED)
3060+
3061+ c.Check(zk.CountPendingWatches(), Equals, 1)
3062+
3063+ stat, watch, err := conn.ExistsW("/test")
3064 c.Assert(err, IsNil)
3065 c.Assert(stat, IsNil)
3066
3067- c.Check(gozk.CountPendingWatches(), Equals, 2)
3068+ c.Check(zk.CountPendingWatches(), Equals, 2)
3069
3070 // Use expiration trick described in the FAQ.
3071- clientId := zk.ClientId()
3072- zk2, session2, err := gozk.ReInit(s.zkAddr, 5e9, clientId)
3073+ clientId := conn.ClientId()
3074+ zk2, session2, err := zk.Redial(s.zkAddr, 5e9, clientId)
3075
3076 for event := range session2 {
3077 c.Log("Event from overlapping session: ", event)
3078- if event.State == gozk.STATE_CONNECTED {
3079+ if event.State == zk.STATE_CONNECTED {
3080 // Wait for zk to process the connection.
3081 // Not reliable without this. :-(
3082 time.Sleep(1e9)
3083@@ -627,21 +633,21 @@
3084 }
3085 for event := range session {
3086 c.Log("Event from primary session: ", event)
3087- if event.State == gozk.STATE_EXPIRED_SESSION {
3088+ if event.State == zk.STATE_EXPIRED_SESSION {
3089 break
3090 }
3091 }
3092
3093 select {
3094 case event := <-watch:
3095- c.Assert(event.State, Equals, gozk.STATE_EXPIRED_SESSION)
3096+ c.Assert(event.State, Equals, zk.STATE_EXPIRED_SESSION)
3097 case <-time.After(3e9):
3098 c.Fatal("Watch event didn't fire")
3099 }
3100
3101 event = <-watch
3102- c.Assert(event.Type, Equals, gozk.EVENT_CLOSED)
3103- c.Assert(event.State, Equals, gozk.STATE_CLOSED)
3104+ c.Assert(event.Type, Equals, zk.EVENT_CLOSED)
3105+ c.Assert(event.State, Equals, zk.STATE_CLOSED)
3106
3107- c.Check(gozk.CountPendingWatches(), Equals, 1)
3108+ c.Check(zk.CountPendingWatches(), Equals, 1)
3109 }
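
A note on the most frequent change in the test diff above: assertions of the form c.Assert(err.Code(), Equals, gozk.ZNONODE) become c.Assert(err, Equals, zk.ZNONODE). That only works if the error codes are themselves values satisfying the error interface, so they can be compared directly. The following is a minimal sketch of that pattern, not the branch's actual code: the Error type, the constant values, and the lookup helper are all hypothetical stand-ins chosen for illustration.

```go
package main

import "fmt"

// Error is a hypothetical stand-in for an error-code type that
// implements the error interface itself (assumption: the real
// package in the branch does something along these lines).
type Error int

// Illustrative values only; they are not claimed to match the
// codes used by the real ZooKeeper client.
const (
	ZNONODE Error = iota + 1
	ZBADVERSION
)

// Error makes the code usable wherever an error is expected.
func (e Error) Error() string {
	switch e {
	case ZNONODE:
		return "no node"
	case ZBADVERSION:
		return "bad version"
	}
	return fmt.Sprintf("unknown error %d", int(e))
}

// lookup is a hypothetical operation: it succeeds only for "/".
// It returns the plain error interface so that success is a true nil.
func lookup(path string) error {
	if path != "/" {
		return ZNONODE
	}
	return nil
}

func main() {
	err := lookup("/non-existent")
	// Direct comparison against the code; no err.Code() call needed.
	if err == ZNONODE {
		fmt.Println("node is missing:", err)
	}
	if err := lookup("/"); err == nil {
		fmt.Println("root exists")
	}
}
```

Returning the error interface rather than the concrete type keeps nil checks such as c.Assert(err, IsNil) meaningful, which is why the sketch's lookup helper is declared that way.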
