【问题标题】:Kafka Streams - Aggregation with old stateKafka Streams - 与旧状态的聚合
【发布时间】:2018-10-05 21:30:12
【问题描述】:

我有一个 KStream,其中包含来自主题 to1 的数据,如下所示:

T1-KEY -> {T1}
T2-KEY -> {T2}

还有一个KTable,构造如下:

我正在使用 org.apache.kafka.streams.StreamsBuilder 从某个主题 to2 创建 KTable,如下所示:

A1-KEY -> { "A1", "Set": [
                          {"B1", "Rel": "T1"},
                          {"B2", "Rel": "T1"}
                         ]
          } 

..

该流随后被平面映射并按 Key s.t. 分组。生成的 KTable 如下所示:

T1 -> { ["B1", "B2"] }

稍后,现在主题 to2 中出现以下消息:

A1-KEY -> { "A1", "Set": [
                          {"B2", "Rel": "T1"}
                         ]
          } 

现在我希望我的 KTable 能够反映这些变化并如下所示:

T1 -> { ["B2"] }

但它看起来像这样:

T1 -> { ["B1", "B2"] }

我注意到,在我的Aggregator<Tx-KEY, Bx, Set<Bx>> 中给出的最后一个参数是集合["B1", "B2"],即使当我在聚合之前偷看时我只得到一个匹配"B2"

我是否理解聚合错误或这里发生了什么?

编辑

我想我缩小了范围:显然聚合的 Initializer 只在 very 第一次调用 - 之后聚合总是接收 last aggregate 作为最后一个参数,例如

@Override
public Set<Bx> apply(Tx-KEY, Bx value, Set<Bx> aggregate) {

}

Set&lt;Bx&gt; aggregate 在第一次调用时是 [](通过初始化程序创建),而在第二次调用时是 ["B1", "B2"]

有什么想法吗?

编辑 2

public class MyAggregator implements Aggregator<Tx-KEY, Bx, Set<Bx>> {

    @Override
    public Set<Bx> apply(Tx-KEY key, Bx value, Set<Bx> aggregate) {
        aggregate.add(value);
        return aggregate;
    }
}

编辑 3

我不能只使用平面地图,因为我必须组合多个 Ax 元素,例如

A1-KEY -> { "A1", "Set": [
                      {"B1", "Rel": "T1"}
                     ]
          },
A2-KEY -> { "A2", "Set": [
                      {"B2", "Rel": "T1"}
                     ]
          },
...

然后我期待一些类似的分组

T1 -> { ["B1", "B2"] }

并且在下一次迭代中,当消息

A1-KEY -> { "A1", "Set": [
                      {"B1", "Rel": "T1"}
                     ]
          }

如期而至

T1 -> { ["B1"] }

..

【问题讨论】:

  • 请提供您的代码,尤其是您的聚合器的 apply 方法。您在编辑中写的内容是正确的。 Initializer,顾名思义,仅用于初始化,稍后将结果传递给第二个参数。
  • 感谢您的回复,我在上面添加了申请功能。不过,我确实想在那里有一套新的。

标签: apache-kafka apache-kafka-streams


【解决方案1】:

请注意,在您的聚合器中,您只会将元素添加到聚合集。使用这种逻辑,您的集合(对于给定的键)永远不会缩小。在这种情况下,我认为您将流压扁了太多。我建议您不要将其展平到您的消息采用(Tx-KEY key, Bx value) 的形式,而是让它们始终保留其设置的形式:(Tx-KEY key, Set&lt;Bx&gt; value)。你根本不需要聚合。 为此,我建议您转换输入集

"Set": [
     {"B1", "Rel": "T1"},
     {"B2", "Rel": "T1"}
]

进入

T1 -> { ["B1", "B2"] }

通过在 KStream flatmap 方法调用中使用标准 java 代码(Collections 或 Streams api)按“Rel”字段分组,这样您就只会在 KStream 上发出带有Set&lt;Bx&gt; 类型值的消息,而不是Bx - 单独键入值。

如果您提供当前平面地图实现的代码,我们很乐意详细说明。

【讨论】:

  • Notice how in your aggregator you are only ever adding elements to the aggregate set. With this logic, your set (for a given key) can never shrink. 是的,那是由于我对初始化程序的误解。我在上面添加了第三个编辑,因为您提出的解决方案尚不符合我的要求。
  • 我不知道您所说的“在下一次迭代中”是什么意思。你指的是什么迭代?有没有办法从消息中确定迭代次数?我怀疑您可能需要使用比 Set 更复杂的结构,以便也能够捕获该信息,但需要有关您的用例的更多高级信息才能完全理解,因为目前您的要求对于不熟悉您的人来说是矛盾的用例。
  • 无论什么标志着新迭代的开始,如果您可以在聚合器函数中检测到它,那么您可以在将新元素放入其中之前清空集合。
  • 是的,这就是必需的。我必须能够注意到我必须从哪里读取另一个流(偏移开始和偏移结束)。感谢您的帮助!
猜你喜欢
  • 2019-07-03
  • 2019-10-15
  • 2022-10-06
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2023-03-23
  • 2019-09-05
相关资源
最近更新 更多