Merge lp:~rogpeppe/gozk/clean-up-interface into lp:~juju/gozk/trunk
Status: | Merged |
---|---|
Merge reported by: | Gustavo Niemeyer |
Merged at revision: | not available |
Proposed branch: | lp:~rogpeppe/gozk/clean-up-interface |
Merge into: | lp:~juju/gozk/trunk |
Diff against target: | 2278 lines (+648/-578), 7 files modified: Makefile (+5/-2), example/example.go (+22/-23), retry_test.go (+45/-47), server.go (+166/-0), suite_test.go (+65/-114), zookeeper.go (+216/-265), zookeeper_test.go (+129/-127) |
To merge this branch: | bzr merge lp:~rogpeppe/gozk/clean-up-interface |
Related bugs: |
|
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Gustavo Niemeyer | Approve | ||
Review via email: mp+76766@code.launchpad.net |
Commit message
Description of the change
Clean up gozk interface. Add Server function.
Roger Peppe (rogpeppe) wrote:
On 23 September 2011 18:21, Gustavo Niemeyer <email address hidden> wrote:
> Review: Needs Fixing
>
> This is looking fantastic. Thanks Roger.
>
> Some details to sort out:
>
>
> [1]
>
> === renamed file 'gozk.go' => 'zookeeper/gozk.go'
>
> Please keep the files at the root of the branch. We'll put it in a
> new series in launchpad so that the gozk/zookeeper reference is
> correct.
ok. i moved the example file into an "example" directory
so that goinstall won't get confused.
>
>
> [2]
>
> +// Conn represents a connection to a set of Zookeeper nodes.
>
> The project name is "ZooKeeper". Please use this casing consistently
> across the code base.
done. + a few original discrepancies in this respect.
> [3]
>
> +// ClientId represents an established ZooKeeper session. It can be
> +// passed into New to reestablish a connection to an existing session.
>
> There's no New function.
i'd already fixed this.
>
> +func (zk *Conn) RetryChange(path string, flags int, acl []ACL, changeFunc ChangeFunc) os.Error {
>
> Very nice changes in RetryChange too, thanks!
good. worth the bug, i'd say!
> [6]
>
> + path, err := zk.Create("/test-", "bababum", zookeeper.
>
> These lines make me unhappy about the length of "zookeeper". I'm thinking we
> might well rename it to "zk" in a follow up branch, but please keep it like
> this for now. Just for comparison:
>
> path, err := conn.Create(
my inclination would be to keep the name as "zookeeper", but
people can import as zk if they want. then strangers to code that's
using it will see "launchpad.
what the package is about.
also, people can easily define constants for this kind of thing if they wish.
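The import-alias point can be illustrated with a small sketch. It is written in present-day Go rather than the 2011 dialect used in the diff, and it uses the standard library's path/filepath as a stand-in for the ZooKeeper package so that it runs on its own. The alias fp and the names listSep and joinClassPath are made up for this illustration.

```go
package main

import (
	"fmt"
	// The short name is chosen at the import site; the package keeps its
	// descriptive path. Stand-in for aliasing the ZooKeeper package as zk.
	fp "path/filepath"
)

// listSep is a local constant wrapping a qualified name that would
// otherwise be spelled out at every use.
const listSep = string(fp.ListSeparator)

// joinClassPath builds a class-path style string using the short alias
// and the local constant.
func joinClassPath(dir string, jars ...string) string {
	out := ""
	for i, jar := range jars {
		if i > 0 {
			out += listSep
		}
		out += fp.Join(dir, jar)
	}
	return out
}

func main() {
	fmt.Println(joinClassPath("lib", "a.jar", "b.jar"))
}
```

Readers of the import block still see the full package path, while call sites use the short name; that is the trade-off being argued for here.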
> + c.Assert(stat, Equals, (*zookeeper.
>
> c.Assert(stat, IsNil)
done.
> [8]
>
> +// Server sets up a zookeeper server environment inside dataDir
>
> ZooKeeper in this file as well.
done.
> [9]
>
> +// for a server that listening on the specified TCP port, sending
>
> s/listening/
done
> [10]
>
> +// Server does not actually start the server. Instead it returns
> +// a command line, suitable for passing to exec.Command,
> +// for example.
>
> Let's do a bit better than this. There's no reason to keep the
> responsibility of creating the directory, starting, stopping,
> cleaning, etc, with the caller. We need this for real, not just
> due to testing, but in juju for the local deployment case.
i thought quite a bit about this.
the problem with doing the starting ourselves is that
there are many things the caller might wish to do
with the running executable. for example:
- logging (see the custom logging in suite_test.go)
- running as a different user
- tweaking the config file before starting
- stashing the pid for long-term killability
- starting the same server several times on the same
directory (as is done in this test suite). ok, your proposal
does deal with this.
-
>
> I suggest we define a Server type as:
>
> type Server struct {
> runDir, installDir string
> cmd *exec.Cmd
> }
>
> ...
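A minimal sketch of what a type along these lines might look like, written in present-day Go (os/exec rather than the 2011 exec package). Only the three fields in the quoted struct come from the review; the args field, the Start and Stop methods, and the launcher path are assumptions for illustration, not the interface that was eventually merged.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"os/exec"
)

// Server is a hypothetical wrapper that owns the process lifecycle.
// runDir, installDir and cmd follow the struct quoted in the review;
// args is an addition made for this sketch.
type Server struct {
	runDir, installDir string
	args               []string // full command line to run
	cmd                *exec.Cmd
}

// Start launches the process in runDir and forwards its output to the
// caller's standard output and standard error.
func (s *Server) Start() error {
	if s.cmd != nil {
		return errors.New("server already started")
	}
	if len(s.args) == 0 {
		return errors.New("empty command line")
	}
	cmd := exec.Command(s.args[0], s.args[1:]...)
	cmd.Dir = s.runDir
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr
	if err := cmd.Start(); err != nil {
		return fmt.Errorf("cannot start server: %w", err)
	}
	s.cmd = cmd
	return nil
}

// Stop kills the process, if one is running, and waits for it to exit.
// Calling Stop on a server that was never started is a no-op.
func (s *Server) Stop() error {
	if s.cmd == nil {
		return nil
	}
	err := s.cmd.Process.Kill()
	s.cmd.Wait() // reap the process; an exit error after a kill is expected
	s.cmd = nil
	return err
}

func main() {
	// The launcher path is deliberately bogus so the sketch has no side effects.
	s := &Server{runDir: os.TempDir(), args: []string{"/nonexistent/zookeeper-launcher"}}
	if err := s.Start(); err != nil {
		fmt.Println("start failed:", err)
	}
	fmt.Println("stop:", s.Stop())
}
```

The caller-side concerns listed above (custom logging, a different user, a tweaked config) would need extra hooks on such a type; this sketch hard-wires the output streams and leaves those questions open.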
- 24. By Roger Peppe: fixes after code review.
Gustavo Niemeyer (niemeyer) wrote:
Thanks for the changes. Some additional feedback:
[10]
> i thought quite a bit about this.
> the problem with doing the starting ourselves is that
> there are many things the caller might wish to do
Yeah, we talked precisely about this on IRC. None of the
points you mention are a good reason to not provide the
suggested interface.
[12]
> the docs say that the only error that glob can return is if the
> pattern is malformed. those patterns are evidently not.
Until someone changes them by mistake and they become malformed,
and rather than being warned about that we get crazy behavior.
A panic is fine in that case, but please do not leave errors unchecked.
[14]
Sounds good.
- 25. By Roger Peppe: add error checks to Glob call
Roger Peppe (rogpeppe) wrote:
On 23 September 2011 19:41, Gustavo Niemeyer <email address hidden> wrote:
>> the docs say that the only error that glob can return is if the
>> pattern is malformed. those patterns are evidently not.
>
> Until someone changes them by mistake and they become malformed,
> and rather than being warned about that we get crazy behavior.
>
> A panic is fine in that case, but please do not leave errors unchecked.
i will do this since you ask. but i really think this is unnecessary.
the worst thing that will happen if the pattern is bad is that we'll
get no files.
this will also happen if the file name is misspelled, a much more likely
case (and one incidentally that i think might apply currently - i can't
test it), and one which the error check won't catch. it seems ok to me
that they're both debugged in the same way.
[the only time a pattern can be bad is if it ends in a backslash
or it has a malformed character class. the former is easy to see
and we're unlikely to add a character class match here.]
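The two failure modes discussed here can be seen side by side in a short sketch. It uses present-day Go (os.Error has since become error), the helper describe and the file names are hypothetical, and the behaviour shown is that of the current standard library's filepath.Glob, which may differ in detail from the 2011 release under review.

```go
package main

import (
	"errors"
	"fmt"
	"path/filepath"
)

// describe classifies the outcome of a Glob call so the two cases
// from the discussion can be told apart.
func describe(pattern string) string {
	matches, err := filepath.Glob(pattern)
	switch {
	case errors.Is(err, filepath.ErrBadPattern):
		return "bad pattern"
	case err != nil:
		return "other error"
	case len(matches) == 0:
		return "no files"
	default:
		return "matched"
	}
}

func main() {
	// A well-formed pattern with a misspelled name: no error is reported,
	// only an empty result, so an error check cannot catch the typo.
	fmt.Println(describe("nonexistent-dir-for-sketch/zookeper-*.jar"))

	// A malformed character class: this is the case the error check covers.
	fmt.Println(describe("zookeeper-[].jar"))
}
```

Both positions in the thread hold up under this reading: the error check only guards against malformed patterns, and a misspelled name still has to be caught by the later "libraries not found" check.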
i've pushed a new version. i'll leave the Server changes for a separate
merge, as suggested.
Gustavo Niemeyer (niemeyer) wrote:
That's merged, and pushed to a new package location, and fixed accordingly:
Once the Server implementation is finished, we can announce the new location
and the changes performed.
The new location also enables us to wait a bit for people to migrate, instead
of removing the old location immediately.
On the next merge proposal, please remember to target lp:~juju/juju/zk instead of trunk.
Roger Peppe (rogpeppe) wrote:
thanks, but...
one last plea to have the word "zookeeper" in the package path at least,
but preferably as the pkg identifier. people can import as zk if they wish
(or even as both!). i'm not keen on acronyms and abbreviations - can you
tell? :-)
On 24 September 2011 15:08, Gustavo Niemeyer <email address hidden> wrote:
> Review: Approve
>
> That's merged, and pushed to a new package location, and fixed accordingly:
>
> http://
>
> Once the Server implementation is finished, we can announce the new location
> and the changes performed.
>
> The new location also enables us to wait a bit for people to migrate, instead
> of removing the old location immediately.
>
> On the next merge proposal, please remember to target lp:~juju/juju/zk instead of trunk.
> --
> https:/
> You are the owner of lp:~rogpeppe/gozk/clean-up-interface.
>
Gustavo Niemeyer (niemeyer) wrote:
> one last plea to have the word "zookeeper" in the package path at least,
> but preferably as the pkg identifier. people can import as zk if they wish
> (or even as both!). i'm not keen on acronyms and abbreviations - can you
> tell? :-)
I can tell, but I'm not keen on the very long lines that the new package name
has created, so being a matter of personal choice only I'll reserve the right
to keep the existing convention unchanged in that one case.
I do accept taking the "go" prefix out, though.
Preview Diff
1 | === modified file 'Makefile' |
2 | --- Makefile 2011-08-03 01:56:58 +0000 |
3 | +++ Makefile 2011-09-24 08:31:23 +0000 |
4 | @@ -2,10 +2,13 @@ |
5 | |
6 | all: package |
7 | |
8 | -TARG=gozk |
9 | +TARG=launchpad.net/zookeeper |
10 | |
11 | +GOFILES=\ |
12 | + server.go\ |
13 | + |
14 | CGOFILES=\ |
15 | - gozk.go\ |
16 | + zookeeper.go\ |
17 | |
18 | CGO_OFILES=\ |
19 | helpers.o\ |
20 | |
21 | === added directory 'example' |
22 | === renamed file 'example.go' => 'example/example.go' |
23 | --- example.go 2011-08-03 01:47:25 +0000 |
24 | +++ example/example.go 2011-09-24 08:31:23 +0000 |
25 | @@ -1,30 +1,29 @@ |
26 | package main |
27 | |
28 | import ( |
29 | - "gozk" |
30 | + "launchpad.net/zookeeper/zookeeper" |
31 | ) |
32 | |
33 | func main() { |
34 | - zk, session, err := gozk.Init("localhost:2181", 5000) |
35 | - if err != nil { |
36 | - println("Couldn't connect: " + err.String()) |
37 | - return |
38 | - } |
39 | - |
40 | - defer zk.Close() |
41 | - |
42 | - // Wait for connection. |
43 | - event := <-session |
44 | - if event.State != gozk.STATE_CONNECTED { |
45 | - println("Couldn't connect") |
46 | - return |
47 | - } |
48 | - |
49 | - _, err = zk.Create("/counter", "0", 0, gozk.WorldACL(gozk.PERM_ALL)) |
50 | - if err != nil { |
51 | - println(err.String()) |
52 | - } else { |
53 | - println("Created!") |
54 | - } |
55 | + zk, session, err := zookeeper.Init("localhost:2181", 5000) |
56 | + if err != nil { |
57 | + println("Couldn't connect: " + err.String()) |
58 | + return |
59 | + } |
60 | + |
61 | + defer zk.Close() |
62 | + |
63 | + // Wait for connection. |
64 | + event := <-session |
65 | + if event.State != zookeeper.STATE_CONNECTED { |
66 | + println("Couldn't connect") |
67 | + return |
68 | + } |
69 | + |
70 | + _, err = zk.Create("/counter", "0", 0, zookeeper.WorldACL(zookeeper.PERM_ALL)) |
71 | + if err != nil { |
72 | + println(err.String()) |
73 | + } else { |
74 | + println("Created!") |
75 | + } |
76 | } |
77 | - |
78 | |
79 | === modified file 'retry_test.go' |
80 | --- retry_test.go 2011-08-19 01:51:37 +0000 |
81 | +++ retry_test.go 2011-09-24 08:31:23 +0000 |
82 | @@ -1,16 +1,16 @@ |
83 | -package gozk_test |
84 | +package zookeeper_test |
85 | |
86 | import ( |
87 | . "launchpad.net/gocheck" |
88 | - "gozk" |
89 | + "launchpad.net/zookeeper" |
90 | "os" |
91 | ) |
92 | |
93 | func (s *S) TestRetryChangeCreating(c *C) { |
94 | zk, _ := s.init(c) |
95 | |
96 | - err := zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL), |
97 | - func(data string, stat gozk.Stat) (string, os.Error) { |
98 | + err := zk.RetryChange("/test", zookeeper.EPHEMERAL, zookeeper.WorldACL(zookeeper.PERM_ALL), |
99 | + func(data string, stat *zookeeper.Stat) (string, os.Error) { |
100 | c.Assert(data, Equals, "") |
101 | c.Assert(stat, IsNil) |
102 | return "new", nil |
103 | @@ -25,18 +25,18 @@ |
104 | |
105 | acl, _, err := zk.ACL("/test") |
106 | c.Assert(err, IsNil) |
107 | - c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_ALL)) |
108 | + c.Assert(acl, Equals, zookeeper.WorldACL(zookeeper.PERM_ALL)) |
109 | } |
110 | |
111 | func (s *S) TestRetryChangeSetting(c *C) { |
112 | zk, _ := s.init(c) |
113 | |
114 | - _, err := zk.Create("/test", "old", gozk.EPHEMERAL, |
115 | - gozk.WorldACL(gozk.PERM_ALL)) |
116 | + _, err := zk.Create("/test", "old", zookeeper.EPHEMERAL, |
117 | + zookeeper.WorldACL(zookeeper.PERM_ALL)) |
118 | c.Assert(err, IsNil) |
119 | |
120 | - err = zk.RetryChange("/test", gozk.EPHEMERAL, []gozk.ACL{}, |
121 | - func(data string, stat gozk.Stat) (string, os.Error) { |
122 | + err = zk.RetryChange("/test", zookeeper.EPHEMERAL, []zookeeper.ACL{}, |
123 | + func(data string, stat *zookeeper.Stat) (string, os.Error) { |
124 | c.Assert(data, Equals, "old") |
125 | c.Assert(stat, NotNil) |
126 | c.Assert(stat.Version(), Equals, int32(0)) |
127 | @@ -53,18 +53,18 @@ |
128 | // ACL was unchanged by RetryChange(). |
129 | acl, _, err := zk.ACL("/test") |
130 | c.Assert(err, IsNil) |
131 | - c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_ALL)) |
132 | + c.Assert(acl, Equals, zookeeper.WorldACL(zookeeper.PERM_ALL)) |
133 | } |
134 | |
135 | func (s *S) TestRetryChangeUnchangedValueDoesNothing(c *C) { |
136 | zk, _ := s.init(c) |
137 | |
138 | - _, err := zk.Create("/test", "old", gozk.EPHEMERAL, |
139 | - gozk.WorldACL(gozk.PERM_ALL)) |
140 | + _, err := zk.Create("/test", "old", zookeeper.EPHEMERAL, |
141 | + zookeeper.WorldACL(zookeeper.PERM_ALL)) |
142 | c.Assert(err, IsNil) |
143 | |
144 | - err = zk.RetryChange("/test", gozk.EPHEMERAL, []gozk.ACL{}, |
145 | - func(data string, stat gozk.Stat) (string, os.Error) { |
146 | + err = zk.RetryChange("/test", zookeeper.EPHEMERAL, []zookeeper.ACL{}, |
147 | + func(data string, stat *zookeeper.Stat) (string, os.Error) { |
148 | c.Assert(data, Equals, "old") |
149 | c.Assert(stat, NotNil) |
150 | c.Assert(stat.Version(), Equals, int32(0)) |
151 | @@ -82,12 +82,12 @@ |
152 | func (s *S) TestRetryChangeConflictOnCreate(c *C) { |
153 | zk, _ := s.init(c) |
154 | |
155 | - changeFunc := func(data string, stat gozk.Stat) (string, os.Error) { |
156 | + changeFunc := func(data string, stat *zookeeper.Stat) (string, os.Error) { |
157 | switch data { |
158 | case "": |
159 | c.Assert(stat, IsNil) |
160 | - _, err := zk.Create("/test", "conflict", gozk.EPHEMERAL, |
161 | - gozk.WorldACL(gozk.PERM_ALL)) |
162 | + _, err := zk.Create("/test", "conflict", zookeeper.EPHEMERAL, |
163 | + zookeeper.WorldACL(zookeeper.PERM_ALL)) |
164 | c.Assert(err, IsNil) |
165 | return "<none> => conflict", nil |
166 | case "conflict": |
167 | @@ -100,7 +100,7 @@ |
168 | return "can't happen", nil |
169 | } |
170 | |
171 | - err := zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL), |
172 | + err := zk.RetryChange("/test", zookeeper.EPHEMERAL, zookeeper.WorldACL(zookeeper.PERM_ALL), |
173 | changeFunc) |
174 | c.Assert(err, IsNil) |
175 | |
176 | @@ -114,11 +114,11 @@ |
177 | func (s *S) TestRetryChangeConflictOnSetDueToChange(c *C) { |
178 | zk, _ := s.init(c) |
179 | |
180 | - _, err := zk.Create("/test", "old", gozk.EPHEMERAL, |
181 | - gozk.WorldACL(gozk.PERM_ALL)) |
182 | + _, err := zk.Create("/test", "old", zookeeper.EPHEMERAL, |
183 | + zookeeper.WorldACL(zookeeper.PERM_ALL)) |
184 | c.Assert(err, IsNil) |
185 | |
186 | - changeFunc := func(data string, stat gozk.Stat) (string, os.Error) { |
187 | + changeFunc := func(data string, stat *zookeeper.Stat) (string, os.Error) { |
188 | switch data { |
189 | case "old": |
190 | c.Assert(stat, NotNil) |
191 | @@ -136,7 +136,7 @@ |
192 | return "can't happen", nil |
193 | } |
194 | |
195 | - err = zk.RetryChange("/test", gozk.EPHEMERAL, []gozk.ACL{}, changeFunc) |
196 | + err = zk.RetryChange("/test", zookeeper.EPHEMERAL, []zookeeper.ACL{}, changeFunc) |
197 | c.Assert(err, IsNil) |
198 | |
199 | data, stat, err := zk.Get("/test") |
200 | @@ -149,11 +149,11 @@ |
201 | func (s *S) TestRetryChangeConflictOnSetDueToDelete(c *C) { |
202 | zk, _ := s.init(c) |
203 | |
204 | - _, err := zk.Create("/test", "old", gozk.EPHEMERAL, |
205 | - gozk.WorldACL(gozk.PERM_ALL)) |
206 | + _, err := zk.Create("/test", "old", zookeeper.EPHEMERAL, |
207 | + zookeeper.WorldACL(zookeeper.PERM_ALL)) |
208 | c.Assert(err, IsNil) |
209 | |
210 | - changeFunc := func(data string, stat gozk.Stat) (string, os.Error) { |
211 | + changeFunc := func(data string, stat *zookeeper.Stat) (string, os.Error) { |
212 | switch data { |
213 | case "old": |
214 | c.Assert(stat, NotNil) |
215 | @@ -170,7 +170,7 @@ |
216 | return "can't happen", nil |
217 | } |
218 | |
219 | - err = zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_READ), |
220 | + err = zk.RetryChange("/test", zookeeper.EPHEMERAL, zookeeper.WorldACL(zookeeper.PERM_READ), |
221 | changeFunc) |
222 | c.Assert(err, IsNil) |
223 | |
224 | @@ -183,18 +183,17 @@ |
225 | // Should be the new ACL. |
226 | acl, _, err := zk.ACL("/test") |
227 | c.Assert(err, IsNil) |
228 | - c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_READ)) |
229 | + c.Assert(acl, Equals, zookeeper.WorldACL(zookeeper.PERM_READ)) |
230 | } |
231 | |
232 | func (s *S) TestRetryChangeErrorInCallback(c *C) { |
233 | zk, _ := s.init(c) |
234 | |
235 | - err := zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL), |
236 | - func(data string, stat gozk.Stat) (string, os.Error) { |
237 | + err := zk.RetryChange("/test", zookeeper.EPHEMERAL, zookeeper.WorldACL(zookeeper.PERM_ALL), |
238 | + func(data string, stat *zookeeper.Stat) (string, os.Error) { |
239 | return "don't use this", os.NewError("BOOM!") |
240 | }) |
241 | c.Assert(err, NotNil) |
242 | - c.Assert(err.Code(), Equals, gozk.ZSYSTEMERROR) |
243 | c.Assert(err.String(), Equals, "BOOM!") |
244 | |
245 | stat, err := zk.Exists("/test") |
246 | @@ -205,18 +204,18 @@ |
247 | func (s *S) TestRetryChangeFailsReading(c *C) { |
248 | zk, _ := s.init(c) |
249 | |
250 | - _, err := zk.Create("/test", "old", gozk.EPHEMERAL, |
251 | - gozk.WorldACL(gozk.PERM_WRITE)) // Write only! |
252 | + _, err := zk.Create("/test", "old", zookeeper.EPHEMERAL, |
253 | + zookeeper.WorldACL(zookeeper.PERM_WRITE)) // Write only! |
254 | c.Assert(err, IsNil) |
255 | |
256 | var called bool |
257 | - err = zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL), |
258 | - func(data string, stat gozk.Stat) (string, os.Error) { |
259 | + err = zk.RetryChange("/test", zookeeper.EPHEMERAL, zookeeper.WorldACL(zookeeper.PERM_ALL), |
260 | + func(data string, stat *zookeeper.Stat) (string, os.Error) { |
261 | called = true |
262 | return "", nil |
263 | }) |
264 | c.Assert(err, NotNil) |
265 | - c.Assert(err.Code(), Equals, gozk.ZNOAUTH) |
266 | + c.Assert(err, Equals, zookeeper.ZNOAUTH) |
267 | |
268 | stat, err := zk.Exists("/test") |
269 | c.Assert(err, IsNil) |
270 | @@ -229,18 +228,17 @@ |
271 | func (s *S) TestRetryChangeFailsSetting(c *C) { |
272 | zk, _ := s.init(c) |
273 | |
274 | - _, err := zk.Create("/test", "old", gozk.EPHEMERAL, |
275 | - gozk.WorldACL(gozk.PERM_READ)) // Read only! |
276 | + _, err := zk.Create("/test", "old", zookeeper.EPHEMERAL, |
277 | + zookeeper.WorldACL(zookeeper.PERM_READ)) // Read only! |
278 | c.Assert(err, IsNil) |
279 | |
280 | var called bool |
281 | - err = zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL), |
282 | - func(data string, stat gozk.Stat) (string, os.Error) { |
283 | + err = zk.RetryChange("/test", zookeeper.EPHEMERAL, zookeeper.WorldACL(zookeeper.PERM_ALL), |
284 | + func(data string, stat *zookeeper.Stat) (string, os.Error) { |
285 | called = true |
286 | return "", nil |
287 | }) |
288 | - c.Assert(err, NotNil) |
289 | - c.Assert(err.Code(), Equals, gozk.ZNOAUTH) |
290 | + c.Assert(err, Equals, zookeeper.ZNOAUTH) |
291 | |
292 | stat, err := zk.Exists("/test") |
293 | c.Assert(err, IsNil) |
294 | @@ -253,19 +251,19 @@ |
295 | func (s *S) TestRetryChangeFailsCreating(c *C) { |
296 | zk, _ := s.init(c) |
297 | |
298 | - _, err := zk.Create("/test", "old", gozk.EPHEMERAL, |
299 | - gozk.WorldACL(gozk.PERM_READ)) // Read only! |
300 | + _, err := zk.Create("/test", "old", zookeeper.EPHEMERAL, |
301 | + zookeeper.WorldACL(zookeeper.PERM_READ)) // Read only! |
302 | c.Assert(err, IsNil) |
303 | |
304 | var called bool |
305 | - err = zk.RetryChange("/test/sub", gozk.EPHEMERAL, |
306 | - gozk.WorldACL(gozk.PERM_ALL), |
307 | - func(data string, stat gozk.Stat) (string, os.Error) { |
308 | + err = zk.RetryChange("/test/sub", zookeeper.EPHEMERAL, |
309 | + zookeeper.WorldACL(zookeeper.PERM_ALL), |
310 | + func(data string, stat *zookeeper.Stat) (string, os.Error) { |
311 | called = true |
312 | return "", nil |
313 | }) |
314 | c.Assert(err, NotNil) |
315 | - c.Assert(err.Code(), Equals, gozk.ZNOAUTH) |
316 | + c.Assert(err, Equals, zookeeper.ZNOAUTH) |
317 | |
318 | stat, err := zk.Exists("/test/sub") |
319 | c.Assert(err, IsNil) |
320 | |
321 | === added file 'server.go' |
322 | --- server.go 1970-01-01 00:00:00 +0000 |
323 | +++ server.go 2011-09-24 08:31:23 +0000 |
324 | @@ -0,0 +1,166 @@ |
325 | +package zookeeper |
326 | + |
327 | +import ( |
328 | + "bufio" |
329 | + "bytes" |
330 | + "exec" |
331 | + "fmt" |
332 | + "io/ioutil" |
333 | + "os" |
334 | + "path/filepath" |
335 | + "strings" |
336 | +) |
337 | + |
338 | +const zookeeperEnviron = "/etc/zookeeper/conf/environment" |
339 | + |
340 | +// Server sets up a zookeeper server environment inside dataDir |
341 | +// for a server that listens on the specified TCP port, sending |
342 | +// all log messages to standard output. |
343 | +// The dataDir directory must exist already. |
344 | +// |
345 | +// The zookeeper installation directory is specified by installedDir. |
346 | +// If this is empty, a system default will be used. |
347 | +// |
348 | +// Server does not actually start the server. Instead it returns |
349 | +// a command line, suitable for passing to exec.Command, |
350 | +// for example. |
351 | +func Server(port int, dataDir, installedDir string) ([]string, os.Error) { |
352 | + cp, err := classPath(installedDir) |
353 | + if err != nil { |
354 | + return nil, err |
355 | + } |
356 | + logDir := filepath.Join(dataDir, "log") |
357 | + if err = os.Mkdir(logDir, 0777); err != nil && err.(*os.PathError).Error != os.EEXIST { |
358 | + return nil, err |
359 | + } |
360 | + logConfigPath, err := writeLog4JConfig(dataDir) |
361 | + if err != nil { |
362 | + return nil, err |
363 | + } |
364 | + configPath, err := writeZooKeeperConfig(dataDir, port) |
365 | + if err != nil { |
366 | + return nil, err |
367 | + } |
368 | + exe, err := exec.LookPath("java") |
369 | + if err != nil { |
370 | + return nil, err |
371 | + } |
372 | + cmd := []string{ |
373 | + exe, |
374 | + "-cp", strings.Join(cp, ":"), |
375 | + "-Dzookeeper.log.dir=" + logDir, |
376 | + "-Dzookeeper.root.logger=INFO,CONSOLE", |
377 | + "-Dlog4j.configuration=file:" + logConfigPath, |
378 | + "org.apache.zookeeper.server.quorum.QuorumPeerMain", |
379 | + configPath, |
380 | + } |
381 | + return cmd, nil |
382 | +} |
383 | + |
384 | +var log4jProperties = ` |
385 | +log4j.rootLogger=INFO, CONSOLE |
386 | +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender |
387 | +log4j.appender.CONSOLE.Threshold=INFO |
388 | +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout |
389 | +log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n |
390 | +` |
391 | + |
392 | +func writeLog4JConfig(dir string) (path string, err os.Error) { |
393 | + path = filepath.Join(dir, "log4j.properties") |
394 | + err = ioutil.WriteFile(path, []byte(log4jProperties), 0666) |
395 | + return |
396 | +} |
397 | + |
398 | +func writeZooKeeperConfig(dir string, port int) (path string, err os.Error) { |
399 | + path = filepath.Join(dir, "zoo.cfg") |
400 | + err = ioutil.WriteFile(path, []byte(fmt.Sprintf( |
401 | + "tickTime=2000\n"+ |
402 | + "dataDir=%s\n"+ |
403 | + "clientPort=%d\n"+ |
404 | + "maxClientCnxns=500\n", |
405 | + dir, port)), 0666) |
406 | + return |
407 | +} |
408 | + |
409 | +func classPath(dir string) ([]string, os.Error) { |
410 | + if dir == "" { |
411 | + return systemClassPath() |
412 | + } |
413 | + if err := checkDirectory(dir); err != nil { |
414 | + return nil, err |
415 | + } |
416 | + // Two possibilities, as seen in zkEnv.sh: |
417 | + // 1) locally built binaries (jars are in build directory) |
418 | + // 2) release binaries |
419 | + if build := filepath.Join(dir, "build"); checkDirectory(build) == nil { |
420 | + dir = build |
421 | + } |
422 | + classPath, err := filepath.Glob(filepath.Join(dir, "zookeeper-*.jar")) |
423 | + if err != nil { |
424 | + panic(fmt.Errorf("glob for jar files: %v", err)) |
425 | + } |
426 | + more, err := filepath.Glob(filepath.Join(dir, "lib/*.jar")) |
427 | + if err != nil { |
428 | + panic(fmt.Errorf("glob for lib jar files: %v", err)) |
429 | + } |
430 | + |
431 | + classPath = append(classPath, more...) |
432 | + if len(classPath) == 0 { |
433 | + return nil, fmt.Errorf("zookeeper libraries not found in %q", dir) |
434 | + } |
435 | + return classPath, nil |
436 | +} |
437 | + |
438 | +func systemClassPath() ([]string, os.Error) { |
439 | + f, err := os.Open(zookeeperEnviron) |
440 | + if f == nil { |
441 | + return nil, err |
442 | + } |
443 | + r := bufio.NewReader(f) |
444 | + for { |
445 | + line, err := r.ReadSlice('\n') |
446 | + if err != nil { |
447 | + break |
448 | + } |
449 | + if !bytes.HasPrefix(line, []byte("CLASSPATH=")) { |
450 | + continue |
451 | + } |
452 | + |
453 | + // remove variable and newline |
454 | + path := string(line[len("CLASSPATH=") : len(line)-1]) |
455 | + |
456 | + // trim white space |
457 | + path = strings.Trim(path, " \t\r") |
458 | + |
459 | + // strip quotes |
460 | + if path[0] == '"' { |
461 | + path = path[1 : len(path)-1] |
462 | + } |
463 | + |
464 | + // split on : |
465 | + classPath := strings.Split(path, ":") |
466 | + |
467 | + // split off $ZOOCFGDIR |
468 | + if len(classPath) > 0 && classPath[0] == "$ZOOCFGDIR" { |
469 | + classPath = classPath[1:] |
470 | + } |
471 | + |
472 | + if len(classPath) == 0 { |
473 | + return nil, fmt.Errorf("empty class path in %q", zookeeperEnviron) |
474 | + } |
475 | + return classPath, nil |
476 | + } |
477 | + return nil, fmt.Errorf("no class path found in %q", zookeeperEnviron) |
478 | +} |
479 | + |
480 | +// checkDirectory returns an error if the given path |
481 | +// does not exist or is not a directory. |
482 | +func checkDirectory(path string) os.Error { |
483 | + if info, err := os.Stat(path); err != nil || !info.IsDirectory() { |
484 | + if err == nil { |
485 | + err = &os.PathError{Op: "stat", Path: path, Error: os.NewError("is not a directory")} |
486 | + } |
487 | + return err |
488 | + } |
489 | + return nil |
490 | +} |
491 | |
492 | === modified file 'suite_test.go' |
493 | --- suite_test.go 2011-08-19 01:43:37 +0000 |
494 | +++ suite_test.go 2011-09-24 08:31:23 +0000 |
495 | @@ -1,13 +1,13 @@ |
496 | -package gozk_test |
497 | +package zookeeper_test |
498 | |
499 | import ( |
500 | . "launchpad.net/gocheck" |
501 | - "testing" |
502 | - "io/ioutil" |
503 | - "path" |
504 | + "bufio" |
505 | + "exec" |
506 | "fmt" |
507 | + "launchpad.net/zookeeper" |
508 | "os" |
509 | - "gozk" |
510 | + "testing" |
511 | "time" |
512 | ) |
513 | |
514 | @@ -18,48 +18,32 @@ |
515 | var _ = Suite(&S{}) |
516 | |
517 | type S struct { |
518 | - zkRoot string |
519 | - zkTestRoot string |
520 | - zkTestPort int |
521 | - zkServerSh string |
522 | - zkServerOut *os.File |
523 | - zkAddr string |
524 | + zkArgs []string |
525 | + zkTestRoot string |
526 | + zkTestPort int |
527 | + zkProcess *os.Process // The running ZooKeeper process |
528 | + zkAddr string |
529 | |
530 | - handles []*gozk.ZooKeeper |
531 | - events []*gozk.Event |
532 | + handles []*zookeeper.Conn |
533 | + events []*zookeeper.Event |
534 | liveWatches int |
535 | deadWatches chan bool |
536 | } |
537 | |
538 | -var logLevel = 0 //gozk.LOG_ERROR |
539 | - |
540 | - |
541 | -var testZooCfg = ("dataDir=%s\n" + |
542 | - "clientPort=%d\n" + |
543 | - "tickTime=2000\n" + |
544 | - "initLimit=10\n" + |
545 | - "syncLimit=5\n" + |
546 | - "") |
547 | - |
548 | -var testLog4jPrp = ("log4j.rootLogger=INFO,CONSOLE\n" + |
549 | - "log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender\n" + |
550 | - "log4j.appender.CONSOLE.Threshold=DEBUG\n" + |
551 | - "log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout\n" + |
552 | - "log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n" + |
553 | - "") |
554 | - |
555 | -func (s *S) init(c *C) (*gozk.ZooKeeper, chan gozk.Event) { |
556 | - zk, watch, err := gozk.Init(s.zkAddr, 5e9) |
557 | +var logLevel = 0 //zookeeper.LOG_ERROR |
558 | + |
559 | +func (s *S) init(c *C) (*zookeeper.Conn, chan zookeeper.Event) { |
560 | + zk, watch, err := zookeeper.Dial(s.zkAddr, 5e9) |
561 | c.Assert(err, IsNil) |
562 | |
563 | s.handles = append(s.handles, zk) |
564 | |
565 | event := <-watch |
566 | |
567 | - c.Assert(event.Type, Equals, gozk.EVENT_SESSION) |
568 | - c.Assert(event.State, Equals, gozk.STATE_CONNECTED) |
569 | + c.Assert(event.Type, Equals, zookeeper.EVENT_SESSION) |
570 | + c.Assert(event.State, Equals, zookeeper.STATE_CONNECTED) |
571 | |
572 | - bufferedWatch := make(chan gozk.Event, 256) |
573 | + bufferedWatch := make(chan zookeeper.Event, 256) |
574 | bufferedWatch <- event |
575 | |
576 | s.liveWatches += 1 |
577 | @@ -86,9 +70,9 @@ |
578 | } |
579 | |
580 | func (s *S) SetUpTest(c *C) { |
581 | - c.Assert(gozk.CountPendingWatches(), Equals, 0, |
582 | + c.Assert(zookeeper.CountPendingWatches(), Equals, 0, |
583 | Bug("Test got a dirty watch state before running!")) |
584 | - gozk.SetLogLevel(logLevel) |
585 | + zookeeper.SetLogLevel(logLevel) |
586 | } |
587 | |
588 | func (s *S) TearDownTest(c *C) { |
589 | @@ -108,98 +92,65 @@ |
590 | } |
591 | |
592 | // Reset the list of handles. |
593 | - s.handles = make([]*gozk.ZooKeeper, 0) |
594 | + s.handles = make([]*zookeeper.Conn, 0) |
595 | |
596 | - c.Assert(gozk.CountPendingWatches(), Equals, 0, |
597 | + c.Assert(zookeeper.CountPendingWatches(), Equals, 0, |
598 | Bug("Test left live watches behind!")) |
599 | } |
600 | |
601 | -// We use the suite set up and tear down to manage a custom zookeeper |
602 | +// We use the suite set up and tear down to manage a custom ZooKeeper |
603 | // |
604 | func (s *S) SetUpSuite(c *C) { |
605 | - |
606 | + var err os.Error |
607 | s.deadWatches = make(chan bool) |
608 | |
609 | - var err os.Error |
610 | - |
611 | - s.zkRoot = os.Getenv("ZKROOT") |
612 | - if s.zkRoot == "" { |
613 | - panic("You must define $ZKROOT ($ZKROOT/bin/zkServer.sh is needed)") |
614 | - } |
615 | - |
616 | s.zkTestRoot = c.MkDir() |
617 | s.zkTestPort = 21812 |
618 | - |
619 | - println("ZooKeeper test server directory:", s.zkTestRoot) |
620 | - println("ZooKeeper test server port:", s.zkTestPort) |
621 | - |
622 | - s.zkAddr = fmt.Sprintf("localhost:%d", s.zkTestPort) |
623 | - |
624 | - s.zkServerSh = path.Join(s.zkRoot, "bin/zkServer.sh") |
625 | - s.zkServerOut, err = os.OpenFile(path.Join(s.zkTestRoot, "stdout.txt"), |
626 | - os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) |
627 | - if err != nil { |
628 | - panic("Can't open stdout.txt file for server: " + err.String()) |
629 | - } |
630 | - |
631 | - dataDir := path.Join(s.zkTestRoot, "data") |
632 | - confDir := path.Join(s.zkTestRoot, "conf") |
633 | - |
634 | - os.Mkdir(dataDir, 0755) |
635 | - os.Mkdir(confDir, 0755) |
636 | - |
637 | - err = os.Setenv("ZOOCFGDIR", confDir) |
638 | - if err != nil { |
639 | - panic("Can't set $ZOOCFGDIR: " + err.String()) |
640 | - } |
641 | - |
642 | - zooCfg := []byte(fmt.Sprintf(testZooCfg, dataDir, s.zkTestPort)) |
643 | - err = ioutil.WriteFile(path.Join(confDir, "zoo.cfg"), zooCfg, 0644) |
644 | - if err != nil { |
645 | - panic("Can't write zoo.cfg: " + err.String()) |
646 | - } |
647 | - |
648 | - log4jPrp := []byte(testLog4jPrp) |
649 | - err = ioutil.WriteFile(path.Join(confDir, "log4j.properties"), log4jPrp, 0644) |
650 | - if err != nil { |
651 | - panic("Can't write log4j.properties: " + err.String()) |
652 | - } |
653 | - |
654 | - s.StartZK() |
655 | + s.zkAddr = fmt.Sprint("localhost:", s.zkTestPort) |
656 | + |
657 | + s.zkArgs, err = zookeeper.Server(s.zkTestPort, s.zkTestRoot, "") |
658 | + if err != nil { |
659 | + c.Fatal("Cannot set up server environment: ", err) |
660 | + } |
661 | + s.StartZK(c) |
662 | } |
663 | |
664 | func (s *S) TearDownSuite(c *C) { |
665 | s.StopZK() |
666 | - s.zkServerOut.Close() |
667 | -} |
668 | - |
669 | -func (s *S) StartZK() { |
670 | - attr := os.ProcAttr{Files: []*os.File{os.Stdin, s.zkServerOut, os.Stderr}} |
671 | - proc, err := os.StartProcess(s.zkServerSh, []string{s.zkServerSh, "start"}, &attr) |
672 | - if err != nil { |
673 | - panic("Problem executing zkServer.sh start: " + err.String()) |
674 | - } |
675 | - |
676 | - result, err := proc.Wait(0) |
677 | - if err != nil { |
678 | - panic(err.String()) |
679 | - } else if result.ExitStatus() != 0 { |
680 | - panic("'zkServer.sh start' exited with non-zero status") |
681 | - } |
682 | +} |
683 | + |
684 | +func startLogger(c *C, cmd *exec.Cmd) { |
685 | + r, err := cmd.StdoutPipe() |
686 | + if err != nil { |
687 | + c.Fatal("cannot make output pipe:", err) |
688 | + } |
689 | + cmd.Stderr = cmd.Stdout |
690 | + bio := bufio.NewReader(r) |
691 | + go func() { |
692 | + for { |
693 | + line, err := bio.ReadSlice('\n') |
694 | + if err != nil { |
695 | + break |
696 | + } |
697 | + c.Log(string(line[0 : len(line)-1])) |
698 | + } |
699 | + }() |
700 | +} |
701 | + |
702 | +func (s *S) StartZK(c *C) { |
703 | + cmd := exec.Command(s.zkArgs[0], s.zkArgs[1:]...) |
704 | + startLogger(c, cmd) |
705 | + err := cmd.Start() |
706 | + if err != nil { |
707 | + c.Fatal("Error starting ZooKeeper server: ", err) |
708 | + } |
709 | + s.zkProcess = cmd.Process |
710 | } |
711 | |
712 | func (s *S) StopZK() { |
713 | - attr := os.ProcAttr{Files: []*os.File{os.Stdin, s.zkServerOut, os.Stderr}} |
714 | - proc, err := os.StartProcess(s.zkServerSh, []string{s.zkServerSh, "stop"}, &attr) |
715 | - if err != nil { |
716 | - panic("Problem executing zkServer.sh stop: " + err.String() + |
717 | - " (look for runaway java processes!)") |
718 | - } |
719 | - result, err := proc.Wait(0) |
720 | - if err != nil { |
721 | - panic(err.String()) |
722 | - } else if result.ExitStatus() != 0 { |
723 | - panic("'zkServer.sh stop' exited with non-zero status " + |
724 | - "(look for runaway java processes!)") |
725 | - } |
726 | + if s.zkProcess != nil { |
727 | + s.zkProcess.Kill() |
728 | + s.zkProcess.Wait(0) |
729 | + } |
730 | + s.zkProcess = nil |
731 | } |
732 | |
733 | === added directory 'zookeeper' |
734 | === renamed file 'gozk.go' => 'zookeeper.go' |
735 | --- gozk.go 2011-08-19 01:56:39 +0000 |
736 | +++ zookeeper.go 2011-09-24 08:31:23 +0000 |
737 | @@ -1,4 +1,4 @@ |
738 | -// gozk - Zookeeper support for the Go language |
739 | +// gozk - ZooKeeper support for the Go language |
740 | // |
741 | // https://wiki.ubuntu.com/gozk |
742 | // |
743 | @@ -6,7 +6,7 @@ |
744 | // |
745 | // Written by Gustavo Niemeyer <gustavo.niemeyer@canonical.com> |
746 | // |
747 | -package gozk |
748 | +package zookeeper |
749 | |
750 | /* |
751 | #cgo CFLAGS: -I/usr/include/c-client-src |
752 | @@ -27,17 +27,16 @@ |
753 | // ----------------------------------------------------------------------- |
754 | // Main constants and data types. |
755 | |
756 | -// The main ZooKeeper object, created through the Init function. |
757 | -// Encapsulates all communication with ZooKeeper. |
758 | -type ZooKeeper struct { |
759 | +// Conn represents a connection to a set of ZooKeeper nodes. |
760 | +type Conn struct { |
761 | watchChannels map[uintptr]chan Event |
762 | sessionWatchId uintptr |
763 | handle *C.zhandle_t |
764 | mutex sync.Mutex |
765 | } |
766 | |
767 | -// ClientId represents the established session in ZooKeeper. This is only |
768 | -// useful to be passed back into the ReInit function. |
769 | +// ClientId represents an established ZooKeeper session. It can be |
770 | +// passed into Redial to reestablish a connection to an existing session. |
771 | type ClientId struct { |
772 | cId C.clientid_t |
773 | } |
774 | @@ -98,35 +97,60 @@ |
775 | State int |
776 | } |
777 | |
778 | -// Error codes that may be used to verify the result of the |
779 | -// Code method from Error. |
780 | +// Error represents a ZooKeeper error. |
781 | +type Error int |
782 | + |
783 | const ( |
784 | - ZOK = C.ZOK |
785 | - ZSYSTEMERROR = C.ZSYSTEMERROR |
786 | - ZRUNTIMEINCONSISTENCY = C.ZRUNTIMEINCONSISTENCY |
787 | - ZDATAINCONSISTENCY = C.ZDATAINCONSISTENCY |
788 | - ZCONNECTIONLOSS = C.ZCONNECTIONLOSS |
789 | - ZMARSHALLINGERROR = C.ZMARSHALLINGERROR |
790 | - ZUNIMPLEMENTED = C.ZUNIMPLEMENTED |
791 | - ZOPERATIONTIMEOUT = C.ZOPERATIONTIMEOUT |
792 | - ZBADARGUMENTS = C.ZBADARGUMENTS |
793 | - ZINVALIDSTATE = C.ZINVALIDSTATE |
794 | - ZAPIERROR = C.ZAPIERROR |
795 | - ZNONODE = C.ZNONODE |
796 | - ZNOAUTH = C.ZNOAUTH |
797 | - ZBADVERSION = C.ZBADVERSION |
798 | - ZNOCHILDRENFOREPHEMERALS = C.ZNOCHILDRENFOREPHEMERALS |
799 | - ZNODEEXISTS = C.ZNODEEXISTS |
800 | - ZNOTEMPTY = C.ZNOTEMPTY |
801 | - ZSESSIONEXPIRED = C.ZSESSIONEXPIRED |
802 | - ZINVALIDCALLBACK = C.ZINVALIDCALLBACK |
803 | - ZINVALIDACL = C.ZINVALIDACL |
804 | - ZAUTHFAILED = C.ZAUTHFAILED |
805 | - ZCLOSING = C.ZCLOSING |
806 | - ZNOTHING = C.ZNOTHING |
807 | - ZSESSIONMOVED = C.ZSESSIONMOVED |
808 | + ZOK Error = C.ZOK |
809 | + ZSYSTEMERROR Error = C.ZSYSTEMERROR |
810 | + ZRUNTIMEINCONSISTENCY Error = C.ZRUNTIMEINCONSISTENCY |
811 | + ZDATAINCONSISTENCY Error = C.ZDATAINCONSISTENCY |
812 | + ZCONNECTIONLOSS Error = C.ZCONNECTIONLOSS |
813 | + ZMARSHALLINGERROR Error = C.ZMARSHALLINGERROR |
814 | + ZUNIMPLEMENTED Error = C.ZUNIMPLEMENTED |
815 | + ZOPERATIONTIMEOUT Error = C.ZOPERATIONTIMEOUT |
816 | + ZBADARGUMENTS Error = C.ZBADARGUMENTS |
817 | + ZINVALIDSTATE Error = C.ZINVALIDSTATE |
818 | + ZAPIERROR Error = C.ZAPIERROR |
819 | + ZNONODE Error = C.ZNONODE |
820 | + ZNOAUTH Error = C.ZNOAUTH |
821 | + ZBADVERSION Error = C.ZBADVERSION |
822 | + ZNOCHILDRENFOREPHEMERALS Error = C.ZNOCHILDRENFOREPHEMERALS |
823 | + ZNODEEXISTS Error = C.ZNODEEXISTS |
824 | + ZNOTEMPTY Error = C.ZNOTEMPTY |
825 | + ZSESSIONEXPIRED Error = C.ZSESSIONEXPIRED |
826 | + ZINVALIDCALLBACK Error = C.ZINVALIDCALLBACK |
827 | + ZINVALIDACL Error = C.ZINVALIDACL |
828 | + ZAUTHFAILED Error = C.ZAUTHFAILED |
829 | + ZCLOSING Error = C.ZCLOSING |
830 | + ZNOTHING Error = C.ZNOTHING |
831 | + ZSESSIONMOVED Error = C.ZSESSIONMOVED |
832 | ) |
833 | |
834 | +func (error Error) String() string { |
835 | + return C.GoString(C.zerror(C.int(error))) // Static, no need to free it. |
836 | +} |
837 | + |
838 | +// zkError creates an appropriate error return from |
839 | +// a ZooKeeper status and the errno return from a C API |
840 | +// call. |
841 | +func zkError(rc C.int, cerr os.Error) os.Error { |
842 | + code := Error(rc) |
843 | + switch code { |
844 | + case ZOK: |
845 | + return nil |
846 | + |
847 | + case ZSYSTEMERROR: |
848 | + // If a ZooKeeper call returns ZSYSTEMERROR, then |
849 | + // errno becomes significant. If errno has not been |
850 | + // set, then we will return ZSYSTEMERROR nonetheless. |
851 | + if cerr != nil { |
852 | + return cerr |
853 | + } |
854 | + } |
855 | + return code |
856 | +} |
857 | + |
858 | // Constants for SetLogLevel. |
859 | const ( |
860 | LOG_ERROR = C.ZOO_LOG_LEVEL_ERROR |
861 | @@ -273,104 +297,54 @@ |
862 | } |
863 | |
864 | // ----------------------------------------------------------------------- |
865 | -// Error interface which maps onto the ZooKeeper error codes. |
866 | - |
867 | -type Error interface { |
868 | - String() string |
869 | - Code() int |
870 | -} |
871 | - |
872 | -type errorType struct { |
873 | - zkrc C.int |
874 | - err os.Error |
875 | -} |
876 | - |
877 | -func newError(zkrc C.int, err os.Error) Error { |
878 | - return &errorType{zkrc, err} |
879 | -} |
880 | - |
881 | -func (error *errorType) String() (result string) { |
882 | - if error.zkrc == ZSYSTEMERROR && error.err != nil { |
883 | - result = error.err.String() |
884 | - } else { |
885 | - result = C.GoString(C.zerror(error.zkrc)) // Static, no need to free it. |
886 | - } |
887 | - return |
888 | -} |
889 | - |
890 | -// Code returns the error code that may be compared against one of |
891 | -// the gozk.Z* constants. |
892 | -func (error *errorType) Code() int { |
893 | - return int(error.zkrc) |
894 | -} |
895 | - |
896 | -// ----------------------------------------------------------------------- |
897 | -// Stat interface which maps onto the ZooKeeper Stat struct. |
898 | - |
899 | -// We declare this as an interface rather than an actual struct because |
900 | -// this way we don't have to copy data around between the real C struct |
901 | -// and the Go one on every call. Most uses will only touch a few elements, |
902 | -// or even ignore the stat entirely, so that's a win. |
903 | |
904 | // Stat contains detailed information about a node. |
905 | -type Stat interface { |
906 | - Czxid() int64 |
907 | - Mzxid() int64 |
908 | - CTime() int64 |
909 | - MTime() int64 |
910 | - Version() int32 |
911 | - CVersion() int32 |
912 | - AVersion() int32 |
913 | - EphemeralOwner() int64 |
914 | - DataLength() int32 |
915 | - NumChildren() int32 |
916 | - Pzxid() int64 |
917 | -} |
918 | - |
919 | -type resultStat C.struct_Stat |
920 | - |
921 | -func (stat *resultStat) Czxid() int64 { |
922 | - return int64(stat.czxid) |
923 | -} |
924 | - |
925 | -func (stat *resultStat) Mzxid() int64 { |
926 | - return int64(stat.mzxid) |
927 | -} |
928 | - |
929 | -func (stat *resultStat) CTime() int64 { |
930 | - return int64(stat.ctime) |
931 | -} |
932 | - |
933 | -func (stat *resultStat) MTime() int64 { |
934 | - return int64(stat.mtime) |
935 | -} |
936 | - |
937 | -func (stat *resultStat) Version() int32 { |
938 | - return int32(stat.version) |
939 | -} |
940 | - |
941 | -func (stat *resultStat) CVersion() int32 { |
942 | - return int32(stat.cversion) |
943 | -} |
944 | - |
945 | -func (stat *resultStat) AVersion() int32 { |
946 | - return int32(stat.aversion) |
947 | -} |
948 | - |
949 | -func (stat *resultStat) EphemeralOwner() int64 { |
950 | - return int64(stat.ephemeralOwner) |
951 | -} |
952 | - |
953 | -func (stat *resultStat) DataLength() int32 { |
954 | - return int32(stat.dataLength) |
955 | -} |
956 | - |
957 | -func (stat *resultStat) NumChildren() int32 { |
958 | - return int32(stat.numChildren) |
959 | -} |
960 | - |
961 | -func (stat *resultStat) Pzxid() int64 { |
962 | - return int64(stat.pzxid) |
963 | +type Stat struct { |
964 | + c C.struct_Stat |
965 | +} |
966 | + |
967 | +func (stat *Stat) Czxid() int64 { |
968 | + return int64(stat.c.czxid) |
969 | +} |
970 | + |
971 | +func (stat *Stat) Mzxid() int64 { |
972 | + return int64(stat.c.mzxid) |
973 | +} |
974 | + |
975 | +func (stat *Stat) CTime() int64 { |
976 | + return int64(stat.c.ctime) |
977 | +} |
978 | + |
979 | +func (stat *Stat) MTime() int64 { |
980 | + return int64(stat.c.mtime) |
981 | +} |
982 | + |
983 | +func (stat *Stat) Version() int32 { |
984 | + return int32(stat.c.version) |
985 | +} |
986 | + |
987 | +func (stat *Stat) CVersion() int32 { |
988 | + return int32(stat.c.cversion) |
989 | +} |
990 | + |
991 | +func (stat *Stat) AVersion() int32 { |
992 | + return int32(stat.c.aversion) |
993 | +} |
994 | + |
995 | +func (stat *Stat) EphemeralOwner() int64 { |
996 | + return int64(stat.c.ephemeralOwner) |
997 | +} |
998 | + |
999 | +func (stat *Stat) DataLength() int32 { |
1000 | + return int32(stat.c.dataLength) |
1001 | +} |
1002 | + |
1003 | +func (stat *Stat) NumChildren() int32 { |
1004 | + return int32(stat.c.numChildren) |
1005 | +} |
1006 | + |
1007 | +func (stat *Stat) Pzxid() int64 { |
1008 | + return int64(stat.c.pzxid) |
1009 | } |
1010 | |
1011 | // ----------------------------------------------------------------------- |
1012 | @@ -384,7 +358,7 @@ |
1013 | C.zoo_set_debug_level(C.ZooLogLevel(level)) |
1014 | } |
1015 | |
1016 | -// Init initializes the communication with a ZooKeeper cluster. The provided |
1017 | +// Dial initializes the communication with a ZooKeeper cluster. The provided |
1018 | // servers parameter may include multiple server addresses, separated |
1019 | // by commas, so that the client will automatically attempt to connect |
1020 | // to another server if one of them stops working for whatever reason. |
1021 | @@ -398,21 +372,18 @@ |
1022 | // The watch channel receives events of type SESSION_EVENT when any change |
1023 | // to the state of the established connection happens. See the documentation |
1024 | // for the Event type for more details. |
1025 | -func Init(servers string, recvTimeoutNS int64) (zk *ZooKeeper, watch chan Event, err Error) { |
1026 | - zk, watch, err = internalInit(servers, recvTimeoutNS, nil) |
1027 | - return |
1028 | +func Dial(servers string, recvTimeoutNS int64) (*Conn, <-chan Event, os.Error) { |
1029 | + return dial(servers, recvTimeoutNS, nil) |
1030 | } |
1031 | |
1032 | -// Equivalent to Init, but attempt to reestablish an existing session |
1033 | +// Redial is equivalent to Dial, but attempts to reestablish an existing session |
1034 | // identified via the clientId parameter. |
1035 | -func ReInit(servers string, recvTimeoutNS int64, clientId *ClientId) (zk *ZooKeeper, watch chan Event, err Error) { |
1036 | - zk, watch, err = internalInit(servers, recvTimeoutNS, clientId) |
1037 | - return |
1038 | +func Redial(servers string, recvTimeoutNS int64, clientId *ClientId) (*Conn, <-chan Event, os.Error) { |
1039 | + return dial(servers, recvTimeoutNS, clientId) |
1040 | } |
1041 | |
1042 | -func internalInit(servers string, recvTimeoutNS int64, clientId *ClientId) (*ZooKeeper, chan Event, Error) { |
1043 | - |
1044 | - zk := &ZooKeeper{} |
1045 | +func dial(servers string, recvTimeoutNS int64, clientId *ClientId) (*Conn, <-chan Event, os.Error) { |
1046 | + zk := &Conn{} |
1047 | zk.watchChannels = make(map[uintptr]chan Event) |
1048 | |
1049 | var cId *C.clientid_t |
1050 | @@ -424,11 +395,11 @@ |
1051 | zk.sessionWatchId = watchId |
1052 | |
1053 | cservers := C.CString(servers) |
1054 | - handle, cerr := C.zookeeper_init(cservers, C.watch_handler, C.int(recvTimeoutNS / 1e6), cId, unsafe.Pointer(watchId), 0) |
1055 | + handle, cerr := C.zookeeper_init(cservers, C.watch_handler, C.int(recvTimeoutNS/1e6), cId, unsafe.Pointer(watchId), 0) |
1056 | C.free(unsafe.Pointer(cservers)) |
1057 | if handle == nil { |
1058 | zk.closeAllWatches() |
1059 | - return nil, nil, newError(ZSYSTEMERROR, cerr) |
1060 | + return nil, nil, zkError(C.int(ZSYSTEMERROR), cerr) |
1061 | } |
1062 | zk.handle = handle |
1063 | runWatchLoop() |
1064 | @@ -437,12 +408,12 @@ |
1065 | |
1066 | // ClientId returns the client ID for the existing session with ZooKeeper. |
1067 | // This is useful to reestablish an existing session via Redial. |
1068 | -func (zk *ZooKeeper) ClientId() *ClientId { |
1069 | +func (zk *Conn) ClientId() *ClientId { |
1070 | return &ClientId{*C.zoo_client_id(zk.handle)} |
1071 | } |
1072 | |
1073 | // Close terminates the ZooKeeper interaction. |
1074 | -func (zk *ZooKeeper) Close() Error { |
1075 | +func (zk *Conn) Close() os.Error { |
1076 | |
1077 | // Protect from concurrency around zk.handle change. |
1078 | zk.mutex.Lock() |
1079 | @@ -451,7 +422,7 @@ |
1080 | if zk.handle == nil { |
1081 | // ZooKeeper may hang indefinitely if a handler is closed twice, |
1082 | // so we get in the way and prevent it from happening. |
1083 | - return newError(ZCLOSING, nil) |
1084 | + return ZCLOSING |
1085 | } |
1086 | rc, cerr := C.zookeeper_close(zk.handle) |
1087 | |
1088 | @@ -461,16 +432,13 @@ |
1089 | // At this point, nothing else should need zk.handle. |
1090 | zk.handle = nil |
1091 | |
1092 | - if rc != C.ZOK { |
1093 | - return newError(rc, cerr) |
1094 | - } |
1095 | - return nil |
1096 | + return zkError(rc, cerr) |
1097 | } |
1098 | |
1099 | // Get returns the data and status from an existing node. err will be nil, |
1100 | // unless an error is found. Attempting to retrieve data from a non-existing |
1101 | // node is an error. |
1102 | -func (zk *ZooKeeper) Get(path string) (data string, stat Stat, err Error) { |
1103 | +func (zk *Conn) Get(path string) (data string, stat *Stat, err os.Error) { |
1104 | |
1105 | cpath := C.CString(path) |
1106 | cbuffer := (*C.char)(C.malloc(bufferSize)) |
1107 | @@ -478,22 +446,22 @@ |
1108 | defer C.free(unsafe.Pointer(cpath)) |
1109 | defer C.free(unsafe.Pointer(cbuffer)) |
1110 | |
1111 | - cstat := C.struct_Stat{} |
1112 | + var cstat Stat |
1113 | rc, cerr := C.zoo_wget(zk.handle, cpath, nil, nil, |
1114 | - cbuffer, &cbufferLen, &cstat) |
1115 | + cbuffer, &cbufferLen, &cstat.c) |
1116 | if rc != C.ZOK { |
1117 | - return "", nil, newError(rc, cerr) |
1118 | + return "", nil, zkError(rc, cerr) |
1119 | } |
1120 | + |
1121 | result := C.GoStringN(cbuffer, cbufferLen) |
1122 | - |
1123 | - return result, (*resultStat)(&cstat), nil |
1124 | + return result, &cstat, nil |
1125 | } |
1126 | |
1127 | // GetW works like Get but also returns a channel that will receive |
1128 | // a single Event value when the data or existence of the given ZooKeeper |
1129 | // node changes or when critical session events happen. See the |
1130 | // documentation of the Event type for more details. |
1131 | -func (zk *ZooKeeper) GetW(path string) (data string, stat Stat, watch chan Event, err Error) { |
1132 | +func (zk *Conn) GetW(path string) (data string, stat *Stat, watch <-chan Event, err os.Error) { |
1133 | |
1134 | cpath := C.CString(path) |
1135 | cbuffer := (*C.char)(C.malloc(bufferSize)) |
1136 | @@ -503,40 +471,39 @@ |
1137 | |
1138 | watchId, watchChannel := zk.createWatch(true) |
1139 | |
1140 | - cstat := C.struct_Stat{} |
1141 | + var cstat Stat |
1142 | rc, cerr := C.zoo_wget(zk.handle, cpath, |
1143 | C.watch_handler, unsafe.Pointer(watchId), |
1144 | - cbuffer, &cbufferLen, &cstat) |
1145 | + cbuffer, &cbufferLen, &cstat.c) |
1146 | if rc != C.ZOK { |
1147 | zk.forgetWatch(watchId) |
1148 | - return "", nil, nil, newError(rc, cerr) |
1149 | + return "", nil, nil, zkError(rc, cerr) |
1150 | } |
1151 | |
1152 | result := C.GoStringN(cbuffer, cbufferLen) |
1153 | - return result, (*resultStat)(&cstat), watchChannel, nil |
1154 | + return result, &cstat, watchChannel, nil |
1155 | } |
1156 | |
1157 | // Children returns the children list and status from an existing node. |
1158 | -// err will be nil, unless an error is found. Attempting to retrieve the |
1159 | -// children list from a non-existent node is an error. |
1160 | -func (zk *ZooKeeper) Children(path string) (children []string, stat Stat, err Error) { |
1161 | +// Attempting to retrieve the children list from a non-existent node is an error. |
1162 | +func (zk *Conn) Children(path string) (children []string, stat *Stat, err os.Error) { |
1163 | |
1164 | cpath := C.CString(path) |
1165 | defer C.free(unsafe.Pointer(cpath)) |
1166 | |
1167 | cvector := C.struct_String_vector{} |
1168 | - cstat := C.struct_Stat{} |
1169 | + var cstat Stat |
1170 | rc, cerr := C.zoo_wget_children2(zk.handle, cpath, nil, nil, |
1171 | - &cvector, &cstat) |
1172 | + &cvector, &cstat.c) |
1173 | |
1174 | // Can't happen if rc != 0, but avoid potential memory leaks in the future. |
1175 | if cvector.count != 0 { |
1176 | children = parseStringVector(&cvector) |
1177 | } |
1178 | - if rc != C.ZOK { |
1179 | - err = newError(rc, cerr) |
1180 | + if rc == C.ZOK { |
1181 | + stat = &cstat |
1182 | } else { |
1183 | - stat = (*resultStat)(&cstat) |
1184 | + err = zkError(rc, cerr) |
1185 | } |
1186 | return |
1187 | } |
1188 | @@ -545,7 +512,7 @@ |
1189 | // receive a single Event value when a node is added or removed under the |
1190 | // provided path or when critical session events happen. See the documentation |
1191 | // of the Event type for more details. |
1192 | -func (zk *ZooKeeper) ChildrenW(path string) (children []string, stat Stat, watch chan Event, err Error) { |
1193 | +func (zk *Conn) ChildrenW(path string) (children []string, stat *Stat, watch <-chan Event, err os.Error) { |
1194 | |
1195 | cpath := C.CString(path) |
1196 | defer C.free(unsafe.Pointer(cpath)) |
1197 | @@ -553,21 +520,21 @@ |
1198 | watchId, watchChannel := zk.createWatch(true) |
1199 | |
1200 | cvector := C.struct_String_vector{} |
1201 | - cstat := C.struct_Stat{} |
1202 | + var cstat Stat |
1203 | rc, cerr := C.zoo_wget_children2(zk.handle, cpath, |
1204 | C.watch_handler, unsafe.Pointer(watchId), |
1205 | - &cvector, &cstat) |
1206 | + &cvector, &cstat.c) |
1207 | |
1208 | // Can't happen if rc != 0, but avoid potential memory leaks in the future. |
1209 | if cvector.count != 0 { |
1210 | children = parseStringVector(&cvector) |
1211 | } |
1212 | - if rc != C.ZOK { |
1213 | + if rc == C.ZOK { |
1214 | + stat = &cstat |
1215 | + watch = watchChannel |
1216 | + } else { |
1217 | zk.forgetWatch(watchId) |
1218 | - err = newError(rc, cerr) |
1219 | - } else { |
1220 | - stat = (*resultStat)(&cstat) |
1221 | - watch = watchChannel |
1222 | + err = zkError(rc, cerr) |
1223 | } |
1224 | return |
1225 | } |
1226 | @@ -588,20 +555,20 @@ |
1227 | // Exists checks if a node exists at the given path. If it does, |
1228 | // stat will contain meta information on the existing node, otherwise |
1229 | // it will be nil. |
1230 | -func (zk *ZooKeeper) Exists(path string) (stat Stat, err Error) { |
1231 | +func (zk *Conn) Exists(path string) (stat *Stat, err os.Error) { |
1232 | cpath := C.CString(path) |
1233 | defer C.free(unsafe.Pointer(cpath)) |
1234 | |
1235 | - cstat := C.struct_Stat{} |
1236 | - rc, cerr := C.zoo_wexists(zk.handle, cpath, nil, nil, &cstat) |
1237 | + var cstat Stat |
1238 | + rc, cerr := C.zoo_wexists(zk.handle, cpath, nil, nil, &cstat.c) |
1239 | |
1240 | // We diverge a bit from the usual here: a ZNONODE is not an error |
1241 | // for an exists call, otherwise every Exists call would have to check |
1242 | // for err != nil and err != ZNONODE. |
1243 | if rc == C.ZOK { |
1244 | - stat = (*resultStat)(&cstat) |
1245 | + stat = &cstat |
1246 | } else if rc != C.ZNONODE { |
1247 | - err = newError(rc, cerr) |
1248 | + err = zkError(rc, cerr) |
1249 | } |
1250 | return |
1251 | } |
1252 | @@ -611,28 +578,28 @@ |
1253 | // stat is nil and the node didn't exist, or when the existing node |
1254 | // is removed. It will also receive critical session events. See the |
1255 | // documentation of the Event type for more details. |
1256 | -func (zk *ZooKeeper) ExistsW(path string) (stat Stat, watch chan Event, err Error) { |
1257 | +func (zk *Conn) ExistsW(path string) (stat *Stat, watch <-chan Event, err os.Error) { |
1258 | cpath := C.CString(path) |
1259 | defer C.free(unsafe.Pointer(cpath)) |
1260 | |
1261 | watchId, watchChannel := zk.createWatch(true) |
1262 | |
1263 | - cstat := C.struct_Stat{} |
1264 | + var cstat Stat |
1265 | rc, cerr := C.zoo_wexists(zk.handle, cpath, |
1266 | - C.watch_handler, unsafe.Pointer(watchId), &cstat) |
1267 | + C.watch_handler, unsafe.Pointer(watchId), &cstat.c) |
1268 | |
1269 | // We diverge a bit from the usual here: a ZNONODE is not an error |
1270 | // for an exists call, otherwise every Exists call would have to check |
1271 | // for err != nil and err != ZNONODE. |
1272 | - switch rc { |
1273 | + switch Error(rc) { |
1274 | case ZOK: |
1275 | - stat = (*resultStat)(&cstat) |
1276 | + stat = &cstat |
1277 | watch = watchChannel |
1278 | case ZNONODE: |
1279 | watch = watchChannel |
1280 | default: |
1281 | zk.forgetWatch(watchId) |
1282 | - err = newError(rc, cerr) |
1283 | + err = zkError(rc, cerr) |
1284 | } |
1285 | return |
1286 | } |
1287 | @@ -646,7 +613,7 @@ |
1288 | // The returned path is useful in cases where the created path may differ |
1289 | // from the requested one, such as when a sequence number is appended |
1290 | // to it due to the use of the zookeeper.SEQUENCE flag. |
1291 | -func (zk *ZooKeeper) Create(path, value string, flags int, aclv []ACL) (pathCreated string, err Error) { |
1292 | +func (zk *Conn) Create(path, value string, flags int, aclv []ACL) (pathCreated string, err os.Error) { |
1293 | cpath := C.CString(path) |
1294 | cvalue := C.CString(value) |
1295 | defer C.free(unsafe.Pointer(cpath)) |
1296 | @@ -662,11 +629,12 @@ |
1297 | |
1298 | rc, cerr := C.zoo_create(zk.handle, cpath, cvalue, C.int(len(value)), |
1299 | caclv, C.int(flags), cpathCreated, C.int(cpathLen)) |
1300 | - if rc != C.ZOK { |
1301 | - return "", newError(rc, cerr) |
1302 | + if rc == C.ZOK { |
1303 | + pathCreated = C.GoString(cpathCreated) |
1304 | + } else { |
1305 | + err = zkError(rc, cerr) |
1306 | } |
1307 | - |
1308 | - return C.GoString(cpathCreated), nil |
1309 | + return |
1310 | } |
1311 | |
1312 | // Set modifies the data for the existing node at the given path, replacing it |
1313 | @@ -677,35 +645,32 @@ |
1314 | // |
1315 | // It is an error to attempt to set the data of a non-existing node with |
1316 | // this function. In these cases, use Create instead. |
1317 | -func (zk *ZooKeeper) Set(path, value string, version int32) (stat Stat, err Error) { |
1318 | +func (zk *Conn) Set(path, value string, version int32) (stat *Stat, err os.Error) { |
1319 | |
1320 | cpath := C.CString(path) |
1321 | cvalue := C.CString(value) |
1322 | defer C.free(unsafe.Pointer(cpath)) |
1323 | defer C.free(unsafe.Pointer(cvalue)) |
1324 | |
1325 | - cstat := C.struct_Stat{} |
1326 | - |
1327 | + var cstat Stat |
1328 | rc, cerr := C.zoo_set2(zk.handle, cpath, cvalue, C.int(len(value)), |
1329 | - C.int(version), &cstat) |
1330 | - if rc != C.ZOK { |
1331 | - return nil, newError(rc, cerr) |
1332 | + C.int(version), &cstat.c) |
1333 | + if rc == C.ZOK { |
1334 | + stat = &cstat |
1335 | + } else { |
1336 | + err = zkError(rc, cerr) |
1337 | } |
1338 | - |
1339 | - return (*resultStat)(&cstat), nil |
1340 | + return |
1341 | } |
1342 | |
1343 | // Delete removes the node at path. If version is not -1, the operation |
1344 | // will only succeed if the node is still at this version when the |
1345 | // node is deleted as an atomic operation. |
1346 | -func (zk *ZooKeeper) Delete(path string, version int32) (err Error) { |
1347 | +func (zk *Conn) Delete(path string, version int32) (err os.Error) { |
1348 | cpath := C.CString(path) |
1349 | defer C.free(unsafe.Pointer(cpath)) |
1350 | rc, cerr := C.zoo_delete(zk.handle, cpath, C.int(version)) |
1351 | - if rc != C.ZOK { |
1352 | - return newError(rc, cerr) |
1353 | - } |
1354 | - return nil |
1355 | + return zkError(rc, cerr) |
1356 | } |
1357 | |
1358 | // AddAuth adds a new authentication certificate to the ZooKeeper |
1359 | @@ -713,7 +678,7 @@ |
1360 | // authentication information, while the cert parameter provides the |
1361 | // identity data itself. For instance, the "digest" scheme requires |
1362 | // a pair like "username:password" to be provided as the certificate. |
1363 | -func (zk *ZooKeeper) AddAuth(scheme, cert string) Error { |
1364 | +func (zk *Conn) AddAuth(scheme, cert string) os.Error { |
1365 | cscheme := C.CString(scheme) |
1366 | ccert := C.CString(cert) |
1367 | defer C.free(unsafe.Pointer(cscheme)) |
1368 | @@ -728,40 +693,36 @@ |
1369 | rc, cerr := C.zoo_add_auth(zk.handle, cscheme, ccert, C.int(len(cert)), |
1370 | C.handle_void_completion, unsafe.Pointer(data)) |
1371 | if rc != C.ZOK { |
1372 | - return newError(rc, cerr) |
1373 | + return zkError(rc, cerr) |
1374 | } |
1375 | |
1376 | C.wait_for_completion(data) |
1377 | |
1378 | rc = C.int(uintptr(data.data)) |
1379 | - if rc != C.ZOK { |
1380 | - return newError(rc, nil) |
1381 | - } |
1382 | - |
1383 | - return nil |
1384 | + return zkError(rc, nil) |
1385 | } |
1386 | |
1387 | // ACL returns the access control list for path. |
1388 | -func (zk *ZooKeeper) ACL(path string) ([]ACL, Stat, Error) { |
1389 | +func (zk *Conn) ACL(path string) ([]ACL, *Stat, os.Error) { |
1390 | |
1391 | cpath := C.CString(path) |
1392 | defer C.free(unsafe.Pointer(cpath)) |
1393 | |
1394 | caclv := C.struct_ACL_vector{} |
1395 | - cstat := C.struct_Stat{} |
1396 | |
1397 | - rc, cerr := C.zoo_get_acl(zk.handle, cpath, &caclv, &cstat) |
1398 | + var cstat Stat |
1399 | + rc, cerr := C.zoo_get_acl(zk.handle, cpath, &caclv, &cstat.c) |
1400 | if rc != C.ZOK { |
1401 | - return nil, nil, newError(rc, cerr) |
1402 | + return nil, nil, zkError(rc, cerr) |
1403 | } |
1404 | |
1405 | aclv := parseACLVector(&caclv) |
1406 | |
1407 | - return aclv, (*resultStat)(&cstat), nil |
1408 | + return aclv, &cstat, nil |
1409 | } |
1410 | |
1411 | // SetACL changes the access control list for path. |
1412 | -func (zk *ZooKeeper) SetACL(path string, aclv []ACL, version int32) Error { |
1413 | +func (zk *Conn) SetACL(path string, aclv []ACL, version int32) os.Error { |
1414 | |
1415 | cpath := C.CString(path) |
1416 | defer C.free(unsafe.Pointer(cpath)) |
1417 | @@ -770,11 +731,7 @@ |
1418 | defer C.deallocate_ACL_vector(caclv) |
1419 | |
1420 | rc, cerr := C.zoo_set_acl(zk.handle, cpath, C.int(version), caclv) |
1421 | - if rc != C.ZOK { |
1422 | - return newError(rc, cerr) |
1423 | - } |
1424 | - |
1425 | - return nil |
1426 | + return zkError(rc, cerr) |
1427 | } |
1428 | |
1429 | func parseACLVector(caclv *C.struct_ACL_vector) []ACL { |
1430 | @@ -822,7 +779,7 @@ |
1431 | // ----------------------------------------------------------------------- |
1432 | // RetryChange utility method. |
1433 | |
1434 | -type ChangeFunc func(oldValue string, oldStat Stat) (newValue string, err os.Error) |
1435 | +type ChangeFunc func(oldValue string, oldStat *Stat) (newValue string, err os.Error) |
1436 | |
1437 | // RetryChange runs changeFunc to attempt to atomically change path |
1438 | // in a lock free manner, and retries in case there was another |
1439 | @@ -831,8 +788,7 @@ |
1440 | // changeFunc must work correctly if called multiple times in case |
1441 | // the modification fails due to concurrent changes, and it may return |
1442 | // an error that will cause the RetryChange function to stop and |
1443 | -// return an Error with code ZSYSTEMERROR and the same .String() result |
1444 | -// as the provided error. |
1445 | +// return the same error. |
1446 | // |
1447 | // This mechanism is not suitable for a node that is frequently modified |
1448 | // concurrently. For those cases, consider using a pessimistic locking |
1449 | @@ -845,8 +801,7 @@ |
1450 | // |
1451 | // 2. Call the changeFunc with the current node value and stat, |
1452 | // or with an empty string and nil stat, if the node doesn't yet exist. |
1453 | -// If the changeFunc returns an error, stop and return an Error with |
1454 | -// ZSYSTEMERROR Code() and the same String() as the changeFunc error. |
1455 | +// If the changeFunc returns an error, stop and return the same error. |
1456 | // |
1457 | // 3. If the changeFunc returns no errors, use the string returned as |
1458 | // the new candidate value for the node, and attempt to either create |
1459 | @@ -855,36 +810,32 @@ |
1460 | // in the same node), repeat from step 1. If this procedure fails with any |
1461 | // other error, stop and return the error found. |
1462 | // |
1463 | -func (zk *ZooKeeper) RetryChange(path string, flags int, acl []ACL, changeFunc ChangeFunc) (err Error) { |
1464 | +func (zk *Conn) RetryChange(path string, flags int, acl []ACL, changeFunc ChangeFunc) os.Error { |
1465 | for { |
1466 | - oldValue, oldStat, getErr := zk.Get(path) |
1467 | - if getErr != nil && getErr.Code() != ZNONODE { |
1468 | - err = getErr |
1469 | - break |
1470 | - } |
1471 | - newValue, osErr := changeFunc(oldValue, oldStat) |
1472 | - if osErr != nil { |
1473 | - return newError(ZSYSTEMERROR, osErr) |
1474 | - } else if oldStat == nil { |
1475 | - _, err = zk.Create(path, newValue, flags, acl) |
1476 | - if err == nil || err.Code() != ZNODEEXISTS { |
1477 | - break |
1478 | + oldValue, oldStat, err := zk.Get(path) |
1479 | + if err != nil && err != ZNONODE { |
1480 | + return err |
1481 | + } |
1482 | + newValue, err := changeFunc(oldValue, oldStat) |
1483 | + if err != nil { |
1484 | + return err |
1485 | + } |
1486 | + if oldStat == nil { |
1487 | + _, err := zk.Create(path, newValue, flags, acl) |
1488 | + if err == nil || err != ZNODEEXISTS { |
1489 | + return err |
1490 | } |
1491 | - } else if newValue == oldValue { |
1492 | + continue |
1493 | + } |
1494 | + if newValue == oldValue { |
1495 | return nil // Nothing to do. |
1496 | - } else { |
1497 | - _, err = zk.Set(path, newValue, oldStat.Version()) |
1498 | - if err == nil { |
1499 | - break |
1500 | - } else { |
1501 | - code := err.Code() |
1502 | - if code != ZBADVERSION && code != ZNONODE { |
1503 | - break |
1504 | - } |
1505 | - } |
1506 | + } |
1507 | + _, err = zk.Set(path, newValue, oldStat.Version()) |
1508 | + if err == nil || (err != ZBADVERSION && err != ZNONODE) { |
1509 | + return err |
1510 | } |
1511 | } |
1512 | - return err |
1513 | + panic("not reached") |
1514 | } |
1515 | |
1516 | // ----------------------------------------------------------------------- |
1517 | @@ -897,7 +848,7 @@ |
1518 | // Whenever a *W method is called, it will return a channel which |
1519 | // outputs Event values. Internally, a map is used to maintain references |
1520 | // between an unique integer key (the watchId), and the event channel. The |
1521 | -// watchId is then handed to the C zookeeper library as the watch context, |
1522 | +// watchId is then handed to the C ZooKeeper library as the watch context, |
1523 | // so that we get it back when events happen. Using an integer key as the |
1524 | // watch context rather than a pointer is needed because there's no guarantee |
1525 | // that in the future the GC will not move objects around, and also because |
1526 | @@ -910,13 +861,13 @@ |
1527 | // Since Cgo doesn't allow calling back into Go, we actually fire a new |
1528 | // goroutine the very first time Init is called, and allow it to block |
1529 | // in a pthread condition variable within a C function. This condition |
1530 | -// will only be notified once a zookeeper watch callback appends new |
1531 | +// will only be notified once a ZooKeeper watch callback appends new |
1532 | // entries to the event list. When this happens, the C function returns |
1533 | // and we get back into Go land with the pointer to the watch data, |
1534 | // including the watchId and other event details such as type and path. |
1535 | |
1536 | var watchMutex sync.Mutex |
1537 | -var watchZooKeepers = make(map[uintptr]*ZooKeeper) |
1538 | +var watchConns = make(map[uintptr]*Conn) |
1539 | var watchCounter uintptr |
1540 | var watchLoopCounter int |
1541 | |
1542 | @@ -925,14 +876,14 @@ |
1543 | // mostly as a debugging and testing aid. |
1544 | func CountPendingWatches() int { |
1545 | watchMutex.Lock() |
1546 | - count := len(watchZooKeepers) |
1547 | + count := len(watchConns) |
1548 | watchMutex.Unlock() |
1549 | return count |
1550 | } |
1551 | |
1552 | // createWatch creates and registers a watch, returning the watch id |
1553 | // and channel. |
1554 | -func (zk *ZooKeeper) createWatch(session bool) (watchId uintptr, watchChannel chan Event) { |
1555 | +func (zk *Conn) createWatch(session bool) (watchId uintptr, watchChannel chan Event) { |
1556 | buf := 1 // session/watch event |
1557 | if session { |
1558 | buf = 32 |
1559 | @@ -943,7 +894,7 @@ |
1560 | watchId = watchCounter |
1561 | watchCounter += 1 |
1562 | zk.watchChannels[watchId] = watchChannel |
1563 | - watchZooKeepers[watchId] = zk |
1564 | + watchConns[watchId] = zk |
1565 | return |
1566 | } |
1567 | |
1568 | @@ -951,21 +902,21 @@ |
1569 | // from ever getting delivered. It shouldn't be used if there's any |
1570 | // chance the watch channel is still visible and not closed, since |
1571 | // it might mean a goroutine would be blocked forever. |
1572 | -func (zk *ZooKeeper) forgetWatch(watchId uintptr) { |
1573 | +func (zk *Conn) forgetWatch(watchId uintptr) { |
1574 | watchMutex.Lock() |
1575 | defer watchMutex.Unlock() |
1576 | zk.watchChannels[watchId] = nil, false |
1577 | - watchZooKeepers[watchId] = nil, false |
1578 | + watchConns[watchId] = nil, false |
1579 | } |
1580 | |
1581 | // closeAllWatches closes all watch channels for zk. |
1582 | -func (zk *ZooKeeper) closeAllWatches() { |
1583 | +func (zk *Conn) closeAllWatches() { |
1584 | watchMutex.Lock() |
1585 | defer watchMutex.Unlock() |
1586 | for watchId, ch := range zk.watchChannels { |
1587 | close(ch) |
1588 | zk.watchChannels[watchId] = nil, false |
1589 | - watchZooKeepers[watchId] = nil, false |
1590 | + watchConns[watchId] = nil, false |
1591 | } |
1592 | } |
1593 | |
1594 | @@ -978,7 +929,7 @@ |
1595 | } |
1596 | watchMutex.Lock() |
1597 | defer watchMutex.Unlock() |
1598 | - zk, ok := watchZooKeepers[watchId] |
1599 | + zk, ok := watchConns[watchId] |
1600 | if !ok { |
1601 | return |
1602 | } |
1603 | @@ -1010,7 +961,7 @@ |
1604 | } |
1605 | if watchId != zk.sessionWatchId { |
1606 | zk.watchChannels[watchId] = nil, false |
1607 | - watchZooKeepers[watchId] = nil, false |
1608 | + watchConns[watchId] = nil, false |
1609 | close(ch) |
1610 | } |
1611 | } |
1612 | |
1613 | === renamed file 'gozk_test.go' => 'zookeeper_test.go' |
1614 | --- gozk_test.go 2011-08-19 01:51:37 +0000 |
1615 | +++ zookeeper_test.go 2011-09-24 08:31:23 +0000 |
1616 | @@ -1,15 +1,15 @@ |
1617 | -package gozk_test |
1618 | +package zookeeper_test |
1619 | |
1620 | import ( |
1621 | . "launchpad.net/gocheck" |
1622 | - "gozk" |
1623 | + "launchpad.net/zookeeper" |
1624 | "time" |
1625 | ) |
1626 | |
1627 | // This error will be delivered via C errno, since ZK unfortunately |
1628 | // only provides the handler back from zookeeper_init(). |
1629 | func (s *S) TestInitErrorThroughErrno(c *C) { |
1630 | - zk, watch, err := gozk.Init("bad-domain-without-port", 5e9) |
1631 | + zk, watch, err := zookeeper.Dial("bad-domain-without-port", 5e9) |
1632 | if zk != nil { |
1633 | zk.Close() |
1634 | } |
1635 | @@ -29,7 +29,7 @@ |
1636 | } |
1637 | |
1638 | func (s *S) TestRecvTimeoutInitParameter(c *C) { |
1639 | - zk, watch, err := gozk.Init(s.zkAddr, 0) |
1640 | + zk, watch, err := zookeeper.Dial(s.zkAddr, 0) |
1641 | c.Assert(err, IsNil) |
1642 | defer zk.Close() |
1643 | |
1644 | @@ -51,38 +51,38 @@ |
1645 | } |
1646 | |
1647 | func (s *S) TestSessionWatches(c *C) { |
1648 | - c.Assert(gozk.CountPendingWatches(), Equals, 0) |
1649 | + c.Assert(zookeeper.CountPendingWatches(), Equals, 0) |
1650 | |
1651 | zk1, watch1 := s.init(c) |
1652 | zk2, watch2 := s.init(c) |
1653 | zk3, watch3 := s.init(c) |
1654 | |
1655 | - c.Assert(gozk.CountPendingWatches(), Equals, 3) |
1656 | + c.Assert(zookeeper.CountPendingWatches(), Equals, 3) |
1657 | |
1658 | event1 := <-watch1 |
1659 | - c.Assert(event1.Type, Equals, gozk.EVENT_SESSION) |
1660 | - c.Assert(event1.State, Equals, gozk.STATE_CONNECTED) |
1661 | + c.Assert(event1.Type, Equals, zookeeper.EVENT_SESSION) |
1662 | + c.Assert(event1.State, Equals, zookeeper.STATE_CONNECTED) |
1663 | |
1664 | - c.Assert(gozk.CountPendingWatches(), Equals, 3) |
1665 | + c.Assert(zookeeper.CountPendingWatches(), Equals, 3) |
1666 | |
1667 | event2 := <-watch2 |
1668 | - c.Assert(event2.Type, Equals, gozk.EVENT_SESSION) |
1669 | - c.Assert(event2.State, Equals, gozk.STATE_CONNECTED) |
1670 | + c.Assert(event2.Type, Equals, zookeeper.EVENT_SESSION) |
1671 | + c.Assert(event2.State, Equals, zookeeper.STATE_CONNECTED) |
1672 | |
1673 | - c.Assert(gozk.CountPendingWatches(), Equals, 3) |
1674 | + c.Assert(zookeeper.CountPendingWatches(), Equals, 3) |
1675 | |
1676 | event3 := <-watch3 |
1677 | - c.Assert(event3.Type, Equals, gozk.EVENT_SESSION) |
1678 | - c.Assert(event3.State, Equals, gozk.STATE_CONNECTED) |
1679 | + c.Assert(event3.Type, Equals, zookeeper.EVENT_SESSION) |
1680 | + c.Assert(event3.State, Equals, zookeeper.STATE_CONNECTED) |
1681 | |
1682 | - c.Assert(gozk.CountPendingWatches(), Equals, 3) |
1683 | + c.Assert(zookeeper.CountPendingWatches(), Equals, 3) |
1684 | |
1685 | zk1.Close() |
1686 | - c.Assert(gozk.CountPendingWatches(), Equals, 2) |
1687 | + c.Assert(zookeeper.CountPendingWatches(), Equals, 2) |
1688 | zk2.Close() |
1689 | - c.Assert(gozk.CountPendingWatches(), Equals, 1) |
1690 | + c.Assert(zookeeper.CountPendingWatches(), Equals, 1) |
1691 | zk3.Close() |
1692 | - c.Assert(gozk.CountPendingWatches(), Equals, 0) |
1693 | + c.Assert(zookeeper.CountPendingWatches(), Equals, 0) |
1694 | } |
1695 | |
1696 | // Gozk injects a STATE_CLOSED event when zk.Close() is called, right |
1697 | @@ -95,32 +95,35 @@ |
1698 | zk, watch := s.init(c) |
1699 | |
1700 | event := <-watch |
1701 | - c.Assert(event.Type, Equals, gozk.EVENT_SESSION) |
1702 | - c.Assert(event.State, Equals, gozk.STATE_CONNECTED) |
1703 | + c.Assert(event.Type, Equals, zookeeper.EVENT_SESSION) |
1704 | + c.Assert(event.State, Equals, zookeeper.STATE_CONNECTED) |
1705 | |
1706 | zk.Close() |
1707 | event, ok := <-watch |
1708 | c.Assert(ok, Equals, false) |
1709 | - c.Assert(event.Type, Equals, gozk.EVENT_CLOSED) |
1710 | - c.Assert(event.State, Equals, gozk.STATE_CLOSED) |
1711 | + c.Assert(event.Type, Equals, zookeeper.EVENT_CLOSED) |
1712 | + c.Assert(event.State, Equals, zookeeper.STATE_CLOSED) |
1713 | } |
1714 | |
1715 | func (s *S) TestEventString(c *C) { |
1716 | - var event gozk.Event |
1717 | - event = gozk.Event{gozk.EVENT_SESSION, "/path", gozk.STATE_CONNECTED} |
1718 | + var event zookeeper.Event |
1719 | + event = zookeeper.Event{zookeeper.EVENT_SESSION, "/path", zookeeper.STATE_CONNECTED} |
1720 | c.Assert(event, Matches, "ZooKeeper connected") |
1721 | - event = gozk.Event{gozk.EVENT_CREATED, "/path", gozk.STATE_CONNECTED} |
1722 | + event = zookeeper.Event{zookeeper.EVENT_CREATED, "/path", zookeeper.STATE_CONNECTED} |
1723 | c.Assert(event, Matches, "ZooKeeper connected; path created: /path") |
1724 | - event = gozk.Event{-1, "/path", gozk.STATE_CLOSED} |
1725 | + event = zookeeper.Event{-1, "/path", zookeeper.STATE_CLOSED} |
1726 | c.Assert(event, Matches, "ZooKeeper connection closed") |
1727 | } |
1728 | |
1729 | -var okTests = []struct{gozk.Event; Ok bool}{ |
1730 | - {gozk.Event{gozk.EVENT_SESSION, "", gozk.STATE_CONNECTED}, true}, |
1731 | - {gozk.Event{gozk.EVENT_CREATED, "", gozk.STATE_CONNECTED}, true}, |
1732 | - {gozk.Event{0, "", gozk.STATE_CLOSED}, false}, |
1733 | - {gozk.Event{0, "", gozk.STATE_EXPIRED_SESSION}, false}, |
1734 | - {gozk.Event{0, "", gozk.STATE_AUTH_FAILED}, false}, |
1735 | +var okTests = []struct { |
1736 | + zookeeper.Event |
1737 | + Ok bool |
1738 | +}{ |
1739 | + {zookeeper.Event{zookeeper.EVENT_SESSION, "", zookeeper.STATE_CONNECTED}, true}, |
1740 | + {zookeeper.Event{zookeeper.EVENT_CREATED, "", zookeeper.STATE_CONNECTED}, true}, |
1741 | + {zookeeper.Event{0, "", zookeeper.STATE_CLOSED}, false}, |
1742 | + {zookeeper.Event{0, "", zookeeper.STATE_EXPIRED_SESSION}, false}, |
1743 | + {zookeeper.Event{0, "", zookeeper.STATE_AUTH_FAILED}, false}, |
1744 | } |
1745 | |
1746 | func (s *S) TestEventOk(c *C) { |
1747 | @@ -156,18 +159,18 @@ |
1748 | c.Assert(data, Equals, "") |
1749 | c.Assert(stat, IsNil) |
1750 | c.Assert(err, Matches, "no node") |
1751 | - c.Assert(err.Code(), Equals, gozk.ZNONODE) |
1752 | + c.Assert(err, Equals, zookeeper.ZNONODE) |
1753 | } |
1754 | |
1755 | func (s *S) TestCreateAndGet(c *C) { |
1756 | zk, _ := s.init(c) |
1757 | |
1758 | - path, err := zk.Create("/test-", "bababum", gozk.SEQUENCE|gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
1759 | + path, err := zk.Create("/test-", "bababum", zookeeper.SEQUENCE|zookeeper.EPHEMERAL, zookeeper.WorldACL(zookeeper.PERM_ALL)) |
1760 | c.Assert(err, IsNil) |
1761 | c.Assert(path, Matches, "/test-[0-9]+") |
1762 | |
1763 | // Check the error condition from Create(). |
1764 | - _, err = zk.Create(path, "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
1765 | + _, err = zk.Create(path, "", zookeeper.EPHEMERAL, zookeeper.WorldACL(zookeeper.PERM_ALL)) |
1766 | c.Assert(err, Matches, "node exists") |
1767 | |
1768 | data, _, err := zk.Get(path) |
1769 | @@ -178,7 +181,7 @@ |
1770 | func (s *S) TestCreateSetAndGet(c *C) { |
1771 | zk, _ := s.init(c) |
1772 | |
1773 | - _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
1774 | + _, err := zk.Create("/test", "", zookeeper.EPHEMERAL, zookeeper.WorldACL(zookeeper.PERM_ALL)) |
1775 | c.Assert(err, IsNil) |
1776 | |
1777 | stat, err := zk.Set("/test", "bababum", -1) // Any version. |
1778 | @@ -191,13 +194,13 @@ |
1779 | } |
1780 | |
1781 | func (s *S) TestGetAndWatch(c *C) { |
1782 | - c.Check(gozk.CountPendingWatches(), Equals, 0) |
1783 | + c.Check(zookeeper.CountPendingWatches(), Equals, 0) |
1784 | |
1785 | zk, _ := s.init(c) |
1786 | |
1787 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
1788 | + c.Check(zookeeper.CountPendingWatches(), Equals, 1) |
1789 | |
1790 | - _, err := zk.Create("/test", "one", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
1791 | + _, err := zk.Create("/test", "one", zookeeper.EPHEMERAL, zookeeper.WorldACL(zookeeper.PERM_ALL)) |
1792 | c.Assert(err, IsNil) |
1793 | |
1794 | data, stat, watch, err := zk.GetW("/test") |
1795 | @@ -211,15 +214,15 @@ |
1796 | default: |
1797 | } |
1798 | |
1799 | - c.Check(gozk.CountPendingWatches(), Equals, 2) |
1800 | + c.Check(zookeeper.CountPendingWatches(), Equals, 2) |
1801 | |
1802 | _, err = zk.Set("/test", "two", -1) |
1803 | c.Assert(err, IsNil) |
1804 | |
1805 | event := <-watch |
1806 | - c.Assert(event.Type, Equals, gozk.EVENT_CHANGED) |
1807 | + c.Assert(event.Type, Equals, zookeeper.EVENT_CHANGED) |
1808 | |
1809 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
1810 | + c.Check(zookeeper.CountPendingWatches(), Equals, 1) |
1811 | |
1812 | data, _, watch, err = zk.GetW("/test") |
1813 | c.Assert(err, IsNil) |
1814 | @@ -231,50 +234,50 @@ |
1815 | default: |
1816 | } |
1817 | |
1818 | - c.Check(gozk.CountPendingWatches(), Equals, 2) |
1819 | + c.Check(zookeeper.CountPendingWatches(), Equals, 2) |
1820 | |
1821 | _, err = zk.Set("/test", "three", -1) |
1822 | c.Assert(err, IsNil) |
1823 | |
1824 | event = <-watch |
1825 | - c.Assert(event.Type, Equals, gozk.EVENT_CHANGED) |
1826 | + c.Assert(event.Type, Equals, zookeeper.EVENT_CHANGED) |
1827 | |
1828 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
1829 | + c.Check(zookeeper.CountPendingWatches(), Equals, 1) |
1830 | } |
1831 | |
1832 | func (s *S) TestGetAndWatchWithError(c *C) { |
1833 | - c.Check(gozk.CountPendingWatches(), Equals, 0) |
1834 | + c.Check(zookeeper.CountPendingWatches(), Equals, 0) |
1835 | |
1836 | zk, _ := s.init(c) |
1837 | |
1838 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
1839 | + c.Check(zookeeper.CountPendingWatches(), Equals, 1) |
1840 | |
1841 | _, _, watch, err := zk.GetW("/test") |
1842 | c.Assert(err, NotNil) |
1843 | - c.Assert(err.Code(), Equals, gozk.ZNONODE) |
1844 | + c.Assert(err, Equals, zookeeper.ZNONODE) |
1845 | c.Assert(watch, IsNil) |
1846 | |
1847 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
1848 | + c.Check(zookeeper.CountPendingWatches(), Equals, 1) |
1849 | } |
1850 | |
1851 | func (s *S) TestCloseReleasesWatches(c *C) { |
1852 | - c.Check(gozk.CountPendingWatches(), Equals, 0) |
1853 | + c.Check(zookeeper.CountPendingWatches(), Equals, 0) |
1854 | |
1855 | zk, _ := s.init(c) |
1856 | |
1857 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
1858 | + c.Check(zookeeper.CountPendingWatches(), Equals, 1) |
1859 | |
1860 | - _, err := zk.Create("/test", "one", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
1861 | + _, err := zk.Create("/test", "one", zookeeper.EPHEMERAL, zookeeper.WorldACL(zookeeper.PERM_ALL)) |
1862 | c.Assert(err, IsNil) |
1863 | |
1864 | _, _, _, err = zk.GetW("/test") |
1865 | c.Assert(err, IsNil) |
1866 | |
1867 | - c.Assert(gozk.CountPendingWatches(), Equals, 2) |
1868 | + c.Assert(zookeeper.CountPendingWatches(), Equals, 2) |
1869 | |
1870 | zk.Close() |
1871 | |
1872 | - c.Assert(gozk.CountPendingWatches(), Equals, 0) |
1873 | + c.Assert(zookeeper.CountPendingWatches(), Equals, 0) |
1874 | } |
1875 | |
1876 | // By default, the ZooKeeper C client will hang indefinitely if a |
1877 | @@ -285,7 +288,7 @@ |
1878 | c.Assert(err, IsNil) |
1879 | err = zk.Close() |
1880 | c.Assert(err, NotNil) |
1881 | - c.Assert(err.Code(), Equals, gozk.ZCLOSING) |
1882 | + c.Assert(err, Equals, zookeeper.ZCLOSING) |
1883 | } |
1884 | |
1885 | func (s *S) TestChildren(c *C) { |
1886 | @@ -298,17 +301,17 @@ |
1887 | |
1888 | children, stat, err = zk.Children("/non-existent") |
1889 | c.Assert(err, NotNil) |
1890 | - c.Assert(err.Code(), Equals, gozk.ZNONODE) |
1891 | + c.Assert(err, Equals, zookeeper.ZNONODE) |
1892 | c.Assert(children, Equals, []string{}) |
1893 | - c.Assert(stat, Equals, nil) |
1894 | + c.Assert(stat, IsNil) |
1895 | } |
1896 | |
1897 | func (s *S) TestChildrenAndWatch(c *C) { |
1898 | - c.Check(gozk.CountPendingWatches(), Equals, 0) |
1899 | + c.Check(zookeeper.CountPendingWatches(), Equals, 0) |
1900 | |
1901 | zk, _ := s.init(c) |
1902 | |
1903 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
1904 | + c.Check(zookeeper.CountPendingWatches(), Equals, 1) |
1905 | |
1906 | children, stat, watch, err := zk.ChildrenW("/") |
1907 | c.Assert(err, IsNil) |
1908 | @@ -321,16 +324,16 @@ |
1909 | default: |
1910 | } |
1911 | |
1912 | - c.Check(gozk.CountPendingWatches(), Equals, 2) |
1913 | + c.Check(zookeeper.CountPendingWatches(), Equals, 2) |
1914 | |
1915 | - _, err = zk.Create("/test1", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
1916 | + _, err = zk.Create("/test1", "", zookeeper.EPHEMERAL, zookeeper.WorldACL(zookeeper.PERM_ALL)) |
1917 | c.Assert(err, IsNil) |
1918 | |
1919 | event := <-watch |
1920 | - c.Assert(event.Type, Equals, gozk.EVENT_CHILD) |
1921 | + c.Assert(event.Type, Equals, zookeeper.EVENT_CHILD) |
1922 | c.Assert(event.Path, Equals, "/") |
1923 | |
1924 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
1925 | + c.Check(zookeeper.CountPendingWatches(), Equals, 1) |
1926 | |
1927 | children, stat, watch, err = zk.ChildrenW("/") |
1928 | c.Assert(err, IsNil) |
1929 | @@ -345,57 +348,56 @@ |
1930 | default: |
1931 | } |
1932 | |
1933 | - c.Check(gozk.CountPendingWatches(), Equals, 2) |
1934 | + c.Check(zookeeper.CountPendingWatches(), Equals, 2) |
1935 | |
1936 | - _, err = zk.Create("/test2", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
1937 | + _, err = zk.Create("/test2", "", zookeeper.EPHEMERAL, zookeeper.WorldACL(zookeeper.PERM_ALL)) |
1938 | c.Assert(err, IsNil) |
1939 | |
1940 | event = <-watch |
1941 | - c.Assert(event.Type, Equals, gozk.EVENT_CHILD) |
1942 | + c.Assert(event.Type, Equals, zookeeper.EVENT_CHILD) |
1943 | |
1944 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
1945 | + c.Check(zookeeper.CountPendingWatches(), Equals, 1) |
1946 | } |
1947 | |
1948 | func (s *S) TestChildrenAndWatchWithError(c *C) { |
1949 | - c.Check(gozk.CountPendingWatches(), Equals, 0) |
1950 | + c.Check(zookeeper.CountPendingWatches(), Equals, 0) |
1951 | |
1952 | zk, _ := s.init(c) |
1953 | |
1954 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
1955 | + c.Check(zookeeper.CountPendingWatches(), Equals, 1) |
1956 | |
1957 | _, stat, watch, err := zk.ChildrenW("/test") |
1958 | c.Assert(err, NotNil) |
1959 | - c.Assert(err.Code(), Equals, gozk.ZNONODE) |
1960 | + c.Assert(err, Equals, zookeeper.ZNONODE) |
1961 | c.Assert(watch, IsNil) |
1962 | c.Assert(stat, IsNil) |
1963 | |
1964 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
1965 | + c.Check(zookeeper.CountPendingWatches(), Equals, 1) |
1966 | } |
1967 | |
1968 | func (s *S) TestExists(c *C) { |
1969 | zk, _ := s.init(c) |
1970 | |
1971 | - stat, err := zk.Exists("/zookeeper") |
1972 | - c.Assert(err, IsNil) |
1973 | - c.Assert(stat.NumChildren(), Equals, int32(1)) |
1974 | - |
1975 | - stat, err = zk.Exists("/non-existent") |
1976 | + stat, err := zk.Exists("/non-existent") |
1977 | c.Assert(err, IsNil) |
1978 | c.Assert(stat, IsNil) |
1979 | + |
1980 | + stat, err = zk.Exists("/zookeeper") |
1981 | + c.Assert(err, IsNil) |
1982 | } |
1983 | |
1984 | func (s *S) TestExistsAndWatch(c *C) { |
1985 | - c.Check(gozk.CountPendingWatches(), Equals, 0) |
1986 | + c.Check(zookeeper.CountPendingWatches(), Equals, 0) |
1987 | |
1988 | zk, _ := s.init(c) |
1989 | |
1990 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
1991 | + c.Check(zookeeper.CountPendingWatches(), Equals, 1) |
1992 | |
1993 | stat, watch, err := zk.ExistsW("/test") |
1994 | c.Assert(err, IsNil) |
1995 | c.Assert(stat, IsNil) |
1996 | |
1997 | - c.Check(gozk.CountPendingWatches(), Equals, 2) |
1998 | + c.Check(zookeeper.CountPendingWatches(), Equals, 2) |
1999 | |
2000 | select { |
2001 | case <-watch: |
2002 | @@ -403,62 +405,62 @@ |
2003 | default: |
2004 | } |
2005 | |
2006 | - _, err = zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
2007 | + _, err = zk.Create("/test", "", zookeeper.EPHEMERAL, zookeeper.WorldACL(zookeeper.PERM_ALL)) |
2008 | c.Assert(err, IsNil) |
2009 | |
2010 | event := <-watch |
2011 | - c.Assert(event.Type, Equals, gozk.EVENT_CREATED) |
2012 | + c.Assert(event.Type, Equals, zookeeper.EVENT_CREATED) |
2013 | c.Assert(event.Path, Equals, "/test") |
2014 | |
2015 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2016 | + c.Check(zookeeper.CountPendingWatches(), Equals, 1) |
2017 | |
2018 | stat, watch, err = zk.ExistsW("/test") |
2019 | c.Assert(err, IsNil) |
2020 | c.Assert(stat, NotNil) |
2021 | c.Assert(stat.NumChildren(), Equals, int32(0)) |
2022 | |
2023 | - c.Check(gozk.CountPendingWatches(), Equals, 2) |
2024 | + c.Check(zookeeper.CountPendingWatches(), Equals, 2) |
2025 | } |
2026 | |
2027 | func (s *S) TestExistsAndWatchWithError(c *C) { |
2028 | - c.Check(gozk.CountPendingWatches(), Equals, 0) |
2029 | + c.Check(zookeeper.CountPendingWatches(), Equals, 0) |
2030 | |
2031 | zk, _ := s.init(c) |
2032 | |
2033 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2034 | + c.Check(zookeeper.CountPendingWatches(), Equals, 1) |
2035 | |
2036 | stat, watch, err := zk.ExistsW("///") |
2037 | c.Assert(err, NotNil) |
2038 | - c.Assert(err.Code(), Equals, gozk.ZBADARGUMENTS) |
2039 | + c.Assert(err, Equals, zookeeper.ZBADARGUMENTS) |
2040 | c.Assert(stat, IsNil) |
2041 | c.Assert(watch, IsNil) |
2042 | |
2043 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2044 | + c.Check(zookeeper.CountPendingWatches(), Equals, 1) |
2045 | } |
2046 | |
2047 | func (s *S) TestDelete(c *C) { |
2048 | zk, _ := s.init(c) |
2049 | |
2050 | - _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
2051 | + _, err := zk.Create("/test", "", zookeeper.EPHEMERAL, zookeeper.WorldACL(zookeeper.PERM_ALL)) |
2052 | c.Assert(err, IsNil) |
2053 | |
2054 | err = zk.Delete("/test", 5) |
2055 | c.Assert(err, NotNil) |
2056 | - c.Assert(err.Code(), Equals, gozk.ZBADVERSION) |
2057 | + c.Assert(err, Equals, zookeeper.ZBADVERSION) |
2058 | |
2059 | err = zk.Delete("/test", -1) |
2060 | c.Assert(err, IsNil) |
2061 | |
2062 | err = zk.Delete("/test", -1) |
2063 | c.Assert(err, NotNil) |
2064 | - c.Assert(err.Code(), Equals, gozk.ZNONODE) |
2065 | + c.Assert(err, Equals, zookeeper.ZNONODE) |
2066 | } |
2067 | |
2068 | func (s *S) TestClientIdAndReInit(c *C) { |
2069 | zk1, _ := s.init(c) |
2070 | clientId1 := zk1.ClientId() |
2071 | |
2072 | - zk2, _, err := gozk.ReInit(s.zkAddr, 5e9, clientId1) |
2073 | + zk2, _, err := zookeeper.Redial(s.zkAddr, 5e9, clientId1) |
2074 | c.Assert(err, IsNil) |
2075 | defer zk2.Close() |
2076 | clientId2 := zk2.ClientId() |
2077 | @@ -471,7 +473,7 @@ |
2078 | func (s *S) TestExistsWatchOnDataChange(c *C) { |
2079 | zk, _ := s.init(c) |
2080 | |
2081 | - _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
2082 | + _, err := zk.Create("/test", "", zookeeper.EPHEMERAL, zookeeper.WorldACL(zookeeper.PERM_ALL)) |
2083 | c.Assert(err, IsNil) |
2084 | |
2085 | _, watch, err := zk.ExistsW("/test") |
2086 | @@ -483,24 +485,24 @@ |
2087 | event := <-watch |
2088 | |
2089 | c.Assert(event.Path, Equals, "/test") |
2090 | - c.Assert(event.Type, Equals, gozk.EVENT_CHANGED) |
2091 | + c.Assert(event.Type, Equals, zookeeper.EVENT_CHANGED) |
2092 | } |
2093 | |
2094 | func (s *S) TestACL(c *C) { |
2095 | zk, _ := s.init(c) |
2096 | |
2097 | - _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
2098 | + _, err := zk.Create("/test", "", zookeeper.EPHEMERAL, zookeeper.WorldACL(zookeeper.PERM_ALL)) |
2099 | c.Assert(err, IsNil) |
2100 | |
2101 | acl, stat, err := zk.ACL("/test") |
2102 | c.Assert(err, IsNil) |
2103 | - c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_ALL)) |
2104 | + c.Assert(acl, Equals, zookeeper.WorldACL(zookeeper.PERM_ALL)) |
2105 | c.Assert(stat, NotNil) |
2106 | c.Assert(stat.Version(), Equals, int32(0)) |
2107 | |
2108 | acl, stat, err = zk.ACL("/non-existent") |
2109 | c.Assert(err, NotNil) |
2110 | - c.Assert(err.Code(), Equals, gozk.ZNONODE) |
2111 | + c.Assert(err, Equals, zookeeper.ZNONODE) |
2112 | c.Assert(acl, IsNil) |
2113 | c.Assert(stat, IsNil) |
2114 | } |
2115 | @@ -508,32 +510,32 @@ |
2116 | func (s *S) TestSetACL(c *C) { |
2117 | zk, _ := s.init(c) |
2118 | |
2119 | - _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
2120 | + _, err := zk.Create("/test", "", zookeeper.EPHEMERAL, zookeeper.WorldACL(zookeeper.PERM_ALL)) |
2121 | c.Assert(err, IsNil) |
2122 | |
2123 | - err = zk.SetACL("/test", gozk.WorldACL(gozk.PERM_ALL), 5) |
2124 | + err = zk.SetACL("/test", zookeeper.WorldACL(zookeeper.PERM_ALL), 5) |
2125 | c.Assert(err, NotNil) |
2126 | - c.Assert(err.Code(), Equals, gozk.ZBADVERSION) |
2127 | + c.Assert(err, Equals, zookeeper.ZBADVERSION) |
2128 | |
2129 | - err = zk.SetACL("/test", gozk.WorldACL(gozk.PERM_READ), -1) |
2130 | + err = zk.SetACL("/test", zookeeper.WorldACL(zookeeper.PERM_READ), -1) |
2131 | c.Assert(err, IsNil) |
2132 | |
2133 | acl, _, err := zk.ACL("/test") |
2134 | c.Assert(err, IsNil) |
2135 | - c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_READ)) |
2136 | + c.Assert(acl, Equals, zookeeper.WorldACL(zookeeper.PERM_READ)) |
2137 | } |
2138 | |
2139 | func (s *S) TestAddAuth(c *C) { |
2140 | zk, _ := s.init(c) |
2141 | |
2142 | - acl := []gozk.ACL{{gozk.PERM_READ, "digest", "joe:enQcM3mIEHQx7IrPNStYBc0qfs8="}} |
2143 | + acl := []zookeeper.ACL{{zookeeper.PERM_READ, "digest", "joe:enQcM3mIEHQx7IrPNStYBc0qfs8="}} |
2144 | |
2145 | - _, err := zk.Create("/test", "", gozk.EPHEMERAL, acl) |
2146 | + _, err := zk.Create("/test", "", zookeeper.EPHEMERAL, acl) |
2147 | c.Assert(err, IsNil) |
2148 | |
2149 | _, _, err = zk.Get("/test") |
2150 | c.Assert(err, NotNil) |
2151 | - c.Assert(err.Code(), Equals, gozk.ZNOAUTH) |
2152 | + c.Assert(err, Equals, zookeeper.ZNOAUTH) |
2153 | |
2154 | err = zk.AddAuth("digest", "joe:passwd") |
2155 | c.Assert(err, IsNil) |
2156 | @@ -543,36 +545,36 @@ |
2157 | } |
2158 | |
2159 | func (s *S) TestWatchOnReconnection(c *C) { |
2160 | - c.Check(gozk.CountPendingWatches(), Equals, 0) |
2161 | + c.Check(zookeeper.CountPendingWatches(), Equals, 0) |
2162 | |
2163 | zk, session := s.init(c) |
2164 | |
2165 | event := <-session |
2166 | - c.Assert(event.Type, Equals, gozk.EVENT_SESSION) |
2167 | - c.Assert(event.State, Equals, gozk.STATE_CONNECTED) |
2168 | + c.Assert(event.Type, Equals, zookeeper.EVENT_SESSION) |
2169 | + c.Assert(event.State, Equals, zookeeper.STATE_CONNECTED) |
2170 | |
2171 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2172 | + c.Check(zookeeper.CountPendingWatches(), Equals, 1) |
2173 | |
2174 | stat, watch, err := zk.ExistsW("/test") |
2175 | c.Assert(err, IsNil) |
2176 | c.Assert(stat, IsNil) |
2177 | |
2178 | - c.Check(gozk.CountPendingWatches(), Equals, 2) |
2179 | + c.Check(zookeeper.CountPendingWatches(), Equals, 2) |
2180 | |
2181 | s.StopZK() |
2182 | time.Sleep(2e9) |
2183 | - s.StartZK() |
2184 | + s.StartZK(c) |
2185 | |
2186 | // The session channel should receive the reconnection notification, |
2187 | select { |
2188 | case event := <-session: |
2189 | - c.Assert(event.State, Equals, gozk.STATE_CONNECTING) |
2190 | + c.Assert(event.State, Equals, zookeeper.STATE_CONNECTING) |
2191 | case <-time.After(3e9): |
2192 | c.Fatal("Session watch didn't fire") |
2193 | } |
2194 | select { |
2195 | case event := <-session: |
2196 | - c.Assert(event.State, Equals, gozk.STATE_CONNECTED) |
2197 | + c.Assert(event.State, Equals, zookeeper.STATE_CONNECTED) |
2198 | case <-time.After(3e9): |
2199 | c.Fatal("Session watch didn't fire") |
2200 | } |
2201 | @@ -585,40 +587,40 @@ |
2202 | } |
2203 | |
2204 | // And it should still work. |
2205 | - _, err = zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
2206 | + _, err = zk.Create("/test", "", zookeeper.EPHEMERAL, zookeeper.WorldACL(zookeeper.PERM_ALL)) |
2207 | c.Assert(err, IsNil) |
2208 | |
2209 | event = <-watch |
2210 | - c.Assert(event.Type, Equals, gozk.EVENT_CREATED) |
2211 | + c.Assert(event.Type, Equals, zookeeper.EVENT_CREATED) |
2212 | c.Assert(event.Path, Equals, "/test") |
2213 | |
2214 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2215 | + c.Check(zookeeper.CountPendingWatches(), Equals, 1) |
2216 | } |
2217 | |
2218 | func (s *S) TestWatchOnSessionExpiration(c *C) { |
2219 | - c.Check(gozk.CountPendingWatches(), Equals, 0) |
2220 | + c.Check(zookeeper.CountPendingWatches(), Equals, 0) |
2221 | |
2222 | zk, session := s.init(c) |
2223 | |
2224 | event := <-session |
2225 | - c.Assert(event.Type, Equals, gozk.EVENT_SESSION) |
2226 | - c.Assert(event.State, Equals, gozk.STATE_CONNECTED) |
2227 | + c.Assert(event.Type, Equals, zookeeper.EVENT_SESSION) |
2228 | + c.Assert(event.State, Equals, zookeeper.STATE_CONNECTED) |
2229 | |
2230 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2231 | + c.Check(zookeeper.CountPendingWatches(), Equals, 1) |
2232 | |
2233 | stat, watch, err := zk.ExistsW("/test") |
2234 | c.Assert(err, IsNil) |
2235 | c.Assert(stat, IsNil) |
2236 | |
2237 | - c.Check(gozk.CountPendingWatches(), Equals, 2) |
2238 | + c.Check(zookeeper.CountPendingWatches(), Equals, 2) |
2239 | |
2240 | // Use expiration trick described in the FAQ. |
2241 | clientId := zk.ClientId() |
2242 | - zk2, session2, err := gozk.ReInit(s.zkAddr, 5e9, clientId) |
2243 | + zk2, session2, err := zookeeper.Redial(s.zkAddr, 5e9, clientId) |
2244 | |
2245 | for event := range session2 { |
2246 | c.Log("Event from overlapping session: ", event) |
2247 | - if event.State == gozk.STATE_CONNECTED { |
2248 | + if event.State == zookeeper.STATE_CONNECTED { |
2249 | // Wait for zk to process the connection. |
2250 | // Not reliable without this. :-( |
2251 | time.Sleep(1e9) |
2252 | @@ -627,21 +629,21 @@ |
2253 | } |
2254 | for event := range session { |
2255 | c.Log("Event from primary session: ", event) |
2256 | - if event.State == gozk.STATE_EXPIRED_SESSION { |
2257 | + if event.State == zookeeper.STATE_EXPIRED_SESSION { |
2258 | break |
2259 | } |
2260 | } |
2261 | |
2262 | select { |
2263 | case event := <-watch: |
2264 | - c.Assert(event.State, Equals, gozk.STATE_EXPIRED_SESSION) |
2265 | + c.Assert(event.State, Equals, zookeeper.STATE_EXPIRED_SESSION) |
2266 | case <-time.After(3e9): |
2267 | c.Fatal("Watch event didn't fire") |
2268 | } |
2269 | |
2270 | event = <-watch |
2271 | - c.Assert(event.Type, Equals, gozk.EVENT_CLOSED) |
2272 | - c.Assert(event.State, Equals, gozk.STATE_CLOSED) |
2273 | + c.Assert(event.Type, Equals, zookeeper.EVENT_CLOSED) |
2274 | + c.Assert(event.State, Equals, zookeeper.STATE_CLOSED) |
2275 | |
2276 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2277 | + c.Check(zookeeper.CountPendingWatches(), Equals, 1) |
2278 | } |
This is looking fantastic. Thanks Roger.
Some details to sort out:
[1]
=== renamed file 'gozk.go' => 'zookeeper/gozk.go'
Please keep the files at the root of the branch. We'll put it in a
new series in launchpad so that the gozk/zookeeper reference is
correct.
[2]
+// Conn represents a connection to a set of Zookeeper nodes.
The project name is "ZooKeeper". Please use this casing consistently
across the code base.
[3]
+// ClientId represents an established ZooKeeper session. It can be
+// passed into New to reestablish a connection to an existing session.
There's no New function.
[4]
+ ZOK Error = C.ZOK
+ ZSYSTEMERROR Error = C.ZSYSTEMERROR
+ ZRUNTIMEINCONSISTENCY Error = C.ZRUNTIMEINCONSISTENCY
Much better!
[5]
+func (zk *Conn) RetryChange(path string, flags int, acl []ACL, changeFunc ChangeFunc) os.Error {
Very nice changes in RetryChange too, thanks!
[6]
+ path, err := zk.Create("/test-", "bababum", zookeeper.SEQUENCE|zookeeper.EPHEMERAL, zookeeper.WorldACL(zookeeper.PERM_ALL))
These lines make me unhappy about the length of "zookeeper". I'm thinking we
might well rename it to "zk" in a follow up branch, but please keep it like
this for now. Just for comparison:
path, err := conn.Create("/test-", "bababum", zk.SEQUENCE|zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL))
[7]
+ c.Assert(stat, Equals, (*zookeeper.Stat)(nil))
c.Assert(stat, IsNil)
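For context on [7], here is a minimal sketch of why a nil pointer and a bare nil do not compare equal once the pointer is stored in an interface, which is presumably why the cast was written in the first place. It uses today's Go rather than the toolchain this branch targets; Stat, equalsNil and isNil are stand-ins invented for the illustration and are not part of gozk or gocheck.

```go
package main

import (
	"fmt"
	"reflect"
)

// Stat is a stand-in for zookeeper.Stat; only its pointer-ness matters here.
type Stat struct{}

// equalsNil mimics a naive equality check against an untyped nil.
func equalsNil(v interface{}) bool {
	return v == nil
}

// isNil reports whether v is nil or holds a nil pointer, map, slice,
// channel or func.
func isNil(v interface{}) bool {
	if v == nil {
		return true
	}
	rv := reflect.ValueOf(v)
	switch rv.Kind() {
	case reflect.Chan, reflect.Func, reflect.Map, reflect.Ptr, reflect.Slice:
		return rv.IsNil()
	}
	return false
}

func main() {
	var stat *Stat               // nil pointer, as returned alongside an error
	fmt.Println(equalsNil(stat)) // false: the interface still carries the *Stat type
	fmt.Println(isNil(stat))     // true
}
```

A checker that inspects the value this way lets the test say what it means without spelling out the pointer type.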
[8]
+// Server sets up a zookeeper server environment inside dataDir
ZooKeeper in this file as well.
[9]
+// for a server that listening on the specified TCP port, sending
s/listening/listens/
[10]
+// Server does not actually start the server. Instead it returns
+// a command line, suitable for passing to exec.Command,
+// for example.
Let's do a bit better than this. There's no reason to keep the
responsibility of creating the directory, starting, stopping,
cleaning, etc, with the caller. We need this for real, not just
due to testing, but in juju for the local deployment case.
I suggest we define a Server type as:
type Server struct {
runDir, installDir string
cmd *exec.Cmd
}
and:
func zookeeper.NewServer(runDir) *Server
Then, we should have this as well:
func (s *Server) Create(installDir string) os.Error
func (s *Server) Start() os.Error
func (s *Server) Stop() os.Error
func (s *Server) Destroy() os.Error
Please push this in a separate merge proposal. I'm happy to
merge the current version as is for the moment.
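The lifecycle proposed in [10] could be sketched roughly as follows. This is only an illustration under stated assumptions: it is written in today's Go (error rather than os.Error), the argv field is a hypothetical stand-in for however the real java command line would be built, and none of the method bodies come from the branch.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"os/exec"
	"path/filepath"
)

// Server follows the shape suggested in the review. The argv field is an
// assumption made for this sketch; it is not part of the proposal.
type Server struct {
	runDir, installDir string
	argv               []string // hypothetical: command line to launch
	cmd                *exec.Cmd
}

func NewServer(runDir string) *Server { return &Server{runDir: runDir} }

// Create makes the run directory and records where ZooKeeper is installed.
func (s *Server) Create(installDir string) error {
	if err := os.MkdirAll(s.runDir, 0755); err != nil {
		return err
	}
	s.installDir = installDir
	return nil
}

// Start launches the configured command with runDir as working directory.
func (s *Server) Start() error {
	if s.cmd != nil {
		return errors.New("server already running")
	}
	if len(s.argv) == 0 {
		return errors.New("no command configured")
	}
	cmd := exec.Command(s.argv[0], s.argv[1:]...)
	cmd.Dir = s.runDir
	if err := cmd.Start(); err != nil {
		return err
	}
	s.cmd = cmd
	return nil
}

// Stop kills the process and waits for it to exit.
func (s *Server) Stop() error {
	if s.cmd == nil {
		return errors.New("server not running")
	}
	err := s.cmd.Process.Kill()
	s.cmd.Wait() // reap the process; its exit error after a kill is expected
	s.cmd = nil
	return err
}

// Destroy stops the server if it is running and removes runDir.
func (s *Server) Destroy() error {
	if s.cmd != nil {
		if err := s.Stop(); err != nil {
			return err
		}
	}
	return os.RemoveAll(s.runDir)
}

// demo exercises Create and Destroy against a temporary directory.
func demo() error {
	tmp, err := os.MkdirTemp("", "zk-server-sketch")
	if err != nil {
		return err
	}
	defer os.RemoveAll(tmp)

	s := NewServer(filepath.Join(tmp, "run"))
	if err := s.Create("/hypothetical/install/dir"); err != nil {
		return err
	}
	if _, err := os.Stat(s.runDir); err != nil {
		return err
	}
	if err := s.Destroy(); err != nil {
		return err
	}
	if _, err := os.Stat(s.runDir); !os.IsNotExist(err) {
		return fmt.Errorf("run dir still present: %v", err)
	}
	return nil
}

func main() {
	if err := demo(); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("create/destroy ok")
}
```

Keeping directory creation and cleanup inside the type means callers such as the test suite, or juju's local deployment case, only deal with a run directory and four calls.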
[11]
+var log4jProperties = []byte(`
No need to have two copies of that file in memory at all times. Use a const
and []byte() it when necessary.
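As a sketch of what [11] asks for, the properties can live in a const string and be converted at the point of use. The file contents below are a placeholder, not the real log4j configuration from the branch, and writeLog4jConfig and demo are hypothetical helpers written in today's Go.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// log4jProperties is a const string, so no separate []byte copy stays in
// memory for the life of the program. The contents are a placeholder.
const log4jProperties = `
log4j.rootLogger=INFO, CONSOLE
`

// writeLog4jConfig converts the constant to bytes only when writing.
func writeLog4jConfig(dir string) (string, error) {
	path := filepath.Join(dir, "log4j.properties")
	if err := os.WriteFile(path, []byte(log4jProperties), 0666); err != nil {
		return "", err
	}
	return path, nil
}

// demo writes the file into a temporary directory and reads it back.
func demo() error {
	dir, err := os.MkdirTemp("", "log4j-sketch")
	if err != nil {
		return err
	}
	defer os.RemoveAll(dir)

	path, err := writeLog4jConfig(dir)
	if err != nil {
		return err
	}
	data, err := os.ReadFile(path)
	if err != nil {
		return err
	}
	if string(data) != log4jProperties {
		return fmt.Errorf("unexpected contents in %s", path)
	}
	return nil
}

func main() {
	if err := demo(); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("log4j.properties written")
}
```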
[12]
+ classPath, _ := filepath.Glob(filepath.Join(dir, "zookeeper-*.jar"))
+ more, _ := filepath.Glob(filepath.Join(dir, "lib/*.jar"))
Please check the errors.
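A minimal sketch of the error handling requested in [12], in today's Go. The classPath function name is invented for the example. As documented, the only error filepath.Glob returns is ErrBadPattern, which can still occur here if dir itself contains pattern metacharacters such as an unclosed "[".

```go
package main

import (
	"fmt"
	"path/filepath"
)

// classPath collects the jars under dir and returns any Glob error
// instead of discarding it.
func classPath(dir string) ([]string, error) {
	jars, err := filepath.Glob(filepath.Join(dir, "zookeeper-*.jar"))
	if err != nil {
		return nil, err
	}
	more, err := filepath.Glob(filepath.Join(dir, "lib/*.jar"))
	if err != nil {
		return nil, err
	}
	return append(jars, more...), nil
}

func main() {
	// A directory name with an unclosed bracket makes the pattern malformed.
	if _, err := classPath("["); err != nil {
		fmt.Println("error:", err)
	}
	jars, err := classPath("/nonexistent-dir-for-demo")
	fmt.Println(len(jars), err)
}
```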
[13]
+ fd, err := os.Open(zookeeperEnviron)
That's a file, or an f, not an fd.
[14]
+ line, err := r.ReadSlice('\n')
ReadLine()
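For [14], a short sketch of reading a file line by line with bufio's ReadLine, written against today's Go. ReadLine drops the line terminator itself and reports isPrefix when a line does not fit in the reader's buffer, so the loop reassembles long lines. readLines and linesOf are names made up for the example.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// readLines returns each line of r without its trailing "\n" or "\r\n".
func readLines(r io.Reader) ([]string, error) {
	br := bufio.NewReader(r)
	var lines []string
	var cur []byte
	for {
		chunk, isPrefix, err := br.ReadLine()
		if err == io.EOF {
			return lines, nil
		}
		if err != nil {
			return nil, err
		}
		// Copy the chunk: ReadLine's buffer is only valid until the next read.
		cur = append(cur, chunk...)
		if !isPrefix {
			lines = append(lines, string(cur))
			cur = cur[:0]
		}
	}
}

// linesOf is a convenience wrapper for string input.
func linesOf(s string) ([]string, error) {
	return readLines(strings.NewReader(s))
}

func main() {
	lines, err := linesOf("a=1\r\nb=2\nlast")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, line := range lines {
		fmt.Printf("%q\n", line)
	}
}
```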
[15]
+ // strip quotes
+ if path[0] == '"' {
+ path = path[1 : len(path)-1]
+ }
Please use Trim. This logic assumes it...
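A small sketch of the Trim approach from [15], using the strings package in today's Go. The unquote helper and the sample path are made up for the example. Unlike path[1 : len(path)-1], it cannot panic on an empty string or on a value that consists of a single quote.

```go
package main

import (
	"fmt"
	"strings"
)

// unquote removes any leading and trailing double quotes from path.
func unquote(path string) string {
	return strings.Trim(path, `"`)
}

func main() {
	fmt.Println(unquote(`"/usr/share/java"`)) // /usr/share/java
	fmt.Println(unquote(`/usr/share/java`))   // /usr/share/java
	fmt.Printf("%q\n", unquote(`"`))          // ""
}
```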