Cannot convert from [[B] to [com.example.DummyMessage] for GenericMessage #1147
Description
Describe the issue
After upgrading spring-cloud-starter-parent from Hoxton.SR1 to Hoxton.SR2, we get the following error when consuming Avro-encoded Kafka messages produced by legacy microservices that use Spring Boot 1.5.x:
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [com.example.DummyMessage] for GenericMessage [payload=byte[49], headers={kafka_timestampType=CREATE_TIME, kafka_receivedTopic=Topic_DummyMessages, originalContentType=application/vnd.dummymessage.v1+avro, spanTraceId=b12de318aedd5501, spanId=b12de318aedd5501, nativeHeaders={X-B3-TraceId=[b12de318aedd5501], spanTraceId=[b12de318aedd5501], X-B3-SpanId=[b12de318aedd5501], spanId=[b12de318aedd5501], X-B3-Sampled=[0], spanSampled=[0]}, kafka_offset=11, X-B3-SpanId=b12de318aedd5501, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@398b3b8d, X-B3-Sampled=0, X-B3-TraceId=b12de318aedd5501, id=0b716343-7904-258b-d4b6-12263b6a5216, kafka_receivedPartitionId=0, spanSampled=0, contentType=application/octet-stream, kafka_receivedTimestamp=1629983108296, kafka_groupId=DummyService, timestamp=1630042306997}]
at org.springframework.cloud.stream.config.SmartPayloadArgumentResolver.resolveArgument(SmartPayloadArgumentResolver.java:123) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117) ~[spring-messaging-5.2.3.RELEASE.jar:5.2.3.RELEASE]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:148) ~[spring-messaging-5.2.3.RELEASE.jar:5.2.3.RELEASE]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116) ~[spring-messaging-5.2.3.RELEASE.jar:5.2.3.RELEASE]
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:127) ~[spring-integration-core-5.2.3.RELEASE.jar:5.2.3.RELEASE]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:170) ~[spring-integration-core-5.2.3.RELEASE.jar:5.2.3.RELEASE]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.2.3.RELEASE.jar:5.2.3.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-5.2.3.RELEASE.jar:5.2.3.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-5.2.3.RELEASE.jar:5.2.3.RELEASE]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.2.3.RELEASE.jar:5.2.3.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453) ~[spring-integration-core-5.2.3.RELEASE.jar:5.2.3.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403) ~[spring-integration-core-5.2.3.RELEASE.jar:5.2.3.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.2.3.RELEASE.jar:5.2.3.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.2.3.RELEASE.jar:5.2.3.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.2.3.RELEASE.jar:5.2.3.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.2.3.RELEASE.jar:5.2.3.RELEASE]
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:198) ~[spring-integration-core-5.2.3.RELEASE.jar:5.2.3.RELEASE]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:384) ~[spring-integration-kafka-3.2.1.RELEASE.jar:3.2.1.RELEASE]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:75) ~[spring-integration-kafka-3.2.1.RELEASE.jar:3.2.1.RELEASE]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:443) ~[spring-integration-kafka-3.2.1.RELEASE.jar:3.2.1.RELEASE]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:417) ~[spring-integration-kafka-3.2.1.RELEASE.jar:3.2.1.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1696) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1679) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1634) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
The message payloads on Kafka carry the Avro content type wrapped within application/octet-stream:
contentType="application/octet-stream" originalContentType="application/vnd.dummymessage.v1+avro"
Both the producer and the consumer service have headerMode set to embeddedHeaders.
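To make the header situation concrete, here is a minimal plain-Java sketch of the lookup a consumer would have to perform to recover the wrapped type from the two headers shown above. The class `EffectiveContentType` and its `resolve` method are hypothetical names used only for illustration; this is not Spring Cloud Stream code and is not a claim about how either release resolves the content type internally.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of Spring Cloud Stream.
final class EffectiveContentType {

    static final String CONTENT_TYPE = "contentType";
    static final String ORIGINAL_CONTENT_TYPE = "originalContentType";
    static final String OCTET_STREAM = "application/octet-stream";

    // Returns originalContentType when contentType is missing or is the generic
    // application/octet-stream wrapper; otherwise returns contentType unchanged.
    static String resolve(Map<String, String> headers) {
        String contentType = headers.get(CONTENT_TYPE);
        String original = headers.get(ORIGINAL_CONTENT_TYPE);
        boolean wrapped = contentType == null || OCTET_STREAM.equals(contentType);
        if (wrapped && original != null && !original.isEmpty()) {
            return original;
        }
        return contentType;
    }

    public static void main(String[] args) {
        // Header values copied from the failing message in this report.
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put(CONTENT_TYPE, "application/octet-stream");
        headers.put(ORIGINAL_CONTENT_TYPE, "application/vnd.dummymessage.v1+avro");
        System.out.println(resolve(headers));
    }
}
```

With the headers from the failing message, the sketch selects application/vnd.dummymessage.v1+avro. The exception above reports contentType=application/octet-stream, which would be consistent with the wrapped type not being taken into account on the consumer side, though I have not confirmed that this is the cause.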
To Reproduce
Steps to reproduce the behavior:
- Send an Avro-encoded Kafka message from a legacy Spring Boot 1.5.x microservice
- Verify that the message exists on the topic
- Start the consuming Spring Boot microservice built with Hoxton.SR2
- The consumer fails with the error shown above
Version of the framework
spring-cloud-starter-parent: Hoxton.SR2
spring-boot-starter-parent: 2.2.4.RELEASE
spring-cloud-stream-binder-kafka: 3.0.2.RELEASE
Expected behavior
The Kafka message should be processed successfully, as it is with the following configuration:
spring-cloud-starter-parent: Hoxton.SR1
spring-boot-starter-parent: 2.2.2.RELEASE
spring-cloud-stream-binder-kafka: 3.0.1.RELEASE
Can somebody tell me what changed between these versions? Is there a configuration option for backward compatibility?
thanks
brmetalpalo