Merge lp:~rogpeppe/gozk/update-event-interface into lp:~juju/gozk/trunk
Status: | Work in progress |
---|---|
Proposed branch: | lp:~rogpeppe/gozk/update-event-interface |
Merge into: | lp:~juju/gozk/trunk |
Diff against target: |
3594 lines (+1527/-1001) 12 files modified
Makefile (+6/-2) example/example.go (+22/-23) helpers.c (+2/-2) reattach_test.go (+202/-0) retry_test.go (+65/-74) server.go (+239/-0) service/Makefile (+20/-0) service/service.go (+160/-0) status.go (+116/-0) suite_test.go (+56/-138) zk.go (+381/-489) zk_test.go (+258/-273) |
To merge this branch: | bzr merge lp:~rogpeppe/gozk/update-event-interface |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Gustavo Niemeyer | Disapprove | ||
Review via email: mp+77560@code.launchpad.net |
Commit message
Description of the change
Updated event interface and factored-out server-starting interface.
Event interface
-----------
The various functions returning event channels are changed
to return channels that more accurately reflect the kinds
of notifications that can happen on those channels.
This makes programming with the wait channels less error-prone
(no need to worry about the wrong kind of event arriving
on a wait channel) and more self-documenting.
The Event type is removed:
- the Type field is now redundant, as only one kind of event can arrive through any given channel.
- the Path field was always redundant, as it is always the same as the node that is being watched.
- the value of the State field is now sent to the session watcher.
When the server goes down, watch channels are simply closed.
I can't think of any scenario when a node watcher will do something
differently depending on how the server has failed, and if it wants
to, it can always interrogate whatever is watching the session notifications.
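A minimal sketch of the behaviour described above, written in present-day Go rather than the 2011 dialect used by this branch. The helper names watchNode and describe are hypothetical and simulate a watch instead of talking to ZooKeeper; the point is only that a watcher tells "node changed" from "server gone" by whether the channel was closed.

```go
package main

import "fmt"

// watchNode is a hypothetical stand-in for a node watch. It returns a
// channel that delivers one notification when the node changes, or is
// closed without delivering anything when the server goes down.
func watchNode(serverDown bool) <-chan bool {
	ch := make(chan bool, 1)
	if !serverDown {
		ch <- true
	}
	close(ch)
	return ch
}

// describe reports what a watcher would conclude from the channel.
func describe(watch <-chan bool) string {
	if _, ok := <-watch; !ok {
		// No event type or path to inspect: the watch is simply over.
		return "watch closed: consult the session watcher"
	}
	return "node changed"
}

func main() {
	fmt.Println(describe(watchNode(false)))
	fmt.Println(describe(watchNode(true)))
}
```

Because only one kind of event can arrive on the channel, the receiver needs no switch on an event type.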
Likely points of controversy:
- the spelling of the new constants. I chose mixed case rather than
all-caps - YMMV.
- i'd like to think of a better spelling for event_CREATED etc,
now that those constants no longer need exporting.
- use of explicit zero rather than having a zero constant defined
for each type. I *think* that it's less confusing and simpler this
way than having NodeClosed (NodeDown), ChildClosed etc.
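To illustrate the "explicit zero" point, here is a small sketch in present-day Go. The type NodeEvent and its constants are hypothetical names chosen for the example, not the identifiers used in the branch. It relies only on the language rule that a receive from a closed channel yields the zero value of the element type.

```go
package main

import "fmt"

// NodeEvent is a hypothetical event type. Real event values start at 1,
// so the zero value never denotes a real event.
type NodeEvent int

const (
	NodeCreated NodeEvent = iota + 1
	NodeDeleted
	NodeChanged
)

// closedWatch returns a watch channel that has already been closed,
// as would happen when the server goes down.
func closedWatch() <-chan NodeEvent {
	ch := make(chan NodeEvent)
	close(ch)
	return ch
}

func main() {
	ev := <-closedWatch()
	// A receive from a closed channel yields the zero value, so an
	// explicit comparison with 0 detects closure without needing a
	// named "closed" constant for each event type.
	if ev == 0 {
		fmt.Println("watch closed")
	}
}
```

The trade-off is that readers must know the zero value is reserved; a named constant per type would say so explicitly at the cost of more identifiers.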
Can a session ever go down permanently?
Server-starting interface
-------------------
I realised that server.go has increased significantly in size and
that all the new code was logically independent of ZooKeeper itself.
This inspired me to factor out the code into a new package (currently
gozk/zk/service), which will potentially enable similar server packages
at little cost. There are a number of other enhancements that can now
be made to this package without increasing the zk package's complexity.
I think the new packaging works well, but it's arguable whether zookeeper
clients should use the service package directly or whether zk should
return *service.Service itself.
One specific advantage of the former (in addition to reusability and
nicer modularity) is that zookeeper clients can query or set aspects
of the zk.Server; the disadvantage is that it adds a line of
code and requires clients to know about another package.
YMMV as always.
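A rough sketch of how such a factoring could look, in present-day Go. The interface name Interface, the Service fields, and fakeServer are assumptions made for this illustration; only the method names Directory, CheckAvailability, Command, New, Start and Stop appear in the diff, and the real service package may differ. The sketch records state instead of launching a process.

```go
package main

import (
	"errors"
	"fmt"
)

// Interface is a hypothetical description of what a generic service
// package might need from a server implementation.
type Interface interface {
	Directory() string
	CheckAvailability() error
	Command() ([]string, error)
}

// Service wraps any Interface implementation. This sketch only tracks
// whether the service is running; it does not execute the command.
type Service struct {
	srv     Interface
	running bool
}

// New returns a Service for the given server.
func New(srv Interface) *Service { return &Service{srv: srv} }

// Start checks availability and obtains the command line.
func (s *Service) Start() error {
	if s.running {
		return errors.New("service already running")
	}
	if err := s.srv.CheckAvailability(); err != nil {
		return err
	}
	if _, err := s.srv.Command(); err != nil {
		return err
	}
	s.running = true
	return nil
}

// Stop marks the service as stopped.
func (s *Service) Stop() error {
	s.running = false
	return nil
}

// fakeServer is a stand-in for *zk.Server.
type fakeServer struct{ dir string }

func (f fakeServer) Directory() string          { return f.dir }
func (f fakeServer) CheckAvailability() error   { return nil }
func (f fakeServer) Command() ([]string, error) { return []string{"true"}, nil }

func main() {
	svc := New(fakeServer{dir: "/tmp/zk"})
	fmt.Println(svc.Start())
	fmt.Println(svc.Start())
	fmt.Println(svc.Stop())
}
```

With this shape, the extra line a client pays for using the service package directly is the call to New.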
- 21. By Roger Peppe: Rewrite Event comment.
- 22. By Roger Peppe: merged ChildStatus into NodeStatus.
- 23. By Roger Peppe: gofmt
- 24. By Roger Peppe: Fixed comments. Added LOG_NONE.
Roger Peppe (rogpeppe) wrote:
On 29 September 2011 18:56, Gustavo Niemeyer <email address hidden> wrote:
> Review: Disapprove
>
>
> 2692 - event := Event{
> 2693 - Type: int(data.
> 2694 - Path: C.GoString(
> 2695 - State: int(data.
> 2696 - }
>
> The key problem with this proposal is the trashing of information coming
> from the server. It's discarding not only why e.g. a state is being
> trashed, but also details like the path which is related to an event.
> This path isn't just being returned from the path that called the watch.
> It's actually part of the wire protocol, and comes from the server side.
>
> Trashing both the session state error information and the paths like that
> is not a good idea.
there is no information in the paths. they are always the same as the path
of the request. if this ever changes, then i'd fully support adding the path,
but as it is Event.Path gives a misleading impression (for instance that Path
might name a child that has been added).
we are trashing no information from the server. if we are then we
should make sure that we panic immediately because the server is doing
something unexpected. [i've just added the requisite check].
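A minimal sketch of the kind of check mentioned here, in present-day Go. The helper name checkEventPath is hypothetical and the actual check added to the branch is not shown in this part of the diff; the sketch only illustrates panicking when the path reported with an event differs from the path that was watched.

```go
package main

import "fmt"

// checkEventPath is a hypothetical helper: it panics if the path that
// arrives with an event is not the path the watch was set on.
func checkEventPath(watched, reported string) {
	if watched != reported {
		panic(fmt.Sprintf("unexpected event path %q for watch on %q", reported, watched))
	}
}

func main() {
	checkEventPath("/counter", "/counter")
	fmt.Println("paths agree")
}
```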
the exact specification, that is, what callbacks may be made for a given
API call, is unclear because the explanation seems to be omitted from
the ZooKeeper manual itself. i think we owe it to gozk's users to provide
a better defined interface, as you will need to know the details of
the interface to use the API, and so people will rely on the undocumented
behaviour anyway.
PTAL. the latest version has only two kinds of events - node changed events
and session status events. i really think this interface makes sense.
please reconsider.
Gustavo Niemeyer (niemeyer) wrote:
Let's have a single thread on this topic, please.
As I answered to your email:
>> The point isn't C or Go.. the issue is simply that there's real
>> information coming through the pipe being trashed in the suggested
>> interface.
>
> there really isn't.
The protocol provides three details to the watch. Two of them are
being dropped on the floor in your branch and not reaching the event
listener. If you disagree with this we won't be able to agree on
anything else around that change.
Unmerged revisions
- 24. By Roger Peppe: Fixed comments. Added LOG_NONE.
- 23. By Roger Peppe: gofmt
- 22. By Roger Peppe: merged ChildStatus into NodeStatus.
- 21. By Roger Peppe: Rewrite Event comment.
- 20. By Roger Peppe: merged changes to rename branch.
Preview Diff
1 | === modified file 'Makefile' |
2 | --- Makefile 2011-08-03 01:56:58 +0000 |
3 | +++ Makefile 2011-09-30 13:35:26 +0000 |
4 | @@ -2,10 +2,14 @@ |
5 | |
6 | all: package |
7 | |
8 | -TARG=gozk |
9 | +TARG=launchpad.net/gozk/zk |
10 | |
11 | +GOFILES=\ |
12 | + server.go\ |
13 | + status.go\ |
14 | + |
15 | CGOFILES=\ |
16 | - gozk.go\ |
17 | + zk.go\ |
18 | |
19 | CGO_OFILES=\ |
20 | helpers.o\ |
21 | |
22 | === added directory 'example' |
23 | === renamed file 'example.go' => 'example/example.go' |
24 | --- example.go 2011-08-03 01:47:25 +0000 |
25 | +++ example/example.go 2011-09-30 13:35:26 +0000 |
26 | @@ -1,30 +1,29 @@ |
27 | package main |
28 | |
29 | import ( |
30 | - "gozk" |
31 | + "launchpad.net/gozk/zk" |
32 | ) |
33 | |
34 | func main() { |
35 | - zk, session, err := gozk.Init("localhost:2181", 5000) |
36 | - if err != nil { |
37 | - println("Couldn't connect: " + err.String()) |
38 | - return |
39 | - } |
40 | - |
41 | - defer zk.Close() |
42 | - |
43 | - // Wait for connection. |
44 | - event := <-session |
45 | - if event.State != gozk.STATE_CONNECTED { |
46 | - println("Couldn't connect") |
47 | - return |
48 | - } |
49 | - |
50 | - _, err = zk.Create("/counter", "0", 0, gozk.WorldACL(gozk.PERM_ALL)) |
51 | - if err != nil { |
52 | - println(err.String()) |
53 | - } else { |
54 | - println("Created!") |
55 | - } |
56 | + conn, session, err := zk.Dial("localhost:2181", 5e9) |
57 | + if err != nil { |
58 | + println("Couldn't connect: " + err.String()) |
59 | + return |
60 | + } |
61 | + |
62 | + defer conn.Close() |
63 | + |
64 | + // Wait for connection. |
65 | + event := <-session |
66 | + if event != zk.SessionConnected { |
67 | + println("Couldn't connect") |
68 | + return |
69 | + } |
70 | + |
71 | + _, err = conn.Create("/counter", "0", 0, zk.WorldACL(zk.PERM_ALL)) |
72 | + if err != nil { |
73 | + println(err.String()) |
74 | + } else { |
75 | + println("Created!") |
76 | + } |
77 | } |
78 | - |
79 | |
80 | === modified file 'helpers.c' |
81 | --- helpers.c 2011-01-11 12:58:26 +0000 |
82 | +++ helpers.c 2011-09-30 13:35:26 +0000 |
83 | @@ -4,7 +4,6 @@ |
84 | #include <string.h> |
85 | #include "helpers.h" |
86 | |
87 | - |
88 | static pthread_mutex_t watch_mutex = PTHREAD_MUTEX_INITIALIZER; |
89 | static pthread_cond_t watch_available = PTHREAD_COND_INITIALIZER; |
90 | |
91 | @@ -38,8 +37,9 @@ |
92 | pthread_mutex_lock(&watch_mutex); |
93 | { |
94 | watch_data *data = malloc(sizeof(watch_data)); // XXX Check data. |
95 | + |
96 | + data->event_type = event_type; |
97 | data->connection_state = connection_state; |
98 | - data->event_type = event_type; |
99 | data->event_path = strdup(event_path); // XXX Check event_path. |
100 | data->watch_context = watch_context; |
101 | data->next = NULL; |
102 | |
103 | === added file 'reattach_test.go' |
104 | --- reattach_test.go 1970-01-01 00:00:00 +0000 |
105 | +++ reattach_test.go 2011-09-30 13:35:26 +0000 |
106 | @@ -0,0 +1,202 @@ |
107 | +package zk_test |
108 | + |
109 | +import ( |
110 | + "bufio" |
111 | + . "launchpad.net/gocheck" |
112 | + "launchpad.net/gozk/zk/service" |
113 | + "launchpad.net/gozk/zk" |
114 | + "exec" |
115 | + "flag" |
116 | + "fmt" |
117 | + "os" |
118 | + "strings" |
119 | + "testing" |
120 | + "time" |
121 | +) |
122 | + |
123 | +var reattach = flag.Bool("zktest.reattach", false, "internal flag used for testing") |
124 | +var reattachRunDir = flag.String("zktest.rundir", "", "internal flag used for testing") |
125 | +var reattachAbnormalStop = flag.Bool("zktest.stop", false, "internal flag used for testing") |
126 | + |
127 | +// This is the reentrancy point for testing ZooKeeper servers |
128 | +// started by processes that are not direct children of the |
129 | +// testing process. This test always succeeds - the status |
130 | +// will be written to stdout and read by indirectServer. |
131 | +func TestStartNonChildServer(t *testing.T) { |
132 | + if !*reattach { |
133 | + // not re-entrant, so ignore this test. |
134 | + return |
135 | + } |
136 | + err := startServer(*reattachRunDir, *reattachAbnormalStop) |
137 | + if err != nil { |
138 | + fmt.Printf("error:%v\n", err) |
139 | + return |
140 | + } |
141 | + fmt.Printf("done\n") |
142 | +} |
143 | + |
144 | +func (s *S) startServer(c *C, abort bool) { |
145 | + err := startServer(s.zkTestRoot, abort) |
146 | + c.Assert(err, IsNil) |
147 | +} |
148 | + |
149 | +// startServerIndirect starts a ZooKeeper server that is not |
150 | +// a direct child of the current process. If abort is true, |
151 | +// the server will be started and then terminated abnormally. |
152 | +func (s *S) startServerIndirect(c *C, abort bool) { |
153 | + if len(os.Args) == 0 { |
154 | + c.Fatal("Cannot find self executable name") |
155 | + } |
156 | + cmd := exec.Command( |
157 | + os.Args[0], |
158 | + "-zktest.reattach", |
159 | + "-zktest.rundir", s.zkTestRoot, |
160 | + "-zktest.stop=" + fmt.Sprint(abort), |
161 | + "-test.run", "StartNonChildServer", |
162 | + ) |
163 | + r, err := cmd.StdoutPipe() |
164 | + c.Assert(err, IsNil) |
165 | + defer r.Close() |
166 | + if err := cmd.Start(); err != nil { |
167 | + c.Fatalf("cannot start re-entrant gotest process: %v", err) |
168 | + } |
169 | + defer cmd.Wait() |
170 | + bio := bufio.NewReader(r) |
171 | + for { |
172 | + line, err := bio.ReadSlice('\n') |
173 | + if err != nil { |
174 | + c.Fatalf("indirect server status line not found: %v", err) |
175 | + } |
176 | + s := string(line) |
177 | + if strings.HasPrefix(s, "error:") { |
178 | + c.Fatalf("indirect server error: %s", s[len("error:"):]) |
179 | + } |
180 | + if s == "done\n" { |
181 | + return |
182 | + } |
183 | + } |
184 | + panic("not reached") |
185 | +} |
186 | + |
187 | +// startServer starts a ZooKeeper server, and terminates it abnormally |
188 | +// if abort is true. |
189 | +func startServer(runDir string, abort bool) os.Error { |
190 | + s, err := zk.AttachServer(runDir) |
191 | + if err != nil { |
192 | + return fmt.Errorf("cannot attach to server at %q: %v", runDir, err) |
193 | + } |
194 | + srv := service.New(s) |
195 | + if err := srv.Start(); err != nil { |
196 | + return fmt.Errorf("cannot start server: %v", err) |
197 | + } |
198 | + if abort { |
199 | + // Give it time to start up, then kill the server process abnormally, |
200 | + // leaving the pid.txt file behind. |
201 | + time.Sleep(0.5e9) |
202 | + p, err := srv.Process() |
203 | + if err != nil { |
204 | + return fmt.Errorf("cannot get server process: %v", err) |
205 | + } |
206 | + defer p.Release() |
207 | + if err := p.Kill(); err != nil { |
208 | + return fmt.Errorf("cannot kill server process: %v", err) |
209 | + } |
210 | + } |
211 | + return nil |
212 | +} |
213 | + |
214 | +func (s *S) checkCookie(c *C) { |
215 | + conn, watch, err := zk.Dial(s.zkAddr, 5e9) |
216 | + c.Assert(err, IsNil) |
217 | + |
218 | + e, ok := <-watch |
219 | + c.Assert(ok, Equals, true) |
220 | + c.Assert(e.Ok(), Equals, true) |
221 | + |
222 | + c.Assert(err, IsNil) |
223 | + cookie, _, err := conn.Get("/testAttachCookie") |
224 | + c.Assert(err, IsNil) |
225 | + c.Assert(cookie, Equals, "testAttachCookie") |
226 | + conn.Close() |
227 | +} |
228 | + |
229 | +// cases to test: |
230 | +// child server, stopped normally; reattach, start |
231 | +// non-direct child server, killed abnormally; reattach, start (->error), remove pid.txt; start |
232 | +// non-direct child server, still running; reattach, start (->error), stop, start |
233 | +// child server, still running; reattach, start (-> error) |
234 | +// child server, still running; reattach, stop, start. |
235 | +// non-direct child server, still running; reattach, stop, start. |
236 | +func (s *S) TestAttachServer(c *C) { |
237 | + // Create a cookie so that we know we are reattaching to the same instance. |
238 | + conn, _ := s.init(c) |
239 | + _, err := conn.Create("/testAttachCookie", "testAttachCookie", 0, zk.WorldACL(zk.PERM_ALL)) |
240 | + c.Assert(err, IsNil) |
241 | + s.checkCookie(c) |
242 | + s.zkServer.Stop() |
243 | + s.zkServer = nil |
244 | + |
245 | + s.testAttachServer(c, (*S).startServer) |
246 | + s.testAttachServer(c, (*S).startServerIndirect) |
247 | + s.testAttachServerAbnormalTerminate(c, (*S).startServer) |
248 | + s.testAttachServerAbnormalTerminate(c, (*S).startServerIndirect) |
249 | + |
250 | + srv, err := zk.AttachServer(s.zkTestRoot) |
251 | + c.Assert(err, IsNil) |
252 | + |
253 | + s.zkServer = service.New(srv) |
254 | + err = s.zkServer.Start() |
255 | + c.Assert(err, IsNil) |
256 | + |
257 | + conn, _ = s.init(c) |
258 | + err = conn.Delete("/testAttachCookie", -1) |
259 | + c.Assert(err, IsNil) |
260 | +} |
261 | + |
262 | +func (s *S) testAttachServer(c *C, start func(*S, *C, bool)) { |
263 | + start(s, c, false) |
264 | + |
265 | + s.checkCookie(c) |
266 | + |
267 | + // try attaching to it while it is still running - it should fail. |
268 | + a, err := zk.AttachServer(s.zkTestRoot) |
269 | + c.Assert(err, IsNil) |
270 | + srv := service.New(a) |
271 | + |
272 | + err = srv.Start() |
273 | + c.Assert(err, NotNil) |
274 | + |
275 | + // stop it and then start it again - it should succeed. |
276 | + err = srv.Stop() |
277 | + c.Assert(err, IsNil) |
278 | + |
279 | + err = srv.Start() |
280 | + c.Assert(err, IsNil) |
281 | + |
282 | + s.checkCookie(c) |
283 | + |
284 | + err = srv.Stop() |
285 | + c.Assert(err, IsNil) |
286 | +} |
287 | + |
288 | +func (s *S) testAttachServerAbnormalTerminate(c *C, start func(*S, *C, bool)) { |
289 | + start(s, c, true) |
290 | + |
291 | + // try attaching to it and starting - it should fail, because pid.txt |
292 | + // won't have been removed. |
293 | + a, err := zk.AttachServer(s.zkTestRoot) |
294 | + c.Assert(err, IsNil) |
295 | + srv := service.New(a) |
296 | + err = srv.Start() |
297 | + c.Assert(err, NotNil) |
298 | + |
299 | + // stopping it should bring things back to normal. |
300 | + err = srv.Stop() |
301 | + c.Assert(err, IsNil) |
302 | + err = srv.Start() |
303 | + c.Assert(err, IsNil) |
304 | + |
305 | + s.checkCookie(c) |
306 | + err = srv.Stop() |
307 | + c.Assert(err, IsNil) |
308 | +} |
309 | |
310 | === modified file 'retry_test.go' |
311 | --- retry_test.go 2011-08-19 01:51:37 +0000 |
312 | +++ retry_test.go 2011-09-30 13:35:26 +0000 |
313 | @@ -1,42 +1,41 @@ |
314 | -package gozk_test |
315 | +package zk_test |
316 | |
317 | import ( |
318 | . "launchpad.net/gocheck" |
319 | - "gozk" |
320 | + "launchpad.net/gozk/zk" |
321 | "os" |
322 | ) |
323 | |
324 | func (s *S) TestRetryChangeCreating(c *C) { |
325 | - zk, _ := s.init(c) |
326 | + conn, _ := s.init(c) |
327 | |
328 | - err := zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL), |
329 | - func(data string, stat gozk.Stat) (string, os.Error) { |
330 | + err := conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL), |
331 | + func(data string, stat *zk.Stat) (string, os.Error) { |
332 | c.Assert(data, Equals, "") |
333 | c.Assert(stat, IsNil) |
334 | return "new", nil |
335 | }) |
336 | c.Assert(err, IsNil) |
337 | |
338 | - data, stat, err := zk.Get("/test") |
339 | + data, stat, err := conn.Get("/test") |
340 | c.Assert(err, IsNil) |
341 | c.Assert(stat, NotNil) |
342 | c.Assert(stat.Version(), Equals, int32(0)) |
343 | c.Assert(data, Equals, "new") |
344 | |
345 | - acl, _, err := zk.ACL("/test") |
346 | + acl, _, err := conn.ACL("/test") |
347 | c.Assert(err, IsNil) |
348 | - c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_ALL)) |
349 | + c.Assert(acl, Equals, zk.WorldACL(zk.PERM_ALL)) |
350 | } |
351 | |
352 | func (s *S) TestRetryChangeSetting(c *C) { |
353 | - zk, _ := s.init(c) |
354 | + conn, _ := s.init(c) |
355 | |
356 | - _, err := zk.Create("/test", "old", gozk.EPHEMERAL, |
357 | - gozk.WorldACL(gozk.PERM_ALL)) |
358 | + _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
359 | c.Assert(err, IsNil) |
360 | |
361 | - err = zk.RetryChange("/test", gozk.EPHEMERAL, []gozk.ACL{}, |
362 | - func(data string, stat gozk.Stat) (string, os.Error) { |
363 | + err = conn.RetryChange("/test", zk.EPHEMERAL, []zk.ACL{}, |
364 | + func(data string, stat *zk.Stat) (string, os.Error) { |
365 | c.Assert(data, Equals, "old") |
366 | c.Assert(stat, NotNil) |
367 | c.Assert(stat.Version(), Equals, int32(0)) |
368 | @@ -44,27 +43,26 @@ |
369 | }) |
370 | c.Assert(err, IsNil) |
371 | |
372 | - data, stat, err := zk.Get("/test") |
373 | + data, stat, err := conn.Get("/test") |
374 | c.Assert(err, IsNil) |
375 | c.Assert(stat, NotNil) |
376 | c.Assert(stat.Version(), Equals, int32(1)) |
377 | c.Assert(data, Equals, "brand new") |
378 | |
379 | // ACL was unchanged by RetryChange(). |
380 | - acl, _, err := zk.ACL("/test") |
381 | + acl, _, err := conn.ACL("/test") |
382 | c.Assert(err, IsNil) |
383 | - c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_ALL)) |
384 | + c.Assert(acl, Equals, zk.WorldACL(zk.PERM_ALL)) |
385 | } |
386 | |
387 | func (s *S) TestRetryChangeUnchangedValueDoesNothing(c *C) { |
388 | - zk, _ := s.init(c) |
389 | + conn, _ := s.init(c) |
390 | |
391 | - _, err := zk.Create("/test", "old", gozk.EPHEMERAL, |
392 | - gozk.WorldACL(gozk.PERM_ALL)) |
393 | + _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
394 | c.Assert(err, IsNil) |
395 | |
396 | - err = zk.RetryChange("/test", gozk.EPHEMERAL, []gozk.ACL{}, |
397 | - func(data string, stat gozk.Stat) (string, os.Error) { |
398 | + err = conn.RetryChange("/test", zk.EPHEMERAL, []zk.ACL{}, |
399 | + func(data string, stat *zk.Stat) (string, os.Error) { |
400 | c.Assert(data, Equals, "old") |
401 | c.Assert(stat, NotNil) |
402 | c.Assert(stat.Version(), Equals, int32(0)) |
403 | @@ -72,7 +70,7 @@ |
404 | }) |
405 | c.Assert(err, IsNil) |
406 | |
407 | - data, stat, err := zk.Get("/test") |
408 | + data, stat, err := conn.Get("/test") |
409 | c.Assert(err, IsNil) |
410 | c.Assert(stat, NotNil) |
411 | c.Assert(stat.Version(), Equals, int32(0)) // Unchanged! |
412 | @@ -80,14 +78,14 @@ |
413 | } |
414 | |
415 | func (s *S) TestRetryChangeConflictOnCreate(c *C) { |
416 | - zk, _ := s.init(c) |
417 | + conn, _ := s.init(c) |
418 | |
419 | - changeFunc := func(data string, stat gozk.Stat) (string, os.Error) { |
420 | + changeFunc := func(data string, stat *zk.Stat) (string, os.Error) { |
421 | switch data { |
422 | case "": |
423 | c.Assert(stat, IsNil) |
424 | - _, err := zk.Create("/test", "conflict", gozk.EPHEMERAL, |
425 | - gozk.WorldACL(gozk.PERM_ALL)) |
426 | + _, err := conn.Create("/test", "conflict", zk.EPHEMERAL, |
427 | + zk.WorldACL(zk.PERM_ALL)) |
428 | c.Assert(err, IsNil) |
429 | return "<none> => conflict", nil |
430 | case "conflict": |
431 | @@ -100,11 +98,10 @@ |
432 | return "can't happen", nil |
433 | } |
434 | |
435 | - err := zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL), |
436 | - changeFunc) |
437 | + err := conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL), changeFunc) |
438 | c.Assert(err, IsNil) |
439 | |
440 | - data, stat, err := zk.Get("/test") |
441 | + data, stat, err := conn.Get("/test") |
442 | c.Assert(err, IsNil) |
443 | c.Assert(data, Equals, "conflict => new") |
444 | c.Assert(stat, NotNil) |
445 | @@ -112,18 +109,17 @@ |
446 | } |
447 | |
448 | func (s *S) TestRetryChangeConflictOnSetDueToChange(c *C) { |
449 | - zk, _ := s.init(c) |
450 | + conn, _ := s.init(c) |
451 | |
452 | - _, err := zk.Create("/test", "old", gozk.EPHEMERAL, |
453 | - gozk.WorldACL(gozk.PERM_ALL)) |
454 | + _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
455 | c.Assert(err, IsNil) |
456 | |
457 | - changeFunc := func(data string, stat gozk.Stat) (string, os.Error) { |
458 | + changeFunc := func(data string, stat *zk.Stat) (string, os.Error) { |
459 | switch data { |
460 | case "old": |
461 | c.Assert(stat, NotNil) |
462 | c.Assert(stat.Version(), Equals, int32(0)) |
463 | - _, err := zk.Set("/test", "conflict", 0) |
464 | + _, err := conn.Set("/test", "conflict", 0) |
465 | c.Assert(err, IsNil) |
466 | return "old => new", nil |
467 | case "conflict": |
468 | @@ -136,10 +132,10 @@ |
469 | return "can't happen", nil |
470 | } |
471 | |
472 | - err = zk.RetryChange("/test", gozk.EPHEMERAL, []gozk.ACL{}, changeFunc) |
473 | + err = conn.RetryChange("/test", zk.EPHEMERAL, []zk.ACL{}, changeFunc) |
474 | c.Assert(err, IsNil) |
475 | |
476 | - data, stat, err := zk.Get("/test") |
477 | + data, stat, err := conn.Get("/test") |
478 | c.Assert(err, IsNil) |
479 | c.Assert(data, Equals, "conflict => new") |
480 | c.Assert(stat, NotNil) |
481 | @@ -147,18 +143,17 @@ |
482 | } |
483 | |
484 | func (s *S) TestRetryChangeConflictOnSetDueToDelete(c *C) { |
485 | - zk, _ := s.init(c) |
486 | + conn, _ := s.init(c) |
487 | |
488 | - _, err := zk.Create("/test", "old", gozk.EPHEMERAL, |
489 | - gozk.WorldACL(gozk.PERM_ALL)) |
490 | + _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
491 | c.Assert(err, IsNil) |
492 | |
493 | - changeFunc := func(data string, stat gozk.Stat) (string, os.Error) { |
494 | + changeFunc := func(data string, stat *zk.Stat) (string, os.Error) { |
495 | switch data { |
496 | case "old": |
497 | c.Assert(stat, NotNil) |
498 | c.Assert(stat.Version(), Equals, int32(0)) |
499 | - err := zk.Delete("/test", 0) |
500 | + err := conn.Delete("/test", 0) |
501 | c.Assert(err, IsNil) |
502 | return "old => <deleted>", nil |
503 | case "": |
504 | @@ -170,55 +165,53 @@ |
505 | return "can't happen", nil |
506 | } |
507 | |
508 | - err = zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_READ), |
509 | - changeFunc) |
510 | + err = conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_READ), changeFunc) |
511 | c.Assert(err, IsNil) |
512 | |
513 | - data, stat, err := zk.Get("/test") |
514 | + data, stat, err := conn.Get("/test") |
515 | c.Assert(err, IsNil) |
516 | c.Assert(data, Equals, "<deleted> => new") |
517 | c.Assert(stat, NotNil) |
518 | c.Assert(stat.Version(), Equals, int32(0)) |
519 | |
520 | // Should be the new ACL. |
521 | - acl, _, err := zk.ACL("/test") |
522 | + acl, _, err := conn.ACL("/test") |
523 | c.Assert(err, IsNil) |
524 | - c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_READ)) |
525 | + c.Assert(acl, Equals, zk.WorldACL(zk.PERM_READ)) |
526 | } |
527 | |
528 | func (s *S) TestRetryChangeErrorInCallback(c *C) { |
529 | - zk, _ := s.init(c) |
530 | + conn, _ := s.init(c) |
531 | |
532 | - err := zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL), |
533 | - func(data string, stat gozk.Stat) (string, os.Error) { |
534 | + err := conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL), |
535 | + func(data string, stat *zk.Stat) (string, os.Error) { |
536 | return "don't use this", os.NewError("BOOM!") |
537 | }) |
538 | c.Assert(err, NotNil) |
539 | - c.Assert(err.Code(), Equals, gozk.ZSYSTEMERROR) |
540 | c.Assert(err.String(), Equals, "BOOM!") |
541 | |
542 | - stat, err := zk.Exists("/test") |
543 | + stat, err := conn.Exists("/test") |
544 | c.Assert(err, IsNil) |
545 | c.Assert(stat, IsNil) |
546 | } |
547 | |
548 | func (s *S) TestRetryChangeFailsReading(c *C) { |
549 | - zk, _ := s.init(c) |
550 | + conn, _ := s.init(c) |
551 | |
552 | - _, err := zk.Create("/test", "old", gozk.EPHEMERAL, |
553 | - gozk.WorldACL(gozk.PERM_WRITE)) // Write only! |
554 | + // Write only! |
555 | + _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_WRITE)) |
556 | c.Assert(err, IsNil) |
557 | |
558 | var called bool |
559 | - err = zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL), |
560 | - func(data string, stat gozk.Stat) (string, os.Error) { |
561 | + err = conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL), |
562 | + func(data string, stat *zk.Stat) (string, os.Error) { |
563 | called = true |
564 | return "", nil |
565 | }) |
566 | c.Assert(err, NotNil) |
567 | - c.Assert(err.Code(), Equals, gozk.ZNOAUTH) |
568 | + c.Assert(err, Equals, zk.ZNOAUTH) |
569 | |
570 | - stat, err := zk.Exists("/test") |
571 | + stat, err := conn.Exists("/test") |
572 | c.Assert(err, IsNil) |
573 | c.Assert(stat, NotNil) |
574 | c.Assert(stat.Version(), Equals, int32(0)) |
575 | @@ -227,22 +220,21 @@ |
576 | } |
577 | |
578 | func (s *S) TestRetryChangeFailsSetting(c *C) { |
579 | - zk, _ := s.init(c) |
580 | + conn, _ := s.init(c) |
581 | |
582 | - _, err := zk.Create("/test", "old", gozk.EPHEMERAL, |
583 | - gozk.WorldACL(gozk.PERM_READ)) // Read only! |
584 | + // Read only! |
585 | + _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_READ)) |
586 | c.Assert(err, IsNil) |
587 | |
588 | var called bool |
589 | - err = zk.RetryChange("/test", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL), |
590 | - func(data string, stat gozk.Stat) (string, os.Error) { |
591 | + err = conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL), |
592 | + func(data string, stat *zk.Stat) (string, os.Error) { |
593 | called = true |
594 | return "", nil |
595 | }) |
596 | - c.Assert(err, NotNil) |
597 | - c.Assert(err.Code(), Equals, gozk.ZNOAUTH) |
598 | + c.Assert(err, Equals, zk.ZNOAUTH) |
599 | |
600 | - stat, err := zk.Exists("/test") |
601 | + stat, err := conn.Exists("/test") |
602 | c.Assert(err, IsNil) |
603 | c.Assert(stat, NotNil) |
604 | c.Assert(stat.Version(), Equals, int32(0)) |
605 | @@ -251,23 +243,22 @@ |
606 | } |
607 | |
608 | func (s *S) TestRetryChangeFailsCreating(c *C) { |
609 | - zk, _ := s.init(c) |
610 | + conn, _ := s.init(c) |
611 | |
612 | - _, err := zk.Create("/test", "old", gozk.EPHEMERAL, |
613 | - gozk.WorldACL(gozk.PERM_READ)) // Read only! |
614 | + // Read only! |
615 | + _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_READ)) |
616 | c.Assert(err, IsNil) |
617 | |
618 | var called bool |
619 | - err = zk.RetryChange("/test/sub", gozk.EPHEMERAL, |
620 | - gozk.WorldACL(gozk.PERM_ALL), |
621 | - func(data string, stat gozk.Stat) (string, os.Error) { |
622 | + err = conn.RetryChange("/test/sub", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL), |
623 | + func(data string, stat *zk.Stat) (string, os.Error) { |
624 | called = true |
625 | return "", nil |
626 | }) |
627 | c.Assert(err, NotNil) |
628 | - c.Assert(err.Code(), Equals, gozk.ZNOAUTH) |
629 | + c.Assert(err, Equals, zk.ZNOAUTH) |
630 | |
631 | - stat, err := zk.Exists("/test/sub") |
632 | + stat, err := conn.Exists("/test/sub") |
633 | c.Assert(err, IsNil) |
634 | c.Assert(stat, IsNil) |
635 | |
636 | |
637 | === added file 'server.go' |
638 | --- server.go 1970-01-01 00:00:00 +0000 |
639 | +++ server.go 2011-09-30 13:35:26 +0000 |
640 | @@ -0,0 +1,239 @@ |
641 | +package zk |
642 | + |
643 | +import ( |
644 | + "bufio" |
645 | + "bytes" |
646 | + "fmt" |
647 | + "io/ioutil" |
648 | + "net" |
649 | + "os" |
650 | + "path/filepath" |
651 | + "strings" |
652 | +) |
653 | + |
654 | +// Server represents a ZooKeeper server, its data and configuration files. |
655 | +type Server struct { |
656 | + runDir string |
657 | + installDir string |
658 | +} |
659 | + |
660 | +func (srv *Server) Directory() string { |
661 | + return srv.runDir |
662 | +} |
663 | + |
664 | +// CreateServer creates the directory runDir and sets up a ZooKeeper server |
665 | +// environment inside it. It is an error if runDir already exists. |
666 | +// The server will listen on the specified TCP port. |
667 | +// |
668 | +// The ZooKeeper installation directory is specified by installDir. |
669 | +// If this is empty, a system default will be used. |
670 | +// |
671 | +// CreateServer does not start the server - the *Server that |
672 | +// is returned can be used with the service package; |
673 | +// see launchpad.net/gozk/zk/service. |
674 | +func CreateServer(port int, runDir, installDir string) (*Server, os.Error) { |
675 | + if err := os.Mkdir(runDir, 0777); err != nil { |
676 | + return nil, err |
677 | + } |
678 | + srv := &Server{runDir: runDir, installDir: installDir} |
679 | + if err := srv.writeLog4JConfig(); err != nil { |
680 | + return nil, err |
681 | + } |
682 | + if err := srv.writeZooKeeperConfig(port); err != nil { |
683 | + return nil, err |
684 | + } |
685 | + if err := srv.writeInstallDir(); err != nil { |
686 | + return nil, err |
687 | + } |
688 | + return srv, nil |
689 | +} |
690 | + |
691 | +// AttachServer creates a new ZooKeeper Server instance |
692 | +// to operate inside an existing run directory, runDir. |
693 | +// The directory must have been created with CreateServer. |
694 | +func AttachServer(runDir string) (*Server, os.Error) { |
695 | + srv := &Server{runDir: runDir} |
696 | + if err := srv.readInstallDir(); err != nil { |
697 | + return nil, fmt.Errorf("cannot read server install directory: %v", err) |
698 | + } |
699 | + return srv, nil |
700 | +} |
701 | + |
702 | +func (srv *Server) path(name string) string { |
703 | + return filepath.Join(srv.runDir, name) |
704 | +} |
705 | + |
706 | +func (srv *Server) CheckAvailability() os.Error { |
707 | + port, err := srv.NetworkPort() |
708 | + if err != nil { |
709 | + return fmt.Errorf("cannot get network port: %v", err) |
710 | + } |
711 | + l, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port)) |
712 | + if err != nil { |
713 | + return fmt.Errorf("cannot listen on port %v: %v", port, err) |
714 | + } |
715 | + l.Close() |
716 | + return nil |
717 | +} |
718 | + |
719 | +// Command returns the command used to start the |
720 | +// ZooKeeper server. It is provided for debugging and testing |
721 | +// purposes only. |
722 | +func (srv *Server) Command() ([]string, os.Error) { |
723 | + cp, err := srv.classPath() |
724 | + if err != nil { |
725 | + return nil, fmt.Errorf("cannot get class path: %v", err) |
726 | + } |
727 | + return []string{ |
728 | + "java", |
729 | + "-cp", strings.Join(cp, ":"), |
730 | + "-Dzookeeper.root.logger=INFO,CONSOLE", |
731 | + "-Dlog4j.configuration=file:" + srv.path("log4j.properties"), |
732 | + "org.apache.zookeeper.server.quorum.QuorumPeerMain", |
733 | + srv.path("zoo.cfg"), |
734 | + }, nil |
735 | +} |
736 | + |
737 | +var log4jProperties = ` |
738 | +log4j.rootLogger=INFO, CONSOLE |
739 | +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender |
740 | +log4j.appender.CONSOLE.Threshold=INFO |
741 | +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout |
742 | +log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n |
743 | +` |
744 | + |
745 | +func (srv *Server) writeLog4JConfig() (err os.Error) { |
746 | + return ioutil.WriteFile(srv.path("log4j.properties"), []byte(log4jProperties), 0666) |
747 | +} |
748 | + |
749 | +func (srv *Server) writeZooKeeperConfig(port int) (err os.Error) { |
750 | + return ioutil.WriteFile(srv.path("zoo.cfg"), []byte(fmt.Sprintf( |
751 | + "tickTime=2000\n"+ |
752 | + "dataDir=%s\n"+ |
753 | + "clientPort=%d\n"+ |
754 | + "maxClientCnxns=500\n", |
755 | + srv.runDir, port)), 0666) |
756 | +} |
757 | + |
758 | +// NetworkPort returns the TCP port number that |
759 | +// the server is configured for. |
760 | +func (srv *Server) NetworkPort() (int, os.Error) { |
761 | + f, err := os.Open(srv.path("zoo.cfg")) |
762 | + if err != nil { |
763 | + return 0, err |
764 | + } |
765 | + r := bufio.NewReader(f) |
766 | + for { |
767 | + line, err := r.ReadSlice('\n') |
768 | + if err != nil { |
769 | + return 0, fmt.Errorf("cannot get port from %q", srv.path("zoo.cfg")) |
770 | + } |
771 | + var port int |
772 | + if n, _ := fmt.Sscanf(string(line), "clientPort=%d\n", &port); n == 1 { |
773 | + return port, nil |
774 | + } |
775 | + } |
776 | + panic("not reached") |
777 | +} |
778 | + |
779 | +func (srv *Server) writeInstallDir() os.Error { |
780 | + return ioutil.WriteFile(srv.path("installdir.txt"), []byte(srv.installDir+"\n"), 0666) |
781 | +} |
782 | + |
783 | +func (srv *Server) readInstallDir() os.Error { |
784 | + data, err := ioutil.ReadFile(srv.path("installdir.txt")) |
785 | + if err != nil { |
786 | + return err |
787 | + } |
788 | + if len(data) > 0 && data[len(data)-1] == '\n' { |
789 | + data = data[0 : len(data)-1] |
790 | + } |
791 | + srv.installDir = string(data) |
792 | + return nil |
793 | +} |
794 | + |
795 | +func (srv *Server) classPath() ([]string, os.Error) { |
796 | + dir := srv.installDir |
797 | + if dir == "" { |
798 | + return systemClassPath() |
799 | + } |
800 | + if err := checkDirectory(dir); err != nil { |
801 | + return nil, err |
802 | + } |
803 | + // Two possibilities, as seen in zkEnv.sh: |
804 | + // 1) locally built binaries (jars are in build directory) |
805 | + // 2) release binaries |
806 | + if build := filepath.Join(dir, "build"); checkDirectory(build) == nil { |
807 | + dir = build |
808 | + } |
809 | + classPath, err := filepath.Glob(filepath.Join(dir, "zookeeper-*.jar")) |
810 | + if err != nil { |
811 | + panic(fmt.Errorf("glob for jar files: %v", err)) |
812 | + } |
813 | + more, err := filepath.Glob(filepath.Join(dir, "lib/*.jar")) |
814 | + if err != nil { |
815 | + panic(fmt.Errorf("glob for lib jar files: %v", err)) |
816 | + } |
817 | + |
818 | + classPath = append(classPath, more...) |
819 | + if len(classPath) == 0 { |
820 | + return nil, fmt.Errorf("zookeeper libraries not found in %q", dir) |
821 | + } |
822 | + return classPath, nil |
823 | +} |
824 | + |
825 | +const zookeeperEnviron = "/etc/zookeeper/conf/environment" |
826 | + |
827 | +func systemClassPath() ([]string, os.Error) { |
828 | + f, err := os.Open(zookeeperEnviron) |
829 | + if f == nil { |
830 | + return nil, err |
831 | + } |
832 | + r := bufio.NewReader(f) |
833 | + for { |
834 | + line, err := r.ReadSlice('\n') |
835 | + if err != nil { |
836 | + break |
837 | + } |
838 | + if !bytes.HasPrefix(line, []byte("CLASSPATH=")) { |
839 | + continue |
840 | + } |
841 | + |
842 | + // remove variable and newline |
843 | + path := string(line[len("CLASSPATH=") : len(line)-1]) |
844 | + |
845 | + // trim white space |
846 | + path = strings.Trim(path, " \t\r") |
847 | + |
848 | + // strip quotes |
849 | + if path[0] == '"' { |
850 | + path = path[1 : len(path)-1] |
851 | + } |
852 | + |
853 | + // split on : |
854 | + classPath := strings.Split(path, ":") |
855 | + |
856 | + // split off $ZOOCFGDIR |
857 | + if len(classPath) > 0 && classPath[0] == "$ZOOCFGDIR" { |
858 | + classPath = classPath[1:] |
859 | + } |
860 | + |
861 | + if len(classPath) == 0 { |
862 | + return nil, fmt.Errorf("empty class path in %q", zookeeperEnviron) |
863 | + } |
864 | + return classPath, nil |
865 | + } |
866 | + return nil, fmt.Errorf("no class path found in %q", zookeeperEnviron) |
867 | +} |
868 | + |
869 | +// checkDirectory returns an error if the given path |
870 | +// does not exist or is not a directory. |
871 | +func checkDirectory(path string) os.Error { |
872 | + if info, err := os.Stat(path); err != nil || !info.IsDirectory() { |
873 | + if err == nil { |
874 | + err = &os.PathError{Op: "stat", Path: path, Error: os.NewError("is not a directory")} |
875 | + } |
876 | + return err |
877 | + } |
878 | + return nil |
879 | +} |
880 | |
881 | === added directory 'service' |
882 | === added file 'service/Makefile' |
883 | --- service/Makefile 1970-01-01 00:00:00 +0000 |
884 | +++ service/Makefile 2011-09-30 13:35:26 +0000 |
885 | @@ -0,0 +1,20 @@ |
886 | +include $(GOROOT)/src/Make.inc |
887 | + |
888 | +TARG=launchpad.net/gozk/zk/service |
889 | + |
890 | +GOFILES=\ |
891 | + service.go\ |
892 | + |
893 | +GOFMT=gofmt |
894 | +BADFMT:=$(shell $(GOFMT) -l $(GOFILES) $(CGOFILES) $(wildcard *_test.go)) |
895 | + |
896 | +gofmt: $(BADFMT) |
897 | + @for F in $(BADFMT); do $(GOFMT) -w $$F && echo $$F; done |
898 | + |
899 | +ifneq ($(BADFMT),) |
900 | +ifneq ($(MAKECMDGOALS),gofmt) |
901 | +$(warning WARNING: make gofmt: $(BADFMT)) |
902 | +endif |
903 | +endif |
904 | + |
905 | +include $(GOROOT)/src/Make.pkg |
906 | |
907 | === added file 'service/service.go' |
908 | --- service/service.go 1970-01-01 00:00:00 +0000 |
909 | +++ service/service.go 2011-09-30 13:35:26 +0000 |
910 | @@ -0,0 +1,160 @@ |
911 | +// The service package provides support for long-running services. |
912 | +package service |
913 | + |
914 | +import ( |
915 | + "os" |
916 | + "fmt" |
917 | + "io/ioutil" |
918 | + "strconv" |
919 | + "path/filepath" |
920 | + "exec" |
921 | + "strings" |
922 | + "time" |
923 | +) |
924 | + |
925 | +// Interface represents a single-process server. |
926 | +type Interface interface { |
927 | + // Directory gives the path to the directory containing |
928 | + // the server's configuration and data files. It is assumed that |
929 | + // this exists and can be modified. |
930 | + Directory() string |
931 | + |
932 | + // CheckAvailability should return an error if resources |
933 | + // required by the server are not available; for instance |
934 | + // if the server requires a network port which is not free. |
935 | + CheckAvailability() os.Error |
936 | + |
937 | + // Command should return a command that can be |
938 | + // used to run the server. The first element of the returned |
939 | + // slice is the executable name; the rest of the elements are |
940 | + // its arguments. |
941 | + Command() ([]string, os.Error) |
942 | +} |
943 | + |
944 | +// Service represents a possibly running server process. |
945 | +type Service struct { |
946 | + srv Interface |
947 | +} |
948 | + |
949 | +// New returns a new Service using the given |
950 | +// server interface. |
951 | +func New(srv Interface) *Service { |
952 | + return &Service{srv} |
953 | +} |
954 | + |
955 | +// Process returns a reference to the identity |
956 | +// of the last running server in the Service's directory. |
957 | +// If the server was shut down, it returns (nil, nil). |
958 | +// The server process may not still be running |
959 | +// (for instance if it was terminated abnormally). |
960 | +// This function is provided for debugging and testing |
961 | +// purposes only. |
962 | +func (s *Service) Process() (*os.Process, os.Error) { |
963 | + data, err := ioutil.ReadFile(s.path("pid.txt")) |
964 | + if err != nil { |
965 | + if err, ok := err.(*os.PathError); ok && err.Error == os.ENOENT { |
966 | + return nil, nil |
967 | + } |
968 | + return nil, err |
969 | + } |
970 | + pid, err := strconv.Atoi(string(data)) |
971 | + if err != nil { |
972 | + return nil, os.NewError("bad process id found in pid.txt") |
973 | + } |
974 | + return os.FindProcess(pid) |
975 | +} |
976 | + |
977 | +func (s *Service) path(name string) string { |
978 | + return filepath.Join(s.srv.Directory(), name) |
979 | +} |
980 | + |
981 | +// Start starts the server. It returns an error if the server is already running. |
982 | +// It stores the process ID of the server in the file "pid.txt" |
983 | +// inside the server's directory. Stdout and stderr of the server |
984 | +// are similarly written to "log.txt". |
985 | +func (s *Service) Start() os.Error { |
986 | + if err := s.srv.CheckAvailability(); err != nil { |
987 | + return err |
988 | + } |
989 | + p, err := s.Process() |
990 | + if p != nil || err != nil { |
991 | + if p != nil { |
992 | + p.Release() |
993 | + } |
994 | + return fmt.Errorf("server may already be running (remove %q to clear)", s.path("pid.txt")) |
995 | + } |
996 | + |
997 | + // Create the pid file before starting the process so that if two |
998 | + // programs try to start a server in the same directory concurrently, |
999 | + // only one succeeds. |
1000 | + pidf, err := os.OpenFile(s.path("pid.txt"), os.O_EXCL|os.O_CREATE|os.O_WRONLY, 0666) |
1001 | + if err != nil { |
1002 | + return fmt.Errorf("cannot create pid.txt: %v", err) |
1003 | + } |
1004 | + defer pidf.Close() |
1005 | + |
1006 | + args, err := s.srv.Command() |
1007 | + if err != nil { |
1008 | + return fmt.Errorf("cannot determine command: %v", err) |
1009 | + } |
1010 | + cmd := exec.Command(args[0], args[1:]...) |
1011 | + |
1012 | + logf, err := os.OpenFile(s.path("log.txt"), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666) |
1013 | + if err != nil { |
1014 | + return fmt.Errorf("cannot create log file: %v", err) |
1015 | + } |
1016 | + defer logf.Close() |
1017 | + cmd.Stdout = logf |
1018 | + cmd.Stderr = logf |
1019 | + if err := cmd.Start(); err != nil { |
1020 | + return fmt.Errorf("cannot start server: %v", err) |
1021 | + } |
1022 | + if _, err := fmt.Fprint(pidf, cmd.Process.Pid); err != nil { |
1023 | + return fmt.Errorf("cannot write pid file: %v", err) |
1024 | + } |
1025 | + return nil |
1026 | +} |
1027 | + |
1028 | +// Stop kills the server. It is a no-op if the server is not running. |
1029 | +func (s *Service) Stop() os.Error { |
1030 | + p, err := s.Process() |
1031 | + if p == nil { |
1032 | + if err != nil { |
1033 | + return fmt.Errorf("cannot read process ID of server: %v", err) |
1034 | + } |
1035 | + return nil |
1036 | + } |
1037 | + defer p.Release() |
1038 | + if err := p.Kill(); err != nil && !strings.Contains(err.String(), "no such process") { |
1039 | + return fmt.Errorf("cannot kill server process: %v", err) |
1040 | + } |
1041 | + // ignore the error returned from Wait because there's little |
1042 | + // we can do about it - it either means that the process has just exited |
1043 | + // anyway or that we can't wait for it for some other reason, |
1044 | + // for example because it was originally started by some other process. |
1045 | + _, err = p.Wait(0) |
1046 | + if err != nil && strings.Contains(err.String(), "no child processes") { |
1047 | + // If we can't wait for the server, it's possible that it was running |
1048 | + // but not as a child of this process, so give it a little while |
1049 | + // to exit. TODO poll with kill(no signal)? |
1050 | + time.Sleep(0.5e9) |
1051 | + } |
1052 | + |
1053 | + if err := os.Remove(s.path("pid.txt")); err != nil { |
1054 | + return fmt.Errorf("cannot remove server process ID file: %v", err) |
1055 | + } |
1056 | + return nil |
1057 | +} |
1058 | + |
1059 | +// Destroy stops the server, and then removes its |
1060 | +// directory and all its contents. |
1061 | +// Warning: this will destroy all data associated with the server. |
1062 | +func (s *Service) Destroy() os.Error { |
1063 | + if err := s.Stop(); err != nil { |
1064 | + return err |
1065 | + } |
1066 | + if err := os.RemoveAll(s.srv.Directory()); err != nil { |
1067 | + return err |
1068 | + } |
1069 | + return nil |
1070 | +} |
1071 | |
1072 | === added file 'status.go' |
1073 | --- status.go 1970-01-01 00:00:00 +0000 |
1074 | +++ status.go 2011-09-30 13:35:26 +0000 |
1075 | @@ -0,0 +1,116 @@ |
1076 | +package zk |
1077 | + |
1078 | +import ( |
1079 | + "fmt" |
1080 | +) |
1081 | + |
1082 | +// event types, as defined by libzookeeper |
1083 | +const ( |
1084 | + _ = iota |
1085 | + event_CREATED |
1086 | + event_DELETED |
1087 | + event_CHANGED |
1088 | + event_CHILD |
1089 | + event_SESSION = -1 |
1090 | + event_NOTWATCHING = -2 |
1091 | +) |
1092 | + |
1093 | +// The statusChan interface represents an event channel. |
1094 | +// It translates from libzookeeper events to Go channel sends |
1095 | +// of the appropriate type. |
1096 | +type statusChan interface { |
1097 | + // close closes the wait channel. |
1098 | + close() |
1099 | + |
1100 | + // send sends an event with the given type and connection |
1101 | + // state to the wait channel. It should do a non-blocking |
1102 | + // send and return false if the send would have blocked. |
1103 | + send(eventType, connectionState int, path string) bool |
1104 | +} |
1105 | + |
1106 | +// SessionStatus represents the status of a ZooKeeper session. |
1107 | +type SessionStatus int |
1108 | + |
1109 | +// Constants that represent the status of a ZooKeeper |
1110 | +// session. |
1111 | +const ( |
1112 | + _ SessionStatus = iota |
1113 | + SessionConnecting |
1114 | + SessionAssociating |
1115 | + SessionConnected |
1116 | + SessionExpired SessionStatus = -112 |
1117 | + SessionAuthFailed SessionStatus = -113 |
1118 | +) |
1119 | + |
1120 | +type sessionStatusChan chan SessionStatus |
1121 | + |
1122 | +func (c sessionStatusChan) close() { |
1123 | + close(c) |
1124 | +} |
1125 | + |
1126 | +func (c sessionStatusChan) send(eventType, eventState int, path string) bool { |
1127 | + if eventType != event_SESSION { |
1128 | + panic(fmt.Errorf("unexpected event, type %v; status %v", eventType, eventState)) |
1129 | + } |
1130 | + if path != "" { |
1131 | + panic(fmt.Errorf("non-empty path received on session event")) |
1132 | + } |
1133 | + select { |
1134 | + case c <- SessionStatus(eventState): |
1135 | + default: |
1136 | + return false |
1137 | + } |
1138 | + return true |
1139 | +} |
1140 | + |
1141 | +// NodeStatus represents the status of a node after ExistsW, |
1142 | +// GetW, or ChildrenW has been called on the node. |
1143 | +type NodeStatus int |
1144 | + |
1145 | +// Constants that represent changes to a ZooKeeper node. |
1146 | +const ( |
1147 | + _ NodeStatus = iota |
1148 | + NodeDeleted // The node has been deleted. |
1149 | + NodeCreated // The node has been created. |
1150 | + NodeChanged // The node data has changed. |
1151 | + NodeChangedChild // The number of children of the node has changed. |
1152 | +) |
1153 | + |
1154 | +type nodeStatusChan struct { |
1155 | + c chan NodeStatus |
1156 | + path string // stored so we can check that the server is behaving. |
1157 | +} |
1158 | + |
1159 | +func newNodeStatusChan(path string) *nodeStatusChan { |
1160 | + return &nodeStatusChan{path: path, c: make(chan NodeStatus, 1)} |
1161 | +} |
1162 | + |
1163 | +func (c *nodeStatusChan) close() { |
1164 | + close(c.c) |
1165 | +} |
1166 | + |
1167 | +func (c *nodeStatusChan) send(eventType, eventState int, path string) bool { |
1168 | + if path != c.path { |
1169 | + panic(fmt.Errorf("unexpected path %q received on node change event (type %v, status %v)", |
1170 | + path, eventType, eventState)) |
1171 | + } |
1172 | + var val NodeStatus |
1173 | + switch eventType { |
1174 | + case event_CREATED: |
1175 | + val = NodeCreated |
1176 | + case event_DELETED: |
1177 | + val = NodeDeleted |
1178 | + case event_CHANGED: |
1179 | + val = NodeChanged |
1180 | + case event_CHILD: |
1181 | + val = NodeChangedChild |
1182 | + default: |
1183 | + panic(fmt.Errorf("unexpected event, type %v; status %v", eventType, eventState)) |
1184 | + } |
1185 | + select { |
1186 | + case c.c <- val: |
1187 | + default: |
1188 | + return false |
1189 | + } |
1190 | + return true |
1191 | +} |
1192 | |
1193 | === modified file 'suite_test.go' |
1194 | --- suite_test.go 2011-08-19 01:43:37 +0000 |
1195 | +++ suite_test.go 2011-09-30 13:35:26 +0000 |
1196 | @@ -1,94 +1,72 @@ |
1197 | -package gozk_test |
1198 | +package zk_test |
1199 | |
1200 | import ( |
1201 | . "launchpad.net/gocheck" |
1202 | - "testing" |
1203 | - "io/ioutil" |
1204 | - "path" |
1205 | "fmt" |
1206 | + "launchpad.net/gozk/zk/service" |
1207 | + "launchpad.net/gozk/zk" |
1208 | "os" |
1209 | - "gozk" |
1210 | + "testing" |
1211 | "time" |
1212 | ) |
1213 | |
1214 | func TestAll(t *testing.T) { |
1215 | - TestingT(t) |
1216 | + if !*reattach { |
1217 | + TestingT(t) |
1218 | + } |
1219 | } |
1220 | |
1221 | var _ = Suite(&S{}) |
1222 | |
1223 | type S struct { |
1224 | - zkRoot string |
1225 | - zkTestRoot string |
1226 | - zkTestPort int |
1227 | - zkServerSh string |
1228 | - zkServerOut *os.File |
1229 | - zkAddr string |
1230 | + zkServer *service.Service |
1231 | + zkTestRoot string |
1232 | + zkAddr string |
1233 | |
1234 | - handles []*gozk.ZooKeeper |
1235 | - events []*gozk.Event |
1236 | + handles []*zk.Conn |
1237 | + events []zk.SessionStatus |
1238 | liveWatches int |
1239 | deadWatches chan bool |
1240 | } |
1241 | |
1242 | -var logLevel = 0 //gozk.LOG_ERROR |
1243 | - |
1244 | - |
1245 | -var testZooCfg = ("dataDir=%s\n" + |
1246 | - "clientPort=%d\n" + |
1247 | - "tickTime=2000\n" + |
1248 | - "initLimit=10\n" + |
1249 | - "syncLimit=5\n" + |
1250 | - "") |
1251 | - |
1252 | -var testLog4jPrp = ("log4j.rootLogger=INFO,CONSOLE\n" + |
1253 | - "log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender\n" + |
1254 | - "log4j.appender.CONSOLE.Threshold=DEBUG\n" + |
1255 | - "log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout\n" + |
1256 | - "log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n" + |
1257 | - "") |
1258 | - |
1259 | -func (s *S) init(c *C) (*gozk.ZooKeeper, chan gozk.Event) { |
1260 | - zk, watch, err := gozk.Init(s.zkAddr, 5e9) |
1261 | +var logLevel = 0 // zk.LOG_ERROR |
1262 | + |
1263 | +func (s *S) init(c *C) (*zk.Conn, <-chan zk.SessionStatus) { |
1264 | + conn, watch, err := zk.Dial(s.zkAddr, 5e9) |
1265 | c.Assert(err, IsNil) |
1266 | |
1267 | - s.handles = append(s.handles, zk) |
1268 | - |
1269 | - event := <-watch |
1270 | - |
1271 | - c.Assert(event.Type, Equals, gozk.EVENT_SESSION) |
1272 | - c.Assert(event.State, Equals, gozk.STATE_CONNECTED) |
1273 | - |
1274 | - bufferedWatch := make(chan gozk.Event, 256) |
1275 | - bufferedWatch <- event |
1276 | + s.handles = append(s.handles, conn) |
1277 | + |
1278 | + status := <-watch |
1279 | + c.Assert(status == zk.SessionConnected, Equals, true) |
1280 | + |
1281 | + bufferedWatch := make(chan zk.SessionStatus, 256) |
1282 | + bufferedWatch <- status |
1283 | |
1284 | s.liveWatches += 1 |
1285 | go func() { |
1286 | - loop: |
1287 | for { |
1288 | + status, ok := <-watch |
1289 | + if !ok { |
1290 | + break |
1291 | + } |
1292 | select { |
1293 | - case event, ok := <-watch: |
1294 | - if !ok { |
1295 | - close(bufferedWatch) |
1296 | - break loop |
1297 | - } |
1298 | - select { |
1299 | - case bufferedWatch <- event: |
1300 | - default: |
1301 | - panic("Too many events in buffered watch!") |
1302 | - } |
1303 | + case bufferedWatch <- status: |
1304 | + default: |
1305 | + panic("Too many events in buffered watch!") |
1306 | } |
1307 | } |
1308 | + close(bufferedWatch) |
1309 | s.deadWatches <- true |
1310 | }() |
1311 | |
1312 | - return zk, bufferedWatch |
1313 | + return conn, bufferedWatch |
1314 | } |
1315 | |
1316 | func (s *S) SetUpTest(c *C) { |
1317 | - c.Assert(gozk.CountPendingWatches(), Equals, 0, |
1318 | + c.Assert(zk.CountPendingWatches(), Equals, 0, |
1319 | Bug("Test got a dirty watch state before running!")) |
1320 | - gozk.SetLogLevel(logLevel) |
1321 | + zk.SetLogLevel(logLevel) |
1322 | } |
1323 | |
1324 | func (s *S) TearDownTest(c *C) { |
1325 | @@ -108,98 +86,38 @@ |
1326 | } |
1327 | |
1328 | // Reset the list of handles. |
1329 | - s.handles = make([]*gozk.ZooKeeper, 0) |
1330 | + s.handles = make([]*zk.Conn, 0) |
1331 | |
1332 | - c.Assert(gozk.CountPendingWatches(), Equals, 0, |
1333 | + c.Assert(zk.CountPendingWatches(), Equals, 0, |
1334 | Bug("Test left live watches behind!")) |
1335 | } |
1336 | |
1337 | -// We use the suite set up and tear down to manage a custom zookeeper |
1338 | +// We use the suite set up and tear down to manage a custom ZooKeeper |
1339 | // |
1340 | func (s *S) SetUpSuite(c *C) { |
1341 | - |
1342 | + var err os.Error |
1343 | s.deadWatches = make(chan bool) |
1344 | |
1345 | - var err os.Error |
1346 | - |
1347 | - s.zkRoot = os.Getenv("ZKROOT") |
1348 | - if s.zkRoot == "" { |
1349 | - panic("You must define $ZKROOT ($ZKROOT/bin/zkServer.sh is needed)") |
1350 | - } |
1351 | - |
1352 | - s.zkTestRoot = c.MkDir() |
1353 | - s.zkTestPort = 21812 |
1354 | - |
1355 | - println("ZooKeeper test server directory:", s.zkTestRoot) |
1356 | - println("ZooKeeper test server port:", s.zkTestPort) |
1357 | - |
1358 | - s.zkAddr = fmt.Sprintf("localhost:%d", s.zkTestPort) |
1359 | - |
1360 | - s.zkServerSh = path.Join(s.zkRoot, "bin/zkServer.sh") |
1361 | - s.zkServerOut, err = os.OpenFile(path.Join(s.zkTestRoot, "stdout.txt"), |
1362 | - os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) |
1363 | - if err != nil { |
1364 | - panic("Can't open stdout.txt file for server: " + err.String()) |
1365 | - } |
1366 | - |
1367 | - dataDir := path.Join(s.zkTestRoot, "data") |
1368 | - confDir := path.Join(s.zkTestRoot, "conf") |
1369 | - |
1370 | - os.Mkdir(dataDir, 0755) |
1371 | - os.Mkdir(confDir, 0755) |
1372 | - |
1373 | - err = os.Setenv("ZOOCFGDIR", confDir) |
1374 | - if err != nil { |
1375 | - panic("Can't set $ZOOCFGDIR: " + err.String()) |
1376 | - } |
1377 | - |
1378 | - zooCfg := []byte(fmt.Sprintf(testZooCfg, dataDir, s.zkTestPort)) |
1379 | - err = ioutil.WriteFile(path.Join(confDir, "zoo.cfg"), zooCfg, 0644) |
1380 | - if err != nil { |
1381 | - panic("Can't write zoo.cfg: " + err.String()) |
1382 | - } |
1383 | - |
1384 | - log4jPrp := []byte(testLog4jPrp) |
1385 | - err = ioutil.WriteFile(path.Join(confDir, "log4j.properties"), log4jPrp, 0644) |
1386 | - if err != nil { |
1387 | - panic("Can't write log4j.properties: " + err.String()) |
1388 | - } |
1389 | - |
1390 | - s.StartZK() |
1391 | + // N.B. We need to create a subdirectory because zk.CreateServer |
1392 | + // insists on creating its own directory. |
1393 | + s.zkTestRoot = c.MkDir() + "/zk" |
1394 | + |
1395 | + port := 21812 |
1396 | + s.zkAddr = fmt.Sprint("localhost:", port) |
1397 | + |
1398 | + srv, err := zk.CreateServer(port, s.zkTestRoot, "") |
1399 | + if err != nil { |
1400 | + c.Fatal("Cannot set up server environment: ", err) |
1401 | + } |
1402 | + s.zkServer = service.New(srv) |
1403 | + err = s.zkServer.Start() |
1404 | + if err != nil { |
1405 | + c.Fatal("Cannot start ZooKeeper server: ", err) |
1406 | + } |
1407 | } |
1408 | |
1409 | func (s *S) TearDownSuite(c *C) { |
1410 | - s.StopZK() |
1411 | - s.zkServerOut.Close() |
1412 | -} |
1413 | - |
1414 | -func (s *S) StartZK() { |
1415 | - attr := os.ProcAttr{Files: []*os.File{os.Stdin, s.zkServerOut, os.Stderr}} |
1416 | - proc, err := os.StartProcess(s.zkServerSh, []string{s.zkServerSh, "start"}, &attr) |
1417 | - if err != nil { |
1418 | - panic("Problem executing zkServer.sh start: " + err.String()) |
1419 | - } |
1420 | - |
1421 | - result, err := proc.Wait(0) |
1422 | - if err != nil { |
1423 | - panic(err.String()) |
1424 | - } else if result.ExitStatus() != 0 { |
1425 | - panic("'zkServer.sh start' exited with non-zero status") |
1426 | - } |
1427 | -} |
1428 | - |
1429 | -func (s *S) StopZK() { |
1430 | - attr := os.ProcAttr{Files: []*os.File{os.Stdin, s.zkServerOut, os.Stderr}} |
1431 | - proc, err := os.StartProcess(s.zkServerSh, []string{s.zkServerSh, "stop"}, &attr) |
1432 | - if err != nil { |
1433 | - panic("Problem executing zkServer.sh stop: " + err.String() + |
1434 | - " (look for runaway java processes!)") |
1435 | - } |
1436 | - result, err := proc.Wait(0) |
1437 | - if err != nil { |
1438 | - panic(err.String()) |
1439 | - } else if result.ExitStatus() != 0 { |
1440 | - panic("'zkServer.sh stop' exited with non-zero status " + |
1441 | - "(look for runaway java processes!)") |
1442 | + if s.zkServer != nil { |
1443 | + s.zkServer.Stop() // TODO Destroy |
1444 | } |
1445 | } |
1446 | |
1447 | === renamed file 'gozk.go' => 'zk.go' |
1448 | --- gozk.go 2011-08-19 01:56:39 +0000 |
1449 | +++ zk.go 2011-09-30 13:35:26 +0000 |
1450 | @@ -1,12 +1,26 @@ |
1451 | -// gozk - Zookeeper support for the Go language |
1452 | +// gozk - ZooKeeper support for the Go language |
1453 | // |
1454 | // https://wiki.ubuntu.com/gozk |
1455 | // |
1456 | // Copyright (c) 2010-2011 Canonical Ltd. |
1457 | -// |
1458 | // Written by Gustavo Niemeyer <gustavo.niemeyer@canonical.com> |
1459 | // |
1460 | -package gozk |
1461 | +// A note about channels as used for change notification in gozk. |
1462 | +// |
1463 | +// Event channels are used to provide notifications of changes to ZooKeeper |
1464 | +// nodes, from GetW, ExistsW, etc. These are invariably one-shot - a single |
1465 | +// value will be sent on the wait channel to signify the change and then the |
1466 | +// channel will be closed. If the ZooKeeper server goes down when awaiting |
1467 | +// a change, the channel will be closed without sending a value. Clients |
1468 | +// can either check explicitly for the closed channel with: |
1469 | +// |
1470 | +// if v, ok := <-wait; !ok { |
1471 | +// } |
1472 | +// |
1473 | +// or just test the received value against zero, which |
1474 | +// signifies no value for each of the notification types. |
1475 | +// |
1476 | +package zk |
1477 | |
1478 | /* |
1479 | #cgo CFLAGS: -I/usr/include/c-client-src |
1480 | @@ -27,17 +41,16 @@ |
1481 | // ----------------------------------------------------------------------- |
1482 | // Main constants and data types. |
1483 | |
1484 | -// The main ZooKeeper object, created through the Init function. |
1485 | -// Encapsulates all communication with ZooKeeper. |
1486 | -type ZooKeeper struct { |
1487 | - watchChannels map[uintptr]chan Event |
1488 | +// Conn represents a connection to a set of ZooKeeper nodes. |
1489 | +type Conn struct { |
1490 | + watchChannels map[uintptr]statusChan |
1491 | sessionWatchId uintptr |
1492 | handle *C.zhandle_t |
1493 | mutex sync.Mutex |
1494 | } |
1495 | |
1496 | -// ClientId represents the established session in ZooKeeper. This is only |
1497 | -// useful to be passed back into the ReInit function. |
1498 | +// ClientId represents an established ZooKeeper session. It can be |
1499 | +// passed into Redial to reestablish a connection to an existing session. |
1500 | type ClientId struct { |
1501 | cId C.clientid_t |
1502 | } |
1503 | @@ -51,84 +64,63 @@ |
1504 | Id string |
1505 | } |
1506 | |
1507 | -// Event channels are used to provide notifications of changes in the |
1508 | -// ZooKeeper connection state and in specific node aspects. |
1509 | -// |
1510 | -// There are two sources of events: the session channel obtained during |
1511 | -// initialization with Init, and any watch channels obtained |
1512 | -// through one of the W-suffixed functions (GetW, ExistsW, etc). |
1513 | -// |
1514 | -// The session channel will only receive session-level events notifying |
1515 | -// about critical and transient changes in the ZooKeeper connection |
1516 | -// state (STATE_CONNECTED, STATE_EXPIRED_SESSION, etc). On long |
1517 | -// running applications the session channel must *necessarily* be |
1518 | -// observed since certain events like session expirations require an |
1519 | -// explicit reconnection and reestablishment of state (or bailing out). |
1520 | -// Because of that, the buffer used on the session channel has a limited |
1521 | -// size, and a panic will occur if too many events are not collected. |
1522 | -// |
1523 | -// Watch channels enable monitoring state for nodes, and the |
1524 | -// moment they're fired depends on which function was called to |
1525 | -// create them. Note that, unlike in other ZooKeeper interfaces, |
1526 | -// gozk will NOT dispatch unimportant session events such as |
1527 | -// STATE_ASSOCIATING, STATE_CONNECTING and STATE_CONNECTED to |
1528 | -// watch Event channels, since they are transient and disruptive |
1529 | -// to the workflow. Critical state changes such as expirations |
1530 | -// are still delivered to all event channels, though, and the |
1531 | -// transient events may be obsererved in the session channel. |
1532 | -// |
1533 | -// Since every watch channel may receive critical session events, events |
1534 | -// received must not be handled blindly as if the watch requested has |
1535 | -// been fired. To facilitate such tests, Events offer the Ok method, |
1536 | -// and they also have a good String method so they may be used as an |
1537 | -// os.Error value if wanted. E.g.: |
1538 | -// |
1539 | -// event := <-watch |
1540 | -// if !event.Ok() { |
1541 | -// err = event |
1542 | -// return |
1543 | -// } |
1544 | -// |
1545 | -// Note that closed channels will deliver zeroed Event, which means |
1546 | -// event.Type is set to EVENT_CLOSED and event.State is set to STATE_CLOSED, |
1547 | -// to facilitate handling. |
1548 | -type Event struct { |
1549 | - Type int |
1550 | - Path string |
1551 | - State int |
1552 | -} |
1553 | +// Error represents a ZooKeeper error. |
1554 | +type Error int |
1555 | |
1556 | -// Error codes that may be used to verify the result of the |
1557 | -// Code method from Error. |
1558 | const ( |
1559 | - ZOK = C.ZOK |
1560 | - ZSYSTEMERROR = C.ZSYSTEMERROR |
1561 | - ZRUNTIMEINCONSISTENCY = C.ZRUNTIMEINCONSISTENCY |
1562 | - ZDATAINCONSISTENCY = C.ZDATAINCONSISTENCY |
1563 | - ZCONNECTIONLOSS = C.ZCONNECTIONLOSS |
1564 | - ZMARSHALLINGERROR = C.ZMARSHALLINGERROR |
1565 | - ZUNIMPLEMENTED = C.ZUNIMPLEMENTED |
1566 | - ZOPERATIONTIMEOUT = C.ZOPERATIONTIMEOUT |
1567 | - ZBADARGUMENTS = C.ZBADARGUMENTS |
1568 | - ZINVALIDSTATE = C.ZINVALIDSTATE |
1569 | - ZAPIERROR = C.ZAPIERROR |
1570 | - ZNONODE = C.ZNONODE |
1571 | - ZNOAUTH = C.ZNOAUTH |
1572 | - ZBADVERSION = C.ZBADVERSION |
1573 | - ZNOCHILDRENFOREPHEMERALS = C.ZNOCHILDRENFOREPHEMERALS |
1574 | - ZNODEEXISTS = C.ZNODEEXISTS |
1575 | - ZNOTEMPTY = C.ZNOTEMPTY |
1576 | - ZSESSIONEXPIRED = C.ZSESSIONEXPIRED |
1577 | - ZINVALIDCALLBACK = C.ZINVALIDCALLBACK |
1578 | - ZINVALIDACL = C.ZINVALIDACL |
1579 | - ZAUTHFAILED = C.ZAUTHFAILED |
1580 | - ZCLOSING = C.ZCLOSING |
1581 | - ZNOTHING = C.ZNOTHING |
1582 | - ZSESSIONMOVED = C.ZSESSIONMOVED |
1583 | + ZOK Error = C.ZOK |
1584 | + ZSYSTEMERROR Error = C.ZSYSTEMERROR |
1585 | + ZRUNTIMEINCONSISTENCY Error = C.ZRUNTIMEINCONSISTENCY |
1586 | + ZDATAINCONSISTENCY Error = C.ZDATAINCONSISTENCY |
1587 | + ZCONNECTIONLOSS Error = C.ZCONNECTIONLOSS |
1588 | + ZMARSHALLINGERROR Error = C.ZMARSHALLINGERROR |
1589 | + ZUNIMPLEMENTED Error = C.ZUNIMPLEMENTED |
1590 | + ZOPERATIONTIMEOUT Error = C.ZOPERATIONTIMEOUT |
1591 | + ZBADARGUMENTS Error = C.ZBADARGUMENTS |
1592 | + ZINVALIDSTATE Error = C.ZINVALIDSTATE |
1593 | + ZAPIERROR Error = C.ZAPIERROR |
1594 | + ZNONODE Error = C.ZNONODE |
1595 | + ZNOAUTH Error = C.ZNOAUTH |
1596 | + ZBADVERSION Error = C.ZBADVERSION |
1597 | + ZNOCHILDRENFOREPHEMERALS Error = C.ZNOCHILDRENFOREPHEMERALS |
1598 | + ZNODEEXISTS Error = C.ZNODEEXISTS |
1599 | + ZNOTEMPTY Error = C.ZNOTEMPTY |
1600 | + ZSESSIONEXPIRED Error = C.ZSESSIONEXPIRED |
1601 | + ZINVALIDCALLBACK Error = C.ZINVALIDCALLBACK |
1602 | + ZINVALIDACL Error = C.ZINVALIDACL |
1603 | + ZAUTHFAILED Error = C.ZAUTHFAILED |
1604 | + ZCLOSING Error = C.ZCLOSING |
1605 | + ZNOTHING Error = C.ZNOTHING |
1606 | + ZSESSIONMOVED Error = C.ZSESSIONMOVED |
1607 | ) |
1608 | |
1609 | +func (error Error) String() string { |
1610 | + return C.GoString(C.zerror(C.int(error))) // Static, no need to free it. |
1611 | +} |
1612 | + |
1613 | +// zkError creates an appropriate error return from |
1614 | +// a ZooKeeper status and the errno return from a C API |
1615 | +// call. |
1616 | +func zkError(rc C.int, cerr os.Error) os.Error { |
1617 | + code := Error(rc) |
1618 | + switch code { |
1619 | + case ZOK: |
1620 | + return nil |
1621 | + |
1622 | + case ZSYSTEMERROR: |
1623 | + // If a ZooKeeper call returns ZSYSTEMERROR, then |
1624 | + // errno becomes significant. If errno has not been |
1625 | + // set, then we will return ZSYSTEMERROR nonetheless. |
1626 | + if cerr != nil { |
1627 | + return cerr |
1628 | + } |
1629 | + } |
1630 | + return code |
1631 | +} |
1632 | + |
1633 | // Constants for SetLogLevel. |
1634 | const ( |
1635 | + LOG_NONE = 0 |
1636 | LOG_ERROR = C.ZOO_LOG_LEVEL_ERROR |
1637 | LOG_WARN = C.ZOO_LOG_LEVEL_WARN |
1638 | LOG_INFO = C.ZOO_LOG_LEVEL_INFO |
1639 | @@ -155,33 +147,6 @@ |
1640 | PERM_ALL = 0x1f |
1641 | ) |
1642 | |
1643 | -// Constants for Event Type. |
1644 | -const ( |
1645 | - EVENT_CREATED = iota + 1 |
1646 | - EVENT_DELETED |
1647 | - EVENT_CHANGED |
1648 | - EVENT_CHILD |
1649 | - EVENT_SESSION = -1 |
1650 | - EVENT_NOTWATCHING = -2 |
1651 | - |
1652 | - // Doesn't really exist in zk, but handy for use in zeroed Event |
1653 | - // values (e.g. closed channels). |
1654 | - EVENT_CLOSED = 0 |
1655 | -) |
1656 | - |
1657 | -// Constants for Event State. |
1658 | -const ( |
1659 | - STATE_EXPIRED_SESSION = -112 |
1660 | - STATE_AUTH_FAILED = -113 |
1661 | - STATE_CONNECTING = 1 |
1662 | - STATE_ASSOCIATING = 2 |
1663 | - STATE_CONNECTED = 3 |
1664 | - |
1665 | - // Doesn't really exist in zk, but handy for use in zeroed Event |
1666 | - // values (e.g. closed channels). |
1667 | - STATE_CLOSED = 0 |
1668 | -) |
1669 | - |
1670 | func init() { |
1671 | if EPHEMERAL != C.ZOO_EPHEMERAL || |
1672 | SEQUENCE != C.ZOO_SEQUENCE || |
1673 | @@ -191,17 +156,17 @@ |
1674 | PERM_DELETE != C.ZOO_PERM_DELETE || |
1675 | PERM_ADMIN != C.ZOO_PERM_ADMIN || |
1676 | PERM_ALL != C.ZOO_PERM_ALL || |
1677 | - EVENT_CREATED != C.ZOO_CREATED_EVENT || |
1678 | - EVENT_DELETED != C.ZOO_DELETED_EVENT || |
1679 | - EVENT_CHANGED != C.ZOO_CHANGED_EVENT || |
1680 | - EVENT_CHILD != C.ZOO_CHILD_EVENT || |
1681 | - EVENT_SESSION != C.ZOO_SESSION_EVENT || |
1682 | - EVENT_NOTWATCHING != C.ZOO_NOTWATCHING_EVENT || |
1683 | - STATE_EXPIRED_SESSION != C.ZOO_EXPIRED_SESSION_STATE || |
1684 | - STATE_AUTH_FAILED != C.ZOO_AUTH_FAILED_STATE || |
1685 | - STATE_CONNECTING != C.ZOO_CONNECTING_STATE || |
1686 | - STATE_ASSOCIATING != C.ZOO_ASSOCIATING_STATE || |
1687 | - STATE_CONNECTED != C.ZOO_CONNECTED_STATE { |
1688 | + event_CREATED != C.ZOO_CREATED_EVENT || |
1689 | + event_DELETED != C.ZOO_DELETED_EVENT || |
1690 | + event_CHANGED != C.ZOO_CHANGED_EVENT || |
1691 | + event_CHILD != C.ZOO_CHILD_EVENT || |
1692 | + event_SESSION != C.ZOO_SESSION_EVENT || |
1693 | + event_NOTWATCHING != C.ZOO_NOTWATCHING_EVENT || |
1694 | + SessionExpired != SessionStatus(C.ZOO_EXPIRED_SESSION_STATE) || |
1695 | + SessionAuthFailed != SessionStatus(C.ZOO_AUTH_FAILED_STATE) || |
1696 | + SessionConnecting != SessionStatus(C.ZOO_CONNECTING_STATE) || |
1697 | + SessionAssociating != SessionStatus(C.ZOO_ASSOCIATING_STATE) || |
1698 | + SessionConnected != SessionStatus(C.ZOO_CONNECTED_STATE) { |
1699 | |
1700 | panic("OOPS: Constants don't match C counterparts") |
1701 | } |
1702 | @@ -222,155 +187,97 @@ |
1703 | } |
1704 | |
1705 | // ----------------------------------------------------------------------- |
1706 | -// Event methods. |
1707 | +// Session status methods. |
1708 | |
1709 | // Ok returns true in case the event reports zk as being in a usable state. |
1710 | -func (e Event) Ok() bool { |
1711 | +func (status SessionStatus) Ok() bool { |
1712 | // That's really it for now. Anything else seems to mean zk |
1713 | // can't be used at the moment. |
1714 | - return e.State == STATE_CONNECTED |
1715 | + return status == SessionConnected |
1716 | } |
1717 | |
1718 | -func (e Event) String() (s string) { |
1719 | - switch e.State { |
1720 | - case STATE_EXPIRED_SESSION: |
1721 | +func (status SessionStatus) String() (s string) { |
1722 | + switch status { |
1723 | + case 0: |
1724 | + s = "ZooKeeper connection closed" |
1725 | + case SessionExpired: |
1726 | s = "ZooKeeper session expired" |
1727 | - case STATE_AUTH_FAILED: |
1728 | + case SessionAuthFailed: |
1729 | s = "ZooKeeper authentication failed" |
1730 | - case STATE_CONNECTING: |
1731 | + case SessionConnecting: |
1732 | s = "ZooKeeper connecting" |
1733 | - case STATE_ASSOCIATING: |
1734 | + case SessionAssociating: |
1735 | s = "ZooKeeper still associating" |
1736 | - case STATE_CONNECTED: |
1737 | + case SessionConnected: |
1738 | s = "ZooKeeper connected" |
1739 | - case STATE_CLOSED: |
1740 | - s = "ZooKeeper connection closed" |
1741 | default: |
1742 | - s = fmt.Sprintf("unknown ZooKeeper state %d", e.State) |
1743 | - } |
1744 | - if e.Type == -1 || e.Type == EVENT_SESSION { |
1745 | - return |
1746 | - } |
1747 | - if s != "" { |
1748 | - s += "; " |
1749 | - } |
1750 | - switch e.Type { |
1751 | - case EVENT_CREATED: |
1752 | - s += "path created: " |
1753 | - case EVENT_DELETED: |
1754 | - s += "path deleted: " |
1755 | - case EVENT_CHANGED: |
1756 | - s += "path changed: " |
1757 | - case EVENT_CHILD: |
1758 | - s += "path children changed: " |
1759 | - case EVENT_NOTWATCHING: |
1760 | - s += "not watching: " // !? |
1761 | - case EVENT_SESSION: |
1762 | - // nothing |
1763 | - } |
1764 | - s += e.Path |
1765 | - return |
1766 | -} |
1767 | - |
1768 | -// ----------------------------------------------------------------------- |
1769 | -// Error interface which maps onto the ZooKeeper error codes. |
1770 | - |
1771 | -type Error interface { |
1772 | - String() string |
1773 | - Code() int |
1774 | -} |
1775 | - |
1776 | -type errorType struct { |
1777 | - zkrc C.int |
1778 | - err os.Error |
1779 | -} |
1780 | - |
1781 | -func newError(zkrc C.int, err os.Error) Error { |
1782 | - return &errorType{zkrc, err} |
1783 | -} |
1784 | - |
1785 | -func (error *errorType) String() (result string) { |
1786 | - if error.zkrc == ZSYSTEMERROR && error.err != nil { |
1787 | - result = error.err.String() |
1788 | - } else { |
1789 | - result = C.GoString(C.zerror(error.zkrc)) // Static, no need to free it. |
1790 | - } |
1791 | - return |
1792 | -} |
1793 | - |
1794 | -// Code returns the error code that may be compared against one of |
1795 | -// the gozk.Z* constants. |
1796 | -func (error *errorType) Code() int { |
1797 | - return int(error.zkrc) |
1798 | -} |
1799 | - |
1800 | -// ----------------------------------------------------------------------- |
1801 | -// Stat interface which maps onto the ZooKeeper Stat struct. |
1802 | - |
1803 | -// We declare this as an interface rather than an actual struct because |
1804 | -// this way we don't have to copy data around between the real C struct |
1805 | -// and the Go one on every call. Most uses will only touch a few elements, |
1806 | -// or even ignore the stat entirely, so that's a win. |
1807 | + s = fmt.Sprintf("unknown ZooKeeper state %d", status) |
1808 | + } |
1809 | + return s |
1810 | +} |
1811 | + |
1812 | +// ----------------------------------------------------------------------- |
1813 | |
1814 | // Stat contains detailed information about a node. |
1815 | -type Stat interface { |
1816 | - Czxid() int64 |
1817 | - Mzxid() int64 |
1818 | - CTime() int64 |
1819 | - MTime() int64 |
1820 | - Version() int32 |
1821 | - CVersion() int32 |
1822 | - AVersion() int32 |
1823 | - EphemeralOwner() int64 |
1824 | - DataLength() int32 |
1825 | - NumChildren() int32 |
1826 | - Pzxid() int64 |
1827 | -} |
1828 | - |
1829 | -type resultStat C.struct_Stat |
1830 | - |
1831 | -func (stat *resultStat) Czxid() int64 { |
1832 | - return int64(stat.czxid) |
1833 | -} |
1834 | - |
1835 | -func (stat *resultStat) Mzxid() int64 { |
1836 | - return int64(stat.mzxid) |
1837 | -} |
1838 | - |
1839 | -func (stat *resultStat) CTime() int64 { |
1840 | - return int64(stat.ctime) |
1841 | -} |
1842 | - |
1843 | -func (stat *resultStat) MTime() int64 { |
1844 | - return int64(stat.mtime) |
1845 | -} |
1846 | - |
1847 | -func (stat *resultStat) Version() int32 { |
1848 | - return int32(stat.version) |
1849 | -} |
1850 | - |
1851 | -func (stat *resultStat) CVersion() int32 { |
1852 | - return int32(stat.cversion) |
1853 | -} |
1854 | - |
1855 | -func (stat *resultStat) AVersion() int32 { |
1856 | - return int32(stat.aversion) |
1857 | -} |
1858 | - |
1859 | -func (stat *resultStat) EphemeralOwner() int64 { |
1860 | - return int64(stat.ephemeralOwner) |
1861 | -} |
1862 | - |
1863 | -func (stat *resultStat) DataLength() int32 { |
1864 | - return int32(stat.dataLength) |
1865 | -} |
1866 | - |
1867 | -func (stat *resultStat) NumChildren() int32 { |
1868 | - return int32(stat.numChildren) |
1869 | -} |
1870 | - |
1871 | -func (stat *resultStat) Pzxid() int64 { |
1872 | - return int64(stat.pzxid) |
1873 | +type Stat struct { |
1874 | + c C.struct_Stat |
1875 | +} |
1876 | + |
1877 | +func (stat *Stat) String() string { |
1878 | + return fmt.Sprintf( |
1879 | + "{Czxid: %d; Mzxid: %d; CTime: %d; MTime: %d; "+ |
1880 | + "Version: %d; CVersion: %d; AVersion: %d; "+ |
1881 | + "EphemeralOwner: %d; DataLength: %d; "+ |
1882 | + "NumChildren: %d; Pzxid: %d}", |
1883 | + stat.Czxid(), stat.Mzxid(), stat.CTime(), stat.MTime(), |
1884 | + stat.Version(), stat.CVersion(), stat.AVersion(), |
1885 | + stat.EphemeralOwner(), stat.DataLength(), |
1886 | + stat.NumChildren(), stat.Pzxid(), |
1887 | + ) |
1888 | +} |
1889 | + |
1890 | +func (stat *Stat) Czxid() int64 { |
1891 | + return int64(stat.c.czxid) |
1892 | +} |
1893 | + |
1894 | +func (stat *Stat) Mzxid() int64 { |
1895 | + return int64(stat.c.mzxid) |
1896 | +} |
1897 | + |
1898 | +func (stat *Stat) CTime() int64 { |
1899 | + return int64(stat.c.ctime) |
1900 | +} |
1901 | + |
1902 | +func (stat *Stat) MTime() int64 { |
1903 | + return int64(stat.c.mtime) |
1904 | +} |
1905 | + |
1906 | +func (stat *Stat) Version() int32 { |
1907 | + return int32(stat.c.version) |
1908 | +} |
1909 | + |
1910 | +func (stat *Stat) CVersion() int32 { |
1911 | + return int32(stat.c.cversion) |
1912 | +} |
1913 | + |
1914 | +func (stat *Stat) AVersion() int32 { |
1915 | + return int32(stat.c.aversion) |
1916 | +} |
1917 | + |
1918 | +func (stat *Stat) EphemeralOwner() int64 { |
1919 | + return int64(stat.c.ephemeralOwner) |
1920 | +} |
1921 | + |
1922 | +func (stat *Stat) DataLength() int32 { |
1923 | + return int32(stat.c.dataLength) |
1924 | +} |
1925 | + |
1926 | +func (stat *Stat) NumChildren() int32 { |
1927 | + return int32(stat.c.numChildren) |
1928 | +} |
1929 | + |
1930 | +func (stat *Stat) Pzxid() int64 { |
1931 | + return int64(stat.c.pzxid) |
1932 | } |
1933 | |
1934 | // ----------------------------------------------------------------------- |
1935 | @@ -380,11 +287,12 @@ |
1936 | |
1937 | // SetLogLevel changes the minimum level of logging output generated |
1938 | // to adjust the amount of information provided. |
1939 | +// The default logging level is LOG_ERROR. |
1940 | func SetLogLevel(level int) { |
1941 | C.zoo_set_debug_level(C.ZooLogLevel(level)) |
1942 | } |
1943 | |
1944 | -// Init initializes the communication with a ZooKeeper cluster. The provided |
1945 | +// Dial initializes the communication with a ZooKeeper cluster. The provided |
1946 | // servers parameter may include multiple server addresses, separated |
1947 | // by commas, so that the client will automatically attempt to connect |
1948 | // to another server if one of them stops working for whatever reason. |
1949 | @@ -395,82 +303,80 @@ |
1950 | // |
1951 | // Session establishment is asynchronous, meaning that this function |
1952 | // will return before the communication with ZooKeeper is fully established. |
1953 | -// The watch channel receives events of type SESSION_EVENT when any change |
1954 | -// to the state of the established connection happens. See the documentation |
1955 | -// for the Event type for more details. |
1956 | -func Init(servers string, recvTimeoutNS int64) (zk *ZooKeeper, watch chan Event, err Error) { |
1957 | - zk, watch, err = internalInit(servers, recvTimeoutNS, nil) |
1958 | - return |
1959 | +// The returned channel is used to receive events about the current |
1960 | +// status of the connection. On long running applications the session channel |
1961 | +// must *necessarily* be observed since certain events like session expirations require an |
1962 | +// explicit reconnection and reestablishment of state (or bailing out). |
1963 | +// Because of that, the buffer used on the session channel has a limited |
1964 | +// size, and a panic will occur if too many events are not collected. |
1965 | +func Dial(servers string, recvTimeoutNS int64) (*Conn, <-chan SessionStatus, os.Error) { |
1966 | + return dial(servers, recvTimeoutNS, nil) |
1967 | } |
1968 | |
1969 | -// Equivalent to Init, but attempt to reestablish an existing session |
1970 | +// Redial is equivalent to Dial, but attempts to reestablish an existing session |
1971 | // identified via the clientId parameter. |
1972 | -func ReInit(servers string, recvTimeoutNS int64, clientId *ClientId) (zk *ZooKeeper, watch chan Event, err Error) { |
1973 | - zk, watch, err = internalInit(servers, recvTimeoutNS, clientId) |
1974 | - return |
1975 | +func Redial(servers string, recvTimeoutNS int64, clientId *ClientId) (*Conn, <-chan SessionStatus, os.Error) { |
1976 | + return dial(servers, recvTimeoutNS, clientId) |
1977 | } |
1978 | |
1979 | -func internalInit(servers string, recvTimeoutNS int64, clientId *ClientId) (*ZooKeeper, chan Event, Error) { |
1980 | - |
1981 | - zk := &ZooKeeper{} |
1982 | - zk.watchChannels = make(map[uintptr]chan Event) |
1983 | +func dial(servers string, recvTimeoutNS int64, clientId *ClientId) (*Conn, <-chan SessionStatus, os.Error) { |
1984 | + conn := &Conn{} |
1985 | + conn.watchChannels = make(map[uintptr]statusChan) |
1986 | |
1987 | var cId *C.clientid_t |
1988 | if clientId != nil { |
1989 | cId = &clientId.cId |
1990 | } |
1991 | |
1992 | - watchId, watchChannel := zk.createWatch(true) |
1993 | - zk.sessionWatchId = watchId |
1994 | + watchChannel := make(sessionStatusChan, 32) |
1995 | + watchId := conn.createWatch(watchChannel) |
1996 | + conn.sessionWatchId = watchId |
1997 | |
1998 | cservers := C.CString(servers) |
1999 | - handle, cerr := C.zookeeper_init(cservers, C.watch_handler, C.int(recvTimeoutNS / 1e6), cId, unsafe.Pointer(watchId), 0) |
2000 | + handle, cerr := C.zookeeper_init(cservers, C.watch_handler, C.int(recvTimeoutNS/1e6), cId, unsafe.Pointer(watchId), 0) |
2001 | C.free(unsafe.Pointer(cservers)) |
2002 | if handle == nil { |
2003 | - zk.closeAllWatches() |
2004 | - return nil, nil, newError(ZSYSTEMERROR, cerr) |
2005 | + conn.closeAllWatches() |
2006 | + return nil, nil, zkError(C.int(ZSYSTEMERROR), cerr) |
2007 | } |
2008 | - zk.handle = handle |
2009 | + conn.handle = handle |
2010 | runWatchLoop() |
2011 | - return zk, watchChannel, nil |
2012 | + return conn, watchChannel, nil |
2013 | } |
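Reviewer sketch of the bounded session channel described in the Dial comment. The `SessionStatus` values are the ones checked in `init` above; `trySend` is a hypothetical helper, not a function from this branch, and it returns false where the comment says the real code panics. The buffer is shrunk from 32 to 2 so that the overflow is easy to see.

```go
package main

import "fmt"

// SessionStatus mirrors the type introduced by the diff.
type SessionStatus int

const (
	SessionExpired    SessionStatus = -112
	SessionConnecting SessionStatus = 1
	SessionConnected  SessionStatus = 3
)

// trySend is a hypothetical model of delivery to the session channel:
// it never blocks, and reports false when the buffer is already full.
func trySend(ch chan<- SessionStatus, s SessionStatus) bool {
	select {
	case ch <- s:
		return true
	default:
		return false // per the Dial comment, the branch panics at this point
	}
}

func main() {
	session := make(chan SessionStatus, 2) // the diff uses a buffer of 32

	fmt.Println(trySend(session, SessionConnecting)) // true
	fmt.Println(trySend(session, SessionConnected))  // true
	fmt.Println(trySend(session, SessionExpired))    // false: nobody is reading

	// A long-running client is expected to keep draining the channel.
	for len(session) > 0 {
		fmt.Println("status:", <-session)
	}
}
```

The point of the sketch is the contract rather than the mechanism: a caller that keeps the channel returned by Dial must read from it, because undelivered session events are treated as a programming error.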
2014 | |
2015 | // ClientId returns the client ID for the existing session with ZooKeeper. |
2016 | // This is useful to reestablish an existing session via ReInit. |
2017 | -func (zk *ZooKeeper) ClientId() *ClientId { |
2018 | - return &ClientId{*C.zoo_client_id(zk.handle)} |
2019 | +func (conn *Conn) ClientId() *ClientId { |
2020 | + return &ClientId{*C.zoo_client_id(conn.handle)} |
2021 | } |
2022 | |
2023 | // Close terminates the ZooKeeper interaction. |
2024 | -func (zk *ZooKeeper) Close() Error { |
2025 | - |
2026 | - // Protect from concurrency around zk.handle change. |
2027 | - zk.mutex.Lock() |
2028 | - defer zk.mutex.Unlock() |
2029 | - |
2030 | - if zk.handle == nil { |
2031 | +func (conn *Conn) Close() os.Error { |
2032 | + |
2033 | + // Protect from concurrency around conn.handle change. |
2034 | + conn.mutex.Lock() |
2035 | + defer conn.mutex.Unlock() |
2036 | + |
2037 | + if conn.handle == nil { |
2038 | // ZooKeeper may hang indefinitely if a handler is closed twice, |
2039 | // so we get in the way and prevent it from happening. |
2040 | - return newError(ZCLOSING, nil) |
2041 | + return ZCLOSING |
2042 | } |
2043 | - rc, cerr := C.zookeeper_close(zk.handle) |
2044 | + rc, cerr := C.zookeeper_close(conn.handle) |
2045 | |
2046 | - zk.closeAllWatches() |
2047 | + conn.closeAllWatches() |
2048 | stopWatchLoop() |
2049 | |
2050 | - // At this point, nothing else should need zk.handle. |
2051 | - zk.handle = nil |
2052 | + // At this point, nothing else should need conn.handle. |
2053 | + conn.handle = nil |
2054 | |
2055 | - if rc != C.ZOK { |
2056 | - return newError(rc, cerr) |
2057 | - } |
2058 | - return nil |
2059 | + return zkError(rc, cerr) |
2060 | } |
2061 | |
2062 | // Get returns the data and status from an existing node. err will be nil, |
2063 | // unless an error is found. Attempting to retrieve data from a non-existing |
2064 | // node is an error. |
2065 | -func (zk *ZooKeeper) Get(path string) (data string, stat Stat, err Error) { |
2066 | +func (conn *Conn) Get(path string) (data string, stat *Stat, err os.Error) { |
2067 | |
2068 | cpath := C.CString(path) |
2069 | cbuffer := (*C.char)(C.malloc(bufferSize)) |
2070 | @@ -478,22 +384,22 @@ |
2071 | defer C.free(unsafe.Pointer(cpath)) |
2072 | defer C.free(unsafe.Pointer(cbuffer)) |
2073 | |
2074 | - cstat := C.struct_Stat{} |
2075 | - rc, cerr := C.zoo_wget(zk.handle, cpath, nil, nil, |
2076 | - cbuffer, &cbufferLen, &cstat) |
2077 | + var cstat Stat |
2078 | + rc, cerr := C.zoo_wget(conn.handle, cpath, nil, nil, cbuffer, &cbufferLen, &cstat.c) |
2079 | if rc != C.ZOK { |
2080 | - return "", nil, newError(rc, cerr) |
2081 | + return "", nil, zkError(rc, cerr) |
2082 | } |
2083 | + |
2084 | result := C.GoStringN(cbuffer, cbufferLen) |
2085 | - |
2086 | - return result, (*resultStat)(&cstat), nil |
2087 | + return result, &cstat, nil |
2088 | } |
2089 | |
2090 | // GetW works like Get but also returns a channel that will receive |
2091 | -// a single Event value when the data or existence of the given ZooKeeper |
2092 | -// node changes or when critical session events happen. See the |
2093 | -// documentation of the Event type for more details. |
2094 | -func (zk *ZooKeeper) GetW(path string) (data string, stat Stat, watch chan Event, err Error) { |
2095 | +// a single value, one of NodeChanged or NodeDeleted, when the |
2096 | +// node contents change or it is deleted. |
2097 | +// If the ZooKeeper server goes down before an |
2098 | +// event happens, the channel will be closed. |
2099 | +func (conn *Conn) GetW(path string) (data string, stat *Stat, watch <-chan NodeStatus, err os.Error) { |
2100 | |
2101 | cpath := C.CString(path) |
2102 | cbuffer := (*C.char)(C.malloc(bufferSize)) |
2103 | @@ -501,82 +407,94 @@ |
2104 | defer C.free(unsafe.Pointer(cpath)) |
2105 | defer C.free(unsafe.Pointer(cbuffer)) |
2106 | |
2107 | - watchId, watchChannel := zk.createWatch(true) |
2108 | + watchChannel := newNodeStatusChan(path) |
2109 | + watchId := conn.createWatch(watchChannel) |
2110 | |
2111 | - cstat := C.struct_Stat{} |
2112 | - rc, cerr := C.zoo_wget(zk.handle, cpath, |
2113 | - C.watch_handler, unsafe.Pointer(watchId), |
2114 | - cbuffer, &cbufferLen, &cstat) |
2115 | + var cstat Stat |
2116 | + rc, cerr := C.zoo_wget(conn.handle, cpath, C.watch_handler, unsafe.Pointer(watchId), cbuffer, &cbufferLen, &cstat.c) |
2117 | if rc != C.ZOK { |
2118 | - zk.forgetWatch(watchId) |
2119 | - return "", nil, nil, newError(rc, cerr) |
2120 | + conn.forgetWatch(watchId) |
2121 | + return "", nil, nil, zkError(rc, cerr) |
2122 | } |
2123 | |
2124 | result := C.GoStringN(cbuffer, cbufferLen) |
2125 | - return result, (*resultStat)(&cstat), watchChannel, nil |
2126 | + return result, &cstat, watchChannel.c, nil |
2127 | } |
2128 | |
2129 | // Children returns the children list and status from an existing node. |
2130 | -// err will be nil, unless an error is found. Attempting to retrieve the |
2131 | -// children list from a non-existent node is an error. |
2132 | -func (zk *ZooKeeper) Children(path string) (children []string, stat Stat, err Error) { |
2133 | +// Attempting to retrieve the children list from a non-existent node is an error. |
2134 | +func (conn *Conn) Children(path string) (children []string, stat *Stat, err os.Error) { |
2135 | |
2136 | cpath := C.CString(path) |
2137 | defer C.free(unsafe.Pointer(cpath)) |
2138 | |
2139 | cvector := C.struct_String_vector{} |
2140 | - cstat := C.struct_Stat{} |
2141 | - rc, cerr := C.zoo_wget_children2(zk.handle, cpath, nil, nil, |
2142 | - &cvector, &cstat) |
2143 | + var cstat Stat |
2144 | + rc, cerr := C.zoo_wget_children2(conn.handle, cpath, nil, nil, &cvector, &cstat.c) |
2145 | |
2146 | // Can't happen if rc != 0, but avoid potential memory leaks in the future. |
2147 | if cvector.count != 0 { |
2148 | children = parseStringVector(&cvector) |
2149 | } |
2150 | - if rc != C.ZOK { |
2151 | - err = newError(rc, cerr) |
2152 | + if rc == C.ZOK { |
2153 | + stat = &cstat |
2154 | } else { |
2155 | - stat = (*resultStat)(&cstat) |
2156 | + err = zkError(rc, cerr) |
2157 | } |
2158 | return |
2159 | } |
2160 | |
2161 | // ChildrenW works like Children but also returns a channel that will |
2162 | -// receive a single Event value when a node is added or removed under the |
2163 | -// provided path or when critical session events happen. See the documentation |
2164 | -// of the Event type for more details. |
2165 | -func (zk *ZooKeeper) ChildrenW(path string) (children []string, stat Stat, watch chan Event, err Error) { |
2166 | +// receive a single value, one of NodeDeleted or NodeChangedChild, |
2167 | +// when a node is added or removed under the |
2168 | +// provided path or when the node itself is deleted. If the ZooKeeper server |
2169 | +// goes down before an event happens, the channel will be closed. |
2170 | +func (conn *Conn) ChildrenW(path string) (children []string, stat *Stat, watch <-chan NodeStatus, err os.Error) { |
2171 | |
2172 | cpath := C.CString(path) |
2173 | defer C.free(unsafe.Pointer(cpath)) |
2174 | |
2175 | - watchId, watchChannel := zk.createWatch(true) |
2176 | + watchChannel := newNodeStatusChan(path) |
2177 | + watchId := conn.createWatch(watchChannel) |
2178 | |
2179 | cvector := C.struct_String_vector{} |
2180 | - cstat := C.struct_Stat{} |
2181 | - rc, cerr := C.zoo_wget_children2(zk.handle, cpath, |
2182 | - C.watch_handler, unsafe.Pointer(watchId), |
2183 | - &cvector, &cstat) |
2184 | + var cstat Stat |
2185 | + rc, cerr := C.zoo_wget_children2(conn.handle, cpath, C.watch_handler, unsafe.Pointer(watchId), &cvector, &cstat.c) |
2186 | |
2187 | // Can't happen if rc != 0, but avoid potential memory leaks in the future. |
2188 | if cvector.count != 0 { |
2189 | children = parseStringVector(&cvector) |
2190 | } |
2191 | - if rc != C.ZOK { |
2192 | - zk.forgetWatch(watchId) |
2193 | - err = newError(rc, cerr) |
2194 | + if rc == C.ZOK { |
2195 | + stat = &cstat |
2196 | + watch = watchChannel.c |
2197 | } else { |
2198 | - stat = (*resultStat)(&cstat) |
2199 | - watch = watchChannel |
2200 | + conn.forgetWatch(watchId) |
2201 | + err = zkError(rc, cerr) |
2202 | } |
2203 | return |
2204 | } |
2205 | |
2206 | +// an arguably nicer version of parseStringVector (untested) |
2207 | +//func parseStringVector(c *C.struct_String_vector) []string { |
2208 | +// // make a Go slice onto the C data. |
2209 | +// cv := *(*[]*C.char)(&reflect.SliceHeader{ |
2210 | +// Data: uintptr(unsafe.Pointer(c.data)), |
2211 | +// Len: c.count, |
2212 | +// Cap: c.count, |
2213 | +// }) |
2214 | +// v := make([]string, c.count) |
2215 | +// for i, s := range cv { |
2216 | +// v[i] = C.GoString(s) |
2217 | +// } |
2218 | +// return v |
2219 | +//} |
2220 | + |
2221 | func parseStringVector(cvector *C.struct_String_vector) []string { |
2222 | vector := make([]string, cvector.count) |
2223 | dataStart := uintptr(unsafe.Pointer(cvector.data)) |
2224 | uintptrSize := unsafe.Sizeof(dataStart) |
2225 | - for i := 0; i != len(vector); i++ { |
2226 | + for i := range vector { |
2227 | cpathPos := dataStart + uintptr(i)*uintptrSize |
2228 | cpath := *(**C.char)(unsafe.Pointer(cpathPos)) |
2229 | vector[i] = C.GoString(cpath) |
2230 | @@ -588,51 +506,50 @@ |
2231 | // Exists checks if a node exists at the given path. If it does, |
2232 | // stat will contain meta information on the existing node, otherwise |
2233 | // it will be nil. |
2234 | -func (zk *ZooKeeper) Exists(path string) (stat Stat, err Error) { |
2235 | +func (conn *Conn) Exists(path string) (stat *Stat, err os.Error) { |
2236 | cpath := C.CString(path) |
2237 | defer C.free(unsafe.Pointer(cpath)) |
2238 | |
2239 | - cstat := C.struct_Stat{} |
2240 | - rc, cerr := C.zoo_wexists(zk.handle, cpath, nil, nil, &cstat) |
2241 | + var cstat Stat |
2242 | + rc, cerr := C.zoo_wexists(conn.handle, cpath, nil, nil, &cstat.c) |
2243 | |
2244 | // We diverge a bit from the usual here: a ZNONODE is not an error |
2245 | // for an exists call, otherwise every Exists call would have to check |
2246 | // for err != nil and err.Code() != ZNONODE. |
2247 | if rc == C.ZOK { |
2248 | - stat = (*resultStat)(&cstat) |
2249 | + stat = &cstat |
2250 | } else if rc != C.ZNONODE { |
2251 | - err = newError(rc, cerr) |
2252 | + err = zkError(rc, cerr) |
2253 | } |
2254 | return |
2255 | } |
2256 | |
2257 | // ExistsW works like Exists but also returns a channel that will |
2258 | -// receive an Event value when a node is created in case the returned |
2259 | -// stat is nil and the node didn't exist, or when the existing node |
2260 | -// is removed. It will also receive critical session events. See the |
2261 | -// documentation of the Event type for more details. |
2262 | -func (zk *ZooKeeper) ExistsW(path string) (stat Stat, watch chan Event, err Error) { |
2263 | +// receive a single value, one of NodeCreated, NodeDeleted or NodeChanged, |
2264 | +// when the node is next created, removed, or its contents change. |
2265 | +// If the ZooKeeper server goes down before an event happens, the channel will be closed. |
2266 | +func (conn *Conn) ExistsW(path string) (stat *Stat, watch <-chan NodeStatus, err os.Error) { |
2267 | cpath := C.CString(path) |
2268 | defer C.free(unsafe.Pointer(cpath)) |
2269 | |
2270 | - watchId, watchChannel := zk.createWatch(true) |
2271 | + watchChannel := newNodeStatusChan(path) |
2272 | + watchId := conn.createWatch(watchChannel) |
2273 | |
2274 | - cstat := C.struct_Stat{} |
2275 | - rc, cerr := C.zoo_wexists(zk.handle, cpath, |
2276 | - C.watch_handler, unsafe.Pointer(watchId), &cstat) |
2277 | + var cstat Stat |
2278 | + rc, cerr := C.zoo_wexists(conn.handle, cpath, C.watch_handler, unsafe.Pointer(watchId), &cstat.c) |
2279 | |
2280 | // We diverge a bit from the usual here: a ZNONODE is not an error |
2281 | // for an exists call, otherwise every Exists call would have to check |
2282 | // for err != nil and err.Code() != ZNONODE. |
2283 | - switch rc { |
2284 | + switch Error(rc) { |
2285 | case ZOK: |
2286 | - stat = (*resultStat)(&cstat) |
2287 | - watch = watchChannel |
2288 | + stat = &cstat |
2289 | + watch = watchChannel.c |
2290 | case ZNONODE: |
2291 | - watch = watchChannel |
2292 | + watch = watchChannel.c |
2293 | default: |
2294 | - zk.forgetWatch(watchId) |
2295 | - err = newError(rc, cerr) |
2296 | + conn.forgetWatch(watchId) |
2297 | + err = zkError(rc, cerr) |
2298 | } |
2299 | return |
2300 | } |
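Reviewer sketch of how a caller might consume one of the new watch channels. The names `NodeCreated`, `NodeDeleted` and `NodeChanged` come from the doc comments in this diff; their numeric values here and the `describe` helper are assumptions made for illustration. The sketch shows the two outcomes the comments promise: a single value, or a closed channel when the server goes down.

```go
package main

import "fmt"

// NodeStatus is a stand-in for the watch value type named in the diff.
type NodeStatus int

// Illustrative values only; the branch defines the real ones elsewhere.
const (
	NodeCreated NodeStatus = iota + 1
	NodeDeleted
	NodeChanged
)

// describe waits for the single value a watch channel delivers and
// distinguishes it from the channel being closed.
func describe(watch <-chan NodeStatus) string {
	status, ok := <-watch
	if !ok {
		return "watch closed: server went away"
	}
	switch status {
	case NodeCreated:
		return "node created"
	case NodeDeleted:
		return "node deleted"
	case NodeChanged:
		return "node changed"
	}
	return fmt.Sprintf("unexpected status %d", int(status))
}

func main() {
	fired := make(chan NodeStatus, 1)
	fired <- NodeDeleted
	fmt.Println(describe(fired))

	closed := make(chan NodeStatus)
	close(closed)
	fmt.Println(describe(closed))
}
```

With the old `chan Event` interface the same caller also had to inspect `Type` and `State` to rule out session events; here the comma-ok receive is the only extra check.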
2301 | @@ -646,7 +563,7 @@ |
2302 | // The returned path is useful in cases where the created path may differ |
2303 | // from the requested one, such as when a sequence number is appended |
2304 | // to it due to the use of the gozk.SEQUENCE flag. |
2305 | -func (zk *ZooKeeper) Create(path, value string, flags int, aclv []ACL) (pathCreated string, err Error) { |
2306 | +func (conn *Conn) Create(path, value string, flags int, aclv []ACL) (pathCreated string, err os.Error) { |
2307 | cpath := C.CString(path) |
2308 | cvalue := C.CString(value) |
2309 | defer C.free(unsafe.Pointer(cpath)) |
2310 | @@ -660,13 +577,13 @@ |
2311 | cpathCreated := (*C.char)(C.malloc(cpathLen)) |
2312 | defer C.free(unsafe.Pointer(cpathCreated)) |
2313 | |
2314 | - rc, cerr := C.zoo_create(zk.handle, cpath, cvalue, C.int(len(value)), |
2315 | - caclv, C.int(flags), cpathCreated, C.int(cpathLen)) |
2316 | - if rc != C.ZOK { |
2317 | - return "", newError(rc, cerr) |
2318 | + rc, cerr := C.zoo_create(conn.handle, cpath, cvalue, C.int(len(value)), caclv, C.int(flags), cpathCreated, C.int(cpathLen)) |
2319 | + if rc == C.ZOK { |
2320 | + pathCreated = C.GoString(cpathCreated) |
2321 | + } else { |
2322 | + err = zkError(rc, cerr) |
2323 | } |
2324 | - |
2325 | - return C.GoString(cpathCreated), nil |
2326 | + return |
2327 | } |
2328 | |
2329 | // Set modifies the data for the existing node at the given path, replacing it |
2330 | @@ -677,35 +594,31 @@ |
2331 | // |
2332 | // It is an error to attempt to set the data of a non-existing node with |
2333 | // this function. In these cases, use Create instead. |
2334 | -func (zk *ZooKeeper) Set(path, value string, version int32) (stat Stat, err Error) { |
2335 | +func (conn *Conn) Set(path, value string, version int32) (stat *Stat, err os.Error) { |
2336 | |
2337 | cpath := C.CString(path) |
2338 | cvalue := C.CString(value) |
2339 | defer C.free(unsafe.Pointer(cpath)) |
2340 | defer C.free(unsafe.Pointer(cvalue)) |
2341 | |
2342 | - cstat := C.struct_Stat{} |
2343 | - |
2344 | - rc, cerr := C.zoo_set2(zk.handle, cpath, cvalue, C.int(len(value)), |
2345 | - C.int(version), &cstat) |
2346 | - if rc != C.ZOK { |
2347 | - return nil, newError(rc, cerr) |
2348 | + var cstat Stat |
2349 | + rc, cerr := C.zoo_set2(conn.handle, cpath, cvalue, C.int(len(value)), C.int(version), &cstat.c) |
2350 | + if rc == C.ZOK { |
2351 | + stat = &cstat |
2352 | + } else { |
2353 | + err = zkError(rc, cerr) |
2354 | } |
2355 | - |
2356 | - return (*resultStat)(&cstat), nil |
2357 | + return |
2358 | } |
2359 | |
2360 | // Delete removes the node at path. If version is not -1, the operation |
2361 | // will only succeed if the node is still at this version when the |
2362 | // node is deleted as an atomic operation. |
2363 | -func (zk *ZooKeeper) Delete(path string, version int32) (err Error) { |
2364 | +func (conn *Conn) Delete(path string, version int32) (err os.Error) { |
2365 | cpath := C.CString(path) |
2366 | defer C.free(unsafe.Pointer(cpath)) |
2367 | - rc, cerr := C.zoo_delete(zk.handle, cpath, C.int(version)) |
2368 | - if rc != C.ZOK { |
2369 | - return newError(rc, cerr) |
2370 | - } |
2371 | - return nil |
2372 | + rc, cerr := C.zoo_delete(conn.handle, cpath, C.int(version)) |
2373 | + return zkError(rc, cerr) |
2374 | } |
2375 | |
2376 | // AddAuth adds a new authentication certificate to the ZooKeeper |
2377 | @@ -713,7 +626,7 @@ |
2378 | // authentication information, while the cert parameter provides the |
2379 | // identity data itself. For instance, the "digest" scheme requires |
2380 | // a pair like "username:password" to be provided as the certificate. |
2381 | -func (zk *ZooKeeper) AddAuth(scheme, cert string) Error { |
2382 | +func (conn *Conn) AddAuth(scheme, cert string) os.Error { |
2383 | cscheme := C.CString(scheme) |
2384 | ccert := C.CString(cert) |
2385 | defer C.free(unsafe.Pointer(cscheme)) |
2386 | @@ -725,43 +638,38 @@ |
2387 | } |
2388 | defer C.destroy_completion_data(data) |
2389 | |
2390 | - rc, cerr := C.zoo_add_auth(zk.handle, cscheme, ccert, C.int(len(cert)), |
2391 | - C.handle_void_completion, unsafe.Pointer(data)) |
2392 | + rc, cerr := C.zoo_add_auth(conn.handle, cscheme, ccert, C.int(len(cert)), C.handle_void_completion, unsafe.Pointer(data)) |
2393 | if rc != C.ZOK { |
2394 | - return newError(rc, cerr) |
2395 | + return zkError(rc, cerr) |
2396 | } |
2397 | |
2398 | C.wait_for_completion(data) |
2399 | |
2400 | rc = C.int(uintptr(data.data)) |
2401 | - if rc != C.ZOK { |
2402 | - return newError(rc, nil) |
2403 | - } |
2404 | - |
2405 | - return nil |
2406 | + return zkError(rc, nil) |
2407 | } |
2408 | |
2409 | // ACL returns the access control list for path. |
2410 | -func (zk *ZooKeeper) ACL(path string) ([]ACL, Stat, Error) { |
2411 | +func (conn *Conn) ACL(path string) ([]ACL, *Stat, os.Error) { |
2412 | |
2413 | cpath := C.CString(path) |
2414 | defer C.free(unsafe.Pointer(cpath)) |
2415 | |
2416 | caclv := C.struct_ACL_vector{} |
2417 | - cstat := C.struct_Stat{} |
2418 | |
2419 | - rc, cerr := C.zoo_get_acl(zk.handle, cpath, &caclv, &cstat) |
2420 | + var cstat Stat |
2421 | + rc, cerr := C.zoo_get_acl(conn.handle, cpath, &caclv, &cstat.c) |
2422 | if rc != C.ZOK { |
2423 | - return nil, nil, newError(rc, cerr) |
2424 | + return nil, nil, zkError(rc, cerr) |
2425 | } |
2426 | |
2427 | aclv := parseACLVector(&caclv) |
2428 | |
2429 | - return aclv, (*resultStat)(&cstat), nil |
2430 | + return aclv, &cstat, nil |
2431 | } |
2432 | |
2433 | // SetACL changes the access control list for path. |
2434 | -func (zk *ZooKeeper) SetACL(path string, aclv []ACL, version int32) Error { |
2435 | +func (conn *Conn) SetACL(path string, aclv []ACL, version int32) os.Error { |
2436 | |
2437 | cpath := C.CString(path) |
2438 | defer C.free(unsafe.Pointer(cpath)) |
2439 | @@ -769,12 +677,8 @@ |
2440 | caclv := buildACLVector(aclv) |
2441 | defer C.deallocate_ACL_vector(caclv) |
2442 | |
2443 | - rc, cerr := C.zoo_set_acl(zk.handle, cpath, C.int(version), caclv) |
2444 | - if rc != C.ZOK { |
2445 | - return newError(rc, cerr) |
2446 | - } |
2447 | - |
2448 | - return nil |
2449 | + rc, cerr := C.zoo_set_acl(conn.handle, cpath, C.int(version), caclv) |
2450 | + return zkError(rc, cerr) |
2451 | } |
2452 | |
2453 | func parseACLVector(caclv *C.struct_ACL_vector) []ACL { |
2454 | @@ -822,7 +726,7 @@ |
2455 | // ----------------------------------------------------------------------- |
2456 | // RetryChange utility method. |
2457 | |
2458 | -type ChangeFunc func(oldValue string, oldStat Stat) (newValue string, err os.Error) |
2459 | +type ChangeFunc func(oldValue string, oldStat *Stat) (newValue string, err os.Error) |
2460 | |
2461 | // RetryChange runs changeFunc to attempt to atomically change path |
2462 | // in a lock free manner, and retries in case there was another |
2463 | @@ -831,8 +735,7 @@ |
2464 | // changeFunc must work correctly if called multiple times in case |
2465 | // the modification fails due to concurrent changes, and it may return |
2466 | // an error that will cause the RetryChange function to stop and |
2467 | -// return an Error with code ZSYSTEMERROR and the same .String() result |
2468 | -// as the provided error. |
2469 | +// return the same error. |
2470 | // |
2471 | // This mechanism is not suitable for a node that is frequently modified |
2472 | // concurrently. For those cases, consider using a pessimistic locking |
2473 | @@ -845,8 +748,7 @@ |
2474 | // |
2475 | // 2. Call the changeFunc with the current node value and stat, |
2476 | // or with an empty string and nil stat, if the node doesn't yet exist. |
2477 | -// If the changeFunc returns an error, stop and return an Error with |
2478 | -// ZSYSTEMERROR Code() and the same String() as the changeFunc error. |
2479 | +// If the changeFunc returns an error, stop and return the same error. |
2480 | // |
2481 | // 3. If the changeFunc returns no errors, use the string returned as |
2482 | // the new candidate value for the node, and attempt to either create |
2483 | @@ -855,36 +757,32 @@ |
2484 | // in the same node), repeat from step 1. If this procedure fails with any |
2485 | // other error, stop and return the error found. |
2486 | // |
2487 | -func (zk *ZooKeeper) RetryChange(path string, flags int, acl []ACL, changeFunc ChangeFunc) (err Error) { |
2488 | +func (conn *Conn) RetryChange(path string, flags int, acl []ACL, changeFunc ChangeFunc) os.Error { |
2489 | for { |
2490 | - oldValue, oldStat, getErr := zk.Get(path) |
2491 | - if getErr != nil && getErr.Code() != ZNONODE { |
2492 | - err = getErr |
2493 | - break |
2494 | - } |
2495 | - newValue, osErr := changeFunc(oldValue, oldStat) |
2496 | - if osErr != nil { |
2497 | - return newError(ZSYSTEMERROR, osErr) |
2498 | - } else if oldStat == nil { |
2499 | - _, err = zk.Create(path, newValue, flags, acl) |
2500 | - if err == nil || err.Code() != ZNODEEXISTS { |
2501 | - break |
2502 | + oldValue, oldStat, err := conn.Get(path) |
2503 | + if err != nil && err != ZNONODE { |
2504 | + return err |
2505 | + } |
2506 | + newValue, err := changeFunc(oldValue, oldStat) |
2507 | + if err != nil { |
2508 | + return err |
2509 | + } |
2510 | + if oldStat == nil { |
2511 | + _, err := conn.Create(path, newValue, flags, acl) |
2512 | + if err == nil || err != ZNODEEXISTS { |
2513 | + return err |
2514 | } |
2515 | - } else if newValue == oldValue { |
2516 | + continue |
2517 | + } |
2518 | + if newValue == oldValue { |
2519 | return nil // Nothing to do. |
2520 | - } else { |
2521 | - _, err = zk.Set(path, newValue, oldStat.Version()) |
2522 | - if err == nil { |
2523 | - break |
2524 | - } else { |
2525 | - code := err.Code() |
2526 | - if code != ZBADVERSION && code != ZNONODE { |
2527 | - break |
2528 | - } |
2529 | - } |
2530 | + } |
2531 | + _, err = conn.Set(path, newValue, oldStat.Version()) |
2532 | + if err == nil || (err != ZBADVERSION && err != ZNONODE) { |
2533 | + return err |
2534 | } |
2535 | } |
2536 | - return err |
2537 | + panic("not reached") |
2538 | } |
2539 | |
2540 | // ----------------------------------------------------------------------- |
2541 | @@ -897,7 +795,7 @@ |
2542 | // Whenever a *W method is called, it will return a channel which |
2543 | // outputs Event values. Internally, a map is used to maintain references |
2544 | // between a unique integer key (the watchId), and the event channel. The |
2545 | -// watchId is then handed to the C zookeeper library as the watch context, |
2546 | +// watchId is then handed to the C ZooKeeper library as the watch context, |
2547 | // so that we get it back when events happen. Using an integer key as the |
2548 | // watch context rather than a pointer is needed because there's no guarantee |
2549 | // that in the future the GC will not move objects around, and also because |
2550 | @@ -910,13 +808,13 @@ |
2551 | // Since Cgo doesn't allow calling back into Go, we actually fire a new |
2552 | // goroutine the very first time Init is called, and allow it to block |
2553 | // in a pthread condition variable within a C function. This condition |
2554 | -// will only be notified once a zookeeper watch callback appends new |
2555 | +// will only be notified once a ZooKeeper watch callback appends new |
2556 | // entries to the event list. When this happens, the C function returns |
2557 | // and we get back into Go land with the pointer to the watch data, |
2558 | // including the watchId and other event details such as type and path. |
2559 | |
2560 | var watchMutex sync.Mutex |
2561 | -var watchZooKeepers = make(map[uintptr]*ZooKeeper) |
2562 | +var watchConns = make(map[uintptr]*Conn) |
2563 | var watchCounter uintptr |
2564 | var watchLoopCounter int |
2565 | |
2566 | @@ -925,25 +823,20 @@ |
2567 | // mostly as a debugging and testing aid. |
2568 | func CountPendingWatches() int { |
2569 | watchMutex.Lock() |
2570 | - count := len(watchZooKeepers) |
2571 | + count := len(watchConns) |
2572 | watchMutex.Unlock() |
2573 | return count |
2574 | } |
2575 | |
2576 | -// createWatch creates and registers a watch, returning the watch id |
2577 | -// and channel. |
2578 | -func (zk *ZooKeeper) createWatch(session bool) (watchId uintptr, watchChannel chan Event) { |
2579 | - buf := 1 // session/watch event |
2580 | - if session { |
2581 | - buf = 32 |
2582 | - } |
2583 | - watchChannel = make(chan Event, buf) |
2584 | +// createWatch creates a watch that will send events down the provided |
2585 | +// channel, which must be one of the four possible event channel types. |
2586 | +func (conn *Conn) createWatch(watchChannel statusChan) (watchId uintptr) { |
2587 | watchMutex.Lock() |
2588 | defer watchMutex.Unlock() |
2589 | watchId = watchCounter |
2590 | watchCounter += 1 |
2591 | - zk.watchChannels[watchId] = watchChannel |
2592 | - watchZooKeepers[watchId] = zk |
2593 | + conn.watchChannels[watchId] = watchChannel |
2594 | + watchConns[watchId] = conn |
2595 | return |
2596 | } |
2597 | |
2598 | @@ -951,67 +844,71 @@ |
2599 | // from ever getting delivered. It shouldn't be used if there's any |
2600 | // chance the watch channel is still visible and not closed, since |
2601 | // it might mean a goroutine would be blocked forever. |
2602 | -func (zk *ZooKeeper) forgetWatch(watchId uintptr) { |
2603 | +func (conn *Conn) forgetWatch(watchId uintptr) { |
2604 | watchMutex.Lock() |
2605 | defer watchMutex.Unlock() |
2606 | - zk.watchChannels[watchId] = nil, false |
2607 | - watchZooKeepers[watchId] = nil, false |
2608 | + conn.watchChannels[watchId] = nil, false |
2609 | + watchConns[watchId] = nil, false |
2610 | } |
2611 | |
2612 | -// closeAllWatches closes all watch channels for zk. |
2613 | -func (zk *ZooKeeper) closeAllWatches() { |
2614 | +// closeAllWatches closes all watch channels for conn. |
2615 | +func (conn *Conn) closeAllWatches() { |
2616 | watchMutex.Lock() |
2617 | defer watchMutex.Unlock() |
2618 | - for watchId, ch := range zk.watchChannels { |
2619 | - close(ch) |
2620 | - zk.watchChannels[watchId] = nil, false |
2621 | - watchZooKeepers[watchId] = nil, false |
2622 | + for watchId, ch := range conn.watchChannels { |
2623 | + ch.close() |
2624 | + conn.watchChannels[watchId] = nil, false |
2625 | + watchConns[watchId] = nil, false |
2626 | } |
2627 | } |
2628 | |
2629 | // sendEvent delivers the event to the watchId event channel. If the |
2630 | // event channel is a watch event channel, the event is delivered, |
2631 | // the channel is closed, and resources are freed. |
2632 | -func sendEvent(watchId uintptr, event Event) { |
2633 | - if event.State == STATE_CLOSED { |
2634 | - panic("Attempted to send a CLOSED event") |
2635 | +func sendEvent(watchId uintptr, eventType, connectionState int, path string) { |
2636 | + if eventType == event_SESSION && connectionState == 0 { |
2637 | + panic("Attempted to send a CLOSED connection status") |
2638 | } |
2639 | watchMutex.Lock() |
2640 | defer watchMutex.Unlock() |
2641 | - zk, ok := watchZooKeepers[watchId] |
2642 | + conn, ok := watchConns[watchId] |
2643 | if !ok { |
2644 | return |
2645 | } |
2646 | - if event.Type == EVENT_SESSION && watchId != zk.sessionWatchId { |
2647 | - switch event.State { |
2648 | - case STATE_EXPIRED_SESSION, STATE_AUTH_FAILED: |
2649 | + ch := conn.watchChannels[watchId] |
2650 | + if ch == nil { |
2651 | + return |
2652 | + } |
2653 | + if eventType == event_SESSION && watchId != conn.sessionWatchId { |
2654 | + switch SessionStatus(connectionState) { |
2655 | + case SessionExpired, SessionAuthFailed: |
2656 | + // The channel will be closed below. |
2657 | default: |
2658 | + // Ignore non-critical session events. |
2659 | // WTF? Feels like TCP saying "dropped a dup packet, ok?" |
2660 | return |
2661 | } |
2662 | - } |
2663 | - ch := zk.watchChannels[watchId] |
2664 | - if ch == nil { |
2665 | - return |
2666 | - } |
2667 | - select { |
2668 | - case ch <- event: |
2669 | - default: |
2670 | - // Channel not available for sending, which means session |
2671 | - // events are necessarily involved (trivial events go |
2672 | - // straight to the buffer), and the application isn't paying |
2673 | - // attention for long enough to have the buffer filled up. |
2674 | - // Break down now rather than leaking forever. |
2675 | - if watchId == zk.sessionWatchId { |
2676 | - panic("Session event channel buffer is full") |
2677 | - } else { |
2678 | - panic("Watch event channel buffer is full") |
2679 | - } |
2680 | - } |
2681 | - if watchId != zk.sessionWatchId { |
2682 | - zk.watchChannels[watchId] = nil, false |
2683 | - watchZooKeepers[watchId] = nil, false |
2684 | - close(ch) |
2685 | + } else { |
2686 | + if eventType != event_SESSION && watchId == conn.sessionWatchId { |
2687 | + panic(fmt.Errorf("unexpected non-session event %v %v", eventType, connectionState)) |
2688 | + } |
2689 | + if !ch.send(eventType, connectionState, path) { |
2690 | + // Channel not available for sending, which means session |
2691 | + // events are necessarily involved (trivial events go |
2692 | + // straight to the buffer), and the application isn't paying |
2693 | + // attention for long enough to have the buffer filled up. |
2694 | + // Break down now rather than leaking forever. |
2695 | + if watchId == conn.sessionWatchId { |
2696 | + panic("Session event channel buffer is full") |
2697 | + } else { |
2698 | + panic("Watch event channel buffer is full (impossible!)") |
2699 | + } |
2700 | + } |
2701 | + } |
2702 | + if watchId != conn.sessionWatchId { |
2703 | + conn.watchChannels[watchId] = nil, false |
2704 | + watchConns[watchId] = nil, false |
2705 | + ch.close() |
2706 | } |
2707 | } |
2708 | |
2709 | @@ -1049,13 +946,8 @@ |
2710 | for { |
2711 | // This will block until a watch event is available. |
2712 | data := C.wait_for_watch() |
2713 | - event := Event{ |
2714 | - Type: int(data.event_type), |
2715 | - Path: C.GoString(data.event_path), |
2716 | - State: int(data.connection_state), |
2717 | - } |
2718 | watchId := uintptr(data.watch_context) |
2719 | + sendEvent(watchId, int(data.event_type), int(data.connection_state), C.GoString(data.event_path)) |
2720 | C.destroy_watch_data(data) |
2721 | - sendEvent(watchId, event) |
2722 | } |
2723 | } |
2724 | |
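The watch bookkeeping in the zk.go hunks above (an integer watchId mapped to a channel, one-shot delivery followed by close, and closing everything when the connection goes away) can be illustrated with a small standalone sketch. It is written in current Go, not the pre-Go 1 dialect of the diff, and `registry`, `create`, `fire`, `closeAll` and `pending` are hypothetical names chosen for the example; the real code also routes through cgo and distinguishes session watches, which this sketch leaves out.

```go
package main

import (
	"fmt"
	"sync"
)

// registry maps integer watch ids to one-shot event channels, so that only
// the integer (never a Go pointer) would need to cross into C as a context.
type registry struct {
	mu      sync.Mutex
	next    uintptr
	watches map[uintptr]chan string
}

func newRegistry() *registry {
	return &registry{watches: make(map[uintptr]chan string)}
}

// create registers a new watch and returns its id and receive-only channel.
func (r *registry) create() (uintptr, <-chan string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	id := r.next
	r.next++
	ch := make(chan string, 1) // buffer of 1: a one-shot send never blocks
	r.watches[id] = ch
	return id, ch
}

// fire delivers a single event to the watch, then closes and forgets it.
// It reports false if the id is unknown (already fired or closed).
func (r *registry) fire(id uintptr, event string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	ch, ok := r.watches[id]
	if !ok {
		return false
	}
	delete(r.watches, id)
	ch <- event
	close(ch)
	return true
}

// closeAll closes every outstanding watch channel without sending anything,
// which is how receivers learn that the connection is gone.
func (r *registry) closeAll() {
	r.mu.Lock()
	defer r.mu.Unlock()
	for id, ch := range r.watches {
		close(ch)
		delete(r.watches, id)
	}
}

// pending reports how many watches are still registered.
func (r *registry) pending() int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return len(r.watches)
}

func main() {
	r := newRegistry()
	id, ch := r.create()
	_, other := r.create()
	fmt.Println("pending:", r.pending())

	r.fire(id, "changed")
	ev, ok := <-ch
	fmt.Println("event:", ev, "ok:", ok, "pending:", r.pending())

	r.closeAll()
	_, ok = <-other
	fmt.Println("ok after closeAll:", ok, "pending:", r.pending())
}
```

A receiver tells the two outcomes apart with the two-value receive: a delivered event arrives with `ok == true`, while a channel closed by `closeAll` yields the zero value with `ok == false`.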
2725 | === renamed file 'gozk_test.go' => 'zk_test.go' |
2726 | --- gozk_test.go 2011-08-19 01:51:37 +0000 |
2727 | +++ zk_test.go 2011-09-30 13:35:26 +0000 |
2728 | @@ -1,17 +1,17 @@ |
2729 | -package gozk_test |
2730 | +package zk_test |
2731 | |
2732 | import ( |
2733 | . "launchpad.net/gocheck" |
2734 | - "gozk" |
2735 | + "launchpad.net/gozk/zk" |
2736 | "time" |
2737 | ) |
2738 | |
2739 | // This error will be delivered via C errno, since ZK unfortunately |
2740 | // only provides the handler back from zookeeper_init(). |
2741 | func (s *S) TestInitErrorThroughErrno(c *C) { |
2742 | - zk, watch, err := gozk.Init("bad-domain-without-port", 5e9) |
2743 | - if zk != nil { |
2744 | - zk.Close() |
2745 | + conn, watch, err := zk.Dial("bad-domain-without-port", 5e9) |
2746 | + if conn != nil { |
2747 | + conn.Close() |
2748 | } |
2749 | if watch != nil { |
2750 | go func() { |
2751 | @@ -23,15 +23,15 @@ |
2752 | } |
2753 | }() |
2754 | } |
2755 | - c.Assert(zk, IsNil) |
2756 | + c.Assert(conn, IsNil) |
2757 | c.Assert(watch, IsNil) |
2758 | c.Assert(err, Matches, "invalid argument") |
2759 | } |
2760 | |
2761 | func (s *S) TestRecvTimeoutInitParameter(c *C) { |
2762 | - zk, watch, err := gozk.Init(s.zkAddr, 0) |
2763 | + conn, watch, err := zk.Dial(s.zkAddr, 0) |
2764 | c.Assert(err, IsNil) |
2765 | - defer zk.Close() |
2766 | + defer conn.Close() |
2767 | |
2768 | select { |
2769 | case <-watch: |
2770 | @@ -40,7 +40,7 @@ |
2771 | } |
2772 | |
2773 | for i := 0; i != 1000; i++ { |
2774 | - _, _, err := zk.Get("/zookeeper") |
2775 | + _, _, err := conn.Get("/zookeeper") |
2776 | if err != nil { |
2777 | c.Assert(err, Matches, "operation timeout") |
2778 | c.SucceedNow() |
2779 | @@ -51,88 +51,75 @@ |
2780 | } |
2781 | |
2782 | func (s *S) TestSessionWatches(c *C) { |
2783 | - c.Assert(gozk.CountPendingWatches(), Equals, 0) |
2784 | + c.Assert(zk.CountPendingWatches(), Equals, 0) |
2785 | |
2786 | zk1, watch1 := s.init(c) |
2787 | zk2, watch2 := s.init(c) |
2788 | zk3, watch3 := s.init(c) |
2789 | |
2790 | - c.Assert(gozk.CountPendingWatches(), Equals, 3) |
2791 | + c.Assert(zk.CountPendingWatches(), Equals, 3) |
2792 | |
2793 | event1 := <-watch1 |
2794 | - c.Assert(event1.Type, Equals, gozk.EVENT_SESSION) |
2795 | - c.Assert(event1.State, Equals, gozk.STATE_CONNECTED) |
2796 | + c.Assert(event1, Equals, zk.SessionConnected) |
2797 | |
2798 | - c.Assert(gozk.CountPendingWatches(), Equals, 3) |
2799 | + c.Assert(zk.CountPendingWatches(), Equals, 3) |
2800 | |
2801 | event2 := <-watch2 |
2802 | - c.Assert(event2.Type, Equals, gozk.EVENT_SESSION) |
2803 | - c.Assert(event2.State, Equals, gozk.STATE_CONNECTED) |
2804 | + c.Assert(event2, Equals, zk.SessionConnected) |
2805 | |
2806 | - c.Assert(gozk.CountPendingWatches(), Equals, 3) |
2807 | + c.Assert(zk.CountPendingWatches(), Equals, 3) |
2808 | |
2809 | event3 := <-watch3 |
2810 | - c.Assert(event3.Type, Equals, gozk.EVENT_SESSION) |
2811 | - c.Assert(event3.State, Equals, gozk.STATE_CONNECTED) |
2812 | + c.Assert(event3, Equals, zk.SessionConnected) |
2813 | |
2814 | - c.Assert(gozk.CountPendingWatches(), Equals, 3) |
2815 | + c.Assert(zk.CountPendingWatches(), Equals, 3) |
2816 | |
2817 | zk1.Close() |
2818 | - c.Assert(gozk.CountPendingWatches(), Equals, 2) |
2819 | + c.Assert(zk.CountPendingWatches(), Equals, 2) |
2820 | zk2.Close() |
2821 | - c.Assert(gozk.CountPendingWatches(), Equals, 1) |
2822 | + c.Assert(zk.CountPendingWatches(), Equals, 1) |
2823 | zk3.Close() |
2824 | - c.Assert(gozk.CountPendingWatches(), Equals, 0) |
2825 | + c.Assert(zk.CountPendingWatches(), Equals, 0) |
2826 | } |
2827 | |
2828 | -// Gozk injects a STATE_CLOSED event when zk.Close() is called, right |
2829 | +// Gozk injects a STATE_CLOSED event when conn.Close() is called, right |
2830 | // before the channel is closed. Closing the channel injects a nil |
2831 | // pointer, as usual for Go, so the STATE_CLOSED gives a chance to |
2832 | // know that a nil pointer is coming, and to stop the procedure. |
2833 | // Hopefully this procedure will avoid some nil-pointer references by |
2834 | // mistake. |
2835 | func (s *S) TestClosingStateInSessionWatch(c *C) { |
2836 | - zk, watch := s.init(c) |
2837 | + conn, watch := s.init(c) |
2838 | |
2839 | event := <-watch |
2840 | - c.Assert(event.Type, Equals, gozk.EVENT_SESSION) |
2841 | - c.Assert(event.State, Equals, gozk.STATE_CONNECTED) |
2842 | + c.Assert(event, Equals, zk.SessionConnected) |
2843 | |
2844 | - zk.Close() |
2845 | + conn.Close() |
2846 | event, ok := <-watch |
2847 | c.Assert(ok, Equals, false) |
2848 | - c.Assert(event.Type, Equals, gozk.EVENT_CLOSED) |
2849 | - c.Assert(event.State, Equals, gozk.STATE_CLOSED) |
2850 | -} |
2851 | - |
2852 | -func (s *S) TestEventString(c *C) { |
2853 | - var event gozk.Event |
2854 | - event = gozk.Event{gozk.EVENT_SESSION, "/path", gozk.STATE_CONNECTED} |
2855 | - c.Assert(event, Matches, "ZooKeeper connected") |
2856 | - event = gozk.Event{gozk.EVENT_CREATED, "/path", gozk.STATE_CONNECTED} |
2857 | - c.Assert(event, Matches, "ZooKeeper connected; path created: /path") |
2858 | - event = gozk.Event{-1, "/path", gozk.STATE_CLOSED} |
2859 | - c.Assert(event, Matches, "ZooKeeper connection closed") |
2860 | -} |
2861 | - |
2862 | -var okTests = []struct{gozk.Event; Ok bool}{ |
2863 | - {gozk.Event{gozk.EVENT_SESSION, "", gozk.STATE_CONNECTED}, true}, |
2864 | - {gozk.Event{gozk.EVENT_CREATED, "", gozk.STATE_CONNECTED}, true}, |
2865 | - {gozk.Event{0, "", gozk.STATE_CLOSED}, false}, |
2866 | - {gozk.Event{0, "", gozk.STATE_EXPIRED_SESSION}, false}, |
2867 | - {gozk.Event{0, "", gozk.STATE_AUTH_FAILED}, false}, |
2868 | + c.Assert(event, Equals, zk.SessionStatus(0)) |
2869 | +} |
2870 | + |
2871 | +var okTests = []struct { |
2872 | + zk.SessionStatus |
2873 | + Ok bool |
2874 | +}{ |
2875 | + {zk.SessionConnected, true}, |
2876 | + {0, false}, |
2877 | + {zk.SessionExpired, false}, |
2878 | + {zk.SessionAuthFailed, false}, |
2879 | } |
2880 | |
2881 | func (s *S) TestEventOk(c *C) { |
2882 | for _, t := range okTests { |
2883 | - c.Assert(t.Event.Ok(), Equals, t.Ok) |
2884 | + c.Assert(t.SessionStatus.Ok(), Equals, t.Ok) |
2885 | } |
2886 | } |
2887 | |
2888 | func (s *S) TestGetAndStat(c *C) { |
2889 | - zk, _ := s.init(c) |
2890 | + conn, _ := s.init(c) |
2891 | |
2892 | - data, stat, err := zk.Get("/zookeeper") |
2893 | + data, stat, err := conn.Get("/zookeeper") |
2894 | c.Assert(err, IsNil) |
2895 | c.Assert(data, Equals, "") |
2896 | c.Assert(stat.Czxid(), Equals, int64(0)) |
2897 | @@ -149,58 +136,58 @@ |
2898 | } |
2899 | |
2900 | func (s *S) TestGetAndError(c *C) { |
2901 | - zk, _ := s.init(c) |
2902 | + conn, _ := s.init(c) |
2903 | |
2904 | - data, stat, err := zk.Get("/non-existent") |
2905 | + data, stat, err := conn.Get("/non-existent") |
2906 | |
2907 | c.Assert(data, Equals, "") |
2908 | c.Assert(stat, IsNil) |
2909 | c.Assert(err, Matches, "no node") |
2910 | - c.Assert(err.Code(), Equals, gozk.ZNONODE) |
2911 | + c.Assert(err, Equals, zk.ZNONODE) |
2912 | } |
2913 | |
2914 | func (s *S) TestCreateAndGet(c *C) { |
2915 | - zk, _ := s.init(c) |
2916 | + conn, _ := s.init(c) |
2917 | |
2918 | - path, err := zk.Create("/test-", "bababum", gozk.SEQUENCE|gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
2919 | + path, err := conn.Create("/test-", "bababum", zk.SEQUENCE|zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2920 | c.Assert(err, IsNil) |
2921 | c.Assert(path, Matches, "/test-[0-9]+") |
2922 | |
2923 | // Check the error condition from Create(). |
2924 | - _, err = zk.Create(path, "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
2925 | + _, err = conn.Create(path, "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2926 | c.Assert(err, Matches, "node exists") |
2927 | |
2928 | - data, _, err := zk.Get(path) |
2929 | + data, _, err := conn.Get(path) |
2930 | c.Assert(err, IsNil) |
2931 | c.Assert(data, Equals, "bababum") |
2932 | } |
2933 | |
2934 | func (s *S) TestCreateSetAndGet(c *C) { |
2935 | - zk, _ := s.init(c) |
2936 | + conn, _ := s.init(c) |
2937 | |
2938 | - _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
2939 | + _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2940 | c.Assert(err, IsNil) |
2941 | |
2942 | - stat, err := zk.Set("/test", "bababum", -1) // Any version. |
2943 | + stat, err := conn.Set("/test", "bababum", -1) // Any version. |
2944 | c.Assert(err, IsNil) |
2945 | c.Assert(stat.Version(), Equals, int32(1)) |
2946 | |
2947 | - data, _, err := zk.Get("/test") |
2948 | + data, _, err := conn.Get("/test") |
2949 | c.Assert(err, IsNil) |
2950 | c.Assert(data, Equals, "bababum") |
2951 | } |
2952 | |
2953 | func (s *S) TestGetAndWatch(c *C) { |
2954 | - c.Check(gozk.CountPendingWatches(), Equals, 0) |
2955 | - |
2956 | - zk, _ := s.init(c) |
2957 | - |
2958 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2959 | - |
2960 | - _, err := zk.Create("/test", "one", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
2961 | + c.Check(zk.CountPendingWatches(), Equals, 0) |
2962 | + |
2963 | + conn, _ := s.init(c) |
2964 | + |
2965 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2966 | + |
2967 | + _, err := conn.Create("/test", "one", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
2968 | c.Assert(err, IsNil) |
2969 | |
2970 | - data, stat, watch, err := zk.GetW("/test") |
2971 | + data, stat, watch, err := conn.GetW("/test") |
2972 | c.Assert(err, IsNil) |
2973 | c.Assert(data, Equals, "one") |
2974 | c.Assert(stat.Version(), Equals, int32(0)) |
2975 | @@ -211,17 +198,17 @@ |
2976 | default: |
2977 | } |
2978 | |
2979 | - c.Check(gozk.CountPendingWatches(), Equals, 2) |
2980 | + c.Check(zk.CountPendingWatches(), Equals, 2) |
2981 | |
2982 | - _, err = zk.Set("/test", "two", -1) |
2983 | + _, err = conn.Set("/test", "two", -1) |
2984 | c.Assert(err, IsNil) |
2985 | |
2986 | event := <-watch |
2987 | - c.Assert(event.Type, Equals, gozk.EVENT_CHANGED) |
2988 | - |
2989 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
2990 | - |
2991 | - data, _, watch, err = zk.GetW("/test") |
2992 | + c.Assert(event, Equals, zk.NodeChanged) |
2993 | + |
2994 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
2995 | + |
2996 | + data, _, watch, err = conn.GetW("/test") |
2997 | c.Assert(err, IsNil) |
2998 | c.Assert(data, Equals, "two") |
2999 | |
3000 | @@ -231,86 +218,86 @@ |
3001 | default: |
3002 | } |
3003 | |
3004 | - c.Check(gozk.CountPendingWatches(), Equals, 2) |
3005 | + c.Check(zk.CountPendingWatches(), Equals, 2) |
3006 | |
3007 | - _, err = zk.Set("/test", "three", -1) |
3008 | + _, err = conn.Set("/test", "three", -1) |
3009 | c.Assert(err, IsNil) |
3010 | |
3011 | event = <-watch |
3012 | - c.Assert(event.Type, Equals, gozk.EVENT_CHANGED) |
3013 | + c.Assert(event, Equals, zk.NodeChanged) |
3014 | |
3015 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
3016 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
3017 | } |
3018 | |
3019 | func (s *S) TestGetAndWatchWithError(c *C) { |
3020 | - c.Check(gozk.CountPendingWatches(), Equals, 0) |
3021 | - |
3022 | - zk, _ := s.init(c) |
3023 | - |
3024 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
3025 | - |
3026 | - _, _, watch, err := zk.GetW("/test") |
3027 | + c.Check(zk.CountPendingWatches(), Equals, 0) |
3028 | + |
3029 | + conn, _ := s.init(c) |
3030 | + |
3031 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
3032 | + |
3033 | + _, _, watch, err := conn.GetW("/test") |
3034 | c.Assert(err, NotNil) |
3035 | - c.Assert(err.Code(), Equals, gozk.ZNONODE) |
3036 | + c.Assert(err, Equals, zk.ZNONODE) |
3037 | c.Assert(watch, IsNil) |
3038 | |
3039 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
3040 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
3041 | } |
3042 | |
3043 | func (s *S) TestCloseReleasesWatches(c *C) { |
3044 | - c.Check(gozk.CountPendingWatches(), Equals, 0) |
3045 | - |
3046 | - zk, _ := s.init(c) |
3047 | - |
3048 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
3049 | - |
3050 | - _, err := zk.Create("/test", "one", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
3051 | - c.Assert(err, IsNil) |
3052 | - |
3053 | - _, _, _, err = zk.GetW("/test") |
3054 | - c.Assert(err, IsNil) |
3055 | - |
3056 | - c.Assert(gozk.CountPendingWatches(), Equals, 2) |
3057 | - |
3058 | - zk.Close() |
3059 | - |
3060 | - c.Assert(gozk.CountPendingWatches(), Equals, 0) |
3061 | + c.Check(zk.CountPendingWatches(), Equals, 0) |
3062 | + |
3063 | + conn, _ := s.init(c) |
3064 | + |
3065 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
3066 | + |
3067 | + _, err := conn.Create("/test", "one", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
3068 | + c.Assert(err, IsNil) |
3069 | + |
3070 | + _, _, _, err = conn.GetW("/test") |
3071 | + c.Assert(err, IsNil) |
3072 | + |
3073 | + c.Assert(zk.CountPendingWatches(), Equals, 2) |
3074 | + |
3075 | + conn.Close() |
3076 | + |
3077 | + c.Assert(zk.CountPendingWatches(), Equals, 0) |
3078 | } |
3079 | |
3080 | // By default, the ZooKeeper C client will hang indefinitely if a |
3081 | // handler is closed twice. We get in the way and prevent it. |
3082 | func (s *S) TestClosingTwiceDoesntHang(c *C) { |
3083 | - zk, _ := s.init(c) |
3084 | - err := zk.Close() |
3085 | + conn, _ := s.init(c) |
3086 | + err := conn.Close() |
3087 | c.Assert(err, IsNil) |
3088 | - err = zk.Close() |
3089 | + err = conn.Close() |
3090 | c.Assert(err, NotNil) |
3091 | - c.Assert(err.Code(), Equals, gozk.ZCLOSING) |
3092 | + c.Assert(err, Equals, zk.ZCLOSING) |
3093 | } |
3094 | |
3095 | func (s *S) TestChildren(c *C) { |
3096 | - zk, _ := s.init(c) |
3097 | + conn, _ := s.init(c) |
3098 | |
3099 | - children, stat, err := zk.Children("/") |
3100 | + children, stat, err := conn.Children("/") |
3101 | c.Assert(err, IsNil) |
3102 | c.Assert(children, Equals, []string{"zookeeper"}) |
3103 | c.Assert(stat.NumChildren(), Equals, int32(1)) |
3104 | |
3105 | - children, stat, err = zk.Children("/non-existent") |
3106 | + children, stat, err = conn.Children("/non-existent") |
3107 | c.Assert(err, NotNil) |
3108 | - c.Assert(err.Code(), Equals, gozk.ZNONODE) |
3109 | + c.Assert(err, Equals, zk.ZNONODE) |
3110 | c.Assert(children, Equals, []string{}) |
3111 | - c.Assert(stat, Equals, nil) |
3112 | + c.Assert(stat, IsNil) |
3113 | } |
3114 | |
3115 | func (s *S) TestChildrenAndWatch(c *C) { |
3116 | - c.Check(gozk.CountPendingWatches(), Equals, 0) |
3117 | - |
3118 | - zk, _ := s.init(c) |
3119 | - |
3120 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
3121 | - |
3122 | - children, stat, watch, err := zk.ChildrenW("/") |
3123 | + c.Check(zk.CountPendingWatches(), Equals, 0) |
3124 | + |
3125 | + conn, _ := s.init(c) |
3126 | + |
3127 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
3128 | + |
3129 | + children, stat, watch, err := conn.ChildrenW("/") |
3130 | c.Assert(err, IsNil) |
3131 | c.Assert(children, Equals, []string{"zookeeper"}) |
3132 | c.Assert(stat.NumChildren(), Equals, int32(1)) |
3133 | @@ -321,18 +308,17 @@ |
3134 | default: |
3135 | } |
3136 | |
3137 | - c.Check(gozk.CountPendingWatches(), Equals, 2) |
3138 | + c.Check(zk.CountPendingWatches(), Equals, 2) |
3139 | |
3140 | - _, err = zk.Create("/test1", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
3141 | + _, err = conn.Create("/test1", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
3142 | c.Assert(err, IsNil) |
3143 | |
3144 | event := <-watch |
3145 | - c.Assert(event.Type, Equals, gozk.EVENT_CHILD) |
3146 | - c.Assert(event.Path, Equals, "/") |
3147 | - |
3148 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
3149 | - |
3150 | - children, stat, watch, err = zk.ChildrenW("/") |
3151 | + c.Assert(event, Equals, zk.NodeChangedChild) |
3152 | + |
3153 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
3154 | + |
3155 | + children, stat, watch, err = conn.ChildrenW("/") |
3156 | c.Assert(err, IsNil) |
3157 | c.Assert(stat.NumChildren(), Equals, int32(2)) |
3158 | |
3159 | @@ -345,57 +331,56 @@ |
3160 | default: |
3161 | } |
3162 | |
3163 | - c.Check(gozk.CountPendingWatches(), Equals, 2) |
3164 | + c.Check(zk.CountPendingWatches(), Equals, 2) |
3165 | |
3166 | - _, err = zk.Create("/test2", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
3167 | + _, err = conn.Create("/test2", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
3168 | c.Assert(err, IsNil) |
3169 | |
3170 | event = <-watch |
3171 | - c.Assert(event.Type, Equals, gozk.EVENT_CHILD) |
3172 | + c.Assert(event, Equals, zk.NodeChangedChild) |
3173 | |
3174 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
3175 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
3176 | } |
3177 | |
3178 | func (s *S) TestChildrenAndWatchWithError(c *C) { |
3179 | - c.Check(gozk.CountPendingWatches(), Equals, 0) |
3180 | - |
3181 | - zk, _ := s.init(c) |
3182 | - |
3183 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
3184 | - |
3185 | - _, stat, watch, err := zk.ChildrenW("/test") |
3186 | + c.Check(zk.CountPendingWatches(), Equals, 0) |
3187 | + |
3188 | + conn, _ := s.init(c) |
3189 | + |
3190 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
3191 | + |
3192 | + _, stat, watch, err := conn.ChildrenW("/test") |
3193 | c.Assert(err, NotNil) |
3194 | - c.Assert(err.Code(), Equals, gozk.ZNONODE) |
3195 | + c.Assert(err, Equals, zk.ZNONODE) |
3196 | c.Assert(watch, IsNil) |
3197 | c.Assert(stat, IsNil) |
3198 | |
3199 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
3200 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
3201 | } |
3202 | |
3203 | func (s *S) TestExists(c *C) { |
3204 | - zk, _ := s.init(c) |
3205 | - |
3206 | - stat, err := zk.Exists("/zookeeper") |
3207 | - c.Assert(err, IsNil) |
3208 | - c.Assert(stat.NumChildren(), Equals, int32(1)) |
3209 | - |
3210 | - stat, err = zk.Exists("/non-existent") |
3211 | + conn, _ := s.init(c) |
3212 | + |
3213 | + stat, err := conn.Exists("/non-existent") |
3214 | c.Assert(err, IsNil) |
3215 | c.Assert(stat, IsNil) |
3216 | + |
3217 | + stat, err = conn.Exists("/zookeeper") |
3218 | + c.Assert(err, IsNil) |
3219 | } |
3220 | |
3221 | func (s *S) TestExistsAndWatch(c *C) { |
3222 | - c.Check(gozk.CountPendingWatches(), Equals, 0) |
3223 | - |
3224 | - zk, _ := s.init(c) |
3225 | - |
3226 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
3227 | - |
3228 | - stat, watch, err := zk.ExistsW("/test") |
3229 | + c.Check(zk.CountPendingWatches(), Equals, 0) |
3230 | + |
3231 | + conn, _ := s.init(c) |
3232 | + |
3233 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
3234 | + |
3235 | + stat, watch, err := conn.ExistsW("/test") |
3236 | c.Assert(err, IsNil) |
3237 | c.Assert(stat, IsNil) |
3238 | |
3239 | - c.Check(gozk.CountPendingWatches(), Equals, 2) |
3240 | + c.Check(zk.CountPendingWatches(), Equals, 2) |
3241 | |
3242 | select { |
3243 | case <-watch: |
3244 | @@ -403,62 +388,61 @@ |
3245 | default: |
3246 | } |
3247 | |
3248 | - _, err = zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
3249 | + _, err = conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
3250 | c.Assert(err, IsNil) |
3251 | |
3252 | event := <-watch |
3253 | - c.Assert(event.Type, Equals, gozk.EVENT_CREATED) |
3254 | - c.Assert(event.Path, Equals, "/test") |
3255 | - |
3256 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
3257 | - |
3258 | - stat, watch, err = zk.ExistsW("/test") |
3259 | + c.Assert(event, Equals, zk.NodeCreated) |
3260 | + |
3261 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
3262 | + |
3263 | + stat, watch, err = conn.ExistsW("/test") |
3264 | c.Assert(err, IsNil) |
3265 | c.Assert(stat, NotNil) |
3266 | c.Assert(stat.NumChildren(), Equals, int32(0)) |
3267 | |
3268 | - c.Check(gozk.CountPendingWatches(), Equals, 2) |
3269 | + c.Check(zk.CountPendingWatches(), Equals, 2) |
3270 | } |
3271 | |
3272 | func (s *S) TestExistsAndWatchWithError(c *C) { |
3273 | - c.Check(gozk.CountPendingWatches(), Equals, 0) |
3274 | - |
3275 | - zk, _ := s.init(c) |
3276 | - |
3277 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
3278 | - |
3279 | - stat, watch, err := zk.ExistsW("///") |
3280 | + c.Check(zk.CountPendingWatches(), Equals, 0) |
3281 | + |
3282 | + conn, _ := s.init(c) |
3283 | + |
3284 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
3285 | + |
3286 | + stat, watch, err := conn.ExistsW("///") |
3287 | c.Assert(err, NotNil) |
3288 | - c.Assert(err.Code(), Equals, gozk.ZBADARGUMENTS) |
3289 | + c.Assert(err, Equals, zk.ZBADARGUMENTS) |
3290 | c.Assert(stat, IsNil) |
3291 | c.Assert(watch, IsNil) |
3292 | |
3293 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
3294 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
3295 | } |
3296 | |
3297 | func (s *S) TestDelete(c *C) { |
3298 | - zk, _ := s.init(c) |
3299 | - |
3300 | - _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
3301 | - c.Assert(err, IsNil) |
3302 | - |
3303 | - err = zk.Delete("/test", 5) |
3304 | - c.Assert(err, NotNil) |
3305 | - c.Assert(err.Code(), Equals, gozk.ZBADVERSION) |
3306 | - |
3307 | - err = zk.Delete("/test", -1) |
3308 | - c.Assert(err, IsNil) |
3309 | - |
3310 | - err = zk.Delete("/test", -1) |
3311 | - c.Assert(err, NotNil) |
3312 | - c.Assert(err.Code(), Equals, gozk.ZNONODE) |
3313 | + conn, _ := s.init(c) |
3314 | + |
3315 | + _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
3316 | + c.Assert(err, IsNil) |
3317 | + |
3318 | + err = conn.Delete("/test", 5) |
3319 | + c.Assert(err, NotNil) |
3320 | + c.Assert(err, Equals, zk.ZBADVERSION) |
3321 | + |
3322 | + err = conn.Delete("/test", -1) |
3323 | + c.Assert(err, IsNil) |
3324 | + |
3325 | + err = conn.Delete("/test", -1) |
3326 | + c.Assert(err, NotNil) |
3327 | + c.Assert(err, Equals, zk.ZNONODE) |
3328 | } |
3329 | |
3330 | func (s *S) TestClientIdAndReInit(c *C) { |
3331 | zk1, _ := s.init(c) |
3332 | clientId1 := zk1.ClientId() |
3333 | |
3334 | - zk2, _, err := gozk.ReInit(s.zkAddr, 5e9, clientId1) |
3335 | + zk2, _, err := zk.Redial(s.zkAddr, 5e9, clientId1) |
3336 | c.Assert(err, IsNil) |
3337 | defer zk2.Close() |
3338 | clientId2 := zk2.ClientId() |
3339 | @@ -469,110 +453,113 @@ |
3340 | // Surprisingly for some (including myself, initially), the watch |
3341 | // returned by the exists method actually fires on data changes too. |
3342 | func (s *S) TestExistsWatchOnDataChange(c *C) { |
3343 | - zk, _ := s.init(c) |
3344 | - |
3345 | - _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
3346 | - c.Assert(err, IsNil) |
3347 | - |
3348 | - _, watch, err := zk.ExistsW("/test") |
3349 | - c.Assert(err, IsNil) |
3350 | - |
3351 | - _, err = zk.Set("/test", "new", -1) |
3352 | + conn, _ := s.init(c) |
3353 | + |
3354 | + _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
3355 | + c.Assert(err, IsNil) |
3356 | + |
3357 | + _, watch, err := conn.ExistsW("/test") |
3358 | + c.Assert(err, IsNil) |
3359 | + |
3360 | + _, err = conn.Set("/test", "new", -1) |
3361 | c.Assert(err, IsNil) |
3362 | |
3363 | event := <-watch |
3364 | |
3365 | - c.Assert(event.Path, Equals, "/test") |
3366 | - c.Assert(event.Type, Equals, gozk.EVENT_CHANGED) |
3367 | + c.Assert(event, Equals, zk.NodeChanged) |
3368 | } |
3369 | |
3370 | func (s *S) TestACL(c *C) { |
3371 | - zk, _ := s.init(c) |
3372 | - |
3373 | - _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
3374 | - c.Assert(err, IsNil) |
3375 | - |
3376 | - acl, stat, err := zk.ACL("/test") |
3377 | - c.Assert(err, IsNil) |
3378 | - c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_ALL)) |
3379 | + conn, _ := s.init(c) |
3380 | + |
3381 | + _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
3382 | + c.Assert(err, IsNil) |
3383 | + |
3384 | + acl, stat, err := conn.ACL("/test") |
3385 | + c.Assert(err, IsNil) |
3386 | + c.Assert(acl, Equals, zk.WorldACL(zk.PERM_ALL)) |
3387 | c.Assert(stat, NotNil) |
3388 | c.Assert(stat.Version(), Equals, int32(0)) |
3389 | |
3390 | - acl, stat, err = zk.ACL("/non-existent") |
3391 | + acl, stat, err = conn.ACL("/non-existent") |
3392 | c.Assert(err, NotNil) |
3393 | - c.Assert(err.Code(), Equals, gozk.ZNONODE) |
3394 | + c.Assert(err, Equals, zk.ZNONODE) |
3395 | c.Assert(acl, IsNil) |
3396 | c.Assert(stat, IsNil) |
3397 | + c.Log("finished TestACL") |
3398 | } |
3399 | |
3400 | func (s *S) TestSetACL(c *C) { |
3401 | - zk, _ := s.init(c) |
3402 | + conn, _ := s.init(c) |
3403 | |
3404 | - _, err := zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
3405 | + _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
3406 | c.Assert(err, IsNil) |
3407 | |
3408 | - err = zk.SetACL("/test", gozk.WorldACL(gozk.PERM_ALL), 5) |
3409 | + err = conn.SetACL("/test", zk.WorldACL(zk.PERM_ALL), 5) |
3410 | c.Assert(err, NotNil) |
3411 | - c.Assert(err.Code(), Equals, gozk.ZBADVERSION) |
3412 | - |
3413 | - err = zk.SetACL("/test", gozk.WorldACL(gozk.PERM_READ), -1) |
3414 | - c.Assert(err, IsNil) |
3415 | - |
3416 | - acl, _, err := zk.ACL("/test") |
3417 | - c.Assert(err, IsNil) |
3418 | - c.Assert(acl, Equals, gozk.WorldACL(gozk.PERM_READ)) |
3419 | + c.Assert(err, Equals, zk.ZBADVERSION) |
3420 | + |
3421 | + err = conn.SetACL("/test", zk.WorldACL(zk.PERM_READ), -1) |
3422 | + c.Assert(err, IsNil) |
3423 | + |
3424 | + acl, _, err := conn.ACL("/test") |
3425 | + c.Assert(err, IsNil) |
3426 | + c.Assert(acl, Equals, zk.WorldACL(zk.PERM_READ)) |
3427 | } |
3428 | |
3429 | func (s *S) TestAddAuth(c *C) { |
3430 | - zk, _ := s.init(c) |
3431 | - |
3432 | - acl := []gozk.ACL{{gozk.PERM_READ, "digest", "joe:enQcM3mIEHQx7IrPNStYBc0qfs8="}} |
3433 | - |
3434 | - _, err := zk.Create("/test", "", gozk.EPHEMERAL, acl) |
3435 | + conn, _ := s.init(c) |
3436 | + |
3437 | + acl := []zk.ACL{{zk.PERM_READ, "digest", "joe:enQcM3mIEHQx7IrPNStYBc0qfs8="}} |
3438 | + |
3439 | + _, err := conn.Create("/test", "", zk.EPHEMERAL, acl) |
3440 | c.Assert(err, IsNil) |
3441 | |
3442 | - _, _, err = zk.Get("/test") |
3443 | + _, _, err = conn.Get("/test") |
3444 | c.Assert(err, NotNil) |
3445 | - c.Assert(err.Code(), Equals, gozk.ZNOAUTH) |
3446 | + c.Assert(err, Equals, zk.ZNOAUTH) |
3447 | |
3448 | - err = zk.AddAuth("digest", "joe:passwd") |
3449 | + err = conn.AddAuth("digest", "joe:passwd") |
3450 | c.Assert(err, IsNil) |
3451 | |
3452 | - _, _, err = zk.Get("/test") |
3453 | + _, _, err = conn.Get("/test") |
3454 | c.Assert(err, IsNil) |
3455 | } |
3456 | |
3457 | func (s *S) TestWatchOnReconnection(c *C) { |
3458 | - c.Check(gozk.CountPendingWatches(), Equals, 0) |
3459 | + c.Check(zk.CountPendingWatches(), Equals, 0) |
3460 | |
3461 | - zk, session := s.init(c) |
3462 | + conn, session := s.init(c) |
3463 | |
3464 | event := <-session |
3465 | - c.Assert(event.Type, Equals, gozk.EVENT_SESSION) |
3466 | - c.Assert(event.State, Equals, gozk.STATE_CONNECTED) |
3467 | - |
3468 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
3469 | - |
3470 | - stat, watch, err := zk.ExistsW("/test") |
3471 | + c.Assert(event, Equals, zk.SessionConnected) |
3472 | + |
3473 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
3474 | + |
3475 | + stat, watch, err := conn.ExistsW("/test") |
3476 | c.Assert(err, IsNil) |
3477 | c.Assert(stat, IsNil) |
3478 | |
3479 | - c.Check(gozk.CountPendingWatches(), Equals, 2) |
3480 | - |
3481 | - s.StopZK() |
3482 | + c.Check(zk.CountPendingWatches(), Equals, 2) |
3483 | + |
3484 | + err = s.zkServer.Stop() |
3485 | + c.Assert(err, IsNil) |
3486 | + |
3487 | time.Sleep(2e9) |
3488 | - s.StartZK() |
3489 | - |
3490 | - // The session channel should receive the reconnection notification, |
3491 | + |
3492 | + err = s.zkServer.Start() |
3493 | + c.Assert(err, IsNil) |
3494 | + |
3495 | + // The session channel should receive the reconnection notification. |
3496 | select { |
3497 | case event := <-session: |
3498 | - c.Assert(event.State, Equals, gozk.STATE_CONNECTING) |
3499 | + c.Assert(event, Equals, zk.SessionConnecting) |
3500 | case <-time.After(3e9): |
3501 | c.Fatal("Session watch didn't fire") |
3502 | } |
3503 | select { |
3504 | case event := <-session: |
3505 | - c.Assert(event.State, Equals, gozk.STATE_CONNECTED) |
3506 | + c.Assert(event, Equals, zk.SessionConnected) |
3507 | case <-time.After(3e9): |
3508 | c.Fatal("Session watch didn't fire") |
3509 | } |
3510 | @@ -585,40 +572,38 @@ |
3511 | } |
3512 | |
3513 | // And it should still work. |
3514 | - _, err = zk.Create("/test", "", gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)) |
3515 | + _, err = conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) |
3516 | c.Assert(err, IsNil) |
3517 | |
3518 | - event = <-watch |
3519 | - c.Assert(event.Type, Equals, gozk.EVENT_CREATED) |
3520 | - c.Assert(event.Path, Equals, "/test") |
3521 | + nodeStatus := <-watch |
3522 | + c.Assert(nodeStatus, Equals, zk.NodeCreated) |
3523 | |
3524 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
3525 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
3526 | } |
3527 | |
3528 | func (s *S) TestWatchOnSessionExpiration(c *C) { |
3529 | - c.Check(gozk.CountPendingWatches(), Equals, 0) |
3530 | + c.Check(zk.CountPendingWatches(), Equals, 0) |
3531 | |
3532 | - zk, session := s.init(c) |
3533 | + conn, session := s.init(c) |
3534 | |
3535 | event := <-session |
3536 | - c.Assert(event.Type, Equals, gozk.EVENT_SESSION) |
3537 | - c.Assert(event.State, Equals, gozk.STATE_CONNECTED) |
3538 | - |
3539 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
3540 | - |
3541 | - stat, watch, err := zk.ExistsW("/test") |
3542 | + c.Assert(event, Equals, zk.SessionConnected) |
3543 | + |
3544 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
3545 | + |
3546 | + stat, watch, err := conn.ExistsW("/test") |
3547 | c.Assert(err, IsNil) |
3548 | c.Assert(stat, IsNil) |
3549 | |
3550 | - c.Check(gozk.CountPendingWatches(), Equals, 2) |
3551 | + c.Check(zk.CountPendingWatches(), Equals, 2) |
3552 | |
3553 | // Use expiration trick described in the FAQ. |
3554 | - clientId := zk.ClientId() |
3555 | - zk2, session2, err := gozk.ReInit(s.zkAddr, 5e9, clientId) |
3556 | + clientId := conn.ClientId() |
3557 | + zk2, session2, err := zk.Redial(s.zkAddr, 5e9, clientId) |
3558 | |
3559 | for event := range session2 { |
3560 | c.Log("Event from overlapping session: ", event) |
3561 | - if event.State == gozk.STATE_CONNECTED { |
3562 | + if event == zk.SessionConnected { |
3563 | // Wait for zk to process the connection. |
3564 | // Not reliable without this. :-( |
3565 | time.Sleep(1e9) |
3566 | @@ -627,21 +612,21 @@ |
3567 | } |
3568 | for event := range session { |
3569 | c.Log("Event from primary session: ", event) |
3570 | - if event.State == gozk.STATE_EXPIRED_SESSION { |
3571 | + if event == zk.SessionExpired { |
3572 | break |
3573 | } |
3574 | } |
3575 | |
3576 | select { |
3577 | - case event := <-watch: |
3578 | - c.Assert(event.State, Equals, gozk.STATE_EXPIRED_SESSION) |
3579 | + case event, ok := <-watch: |
3580 | + c.Assert(event, Equals, zk.NodeStatus(0)) |
3581 | + c.Assert(ok, Equals, false) |
3582 | case <-time.After(3e9): |
3583 | c.Fatal("Watch event didn't fire") |
3584 | } |
3585 | |
3586 | - event = <-watch |
3587 | - c.Assert(event.Type, Equals, gozk.EVENT_CLOSED) |
3588 | - c.Assert(event.State, Equals, gozk.STATE_CLOSED) |
3589 | + nodeStatus := <-watch |
3590 | + c.Assert(nodeStatus, Equals, zk.NodeStatus(0)) |
3591 | |
3592 | - c.Check(gozk.CountPendingWatches(), Equals, 1) |
3593 | + c.Check(zk.CountPendingWatches(), Equals, 1) |
3594 | } |
2692 - event := Event{
2693 - Type: int(data.event_type),
2694 - Path: C.GoString(data.event_path),
2695 - State: int(data.connection_state),
2696 - }
The key problem with this proposal is that it throws away information
coming from the server. It discards not only why, for example, a state is
being dropped, but also details such as the path an event relates to.
That path is not simply echoed back from the call that set the watch.
It is actually part of the wire protocol, and comes from the server side.
Discarding both the session state error information and the paths like
that is not a good idea.
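To make the objection concrete, here is a minimal sketch of what a consumer can do when the three fields of the removed Event struct (Type, Path, State) are kept. It is not gozk code: the constant names and values and the `describe` helper are hypothetical, chosen only for illustration.

```go
package main

import "fmt"

// Event mirrors the three fields of the Event struct removed in the diff.
type Event struct {
	Type  int
	Path  string
	State int
}

// Hypothetical event types; the values are illustrative, not gozk's.
const (
	EventCreated = iota + 1
	EventDeleted
	EventChanged
	EventChild
	EventSession
)

// Hypothetical session states; the values are illustrative, not gozk's.
const (
	StateConnecting = iota + 1
	StateConnected
	StateExpiredSession
)

// describe shows what a receiver can learn from one event when the
// server-supplied path and the session state are preserved.
func describe(e Event) string {
	switch e.Type {
	case EventSession:
		if e.State == StateExpiredSession {
			return "session expired"
		}
		return "session state changed"
	case EventCreated:
		return "created " + e.Path
	case EventDeleted:
		return "deleted " + e.Path
	case EventChanged:
		return "changed " + e.Path
	case EventChild:
		return "children changed under " + e.Path
	}
	return "unknown event"
}

func main() {
	// Two watches share one channel; Path tells their events apart,
	// and State explains why the final event arrived.
	events := make(chan Event, 3)
	events <- Event{Type: EventCreated, Path: "/test", State: StateConnected}
	events <- Event{Type: EventChild, Path: "/", State: StateConnected}
	events <- Event{Type: EventSession, State: StateExpiredSession}
	close(events)

	for e := range events {
		fmt.Println(describe(e))
	}
}
```

With the interface proposed in the branch, the last case would surface only as a closed watch channel, so the receiver would have to consult the session watcher to learn that the session had expired.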