This repository was archived by the owner on Nov 20, 2024. It is now read-only.

Cannot convert from [[B] to [com.example.DummyMessage] for GenericMessage #1147

@metalpalo

Describe the issue
After upgrading spring-cloud-starter-parent from Hoxton.SR1 to Hoxton.SR2, we get the following error when consuming Avro-encoded Kafka messages produced by legacy microservices that use Spring Boot 1.5.x:

Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [com.example.DummyMessage] for GenericMessage [payload=byte[49], headers={kafka_timestampType=CREATE_TIME, kafka_receivedTopic=Topic_DummyMessages, originalContentType=application/vnd.dummymessage.v1+avro, spanTraceId=b12de318aedd5501, spanId=b12de318aedd5501, nativeHeaders={X-B3-TraceId=[b12de318aedd5501], spanTraceId=[b12de318aedd5501], X-B3-SpanId=[b12de318aedd5501], spanId=[b12de318aedd5501], X-B3-Sampled=[0], spanSampled=[0]}, kafka_offset=11, X-B3-SpanId=b12de318aedd5501, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@398b3b8d, X-B3-Sampled=0, X-B3-TraceId=b12de318aedd5501, id=0b716343-7904-258b-d4b6-12263b6a5216, kafka_receivedPartitionId=0, spanSampled=0, contentType=application/octet-stream, kafka_receivedTimestamp=1629983108296, kafka_groupId=DummyService, timestamp=1630042306997}]
	at org.springframework.cloud.stream.config.SmartPayloadArgumentResolver.resolveArgument(SmartPayloadArgumentResolver.java:123) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
	at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117) ~[spring-messaging-5.2.3.RELEASE.jar:5.2.3.RELEASE]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:148) ~[spring-messaging-5.2.3.RELEASE.jar:5.2.3.RELEASE]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116) ~[spring-messaging-5.2.3.RELEASE.jar:5.2.3.RELEASE]
	at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:127) ~[spring-integration-core-5.2.3.RELEASE.jar:5.2.3.RELEASE]
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:170) ~[spring-integration-core-5.2.3.RELEASE.jar:5.2.3.RELEASE]
	at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.2.3.RELEASE.jar:5.2.3.RELEASE]
	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-5.2.3.RELEASE.jar:5.2.3.RELEASE]
	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-5.2.3.RELEASE.jar:5.2.3.RELEASE]
	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.2.3.RELEASE.jar:5.2.3.RELEASE]
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453) ~[spring-integration-core-5.2.3.RELEASE.jar:5.2.3.RELEASE]
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403) ~[spring-integration-core-5.2.3.RELEASE.jar:5.2.3.RELEASE]
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.2.3.RELEASE.jar:5.2.3.RELEASE]
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.2.3.RELEASE.jar:5.2.3.RELEASE]
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.2.3.RELEASE.jar:5.2.3.RELEASE]
	at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.2.3.RELEASE.jar:5.2.3.RELEASE]
	at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:198) ~[spring-integration-core-5.2.3.RELEASE.jar:5.2.3.RELEASE]
	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:384) ~[spring-integration-kafka-3.2.1.RELEASE.jar:3.2.1.RELEASE]
	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:75) ~[spring-integration-kafka-3.2.1.RELEASE.jar:3.2.1.RELEASE]
	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:443) ~[spring-integration-kafka-3.2.1.RELEASE.jar:3.2.1.RELEASE]
	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:417) ~[spring-integration-kafka-3.2.1.RELEASE.jar:3.2.1.RELEASE]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1696) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1679) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1634) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]

The message payloads on Kafka carry the Avro contentType wrapped within application/octet-stream:
contentType="application/octet-stream" originalContentType="application/vnd.dummymessage.v1+avro"

Both the producer and the consumer service have headerMode set to embeddedHeaders.
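To make the header situation concrete, here is a minimal sketch of the lookup a consumer-side workaround might perform: when contentType is the generic application/octet-stream wrapper and originalContentType is present, prefer the original value. The class `LegacyContentType` and its method are hypothetical names, not part of Spring Cloud Stream, and the sketch deliberately uses a plain Map instead of Spring's message types. Where such logic would be hooked into the binder is not shown and would still need to be worked out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not a Spring Cloud Stream class.
final class LegacyContentType {

    static final String CONTENT_TYPE = "contentType";
    static final String ORIGINAL_CONTENT_TYPE = "originalContentType";
    static final String OCTET_STREAM = "application/octet-stream";

    private LegacyContentType() {
    }

    // Returns the content type a converter should be given:
    // the original one if the declared type is missing or is only the
    // octet-stream wrapper, otherwise the declared type (may be null).
    static String effectiveContentType(Map<String, ?> headers) {
        Object declared = headers.get(CONTENT_TYPE);
        Object original = headers.get(ORIGINAL_CONTENT_TYPE);
        boolean wrapped = declared == null || OCTET_STREAM.equals(declared.toString());
        if (original != null && wrapped) {
            return original.toString();
        }
        return declared == null ? null : declared.toString();
    }

    public static void main(String[] args) {
        // Header values taken from the failing message in this report.
        Map<String, Object> headers = new HashMap<>();
        headers.put(CONTENT_TYPE, OCTET_STREAM);
        headers.put(ORIGINAL_CONTENT_TYPE, "application/vnd.dummymessage.v1+avro");
        // Prints: application/vnd.dummymessage.v1+avro
        System.out.println(effectiveContentType(headers));
    }
}
```

With the headers from the stack trace above, the helper yields application/vnd.dummymessage.v1+avro, which is the type an Avro converter would presumably need to see in order to match the payload.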

To Reproduce
Steps to reproduce the behavior:

  1. Send an Avro-encoded Kafka message from a legacy Spring Boot 1.5.x microservice
  2. Confirm that the message exists on the topic
  3. Start the consuming Spring Boot microservice built with Hoxton.SR2
  4. The consumer fails with the error shown above

Version of the framework
spring-cloud-starter-parent: Hoxton.SR2
spring-boot-starter-parent: 2.2.4.RELEASE
spring-cloud-stream-binder-kafka: 3.0.2.RELEASE

Expected behavior
The Kafka message should be processed successfully, as it is with the following configuration:
spring-cloud-starter-parent: Hoxton.SR1
spring-boot-starter-parent: 2.2.2.RELEASE
spring-cloud-stream-binder-kafka: 3.0.1.RELEASE

Can somebody tell me what changed there? Is there a configuration option for backward compatibility?
Thanks,
metalpalo
