【发布时间】:2018-05-23 14:19:19
【问题描述】:
我正在使用 Apache Kafka 流对从 Kafka 主题消耗的数据进行聚合。然后聚合被序列化到另一个主题,它本身被消耗并将结果存储在数据库中。我想这是非常经典的用例。
聚合调用的结果是创建一个由 Kafka 更改日志“主题”备份的 KTable。
这比实际情况更复杂,但假设它存储给定键的事件计数和总和(以计算平均值):
KTable<String, Record> countAndSum = groupedByKeyStream.aggregate(...)
该更改日志“主题”似乎没有设置保留期(根据我的全局保留设置,与其他主题相反,我没有看到它“过期”)。
这实际上是好的/必要的,因为这样可以避免在将来的事件带有相同的键时丢失我的聚合状态。
但是从长远来看,这意味着这个变更日志将永远增长(随着更多的密钥进入)?而且我确实可能有很多键(而且我的聚合不像计数/总和那么小)。
由于我有办法知道我不会再收到特定键的事件(某些事件被标记为“最终”),有没有办法让我剥离这些特定键的聚合状态更改日志以避免它永远增长,因为我不再需要它们,可能会稍微延迟“只是”以防万一?
或者也许有一种方法可以完全不同地使用 Kafka 流来避免这个“问题”?
【问题讨论】:
-
我刚刚阅读了有关墓碑消息的内容,密钥将是一个可能允许我删除这些消息的空消息。还需要测试。并且仍然对什么是正确的模式感兴趣。
-
是:更改日志主题配置了日志压缩而不是保留时间。如果您收到“最终”记录,您的聚合可以返回
null作为聚合结果。这将从本地 RocksDB 存储以及底层更改日志主题中删除它。 -
感谢 Matthias,我已经测试并确认一切正常,在达到“最终”记录时返回 null。
-
发表了我的评论作为答案。
标签: apache-kafka apache-kafka-streams