【发布时间】:2026-01-06 15:30:01
【问题描述】:
我一直在尝试使用 Spring Cloud Stream 的动态目的地功能以 Avro 格式发布消息。但是,由于我使用的是本机编码(Confluent Avro 序列化程序),消息转换器无法处理这种情况。显然,当我使用静态目标时,我能够通过在“绑定”级别使用use-native-encoding: true 参数来管理本机编码。但是,有了动态目的地,我似乎没有这样的能力。
private boolean publishMessage(byte[] record, String target, String contentType, Schema schema) {
return resolver.resolveDestination(target)
.send(MessageBuilder
.createMessage(record, new MessageHeaders(
Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
}
如果我使用以下方法,内容类型为“application/*+avro”,记录为字节[]格式,则会引发以下异常:
error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@5c778504]; nested exception is org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema: \"bytes\"
如果您错过本机编码属性,通常会发生此异常。
如果我在使用以下方法发布消息之前尝试将字节数组反序列化为通用记录,则无法为其找到合适的消息转换器。
public static GenericRecord bytesToGenericAvro(byte[] bytes, Schema schema) {
DatumReader<GenericRecord>
datumReader = new GenericDatumReader<>(schema);
GenericRecord record = null;
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
bais.reset();
BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(bais, null);
try {
record = datumReader.read(null, binaryDecoder);
} catch (IOException e) {
log.error("Unable to deserialize byte array to avro generic record", e.getMessage());
} finally {
try {
bais.close();
} catch (IOException e) {
log.warn("Unable to close ByteArrayInputStream", e.getMessage());
}
}
return record;
}
更新: 添加这个bean后仍然面临同样的问题。 Spring Cloud Stream 尝试将消息转换为 Avro 时引发异常!
@Bean
public NewDestinationBindingCallback<KafkaProducerProperties> dynamicBindingConfigurer() {
return ((channelName, channel, producerProperties, extendedProducerProperties) -> {
producerProperties.setUseNativeEncoding(true);
producerProperties.setErrorChannelEnabled(true);
producerProperties.setPartitionCount(3);
});
}
例外:
failed to send Message to channel 'output1'; nested exception is java.lang.IllegalStateException: Failed to convert message: 'GenericMessage [payload={...}, headers={contentType=application/*+avro, id=c22bf171-c6ae-cedb-b0be-3aa0fcbdf762, timestamp=1567053746112}]' to outbound message.
at org.springframework.cloud.stream.binding.MessageConverterConfigurer$OutboundContentTypeConvertingInterceptor.doPreSend(MessageConverterConfigurer.java:388)
at org.springframework.cloud.stream.binding.MessageConverterConfigurer$AbstractContentTypeInterceptor.preSend(MessageConverterConfigurer.java:422)
at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:608)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:443)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
at com.example.controller.PublisherController.publishMessage(PublisherController.java:90)
at com.example.controller.PublisherController.replayRecord(PublisherController.java:72)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:190)
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138)
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:104)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:892)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:797)
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1039)
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:942)
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1005)
at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:897)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:634)
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:882)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:741)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:118)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:92)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:118)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:93)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:118)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:118)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96)
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:490)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:139)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343)
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:408)
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66)
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:853)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1587)
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:748)
【问题讨论】:
标签: avro spring-kafka spring-cloud-stream