【问题标题】:merging post request in apache kafka在apache kafka中合并发布请求
【发布时间】:2019-09-24 05:54:08
【问题描述】:

我有 3 个提供 api 来访问它们的数据源,每个都提供一些信息,我想处理所有三个请求并将它们全部保存在数据库中的单个请求中。所以我的问题是,如果所有这些来源都有一个共同的 ID 将其保存在数据库中并向下游系统发送单个请求,Kafka 是否可以合并这些请求..?

【问题讨论】:

    标签: apache-kafka apache-kafka-streams kafka-producer-api


    【解决方案1】:

    我们可以将不同数据源的数据存储在不同的topic中,然后使用Kafka Streams将多个topic的消息合并为一个topic。

    Scala 示例合并数据

    val mergedStream = streamBuilder.merge(sensor1Stream, sensor2Stream, sensor3Stream, sensor4Stream)
    
    mergedStream.to(Serdes.String(), heartbeatSerde, "Merged-SensorsHeartbeat")
    

    我建议您点击以下链接了解更多信息。

    http://www.alternatestack.com/development/app-development/kafka-streams-merging-multiple-kstream-s/

    有两种类型的连接器可用Sink and Source Connectors。 Source 连接器用于从外部系统向 kafka 主题写入数据,Sink 连接器用于从 kafka 主题中读取数据并写入外部系统。

    现在您可以使用kafka connectorsdebezium 从合并分区中读取数据并写入您的数据库。

    【讨论】:

    • 请注意,KStreamBuilder 自 1.0.0 版本以来已弃用。您可能想改用StreamsBuilder。参照。 docs.confluent.io/4.0.0/streams/…
    • 另外,merge() 合并流但不合并事件。即,如果您有 3 个输入主题和每个 2 条消息,则输出将是 6 条消息。如果你想合并记录,使用aggregate() 将是要走的路。即使我认为,对于这个用例来说,使用“处理器 API”可能比使用 DSL 更好。
    猜你喜欢
    • 2011-12-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-08-28
    • 2020-12-19
    • 2016-02-16
    • 2023-03-04
    相关资源
    最近更新 更多