【发布时间】:2019-04-09 04:07:41
【问题描述】:
我们正在使用带有 Spring Cloud Stream 的 Kafka,我们需要在 Spring Boot 组件中连接到 Confluent Schema Registry,请参阅https://github.com/donalthurley/KafkaConsumeScsAndConfluent。
我们添加了以下配置来创建所需的 ConfluentSchemaRegistryClient bean,参见 https://github.com/donalthurley/KafkaConsumeScsAndConfluent/blob/master/src/main/java/com/example/kafka/KafkaConfig.java,它应该覆盖 Spring Cloud Stream 中的默认模式注册表。
但是,我们在一些部署后间歇性地看到以下故障。
org.springframework.messaging.MessageDeliveryException: failed to send Message to channel
根本原因显示此堆栈跟踪
Caused by: java.lang.NullPointerException
at org.springframework.cloud.stream.schema.client.DefaultSchemaRegistryClient.register(DefaultSchemaRegistryClient.java:71)
at org.springframework.cloud.stream.schema.avro.AvroSchemaRegistryClientMessageConverter.resolveSchemaForWriting(AvroSchemaRegistryClientMessageConverter.java:238)
at org.springframework.cloud.stream.schema.avro.AbstractAvroMessageConverter.convertToInternal(AbstractAvroMessageConverter.java:179)
at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:201)
at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:191)
at org.springframework.messaging.converter.CompositeMessageConverter.toMessage(CompositeMessageConverter.java:83)
at org.springframework.cloud.stream.binding.MessageConverterConfigurer$OutboundContentTypeConvertingInterceptor.doPreSend(MessageConverterConfigurer.java:322)
at org.springframework.cloud.stream.binding.MessageConverterConfigurer$AbstractContentTypeInterceptor.preSend(MessageConverterConfigurer.java:351)
at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:611)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
AvroSchemaRegistryClientMessageConverter 正在调用 DefaultSchemaRegistryClient 这一事实向我们表明,我们的 ConfluentSchemaRegistryClient bean 的接线存在问题。
我们的配置中是否还需要其他东西来确保正确连接 ConfluentSchemaRegistryClient bean?
【问题讨论】:
-
如果您发布一个项目(在 Github 上或其他地方),并且只需最少的配置即可重现该问题,这将有所帮助。很难推测可能出了什么问题,尽管 NPE 在我们这边肯定闻起来像一个错误。
-
这是一个工作示例,展示了 Spring Cloud Stream 应用程序如何连接到 Confluent 模式注册表。您可以比较笔记并查看配置中缺少的内容吗? github.com/spring-cloud/spring-cloud-stream-samples/tree/master/…
标签: java spring apache-kafka spring-cloud-stream confluent-schema-registry