【问题标题】:Issue with Spring Cloud Stream using the Default Schema Registry client instead of the Avro Schema Registry clientSpring Cloud Stream 使用 Default Schema Registry 客户端而不是 Avro Schema Registry 客户端的问题
【发布时间】:2019-04-09 04:07:41
【问题描述】:

我们正在使用带有 Spring Cloud Stream 的 Kafka,我们需要在 Spring Boot 组件中连接到 Confluent Schema Registry,请参阅https://github.com/donalthurley/KafkaConsumeScsAndConfluent

我们添加了以下配置来创建所需的 ConfluentSchemaRegistryClient bean,参见 https://github.com/donalthurley/KafkaConsumeScsAndConfluent/blob/master/src/main/java/com/example/kafka/KafkaConfig.java,它应该覆盖 Spring Cloud Stream 中的默认模式注册表。

但是,我们在一些部署后间歇性地看到以下故障。

org.springframework.messaging.MessageDeliveryException: failed to send Message to channel

根本原因显示此堆栈跟踪

Caused by: java.lang.NullPointerException
    at     org.springframework.cloud.stream.schema.client.DefaultSchemaRegistryClient.register(DefaultSchemaRegistryClient.java:71)
    at org.springframework.cloud.stream.schema.avro.AvroSchemaRegistryClientMessageConverter.resolveSchemaForWriting(AvroSchemaRegistryClientMessageConverter.java:238)
    at org.springframework.cloud.stream.schema.avro.AbstractAvroMessageConverter.convertToInternal(AbstractAvroMessageConverter.java:179)
    at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:201)
    at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:191)
    at org.springframework.messaging.converter.CompositeMessageConverter.toMessage(CompositeMessageConverter.java:83)
    at org.springframework.cloud.stream.binding.MessageConverterConfigurer$OutboundContentTypeConvertingInterceptor.doPreSend(MessageConverterConfigurer.java:322)
    at org.springframework.cloud.stream.binding.MessageConverterConfigurer$AbstractContentTypeInterceptor.preSend(MessageConverterConfigurer.java:351)
    at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:611)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)

AvroSchemaRegistryClientMessageConverter 正在调用 DefaultSchemaRegistryClient 这一事实向我们表明,我们的 ConfluentSchemaRegistryClient bean 的接线存在问题。

我们的配置中是否还需要其他东西来确保正确连接 ConfluentSchemaRegistryClient bean?

【问题讨论】:

  • 如果您发布一个项目(在 Github 上或其他地方),并且只需最少的配置即可重现该问题,这将有所帮助。很难推测可能出了什么问题,尽管 NPE 在我们这边肯定闻起来像一个错误。
  • 这是一个工作示例,展示了 Spring Cloud Stream 应用程序如何连接到 Confluent 模式注册表。您可以比较笔记并查看配置中缺少的内容吗? github.com/spring-cloud/spring-cloud-stream-samples/tree/master/…

标签: java spring apache-kafka spring-cloud-stream confluent-schema-registry


【解决方案1】:

我现在已将 @EnableSchemaRegistryClient 注释移动到项目应用程序类(参见https://github.com/donalthurley/KafkaConsumeScsAndConfluent/commit/b4cf5427d7ab0a4fed619fe54b042890f5ccb594)并重新部署,这解决了我在我们的环境中部署它时遇到的问题。

我一直在使用 @EnableSchemaRegistryClient 注解来注解我的生产者和消费者类。

在我所有的本地测试中,这一直针对我的本地 docker 融合模式注册表。 然而,在部署到我们的环境时,它大部分时间都在工作,但在一些部署后偶尔会失败。

我没有成功在本地复制这个。

我还在本地测试中注意到,如果我删除 Confluent Schema Registry 的配置,我会得到相同的空指针异常堆栈跟踪。

所以我想我看到的问题是,当项目应用程序类中不存在 @EnableSchemaRegistryClient 注释时,AvroSchemaRegistryClientMessageConverter bean 没有与融合模式注册表 bean 连接。

我不明白为什么需要这样做,但我认为它可能已经解决了问题。

【讨论】:

    【解决方案2】:

    它对我有用。这就是我所做的:

    • 完全像你一样使用配置类
    • 在项目中使用了@EnableSchemaRegistryClient注解
    • 将 avro 序列化程序添加到类路径:io.confluent:kafka-avro-serializer
    • 如下设置属性:

      spring.cloud.stream.kafka.bindings.channel.consumer.configuration.schema.registry.url=你的注册地址 spring.cloud.stream.kafka.bindings.channel.consumer.configuration.specific.avro.reader=true

    其中频道对应于您应用中的频道名称。

    我认为最后一个属性对于告诉 Spring 使用 Avro 序列化程序而不是默认序列化程序非常重要。

    我使用的是 Spring Cloud Stream Elmhurst.RELEASE,因此如果您使用其他版本,属性名称可能会略有不同。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2021-06-13
      • 2018-09-27
      • 2018-01-31
      • 2018-05-17
      • 2017-10-06
      • 2021-03-08
      • 2019-12-09
      相关资源
      最近更新 更多