【问题标题】:Object reuse - mutating same object - in Flink operators对象重用 - 改变相同的对象 - 在 Flink 运算符中
【发布时间】:2023-03-21 05:10:01
【问题描述】:

我正在阅读文档here,它提供了一个重用对象的用例,如下所示:

stream
    .apply(new WindowFunction<WikipediaEditEvent, Tuple2<String, Long>, String, TimeWindow>() {
        // Create an instance that we will reuse on every call
        private Tuple2<String, Long> result = new Tuple<>();
    
        @Override
        public void apply(String userName, TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple2<String, Long>> collector) throws Exception {
            long changesCount = ...
            
            // Set fields on an existing object instead of creating a new one
            result.f0 = userName;
            // Auto-boxing!! A new Long value may be created
            result.f1 = changesCount;
            
            // Reuse the same Tuple2 object
            collector.collect(result);
        }
    }

所以每次与其创建一个新的 Tuple 不同,它似乎可以通过利用其可变特性来使用相同的 Tuple 以减少 GC 的压力。它是否适用于所有运算符,我们可以通过collector.collect(...) 调用在管道中改变和传递相同的对象?

通过使用这个想法,我想知道在哪些地方我可以在不破坏代码或引入鬼鬼祟祟的错误的情况下进行这样的优化。再次以 KeySelector 为例,它返回从下面给出的this 答案中获取的元组:

KeyedStream<Employee, Tuple2<String, String>> employeesKeyedByCountryndEmployer = 
  streamEmployee.keyBy(
    new KeySelector<Employee, Tuple2<String, String>>() {

      @Override
      public Tuple2<String, String> getKey(Employee value) throws Exception {
        return Tuple2.of(value.getCountry(), value.getEmployer());
      }
    }
  );

我想知道在这种情况下,我是否可以通过使用不同的输入对其进行变异来重用相同的元组,如下所示。当然,在所有情况下,我都假设并行度大于 1,在实际用例中可能要高得多。

KeyedStream<Employee, Tuple2<String, String>> employeesKeyedByCountryndEmployer = 
  streamEmployee.keyBy(
    new KeySelector<Employee, Tuple2<String, String>>() {
      
      Tuple2<String, String> tuple = new Tuple2<>();

      @Override
      public Tuple2<String, String> getKey(Employee value) throws Exception {
        tuple.f0 = value.getCountry();
        tuple.f1 = value.value.getEmployer();
        return tuple;
      }
    }
  );

我不知道,Flink 是否会在管道中的各个阶段之间复制对象,所以我想知道做这样的优化是否安全。我在文档中阅读了有关 enableObjectReuse() 配置的信息,但我不确定我是否真的理解它。实际上,它可能有点 Flink 内部,虽然无法理解 Flink 什么时候管理管道中的数据/对象/记录。是不是我应该先说清楚?

谢谢,

【问题讨论】:

    标签: java apache-flink flink-streaming


    【解决方案1】:

    查看 Dave Anderson 对Flink, rule of using 'object reuse mode'的回答

    基本上你不记得跨函数调用的输入对象引用或 修改输入对象。因此,在上述KeySelector 的情况下,您正在修改您创建的对象,而不是输入对象。

    【讨论】:

    • 你是说case对象重用模式是ON吗?
    • 对象重用模式不适用于键选择器,不能安全地重用输出对象。我很确定这会导致 HashMapStateBackend 出现奇怪的问题和错误。
    【解决方案2】:

    这是在KeySelector 中的一种重用,是安全的。 keyBy 不是运算符,并且关于运算符链中对象重用的通常规则(我介绍过here)不适用。

    【讨论】:

    • 感谢大卫的回答。如果我没有误解页面上的 ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/… keyBy 列在运算符下。所以所有的转换都不是运算符,而只是 DataStream 上的方法?
    • 我会说文档中的分类具有误导性。 keyBy 不是转换或运算符。它是对两个运算符如何连接的声明性描述(在某些方面类似于再平衡所扮演的角色)。但鉴于键与键组和键控状态的关系,keyBy 扮演着非常独特的角色。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2011-10-11
    • 2013-05-20
    • 2011-08-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多