【发布时间】:2020-05-08 08:43:35
【问题描述】:
我有一个使用 spring-cloud-stream 库的 kafka 流处理应用程序。此应用程序利用 3 个 application.id 值来收听 3 个主题。对于其中 2 个输入主题,在处理完数据后,我将消息推送到各自的输出主题上,然后我使用这些主题创建 GlobalKTables,如下所示:
streamsBuilder.globalTable(firstSSTopic, Consumed.with(Serdes.String(), Serdes.String()),
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>
as("ss-1")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.String()));
streamsBuilder.globalTable(secondSSTopic, Consumed.with(Serdes.String(), Serdes.String()),
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>
as("ss-2")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.String()));
那么,问题是,它使用什么 application.id 来使用来自“firstSSTopic”和“secondSSTopic”的数据?或者它只是作为没有任何组的独立消费者的 GlobalStreamThread?当我检查默认状态目录 (tmp/kafka-streams) 时,我可以在所有 3 个 application.id 目录下看到两个全局状态存储的 sst 和日志文件。我怎样才能避免这种情况?因为这将占用 3 倍的磁盘空间,并可能导致存储空间被快速填满。
【问题讨论】:
-
你为什么使用
GlobalKTable?您无法避免它(在所有 3 个 application.id 目录下)。这就是GlobalKTable的工作方式(这是正常行为,取决于 GlobalKTable 的本质) -
我使用 GlobalKTable 的原因是我希望将所有分区中的数据存储在所有应用程序实例中。我想,它会在应用程序的多个实例中复制,但不明白为什么它会在多个位置(即每个 application.id)在同一个应用程序实例中保留相同的数据。
标签: apache-kafka-streams spring-cloud-stream