Merge lp:~rogpeppe/gozk/safe-close into lp:gozk/zookeeper

Proposed by Roger Peppe
Status: Merged
Merged at revision: 31
Proposed branch: lp:~rogpeppe/gozk/safe-close
Merge into: lp:gozk/zookeeper
Diff against target: 523 lines (+301/-13)
5 files modified
close_test.go (+220/-0)
retry_test.go (+3/-3)
suite_test.go (+4/-3)
zk.go (+68/-1)
zk_test.go (+6/-6)
To merge this branch: bzr merge lp:~rogpeppe/gozk/safe-close
Reviewer                 Review Type  Date Requested  Status
The Go Language Gophers                               Pending
Review via email: mp+94812@code.launchpad.net

Description of the change

zookeeper: make Conn.Close safe to call concurrently with other operations.

I am concerned at how complex the test for this is,
relative to the simplicity of the actual code change.
(and it's also potentially fragile on a heavily loaded machine).
That said, I can't think of another way to test the
operations directly, so I'm leaving it in.
Better suggestions welcome.
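
For readers skimming the proposal, the locking pattern the diff applies can be sketched roughly as follows. This is a minimal illustration, not the package's code: `conn`, `errClosing` and the integer `handle` are hypothetical stand-ins for `zk.Conn`, `zk.ZCLOSING` and the C handle, and the assumption that Close takes the write lock is not shown in the diff itself.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errClosing is a hypothetical stand-in for zk.ZCLOSING.
var errClosing = errors.New("connection closing")

// conn is a hypothetical stand-in for zk.Conn. The handle field plays
// the role of the C handle pointer in the real package.
type conn struct {
	mu     sync.RWMutex
	handle *int
}

// Exists models any request. It holds the read lock for the whole call,
// so many requests may run concurrently but none can overlap with Close.
func (c *conn) Exists(path string) (bool, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	if c.handle == nil {
		return false, errClosing
	}
	return path == "/", nil // placeholder for the server round trip
}

// Close takes the write lock (an assumption of this sketch), so it waits
// for in-flight requests and then invalidates the handle for later ones.
func (c *conn) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.handle == nil {
		return errClosing
	}
	c.handle = nil
	return nil
}

func main() {
	h := 1
	c := &conn{handle: &h}
	ok, err := c.Exists("/")
	fmt.Println(ok, err)
	fmt.Println(c.Close())
	_, err = c.Exists("/")
	fmt.Println(err)
}
```

The read lock keeps ordinary requests from serializing against each other, while the nil check under the lock is what turns a use-after-close into an error instead of a crash in C code.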

https://codereview.appspot.com/5699093/

Roger Peppe (rogpeppe) wrote :

Reviewers: mp+94812_code.launchpad.net,

Message:
Please take a look.

https://code.launchpad.net/~rogpeppe/gozk/safe-close/+merge/94812

(do not edit description out of merge proposal)

Please review this at https://codereview.appspot.com/5699093/

Affected files:
   A close_test.go
   M retry_test.go
   M suite_test.go
   M zk.go
   M zk_test.go

Gustavo Niemeyer (niemeyer) wrote :

https://codereview.appspot.com/5699093/diff/1001/close_test.go
File close_test.go (right):

https://codereview.appspot.com/5699093/diff/1001/close_test.go#newcode96
close_test.go:96: time.Sleep(0.05e9)
How can you tell this is really unblocking at the right time and
exercising what you intend it to at all?

https://codereview.appspot.com/5699093/

Roger Peppe (rogpeppe) wrote :
lp:~rogpeppe/gozk/safe-close updated
33. By Roger Peppe

improve close_test comment.
simplify io.Copy logic.

William Reade (fwereade) wrote :

This essentially LGTM; I don't think that it's a serious problem that
the tests can theoretically fail under "enough" load. So it comes down
to a question of practical reliability: for the sake of argument, what
if we shoot for an observed failure rate of <1% on your local machine
under "normal" load, and bump up the sleeps as needed if they turn out
not to be good enough for clean testing in practice (say, on ARM ;))?
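
One way to put a number on "observed failure rate" is to run the timing-sensitive check repeatedly and count failures. The helper below is a hypothetical sketch; `failureRate` and its `attempt` callback are illustrative names and not part of gozk or gocheck.

```go
package main

import "fmt"

// failureRate runs attempt n times and returns the fraction of runs
// for which attempt reported failure (returned false).
func failureRate(n int, attempt func() bool) float64 {
	if n <= 0 {
		return 0
	}
	failures := 0
	for i := 0; i < n; i++ {
		if !attempt() {
			failures++
		}
	}
	return float64(failures) / float64(n)
}

func main() {
	// A stand-in for the real test body: fails on every 100th run.
	run := 0
	rate := failureRate(200, func() bool {
		run++
		return run%100 != 0
	})
	fmt.Printf("observed failure rate: %.2f%%\n", rate*100)
}
```

If the measured rate exceeds the target, the sleeps in the test would be the first thing to lengthen.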

https://codereview.appspot.com/5699093/

Gustavo Niemeyer (niemeyer) wrote :

LGTM, sorry for the delay.

https://codereview.appspot.com/5699093/

Roger Peppe (rogpeppe) wrote :

i forgot to publish this comment.

https://codereview.appspot.com/5699093/diff/1001/close_test.go
File close_test.go (right):

https://codereview.appspot.com/5699093/diff/1001/close_test.go#newcode96
close_test.go:96: time.Sleep(0.05e9)
On 2012/02/27 18:03:17, niemeyer wrote:
> How can you tell this is really unblocking at the right time and
> exercising what you intend it to at all?

the idea is that any request that requests or changes a zookeeper node
must make at least one round trip to the server. so we interpose a proxy
between the client and the server which can stop all incoming traffic on
demand, thus hopefully blocking the request until we want it to unblock.

we assume that all requests take less than 0.1s to complete, thus when
we wait below, neither of the above goroutines should complete within
the allotted time (the request because it's waiting for a reply from the
server and the close because it's waiting for the request to complete).
if the locking doesn't work, the Close will return early. if the proxy
blocking doesn't work, the request will return early.

when we reenable incoming messages from the server, both goroutines
should complete (we can't tell which completes first - i don't think
that's observable) but i think that the fact that the close blocked
waiting for the request is sufficient.

i've changed the above comment to try to make this clearer.

as i said in the description of the CL, i think this is a lot of mechanism
to test something that's quite simple to verify by eye, but i haven't
yet thought of a better way.
one way could be to test Close without testing all the individual
operations - simpler but less complete.

https://codereview.appspot.com/5699093/

Roger Peppe (rogpeppe) wrote :

*** Submitted:

zookeeper: make Conn.Close safe to call concurrently with other
operations.

I am concerned at how complex the test for this is,
relative to the simplicity of the actual code change.
(and it's also potentially fragile on a heavily loaded machine).
That said, I can't think of another way to test the
operations directly, so I'm leaving it in.
Better suggestions welcome.

R=niemeyer, fwereade
CC=
https://codereview.appspot.com/5699093

https://codereview.appspot.com/5699093/

Preview Diff

=== added file 'close_test.go'
--- close_test.go 1970-01-01 00:00:00 +0000
+++ close_test.go 2012-02-28 09:18:17 +0000
@@ -0,0 +1,220 @@
+package zookeeper_test
+
+import (
+	"io"
+	. "launchpad.net/gocheck"
+	zk "launchpad.net/gozk/zookeeper"
+	"log"
+	"net"
+	"time"
+)
+
+// requestFuncs holds all the requests that take a read lock
+// on the zk connection except those that don't actually
+// make a round trip to the server.
+var requestFuncs = []func(conn *zk.Conn, path string) error{
+	func(conn *zk.Conn, path string) error {
+		_, err := conn.Create(path, "", 0, nil)
+		return err
+	},
+	func(conn *zk.Conn, path string) error {
+		_, err := conn.Exists(path)
+		return err
+	},
+	func(conn *zk.Conn, path string) error {
+		_, _, err := conn.ExistsW(path)
+		return err
+	},
+	func(conn *zk.Conn, path string) error {
+		_, _, err := conn.Get(path)
+		return err
+	},
+	func(conn *zk.Conn, path string) error {
+		_, _, _, err := conn.GetW(path)
+		return err
+	},
+	func(conn *zk.Conn, path string) error {
+		_, _, err := conn.Children(path)
+		return err
+	},
+	func(conn *zk.Conn, path string) error {
+		_, _, _, err := conn.ChildrenW(path)
+		return err
+	},
+	func(conn *zk.Conn, path string) error {
+		_, err := conn.Set(path, "", 0)
+		return err
+	},
+	func(conn *zk.Conn, path string) error {
+		_, _, err := conn.ACL(path)
+		return err
+	},
+	func(conn *zk.Conn, path string) error {
+		return conn.SetACL(path, []zk.ACL{{
+			Perms: zk.PERM_ALL,
+			Scheme: "digest",
+			Id: "foo",
+		}}, 0)
+	},
+	func(conn *zk.Conn, path string) error {
+		return conn.Delete(path, 0)
+	},
+}
+
+func (s *S) TestConcurrentClose(c *C) {
+	// make sure the server is ready to receive connections.
+	s.init(c)
+
+	// Close should wait until all outstanding requests have
+	// completed before returning. The idea of this test is that
+	// any request that requests or changes a zookeeper node must
+	// make at least one round trip to the server, so we interpose a
+	// proxy between the client and the server which can stop all
+	// incoming traffic on demand, thus blocking the request until
+	// we want it to unblock.
+	//
+	// We assume that all requests take less than 0.1s to complete,
+	// thus when we wait below, neither of the above goroutines
+	// should complete within the allotted time (the request because
+	// it's waiting for a reply from the server and the close
+	// because it's waiting for the request to complete). If the
+	// locking doesn't work, the Close will return early. If the
+	// proxy blocking doesn't work, the request will return early.
+	//
+	// When we reenable incoming messages from the server, both
+	// goroutines should complete. We can't tell which completes
+	// first, but the fact that the close blocked is sufficient to
+	// tell that the locking is working correctly.
+	for i, f := range requestFuncs {
+		c.Logf("iter %d", i)
+		p := newProxy(c, s.zkAddr)
+		conn, watch, err := zk.Dial(p.addr(), 5e9)
+		c.Assert(err, IsNil)
+		c.Assert((<-watch).Ok(), Equals, true)
+
+		// sanity check that the connection is actually
+		// up and running.
+		_, err = conn.Exists("/nothing")
+		c.Assert(err, IsNil)
+
+		p.stopIncoming()
+		reqDone := make(chan bool)
+		closeDone := make(chan bool)
+		go func() {
+			f(conn, "/closetest")
+			reqDone <- true
+		}()
+		go func() {
+			// sleep for long enough for the request to be initiated and the read lock taken.
+			time.Sleep(0.05e9)
+			conn.Close()
+			closeDone <- true
+		}()
+		select {
+		case <-reqDone:
+			c.Fatalf("request %d finished early", i)
+		case <-closeDone:
+			c.Fatalf("request %d close finished early", i)
+		case <-time.After(0.1e9):
+		}
+		p.startIncoming()
+		for reqDone != nil || closeDone != nil {
+			select {
+			case <-reqDone:
+				reqDone = nil
+			case <-closeDone:
+				closeDone = nil
+			case <-time.After(0.4e9):
+				c.Fatalf("request %d timed out waiting for req (%p) and close(%p)", i, reqDone, closeDone)
+			}
+		}
+		p.close()
+		err = f(conn, "/closetest")
+		c.Check(err, Equals, zk.ZCLOSING)
+	}
+}
+
+type proxy struct {
+	stop, start chan bool
+	listener net.Listener
+}
+
+// newProxy will listen on proxyAddr and connect its client to dstAddr, and return
+// a proxy instance that can be used to control the connection.
+func newProxy(c *C, dstAddr string) *proxy {
+	listener, err := net.Listen("tcp", "127.0.0.1:0")
+	c.Assert(err, IsNil)
+	p := &proxy{
+		stop: make(chan bool, 1),
+		start: make(chan bool, 1),
+		listener: listener,
+	}
+
+	go func() {
+		for {
+			client, err := p.listener.Accept()
+			if err != nil {
+				// Ignore the error, because the connection will fail anyway.
+				return
+			}
+			go func() {
+				defer client.Close()
+				server, err := net.Dial("tcp", dstAddr)
+				if err != nil {
+					log.Printf("cannot dial %q: %v", dstAddr, err)
+					return
+				}
+				defer server.Close()
+				go io.Copy(&haltableWriter{
+					w: client,
+					stop: p.stop,
+					start: p.start},
+					server)
+				// When the client is closed, the deferred closes will
+				// take down the other io.Copy too.
+				io.Copy(server, client)
+			}()
+		}
+	}()
+	return p
+}
+
+func (p *proxy) close() error {
+	return p.listener.Close()
+}
+
+func (p *proxy) addr() string {
+	return p.listener.Addr().String()
+}
+
+func (p *proxy) stopIncoming() {
+	if p.stop == nil {
+		panic("cannot stop twice")
+	}
+	p.stop <- true
+	p.stop = nil
+}
+
+func (p *proxy) startIncoming() {
+	if p.start == nil {
+		panic("cannot start twice")
+	}
+	p.start <- true
+	p.start = nil
+}
+
+type haltableWriter struct {
+	w io.Writer
+	stop, start chan bool
+}
+
+func (w *haltableWriter) Write(buf []byte) (int, error) {
+	select {
+	case <-w.stop:
+		w.stop <- true
+		<-w.start
+		w.start <- true
+	default:
+	}
+	return w.w.Write(buf)
+}

=== modified file 'retry_test.go'
--- retry_test.go 2012-02-15 17:18:34 +0000
+++ retry_test.go 2012-02-28 09:18:17 +0000
@@ -25,7 +25,7 @@

 	acl, _, err := conn.ACL("/test")
 	c.Assert(err, IsNil)
-	c.Assert(acl, Equals, zk.WorldACL(zk.PERM_ALL))
+	c.Assert(acl, DeepEquals, zk.WorldACL(zk.PERM_ALL))
 }

 func (s *S) TestRetryChangeSetting(c *C) {
@@ -52,7 +52,7 @@
 	// ACL was unchanged by RetryChange().
 	acl, _, err := conn.ACL("/test")
 	c.Assert(err, IsNil)
-	c.Assert(acl, Equals, zk.WorldACL(zk.PERM_ALL))
+	c.Assert(acl, DeepEquals, zk.WorldACL(zk.PERM_ALL))
 }

 func (s *S) TestRetryChangeUnchangedValueDoesNothing(c *C) {
@@ -177,7 +177,7 @@
 	// Should be the new ACL.
 	acl, _, err := conn.ACL("/test")
 	c.Assert(err, IsNil)
-	c.Assert(acl, Equals, zk.WorldACL(zk.PERM_READ))
+	c.Assert(acl, DeepEquals, zk.WorldACL(zk.PERM_READ))
 }

 func (s *S) TestRetryChangeErrorInCallback(c *C) {

=== modified file 'suite_test.go'
--- suite_test.go 2012-02-10 13:33:24 +0000
+++ suite_test.go 2012-02-28 09:18:17 +0000
@@ -31,6 +31,7 @@
 var logLevel = 0 //zk.LOG_ERROR

 func (s *S) init(c *C) (*zk.Conn, chan zk.Event) {
+	c.Logf("init dialling %q", s.zkAddr)
 	conn, watch, err := zk.Dial(s.zkAddr, 5e9)
 	c.Assert(err, IsNil)
 	s.handles = append(s.handles, conn)
@@ -71,7 +72,7 @@

 func (s *S) SetUpTest(c *C) {
 	c.Assert(zk.CountPendingWatches(), Equals, 0,
-		Bug("Test got a dirty watch state before running!"))
+		Commentf("Test got a dirty watch state before running!"))
 	zk.SetLogLevel(logLevel)
 }

@@ -95,7 +96,7 @@
 	s.handles = make([]*zk.Conn, 0)

 	c.Assert(zk.CountPendingWatches(), Equals, 0,
-		Bug("Test left live watches behind!"))
+		Commentf("Test left live watches behind!"))
 }

 // We use the suite set up and tear down to manage a custom ZooKeeper
@@ -104,7 +105,7 @@
 	var err error
 	s.deadWatches = make(chan bool)

-	// N.B. We meed to create a subdirectory because zk.CreateServer
+	// N.B. We need to create a subdirectory because zk.CreateServer
 	// insists on creating its own directory.

 	s.zkTestRoot = c.MkDir() + "/zk"

=== modified file 'zk.go'
--- zk.go 2012-02-15 17:51:23 +0000
+++ zk.go 2012-02-28 09:18:17 +0000
@@ -32,7 +32,7 @@
 	watchChannels map[uintptr]chan Event
 	sessionWatchId uintptr
 	handle *C.zhandle_t
-	mutex sync.Mutex
+	mutex sync.RWMutex
 }

 // ClientId represents an established ZooKeeper session. It can be
@@ -429,6 +429,8 @@
 // ClientId returns the client ID for the existing session with ZooKeeper.
 // This is useful to reestablish an existing session via ReInit.
 func (conn *Conn) ClientId() *ClientId {
+	conn.mutex.RLock()
+	defer conn.mutex.RUnlock()
 	return &ClientId{*C.zoo_client_id(conn.handle)}
 }

@@ -459,6 +461,11 @@
 // unless an error is found. Attempting to retrieve data from a non-existing
 // node is an error.
 func (conn *Conn) Get(path string) (data string, stat *Stat, err error) {
+	conn.mutex.RLock()
+	defer conn.mutex.RUnlock()
+	if conn.handle == nil {
+		return "", nil, ZCLOSING
+	}

 	cpath := C.CString(path)
 	cbuffer := (*C.char)(C.malloc(bufferSize))
@@ -481,6 +488,11 @@
 // node changes or when critical session events happen. See the
 // documentation of the Event type for more details.
 func (conn *Conn) GetW(path string) (data string, stat *Stat, watch <-chan Event, err error) {
+	conn.mutex.RLock()
+	defer conn.mutex.RUnlock()
+	if conn.handle == nil {
+		return "", nil, nil, ZCLOSING
+	}

 	cpath := C.CString(path)
 	cbuffer := (*C.char)(C.malloc(bufferSize))
@@ -504,6 +516,11 @@
 // Children returns the children list and status from an existing node.
 // Attempting to retrieve the children list from a non-existent node is an error.
 func (conn *Conn) Children(path string) (children []string, stat *Stat, err error) {
+	conn.mutex.RLock()
+	defer conn.mutex.RUnlock()
+	if conn.handle == nil {
+		return nil, nil, ZCLOSING
+	}

 	cpath := C.CString(path)
 	defer C.free(unsafe.Pointer(cpath))
@@ -529,6 +546,11 @@
 // provided path or when critical session events happen. See the documentation
 // of the Event type for more details.
 func (conn *Conn) ChildrenW(path string) (children []string, stat *Stat, watch <-chan Event, err error) {
+	conn.mutex.RLock()
+	defer conn.mutex.RUnlock()
+	if conn.handle == nil {
+		return nil, nil, nil, ZCLOSING
+	}

 	cpath := C.CString(path)
 	defer C.free(unsafe.Pointer(cpath))
@@ -570,6 +592,12 @@
 // stat will contain meta information on the existing node, otherwise
 // it will be nil.
 func (conn *Conn) Exists(path string) (stat *Stat, err error) {
+	conn.mutex.RLock()
+	defer conn.mutex.RUnlock()
+	if conn.handle == nil {
+		return nil, ZCLOSING
+	}
+
 	cpath := C.CString(path)
 	defer C.free(unsafe.Pointer(cpath))

@@ -593,6 +621,12 @@
 // is removed. It will also receive critical session events. See the
 // documentation of the Event type for more details.
 func (conn *Conn) ExistsW(path string) (stat *Stat, watch <-chan Event, err error) {
+	conn.mutex.RLock()
+	defer conn.mutex.RUnlock()
+	if conn.handle == nil {
+		return nil, nil, ZCLOSING
+	}
+
 	cpath := C.CString(path)
 	defer C.free(unsafe.Pointer(cpath))

@@ -627,6 +661,12 @@
 // from the requested one, such as when a sequence number is appended
 // to it due to the use of the gozk.SEQUENCE flag.
 func (conn *Conn) Create(path, value string, flags int, aclv []ACL) (pathCreated string, err error) {
+	conn.mutex.RLock()
+	defer conn.mutex.RUnlock()
+	if conn.handle == nil {
+		return "", ZCLOSING
+	}
+
 	cpath := C.CString(path)
 	cvalue := C.CString(value)
 	defer C.free(unsafe.Pointer(cpath))
@@ -658,6 +698,11 @@
 // It is an error to attempt to set the data of a non-existing node with
 // this function. In these cases, use Create instead.
 func (conn *Conn) Set(path, value string, version int) (stat *Stat, err error) {
+	conn.mutex.RLock()
+	defer conn.mutex.RUnlock()
+	if conn.handle == nil {
+		return nil, ZCLOSING
+	}

 	cpath := C.CString(path)
 	cvalue := C.CString(value)
@@ -678,6 +723,12 @@
 // will only succeed if the node is still at this version when the
 // node is deleted as an atomic operation.
 func (conn *Conn) Delete(path string, version int) (err error) {
+	conn.mutex.RLock()
+	defer conn.mutex.RUnlock()
+	if conn.handle == nil {
+		return ZCLOSING
+	}
+
 	cpath := C.CString(path)
 	defer C.free(unsafe.Pointer(cpath))
 	rc, cerr := C.zoo_delete(conn.handle, cpath, C.int(version))
@@ -690,6 +741,12 @@
 // identity data itself. For instance, the "digest" scheme requires
 // a pair like "username:password" to be provided as the certificate.
 func (conn *Conn) AddAuth(scheme, cert string) error {
+	conn.mutex.RLock()
+	defer conn.mutex.RUnlock()
+	if conn.handle == nil {
+		return ZCLOSING
+	}
+
 	cscheme := C.CString(scheme)
 	ccert := C.CString(cert)
 	defer C.free(unsafe.Pointer(cscheme))
@@ -714,6 +771,11 @@

 // ACL returns the access control list for path.
 func (conn *Conn) ACL(path string) ([]ACL, *Stat, error) {
+	conn.mutex.RLock()
+	defer conn.mutex.RUnlock()
+	if conn.handle == nil {
+		return nil, nil, ZCLOSING
+	}

 	cpath := C.CString(path)
 	defer C.free(unsafe.Pointer(cpath))
@@ -733,6 +795,11 @@

 // SetACL changes the access control list for path.
 func (conn *Conn) SetACL(path string, aclv []ACL, version int) error {
+	conn.mutex.RLock()
+	defer conn.mutex.RUnlock()
+	if conn.handle == nil {
+		return ZCLOSING
+	}

 	cpath := C.CString(path)
 	defer C.free(unsafe.Pointer(cpath))

=== modified file 'zk_test.go'
--- zk_test.go 2012-02-15 17:51:23 +0000
+++ zk_test.go 2012-02-28 09:18:17 +0000
@@ -312,7 +312,7 @@

 	children, stat, err := conn.Children("/")
 	c.Assert(err, IsNil)
-	c.Assert(children, Equals, []string{"zookeeper"})
+	c.Assert(children, DeepEquals, []string{"zookeeper"})
 	c.Assert(stat.NumChildren(), Equals, 1)

 	children, stat, err = conn.Children("/non-existent")
@@ -330,7 +330,7 @@

 	children, stat, watch, err := conn.ChildrenW("/")
 	c.Assert(err, IsNil)
-	c.Assert(children, Equals, []string{"zookeeper"})
+	c.Assert(children, DeepEquals, []string{"zookeeper"})
 	c.Assert(stat.NumChildren(), Equals, 1)

 	select {
@@ -355,7 +355,7 @@
 	c.Assert(stat.NumChildren(), Equals, 2)

 	// The ordering is most likely unstable, so this test must be fixed.
-	c.Assert(children, Equals, []string{"test1", "zookeeper"})
+	c.Assert(children, DeepEquals, []string{"test1", "zookeeper"})

 	select {
 	case <-watch:
@@ -481,7 +481,7 @@
 	defer zk2.Close()
 	clientId2 := zk2.ClientId()

-	c.Assert(clientId1, Equals, clientId2)
+	c.Assert(clientId1, DeepEquals, clientId2)
 }

 // Surprisingly for some (including myself, initially), the watch
@@ -512,7 +512,7 @@

 	acl, stat, err := conn.ACL("/test")
 	c.Assert(err, IsNil)
-	c.Assert(acl, Equals, zk.WorldACL(zk.PERM_ALL))
+	c.Assert(acl, DeepEquals, zk.WorldACL(zk.PERM_ALL))
 	c.Assert(stat, NotNil)
 	c.Assert(stat.Version(), Equals, 0)

@@ -538,7 +538,7 @@

 	acl, _, err := conn.ACL("/test")
 	c.Assert(err, IsNil)
-	c.Assert(acl, Equals, zk.WorldACL(zk.PERM_READ))
+	c.Assert(acl, DeepEquals, zk.WorldACL(zk.PERM_READ))
 }

 func (s *S) TestAddAuth(c *C) {
