Merge lp:~rogpeppe/gozk/factor-out-service into lp:~juju/gozk/trunk
- factor-out-service
- Merge into trunk
Status: | Rejected | ||||
---|---|---|---|---|---|
Rejected by: | Gustavo Niemeyer | ||||
Proposed branch: | lp:~rogpeppe/gozk/factor-out-service | ||||
Merge into: | lp:~juju/gozk/trunk | ||||
Diff against target: |
3109 lines (+1281/-784) 10 files modified
Makefile (+5/-2) example/example.go (+22/-23) reattach_test.go (+202/-0) retry_test.go (+65/-74) server.go (+239/-0) service/Makefile (+20/-0) service/service.go (+160/-0) suite_test.go (+45/-122) zk.go (+265/-311) zk_test.go (+258/-252) |
||||
To merge this branch: | bzr merge lp:~rogpeppe/gozk/factor-out-service | ||||
Related bugs: |
|
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Gustavo Niemeyer | Disapprove | ||
Review via email: mp+77960@code.launchpad.net |
Commit message
Description of the change
Factor out server starting code into its own package.
It was orthogonal to the ZooKeeper server code,
feels like it might gain complexity over time,
and it potentially re-usable for other servers.
gozk/service should not be the final name,
as it's entirely independent of zk functionality.
Roger Peppe (rogpeppe) wrote : | # |
On 5 October 2011 16:32, Gustavo Niemeyer <email address hidden> wrote:
> Review: Disapprove
>
> This is an interesting idea, but at the same time it's full of problems and
> not really necessary right now. I suggest we delay it until a future point
> when we actually need that abstraction, as we'll be in a better position
> to generalize it. At that point, we can even preserve for the internal
> implementation of the ZooKeeper server itself, without many problems.
ok, i'll do that if you don't mind.
the thing that was mostly bothering me was the mingling of the code
from Server and Service, and that the complexities of Service were
obscuring the essential simplicity of Server.
BTW many of the comments below are applicable even without
the factored-out service package.
> Here is some feedback, regardless, which I suggest you don't work on
> right now besides for [1] which should be integrated.
>
> [1]
>
> === added file 'reattach_test.go'
> --- reattach_test.go 1970-01-01 00:00:00 +0000
> +++ reattach_test.go 2011-10-05 14:37:11 +0000
>
> This should be in a separate merge proposal as it's important and unrelated
> to the strawman changes proposed.
oh, i thought it was in the earlier merge.
i've added to the update-
> +type Interface interface {
> + // Directory gives the path to the directory containing
> + // the server's configuration and data files. It is assumed that
> + // this exists and can be modified.
> + Directory() string
>
> s/Directory/Dir/
actually maybe RunDir
> [3]
>
> +
> + // CheckAvailability should return an error if resources
> + // required by the server are not available; for instance
> + // if the server requires a network port which is not free.
> + CheckAvailability() os.Error
>
> We don't need this. The Command function can return an error if
> it knows it cannot start.
perhaps. i preferred to keep the availability checking separate
so that calling Command didn't imply that the service was
going to be started immediately.
> [4]
>
> + l, err := net.Listen("tcp", fmt.Sprintf(
> + if err != nil {
> + return fmt.Errorf("cannot listen on port %v: %v", port, err)
> + }
> + l.Close()
>
> This doesn't really guarantee that when the service itself is started,
> the port will be available, so we can as well not do it.
it doesn't guarantee it, but it does mean that we can return
an early error rather than relying on the server to print
the error somewhere at some unspecified later time.
i found it very useful when testing.
i wonder if the Start should print stdout and stderr from
the server for the first half second or so after it's been started.
that's a possible solution, but makes things slower.
> [5]
>
> +// Process returns a reference to the identity
> +// of the last running server in the Service's directory.
> +// If the server was shut down, it returns (nil, nil).
>
> This shouldn't be in a generic service API. What if there
> are multiple processes handled by the service?
my plan was to change things so the server is started in
its own process group or sess...
Unmerged revisions
- 23. By Roger Peppe
-
Factored out server running code into a separate package.
- 22. By Roger Peppe
-
Fix Server code.
- 21. By Gustavo Niemeyer
-
The package location is now http://
launchpad. net/gozk/ zk. The shorter package name makes for significantly more digestible line lengths.
- 20. By Gustavo Niemeyer
-
Merged clean-up-interface branch, by Roger Peppe [r=niemeyer]
This branch does a number of incompatible changes that improve
the overall API of gozk.
Preview Diff
1 | === modified file 'Makefile' |
2 | --- Makefile 2011-08-03 01:56:58 +0000 |
3 | +++ Makefile 2011-10-03 15:02:28 +0000 |
4 | @@ -2,10 +2,13 @@ |
5 | |
6 | all: package |
7 | |
8 | -TARG=gozk |
9 | +TARG=launchpad.net/gozk/zk |
10 | |
11 | +GOFILES=\ |
12 | + server.go\ |
13 | + |
14 | CGOFILES=\ |
15 | - gozk.go\ |
16 | + zk.go\ |
17 | |
18 | CGO_OFILES=\ |
19 | helpers.o\ |
20 | |
21 | === added directory 'example' |
22 | === renamed file 'example.go' => 'example/example.go' |
23 | --- example.go 2011-08-03 01:47:25 +0000 |
24 | +++ example/example.go 2011-10-03 15:02:28 +0000 |
25 | @@ -1,30 +1,29 @@ |
26 | package main |
27 | |
28 | import ( |
29 | - "gozk" |
30 | + "launchpad.net/zookeeper/zookeeper" |
31 | ) |
32 | |
33 | func main() { |
34 | - zk, session, err := gozk.Init("localhost:2181", 5000) |
35 | - if err != nil { |
36 | - println("Couldn't connect: " + err.String()) |
37 | - return |
38 | - } |
39 | - |
40 | - defer zk.Close() |
41 | - |
42 | - // Wait for connection. |
43 | - event := <-session |
44 | - if event.State != gozk.STATE_CONNECTED { |
45 | - println("Couldn't connect") |
46 | - return |
47 | - } |
48 | - |
49 | - _, err = zk.Create("/counter", "0", 0, gozk.WorldACL(gozk.PERM_ALL)) |
50 | - if err != nil { |
51 | - println(err.String()) |
52 | - } else { |
53 | - println("Created!") |
54 | - } |
55 | + zk, session, err := zookeeper.Init("localhost:2181", 5000) |
56 | + if err != nil { |
57 | + println("Couldn't connect: " + err.String()) |
58 | + return |
59 | + } |
60 | + |
61 | + defer zk.Close() |
62 | + |
63 | + // Wait for connection. |
64 | + event := <-session |
65 | + if event.State != zookeeper.STATE_CONNECTED { |
66 | + println("Couldn't connect") |
67 | + return |
68 | + } |
69 | + |
70 | + _, err = zk.Create("/counter", "0", 0, zookeeper.WorldACL(zookeeper.PERM_ALL)) |
71 | + if err != nil { |
72 | + println(err.String()) |
73 | + } else { |
74 | + println("Created!") |
75 | + } |
76 | } |
77 | - |
78 | |
79 | === added file 'reattach_test.go' |
80 | --- reattach_test.go 1970-01-01 00:00:00 +0000 |
81 | +++ reattach_test.go 2011-10-03 15:02:28 +0000 |
82 | @@ -0,0 +1,202 @@ |
83 | +package zk_test |
84 | + |
85 | +import ( |
86 | + "bufio" |
87 | + . "launchpad.net/gocheck" |
88 | + "launchpad.net/gozk/zk/service" |
89 | + "launchpad.net/gozk/zk" |
90 | + "exec" |
91 | + "flag" |
92 | + "fmt" |
93 | + "os" |
94 | + "strings" |
95 | + "testing" |
96 | + "time" |
97 | +) |
98 | + |
99 | +var reattach = flag.Bool("zktest.reattach", false, "internal flag used for testing") |
100 | +var reattachRunDir = flag.String("zktest.rundir", "", "internal flag used for testing") |
101 | +var reattachAbnormalStop = flag.Bool("zktest.stop", false, "internal flag used for testing") |
102 | + |
103 | +// This is the reentrancy point for testing ZooKeeper servers |
104 | +// started by processes that are not direct children of the |
105 | +// testing process. This test always succeeds - the status |
106 | +// will be written to stdout and read by indirectServer. |
107 | +func TestStartNonChildServer(t *testing.T) { |
108 | + if !*reattach { |
109 | + // not re-entrant, so ignore this test. |
110 | + return |
111 | + } |
112 | + err := startServer(*reattachRunDir, *reattachAbnormalStop) |
113 | + if err != nil { |
114 | + fmt.Printf("error:%v\n", err) |
115 | + return |
116 | + } |
117 | + fmt.Printf("done\n") |
118 | +} |
119 | + |
120 | +func (s *S) startServer(c *C, abort bool) { |
121 | + err := startServer(s.zkTestRoot, abort) |
122 | + c.Assert(err, IsNil) |
123 | +} |
124 | + |
125 | +// startServerIndirect starts a ZooKeeper server that is not |
126 | +// a direct child of the current process. If abort is true, |
127 | +// the server will be started and then terminated abnormally. |
128 | +func (s *S) startServerIndirect(c *C, abort bool) { |
129 | + if len(os.Args) == 0 { |
130 | + c.Fatal("Cannot find self executable name") |
131 | + } |
132 | + cmd := exec.Command( |
133 | + os.Args[0], |
134 | + "-zktest.reattach", |
135 | + "-zktest.rundir", s.zkTestRoot, |
136 | + "-zktest.stop=", fmt.Sprint(abort), |
137 | + "-test.run", "StartNonChildServer", |
138 | + ) |
139 | + r, err := cmd.StdoutPipe() |
140 | + c.Assert(err, IsNil) |
141 | + defer r.Close() |
142 | + if err := cmd.Start(); err != nil { |
143 | + c.Fatalf("cannot start re-entrant gotest process: %v", err) |
144 | + } |
145 | + defer cmd.Wait() |
146 | + bio := bufio.NewReader(r) |
147 | + for { |
148 | + line, err := bio.ReadSlice('\n') |
149 | + if err != nil { |
150 | + c.Fatalf("indirect server status line not found: %v", err) |
151 | + } |
152 | + s := string(line) |
153 | + if strings.HasPrefix(s, "error:") { |
154 | + c.Fatalf("indirect server error: %s", s[len("error:"):]) |
155 | + } |
156 | + if s == "done\n" { |
157 | + return |
158 | + } |
159 | + } |
160 | + panic("not reached") |
161 | +} |
162 | + |
163 | +// startServer starts a ZooKeeper server, and terminates it abnormally |
164 | +// if abort is true. |
165 | +func startServer(runDir string, abort bool) os.Error { |
166 | + s, err := zk.AttachServer(runDir) |
167 | + if err != nil { |
168 | + return fmt.Errorf("cannot attach to server at %q: %v", runDir, err) |
169 | + } |
170 | + srv := service.New(s) |
171 | + if err := srv.Start(); err != nil { |
172 | + return fmt.Errorf("cannot start server: %v", err) |
173 | + } |
174 | + if abort { |
175 | + // Give it time to start up, then kill the server process abnormally, |
176 | + // leaving the pid.txt file behind. |
177 | + time.Sleep(0.5e9) |
178 | + p, err := srv.Process() |
179 | + if err != nil { |
180 | + return fmt.Errorf("cannot get server process: %v", err) |
181 | + } |
182 | + defer p.Release() |
183 | + if err := p.Kill(); err != nil { |
184 | + return fmt.Errorf("cannot kill server process: %v", err) |
185 | + } |
186 | + } |
187 | + return nil |
188 | +} |
189 | + |
190 | +func (s *S) checkCookie(c *C) { |
191 | + conn, watch, err := zk.Dial(s.zkAddr, 5e9) |
192 | + c.Assert(err, IsNil) |
193 | + |
194 | + e, ok := <-watch |
195 | + c.Assert(ok, Equals, true) |
196 | + c.Assert(e.Ok(), Equals, true) |
197 | + |
198 | + c.Assert(err, IsNil) |
199 | + cookie, _, err := conn.Get("/testAttachCookie") |
200 | + c.Assert(err, IsNil) |
201 | + c.Assert(cookie, Equals, "testAttachCookie") |
202 | + conn.Close() |
203 | +} |
204 | + |
205 | +// cases to test: |
206 | +// child server, stopped normally; reattach, start |
207 | +// non-direct child server, killed abnormally; reattach, start (->error), remove pid.txt; start |
208 | +// non-direct child server, still running; reattach, start (->error), stop, start |
209 | +// child server, still running; reattach, start (-> error) |
210 | +// child server, still running; reattach, stop, start. |
211 | +// non-direct child server, still running; reattach, stop, start. |
212 | +func (s *S) TestAttachServer(c *C) { |
213 | + // Create a cookie so that we know we are reattaching to the same instance. |
214 | + conn, _ := s.init(c) |
215 | + _, err := conn.Create("/testAttachCookie", "testAttachCookie", 0, zk.WorldACL(zk.PERM_ALL)) |
216 | + c.Assert(err, IsNil) |
217 | + s.checkCookie(c) |
218 | + s.zkServer.Stop() |
219 | + s.zkServer = nil |
220 | + |
221 | + s.testAttachServer(c, (*S).startServer) |
222 | + s.testAttachServer(c, (*S).startServerIndirect) |
223 | + s.testAttachServerAbnormalTerminate(c, (*S).startServer) |
224 | + s.testAttachServerAbnormalTerminate(c, (*S).startServerIndirect) |
225 | + |
226 | + srv, err := zk.AttachServer(s.zkTestRoot) |
227 | + c.Assert(err, IsNil) |
228 | + |
229 | + s.zkServer = service.New(srv) |
230 | + err = s.zkServer.Start() |
231 | + c.Assert(err, IsNil) |
232 | + |
233 | + conn, _ = s.init(c) |
234 | + err = conn.Delete("/testAttachCookie", -1) |
235 | + c.Assert(err, IsNil) |
236 | +} |
237 | + |
238 | +func (s *S) testAttachServer(c *C, start func(*S, *C, bool)) { |
239 | + start(s, c, false) |
240 | + |
241 | + s.checkCookie(c) |
242 | + |
243 | + // try attaching to it while it is still running - it should fail. |
244 | + a, err := zk.AttachServer(s.zkTestRoot) |
245 | + c.Assert(err, IsNil) |
246 | + srv := service.New(a) |
247 | + |
248 | + err = srv.Start() |
249 | + c.Assert(err, NotNil) |
250 | + |
251 | + // stop it and then start it again - it should succeed. |
252 | + err = srv.Stop() |
253 | + c.Assert(err, IsNil) |
254 | + |
255 | + err = srv.Start() |
256 | + c.Assert(err, IsNil) |
257 | + |
258 | + s.checkCookie(c) |
259 | + |
260 | + err = srv.Stop() |
261 | + c.Assert(err, IsNil) |
262 | +} |
263 | + |
264 | +func (s *S) testAttachServerAbnormalTerminate(c *C, start func(*S, *C, bool)) { |
265 | + start(s, c, true) |
266 | + |
267 | + // try attaching to it and starting - it should fail, because pid.txt |
268 | + // won't have been removed. |
269 | + a, err := zk.AttachServer(s.zkTestRoot) |
270 | + c.Assert(err, IsNil) |
271 | + srv := service.New(a) |
272 | + err = srv.Start() |
273 | + c.Assert(err, NotNil) |
274 | + |
275 | + // stopping it should bring things back to normal. |
276 | + err = srv.Stop() |
277 | + c.Assert(err, IsNil) |
278 | + err = srv.Start() |
279 | + c.Assert(err, IsNil) |
280 | + |
281 | + s.checkCookie(c) |
282 | + err = srv.Stop() |
283 | + c.Assert(err, IsNil) |
284 | +} |
285 | \ No newline at end of file |
286 | |
287 | === modified file 'retry_test.go' |
288 | --- retry_test.go 2011-08-19 01:51:37 +0000 |
289 | +++ retry_test.go 2011-10-03 15:02:28 +0000 |
290 | @@ -1,42 +1,41 @@ |
291 | -package gozk_test |
292 | +package zk_test |
293 | |
294 | import ( |
295 | . "launchpad.net/gocheck" |
296 | - "gozk" |
297 | + "launchpad.net/gozk/zk" |
298 | "os" |
299 | ) |
300 | |
301 | func (s *S) TestRetryChangeCreating(c *C) { |
302 | - zk, _ := s.init(c) |
303 | + conn, _ := s.init(c) |
304 | |
305 | - err := zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL), |
306 | - func(data string, stat gozk.Stat) (string, os.Error) { |
307 | + err := conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL), |
308 | + func(data string, stat *zk.Stat) (string, os.Error) { |
309 | c.Assert(data, Equals, "") |
310 | c.Assert(stat, IsNil) |
311 | return "new", nil |
312 | }) |
313 | c.Assert(err, IsNil) |
314 | |
315 | - data, stat, err := zk.Get("/test") |
316 | + data, stat, err := conn.Get("/test") |
317 | c.Assert(err, IsNil) |
318 | c.Assert(stat, NotNil) |
319 | c.Assert(stat.Version(), Equals, int32(0)) |
320 | c.Assert(data, Equals, "new") |
321 | |
322 | - acl, _, err := zk.ACL("/test") |
323 | + acl, _, err := conn.ACL("/test") |
324 | c.Assert(err, IsNil) |
325 | - c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_ALL)) |
326 | + c.Assert(acl, Equals, zk.WorldACL(zk.PERM_ALL)) |
327 | } |
328 | |
329 | func (s *S) TestRetryChangeSetting(c *C) { |
330 | - zk, _ := s.init(c) |
331 | + conn, _ := s.init(c) |
332 | |
333 | - _, err := zk.Create("/test", "old", gozk.EPHEMERAL, |
334 | - gozk.WorldACL(gozk.PERM_ALL)) |
335 | + _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
336 | c.Assert(err, IsNil) |
337 | |
338 | - err = zk.RetryChange("/test", gozk.EPHEMERAL, []gozk.ACL{}, |
339 | - func(data string, stat gozk.Stat) (string, os.Error) { |
340 | + err = conn.RetryChange("/test", zk.EPHEMERAL, []zk.ACL{}, |
341 | + func(data string, stat *zk.Stat) (string, os.Error) { |
342 | c.Assert(data, Equals, "old") |
343 | c.Assert(stat, NotNil) |
344 | c.Assert(stat.Version(), Equals, int32(0)) |
345 | @@ -44,27 +43,26 @@ |
346 | }) |
347 | c.Assert(err, IsNil) |
348 | |
349 | - data, stat, err := zk.Get("/test") |
350 | + data, stat, err := conn.Get("/test") |
351 | c.Assert(err, IsNil) |
352 | c.Assert(stat, NotNil) |
353 | c.Assert(stat.Version(), Equals, int32(1)) |
354 | c.Assert(data, Equals, "brand new") |
355 | |
356 | // ACL was unchanged by RetryChange(). |
357 | - acl, _, err := zk.ACL("/test") |
358 | + acl, _, err := conn.ACL("/test") |
359 | c.Assert(err, IsNil) |
360 | - c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_ALL)) |
361 | + c.Assert(acl, Equals, zk.WorldACL(zk.PERM_ALL)) |
362 | } |
363 | |
364 | func (s *S) TestRetryChangeUnchangedValueDoesNothing(c *C) { |
365 | - zk, _ := s.init(c) |
366 | + conn, _ := s.init(c) |
367 | |
368 | - _, err := zk.Create("/test", "old", gozk.EPHEMERAL, |
369 | - gozk.WorldACL(gozk.PERM_ALL)) |
370 | + _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
371 | c.Assert(err, IsNil) |
372 | |
373 | - err = zk.RetryChange("/test", gozk.EPHEMERAL, []gozk.ACL{}, |
374 | - func(data string, stat gozk.Stat) (string, os.Error) { |
375 | + err = conn.RetryChange("/test", zk.EPHEMERAL, []zk.ACL{}, |
376 | + func(data string, stat *zk.Stat) (string, os.Error) { |
377 | c.Assert(data, Equals, "old") |
378 | c.Assert(stat, NotNil) |
379 | c.Assert(stat.Version(), Equals, int32(0)) |
380 | @@ -72,7 +70,7 @@ |
381 | }) |
382 | c.Assert(err, IsNil) |
383 | |
384 | - data, stat, err := zk.Get("/test") |
385 | + data, stat, err := conn.Get("/test") |
386 | c.Assert(err, IsNil) |
387 | c.Assert(stat, NotNil) |
388 | c.Assert(stat.Version(), Equals, int32(0)) // Unchanged! |
389 | @@ -80,14 +78,14 @@ |
390 | } |
391 | |
392 | func (s *S) TestRetryChangeConflictOnCreate(c *C) { |
393 | - zk, _ := s.init(c) |
394 | + conn, _ := s.init(c) |
395 | |
396 | - changeFunc := func(data string, stat gozk.Stat) (string, os.Error) { |
397 | + changeFunc := func(data string, stat *zk.Stat) (string, os.Error) { |
398 | switch data { |
399 | case "": |
400 | c.Assert(stat, IsNil) |
401 | - _, err := zk.Create("/test", "conflict", gozk.EPHEMERAL, |
402 | - gozk.WorldACL(gozk.PERM_ALL)) |
403 | + _, err := conn.Create("/test", "conflict", zk.EPHEMERAL, |
404 | + zk.WorldACL(zk.PERM_ALL)) |
405 | c.Assert(err, IsNil) |
406 | return "<none> => conflict", nil |
407 | case "conflict": |
408 | @@ -100,11 +98,10 @@ |
409 | return "can't happen", nil |
410 | } |
411 | |
412 | - err := zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL), |
413 | - changeFunc) |
414 | + err := conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL), changeFunc) |
415 | c.Assert(err, IsNil) |
416 | |
417 | - data, stat, err := zk.Get("/test") |
418 | + data, stat, err := conn.Get("/test") |
419 | c.Assert(err, IsNil) |
420 | c.Assert(data, Equals, "conflict => new") |
421 | c.Assert(stat, NotNil) |
422 | @@ -112,18 +109,17 @@ |
423 | } |
424 | |
425 | func (s *S) TestRetryChangeConflictOnSetDueToChange(c *C) { |
426 | - zk, _ := s.init(c) |
427 | + conn, _ := s.init(c) |
428 | |
429 | - _, err := zk.Create("/test", "old", gozk.EPHEMERAL, |
430 | - gozk.WorldACL(gozk.PERM_ALL)) |
431 | + _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
432 | c.Assert(err, IsNil) |
433 | |
434 | - changeFunc := func(data string, stat gozk.Stat) (string, os.Error) { |
435 | + changeFunc := func(data string, stat *zk.Stat) (string, os.Error) { |
436 | switch data { |
437 | case "old": |
438 | c.Assert(stat, NotNil) |
439 | c.Assert(stat.Version(), Equals, int32(0)) |
440 | - _, err := zk.Set("/test", "conflict", 0) |
441 | + _, err := conn.Set("/test", "conflict", 0) |
442 | c.Assert(err, IsNil) |
443 | return "old => new", nil |
444 | case "conflict": |
445 | @@ -136,10 +132,10 @@ |
446 | return "can't happen", nil |
447 | } |
448 | |
449 | - err = zk.RetryChange("/test", gozk.EPHEMERAL, []gozk.ACL{}, changeFunc) |
450 | + err = conn.RetryChange("/test", zk.EPHEMERAL, []zk.ACL{}, changeFunc) |
451 | c.Assert(err, IsNil) |
452 | |
453 | - data, stat, err := zk.Get("/test") |
454 | + data, stat, err := conn.Get("/test") |
455 | c.Assert(err, IsNil) |
456 | c.Assert(data, Equals, "conflict => new") |
457 | c.Assert(stat, NotNil) |
458 | @@ -147,18 +143,17 @@ |
459 | } |
460 | |
461 | func (s *S) TestRetryChangeConflictOnSetDueToDelete(c *C) { |
462 | - zk, _ := s.init(c) |
463 | + conn, _ := s.init(c) |
464 | |
465 | - _, err := zk.Create("/test", "old", gozk.EPHEMERAL, |
466 | - gozk.WorldACL(gozk.PERM_ALL)) |
467 | + _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
468 | c.Assert(err, IsNil) |
469 | |
470 | - changeFunc := func(data string, stat gozk.Stat) (string, os.Error) { |
471 | + changeFunc := func(data string, stat *zk.Stat) (string, os.Error) { |
472 | switch data { |
473 | case "old": |
474 | c.Assert(stat, NotNil) |
475 | c.Assert(stat.Version(), Equals, int32(0)) |
476 | - err := zk.Delete("/test", 0) |
477 | + err := conn.Delete("/test", 0) |
478 | c.Assert(err, IsNil) |
479 | return "old => <deleted>", nil |
480 | case "": |
481 | @@ -170,55 +165,53 @@ |
482 | return "can't happen", nil |
483 | } |
484 | |
485 | - err = zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_READ), |
486 | - changeFunc) |
487 | + err = conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_READ), changeFunc) |
488 | c.Assert(err, IsNil) |
489 | |
490 | - data, stat, err := zk.Get("/test") |
491 | + data, stat, err := conn.Get("/test") |
492 | c.Assert(err, IsNil) |
493 | c.Assert(data, Equals, "<deleted> => new") |
494 | c.Assert(stat, NotNil) |
495 | c.Assert(stat.Version(), Equals, int32(0)) |
496 | |
497 | // Should be the new ACL. |
498 | - acl, _, err := zk.ACL("/test") |
499 | + acl, _, err := conn.ACL("/test") |
500 | c.Assert(err, IsNil) |
501 | - c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_READ)) |
502 | + c.Assert(acl, Equals, zk.WorldACL(zk.PERM_READ)) |
503 | } |
504 | |
505 | func (s *S) TestRetryChangeErrorInCallback(c *C) { |
506 | - zk, _ := s.init(c) |
507 | + conn, _ := s.init(c) |
508 | |
509 | - err := zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL), |
510 | - func(data string, stat gozk.Stat) (string, os.Error) { |
511 | + err := conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL), |
512 | + func(data string, stat *zk.Stat) (string, os.Error) { |
513 | return "don't use this", os.NewError("BOOM!") |
514 | }) |
515 | c.Assert(err, NotNil) |
516 | - c.Assert(err.Code(), Equals, gozk.ZSYSTEMERROR) |
517 | c.Assert(err.String(), Equals, "BOOM!") |
518 | |
519 | - stat, err := zk.Exists("/test") |
520 | + stat, err := conn.Exists("/test") |
521 | c.Assert(err, IsNil) |
522 | c.Assert(stat, IsNil) |
523 | } |
524 | |
525 | func (s *S) TestRetryChangeFailsReading(c *C) { |
526 | - zk, _ := s.init(c) |
527 | + conn, _ := s.init(c) |
528 | |
529 | - _, err := zk.Create("/test", "old", gozk.EPHEMERAL, |
530 | - gozk.WorldACL(gozk.PERM_WRITE)) // Write only! |
531 | + // Write only! |
532 | + _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_WRITE)) |
533 | c.Assert(err, IsNil) |
534 | |
535 | var called bool |
536 | - err = zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL), |
537 | - func(data string, stat gozk.Stat) (string, os.Error) { |
538 | + err = conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL), |
539 | + func(data string, stat *zk.Stat) (string, os.Error) { |
540 | called = true |
541 | return "", nil |
542 | }) |
543 | c.Assert(err, NotNil) |
544 | - c.Assert(err.Code(), Equals, gozk.ZNOAUTH) |
545 | + c.Assert(err, Equals, zk.ZNOAUTH) |
546 | |
547 | - stat, err := zk.Exists("/test") |
548 | + stat, err := conn.Exists("/test") |
549 | c.Assert(err, IsNil) |
550 | c.Assert(stat, NotNil) |
551 | c.Assert(stat.Version(), Equals, int32(0)) |
552 | @@ -227,22 +220,21 @@ |
553 | } |
554 | |
555 | func (s *S) TestRetryChangeFailsSetting(c *C) { |
556 | - zk, _ := s.init(c) |
557 | + conn, _ := s.init(c) |
558 | |
559 | - _, err := zk.Create("/test", "old", gozk.EPHEMERAL, |
560 | - gozk.WorldACL(gozk.PERM_READ)) // Read only! |
561 | + // Read only! |
562 | + _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_READ)) |
563 | c.Assert(err, IsNil) |
564 | |
565 | var called bool |
566 | - err = zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL), |
567 | - func(data string, stat gozk.Stat) (string, os.Error) { |
568 | + err = conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL), |
569 | + func(data string, stat *zk.Stat) (string, os.Error) { |
570 | called = true |
571 | return "", nil |
572 | }) |
573 | - c.Assert(err, NotNil) |
574 | - c.Assert(err.Code(), Equals, gozk.ZNOAUTH) |
575 | + c.Assert(err, Equals, zk.ZNOAUTH) |
576 | |
577 | - stat, err := zk.Exists("/test") |
578 | + stat, err := conn.Exists("/test") |
579 | c.Assert(err, IsNil) |
580 | c.Assert(stat, NotNil) |
581 | c.Assert(stat.Version(), Equals, int32(0)) |
582 | @@ -251,23 +243,22 @@ |
583 | } |
584 | |
585 | func (s *S) TestRetryChangeFailsCreating(c *C) { |
586 | - zk, _ := s.init(c) |
587 | + conn, _ := s.init(c) |
588 | |
589 | - _, err := zk.Create("/test", "old", gozk.EPHEMERAL, |
590 | - gozk.WorldACL(gozk.PERM_READ)) // Read only! |
591 | + // Read only! |
592 | + _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_READ)) |
593 | c.Assert(err, IsNil) |
594 | |
595 | var called bool |
596 | - err = zk.RetryChange("/test/sub", gozk.EPHEMERAL, |
597 | - gozk.WorldACL(gozk.PERM_ALL), |
598 | - func(data string, stat gozk.Stat) (string, os.Error) { |
599 | + err = conn.RetryChange("/test/sub", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL), |
600 | + func(data string, stat *zk.Stat) (string, os.Error) { |
601 | called = true |
602 | return "", nil |
603 | }) |
604 | c.Assert(err, NotNil) |
605 | - c.Assert(err.Code(), Equals, gozk.ZNOAUTH) |
606 | + c.Assert(err, Equals, zk.ZNOAUTH) |
607 | |
608 | - stat, err := zk.Exists("/test/sub") |
609 | + stat, err := conn.Exists("/test/sub") |
610 | c.Assert(err, IsNil) |
611 | c.Assert(stat, IsNil) |
612 | |
613 | |
614 | === added file 'server.go' |
615 | --- server.go 1970-01-01 00:00:00 +0000 |
616 | +++ server.go 2011-10-03 15:02:28 +0000 |
617 | @@ -0,0 +1,239 @@ |
618 | +package zk |
619 | + |
620 | +import ( |
621 | + "bufio" |
622 | + "bytes" |
623 | + "fmt" |
624 | + "io/ioutil" |
625 | + "net" |
626 | + "os" |
627 | + "path/filepath" |
628 | + "strings" |
629 | +) |
630 | + |
631 | +// Server represents a ZooKeeper server, its data and configuration files. |
632 | +type Server struct { |
633 | + runDir string |
634 | + installDir string |
635 | +} |
636 | + |
637 | +func (srv *Server) Directory() string { |
638 | + return srv.runDir |
639 | +} |
640 | + |
641 | +// CreateServer creates the directory runDir and sets up a ZooKeeper server |
642 | +// environment inside it. It is an error if runDir already exists. |
643 | +// The server will listen on the specified TCP port. |
644 | +// |
645 | +// The ZooKeeper installation directory is specified by installDir. |
646 | +// If this is empty, a system default will be used. |
647 | +// |
648 | +// CreateServer does not start the server - the *Server that |
649 | +// is returned can be used with the service package, for example: |
650 | +// see launchpad.net/gozk/zk/service. |
651 | +func CreateServer(port int, runDir, installDir string) (*Server, os.Error) { |
652 | + if err := os.Mkdir(runDir, 0777); err != nil { |
653 | + return nil, err |
654 | + } |
655 | + srv := &Server{runDir: runDir, installDir: installDir} |
656 | + if err := srv.writeLog4JConfig(); err != nil { |
657 | + return nil, err |
658 | + } |
659 | + if err := srv.writeZooKeeperConfig(port); err != nil { |
660 | + return nil, err |
661 | + } |
662 | + if err := srv.writeInstallDir(); err != nil { |
663 | + return nil, err |
664 | + } |
665 | + return srv, nil |
666 | +} |
667 | + |
668 | +// AttachServer creates a new ZooKeeper Server instance |
669 | +// to operate inside an existing run directory, runDir. |
670 | +// The directory must have been created with CreateServer. |
671 | +func AttachServer(runDir string) (*Server, os.Error) { |
672 | + srv := &Server{runDir: runDir} |
673 | + if err := srv.readInstallDir(); err != nil { |
674 | + return nil, fmt.Errorf("cannot read server install directory: %v", err) |
675 | + } |
676 | + return srv, nil |
677 | +} |
678 | + |
679 | +func (srv *Server) path(name string) string { |
680 | + return filepath.Join(srv.runDir, name) |
681 | +} |
682 | + |
683 | +func (srv *Server) CheckAvailability() os.Error { |
684 | + port, err := srv.NetworkPort() |
685 | + if err != nil { |
686 | + return fmt.Errorf("cannot get network port: %v", err) |
687 | + } |
688 | + l, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port)) |
689 | + if err != nil { |
690 | + return fmt.Errorf("cannot listen on port %v: %v", port, err) |
691 | + } |
692 | + l.Close() |
693 | + return nil |
694 | +} |
695 | + |
696 | +// ServerCommand returns the command used to start the |
697 | +// ZooKeeper server. It is provided for debugging and testing |
698 | +// purposes only. |
699 | +func (srv *Server) Command() ([]string, os.Error) { |
700 | + cp, err := srv.classPath() |
701 | + if err != nil { |
702 | + return nil, fmt.Errorf("cannot get class path: %v", err) |
703 | + } |
704 | + return []string{ |
705 | + "java", |
706 | + "-cp", strings.Join(cp, ":"), |
707 | + "-Dzookeeper.root.logger=INFO,CONSOLE", |
708 | + "-Dlog4j.configuration=file:"+srv.path("log4j.properties"), |
709 | + "org.apache.zookeeper.server.quorum.QuorumPeerMain", |
710 | + srv.path("zoo.cfg"), |
711 | + }, nil |
712 | +} |
713 | + |
714 | +var log4jProperties = ` |
715 | +log4j.rootLogger=INFO, CONSOLE |
716 | +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender |
717 | +log4j.appender.CONSOLE.Threshold=INFO |
718 | +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout |
719 | +log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n |
720 | +` |
721 | + |
722 | +func (srv *Server) writeLog4JConfig() (err os.Error) { |
723 | + return ioutil.WriteFile(srv.path("log4j.properties"), []byte(log4jProperties), 0666) |
724 | +} |
725 | + |
726 | +func (srv *Server) writeZooKeeperConfig(port int) (err os.Error) { |
727 | + return ioutil.WriteFile(srv.path("zoo.cfg"), []byte(fmt.Sprintf( |
728 | + "tickTime=2000\n"+ |
729 | + "dataDir=%s\n"+ |
730 | + "clientPort=%d\n"+ |
731 | + "maxClientCnxns=500\n", |
732 | + srv.runDir, port)), 0666) |
733 | +} |
734 | + |
735 | +// NetworkPort returns the TCP port number that |
736 | +// the server is configured for. |
737 | +func (srv *Server) NetworkPort() (int, os.Error) { |
738 | + f, err := os.Open(srv.path("zoo.cfg")) |
739 | + if err != nil { |
740 | + return 0, err |
741 | + } |
742 | + r := bufio.NewReader(f) |
743 | + for { |
744 | + line, err := r.ReadSlice('\n') |
745 | + if err != nil { |
746 | + return 0, fmt.Errorf("cannot get port from %q", srv.path("zoo.cfg")) |
747 | + } |
748 | + var port int |
749 | + if n, _ := fmt.Sscanf(string(line), "clientPort=%d\n", &port); n == 1 { |
750 | + return port, nil |
751 | + } |
752 | + } |
753 | + panic("not reached") |
754 | +} |
755 | + |
756 | +func (srv *Server) writeInstallDir() os.Error { |
757 | + return ioutil.WriteFile(srv.path("installdir.txt"), []byte(srv.installDir+"\n"), 0666) |
758 | +} |
759 | + |
760 | +func (srv *Server) readInstallDir() os.Error { |
761 | + data, err := ioutil.ReadFile(srv.path("installdir.txt")) |
762 | + if err != nil { |
763 | + return err |
764 | + } |
765 | + if data[len(data)-1] == '\n' { |
766 | + data = data[0 : len(data)-1] |
767 | + } |
768 | + srv.installDir = string(data) |
769 | + return nil |
770 | +} |
771 | + |
772 | +func (srv *Server) classPath() ([]string, os.Error) { |
773 | + dir := srv.installDir |
774 | + if dir == "" { |
775 | + return systemClassPath() |
776 | + } |
777 | + if err := checkDirectory(dir); err != nil { |
778 | + return nil, err |
779 | + } |
780 | + // Two possibilities, as seen in zkEnv.sh: |
781 | + // 1) locally built binaries (jars are in build directory) |
782 | + // 2) release binaries |
783 | + if build := filepath.Join(dir, "build"); checkDirectory(build) == nil { |
784 | + dir = build |
785 | + } |
786 | + classPath, err := filepath.Glob(filepath.Join(dir, "zookeeper-*.jar")) |
787 | + if err != nil { |
788 | + panic(fmt.Errorf("glob for jar files: %v", err)) |
789 | + } |
790 | + more, err := filepath.Glob(filepath.Join(dir, "lib/*.jar")) |
791 | + if err != nil { |
792 | + panic(fmt.Errorf("glob for lib jar files: %v", err)) |
793 | + } |
794 | + |
795 | + classPath = append(classPath, more...) |
796 | + if len(classPath) == 0 { |
797 | + return nil, fmt.Errorf("zookeeper libraries not found in %q", dir) |
798 | + } |
799 | + return classPath, nil |
800 | +} |
801 | + |
802 | +const zookeeperEnviron = "/etc/zookeeper/conf/environment" |
803 | + |
804 | +func systemClassPath() ([]string, os.Error) { |
805 | + f, err := os.Open(zookeeperEnviron) |
806 | + if f == nil { |
807 | + return nil, err |
808 | + } |
809 | + r := bufio.NewReader(f) |
810 | + for { |
811 | + line, err := r.ReadSlice('\n') |
812 | + if err != nil { |
813 | + break |
814 | + } |
815 | + if !bytes.HasPrefix(line, []byte("CLASSPATH=")) { |
816 | + continue |
817 | + } |
818 | + |
819 | + // remove variable and newline |
820 | + path := string(line[len("CLASSPATH=") : len(line)-1]) |
821 | + |
822 | + // trim white space |
823 | + path = strings.Trim(path, " \t\r") |
824 | + |
825 | + // strip quotes |
826 | + if path[0] == '"' { |
827 | + path = path[1 : len(path)-1] |
828 | + } |
829 | + |
830 | + // split on : |
831 | + classPath := strings.Split(path, ":") |
832 | + |
833 | + // split off $ZOOCFGDIR |
834 | + if len(classPath) > 0 && classPath[0] == "$ZOOCFGDIR" { |
835 | + classPath = classPath[1:] |
836 | + } |
837 | + |
838 | + if len(classPath) == 0 { |
839 | + return nil, fmt.Errorf("empty class path in %q", zookeeperEnviron) |
840 | + } |
841 | + return classPath, nil |
842 | + } |
843 | + return nil, fmt.Errorf("no class path found in %q", zookeeperEnviron) |
844 | +} |
845 | + |
846 | +// checkDirectory returns an error if the given path |
847 | +// does not exist or is not a directory. |
848 | +func checkDirectory(path string) os.Error { |
849 | + if info, err := os.Stat(path); err != nil || !info.IsDirectory() { |
850 | + if err == nil { |
851 | + err = &os.PathError{Op: "stat", Path: path, Error: os.NewError("is not a directory")} |
852 | + } |
853 | + return err |
854 | + } |
855 | + return nil |
856 | +} |
857 | |
858 | === added directory 'service' |
859 | === added file 'service/Makefile' |
860 | --- service/Makefile 1970-01-01 00:00:00 +0000 |
861 | +++ service/Makefile 2011-10-03 15:02:28 +0000 |
862 | @@ -0,0 +1,20 @@ |
863 | +include $(GOROOT)/src/Make.inc |
864 | + |
865 | +TARG=launchpad.net/gozk/zk/service |
866 | + |
867 | +GOFILES=\ |
868 | + service.go\ |
869 | + |
870 | +GOFMT=gofmt |
871 | +BADFMT:=$(shell $(GOFMT) -l $(GOFILES) $(CGOFILES) $(wildcard *_test.go)) |
872 | + |
873 | +gofmt: $(BADFMT) |
874 | + @for F in $(BADFMT); do $(GOFMT) -w $$F && echo $$F; done |
875 | + |
876 | +ifneq ($(BADFMT),) |
877 | +ifneq ($(MAKECMDGOALS),gofmt) |
878 | +$(warning WARNING: make gofmt: $(BADFMT)) |
879 | +endif |
880 | +endif |
881 | + |
882 | +include $(GOROOT)/src/Make.pkg |
883 | |
884 | === added file 'service/service.go' |
885 | --- service/service.go 1970-01-01 00:00:00 +0000 |
886 | +++ service/service.go 2011-10-03 15:02:28 +0000 |
887 | @@ -0,0 +1,160 @@ |
888 | +// The service package provides support for long-running services. |
889 | +package service |
890 | + |
891 | +import ( |
892 | + "os" |
893 | + "fmt" |
894 | + "io/ioutil" |
895 | + "strconv" |
896 | + "path/filepath" |
897 | + "exec" |
898 | + "strings" |
899 | + "time" |
900 | +) |
901 | + |
902 | +// Interface represents a single-process server. |
903 | +type Interface interface { |
904 | + // Directory gives the path to the directory containing |
905 | + // the server's configuration and data files. It is assumed that |
906 | + // this exists and can be modified. |
907 | + Directory() string |
908 | + |
909 | + // CheckAvailability should return an error if resources |
910 | + // required by the server are not available; for instance |
911 | + // if the server requires a network port which is not free. |
912 | + CheckAvailability() os.Error |
913 | + |
914 | + // Command should return a command that can be |
915 | + // used to run the server. The first element of the returned |
916 | + // slice is the executable name; the rest of the elements are |
917 | + // its arguments. |
918 | + Command() ([]string, os.Error) |
919 | +} |
920 | + |
921 | +// Service represents a possibly running server process. |
922 | +type Service struct { |
923 | + srv Interface |
924 | +} |
925 | + |
926 | +// New returns a new Service using the given |
927 | +// server interface. |
928 | +func New(srv Interface) *Service { |
929 | + return &Service{srv} |
930 | +} |
931 | + |
932 | +// Process returns a reference to the identity |
933 | +// of the last running server in the Service's directory. |
934 | +// If the server was shut down, it returns (nil, nil). |
935 | +// The server process may not still be running |
936 | +// (for instance if it was terminated abnormally). |
937 | +// This function is provided for debugging and testing |
938 | +// purposes only. |
939 | +func (s *Service) Process() (*os.Process, os.Error) { |
940 | + data, err := ioutil.ReadFile(s.path("pid.txt")) |
941 | + if err != nil { |
942 | + if err, ok := err.(*os.PathError); ok && err.Error == os.ENOENT { |
943 | + return nil, nil |
944 | + } |
945 | + return nil, err |
946 | + } |
947 | + pid, err := strconv.Atoi(string(data)) |
948 | + if err != nil { |
949 | + return nil, os.NewError("bad process id found in pid.txt") |
950 | + } |
951 | + return os.FindProcess(pid) |
952 | +} |
953 | + |
954 | +func (s *Service) path(name string) string { |
955 | + return filepath.Join(s.srv.Directory(), name) |
956 | +} |
957 | + |
958 | +// Start starts the server. It returns an error if the server is already running. |
959 | +// It stores the process ID of the server in the file "pid.txt" |
960 | +// inside the server's directory. Stdout and stderr of the server |
961 | +// are similarly written to "log.txt". |
962 | +func (s *Service) Start() os.Error { |
963 | + if err := s.srv.CheckAvailability(); err != nil { |
964 | + return err |
965 | + } |
966 | + p, err := s.Process() |
967 | + if p != nil || err != nil { |
968 | + if p != nil { |
969 | + p.Release() |
970 | + } |
971 | + return fmt.Errorf("server may already be running (remove %q to clear)", s.path("pid.txt")) |
972 | + } |
973 | + |
974 | + // create the pid file before starting the process so that if we get two |
975 | + // programs trying to concurrently start a server on the same directory |
976 | + // at the same time, only one should succeed. |
977 | + pidf, err := os.OpenFile(s.path("pid.txt"), os.O_EXCL|os.O_CREATE|os.O_WRONLY, 0666) |
978 | + if err != nil { |
979 | + return fmt.Errorf("cannot create pid.txt: %v", err) |
980 | + } |
981 | + defer pidf.Close() |
982 | + |
983 | + args, err := s.srv.Command() |
984 | + if err != nil { |
985 | + return fmt.Errorf("cannot determine command: %v", err) |
986 | + } |
987 | + cmd := exec.Command(args[0], args[1:]...) |
988 | + |
989 | + logf, err := os.OpenFile(s.path("log.txt"), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666) |
990 | + if err != nil { |
991 | + return fmt.Errorf("cannot create log file: %v", err) |
992 | + } |
993 | + defer logf.Close() |
994 | + cmd.Stdout = logf |
995 | + cmd.Stderr = logf |
996 | + if err := cmd.Start(); err != nil { |
997 | + return fmt.Errorf("cannot start server: %v", err) |
998 | + } |
999 | + if _, err := fmt.Fprint(pidf, cmd.Process.Pid); err != nil { |
1000 | + return fmt.Errorf("cannot write pid file: %v", err) |
1001 | + } |
1002 | + return nil |
1003 | +} |
1004 | + |
1005 | +// Stop kills the server. It is a no-op if it is already running. |
1006 | +func (s *Service) Stop() os.Error { |
1007 | + p, err := s.Process() |
1008 | + if p == nil { |
1009 | + if err != nil { |
1010 | + return fmt.Errorf("cannot read process ID of server: %v", err) |
1011 | + } |
1012 | + return nil |
1013 | + } |
1014 | + defer p.Release() |
1015 | + if err := p.Kill(); err != nil && !strings.Contains(err.String(), "no such process") { |
1016 | + return fmt.Errorf("cannot kill server process: %v", err) |
1017 | + } |
1018 | + // ignore the error returned from Wait because there's little |
1019 | + // we can do about it - it either means that the process has just exited |
1020 | + // anyway or that we can't wait for it for some other reason, |
1021 | + // for example because it was originally started by some other process. |
1022 | + _, err = p.Wait(0) |
1023 | + if err != nil && strings.Contains(err.String(), "no child processes") { |
1024 | + // If we can't wait for the server, it's possible that it was running |
1025 | + // but not as a child of this process, so give it a little while |
1026 | + // to exit. TODO poll with kill(no signal)? |
1027 | + time.Sleep(0.5e9) |
1028 | + } |
1029 | + |
1030 | + if err := os.Remove(s.path("pid.txt")); err != nil { |
1031 | + return fmt.Errorf("cannot remove server process ID file: %v", err) |
1032 | + } |
1033 | + return nil |
1034 | +} |
1035 | + |
1036 | +// Destroy stops the server, and then removes its |
1037 | +// directory and all its contents. |
1038 | +// Warning: this will destroy all data associated with the server. |
1039 | +func (s *Service) Destroy() os.Error { |
1040 | + if err := s.Stop(); err != nil { |
1041 | + return err |
1042 | + } |
1043 | + if err := os.RemoveAll(s.srv.Directory()); err != nil { |
1044 | + return err |
1045 | + } |
1046 | + return nil |
1047 | +} |
1048 | |
1049 | === modified file 'suite_test.go' |
1050 | --- suite_test.go 2011-08-19 01:43:37 +0000 |
1051 | +++ suite_test.go 2011-10-03 15:02:28 +0000 |
1052 | @@ -1,65 +1,48 @@ |
1053 | -package gozk_test |
1054 | +package zk_test |
1055 | |
1056 | import ( |
1057 | . "launchpad.net/gocheck" |
1058 | - "testing" |
1059 | - "io/ioutil" |
1060 | - "path" |
1061 | "fmt" |
1062 | + "launchpad.net/gozk/zk/service" |
1063 | + "launchpad.net/gozk/zk" |
1064 | "os" |
1065 | - "gozk" |
1066 | + "testing" |
1067 | "time" |
1068 | ) |
1069 | |
1070 | func TestAll(t *testing.T) { |
1071 | - TestingT(t) |
1072 | + if !*reattach { |
1073 | + TestingT(t) |
1074 | + } |
1075 | } |
1076 | |
1077 | var _ = Suite(&S{}) |
1078 | |
1079 | type S struct { |
1080 | - zkRoot string |
1081 | - zkTestRoot string |
1082 | - zkTestPort int |
1083 | - zkServerSh string |
1084 | - zkServerOut *os.File |
1085 | - zkAddr string |
1086 | + zkServer *service.Service |
1087 | + zkTestRoot string |
1088 | + zkAddr string |
1089 | |
1090 | - handles []*gozk.ZooKeeper |
1091 | - events []*gozk.Event |
1092 | + handles []*zk.Conn |
1093 | + events []*zk.Event |
1094 | liveWatches int |
1095 | deadWatches chan bool |
1096 | } |
1097 | |
1098 | -var logLevel = 0 //gozk.LOG_ERROR |
1099 | - |
1100 | - |
1101 | -var testZooCfg = ("dataDir=%s\n" + |
1102 | - "clientPort=%d\n" + |
1103 | - "tickTime=2000\n" + |
1104 | - "initLimit=10\n" + |
1105 | - "syncLimit=5\n" + |
1106 | - "") |
1107 | - |
1108 | -var testLog4jPrp = ("log4j.rootLogger=INFO,CONSOLE\n" + |
1109 | - "log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender\n" + |
1110 | - "log4j.appender.CONSOLE.Threshold=DEBUG\n" + |
1111 | - "log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout\n" + |
1112 | - "log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n" + |
1113 | - "") |
1114 | - |
1115 | -func (s *S) init(c *C) (*gozk.ZooKeeper, chan gozk.Event) { |
1116 | - zk, watch, err := gozk.Init(s.zkAddr, 5e9) |
1117 | +var logLevel = 0 //zk.LOG_ERROR |
1118 | + |
1119 | +func (s *S) init(c *C) (*zk.Conn, chan zk.Event) { |
1120 | + conn, watch, err := zk.Dial(s.zkAddr, 5e9) |
1121 | c.Assert(err, IsNil) |
1122 | |
1123 | - s.handles = append(s.handles, zk) |
1124 | + s.handles = append(s.handles, conn) |
1125 | |
1126 | event := <-watch |
1127 | |
1128 | - c.Assert(event.Type, Equals, gozk.EVENT_SESSION) |
1129 | - c.Assert(event.State, Equals, gozk.STATE_CONNECTED) |
1130 | + c.Assert(event.Type, Equals, zk.EVENT_SESSION) |
1131 | + c.Assert(event.State, Equals, zk.STATE_CONNECTED) |
1132 | |
1133 | - bufferedWatch := make(chan gozk.Event, 256) |
1134 | + bufferedWatch := make(chan zk.Event, 256) |
1135 | bufferedWatch <- event |
1136 | |
1137 | s.liveWatches += 1 |
1138 | @@ -82,13 +65,13 @@ |
1139 | s.deadWatches <- true |
1140 | }() |
1141 | |
1142 | - return zk, bufferedWatch |
1143 | + return conn, bufferedWatch |
1144 | } |
1145 | |
1146 | func (s *S) SetUpTest(c *C) { |
1147 | - c.Assert(gozk.CountPendingWatches(), Equals, 0, |
1148 | + c.Assert(zk.CountPendingWatches(), Equals, 0, |
1149 | Bug("Test got a dirty watch state before running!")) |
1150 | - gozk.SetLogLevel(logLevel) |
1151 | + zk.SetLogLevel(logLevel) |
1152 | } |
1153 | |
1154 | func (s *S) TearDownTest(c *C) { |
1155 | @@ -108,98 +91,38 @@ |
1156 | } |
1157 | |
1158 | // Reset the list of handles. |
1159 | - s.handles = make([]*gozk.ZooKeeper, 0) |
1160 | + s.handles = make([]*zk.Conn, 0) |
1161 | |
1162 | - c.Assert(gozk.CountPendingWatches(), Equals, 0, |
1163 | + c.Assert(zk.CountPendingWatches(), Equals, 0, |
1164 | Bug("Test left live watches behind!")) |
1165 | } |
1166 | |
1167 | -// We use the suite set up and tear down to manage a custom zookeeper |
1168 | +// We use the suite set up and tear down to manage a custom ZooKeeper |
1169 | // |
1170 | func (s *S) SetUpSuite(c *C) { |
1171 | - |
1172 | + var err os.Error |
1173 | s.deadWatches = make(chan bool) |
1174 | |
1175 | - var err os.Error |
1176 | - |
1177 | - s.zkRoot = os.Getenv("ZKROOT") |
1178 | - if s.zkRoot == "" { |
1179 | - panic("You must define $ZKROOT ($ZKROOT/bin/zkServer.sh is needed)") |
1180 | - } |
1181 | - |
1182 | - s.zkTestRoot = c.MkDir() |
1183 | - s.zkTestPort = 21812 |
1184 | - |
1185 | - println("ZooKeeper test server directory:", s.zkTestRoot) |
1186 | - println("ZooKeeper test server port:", s.zkTestPort) |
1187 | - |
1188 | - s.zkAddr = fmt.Sprintf("localhost:%d", s.zkTestPort) |
1189 | - |
1190 | - s.zkServerSh = path.Join(s.zkRoot, "bin/zkServer.sh") |
1191 | - s.zkServerOut, err = os.OpenFile(path.Join(s.zkTestRoot, "stdout.txt"), |
1192 | - os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) |
1193 | - if err != nil { |
1194 | - panic("Can't open stdout.txt file for server: " + err.String()) |
1195 | - } |
1196 | - |
1197 | - dataDir := path.Join(s.zkTestRoot, "data") |
1198 | - confDir := path.Join(s.zkTestRoot, "conf") |
1199 | - |
1200 | - os.Mkdir(dataDir, 0755) |
1201 | - os.Mkdir(confDir, 0755) |
1202 | - |
1203 | - err = os.Setenv("ZOOCFGDIR", confDir) |
1204 | - if err != nil { |
1205 | - panic("Can't set $ZOOCFGDIR: " + err.String()) |
1206 | - } |
1207 | - |
1208 | - zooCfg := []byte(fmt.Sprintf(testZooCfg, dataDir, s.zkTestPort)) |
1209 | - err = ioutil.WriteFile(path.Join(confDir, "zoo.cfg"), zooCfg, 0644) |
1210 | - if err != nil { |
1211 | - panic("Can't write zoo.cfg: " + err.String()) |
1212 | - } |
1213 | - |
1214 | - log4jPrp := []byte(testLog4jPrp) |
1215 | - err = ioutil.WriteFile(path.Join(confDir, "log4j.properties"), log4jPrp, 0644) |
1216 | - if err != nil { |
1217 | - panic("Can't write log4j.properties: " + err.String()) |
1218 | - } |
1219 | - |
1220 | - s.StartZK() |
1221 | + // N.B. We meed to create a subdirectory because zk.CreateServer |
1222 | + // insists on creating its own directory. |
1223 | + s.zkTestRoot = c.MkDir() + "/zk" |
1224 | + |
1225 | + port := 21812 |
1226 | + s.zkAddr = fmt.Sprint("localhost:", port) |
1227 | + |
1228 | + srv, err := zk.CreateServer(port, s.zkTestRoot, "") |
1229 | + if err != nil { |
1230 | + c.Fatal("Cannot set up server environment: ", err) |
1231 | + } |
1232 | + s.zkServer = service.New(srv) |
1233 | + err = s.zkServer.Start() |
1234 | + if err != nil { |
1235 | + c.Fatal("Cannot start ZooKeeper server: ", err) |
1236 | + } |
1237 | } |
1238 | |
1239 | func (s *S) TearDownSuite(c *C) { |
1240 | - s.StopZK() |
1241 | - s.zkServerOut.Close() |
1242 | -} |
1243 | - |
1244 | -func (s *S) StartZK() { |
1245 | - attr := os.ProcAttr{Files: []*os.File{os.Stdin, s.zkServerOut, os.Stderr}} |
1246 | - proc, err := os.StartProcess(s.zkServerSh, []string{s.zkServerSh, "start"}, &attr) |
1247 | - if err != nil { |
1248 | - panic("Problem executing zkServer.sh start: " + err.String()) |
1249 | - } |
1250 | - |
1251 | - result, err := proc.Wait(0) |
1252 | - if err != nil { |
1253 | - panic(err.String()) |
1254 | - } else if result.ExitStatus() != 0 { |
1255 | - panic("'zkServer.sh start' exited with non-zero status") |
1256 | - } |
1257 | -} |
1258 | - |
1259 | -func (s *S) StopZK() { |
1260 | - attr := os.ProcAttr{Files: []*os.File{os.Stdin, s.zkServerOut, os.Stderr}} |
1261 | - proc, err := os.StartProcess(s.zkServerSh, []string{s.zkServerSh, "stop"}, &attr) |
1262 | - if err != nil { |
1263 | - panic("Problem executing zkServer.sh stop: " + err.String() + |
1264 | - " (look for runaway java processes!)") |
1265 | - } |
1266 | - result, err := proc.Wait(0) |
1267 | - if err != nil { |
1268 | - panic(err.String()) |
1269 | - } else if result.ExitStatus() != 0 { |
1270 | - panic("'zkServer.sh stop' exited with non-zero status " + |
1271 | - "(look for runaway java processes!)") |
1272 | + if s.zkServer != nil { |
1273 | + s.zkServer.Stop() // TODO Destroy |
1274 | } |
1275 | } |
1276 | |
1277 | === renamed file 'gozk.go' => 'zk.go' |
1278 | --- gozk.go 2011-08-19 01:56:39 +0000 |
1279 | +++ zk.go 2011-10-03 15:02:28 +0000 |
1280 | @@ -1,4 +1,4 @@ |
1281 | -// gozk - Zookeeper support for the Go language |
1282 | +// gozk - ZooKeeper support for the Go language |
1283 | // |
1284 | // https://wiki.ubuntu.com/gozk |
1285 | // |
1286 | @@ -6,7 +6,7 @@ |
1287 | // |
1288 | // Written by Gustavo Niemeyer <gustavo.niemeyer@canonical.com> |
1289 | // |
1290 | -package gozk |
1291 | +package zk |
1292 | |
1293 | /* |
1294 | #cgo CFLAGS: -I/usr/include/c-client-src |
1295 | @@ -27,17 +27,16 @@ |
1296 | // ----------------------------------------------------------------------- |
1297 | // Main constants and data types. |
1298 | |
1299 | -// The main ZooKeeper object, created through the Init function. |
1300 | -// Encapsulates all communication with ZooKeeper. |
1301 | -type ZooKeeper struct { |
1302 | +// Conn represents a connection to a set of ZooKeeper nodes. |
1303 | +type Conn struct { |
1304 | watchChannels map[uintptr]chan Event |
1305 | sessionWatchId uintptr |
1306 | handle *C.zhandle_t |
1307 | mutex sync.Mutex |
1308 | } |
1309 | |
1310 | -// ClientId represents the established session in ZooKeeper. This is only |
1311 | -// useful to be passed back into the ReInit function. |
1312 | +// ClientId represents an established ZooKeeper session. It can be |
1313 | +// passed into Redial to reestablish a connection to an existing session. |
1314 | type ClientId struct { |
1315 | cId C.clientid_t |
1316 | } |
1317 | @@ -98,35 +97,60 @@ |
1318 | State int |
1319 | } |
1320 | |
1321 | -// Error codes that may be used to verify the result of the |
1322 | -// Code method from Error. |
1323 | +// Error represents a ZooKeeper error. |
1324 | +type Error int |
1325 | + |
1326 | const ( |
1327 | - ZOK = C.ZOK |
1328 | - ZSYSTEMERROR = C.ZSYSTEMERROR |
1329 | - ZRUNTIMEINCONSISTENCY = C.ZRUNTIMEINCONSISTENCY |
1330 | - ZDATAINCONSISTENCY = C.ZDATAINCONSISTENCY |
1331 | - ZCONNECTIONLOSS = C.ZCONNECTIONLOSS |
1332 | - ZMARSHALLINGERROR = C.ZMARSHALLINGERROR |
1333 | - ZUNIMPLEMENTED = C.ZUNIMPLEMENTED |
1334 | - ZOPERATIONTIMEOUT = C.ZOPERATIONTIMEOUT |
1335 | - ZBADARGUMENTS = C.ZBADARGUMENTS |
1336 | - ZINVALIDSTATE = C.ZINVALIDSTATE |
1337 | - ZAPIERROR = C.ZAPIERROR |
1338 | - ZNONODE = C.ZNONODE |
1339 | - ZNOAUTH = C.ZNOAUTH |
1340 | - ZBADVERSION = C.ZBADVERSION |
1341 | - ZNOCHILDRENFOREPHEMERALS = C.ZNOCHILDRENFOREPHEMERALS |
1342 | - ZNODEEXISTS = C.ZNODEEXISTS |
1343 | - ZNOTEMPTY = C.ZNOTEMPTY |
1344 | - ZSESSIONEXPIRED = C.ZSESSIONEXPIRED |
1345 | - ZINVALIDCALLBACK = C.ZINVALIDCALLBACK |
1346 | - ZINVALIDACL = C.ZINVALIDACL |
1347 | - ZAUTHFAILED = C.ZAUTHFAILED |
1348 | - ZCLOSING = C.ZCLOSING |
1349 | - ZNOTHING = C.ZNOTHING |
1350 | - ZSESSIONMOVED = C.ZSESSIONMOVED |
1351 | + ZOK Error = C.ZOK |
1352 | + ZSYSTEMERROR Error = C.ZSYSTEMERROR |
1353 | + ZRUNTIMEINCONSISTENCY Error = C.ZRUNTIMEINCONSISTENCY |
1354 | + ZDATAINCONSISTENCY Error = C.ZDATAINCONSISTENCY |
1355 | + ZCONNECTIONLOSS Error = C.ZCONNECTIONLOSS |
1356 | + ZMARSHALLINGERROR Error = C.ZMARSHALLINGERROR |
1357 | + ZUNIMPLEMENTED Error = C.ZUNIMPLEMENTED |
1358 | + ZOPERATIONTIMEOUT Error = C.ZOPERATIONTIMEOUT |
1359 | + ZBADARGUMENTS Error = C.ZBADARGUMENTS |
1360 | + ZINVALIDSTATE Error = C.ZINVALIDSTATE |
1361 | + ZAPIERROR Error = C.ZAPIERROR |
1362 | + ZNONODE Error = C.ZNONODE |
1363 | + ZNOAUTH Error = C.ZNOAUTH |
1364 | + ZBADVERSION Error = C.ZBADVERSION |
1365 | + ZNOCHILDRENFOREPHEMERALS Error = C.ZNOCHILDRENFOREPHEMERALS |
1366 | + ZNODEEXISTS Error = C.ZNODEEXISTS |
1367 | + ZNOTEMPTY Error = C.ZNOTEMPTY |
1368 | + ZSESSIONEXPIRED Error = C.ZSESSIONEXPIRED |
1369 | + ZINVALIDCALLBACK Error = C.ZINVALIDCALLBACK |
1370 | + ZINVALIDACL Error = C.ZINVALIDACL |
1371 | + ZAUTHFAILED Error = C.ZAUTHFAILED |
1372 | + ZCLOSING Error = C.ZCLOSING |
1373 | + ZNOTHING Error = C.ZNOTHING |
1374 | + ZSESSIONMOVED Error = C.ZSESSIONMOVED |
1375 | ) |
1376 | |
1377 | +func (error Error) String() string { |
1378 | + return C.GoString(C.zerror(C.int(error))) // Static, no need to free it. |
1379 | +} |
1380 | + |
1381 | +// zkError creates an appropriate error return from |
1382 | +// a ZooKeeper status and the errno return from a C API |
1383 | +// call. |
1384 | +func zkError(rc C.int, cerr os.Error) os.Error { |
1385 | + code := Error(rc) |
1386 | + switch code { |
1387 | + case ZOK: |
1388 | + return nil |
1389 | + |
1390 | + case ZSYSTEMERROR: |
1391 | + // If a ZooKeeper call returns ZSYSTEMERROR, then |
1392 | + // errno becomes significant. If errno has not been |
1393 | + // set, then we will return ZSYSTEMERROR nonetheless. |
1394 | + if cerr != nil { |
1395 | + return cerr |
1396 | + } |
1397 | + } |
1398 | + return code |
1399 | +} |
1400 | + |
1401 | // Constants for SetLogLevel. |
1402 | const ( |
1403 | LOG_ERROR = C.ZOO_LOG_LEVEL_ERROR |
1404 | @@ -273,104 +297,67 @@ |
1405 | } |
1406 | |
1407 | // ----------------------------------------------------------------------- |
1408 | -// Error interface which maps onto the ZooKeeper error codes. |
1409 | - |
1410 | -type Error interface { |
1411 | - String() string |
1412 | - Code() int |
1413 | -} |
1414 | - |
1415 | -type errorType struct { |
1416 | - zkrc C.int |
1417 | - err os.Error |
1418 | -} |
1419 | - |
1420 | -func newError(zkrc C.int, err os.Error) Error { |
1421 | - return &errorType{zkrc, err} |
1422 | -} |
1423 | - |
1424 | -func (error *errorType) String() (result string) { |
1425 | - if error.zkrc == ZSYSTEMERROR && error.err != nil { |
1426 | - result = error.err.String() |
1427 | - } else { |
1428 | - result = C.GoString(C.zerror(error.zkrc)) // Static, no need to free it. |
1429 | - } |
1430 | - return |
1431 | -} |
1432 | - |
1433 | -// Code returns the error code that may be compared against one of |
1434 | -// the gozk.Z* constants. |
1435 | -func (error *errorType) Code() int { |
1436 | - return int(error.zkrc) |
1437 | -} |
1438 | - |
1439 | -// ----------------------------------------------------------------------- |
1440 | -// Stat interface which maps onto the ZooKeeper Stat struct. |
1441 | - |
1442 | -// We declare this as an interface rather than an actual struct because |
1443 | -// this way we don't have to copy data around between the real C struct |
1444 | -// and the Go one on every call. Most uses will only touch a few elements, |
1445 | -// or even ignore the stat entirely, so that's a win. |
1446 | |
1447 | // Stat contains detailed information about a node. |
1448 | -type Stat interface { |
1449 | - Czxid() int64 |
1450 | - Mzxid() int64 |
1451 | - CTime() int64 |
1452 | - MTime() int64 |
1453 | - Version() int32 |
1454 | - CVersion() int32 |
1455 | - AVersion() int32 |
1456 | - EphemeralOwner() int64 |
1457 | - DataLength() int32 |
1458 | - NumChildren() int32 |
1459 | - Pzxid() int64 |
1460 | -} |
1461 | - |
1462 | -type resultStat C.struct_Stat |
1463 | - |
1464 | -func (stat *resultStat) Czxid() int64 { |
1465 | - return int64(stat.czxid) |
1466 | -} |
1467 | - |
1468 | -func (stat *resultStat) Mzxid() int64 { |
1469 | - return int64(stat.mzxid) |
1470 | -} |
1471 | - |
1472 | -func (stat *resultStat) CTime() int64 { |
1473 | - return int64(stat.ctime) |
1474 | -} |
1475 | - |
1476 | -func (stat *resultStat) MTime() int64 { |
1477 | - return int64(stat.mtime) |
1478 | -} |
1479 | - |
1480 | -func (stat *resultStat) Version() int32 { |
1481 | - return int32(stat.version) |
1482 | -} |
1483 | - |
1484 | -func (stat *resultStat) CVersion() int32 { |
1485 | - return int32(stat.cversion) |
1486 | -} |
1487 | - |
1488 | -func (stat *resultStat) AVersion() int32 { |
1489 | - return int32(stat.aversion) |
1490 | -} |
1491 | - |
1492 | -func (stat *resultStat) EphemeralOwner() int64 { |
1493 | - return int64(stat.ephemeralOwner) |
1494 | -} |
1495 | - |
1496 | -func (stat *resultStat) DataLength() int32 { |
1497 | - return int32(stat.dataLength) |
1498 | -} |
1499 | - |
1500 | -func (stat *resultStat) NumChildren() int32 { |
1501 | - return int32(stat.numChildren) |
1502 | -} |
1503 | - |
1504 | -func (stat *resultStat) Pzxid() int64 { |
1505 | - return int64(stat.pzxid) |
1506 | +type Stat struct { |
1507 | + c C.struct_Stat |
1508 | +} |
1509 | + |
1510 | +func (stat *Stat) String() string { |
1511 | + return fmt.Sprintf( |
1512 | + "{Czxid: %d; Mzxid: %d; CTime: %d; MTime: %d; "+ |
1513 | + "Version: %d; CVersion: %d; AVersion: %d; "+ |
1514 | + "EphemeralOwner: %d; DataLength: %d; "+ |
1515 | + "NumChildren: %d; Pzxid: %d}", |
1516 | + stat.Czxid(), stat.Mzxid(), stat.CTime(), stat.MTime(), |
1517 | + stat.Version(), stat.CVersion(), stat.AVersion(), |
1518 | + stat.EphemeralOwner(), stat.DataLength(), |
1519 | + stat.NumChildren(), stat.Pzxid(), |
1520 | + ) |
1521 | +} |
1522 | + |
1523 | +func (stat *Stat) Czxid() int64 { |
1524 | + return int64(stat.c.czxid) |
1525 | +} |
1526 | + |
1527 | +func (stat *Stat) Mzxid() int64 { |
1528 | + return int64(stat.c.mzxid) |
1529 | +} |
1530 | + |
1531 | +func (stat *Stat) CTime() int64 { |
1532 | + return int64(stat.c.ctime) |
1533 | +} |
1534 | + |
1535 | +func (stat *Stat) MTime() int64 { |
1536 | + return int64(stat.c.mtime) |
1537 | +} |
1538 | + |
1539 | +func (stat *Stat) Version() int32 { |
1540 | + return int32(stat.c.version) |
1541 | +} |
1542 | + |
1543 | +func (stat *Stat) CVersion() int32 { |
1544 | + return int32(stat.c.cversion) |
1545 | +} |
1546 | + |
1547 | +func (stat *Stat) AVersion() int32 { |
1548 | + return int32(stat.c.aversion) |
1549 | +} |
1550 | + |
1551 | +func (stat *Stat) EphemeralOwner() int64 { |
1552 | + return int64(stat.c.ephemeralOwner) |
1553 | +} |
1554 | + |
1555 | +func (stat *Stat) DataLength() int32 { |
1556 | + return int32(stat.c.dataLength) |
1557 | +} |
1558 | + |
1559 | +func (stat *Stat) NumChildren() int32 { |
1560 | + return int32(stat.c.numChildren) |
1561 | +} |
1562 | + |
1563 | +func (stat *Stat) Pzxid() int64 { |
1564 | + return int64(stat.c.pzxid) |
1565 | } |
1566 | |
1567 | // ----------------------------------------------------------------------- |
1568 | @@ -384,7 +371,7 @@ |
1569 | C.zoo_set_debug_level(C.ZooLogLevel(level)) |
1570 | } |
1571 | |
1572 | -// Init initializes the communication with a ZooKeeper cluster. The provided |
1573 | +// Dial initializes the communication with a ZooKeeper cluster. The provided |
1574 | // servers parameter may include multiple server addresses, separated |
1575 | // by commas, so that the client will automatically attempt to connect |
1576 | // to another server if one of them stops working for whatever reason. |
1577 | @@ -398,79 +385,73 @@ |
1578 | // The watch channel receives events of type SESSION_EVENT when any change |
1579 | // to the state of the established connection happens. See the documentation |
1580 | // for the Event type for more details. |
1581 | -func Init(servers string, recvTimeoutNS int64) (zk *ZooKeeper, watch chan Event, err Error) { |
1582 | - zk, watch, err = internalInit(servers, recvTimeoutNS, nil) |
1583 | - return |
1584 | +func Dial(servers string, recvTimeoutNS int64) (*Conn, <-chan Event, os.Error) { |
1585 | + return dial(servers, recvTimeoutNS, nil) |
1586 | } |
1587 | |
1588 | -// Equivalent to Init, but attempt to reestablish an existing session |
1589 | +// Redial is equivalent to Dial, but attempts to reestablish an existing session |
1590 | // identified via the clientId parameter. |
1591 | -func ReInit(servers string, recvTimeoutNS int64, clientId *ClientId) (zk *ZooKeeper, watch chan Event, err Error) { |
1592 | - zk, watch, err = internalInit(servers, recvTimeoutNS, clientId) |
1593 | - return |
1594 | +func Redial(servers string, recvTimeoutNS int64, clientId *ClientId) (*Conn, <-chan Event, os.Error) { |
1595 | + return dial(servers, recvTimeoutNS, clientId) |
1596 | } |
1597 | |
1598 | -func internalInit(servers string, recvTimeoutNS int64, clientId *ClientId) (*ZooKeeper, chan Event, Error) { |
1599 | - |
1600 | - zk := &ZooKeeper{} |
1601 | - zk.watchChannels = make(map[uintptr]chan Event) |
1602 | +func dial(servers string, recvTimeoutNS int64, clientId *ClientId) (*Conn, <-chan Event, os.Error) { |
1603 | + conn := &Conn{} |
1604 | + conn.watchChannels = make(map[uintptr]chan Event) |
1605 | |
1606 | var cId *C.clientid_t |
1607 | if clientId != nil { |
1608 | cId = &clientId.cId |
1609 | } |
1610 | |
1611 | - watchId, watchChannel := zk.createWatch(true) |
1612 | - zk.sessionWatchId = watchId |
1613 | + watchId, watchChannel := conn.createWatch(true) |
1614 | + conn.sessionWatchId = watchId |
1615 | |
1616 | cservers := C.CString(servers) |
1617 | - handle, cerr := C.zookeeper_init(cservers, C.watch_handler, C.int(recvTimeoutNS / 1e6), cId, unsafe.Pointer(watchId), 0) |
1618 | + handle, cerr := C.zookeeper_init(cservers, C.watch_handler, C.int(recvTimeoutNS/1e6), cId, unsafe.Pointer(watchId), 0) |
1619 | C.free(unsafe.Pointer(cservers)) |
1620 | if handle == nil { |
1621 | - zk.closeAllWatches() |
1622 | - return nil, nil, newError(ZSYSTEMERROR, cerr) |
1623 | + conn.closeAllWatches() |
1624 | + return nil, nil, zkError(C.int(ZSYSTEMERROR), cerr) |
1625 | } |
1626 | - zk.handle = handle |
1627 | + conn.handle = handle |
1628 | runWatchLoop() |
1629 | - return zk, watchChannel, nil |
1630 | + return conn, watchChannel, nil |
1631 | } |
1632 | |
1633 | // ClientId returns the client ID for the existing session with ZooKeeper. |
1634 | // This is useful to reestablish an existing session via ReInit. |
1635 | -func (zk *ZooKeeper) ClientId() *ClientId { |
1636 | - return &ClientId{*C.zoo_client_id(zk.handle)} |
1637 | +func (conn *Conn) ClientId() *ClientId { |
1638 | + return &ClientId{*C.zoo_client_id(conn.handle)} |
1639 | } |
1640 | |
1641 | // Close terminates the ZooKeeper interaction. |
1642 | -func (zk *ZooKeeper) Close() Error { |
1643 | - |
1644 | - // Protect from concurrency around zk.handle change. |
1645 | - zk.mutex.Lock() |
1646 | - defer zk.mutex.Unlock() |
1647 | - |
1648 | - if zk.handle == nil { |
1649 | +func (conn *Conn) Close() os.Error { |
1650 | + |
1651 | + // Protect from concurrency around conn.handle change. |
1652 | + conn.mutex.Lock() |
1653 | + defer conn.mutex.Unlock() |
1654 | + |
1655 | + if conn.handle == nil { |
1656 | // ZooKeeper may hang indefinitely if a handler is closed twice, |
1657 | // so we get in the way and prevent it from happening. |
1658 | - return newError(ZCLOSING, nil) |
1659 | + return ZCLOSING |
1660 | } |
1661 | - rc, cerr := C.zookeeper_close(zk.handle) |
1662 | + rc, cerr := C.zookeeper_close(conn.handle) |
1663 | |
1664 | - zk.closeAllWatches() |
1665 | + conn.closeAllWatches() |
1666 | stopWatchLoop() |
1667 | |
1668 | - // At this point, nothing else should need zk.handle. |
1669 | - zk.handle = nil |
1670 | + // At this point, nothing else should need conn.handle. |
1671 | + conn.handle = nil |
1672 | |
1673 | - if rc != C.ZOK { |
1674 | - return newError(rc, cerr) |
1675 | - } |
1676 | - return nil |
1677 | + return zkError(rc, cerr) |
1678 | } |
1679 | |
1680 | // Get returns the data and status from an existing node. err will be nil, |
1681 | // unless an error is found. Attempting to retrieve data from a non-existing |
1682 | // node is an error. |
1683 | -func (zk *ZooKeeper) Get(path string) (data string, stat Stat, err Error) { |
1684 | +func (conn *Conn) Get(path string) (data string, stat *Stat, err os.Error) { |
1685 | |
1686 | cpath := C.CString(path) |
1687 | cbuffer := (*C.char)(C.malloc(bufferSize)) |
1688 | @@ -478,22 +459,21 @@ |
1689 | defer C.free(unsafe.Pointer(cpath)) |
1690 | defer C.free(unsafe.Pointer(cbuffer)) |
1691 | |
1692 | - cstat := C.struct_Stat{} |
1693 | - rc, cerr := C.zoo_wget(zk.handle, cpath, nil, nil, |
1694 | - cbuffer, &cbufferLen, &cstat) |
1695 | + var cstat Stat |
1696 | + rc, cerr := C.zoo_wget(conn.handle, cpath, nil, nil, cbuffer, &cbufferLen, &cstat.c) |
1697 | if rc != C.ZOK { |
1698 | - return "", nil, newError(rc, cerr) |
1699 | + return "", nil, zkError(rc, cerr) |
1700 | } |
1701 | + |
1702 | result := C.GoStringN(cbuffer, cbufferLen) |
1703 | - |
1704 | - return result, (*resultStat)(&cstat), nil |
1705 | + return result, &cstat, nil |
1706 | } |
1707 | |
1708 | // GetW works like Get but also returns a channel that will receive |
1709 | // a single Event value when the data or existence of the given ZooKeeper |
1710 | // node changes or when critical session events happen. See the |
1711 | // documentation of the Event type for more details. |
1712 | -func (zk *ZooKeeper) GetW(path string) (data string, stat Stat, watch chan Event, err Error) { |
1713 | +func (conn *Conn) GetW(path string) (data string, stat *Stat, watch <-chan Event, err os.Error) { |
1714 | |
1715 | cpath := C.CString(path) |
1716 | cbuffer := (*C.char)(C.malloc(bufferSize)) |
1717 | @@ -501,42 +481,38 @@ |
1718 | defer C.free(unsafe.Pointer(cpath)) |
1719 | defer C.free(unsafe.Pointer(cbuffer)) |
1720 | |
1721 | - watchId, watchChannel := zk.createWatch(true) |
1722 | + watchId, watchChannel := conn.createWatch(true) |
1723 | |
1724 | - cstat := C.struct_Stat{} |
1725 | - rc, cerr := C.zoo_wget(zk.handle, cpath, |
1726 | - C.watch_handler, unsafe.Pointer(watchId), |
1727 | - cbuffer, &cbufferLen, &cstat) |
1728 | + var cstat Stat |
1729 | + rc, cerr := C.zoo_wget(conn.handle, cpath, C.watch_handler, unsafe.Pointer(watchId), cbuffer, &cbufferLen, &cstat.c) |
1730 | if rc != C.ZOK { |
1731 | - zk.forgetWatch(watchId) |
1732 | - return "", nil, nil, newError(rc, cerr) |
1733 | + conn.forgetWatch(watchId) |
1734 | + return "", nil, nil, zkError(rc, cerr) |
1735 | } |
1736 | |
1737 | result := C.GoStringN(cbuffer, cbufferLen) |
1738 | - return result, (*resultStat)(&cstat), watchChannel, nil |
1739 | + return result, &cstat, watchChannel, nil |
1740 | } |
1741 | |
1742 | // Children returns the children list and status from an existing node. |
1743 | -// err will be nil, unless an error is found. Attempting to retrieve the |
1744 | -// children list from a non-existent node is an error. |
1745 | -func (zk *ZooKeeper) Children(path string) (children []string, stat Stat, err Error) { |
1746 | +// Attempting to retrieve the children list from a non-existent node is an error. |
1747 | +func (conn *Conn) Children(path string) (children []string, stat *Stat, err os.Error) { |
1748 | |
1749 | cpath := C.CString(path) |
1750 | defer C.free(unsafe.Pointer(cpath)) |
1751 | |
1752 | cvector := C.struct_String_vector{} |
1753 | - cstat := C.struct_Stat{} |
1754 | - rc, cerr := C.zoo_wget_children2(zk.handle, cpath, nil, nil, |
1755 | - &cvector, &cstat) |
1756 | + var cstat Stat |
1757 | + rc, cerr := C.zoo_wget_children2(conn.handle, cpath, nil, nil, &cvector, &cstat.c) |
1758 | |
1759 | // Can't happen if rc != 0, but avoid potential memory leaks in the future. |
1760 | if cvector.count != 0 { |
1761 | children = parseStringVector(&cvector) |
1762 | } |
1763 | - if rc != C.ZOK { |
1764 | - err = newError(rc, cerr) |
1765 | + if rc == C.ZOK { |
1766 | + stat = &cstat |
1767 | } else { |
1768 | - stat = (*resultStat)(&cstat) |
1769 | + err = zkError(rc, cerr) |
1770 | } |
1771 | return |
1772 | } |
1773 | @@ -545,29 +521,27 @@ |
1774 | // receive a single Event value when a node is added or removed under the |
1775 | // provided path or when critical session events happen. See the documentation |
1776 | // of the Event type for more details. |
1777 | -func (zk *ZooKeeper) ChildrenW(path string) (children []string, stat Stat, watch chan Event, err Error) { |
1778 | +func (conn *Conn) ChildrenW(path string) (children []string, stat *Stat, watch <-chan Event, err os.Error) { |
1779 | |
1780 | cpath := C.CString(path) |
1781 | defer C.free(unsafe.Pointer(cpath)) |
1782 | |
1783 | - watchId, watchChannel := zk.createWatch(true) |
1784 | + watchId, watchChannel := conn.createWatch(true) |
1785 | |
1786 | cvector := C.struct_String_vector{} |
1787 | - cstat := C.struct_Stat{} |
1788 | - rc, cerr := C.zoo_wget_children2(zk.handle, cpath, |
1789 | - C.watch_handler, unsafe.Pointer(watchId), |
1790 | - &cvector, &cstat) |
1791 | + var cstat Stat |
1792 | + rc, cerr := C.zoo_wget_children2(conn.handle, cpath, C.watch_handler, unsafe.Pointer(watchId), &cvector, &cstat.c) |
1793 | |
1794 | // Can't happen if rc != 0, but avoid potential memory leaks in the future. |
1795 | if cvector.count != 0 { |
1796 | children = parseStringVector(&cvector) |
1797 | } |
1798 | - if rc != C.ZOK { |
1799 | - zk.forgetWatch(watchId) |
1800 | - err = newError(rc, cerr) |
1801 | + if rc == C.ZOK { |
1802 | + stat = &cstat |
1803 | + watch = watchChannel |
1804 | } else { |
1805 | - stat = (*resultStat)(&cstat) |
1806 | - watch = watchChannel |
1807 | + conn.forgetWatch(watchId) |
1808 | + err = zkError(rc, cerr) |
1809 | } |
1810 | return |
1811 | } |
1812 | @@ -588,20 +562,20 @@ |
1813 | // Exists checks if a node exists at the given path. If it does, |
1814 | // stat will contain meta information on the existing node, otherwise |
1815 | // it will be nil. |
1816 | -func (zk *ZooKeeper) Exists(path string) (stat Stat, err Error) { |
1817 | +func (conn *Conn) Exists(path string) (stat *Stat, err os.Error) { |
1818 | cpath := C.CString(path) |
1819 | defer C.free(unsafe.Pointer(cpath)) |
1820 | |
1821 | - cstat := C.struct_Stat{} |
1822 | - rc, cerr := C.zoo_wexists(zk.handle, cpath, nil, nil, &cstat) |
1823 | + var cstat Stat |
1824 | + rc, cerr := C.zoo_wexists(conn.handle, cpath, nil, nil, &stat.c) |
1825 | |
1826 | // We diverge a bit from the usual here: a ZNONODE is not an error |
1827 | // for an exists call, otherwise every Exists call would have to check |
1828 | // for err != nil and err.Code() != ZNONODE. |
1829 | if rc == C.ZOK { |
1830 | - stat = (*resultStat)(&cstat) |
1831 | + stat = &cstat |
1832 | } else if rc != C.ZNONODE { |
1833 | - err = newError(rc, cerr) |
1834 | + err = zkError(rc, cerr) |
1835 | } |
1836 | return |
1837 | } |
1838 | @@ -611,28 +585,27 @@ |
1839 | // stat is nil and the node didn't exist, or when the existing node |
1840 | // is removed. It will also receive critical session events. See the |
1841 | // documentation of the Event type for more details. |
1842 | -func (zk *ZooKeeper) ExistsW(path string) (stat Stat, watch chan Event, err Error) { |
1843 | +func (conn *Conn) ExistsW(path string) (stat *Stat, watch <-chan Event, err os.Error) { |
1844 | cpath := C.CString(path) |
1845 | defer C.free(unsafe.Pointer(cpath)) |
1846 | |
1847 | - watchId, watchChannel := zk.createWatch(true) |
1848 | + watchId, watchChannel := conn.createWatch(true) |
1849 | |
1850 | - cstat := C.struct_Stat{} |
1851 | - rc, cerr := C.zoo_wexists(zk.handle, cpath, |
1852 | - C.watch_handler, unsafe.Pointer(watchId), &cstat) |
1853 | + var cstat Stat |
1854 | + rc, cerr := C.zoo_wexists(conn.handle, cpath, C.watch_handler, unsafe.Pointer(watchId), &cstat.c) |
1855 | |
1856 | // We diverge a bit from the usual here: a ZNONODE is not an error |
1857 | // for an exists call, otherwise every Exists call would have to check |
1858 | // for err != nil and err.Code() != ZNONODE. |
1859 | - switch rc { |
1860 | + switch Error(rc) { |
1861 | case ZOK: |
1862 | - stat = (*resultStat)(&cstat) |
1863 | + stat = &cstat |
1864 | watch = watchChannel |
1865 | case ZNONODE: |
1866 | watch = watchChannel |
1867 | default: |
1868 | - zk.forgetWatch(watchId) |
1869 | - err = newError(rc, cerr) |
1870 | + conn.forgetWatch(watchId) |
1871 | + err = zkError(rc, cerr) |
1872 | } |
1873 | return |
1874 | } |
1875 | @@ -646,7 +619,7 @@ |
1876 | // The returned path is useful in cases where the created path may differ |
1877 | // from the requested one, such as when a sequence number is appended |
1878 | // to it due to the use of the gozk.SEQUENCE flag. |
1879 | -func (zk *ZooKeeper) Create(path, value string, flags int, aclv []ACL) (pathCreated string, err Error) { |
1880 | +func (conn *Conn) Create(path, value string, flags int, aclv []ACL) (pathCreated string, err os.Error) { |
1881 | cpath := C.CString(path) |
1882 | cvalue := C.CString(value) |
1883 | defer C.free(unsafe.Pointer(cpath)) |
1884 | @@ -660,13 +633,13 @@ |
1885 | cpathCreated := (*C.char)(C.malloc(cpathLen)) |
1886 | defer C.free(unsafe.Pointer(cpathCreated)) |
1887 | |
1888 | - rc, cerr := C.zoo_create(zk.handle, cpath, cvalue, C.int(len(value)), |
1889 | - caclv, C.int(flags), cpathCreated, C.int(cpathLen)) |
1890 | - if rc != C.ZOK { |
1891 | - return "", newError(rc, cerr) |
1892 | + rc, cerr := C.zoo_create(conn.handle, cpath, cvalue, C.int(len(value)), caclv, C.int(flags), cpathCreated, C.int(cpathLen)) |
1893 | + if rc == C.ZOK { |
1894 | + pathCreated = C.GoString(cpathCreated) |
1895 | + } else { |
1896 | + err = zkError(rc, cerr) |
1897 | } |
1898 | - |
1899 | - return C.GoString(cpathCreated), nil |
1900 | + return |
1901 | } |
1902 | |
1903 | // Set modifies the data for the existing node at the given path, replacing it |
1904 | @@ -677,35 +650,31 @@ |
1905 | // |
1906 | // It is an error to attempt to set the data of a non-existing node with |
1907 | // this function. In these cases, use Create instead. |
1908 | -func (zk *ZooKeeper) Set(path, value string, version int32) (stat Stat, err Error) { |
1909 | +func (conn *Conn) Set(path, value string, version int32) (stat *Stat, err os.Error) { |
1910 | |
1911 | cpath := C.CString(path) |
1912 | cvalue := C.CString(value) |
1913 | defer C.free(unsafe.Pointer(cpath)) |
1914 | defer C.free(unsafe.Pointer(cvalue)) |
1915 | |
1916 | - cstat := C.struct_Stat{} |
1917 | - |
1918 | - rc, cerr := C.zoo_set2(zk.handle, cpath, cvalue, C.int(len(value)), |
1919 | - C.int(version), &cstat) |
1920 | - if rc != C.ZOK { |
1921 | - return nil, newError(rc, cerr) |
1922 | + var cstat Stat |
1923 | + rc, cerr := C.zoo_set2(conn.handle, cpath, cvalue, C.int(len(value)), C.int(version), &cstat.c) |
1924 | + if rc == C.ZOK { |
1925 | + stat = &cstat |
1926 | + } else { |
1927 | + err = zkError(rc, cerr) |
1928 | } |
1929 | - |
1930 | - return (*resultStat)(&cstat), nil |
1931 | + return |
1932 | } |
1933 | |
1934 | // Delete removes the node at path. If version is not -1, the operation |
1935 | // will only succeed if the node is still at this version when the |
1936 | // node is deleted as an atomic operation. |
1937 | -func (zk *ZooKeeper) Delete(path string, version int32) (err Error) { |
1938 | +func (conn *Conn) Delete(path string, version int32) (err os.Error) { |
1939 | cpath := C.CString(path) |
1940 | defer C.free(unsafe.Pointer(cpath)) |
1941 | - rc, cerr := C.zoo_delete(zk.handle, cpath, C.int(version)) |
1942 | - if rc != C.ZOK { |
1943 | - return newError(rc, cerr) |
1944 | - } |
1945 | - return nil |
1946 | + rc, cerr := C.zoo_delete(conn.handle, cpath, C.int(version)) |
1947 | + return zkError(rc, cerr) |
1948 | } |
1949 | |
1950 | // AddAuth adds a new authentication certificate to the ZooKeeper |
1951 | @@ -713,7 +682,7 @@ |
1952 | // authentication information, while the cert parameter provides the |
1953 | // identity data itself. For instance, the "digest" scheme requires |
1954 | // a pair like "username:password" to be provided as the certificate. |
1955 | -func (zk *ZooKeeper) AddAuth(scheme, cert string) Error { |
1956 | +func (conn *Conn) AddAuth(scheme, cert string) os.Error { |
1957 | cscheme := C.CString(scheme) |
1958 | ccert := C.CString(cert) |
1959 | defer C.free(unsafe.Pointer(cscheme)) |
1960 | @@ -725,43 +694,38 @@ |
1961 | } |
1962 | defer C.destroy_completion_data(data) |
1963 | |
1964 | - rc, cerr := C.zoo_add_auth(zk.handle, cscheme, ccert, C.int(len(cert)), |
1965 | - C.handle_void_completion, unsafe.Pointer(data)) |
1966 | + rc, cerr := C.zoo_add_auth(conn.handle, cscheme, ccert, C.int(len(cert)), C.handle_void_completion, unsafe.Pointer(data)) |
1967 | if rc != C.ZOK { |
1968 | - return newError(rc, cerr) |
1969 | + return zkError(rc, cerr) |
1970 | } |
1971 | |
1972 | C.wait_for_completion(data) |
1973 | |
1974 | rc = C.int(uintptr(data.data)) |
1975 | - if rc != C.ZOK { |
1976 | - return newError(rc, nil) |
1977 | - } |
1978 | - |
1979 | - return nil |
1980 | + return zkError(rc, nil) |
1981 | } |
1982 | |
1983 | // ACL returns the access control list for path. |
1984 | -func (zk *ZooKeeper) ACL(path string) ([]ACL, Stat, Error) { |
1985 | +func (conn *Conn) ACL(path string) ([]ACL, *Stat, os.Error) { |
1986 | |
1987 | cpath := C.CString(path) |
1988 | defer C.free(unsafe.Pointer(cpath)) |
1989 | |
1990 | caclv := C.struct_ACL_vector{} |
1991 | - cstat := C.struct_Stat{} |
1992 | |
1993 | - rc, cerr := C.zoo_get_acl(zk.handle, cpath, &caclv, &cstat) |
1994 | + var cstat Stat |
1995 | + rc, cerr := C.zoo_get_acl(conn.handle, cpath, &caclv, &cstat.c) |
1996 | if rc != C.ZOK { |
1997 | - return nil, nil, newError(rc, cerr) |
1998 | + return nil, nil, zkError(rc, cerr) |
1999 | } |
2000 | |
2001 | aclv := parseACLVector(&caclv) |
2002 | |
2003 | - return aclv, (*resultStat)(&cstat), nil |
2004 | + return aclv, &cstat, nil |
2005 | } |
2006 | |
2007 | // SetACL changes the access control list for path. |
2008 | -func (zk *ZooKeeper) SetACL(path string, aclv []ACL, version int32) Error { |
2009 | +func (conn *Conn) SetACL(path string, aclv []ACL, version int32) os.Error { |
2010 | |
2011 | cpath := C.CString(path) |
2012 | defer C.free(unsafe.Pointer(cpath)) |
2013 | @@ -769,12 +733,8 @@ |
2014 | caclv := buildACLVector(aclv) |
2015 | defer C.deallocate_ACL_vector(caclv) |
2016 | |
2017 | - rc, cerr := C.zoo_set_acl(zk.handle, cpath, C.int(version), caclv) |
2018 | - if rc != C.ZOK { |
2019 | - return newError(rc, cerr) |
2020 | - } |
2021 | - |
2022 | - return nil |
2023 | + rc, cerr := C.zoo_set_acl(conn.handle, cpath, C.int(version), caclv) |
2024 | + return zkError(rc, cerr) |
2025 | } |
2026 | |
2027 | func parseACLVector(caclv *C.struct_ACL_vector) []ACL { |
2028 | @@ -822,7 +782,7 @@ |
2029 | // ----------------------------------------------------------------------- |
2030 | // RetryChange utility method. |
2031 | |
2032 | -type ChangeFunc func(oldValue string, oldStat Stat) (newValue string, err os.Error) |
2033 | +type ChangeFunc func(oldValue string, oldStat *Stat) (newValue string, err os.Error) |
2034 | |
2035 | // RetryChange runs changeFunc to attempt to atomically change path |
2036 | // in a lock free manner, and retries in case there was another |
2037 | @@ -831,8 +791,7 @@ |
2038 | // changeFunc must work correctly if called multiple times in case |
2039 | // the modification fails due to concurrent changes, and it may return |
2040 | // an error that will cause the the RetryChange function to stop and |
2041 | -// return an Error with code ZSYSTEMERROR and the same .String() result |
2042 | -// as the provided error. |
2043 | +// return the same error. |
2044 | // |
2045 | // This mechanism is not suitable for a node that is frequently modified |
2046 | // concurrently. For those cases, consider using a pessimistic locking |
2047 | @@ -845,8 +804,7 @@ |
2048 | // |
2049 | // 2. Call the changeFunc with the current node value and stat, |
2050 | // or with an empty string and nil stat, if the node doesn't yet exist. |
2051 | -// If the changeFunc returns an error, stop and return an Error with |
2052 | -// ZSYSTEMERROR Code() and the same String() as the changeFunc error. |
2053 | +// If the changeFunc returns an error, stop and return the same error. |
2054 | // |
2055 | // 3. If the changeFunc returns no errors, use the string returned as |
2056 | // the new candidate value for the node, and attempt to either create |
2057 | @@ -855,36 +813,32 @@ |
2058 | // in the same node), repeat from step 1. If this procedure fails with any |
2059 | // other error, stop and return the error found. |
2060 | // |
2061 | -func (zk *ZooKeeper) RetryChange(path string, flags int, acl []ACL, changeFunc ChangeFunc) (err Error) { |
2062 | +func (conn *Conn) RetryChange(path string, flags int, acl []ACL, changeFunc ChangeFunc) os.Error { |
2063 | for { |
2064 | - oldValue, oldStat, getErr := zk.Get(path) |
2065 | - if getErr != nil && getErr.Code() != ZNONODE { |
2066 | - err = getErr |
2067 | - break |
2068 | - } |
2069 | - newValue, osErr := changeFunc(oldValue, oldStat) |
2070 | - if osErr != nil { |
2071 | - return newError(ZSYSTEMERROR, osErr) |
2072 | - } else if oldStat == nil { |
2073 | - _, err = zk.Create(path, newValue, flags, acl) |
2074 | - if err == nil || err.Code() != ZNODEEXISTS { |
2075 | - break |
2076 | + oldValue, oldStat, err := conn.Get(path) |
2077 | + if err != nil && err != ZNONODE { |
2078 | + return err |
2079 | + } |
2080 | + newValue, err := changeFunc(oldValue, oldStat) |
2081 | + if err != nil { |
2082 | + return err |
2083 | + } |
2084 | + if oldStat == nil { |
2085 | + _, err := conn.Create(path, newValue, flags, acl) |
2086 | + if err == nil || err != ZNODEEXISTS { |
2087 | + return err |
2088 | } |
2089 | - } else if newValue == oldValue { |
2090 | + continue |
2091 | + } |
2092 | + if newValue == oldValue { |
2093 | return nil // Nothing to do. |
2094 | - } else { |
2095 | - _, err = zk.Set(path, newValue, oldStat.Version()) |
2096 | - if err == nil { |
2097 | - break |
2098 | - } else { |
2099 | - code := err.Code() |
2100 | - if code != ZBADVERSION && code != ZNONODE { |
2101 | - break |
2102 | - } |
2103 | - } |
2104 | + } |
2105 | + _, err = conn.Set(path, newValue, oldStat.Version()) |
2106 | + if err == nil || (err != ZBADVERSION && err != ZNONODE) { |
2107 | + return err |
2108 | } |
2109 | } |
2110 | - return err |
2111 | + panic("not reached") |
2112 | } |
2113 | |
2114 | // ----------------------------------------------------------------------- |
2115 | @@ -897,7 +851,7 @@ |
2116 | // Whenever a *W method is called, it will return a channel which |
2117 | // outputs Event values. Internally, a map is used to maintain references |
2118 | // between an unique integer key (the watchId), and the event channel. The |
2119 | -// watchId is then handed to the C zookeeper library as the watch context, |
2120 | +// watchId is then handed to the C ZooKeeper library as the watch context, |
2121 | // so that we get it back when events happen. Using an integer key as the |
2122 | // watch context rather than a pointer is needed because there's no guarantee |
2123 | // that in the future the GC will not move objects around, and also because |
2124 | @@ -910,13 +864,13 @@ |
2125 | // Since Cgo doesn't allow calling back into Go, we actually fire a new |
2126 | // goroutine the very first time Init is called, and allow it to block |
2127 | // in a pthread condition variable within a C function. This condition |
2128 | -// will only be notified once a zookeeper watch callback appends new |
2129 | +// will only be notified once a ZooKeeper watch callback appends new |
2130 | // entries to the event list. When this happens, the C function returns |
2131 | // and we get back into Go land with the pointer to the watch data, |
2132 | // including the watchId and other event details such as type and path. |
2133 | |
2134 | var watchMutex sync.Mutex |
2135 | -var watchZooKeepers = make(map[uintptr]*ZooKeeper) |
2136 | +var watchConns = make(map[uintptr]*Conn) |
2137 | var watchCounter uintptr |
2138 | var watchLoopCounter int |
2139 | |
2140 | @@ -925,14 +879,14 @@ |
2141 | // mostly as a debugging and testing aid. |
2142 | func CountPendingWatches() int { |
2143 | watchMutex.Lock() |
2144 | - count := len(watchZooKeepers) |
2145 | + count := len(watchConns) |
2146 | watchMutex.Unlock() |
2147 | return count |
2148 | } |
2149 | |
2150 | // createWatch creates and registers a watch, returning the watch id |
2151 | // and channel. |
2152 | -func (zk *ZooKeeper) createWatch(session bool) (watchId uintptr, watchChannel chan Event) { |
2153 | +func (conn *Conn) createWatch(session bool) (watchId uintptr, watchChannel chan Event) { |
2154 | buf := 1 // session/watch event |
2155 | if session { |
2156 | buf = 32 |
2157 | @@ -942,8 +896,8 @@ |
2158 | defer watchMutex.Unlock() |
2159 | watchId = watchCounter |
2160 | watchCounter += 1 |
2161 | - zk.watchChannels[watchId] = watchChannel |
2162 | - watchZooKeepers[watchId] = zk |
2163 | + conn.watchChannels[watchId] = watchChannel |
2164 | + watchConns[watchId] = conn |
2165 | return |
2166 | } |
2167 | |
2168 | @@ -951,21 +905,21 @@ |
2169 | // from ever getting delivered. It shouldn't be used if there's any |
2170 | // chance the watch channel is still visible and not closed, since |
2171 | // it might mean a goroutine would be blocked forever. |
2172 | -func (zk *ZooKeeper) forgetWatch(watchId uintptr) { |
2173 | +func (conn *Conn) forgetWatch(watchId uintptr) { |
2174 | watchMutex.Lock() |
2175 | defer watchMutex.Unlock() |
2176 | - zk.watchChannels[watchId] = nil, false |
2177 | - watchZooKeepers[watchId] = nil, false |
2178 | + conn.watchChannels[watchId] = nil, false |
2179 | + watchConns[watchId] = nil, false |
2180 | } |
2181 | |
2182 | -// closeAllWatches closes all watch channels for zk. |
2183 | -func (zk *ZooKeeper) closeAllWatches() { |
2184 | +// closeAllWatches closes all watch channels for conn. |
2185 | +func (conn *Conn) closeAllWatches() { |
2186 | watchMutex.Lock() |
2187 | defer watchMutex.Unlock() |
2188 | - for watchId, ch := range zk.watchChannels { |
2189 | + for watchId, ch := range conn.watchChannels { |
2190 | close(ch) |
2191 | - zk.watchChannels[watchId] = nil, false |
2192 | - watchZooKeepers[watchId] = nil, false |
2193 | + conn.watchChannels[watchId] = nil, false |
2194 | + watchConns[watchId] = nil, false |
2195 | } |
2196 | } |
2197 | |
2198 | @@ -978,11 +932,11 @@ |
2199 | } |
2200 | watchMutex.Lock() |
2201 | defer watchMutex.Unlock() |
2202 | - zk, ok := watchZooKeepers[watchId] |
2203 | + conn, ok := watchConns[watchId] |
2204 | if !ok { |
2205 | return |
2206 | } |
2207 | - if event.Type == EVENT_SESSION && watchId != zk.sessionWatchId { |
2208 | + if event.Type == EVENT_SESSION && watchId != conn.sessionWatchId { |
2209 | switch event.State { |
2210 | case STATE_EXPIRED_SESSION, STATE_AUTH_FAILED: |
2211 | default: |
2212 | @@ -990,7 +944,7 @@ |
2213 | return |
2214 | } |
2215 | } |
2216 | - ch := zk.watchChannels[watchId] |
2217 | + ch := conn.watchChannels[watchId] |
2218 | if ch == nil { |
2219 | return |
2220 | } |
2221 | @@ -1002,15 +956,15 @@ |
2222 | // straight to the buffer), and the application isn't paying |
2223 | // attention for long enough to have the buffer filled up. |
2224 | // Break down now rather than leaking forever. |
2225 | - if watchId == zk.sessionWatchId { |
2226 | + if watchId == conn.sessionWatchId { |
2227 | panic("Session event channel buffer is full") |
2228 | } else { |
2229 | panic("Watch event channel buffer is full") |
2230 | } |
2231 | } |
2232 | - if watchId != zk.sessionWatchId { |
2233 | - zk.watchChannels[watchId] = nil, false |
2234 | - watchZooKeepers[watchId] = nil, false |
2235 | + if watchId != conn.sessionWatchId { |
2236 | + conn.watchChannels[watchId] = nil, false |
2237 | + watchConns[watchId] = nil, false |
2238 | close(ch) |
2239 | } |
2240 | } |
2241 | |
2242 | === renamed file 'gozk_test.go' => 'zk_test.go' |
2243 | --- gozk_test.go 2011-08-19 01:51:37 +0000 |
2244 | +++ zk_test.go 2011-10-03 15:02:28 +0000 |
2245 | @@ -1,17 +1,17 @@ |
2246 | -package gozk_test |
2247 | +package zk_test |
2248 | |
2249 | import ( |
2250 | . "launchpad.net/gocheck" |
2251 | - "gozk" |
2252 | + "launchpad.net/gozk/zk" |
2253 | "time" |
2254 | ) |
2255 | |
2256 | // This error will be delivered via C errno, since ZK unfortunately |
2257 | // only provides the handler back from zookeeper_init(). |
2258 | func (s *S) TestInitErrorThroughErrno(c *C) { |
2259 | - zk, watch, err := gozk.Init("bad-domain-without-port", 5e9) |
2260 | - if zk != nil { |
2261 | - zk.Close() |
2262 | + conn, watch, err := zk.Dial("bad-domain-without-port", 5e9) |
2263 | + if conn != nil { |
2264 | + conn.Close() |
2265 | } |
2266 | if watch != nil { |
2267 | go func() { |
2268 | @@ -23,15 +23,15 @@ |
2269 | } |
2270 | }() |
2271 | } |
2272 | - c.Assert(zk, IsNil) |
2273 | + c.Assert(conn, IsNil) |
2274 | c.Assert(watch, IsNil) |
2275 | c.Assert(err, Matches, "invalid argument") |
2276 | } |
2277 | |
2278 | func (s *S) TestRecvTimeoutInitParameter(c *C) { |
2279 | - zk, watch, err := gozk.Init(s.zkAddr, 0) |
2280 | + conn, watch, err := zk.Dial(s.zkAddr, 0) |
2281 | c.Assert(err, IsNil) |
2282 | - defer zk.Close() |
2283 | + defer conn.Close() |
2284 | |
2285 | select { |
2286 | case <-watch: |
2287 | @@ -40,7 +40,7 @@ |
2288 | } |
2289 | |
2290 | for i := 0; i != 1000; i++ { |
2291 | - _, _, err := zk.Get("/zookeeper") |
2292 | + _, _, err := conn.Get("/zookeeper") |
2293 | if err != nil { |
2294 | c.Assert(err, Matches, "operation timeout") |
2295 | c.SucceedNow() |
2296 | @@ -51,76 +51,79 @@ |
2297 | } |
2298 | |
2299 | func (s *S) TestSessionWatches(c *C) { |
2300 | - c.Assert(gozk.CountPendingWatches(), Equals, 0) |
2301 | + c.Assert(zk.CountPendingWatches(), Equals, 0) |
2302 | |
2303 | zk1, watch1 := s.init(c) |
2304 | zk2, watch2 := s.init(c) |
2305 | zk3, watch3 := s.init(c) |
2306 | |
2307 | - c.Assert(gozk.CountPendingWatches(), Equals, 3) |
2308 | + c.Assert(zk.CountPendingWatches(), Equals, 3) |
2309 | |
2310 | event1 := <-watch1 |
2311 | - c.Assert(event1.Type, Equals, gozk.EVENT_SESSION) |
2312 | - c.Assert(event1.State, Equals, gozk.STATE_CONNECTED) |
2313 | + c.Assert(event1.Type, Equals, zk.EVENT_SESSION) |
2314 | + c.Assert(event1.State, Equals, zk.STATE_CONNECTED) |
2315 | |
2316 | - c.Assert(gozk.CountPendingWatches(), Equals, 3) |
2317 | + c.Assert(zk.CountPendingWatches(), Equals, 3) |
2318 | |
2319 | event2 := <-watch2 |
2320 | - c.Assert(event2.Type, Equals, gozk.EVENT_SESSION) |
2321 | - c.Assert(event2.State, Equals, gozk.STATE_CONNECTED) |
2322 | + c.Assert(event2.Type, Equals, zk.EVENT_SESSION) |
2323 | + c.Assert(event2.State, Equals, zk.STATE_CONNECTED) |
2324 | |
2325 | - c.Assert(gozk.CountPendingWatches(), Equals, 3) |
2326 | + c.Assert(zk.CountPendingWatches(), Equals, 3) |
2327 | |
2328 | event3 := <-watch3 |
2329 | - c.Assert(event3.Type, Equals, gozk.EVENT_SESSION) |
2330 | - c.Assert(event3.State, Equals, gozk.STATE_CONNECTED) |
2331 | + c.Assert(event3.Type, Equals, zk.EVENT_SESSION) |
2332 | + c.Assert(event3.State, Equals, zk.STATE_CONNECTED) |
2333 | |
2334 | - c.Assert(gozk.CountPendingWatches(), Equals, 3) |
2335 | + c.Assert(zk.CountPendingWatches(), Equals, 3) |
2336 | |
2337 | zk1.Close() |
2338 | - c.Assert(gozk.CountPendingWatches(), Equals, 2) |
2339 | + c.Assert(zk.CountPendingWatches(), Equals, 2) |
2340 | zk2.Close() |
2341 | - c.Assert(gozk.CountPendingWatches(), Equals, 1) |
2342 | + c.Assert(zk.CountPendingWatches(), Equals, 1) |
2343 | zk3.Close() |
2344 | - c.Assert(gozk.CountPendingWatches(), Equals, 0) |
2345 | + c.Assert(zk.CountPendingWatches(), Equals, 0) |
2346 | } |
2347 | |
2348 | -// Gozk injects a STATE_CLOSED event when zk.Close() is called, right |
2349 | +// Gozk injects a STATE_CLOSED event when conn.Close() is called, right |
2350 | // before the channel is closed. Closing the channel injects a nil |
2351 | // pointer, as usual for Go, so the STATE_CLOSED gives a chance to |
2352 | // know that a nil pointer is coming, and to stop the procedure. |
2353 | // Hopefully this procedure will avoid some nil-pointer references by |
2354 | // mistake. |
2355 | func (s *S) TestClosingStateInSessionWatch(c *C) { |
2356 | - zk, watch := s.init(c) |
2357 | + conn, watch := s.init(c) |
2358 | |
2359 | event := <-watch |
2360 | - c.Assert(event.Type, Equals, gozk.EVENT_SESSION) |
2361 | - c.Assert(event.State, Equals, gozk.STATE_CONNECTED) |
2362 | + c.Assert(event.Type, Equals, zk.EVENT_SESSION) |
2363 | + c.Assert(event.State, Equals, zk.STATE_CONNECTED) |
2364 | |
2365 | - zk.Close() |
2366 | + conn.Close() |
2367 | event, ok := <-watch |
2368 | c.Assert(ok, Equals, false) |
2369 | - c.Assert(event.Type, Equals, gozk.EVENT_CLOSED) |
2370 | - c.Assert(event.State, Equals, gozk.STATE_CLOSED) |
2371 | + c.Assert(event.Type, Equals, zk.EVENT_CLOSED) |
2372 | + c.Assert(event.State, Equals, zk.STATE_CLOSED) |
2373 | } |
2374 | |
2375 | func (s *S) TestEventString(c *C) { |
2376 | - var event gozk.Event |
2377 | - event = gozk.Event{gozk.EVENT_SESSION, "/path", gozk.STATE_CONNECTED} |
2378 | + var event zk.Event |
2379 | + event = zk.Event{zk.EVENT_SESSION, "/path", zk.STATE_CONNECTED} |
2380 | c.Assert(event, Matches, "ZooKeeper connected") |
2381 | - event = gozk.Event{gozk.EVENT_CREATED, "/path", gozk.STATE_CONNECTED} |
2382 | + event = zk.Event{zk.EVENT_CREATED, "/path", zk.STATE_CONNECTED} |
2383 | c.Assert(event, Matches, "ZooKeeper connected; path created: /path") |
2384 | - event = gozk.Event{-1, "/path", gozk.STATE_CLOSED} |
2385 | + event = zk.Event{-1, "/path", zk.STATE_CLOSED} |
2386 | c.Assert(event, Matches, "ZooKeeper connection closed") |
2387 | } |
2388 | |
2389 | -var okTests = []struct{gozk.Event; Ok bool}{ |
2390 | - {gozk.Event{gozk.EVENT_SESSION, "", gozk.STATE_CONNECTED}, true}, |
2391 | - {gozk.Event{gozk.EVENT_CREATED, "", gozk.STATE_CONNECTED}, true}, |
2392 | - {gozk.Event{0, "", gozk.STATE_CLOSED}, false}, |
2393 | - {gozk.Event{0, "", gozk.STATE_EXPIRED_SESSION}, false}, |
2394 | - {gozk.Event{0, "", gozk.STATE_AUTH_FAILED}, false}, |
2395 | +var okTests = []struct { |
2396 | + zk.Event |
2397 | + Ok bool |
2398 | +}{ |
2399 | + {zk.Event{zk.EVENT_SESSION, "", zk.STATE_CONNECTED}, true}, |
2400 | + {zk.Event{zk.EVENT_CREATED, "", zk.STATE_CONNECTED}, true}, |
2401 | + {zk.Event{0, "", zk.STATE_CLOSED}, false}, |
2402 | + {zk.Event{0, "", zk.STATE_EXPIRED_SESSION}, false}, |
2403 | + {zk.Event{0, "", zk.STATE_AUTH_FAILED}, false}, |
2404 | } |
2405 | |
2406 | func (s *S) TestEventOk(c *C) { |
2407 | @@ -130,9 +133,9 @@ |
2408 | } |
2409 | |
2410 | func (s *S) TestGetAndStat(c *C) { |
2411 | - zk, _ := s.init(c) |
2412 | + conn, _ := s.init(c) |
2413 | |
2414 | - data, stat, err := zk.Get("/zookeeper") |
2415 | + data, stat, err := conn.Get("/zookeeper") |
2416 | c.Assert(err, IsNil) |
2417 | c.Assert(data, Equals, "") |
2418 | c.Assert(stat.Czxid(), Equals, int64(0)) |
2419 | @@ -149,58 +152,58 @@ |
2420 | } |
2421 | |
2422 | func (s *S) TestGetAndError(c *C) { |
2423 | - zk, _ := s.init(c) |
2424 | + conn, _ := s.init(c) |
2425 | |
2426 | - data, stat, err := zk.Get("/non-existent") |
2427 | + data, stat, err := conn.Get("/non-existent") |
2428 | |
2429 | c.Assert(data, Equals, "") |
2430 | c.Assert(stat, IsNil) |
2431 | c.Assert(err, Matches, "no node") |
2432 | - c.Assert(err.Code(), Equals, gozk.ZNONODE) |
2433 | + c.Assert(err, Equals, zk.ZNONODE) |
2434 | } |
2435 | |
2436 | func (s *S) TestCreateAndGet(c *C) { |
2437 | - zk, _ := s.init(c) |
2438 | + conn, _ := s.init(c) |
2439 | |
2440 | - path, err := zk.Create("/test-", "bababum", gozk.SEQUENCE|gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
2441 | + path, err := conn.Create("/test-", "bababum", zk.SEQUENCE|zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2442 | c.Assert(err, IsNil) |
2443 | c.Assert(path, Matches, "/test-[0-9]+") |
2444 | |
2445 | // Check the error condition from Create(). |
2446 | - _, err = zk.Create(path, "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
2447 | + _, err = conn.Create(path, "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2448 | c.Assert(err, Matches, "node exists") |
2449 | |
2450 | - data, _, err := zk.Get(path) |
2451 | + data, _, err := conn.Get(path) |
2452 | c.Assert(err, IsNil) |
2453 | c.Assert(data, Equals, "bababum") |
2454 | } |
2455 | |
2456 | func (s *S) TestCreateSetAndGet(c *C) { |
2457 | - zk, _ := s.init(c) |
2458 | + conn, _ := s.init(c) |
2459 | |
2460 | - _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
2461 | + _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2462 | c.Assert(err, IsNil) |
2463 | |
2464 | - stat, err := zk.Set("/test", "bababum", -1) // Any version. |
2465 | + stat, err := conn.Set("/test", "bababum", -1) // Any version. |
2466 | c.Assert(err, IsNil) |
2467 | c.Assert(stat.Version(), Equals, int32(1)) |
2468 | |
2469 | - data, _, err := zk.Get("/test") |
2470 | + data, _, err := conn.Get("/test") |
2471 | c.Assert(err, IsNil) |
2472 | c.Assert(data, Equals, "bababum") |
2473 | } |
2474 | |
2475 | func (s *S) TestGetAndWatch(c *C) { |
2476 | - c.Check(gozk.CountPendingWatches(), Equals, 0) |
2477 | - |
2478 | - zk, _ := s.init(c) |
2479 | - |
2480 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2481 | - |
2482 | - _, err := zk.Create("/test", "one", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
2483 | + c.Check(zk.CountPendingWatches(), Equals, 0) |
2484 | + |
2485 | + conn, _ := s.init(c) |
2486 | + |
2487 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2488 | + |
2489 | + _, err := conn.Create("/test", "one", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2490 | c.Assert(err, IsNil) |
2491 | |
2492 | - data, stat, watch, err := zk.GetW("/test") |
2493 | + data, stat, watch, err := conn.GetW("/test") |
2494 | c.Assert(err, IsNil) |
2495 | c.Assert(data, Equals, "one") |
2496 | c.Assert(stat.Version(), Equals, int32(0)) |
2497 | @@ -211,17 +214,17 @@ |
2498 | default: |
2499 | } |
2500 | |
2501 | - c.Check(gozk.CountPendingWatches(), Equals, 2) |
2502 | + c.Check(zk.CountPendingWatches(), Equals, 2) |
2503 | |
2504 | - _, err = zk.Set("/test", "two", -1) |
2505 | + _, err = conn.Set("/test", "two", -1) |
2506 | c.Assert(err, IsNil) |
2507 | |
2508 | event := <-watch |
2509 | - c.Assert(event.Type, Equals, gozk.EVENT_CHANGED) |
2510 | - |
2511 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2512 | - |
2513 | - data, _, watch, err = zk.GetW("/test") |
2514 | + c.Assert(event.Type, Equals, zk.EVENT_CHANGED) |
2515 | + |
2516 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2517 | + |
2518 | + data, _, watch, err = conn.GetW("/test") |
2519 | c.Assert(err, IsNil) |
2520 | c.Assert(data, Equals, "two") |
2521 | |
2522 | @@ -231,86 +234,86 @@ |
2523 | default: |
2524 | } |
2525 | |
2526 | - c.Check(gozk.CountPendingWatches(), Equals, 2) |
2527 | + c.Check(zk.CountPendingWatches(), Equals, 2) |
2528 | |
2529 | - _, err = zk.Set("/test", "three", -1) |
2530 | + _, err = conn.Set("/test", "three", -1) |
2531 | c.Assert(err, IsNil) |
2532 | |
2533 | event = <-watch |
2534 | - c.Assert(event.Type, Equals, gozk.EVENT_CHANGED) |
2535 | + c.Assert(event.Type, Equals, zk.EVENT_CHANGED) |
2536 | |
2537 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2538 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2539 | } |
2540 | |
2541 | func (s *S) TestGetAndWatchWithError(c *C) { |
2542 | - c.Check(gozk.CountPendingWatches(), Equals, 0) |
2543 | - |
2544 | - zk, _ := s.init(c) |
2545 | - |
2546 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2547 | - |
2548 | - _, _, watch, err := zk.GetW("/test") |
2549 | + c.Check(zk.CountPendingWatches(), Equals, 0) |
2550 | + |
2551 | + conn, _ := s.init(c) |
2552 | + |
2553 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2554 | + |
2555 | + _, _, watch, err := conn.GetW("/test") |
2556 | c.Assert(err, NotNil) |
2557 | - c.Assert(err.Code(), Equals, gozk.ZNONODE) |
2558 | + c.Assert(err, Equals, zk.ZNONODE) |
2559 | c.Assert(watch, IsNil) |
2560 | |
2561 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2562 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2563 | } |
2564 | |
2565 | func (s *S) TestCloseReleasesWatches(c *C) { |
2566 | - c.Check(gozk.CountPendingWatches(), Equals, 0) |
2567 | - |
2568 | - zk, _ := s.init(c) |
2569 | - |
2570 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2571 | - |
2572 | - _, err := zk.Create("/test", "one", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
2573 | - c.Assert(err, IsNil) |
2574 | - |
2575 | - _, _, _, err = zk.GetW("/test") |
2576 | - c.Assert(err, IsNil) |
2577 | - |
2578 | - c.Assert(gozk.CountPendingWatches(), Equals, 2) |
2579 | - |
2580 | - zk.Close() |
2581 | - |
2582 | - c.Assert(gozk.CountPendingWatches(), Equals, 0) |
2583 | + c.Check(zk.CountPendingWatches(), Equals, 0) |
2584 | + |
2585 | + conn, _ := s.init(c) |
2586 | + |
2587 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2588 | + |
2589 | + _, err := conn.Create("/test", "one", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2590 | + c.Assert(err, IsNil) |
2591 | + |
2592 | + _, _, _, err = conn.GetW("/test") |
2593 | + c.Assert(err, IsNil) |
2594 | + |
2595 | + c.Assert(zk.CountPendingWatches(), Equals, 2) |
2596 | + |
2597 | + conn.Close() |
2598 | + |
2599 | + c.Assert(zk.CountPendingWatches(), Equals, 0) |
2600 | } |
2601 | |
2602 | // By default, the ZooKeeper C client will hang indefinitely if a |
2603 | // handler is closed twice. We get in the way and prevent it. |
2604 | func (s *S) TestClosingTwiceDoesntHang(c *C) { |
2605 | - zk, _ := s.init(c) |
2606 | - err := zk.Close() |
2607 | + conn, _ := s.init(c) |
2608 | + err := conn.Close() |
2609 | c.Assert(err, IsNil) |
2610 | - err = zk.Close() |
2611 | + err = conn.Close() |
2612 | c.Assert(err, NotNil) |
2613 | - c.Assert(err.Code(), Equals, gozk.ZCLOSING) |
2614 | + c.Assert(err, Equals, zk.ZCLOSING) |
2615 | } |
2616 | |
2617 | func (s *S) TestChildren(c *C) { |
2618 | - zk, _ := s.init(c) |
2619 | + conn, _ := s.init(c) |
2620 | |
2621 | - children, stat, err := zk.Children("/") |
2622 | + children, stat, err := conn.Children("/") |
2623 | c.Assert(err, IsNil) |
2624 | c.Assert(children, Equals, []string{"zookeeper"}) |
2625 | c.Assert(stat.NumChildren(), Equals, int32(1)) |
2626 | |
2627 | - children, stat, err = zk.Children("/non-existent") |
2628 | + children, stat, err = conn.Children("/non-existent") |
2629 | c.Assert(err, NotNil) |
2630 | - c.Assert(err.Code(), Equals, gozk.ZNONODE) |
2631 | + c.Assert(err, Equals, zk.ZNONODE) |
2632 | c.Assert(children, Equals, []string{}) |
2633 | - c.Assert(stat, Equals, nil) |
2634 | + c.Assert(stat, IsNil) |
2635 | } |
2636 | |
2637 | func (s *S) TestChildrenAndWatch(c *C) { |
2638 | - c.Check(gozk.CountPendingWatches(), Equals, 0) |
2639 | - |
2640 | - zk, _ := s.init(c) |
2641 | - |
2642 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2643 | - |
2644 | - children, stat, watch, err := zk.ChildrenW("/") |
2645 | + c.Check(zk.CountPendingWatches(), Equals, 0) |
2646 | + |
2647 | + conn, _ := s.init(c) |
2648 | + |
2649 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2650 | + |
2651 | + children, stat, watch, err := conn.ChildrenW("/") |
2652 | c.Assert(err, IsNil) |
2653 | c.Assert(children, Equals, []string{"zookeeper"}) |
2654 | c.Assert(stat.NumChildren(), Equals, int32(1)) |
2655 | @@ -321,18 +324,18 @@ |
2656 | default: |
2657 | } |
2658 | |
2659 | - c.Check(gozk.CountPendingWatches(), Equals, 2) |
2660 | + c.Check(zk.CountPendingWatches(), Equals, 2) |
2661 | |
2662 | - _, err = zk.Create("/test1", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
2663 | + _, err = conn.Create("/test1", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2664 | c.Assert(err, IsNil) |
2665 | |
2666 | event := <-watch |
2667 | - c.Assert(event.Type, Equals, gozk.EVENT_CHILD) |
2668 | + c.Assert(event.Type, Equals, zk.EVENT_CHILD) |
2669 | c.Assert(event.Path, Equals, "/") |
2670 | |
2671 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2672 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2673 | |
2674 | - children, stat, watch, err = zk.ChildrenW("/") |
2675 | + children, stat, watch, err = conn.ChildrenW("/") |
2676 | c.Assert(err, IsNil) |
2677 | c.Assert(stat.NumChildren(), Equals, int32(2)) |
2678 | |
2679 | @@ -345,57 +348,56 @@ |
2680 | default: |
2681 | } |
2682 | |
2683 | - c.Check(gozk.CountPendingWatches(), Equals, 2) |
2684 | + c.Check(zk.CountPendingWatches(), Equals, 2) |
2685 | |
2686 | - _, err = zk.Create("/test2", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
2687 | + _, err = conn.Create("/test2", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2688 | c.Assert(err, IsNil) |
2689 | |
2690 | event = <-watch |
2691 | - c.Assert(event.Type, Equals, gozk.EVENT_CHILD) |
2692 | + c.Assert(event.Type, Equals, zk.EVENT_CHILD) |
2693 | |
2694 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2695 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2696 | } |
2697 | |
2698 | func (s *S) TestChildrenAndWatchWithError(c *C) { |
2699 | - c.Check(gozk.CountPendingWatches(), Equals, 0) |
2700 | - |
2701 | - zk, _ := s.init(c) |
2702 | - |
2703 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2704 | - |
2705 | - _, stat, watch, err := zk.ChildrenW("/test") |
2706 | + c.Check(zk.CountPendingWatches(), Equals, 0) |
2707 | + |
2708 | + conn, _ := s.init(c) |
2709 | + |
2710 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2711 | + |
2712 | + _, stat, watch, err := conn.ChildrenW("/test") |
2713 | c.Assert(err, NotNil) |
2714 | - c.Assert(err.Code(), Equals, gozk.ZNONODE) |
2715 | + c.Assert(err, Equals, zk.ZNONODE) |
2716 | c.Assert(watch, IsNil) |
2717 | c.Assert(stat, IsNil) |
2718 | |
2719 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2720 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2721 | } |
2722 | |
2723 | func (s *S) TestExists(c *C) { |
2724 | - zk, _ := s.init(c) |
2725 | - |
2726 | - stat, err := zk.Exists("/zookeeper") |
2727 | - c.Assert(err, IsNil) |
2728 | - c.Assert(stat.NumChildren(), Equals, int32(1)) |
2729 | - |
2730 | - stat, err = zk.Exists("/non-existent") |
2731 | + conn, _ := s.init(c) |
2732 | + |
2733 | + stat, err := conn.Exists("/non-existent") |
2734 | c.Assert(err, IsNil) |
2735 | c.Assert(stat, IsNil) |
2736 | + |
2737 | + stat, err = conn.Exists("/zookeeper") |
2738 | + c.Assert(err, IsNil) |
2739 | } |
2740 | |
2741 | func (s *S) TestExistsAndWatch(c *C) { |
2742 | - c.Check(gozk.CountPendingWatches(), Equals, 0) |
2743 | - |
2744 | - zk, _ := s.init(c) |
2745 | - |
2746 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2747 | - |
2748 | - stat, watch, err := zk.ExistsW("/test") |
2749 | + c.Check(zk.CountPendingWatches(), Equals, 0) |
2750 | + |
2751 | + conn, _ := s.init(c) |
2752 | + |
2753 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2754 | + |
2755 | + stat, watch, err := conn.ExistsW("/test") |
2756 | c.Assert(err, IsNil) |
2757 | c.Assert(stat, IsNil) |
2758 | |
2759 | - c.Check(gozk.CountPendingWatches(), Equals, 2) |
2760 | + c.Check(zk.CountPendingWatches(), Equals, 2) |
2761 | |
2762 | select { |
2763 | case <-watch: |
2764 | @@ -403,62 +405,62 @@ |
2765 | default: |
2766 | } |
2767 | |
2768 | - _, err = zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
2769 | + _, err = conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2770 | c.Assert(err, IsNil) |
2771 | |
2772 | event := <-watch |
2773 | - c.Assert(event.Type, Equals, gozk.EVENT_CREATED) |
2774 | + c.Assert(event.Type, Equals, zk.EVENT_CREATED) |
2775 | c.Assert(event.Path, Equals, "/test") |
2776 | |
2777 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2778 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2779 | |
2780 | - stat, watch, err = zk.ExistsW("/test") |
2781 | + stat, watch, err = conn.ExistsW("/test") |
2782 | c.Assert(err, IsNil) |
2783 | c.Assert(stat, NotNil) |
2784 | c.Assert(stat.NumChildren(), Equals, int32(0)) |
2785 | |
2786 | - c.Check(gozk.CountPendingWatches(), Equals, 2) |
2787 | + c.Check(zk.CountPendingWatches(), Equals, 2) |
2788 | } |
2789 | |
2790 | func (s *S) TestExistsAndWatchWithError(c *C) { |
2791 | - c.Check(gozk.CountPendingWatches(), Equals, 0) |
2792 | - |
2793 | - zk, _ := s.init(c) |
2794 | - |
2795 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2796 | - |
2797 | - stat, watch, err := zk.ExistsW("///") |
2798 | + c.Check(zk.CountPendingWatches(), Equals, 0) |
2799 | + |
2800 | + conn, _ := s.init(c) |
2801 | + |
2802 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2803 | + |
2804 | + stat, watch, err := conn.ExistsW("///") |
2805 | c.Assert(err, NotNil) |
2806 | - c.Assert(err.Code(), Equals, gozk.ZBADARGUMENTS) |
2807 | + c.Assert(err, Equals, zk.ZBADARGUMENTS) |
2808 | c.Assert(stat, IsNil) |
2809 | c.Assert(watch, IsNil) |
2810 | |
2811 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2812 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2813 | } |
2814 | |
2815 | func (s *S) TestDelete(c *C) { |
2816 | - zk, _ := s.init(c) |
2817 | - |
2818 | - _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
2819 | - c.Assert(err, IsNil) |
2820 | - |
2821 | - err = zk.Delete("/test", 5) |
2822 | - c.Assert(err, NotNil) |
2823 | - c.Assert(err.Code(), Equals, gozk.ZBADVERSION) |
2824 | - |
2825 | - err = zk.Delete("/test", -1) |
2826 | - c.Assert(err, IsNil) |
2827 | - |
2828 | - err = zk.Delete("/test", -1) |
2829 | - c.Assert(err, NotNil) |
2830 | - c.Assert(err.Code(), Equals, gozk.ZNONODE) |
2831 | + conn, _ := s.init(c) |
2832 | + |
2833 | + _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2834 | + c.Assert(err, IsNil) |
2835 | + |
2836 | + err = conn.Delete("/test", 5) |
2837 | + c.Assert(err, NotNil) |
2838 | + c.Assert(err, Equals, zk.ZBADVERSION) |
2839 | + |
2840 | + err = conn.Delete("/test", -1) |
2841 | + c.Assert(err, IsNil) |
2842 | + |
2843 | + err = conn.Delete("/test", -1) |
2844 | + c.Assert(err, NotNil) |
2845 | + c.Assert(err, Equals, zk.ZNONODE) |
2846 | } |
2847 | |
2848 | func (s *S) TestClientIdAndReInit(c *C) { |
2849 | zk1, _ := s.init(c) |
2850 | clientId1 := zk1.ClientId() |
2851 | |
2852 | - zk2, _, err := gozk.ReInit(s.zkAddr, 5e9, clientId1) |
2853 | + zk2, _, err := zk.Redial(s.zkAddr, 5e9, clientId1) |
2854 | c.Assert(err, IsNil) |
2855 | defer zk2.Close() |
2856 | clientId2 := zk2.ClientId() |
2857 | @@ -469,110 +471,114 @@ |
2858 | // Surprisingly for some (including myself, initially), the watch |
2859 | // returned by the exists method actually fires on data changes too. |
2860 | func (s *S) TestExistsWatchOnDataChange(c *C) { |
2861 | - zk, _ := s.init(c) |
2862 | - |
2863 | - _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
2864 | - c.Assert(err, IsNil) |
2865 | - |
2866 | - _, watch, err := zk.ExistsW("/test") |
2867 | - c.Assert(err, IsNil) |
2868 | - |
2869 | - _, err = zk.Set("/test", "new", -1) |
2870 | + conn, _ := s.init(c) |
2871 | + |
2872 | + _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2873 | + c.Assert(err, IsNil) |
2874 | + |
2875 | + _, watch, err := conn.ExistsW("/test") |
2876 | + c.Assert(err, IsNil) |
2877 | + |
2878 | + _, err = conn.Set("/test", "new", -1) |
2879 | c.Assert(err, IsNil) |
2880 | |
2881 | event := <-watch |
2882 | |
2883 | c.Assert(event.Path, Equals, "/test") |
2884 | - c.Assert(event.Type, Equals, gozk.EVENT_CHANGED) |
2885 | + c.Assert(event.Type, Equals, zk.EVENT_CHANGED) |
2886 | } |
2887 | |
2888 | func (s *S) TestACL(c *C) { |
2889 | - zk, _ := s.init(c) |
2890 | - |
2891 | - _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
2892 | - c.Assert(err, IsNil) |
2893 | - |
2894 | - acl, stat, err := zk.ACL("/test") |
2895 | - c.Assert(err, IsNil) |
2896 | - c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_ALL)) |
2897 | + conn, _ := s.init(c) |
2898 | + |
2899 | + _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2900 | + c.Assert(err, IsNil) |
2901 | + |
2902 | + acl, stat, err := conn.ACL("/test") |
2903 | + c.Assert(err, IsNil) |
2904 | + c.Assert(acl, Equals, zk.WorldACL(zk.PERM_ALL)) |
2905 | c.Assert(stat, NotNil) |
2906 | c.Assert(stat.Version(), Equals, int32(0)) |
2907 | |
2908 | - acl, stat, err = zk.ACL("/non-existent") |
2909 | + acl, stat, err = conn.ACL("/non-existent") |
2910 | c.Assert(err, NotNil) |
2911 | - c.Assert(err.Code(), Equals, gozk.ZNONODE) |
2912 | + c.Assert(err, Equals, zk.ZNONODE) |
2913 | c.Assert(acl, IsNil) |
2914 | c.Assert(stat, IsNil) |
2915 | } |
2916 | |
2917 | func (s *S) TestSetACL(c *C) { |
2918 | - zk, _ := s.init(c) |
2919 | + conn, _ := s.init(c) |
2920 | |
2921 | - _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
2922 | + _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2923 | c.Assert(err, IsNil) |
2924 | |
2925 | - err = zk.SetACL("/test", gozk.WorldACL(gozk.PERM_ALL), 5) |
2926 | + err = conn.SetACL("/test", zk.WorldACL(zk.PERM_ALL), 5) |
2927 | c.Assert(err, NotNil) |
2928 | - c.Assert(err.Code(), Equals, gozk.ZBADVERSION) |
2929 | - |
2930 | - err = zk.SetACL("/test", gozk.WorldACL(gozk.PERM_READ), -1) |
2931 | - c.Assert(err, IsNil) |
2932 | - |
2933 | - acl, _, err := zk.ACL("/test") |
2934 | - c.Assert(err, IsNil) |
2935 | - c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_READ)) |
2936 | + c.Assert(err, Equals, zk.ZBADVERSION) |
2937 | + |
2938 | + err = conn.SetACL("/test", zk.WorldACL(zk.PERM_READ), -1) |
2939 | + c.Assert(err, IsNil) |
2940 | + |
2941 | + acl, _, err := conn.ACL("/test") |
2942 | + c.Assert(err, IsNil) |
2943 | + c.Assert(acl, Equals, zk.WorldACL(zk.PERM_READ)) |
2944 | } |
2945 | |
2946 | func (s *S) TestAddAuth(c *C) { |
2947 | - zk, _ := s.init(c) |
2948 | - |
2949 | - acl := []gozk.ACL{{gozk.PERM_READ, "digest", "joe:enQcM3mIEHQx7IrPNStYBc0qfs8="}} |
2950 | - |
2951 | - _, err := zk.Create("/test", "", gozk.EPHEMERAL, acl) |
2952 | + conn, _ := s.init(c) |
2953 | + |
2954 | + acl := []zk.ACL{{zk.PERM_READ, "digest", "joe:enQcM3mIEHQx7IrPNStYBc0qfs8="}} |
2955 | + |
2956 | + _, err := conn.Create("/test", "", zk.EPHEMERAL, acl) |
2957 | c.Assert(err, IsNil) |
2958 | |
2959 | - _, _, err = zk.Get("/test") |
2960 | + _, _, err = conn.Get("/test") |
2961 | c.Assert(err, NotNil) |
2962 | - c.Assert(err.Code(), Equals, gozk.ZNOAUTH) |
2963 | + c.Assert(err, Equals, zk.ZNOAUTH) |
2964 | |
2965 | - err = zk.AddAuth("digest", "joe:passwd") |
2966 | + err = conn.AddAuth("digest", "joe:passwd") |
2967 | c.Assert(err, IsNil) |
2968 | |
2969 | - _, _, err = zk.Get("/test") |
2970 | + _, _, err = conn.Get("/test") |
2971 | c.Assert(err, IsNil) |
2972 | } |
2973 | |
2974 | func (s *S) TestWatchOnReconnection(c *C) { |
2975 | - c.Check(gozk.CountPendingWatches(), Equals, 0) |
2976 | + c.Check(zk.CountPendingWatches(), Equals, 0) |
2977 | |
2978 | - zk, session := s.init(c) |
2979 | + conn, session := s.init(c) |
2980 | |
2981 | event := <-session |
2982 | - c.Assert(event.Type, Equals, gozk.EVENT_SESSION) |
2983 | - c.Assert(event.State, Equals, gozk.STATE_CONNECTED) |
2984 | - |
2985 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2986 | - |
2987 | - stat, watch, err := zk.ExistsW("/test") |
2988 | + c.Assert(event.Type, Equals, zk.EVENT_SESSION) |
2989 | + c.Assert(event.State, Equals, zk.STATE_CONNECTED) |
2990 | + |
2991 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2992 | + |
2993 | + stat, watch, err := conn.ExistsW("/test") |
2994 | c.Assert(err, IsNil) |
2995 | c.Assert(stat, IsNil) |
2996 | |
2997 | - c.Check(gozk.CountPendingWatches(), Equals, 2) |
2998 | - |
2999 | - s.StopZK() |
3000 | + c.Check(zk.CountPendingWatches(), Equals, 2) |
3001 | + |
3002 | + err = s.zkServer.Stop() |
3003 | + c.Assert(err, IsNil) |
3004 | + |
3005 | time.Sleep(2e9) |
3006 | - s.StartZK() |
3007 | |
3008 | - // The session channel should receive the reconnection notification, |
3009 | + err = s.zkServer.Start() |
3010 | + c.Assert(err, IsNil) |
3011 | + |
3012 | + // The session channel should receive the reconnection notification. |
3013 | select { |
3014 | case event := <-session: |
3015 | - c.Assert(event.State, Equals, gozk.STATE_CONNECTING) |
3016 | + c.Assert(event.State, Equals, zk.STATE_CONNECTING) |
3017 | case <-time.After(3e9): |
3018 | c.Fatal("Session watch didn't fire") |
3019 | } |
3020 | select { |
3021 | case event := <-session: |
3022 | - c.Assert(event.State, Equals, gozk.STATE_CONNECTED) |
3023 | + c.Assert(event.State, Equals, zk.STATE_CONNECTED) |
3024 | case <-time.After(3e9): |
3025 | c.Fatal("Session watch didn't fire") |
3026 | } |
3027 | @@ -585,40 +591,40 @@ |
3028 | } |
3029 | |
3030 | // And it should still work. |
3031 | - _, err = zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
3032 | + _, err = conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
3033 | c.Assert(err, IsNil) |
3034 | |
3035 | event = <-watch |
3036 | - c.Assert(event.Type, Equals, gozk.EVENT_CREATED) |
3037 | + c.Assert(event.Type, Equals, zk.EVENT_CREATED) |
3038 | c.Assert(event.Path, Equals, "/test") |
3039 | |
3040 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
3041 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
3042 | } |
3043 | |
3044 | func (s *S) TestWatchOnSessionExpiration(c *C) { |
3045 | - c.Check(gozk.CountPendingWatches(), Equals, 0) |
3046 | + c.Check(zk.CountPendingWatches(), Equals, 0) |
3047 | |
3048 | - zk, session := s.init(c) |
3049 | + conn, session := s.init(c) |
3050 | |
3051 | event := <-session |
3052 | - c.Assert(event.Type, Equals, gozk.EVENT_SESSION) |
3053 | - c.Assert(event.State, Equals, gozk.STATE_CONNECTED) |
3054 | - |
3055 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
3056 | - |
3057 | - stat, watch, err := zk.ExistsW("/test") |
3058 | + c.Assert(event.Type, Equals, zk.EVENT_SESSION) |
3059 | + c.Assert(event.State, Equals, zk.STATE_CONNECTED) |
3060 | + |
3061 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
3062 | + |
3063 | + stat, watch, err := conn.ExistsW("/test") |
3064 | c.Assert(err, IsNil) |
3065 | c.Assert(stat, IsNil) |
3066 | |
3067 | - c.Check(gozk.CountPendingWatches(), Equals, 2) |
3068 | + c.Check(zk.CountPendingWatches(), Equals, 2) |
3069 | |
3070 | // Use expiration trick described in the FAQ. |
3071 | - clientId := zk.ClientId() |
3072 | - zk2, session2, err := gozk.ReInit(s.zkAddr, 5e9, clientId) |
3073 | + clientId := conn.ClientId() |
3074 | + zk2, session2, err := zk.Redial(s.zkAddr, 5e9, clientId) |
3075 | |
3076 | for event := range session2 { |
3077 | c.Log("Event from overlapping session: ", event) |
3078 | - if event.State == gozk.STATE_CONNECTED { |
3079 | + if event.State == zk.STATE_CONNECTED { |
3080 | // Wait for zk to process the connection. |
3081 | // Not reliable without this. :-( |
3082 | time.Sleep(1e9) |
3083 | @@ -627,21 +633,21 @@ |
3084 | } |
3085 | for event := range session { |
3086 | c.Log("Event from primary session: ", event) |
3087 | - if event.State == gozk.STATE_EXPIRED_SESSION { |
3088 | + if event.State == zk.STATE_EXPIRED_SESSION { |
3089 | break |
3090 | } |
3091 | } |
3092 | |
3093 | select { |
3094 | case event := <-watch: |
3095 | - c.Assert(event.State, Equals, gozk.STATE_EXPIRED_SESSION) |
3096 | + c.Assert(event.State, Equals, zk.STATE_EXPIRED_SESSION) |
3097 | case <-time.After(3e9): |
3098 | c.Fatal("Watch event didn't fire") |
3099 | } |
3100 | |
3101 | event = <-watch |
3102 | - c.Assert(event.Type, Equals, gozk.EVENT_CLOSED) |
3103 | - c.Assert(event.State, Equals, gozk.STATE_CLOSED) |
3104 | + c.Assert(event.Type, Equals, zk.EVENT_CLOSED) |
3105 | + c.Assert(event.State, Equals, zk.STATE_CLOSED) |
3106 | |
3107 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
3108 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
3109 | } |
This is an interesting idea, but at the same time it's full of problems and
not really necessary right now. I suggest we delay it until a future point
when we actually need that abstraction, as we'll be in a better position
to generalize it. At that point, we can even preserve for the internal
implementation of the ZooKeeper server itself, without many problems.
Here is some feedback, regardless, which I suggest you don't work on
right now besides for [1] which should be integrated.
[1]
=== added file 'reattach_test.go'
--- reattach_test.go 1970-01-01 00:00:00 +0000
+++ reattach_test.go 2011-10-05 14:37:11 +0000
This should be in a separate merge proposal as it's important and unrelated
to the strawman changes proposed.
[2]
+type Interface interface {
+ // Directory gives the path to the directory containing
+ // the server's configuration and data files. It is assumed that
+ // this exists and can be modified.
+ Directory() string
s/Directory/Dir/
[3]
+
+ // CheckAvailability should return an error if resources
+ // required by the server are not available; for instance
+ // if the server requires a network port which is not free.
+ CheckAvailability() os.Error
We don't need this. The Command function can return an error if
it knows it cannot start.
[4]
+ l, err := net.Listen("tcp", fmt.Sprintf( "localhost: %d", port))
+ if err != nil {
+ return fmt.Errorf("cannot listen on port %v: %v", port, err)
+ }
+ l.Close()
This doesn't really guarantee that when the service itself is started,
the port will be available, so we can as well not do it.
[5]
+// Process returns a reference to the identity
+// of the last running server in the Service's directory.
+// If the server was shut down, it returns (nil, nil).
This shouldn't be in a generic service API. What if there
are multiple processes handled by the service?
[6]
+// If the server was shut down, it returns (nil, nil).
If we're returning a nil *Process, we necessarily need err != nil.
[7]
+// It stores the process ID of the server in the file "pid.txt"
+// inside the server's directory. Stdout and stderr of the server
+// are similarly written to "log.txt".
That's not nice. Who owns the content of this directory, the
interface implementation, or the Service? We can't write
arbitrary content into a directory of different ownership.
Also, a generic interface would need to do better in terms
of logging. We could have a file somewhere named output.txt, maybe,
and let logging with the underlying implementation.
[8]
+ if err := p.Kill(); err != nil && !strings. Contains( err.String( ), "no such process") {
+ return fmt.Errorf("cannot kill server process: %v", err)
+ }
Different services are stopped in different ways. Some prefer a SIGTERM,
before a SIGKILL, for instance. And what happens if there are multiple
processes?
[9]
+ if err != nil && strings. Contains( err.String( ), "no child processes") {
+ // If we can't wait for the server, it's possible that it was running
+ // but not as a child of this process, so give it a little while
+ // to exit. TODO po...