Motivation
This proposal aims to improve the validation of Avro schemas by checking them both at application startup and at runtime, for better fault tolerance and observability.
Goals
Validate Avro schemas during application startup to prevent starting with incompatible schemas.
Periodically validate schemas at runtime to detect and handle schema changes.
Send records with deserialization errors to a DLQ for further analysis.
Provide configuration options to enable/disable startup and runtime validation independently.
Current Implementation
In the current implementation, schema deserialization errors are not explicitly handled. Here's what happens:
Default Behavior:
If a deserialization error occurs (e.g., due to an incompatible schema), Kafka Streams invokes the configured DeserializationExceptionHandler.
By default, Kafka Streams uses the LogAndFailExceptionHandler, which logs the error and stops the application.
Uncaught Exceptions:
If the deserialization error leads to an uncaught exception, the StreamsUncaughtExceptionHandler is triggered.
The default behavior is to shut down the Kafka Streams client (SHUTDOWN_CLIENT) when an uncaught exception occurs, as defined in the onStreamsUncaughtException method.
DLQ Handling:
There is no explicit logic to send records with deserialization errors to a Dead Letter Queue (DLQ). Only records that fail during processing (e.g., in a processor) can be sent to the DLQ, and only where that logic has been implemented.
In short, deserialization errors are logged, and the application may stop depending on the configured exception handlers, but the records are not sent to a DLQ by default.
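For reference, the deserialization handler described above is selected through a single Streams property. The sketch below only manipulates a `java.util.Properties` object, so it runs without Kafka on the classpath. The key string is the value of `StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG` in the Kafka Streams versions we know of (check it against the version in use, newer releases may rename it), and `effectiveHandler` is a hypothetical helper that only models how the default resolves.

```java
import java.util.Properties;

class CurrentHandlerConfig {

    // Value of StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG;
    // verify the key name against the Kafka Streams version in use.
    static final String DESERIALIZATION_HANDLER_KEY = "default.deserialization.exception.handler";

    static final String LOG_AND_FAIL =
            "org.apache.kafka.streams.errors.LogAndFailExceptionHandler";
    static final String LOG_AND_CONTINUE =
            "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler";

    // Hypothetical helper: models the default, LogAndFail when nothing is configured.
    static String effectiveHandler(Properties props) {
        return props.getProperty(DESERIALIZATION_HANDLER_KEY, LOG_AND_FAIL);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // Nothing configured: a bad record is logged, then the stream thread fails.
        System.out.println(effectiveHandler(props));

        // LogAndContinue keeps the application running, but the bad record is
        // only logged and skipped; it is not forwarded anywhere.
        props.setProperty(DESERIALIZATION_HANDLER_KEY, LOG_AND_CONTINUE);
        System.out.println(effectiveHandler(props));
    }
}
```

Either built-in choice ends in "log and stop" or "log and drop", which is the gap the proposed DLQ handling targets.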
Proposed Changes
1. Startup Schema Validation
Add a method that validates the schemas during the init phase of KafkaStreamsInitializer.
If validation fails, the application logs the error and terminates, as its health probes will fail.
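A minimal sketch of the startup check, assuming the application knows, per subject, the Avro schema it was built against. The registry lookup and the compatibility test are injected as functions so the sketch stays self-contained; `StartupSchemaValidator`, `findIncompatibilities` and `validateOrThrow` are hypothetical names, not existing kstreamplify APIs. In a real implementation the lookup could be backed by Confluent's `SchemaRegistryClient.getLatestSchemaMetadata(subject).getSchema()`, and the predicate by Avro's `SchemaCompatibility.checkReaderWriterCompatibility(reader, writer)` (after parsing both strings with `new Schema.Parser().parse(...)`), comparing the result type against `SchemaCompatibilityType.COMPATIBLE`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiPredicate;
import java.util.function.Function;

class StartupSchemaValidator {

    // subject -> latest schema registered for it (hypothetical lookup, injected)
    private final Function<String, Optional<String>> latestSchemaBySubject;
    // (readerSchema, writerSchema) -> can the reader decode data written with the writer?
    private final BiPredicate<String, String> isCompatible;

    StartupSchemaValidator(Function<String, Optional<String>> latestSchemaBySubject,
                           BiPredicate<String, String> isCompatible) {
        this.latestSchemaBySubject = latestSchemaBySubject;
        this.isCompatible = isCompatible;
    }

    // Returns one message per subject the application cannot safely consume.
    List<String> findIncompatibilities(Map<String, String> readerSchemaBySubject) {
        List<String> problems = new ArrayList<>();
        readerSchemaBySubject.forEach((subject, readerSchema) -> {
            Optional<String> writerSchema = latestSchemaBySubject.apply(subject);
            if (writerSchema.isEmpty()) {
                problems.add(subject + ": no schema registered");
            } else if (!isCompatible.test(readerSchema, writerSchema.get())) {
                problems.add(subject + ": registered schema is incompatible");
            }
        });
        return problems;
    }

    // Meant to run in the init phase, before the streams start: the exception
    // aborts start-up, so the probes never report the instance as healthy.
    void validateOrThrow(Map<String, String> readerSchemaBySubject) {
        List<String> problems = findIncompatibilities(readerSchemaBySubject);
        if (!problems.isEmpty()) {
            throw new IllegalStateException("Schema validation failed: " + problems);
        }
    }
}
```

Treating a subject with no registered schema as a failure is an assumption; a more lenient mode could log it and continue.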
2. Runtime Schema Validation
Introduce a scheduled task that periodically validates the schemas against the schema registry.
If an incompatible schema is detected, the task logs the issue and optionally shuts down the application or takes corrective action (open question: which corrective actions?).
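The periodic check could be driven by a standard `ScheduledExecutorService`. This sketch assumes a check function that returns the list of problems found (for instance, the same comparison used at startup) and a callback deciding what to do on failure, which is the open question above; logging or closing the `KafkaStreams` instance are two candidates. `RuntimeSchemaValidationTask`, `start` and `runOnce` are hypothetical names.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

class RuntimeSchemaValidationTask implements AutoCloseable {

    // Single daemon thread so the task never keeps the JVM alive on its own.
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(runnable -> {
                Thread thread = new Thread(runnable, "schema-validation");
                thread.setDaemon(true);
                return thread;
            });

    private final Supplier<List<String>> check;          // problems found; empty when all good
    private final Consumer<List<String>> onIncompatible; // e.g. log, or close KafkaStreams

    RuntimeSchemaValidationTask(Supplier<List<String>> check,
                                Consumer<List<String>> onIncompatible) {
        this.check = check;
        this.onIncompatible = onIncompatible;
    }

    // First run after one interval: startup validation is assumed to cover t = 0.
    ScheduledFuture<?> start(Duration interval) {
        long millis = interval.toMillis();
        return scheduler.scheduleAtFixedRate(this::runOnce, millis, millis, TimeUnit.MILLISECONDS);
    }

    void runOnce() {
        try {
            List<String> problems = check.get();
            if (!problems.isEmpty()) {
                onIncompatible.accept(problems);
            }
        } catch (RuntimeException e) {
            // An exception escaping a periodic task cancels all later runs,
            // so a registry outage is reported here instead of propagating.
            System.err.println("Schema validation run failed: " + e.getMessage());
        }
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

Catching exceptions inside `runOnce` is deliberate: per the `scheduleAtFixedRate` contract, a run that throws suppresses all subsequent runs, so a single unreachable-registry error would otherwise silently switch the validation off.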
3. Enhanced DLQ Handling
Extend the DLQ mechanism to handle records that fail deserialization at the topology level.
Implement a custom DeserializationExceptionHandler to forward such records to the DLQ.
(Open question: is this acceptable, or could it cause issues in unmonitored applications?)
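A sketch of the forwarding logic, modelled without Kafka dependencies. In an application it would sit inside an implementation of `org.apache.kafka.streams.errors.DeserializationExceptionHandler`, whose `handle` signature differs between Kafka Streams versions and is therefore not reproduced here. `DlqSink`, `FailedRecord`, `forwardedCount` and the `error.*` header names are hypothetical; only the `CONTINUE`/`FAIL` responses mirror Kafka's `DeserializationHandlerResponse`.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;

class DlqDeserializationHandlerSketch {

    // Mirrors the CONTINUE / FAIL values of Kafka Streams' DeserializationHandlerResponse.
    enum Response { CONTINUE, FAIL }

    // Minimal stand-in for the raw record that could not be deserialized.
    record FailedRecord(String topic, int partition, long offset, byte[] key, byte[] value) {}

    // Hypothetical DLQ sink; in a real application this would wrap a
    // byte[]/byte[] Kafka producer writing to the DLQ topic.
    interface DlqSink {
        void send(FailedRecord record, Map<String, String> headers);
    }

    private final DlqSink sink;
    private final AtomicLong forwarded = new AtomicLong();

    DlqDeserializationHandlerSketch(DlqSink sink) {
        this.sink = sink;
    }

    Response handle(FailedRecord record, Exception exception) {
        // Hypothetical header names carrying the failure context.
        Map<String, String> headers = Map.of(
                "error.topic", record.topic(),
                "error.partition", String.valueOf(record.partition()),
                "error.offset", String.valueOf(record.offset()),
                "error.exception", exception.getClass().getName(),
                "error.message", Objects.toString(exception.getMessage(), ""));
        try {
            sink.send(record, headers);
            forwarded.incrementAndGet();
            return Response.CONTINUE; // record parked in the DLQ: skip it, keep processing
        } catch (RuntimeException sendFailure) {
            return Response.FAIL;     // record could not be parked: fall back to fail-fast
        }
    }

    // Number of records skipped via the DLQ, e.g. to expose as a metric.
    long forwardedCount() {
        return forwarded.get();
    }
}
```

Falling back to `FAIL` when the DLQ itself is unavailable, and counting forwarded records, are design assumptions aimed at the concern about unmonitored applications: no record is skipped without landing in the DLQ, and the skip count can be surfaced for alerting.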
4. Configuration Options
Add configuration properties to enable or disable startup and runtime schema validation independently:
schema.validation.startup.enabled
schema.validation.runtime.enabled
schema.validation.runtime.interval (unit to be decided: minutes, hours, or days?)
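One way to read the three properties, assuming the interval is written as an ISO-8601 duration and parsed with `java.time.Duration.parse` (`PT15M`, `PT2H`, `P1D`), which would cover minutes, hours and days with a single format. `SchemaValidationConfig` is a hypothetical name, and the defaults (both checks disabled, 10-minute interval) are placeholders rather than decisions.

```java
import java.time.Duration;
import java.util.Properties;

record SchemaValidationConfig(boolean startupEnabled, boolean runtimeEnabled, Duration runtimeInterval) {

    static final String STARTUP_ENABLED = "schema.validation.startup.enabled";
    static final String RUNTIME_ENABLED = "schema.validation.runtime.enabled";
    static final String RUNTIME_INTERVAL = "schema.validation.runtime.interval";

    // Placeholder default, for illustration only.
    static final Duration DEFAULT_INTERVAL = Duration.ofMinutes(10);

    static SchemaValidationConfig from(Properties props) {
        // Both checks are opt-in in this sketch (assumed defaults).
        boolean startup = Boolean.parseBoolean(props.getProperty(STARTUP_ENABLED, "false"));
        boolean runtime = Boolean.parseBoolean(props.getProperty(RUNTIME_ENABLED, "false"));

        // ISO-8601 duration, e.g. PT15M, PT2H or P1D; malformed values throw DateTimeParseException.
        String raw = props.getProperty(RUNTIME_INTERVAL);
        Duration interval = raw == null ? DEFAULT_INTERVAL : Duration.parse(raw);

        // A scheduler cannot run with a zero or negative period.
        if (runtime && (interval.isZero() || interval.isNegative())) {
            throw new IllegalArgumentException(RUNTIME_INTERVAL + " must be positive, got " + raw);
        }
        return new SchemaValidationConfig(startup, runtime, interval);
    }
}
```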
@loicgreffier, @sebastienviale