
v2.1.0


Summary

We're excited to announce the Kafka Consumer utility for Java, which transparently handles message deserialization, provides an intuitive developer experience, and integrates seamlessly with the rest of the Powertools for AWS Lambda ecosystem.

Key features

  • Automatic deserialization of Kafka messages (JSON, Avro, and Protocol Buffers)
  • Simplified event record handling with the familiar Kafka ConsumerRecords interface
  • Support for key and value deserialization
  • Support for Event Source Mapping (ESM) with and without Schema Registry integration

Getting Started

To get started, add the Powertools for AWS Lambda Kafka dependency to your project along with the required dependencies for your schema types:

Maven:

<dependency>
    <groupId>software.amazon.lambda</groupId>
    <artifactId>powertools-kafka</artifactId>
    <version>2.1.0</version>
</dependency>
<!-- Kafka clients dependency - compatible with versions >= 3.0.0 -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>4.0.0</version>
</dependency>

Gradle:

dependencies {
    implementation 'software.amazon.lambda:powertools-kafka:2.1.0'
    // Kafka clients dependency - compatible with versions >= 3.0.0
    implementation 'org.apache.kafka:kafka-clients:4.0.0'
}
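
Depending on the schema types you use, you also need the corresponding serialization library on the classpath. As an illustration, the standard Avro and Protocol Buffers artifacts are shown below in Maven form (the version numbers are examples only; pick releases compatible with your generated classes):

<!-- Only if you deserialize Avro messages -->
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.11.4</version> <!-- example version -->
</dependency>
<!-- Only if you deserialize Protocol Buffers messages -->
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>4.29.3</version> <!-- example version -->
</dependency>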

Processing Kafka events

Docs

You can use the Kafka consumer utility to transform raw Kafka events into an intuitive format for processing using the @Deserialization annotation.

The @Deserialization annotation can deserialize keys and values independently based on your schema configuration. This flexibility lets you combine primitive types (strings, integers) with complex types (Avro, Protocol Buffers, JSON) in the same message, as shown in the combined key/value sketch after the examples below.

Working with Avro:

public class AvroKafkaHandler implements RequestHandler<ConsumerRecords<String, User>, String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(AvroKafkaHandler.class);

    @Override
    @Logging
    @Deserialization(type = DeserializationType.KAFKA_AVRO)
    public String handleRequest(ConsumerRecords<String, User> records, Context context) {
        for (ConsumerRecord<String, User> record : records) {
            User user = record.value(); // User class is auto-generated from Avro schema
            LOGGER.info("Processing user: {}, age {}", user.getName(), user.getAge());
        }
        return "OK";
    }
}

Working with Protocol Buffers:

public class ProtobufKafkaHandler implements RequestHandler<ConsumerRecords<String, UserProto.User>, String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(ProtobufKafkaHandler.class);

    @Override
    @Logging
    @Deserialization(type = DeserializationType.KAFKA_PROTOBUF)
    public String handleRequest(ConsumerRecords<String, UserProto.User> records, Context context) {
        for (ConsumerRecord<String, UserProto.User> record : records) {
            UserProto.User user = record.value(); // UserProto.User class is auto-generated from Protocol Buffer schema
            LOGGER.info("Processing user: {}, age {}", user.getName(), user.getAge());
        }
        return "OK";
    }
}

Working with JSON messages:

public class JsonKafkaHandler implements RequestHandler<ConsumerRecords<String, User>, String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(JsonKafkaHandler.class);

    @Override
    @Logging
    @Deserialization(type = DeserializationType.KAFKA_JSON)
    public String handleRequest(ConsumerRecords<String, User> records, Context context) {
        for (ConsumerRecord<String, User> record : records) {
            User user = record.value(); // Deserialized JSON object into User POJO
            LOGGER.info("Processing user: {}, age {}", user.getName(), user.getAge());
        }
        return "OK";
    }
}
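
Since keys and values are deserialized independently, a handler can combine a primitive key with a complex value. Below is a minimal sketch assuming JSON values keyed by a plain String (the MixedKeyValueHandler name and the logged fields are illustrative):

public class MixedKeyValueHandler implements RequestHandler<ConsumerRecords<String, User>, String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(MixedKeyValueHandler.class);

    @Override
    @Logging
    @Deserialization(type = DeserializationType.KAFKA_JSON)
    public String handleRequest(ConsumerRecords<String, User> records, Context context) {
        for (ConsumerRecord<String, User> record : records) {
            String userId = record.key(); // primitive key, deserialized as a plain String
            User user = record.value();   // complex value, deserialized into the User POJO
            LOGGER.info("Processing user {}: {}", userId, user.getName());
        }
        return "OK";
    }
}

The same pattern applies to Avro and Protocol Buffers values; only the DeserializationType changes.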

Error handling

Docs

The Java Kafka utility uses Lambda custom serializers and performs eager deserialization of all records in the batch before your handler method is invoked.

This means that if any record in the batch fails deserialization, a RuntimeException is thrown with a descriptive error message explaining why deserialization failed, and your handler method is never called.

Key implications:

  • Batch-level failure: If one record fails deserialization, the entire batch fails
  • Early failure detection: Deserialization errors are caught before your business logic runs
  • Clear error messages: The RuntimeException provides specific details about what went wrong
  • No partial processing: You cannot process some records while skipping failed ones within the same batch

Example of deserialization failure:

// If any record in the batch has invalid Avro data, you'll see:
// RuntimeException: Failed to deserialize Kafka record: Invalid Avro schema for record at offset 12345

Handling deserialization failures:

Since deserialization happens before your handler is called, you cannot catch these exceptions within your handler method. Instead, configure your Event Source Mapping with appropriate error handling:

  • Dead Letter Queue (DLQ): Configure a DLQ to capture failed batches for later analysis
  • Maximum Retry Attempts: Set appropriate retry limits to avoid infinite retries
  • Batch Size: Use smaller batch sizes to minimize the impact of individual record failures

# Example SAM template configuration for error handling
Events:
  KafkaEvent:
    Type: MSK
    Properties:
      # ... other properties
      BatchSize: 10 # Smaller batches reduce failure impact
      MaximumRetryAttempts: 3
      DestinationConfig:
        OnFailure:
          Type: SQS
          Destination: !GetAtt DeadLetterQueue.Arn

To learn more about the launch, read the blog post published alongside the release.

Changes

📜 Documentation updates

🐛 Bug and hot fixes

  • fix(ci): Add project description for new kafka utility (#1903) by @phipag
  • fix(metrics): Do not flush when no metrics were added to avoid printing root-level _aws dict (#1891) by @phipag

This release was made possible by the following contributors:

@github-actions[bot], @phipag and @sthulb