Merge lp:~noskcaj/ubuntu/trusty/librdkafka/0.8.1 into lp:ubuntu/trusty/librdkafka
- Trusty (14.04)
- 0.8.1
- Merge into trusty
Proposed by
Jackson Doak
Status: | Merged |
---|---|
Merge reported by: | Martin Pitt |
Merged at revision: | not available |
Proposed branch: | lp:~noskcaj/ubuntu/trusty/librdkafka/0.8.1 |
Merge into: | lp:ubuntu/trusty/librdkafka |
Diff against target: |
1223 lines (+475/-123) 19 files modified
CONFIGURATION.md (+2/-0) Makefile (+4/-1) README.md (+5/-5) debian/changelog (+8/-0) debian/control (+2/-1) debian/librdkafka1.symbols (+2/-0) examples/rdkafka_example.c (+5/-0) examples/rdkafka_performance.c (+16/-8) rdkafka.c (+48/-26) rdkafka.h (+20/-5) rdkafka_broker.c (+104/-32) rdkafka_defaultconf.c (+8/-0) rdkafka_int.h (+42/-0) rdkafka_msg.c (+7/-1) rdkafka_proto.h (+11/-10) rdkafka_topic.c (+20/-25) rdsysqueue.h (+1/-8) tests/0003-msgmaxsize.c (+169/-0) tests/Makefile (+1/-1) |
To merge this branch: | bzr merge lp:~noskcaj/ubuntu/trusty/librdkafka/0.8.1 |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Martin Pitt | Approve | ||
Review via email: mp+200240@code.launchpad.net |
Commit message
Description of the change
New upstream release; should fix the FTBFS. I cannot test this as I don't have working PPC hardware, but http://
To post a comment you must log in.
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'CONFIGURATION.md' |
2 | --- CONFIGURATION.md 2013-11-04 16:50:07 +0000 |
3 | +++ CONFIGURATION.md 2014-01-01 03:04:53 +0000 |
4 | @@ -11,6 +11,8 @@ |
5 | topic.metadata.refresh.fast.interval.ms | 250 | See `topic.metadata.refresh.fast.cnt` description |
6 | debug | | A comma-separated list of debug contexts to enable: all,generic,broker,topic,metadata,producer,queue,msg |
7 | socket.timeout.ms | 60000 | Timeout for network requests. |
8 | +socket.send.buffer.bytes | 0 | Broker socket send buffer size. System default is used if 0. |
9 | +socket.receive.buffer.bytes | 0 | Broker socket receive buffer size. System default is used if 0. |
10 | broker.address.ttl | 300000 | How long to cache the broker address resolving results. |
11 | statistics.interval.ms | 0 | librdkafka statistics emit interval. The application also needs to register a stats callback using `rd_kafka_conf_set_stats_cb()`. The granularity is 1000ms. |
12 | queued.min.messages | 100000 | Minimum number of messages that should to be available for consumption by application. |
13 | |
14 | === modified file 'Makefile' |
15 | --- Makefile 2013-11-04 16:50:07 +0000 |
16 | +++ Makefile 2014-01-01 03:04:53 +0000 |
17 | @@ -17,7 +17,9 @@ |
18 | CFLAGS+=-g |
19 | |
20 | # Clang warnings to ignore |
21 | -CFLAGS+=-Wno-gnu-designator |
22 | +ifeq ($(CC),clang) |
23 | + CFLAGS+=-Wno-gnu-designator |
24 | +endif |
25 | |
26 | # Enable iovecs in snappy |
27 | CFLAGS+=-DSG |
28 | @@ -82,5 +84,6 @@ |
29 | rm -f $(OBJS) $(DEPS) \ |
30 | $(LIBNAME)*.a $(LIBNAME)*.so $(LIBNAME)*.so.$(LIBVER) |
31 | make -C tests clean |
32 | + make -C examples clean |
33 | |
34 | -include $(DEPS) |
35 | |
36 | === modified file 'README.md' |
37 | --- README.md 2013-11-04 16:50:07 +0000 |
38 | +++ README.md 2014-01-01 03:04:53 +0000 |
39 | @@ -24,12 +24,12 @@ |
40 | * Producer: supported |
41 | * Consumer: supported |
42 | * Compression: snappy and gzip |
43 | - * Debian package: work in progress (separate "debian" branch) |
44 | + * Debian package: "debian" branch ([ITP #710271](http://bugs.debian.org/cgi-bin/bugreport.cgi?bug=710271)) |
45 | * ZooKeeper: not supported |
46 | - * API: not backwards compatible |
47 | + * API: Stable, not backwards compatible |
48 | * Tests: Regression tests in `tests/` directory. |
49 | * Statistics: JSON formatted, see `rd_kafka_conf_set_stats_cb` in `rdkafka.h`. |
50 | - * Status: Testing, APIs subject to change. |
51 | + * Status: Stable |
52 | |
53 | |
54 | **Apache Kafka 0.7 support:** |
55 | @@ -117,10 +117,10 @@ |
56 | [GitHub Issues](https://github.com/edenhill/librdkafka/issues) |
57 | |
58 | |
59 | -Questions and discussions are also welcome on irc.freenode.org, #kafka, |
60 | +Questions and discussions are also welcome on irc.freenode.org, #apache-kafka, |
61 | nickname Snaps. |
62 | |
63 | |
64 | -### Commerical support |
65 | +### Commercial support |
66 | |
67 | Commercial support is available from [Edenhill services](http://www.edenhill.se) |
68 | |
69 | === modified file 'debian/changelog' |
70 | --- debian/changelog 2013-11-04 16:50:07 +0000 |
71 | +++ debian/changelog 2014-01-01 03:04:53 +0000 |
72 | @@ -1,3 +1,11 @@ |
73 | +librdkafka (0.8.1-0ubuntu1) trusty; urgency=medium |
74 | + |
75 | + * New upstream release. |
76 | + - Should fix FTBFS at debian bug 730506 |
77 | + * Update symbols |
78 | + |
79 | + -- Jackson Doak <noskcaj@ubuntu.com> Wed, 01 Jan 2014 13:55:40 +1100 |
80 | + |
81 | librdkafka (0.8.0-1) unstable; urgency=low |
82 | |
83 | * Initial release. (Closes: #710271) |
84 | |
85 | === modified file 'debian/control' |
86 | --- debian/control 2013-11-04 16:50:07 +0000 |
87 | +++ debian/control 2014-01-01 03:04:53 +0000 |
88 | @@ -1,6 +1,7 @@ |
89 | Source: librdkafka |
90 | Priority: optional |
91 | -Maintainer: Faidon Liambotis <paravoid@debian.org> |
92 | +Maintainer: Ubuntu Developers <ubuntu-devel-discuss@lists.ubuntu.com> |
93 | +XSBC-Original-Maintainer: Faidon Liambotis <paravoid@debian.org> |
94 | Build-Depends: debhelper (>= 9), zlib1g-dev |
95 | Standards-Version: 3.9.4 |
96 | Section: libs |
97 | |
98 | === modified file 'debian/librdkafka1.symbols' |
99 | --- debian/librdkafka1.symbols 2013-11-04 16:50:07 +0000 |
100 | +++ debian/librdkafka1.symbols 2014-01-01 03:04:53 +0000 |
101 | @@ -84,4 +84,6 @@ |
102 | rd_kafka_toppar_insert_msg@Base 0.8.0 |
103 | rd_kafka_toppar_insert_msgq@Base 0.8.0 |
104 | rd_kafka_toppar_ua_move@Base 0.8.0 |
105 | + rd_kafka_version@Base 0.8.1 |
106 | + rd_kafka_version_str@Base 0.8.1 |
107 | rd_kafka_wait_destroyed@Base 0.8.0 |
108 | |
109 | === modified file 'examples/rdkafka_example.c' |
110 | --- examples/rdkafka_example.c 2013-11-04 16:50:07 +0000 |
111 | +++ examples/rdkafka_example.c 2014-01-01 03:04:53 +0000 |
112 | @@ -214,6 +214,8 @@ |
113 | "Usage: %s [-C|-P] -t <topic> " |
114 | "[-p <partition>] [-b <host1:port1,host2:port2,..>]\n" |
115 | "\n" |
116 | + "librdkafka version %s (0x%08x)\n" |
117 | + "\n" |
118 | " Options:\n" |
119 | " -C | -P Consumer or Producer mode\n" |
120 | " -t <topic> Topic to fetch / produce\n" |
121 | @@ -231,8 +233,11 @@ |
122 | " writes fetched messages to stdout\n" |
123 | " In Producer mode:\n" |
124 | " reads messages from stdin and sends to broker\n" |
125 | + "\n" |
126 | + "\n" |
127 | "\n", |
128 | argv[0], |
129 | + rd_kafka_version_str(), rd_kafka_version(), |
130 | RD_KAFKA_DEBUG_CONTEXTS); |
131 | exit(1); |
132 | } |
133 | |
134 | === modified file 'examples/rdkafka_performance.c' |
135 | --- examples/rdkafka_performance.c 2013-11-04 16:50:07 +0000 |
136 | +++ examples/rdkafka_performance.c 2014-01-01 03:04:53 +0000 |
137 | @@ -87,7 +87,6 @@ |
138 | void *payload, size_t len, |
139 | int error_code, |
140 | void *opaque, void *msg_opaque) { |
141 | - long int msgid = (long int)msg_opaque; |
142 | static rd_ts_t last; |
143 | rd_ts_t now = rd_clock(); |
144 | static int msgs; |
145 | @@ -106,12 +105,12 @@ |
146 | !(msgs_wait_cnt % (dispintvl / 1000)) || |
147 | (now - last) >= dispintvl * 1000) { |
148 | if (error_code) |
149 | - printf("Message %ld delivey failed: %s (%li remain)\n", |
150 | - msgid, rd_kafka_err2str(error_code), |
151 | + printf("Message delivey failed: %s (%li remain)\n", |
152 | + rd_kafka_err2str(error_code), |
153 | msgs_wait_cnt); |
154 | else if (!quiet) |
155 | - printf("Message %ld delivered: %li remain\n", |
156 | - msgid, msgs_wait_cnt); |
157 | + printf("Message delivered: %li remain\n", |
158 | + msgs_wait_cnt); |
159 | if (!quiet && do_seq) |
160 | printf(" --> \"%.*s\"\n", (int)len, (char *)payload); |
161 | last = now; |
162 | @@ -413,6 +412,8 @@ |
163 | "Usage: %s [-C|-P] -t <topic> " |
164 | "[-p <partition>] [-b <broker,broker..>] [options..]\n" |
165 | "\n" |
166 | + "librdkafka version %s (0x%08x)\n" |
167 | + "\n" |
168 | " Options:\n" |
169 | " -C | -P Consumer or Producer mode\n" |
170 | " -t <topic> Topic to fetch / produce\n" |
171 | @@ -454,6 +455,7 @@ |
172 | " writes messages of size -s <..> and prints thruput\n" |
173 | "\n", |
174 | argv[0], |
175 | + rd_kafka_version_str(), rd_kafka_version(), |
176 | RD_KAFKA_DEBUG_CONTEXTS); |
177 | exit(1); |
178 | } |
179 | @@ -560,14 +562,17 @@ |
180 | while (run && |
181 | rd_kafka_produce(rkt, partition, |
182 | sendflags, pbuf, msgsize, |
183 | - key, keylen, |
184 | - (void *)cnt.msgs) == -1) { |
185 | + key, keylen, NULL) == -1) { |
186 | if (!quiet || errno != ENOBUFS) |
187 | printf("produce error: %s%s\n", |
188 | strerror(errno), |
189 | errno == ENOBUFS ? |
190 | " (backpressure)":""); |
191 | cnt.tx_err++; |
192 | + if (errno != ENOBUFS) { |
193 | + run = 0; |
194 | + break; |
195 | + } |
196 | now = rd_clock(); |
197 | if (cnt.t_last + dispintvl <= now) { |
198 | printf("%% Backpressure %i " |
199 | @@ -633,7 +638,7 @@ |
200 | * Consumer |
201 | */ |
202 | |
203 | - rd_kafka_message_t **rkmessages; |
204 | + rd_kafka_message_t **rkmessages = NULL; |
205 | |
206 | #if 0 /* Future API */ |
207 | /* The offset storage file is optional but its presence |
208 | @@ -725,6 +730,9 @@ |
209 | strerror(errno)); |
210 | |
211 | print_stats(mode, 0, compression); |
212 | + |
213 | + /* Poll to handle stats callbacks */ |
214 | + rd_kafka_poll(rk, 0); |
215 | } |
216 | cnt.t_end = rd_clock(); |
217 | |
218 | |
219 | === modified file 'rdkafka.c' |
220 | --- rdkafka.c 2013-11-04 16:50:07 +0000 |
221 | +++ rdkafka.c 2014-01-01 03:04:53 +0000 |
222 | @@ -353,7 +353,7 @@ |
223 | rd_kafka_dbg(rk, QUEUE, "QSERVE", "Serving %i ops", localq.rkq_qlen); |
224 | |
225 | /* Call callback for each op */ |
226 | - TAILQ_FOREACH_SAFE(rko, tmp, &localq.rkq_q, rko_link) { |
227 | + TAILQ_FOREACH_SAFE(rko, &localq.rkq_q, rko_link, tmp) { |
228 | callback(rko, opaque); |
229 | rd_kafka_op_destroy(rko); |
230 | } |
231 | @@ -515,7 +515,7 @@ |
232 | |
233 | /* Decommission all topics */ |
234 | rd_kafka_lock(rk); |
235 | - TAILQ_FOREACH_SAFE(rkt, rkt_tmp, &rk->rk_topics, rkt_link) { |
236 | + TAILQ_FOREACH_SAFE(rkt, &rk->rk_topics, rkt_link, rkt_tmp) { |
237 | rd_kafka_unlock(rk); |
238 | rd_kafka_topic_partitions_remove(rkt); |
239 | rd_kafka_lock(rk); |
240 | @@ -553,7 +553,7 @@ |
241 | size_t size = *sizep; |
242 | int of = *ofp; |
243 | |
244 | - _st_printf("%s{ " |
245 | + _st_printf("%s\"%"PRId32"\": { " |
246 | "\"partition\":%"PRId32", " |
247 | "\"leader\":%"PRId32", " |
248 | "\"desired\":%s, " |
249 | @@ -574,6 +574,7 @@ |
250 | "} ", |
251 | first ? "" : ", ", |
252 | rktp->rktp_partition, |
253 | + rktp->rktp_partition, |
254 | rktp->rktp_leader ? rktp->rktp_leader->rkb_nodeid : -1, |
255 | (rktp->rktp_flags&RD_KAFKA_TOPPAR_F_DESIRED)?"true":"false", |
256 | (rktp->rktp_flags&RD_KAFKA_TOPPAR_F_UNKNOWN)?"true":"false", |
257 | @@ -618,7 +619,7 @@ |
258 | "\"ts\":%"PRIu64", " |
259 | "\"time\":%lli, " |
260 | "\"replyq\":%i, " |
261 | - "\"brokers\":[ "/*open brokers*/, |
262 | + "\"brokers\":{ "/*open brokers*/, |
263 | now, |
264 | (signed long long)time(NULL), |
265 | rk->rk_rep.rkq_qlen); |
266 | @@ -626,7 +627,8 @@ |
267 | |
268 | TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) { |
269 | rd_kafka_broker_lock(rkb); |
270 | - _st_printf("%s{ "/*open broker*/ |
271 | + rd_kafka_avg_rollover(&rkb->rkb_rtt_last, &rkb->rkb_rtt_curr); |
272 | + _st_printf("%s\"%s\": { "/*open broker*/ |
273 | "\"name\":\"%s\", " |
274 | "\"nodeid\":%"PRId32", " |
275 | "\"state\":\"%s\", " |
276 | @@ -639,9 +641,16 @@ |
277 | "\"rx\":%"PRIu64", " |
278 | "\"rxbytes\":%"PRIu64", " |
279 | "\"rxerrs\":%"PRIu64", " |
280 | - "\"toppars\":[ "/*open toppars*/, |
281 | + "\"rtt\": {" |
282 | + " \"min\":%"PRIu64"," |
283 | + " \"max\":%"PRIu64"," |
284 | + " \"avg\":%"PRIu64"," |
285 | + " \"cnt\":%i " |
286 | + "}, " |
287 | + "\"toppars\":{ "/*open toppars*/, |
288 | rkb == TAILQ_FIRST(&rk->rk_brokers) ? "" : ", ", |
289 | rkb->rkb_name, |
290 | + rkb->rkb_name, |
291 | rkb->rkb_nodeid, |
292 | rd_kafka_broker_state_names[rkb->rkb_state], |
293 | rkb->rkb_outbufs.rkbq_cnt, |
294 | @@ -652,37 +661,43 @@ |
295 | rkb->rkb_c.tx_retries, |
296 | rkb->rkb_c.rx, |
297 | rkb->rkb_c.rx_bytes, |
298 | - rkb->rkb_c.rx_err); |
299 | + rkb->rkb_c.rx_err, |
300 | + rkb->rkb_rtt_last.ra_min, |
301 | + rkb->rkb_rtt_last.ra_max, |
302 | + rkb->rkb_rtt_last.ra_avg, |
303 | + rkb->rkb_rtt_last.ra_cnt); |
304 | |
305 | rd_kafka_broker_toppars_rdlock(rkb); |
306 | TAILQ_FOREACH(rktp, &rkb->rkb_toppars, rktp_rkblink) { |
307 | - _st_printf("%s{ " |
308 | + _st_printf("%s\"%.*s\": { " |
309 | "\"topic\":\"%.*s\", " |
310 | "\"partition\":%"PRId32"} ", |
311 | rktp==TAILQ_FIRST(&rkb->rkb_toppars)?"":", ", |
312 | RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
313 | + RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
314 | rktp->rktp_partition); |
315 | } |
316 | rd_kafka_broker_toppars_unlock(rkb); |
317 | |
318 | rd_kafka_broker_unlock(rkb); |
319 | |
320 | - _st_printf("] "/*close toppars*/ |
321 | + _st_printf("} "/*close toppars*/ |
322 | "} "/*close broker*/); |
323 | } |
324 | |
325 | |
326 | - _st_printf("], " /* close "brokers" array */ |
327 | - "\"topics\":[ "); |
328 | + _st_printf("}, " /* close "brokers" array */ |
329 | + "\"topics\":{ "); |
330 | |
331 | TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) { |
332 | int i; |
333 | |
334 | rd_kafka_topic_rdlock(rkt); |
335 | - _st_printf("%s{ " |
336 | + _st_printf("%s\"%.*s\": { " |
337 | "\"topic\":\"%.*s\", " |
338 | - "\"partitions\":[ " /*open partitions*/, |
339 | + "\"partitions\":{ " /*open partitions*/, |
340 | rkt==TAILQ_FIRST(&rk->rk_topics)?"":", ", |
341 | + RD_KAFKAP_STR_PR(rkt->rkt_topic), |
342 | RD_KAFKAP_STR_PR(rkt->rkt_topic)); |
343 | |
344 | for (i = 0 ; i < rkt->rkt_partition_cnt ; i++) |
345 | @@ -699,14 +714,14 @@ |
346 | rkt->rkt_ua, i++ == 0); |
347 | rd_kafka_topic_unlock(rkt); |
348 | |
349 | - _st_printf("] "/*close partitions*/ |
350 | + _st_printf("} "/*close partitions*/ |
351 | "} "/*close topic*/); |
352 | |
353 | } |
354 | |
355 | rd_kafka_unlock(rk); |
356 | |
357 | - _st_printf("] "/*close topics*/ |
358 | + _st_printf("} "/*close topics*/ |
359 | "}"/*close object*/); |
360 | |
361 | |
362 | @@ -842,17 +857,6 @@ |
363 | |
364 | /** |
365 | * Produce a single message. |
366 | - * |
367 | - * If 'partition' is unassigned (RD_KAFKA_PARTITION_UA) the configured or |
368 | - * default partitioner will be used to designate the target partition. |
369 | - * |
370 | - * See rdkafka.h for 'msgflags'. |
371 | - * |
372 | - * Returns: 0 on success or -1 on error (see errno for details) |
373 | - * |
374 | - * errnos: |
375 | - * ENOBUFS - conf.producer.max_msg_cnt would be exceeded. |
376 | - * |
377 | * Locality: any application thread |
378 | */ |
379 | int rd_kafka_produce (rd_kafka_topic_t *rkt, int32_t partition, |
380 | @@ -1279,3 +1283,21 @@ |
381 | int rd_kafka_outq_len (rd_kafka_t *rk) { |
382 | return rk->rk_producer.msg_cnt; |
383 | } |
384 | + |
385 | + |
386 | +int rd_kafka_version (void) { |
387 | + return RD_KAFKA_VERSION; |
388 | +} |
389 | + |
390 | +const char *rd_kafka_version_str (void) { |
391 | + static char ret[64]; |
392 | + int ver = rd_kafka_version(); |
393 | + |
394 | + if (!*ret) |
395 | + snprintf(ret, sizeof(ret), "%i.%i.%i", |
396 | + (ver >> 24) & 0xff, |
397 | + (ver >> 16) & 0xff, |
398 | + (ver >> 8) & 0xff); |
399 | + |
400 | + return ret; |
401 | +} |
402 | |
403 | === modified file 'rdkafka.h' |
404 | --- rdkafka.h 2013-11-04 16:50:07 +0000 |
405 | +++ rdkafka.h 2014-01-01 03:04:53 +0000 |
406 | @@ -43,13 +43,25 @@ |
407 | /** |
408 | * librdkafka version |
409 | * |
410 | - * interpreted as MM.mm.rr.xx: |
411 | + * Interpreted as hex MM.mm.rr.xx: |
412 | * MM = Major |
413 | * mm = minor |
414 | * rr = revision |
415 | * xx = currently unused |
416 | - */ |
417 | -#define RD_KAFKA_VERSION 0x00080000 |
418 | + * |
419 | + * I.e.: 0x00080100 = 0.8.1 |
420 | + */ |
421 | +#define RD_KAFKA_VERSION 0x00080100 |
422 | + |
423 | +/** |
424 | + * Returns the librdkafka version as integer. |
425 | + */ |
426 | +int rd_kafka_version (void); |
427 | + |
428 | +/** |
429 | + * Returns the librdkafka version as string. |
430 | + */ |
431 | +const char *rd_kafka_version_str (void); |
432 | |
433 | |
434 | /** |
435 | @@ -634,8 +646,11 @@ |
436 | * pointer that will provided in the delivery report callback (`dr_cb`) for |
437 | * referencing this message. |
438 | * |
439 | - * Returns 0 on success or -1 if the maximum number of outstanding messages |
440 | - * (conf.producer.max_messages) has been reached (`errno==ENOBUFS`). |
441 | + * Returns 0 on success or -1 on error in which case errno is set accordingly: |
442 | + * ENOBUFS - maximum number of outstanding messages has been reached: |
443 | + * "queue.buffering.max.message" |
444 | + * EMSGSIZE - message is larger than configured max size: |
445 | + * "messages.max.bytes". |
446 | * |
447 | */ |
448 | |
449 | |
450 | === modified file 'rdkafka_broker.c' |
451 | --- rdkafka_broker.c 2013-11-04 16:50:07 +0000 |
452 | +++ rdkafka_broker.c 2014-01-01 03:04:53 +0000 |
453 | @@ -72,7 +72,7 @@ |
454 | int i; |
455 | |
456 | rd_kafka_dbg(rk, MSG, "MSG", "%s: iovlen %zd", |
457 | - what, msg->msg_iovlen); |
458 | + what, (size_t)msg->msg_iovlen); |
459 | |
460 | for (i = 0 ; i < msg->msg_iovlen ; i++) { |
461 | rd_kafka_dbg(rk, MSG, what, |
462 | @@ -221,26 +221,32 @@ |
463 | TAILQ_INIT(&rkbufq->rkbq_bufs); |
464 | rkbufq->rkbq_cnt = 0; |
465 | } |
466 | + |
467 | +/** |
468 | + * Concat all buffers from 'src' to tail of 'dst' |
469 | + */ |
470 | +static void rd_kafka_bufq_concat (rd_kafka_bufq_t *dst, rd_kafka_bufq_t *src) { |
471 | + TAILQ_CONCAT(&dst->rkbq_bufs, &src->rkbq_bufs, rkbuf_link); |
472 | + rd_atomic_add(&dst->rkbq_cnt, src->rkbq_cnt); |
473 | + rd_kafka_bufq_init(src); |
474 | +} |
475 | + |
476 | /** |
477 | * Purge the wait-response queue. |
478 | + * NOTE: 'rkbufq' must be a temporary queue and not one of rkb_waitresps |
479 | + * or rkb_outbufs since buffers may be re-enqueued on those queues. |
480 | */ |
481 | static void rd_kafka_bufq_purge (rd_kafka_broker_t *rkb, |
482 | rd_kafka_bufq_t *rkbufq, |
483 | rd_kafka_resp_err_t err) { |
484 | rd_kafka_buf_t *rkbuf, *tmp; |
485 | - rd_kafka_bufq_t tmpq; |
486 | |
487 | assert(pthread_self() == rkb->rkb_thread); |
488 | |
489 | - /* Move messages to temporary queue to avoid infinite loops if they |
490 | - * are requeued on the same queue in the rkbuf_cb callback. */ |
491 | - rd_kafka_bufq_init(&tmpq); |
492 | - TAILQ_MOVE(&tmpq.rkbq_bufs, &rkbufq->rkbq_bufs, rkbuf_link); |
493 | - rd_kafka_bufq_init(rkbufq); |
494 | - |
495 | - rd_rkb_dbg(rkb, QUEUE, "BUFQ", "Purging bufq"); |
496 | - |
497 | - TAILQ_FOREACH_SAFE(rkbuf, tmp, &tmpq.rkbq_bufs, rkbuf_link) |
498 | + rd_rkb_dbg(rkb, QUEUE, "BUFQ", "Purging bufq with %i buffers", |
499 | + rkbufq->rkbq_cnt); |
500 | + |
501 | + TAILQ_FOREACH_SAFE(rkbuf, &rkbufq->rkbq_bufs, rkbuf_link, tmp) |
502 | rkbuf->rkbuf_cb(rkb, err, NULL, rkbuf, rkbuf->rkbuf_opaque); |
503 | } |
504 | |
505 | @@ -255,14 +261,14 @@ |
506 | |
507 | assert(pthread_self() == rkb->rkb_thread); |
508 | |
509 | - TAILQ_FOREACH_SAFE(rkbuf, tmp, |
510 | - &rkb->rkb_waitresps.rkbq_bufs, rkbuf_link) { |
511 | + TAILQ_FOREACH_SAFE(rkbuf, |
512 | + &rkb->rkb_waitresps.rkbq_bufs, rkbuf_link, tmp) { |
513 | if (likely(rkbuf->rkbuf_ts_timeout > now)) |
514 | continue; |
515 | |
516 | rd_kafka_bufq_deq(&rkb->rkb_waitresps, rkbuf); |
517 | |
518 | - rkbuf->rkbuf_cb(rkb, RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT, |
519 | + rkbuf->rkbuf_cb(rkb, RD_KAFKA_RESP_ERR__MSG_TIMED_OUT, |
520 | NULL, rkbuf, rkbuf->rkbuf_opaque); |
521 | cnt++; |
522 | } |
523 | @@ -286,6 +292,7 @@ |
524 | va_list ap; |
525 | int errno_save = errno; |
526 | rd_kafka_toppar_t *rktp; |
527 | + rd_kafka_bufq_t tmpq; |
528 | |
529 | assert(pthread_self() == rkb->rkb_thread); |
530 | rd_kafka_broker_lock(rkb); |
531 | @@ -338,11 +345,21 @@ |
532 | strlen(rkb->rkb_err.msg)); |
533 | } |
534 | |
535 | - rd_kafka_bufq_purge(rkb, &rkb->rkb_waitresps, err); |
536 | - rd_kafka_bufq_purge(rkb, &rkb->rkb_outbufs, err); |
537 | + /* |
538 | + * Purge all buffers |
539 | + * (put on a temporary queue since bufs may be requeued) |
540 | + */ |
541 | + rd_kafka_bufq_init(&tmpq); |
542 | + rd_kafka_bufq_concat(&tmpq, &rkb->rkb_waitresps); |
543 | + rd_kafka_bufq_concat(&tmpq, &rkb->rkb_outbufs); |
544 | |
545 | + /* Unlock broker since a requeue will try to lock it. */ |
546 | rd_kafka_broker_unlock(rkb); |
547 | |
548 | + /* Purge the buffers */ |
549 | + rd_kafka_bufq_purge(rkb, &tmpq, err); |
550 | + |
551 | + |
552 | /* Undelegate all toppars from this broker. */ |
553 | rd_kafka_broker_toppars_wrlock(rkb); |
554 | while ((rktp = TAILQ_FIRST(&rkb->rkb_toppars))) { |
555 | @@ -382,7 +399,7 @@ |
556 | |
557 | rd_kafka_dbg(rkb->rkb_rk, BROKER, "BRKSEND", |
558 | "sendmsg FAILED for iovlen %zd (%i)", |
559 | - msg->msg_iovlen, |
560 | + (size_t)msg->msg_iovlen, |
561 | IOV_MAX); |
562 | rd_kafka_broker_fail(rkb, RD_KAFKA_RESP_ERR__TRANSPORT, |
563 | "Send failed: %s", strerror(errno)); |
564 | @@ -390,8 +407,8 @@ |
565 | return -1; |
566 | } |
567 | |
568 | - rkb->rkb_c.tx_bytes += r; |
569 | - rkb->rkb_c.tx++; |
570 | + rd_atomic_add(&rkb->rkb_c.tx_bytes, r); |
571 | + rd_atomic_add(&rkb->rkb_c.tx, 1); |
572 | return r; |
573 | } |
574 | |
575 | @@ -1020,11 +1037,15 @@ |
576 | static rd_kafka_buf_t *rd_kafka_waitresp_find (rd_kafka_broker_t *rkb, |
577 | int32_t corrid) { |
578 | rd_kafka_buf_t *rkbuf; |
579 | + rd_ts_t now = rd_clock(); |
580 | |
581 | assert(pthread_self() == rkb->rkb_thread); |
582 | |
583 | TAILQ_FOREACH(rkbuf, &rkb->rkb_waitresps.rkbq_bufs, rkbuf_link) |
584 | if (rkbuf->rkbuf_corrid == corrid) { |
585 | + rd_kafka_avg_add(&rkb->rkb_rtt_curr, |
586 | + now - rkbuf->rkbuf_ts_sent); |
587 | + |
588 | rd_kafka_bufq_deq(&rkb->rkb_waitresps, rkbuf); |
589 | return rkbuf; |
590 | } |
591 | @@ -1089,10 +1110,13 @@ |
592 | off_t vof = of - len; |
593 | |
594 | if (0) |
595 | - printf(" #%i/%zd and %zd: of %zd, len %zd, " |
596 | - "vof %zd: iov %zd\n", |
597 | - i, src->msg_iovlen, dst->msg_iovlen, |
598 | - of, len, vof, src->msg_iov[i].iov_len); |
599 | + printf(" #%i/%zd and %zd: of %jd, len %zd, " |
600 | + "vof %jd: iov %zd\n", |
601 | + i, |
602 | + (size_t)src->msg_iovlen, |
603 | + (size_t)dst->msg_iovlen, |
604 | + (intmax_t)of, len, (intmax_t)vof, |
605 | + src->msg_iov[i].iov_len); |
606 | if (vof < 0) |
607 | vof = 0; |
608 | |
609 | @@ -1248,8 +1272,8 @@ |
610 | if (rkbuf->rkbuf_of == rkbuf->rkbuf_len + sizeof(rkbuf->rkbuf_reshdr)) { |
611 | /* Message is complete, pass it on to the original requester. */ |
612 | rkb->rkb_recv_buf = NULL; |
613 | - rkb->rkb_c.rx++; |
614 | - rkb->rkb_c.rx_bytes += rkbuf->rkbuf_of; |
615 | + rd_atomic_add(&rkb->rkb_c.rx, 1); |
616 | + rd_atomic_add(&rkb->rkb_c.rx_bytes, rkbuf->rkbuf_of); |
617 | rd_kafka_req_response(rkb, rkbuf); |
618 | } |
619 | |
620 | @@ -1314,6 +1338,32 @@ |
621 | rd_rkb_dbg(rkb, BROKER, "CONNECTED", "connected to %s", |
622 | rd_sockaddr2str(sinx, RD_SOCKADDR2STR_F_NICE)); |
623 | |
624 | + /* Set socket send & receive buffer sizes if configuerd */ |
625 | + if (rkb->rkb_rk->rk_conf.socket_sndbuf_size != 0) { |
626 | + if (setsockopt(rkb->rkb_s, SOL_SOCKET, SO_SNDBUF, |
627 | + &rkb->rkb_rk->rk_conf.socket_sndbuf_size, |
628 | + sizeof(rkb->rkb_rk->rk_conf. |
629 | + socket_sndbuf_size)) == -1) |
630 | + rd_rkb_log(rkb, LOG_WARNING, "SNDBUF", |
631 | + "Failed to set socket send " |
632 | + "buffer size to %i: %s", |
633 | + rkb->rkb_rk->rk_conf.socket_sndbuf_size, |
634 | + strerror(errno)); |
635 | + } |
636 | + |
637 | + if (rkb->rkb_rk->rk_conf.socket_rcvbuf_size != 0) { |
638 | + if (setsockopt(rkb->rkb_s, SOL_SOCKET, SO_RCVBUF, |
639 | + &rkb->rkb_rk->rk_conf.socket_rcvbuf_size, |
640 | + sizeof(rkb->rkb_rk->rk_conf. |
641 | + socket_rcvbuf_size)) == -1) |
642 | + rd_rkb_log(rkb, LOG_WARNING, "RCVBUF", |
643 | + "Failed to set socket receive " |
644 | + "buffer size to %i: %s", |
645 | + rkb->rkb_rk->rk_conf.socket_rcvbuf_size, |
646 | + strerror(errno)); |
647 | + } |
648 | + |
649 | + |
650 | rd_kafka_broker_lock(rkb); |
651 | rd_kafka_broker_set_state(rkb, RD_KAFKA_BROKER_STATE_UP); |
652 | rkb->rkb_err.err = 0; |
653 | @@ -1381,6 +1431,9 @@ |
654 | /* Entire buffer sent, unlink from outbuf */ |
655 | rd_kafka_bufq_deq(&rkb->rkb_outbufs, rkbuf); |
656 | |
657 | + /* Store time for RTT calculation */ |
658 | + rkbuf->rkbuf_ts_sent = rd_clock(); |
659 | + |
660 | /* Put buffer on response wait list unless we are not |
661 | * expecting a response (required_acks=0). */ |
662 | if (!(rkbuf->rkbuf_flags & RD_KAFKA_OP_F_NO_RESPONSE)) |
663 | @@ -1606,7 +1659,7 @@ |
664 | if (0) |
665 | rd_rkb_dbg(rkb, MSG, "PRODUCE", |
666 | "Serve %i/%i messages (%i iovecs) " |
667 | - "for %.*s [%"PRId32"] (%zd bytes)", |
668 | + "for %.*s [%"PRId32"] (%"PRIu64" bytes)", |
669 | msgcnt, rktp->rktp_msgq.rkmq_msg_cnt, |
670 | iovcnt, |
671 | RD_KAFKAP_STR_PR(rkt->rkt_topic), |
672 | @@ -1732,6 +1785,12 @@ |
673 | msghdr++; |
674 | } |
675 | |
676 | + /* No messages added, bail out early. */ |
677 | + if (unlikely(rkbuf->rkbuf_msgq.rkmq_msg_cnt == 0)) { |
678 | + rd_kafka_buf_destroy(rkbuf); |
679 | + return -1; |
680 | + } |
681 | + |
682 | /* Compress the messages */ |
683 | if (rkb->rkb_rk->rk_conf.compression_codec) { |
684 | int siovlen = 1; |
685 | @@ -1923,8 +1982,8 @@ |
686 | |
687 | do_send: |
688 | |
689 | - rktp->rktp_c.tx_msgs += rkbuf->rkbuf_msgq.rkmq_msg_cnt; |
690 | - rktp->rktp_c.tx_bytes += prodhdr->part2.MessageSetSize; |
691 | + rd_atomic_add(&rktp->rktp_c.tx_msgs, rkbuf->rkbuf_msgq.rkmq_msg_cnt); |
692 | + rd_atomic_add(&rktp->rktp_c.tx_bytes, prodhdr->part2.MessageSetSize); |
693 | |
694 | prodhdr->part2.MessageSetSize = |
695 | htonl(prodhdr->part2.MessageSetSize); |
696 | @@ -2104,8 +2163,10 @@ |
697 | while (rktp->rktp_xmit_msgq.rkmq_msg_cnt > 0) { |
698 | int r = rd_kafka_broker_produce_toppar( |
699 | rkb, rktp); |
700 | - if (r > 0) |
701 | + if (likely(r > 0)) |
702 | cnt += r; |
703 | + else |
704 | + break; |
705 | } |
706 | } |
707 | |
708 | @@ -2114,7 +2175,7 @@ |
709 | /* Trigger delivery report for timed out messages */ |
710 | if (unlikely(timedout.rkmq_msg_cnt > 0)) |
711 | rd_kafka_dr_msgq(rkb->rkb_rk, &timedout, |
712 | - RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT); |
713 | + RD_KAFKA_RESP_ERR__MSG_TIMED_OUT); |
714 | |
715 | rd_kafka_broker_toppars_unlock(rkb); |
716 | |
717 | @@ -3169,10 +3230,11 @@ |
718 | |
719 | int rd_kafka_brokers_add (rd_kafka_t *rk, const char *brokerlist) { |
720 | char *s = strdupa(brokerlist); |
721 | - char *t, *n; |
722 | + char *t, *t2, *n; |
723 | int cnt = 0; |
724 | rd_kafka_broker_t *rkb; |
725 | |
726 | + /* Parse comma-separated list of brokers. */ |
727 | while (*s) { |
728 | uint16_t port = 0; |
729 | |
730 | @@ -3186,7 +3248,17 @@ |
731 | else |
732 | n = s + strlen(s)-1; |
733 | |
734 | - if ((t = strchr(s, ':'))) { |
735 | + /* Check if port has been specified, but try to identify IPv6 |
736 | + * addresses first: |
737 | + * t = last ':' in string |
738 | + * t2 = first ':' in string |
739 | + * If t and t2 are equal then only one ":" exists in name |
740 | + * and thus an IPv4 address with port specified. |
741 | + * Else if not equal and t is prefixed with "]" then it's an |
742 | + * IPv6 address with port specified. |
743 | + * Else no port specified. */ |
744 | + if ((t = strrchr(s, ':')) && |
745 | + ((t2 = strchr(s, ':')) == t || *(t-1) == ']')) { |
746 | *t = '\0'; |
747 | port = atoi(t+1); |
748 | } |
749 | |
750 | === modified file 'rdkafka_defaultconf.c' |
751 | --- rdkafka_defaultconf.c 2013-11-04 16:50:07 +0000 |
752 | +++ rdkafka_defaultconf.c 2014-01-01 03:04:53 +0000 |
753 | @@ -122,6 +122,14 @@ |
754 | { _RK_GLOBAL, "socket.timeout.ms", _RK_C_INT, _RK(socket_timeout_ms), |
755 | "Timeout for network requests.", |
756 | 10, 300*1000, 60*1000 }, |
757 | + { _RK_GLOBAL, "socket.send.buffer.bytes", _RK_C_INT, |
758 | + _RK(socket_sndbuf_size), |
759 | + "Broker socket send buffer size. System default is used if 0.", |
760 | + 0, 100000000, 0 }, |
761 | + { _RK_GLOBAL, "socket.receive.buffer.bytes", _RK_C_INT, |
762 | + _RK(socket_rcvbuf_size), |
763 | + "Broker socket receive buffer size. System default is used if 0.", |
764 | + 0, 100000000, 0 }, |
765 | { _RK_GLOBAL, "broker.address.ttl", _RK_C_INT, |
766 | _RK(broker_addr_ttl), |
767 | "How long to cache the broker address resolving results.", |
768 | |
769 | === modified file 'rdkafka_int.h' |
770 | --- rdkafka_int.h 2013-11-04 16:50:07 +0000 |
771 | +++ rdkafka_int.h 2014-01-01 03:04:53 +0000 |
772 | @@ -123,6 +123,8 @@ |
773 | int debug; |
774 | int broker_addr_ttl; |
775 | int socket_timeout_ms; |
776 | + int socket_sndbuf_size; |
777 | + int socket_rcvbuf_size; |
778 | char *clientid; |
779 | char *brokerlist; |
780 | int stats_interval_ms; |
781 | @@ -205,6 +207,43 @@ |
782 | |
783 | |
784 | |
785 | +typedef struct rd_kafka_avg_s { |
786 | + rd_ts_t ra_max; |
787 | + rd_ts_t ra_min; |
788 | + rd_ts_t ra_avg; |
789 | + rd_ts_t ra_sum; |
790 | + int ra_cnt; |
791 | +} rd_kafka_avg_t; |
792 | + |
793 | +/** |
794 | + * Add timestamp 'ts' to averager 'ra'. |
795 | + */ |
796 | +static RD_UNUSED void rd_kafka_avg_add (rd_kafka_avg_t *ra, rd_ts_t ts) { |
797 | + if (ts > ra->ra_max) |
798 | + ra->ra_max = ts; |
799 | + if (ra->ra_min == 0 || ts < ra->ra_min) |
800 | + ra->ra_min = ts; |
801 | + ra->ra_sum += ts; |
802 | + ra->ra_cnt++; |
803 | +} |
804 | + |
805 | +/** |
806 | + * Rolls over statistics in 'src' and stores the average in 'dst'. |
807 | + * 'src' is cleared and ready to be reused. |
808 | + */ |
809 | +static RD_UNUSED void rd_kafka_avg_rollover (rd_kafka_avg_t *dst, |
810 | + rd_kafka_avg_t *src) { |
811 | + *dst = *src; |
812 | + if (dst->ra_cnt) |
813 | + dst->ra_avg = dst->ra_sum / dst->ra_cnt; |
814 | + else |
815 | + dst->ra_avg = 0; |
816 | + |
817 | + memset(src, 0, sizeof(*src)); |
818 | +} |
819 | + |
820 | + |
821 | + |
822 | typedef struct rd_kafka_msg_s { |
823 | TAILQ_ENTRY(rd_kafka_msg_s) rkm_link; |
824 | int rkm_flags; |
825 | @@ -405,6 +444,9 @@ |
826 | rd_kafka_bufq_t rkb_waitresps; |
827 | rd_kafka_bufq_t rkb_retrybufs; |
828 | |
829 | + rd_kafka_avg_t rkb_rtt_curr; /* Current averaging period */ |
830 | + rd_kafka_avg_t rkb_rtt_last; /* Last averaging period */ |
831 | + |
832 | char rkb_name[128]; /* Display name */ |
833 | char rkb_nodename[128]; /* host:port */ |
834 | |
835 | |
836 | === modified file 'rdkafka_msg.c' |
837 | --- rdkafka_msg.c 2013-11-04 16:50:07 +0000 |
838 | +++ rdkafka_msg.c 2014-01-01 03:04:53 +0000 |
839 | @@ -64,6 +64,12 @@ |
840 | size_t mlen = sizeof(*rkm); |
841 | |
842 | assert(len > 0); |
843 | + |
844 | + if (unlikely(len + keylen > rkt->rkt_rk->rk_conf.max_msg_size)) { |
845 | + errno = EMSGSIZE; |
846 | + return -1; |
847 | + } |
848 | + |
849 | if (unlikely(rd_atomic_add(&rkt->rkt_rk->rk_producer.msg_cnt, 1) > |
850 | rkt->rkt_rk->rk_conf.queue_buffering_max_msgs)) { |
851 | rd_atomic_sub(&rkt->rkt_rk->rk_producer.msg_cnt, 1); |
852 | @@ -115,7 +121,7 @@ |
853 | int cnt = timedout->rkmq_msg_cnt; |
854 | |
855 | /* Assume messages are added in time sequencial order */ |
856 | - TAILQ_FOREACH_SAFE(rkm, tmp, &rkmq->rkmq_msgs, rkm_link) { |
857 | + TAILQ_FOREACH_SAFE(rkm, &rkmq->rkmq_msgs, rkm_link, tmp) { |
858 | if (likely(rkm->rkm_ts_timeout > now)) |
859 | break; |
860 | |
861 | |
862 | === modified file 'rdkafka_proto.h' |
863 | --- rdkafka_proto.h 2013-11-04 16:50:07 +0000 |
864 | +++ rdkafka_proto.h 2014-01-01 03:04:53 +0000 |
865 | @@ -81,22 +81,22 @@ |
866 | #define RD_KAFKAP_STR_LEN_NULL -1 |
867 | /* Returns the actual size of a kafka protocol string representation. */ |
868 | #define RD_KAFKAP_STR_SIZE(kstr) (int16_t)(sizeof((kstr)->len) + \ |
869 | - (ntohs((kstr)->len) == \ |
870 | + ((int16_t)ntohs((kstr)->len) == \ |
871 | RD_KAFKAP_STR_LEN_NULL ? \ |
872 | 0 : ntohs((kstr)->len))) |
873 | /* Returns the length of the string of a kafka protocol string representation */ |
874 | #define RD_KAFKAP_STR_LEN(kstr) (int)((ntohs((kstr)->len) == \ |
875 | RD_KAFKAP_STR_LEN_NULL ? \ |
876 | - 0 : ntohs((kstr)->len))) |
877 | + 0 : (int16_t)ntohs((kstr)->len))) |
878 | |
879 | |
880 | /* Macro suitable for "%.*s" printing. */ |
881 | #define RD_KAFKAP_STR_PR(kstr) \ |
882 | - (ntohs((kstr)->len) == RD_KAFKAP_STR_LEN_NULL ? \ |
883 | + ((int16_t)ntohs((kstr)->len) == RD_KAFKAP_STR_LEN_NULL ? \ |
884 | 0 : (int)ntohs((kstr)->len)), (kstr)->str |
885 | |
886 | #define RD_KAFKAP_STR_IS_NULL(kstr) \ |
887 | - (ntohs((kstr)->len) == RD_KAFKAP_STR_LEN_NULL) |
888 | + ((int16_t)ntohs((kstr)->len) == RD_KAFKAP_STR_LEN_NULL) |
889 | |
890 | static inline int rd_kafkap_str_cmp (const rd_kafkap_str_t *a, |
891 | const rd_kafkap_str_t *b) RD_UNUSED; |
892 | @@ -137,7 +137,7 @@ |
893 | kstr->len = ntohs(len); |
894 | memcpy(kstr->str, str, len+1); |
895 | } else |
896 | - kstr->len = ntohs(RD_KAFKAP_STR_LEN_NULL); |
897 | + kstr->len = (int16_t)ntohs(RD_KAFKAP_STR_LEN_NULL); |
898 | |
899 | return kstr; |
900 | } |
901 | @@ -163,16 +163,17 @@ |
902 | #define RD_KAFKAP_BYTES_LEN_NULL -1 |
903 | /* Returns the actual size of a kafka protocol bytes representation. */ |
904 | #define RD_KAFKAP_BYTES_SIZE(kbytes) (int32_t)(sizeof((kbytes)->len) + \ |
905 | - (ntohl((kbytes)->len) == \ |
906 | + ((int32_t)ntohl((kbytes)->len)==\ |
907 | RD_KAFKAP_BYTES_LEN_NULL ? \ |
908 | 0 : ntohl((kbytes)->len))) |
909 | /* Returns the length of the string of a kafka protocol bytes representation */ |
910 | -#define RD_KAFKAP_BYTES_LEN(kbytes) (int32_t)((ntohl((kbytes)->len) == \ |
911 | +#define RD_KAFKAP_BYTES_LEN(kbytes) (int32_t)(((int32_t)ntohl((kbytes)->len) ==\ |
912 | RD_KAFKAP_BYTES_LEN_NULL ? \ |
913 | - 0 : ntohl((kbytes)->len))) |
914 | + 0 : \ |
915 | + (int32_t)ntohl((kbytes)->len))) |
916 | |
917 | #define RD_KAFKAP_BYTES_IS_NULL(kbytes) \ |
918 | - (ntohs((kbytes)->len) == RD_KAFKAP_STR_LEN_NULL) |
919 | + ((int32_t)ntohl((kbytes)->len) == RD_KAFKAP_STR_LEN_NULL) |
920 | |
921 | |
922 | static inline int rd_kafkap_bytes_cmp (const rd_kafkap_bytes_t *a, |
923 | @@ -207,7 +208,7 @@ |
924 | kbytes->len = ntohl(datalen); |
925 | memcpy(kbytes->data, data, datalen); |
926 | } else |
927 | - kbytes->len = ntohl(RD_KAFKAP_BYTES_LEN_NULL); |
928 | + kbytes->len = (int32_t)ntohl(RD_KAFKAP_BYTES_LEN_NULL); |
929 | |
930 | return kbytes; |
931 | } |
932 | |
933 | === modified file 'rdkafka_topic.c' |
934 | --- rdkafka_topic.c 2013-11-04 16:50:07 +0000 |
935 | +++ rdkafka_topic.c 2014-01-01 03:04:53 +0000 |
936 | @@ -528,7 +528,7 @@ |
937 | int32_t leader) { |
938 | rd_kafka_topic_t *rkt; |
939 | rd_kafka_toppar_t *rktp; |
940 | - rd_kafka_broker_t *rkb; |
941 | + rd_kafka_broker_t *rkb = NULL; |
942 | |
943 | if (!(rkt = rd_kafka_topic_find(rk, topic))) { |
944 | rd_kafka_dbg(rk, METADATA, "TOPICUPD", |
945 | @@ -537,40 +537,35 @@ |
946 | } |
947 | |
948 | /* Find broker */ |
949 | - rd_kafka_lock(rk); |
950 | - rkb = rd_kafka_broker_find_by_nodeid(rk, leader); |
951 | - rd_kafka_unlock(rk); |
952 | - |
953 | + if (leader != -1) { |
954 | + rd_kafka_lock(rk); |
955 | + rkb = rd_kafka_broker_find_by_nodeid(rk, leader); |
956 | + rd_kafka_unlock(rk); |
957 | + } |
958 | |
959 | rd_kafka_topic_wrlock(rkt); |
960 | |
961 | rktp = rd_kafka_toppar_get(rkt, partition, 0); |
962 | assert(rktp); |
963 | |
964 | - if (leader == -1) { |
965 | - /* Topic lost its leader */ |
966 | - rd_kafka_toppar_broker_delegate(rktp, NULL); |
967 | - rd_kafka_topic_unlock(rkt); |
968 | - |
969 | - /* Query for the topic leader (async) */ |
970 | - rd_kafka_topic_leader_query(rk, rkt); |
971 | - |
972 | - rd_kafka_toppar_destroy(rktp); /* from get() */ |
973 | - rd_kafka_topic_destroy(rkt); /* from find() */ |
974 | - return; |
975 | - } |
976 | - |
977 | - |
978 | if (!rkb) { |
979 | - rd_kafka_log(rk, LOG_NOTICE, "TOPICBRK", |
980 | - "Topic %s [%"PRId32"] migrated to unknown " |
981 | - "broker %"PRId32": requesting metadata update", |
982 | - topic, partition, leader); |
983 | + int had_leader = rktp->rktp_leader ? 1 : 0; |
984 | + |
985 | + if (leader == -1) |
986 | + /* Topic lost its leader */; |
987 | + else |
988 | + rd_kafka_log(rk, LOG_NOTICE, "TOPICBRK", |
989 | + "Topic %s [%"PRId32"] migrated to unknown " |
990 | + "broker %"PRId32": " |
991 | + "requesting metadata update", |
992 | + topic, partition, leader); |
993 | + |
994 | rd_kafka_toppar_broker_delegate(rktp, NULL); |
995 | rd_kafka_topic_unlock(rkt); |
996 | |
997 | /* Query for the topic leader (async) */ |
998 | - rd_kafka_topic_leader_query(rk, rkt); |
999 | + if (had_leader) |
1000 | + rd_kafka_topic_leader_query(rk, rkt); |
1001 | |
1002 | rd_kafka_toppar_destroy(rktp); /* from get() */ |
1003 | rd_kafka_topic_destroy(rkt); /* from find() */ |
1004 | @@ -804,7 +799,7 @@ |
1005 | cnt = uas.rkmq_msg_cnt; |
1006 | rd_kafka_toppar_unlock(rktp_ua); |
1007 | |
1008 | - TAILQ_FOREACH_SAFE(rkm, tmp, &uas.rkmq_msgs, rkm_link) { |
1009 | + TAILQ_FOREACH_SAFE(rkm, &uas.rkmq_msgs, rkm_link, tmp) { |
1010 | if (unlikely(rd_kafka_msg_partitioner(rkt, NULL, rkm) == -1)) { |
1011 | /* Desired partition not available */ |
1012 | rd_kafka_msgq_enq(&failed, rkm); |
1013 | |
1014 | === modified file 'rdsysqueue.h' |
1015 | --- rdsysqueue.h 2013-11-04 16:50:07 +0000 |
1016 | +++ rdsysqueue.h 2014-01-01 03:04:53 +0000 |
1017 | @@ -125,13 +125,6 @@ |
1018 | (*(((struct headname *)((elm)->field.tqe_prev))->tqh_last)) |
1019 | #endif |
1020 | |
1021 | -#ifdef TAILQ_FOREACH_SAFE |
1022 | -#ifdef __APPLE__ |
1023 | -/* Apple's .._SAFE macro has the temporary variable at the end. */ |
1024 | -#undef TAILQ_FOREACH_SAFE |
1025 | -#endif |
1026 | -#endif |
1027 | - |
1028 | #ifndef TAILQ_FOREACH_SAFE |
1029 | /* |
1030 | * TAILQ_FOREACH_SAFE() provides a traversal where the current iterated element |
1031 | @@ -139,7 +132,7 @@ |
1032 | * It does not allow freeing or modifying any other element in the list, |
1033 | * at least not the next element. |
1034 | */ |
1035 | -#define TAILQ_FOREACH_SAFE(elm,tmpelm,head,field) \ |
1036 | +#define TAILQ_FOREACH_SAFE(elm,head,field,tmpelm) \ |
1037 | for ((elm) = TAILQ_FIRST(head) ; \ |
1038 | (elm) && ((tmpelm) = TAILQ_NEXT((elm), field), 1) ; \ |
1039 | (elm) = (tmpelm)) |
1040 | |
1041 | === added file 'tests/0003-msgmaxsize.c' |
1042 | --- tests/0003-msgmaxsize.c 1970-01-01 00:00:00 +0000 |
1043 | +++ tests/0003-msgmaxsize.c 2014-01-01 03:04:53 +0000 |
1044 | @@ -0,0 +1,169 @@ |
1045 | +/* |
1046 | + * librdkafka - Apache Kafka C library |
1047 | + * |
1048 | + * Copyright (c) 2012-2013, Magnus Edenhill |
1049 | + * All rights reserved. |
1050 | + * |
1051 | + * Redistribution and use in source and binary forms, with or without |
1052 | + * modification, are permitted provided that the following conditions are met: |
1053 | + * |
1054 | + * 1. Redistributions of source code must retain the above copyright notice, |
1055 | + * this list of conditions and the following disclaimer. |
1056 | + * 2. Redistributions in binary form must reproduce the above copyright notice, |
1057 | + * this list of conditions and the following disclaimer in the documentation |
1058 | + * and/or other materials provided with the distribution. |
1059 | + * |
1060 | + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
1061 | + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
1062 | + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
1063 | + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
1064 | + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
1065 | + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
1066 | + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
1067 | + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
1068 | + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
1069 | + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
1070 | + * POSSIBILITY OF SUCH DAMAGE. |
1071 | + */ |
1072 | + |
1073 | +/** |
1074 | + * Tests "message.bytes.max" |
1075 | + * Issue #24 |
1076 | + */ |
1077 | + |
1078 | +#define _GNU_SOURCE |
1079 | +#include <signal.h> |
1080 | +#include <sys/time.h> |
1081 | +#include <time.h> |
1082 | + |
1083 | +#include "test.h" |
1084 | + |
1085 | +/* Typical include path would be <librdkafka/rdkafka.h>, but this program |
1086 | + * is built from within the librdkafka source tree and thus differs. */ |
1087 | +#include "rdkafka.h" /* for Kafka driver */ |
1088 | + |
1089 | + |
1090 | +static int msgs_wait = 0; /* bitmask */ |
1091 | + |
1092 | +/** |
1093 | + * Delivery report callback. |
1094 | + * Called for each message once to signal its delivery status. |
1095 | + */ |
1096 | +static void dr_cb (rd_kafka_t *rk, void *payload, size_t len, |
1097 | + rd_kafka_resp_err_t err, void *opaque, void *msg_opaque) { |
1098 | + int msgid = *(int *)msg_opaque; |
1099 | + |
1100 | + free(msg_opaque); |
1101 | + |
1102 | + if (err) |
1103 | + TEST_FAIL("Unexpected delivery error for message #%i: %s\n", |
1104 | + msgid, rd_kafka_err2str(err)); |
1105 | + |
1106 | + if (!(msgs_wait & (1 << msgid))) |
1107 | + TEST_FAIL("Unwanted delivery report for message #%i " |
1108 | + "(waiting for 0x%x)\n", msgid, msgs_wait); |
1109 | + |
1110 | + TEST_SAY("Delivery report for message #%i: %s\n", |
1111 | + msgid, rd_kafka_err2str(err)); |
1112 | + |
1113 | + msgs_wait &= ~(1 << msgid); |
1114 | +} |
1115 | + |
1116 | + |
1117 | +int main (int argc, char **argv) { |
1118 | + char *topic = "rdkafkatest1"; |
1119 | + int partition = 0; |
1120 | + int r; |
1121 | + rd_kafka_t *rk; |
1122 | + rd_kafka_topic_t *rkt; |
1123 | + rd_kafka_conf_t *conf; |
1124 | + rd_kafka_topic_conf_t *topic_conf; |
1125 | + char errstr[512]; |
1126 | + char msg[100000]; |
1127 | + int msgcnt = 10; |
1128 | + int i; |
1129 | + |
1130 | + /* Socket hangups are gracefully handled in librdkafka on socket error |
1131 | + * without the use of signals, so SIGPIPE should be ignored by the |
1132 | + * calling program. */ |
1133 | + signal(SIGPIPE, SIG_IGN); |
1134 | + |
1135 | + |
1136 | + test_conf_init(&conf, &topic_conf, 10); |
1137 | + |
1138 | + /* Set a small maximum message size. */ |
1139 | + if (rd_kafka_conf_set(conf, "message.max.bytes", "100000", |
1140 | + errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) |
1141 | + TEST_FAIL("%s\n", errstr); |
1142 | + |
1143 | + /* Set delivery report callback */ |
1144 | + rd_kafka_conf_set_dr_cb(conf, dr_cb); |
1145 | + |
1146 | + /* Create kafka instance */ |
1147 | + rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, |
1148 | + errstr, sizeof(errstr)); |
1149 | + if (!rk) |
1150 | + TEST_FAIL("Failed to create rdkafka instance: %s\n", errstr); |
1151 | + |
1152 | + TEST_SAY("Created kafka instance %s\n", rd_kafka_name(rk)); |
1153 | + |
1154 | + rkt = rd_kafka_topic_new(rk, topic, topic_conf); |
1155 | + if (!rkt) |
1156 | + TEST_FAIL("Failed to create topic: %s\n", |
1157 | + strerror(errno)); |
1158 | + |
1159 | + memset(msg, 0, sizeof(msg)); |
1160 | + |
1161 | + /* Produce 'msgcnt' messages, size odd ones larger than max.bytes, |
1162 | + * and even ones smaller than max.bytes. */ |
1163 | + for (i = 0 ; i < msgcnt ; i++) { |
1164 | + int *msgidp = malloc(sizeof(*msgidp)); |
1165 | + size_t len; |
1166 | + int toobig = i & 1; |
1167 | + |
1168 | + *msgidp = i; |
1169 | + if (toobig) { |
1170 | + /* Too big */ |
1171 | + len = 200000; |
1172 | + } else { |
1173 | + /* Good size */ |
1174 | + len = 5000; |
1175 | + msgs_wait |= (1 << i); |
1176 | + } |
1177 | + |
1178 | + snprintf(msg, sizeof(msg), "%s test message #%i", argv[0], i); |
1179 | + r = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, |
1180 | + msg, len, NULL, 0, msgidp); |
1181 | + |
1182 | + if (toobig) { |
1183 | + if (r != -1) |
1184 | + TEST_FAIL("Succeeded to produce too " |
1185 | + "large message #%i\n", i); |
1186 | + free(msgidp); |
1187 | + } else if (r == -1) |
1188 | + TEST_FAIL("Failed to produce message #%i: %s\n", |
1189 | + i, strerror(errno)); |
1190 | + } |
1191 | + |
1192 | + /* Wait for messages to be delivered. */ |
1193 | + while (rd_kafka_outq_len(rk) > 0) |
1194 | + rd_kafka_poll(rk, 50); |
1195 | + |
1196 | + if (msgs_wait != 0) |
1197 | + TEST_FAIL("Still waiting for messages: 0x%x\n", msgs_wait); |
1198 | + |
1199 | + /* Destroy topic */ |
1200 | + rd_kafka_topic_destroy(rkt); |
1201 | + |
1202 | + /* Destroy rdkafka instance */ |
1203 | + TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk)); |
1204 | + rd_kafka_destroy(rk); |
1205 | + |
1206 | + /* Wait for everything to be cleaned up since broker destroys are |
1207 | + * handled in its own thread. */ |
1208 | + test_wait_exit(10); |
1209 | + |
1210 | + /* If we havent failed at this point then |
1211 | + * there were no threads leaked */ |
1212 | + return 0; |
1213 | +} |
1214 | |
1215 | === modified file 'tests/Makefile' |
1216 | --- tests/Makefile 2013-11-04 16:50:07 +0000 |
1217 | +++ tests/Makefile 2014-01-01 03:04:53 +0000 |
1218 | @@ -1,4 +1,4 @@ |
1219 | -TESTS ?= 0001-multiobj.test 0002-msgtimeout.test |
1220 | +TESTS ?= 0001-multiobj.test 0002-msgtimeout.test 0003-msgmaxsize.test |
1221 | CC ?= cc |
1222 | CXX ?= g++ |
1223 | CFLAGS += -g -Wall -Werror -Wfloat-equal -Wpointer-arith -O2 -I../ |
Thanks!