Merge lp:~rogpeppe/gozk/update-server-interface into lp:~juju/gozk/trunk
Status: | Merged |
---|---|
Merge reported by: | Gustavo Niemeyer |
Merged at revision: | not available |
Proposed branch: | lp:~rogpeppe/gozk/update-server-interface |
Merge into: | lp:~juju/gozk/trunk |
Diff against target: | 3053 lines (+1235/-782), 9 files modified:
- Makefile (+6/-2)
- example/example.go (+22/-23)
- reattach_test.go (+198/-0)
- retry_test.go (+65/-74)
- runserver.go (+168/-0)
- server.go (+229/-0)
- suite_test.go (+42/-121)
- zk.go (+252/-311)
- zk_test.go (+253/-251) |
To merge this branch: | bzr merge lp:~rogpeppe/gozk/update-server-interface |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Gustavo Niemeyer | Needs Fixing | ||
Review via email: mp+77009@code.launchpad.net |
Commit message
Description of the change
Updated server API.
- 24. By Roger Peppe
-
Fix zookeeper spelling
Gustavo Niemeyer (niemeyer) wrote:
[10]
+// exist, it assumes the server is not running and returns (nil, nil).
This interface is strange, and can easily lead to bugs since it's not an idiom in Go.
I'd never assume p is nil if err == nil. Please return an error if p == nil.
Roger Peppe (rogpeppe) wrote:
On 5 October 2011 15:34, Gustavo Niemeyer <email address hidden> wrote:
> Review: Approve
>
> Great stuff here Rog, thanks!
>
> Some comments:
>
>
> [1]
>
> -// The ZooKepeer installation directory is specified by installedDir.
> +// The ZooKeeper installation directory is specified by installDir.
>
> installDir feels like a strange name in this context. I agree installedDir
> isn't great either, but the new one gives a wrong impression about what
> it means: we're not installing anything there.
>
> Let's use something like zkDir instead.
done.
> +func (srv *Server) getServerProcess() (*os.Process, os.Error) {
> (...)
> + return os.FindProcess(pid)
>
> FindProcess does nothing on Unix. We can run at least a Signal 0 against
> the pid to tell if it exists or not.
ok, as discussed i'll just not bother about the race.
> [5]
>
> +// Stop kills the ZooKeeper server. It is a no-op if it is already running.
>
> The second part is reversed, and reads a bit hard.
>
> Suggestion: It does nothing if not running.
better. my text was just wrong. done.
>
> Also, please mention that the data is preserved, and that the server can
> be restarted. "kill" may feel like otherwise.
done.
>
>
> [6]
>
> +// Destroy stops the ZooKeeper server, and then removes its run
> +// directory and all its contents.
>
> s/and all its contents././
>
> Removing the directory means removing its contents necessarily.
done.
> [7]
>
> +func (srv *Server) writeInstallDir() os.Error {
> + return ioutil.
>
> These functions and filename should be renamed in sync to the first point.
i'm not sure what you mean here.
> + if data[len(data)-1] == '\n' {
> + data = data[0 : len(data)-1]
> + }
> + srv.installDir = string(data)
>
> srv.zkDir = string(
really? this code will have written the data inside the file, so there
should be no extra space added. it's legal (ok it's pretty unusual) for a
file name to end in a space, and calling TrimSpace would break that.
- 25. By Roger Peppe
-
Added reattach_test.go to test server reattaching.
Service-related methods now get their own file.
Roger Peppe (rogpeppe) wrote:
On 5 October 2011 15:49, Gustavo Niemeyer <email address hidden> wrote:
> [10]
>
>
> +// exist, it assumes the server is not running and returns (nil, nil).
>
> This interface is strange, and can easily lead to bugs since it's not an idiom in Go.
> I'd never assume p is nil if err == nil. Please return an error if p == nil.
i've changed this, but for reference, i've just noticed a similar example
in your code, in Exists:
// We diverge a bit from the usual here: a ZNONODE is not an error
// for an exists call, otherwise every Exists call would have to check
// for err != nil and err.Code() != ZNONODE.
it's for a very similar reason that i returned a nil process and nil error from
Process when there was no process to be found.
Gustavo Niemeyer (niemeyer) wrote:
Good changes overall, thanks. Some follow ups:
[7]
You've renamed installDir to zkDir. Let's rename functions and filenames
to match that as well (zkdir.txt, writeZkDir, etc)
[8]
Ok, but this logic is wrong because it doesn't check the length and could
panic.
Alright, so instead of doing either of those things, please just don't append
a '\n' at the end, so we can just string(data) straight. This is already
being done for pid.txt, and sounds like a sane approach.
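
To illustrate the panic concern in [8]: indexing data[len(data)-1] fails on empty input, while a suffix trim does not. The sketch below is not the fix requested here (which is to stop writing the '\n' at all); it only shows a length-safe alternative in modern Go. The helper name trimNewline and the sample paths are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// trimNewline removes at most one trailing '\n'. Unlike indexing
// data[len(data)-1], it is safe when data is empty, and unlike
// strings.TrimSpace it preserves a trailing space in the name.
func trimNewline(data []byte) string {
	return strings.TrimSuffix(string(data), "\n")
}

func main() {
	fmt.Printf("%q\n", trimNewline([]byte("/opt/zookeeper\n"))) // "/opt/zookeeper"
	fmt.Printf("%q\n", trimNewline([]byte("")))                 // ""
	fmt.Printf("%q\n", trimNewline([]byte("/odd name ")))       // "/odd name "
}
```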
[10]
Indeed, but that feels like a slightly different case because the method
is already a question (does it exist?), so the fact it doesn't exist
is an answer to that question rather than an error.
If the method was named something like Stat, it should definitely be an
error rather than simply an empty stat.
[11]
+// ServerCommand returns the command used to start the
+// ZooKeeper server. It is provided for debugging and testing
+// purposes only.
+func (srv *Server) command() ([]string, os.Error) {
s/ServerCommand
s/It is provided.*//
[12]
+// NetworkPort returns the TCP port number that
+// the server is configured for.
s/NetworkPort/
[13]
+// This file defines methods on Server that deal with starting
+// and stopping the ZooKeeper service. They are independent of ZooKeeper
+// itself, and may be factored out at a later date.
Please merge this into server.go. As pointed out in the other review, and as is
obvious throughout the code, this is really specific to ZooKeeper. If/when we do
refactor this out to generalize, we'll have to reorganize anyway.
[14]
+var ErrNotRunning = os.NewError(
I'm not a big fan of the Err* convention. Makes for an ugly name with typing
information embedded on it. Some packages do use it, but thankfully it's
also not strictly followed in the Go standard library either.
I'd appreciate renaming this to NotRunning instead. zk.NotRunning seems
pretty good.
[15]
+// Process returns a Process referring to the running server from
+// where it's been stored in pid.txt. If the file does not
+// exist, or it cannot find the process, it returns the error
+// ErrNotRunning.
Where the pid is stored is an implementation detail that can
change. We don't have to document that for now.
[16]
+// getProcess gets a Process from a pid and check that the
s/check/checks/
[17]
+ if e, ok := err.(*os.
Ugh. I hope we get that cleaned in Go itself with the suggested changes for 1.
[18]
+ // is to poll until it exits. If the process has taken longer than
+ // a second to exit, then it's probably not going to.
This is very optimistic. A process can easily take more than a second to exit. If
this is an edge case and there's a reason to wait at all (dir content, etc), then
let's wait a bit longer, like 10 seconds for instance.
[19]
+// cases to test:
+// child server, stopped normally; reattach, start
+// non-direct child server, killed abnormally; reattach, start (->error), remove pid.txt; start
+// non-direct child server, still running; reattach, start (->error), stop, start
+// child server, still running; reattach,...
Roger Peppe (rogpeppe) wrote:
PTAL
On 7 October 2011 16:58, Gustavo Niemeyer <email address hidden> wrote:
> [7]
>
> You've renamed installDir to zkDir. Let's rename functions and filenames
> to match that as well (zkdir.txt, writeZkDir, etc)
done
>
> [8]
>
> Ok, but this logic is wrong because it doesn't check the length and could
> panic.
>
> Alright, so instead of doing either of those things, please just don't append
> a '\n' at the end, so we can just string(data) straight. This is already
> being done for pid.txt, and sounds like a sane approach.
done
> [11]
>
> +// ServerCommand returns the command used to start the
> +// ZooKeeper server. It is provided for debugging and testing
> +// purposes only.
> +func (srv *Server) command() ([]string, os.Error) {
>
> s/ServerCommand
> s/It is provided.*//
given that it's not external any more, that "debugging and testing"
remark isn't necessary now.
> [12]
>
> +// NetworkPort returns the TCP port number that
> +// the server is configured for.
>
> s/NetworkPort/
>
> [13]
>
> +// This file defines methods on Server that deal with starting
> +// and stopping the ZooKeeper service. They are independent of ZooKeeper
> +// itself, and may be factored out at a later date.
>
> Please merge this into server.go. As pointed out in the other review, and as is
> obvious throughout the code, this is really specific to ZooKeeper. If/when we do
> refactor this out to generalize, we'll have to reorganize anyway.
i can do this, but given that the dependencies of the two files
are mostly different (only fmt, ioutil and os in common), and there's
very little interlink between the two, i think it makes sense to keep
them as two different files - i think it makes the code easier to read.
perhaps "runserver.go" might be a better name than "service.go" though.
i've done that for the time being. that said, it's no big deal.
> [14]
>
> +var ErrNotRunning = os.NewError(
>
> I'm not a big fan of the Err* convention. Makes for an ugly name with typing
> information embedded on it. Some packages do use it, but thankfully it's
> also not strictly followed in the Go standard library either.
>
> I'd appreciate renaming this to NotRunning instead. zk.NotRunning seems
> pretty good.
ok.
>
> [15]
>
> +// Process returns a Process referring to the running server from
> +// where it's been stored in pid.txt. If the file does not
> +// exist, or it cannot find the process, it returns the error
> +// ErrNotRunning.
>
> Where the pid is stored is an implementation detail that can
> change. We don't have to document that for now.
>
> [16]
>
> +// getProcess gets a Process from a pid and check that the
>
> s/check/checks/
done
> [18]
>
> + // is to poll until it exits. If the process has taken longer than
> + // a second to exit, then it's probably not going to.
>
> This is very optimistic. A process can easily take more than a second to exit. If
> this is an edge case and there's a reason to wait at all (dir content, etc), then
> let's wait a bit longer, like 10 seconds for instance.
ok. given that we're killing the server with SIGKILL, it's not going to
have any chance to take ages to c...
- 26. By Roger Peppe
-
Fixes as per review.
Preview Diff
1 | === modified file 'Makefile' |
2 | --- Makefile 2011-08-03 01:56:58 +0000 |
3 | +++ Makefile 2011-10-10 13:55:28 +0000 |
4 | @@ -2,10 +2,14 @@ |
5 | |
6 | all: package |
7 | |
8 | -TARG=gozk |
9 | +TARG=launchpad.net/gozk/zk |
10 | |
11 | +GOFILES=\ |
12 | + server.go\ |
13 | + runserver.go\ |
14 | + |
15 | CGOFILES=\ |
16 | - gozk.go\ |
17 | + zk.go\ |
18 | |
19 | CGO_OFILES=\ |
20 | helpers.o\ |
21 | |
22 | === added directory 'example' |
23 | === renamed file 'example.go' => 'example/example.go' |
24 | --- example.go 2011-08-03 01:47:25 +0000 |
25 | +++ example/example.go 2011-10-10 13:55:28 +0000 |
26 | @@ -1,30 +1,29 @@ |
27 | package main |
28 | |
29 | import ( |
30 | - "gozk" |
31 | + "launchpad.net/zookeeper/zookeeper" |
32 | ) |
33 | |
34 | func main() { |
35 | - zk, session, err := gozk.Init("localhost:2181", 5000) |
36 | - if err != nil { |
37 | - println("Couldn't connect: " + err.String()) |
38 | - return |
39 | - } |
40 | - |
41 | - defer zk.Close() |
42 | - |
43 | - // Wait for connection. |
44 | - event := <-session |
45 | - if event.State != gozk.STATE_CONNECTED { |
46 | - println("Couldn't connect") |
47 | - return |
48 | - } |
49 | - |
50 | - _, err = zk.Create("/counter", "0", 0, gozk.WorldACL(gozk.PERM_ALL)) |
51 | - if err != nil { |
52 | - println(err.String()) |
53 | - } else { |
54 | - println("Created!") |
55 | - } |
56 | + zk, session, err := zookeeper.Init("localhost:2181", 5000) |
57 | + if err != nil { |
58 | + println("Couldn't connect: " + err.String()) |
59 | + return |
60 | + } |
61 | + |
62 | + defer zk.Close() |
63 | + |
64 | + // Wait for connection. |
65 | + event := <-session |
66 | + if event.State != zookeeper.STATE_CONNECTED { |
67 | + println("Couldn't connect") |
68 | + return |
69 | + } |
70 | + |
71 | + _, err = zk.Create("/counter", "0", 0, zookeeper.WorldACL(zookeeper.PERM_ALL)) |
72 | + if err != nil { |
73 | + println(err.String()) |
74 | + } else { |
75 | + println("Created!") |
76 | + } |
77 | } |
78 | - |
79 | |
80 | === added file 'reattach_test.go' |
81 | --- reattach_test.go 1970-01-01 00:00:00 +0000 |
82 | +++ reattach_test.go 2011-10-10 13:55:28 +0000 |
83 | @@ -0,0 +1,198 @@ |
84 | +package zk_test |
85 | + |
86 | +import ( |
87 | + "bufio" |
88 | + . "launchpad.net/gocheck" |
89 | + "launchpad.net/gozk/zk" |
90 | + "exec" |
91 | + "flag" |
92 | + "fmt" |
93 | + "os" |
94 | + "strings" |
95 | + "testing" |
96 | + "time" |
97 | +) |
98 | + |
99 | +var reattach = flag.Bool("zktest.reattach", false, "internal flag used for testing") |
100 | +var reattachRunDir = flag.String("zktest.rundir", "", "internal flag used for testing") |
101 | +var reattachAbnormalStop = flag.Bool("zktest.stop", false, "internal flag used for testing") |
102 | + |
103 | +// This is the reentrancy point for testing ZooKeeper servers |
104 | +// started by processes that are not direct children of the |
105 | +// testing process. This test always succeeds - the status |
106 | +// will be written to stdout and read by indirectServer. |
107 | +func TestStartNonChildServer(t *testing.T) { |
108 | + if !*reattach { |
109 | + // not re-entrant, so ignore this test. |
110 | + return |
111 | + } |
112 | + err := startServer(*reattachRunDir, *reattachAbnormalStop) |
113 | + if err != nil { |
114 | + fmt.Printf("error:%v\n", err) |
115 | + return |
116 | + } |
117 | + fmt.Printf("done\n") |
118 | +} |
119 | + |
120 | +func (s *S) startServer(c *C, abort bool) { |
121 | + err := startServer(s.zkTestRoot, abort) |
122 | + c.Assert(err, IsNil) |
123 | +} |
124 | + |
125 | +// startServerIndirect starts a ZooKeeper server that is not |
126 | +// a direct child of the current process. If abort is true, |
127 | +// the server will be started and then terminated abnormally. |
128 | +func (s *S) startServerIndirect(c *C, abort bool) { |
129 | + if len(os.Args) == 0 { |
130 | + c.Fatal("Cannot find self executable name") |
131 | + } |
132 | + cmd := exec.Command( |
133 | + os.Args[0], |
134 | + "-zktest.reattach", |
135 | + "-zktest.rundir", s.zkTestRoot, |
136 | + "-zktest.stop=", fmt.Sprint(abort), |
137 | + "-test.run", "StartNonChildServer", |
138 | + ) |
139 | + r, err := cmd.StdoutPipe() |
140 | + c.Assert(err, IsNil) |
141 | + defer r.Close() |
142 | + if err := cmd.Start(); err != nil { |
143 | + c.Fatalf("cannot start re-entrant gotest process: %v", err) |
144 | + } |
145 | + defer cmd.Wait() |
146 | + bio := bufio.NewReader(r) |
147 | + for { |
148 | + line, err := bio.ReadSlice('\n') |
149 | + if err != nil { |
150 | + c.Fatalf("indirect server status line not found: %v", err) |
151 | + } |
152 | + s := string(line) |
153 | + if strings.HasPrefix(s, "error:") { |
154 | + c.Fatalf("indirect server error: %s", s[len("error:"):]) |
155 | + } |
156 | + if s == "done\n" { |
157 | + return |
158 | + } |
159 | + } |
160 | + panic("not reached") |
161 | +} |
162 | + |
163 | +// startServer starts a ZooKeeper server, and terminates it abnormally |
164 | +// if abort is true. |
165 | +func startServer(runDir string, abort bool) os.Error { |
166 | + srv, err := zk.AttachServer(runDir) |
167 | + if err != nil { |
168 | + return fmt.Errorf("cannot attach to server at %q: %v", runDir, err) |
169 | + } |
170 | + if err := srv.Start(); err != nil { |
171 | + return fmt.Errorf("cannot start server: %v", err) |
172 | + } |
173 | + if abort { |
174 | + // Give it time to start up, then kill the server process abnormally, |
175 | + // leaving the pid.txt file behind. |
176 | + time.Sleep(0.5e9) |
177 | + p, err := srv.Process() |
178 | + if err != nil { |
179 | + return fmt.Errorf("cannot get server process: %v", err) |
180 | + } |
181 | + defer p.Release() |
182 | + if err := p.Kill(); err != nil { |
183 | + return fmt.Errorf("cannot kill server process: %v", err) |
184 | + } |
185 | + } |
186 | + return nil |
187 | +} |
188 | + |
189 | +func (s *S) checkCookie(c *C) { |
190 | + conn, watch, err := zk.Dial(s.zkAddr, 5e9) |
191 | + c.Assert(err, IsNil) |
192 | + |
193 | + e, ok := <-watch |
194 | + c.Assert(ok, Equals, true) |
195 | + c.Assert(e.Ok(), Equals, true) |
196 | + |
197 | + c.Assert(err, IsNil) |
198 | + cookie, _, err := conn.Get("/testAttachCookie") |
199 | + c.Assert(err, IsNil) |
200 | + c.Assert(cookie, Equals, "testAttachCookie") |
201 | + conn.Close() |
202 | +} |
203 | + |
204 | +// cases to test: |
205 | +// child server, stopped normally; reattach, start |
206 | +// non-direct child server, killed abnormally; reattach, start (->error), remove pid.txt; start |
207 | +// non-direct child server, still running; reattach, start (->error), stop, start |
208 | +// child server, still running; reattach, start (-> error) |
209 | +// child server, still running; reattach, stop, start. |
210 | +// non-direct child server, still running; reattach, stop, start. |
211 | +func (s *S) TestAttachServer(c *C) { |
212 | + // Create a cookie so that we know we are reattaching to the same instance. |
213 | + conn, _ := s.init(c) |
214 | + _, err := conn.Create("/testAttachCookie", "testAttachCookie", 0, zk.WorldACL(zk.PERM_ALL)) |
215 | + c.Assert(err, IsNil) |
216 | + s.checkCookie(c) |
217 | + s.zkServer.Stop() |
218 | + s.zkServer = nil |
219 | + |
220 | + s.testAttachServer(c, (*S).startServer) |
221 | + s.testAttachServer(c, (*S).startServerIndirect) |
222 | + s.testAttachServerAbnormalTerminate(c, (*S).startServer) |
223 | + s.testAttachServerAbnormalTerminate(c, (*S).startServerIndirect) |
224 | + |
225 | + srv, err := zk.AttachServer(s.zkTestRoot) |
226 | + c.Assert(err, IsNil) |
227 | + |
228 | + s.zkServer = srv |
229 | + err = s.zkServer.Start() |
230 | + c.Assert(err, IsNil) |
231 | + |
232 | + conn, _ = s.init(c) |
233 | + err = conn.Delete("/testAttachCookie", -1) |
234 | + c.Assert(err, IsNil) |
235 | +} |
236 | + |
237 | +func (s *S) testAttachServer(c *C, start func(*S, *C, bool)) { |
238 | + start(s, c, false) |
239 | + |
240 | + s.checkCookie(c) |
241 | + |
242 | + // try attaching to it while it is still running - it should fail. |
243 | + srv, err := zk.AttachServer(s.zkTestRoot) |
244 | + c.Assert(err, IsNil) |
245 | + |
246 | + err = srv.Start() |
247 | + c.Assert(err, NotNil) |
248 | + |
249 | + // stop it and then start it again - it should succeed. |
250 | + err = srv.Stop() |
251 | + c.Assert(err, IsNil) |
252 | + |
253 | + err = srv.Start() |
254 | + c.Assert(err, IsNil) |
255 | + |
256 | + s.checkCookie(c) |
257 | + |
258 | + err = srv.Stop() |
259 | + c.Assert(err, IsNil) |
260 | +} |
261 | + |
262 | +func (s *S) testAttachServerAbnormalTerminate(c *C, start func(*S, *C, bool)) { |
263 | + start(s, c, true) |
264 | + |
265 | + // try attaching to it and starting - it should fail, because pid.txt |
266 | + // won't have been removed. |
267 | + srv, err := zk.AttachServer(s.zkTestRoot) |
268 | + c.Assert(err, IsNil) |
269 | + err = srv.Start() |
270 | + c.Assert(err, NotNil) |
271 | + |
272 | + // stopping it should bring things back to normal. |
273 | + err = srv.Stop() |
274 | + c.Assert(err, IsNil) |
275 | + err = srv.Start() |
276 | + c.Assert(err, IsNil) |
277 | + |
278 | + s.checkCookie(c) |
279 | + err = srv.Stop() |
280 | + c.Assert(err, IsNil) |
281 | +} |
282 | |
283 | === modified file 'retry_test.go' |
284 | --- retry_test.go 2011-08-19 01:51:37 +0000 |
285 | +++ retry_test.go 2011-10-10 13:55:28 +0000 |
286 | @@ -1,42 +1,41 @@ |
287 | -package gozk_test |
288 | +package zk_test |
289 | |
290 | import ( |
291 | . "launchpad.net/gocheck" |
292 | - "gozk" |
293 | + "launchpad.net/gozk/zk" |
294 | "os" |
295 | ) |
296 | |
297 | func (s *S) TestRetryChangeCreating(c *C) { |
298 | - zk, _ := s.init(c) |
299 | + conn, _ := s.init(c) |
300 | |
301 | - err := zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL), |
302 | - func(data string, stat gozk.Stat) (string, os.Error) { |
303 | + err := conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL), |
304 | + func(data string, stat *zk.Stat) (string, os.Error) { |
305 | c.Assert(data, Equals, "") |
306 | c.Assert(stat, IsNil) |
307 | return "new", nil |
308 | }) |
309 | c.Assert(err, IsNil) |
310 | |
311 | - data, stat, err := zk.Get("/test") |
312 | + data, stat, err := conn.Get("/test") |
313 | c.Assert(err, IsNil) |
314 | c.Assert(stat, NotNil) |
315 | c.Assert(stat.Version(), Equals, int32(0)) |
316 | c.Assert(data, Equals, "new") |
317 | |
318 | - acl, _, err := zk.ACL("/test") |
319 | + acl, _, err := conn.ACL("/test") |
320 | c.Assert(err, IsNil) |
321 | - c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_ALL)) |
322 | + c.Assert(acl, Equals, zk.WorldACL(zk.PERM_ALL)) |
323 | } |
324 | |
325 | func (s *S) TestRetryChangeSetting(c *C) { |
326 | - zk, _ := s.init(c) |
327 | + conn, _ := s.init(c) |
328 | |
329 | - _, err := zk.Create("/test", "old", gozk.EPHEMERAL, |
330 | - gozk.WorldACL(gozk.PERM_ALL)) |
331 | + _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
332 | c.Assert(err, IsNil) |
333 | |
334 | - err = zk.RetryChange("/test", gozk.EPHEMERAL, []gozk.ACL{}, |
335 | - func(data string, stat gozk.Stat) (string, os.Error) { |
336 | + err = conn.RetryChange("/test", zk.EPHEMERAL, []zk.ACL{}, |
337 | + func(data string, stat *zk.Stat) (string, os.Error) { |
338 | c.Assert(data, Equals, "old") |
339 | c.Assert(stat, NotNil) |
340 | c.Assert(stat.Version(), Equals, int32(0)) |
341 | @@ -44,27 +43,26 @@ |
342 | }) |
343 | c.Assert(err, IsNil) |
344 | |
345 | - data, stat, err := zk.Get("/test") |
346 | + data, stat, err := conn.Get("/test") |
347 | c.Assert(err, IsNil) |
348 | c.Assert(stat, NotNil) |
349 | c.Assert(stat.Version(), Equals, int32(1)) |
350 | c.Assert(data, Equals, "brand new") |
351 | |
352 | // ACL was unchanged by RetryChange(). |
353 | - acl, _, err := zk.ACL("/test") |
354 | + acl, _, err := conn.ACL("/test") |
355 | c.Assert(err, IsNil) |
356 | - c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_ALL)) |
357 | + c.Assert(acl, Equals, zk.WorldACL(zk.PERM_ALL)) |
358 | } |
359 | |
360 | func (s *S) TestRetryChangeUnchangedValueDoesNothing(c *C) { |
361 | - zk, _ := s.init(c) |
362 | + conn, _ := s.init(c) |
363 | |
364 | - _, err := zk.Create("/test", "old", gozk.EPHEMERAL, |
365 | - gozk.WorldACL(gozk.PERM_ALL)) |
366 | + _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
367 | c.Assert(err, IsNil) |
368 | |
369 | - err = zk.RetryChange("/test", gozk.EPHEMERAL, []gozk.ACL{}, |
370 | - func(data string, stat gozk.Stat) (string, os.Error) { |
371 | + err = conn.RetryChange("/test", zk.EPHEMERAL, []zk.ACL{}, |
372 | + func(data string, stat *zk.Stat) (string, os.Error) { |
373 | c.Assert(data, Equals, "old") |
374 | c.Assert(stat, NotNil) |
375 | c.Assert(stat.Version(), Equals, int32(0)) |
376 | @@ -72,7 +70,7 @@ |
377 | }) |
378 | c.Assert(err, IsNil) |
379 | |
380 | - data, stat, err := zk.Get("/test") |
381 | + data, stat, err := conn.Get("/test") |
382 | c.Assert(err, IsNil) |
383 | c.Assert(stat, NotNil) |
384 | c.Assert(stat.Version(), Equals, int32(0)) // Unchanged! |
385 | @@ -80,14 +78,14 @@ |
386 | } |
387 | |
388 | func (s *S) TestRetryChangeConflictOnCreate(c *C) { |
389 | - zk, _ := s.init(c) |
390 | + conn, _ := s.init(c) |
391 | |
392 | - changeFunc := func(data string, stat gozk.Stat) (string, os.Error) { |
393 | + changeFunc := func(data string, stat *zk.Stat) (string, os.Error) { |
394 | switch data { |
395 | case "": |
396 | c.Assert(stat, IsNil) |
397 | - _, err := zk.Create("/test", "conflict", gozk.EPHEMERAL, |
398 | - gozk.WorldACL(gozk.PERM_ALL)) |
399 | + _, err := conn.Create("/test", "conflict", zk.EPHEMERAL, |
400 | + zk.WorldACL(zk.PERM_ALL)) |
401 | c.Assert(err, IsNil) |
402 | return "<none> => conflict", nil |
403 | case "conflict": |
404 | @@ -100,11 +98,10 @@ |
405 | return "can't happen", nil |
406 | } |
407 | |
408 | - err := zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL), |
409 | - changeFunc) |
410 | + err := conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL), changeFunc) |
411 | c.Assert(err, IsNil) |
412 | |
413 | - data, stat, err := zk.Get("/test") |
414 | + data, stat, err := conn.Get("/test") |
415 | c.Assert(err, IsNil) |
416 | c.Assert(data, Equals, "conflict => new") |
417 | c.Assert(stat, NotNil) |
418 | @@ -112,18 +109,17 @@ |
419 | } |
420 | |
421 | func (s *S) TestRetryChangeConflictOnSetDueToChange(c *C) { |
422 | - zk, _ := s.init(c) |
423 | + conn, _ := s.init(c) |
424 | |
425 | - _, err := zk.Create("/test", "old", gozk.EPHEMERAL, |
426 | - gozk.WorldACL(gozk.PERM_ALL)) |
427 | + _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
428 | c.Assert(err, IsNil) |
429 | |
430 | - changeFunc := func(data string, stat gozk.Stat) (string, os.Error) { |
431 | + changeFunc := func(data string, stat *zk.Stat) (string, os.Error) { |
432 | switch data { |
433 | case "old": |
434 | c.Assert(stat, NotNil) |
435 | c.Assert(stat.Version(), Equals, int32(0)) |
436 | - _, err := zk.Set("/test", "conflict", 0) |
437 | + _, err := conn.Set("/test", "conflict", 0) |
438 | c.Assert(err, IsNil) |
439 | return "old => new", nil |
440 | case "conflict": |
441 | @@ -136,10 +132,10 @@ |
442 | return "can't happen", nil |
443 | } |
444 | |
445 | - err = zk.RetryChange("/test", gozk.EPHEMERAL, []gozk.ACL{}, changeFunc) |
446 | + err = conn.RetryChange("/test", zk.EPHEMERAL, []zk.ACL{}, changeFunc) |
447 | c.Assert(err, IsNil) |
448 | |
449 | - data, stat, err := zk.Get("/test") |
450 | + data, stat, err := conn.Get("/test") |
451 | c.Assert(err, IsNil) |
452 | c.Assert(data, Equals, "conflict => new") |
453 | c.Assert(stat, NotNil) |
454 | @@ -147,18 +143,17 @@ |
455 | } |
456 | |
457 | func (s *S) TestRetryChangeConflictOnSetDueToDelete(c *C) { |
458 | - zk, _ := s.init(c) |
459 | + conn, _ := s.init(c) |
460 | |
461 | - _, err := zk.Create("/test", "old", gozk.EPHEMERAL, |
462 | - gozk.WorldACL(gozk.PERM_ALL)) |
463 | + _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
464 | c.Assert(err, IsNil) |
465 | |
466 | - changeFunc := func(data string, stat gozk.Stat) (string, os.Error) { |
467 | + changeFunc := func(data string, stat *zk.Stat) (string, os.Error) { |
468 | switch data { |
469 | case "old": |
470 | c.Assert(stat, NotNil) |
471 | c.Assert(stat.Version(), Equals, int32(0)) |
472 | - err := zk.Delete("/test", 0) |
473 | + err := conn.Delete("/test", 0) |
474 | c.Assert(err, IsNil) |
475 | return "old => <deleted>", nil |
476 | case "": |
477 | @@ -170,55 +165,53 @@ |
478 | return "can't happen", nil |
479 | } |
480 | |
481 | - err = zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_READ), |
482 | - changeFunc) |
483 | + err = conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_READ), changeFunc) |
484 | c.Assert(err, IsNil) |
485 | |
486 | - data, stat, err := zk.Get("/test") |
487 | + data, stat, err := conn.Get("/test") |
488 | c.Assert(err, IsNil) |
489 | c.Assert(data, Equals, "<deleted> => new") |
490 | c.Assert(stat, NotNil) |
491 | c.Assert(stat.Version(), Equals, int32(0)) |
492 | |
493 | // Should be the new ACL. |
494 | - acl, _, err := zk.ACL("/test") |
495 | + acl, _, err := conn.ACL("/test") |
496 | c.Assert(err, IsNil) |
497 | - c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_READ)) |
498 | + c.Assert(acl, Equals, zk.WorldACL(zk.PERM_READ)) |
499 | } |
500 | |
501 | func (s *S) TestRetryChangeErrorInCallback(c *C) { |
502 | - zk, _ := s.init(c) |
503 | + conn, _ := s.init(c) |
504 | |
505 | - err := zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL), |
506 | - func(data string, stat gozk.Stat) (string, os.Error) { |
507 | + err := conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL), |
508 | + func(data string, stat *zk.Stat) (string, os.Error) { |
509 | return "don't use this", os.NewError("BOOM!") |
510 | }) |
511 | c.Assert(err, NotNil) |
512 | - c.Assert(err.Code(), Equals, gozk.ZSYSTEMERROR) |
513 | c.Assert(err.String(), Equals, "BOOM!") |
514 | |
515 | - stat, err := zk.Exists("/test") |
516 | + stat, err := conn.Exists("/test") |
517 | c.Assert(err, IsNil) |
518 | c.Assert(stat, IsNil) |
519 | } |
520 | |
521 | func (s *S) TestRetryChangeFailsReading(c *C) { |
522 | - zk, _ := s.init(c) |
523 | + conn, _ := s.init(c) |
524 | |
525 | - _, err := zk.Create("/test", "old", gozk.EPHEMERAL, |
526 | - gozk.WorldACL(gozk.PERM_WRITE)) // Write only! |
527 | + // Write only! |
528 | + _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_WRITE)) |
529 | c.Assert(err, IsNil) |
530 | |
531 | var called bool |
532 | - err = zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL), |
533 | - func(data string, stat gozk.Stat) (string, os.Error) { |
534 | + err = conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL), |
535 | + func(data string, stat *zk.Stat) (string, os.Error) { |
536 | called = true |
537 | return "", nil |
538 | }) |
539 | c.Assert(err, NotNil) |
540 | - c.Assert(err.Code(), Equals, gozk.ZNOAUTH) |
541 | + c.Assert(err, Equals, zk.ZNOAUTH) |
542 | |
543 | - stat, err := zk.Exists("/test") |
544 | + stat, err := conn.Exists("/test") |
545 | c.Assert(err, IsNil) |
546 | c.Assert(stat, NotNil) |
547 | c.Assert(stat.Version(), Equals, int32(0)) |
548 | @@ -227,22 +220,21 @@ |
549 | } |
550 | |
551 | func (s *S) TestRetryChangeFailsSetting(c *C) { |
552 | - zk, _ := s.init(c) |
553 | + conn, _ := s.init(c) |
554 | |
555 | - _, err := zk.Create("/test", "old", gozk.EPHEMERAL, |
556 | - gozk.WorldACL(gozk.PERM_READ)) // Read only! |
557 | + // Read only! |
558 | + _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_READ)) |
559 | c.Assert(err, IsNil) |
560 | |
561 | var called bool |
562 | - err = zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL), |
563 | - func(data string, stat gozk.Stat) (string, os.Error) { |
564 | + err = conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL), |
565 | + func(data string, stat *zk.Stat) (string, os.Error) { |
566 | called = true |
567 | return "", nil |
568 | }) |
569 | - c.Assert(err, NotNil) |
570 | - c.Assert(err.Code(), Equals, gozk.ZNOAUTH) |
571 | + c.Assert(err, Equals, zk.ZNOAUTH) |
572 | |
573 | - stat, err := zk.Exists("/test") |
574 | + stat, err := conn.Exists("/test") |
575 | c.Assert(err, IsNil) |
576 | c.Assert(stat, NotNil) |
577 | c.Assert(stat.Version(), Equals, int32(0)) |
578 | @@ -251,23 +243,22 @@ |
579 | } |
580 | |
581 | func (s *S) TestRetryChangeFailsCreating(c *C) { |
582 | - zk, _ := s.init(c) |
583 | + conn, _ := s.init(c) |
584 | |
585 | - _, err := zk.Create("/test", "old", gozk.EPHEMERAL, |
586 | - gozk.WorldACL(gozk.PERM_READ)) // Read only! |
587 | + // Read only! |
588 | + _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_READ)) |
589 | c.Assert(err, IsNil) |
590 | |
591 | var called bool |
592 | - err = zk.RetryChange("/test/sub", gozk.EPHEMERAL, |
593 | - gozk.WorldACL(gozk.PERM_ALL), |
594 | - func(data string, stat gozk.Stat) (string, os.Error) { |
595 | + err = conn.RetryChange("/test/sub", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL), |
596 | + func(data string, stat *zk.Stat) (string, os.Error) { |
597 | called = true |
598 | return "", nil |
599 | }) |
600 | c.Assert(err, NotNil) |
601 | - c.Assert(err.Code(), Equals, gozk.ZNOAUTH) |
602 | + c.Assert(err, Equals, zk.ZNOAUTH) |
603 | |
604 | - stat, err := zk.Exists("/test/sub") |
605 | + stat, err := conn.Exists("/test/sub") |
606 | c.Assert(err, IsNil) |
607 | c.Assert(stat, IsNil) |
608 | |
609 | |
610 | === added file 'runserver.go' |
611 | --- runserver.go 1970-01-01 00:00:00 +0000 |
612 | +++ runserver.go 2011-10-10 13:55:28 +0000 |
613 | @@ -0,0 +1,168 @@ |
614 | +package zk |
615 | + |
616 | +// This file defines methods on Server that deal with starting |
617 | +// and stopping the ZooKeeper service. They are independent of ZooKeeper |
618 | +// itself, and may be factored out at a later date. |
619 | + |
620 | +import ( |
621 | + "exec" |
622 | + "fmt" |
623 | + "io/ioutil" |
624 | + "os" |
625 | + "strconv" |
626 | + "time" |
627 | +) |
628 | + |
629 | +var NotRunning = os.NewError("process not running") |
630 | + |
631 | +// Process returns a Process referring to the running server from |
632 | +// where it's been stored in pid.txt. If the file does not |
633 | +// exist, or it cannot find the process, it returns the error |
634 | +// NotRunning. |
635 | +func (srv *Server) Process() (*os.Process, os.Error) { |
636 | + data, err := ioutil.ReadFile(srv.path("pid.txt")) |
637 | + if err != nil { |
638 | + if err, ok := err.(*os.PathError); ok && err.Error == os.ENOENT { |
639 | + return nil, NotRunning |
640 | + } |
641 | + return nil, err |
642 | + } |
643 | + pid, err := strconv.Atoi(string(data)) |
644 | + if err != nil { |
645 | + return nil, os.NewError("bad process id found in pid.txt") |
646 | + } |
647 | + return getProcess(pid) |
648 | +} |
649 | + |
650 | +// getProcess gets a Process from a pid and checks that the |
651 | +// process is actually running. If the process |
652 | +// is not running, then getProcess returns a nil |
653 | +// Process and the error NotRunning. |
654 | +func getProcess(pid int) (*os.Process, os.Error) { |
655 | + p, err := os.FindProcess(pid) |
656 | + if err != nil { |
657 | + return nil, err |
658 | + } |
659 | + |
660 | + // try to check if the process is actually running by sending |
661 | + // it signal 0. |
662 | + err = p.Signal(os.UnixSignal(0)) |
663 | + if err == nil { |
664 | + return p, nil |
665 | + } |
666 | + if err, ok := err.(os.Errno); ok && err == os.ESRCH { |
667 | + return nil, NotRunning |
668 | + } |
669 | + return nil, os.NewError("server running but inaccessible") |
670 | +} |
671 | + |
672 | +// Start starts the ZooKeeper server. |
673 | +// It returns an error if the server is already running. |
674 | +func (srv *Server) Start() os.Error { |
675 | + if err := srv.checkAvailability(); err != nil { |
676 | + return err |
677 | + } |
678 | + p, err := srv.Process() |
679 | + if err == nil || err != NotRunning { |
680 | + if p != nil { |
681 | + p.Release() |
682 | + } |
683 | + return os.NewError("server is already running") |
684 | + } |
685 | + |
686 | + if _, err := os.Stat(srv.path("pid.txt")); err == nil { |
687 | + // The pid.txt file still exists although server is not running. |
688 | + // Remove it so it can be re-created. |
689 | + // This leads to a race: if two processes are both |
690 | + // calling Start, one might remove the file the other |
691 | + // has just created, leading to a situation where |
692 | + // pid.txt describes the wrong server process. |
693 | + // We ignore that possibility for now. |
694 | + // TODO use syscall.Flock? |
695 | + if err := os.Remove(srv.path("pid.txt")); err != nil { |
696 | + return fmt.Errorf("cannot remove pid.txt: %v", err) |
697 | + } |
698 | + } |
699 | + |
700 | + // Open the pid file before starting the process so that if we get two |
701 | + // programs trying to concurrently start a server on the same directory |
702 | + // at the same time, only one should succeed. |
703 | + pidf, err := os.OpenFile(srv.path("pid.txt"), os.O_EXCL|os.O_CREATE|os.O_WRONLY, 0666) |
704 | + if err != nil { |
705 | + return fmt.Errorf("cannot create pid.txt: %v", err) |
706 | + } |
707 | + defer pidf.Close() |
708 | + args, err := srv.command() |
709 | + if err != nil { |
710 | + return fmt.Errorf("cannot determine command: %v", err) |
711 | + } |
712 | + cmd := exec.Command(args[0], args[1:]...) |
713 | + |
714 | + logf, err := os.OpenFile(srv.path("log.txt"), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666) |
715 | + if err != nil { |
716 | + return fmt.Errorf("cannot create log file: %v", err) |
717 | + } |
718 | + defer logf.Close() |
719 | + cmd.Stdout = logf |
720 | + cmd.Stderr = logf |
721 | + if err := cmd.Start(); err != nil { |
722 | + return fmt.Errorf("cannot start server: %v", err) |
723 | + } |
724 | + if _, err := fmt.Fprint(pidf, cmd.Process.Pid); err != nil { |
725 | + return fmt.Errorf("cannot write pid file: %v", err) |
726 | + } |
727 | + return nil |
728 | +} |
729 | + |
730 | +// Stop kills the ZooKeeper server. It does nothing if it is not running. |
731 | +// Note that Stop does not remove any data from the run directory, |
732 | +// so Start may be called later on the same directory. |
733 | +func (srv *Server) Stop() os.Error { |
734 | + p, err := srv.Process() |
735 | + if p == nil { |
736 | + if err != nil { |
737 | + return fmt.Errorf("cannot read process ID of server: %v", err) |
738 | + } |
739 | + return nil |
740 | + } |
741 | + defer p.Release() |
742 | + if err := p.Kill(); err != nil { |
743 | + return fmt.Errorf("cannot kill server process: %v", err) |
744 | + } |
745 | + // ignore the error returned from Wait because there's little |
746 | + // we can do about it - it either means that the process has just exited |
747 | + // anyway or that we can't wait for it for some other reason, |
748 | + // for example because it was originally started by some other process. |
749 | + _, err = p.Wait(0) |
750 | + if e, ok := err.(*os.SyscallError); ok && e.Errno == os.ECHILD || err == os.ECHILD { |
751 | + // If we can't wait for the server, it's possible that it was running |
752 | + // but not as a child of this process, so the only thing we can do |
753 | + // is to poll until it exits. If the process has taken longer than |
754 | + // five seconds to exit, then it's probably not going to. |
755 | + for i := 0; i < 5*4; i++ { |
756 | + time.Sleep(1e9 / 4) |
757 | + if np, err := getProcess(p.Pid); err != nil { |
758 | + break |
759 | + } else { |
760 | + np.Release() |
761 | + } |
762 | + } |
763 | + } |
764 | + |
765 | + if err := os.Remove(srv.path("pid.txt")); err != nil { |
766 | + return fmt.Errorf("cannot remove server process ID file: %v", err) |
767 | + } |
768 | + return nil |
769 | +} |
770 | + |
771 | +// Destroy stops the ZooKeeper server, and then removes its run |
772 | +// directory. Warning: this will destroy all data associated with the server. |
773 | +func (srv *Server) Destroy() os.Error { |
774 | + if err := srv.Stop(); err != nil { |
775 | + return err |
776 | + } |
777 | + if err := os.RemoveAll(srv.runDir); err != nil { |
778 | + return err |
779 | + } |
780 | + return nil |
781 | +} |
782 | |
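The liveness check in getProcess above relies on sending signal 0, which asks the kernel to validate the target without delivering anything. The diff is written against a pre-Go 1 API (os.Error, os.UnixSignal), so the following is only a rough sketch of the same idea in current Go. The function name processAlive and the use of os.ErrProcessDone are assumptions of this sketch, not part of the proposed branch, and it is Unix-only.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"syscall"
)

// processAlive is a hypothetical helper mirroring getProcess from the diff.
// It reports whether a process with the given pid currently exists by
// sending it signal 0, which performs error checking only.
func processAlive(pid int) (bool, error) {
	p, err := os.FindProcess(pid)
	if err != nil {
		return false, err
	}
	defer p.Release()

	err = p.Signal(syscall.Signal(0))
	switch {
	case err == nil:
		// The process exists and we are allowed to signal it.
		return true, nil
	case errors.Is(err, os.ErrProcessDone), errors.Is(err, syscall.ESRCH):
		// No such process: the equivalent of NotRunning in the diff.
		return false, nil
	}
	// Typically EPERM: the process exists but belongs to another user.
	return false, fmt.Errorf("process %d exists but is inaccessible: %w", pid, err)
}

func main() {
	alive, err := processAlive(os.Getpid())
	fmt.Println(alive, err)
}
```

As in the diff, a permission failure is reported as an error rather than as "not running", so a caller such as Start cannot mistake an inaccessible server for a stopped one.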
783 | === added file 'server.go' |
784 | --- server.go 1970-01-01 00:00:00 +0000 |
785 | +++ server.go 2011-10-10 13:55:28 +0000 |
786 | @@ -0,0 +1,229 @@ |
787 | +package zk |
788 | + |
789 | +import ( |
790 | + "bufio" |
791 | + "bytes" |
792 | + "fmt" |
793 | + "io/ioutil" |
794 | + "net" |
795 | + "os" |
796 | + "path/filepath" |
797 | + "strings" |
798 | +) |
799 | + |
800 | +// Server represents a ZooKeeper server, its data and configuration files. |
801 | +type Server struct { |
802 | + runDir string |
803 | + zkDir string |
804 | +} |
805 | + |
806 | +// CreateServer creates the directory runDir and sets up a ZooKeeper server |
807 | +// environment inside it. It is an error if runDir already exists. |
808 | +// The server will listen on the specified TCP port. |
809 | +// |
810 | +// The ZooKeeper installation directory is specified by zkDir. |
811 | +// If this is empty, a system default will be used. |
812 | +// |
813 | +// CreateServer does not start the server. |
814 | +func CreateServer(port int, runDir, zkDir string) (*Server, os.Error) { |
815 | + if err := os.Mkdir(runDir, 0777); err != nil { |
816 | + return nil, err |
817 | + } |
818 | + srv := &Server{runDir: runDir, zkDir: zkDir} |
819 | + if err := srv.writeLog4JConfig(); err != nil { |
820 | + return nil, err |
821 | + } |
822 | + if err := srv.writeZooKeeperConfig(port); err != nil { |
823 | + return nil, err |
824 | + } |
825 | + if err := srv.writeZkDir(); err != nil { |
826 | + return nil, err |
827 | + } |
828 | + return srv, nil |
829 | +} |
830 | + |
831 | +// AttachServer creates a new ZooKeeper Server instance |
832 | +// to operate inside an existing run directory, runDir. |
833 | +// The directory must have been created with CreateServer. |
834 | +func AttachServer(runDir string) (*Server, os.Error) { |
835 | + srv := &Server{runDir: runDir} |
836 | + if err := srv.readZkDir(); err != nil { |
837 | + return nil, fmt.Errorf("cannot read server install directory: %v", err) |
838 | + } |
839 | + return srv, nil |
840 | +} |
841 | + |
842 | +func (srv *Server) checkAvailability() os.Error { |
843 | + port, err := srv.networkPort() |
844 | + if err != nil { |
845 | + return fmt.Errorf("cannot get network port: %v", err) |
846 | + } |
847 | + l, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port)) |
848 | + if err != nil { |
849 | + return fmt.Errorf("cannot listen on port %v: %v", port, err) |
850 | + } |
851 | + l.Close() |
852 | + return nil |
853 | +} |
854 | + |
855 | +// networkPort returns the TCP port number that |
856 | +// the server is configured for. |
857 | +func (srv *Server) networkPort() (int, os.Error) { |
858 | + f, err := os.Open(srv.path("zoo.cfg")) |
859 | + if err != nil { |
860 | + return 0, err |
861 | + } |
862 | + r := bufio.NewReader(f) |
863 | + for { |
864 | + line, err := r.ReadSlice('\n') |
865 | + if err != nil { |
866 | + return 0, fmt.Errorf("cannot get port from %q", srv.path("zoo.cfg")) |
867 | + } |
868 | + var port int |
869 | + if n, _ := fmt.Sscanf(string(line), "clientPort=%d\n", &port); n == 1 { |
870 | + return port, nil |
871 | + } |
872 | + } |
873 | + panic("not reached") |
874 | +} |
875 | + |
876 | +// command returns the command used to start the |
877 | +// ZooKeeper server. |
878 | +func (srv *Server) command() ([]string, os.Error) { |
879 | + cp, err := srv.classPath() |
880 | + if err != nil { |
881 | + return nil, fmt.Errorf("cannot get class path: %v", err) |
882 | + } |
883 | + return []string{ |
884 | + "java", |
885 | + "-cp", strings.Join(cp, ":"), |
886 | + "-Dzookeeper.root.logger=INFO,CONSOLE", |
887 | + "-Dlog4j.configuration=file:" + srv.path("log4j.properties"), |
888 | + "org.apache.zookeeper.server.quorum.QuorumPeerMain", |
889 | + srv.path("zoo.cfg"), |
890 | + }, nil |
891 | +} |
892 | + |
893 | +var log4jProperties = ` |
894 | +log4j.rootLogger=INFO, CONSOLE |
895 | +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender |
896 | +log4j.appender.CONSOLE.Threshold=INFO |
897 | +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout |
898 | +log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n |
899 | +` |
900 | + |
901 | +func (srv *Server) writeLog4JConfig() (err os.Error) { |
902 | + return ioutil.WriteFile(srv.path("log4j.properties"), []byte(log4jProperties), 0666) |
903 | +} |
904 | + |
905 | +func (srv *Server) writeZooKeeperConfig(port int) (err os.Error) { |
906 | + return ioutil.WriteFile(srv.path("zoo.cfg"), []byte(fmt.Sprintf( |
907 | + "tickTime=2000\n"+ |
908 | + "dataDir=%s\n"+ |
909 | + "clientPort=%d\n"+ |
910 | + "maxClientCnxns=500\n", |
911 | + srv.runDir, port)), 0666) |
912 | +} |
913 | + |
914 | +func (srv *Server) writeZkDir() os.Error { |
915 | + return ioutil.WriteFile(srv.path("zkdir.txt"), []byte(srv.zkDir), 0666) |
916 | +} |
917 | + |
918 | +func (srv *Server) readZkDir() os.Error { |
919 | + data, err := ioutil.ReadFile(srv.path("zkdir.txt")) |
920 | + if err != nil { |
921 | + return err |
922 | + } |
923 | + srv.zkDir = string(data) |
924 | + return nil |
925 | +} |
926 | + |
927 | +func (srv *Server) classPath() ([]string, os.Error) { |
928 | + dir := srv.zkDir |
929 | + if dir == "" { |
930 | + return systemClassPath() |
931 | + } |
932 | + if err := checkDirectory(dir); err != nil { |
933 | + return nil, err |
934 | + } |
935 | + // Two possibilities, as seen in zkEnv.sh: |
936 | + // 1) locally built binaries (jars are in build directory) |
937 | + // 2) release binaries |
938 | + if build := filepath.Join(dir, "build"); checkDirectory(build) == nil { |
939 | + dir = build |
940 | + } |
941 | + classPath, err := filepath.Glob(filepath.Join(dir, "zookeeper-*.jar")) |
942 | + if err != nil { |
943 | + panic(fmt.Errorf("glob for jar files: %v", err)) |
944 | + } |
945 | + more, err := filepath.Glob(filepath.Join(dir, "lib/*.jar")) |
946 | + if err != nil { |
947 | + panic(fmt.Errorf("glob for lib jar files: %v", err)) |
948 | + } |
949 | + |
950 | + classPath = append(classPath, more...) |
951 | + if len(classPath) == 0 { |
952 | + return nil, fmt.Errorf("zookeeper libraries not found in %q", dir) |
953 | + } |
954 | + return classPath, nil |
955 | +} |
956 | + |
957 | +const zookeeperEnviron = "/etc/zookeeper/conf/environment" |
958 | + |
959 | +func systemClassPath() ([]string, os.Error) { |
960 | + f, err := os.Open(zookeeperEnviron) |
961 | + if f == nil { |
962 | + return nil, err |
963 | + } |
964 | + r := bufio.NewReader(f) |
965 | + for { |
966 | + line, err := r.ReadSlice('\n') |
967 | + if err != nil { |
968 | + break |
969 | + } |
970 | + if !bytes.HasPrefix(line, []byte("CLASSPATH=")) { |
971 | + continue |
972 | + } |
973 | + |
974 | + // remove variable and newline |
975 | + path := string(line[len("CLASSPATH=") : len(line)-1]) |
976 | + |
977 | + // trim white space |
978 | + path = strings.Trim(path, " \t\r") |
979 | + |
980 | + // strip quotes |
981 | + if len(path) >= 2 && path[0] == '"' { |
982 | + path = path[1 : len(path)-1] |
983 | + } |
984 | + |
985 | + // split on : |
986 | + classPath := strings.Split(path, ":") |
987 | + |
988 | + // split off $ZOOCFGDIR |
989 | + if len(classPath) > 0 && classPath[0] == "$ZOOCFGDIR" { |
990 | + classPath = classPath[1:] |
991 | + } |
992 | + |
993 | + if len(classPath) == 0 { |
994 | + return nil, fmt.Errorf("empty class path in %q", zookeeperEnviron) |
995 | + } |
996 | + return classPath, nil |
997 | + } |
998 | + return nil, fmt.Errorf("no class path found in %q", zookeeperEnviron) |
999 | +} |
1000 | + |
1001 | +// checkDirectory returns an error if the given path |
1002 | +// does not exist or is not a directory. |
1003 | +func checkDirectory(path string) os.Error { |
1004 | + if info, err := os.Stat(path); err != nil || !info.IsDirectory() { |
1005 | + if err == nil { |
1006 | + err = &os.PathError{Op: "stat", Path: path, Error: os.NewError("is not a directory")} |
1007 | + } |
1008 | + return err |
1009 | + } |
1010 | + return nil |
1011 | +} |
1012 | + |
1013 | +func (srv *Server) path(name string) string { |
1014 | + return filepath.Join(srv.runDir, name) |
1015 | +} |
1016 | |
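networkPort above recovers the listening port by scanning zoo.cfg for a clientPort= line. A minimal sketch of that lookup in current Go, working on the configuration text rather than a file, might look like the following. The function name clientPort and the sample configuration are illustrative assumptions. Unlike the diff, this version trims surrounding whitespace and distinguishes a missing setting from a malformed one.

```go
package main

import (
	"bufio"
	"errors"
	"fmt"
	"strconv"
	"strings"
)

// clientPort is a hypothetical helper that extracts the clientPort
// setting from the text of a zoo.cfg file.
func clientPort(cfg string) (int, error) {
	sc := bufio.NewScanner(strings.NewReader(cfg))
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		if !strings.HasPrefix(line, "clientPort=") {
			continue
		}
		port, err := strconv.Atoi(strings.TrimPrefix(line, "clientPort="))
		if err != nil {
			return 0, fmt.Errorf("bad clientPort setting %q: %v", line, err)
		}
		return port, nil
	}
	return 0, errors.New("no clientPort setting found")
}

func main() {
	// Same keys that writeZooKeeperConfig emits; the values are made up.
	cfg := "tickTime=2000\ndataDir=/tmp/zk\nclientPort=21812\nmaxClientCnxns=500\n"
	fmt.Println(clientPort(cfg))
}
```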
1017 | === modified file 'suite_test.go' |
1018 | --- suite_test.go 2011-08-19 01:43:37 +0000 |
1019 | +++ suite_test.go 2011-10-10 13:55:28 +0000 |
1020 | @@ -1,13 +1,11 @@ |
1021 | -package gozk_test |
1022 | +package zk_test |
1023 | |
1024 | import ( |
1025 | . "launchpad.net/gocheck" |
1026 | - "testing" |
1027 | - "io/ioutil" |
1028 | - "path" |
1029 | "fmt" |
1030 | + "launchpad.net/gozk/zk" |
1031 | "os" |
1032 | - "gozk" |
1033 | + "testing" |
1034 | "time" |
1035 | ) |
1036 | |
1037 | @@ -18,48 +16,32 @@ |
1038 | var _ = Suite(&S{}) |
1039 | |
1040 | type S struct { |
1041 | - zkRoot string |
1042 | - zkTestRoot string |
1043 | - zkTestPort int |
1044 | - zkServerSh string |
1045 | - zkServerOut *os.File |
1046 | - zkAddr string |
1047 | + zkServer *zk.Server |
1048 | + zkTestRoot string |
1049 | + zkTestPort int |
1050 | + zkProcess *os.Process // The running ZooKeeper process |
1051 | + zkAddr string |
1052 | |
1053 | - handles []*gozk.ZooKeeper |
1054 | - events []*gozk.Event |
1055 | + handles []*zk.Conn |
1056 | + events []*zk.Event |
1057 | liveWatches int |
1058 | deadWatches chan bool |
1059 | } |
1060 | |
1061 | -var logLevel = 0 //gozk.LOG_ERROR |
1062 | - |
1063 | - |
1064 | -var testZooCfg = ("dataDir=%s\n" + |
1065 | - "clientPort=%d\n" + |
1066 | - "tickTime=2000\n" + |
1067 | - "initLimit=10\n" + |
1068 | - "syncLimit=5\n" + |
1069 | - "") |
1070 | - |
1071 | -var testLog4jPrp = ("log4j.rootLogger=INFO,CONSOLE\n" + |
1072 | - "log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender\n" + |
1073 | - "log4j.appender.CONSOLE.Threshold=DEBUG\n" + |
1074 | - "log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout\n" + |
1075 | - "log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n" + |
1076 | - "") |
1077 | - |
1078 | -func (s *S) init(c *C) (*gozk.ZooKeeper, chan gozk.Event) { |
1079 | - zk, watch, err := gozk.Init(s.zkAddr, 5e9) |
1080 | +var logLevel = 0 //zk.LOG_ERROR |
1081 | + |
1082 | +func (s *S) init(c *C) (*zk.Conn, chan zk.Event) { |
1083 | + conn, watch, err := zk.Dial(s.zkAddr, 5e9) |
1084 | c.Assert(err, IsNil) |
1085 | |
1086 | - s.handles = append(s.handles, zk) |
1087 | + s.handles = append(s.handles, conn) |
1088 | |
1089 | event := <-watch |
1090 | |
1091 | - c.Assert(event.Type, Equals, gozk.EVENT_SESSION) |
1092 | - c.Assert(event.State, Equals, gozk.STATE_CONNECTED) |
1093 | + c.Assert(event.Type, Equals, zk.EVENT_SESSION) |
1094 | + c.Assert(event.State, Equals, zk.STATE_CONNECTED) |
1095 | |
1096 | - bufferedWatch := make(chan gozk.Event, 256) |
1097 | + bufferedWatch := make(chan zk.Event, 256) |
1098 | bufferedWatch <- event |
1099 | |
1100 | s.liveWatches += 1 |
1101 | @@ -82,13 +64,13 @@ |
1102 | s.deadWatches <- true |
1103 | }() |
1104 | |
1105 | - return zk, bufferedWatch |
1106 | + return conn, bufferedWatch |
1107 | } |
1108 | |
1109 | func (s *S) SetUpTest(c *C) { |
1110 | - c.Assert(gozk.CountPendingWatches(), Equals, 0, |
1111 | + c.Assert(zk.CountPendingWatches(), Equals, 0, |
1112 | Bug("Test got a dirty watch state before running!")) |
1113 | - gozk.SetLogLevel(logLevel) |
1114 | + zk.SetLogLevel(logLevel) |
1115 | } |
1116 | |
1117 | func (s *S) TearDownTest(c *C) { |
1118 | @@ -108,98 +90,37 @@ |
1119 | } |
1120 | |
1121 | // Reset the list of handles. |
1122 | - s.handles = make([]*gozk.ZooKeeper, 0) |
1123 | + s.handles = make([]*zk.Conn, 0) |
1124 | |
1125 | - c.Assert(gozk.CountPendingWatches(), Equals, 0, |
1126 | + c.Assert(zk.CountPendingWatches(), Equals, 0, |
1127 | Bug("Test left live watches behind!")) |
1128 | } |
1129 | |
1130 | -// We use the suite set up and tear down to manage a custom zookeeper |
1131 | +// We use the suite set up and tear down to manage a custom ZooKeeper |
1132 | // |
1133 | func (s *S) SetUpSuite(c *C) { |
1134 | - |
1135 | + var err os.Error |
1136 | s.deadWatches = make(chan bool) |
1137 | |
1138 | - var err os.Error |
1139 | - |
1140 | - s.zkRoot = os.Getenv("ZKROOT") |
1141 | - if s.zkRoot == "" { |
1142 | - panic("You must define $ZKROOT ($ZKROOT/bin/zkServer.sh is needed)") |
1143 | - } |
1144 | - |
1145 | - s.zkTestRoot = c.MkDir() |
1146 | - s.zkTestPort = 21812 |
1147 | - |
1148 | - println("ZooKeeper test server directory:", s.zkTestRoot) |
1149 | - println("ZooKeeper test server port:", s.zkTestPort) |
1150 | - |
1151 | - s.zkAddr = fmt.Sprintf("localhost:%d", s.zkTestPort) |
1152 | - |
1153 | - s.zkServerSh = path.Join(s.zkRoot, "bin/zkServer.sh") |
1154 | - s.zkServerOut, err = os.OpenFile(path.Join(s.zkTestRoot, "stdout.txt"), |
1155 | - os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) |
1156 | - if err != nil { |
1157 | - panic("Can't open stdout.txt file for server: " + err.String()) |
1158 | - } |
1159 | - |
1160 | - dataDir := path.Join(s.zkTestRoot, "data") |
1161 | - confDir := path.Join(s.zkTestRoot, "conf") |
1162 | - |
1163 | - os.Mkdir(dataDir, 0755) |
1164 | - os.Mkdir(confDir, 0755) |
1165 | - |
1166 | - err = os.Setenv("ZOOCFGDIR", confDir) |
1167 | - if err != nil { |
1168 | - panic("Can't set $ZOOCFGDIR: " + err.String()) |
1169 | - } |
1170 | - |
1171 | - zooCfg := []byte(fmt.Sprintf(testZooCfg, dataDir, s.zkTestPort)) |
1172 | - err = ioutil.WriteFile(path.Join(confDir, "zoo.cfg"), zooCfg, 0644) |
1173 | - if err != nil { |
1174 | - panic("Can't write zoo.cfg: " + err.String()) |
1175 | - } |
1176 | - |
1177 | - log4jPrp := []byte(testLog4jPrp) |
1178 | - err = ioutil.WriteFile(path.Join(confDir, "log4j.properties"), log4jPrp, 0644) |
1179 | - if err != nil { |
1180 | - panic("Can't write log4j.properties: " + err.String()) |
1181 | - } |
1182 | - |
1183 | - s.StartZK() |
1184 | + // N.B. We need to create a subdirectory because zk.CreateServer |
1185 | + // insists on creating its own directory. |
1186 | + |
1187 | + s.zkTestRoot = c.MkDir() + "/zk" |
1188 | + port := 21812 |
1189 | + s.zkAddr = fmt.Sprint("localhost:", port) |
1190 | + |
1191 | + s.zkServer, err = zk.CreateServer(port, s.zkTestRoot, "") |
1192 | + if err != nil { |
1193 | + c.Fatal("Cannot set up server environment: ", err) |
1194 | + } |
1195 | + err = s.zkServer.Start() |
1196 | + if err != nil { |
1197 | + c.Fatal("Cannot start ZooKeeper server: ", err) |
1198 | + } |
1199 | } |
1200 | |
1201 | func (s *S) TearDownSuite(c *C) { |
1202 | - s.StopZK() |
1203 | - s.zkServerOut.Close() |
1204 | -} |
1205 | - |
1206 | -func (s *S) StartZK() { |
1207 | - attr := os.ProcAttr{Files: []*os.File{os.Stdin, s.zkServerOut, os.Stderr}} |
1208 | - proc, err := os.StartProcess(s.zkServerSh, []string{s.zkServerSh, "start"}, &attr) |
1209 | - if err != nil { |
1210 | - panic("Problem executing zkServer.sh start: " + err.String()) |
1211 | - } |
1212 | - |
1213 | - result, err := proc.Wait(0) |
1214 | - if err != nil { |
1215 | - panic(err.String()) |
1216 | - } else if result.ExitStatus() != 0 { |
1217 | - panic("'zkServer.sh start' exited with non-zero status") |
1218 | - } |
1219 | -} |
1220 | - |
1221 | -func (s *S) StopZK() { |
1222 | - attr := os.ProcAttr{Files: []*os.File{os.Stdin, s.zkServerOut, os.Stderr}} |
1223 | - proc, err := os.StartProcess(s.zkServerSh, []string{s.zkServerSh, "stop"}, &attr) |
1224 | - if err != nil { |
1225 | - panic("Problem executing zkServer.sh stop: " + err.String() + |
1226 | - " (look for runaway java processes!)") |
1227 | - } |
1228 | - result, err := proc.Wait(0) |
1229 | - if err != nil { |
1230 | - panic(err.String()) |
1231 | - } else if result.ExitStatus() != 0 { |
1232 | - panic("'zkServer.sh stop' exited with non-zero status " + |
1233 | - "(look for runaway java processes!)") |
1234 | + if s.zkServer != nil { |
1235 | + s.zkServer.Destroy() |
1236 | } |
1237 | } |
1238 | |
1239 | === renamed file 'gozk.go' => 'zk.go' |
1240 | --- gozk.go 2011-08-19 01:56:39 +0000 |
1241 | +++ zk.go 2011-10-10 13:55:28 +0000 |
1242 | @@ -1,4 +1,4 @@ |
1243 | -// gozk - Zookeeper support for the Go language |
1244 | +// gozk - ZooKeeper support for the Go language |
1245 | // |
1246 | // https://wiki.ubuntu.com/gozk |
1247 | // |
1248 | @@ -6,7 +6,7 @@ |
1249 | // |
1250 | // Written by Gustavo Niemeyer <gustavo.niemeyer@canonical.com> |
1251 | // |
1252 | -package gozk |
1253 | +package zk |
1254 | |
1255 | /* |
1256 | #cgo CFLAGS: -I/usr/include/c-client-src |
1257 | @@ -27,17 +27,16 @@ |
1258 | // ----------------------------------------------------------------------- |
1259 | // Main constants and data types. |
1260 | |
1261 | -// The main ZooKeeper object, created through the Init function. |
1262 | -// Encapsulates all communication with ZooKeeper. |
1263 | -type ZooKeeper struct { |
1264 | +// Conn represents a connection to a set of ZooKeeper nodes. |
1265 | +type Conn struct { |
1266 | watchChannels map[uintptr]chan Event |
1267 | sessionWatchId uintptr |
1268 | handle *C.zhandle_t |
1269 | mutex sync.Mutex |
1270 | } |
1271 | |
1272 | -// ClientId represents the established session in ZooKeeper. This is only |
1273 | -// useful to be passed back into the ReInit function. |
1274 | +// ClientId represents an established ZooKeeper session. It can be |
1275 | +// passed into Redial to reestablish a connection to an existing session. |
1276 | type ClientId struct { |
1277 | cId C.clientid_t |
1278 | } |
1279 | @@ -98,35 +97,60 @@ |
1280 | State int |
1281 | } |
1282 | |
1283 | -// Error codes that may be used to verify the result of the |
1284 | -// Code method from Error. |
1285 | +// Error represents a ZooKeeper error. |
1286 | +type Error int |
1287 | + |
1288 | const ( |
1289 | - ZOK = C.ZOK |
1290 | - ZSYSTEMERROR = C.ZSYSTEMERROR |
1291 | - ZRUNTIMEINCONSISTENCY = C.ZRUNTIMEINCONSISTENCY |
1292 | - ZDATAINCONSISTENCY = C.ZDATAINCONSISTENCY |
1293 | - ZCONNECTIONLOSS = C.ZCONNECTIONLOSS |
1294 | - ZMARSHALLINGERROR = C.ZMARSHALLINGERROR |
1295 | - ZUNIMPLEMENTED = C.ZUNIMPLEMENTED |
1296 | - ZOPERATIONTIMEOUT = C.ZOPERATIONTIMEOUT |
1297 | - ZBADARGUMENTS = C.ZBADARGUMENTS |
1298 | - ZINVALIDSTATE = C.ZINVALIDSTATE |
1299 | - ZAPIERROR = C.ZAPIERROR |
1300 | - ZNONODE = C.ZNONODE |
1301 | - ZNOAUTH = C.ZNOAUTH |
1302 | - ZBADVERSION = C.ZBADVERSION |
1303 | - ZNOCHILDRENFOREPHEMERALS = C.ZNOCHILDRENFOREPHEMERALS |
1304 | - ZNODEEXISTS = C.ZNODEEXISTS |
1305 | - ZNOTEMPTY = C.ZNOTEMPTY |
1306 | - ZSESSIONEXPIRED = C.ZSESSIONEXPIRED |
1307 | - ZINVALIDCALLBACK = C.ZINVALIDCALLBACK |
1308 | - ZINVALIDACL = C.ZINVALIDACL |
1309 | - ZAUTHFAILED = C.ZAUTHFAILED |
1310 | - ZCLOSING = C.ZCLOSING |
1311 | - ZNOTHING = C.ZNOTHING |
1312 | - ZSESSIONMOVED = C.ZSESSIONMOVED |
1313 | + ZOK Error = C.ZOK |
1314 | + ZSYSTEMERROR Error = C.ZSYSTEMERROR |
1315 | + ZRUNTIMEINCONSISTENCY Error = C.ZRUNTIMEINCONSISTENCY |
1316 | + ZDATAINCONSISTENCY Error = C.ZDATAINCONSISTENCY |
1317 | + ZCONNECTIONLOSS Error = C.ZCONNECTIONLOSS |
1318 | + ZMARSHALLINGERROR Error = C.ZMARSHALLINGERROR |
1319 | + ZUNIMPLEMENTED Error = C.ZUNIMPLEMENTED |
1320 | + ZOPERATIONTIMEOUT Error = C.ZOPERATIONTIMEOUT |
1321 | + ZBADARGUMENTS Error = C.ZBADARGUMENTS |
1322 | + ZINVALIDSTATE Error = C.ZINVALIDSTATE |
1323 | + ZAPIERROR Error = C.ZAPIERROR |
1324 | + ZNONODE Error = C.ZNONODE |
1325 | + ZNOAUTH Error = C.ZNOAUTH |
1326 | + ZBADVERSION Error = C.ZBADVERSION |
1327 | + ZNOCHILDRENFOREPHEMERALS Error = C.ZNOCHILDRENFOREPHEMERALS |
1328 | + ZNODEEXISTS Error = C.ZNODEEXISTS |
1329 | + ZNOTEMPTY Error = C.ZNOTEMPTY |
1330 | + ZSESSIONEXPIRED Error = C.ZSESSIONEXPIRED |
1331 | + ZINVALIDCALLBACK Error = C.ZINVALIDCALLBACK |
1332 | + ZINVALIDACL Error = C.ZINVALIDACL |
1333 | + ZAUTHFAILED Error = C.ZAUTHFAILED |
1334 | + ZCLOSING Error = C.ZCLOSING |
1335 | + ZNOTHING Error = C.ZNOTHING |
1336 | + ZSESSIONMOVED Error = C.ZSESSIONMOVED |
1337 | ) |
1338 | |
1339 | +func (error Error) String() string { |
1340 | + return C.GoString(C.zerror(C.int(error))) // Static, no need to free it. |
1341 | +} |
1342 | + |
1343 | +// zkError creates an appropriate error return from |
1344 | +// a ZooKeeper status and the errno return from a C API |
1345 | +// call. |
1346 | +func zkError(rc C.int, cerr os.Error) os.Error { |
1347 | + code := Error(rc) |
1348 | + switch code { |
1349 | + case ZOK: |
1350 | + return nil |
1351 | + |
1352 | + case ZSYSTEMERROR: |
1353 | + // If a ZooKeeper call returns ZSYSTEMERROR, then |
1354 | + // errno becomes significant. If errno has not been |
1355 | + // set, then we will return ZSYSTEMERROR nonetheless. |
1356 | + if cerr != nil { |
1357 | + return cerr |
1358 | + } |
1359 | + } |
1360 | + return code |
1361 | +} |
1362 | + |
1363 | // Constants for SetLogLevel. |
1364 | const ( |
1365 | LOG_ERROR = C.ZOO_LOG_LEVEL_ERROR |
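The hunk above replaces the Error interface and its Code method with a plain integer type, which is why the updated tests can write c.Assert(err, Equals, zk.ZNOAUTH) instead of comparing err.Code(). A small sketch of the same pattern in current Go follows. The names Code, NoNode, NoAuth and toError are hypothetical stand-ins for zk.Error, its constants and zkError. The numeric values are copied from the ZooKeeper C client headers for illustration only, and the method is Error() rather than String() because that is what the modern error interface requires.

```go
package main

import "fmt"

// Code is a hypothetical stand-in for the zk.Error type in the diff:
// an integer status that is itself an error value.
type Code int

const (
	OK     Code = 0
	NoNode Code = -101
	NoAuth Code = -102
)

// Error makes Code satisfy the built-in error interface.
func (c Code) Error() string {
	switch c {
	case OK:
		return "ok"
	case NoNode:
		return "no node"
	case NoAuth:
		return "not authenticated"
	}
	return fmt.Sprintf("zookeeper error %d", int(c))
}

// toError plays the role of zkError: a zero status becomes a nil error,
// anything else is returned as a comparable Code value.
func toError(rc int) error {
	if Code(rc) == OK {
		return nil
	}
	return Code(rc)
}

func main() {
	err := toError(-102)
	// Callers compare directly against the constant; no Code() accessor is needed.
	fmt.Println(err == NoAuth, err)
}
```

Because the values are plain integers, equality works with == and with errors.Is, and a nil return still means success, which matches the usual Go idiom.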
1366 | @@ -273,104 +297,54 @@ |
1367 | } |
1368 | |
1369 | // ----------------------------------------------------------------------- |
1370 | -// Error interface which maps onto the ZooKeeper error codes. |
1371 | - |
1372 | -type Error interface { |
1373 | - String() string |
1374 | - Code() int |
1375 | -} |
1376 | - |
1377 | -type errorType struct { |
1378 | - zkrc C.int |
1379 | - err os.Error |
1380 | -} |
1381 | - |
1382 | -func newError(zkrc C.int, err os.Error) Error { |
1383 | - return &errorType{zkrc, err} |
1384 | -} |
1385 | - |
1386 | -func (error *errorType) String() (result string) { |
1387 | - if error.zkrc == ZSYSTEMERROR && error.err != nil { |
1388 | - result = error.err.String() |
1389 | - } else { |
1390 | - result = C.GoString(C.zerror(error.zkrc)) // Static, no need to free it. |
1391 | - } |
1392 | - return |
1393 | -} |
1394 | - |
1395 | -// Code returns the error code that may be compared against one of |
1396 | -// the gozk.Z* constants. |
1397 | -func (error *errorType) Code() int { |
1398 | - return int(error.zkrc) |
1399 | -} |
1400 | - |
1401 | -// ----------------------------------------------------------------------- |
1402 | -// Stat interface which maps onto the ZooKeeper Stat struct. |
1403 | - |
1404 | -// We declare this as an interface rather than an actual struct because |
1405 | -// this way we don't have to copy data around between the real C struct |
1406 | -// and the Go one on every call. Most uses will only touch a few elements, |
1407 | -// or even ignore the stat entirely, so that's a win. |
1408 | |
1409 | // Stat contains detailed information about a node. |
1410 | -type Stat interface { |
1411 | - Czxid() int64 |
1412 | - Mzxid() int64 |
1413 | - CTime() int64 |
1414 | - MTime() int64 |
1415 | - Version() int32 |
1416 | - CVersion() int32 |
1417 | - AVersion() int32 |
1418 | - EphemeralOwner() int64 |
1419 | - DataLength() int32 |
1420 | - NumChildren() int32 |
1421 | - Pzxid() int64 |
1422 | -} |
1423 | - |
1424 | -type resultStat C.struct_Stat |
1425 | - |
1426 | -func (stat *resultStat) Czxid() int64 { |
1427 | - return int64(stat.czxid) |
1428 | -} |
1429 | - |
1430 | -func (stat *resultStat) Mzxid() int64 { |
1431 | - return int64(stat.mzxid) |
1432 | -} |
1433 | - |
1434 | -func (stat *resultStat) CTime() int64 { |
1435 | - return int64(stat.ctime) |
1436 | -} |
1437 | - |
1438 | -func (stat *resultStat) MTime() int64 { |
1439 | - return int64(stat.mtime) |
1440 | -} |
1441 | - |
1442 | -func (stat *resultStat) Version() int32 { |
1443 | - return int32(stat.version) |
1444 | -} |
1445 | - |
1446 | -func (stat *resultStat) CVersion() int32 { |
1447 | - return int32(stat.cversion) |
1448 | -} |
1449 | - |
1450 | -func (stat *resultStat) AVersion() int32 { |
1451 | - return int32(stat.aversion) |
1452 | -} |
1453 | - |
1454 | -func (stat *resultStat) EphemeralOwner() int64 { |
1455 | - return int64(stat.ephemeralOwner) |
1456 | -} |
1457 | - |
1458 | -func (stat *resultStat) DataLength() int32 { |
1459 | - return int32(stat.dataLength) |
1460 | -} |
1461 | - |
1462 | -func (stat *resultStat) NumChildren() int32 { |
1463 | - return int32(stat.numChildren) |
1464 | -} |
1465 | - |
1466 | -func (stat *resultStat) Pzxid() int64 { |
1467 | - return int64(stat.pzxid) |
1468 | +type Stat struct { |
1469 | + c C.struct_Stat |
1470 | +} |
1471 | + |
1472 | +func (stat *Stat) Czxid() int64 { |
1473 | + return int64(stat.c.czxid) |
1474 | +} |
1475 | + |
1476 | +func (stat *Stat) Mzxid() int64 { |
1477 | + return int64(stat.c.mzxid) |
1478 | +} |
1479 | + |
1480 | +func (stat *Stat) CTime() int64 { |
1481 | + return int64(stat.c.ctime) |
1482 | +} |
1483 | + |
1484 | +func (stat *Stat) MTime() int64 { |
1485 | + return int64(stat.c.mtime) |
1486 | +} |
1487 | + |
1488 | +func (stat *Stat) Version() int32 { |
1489 | + return int32(stat.c.version) |
1490 | +} |
1491 | + |
1492 | +func (stat *Stat) CVersion() int32 { |
1493 | + return int32(stat.c.cversion) |
1494 | +} |
1495 | + |
1496 | +func (stat *Stat) AVersion() int32 { |
1497 | + return int32(stat.c.aversion) |
1498 | +} |
1499 | + |
1500 | +func (stat *Stat) EphemeralOwner() int64 { |
1501 | + return int64(stat.c.ephemeralOwner) |
1502 | +} |
1503 | + |
1504 | +func (stat *Stat) DataLength() int32 { |
1505 | + return int32(stat.c.dataLength) |
1506 | +} |
1507 | + |
1508 | +func (stat *Stat) NumChildren() int32 { |
1509 | + return int32(stat.c.numChildren) |
1510 | +} |
1511 | + |
1512 | +func (stat *Stat) Pzxid() int64 { |
1513 | + return int64(stat.c.pzxid) |
1514 | } |
1515 | |
1516 | // ----------------------------------------------------------------------- |
1517 | @@ -384,7 +358,7 @@ |
1518 | C.zoo_set_debug_level(C.ZooLogLevel(level)) |
1519 | } |
1520 | |
1521 | -// Init initializes the communication with a ZooKeeper cluster. The provided |
1522 | +// Dial initializes the communication with a ZooKeeper cluster. The provided |
1523 | // servers parameter may include multiple server addresses, separated |
1524 | // by commas, so that the client will automatically attempt to connect |
1525 | // to another server if one of them stops working for whatever reason. |
1526 | @@ -398,79 +372,73 @@ |
1527 | // The watch channel receives events of type SESSION_EVENT when any change |
1528 | // to the state of the established connection happens. See the documentation |
1529 | // for the Event type for more details. |
1530 | -func Init(servers string, recvTimeoutNS int64) (zk *ZooKeeper, watch chan Event, err Error) { |
1531 | - zk, watch, err = internalInit(servers, recvTimeoutNS, nil) |
1532 | - return |
1533 | +func Dial(servers string, recvTimeoutNS int64) (*Conn, <-chan Event, os.Error) { |
1534 | + return dial(servers, recvTimeoutNS, nil) |
1535 | } |
1536 | |
1537 | -// Equivalent to Init, but attempt to reestablish an existing session |
1538 | +// Redial is equivalent to Dial, but attempts to reestablish an existing session |
1539 | // identified via the clientId parameter. |
1540 | -func ReInit(servers string, recvTimeoutNS int64, clientId *ClientId) (zk *ZooKeeper, watch chan Event, err Error) { |
1541 | - zk, watch, err = internalInit(servers, recvTimeoutNS, clientId) |
1542 | - return |
1543 | +func Redial(servers string, recvTimeoutNS int64, clientId *ClientId) (*Conn, <-chan Event, os.Error) { |
1544 | + return dial(servers, recvTimeoutNS, clientId) |
1545 | } |
1546 | |
1547 | -func internalInit(servers string, recvTimeoutNS int64, clientId *ClientId) (*ZooKeeper, chan Event, Error) { |
1548 | - |
1549 | - zk := &ZooKeeper{} |
1550 | - zk.watchChannels = make(map[uintptr]chan Event) |
1551 | +func dial(servers string, recvTimeoutNS int64, clientId *ClientId) (*Conn, <-chan Event, os.Error) { |
1552 | + conn := &Conn{} |
1553 | + conn.watchChannels = make(map[uintptr]chan Event) |
1554 | |
1555 | var cId *C.clientid_t |
1556 | if clientId != nil { |
1557 | cId = &clientId.cId |
1558 | } |
1559 | |
1560 | - watchId, watchChannel := zk.createWatch(true) |
1561 | - zk.sessionWatchId = watchId |
1562 | + watchId, watchChannel := conn.createWatch(true) |
1563 | + conn.sessionWatchId = watchId |
1564 | |
1565 | cservers := C.CString(servers) |
1566 | - handle, cerr := C.zookeeper_init(cservers, C.watch_handler, C.int(recvTimeoutNS / 1e6), cId, unsafe.Pointer(watchId), 0) |
1567 | + handle, cerr := C.zookeeper_init(cservers, C.watch_handler, C.int(recvTimeoutNS/1e6), cId, unsafe.Pointer(watchId), 0) |
1568 | C.free(unsafe.Pointer(cservers)) |
1569 | if handle == nil { |
1570 | - zk.closeAllWatches() |
1571 | - return nil, nil, newError(ZSYSTEMERROR, cerr) |
1572 | + conn.closeAllWatches() |
1573 | + return nil, nil, zkError(C.int(ZSYSTEMERROR), cerr) |
1574 | } |
1575 | - zk.handle = handle |
1576 | + conn.handle = handle |
1577 | runWatchLoop() |
1578 | - return zk, watchChannel, nil |
1579 | + return conn, watchChannel, nil |
1580 | } |
1581 | |
1582 | // ClientId returns the client ID for the existing session with ZooKeeper. |
1583 | // This is useful to reestablish an existing session via Redial. |
1584 | -func (zk *ZooKeeper) ClientId() *ClientId { |
1585 | - return &ClientId{*C.zoo_client_id(zk.handle)} |
1586 | +func (conn *Conn) ClientId() *ClientId { |
1587 | + return &ClientId{*C.zoo_client_id(conn.handle)} |
1588 | } |
1589 | |
1590 | // Close terminates the ZooKeeper interaction. |
1591 | -func (zk *ZooKeeper) Close() Error { |
1592 | - |
1593 | - // Protect from concurrency around zk.handle change. |
1594 | - zk.mutex.Lock() |
1595 | - defer zk.mutex.Unlock() |
1596 | - |
1597 | - if zk.handle == nil { |
1598 | +func (conn *Conn) Close() os.Error { |
1599 | + |
1600 | + // Protect from concurrency around conn.handle change. |
1601 | + conn.mutex.Lock() |
1602 | + defer conn.mutex.Unlock() |
1603 | + |
1604 | + if conn.handle == nil { |
1605 | // ZooKeeper may hang indefinitely if a handle is closed twice, |
1606 | // so we get in the way and prevent it from happening. |
1607 | - return newError(ZCLOSING, nil) |
1608 | + return ZCLOSING |
1609 | } |
1610 | - rc, cerr := C.zookeeper_close(zk.handle) |
1611 | + rc, cerr := C.zookeeper_close(conn.handle) |
1612 | |
1613 | - zk.closeAllWatches() |
1614 | + conn.closeAllWatches() |
1615 | stopWatchLoop() |
1616 | |
1617 | - // At this point, nothing else should need zk.handle. |
1618 | - zk.handle = nil |
1619 | + // At this point, nothing else should need conn.handle. |
1620 | + conn.handle = nil |
1621 | |
1622 | - if rc != C.ZOK { |
1623 | - return newError(rc, cerr) |
1624 | - } |
1625 | - return nil |
1626 | + return zkError(rc, cerr) |
1627 | } |
1628 | |
1629 | // Get returns the data and status from an existing node. err will be nil, |
1630 | // unless an error is found. Attempting to retrieve data from a non-existing |
1631 | // node is an error. |
1632 | -func (zk *ZooKeeper) Get(path string) (data string, stat Stat, err Error) { |
1633 | +func (conn *Conn) Get(path string) (data string, stat *Stat, err os.Error) { |
1634 | |
1635 | cpath := C.CString(path) |
1636 | cbuffer := (*C.char)(C.malloc(bufferSize)) |
1637 | @@ -478,22 +446,21 @@ |
1638 | defer C.free(unsafe.Pointer(cpath)) |
1639 | defer C.free(unsafe.Pointer(cbuffer)) |
1640 | |
1641 | - cstat := C.struct_Stat{} |
1642 | - rc, cerr := C.zoo_wget(zk.handle, cpath, nil, nil, |
1643 | - cbuffer, &cbufferLen, &cstat) |
1644 | + var cstat Stat |
1645 | + rc, cerr := C.zoo_wget(conn.handle, cpath, nil, nil, cbuffer, &cbufferLen, &cstat.c) |
1646 | if rc != C.ZOK { |
1647 | - return "", nil, newError(rc, cerr) |
1648 | + return "", nil, zkError(rc, cerr) |
1649 | } |
1650 | + |
1651 | result := C.GoStringN(cbuffer, cbufferLen) |
1652 | - |
1653 | - return result, (*resultStat)(&cstat), nil |
1654 | + return result, &cstat, nil |
1655 | } |
1656 | |
1657 | // GetW works like Get but also returns a channel that will receive |
1658 | // a single Event value when the data or existence of the given ZooKeeper |
1659 | // node changes or when critical session events happen. See the |
1660 | // documentation of the Event type for more details. |
1661 | -func (zk *ZooKeeper) GetW(path string) (data string, stat Stat, watch chan Event, err Error) { |
1662 | +func (conn *Conn) GetW(path string) (data string, stat *Stat, watch <-chan Event, err os.Error) { |
1663 | |
1664 | cpath := C.CString(path) |
1665 | cbuffer := (*C.char)(C.malloc(bufferSize)) |
1666 | @@ -501,42 +468,38 @@ |
1667 | defer C.free(unsafe.Pointer(cpath)) |
1668 | defer C.free(unsafe.Pointer(cbuffer)) |
1669 | |
1670 | - watchId, watchChannel := zk.createWatch(true) |
1671 | + watchId, watchChannel := conn.createWatch(true) |
1672 | |
1673 | - cstat := C.struct_Stat{} |
1674 | - rc, cerr := C.zoo_wget(zk.handle, cpath, |
1675 | - C.watch_handler, unsafe.Pointer(watchId), |
1676 | - cbuffer, &cbufferLen, &cstat) |
1677 | + var cstat Stat |
1678 | + rc, cerr := C.zoo_wget(conn.handle, cpath, C.watch_handler, unsafe.Pointer(watchId), cbuffer, &cbufferLen, &cstat.c) |
1679 | if rc != C.ZOK { |
1680 | - zk.forgetWatch(watchId) |
1681 | - return "", nil, nil, newError(rc, cerr) |
1682 | + conn.forgetWatch(watchId) |
1683 | + return "", nil, nil, zkError(rc, cerr) |
1684 | } |
1685 | |
1686 | result := C.GoStringN(cbuffer, cbufferLen) |
1687 | - return result, (*resultStat)(&cstat), watchChannel, nil |
1688 | + return result, &cstat, watchChannel, nil |
1689 | } |
1690 | |
1691 | // Children returns the children list and status from an existing node. |
1692 | -// err will be nil, unless an error is found. Attempting to retrieve the |
1693 | -// children list from a non-existent node is an error. |
1694 | -func (zk *ZooKeeper) Children(path string) (children []string, stat Stat, err Error) { |
1695 | +// Attempting to retrieve the children list from a non-existent node is an error. |
1696 | +func (conn *Conn) Children(path string) (children []string, stat *Stat, err os.Error) { |
1697 | |
1698 | cpath := C.CString(path) |
1699 | defer C.free(unsafe.Pointer(cpath)) |
1700 | |
1701 | cvector := C.struct_String_vector{} |
1702 | - cstat := C.struct_Stat{} |
1703 | - rc, cerr := C.zoo_wget_children2(zk.handle, cpath, nil, nil, |
1704 | - &cvector, &cstat) |
1705 | + var cstat Stat |
1706 | + rc, cerr := C.zoo_wget_children2(conn.handle, cpath, nil, nil, &cvector, &cstat.c) |
1707 | |
1708 | // Can't happen if rc != 0, but avoid potential memory leaks in the future. |
1709 | if cvector.count != 0 { |
1710 | children = parseStringVector(&cvector) |
1711 | } |
1712 | - if rc != C.ZOK { |
1713 | - err = newError(rc, cerr) |
1714 | + if rc == C.ZOK { |
1715 | + stat = &cstat |
1716 | } else { |
1717 | - stat = (*resultStat)(&cstat) |
1718 | + err = zkError(rc, cerr) |
1719 | } |
1720 | return |
1721 | } |
1722 | @@ -545,29 +508,27 @@ |
1723 | // receive a single Event value when a node is added or removed under the |
1724 | // provided path or when critical session events happen. See the documentation |
1725 | // of the Event type for more details. |
1726 | -func (zk *ZooKeeper) ChildrenW(path string) (children []string, stat Stat, watch chan Event, err Error) { |
1727 | +func (conn *Conn) ChildrenW(path string) (children []string, stat *Stat, watch <-chan Event, err os.Error) { |
1728 | |
1729 | cpath := C.CString(path) |
1730 | defer C.free(unsafe.Pointer(cpath)) |
1731 | |
1732 | - watchId, watchChannel := zk.createWatch(true) |
1733 | + watchId, watchChannel := conn.createWatch(true) |
1734 | |
1735 | cvector := C.struct_String_vector{} |
1736 | - cstat := C.struct_Stat{} |
1737 | - rc, cerr := C.zoo_wget_children2(zk.handle, cpath, |
1738 | - C.watch_handler, unsafe.Pointer(watchId), |
1739 | - &cvector, &cstat) |
1740 | + var cstat Stat |
1741 | + rc, cerr := C.zoo_wget_children2(conn.handle, cpath, C.watch_handler, unsafe.Pointer(watchId), &cvector, &cstat.c) |
1742 | |
1743 | // Can't happen if rc != 0, but avoid potential memory leaks in the future. |
1744 | if cvector.count != 0 { |
1745 | children = parseStringVector(&cvector) |
1746 | } |
1747 | - if rc != C.ZOK { |
1748 | - zk.forgetWatch(watchId) |
1749 | - err = newError(rc, cerr) |
1750 | + if rc == C.ZOK { |
1751 | + stat = &cstat |
1752 | + watch = watchChannel |
1753 | } else { |
1754 | - stat = (*resultStat)(&cstat) |
1755 | - watch = watchChannel |
1756 | + conn.forgetWatch(watchId) |
1757 | + err = zkError(rc, cerr) |
1758 | } |
1759 | return |
1760 | } |
1761 | @@ -588,20 +549,20 @@ |
1762 | // Exists checks if a node exists at the given path. If it does, |
1763 | // stat will contain meta information on the existing node, otherwise |
1764 | // it will be nil. |
1765 | -func (zk *ZooKeeper) Exists(path string) (stat Stat, err Error) { |
1766 | +func (conn *Conn) Exists(path string) (stat *Stat, err os.Error) { |
1767 | cpath := C.CString(path) |
1768 | defer C.free(unsafe.Pointer(cpath)) |
1769 | |
1770 | - cstat := C.struct_Stat{} |
1771 | - rc, cerr := C.zoo_wexists(zk.handle, cpath, nil, nil, &cstat) |
1772 | + var cstat Stat |
1773 | + rc, cerr := C.zoo_wexists(conn.handle, cpath, nil, nil, &cstat.c) |
1774 | |
1775 | // We diverge a bit from the usual here: a ZNONODE is not an error |
1776 | // for an exists call, otherwise every Exists call would have to check |
1777 | // for err != nil and err.Code() != ZNONODE. |
1778 | if rc == C.ZOK { |
1779 | - stat = (*resultStat)(&cstat) |
1780 | + stat = &cstat |
1781 | } else if rc != C.ZNONODE { |
1782 | - err = newError(rc, cerr) |
1783 | + err = zkError(rc, cerr) |
1784 | } |
1785 | return |
1786 | } |
1787 | @@ -611,28 +572,27 @@ |
1788 | // stat is nil and the node didn't exist, or when the existing node |
1789 | // is removed. It will also receive critical session events. See the |
1790 | // documentation of the Event type for more details. |
1791 | -func (zk *ZooKeeper) ExistsW(path string) (stat Stat, watch chan Event, err Error) { |
1792 | +func (conn *Conn) ExistsW(path string) (stat *Stat, watch <-chan Event, err os.Error) { |
1793 | cpath := C.CString(path) |
1794 | defer C.free(unsafe.Pointer(cpath)) |
1795 | |
1796 | - watchId, watchChannel := zk.createWatch(true) |
1797 | + watchId, watchChannel := conn.createWatch(true) |
1798 | |
1799 | - cstat := C.struct_Stat{} |
1800 | - rc, cerr := C.zoo_wexists(zk.handle, cpath, |
1801 | - C.watch_handler, unsafe.Pointer(watchId), &cstat) |
1802 | + var cstat Stat |
1803 | + rc, cerr := C.zoo_wexists(conn.handle, cpath, C.watch_handler, unsafe.Pointer(watchId), &cstat.c) |
1804 | |
1805 | // We diverge a bit from the usual here: a ZNONODE is not an error |
1806 | // for an exists call, otherwise every Exists call would have to check |
1807 | // for err != nil and err.Code() != ZNONODE. |
1808 | - switch rc { |
1809 | + switch Error(rc) { |
1810 | case ZOK: |
1811 | - stat = (*resultStat)(&cstat) |
1812 | + stat = &cstat |
1813 | watch = watchChannel |
1814 | case ZNONODE: |
1815 | watch = watchChannel |
1816 | default: |
1817 | - zk.forgetWatch(watchId) |
1818 | - err = newError(rc, cerr) |
1819 | + conn.forgetWatch(watchId) |
1820 | + err = zkError(rc, cerr) |
1821 | } |
1822 | return |
1823 | } |
1824 | @@ -646,7 +606,7 @@ |
1825 | // The returned path is useful in cases where the created path may differ |
1826 | // from the requested one, such as when a sequence number is appended |
1827 | // to it due to the use of the gozk.SEQUENCE flag. |
1828 | -func (zk *ZooKeeper) Create(path, value string, flags int, aclv []ACL) (pathCreated string, err Error) { |
1829 | +func (conn *Conn) Create(path, value string, flags int, aclv []ACL) (pathCreated string, err os.Error) { |
1830 | cpath := C.CString(path) |
1831 | cvalue := C.CString(value) |
1832 | defer C.free(unsafe.Pointer(cpath)) |
1833 | @@ -660,13 +620,13 @@ |
1834 | cpathCreated := (*C.char)(C.malloc(cpathLen)) |
1835 | defer C.free(unsafe.Pointer(cpathCreated)) |
1836 | |
1837 | - rc, cerr := C.zoo_create(zk.handle, cpath, cvalue, C.int(len(value)), |
1838 | - caclv, C.int(flags), cpathCreated, C.int(cpathLen)) |
1839 | - if rc != C.ZOK { |
1840 | - return "", newError(rc, cerr) |
1841 | + rc, cerr := C.zoo_create(conn.handle, cpath, cvalue, C.int(len(value)), caclv, C.int(flags), cpathCreated, C.int(cpathLen)) |
1842 | + if rc == C.ZOK { |
1843 | + pathCreated = C.GoString(cpathCreated) |
1844 | + } else { |
1845 | + err = zkError(rc, cerr) |
1846 | } |
1847 | - |
1848 | - return C.GoString(cpathCreated), nil |
1849 | + return |
1850 | } |
1851 | |
1852 | // Set modifies the data for the existing node at the given path, replacing it |
1853 | @@ -677,35 +637,31 @@ |
1854 | // |
1855 | // It is an error to attempt to set the data of a non-existing node with |
1856 | // this function. In these cases, use Create instead. |
1857 | -func (zk *ZooKeeper) Set(path, value string, version int32) (stat Stat, err Error) { |
1858 | +func (conn *Conn) Set(path, value string, version int32) (stat *Stat, err os.Error) { |
1859 | |
1860 | cpath := C.CString(path) |
1861 | cvalue := C.CString(value) |
1862 | defer C.free(unsafe.Pointer(cpath)) |
1863 | defer C.free(unsafe.Pointer(cvalue)) |
1864 | |
1865 | - cstat := C.struct_Stat{} |
1866 | - |
1867 | - rc, cerr := C.zoo_set2(zk.handle, cpath, cvalue, C.int(len(value)), |
1868 | - C.int(version), &cstat) |
1869 | - if rc != C.ZOK { |
1870 | - return nil, newError(rc, cerr) |
1871 | + var cstat Stat |
1872 | + rc, cerr := C.zoo_set2(conn.handle, cpath, cvalue, C.int(len(value)), C.int(version), &cstat.c) |
1873 | + if rc == C.ZOK { |
1874 | + stat = &cstat |
1875 | + } else { |
1876 | + err = zkError(rc, cerr) |
1877 | } |
1878 | - |
1879 | - return (*resultStat)(&cstat), nil |
1880 | + return |
1881 | } |
1882 | |
1883 | // Delete removes the node at path. If version is not -1, the operation |
1884 | // will only succeed if the node is still at this version when the |
1885 | // node is deleted as an atomic operation. |
1886 | -func (zk *ZooKeeper) Delete(path string, version int32) (err Error) { |
1887 | +func (conn *Conn) Delete(path string, version int32) (err os.Error) { |
1888 | cpath := C.CString(path) |
1889 | defer C.free(unsafe.Pointer(cpath)) |
1890 | - rc, cerr := C.zoo_delete(zk.handle, cpath, C.int(version)) |
1891 | - if rc != C.ZOK { |
1892 | - return newError(rc, cerr) |
1893 | - } |
1894 | - return nil |
1895 | + rc, cerr := C.zoo_delete(conn.handle, cpath, C.int(version)) |
1896 | + return zkError(rc, cerr) |
1897 | } |
1898 | |
1899 | // AddAuth adds a new authentication certificate to the ZooKeeper |
1900 | @@ -713,7 +669,7 @@ |
1901 | // authentication information, while the cert parameter provides the |
1902 | // identity data itself. For instance, the "digest" scheme requires |
1903 | // a pair like "username:password" to be provided as the certificate. |
1904 | -func (zk *ZooKeeper) AddAuth(scheme, cert string) Error { |
1905 | +func (conn *Conn) AddAuth(scheme, cert string) os.Error { |
1906 | cscheme := C.CString(scheme) |
1907 | ccert := C.CString(cert) |
1908 | defer C.free(unsafe.Pointer(cscheme)) |
1909 | @@ -725,43 +681,38 @@ |
1910 | } |
1911 | defer C.destroy_completion_data(data) |
1912 | |
1913 | - rc, cerr := C.zoo_add_auth(zk.handle, cscheme, ccert, C.int(len(cert)), |
1914 | - C.handle_void_completion, unsafe.Pointer(data)) |
1915 | + rc, cerr := C.zoo_add_auth(conn.handle, cscheme, ccert, C.int(len(cert)), C.handle_void_completion, unsafe.Pointer(data)) |
1916 | if rc != C.ZOK { |
1917 | - return newError(rc, cerr) |
1918 | + return zkError(rc, cerr) |
1919 | } |
1920 | |
1921 | C.wait_for_completion(data) |
1922 | |
1923 | rc = C.int(uintptr(data.data)) |
1924 | - if rc != C.ZOK { |
1925 | - return newError(rc, nil) |
1926 | - } |
1927 | - |
1928 | - return nil |
1929 | + return zkError(rc, nil) |
1930 | } |
1931 | |
1932 | // ACL returns the access control list for path. |
1933 | -func (zk *ZooKeeper) ACL(path string) ([]ACL, Stat, Error) { |
1934 | +func (conn *Conn) ACL(path string) ([]ACL, *Stat, os.Error) { |
1935 | |
1936 | cpath := C.CString(path) |
1937 | defer C.free(unsafe.Pointer(cpath)) |
1938 | |
1939 | caclv := C.struct_ACL_vector{} |
1940 | - cstat := C.struct_Stat{} |
1941 | |
1942 | - rc, cerr := C.zoo_get_acl(zk.handle, cpath, &caclv, &cstat) |
1943 | + var cstat Stat |
1944 | + rc, cerr := C.zoo_get_acl(conn.handle, cpath, &caclv, &cstat.c) |
1945 | if rc != C.ZOK { |
1946 | - return nil, nil, newError(rc, cerr) |
1947 | + return nil, nil, zkError(rc, cerr) |
1948 | } |
1949 | |
1950 | aclv := parseACLVector(&caclv) |
1951 | |
1952 | - return aclv, (*resultStat)(&cstat), nil |
1953 | + return aclv, &cstat, nil |
1954 | } |
1955 | |
1956 | // SetACL changes the access control list for path. |
1957 | -func (zk *ZooKeeper) SetACL(path string, aclv []ACL, version int32) Error { |
1958 | +func (conn *Conn) SetACL(path string, aclv []ACL, version int32) os.Error { |
1959 | |
1960 | cpath := C.CString(path) |
1961 | defer C.free(unsafe.Pointer(cpath)) |
1962 | @@ -769,12 +720,8 @@ |
1963 | caclv := buildACLVector(aclv) |
1964 | defer C.deallocate_ACL_vector(caclv) |
1965 | |
1966 | - rc, cerr := C.zoo_set_acl(zk.handle, cpath, C.int(version), caclv) |
1967 | - if rc != C.ZOK { |
1968 | - return newError(rc, cerr) |
1969 | - } |
1970 | - |
1971 | - return nil |
1972 | + rc, cerr := C.zoo_set_acl(conn.handle, cpath, C.int(version), caclv) |
1973 | + return zkError(rc, cerr) |
1974 | } |
1975 | |
1976 | func parseACLVector(caclv *C.struct_ACL_vector) []ACL { |
1977 | @@ -822,7 +769,7 @@ |
1978 | // ----------------------------------------------------------------------- |
1979 | // RetryChange utility method. |
1980 | |
1981 | -type ChangeFunc func(oldValue string, oldStat Stat) (newValue string, err os.Error) |
1982 | +type ChangeFunc func(oldValue string, oldStat *Stat) (newValue string, err os.Error) |
1983 | |
1984 | // RetryChange runs changeFunc to attempt to atomically change path |
1985 | // in a lock free manner, and retries in case there was another |
1986 | @@ -831,8 +778,7 @@ |
1987 | // changeFunc must work correctly if called multiple times in case |
1988 | // the modification fails due to concurrent changes, and it may return |
1989 | // an error that will cause the RetryChange function to stop and |
1990 | -// return an Error with code ZSYSTEMERROR and the same .String() result |
1991 | -// as the provided error. |
1992 | +// return the same error. |
1993 | // |
1994 | // This mechanism is not suitable for a node that is frequently modified |
1995 | // concurrently. For those cases, consider using a pessimistic locking |
1996 | @@ -845,8 +791,7 @@ |
1997 | // |
1998 | // 2. Call the changeFunc with the current node value and stat, |
1999 | // or with an empty string and nil stat, if the node doesn't yet exist. |
2000 | -// If the changeFunc returns an error, stop and return an Error with |
2001 | -// ZSYSTEMERROR Code() and the same String() as the changeFunc error. |
2002 | +// If the changeFunc returns an error, stop and return the same error. |
2003 | // |
2004 | // 3. If the changeFunc returns no errors, use the string returned as |
2005 | // the new candidate value for the node, and attempt to either create |
2006 | @@ -855,36 +800,32 @@ |
2007 | // in the same node), repeat from step 1. If this procedure fails with any |
2008 | // other error, stop and return the error found. |
2009 | // |
2010 | -func (zk *ZooKeeper) RetryChange(path string, flags int, acl []ACL, changeFunc ChangeFunc) (err Error) { |
2011 | +func (conn *Conn) RetryChange(path string, flags int, acl []ACL, changeFunc ChangeFunc) os.Error { |
2012 | for { |
2013 | - oldValue, oldStat, getErr := zk.Get(path) |
2014 | - if getErr != nil && getErr.Code() != ZNONODE { |
2015 | - err = getErr |
2016 | - break |
2017 | - } |
2018 | - newValue, osErr := changeFunc(oldValue, oldStat) |
2019 | - if osErr != nil { |
2020 | - return newError(ZSYSTEMERROR, osErr) |
2021 | - } else if oldStat == nil { |
2022 | - _, err = zk.Create(path, newValue, flags, acl) |
2023 | - if err == nil || err.Code() != ZNODEEXISTS { |
2024 | - break |
2025 | + oldValue, oldStat, err := conn.Get(path) |
2026 | + if err != nil && err != ZNONODE { |
2027 | + return err |
2028 | + } |
2029 | + newValue, err := changeFunc(oldValue, oldStat) |
2030 | + if err != nil { |
2031 | + return err |
2032 | + } |
2033 | + if oldStat == nil { |
2034 | + _, err := conn.Create(path, newValue, flags, acl) |
2035 | + if err == nil || err != ZNODEEXISTS { |
2036 | + return err |
2037 | } |
2038 | - } else if newValue == oldValue { |
2039 | + continue |
2040 | + } |
2041 | + if newValue == oldValue { |
2042 | return nil // Nothing to do. |
2043 | - } else { |
2044 | - _, err = zk.Set(path, newValue, oldStat.Version()) |
2045 | - if err == nil { |
2046 | - break |
2047 | - } else { |
2048 | - code := err.Code() |
2049 | - if code != ZBADVERSION && code != ZNONODE { |
2050 | - break |
2051 | - } |
2052 | - } |
2053 | + } |
2054 | + _, err = conn.Set(path, newValue, oldStat.Version()) |
2055 | + if err == nil || (err != ZBADVERSION && err != ZNONODE) { |
2056 | + return err |
2057 | } |
2058 | } |
2059 | - return err |
2060 | + panic("not reached") |
2061 | } |
2062 | |
2063 | // ----------------------------------------------------------------------- |
2064 | @@ -897,7 +838,7 @@ |
2065 | // Whenever a *W method is called, it will return a channel which |
2066 | // outputs Event values. Internally, a map is used to maintain references |
2067 | // between a unique integer key (the watchId), and the event channel. The |
2068 | -// watchId is then handed to the C zookeeper library as the watch context, |
2069 | +// watchId is then handed to the C ZooKeeper library as the watch context, |
2070 | // so that we get it back when events happen. Using an integer key as the |
2071 | // watch context rather than a pointer is needed because there's no guarantee |
2072 | // that in the future the GC will not move objects around, and also because |
2073 | @@ -910,13 +851,13 @@ |
2074 | // Since Cgo doesn't allow calling back into Go, we actually fire a new |
2075 | // goroutine the very first time Dial is called, and allow it to block |
2076 | // in a pthread condition variable within a C function. This condition |
2077 | -// will only be notified once a zookeeper watch callback appends new |
2078 | +// will only be notified once a ZooKeeper watch callback appends new |
2079 | // entries to the event list. When this happens, the C function returns |
2080 | // and we get back into Go land with the pointer to the watch data, |
2081 | // including the watchId and other event details such as type and path. |
2082 | |
2083 | var watchMutex sync.Mutex |
2084 | -var watchZooKeepers = make(map[uintptr]*ZooKeeper) |
2085 | +var watchConns = make(map[uintptr]*Conn) |
2086 | var watchCounter uintptr |
2087 | var watchLoopCounter int |
2088 | |
2089 | @@ -925,14 +866,14 @@ |
2090 | // mostly as a debugging and testing aid. |
2091 | func CountPendingWatches() int { |
2092 | watchMutex.Lock() |
2093 | - count := len(watchZooKeepers) |
2094 | + count := len(watchConns) |
2095 | watchMutex.Unlock() |
2096 | return count |
2097 | } |
2098 | |
2099 | // createWatch creates and registers a watch, returning the watch id |
2100 | // and channel. |
2101 | -func (zk *ZooKeeper) createWatch(session bool) (watchId uintptr, watchChannel chan Event) { |
2102 | +func (conn *Conn) createWatch(session bool) (watchId uintptr, watchChannel chan Event) { |
2103 | buf := 1 // session/watch event |
2104 | if session { |
2105 | buf = 32 |
2106 | @@ -942,8 +883,8 @@ |
2107 | defer watchMutex.Unlock() |
2108 | watchId = watchCounter |
2109 | watchCounter += 1 |
2110 | - zk.watchChannels[watchId] = watchChannel |
2111 | - watchZooKeepers[watchId] = zk |
2112 | + conn.watchChannels[watchId] = watchChannel |
2113 | + watchConns[watchId] = conn |
2114 | return |
2115 | } |
2116 | |
2117 | @@ -951,21 +892,21 @@ |
2118 | // from ever getting delivered. It shouldn't be used if there's any |
2119 | // chance the watch channel is still visible and not closed, since |
2120 | // it might mean a goroutine would be blocked forever. |
2121 | -func (zk *ZooKeeper) forgetWatch(watchId uintptr) { |
2122 | +func (conn *Conn) forgetWatch(watchId uintptr) { |
2123 | watchMutex.Lock() |
2124 | defer watchMutex.Unlock() |
2125 | - zk.watchChannels[watchId] = nil, false |
2126 | - watchZooKeepers[watchId] = nil, false |
2127 | + conn.watchChannels[watchId] = nil, false |
2128 | + watchConns[watchId] = nil, false |
2129 | } |
2130 | |
2131 | -// closeAllWatches closes all watch channels for zk. |
2132 | -func (zk *ZooKeeper) closeAllWatches() { |
2133 | +// closeAllWatches closes all watch channels for conn. |
2134 | +func (conn *Conn) closeAllWatches() { |
2135 | watchMutex.Lock() |
2136 | defer watchMutex.Unlock() |
2137 | - for watchId, ch := range zk.watchChannels { |
2138 | + for watchId, ch := range conn.watchChannels { |
2139 | close(ch) |
2140 | - zk.watchChannels[watchId] = nil, false |
2141 | - watchZooKeepers[watchId] = nil, false |
2142 | + conn.watchChannels[watchId] = nil, false |
2143 | + watchConns[watchId] = nil, false |
2144 | } |
2145 | } |
2146 | |
2147 | @@ -978,11 +919,11 @@ |
2148 | } |
2149 | watchMutex.Lock() |
2150 | defer watchMutex.Unlock() |
2151 | - zk, ok := watchZooKeepers[watchId] |
2152 | + conn, ok := watchConns[watchId] |
2153 | if !ok { |
2154 | return |
2155 | } |
2156 | - if event.Type == EVENT_SESSION && watchId != zk.sessionWatchId { |
2157 | + if event.Type == EVENT_SESSION && watchId != conn.sessionWatchId { |
2158 | switch event.State { |
2159 | case STATE_EXPIRED_SESSION, STATE_AUTH_FAILED: |
2160 | default: |
2161 | @@ -990,7 +931,7 @@ |
2162 | return |
2163 | } |
2164 | } |
2165 | - ch := zk.watchChannels[watchId] |
2166 | + ch := conn.watchChannels[watchId] |
2167 | if ch == nil { |
2168 | return |
2169 | } |
2170 | @@ -1002,15 +943,15 @@ |
2171 | // straight to the buffer), and the application isn't paying |
2172 | // attention for long enough to have the buffer filled up. |
2173 | // Break down now rather than leaking forever. |
2174 | - if watchId == zk.sessionWatchId { |
2175 | + if watchId == conn.sessionWatchId { |
2176 | panic("Session event channel buffer is full") |
2177 | } else { |
2178 | panic("Watch event channel buffer is full") |
2179 | } |
2180 | } |
2181 | - if watchId != zk.sessionWatchId { |
2182 | - zk.watchChannels[watchId] = nil, false |
2183 | - watchZooKeepers[watchId] = nil, false |
2184 | + if watchId != conn.sessionWatchId { |
2185 | + conn.watchChannels[watchId] = nil, false |
2186 | + watchConns[watchId] = nil, false |
2187 | close(ch) |
2188 | } |
2189 | } |
2190 | |
2191 | === renamed file 'gozk_test.go' => 'zk_test.go' |
2192 | --- gozk_test.go 2011-08-19 01:51:37 +0000 |
2193 | +++ zk_test.go 2011-10-10 13:55:28 +0000 |
2194 | @@ -1,17 +1,17 @@ |
2195 | -package gozk_test |
2196 | +package zk_test |
2197 | |
2198 | import ( |
2199 | . "launchpad.net/gocheck" |
2200 | - "gozk" |
2201 | + "launchpad.net/gozk/zk" |
2202 | "time" |
2203 | ) |
2204 | |
2205 | // This error will be delivered via C errno, since ZK unfortunately |
2206 | // only provides the handle back from zookeeper_init(). |
2207 | func (s *S) TestInitErrorThroughErrno(c *C) { |
2208 | - zk, watch, err := gozk.Init("bad-domain-without-port", 5e9) |
2209 | - if zk != nil { |
2210 | - zk.Close() |
2211 | + conn, watch, err := zk.Dial("bad-domain-without-port", 5e9) |
2212 | + if conn != nil { |
2213 | + conn.Close() |
2214 | } |
2215 | if watch != nil { |
2216 | go func() { |
2217 | @@ -23,15 +23,15 @@ |
2218 | } |
2219 | }() |
2220 | } |
2221 | - c.Assert(zk, IsNil) |
2222 | + c.Assert(conn, IsNil) |
2223 | c.Assert(watch, IsNil) |
2224 | c.Assert(err, Matches, "invalid argument") |
2225 | } |
2226 | |
2227 | func (s *S) TestRecvTimeoutInitParameter(c *C) { |
2228 | - zk, watch, err := gozk.Init(s.zkAddr, 0) |
2229 | + conn, watch, err := zk.Dial(s.zkAddr, 0) |
2230 | c.Assert(err, IsNil) |
2231 | - defer zk.Close() |
2232 | + defer conn.Close() |
2233 | |
2234 | select { |
2235 | case <-watch: |
2236 | @@ -40,7 +40,7 @@ |
2237 | } |
2238 | |
2239 | for i := 0; i != 1000; i++ { |
2240 | - _, _, err := zk.Get("/zookeeper") |
2241 | + _, _, err := conn.Get("/zookeeper") |
2242 | if err != nil { |
2243 | c.Assert(err, Matches, "operation timeout") |
2244 | c.SucceedNow() |
2245 | @@ -51,76 +51,79 @@ |
2246 | } |
2247 | |
2248 | func (s *S) TestSessionWatches(c *C) { |
2249 | - c.Assert(gozk.CountPendingWatches(), Equals, 0) |
2250 | + c.Assert(zk.CountPendingWatches(), Equals, 0) |
2251 | |
2252 | zk1, watch1 := s.init(c) |
2253 | zk2, watch2 := s.init(c) |
2254 | zk3, watch3 := s.init(c) |
2255 | |
2256 | - c.Assert(gozk.CountPendingWatches(), Equals, 3) |
2257 | + c.Assert(zk.CountPendingWatches(), Equals, 3) |
2258 | |
2259 | event1 := <-watch1 |
2260 | - c.Assert(event1.Type, Equals, gozk.EVENT_SESSION) |
2261 | - c.Assert(event1.State, Equals, gozk.STATE_CONNECTED) |
2262 | + c.Assert(event1.Type, Equals, zk.EVENT_SESSION) |
2263 | + c.Assert(event1.State, Equals, zk.STATE_CONNECTED) |
2264 | |
2265 | - c.Assert(gozk.CountPendingWatches(), Equals, 3) |
2266 | + c.Assert(zk.CountPendingWatches(), Equals, 3) |
2267 | |
2268 | event2 := <-watch2 |
2269 | - c.Assert(event2.Type, Equals, gozk.EVENT_SESSION) |
2270 | - c.Assert(event2.State, Equals, gozk.STATE_CONNECTED) |
2271 | + c.Assert(event2.Type, Equals, zk.EVENT_SESSION) |
2272 | + c.Assert(event2.State, Equals, zk.STATE_CONNECTED) |
2273 | |
2274 | - c.Assert(gozk.CountPendingWatches(), Equals, 3) |
2275 | + c.Assert(zk.CountPendingWatches(), Equals, 3) |
2276 | |
2277 | event3 := <-watch3 |
2278 | - c.Assert(event3.Type, Equals, gozk.EVENT_SESSION) |
2279 | - c.Assert(event3.State, Equals, gozk.STATE_CONNECTED) |
2280 | + c.Assert(event3.Type, Equals, zk.EVENT_SESSION) |
2281 | + c.Assert(event3.State, Equals, zk.STATE_CONNECTED) |
2282 | |
2283 | - c.Assert(gozk.CountPendingWatches(), Equals, 3) |
2284 | + c.Assert(zk.CountPendingWatches(), Equals, 3) |
2285 | |
2286 | zk1.Close() |
2287 | - c.Assert(gozk.CountPendingWatches(), Equals, 2) |
2288 | + c.Assert(zk.CountPendingWatches(), Equals, 2) |
2289 | zk2.Close() |
2290 | - c.Assert(gozk.CountPendingWatches(), Equals, 1) |
2291 | + c.Assert(zk.CountPendingWatches(), Equals, 1) |
2292 | zk3.Close() |
2293 | - c.Assert(gozk.CountPendingWatches(), Equals, 0) |
2294 | + c.Assert(zk.CountPendingWatches(), Equals, 0) |
2295 | } |
2296 | |
2297 | -// Gozk injects a STATE_CLOSED event when zk.Close() is called, right |
2298 | +// Gozk injects a STATE_CLOSED event when conn.Close() is called, right |
2299 | // before the channel is closed. Closing the channel injects a nil |
2300 | // pointer, as usual for Go, so the STATE_CLOSED gives a chance to |
2301 | // know that a nil pointer is coming, and to stop the procedure. |
2302 | // Hopefully this procedure will avoid some nil-pointer references by |
2303 | // mistake. |
2304 | func (s *S) TestClosingStateInSessionWatch(c *C) { |
2305 | - zk, watch := s.init(c) |
2306 | + conn, watch := s.init(c) |
2307 | |
2308 | event := <-watch |
2309 | - c.Assert(event.Type, Equals, gozk.EVENT_SESSION) |
2310 | - c.Assert(event.State, Equals, gozk.STATE_CONNECTED) |
2311 | + c.Assert(event.Type, Equals, zk.EVENT_SESSION) |
2312 | + c.Assert(event.State, Equals, zk.STATE_CONNECTED) |
2313 | |
2314 | - zk.Close() |
2315 | + conn.Close() |
2316 | event, ok := <-watch |
2317 | c.Assert(ok, Equals, false) |
2318 | - c.Assert(event.Type, Equals, gozk.EVENT_CLOSED) |
2319 | - c.Assert(event.State, Equals, gozk.STATE_CLOSED) |
2320 | + c.Assert(event.Type, Equals, zk.EVENT_CLOSED) |
2321 | + c.Assert(event.State, Equals, zk.STATE_CLOSED) |
2322 | } |
2323 | |
2324 | func (s *S) TestEventString(c *C) { |
2325 | - var event gozk.Event |
2326 | - event = gozk.Event{gozk.EVENT_SESSION, "/path", gozk.STATE_CONNECTED} |
2327 | + var event zk.Event |
2328 | + event = zk.Event{zk.EVENT_SESSION, "/path", zk.STATE_CONNECTED} |
2329 | c.Assert(event, Matches, "ZooKeeper connected") |
2330 | - event = gozk.Event{gozk.EVENT_CREATED, "/path", gozk.STATE_CONNECTED} |
2331 | + event = zk.Event{zk.EVENT_CREATED, "/path", zk.STATE_CONNECTED} |
2332 | c.Assert(event, Matches, "ZooKeeper connected; path created: /path") |
2333 | - event = gozk.Event{-1, "/path", gozk.STATE_CLOSED} |
2334 | + event = zk.Event{-1, "/path", zk.STATE_CLOSED} |
2335 | c.Assert(event, Matches, "ZooKeeper connection closed") |
2336 | } |
2337 | |
2338 | -var okTests = []struct{gozk.Event; Ok bool}{ |
2339 | - {gozk.Event{gozk.EVENT_SESSION, "", gozk.STATE_CONNECTED}, true}, |
2340 | - {gozk.Event{gozk.EVENT_CREATED, "", gozk.STATE_CONNECTED}, true}, |
2341 | - {gozk.Event{0, "", gozk.STATE_CLOSED}, false}, |
2342 | - {gozk.Event{0, "", gozk.STATE_EXPIRED_SESSION}, false}, |
2343 | - {gozk.Event{0, "", gozk.STATE_AUTH_FAILED}, false}, |
2344 | +var okTests = []struct { |
2345 | + zk.Event |
2346 | + Ok bool |
2347 | +}{ |
2348 | + {zk.Event{zk.EVENT_SESSION, "", zk.STATE_CONNECTED}, true}, |
2349 | + {zk.Event{zk.EVENT_CREATED, "", zk.STATE_CONNECTED}, true}, |
2350 | + {zk.Event{0, "", zk.STATE_CLOSED}, false}, |
2351 | + {zk.Event{0, "", zk.STATE_EXPIRED_SESSION}, false}, |
2352 | + {zk.Event{0, "", zk.STATE_AUTH_FAILED}, false}, |
2353 | } |
2354 | |
2355 | func (s *S) TestEventOk(c *C) { |
2356 | @@ -130,9 +133,9 @@ |
2357 | } |
2358 | |
2359 | func (s *S) TestGetAndStat(c *C) { |
2360 | - zk, _ := s.init(c) |
2361 | + conn, _ := s.init(c) |
2362 | |
2363 | - data, stat, err := zk.Get("/zookeeper") |
2364 | + data, stat, err := conn.Get("/zookeeper") |
2365 | c.Assert(err, IsNil) |
2366 | c.Assert(data, Equals, "") |
2367 | c.Assert(stat.Czxid(), Equals, int64(0)) |
2368 | @@ -149,58 +152,58 @@ |
2369 | } |
2370 | |
2371 | func (s *S) TestGetAndError(c *C) { |
2372 | - zk, _ := s.init(c) |
2373 | + conn, _ := s.init(c) |
2374 | |
2375 | - data, stat, err := zk.Get("/non-existent") |
2376 | + data, stat, err := conn.Get("/non-existent") |
2377 | |
2378 | c.Assert(data, Equals, "") |
2379 | c.Assert(stat, IsNil) |
2380 | c.Assert(err, Matches, "no node") |
2381 | - c.Assert(err.Code(), Equals, gozk.ZNONODE) |
2382 | + c.Assert(err, Equals, zk.ZNONODE) |
2383 | } |
2384 | |
2385 | func (s *S) TestCreateAndGet(c *C) { |
2386 | - zk, _ := s.init(c) |
2387 | + conn, _ := s.init(c) |
2388 | |
2389 | - path, err := zk.Create("/test-", "bababum", gozk.SEQUENCE|gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
2390 | + path, err := conn.Create("/test-", "bababum", zk.SEQUENCE|zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2391 | c.Assert(err, IsNil) |
2392 | c.Assert(path, Matches, "/test-[0-9]+") |
2393 | |
2394 | // Check the error condition from Create(). |
2395 | - _, err = zk.Create(path, "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
2396 | + _, err = conn.Create(path, "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2397 | c.Assert(err, Matches, "node exists") |
2398 | |
2399 | - data, _, err := zk.Get(path) |
2400 | + data, _, err := conn.Get(path) |
2401 | c.Assert(err, IsNil) |
2402 | c.Assert(data, Equals, "bababum") |
2403 | } |
2404 | |
2405 | func (s *S) TestCreateSetAndGet(c *C) { |
2406 | - zk, _ := s.init(c) |
2407 | + conn, _ := s.init(c) |
2408 | |
2409 | - _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
2410 | + _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2411 | c.Assert(err, IsNil) |
2412 | |
2413 | - stat, err := zk.Set("/test", "bababum", -1) // Any version. |
2414 | + stat, err := conn.Set("/test", "bababum", -1) // Any version. |
2415 | c.Assert(err, IsNil) |
2416 | c.Assert(stat.Version(), Equals, int32(1)) |
2417 | |
2418 | - data, _, err := zk.Get("/test") |
2419 | + data, _, err := conn.Get("/test") |
2420 | c.Assert(err, IsNil) |
2421 | c.Assert(data, Equals, "bababum") |
2422 | } |
2423 | |
2424 | func (s *S) TestGetAndWatch(c *C) { |
2425 | - c.Check(gozk.CountPendingWatches(), Equals, 0) |
2426 | - |
2427 | - zk, _ := s.init(c) |
2428 | - |
2429 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2430 | - |
2431 | - _, err := zk.Create("/test", "one", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
2432 | + c.Check(zk.CountPendingWatches(), Equals, 0) |
2433 | + |
2434 | + conn, _ := s.init(c) |
2435 | + |
2436 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2437 | + |
2438 | + _, err := conn.Create("/test", "one", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2439 | c.Assert(err, IsNil) |
2440 | |
2441 | - data, stat, watch, err := zk.GetW("/test") |
2442 | + data, stat, watch, err := conn.GetW("/test") |
2443 | c.Assert(err, IsNil) |
2444 | c.Assert(data, Equals, "one") |
2445 | c.Assert(stat.Version(), Equals, int32(0)) |
2446 | @@ -211,17 +214,17 @@ |
2447 | default: |
2448 | } |
2449 | |
2450 | - c.Check(gozk.CountPendingWatches(), Equals, 2) |
2451 | + c.Check(zk.CountPendingWatches(), Equals, 2) |
2452 | |
2453 | - _, err = zk.Set("/test", "two", -1) |
2454 | + _, err = conn.Set("/test", "two", -1) |
2455 | c.Assert(err, IsNil) |
2456 | |
2457 | event := <-watch |
2458 | - c.Assert(event.Type, Equals, gozk.EVENT_CHANGED) |
2459 | - |
2460 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2461 | - |
2462 | - data, _, watch, err = zk.GetW("/test") |
2463 | + c.Assert(event.Type, Equals, zk.EVENT_CHANGED) |
2464 | + |
2465 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2466 | + |
2467 | + data, _, watch, err = conn.GetW("/test") |
2468 | c.Assert(err, IsNil) |
2469 | c.Assert(data, Equals, "two") |
2470 | |
2471 | @@ -231,86 +234,86 @@ |
2472 | default: |
2473 | } |
2474 | |
2475 | - c.Check(gozk.CountPendingWatches(), Equals, 2) |
2476 | + c.Check(zk.CountPendingWatches(), Equals, 2) |
2477 | |
2478 | - _, err = zk.Set("/test", "three", -1) |
2479 | + _, err = conn.Set("/test", "three", -1) |
2480 | c.Assert(err, IsNil) |
2481 | |
2482 | event = <-watch |
2483 | - c.Assert(event.Type, Equals, gozk.EVENT_CHANGED) |
2484 | + c.Assert(event.Type, Equals, zk.EVENT_CHANGED) |
2485 | |
2486 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2487 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2488 | } |
2489 | |
2490 | func (s *S) TestGetAndWatchWithError(c *C) { |
2491 | - c.Check(gozk.CountPendingWatches(), Equals, 0) |
2492 | - |
2493 | - zk, _ := s.init(c) |
2494 | - |
2495 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2496 | - |
2497 | - _, _, watch, err := zk.GetW("/test") |
2498 | + c.Check(zk.CountPendingWatches(), Equals, 0) |
2499 | + |
2500 | + conn, _ := s.init(c) |
2501 | + |
2502 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2503 | + |
2504 | + _, _, watch, err := conn.GetW("/test") |
2505 | c.Assert(err, NotNil) |
2506 | - c.Assert(err.Code(), Equals, gozk.ZNONODE) |
2507 | + c.Assert(err, Equals, zk.ZNONODE) |
2508 | c.Assert(watch, IsNil) |
2509 | |
2510 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2511 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2512 | } |
2513 | |
2514 | func (s *S) TestCloseReleasesWatches(c *C) { |
2515 | - c.Check(gozk.CountPendingWatches(), Equals, 0) |
2516 | - |
2517 | - zk, _ := s.init(c) |
2518 | - |
2519 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2520 | - |
2521 | - _, err := zk.Create("/test", "one", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
2522 | - c.Assert(err, IsNil) |
2523 | - |
2524 | - _, _, _, err = zk.GetW("/test") |
2525 | - c.Assert(err, IsNil) |
2526 | - |
2527 | - c.Assert(gozk.CountPendingWatches(), Equals, 2) |
2528 | - |
2529 | - zk.Close() |
2530 | - |
2531 | - c.Assert(gozk.CountPendingWatches(), Equals, 0) |
2532 | + c.Check(zk.CountPendingWatches(), Equals, 0) |
2533 | + |
2534 | + conn, _ := s.init(c) |
2535 | + |
2536 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2537 | + |
2538 | + _, err := conn.Create("/test", "one", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2539 | + c.Assert(err, IsNil) |
2540 | + |
2541 | + _, _, _, err = conn.GetW("/test") |
2542 | + c.Assert(err, IsNil) |
2543 | + |
2544 | + c.Assert(zk.CountPendingWatches(), Equals, 2) |
2545 | + |
2546 | + conn.Close() |
2547 | + |
2548 | + c.Assert(zk.CountPendingWatches(), Equals, 0) |
2549 | } |
2550 | |
2551 | // By default, the ZooKeeper C client will hang indefinitely if a |
2552 | // handler is closed twice. We get in the way and prevent it. |
2553 | func (s *S) TestClosingTwiceDoesntHang(c *C) { |
2554 | - zk, _ := s.init(c) |
2555 | - err := zk.Close() |
2556 | + conn, _ := s.init(c) |
2557 | + err := conn.Close() |
2558 | c.Assert(err, IsNil) |
2559 | - err = zk.Close() |
2560 | + err = conn.Close() |
2561 | c.Assert(err, NotNil) |
2562 | - c.Assert(err.Code(), Equals, gozk.ZCLOSING) |
2563 | + c.Assert(err, Equals, zk.ZCLOSING) |
2564 | } |
2565 | |
2566 | func (s *S) TestChildren(c *C) { |
2567 | - zk, _ := s.init(c) |
2568 | + conn, _ := s.init(c) |
2569 | |
2570 | - children, stat, err := zk.Children("/") |
2571 | + children, stat, err := conn.Children("/") |
2572 | c.Assert(err, IsNil) |
2573 | c.Assert(children, Equals, []string{"zookeeper"}) |
2574 | c.Assert(stat.NumChildren(), Equals, int32(1)) |
2575 | |
2576 | - children, stat, err = zk.Children("/non-existent") |
2577 | + children, stat, err = conn.Children("/non-existent") |
2578 | c.Assert(err, NotNil) |
2579 | - c.Assert(err.Code(), Equals, gozk.ZNONODE) |
2580 | + c.Assert(err, Equals, zk.ZNONODE) |
2581 | c.Assert(children, Equals, []string{}) |
2582 | - c.Assert(stat, Equals, nil) |
2583 | + c.Assert(stat, IsNil) |
2584 | } |
2585 | |
2586 | func (s *S) TestChildrenAndWatch(c *C) { |
2587 | - c.Check(gozk.CountPendingWatches(), Equals, 0) |
2588 | - |
2589 | - zk, _ := s.init(c) |
2590 | - |
2591 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2592 | - |
2593 | - children, stat, watch, err := zk.ChildrenW("/") |
2594 | + c.Check(zk.CountPendingWatches(), Equals, 0) |
2595 | + |
2596 | + conn, _ := s.init(c) |
2597 | + |
2598 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2599 | + |
2600 | + children, stat, watch, err := conn.ChildrenW("/") |
2601 | c.Assert(err, IsNil) |
2602 | c.Assert(children, Equals, []string{"zookeeper"}) |
2603 | c.Assert(stat.NumChildren(), Equals, int32(1)) |
2604 | @@ -321,18 +324,18 @@ |
2605 | default: |
2606 | } |
2607 | |
2608 | - c.Check(gozk.CountPendingWatches(), Equals, 2) |
2609 | + c.Check(zk.CountPendingWatches(), Equals, 2) |
2610 | |
2611 | - _, err = zk.Create("/test1", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
2612 | + _, err = conn.Create("/test1", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2613 | c.Assert(err, IsNil) |
2614 | |
2615 | event := <-watch |
2616 | - c.Assert(event.Type, Equals, gozk.EVENT_CHILD) |
2617 | + c.Assert(event.Type, Equals, zk.EVENT_CHILD) |
2618 | c.Assert(event.Path, Equals, "/") |
2619 | |
2620 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2621 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2622 | |
2623 | - children, stat, watch, err = zk.ChildrenW("/") |
2624 | + children, stat, watch, err = conn.ChildrenW("/") |
2625 | c.Assert(err, IsNil) |
2626 | c.Assert(stat.NumChildren(), Equals, int32(2)) |
2627 | |
2628 | @@ -345,57 +348,56 @@ |
2629 | default: |
2630 | } |
2631 | |
2632 | - c.Check(gozk.CountPendingWatches(), Equals, 2) |
2633 | + c.Check(zk.CountPendingWatches(), Equals, 2) |
2634 | |
2635 | - _, err = zk.Create("/test2", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
2636 | + _, err = conn.Create("/test2", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2637 | c.Assert(err, IsNil) |
2638 | |
2639 | event = <-watch |
2640 | - c.Assert(event.Type, Equals, gozk.EVENT_CHILD) |
2641 | + c.Assert(event.Type, Equals, zk.EVENT_CHILD) |
2642 | |
2643 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2644 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2645 | } |
2646 | |
2647 | func (s *S) TestChildrenAndWatchWithError(c *C) { |
2648 | - c.Check(gozk.CountPendingWatches(), Equals, 0) |
2649 | - |
2650 | - zk, _ := s.init(c) |
2651 | - |
2652 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2653 | - |
2654 | - _, stat, watch, err := zk.ChildrenW("/test") |
2655 | + c.Check(zk.CountPendingWatches(), Equals, 0) |
2656 | + |
2657 | + conn, _ := s.init(c) |
2658 | + |
2659 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2660 | + |
2661 | + _, stat, watch, err := conn.ChildrenW("/test") |
2662 | c.Assert(err, NotNil) |
2663 | - c.Assert(err.Code(), Equals, gozk.ZNONODE) |
2664 | + c.Assert(err, Equals, zk.ZNONODE) |
2665 | c.Assert(watch, IsNil) |
2666 | c.Assert(stat, IsNil) |
2667 | |
2668 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2669 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2670 | } |
2671 | |
2672 | func (s *S) TestExists(c *C) { |
2673 | - zk, _ := s.init(c) |
2674 | - |
2675 | - stat, err := zk.Exists("/zookeeper") |
2676 | - c.Assert(err, IsNil) |
2677 | - c.Assert(stat.NumChildren(), Equals, int32(1)) |
2678 | - |
2679 | - stat, err = zk.Exists("/non-existent") |
2680 | + conn, _ := s.init(c) |
2681 | + |
2682 | + stat, err := conn.Exists("/non-existent") |
2683 | c.Assert(err, IsNil) |
2684 | c.Assert(stat, IsNil) |
2685 | + |
2686 | + stat, err = conn.Exists("/zookeeper") |
2687 | + c.Assert(err, IsNil) |
2688 | } |
2689 | |
2690 | func (s *S) TestExistsAndWatch(c *C) { |
2691 | - c.Check(gozk.CountPendingWatches(), Equals, 0) |
2692 | - |
2693 | - zk, _ := s.init(c) |
2694 | - |
2695 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2696 | - |
2697 | - stat, watch, err := zk.ExistsW("/test") |
2698 | + c.Check(zk.CountPendingWatches(), Equals, 0) |
2699 | + |
2700 | + conn, _ := s.init(c) |
2701 | + |
2702 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2703 | + |
2704 | + stat, watch, err := conn.ExistsW("/test") |
2705 | c.Assert(err, IsNil) |
2706 | c.Assert(stat, IsNil) |
2707 | |
2708 | - c.Check(gozk.CountPendingWatches(), Equals, 2) |
2709 | + c.Check(zk.CountPendingWatches(), Equals, 2) |
2710 | |
2711 | select { |
2712 | case <-watch: |
2713 | @@ -403,62 +405,62 @@ |
2714 | default: |
2715 | } |
2716 | |
2717 | - _, err = zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
2718 | + _, err = conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2719 | c.Assert(err, IsNil) |
2720 | |
2721 | event := <-watch |
2722 | - c.Assert(event.Type, Equals, gozk.EVENT_CREATED) |
2723 | + c.Assert(event.Type, Equals, zk.EVENT_CREATED) |
2724 | c.Assert(event.Path, Equals, "/test") |
2725 | |
2726 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2727 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2728 | |
2729 | - stat, watch, err = zk.ExistsW("/test") |
2730 | + stat, watch, err = conn.ExistsW("/test") |
2731 | c.Assert(err, IsNil) |
2732 | c.Assert(stat, NotNil) |
2733 | c.Assert(stat.NumChildren(), Equals, int32(0)) |
2734 | |
2735 | - c.Check(gozk.CountPendingWatches(), Equals, 2) |
2736 | + c.Check(zk.CountPendingWatches(), Equals, 2) |
2737 | } |
2738 | |
2739 | func (s *S) TestExistsAndWatchWithError(c *C) { |
2740 | - c.Check(gozk.CountPendingWatches(), Equals, 0) |
2741 | - |
2742 | - zk, _ := s.init(c) |
2743 | - |
2744 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2745 | - |
2746 | - stat, watch, err := zk.ExistsW("///") |
2747 | + c.Check(zk.CountPendingWatches(), Equals, 0) |
2748 | + |
2749 | + conn, _ := s.init(c) |
2750 | + |
2751 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2752 | + |
2753 | + stat, watch, err := conn.ExistsW("///") |
2754 | c.Assert(err, NotNil) |
2755 | - c.Assert(err.Code(), Equals, gozk.ZBADARGUMENTS) |
2756 | + c.Assert(err, Equals, zk.ZBADARGUMENTS) |
2757 | c.Assert(stat, IsNil) |
2758 | c.Assert(watch, IsNil) |
2759 | |
2760 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2761 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2762 | } |
2763 | |
2764 | func (s *S) TestDelete(c *C) { |
2765 | - zk, _ := s.init(c) |
2766 | - |
2767 | - _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
2768 | - c.Assert(err, IsNil) |
2769 | - |
2770 | - err = zk.Delete("/test", 5) |
2771 | - c.Assert(err, NotNil) |
2772 | - c.Assert(err.Code(), Equals, gozk.ZBADVERSION) |
2773 | - |
2774 | - err = zk.Delete("/test", -1) |
2775 | - c.Assert(err, IsNil) |
2776 | - |
2777 | - err = zk.Delete("/test", -1) |
2778 | - c.Assert(err, NotNil) |
2779 | - c.Assert(err.Code(), Equals, gozk.ZNONODE) |
2780 | + conn, _ := s.init(c) |
2781 | + |
2782 | + _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2783 | + c.Assert(err, IsNil) |
2784 | + |
2785 | + err = conn.Delete("/test", 5) |
2786 | + c.Assert(err, NotNil) |
2787 | + c.Assert(err, Equals, zk.ZBADVERSION) |
2788 | + |
2789 | + err = conn.Delete("/test", -1) |
2790 | + c.Assert(err, IsNil) |
2791 | + |
2792 | + err = conn.Delete("/test", -1) |
2793 | + c.Assert(err, NotNil) |
2794 | + c.Assert(err, Equals, zk.ZNONODE) |
2795 | } |
2796 | |
2797 | func (s *S) TestClientIdAndReInit(c *C) { |
2798 | zk1, _ := s.init(c) |
2799 | clientId1 := zk1.ClientId() |
2800 | |
2801 | - zk2, _, err := gozk.ReInit(s.zkAddr, 5e9, clientId1) |
2802 | + zk2, _, err := zk.Redial(s.zkAddr, 5e9, clientId1) |
2803 | c.Assert(err, IsNil) |
2804 | defer zk2.Close() |
2805 | clientId2 := zk2.ClientId() |
2806 | @@ -469,110 +471,110 @@ |
2807 | // Surprisingly for some (including myself, initially), the watch |
2808 | // returned by the exists method actually fires on data changes too. |
2809 | func (s *S) TestExistsWatchOnDataChange(c *C) { |
2810 | - zk, _ := s.init(c) |
2811 | - |
2812 | - _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
2813 | - c.Assert(err, IsNil) |
2814 | - |
2815 | - _, watch, err := zk.ExistsW("/test") |
2816 | - c.Assert(err, IsNil) |
2817 | - |
2818 | - _, err = zk.Set("/test", "new", -1) |
2819 | + conn, _ := s.init(c) |
2820 | + |
2821 | + _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2822 | + c.Assert(err, IsNil) |
2823 | + |
2824 | + _, watch, err := conn.ExistsW("/test") |
2825 | + c.Assert(err, IsNil) |
2826 | + |
2827 | + _, err = conn.Set("/test", "new", -1) |
2828 | c.Assert(err, IsNil) |
2829 | |
2830 | event := <-watch |
2831 | |
2832 | c.Assert(event.Path, Equals, "/test") |
2833 | - c.Assert(event.Type, Equals, gozk.EVENT_CHANGED) |
2834 | + c.Assert(event.Type, Equals, zk.EVENT_CHANGED) |
2835 | } |
2836 | |
2837 | func (s *S) TestACL(c *C) { |
2838 | - zk, _ := s.init(c) |
2839 | - |
2840 | - _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
2841 | - c.Assert(err, IsNil) |
2842 | - |
2843 | - acl, stat, err := zk.ACL("/test") |
2844 | - c.Assert(err, IsNil) |
2845 | - c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_ALL)) |
2846 | + conn, _ := s.init(c) |
2847 | + |
2848 | + _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2849 | + c.Assert(err, IsNil) |
2850 | + |
2851 | + acl, stat, err := conn.ACL("/test") |
2852 | + c.Assert(err, IsNil) |
2853 | + c.Assert(acl, Equals, zk.WorldACL(zk.PERM_ALL)) |
2854 | c.Assert(stat, NotNil) |
2855 | c.Assert(stat.Version(), Equals, int32(0)) |
2856 | |
2857 | - acl, stat, err = zk.ACL("/non-existent") |
2858 | + acl, stat, err = conn.ACL("/non-existent") |
2859 | c.Assert(err, NotNil) |
2860 | - c.Assert(err.Code(), Equals, gozk.ZNONODE) |
2861 | + c.Assert(err, Equals, zk.ZNONODE) |
2862 | c.Assert(acl, IsNil) |
2863 | c.Assert(stat, IsNil) |
2864 | } |
2865 | |
2866 | func (s *S) TestSetACL(c *C) { |
2867 | - zk, _ := s.init(c) |
2868 | + conn, _ := s.init(c) |
2869 | |
2870 | - _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
2871 | + _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2872 | c.Assert(err, IsNil) |
2873 | |
2874 | - err = zk.SetACL("/test", gozk.WorldACL(gozk.PERM_ALL), 5) |
2875 | + err = conn.SetACL("/test", zk.WorldACL(zk.PERM_ALL), 5) |
2876 | c.Assert(err, NotNil) |
2877 | - c.Assert(err.Code(), Equals, gozk.ZBADVERSION) |
2878 | - |
2879 | - err = zk.SetACL("/test", gozk.WorldACL(gozk.PERM_READ), -1) |
2880 | - c.Assert(err, IsNil) |
2881 | - |
2882 | - acl, _, err := zk.ACL("/test") |
2883 | - c.Assert(err, IsNil) |
2884 | - c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_READ)) |
2885 | + c.Assert(err, Equals, zk.ZBADVERSION) |
2886 | + |
2887 | + err = conn.SetACL("/test", zk.WorldACL(zk.PERM_READ), -1) |
2888 | + c.Assert(err, IsNil) |
2889 | + |
2890 | + acl, _, err := conn.ACL("/test") |
2891 | + c.Assert(err, IsNil) |
2892 | + c.Assert(acl, Equals, zk.WorldACL(zk.PERM_READ)) |
2893 | } |
2894 | |
2895 | func (s *S) TestAddAuth(c *C) { |
2896 | - zk, _ := s.init(c) |
2897 | - |
2898 | - acl := []gozk.ACL{{gozk.PERM_READ, "digest", "joe:enQcM3mIEHQx7IrPNStYBc0qfs8="}} |
2899 | - |
2900 | - _, err := zk.Create("/test", "", gozk.EPHEMERAL, acl) |
2901 | + conn, _ := s.init(c) |
2902 | + |
2903 | + acl := []zk.ACL{{zk.PERM_READ, "digest", "joe:enQcM3mIEHQx7IrPNStYBc0qfs8="}} |
2904 | + |
2905 | + _, err := conn.Create("/test", "", zk.EPHEMERAL, acl) |
2906 | c.Assert(err, IsNil) |
2907 | |
2908 | - _, _, err = zk.Get("/test") |
2909 | + _, _, err = conn.Get("/test") |
2910 | c.Assert(err, NotNil) |
2911 | - c.Assert(err.Code(), Equals, gozk.ZNOAUTH) |
2912 | + c.Assert(err, Equals, zk.ZNOAUTH) |
2913 | |
2914 | - err = zk.AddAuth("digest", "joe:passwd") |
2915 | + err = conn.AddAuth("digest", "joe:passwd") |
2916 | c.Assert(err, IsNil) |
2917 | |
2918 | - _, _, err = zk.Get("/test") |
2919 | + _, _, err = conn.Get("/test") |
2920 | c.Assert(err, IsNil) |
2921 | } |
2922 | |
2923 | func (s *S) TestWatchOnReconnection(c *C) { |
2924 | - c.Check(gozk.CountPendingWatches(), Equals, 0) |
2925 | + c.Check(zk.CountPendingWatches(), Equals, 0) |
2926 | |
2927 | - zk, session := s.init(c) |
2928 | + conn, session := s.init(c) |
2929 | |
2930 | event := <-session |
2931 | - c.Assert(event.Type, Equals, gozk.EVENT_SESSION) |
2932 | - c.Assert(event.State, Equals, gozk.STATE_CONNECTED) |
2933 | - |
2934 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2935 | - |
2936 | - stat, watch, err := zk.ExistsW("/test") |
2937 | + c.Assert(event.Type, Equals, zk.EVENT_SESSION) |
2938 | + c.Assert(event.State, Equals, zk.STATE_CONNECTED) |
2939 | + |
2940 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2941 | + |
2942 | + stat, watch, err := conn.ExistsW("/test") |
2943 | c.Assert(err, IsNil) |
2944 | c.Assert(stat, IsNil) |
2945 | |
2946 | - c.Check(gozk.CountPendingWatches(), Equals, 2) |
2947 | + c.Check(zk.CountPendingWatches(), Equals, 2) |
2948 | |
2949 | - s.StopZK() |
2950 | + s.zkServer.Stop() |
2951 | time.Sleep(2e9) |
2952 | - s.StartZK() |
2953 | + s.zkServer.Start() |
2954 | |
2955 | - // The session channel should receive the reconnection notification, |
2956 | + // The session channel should receive the reconnection notification. |
2957 | select { |
2958 | case event := <-session: |
2959 | - c.Assert(event.State, Equals, gozk.STATE_CONNECTING) |
2960 | + c.Assert(event.State, Equals, zk.STATE_CONNECTING) |
2961 | case <-time.After(3e9): |
2962 | c.Fatal("Session watch didn't fire") |
2963 | } |
2964 | select { |
2965 | case event := <-session: |
2966 | - c.Assert(event.State, Equals, gozk.STATE_CONNECTED) |
2967 | + c.Assert(event.State, Equals, zk.STATE_CONNECTED) |
2968 | case <-time.After(3e9): |
2969 | c.Fatal("Session watch didn't fire") |
2970 | } |
2971 | @@ -585,40 +587,40 @@ |
2972 | } |
2973 | |
2974 | // And it should still work. |
2975 | - _, err = zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
2976 | + _, err = conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2977 | c.Assert(err, IsNil) |
2978 | |
2979 | event = <-watch |
2980 | - c.Assert(event.Type, Equals, gozk.EVENT_CREATED) |
2981 | + c.Assert(event.Type, Equals, zk.EVENT_CREATED) |
2982 | c.Assert(event.Path, Equals, "/test") |
2983 | |
2984 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2985 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2986 | } |
2987 | |
2988 | func (s *S) TestWatchOnSessionExpiration(c *C) { |
2989 | - c.Check(gozk.CountPendingWatches(), Equals, 0) |
2990 | + c.Check(zk.CountPendingWatches(), Equals, 0) |
2991 | |
2992 | - zk, session := s.init(c) |
2993 | + conn, session := s.init(c) |
2994 | |
2995 | event := <-session |
2996 | - c.Assert(event.Type, Equals, gozk.EVENT_SESSION) |
2997 | - c.Assert(event.State, Equals, gozk.STATE_CONNECTED) |
2998 | - |
2999 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
3000 | - |
3001 | - stat, watch, err := zk.ExistsW("/test") |
3002 | + c.Assert(event.Type, Equals, zk.EVENT_SESSION) |
3003 | + c.Assert(event.State, Equals, zk.STATE_CONNECTED) |
3004 | + |
3005 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
3006 | + |
3007 | + stat, watch, err := conn.ExistsW("/test") |
3008 | c.Assert(err, IsNil) |
3009 | c.Assert(stat, IsNil) |
3010 | |
3011 | - c.Check(gozk.CountPendingWatches(), Equals, 2) |
3012 | + c.Check(zk.CountPendingWatches(), Equals, 2) |
3013 | |
3014 | // Use expiration trick described in the FAQ. |
3015 | - clientId := zk.ClientId() |
3016 | - zk2, session2, err := gozk.ReInit(s.zkAddr, 5e9, clientId) |
3017 | + clientId := conn.ClientId() |
3018 | + zk2, session2, err := zk.Redial(s.zkAddr, 5e9, clientId) |
3019 | |
3020 | for event := range session2 { |
3021 | c.Log("Event from overlapping session: ", event) |
3022 | - if event.State == gozk.STATE_CONNECTED { |
3023 | + if event.State == zk.STATE_CONNECTED { |
3024 | // Wait for zk to process the connection. |
3025 | // Not reliable without this. :-( |
3026 | time.Sleep(1e9) |
3027 | @@ -627,21 +629,21 @@ |
3028 | } |
3029 | for event := range session { |
3030 | c.Log("Event from primary session: ", event) |
3031 | - if event.State == gozk.STATE_EXPIRED_SESSION { |
3032 | + if event.State == zk.STATE_EXPIRED_SESSION { |
3033 | break |
3034 | } |
3035 | } |
3036 | |
3037 | select { |
3038 | case event := <-watch: |
3039 | - c.Assert(event.State, Equals, gozk.STATE_EXPIRED_SESSION) |
3040 | + c.Assert(event.State, Equals, zk.STATE_EXPIRED_SESSION) |
3041 | case <-time.After(3e9): |
3042 | c.Fatal("Watch event didn't fire") |
3043 | } |
3044 | |
3045 | event = <-watch |
3046 | - c.Assert(event.Type, Equals, gozk.EVENT_CLOSED) |
3047 | - c.Assert(event.State, Equals, gozk.STATE_CLOSED) |
3048 | + c.Assert(event.Type, Equals, zk.EVENT_CLOSED) |
3049 | + c.Assert(event.State, Equals, zk.STATE_CLOSED) |
3050 | |
3051 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
3052 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
3053 | } |
Great stuff here Rog, thanks!
Some comments:
[1]
-// The ZooKepeer installation directory is specified by installedDir.
+// The ZooKeeper installation directory is specified by installDir.
installDir feels like a strange name in this context. I agree installedDir
isn't great either, but the new one gives the wrong impression about what
it means: we're not installing anything there.
Let's use something like zkDir instead.
[2]
+func CreateServer(port int, runDir, installDir string) (*Server, os.Error) {
(...)
+func AttachServer(runDir string) (*Server, os.Error) {
The organization inside these is looking very nice.
[3]
+func (srv *Server) getServerProcess() (*os.Process, os.Error) {
(...)
+ return os.FindProcess(pid)
FindProcess does nothing on Unix. We can at least send signal 0 to
the pid to tell whether it exists.
[4]
+ if p != nil {
+ p.Release()
+ }
+ return fmt.Errorf("ZooKeeper server may already be running (remove %q to clear)", srv.path("pid.txt"))
The recommendation here is a bit strange. If the process is actually running,
removing the pid file will allow a second server to be started in parallel,
which is not nice.
I suggest just saying "ZooKeeper server seems to be running already", which
is more accurate given that we'll be checking the pid as per the point above.
Then, the developer can run Stop() on it if he feels like stopping it, or
do whatever else out of band if he doesn't want to use the interface, but
then it's his responsibility to check for background processes, etc.
[5]
+// Stop kills the ZooKeeper server. It is a no-op if it is already running.
The second part is reversed, and is a bit hard to read.
Suggestion: It does nothing if not running.
Also, please mention that the data is preserved, and that the server can
be restarted. "kill" may suggest otherwise.
[6]
+// Destroy stops the ZooKeeper server, and then removes its run
+// directory and all its contents.
s/and all its contents././
Removing the directory means removing its contents necessarily.
[7]
+func (srv *Server) writeInstallDir() os.Error {
+ return ioutil.WriteFile(srv.path("installdir.txt"), []byte(srv.installDir+"\n"), 0666)
These functions and the filename should be renamed in line with the first point.
[8]
+ if data[len(data)-1] == '\n' {
+ data = data[0 : len(data)-1]
+ }
+ srv.installDir = string(data)
srv.zkDir = string(bytes.TrimSpace(data))
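To illustrate why the `bytes.TrimSpace` form is preferable, here is a small sketch in current Go. `parseDir` is a hypothetical name used only for this example. Unlike the manual `data[len(data)-1]` check, it does not panic when the file is empty, and it also tolerates a trailing `\r\n` or stray spaces:

```go
package main

import (
	"bytes"
	"fmt"
)

// parseDir returns the directory stored in the contents of a one-line file,
// with surrounding whitespace (including the trailing newline) removed.
// Hypothetical helper for illustration only.
func parseDir(data []byte) string {
	return string(bytes.TrimSpace(data))
}

func main() {
	fmt.Printf("%q\n", parseDir([]byte("/opt/zookeeper\n"))) // prints "/opt/zookeeper"
	fmt.Printf("%q\n", parseDir(nil))                        // prints ""
}
```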
[9]
+ s.zkTestRoot = c.MkDir() + "/zk"
+ port := 21812
+ s.zkAddr = fmt.Sprint("localhost:", port)
+
+ s.zkServer, err = zk.CreateServer(port, s.zkTestRoot, "")
if err != nil {
c.Fatal("Cannot set up server environment: ", err)
}
- s.StartZK(c)
+ err = s.zkServer.Start()
+ if err != nil {
+ c.Fatal("Cannot start ZooKeeper server: ", err)
+ }
This is awesome! Thanks!