Merge lp:~rogpeppe/gozk/update-documentation into lp:~juju/gozk/trunk
- update-documentation
- Merge into trunk
Status: | Rejected | ||||
---|---|---|---|---|---|
Rejected by: | Gustavo Niemeyer | ||||
Proposed branch: | lp:~rogpeppe/gozk/update-documentation | ||||
Merge into: | lp:~juju/gozk/trunk | ||||
Diff against target: |
3213 lines (+1334/-814) 10 files modified
Makefile (+5/-2) example/example.go (+22/-23) reattach_test.go (+202/-0) retry_test.go (+65/-74) server.go (+239/-0) service/Makefile (+20/-0) service/service.go (+160/-0) suite_test.go (+45/-122) zk.go (+318/-341) zk_test.go (+258/-252) |
||||
To merge this branch: | bzr merge lp:~rogpeppe/gozk/update-documentation | ||||
Related bugs: |
|
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Juju Engineering | Pending | ||
Review via email: mp+77980@code.launchpad.net |
Commit message
Description of the change
Update documentation.
Add more complete description of event delivery and comments
on Stat field accessors.
Gustavo Niemeyer (niemeyer) wrote : | # |
Roger Peppe (rogpeppe) wrote : | # |
this was a while ago. i didn't know how to handle the review system...
i think the intended changes are still worth making. i'll make a CL.
i could also use that proposal to change appropriate methods
to return a time.Time if that seems reasonable (e.g. Stat.CTime etc)
On 10 February 2012 12:27, Gustavo Niemeyer <email address hidden> wrote:
> The proposal to merge lp:~rogpeppe/gozk/update-documentation into lp:gozk has been updated.
>
> Status: Needs review => Rejected
>
> For more details, see:
> https:/
> --
> https:/
> You are the owner of lp:~rogpeppe/gozk/update-documentation.
Unmerged revisions
- 24. By Roger Peppe
-
Update documentation to include more accurate
description of events and comments on stat field accessors.
- 23. By Roger Peppe
-
Factored out server running code into a separate package.
- 22. By Roger Peppe
-
Fix Server code.
- 21. By Gustavo Niemeyer
-
The package location is now http://launchpad.net/gozk/zk.
The shorter package name makes for significantly more digestible line lengths.
- 20. By Gustavo Niemeyer
-
Merged clean-up-interface branch, by Roger Peppe [r=niemeyer]
This branch does a number of incompatible changes that improve
the overall API of gozk.
Preview Diff
1 | === modified file 'Makefile' |
2 | --- Makefile 2011-08-03 01:56:58 +0000 |
3 | +++ Makefile 2011-10-03 17:02:23 +0000 |
4 | @@ -2,10 +2,13 @@ |
5 | |
6 | all: package |
7 | |
8 | -TARG=gozk |
9 | +TARG=launchpad.net/gozk/zk |
10 | |
11 | +GOFILES=\ |
12 | + server.go\ |
13 | + |
14 | CGOFILES=\ |
15 | - gozk.go\ |
16 | + zk.go\ |
17 | |
18 | CGO_OFILES=\ |
19 | helpers.o\ |
20 | |
21 | === added directory 'example' |
22 | === renamed file 'example.go' => 'example/example.go' |
23 | --- example.go 2011-08-03 01:47:25 +0000 |
24 | +++ example/example.go 2011-10-03 17:02:23 +0000 |
25 | @@ -1,30 +1,29 @@ |
26 | package main |
27 | |
28 | import ( |
29 | - "gozk" |
30 | + "launchpad.net/zookeeper/zookeeper" |
31 | ) |
32 | |
33 | func main() { |
34 | - zk, session, err := gozk.Init("localhost:2181", 5000) |
35 | - if err != nil { |
36 | - println("Couldn't connect: " + err.String()) |
37 | - return |
38 | - } |
39 | - |
40 | - defer zk.Close() |
41 | - |
42 | - // Wait for connection. |
43 | - event := <-session |
44 | - if event.State != gozk.STATE_CONNECTED { |
45 | - println("Couldn't connect") |
46 | - return |
47 | - } |
48 | - |
49 | - _, err = zk.Create("/counter", "0", 0, gozk.WorldACL(gozk.PERM_ALL)) |
50 | - if err != nil { |
51 | - println(err.String()) |
52 | - } else { |
53 | - println("Created!") |
54 | - } |
55 | + zk, session, err := zookeeper.Init("localhost:2181", 5000) |
56 | + if err != nil { |
57 | + println("Couldn't connect: " + err.String()) |
58 | + return |
59 | + } |
60 | + |
61 | + defer zk.Close() |
62 | + |
63 | + // Wait for connection. |
64 | + event := <-session |
65 | + if event.State != zookeeper.STATE_CONNECTED { |
66 | + println("Couldn't connect") |
67 | + return |
68 | + } |
69 | + |
70 | + _, err = zk.Create("/counter", "0", 0, zookeeper.WorldACL(zookeeper.PERM_ALL)) |
71 | + if err != nil { |
72 | + println(err.String()) |
73 | + } else { |
74 | + println("Created!") |
75 | + } |
76 | } |
77 | - |
78 | |
79 | === added file 'reattach_test.go' |
80 | --- reattach_test.go 1970-01-01 00:00:00 +0000 |
81 | +++ reattach_test.go 2011-10-03 17:02:23 +0000 |
82 | @@ -0,0 +1,202 @@ |
83 | +package zk_test |
84 | + |
85 | +import ( |
86 | + "bufio" |
87 | + . "launchpad.net/gocheck" |
88 | + "launchpad.net/gozk/zk/service" |
89 | + "launchpad.net/gozk/zk" |
90 | + "exec" |
91 | + "flag" |
92 | + "fmt" |
93 | + "os" |
94 | + "strings" |
95 | + "testing" |
96 | + "time" |
97 | +) |
98 | + |
99 | +var reattach = flag.Bool("zktest.reattach", false, "internal flag used for testing") |
100 | +var reattachRunDir = flag.String("zktest.rundir", "", "internal flag used for testing") |
101 | +var reattachAbnormalStop = flag.Bool("zktest.stop", false, "internal flag used for testing") |
102 | + |
103 | +// This is the reentrancy point for testing ZooKeeper servers |
104 | +// started by processes that are not direct children of the |
105 | +// testing process. This test always succeeds - the status |
106 | +// will be written to stdout and read by startServerIndirect. |
107 | +func TestStartNonChildServer(t *testing.T) { |
108 | + if !*reattach { |
109 | + // not re-entrant, so ignore this test. |
110 | + return |
111 | + } |
112 | + err := startServer(*reattachRunDir, *reattachAbnormalStop) |
113 | + if err != nil { |
114 | + fmt.Printf("error:%v\n", err) |
115 | + return |
116 | + } |
117 | + fmt.Printf("done\n") |
118 | +} |
119 | + |
120 | +func (s *S) startServer(c *C, abort bool) { |
121 | + err := startServer(s.zkTestRoot, abort) |
122 | + c.Assert(err, IsNil) |
123 | +} |
124 | + |
125 | +// startServerIndirect starts a ZooKeeper server that is not |
126 | +// a direct child of the current process. If abort is true, |
127 | +// the server will be started and then terminated abnormally. |
128 | +func (s *S) startServerIndirect(c *C, abort bool) { |
129 | + if len(os.Args) == 0 { |
130 | + c.Fatal("Cannot find self executable name") |
131 | + } |
132 | + cmd := exec.Command( |
133 | + os.Args[0], |
134 | + "-zktest.reattach", |
135 | + "-zktest.rundir", s.zkTestRoot, |
136 | + "-zktest.stop=", fmt.Sprint(abort), |
137 | + "-test.run", "StartNonChildServer", |
138 | + ) |
139 | + r, err := cmd.StdoutPipe() |
140 | + c.Assert(err, IsNil) |
141 | + defer r.Close() |
142 | + if err := cmd.Start(); err != nil { |
143 | + c.Fatalf("cannot start re-entrant gotest process: %v", err) |
144 | + } |
145 | + defer cmd.Wait() |
146 | + bio := bufio.NewReader(r) |
147 | + for { |
148 | + line, err := bio.ReadSlice('\n') |
149 | + if err != nil { |
150 | + c.Fatalf("indirect server status line not found: %v", err) |
151 | + } |
152 | + s := string(line) |
153 | + if strings.HasPrefix(s, "error:") { |
154 | + c.Fatalf("indirect server error: %s", s[len("error:"):]) |
155 | + } |
156 | + if s == "done\n" { |
157 | + return |
158 | + } |
159 | + } |
160 | + panic("not reached") |
161 | +} |
162 | + |
163 | +// startServer starts a ZooKeeper server, and terminates it abnormally |
164 | +// if abort is true. |
165 | +func startServer(runDir string, abort bool) os.Error { |
166 | + s, err := zk.AttachServer(runDir) |
167 | + if err != nil { |
168 | + return fmt.Errorf("cannot attach to server at %q: %v", runDir, err) |
169 | + } |
170 | + srv := service.New(s) |
171 | + if err := srv.Start(); err != nil { |
172 | + return fmt.Errorf("cannot start server: %v", err) |
173 | + } |
174 | + if abort { |
175 | + // Give it time to start up, then kill the server process abnormally, |
176 | + // leaving the pid.txt file behind. |
177 | + time.Sleep(0.5e9) |
178 | + p, err := srv.Process() |
179 | + if err != nil { |
180 | + return fmt.Errorf("cannot get server process: %v", err) |
181 | + } |
182 | + defer p.Release() |
183 | + if err := p.Kill(); err != nil { |
184 | + return fmt.Errorf("cannot kill server process: %v", err) |
185 | + } |
186 | + } |
187 | + return nil |
188 | +} |
189 | + |
190 | +func (s *S) checkCookie(c *C) { |
191 | + conn, watch, err := zk.Dial(s.zkAddr, 5e9) |
192 | + c.Assert(err, IsNil) |
193 | + |
194 | + e, ok := <-watch |
195 | + c.Assert(ok, Equals, true) |
196 | + c.Assert(e.Ok(), Equals, true) |
197 | + |
198 | + c.Assert(err, IsNil) |
199 | + cookie, _, err := conn.Get("/testAttachCookie") |
200 | + c.Assert(err, IsNil) |
201 | + c.Assert(cookie, Equals, "testAttachCookie") |
202 | + conn.Close() |
203 | +} |
204 | + |
205 | +// cases to test: |
206 | +// child server, stopped normally; reattach, start |
207 | +// non-direct child server, killed abnormally; reattach, start (->error), remove pid.txt; start |
208 | +// non-direct child server, still running; reattach, start (->error), stop, start |
209 | +// child server, still running; reattach, start (-> error) |
210 | +// child server, still running; reattach, stop, start. |
211 | +// non-direct child server, still running; reattach, stop, start. |
212 | +func (s *S) TestAttachServer(c *C) { |
213 | + // Create a cookie so that we know we are reattaching to the same instance. |
214 | + conn, _ := s.init(c) |
215 | + _, err := conn.Create("/testAttachCookie", "testAttachCookie", 0, zk.WorldACL(zk.PERM_ALL)) |
216 | + c.Assert(err, IsNil) |
217 | + s.checkCookie(c) |
218 | + s.zkServer.Stop() |
219 | + s.zkServer = nil |
220 | + |
221 | + s.testAttachServer(c, (*S).startServer) |
222 | + s.testAttachServer(c, (*S).startServerIndirect) |
223 | + s.testAttachServerAbnormalTerminate(c, (*S).startServer) |
224 | + s.testAttachServerAbnormalTerminate(c, (*S).startServerIndirect) |
225 | + |
226 | + srv, err := zk.AttachServer(s.zkTestRoot) |
227 | + c.Assert(err, IsNil) |
228 | + |
229 | + s.zkServer = service.New(srv) |
230 | + err = s.zkServer.Start() |
231 | + c.Assert(err, IsNil) |
232 | + |
233 | + conn, _ = s.init(c) |
234 | + err = conn.Delete("/testAttachCookie", -1) |
235 | + c.Assert(err, IsNil) |
236 | +} |
237 | + |
238 | +func (s *S) testAttachServer(c *C, start func(*S, *C, bool)) { |
239 | + start(s, c, false) |
240 | + |
241 | + s.checkCookie(c) |
242 | + |
243 | + // try attaching to it while it is still running - it should fail. |
244 | + a, err := zk.AttachServer(s.zkTestRoot) |
245 | + c.Assert(err, IsNil) |
246 | + srv := service.New(a) |
247 | + |
248 | + err = srv.Start() |
249 | + c.Assert(err, NotNil) |
250 | + |
251 | + // stop it and then start it again - it should succeed. |
252 | + err = srv.Stop() |
253 | + c.Assert(err, IsNil) |
254 | + |
255 | + err = srv.Start() |
256 | + c.Assert(err, IsNil) |
257 | + |
258 | + s.checkCookie(c) |
259 | + |
260 | + err = srv.Stop() |
261 | + c.Assert(err, IsNil) |
262 | +} |
263 | + |
264 | +func (s *S) testAttachServerAbnormalTerminate(c *C, start func(*S, *C, bool)) { |
265 | + start(s, c, true) |
266 | + |
267 | + // try attaching to it and starting - it should fail, because pid.txt |
268 | + // won't have been removed. |
269 | + a, err := zk.AttachServer(s.zkTestRoot) |
270 | + c.Assert(err, IsNil) |
271 | + srv := service.New(a) |
272 | + err = srv.Start() |
273 | + c.Assert(err, NotNil) |
274 | + |
275 | + // stopping it should bring things back to normal. |
276 | + err = srv.Stop() |
277 | + c.Assert(err, IsNil) |
278 | + err = srv.Start() |
279 | + c.Assert(err, IsNil) |
280 | + |
281 | + s.checkCookie(c) |
282 | + err = srv.Stop() |
283 | + c.Assert(err, IsNil) |
284 | +} |
285 | \ No newline at end of file |
286 | |
287 | === modified file 'retry_test.go' |
288 | --- retry_test.go 2011-08-19 01:51:37 +0000 |
289 | +++ retry_test.go 2011-10-03 17:02:23 +0000 |
290 | @@ -1,42 +1,41 @@ |
291 | -package gozk_test |
292 | +package zk_test |
293 | |
294 | import ( |
295 | . "launchpad.net/gocheck" |
296 | - "gozk" |
297 | + "launchpad.net/gozk/zk" |
298 | "os" |
299 | ) |
300 | |
301 | func (s *S) TestRetryChangeCreating(c *C) { |
302 | - zk, _ := s.init(c) |
303 | + conn, _ := s.init(c) |
304 | |
305 | - err := zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL), |
306 | - func(data string, stat gozk.Stat) (string, os.Error) { |
307 | + err := conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL), |
308 | + func(data string, stat *zk.Stat) (string, os.Error) { |
309 | c.Assert(data, Equals, "") |
310 | c.Assert(stat, IsNil) |
311 | return "new", nil |
312 | }) |
313 | c.Assert(err, IsNil) |
314 | |
315 | - data, stat, err := zk.Get("/test") |
316 | + data, stat, err := conn.Get("/test") |
317 | c.Assert(err, IsNil) |
318 | c.Assert(stat, NotNil) |
319 | c.Assert(stat.Version(), Equals, int32(0)) |
320 | c.Assert(data, Equals, "new") |
321 | |
322 | - acl, _, err := zk.ACL("/test") |
323 | + acl, _, err := conn.ACL("/test") |
324 | c.Assert(err, IsNil) |
325 | - c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_ALL)) |
326 | + c.Assert(acl, Equals, zk.WorldACL(zk.PERM_ALL)) |
327 | } |
328 | |
329 | func (s *S) TestRetryChangeSetting(c *C) { |
330 | - zk, _ := s.init(c) |
331 | + conn, _ := s.init(c) |
332 | |
333 | - _, err := zk.Create("/test", "old", gozk.EPHEMERAL, |
334 | - gozk.WorldACL(gozk.PERM_ALL)) |
335 | + _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
336 | c.Assert(err, IsNil) |
337 | |
338 | - err = zk.RetryChange("/test", gozk.EPHEMERAL, []gozk.ACL{}, |
339 | - func(data string, stat gozk.Stat) (string, os.Error) { |
340 | + err = conn.RetryChange("/test", zk.EPHEMERAL, []zk.ACL{}, |
341 | + func(data string, stat *zk.Stat) (string, os.Error) { |
342 | c.Assert(data, Equals, "old") |
343 | c.Assert(stat, NotNil) |
344 | c.Assert(stat.Version(), Equals, int32(0)) |
345 | @@ -44,27 +43,26 @@ |
346 | }) |
347 | c.Assert(err, IsNil) |
348 | |
349 | - data, stat, err := zk.Get("/test") |
350 | + data, stat, err := conn.Get("/test") |
351 | c.Assert(err, IsNil) |
352 | c.Assert(stat, NotNil) |
353 | c.Assert(stat.Version(), Equals, int32(1)) |
354 | c.Assert(data, Equals, "brand new") |
355 | |
356 | // ACL was unchanged by RetryChange(). |
357 | - acl, _, err := zk.ACL("/test") |
358 | + acl, _, err := conn.ACL("/test") |
359 | c.Assert(err, IsNil) |
360 | - c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_ALL)) |
361 | + c.Assert(acl, Equals, zk.WorldACL(zk.PERM_ALL)) |
362 | } |
363 | |
364 | func (s *S) TestRetryChangeUnchangedValueDoesNothing(c *C) { |
365 | - zk, _ := s.init(c) |
366 | + conn, _ := s.init(c) |
367 | |
368 | - _, err := zk.Create("/test", "old", gozk.EPHEMERAL, |
369 | - gozk.WorldACL(gozk.PERM_ALL)) |
370 | + _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
371 | c.Assert(err, IsNil) |
372 | |
373 | - err = zk.RetryChange("/test", gozk.EPHEMERAL, []gozk.ACL{}, |
374 | - func(data string, stat gozk.Stat) (string, os.Error) { |
375 | + err = conn.RetryChange("/test", zk.EPHEMERAL, []zk.ACL{}, |
376 | + func(data string, stat *zk.Stat) (string, os.Error) { |
377 | c.Assert(data, Equals, "old") |
378 | c.Assert(stat, NotNil) |
379 | c.Assert(stat.Version(), Equals, int32(0)) |
380 | @@ -72,7 +70,7 @@ |
381 | }) |
382 | c.Assert(err, IsNil) |
383 | |
384 | - data, stat, err := zk.Get("/test") |
385 | + data, stat, err := conn.Get("/test") |
386 | c.Assert(err, IsNil) |
387 | c.Assert(stat, NotNil) |
388 | c.Assert(stat.Version(), Equals, int32(0)) // Unchanged! |
389 | @@ -80,14 +78,14 @@ |
390 | } |
391 | |
392 | func (s *S) TestRetryChangeConflictOnCreate(c *C) { |
393 | - zk, _ := s.init(c) |
394 | + conn, _ := s.init(c) |
395 | |
396 | - changeFunc := func(data string, stat gozk.Stat) (string, os.Error) { |
397 | + changeFunc := func(data string, stat *zk.Stat) (string, os.Error) { |
398 | switch data { |
399 | case "": |
400 | c.Assert(stat, IsNil) |
401 | - _, err := zk.Create("/test", "conflict", gozk.EPHEMERAL, |
402 | - gozk.WorldACL(gozk.PERM_ALL)) |
403 | + _, err := conn.Create("/test", "conflict", zk.EPHEMERAL, |
404 | + zk.WorldACL(zk.PERM_ALL)) |
405 | c.Assert(err, IsNil) |
406 | return "<none> => conflict", nil |
407 | case "conflict": |
408 | @@ -100,11 +98,10 @@ |
409 | return "can't happen", nil |
410 | } |
411 | |
412 | - err := zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL), |
413 | - changeFunc) |
414 | + err := conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL), changeFunc) |
415 | c.Assert(err, IsNil) |
416 | |
417 | - data, stat, err := zk.Get("/test") |
418 | + data, stat, err := conn.Get("/test") |
419 | c.Assert(err, IsNil) |
420 | c.Assert(data, Equals, "conflict => new") |
421 | c.Assert(stat, NotNil) |
422 | @@ -112,18 +109,17 @@ |
423 | } |
424 | |
425 | func (s *S) TestRetryChangeConflictOnSetDueToChange(c *C) { |
426 | - zk, _ := s.init(c) |
427 | + conn, _ := s.init(c) |
428 | |
429 | - _, err := zk.Create("/test", "old", gozk.EPHEMERAL, |
430 | - gozk.WorldACL(gozk.PERM_ALL)) |
431 | + _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
432 | c.Assert(err, IsNil) |
433 | |
434 | - changeFunc := func(data string, stat gozk.Stat) (string, os.Error) { |
435 | + changeFunc := func(data string, stat *zk.Stat) (string, os.Error) { |
436 | switch data { |
437 | case "old": |
438 | c.Assert(stat, NotNil) |
439 | c.Assert(stat.Version(), Equals, int32(0)) |
440 | - _, err := zk.Set("/test", "conflict", 0) |
441 | + _, err := conn.Set("/test", "conflict", 0) |
442 | c.Assert(err, IsNil) |
443 | return "old => new", nil |
444 | case "conflict": |
445 | @@ -136,10 +132,10 @@ |
446 | return "can't happen", nil |
447 | } |
448 | |
449 | - err = zk.RetryChange("/test", gozk.EPHEMERAL, []gozk.ACL{}, changeFunc) |
450 | + err = conn.RetryChange("/test", zk.EPHEMERAL, []zk.ACL{}, changeFunc) |
451 | c.Assert(err, IsNil) |
452 | |
453 | - data, stat, err := zk.Get("/test") |
454 | + data, stat, err := conn.Get("/test") |
455 | c.Assert(err, IsNil) |
456 | c.Assert(data, Equals, "conflict => new") |
457 | c.Assert(stat, NotNil) |
458 | @@ -147,18 +143,17 @@ |
459 | } |
460 | |
461 | func (s *S) TestRetryChangeConflictOnSetDueToDelete(c *C) { |
462 | - zk, _ := s.init(c) |
463 | + conn, _ := s.init(c) |
464 | |
465 | - _, err := zk.Create("/test", "old", gozk.EPHEMERAL, |
466 | - gozk.WorldACL(gozk.PERM_ALL)) |
467 | + _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
468 | c.Assert(err, IsNil) |
469 | |
470 | - changeFunc := func(data string, stat gozk.Stat) (string, os.Error) { |
471 | + changeFunc := func(data string, stat *zk.Stat) (string, os.Error) { |
472 | switch data { |
473 | case "old": |
474 | c.Assert(stat, NotNil) |
475 | c.Assert(stat.Version(), Equals, int32(0)) |
476 | - err := zk.Delete("/test", 0) |
477 | + err := conn.Delete("/test", 0) |
478 | c.Assert(err, IsNil) |
479 | return "old => <deleted>", nil |
480 | case "": |
481 | @@ -170,55 +165,53 @@ |
482 | return "can't happen", nil |
483 | } |
484 | |
485 | - err = zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_READ), |
486 | - changeFunc) |
487 | + err = conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_READ), changeFunc) |
488 | c.Assert(err, IsNil) |
489 | |
490 | - data, stat, err := zk.Get("/test") |
491 | + data, stat, err := conn.Get("/test") |
492 | c.Assert(err, IsNil) |
493 | c.Assert(data, Equals, "<deleted> => new") |
494 | c.Assert(stat, NotNil) |
495 | c.Assert(stat.Version(), Equals, int32(0)) |
496 | |
497 | // Should be the new ACL. |
498 | - acl, _, err := zk.ACL("/test") |
499 | + acl, _, err := conn.ACL("/test") |
500 | c.Assert(err, IsNil) |
501 | - c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_READ)) |
502 | + c.Assert(acl, Equals, zk.WorldACL(zk.PERM_READ)) |
503 | } |
504 | |
505 | func (s *S) TestRetryChangeErrorInCallback(c *C) { |
506 | - zk, _ := s.init(c) |
507 | + conn, _ := s.init(c) |
508 | |
509 | - err := zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL), |
510 | - func(data string, stat gozk.Stat) (string, os.Error) { |
511 | + err := conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL), |
512 | + func(data string, stat *zk.Stat) (string, os.Error) { |
513 | return "don't use this", os.NewError("BOOM!") |
514 | }) |
515 | c.Assert(err, NotNil) |
516 | - c.Assert(err.Code(), Equals, gozk.ZSYSTEMERROR) |
517 | c.Assert(err.String(), Equals, "BOOM!") |
518 | |
519 | - stat, err := zk.Exists("/test") |
520 | + stat, err := conn.Exists("/test") |
521 | c.Assert(err, IsNil) |
522 | c.Assert(stat, IsNil) |
523 | } |
524 | |
525 | func (s *S) TestRetryChangeFailsReading(c *C) { |
526 | - zk, _ := s.init(c) |
527 | + conn, _ := s.init(c) |
528 | |
529 | - _, err := zk.Create("/test", "old", gozk.EPHEMERAL, |
530 | - gozk.WorldACL(gozk.PERM_WRITE)) // Write only! |
531 | + // Write only! |
532 | + _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_WRITE)) |
533 | c.Assert(err, IsNil) |
534 | |
535 | var called bool |
536 | - err = zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL), |
537 | - func(data string, stat gozk.Stat) (string, os.Error) { |
538 | + err = conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL), |
539 | + func(data string, stat *zk.Stat) (string, os.Error) { |
540 | called = true |
541 | return "", nil |
542 | }) |
543 | c.Assert(err, NotNil) |
544 | - c.Assert(err.Code(), Equals, gozk.ZNOAUTH) |
545 | + c.Assert(err, Equals, zk.ZNOAUTH) |
546 | |
547 | - stat, err := zk.Exists("/test") |
548 | + stat, err := conn.Exists("/test") |
549 | c.Assert(err, IsNil) |
550 | c.Assert(stat, NotNil) |
551 | c.Assert(stat.Version(), Equals, int32(0)) |
552 | @@ -227,22 +220,21 @@ |
553 | } |
554 | |
555 | func (s *S) TestRetryChangeFailsSetting(c *C) { |
556 | - zk, _ := s.init(c) |
557 | + conn, _ := s.init(c) |
558 | |
559 | - _, err := zk.Create("/test", "old", gozk.EPHEMERAL, |
560 | - gozk.WorldACL(gozk.PERM_READ)) // Read only! |
561 | + // Read only! |
562 | + _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_READ)) |
563 | c.Assert(err, IsNil) |
564 | |
565 | var called bool |
566 | - err = zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL), |
567 | - func(data string, stat gozk.Stat) (string, os.Error) { |
568 | + err = conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL), |
569 | + func(data string, stat *zk.Stat) (string, os.Error) { |
570 | called = true |
571 | return "", nil |
572 | }) |
573 | - c.Assert(err, NotNil) |
574 | - c.Assert(err.Code(), Equals, gozk.ZNOAUTH) |
575 | + c.Assert(err, Equals, zk.ZNOAUTH) |
576 | |
577 | - stat, err := zk.Exists("/test") |
578 | + stat, err := conn.Exists("/test") |
579 | c.Assert(err, IsNil) |
580 | c.Assert(stat, NotNil) |
581 | c.Assert(stat.Version(), Equals, int32(0)) |
582 | @@ -251,23 +243,22 @@ |
583 | } |
584 | |
585 | func (s *S) TestRetryChangeFailsCreating(c *C) { |
586 | - zk, _ := s.init(c) |
587 | + conn, _ := s.init(c) |
588 | |
589 | - _, err := zk.Create("/test", "old", gozk.EPHEMERAL, |
590 | - gozk.WorldACL(gozk.PERM_READ)) // Read only! |
591 | + // Read only! |
592 | + _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_READ)) |
593 | c.Assert(err, IsNil) |
594 | |
595 | var called bool |
596 | - err = zk.RetryChange("/test/sub", gozk.EPHEMERAL, |
597 | - gozk.WorldACL(gozk.PERM_ALL), |
598 | - func(data string, stat gozk.Stat) (string, os.Error) { |
599 | + err = conn.RetryChange("/test/sub", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL), |
600 | + func(data string, stat *zk.Stat) (string, os.Error) { |
601 | called = true |
602 | return "", nil |
603 | }) |
604 | c.Assert(err, NotNil) |
605 | - c.Assert(err.Code(), Equals, gozk.ZNOAUTH) |
606 | + c.Assert(err, Equals, zk.ZNOAUTH) |
607 | |
608 | - stat, err := zk.Exists("/test/sub") |
609 | + stat, err := conn.Exists("/test/sub") |
610 | c.Assert(err, IsNil) |
611 | c.Assert(stat, IsNil) |
612 | |
613 | |
614 | === added file 'server.go' |
615 | --- server.go 1970-01-01 00:00:00 +0000 |
616 | +++ server.go 2011-10-03 17:02:23 +0000 |
617 | @@ -0,0 +1,239 @@ |
618 | +package zk |
619 | + |
620 | +import ( |
621 | + "bufio" |
622 | + "bytes" |
623 | + "fmt" |
624 | + "io/ioutil" |
625 | + "net" |
626 | + "os" |
627 | + "path/filepath" |
628 | + "strings" |
629 | +) |
630 | + |
631 | +// Server represents a ZooKeeper server, its data and configuration files. |
632 | +type Server struct { |
633 | + runDir string |
634 | + installDir string |
635 | +} |
636 | + |
637 | +func (srv *Server) Directory() string { |
638 | + return srv.runDir |
639 | +} |
640 | + |
641 | +// CreateServer creates the directory runDir and sets up a ZooKeeper server |
642 | +// environment inside it. It is an error if runDir already exists. |
643 | +// The server will listen on the specified TCP port. |
644 | +// |
645 | +// The ZooKeeper installation directory is specified by installDir. |
646 | +// If this is empty, a system default will be used. |
647 | +// |
648 | +// CreateServer does not start the server - the *Server that |
649 | +// is returned can be used with the service package, for example: |
650 | +// see launchpad.net/gozk/zk/service. |
651 | +func CreateServer(port int, runDir, installDir string) (*Server, os.Error) { |
652 | + if err := os.Mkdir(runDir, 0777); err != nil { |
653 | + return nil, err |
654 | + } |
655 | + srv := &Server{runDir: runDir, installDir: installDir} |
656 | + if err := srv.writeLog4JConfig(); err != nil { |
657 | + return nil, err |
658 | + } |
659 | + if err := srv.writeZooKeeperConfig(port); err != nil { |
660 | + return nil, err |
661 | + } |
662 | + if err := srv.writeInstallDir(); err != nil { |
663 | + return nil, err |
664 | + } |
665 | + return srv, nil |
666 | +} |
667 | + |
668 | +// AttachServer creates a new ZooKeeper Server instance |
669 | +// to operate inside an existing run directory, runDir. |
670 | +// The directory must have been created with CreateServer. |
671 | +func AttachServer(runDir string) (*Server, os.Error) { |
672 | + srv := &Server{runDir: runDir} |
673 | + if err := srv.readInstallDir(); err != nil { |
674 | + return nil, fmt.Errorf("cannot read server install directory: %v", err) |
675 | + } |
676 | + return srv, nil |
677 | +} |
678 | + |
679 | +func (srv *Server) path(name string) string { |
680 | + return filepath.Join(srv.runDir, name) |
681 | +} |
682 | + |
683 | +func (srv *Server) CheckAvailability() os.Error { |
684 | + port, err := srv.NetworkPort() |
685 | + if err != nil { |
686 | + return fmt.Errorf("cannot get network port: %v", err) |
687 | + } |
688 | + l, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port)) |
689 | + if err != nil { |
690 | + return fmt.Errorf("cannot listen on port %v: %v", port, err) |
691 | + } |
692 | + l.Close() |
693 | + return nil |
694 | +} |
695 | + |
696 | +// Command returns the command used to start the |
697 | +// ZooKeeper server. It is provided for debugging and testing |
698 | +// purposes only. |
699 | +func (srv *Server) Command() ([]string, os.Error) { |
700 | + cp, err := srv.classPath() |
701 | + if err != nil { |
702 | + return nil, fmt.Errorf("cannot get class path: %v", err) |
703 | + } |
704 | + return []string{ |
705 | + "java", |
706 | + "-cp", strings.Join(cp, ":"), |
707 | + "-Dzookeeper.root.logger=INFO,CONSOLE", |
708 | + "-Dlog4j.configuration=file:"+srv.path("log4j.properties"), |
709 | + "org.apache.zookeeper.server.quorum.QuorumPeerMain", |
710 | + srv.path("zoo.cfg"), |
711 | + }, nil |
712 | +} |
713 | + |
714 | +var log4jProperties = ` |
715 | +log4j.rootLogger=INFO, CONSOLE |
716 | +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender |
717 | +log4j.appender.CONSOLE.Threshold=INFO |
718 | +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout |
719 | +log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n |
720 | +` |
721 | + |
722 | +func (srv *Server) writeLog4JConfig() (err os.Error) { |
723 | + return ioutil.WriteFile(srv.path("log4j.properties"), []byte(log4jProperties), 0666) |
724 | +} |
725 | + |
726 | +func (srv *Server) writeZooKeeperConfig(port int) (err os.Error) { |
727 | + return ioutil.WriteFile(srv.path("zoo.cfg"), []byte(fmt.Sprintf( |
728 | + "tickTime=2000\n"+ |
729 | + "dataDir=%s\n"+ |
730 | + "clientPort=%d\n"+ |
731 | + "maxClientCnxns=500\n", |
732 | + srv.runDir, port)), 0666) |
733 | +} |
734 | + |
735 | +// NetworkPort returns the TCP port number that |
736 | +// the server is configured for. |
737 | +func (srv *Server) NetworkPort() (int, os.Error) { |
738 | + f, err := os.Open(srv.path("zoo.cfg")) |
739 | + if err != nil { |
740 | + return 0, err |
741 | + } |
742 | + r := bufio.NewReader(f) |
743 | + for { |
744 | + line, err := r.ReadSlice('\n') |
745 | + if err != nil { |
746 | + return 0, fmt.Errorf("cannot get port from %q", srv.path("zoo.cfg")) |
747 | + } |
748 | + var port int |
749 | + if n, _ := fmt.Sscanf(string(line), "clientPort=%d\n", &port); n == 1 { |
750 | + return port, nil |
751 | + } |
752 | + } |
753 | + panic("not reached") |
754 | +} |
755 | + |
756 | +func (srv *Server) writeInstallDir() os.Error { |
757 | + return ioutil.WriteFile(srv.path("installdir.txt"), []byte(srv.installDir+"\n"), 0666) |
758 | +} |
759 | + |
760 | +func (srv *Server) readInstallDir() os.Error { |
761 | + data, err := ioutil.ReadFile(srv.path("installdir.txt")) |
762 | + if err != nil { |
763 | + return err |
764 | + } |
765 | + if data[len(data)-1] == '\n' { |
766 | + data = data[0 : len(data)-1] |
767 | + } |
768 | + srv.installDir = string(data) |
769 | + return nil |
770 | +} |
771 | + |
772 | +func (srv *Server) classPath() ([]string, os.Error) { |
773 | + dir := srv.installDir |
774 | + if dir == "" { |
775 | + return systemClassPath() |
776 | + } |
777 | + if err := checkDirectory(dir); err != nil { |
778 | + return nil, err |
779 | + } |
780 | + // Two possibilities, as seen in zkEnv.sh: |
781 | + // 1) locally built binaries (jars are in build directory) |
782 | + // 2) release binaries |
783 | + if build := filepath.Join(dir, "build"); checkDirectory(build) == nil { |
784 | + dir = build |
785 | + } |
786 | + classPath, err := filepath.Glob(filepath.Join(dir, "zookeeper-*.jar")) |
787 | + if err != nil { |
788 | + panic(fmt.Errorf("glob for jar files: %v", err)) |
789 | + } |
790 | + more, err := filepath.Glob(filepath.Join(dir, "lib/*.jar")) |
791 | + if err != nil { |
792 | + panic(fmt.Errorf("glob for lib jar files: %v", err)) |
793 | + } |
794 | + |
795 | + classPath = append(classPath, more...) |
796 | + if len(classPath) == 0 { |
797 | + return nil, fmt.Errorf("zookeeper libraries not found in %q", dir) |
798 | + } |
799 | + return classPath, nil |
800 | +} |
801 | + |
802 | +const zookeeperEnviron = "/etc/zookeeper/conf/environment" |
803 | + |
804 | +func systemClassPath() ([]string, os.Error) { |
805 | + f, err := os.Open(zookeeperEnviron) |
806 | + if f == nil { |
807 | + return nil, err |
808 | + } |
809 | + r := bufio.NewReader(f) |
810 | + for { |
811 | + line, err := r.ReadSlice('\n') |
812 | + if err != nil { |
813 | + break |
814 | + } |
815 | + if !bytes.HasPrefix(line, []byte("CLASSPATH=")) { |
816 | + continue |
817 | + } |
818 | + |
819 | + // remove variable and newline |
820 | + path := string(line[len("CLASSPATH=") : len(line)-1]) |
821 | + |
822 | + // trim white space |
823 | + path = strings.Trim(path, " \t\r") |
824 | + |
825 | + // strip quotes |
826 | + if path[0] == '"' { |
827 | + path = path[1 : len(path)-1] |
828 | + } |
829 | + |
830 | + // split on : |
831 | + classPath := strings.Split(path, ":") |
832 | + |
833 | + // split off $ZOOCFGDIR |
834 | + if len(classPath) > 0 && classPath[0] == "$ZOOCFGDIR" { |
835 | + classPath = classPath[1:] |
836 | + } |
837 | + |
838 | + if len(classPath) == 0 { |
839 | + return nil, fmt.Errorf("empty class path in %q", zookeeperEnviron) |
840 | + } |
841 | + return classPath, nil |
842 | + } |
843 | + return nil, fmt.Errorf("no class path found in %q", zookeeperEnviron) |
844 | +} |
845 | + |
846 | +// checkDirectory returns an error if the given path |
847 | +// does not exist or is not a directory. |
848 | +func checkDirectory(path string) os.Error { |
849 | + if info, err := os.Stat(path); err != nil || !info.IsDirectory() { |
850 | + if err == nil { |
851 | + err = &os.PathError{Op: "stat", Path: path, Error: os.NewError("is not a directory")} |
852 | + } |
853 | + return err |
854 | + } |
855 | + return nil |
856 | +} |
857 | |
858 | === added directory 'service' |
859 | === added file 'service/Makefile' |
860 | --- service/Makefile 1970-01-01 00:00:00 +0000 |
861 | +++ service/Makefile 2011-10-03 17:02:23 +0000 |
862 | @@ -0,0 +1,20 @@ |
863 | +include $(GOROOT)/src/Make.inc |
864 | + |
865 | +TARG=launchpad.net/gozk/zk/service |
866 | + |
867 | +GOFILES=\ |
868 | + service.go\ |
869 | + |
870 | +GOFMT=gofmt |
871 | +BADFMT:=$(shell $(GOFMT) -l $(GOFILES) $(CGOFILES) $(wildcard *_test.go)) |
872 | + |
873 | +gofmt: $(BADFMT) |
874 | + @for F in $(BADFMT); do $(GOFMT) -w $$F && echo $$F; done |
875 | + |
876 | +ifneq ($(BADFMT),) |
877 | +ifneq ($(MAKECMDGOALS),gofmt) |
878 | +$(warning WARNING: make gofmt: $(BADFMT)) |
879 | +endif |
880 | +endif |
881 | + |
882 | +include $(GOROOT)/src/Make.pkg |
883 | |
884 | === added file 'service/service.go' |
885 | --- service/service.go 1970-01-01 00:00:00 +0000 |
886 | +++ service/service.go 2011-10-03 17:02:23 +0000 |
887 | @@ -0,0 +1,160 @@ |
888 | +// Package service provides support for long-running services. |
889 | +package service |
890 | + |
891 | +import ( |
892 | + "os" |
893 | + "fmt" |
894 | + "io/ioutil" |
895 | + "strconv" |
896 | + "path/filepath" |
897 | + "exec" |
898 | + "strings" |
899 | + "time" |
900 | +) |
901 | + |
902 | +// Interface represents a single-process server. |
903 | +type Interface interface { |
904 | + // Directory gives the path to the directory containing |
905 | + // the server's configuration and data files. It is assumed that |
906 | + // this exists and can be modified. |
907 | + Directory() string |
908 | + |
909 | + // CheckAvailability should return an error if resources |
910 | + // required by the server are not available; for instance |
911 | + // if the server requires a network port which is not free. |
912 | + CheckAvailability() os.Error |
913 | + |
914 | + // Command should return a command that can be |
915 | + // used to run the server. The first element of the returned |
916 | + // slice is the executable name; the rest of the elements are |
917 | + // its arguments. |
918 | + Command() ([]string, os.Error) |
919 | +} |
920 | + |
921 | +// Service represents a possibly running server process. |
922 | +type Service struct { |
923 | + srv Interface |
924 | +} |
925 | + |
926 | +// New returns a new Service using the given |
927 | +// server interface. |
928 | +func New(srv Interface) *Service { |
929 | + return &Service{srv} |
930 | +} |
931 | + |
932 | +// Process returns a reference to the identity |
933 | +// of the last running server in the Service's directory. |
934 | +// If the server was shut down, it returns (nil, nil). |
935 | +// The server process may not still be running |
936 | +// (for instance if it was terminated abnormally). |
937 | +// This function is provided for debugging and testing |
938 | +// purposes only. |
939 | +func (s *Service) Process() (*os.Process, os.Error) { |
940 | + data, err := ioutil.ReadFile(s.path("pid.txt")) |
941 | + if err != nil { |
942 | + if err, ok := err.(*os.PathError); ok && err.Error == os.ENOENT { |
943 | + return nil, nil |
944 | + } |
945 | + return nil, err |
946 | + } |
947 | + pid, err := strconv.Atoi(string(data)) |
948 | + if err != nil { |
949 | + return nil, os.NewError("bad process id found in pid.txt") |
950 | + } |
951 | + return os.FindProcess(pid) |
952 | +} |
953 | + |
954 | +func (s *Service) path(name string) string { |
955 | + return filepath.Join(s.srv.Directory(), name) |
956 | +} |
957 | + |
958 | +// Start starts the server. It returns an error if the server is already running. |
959 | +// It stores the process ID of the server in the file "pid.txt" |
960 | +// inside the server's directory. Stdout and stderr of the server |
961 | +// are similarly written to "log.txt". |
962 | +func (s *Service) Start() os.Error { |
963 | + if err := s.srv.CheckAvailability(); err != nil { |
964 | + return err |
965 | + } |
966 | + p, err := s.Process() |
967 | + if p != nil || err != nil { |
968 | + if p != nil { |
969 | + p.Release() |
970 | + } |
971 | + return fmt.Errorf("server may already be running (remove %q to clear)", s.path("pid.txt")) |
972 | + } |
973 | + |
974 | + // create the pid file before starting the process so that if we get two |
975 | + // programs trying to concurrently start a server on the same directory |
976 | + // at the same time, only one should succeed. |
977 | + pidf, err := os.OpenFile(s.path("pid.txt"), os.O_EXCL|os.O_CREATE|os.O_WRONLY, 0666) |
978 | + if err != nil { |
979 | + return fmt.Errorf("cannot create pid.txt: %v", err) |
980 | + } |
981 | + defer pidf.Close() |
982 | + |
983 | + args, err := s.srv.Command() |
984 | + if err != nil { |
985 | + return fmt.Errorf("cannot determine command: %v", err) |
986 | + } |
987 | + cmd := exec.Command(args[0], args[1:]...) |
988 | + |
989 | + logf, err := os.OpenFile(s.path("log.txt"), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666) |
990 | + if err != nil { |
991 | + return fmt.Errorf("cannot create log file: %v", err) |
992 | + } |
993 | + defer logf.Close() |
994 | + cmd.Stdout = logf |
995 | + cmd.Stderr = logf |
996 | + if err := cmd.Start(); err != nil { |
997 | + return fmt.Errorf("cannot start server: %v", err) |
998 | + } |
999 | + if _, err := fmt.Fprint(pidf, cmd.Process.Pid); err != nil { |
1000 | + return fmt.Errorf("cannot write pid file: %v", err) |
1001 | + } |
1002 | + return nil |
1003 | +} |
1004 | + |
1005 | +// Stop kills the server. It is a no-op if the server is not running. |
1006 | +func (s *Service) Stop() os.Error { |
1007 | + p, err := s.Process() |
1008 | + if p == nil { |
1009 | + if err != nil { |
1010 | + return fmt.Errorf("cannot read process ID of server: %v", err) |
1011 | + } |
1012 | + return nil |
1013 | + } |
1014 | + defer p.Release() |
1015 | + if err := p.Kill(); err != nil && !strings.Contains(err.String(), "no such process") { |
1016 | + return fmt.Errorf("cannot kill server process: %v", err) |
1017 | + } |
1018 | + // ignore the error returned from Wait because there's little |
1019 | + // we can do about it - it either means that the process has just exited |
1020 | + // anyway or that we can't wait for it for some other reason, |
1021 | + // for example because it was originally started by some other process. |
1022 | + _, err = p.Wait(0) |
1023 | + if err != nil && strings.Contains(err.String(), "no child processes") { |
1024 | + // If we can't wait for the server, it's possible that it was running |
1025 | + // but not as a child of this process, so give it a little while |
1026 | + // to exit. TODO poll with kill(no signal)? |
1027 | + time.Sleep(0.5e9) |
1028 | + } |
1029 | + |
1030 | + if err := os.Remove(s.path("pid.txt")); err != nil { |
1031 | + return fmt.Errorf("cannot remove server process ID file: %v", err) |
1032 | + } |
1033 | + return nil |
1034 | +} |
1035 | + |
1036 | +// Destroy stops the server, and then removes its |
1037 | +// directory and all its contents. |
1038 | +// Warning: this will destroy all data associated with the server. |
1039 | +func (s *Service) Destroy() os.Error { |
1040 | + if err := s.Stop(); err != nil { |
1041 | + return err |
1042 | + } |
1043 | + if err := os.RemoveAll(s.srv.Directory()); err != nil { |
1044 | + return err |
1045 | + } |
1046 | + return nil |
1047 | +} |
1048 | |
1049 | === modified file 'suite_test.go' |
1050 | --- suite_test.go 2011-08-19 01:43:37 +0000 |
1051 | +++ suite_test.go 2011-10-03 17:02:23 +0000 |
1052 | @@ -1,65 +1,48 @@ |
1053 | -package gozk_test |
1054 | +package zk_test |
1055 | |
1056 | import ( |
1057 | . "launchpad.net/gocheck" |
1058 | - "testing" |
1059 | - "io/ioutil" |
1060 | - "path" |
1061 | "fmt" |
1062 | + "launchpad.net/gozk/zk/service" |
1063 | + "launchpad.net/gozk/zk" |
1064 | "os" |
1065 | - "gozk" |
1066 | + "testing" |
1067 | "time" |
1068 | ) |
1069 | |
1070 | func TestAll(t *testing.T) { |
1071 | - TestingT(t) |
1072 | + if !*reattach { |
1073 | + TestingT(t) |
1074 | + } |
1075 | } |
1076 | |
1077 | var _ = Suite(&S{}) |
1078 | |
1079 | type S struct { |
1080 | - zkRoot string |
1081 | - zkTestRoot string |
1082 | - zkTestPort int |
1083 | - zkServerSh string |
1084 | - zkServerOut *os.File |
1085 | - zkAddr string |
1086 | + zkServer *service.Service |
1087 | + zkTestRoot string |
1088 | + zkAddr string |
1089 | |
1090 | - handles []*gozk.ZooKeeper |
1091 | - events []*gozk.Event |
1092 | + handles []*zk.Conn |
1093 | + events []*zk.Event |
1094 | liveWatches int |
1095 | deadWatches chan bool |
1096 | } |
1097 | |
1098 | -var logLevel = 0 //gozk.LOG_ERROR |
1099 | - |
1100 | - |
1101 | -var testZooCfg = ("dataDir=%s\n" + |
1102 | - "clientPort=%d\n" + |
1103 | - "tickTime=2000\n" + |
1104 | - "initLimit=10\n" + |
1105 | - "syncLimit=5\n" + |
1106 | - "") |
1107 | - |
1108 | -var testLog4jPrp = ("log4j.rootLogger=INFO,CONSOLE\n" + |
1109 | - "log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender\n" + |
1110 | - "log4j.appender.CONSOLE.Threshold=DEBUG\n" + |
1111 | - "log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout\n" + |
1112 | - "log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n" + |
1113 | - "") |
1114 | - |
1115 | -func (s *S) init(c *C) (*gozk.ZooKeeper, chan gozk.Event) { |
1116 | - zk, watch, err := gozk.Init(s.zkAddr, 5e9) |
1117 | +var logLevel = 0 //zk.LOG_ERROR |
1118 | + |
1119 | +func (s *S) init(c *C) (*zk.Conn, chan zk.Event) { |
1120 | + conn, watch, err := zk.Dial(s.zkAddr, 5e9) |
1121 | c.Assert(err, IsNil) |
1122 | |
1123 | - s.handles = append(s.handles, zk) |
1124 | + s.handles = append(s.handles, conn) |
1125 | |
1126 | event := <-watch |
1127 | |
1128 | - c.Assert(event.Type, Equals, gozk.EVENT_SESSION) |
1129 | - c.Assert(event.State, Equals, gozk.STATE_CONNECTED) |
1130 | + c.Assert(event.Type, Equals, zk.EVENT_SESSION) |
1131 | + c.Assert(event.State, Equals, zk.STATE_CONNECTED) |
1132 | |
1133 | - bufferedWatch := make(chan gozk.Event, 256) |
1134 | + bufferedWatch := make(chan zk.Event, 256) |
1135 | bufferedWatch <- event |
1136 | |
1137 | s.liveWatches += 1 |
1138 | @@ -82,13 +65,13 @@ |
1139 | s.deadWatches <- true |
1140 | }() |
1141 | |
1142 | - return zk, bufferedWatch |
1143 | + return conn, bufferedWatch |
1144 | } |
1145 | |
1146 | func (s *S) SetUpTest(c *C) { |
1147 | - c.Assert(gozk.CountPendingWatches(), Equals, 0, |
1148 | + c.Assert(zk.CountPendingWatches(), Equals, 0, |
1149 | Bug("Test got a dirty watch state before running!")) |
1150 | - gozk.SetLogLevel(logLevel) |
1151 | + zk.SetLogLevel(logLevel) |
1152 | } |
1153 | |
1154 | func (s *S) TearDownTest(c *C) { |
1155 | @@ -108,98 +91,38 @@ |
1156 | } |
1157 | |
1158 | // Reset the list of handles. |
1159 | - s.handles = make([]*gozk.ZooKeeper, 0) |
1160 | + s.handles = make([]*zk.Conn, 0) |
1161 | |
1162 | - c.Assert(gozk.CountPendingWatches(), Equals, 0, |
1163 | + c.Assert(zk.CountPendingWatches(), Equals, 0, |
1164 | Bug("Test left live watches behind!")) |
1165 | } |
1166 | |
1167 | -// We use the suite set up and tear down to manage a custom zookeeper |
1168 | +// We use the suite set up and tear down to manage a custom ZooKeeper |
1169 | // |
1170 | func (s *S) SetUpSuite(c *C) { |
1171 | - |
1172 | + var err os.Error |
1173 | s.deadWatches = make(chan bool) |
1174 | |
1175 | - var err os.Error |
1176 | - |
1177 | - s.zkRoot = os.Getenv("ZKROOT") |
1178 | - if s.zkRoot == "" { |
1179 | - panic("You must define $ZKROOT ($ZKROOT/bin/zkServer.sh is needed)") |
1180 | - } |
1181 | - |
1182 | - s.zkTestRoot = c.MkDir() |
1183 | - s.zkTestPort = 21812 |
1184 | - |
1185 | - println("ZooKeeper test server directory:", s.zkTestRoot) |
1186 | - println("ZooKeeper test server port:", s.zkTestPort) |
1187 | - |
1188 | - s.zkAddr = fmt.Sprintf("localhost:%d", s.zkTestPort) |
1189 | - |
1190 | - s.zkServerSh = path.Join(s.zkRoot, "bin/zkServer.sh") |
1191 | - s.zkServerOut, err = os.OpenFile(path.Join(s.zkTestRoot, "stdout.txt"), |
1192 | - os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) |
1193 | - if err != nil { |
1194 | - panic("Can't open stdout.txt file for server: " + err.String()) |
1195 | - } |
1196 | - |
1197 | - dataDir := path.Join(s.zkTestRoot, "data") |
1198 | - confDir := path.Join(s.zkTestRoot, "conf") |
1199 | - |
1200 | - os.Mkdir(dataDir, 0755) |
1201 | - os.Mkdir(confDir, 0755) |
1202 | - |
1203 | - err = os.Setenv("ZOOCFGDIR", confDir) |
1204 | - if err != nil { |
1205 | - panic("Can't set $ZOOCFGDIR: " + err.String()) |
1206 | - } |
1207 | - |
1208 | - zooCfg := []byte(fmt.Sprintf(testZooCfg, dataDir, s.zkTestPort)) |
1209 | - err = ioutil.WriteFile(path.Join(confDir, "zoo.cfg"), zooCfg, 0644) |
1210 | - if err != nil { |
1211 | - panic("Can't write zoo.cfg: " + err.String()) |
1212 | - } |
1213 | - |
1214 | - log4jPrp := []byte(testLog4jPrp) |
1215 | - err = ioutil.WriteFile(path.Join(confDir, "log4j.properties"), log4jPrp, 0644) |
1216 | - if err != nil { |
1217 | - panic("Can't write log4j.properties: " + err.String()) |
1218 | - } |
1219 | - |
1220 | - s.StartZK() |
1221 | + // N.B. We need to create a subdirectory because zk.CreateServer |
1222 | + // insists on creating its own directory. |
1223 | + s.zkTestRoot = c.MkDir() + "/zk" |
1224 | + |
1225 | + port := 21812 |
1226 | + s.zkAddr = fmt.Sprint("localhost:", port) |
1227 | + |
1228 | + srv, err := zk.CreateServer(port, s.zkTestRoot, "") |
1229 | + if err != nil { |
1230 | + c.Fatal("Cannot set up server environment: ", err) |
1231 | + } |
1232 | + s.zkServer = service.New(srv) |
1233 | + err = s.zkServer.Start() |
1234 | + if err != nil { |
1235 | + c.Fatal("Cannot start ZooKeeper server: ", err) |
1236 | + } |
1237 | } |
1238 | |
1239 | func (s *S) TearDownSuite(c *C) { |
1240 | - s.StopZK() |
1241 | - s.zkServerOut.Close() |
1242 | -} |
1243 | - |
1244 | -func (s *S) StartZK() { |
1245 | - attr := os.ProcAttr{Files: []*os.File{os.Stdin, s.zkServerOut, os.Stderr}} |
1246 | - proc, err := os.StartProcess(s.zkServerSh, []string{s.zkServerSh, "start"}, &attr) |
1247 | - if err != nil { |
1248 | - panic("Problem executing zkServer.sh start: " + err.String()) |
1249 | - } |
1250 | - |
1251 | - result, err := proc.Wait(0) |
1252 | - if err != nil { |
1253 | - panic(err.String()) |
1254 | - } else if result.ExitStatus() != 0 { |
1255 | - panic("'zkServer.sh start' exited with non-zero status") |
1256 | - } |
1257 | -} |
1258 | - |
1259 | -func (s *S) StopZK() { |
1260 | - attr := os.ProcAttr{Files: []*os.File{os.Stdin, s.zkServerOut, os.Stderr}} |
1261 | - proc, err := os.StartProcess(s.zkServerSh, []string{s.zkServerSh, "stop"}, &attr) |
1262 | - if err != nil { |
1263 | - panic("Problem executing zkServer.sh stop: " + err.String() + |
1264 | - " (look for runaway java processes!)") |
1265 | - } |
1266 | - result, err := proc.Wait(0) |
1267 | - if err != nil { |
1268 | - panic(err.String()) |
1269 | - } else if result.ExitStatus() != 0 { |
1270 | - panic("'zkServer.sh stop' exited with non-zero status " + |
1271 | - "(look for runaway java processes!)") |
1272 | + if s.zkServer != nil { |
1273 | + s.zkServer.Stop() // TODO Destroy |
1274 | } |
1275 | } |
1276 | |
1277 | === renamed file 'gozk.go' => 'zk.go' |
1278 | --- gozk.go 2011-08-19 01:56:39 +0000 |
1279 | +++ zk.go 2011-10-03 17:02:23 +0000 |
1280 | @@ -1,4 +1,4 @@ |
1281 | -// gozk - Zookeeper support for the Go language |
1282 | +// gozk - ZooKeeper support for the Go language |
1283 | // |
1284 | // https://wiki.ubuntu.com/gozk |
1285 | // |
1286 | @@ -6,7 +6,7 @@ |
1287 | // |
1288 | // Written by Gustavo Niemeyer <gustavo.niemeyer@canonical.com> |
1289 | // |
1290 | -package gozk |
1291 | +package zk |
1292 | |
1293 | /* |
1294 | #cgo CFLAGS: -I/usr/include/c-client-src |
1295 | @@ -27,17 +27,16 @@ |
1296 | // ----------------------------------------------------------------------- |
1297 | // Main constants and data types. |
1298 | |
1299 | -// The main ZooKeeper object, created through the Init function. |
1300 | -// Encapsulates all communication with ZooKeeper. |
1301 | -type ZooKeeper struct { |
1302 | +// Conn represents a connection to a set of ZooKeeper nodes. |
1303 | +type Conn struct { |
1304 | watchChannels map[uintptr]chan Event |
1305 | sessionWatchId uintptr |
1306 | handle *C.zhandle_t |
1307 | mutex sync.Mutex |
1308 | } |
1309 | |
1310 | -// ClientId represents the established session in ZooKeeper. This is only |
1311 | -// useful to be passed back into the ReInit function. |
1312 | +// ClientId represents an established ZooKeeper session. It can be |
1313 | +// passed into Redial to reestablish a connection to an existing session. |
1314 | type ClientId struct { |
1315 | cId C.clientid_t |
1316 | } |
1317 | @@ -59,25 +58,21 @@ |
1318 | // through one of the W-suffixed functions (GetW, ExistsW, etc). |
1319 | // |
1320 | // The session channel will only receive session-level events notifying |
1321 | -// about critical and transient changes in the ZooKeeper connection |
1322 | -// state (STATE_CONNECTED, STATE_EXPIRED_SESSION, etc). On long |
1323 | -// running applications the session channel must *necessarily* be |
1324 | -// observed since certain events like session expirations require an |
1325 | -// explicit reconnection and reestablishment of state (or bailing out). |
1326 | -// Because of that, the buffer used on the session channel has a limited |
1327 | -// size, and a panic will occur if too many events are not collected. |
1328 | +// about changes in the ZooKeeper connection state (STATE_CONNECTED, |
1329 | +// STATE_EXPIRED_SESSION, etc). On long running applications the session |
1330 | +// channel must *necessarily* be observed since certain events like session |
1331 | +// expirations require an explicit reconnection and reestablishment of |
1332 | +// state (or bailing out). Because of that, the buffer used on the session |
1333 | +// channel has a limited size, and a panic will occur if too many events |
1334 | +// are not collected. |
1335 | // |
1336 | // Watch channels enable monitoring state for nodes, and the |
1337 | // moment they're fired depends on which function was called to |
1338 | -// create them. Note that, unlike in other ZooKeeper interfaces, |
1339 | -// gozk will NOT dispatch unimportant session events such as |
1340 | -// STATE_ASSOCIATING, STATE_CONNECTING and STATE_CONNECTED to |
1341 | -// watch Event channels, since they are transient and disruptive |
1342 | -// to the workflow. Critical state changes such as expirations |
1343 | -// are still delivered to all event channels, though, and the |
1344 | -// transient events may be obsererved in the session channel. |
1345 | +// create them. Two critical session events, STATE_EXPIRED_SESSION |
1346 | +// and STATE_AUTH_FAILED, will be delivered to all |
1347 | +// node watch channels if they occur. |
1348 | // |
1349 | -// Since every watch channel may receive critical session events, events |
1350 | +// Since every watch channel may receive these critical session events, events |
1351 | // received must not be handled blindly as if the watch requested has |
1352 | // been fired. To facilitate such tests, Events offer the Ok method, |
1353 | // and they also have a good String method so they may be used as an |
1354 | @@ -89,44 +84,78 @@ |
1355 | // return |
1356 | // } |
1357 | // |
1358 | -// Note that closed channels will deliver zeroed Event, which means |
1359 | +// Note that closed channels will deliver a zeroed Event, which means |
1360 | // event.Type is set to EVENT_CLOSED and event.State is set to STATE_CLOSED, |
1361 | // to facilitate handling. |
1362 | type Event struct { |
1363 | - Type int |
1364 | - Path string |
1365 | + // Type gives the type of event (one of the EVENT_* constants). |
1366 | + // If Type is EVENT_SESSION, then the event is a session |
1367 | + // event. |
1368 | + Type int |
1369 | + |
1370 | + // For non-session events, Path gives the path of the node |
1371 | + // that was being watched. |
1372 | + Path string |
1373 | + |
1374 | + // For session events, State (one of the STATE_* constants) gives the session |
1375 | + // status. |
1376 | State int |
1377 | } |
1378 | |
1379 | -// Error codes that may be used to verify the result of the |
1380 | -// Code method from Error. |
1381 | +// Error represents a ZooKeeper error. |
1382 | +type Error int |
1383 | + |
1384 | const ( |
1385 | - ZOK = C.ZOK |
1386 | - ZSYSTEMERROR = C.ZSYSTEMERROR |
1387 | - ZRUNTIMEINCONSISTENCY = C.ZRUNTIMEINCONSISTENCY |
1388 | - ZDATAINCONSISTENCY = C.ZDATAINCONSISTENCY |
1389 | - ZCONNECTIONLOSS = C.ZCONNECTIONLOSS |
1390 | - ZMARSHALLINGERROR = C.ZMARSHALLINGERROR |
1391 | - ZUNIMPLEMENTED = C.ZUNIMPLEMENTED |
1392 | - ZOPERATIONTIMEOUT = C.ZOPERATIONTIMEOUT |
1393 | - ZBADARGUMENTS = C.ZBADARGUMENTS |
1394 | - ZINVALIDSTATE = C.ZINVALIDSTATE |
1395 | - ZAPIERROR = C.ZAPIERROR |
1396 | - ZNONODE = C.ZNONODE |
1397 | - ZNOAUTH = C.ZNOAUTH |
1398 | - ZBADVERSION = C.ZBADVERSION |
1399 | - ZNOCHILDRENFOREPHEMERALS = C.ZNOCHILDRENFOREPHEMERALS |
1400 | - ZNODEEXISTS = C.ZNODEEXISTS |
1401 | - ZNOTEMPTY = C.ZNOTEMPTY |
1402 | - ZSESSIONEXPIRED = C.ZSESSIONEXPIRED |
1403 | - ZINVALIDCALLBACK = C.ZINVALIDCALLBACK |
1404 | - ZINVALIDACL = C.ZINVALIDACL |
1405 | - ZAUTHFAILED = C.ZAUTHFAILED |
1406 | - ZCLOSING = C.ZCLOSING |
1407 | - ZNOTHING = C.ZNOTHING |
1408 | - ZSESSIONMOVED = C.ZSESSIONMOVED |
1409 | + ZOK Error = C.ZOK |
1410 | + ZSYSTEMERROR Error = C.ZSYSTEMERROR |
1411 | + ZRUNTIMEINCONSISTENCY Error = C.ZRUNTIMEINCONSISTENCY |
1412 | + ZDATAINCONSISTENCY Error = C.ZDATAINCONSISTENCY |
1413 | + ZCONNECTIONLOSS Error = C.ZCONNECTIONLOSS |
1414 | + ZMARSHALLINGERROR Error = C.ZMARSHALLINGERROR |
1415 | + ZUNIMPLEMENTED Error = C.ZUNIMPLEMENTED |
1416 | + ZOPERATIONTIMEOUT Error = C.ZOPERATIONTIMEOUT |
1417 | + ZBADARGUMENTS Error = C.ZBADARGUMENTS |
1418 | + ZINVALIDSTATE Error = C.ZINVALIDSTATE |
1419 | + ZAPIERROR Error = C.ZAPIERROR |
1420 | + ZNONODE Error = C.ZNONODE |
1421 | + ZNOAUTH Error = C.ZNOAUTH |
1422 | + ZBADVERSION Error = C.ZBADVERSION |
1423 | + ZNOCHILDRENFOREPHEMERALS Error = C.ZNOCHILDRENFOREPHEMERALS |
1424 | + ZNODEEXISTS Error = C.ZNODEEXISTS |
1425 | + ZNOTEMPTY Error = C.ZNOTEMPTY |
1426 | + ZSESSIONEXPIRED Error = C.ZSESSIONEXPIRED |
1427 | + ZINVALIDCALLBACK Error = C.ZINVALIDCALLBACK |
1428 | + ZINVALIDACL Error = C.ZINVALIDACL |
1429 | + ZAUTHFAILED Error = C.ZAUTHFAILED |
1430 | + ZCLOSING Error = C.ZCLOSING |
1431 | + ZNOTHING Error = C.ZNOTHING |
1432 | + ZSESSIONMOVED Error = C.ZSESSIONMOVED |
1433 | ) |
1434 | |
1435 | +func (error Error) String() string { |
1436 | + return C.GoString(C.zerror(C.int(error))) // Static, no need to free it. |
1437 | +} |
1438 | + |
1439 | +// zkError creates an appropriate error return from |
1440 | +// a ZooKeeper status and the errno return from a C API |
1441 | +// call. |
1442 | +func zkError(rc C.int, cerr os.Error) os.Error { |
1443 | + code := Error(rc) |
1444 | + switch code { |
1445 | + case ZOK: |
1446 | + return nil |
1447 | + |
1448 | + case ZSYSTEMERROR: |
1449 | + // If a ZooKeeper call returns ZSYSTEMERROR, then |
1450 | + // errno becomes significant. If errno has not been |
1451 | + // set, then we will return ZSYSTEMERROR nonetheless. |
1452 | + if cerr != nil { |
1453 | + return cerr |
1454 | + } |
1455 | + } |
1456 | + return code |
1457 | +} |
1458 | + |
1459 | // Constants for SetLogLevel. |
1460 | const ( |
1461 | LOG_ERROR = C.ZOO_LOG_LEVEL_ERROR |
1462 | @@ -141,8 +170,8 @@ |
1463 | |
1464 | // Constants for Create's flags parameter. |
1465 | const ( |
1466 | - EPHEMERAL = 1 << iota |
1467 | - SEQUENCE |
1468 | + EPHEMERAL = 1 << iota // The node will be deleted when the current session ends. |
1469 | + SEQUENCE // Append a sequence number to the node name. |
1470 | ) |
1471 | |
1472 | // Constants for ACL Perms. |
1473 | @@ -273,104 +302,79 @@ |
1474 | } |
1475 | |
1476 | // ----------------------------------------------------------------------- |
1477 | -// Error interface which maps onto the ZooKeeper error codes. |
1478 | - |
1479 | -type Error interface { |
1480 | - String() string |
1481 | - Code() int |
1482 | -} |
1483 | - |
1484 | -type errorType struct { |
1485 | - zkrc C.int |
1486 | - err os.Error |
1487 | -} |
1488 | - |
1489 | -func newError(zkrc C.int, err os.Error) Error { |
1490 | - return &errorType{zkrc, err} |
1491 | -} |
1492 | - |
1493 | -func (error *errorType) String() (result string) { |
1494 | - if error.zkrc == ZSYSTEMERROR && error.err != nil { |
1495 | - result = error.err.String() |
1496 | - } else { |
1497 | - result = C.GoString(C.zerror(error.zkrc)) // Static, no need to free it. |
1498 | - } |
1499 | - return |
1500 | -} |
1501 | - |
1502 | -// Code returns the error code that may be compared against one of |
1503 | -// the gozk.Z* constants. |
1504 | -func (error *errorType) Code() int { |
1505 | - return int(error.zkrc) |
1506 | -} |
1507 | - |
1508 | -// ----------------------------------------------------------------------- |
1509 | -// Stat interface which maps onto the ZooKeeper Stat struct. |
1510 | - |
1511 | -// We declare this as an interface rather than an actual struct because |
1512 | -// this way we don't have to copy data around between the real C struct |
1513 | -// and the Go one on every call. Most uses will only touch a few elements, |
1514 | -// or even ignore the stat entirely, so that's a win. |
1515 | |
1516 | // Stat contains detailed information about a node. |
1517 | -type Stat interface { |
1518 | - Czxid() int64 |
1519 | - Mzxid() int64 |
1520 | - CTime() int64 |
1521 | - MTime() int64 |
1522 | - Version() int32 |
1523 | - CVersion() int32 |
1524 | - AVersion() int32 |
1525 | - EphemeralOwner() int64 |
1526 | - DataLength() int32 |
1527 | - NumChildren() int32 |
1528 | - Pzxid() int64 |
1529 | -} |
1530 | - |
1531 | -type resultStat C.struct_Stat |
1532 | - |
1533 | -func (stat *resultStat) Czxid() int64 { |
1534 | - return int64(stat.czxid) |
1535 | -} |
1536 | - |
1537 | -func (stat *resultStat) Mzxid() int64 { |
1538 | - return int64(stat.mzxid) |
1539 | -} |
1540 | - |
1541 | -func (stat *resultStat) CTime() int64 { |
1542 | - return int64(stat.ctime) |
1543 | -} |
1544 | - |
1545 | -func (stat *resultStat) MTime() int64 { |
1546 | - return int64(stat.mtime) |
1547 | -} |
1548 | - |
1549 | -func (stat *resultStat) Version() int32 { |
1550 | - return int32(stat.version) |
1551 | -} |
1552 | - |
1553 | -func (stat *resultStat) CVersion() int32 { |
1554 | - return int32(stat.cversion) |
1555 | -} |
1556 | - |
1557 | -func (stat *resultStat) AVersion() int32 { |
1558 | - return int32(stat.aversion) |
1559 | -} |
1560 | - |
1561 | -func (stat *resultStat) EphemeralOwner() int64 { |
1562 | - return int64(stat.ephemeralOwner) |
1563 | -} |
1564 | - |
1565 | -func (stat *resultStat) DataLength() int32 { |
1566 | - return int32(stat.dataLength) |
1567 | -} |
1568 | - |
1569 | -func (stat *resultStat) NumChildren() int32 { |
1570 | - return int32(stat.numChildren) |
1571 | -} |
1572 | - |
1573 | -func (stat *resultStat) Pzxid() int64 { |
1574 | - return int64(stat.pzxid) |
1575 | +type Stat struct { |
1576 | + c C.struct_Stat |
1577 | +} |
1578 | + |
1579 | +func (stat *Stat) String() string { |
1580 | + return fmt.Sprintf( |
1581 | + "{Czxid: %d; Mzxid: %d; CTime: %d; MTime: %d; "+ |
1582 | + "Version: %d; CVersion: %d; AVersion: %d; "+ |
1583 | + "EphemeralOwner: %d; DataLength: %d; "+ |
1584 | + "NumChildren: %d; Pzxid: %d}", |
1585 | + stat.Czxid(), stat.Mzxid(), stat.CTime(), stat.MTime(), |
1586 | + stat.Version(), stat.CVersion(), stat.AVersion(), |
1587 | + stat.EphemeralOwner(), stat.DataLength(), |
1588 | + stat.NumChildren(), stat.Pzxid(), |
1589 | + ) |
1590 | +} |
1591 | + |
1592 | +// Czxid returns the zxid of the change that caused the node to be created. |
1593 | +func (stat *Stat) Czxid() int64 { |
1594 | + return int64(stat.c.czxid) |
1595 | +} |
1596 | + |
1597 | +// Mzxid returns the zxid of the change that last modified the node. |
1598 | +func (stat *Stat) Mzxid() int64 { |
1599 | + return int64(stat.c.mzxid) |
1600 | +} |
1601 | + |
1602 | +// CTime returns the time in milliseconds from epoch when the node was created. |
1603 | +func (stat *Stat) CTime() int64 { |
1604 | + return int64(stat.c.ctime) |
1605 | +} |
1606 | + |
1607 | +// MTime returns the time in milliseconds from epoch when the node was last modified. |
1608 | +func (stat *Stat) MTime() int64 { |
1609 | + return int64(stat.c.mtime) |
1610 | +} |
1611 | + |
1612 | +// Version returns the number of changes to the data of the node. |
1613 | +func (stat *Stat) Version() int32 { |
1614 | + return int32(stat.c.version) |
1615 | +} |
1616 | + |
1617 | +// CVersion returns the number of changes to the children of the node. |
1618 | +func (stat *Stat) CVersion() int32 { |
1619 | + return int32(stat.c.cversion) |
1620 | +} |
1621 | + |
1622 | +// AVersion returns the number of changes to the ACL of the node. |
1623 | +func (stat *Stat) AVersion() int32 { |
1624 | + return int32(stat.c.aversion) |
1625 | +} |
1626 | + |
1627 | +// If the node is an ephemeral node, EphemeralOwner returns the session id |
1628 | +// of the owner of the node; otherwise it will return zero. |
1629 | +func (stat *Stat) EphemeralOwner() int64 { |
1630 | + return int64(stat.c.ephemeralOwner) |
1631 | +} |
1632 | + |
1633 | +// DataLength returns the length of the data in the node in bytes. |
1634 | +func (stat *Stat) DataLength() int32 { |
1635 | + return int32(stat.c.dataLength) |
1636 | +} |
1637 | + |
1638 | +// NumChildren returns the number of children of the znode. |
1639 | +func (stat *Stat) NumChildren() int32 { |
1640 | + return int32(stat.c.numChildren) |
1641 | +} |
1642 | + |
1643 | +// Pzxid returns the zxid of the change that last modified the children of the node. |
1644 | +func (stat *Stat) Pzxid() int64 { |
1645 | + return int64(stat.c.pzxid) |
1646 | } |
1647 | |
1648 | // ----------------------------------------------------------------------- |
1649 | @@ -380,11 +384,13 @@ |
1650 | |
1651 | // SetLogLevel changes the minimum level of logging output generated |
1652 | // to adjust the amount of information provided. |
1653 | +// The default is LOG_ERROR. If level is zero, no logging output |
1654 | +// will be generated. |
1655 | func SetLogLevel(level int) { |
1656 | C.zoo_set_debug_level(C.ZooLogLevel(level)) |
1657 | } |
1658 | |
1659 | -// Init initializes the communication with a ZooKeeper cluster. The provided |
1660 | +// Dial initializes the communication with a ZooKeeper cluster. The provided |
1661 | // servers parameter may include multiple server addresses, separated |
1662 | // by commas, so that the client will automatically attempt to connect |
1663 | // to another server if one of them stops working for whatever reason. |
1664 | @@ -398,79 +404,73 @@ |
1665 | // The watch channel receives events of type SESSION_EVENT when any change |
1666 | // to the state of the established connection happens. See the documentation |
1667 | // for the Event type for more details. |
1668 | -func Init(servers string, recvTimeoutNS int64) (zk *ZooKeeper, watch chan Event, err Error) { |
1669 | - zk, watch, err = internalInit(servers, recvTimeoutNS, nil) |
1670 | - return |
1671 | +func Dial(servers string, recvTimeoutNS int64) (*Conn, <-chan Event, os.Error) { |
1672 | + return dial(servers, recvTimeoutNS, nil) |
1673 | } |
1674 | |
1675 | -// Equivalent to Init, but attempt to reestablish an existing session |
1676 | +// Redial is equivalent to Dial, but attempts to reestablish an existing session |
1677 | // identified via the clientId parameter. |
1678 | -func ReInit(servers string, recvTimeoutNS int64, clientId *ClientId) (zk *ZooKeeper, watch chan Event, err Error) { |
1679 | - zk, watch, err = internalInit(servers, recvTimeoutNS, clientId) |
1680 | - return |
1681 | +func Redial(servers string, recvTimeoutNS int64, clientId *ClientId) (*Conn, <-chan Event, os.Error) { |
1682 | + return dial(servers, recvTimeoutNS, clientId) |
1683 | } |
1684 | |
1685 | -func internalInit(servers string, recvTimeoutNS int64, clientId *ClientId) (*ZooKeeper, chan Event, Error) { |
1686 | - |
1687 | - zk := &ZooKeeper{} |
1688 | - zk.watchChannels = make(map[uintptr]chan Event) |
1689 | +func dial(servers string, recvTimeoutNS int64, clientId *ClientId) (*Conn, <-chan Event, os.Error) { |
1690 | + conn := &Conn{} |
1691 | + conn.watchChannels = make(map[uintptr]chan Event) |
1692 | |
1693 | var cId *C.clientid_t |
1694 | if clientId != nil { |
1695 | cId = &clientId.cId |
1696 | } |
1697 | |
1698 | - watchId, watchChannel := zk.createWatch(true) |
1699 | - zk.sessionWatchId = watchId |
1700 | + watchId, watchChannel := conn.createWatch(true) |
1701 | + conn.sessionWatchId = watchId |
1702 | |
1703 | cservers := C.CString(servers) |
1704 | - handle, cerr := C.zookeeper_init(cservers, C.watch_handler, C.int(recvTimeoutNS / 1e6), cId, unsafe.Pointer(watchId), 0) |
1705 | + handle, cerr := C.zookeeper_init(cservers, C.watch_handler, C.int(recvTimeoutNS/1e6), cId, unsafe.Pointer(watchId), 0) |
1706 | C.free(unsafe.Pointer(cservers)) |
1707 | if handle == nil { |
1708 | - zk.closeAllWatches() |
1709 | - return nil, nil, newError(ZSYSTEMERROR, cerr) |
1710 | + conn.closeAllWatches() |
1711 | + return nil, nil, zkError(C.int(ZSYSTEMERROR), cerr) |
1712 | } |
1713 | - zk.handle = handle |
1714 | + conn.handle = handle |
1715 | runWatchLoop() |
1716 | - return zk, watchChannel, nil |
1717 | + return conn, watchChannel, nil |
1718 | } |
1719 | |
1720 | // ClientId returns the client ID for the existing session with ZooKeeper. |
1721 | // This is useful to reestablish an existing session via Redial. |
1722 | -func (zk *ZooKeeper) ClientId() *ClientId { |
1723 | - return &ClientId{*C.zoo_client_id(zk.handle)} |
1724 | +func (conn *Conn) ClientId() *ClientId { |
1725 | + return &ClientId{*C.zoo_client_id(conn.handle)} |
1726 | } |
1727 | |
1728 | // Close terminates the ZooKeeper interaction. |
1729 | -func (zk *ZooKeeper) Close() Error { |
1730 | - |
1731 | - // Protect from concurrency around zk.handle change. |
1732 | - zk.mutex.Lock() |
1733 | - defer zk.mutex.Unlock() |
1734 | - |
1735 | - if zk.handle == nil { |
1736 | +func (conn *Conn) Close() os.Error { |
1737 | + |
1738 | + // Protect from concurrency around conn.handle change. |
1739 | + conn.mutex.Lock() |
1740 | + defer conn.mutex.Unlock() |
1741 | + |
1742 | + if conn.handle == nil { |
1743 | // ZooKeeper may hang indefinitely if a handler is closed twice, |
1744 | // so we get in the way and prevent it from happening. |
1745 | - return newError(ZCLOSING, nil) |
1746 | + return ZCLOSING |
1747 | } |
1748 | - rc, cerr := C.zookeeper_close(zk.handle) |
1749 | + rc, cerr := C.zookeeper_close(conn.handle) |
1750 | |
1751 | - zk.closeAllWatches() |
1752 | + conn.closeAllWatches() |
1753 | stopWatchLoop() |
1754 | |
1755 | - // At this point, nothing else should need zk.handle. |
1756 | - zk.handle = nil |
1757 | + // At this point, nothing else should need conn.handle. |
1758 | + conn.handle = nil |
1759 | |
1760 | - if rc != C.ZOK { |
1761 | - return newError(rc, cerr) |
1762 | - } |
1763 | - return nil |
1764 | + return zkError(rc, cerr) |
1765 | } |
1766 | |
1767 | // Get returns the data and status from an existing node. err will be nil, |
1768 | // unless an error is found. Attempting to retrieve data from a non-existing |
1769 | // node is an error. |
1770 | -func (zk *ZooKeeper) Get(path string) (data string, stat Stat, err Error) { |
1771 | +func (conn *Conn) Get(path string) (data string, stat *Stat, err os.Error) { |
1772 | |
1773 | cpath := C.CString(path) |
1774 | cbuffer := (*C.char)(C.malloc(bufferSize)) |
1775 | @@ -478,22 +478,22 @@ |
1776 | defer C.free(unsafe.Pointer(cpath)) |
1777 | defer C.free(unsafe.Pointer(cbuffer)) |
1778 | |
1779 | - cstat := C.struct_Stat{} |
1780 | - rc, cerr := C.zoo_wget(zk.handle, cpath, nil, nil, |
1781 | - cbuffer, &cbufferLen, &cstat) |
1782 | + var cstat Stat |
1783 | + rc, cerr := C.zoo_wget(conn.handle, cpath, nil, nil, cbuffer, &cbufferLen, &cstat.c) |
1784 | if rc != C.ZOK { |
1785 | - return "", nil, newError(rc, cerr) |
1786 | + return "", nil, zkError(rc, cerr) |
1787 | } |
1788 | + |
1789 | result := C.GoStringN(cbuffer, cbufferLen) |
1790 | - |
1791 | - return result, (*resultStat)(&cstat), nil |
1792 | + return result, &cstat, nil |
1793 | } |
1794 | |
1795 | // GetW works like Get but also returns a channel that will receive |
1796 | -// a single Event value when the data or existence of the given ZooKeeper |
1797 | -// node changes or when critical session events happen. See the |
1798 | -// documentation of the Event type for more details. |
1799 | -func (zk *ZooKeeper) GetW(path string) (data string, stat Stat, watch chan Event, err Error) { |
1800 | +// a single Event value, with Type EVENT_CHANGED or EVENT_DELETED, |
1801 | +// when the node contents change or it is deleted. |
1802 | +// It will also receive a critical session event if the server goes down. |
1803 | +// See the documentation of the Event type for more details. |
1804 | +func (conn *Conn) GetW(path string) (data string, stat *Stat, watch <-chan Event, err os.Error) { |
1805 | |
1806 | cpath := C.CString(path) |
1807 | cbuffer := (*C.char)(C.malloc(bufferSize)) |
1808 | @@ -501,73 +501,69 @@ |
1809 | defer C.free(unsafe.Pointer(cpath)) |
1810 | defer C.free(unsafe.Pointer(cbuffer)) |
1811 | |
1812 | - watchId, watchChannel := zk.createWatch(true) |
1813 | + watchId, watchChannel := conn.createWatch(true) |
1814 | |
1815 | - cstat := C.struct_Stat{} |
1816 | - rc, cerr := C.zoo_wget(zk.handle, cpath, |
1817 | - C.watch_handler, unsafe.Pointer(watchId), |
1818 | - cbuffer, &cbufferLen, &cstat) |
1819 | + var cstat Stat |
1820 | + rc, cerr := C.zoo_wget(conn.handle, cpath, C.watch_handler, unsafe.Pointer(watchId), cbuffer, &cbufferLen, &cstat.c) |
1821 | if rc != C.ZOK { |
1822 | - zk.forgetWatch(watchId) |
1823 | - return "", nil, nil, newError(rc, cerr) |
1824 | + conn.forgetWatch(watchId) |
1825 | + return "", nil, nil, zkError(rc, cerr) |
1826 | } |
1827 | |
1828 | result := C.GoStringN(cbuffer, cbufferLen) |
1829 | - return result, (*resultStat)(&cstat), watchChannel, nil |
1830 | + return result, &cstat, watchChannel, nil |
1831 | } |
1832 | |
1833 | // Children returns the children list and status from an existing node. |
1834 | -// err will be nil, unless an error is found. Attempting to retrieve the |
1835 | -// children list from a non-existent node is an error. |
1836 | -func (zk *ZooKeeper) Children(path string) (children []string, stat Stat, err Error) { |
1837 | +// Attempting to retrieve the children list from a non-existent node is an error. |
1838 | +func (conn *Conn) Children(path string) (children []string, stat *Stat, err os.Error) { |
1839 | |
1840 | cpath := C.CString(path) |
1841 | defer C.free(unsafe.Pointer(cpath)) |
1842 | |
1843 | cvector := C.struct_String_vector{} |
1844 | - cstat := C.struct_Stat{} |
1845 | - rc, cerr := C.zoo_wget_children2(zk.handle, cpath, nil, nil, |
1846 | - &cvector, &cstat) |
1847 | + var cstat Stat |
1848 | + rc, cerr := C.zoo_wget_children2(conn.handle, cpath, nil, nil, &cvector, &cstat.c) |
1849 | |
1850 | // Can't happen if rc != 0, but avoid potential memory leaks in the future. |
1851 | if cvector.count != 0 { |
1852 | children = parseStringVector(&cvector) |
1853 | } |
1854 | - if rc != C.ZOK { |
1855 | - err = newError(rc, cerr) |
1856 | + if rc == C.ZOK { |
1857 | + stat = &cstat |
1858 | } else { |
1859 | - stat = (*resultStat)(&cstat) |
1860 | + err = zkError(rc, cerr) |
1861 | } |
1862 | return |
1863 | } |
1864 | |
1865 | // ChildrenW works like Children but also returns a channel that will |
1866 | -// receive a single Event value when a node is added or removed under the |
1867 | -// provided path or when critical session events happen. See the documentation |
1868 | -// of the Event type for more details. |
1869 | -func (zk *ZooKeeper) ChildrenW(path string) (children []string, stat Stat, watch chan Event, err Error) { |
1870 | +// receive a single Event value when a child of the given path is |
1871 | +// added or removed (EVENT_CHILD), or when the path node itself is |
1872 | +// deleted (EVENT_DELETED). |
1873 | +// It will also receive a critical session event if the server goes down. |
1874 | +// See the documentation of the Event type for more details. |
1875 | +func (conn *Conn) ChildrenW(path string) (children []string, stat *Stat, watch <-chan Event, err os.Error) { |
1876 | |
1877 | cpath := C.CString(path) |
1878 | defer C.free(unsafe.Pointer(cpath)) |
1879 | |
1880 | - watchId, watchChannel := zk.createWatch(true) |
1881 | + watchId, watchChannel := conn.createWatch(true) |
1882 | |
1883 | cvector := C.struct_String_vector{} |
1884 | - cstat := C.struct_Stat{} |
1885 | - rc, cerr := C.zoo_wget_children2(zk.handle, cpath, |
1886 | - C.watch_handler, unsafe.Pointer(watchId), |
1887 | - &cvector, &cstat) |
1888 | + var cstat Stat |
1889 | + rc, cerr := C.zoo_wget_children2(conn.handle, cpath, C.watch_handler, unsafe.Pointer(watchId), &cvector, &cstat.c) |
1890 | |
1891 | // Can't happen if rc != 0, but avoid potential memory leaks in the future. |
1892 | if cvector.count != 0 { |
1893 | children = parseStringVector(&cvector) |
1894 | } |
1895 | - if rc != C.ZOK { |
1896 | - zk.forgetWatch(watchId) |
1897 | - err = newError(rc, cerr) |
1898 | + if rc == C.ZOK { |
1899 | + stat = &cstat |
1900 | + watch = watchChannel |
1901 | } else { |
1902 | - stat = (*resultStat)(&cstat) |
1903 | - watch = watchChannel |
1904 | + conn.forgetWatch(watchId) |
1905 | + err = zkError(rc, cerr) |
1906 | } |
1907 | return |
1908 | } |
1909 | @@ -588,51 +584,51 @@ |
1910 | // Exists checks if a node exists at the given path. If it does, |
1911 | // stat will contain meta information on the existing node, otherwise |
1912 | // it will be nil. |
1913 | -func (zk *ZooKeeper) Exists(path string) (stat Stat, err Error) { |
1914 | +func (conn *Conn) Exists(path string) (stat *Stat, err os.Error) { |
1915 | cpath := C.CString(path) |
1916 | defer C.free(unsafe.Pointer(cpath)) |
1917 | |
1918 | - cstat := C.struct_Stat{} |
1919 | - rc, cerr := C.zoo_wexists(zk.handle, cpath, nil, nil, &cstat) |
1920 | + var cstat Stat |
1921 | + rc, cerr := C.zoo_wexists(conn.handle, cpath, nil, nil, &stat.c) |
1922 | |
1923 | // We diverge a bit from the usual here: a ZNONODE is not an error |
1924 | // for an exists call, otherwise every Exists call would have to check |
1925 | // for err != nil and err.Code() != ZNONODE. |
1926 | if rc == C.ZOK { |
1927 | - stat = (*resultStat)(&cstat) |
1928 | + stat = &cstat |
1929 | } else if rc != C.ZNONODE { |
1930 | - err = newError(rc, cerr) |
1931 | + err = zkError(rc, cerr) |
1932 | } |
1933 | return |
1934 | } |
1935 | |
1936 | // ExistsW works like Exists but also returns a channel that will |
1937 | -// receive an Event value when a node is created in case the returned |
1938 | -// stat is nil and the node didn't exist, or when the existing node |
1939 | -// is removed. It will also receive critical session events. See the |
1940 | -// documentation of the Event type for more details. |
1941 | -func (zk *ZooKeeper) ExistsW(path string) (stat Stat, watch chan Event, err Error) { |
1942 | +// receive a single Event, with Type EVENT_CREATED, EVENT_DELETED |
1943 | +// or EVENT_CHANGED, when the node is next created, removed, |
1944 | +// or its contents change. |
1945 | +// It will also receive a critical session event if the server goes down. |
1946 | +// See the documentation of the Event type for more details. |
1947 | +func (conn *Conn) ExistsW(path string) (stat *Stat, watch <-chan Event, err os.Error) { |
1948 | cpath := C.CString(path) |
1949 | defer C.free(unsafe.Pointer(cpath)) |
1950 | |
1951 | - watchId, watchChannel := zk.createWatch(true) |
1952 | + watchId, watchChannel := conn.createWatch(true) |
1953 | |
1954 | - cstat := C.struct_Stat{} |
1955 | - rc, cerr := C.zoo_wexists(zk.handle, cpath, |
1956 | - C.watch_handler, unsafe.Pointer(watchId), &cstat) |
1957 | + var cstat Stat |
1958 | + rc, cerr := C.zoo_wexists(conn.handle, cpath, C.watch_handler, unsafe.Pointer(watchId), &cstat.c) |
1959 | |
1960 | // We diverge a bit from the usual here: a ZNONODE is not an error |
1961 | // for an exists call, otherwise every Exists call would have to check |
1962 | // for err != nil and err.Code() != ZNONODE. |
1963 | - switch rc { |
1964 | + switch Error(rc) { |
1965 | case ZOK: |
1966 | - stat = (*resultStat)(&cstat) |
1967 | + stat = &cstat |
1968 | watch = watchChannel |
1969 | case ZNONODE: |
1970 | watch = watchChannel |
1971 | default: |
1972 | - zk.forgetWatch(watchId) |
1973 | - err = newError(rc, cerr) |
1974 | + conn.forgetWatch(watchId) |
1975 | + err = zkError(rc, cerr) |
1976 | } |
1977 | return |
1978 | } |
1979 | @@ -646,7 +642,7 @@ |
1980 | // The returned path is useful in cases where the created path may differ |
1981 | // from the requested one, such as when a sequence number is appended |
1982 | // to it due to the use of the gozk.SEQUENCE flag. |
1983 | -func (zk *ZooKeeper) Create(path, value string, flags int, aclv []ACL) (pathCreated string, err Error) { |
1984 | +func (conn *Conn) Create(path, value string, flags int, aclv []ACL) (pathCreated string, err os.Error) { |
1985 | cpath := C.CString(path) |
1986 | cvalue := C.CString(value) |
1987 | defer C.free(unsafe.Pointer(cpath)) |
1988 | @@ -660,13 +656,13 @@ |
1989 | cpathCreated := (*C.char)(C.malloc(cpathLen)) |
1990 | defer C.free(unsafe.Pointer(cpathCreated)) |
1991 | |
1992 | - rc, cerr := C.zoo_create(zk.handle, cpath, cvalue, C.int(len(value)), |
1993 | - caclv, C.int(flags), cpathCreated, C.int(cpathLen)) |
1994 | - if rc != C.ZOK { |
1995 | - return "", newError(rc, cerr) |
1996 | + rc, cerr := C.zoo_create(conn.handle, cpath, cvalue, C.int(len(value)), caclv, C.int(flags), cpathCreated, C.int(cpathLen)) |
1997 | + if rc == C.ZOK { |
1998 | + pathCreated = C.GoString(cpathCreated) |
1999 | + } else { |
2000 | + err = zkError(rc, cerr) |
2001 | } |
2002 | - |
2003 | - return C.GoString(cpathCreated), nil |
2004 | + return |
2005 | } |
2006 | |
2007 | // Set modifies the data for the existing node at the given path, replacing it |
2008 | @@ -677,35 +673,31 @@ |
2009 | // |
2010 | // It is an error to attempt to set the data of a non-existing node with |
2011 | // this function. In these cases, use Create instead. |
2012 | -func (zk *ZooKeeper) Set(path, value string, version int32) (stat Stat, err Error) { |
2013 | +func (conn *Conn) Set(path, value string, version int32) (stat *Stat, err os.Error) { |
2014 | |
2015 | cpath := C.CString(path) |
2016 | cvalue := C.CString(value) |
2017 | defer C.free(unsafe.Pointer(cpath)) |
2018 | defer C.free(unsafe.Pointer(cvalue)) |
2019 | |
2020 | - cstat := C.struct_Stat{} |
2021 | - |
2022 | - rc, cerr := C.zoo_set2(zk.handle, cpath, cvalue, C.int(len(value)), |
2023 | - C.int(version), &cstat) |
2024 | - if rc != C.ZOK { |
2025 | - return nil, newError(rc, cerr) |
2026 | + var cstat Stat |
2027 | + rc, cerr := C.zoo_set2(conn.handle, cpath, cvalue, C.int(len(value)), C.int(version), &cstat.c) |
2028 | + if rc == C.ZOK { |
2029 | + stat = &cstat |
2030 | + } else { |
2031 | + err = zkError(rc, cerr) |
2032 | } |
2033 | - |
2034 | - return (*resultStat)(&cstat), nil |
2035 | + return |
2036 | } |
2037 | |
2038 | // Delete removes the node at path. If version is not -1, the operation |
2039 | // will only succeed if the node is still at this version when the |
2040 | // node is deleted as an atomic operation. |
2041 | -func (zk *ZooKeeper) Delete(path string, version int32) (err Error) { |
2042 | +func (conn *Conn) Delete(path string, version int32) (err os.Error) { |
2043 | cpath := C.CString(path) |
2044 | defer C.free(unsafe.Pointer(cpath)) |
2045 | - rc, cerr := C.zoo_delete(zk.handle, cpath, C.int(version)) |
2046 | - if rc != C.ZOK { |
2047 | - return newError(rc, cerr) |
2048 | - } |
2049 | - return nil |
2050 | + rc, cerr := C.zoo_delete(conn.handle, cpath, C.int(version)) |
2051 | + return zkError(rc, cerr) |
2052 | } |
2053 | |
2054 | // AddAuth adds a new authentication certificate to the ZooKeeper |
2055 | @@ -713,7 +705,7 @@ |
2056 | // authentication information, while the cert parameter provides the |
2057 | // identity data itself. For instance, the "digest" scheme requires |
2058 | // a pair like "username:password" to be provided as the certificate. |
2059 | -func (zk *ZooKeeper) AddAuth(scheme, cert string) Error { |
2060 | +func (conn *Conn) AddAuth(scheme, cert string) os.Error { |
2061 | cscheme := C.CString(scheme) |
2062 | ccert := C.CString(cert) |
2063 | defer C.free(unsafe.Pointer(cscheme)) |
2064 | @@ -725,43 +717,38 @@ |
2065 | } |
2066 | defer C.destroy_completion_data(data) |
2067 | |
2068 | - rc, cerr := C.zoo_add_auth(zk.handle, cscheme, ccert, C.int(len(cert)), |
2069 | - C.handle_void_completion, unsafe.Pointer(data)) |
2070 | + rc, cerr := C.zoo_add_auth(conn.handle, cscheme, ccert, C.int(len(cert)), C.handle_void_completion, unsafe.Pointer(data)) |
2071 | if rc != C.ZOK { |
2072 | - return newError(rc, cerr) |
2073 | + return zkError(rc, cerr) |
2074 | } |
2075 | |
2076 | C.wait_for_completion(data) |
2077 | |
2078 | rc = C.int(uintptr(data.data)) |
2079 | - if rc != C.ZOK { |
2080 | - return newError(rc, nil) |
2081 | - } |
2082 | - |
2083 | - return nil |
2084 | + return zkError(rc, nil) |
2085 | } |
2086 | |
2087 | // ACL returns the access control list for path. |
2088 | -func (zk *ZooKeeper) ACL(path string) ([]ACL, Stat, Error) { |
2089 | +func (conn *Conn) ACL(path string) ([]ACL, *Stat, os.Error) { |
2090 | |
2091 | cpath := C.CString(path) |
2092 | defer C.free(unsafe.Pointer(cpath)) |
2093 | |
2094 | caclv := C.struct_ACL_vector{} |
2095 | - cstat := C.struct_Stat{} |
2096 | |
2097 | - rc, cerr := C.zoo_get_acl(zk.handle, cpath, &caclv, &cstat) |
2098 | + var cstat Stat |
2099 | + rc, cerr := C.zoo_get_acl(conn.handle, cpath, &caclv, &cstat.c) |
2100 | if rc != C.ZOK { |
2101 | - return nil, nil, newError(rc, cerr) |
2102 | + return nil, nil, zkError(rc, cerr) |
2103 | } |
2104 | |
2105 | aclv := parseACLVector(&caclv) |
2106 | |
2107 | - return aclv, (*resultStat)(&cstat), nil |
2108 | + return aclv, &cstat, nil |
2109 | } |
2110 | |
2111 | // SetACL changes the access control list for path. |
2112 | -func (zk *ZooKeeper) SetACL(path string, aclv []ACL, version int32) Error { |
2113 | +func (conn *Conn) SetACL(path string, aclv []ACL, version int32) os.Error { |
2114 | |
2115 | cpath := C.CString(path) |
2116 | defer C.free(unsafe.Pointer(cpath)) |
2117 | @@ -769,12 +756,8 @@ |
2118 | caclv := buildACLVector(aclv) |
2119 | defer C.deallocate_ACL_vector(caclv) |
2120 | |
2121 | - rc, cerr := C.zoo_set_acl(zk.handle, cpath, C.int(version), caclv) |
2122 | - if rc != C.ZOK { |
2123 | - return newError(rc, cerr) |
2124 | - } |
2125 | - |
2126 | - return nil |
2127 | + rc, cerr := C.zoo_set_acl(conn.handle, cpath, C.int(version), caclv) |
2128 | + return zkError(rc, cerr) |
2129 | } |
2130 | |
2131 | func parseACLVector(caclv *C.struct_ACL_vector) []ACL { |
2132 | @@ -822,7 +805,7 @@ |
2133 | // ----------------------------------------------------------------------- |
2134 | // RetryChange utility method. |
2135 | |
2136 | -type ChangeFunc func(oldValue string, oldStat Stat) (newValue string, err os.Error) |
2137 | +type ChangeFunc func(oldValue string, oldStat *Stat) (newValue string, err os.Error) |
2138 | |
2139 | // RetryChange runs changeFunc to attempt to atomically change path |
2140 | // in a lock free manner, and retries in case there was another |
2141 | @@ -831,8 +814,7 @@ |
2142 | // changeFunc must work correctly if called multiple times in case |
2143 | // the modification fails due to concurrent changes, and it may return |
2144 | // an error that will cause the RetryChange function to stop and |
2145 | -// return an Error with code ZSYSTEMERROR and the same .String() result |
2146 | -// as the provided error. |
2147 | +// return the same error. |
2148 | // |
2149 | // This mechanism is not suitable for a node that is frequently modified |
2150 | // concurrently. For those cases, consider using a pessimistic locking |
2151 | @@ -845,8 +827,7 @@ |
2152 | // |
2153 | // 2. Call the changeFunc with the current node value and stat, |
2154 | // or with an empty string and nil stat, if the node doesn't yet exist. |
2155 | -// If the changeFunc returns an error, stop and return an Error with |
2156 | -// ZSYSTEMERROR Code() and the same String() as the changeFunc error. |
2157 | +// If the changeFunc returns an error, stop and return the same error. |
2158 | // |
2159 | // 3. If the changeFunc returns no errors, use the string returned as |
2160 | // the new candidate value for the node, and attempt to either create |
2161 | @@ -855,36 +836,32 @@ |
2162 | // in the same node), repeat from step 1. If this procedure fails with any |
2163 | // other error, stop and return the error found. |
2164 | // |
2165 | -func (zk *ZooKeeper) RetryChange(path string, flags int, acl []ACL, changeFunc ChangeFunc) (err Error) { |
2166 | +func (conn *Conn) RetryChange(path string, flags int, acl []ACL, changeFunc ChangeFunc) os.Error { |
2167 | for { |
2168 | - oldValue, oldStat, getErr := zk.Get(path) |
2169 | - if getErr != nil && getErr.Code() != ZNONODE { |
2170 | - err = getErr |
2171 | - break |
2172 | - } |
2173 | - newValue, osErr := changeFunc(oldValue, oldStat) |
2174 | - if osErr != nil { |
2175 | - return newError(ZSYSTEMERROR, osErr) |
2176 | - } else if oldStat == nil { |
2177 | - _, err = zk.Create(path, newValue, flags, acl) |
2178 | - if err == nil || err.Code() != ZNODEEXISTS { |
2179 | - break |
2180 | + oldValue, oldStat, err := conn.Get(path) |
2181 | + if err != nil && err != ZNONODE { |
2182 | + return err |
2183 | + } |
2184 | + newValue, err := changeFunc(oldValue, oldStat) |
2185 | + if err != nil { |
2186 | + return err |
2187 | + } |
2188 | + if oldStat == nil { |
2189 | + _, err := conn.Create(path, newValue, flags, acl) |
2190 | + if err == nil || err != ZNODEEXISTS { |
2191 | + return err |
2192 | } |
2193 | - } else if newValue == oldValue { |
2194 | + continue |
2195 | + } |
2196 | + if newValue == oldValue { |
2197 | return nil // Nothing to do. |
2198 | - } else { |
2199 | - _, err = zk.Set(path, newValue, oldStat.Version()) |
2200 | - if err == nil { |
2201 | - break |
2202 | - } else { |
2203 | - code := err.Code() |
2204 | - if code != ZBADVERSION && code != ZNONODE { |
2205 | - break |
2206 | - } |
2207 | - } |
2208 | + } |
2209 | + _, err = conn.Set(path, newValue, oldStat.Version()) |
2210 | + if err == nil || (err != ZBADVERSION && err != ZNONODE) { |
2211 | + return err |
2212 | } |
2213 | } |
2214 | - return err |
2215 | + panic("not reached") |
2216 | } |
2217 | |
2218 | // ----------------------------------------------------------------------- |
2219 | @@ -897,7 +874,7 @@ |
2220 | // Whenever a *W method is called, it will return a channel which |
2221 | // outputs Event values. Internally, a map is used to maintain references |
2222 | // between an unique integer key (the watchId), and the event channel. The |
2223 | -// watchId is then handed to the C zookeeper library as the watch context, |
2224 | +// watchId is then handed to the C ZooKeeper library as the watch context, |
2225 | // so that we get it back when events happen. Using an integer key as the |
2226 | // watch context rather than a pointer is needed because there's no guarantee |
2227 | // that in the future the GC will not move objects around, and also because |
2228 | @@ -910,13 +887,13 @@ |
2229 | // Since Cgo doesn't allow calling back into Go, we actually fire a new |
2230 | // goroutine the very first time Dial is called, and allow it to block |
2231 | // in a pthread condition variable within a C function. This condition |
2232 | -// will only be notified once a zookeeper watch callback appends new |
2233 | +// will only be notified once a ZooKeeper watch callback appends new |
2234 | // entries to the event list. When this happens, the C function returns |
2235 | // and we get back into Go land with the pointer to the watch data, |
2236 | // including the watchId and other event details such as type and path. |
2237 | |
2238 | var watchMutex sync.Mutex |
2239 | -var watchZooKeepers = make(map[uintptr]*ZooKeeper) |
2240 | +var watchConns = make(map[uintptr]*Conn) |
2241 | var watchCounter uintptr |
2242 | var watchLoopCounter int |
2243 | |
2244 | @@ -925,14 +902,14 @@ |
2245 | // mostly as a debugging and testing aid. |
2246 | func CountPendingWatches() int { |
2247 | watchMutex.Lock() |
2248 | - count := len(watchZooKeepers) |
2249 | + count := len(watchConns) |
2250 | watchMutex.Unlock() |
2251 | return count |
2252 | } |
2253 | |
2254 | // createWatch creates and registers a watch, returning the watch id |
2255 | // and channel. |
2256 | -func (zk *ZooKeeper) createWatch(session bool) (watchId uintptr, watchChannel chan Event) { |
2257 | +func (conn *Conn) createWatch(session bool) (watchId uintptr, watchChannel chan Event) { |
2258 | buf := 1 // session/watch event |
2259 | if session { |
2260 | buf = 32 |
2261 | @@ -942,8 +919,8 @@ |
2262 | defer watchMutex.Unlock() |
2263 | watchId = watchCounter |
2264 | watchCounter += 1 |
2265 | - zk.watchChannels[watchId] = watchChannel |
2266 | - watchZooKeepers[watchId] = zk |
2267 | + conn.watchChannels[watchId] = watchChannel |
2268 | + watchConns[watchId] = conn |
2269 | return |
2270 | } |
2271 | |
2272 | @@ -951,21 +928,21 @@ |
2273 | // from ever getting delivered. It shouldn't be used if there's any |
2274 | // chance the watch channel is still visible and not closed, since |
2275 | // it might mean a goroutine would be blocked forever. |
2276 | -func (zk *ZooKeeper) forgetWatch(watchId uintptr) { |
2277 | +func (conn *Conn) forgetWatch(watchId uintptr) { |
2278 | watchMutex.Lock() |
2279 | defer watchMutex.Unlock() |
2280 | - zk.watchChannels[watchId] = nil, false |
2281 | - watchZooKeepers[watchId] = nil, false |
2282 | + conn.watchChannels[watchId] = nil, false |
2283 | + watchConns[watchId] = nil, false |
2284 | } |
2285 | |
2286 | -// closeAllWatches closes all watch channels for zk. |
2287 | -func (zk *ZooKeeper) closeAllWatches() { |
2288 | +// closeAllWatches closes all watch channels for conn. |
2289 | +func (conn *Conn) closeAllWatches() { |
2290 | watchMutex.Lock() |
2291 | defer watchMutex.Unlock() |
2292 | - for watchId, ch := range zk.watchChannels { |
2293 | + for watchId, ch := range conn.watchChannels { |
2294 | close(ch) |
2295 | - zk.watchChannels[watchId] = nil, false |
2296 | - watchZooKeepers[watchId] = nil, false |
2297 | + conn.watchChannels[watchId] = nil, false |
2298 | + watchConns[watchId] = nil, false |
2299 | } |
2300 | } |
2301 | |
2302 | @@ -978,11 +955,11 @@ |
2303 | } |
2304 | watchMutex.Lock() |
2305 | defer watchMutex.Unlock() |
2306 | - zk, ok := watchZooKeepers[watchId] |
2307 | + conn, ok := watchConns[watchId] |
2308 | if !ok { |
2309 | return |
2310 | } |
2311 | - if event.Type == EVENT_SESSION && watchId != zk.sessionWatchId { |
2312 | + if event.Type == EVENT_SESSION && watchId != conn.sessionWatchId { |
2313 | switch event.State { |
2314 | case STATE_EXPIRED_SESSION, STATE_AUTH_FAILED: |
2315 | default: |
2316 | @@ -990,7 +967,7 @@ |
2317 | return |
2318 | } |
2319 | } |
2320 | - ch := zk.watchChannels[watchId] |
2321 | + ch := conn.watchChannels[watchId] |
2322 | if ch == nil { |
2323 | return |
2324 | } |
2325 | @@ -1002,15 +979,15 @@ |
2326 | // straight to the buffer), and the application isn't paying |
2327 | // attention for long enough to have the buffer filled up. |
2328 | // Break down now rather than leaking forever. |
2329 | - if watchId == zk.sessionWatchId { |
2330 | + if watchId == conn.sessionWatchId { |
2331 | panic("Session event channel buffer is full") |
2332 | } else { |
2333 | panic("Watch event channel buffer is full") |
2334 | } |
2335 | } |
2336 | - if watchId != zk.sessionWatchId { |
2337 | - zk.watchChannels[watchId] = nil, false |
2338 | - watchZooKeepers[watchId] = nil, false |
2339 | + if watchId != conn.sessionWatchId { |
2340 | + conn.watchChannels[watchId] = nil, false |
2341 | + watchConns[watchId] = nil, false |
2342 | close(ch) |
2343 | } |
2344 | } |
2345 | |
2346 | === renamed file 'gozk_test.go' => 'zk_test.go' |
2347 | --- gozk_test.go 2011-08-19 01:51:37 +0000 |
2348 | +++ zk_test.go 2011-10-03 17:02:23 +0000 |
2349 | @@ -1,17 +1,17 @@ |
2350 | -package gozk_test |
2351 | +package zk_test |
2352 | |
2353 | import ( |
2354 | . "launchpad.net/gocheck" |
2355 | - "gozk" |
2356 | + "launchpad.net/gozk/zk" |
2357 | "time" |
2358 | ) |
2359 | |
2360 | // This error will be delivered via C errno, since ZK unfortunately |
2361 | // only provides the handler back from zookeeper_init(). |
2362 | func (s *S) TestInitErrorThroughErrno(c *C) { |
2363 | - zk, watch, err := gozk.Init("bad-domain-without-port", 5e9) |
2364 | - if zk != nil { |
2365 | - zk.Close() |
2366 | + conn, watch, err := zk.Dial("bad-domain-without-port", 5e9) |
2367 | + if conn != nil { |
2368 | + conn.Close() |
2369 | } |
2370 | if watch != nil { |
2371 | go func() { |
2372 | @@ -23,15 +23,15 @@ |
2373 | } |
2374 | }() |
2375 | } |
2376 | - c.Assert(zk, IsNil) |
2377 | + c.Assert(conn, IsNil) |
2378 | c.Assert(watch, IsNil) |
2379 | c.Assert(err, Matches, "invalid argument") |
2380 | } |
2381 | |
2382 | func (s *S) TestRecvTimeoutInitParameter(c *C) { |
2383 | - zk, watch, err := gozk.Init(s.zkAddr, 0) |
2384 | + conn, watch, err := zk.Dial(s.zkAddr, 0) |
2385 | c.Assert(err, IsNil) |
2386 | - defer zk.Close() |
2387 | + defer conn.Close() |
2388 | |
2389 | select { |
2390 | case <-watch: |
2391 | @@ -40,7 +40,7 @@ |
2392 | } |
2393 | |
2394 | for i := 0; i != 1000; i++ { |
2395 | - _, _, err := zk.Get("/zookeeper") |
2396 | + _, _, err := conn.Get("/zookeeper") |
2397 | if err != nil { |
2398 | c.Assert(err, Matches, "operation timeout") |
2399 | c.SucceedNow() |
2400 | @@ -51,76 +51,79 @@ |
2401 | } |
2402 | |
2403 | func (s *S) TestSessionWatches(c *C) { |
2404 | - c.Assert(gozk.CountPendingWatches(), Equals, 0) |
2405 | + c.Assert(zk.CountPendingWatches(), Equals, 0) |
2406 | |
2407 | zk1, watch1 := s.init(c) |
2408 | zk2, watch2 := s.init(c) |
2409 | zk3, watch3 := s.init(c) |
2410 | |
2411 | - c.Assert(gozk.CountPendingWatches(), Equals, 3) |
2412 | + c.Assert(zk.CountPendingWatches(), Equals, 3) |
2413 | |
2414 | event1 := <-watch1 |
2415 | - c.Assert(event1.Type, Equals, gozk.EVENT_SESSION) |
2416 | - c.Assert(event1.State, Equals, gozk.STATE_CONNECTED) |
2417 | + c.Assert(event1.Type, Equals, zk.EVENT_SESSION) |
2418 | + c.Assert(event1.State, Equals, zk.STATE_CONNECTED) |
2419 | |
2420 | - c.Assert(gozk.CountPendingWatches(), Equals, 3) |
2421 | + c.Assert(zk.CountPendingWatches(), Equals, 3) |
2422 | |
2423 | event2 := <-watch2 |
2424 | - c.Assert(event2.Type, Equals, gozk.EVENT_SESSION) |
2425 | - c.Assert(event2.State, Equals, gozk.STATE_CONNECTED) |
2426 | + c.Assert(event2.Type, Equals, zk.EVENT_SESSION) |
2427 | + c.Assert(event2.State, Equals, zk.STATE_CONNECTED) |
2428 | |
2429 | - c.Assert(gozk.CountPendingWatches(), Equals, 3) |
2430 | + c.Assert(zk.CountPendingWatches(), Equals, 3) |
2431 | |
2432 | event3 := <-watch3 |
2433 | - c.Assert(event3.Type, Equals, gozk.EVENT_SESSION) |
2434 | - c.Assert(event3.State, Equals, gozk.STATE_CONNECTED) |
2435 | + c.Assert(event3.Type, Equals, zk.EVENT_SESSION) |
2436 | + c.Assert(event3.State, Equals, zk.STATE_CONNECTED) |
2437 | |
2438 | - c.Assert(gozk.CountPendingWatches(), Equals, 3) |
2439 | + c.Assert(zk.CountPendingWatches(), Equals, 3) |
2440 | |
2441 | zk1.Close() |
2442 | - c.Assert(gozk.CountPendingWatches(), Equals, 2) |
2443 | + c.Assert(zk.CountPendingWatches(), Equals, 2) |
2444 | zk2.Close() |
2445 | - c.Assert(gozk.CountPendingWatches(), Equals, 1) |
2446 | + c.Assert(zk.CountPendingWatches(), Equals, 1) |
2447 | zk3.Close() |
2448 | - c.Assert(gozk.CountPendingWatches(), Equals, 0) |
2449 | + c.Assert(zk.CountPendingWatches(), Equals, 0) |
2450 | } |
2451 | |
2452 | -// Gozk injects a STATE_CLOSED event when zk.Close() is called, right |
2453 | +// Gozk injects a STATE_CLOSED event when conn.Close() is called, right |
2454 | // before the channel is closed. Closing the channel injects a nil |
2455 | // pointer, as usual for Go, so the STATE_CLOSED gives a chance to |
2456 | // know that a nil pointer is coming, and to stop the procedure. |
2457 | // Hopefully this procedure will avoid some nil-pointer references by |
2458 | // mistake. |
2459 | func (s *S) TestClosingStateInSessionWatch(c *C) { |
2460 | - zk, watch := s.init(c) |
2461 | + conn, watch := s.init(c) |
2462 | |
2463 | event := <-watch |
2464 | - c.Assert(event.Type, Equals, gozk.EVENT_SESSION) |
2465 | - c.Assert(event.State, Equals, gozk.STATE_CONNECTED) |
2466 | + c.Assert(event.Type, Equals, zk.EVENT_SESSION) |
2467 | + c.Assert(event.State, Equals, zk.STATE_CONNECTED) |
2468 | |
2469 | - zk.Close() |
2470 | + conn.Close() |
2471 | event, ok := <-watch |
2472 | c.Assert(ok, Equals, false) |
2473 | - c.Assert(event.Type, Equals, gozk.EVENT_CLOSED) |
2474 | - c.Assert(event.State, Equals, gozk.STATE_CLOSED) |
2475 | + c.Assert(event.Type, Equals, zk.EVENT_CLOSED) |
2476 | + c.Assert(event.State, Equals, zk.STATE_CLOSED) |
2477 | } |
2478 | |
2479 | func (s *S) TestEventString(c *C) { |
2480 | - var event gozk.Event |
2481 | - event = gozk.Event{gozk.EVENT_SESSION, "/path", gozk.STATE_CONNECTED} |
2482 | + var event zk.Event |
2483 | + event = zk.Event{zk.EVENT_SESSION, "/path", zk.STATE_CONNECTED} |
2484 | c.Assert(event, Matches, "ZooKeeper connected") |
2485 | - event = gozk.Event{gozk.EVENT_CREATED, "/path", gozk.STATE_CONNECTED} |
2486 | + event = zk.Event{zk.EVENT_CREATED, "/path", zk.STATE_CONNECTED} |
2487 | c.Assert(event, Matches, "ZooKeeper connected; path created: /path") |
2488 | - event = gozk.Event{-1, "/path", gozk.STATE_CLOSED} |
2489 | + event = zk.Event{-1, "/path", zk.STATE_CLOSED} |
2490 | c.Assert(event, Matches, "ZooKeeper connection closed") |
2491 | } |
2492 | |
2493 | -var okTests = []struct{gozk.Event; Ok bool}{ |
2494 | - {gozk.Event{gozk.EVENT_SESSION, "", gozk.STATE_CONNECTED}, true}, |
2495 | - {gozk.Event{gozk.EVENT_CREATED, "", gozk.STATE_CONNECTED}, true}, |
2496 | - {gozk.Event{0, "", gozk.STATE_CLOSED}, false}, |
2497 | - {gozk.Event{0, "", gozk.STATE_EXPIRED_SESSION}, false}, |
2498 | - {gozk.Event{0, "", gozk.STATE_AUTH_FAILED}, false}, |
2499 | +var okTests = []struct { |
2500 | + zk.Event |
2501 | + Ok bool |
2502 | +}{ |
2503 | + {zk.Event{zk.EVENT_SESSION, "", zk.STATE_CONNECTED}, true}, |
2504 | + {zk.Event{zk.EVENT_CREATED, "", zk.STATE_CONNECTED}, true}, |
2505 | + {zk.Event{0, "", zk.STATE_CLOSED}, false}, |
2506 | + {zk.Event{0, "", zk.STATE_EXPIRED_SESSION}, false}, |
2507 | + {zk.Event{0, "", zk.STATE_AUTH_FAILED}, false}, |
2508 | } |
2509 | |
2510 | func (s *S) TestEventOk(c *C) { |
2511 | @@ -130,9 +133,9 @@ |
2512 | } |
2513 | |
2514 | func (s *S) TestGetAndStat(c *C) { |
2515 | - zk, _ := s.init(c) |
2516 | + conn, _ := s.init(c) |
2517 | |
2518 | - data, stat, err := zk.Get("/zookeeper") |
2519 | + data, stat, err := conn.Get("/zookeeper") |
2520 | c.Assert(err, IsNil) |
2521 | c.Assert(data, Equals, "") |
2522 | c.Assert(stat.Czxid(), Equals, int64(0)) |
2523 | @@ -149,58 +152,58 @@ |
2524 | } |
2525 | |
2526 | func (s *S) TestGetAndError(c *C) { |
2527 | - zk, _ := s.init(c) |
2528 | + conn, _ := s.init(c) |
2529 | |
2530 | - data, stat, err := zk.Get("/non-existent") |
2531 | + data, stat, err := conn.Get("/non-existent") |
2532 | |
2533 | c.Assert(data, Equals, "") |
2534 | c.Assert(stat, IsNil) |
2535 | c.Assert(err, Matches, "no node") |
2536 | - c.Assert(err.Code(), Equals, gozk.ZNONODE) |
2537 | + c.Assert(err, Equals, zk.ZNONODE) |
2538 | } |
2539 | |
2540 | func (s *S) TestCreateAndGet(c *C) { |
2541 | - zk, _ := s.init(c) |
2542 | + conn, _ := s.init(c) |
2543 | |
2544 | - path, err := zk.Create("/test-", "bababum", gozk.SEQUENCE|gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
2545 | + path, err := conn.Create("/test-", "bababum", zk.SEQUENCE|zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2546 | c.Assert(err, IsNil) |
2547 | c.Assert(path, Matches, "/test-[0-9]+") |
2548 | |
2549 | // Check the error condition from Create(). |
2550 | - _, err = zk.Create(path, "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
2551 | + _, err = conn.Create(path, "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2552 | c.Assert(err, Matches, "node exists") |
2553 | |
2554 | - data, _, err := zk.Get(path) |
2555 | + data, _, err := conn.Get(path) |
2556 | c.Assert(err, IsNil) |
2557 | c.Assert(data, Equals, "bababum") |
2558 | } |
2559 | |
2560 | func (s *S) TestCreateSetAndGet(c *C) { |
2561 | - zk, _ := s.init(c) |
2562 | + conn, _ := s.init(c) |
2563 | |
2564 | - _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
2565 | + _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2566 | c.Assert(err, IsNil) |
2567 | |
2568 | - stat, err := zk.Set("/test", "bababum", -1) // Any version. |
2569 | + stat, err := conn.Set("/test", "bababum", -1) // Any version. |
2570 | c.Assert(err, IsNil) |
2571 | c.Assert(stat.Version(), Equals, int32(1)) |
2572 | |
2573 | - data, _, err := zk.Get("/test") |
2574 | + data, _, err := conn.Get("/test") |
2575 | c.Assert(err, IsNil) |
2576 | c.Assert(data, Equals, "bababum") |
2577 | } |
2578 | |
2579 | func (s *S) TestGetAndWatch(c *C) { |
2580 | - c.Check(gozk.CountPendingWatches(), Equals, 0) |
2581 | - |
2582 | - zk, _ := s.init(c) |
2583 | - |
2584 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2585 | - |
2586 | - _, err := zk.Create("/test", "one", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
2587 | + c.Check(zk.CountPendingWatches(), Equals, 0) |
2588 | + |
2589 | + conn, _ := s.init(c) |
2590 | + |
2591 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2592 | + |
2593 | + _, err := conn.Create("/test", "one", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2594 | c.Assert(err, IsNil) |
2595 | |
2596 | - data, stat, watch, err := zk.GetW("/test") |
2597 | + data, stat, watch, err := conn.GetW("/test") |
2598 | c.Assert(err, IsNil) |
2599 | c.Assert(data, Equals, "one") |
2600 | c.Assert(stat.Version(), Equals, int32(0)) |
2601 | @@ -211,17 +214,17 @@ |
2602 | default: |
2603 | } |
2604 | |
2605 | - c.Check(gozk.CountPendingWatches(), Equals, 2) |
2606 | + c.Check(zk.CountPendingWatches(), Equals, 2) |
2607 | |
2608 | - _, err = zk.Set("/test", "two", -1) |
2609 | + _, err = conn.Set("/test", "two", -1) |
2610 | c.Assert(err, IsNil) |
2611 | |
2612 | event := <-watch |
2613 | - c.Assert(event.Type, Equals, gozk.EVENT_CHANGED) |
2614 | - |
2615 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2616 | - |
2617 | - data, _, watch, err = zk.GetW("/test") |
2618 | + c.Assert(event.Type, Equals, zk.EVENT_CHANGED) |
2619 | + |
2620 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2621 | + |
2622 | + data, _, watch, err = conn.GetW("/test") |
2623 | c.Assert(err, IsNil) |
2624 | c.Assert(data, Equals, "two") |
2625 | |
2626 | @@ -231,86 +234,86 @@ |
2627 | default: |
2628 | } |
2629 | |
2630 | - c.Check(gozk.CountPendingWatches(), Equals, 2) |
2631 | + c.Check(zk.CountPendingWatches(), Equals, 2) |
2632 | |
2633 | - _, err = zk.Set("/test", "three", -1) |
2634 | + _, err = conn.Set("/test", "three", -1) |
2635 | c.Assert(err, IsNil) |
2636 | |
2637 | event = <-watch |
2638 | - c.Assert(event.Type, Equals, gozk.EVENT_CHANGED) |
2639 | + c.Assert(event.Type, Equals, zk.EVENT_CHANGED) |
2640 | |
2641 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2642 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2643 | } |
2644 | |
2645 | func (s *S) TestGetAndWatchWithError(c *C) { |
2646 | - c.Check(gozk.CountPendingWatches(), Equals, 0) |
2647 | - |
2648 | - zk, _ := s.init(c) |
2649 | - |
2650 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2651 | - |
2652 | - _, _, watch, err := zk.GetW("/test") |
2653 | + c.Check(zk.CountPendingWatches(), Equals, 0) |
2654 | + |
2655 | + conn, _ := s.init(c) |
2656 | + |
2657 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2658 | + |
2659 | + _, _, watch, err := conn.GetW("/test") |
2660 | c.Assert(err, NotNil) |
2661 | - c.Assert(err.Code(), Equals, gozk.ZNONODE) |
2662 | + c.Assert(err, Equals, zk.ZNONODE) |
2663 | c.Assert(watch, IsNil) |
2664 | |
2665 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2666 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2667 | } |
2668 | |
2669 | func (s *S) TestCloseReleasesWatches(c *C) { |
2670 | - c.Check(gozk.CountPendingWatches(), Equals, 0) |
2671 | - |
2672 | - zk, _ := s.init(c) |
2673 | - |
2674 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2675 | - |
2676 | - _, err := zk.Create("/test", "one", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
2677 | - c.Assert(err, IsNil) |
2678 | - |
2679 | - _, _, _, err = zk.GetW("/test") |
2680 | - c.Assert(err, IsNil) |
2681 | - |
2682 | - c.Assert(gozk.CountPendingWatches(), Equals, 2) |
2683 | - |
2684 | - zk.Close() |
2685 | - |
2686 | - c.Assert(gozk.CountPendingWatches(), Equals, 0) |
2687 | + c.Check(zk.CountPendingWatches(), Equals, 0) |
2688 | + |
2689 | + conn, _ := s.init(c) |
2690 | + |
2691 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2692 | + |
2693 | + _, err := conn.Create("/test", "one", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2694 | + c.Assert(err, IsNil) |
2695 | + |
2696 | + _, _, _, err = conn.GetW("/test") |
2697 | + c.Assert(err, IsNil) |
2698 | + |
2699 | + c.Assert(zk.CountPendingWatches(), Equals, 2) |
2700 | + |
2701 | + conn.Close() |
2702 | + |
2703 | + c.Assert(zk.CountPendingWatches(), Equals, 0) |
2704 | } |
2705 | |
2706 | // By default, the ZooKeeper C client will hang indefinitely if a |
2707 | // handler is closed twice. We get in the way and prevent it. |
2708 | func (s *S) TestClosingTwiceDoesntHang(c *C) { |
2709 | - zk, _ := s.init(c) |
2710 | - err := zk.Close() |
2711 | + conn, _ := s.init(c) |
2712 | + err := conn.Close() |
2713 | c.Assert(err, IsNil) |
2714 | - err = zk.Close() |
2715 | + err = conn.Close() |
2716 | c.Assert(err, NotNil) |
2717 | - c.Assert(err.Code(), Equals, gozk.ZCLOSING) |
2718 | + c.Assert(err, Equals, zk.ZCLOSING) |
2719 | } |
2720 | |
2721 | func (s *S) TestChildren(c *C) { |
2722 | - zk, _ := s.init(c) |
2723 | + conn, _ := s.init(c) |
2724 | |
2725 | - children, stat, err := zk.Children("/") |
2726 | + children, stat, err := conn.Children("/") |
2727 | c.Assert(err, IsNil) |
2728 | c.Assert(children, Equals, []string{"zookeeper"}) |
2729 | c.Assert(stat.NumChildren(), Equals, int32(1)) |
2730 | |
2731 | - children, stat, err = zk.Children("/non-existent") |
2732 | + children, stat, err = conn.Children("/non-existent") |
2733 | c.Assert(err, NotNil) |
2734 | - c.Assert(err.Code(), Equals, gozk.ZNONODE) |
2735 | + c.Assert(err, Equals, zk.ZNONODE) |
2736 | c.Assert(children, Equals, []string{}) |
2737 | - c.Assert(stat, Equals, nil) |
2738 | + c.Assert(stat, IsNil) |
2739 | } |
2740 | |
2741 | func (s *S) TestChildrenAndWatch(c *C) { |
2742 | - c.Check(gozk.CountPendingWatches(), Equals, 0) |
2743 | - |
2744 | - zk, _ := s.init(c) |
2745 | - |
2746 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2747 | - |
2748 | - children, stat, watch, err := zk.ChildrenW("/") |
2749 | + c.Check(zk.CountPendingWatches(), Equals, 0) |
2750 | + |
2751 | + conn, _ := s.init(c) |
2752 | + |
2753 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2754 | + |
2755 | + children, stat, watch, err := conn.ChildrenW("/") |
2756 | c.Assert(err, IsNil) |
2757 | c.Assert(children, Equals, []string{"zookeeper"}) |
2758 | c.Assert(stat.NumChildren(), Equals, int32(1)) |
2759 | @@ -321,18 +324,18 @@ |
2760 | default: |
2761 | } |
2762 | |
2763 | - c.Check(gozk.CountPendingWatches(), Equals, 2) |
2764 | + c.Check(zk.CountPendingWatches(), Equals, 2) |
2765 | |
2766 | - _, err = zk.Create("/test1", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
2767 | + _, err = conn.Create("/test1", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2768 | c.Assert(err, IsNil) |
2769 | |
2770 | event := <-watch |
2771 | - c.Assert(event.Type, Equals, gozk.EVENT_CHILD) |
2772 | + c.Assert(event.Type, Equals, zk.EVENT_CHILD) |
2773 | c.Assert(event.Path, Equals, "/") |
2774 | |
2775 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2776 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2777 | |
2778 | - children, stat, watch, err = zk.ChildrenW("/") |
2779 | + children, stat, watch, err = conn.ChildrenW("/") |
2780 | c.Assert(err, IsNil) |
2781 | c.Assert(stat.NumChildren(), Equals, int32(2)) |
2782 | |
2783 | @@ -345,57 +348,56 @@ |
2784 | default: |
2785 | } |
2786 | |
2787 | - c.Check(gozk.CountPendingWatches(), Equals, 2) |
2788 | + c.Check(zk.CountPendingWatches(), Equals, 2) |
2789 | |
2790 | - _, err = zk.Create("/test2", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
2791 | + _, err = conn.Create("/test2", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2792 | c.Assert(err, IsNil) |
2793 | |
2794 | event = <-watch |
2795 | - c.Assert(event.Type, Equals, gozk.EVENT_CHILD) |
2796 | + c.Assert(event.Type, Equals, zk.EVENT_CHILD) |
2797 | |
2798 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2799 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2800 | } |
2801 | |
2802 | func (s *S) TestChildrenAndWatchWithError(c *C) { |
2803 | - c.Check(gozk.CountPendingWatches(), Equals, 0) |
2804 | - |
2805 | - zk, _ := s.init(c) |
2806 | - |
2807 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2808 | - |
2809 | - _, stat, watch, err := zk.ChildrenW("/test") |
2810 | + c.Check(zk.CountPendingWatches(), Equals, 0) |
2811 | + |
2812 | + conn, _ := s.init(c) |
2813 | + |
2814 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2815 | + |
2816 | + _, stat, watch, err := conn.ChildrenW("/test") |
2817 | c.Assert(err, NotNil) |
2818 | - c.Assert(err.Code(), Equals, gozk.ZNONODE) |
2819 | + c.Assert(err, Equals, zk.ZNONODE) |
2820 | c.Assert(watch, IsNil) |
2821 | c.Assert(stat, IsNil) |
2822 | |
2823 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2824 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2825 | } |
2826 | |
2827 | func (s *S) TestExists(c *C) { |
2828 | - zk, _ := s.init(c) |
2829 | - |
2830 | - stat, err := zk.Exists("/zookeeper") |
2831 | - c.Assert(err, IsNil) |
2832 | - c.Assert(stat.NumChildren(), Equals, int32(1)) |
2833 | - |
2834 | - stat, err = zk.Exists("/non-existent") |
2835 | + conn, _ := s.init(c) |
2836 | + |
2837 | + stat, err := conn.Exists("/non-existent") |
2838 | c.Assert(err, IsNil) |
2839 | c.Assert(stat, IsNil) |
2840 | + |
2841 | + stat, err = conn.Exists("/zookeeper") |
2842 | + c.Assert(err, IsNil) |
2843 | } |
2844 | |
2845 | func (s *S) TestExistsAndWatch(c *C) { |
2846 | - c.Check(gozk.CountPendingWatches(), Equals, 0) |
2847 | - |
2848 | - zk, _ := s.init(c) |
2849 | - |
2850 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2851 | - |
2852 | - stat, watch, err := zk.ExistsW("/test") |
2853 | + c.Check(zk.CountPendingWatches(), Equals, 0) |
2854 | + |
2855 | + conn, _ := s.init(c) |
2856 | + |
2857 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2858 | + |
2859 | + stat, watch, err := conn.ExistsW("/test") |
2860 | c.Assert(err, IsNil) |
2861 | c.Assert(stat, IsNil) |
2862 | |
2863 | - c.Check(gozk.CountPendingWatches(), Equals, 2) |
2864 | + c.Check(zk.CountPendingWatches(), Equals, 2) |
2865 | |
2866 | select { |
2867 | case <-watch: |
2868 | @@ -403,62 +405,62 @@ |
2869 | default: |
2870 | } |
2871 | |
2872 | - _, err = zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
2873 | + _, err = conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2874 | c.Assert(err, IsNil) |
2875 | |
2876 | event := <-watch |
2877 | - c.Assert(event.Type, Equals, gozk.EVENT_CREATED) |
2878 | + c.Assert(event.Type, Equals, zk.EVENT_CREATED) |
2879 | c.Assert(event.Path, Equals, "/test") |
2880 | |
2881 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2882 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2883 | |
2884 | - stat, watch, err = zk.ExistsW("/test") |
2885 | + stat, watch, err = conn.ExistsW("/test") |
2886 | c.Assert(err, IsNil) |
2887 | c.Assert(stat, NotNil) |
2888 | c.Assert(stat.NumChildren(), Equals, int32(0)) |
2889 | |
2890 | - c.Check(gozk.CountPendingWatches(), Equals, 2) |
2891 | + c.Check(zk.CountPendingWatches(), Equals, 2) |
2892 | } |
2893 | |
2894 | func (s *S) TestExistsAndWatchWithError(c *C) { |
2895 | - c.Check(gozk.CountPendingWatches(), Equals, 0) |
2896 | - |
2897 | - zk, _ := s.init(c) |
2898 | - |
2899 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2900 | - |
2901 | - stat, watch, err := zk.ExistsW("///") |
2902 | + c.Check(zk.CountPendingWatches(), Equals, 0) |
2903 | + |
2904 | + conn, _ := s.init(c) |
2905 | + |
2906 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2907 | + |
2908 | + stat, watch, err := conn.ExistsW("///") |
2909 | c.Assert(err, NotNil) |
2910 | - c.Assert(err.Code(), Equals, gozk.ZBADARGUMENTS) |
2911 | + c.Assert(err, Equals, zk.ZBADARGUMENTS) |
2912 | c.Assert(stat, IsNil) |
2913 | c.Assert(watch, IsNil) |
2914 | |
2915 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2916 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2917 | } |
2918 | |
2919 | func (s *S) TestDelete(c *C) { |
2920 | - zk, _ := s.init(c) |
2921 | - |
2922 | - _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
2923 | - c.Assert(err, IsNil) |
2924 | - |
2925 | - err = zk.Delete("/test", 5) |
2926 | - c.Assert(err, NotNil) |
2927 | - c.Assert(err.Code(), Equals, gozk.ZBADVERSION) |
2928 | - |
2929 | - err = zk.Delete("/test", -1) |
2930 | - c.Assert(err, IsNil) |
2931 | - |
2932 | - err = zk.Delete("/test", -1) |
2933 | - c.Assert(err, NotNil) |
2934 | - c.Assert(err.Code(), Equals, gozk.ZNONODE) |
2935 | + conn, _ := s.init(c) |
2936 | + |
2937 | + _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2938 | + c.Assert(err, IsNil) |
2939 | + |
2940 | + err = conn.Delete("/test", 5) |
2941 | + c.Assert(err, NotNil) |
2942 | + c.Assert(err, Equals, zk.ZBADVERSION) |
2943 | + |
2944 | + err = conn.Delete("/test", -1) |
2945 | + c.Assert(err, IsNil) |
2946 | + |
2947 | + err = conn.Delete("/test", -1) |
2948 | + c.Assert(err, NotNil) |
2949 | + c.Assert(err, Equals, zk.ZNONODE) |
2950 | } |
2951 | |
2952 | func (s *S) TestClientIdAndReInit(c *C) { |
2953 | zk1, _ := s.init(c) |
2954 | clientId1 := zk1.ClientId() |
2955 | |
2956 | - zk2, _, err := gozk.ReInit(s.zkAddr, 5e9, clientId1) |
2957 | + zk2, _, err := zk.Redial(s.zkAddr, 5e9, clientId1) |
2958 | c.Assert(err, IsNil) |
2959 | defer zk2.Close() |
2960 | clientId2 := zk2.ClientId() |
2961 | @@ -469,110 +471,114 @@ |
2962 | // Surprisingly for some (including myself, initially), the watch |
2963 | // returned by the exists method actually fires on data changes too. |
2964 | func (s *S) TestExistsWatchOnDataChange(c *C) { |
2965 | - zk, _ := s.init(c) |
2966 | - |
2967 | - _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
2968 | - c.Assert(err, IsNil) |
2969 | - |
2970 | - _, watch, err := zk.ExistsW("/test") |
2971 | - c.Assert(err, IsNil) |
2972 | - |
2973 | - _, err = zk.Set("/test", "new", -1) |
2974 | + conn, _ := s.init(c) |
2975 | + |
2976 | + _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2977 | + c.Assert(err, IsNil) |
2978 | + |
2979 | + _, watch, err := conn.ExistsW("/test") |
2980 | + c.Assert(err, IsNil) |
2981 | + |
2982 | + _, err = conn.Set("/test", "new", -1) |
2983 | c.Assert(err, IsNil) |
2984 | |
2985 | event := <-watch |
2986 | |
2987 | c.Assert(event.Path, Equals, "/test") |
2988 | - c.Assert(event.Type, Equals, gozk.EVENT_CHANGED) |
2989 | + c.Assert(event.Type, Equals, zk.EVENT_CHANGED) |
2990 | } |
2991 | |
2992 | func (s *S) TestACL(c *C) { |
2993 | - zk, _ := s.init(c) |
2994 | - |
2995 | - _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
2996 | - c.Assert(err, IsNil) |
2997 | - |
2998 | - acl, stat, err := zk.ACL("/test") |
2999 | - c.Assert(err, IsNil) |
3000 | - c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_ALL)) |
3001 | + conn, _ := s.init(c) |
3002 | + |
3003 | + _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
3004 | + c.Assert(err, IsNil) |
3005 | + |
3006 | + acl, stat, err := conn.ACL("/test") |
3007 | + c.Assert(err, IsNil) |
3008 | + c.Assert(acl, Equals, zk.WorldACL(zk.PERM_ALL)) |
3009 | c.Assert(stat, NotNil) |
3010 | c.Assert(stat.Version(), Equals, int32(0)) |
3011 | |
3012 | - acl, stat, err = zk.ACL("/non-existent") |
3013 | + acl, stat, err = conn.ACL("/non-existent") |
3014 | c.Assert(err, NotNil) |
3015 | - c.Assert(err.Code(), Equals, gozk.ZNONODE) |
3016 | + c.Assert(err, Equals, zk.ZNONODE) |
3017 | c.Assert(acl, IsNil) |
3018 | c.Assert(stat, IsNil) |
3019 | } |
3020 | |
3021 | func (s *S) TestSetACL(c *C) { |
3022 | - zk, _ := s.init(c) |
3023 | + conn, _ := s.init(c) |
3024 | |
3025 | - _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
3026 | + _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
3027 | c.Assert(err, IsNil) |
3028 | |
3029 | - err = zk.SetACL("/test", gozk.WorldACL(gozk.PERM_ALL), 5) |
3030 | + err = conn.SetACL("/test", zk.WorldACL(zk.PERM_ALL), 5) |
3031 | c.Assert(err, NotNil) |
3032 | - c.Assert(err.Code(), Equals, gozk.ZBADVERSION) |
3033 | - |
3034 | - err = zk.SetACL("/test", gozk.WorldACL(gozk.PERM_READ), -1) |
3035 | - c.Assert(err, IsNil) |
3036 | - |
3037 | - acl, _, err := zk.ACL("/test") |
3038 | - c.Assert(err, IsNil) |
3039 | - c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_READ)) |
3040 | + c.Assert(err, Equals, zk.ZBADVERSION) |
3041 | + |
3042 | + err = conn.SetACL("/test", zk.WorldACL(zk.PERM_READ), -1) |
3043 | + c.Assert(err, IsNil) |
3044 | + |
3045 | + acl, _, err := conn.ACL("/test") |
3046 | + c.Assert(err, IsNil) |
3047 | + c.Assert(acl, Equals, zk.WorldACL(zk.PERM_READ)) |
3048 | } |
3049 | |
3050 | func (s *S) TestAddAuth(c *C) { |
3051 | - zk, _ := s.init(c) |
3052 | - |
3053 | - acl := []gozk.ACL{{gozk.PERM_READ, "digest", "joe:enQcM3mIEHQx7IrPNStYBc0qfs8="}} |
3054 | - |
3055 | - _, err := zk.Create("/test", "", gozk.EPHEMERAL, acl) |
3056 | + conn, _ := s.init(c) |
3057 | + |
3058 | + acl := []zk.ACL{{zk.PERM_READ, "digest", "joe:enQcM3mIEHQx7IrPNStYBc0qfs8="}} |
3059 | + |
3060 | + _, err := conn.Create("/test", "", zk.EPHEMERAL, acl) |
3061 | c.Assert(err, IsNil) |
3062 | |
3063 | - _, _, err = zk.Get("/test") |
3064 | + _, _, err = conn.Get("/test") |
3065 | c.Assert(err, NotNil) |
3066 | - c.Assert(err.Code(), Equals, gozk.ZNOAUTH) |
3067 | + c.Assert(err, Equals, zk.ZNOAUTH) |
3068 | |
3069 | - err = zk.AddAuth("digest", "joe:passwd") |
3070 | + err = conn.AddAuth("digest", "joe:passwd") |
3071 | c.Assert(err, IsNil) |
3072 | |
3073 | - _, _, err = zk.Get("/test") |
3074 | + _, _, err = conn.Get("/test") |
3075 | c.Assert(err, IsNil) |
3076 | } |
3077 | |
3078 | func (s *S) TestWatchOnReconnection(c *C) { |
3079 | - c.Check(gozk.CountPendingWatches(), Equals, 0) |
3080 | + c.Check(zk.CountPendingWatches(), Equals, 0) |
3081 | |
3082 | - zk, session := s.init(c) |
3083 | + conn, session := s.init(c) |
3084 | |
3085 | event := <-session |
3086 | - c.Assert(event.Type, Equals, gozk.EVENT_SESSION) |
3087 | - c.Assert(event.State, Equals, gozk.STATE_CONNECTED) |
3088 | - |
3089 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
3090 | - |
3091 | - stat, watch, err := zk.ExistsW("/test") |
3092 | + c.Assert(event.Type, Equals, zk.EVENT_SESSION) |
3093 | + c.Assert(event.State, Equals, zk.STATE_CONNECTED) |
3094 | + |
3095 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
3096 | + |
3097 | + stat, watch, err := conn.ExistsW("/test") |
3098 | c.Assert(err, IsNil) |
3099 | c.Assert(stat, IsNil) |
3100 | |
3101 | - c.Check(gozk.CountPendingWatches(), Equals, 2) |
3102 | - |
3103 | - s.StopZK() |
3104 | + c.Check(zk.CountPendingWatches(), Equals, 2) |
3105 | + |
3106 | + err = s.zkServer.Stop() |
3107 | + c.Assert(err, IsNil) |
3108 | + |
3109 | time.Sleep(2e9) |
3110 | - s.StartZK() |
3111 | |
3112 | - // The session channel should receive the reconnection notification, |
3113 | + err = s.zkServer.Start() |
3114 | + c.Assert(err, IsNil) |
3115 | + |
3116 | + // The session channel should receive the reconnection notification. |
3117 | select { |
3118 | case event := <-session: |
3119 | - c.Assert(event.State, Equals, gozk.STATE_CONNECTING) |
3120 | + c.Assert(event.State, Equals, zk.STATE_CONNECTING) |
3121 | case <-time.After(3e9): |
3122 | c.Fatal("Session watch didn't fire") |
3123 | } |
3124 | select { |
3125 | case event := <-session: |
3126 | - c.Assert(event.State, Equals, gozk.STATE_CONNECTED) |
3127 | + c.Assert(event.State, Equals, zk.STATE_CONNECTED) |
3128 | case <-time.After(3e9): |
3129 | c.Fatal("Session watch didn't fire") |
3130 | } |
3131 | @@ -585,40 +591,40 @@ |
3132 | } |
3133 | |
3134 | // And it should still work. |
3135 | - _, err = zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
3136 | + _, err = conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
3137 | c.Assert(err, IsNil) |
3138 | |
3139 | event = <-watch |
3140 | - c.Assert(event.Type, Equals, gozk.EVENT_CREATED) |
3141 | + c.Assert(event.Type, Equals, zk.EVENT_CREATED) |
3142 | c.Assert(event.Path, Equals, "/test") |
3143 | |
3144 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
3145 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
3146 | } |
3147 | |
3148 | func (s *S) TestWatchOnSessionExpiration(c *C) { |
3149 | - c.Check(gozk.CountPendingWatches(), Equals, 0) |
3150 | + c.Check(zk.CountPendingWatches(), Equals, 0) |
3151 | |
3152 | - zk, session := s.init(c) |
3153 | + conn, session := s.init(c) |
3154 | |
3155 | event := <-session |
3156 | - c.Assert(event.Type, Equals, gozk.EVENT_SESSION) |
3157 | - c.Assert(event.State, Equals, gozk.STATE_CONNECTED) |
3158 | - |
3159 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
3160 | - |
3161 | - stat, watch, err := zk.ExistsW("/test") |
3162 | + c.Assert(event.Type, Equals, zk.EVENT_SESSION) |
3163 | + c.Assert(event.State, Equals, zk.STATE_CONNECTED) |
3164 | + |
3165 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
3166 | + |
3167 | + stat, watch, err := conn.ExistsW("/test") |
3168 | c.Assert(err, IsNil) |
3169 | c.Assert(stat, IsNil) |
3170 | |
3171 | - c.Check(gozk.CountPendingWatches(), Equals, 2) |
3172 | + c.Check(zk.CountPendingWatches(), Equals, 2) |
3173 | |
3174 | // Use expiration trick described in the FAQ. |
3175 | - clientId := zk.ClientId() |
3176 | - zk2, session2, err := gozk.ReInit(s.zkAddr, 5e9, clientId) |
3177 | + clientId := conn.ClientId() |
3178 | + zk2, session2, err := zk.Redial(s.zkAddr, 5e9, clientId) |
3179 | |
3180 | for event := range session2 { |
3181 | c.Log("Event from overlapping session: ", event) |
3182 | - if event.State == gozk.STATE_CONNECTED { |
3183 | + if event.State == zk.STATE_CONNECTED { |
3184 | // Wait for zk to process the connection. |
3185 | // Not reliable without this. :-( |
3186 | time.Sleep(1e9) |
3187 | @@ -627,21 +633,21 @@ |
3188 | } |
3189 | for event := range session { |
3190 | c.Log("Event from primary session: ", event) |
3191 | - if event.State == gozk.STATE_EXPIRED_SESSION { |
3192 | + if event.State == zk.STATE_EXPIRED_SESSION { |
3193 | break |
3194 | } |
3195 | } |
3196 | |
3197 | select { |
3198 | case event := <-watch: |
3199 | - c.Assert(event.State, Equals, gozk.STATE_EXPIRED_SESSION) |
3200 | + c.Assert(event.State, Equals, zk.STATE_EXPIRED_SESSION) |
3201 | case <-time.After(3e9): |
3202 | c.Fatal("Watch event didn't fire") |
3203 | } |
3204 | |
3205 | event = <-watch |
3206 | - c.Assert(event.Type, Equals, gozk.EVENT_CLOSED) |
3207 | - c.Assert(event.State, Equals, gozk.STATE_CLOSED) |
3208 | + c.Assert(event.Type, Equals, zk.EVENT_CLOSED) |
3209 | + c.Assert(event.State, Equals, zk.STATE_CLOSED) |
3210 | |
3211 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
3212 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
3213 | } |
This is a massive change unrelated to documentation.