Skip to content

Commit 572ff0d

Browse files
committed
feat: enhance Protobuf deserialization with schema metadata support for confluent, glue and pure proto
1 parent 0f12fc8 commit 572ff0d

File tree

10 files changed

+634
-255
lines changed

10 files changed

+634
-255
lines changed

libraries/src/AWS.Lambda.Powertools.Kafka.Avro/PowertoolsKafkaAvroSerializer.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,12 +100,13 @@ public PowertoolsKafkaAvroSerializer(JsonSerializerContext serializerContext) :
100100
/// <param name="data">The binary data to deserialize.</param>
101101
/// <param name="targetType">The type to deserialize to.</param>
102102
/// <param name="isKey">Whether this data represents a key (true) or a value (false).</param>
103+
/// <param name="schemaMetadata">Optional schema metadata for the data.</param>
103104
/// <returns>The deserialized object.</returns>
104105
[RequiresDynamicCode("Avro deserialization might require runtime code generation.")]
105106
[RequiresUnreferencedCode("Avro deserialization might require types that cannot be statically analyzed.")]
106107
protected override object? DeserializeComplexTypeFormat(byte[] data,
107108
[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicFields)]
108-
Type targetType, bool isKey)
109+
Type targetType, bool isKey, SchemaMetadata? schemaMetadata = null)
109110
{
110111
try
111112
{

libraries/src/AWS.Lambda.Powertools.Kafka.Json/PowertoolsKafkaJsonSerializer.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,13 +59,14 @@ public PowertoolsKafkaJsonSerializer(JsonSerializerContext serializerContext) :
5959
/// <param name="data">The binary data to deserialize.</param>
6060
/// <param name="targetType">The type to deserialize to.</param>
6161
/// <param name="isKey">Whether this data represents a key (true) or a value (false).</param>
62+
/// <param name="schemaMetadata">Optional schema metadata for the data.</param>
6263
/// <returns>The deserialized object.</returns>
6364
[RequiresDynamicCode("JSON deserialization might require runtime code generation.")]
6465
[RequiresUnreferencedCode("JSON deserialization might require types that cannot be statically analyzed.")]
6566
protected override object? DeserializeComplexTypeFormat(byte[] data,
6667
[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicProperties |
6768
DynamicallyAccessedMemberTypes.PublicFields)]
68-
Type targetType, bool isKey)
69+
Type targetType, bool isKey, SchemaMetadata? schemaMetadata = null)
6970
{
7071
if (data == null || data.Length == 0)
7172
{

libraries/src/AWS.Lambda.Powertools.Kafka.Protobuf/PowertoolsKafkaProtobufSerializer.cs

Lines changed: 201 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
using System.Collections.Concurrent;
1717
using System.Diagnostics.CodeAnalysis;
1818
using System.Reflection;
19+
using System.Text;
1920
using System.Text.Json;
2021
using System.Text.Json.Serialization;
2122
using Google.Protobuf;
@@ -79,19 +80,23 @@ public PowertoolsKafkaProtobufSerializer(JsonSerializerContext serializerContext
7980

8081
/// <summary>
8182
/// Deserializes complex (non-primitive) types using Protobuf format.
82-
/// Handles both standard protobuf serialization and Confluent Schema Registry serialization.
83+
/// Handles different parsing strategies based on schema metadata:
84+
/// - No schema ID: Pure Protobuf deserialization
85+
/// - Schema ID longer than 10 characters (e.g. a UUID): Glue format - skips one leading varint before parsing
86+
/// - Schema ID of 1 to 10 characters: Confluent format - skips the leading message-index varints before parsing
8387
/// </summary>
8488
/// <param name="data">The binary data to deserialize.</param>
8589
/// <param name="targetType">The type to deserialize to.</param>
8690
/// <param name="isKey">Whether this data represents a key (true) or a value (false).</param>
91+
/// <param name="schemaMetadata">Optional schema metadata for the data.</param>
8792
/// <returns>The deserialized object.</returns>
8893
[RequiresDynamicCode("Protobuf deserialization might require runtime code generation.")]
8994
[RequiresUnreferencedCode(
9095
"Protobuf deserialization might require types that cannot be statically analyzed.")]
9196
protected override object? DeserializeComplexTypeFormat(byte[] data,
9297
[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicProperties |
9398
DynamicallyAccessedMemberTypes.PublicFields)]
94-
Type targetType, bool isKey)
99+
Type targetType, bool isKey, SchemaMetadata? schemaMetadata = null)
95100
{
96101
try
97102
{
@@ -104,41 +109,201 @@ public PowertoolsKafkaProtobufSerializer(JsonSerializerContext serializerContext
104109
{
105110
throw new InvalidOperationException($"Could not find Protobuf parser for type {targetType.Name}");
106111
}
107-
112+
113+
// Determine parsing strategy based on schema metadata
114+
var parsingStrategy = DetermineParsingStrategy(schemaMetadata);
115+
108116
try
109117
{
110-
// First, try standard protobuf deserialization
111-
return parser.ParseFrom(data);
118+
return parsingStrategy switch
119+
{
120+
ProtobufParsingStrategy.Pure => DeserializePureProtobuf(data, parser),
121+
ProtobufParsingStrategy.Glue => DeserializeGlueProtobuf(data, parser),
122+
ProtobufParsingStrategy.Confluent => DeserializeConfluentProtobuf(data, parser),
123+
_ => throw new InvalidOperationException($"Unknown parsing strategy: {parsingStrategy}")
124+
};
112125
}
113-
catch
126+
catch (Exception ex)
114127
{
115-
try
116-
{
117-
// If standard deserialization fails, try message index handling
118-
return DeserializeWithMessageIndex(data, parser);
119-
}
120-
catch (Exception ex)
121-
{
122-
// If both methods fail, throw with helpful message
123-
throw new InvalidOperationException(
124-
$"Failed to deserialize {targetType.Name} using Protobuf. " +
125-
"The data may not be in a valid Protobuf format.", ex);
126-
}
128+
throw new InvalidOperationException(
129+
$"Failed to deserialize {targetType.Name} using Protobuf with {parsingStrategy} strategy. " +
130+
"The data may not be in a valid Protobuf format.", ex);
127131
}
128132
}
129133
else
130134
{
131135
// For non-Protobuf complex types, throw the specific expected exception
132-
throw new InvalidOperationException($"Unsupported type for Protobuf deserialization: {targetType.Name}. " +
133-
"Protobuf deserialization requires a type of com.google.protobuf.Message. " +
134-
"Consider using an alternative Deserializer.");
136+
throw new InvalidOperationException(
137+
$"Unsupported type for Protobuf deserialization: {targetType.Name}. " +
138+
"Protobuf deserialization requires a type of com.google.protobuf.Message. " +
139+
"Consider using an alternative Deserializer.");
135140
}
136141
}
137142
catch (Exception ex)
138143
{
139144
// Preserve the error message while wrapping in SerializationException for consistent error handling
140-
throw new System.Runtime.Serialization.SerializationException($"Failed to deserialize {(isKey ? "key" : "value")} data: {ex.Message}", ex);
145+
throw new System.Runtime.Serialization.SerializationException(
146+
$"Failed to deserialize {(isKey ? "key" : "value")} data: {ex.Message}", ex);
147+
}
148+
}
149+
150+
/// <summary>
/// Determines the parsing strategy based on schema metadata.
/// </summary>
/// <remarks>
/// The decision depends only on the length of the schema ID:
/// a missing or empty ID selects <see cref="ProtobufParsingStrategy.Pure"/>,
/// an ID longer than 10 characters selects <see cref="ProtobufParsingStrategy.Glue"/>,
/// and any other ID selects <see cref="ProtobufParsingStrategy.Confluent"/>.
/// The content of the ID (UUID syntax, digits) is not inspected.
/// </remarks>
/// <param name="schemaMetadata">The schema metadata to analyze; may be <c>null</c>.</param>
/// <returns>The appropriate parsing strategy.</returns>
private ProtobufParsingStrategy DetermineParsingStrategy(SchemaMetadata? schemaMetadata)
{
    // Longest schema ID still treated as Confluent.
    // NOTE(review): presumably 10 because a 32-bit integer ID prints in at most
    // 10 digits while Glue IDs are UUIDs - confirm against the registry formats.
    const int MaxConfluentSchemaIdLength = 10;

    var schemaId = schemaMetadata?.SchemaId;

    if (string.IsNullOrEmpty(schemaId))
    {
        return ProtobufParsingStrategy.Pure;
    }

    return schemaId.Length > MaxConfluentSchemaIdLength
        ? ProtobufParsingStrategy.Glue
        : ProtobufParsingStrategy.Confluent;
}
179+
180+
/// <summary>
/// Parses the payload as a plain Protobuf message; the bytes are handed to the parser untouched.
/// </summary>
/// <param name="data">The Protobuf-encoded bytes.</param>
/// <param name="parser">The parser for the target message type.</param>
/// <returns>The message produced by <paramref name="parser"/>.</returns>
private IMessage DeserializePureProtobuf(byte[] data, MessageParser parser) => parser.ParseFrom(data);
190+
191+
/// <summary>
/// Deserializes a Glue-framed Protobuf payload: one leading varint is read and discarded,
/// then the remainder of the buffer is parsed as the message.
/// </summary>
/// <param name="data">The binary data to deserialize.</param>
/// <param name="parser">The Protobuf message parser.</param>
/// <returns>The deserialized Protobuf message.</returns>
/// <exception cref="InvalidOperationException">
/// Reading the prefix or parsing the remaining bytes failed; the cause is attached as the inner exception.
/// </exception>
private IMessage DeserializeGlueProtobuf(byte[] data, MessageParser parser)
{
    using (var payload = new MemoryStream(data))
    using (var reader = new CodedInputStream(payload))
    {
        try
        {
            // The prefix is varint-encoded, so it may occupy more than one byte.
            // Its value is not needed here, only the bytes it consumes.
            _ = reader.ReadUInt32();

            // Everything after the prefix is the message itself.
            return parser.ParseFrom(reader);
        }
        catch (Exception failure)
        {
            throw new InvalidOperationException("Failed to parse Glue protobuf data after removing magic bytes", failure);
        }
    }
}
216+
217+
/// <summary>
/// Deserializes Confluent format Protobuf data by skipping the message-index prefix
/// and parsing the remaining bytes.
/// </summary>
/// <remarks>
/// If the prefix cannot be skipped, or the remainder does not parse, the whole buffer is
/// parsed directly as a last resort. An empty payload goes straight to that fallback.
/// </remarks>
/// <param name="data">The binary data to deserialize.</param>
/// <param name="parser">The Protobuf message parser.</param>
/// <returns>The deserialized Protobuf message.</returns>
/// <exception cref="InvalidOperationException">
/// Both the prefix-skipping parse and the direct parse failed. The message reports both
/// failures plus the first 20 bytes of the payload in hex; the inner exception is the
/// prefix-skipping failure.
/// </exception>
private IMessage DeserializeConfluentProtobuf(byte[] data, MessageParser parser)
{
    Exception indexedFailure;
    try
    {
        if (data.Length >= 1)
        {
            // One attempt is sufficient: ClipConfluentSchemaRegistryBuffer consumes the same
            // bytes for either value of useSignedVarint (a varint has the same length whether
            // it is decoded as uint32 or sint32, and the index values are discarded), so a
            // second attempt with the other flag would fail in exactly the same way.
            return ClipConfluentSchemaRegistryBuffer(data, parser, useSignedVarint: false);
        }

        indexedFailure = new InvalidOperationException("Confluent data too short");
    }
    catch (Exception ex)
    {
        indexedFailure = ex;
    }

    // Final fallback to direct parsing of the untouched buffer.
    try
    {
        return parser.ParseFrom(data);
    }
    catch (Exception directEx)
    {
        throw new InvalidOperationException(
            $"Failed to parse Confluent protobuf data: {indexedFailure.Message}. " +
            $"Direct parsing also failed: {directEx.Message}. " +
            $"Data (hex): {Convert.ToHexString(data.Take(20).ToArray())}", indexedFailure);
    }
}
267+
268+
/// <summary>
/// Skips the Confluent Schema Registry message-index prefix and parses the rest of the
/// buffer as a Protobuf message.
/// </summary>
/// <remarks>
/// The prefix is read as a ZigZag-encoded varint count followed by that many varints.
/// A count of zero or less means no index entries follow.
/// See https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
/// </remarks>
/// <param name="buffer">The buffer that starts with the message-index prefix.</param>
/// <param name="parser">The Protobuf message parser.</param>
/// <param name="useSignedVarint">
/// Retained for compatibility with existing callers. The index values are discarded, and a
/// varint occupies the same bytes whether it is decoded as sint32 or uint32, so this flag
/// does not change which bytes are skipped or what is returned.
/// </param>
/// <returns>The deserialized Protobuf message.</returns>
private IMessage ClipConfluentSchemaRegistryBuffer(byte[] buffer, MessageParser parser, bool useSignedVarint)
{
    using var inputStream = new MemoryStream(buffer);
    using var codedInput = new CodedInputStream(inputStream);

    // The number of message indexes is ZigZag-encoded, so it must be read as sint32.
    var indexCount = codedInput.ReadSInt32();

    // Read and discard each message index; only the stream position matters here.
    // The loop body never runs when the count is zero or negative.
    for (var i = 0; i < indexCount; i++)
    {
        codedInput.ReadUInt32();
    }

    // Parse the remaining data as protobuf
    return parser.ParseFrom(codedInput);
}
143308

144309
/// <summary>
@@ -176,54 +341,23 @@ public PowertoolsKafkaProtobufSerializer(JsonSerializerContext serializerContext
176341
}
177342

178343
/// <summary>
179-
/// Deserializes Protobuf data that may include a Confluent Schema Registry message index.
180-
/// Handles both the simple case (single 0) and complex case (length-prefixed array of indexes).
344+
/// Enum representing different Protobuf parsing strategies.
181345
/// </summary>
182-
/// <param name="data">The binary data to deserialize.</param>
183-
/// <param name="parser">The Protobuf message parser.</param>
184-
/// <returns>The deserialized Protobuf message or throws an exception if parsing fails.</returns>
185-
private IMessage DeserializeWithMessageIndex(byte[] data, MessageParser parser)
346+
private enum ProtobufParsingStrategy
186347
{
187-
using var inputStream = new MemoryStream(data);
188-
using var codedInput = new CodedInputStream(inputStream);
348+
/// <summary>
349+
/// Pure Protobuf deserialization without preprocessing.
350+
/// </summary>
351+
Pure,
189352

190-
try
191-
{
192-
// Read the first varint - this could be either a simple 0 or the length of message index array
193-
var firstValue = codedInput.ReadUInt32();
353+
/// <summary>
354+
/// Glue format - reads and discards one leading varint before deserialization.
355+
/// </summary>
356+
Glue,
194357

195-
if (firstValue == 0)
196-
{
197-
// Simple case: Single 0 byte means first message type
198-
return parser.ParseFrom(codedInput);
199-
}
200-
else
201-
{
202-
// Complex case: firstValue is the length of the message index array
203-
// Skip each message index value
204-
for (int i = 0; i < firstValue; i++)
205-
{
206-
codedInput.ReadUInt32();
207-
}
208-
209-
// Now the remaining data should be the actual protobuf message
210-
return parser.ParseFrom(codedInput);
211-
}
212-
}
213-
catch (Exception ex)
214-
{
215-
// If reading message index fails, try another approach with the remaining data
216-
try
217-
{
218-
// Reset stream position and try again with the whole data
219-
inputStream.Position = 0;
220-
return parser.ParseFrom(inputStream);
221-
}
222-
catch
223-
{
224-
// If that also fails, throw the original exception
225-
throw new InvalidOperationException("Failed to parse protobuf data with or without message index", ex);
226-
}
227-
}
358+
/// <summary>
359+
/// Confluent format - skips the leading message-index varints before deserialization.
360+
/// </summary>
361+
Confluent
228362
}
229363
}

0 commit comments

Comments
 (0)