Merge lp:~jc.redoutey/libmemcached/io_flush into lp:~tangent-org/libmemcached/trunk

Proposed by JC Redoutey
Status: Work in progress
Proposed branch: lp:~jc.redoutey/libmemcached/io_flush
Merge into: lp:~tangent-org/libmemcached/trunk
Diff against target: 206 lines (+88/-39)
3 files modified
libmemcached/memcached_io.c (+40/-37)
libmemcached/memcached_server.h (+1/-0)
tests/function.c (+47/-2)
To merge this branch: bzr merge lp:~jc.redoutey/libmemcached/io_flush
Reviewer Review Type Date Requested Status
Libmemcached-developers Pending
Review via email: mp+15453@code.launchpad.net
To post a comment you must log in.
Revision history for this message
JC Redoutey (jc.redoutey) wrote :

The test case does not work as well as I had hoped, I guess because the network calls are in fact local. But anyway, io_flush needs to somehow support recursive calls, so here is a proposed version.

624. By Jean-Charles Redoutey <jc@Jayce2009lnx>

Removed wrongly introduced non-portable call

Revision history for this message
Brian Aker (brianaker) wrote :

Anything still relevant in this?

Unmerged revisions

624. By Jean-Charles Redoutey <jc@Jayce2009lnx>

Removed wrongly introduced non-portable call

623. By Jean-Charles Redoutey <jc@Jayce2009lnx>

Made io_flush reentrant, or at least enough for existing cases

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'libmemcached/memcached_io.c'
2--- libmemcached/memcached_io.c 2009-10-14 11:40:42 +0000
3+++ libmemcached/memcached_io.c 2009-12-02 17:36:12 +0000
4@@ -424,14 +424,13 @@
5 ssize_t sent_length;
6 size_t return_length;
7 char *local_write_ptr= ptr->write_buffer;
8- size_t write_length= ptr->write_buffer_offset;
9
10 *error= MEMCACHED_SUCCESS;
11
12 WATCHPOINT_ASSERT(ptr->fd != -1);
13
14 // UDP Sanity check, make sure that we are not sending somthing too big
15- if (ptr->type == MEMCACHED_CONNECTION_UDP && write_length > MAX_UDP_DATAGRAM_LENGTH)
16+ if (ptr->type == MEMCACHED_CONNECTION_UDP && ptr->write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH)
17 return -1;
18
19 if (ptr->write_buffer_offset == 0 || (ptr->type == MEMCACHED_CONNECTION_UDP
20@@ -439,24 +438,32 @@
21 return 0;
22
23 /* Looking for memory overflows */
24-#if defined(DEBUG)
25- if (write_length == MEMCACHED_MAX_BUFFER)
26- WATCHPOINT_ASSERT(ptr->write_buffer == local_write_ptr);
27- WATCHPOINT_ASSERT((ptr->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + write_length));
28-#endif
29-
30- return_length= 0;
31- while (write_length)
32- {
33- WATCHPOINT_ASSERT(ptr->fd != -1);
34- WATCHPOINT_ASSERT(write_length > 0);
35- sent_length= 0;
36- if (ptr->type == MEMCACHED_CONNECTION_UDP)
37- increment_udp_message_id(ptr);
38- sent_length= write(ptr->fd, local_write_ptr, write_length);
39-
40- if (sent_length == -1)
41- {
42+ #if defined(DEBUG)
43+ if (ptr->write_buffer_offset == MEMCACHED_MAX_BUFFER)
44+ WATCHPOINT_ASSERT(ptr->write_buffer == local_write_ptr);
45+ WATCHPOINT_ASSERT((ptr->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + ptr->write_buffer_offset));
46+ #endif
47+
48+ return_length= 0;
49+ while (ptr->write_buffer_offset)
50+ {
51+ WATCHPOINT_ASSERT(ptr->fd != -1);
52+ WATCHPOINT_ASSERT(ptr->write_buffer_offset > 0);
53+ if (ptr->type == MEMCACHED_CONNECTION_UDP)
54+ increment_udp_message_id(ptr);
55+
56+ sent_length= write(ptr->fd, local_write_ptr + ptr->write_buffer_start_offset, ptr->write_buffer_offset);
57+
58+ if(sent_length > 0)
59+ {
60+ ptr->io_bytes_sent += (uint32_t) sent_length;
61+ ptr->write_buffer_start_offset += sent_length;
62+ ptr->write_buffer_offset -= (uint32_t) sent_length;
63+ return_length+= (uint32_t) sent_length;
64+ }
65+
66+ if (sent_length == -1)
67+ {
68 ptr->cached_errno= errno;
69 switch (errno)
70 {
71@@ -490,23 +497,18 @@
72 }
73 }
74
75- if (ptr->type == MEMCACHED_CONNECTION_UDP &&
76- (size_t)sent_length != write_length)
77- {
78- memcached_quit_server(ptr, 1);
79- return -1;
80- }
81-
82- ptr->io_bytes_sent += (uint32_t) sent_length;
83-
84- local_write_ptr+= sent_length;
85- write_length-= (uint32_t) sent_length;
86- return_length+= (uint32_t) sent_length;
87- }
88-
89- WATCHPOINT_ASSERT(write_length == 0);
90- // Need to study this assert() WATCHPOINT_ASSERT(return_length ==
91- // ptr->write_buffer_offset);
92+ if (ptr->type == MEMCACHED_CONNECTION_UDP &&
93+ (size_t)sent_length != ptr->write_buffer_offset)
94+ {
95+ memcached_quit_server(ptr, 1);
96+ return -1;
97+ }
98+
99+ }
100+
101+ WATCHPOINT_ASSERT(ptr->write_buffer_offset == 0);
102+ // Need to study this assert() WATCHPOINT_ASSERT(return_length ==
103+ // ptr->write_buffer_offset);
104
105 // if we are a udp server, the begining of the buffer is reserverd for
106 // the upd frame header
107@@ -515,6 +517,7 @@
108 else
109 ptr->write_buffer_offset= 0;
110
111+ ptr->write_buffer_start_offset = 0;
112 return (ssize_t) return_length;
113 }
114
115
116=== modified file 'libmemcached/memcached_server.h'
117--- libmemcached/memcached_server.h 2009-10-10 11:57:03 +0000
118+++ libmemcached/memcached_server.h 2009-12-02 17:36:12 +0000
119@@ -33,6 +33,7 @@
120 size_t read_buffer_length;
121 size_t read_data_length;
122 size_t write_buffer_offset;
123+ size_t write_buffer_start_offset;
124 struct addrinfo *address_info;
125 time_t next_retry;
126 memcached_st *root;
127
128=== modified file 'tests/function.c'
129--- tests/function.c 2009-11-27 17:05:51 +0000
130+++ tests/function.c 2009-12-02 17:36:12 +0000
131@@ -3922,7 +3922,7 @@
132 }
133
134 #ifdef HAVE_LIBMEMCACHEDUTIL
135-static void* connection_release(void *arg)
136+static void* connection_release(void *arg)
137 {
138 struct {
139 memcached_pool_st* pool;
140@@ -4169,7 +4169,7 @@
141 rc= memcached_set(memc, keys[x], len[x], "1", 1, 0, 0);
142 test_truth(rc == MEMCACHED_SUCCESS);
143 }
144-
145+
146 memcached_quit(memc);
147
148 for (int x=0; x< 7; ++x) {
149@@ -5391,6 +5391,50 @@
150 return TEST_SUCCESS;
151 }
152
153+
154+
155+/*
156+ * Test that ensures that buffered sets do not trigger problems during io_flush
157+ */
158+static test_return_t regression_bug_490520(memcached_st *memc)
159+{
160+
161+ memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NO_BLOCK,1);
162+ memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS,1);
163+ memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_POLL_TIMEOUT,1000);
164+ memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_SERVER_FAILURE_LIMIT,1);
165+ memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_RETRY_TIMEOUT,3600);
166+
167+ uint32_t number_of_hosts= memc->number_of_hosts;
168+ memc->number_of_hosts= 1;
169+ int max_keys= 200480;
170+
171+
172+ char **keys= calloc((size_t)max_keys, sizeof(char*));
173+ size_t *key_length=calloc((size_t)max_keys, sizeof(size_t));
174+
175+ /* First add all of the items.. */
176+ char blob[3333] = {0};
177+ memcached_return rc;
178+ for (int x= 0; x < max_keys; ++x)
179+ {
180+ char k[251];
181+ key_length[x]= (size_t)snprintf(k, sizeof(k), "0200%u", x);
182+ keys[x]= strdup(k);
183+ assert(keys[x] != NULL);
184+ rc= memcached_set(memc, keys[x], key_length[x], blob, sizeof(blob), 0, 0);
185+ assert(rc == MEMCACHED_SUCCESS || rc == MEMCACHED_BUFFERED);
186+ }
187+
188+
189+ memc->number_of_hosts= number_of_hosts;
190+ return TEST_SUCCESS;
191+}
192+
193+
194+
195+
196+
197 test_st udp_setup_server_tests[] ={
198 {"set_udp_behavior_test", 0, set_udp_behavior_test},
199 {"add_tcp_server_udp_client_test", 0, add_tcp_server_udp_client_test},
200@@ -5567,6 +5611,7 @@
201 {"lp:442914", 1, regression_bug_442914 },
202 {"lp:447342", 1, regression_bug_447342 },
203 {"lp:463297", 1, regression_bug_463297 },
204+ {"lp:490520", 1, regression_bug_490520 },
205 {0, 0, 0}
206 };
207

Subscribers

People subscribed via source and target branches

to all changes: