Merge lp:~rogpeppe/gozk/safe-close into lp:gozk/zookeeper
Status: | Merged
---|---
Merged at revision: | 31
Proposed branch: | lp:~rogpeppe/gozk/safe-close
Merge into: | lp:gozk/zookeeper
Diff against target: | 523 lines (+301/-13), 5 files modified: close_test.go (+220/-0), retry_test.go (+3/-3), suite_test.go (+4/-3), zk.go (+68/-1), zk_test.go (+6/-6)
To merge this branch: | bzr merge lp:~rogpeppe/gozk/safe-close
Related bugs: |
Reviewer | Review Type | Date Requested | Status
---|---|---|---
The Go Language Gophers | | | Pending
Review via email: | mp+94812@code.launchpad.net
Commit message
Description of the change
zookeeper: make Conn.Close safe to call concurrently with other operations.
I am concerned at how complex the test for this is,
relative to the simplicity of the actual code change.
(and it's also potentially fragile on a heavily loaded machine).
That said, I can't think of another way to test the
operations directly, so I'm leaving it in.
Better suggestions welcome.
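The locking scheme behind this change can be sketched roughly as follows. This is a minimal stand-alone illustration, not the gozk code: the `conn` type, its `get` and `close` methods, and `errClosing` (standing in for `zk.ZCLOSING`) are all hypothetical names, and it assumes that Close takes the write lock and clears the handle, which the preview diff does not show.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errClosing is a hypothetical stand-in for zk.ZCLOSING.
var errClosing = errors.New("connection closing")

// conn is a hypothetical stand-in for zk.Conn.
type conn struct {
	mu     sync.RWMutex
	handle *int // stands in for the C handle; nil once closed
}

func newConn() *conn {
	h := 1
	return &conn{handle: &h}
}

// get models any request: it holds a read lock for its whole
// duration, so many requests can run concurrently with each other.
func (c *conn) get() (int, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	if c.handle == nil {
		return 0, errClosing
	}
	return *c.handle, nil
}

// close takes the write lock (an assumption about Close), so it
// waits until every in-flight request has released its read lock.
func (c *conn) close() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.handle = nil
}

func main() {
	c := newConn()
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Either succeeds or returns errClosing; it never
			// dereferences a handle that close has cleared.
			_, _ = c.get()
		}()
	}
	c.close()
	wg.Wait()
	_, err := c.get()
	fmt.Println(err)
}
```

The read/write split is what lets requests overlap with one another while still being excluded from running alongside the close.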
Roger Peppe (rogpeppe) wrote:
Gustavo Niemeyer (niemeyer) wrote:
https:/
File close_test.go (right):
https:/
close_test.go:96: time.Sleep(0.05e9)
How can you tell this is really unblocking at the right time and
exercising what you intend it to at all?
Roger Peppe (rogpeppe) wrote:
Please take a look.
Revision 33, by Roger Peppe:
improve close_test comment.
simplify io.Copy logic.
William Reade (fwereade) wrote:
This essentially LGTM; I don't think that it's a serious problem that
the tests can theoretically fail under "enough" load. So it comes down
to a question of practical reliability: for the sake of argument, what
if we shoot for an observed failure rate of <1% on your local machine
under "normal" load, and bump up the sleeps as needed if they turn out
not to be good enough for clean testing in practice (say, on ARM ;))?
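One way to make the sleeps easy to bump is to route every timeout through a single scaling knob. The sketch below is only an illustration of that idea: `timeScale`, `scaled` and `blocksFor` are hypothetical names and are not part of the proposed branch.

```go
package main

import (
	"fmt"
	"time"
)

// timeScale is a hypothetical knob: raise it on slow or heavily
// loaded machines instead of editing each sleep individually.
var timeScale = 1.0

// scaled stretches d by the current timeScale.
func scaled(d time.Duration) time.Duration {
	return time.Duration(float64(d) * timeScale)
}

// blocksFor reports whether done stays unsignalled for the whole
// of the (scaled) duration d.
func blocksFor(done <-chan bool, d time.Duration) bool {
	select {
	case <-done:
		return false
	case <-time.After(scaled(d)):
		return true
	}
}

func main() {
	done := make(chan bool, 1)
	fmt.Println(blocksFor(done, 10*time.Millisecond)) // nothing sent yet
	done <- true
	fmt.Println(blocksFor(done, 10*time.Millisecond)) // value already buffered
}
```

With a helper like this, the "finished early" checks in a test become calls to `blocksFor`, and tuning for a slower platform means changing one value.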
Gustavo Niemeyer (niemeyer) wrote:
LGTM, sorry for the delay.
Roger Peppe (rogpeppe) wrote:
i forgot to publish this comment.
https:/
File close_test.go (right):
https:/
close_test.go:96: time.Sleep(0.05e9)
On 2012/02/27 18:03:17, niemeyer wrote:
> How can you tell this is really unblocking at the right time and
> exercising what you intend it to at all?
the idea is that any request that requests or changes a zookeeper node
must make at least one round trip to the server. so we interpose a proxy
between the client and the server which can stop all incoming traffic on
demand, thus hopefully blocking the request until we want it to unblock.
we assume that all requests take less than 0.1s to complete, thus when
we wait below, neither of the above goroutines should complete within
the allotted time (the request because it's waiting for a reply from the
server and the close because it's waiting for the request to complete).
if the locking doesn't work, the Close will return early. if the proxy
blocking doesn't work, the request will return early.
when we reenable incoming messages from the server, both goroutines
should complete (we can't tell which completes first - i don't think
that's observable) but i think that the fact that the close blocked
waiting for the request is sufficient.
i've changed the above comment to try to make this clearer.
as i said in the description of the CL, i think this is a lot of mechanism
to test something that's quite simple to verify by eye, but i haven't
yet thought of a better way.
one way could be to test Close without testing all the individual
operations - simpler but less complete.
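The simpler alternative mentioned above, testing Close on its own, might look something like this sketch. It is not the gozk test: `fakeConn`, `request`, `close` and `closeWaitsForRequest` are hypothetical, and the request is held open by a channel rather than by a proxy, which assumes a hook that the real zk.Conn does not provide.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// fakeConn is a hypothetical stand-in for zk.Conn.
type fakeConn struct {
	mu     sync.RWMutex
	closed bool
}

// request holds the read lock until release is closed, and signals
// on started once the lock is actually held.
func (c *fakeConn) request(started chan<- bool, release <-chan bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	started <- true
	<-release
}

// close takes the write lock, so it must wait for in-flight requests.
func (c *fakeConn) close() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.closed = true
}

// closeWaitsForRequest reports whether close stayed blocked for wait
// while a request was in flight, then returned once it was released.
func closeWaitsForRequest(wait time.Duration) bool {
	c := &fakeConn{}
	started := make(chan bool)
	release := make(chan bool)
	reqDone := make(chan bool, 1)
	closeDone := make(chan bool, 1)
	go func() {
		c.request(started, release)
		reqDone <- true
	}()
	<-started // the read lock is held from here on
	go func() {
		c.close()
		closeDone <- true
	}()
	select {
	case <-closeDone:
		return false // close returned while the request was in flight
	case <-time.After(wait):
	}
	close(release)
	<-reqDone
	<-closeDone
	return true
}

func main() {
	fmt.Println(closeWaitsForRequest(50 * time.Millisecond))
}
```

Because the request announces when it holds the lock, the first sleep disappears, and the remaining timeout can only cause a false pass on a slow machine, not a false failure. The cost is the one already noted: the individual operations are no longer exercised.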
Roger Peppe (rogpeppe) wrote:
*** Submitted:
zookeeper: make Conn.Close safe to call concurrently with other
operations.
I am concerned at how complex the test for this is,
relative to the simplicity of the actual code change.
(and it's also potentially fragile on a heavily loaded machine).
That said, I can't think of another way to test the
operations directly, so I'm leaving it in.
Better suggestions welcome.
R=niemeyer, fwereade
CC=
https:/
Preview Diff
=== added file 'close_test.go'
--- close_test.go	1970-01-01 00:00:00 +0000
+++ close_test.go	2012-02-28 09:18:17 +0000
@@ -0,0 +1,220 @@
+package zookeeper_test
+
+import (
+	"io"
+	. "launchpad.net/gocheck"
+	zk "launchpad.net/gozk/zookeeper"
+	"log"
+	"net"
+	"time"
+)
+
+// requestFuncs holds all the requests that take a read lock
+// on the zk connection except those that don't actually
+// make a round trip to the server.
+var requestFuncs = []func(conn *zk.Conn, path string) error{
+	func(conn *zk.Conn, path string) error {
+		_, err := conn.Create(path, "", 0, nil)
+		return err
+	},
+	func(conn *zk.Conn, path string) error {
+		_, err := conn.Exists(path)
+		return err
+	},
+	func(conn *zk.Conn, path string) error {
+		_, _, err := conn.ExistsW(path)
+		return err
+	},
+	func(conn *zk.Conn, path string) error {
+		_, _, err := conn.Get(path)
+		return err
+	},
+	func(conn *zk.Conn, path string) error {
+		_, _, _, err := conn.GetW(path)
+		return err
+	},
+	func(conn *zk.Conn, path string) error {
+		_, _, err := conn.Children(path)
+		return err
+	},
+	func(conn *zk.Conn, path string) error {
+		_, _, _, err := conn.ChildrenW(path)
+		return err
+	},
+	func(conn *zk.Conn, path string) error {
+		_, err := conn.Set(path, "", 0)
+		return err
+	},
+	func(conn *zk.Conn, path string) error {
+		_, _, err := conn.ACL(path)
+		return err
+	},
+	func(conn *zk.Conn, path string) error {
+		return conn.SetACL(path, []zk.ACL{{
+			Perms:  zk.PERM_ALL,
+			Scheme: "digest",
+			Id:     "foo",
+		}}, 0)
+	},
+	func(conn *zk.Conn, path string) error {
+		return conn.Delete(path, 0)
+	},
+}
+
+func (s *S) TestConcurrentClose(c *C) {
+	// make sure the server is ready to receive connections.
+	s.init(c)
+
+	// Close should wait until all outstanding requests have
+	// completed before returning. The idea of this test is that
+	// any request that requests or changes a zookeeper node must
+	// make at least one round trip to the server, so we interpose a
+	// proxy between the client and the server which can stop all
+	// incoming traffic on demand, thus blocking the request until
+	// we want it to unblock.
+	//
+	// We assume that all requests take less than 0.1s to complete,
+	// thus when we wait below, neither of the above goroutines
+	// should complete within the allotted time (the request because
+	// it's waiting for a reply from the server and the close
+	// because it's waiting for the request to complete). If the
+	// locking doesn't work, the Close will return early. If the
+	// proxy blocking doesn't work, the request will return early.
+	//
+	// When we reenable incoming messages from the server, both
+	// goroutines should complete. We can't tell which completes
+	// first, but the fact that the close blocked is sufficient to
+	// tell that the locking is working correctly.
+	for i, f := range requestFuncs {
+		c.Logf("iter %d", i)
+		p := newProxy(c, s.zkAddr)
+		conn, watch, err := zk.Dial(p.addr(), 5e9)
+		c.Assert(err, IsNil)
+		c.Assert((<-watch).Ok(), Equals, true)
+
+		// sanity check that the connection is actually
+		// up and running.
+		_, err = conn.Exists("/nothing")
+		c.Assert(err, IsNil)
+
+		p.stopIncoming()
+		reqDone := make(chan bool)
+		closeDone := make(chan bool)
+		go func() {
+			f(conn, "/closetest")
+			reqDone <- true
+		}()
+		go func() {
+			// sleep for long enough for the request to be initiated and the read lock taken.
+			time.Sleep(0.05e9)
+			conn.Close()
+			closeDone <- true
+		}()
+		select {
+		case <-reqDone:
+			c.Fatalf("request %d finished early", i)
+		case <-closeDone:
+			c.Fatalf("request %d close finished early", i)
+		case <-time.After(0.1e9):
+		}
+		p.startIncoming()
+		for reqDone != nil || closeDone != nil {
+			select {
+			case <-reqDone:
+				reqDone = nil
+			case <-closeDone:
+				closeDone = nil
+			case <-time.After(0.4e9):
+				c.Fatalf("request %d timed out waiting for req (%p) and close(%p)", i, reqDone, closeDone)
+			}
+		}
+		p.close()
+		err = f(conn, "/closetest")
+		c.Check(err, Equals, zk.ZCLOSING)
+	}
+}
+
+type proxy struct {
+	stop, start chan bool
+	listener    net.Listener
+}
+
+// newProxy will listen on proxyAddr and connect its client to dstAddr, and return
+// a proxy instance that can be used to control the connection.
+func newProxy(c *C, dstAddr string) *proxy {
+	listener, err := net.Listen("tcp", "127.0.0.1:0")
+	c.Assert(err, IsNil)
+	p := &proxy{
+		stop:     make(chan bool, 1),
+		start:    make(chan bool, 1),
+		listener: listener,
+	}
+
+	go func() {
+		for {
+			client, err := p.listener.Accept()
+			if err != nil {
+				// Ignore the error, because the connection will fail anyway.
+				return
+			}
+			go func() {
+				defer client.Close()
+				server, err := net.Dial("tcp", dstAddr)
+				if err != nil {
+					log.Printf("cannot dial %q: %v", dstAddr, err)
+					return
+				}
+				defer server.Close()
+				go io.Copy(&haltableWriter{
+					w:     client,
+					stop:  p.stop,
+					start: p.start},
+					server)
+				// When the client is closed, the deferred closes will
+				// take down the other io.Copy too.
+				io.Copy(server, client)
+			}()
+		}
+	}()
+	return p
+}
+
+func (p *proxy) close() error {
+	return p.listener.Close()
+}
+
+func (p *proxy) addr() string {
+	return p.listener.Addr().String()
+}
+
+func (p *proxy) stopIncoming() {
+	if p.stop == nil {
+		panic("cannot stop twice")
+	}
+	p.stop <- true
+	p.stop = nil
+}
+
+func (p *proxy) startIncoming() {
+	if p.start == nil {
+		panic("cannot start twice")
+	}
+	p.start <- true
+	p.start = nil
+}
+
+type haltableWriter struct {
+	w           io.Writer
+	stop, start chan bool
+}
+
+func (w *haltableWriter) Write(buf []byte) (int, error) {
+	select {
+	case <-w.stop:
+		w.stop <- true
+		<-w.start
+		w.start <- true
+	default:
+	}
+	return w.w.Write(buf)
+}

=== modified file 'retry_test.go'
--- retry_test.go	2012-02-15 17:18:34 +0000
+++ retry_test.go	2012-02-28 09:18:17 +0000
@@ -25,7 +25,7 @@
 
 	acl, _, err := conn.ACL("/test")
 	c.Assert(err, IsNil)
-	c.Assert(acl, Equals, zk.WorldACL(zk.PERM_ALL))
+	c.Assert(acl, DeepEquals, zk.WorldACL(zk.PERM_ALL))
 }
 
 func (s *S) TestRetryChangeSetting(c *C) {
@@ -52,7 +52,7 @@
 	// ACL was unchanged by RetryChange().
 	acl, _, err := conn.ACL("/test")
 	c.Assert(err, IsNil)
-	c.Assert(acl, Equals, zk.WorldACL(zk.PERM_ALL))
+	c.Assert(acl, DeepEquals, zk.WorldACL(zk.PERM_ALL))
 }
 
 func (s *S) TestRetryChangeUnchangedValueDoesNothing(c *C) {
@@ -177,7 +177,7 @@
 	// Should be the new ACL.
 	acl, _, err := conn.ACL("/test")
 	c.Assert(err, IsNil)
-	c.Assert(acl, Equals, zk.WorldACL(zk.PERM_READ))
+	c.Assert(acl, DeepEquals, zk.WorldACL(zk.PERM_READ))
 }
 
 func (s *S) TestRetryChangeErrorInCallback(c *C) {

=== modified file 'suite_test.go'
--- suite_test.go	2012-02-10 13:33:24 +0000
+++ suite_test.go	2012-02-28 09:18:17 +0000
@@ -31,6 +31,7 @@
 var logLevel = 0 //zk.LOG_ERROR
 
 func (s *S) init(c *C) (*zk.Conn, chan zk.Event) {
+	c.Logf("init dialling %q", s.zkAddr)
 	conn, watch, err := zk.Dial(s.zkAddr, 5e9)
 	c.Assert(err, IsNil)
 	s.handles = append(s.handles, conn)
@@ -71,7 +72,7 @@
 
 func (s *S) SetUpTest(c *C) {
 	c.Assert(zk.CountPendingWatches(), Equals, 0,
-		Bug("Test got a dirty watch state before running!"))
+		Commentf("Test got a dirty watch state before running!"))
 	zk.SetLogLevel(logLevel)
 }
 
@@ -95,7 +96,7 @@
 	s.handles = make([]*zk.Conn, 0)
 
 	c.Assert(zk.CountPendingWatches(), Equals, 0,
-		Bug("Test left live watches behind!"))
+		Commentf("Test left live watches behind!"))
 }
 
 // We use the suite set up and tear down to manage a custom ZooKeeper
@@ -104,7 +105,7 @@
 	var err error
 	s.deadWatches = make(chan bool)
 
-	// N.B. We meed to create a subdirectory because zk.CreateServer
+	// N.B. We need to create a subdirectory because zk.CreateServer
 	// insists on creating its own directory.
 
 	s.zkTestRoot = c.MkDir() + "/zk"

=== modified file 'zk.go'
--- zk.go	2012-02-15 17:51:23 +0000
+++ zk.go	2012-02-28 09:18:17 +0000
@@ -32,7 +32,7 @@
 	watchChannels  map[uintptr]chan Event
 	sessionWatchId uintptr
 	handle         *C.zhandle_t
-	mutex          sync.Mutex
+	mutex          sync.RWMutex
 }
 
 // ClientId represents an established ZooKeeper session. It can be
@@ -429,6 +429,8 @@
 // ClientId returns the client ID for the existing session with ZooKeeper.
 // This is useful to reestablish an existing session via ReInit.
 func (conn *Conn) ClientId() *ClientId {
+	conn.mutex.RLock()
+	defer conn.mutex.RUnlock()
 	return &ClientId{*C.zoo_client_id(conn.handle)}
 }
 
@@ -459,6 +461,11 @@
 // unless an error is found. Attempting to retrieve data from a non-existing
 // node is an error.
 func (conn *Conn) Get(path string) (data string, stat *Stat, err error) {
+	conn.mutex.RLock()
+	defer conn.mutex.RUnlock()
+	if conn.handle == nil {
+		return "", nil, ZCLOSING
+	}
 
 	cpath := C.CString(path)
 	cbuffer := (*C.char)(C.malloc(bufferSize))
@@ -481,6 +488,11 @@
 // node changes or when critical session events happen. See the
 // documentation of the Event type for more details.
 func (conn *Conn) GetW(path string) (data string, stat *Stat, watch <-chan Event, err error) {
+	conn.mutex.RLock()
+	defer conn.mutex.RUnlock()
+	if conn.handle == nil {
+		return "", nil, nil, ZCLOSING
+	}
 
 	cpath := C.CString(path)
 	cbuffer := (*C.char)(C.malloc(bufferSize))
@@ -504,6 +516,11 @@
 // Children returns the children list and status from an existing node.
 // Attempting to retrieve the children list from a non-existent node is an error.
 func (conn *Conn) Children(path string) (children []string, stat *Stat, err error) {
+	conn.mutex.RLock()
+	defer conn.mutex.RUnlock()
+	if conn.handle == nil {
+		return nil, nil, ZCLOSING
+	}
 
 	cpath := C.CString(path)
 	defer C.free(unsafe.Pointer(cpath))
@@ -529,6 +546,11 @@
 // provided path or when critical session events happen. See the documentation
 // of the Event type for more details.
 func (conn *Conn) ChildrenW(path string) (children []string, stat *Stat, watch <-chan Event, err error) {
+	conn.mutex.RLock()
+	defer conn.mutex.RUnlock()
+	if conn.handle == nil {
+		return nil, nil, nil, ZCLOSING
+	}
 
 	cpath := C.CString(path)
 	defer C.free(unsafe.Pointer(cpath))
@@ -570,6 +592,12 @@
 // stat will contain meta information on the existing node, otherwise
 // it will be nil.
 func (conn *Conn) Exists(path string) (stat *Stat, err error) {
+	conn.mutex.RLock()
+	defer conn.mutex.RUnlock()
+	if conn.handle == nil {
+		return nil, ZCLOSING
+	}
+
 	cpath := C.CString(path)
 	defer C.free(unsafe.Pointer(cpath))
 
@@ -593,6 +621,12 @@
 // is removed. It will also receive critical session events. See the
 // documentation of the Event type for more details.
 func (conn *Conn) ExistsW(path string) (stat *Stat, watch <-chan Event, err error) {
+	conn.mutex.RLock()
+	defer conn.mutex.RUnlock()
+	if conn.handle == nil {
+		return nil, nil, ZCLOSING
+	}
+
 	cpath := C.CString(path)
 	defer C.free(unsafe.Pointer(cpath))
 
@@ -627,6 +661,12 @@
 // from the requested one, such as when a sequence number is appended
 // to it due to the use of the gozk.SEQUENCE flag.
 func (conn *Conn) Create(path, value string, flags int, aclv []ACL) (pathCreated string, err error) {
+	conn.mutex.RLock()
+	defer conn.mutex.RUnlock()
+	if conn.handle == nil {
+		return "", ZCLOSING
+	}
+
 	cpath := C.CString(path)
 	cvalue := C.CString(value)
 	defer C.free(unsafe.Pointer(cpath))
@@ -658,6 +698,11 @@
 // It is an error to attempt to set the data of a non-existing node with
 // this function. In these cases, use Create instead.
 func (conn *Conn) Set(path, value string, version int) (stat *Stat, err error) {
+	conn.mutex.RLock()
+	defer conn.mutex.RUnlock()
+	if conn.handle == nil {
+		return nil, ZCLOSING
+	}
 
 	cpath := C.CString(path)
 	cvalue := C.CString(value)
@@ -678,6 +723,12 @@
 // will only succeed if the node is still at this version when the
 // node is deleted as an atomic operation.
 func (conn *Conn) Delete(path string, version int) (err error) {
+	conn.mutex.RLock()
+	defer conn.mutex.RUnlock()
+	if conn.handle == nil {
+		return ZCLOSING
+	}
+
 	cpath := C.CString(path)
 	defer C.free(unsafe.Pointer(cpath))
 	rc, cerr := C.zoo_delete(conn.handle, cpath, C.int(version))
@@ -690,6 +741,12 @@
 // identity data itself. For instance, the "digest" scheme requires
 // a pair like "username:password" to be provided as the certificate.
 func (conn *Conn) AddAuth(scheme, cert string) error {
+	conn.mutex.RLock()
+	defer conn.mutex.RUnlock()
+	if conn.handle == nil {
+		return ZCLOSING
+	}
+
 	cscheme := C.CString(scheme)
 	ccert := C.CString(cert)
 	defer C.free(unsafe.Pointer(cscheme))
@@ -714,6 +771,11 @@
 
 // ACL returns the access control list for path.
 func (conn *Conn) ACL(path string) ([]ACL, *Stat, error) {
+	conn.mutex.RLock()
+	defer conn.mutex.RUnlock()
+	if conn.handle == nil {
+		return nil, nil, ZCLOSING
+	}
 
 	cpath := C.CString(path)
 	defer C.free(unsafe.Pointer(cpath))
@@ -733,6 +795,11 @@
 
 // SetACL changes the access control list for path.
 func (conn *Conn) SetACL(path string, aclv []ACL, version int) error {
+	conn.mutex.RLock()
+	defer conn.mutex.RUnlock()
+	if conn.handle == nil {
+		return ZCLOSING
+	}
 
 	cpath := C.CString(path)
 	defer C.free(unsafe.Pointer(cpath))

=== modified file 'zk_test.go'
--- zk_test.go	2012-02-15 17:51:23 +0000
+++ zk_test.go	2012-02-28 09:18:17 +0000
@@ -312,7 +312,7 @@
 
 	children, stat, err := conn.Children("/")
 	c.Assert(err, IsNil)
-	c.Assert(children, Equals, []string{"zookeeper"})
+	c.Assert(children, DeepEquals, []string{"zookeeper"})
 	c.Assert(stat.NumChildren(), Equals, 1)
 
 	children, stat, err = conn.Children("/non-existent")
@@ -330,7 +330,7 @@
 
 	children, stat, watch, err := conn.ChildrenW("/")
 	c.Assert(err, IsNil)
-	c.Assert(children, Equals, []string{"zookeeper"})
+	c.Assert(children, DeepEquals, []string{"zookeeper"})
 	c.Assert(stat.NumChildren(), Equals, 1)
 
 	select {
@@ -355,7 +355,7 @@
 	c.Assert(stat.NumChildren(), Equals, 2)
 
 	// The ordering is most likely unstable, so this test must be fixed.
-	c.Assert(children, Equals, []string{"test1", "zookeeper"})
+	c.Assert(children, DeepEquals, []string{"test1", "zookeeper"})
 
 	select {
 	case <-watch:
@@ -481,7 +481,7 @@
 	defer zk2.Close()
 	clientId2 := zk2.ClientId()
 
-	c.Assert(clientId1, Equals, clientId2)
+	c.Assert(clientId1, DeepEquals, clientId2)
 }
 
 // Surprisingly for some (including myself, initially), the watch
@@ -512,7 +512,7 @@
 
 	acl, stat, err := conn.ACL("/test")
 	c.Assert(err, IsNil)
-	c.Assert(acl, Equals, zk.WorldACL(zk.PERM_ALL))
+	c.Assert(acl, DeepEquals, zk.WorldACL(zk.PERM_ALL))
 	c.Assert(stat, NotNil)
 	c.Assert(stat.Version(), Equals, 0)
 
@@ -538,7 +538,7 @@
 
 	acl, _, err := conn.ACL("/test")
 	c.Assert(err, IsNil)
-	c.Assert(acl, Equals, zk.WorldACL(zk.PERM_READ))
+	c.Assert(acl, DeepEquals, zk.WorldACL(zk.PERM_READ))
 }
 
 func (s *S) TestAddAuth(c *C) {
Reviewers: mp+94812_code.launchpad.net,
Message:
Please take a look.
Description:
I am concerned at how complex the test for this is,
relative to the simplicity of the actual code change.
(and it's also potentially fragile on a heavily loaded machine).
That said, I can't think of another way to test the
operations directly, so I'm leaving it in.
Better suggestions welcome.
https://code.launchpad.net/~rogpeppe/gozk/safe-close/+merge/94812
(do not edit description out of merge proposal)
Please review this at https://codereview.appspot.com/5699093/
Affected files:
A close_test.go
M retry_test.go
M suite_test.go
M zk.go
M zk_test.go