
Commit 84f40ed

Author: Steve van Loben Sels

Add Conn.WriteMulti function to produce to multiple topics/partitions

1 parent f6986fb · commit 84f40ed

File tree: 4 files changed (+501, -117 lines)

conn.go (+216, -26)
@@ -10,6 +10,8 @@ import (
 	"os"
 	"path/filepath"
 	"runtime"
+	"strconv"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -1124,6 +1126,199 @@ func (c *Conn) WriteCompressedMessagesAt(codec CompressionCodec, msgs ...Message
 	return c.writeCompressedMessages(codec, msgs...)
 }
 
+type MultiProduceRequest struct {
+	Topic      string
+	Partitions []MultiProducePartition
+}
+
+type MultiProducePartition struct {
+	Partition int32
+	Messages  []Message
+}
+
+type MultiProduceError []TopicPartitionError
+
+func (e MultiProduceError) Error() string {
+	b := strings.Builder{}
+	for i, err := range e {
+		if i > 0 {
+			b.WriteString(", ")
+		}
+		b.WriteString(err.Topic)
+		b.WriteByte(':')
+		b.WriteString(strconv.Itoa(int(err.Partition)))
+		b.WriteByte(' ')
+		b.WriteString(err.Error.Error())
+	}
+	return b.String()
+}
+
+type TopicPartitionError struct {
+	Topic     string
+	Partition int32
+	Error     Error
+}
+
+// WriteMulti writes a batch of messages to potentially multiple topics and
+// partitions. This is a function for the advanced use case where the caller is
+// batching based on the broker and not just the topic+partition. Doing so is
+// more complex but can result in far fewer calls over the network, especially
+// for topics where the partition counts are much greater than the broker count.
+//
+// The caller is responsible for making sure that this Conn is the leader for
+// all the requested topic+partition combinations.
+//
+// Failures come in two forms. One is a complete failure such as a transport
+// error (e.g. a network timeout). The second is a partial failure where the
+// broker responds with a kafka.Error for one or more partitions. In the latter
+// case, this function will return a `MultiProduceError`. The caller must take
+// care to inspect the returned error and determine whether it was a partial
+// or complete failure and respond appropriately (e.g. by retrying to publish
+// on the failed topic+partitions).
+func (c *Conn) WriteMulti(codec CompressionCodec, reqs ...MultiProduceRequest) error {
+	produceVersion, err := c.negotiateVersion(produce, v2, v3, v7)
+	if err != nil {
+		return err
+	}
+
+	writeTime := time.Now()
+	return c.writeOperation(
+		func(deadline time.Time, id int32) error {
+			now := time.Now()
+			deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
+			switch produceVersion {
+			case v3, v7:
+				input := make([]produceInput, len(reqs))
+				for i, req := range reqs {
+					input[i] = produceInput{
+						topic:      req.Topic,
+						partitions: make([]partitionBatch, len(reqs[i].Partitions)),
+					}
+
+					for j, part := range req.Partitions {
+						for k, msg := range part.Messages {
+							if msg.Time.IsZero() {
+								part.Messages[k].Time = writeTime
+							}
+						}
+
+						rb, err := newRecordBatch(codec, reqs[i].Partitions[j].Messages...)
+						if err != nil {
+							return err
+						}
+
+						input[i].partitions[j] = partitionBatch{
+							partition:   part.Partition,
+							recordBatch: rb,
+						}
+					}
+				}
+
+				return c.wb.writeProduceRequest(
+					produceVersion,
+					id,
+					c.clientID,
+					deadlineToTimeout(deadline, now),
+					int16(atomic.LoadInt32(&c.requiredAcks)),
+					c.transactionalID,
+					input...,
+				)
+			default:
+				input := make([]msgSetProduceInput, len(reqs))
+				for i, req := range reqs {
+					input[i] = msgSetProduceInput{
+						topic:      req.Topic,
+						partitions: make([]msgSetPartitionBatch, len(req.Partitions)),
+					}
+
+					for j, part := range req.Partitions {
+						for k, msg := range part.Messages {
+							if msg.Time.IsZero() {
+								part.Messages[k].Time = writeTime
+							}
+						}
+						input[i].partitions[j] = msgSetPartitionBatch{
+							partition: part.Partition,
+							messages:  part.Messages,
+						}
+					}
+				}
+
+				return c.wb.writeProduceRequestV2(
+					codec,
+					id,
+					c.clientID,
+					deadlineToTimeout(deadline, now),
+					int16(atomic.LoadInt32(&c.requiredAcks)),
+					input...,
+				)
+			}
+		},
+		func(deadline time.Time, size int) error {
+			var mpErr MultiProduceError
+			remain, err := readArrayWith(&c.rbuf, size, func(r *bufio.Reader, size int) (remain int, err error) {
+				var topic string
+				if remain, err = readString(r, size, &topic); err != nil {
+					return remain, err
+				}
+
+				return readArrayWith(r, remain, func(r *bufio.Reader, size int) (remain int, err error) {
+					var partition int32
+					if remain, err = readInt32(&c.rbuf, size, &partition); err != nil {
+						return remain, err
+					}
+
+					// error code is the only thing we care about...if non-zero,
+					// then we have a partial failure scenario.
+					var errCode int16
+					if remain, err = readInt16(&c.rbuf, remain, &errCode); err != nil {
+						return remain, err
+					}
+					if errCode > 0 {
+						mpErr = append(mpErr, TopicPartitionError{
+							Topic:     topic,
+							Partition: partition,
+							Error:     Error(errCode),
+						})
+					}
+
+					// discard the offset
+					if remain, err = discardN(&c.rbuf, remain, 8); err != nil {
+						return remain, err
+					}
+
+					// discard the timestamp
+					if remain, err = discardN(&c.rbuf, remain, 8); err != nil {
+						return remain, err
+					}
+
+					// discard start offset.
+					if produceVersion >= v7 {
+						if remain, err = discardN(&c.rbuf, remain, 8); err != nil {
+							return remain, err
+						}
+					}
+
+					return remain, err
+				})
+			})
+			if err != nil {
+				return err
+			}
+
+			// discard throttle time
+			if err := expectZeroSize(discardInt32(&c.rbuf, remain)); err != nil {
+				return err
+			}
+
+			if len(mpErr) > 0 {
+				return mpErr
+			}
+
+			return nil
+		})
+}
+
 func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message) (nbytes int, partition int32, offset int64, appendTime time.Time, err error) {
 	if len(msgs) == 0 {
 		return
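
For illustration, a minimal usage sketch of WriteMulti that is not part of this commit. It assumes the github.com/segmentio/kafka-go import path, a placeholder broker address and topic, that the dialed broker leads every topic+partition in the request, and that passing a nil CompressionCodec means no compression, as in the other produce paths. The type switch mirrors the partial-versus-complete failure distinction described in the doc comment above.

package main

import (
	"fmt"
	"log"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	// This connection must be to the broker that leads every topic+partition
	// written below; the broker address and topic name are placeholders.
	conn, err := kafka.Dial("tcp", "localhost:9092")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	err = conn.WriteMulti(nil, // nil codec assumed to mean "no compression"
		kafka.MultiProduceRequest{
			Topic: "events",
			Partitions: []kafka.MultiProducePartition{
				{Partition: 0, Messages: []kafka.Message{{Value: []byte("a")}}},
				{Partition: 1, Messages: []kafka.Message{{Value: []byte("b")}}},
			},
		},
	)

	switch e := err.(type) {
	case nil:
		// every topic+partition was written successfully
	case kafka.MultiProduceError:
		// partial failure: only the listed topic+partitions need to be retried
		for _, tp := range e {
			fmt.Printf("produce failed for %s:%d: %v\n", tp.Topic, tp.Partition, tp.Error)
		}
	default:
		// complete failure, e.g. a transport error
		log.Fatal(err)
	}
}

A retry after a MultiProduceError would rebuild requests containing only the failed topic+partitions, possibly after refreshing leadership metadata.
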
@@ -1159,7 +1354,7 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message)
 			now := time.Now()
 			deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
 			switch produceVersion {
-			case v7:
+			case v3, v7:
 				recordBatch, err :=
 					newRecordBatch(
 						codec,
@@ -1168,45 +1363,40 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message)
 				if err != nil {
 					return err
 				}
-				return c.wb.writeProduceRequestV7(
-					id,
-					c.clientID,
-					c.topic,
-					c.partition,
-					deadlineToTimeout(deadline, now),
-					int16(atomic.LoadInt32(&c.requiredAcks)),
-					c.transactionalID,
-					recordBatch,
-				)
-			case v3:
-				recordBatch, err :=
-					newRecordBatch(
-						codec,
-						msgs...,
-					)
-				if err != nil {
-					return err
+				// force stack allocation
+				var msgArr [1]partitionBatch
+				msgArr[0] = partitionBatch{
+					partition:   c.partition,
+					recordBatch: recordBatch,
 				}
-				return c.wb.writeProduceRequestV3(
+				return c.wb.writeProduceRequest(
+					produceVersion,
 					id,
 					c.clientID,
-					c.topic,
-					c.partition,
 					deadlineToTimeout(deadline, now),
 					int16(atomic.LoadInt32(&c.requiredAcks)),
 					c.transactionalID,
-					recordBatch,
+					produceInput{
+						topic:      c.topic,
+						partitions: msgArr[:],
+					},
 				)
 			default:
+				// force stack allocation
+				msgArr := [1]msgSetPartitionBatch{{
+					partition: c.partition,
+					messages:  msgs,
+				}}
 				return c.wb.writeProduceRequestV2(
 					codec,
 					id,
 					c.clientID,
-					c.topic,
-					c.partition,
 					deadlineToTimeout(deadline, now),
 					int16(atomic.LoadInt32(&c.requiredAcks)),
-					msgs...,
+					msgSetProduceInput{
+						topic:      c.topic,
+						partitions: msgArr[:],
+					},
 				)
 			}
 		},
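
The doc comment's claim that batching per broker saves network round trips can be sketched as follows; the helper below is hypothetical and not part of this commit. It looks up partition leaders with Conn.ReadPartitions, buckets pending messages by leader address, and issues one WriteMulti call per broker. Error handling is kept minimal, and partial failures would be inspected as in the earlier sketch.

package example

import (
	"fmt"
	"net"

	kafka "github.com/segmentio/kafka-go"
)

// pending is a hypothetical unit of work: messages destined for one topic+partition.
type pending struct {
	topic     string
	partition int32
	messages  []kafka.Message
}

// produceByBroker buckets the batch by partition leader so that each broker
// receives a single produce request covering every partition it leads.
func produceByBroker(metaConn *kafka.Conn, topics []string, batch []pending) error {
	// Metadata: which broker leads each topic+partition.
	parts, err := metaConn.ReadPartitions(topics...)
	if err != nil {
		return err
	}
	leader := map[string]string{}
	for _, p := range parts {
		leader[fmt.Sprintf("%s:%d", p.Topic, p.ID)] = net.JoinHostPort(p.Leader.Host, fmt.Sprint(p.Leader.Port))
	}

	// Group pending messages first by leader address, then by topic.
	byBroker := map[string]map[string][]kafka.MultiProducePartition{}
	for _, b := range batch {
		addr, ok := leader[fmt.Sprintf("%s:%d", b.topic, b.partition)]
		if !ok {
			return fmt.Errorf("no leader known for %s:%d", b.topic, b.partition)
		}
		if byBroker[addr] == nil {
			byBroker[addr] = map[string][]kafka.MultiProducePartition{}
		}
		byBroker[addr][b.topic] = append(byBroker[addr][b.topic], kafka.MultiProducePartition{
			Partition: b.partition,
			Messages:  b.messages,
		})
	}

	// One WriteMulti call per broker instead of one produce call per partition.
	for addr, byTopic := range byBroker {
		conn, err := kafka.Dial("tcp", addr)
		if err != nil {
			return err
		}
		reqs := make([]kafka.MultiProduceRequest, 0, len(byTopic))
		for topic, partitions := range byTopic {
			reqs = append(reqs, kafka.MultiProduceRequest{Topic: topic, Partitions: partitions})
		}
		err = conn.WriteMulti(nil, reqs...)
		conn.Close()
		if err != nil {
			return err
		}
	}
	return nil
}

Compared with one produce request per partition, a topic with, say, 64 partitions spread across 4 brokers needs only 4 requests per flush under this scheme.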
