Add Conn.WriteMulti function to produce to multiple topics/partitions #1094

Open: wants to merge 4 commits into main
242 changes: 216 additions & 26 deletions conn.go
@@ -9,6 +9,8 @@ import (
"net"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
@@ -1123,6 +1125,199 @@ func (c *Conn) WriteCompressedMessagesAt(codec CompressionCodec, msgs ...Message
return c.writeCompressedMessages(codec, msgs...)
}

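// MultiProduceRequest describes the messages to produce to one or more
// partitions of a single topic.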
type MultiProduceRequest struct {
Topic string
Partitions []MultiProducePartition
}

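// MultiProducePartition carries the batch of messages destined for a single
// partition of the enclosing request's topic.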
type MultiProducePartition struct {
Partition int32
Messages []Message
}

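// MultiProduceError is returned by WriteMulti when the broker reports an
// error for one or more of the produced topic+partitions.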
type MultiProduceError []TopicPartitionError

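// Error formats the per-partition errors as a single comma-separated string
// of the form "topic:partition message".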
func (e MultiProduceError) Error() string {
b := strings.Builder{}
for i, err := range e {
if i > 0 {
b.WriteString(", ")
}
b.WriteString(err.Topic)
b.WriteByte(':')
b.WriteString(strconv.Itoa(int(err.Partition)))
b.WriteByte(' ')
b.WriteString(err.Error.Error())
}
return b.String()
}

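// TopicPartitionError records the broker error returned for a single
// topic+partition.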
type TopicPartitionError struct {
Topic string
Partition int32
Error Error
}

// WriteMulti writes a batch of messages to potentially multiple topics and
// partitions. It is intended for the advanced use case where the caller is
// batching by broker rather than by topic+partition. Doing so is more complex
// but can result in far fewer calls over the network, especially for topics
// where the partition counts are much greater than the broker count.
//
// The caller is responsible for making sure that this Conn is the leader for
// all of the requested topic+partition combinations.
//
// Failures come in two forms. One is complete failure such as a transport
// error (e.g. a network timeout). The second is a partial failure where the
// broker responds with a kafka.Error for one or more partitions. In the latter
// case, this function returns a `MultiProduceError`. The caller must take
// care to inspect the returned error, determine whether the failure was
// partial or complete, and respond appropriately (e.g. by retrying the
// publish on the failed topic+partitions).
func (c *Conn) WriteMulti(codec CompressionCodec, reqs ...MultiProduceRequest) error {
produceVersion, err := c.negotiateVersion(produce, v2, v3, v7)
if err != nil {
return err
}

writeTime := time.Now()
return c.writeOperation(
func(deadline time.Time, id int32) error {
now := time.Now()
deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
switch produceVersion {
case v3, v7:
input := make([]produceInput, len(reqs))
for i, req := range reqs {
input[i] = produceInput{
topic: req.Topic,
partitions: make([]partitionBatch, len(reqs[i].Partitions)),
}

for j, part := range req.Partitions {
for k, msg := range part.Messages {
if msg.Time.IsZero() {
part.Messages[k].Time = writeTime
}
}

rb, err := newRecordBatch(codec, reqs[i].Partitions[j].Messages...)
if err != nil {
return err
}

input[i].partitions[j] = partitionBatch{
partition: part.Partition,
recordBatch: rb,
}
}
}

return c.wb.writeProduceRequest(
produceVersion,
id,
c.clientID,
deadlineToTimeout(deadline, now),
int16(atomic.LoadInt32(&c.requiredAcks)),
c.transactionalID,
input...,
)
default:
input := make([]msgSetProduceInput, len(reqs))
for i, req := range reqs {
input[i] = msgSetProduceInput{
topic: req.Topic,
partitions: make([]msgSetPartitionBatch, len(req.Partitions)),
}

for j, part := range req.Partitions {
for k, msg := range part.Messages {
if msg.Time.IsZero() {
part.Messages[k].Time = writeTime
}
}
input[i].partitions[j] = msgSetPartitionBatch{
partition: part.Partition,
messages: part.Messages,
}
}
}

return c.wb.writeProduceRequestV2(
codec,
id,
c.clientID,
deadlineToTimeout(deadline, now),
int16(atomic.LoadInt32(&c.requiredAcks)),
input...,
)
}
},
func(deadline time.Time, size int) error {
var mpErr MultiProduceError
remain, err := readArrayWith(&c.rbuf, size, func(r *bufio.Reader, size int) (remain int, err error) {
var topic string
if remain, err = readString(r, size, &topic); err != nil {
return remain, err
}

return readArrayWith(r, remain, func(r *bufio.Reader, size int) (remain int, err error) {
var partition int32
if remain, err = readInt32(&c.rbuf, size, &partition); err != nil {
return remain, err
}

// error code is the only thing we care about...if non-zero,
// then we have a partial failure scenario.
var errCode int16
if remain, err = readInt16(&c.rbuf, remain, &errCode); err != nil {
return remain, err
}
if errCode > 0 {
mpErr = append(mpErr, TopicPartitionError{
Topic: topic,
Partition: partition,
Error: Error(errCode),
})
}

// discard the offset
if remain, err = discardN(&c.rbuf, remain, 8); err != nil {
return remain, err
}

// discard the timestamp
if remain, err = discardN(&c.rbuf, remain, 8); err != nil {
return remain, err
}

// discard start offset.
if produceVersion >= v7 {
if remain, err = discardN(&c.rbuf, remain, 8); err != nil {
return remain, err
}
}

return remain, err
})
})
if err != nil {
return err
}

// discard throttle time
if err := expectZeroSize(discardInt32(&c.rbuf, remain)); err != nil {
return err
}

if len(mpErr) > 0 {
return mpErr
}

return nil
})
}

func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message) (nbytes int, partition int32, offset int64, appendTime time.Time, err error) {
if len(msgs) == 0 {
return
@@ -1158,7 +1353,7 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message)
now := time.Now()
deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
switch produceVersion {
case v7:
case v3, v7:
recordBatch, err :=
newRecordBatch(
codec,
@@ -1167,45 +1362,40 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message)
if err != nil {
return err
}
return c.wb.writeProduceRequestV7(
id,
c.clientID,
c.topic,
c.partition,
deadlineToTimeout(deadline, now),
int16(atomic.LoadInt32(&c.requiredAcks)),
c.transactionalID,
recordBatch,
)
case v3:
recordBatch, err :=
newRecordBatch(
codec,
msgs...,
)
if err != nil {
return err
// force stack allocation
var msgArr [1]partitionBatch
msgArr[0] = partitionBatch{
partition: c.partition,
recordBatch: recordBatch,
}
return c.wb.writeProduceRequestV3(
return c.wb.writeProduceRequest(
produceVersion,
id,
c.clientID,
c.topic,
c.partition,
deadlineToTimeout(deadline, now),
int16(atomic.LoadInt32(&c.requiredAcks)),
c.transactionalID,
recordBatch,
produceInput{
topic: c.topic,
partitions: msgArr[:],
},
)
default:
// force stack allocation
msgArr := [1]msgSetPartitionBatch{{
partition: c.partition,
messages: msgs,
}}
return c.wb.writeProduceRequestV2(
codec,
id,
c.clientID,
c.topic,
c.partition,
deadlineToTimeout(deadline, now),
int16(atomic.LoadInt32(&c.requiredAcks)),
msgs...,
msgSetProduceInput{
topic: c.topic,
partitions: msgArr[:],
},
)
}
},