@@ -208,12 +208,17 @@ func (pc *defaultPullConsumer) nextPullOffset(mq *primitive.MessageQueue, origin
208
208
if pc .SubType != Assign {
209
209
return originOffset
210
210
}
211
- value , exist := pc .mq2seekOffset .LoadAndDelete (mq )
211
+ value , exist := pc .mq2seekOffset .LoadAndDelete (* mq )
212
212
if ! exist {
213
213
return originOffset
214
214
} else {
215
215
nextOffset := value .(int64 )
216
216
_ = pc .updateOffset (mq , nextOffset )
217
+ rlog .Info ("pull consumer assign new offset" , map [string ]interface {}{
218
+ "group" : pc .GroupName ,
219
+ "mq" : mq ,
220
+ "offset" : nextOffset ,
221
+ })
217
222
return nextOffset
218
223
}
219
224
}
@@ -711,7 +716,7 @@ func (pc *defaultPullConsumer) ResetOffset(topic string, table map[primitive.Mes
711
716
}
712
717
713
718
func (pc * defaultPullConsumer ) SeekOffset (mq * primitive.MessageQueue , offset int64 ) {
714
- pc .mq2seekOffset .Store (mq , offset )
719
+ pc .mq2seekOffset .Store (* mq , offset )
715
720
rlog .Info ("pull consumer seek offset" , map [string ]interface {}{
716
721
"mq" : mq ,
717
722
"offset" : offset ,
@@ -881,6 +886,8 @@ func (pc *defaultPullConsumer) pullMessage(request *PullRequest) {
881
886
pullRequest .SysFlag = clearCommitOffsetFlag (pullRequest .SysFlag )
882
887
}
883
888
889
+ rlog .Debug (fmt .Sprintf ("defaultPullConsumer pull message from broker: %s, request: %+v" , brokerResult .BrokerAddr , pullRequest ), nil )
890
+
884
891
result , err := pc .client .PullMessage (context .Background (), brokerResult .BrokerAddr , pullRequest )
885
892
if err != nil {
886
893
rlog .Warning ("defaultPullConsumer pull message from broker error" , map [string ]interface {}{
0 commit comments