Describe the bug
When a BufferChunkOverflowError occurs, it is not handled as a buffer error. It surfaces as an unexpected error (Kafka::ProcessingError), and the consumer restarts and re-fetches events in an endless loop until the cause of the BufferChunkOverflowError is resolved.
I think BufferChunkOverflowError should be treated as a BufferError and retried without reconnecting the consumer. Is this a bug?
To Reproduce
The behavior is visible in the fluent-plugin-kafka log after a BufferChunkOverflowError occurs.
Steps to reproduce:
- Set chunk_limit_size in the output plugin's buffer section to 1.
- Send a message to the topic.
- Check the log.
Expected behavior
BufferChunkOverflowError should be treated as a BufferError and retried without restarting the consumer.
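To illustrate the distinction I am asking about, here is a minimal Ruby sketch. It does not use any Fluentd or fluent-plugin-kafka code: the three error classes are stand-ins whose hierarchy is assumed for illustration, and `emit_with_retry` is a hypothetical helper. It shows that a rescue clause naming one subclass does not catch a sibling subclass, while a rescue on the shared parent does.

```ruby
# Stand-in error classes. The hierarchy is an assumption for illustration,
# not a copy of the real Fluentd definitions.
class BufferError < StandardError; end
class BufferOverflowError < BufferError; end
class BufferChunkOverflowError < BufferError; end

# Hypothetical helper: retries the block in place when it raises `retryable`,
# up to `limit` attempts. Any other error propagates to the caller, which in
# the report above ends with the consumer being restarted.
def emit_with_retry(retryable, limit: 3)
  attempts = 0
  begin
    attempts += 1
    yield attempts
    [:emitted, attempts]
  rescue retryable
    retry if attempts < limit
    [:gave_up, attempts]
  end
end

# Rescuing only the sibling class lets the chunk overflow escape.
narrow = begin
  emit_with_retry(BufferOverflowError) { raise BufferChunkOverflowError, "record too large" }
rescue BufferChunkOverflowError
  [:propagated, 1]
end

# Rescuing the shared parent class handles it in place.
wide = emit_with_retry(BufferError) { raise BufferChunkOverflowError, "record too large" }

p narrow
p wide
```

Under these assumptions, `narrow` is `[:propagated, 1]` and `wide` is `[:gave_up, 3]`. A record larger than chunk_limit_size stays too large on every retry, so an in-place retry would probably need an upper bound such as `limit` here.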
Your Environment
- Fluentd version: 1.16.2
- TD Agent version:
- fluent-plugin-kafka version: v0.19.2
- ruby-kafka version: 1.5.0
- Operating system:
- Kernel version:
Your Configuration
<source>
@type kafka_group
consumer_group fluentd-consumer
brokers kafka-controller-0.kafka-controller-headless.kafka.svc.cluster.local:9092,kafka-controller-1.kafka-controller-headless.kafka.svc.cluster.local:9092,kafka-controller-2.kafka-controller-headless.kafka.svc.cluster.local:9092
topics test-topic
format json
offset_commit_interval 5
offset_commit_threshold 100
</source>
<match test-topic>
@type stdout
<buffer>
@type memory
retry_max_times 3
flush_mode interval
flush_interval 1s
flush_thread_interval 0.1
flush_thread_burst_interval 0.01
flush_thread_count 5
chunk_full_threshold 0.1
chunk_limit_size 1 # Intentionally tiny so that every record raises BufferChunkOverflowError.
</buffer>
</match>
Your Error Log
2024-07-04 00:52:37 +0000 [info]: #1 Subscribe to topics matching the regex test-topic
2024-07-04 00:52:37 +0000 [warn]: #1 Re-starting consumer 2024-07-04 00:52:37 +0000
2024-07-04 00:52:44 +0000 [warn]: #1 emit transaction failed: error_class=Fluent::Plugin::Buffer::BufferChunkOverflowError error="a 3268 bytes record (nth: 0) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 1) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 2) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 3) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 4) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 5) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 6) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 7) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 8) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 9) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 10) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 11) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 12) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 13) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 14) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 15) is larger than buffer chunk limit size (1)" location="/usr/local/bundle/gems/fluentd-1.16.2/lib/fluent/plugin/buffer.rb:457:in `write'" tag="telemetry.ab"
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluentd-1.16.2/lib/fluent/event_router.rb:198:in `rescue in emit_events'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluentd-1.16.2/lib/fluent/event_router.rb:195:in `emit_events'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluentd-1.16.2/lib/fluent/event_router.rb:115:in `emit_stream'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-rewrite-tag-filter-2.4.0/lib/fluent/plugin/out_rewrite_tag_filter.rb:111:in `block (2 levels) in process'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-rewrite-tag-filter-2.4.0/lib/fluent/plugin/out_rewrite_tag_filter.rb:110:in `each'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-rewrite-tag-filter-2.4.0/lib/fluent/plugin/out_rewrite_tag_filter.rb:110:in `block in process'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-rewrite-tag-filter-2.4.0/lib/fluent/plugin/out_rewrite_tag_filter.rb:108:in `each'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-rewrite-tag-filter-2.4.0/lib/fluent/plugin/out_rewrite_tag_filter.rb:108:in `process'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluentd-1.16.2/lib/fluent/plugin/output.rb:885:in `emit_sync'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluentd-1.16.2/lib/fluent/event_router.rb:196:in `emit_events'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluentd-1.16.2/lib/fluent/event_router.rb:115:in `emit_stream'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/in_kafka_group.rb:376:in `emit_events'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/in_kafka_group.rb:347:in `process_batch'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/in_kafka_group.rb:358:in `block in run'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:336:in `block (3 levels) in each_batch'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:23:in `instrument'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:35:in `instrument'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:334:in `block (2 levels) in each_batch'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:314:in `each'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:314:in `block in each_batch'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:419:in `block in consumer_loop'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:23:in `instrument'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:35:in `instrument'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:417:in `consumer_loop'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:311:in `each_batch'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/in_kafka_group.rb:354:in `run'
2024-07-04 00:52:44 +0000 [warn]: #1 emit transaction failed: error_class=Fluent::Plugin::Buffer::BufferChunkOverflowError error="a 3268 bytes record (nth: 0) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 1) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 2) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 3) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 4) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 5) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 6) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 7) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 8) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 9) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 10) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 11) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 12) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 13) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 14) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 15) is larger than buffer chunk limit size (1)" location="/usr/local/bundle/gems/fluentd-1.16.2/lib/fluent/plugin/buffer.rb:457:in `write'" tag="test-topic"
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluentd-1.16.2/lib/fluent/event_router.rb:198:in `rescue in emit_events'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluentd-1.16.2/lib/fluent/event_router.rb:195:in `emit_events'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluentd-1.16.2/lib/fluent/event_router.rb:115:in `emit_stream'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/in_kafka_group.rb:376:in `emit_events'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/in_kafka_group.rb:347:in `process_batch'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/in_kafka_group.rb:358:in `block in run'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:336:in `block (3 levels) in each_batch'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:23:in `instrument'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:35:in `instrument'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:334:in `block (2 levels) in each_batch'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:314:in `each'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:314:in `block in each_batch'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:419:in `block in consumer_loop'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:23:in `instrument'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:35:in `instrument'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:417:in `consumer_loop'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:311:in `each_batch'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/in_kafka_group.rb:354:in `run'
2024-07-04 00:52:45 +0000 [error]: #1 unexpected error during consuming events from kafka. Re-fetch events. error="Kafka::ProcessingError"
2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:345:in `rescue in block (3 levels) in each_batch'
2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:338:in `block (3 levels) in each_batch'
2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:23:in `instrument'
2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:35:in `instrument'
2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:334:in `block (2 levels) in each_batch'
2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:314:in `each'
2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:314:in `block in each_batch'
2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:419:in `block in consumer_loop'
2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:23:in `instrument'
2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:35:in `instrument'
2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:417:in `consumer_loop'
2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:311:in `each_batch'
2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/in_kafka_group.rb:354:in `run'
2024-07-04 00:52:45 +0000 [warn]: #1 Stopping Consumer
2024-07-04 00:52:45 +0000 [warn]: #1 Could not connect to broker. retry_time:0. Next retry will be in 30 seconds
Additional context
No response