Merge lp:~rogpeppe/gozk/update-documentation into lp:~juju/gozk/trunk

Proposed by Roger Peppe
Status: Rejected
Rejected by: Gustavo Niemeyer
Proposed branch: lp:~rogpeppe/gozk/update-documentation
Merge into: lp:~juju/gozk/trunk
Diff against target: 3213 lines (+1334/-814)
10 files modified
Makefile (+5/-2)
example/example.go (+22/-23)
reattach_test.go (+202/-0)
retry_test.go (+65/-74)
server.go (+239/-0)
service/Makefile (+20/-0)
service/service.go (+160/-0)
suite_test.go (+45/-122)
zk.go (+318/-341)
zk_test.go (+258/-252)
To merge this branch: bzr merge lp:~rogpeppe/gozk/update-documentation
Reviewer Review Type Date Requested Status
Juju Engineering Pending
Review via email: mp+77980@code.launchpad.net

Description of the change

Update documentation.

Add more complete description of event delivery and comments
on Stat field accessors.

To post a comment you must log in.
Revision history for this message
Gustavo Niemeyer (niemeyer) wrote :

This is a massive change unrelated to documentation.

Revision history for this message
Roger Peppe (rogpeppe) wrote :

this was a while ago. i didn't know how to handle the review system...

i think the intended changes are still worth making. i'll make a CL.
i could also use that proposal to change appropriate methods
to return a time.Time if that seems reasonable (e.g. Stat.CTime etc)

On 10 February 2012 12:27, Gustavo Niemeyer <email address hidden> wrote:
> The proposal to merge lp:~rogpeppe/gozk/update-documentation into lp:gozk has been updated.
>
>    Status: Needs review => Rejected
>
> For more details, see:
> https://code.launchpad.net/~rogpeppe/gozk/update-documentation/+merge/77980
> --
> https://code.launchpad.net/~rogpeppe/gozk/update-documentation/+merge/77980
> You are the owner of lp:~rogpeppe/gozk/update-documentation.

Unmerged revisions

24. By Roger Peppe

Update documentation to include more accurate
description of events and comments on stat field accessors.

23. By Roger Peppe

Factored out server running code into a separate package.

22. By Roger Peppe

Fix Server code.

21. By Gustavo Niemeyer

The package location is now http://launchpad.net/gozk/zk.

The shorter package name makes for significantly more digestible line lengths.

20. By Gustavo Niemeyer

Merged clean-up-interface branch, by Roger Peppe [r=niemeyer]

This branch does a number of incompatible changes that improve
the overall API of gozk.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'Makefile'
2--- Makefile 2011-08-03 01:56:58 +0000
3+++ Makefile 2011-10-03 17:02:23 +0000
4@@ -2,10 +2,13 @@
5
6 all: package
7
8-TARG=gozk
9+TARG=launchpad.net/gozk/zk
10
11+GOFILES=\
12+ server.go\
13+
14 CGOFILES=\
15- gozk.go\
16+ zk.go\
17
18 CGO_OFILES=\
19 helpers.o\
20
21=== added directory 'example'
22=== renamed file 'example.go' => 'example/example.go'
23--- example.go 2011-08-03 01:47:25 +0000
24+++ example/example.go 2011-10-03 17:02:23 +0000
25@@ -1,30 +1,29 @@
26 package main
27
28 import (
29- "gozk"
30+ "launchpad.net/zookeeper/zookeeper"
31 )
32
33 func main() {
34- zk, session, err := gozk.Init("localhost:2181", 5000)
35- if err != nil {
36- println("Couldn't connect: " + err.String())
37- return
38- }
39-
40- defer zk.Close()
41-
42- // Wait for connection.
43- event := <-session
44- if event.State != gozk.STATE_CONNECTED {
45- println("Couldn't connect")
46- return
47- }
48-
49- _, err = zk.Create("/counter", "0", 0, gozk.WorldACL(gozk.PERM_ALL))
50- if err != nil {
51- println(err.String())
52- } else {
53- println("Created!")
54- }
55+ zk, session, err := zookeeper.Init("localhost:2181", 5000)
56+ if err != nil {
57+ println("Couldn't connect: " + err.String())
58+ return
59+ }
60+
61+ defer zk.Close()
62+
63+ // Wait for connection.
64+ event := <-session
65+ if event.State != zookeeper.STATE_CONNECTED {
66+ println("Couldn't connect")
67+ return
68+ }
69+
70+ _, err = zk.Create("/counter", "0", 0, zookeeper.WorldACL(zookeeper.PERM_ALL))
71+ if err != nil {
72+ println(err.String())
73+ } else {
74+ println("Created!")
75+ }
76 }
77-
78
79=== added file 'reattach_test.go'
80--- reattach_test.go 1970-01-01 00:00:00 +0000
81+++ reattach_test.go 2011-10-03 17:02:23 +0000
82@@ -0,0 +1,202 @@
83+package zk_test
84+
85+import (
86+ "bufio"
87+ . "launchpad.net/gocheck"
88+ "launchpad.net/gozk/zk/service"
89+ "launchpad.net/gozk/zk"
90+ "exec"
91+ "flag"
92+ "fmt"
93+ "os"
94+ "strings"
95+ "testing"
96+ "time"
97+)
98+
99+var reattach = flag.Bool("zktest.reattach", false, "internal flag used for testing")
100+var reattachRunDir = flag.String("zktest.rundir", "", "internal flag used for testing")
101+var reattachAbnormalStop = flag.Bool("zktest.stop", false, "internal flag used for testing")
102+
103+// This is the reentrancy point for testing ZooKeeper servers
104+// started by processes that are not direct children of the
105+// testing process. This test always succeeds - the status
106+// will be written to stdout and read by indirectServer.
107+func TestStartNonChildServer(t *testing.T) {
108+ if !*reattach {
109+ // not re-entrant, so ignore this test.
110+ return
111+ }
112+ err := startServer(*reattachRunDir, *reattachAbnormalStop)
113+ if err != nil {
114+ fmt.Printf("error:%v\n", err)
115+ return
116+ }
117+ fmt.Printf("done\n")
118+}
119+
120+func (s *S) startServer(c *C, abort bool) {
121+ err := startServer(s.zkTestRoot, abort)
122+ c.Assert(err, IsNil)
123+}
124+
125+// startServerIndirect starts a ZooKeeper server that is not
126+// a direct child of the current process. If abort is true,
127+// the server will be started and then terminated abnormally.
128+func (s *S) startServerIndirect(c *C, abort bool) {
129+ if len(os.Args) == 0 {
130+ c.Fatal("Cannot find self executable name")
131+ }
132+ cmd := exec.Command(
133+ os.Args[0],
134+ "-zktest.reattach",
135+ "-zktest.rundir", s.zkTestRoot,
136+ "-zktest.stop=", fmt.Sprint(abort),
137+ "-test.run", "StartNonChildServer",
138+ )
139+ r, err := cmd.StdoutPipe()
140+ c.Assert(err, IsNil)
141+ defer r.Close()
142+ if err := cmd.Start(); err != nil {
143+ c.Fatalf("cannot start re-entrant gotest process: %v", err)
144+ }
145+ defer cmd.Wait()
146+ bio := bufio.NewReader(r)
147+ for {
148+ line, err := bio.ReadSlice('\n')
149+ if err != nil {
150+ c.Fatalf("indirect server status line not found: %v", err)
151+ }
152+ s := string(line)
153+ if strings.HasPrefix(s, "error:") {
154+ c.Fatalf("indirect server error: %s", s[len("error:"):])
155+ }
156+ if s == "done\n" {
157+ return
158+ }
159+ }
160+ panic("not reached")
161+}
162+
163+// startServer starts a ZooKeeper server, and terminates it abnormally
164+// if abort is true.
165+func startServer(runDir string, abort bool) os.Error {
166+ s, err := zk.AttachServer(runDir)
167+ if err != nil {
168+ return fmt.Errorf("cannot attach to server at %q: %v", runDir, err)
169+ }
170+ srv := service.New(s)
171+ if err := srv.Start(); err != nil {
172+ return fmt.Errorf("cannot start server: %v", err)
173+ }
174+ if abort {
175+ // Give it time to start up, then kill the server process abnormally,
176+ // leaving the pid.txt file behind.
177+ time.Sleep(0.5e9)
178+ p, err := srv.Process()
179+ if err != nil {
180+ return fmt.Errorf("cannot get server process: %v", err)
181+ }
182+ defer p.Release()
183+ if err := p.Kill(); err != nil {
184+ return fmt.Errorf("cannot kill server process: %v", err)
185+ }
186+ }
187+ return nil
188+}
189+
190+func (s *S) checkCookie(c *C) {
191+ conn, watch, err := zk.Dial(s.zkAddr, 5e9)
192+ c.Assert(err, IsNil)
193+
194+ e, ok := <-watch
195+ c.Assert(ok, Equals, true)
196+ c.Assert(e.Ok(), Equals, true)
197+
198+ c.Assert(err, IsNil)
199+ cookie, _, err := conn.Get("/testAttachCookie")
200+ c.Assert(err, IsNil)
201+ c.Assert(cookie, Equals, "testAttachCookie")
202+ conn.Close()
203+}
204+
205+// cases to test:
206+// child server, stopped normally; reattach, start
207+// non-direct child server, killed abnormally; reattach, start (->error), remove pid.txt; start
208+// non-direct child server, still running; reattach, start (->error), stop, start
209+// child server, still running; reattach, start (-> error)
210+// child server, still running; reattach, stop, start.
211+// non-direct child server, still running; reattach, stop, start.
212+func (s *S) TestAttachServer(c *C) {
213+ // Create a cookie so that we know we are reattaching to the same instance.
214+ conn, _ := s.init(c)
215+ _, err := conn.Create("/testAttachCookie", "testAttachCookie", 0, zk.WorldACL(zk.PERM_ALL))
216+ c.Assert(err, IsNil)
217+ s.checkCookie(c)
218+ s.zkServer.Stop()
219+ s.zkServer = nil
220+
221+ s.testAttachServer(c, (*S).startServer)
222+ s.testAttachServer(c, (*S).startServerIndirect)
223+ s.testAttachServerAbnormalTerminate(c, (*S).startServer)
224+ s.testAttachServerAbnormalTerminate(c, (*S).startServerIndirect)
225+
226+ srv, err := zk.AttachServer(s.zkTestRoot)
227+ c.Assert(err, IsNil)
228+
229+ s.zkServer = service.New(srv)
230+ err = s.zkServer.Start()
231+ c.Assert(err, IsNil)
232+
233+ conn, _ = s.init(c)
234+ err = conn.Delete("/testAttachCookie", -1)
235+ c.Assert(err, IsNil)
236+}
237+
238+func (s *S) testAttachServer(c *C, start func(*S, *C, bool)) {
239+ start(s, c, false)
240+
241+ s.checkCookie(c)
242+
243+ // try attaching to it while it is still running - it should fail.
244+ a, err := zk.AttachServer(s.zkTestRoot)
245+ c.Assert(err, IsNil)
246+ srv := service.New(a)
247+
248+ err = srv.Start()
249+ c.Assert(err, NotNil)
250+
251+ // stop it and then start it again - it should succeed.
252+ err = srv.Stop()
253+ c.Assert(err, IsNil)
254+
255+ err = srv.Start()
256+ c.Assert(err, IsNil)
257+
258+ s.checkCookie(c)
259+
260+ err = srv.Stop()
261+ c.Assert(err, IsNil)
262+}
263+
264+func (s *S) testAttachServerAbnormalTerminate(c *C, start func(*S, *C, bool)) {
265+ start(s, c, true)
266+
267+ // try attaching to it and starting - it should fail, because pid.txt
268+ // won't have been removed.
269+ a, err := zk.AttachServer(s.zkTestRoot)
270+ c.Assert(err, IsNil)
271+ srv := service.New(a)
272+ err = srv.Start()
273+ c.Assert(err, NotNil)
274+
275+ // stopping it should bring things back to normal.
276+ err = srv.Stop()
277+ c.Assert(err, IsNil)
278+ err = srv.Start()
279+ c.Assert(err, IsNil)
280+
281+ s.checkCookie(c)
282+ err = srv.Stop()
283+ c.Assert(err, IsNil)
284+}
285\ No newline at end of file
286
287=== modified file 'retry_test.go'
288--- retry_test.go 2011-08-19 01:51:37 +0000
289+++ retry_test.go 2011-10-03 17:02:23 +0000
290@@ -1,42 +1,41 @@
291-package gozk_test
292+package zk_test
293
294 import (
295 . "launchpad.net/gocheck"
296- "gozk"
297+ "launchpad.net/gozk/zk"
298 "os"
299 )
300
301 func (s *S) TestRetryChangeCreating(c *C) {
302- zk, _ := s.init(c)
303+ conn, _ := s.init(c)
304
305- err := zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL),
306- func(data string, stat gozk.Stat) (string, os.Error) {
307+ err := conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL),
308+ func(data string, stat *zk.Stat) (string, os.Error) {
309 c.Assert(data, Equals, "")
310 c.Assert(stat, IsNil)
311 return "new", nil
312 })
313 c.Assert(err, IsNil)
314
315- data, stat, err := zk.Get("/test")
316+ data, stat, err := conn.Get("/test")
317 c.Assert(err, IsNil)
318 c.Assert(stat, NotNil)
319 c.Assert(stat.Version(), Equals, int32(0))
320 c.Assert(data, Equals, "new")
321
322- acl, _, err := zk.ACL("/test")
323+ acl, _, err := conn.ACL("/test")
324 c.Assert(err, IsNil)
325- c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_ALL))
326+ c.Assert(acl, Equals, zk.WorldACL(zk.PERM_ALL))
327 }
328
329 func (s *S) TestRetryChangeSetting(c *C) {
330- zk, _ := s.init(c)
331+ conn, _ := s.init(c)
332
333- _, err := zk.Create("/test", "old", gozk.EPHEMERAL,
334- gozk.WorldACL(gozk.PERM_ALL))
335+ _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
336 c.Assert(err, IsNil)
337
338- err = zk.RetryChange("/test", gozk.EPHEMERAL, []gozk.ACL{},
339- func(data string, stat gozk.Stat) (string, os.Error) {
340+ err = conn.RetryChange("/test", zk.EPHEMERAL, []zk.ACL{},
341+ func(data string, stat *zk.Stat) (string, os.Error) {
342 c.Assert(data, Equals, "old")
343 c.Assert(stat, NotNil)
344 c.Assert(stat.Version(), Equals, int32(0))
345@@ -44,27 +43,26 @@
346 })
347 c.Assert(err, IsNil)
348
349- data, stat, err := zk.Get("/test")
350+ data, stat, err := conn.Get("/test")
351 c.Assert(err, IsNil)
352 c.Assert(stat, NotNil)
353 c.Assert(stat.Version(), Equals, int32(1))
354 c.Assert(data, Equals, "brand new")
355
356 // ACL was unchanged by RetryChange().
357- acl, _, err := zk.ACL("/test")
358+ acl, _, err := conn.ACL("/test")
359 c.Assert(err, IsNil)
360- c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_ALL))
361+ c.Assert(acl, Equals, zk.WorldACL(zk.PERM_ALL))
362 }
363
364 func (s *S) TestRetryChangeUnchangedValueDoesNothing(c *C) {
365- zk, _ := s.init(c)
366+ conn, _ := s.init(c)
367
368- _, err := zk.Create("/test", "old", gozk.EPHEMERAL,
369- gozk.WorldACL(gozk.PERM_ALL))
370+ _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
371 c.Assert(err, IsNil)
372
373- err = zk.RetryChange("/test", gozk.EPHEMERAL, []gozk.ACL{},
374- func(data string, stat gozk.Stat) (string, os.Error) {
375+ err = conn.RetryChange("/test", zk.EPHEMERAL, []zk.ACL{},
376+ func(data string, stat *zk.Stat) (string, os.Error) {
377 c.Assert(data, Equals, "old")
378 c.Assert(stat, NotNil)
379 c.Assert(stat.Version(), Equals, int32(0))
380@@ -72,7 +70,7 @@
381 })
382 c.Assert(err, IsNil)
383
384- data, stat, err := zk.Get("/test")
385+ data, stat, err := conn.Get("/test")
386 c.Assert(err, IsNil)
387 c.Assert(stat, NotNil)
388 c.Assert(stat.Version(), Equals, int32(0)) // Unchanged!
389@@ -80,14 +78,14 @@
390 }
391
392 func (s *S) TestRetryChangeConflictOnCreate(c *C) {
393- zk, _ := s.init(c)
394+ conn, _ := s.init(c)
395
396- changeFunc := func(data string, stat gozk.Stat) (string, os.Error) {
397+ changeFunc := func(data string, stat *zk.Stat) (string, os.Error) {
398 switch data {
399 case "":
400 c.Assert(stat, IsNil)
401- _, err := zk.Create("/test", "conflict", gozk.EPHEMERAL,
402- gozk.WorldACL(gozk.PERM_ALL))
403+ _, err := conn.Create("/test", "conflict", zk.EPHEMERAL,
404+ zk.WorldACL(zk.PERM_ALL))
405 c.Assert(err, IsNil)
406 return "<none> => conflict", nil
407 case "conflict":
408@@ -100,11 +98,10 @@
409 return "can't happen", nil
410 }
411
412- err := zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL),
413- changeFunc)
414+ err := conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL), changeFunc)
415 c.Assert(err, IsNil)
416
417- data, stat, err := zk.Get("/test")
418+ data, stat, err := conn.Get("/test")
419 c.Assert(err, IsNil)
420 c.Assert(data, Equals, "conflict => new")
421 c.Assert(stat, NotNil)
422@@ -112,18 +109,17 @@
423 }
424
425 func (s *S) TestRetryChangeConflictOnSetDueToChange(c *C) {
426- zk, _ := s.init(c)
427+ conn, _ := s.init(c)
428
429- _, err := zk.Create("/test", "old", gozk.EPHEMERAL,
430- gozk.WorldACL(gozk.PERM_ALL))
431+ _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
432 c.Assert(err, IsNil)
433
434- changeFunc := func(data string, stat gozk.Stat) (string, os.Error) {
435+ changeFunc := func(data string, stat *zk.Stat) (string, os.Error) {
436 switch data {
437 case "old":
438 c.Assert(stat, NotNil)
439 c.Assert(stat.Version(), Equals, int32(0))
440- _, err := zk.Set("/test", "conflict", 0)
441+ _, err := conn.Set("/test", "conflict", 0)
442 c.Assert(err, IsNil)
443 return "old => new", nil
444 case "conflict":
445@@ -136,10 +132,10 @@
446 return "can't happen", nil
447 }
448
449- err = zk.RetryChange("/test", gozk.EPHEMERAL, []gozk.ACL{}, changeFunc)
450+ err = conn.RetryChange("/test", zk.EPHEMERAL, []zk.ACL{}, changeFunc)
451 c.Assert(err, IsNil)
452
453- data, stat, err := zk.Get("/test")
454+ data, stat, err := conn.Get("/test")
455 c.Assert(err, IsNil)
456 c.Assert(data, Equals, "conflict => new")
457 c.Assert(stat, NotNil)
458@@ -147,18 +143,17 @@
459 }
460
461 func (s *S) TestRetryChangeConflictOnSetDueToDelete(c *C) {
462- zk, _ := s.init(c)
463+ conn, _ := s.init(c)
464
465- _, err := zk.Create("/test", "old", gozk.EPHEMERAL,
466- gozk.WorldACL(gozk.PERM_ALL))
467+ _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
468 c.Assert(err, IsNil)
469
470- changeFunc := func(data string, stat gozk.Stat) (string, os.Error) {
471+ changeFunc := func(data string, stat *zk.Stat) (string, os.Error) {
472 switch data {
473 case "old":
474 c.Assert(stat, NotNil)
475 c.Assert(stat.Version(), Equals, int32(0))
476- err := zk.Delete("/test", 0)
477+ err := conn.Delete("/test", 0)
478 c.Assert(err, IsNil)
479 return "old => <deleted>", nil
480 case "":
481@@ -170,55 +165,53 @@
482 return "can't happen", nil
483 }
484
485- err = zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_READ),
486- changeFunc)
487+ err = conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_READ), changeFunc)
488 c.Assert(err, IsNil)
489
490- data, stat, err := zk.Get("/test")
491+ data, stat, err := conn.Get("/test")
492 c.Assert(err, IsNil)
493 c.Assert(data, Equals, "<deleted> => new")
494 c.Assert(stat, NotNil)
495 c.Assert(stat.Version(), Equals, int32(0))
496
497 // Should be the new ACL.
498- acl, _, err := zk.ACL("/test")
499+ acl, _, err := conn.ACL("/test")
500 c.Assert(err, IsNil)
501- c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_READ))
502+ c.Assert(acl, Equals, zk.WorldACL(zk.PERM_READ))
503 }
504
505 func (s *S) TestRetryChangeErrorInCallback(c *C) {
506- zk, _ := s.init(c)
507+ conn, _ := s.init(c)
508
509- err := zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL),
510- func(data string, stat gozk.Stat) (string, os.Error) {
511+ err := conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL),
512+ func(data string, stat *zk.Stat) (string, os.Error) {
513 return "don't use this", os.NewError("BOOM!")
514 })
515 c.Assert(err, NotNil)
516- c.Assert(err.Code(), Equals, gozk.ZSYSTEMERROR)
517 c.Assert(err.String(), Equals, "BOOM!")
518
519- stat, err := zk.Exists("/test")
520+ stat, err := conn.Exists("/test")
521 c.Assert(err, IsNil)
522 c.Assert(stat, IsNil)
523 }
524
525 func (s *S) TestRetryChangeFailsReading(c *C) {
526- zk, _ := s.init(c)
527+ conn, _ := s.init(c)
528
529- _, err := zk.Create("/test", "old", gozk.EPHEMERAL,
530- gozk.WorldACL(gozk.PERM_WRITE)) // Write only!
531+ // Write only!
532+ _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_WRITE))
533 c.Assert(err, IsNil)
534
535 var called bool
536- err = zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL),
537- func(data string, stat gozk.Stat) (string, os.Error) {
538+ err = conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL),
539+ func(data string, stat *zk.Stat) (string, os.Error) {
540 called = true
541 return "", nil
542 })
543 c.Assert(err, NotNil)
544- c.Assert(err.Code(), Equals, gozk.ZNOAUTH)
545+ c.Assert(err, Equals, zk.ZNOAUTH)
546
547- stat, err := zk.Exists("/test")
548+ stat, err := conn.Exists("/test")
549 c.Assert(err, IsNil)
550 c.Assert(stat, NotNil)
551 c.Assert(stat.Version(), Equals, int32(0))
552@@ -227,22 +220,21 @@
553 }
554
555 func (s *S) TestRetryChangeFailsSetting(c *C) {
556- zk, _ := s.init(c)
557+ conn, _ := s.init(c)
558
559- _, err := zk.Create("/test", "old", gozk.EPHEMERAL,
560- gozk.WorldACL(gozk.PERM_READ)) // Read only!
561+ // Read only!
562+ _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_READ))
563 c.Assert(err, IsNil)
564
565 var called bool
566- err = zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL),
567- func(data string, stat gozk.Stat) (string, os.Error) {
568+ err = conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL),
569+ func(data string, stat *zk.Stat) (string, os.Error) {
570 called = true
571 return "", nil
572 })
573- c.Assert(err, NotNil)
574- c.Assert(err.Code(), Equals, gozk.ZNOAUTH)
575+ c.Assert(err, Equals, zk.ZNOAUTH)
576
577- stat, err := zk.Exists("/test")
578+ stat, err := conn.Exists("/test")
579 c.Assert(err, IsNil)
580 c.Assert(stat, NotNil)
581 c.Assert(stat.Version(), Equals, int32(0))
582@@ -251,23 +243,22 @@
583 }
584
585 func (s *S) TestRetryChangeFailsCreating(c *C) {
586- zk, _ := s.init(c)
587+ conn, _ := s.init(c)
588
589- _, err := zk.Create("/test", "old", gozk.EPHEMERAL,
590- gozk.WorldACL(gozk.PERM_READ)) // Read only!
591+ // Read only!
592+ _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_READ))
593 c.Assert(err, IsNil)
594
595 var called bool
596- err = zk.RetryChange("/test/sub", gozk.EPHEMERAL,
597- gozk.WorldACL(gozk.PERM_ALL),
598- func(data string, stat gozk.Stat) (string, os.Error) {
599+ err = conn.RetryChange("/test/sub", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL),
600+ func(data string, stat *zk.Stat) (string, os.Error) {
601 called = true
602 return "", nil
603 })
604 c.Assert(err, NotNil)
605- c.Assert(err.Code(), Equals, gozk.ZNOAUTH)
606+ c.Assert(err, Equals, zk.ZNOAUTH)
607
608- stat, err := zk.Exists("/test/sub")
609+ stat, err := conn.Exists("/test/sub")
610 c.Assert(err, IsNil)
611 c.Assert(stat, IsNil)
612
613
614=== added file 'server.go'
615--- server.go 1970-01-01 00:00:00 +0000
616+++ server.go 2011-10-03 17:02:23 +0000
617@@ -0,0 +1,239 @@
618+package zk
619+
620+import (
621+ "bufio"
622+ "bytes"
623+ "fmt"
624+ "io/ioutil"
625+ "net"
626+ "os"
627+ "path/filepath"
628+ "strings"
629+)
630+
631+// Server represents a ZooKeeper server, its data and configuration files.
632+type Server struct {
633+ runDir string
634+ installDir string
635+}
636+
637+func (srv *Server) Directory() string {
638+ return srv.runDir
639+}
640+
641+// CreateServer creates the directory runDir and sets up a ZooKeeper server
642+// environment inside it. It is an error if runDir already exists.
643+// The server will listen on the specified TCP port.
644+//
645+// The ZooKeeper installation directory is specified by installDir.
646+// If this is empty, a system default will be used.
647+//
648+// CreateServer does not start the server; the *Server that
649+// is returned can be started and stopped with the service package
650+// (see launchpad.net/gozk/zk/service).
651+func CreateServer(port int, runDir, installDir string) (*Server, os.Error) {
652+ if err := os.Mkdir(runDir, 0777); err != nil {
653+ return nil, err
654+ }
655+ srv := &Server{runDir: runDir, installDir: installDir}
656+ if err := srv.writeLog4JConfig(); err != nil {
657+ return nil, err
658+ }
659+ if err := srv.writeZooKeeperConfig(port); err != nil {
660+ return nil, err
661+ }
662+ if err := srv.writeInstallDir(); err != nil {
663+ return nil, err
664+ }
665+ return srv, nil
666+}
667+
668+// AttachServer creates a new ZooKeeper Server instance
669+// to operate inside an existing run directory, runDir.
670+// The directory must have been created with CreateServer.
671+func AttachServer(runDir string) (*Server, os.Error) {
672+ srv := &Server{runDir: runDir}
673+ if err := srv.readInstallDir(); err != nil {
674+ return nil, fmt.Errorf("cannot read server install directory: %v", err)
675+ }
676+ return srv, nil
677+}
678+
679+func (srv *Server) path(name string) string {
680+ return filepath.Join(srv.runDir, name)
681+}
682+
683+func (srv *Server) CheckAvailability() os.Error {
684+ port, err := srv.NetworkPort()
685+ if err != nil {
686+ return fmt.Errorf("cannot get network port: %v", err)
687+ }
688+ l, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port))
689+ if err != nil {
690+ return fmt.Errorf("cannot listen on port %v: %v", port, err)
691+ }
692+ l.Close()
693+ return nil
694+}
695+
696+// Command returns the command used to start the
697+// ZooKeeper server. It is provided for debugging and testing
698+// purposes only.
699+func (srv *Server) Command() ([]string, os.Error) {
700+ cp, err := srv.classPath()
701+ if err != nil {
702+ return nil, fmt.Errorf("cannot get class path: %v", err)
703+ }
704+ return []string{
705+ "java",
706+ "-cp", strings.Join(cp, ":"),
707+ "-Dzookeeper.root.logger=INFO,CONSOLE",
708+ "-Dlog4j.configuration=file:"+srv.path("log4j.properties"),
709+ "org.apache.zookeeper.server.quorum.QuorumPeerMain",
710+ srv.path("zoo.cfg"),
711+ }, nil
712+}
713+
714+var log4jProperties = `
715+log4j.rootLogger=INFO, CONSOLE
716+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
717+log4j.appender.CONSOLE.Threshold=INFO
718+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
719+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n
720+`
721+
722+func (srv *Server) writeLog4JConfig() (err os.Error) {
723+ return ioutil.WriteFile(srv.path("log4j.properties"), []byte(log4jProperties), 0666)
724+}
725+
726+func (srv *Server) writeZooKeeperConfig(port int) (err os.Error) {
727+ return ioutil.WriteFile(srv.path("zoo.cfg"), []byte(fmt.Sprintf(
728+ "tickTime=2000\n"+
729+ "dataDir=%s\n"+
730+ "clientPort=%d\n"+
731+ "maxClientCnxns=500\n",
732+ srv.runDir, port)), 0666)
733+}
734+
735+// NetworkPort returns the TCP port number that
736+// the server is configured for.
737+func (srv *Server) NetworkPort() (int, os.Error) {
738+ f, err := os.Open(srv.path("zoo.cfg"))
739+ if err != nil {
740+ return 0, err
741+ }
742+ r := bufio.NewReader(f)
743+ for {
744+ line, err := r.ReadSlice('\n')
745+ if err != nil {
746+ return 0, fmt.Errorf("cannot get port from %q", srv.path("zoo.cfg"))
747+ }
748+ var port int
749+ if n, _ := fmt.Sscanf(string(line), "clientPort=%d\n", &port); n == 1 {
750+ return port, nil
751+ }
752+ }
753+ panic("not reached")
754+}
755+
756+func (srv *Server) writeInstallDir() os.Error {
757+ return ioutil.WriteFile(srv.path("installdir.txt"), []byte(srv.installDir+"\n"), 0666)
758+}
759+
760+func (srv *Server) readInstallDir() os.Error {
761+ data, err := ioutil.ReadFile(srv.path("installdir.txt"))
762+ if err != nil {
763+ return err
764+ }
765+ if data[len(data)-1] == '\n' {
766+ data = data[0 : len(data)-1]
767+ }
768+ srv.installDir = string(data)
769+ return nil
770+}
771+
772+func (srv *Server) classPath() ([]string, os.Error) {
773+ dir := srv.installDir
774+ if dir == "" {
775+ return systemClassPath()
776+ }
777+ if err := checkDirectory(dir); err != nil {
778+ return nil, err
779+ }
780+ // Two possibilities, as seen in zkEnv.sh:
781+ // 1) locally built binaries (jars are in build directory)
782+ // 2) release binaries
783+ if build := filepath.Join(dir, "build"); checkDirectory(build) == nil {
784+ dir = build
785+ }
786+ classPath, err := filepath.Glob(filepath.Join(dir, "zookeeper-*.jar"))
787+ if err != nil {
788+ panic(fmt.Errorf("glob for jar files: %v", err))
789+ }
790+ more, err := filepath.Glob(filepath.Join(dir, "lib/*.jar"))
791+ if err != nil {
792+ panic(fmt.Errorf("glob for lib jar files: %v", err))
793+ }
794+
795+ classPath = append(classPath, more...)
796+ if len(classPath) == 0 {
797+ return nil, fmt.Errorf("zookeeper libraries not found in %q", dir)
798+ }
799+ return classPath, nil
800+}
801+
802+const zookeeperEnviron = "/etc/zookeeper/conf/environment"
803+
804+func systemClassPath() ([]string, os.Error) {
805+ f, err := os.Open(zookeeperEnviron)
806+ if f == nil {
807+ return nil, err
808+ }
809+ r := bufio.NewReader(f)
810+ for {
811+ line, err := r.ReadSlice('\n')
812+ if err != nil {
813+ break
814+ }
815+ if !bytes.HasPrefix(line, []byte("CLASSPATH=")) {
816+ continue
817+ }
818+
819+ // remove variable and newline
820+ path := string(line[len("CLASSPATH=") : len(line)-1])
821+
822+ // trim white space
823+ path = strings.Trim(path, " \t\r")
824+
825+ // strip quotes
826+ if path[0] == '"' {
827+ path = path[1 : len(path)-1]
828+ }
829+
830+ // split on :
831+ classPath := strings.Split(path, ":")
832+
833+ // split off $ZOOCFGDIR
834+ if len(classPath) > 0 && classPath[0] == "$ZOOCFGDIR" {
835+ classPath = classPath[1:]
836+ }
837+
838+ if len(classPath) == 0 {
839+ return nil, fmt.Errorf("empty class path in %q", zookeeperEnviron)
840+ }
841+ return classPath, nil
842+ }
843+ return nil, fmt.Errorf("no class path found in %q", zookeeperEnviron)
844+}
845+
846+// checkDirectory returns an error if the given path
847+// does not exist or is not a directory.
848+func checkDirectory(path string) os.Error {
849+ if info, err := os.Stat(path); err != nil || !info.IsDirectory() {
850+ if err == nil {
851+ err = &os.PathError{Op: "stat", Path: path, Error: os.NewError("is not a directory")}
852+ }
853+ return err
854+ }
855+ return nil
856+}
857
858=== added directory 'service'
859=== added file 'service/Makefile'
860--- service/Makefile 1970-01-01 00:00:00 +0000
861+++ service/Makefile 2011-10-03 17:02:23 +0000
862@@ -0,0 +1,20 @@
863+include $(GOROOT)/src/Make.inc
864+
865+TARG=launchpad.net/gozk/zk/service
866+
867+GOFILES=\
868+ service.go\
869+
870+GOFMT=gofmt
871+BADFMT:=$(shell $(GOFMT) -l $(GOFILES) $(CGOFILES) $(wildcard *_test.go))
872+
873+gofmt: $(BADFMT)
874+ @for F in $(BADFMT); do $(GOFMT) -w $$F && echo $$F; done
875+
876+ifneq ($(BADFMT),)
877+ifneq ($(MAKECMDGOALS),gofmt)
878+$(warning WARNING: make gofmt: $(BADFMT))
879+endif
880+endif
881+
882+include $(GOROOT)/src/Make.pkg
883
884=== added file 'service/service.go'
885--- service/service.go 1970-01-01 00:00:00 +0000
886+++ service/service.go 2011-10-03 17:02:23 +0000
887@@ -0,0 +1,160 @@
888+// Package service provides support for long-running services.
889+package service
890+
891+import (
892+ "os"
893+ "fmt"
894+ "io/ioutil"
895+ "strconv"
896+ "path/filepath"
897+ "exec"
898+ "strings"
899+ "time"
900+)
901+
902+// Interface represents a single-process server.
903+type Interface interface {
904+ // Directory gives the path to the directory containing
905+ // the server's configuration and data files. It is assumed that
906+ // this exists and can be modified.
907+ Directory() string
908+
909+ // CheckAvailability should return an error if resources
910+ // required by the server are not available; for instance
911+ // if the server requires a network port which is not free.
912+ CheckAvailability() os.Error
913+
914+ // Command should return a command that can be
915+ // used to run the server. The first element of the returned
916+ // slice is the executable name; the rest of the elements are
917+ // its arguments.
918+ Command() ([]string, os.Error)
919+}
920+
921+// Service represents a possibly running server process.
922+type Service struct {
923+ srv Interface
924+}
925+
926+// New returns a new Service using the given
927+// server interface.
928+func New(srv Interface) *Service {
929+ return &Service{srv}
930+}
931+
932+// Process returns a reference to the identity
933+// of the last running server in the Service's directory.
934+// If the server was shut down, it returns (nil, nil).
935+// The server process may not still be running
936+// (for instance if it was terminated abnormally).
937+// This function is provided for debugging and testing
938+// purposes only.
939+func (s *Service) Process() (*os.Process, os.Error) {
940+ data, err := ioutil.ReadFile(s.path("pid.txt"))
941+ if err != nil {
942+ if err, ok := err.(*os.PathError); ok && err.Error == os.ENOENT {
943+ return nil, nil
944+ }
945+ return nil, err
946+ }
947+ pid, err := strconv.Atoi(string(data))
948+ if err != nil {
949+ return nil, os.NewError("bad process id found in pid.txt")
950+ }
951+ return os.FindProcess(pid)
952+}
953+
954+func (s *Service) path(name string) string {
955+ return filepath.Join(s.srv.Directory(), name)
956+}
957+
958+// Start starts the server. It returns an error if the server is already running.
959+// It stores the process ID of the server in the file "pid.txt"
960+// inside the server's directory. Stdout and stderr of the server
961+// are similarly written to "log.txt".
962+func (s *Service) Start() os.Error {
963+ if err := s.srv.CheckAvailability(); err != nil {
964+ return err
965+ }
966+ p, err := s.Process()
967+ if p != nil || err != nil {
968+ if p != nil {
969+ p.Release()
970+ }
971+ return fmt.Errorf("server may already be running (remove %q to clear)", s.path("pid.txt"))
972+ }
973+
974+ // create the pid file before starting the process so that if we get two
975+ // programs trying to concurrently start a server on the same directory
976+ // at the same time, only one should succeed.
977+ pidf, err := os.OpenFile(s.path("pid.txt"), os.O_EXCL|os.O_CREATE|os.O_WRONLY, 0666)
978+ if err != nil {
979+ return fmt.Errorf("cannot create pid.txt: %v", err)
980+ }
981+ defer pidf.Close()
982+
983+ args, err := s.srv.Command()
984+ if err != nil {
985+ return fmt.Errorf("cannot determine command: %v", err)
986+ }
987+ cmd := exec.Command(args[0], args[1:]...)
988+
989+ logf, err := os.OpenFile(s.path("log.txt"), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666)
990+ if err != nil {
991+ return fmt.Errorf("cannot create log file: %v", err)
992+ }
993+ defer logf.Close()
994+ cmd.Stdout = logf
995+ cmd.Stderr = logf
996+ if err := cmd.Start(); err != nil {
997+ return fmt.Errorf("cannot start server: %v", err)
998+ }
999+ if _, err := fmt.Fprint(pidf, cmd.Process.Pid); err != nil {
1000+ return fmt.Errorf("cannot write pid file: %v", err)
1001+ }
1002+ return nil
1003+}
1004+
1005+// Stop kills the server. It is a no-op if it is not running.
1006+func (s *Service) Stop() os.Error {
1007+ p, err := s.Process()
1008+ if p == nil {
1009+ if err != nil {
1010+ return fmt.Errorf("cannot read process ID of server: %v", err)
1011+ }
1012+ return nil
1013+ }
1014+ defer p.Release()
1015+ if err := p.Kill(); err != nil && !strings.Contains(err.String(), "no such process") {
1016+ return fmt.Errorf("cannot kill server process: %v", err)
1017+ }
1018+ // ignore the error returned from Wait because there's little
1019+ // we can do about it - it either means that the process has just exited
1020+ // anyway or that we can't wait for it for some other reason,
1021+ // for example because it was originally started by some other process.
1022+ _, err = p.Wait(0)
1023+ if err != nil && strings.Contains(err.String(), "no child processes") {
1024+ // If we can't wait for the server, it's possible that it was running
1025+ // but not as a child of this process, so give it a little while
1026+ // to exit. TODO poll with kill(no signal)?
1027+ time.Sleep(0.5e9)
1028+ }
1029+
1030+ if err := os.Remove(s.path("pid.txt")); err != nil {
1031+ return fmt.Errorf("cannot remove server process ID file: %v", err)
1032+ }
1033+ return nil
1034+}
1035+
1036+// Destroy stops the server, and then removes its
1037+// directory and all its contents.
1038+// Warning: this will destroy all data associated with the server.
1039+func (s *Service) Destroy() os.Error {
1040+ if err := s.Stop(); err != nil {
1041+ return err
1042+ }
1043+ if err := os.RemoveAll(s.srv.Directory()); err != nil {
1044+ return err
1045+ }
1046+ return nil
1047+}
1048
1049=== modified file 'suite_test.go'
1050--- suite_test.go 2011-08-19 01:43:37 +0000
1051+++ suite_test.go 2011-10-03 17:02:23 +0000
1052@@ -1,65 +1,48 @@
1053-package gozk_test
1054+package zk_test
1055
1056 import (
1057 . "launchpad.net/gocheck"
1058- "testing"
1059- "io/ioutil"
1060- "path"
1061 "fmt"
1062+ "launchpad.net/gozk/zk/service"
1063+ "launchpad.net/gozk/zk"
1064 "os"
1065- "gozk"
1066+ "testing"
1067 "time"
1068 )
1069
1070 func TestAll(t *testing.T) {
1071- TestingT(t)
1072+ if !*reattach {
1073+ TestingT(t)
1074+ }
1075 }
1076
1077 var _ = Suite(&S{})
1078
1079 type S struct {
1080- zkRoot string
1081- zkTestRoot string
1082- zkTestPort int
1083- zkServerSh string
1084- zkServerOut *os.File
1085- zkAddr string
1086+ zkServer *service.Service
1087+ zkTestRoot string
1088+ zkAddr string
1089
1090- handles []*gozk.ZooKeeper
1091- events []*gozk.Event
1092+ handles []*zk.Conn
1093+ events []*zk.Event
1094 liveWatches int
1095 deadWatches chan bool
1096 }
1097
1098-var logLevel = 0 //gozk.LOG_ERROR
1099-
1100-
1101-var testZooCfg = ("dataDir=%s\n" +
1102- "clientPort=%d\n" +
1103- "tickTime=2000\n" +
1104- "initLimit=10\n" +
1105- "syncLimit=5\n" +
1106- "")
1107-
1108-var testLog4jPrp = ("log4j.rootLogger=INFO,CONSOLE\n" +
1109- "log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender\n" +
1110- "log4j.appender.CONSOLE.Threshold=DEBUG\n" +
1111- "log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout\n" +
1112- "log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n" +
1113- "")
1114-
1115-func (s *S) init(c *C) (*gozk.ZooKeeper, chan gozk.Event) {
1116- zk, watch, err := gozk.Init(s.zkAddr, 5e9)
1117+var logLevel = 0 //zk.LOG_ERROR
1118+
1119+func (s *S) init(c *C) (*zk.Conn, chan zk.Event) {
1120+ conn, watch, err := zk.Dial(s.zkAddr, 5e9)
1121 c.Assert(err, IsNil)
1122
1123- s.handles = append(s.handles, zk)
1124+ s.handles = append(s.handles, conn)
1125
1126 event := <-watch
1127
1128- c.Assert(event.Type, Equals, gozk.EVENT_SESSION)
1129- c.Assert(event.State, Equals, gozk.STATE_CONNECTED)
1130+ c.Assert(event.Type, Equals, zk.EVENT_SESSION)
1131+ c.Assert(event.State, Equals, zk.STATE_CONNECTED)
1132
1133- bufferedWatch := make(chan gozk.Event, 256)
1134+ bufferedWatch := make(chan zk.Event, 256)
1135 bufferedWatch <- event
1136
1137 s.liveWatches += 1
1138@@ -82,13 +65,13 @@
1139 s.deadWatches <- true
1140 }()
1141
1142- return zk, bufferedWatch
1143+ return conn, bufferedWatch
1144 }
1145
1146 func (s *S) SetUpTest(c *C) {
1147- c.Assert(gozk.CountPendingWatches(), Equals, 0,
1148+ c.Assert(zk.CountPendingWatches(), Equals, 0,
1149 Bug("Test got a dirty watch state before running!"))
1150- gozk.SetLogLevel(logLevel)
1151+ zk.SetLogLevel(logLevel)
1152 }
1153
1154 func (s *S) TearDownTest(c *C) {
1155@@ -108,98 +91,38 @@
1156 }
1157
1158 // Reset the list of handles.
1159- s.handles = make([]*gozk.ZooKeeper, 0)
1160+ s.handles = make([]*zk.Conn, 0)
1161
1162- c.Assert(gozk.CountPendingWatches(), Equals, 0,
1163+ c.Assert(zk.CountPendingWatches(), Equals, 0,
1164 Bug("Test left live watches behind!"))
1165 }
1166
1167-// We use the suite set up and tear down to manage a custom zookeeper
1168+// We use the suite set up and tear down to manage a custom ZooKeeper
1169 //
1170 func (s *S) SetUpSuite(c *C) {
1171-
1172+ var err os.Error
1173 s.deadWatches = make(chan bool)
1174
1175- var err os.Error
1176-
1177- s.zkRoot = os.Getenv("ZKROOT")
1178- if s.zkRoot == "" {
1179- panic("You must define $ZKROOT ($ZKROOT/bin/zkServer.sh is needed)")
1180- }
1181-
1182- s.zkTestRoot = c.MkDir()
1183- s.zkTestPort = 21812
1184-
1185- println("ZooKeeper test server directory:", s.zkTestRoot)
1186- println("ZooKeeper test server port:", s.zkTestPort)
1187-
1188- s.zkAddr = fmt.Sprintf("localhost:%d", s.zkTestPort)
1189-
1190- s.zkServerSh = path.Join(s.zkRoot, "bin/zkServer.sh")
1191- s.zkServerOut, err = os.OpenFile(path.Join(s.zkTestRoot, "stdout.txt"),
1192- os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
1193- if err != nil {
1194- panic("Can't open stdout.txt file for server: " + err.String())
1195- }
1196-
1197- dataDir := path.Join(s.zkTestRoot, "data")
1198- confDir := path.Join(s.zkTestRoot, "conf")
1199-
1200- os.Mkdir(dataDir, 0755)
1201- os.Mkdir(confDir, 0755)
1202-
1203- err = os.Setenv("ZOOCFGDIR", confDir)
1204- if err != nil {
1205- panic("Can't set $ZOOCFGDIR: " + err.String())
1206- }
1207-
1208- zooCfg := []byte(fmt.Sprintf(testZooCfg, dataDir, s.zkTestPort))
1209- err = ioutil.WriteFile(path.Join(confDir, "zoo.cfg"), zooCfg, 0644)
1210- if err != nil {
1211- panic("Can't write zoo.cfg: " + err.String())
1212- }
1213-
1214- log4jPrp := []byte(testLog4jPrp)
1215- err = ioutil.WriteFile(path.Join(confDir, "log4j.properties"), log4jPrp, 0644)
1216- if err != nil {
1217- panic("Can't write log4j.properties: " + err.String())
1218- }
1219-
1220- s.StartZK()
1221+ // N.B. We need to create a subdirectory because zk.CreateServer
1222+ // insists on creating its own directory.
1223+ s.zkTestRoot = c.MkDir() + "/zk"
1224+
1225+ port := 21812
1226+ s.zkAddr = fmt.Sprint("localhost:", port)
1227+
1228+ srv, err := zk.CreateServer(port, s.zkTestRoot, "")
1229+ if err != nil {
1230+ c.Fatal("Cannot set up server environment: ", err)
1231+ }
1232+ s.zkServer = service.New(srv)
1233+ err = s.zkServer.Start()
1234+ if err != nil {
1235+ c.Fatal("Cannot start ZooKeeper server: ", err)
1236+ }
1237 }
1238
1239 func (s *S) TearDownSuite(c *C) {
1240- s.StopZK()
1241- s.zkServerOut.Close()
1242-}
1243-
1244-func (s *S) StartZK() {
1245- attr := os.ProcAttr{Files: []*os.File{os.Stdin, s.zkServerOut, os.Stderr}}
1246- proc, err := os.StartProcess(s.zkServerSh, []string{s.zkServerSh, "start"}, &attr)
1247- if err != nil {
1248- panic("Problem executing zkServer.sh start: " + err.String())
1249- }
1250-
1251- result, err := proc.Wait(0)
1252- if err != nil {
1253- panic(err.String())
1254- } else if result.ExitStatus() != 0 {
1255- panic("'zkServer.sh start' exited with non-zero status")
1256- }
1257-}
1258-
1259-func (s *S) StopZK() {
1260- attr := os.ProcAttr{Files: []*os.File{os.Stdin, s.zkServerOut, os.Stderr}}
1261- proc, err := os.StartProcess(s.zkServerSh, []string{s.zkServerSh, "stop"}, &attr)
1262- if err != nil {
1263- panic("Problem executing zkServer.sh stop: " + err.String() +
1264- " (look for runaway java processes!)")
1265- }
1266- result, err := proc.Wait(0)
1267- if err != nil {
1268- panic(err.String())
1269- } else if result.ExitStatus() != 0 {
1270- panic("'zkServer.sh stop' exited with non-zero status " +
1271- "(look for runaway java processes!)")
1272+ if s.zkServer != nil {
1273+ s.zkServer.Stop() // TODO Destroy
1274 }
1275 }
1276
1277=== renamed file 'gozk.go' => 'zk.go'
1278--- gozk.go 2011-08-19 01:56:39 +0000
1279+++ zk.go 2011-10-03 17:02:23 +0000
1280@@ -1,4 +1,4 @@
1281-// gozk - Zookeeper support for the Go language
1282+// gozk - ZooKeeper support for the Go language
1283 //
1284 // https://wiki.ubuntu.com/gozk
1285 //
1286@@ -6,7 +6,7 @@
1287 //
1288 // Written by Gustavo Niemeyer <gustavo.niemeyer@canonical.com>
1289 //
1290-package gozk
1291+package zk
1292
1293 /*
1294 #cgo CFLAGS: -I/usr/include/c-client-src
1295@@ -27,17 +27,16 @@
1296 // -----------------------------------------------------------------------
1297 // Main constants and data types.
1298
1299-// The main ZooKeeper object, created through the Init function.
1300-// Encapsulates all communication with ZooKeeper.
1301-type ZooKeeper struct {
1302+// Conn represents a connection to a set of ZooKeeper nodes.
1303+type Conn struct {
1304 watchChannels map[uintptr]chan Event
1305 sessionWatchId uintptr
1306 handle *C.zhandle_t
1307 mutex sync.Mutex
1308 }
1309
1310-// ClientId represents the established session in ZooKeeper. This is only
1311-// useful to be passed back into the ReInit function.
1312+// ClientId represents an established ZooKeeper session. It can be
1313+// passed into Redial to reestablish a connection to an existing session.
1314 type ClientId struct {
1315 cId C.clientid_t
1316 }
1317@@ -59,25 +58,21 @@
1318 // through one of the W-suffixed functions (GetW, ExistsW, etc).
1319 //
1320 // The session channel will only receive session-level events notifying
1321-// about critical and transient changes in the ZooKeeper connection
1322-// state (STATE_CONNECTED, STATE_EXPIRED_SESSION, etc). On long
1323-// running applications the session channel must *necessarily* be
1324-// observed since certain events like session expirations require an
1325-// explicit reconnection and reestablishment of state (or bailing out).
1326-// Because of that, the buffer used on the session channel has a limited
1327-// size, and a panic will occur if too many events are not collected.
1328+// about changes in the ZooKeeper connection state (STATE_CONNECTED,
1329+// STATE_EXPIRED_SESSION, etc). On long running applications the session
1330+// channel must *necessarily* be observed since certain events like session
1331+// expirations require an explicit reconnection and reestablishment of
1332+// state (or bailing out). Because of that, the buffer used on the session
1333+// channel has a limited size, and a panic will occur if too many events
1334+// are not collected.
1335 //
1336 // Watch channels enable monitoring state for nodes, and the
1337 // moment they're fired depends on which function was called to
1338-// create them. Note that, unlike in other ZooKeeper interfaces,
1339-// gozk will NOT dispatch unimportant session events such as
1340-// STATE_ASSOCIATING, STATE_CONNECTING and STATE_CONNECTED to
1341-// watch Event channels, since they are transient and disruptive
1342-// to the workflow. Critical state changes such as expirations
1343-// are still delivered to all event channels, though, and the
1344-// transient events may be obsererved in the session channel.
1345+// create them. Two critical session events, STATE_EXPIRED_SESSION
1346+// and STATE_AUTH_FAILED, will be delivered to all
1347+// node watch channels if they occur.
1348 //
1349-// Since every watch channel may receive critical session events, events
1350+// Since every watch channel may receive these critical session events, events
1351 // received must not be handled blindly as if the watch requested has
1352 // been fired. To facilitate such tests, Events offer the Ok method,
1353 // and they also have a good String method so they may be used as an
1354@@ -89,44 +84,78 @@
1355 // return
1356 // }
1357 //
1358-// Note that closed channels will deliver zeroed Event, which means
1359+// Note that closed channels will deliver a zeroed Event, which means
1360 // event.Type is set to EVENT_CLOSED and event.State is set to STATE_CLOSED,
1361 // to facilitate handling.
1362 type Event struct {
1363- Type int
1364- Path string
1365+ // Type gives the type of event (one of the EVENT_* constants).
1366+ // If Type is EVENT_SESSION, then the event is a session
1367+ // event.
1368+ Type int
1369+
1370+ // For non-session events, Path gives the path of the node
1371+ // that was being watched.
1372+ Path string
1373+
1374+ // For session events, State (one of the STATE_* constants) gives the session
1375+ // status.
1376 State int
1377 }
1378
1379-// Error codes that may be used to verify the result of the
1380-// Code method from Error.
1381+// Error represents a ZooKeeper error.
1382+type Error int
1383+
1384 const (
1385- ZOK = C.ZOK
1386- ZSYSTEMERROR = C.ZSYSTEMERROR
1387- ZRUNTIMEINCONSISTENCY = C.ZRUNTIMEINCONSISTENCY
1388- ZDATAINCONSISTENCY = C.ZDATAINCONSISTENCY
1389- ZCONNECTIONLOSS = C.ZCONNECTIONLOSS
1390- ZMARSHALLINGERROR = C.ZMARSHALLINGERROR
1391- ZUNIMPLEMENTED = C.ZUNIMPLEMENTED
1392- ZOPERATIONTIMEOUT = C.ZOPERATIONTIMEOUT
1393- ZBADARGUMENTS = C.ZBADARGUMENTS
1394- ZINVALIDSTATE = C.ZINVALIDSTATE
1395- ZAPIERROR = C.ZAPIERROR
1396- ZNONODE = C.ZNONODE
1397- ZNOAUTH = C.ZNOAUTH
1398- ZBADVERSION = C.ZBADVERSION
1399- ZNOCHILDRENFOREPHEMERALS = C.ZNOCHILDRENFOREPHEMERALS
1400- ZNODEEXISTS = C.ZNODEEXISTS
1401- ZNOTEMPTY = C.ZNOTEMPTY
1402- ZSESSIONEXPIRED = C.ZSESSIONEXPIRED
1403- ZINVALIDCALLBACK = C.ZINVALIDCALLBACK
1404- ZINVALIDACL = C.ZINVALIDACL
1405- ZAUTHFAILED = C.ZAUTHFAILED
1406- ZCLOSING = C.ZCLOSING
1407- ZNOTHING = C.ZNOTHING
1408- ZSESSIONMOVED = C.ZSESSIONMOVED
1409+ ZOK Error = C.ZOK
1410+ ZSYSTEMERROR Error = C.ZSYSTEMERROR
1411+ ZRUNTIMEINCONSISTENCY Error = C.ZRUNTIMEINCONSISTENCY
1412+ ZDATAINCONSISTENCY Error = C.ZDATAINCONSISTENCY
1413+ ZCONNECTIONLOSS Error = C.ZCONNECTIONLOSS
1414+ ZMARSHALLINGERROR Error = C.ZMARSHALLINGERROR
1415+ ZUNIMPLEMENTED Error = C.ZUNIMPLEMENTED
1416+ ZOPERATIONTIMEOUT Error = C.ZOPERATIONTIMEOUT
1417+ ZBADARGUMENTS Error = C.ZBADARGUMENTS
1418+ ZINVALIDSTATE Error = C.ZINVALIDSTATE
1419+ ZAPIERROR Error = C.ZAPIERROR
1420+ ZNONODE Error = C.ZNONODE
1421+ ZNOAUTH Error = C.ZNOAUTH
1422+ ZBADVERSION Error = C.ZBADVERSION
1423+ ZNOCHILDRENFOREPHEMERALS Error = C.ZNOCHILDRENFOREPHEMERALS
1424+ ZNODEEXISTS Error = C.ZNODEEXISTS
1425+ ZNOTEMPTY Error = C.ZNOTEMPTY
1426+ ZSESSIONEXPIRED Error = C.ZSESSIONEXPIRED
1427+ ZINVALIDCALLBACK Error = C.ZINVALIDCALLBACK
1428+ ZINVALIDACL Error = C.ZINVALIDACL
1429+ ZAUTHFAILED Error = C.ZAUTHFAILED
1430+ ZCLOSING Error = C.ZCLOSING
1431+ ZNOTHING Error = C.ZNOTHING
1432+ ZSESSIONMOVED Error = C.ZSESSIONMOVED
1433 )
1434
1435+func (error Error) String() string {
1436+ return C.GoString(C.zerror(C.int(error))) // Static, no need to free it.
1437+}
1438+
1439+// zkError creates an appropriate error return from
1440+// a ZooKeeper status and the errno return from a C API
1441+// call.
1442+func zkError(rc C.int, cerr os.Error) os.Error {
1443+ code := Error(rc)
1444+ switch code {
1445+ case ZOK:
1446+ return nil
1447+
1448+ case ZSYSTEMERROR:
1449+ // If a ZooKeeper call returns ZSYSTEMERROR, then
1450+ // errno becomes significant. If errno has not been
1451+ // set, then we will return ZSYSTEMERROR nonetheless.
1452+ if cerr != nil {
1453+ return cerr
1454+ }
1455+ }
1456+ return code
1457+}
1458+
1459 // Constants for SetLogLevel.
1460 const (
1461 LOG_ERROR = C.ZOO_LOG_LEVEL_ERROR
1462@@ -141,8 +170,8 @@
1463
1464 // Constants for Create's flags parameter.
1465 const (
1466- EPHEMERAL = 1 << iota
1467- SEQUENCE
1468+ EPHEMERAL = 1 << iota // The node will be deleted when the current session ends.
1469+ SEQUENCE // Append a sequence number to the node name.
1470 )
1471
1472 // Constants for ACL Perms.
1473@@ -273,104 +302,79 @@
1474 }
1475
1476 // -----------------------------------------------------------------------
1477-// Error interface which maps onto the ZooKeeper error codes.
1478-
1479-type Error interface {
1480- String() string
1481- Code() int
1482-}
1483-
1484-type errorType struct {
1485- zkrc C.int
1486- err os.Error
1487-}
1488-
1489-func newError(zkrc C.int, err os.Error) Error {
1490- return &errorType{zkrc, err}
1491-}
1492-
1493-func (error *errorType) String() (result string) {
1494- if error.zkrc == ZSYSTEMERROR && error.err != nil {
1495- result = error.err.String()
1496- } else {
1497- result = C.GoString(C.zerror(error.zkrc)) // Static, no need to free it.
1498- }
1499- return
1500-}
1501-
1502-// Code returns the error code that may be compared against one of
1503-// the gozk.Z* constants.
1504-func (error *errorType) Code() int {
1505- return int(error.zkrc)
1506-}
1507-
1508-// -----------------------------------------------------------------------
1509-// Stat interface which maps onto the ZooKeeper Stat struct.
1510-
1511-// We declare this as an interface rather than an actual struct because
1512-// this way we don't have to copy data around between the real C struct
1513-// and the Go one on every call. Most uses will only touch a few elements,
1514-// or even ignore the stat entirely, so that's a win.
1515
1516 // Stat contains detailed information about a node.
1517-type Stat interface {
1518- Czxid() int64
1519- Mzxid() int64
1520- CTime() int64
1521- MTime() int64
1522- Version() int32
1523- CVersion() int32
1524- AVersion() int32
1525- EphemeralOwner() int64
1526- DataLength() int32
1527- NumChildren() int32
1528- Pzxid() int64
1529-}
1530-
1531-type resultStat C.struct_Stat
1532-
1533-func (stat *resultStat) Czxid() int64 {
1534- return int64(stat.czxid)
1535-}
1536-
1537-func (stat *resultStat) Mzxid() int64 {
1538- return int64(stat.mzxid)
1539-}
1540-
1541-func (stat *resultStat) CTime() int64 {
1542- return int64(stat.ctime)
1543-}
1544-
1545-func (stat *resultStat) MTime() int64 {
1546- return int64(stat.mtime)
1547-}
1548-
1549-func (stat *resultStat) Version() int32 {
1550- return int32(stat.version)
1551-}
1552-
1553-func (stat *resultStat) CVersion() int32 {
1554- return int32(stat.cversion)
1555-}
1556-
1557-func (stat *resultStat) AVersion() int32 {
1558- return int32(stat.aversion)
1559-}
1560-
1561-func (stat *resultStat) EphemeralOwner() int64 {
1562- return int64(stat.ephemeralOwner)
1563-}
1564-
1565-func (stat *resultStat) DataLength() int32 {
1566- return int32(stat.dataLength)
1567-}
1568-
1569-func (stat *resultStat) NumChildren() int32 {
1570- return int32(stat.numChildren)
1571-}
1572-
1573-func (stat *resultStat) Pzxid() int64 {
1574- return int64(stat.pzxid)
1575+type Stat struct {
1576+ c C.struct_Stat
1577+}
1578+
1579+func (stat *Stat) String() string {
1580+ return fmt.Sprintf(
1581+ "{Czxid: %d; Mzxid: %d; CTime: %d; MTime: %d; "+
1582+ "Version: %d; CVersion: %d; AVersion: %d; "+
1583+ "EphemeralOwner: %d; DataLength: %d; "+
1584+ "NumChildren: %d; Pzxid: %d}",
1585+ stat.Czxid(), stat.Mzxid(), stat.CTime(), stat.MTime(),
1586+ stat.Version(), stat.CVersion(), stat.AVersion(),
1587+ stat.EphemeralOwner(), stat.DataLength(),
1588+ stat.NumChildren(), stat.Pzxid(),
1589+ )
1590+}
1591+
1592+// Czxid returns the zxid of the change that caused the node to be created.
1593+func (stat *Stat) Czxid() int64 {
1594+ return int64(stat.c.czxid)
1595+}
1596+
1597+// Mzxid returns the zxid of the change that last modified the node.
1598+func (stat *Stat) Mzxid() int64 {
1599+ return int64(stat.c.mzxid)
1600+}
1601+
1602+// CTime returns the time in milliseconds from epoch when the node was created.
1603+func (stat *Stat) CTime() int64 {
1604+ return int64(stat.c.ctime)
1605+}
1606+
1607+// MTime returns the time in milliseconds from epoch when the node was last modified.
1608+func (stat *Stat) MTime() int64 {
1609+ return int64(stat.c.mtime)
1610+}
1611+
1612+// Version returns the number of changes to the data of the node.
1613+func (stat *Stat) Version() int32 {
1614+ return int32(stat.c.version)
1615+}
1616+
1617+// CVersion returns the number of changes to the children of the node.
1618+func (stat *Stat) CVersion() int32 {
1619+ return int32(stat.c.cversion)
1620+}
1621+
1622+// AVersion returns the number of changes to the ACL of the node.
1623+func (stat *Stat) AVersion() int32 {
1624+ return int32(stat.c.aversion)
1625+}
1626+
1627+// EphemeralOwner returns the session id of the owner of the node if
1628+// the node is an ephemeral node; otherwise it will return zero.
1629+func (stat *Stat) EphemeralOwner() int64 {
1630+ return int64(stat.c.ephemeralOwner)
1631+}
1632+
1633+// DataLength returns the length of the data in the node in bytes.
1634+func (stat *Stat) DataLength() int32 {
1635+ return int32(stat.c.dataLength)
1636+}
1637+
1638+// NumChildren returns the number of children of the znode.
1639+func (stat *Stat) NumChildren() int32 {
1640+ return int32(stat.c.numChildren)
1641+}
1642+
1643+// Pzxid returns the zxid of the change that last modified the children of the node.
1644+func (stat *Stat) Pzxid() int64 {
1645+ return int64(stat.c.pzxid)
1646 }
1647
1648 // -----------------------------------------------------------------------
1649@@ -380,11 +384,13 @@
1650
1651 // SetLogLevel changes the minimum level of logging output generated
1652 // to adjust the amount of information provided.
1653+// The default is LOG_ERROR. If level is zero, no logging output
1654+// will be generated.
1655 func SetLogLevel(level int) {
1656 C.zoo_set_debug_level(C.ZooLogLevel(level))
1657 }
1658
1659-// Init initializes the communication with a ZooKeeper cluster. The provided
1660+// Dial initializes the communication with a ZooKeeper cluster. The provided
1661 // servers parameter may include multiple server addresses, separated
1662 // by commas, so that the client will automatically attempt to connect
1663 // to another server if one of them stops working for whatever reason.
1664@@ -398,79 +404,73 @@
1665 // The watch channel receives events of type SESSION_EVENT when any change
1666 // to the state of the established connection happens. See the documentation
1667 // for the Event type for more details.
1668-func Init(servers string, recvTimeoutNS int64) (zk *ZooKeeper, watch chan Event, err Error) {
1669- zk, watch, err = internalInit(servers, recvTimeoutNS, nil)
1670- return
1671+func Dial(servers string, recvTimeoutNS int64) (*Conn, <-chan Event, os.Error) {
1672+ return dial(servers, recvTimeoutNS, nil)
1673 }
1674
1675-// Equivalent to Init, but attempt to reestablish an existing session
1676+// Redial is equivalent to Dial, but attempts to reestablish an existing session
1677 // identified via the clientId parameter.
1678-func ReInit(servers string, recvTimeoutNS int64, clientId *ClientId) (zk *ZooKeeper, watch chan Event, err Error) {
1679- zk, watch, err = internalInit(servers, recvTimeoutNS, clientId)
1680- return
1681+func Redial(servers string, recvTimeoutNS int64, clientId *ClientId) (*Conn, <-chan Event, os.Error) {
1682+ return dial(servers, recvTimeoutNS, clientId)
1683 }
1684
1685-func internalInit(servers string, recvTimeoutNS int64, clientId *ClientId) (*ZooKeeper, chan Event, Error) {
1686-
1687- zk := &ZooKeeper{}
1688- zk.watchChannels = make(map[uintptr]chan Event)
1689+func dial(servers string, recvTimeoutNS int64, clientId *ClientId) (*Conn, <-chan Event, os.Error) {
1690+ conn := &Conn{}
1691+ conn.watchChannels = make(map[uintptr]chan Event)
1692
1693 var cId *C.clientid_t
1694 if clientId != nil {
1695 cId = &clientId.cId
1696 }
1697
1698- watchId, watchChannel := zk.createWatch(true)
1699- zk.sessionWatchId = watchId
1700+ watchId, watchChannel := conn.createWatch(true)
1701+ conn.sessionWatchId = watchId
1702
1703 cservers := C.CString(servers)
1704- handle, cerr := C.zookeeper_init(cservers, C.watch_handler, C.int(recvTimeoutNS / 1e6), cId, unsafe.Pointer(watchId), 0)
1705+ handle, cerr := C.zookeeper_init(cservers, C.watch_handler, C.int(recvTimeoutNS/1e6), cId, unsafe.Pointer(watchId), 0)
1706 C.free(unsafe.Pointer(cservers))
1707 if handle == nil {
1708- zk.closeAllWatches()
1709- return nil, nil, newError(ZSYSTEMERROR, cerr)
1710+ conn.closeAllWatches()
1711+ return nil, nil, zkError(C.int(ZSYSTEMERROR), cerr)
1712 }
1713- zk.handle = handle
1714+ conn.handle = handle
1715 runWatchLoop()
1716- return zk, watchChannel, nil
1717+ return conn, watchChannel, nil
1718 }
1719
1720 // ClientId returns the client ID for the existing session with ZooKeeper.
1721 // This is useful to reestablish an existing session via ReInit.
1722-func (zk *ZooKeeper) ClientId() *ClientId {
1723- return &ClientId{*C.zoo_client_id(zk.handle)}
1724+func (conn *Conn) ClientId() *ClientId {
1725+ return &ClientId{*C.zoo_client_id(conn.handle)}
1726 }
1727
1728 // Close terminates the ZooKeeper interaction.
1729-func (zk *ZooKeeper) Close() Error {
1730-
1731- // Protect from concurrency around zk.handle change.
1732- zk.mutex.Lock()
1733- defer zk.mutex.Unlock()
1734-
1735- if zk.handle == nil {
1736+func (conn *Conn) Close() os.Error {
1737+
1738+ // Protect from concurrency around conn.handle change.
1739+ conn.mutex.Lock()
1740+ defer conn.mutex.Unlock()
1741+
1742+ if conn.handle == nil {
1743 // ZooKeeper may hang indefinitely if a handler is closed twice,
1744 // so we get in the way and prevent it from happening.
1745- return newError(ZCLOSING, nil)
1746+ return ZCLOSING
1747 }
1748- rc, cerr := C.zookeeper_close(zk.handle)
1749+ rc, cerr := C.zookeeper_close(conn.handle)
1750
1751- zk.closeAllWatches()
1752+ conn.closeAllWatches()
1753 stopWatchLoop()
1754
1755- // At this point, nothing else should need zk.handle.
1756- zk.handle = nil
1757+ // At this point, nothing else should need conn.handle.
1758+ conn.handle = nil
1759
1760- if rc != C.ZOK {
1761- return newError(rc, cerr)
1762- }
1763- return nil
1764+ return zkError(rc, cerr)
1765 }
1766
1767 // Get returns the data and status from an existing node. err will be nil,
1768 // unless an error is found. Attempting to retrieve data from a non-existing
1769 // node is an error.
1770-func (zk *ZooKeeper) Get(path string) (data string, stat Stat, err Error) {
1771+func (conn *Conn) Get(path string) (data string, stat *Stat, err os.Error) {
1772
1773 cpath := C.CString(path)
1774 cbuffer := (*C.char)(C.malloc(bufferSize))
1775@@ -478,22 +478,22 @@
1776 defer C.free(unsafe.Pointer(cpath))
1777 defer C.free(unsafe.Pointer(cbuffer))
1778
1779- cstat := C.struct_Stat{}
1780- rc, cerr := C.zoo_wget(zk.handle, cpath, nil, nil,
1781- cbuffer, &cbufferLen, &cstat)
1782+ var cstat Stat
1783+ rc, cerr := C.zoo_wget(conn.handle, cpath, nil, nil, cbuffer, &cbufferLen, &cstat.c)
1784 if rc != C.ZOK {
1785- return "", nil, newError(rc, cerr)
1786+ return "", nil, zkError(rc, cerr)
1787 }
1788+
1789 result := C.GoStringN(cbuffer, cbufferLen)
1790-
1791- return result, (*resultStat)(&cstat), nil
1792+ return result, &cstat, nil
1793 }
1794
1795 // GetW works like Get but also returns a channel that will receive
1796-// a single Event value when the data or existence of the given ZooKeeper
1797-// node changes or when critical session events happen. See the
1798-// documentation of the Event type for more details.
1799-func (zk *ZooKeeper) GetW(path string) (data string, stat Stat, watch chan Event, err Error) {
1800+// a single Event value, with Type EVENT_CHANGED or EVENT_DELETED,
1801+// when the node contents change or it is deleted.
1802+// It will also receive a critical session event if the server goes down.
1803+// See the documentation of the Event type for more details.
1804+func (conn *Conn) GetW(path string) (data string, stat *Stat, watch <-chan Event, err os.Error) {
1805
1806 cpath := C.CString(path)
1807 cbuffer := (*C.char)(C.malloc(bufferSize))
1808@@ -501,73 +501,69 @@
1809 defer C.free(unsafe.Pointer(cpath))
1810 defer C.free(unsafe.Pointer(cbuffer))
1811
1812- watchId, watchChannel := zk.createWatch(true)
1813+ watchId, watchChannel := conn.createWatch(true)
1814
1815- cstat := C.struct_Stat{}
1816- rc, cerr := C.zoo_wget(zk.handle, cpath,
1817- C.watch_handler, unsafe.Pointer(watchId),
1818- cbuffer, &cbufferLen, &cstat)
1819+ var cstat Stat
1820+ rc, cerr := C.zoo_wget(conn.handle, cpath, C.watch_handler, unsafe.Pointer(watchId), cbuffer, &cbufferLen, &cstat.c)
1821 if rc != C.ZOK {
1822- zk.forgetWatch(watchId)
1823- return "", nil, nil, newError(rc, cerr)
1824+ conn.forgetWatch(watchId)
1825+ return "", nil, nil, zkError(rc, cerr)
1826 }
1827
1828 result := C.GoStringN(cbuffer, cbufferLen)
1829- return result, (*resultStat)(&cstat), watchChannel, nil
1830+ return result, &cstat, watchChannel, nil
1831 }
1832
1833 // Children returns the children list and status from an existing node.
1834-// err will be nil, unless an error is found. Attempting to retrieve the
1835-// children list from a non-existent node is an error.
1836-func (zk *ZooKeeper) Children(path string) (children []string, stat Stat, err Error) {
1837+// Attempting to retrieve the children list from a non-existent node is an error.
1838+func (conn *Conn) Children(path string) (children []string, stat *Stat, err os.Error) {
1839
1840 cpath := C.CString(path)
1841 defer C.free(unsafe.Pointer(cpath))
1842
1843 cvector := C.struct_String_vector{}
1844- cstat := C.struct_Stat{}
1845- rc, cerr := C.zoo_wget_children2(zk.handle, cpath, nil, nil,
1846- &cvector, &cstat)
1847+ var cstat Stat
1848+ rc, cerr := C.zoo_wget_children2(conn.handle, cpath, nil, nil, &cvector, &cstat.c)
1849
1850 // Can't happen if rc != 0, but avoid potential memory leaks in the future.
1851 if cvector.count != 0 {
1852 children = parseStringVector(&cvector)
1853 }
1854- if rc != C.ZOK {
1855- err = newError(rc, cerr)
1856+ if rc == C.ZOK {
1857+ stat = &cstat
1858 } else {
1859- stat = (*resultStat)(&cstat)
1860+ err = zkError(rc, cerr)
1861 }
1862 return
1863 }
1864
1865 // ChildrenW works like Children but also returns a channel that will
1866-// receive a single Event value when a node is added or removed under the
1867-// provided path or when critical session events happen. See the documentation
1868-// of the Event type for more details.
1869-func (zk *ZooKeeper) ChildrenW(path string) (children []string, stat Stat, watch chan Event, err Error) {
1870+// receive a single Event value when a child of the given path is
1871+// added or removed (EVENT_CHILD), or when the path node itself is
1872+// deleted (EVENT_DELETED).
1873+// It will also receive a critical session event if the server goes down.
1874+// See the documentation of the Event type for more details.
1875+func (conn *Conn) ChildrenW(path string) (children []string, stat *Stat, watch <-chan Event, err os.Error) {
1876
1877 cpath := C.CString(path)
1878 defer C.free(unsafe.Pointer(cpath))
1879
1880- watchId, watchChannel := zk.createWatch(true)
1881+ watchId, watchChannel := conn.createWatch(true)
1882
1883 cvector := C.struct_String_vector{}
1884- cstat := C.struct_Stat{}
1885- rc, cerr := C.zoo_wget_children2(zk.handle, cpath,
1886- C.watch_handler, unsafe.Pointer(watchId),
1887- &cvector, &cstat)
1888+ var cstat Stat
1889+ rc, cerr := C.zoo_wget_children2(conn.handle, cpath, C.watch_handler, unsafe.Pointer(watchId), &cvector, &cstat.c)
1890
1891 // Can't happen if rc != 0, but avoid potential memory leaks in the future.
1892 if cvector.count != 0 {
1893 children = parseStringVector(&cvector)
1894 }
1895- if rc != C.ZOK {
1896- zk.forgetWatch(watchId)
1897- err = newError(rc, cerr)
1898+ if rc == C.ZOK {
1899+ stat = &cstat
1900+ watch = watchChannel
1901 } else {
1902- stat = (*resultStat)(&cstat)
1903- watch = watchChannel
1904+ conn.forgetWatch(watchId)
1905+ err = zkError(rc, cerr)
1906 }
1907 return
1908 }
1909@@ -588,51 +584,51 @@
1910 // Exists checks if a node exists at the given path. If it does,
1911 // stat will contain meta information on the existing node, otherwise
1912 // it will be nil.
1913-func (zk *ZooKeeper) Exists(path string) (stat Stat, err Error) {
1914+func (conn *Conn) Exists(path string) (stat *Stat, err os.Error) {
1915 cpath := C.CString(path)
1916 defer C.free(unsafe.Pointer(cpath))
1917
1918- cstat := C.struct_Stat{}
1919- rc, cerr := C.zoo_wexists(zk.handle, cpath, nil, nil, &cstat)
1920+ var cstat Stat
1921+ rc, cerr := C.zoo_wexists(conn.handle, cpath, nil, nil, &cstat.c)
1922
1923 // We diverge a bit from the usual here: a ZNONODE is not an error
1924 // for an exists call, otherwise every Exists call would have to check
1925 // for err != nil and err.Code() != ZNONODE.
1926 if rc == C.ZOK {
1927- stat = (*resultStat)(&cstat)
1928+ stat = &cstat
1929 } else if rc != C.ZNONODE {
1930- err = newError(rc, cerr)
1931+ err = zkError(rc, cerr)
1932 }
1933 return
1934 }
1935
1936 // ExistsW works like Exists but also returns a channel that will
1937-// receive an Event value when a node is created in case the returned
1938-// stat is nil and the node didn't exist, or when the existing node
1939-// is removed. It will also receive critical session events. See the
1940-// documentation of the Event type for more details.
1941-func (zk *ZooKeeper) ExistsW(path string) (stat Stat, watch chan Event, err Error) {
1942+// receive a single Event, with Type EVENT_CREATED, EVENT_DELETED
1943+// or EVENT_CHANGED, when the node is next created, removed,
1944+// or its contents change.
1945+// It will also receive a critical session event if the server goes down.
1946+// See the documentation of the Event type for more details.
1947+func (conn *Conn) ExistsW(path string) (stat *Stat, watch <-chan Event, err os.Error) {
1948 cpath := C.CString(path)
1949 defer C.free(unsafe.Pointer(cpath))
1950
1951- watchId, watchChannel := zk.createWatch(true)
1952+ watchId, watchChannel := conn.createWatch(true)
1953
1954- cstat := C.struct_Stat{}
1955- rc, cerr := C.zoo_wexists(zk.handle, cpath,
1956- C.watch_handler, unsafe.Pointer(watchId), &cstat)
1957+ var cstat Stat
1958+ rc, cerr := C.zoo_wexists(conn.handle, cpath, C.watch_handler, unsafe.Pointer(watchId), &cstat.c)
1959
1960 // We diverge a bit from the usual here: a ZNONODE is not an error
1961 // for an exists call, otherwise every Exists call would have to check
1962 // for err != nil and err.Code() != ZNONODE.
1963- switch rc {
1964+ switch Error(rc) {
1965 case ZOK:
1966- stat = (*resultStat)(&cstat)
1967+ stat = &cstat
1968 watch = watchChannel
1969 case ZNONODE:
1970 watch = watchChannel
1971 default:
1972- zk.forgetWatch(watchId)
1973- err = newError(rc, cerr)
1974+ conn.forgetWatch(watchId)
1975+ err = zkError(rc, cerr)
1976 }
1977 return
1978 }
1979@@ -646,7 +642,7 @@
1980 // The returned path is useful in cases where the created path may differ
1981 // from the requested one, such as when a sequence number is appended
1982 // to it due to the use of the gozk.SEQUENCE flag.
1983-func (zk *ZooKeeper) Create(path, value string, flags int, aclv []ACL) (pathCreated string, err Error) {
1984+func (conn *Conn) Create(path, value string, flags int, aclv []ACL) (pathCreated string, err os.Error) {
1985 cpath := C.CString(path)
1986 cvalue := C.CString(value)
1987 defer C.free(unsafe.Pointer(cpath))
1988@@ -660,13 +656,13 @@
1989 cpathCreated := (*C.char)(C.malloc(cpathLen))
1990 defer C.free(unsafe.Pointer(cpathCreated))
1991
1992- rc, cerr := C.zoo_create(zk.handle, cpath, cvalue, C.int(len(value)),
1993- caclv, C.int(flags), cpathCreated, C.int(cpathLen))
1994- if rc != C.ZOK {
1995- return "", newError(rc, cerr)
1996+ rc, cerr := C.zoo_create(conn.handle, cpath, cvalue, C.int(len(value)), caclv, C.int(flags), cpathCreated, C.int(cpathLen))
1997+ if rc == C.ZOK {
1998+ pathCreated = C.GoString(cpathCreated)
1999+ } else {
2000+ err = zkError(rc, cerr)
2001 }
2002-
2003- return C.GoString(cpathCreated), nil
2004+ return
2005 }
2006
2007 // Set modifies the data for the existing node at the given path, replacing it
2008@@ -677,35 +673,31 @@
2009 //
2010 // It is an error to attempt to set the data of a non-existing node with
2011 // this function. In these cases, use Create instead.
2012-func (zk *ZooKeeper) Set(path, value string, version int32) (stat Stat, err Error) {
2013+func (conn *Conn) Set(path, value string, version int32) (stat *Stat, err os.Error) {
2014
2015 cpath := C.CString(path)
2016 cvalue := C.CString(value)
2017 defer C.free(unsafe.Pointer(cpath))
2018 defer C.free(unsafe.Pointer(cvalue))
2019
2020- cstat := C.struct_Stat{}
2021-
2022- rc, cerr := C.zoo_set2(zk.handle, cpath, cvalue, C.int(len(value)),
2023- C.int(version), &cstat)
2024- if rc != C.ZOK {
2025- return nil, newError(rc, cerr)
2026+ var cstat Stat
2027+ rc, cerr := C.zoo_set2(conn.handle, cpath, cvalue, C.int(len(value)), C.int(version), &cstat.c)
2028+ if rc == C.ZOK {
2029+ stat = &cstat
2030+ } else {
2031+ err = zkError(rc, cerr)
2032 }
2033-
2034- return (*resultStat)(&cstat), nil
2035+ return
2036 }
2037
2038 // Delete removes the node at path. If version is not -1, the operation
2039 // will only succeed if the node is still at this version when the
2040 // node is deleted as an atomic operation.
2041-func (zk *ZooKeeper) Delete(path string, version int32) (err Error) {
2042+func (conn *Conn) Delete(path string, version int32) (err os.Error) {
2043 cpath := C.CString(path)
2044 defer C.free(unsafe.Pointer(cpath))
2045- rc, cerr := C.zoo_delete(zk.handle, cpath, C.int(version))
2046- if rc != C.ZOK {
2047- return newError(rc, cerr)
2048- }
2049- return nil
2050+ rc, cerr := C.zoo_delete(conn.handle, cpath, C.int(version))
2051+ return zkError(rc, cerr)
2052 }
2053
2054 // AddAuth adds a new authentication certificate to the ZooKeeper
2055@@ -713,7 +705,7 @@
2056 // authentication information, while the cert parameter provides the
2057 // identity data itself. For instance, the "digest" scheme requires
2058 // a pair like "username:password" to be provided as the certificate.
2059-func (zk *ZooKeeper) AddAuth(scheme, cert string) Error {
2060+func (conn *Conn) AddAuth(scheme, cert string) os.Error {
2061 cscheme := C.CString(scheme)
2062 ccert := C.CString(cert)
2063 defer C.free(unsafe.Pointer(cscheme))
2064@@ -725,43 +717,38 @@
2065 }
2066 defer C.destroy_completion_data(data)
2067
2068- rc, cerr := C.zoo_add_auth(zk.handle, cscheme, ccert, C.int(len(cert)),
2069- C.handle_void_completion, unsafe.Pointer(data))
2070+ rc, cerr := C.zoo_add_auth(conn.handle, cscheme, ccert, C.int(len(cert)), C.handle_void_completion, unsafe.Pointer(data))
2071 if rc != C.ZOK {
2072- return newError(rc, cerr)
2073+ return zkError(rc, cerr)
2074 }
2075
2076 C.wait_for_completion(data)
2077
2078 rc = C.int(uintptr(data.data))
2079- if rc != C.ZOK {
2080- return newError(rc, nil)
2081- }
2082-
2083- return nil
2084+ return zkError(rc, nil)
2085 }
2086
2087 // ACL returns the access control list for path.
2088-func (zk *ZooKeeper) ACL(path string) ([]ACL, Stat, Error) {
2089+func (conn *Conn) ACL(path string) ([]ACL, *Stat, os.Error) {
2090
2091 cpath := C.CString(path)
2092 defer C.free(unsafe.Pointer(cpath))
2093
2094 caclv := C.struct_ACL_vector{}
2095- cstat := C.struct_Stat{}
2096
2097- rc, cerr := C.zoo_get_acl(zk.handle, cpath, &caclv, &cstat)
2098+ var cstat Stat
2099+ rc, cerr := C.zoo_get_acl(conn.handle, cpath, &caclv, &cstat.c)
2100 if rc != C.ZOK {
2101- return nil, nil, newError(rc, cerr)
2102+ return nil, nil, zkError(rc, cerr)
2103 }
2104
2105 aclv := parseACLVector(&caclv)
2106
2107- return aclv, (*resultStat)(&cstat), nil
2108+ return aclv, &cstat, nil
2109 }
2110
2111 // SetACL changes the access control list for path.
2112-func (zk *ZooKeeper) SetACL(path string, aclv []ACL, version int32) Error {
2113+func (conn *Conn) SetACL(path string, aclv []ACL, version int32) os.Error {
2114
2115 cpath := C.CString(path)
2116 defer C.free(unsafe.Pointer(cpath))
2117@@ -769,12 +756,8 @@
2118 caclv := buildACLVector(aclv)
2119 defer C.deallocate_ACL_vector(caclv)
2120
2121- rc, cerr := C.zoo_set_acl(zk.handle, cpath, C.int(version), caclv)
2122- if rc != C.ZOK {
2123- return newError(rc, cerr)
2124- }
2125-
2126- return nil
2127+ rc, cerr := C.zoo_set_acl(conn.handle, cpath, C.int(version), caclv)
2128+ return zkError(rc, cerr)
2129 }
2130
2131 func parseACLVector(caclv *C.struct_ACL_vector) []ACL {
2132@@ -822,7 +805,7 @@
2133 // -----------------------------------------------------------------------
2134 // RetryChange utility method.
2135
2136-type ChangeFunc func(oldValue string, oldStat Stat) (newValue string, err os.Error)
2137+type ChangeFunc func(oldValue string, oldStat *Stat) (newValue string, err os.Error)
2138
2139 // RetryChange runs changeFunc to attempt to atomically change path
2140 // in a lock free manner, and retries in case there was another
2141@@ -831,8 +814,7 @@
2142 // changeFunc must work correctly if called multiple times in case
2143 // the modification fails due to concurrent changes, and it may return
2144 // an error that will cause the RetryChange function to stop and
2145-// return an Error with code ZSYSTEMERROR and the same .String() result
2146-// as the provided error.
2147+// return the same error.
2148 //
2149 // This mechanism is not suitable for a node that is frequently modified
2150 // concurrently. For those cases, consider using a pessimistic locking
2151@@ -845,8 +827,7 @@
2152 //
2153 // 2. Call the changeFunc with the current node value and stat,
2154 // or with an empty string and nil stat, if the node doesn't yet exist.
2155-// If the changeFunc returns an error, stop and return an Error with
2156-// ZSYSTEMERROR Code() and the same String() as the changeFunc error.
2157+// If the changeFunc returns an error, stop and return the same error.
2158 //
2159 // 3. If the changeFunc returns no errors, use the string returned as
2160 // the new candidate value for the node, and attempt to either create
2161@@ -855,36 +836,32 @@
2162 // in the same node), repeat from step 1. If this procedure fails with any
2163 // other error, stop and return the error found.
2164 //
2165-func (zk *ZooKeeper) RetryChange(path string, flags int, acl []ACL, changeFunc ChangeFunc) (err Error) {
2166+func (conn *Conn) RetryChange(path string, flags int, acl []ACL, changeFunc ChangeFunc) os.Error {
2167 for {
2168- oldValue, oldStat, getErr := zk.Get(path)
2169- if getErr != nil && getErr.Code() != ZNONODE {
2170- err = getErr
2171- break
2172- }
2173- newValue, osErr := changeFunc(oldValue, oldStat)
2174- if osErr != nil {
2175- return newError(ZSYSTEMERROR, osErr)
2176- } else if oldStat == nil {
2177- _, err = zk.Create(path, newValue, flags, acl)
2178- if err == nil || err.Code() != ZNODEEXISTS {
2179- break
2180+ oldValue, oldStat, err := conn.Get(path)
2181+ if err != nil && err != ZNONODE {
2182+ return err
2183+ }
2184+ newValue, err := changeFunc(oldValue, oldStat)
2185+ if err != nil {
2186+ return err
2187+ }
2188+ if oldStat == nil {
2189+ _, err := conn.Create(path, newValue, flags, acl)
2190+ if err == nil || err != ZNODEEXISTS {
2191+ return err
2192 }
2193- } else if newValue == oldValue {
2194+ continue
2195+ }
2196+ if newValue == oldValue {
2197 return nil // Nothing to do.
2198- } else {
2199- _, err = zk.Set(path, newValue, oldStat.Version())
2200- if err == nil {
2201- break
2202- } else {
2203- code := err.Code()
2204- if code != ZBADVERSION && code != ZNONODE {
2205- break
2206- }
2207- }
2208+ }
2209+ _, err = conn.Set(path, newValue, oldStat.Version())
2210+ if err == nil || (err != ZBADVERSION && err != ZNONODE) {
2211+ return err
2212 }
2213 }
2214- return err
2215+ panic("not reached")
2216 }
2217
2218 // -----------------------------------------------------------------------
2219@@ -897,7 +874,7 @@
2220 // Whenever a *W method is called, it will return a channel which
2221 // outputs Event values. Internally, a map is used to maintain references
2222 // between a unique integer key (the watchId), and the event channel. The
2223-// watchId is then handed to the C zookeeper library as the watch context,
2224+// watchId is then handed to the C ZooKeeper library as the watch context,
2225 // so that we get it back when events happen. Using an integer key as the
2226 // watch context rather than a pointer is needed because there's no guarantee
2227 // that in the future the GC will not move objects around, and also because
2228@@ -910,13 +887,13 @@
2229 // Since Cgo doesn't allow calling back into Go, we actually fire a new
2230 // goroutine the very first time Init is called, and allow it to block
2231 // in a pthread condition variable within a C function. This condition
2232-// will only be notified once a zookeeper watch callback appends new
2233+// will only be notified once a ZooKeeper watch callback appends new
2234 // entries to the event list. When this happens, the C function returns
2235 // and we get back into Go land with the pointer to the watch data,
2236 // including the watchId and other event details such as type and path.
2237
2238 var watchMutex sync.Mutex
2239-var watchZooKeepers = make(map[uintptr]*ZooKeeper)
2240+var watchConns = make(map[uintptr]*Conn)
2241 var watchCounter uintptr
2242 var watchLoopCounter int
2243
2244@@ -925,14 +902,14 @@
2245 // mostly as a debugging and testing aid.
2246 func CountPendingWatches() int {
2247 watchMutex.Lock()
2248- count := len(watchZooKeepers)
2249+ count := len(watchConns)
2250 watchMutex.Unlock()
2251 return count
2252 }
2253
2254 // createWatch creates and registers a watch, returning the watch id
2255 // and channel.
2256-func (zk *ZooKeeper) createWatch(session bool) (watchId uintptr, watchChannel chan Event) {
2257+func (conn *Conn) createWatch(session bool) (watchId uintptr, watchChannel chan Event) {
2258 buf := 1 // session/watch event
2259 if session {
2260 buf = 32
2261@@ -942,8 +919,8 @@
2262 defer watchMutex.Unlock()
2263 watchId = watchCounter
2264 watchCounter += 1
2265- zk.watchChannels[watchId] = watchChannel
2266- watchZooKeepers[watchId] = zk
2267+ conn.watchChannels[watchId] = watchChannel
2268+ watchConns[watchId] = conn
2269 return
2270 }
2271
2272@@ -951,21 +928,21 @@
2273 // from ever getting delivered. It shouldn't be used if there's any
2274 // chance the watch channel is still visible and not closed, since
2275 // it might mean a goroutine would be blocked forever.
2276-func (zk *ZooKeeper) forgetWatch(watchId uintptr) {
2277+func (conn *Conn) forgetWatch(watchId uintptr) {
2278 watchMutex.Lock()
2279 defer watchMutex.Unlock()
2280- zk.watchChannels[watchId] = nil, false
2281- watchZooKeepers[watchId] = nil, false
2282+ conn.watchChannels[watchId] = nil, false
2283+ watchConns[watchId] = nil, false
2284 }
2285
2286-// closeAllWatches closes all watch channels for zk.
2287-func (zk *ZooKeeper) closeAllWatches() {
2288+// closeAllWatches closes all watch channels for conn.
2289+func (conn *Conn) closeAllWatches() {
2290 watchMutex.Lock()
2291 defer watchMutex.Unlock()
2292- for watchId, ch := range zk.watchChannels {
2293+ for watchId, ch := range conn.watchChannels {
2294 close(ch)
2295- zk.watchChannels[watchId] = nil, false
2296- watchZooKeepers[watchId] = nil, false
2297+ conn.watchChannels[watchId] = nil, false
2298+ watchConns[watchId] = nil, false
2299 }
2300 }
2301
2302@@ -978,11 +955,11 @@
2303 }
2304 watchMutex.Lock()
2305 defer watchMutex.Unlock()
2306- zk, ok := watchZooKeepers[watchId]
2307+ conn, ok := watchConns[watchId]
2308 if !ok {
2309 return
2310 }
2311- if event.Type == EVENT_SESSION && watchId != zk.sessionWatchId {
2312+ if event.Type == EVENT_SESSION && watchId != conn.sessionWatchId {
2313 switch event.State {
2314 case STATE_EXPIRED_SESSION, STATE_AUTH_FAILED:
2315 default:
2316@@ -990,7 +967,7 @@
2317 return
2318 }
2319 }
2320- ch := zk.watchChannels[watchId]
2321+ ch := conn.watchChannels[watchId]
2322 if ch == nil {
2323 return
2324 }
2325@@ -1002,15 +979,15 @@
2326 // straight to the buffer), and the application isn't paying
2327 // attention for long enough to have the buffer filled up.
2328 // Break down now rather than leaking forever.
2329- if watchId == zk.sessionWatchId {
2330+ if watchId == conn.sessionWatchId {
2331 panic("Session event channel buffer is full")
2332 } else {
2333 panic("Watch event channel buffer is full")
2334 }
2335 }
2336- if watchId != zk.sessionWatchId {
2337- zk.watchChannels[watchId] = nil, false
2338- watchZooKeepers[watchId] = nil, false
2339+ if watchId != conn.sessionWatchId {
2340+ conn.watchChannels[watchId] = nil, false
2341+ watchConns[watchId] = nil, false
2342 close(ch)
2343 }
2344 }
2345
2346=== renamed file 'gozk_test.go' => 'zk_test.go'
2347--- gozk_test.go 2011-08-19 01:51:37 +0000
2348+++ zk_test.go 2011-10-03 17:02:23 +0000
2349@@ -1,17 +1,17 @@
2350-package gozk_test
2351+package zk_test
2352
2353 import (
2354 . "launchpad.net/gocheck"
2355- "gozk"
2356+ "launchpad.net/gozk/zk"
2357 "time"
2358 )
2359
2360 // This error will be delivered via C errno, since ZK unfortunately
2361 // only provides the handler back from zookeeper_init().
2362 func (s *S) TestInitErrorThroughErrno(c *C) {
2363- zk, watch, err := gozk.Init("bad-domain-without-port", 5e9)
2364- if zk != nil {
2365- zk.Close()
2366+ conn, watch, err := zk.Dial("bad-domain-without-port", 5e9)
2367+ if conn != nil {
2368+ conn.Close()
2369 }
2370 if watch != nil {
2371 go func() {
2372@@ -23,15 +23,15 @@
2373 }
2374 }()
2375 }
2376- c.Assert(zk, IsNil)
2377+ c.Assert(conn, IsNil)
2378 c.Assert(watch, IsNil)
2379 c.Assert(err, Matches, "invalid argument")
2380 }
2381
2382 func (s *S) TestRecvTimeoutInitParameter(c *C) {
2383- zk, watch, err := gozk.Init(s.zkAddr, 0)
2384+ conn, watch, err := zk.Dial(s.zkAddr, 0)
2385 c.Assert(err, IsNil)
2386- defer zk.Close()
2387+ defer conn.Close()
2388
2389 select {
2390 case <-watch:
2391@@ -40,7 +40,7 @@
2392 }
2393
2394 for i := 0; i != 1000; i++ {
2395- _, _, err := zk.Get("/zookeeper")
2396+ _, _, err := conn.Get("/zookeeper")
2397 if err != nil {
2398 c.Assert(err, Matches, "operation timeout")
2399 c.SucceedNow()
2400@@ -51,76 +51,79 @@
2401 }
2402
2403 func (s *S) TestSessionWatches(c *C) {
2404- c.Assert(gozk.CountPendingWatches(), Equals, 0)
2405+ c.Assert(zk.CountPendingWatches(), Equals, 0)
2406
2407 zk1, watch1 := s.init(c)
2408 zk2, watch2 := s.init(c)
2409 zk3, watch3 := s.init(c)
2410
2411- c.Assert(gozk.CountPendingWatches(), Equals, 3)
2412+ c.Assert(zk.CountPendingWatches(), Equals, 3)
2413
2414 event1 := <-watch1
2415- c.Assert(event1.Type, Equals, gozk.EVENT_SESSION)
2416- c.Assert(event1.State, Equals, gozk.STATE_CONNECTED)
2417+ c.Assert(event1.Type, Equals, zk.EVENT_SESSION)
2418+ c.Assert(event1.State, Equals, zk.STATE_CONNECTED)
2419
2420- c.Assert(gozk.CountPendingWatches(), Equals, 3)
2421+ c.Assert(zk.CountPendingWatches(), Equals, 3)
2422
2423 event2 := <-watch2
2424- c.Assert(event2.Type, Equals, gozk.EVENT_SESSION)
2425- c.Assert(event2.State, Equals, gozk.STATE_CONNECTED)
2426+ c.Assert(event2.Type, Equals, zk.EVENT_SESSION)
2427+ c.Assert(event2.State, Equals, zk.STATE_CONNECTED)
2428
2429- c.Assert(gozk.CountPendingWatches(), Equals, 3)
2430+ c.Assert(zk.CountPendingWatches(), Equals, 3)
2431
2432 event3 := <-watch3
2433- c.Assert(event3.Type, Equals, gozk.EVENT_SESSION)
2434- c.Assert(event3.State, Equals, gozk.STATE_CONNECTED)
2435+ c.Assert(event3.Type, Equals, zk.EVENT_SESSION)
2436+ c.Assert(event3.State, Equals, zk.STATE_CONNECTED)
2437
2438- c.Assert(gozk.CountPendingWatches(), Equals, 3)
2439+ c.Assert(zk.CountPendingWatches(), Equals, 3)
2440
2441 zk1.Close()
2442- c.Assert(gozk.CountPendingWatches(), Equals, 2)
2443+ c.Assert(zk.CountPendingWatches(), Equals, 2)
2444 zk2.Close()
2445- c.Assert(gozk.CountPendingWatches(), Equals, 1)
2446+ c.Assert(zk.CountPendingWatches(), Equals, 1)
2447 zk3.Close()
2448- c.Assert(gozk.CountPendingWatches(), Equals, 0)
2449+ c.Assert(zk.CountPendingWatches(), Equals, 0)
2450 }
2451
2452-// Gozk injects a STATE_CLOSED event when zk.Close() is called, right
2453+// Gozk injects a STATE_CLOSED event when conn.Close() is called, right
2454 // before the channel is closed. Closing the channel injects a nil
2455 // pointer, as usual for Go, so the STATE_CLOSED gives a chance to
2456 // know that a nil pointer is coming, and to stop the procedure.
2457 // Hopefully this procedure will avoid some nil-pointer references by
2458 // mistake.
2459 func (s *S) TestClosingStateInSessionWatch(c *C) {
2460- zk, watch := s.init(c)
2461+ conn, watch := s.init(c)
2462
2463 event := <-watch
2464- c.Assert(event.Type, Equals, gozk.EVENT_SESSION)
2465- c.Assert(event.State, Equals, gozk.STATE_CONNECTED)
2466+ c.Assert(event.Type, Equals, zk.EVENT_SESSION)
2467+ c.Assert(event.State, Equals, zk.STATE_CONNECTED)
2468
2469- zk.Close()
2470+ conn.Close()
2471 event, ok := <-watch
2472 c.Assert(ok, Equals, false)
2473- c.Assert(event.Type, Equals, gozk.EVENT_CLOSED)
2474- c.Assert(event.State, Equals, gozk.STATE_CLOSED)
2475+ c.Assert(event.Type, Equals, zk.EVENT_CLOSED)
2476+ c.Assert(event.State, Equals, zk.STATE_CLOSED)
2477 }
2478
2479 func (s *S) TestEventString(c *C) {
2480- var event gozk.Event
2481- event = gozk.Event{gozk.EVENT_SESSION, "/path", gozk.STATE_CONNECTED}
2482+ var event zk.Event
2483+ event = zk.Event{zk.EVENT_SESSION, "/path", zk.STATE_CONNECTED}
2484 c.Assert(event, Matches, "ZooKeeper connected")
2485- event = gozk.Event{gozk.EVENT_CREATED, "/path", gozk.STATE_CONNECTED}
2486+ event = zk.Event{zk.EVENT_CREATED, "/path", zk.STATE_CONNECTED}
2487 c.Assert(event, Matches, "ZooKeeper connected; path created: /path")
2488- event = gozk.Event{-1, "/path", gozk.STATE_CLOSED}
2489+ event = zk.Event{-1, "/path", zk.STATE_CLOSED}
2490 c.Assert(event, Matches, "ZooKeeper connection closed")
2491 }
2492
2493-var okTests = []struct{gozk.Event; Ok bool}{
2494- {gozk.Event{gozk.EVENT_SESSION, "", gozk.STATE_CONNECTED}, true},
2495- {gozk.Event{gozk.EVENT_CREATED, "", gozk.STATE_CONNECTED}, true},
2496- {gozk.Event{0, "", gozk.STATE_CLOSED}, false},
2497- {gozk.Event{0, "", gozk.STATE_EXPIRED_SESSION}, false},
2498- {gozk.Event{0, "", gozk.STATE_AUTH_FAILED}, false},
2499+var okTests = []struct {
2500+ zk.Event
2501+ Ok bool
2502+}{
2503+ {zk.Event{zk.EVENT_SESSION, "", zk.STATE_CONNECTED}, true},
2504+ {zk.Event{zk.EVENT_CREATED, "", zk.STATE_CONNECTED}, true},
2505+ {zk.Event{0, "", zk.STATE_CLOSED}, false},
2506+ {zk.Event{0, "", zk.STATE_EXPIRED_SESSION}, false},
2507+ {zk.Event{0, "", zk.STATE_AUTH_FAILED}, false},
2508 }
2509
2510 func (s *S) TestEventOk(c *C) {
2511@@ -130,9 +133,9 @@
2512 }
2513
2514 func (s *S) TestGetAndStat(c *C) {
2515- zk, _ := s.init(c)
2516+ conn, _ := s.init(c)
2517
2518- data, stat, err := zk.Get("/zookeeper")
2519+ data, stat, err := conn.Get("/zookeeper")
2520 c.Assert(err, IsNil)
2521 c.Assert(data, Equals, "")
2522 c.Assert(stat.Czxid(), Equals, int64(0))
2523@@ -149,58 +152,58 @@
2524 }
2525
2526 func (s *S) TestGetAndError(c *C) {
2527- zk, _ := s.init(c)
2528+ conn, _ := s.init(c)
2529
2530- data, stat, err := zk.Get("/non-existent")
2531+ data, stat, err := conn.Get("/non-existent")
2532
2533 c.Assert(data, Equals, "")
2534 c.Assert(stat, IsNil)
2535 c.Assert(err, Matches, "no node")
2536- c.Assert(err.Code(), Equals, gozk.ZNONODE)
2537+ c.Assert(err, Equals, zk.ZNONODE)
2538 }
2539
2540 func (s *S) TestCreateAndGet(c *C) {
2541- zk, _ := s.init(c)
2542+ conn, _ := s.init(c)
2543
2544- path, err := zk.Create("/test-", "bababum", gozk.SEQUENCE|gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
2545+ path, err := conn.Create("/test-", "bababum", zk.SEQUENCE|zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
2546 c.Assert(err, IsNil)
2547 c.Assert(path, Matches, "/test-[0-9]+")
2548
2549 // Check the error condition from Create().
2550- _, err = zk.Create(path, "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
2551+ _, err = conn.Create(path, "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
2552 c.Assert(err, Matches, "node exists")
2553
2554- data, _, err := zk.Get(path)
2555+ data, _, err := conn.Get(path)
2556 c.Assert(err, IsNil)
2557 c.Assert(data, Equals, "bababum")
2558 }
2559
2560 func (s *S) TestCreateSetAndGet(c *C) {
2561- zk, _ := s.init(c)
2562+ conn, _ := s.init(c)
2563
2564- _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
2565+ _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
2566 c.Assert(err, IsNil)
2567
2568- stat, err := zk.Set("/test", "bababum", -1) // Any version.
2569+ stat, err := conn.Set("/test", "bababum", -1) // Any version.
2570 c.Assert(err, IsNil)
2571 c.Assert(stat.Version(), Equals, int32(1))
2572
2573- data, _, err := zk.Get("/test")
2574+ data, _, err := conn.Get("/test")
2575 c.Assert(err, IsNil)
2576 c.Assert(data, Equals, "bababum")
2577 }
2578
2579 func (s *S) TestGetAndWatch(c *C) {
2580- c.Check(gozk.CountPendingWatches(), Equals, 0)
2581-
2582- zk, _ := s.init(c)
2583-
2584- c.Check(gozk.CountPendingWatches(), Equals, 1)
2585-
2586- _, err := zk.Create("/test", "one", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
2587+ c.Check(zk.CountPendingWatches(), Equals, 0)
2588+
2589+ conn, _ := s.init(c)
2590+
2591+ c.Check(zk.CountPendingWatches(), Equals, 1)
2592+
2593+ _, err := conn.Create("/test", "one", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
2594 c.Assert(err, IsNil)
2595
2596- data, stat, watch, err := zk.GetW("/test")
2597+ data, stat, watch, err := conn.GetW("/test")
2598 c.Assert(err, IsNil)
2599 c.Assert(data, Equals, "one")
2600 c.Assert(stat.Version(), Equals, int32(0))
2601@@ -211,17 +214,17 @@
2602 default:
2603 }
2604
2605- c.Check(gozk.CountPendingWatches(), Equals, 2)
2606+ c.Check(zk.CountPendingWatches(), Equals, 2)
2607
2608- _, err = zk.Set("/test", "two", -1)
2609+ _, err = conn.Set("/test", "two", -1)
2610 c.Assert(err, IsNil)
2611
2612 event := <-watch
2613- c.Assert(event.Type, Equals, gozk.EVENT_CHANGED)
2614-
2615- c.Check(gozk.CountPendingWatches(), Equals, 1)
2616-
2617- data, _, watch, err = zk.GetW("/test")
2618+ c.Assert(event.Type, Equals, zk.EVENT_CHANGED)
2619+
2620+ c.Check(zk.CountPendingWatches(), Equals, 1)
2621+
2622+ data, _, watch, err = conn.GetW("/test")
2623 c.Assert(err, IsNil)
2624 c.Assert(data, Equals, "two")
2625
2626@@ -231,86 +234,86 @@
2627 default:
2628 }
2629
2630- c.Check(gozk.CountPendingWatches(), Equals, 2)
2631+ c.Check(zk.CountPendingWatches(), Equals, 2)
2632
2633- _, err = zk.Set("/test", "three", -1)
2634+ _, err = conn.Set("/test", "three", -1)
2635 c.Assert(err, IsNil)
2636
2637 event = <-watch
2638- c.Assert(event.Type, Equals, gozk.EVENT_CHANGED)
2639+ c.Assert(event.Type, Equals, zk.EVENT_CHANGED)
2640
2641- c.Check(gozk.CountPendingWatches(), Equals, 1)
2642+ c.Check(zk.CountPendingWatches(), Equals, 1)
2643 }
2644
2645 func (s *S) TestGetAndWatchWithError(c *C) {
2646- c.Check(gozk.CountPendingWatches(), Equals, 0)
2647-
2648- zk, _ := s.init(c)
2649-
2650- c.Check(gozk.CountPendingWatches(), Equals, 1)
2651-
2652- _, _, watch, err := zk.GetW("/test")
2653+ c.Check(zk.CountPendingWatches(), Equals, 0)
2654+
2655+ conn, _ := s.init(c)
2656+
2657+ c.Check(zk.CountPendingWatches(), Equals, 1)
2658+
2659+ _, _, watch, err := conn.GetW("/test")
2660 c.Assert(err, NotNil)
2661- c.Assert(err.Code(), Equals, gozk.ZNONODE)
2662+ c.Assert(err, Equals, zk.ZNONODE)
2663 c.Assert(watch, IsNil)
2664
2665- c.Check(gozk.CountPendingWatches(), Equals, 1)
2666+ c.Check(zk.CountPendingWatches(), Equals, 1)
2667 }
2668
2669 func (s *S) TestCloseReleasesWatches(c *C) {
2670- c.Check(gozk.CountPendingWatches(), Equals, 0)
2671-
2672- zk, _ := s.init(c)
2673-
2674- c.Check(gozk.CountPendingWatches(), Equals, 1)
2675-
2676- _, err := zk.Create("/test", "one", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
2677- c.Assert(err, IsNil)
2678-
2679- _, _, _, err = zk.GetW("/test")
2680- c.Assert(err, IsNil)
2681-
2682- c.Assert(gozk.CountPendingWatches(), Equals, 2)
2683-
2684- zk.Close()
2685-
2686- c.Assert(gozk.CountPendingWatches(), Equals, 0)
2687+ c.Check(zk.CountPendingWatches(), Equals, 0)
2688+
2689+ conn, _ := s.init(c)
2690+
2691+ c.Check(zk.CountPendingWatches(), Equals, 1)
2692+
2693+ _, err := conn.Create("/test", "one", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
2694+ c.Assert(err, IsNil)
2695+
2696+ _, _, _, err = conn.GetW("/test")
2697+ c.Assert(err, IsNil)
2698+
2699+ c.Assert(zk.CountPendingWatches(), Equals, 2)
2700+
2701+ conn.Close()
2702+
2703+ c.Assert(zk.CountPendingWatches(), Equals, 0)
2704 }
2705
2706 // By default, the ZooKeeper C client will hang indefinitely if a
2707 // handler is closed twice. We get in the way and prevent it.
2708 func (s *S) TestClosingTwiceDoesntHang(c *C) {
2709- zk, _ := s.init(c)
2710- err := zk.Close()
2711+ conn, _ := s.init(c)
2712+ err := conn.Close()
2713 c.Assert(err, IsNil)
2714- err = zk.Close()
2715+ err = conn.Close()
2716 c.Assert(err, NotNil)
2717- c.Assert(err.Code(), Equals, gozk.ZCLOSING)
2718+ c.Assert(err, Equals, zk.ZCLOSING)
2719 }
2720
2721 func (s *S) TestChildren(c *C) {
2722- zk, _ := s.init(c)
2723+ conn, _ := s.init(c)
2724
2725- children, stat, err := zk.Children("/")
2726+ children, stat, err := conn.Children("/")
2727 c.Assert(err, IsNil)
2728 c.Assert(children, Equals, []string{"zookeeper"})
2729 c.Assert(stat.NumChildren(), Equals, int32(1))
2730
2731- children, stat, err = zk.Children("/non-existent")
2732+ children, stat, err = conn.Children("/non-existent")
2733 c.Assert(err, NotNil)
2734- c.Assert(err.Code(), Equals, gozk.ZNONODE)
2735+ c.Assert(err, Equals, zk.ZNONODE)
2736 c.Assert(children, Equals, []string{})
2737- c.Assert(stat, Equals, nil)
2738+ c.Assert(stat, IsNil)
2739 }
2740
2741 func (s *S) TestChildrenAndWatch(c *C) {
2742- c.Check(gozk.CountPendingWatches(), Equals, 0)
2743-
2744- zk, _ := s.init(c)
2745-
2746- c.Check(gozk.CountPendingWatches(), Equals, 1)
2747-
2748- children, stat, watch, err := zk.ChildrenW("/")
2749+ c.Check(zk.CountPendingWatches(), Equals, 0)
2750+
2751+ conn, _ := s.init(c)
2752+
2753+ c.Check(zk.CountPendingWatches(), Equals, 1)
2754+
2755+ children, stat, watch, err := conn.ChildrenW("/")
2756 c.Assert(err, IsNil)
2757 c.Assert(children, Equals, []string{"zookeeper"})
2758 c.Assert(stat.NumChildren(), Equals, int32(1))
2759@@ -321,18 +324,18 @@
2760 default:
2761 }
2762
2763- c.Check(gozk.CountPendingWatches(), Equals, 2)
2764+ c.Check(zk.CountPendingWatches(), Equals, 2)
2765
2766- _, err = zk.Create("/test1", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
2767+ _, err = conn.Create("/test1", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
2768 c.Assert(err, IsNil)
2769
2770 event := <-watch
2771- c.Assert(event.Type, Equals, gozk.EVENT_CHILD)
2772+ c.Assert(event.Type, Equals, zk.EVENT_CHILD)
2773 c.Assert(event.Path, Equals, "/")
2774
2775- c.Check(gozk.CountPendingWatches(), Equals, 1)
2776+ c.Check(zk.CountPendingWatches(), Equals, 1)
2777
2778- children, stat, watch, err = zk.ChildrenW("/")
2779+ children, stat, watch, err = conn.ChildrenW("/")
2780 c.Assert(err, IsNil)
2781 c.Assert(stat.NumChildren(), Equals, int32(2))
2782
2783@@ -345,57 +348,56 @@
2784 default:
2785 }
2786
2787- c.Check(gozk.CountPendingWatches(), Equals, 2)
2788+ c.Check(zk.CountPendingWatches(), Equals, 2)
2789
2790- _, err = zk.Create("/test2", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
2791+ _, err = conn.Create("/test2", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
2792 c.Assert(err, IsNil)
2793
2794 event = <-watch
2795- c.Assert(event.Type, Equals, gozk.EVENT_CHILD)
2796+ c.Assert(event.Type, Equals, zk.EVENT_CHILD)
2797
2798- c.Check(gozk.CountPendingWatches(), Equals, 1)
2799+ c.Check(zk.CountPendingWatches(), Equals, 1)
2800 }
2801
2802 func (s *S) TestChildrenAndWatchWithError(c *C) {
2803- c.Check(gozk.CountPendingWatches(), Equals, 0)
2804-
2805- zk, _ := s.init(c)
2806-
2807- c.Check(gozk.CountPendingWatches(), Equals, 1)
2808-
2809- _, stat, watch, err := zk.ChildrenW("/test")
2810+ c.Check(zk.CountPendingWatches(), Equals, 0)
2811+
2812+ conn, _ := s.init(c)
2813+
2814+ c.Check(zk.CountPendingWatches(), Equals, 1)
2815+
2816+ _, stat, watch, err := conn.ChildrenW("/test")
2817 c.Assert(err, NotNil)
2818- c.Assert(err.Code(), Equals, gozk.ZNONODE)
2819+ c.Assert(err, Equals, zk.ZNONODE)
2820 c.Assert(watch, IsNil)
2821 c.Assert(stat, IsNil)
2822
2823- c.Check(gozk.CountPendingWatches(), Equals, 1)
2824+ c.Check(zk.CountPendingWatches(), Equals, 1)
2825 }
2826
2827 func (s *S) TestExists(c *C) {
2828- zk, _ := s.init(c)
2829-
2830- stat, err := zk.Exists("/zookeeper")
2831- c.Assert(err, IsNil)
2832- c.Assert(stat.NumChildren(), Equals, int32(1))
2833-
2834- stat, err = zk.Exists("/non-existent")
2835+ conn, _ := s.init(c)
2836+
2837+ stat, err := conn.Exists("/non-existent")
2838 c.Assert(err, IsNil)
2839 c.Assert(stat, IsNil)
2840+
2841+ stat, err = conn.Exists("/zookeeper")
2842+ c.Assert(err, IsNil)
2843 }
2844
2845 func (s *S) TestExistsAndWatch(c *C) {
2846- c.Check(gozk.CountPendingWatches(), Equals, 0)
2847-
2848- zk, _ := s.init(c)
2849-
2850- c.Check(gozk.CountPendingWatches(), Equals, 1)
2851-
2852- stat, watch, err := zk.ExistsW("/test")
2853+ c.Check(zk.CountPendingWatches(), Equals, 0)
2854+
2855+ conn, _ := s.init(c)
2856+
2857+ c.Check(zk.CountPendingWatches(), Equals, 1)
2858+
2859+ stat, watch, err := conn.ExistsW("/test")
2860 c.Assert(err, IsNil)
2861 c.Assert(stat, IsNil)
2862
2863- c.Check(gozk.CountPendingWatches(), Equals, 2)
2864+ c.Check(zk.CountPendingWatches(), Equals, 2)
2865
2866 select {
2867 case <-watch:
2868@@ -403,62 +405,62 @@
2869 default:
2870 }
2871
2872- _, err = zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
2873+ _, err = conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
2874 c.Assert(err, IsNil)
2875
2876 event := <-watch
2877- c.Assert(event.Type, Equals, gozk.EVENT_CREATED)
2878+ c.Assert(event.Type, Equals, zk.EVENT_CREATED)
2879 c.Assert(event.Path, Equals, "/test")
2880
2881- c.Check(gozk.CountPendingWatches(), Equals, 1)
2882+ c.Check(zk.CountPendingWatches(), Equals, 1)
2883
2884- stat, watch, err = zk.ExistsW("/test")
2885+ stat, watch, err = conn.ExistsW("/test")
2886 c.Assert(err, IsNil)
2887 c.Assert(stat, NotNil)
2888 c.Assert(stat.NumChildren(), Equals, int32(0))
2889
2890- c.Check(gozk.CountPendingWatches(), Equals, 2)
2891+ c.Check(zk.CountPendingWatches(), Equals, 2)
2892 }
2893
2894 func (s *S) TestExistsAndWatchWithError(c *C) {
2895- c.Check(gozk.CountPendingWatches(), Equals, 0)
2896-
2897- zk, _ := s.init(c)
2898-
2899- c.Check(gozk.CountPendingWatches(), Equals, 1)
2900-
2901- stat, watch, err := zk.ExistsW("///")
2902+ c.Check(zk.CountPendingWatches(), Equals, 0)
2903+
2904+ conn, _ := s.init(c)
2905+
2906+ c.Check(zk.CountPendingWatches(), Equals, 1)
2907+
2908+ stat, watch, err := conn.ExistsW("///")
2909 c.Assert(err, NotNil)
2910- c.Assert(err.Code(), Equals, gozk.ZBADARGUMENTS)
2911+ c.Assert(err, Equals, zk.ZBADARGUMENTS)
2912 c.Assert(stat, IsNil)
2913 c.Assert(watch, IsNil)
2914
2915- c.Check(gozk.CountPendingWatches(), Equals, 1)
2916+ c.Check(zk.CountPendingWatches(), Equals, 1)
2917 }
2918
2919 func (s *S) TestDelete(c *C) {
2920- zk, _ := s.init(c)
2921-
2922- _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
2923- c.Assert(err, IsNil)
2924-
2925- err = zk.Delete("/test", 5)
2926- c.Assert(err, NotNil)
2927- c.Assert(err.Code(), Equals, gozk.ZBADVERSION)
2928-
2929- err = zk.Delete("/test", -1)
2930- c.Assert(err, IsNil)
2931-
2932- err = zk.Delete("/test", -1)
2933- c.Assert(err, NotNil)
2934- c.Assert(err.Code(), Equals, gozk.ZNONODE)
2935+ conn, _ := s.init(c)
2936+
2937+ _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
2938+ c.Assert(err, IsNil)
2939+
2940+ err = conn.Delete("/test", 5)
2941+ c.Assert(err, NotNil)
2942+ c.Assert(err, Equals, zk.ZBADVERSION)
2943+
2944+ err = conn.Delete("/test", -1)
2945+ c.Assert(err, IsNil)
2946+
2947+ err = conn.Delete("/test", -1)
2948+ c.Assert(err, NotNil)
2949+ c.Assert(err, Equals, zk.ZNONODE)
2950 }
2951
2952 func (s *S) TestClientIdAndReInit(c *C) {
2953 zk1, _ := s.init(c)
2954 clientId1 := zk1.ClientId()
2955
2956- zk2, _, err := gozk.ReInit(s.zkAddr, 5e9, clientId1)
2957+ zk2, _, err := zk.Redial(s.zkAddr, 5e9, clientId1)
2958 c.Assert(err, IsNil)
2959 defer zk2.Close()
2960 clientId2 := zk2.ClientId()
2961@@ -469,110 +471,114 @@
2962 // Surprisingly for some (including myself, initially), the watch
2963 // returned by the exists method actually fires on data changes too.
2964 func (s *S) TestExistsWatchOnDataChange(c *C) {
2965- zk, _ := s.init(c)
2966-
2967- _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
2968- c.Assert(err, IsNil)
2969-
2970- _, watch, err := zk.ExistsW("/test")
2971- c.Assert(err, IsNil)
2972-
2973- _, err = zk.Set("/test", "new", -1)
2974+ conn, _ := s.init(c)
2975+
2976+ _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
2977+ c.Assert(err, IsNil)
2978+
2979+ _, watch, err := conn.ExistsW("/test")
2980+ c.Assert(err, IsNil)
2981+
2982+ _, err = conn.Set("/test", "new", -1)
2983 c.Assert(err, IsNil)
2984
2985 event := <-watch
2986
2987 c.Assert(event.Path, Equals, "/test")
2988- c.Assert(event.Type, Equals, gozk.EVENT_CHANGED)
2989+ c.Assert(event.Type, Equals, zk.EVENT_CHANGED)
2990 }
2991
2992 func (s *S) TestACL(c *C) {
2993- zk, _ := s.init(c)
2994-
2995- _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
2996- c.Assert(err, IsNil)
2997-
2998- acl, stat, err := zk.ACL("/test")
2999- c.Assert(err, IsNil)
3000- c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_ALL))
3001+ conn, _ := s.init(c)
3002+
3003+ _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
3004+ c.Assert(err, IsNil)
3005+
3006+ acl, stat, err := conn.ACL("/test")
3007+ c.Assert(err, IsNil)
3008+ c.Assert(acl, Equals, zk.WorldACL(zk.PERM_ALL))
3009 c.Assert(stat, NotNil)
3010 c.Assert(stat.Version(), Equals, int32(0))
3011
3012- acl, stat, err = zk.ACL("/non-existent")
3013+ acl, stat, err = conn.ACL("/non-existent")
3014 c.Assert(err, NotNil)
3015- c.Assert(err.Code(), Equals, gozk.ZNONODE)
3016+ c.Assert(err, Equals, zk.ZNONODE)
3017 c.Assert(acl, IsNil)
3018 c.Assert(stat, IsNil)
3019 }
3020
3021 func (s *S) TestSetACL(c *C) {
3022- zk, _ := s.init(c)
3023+ conn, _ := s.init(c)
3024
3025- _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
3026+ _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
3027 c.Assert(err, IsNil)
3028
3029- err = zk.SetACL("/test", gozk.WorldACL(gozk.PERM_ALL), 5)
3030+ err = conn.SetACL("/test", zk.WorldACL(zk.PERM_ALL), 5)
3031 c.Assert(err, NotNil)
3032- c.Assert(err.Code(), Equals, gozk.ZBADVERSION)
3033-
3034- err = zk.SetACL("/test", gozk.WorldACL(gozk.PERM_READ), -1)
3035- c.Assert(err, IsNil)
3036-
3037- acl, _, err := zk.ACL("/test")
3038- c.Assert(err, IsNil)
3039- c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_READ))
3040+ c.Assert(err, Equals, zk.ZBADVERSION)
3041+
3042+ err = conn.SetACL("/test", zk.WorldACL(zk.PERM_READ), -1)
3043+ c.Assert(err, IsNil)
3044+
3045+ acl, _, err := conn.ACL("/test")
3046+ c.Assert(err, IsNil)
3047+ c.Assert(acl, Equals, zk.WorldACL(zk.PERM_READ))
3048 }
3049
3050 func (s *S) TestAddAuth(c *C) {
3051- zk, _ := s.init(c)
3052-
3053- acl := []gozk.ACL{{gozk.PERM_READ, "digest", "joe:enQcM3mIEHQx7IrPNStYBc0qfs8="}}
3054-
3055- _, err := zk.Create("/test", "", gozk.EPHEMERAL, acl)
3056+ conn, _ := s.init(c)
3057+
3058+ acl := []zk.ACL{{zk.PERM_READ, "digest", "joe:enQcM3mIEHQx7IrPNStYBc0qfs8="}}
3059+
3060+ _, err := conn.Create("/test", "", zk.EPHEMERAL, acl)
3061 c.Assert(err, IsNil)
3062
3063- _, _, err = zk.Get("/test")
3064+ _, _, err = conn.Get("/test")
3065 c.Assert(err, NotNil)
3066- c.Assert(err.Code(), Equals, gozk.ZNOAUTH)
3067+ c.Assert(err, Equals, zk.ZNOAUTH)
3068
3069- err = zk.AddAuth("digest", "joe:passwd")
3070+ err = conn.AddAuth("digest", "joe:passwd")
3071 c.Assert(err, IsNil)
3072
3073- _, _, err = zk.Get("/test")
3074+ _, _, err = conn.Get("/test")
3075 c.Assert(err, IsNil)
3076 }
3077
3078 func (s *S) TestWatchOnReconnection(c *C) {
3079- c.Check(gozk.CountPendingWatches(), Equals, 0)
3080+ c.Check(zk.CountPendingWatches(), Equals, 0)
3081
3082- zk, session := s.init(c)
3083+ conn, session := s.init(c)
3084
3085 event := <-session
3086- c.Assert(event.Type, Equals, gozk.EVENT_SESSION)
3087- c.Assert(event.State, Equals, gozk.STATE_CONNECTED)
3088-
3089- c.Check(gozk.CountPendingWatches(), Equals, 1)
3090-
3091- stat, watch, err := zk.ExistsW("/test")
3092+ c.Assert(event.Type, Equals, zk.EVENT_SESSION)
3093+ c.Assert(event.State, Equals, zk.STATE_CONNECTED)
3094+
3095+ c.Check(zk.CountPendingWatches(), Equals, 1)
3096+
3097+ stat, watch, err := conn.ExistsW("/test")
3098 c.Assert(err, IsNil)
3099 c.Assert(stat, IsNil)
3100
3101- c.Check(gozk.CountPendingWatches(), Equals, 2)
3102-
3103- s.StopZK()
3104+ c.Check(zk.CountPendingWatches(), Equals, 2)
3105+
3106+ err = s.zkServer.Stop()
3107+ c.Assert(err, IsNil)
3108+
3109 time.Sleep(2e9)
3110- s.StartZK()
3111
3112- // The session channel should receive the reconnection notification,
3113+ err = s.zkServer.Start()
3114+ c.Assert(err, IsNil)
3115+
3116+ // The session channel should receive the reconnection notification.
3117 select {
3118 case event := <-session:
3119- c.Assert(event.State, Equals, gozk.STATE_CONNECTING)
3120+ c.Assert(event.State, Equals, zk.STATE_CONNECTING)
3121 case <-time.After(3e9):
3122 c.Fatal("Session watch didn't fire")
3123 }
3124 select {
3125 case event := <-session:
3126- c.Assert(event.State, Equals, gozk.STATE_CONNECTED)
3127+ c.Assert(event.State, Equals, zk.STATE_CONNECTED)
3128 case <-time.After(3e9):
3129 c.Fatal("Session watch didn't fire")
3130 }
3131@@ -585,40 +591,40 @@
3132 }
3133
3134 // And it should still work.
3135- _, err = zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL))
3136+ _, err = conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
3137 c.Assert(err, IsNil)
3138
3139 event = <-watch
3140- c.Assert(event.Type, Equals, gozk.EVENT_CREATED)
3141+ c.Assert(event.Type, Equals, zk.EVENT_CREATED)
3142 c.Assert(event.Path, Equals, "/test")
3143
3144- c.Check(gozk.CountPendingWatches(), Equals, 1)
3145+ c.Check(zk.CountPendingWatches(), Equals, 1)
3146 }
3147
3148 func (s *S) TestWatchOnSessionExpiration(c *C) {
3149- c.Check(gozk.CountPendingWatches(), Equals, 0)
3150+ c.Check(zk.CountPendingWatches(), Equals, 0)
3151
3152- zk, session := s.init(c)
3153+ conn, session := s.init(c)
3154
3155 event := <-session
3156- c.Assert(event.Type, Equals, gozk.EVENT_SESSION)
3157- c.Assert(event.State, Equals, gozk.STATE_CONNECTED)
3158-
3159- c.Check(gozk.CountPendingWatches(), Equals, 1)
3160-
3161- stat, watch, err := zk.ExistsW("/test")
3162+ c.Assert(event.Type, Equals, zk.EVENT_SESSION)
3163+ c.Assert(event.State, Equals, zk.STATE_CONNECTED)
3164+
3165+ c.Check(zk.CountPendingWatches(), Equals, 1)
3166+
3167+ stat, watch, err := conn.ExistsW("/test")
3168 c.Assert(err, IsNil)
3169 c.Assert(stat, IsNil)
3170
3171- c.Check(gozk.CountPendingWatches(), Equals, 2)
3172+ c.Check(zk.CountPendingWatches(), Equals, 2)
3173
3174 // Use expiration trick described in the FAQ.
3175- clientId := zk.ClientId()
3176- zk2, session2, err := gozk.ReInit(s.zkAddr, 5e9, clientId)
3177+ clientId := conn.ClientId()
3178+ zk2, session2, err := zk.Redial(s.zkAddr, 5e9, clientId)
3179
3180 for event := range session2 {
3181 c.Log("Event from overlapping session: ", event)
3182- if event.State == gozk.STATE_CONNECTED {
3183+ if event.State == zk.STATE_CONNECTED {
3184 // Wait for zk to process the connection.
3185 // Not reliable without this. :-(
3186 time.Sleep(1e9)
3187@@ -627,21 +633,21 @@
3188 }
3189 for event := range session {
3190 c.Log("Event from primary session: ", event)
3191- if event.State == gozk.STATE_EXPIRED_SESSION {
3192+ if event.State == zk.STATE_EXPIRED_SESSION {
3193 break
3194 }
3195 }
3196
3197 select {
3198 case event := <-watch:
3199- c.Assert(event.State, Equals, gozk.STATE_EXPIRED_SESSION)
3200+ c.Assert(event.State, Equals, zk.STATE_EXPIRED_SESSION)
3201 case <-time.After(3e9):
3202 c.Fatal("Watch event didn't fire")
3203 }
3204
3205 event = <-watch
3206- c.Assert(event.Type, Equals, gozk.EVENT_CLOSED)
3207- c.Assert(event.State, Equals, gozk.STATE_CLOSED)
3208+ c.Assert(event.Type, Equals, zk.EVENT_CLOSED)
3209+ c.Assert(event.State, Equals, zk.STATE_CLOSED)
3210
3211- c.Check(gozk.CountPendingWatches(), Equals, 1)
3212+ c.Check(zk.CountPendingWatches(), Equals, 1)
3213 }

Subscribers

People subscribed via source and target branches

to all changes: