【发布时间】:2023-03-21 05:10:01
【问题描述】:
我正在阅读文档here,它提供了一个重用对象的用例,如下所示:
stream
.apply(new WindowFunction<WikipediaEditEvent, Tuple2<String, Long>, String, TimeWindow>() {
// Create an instance that we will reuse on every call
private Tuple2<String, Long> result = new Tuple<>();
@Override
public void apply(String userName, TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple2<String, Long>> collector) throws Exception {
long changesCount = ...
// Set fields on an existing object instead of creating a new one
result.f0 = userName;
// Auto-boxing!! A new Long value may be created
result.f1 = changesCount;
// Reuse the same Tuple2 object
collector.collect(result);
}
}
所以每次与其创建一个新的 Tuple 不同,它似乎可以通过利用其可变特性来使用相同的 Tuple 以减少 GC 的压力。它是否适用于所有运算符,我们可以通过collector.collect(...) 调用在管道中改变和传递相同的对象?
通过使用这个想法,我想知道在哪些地方我可以在不破坏代码或引入鬼鬼祟祟的错误的情况下进行这样的优化。再次以 KeySelector 为例,它返回从下面给出的this 答案中获取的元组:
KeyedStream<Employee, Tuple2<String, String>> employeesKeyedByCountryndEmployer =
streamEmployee.keyBy(
new KeySelector<Employee, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> getKey(Employee value) throws Exception {
return Tuple2.of(value.getCountry(), value.getEmployer());
}
}
);
我想知道在这种情况下,我是否可以通过使用不同的输入对其进行变异来重用相同的元组,如下所示。当然,在所有情况下,我都假设并行度大于 1,在实际用例中可能要高得多。
KeyedStream<Employee, Tuple2<String, String>> employeesKeyedByCountryndEmployer =
streamEmployee.keyBy(
new KeySelector<Employee, Tuple2<String, String>>() {
Tuple2<String, String> tuple = new Tuple2<>();
@Override
public Tuple2<String, String> getKey(Employee value) throws Exception {
tuple.f0 = value.getCountry();
tuple.f1 = value.value.getEmployer();
return tuple;
}
}
);
我不知道,Flink 是否会在管道中的各个阶段之间复制对象,所以我想知道做这样的优化是否安全。我在文档中阅读了有关 enableObjectReuse() 配置的信息,但我不确定我是否真的理解它。实际上,它可能有点 Flink 内部,虽然无法理解 Flink 什么时候管理管道中的数据/对象/记录。是不是我应该先说清楚?
谢谢,
【问题讨论】:
标签: java apache-flink flink-streaming