Skip to content

Commit c64eccf

Browse files
committed
feat: improve error handling in serializers and add tests for corrupted data
1 parent ae48702 commit c64eccf

File tree

6 files changed

+159
-96
lines changed

6 files changed

+159
-96
lines changed

libraries/src/AWS.Lambda.Powertools.Kafka.Avro/PowertoolsKafkaAvroSerializer.cs

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -93,18 +93,10 @@ public PowertoolsKafkaAvroSerializer(JsonSerializerContext serializerContext) :
9393
"Consider using an alternative Deserializer.");
9494
}
9595

96-
try
97-
{
98-
using var stream = new MemoryStream(data);
99-
var decoder = new BinaryDecoder(stream);
100-
var reader = new SpecificDatumReader<object>(schema, schema);
101-
return reader.Read(null!, decoder);
102-
}
103-
catch (Exception ex)
104-
{
105-
throw new System.Runtime.Serialization.SerializationException(
106-
$"Failed to deserialize {(isKey ? "key" : "value")} data: {ex.Message}", ex);
107-
}
96+
using var stream = new MemoryStream(data);
97+
var decoder = new BinaryDecoder(stream);
98+
var reader = new SpecificDatumReader<object>(schema, schema);
99+
return reader.Read(null!, decoder);
108100
}
109101

110102
/// <summary>

libraries/src/AWS.Lambda.Powertools.Kafka.Protobuf/PowertoolsKafkaProtobufSerializer.cs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -101,15 +101,7 @@ public PowertoolsKafkaProtobufSerializer(JsonSerializerContext serializerContext
101101
throw new InvalidOperationException($"Could not find Protobuf parser for type {targetType.Name}");
102102
}
103103

104-
try
105-
{
106-
return DeserializeByStrategy(data, parser, schemaMetadata);
107-
}
108-
catch (Exception ex)
109-
{
110-
throw new System.Runtime.Serialization.SerializationException(
111-
$"Failed to deserialize {(isKey ? "key" : "value")} data: {ex.Message}", ex);
112-
}
104+
return DeserializeByStrategy(data, parser, schemaMetadata);
113105
}
114106

115107
/// <summary>
@@ -156,6 +148,15 @@ private IMessage DeserializeConfluentFormat(byte[] data, MessageParser parser)
156148
using var inputStream = new MemoryStream(data);
157149
using var codedInput = new CodedInputStream(inputStream);
158150

151+
/*
152+
ReadSInt32() behavior:
153+
ReadSInt32() properly handles signed varint encoding using ZigZag encoding
154+
ZigZag encoding maps signed integers to unsigned integers: (n << 1) ^ (n >> 31)
155+
This allows both positive and negative numbers to be efficiently encoded
156+
The key insight is that Confluent Schema Registry uses signed varint encoding for the message index count, not unsigned length encoding.
157+
The ByteUtils.readVarint() in Java typically reads signed varints, which corresponds to ReadSInt32() in C# Google.Protobuf.
158+
*/
159+
159160
// Read number of message indexes
160161
var indexCount = codedInput.ReadSInt32();
161162

libraries/src/AWS.Lambda.Powertools.Kafka/PowertoolsKafkaSerializerBase.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,7 @@ private void ProcessValue(JsonElement recordElement, object record, Type recordT
328328
}
329329
catch (Exception ex)
330330
{
331-
throw new SerializationException(ex.Message);
331+
throw new SerializationException($"Failed to deserialize value data: {ex.Message}", ex);
332332
}
333333
}
334334
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
using System.Runtime.Serialization;
17+
using System.Text;
18+
using AWS.Lambda.Powertools.Kafka.Avro;
19+
using AWS.Lambda.Powertools.Kafka.Json;
20+
using AWS.Lambda.Powertools.Kafka.Protobuf;
21+
22+
namespace AWS.Lambda.Powertools.Kafka.Tests;
23+
24+
public class ErrorHandlingTests
25+
{
26+
[Theory]
27+
[InlineData(typeof(PowertoolsKafkaJsonSerializer))]
28+
[InlineData(typeof(PowertoolsKafkaAvroSerializer))]
29+
[InlineData(typeof(PowertoolsKafkaProtobufSerializer))]
30+
public void AllSerializers_WithCorruptedKeyData_ThrowSerializationException(Type serializerType)
31+
{
32+
// Arrange
33+
var serializer = (PowertoolsKafkaSerializerBase)Activator.CreateInstance(serializerType)!;
34+
var corruptedData = new byte[] { 0xDE, 0xAD, 0xBE, 0xEF };
35+
36+
string kafkaEventJson = CreateKafkaEvent(
37+
Convert.ToBase64String(corruptedData),
38+
Convert.ToBase64String(Encoding.UTF8.GetBytes("valid-value"))
39+
);
40+
41+
using var stream = new MemoryStream(Encoding.UTF8.GetBytes(kafkaEventJson));
42+
43+
// Act & Assert
44+
var ex = Assert.Throws<SerializationException>(() =>
45+
serializer.Deserialize<ConsumerRecords<TestModel, string>>(stream));
46+
47+
Assert.Contains("Failed to deserialize key data", ex.Message);
48+
}
49+
50+
[Theory]
51+
[InlineData(typeof(PowertoolsKafkaJsonSerializer))]
52+
[InlineData(typeof(PowertoolsKafkaAvroSerializer))]
53+
[InlineData(typeof(PowertoolsKafkaProtobufSerializer))]
54+
public void AllSerializers_WithCorruptedValueData_ThrowSerializationException(Type serializerType)
55+
{
56+
// Arrange
57+
var serializer = (PowertoolsKafkaSerializerBase)Activator.CreateInstance(serializerType)!;
58+
var corruptedData = new byte[] { 0xDE, 0xAD, 0xBE, 0xEF };
59+
60+
string kafkaEventJson = CreateKafkaEvent(
61+
Convert.ToBase64String(Encoding.UTF8.GetBytes("valid-key")),
62+
Convert.ToBase64String(corruptedData)
63+
);
64+
65+
using var stream = new MemoryStream(Encoding.UTF8.GetBytes(kafkaEventJson));
66+
67+
// Act & Assert
68+
var ex = Assert.Throws<SerializationException>(() =>
69+
serializer.Deserialize<ConsumerRecords<string, TestModel>>(stream));
70+
71+
Assert.Contains("Failed to deserialize value data", ex.Message);
72+
}
73+
74+
private string CreateKafkaEvent(string keyValue, string valueValue)
75+
{
76+
return @$"{{
77+
""eventSource"": ""aws:kafka"",
78+
""records"": {{
79+
""mytopic-0"": [
80+
{{
81+
""topic"": ""mytopic"",
82+
""partition"": 0,
83+
""offset"": 15,
84+
""key"": ""{keyValue}"",
85+
""value"": ""{valueValue}""
86+
}}
87+
]
88+
}}
89+
}}";
90+
}
91+
}

libraries/tests/AWS.Lambda.Powertools.Kafka.Tests/Json/PowertoolsKafkaJsonSerializerTests.cs

Lines changed: 11 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
* permissions and limitations under the License.
1414
*/
1515

16+
using System.Runtime.Serialization;
1617
using System.Text;
1718
using System.Text.Json;
1819
using System.Text.Json.Serialization;
@@ -27,47 +28,22 @@ public void Deserialize_KafkaEventWithJsonPayload_DeserializesToCorrectType()
2728
{
2829
// Arrange
2930
var serializer = new PowertoolsKafkaJsonSerializer();
30-
string kafkaEventJson = File.ReadAllText("Json/kafka-json-event.json");
31+
var testModel = new TestModel { Name = "Test Product", Value = 123 };
32+
var jsonValue = JsonSerializer.Serialize(testModel);
33+
var base64Value = Convert.ToBase64String(Encoding.UTF8.GetBytes(jsonValue));
34+
35+
string kafkaEventJson = CreateKafkaEvent("NDI=", base64Value); // Key is 42 in base64
3136
using var stream = new MemoryStream(Encoding.UTF8.GetBytes(kafkaEventJson));
3237

3338
// Act
34-
var result = serializer.Deserialize<ConsumerRecords<string, JsonProduct>>(stream);
39+
var result = serializer.Deserialize<ConsumerRecords<int, TestModel>>(stream);
3540

3641
// Assert
3742
Assert.NotNull(result);
38-
Assert.Equal("aws:kafka", result.EventSource);
39-
40-
// Verify records were deserialized
41-
Assert.True(result.Records.ContainsKey("mytopic-0"));
42-
var records = result.Records["mytopic-0"];
43-
Assert.Equal(3, records.Count);
44-
45-
// Verify first record's content
46-
var firstRecord = records[0];
47-
Assert.Equal("mytopic", firstRecord.Topic);
48-
Assert.Equal(0, firstRecord.Partition);
49-
Assert.Equal(15, firstRecord.Offset);
50-
Assert.Equal("recordKey", firstRecord.Key);
51-
52-
// Verify deserialized JSON value
53-
var product = firstRecord.Value;
54-
Assert.Equal("product5", product.Name);
55-
Assert.Equal(12345, product.Id);
56-
Assert.Equal(45, product.Price);
57-
58-
// Verify second record
59-
var secondRecord = records[1];
60-
var p2 = secondRecord.Value;
61-
Assert.Equal("product5", p2.Name);
62-
Assert.Equal(12345, p2.Id);
63-
Assert.Equal(45, p2.Price);
64-
65-
// Verify third record
66-
var thirdRecord = records[2];
67-
var p3 = thirdRecord.Value;
68-
Assert.Equal("product5", p3.Name);
69-
Assert.Equal(12345, p3.Id);
70-
Assert.Equal(45, p3.Price);
43+
var record = result.First();
44+
Assert.Equal(42, record.Key);
45+
Assert.Equal("Test Product", record.Value.Name);
46+
Assert.Equal(123, record.Value.Value);
7147
}
7248

7349
[Fact]

libraries/tests/AWS.Lambda.Powertools.Kafka.Tests/PowertoolsKafkaSerializerBaseTests.cs

Lines changed: 42 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -46,55 +46,51 @@ public TestKafkaSerializer(JsonSerializerOptions options, JsonSerializerContext
4646
: base(options, context)
4747
{
4848
}
49-
49+
5050
// Implementation of the abstract method for test purposes
51-
protected override object? DeserializeComplexTypeFormat(byte[] data,
51+
protected override object? DeserializeComplexTypeFormat(byte[] data,
5252
Type targetType, bool isKey, SchemaMetadata? schemaMetadata = null)
5353
{
54-
try
54+
// Test implementation using JSON for all complex types
55+
var jsonStr = Encoding.UTF8.GetString(data);
56+
57+
if (SerializerContext != null)
5558
{
56-
// Test implementation using JSON for all complex types
57-
var jsonStr = Encoding.UTF8.GetString(data);
58-
59-
if (SerializerContext != null)
59+
var typeInfo = SerializerContext.GetTypeInfo(targetType);
60+
if (typeInfo != null)
6061
{
61-
var typeInfo = SerializerContext.GetTypeInfo(targetType);
62-
if (typeInfo != null)
63-
{
64-
return JsonSerializer.Deserialize(jsonStr, typeInfo);
65-
}
62+
return JsonSerializer.Deserialize(jsonStr, typeInfo);
6663
}
67-
68-
return JsonSerializer.Deserialize(jsonStr, targetType, JsonOptions);
69-
}
70-
catch
71-
{
72-
return null;
7364
}
65+
66+
return JsonSerializer.Deserialize(jsonStr, targetType, JsonOptions);
7467
}
75-
68+
7669
// Expose protected methods for direct testing
77-
public object? TestDeserializeFormatSpecific(byte[] data, Type targetType, bool isKey, SchemaMetadata? schemaMetadata = null)
70+
public object? TestDeserializeFormatSpecific(byte[] data, Type targetType, bool isKey,
71+
SchemaMetadata? schemaMetadata = null)
7872
{
7973
return DeserializeFormatSpecific(data, targetType, isKey, schemaMetadata);
8074
}
81-
82-
public object? TestDeserializeComplexTypeFormat(byte[] data, Type targetType, bool isKey, SchemaMetadata? schemaMetadata = null)
75+
76+
public object? TestDeserializeComplexTypeFormat(byte[] data, Type targetType, bool isKey,
77+
SchemaMetadata? schemaMetadata = null)
8378
{
8479
return DeserializeComplexTypeFormat(data, targetType, isKey, schemaMetadata);
8580
}
86-
81+
8782
public object? TestDeserializePrimitiveValue(byte[] data, Type targetType)
8883
{
8984
return DeserializePrimitiveValue(data, targetType);
9085
}
91-
86+
9287
public bool TestIsPrimitiveOrSimpleType(Type type)
9388
{
9489
return IsPrimitiveOrSimpleType(type);
9590
}
96-
97-
public object TestDeserializeValue(string base64Value, Type valueType, SchemaMetadata? schemaMetadata = null)
91+
92+
public object TestDeserializeValue(string base64Value, Type valueType,
93+
SchemaMetadata? schemaMetadata = null)
9894
{
9995
return DeserializeValue(base64Value, valueType, schemaMetadata);
10096
}
@@ -610,7 +606,9 @@ public void DeserializeFormatSpecific_PrimitiveType_UsesDeserializePrimitiveValu
610606
var stringBytes = Encoding.UTF8.GetBytes("primitive-test");
611607

612608
// Act
613-
var result = serializer.TestDeserializeFormatSpecific(stringBytes, typeof(string), isKey: false, schemaMetadata: null);
609+
var result =
610+
serializer.TestDeserializeFormatSpecific(stringBytes, typeof(string), isKey: false,
611+
schemaMetadata: null);
614612

615613
// Assert
616614
Assert.Equal("primitive-test", result);
@@ -625,7 +623,9 @@ public void DeserializeFormatSpecific_ComplexType_UsesDeserializeComplexTypeForm
625623
var jsonBytes = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(complexObject));
626624

627625
// Act
628-
var result = serializer.TestDeserializeFormatSpecific(jsonBytes, typeof(TestModel), isKey: false, schemaMetadata: null);
626+
var result =
627+
serializer.TestDeserializeFormatSpecific(jsonBytes, typeof(TestModel), isKey: false,
628+
schemaMetadata: null);
629629

630630
// Assert
631631
Assert.NotNull(result);
@@ -643,7 +643,9 @@ public void DeserializeComplexTypeFormat_ValidJson_DeserializesCorrectly()
643643
var jsonBytes = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(complexObject));
644644

645645
// Act
646-
var result = serializer.TestDeserializeComplexTypeFormat(jsonBytes, typeof(TestModel), isKey: true, schemaMetadata: null);
646+
var result =
647+
serializer.TestDeserializeComplexTypeFormat(jsonBytes, typeof(TestModel), isKey: true,
648+
schemaMetadata: null);
647649

648650
// Assert
649651
Assert.NotNull(result);
@@ -653,17 +655,19 @@ public void DeserializeComplexTypeFormat_ValidJson_DeserializesCorrectly()
653655
}
654656

655657
[Fact]
656-
public void DeserializeComplexTypeFormat_InvalidJson_ReturnsNull()
658+
public void DeserializeComplexTypeFormat_InvalidJson_ThrowsException()
657659
{
658660
// Arrange
659661
var serializer = new TestKafkaSerializer();
660662
var invalidBytes = new byte[] { 0xDE, 0xAD, 0xBE, 0xEF }; // Invalid JSON data
661663

662-
// Act
663-
var result = serializer.TestDeserializeComplexTypeFormat(invalidBytes, typeof(TestModel), isKey: true, schemaMetadata: null);
664+
// Act & Assert
665+
// The TestKafkaSerializer throws JsonException directly for invalid JSON
666+
var ex = Assert.Throws<JsonException>(() =>
667+
serializer.TestDeserializeComplexTypeFormat(invalidBytes, typeof(TestModel), isKey: true,
668+
schemaMetadata: null));
664669

665-
// Assert
666-
Assert.Null(result);
670+
Assert.Contains("invalid", ex.Message.ToLower());
667671
}
668672

669673
[Fact]
@@ -702,18 +706,18 @@ public void IsPrimitiveOrSimpleType_ChecksVariousTypes()
702706
{
703707
// Arrange
704708
var serializer = new TestKafkaSerializer();
705-
709+
706710
// Act & Assert
707711
// Primitive types
708712
Assert.True(serializer.TestIsPrimitiveOrSimpleType(typeof(int)));
709713
Assert.True(serializer.TestIsPrimitiveOrSimpleType(typeof(long)));
710714
Assert.True(serializer.TestIsPrimitiveOrSimpleType(typeof(bool)));
711-
715+
712716
// Simple types
713717
Assert.True(serializer.TestIsPrimitiveOrSimpleType(typeof(string)));
714718
Assert.True(serializer.TestIsPrimitiveOrSimpleType(typeof(Guid)));
715719
Assert.True(serializer.TestIsPrimitiveOrSimpleType(typeof(DateTime)));
716-
720+
717721
// Complex types
718722
Assert.False(serializer.TestIsPrimitiveOrSimpleType(typeof(TestModel)));
719723
Assert.False(serializer.TestIsPrimitiveOrSimpleType(typeof(Dictionary<string, int>)));
@@ -758,5 +762,4 @@ public class TestModel
758762
public string Name { get; set; }
759763
public int Value { get; set; }
760764
}
761-
}
762-
765+
}

0 commit comments

Comments
 (0)