【发布时间】:2020-07-29 20:14:32
【问题描述】:
我正在开发 Spring Cloud Stream Kafka 应用程序。我只添加了消费者来使用来自主题的消息并使用 FIX 协议将它们传递给第三方。
到目前为止它工作正常,但现在第三方发回响应,我想将它们生成一个新主题。当我在现有代码中添加供应商时,它开始表现得很奇怪。 bootstrap.servers 配置从 remoteHost 代理更改为 localhost 并开始给出以下错误:
[AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established> Broker may not be available.
如果尝试连接本地主机会出现错误,因为没有任何 Kafka 设置。
下面是我的 application.yml 文件:
spring.cloud.stream.function.definition: amerData;emeaData;ackResponse #added new ackResponse here
spring.cloud.stream.kafka.streams:
binder:
brokers: remoteHost:9092
configuration:
schema.registry.url: remoteHost:8081
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
bindings:
ackResponse-out-0: #new addition
producer.configuration:
key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
spring.cloud.stream.bindings:
amerData-in-0:
destination: topic1
emeaData-in-0:
destination: topic2
ackResponse-out-0: #new addition
destination: topic3
并尝试了供应商的可能选项 -> Supplier<String> ackResponse() 或 Supplier<Message<String>> ackResponse()
它只是在我做Supplier<KStream<String,String>> ackResponse() 时不会将remoteHost 更改为localhost,然后bootstrap.servers 会显示配置的远程主机,但这不正确,我无法编写收到的响应(主要是字符串或json)这是一个 Kafka 主题。
我确实根据需要将我的消费者配置为 Consumer<KStream<String, AVROPOJO1>> amerData() 和 Consumer<KStream<String, AVROPOJO2>> emeaData(),它们工作正常。
我是否遗漏或搞砸了什么?我们不能在同一个 Spring Cloud Stream 应用程序中同时拥有生产者/消费者吗?使用Streambridge 也无法解决这个问题。有人可以帮忙吗?
【问题讨论】:
标签: avro apache-kafka-streams spring-cloud-stream kafka-producer-api confluent-schema-registry