[Posted]: 2020-11-05 06:00:00
[Question]:
I am new to Flink and I am trying to write a JUnit test case for a KeyedBroadcastProcessFunction. Below is my code. I currently call the getDataStreamOutput method in a TestUtils class, passing the input data and the pattern rules to it. The input data is evaluated against the list of pattern rules; when the input satisfies a condition I get a signal, the sink function is invoked, and getDataStreamOutput returns the output data as a String.
@Test
public void testCompareInputAndOutputDataForInputSignal() throws Exception {
    Assertions.assertEquals(sampleInputSignal,
            TestUtils.getDataStreamOutput(
                    inputSignal,
                    patternRules));
}
public static String getDataStreamOutput(JSONObject input, Map<String, String> patternRules) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    DataStream<JSONObject> inputSignal = env.fromElements(input);

    DataStream<Map<String, String>> rawPatternStream =
            env.fromElements(patternRules);

    // Generate key/value pairs of patterns, where the key is the pattern name
    // and the value is the pattern condition
    DataStream<Tuple2<String, Map<String, String>>> patternRuleStream =
            rawPatternStream.flatMap(new FlatMapFunction<Map<String, String>,
                    Tuple2<String, Map<String, String>>>() {
                @Override
                public void flatMap(Map<String, String> patternRules,
                        Collector<Tuple2<String, Map<String, String>>> out) throws Exception {
                    for (Map.Entry<String, String> stringEntry : patternRules.entrySet()) {
                        JSONObject jsonObject = new JSONObject(stringEntry.getValue());
                        Map<String, String> map = new HashMap<>();
                        for (String key : jsonObject.keySet()) {
                            String value = jsonObject.get(key).toString();
                            map.put(key, value);
                        }
                        out.collect(new Tuple2<>(stringEntry.getKey(), map));
                    }
                }
            });

    // Descriptor for the broadcast state: pattern name -> pattern condition map
    MapStateDescriptor<String, Map<String, String>> patternRuleDescriptor =
            new MapStateDescriptor<>("patternRules",
                    Types.STRING, Types.MAP(Types.STRING, Types.STRING));

    BroadcastStream<Tuple2<String, Map<String, String>>> patternRuleBroadcast =
            patternRuleStream.broadcast(patternRuleDescriptor);

    DataStream<Tuple2<String, JSONObject>> validSignal = inputSignal
            .map(new MapFunction<JSONObject, Tuple2<String, JSONObject>>() {
                @Override
                public Tuple2<String, JSONObject> map(JSONObject inputSignal) throws Exception {
                    String source = inputSignal.getString("source");
                    return new Tuple2<>(source, inputSignal);
                }
            })
            .keyBy(0)
            .connect(patternRuleBroadcast)
            .process(new MyKeyedBroadCastProcessFunction());

    validSignal.map(new MapFunction<Tuple2<String, JSONObject>, JSONObject>() {
        @Override
        public JSONObject map(Tuple2<String, JSONObject> inputSignal) throws Exception {
            return inputSignal.f1;
        }
    }).addSink(new getDataStreamOutput());

    env.execute("TestFlink");

    return getDataStreamOutput.dataStreamOutput;
}
@SuppressWarnings("serial")
public static final class getDataStreamOutput implements SinkFunction<JSONObject> {
    public static String dataStreamOutput;

    @Override
    public void invoke(JSONObject inputSignal) throws Exception {
        dataStreamOutput = inputSignal.toString();
    }
}
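For reference, a common variant of this sink collects every record into a thread-safe static list instead of overwriting a single static String, so a test can assert on all emitted records and clear the list between tests. Below is a minimal sketch of that idea; the SinkFunction interface is redeclared locally here (as an assumption, mirroring Flink's single-argument invoke) so the snippet compiles without Flink on the classpath, and in a real job you would implement org.apache.flink.streaming.api.functions.sink.SinkFunction instead.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Local stand-in for Flink's SinkFunction, so this sketch is self-contained.
interface SinkFunction<T> {
    void invoke(T value) throws Exception;
}

// Collecting sink: appends each record to a synchronized static list.
// After env.execute() returns, the test reads the list and clears it.
public class CollectSink implements SinkFunction<String> {
    public static final List<String> values =
            Collections.synchronizedList(new ArrayList<>());

    @Override
    public void invoke(String value) {
        values.add(value);
    }

    public static void main(String[] args) throws Exception {
        CollectSink sink = new CollectSink();
        sink.invoke("{\"source\":\"s1\"}");
        sink.invoke("{\"source\":\"s2\"}");
        System.out.println(CollectSink.values.size()); // prints 2
    }
}
```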
I need to test different inputs against the same broadcast rules, but every time I call this method it broadcasts the pattern data from scratch before processing the input signal. Is there a way to broadcast once and keep sending inputs to the method? I explored combining the streams with a CoFlatMapFunction, something like the code below, so I could keep sending input rules while the method runs. But for that, one of the streams has to keep consuming from a Kafka topic, which would overburden the method with loading Kafka utilities and a server.
DataStream<JSONObject> inputSignalFromKafka = env.addSource(inputSignalKafka);

DataStream<org.json.JSONObject> inputSignalFromMethod = env.fromElements(inputSignal);

DataStream<JSONObject> inputSignal = inputSignalFromMethod.connect(inputSignalFromKafka)
        .flatMap(new SignalCoFlatMapper());
public static class SignalCoFlatMapper
        implements CoFlatMapFunction<JSONObject, JSONObject, JSONObject> {

    @Override
    public void flatMap1(JSONObject inputValue, Collector<JSONObject> out) throws Exception {
        out.collect(inputValue);
    }

    @Override
    public void flatMap2(JSONObject kafkaValue, Collector<JSONObject> out) throws Exception {
        out.collect(kafkaValue);
    }
}
I found a link on Stack Overflow, How to unit test BroadcastProcessFunction in flink when processElement depends on broadcasted data, but it confused me.
In the test case, I want to broadcast only once, in a @Before method, and then keep sending different kinds of data to my broadcast function.
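That is essentially the pattern the linked answer describes: open a test harness once, feed the broadcast side once, then call processElement per test, with no job submission at all. Below is a dependency-free sketch of that flow; RulesHarness and its substring-matching logic are invented for illustration, and I believe Flink's real test utilities (ProcessFunctionTestHarnesses in the flink-streaming-java test jar) expose analogous processBroadcastElement/processElement calls for a KeyedBroadcastProcessFunction.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy stand-in for a keyed-broadcast test harness: broadcast state is loaded
// once, then many elements are processed against it without re-running a job.
class RulesHarness {
    private final Map<String, String> broadcastState = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    // Analogue of processBroadcastElement: store one rule in broadcast state.
    void processBroadcastElement(String name, String condition) {
        broadcastState.put(name, condition);
    }

    // Analogue of processElement: emit the input if any rule condition matches.
    void processElement(String signal) {
        for (String condition : broadcastState.values()) {
            if (signal.contains(condition)) {
                output.add(signal);
                return;
            }
        }
    }

    List<String> extractOutputValues() {
        return output;
    }
}

public class BroadcastOnceSketch {
    public static void main(String[] args) {
        RulesHarness harness = new RulesHarness();
        // @Before: broadcast the rules exactly once.
        harness.processBroadcastElement("rule1", "\"source\":\"turbine\"");
        // Each test: feed a different input against the same broadcast state.
        harness.processElement("{\"source\":\"turbine\",\"value\":42}");
        harness.processElement("{\"source\":\"other\",\"value\":7}");
        System.out.println(harness.extractOutputValues().size()); // prints 1
    }
}
```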
[Comments]:
-
Note that this was also discussed on the mailing list; see apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/….
Tags: junit apache-flink flink-streaming flink-cep