【问题标题】:Emitting values from ktable and modifying it从 ktable 发出值并修改它
【发布时间】:2017-03-13 23:40:21
【问题描述】:

我正在尝试使用 kafka 解决以下问题。 有一个话题。我们称之为 src-topic。我不时收到来自该主题的记录。我想将这些值存储在 ktable 中,并每 10 秒将存储在 ktable 中的值发送到 dst-topic。当我第一次从这个 ktable 发出一个值时,我想将 1 附加到我发出的记录中。以后每次我都想将 0 附加到发出的记录中。

我正在寻找一个正确且最好是惯用的解决方案来解决这个问题。 我看到的解决方案之一是在我从 src-topic 摄取时发出一条附加了 1 的记录,然后将附加了 0 的记录存储在 ktable 中。另一个线程将从这个 ktable 中读取并定期发出记录。这种方法的问题是它有一个竞争条件。

我们将不胜感激。

【问题讨论】:

  • "每 10 秒将 ktable 中存储的值发送到 dst-topic" 你能澄清一下吗?是否要发出表中当前包含的所有值,每 10 秒发出一次?
  • 是的。 ktable 中的所有值。
  • 我想知道你试图从更高层次的角度完成什么......

标签: java apache-kafka apache-kafka-streams


【解决方案1】:

没有直接的方法可以做到这一点。请注意,KTable 是一个变更日志(它可能在内部有一个表状态——并非所有 KTable 都有一个状态——但这是一个实现细节)。

因此,KTable 是一个流,您无法刷新流...而且由于状态(如果有)是内部的,因此您也无法刷新状态。

您只能通过还允许进行范围扫描的交互式查询访问状态。但是,这不会向下游发出任何内容,而是将数据提供给应用程序的“非 Streams 部分”。

我认为,你需要使用低级处理器 API 来获得你想要的结果。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2023-02-23
    • 2020-04-25
    • 1970-01-01
    • 2017-09-13
    • 2012-05-28
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多