【发布时间】:2020-06-14 00:49:06
【问题描述】:
我有一个简单的 Flink 应用,它总结了最后一分钟内具有相同 id 和时间戳的事件:
DataStream<String> input = env
.addSource(consumerProps)
.uid("app");
DataStream<Event> events = input.map(record -> mapper.readValue(record, Event.class));
pixels
.assignTimestampsAndWatermarks(new TimestampsAndWatermarks())
.keyBy("id")
.timeWindow(Time.minutes(1))
.sum("constant")
.addSink(simpleNotificationServiceSink);
env.execute(jobName);
private static class TimestampsAndWatermarks extends BoundedOutOfOrdernessTimestampExtractor<Pixel> {
public TimestampsAndWatermarks() {
super(Time.seconds(90));
}
// timestampReadable is timestamp rounded on minutes, in format yyyyMMddhhmm
@Override
public long extractTimestamp(Pixel pixel) {
return Long.parseLong(pixel.timestampReadable);
}
}
我想实现这个场景:
启动嵌入式 Kafka
向主题发布几条消息
使用 Flink 消费消息
检查 Flink 产生的输出的正确性
Flink 是否提供实用程序来测试嵌入式 Kafka 的作业?如果是,推荐的方法是什么?
谢谢。
【问题讨论】:
标签: testing apache-kafka integration-testing apache-flink flink-streaming