Merge lp:~noskcaj/ubuntu/trusty/librdkafka/0.8.1 into lp:ubuntu/trusty/librdkafka

Proposed by Jackson Doak
Status: Merged
Merge reported by: Martin Pitt
Merged at revision: not available
Proposed branch: lp:~noskcaj/ubuntu/trusty/librdkafka/0.8.1
Merge into: lp:ubuntu/trusty/librdkafka
Diff against target: 1223 lines (+475/-123)
19 files modified
CONFIGURATION.md (+2/-0)
Makefile (+4/-1)
README.md (+5/-5)
debian/changelog (+8/-0)
debian/control (+2/-1)
debian/librdkafka1.symbols (+2/-0)
examples/rdkafka_example.c (+5/-0)
examples/rdkafka_performance.c (+16/-8)
rdkafka.c (+48/-26)
rdkafka.h (+20/-5)
rdkafka_broker.c (+104/-32)
rdkafka_defaultconf.c (+8/-0)
rdkafka_int.h (+42/-0)
rdkafka_msg.c (+7/-1)
rdkafka_proto.h (+11/-10)
rdkafka_topic.c (+20/-25)
rdsysqueue.h (+1/-8)
tests/0003-msgmaxsize.c (+169/-0)
tests/Makefile (+1/-1)
To merge this branch: bzr merge lp:~noskcaj/ubuntu/trusty/librdkafka/0.8.1
Reviewer: Martin Pitt
Status: Approve
Review via email: mp+200240@code.launchpad.net

Description of the change

New upstream release; should fix the FTBFS. I cannot test it myself as I don't have working PPC hardware, but http://bugs.debian.org/cgi-bin/bugreport.cgi?bug=730506 suggests this should work.

Revision history for this message
Martin Pitt (pitti) wrote :

Thanks!

review: Approve

Preview Diff

1=== modified file 'CONFIGURATION.md'
2--- CONFIGURATION.md 2013-11-04 16:50:07 +0000
3+++ CONFIGURATION.md 2014-01-01 03:04:53 +0000
4@@ -11,6 +11,8 @@
5 topic.metadata.refresh.fast.interval.ms | 250 | See `topic.metadata.refresh.fast.cnt` description
6 debug | | A comma-separated list of debug contexts to enable: all,generic,broker,topic,metadata,producer,queue,msg
7 socket.timeout.ms | 60000 | Timeout for network requests.
8+socket.send.buffer.bytes | 0 | Broker socket send buffer size. System default is used if 0.
9+socket.receive.buffer.bytes | 0 | Broker socket receive buffer size. System default is used if 0.
10 broker.address.ttl | 300000 | How long to cache the broker address resolving results.
11 statistics.interval.ms | 0 | librdkafka statistics emit interval. The application also needs to register a stats callback using `rd_kafka_conf_set_stats_cb()`. The granularity is 1000ms.
12 queued.min.messages | 100000 | Minimum number of messages that should to be available for consumption by application.
13
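
As an aside (not part of the diff itself): a minimal sketch of how an application could opt into the two new socket buffer options, using the same rd_kafka_conf_set() API that appears in the new tests/0003-msgmaxsize.c further down. Leaving them at 0 keeps the system defaults, per the descriptions above.

    #include <stdio.h>
    #include <librdkafka/rdkafka.h>

    /* Sketch: request 1 MB socket buffers on all broker connections. */
    static rd_kafka_conf_t *conf_with_socket_buffers (void) {
            rd_kafka_conf_t *conf = rd_kafka_conf_new();
            char errstr[512];

            if (rd_kafka_conf_set(conf, "socket.send.buffer.bytes", "1000000",
                                  errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
                rd_kafka_conf_set(conf, "socket.receive.buffer.bytes", "1000000",
                                  errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                    fprintf(stderr, "%% Config error: %s\n", errstr);
                    rd_kafka_conf_destroy(conf);
                    return NULL;
            }
            return conf;
    }
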
14=== modified file 'Makefile'
15--- Makefile 2013-11-04 16:50:07 +0000
16+++ Makefile 2014-01-01 03:04:53 +0000
17@@ -17,7 +17,9 @@
18 CFLAGS+=-g
19
20 # Clang warnings to ignore
21-CFLAGS+=-Wno-gnu-designator
22+ifeq ($(CC),clang)
23+ CFLAGS+=-Wno-gnu-designator
24+endif
25
26 # Enable iovecs in snappy
27 CFLAGS+=-DSG
28@@ -82,5 +84,6 @@
29 rm -f $(OBJS) $(DEPS) \
30 $(LIBNAME)*.a $(LIBNAME)*.so $(LIBNAME)*.so.$(LIBVER)
31 make -C tests clean
32+ make -C examples clean
33
34 -include $(DEPS)
35
36=== modified file 'README.md'
37--- README.md 2013-11-04 16:50:07 +0000
38+++ README.md 2014-01-01 03:04:53 +0000
39@@ -24,12 +24,12 @@
40 * Producer: supported
41 * Consumer: supported
42 * Compression: snappy and gzip
43- * Debian package: work in progress (separate "debian" branch)
44+ * Debian package: "debian" branch ([ITP #710271](http://bugs.debian.org/cgi-bin/bugreport.cgi?bug=710271))
45 * ZooKeeper: not supported
46- * API: not backwards compatible
47+ * API: Stable, not backwards compatible
48 * Tests: Regression tests in `tests/` directory.
49 * Statistics: JSON formatted, see `rd_kafka_conf_set_stats_cb` in `rdkafka.h`.
50- * Status: Testing, APIs subject to change.
51+ * Status: Stable
52
53
54 **Apache Kafka 0.7 support:**
55@@ -117,10 +117,10 @@
56 [GitHub Issues](https://github.com/edenhill/librdkafka/issues)
57
58
59-Questions and discussions are also welcome on irc.freenode.org, #kafka,
60+Questions and discussions are also welcome on irc.freenode.org, #apache-kafka,
61 nickname Snaps.
62
63
64-### Commerical support
65+### Commercial support
66
67 Commercial support is available from [Edenhill services](http://www.edenhill.se)
68
69=== modified file 'debian/changelog'
70--- debian/changelog 2013-11-04 16:50:07 +0000
71+++ debian/changelog 2014-01-01 03:04:53 +0000
72@@ -1,3 +1,11 @@
73+librdkafka (0.8.1-0ubuntu1) trusty; urgency=medium
74+
75+ * New upstream release.
76+ - Should fix FTBFS at debian bug 730506
77+ * Update symbols
78+
79+ -- Jackson Doak <noskcaj@ubuntu.com> Wed, 01 Jan 2014 13:55:40 +1100
80+
81 librdkafka (0.8.0-1) unstable; urgency=low
82
83 * Initial release. (Closes: #710271)
84
85=== modified file 'debian/control'
86--- debian/control 2013-11-04 16:50:07 +0000
87+++ debian/control 2014-01-01 03:04:53 +0000
88@@ -1,6 +1,7 @@
89 Source: librdkafka
90 Priority: optional
91-Maintainer: Faidon Liambotis <paravoid@debian.org>
92+Maintainer: Ubuntu Developers <ubuntu-devel-discuss@lists.ubuntu.com>
93+XSBC-Original-Maintainer: Faidon Liambotis <paravoid@debian.org>
94 Build-Depends: debhelper (>= 9), zlib1g-dev
95 Standards-Version: 3.9.4
96 Section: libs
97
98=== modified file 'debian/librdkafka1.symbols'
99--- debian/librdkafka1.symbols 2013-11-04 16:50:07 +0000
100+++ debian/librdkafka1.symbols 2014-01-01 03:04:53 +0000
101@@ -84,4 +84,6 @@
102 rd_kafka_toppar_insert_msg@Base 0.8.0
103 rd_kafka_toppar_insert_msgq@Base 0.8.0
104 rd_kafka_toppar_ua_move@Base 0.8.0
105+ rd_kafka_version@Base 0.8.1
106+ rd_kafka_version_str@Base 0.8.1
107 rd_kafka_wait_destroyed@Base 0.8.0
108
109=== modified file 'examples/rdkafka_example.c'
110--- examples/rdkafka_example.c 2013-11-04 16:50:07 +0000
111+++ examples/rdkafka_example.c 2014-01-01 03:04:53 +0000
112@@ -214,6 +214,8 @@
113 "Usage: %s [-C|-P] -t <topic> "
114 "[-p <partition>] [-b <host1:port1,host2:port2,..>]\n"
115 "\n"
116+ "librdkafka version %s (0x%08x)\n"
117+ "\n"
118 " Options:\n"
119 " -C | -P Consumer or Producer mode\n"
120 " -t <topic> Topic to fetch / produce\n"
121@@ -231,8 +233,11 @@
122 " writes fetched messages to stdout\n"
123 " In Producer mode:\n"
124 " reads messages from stdin and sends to broker\n"
125+ "\n"
126+ "\n"
127 "\n",
128 argv[0],
129+ rd_kafka_version_str(), rd_kafka_version(),
130 RD_KAFKA_DEBUG_CONTEXTS);
131 exit(1);
132 }
133
134=== modified file 'examples/rdkafka_performance.c'
135--- examples/rdkafka_performance.c 2013-11-04 16:50:07 +0000
136+++ examples/rdkafka_performance.c 2014-01-01 03:04:53 +0000
137@@ -87,7 +87,6 @@
138 void *payload, size_t len,
139 int error_code,
140 void *opaque, void *msg_opaque) {
141- long int msgid = (long int)msg_opaque;
142 static rd_ts_t last;
143 rd_ts_t now = rd_clock();
144 static int msgs;
145@@ -106,12 +105,12 @@
146 !(msgs_wait_cnt % (dispintvl / 1000)) ||
147 (now - last) >= dispintvl * 1000) {
148 if (error_code)
149- printf("Message %ld delivey failed: %s (%li remain)\n",
150- msgid, rd_kafka_err2str(error_code),
151+ printf("Message delivey failed: %s (%li remain)\n",
152+ rd_kafka_err2str(error_code),
153 msgs_wait_cnt);
154 else if (!quiet)
155- printf("Message %ld delivered: %li remain\n",
156- msgid, msgs_wait_cnt);
157+ printf("Message delivered: %li remain\n",
158+ msgs_wait_cnt);
159 if (!quiet && do_seq)
160 printf(" --> \"%.*s\"\n", (int)len, (char *)payload);
161 last = now;
162@@ -413,6 +412,8 @@
163 "Usage: %s [-C|-P] -t <topic> "
164 "[-p <partition>] [-b <broker,broker..>] [options..]\n"
165 "\n"
166+ "librdkafka version %s (0x%08x)\n"
167+ "\n"
168 " Options:\n"
169 " -C | -P Consumer or Producer mode\n"
170 " -t <topic> Topic to fetch / produce\n"
171@@ -454,6 +455,7 @@
172 " writes messages of size -s <..> and prints thruput\n"
173 "\n",
174 argv[0],
175+ rd_kafka_version_str(), rd_kafka_version(),
176 RD_KAFKA_DEBUG_CONTEXTS);
177 exit(1);
178 }
179@@ -560,14 +562,17 @@
180 while (run &&
181 rd_kafka_produce(rkt, partition,
182 sendflags, pbuf, msgsize,
183- key, keylen,
184- (void *)cnt.msgs) == -1) {
185+ key, keylen, NULL) == -1) {
186 if (!quiet || errno != ENOBUFS)
187 printf("produce error: %s%s\n",
188 strerror(errno),
189 errno == ENOBUFS ?
190 " (backpressure)":"");
191 cnt.tx_err++;
192+ if (errno != ENOBUFS) {
193+ run = 0;
194+ break;
195+ }
196 now = rd_clock();
197 if (cnt.t_last + dispintvl <= now) {
198 printf("%% Backpressure %i "
199@@ -633,7 +638,7 @@
200 * Consumer
201 */
202
203- rd_kafka_message_t **rkmessages;
204+ rd_kafka_message_t **rkmessages = NULL;
205
206 #if 0 /* Future API */
207 /* The offset storage file is optional but its presence
208@@ -725,6 +730,9 @@
209 strerror(errno));
210
211 print_stats(mode, 0, compression);
212+
213+ /* Poll to handle stats callbacks */
214+ rd_kafka_poll(rk, 0);
215 }
216 cnt.t_end = rd_clock();
217
218
219=== modified file 'rdkafka.c'
220--- rdkafka.c 2013-11-04 16:50:07 +0000
221+++ rdkafka.c 2014-01-01 03:04:53 +0000
222@@ -353,7 +353,7 @@
223 rd_kafka_dbg(rk, QUEUE, "QSERVE", "Serving %i ops", localq.rkq_qlen);
224
225 /* Call callback for each op */
226- TAILQ_FOREACH_SAFE(rko, tmp, &localq.rkq_q, rko_link) {
227+ TAILQ_FOREACH_SAFE(rko, &localq.rkq_q, rko_link, tmp) {
228 callback(rko, opaque);
229 rd_kafka_op_destroy(rko);
230 }
231@@ -515,7 +515,7 @@
232
233 /* Decommission all topics */
234 rd_kafka_lock(rk);
235- TAILQ_FOREACH_SAFE(rkt, rkt_tmp, &rk->rk_topics, rkt_link) {
236+ TAILQ_FOREACH_SAFE(rkt, &rk->rk_topics, rkt_link, rkt_tmp) {
237 rd_kafka_unlock(rk);
238 rd_kafka_topic_partitions_remove(rkt);
239 rd_kafka_lock(rk);
240@@ -553,7 +553,7 @@
241 size_t size = *sizep;
242 int of = *ofp;
243
244- _st_printf("%s{ "
245+ _st_printf("%s\"%"PRId32"\": { "
246 "\"partition\":%"PRId32", "
247 "\"leader\":%"PRId32", "
248 "\"desired\":%s, "
249@@ -574,6 +574,7 @@
250 "} ",
251 first ? "" : ", ",
252 rktp->rktp_partition,
253+ rktp->rktp_partition,
254 rktp->rktp_leader ? rktp->rktp_leader->rkb_nodeid : -1,
255 (rktp->rktp_flags&RD_KAFKA_TOPPAR_F_DESIRED)?"true":"false",
256 (rktp->rktp_flags&RD_KAFKA_TOPPAR_F_UNKNOWN)?"true":"false",
257@@ -618,7 +619,7 @@
258 "\"ts\":%"PRIu64", "
259 "\"time\":%lli, "
260 "\"replyq\":%i, "
261- "\"brokers\":[ "/*open brokers*/,
262+ "\"brokers\":{ "/*open brokers*/,
263 now,
264 (signed long long)time(NULL),
265 rk->rk_rep.rkq_qlen);
266@@ -626,7 +627,8 @@
267
268 TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
269 rd_kafka_broker_lock(rkb);
270- _st_printf("%s{ "/*open broker*/
271+ rd_kafka_avg_rollover(&rkb->rkb_rtt_last, &rkb->rkb_rtt_curr);
272+ _st_printf("%s\"%s\": { "/*open broker*/
273 "\"name\":\"%s\", "
274 "\"nodeid\":%"PRId32", "
275 "\"state\":\"%s\", "
276@@ -639,9 +641,16 @@
277 "\"rx\":%"PRIu64", "
278 "\"rxbytes\":%"PRIu64", "
279 "\"rxerrs\":%"PRIu64", "
280- "\"toppars\":[ "/*open toppars*/,
281+ "\"rtt\": {"
282+ " \"min\":%"PRIu64","
283+ " \"max\":%"PRIu64","
284+ " \"avg\":%"PRIu64","
285+ " \"cnt\":%i "
286+ "}, "
287+ "\"toppars\":{ "/*open toppars*/,
288 rkb == TAILQ_FIRST(&rk->rk_brokers) ? "" : ", ",
289 rkb->rkb_name,
290+ rkb->rkb_name,
291 rkb->rkb_nodeid,
292 rd_kafka_broker_state_names[rkb->rkb_state],
293 rkb->rkb_outbufs.rkbq_cnt,
294@@ -652,37 +661,43 @@
295 rkb->rkb_c.tx_retries,
296 rkb->rkb_c.rx,
297 rkb->rkb_c.rx_bytes,
298- rkb->rkb_c.rx_err);
299+ rkb->rkb_c.rx_err,
300+ rkb->rkb_rtt_last.ra_min,
301+ rkb->rkb_rtt_last.ra_max,
302+ rkb->rkb_rtt_last.ra_avg,
303+ rkb->rkb_rtt_last.ra_cnt);
304
305 rd_kafka_broker_toppars_rdlock(rkb);
306 TAILQ_FOREACH(rktp, &rkb->rkb_toppars, rktp_rkblink) {
307- _st_printf("%s{ "
308+ _st_printf("%s\"%.*s\": { "
309 "\"topic\":\"%.*s\", "
310 "\"partition\":%"PRId32"} ",
311 rktp==TAILQ_FIRST(&rkb->rkb_toppars)?"":", ",
312 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
313+ RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
314 rktp->rktp_partition);
315 }
316 rd_kafka_broker_toppars_unlock(rkb);
317
318 rd_kafka_broker_unlock(rkb);
319
320- _st_printf("] "/*close toppars*/
321+ _st_printf("} "/*close toppars*/
322 "} "/*close broker*/);
323 }
324
325
326- _st_printf("], " /* close "brokers" array */
327- "\"topics\":[ ");
328+ _st_printf("}, " /* close "brokers" array */
329+ "\"topics\":{ ");
330
331 TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
332 int i;
333
334 rd_kafka_topic_rdlock(rkt);
335- _st_printf("%s{ "
336+ _st_printf("%s\"%.*s\": { "
337 "\"topic\":\"%.*s\", "
338- "\"partitions\":[ " /*open partitions*/,
339+ "\"partitions\":{ " /*open partitions*/,
340 rkt==TAILQ_FIRST(&rk->rk_topics)?"":", ",
341+ RD_KAFKAP_STR_PR(rkt->rkt_topic),
342 RD_KAFKAP_STR_PR(rkt->rkt_topic));
343
344 for (i = 0 ; i < rkt->rkt_partition_cnt ; i++)
345@@ -699,14 +714,14 @@
346 rkt->rkt_ua, i++ == 0);
347 rd_kafka_topic_unlock(rkt);
348
349- _st_printf("] "/*close partitions*/
350+ _st_printf("} "/*close partitions*/
351 "} "/*close topic*/);
352
353 }
354
355 rd_kafka_unlock(rk);
356
357- _st_printf("] "/*close topics*/
358+ _st_printf("} "/*close topics*/
359 "}"/*close object*/);
360
361
362@@ -842,17 +857,6 @@
363
364 /**
365 * Produce a single message.
366- *
367- * If 'partition' is unassigned (RD_KAFKA_PARTITION_UA) the configured or
368- * default partitioner will be used to designate the target partition.
369- *
370- * See rdkafka.h for 'msgflags'.
371- *
372- * Returns: 0 on success or -1 on error (see errno for details)
373- *
374- * errnos:
375- * ENOBUFS - conf.producer.max_msg_cnt would be exceeded.
376- *
377 * Locality: any application thread
378 */
379 int rd_kafka_produce (rd_kafka_topic_t *rkt, int32_t partition,
380@@ -1279,3 +1283,21 @@
381 int rd_kafka_outq_len (rd_kafka_t *rk) {
382 return rk->rk_producer.msg_cnt;
383 }
384+
385+
386+int rd_kafka_version (void) {
387+ return RD_KAFKA_VERSION;
388+}
389+
390+const char *rd_kafka_version_str (void) {
391+ static char ret[64];
392+ int ver = rd_kafka_version();
393+
394+ if (!*ret)
395+ snprintf(ret, sizeof(ret), "%i.%i.%i",
396+ (ver >> 24) & 0xff,
397+ (ver >> 16) & 0xff,
398+ (ver >> 8) & 0xff);
399+
400+ return ret;
401+}
402
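
For orientation (illustrative, not part of the diff): the statistics JSON emitted above now keys brokers, toppars, topics and partitions as objects by name/id rather than plain arrays, and gains a per-broker "rtt" section, so applications parsing the document may need adjusting. A minimal sketch of receiving the stats at all, assuming the rd_kafka_conf_set_stats_cb() callback referenced in CONFIGURATION.md and README.md above:

    #include <stdio.h>
    #include <librdkafka/rdkafka.h>

    /* Served from rd_kafka_poll() every statistics.interval.ms.
     * Returning 0 tells librdkafka it may free the JSON buffer. */
    static int my_stats_cb (rd_kafka_t *rk, char *json, size_t json_len,
                            void *opaque) {
            printf("%% Stats update (%zu bytes):\n%.*s\n",
                   json_len, (int)json_len, json);
            return 0;
    }

    /* Registration sketch:
     *   rd_kafka_conf_set(conf, "statistics.interval.ms", "5000",
     *                     errstr, sizeof(errstr));
     *   rd_kafka_conf_set_stats_cb(conf, my_stats_cb);
     */
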
403=== modified file 'rdkafka.h'
404--- rdkafka.h 2013-11-04 16:50:07 +0000
405+++ rdkafka.h 2014-01-01 03:04:53 +0000
406@@ -43,13 +43,25 @@
407 /**
408 * librdkafka version
409 *
410- * interpreted as MM.mm.rr.xx:
411+ * Interpreted as hex MM.mm.rr.xx:
412 * MM = Major
413 * mm = minor
414 * rr = revision
415 * xx = currently unused
416- */
417-#define RD_KAFKA_VERSION 0x00080000
418+ *
419+ * I.e.: 0x00080100 = 0.8.1
420+ */
421+#define RD_KAFKA_VERSION 0x00080100
422+
423+/**
424+ * Returns the librdkafka version as integer.
425+ */
426+int rd_kafka_version (void);
427+
428+/**
429+ * Returns the librdkafka version as string.
430+ */
431+const char *rd_kafka_version_str (void);
432
433
434 /**
435@@ -634,8 +646,11 @@
436 * pointer that will provided in the delivery report callback (`dr_cb`) for
437 * referencing this message.
438 *
439- * Returns 0 on success or -1 if the maximum number of outstanding messages
440- * (conf.producer.max_messages) has been reached (`errno==ENOBUFS`).
441+ * Returns 0 on success or -1 on error in which case errno is set accordingly:
442+ * ENOBUFS - maximum number of outstanding messages has been reached:
443+ * "queue.buffering.max.message"
444+ * EMSGSIZE - message is larger than configured max size:
445+ * "messages.max.bytes".
446 *
447 */
448
449
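
As an illustration (not part of the diff): with the additions above, an application can report both its compile-time and run-time librdkafka versions, using the hex layout documented above (0x00080100 = 0.8.1), the same way the updated examples print it:

    #include <stdio.h>
    #include <librdkafka/rdkafka.h>

    int main (void) {
            /* Compile-time version from the header ... */
            printf("built against librdkafka 0x%08x\n", RD_KAFKA_VERSION);
            /* ... and run-time version from the loaded library. */
            printf("running librdkafka %s (0x%08x)\n",
                   rd_kafka_version_str(), rd_kafka_version());
            return 0;
    }
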
450=== modified file 'rdkafka_broker.c'
451--- rdkafka_broker.c 2013-11-04 16:50:07 +0000
452+++ rdkafka_broker.c 2014-01-01 03:04:53 +0000
453@@ -72,7 +72,7 @@
454 int i;
455
456 rd_kafka_dbg(rk, MSG, "MSG", "%s: iovlen %zd",
457- what, msg->msg_iovlen);
458+ what, (size_t)msg->msg_iovlen);
459
460 for (i = 0 ; i < msg->msg_iovlen ; i++) {
461 rd_kafka_dbg(rk, MSG, what,
462@@ -221,26 +221,32 @@
463 TAILQ_INIT(&rkbufq->rkbq_bufs);
464 rkbufq->rkbq_cnt = 0;
465 }
466+
467+/**
468+ * Concat all buffers from 'src' to tail of 'dst'
469+ */
470+static void rd_kafka_bufq_concat (rd_kafka_bufq_t *dst, rd_kafka_bufq_t *src) {
471+ TAILQ_CONCAT(&dst->rkbq_bufs, &src->rkbq_bufs, rkbuf_link);
472+ rd_atomic_add(&dst->rkbq_cnt, src->rkbq_cnt);
473+ rd_kafka_bufq_init(src);
474+}
475+
476 /**
477 * Purge the wait-response queue.
478+ * NOTE: 'rkbufq' must be a temporary queue and not one of rkb_waitresps
479+ * or rkb_outbufs since buffers may be re-enqueued on those queues.
480 */
481 static void rd_kafka_bufq_purge (rd_kafka_broker_t *rkb,
482 rd_kafka_bufq_t *rkbufq,
483 rd_kafka_resp_err_t err) {
484 rd_kafka_buf_t *rkbuf, *tmp;
485- rd_kafka_bufq_t tmpq;
486
487 assert(pthread_self() == rkb->rkb_thread);
488
489- /* Move messages to temporary queue to avoid infinite loops if they
490- * are requeued on the same queue in the rkbuf_cb callback. */
491- rd_kafka_bufq_init(&tmpq);
492- TAILQ_MOVE(&tmpq.rkbq_bufs, &rkbufq->rkbq_bufs, rkbuf_link);
493- rd_kafka_bufq_init(rkbufq);
494-
495- rd_rkb_dbg(rkb, QUEUE, "BUFQ", "Purging bufq");
496-
497- TAILQ_FOREACH_SAFE(rkbuf, tmp, &tmpq.rkbq_bufs, rkbuf_link)
498+ rd_rkb_dbg(rkb, QUEUE, "BUFQ", "Purging bufq with %i buffers",
499+ rkbufq->rkbq_cnt);
500+
501+ TAILQ_FOREACH_SAFE(rkbuf, &rkbufq->rkbq_bufs, rkbuf_link, tmp)
502 rkbuf->rkbuf_cb(rkb, err, NULL, rkbuf, rkbuf->rkbuf_opaque);
503 }
504
505@@ -255,14 +261,14 @@
506
507 assert(pthread_self() == rkb->rkb_thread);
508
509- TAILQ_FOREACH_SAFE(rkbuf, tmp,
510- &rkb->rkb_waitresps.rkbq_bufs, rkbuf_link) {
511+ TAILQ_FOREACH_SAFE(rkbuf,
512+ &rkb->rkb_waitresps.rkbq_bufs, rkbuf_link, tmp) {
513 if (likely(rkbuf->rkbuf_ts_timeout > now))
514 continue;
515
516 rd_kafka_bufq_deq(&rkb->rkb_waitresps, rkbuf);
517
518- rkbuf->rkbuf_cb(rkb, RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
519+ rkbuf->rkbuf_cb(rkb, RD_KAFKA_RESP_ERR__MSG_TIMED_OUT,
520 NULL, rkbuf, rkbuf->rkbuf_opaque);
521 cnt++;
522 }
523@@ -286,6 +292,7 @@
524 va_list ap;
525 int errno_save = errno;
526 rd_kafka_toppar_t *rktp;
527+ rd_kafka_bufq_t tmpq;
528
529 assert(pthread_self() == rkb->rkb_thread);
530 rd_kafka_broker_lock(rkb);
531@@ -338,11 +345,21 @@
532 strlen(rkb->rkb_err.msg));
533 }
534
535- rd_kafka_bufq_purge(rkb, &rkb->rkb_waitresps, err);
536- rd_kafka_bufq_purge(rkb, &rkb->rkb_outbufs, err);
537+ /*
538+ * Purge all buffers
539+ * (put on a temporary queue since bufs may be requeued)
540+ */
541+ rd_kafka_bufq_init(&tmpq);
542+ rd_kafka_bufq_concat(&tmpq, &rkb->rkb_waitresps);
543+ rd_kafka_bufq_concat(&tmpq, &rkb->rkb_outbufs);
544
545+ /* Unlock broker since a requeue will try to lock it. */
546 rd_kafka_broker_unlock(rkb);
547
548+ /* Purge the buffers */
549+ rd_kafka_bufq_purge(rkb, &tmpq, err);
550+
551+
552 /* Undelegate all toppars from this broker. */
553 rd_kafka_broker_toppars_wrlock(rkb);
554 while ((rktp = TAILQ_FIRST(&rkb->rkb_toppars))) {
555@@ -382,7 +399,7 @@
556
557 rd_kafka_dbg(rkb->rkb_rk, BROKER, "BRKSEND",
558 "sendmsg FAILED for iovlen %zd (%i)",
559- msg->msg_iovlen,
560+ (size_t)msg->msg_iovlen,
561 IOV_MAX);
562 rd_kafka_broker_fail(rkb, RD_KAFKA_RESP_ERR__TRANSPORT,
563 "Send failed: %s", strerror(errno));
564@@ -390,8 +407,8 @@
565 return -1;
566 }
567
568- rkb->rkb_c.tx_bytes += r;
569- rkb->rkb_c.tx++;
570+ rd_atomic_add(&rkb->rkb_c.tx_bytes, r);
571+ rd_atomic_add(&rkb->rkb_c.tx, 1);
572 return r;
573 }
574
575@@ -1020,11 +1037,15 @@
576 static rd_kafka_buf_t *rd_kafka_waitresp_find (rd_kafka_broker_t *rkb,
577 int32_t corrid) {
578 rd_kafka_buf_t *rkbuf;
579+ rd_ts_t now = rd_clock();
580
581 assert(pthread_self() == rkb->rkb_thread);
582
583 TAILQ_FOREACH(rkbuf, &rkb->rkb_waitresps.rkbq_bufs, rkbuf_link)
584 if (rkbuf->rkbuf_corrid == corrid) {
585+ rd_kafka_avg_add(&rkb->rkb_rtt_curr,
586+ now - rkbuf->rkbuf_ts_sent);
587+
588 rd_kafka_bufq_deq(&rkb->rkb_waitresps, rkbuf);
589 return rkbuf;
590 }
591@@ -1089,10 +1110,13 @@
592 off_t vof = of - len;
593
594 if (0)
595- printf(" #%i/%zd and %zd: of %zd, len %zd, "
596- "vof %zd: iov %zd\n",
597- i, src->msg_iovlen, dst->msg_iovlen,
598- of, len, vof, src->msg_iov[i].iov_len);
599+ printf(" #%i/%zd and %zd: of %jd, len %zd, "
600+ "vof %jd: iov %zd\n",
601+ i,
602+ (size_t)src->msg_iovlen,
603+ (size_t)dst->msg_iovlen,
604+ (intmax_t)of, len, (intmax_t)vof,
605+ src->msg_iov[i].iov_len);
606 if (vof < 0)
607 vof = 0;
608
609@@ -1248,8 +1272,8 @@
610 if (rkbuf->rkbuf_of == rkbuf->rkbuf_len + sizeof(rkbuf->rkbuf_reshdr)) {
611 /* Message is complete, pass it on to the original requester. */
612 rkb->rkb_recv_buf = NULL;
613- rkb->rkb_c.rx++;
614- rkb->rkb_c.rx_bytes += rkbuf->rkbuf_of;
615+ rd_atomic_add(&rkb->rkb_c.rx, 1);
616+ rd_atomic_add(&rkb->rkb_c.rx_bytes, rkbuf->rkbuf_of);
617 rd_kafka_req_response(rkb, rkbuf);
618 }
619
620@@ -1314,6 +1338,32 @@
621 rd_rkb_dbg(rkb, BROKER, "CONNECTED", "connected to %s",
622 rd_sockaddr2str(sinx, RD_SOCKADDR2STR_F_NICE));
623
624+ /* Set socket send & receive buffer sizes if configuerd */
625+ if (rkb->rkb_rk->rk_conf.socket_sndbuf_size != 0) {
626+ if (setsockopt(rkb->rkb_s, SOL_SOCKET, SO_SNDBUF,
627+ &rkb->rkb_rk->rk_conf.socket_sndbuf_size,
628+ sizeof(rkb->rkb_rk->rk_conf.
629+ socket_sndbuf_size)) == -1)
630+ rd_rkb_log(rkb, LOG_WARNING, "SNDBUF",
631+ "Failed to set socket send "
632+ "buffer size to %i: %s",
633+ rkb->rkb_rk->rk_conf.socket_sndbuf_size,
634+ strerror(errno));
635+ }
636+
637+ if (rkb->rkb_rk->rk_conf.socket_rcvbuf_size != 0) {
638+ if (setsockopt(rkb->rkb_s, SOL_SOCKET, SO_RCVBUF,
639+ &rkb->rkb_rk->rk_conf.socket_rcvbuf_size,
640+ sizeof(rkb->rkb_rk->rk_conf.
641+ socket_rcvbuf_size)) == -1)
642+ rd_rkb_log(rkb, LOG_WARNING, "RCVBUF",
643+ "Failed to set socket receive "
644+ "buffer size to %i: %s",
645+ rkb->rkb_rk->rk_conf.socket_rcvbuf_size,
646+ strerror(errno));
647+ }
648+
649+
650 rd_kafka_broker_lock(rkb);
651 rd_kafka_broker_set_state(rkb, RD_KAFKA_BROKER_STATE_UP);
652 rkb->rkb_err.err = 0;
653@@ -1381,6 +1431,9 @@
654 /* Entire buffer sent, unlink from outbuf */
655 rd_kafka_bufq_deq(&rkb->rkb_outbufs, rkbuf);
656
657+ /* Store time for RTT calculation */
658+ rkbuf->rkbuf_ts_sent = rd_clock();
659+
660 /* Put buffer on response wait list unless we are not
661 * expecting a response (required_acks=0). */
662 if (!(rkbuf->rkbuf_flags & RD_KAFKA_OP_F_NO_RESPONSE))
663@@ -1606,7 +1659,7 @@
664 if (0)
665 rd_rkb_dbg(rkb, MSG, "PRODUCE",
666 "Serve %i/%i messages (%i iovecs) "
667- "for %.*s [%"PRId32"] (%zd bytes)",
668+ "for %.*s [%"PRId32"] (%"PRIu64" bytes)",
669 msgcnt, rktp->rktp_msgq.rkmq_msg_cnt,
670 iovcnt,
671 RD_KAFKAP_STR_PR(rkt->rkt_topic),
672@@ -1732,6 +1785,12 @@
673 msghdr++;
674 }
675
676+ /* No messages added, bail out early. */
677+ if (unlikely(rkbuf->rkbuf_msgq.rkmq_msg_cnt == 0)) {
678+ rd_kafka_buf_destroy(rkbuf);
679+ return -1;
680+ }
681+
682 /* Compress the messages */
683 if (rkb->rkb_rk->rk_conf.compression_codec) {
684 int siovlen = 1;
685@@ -1923,8 +1982,8 @@
686
687 do_send:
688
689- rktp->rktp_c.tx_msgs += rkbuf->rkbuf_msgq.rkmq_msg_cnt;
690- rktp->rktp_c.tx_bytes += prodhdr->part2.MessageSetSize;
691+ rd_atomic_add(&rktp->rktp_c.tx_msgs, rkbuf->rkbuf_msgq.rkmq_msg_cnt);
692+ rd_atomic_add(&rktp->rktp_c.tx_bytes, prodhdr->part2.MessageSetSize);
693
694 prodhdr->part2.MessageSetSize =
695 htonl(prodhdr->part2.MessageSetSize);
696@@ -2104,8 +2163,10 @@
697 while (rktp->rktp_xmit_msgq.rkmq_msg_cnt > 0) {
698 int r = rd_kafka_broker_produce_toppar(
699 rkb, rktp);
700- if (r > 0)
701+ if (likely(r > 0))
702 cnt += r;
703+ else
704+ break;
705 }
706 }
707
708@@ -2114,7 +2175,7 @@
709 /* Trigger delivery report for timed out messages */
710 if (unlikely(timedout.rkmq_msg_cnt > 0))
711 rd_kafka_dr_msgq(rkb->rkb_rk, &timedout,
712- RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT);
713+ RD_KAFKA_RESP_ERR__MSG_TIMED_OUT);
714
715 rd_kafka_broker_toppars_unlock(rkb);
716
717@@ -3169,10 +3230,11 @@
718
719 int rd_kafka_brokers_add (rd_kafka_t *rk, const char *brokerlist) {
720 char *s = strdupa(brokerlist);
721- char *t, *n;
722+ char *t, *t2, *n;
723 int cnt = 0;
724 rd_kafka_broker_t *rkb;
725
726+ /* Parse comma-separated list of brokers. */
727 while (*s) {
728 uint16_t port = 0;
729
730@@ -3186,7 +3248,17 @@
731 else
732 n = s + strlen(s)-1;
733
734- if ((t = strchr(s, ':'))) {
735+ /* Check if port has been specified, but try to identify IPv6
736+ * addresses first:
737+ * t = last ':' in string
738+ * t2 = first ':' in string
739+ * If t and t2 are equal then only one ":" exists in name
740+ * and thus an IPv4 address with port specified.
741+ * Else if not equal and t is prefixed with "]" then it's an
742+ * IPv6 address with port specified.
743+ * Else no port specified. */
744+ if ((t = strrchr(s, ':')) &&
745+ ((t2 = strchr(s, ':')) == t || *(t-1) == ']')) {
746 *t = '\0';
747 port = atoi(t+1);
748 }
749
750=== modified file 'rdkafka_defaultconf.c'
751--- rdkafka_defaultconf.c 2013-11-04 16:50:07 +0000
752+++ rdkafka_defaultconf.c 2014-01-01 03:04:53 +0000
753@@ -122,6 +122,14 @@
754 { _RK_GLOBAL, "socket.timeout.ms", _RK_C_INT, _RK(socket_timeout_ms),
755 "Timeout for network requests.",
756 10, 300*1000, 60*1000 },
757+ { _RK_GLOBAL, "socket.send.buffer.bytes", _RK_C_INT,
758+ _RK(socket_sndbuf_size),
759+ "Broker socket send buffer size. System default is used if 0.",
760+ 0, 100000000, 0 },
761+ { _RK_GLOBAL, "socket.receive.buffer.bytes", _RK_C_INT,
762+ _RK(socket_rcvbuf_size),
763+ "Broker socket receive buffer size. System default is used if 0.",
764+ 0, 100000000, 0 },
765 { _RK_GLOBAL, "broker.address.ttl", _RK_C_INT,
766 _RK(broker_addr_ttl),
767 "How long to cache the broker address resolving results.",
768
769=== modified file 'rdkafka_int.h'
770--- rdkafka_int.h 2013-11-04 16:50:07 +0000
771+++ rdkafka_int.h 2014-01-01 03:04:53 +0000
772@@ -123,6 +123,8 @@
773 int debug;
774 int broker_addr_ttl;
775 int socket_timeout_ms;
776+ int socket_sndbuf_size;
777+ int socket_rcvbuf_size;
778 char *clientid;
779 char *brokerlist;
780 int stats_interval_ms;
781@@ -205,6 +207,43 @@
782
783
784
785+typedef struct rd_kafka_avg_s {
786+ rd_ts_t ra_max;
787+ rd_ts_t ra_min;
788+ rd_ts_t ra_avg;
789+ rd_ts_t ra_sum;
790+ int ra_cnt;
791+} rd_kafka_avg_t;
792+
793+/**
794+ * Add timestamp 'ts' to averager 'ra'.
795+ */
796+static RD_UNUSED void rd_kafka_avg_add (rd_kafka_avg_t *ra, rd_ts_t ts) {
797+ if (ts > ra->ra_max)
798+ ra->ra_max = ts;
799+ if (ra->ra_min == 0 || ts < ra->ra_min)
800+ ra->ra_min = ts;
801+ ra->ra_sum += ts;
802+ ra->ra_cnt++;
803+}
804+
805+/**
806+ * Rolls over statistics in 'src' and stores the average in 'dst'.
807+ * 'src' is cleared and ready to be reused.
808+ */
809+static RD_UNUSED void rd_kafka_avg_rollover (rd_kafka_avg_t *dst,
810+ rd_kafka_avg_t *src) {
811+ *dst = *src;
812+ if (dst->ra_cnt)
813+ dst->ra_avg = dst->ra_sum / dst->ra_cnt;
814+ else
815+ dst->ra_avg = 0;
816+
817+ memset(src, 0, sizeof(*src));
818+}
819+
820+
821+
822 typedef struct rd_kafka_msg_s {
823 TAILQ_ENTRY(rd_kafka_msg_s) rkm_link;
824 int rkm_flags;
825@@ -405,6 +444,9 @@
826 rd_kafka_bufq_t rkb_waitresps;
827 rd_kafka_bufq_t rkb_retrybufs;
828
829+ rd_kafka_avg_t rkb_rtt_curr; /* Current averaging period */
830+ rd_kafka_avg_t rkb_rtt_last; /* Last averaging period */
831+
832 char rkb_name[128]; /* Display name */
833 char rkb_nodename[128]; /* host:port */
834
835
836=== modified file 'rdkafka_msg.c'
837--- rdkafka_msg.c 2013-11-04 16:50:07 +0000
838+++ rdkafka_msg.c 2014-01-01 03:04:53 +0000
839@@ -64,6 +64,12 @@
840 size_t mlen = sizeof(*rkm);
841
842 assert(len > 0);
843+
844+ if (unlikely(len + keylen > rkt->rkt_rk->rk_conf.max_msg_size)) {
845+ errno = EMSGSIZE;
846+ return -1;
847+ }
848+
849 if (unlikely(rd_atomic_add(&rkt->rkt_rk->rk_producer.msg_cnt, 1) >
850 rkt->rkt_rk->rk_conf.queue_buffering_max_msgs)) {
851 rd_atomic_sub(&rkt->rkt_rk->rk_producer.msg_cnt, 1);
852@@ -115,7 +121,7 @@
853 int cnt = timedout->rkmq_msg_cnt;
854
855 /* Assume messages are added in time sequencial order */
856- TAILQ_FOREACH_SAFE(rkm, tmp, &rkmq->rkmq_msgs, rkm_link) {
857+ TAILQ_FOREACH_SAFE(rkm, &rkmq->rkmq_msgs, rkm_link, tmp) {
858 if (likely(rkm->rkm_ts_timeout > now))
859 break;
860
861
862=== modified file 'rdkafka_proto.h'
863--- rdkafka_proto.h 2013-11-04 16:50:07 +0000
864+++ rdkafka_proto.h 2014-01-01 03:04:53 +0000
865@@ -81,22 +81,22 @@
866 #define RD_KAFKAP_STR_LEN_NULL -1
867 /* Returns the actual size of a kafka protocol string representation. */
868 #define RD_KAFKAP_STR_SIZE(kstr) (int16_t)(sizeof((kstr)->len) + \
869- (ntohs((kstr)->len) == \
870+ ((int16_t)ntohs((kstr)->len) == \
871 RD_KAFKAP_STR_LEN_NULL ? \
872 0 : ntohs((kstr)->len)))
873 /* Returns the length of the string of a kafka protocol string representation */
874 #define RD_KAFKAP_STR_LEN(kstr) (int)((ntohs((kstr)->len) == \
875 RD_KAFKAP_STR_LEN_NULL ? \
876- 0 : ntohs((kstr)->len)))
877+ 0 : (int16_t)ntohs((kstr)->len)))
878
879
880 /* Macro suitable for "%.*s" printing. */
881 #define RD_KAFKAP_STR_PR(kstr) \
882- (ntohs((kstr)->len) == RD_KAFKAP_STR_LEN_NULL ? \
883+ ((int16_t)ntohs((kstr)->len) == RD_KAFKAP_STR_LEN_NULL ? \
884 0 : (int)ntohs((kstr)->len)), (kstr)->str
885
886 #define RD_KAFKAP_STR_IS_NULL(kstr) \
887- (ntohs((kstr)->len) == RD_KAFKAP_STR_LEN_NULL)
888+ ((int16_t)ntohs((kstr)->len) == RD_KAFKAP_STR_LEN_NULL)
889
890 static inline int rd_kafkap_str_cmp (const rd_kafkap_str_t *a,
891 const rd_kafkap_str_t *b) RD_UNUSED;
892@@ -137,7 +137,7 @@
893 kstr->len = ntohs(len);
894 memcpy(kstr->str, str, len+1);
895 } else
896- kstr->len = ntohs(RD_KAFKAP_STR_LEN_NULL);
897+ kstr->len = (int16_t)ntohs(RD_KAFKAP_STR_LEN_NULL);
898
899 return kstr;
900 }
901@@ -163,16 +163,17 @@
902 #define RD_KAFKAP_BYTES_LEN_NULL -1
903 /* Returns the actual size of a kafka protocol bytes representation. */
904 #define RD_KAFKAP_BYTES_SIZE(kbytes) (int32_t)(sizeof((kbytes)->len) + \
905- (ntohl((kbytes)->len) == \
906+ ((int32_t)ntohl((kbytes)->len)==\
907 RD_KAFKAP_BYTES_LEN_NULL ? \
908 0 : ntohl((kbytes)->len)))
909 /* Returns the length of the string of a kafka protocol bytes representation */
910-#define RD_KAFKAP_BYTES_LEN(kbytes) (int32_t)((ntohl((kbytes)->len) == \
911+#define RD_KAFKAP_BYTES_LEN(kbytes) (int32_t)(((int32_t)ntohl((kbytes)->len) ==\
912 RD_KAFKAP_BYTES_LEN_NULL ? \
913- 0 : ntohl((kbytes)->len)))
914+ 0 : \
915+ (int32_t)ntohl((kbytes)->len)))
916
917 #define RD_KAFKAP_BYTES_IS_NULL(kbytes) \
918- (ntohs((kbytes)->len) == RD_KAFKAP_STR_LEN_NULL)
919+ ((int32_t)ntohl((kbytes)->len) == RD_KAFKAP_STR_LEN_NULL)
920
921
922 static inline int rd_kafkap_bytes_cmp (const rd_kafkap_bytes_t *a,
923@@ -207,7 +208,7 @@
924 kbytes->len = ntohl(datalen);
925 memcpy(kbytes->data, data, datalen);
926 } else
927- kbytes->len = ntohl(RD_KAFKAP_BYTES_LEN_NULL);
928+ kbytes->len = (int32_t)ntohl(RD_KAFKAP_BYTES_LEN_NULL);
929
930 return kbytes;
931 }
932
933=== modified file 'rdkafka_topic.c'
934--- rdkafka_topic.c 2013-11-04 16:50:07 +0000
935+++ rdkafka_topic.c 2014-01-01 03:04:53 +0000
936@@ -528,7 +528,7 @@
937 int32_t leader) {
938 rd_kafka_topic_t *rkt;
939 rd_kafka_toppar_t *rktp;
940- rd_kafka_broker_t *rkb;
941+ rd_kafka_broker_t *rkb = NULL;
942
943 if (!(rkt = rd_kafka_topic_find(rk, topic))) {
944 rd_kafka_dbg(rk, METADATA, "TOPICUPD",
945@@ -537,40 +537,35 @@
946 }
947
948 /* Find broker */
949- rd_kafka_lock(rk);
950- rkb = rd_kafka_broker_find_by_nodeid(rk, leader);
951- rd_kafka_unlock(rk);
952-
953+ if (leader != -1) {
954+ rd_kafka_lock(rk);
955+ rkb = rd_kafka_broker_find_by_nodeid(rk, leader);
956+ rd_kafka_unlock(rk);
957+ }
958
959 rd_kafka_topic_wrlock(rkt);
960
961 rktp = rd_kafka_toppar_get(rkt, partition, 0);
962 assert(rktp);
963
964- if (leader == -1) {
965- /* Topic lost its leader */
966- rd_kafka_toppar_broker_delegate(rktp, NULL);
967- rd_kafka_topic_unlock(rkt);
968-
969- /* Query for the topic leader (async) */
970- rd_kafka_topic_leader_query(rk, rkt);
971-
972- rd_kafka_toppar_destroy(rktp); /* from get() */
973- rd_kafka_topic_destroy(rkt); /* from find() */
974- return;
975- }
976-
977-
978 if (!rkb) {
979- rd_kafka_log(rk, LOG_NOTICE, "TOPICBRK",
980- "Topic %s [%"PRId32"] migrated to unknown "
981- "broker %"PRId32": requesting metadata update",
982- topic, partition, leader);
983+ int had_leader = rktp->rktp_leader ? 1 : 0;
984+
985+ if (leader == -1)
986+ /* Topic lost its leader */;
987+ else
988+ rd_kafka_log(rk, LOG_NOTICE, "TOPICBRK",
989+ "Topic %s [%"PRId32"] migrated to unknown "
990+ "broker %"PRId32": "
991+ "requesting metadata update",
992+ topic, partition, leader);
993+
994 rd_kafka_toppar_broker_delegate(rktp, NULL);
995 rd_kafka_topic_unlock(rkt);
996
997 /* Query for the topic leader (async) */
998- rd_kafka_topic_leader_query(rk, rkt);
999+ if (had_leader)
1000+ rd_kafka_topic_leader_query(rk, rkt);
1001
1002 rd_kafka_toppar_destroy(rktp); /* from get() */
1003 rd_kafka_topic_destroy(rkt); /* from find() */
1004@@ -804,7 +799,7 @@
1005 cnt = uas.rkmq_msg_cnt;
1006 rd_kafka_toppar_unlock(rktp_ua);
1007
1008- TAILQ_FOREACH_SAFE(rkm, tmp, &uas.rkmq_msgs, rkm_link) {
1009+ TAILQ_FOREACH_SAFE(rkm, &uas.rkmq_msgs, rkm_link, tmp) {
1010 if (unlikely(rd_kafka_msg_partitioner(rkt, NULL, rkm) == -1)) {
1011 /* Desired partition not available */
1012 rd_kafka_msgq_enq(&failed, rkm);
1013
1014=== modified file 'rdsysqueue.h'
1015--- rdsysqueue.h 2013-11-04 16:50:07 +0000
1016+++ rdsysqueue.h 2014-01-01 03:04:53 +0000
1017@@ -125,13 +125,6 @@
1018 (*(((struct headname *)((elm)->field.tqe_prev))->tqh_last))
1019 #endif
1020
1021-#ifdef TAILQ_FOREACH_SAFE
1022-#ifdef __APPLE__
1023-/* Apple's .._SAFE macro has the temporary variable at the end. */
1024-#undef TAILQ_FOREACH_SAFE
1025-#endif
1026-#endif
1027-
1028 #ifndef TAILQ_FOREACH_SAFE
1029 /*
1030 * TAILQ_FOREACH_SAFE() provides a traversal where the current iterated element
1031@@ -139,7 +132,7 @@
1032 * It does not allow freeing or modifying any other element in the list,
1033 * at least not the next element.
1034 */
1035-#define TAILQ_FOREACH_SAFE(elm,tmpelm,head,field) \
1036+#define TAILQ_FOREACH_SAFE(elm,head,field,tmpelm) \
1037 for ((elm) = TAILQ_FIRST(head) ; \
1038 (elm) && ((tmpelm) = TAILQ_NEXT((elm), field), 1) ; \
1039 (elm) = (tmpelm))
1040
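
For reference (not part of the diff): the macro's temporary element now comes last, matching the call sites updated earlier in this diff (e.g. TAILQ_FOREACH_SAFE(rko, &localq.rkq_q, rko_link, tmp) in rdkafka.c). A minimal standalone sketch of safe removal during traversal with the new argument order, assuming rdsysqueue.h (or a BSD queue.h that already provides the macro):

    #include <stdlib.h>
    #include "rdsysqueue.h"

    struct item {
            TAILQ_ENTRY(item) link;
    };
    TAILQ_HEAD(item_head, item);

    static void free_all (struct item_head *head) {
            struct item *it, *tmp;

            /* 'tmp' caches the next element so 'it' may be freed safely. */
            TAILQ_FOREACH_SAFE(it, head, link, tmp) {
                    TAILQ_REMOVE(head, it, link);
                    free(it);
            }
    }
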
1041=== added file 'tests/0003-msgmaxsize.c'
1042--- tests/0003-msgmaxsize.c 1970-01-01 00:00:00 +0000
1043+++ tests/0003-msgmaxsize.c 2014-01-01 03:04:53 +0000
1044@@ -0,0 +1,169 @@
1045+/*
1046+ * librdkafka - Apache Kafka C library
1047+ *
1048+ * Copyright (c) 2012-2013, Magnus Edenhill
1049+ * All rights reserved.
1050+ *
1051+ * Redistribution and use in source and binary forms, with or without
1052+ * modification, are permitted provided that the following conditions are met:
1053+ *
1054+ * 1. Redistributions of source code must retain the above copyright notice,
1055+ * this list of conditions and the following disclaimer.
1056+ * 2. Redistributions in binary form must reproduce the above copyright notice,
1057+ * this list of conditions and the following disclaimer in the documentation
1058+ * and/or other materials provided with the distribution.
1059+ *
1060+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
1061+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
1062+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
1063+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
1064+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
1065+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
1066+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
1067+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
1068+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
1069+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
1070+ * POSSIBILITY OF SUCH DAMAGE.
1071+ */
1072+
1073+/**
1074+ * Tests "message.bytes.max"
1075+ * Issue #24
1076+ */
1077+
1078+#define _GNU_SOURCE
1079+#include <signal.h>
1080+#include <sys/time.h>
1081+#include <time.h>
1082+
1083+#include "test.h"
1084+
1085+/* Typical include path would be <librdkafka/rdkafka.h>, but this program
1086+ * is built from within the librdkafka source tree and thus differs. */
1087+#include "rdkafka.h" /* for Kafka driver */
1088+
1089+
1090+static int msgs_wait = 0; /* bitmask */
1091+
1092+/**
1093+ * Delivery report callback.
1094+ * Called for each message once to signal its delivery status.
1095+ */
1096+static void dr_cb (rd_kafka_t *rk, void *payload, size_t len,
1097+ rd_kafka_resp_err_t err, void *opaque, void *msg_opaque) {
1098+ int msgid = *(int *)msg_opaque;
1099+
1100+ free(msg_opaque);
1101+
1102+ if (err)
1103+ TEST_FAIL("Unexpected delivery error for message #%i: %s\n",
1104+ msgid, rd_kafka_err2str(err));
1105+
1106+ if (!(msgs_wait & (1 << msgid)))
1107+ TEST_FAIL("Unwanted delivery report for message #%i "
1108+ "(waiting for 0x%x)\n", msgid, msgs_wait);
1109+
1110+ TEST_SAY("Delivery report for message #%i: %s\n",
1111+ msgid, rd_kafka_err2str(err));
1112+
1113+ msgs_wait &= ~(1 << msgid);
1114+}
1115+
1116+
1117+int main (int argc, char **argv) {
1118+ char *topic = "rdkafkatest1";
1119+ int partition = 0;
1120+ int r;
1121+ rd_kafka_t *rk;
1122+ rd_kafka_topic_t *rkt;
1123+ rd_kafka_conf_t *conf;
1124+ rd_kafka_topic_conf_t *topic_conf;
1125+ char errstr[512];
1126+ char msg[100000];
1127+ int msgcnt = 10;
1128+ int i;
1129+
1130+ /* Socket hangups are gracefully handled in librdkafka on socket error
1131+ * without the use of signals, so SIGPIPE should be ignored by the
1132+ * calling program. */
1133+ signal(SIGPIPE, SIG_IGN);
1134+
1135+
1136+ test_conf_init(&conf, &topic_conf, 10);
1137+
1138+ /* Set a small maximum message size. */
1139+ if (rd_kafka_conf_set(conf, "message.max.bytes", "100000",
1140+ errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
1141+ TEST_FAIL("%s\n", errstr);
1142+
1143+ /* Set delivery report callback */
1144+ rd_kafka_conf_set_dr_cb(conf, dr_cb);
1145+
1146+ /* Create kafka instance */
1147+ rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
1148+ errstr, sizeof(errstr));
1149+ if (!rk)
1150+ TEST_FAIL("Failed to create rdkafka instance: %s\n", errstr);
1151+
1152+ TEST_SAY("Created kafka instance %s\n", rd_kafka_name(rk));
1153+
1154+ rkt = rd_kafka_topic_new(rk, topic, topic_conf);
1155+ if (!rkt)
1156+ TEST_FAIL("Failed to create topic: %s\n",
1157+ strerror(errno));
1158+
1159+ memset(msg, 0, sizeof(msg));
1160+
1161+ /* Produce 'msgcnt' messages, size odd ones larger than max.bytes,
1162+ * and even ones smaller than max.bytes. */
1163+ for (i = 0 ; i < msgcnt ; i++) {
1164+ int *msgidp = malloc(sizeof(*msgidp));
1165+ size_t len;
1166+ int toobig = i & 1;
1167+
1168+ *msgidp = i;
1169+ if (toobig) {
1170+ /* Too big */
1171+ len = 200000;
1172+ } else {
1173+ /* Good size */
1174+ len = 5000;
1175+ msgs_wait |= (1 << i);
1176+ }
1177+
1178+ snprintf(msg, sizeof(msg), "%s test message #%i", argv[0], i);
1179+ r = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY,
1180+ msg, len, NULL, 0, msgidp);
1181+
1182+ if (toobig) {
1183+ if (r != -1)
1184+ TEST_FAIL("Succeeded to produce too "
1185+ "large message #%i\n", i);
1186+ free(msgidp);
1187+ } else if (r == -1)
1188+ TEST_FAIL("Failed to produce message #%i: %s\n",
1189+ i, strerror(errno));
1190+ }
1191+
1192+ /* Wait for messages to be delivered. */
1193+ while (rd_kafka_outq_len(rk) > 0)
1194+ rd_kafka_poll(rk, 50);
1195+
1196+ if (msgs_wait != 0)
1197+ TEST_FAIL("Still waiting for messages: 0x%x\n", msgs_wait);
1198+
1199+ /* Destroy topic */
1200+ rd_kafka_topic_destroy(rkt);
1201+
1202+ /* Destroy rdkafka instance */
1203+ TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk));
1204+ rd_kafka_destroy(rk);
1205+
1206+ /* Wait for everything to be cleaned up since broker destroys are
1207+ * handled in its own thread. */
1208+ test_wait_exit(10);
1209+
1210+ /* If we havent failed at this point then
1211+ * there were no threads leaked */
1212+ return 0;
1213+}
1214
1215=== modified file 'tests/Makefile'
1216--- tests/Makefile 2013-11-04 16:50:07 +0000
1217+++ tests/Makefile 2014-01-01 03:04:53 +0000
1218@@ -1,4 +1,4 @@
1219-TESTS ?= 0001-multiobj.test 0002-msgtimeout.test
1220+TESTS ?= 0001-multiobj.test 0002-msgtimeout.test 0003-msgmaxsize.test
1221 CC ?= cc
1222 CXX ?= g++
1223 CFLAGS += -g -Wall -Werror -Wfloat-equal -Wpointer-arith -O2 -I../
