【问题标题】:Spring Cloud Stream Kafka Consumer application doesn't allow adding SupplierSpring Cloud Stream Kafka Consumer 应用程序不允许添加供应商
【发布时间】:2020-07-29 20:14:32
【问题描述】:

我正在开发 Spring Cloud Stream Kafka 应用程序。我只添加了消费者来使用来自主题的消息并使用 FIX 协议将它们传递给第三方。

到目前为止它工作正常,但现在第三方发回响应,我想将它们生成一个新主题。当我在现有代码中添加供应商时,它开始表现得很奇怪。 bootstrap.servers 配置从 remoteHost 代理更改为 localhost 并开始给出以下错误:

[AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established> Broker may not be available.

如果尝试连接本地主机会出现错误,因为没有任何 Kafka 设置。

下面是我的 application.yml 文件:

spring.cloud.stream.function.definition: amerData;emeaData;ackResponse  #added new ackResponse here
spring.cloud.stream.kafka.streams:
  binder:
    brokers: remoteHost:9092
    configuration:
      schema.registry.url: remoteHost:8081
      default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
      default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
  bindings:
    ackResponse-out-0:           #new addition
      producer.configuration:
        key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
        value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer

spring.cloud.stream.bindings:
  amerData-in-0:
    destination: topic1
  emeaData-in-0:
    destination: topic2
  ackResponse-out-0:       #new addition
    destination: topic3

并尝试了供应商的可能选项 -> Supplier<String> ackResponse()Supplier<Message<String>> ackResponse() 它只是在我做Supplier<KStream<String,String>> ackResponse() 时不会将remoteHost 更改为localhost,然后bootstrap.servers 会显示配置的远程主机,但这不正确,我无法编写收到的响应(主要是字符串或json)这是一个 Kafka 主题。

我确实根据需要将我的消费者配置为 Consumer<KStream<String, AVROPOJO1>> amerData()Consumer<KStream<String, AVROPOJO2>> emeaData(),它们工作正常。

我是否遗漏或搞砸了什么?我们不能在同一个 Spring Cloud Stream 应用程序中同时拥有生产者/消费者吗?使用Streambridge 也无法解决这个问题。有人可以帮忙吗?

【问题讨论】:

    标签: avro apache-kafka-streams spring-cloud-stream kafka-producer-api confluent-schema-registry


    【解决方案1】:

    如果您像之前那样添加Supplier bean,它将成为使用基于MessageChannel 的Kafka binder 的常规生产者。您需要在项目中添加常规的 Kafka 活页夹 (spring-cloud-stream-binder-kafka)。它的绑定应该在spring.cloud.stream.kafka.bindings 下。我看到您在上面的spring.cloud.stream.kafka.streams.bindings 下定义了它。不知道是不是这个问题?

    【讨论】:

    • 谢谢@sobychacko!在 spring.cloud.stream.kafka.binder 下配置 broker,然后在 spring.cloud.stream.kafka.bindings 下输出绑定解决了这个问题。
    猜你喜欢
    • 2020-10-21
    • 1970-01-01
    • 2019-10-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-01-06
    相关资源
    最近更新 更多