Description
A deadlock may occur when a consumer librdkafka handle is being destroyed at the same time as it is handling an `RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE` response from the broker. The relevant stack traces are:
```
Thread 4738 (Thread 0x7f0c24949640 (LWP 8629)):
#0 __lll_lock_wait () at /opt/sbox/kdubuc/.solgdb/100.0kafka_clients_2.0.151/vmr/lib64/libc.so.6
#1 pthread_mutex_lock@@GLIBC_2.2.5 () at /opt/sbox/kdubuc/.solgdb/100.0kafka_clients_2.0.151/vmr/lib64/libc.so.6
#2 mtx_lock (mtx=mtx@entry=0x7f0d49ebec88) at tinycthread.c:136
#3 rd_kafka_topic_partition_cnt_update (rkt=rkt@entry=0x7f0b230767e0, partition_cnt=partition_cnt@entry=0) at rdkafka_topic.c:987
#4 rd_kafka_topic_partitions_remove (rkt=0x7f0b230767e0) at rdkafka_topic.c:1534
#5 rd_kafka_destroy_internal (rk=rk@entry=0x7f0a300cdae0) at rdkafka.c:1185
#6 rd_kafka_thread_main (arg=arg@entry=0x7f0a300cdae0) at rdkafka.c:2227
#7 _thrd_wrapper_function (aArg=<optimized out>) at tinycthread.c:576
```
In particular, `rd_kafka_topic_partitions_remove` has progressed to the point where all of the rktp queues have been disabled, and this thread is now waiting on the rktp mutex at `0x7f0d49ebec88`.
```
Thread 4761 (Thread 0x7f0c1d6d7640 (LWP 8746)):
#0 __lll_lock_wait () at /opt/sbox/kdubuc/.solgdb/100.0kafka_clients_2.0.151/vmr/lib64/libc.so.6
#1 pthread_mutex_lock@@GLIBC_2.2.5 () at /opt/sbox/kdubuc/.solgdb/100.0kafka_clients_2.0.151/vmr/lib64/libc.so.6
#2 mtx_lock (mtx=mtx@entry=0x7f0d49ebec88) at tinycthread.c:136
#3 rd_kafka_offset_reset_op_cb (rk=<optimized out>, rkq=<optimized out>, rko=0x7f0af1bc6b80) at rdkafka_offset.c:773
#4 rd_kafka_op_destroy (rko=0x7f0af1bc6b80) at rdkafka_op.c:300
#5 rd_kafka_op_reply (rko=rko@entry=0x7f0af1bc6b80, err=err@entry=RD_KAFKA_RESP_ERR__DESTROY) at rdkafka_op.c:606
#6 rd_kafka_q_enq1 (rkq=<optimized out>, rko=rko@entry=0x7f0af1bc6b80, orig_destq=<optimized out>, do_lock=1, at_head=0) at rdkafka_queue.h:439
#7 rd_kafka_q_enq (rko=0x7f0af1bc6b80, rkq=<optimized out>) at rdkafka_queue.h:480
#8 rd_kafka_offset_reset (rktp=rktp@entry=0x7f0d49ebebf0, broker_id=43, err_pos=..., err=RD_KAFKA_RESP_ERR__DESTROY, fmt=fmt@entry=0xc3a05d "%s") at rdkafka_offset.c:819
#9 rd_kafka_offset_reset_op_cb (rk=<optimized out>, rkq=<optimized out>, rko=0x7f0af120c350) at rdkafka_offset.c:774
#10 rd_kafka_op_destroy (rko=0x7f0af120c350) at rdkafka_op.c:300
#11 rd_kafka_op_reply (rko=rko@entry=0x7f0af120c350, err=err@entry=RD_KAFKA_RESP_ERR__DESTROY) at rdkafka_op.c:606
#12 rd_kafka_q_enq1 (rkq=<optimized out>, rko=rko@entry=0x7f0af120c350, orig_destq=<optimized out>, do_lock=1, at_head=0) at rdkafka_queue.h:439
#13 rd_kafka_q_enq (rko=0x7f0af120c350, rkq=<optimized out>) at rdkafka_queue.h:480
#14 rd_kafka_offset_reset (rktp=rktp@entry=0x7f0d49ebebf0, broker_id=43, err_pos=..., err=err@entry=RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE, fmt=fmt@entry=0xc944b8 "fetch failed due to requested offset not available on the broker") at rdkafka_offset.c:819
#15 rd_kafka_fetch_reply_handle_partition_error (HighwaterMarkOffset=-1, err=<optimized out>, tver=<optimized out>, rktp=0x7f0d49ebebf0, rkb=<optimized out>) at rdkafka_fetcher.c:286
#16 rd_kafka_fetch_reply_handle_partition (ErrorCode=<optimized out>, request=<optimized out>, rkbuf=0x7f0a614afe50, rkt=<optimized out>, topic=0x7f0c1d6d1530, rkb=<optimized out>) at rdkafka_fetcher.c:571
#17 rd_kafka_fetch_reply_handle (rkb=rkb@entry=0x7f0a9c6f0660, rkbuf=0x7f0a614afe50, request=<optimized out>) at rdkafka_fetcher.c:671
#18 rd_kafka_broker_fetch_reply (rk=<optimized out>, rkb=0x7f0a9c6f0660, err=RD_KAFKA_RESP_ERR_NO_ERROR, reply=<optimized out>, request=<optimized out>, opaque=<optimized out>) at rdkafka_fetcher.c:724
#19 rd_kafka_buf_callback (rk=0x7f0a300cdae0, rkb=0x7f0a9c6f0660, err=RD_KAFKA_RESP_ERR_NO_ERROR, response=0x7f0a614afe50, request=0x7f0af14c6540) at rdkafka_buf.c:499
#20 rd_kafka_req_response (rkbuf=0x7f0a614afe50, rkb=0x7f0a9c6f0660) at rdkafka_broker.c:1902
#21 rd_kafka_recv (rkb=rkb@entry=0x7f0a9c6f0660) at rdkafka_broker.c:2021
#22 rd_kafka_transport_io_event (rktrans=rktrans@entry=0x7f0a6006b7b0, events=events@entry=1, socket_errstr=0x0) at rdkafka_transport.c:788
#23 rd_kafka_transport_io_serve (rktrans=0x7f0a6006b7b0, rkq=<optimized out>, timeout_ms=<optimized out>) at rdkafka_transport.c:1047
#24 rd_kafka_broker_ops_io_serve (rkb=rkb@entry=0x7f0a9c6f0660, abs_timeout=330237630797) at rdkafka_broker.c:3552
#25 rd_kafka_broker_consumer_serve (rkb=rkb@entry=0x7f0a9c6f0660, abs_timeout=abs_timeout@entry=330237630797) at rdkafka_broker.c:4255
#26 rd_kafka_broker_serve (rkb=rkb@entry=0x7f0a9c6f0660, timeout_ms=<optimized out>, timeout_ms@entry=1000) at rdkafka_broker.c:4397
#27 rd_kafka_broker_thread_main (arg=arg@entry=0x7f0a9c6f0660) at rdkafka_broker.c:4597
#28 _thrd_wrapper_function (aArg=<optimized out>) at tinycthread.c:576
```
`rd_kafka_offset_reset` (#14) tries to send an op from the broker thread to the rktp thread, with a callback of `rd_kafka_offset_reset_op_cb`. But the rktp queue has been disabled, so `rd_kafka_q_enq` (#13), via `rd_kafka_q_enq1` (#12), immediately calls `rd_kafka_op_reply` (#11) with an err of `RD_KAFKA_RESP_ERR__DESTROY`. That calls `rd_kafka_op_destroy` (#10), which in turn runs the `rd_kafka_offset_reset_op_cb` callback (#9), and the callback re-enters `rd_kafka_offset_reset` (#8).

`rd_kafka_offset_reset` doesn't check for `RD_KAFKA_RESP_ERR__DESTROY`; instead, it tries to re-run the same sequence. This would result in infinite recursion and a crash, except that `rd_kafka_offset_reset_op_cb` takes the non-recursive rktp mutex on each entry. The first entry (#9) still holds that mutex when the second entry (#3) tries to take it, so the broker thread deadlocks on itself, and the destroying thread (4738), which is waiting on the same mutex (`0x7f0d49ebec88`), can never proceed.
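The re-entry chain can be reproduced in a small single-threaded model. This is a hypothetical sketch, not librdkafka code: `toy_toppar`, `toy_offset_reset`, `toy_offset_reset_op_cb` and `toy_q_enq_reset_op` are invented stand-ins for the rktp, `rd_kafka_offset_reset`, `rd_kafka_offset_reset_op_cb` and `rd_kafka_q_enq`. The disabled queue is reduced to a flag whose enqueue path runs the callback synchronously with a DESTROY error, and the non-recursive mutex is reduced to a flag that records a second lock attempt instead of hanging.

```c
#include <assert.h>

/* Hypothetical toy model; none of these names exist in librdkafka. */
enum toy_err { TOY_ERR_OFFSET_OUT_OF_RANGE, TOY_ERR_DESTROY };

struct toy_toppar {
        int locked;         /* non-recursive rktp mutex, modeled as a flag */
        int queue_disabled; /* set once destroy has disabled the rktp queues */
        int reset_calls;    /* number of entries into toy_offset_reset() */
        int deadlocked;     /* set when the owning thread re-locks the mutex */
};

static void toy_offset_reset(struct toy_toppar *rktp, enum toy_err err);

/* Stands in for rd_kafka_offset_reset_op_cb: take the rktp lock, then
 * call the reset function with the op's error code. */
static void toy_offset_reset_op_cb(struct toy_toppar *rktp, enum toy_err err) {
        if (rktp->locked) {
                /* A real non-recursive mutex would block forever here. */
                rktp->deadlocked = 1;
                return;
        }
        rktp->locked = 1;
        toy_offset_reset(rktp, err);
        assert(rktp->locked);
        rktp->locked = 0;
}

/* Stands in for rd_kafka_q_enq on the rktp ops queue. When the queue is
 * disabled, the op is replied to with DESTROY at once, and destroying it
 * runs the callback synchronously on the calling (broker) thread. */
static void toy_q_enq_reset_op(struct toy_toppar *rktp) {
        if (rktp->queue_disabled)
                toy_offset_reset_op_cb(rktp, TOY_ERR_DESTROY);
        /* Otherwise the op would wait in the queue (not modeled). */
}

/* Stands in for the unpatched rd_kafka_offset_reset when called off the
 * rktp handler thread: it always enqueues a reset op and never looks at
 * the error code, so a DESTROY reply is re-enqueued like any other error. */
static void toy_offset_reset(struct toy_toppar *rktp, enum toy_err err) {
        (void)err;
        rktp->reset_calls++;
        toy_q_enq_reset_op(rktp);
}
```

Calling `toy_offset_reset(&t, TOY_ERR_OFFSET_OUT_OF_RANGE)` on a `t` with `queue_disabled` set enters the reset function twice and then records a self-deadlock on the second callback entry, mirroring frames #14, #8 and #3 of the broker-thread trace.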
As far as I can tell, the following patch should resolve this:
```diff
--- src/rdkafka_offset.c
+++ src/rdkafka_offset.c
@@ -802,6 +802,9 @@ void rd_kafka_offset_reset(rd_kafka_toppar_t *rktp,
         char reason[512];
         va_list ap;
 
+        if (err == RD_KAFKA_RESP_ERR__DESTROY)
+                return;
+
         va_start(ap, fmt);
         rd_vsnprintf(reason, sizeof(reason), fmt, ap);
         va_end(ap);
```
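A toy model with the proposed early return added suggests why the guard should break the cycle: the DESTROY reply still runs the callback once, but the nested reset call returns immediately instead of enqueuing again, so the lock is never taken a second time. As before, this is a hypothetical sketch and every name in it is invented; the lock and the disabled queue are reduced to flags.

```c
#include <assert.h>

/* Hypothetical toy model; none of these names exist in librdkafka. */
enum toy_err { TOY_ERR_OFFSET_OUT_OF_RANGE, TOY_ERR_DESTROY };

struct toy_toppar {
        int locked;         /* non-recursive rktp mutex, modeled as a flag */
        int queue_disabled; /* rktp queues already disabled by destroy */
        int reset_calls;    /* number of entries into toy_offset_reset() */
        int deadlocked;     /* set when the owning thread re-locks the mutex */
};

static void toy_offset_reset(struct toy_toppar *rktp, enum toy_err err);

/* Stands in for rd_kafka_offset_reset_op_cb (unchanged by the patch). */
static void toy_offset_reset_op_cb(struct toy_toppar *rktp, enum toy_err err) {
        if (rktp->locked) {
                rktp->deadlocked = 1; /* a real mutex would hang here */
                return;
        }
        rktp->locked = 1;
        toy_offset_reset(rktp, err);
        rktp->locked = 0;
}

/* Stands in for the patched rd_kafka_offset_reset. */
static void toy_offset_reset(struct toy_toppar *rktp, enum toy_err err) {
        rktp->reset_calls++;
        /* Proposed guard: an op refused with DESTROY is dropped rather than
         * sent back to the disabled queue, which ends the cycle here. */
        if (err == TOY_ERR_DESTROY)
                return;
        /* Enqueueing onto a disabled queue: immediate DESTROY reply, whose
         * destruction runs the callback synchronously. */
        if (rktp->queue_disabled)
                toy_offset_reset_op_cb(rktp, TOY_ERR_DESTROY);
}
```

In this model the pending reset is simply discarded once the queue has been disabled, which appears harmless only because the handle is being torn down anyway.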
Note that we're working off our own fork of librdkafka, last synced with v2.2.0, so the line numbers quoted above may not exactly match yours. However, the latest version of rdkafka_offset.c (95a542c) still seems to be vulnerable to this issue.
Checklist
IMPORTANT: We will close issues where the checklist has not been completed.
Please provide the following information:
- librdkafka version (release number or git tag): v2.2.0
- Apache Kafka version:
- librdkafka client configuration:
- Operating system: Red Hat Enterprise Linux 9.3 x86_64
- Provide logs (with `debug=..` as necessary) from librdkafka
- Provide broker log excerpts
- Critical issue