【问题标题】:Kafka Streams (Suppress): Restarting the TopologyKafka Streams(抑制):重新启动拓扑
【发布时间】:2019-10-23 20:38:48
【问题描述】:

我发现了两个问题,问如果没有新记录放入分区,为什么不发出结果记录:
1."Kafka Stream Suppress session-windowed-aggregation"
2."Kafka Streams (Suppress): Closing a TimeWindow by timeout"

在这两个问题的答案中,解释是必须发送一条新记录才能发出一条记录。

我不明白为什么在没有新记录的情况下在超时后发出记录会违反禁止合同并希望得到解释。

目前最好的建议是使用虚拟记录来触发发射。

我认为关闭和重新启动流(拓扑)可能比编写虚拟记录更合适。我认为流的新实例会使记录达到峰值并在超时已经到期时发出结果。

但是,我尝试并发现它不起作用。如果可能的话,我将不胜感激。

@Slf4j
public class KafkaStreamVerticle extends AbstractVerticle {

  private KafkaStreams streams;

  @Override
  public void start(Future<Void> startFuture) throws Exception {

    Single.fromCallable(() -> getStreamConfiguration()).subscribe(config -> {

      final StreamsBuilder builder = new StreamsBuilder();

      builder.<String, String>stream(KafkaProducerVerticle.TOPIC)
          .flatMapValues((k, v) -> List.<JsonObject>of(new JsonObject(v).put("origKey", k)))
          .selectKey((k, v) -> v.getString(KafkaProducerVerticle.CATEGORY))
          .flatMapValues(v -> List.<String>of(v.toString()))
          .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
          .windowedBy(TimeWindows.of(Duration.ofSeconds(4)).grace(Duration.ZERO)).count()
          // .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded())).toStream().foreach((k,
          .suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(4), BufferConfig.unbounded()))
          .toStream().foreach((k, v) -> log.info("********* {}: {} - {}: {}", k.key(),
              k.window().start(), k.window().end(), v));

      streams = buildAndStartsNewStreamsInstance(config, builder);
      Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
      restartStreamsPeriodicaly(config, builder, 30_000L);
      log.info("consumer deployed");
      startFuture.complete();
    });
  }

  private KafkaStreams buildAndStartsNewStreamsInstance(Properties config,
      final StreamsBuilder builder) {
    KafkaStreams streams = new KafkaStreams(builder.build(), config);
    streams.cleanUp();
    streams.start();
    return streams;
  }

  private void restartStreamsPeriodicaly(Properties config, final StreamsBuilder builder,
      @NonNull Long period) {
    vertx.setPeriodic(period, l -> {
      log.info("restarting streams!!");
      streams.close();
      streams = buildAndStartsNewStreamsInstance(config, builder);
    });
  }

  private Properties getStreamConfiguration() {
    final Properties streamsConfiguration = new Properties();
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "suppress-example");
    streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "suppress-client");
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
        Serdes.String().getClass().getName());
    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
        Serdes.String().getClass().getName());
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
    streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
    streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0L);
    return streamsConfiguration;
  }
}

【问题讨论】:

    标签: apache-kafka-streams suppress


    【解决方案1】:

    Kafka Stream 提供事件时间语义,这意味着,它的内部时间仅根据记录的时间戳提前(内部时间永远不会根据挂钟时间提前)。您正在考虑的“超时”,它也基于事件时间(不是挂钟时间)。

    假设您有一个大小为 5 的窗口(即,[0,5) 将是一个窗口),并且您看到 ts=1,2,3 的数据。这意味着下一条记录可能有 timestamp=4 并且必须包含在窗口中。但是,如果没有新数据到达,则无论等待多长时间,都无法发出窗口结果。只有当时间戳=5 的记录到达时,内部时间才会提前并且现在大于窗口结束时间,并且会发出窗口的结果。如果 suppress() 会在某个基于挂钟的超时后发出数据,并且下一条记录的时间戳 = 4,则它会发出错误的结果。

    此外,suppress() 会记住它的内部状态和时间。因此,即使您重新启动应用程序,suppress() 仍会缓冲数据,并仍会等待 timestamp=5 的记录发出数据。

    【讨论】:

    • 非常感谢马蒂亚斯!现在我更好地理解了其中的原理。但是,它使“最终窗口结果”的当前实现无法用于任何我们在任何情况下都需要结果而不依赖于下一条记录来提前时间的关键业务应用程序。如果有多个生产者(例如一组 Kafka 连接器),使用虚拟记录确实很笨拙,而且更加困难,因为他们需要决定哪个生产者应该将虚拟记录发送到哪些分区。我希望迟早会有另一个解决方案。
    • 请注意,每个分区的时间会单独提前,对于带有分区的 any 记录。时间不是基于每个键来跟踪的。由于它是流处理,因此预计不会停止输入并且分区不再有任何数据。 -- 另一种方法可能是,使用自定义 valueTransformer() 实现您想要的。
    猜你喜欢
    • 2017-06-09
    • 1970-01-01
    • 1970-01-01
    • 2018-06-25
    • 1970-01-01
    • 2020-08-28
    • 1970-01-01
    • 2019-10-31
    • 1970-01-01
    相关资源
    最近更新 更多