[Posted]: 2021-09-02 03:09:49
[Problem description]:
We have a topology that reads from an input topic (binder: x, broker address: x), processes the records with Spring Cloud Stream Kafka Streams, and writes them to an output topic (binder: y, broker address: y). The records are never written to the output topic. However, when I point both binders at the same broker address (either x or y for both), the records are written to topic y. Should the whole topology use a single broker? Do I need separate binders and brokers for the input and output topics, and how can I fix this?
Error: 2021-06-17 12:17:21.483 WARN 20848 --- [read-1-producer] o.a.k.c.NetworkClient : [Producer clientId=inputTopic-32100000000000000000015-f0bd5423-e670-43e8-ab0b-84ec5505c2fd-StreamThread-1-producer] Error while fetching metadata with correlation id 182 : {inputTopic=UNKNOWN_TOPIC_OR_PARTITION}
application.yml:
spring:
  cloud:
    stream:
      kafka:
        streams:
          bindings:
            doob-output-topic-out:
              applicationId: doob-output-topic-out
              producer:
                keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
                valueSerde: tr.com.havelsan.doob.cloud.framework.service.track.base.core.serde.BaseDataSerde
                topic:
                  properties:
                    retention.bytes: 300000000
                    segment.bytes: 300000000
            doob-input-topic-in:
              consumer:
                applicationId: doob-input-topic-in
                keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
                valueSerde: tr.com.havelsan.doob.cloud.framework.service.track.base.core.serde.BaseDataSerde
                topic:
                  properties:
                    retention.bytes: 300000000
                    segment.bytes: 300000000
      binders:
        outputKafka:
          type: kstream
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    streams:
                      binder:
                        brokers: ${kafka.brokers1}
                        autoCreateTopics: true
                        autoAddPartitions: true
                        minPartitionCount: 8
                        configuration:
                          commit.interval.ms: 1000
        inputKafka:
          type: kstream
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    streams:
                      binder:
                        brokers: ${kafka.brokers2}
                        autoCreateTopics: true
                        autoAddPartitions: true
                        minPartitionCount: 8
                        configuration:
                          commit.interval.ms: 1000
                          max:
                            request:
                              size: 20000000
      bindings:
        doob-output-topic-out:
          destination: outputTopic
          binder: outputKafka
          producer:
            partition-count: 8
        doob-input-topic-in:
          destination: inputTopic
          binder: inputKafka
manage:
  storeName: trackList15
Source code:
@StreamListener(BASE_TOPIC_INPUT)
@SendTo(BASE_TOPIC_OUTPUT)
public KStream<String, BaseData> consumeTrackFromSynchronization(KStream<String, BaseData> baseDataStream) {
    return baseDataStream
            .filter((s, baseData) -> BaseUtil.getTrackType(baseData).equals(BaseTypeEnum.FK))
            .groupByKey()
            .reduce((baseData, s1) -> s1,
                    Materialized.<String, BaseData, KeyValueStore<Bytes, byte[]>>as(storeName)
                            .withKeySerde(Serdes.String())
                            .withValueSerde(baseDataSerde))
            .toStream()
            .peek((s, baseData) -> baseServiceHelper.processBase(baseData, BaseTypeEnum.FK));
}
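Note that the Kafka Streams binder runs an entire topology against a single cluster, so a single KStream cannot consume from broker x and produce to broker y. If the two topics really do live on different clusters, one common workaround (an assumption, not something stated in the original post) is a plain consumer-to-producer bridge outside the Streams topology, or a replication tool such as MirrorMaker 2. The sketch below uses only java.util.Properties to show the two independent client configurations such a bridge would need; all broker addresses, the group id, and the class name are hypothetical placeholders:

```java
import java.util.Properties;

public class ClusterBridgeConfig {

    // Consumer configuration pointing at cluster X (hypothetical addresses).
    static Properties consumerProps(String clusterXBrokers) {
        Properties p = new Properties();
        p.put("bootstrap.servers", clusterXBrokers); // comma-separated list of cluster-X brokers
        p.put("group.id", "doob-bridge");            // hypothetical consumer group
        p.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        p.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return p;
    }

    // Producer configuration pointing at cluster Y (hypothetical addresses).
    static Properties producerProps(String clusterYBrokers) {
        Properties p = new Properties();
        p.put("bootstrap.servers", clusterYBrokers); // comma-separated list of cluster-Y brokers
        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        return p;
    }

    public static void main(String[] args) {
        Properties consumer = consumerProps("x-broker1:9092,x-broker2:9092");
        Properties producer = producerProps("y-broker1:9092");
        System.out.println(consumer.getProperty("bootstrap.servers"));
        System.out.println(producer.getProperty("bootstrap.servers"));
        // A real bridge would build a KafkaConsumer from `consumer` and a KafkaProducer
        // from `producer`, then loop: poll() from inputTopic on cluster X and send()
        // each record to outputTopic on cluster Y.
    }
}
```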
[Comments]:
- 1) It sounds like your brokers are not part of the same cluster. 2) You should use a comma-separated list for the brokers property.
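Following that comment, if all topics are actually served by one cluster, the multi-binder setup can be collapsed to a single Kafka Streams binder with a comma-separated broker list. A minimal sketch (broker addresses are placeholders, not from the original post):

```yaml
spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            # one cluster for the whole topology; comma-separated broker list
            brokers: broker1:9092,broker2:9092,broker3:9092
      bindings:
        doob-input-topic-in:
          destination: inputTopic
        doob-output-topic-out:
          destination: outputTopic
```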
Tags: java apache-kafka spring-cloud-stream spring-cloud-stream-binder-kafka