Description
When sending data to Kafka using an async, idempotent producer, we often see the delivery report channel receive errors showing only:
"Local: Fatal error"
Our delivery report logging looks like:
```go
func (p *KafkaProducer) Run(doneChan chan bool) {
	defer close(doneChan)
	// Delivery report handler for produced messages
	go func() {
		for e := range p.Producer.Events() {
			switch ev := e.(type) {
			case *kafka.Message:
				msg := ev
				if msg.TopicPartition.Error != nil {
					log.WithError(msg.TopicPartition.Error).Error("Data delivery to kafka failed")
				} /*else {
					log.Infof("Data delivered successfully to (%s) [%d]. Offset %v\n",
						*msg.TopicPartition.Topic, msg.TopicPartition.Partition, msg.TopicPartition.Offset)
				}*/
			default:
				log.WithFields(log.Fields{
					"event": ev,
				}).Info("Ignore event")
			}
		}
	}()
}
```
I did a little digging and this is just a generic error code from the underlying librdkafka: https://docs.confluent.io/5.5.1/clients/librdkafka/rdkafka_8h.html#a44c976534da6f3877cc514826c71607c.
But the documentation is not clear on what it actually means:

| Error code | Description |
|---|---|
| RD_KAFKA_RESP_ERR__FATAL | Fatal error: see rd_kafka_fatal_error() |
Has anyone seen this specific error before and perhaps has any helpful information? In the meantime, I've added the debug attribute to the ConfigMap to see if we can spot the issue, but it hasn't happened again yet.
I do not see anything in the broker logs at INFO level, but I may be looking for the wrong thing.
See comment: #830 (comment) on why this is happening to us
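For reference, the debug attribute mentioned above goes in the same ConfigMap; a sketch of what such a setting could look like (the context list here is illustrative — librdkafka accepts comma-separated contexts such as broker, protocol, msg, and eos, the last of which traces idempotent-producer state):

```go
kafka.ConfigMap{
	// ...existing producer settings...
	// Illustrative debug contexts; "eos" covers idempotent/transactional
	// producer state changes like the DrainBump -> FatalError transition.
	"debug": "broker,protocol,eos",
}
```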
How to reproduce
For us, this seems to happen over time. We do put a large volume of messages through, though I'm sure others do more. It doesn't happen at first but gradually ends up in that state at times. Unsure at the moment how to exactly trigger this error.
See comment: #830 (comment) on how to reproduce.
Checklist
Please provide the following information:
- confluent-kafka-go and librdkafka version (LibraryVersion()): v1.9.1
- Apache Kafka broker version: 2.8.1
- Client configuration:

```go
kafka.ConfigMap{
	"bootstrap.servers":  brokers,
	"compression.type":   "gzip",
	"linger.ms":          500,
	"retries":            "999",
	"enable.idempotence": "true",
	"acks":               "all",
}
```
- Operating system: Docker Alpine 1.18
- Provide client logs (with "debug": ".." as necessary):
```
%0|1659595801.132|FATAL|rdkafka#producer-1| [thrd:KAFKA_BROKER_2:9092/bootst]: Fatal error: Broker: Broker received an out of order sequence number: ProduceRequest for REDACT [0] with 9 message(s) failed due to sequence desynchronization with broker 2 (PID{Id:155002,Epoch:0}, base seq 110234700, idemp state change 52807639ms ago, last partition error NOT_LEADER_FOR_PARTITION (actions Refresh,MsgNotPersisted, base seq 110234700..110234708, base msgid 110234701, 127ms ago)
%7|1659595801.133|IDEMPSTATE|rdkafka#producer-1| [thrd:KAFKA_BROKER_2:9092/bootst]: Idempotent producer state change DrainBump -> FatalError
```
- Provide broker log excerpts: nothing showing at INFO level
- Critical issue