Skip to content

Commit 3660b0a

Browse files
authored
Merge pull request #909 from aws-powertools/feat/kafka-consumer-proto-schemas
feat(kafka): add logic to handle protobuf deserialization
2 parents 0f12fc8 + 376ae66 commit 3660b0a

15 files changed

+712
-698
lines changed

libraries/src/AWS.Lambda.Powertools.Kafka.Avro/PowertoolsKafkaAvroSerializer.cs

Lines changed: 30 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public class PowertoolsKafkaAvroSerializer : PowertoolsKafkaSerializerBase
5555
public PowertoolsKafkaAvroSerializer() : base()
5656
{
5757
}
58-
58+
5959
/// <summary>
6060
/// Initializes a new instance of the <see cref="PowertoolsKafkaAvroSerializer"/> class
6161
/// with custom JSON serialization options.
@@ -64,7 +64,7 @@ public PowertoolsKafkaAvroSerializer() : base()
6464
public PowertoolsKafkaAvroSerializer(JsonSerializerOptions jsonOptions) : base(jsonOptions)
6565
{
6666
}
67-
67+
6868
/// <summary>
6969
/// Initializes a new instance of the <see cref="PowertoolsKafkaAvroSerializer"/> class
7070
/// with a JSON serializer context for AOT-compatible serialization.
@@ -73,62 +73,41 @@ public PowertoolsKafkaAvroSerializer(JsonSerializerOptions jsonOptions) : base(j
7373
public PowertoolsKafkaAvroSerializer(JsonSerializerContext serializerContext) : base(serializerContext)
7474
{
7575
}
76-
77-
/// <summary>
78-
/// Gets the Avro schema for the specified type.
79-
/// The type must have a public static _SCHEMA field defined.
80-
/// </summary>
81-
/// <param name="payloadType">The type to get the Avro schema for.</param>
82-
/// <returns>The Avro Schema object.</returns>
83-
/// <exception cref="InvalidOperationException">Thrown if no schema is found for the type.</exception>
84-
[RequiresDynamicCode("Avro schema access requires reflection which may be incompatible with AOT.")]
85-
[RequiresUnreferencedCode("Avro schema access requires reflection which may be incompatible with trimming.")]
86-
private Schema? GetAvroSchema([DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicFields)] Type payloadType)
87-
{
88-
var schemaField = payloadType.GetField("_SCHEMA",
89-
BindingFlags.Public | BindingFlags.Static);
90-
91-
if (schemaField == null)
92-
return null;
93-
94-
return schemaField.GetValue(null) as Schema;
95-
}
9676

9777
/// <summary>
9878
/// Deserializes complex (non-primitive) types using Avro format.
79+
/// Requires types to have a public static _SCHEMA field.
9980
/// </summary>
100-
/// <param name="data">The binary data to deserialize.</param>
101-
/// <param name="targetType">The type to deserialize to.</param>
102-
/// <param name="isKey">Whether this data represents a key (true) or a value (false).</param>
103-
/// <returns>The deserialized object.</returns>
10481
[RequiresDynamicCode("Avro deserialization might require runtime code generation.")]
10582
[RequiresUnreferencedCode("Avro deserialization might require types that cannot be statically analyzed.")]
106-
protected override object? DeserializeComplexTypeFormat(byte[] data,
107-
[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicFields)]
108-
Type targetType, bool isKey)
83+
protected override object? DeserializeComplexTypeFormat(byte[] data,
84+
[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicFields)]
85+
Type targetType, bool isKey, SchemaMetadata? schemaMetadata = null)
10986
{
110-
try
87+
var schema = GetAvroSchema(targetType);
88+
if (schema == null)
11189
{
112-
// Try to get Avro schema for the type
113-
var schema = GetAvroSchema(targetType);
114-
115-
if (schema != null)
116-
{
117-
using var stream = new MemoryStream(data);
118-
var decoder = new BinaryDecoder(stream);
119-
var reader = new SpecificDatumReader<object>(schema, schema);
120-
return reader.Read(null!, decoder);
121-
}
122-
123-
// If no Avro schema was found, throw an exception
124-
throw new InvalidOperationException($"Unsupported type for Avro deserialization: {targetType.Name}. " +
125-
"Avro deserialization requires a type with a static _SCHEMA field. " +
126-
"Consider using an alternative Deserializer.");
127-
}
128-
catch (Exception ex)
129-
{
130-
// Preserve the error message while wrapping in SerializationException for consistent error handling
131-
throw new System.Runtime.Serialization.SerializationException($"Failed to deserialize {(isKey ? "key" : "value")} data: {ex.Message}", ex);
90+
throw new InvalidOperationException(
91+
$"Unsupported type for Avro deserialization: {targetType.Name}. " +
92+
"Avro deserialization requires a type with a static _SCHEMA field. " +
93+
"Consider using an alternative Deserializer.");
13294
}
95+
96+
using var stream = new MemoryStream(data);
97+
var decoder = new BinaryDecoder(stream);
98+
var reader = new SpecificDatumReader<object>(schema, schema);
99+
return reader.Read(null!, decoder);
100+
}
101+
102+
/// <summary>
103+
/// Gets the Avro schema for the specified type from its static _SCHEMA field.
104+
/// </summary>
105+
[RequiresDynamicCode("Avro schema access requires reflection.")]
106+
[RequiresUnreferencedCode("Avro schema access requires reflection.")]
107+
private Schema? GetAvroSchema(
108+
[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicFields)] Type payloadType)
109+
{
110+
var schemaField = payloadType.GetField("_SCHEMA", BindingFlags.Public | BindingFlags.Static);
111+
return schemaField?.GetValue(null) as Schema;
133112
}
134-
}
113+
}

libraries/src/AWS.Lambda.Powertools.Kafka.Json/PowertoolsKafkaJsonSerializer.cs

Lines changed: 20 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public class PowertoolsKafkaJsonSerializer : PowertoolsKafkaSerializerBase
3434
public PowertoolsKafkaJsonSerializer() : base()
3535
{
3636
}
37-
37+
3838
/// <summary>
3939
/// Initializes a new instance of the <see cref="PowertoolsKafkaJsonSerializer"/> class
4040
/// with custom JSON serialization options.
@@ -43,7 +43,7 @@ public PowertoolsKafkaJsonSerializer() : base()
4343
public PowertoolsKafkaJsonSerializer(JsonSerializerOptions jsonOptions) : base(jsonOptions)
4444
{
4545
}
46-
46+
4747
/// <summary>
4848
/// Initializes a new instance of the <see cref="PowertoolsKafkaJsonSerializer"/> class
4949
/// with a JSON serializer context for AOT-compatible serialization.
@@ -52,62 +52,37 @@ public PowertoolsKafkaJsonSerializer(JsonSerializerOptions jsonOptions) : base(j
5252
public PowertoolsKafkaJsonSerializer(JsonSerializerContext serializerContext) : base(serializerContext)
5353
{
5454
}
55-
55+
5656
/// <summary>
5757
/// Deserializes complex (non-primitive) types using JSON format.
5858
/// </summary>
59-
/// <param name="data">The binary data to deserialize.</param>
60-
/// <param name="targetType">The type to deserialize to.</param>
61-
/// <param name="isKey">Whether this data represents a key (true) or a value (false).</param>
62-
/// <returns>The deserialized object.</returns>
6359
[RequiresDynamicCode("JSON deserialization might require runtime code generation.")]
6460
[RequiresUnreferencedCode("JSON deserialization might require types that cannot be statically analyzed.")]
65-
protected override object? DeserializeComplexTypeFormat(byte[] data,
66-
[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicProperties |
61+
protected override object? DeserializeComplexTypeFormat(byte[] data,
62+
[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicProperties |
6763
DynamicallyAccessedMemberTypes.PublicFields)]
68-
Type targetType, bool isKey)
64+
Type targetType, bool isKey, SchemaMetadata? schemaMetadata = null)
6965
{
7066
if (data == null || data.Length == 0)
7167
{
7268
return targetType.IsValueType ? Activator.CreateInstance(targetType) : null;
7369
}
74-
75-
try
76-
{
77-
// Convert bytes to JSON string
78-
var jsonStr = Encoding.UTF8.GetString(data);
7970

80-
// First try context-based deserialization if available
81-
if (SerializerContext != null)
71+
var jsonStr = Encoding.UTF8.GetString(data);
72+
73+
// Try context-based deserialization first
74+
if (SerializerContext != null)
75+
{
76+
var typeInfo = SerializerContext.GetTypeInfo(targetType);
77+
if (typeInfo != null)
8278
{
83-
// Try to get type info from context for AOT compatibility
84-
var typeInfo = SerializerContext.GetTypeInfo(targetType);
85-
if (typeInfo != null)
86-
{
87-
try
88-
{
89-
var result = JsonSerializer.Deserialize(jsonStr, typeInfo);
90-
if (result != null)
91-
{
92-
return result;
93-
}
94-
}
95-
catch
96-
{
97-
// Continue to fallback if context-based deserialization fails
98-
}
99-
}
79+
return JsonSerializer.Deserialize(jsonStr, typeInfo);
10080
}
101-
102-
// Fallback to regular deserialization - this should handle types not in the context
103-
#pragma warning disable IL2026, IL3050
104-
return JsonSerializer.Deserialize(jsonStr, targetType, JsonOptions);
105-
#pragma warning restore IL2026, IL3050
106-
}
107-
catch
108-
{
109-
// If all deserialization attempts fail, return null or default
110-
return targetType.IsValueType ? Activator.CreateInstance(targetType) : null;
11181
}
82+
83+
// Fallback to regular deserialization
84+
#pragma warning disable IL2026, IL3050
85+
return JsonSerializer.Deserialize(jsonStr, targetType, JsonOptions);
86+
#pragma warning restore IL2026, IL3050
11287
}
113-
}
88+
}

0 commit comments

Comments
 (0)