v2.1.0
Summary
We're excited to announce the Kafka Consumer utility for Java, which transparently handles message deserialization, provides an intuitive developer experience, and integrates seamlessly with the rest of the Powertools for AWS Lambda ecosystem.
Key features
- Automatic deserialization of Kafka messages (JSON, Avro, and Protocol Buffers)
- Simplified event record handling with the familiar Kafka ConsumerRecords interface
- Support for key and value deserialization
- Support for Event Source Mapping (ESM) with and without Schema Registry integration
Getting Started
To get started, add the Powertools for AWS Lambda Kafka dependency to your project along with the required dependencies for your schema types:
Maven:
<dependency>
    <groupId>software.amazon.lambda</groupId>
    <artifactId>powertools-kafka</artifactId>
    <version>2.1.0</version>
</dependency>
<!-- Kafka clients dependency - compatibility works for >= 3.0.0 -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>4.0.0</version>
</dependency>
Gradle:
dependencies {
    implementation 'software.amazon.lambda:powertools-kafka:2.1.0'
    // Kafka clients dependency - compatibility works for >= 3.0.0
    implementation 'org.apache.kafka:kafka-clients:4.0.0'
}
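If your messages use Avro or Protocol Buffers, you also need the corresponding serialization library on the classpath so the generated schema classes can be deserialized. A minimal Maven sketch (the versions shown are illustrative; use the ones matching your generated classes):
<!-- Only needed for Avro messages (version is illustrative) -->
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.11.4</version>
</dependency>
<!-- Only needed for Protocol Buffers messages (version is illustrative) -->
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>4.28.2</version>
</dependency>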
Processing Kafka events
You can use the Kafka consumer utility to transform raw Kafka events into an intuitive format for processing using the @Deserialization annotation.
The @Deserialization annotation can deserialize both keys and values independently based on your schema configuration. This flexibility allows you to easily combine primitive types (strings, integers) with complex types (Avro, Protocol Buffers, JSON) in the same message.
Working with Avro:
public class AvroKafkaHandler implements RequestHandler<ConsumerRecords<String, User>, String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(AvroKafkaHandler.class);

    @Override
    @Logging
    @Deserialization(type = DeserializationType.KAFKA_AVRO)
    public String handleRequest(ConsumerRecords<String, User> records, Context context) {
        for (ConsumerRecord<String, User> record : records) {
            User user = record.value(); // User class is auto-generated from Avro schema
            LOGGER.info("Processing user: {}, age {}", user.getName(), user.getAge());
        }
        return "OK";
    }
}
Working with Protocol Buffers:
public class ProtobufKafkaHandler implements RequestHandler<ConsumerRecords<String, UserProto.User>, String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(ProtobufKafkaHandler.class);

    @Override
    @Logging
    @Deserialization(type = DeserializationType.KAFKA_PROTOBUF)
    public String handleRequest(ConsumerRecords<String, UserProto.User> records, Context context) {
        for (ConsumerRecord<String, UserProto.User> record : records) {
            UserProto.User user = record.value(); // UserProto.User class is auto-generated from Protocol Buffer schema
            LOGGER.info("Processing user: {}, age {}", user.getName(), user.getAge());
        }
        return "OK";
    }
}
Working with JSON messages:
public class JsonKafkaHandler implements RequestHandler<ConsumerRecords<String, User>, String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(JsonKafkaHandler.class);

    @Override
    @Logging
    @Deserialization(type = DeserializationType.KAFKA_JSON)
    public String handleRequest(ConsumerRecords<String, User> records, Context context) {
        for (ConsumerRecord<String, User> record : records) {
            User user = record.value(); // Deserialized JSON object into User POJO
            LOGGER.info("Processing user: {}, age {}", user.getName(), user.getAge());
        }
        return "OK";
    }
}
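Because the deserialized batch still exposes the standard ConsumerRecord API, you can read keys and record metadata alongside the value. A minimal sketch, assuming a JSON-encoded User value and a plain String key (imports omitted as in the examples above; the handler name is illustrative):
public class JsonKafkaMetadataHandler implements RequestHandler<ConsumerRecords<String, User>, String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(JsonKafkaMetadataHandler.class);

    @Override
    @Logging
    @Deserialization(type = DeserializationType.KAFKA_JSON)
    public String handleRequest(ConsumerRecords<String, User> records, Context context) {
        for (ConsumerRecord<String, User> record : records) {
            String key = record.key();  // deserialized key (a String in this sketch)
            User user = record.value(); // deserialized JSON value
            LOGGER.info("topic={} partition={} offset={} key={} user={}",
                    record.topic(), record.partition(), record.offset(), key, user.getName());
        }
        return "OK";
    }
}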
Error handling
The Java Kafka utility uses Lambda custom serializers and performs eager deserialization of all records in the batch before your handler method is invoked.
This means that if any record in the batch fails deserialization, a RuntimeException will be thrown with a concrete error message explaining why deserialization failed, and your handler method will never be called.
Key implications:
- Batch-level failure: If one record fails deserialization, the entire batch fails
- Early failure detection: Deserialization errors are caught before your business logic runs
- Clear error messages: The RuntimeException provides specific details about what went wrong
- No partial processing: You cannot process some records while skipping failed ones within the same batch
Example of deserialization failure:
// If any record in the batch has invalid Avro data, you'll see:
// RuntimeException: Failed to deserialize Kafka record: Invalid Avro schema for record at offset 12345
Handling deserialization failures:
Since deserialization happens before your handler is called, you cannot catch these exceptions within your handler method. Instead, configure your Event Source Mapping with appropriate error handling:
- Dead Letter Queue (DLQ): Configure a DLQ to capture failed batches for later analysis
- Maximum Retry Attempts: Set appropriate retry limits to avoid infinite retries
- Batch Size: Use smaller batch sizes to minimize the impact of individual record failures
# Example SAM template configuration for error handling
Events:
  KafkaEvent:
    Type: MSK
    Properties:
      # ... other properties
      BatchSize: 10 # Smaller batches reduce failure impact
      MaximumRetryAttempts: 3
      DestinationConfig:
        OnFailure:
          Type: SQS
          Destination: !GetAtt DeadLetterQueue.Arn
To learn more about the launch, read the blog post published alongside the release.
Changes
- chore(ci): bump version to 2.1.0 (#1904) by @github-actions[bot]
- fix(kafka): Add support for confluent message indices. (#1902) by @phipag
- feat(kafka): New Kafka utility (#1898) by @phipag
- chore(ci): Update workflows to make v2 the default (#1888) by @sthulb
- chore(ci): bump version to 2.0.0 (#1876) by @github-actions[bot]
📜 Documentation updates
🐛 Bug and hot fixes
- fix(ci): Add project description for new kafka utility (#1903) by @phipag
- fix(metrics): Do not flush when no metrics were added to avoid printing root-level _aws dict (#1891) by @phipag
This release was made possible by the following contributors:
@github-actions[bot], @phipag and @sthulb