【问题标题】:Testing Flink with embedded Kafka使用嵌入式 Kafka 测试 Flink
【发布时间】:2020-06-14 00:49:06
【问题描述】:

我有一个简单的 Flink 应用,它总结了最后一分钟内具有相同 id 和时间戳的事件:

DataStream<String> input = env
                .addSource(consumerProps)
                .uid("app");

DataStream<Event> events = input.map(record -> mapper.readValue(record, Event.class));

pixels
        .assignTimestampsAndWatermarks(new TimestampsAndWatermarks())
        .keyBy("id")
        .timeWindow(Time.minutes(1))
        .sum("constant")
        .addSink(simpleNotificationServiceSink);


env.execute(jobName);


private static class TimestampsAndWatermarks extends BoundedOutOfOrdernessTimestampExtractor<Pixel> {
        public TimestampsAndWatermarks() {
            super(Time.seconds(90));
        }

        // timestampReadable is timestamp rounded on minutes, in format yyyyMMddhhmm
        @Override
        public long extractTimestamp(Pixel pixel) {
            return Long.parseLong(pixel.timestampReadable);
        }
    }

我想实现这个场景:

  1. 启动嵌入式 Kafka

  2. 向主题发布几条消息

  3. 使用 Flink 消费消息

  4. 检查 Flink 产生的输出的正确性

Flink 是否提供实用程序来测试嵌入式 Kafka 的作业?如果是,推荐的方法是什么?

谢谢。

【问题讨论】:

    标签: testing apache-kafka integration-testing apache-flink flink-streaming


    【解决方案1】:

    您可以使用一条 JUnit 规则来调出嵌入式 Kafka——请参阅(请参阅https://github.com/charithe/kafka-junit)。

    要让测试完全终止,请尝试以下操作:

    public class TestDeserializer extends YourKafkaDeserializer<T> {
      public final static String END_APP_MARKER = "END_APP_MARKER"; // tests send as last record
    
      @Override
      public boolean isEndOfStream(ParseResult<T> nextElement) {
        if (nextElement.getParseError() == null)
          return false;
    
        if (END_APP_MARKER.equals(nextElement.getParseError().getRawData()))
          return true;
    
        return false;
      }
    }
    

    【讨论】:

    • 我推荐(测试容器)[testcontainers.org/modules/kafka/],因为它也可以用于其他资源。我通常将它与mguenther.github.io/kafka-junit 结合起来创建主题并添加/读取数据。
    • 有人能指出一个在依赖 Kafka 的测试中使用 testcontainers 的示例吗?就像 OP 所描述的那样?
    • @salvalcantara 如果你在 Flink 源码中搜索,你会发现 testcontainers 已经被使用了一点。 https://github.com/search?q=testcontainers+flink+kafka&amp;type=code 有很多点击量——你可能会在那里找到一些有用的东西。
    • 感谢大卫,这些都是很好的例子!
    猜你喜欢
    • 2021-07-15
    • 2016-12-03
    • 1970-01-01
    • 2019-07-12
    • 1970-01-01
    • 2021-05-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多