【问题标题】:Kafka Streams join unrelated streamsKafka Streams 加入不相关的流
【发布时间】:2020-02-04 07:55:44
【问题描述】:

我有一个事件流,我需要与 ktable / changelog 主题进行匹配,但匹配是通过对 ktable 条目的属性进行模式匹配来完成的。所以我不能基于一个键加入流,因为我还不知道哪个匹配。

示例:

ktable X:

{
  [abc]: {id: 'abc', prop: 'some pattern'},
  [efg]: {id: 'efg', prop: 'another pattern'}
}

流 A:

{ id: 'xyz', match: 'some pattern'}

所以stream A应该转发{match: 'abc'}

所以我基本上需要遍历 ktable 条目并通过对该属性的模式匹配找到匹配的条目。

基于 ktable 创建全局状态存储,然后从处理器 API 访问它并遍历条目是否可行?

我还可以将 ktable 的所有条目聚合到 1 个集合中,然后加入“假”键?但这似乎也相当 hacky。

或者我只是强迫一些不是真正流的东西,而只是使用普通的消费者 API 将其放入 redis 缓存中,这也有点尴尬,因为我宁愿让它得到 RocksDB 的支持。

编辑:我想这有点与this question有关

【问题讨论】:

    标签: apache-kafka apache-kafka-streams


    【解决方案1】:

    GlobalKTable 不起作用,因为 stream-globalTable 联接允许您从流中提取非键联接属性,但对表的查找仍然基于表键。

    但是,您可以将表输入主题读取为KStream,提取连接属性,将其设置为键,然后进行返回集合(即列表、集合等)的聚合。这样,您可以对键进行流表连接,然后是 flatMapValues()(或 flatMap())将连接结果拆分为多个记录(取决于表集合中有多少记录) .

    只要您的连接属性没有太多重复项(对于表输入主题),因此表中的值侧集合不会增长太大,这应该可以正常工作。您将需要提供一个自定义值-Serde 来(反)序列化收集数据。

    【讨论】:

    • 这里的问题是,我没有一个属性,这正是我可以加入的关键。属性匹配由属性的某些处理(模式匹配)确定。所以基本上,对于我的 Stream A 的每个事件,我需要一个集合(聚合)中 Ktable X 的所有可用数据,以便我可以找到匹配项。换句话说,我需要一个 ktable 的所有条目作为另一个流中的一个集合
    • 对于这种情况,您需要使用处理器 API。使用全局状态存储,您可以为每个输入记录扫描整个存储以查找具有相应模式的记录。DSL 仅支持== 作为连接条件。
    • 您能详细说明一下全局状态存储吗?听起来不太灵活....有没有办法简单地自定义当前外键连接的连接条件?我希望具有与外键连接相同的功能,但是在我的情况下,外键提取器可以提取多条记录,并且如果在另一侧,其中一个键匹配,则需要进行匹配。如果数据表示图表,或者您想要多值属性,情况就是这样
    • 您始终可以通过使用处理器 API 实现自己的运算符来自定义所有内容。 -- 但是,DSL 有一组固定的预定义运算符,并且只有 1:1 PK-equi-joins 和 1:n FK-equi-joins 可用。
    【解决方案2】:

    通常我会映射表数据,以便获得所需的连接键。我们最近有一个类似的案例,我们必须在 KTable 中加入带有相应数据的流。在我们的例子中,流键是表键的第一部分,所以我们可以按第一个键部分分组并将结果聚合到一个列表中。最后它看起来像这样。

    final KTable<String, ArrayList<String>> theTable = builder
            .table(TABLE_TOPIC, Consumed.with(keySerde, Serdes.String()))
            .groupBy((k, v) -> new KeyValue<>(k.getFirstKeyPart(), v))
            .aggregate(
                    ArrayList::new,
                    (key, value, list) -> {
                        list.add(value);
                        return list;
                    },
                    (key, value, list) -> {
                        list.remove(value);
                        return list;
                    },
                    Materialized.with(Serdes.String(), stringListSerde));
    
    final KStream<String, String> theStream = builder.stream(STREAM_TOPIC);
    
    theStream
            .join(theTable, (streamEvent, tableEventList) -> tableEventList)
            .flatMapValues(value -> value)
            .map(this::doStuff)
            .to(TARGET_TOPIC);
    

    我不确定,如果您也可以这样做,意思是,也许您可​​以以某种方式将表数据映射到联接。

    我知道这并不完全属于你的情况,但我希望无论如何它可能会有所帮助。也许您可以澄清一下,您的案例的匹配情况如何。

    【讨论】:

    • 匹配实际上是基于地理数据匹配完成的,所以我猜确实没有部分或任何可以使用的键,但感谢您的输入!
    • 我只是又在想你的问题,以及卡夫卡地理匹配的问题。我想,也许有一种方法可以简单地对数据进行聚类,这样你至少可以减少结果的数量,然后尝试在减少的结果中找到完全匹配的结果。我找到了这篇文章,描述了如何对地理坐标进行聚类。也许对你有帮助。
    猜你喜欢
    • 2019-12-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-05-09
    相关资源
    最近更新 更多