Skip to content

Coordinator load in progress error commit #1405

@oleg0406

Description

@oleg0406

Good afternoon!
Please help

We have a consumer client with manual commit

Below is the consumer configuration:

"bootstrap.servers": strings.Join(c.Brokers, ","),
"group.id": group.GroupID,
"auto.offset.reset": "earliest",
"queued.max.messages.kbytes": 10,
"enable.auto.commit": false,

We receive messages in batches using the function below and then commit them manually:

// fetchBatch collects up to t.ConsumerConfig.BatchSize messages from
// t.Consumer, spending at most 5 seconds in total.
//
// It returns the messages gathered so far (possibly none) with a nil error
// when the batch is full, when the 5-second window elapses, or when ctx is
// cancelled. A poll timeout from ReadMessage only means that no message
// arrived within the 100ms poll interval, so it is retried. Any other error
// aborts the batch and is returned wrapped, so callers can inspect it with
// errors.As / errors.Is and decide whether to retry.
func (t *BaseKafka) fetchBatch(ctx context.Context) ([]kafka.Message, error) {
	gCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	messages := make([]kafka.Message, 0, t.ConsumerConfig.BatchSize)

	for len(messages) < t.ConsumerConfig.BatchSize {
		// Non-blocking check of the batch window and of caller cancellation.
		if ctxErr := gCtx.Err(); ctxErr != nil {
			if errors.Is(ctxErr, context.DeadlineExceeded) || errors.Is(ctxErr, context.Canceled) {
				// Running out of time is not a failure: hand back the partial batch.
				return messages, nil
			}
			return nil, ctxErr
		}

		msg, err := t.Consumer.ReadMessage(100 * time.Millisecond)
		if err != nil {
			// Only a poll timeout is safe to retry silently. Previously every
			// error was skipped here, which hid broker and partition errors
			// until the batch window expired.
			var kerr kafka.Error
			if errors.As(err, &kerr) && kerr.Code() == kafka.ErrTimedOut {
				continue
			}
			return nil, fmt.Errorf("reading message: %w", err)
		}

		if msg == nil {
			return nil, fmt.Errorf("received message is nil")
		}

		// NOTE(review): ReadMessage appears to report per-message partition
		// errors through err already; this check is kept as a defensive guard.
		if msg.TopicPartition.Error != nil {
			return nil, msg.TopicPartition.Error
		}

		messages = append(messages, *msg)
	}

	return messages, nil
}

Periodically, the commit fails with the following error:

Coordinator load in progress error commit

If this is related to rebalancing, how should this error be handled correctly?

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions