diff --git a/consumergroup.go b/consumergroup.go index f4bb382c..90c0d623 100644 --- a/consumergroup.go +++ b/consumergroup.go @@ -925,12 +925,12 @@ func (cg *ConsumerGroup) coordinator() (coordinator, error) { // the leader. Otherwise, GroupMemberAssignments will be nil. // // Possible kafka error codes returned: -// * GroupLoadInProgress: -// * GroupCoordinatorNotAvailable: -// * NotCoordinatorForGroup: -// * InconsistentGroupProtocol: -// * InvalidSessionTimeout: -// * GroupAuthorizationFailed: +// - GroupLoadInProgress: +// - GroupCoordinatorNotAvailable: +// - NotCoordinatorForGroup: +// - InconsistentGroupProtocol: +// - InvalidSessionTimeout: +// - GroupAuthorizationFailed: func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, int32, GroupMemberAssignments, error) { request, err := cg.makeJoinGroupRequestV1(memberID) if err != nil { @@ -1073,11 +1073,11 @@ func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMember // Readers subscriptions topic => partitions // // Possible kafka error codes returned: -// * GroupCoordinatorNotAvailable: -// * NotCoordinatorForGroup: -// * IllegalGeneration: -// * RebalanceInProgress: -// * GroupAuthorizationFailed: +// - GroupCoordinatorNotAvailable: +// - NotCoordinatorForGroup: +// - IllegalGeneration: +// - RebalanceInProgress: +// - GroupAuthorizationFailed: func (cg *ConsumerGroup) syncGroup(conn coordinator, memberID string, generationID int32, memberAssignments GroupMemberAssignments) (map[string][]int32, error) { request := cg.makeSyncGroupRequestV0(memberID, generationID, memberAssignments) response, err := conn.syncGroup(request) diff --git a/example_groupbalancer_test.go b/example_groupbalancer_test.go index e81cb494..be24d1eb 100644 --- a/example_groupbalancer_test.go +++ b/example_groupbalancer_test.go @@ -31,9 +31,9 @@ func ExampleNewReader_rackAffinity() { } // findRack is the basic rack resolver strategy for use in AWS. It supports -// * ECS with the task metadata endpoint enabled (returns the container -// instance's availability zone) -// * Linux EC2 (returns the instance's availability zone) +// - ECS with the task metadata endpoint enabled (returns the container +// instance's availability zone) +// - Linux EC2 (returns the instance's availability zone) func findRack() string { switch whereAmI() { case "ecs": diff --git a/groupbalancer.go b/groupbalancer.go index 9491bc50..61741b73 100644 --- a/groupbalancer.go +++ b/groupbalancer.go @@ -41,14 +41,15 @@ type GroupBalancer interface { // RangeGroupBalancer groups consumers by partition // // Example: 5 partitions, 2 consumers -// C0: [0, 1, 2] -// C1: [3, 4] +// +// C0: [0, 1, 2] +// C1: [3, 4] // // Example: 6 partitions, 3 consumers -// C0: [0, 1] -// C1: [2, 3] -// C2: [4, 5] // +// C0: [0, 1] +// C1: [2, 3] +// C2: [4, 5] type RangeGroupBalancer struct{} func (r RangeGroupBalancer) ProtocolName() string { @@ -92,14 +93,15 @@ func (r RangeGroupBalancer) AssignGroups(members []GroupMember, topicPartitions // RoundrobinGroupBalancer divides partitions evenly among consumers // // Example: 5 partitions, 2 consumers -// C0: [0, 2, 4] -// C1: [1, 3] +// +// C0: [0, 2, 4] +// C1: [1, 3] // // Example: 6 partitions, 3 consumers -// C0: [0, 3] -// C1: [1, 4] -// C2: [2, 5] // +// C0: [0, 3] +// C1: [1, 4] +// C2: [2, 5] type RoundRobinGroupBalancer struct{} func (r RoundRobinGroupBalancer) ProtocolName() string { diff --git a/kafka_test.go b/kafka_test.go index 4c6af706..c9aa7c46 100644 --- a/kafka_test.go +++ b/kafka_test.go @@ -189,3 +189,15 @@ func (l *testKafkaLogger) Printf(msg string, args ...interface{}) { l.T.Logf(msg, args...) } } + +type testRebalanceEventCallback struct { + NoticeChan chan map[string][]PartitionAssignment +} + +func newTestRebalanceEventCallback(c chan map[string][]PartitionAssignment) RebalanceEventInterceptor { + return &testRebalanceEventCallback{NoticeChan: c} +} + +func (c *testRebalanceEventCallback) Callback(partitionAssignments map[string][]PartitionAssignment) { + c.NoticeChan <- partitionAssignments +} diff --git a/logger.go b/logger.go index d359ab78..4f77fcf0 100644 --- a/logger.go +++ b/logger.go @@ -7,11 +7,12 @@ type Logger interface { // LoggerFunc is a bridge between Logger and any third party logger // Usage: -// l := NewLogger() // some logger -// r := kafka.NewReader(kafka.ReaderConfig{ -// Logger: kafka.LoggerFunc(l.Infof), -// ErrorLogger: kafka.LoggerFunc(l.Errorf), -// }) +// +// l := NewLogger() // some logger +// r := kafka.NewReader(kafka.ReaderConfig{ +// Logger: kafka.LoggerFunc(l.Infof), +// ErrorLogger: kafka.LoggerFunc(l.Errorf), +// }) type LoggerFunc func(string, ...interface{}) func (f LoggerFunc) Printf(msg string, args ...interface{}) { f(msg, args...) } diff --git a/reader.go b/reader.go index 04d90f35..d2d4a1a0 100644 --- a/reader.go +++ b/reader.go @@ -331,6 +331,10 @@ func (r *Reader) run(cg *ConsumerGroup) { r.subscribe(gen.Assignments) + r.withRebalanceEventInterceptor(func(l RebalanceEventInterceptor) { + l.Callback(gen.Assignments) + }) + gen.Start(func(ctx context.Context) { r.commitLoop(ctx, gen) }) @@ -522,6 +526,9 @@ type ReaderConfig struct { // This flag is being added to retain backwards-compatibility, so it will be // removed in a future version of kafka-go. OffsetOutOfRangeError bool + + // If not nil, specifies a callback usd to report rebalance events + RebalanceEventInterceptor RebalanceEventInterceptor } // Validate method validates ReaderConfig properties. @@ -1142,6 +1149,12 @@ func (r *Reader) withErrorLogger(do func(Logger)) { } } +func (r *Reader) withRebalanceEventInterceptor(do func(RebalanceEventInterceptor)) { + if r.config.RebalanceEventInterceptor != nil { + do(r.config.RebalanceEventInterceptor) + } +} + func (r *Reader) activateReadLag() { if r.config.ReadLagInterval > 0 && atomic.CompareAndSwapUint32(&r.once, 0, 1) { // read lag will only be calculated when not using consumer groups diff --git a/reader_test.go b/reader_test.go index f413d742..36e6ff5e 100644 --- a/reader_test.go +++ b/reader_test.go @@ -395,6 +395,52 @@ func deleteTopic(t *testing.T, topic ...string) { } } +func TestReaderCollectsRebalanceEvents(t *testing.T) { + const GroupId = "a" + const Partitions = 5 + topic := makeTopic() + createTopic(t, topic, Partitions) + defer deleteTopic(t, topic) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + c := make(chan map[string][]PartitionAssignment) + eventReceived := false + defer func() { + if !eventReceived { + t.Error("no rebalance event received") + } + }() + go func() { + firstAssignment := <-c + if len(firstAssignment) != 1 { + t.Error("multiple topics assigned") + } + info, ok := firstAssignment[topic] + if !ok { + t.Error("wrong topic assigned") + } + if len(info) != Partitions { + t.Error("wrong number of partitions assigned") + } + eventReceived = true + }() + + r := NewReader(ReaderConfig{ + Brokers: []string{"localhost:9092"}, + Topic: topic, + GroupID: GroupId, + MinBytes: 1, + MaxBytes: 1000, + MaxWait: 100 * time.Millisecond, + RebalanceEventInterceptor: newTestRebalanceEventCallback(c), + }) + defer r.Close() + + prepareReader(t, ctx, r, makeTestSequence(1)...) + _, _ = r.ReadMessage(ctx) +} + func TestReaderOnNonZeroPartition(t *testing.T) { tests := []struct { scenario string diff --git a/rebalance_callback.go b/rebalance_callback.go new file mode 100644 index 00000000..2afa9cf6 --- /dev/null +++ b/rebalance_callback.go @@ -0,0 +1,12 @@ +package kafka + +// RebalanceEventInterceptor defines the rebalance event callback API +type RebalanceEventInterceptor interface { + Callback(map[string][]PartitionAssignment) +} + +type RebalanceFunc func(map[string][]PartitionAssignment) + +func (f RebalanceFunc) Callback(partitionAssignments map[string][]PartitionAssignment) { + f(partitionAssignments) +}