【发布时间】:2019-08-09 15:31:31
【问题描述】:
我想创建一个 KTable(未来将是一个 GlobalKTable)来跟踪使用 Spring Cloud Streams 通过我的 Kafka 代理的条目。
我现在拥有的代码在侦听器的角度上可以正常工作,因为它接收通过 Kafka 传递的数据并提取我需要创建注册表的两个 Id。
最终的目标(简而言之)是在系统中创建一个位置,以便我们可以检查是否正在针对该组合(userId 和 appId)处理数据
// Peek on myBindings interface
public interface myBindings {
String INPUT = "input";
String OUTPUT = "output";
@Input(INPUT)
MessageChannel input();
@Output(OUTPUT)
MessageChannel output();
}
//Peek on the service
@StreamListener(target = myBindings.INPUT)
public void listenforMessage(@Payload String jsonAsString) throws IOException {
ObjectNode node = new ObjectMapper().readValue(jsonAsString, ObjectNode.class);
KeyValue kvPair = new KeyValue<>(node.get(Info.USER_ID_KEY), node.get(Info.APP_ID_KEY));
// Here comes the question...
}
在检查(或打印)这对之后,我们看到数据是正确的。那么问题是:
如何用它构建一个 KTable,因为我没有使用 KStreams 并且我的输入对象不同?
我在网上找到的所有示例都使用 KStream/KTables 作为方法的入口点/参数,但是没有使用其他类型的入口的示例。
Kafka/Spring Cloud Streams 的新手。感谢您的帮助!
【问题讨论】:
-
您确实需要
KStream。实际上,您可以声明一个单独的kstream类型的绑定,直接从主题创建 ktable。您甚至可以将其具体化为键值存储,以便通过配置进行查找。
标签: java apache-kafka-streams spring-cloud-stream