【问题标题】:Testing window aggregation with Kafka Streams使用 Kafka Streams 测试窗口聚合
【发布时间】:2018-10-04 09:28:23
【问题描述】:

我正在使用 Kafka Streams 的 TopologyTestDriver 来测试我们的数据管道。

它对我们所有的简单拓扑(包括使用 Store 的有状态拓扑)都非常有效。 我的问题是当我尝试使用此测试驱动程序来测试使用窗口聚合的拓扑时。

我复制了一个简单的示例,该示例将在 10 秒窗口内使用相同键接收的整数相加。

public class TopologyWindowTests {

TopologyTestDriver testDriver;
String INPUT_TOPIC = "INPUT.TOPIC";
String OUTPUT_TOPIC = "OUTPUT.TOPIC";

@Before
public void setup(){
    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
    // EventProcessor is a <String,String> processor
    // so we set those serders
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
    testDriver = new TopologyTestDriver(defineTopology(),config,0L);
}

/**
 * topology test
 */
@Test
public void testTopologyNoCorrelation() throws IOException {
    ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(INPUT_TOPIC, new StringSerializer(), new IntegerSerializer());
    testDriver.pipeInput(factory.create(INPUT_TOPIC,"k",2,1L));

    ProducerRecord<String, Integer> outputRecord = testDriver.readOutput(OUTPUT_TOPIC, new StringDeserializer(), new IntegerDeserializer());

    Assert.assertNull(outputRecord);
}

@After
public void tearDown() {
    testDriver.close();
}

/**
 * Defines topology
 * @return
 */
public Topology defineTopology(){
    StreamsBuilder builder = new StreamsBuilder();
    KStream<String,Integer> inputStream = builder.stream(INPUT_TOPIC);

    KTable<Windowed<String>, Integer> groupedMetrics = inputStream.groupBy((key,value)->key,
            Serialized.with(Serdes.String(),Serdes.Integer())).windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(10))).aggregate(
            ()-> 0,
            (String aggKey, Integer newValue, Integer aggValue)->{
                Integer val = aggValue+newValue;
                return val;
            },
            Materialized.<String,Integer,WindowStore<Bytes,byte[]>>as("GROUPING.WINDOW").withKeySerde(Serdes.String()).withValueSerde(Serdes.Integer())
    );

    groupedMetrics.toStream().map((key,value)->KeyValue.pair(key.key(),value)).to(OUTPUT_TOPIC);

    return builder.build();

}

}

我希望在这个测试用例中,除非我将挂钟时间提前 10 秒,否则不会返回任何输出主题...但是我得到以下输出

java.lang.AssertionError: expected null, but was:<ProducerRecord(topic=OUTPUT.TOPIC, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=k, value=2, timestamp=0)>

我在这里遗漏了什么吗? 我正在使用卡夫卡 2.0.0

更新

提前致谢

根据 Matthias 的回复,我准备了以下测试:

@Test
public void testTopologyNoCorrelation() throws IOException {
    ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(INPUT_TOPIC, new StringSerializer(), new IntegerSerializer());
    testDriver.pipeInput(factory.create(INPUT_TOPIC,"k",2,1L));
    testDriver.pipeInput(factory.create(INPUT_TOPIC,"k",2,1L));

    // Testing 2+2=4
    ProducerRecord<String, Integer> outputRecord1 = testDriver.readOutput(OUTPUT_TOPIC, new StringDeserializer(), new IntegerDeserializer());
    Assert.assertEquals(Integer.valueOf(4),outputRecord1.value());

    // Testing no more events in the window
    ProducerRecord<String, Integer> outputRecord2 = testDriver.readOutput(OUTPUT_TOPIC, new StringDeserializer(), new IntegerDeserializer());
    Assert.assertNull(outputRecord2);
}

两个输入消息都使用相同的时间戳发送,因此我希望输出主题中只有一个事件包含我的值的总和。但是,我在输出中接收到 2 个事件(第一个值为 2,第二个值为 4),我认为这不是拓扑所需的行为。

【问题讨论】:

    标签: apache-kafka apache-kafka-streams


    【解决方案1】:

    默认情况下,Kafka Streams 在 event-time 上运行窗口操作,而不是 wall-clock-time。这保证了确定性处理语义(挂钟时间处理本质上是非确定性的)。查看文档了解更多详情:https://docs.confluent.io/current/streams/concepts.html#time

    因此,输入记录的时间戳决定了将记录放在哪个窗口中。此外,您的输入记录的时间戳会提前基于这些事件时间戳的内部跟踪“流时间”。

    另请注意,Kafka Streams 遵循连续处理模型,并且确实发出 updated 而不是等待窗口结束条件。这对于处理迟到(又名乱序数据)很重要。比较How to send final kafka-streams aggregation result of a time windowed KTable?https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/

    更新

    这是因为“更新”处理模型。聚合时,每个输入记录都会更新“当前”结果,并生成“当前结果输出记录”。这发生在每条记录上(不是每条时间戳)。

    【讨论】:

    • 感谢 Matthias 的快速回复。当我使用相同的时间戳向输入主题发送 2 个事件时,我添加了一个新测试来显示测试驱动程序的行为。我希望输出中只有两个值的总和,但我得到了 2 个事件...... ¿你能解释一下吗?
    • @DavidO 扩展了我的答案
    • 我理解您的意思,但这不是我在针对 Kafka 集群运行拓扑时看到的行为。在这种情况下,我只能在每个窗口中看到一个输出事件,无论我发送了多少记录。所以,我的观点是,有些东西没有按应有的方式运行,因为针对测试驱动程序和针对 Kafka 集群的行为是不同的。你可以在这里找到整个项目:github.com/davidonoro/ks-streaming-example
    • 不同之处在于TopologyTestDriver 在它处理的每条记录之后提交(这包括刷新 KTable 状态存储缓存)。当您针对集群运行时,Kafka Streams 默认情况下仅每 30 秒提交一次,因此连续更新是“重复数据删除”。如果禁用缓存,您可以获得相同的行为。比较docs.confluent.io/current/streams/developer-guide/…
    • 所以根据最后的评论,它真的不可能使用拓扑测试驱动程序准确测试窗口流拓扑吗?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-10-15
    • 1970-01-01
    • 2020-03-24
    • 1970-01-01
    • 2016-12-20
    相关资源
    最近更新 更多