【问题标题】:Spring Cloud Stream dynamic destinations Avro with the native encoding is not working具有本机编码的 Spring Cloud Stream 动态目的地 Avro 不起作用
【发布时间】:2026-01-06 15:30:01
【问题描述】:

我一直在尝试使用 Spring Cloud Stream 的动态目的地功能以 Avro 格式发布消息。但是,由于我使用的是本机编码(Confluent Avro 序列化程序),消息转换器无法处理这种情况。显然,当我使用静态目标时,我能够通过在“绑定”级别使用use-native-encoding: true 参数来管理本机编码。但是,有了动态目的地,我似乎没有这样的能力。

private boolean publishMessage(byte[] record, String target, String contentType, Schema schema) {
    return resolver.resolveDestination(target)
        .send(MessageBuilder
            .createMessage(record, new MessageHeaders(
                Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
  }

如果我使用以下方法,内容类型为“application/*+avro”,记录为字节[]格式,则会引发以下异常:

error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@5c778504]; nested exception is org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema: \"bytes\"

如果您错过本机编码属性,通常会发生此异常。

如果我在使用以下方法发布消息之前尝试将字节数组反序列化为通用记录,则无法为其找到合适的消息转换器。

public static GenericRecord bytesToGenericAvro(byte[] bytes, Schema schema) {
    DatumReader<GenericRecord>
        datumReader = new GenericDatumReader<>(schema);

    GenericRecord record = null;
    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
    bais.reset();

    BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(bais, null);
    try {
      record = datumReader.read(null, binaryDecoder);
    } catch (IOException e) {
      log.error("Unable to deserialize byte array to avro generic record", e.getMessage());
    } finally {
      try {
        bais.close();
      } catch (IOException e) {
        log.warn("Unable to close ByteArrayInputStream", e.getMessage());
      }
    }
    return record;
  }

更新: 添加这个bean后仍然面临同样的问题。 Spring Cloud Stream 尝试将消息转换为 Avro 时引发异常!

  @Bean
  public NewDestinationBindingCallback<KafkaProducerProperties> dynamicBindingConfigurer() {
    return ((channelName, channel, producerProperties, extendedProducerProperties) -> {
      producerProperties.setUseNativeEncoding(true);
      producerProperties.setErrorChannelEnabled(true);
      producerProperties.setPartitionCount(3);
    });
  }

例外:

failed to send Message to channel 'output1'; nested exception is java.lang.IllegalStateException: Failed to convert message: 'GenericMessage [payload={...}, headers={contentType=application/*+avro, id=c22bf171-c6ae-cedb-b0be-3aa0fcbdf762, timestamp=1567053746112}]' to outbound message.
at org.springframework.cloud.stream.binding.MessageConverterConfigurer$OutboundContentTypeConvertingInterceptor.doPreSend(MessageConverterConfigurer.java:388)
    at org.springframework.cloud.stream.binding.MessageConverterConfigurer$AbstractContentTypeInterceptor.preSend(MessageConverterConfigurer.java:422)
    at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:608)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:443)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
    at com.example.controller.PublisherController.publishMessage(PublisherController.java:90)
    at com.example.controller.PublisherController.replayRecord(PublisherController.java:72)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:190)
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138)
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:104)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:892)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:797)
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1039)
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:942)
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1005)
    at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:897)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:634)
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:882)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:741)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:118)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:92)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:118)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:93)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:118)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:118)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202)
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96)
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:490)
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:139)
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74)
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343)
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:408)
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66)
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:853)
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1587)
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
    at java.lang.Thread.run(Thread.java:748)

【问题讨论】:

    标签: avro spring-kafka spring-cloud-stream


    【解决方案1】:

    您可以修改动态绑定的绑定属性,添加NewDestinationBindingCallback bean 并将其传递给解析器。见the documentation

    如果事先知道通道名称,您可以像配置任何其他目标一样配置生产者属性。或者,如果您注册了一个NewDestinationBindingCallback&lt;&gt; bean,它会在绑定创建之前被调用。回调采用绑定器使用的扩展生产者属性的通用类型。它有一种方法:

    void configure(String channelName, MessageChannel channel, ProducerProperties producerProperties,
            T extendedProducerProperties);
    

    以下示例展示了如何使用 RabbitMQ binder:

    @Bean
    public NewDestinationBindingCallback<RabbitProducerProperties> dynamicConfigurer() {
        return (name, channel, props, extended) -> {
            props.setRequiredGroups("bindThisQueue");
            extended.setQueueNameGroupOnly(true);
            extended.setAutoBindDlq(true);
            extended.setDeadLetterQueueName("myDLQ");
        };
    }
    

    如果您需要支持具有多种绑定器类型的动态目标,请使用 Object 作为泛型类型并根据需要强制转换扩展参数。

    编辑

    这是解析器中的一个错误;在创建和配置通道之前,不会调用回调来更新属性。它适用于大多数属性,但不是这个。

    这是一个解决方法:

    @SpringBootApplication
    @EnableBinding(Sink.class)
    public class So57688303Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So57688303Application.class, args);
        }
    
        @Bean
        public NewDestinationBindingCallback<KafkaProducerProperties> dynamicBindingConfigurer() {
            return ((channelName, channel, producerProperties, extendedProducerProperties) -> {
                producerProperties.setUseNativeEncoding(true);
                producerProperties.setErrorChannelEnabled(true);
                producerProperties.setPartitionCount(3);
                extendedProducerProperties.getConfiguration().put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                        MySerializer.class.getName());
            });
        }
    
    
        @Bean
        public ApplicationRunner runner(BinderAwareChannelResolver resolver) {
            return args -> {
                MessageChannel channel = resolver.resolveDestination("dynamic");
                ((AbstractMessageChannel) channel).removeInterceptor(0); // only need to do this on the first resolution
                channel.send(new GenericMessage<>("foo"));
            };
        }
    
        public static class MySerializer implements Serializer<String> {
    
            @Override
            public void configure(Map<String, ?> configs, boolean isKey) {
            }
    
            @Override
            public byte[] serialize(String topic, String data) {
                System.out.println("In my serializer with data of type " + data.getClass().getSimpleName());
                return data.getBytes();
            }
    
            @Override
            public void close() {
            }
    
        }
    
     }
    

    In my serializer with data of type String
    

    【讨论】:

    • 感谢您的帮助。虽然我可以在拦截绑定时看到这种方法,属性设置正确(本机编码),但它仍然对我不起作用,因为转换器无法正确转换此消息。仍然由于某种原因,未触发 Confluent Avro 序列化程序。如果我删除“application/*+avro”的标头,它会尝试将其写入 JSON,而不是让 Confluent Avro Serializer 完成工作。
    • 使用本机编码时会绕过消息转换器。
    • 所以我猜有些东西没有正确设置,因为仍然涉及转换器。
    • 解析器在调用回调之前创建了通道,这意味着它错误地添加了转换拦截器。所以解决方法是在发送到新解析的频道之前将其删除;如果在创建通道之前设置属性,您最终会发生什么。 I opened this issue this morning.
    • 可能会添加其他拦截器 - 例如this one in the next release。为了绝对安全,您可以检查已删除拦截器的类名(由remove() 返回)。如果它不以MessageConverterConfigurer 开头,请将其添加回来(在同一索引处)并尝试remove(1) 等。
    最近更新 更多