Remove the BatchQueue type #1117
base: main
Conversation
This was added in segmentio#716 to 'preserve message order across batches'. However, as noted in segmentio#1058, a simple channel performs the same function. To allow us to add backpressure for producers (coming in a later commit), let's switch back to a channel.
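As a rough sketch of the idea (illustrative only, with made-up types; not the actual kafka-go internals): a buffered channel is already a FIFO queue, so it preserves batch order, and because it is bounded, sends block once it fills, which is the backpressure behavior referenced above.

```go
package main

import "fmt"

// batch is a stand-in for the real batch type; purely illustrative.
type batch struct{ id int }

func main() {
	// A bounded FIFO queue of batches: just a buffered channel.
	queue := make(chan *batch, 4)

	done := make(chan struct{})
	go func() {
		defer close(done)
		// Batches come out in the order they were enqueued.
		for b := range queue {
			fmt.Println("writing batch", b.id)
		}
	}()

	for i := 0; i < 10; i++ {
		// Blocks once 4 batches are buffered: natural backpressure.
		queue <- &batch{id: i}
	}
	close(queue)
	<-done
}
```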
Looking into test failures.
Hi @PleasingFungus,

With the unbuffered channel, are all writes now synchronous until the last batch is being written for a given topic-partition? Are writes to one topic-partition now blocking writes to other topic-partitions until the final batch for a topic-partition is being sent?
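The concern rests on standard Go channel semantics: a send on an unbuffered channel blocks until a receiver is ready to take the value, so enqueueing through an unbuffered channel is synchronous with whatever drains it. A minimal sketch (illustrative only, not kafka-go code):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan int) // unbuffered: a send blocks until a receive happens

	done := make(chan struct{})
	go func() {
		time.Sleep(100 * time.Millisecond) // simulate a slow consumer
		fmt.Println("received", <-ch)
		close(done)
	}()

	start := time.Now()
	ch <- 1 // blocks for ~100ms, until the consumer above receives
	fmt.Println("send unblocked after", time.Since(start))
	<-done
}
```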
https://app.circleci.com/pipelines/github/segmentio/kafka-go/1243/workflows/144442e6-8c6b-4872-8dde-f6fb0e9cb1b0/jobs/13622 seems to be a pre-existing test failure.

In https://app.circleci.com/pipelines/github/segmentio/kafka-go/1243/workflows/144442e6-8c6b-4872-8dde-f6fb0e9cb1b0/jobs/13625, I can't reproduce the failure. In https://app.circleci.com/pipelines/github/segmentio/kafka-go/1243/workflows/144442e6-8c6b-4872-8dde-f6fb0e9cb1b0/jobs/13618, I likewise can't reproduce the failure.
Hi, @rhansen2! I'm sorry, you're quite right - the initial version of this PR was flawed. I tried to separate out the
I think these changes still represent a behavior change. Keeping the behavior the same as it was, while still reintroducing ordering, was the original motivation for the current queuing implementation. Could you describe the need for this behavior in the library, versus potentially implementing it at a different layer of an application?
Sure thing! The core use case that we encountered was a producer which writes messages significantly faster than the Kafka brokers are able to respond. We encountered this during performance testing while evaluating this library, but it's easy to imagine it happening in production - perhaps there's a burst of incoming data, or perhaps the Kafka brokers are under heavy load or otherwise have their response time degraded. Without the changes in this PR, the writer buffers those messages in memory without bound, and the producer quickly runs out of memory.

It would not be possible to fix this issue at a higher level of the application. The application does not know how fast the brokers are responding to messages - that information is only available to kafka-go.

There are three responses kafka-go could have when messages arrive faster than the brokers can accept them: (1) drop messages, (2) buffer them in memory without bound, or (3) block the producer (backpressure). ISTM that (3) is the only response which will not lead to crashes or data loss in this scenario. What do you think?
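To make the three options concrete, here is a minimal sketch (illustrative only, with made-up types; not kafka-go code). Only the blocking send in option 3 avoids both data loss and unbounded memory growth.

```go
package main

import "fmt"

type message struct{ payload string }

func main() {
	buffered := make(chan message, 8) // bounded buffer

	// Option 1: drop when the buffer is full (data loss).
	select {
	case buffered <- message{payload: "m1"}:
	default:
		fmt.Println("buffer full, message dropped")
	}

	// Option 2: queue without bound (memory grows until the process OOMs).
	var unbounded []message
	unbounded = append(unbounded, message{payload: "m2"})
	fmt.Println("unbounded queue length:", len(unbounded))

	// Option 3: block the producer until space frees up (backpressure).
	buffered <- message{payload: "m3"} // blocks while the buffer is full
}
```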
Thanks for the detailed response!
Could this be accomplished by tracking the number of unwritten messages? I think this could be done by tracking the difference between the messages passed to WriteMessages and those that have been passed to a Completion function on the writer. Given the change in behavior here, do you think it'd be possible to implement this in a backwards-compatible way?
Good question! Are you imagining something like this?

```go
import (
	"context"

	kafka "github.com/segmentio/kafka-go"
)

type sender struct {
	w           *kafka.Writer
	maxInFlight chan struct{}
}

func newSender() (s *sender) {
	const batchSize = 100
	const maxBatchesInFlight = 100
	return &sender{
		w: &kafka.Writer{
			// Release one in-flight slot per message once its batch completes.
			Completion: func(messages []kafka.Message, err error) {
				for i := 0; i < len(messages); i++ {
					<-s.maxInFlight
				}
			},
			/*...*/
		},
		maxInFlight: make(chan struct{}, batchSize*maxBatchesInFlight),
	}
}

func (s *sender) sendMessageWithBackpressure(ctx context.Context, msg kafka.Message) error {
	// Acquire a slot first; this blocks when too many messages are in flight.
	s.maxInFlight <- struct{}{}
	return s.w.WriteMessages(ctx, msg)
}
```

It seems possible to me that this might work, though it's substantially more complex than either the status quo or this PR - very possible that I'm overlooking some subtle issue. It also leaves the default behavior as the unsafe "unbounded memory usage" from the last sentence of the original PR description.
Sure, fair enough. I've taken a shot at this in d9c7ab9 (only lightly tested) - PTAL.
Thanks for your continued patience with this issue! The example you posted is what I was thinking. In the backwards-compatible commit, does the use of goroutines maintain the ordering of messages in all cases?
No worries :)
Yes.
This was added in #716 to 'preserve message order across batches'. However, as noted in #1058, a simple channel performs the same function. To allow us to add backpressure for producers, let's switch back to a channel and add a new `MaxBufferedBatches` configuration option. This prevents producers from seeing unbounded memory usage and quickly OOMing when they produce faster than brokers can keep up with.
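If the option lands as described, producer-side usage might look roughly like the sketch below. This is illustrative only: `MaxBufferedBatches` is the name proposed in this PR (not a field in released kafka-go), so it is shown commented out, and the surrounding configuration values are assumptions.

```go
package main

import (
	"context"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	w := &kafka.Writer{
		Addr:      kafka.TCP("localhost:9092"), // assumed broker address
		Topic:     "events",                    // assumed topic
		BatchSize: 100,
		// Proposed in this PR (not yet a real field): cap how many batches
		// may be buffered so WriteMessages blocks instead of letting memory
		// grow without bound when brokers fall behind.
		// MaxBufferedBatches: 10,
	}
	defer w.Close()

	// With the cap in place, this call would block (apply backpressure)
	// once the buffer is full, rather than OOMing the producer.
	_ = w.WriteMessages(context.Background(), kafka.Message{Value: []byte("hello")})
}
```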