Skip to content

Commit bf8a7a7

Browse files
committed
cleanup
1 parent 572ff0d commit bf8a7a7

File tree

2 files changed

+46
-227
lines changed

2 files changed

+46
-227
lines changed

libraries/src/AWS.Lambda.Powertools.Kafka.Protobuf/PowertoolsKafkaProtobufSerializer.cs

Lines changed: 45 additions & 226 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,8 @@
1313
* permissions and limitations under the License.
1414
*/
1515

16-
using System.Collections.Concurrent;
1716
using System.Diagnostics.CodeAnalysis;
1817
using System.Reflection;
19-
using System.Text;
2018
using System.Text.Json;
2119
using System.Text.Json.Serialization;
2220
using Google.Protobuf;
@@ -49,9 +47,6 @@ namespace AWS.Lambda.Powertools.Kafka.Protobuf;
4947
/// </example>
5048
public class PowertoolsKafkaProtobufSerializer : PowertoolsKafkaSerializerBase
5149
{
52-
// Cache for Protobuf parsers to improve performance
53-
private static readonly ConcurrentDictionary<Type, MessageParser> _parserCache = new();
54-
5550
/// <summary>
5651
/// Initializes a new instance of the <see cref="PowertoolsKafkaProtobufSerializer"/> class
5752
/// with default JSON serialization options.
@@ -82,282 +77,106 @@ public PowertoolsKafkaProtobufSerializer(JsonSerializerContext serializerContext
8277
/// Deserializes complex (non-primitive) types using Protobuf format.
8378
/// Handles different parsing strategies based on schema metadata:
8479
/// - No schema ID: Pure Protobuf deserialization
85-
/// - UUID schema ID (16+ chars): Glue format - strips first byte
86-
/// - 4-character schema ID: Confluent format - strips message field numbers
80+
/// - UUID schema ID (16+ chars): Glue format - removes magic uint32
81+
/// - Short schema ID (≤10 chars): Confluent format - removes message indexes
8782
/// </summary>
88-
/// <param name="data">The binary data to deserialize.</param>
89-
/// <param name="targetType">The type to deserialize to.</param>
90-
/// <param name="isKey">Whether this data represents a key (true) or a value (false).</param>
91-
/// <param name="schemaMetadata">Optional schema metadata for the data.</param>
92-
/// <returns>The deserialized object.</returns>
9383
[RequiresDynamicCode("Protobuf deserialization might require runtime code generation.")]
94-
[RequiresUnreferencedCode(
95-
"Protobuf deserialization might require types that cannot be statically analyzed.")]
84+
[RequiresUnreferencedCode("Protobuf deserialization might require types that cannot be statically analyzed.")]
9685
protected override object? DeserializeComplexTypeFormat(byte[] data,
9786
[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicProperties |
9887
DynamicallyAccessedMemberTypes.PublicFields)]
9988
Type targetType, bool isKey, SchemaMetadata? schemaMetadata = null)
10089
{
101-
try
90+
if (!typeof(IMessage).IsAssignableFrom(targetType))
10291
{
103-
// Check if it's a Protobuf message type
104-
if (typeof(IMessage).IsAssignableFrom(targetType))
105-
{
106-
// This is a Protobuf message type - try to get the parser
107-
var parser = GetProtobufParser(targetType);
108-
if (parser == null)
109-
{
110-
throw new InvalidOperationException($"Could not find Protobuf parser for type {targetType.Name}");
111-
}
92+
throw new InvalidOperationException(
93+
$"Unsupported type for Protobuf deserialization: {targetType.Name}. " +
94+
"Protobuf deserialization requires a type that implements IMessage. " +
95+
"Consider using an alternative Deserializer.");
96+
}
11297

113-
// Determine parsing strategy based on schema metadata
114-
var parsingStrategy = DetermineParsingStrategy(schemaMetadata);
98+
var parser = GetProtobufParser(targetType);
99+
if (parser == null)
100+
{
101+
throw new InvalidOperationException($"Could not find Protobuf parser for type {targetType.Name}");
102+
}
115103

116-
try
117-
{
118-
return parsingStrategy switch
119-
{
120-
ProtobufParsingStrategy.Pure => DeserializePureProtobuf(data, parser),
121-
ProtobufParsingStrategy.Glue => DeserializeGlueProtobuf(data, parser),
122-
ProtobufParsingStrategy.Confluent => DeserializeConfluentProtobuf(data, parser),
123-
_ => throw new InvalidOperationException($"Unknown parsing strategy: {parsingStrategy}")
124-
};
125-
}
126-
catch (Exception ex)
127-
{
128-
throw new InvalidOperationException(
129-
$"Failed to deserialize {targetType.Name} using Protobuf with {parsingStrategy} strategy. " +
130-
"The data may not be in a valid Protobuf format.", ex);
131-
}
132-
}
133-
else
134-
{
135-
// For non-Protobuf complex types, throw the specific expected exception
136-
throw new InvalidOperationException(
137-
$"Unsupported type for Protobuf deserialization: {targetType.Name}. " +
138-
"Protobuf deserialization requires a type of com.google.protobuf.Message. " +
139-
"Consider using an alternative Deserializer.");
140-
}
104+
try
105+
{
106+
return DeserializeByStrategy(data, parser, schemaMetadata);
141107
}
142108
catch (Exception ex)
143109
{
144-
// Preserve the error message while wrapping in SerializationException for consistent error handling
145110
throw new System.Runtime.Serialization.SerializationException(
146111
$"Failed to deserialize {(isKey ? "key" : "value")} data: {ex.Message}", ex);
147112
}
148113
}
149114

150115
/// <summary>
151-
/// Determines the parsing strategy based on schema metadata.
116+
/// Deserializes protobuf data using the appropriate strategy based on schema metadata.
152117
/// </summary>
153-
/// <param name="schemaMetadata">The schema metadata to analyze.</param>
154-
/// <returns>The appropriate parsing strategy.</returns>
155-
private ProtobufParsingStrategy DetermineParsingStrategy(SchemaMetadata? schemaMetadata)
118+
private IMessage DeserializeByStrategy(byte[] data, MessageParser parser, SchemaMetadata? schemaMetadata)
156119
{
157-
if (schemaMetadata?.SchemaId == null || string.IsNullOrEmpty(schemaMetadata.SchemaId))
120+
var schemaId = schemaMetadata?.SchemaId;
121+
122+
if (string.IsNullOrEmpty(schemaId))
158123
{
159-
return ProtobufParsingStrategy.Pure;
124+
// Pure protobuf - no preprocessing needed
125+
return parser.ParseFrom(data);
160126
}
161127

162-
var schemaId = schemaMetadata.SchemaId;
163-
164-
// Check for UUID format (longer than 10 characters indicates Glue Schema Registry)
165128
if (schemaId.Length > 10)
166129
{
167-
return ProtobufParsingStrategy.Glue;
130+
// Glue Schema Registry - remove magic uint32
131+
return DeserializeGlueFormat(data, parser);
168132
}
169133

170-
// Check for Confluent format (numeric schema ID, typically shorter)
171-
if (schemaId.Length <= 10)
172-
{
173-
return ProtobufParsingStrategy.Confluent;
174-
}
175-
176-
// Default to pure protobuf for unknown formats
177-
return ProtobufParsingStrategy.Pure;
134+
// Confluent Schema Registry - remove message indexes
135+
return DeserializeConfluentFormat(data, parser);
178136
}
179137

180138
/// <summary>
181-
/// Deserializes pure Protobuf data without any preprocessing.
139+
/// Deserializes Glue Schema Registry format by removing the magic uint32.
182140
/// </summary>
183-
/// <param name="data">The binary data to deserialize.</param>
184-
/// <param name="parser">The Protobuf message parser.</param>
185-
/// <returns>The deserialized Protobuf message.</returns>
186-
private IMessage DeserializePureProtobuf(byte[] data, MessageParser parser)
187-
{
188-
return parser.ParseFrom(data);
189-
}
190-
191-
/// <summary>
192-
/// Deserializes Glue format Protobuf data by reading and skipping the magic uint32.
193-
/// Based on Glue Schema Registry protobuf deserializer implementation.
194-
/// </summary>
195-
/// <param name="data">The binary data to deserialize.</param>
196-
/// <param name="parser">The Protobuf message parser.</param>
197-
/// <returns>The deserialized Protobuf message.</returns>
198-
private IMessage DeserializeGlueProtobuf(byte[] data, MessageParser parser)
141+
private IMessage DeserializeGlueFormat(byte[] data, MessageParser parser)
199142
{
200143
using var inputStream = new MemoryStream(data);
201144
using var codedInput = new CodedInputStream(inputStream);
202145

203-
try
204-
{
205-
// Seek one byte forward. Based on Glue Proto deserializer implementation
206-
codedInput.ReadUInt32();
207-
208-
// Parse the remaining data as protobuf
209-
return parser.ParseFrom(codedInput);
210-
}
211-
catch (Exception ex)
212-
{
213-
throw new InvalidOperationException("Failed to parse Glue protobuf data after removing magic bytes", ex);
214-
}
215-
}
216-
217-
/// <summary>
218-
/// Deserializes Confluent format Protobuf data by handling message index bytes.
219-
/// Based on the TypeScript reference implementation for Confluent Schema Registry.
220-
/// </summary>
221-
/// <param name="data">The binary data to deserialize.</param>
222-
/// <param name="parser">The Protobuf message parser.</param>
223-
/// <returns>The deserialized Protobuf message.</returns>
224-
private IMessage DeserializeConfluentProtobuf(byte[] data, MessageParser parser)
225-
{
226-
try
227-
{
228-
if (data.Length < 1)
229-
{
230-
throw new InvalidOperationException("Confluent data too short");
231-
}
232-
233-
// Try int32 varint reading first (most common)
234-
try
235-
{
236-
return ClipConfluentSchemaRegistryBuffer(data, parser, useSignedVarint: false);
237-
}
238-
catch (Exception)
239-
{
240-
// If int32 fails, try sint32 varint reading
241-
try
242-
{
243-
return ClipConfluentSchemaRegistryBuffer(data, parser, useSignedVarint: true);
244-
}
245-
catch (Exception ex)
246-
{
247-
throw new InvalidOperationException($"Failed to parse Confluent protobuf data with both int32 and sint32 varint types: {ex.Message}", ex);
248-
}
249-
}
250-
}
251-
catch (Exception ex)
252-
{
253-
// Final fallback to direct parsing
254-
try
255-
{
256-
return parser.ParseFrom(data);
257-
}
258-
catch (Exception directEx)
259-
{
260-
throw new InvalidOperationException(
261-
$"Failed to parse Confluent protobuf data: {ex.Message}. " +
262-
$"Direct parsing also failed: {directEx.Message}. " +
263-
$"Data (hex): {Convert.ToHexString(data.Take(20).ToArray())}", ex);
264-
}
265-
}
146+
codedInput.ReadUInt32(); // Skip magic bytes
147+
return parser.ParseFrom(codedInput);
266148
}
267149

268150
/// <summary>
269-
/// Clips the Confluent Schema Registry buffer to remove the index bytes.
270-
/// Based on the Java reference implementation logic.
271-
/// Uses signed varint encoding (ZigZag) as per Confluent Schema Registry specification.
151+
/// Deserializes Confluent Schema Registry format by removing message indexes.
152+
/// Based on Java reference implementation.
272153
/// </summary>
273-
/// <param name="buffer">The buffer to clip.</param>
274-
/// <param name="parser">The Protobuf message parser.</param>
275-
/// <param name="useSignedVarint">Whether to use signed varint (sint32) or unsigned (int32).</param>
276-
/// <returns>The deserialized Protobuf message.</returns>
277-
private IMessage ClipConfluentSchemaRegistryBuffer(byte[] buffer, MessageParser parser, bool useSignedVarint)
154+
private IMessage DeserializeConfluentFormat(byte[] data, MessageParser parser)
278155
{
279-
using var inputStream = new MemoryStream(buffer);
156+
using var inputStream = new MemoryStream(data);
280157
using var codedInput = new CodedInputStream(inputStream);
281158

282-
// Read the first signed varint to get the size (number of message indexes)
283-
// Confluent uses signed varint encoding (ZigZag) for the message index count
284-
var size = codedInput.ReadSInt32();
159+
// Read number of message indexes
160+
var indexCount = codedInput.ReadSInt32();
285161

286-
// Only if the size is greater than zero, continue reading varInt
287-
// https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
288-
if (size > 0)
162+
// Skip message indexes if any exist
163+
if (indexCount > 0)
289164
{
290-
for (int i = 0; i < size; i++)
165+
for (int i = 0; i < indexCount; i++)
291166
{
292-
// Read and discard each message index varint
293-
// These could be either signed or unsigned depending on the schema
294-
if (useSignedVarint)
295-
{
296-
codedInput.ReadSInt32();
297-
}
298-
else
299-
{
300-
codedInput.ReadUInt32();
301-
}
167+
codedInput.ReadSInt32(); // Read and discard each index
302168
}
303169
}
304170

305-
// Parse the remaining data as protobuf
306171
return parser.ParseFrom(codedInput);
307172
}
308173

309174
/// <summary>
310-
/// Gets a Protobuf parser for the specified type, using a cache for better performance.
175+
/// Gets the Protobuf parser for the specified type.
311176
/// </summary>
312-
/// <param name="messageType">The Protobuf message type.</param>
313-
/// <returns>A MessageParser for the specified type, or null if not found.</returns>
314177
private MessageParser? GetProtobufParser(Type messageType)
315178
{
316-
return _parserCache.GetOrAdd(messageType, type =>
317-
{
318-
try
319-
{
320-
var parserProperty = type.GetProperty("Parser",
321-
BindingFlags.Public | BindingFlags.Static);
322-
323-
if (parserProperty == null)
324-
{
325-
return null!;
326-
}
327-
328-
var parser = parserProperty.GetValue(null) as MessageParser;
329-
if (parser == null)
330-
{
331-
return null!;
332-
}
333-
334-
return parser;
335-
}
336-
catch
337-
{
338-
return null!;
339-
}
340-
});
341-
}
342-
343-
/// <summary>
344-
/// Enum representing different Protobuf parsing strategies.
345-
/// </summary>
346-
private enum ProtobufParsingStrategy
347-
{
348-
/// <summary>
349-
/// Pure Protobuf deserialization without preprocessing.
350-
/// </summary>
351-
Pure,
352-
353-
/// <summary>
354-
/// Glue format - strips the first byte before deserialization.
355-
/// </summary>
356-
Glue,
357-
358-
/// <summary>
359-
/// Confluent format - strips message field numbers before deserialization.
360-
/// </summary>
361-
Confluent
179+
var parserProperty = messageType.GetProperty("Parser", BindingFlags.Public | BindingFlags.Static);
180+
return parserProperty?.GetValue(null) as MessageParser;
362181
}
363182
}

libraries/tests/AWS.Lambda.Powertools.Kafka.Tests/Protobuf/PowertoolsKafkaProtobufSerializerTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ public void DeserializeComplexKey_WhenAllDeserializationMethodsFail_ReturnsExcep
143143
var message =
144144
Assert.Throws<SerializationException>(() =>
145145
serializer.Deserialize<ConsumerRecords<TestModel, string>>(stream));
146-
Assert.Contains("Failed to deserialize key data: Failed to deserialize", message.Message);
146+
Assert.Contains("Failed to deserialize key data: Unsupported", message.Message);
147147
}
148148

149149
[Fact]

0 commit comments

Comments
 (0)