-
Notifications
You must be signed in to change notification settings - Fork 691
Open
Description
Good afternoon!
Please help
We have a consumer client with manual commit
Below is the consumer configuration:
"bootstrap.servers": strings.Join(c.Brokers, ","),
"group.id": group.GroupID,
"auto.offset.reset": "earliest",
"queued.max.messages.kbytes": 10,
"enable.auto.commit": false,
We receive messages in batches and commit them in the function:
func (t *BaseKafka) fetchBatch(ctx context.Context) ([]kafka.Message, error) {
gCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
messages := make([]kafka.Message, 0, t.ConsumerConfig.BatchSize)
for len(messages) < t.ConsumerConfig.BatchSize {
select {
case <-gCtx.Done():
if errors.Is(gCtx.Err(), context.DeadlineExceeded) || errors.Is(gCtx.Err(), context.Canceled) {
return messages, nil
}
return nil, gCtx.Err()
default:
msg, err := t.Consumer.ReadMessage(100 * time.Millisecond)
if err != nil {
continue
}
if msg == nil {
return nil, fmt.Errorf("received message is nil")
}
if msg.TopicPartition.Error != nil {
return nil, msg.TopicPartition.Error
}
messages = append(messages, *msg)
}
}
return messages, nil
}
and an error periodically occurs:
Coordinator load in progress error commit
If this is related to rebalancing, tell me how to handle this error correctly?
Metadata
Metadata
Assignees
Labels
No labels