【发布时间】:2020-04-29 23:30:08
【问题描述】:
我正在使用 Apache Flink 对流数据执行分析。
我正在使用一个依赖项,其对象的创建时间超过 10 秒,因为它在初始化之前会读取 hdfs 中存在的多个文件。
如果我在 open 方法中初始化对象,我会得到一个超时异常,如果在 sink/flatmap 的构造函数中,我会得到序列化异常。
目前我正在使用静态块在其他类中初始化对象,在主文件中使用 Preconditions.checkNotNull(MGenerator.mGenerator),然后如果在接收器的平面图中使用它就可以工作。
有没有办法在 Flink 的 flatmap 或 sink 中创建一个可能需要超过 10 秒才能初始化的不可序列化依赖对象?
public class DependencyWrap {
static MGenerator mGenerator;
static {
final String configStr = "{}";
final Config config = new Gson().fromJson(config, Config.class);
mGenerator = new MGenerator(config);
}
}
public class MyStreaming {
public static void main(String[] args) throws Exception {
Preconditions.checkNotNull(MGenerator.mGenerator);
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(parallelism);
...
input.flatMap(new RichFlatMapFunction<Map<String,Object>,List<String>>() {
@Override
public void open(Configuration parameters) {
}
@Override
public void flatMap(Map<String,Object> value, Collector<List<String>> out) throws Exception {
out.collect(MFVGenerator.mfvGenerator.generateMyResult(value.f0, value.f1));
}
});
}
}
另外,如果我对这个问题有错误,请纠正我。
【问题讨论】:
-
能分享一下异常的堆栈跟踪吗?我会遇到类似的问题(
open()的长时间延迟导致超时),堆栈跟踪帮助我确定了要增加哪个 Flink 配置值。
标签: apache-flink flink-streaming