Merge lp:~rogpeppe/gozk/update-documentation into lp:~juju/gozk/trunk
- update-documentation
- Merge into trunk
Status: | Rejected | ||||
---|---|---|---|---|---|
Rejected by: | Gustavo Niemeyer | ||||
Proposed branch: | lp:~rogpeppe/gozk/update-documentation | ||||
Merge into: | lp:~juju/gozk/trunk | ||||
Diff against target: |
3213 lines (+1334/-814) 10 files modified
Makefile (+5/-2) example/example.go (+22/-23) reattach_test.go (+202/-0) retry_test.go (+65/-74) server.go (+239/-0) service/Makefile (+20/-0) service/service.go (+160/-0) suite_test.go (+45/-122) zk.go (+318/-341) zk_test.go (+258/-252) |
||||
To merge this branch: | bzr merge lp:~rogpeppe/gozk/update-documentation | ||||
Related bugs: |
|
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Juju Engineering | Pending | ||
Review via email: mp+77980@code.launchpad.net |
Commit message
Description of the change
Update documentation.
Add more complete description of event delivery and comments
on Stat field accessors.
Gustavo Niemeyer (niemeyer) wrote : | # |
Roger Peppe (rogpeppe) wrote : | # |
this was a while ago. i didn't know how to handle the review system...
i think the intended changes are still worth making. i'll make a CL.
i could also use that proposal to change appropriate methods
to return a time.Time if that seems reasonable (e.g. Stat.CTime etc)
On 10 February 2012 12:27, Gustavo Niemeyer <email address hidden> wrote:
> The proposal to merge lp:~rogpeppe/gozk/update-documentation into lp:gozk has been updated.
>
> Status: Needs review => Rejected
>
> For more details, see:
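> https://code.launchpad.net/~rogpeppe/gozk/update-documentation/+merge/77980
> --
> https://code.launchpad.net/~rogpeppe/gozk/update-documentation/+merge/77980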
> You are the owner of lp:~rogpeppe/gozk/update-documentation.
Unmerged revisions
- 24. By Roger Peppe
-
Update documentation to include more accurate
description of events and comments on stat field accessors.
- 23. By Roger Peppe
-
Factored out server running code into a separate package.
- 22. By Roger Peppe
-
Fix Server code.
- 21. By Gustavo Niemeyer
-
The package location is now http://launchpad.net/gozk/zk.
The shorter package name makes for significantly more digestible line lengths.
- 20. By Gustavo Niemeyer
-
Merged clean-up-interface branch, by Roger Peppe [r=niemeyer]
This branch does a number of incompatible changes that improve
the overall API of gozk.
Preview Diff
1 | === modified file 'Makefile' | |||
2 | --- Makefile 2011-08-03 01:56:58 +0000 | |||
3 | +++ Makefile 2011-10-03 17:02:23 +0000 | |||
4 | @@ -2,10 +2,13 @@ | |||
5 | 2 | 2 | ||
6 | 3 | all: package | 3 | all: package |
7 | 4 | 4 | ||
9 | 5 | TARG=gozk | 5 | TARG=launchpad.net/gozk/zk |
10 | 6 | 6 | ||
11 | 7 | GOFILES=\ | ||
12 | 8 | server.go\ | ||
13 | 9 | |||
14 | 7 | CGOFILES=\ | 10 | CGOFILES=\ |
16 | 8 | gozk.go\ | 11 | zk.go\ |
17 | 9 | 12 | ||
18 | 10 | CGO_OFILES=\ | 13 | CGO_OFILES=\ |
19 | 11 | helpers.o\ | 14 | helpers.o\ |
20 | 12 | 15 | ||
21 | === added directory 'example' | |||
22 | === renamed file 'example.go' => 'example/example.go' | |||
23 | --- example.go 2011-08-03 01:47:25 +0000 | |||
24 | +++ example/example.go 2011-10-03 17:02:23 +0000 | |||
25 | @@ -1,30 +1,29 @@ | |||
26 | 1 | package main | 1 | package main |
27 | 2 | 2 | ||
28 | 3 | import ( | 3 | import ( |
30 | 4 | "gozk" | 4 | "launchpad.net/zookeeper/zookeeper" |
31 | 5 | ) | 5 | ) |
32 | 6 | 6 | ||
33 | 7 | func main() { | 7 | func main() { |
55 | 8 | zk, session, err := gozk.Init("localhost:2181", 5000) | 8 | zk, session, err := zookeeper.Init("localhost:2181", 5000) |
56 | 9 | if err != nil { | 9 | if err != nil { |
57 | 10 | println("Couldn't connect: " + err.String()) | 10 | println("Couldn't connect: " + err.String()) |
58 | 11 | return | 11 | return |
59 | 12 | } | 12 | } |
60 | 13 | 13 | ||
61 | 14 | defer zk.Close() | 14 | defer zk.Close() |
62 | 15 | 15 | ||
63 | 16 | // Wait for connection. | 16 | // Wait for connection. |
64 | 17 | event := <-session | 17 | event := <-session |
65 | 18 | if event.State != gozk.STATE_CONNECTED { | 18 | if event.State != zookeeper.STATE_CONNECTED { |
66 | 19 | println("Couldn't connect") | 19 | println("Couldn't connect") |
67 | 20 | return | 20 | return |
68 | 21 | } | 21 | } |
69 | 22 | 22 | ||
70 | 23 | _, err = zk.Create("/counter", "0", 0, gozk.WorldACL(gozk.PERM_ALL)) | 23 | _, err = zk.Create("/counter", "0", 0, zookeeper.WorldACL(zookeeper.PERM_ALL)) |
71 | 24 | if err != nil { | 24 | if err != nil { |
72 | 25 | println(err.String()) | 25 | println(err.String()) |
73 | 26 | } else { | 26 | } else { |
74 | 27 | println("Created!") | 27 | println("Created!") |
75 | 28 | } | 28 | } |
76 | 29 | } | 29 | } |
77 | 30 | |||
78 | 31 | 30 | ||
79 | === added file 'reattach_test.go' | |||
80 | --- reattach_test.go 1970-01-01 00:00:00 +0000 | |||
81 | +++ reattach_test.go 2011-10-03 17:02:23 +0000 | |||
82 | @@ -0,0 +1,202 @@ | |||
83 | 1 | package zk_test | ||
84 | 2 | |||
85 | 3 | import ( | ||
86 | 4 | "bufio" | ||
87 | 5 | . "launchpad.net/gocheck" | ||
88 | 6 | "launchpad.net/gozk/zk/service" | ||
89 | 7 | "launchpad.net/gozk/zk" | ||
90 | 8 | "exec" | ||
91 | 9 | "flag" | ||
92 | 10 | "fmt" | ||
93 | 11 | "os" | ||
94 | 12 | "strings" | ||
95 | 13 | "testing" | ||
96 | 14 | "time" | ||
97 | 15 | ) | ||
98 | 16 | |||
99 | 17 | var reattach = flag.Bool("zktest.reattach", false, "internal flag used for testing") | ||
100 | 18 | var reattachRunDir = flag.String("zktest.rundir", "", "internal flag used for testing") | ||
101 | 19 | var reattachAbnormalStop = flag.Bool("zktest.stop", false, "internal flag used for testing") | ||
102 | 20 | |||
103 | 21 | // This is the reentrancy point for testing ZooKeeper servers | ||
104 | 22 | // started by processes that are not direct children of the | ||
105 | 23 | // testing process. This test always succeeds - the status | ||
106 | 24 | // will be written to stdout and read by indirectServer. | ||
107 | 25 | func TestStartNonChildServer(t *testing.T) { | ||
108 | 26 | if !*reattach { | ||
109 | 27 | // not re-entrant, so ignore this test. | ||
110 | 28 | return | ||
111 | 29 | } | ||
112 | 30 | err := startServer(*reattachRunDir, *reattachAbnormalStop) | ||
113 | 31 | if err != nil { | ||
114 | 32 | fmt.Printf("error:%v\n", err) | ||
115 | 33 | return | ||
116 | 34 | } | ||
117 | 35 | fmt.Printf("done\n") | ||
118 | 36 | } | ||
119 | 37 | |||
120 | 38 | func (s *S) startServer(c *C, abort bool) { | ||
121 | 39 | err := startServer(s.zkTestRoot, abort) | ||
122 | 40 | c.Assert(err, IsNil) | ||
123 | 41 | } | ||
124 | 42 | |||
125 | 43 | // startServerIndirect starts a ZooKeeper server that is not | ||
126 | 44 | // a direct child of the current process. If abort is true, | ||
127 | 45 | // the server will be started and then terminated abnormally. | ||
128 | 46 | func (s *S) startServerIndirect(c *C, abort bool) { | ||
129 | 47 | if len(os.Args) == 0 { | ||
130 | 48 | c.Fatal("Cannot find self executable name") | ||
131 | 49 | } | ||
132 | 50 | cmd := exec.Command( | ||
133 | 51 | os.Args[0], | ||
134 | 52 | "-zktest.reattach", | ||
135 | 53 | "-zktest.rundir", s.zkTestRoot, | ||
136 | 54 | "-zktest.stop=", fmt.Sprint(abort), | ||
137 | 55 | "-test.run", "StartNonChildServer", | ||
138 | 56 | ) | ||
139 | 57 | r, err := cmd.StdoutPipe() | ||
140 | 58 | c.Assert(err, IsNil) | ||
141 | 59 | defer r.Close() | ||
142 | 60 | if err := cmd.Start(); err != nil { | ||
143 | 61 | c.Fatalf("cannot start re-entrant gotest process: %v", err) | ||
144 | 62 | } | ||
145 | 63 | defer cmd.Wait() | ||
146 | 64 | bio := bufio.NewReader(r) | ||
147 | 65 | for { | ||
148 | 66 | line, err := bio.ReadSlice('\n') | ||
149 | 67 | if err != nil { | ||
150 | 68 | c.Fatalf("indirect server status line not found: %v", err) | ||
151 | 69 | } | ||
152 | 70 | s := string(line) | ||
153 | 71 | if strings.HasPrefix(s, "error:") { | ||
154 | 72 | c.Fatalf("indirect server error: %s", s[len("error:"):]) | ||
155 | 73 | } | ||
156 | 74 | if s == "done\n" { | ||
157 | 75 | return | ||
158 | 76 | } | ||
159 | 77 | } | ||
160 | 78 | panic("not reached") | ||
161 | 79 | } | ||
162 | 80 | |||
163 | 81 | // startServer starts a ZooKeeper server, and terminates it abnormally | ||
164 | 82 | // if abort is true. | ||
165 | 83 | func startServer(runDir string, abort bool) os.Error { | ||
166 | 84 | s, err := zk.AttachServer(runDir) | ||
167 | 85 | if err != nil { | ||
168 | 86 | return fmt.Errorf("cannot attach to server at %q: %v", runDir, err) | ||
169 | 87 | } | ||
170 | 88 | srv := service.New(s) | ||
171 | 89 | if err := srv.Start(); err != nil { | ||
172 | 90 | return fmt.Errorf("cannot start server: %v", err) | ||
173 | 91 | } | ||
174 | 92 | if abort { | ||
175 | 93 | // Give it time to start up, then kill the server process abnormally, | ||
176 | 94 | // leaving the pid.txt file behind. | ||
177 | 95 | time.Sleep(0.5e9) | ||
178 | 96 | p, err := srv.Process() | ||
179 | 97 | if err != nil { | ||
180 | 98 | return fmt.Errorf("cannot get server process: %v", err) | ||
181 | 99 | } | ||
182 | 100 | defer p.Release() | ||
183 | 101 | if err := p.Kill(); err != nil { | ||
184 | 102 | return fmt.Errorf("cannot kill server process: %v", err) | ||
185 | 103 | } | ||
186 | 104 | } | ||
187 | 105 | return nil | ||
188 | 106 | } | ||
189 | 107 | |||
190 | 108 | func (s *S) checkCookie(c *C) { | ||
191 | 109 | conn, watch, err := zk.Dial(s.zkAddr, 5e9) | ||
192 | 110 | c.Assert(err, IsNil) | ||
193 | 111 | |||
194 | 112 | e, ok := <-watch | ||
195 | 113 | c.Assert(ok, Equals, true) | ||
196 | 114 | c.Assert(e.Ok(), Equals, true) | ||
197 | 115 | |||
198 | 116 | c.Assert(err, IsNil) | ||
199 | 117 | cookie, _, err := conn.Get("/testAttachCookie") | ||
200 | 118 | c.Assert(err, IsNil) | ||
201 | 119 | c.Assert(cookie, Equals, "testAttachCookie") | ||
202 | 120 | conn.Close() | ||
203 | 121 | } | ||
204 | 122 | |||
205 | 123 | // cases to test: | ||
206 | 124 | // child server, stopped normally; reattach, start | ||
207 | 125 | // non-direct child server, killed abnormally; reattach, start (->error), remove pid.txt; start | ||
208 | 126 | // non-direct child server, still running; reattach, start (->error), stop, start | ||
209 | 127 | // child server, still running; reattach, start (-> error) | ||
210 | 128 | // child server, still running; reattach, stop, start. | ||
211 | 129 | // non-direct child server, still running; reattach, stop, start. | ||
212 | 130 | func (s *S) TestAttachServer(c *C) { | ||
213 | 131 | // Create a cookie so that we know we are reattaching to the same instance. | ||
214 | 132 | conn, _ := s.init(c) | ||
215 | 133 | _, err := conn.Create("/testAttachCookie", "testAttachCookie", 0, zk.WorldACL(zk.PERM_ALL)) | ||
216 | 134 | c.Assert(err, IsNil) | ||
217 | 135 | s.checkCookie(c) | ||
218 | 136 | s.zkServer.Stop() | ||
219 | 137 | s.zkServer = nil | ||
220 | 138 | |||
221 | 139 | s.testAttachServer(c, (*S).startServer) | ||
222 | 140 | s.testAttachServer(c, (*S).startServerIndirect) | ||
223 | 141 | s.testAttachServerAbnormalTerminate(c, (*S).startServer) | ||
224 | 142 | s.testAttachServerAbnormalTerminate(c, (*S).startServerIndirect) | ||
225 | 143 | |||
226 | 144 | srv, err := zk.AttachServer(s.zkTestRoot) | ||
227 | 145 | c.Assert(err, IsNil) | ||
228 | 146 | |||
229 | 147 | s.zkServer = service.New(srv) | ||
230 | 148 | err = s.zkServer.Start() | ||
231 | 149 | c.Assert(err, IsNil) | ||
232 | 150 | |||
233 | 151 | conn, _ = s.init(c) | ||
234 | 152 | err = conn.Delete("/testAttachCookie", -1) | ||
235 | 153 | c.Assert(err, IsNil) | ||
236 | 154 | } | ||
237 | 155 | |||
238 | 156 | func (s *S) testAttachServer(c *C, start func(*S, *C, bool)) { | ||
239 | 157 | start(s, c, false) | ||
240 | 158 | |||
241 | 159 | s.checkCookie(c) | ||
242 | 160 | |||
243 | 161 | // try attaching to it while it is still running - it should fail. | ||
244 | 162 | a, err := zk.AttachServer(s.zkTestRoot) | ||
245 | 163 | c.Assert(err, IsNil) | ||
246 | 164 | srv := service.New(a) | ||
247 | 165 | |||
248 | 166 | err = srv.Start() | ||
249 | 167 | c.Assert(err, NotNil) | ||
250 | 168 | |||
251 | 169 | // stop it and then start it again - it should succeed. | ||
252 | 170 | err = srv.Stop() | ||
253 | 171 | c.Assert(err, IsNil) | ||
254 | 172 | |||
255 | 173 | err = srv.Start() | ||
256 | 174 | c.Assert(err, IsNil) | ||
257 | 175 | |||
258 | 176 | s.checkCookie(c) | ||
259 | 177 | |||
260 | 178 | err = srv.Stop() | ||
261 | 179 | c.Assert(err, IsNil) | ||
262 | 180 | } | ||
263 | 181 | |||
264 | 182 | func (s *S) testAttachServerAbnormalTerminate(c *C, start func(*S, *C, bool)) { | ||
265 | 183 | start(s, c, true) | ||
266 | 184 | |||
267 | 185 | // try attaching to it and starting - it should fail, because pid.txt | ||
268 | 186 | // won't have been removed. | ||
269 | 187 | a, err := zk.AttachServer(s.zkTestRoot) | ||
270 | 188 | c.Assert(err, IsNil) | ||
271 | 189 | srv := service.New(a) | ||
272 | 190 | err = srv.Start() | ||
273 | 191 | c.Assert(err, NotNil) | ||
274 | 192 | |||
275 | 193 | // stopping it should bring things back to normal. | ||
276 | 194 | err = srv.Stop() | ||
277 | 195 | c.Assert(err, IsNil) | ||
278 | 196 | err = srv.Start() | ||
279 | 197 | c.Assert(err, IsNil) | ||
280 | 198 | |||
281 | 199 | s.checkCookie(c) | ||
282 | 200 | err = srv.Stop() | ||
283 | 201 | c.Assert(err, IsNil) | ||
284 | 202 | } | ||
285 | 0 | \ No newline at end of file | 203 | \ No newline at end of file |
286 | 1 | 204 | ||
287 | === modified file 'retry_test.go' | |||
288 | --- retry_test.go 2011-08-19 01:51:37 +0000 | |||
289 | +++ retry_test.go 2011-10-03 17:02:23 +0000 | |||
290 | @@ -1,42 +1,41 @@ | |||
292 | 1 | package gozk_test | 1 | package zk_test |
293 | 2 | 2 | ||
294 | 3 | import ( | 3 | import ( |
295 | 4 | . "launchpad.net/gocheck" | 4 | . "launchpad.net/gocheck" |
297 | 5 | "gozk" | 5 | "launchpad.net/gozk/zk" |
298 | 6 | "os" | 6 | "os" |
299 | 7 | ) | 7 | ) |
300 | 8 | 8 | ||
301 | 9 | func (s *S) TestRetryChangeCreating(c *C) { | 9 | func (s *S) TestRetryChangeCreating(c *C) { |
303 | 10 | zk, _ := s.init(c) | 10 | conn, _ := s.init(c) |
304 | 11 | 11 | ||
307 | 12 | err := zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL), | 12 | err := conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL), |
308 | 13 | func(data string, stat gozk.Stat) (string, os.Error) { | 13 | func(data string, stat *zk.Stat) (string, os.Error) { |
309 | 14 | c.Assert(data, Equals, "") | 14 | c.Assert(data, Equals, "") |
310 | 15 | c.Assert(stat, IsNil) | 15 | c.Assert(stat, IsNil) |
311 | 16 | return "new", nil | 16 | return "new", nil |
312 | 17 | }) | 17 | }) |
313 | 18 | c.Assert(err, IsNil) | 18 | c.Assert(err, IsNil) |
314 | 19 | 19 | ||
316 | 20 | data, stat, err := zk.Get("/test") | 20 | data, stat, err := conn.Get("/test") |
317 | 21 | c.Assert(err, IsNil) | 21 | c.Assert(err, IsNil) |
318 | 22 | c.Assert(stat, NotNil) | 22 | c.Assert(stat, NotNil) |
319 | 23 | c.Assert(stat.Version(), Equals, int32(0)) | 23 | c.Assert(stat.Version(), Equals, int32(0)) |
320 | 24 | c.Assert(data, Equals, "new") | 24 | c.Assert(data, Equals, "new") |
321 | 25 | 25 | ||
323 | 26 | acl, _, err := zk.ACL("/test") | 26 | acl, _, err := conn.ACL("/test") |
324 | 27 | c.Assert(err, IsNil) | 27 | c.Assert(err, IsNil) |
326 | 28 | c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_ALL)) | 28 | c.Assert(acl, Equals, zk.WorldACL(zk.PERM_ALL)) |
327 | 29 | } | 29 | } |
328 | 30 | 30 | ||
329 | 31 | func (s *S) TestRetryChangeSetting(c *C) { | 31 | func (s *S) TestRetryChangeSetting(c *C) { |
331 | 32 | zk, _ := s.init(c) | 32 | conn, _ := s.init(c) |
332 | 33 | 33 | ||
335 | 34 | _, err := zk.Create("/test", "old", gozk.EPHEMERAL, | 34 | _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
334 | 35 | gozk.WorldACL(gozk.PERM_ALL)) | ||
336 | 36 | c.Assert(err, IsNil) | 35 | c.Assert(err, IsNil) |
337 | 37 | 36 | ||
340 | 38 | err = zk.RetryChange("/test", gozk.EPHEMERAL, []gozk.ACL{}, | 37 | err = conn.RetryChange("/test", zk.EPHEMERAL, []zk.ACL{}, |
341 | 39 | func(data string, stat gozk.Stat) (string, os.Error) { | 38 | func(data string, stat *zk.Stat) (string, os.Error) { |
342 | 40 | c.Assert(data, Equals, "old") | 39 | c.Assert(data, Equals, "old") |
343 | 41 | c.Assert(stat, NotNil) | 40 | c.Assert(stat, NotNil) |
344 | 42 | c.Assert(stat.Version(), Equals, int32(0)) | 41 | c.Assert(stat.Version(), Equals, int32(0)) |
345 | @@ -44,27 +43,26 @@ | |||
346 | 44 | }) | 43 | }) |
347 | 45 | c.Assert(err, IsNil) | 44 | c.Assert(err, IsNil) |
348 | 46 | 45 | ||
350 | 47 | data, stat, err := zk.Get("/test") | 46 | data, stat, err := conn.Get("/test") |
351 | 48 | c.Assert(err, IsNil) | 47 | c.Assert(err, IsNil) |
352 | 49 | c.Assert(stat, NotNil) | 48 | c.Assert(stat, NotNil) |
353 | 50 | c.Assert(stat.Version(), Equals, int32(1)) | 49 | c.Assert(stat.Version(), Equals, int32(1)) |
354 | 51 | c.Assert(data, Equals, "brand new") | 50 | c.Assert(data, Equals, "brand new") |
355 | 52 | 51 | ||
356 | 53 | // ACL was unchanged by RetryChange(). | 52 | // ACL was unchanged by RetryChange(). |
358 | 54 | acl, _, err := zk.ACL("/test") | 53 | acl, _, err := conn.ACL("/test") |
359 | 55 | c.Assert(err, IsNil) | 54 | c.Assert(err, IsNil) |
361 | 56 | c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_ALL)) | 55 | c.Assert(acl, Equals, zk.WorldACL(zk.PERM_ALL)) |
362 | 57 | } | 56 | } |
363 | 58 | 57 | ||
364 | 59 | func (s *S) TestRetryChangeUnchangedValueDoesNothing(c *C) { | 58 | func (s *S) TestRetryChangeUnchangedValueDoesNothing(c *C) { |
366 | 60 | zk, _ := s.init(c) | 59 | conn, _ := s.init(c) |
367 | 61 | 60 | ||
370 | 62 | _, err := zk.Create("/test", "old", gozk.EPHEMERAL, | 61 | _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
369 | 63 | gozk.WorldACL(gozk.PERM_ALL)) | ||
371 | 64 | c.Assert(err, IsNil) | 62 | c.Assert(err, IsNil) |
372 | 65 | 63 | ||
375 | 66 | err = zk.RetryChange("/test", gozk.EPHEMERAL, []gozk.ACL{}, | 64 | err = conn.RetryChange("/test", zk.EPHEMERAL, []zk.ACL{}, |
376 | 67 | func(data string, stat gozk.Stat) (string, os.Error) { | 65 | func(data string, stat *zk.Stat) (string, os.Error) { |
377 | 68 | c.Assert(data, Equals, "old") | 66 | c.Assert(data, Equals, "old") |
378 | 69 | c.Assert(stat, NotNil) | 67 | c.Assert(stat, NotNil) |
379 | 70 | c.Assert(stat.Version(), Equals, int32(0)) | 68 | c.Assert(stat.Version(), Equals, int32(0)) |
380 | @@ -72,7 +70,7 @@ | |||
381 | 72 | }) | 70 | }) |
382 | 73 | c.Assert(err, IsNil) | 71 | c.Assert(err, IsNil) |
383 | 74 | 72 | ||
385 | 75 | data, stat, err := zk.Get("/test") | 73 | data, stat, err := conn.Get("/test") |
386 | 76 | c.Assert(err, IsNil) | 74 | c.Assert(err, IsNil) |
387 | 77 | c.Assert(stat, NotNil) | 75 | c.Assert(stat, NotNil) |
388 | 78 | c.Assert(stat.Version(), Equals, int32(0)) // Unchanged! | 76 | c.Assert(stat.Version(), Equals, int32(0)) // Unchanged! |
389 | @@ -80,14 +78,14 @@ | |||
390 | 80 | } | 78 | } |
391 | 81 | 79 | ||
392 | 82 | func (s *S) TestRetryChangeConflictOnCreate(c *C) { | 80 | func (s *S) TestRetryChangeConflictOnCreate(c *C) { |
394 | 83 | zk, _ := s.init(c) | 81 | conn, _ := s.init(c) |
395 | 84 | 82 | ||
397 | 85 | changeFunc := func(data string, stat gozk.Stat) (string, os.Error) { | 83 | changeFunc := func(data string, stat *zk.Stat) (string, os.Error) { |
398 | 86 | switch data { | 84 | switch data { |
399 | 87 | case "": | 85 | case "": |
400 | 88 | c.Assert(stat, IsNil) | 86 | c.Assert(stat, IsNil) |
403 | 89 | _, err := zk.Create("/test", "conflict", gozk.EPHEMERAL, | 87 | _, err := conn.Create("/test", "conflict", zk.EPHEMERAL, |
404 | 90 | gozk.WorldACL(gozk.PERM_ALL)) | 88 | zk.WorldACL(zk.PERM_ALL)) |
405 | 91 | c.Assert(err, IsNil) | 89 | c.Assert(err, IsNil) |
406 | 92 | return "<none> => conflict", nil | 90 | return "<none> => conflict", nil |
407 | 93 | case "conflict": | 91 | case "conflict": |
408 | @@ -100,11 +98,10 @@ | |||
409 | 100 | return "can't happen", nil | 98 | return "can't happen", nil |
410 | 101 | } | 99 | } |
411 | 102 | 100 | ||
414 | 103 | err := zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL), | 101 | err := conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL), changeFunc) |
413 | 104 | changeFunc) | ||
415 | 105 | c.Assert(err, IsNil) | 102 | c.Assert(err, IsNil) |
416 | 106 | 103 | ||
418 | 107 | data, stat, err := zk.Get("/test") | 104 | data, stat, err := conn.Get("/test") |
419 | 108 | c.Assert(err, IsNil) | 105 | c.Assert(err, IsNil) |
420 | 109 | c.Assert(data, Equals, "conflict => new") | 106 | c.Assert(data, Equals, "conflict => new") |
421 | 110 | c.Assert(stat, NotNil) | 107 | c.Assert(stat, NotNil) |
422 | @@ -112,18 +109,17 @@ | |||
423 | 112 | } | 109 | } |
424 | 113 | 110 | ||
425 | 114 | func (s *S) TestRetryChangeConflictOnSetDueToChange(c *C) { | 111 | func (s *S) TestRetryChangeConflictOnSetDueToChange(c *C) { |
427 | 115 | zk, _ := s.init(c) | 112 | conn, _ := s.init(c) |
428 | 116 | 113 | ||
431 | 117 | _, err := zk.Create("/test", "old", gozk.EPHEMERAL, | 114 | _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
430 | 118 | gozk.WorldACL(gozk.PERM_ALL)) | ||
432 | 119 | c.Assert(err, IsNil) | 115 | c.Assert(err, IsNil) |
433 | 120 | 116 | ||
435 | 121 | changeFunc := func(data string, stat gozk.Stat) (string, os.Error) { | 117 | changeFunc := func(data string, stat *zk.Stat) (string, os.Error) { |
436 | 122 | switch data { | 118 | switch data { |
437 | 123 | case "old": | 119 | case "old": |
438 | 124 | c.Assert(stat, NotNil) | 120 | c.Assert(stat, NotNil) |
439 | 125 | c.Assert(stat.Version(), Equals, int32(0)) | 121 | c.Assert(stat.Version(), Equals, int32(0)) |
441 | 126 | _, err := zk.Set("/test", "conflict", 0) | 122 | _, err := conn.Set("/test", "conflict", 0) |
442 | 127 | c.Assert(err, IsNil) | 123 | c.Assert(err, IsNil) |
443 | 128 | return "old => new", nil | 124 | return "old => new", nil |
444 | 129 | case "conflict": | 125 | case "conflict": |
445 | @@ -136,10 +132,10 @@ | |||
446 | 136 | return "can't happen", nil | 132 | return "can't happen", nil |
447 | 137 | } | 133 | } |
448 | 138 | 134 | ||
450 | 139 | err = zk.RetryChange("/test", gozk.EPHEMERAL, []gozk.ACL{}, changeFunc) | 135 | err = conn.RetryChange("/test", zk.EPHEMERAL, []zk.ACL{}, changeFunc) |
451 | 140 | c.Assert(err, IsNil) | 136 | c.Assert(err, IsNil) |
452 | 141 | 137 | ||
454 | 142 | data, stat, err := zk.Get("/test") | 138 | data, stat, err := conn.Get("/test") |
455 | 143 | c.Assert(err, IsNil) | 139 | c.Assert(err, IsNil) |
456 | 144 | c.Assert(data, Equals, "conflict => new") | 140 | c.Assert(data, Equals, "conflict => new") |
457 | 145 | c.Assert(stat, NotNil) | 141 | c.Assert(stat, NotNil) |
458 | @@ -147,18 +143,17 @@ | |||
459 | 147 | } | 143 | } |
460 | 148 | 144 | ||
461 | 149 | func (s *S) TestRetryChangeConflictOnSetDueToDelete(c *C) { | 145 | func (s *S) TestRetryChangeConflictOnSetDueToDelete(c *C) { |
463 | 150 | zk, _ := s.init(c) | 146 | conn, _ := s.init(c) |
464 | 151 | 147 | ||
467 | 152 | _, err := zk.Create("/test", "old", gozk.EPHEMERAL, | 148 | _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
466 | 153 | gozk.WorldACL(gozk.PERM_ALL)) | ||
468 | 154 | c.Assert(err, IsNil) | 149 | c.Assert(err, IsNil) |
469 | 155 | 150 | ||
471 | 156 | changeFunc := func(data string, stat gozk.Stat) (string, os.Error) { | 151 | changeFunc := func(data string, stat *zk.Stat) (string, os.Error) { |
472 | 157 | switch data { | 152 | switch data { |
473 | 158 | case "old": | 153 | case "old": |
474 | 159 | c.Assert(stat, NotNil) | 154 | c.Assert(stat, NotNil) |
475 | 160 | c.Assert(stat.Version(), Equals, int32(0)) | 155 | c.Assert(stat.Version(), Equals, int32(0)) |
477 | 161 | err := zk.Delete("/test", 0) | 156 | err := conn.Delete("/test", 0) |
478 | 162 | c.Assert(err, IsNil) | 157 | c.Assert(err, IsNil) |
479 | 163 | return "old => <deleted>", nil | 158 | return "old => <deleted>", nil |
480 | 164 | case "": | 159 | case "": |
481 | @@ -170,55 +165,53 @@ | |||
482 | 170 | return "can't happen", nil | 165 | return "can't happen", nil |
483 | 171 | } | 166 | } |
484 | 172 | 167 | ||
487 | 173 | err = zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_READ), | 168 | err = conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_READ), changeFunc) |
486 | 174 | changeFunc) | ||
488 | 175 | c.Assert(err, IsNil) | 169 | c.Assert(err, IsNil) |
489 | 176 | 170 | ||
491 | 177 | data, stat, err := zk.Get("/test") | 171 | data, stat, err := conn.Get("/test") |
492 | 178 | c.Assert(err, IsNil) | 172 | c.Assert(err, IsNil) |
493 | 179 | c.Assert(data, Equals, "<deleted> => new") | 173 | c.Assert(data, Equals, "<deleted> => new") |
494 | 180 | c.Assert(stat, NotNil) | 174 | c.Assert(stat, NotNil) |
495 | 181 | c.Assert(stat.Version(), Equals, int32(0)) | 175 | c.Assert(stat.Version(), Equals, int32(0)) |
496 | 182 | 176 | ||
497 | 183 | // Should be the new ACL. | 177 | // Should be the new ACL. |
499 | 184 | acl, _, err := zk.ACL("/test") | 178 | acl, _, err := conn.ACL("/test") |
500 | 185 | c.Assert(err, IsNil) | 179 | c.Assert(err, IsNil) |
502 | 186 | c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_READ)) | 180 | c.Assert(acl, Equals, zk.WorldACL(zk.PERM_READ)) |
503 | 187 | } | 181 | } |
504 | 188 | 182 | ||
505 | 189 | func (s *S) TestRetryChangeErrorInCallback(c *C) { | 183 | func (s *S) TestRetryChangeErrorInCallback(c *C) { |
507 | 190 | zk, _ := s.init(c) | 184 | conn, _ := s.init(c) |
508 | 191 | 185 | ||
511 | 192 | err := zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL), | 186 | err := conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL), |
512 | 193 | func(data string, stat gozk.Stat) (string, os.Error) { | 187 | func(data string, stat *zk.Stat) (string, os.Error) { |
513 | 194 | return "don't use this", os.NewError("BOOM!") | 188 | return "don't use this", os.NewError("BOOM!") |
514 | 195 | }) | 189 | }) |
515 | 196 | c.Assert(err, NotNil) | 190 | c.Assert(err, NotNil) |
516 | 197 | c.Assert(err.Code(), Equals, gozk.ZSYSTEMERROR) | ||
517 | 198 | c.Assert(err.String(), Equals, "BOOM!") | 191 | c.Assert(err.String(), Equals, "BOOM!") |
518 | 199 | 192 | ||
520 | 200 | stat, err := zk.Exists("/test") | 193 | stat, err := conn.Exists("/test") |
521 | 201 | c.Assert(err, IsNil) | 194 | c.Assert(err, IsNil) |
522 | 202 | c.Assert(stat, IsNil) | 195 | c.Assert(stat, IsNil) |
523 | 203 | } | 196 | } |
524 | 204 | 197 | ||
525 | 205 | func (s *S) TestRetryChangeFailsReading(c *C) { | 198 | func (s *S) TestRetryChangeFailsReading(c *C) { |
527 | 206 | zk, _ := s.init(c) | 199 | conn, _ := s.init(c) |
528 | 207 | 200 | ||
531 | 208 | _, err := zk.Create("/test", "old", gozk.EPHEMERAL, | 201 | // Write only! |
532 | 209 | gozk.WorldACL(gozk.PERM_WRITE)) // Write only! | 202 | _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_WRITE)) |
533 | 210 | c.Assert(err, IsNil) | 203 | c.Assert(err, IsNil) |
534 | 211 | 204 | ||
535 | 212 | var called bool | 205 | var called bool |
538 | 213 | err = zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL), | 206 | err = conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL), |
539 | 214 | func(data string, stat gozk.Stat) (string, os.Error) { | 207 | func(data string, stat *zk.Stat) (string, os.Error) { |
540 | 215 | called = true | 208 | called = true |
541 | 216 | return "", nil | 209 | return "", nil |
542 | 217 | }) | 210 | }) |
543 | 218 | c.Assert(err, NotNil) | 211 | c.Assert(err, NotNil) |
545 | 219 | c.Assert(err.Code(), Equals, gozk.ZNOAUTH) | 212 | c.Assert(err, Equals, zk.ZNOAUTH) |
546 | 220 | 213 | ||
548 | 221 | stat, err := zk.Exists("/test") | 214 | stat, err := conn.Exists("/test") |
549 | 222 | c.Assert(err, IsNil) | 215 | c.Assert(err, IsNil) |
550 | 223 | c.Assert(stat, NotNil) | 216 | c.Assert(stat, NotNil) |
551 | 224 | c.Assert(stat.Version(), Equals, int32(0)) | 217 | c.Assert(stat.Version(), Equals, int32(0)) |
552 | @@ -227,22 +220,21 @@ | |||
553 | 227 | } | 220 | } |
554 | 228 | 221 | ||
555 | 229 | func (s *S) TestRetryChangeFailsSetting(c *C) { | 222 | func (s *S) TestRetryChangeFailsSetting(c *C) { |
557 | 230 | zk, _ := s.init(c) | 223 | conn, _ := s.init(c) |
558 | 231 | 224 | ||
561 | 232 | _, err := zk.Create("/test", "old", gozk.EPHEMERAL, | 225 | // Read only! |
562 | 233 | gozk.WorldACL(gozk.PERM_READ)) // Read only! | 226 | _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_READ)) |
563 | 234 | c.Assert(err, IsNil) | 227 | c.Assert(err, IsNil) |
564 | 235 | 228 | ||
565 | 236 | var called bool | 229 | var called bool |
568 | 237 | err = zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL), | 230 | err = conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL), |
569 | 238 | func(data string, stat gozk.Stat) (string, os.Error) { | 231 | func(data string, stat *zk.Stat) (string, os.Error) { |
570 | 239 | called = true | 232 | called = true |
571 | 240 | return "", nil | 233 | return "", nil |
572 | 241 | }) | 234 | }) |
575 | 242 | c.Assert(err, NotNil) | 235 | c.Assert(err, Equals, zk.ZNOAUTH) |
574 | 243 | c.Assert(err.Code(), Equals, gozk.ZNOAUTH) | ||
576 | 244 | 236 | ||
578 | 245 | stat, err := zk.Exists("/test") | 237 | stat, err := conn.Exists("/test") |
579 | 246 | c.Assert(err, IsNil) | 238 | c.Assert(err, IsNil) |
580 | 247 | c.Assert(stat, NotNil) | 239 | c.Assert(stat, NotNil) |
581 | 248 | c.Assert(stat.Version(), Equals, int32(0)) | 240 | c.Assert(stat.Version(), Equals, int32(0)) |
582 | @@ -251,23 +243,22 @@ | |||
583 | 251 | } | 243 | } |
584 | 252 | 244 | ||
585 | 253 | func (s *S) TestRetryChangeFailsCreating(c *C) { | 245 | func (s *S) TestRetryChangeFailsCreating(c *C) { |
587 | 254 | zk, _ := s.init(c) | 246 | conn, _ := s.init(c) |
588 | 255 | 247 | ||
591 | 256 | _, err := zk.Create("/test", "old", gozk.EPHEMERAL, | 248 | // Read only! |
592 | 257 | gozk.WorldACL(gozk.PERM_READ)) // Read only! | 249 | _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_READ)) |
593 | 258 | c.Assert(err, IsNil) | 250 | c.Assert(err, IsNil) |
594 | 259 | 251 | ||
595 | 260 | var called bool | 252 | var called bool |
599 | 261 | err = zk.RetryChange("/test/sub", gozk.EPHEMERAL, | 253 | err = conn.RetryChange("/test/sub", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL), |
600 | 262 | gozk.WorldACL(gozk.PERM_ALL), | 254 | func(data string, stat *zk.Stat) (string, os.Error) { |
598 | 263 | func(data string, stat gozk.Stat) (string, os.Error) { | ||
601 | 264 | called = true | 255 | called = true |
602 | 265 | return "", nil | 256 | return "", nil |
603 | 266 | }) | 257 | }) |
604 | 267 | c.Assert(err, NotNil) | 258 | c.Assert(err, NotNil) |
606 | 268 | c.Assert(err.Code(), Equals, gozk.ZNOAUTH) | 259 | c.Assert(err, Equals, zk.ZNOAUTH) |
607 | 269 | 260 | ||
609 | 270 | stat, err := zk.Exists("/test/sub") | 261 | stat, err := conn.Exists("/test/sub") |
610 | 271 | c.Assert(err, IsNil) | 262 | c.Assert(err, IsNil) |
611 | 272 | c.Assert(stat, IsNil) | 263 | c.Assert(stat, IsNil) |
612 | 273 | 264 | ||
613 | 274 | 265 | ||
614 | === added file 'server.go' | |||
615 | --- server.go 1970-01-01 00:00:00 +0000 | |||
616 | +++ server.go 2011-10-03 17:02:23 +0000 | |||
617 | @@ -0,0 +1,239 @@ | |||
618 | 1 | package zk | ||
619 | 2 | |||
620 | 3 | import ( | ||
621 | 4 | "bufio" | ||
622 | 5 | "bytes" | ||
623 | 6 | "fmt" | ||
624 | 7 | "io/ioutil" | ||
625 | 8 | "net" | ||
626 | 9 | "os" | ||
627 | 10 | "path/filepath" | ||
628 | 11 | "strings" | ||
629 | 12 | ) | ||
630 | 13 | |||
631 | 14 | // Server represents a ZooKeeper server, its data and configuration files. | ||
632 | 15 | type Server struct { | ||
633 | 16 | runDir string | ||
634 | 17 | installDir string | ||
635 | 18 | } | ||
636 | 19 | |||
637 | 20 | func (srv *Server) Directory() string { | ||
638 | 21 | return srv.runDir | ||
639 | 22 | } | ||
640 | 23 | |||
641 | 24 | // CreateServer creates the directory runDir and sets up a ZooKeeper server | ||
642 | 25 | // environment inside it. It is an error if runDir already exists. | ||
643 | 26 | // The server will listen on the specified TCP port. | ||
644 | 27 | // | ||
645 | 28 | // The ZooKeeper installation directory is specified by installDir. | ||
646 | 29 | // If this is empty, a system default will be used. | ||
647 | 30 | // | ||
648 | 31 | // CreateServer does not start the server - the *Server that | ||
649 | 32 | // is returned can be used with the service package, for example: | ||
650 | 33 | // see launchpad.net/gozk/zk/service. | ||
651 | 34 | func CreateServer(port int, runDir, installDir string) (*Server, os.Error) { | ||
652 | 35 | if err := os.Mkdir(runDir, 0777); err != nil { | ||
653 | 36 | return nil, err | ||
654 | 37 | } | ||
655 | 38 | srv := &Server{runDir: runDir, installDir: installDir} | ||
656 | 39 | if err := srv.writeLog4JConfig(); err != nil { | ||
657 | 40 | return nil, err | ||
658 | 41 | } | ||
659 | 42 | if err := srv.writeZooKeeperConfig(port); err != nil { | ||
660 | 43 | return nil, err | ||
661 | 44 | } | ||
662 | 45 | if err := srv.writeInstallDir(); err != nil { | ||
663 | 46 | return nil, err | ||
664 | 47 | } | ||
665 | 48 | return srv, nil | ||
666 | 49 | } | ||
667 | 50 | |||
668 | 51 | // AttachServer creates a new ZooKeeper Server instance | ||
669 | 52 | // to operate inside an existing run directory, runDir. | ||
670 | 53 | // The directory must have been created with CreateServer. | ||
671 | 54 | func AttachServer(runDir string) (*Server, os.Error) { | ||
672 | 55 | srv := &Server{runDir: runDir} | ||
673 | 56 | if err := srv.readInstallDir(); err != nil { | ||
674 | 57 | return nil, fmt.Errorf("cannot read server install directory: %v", err) | ||
675 | 58 | } | ||
676 | 59 | return srv, nil | ||
677 | 60 | } | ||
678 | 61 | |||
679 | 62 | func (srv *Server) path(name string) string { | ||
680 | 63 | return filepath.Join(srv.runDir, name) | ||
681 | 64 | } | ||
682 | 65 | |||
683 | 66 | func (srv *Server) CheckAvailability() os.Error { | ||
684 | 67 | port, err := srv.NetworkPort() | ||
685 | 68 | if err != nil { | ||
686 | 69 | return fmt.Errorf("cannot get network port: %v", err) | ||
687 | 70 | } | ||
688 | 71 | l, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port)) | ||
689 | 72 | if err != nil { | ||
690 | 73 | return fmt.Errorf("cannot listen on port %v: %v", port, err) | ||
691 | 74 | } | ||
692 | 75 | l.Close() | ||
693 | 76 | return nil | ||
694 | 77 | } | ||
695 | 78 | |||
696 | 79 | // Command returns the command used to start the | ||
697 | 80 | // ZooKeeper server. It is provided for debugging and testing | ||
698 | 81 | // purposes only. | ||
699 | 82 | func (srv *Server) Command() ([]string, os.Error) { | ||
700 | 83 | cp, err := srv.classPath() | ||
701 | 84 | if err != nil { | ||
702 | 85 | return nil, fmt.Errorf("cannot get class path: %v", err) | ||
703 | 86 | } | ||
704 | 87 | return []string{ | ||
705 | 88 | "java", | ||
706 | 89 | "-cp", strings.Join(cp, ":"), | ||
707 | 90 | "-Dzookeeper.root.logger=INFO,CONSOLE", | ||
708 | 91 | "-Dlog4j.configuration=file:"+srv.path("log4j.properties"), | ||
709 | 92 | "org.apache.zookeeper.server.quorum.QuorumPeerMain", | ||
710 | 93 | srv.path("zoo.cfg"), | ||
711 | 94 | }, nil | ||
712 | 95 | } | ||
713 | 96 | |||
714 | 97 | var log4jProperties = ` | ||
715 | 98 | log4j.rootLogger=INFO, CONSOLE | ||
716 | 99 | log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender | ||
717 | 100 | log4j.appender.CONSOLE.Threshold=INFO | ||
718 | 101 | log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout | ||
719 | 102 | log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n | ||
720 | 103 | ` | ||
721 | 104 | |||
722 | 105 | func (srv *Server) writeLog4JConfig() (err os.Error) { | ||
723 | 106 | return ioutil.WriteFile(srv.path("log4j.properties"), []byte(log4jProperties), 0666) | ||
724 | 107 | } | ||
725 | 108 | |||
726 | 109 | func (srv *Server) writeZooKeeperConfig(port int) (err os.Error) { | ||
727 | 110 | return ioutil.WriteFile(srv.path("zoo.cfg"), []byte(fmt.Sprintf( | ||
728 | 111 | "tickTime=2000\n"+ | ||
729 | 112 | "dataDir=%s\n"+ | ||
730 | 113 | "clientPort=%d\n"+ | ||
731 | 114 | "maxClientCnxns=500\n", | ||
732 | 115 | srv.runDir, port)), 0666) | ||
733 | 116 | } | ||
734 | 117 | |||
735 | 118 | // NetworkPort returns the TCP port number that | ||
736 | 119 | // the server is configured for. | ||
737 | 120 | func (srv *Server) NetworkPort() (int, os.Error) { | ||
738 | 121 | f, err := os.Open(srv.path("zoo.cfg")) | ||
739 | 122 | if err != nil { | ||
740 | 123 | return 0, err | ||
741 | 124 | } | ||
742 | 125 | r := bufio.NewReader(f) | ||
743 | 126 | for { | ||
744 | 127 | line, err := r.ReadSlice('\n') | ||
745 | 128 | if err != nil { | ||
746 | 129 | return 0, fmt.Errorf("cannot get port from %q", srv.path("zoo.cfg")) | ||
747 | 130 | } | ||
748 | 131 | var port int | ||
749 | 132 | if n, _ := fmt.Sscanf(string(line), "clientPort=%d\n", &port); n == 1 { | ||
750 | 133 | return port, nil | ||
751 | 134 | } | ||
752 | 135 | } | ||
753 | 136 | panic("not reached") | ||
754 | 137 | } | ||
755 | 138 | |||
756 | 139 | func (srv *Server) writeInstallDir() os.Error { | ||
757 | 140 | return ioutil.WriteFile(srv.path("installdir.txt"), []byte(srv.installDir+"\n"), 0666) | ||
758 | 141 | } | ||
759 | 142 | |||
760 | 143 | func (srv *Server) readInstallDir() os.Error { | ||
761 | 144 | data, err := ioutil.ReadFile(srv.path("installdir.txt")) | ||
762 | 145 | if err != nil { | ||
763 | 146 | return err | ||
764 | 147 | } | ||
765 | 148 | if data[len(data)-1] == '\n' { | ||
766 | 149 | data = data[0 : len(data)-1] | ||
767 | 150 | } | ||
768 | 151 | srv.installDir = string(data) | ||
769 | 152 | return nil | ||
770 | 153 | } | ||
771 | 154 | |||
772 | 155 | func (srv *Server) classPath() ([]string, os.Error) { | ||
773 | 156 | dir := srv.installDir | ||
774 | 157 | if dir == "" { | ||
775 | 158 | return systemClassPath() | ||
776 | 159 | } | ||
777 | 160 | if err := checkDirectory(dir); err != nil { | ||
778 | 161 | return nil, err | ||
779 | 162 | } | ||
780 | 163 | // Two possibilities, as seen in zkEnv.sh: | ||
781 | 164 | // 1) locally built binaries (jars are in build directory) | ||
782 | 165 | // 2) release binaries | ||
783 | 166 | if build := filepath.Join(dir, "build"); checkDirectory(build) == nil { | ||
784 | 167 | dir = build | ||
785 | 168 | } | ||
786 | 169 | classPath, err := filepath.Glob(filepath.Join(dir, "zookeeper-*.jar")) | ||
787 | 170 | if err != nil { | ||
788 | 171 | panic(fmt.Errorf("glob for jar files: %v", err)) | ||
789 | 172 | } | ||
790 | 173 | more, err := filepath.Glob(filepath.Join(dir, "lib/*.jar")) | ||
791 | 174 | if err != nil { | ||
792 | 175 | panic(fmt.Errorf("glob for lib jar files: %v", err)) | ||
793 | 176 | } | ||
794 | 177 | |||
795 | 178 | classPath = append(classPath, more...) | ||
796 | 179 | if len(classPath) == 0 { | ||
797 | 180 | return nil, fmt.Errorf("zookeeper libraries not found in %q", dir) | ||
798 | 181 | } | ||
799 | 182 | return classPath, nil | ||
800 | 183 | } | ||
801 | 184 | |||
802 | 185 | const zookeeperEnviron = "/etc/zookeeper/conf/environment" | ||
803 | 186 | |||
804 | 187 | func systemClassPath() ([]string, os.Error) { | ||
805 | 188 | f, err := os.Open(zookeeperEnviron) | ||
806 | 189 | if f == nil { | ||
807 | 190 | return nil, err | ||
808 | 191 | } | ||
809 | 192 | r := bufio.NewReader(f) | ||
810 | 193 | for { | ||
811 | 194 | line, err := r.ReadSlice('\n') | ||
812 | 195 | if err != nil { | ||
813 | 196 | break | ||
814 | 197 | } | ||
815 | 198 | if !bytes.HasPrefix(line, []byte("CLASSPATH=")) { | ||
816 | 199 | continue | ||
817 | 200 | } | ||
818 | 201 | |||
819 | 202 | // remove variable and newline | ||
820 | 203 | path := string(line[len("CLASSPATH=") : len(line)-1]) | ||
821 | 204 | |||
822 | 205 | // trim white space | ||
823 | 206 | path = strings.Trim(path, " \t\r") | ||
824 | 207 | |||
825 | 208 | // strip quotes | ||
826 | 209 | if path[0] == '"' { | ||
827 | 210 | path = path[1 : len(path)-1] | ||
828 | 211 | } | ||
829 | 212 | |||
830 | 213 | // split on : | ||
831 | 214 | classPath := strings.Split(path, ":") | ||
832 | 215 | |||
833 | 216 | // split off $ZOOCFGDIR | ||
834 | 217 | if len(classPath) > 0 && classPath[0] == "$ZOOCFGDIR" { | ||
835 | 218 | classPath = classPath[1:] | ||
836 | 219 | } | ||
837 | 220 | |||
838 | 221 | if len(classPath) == 0 { | ||
839 | 222 | return nil, fmt.Errorf("empty class path in %q", zookeeperEnviron) | ||
840 | 223 | } | ||
841 | 224 | return classPath, nil | ||
842 | 225 | } | ||
843 | 226 | return nil, fmt.Errorf("no class path found in %q", zookeeperEnviron) | ||
844 | 227 | } | ||
845 | 228 | |||
846 | 229 | // checkDirectory returns an error if the given path | ||
847 | 230 | // does not exist or is not a directory. | ||
848 | 231 | func checkDirectory(path string) os.Error { | ||
849 | 232 | if info, err := os.Stat(path); err != nil || !info.IsDirectory() { | ||
850 | 233 | if err == nil { | ||
851 | 234 | err = &os.PathError{Op: "stat", Path: path, Error: os.NewError("is not a directory")} | ||
852 | 235 | } | ||
853 | 236 | return err | ||
854 | 237 | } | ||
855 | 238 | return nil | ||
856 | 239 | } | ||
857 | 0 | 240 | ||
858 | === added directory 'service' | |||
859 | === added file 'service/Makefile' | |||
860 | --- service/Makefile 1970-01-01 00:00:00 +0000 | |||
861 | +++ service/Makefile 2011-10-03 17:02:23 +0000 | |||
862 | @@ -0,0 +1,20 @@ | |||
863 | 1 | include $(GOROOT)/src/Make.inc | ||
864 | 2 | |||
865 | 3 | TARG=launchpad.net/gozk/zk/service | ||
866 | 4 | |||
867 | 5 | GOFILES=\ | ||
868 | 6 | service.go\ | ||
869 | 7 | |||
870 | 8 | GOFMT=gofmt | ||
871 | 9 | BADFMT:=$(shell $(GOFMT) -l $(GOFILES) $(CGOFILES) $(wildcard *_test.go)) | ||
872 | 10 | |||
873 | 11 | gofmt: $(BADFMT) | ||
874 | 12 | @for F in $(BADFMT); do $(GOFMT) -w $$F && echo $$F; done | ||
875 | 13 | |||
876 | 14 | ifneq ($(BADFMT),) | ||
877 | 15 | ifneq ($(MAKECMDGOALS),gofmt) | ||
878 | 16 | $(warning WARNING: make gofmt: $(BADFMT)) | ||
879 | 17 | endif | ||
880 | 18 | endif | ||
881 | 19 | |||
882 | 20 | include $(GOROOT)/src/Make.pkg | ||
883 | 0 | 21 | ||
884 | === added file 'service/service.go' | |||
885 | --- service/service.go 1970-01-01 00:00:00 +0000 | |||
886 | +++ service/service.go 2011-10-03 17:02:23 +0000 | |||
887 | @@ -0,0 +1,160 @@ | |||
888 | 1 | // The service package provides support for long-running services. | ||
889 | 2 | package service | ||
890 | 3 | |||
891 | 4 | import ( | ||
892 | 5 | "os" | ||
893 | 6 | "fmt" | ||
894 | 7 | "io/ioutil" | ||
895 | 8 | "strconv" | ||
896 | 9 | "path/filepath" | ||
897 | 10 | "exec" | ||
898 | 11 | "strings" | ||
899 | 12 | "time" | ||
900 | 13 | ) | ||
901 | 14 | |||
902 | 15 | // Interface represents a single-process server. | ||
903 | 16 | type Interface interface { | ||
904 | 17 | // Directory gives the path to the directory containing | ||
905 | 18 | // the server's configuration and data files. It is assumed that | ||
906 | 19 | // this exists and can be modified. | ||
907 | 20 | Directory() string | ||
908 | 21 | |||
909 | 22 | // CheckAvailability should return an error if resources | ||
910 | 23 | // required by the server are not available; for instance | ||
911 | 24 | // if the server requires a network port which is not free. | ||
912 | 25 | CheckAvailability() os.Error | ||
913 | 26 | |||
914 | 27 | // Command should return a command that can be | ||
915 | 28 | // used to run the server. The first element of the returned | ||
916 | 29 | // slice is the executable name; the rest of the elements are | ||
917 | 30 | // its arguments. | ||
918 | 31 | Command() ([]string, os.Error) | ||
919 | 32 | } | ||
920 | 33 | |||
921 | 34 | // Service represents a possibly running server process. | ||
922 | 35 | type Service struct { | ||
923 | 36 | srv Interface | ||
924 | 37 | } | ||
925 | 38 | |||
926 | 39 | // New returns a new Service using the given | ||
927 | 40 | // server interface. | ||
928 | 41 | func New(srv Interface) *Service { | ||
929 | 42 | return &Service{srv} | ||
930 | 43 | } | ||
931 | 44 | |||
932 | 45 | // Process returns a reference to the identity | ||
933 | 46 | // of the last running server in the Service's directory. | ||
934 | 47 | // If the server was shut down, it returns (nil, nil). | ||
935 | 48 | // The server process may not still be running | ||
936 | 49 | // (for instance if it was terminated abnormally). | ||
937 | 50 | // This function is provided for debugging and testing | ||
938 | 51 | // purposes only. | ||
939 | 52 | func (s *Service) Process() (*os.Process, os.Error) { | ||
940 | 53 | data, err := ioutil.ReadFile(s.path("pid.txt")) | ||
941 | 54 | if err != nil { | ||
942 | 55 | if err, ok := err.(*os.PathError); ok && err.Error == os.ENOENT { | ||
943 | 56 | return nil, nil | ||
944 | 57 | } | ||
945 | 58 | return nil, err | ||
946 | 59 | } | ||
947 | 60 | pid, err := strconv.Atoi(string(data)) | ||
948 | 61 | if err != nil { | ||
949 | 62 | return nil, os.NewError("bad process id found in pid.txt") | ||
950 | 63 | } | ||
951 | 64 | return os.FindProcess(pid) | ||
952 | 65 | } | ||
953 | 66 | |||
954 | 67 | func (s *Service) path(name string) string { | ||
955 | 68 | return filepath.Join(s.srv.Directory(), name) | ||
956 | 69 | } | ||
957 | 70 | |||
958 | 71 | // Start starts the server. It returns an error if the server is already running. | ||
959 | 72 | // It stores the process ID of the server in the file "pid.txt" | ||
960 | 73 | // inside the server's directory. Stdout and stderr of the server | ||
961 | 74 | // are similarly written to "log.txt". | ||
962 | 75 | func (s *Service) Start() os.Error { | ||
963 | 76 | if err := s.srv.CheckAvailability(); err != nil { | ||
964 | 77 | return err | ||
965 | 78 | } | ||
966 | 79 | p, err := s.Process() | ||
967 | 80 | if p != nil || err != nil { | ||
968 | 81 | if p != nil { | ||
969 | 82 | p.Release() | ||
970 | 83 | } | ||
971 | 84 | return fmt.Errorf("server may already be running (remove %q to clear)", s.path("pid.txt")) | ||
972 | 85 | } | ||
973 | 86 | |||
974 | 87 | // create the pid file before starting the process so that if we get two | ||
975 | 88 | // programs trying to concurrently start a server on the same directory | ||
976 | 89 | // at the same time, only one should succeed. | ||
977 | 90 | pidf, err := os.OpenFile(s.path("pid.txt"), os.O_EXCL|os.O_CREATE|os.O_WRONLY, 0666) | ||
978 | 91 | if err != nil { | ||
979 | 92 | return fmt.Errorf("cannot create pid.txt: %v", err) | ||
980 | 93 | } | ||
981 | 94 | defer pidf.Close() | ||
982 | 95 | |||
983 | 96 | args, err := s.srv.Command() | ||
984 | 97 | if err != nil { | ||
985 | 98 | return fmt.Errorf("cannot determine command: %v", err) | ||
986 | 99 | } | ||
987 | 100 | cmd := exec.Command(args[0], args[1:]...) | ||
988 | 101 | |||
989 | 102 | logf, err := os.OpenFile(s.path("log.txt"), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666) | ||
990 | 103 | if err != nil { | ||
991 | 104 | return fmt.Errorf("cannot create log file: %v", err) | ||
992 | 105 | } | ||
993 | 106 | defer logf.Close() | ||
994 | 107 | cmd.Stdout = logf | ||
995 | 108 | cmd.Stderr = logf | ||
996 | 109 | if err := cmd.Start(); err != nil { | ||
997 | 110 | return fmt.Errorf("cannot start server: %v", err) | ||
998 | 111 | } | ||
999 | 112 | if _, err := fmt.Fprint(pidf, cmd.Process.Pid); err != nil { | ||
1000 | 113 | return fmt.Errorf("cannot write pid file: %v", err) | ||
1001 | 114 | } | ||
1002 | 115 | return nil | ||
1003 | 116 | } | ||
1004 | 117 | |||
1005 | 118 | // Stop kills the server. It is a no-op if it is not running. | ||
1006 | 119 | func (s *Service) Stop() os.Error { | ||
1007 | 120 | p, err := s.Process() | ||
1008 | 121 | if p == nil { | ||
1009 | 122 | if err != nil { | ||
1010 | 123 | return fmt.Errorf("cannot read process ID of server: %v", err) | ||
1011 | 124 | } | ||
1012 | 125 | return nil | ||
1013 | 126 | } | ||
1014 | 127 | defer p.Release() | ||
1015 | 128 | if err := p.Kill(); err != nil && !strings.Contains(err.String(), "no such process") { | ||
1016 | 129 | return fmt.Errorf("cannot kill server process: %v", err) | ||
1017 | 130 | } | ||
1018 | 131 | // ignore the error returned from Wait because there's little | ||
1019 | 132 | // we can do about it - it either means that the process has just exited | ||
1020 | 133 | // anyway or that we can't wait for it for some other reason, | ||
1021 | 134 | // for example because it was originally started by some other process. | ||
1022 | 135 | _, err = p.Wait(0) | ||
1023 | 136 | if err != nil && strings.Contains(err.String(), "no child processes") { | ||
1024 | 137 | // If we can't wait for the server, it's possible that it was running | ||
1025 | 138 | // but not as a child of this process, so give it a little while | ||
1026 | 139 | // to exit. TODO poll with kill(no signal)? | ||
1027 | 140 | time.Sleep(0.5e9) | ||
1028 | 141 | } | ||
1029 | 142 | |||
1030 | 143 | if err := os.Remove(s.path("pid.txt")); err != nil { | ||
1031 | 144 | return fmt.Errorf("cannot remove server process ID file: %v", err) | ||
1032 | 145 | } | ||
1033 | 146 | return nil | ||
1034 | 147 | } | ||
1035 | 148 | |||
1036 | 149 | // Destroy stops the server, and then removes its | ||
1037 | 150 | // directory and all its contents. | ||
1038 | 151 | // Warning: this will destroy all data associated with the server. | ||
1039 | 152 | func (s *Service) Destroy() os.Error { | ||
1040 | 153 | if err := s.Stop(); err != nil { | ||
1041 | 154 | return err | ||
1042 | 155 | } | ||
1043 | 156 | if err := os.RemoveAll(s.srv.Directory()); err != nil { | ||
1044 | 157 | return err | ||
1045 | 158 | } | ||
1046 | 159 | return nil | ||
1047 | 160 | } | ||
1048 | 0 | 161 | ||
1049 | === modified file 'suite_test.go' | |||
1050 | --- suite_test.go 2011-08-19 01:43:37 +0000 | |||
1051 | +++ suite_test.go 2011-10-03 17:02:23 +0000 | |||
1052 | @@ -1,65 +1,48 @@ | |||
1054 | 1 | package gozk_test | 1 | package zk_test |
1055 | 2 | 2 | ||
1056 | 3 | import ( | 3 | import ( |
1057 | 4 | . "launchpad.net/gocheck" | 4 | . "launchpad.net/gocheck" |
1058 | 5 | "testing" | ||
1059 | 6 | "io/ioutil" | ||
1060 | 7 | "path" | ||
1061 | 8 | "fmt" | 5 | "fmt" |
1062 | 6 | "launchpad.net/gozk/zk/service" | ||
1063 | 7 | "launchpad.net/gozk/zk" | ||
1064 | 9 | "os" | 8 | "os" |
1066 | 10 | "gozk" | 9 | "testing" |
1067 | 11 | "time" | 10 | "time" |
1068 | 12 | ) | 11 | ) |
1069 | 13 | 12 | ||
1070 | 14 | func TestAll(t *testing.T) { | 13 | func TestAll(t *testing.T) { |
1072 | 15 | TestingT(t) | 14 | if !*reattach { |
1073 | 15 | TestingT(t) | ||
1074 | 16 | } | ||
1075 | 16 | } | 17 | } |
1076 | 17 | 18 | ||
1077 | 18 | var _ = Suite(&S{}) | 19 | var _ = Suite(&S{}) |
1078 | 19 | 20 | ||
1079 | 20 | type S struct { | 21 | type S struct { |
1086 | 21 | zkRoot string | 22 | zkServer *service.Service |
1087 | 22 | zkTestRoot string | 23 | zkTestRoot string |
1088 | 23 | zkTestPort int | 24 | zkAddr string |
1083 | 24 | zkServerSh string | ||
1084 | 25 | zkServerOut *os.File | ||
1085 | 26 | zkAddr string | ||
1089 | 27 | 25 | ||
1092 | 28 | handles []*gozk.ZooKeeper | 26 | handles []*zk.Conn |
1093 | 29 | events []*gozk.Event | 27 | events []*zk.Event |
1094 | 30 | liveWatches int | 28 | liveWatches int |
1095 | 31 | deadWatches chan bool | 29 | deadWatches chan bool |
1096 | 32 | } | 30 | } |
1097 | 33 | 31 | ||
1117 | 34 | var logLevel = 0 //gozk.LOG_ERROR | 32 | var logLevel = 0 //zk.LOG_ERROR |
1118 | 35 | 33 | ||
1119 | 36 | 34 | func (s *S) init(c *C) (*zk.Conn, chan zk.Event) { | |
1120 | 37 | var testZooCfg = ("dataDir=%s\n" + | 35 | conn, watch, err := zk.Dial(s.zkAddr, 5e9) |
1102 | 38 | "clientPort=%d\n" + | ||
1103 | 39 | "tickTime=2000\n" + | ||
1104 | 40 | "initLimit=10\n" + | ||
1105 | 41 | "syncLimit=5\n" + | ||
1106 | 42 | "") | ||
1107 | 43 | |||
1108 | 44 | var testLog4jPrp = ("log4j.rootLogger=INFO,CONSOLE\n" + | ||
1109 | 45 | "log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender\n" + | ||
1110 | 46 | "log4j.appender.CONSOLE.Threshold=DEBUG\n" + | ||
1111 | 47 | "log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout\n" + | ||
1112 | 48 | "log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n" + | ||
1113 | 49 | "") | ||
1114 | 50 | |||
1115 | 51 | func (s *S) init(c *C) (*gozk.ZooKeeper, chan gozk.Event) { | ||
1116 | 52 | zk, watch, err := gozk.Init(s.zkAddr, 5e9) | ||
1121 | 53 | c.Assert(err, IsNil) | 36 | c.Assert(err, IsNil) |
1122 | 54 | 37 | ||
1124 | 55 | s.handles = append(s.handles, zk) | 38 | s.handles = append(s.handles, conn) |
1125 | 56 | 39 | ||
1126 | 57 | event := <-watch | 40 | event := <-watch |
1127 | 58 | 41 | ||
1130 | 59 | c.Assert(event.Type, Equals, gozk.EVENT_SESSION) | 42 | c.Assert(event.Type, Equals, zk.EVENT_SESSION) |
1131 | 60 | c.Assert(event.State, Equals, gozk.STATE_CONNECTED) | 43 | c.Assert(event.State, Equals, zk.STATE_CONNECTED) |
1132 | 61 | 44 | ||
1134 | 62 | bufferedWatch := make(chan gozk.Event, 256) | 45 | bufferedWatch := make(chan zk.Event, 256) |
1135 | 63 | bufferedWatch <- event | 46 | bufferedWatch <- event |
1136 | 64 | 47 | ||
1137 | 65 | s.liveWatches += 1 | 48 | s.liveWatches += 1 |
1138 | @@ -82,13 +65,13 @@ | |||
1139 | 82 | s.deadWatches <- true | 65 | s.deadWatches <- true |
1140 | 83 | }() | 66 | }() |
1141 | 84 | 67 | ||
1143 | 85 | return zk, bufferedWatch | 68 | return conn, bufferedWatch |
1144 | 86 | } | 69 | } |
1145 | 87 | 70 | ||
1146 | 88 | func (s *S) SetUpTest(c *C) { | 71 | func (s *S) SetUpTest(c *C) { |
1148 | 89 | c.Assert(gozk.CountPendingWatches(), Equals, 0, | 72 | c.Assert(zk.CountPendingWatches(), Equals, 0, |
1149 | 90 | Bug("Test got a dirty watch state before running!")) | 73 | Bug("Test got a dirty watch state before running!")) |
1151 | 91 | gozk.SetLogLevel(logLevel) | 74 | zk.SetLogLevel(logLevel) |
1152 | 92 | } | 75 | } |
1153 | 93 | 76 | ||
1154 | 94 | func (s *S) TearDownTest(c *C) { | 77 | func (s *S) TearDownTest(c *C) { |
1155 | @@ -108,98 +91,38 @@ | |||
1156 | 108 | } | 91 | } |
1157 | 109 | 92 | ||
1158 | 110 | // Reset the list of handles. | 93 | // Reset the list of handles. |
1160 | 111 | s.handles = make([]*gozk.ZooKeeper, 0) | 94 | s.handles = make([]*zk.Conn, 0) |
1161 | 112 | 95 | ||
1163 | 113 | c.Assert(gozk.CountPendingWatches(), Equals, 0, | 96 | c.Assert(zk.CountPendingWatches(), Equals, 0, |
1164 | 114 | Bug("Test left live watches behind!")) | 97 | Bug("Test left live watches behind!")) |
1165 | 115 | } | 98 | } |
1166 | 116 | 99 | ||
1168 | 117 | // We use the suite set up and tear down to manage a custom zookeeper | 100 | // We use the suite set up and tear down to manage a custom ZooKeeper |
1169 | 118 | // | 101 | // |
1170 | 119 | func (s *S) SetUpSuite(c *C) { | 102 | func (s *S) SetUpSuite(c *C) { |
1172 | 120 | 103 | var err os.Error | |
1173 | 121 | s.deadWatches = make(chan bool) | 104 | s.deadWatches = make(chan bool) |
1174 | 122 | 105 | ||
1221 | 123 | var err os.Error | 106 | // N.B. We need to create a subdirectory because zk.CreateServer |
1222 | 124 | 107 | // insists on creating its own directory. | |
1223 | 125 | s.zkRoot = os.Getenv("ZKROOT") | 108 | s.zkTestRoot = c.MkDir() + "/zk" |
1224 | 126 | if s.zkRoot == "" { | 109 | |
1225 | 127 | panic("You must define $ZKROOT ($ZKROOT/bin/zkServer.sh is needed)") | 110 | port := 21812 |
1226 | 128 | } | 111 | s.zkAddr = fmt.Sprint("localhost:", port) |
1227 | 129 | 112 | ||
1228 | 130 | s.zkTestRoot = c.MkDir() | 113 | srv, err := zk.CreateServer(port, s.zkTestRoot, "") |
1229 | 131 | s.zkTestPort = 21812 | 114 | if err != nil { |
1230 | 132 | 115 | c.Fatal("Cannot set up server environment: ", err) | |
1231 | 133 | println("ZooKeeper test server directory:", s.zkTestRoot) | 116 | } |
1232 | 134 | println("ZooKeeper test server port:", s.zkTestPort) | 117 | s.zkServer = service.New(srv) |
1233 | 135 | 118 | err = s.zkServer.Start() | |
1234 | 136 | s.zkAddr = fmt.Sprintf("localhost:%d", s.zkTestPort) | 119 | if err != nil { |
1235 | 137 | 120 | c.Fatal("Cannot start ZooKeeper server: ", err) | |
1236 | 138 | s.zkServerSh = path.Join(s.zkRoot, "bin/zkServer.sh") | 121 | } |
1191 | 139 | s.zkServerOut, err = os.OpenFile(path.Join(s.zkTestRoot, "stdout.txt"), | ||
1192 | 140 | os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) | ||
1193 | 141 | if err != nil { | ||
1194 | 142 | panic("Can't open stdout.txt file for server: " + err.String()) | ||
1195 | 143 | } | ||
1196 | 144 | |||
1197 | 145 | dataDir := path.Join(s.zkTestRoot, "data") | ||
1198 | 146 | confDir := path.Join(s.zkTestRoot, "conf") | ||
1199 | 147 | |||
1200 | 148 | os.Mkdir(dataDir, 0755) | ||
1201 | 149 | os.Mkdir(confDir, 0755) | ||
1202 | 150 | |||
1203 | 151 | err = os.Setenv("ZOOCFGDIR", confDir) | ||
1204 | 152 | if err != nil { | ||
1205 | 153 | panic("Can't set $ZOOCFGDIR: " + err.String()) | ||
1206 | 154 | } | ||
1207 | 155 | |||
1208 | 156 | zooCfg := []byte(fmt.Sprintf(testZooCfg, dataDir, s.zkTestPort)) | ||
1209 | 157 | err = ioutil.WriteFile(path.Join(confDir, "zoo.cfg"), zooCfg, 0644) | ||
1210 | 158 | if err != nil { | ||
1211 | 159 | panic("Can't write zoo.cfg: " + err.String()) | ||
1212 | 160 | } | ||
1213 | 161 | |||
1214 | 162 | log4jPrp := []byte(testLog4jPrp) | ||
1215 | 163 | err = ioutil.WriteFile(path.Join(confDir, "log4j.properties"), log4jPrp, 0644) | ||
1216 | 164 | if err != nil { | ||
1217 | 165 | panic("Can't write log4j.properties: " + err.String()) | ||
1218 | 166 | } | ||
1219 | 167 | |||
1220 | 168 | s.StartZK() | ||
1237 | 169 | } | 122 | } |
1238 | 170 | 123 | ||
1239 | 171 | func (s *S) TearDownSuite(c *C) { | 124 | func (s *S) TearDownSuite(c *C) { |
1272 | 172 | s.StopZK() | 125 | if s.zkServer != nil { |
1273 | 173 | s.zkServerOut.Close() | 126 | s.zkServer.Stop() // TODO Destroy |
1242 | 174 | } | ||
1243 | 175 | |||
1244 | 176 | func (s *S) StartZK() { | ||
1245 | 177 | attr := os.ProcAttr{Files: []*os.File{os.Stdin, s.zkServerOut, os.Stderr}} | ||
1246 | 178 | proc, err := os.StartProcess(s.zkServerSh, []string{s.zkServerSh, "start"}, &attr) | ||
1247 | 179 | if err != nil { | ||
1248 | 180 | panic("Problem executing zkServer.sh start: " + err.String()) | ||
1249 | 181 | } | ||
1250 | 182 | |||
1251 | 183 | result, err := proc.Wait(0) | ||
1252 | 184 | if err != nil { | ||
1253 | 185 | panic(err.String()) | ||
1254 | 186 | } else if result.ExitStatus() != 0 { | ||
1255 | 187 | panic("'zkServer.sh start' exited with non-zero status") | ||
1256 | 188 | } | ||
1257 | 189 | } | ||
1258 | 190 | |||
1259 | 191 | func (s *S) StopZK() { | ||
1260 | 192 | attr := os.ProcAttr{Files: []*os.File{os.Stdin, s.zkServerOut, os.Stderr}} | ||
1261 | 193 | proc, err := os.StartProcess(s.zkServerSh, []string{s.zkServerSh, "stop"}, &attr) | ||
1262 | 194 | if err != nil { | ||
1263 | 195 | panic("Problem executing zkServer.sh stop: " + err.String() + | ||
1264 | 196 | " (look for runaway java processes!)") | ||
1265 | 197 | } | ||
1266 | 198 | result, err := proc.Wait(0) | ||
1267 | 199 | if err != nil { | ||
1268 | 200 | panic(err.String()) | ||
1269 | 201 | } else if result.ExitStatus() != 0 { | ||
1270 | 202 | panic("'zkServer.sh stop' exited with non-zero status " + | ||
1271 | 203 | "(look for runaway java processes!)") | ||
1274 | 204 | } | 127 | } |
1275 | 205 | } | 128 | } |
1276 | 206 | 129 | ||
1277 | === renamed file 'gozk.go' => 'zk.go' | |||
1278 | --- gozk.go 2011-08-19 01:56:39 +0000 | |||
1279 | +++ zk.go 2011-10-03 17:02:23 +0000 | |||
1280 | @@ -1,4 +1,4 @@ | |||
1282 | 1 | // gozk - Zookeeper support for the Go language | 1 | // gozk - ZooKeeper support for the Go language |
1283 | 2 | // | 2 | // |
1284 | 3 | // https://wiki.ubuntu.com/gozk | 3 | // https://wiki.ubuntu.com/gozk |
1285 | 4 | // | 4 | // |
1286 | @@ -6,7 +6,7 @@ | |||
1287 | 6 | // | 6 | // |
1288 | 7 | // Written by Gustavo Niemeyer <gustavo.niemeyer@canonical.com> | 7 | // Written by Gustavo Niemeyer <gustavo.niemeyer@canonical.com> |
1289 | 8 | // | 8 | // |
1291 | 9 | package gozk | 9 | package zk |
1292 | 10 | 10 | ||
1293 | 11 | /* | 11 | /* |
1294 | 12 | #cgo CFLAGS: -I/usr/include/c-client-src | 12 | #cgo CFLAGS: -I/usr/include/c-client-src |
1295 | @@ -27,17 +27,16 @@ | |||
1296 | 27 | // ----------------------------------------------------------------------- | 27 | // ----------------------------------------------------------------------- |
1297 | 28 | // Main constants and data types. | 28 | // Main constants and data types. |
1298 | 29 | 29 | ||
1302 | 30 | // The main ZooKeeper object, created through the Init function. | 30 | // Conn represents a connection to a set of ZooKeeper nodes. |
1303 | 31 | // Encapsulates all communication with ZooKeeper. | 31 | type Conn struct { |
1301 | 32 | type ZooKeeper struct { | ||
1304 | 33 | watchChannels map[uintptr]chan Event | 32 | watchChannels map[uintptr]chan Event |
1305 | 34 | sessionWatchId uintptr | 33 | sessionWatchId uintptr |
1306 | 35 | handle *C.zhandle_t | 34 | handle *C.zhandle_t |
1307 | 36 | mutex sync.Mutex | 35 | mutex sync.Mutex |
1308 | 37 | } | 36 | } |
1309 | 38 | 37 | ||
1312 | 39 | // ClientId represents the established session in ZooKeeper. This is only | 38 | // ClientId represents an established ZooKeeper session. It can be |
1313 | 40 | // useful to be passed back into the ReInit function. | 39 | // passed into Redial to reestablish a connection to an existing session. |
1314 | 41 | type ClientId struct { | 40 | type ClientId struct { |
1315 | 42 | cId C.clientid_t | 41 | cId C.clientid_t |
1316 | 43 | } | 42 | } |
1317 | @@ -59,25 +58,21 @@ | |||
1318 | 59 | // through one of the W-suffixed functions (GetW, ExistsW, etc). | 58 | // through one of the W-suffixed functions (GetW, ExistsW, etc). |
1319 | 60 | // | 59 | // |
1320 | 61 | // The session channel will only receive session-level events notifying | 60 | // The session channel will only receive session-level events notifying |
1328 | 62 | // about critical and transient changes in the ZooKeeper connection | 61 | // about changes in the ZooKeeper connection state (STATE_CONNECTED, |
1329 | 63 | // state (STATE_CONNECTED, STATE_EXPIRED_SESSION, etc). On long | 62 | // STATE_EXPIRED_SESSION, etc). On long running applications the session |
1330 | 64 | // running applications the session channel must *necessarily* be | 63 | // channel must *necessarily* be observed since certain events like session |
1331 | 65 | // observed since certain events like session expirations require an | 64 | // expirations require an explicit reconnection and reestablishment of |
1332 | 66 | // explicit reconnection and reestablishment of state (or bailing out). | 65 | // state (or bailing out). Because of that, the buffer used on the session |
1333 | 67 | // Because of that, the buffer used on the session channel has a limited | 66 | // channel has a limited size, and a panic will occur if too many events |
1334 | 68 | // size, and a panic will occur if too many events are not collected. | 67 | // are not collected. |
1335 | 69 | // | 68 | // |
1336 | 70 | // Watch channels enable monitoring state for nodes, and the | 69 | // Watch channels enable monitoring state for nodes, and the |
1337 | 71 | // moment they're fired depends on which function was called to | 70 | // moment they're fired depends on which function was called to |
1345 | 72 | // create them. Note that, unlike in other ZooKeeper interfaces, | 71 | // create them. Two critical session events, STATE_EXPIRED_SESSION |
1346 | 73 | // gozk will NOT dispatch unimportant session events such as | 72 | // and STATE_AUTH_FAILED, will be delivered to all |
1347 | 74 | // STATE_ASSOCIATING, STATE_CONNECTING and STATE_CONNECTED to | 73 | // node watch channels if they occur. |
1341 | 75 | // watch Event channels, since they are transient and disruptive | ||
1342 | 76 | // to the workflow. Critical state changes such as expirations | ||
1343 | 77 | // are still delivered to all event channels, though, and the | ||
1344 | 78 | // transient events may be observed in the session channel. | ||
1348 | 79 | // | 74 | // |
1350 | 80 | // Since every watch channel may receive critical session events, events | 75 | // Since every watch channel may receive these critical session events, events |
1351 | 81 | // received must not be handled blindly as if the watch requested has | 76 | // received must not be handled blindly as if the watch requested has |
1352 | 82 | // been fired. To facilitate such tests, Events offer the Ok method, | 77 | // been fired. To facilitate such tests, Events offer the Ok method, |
1353 | 83 | // and they also have a good String method so they may be used as an | 78 | // and they also have a good String method so they may be used as an |
1354 | @@ -89,44 +84,78 @@ | |||
1355 | 89 | // return | 84 | // return |
1356 | 90 | // } | 85 | // } |
1357 | 91 | // | 86 | // |
1359 | 92 | // Note that closed channels will deliver zeroed Event, which means | 87 | // Note that closed channels will deliver a zeroed Event, which means |
1360 | 93 | // event.Type is set to EVENT_CLOSED and event.State is set to STATE_CLOSED, | 88 | // event.Type is set to EVENT_CLOSED and event.State is set to STATE_CLOSED, |
1361 | 94 | // to facilitate handling. | 89 | // to facilitate handling. |
1362 | 95 | type Event struct { | 90 | type Event struct { |
1365 | 96 | Type int | 91 | // Type gives the type of event (one of the EVENT_* constants). |
1366 | 97 | Path string | 92 | // If Type is EVENT_SESSION, then the event is a session |
1367 | 93 | // event. | ||
1368 | 94 | Type int | ||
1369 | 95 | |||
1370 | 96 | // For non-session events, Path gives the path of the node | ||
1371 | 97 | // that was being watched. | ||
1372 | 98 | Path string | ||
1373 | 99 | |||
1374 | 100 | // For session events, State (one of the STATE_* constants) gives the session | ||
1375 | 101 | // status. | ||
1376 | 98 | State int | 102 | State int |
1377 | 99 | } | 103 | } |
1378 | 100 | 104 | ||
1381 | 101 | // Error codes that may be used to verify the result of the | 105 | // Error represents a ZooKeeper error. |
1382 | 102 | // Code method from Error. | 106 | type Error int |
1383 | 107 | |||
1384 | 103 | const ( | 108 | const ( |
1409 | 104 | ZOK = C.ZOK | 109 | ZOK Error = C.ZOK |
1410 | 105 | ZSYSTEMERROR = C.ZSYSTEMERROR | 110 | ZSYSTEMERROR Error = C.ZSYSTEMERROR |
1411 | 106 | ZRUNTIMEINCONSISTENCY = C.ZRUNTIMEINCONSISTENCY | 111 | ZRUNTIMEINCONSISTENCY Error = C.ZRUNTIMEINCONSISTENCY |
1412 | 107 | ZDATAINCONSISTENCY = C.ZDATAINCONSISTENCY | 112 | ZDATAINCONSISTENCY Error = C.ZDATAINCONSISTENCY |
1413 | 108 | ZCONNECTIONLOSS = C.ZCONNECTIONLOSS | 113 | ZCONNECTIONLOSS Error = C.ZCONNECTIONLOSS |
1414 | 109 | ZMARSHALLINGERROR = C.ZMARSHALLINGERROR | 114 | ZMARSHALLINGERROR Error = C.ZMARSHALLINGERROR |
1415 | 110 | ZUNIMPLEMENTED = C.ZUNIMPLEMENTED | 115 | ZUNIMPLEMENTED Error = C.ZUNIMPLEMENTED |
1416 | 111 | ZOPERATIONTIMEOUT = C.ZOPERATIONTIMEOUT | 116 | ZOPERATIONTIMEOUT Error = C.ZOPERATIONTIMEOUT |
1417 | 112 | ZBADARGUMENTS = C.ZBADARGUMENTS | 117 | ZBADARGUMENTS Error = C.ZBADARGUMENTS |
1418 | 113 | ZINVALIDSTATE = C.ZINVALIDSTATE | 118 | ZINVALIDSTATE Error = C.ZINVALIDSTATE |
1419 | 114 | ZAPIERROR = C.ZAPIERROR | 119 | ZAPIERROR Error = C.ZAPIERROR |
1420 | 115 | ZNONODE = C.ZNONODE | 120 | ZNONODE Error = C.ZNONODE |
1421 | 116 | ZNOAUTH = C.ZNOAUTH | 121 | ZNOAUTH Error = C.ZNOAUTH |
1422 | 117 | ZBADVERSION = C.ZBADVERSION | 122 | ZBADVERSION Error = C.ZBADVERSION |
1423 | 118 | ZNOCHILDRENFOREPHEMERALS = C.ZNOCHILDRENFOREPHEMERALS | 123 | ZNOCHILDRENFOREPHEMERALS Error = C.ZNOCHILDRENFOREPHEMERALS |
1424 | 119 | ZNODEEXISTS = C.ZNODEEXISTS | 124 | ZNODEEXISTS Error = C.ZNODEEXISTS |
1425 | 120 | ZNOTEMPTY = C.ZNOTEMPTY | 125 | ZNOTEMPTY Error = C.ZNOTEMPTY |
1426 | 121 | ZSESSIONEXPIRED = C.ZSESSIONEXPIRED | 126 | ZSESSIONEXPIRED Error = C.ZSESSIONEXPIRED |
1427 | 122 | ZINVALIDCALLBACK = C.ZINVALIDCALLBACK | 127 | ZINVALIDCALLBACK Error = C.ZINVALIDCALLBACK |
1428 | 123 | ZINVALIDACL = C.ZINVALIDACL | 128 | ZINVALIDACL Error = C.ZINVALIDACL |
1429 | 124 | ZAUTHFAILED = C.ZAUTHFAILED | 129 | ZAUTHFAILED Error = C.ZAUTHFAILED |
1430 | 125 | ZCLOSING = C.ZCLOSING | 130 | ZCLOSING Error = C.ZCLOSING |
1431 | 126 | ZNOTHING = C.ZNOTHING | 131 | ZNOTHING Error = C.ZNOTHING |
1432 | 127 | ZSESSIONMOVED = C.ZSESSIONMOVED | 132 | ZSESSIONMOVED Error = C.ZSESSIONMOVED |
1433 | 128 | ) | 133 | ) |
1434 | 129 | 134 | ||
1435 | 135 | func (error Error) String() string { | ||
1436 | 136 | return C.GoString(C.zerror(C.int(error))) // Static, no need to free it. | ||
1437 | 137 | } | ||
1438 | 138 | |||
1439 | 139 | // zkError creates an appropriate error return from | ||
1440 | 140 | // a ZooKeeper status and the errno return from a C API | ||
1441 | 141 | // call. | ||
1442 | 142 | func zkError(rc C.int, cerr os.Error) os.Error { | ||
1443 | 143 | code := Error(rc) | ||
1444 | 144 | switch code { | ||
1445 | 145 | case ZOK: | ||
1446 | 146 | return nil | ||
1447 | 147 | |||
1448 | 148 | case ZSYSTEMERROR: | ||
1449 | 149 | // If a ZooKeeper call returns ZSYSTEMERROR, then | ||
1450 | 150 | // errno becomes significant. If errno has not been | ||
1451 | 151 | // set, then we will return ZSYSTEMERROR nonetheless. | ||
1452 | 152 | if cerr != nil { | ||
1453 | 153 | return cerr | ||
1454 | 154 | } | ||
1455 | 155 | } | ||
1456 | 156 | return code | ||
1457 | 157 | } | ||
1458 | 158 | |||
1459 | 130 | // Constants for SetLogLevel. | 159 | // Constants for SetLogLevel. |
1460 | 131 | const ( | 160 | const ( |
1461 | 132 | LOG_ERROR = C.ZOO_LOG_LEVEL_ERROR | 161 | LOG_ERROR = C.ZOO_LOG_LEVEL_ERROR |
1462 | @@ -141,8 +170,8 @@ | |||
1463 | 141 | 170 | ||
1464 | 142 | // Constants for Create's flags parameter. | 171 | // Constants for Create's flags parameter. |
1465 | 143 | const ( | 172 | const ( |
1468 | 144 | EPHEMERAL = 1 << iota | 173 | EPHEMERAL = 1 << iota // The node will be deleted when the current session ends. |
1469 | 145 | SEQUENCE | 174 | SEQUENCE // Append a sequence number to the node name. |
1470 | 146 | ) | 175 | ) |
1471 | 147 | 176 | ||
1472 | 148 | // Constants for ACL Perms. | 177 | // Constants for ACL Perms. |
1473 | @@ -273,104 +302,79 @@ | |||
1474 | 273 | } | 302 | } |
1475 | 274 | 303 | ||
1476 | 275 | // ----------------------------------------------------------------------- | 304 | // ----------------------------------------------------------------------- |
1477 | 276 | // Error interface which maps onto the ZooKeeper error codes. | ||
1478 | 277 | |||
1479 | 278 | type Error interface { | ||
1480 | 279 | String() string | ||
1481 | 280 | Code() int | ||
1482 | 281 | } | ||
1483 | 282 | |||
1484 | 283 | type errorType struct { | ||
1485 | 284 | zkrc C.int | ||
1486 | 285 | err os.Error | ||
1487 | 286 | } | ||
1488 | 287 | |||
1489 | 288 | func newError(zkrc C.int, err os.Error) Error { | ||
1490 | 289 | return &errorType{zkrc, err} | ||
1491 | 290 | } | ||
1492 | 291 | |||
1493 | 292 | func (error *errorType) String() (result string) { | ||
1494 | 293 | if error.zkrc == ZSYSTEMERROR && error.err != nil { | ||
1495 | 294 | result = error.err.String() | ||
1496 | 295 | } else { | ||
1497 | 296 | result = C.GoString(C.zerror(error.zkrc)) // Static, no need to free it. | ||
1498 | 297 | } | ||
1499 | 298 | return | ||
1500 | 299 | } | ||
1501 | 300 | |||
1502 | 301 | // Code returns the error code that may be compared against one of | ||
1503 | 302 | // the gozk.Z* constants. | ||
1504 | 303 | func (error *errorType) Code() int { | ||
1505 | 304 | return int(error.zkrc) | ||
1506 | 305 | } | ||
1507 | 306 | |||
1508 | 307 | // ----------------------------------------------------------------------- | ||
1509 | 308 | // Stat interface which maps onto the ZooKeeper Stat struct. | ||
1510 | 309 | |||
1511 | 310 | // We declare this as an interface rather than an actual struct because | ||
1512 | 311 | // this way we don't have to copy data around between the real C struct | ||
1513 | 312 | // and the Go one on every call. Most uses will only touch a few elements, | ||
1514 | 313 | // or even ignore the stat entirely, so that's a win. | ||
1515 | 314 | 305 | ||
1516 | 315 | // Stat contains detailed information about a node. | 306 | // Stat contains detailed information about a node. |
1575 | 316 | type Stat interface { | 307 | type Stat struct { |
1576 | 317 | Czxid() int64 | 308 | c C.struct_Stat |
1577 | 318 | Mzxid() int64 | 309 | } |
1578 | 319 | CTime() int64 | 310 | |
1579 | 320 | MTime() int64 | 311 | func (stat *Stat) String() string { |
1580 | 321 | Version() int32 | 312 | return fmt.Sprintf( |
1581 | 322 | CVersion() int32 | 313 | "{Czxid: %d; Mzxid: %d; CTime: %d; MTime: %d; "+ |
1582 | 323 | AVersion() int32 | 314 | "Version: %d; CVersion: %d; AVersion: %d; "+ |
1583 | 324 | EphemeralOwner() int64 | 315 | "EphemeralOwner: %d; DataLength: %d; "+ |
1584 | 325 | DataLength() int32 | 316 | "NumChildren: %d; Pzxid: %d}", |
1585 | 326 | NumChildren() int32 | 317 | stat.Czxid(), stat.Mzxid(), stat.CTime(), stat.MTime(), |
1586 | 327 | Pzxid() int64 | 318 | stat.Version(), stat.CVersion(), stat.AVersion(), |
1587 | 328 | } | 319 | stat.EphemeralOwner(), stat.DataLength(), |
1588 | 329 | 320 | stat.NumChildren(), stat.Pzxid(), | |
1589 | 330 | type resultStat C.struct_Stat | 321 | ) |
1590 | 331 | 322 | } | |
1591 | 332 | func (stat *resultStat) Czxid() int64 { | 323 | |
1592 | 333 | return int64(stat.czxid) | 324 | // Czxid returns the zxid of the change that caused the node to be created. |
1593 | 334 | } | 325 | func (stat *Stat) Czxid() int64 { |
1594 | 335 | 326 | return int64(stat.c.czxid) | |
1595 | 336 | func (stat *resultStat) Mzxid() int64 { | 327 | } |
1596 | 337 | return int64(stat.mzxid) | 328 | |
1597 | 338 | } | 329 | // Mzxid returns the zxid of the change that last modified the node. |
1598 | 339 | 330 | func (stat *Stat) Mzxid() int64 { | |
1599 | 340 | func (stat *resultStat) CTime() int64 { | 331 | return int64(stat.c.mzxid) |
1600 | 341 | return int64(stat.ctime) | 332 | } |
1601 | 342 | } | 333 | |
1602 | 343 | 334 | // CTime returns the time in milliseconds from epoch when the node was created. | |
1603 | 344 | func (stat *resultStat) MTime() int64 { | 335 | func (stat *Stat) CTime() int64 { |
1604 | 345 | return int64(stat.mtime) | 336 | return int64(stat.c.ctime) |
1605 | 346 | } | 337 | } |
1606 | 347 | 338 | ||
1607 | 348 | func (stat *resultStat) Version() int32 { | 339 | // MTime returns the time in milliseconds from epoch when the node was last modified. |
1608 | 349 | return int32(stat.version) | 340 | func (stat *Stat) MTime() int64 { |
1609 | 350 | } | 341 | return int64(stat.c.mtime) |
1610 | 351 | 342 | } | |
1611 | 352 | func (stat *resultStat) CVersion() int32 { | 343 | |
1612 | 353 | return int32(stat.cversion) | 344 | // Version returns the number of changes to the data of the node. |
1613 | 354 | } | 345 | func (stat *Stat) Version() int32 { |
1614 | 355 | 346 | return int32(stat.c.version) | |
1615 | 356 | func (stat *resultStat) AVersion() int32 { | 347 | } |
1616 | 357 | return int32(stat.aversion) | 348 | |
1617 | 358 | } | 349 | // CVersion returns the number of changes to the children of the node. |
1618 | 359 | 350 | func (stat *Stat) CVersion() int32 { | |
1619 | 360 | func (stat *resultStat) EphemeralOwner() int64 { | 351 | return int32(stat.c.cversion) |
1620 | 361 | return int64(stat.ephemeralOwner) | 352 | } |
1621 | 362 | } | 353 | |
1622 | 363 | 354 | // AVersion returns the number of changes to the ACL of the node. | |
1623 | 364 | func (stat *resultStat) DataLength() int32 { | 355 | func (stat *Stat) AVersion() int32 { |
1624 | 365 | return int32(stat.dataLength) | 356 | return int32(stat.c.aversion) |
1625 | 366 | } | 357 | } |
1626 | 367 | 358 | ||
1627 | 368 | func (stat *resultStat) NumChildren() int32 { | 359 | // If the node is an ephemeral node, EphemeralOwner returns the session id |
1628 | 369 | return int32(stat.numChildren) | 360 | // of the owner of the node; otherwise it will return zero. |
1629 | 370 | } | 361 | func (stat *Stat) EphemeralOwner() int64 { |
1630 | 371 | 362 | return int64(stat.c.ephemeralOwner) | |
1631 | 372 | func (stat *resultStat) Pzxid() int64 { | 363 | } |
1632 | 373 | return int64(stat.pzxid) | 364 | |
1633 | 365 | // DataLength returns the length of the data in the node in bytes. | ||
1634 | 366 | func (stat *Stat) DataLength() int32 { | ||
1635 | 367 | return int32(stat.c.dataLength) | ||
1636 | 368 | } | ||
1637 | 369 | |||
1638 | 370 | // NumChildren returns the number of children of the znode. | ||
1639 | 371 | func (stat *Stat) NumChildren() int32 { | ||
1640 | 372 | return int32(stat.c.numChildren) | ||
1641 | 373 | } | ||
1642 | 374 | |||
1643 | 375 | // Pzxid returns the zxid of the change that last modified the children of the node. | ||
1644 | 376 | func (stat *Stat) Pzxid() int64 { | ||
1645 | 377 | return int64(stat.c.pzxid) | ||
1646 | 374 | } | 378 | } |
1647 | 375 | 379 | ||
1648 | 376 | // ----------------------------------------------------------------------- | 380 | // ----------------------------------------------------------------------- |
1649 | @@ -380,11 +384,13 @@ | |||
1650 | 380 | 384 | ||
1651 | 381 | // SetLogLevel changes the minimum level of logging output generated | 385 | // SetLogLevel changes the minimum level of logging output generated |
1652 | 382 | // to adjust the amount of information provided. | 386 | // to adjust the amount of information provided. |
1653 | 387 | // The default is LOG_ERROR. If level is zero, no logging output | ||
1654 | 388 | // will be generated. | ||
1655 | 383 | func SetLogLevel(level int) { | 389 | func SetLogLevel(level int) { |
1656 | 384 | C.zoo_set_debug_level(C.ZooLogLevel(level)) | 390 | C.zoo_set_debug_level(C.ZooLogLevel(level)) |
1657 | 385 | } | 391 | } |
1658 | 386 | 392 | ||
1660 | 387 | // Init initializes the communication with a ZooKeeper cluster. The provided | 393 | // Dial initializes the communication with a ZooKeeper cluster. The provided |
1661 | 388 | // servers parameter may include multiple server addresses, separated | 394 | // servers parameter may include multiple server addresses, separated |
1662 | 389 | // by commas, so that the client will automatically attempt to connect | 395 | // by commas, so that the client will automatically attempt to connect |
1663 | 390 | // to another server if one of them stops working for whatever reason. | 396 | // to another server if one of them stops working for whatever reason. |
1664 | @@ -398,79 +404,73 @@ | |||
1665 | 398 | // The watch channel receives events of type EVENT_SESSION when any change | 404 | // The watch channel receives events of type EVENT_SESSION when any change |
1666 | 399 | // to the state of the established connection happens. See the documentation | 405 | // to the state of the established connection happens. See the documentation |
1667 | 400 | // for the Event type for more details. | 406 | // for the Event type for more details. |
1671 | 401 | func Init(servers string, recvTimeoutNS int64) (zk *ZooKeeper, watch chan Event, err Error) { | 407 | func Dial(servers string, recvTimeoutNS int64) (*Conn, <-chan Event, os.Error) { |
1672 | 402 | zk, watch, err = internalInit(servers, recvTimeoutNS, nil) | 408 | return dial(servers, recvTimeoutNS, nil) |
1670 | 403 | return | ||
1673 | 404 | } | 409 | } |
1674 | 405 | 410 | ||
1676 | 406 | // Equivalent to Init, but attempt to reestablish an existing session | 411 | // Redial is equivalent to Dial, but attempts to reestablish an existing session |
1677 | 407 | // identified via the clientId parameter. | 412 | // identified via the clientId parameter. |
1681 | 408 | func ReInit(servers string, recvTimeoutNS int64, clientId *ClientId) (zk *ZooKeeper, watch chan Event, err Error) { | 413 | func Redial(servers string, recvTimeoutNS int64, clientId *ClientId) (*Conn, <-chan Event, os.Error) { |
1682 | 409 | zk, watch, err = internalInit(servers, recvTimeoutNS, clientId) | 414 | return dial(servers, recvTimeoutNS, clientId) |
1680 | 410 | return | ||
1683 | 411 | } | 415 | } |
1684 | 412 | 416 | ||
1689 | 413 | func internalInit(servers string, recvTimeoutNS int64, clientId *ClientId) (*ZooKeeper, chan Event, Error) { | 417 | func dial(servers string, recvTimeoutNS int64, clientId *ClientId) (*Conn, <-chan Event, os.Error) { |
1690 | 414 | 418 | conn := &Conn{} | |
1691 | 415 | zk := &ZooKeeper{} | 419 | conn.watchChannels = make(map[uintptr]chan Event) |
1688 | 416 | zk.watchChannels = make(map[uintptr]chan Event) | ||
1692 | 417 | 420 | ||
1693 | 418 | var cId *C.clientid_t | 421 | var cId *C.clientid_t |
1694 | 419 | if clientId != nil { | 422 | if clientId != nil { |
1695 | 420 | cId = &clientId.cId | 423 | cId = &clientId.cId |
1696 | 421 | } | 424 | } |
1697 | 422 | 425 | ||
1700 | 423 | watchId, watchChannel := zk.createWatch(true) | 426 | watchId, watchChannel := conn.createWatch(true) |
1701 | 424 | zk.sessionWatchId = watchId | 427 | conn.sessionWatchId = watchId |
1702 | 425 | 428 | ||
1703 | 426 | cservers := C.CString(servers) | 429 | cservers := C.CString(servers) |
1705 | 427 | handle, cerr := C.zookeeper_init(cservers, C.watch_handler, C.int(recvTimeoutNS / 1e6), cId, unsafe.Pointer(watchId), 0) | 430 | handle, cerr := C.zookeeper_init(cservers, C.watch_handler, C.int(recvTimeoutNS/1e6), cId, unsafe.Pointer(watchId), 0) |
1706 | 428 | C.free(unsafe.Pointer(cservers)) | 431 | C.free(unsafe.Pointer(cservers)) |
1707 | 429 | if handle == nil { | 432 | if handle == nil { |
1710 | 430 | zk.closeAllWatches() | 433 | conn.closeAllWatches() |
1711 | 431 | return nil, nil, newError(ZSYSTEMERROR, cerr) | 434 | return nil, nil, zkError(C.int(ZSYSTEMERROR), cerr) |
1712 | 432 | } | 435 | } |
1714 | 433 | zk.handle = handle | 436 | conn.handle = handle |
1715 | 434 | runWatchLoop() | 437 | runWatchLoop() |
1717 | 435 | return zk, watchChannel, nil | 438 | return conn, watchChannel, nil |
1718 | 436 | } | 439 | } |
1719 | 437 | 440 | ||
1720 | 438 | // ClientId returns the client ID for the existing session with ZooKeeper. | 441 | // ClientId returns the client ID for the existing session with ZooKeeper. |
1721 | 439 | // This is useful to reestablish an existing session via ReInit. | 442 | // This is useful to reestablish an existing session via Redial. |
1724 | 440 | func (zk *ZooKeeper) ClientId() *ClientId { | 443 | func (conn *Conn) ClientId() *ClientId { |
1725 | 441 | return &ClientId{*C.zoo_client_id(zk.handle)} | 444 | return &ClientId{*C.zoo_client_id(conn.handle)} |
1726 | 442 | } | 445 | } |
1727 | 443 | 446 | ||
1728 | 444 | // Close terminates the ZooKeeper interaction. | 447 | // Close terminates the ZooKeeper interaction. |
1736 | 445 | func (zk *ZooKeeper) Close() Error { | 448 | func (conn *Conn) Close() os.Error { |
1737 | 446 | 449 | ||
1738 | 447 | // Protect from concurrency around zk.handle change. | 450 | // Protect from concurrency around conn.handle change. |
1739 | 448 | zk.mutex.Lock() | 451 | conn.mutex.Lock() |
1740 | 449 | defer zk.mutex.Unlock() | 452 | defer conn.mutex.Unlock() |
1741 | 450 | 453 | ||
1742 | 451 | if zk.handle == nil { | 454 | if conn.handle == nil { |
1743 | 452 | // ZooKeeper may hang indefinitely if a handler is closed twice, | 455 | // ZooKeeper may hang indefinitely if a handler is closed twice, |
1744 | 453 | // so we get in the way and prevent it from happening. | 456 | // so we get in the way and prevent it from happening. |
1746 | 454 | return newError(ZCLOSING, nil) | 457 | return ZCLOSING |
1747 | 455 | } | 458 | } |
1749 | 456 | rc, cerr := C.zookeeper_close(zk.handle) | 459 | rc, cerr := C.zookeeper_close(conn.handle) |
1750 | 457 | 460 | ||
1752 | 458 | zk.closeAllWatches() | 461 | conn.closeAllWatches() |
1753 | 459 | stopWatchLoop() | 462 | stopWatchLoop() |
1754 | 460 | 463 | ||
1757 | 461 | // At this point, nothing else should need zk.handle. | 464 | // At this point, nothing else should need conn.handle. |
1758 | 462 | zk.handle = nil | 465 | conn.handle = nil |
1759 | 463 | 466 | ||
1764 | 464 | if rc != C.ZOK { | 467 | return zkError(rc, cerr) |
1761 | 465 | return newError(rc, cerr) | ||
1762 | 466 | } | ||
1763 | 467 | return nil | ||
1765 | 468 | } | 468 | } |
1766 | 469 | 469 | ||
1767 | 470 | // Get returns the data and status from an existing node. err will be nil, | 470 | // Get returns the data and status from an existing node. err will be nil, |
1768 | 471 | // unless an error is found. Attempting to retrieve data from a non-existing | 471 | // unless an error is found. Attempting to retrieve data from a non-existing |
1769 | 472 | // node is an error. | 472 | // node is an error. |
1771 | 473 | func (zk *ZooKeeper) Get(path string) (data string, stat Stat, err Error) { | 473 | func (conn *Conn) Get(path string) (data string, stat *Stat, err os.Error) { |
1772 | 474 | 474 | ||
1773 | 475 | cpath := C.CString(path) | 475 | cpath := C.CString(path) |
1774 | 476 | cbuffer := (*C.char)(C.malloc(bufferSize)) | 476 | cbuffer := (*C.char)(C.malloc(bufferSize)) |
1775 | @@ -478,22 +478,22 @@ | |||
1776 | 478 | defer C.free(unsafe.Pointer(cpath)) | 478 | defer C.free(unsafe.Pointer(cpath)) |
1777 | 479 | defer C.free(unsafe.Pointer(cbuffer)) | 479 | defer C.free(unsafe.Pointer(cbuffer)) |
1778 | 480 | 480 | ||
1782 | 481 | cstat := C.struct_Stat{} | 481 | var cstat Stat |
1783 | 482 | rc, cerr := C.zoo_wget(zk.handle, cpath, nil, nil, | 482 | rc, cerr := C.zoo_wget(conn.handle, cpath, nil, nil, cbuffer, &cbufferLen, &cstat.c) |
1781 | 483 | cbuffer, &cbufferLen, &cstat) | ||
1784 | 484 | if rc != C.ZOK { | 483 | if rc != C.ZOK { |
1786 | 485 | return "", nil, newError(rc, cerr) | 484 | return "", nil, zkError(rc, cerr) |
1787 | 486 | } | 485 | } |
1788 | 486 | |||
1789 | 487 | result := C.GoStringN(cbuffer, cbufferLen) | 487 | result := C.GoStringN(cbuffer, cbufferLen) |
1792 | 488 | 488 | return result, &cstat, nil | |
1791 | 489 | return result, (*resultStat)(&cstat), nil | ||
1793 | 490 | } | 489 | } |
1794 | 491 | 490 | ||
1795 | 492 | // GetW works like Get but also returns a channel that will receive | 491 | // GetW works like Get but also returns a channel that will receive |
1800 | 493 | // a single Event value when the data or existence of the given ZooKeeper | 492 | // a single Event value, with Type EVENT_CHANGED or EVENT_DELETED, |
1801 | 494 | // node changes or when critical session events happen. See the | 493 | // when the node contents change or it is deleted. |
1802 | 495 | // documentation of the Event type for more details. | 494 | // It will also receive a critical session event if the server goes down. |
1803 | 496 | func (zk *ZooKeeper) GetW(path string) (data string, stat Stat, watch chan Event, err Error) { | 495 | // See the documentation of the Event type for more details. |
1804 | 496 | func (conn *Conn) GetW(path string) (data string, stat *Stat, watch <-chan Event, err os.Error) { | ||
1805 | 497 | 497 | ||
1806 | 498 | cpath := C.CString(path) | 498 | cpath := C.CString(path) |
1807 | 499 | cbuffer := (*C.char)(C.malloc(bufferSize)) | 499 | cbuffer := (*C.char)(C.malloc(bufferSize)) |
1808 | @@ -501,73 +501,69 @@ | |||
1809 | 501 | defer C.free(unsafe.Pointer(cpath)) | 501 | defer C.free(unsafe.Pointer(cpath)) |
1810 | 502 | defer C.free(unsafe.Pointer(cbuffer)) | 502 | defer C.free(unsafe.Pointer(cbuffer)) |
1811 | 503 | 503 | ||
1813 | 504 | watchId, watchChannel := zk.createWatch(true) | 504 | watchId, watchChannel := conn.createWatch(true) |
1814 | 505 | 505 | ||
1819 | 506 | cstat := C.struct_Stat{} | 506 | var cstat Stat |
1820 | 507 | rc, cerr := C.zoo_wget(zk.handle, cpath, | 507 | rc, cerr := C.zoo_wget(conn.handle, cpath, C.watch_handler, unsafe.Pointer(watchId), cbuffer, &cbufferLen, &cstat.c) |
1817 | 508 | C.watch_handler, unsafe.Pointer(watchId), | ||
1818 | 509 | cbuffer, &cbufferLen, &cstat) | ||
1821 | 510 | if rc != C.ZOK { | 508 | if rc != C.ZOK { |
1824 | 511 | zk.forgetWatch(watchId) | 509 | conn.forgetWatch(watchId) |
1825 | 512 | return "", nil, nil, newError(rc, cerr) | 510 | return "", nil, nil, zkError(rc, cerr) |
1826 | 513 | } | 511 | } |
1827 | 514 | 512 | ||
1828 | 515 | result := C.GoStringN(cbuffer, cbufferLen) | 513 | result := C.GoStringN(cbuffer, cbufferLen) |
1830 | 516 | return result, (*resultStat)(&cstat), watchChannel, nil | 514 | return result, &cstat, watchChannel, nil |
1831 | 517 | } | 515 | } |
1832 | 518 | 516 | ||
1833 | 519 | // Children returns the children list and status from an existing node. | 517 | // Children returns the children list and status from an existing node. |
1837 | 520 | // err will be nil, unless an error is found. Attempting to retrieve the | 518 | // Attempting to retrieve the children list from a non-existent node is an error. |
1838 | 521 | // children list from a non-existent node is an error. | 519 | func (conn *Conn) Children(path string) (children []string, stat *Stat, err os.Error) { |
1836 | 522 | func (zk *ZooKeeper) Children(path string) (children []string, stat Stat, err Error) { | ||
1839 | 523 | 520 | ||
1840 | 524 | cpath := C.CString(path) | 521 | cpath := C.CString(path) |
1841 | 525 | defer C.free(unsafe.Pointer(cpath)) | 522 | defer C.free(unsafe.Pointer(cpath)) |
1842 | 526 | 523 | ||
1843 | 527 | cvector := C.struct_String_vector{} | 524 | cvector := C.struct_String_vector{} |
1847 | 528 | cstat := C.struct_Stat{} | 525 | var cstat Stat |
1848 | 529 | rc, cerr := C.zoo_wget_children2(zk.handle, cpath, nil, nil, | 526 | rc, cerr := C.zoo_wget_children2(conn.handle, cpath, nil, nil, &cvector, &cstat.c) |
1846 | 530 | &cvector, &cstat) | ||
1849 | 531 | 527 | ||
1850 | 532 | // Can't happen if rc != 0, but avoid potential memory leaks in the future. | 528 | // Can't happen if rc != 0, but avoid potential memory leaks in the future. |
1851 | 533 | if cvector.count != 0 { | 529 | if cvector.count != 0 { |
1852 | 534 | children = parseStringVector(&cvector) | 530 | children = parseStringVector(&cvector) |
1853 | 535 | } | 531 | } |
1856 | 536 | if rc != C.ZOK { | 532 | if rc == C.ZOK { |
1857 | 537 | err = newError(rc, cerr) | 533 | stat = &cstat |
1858 | 538 | } else { | 534 | } else { |
1860 | 539 | stat = (*resultStat)(&cstat) | 535 | err = zkError(rc, cerr) |
1861 | 540 | } | 536 | } |
1862 | 541 | return | 537 | return |
1863 | 542 | } | 538 | } |
1864 | 543 | 539 | ||
1865 | 544 | // ChildrenW works like Children but also returns a channel that will | 540 | // ChildrenW works like Children but also returns a channel that will |
1870 | 545 | // receive a single Event value when a node is added or removed under the | 541 | // receive a single Event value when a child of the given path is |
1871 | 546 | // provided path or when critical session events happen. See the documentation | 542 | // added or removed (EVENT_CHILD), or when the path node itself is |
1872 | 547 | // of the Event type for more details. | 543 | // deleted (EVENT_DELETED). |
1873 | 548 | func (zk *ZooKeeper) ChildrenW(path string) (children []string, stat Stat, watch chan Event, err Error) { | 544 | // It will also receive a critical session event if the server goes down. |
1874 | 545 | // See the documentation of the Event type for more details. | ||
1875 | 546 | func (conn *Conn) ChildrenW(path string) (children []string, stat *Stat, watch <-chan Event, err os.Error) { | ||
1876 | 549 | 547 | ||
1877 | 550 | cpath := C.CString(path) | 548 | cpath := C.CString(path) |
1878 | 551 | defer C.free(unsafe.Pointer(cpath)) | 549 | defer C.free(unsafe.Pointer(cpath)) |
1879 | 552 | 550 | ||
1881 | 553 | watchId, watchChannel := zk.createWatch(true) | 551 | watchId, watchChannel := conn.createWatch(true) |
1882 | 554 | 552 | ||
1883 | 555 | cvector := C.struct_String_vector{} | 553 | cvector := C.struct_String_vector{} |
1888 | 556 | cstat := C.struct_Stat{} | 554 | var cstat Stat |
1889 | 557 | rc, cerr := C.zoo_wget_children2(zk.handle, cpath, | 555 | rc, cerr := C.zoo_wget_children2(conn.handle, cpath, C.watch_handler, unsafe.Pointer(watchId), &cvector, &cstat.c) |
1886 | 558 | C.watch_handler, unsafe.Pointer(watchId), | ||
1887 | 559 | &cvector, &cstat) | ||
1890 | 560 | 556 | ||
1891 | 561 | // Can't happen if rc != 0, but avoid potential memory leaks in the future. | 557 | // Can't happen if rc != 0, but avoid potential memory leaks in the future. |
1892 | 562 | if cvector.count != 0 { | 558 | if cvector.count != 0 { |
1893 | 563 | children = parseStringVector(&cvector) | 559 | children = parseStringVector(&cvector) |
1894 | 564 | } | 560 | } |
1898 | 565 | if rc != C.ZOK { | 561 | if rc == C.ZOK { |
1899 | 566 | zk.forgetWatch(watchId) | 562 | stat = &cstat |
1900 | 567 | err = newError(rc, cerr) | 563 | watch = watchChannel |
1901 | 568 | } else { | 564 | } else { |
1904 | 569 | stat = (*resultStat)(&cstat) | 565 | conn.forgetWatch(watchId) |
1905 | 570 | watch = watchChannel | 566 | err = zkError(rc, cerr) |
1906 | 571 | } | 567 | } |
1907 | 572 | return | 568 | return |
1908 | 573 | } | 569 | } |
1909 | @@ -588,51 +584,51 @@ | |||
1910 | 588 | // Exists checks if a node exists at the given path. If it does, | 584 | // Exists checks if a node exists at the given path. If it does, |
1911 | 589 | // stat will contain meta information on the existing node, otherwise | 585 | // stat will contain meta information on the existing node, otherwise |
1912 | 590 | // it will be nil. | 586 | // it will be nil. |
1914 | 591 | func (zk *ZooKeeper) Exists(path string) (stat Stat, err Error) { | 587 | func (conn *Conn) Exists(path string) (stat *Stat, err os.Error) { |
1915 | 592 | cpath := C.CString(path) | 588 | cpath := C.CString(path) |
1916 | 593 | defer C.free(unsafe.Pointer(cpath)) | 589 | defer C.free(unsafe.Pointer(cpath)) |
1917 | 594 | 590 | ||
1920 | 595 | cstat := C.struct_Stat{} | 591 | var cstat Stat |
1921 | 596 | rc, cerr := C.zoo_wexists(zk.handle, cpath, nil, nil, &cstat) | 592 | rc, cerr := C.zoo_wexists(conn.handle, cpath, nil, nil, &cstat.c) |
1922 | 597 | 593 | ||
1923 | 598 | // We diverge a bit from the usual here: a ZNONODE is not an error | 594 | // We diverge a bit from the usual here: a ZNONODE is not an error |
1924 | 599 | // for an exists call, otherwise every Exists call would have to check | 595 | // for an exists call, otherwise every Exists call would have to check |
1925 | 600 | // for err != nil and err.Code() != ZNONODE. | 596 | // for err != nil and err != ZNONODE. |
1926 | 601 | if rc == C.ZOK { | 597 | if rc == C.ZOK { |
1928 | 602 | stat = (*resultStat)(&cstat) | 598 | stat = &cstat |
1929 | 603 | } else if rc != C.ZNONODE { | 599 | } else if rc != C.ZNONODE { |
1931 | 604 | err = newError(rc, cerr) | 600 | err = zkError(rc, cerr) |
1932 | 605 | } | 601 | } |
1933 | 606 | return | 602 | return |
1934 | 607 | } | 603 | } |
1935 | 608 | 604 | ||
1936 | 609 | // ExistsW works like Exists but also returns a channel that will | 605 | // ExistsW works like Exists but also returns a channel that will |
1942 | 610 | // receive an Event value when a node is created in case the returned | 606 | // receive a single Event, with Type EVENT_CREATED, EVENT_DELETED |
1943 | 611 | // stat is nil and the node didn't exist, or when the existing node | 607 | // or EVENT_CHANGED, when the node is next created, removed, |
1944 | 612 | // is removed. It will also receive critical session events. See the | 608 | // or its contents change. |
1945 | 613 | // documentation of the Event type for more details. | 609 | // It will also receive a critical session event if the server goes down. |
1946 | 614 | func (zk *ZooKeeper) ExistsW(path string) (stat Stat, watch chan Event, err Error) { | 610 | // See the documentation of the Event type for more details. |
1947 | 611 | func (conn *Conn) ExistsW(path string) (stat *Stat, watch <-chan Event, err os.Error) { | ||
1948 | 615 | cpath := C.CString(path) | 612 | cpath := C.CString(path) |
1949 | 616 | defer C.free(unsafe.Pointer(cpath)) | 613 | defer C.free(unsafe.Pointer(cpath)) |
1950 | 617 | 614 | ||
1952 | 618 | watchId, watchChannel := zk.createWatch(true) | 615 | watchId, watchChannel := conn.createWatch(true) |
1953 | 619 | 616 | ||
1957 | 620 | cstat := C.struct_Stat{} | 617 | var cstat Stat |
1958 | 621 | rc, cerr := C.zoo_wexists(zk.handle, cpath, | 618 | rc, cerr := C.zoo_wexists(conn.handle, cpath, C.watch_handler, unsafe.Pointer(watchId), &cstat.c) |
1956 | 622 | C.watch_handler, unsafe.Pointer(watchId), &cstat) | ||
1959 | 623 | 619 | ||
1960 | 624 | // We diverge a bit from the usual here: a ZNONODE is not an error | 620 | // We diverge a bit from the usual here: a ZNONODE is not an error |
1961 | 625 | // for an exists call, otherwise every Exists call would have to check | 621 | // for an exists call, otherwise every Exists call would have to check |
1962 | 626 | // for err != nil and err.Code() != ZNONODE. | 622 | // for err != nil and err.Code() != ZNONODE. |
1964 | 627 | switch rc { | 623 | switch Error(rc) { |
1965 | 628 | case ZOK: | 624 | case ZOK: |
1967 | 629 | stat = (*resultStat)(&cstat) | 625 | stat = &cstat |
1968 | 630 | watch = watchChannel | 626 | watch = watchChannel |
1969 | 631 | case ZNONODE: | 627 | case ZNONODE: |
1970 | 632 | watch = watchChannel | 628 | watch = watchChannel |
1971 | 633 | default: | 629 | default: |
1974 | 634 | zk.forgetWatch(watchId) | 630 | conn.forgetWatch(watchId) |
1975 | 635 | err = newError(rc, cerr) | 631 | err = zkError(rc, cerr) |
1976 | 636 | } | 632 | } |
1977 | 637 | return | 633 | return |
1978 | 638 | } | 634 | } |
1979 | @@ -646,7 +642,7 @@ | |||
1980 | 646 | // The returned path is useful in cases where the created path may differ | 642 | // The returned path is useful in cases where the created path may differ |
1981 | 647 | // from the requested one, such as when a sequence number is appended | 643 | // from the requested one, such as when a sequence number is appended |
1982 | 648 | // to it due to the use of the gozk.SEQUENCE flag. | 644 | // to it due to the use of the gozk.SEQUENCE flag. |
1984 | 649 | func (zk *ZooKeeper) Create(path, value string, flags int, aclv []ACL) (pathCreated string, err Error) { | 645 | func (conn *Conn) Create(path, value string, flags int, aclv []ACL) (pathCreated string, err os.Error) { |
1985 | 650 | cpath := C.CString(path) | 646 | cpath := C.CString(path) |
1986 | 651 | cvalue := C.CString(value) | 647 | cvalue := C.CString(value) |
1987 | 652 | defer C.free(unsafe.Pointer(cpath)) | 648 | defer C.free(unsafe.Pointer(cpath)) |
1988 | @@ -660,13 +656,13 @@ | |||
1989 | 660 | cpathCreated := (*C.char)(C.malloc(cpathLen)) | 656 | cpathCreated := (*C.char)(C.malloc(cpathLen)) |
1990 | 661 | defer C.free(unsafe.Pointer(cpathCreated)) | 657 | defer C.free(unsafe.Pointer(cpathCreated)) |
1991 | 662 | 658 | ||
1996 | 663 | rc, cerr := C.zoo_create(zk.handle, cpath, cvalue, C.int(len(value)), | 659 | rc, cerr := C.zoo_create(conn.handle, cpath, cvalue, C.int(len(value)), caclv, C.int(flags), cpathCreated, C.int(cpathLen)) |
1997 | 664 | caclv, C.int(flags), cpathCreated, C.int(cpathLen)) | 660 | if rc == C.ZOK { |
1998 | 665 | if rc != C.ZOK { | 661 | pathCreated = C.GoString(cpathCreated) |
1999 | 666 | return "", newError(rc, cerr) | 662 | } else { |
2000 | 663 | err = zkError(rc, cerr) | ||
2001 | 667 | } | 664 | } |
2004 | 668 | 665 | return | |
2003 | 669 | return C.GoString(cpathCreated), nil | ||
2005 | 670 | } | 666 | } |
2006 | 671 | 667 | ||
2007 | 672 | // Set modifies the data for the existing node at the given path, replacing it | 668 | // Set modifies the data for the existing node at the given path, replacing it |
2008 | @@ -677,35 +673,31 @@ | |||
2009 | 677 | // | 673 | // |
2010 | 678 | // It is an error to attempt to set the data of a non-existing node with | 674 | // It is an error to attempt to set the data of a non-existing node with |
2011 | 679 | // this function. In these cases, use Create instead. | 675 | // this function. In these cases, use Create instead. |
2013 | 680 | func (zk *ZooKeeper) Set(path, value string, version int32) (stat Stat, err Error) { | 676 | func (conn *Conn) Set(path, value string, version int32) (stat *Stat, err os.Error) { |
2014 | 681 | 677 | ||
2015 | 682 | cpath := C.CString(path) | 678 | cpath := C.CString(path) |
2016 | 683 | cvalue := C.CString(value) | 679 | cvalue := C.CString(value) |
2017 | 684 | defer C.free(unsafe.Pointer(cpath)) | 680 | defer C.free(unsafe.Pointer(cpath)) |
2018 | 685 | defer C.free(unsafe.Pointer(cvalue)) | 681 | defer C.free(unsafe.Pointer(cvalue)) |
2019 | 686 | 682 | ||
2026 | 687 | cstat := C.struct_Stat{} | 683 | var cstat Stat |
2027 | 688 | 684 | rc, cerr := C.zoo_set2(conn.handle, cpath, cvalue, C.int(len(value)), C.int(version), &cstat.c) | |
2028 | 689 | rc, cerr := C.zoo_set2(zk.handle, cpath, cvalue, C.int(len(value)), | 685 | if rc == C.ZOK { |
2029 | 690 | C.int(version), &cstat) | 686 | stat = &cstat |
2030 | 691 | if rc != C.ZOK { | 687 | } else { |
2031 | 692 | return nil, newError(rc, cerr) | 688 | err = zkError(rc, cerr) |
2032 | 693 | } | 689 | } |
2035 | 694 | 690 | return | |
2034 | 695 | return (*resultStat)(&cstat), nil | ||
2036 | 696 | } | 691 | } |
2037 | 697 | 692 | ||
2038 | 698 | // Delete removes the node at path. If version is not -1, the operation | 693 | // Delete removes the node at path. If version is not -1, the operation |
2039 | 699 | // will only succeed if the node is still at this version when the | 694 | // will only succeed if the node is still at this version when the |
2040 | 700 | // node is deleted as an atomic operation. | 695 | // node is deleted as an atomic operation. |
2042 | 701 | func (zk *ZooKeeper) Delete(path string, version int32) (err Error) { | 696 | func (conn *Conn) Delete(path string, version int32) (err os.Error) { |
2043 | 702 | cpath := C.CString(path) | 697 | cpath := C.CString(path) |
2044 | 703 | defer C.free(unsafe.Pointer(cpath)) | 698 | defer C.free(unsafe.Pointer(cpath)) |
2050 | 704 | rc, cerr := C.zoo_delete(zk.handle, cpath, C.int(version)) | 699 | rc, cerr := C.zoo_delete(conn.handle, cpath, C.int(version)) |
2051 | 705 | if rc != C.ZOK { | 700 | return zkError(rc, cerr) |
2047 | 706 | return newError(rc, cerr) | ||
2048 | 707 | } | ||
2049 | 708 | return nil | ||
2052 | 709 | } | 701 | } |
2053 | 710 | 702 | ||
2054 | 711 | // AddAuth adds a new authentication certificate to the ZooKeeper | 703 | // AddAuth adds a new authentication certificate to the ZooKeeper |
2055 | @@ -713,7 +705,7 @@ | |||
2056 | 713 | // authentication information, while the cert parameter provides the | 705 | // authentication information, while the cert parameter provides the |
2057 | 714 | // identity data itself. For instance, the "digest" scheme requires | 706 | // identity data itself. For instance, the "digest" scheme requires |
2058 | 715 | // a pair like "username:password" to be provided as the certificate. | 707 | // a pair like "username:password" to be provided as the certificate. |
2060 | 716 | func (zk *ZooKeeper) AddAuth(scheme, cert string) Error { | 708 | func (conn *Conn) AddAuth(scheme, cert string) os.Error { |
2061 | 717 | cscheme := C.CString(scheme) | 709 | cscheme := C.CString(scheme) |
2062 | 718 | ccert := C.CString(cert) | 710 | ccert := C.CString(cert) |
2063 | 719 | defer C.free(unsafe.Pointer(cscheme)) | 711 | defer C.free(unsafe.Pointer(cscheme)) |
2064 | @@ -725,43 +717,38 @@ | |||
2065 | 725 | } | 717 | } |
2066 | 726 | defer C.destroy_completion_data(data) | 718 | defer C.destroy_completion_data(data) |
2067 | 727 | 719 | ||
2070 | 728 | rc, cerr := C.zoo_add_auth(zk.handle, cscheme, ccert, C.int(len(cert)), | 720 | rc, cerr := C.zoo_add_auth(conn.handle, cscheme, ccert, C.int(len(cert)), C.handle_void_completion, unsafe.Pointer(data)) |
2069 | 729 | C.handle_void_completion, unsafe.Pointer(data)) | ||
2071 | 730 | if rc != C.ZOK { | 721 | if rc != C.ZOK { |
2073 | 731 | return newError(rc, cerr) | 722 | return zkError(rc, cerr) |
2074 | 732 | } | 723 | } |
2075 | 733 | 724 | ||
2076 | 734 | C.wait_for_completion(data) | 725 | C.wait_for_completion(data) |
2077 | 735 | 726 | ||
2078 | 736 | rc = C.int(uintptr(data.data)) | 727 | rc = C.int(uintptr(data.data)) |
2084 | 737 | if rc != C.ZOK { | 728 | return zkError(rc, nil) |
2080 | 738 | return newError(rc, nil) | ||
2081 | 739 | } | ||
2082 | 740 | |||
2083 | 741 | return nil | ||
2085 | 742 | } | 729 | } |
2086 | 743 | 730 | ||
2087 | 744 | // ACL returns the access control list for path. | 731 | // ACL returns the access control list for path. |
2089 | 745 | func (zk *ZooKeeper) ACL(path string) ([]ACL, Stat, Error) { | 732 | func (conn *Conn) ACL(path string) ([]ACL, *Stat, os.Error) { |
2090 | 746 | 733 | ||
2091 | 747 | cpath := C.CString(path) | 734 | cpath := C.CString(path) |
2092 | 748 | defer C.free(unsafe.Pointer(cpath)) | 735 | defer C.free(unsafe.Pointer(cpath)) |
2093 | 749 | 736 | ||
2094 | 750 | caclv := C.struct_ACL_vector{} | 737 | caclv := C.struct_ACL_vector{} |
2095 | 751 | cstat := C.struct_Stat{} | ||
2096 | 752 | 738 | ||
2098 | 753 | rc, cerr := C.zoo_get_acl(zk.handle, cpath, &caclv, &cstat) | 739 | var cstat Stat |
2099 | 740 | rc, cerr := C.zoo_get_acl(conn.handle, cpath, &caclv, &cstat.c) | ||
2100 | 754 | if rc != C.ZOK { | 741 | if rc != C.ZOK { |
2102 | 755 | return nil, nil, newError(rc, cerr) | 742 | return nil, nil, zkError(rc, cerr) |
2103 | 756 | } | 743 | } |
2104 | 757 | 744 | ||
2105 | 758 | aclv := parseACLVector(&caclv) | 745 | aclv := parseACLVector(&caclv) |
2106 | 759 | 746 | ||
2108 | 760 | return aclv, (*resultStat)(&cstat), nil | 747 | return aclv, &cstat, nil |
2109 | 761 | } | 748 | } |
2110 | 762 | 749 | ||
2111 | 763 | // SetACL changes the access control list for path. | 750 | // SetACL changes the access control list for path. |
2113 | 764 | func (zk *ZooKeeper) SetACL(path string, aclv []ACL, version int32) Error { | 751 | func (conn *Conn) SetACL(path string, aclv []ACL, version int32) os.Error { |
2114 | 765 | 752 | ||
2115 | 766 | cpath := C.CString(path) | 753 | cpath := C.CString(path) |
2116 | 767 | defer C.free(unsafe.Pointer(cpath)) | 754 | defer C.free(unsafe.Pointer(cpath)) |
2117 | @@ -769,12 +756,8 @@ | |||
2118 | 769 | caclv := buildACLVector(aclv) | 756 | caclv := buildACLVector(aclv) |
2119 | 770 | defer C.deallocate_ACL_vector(caclv) | 757 | defer C.deallocate_ACL_vector(caclv) |
2120 | 771 | 758 | ||
2127 | 772 | rc, cerr := C.zoo_set_acl(zk.handle, cpath, C.int(version), caclv) | 759 | rc, cerr := C.zoo_set_acl(conn.handle, cpath, C.int(version), caclv) |
2128 | 773 | if rc != C.ZOK { | 760 | return zkError(rc, cerr) |
2123 | 774 | return newError(rc, cerr) | ||
2124 | 775 | } | ||
2125 | 776 | |||
2126 | 777 | return nil | ||
2129 | 778 | } | 761 | } |
2130 | 779 | 762 | ||
2131 | 780 | func parseACLVector(caclv *C.struct_ACL_vector) []ACL { | 763 | func parseACLVector(caclv *C.struct_ACL_vector) []ACL { |
2132 | @@ -822,7 +805,7 @@ | |||
2133 | 822 | // ----------------------------------------------------------------------- | 805 | // ----------------------------------------------------------------------- |
2134 | 823 | // RetryChange utility method. | 806 | // RetryChange utility method. |
2135 | 824 | 807 | ||
2137 | 825 | type ChangeFunc func(oldValue string, oldStat Stat) (newValue string, err os.Error) | 808 | type ChangeFunc func(oldValue string, oldStat *Stat) (newValue string, err os.Error) |
2138 | 826 | 809 | ||
2139 | 827 | // RetryChange runs changeFunc to attempt to atomically change path | 810 | // RetryChange runs changeFunc to attempt to atomically change path |
2140 | 828 | // in a lock free manner, and retries in case there was another | 811 | // in a lock free manner, and retries in case there was another |
2141 | @@ -831,8 +814,7 @@ | |||
2142 | 831 | // changeFunc must work correctly if called multiple times in case | 814 | // changeFunc must work correctly if called multiple times in case |
2143 | 832 | // the modification fails due to concurrent changes, and it may return | 815 | // the modification fails due to concurrent changes, and it may return |
2144 | 833 | // an error that will cause the RetryChange function to stop and | 816 | // an error that will cause the RetryChange function to stop and |
2147 | 834 | // return an Error with code ZSYSTEMERROR and the same .String() result | 817 | // return the same error. |
2146 | 835 | // as the provided error. | ||
2148 | 836 | // | 818 | // |
2149 | 837 | // This mechanism is not suitable for a node that is frequently modified | 819 | // This mechanism is not suitable for a node that is frequently modified |
2150 | 838 | // concurrently. For those cases, consider using a pessimistic locking | 820 | // concurrently. For those cases, consider using a pessimistic locking |
2151 | @@ -845,8 +827,7 @@ | |||
2152 | 845 | // | 827 | // |
2153 | 846 | // 2. Call the changeFunc with the current node value and stat, | 828 | // 2. Call the changeFunc with the current node value and stat, |
2154 | 847 | // or with an empty string and nil stat, if the node doesn't yet exist. | 829 | // or with an empty string and nil stat, if the node doesn't yet exist. |
2157 | 848 | // If the changeFunc returns an error, stop and return an Error with | 830 | // If the changeFunc returns an error, stop and return the same error. |
2156 | 849 | // ZSYSTEMERROR Code() and the same String() as the changeFunc error. | ||
2158 | 850 | // | 831 | // |
2159 | 851 | // 3. If the changeFunc returns no errors, use the string returned as | 832 | // 3. If the changeFunc returns no errors, use the string returned as |
2160 | 852 | // the new candidate value for the node, and attempt to either create | 833 | // the new candidate value for the node, and attempt to either create |
2161 | @@ -855,36 +836,32 @@ | |||
2162 | 855 | // in the same node), repeat from step 1. If this procedure fails with any | 836 | // in the same node), repeat from step 1. If this procedure fails with any |
2163 | 856 | // other error, stop and return the error found. | 837 | // other error, stop and return the error found. |
2164 | 857 | // | 838 | // |
2166 | 858 | func (zk *ZooKeeper) RetryChange(path string, flags int, acl []ACL, changeFunc ChangeFunc) (err Error) { | 839 | func (conn *Conn) RetryChange(path string, flags int, acl []ACL, changeFunc ChangeFunc) os.Error { |
2167 | 859 | for { | 840 | for { |
2180 | 860 | oldValue, oldStat, getErr := zk.Get(path) | 841 | oldValue, oldStat, err := conn.Get(path) |
2181 | 861 | if getErr != nil && getErr.Code() != ZNONODE { | 842 | if err != nil && err != ZNONODE { |
2182 | 862 | err = getErr | 843 | return err |
2183 | 863 | break | 844 | } |
2184 | 864 | } | 845 | newValue, err := changeFunc(oldValue, oldStat) |
2185 | 865 | newValue, osErr := changeFunc(oldValue, oldStat) | 846 | if err != nil { |
2186 | 866 | if osErr != nil { | 847 | return err |
2187 | 867 | return newError(ZSYSTEMERROR, osErr) | 848 | } |
2188 | 868 | } else if oldStat == nil { | 849 | if oldStat == nil { |
2189 | 869 | _, err = zk.Create(path, newValue, flags, acl) | 850 | _, err := conn.Create(path, newValue, flags, acl) |
2190 | 870 | if err == nil || err.Code() != ZNODEEXISTS { | 851 | if err == nil || err != ZNODEEXISTS { |
2191 | 871 | break | 852 | return err |
2192 | 872 | } | 853 | } |
2194 | 873 | } else if newValue == oldValue { | 854 | continue |
2195 | 855 | } | ||
2196 | 856 | if newValue == oldValue { | ||
2197 | 874 | return nil // Nothing to do. | 857 | return nil // Nothing to do. |
2208 | 875 | } else { | 858 | } |
2209 | 876 | _, err = zk.Set(path, newValue, oldStat.Version()) | 859 | _, err = conn.Set(path, newValue, oldStat.Version()) |
2210 | 877 | if err == nil { | 860 | if err == nil || (err != ZBADVERSION && err != ZNONODE) { |
2211 | 878 | break | 861 | return err |
2202 | 879 | } else { | ||
2203 | 880 | code := err.Code() | ||
2204 | 881 | if code != ZBADVERSION && code != ZNONODE { | ||
2205 | 882 | break | ||
2206 | 883 | } | ||
2207 | 884 | } | ||
2212 | 885 | } | 862 | } |
2213 | 886 | } | 863 | } |
2215 | 887 | return err | 864 | panic("not reached") |
2216 | 888 | } | 865 | } |
2217 | 889 | 866 | ||
2218 | 890 | // ----------------------------------------------------------------------- | 867 | // ----------------------------------------------------------------------- |
2219 | @@ -897,7 +874,7 @@ | |||
2220 | 897 | // Whenever a *W method is called, it will return a channel which | 874 | // Whenever a *W method is called, it will return a channel which |
2221 | 898 | // outputs Event values. Internally, a map is used to maintain references | 875 | // outputs Event values. Internally, a map is used to maintain references |
2222 | 899 | // between a unique integer key (the watchId), and the event channel. The | 876 | // between a unique integer key (the watchId), and the event channel. The |
2224 | 900 | // watchId is then handed to the C zookeeper library as the watch context, | 877 | // watchId is then handed to the C ZooKeeper library as the watch context, |
2225 | 901 | // so that we get it back when events happen. Using an integer key as the | 878 | // so that we get it back when events happen. Using an integer key as the |
2226 | 902 | // watch context rather than a pointer is needed because there's no guarantee | 879 | // watch context rather than a pointer is needed because there's no guarantee |
2227 | 903 | // that in the future the GC will not move objects around, and also because | 880 | // that in the future the GC will not move objects around, and also because |
2228 | @@ -910,13 +887,13 @@ | |||
2229 | 910 | // Since Cgo doesn't allow calling back into Go, we actually fire a new | 887 | // Since Cgo doesn't allow calling back into Go, we actually fire a new |
2230 | 911 | // goroutine the very first time Init is called, and allow it to block | 888 | // goroutine the very first time Init is called, and allow it to block |
2231 | 912 | // in a pthread condition variable within a C function. This condition | 889 | // in a pthread condition variable within a C function. This condition |
2233 | 913 | // will only be notified once a zookeeper watch callback appends new | 890 | // will only be notified once a ZooKeeper watch callback appends new |
2234 | 914 | // entries to the event list. When this happens, the C function returns | 891 | // entries to the event list. When this happens, the C function returns |
2235 | 915 | // and we get back into Go land with the pointer to the watch data, | 892 | // and we get back into Go land with the pointer to the watch data, |
2236 | 916 | // including the watchId and other event details such as type and path. | 893 | // including the watchId and other event details such as type and path. |
2237 | 917 | 894 | ||
2238 | 918 | var watchMutex sync.Mutex | 895 | var watchMutex sync.Mutex |
2240 | 919 | var watchZooKeepers = make(map[uintptr]*ZooKeeper) | 896 | var watchConns = make(map[uintptr]*Conn) |
2241 | 920 | var watchCounter uintptr | 897 | var watchCounter uintptr |
2242 | 921 | var watchLoopCounter int | 898 | var watchLoopCounter int |
2243 | 922 | 899 | ||
2244 | @@ -925,14 +902,14 @@ | |||
2245 | 925 | // mostly as a debugging and testing aid. | 902 | // mostly as a debugging and testing aid. |
2246 | 926 | func CountPendingWatches() int { | 903 | func CountPendingWatches() int { |
2247 | 927 | watchMutex.Lock() | 904 | watchMutex.Lock() |
2249 | 928 | count := len(watchZooKeepers) | 905 | count := len(watchConns) |
2250 | 929 | watchMutex.Unlock() | 906 | watchMutex.Unlock() |
2251 | 930 | return count | 907 | return count |
2252 | 931 | } | 908 | } |
2253 | 932 | 909 | ||
2254 | 933 | // createWatch creates and registers a watch, returning the watch id | 910 | // createWatch creates and registers a watch, returning the watch id |
2255 | 934 | // and channel. | 911 | // and channel. |
2257 | 935 | func (zk *ZooKeeper) createWatch(session bool) (watchId uintptr, watchChannel chan Event) { | 912 | func (conn *Conn) createWatch(session bool) (watchId uintptr, watchChannel chan Event) { |
2258 | 936 | buf := 1 // session/watch event | 913 | buf := 1 // session/watch event |
2259 | 937 | if session { | 914 | if session { |
2260 | 938 | buf = 32 | 915 | buf = 32 |
2261 | @@ -942,8 +919,8 @@ | |||
2262 | 942 | defer watchMutex.Unlock() | 919 | defer watchMutex.Unlock() |
2263 | 943 | watchId = watchCounter | 920 | watchId = watchCounter |
2264 | 944 | watchCounter += 1 | 921 | watchCounter += 1 |
2267 | 945 | zk.watchChannels[watchId] = watchChannel | 922 | conn.watchChannels[watchId] = watchChannel |
2268 | 946 | watchZooKeepers[watchId] = zk | 923 | watchConns[watchId] = conn |
2269 | 947 | return | 924 | return |
2270 | 948 | } | 925 | } |
2271 | 949 | 926 | ||
2272 | @@ -951,21 +928,21 @@ | |||
2273 | 951 | // from ever getting delivered. It shouldn't be used if there's any | 928 | // from ever getting delivered. It shouldn't be used if there's any |
2274 | 952 | // chance the watch channel is still visible and not closed, since | 929 | // chance the watch channel is still visible and not closed, since |
2275 | 953 | // it might mean a goroutine would be blocked forever. | 930 | // it might mean a goroutine would be blocked forever. |
2277 | 954 | func (zk *ZooKeeper) forgetWatch(watchId uintptr) { | 931 | func (conn *Conn) forgetWatch(watchId uintptr) { |
2278 | 955 | watchMutex.Lock() | 932 | watchMutex.Lock() |
2279 | 956 | defer watchMutex.Unlock() | 933 | defer watchMutex.Unlock() |
2282 | 957 | zk.watchChannels[watchId] = nil, false | 934 | conn.watchChannels[watchId] = nil, false |
2283 | 958 | watchZooKeepers[watchId] = nil, false | 935 | watchConns[watchId] = nil, false |
2284 | 959 | } | 936 | } |
2285 | 960 | 937 | ||
2288 | 961 | // closeAllWatches closes all watch channels for zk. | 938 | // closeAllWatches closes all watch channels for conn. |
2289 | 962 | func (zk *ZooKeeper) closeAllWatches() { | 939 | func (conn *Conn) closeAllWatches() { |
2290 | 963 | watchMutex.Lock() | 940 | watchMutex.Lock() |
2291 | 964 | defer watchMutex.Unlock() | 941 | defer watchMutex.Unlock() |
2293 | 965 | for watchId, ch := range zk.watchChannels { | 942 | for watchId, ch := range conn.watchChannels { |
2294 | 966 | close(ch) | 943 | close(ch) |
2297 | 967 | zk.watchChannels[watchId] = nil, false | 944 | conn.watchChannels[watchId] = nil, false |
2298 | 968 | watchZooKeepers[watchId] = nil, false | 945 | watchConns[watchId] = nil, false |
2299 | 969 | } | 946 | } |
2300 | 970 | } | 947 | } |
2301 | 971 | 948 | ||
2302 | @@ -978,11 +955,11 @@ | |||
2303 | 978 | } | 955 | } |
2304 | 979 | watchMutex.Lock() | 956 | watchMutex.Lock() |
2305 | 980 | defer watchMutex.Unlock() | 957 | defer watchMutex.Unlock() |
2307 | 981 | zk, ok := watchZooKeepers[watchId] | 958 | conn, ok := watchConns[watchId] |
2308 | 982 | if !ok { | 959 | if !ok { |
2309 | 983 | return | 960 | return |
2310 | 984 | } | 961 | } |
2312 | 985 | if event.Type == EVENT_SESSION && watchId != zk.sessionWatchId { | 962 | if event.Type == EVENT_SESSION && watchId != conn.sessionWatchId { |
2313 | 986 | switch event.State { | 963 | switch event.State { |
2314 | 987 | case STATE_EXPIRED_SESSION, STATE_AUTH_FAILED: | 964 | case STATE_EXPIRED_SESSION, STATE_AUTH_FAILED: |
2315 | 988 | default: | 965 | default: |
2316 | @@ -990,7 +967,7 @@ | |||
2317 | 990 | return | 967 | return |
2318 | 991 | } | 968 | } |
2319 | 992 | } | 969 | } |
2321 | 993 | ch := zk.watchChannels[watchId] | 970 | ch := conn.watchChannels[watchId] |
2322 | 994 | if ch == nil { | 971 | if ch == nil { |
2323 | 995 | return | 972 | return |
2324 | 996 | } | 973 | } |
2325 | @@ -1002,15 +979,15 @@ | |||
2326 | 1002 | // straight to the buffer), and the application isn't paying | 979 | // straight to the buffer), and the application isn't paying |
2327 | 1003 | // attention for long enough to have the buffer filled up. | 980 | // attention for long enough to have the buffer filled up. |
2328 | 1004 | // Break down now rather than leaking forever. | 981 | // Break down now rather than leaking forever. |
2330 | 1005 | if watchId == zk.sessionWatchId { | 982 | if watchId == conn.sessionWatchId { |
2331 | 1006 | panic("Session event channel buffer is full") | 983 | panic("Session event channel buffer is full") |
2332 | 1007 | } else { | 984 | } else { |
2333 | 1008 | panic("Watch event channel buffer is full") | 985 | panic("Watch event channel buffer is full") |
2334 | 1009 | } | 986 | } |
2335 | 1010 | } | 987 | } |
2339 | 1011 | if watchId != zk.sessionWatchId { | 988 | if watchId != conn.sessionWatchId { |
2340 | 1012 | zk.watchChannels[watchId] = nil, false | 989 | conn.watchChannels[watchId] = nil, false |
2341 | 1013 | watchZooKeepers[watchId] = nil, false | 990 | watchConns[watchId] = nil, false |
2342 | 1014 | close(ch) | 991 | close(ch) |
2343 | 1015 | } | 992 | } |
2344 | 1016 | } | 993 | } |
2345 | 1017 | 994 | ||
2346 | === renamed file 'gozk_test.go' => 'zk_test.go' | |||
2347 | --- gozk_test.go 2011-08-19 01:51:37 +0000 | |||
2348 | +++ zk_test.go 2011-10-03 17:02:23 +0000 | |||
2349 | @@ -1,17 +1,17 @@ | |||
2351 | 1 | package gozk_test | 1 | package zk_test |
2352 | 2 | 2 | ||
2353 | 3 | import ( | 3 | import ( |
2354 | 4 | . "launchpad.net/gocheck" | 4 | . "launchpad.net/gocheck" |
2356 | 5 | "gozk" | 5 | "launchpad.net/gozk/zk" |
2357 | 6 | "time" | 6 | "time" |
2358 | 7 | ) | 7 | ) |
2359 | 8 | 8 | ||
2360 | 9 | // This error will be delivered via C errno, since ZK unfortunately | 9 | // This error will be delivered via C errno, since ZK unfortunately |
2361 | 10 | // only provides the handler back from zookeeper_init(). | 10 | // only provides the handler back from zookeeper_init(). |
2362 | 11 | func (s *S) TestInitErrorThroughErrno(c *C) { | 11 | func (s *S) TestInitErrorThroughErrno(c *C) { |
2366 | 12 | zk, watch, err := gozk.Init("bad-domain-without-port", 5e9) | 12 | conn, watch, err := zk.Dial("bad-domain-without-port", 5e9) |
2367 | 13 | if zk != nil { | 13 | if conn != nil { |
2368 | 14 | zk.Close() | 14 | conn.Close() |
2369 | 15 | } | 15 | } |
2370 | 16 | if watch != nil { | 16 | if watch != nil { |
2371 | 17 | go func() { | 17 | go func() { |
2372 | @@ -23,15 +23,15 @@ | |||
2373 | 23 | } | 23 | } |
2374 | 24 | }() | 24 | }() |
2375 | 25 | } | 25 | } |
2377 | 26 | c.Assert(zk, IsNil) | 26 | c.Assert(conn, IsNil) |
2378 | 27 | c.Assert(watch, IsNil) | 27 | c.Assert(watch, IsNil) |
2379 | 28 | c.Assert(err, Matches, "invalid argument") | 28 | c.Assert(err, Matches, "invalid argument") |
2380 | 29 | } | 29 | } |
2381 | 30 | 30 | ||
2382 | 31 | func (s *S) TestRecvTimeoutInitParameter(c *C) { | 31 | func (s *S) TestRecvTimeoutInitParameter(c *C) { |
2384 | 32 | zk, watch, err := gozk.Init(s.zkAddr, 0) | 32 | conn, watch, err := zk.Dial(s.zkAddr, 0) |
2385 | 33 | c.Assert(err, IsNil) | 33 | c.Assert(err, IsNil) |
2387 | 34 | defer zk.Close() | 34 | defer conn.Close() |
2388 | 35 | 35 | ||
2389 | 36 | select { | 36 | select { |
2390 | 37 | case <-watch: | 37 | case <-watch: |
2391 | @@ -40,7 +40,7 @@ | |||
2392 | 40 | } | 40 | } |
2393 | 41 | 41 | ||
2394 | 42 | for i := 0; i != 1000; i++ { | 42 | for i := 0; i != 1000; i++ { |
2396 | 43 | _, _, err := zk.Get("/zookeeper") | 43 | _, _, err := conn.Get("/zookeeper") |
2397 | 44 | if err != nil { | 44 | if err != nil { |
2398 | 45 | c.Assert(err, Matches, "operation timeout") | 45 | c.Assert(err, Matches, "operation timeout") |
2399 | 46 | c.SucceedNow() | 46 | c.SucceedNow() |
2400 | @@ -51,76 +51,79 @@ | |||
2401 | 51 | } | 51 | } |
2402 | 52 | 52 | ||
2403 | 53 | func (s *S) TestSessionWatches(c *C) { | 53 | func (s *S) TestSessionWatches(c *C) { |
2405 | 54 | c.Assert(gozk.CountPendingWatches(), Equals, 0) | 54 | c.Assert(zk.CountPendingWatches(), Equals, 0) |
2406 | 55 | 55 | ||
2407 | 56 | zk1, watch1 := s.init(c) | 56 | zk1, watch1 := s.init(c) |
2408 | 57 | zk2, watch2 := s.init(c) | 57 | zk2, watch2 := s.init(c) |
2409 | 58 | zk3, watch3 := s.init(c) | 58 | zk3, watch3 := s.init(c) |
2410 | 59 | 59 | ||
2412 | 60 | c.Assert(gozk.CountPendingWatches(), Equals, 3) | 60 | c.Assert(zk.CountPendingWatches(), Equals, 3) |
2413 | 61 | 61 | ||
2414 | 62 | event1 := <-watch1 | 62 | event1 := <-watch1 |
2417 | 63 | c.Assert(event1.Type, Equals, gozk.EVENT_SESSION) | 63 | c.Assert(event1.Type, Equals, zk.EVENT_SESSION) |
2418 | 64 | c.Assert(event1.State, Equals, gozk.STATE_CONNECTED) | 64 | c.Assert(event1.State, Equals, zk.STATE_CONNECTED) |
2419 | 65 | 65 | ||
2421 | 66 | c.Assert(gozk.CountPendingWatches(), Equals, 3) | 66 | c.Assert(zk.CountPendingWatches(), Equals, 3) |
2422 | 67 | 67 | ||
2423 | 68 | event2 := <-watch2 | 68 | event2 := <-watch2 |
2426 | 69 | c.Assert(event2.Type, Equals, gozk.EVENT_SESSION) | 69 | c.Assert(event2.Type, Equals, zk.EVENT_SESSION) |
2427 | 70 | c.Assert(event2.State, Equals, gozk.STATE_CONNECTED) | 70 | c.Assert(event2.State, Equals, zk.STATE_CONNECTED) |
2428 | 71 | 71 | ||
2430 | 72 | c.Assert(gozk.CountPendingWatches(), Equals, 3) | 72 | c.Assert(zk.CountPendingWatches(), Equals, 3) |
2431 | 73 | 73 | ||
2432 | 74 | event3 := <-watch3 | 74 | event3 := <-watch3 |
2435 | 75 | c.Assert(event3.Type, Equals, gozk.EVENT_SESSION) | 75 | c.Assert(event3.Type, Equals, zk.EVENT_SESSION) |
2436 | 76 | c.Assert(event3.State, Equals, gozk.STATE_CONNECTED) | 76 | c.Assert(event3.State, Equals, zk.STATE_CONNECTED) |
2437 | 77 | 77 | ||
2439 | 78 | c.Assert(gozk.CountPendingWatches(), Equals, 3) | 78 | c.Assert(zk.CountPendingWatches(), Equals, 3) |
2440 | 79 | 79 | ||
2441 | 80 | zk1.Close() | 80 | zk1.Close() |
2443 | 81 | c.Assert(gozk.CountPendingWatches(), Equals, 2) | 81 | c.Assert(zk.CountPendingWatches(), Equals, 2) |
2444 | 82 | zk2.Close() | 82 | zk2.Close() |
2446 | 83 | c.Assert(gozk.CountPendingWatches(), Equals, 1) | 83 | c.Assert(zk.CountPendingWatches(), Equals, 1) |
2447 | 84 | zk3.Close() | 84 | zk3.Close() |
2449 | 85 | c.Assert(gozk.CountPendingWatches(), Equals, 0) | 85 | c.Assert(zk.CountPendingWatches(), Equals, 0) |
2450 | 86 | } | 86 | } |
2451 | 87 | 87 | ||
2453 | 88 | // Gozk injects a STATE_CLOSED event when zk.Close() is called, right | 88 | // Gozk injects a STATE_CLOSED event when conn.Close() is called, right |
2454 | 89 | // before the channel is closed. Closing the channel injects a nil | 89 | // before the channel is closed. Closing the channel injects a nil |
2455 | 90 | // pointer, as usual for Go, so the STATE_CLOSED gives a chance to | 90 | // pointer, as usual for Go, so the STATE_CLOSED gives a chance to |
2456 | 91 | // know that a nil pointer is coming, and to stop the procedure. | 91 | // know that a nil pointer is coming, and to stop the procedure. |
2457 | 92 | // Hopefully this procedure will avoid some nil-pointer references by | 92 | // Hopefully this procedure will avoid some nil-pointer references by |
2458 | 93 | // mistake. | 93 | // mistake. |
2459 | 94 | func (s *S) TestClosingStateInSessionWatch(c *C) { | 94 | func (s *S) TestClosingStateInSessionWatch(c *C) { |
2461 | 95 | zk, watch := s.init(c) | 95 | conn, watch := s.init(c) |
2462 | 96 | 96 | ||
2463 | 97 | event := <-watch | 97 | event := <-watch |
2466 | 98 | c.Assert(event.Type, Equals, gozk.EVENT_SESSION) | 98 | c.Assert(event.Type, Equals, zk.EVENT_SESSION) |
2467 | 99 | c.Assert(event.State, Equals, gozk.STATE_CONNECTED) | 99 | c.Assert(event.State, Equals, zk.STATE_CONNECTED) |
2468 | 100 | 100 | ||
2470 | 101 | zk.Close() | 101 | conn.Close() |
2471 | 102 | event, ok := <-watch | 102 | event, ok := <-watch |
2472 | 103 | c.Assert(ok, Equals, false) | 103 | c.Assert(ok, Equals, false) |
2475 | 104 | c.Assert(event.Type, Equals, gozk.EVENT_CLOSED) | 104 | c.Assert(event.Type, Equals, zk.EVENT_CLOSED) |
2476 | 105 | c.Assert(event.State, Equals, gozk.STATE_CLOSED) | 105 | c.Assert(event.State, Equals, zk.STATE_CLOSED) |
2477 | 106 | } | 106 | } |
2478 | 107 | 107 | ||
2479 | 108 | func (s *S) TestEventString(c *C) { | 108 | func (s *S) TestEventString(c *C) { |
2482 | 109 | var event gozk.Event | 109 | var event zk.Event |
2483 | 110 | event = gozk.Event{gozk.EVENT_SESSION, "/path", gozk.STATE_CONNECTED} | 110 | event = zk.Event{zk.EVENT_SESSION, "/path", zk.STATE_CONNECTED} |
2484 | 111 | c.Assert(event, Matches, "ZooKeeper connected") | 111 | c.Assert(event, Matches, "ZooKeeper connected") |
2486 | 112 | event = gozk.Event{gozk.EVENT_CREATED, "/path", gozk.STATE_CONNECTED} | 112 | event = zk.Event{zk.EVENT_CREATED, "/path", zk.STATE_CONNECTED} |
2487 | 113 | c.Assert(event, Matches, "ZooKeeper connected; path created: /path") | 113 | c.Assert(event, Matches, "ZooKeeper connected; path created: /path") |
2489 | 114 | event = gozk.Event{-1, "/path", gozk.STATE_CLOSED} | 114 | event = zk.Event{-1, "/path", zk.STATE_CLOSED} |
2490 | 115 | c.Assert(event, Matches, "ZooKeeper connection closed") | 115 | c.Assert(event, Matches, "ZooKeeper connection closed") |
2491 | 116 | } | 116 | } |
2492 | 117 | 117 | ||
2499 | 118 | var okTests = []struct{gozk.Event; Ok bool}{ | 118 | var okTests = []struct { |
2500 | 119 | {gozk.Event{gozk.EVENT_SESSION, "", gozk.STATE_CONNECTED}, true}, | 119 | zk.Event |
2501 | 120 | {gozk.Event{gozk.EVENT_CREATED, "", gozk.STATE_CONNECTED}, true}, | 120 | Ok bool |
2502 | 121 | {gozk.Event{0, "", gozk.STATE_CLOSED}, false}, | 121 | }{ |
2503 | 122 | {gozk.Event{0, "", gozk.STATE_EXPIRED_SESSION}, false}, | 122 | {zk.Event{zk.EVENT_SESSION, "", zk.STATE_CONNECTED}, true}, |
2504 | 123 | {gozk.Event{0, "", gozk.STATE_AUTH_FAILED}, false}, | 123 | {zk.Event{zk.EVENT_CREATED, "", zk.STATE_CONNECTED}, true}, |
2505 | 124 | {zk.Event{0, "", zk.STATE_CLOSED}, false}, | ||
2506 | 125 | {zk.Event{0, "", zk.STATE_EXPIRED_SESSION}, false}, | ||
2507 | 126 | {zk.Event{0, "", zk.STATE_AUTH_FAILED}, false}, | ||
2508 | 124 | } | 127 | } |
2509 | 125 | 128 | ||
2510 | 126 | func (s *S) TestEventOk(c *C) { | 129 | func (s *S) TestEventOk(c *C) { |
2511 | @@ -130,9 +133,9 @@ | |||
2512 | 130 | } | 133 | } |
2513 | 131 | 134 | ||
2514 | 132 | func (s *S) TestGetAndStat(c *C) { | 135 | func (s *S) TestGetAndStat(c *C) { |
2516 | 133 | zk, _ := s.init(c) | 136 | conn, _ := s.init(c) |
2517 | 134 | 137 | ||
2519 | 135 | data, stat, err := zk.Get("/zookeeper") | 138 | data, stat, err := conn.Get("/zookeeper") |
2520 | 136 | c.Assert(err, IsNil) | 139 | c.Assert(err, IsNil) |
2521 | 137 | c.Assert(data, Equals, "") | 140 | c.Assert(data, Equals, "") |
2522 | 138 | c.Assert(stat.Czxid(), Equals, int64(0)) | 141 | c.Assert(stat.Czxid(), Equals, int64(0)) |
2523 | @@ -149,58 +152,58 @@ | |||
2524 | 149 | } | 152 | } |
2525 | 150 | 153 | ||
2526 | 151 | func (s *S) TestGetAndError(c *C) { | 154 | func (s *S) TestGetAndError(c *C) { |
2528 | 152 | zk, _ := s.init(c) | 155 | conn, _ := s.init(c) |
2529 | 153 | 156 | ||
2531 | 154 | data, stat, err := zk.Get("/non-existent") | 157 | data, stat, err := conn.Get("/non-existent") |
2532 | 155 | 158 | ||
2533 | 156 | c.Assert(data, Equals, "") | 159 | c.Assert(data, Equals, "") |
2534 | 157 | c.Assert(stat, IsNil) | 160 | c.Assert(stat, IsNil) |
2535 | 158 | c.Assert(err, Matches, "no node") | 161 | c.Assert(err, Matches, "no node") |
2537 | 159 | c.Assert(err.Code(), Equals, gozk.ZNONODE) | 162 | c.Assert(err, Equals, zk.ZNONODE) |
2538 | 160 | } | 163 | } |
2539 | 161 | 164 | ||
2540 | 162 | func (s *S) TestCreateAndGet(c *C) { | 165 | func (s *S) TestCreateAndGet(c *C) { |
2542 | 163 | zk, _ := s.init(c) | 166 | conn, _ := s.init(c) |
2543 | 164 | 167 | ||
2545 | 165 | path, err := zk.Create("/test-", "bababum", gozk.SEQUENCE|gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) | 168 | path, err := conn.Create("/test-", "bababum", zk.SEQUENCE|zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2546 | 166 | c.Assert(err, IsNil) | 169 | c.Assert(err, IsNil) |
2547 | 167 | c.Assert(path, Matches, "/test-[0-9]+") | 170 | c.Assert(path, Matches, "/test-[0-9]+") |
2548 | 168 | 171 | ||
2549 | 169 | // Check the error condition from Create(). | 172 | // Check the error condition from Create(). |
2551 | 170 | _, err = zk.Create(path, "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) | 173 | _, err = conn.Create(path, "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2552 | 171 | c.Assert(err, Matches, "node exists") | 174 | c.Assert(err, Matches, "node exists") |
2553 | 172 | 175 | ||
2555 | 173 | data, _, err := zk.Get(path) | 176 | data, _, err := conn.Get(path) |
2556 | 174 | c.Assert(err, IsNil) | 177 | c.Assert(err, IsNil) |
2557 | 175 | c.Assert(data, Equals, "bababum") | 178 | c.Assert(data, Equals, "bababum") |
2558 | 176 | } | 179 | } |
2559 | 177 | 180 | ||
2560 | 178 | func (s *S) TestCreateSetAndGet(c *C) { | 181 | func (s *S) TestCreateSetAndGet(c *C) { |
2562 | 179 | zk, _ := s.init(c) | 182 | conn, _ := s.init(c) |
2563 | 180 | 183 | ||
2565 | 181 | _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) | 184 | _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2566 | 182 | c.Assert(err, IsNil) | 185 | c.Assert(err, IsNil) |
2567 | 183 | 186 | ||
2569 | 184 | stat, err := zk.Set("/test", "bababum", -1) // Any version. | 187 | stat, err := conn.Set("/test", "bababum", -1) // Any version. |
2570 | 185 | c.Assert(err, IsNil) | 188 | c.Assert(err, IsNil) |
2571 | 186 | c.Assert(stat.Version(), Equals, int32(1)) | 189 | c.Assert(stat.Version(), Equals, int32(1)) |
2572 | 187 | 190 | ||
2574 | 188 | data, _, err := zk.Get("/test") | 191 | data, _, err := conn.Get("/test") |
2575 | 189 | c.Assert(err, IsNil) | 192 | c.Assert(err, IsNil) |
2576 | 190 | c.Assert(data, Equals, "bababum") | 193 | c.Assert(data, Equals, "bababum") |
2577 | 191 | } | 194 | } |
2578 | 192 | 195 | ||
2579 | 193 | func (s *S) TestGetAndWatch(c *C) { | 196 | func (s *S) TestGetAndWatch(c *C) { |
2587 | 194 | c.Check(gozk.CountPendingWatches(), Equals, 0) | 197 | c.Check(zk.CountPendingWatches(), Equals, 0) |
2588 | 195 | 198 | ||
2589 | 196 | zk, _ := s.init(c) | 199 | conn, _ := s.init(c) |
2590 | 197 | 200 | ||
2591 | 198 | c.Check(gozk.CountPendingWatches(), Equals, 1) | 201 | c.Check(zk.CountPendingWatches(), Equals, 1) |
2592 | 199 | 202 | ||
2593 | 200 | _, err := zk.Create("/test", "one", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) | 203 | _, err := conn.Create("/test", "one", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2594 | 201 | c.Assert(err, IsNil) | 204 | c.Assert(err, IsNil) |
2595 | 202 | 205 | ||
2597 | 203 | data, stat, watch, err := zk.GetW("/test") | 206 | data, stat, watch, err := conn.GetW("/test") |
2598 | 204 | c.Assert(err, IsNil) | 207 | c.Assert(err, IsNil) |
2599 | 205 | c.Assert(data, Equals, "one") | 208 | c.Assert(data, Equals, "one") |
2600 | 206 | c.Assert(stat.Version(), Equals, int32(0)) | 209 | c.Assert(stat.Version(), Equals, int32(0)) |
2601 | @@ -211,17 +214,17 @@ | |||
2602 | 211 | default: | 214 | default: |
2603 | 212 | } | 215 | } |
2604 | 213 | 216 | ||
2606 | 214 | c.Check(gozk.CountPendingWatches(), Equals, 2) | 217 | c.Check(zk.CountPendingWatches(), Equals, 2) |
2607 | 215 | 218 | ||
2609 | 216 | _, err = zk.Set("/test", "two", -1) | 219 | _, err = conn.Set("/test", "two", -1) |
2610 | 217 | c.Assert(err, IsNil) | 220 | c.Assert(err, IsNil) |
2611 | 218 | 221 | ||
2612 | 219 | event := <-watch | 222 | event := <-watch |
2618 | 220 | c.Assert(event.Type, Equals, gozk.EVENT_CHANGED) | 223 | c.Assert(event.Type, Equals, zk.EVENT_CHANGED) |
2619 | 221 | 224 | ||
2620 | 222 | c.Check(gozk.CountPendingWatches(), Equals, 1) | 225 | c.Check(zk.CountPendingWatches(), Equals, 1) |
2621 | 223 | 226 | ||
2622 | 224 | data, _, watch, err = zk.GetW("/test") | 227 | data, _, watch, err = conn.GetW("/test") |
2623 | 225 | c.Assert(err, IsNil) | 228 | c.Assert(err, IsNil) |
2624 | 226 | c.Assert(data, Equals, "two") | 229 | c.Assert(data, Equals, "two") |
2625 | 227 | 230 | ||
2626 | @@ -231,86 +234,86 @@ | |||
2627 | 231 | default: | 234 | default: |
2628 | 232 | } | 235 | } |
2629 | 233 | 236 | ||
2631 | 234 | c.Check(gozk.CountPendingWatches(), Equals, 2) | 237 | c.Check(zk.CountPendingWatches(), Equals, 2) |
2632 | 235 | 238 | ||
2634 | 236 | _, err = zk.Set("/test", "three", -1) | 239 | _, err = conn.Set("/test", "three", -1) |
2635 | 237 | c.Assert(err, IsNil) | 240 | c.Assert(err, IsNil) |
2636 | 238 | 241 | ||
2637 | 239 | event = <-watch | 242 | event = <-watch |
2639 | 240 | c.Assert(event.Type, Equals, gozk.EVENT_CHANGED) | 243 | c.Assert(event.Type, Equals, zk.EVENT_CHANGED) |
2640 | 241 | 244 | ||
2642 | 242 | c.Check(gozk.CountPendingWatches(), Equals, 1) | 245 | c.Check(zk.CountPendingWatches(), Equals, 1) |
2643 | 243 | } | 246 | } |
2644 | 244 | 247 | ||
2645 | 245 | func (s *S) TestGetAndWatchWithError(c *C) { | 248 | func (s *S) TestGetAndWatchWithError(c *C) { |
2653 | 246 | c.Check(gozk.CountPendingWatches(), Equals, 0) | 249 | c.Check(zk.CountPendingWatches(), Equals, 0) |
2654 | 247 | 250 | ||
2655 | 248 | zk, _ := s.init(c) | 251 | conn, _ := s.init(c) |
2656 | 249 | 252 | ||
2657 | 250 | c.Check(gozk.CountPendingWatches(), Equals, 1) | 253 | c.Check(zk.CountPendingWatches(), Equals, 1) |
2658 | 251 | 254 | ||
2659 | 252 | _, _, watch, err := zk.GetW("/test") | 255 | _, _, watch, err := conn.GetW("/test") |
2660 | 253 | c.Assert(err, NotNil) | 256 | c.Assert(err, NotNil) |
2662 | 254 | c.Assert(err.Code(), Equals, gozk.ZNONODE) | 257 | c.Assert(err, Equals, zk.ZNONODE) |
2663 | 255 | c.Assert(watch, IsNil) | 258 | c.Assert(watch, IsNil) |
2664 | 256 | 259 | ||
2666 | 257 | c.Check(gozk.CountPendingWatches(), Equals, 1) | 260 | c.Check(zk.CountPendingWatches(), Equals, 1) |
2667 | 258 | } | 261 | } |
2668 | 259 | 262 | ||
2669 | 260 | func (s *S) TestCloseReleasesWatches(c *C) { | 263 | func (s *S) TestCloseReleasesWatches(c *C) { |
2687 | 261 | c.Check(gozk.CountPendingWatches(), Equals, 0) | 264 | c.Check(zk.CountPendingWatches(), Equals, 0) |
2688 | 262 | 265 | ||
2689 | 263 | zk, _ := s.init(c) | 266 | conn, _ := s.init(c) |
2690 | 264 | 267 | ||
2691 | 265 | c.Check(gozk.CountPendingWatches(), Equals, 1) | 268 | c.Check(zk.CountPendingWatches(), Equals, 1) |
2692 | 266 | 269 | ||
2693 | 267 | _, err := zk.Create("/test", "one", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) | 270 | _, err := conn.Create("/test", "one", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2694 | 268 | c.Assert(err, IsNil) | 271 | c.Assert(err, IsNil) |
2695 | 269 | 272 | ||
2696 | 270 | _, _, _, err = zk.GetW("/test") | 273 | _, _, _, err = conn.GetW("/test") |
2697 | 271 | c.Assert(err, IsNil) | 274 | c.Assert(err, IsNil) |
2698 | 272 | 275 | ||
2699 | 273 | c.Assert(gozk.CountPendingWatches(), Equals, 2) | 276 | c.Assert(zk.CountPendingWatches(), Equals, 2) |
2700 | 274 | 277 | ||
2701 | 275 | zk.Close() | 278 | conn.Close() |
2702 | 276 | 279 | ||
2703 | 277 | c.Assert(gozk.CountPendingWatches(), Equals, 0) | 280 | c.Assert(zk.CountPendingWatches(), Equals, 0) |
2704 | 278 | } | 281 | } |
2705 | 279 | 282 | ||
2706 | 280 | // By default, the ZooKeeper C client will hang indefinitely if a | 283 | // By default, the ZooKeeper C client will hang indefinitely if a |
2707 | 281 | // handler is closed twice. We get in the way and prevent it. | 284 | // handler is closed twice. We get in the way and prevent it. |
2708 | 282 | func (s *S) TestClosingTwiceDoesntHang(c *C) { | 285 | func (s *S) TestClosingTwiceDoesntHang(c *C) { |
2711 | 283 | zk, _ := s.init(c) | 286 | conn, _ := s.init(c) |
2712 | 284 | err := zk.Close() | 287 | err := conn.Close() |
2713 | 285 | c.Assert(err, IsNil) | 288 | c.Assert(err, IsNil) |
2715 | 286 | err = zk.Close() | 289 | err = conn.Close() |
2716 | 287 | c.Assert(err, NotNil) | 290 | c.Assert(err, NotNil) |
2718 | 288 | c.Assert(err.Code(), Equals, gozk.ZCLOSING) | 291 | c.Assert(err, Equals, zk.ZCLOSING) |
2719 | 289 | } | 292 | } |
2720 | 290 | 293 | ||
2721 | 291 | func (s *S) TestChildren(c *C) { | 294 | func (s *S) TestChildren(c *C) { |
2723 | 292 | zk, _ := s.init(c) | 295 | conn, _ := s.init(c) |
2724 | 293 | 296 | ||
2726 | 294 | children, stat, err := zk.Children("/") | 297 | children, stat, err := conn.Children("/") |
2727 | 295 | c.Assert(err, IsNil) | 298 | c.Assert(err, IsNil) |
2728 | 296 | c.Assert(children, Equals, []string{"zookeeper"}) | 299 | c.Assert(children, Equals, []string{"zookeeper"}) |
2729 | 297 | c.Assert(stat.NumChildren(), Equals, int32(1)) | 300 | c.Assert(stat.NumChildren(), Equals, int32(1)) |
2730 | 298 | 301 | ||
2732 | 299 | children, stat, err = zk.Children("/non-existent") | 302 | children, stat, err = conn.Children("/non-existent") |
2733 | 300 | c.Assert(err, NotNil) | 303 | c.Assert(err, NotNil) |
2735 | 301 | c.Assert(err.Code(), Equals, gozk.ZNONODE) | 304 | c.Assert(err, Equals, zk.ZNONODE) |
2736 | 302 | c.Assert(children, Equals, []string{}) | 305 | c.Assert(children, Equals, []string{}) |
2738 | 303 | c.Assert(stat, Equals, nil) | 306 | c.Assert(stat, IsNil) |
2739 | 304 | } | 307 | } |
2740 | 305 | 308 | ||
2741 | 306 | func (s *S) TestChildrenAndWatch(c *C) { | 309 | func (s *S) TestChildrenAndWatch(c *C) { |
2749 | 307 | c.Check(gozk.CountPendingWatches(), Equals, 0) | 310 | c.Check(zk.CountPendingWatches(), Equals, 0) |
2750 | 308 | 311 | ||
2751 | 309 | zk, _ := s.init(c) | 312 | conn, _ := s.init(c) |
2752 | 310 | 313 | ||
2753 | 311 | c.Check(gozk.CountPendingWatches(), Equals, 1) | 314 | c.Check(zk.CountPendingWatches(), Equals, 1) |
2754 | 312 | 315 | ||
2755 | 313 | children, stat, watch, err := zk.ChildrenW("/") | 316 | children, stat, watch, err := conn.ChildrenW("/") |
2756 | 314 | c.Assert(err, IsNil) | 317 | c.Assert(err, IsNil) |
2757 | 315 | c.Assert(children, Equals, []string{"zookeeper"}) | 318 | c.Assert(children, Equals, []string{"zookeeper"}) |
2758 | 316 | c.Assert(stat.NumChildren(), Equals, int32(1)) | 319 | c.Assert(stat.NumChildren(), Equals, int32(1)) |
2759 | @@ -321,18 +324,18 @@ | |||
2760 | 321 | default: | 324 | default: |
2761 | 322 | } | 325 | } |
2762 | 323 | 326 | ||
2764 | 324 | c.Check(gozk.CountPendingWatches(), Equals, 2) | 327 | c.Check(zk.CountPendingWatches(), Equals, 2) |
2765 | 325 | 328 | ||
2767 | 326 | _, err = zk.Create("/test1", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) | 329 | _, err = conn.Create("/test1", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2768 | 327 | c.Assert(err, IsNil) | 330 | c.Assert(err, IsNil) |
2769 | 328 | 331 | ||
2770 | 329 | event := <-watch | 332 | event := <-watch |
2772 | 330 | c.Assert(event.Type, Equals, gozk.EVENT_CHILD) | 333 | c.Assert(event.Type, Equals, zk.EVENT_CHILD) |
2773 | 331 | c.Assert(event.Path, Equals, "/") | 334 | c.Assert(event.Path, Equals, "/") |
2774 | 332 | 335 | ||
2776 | 333 | c.Check(gozk.CountPendingWatches(), Equals, 1) | 336 | c.Check(zk.CountPendingWatches(), Equals, 1) |
2777 | 334 | 337 | ||
2779 | 335 | children, stat, watch, err = zk.ChildrenW("/") | 338 | children, stat, watch, err = conn.ChildrenW("/") |
2780 | 336 | c.Assert(err, IsNil) | 339 | c.Assert(err, IsNil) |
2781 | 337 | c.Assert(stat.NumChildren(), Equals, int32(2)) | 340 | c.Assert(stat.NumChildren(), Equals, int32(2)) |
2782 | 338 | 341 | ||
2783 | @@ -345,57 +348,56 @@ | |||
2784 | 345 | default: | 348 | default: |
2785 | 346 | } | 349 | } |
2786 | 347 | 350 | ||
2788 | 348 | c.Check(gozk.CountPendingWatches(), Equals, 2) | 351 | c.Check(zk.CountPendingWatches(), Equals, 2) |
2789 | 349 | 352 | ||
2791 | 350 | _, err = zk.Create("/test2", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) | 353 | _, err = conn.Create("/test2", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2792 | 351 | c.Assert(err, IsNil) | 354 | c.Assert(err, IsNil) |
2793 | 352 | 355 | ||
2794 | 353 | event = <-watch | 356 | event = <-watch |
2796 | 354 | c.Assert(event.Type, Equals, gozk.EVENT_CHILD) | 357 | c.Assert(event.Type, Equals, zk.EVENT_CHILD) |
2797 | 355 | 358 | ||
2799 | 356 | c.Check(gozk.CountPendingWatches(), Equals, 1) | 359 | c.Check(zk.CountPendingWatches(), Equals, 1) |
2800 | 357 | } | 360 | } |
2801 | 358 | 361 | ||
2802 | 359 | func (s *S) TestChildrenAndWatchWithError(c *C) { | 362 | func (s *S) TestChildrenAndWatchWithError(c *C) { |
2810 | 360 | c.Check(gozk.CountPendingWatches(), Equals, 0) | 363 | c.Check(zk.CountPendingWatches(), Equals, 0) |
2811 | 361 | 364 | ||
2812 | 362 | zk, _ := s.init(c) | 365 | conn, _ := s.init(c) |
2813 | 363 | 366 | ||
2814 | 364 | c.Check(gozk.CountPendingWatches(), Equals, 1) | 367 | c.Check(zk.CountPendingWatches(), Equals, 1) |
2815 | 365 | 368 | ||
2816 | 366 | _, stat, watch, err := zk.ChildrenW("/test") | 369 | _, stat, watch, err := conn.ChildrenW("/test") |
2817 | 367 | c.Assert(err, NotNil) | 370 | c.Assert(err, NotNil) |
2819 | 368 | c.Assert(err.Code(), Equals, gozk.ZNONODE) | 371 | c.Assert(err, Equals, zk.ZNONODE) |
2820 | 369 | c.Assert(watch, IsNil) | 372 | c.Assert(watch, IsNil) |
2821 | 370 | c.Assert(stat, IsNil) | 373 | c.Assert(stat, IsNil) |
2822 | 371 | 374 | ||
2824 | 372 | c.Check(gozk.CountPendingWatches(), Equals, 1) | 375 | c.Check(zk.CountPendingWatches(), Equals, 1) |
2825 | 373 | } | 376 | } |
2826 | 374 | 377 | ||
2827 | 375 | func (s *S) TestExists(c *C) { | 378 | func (s *S) TestExists(c *C) { |
2835 | 376 | zk, _ := s.init(c) | 379 | conn, _ := s.init(c) |
2836 | 377 | 380 | ||
2837 | 378 | stat, err := zk.Exists("/zookeeper") | 381 | stat, err := conn.Exists("/non-existent") |
2831 | 379 | c.Assert(err, IsNil) | ||
2832 | 380 | c.Assert(stat.NumChildren(), Equals, int32(1)) | ||
2833 | 381 | |||
2834 | 382 | stat, err = zk.Exists("/non-existent") | ||
2838 | 383 | c.Assert(err, IsNil) | 382 | c.Assert(err, IsNil) |
2839 | 384 | c.Assert(stat, IsNil) | 383 | c.Assert(stat, IsNil) |
2840 | 384 | |||
2841 | 385 | stat, err = conn.Exists("/zookeeper") | ||
2842 | 386 | c.Assert(err, IsNil) | ||
2843 | 385 | } | 387 | } |
2844 | 386 | 388 | ||
2845 | 387 | func (s *S) TestExistsAndWatch(c *C) { | 389 | func (s *S) TestExistsAndWatch(c *C) { |
2853 | 388 | c.Check(gozk.CountPendingWatches(), Equals, 0) | 390 | c.Check(zk.CountPendingWatches(), Equals, 0) |
2854 | 389 | 391 | ||
2855 | 390 | zk, _ := s.init(c) | 392 | conn, _ := s.init(c) |
2856 | 391 | 393 | ||
2857 | 392 | c.Check(gozk.CountPendingWatches(), Equals, 1) | 394 | c.Check(zk.CountPendingWatches(), Equals, 1) |
2858 | 393 | 395 | ||
2859 | 394 | stat, watch, err := zk.ExistsW("/test") | 396 | stat, watch, err := conn.ExistsW("/test") |
2860 | 395 | c.Assert(err, IsNil) | 397 | c.Assert(err, IsNil) |
2861 | 396 | c.Assert(stat, IsNil) | 398 | c.Assert(stat, IsNil) |
2862 | 397 | 399 | ||
2864 | 398 | c.Check(gozk.CountPendingWatches(), Equals, 2) | 400 | c.Check(zk.CountPendingWatches(), Equals, 2) |
2865 | 399 | 401 | ||
2866 | 400 | select { | 402 | select { |
2867 | 401 | case <-watch: | 403 | case <-watch: |
2868 | @@ -403,62 +405,62 @@ | |||
2869 | 403 | default: | 405 | default: |
2870 | 404 | } | 406 | } |
2871 | 405 | 407 | ||
2873 | 406 | _, err = zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) | 408 | _, err = conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2874 | 407 | c.Assert(err, IsNil) | 409 | c.Assert(err, IsNil) |
2875 | 408 | 410 | ||
2876 | 409 | event := <-watch | 411 | event := <-watch |
2878 | 410 | c.Assert(event.Type, Equals, gozk.EVENT_CREATED) | 412 | c.Assert(event.Type, Equals, zk.EVENT_CREATED) |
2879 | 411 | c.Assert(event.Path, Equals, "/test") | 413 | c.Assert(event.Path, Equals, "/test") |
2880 | 412 | 414 | ||
2882 | 413 | c.Check(gozk.CountPendingWatches(), Equals, 1) | 415 | c.Check(zk.CountPendingWatches(), Equals, 1) |
2883 | 414 | 416 | ||
2885 | 415 | stat, watch, err = zk.ExistsW("/test") | 417 | stat, watch, err = conn.ExistsW("/test") |
2886 | 416 | c.Assert(err, IsNil) | 418 | c.Assert(err, IsNil) |
2887 | 417 | c.Assert(stat, NotNil) | 419 | c.Assert(stat, NotNil) |
2888 | 418 | c.Assert(stat.NumChildren(), Equals, int32(0)) | 420 | c.Assert(stat.NumChildren(), Equals, int32(0)) |
2889 | 419 | 421 | ||
2891 | 420 | c.Check(gozk.CountPendingWatches(), Equals, 2) | 422 | c.Check(zk.CountPendingWatches(), Equals, 2) |
2892 | 421 | } | 423 | } |
2893 | 422 | 424 | ||
2894 | 423 | func (s *S) TestExistsAndWatchWithError(c *C) { | 425 | func (s *S) TestExistsAndWatchWithError(c *C) { |
2902 | 424 | c.Check(gozk.CountPendingWatches(), Equals, 0) | 426 | c.Check(zk.CountPendingWatches(), Equals, 0) |
2903 | 425 | 427 | ||
2904 | 426 | zk, _ := s.init(c) | 428 | conn, _ := s.init(c) |
2905 | 427 | 429 | ||
2906 | 428 | c.Check(gozk.CountPendingWatches(), Equals, 1) | 430 | c.Check(zk.CountPendingWatches(), Equals, 1) |
2907 | 429 | 431 | ||
2908 | 430 | stat, watch, err := zk.ExistsW("///") | 432 | stat, watch, err := conn.ExistsW("///") |
2909 | 431 | c.Assert(err, NotNil) | 433 | c.Assert(err, NotNil) |
2911 | 432 | c.Assert(err.Code(), Equals, gozk.ZBADARGUMENTS) | 434 | c.Assert(err, Equals, zk.ZBADARGUMENTS) |
2912 | 433 | c.Assert(stat, IsNil) | 435 | c.Assert(stat, IsNil) |
2913 | 434 | c.Assert(watch, IsNil) | 436 | c.Assert(watch, IsNil) |
2914 | 435 | 437 | ||
2916 | 436 | c.Check(gozk.CountPendingWatches(), Equals, 1) | 438 | c.Check(zk.CountPendingWatches(), Equals, 1) |
2917 | 437 | } | 439 | } |
2918 | 438 | 440 | ||
2919 | 439 | func (s *S) TestDelete(c *C) { | 441 | func (s *S) TestDelete(c *C) { |
2935 | 440 | zk, _ := s.init(c) | 442 | conn, _ := s.init(c) |
2936 | 441 | 443 | ||
2937 | 442 | _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) | 444 | _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2938 | 443 | c.Assert(err, IsNil) | 445 | c.Assert(err, IsNil) |
2939 | 444 | 446 | ||
2940 | 445 | err = zk.Delete("/test", 5) | 447 | err = conn.Delete("/test", 5) |
2941 | 446 | c.Assert(err, NotNil) | 448 | c.Assert(err, NotNil) |
2942 | 447 | c.Assert(err.Code(), Equals, gozk.ZBADVERSION) | 449 | c.Assert(err, Equals, zk.ZBADVERSION) |
2943 | 448 | 450 | ||
2944 | 449 | err = zk.Delete("/test", -1) | 451 | err = conn.Delete("/test", -1) |
2945 | 450 | c.Assert(err, IsNil) | 452 | c.Assert(err, IsNil) |
2946 | 451 | 453 | ||
2947 | 452 | err = zk.Delete("/test", -1) | 454 | err = conn.Delete("/test", -1) |
2948 | 453 | c.Assert(err, NotNil) | 455 | c.Assert(err, NotNil) |
2949 | 454 | c.Assert(err.Code(), Equals, gozk.ZNONODE) | 456 | c.Assert(err, Equals, zk.ZNONODE) |
2950 | 455 | } | 457 | } |
2951 | 456 | 458 | ||
2952 | 457 | func (s *S) TestClientIdAndReInit(c *C) { | 459 | func (s *S) TestClientIdAndReInit(c *C) { |
2953 | 458 | zk1, _ := s.init(c) | 460 | zk1, _ := s.init(c) |
2954 | 459 | clientId1 := zk1.ClientId() | 461 | clientId1 := zk1.ClientId() |
2955 | 460 | 462 | ||
2957 | 461 | zk2, _, err := gozk.ReInit(s.zkAddr, 5e9, clientId1) | 463 | zk2, _, err := zk.Redial(s.zkAddr, 5e9, clientId1) |
2958 | 462 | c.Assert(err, IsNil) | 464 | c.Assert(err, IsNil) |
2959 | 463 | defer zk2.Close() | 465 | defer zk2.Close() |
2960 | 464 | clientId2 := zk2.ClientId() | 466 | clientId2 := zk2.ClientId() |
2961 | @@ -469,110 +471,114 @@ | |||
2962 | 469 | // Surprisingly for some (including myself, initially), the watch | 471 | // Surprisingly for some (including myself, initially), the watch |
2963 | 470 | // returned by the exists method actually fires on data changes too. | 472 | // returned by the exists method actually fires on data changes too. |
2964 | 471 | func (s *S) TestExistsWatchOnDataChange(c *C) { | 473 | func (s *S) TestExistsWatchOnDataChange(c *C) { |
2974 | 472 | zk, _ := s.init(c) | 474 | conn, _ := s.init(c) |
2975 | 473 | 475 | ||
2976 | 474 | _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) | 476 | _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2977 | 475 | c.Assert(err, IsNil) | 477 | c.Assert(err, IsNil) |
2978 | 476 | 478 | ||
2979 | 477 | _, watch, err := zk.ExistsW("/test") | 479 | _, watch, err := conn.ExistsW("/test") |
2980 | 478 | c.Assert(err, IsNil) | 480 | c.Assert(err, IsNil) |
2981 | 479 | 481 | ||
2982 | 480 | _, err = zk.Set("/test", "new", -1) | 482 | _, err = conn.Set("/test", "new", -1) |
2983 | 481 | c.Assert(err, IsNil) | 483 | c.Assert(err, IsNil) |
2984 | 482 | 484 | ||
2985 | 483 | event := <-watch | 485 | event := <-watch |
2986 | 484 | 486 | ||
2987 | 485 | c.Assert(event.Path, Equals, "/test") | 487 | c.Assert(event.Path, Equals, "/test") |
2989 | 486 | c.Assert(event.Type, Equals, gozk.EVENT_CHANGED) | 488 | c.Assert(event.Type, Equals, zk.EVENT_CHANGED) |
2990 | 487 | } | 489 | } |
2991 | 488 | 490 | ||
2992 | 489 | func (s *S) TestACL(c *C) { | 491 | func (s *S) TestACL(c *C) { |
3001 | 490 | zk, _ := s.init(c) | 492 | conn, _ := s.init(c) |
3002 | 491 | 493 | ||
3003 | 492 | _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) | 494 | _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
3004 | 493 | c.Assert(err, IsNil) | 495 | c.Assert(err, IsNil) |
3005 | 494 | 496 | ||
3006 | 495 | acl, stat, err := zk.ACL("/test") | 497 | acl, stat, err := conn.ACL("/test") |
3007 | 496 | c.Assert(err, IsNil) | 498 | c.Assert(err, IsNil) |
3008 | 497 | c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_ALL)) | 499 | c.Assert(acl, Equals, zk.WorldACL(zk.PERM_ALL)) |
3009 | 498 | c.Assert(stat, NotNil) | 500 | c.Assert(stat, NotNil) |
3010 | 499 | c.Assert(stat.Version(), Equals, int32(0)) | 501 | c.Assert(stat.Version(), Equals, int32(0)) |
3011 | 500 | 502 | ||
3013 | 501 | acl, stat, err = zk.ACL("/non-existent") | 503 | acl, stat, err = conn.ACL("/non-existent") |
3014 | 502 | c.Assert(err, NotNil) | 504 | c.Assert(err, NotNil) |
3016 | 503 | c.Assert(err.Code(), Equals, gozk.ZNONODE) | 505 | c.Assert(err, Equals, zk.ZNONODE) |
3017 | 504 | c.Assert(acl, IsNil) | 506 | c.Assert(acl, IsNil) |
3018 | 505 | c.Assert(stat, IsNil) | 507 | c.Assert(stat, IsNil) |
3019 | 506 | } | 508 | } |
3020 | 507 | 509 | ||
3021 | 508 | func (s *S) TestSetACL(c *C) { | 510 | func (s *S) TestSetACL(c *C) { |
3023 | 509 | zk, _ := s.init(c) | 511 | conn, _ := s.init(c) |
3024 | 510 | 512 | ||
3026 | 511 | _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) | 513 | _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
3027 | 512 | c.Assert(err, IsNil) | 514 | c.Assert(err, IsNil) |
3028 | 513 | 515 | ||
3030 | 514 | err = zk.SetACL("/test", gozk.WorldACL(gozk.PERM_ALL), 5) | 516 | err = conn.SetACL("/test", zk.WorldACL(zk.PERM_ALL), 5) |
3031 | 515 | c.Assert(err, NotNil) | 517 | c.Assert(err, NotNil) |
3040 | 516 | c.Assert(err.Code(), Equals, gozk.ZBADVERSION) | 518 | c.Assert(err, Equals, zk.ZBADVERSION) |
3041 | 517 | 519 | ||
3042 | 518 | err = zk.SetACL("/test", gozk.WorldACL(gozk.PERM_READ), -1) | 520 | err = conn.SetACL("/test", zk.WorldACL(zk.PERM_READ), -1) |
3043 | 519 | c.Assert(err, IsNil) | 521 | c.Assert(err, IsNil) |
3044 | 520 | 522 | ||
3045 | 521 | acl, _, err := zk.ACL("/test") | 523 | acl, _, err := conn.ACL("/test") |
3046 | 522 | c.Assert(err, IsNil) | 524 | c.Assert(err, IsNil) |
3047 | 523 | c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_READ)) | 525 | c.Assert(acl, Equals, zk.WorldACL(zk.PERM_READ)) |
3048 | 524 | } | 526 | } |
3049 | 525 | 527 | ||
3050 | 526 | func (s *S) TestAddAuth(c *C) { | 528 | func (s *S) TestAddAuth(c *C) { |
3056 | 527 | zk, _ := s.init(c) | 529 | conn, _ := s.init(c) |
3057 | 528 | 530 | ||
3058 | 529 | acl := []gozk.ACL{{gozk.PERM_READ, "digest", "joe:enQcM3mIEHQx7IrPNStYBc0qfs8="}} | 531 | acl := []zk.ACL{{zk.PERM_READ, "digest", "joe:enQcM3mIEHQx7IrPNStYBc0qfs8="}} |
3059 | 530 | 532 | ||
3060 | 531 | _, err := zk.Create("/test", "", gozk.EPHEMERAL, acl) | 533 | _, err := conn.Create("/test", "", zk.EPHEMERAL, acl) |
3061 | 532 | c.Assert(err, IsNil) | 534 | c.Assert(err, IsNil) |
3062 | 533 | 535 | ||
3064 | 534 | _, _, err = zk.Get("/test") | 536 | _, _, err = conn.Get("/test") |
3065 | 535 | c.Assert(err, NotNil) | 537 | c.Assert(err, NotNil) |
3067 | 536 | c.Assert(err.Code(), Equals, gozk.ZNOAUTH) | 538 | c.Assert(err, Equals, zk.ZNOAUTH) |
3068 | 537 | 539 | ||
3070 | 538 | err = zk.AddAuth("digest", "joe:passwd") | 540 | err = conn.AddAuth("digest", "joe:passwd") |
3071 | 539 | c.Assert(err, IsNil) | 541 | c.Assert(err, IsNil) |
3072 | 540 | 542 | ||
3074 | 541 | _, _, err = zk.Get("/test") | 543 | _, _, err = conn.Get("/test") |
3075 | 542 | c.Assert(err, IsNil) | 544 | c.Assert(err, IsNil) |
3076 | 543 | } | 545 | } |
3077 | 544 | 546 | ||
3078 | 545 | func (s *S) TestWatchOnReconnection(c *C) { | 547 | func (s *S) TestWatchOnReconnection(c *C) { |
3080 | 546 | c.Check(gozk.CountPendingWatches(), Equals, 0) | 548 | c.Check(zk.CountPendingWatches(), Equals, 0) |
3081 | 547 | 549 | ||
3083 | 548 | zk, session := s.init(c) | 550 | conn, session := s.init(c) |
3084 | 549 | 551 | ||
3085 | 550 | event := <-session | 552 | event := <-session |
3092 | 551 | c.Assert(event.Type, Equals, gozk.EVENT_SESSION) | 553 | c.Assert(event.Type, Equals, zk.EVENT_SESSION) |
3093 | 552 | c.Assert(event.State, Equals, gozk.STATE_CONNECTED) | 554 | c.Assert(event.State, Equals, zk.STATE_CONNECTED) |
3094 | 553 | 555 | ||
3095 | 554 | c.Check(gozk.CountPendingWatches(), Equals, 1) | 556 | c.Check(zk.CountPendingWatches(), Equals, 1) |
3096 | 555 | 557 | ||
3097 | 556 | stat, watch, err := zk.ExistsW("/test") | 558 | stat, watch, err := conn.ExistsW("/test") |
3098 | 557 | c.Assert(err, IsNil) | 559 | c.Assert(err, IsNil) |
3099 | 558 | c.Assert(stat, IsNil) | 560 | c.Assert(stat, IsNil) |
3100 | 559 | 561 | ||
3104 | 560 | c.Check(gozk.CountPendingWatches(), Equals, 2) | 562 | c.Check(zk.CountPendingWatches(), Equals, 2) |
3105 | 561 | 563 | ||
3106 | 562 | s.StopZK() | 564 | err = s.zkServer.Stop() |
3107 | 565 | c.Assert(err, IsNil) | ||
3108 | 566 | |||
3109 | 563 | time.Sleep(2e9) | 567 | time.Sleep(2e9) |
3110 | 564 | s.StartZK() | ||
3111 | 565 | 568 | ||
3113 | 566 | // The session channel should receive the reconnection notification, | 569 | err = s.zkServer.Start() |
3114 | 570 | c.Assert(err, IsNil) | ||
3115 | 571 | |||
3116 | 572 | // The session channel should receive the reconnection notification. | ||
3117 | 567 | select { | 573 | select { |
3118 | 568 | case event := <-session: | 574 | case event := <-session: |
3120 | 569 | c.Assert(event.State, Equals, gozk.STATE_CONNECTING) | 575 | c.Assert(event.State, Equals, zk.STATE_CONNECTING) |
3121 | 570 | case <-time.After(3e9): | 576 | case <-time.After(3e9): |
3122 | 571 | c.Fatal("Session watch didn't fire") | 577 | c.Fatal("Session watch didn't fire") |
3123 | 572 | } | 578 | } |
3124 | 573 | select { | 579 | select { |
3125 | 574 | case event := <-session: | 580 | case event := <-session: |
3127 | 575 | c.Assert(event.State, Equals, gozk.STATE_CONNECTED) | 581 | c.Assert(event.State, Equals, zk.STATE_CONNECTED) |
3128 | 576 | case <-time.After(3e9): | 582 | case <-time.After(3e9): |
3129 | 577 | c.Fatal("Session watch didn't fire") | 583 | c.Fatal("Session watch didn't fire") |
3130 | 578 | } | 584 | } |
3131 | @@ -585,40 +591,40 @@ | |||
3132 | 585 | } | 591 | } |
3133 | 586 | 592 | ||
3134 | 587 | // And it should still work. | 593 | // And it should still work. |
3136 | 588 | _, err = zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) | 594 | _, err = conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
3137 | 589 | c.Assert(err, IsNil) | 595 | c.Assert(err, IsNil) |
3138 | 590 | 596 | ||
3139 | 591 | event = <-watch | 597 | event = <-watch |
3141 | 592 | c.Assert(event.Type, Equals, gozk.EVENT_CREATED) | 598 | c.Assert(event.Type, Equals, zk.EVENT_CREATED) |
3142 | 593 | c.Assert(event.Path, Equals, "/test") | 599 | c.Assert(event.Path, Equals, "/test") |
3143 | 594 | 600 | ||
3145 | 595 | c.Check(gozk.CountPendingWatches(), Equals, 1) | 601 | c.Check(zk.CountPendingWatches(), Equals, 1) |
3146 | 596 | } | 602 | } |
3147 | 597 | 603 | ||
3148 | 598 | func (s *S) TestWatchOnSessionExpiration(c *C) { | 604 | func (s *S) TestWatchOnSessionExpiration(c *C) { |
3150 | 599 | c.Check(gozk.CountPendingWatches(), Equals, 0) | 605 | c.Check(zk.CountPendingWatches(), Equals, 0) |
3151 | 600 | 606 | ||
3153 | 601 | zk, session := s.init(c) | 607 | conn, session := s.init(c) |
3154 | 602 | 608 | ||
3155 | 603 | event := <-session | 609 | event := <-session |
3162 | 604 | c.Assert(event.Type, Equals, gozk.EVENT_SESSION) | 610 | c.Assert(event.Type, Equals, zk.EVENT_SESSION) |
3163 | 605 | c.Assert(event.State, Equals, gozk.STATE_CONNECTED) | 611 | c.Assert(event.State, Equals, zk.STATE_CONNECTED) |
3164 | 606 | 612 | ||
3165 | 607 | c.Check(gozk.CountPendingWatches(), Equals, 1) | 613 | c.Check(zk.CountPendingWatches(), Equals, 1) |
3166 | 608 | 614 | ||
3167 | 609 | stat, watch, err := zk.ExistsW("/test") | 615 | stat, watch, err := conn.ExistsW("/test") |
3168 | 610 | c.Assert(err, IsNil) | 616 | c.Assert(err, IsNil) |
3169 | 611 | c.Assert(stat, IsNil) | 617 | c.Assert(stat, IsNil) |
3170 | 612 | 618 | ||
3172 | 613 | c.Check(gozk.CountPendingWatches(), Equals, 2) | 619 | c.Check(zk.CountPendingWatches(), Equals, 2) |
3173 | 614 | 620 | ||
3174 | 615 | // Use expiration trick described in the FAQ. | 621 | // Use expiration trick described in the FAQ. |
3177 | 616 | clientId := zk.ClientId() | 622 | clientId := conn.ClientId() |
3178 | 617 | zk2, session2, err := gozk.ReInit(s.zkAddr, 5e9, clientId) | 623 | zk2, session2, err := zk.Redial(s.zkAddr, 5e9, clientId) |
3179 | 618 | 624 | ||
3180 | 619 | for event := range session2 { | 625 | for event := range session2 { |
3181 | 620 | c.Log("Event from overlapping session: ", event) | 626 | c.Log("Event from overlapping session: ", event) |
3183 | 621 | if event.State == gozk.STATE_CONNECTED { | 627 | if event.State == zk.STATE_CONNECTED { |
3184 | 622 | // Wait for zk to process the connection. | 628 | // Wait for zk to process the connection. |
3185 | 623 | // Not reliable without this. :-( | 629 | // Not reliable without this. :-( |
3186 | 624 | time.Sleep(1e9) | 630 | time.Sleep(1e9) |
3187 | @@ -627,21 +633,21 @@ | |||
3188 | 627 | } | 633 | } |
3189 | 628 | for event := range session { | 634 | for event := range session { |
3190 | 629 | c.Log("Event from primary session: ", event) | 635 | c.Log("Event from primary session: ", event) |
3192 | 630 | if event.State == gozk.STATE_EXPIRED_SESSION { | 636 | if event.State == zk.STATE_EXPIRED_SESSION { |
3193 | 631 | break | 637 | break |
3194 | 632 | } | 638 | } |
3195 | 633 | } | 639 | } |
3196 | 634 | 640 | ||
3197 | 635 | select { | 641 | select { |
3198 | 636 | case event := <-watch: | 642 | case event := <-watch: |
3200 | 637 | c.Assert(event.State, Equals, gozk.STATE_EXPIRED_SESSION) | 643 | c.Assert(event.State, Equals, zk.STATE_EXPIRED_SESSION) |
3201 | 638 | case <-time.After(3e9): | 644 | case <-time.After(3e9): |
3202 | 639 | c.Fatal("Watch event didn't fire") | 645 | c.Fatal("Watch event didn't fire") |
3203 | 640 | } | 646 | } |
3204 | 641 | 647 | ||
3205 | 642 | event = <-watch | 648 | event = <-watch |
3208 | 643 | c.Assert(event.Type, Equals, gozk.EVENT_CLOSED) | 649 | c.Assert(event.Type, Equals, zk.EVENT_CLOSED) |
3209 | 644 | c.Assert(event.State, Equals, gozk.STATE_CLOSED) | 650 | c.Assert(event.State, Equals, zk.STATE_CLOSED) |
3210 | 645 | 651 | ||
3212 | 646 | c.Check(gozk.CountPendingWatches(), Equals, 1) | 652 | c.Check(zk.CountPendingWatches(), Equals, 1) |
3213 | 647 | } | 653 | } |
This is a massive change unrelated to documentation.