【问题标题】:Deduplicate intermediate results of KStream-KStream joins in Kafka Streams去重 KStream-KStream 的中间结果加入 Kafka Streams
【发布时间】:2019-09-28 01:41:48
【问题描述】:

我有以下场景:

  1. 表 A 和表 B 使用 FK 连接。
  2. 事务性插入/更新到 A 和 B。
  3. Debezium 为表 A 发出一个事件 a,为表 B 发出一个事件 b
  4. Kafka Streams 为表 A 和 B 创建 KStream。
  5. Kafka Streams 应用程序leftJoin KStreams A 和 B。(假设ab 记录具有相同的键并落在连接窗口中)。
  6. 输出记录将为[a, null], [a, b]

你如何丢弃[a, null]

一个选项是执行innerJoin,但在update查询的情况下这仍然是一个问题。

我们尝试使用事件时间戳进行过滤(即使用最新的时间戳保留事件),但不能保证时间戳的唯一性。

即。最终目标是能够识别最新的聚合,以便我们可以在查询时过滤掉中间结果(在 Athena/Presto 或某些 RDBMS 中)。

【问题讨论】:

  • 加入后可以filter()吗?另请注意,支持外键连接的是 WIP atm:cwiki.apache.org/confluence/display/KAFKA/…
  • 我可以使用filter,但不想依赖插入始终是事务性的实现细节。考虑两个顺序插入,这将是两个事件,对于 KStream-KStream 连接,它们将输出 2 条记录,而不仅仅是我想要的 1 条。
  • @MatthiasJ.Sax 特别是我想弄清楚在使用 S3 接收器之类的东西时如何识别下游每个键的最新消息。我目前为eventCreatedAt 添加了一个字段,但对于同一事务中的事件,这显然是相同的(并且不保证会增加)。

标签: apache-kafka apache-kafka-streams debezium


【解决方案1】:

目前,我发现的最佳工作方法是利用输出记录中的 Kafka 偏移量。

方法可以概括为:

  1. 执行您想要执行的所有逻辑,不必担心同一个键有多个记录。
  2. 将结果写入中间主题,保留时间非常短(1 小时等)
  3. 使用处理器读取中间主题,并在处理器内使用context.offset() 使用 Kafka 偏移量丰富消息。
  4. 将消息写入输出主题。

现在,您的输出主题包含相同键的多条消息,但每条消息的偏移量不同。

现在在查询期间,您可以使用子查询为每个键选择最大偏移量。

TransformerSupplier 示例如下所示

/**
 * @param <K> key type
 * @param <V> value type
 */
public class OutputTransformSupplier<K, V> implements TransformerSupplier<K, V, KeyValue<String, String>> {
  @Override
  public Transformer<K, V, KeyValue<String, String>> get() {
    return new OutputTransformer<>();
  }

  private class OutputTransformer<K, V> implements Transformer<K, V, KeyValue<String, String>> {
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
      this.context = context;
    }

    /**
     * @param key   the key for the record
     * @param value the value for the record
     */
    @Override
    public KeyValue<String, String> transform(K key, V value) {
      if (value != null) {
        value.setKafkaOffset(context.offset());
      }
      return new KeyValue<>(key, value);
    }

    @Override
    public KeyValue<String, String> punctuate(long timestamp) {
      return null;
    }

    @Override
    public void close() {
      // nothing to close
    }
  }
}

【讨论】:

    猜你喜欢
    • 2020-04-21
    • 2018-09-18
    • 2020-05-02
    • 1970-01-01
    • 2022-10-24
    • 2018-08-30
    • 2019-09-26
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多