【发布时间】:2019-09-24 05:54:08
【问题描述】:
我有 3 个提供 api 来访问它们的数据源,每个都提供一些信息,我想处理所有三个请求并将它们全部保存在数据库中的单个请求中。所以我的问题是,如果所有这些来源都有一个共同的 ID 将其保存在数据库中并向下游系统发送单个请求,Kafka 是否可以合并这些请求..?
【问题讨论】:
标签: apache-kafka apache-kafka-streams kafka-producer-api
我有 3 个提供 api 来访问它们的数据源,每个都提供一些信息,我想处理所有三个请求并将它们全部保存在数据库中的单个请求中。所以我的问题是,如果所有这些来源都有一个共同的 ID 将其保存在数据库中并向下游系统发送单个请求,Kafka 是否可以合并这些请求..?
【问题讨论】:
标签: apache-kafka apache-kafka-streams kafka-producer-api
我们可以将不同数据源的数据存储在不同的topic中,然后使用Kafka Streams将多个topic的消息合并为一个topic。
Scala 示例合并数据
val mergedStream = streamBuilder.merge(sensor1Stream, sensor2Stream, sensor3Stream, sensor4Stream)
mergedStream.to(Serdes.String(), heartbeatSerde, "Merged-SensorsHeartbeat")
我建议您点击以下链接了解更多信息。
http://www.alternatestack.com/development/app-development/kafka-streams-merging-multiple-kstream-s/
有两种类型的连接器可用Sink and Source Connectors。 Source 连接器用于从外部系统向 kafka 主题写入数据,Sink 连接器用于从 kafka 主题中读取数据并写入外部系统。
现在您可以使用kafka connectors 或debezium 从合并分区中读取数据并写入您的数据库。
【讨论】:
KStreamBuilder 自 1.0.0 版本以来已弃用。您可能想改用StreamsBuilder。参照。 docs.confluent.io/4.0.0/streams/…
merge() 合并流但不合并事件。即,如果您有 3 个输入主题和每个 2 条消息,则输出将是 6 条消息。如果你想合并记录,使用aggregate() 将是要走的路。即使我认为,对于这个用例来说,使用“处理器 API”可能比使用 DSL 更好。