【问题标题】:Delete Downstream ChangeLog Objects KafkaStreams删除下游 ChangeLog 对象 KafkaStreams
【发布时间】:2020-03-18 16:56:42
【问题描述】:

我正在尝试删除值为 null 的记录,在下游变更日志中,我知道在 statestore 中它们只是通过为 null(墓碑)而被删除,但是当您对 KTable 或 Stream 进行聚合时,它们会跳过null 并且不要删除它。我需要想办法在聚合中设置删除标志,让 Kafka 知道可以删除记录。这是我的代码:

   public void deleteByEntity(String inputTopic, String target, String stateStoreName) {

      // Need to set property to true in application.properties
//      if ("true".equals(utils.getProperty(ApplicationConfigs.KAFKA_DELETE_BY_ENTITY))) {
      Materialized<String, String, KeyValueStore<Bytes, byte[]>> storeName =
              Materialized.as(stateStoreName);

    StreamsBuilder streamsBuilder = new StreamsBuilder();
    KStream<String, String> docStream = streamsBuilder.stream(inputTopic);

    KTable<?, ?> dataInTable =
        docStream
            .groupByKey()
            .reduce(
                (value1, value2) -> {
                  //                  System.out.println("aa");
                  if (value1.equals(target)) {
                    // If key equals target topic return null, creates tombstone deletes from
                    // statestore, sends null record downstream
                    return null;
                  }
                  return value2;
                },
                storeName);
    //    System.out.println(dataInTable);
  }

谢谢

【问题讨论】:

    标签: java apache-kafka apache-kafka-streams


    【解决方案1】:

    如果您从Reducer 中删除return null,它将从存储中删除数据并且它将发送相应的输出记录&lt;key,null&gt;。因此,不需要进行下游处理。

    请注意,null 键和 null 值仅在 input 记录到reduce() 时被忽略。

    【讨论】:

    • 以下代码是否适用于查找键然后删除对象?if (value1.equals(target)) { // 如果键等于目标主题 return null,则从 statestore 中删除主题 return null; }
    • Reducer#apply(...) 被调用时,您并不真正知道密钥——因此,您需要在调用groupBy() 之前将密钥复制到值中。
    • 好的,所以实际上在我调用 groupByKey 之前,我需要以某种方式获取要从中删除值的键,所以如果键是“a”并且我想将“a”的值设为空" 我必须以某种方式获得那个关键值?
    • 是的。并且需要将要删除的键放入消息的值中,以便apply() 可以读取并返回null
    • 好的,我编辑了我的初始代码,在过去几天我也对其进行了更改。我想我明白了,谢谢!
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-11-22
    • 1970-01-01
    • 2016-02-25
    • 1970-01-01
    • 2012-01-31
    相关资源
    最近更新 更多