【问题标题】:Flink: how to store state and use in another stream?Flink:如何存储状态并在另一个流中使用?
【发布时间】:2016-12-12 18:15:20
【问题描述】:

我有一个 Flink 用例,我需要从文件中读取信息,存储每一行​​,然后使用此状态过滤另一个流。

我现在可以使用connect 运算符和RichCoFlatMapFunction 完成所有这些工作,但感觉过于复杂。另外,我担心flatMap2 可能会在从文件中加载所有状态之前开始执行:

fileStream
    .connect(partRecordStream.keyBy((KeySelector<PartRecord, String>) partRecord -> partRecord.getPartId()))
    .keyBy((KeySelector<String, String>) partId -> partId, (KeySelector<PartRecord, String>) partRecord -> partRecord.getPartId())
    .flatMap(new RichCoFlatMapFunction<String, PartRecord, PartRecord>() {
        private transient ValueState<String> storedPartId;
        @Override
        public void flatMap1(String partId, Collector<PartRecord> out) throws Exception {
            // store state
            storedPartId.update(partId);
        }

        @Override
        public void flatMap2(PartRecord record, Collector<PartRecord> out) throws Exception {
            if (record.getPartId().equals(storedPartId.value())) {
                out.collect(record);
            } else {
                // do nothing
            }
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor<String> descriptor =
                    new ValueStateDescriptor<>(
                            "partId", // the state name
                            TypeInformation.of(new TypeHint<String>() {}),
                            null);
            storedPartId = getRuntimeContext().getState(descriptor);
        }
    });

有没有更好的方法(从 Flink 1.1.3 开始)来完成这种加载状态模式,然后在后续流中使用它?

【问题讨论】:

    标签: apache-flink flink-streaming


    【解决方案1】:

    您对CoFlatMapFunction 的担忧是正确的。调用flatMap1flatMap2 的顺序无法控制,取决于数据到达的顺序。因此,flatMap2 可能会在flatMap1 读取所有数据之前被调用。

    在 Flink 1.1.3 中,在开始处理流之前读取所有数据的唯一方法是使用 RichFlatMapFunctionopen() 方法中的数据,即您必须手动读取和解析文件。

    这基本上是一种广播连接策略,即操作员的每个并行实例都会这样做。缺点是文件的数据会被复制。好处是您不必随机播放“主”流(无需使用keyBy())。

    【讨论】:

    • 啊,我明白了。所以我可以在open()方法内解析文件,但只要我不使用keyBy()上游,就会只有一个操作符实例。不过,这实际上是一个串行操作,对吗?
    • FlatMap 运算符也可以在不调用keyBy() 的情况下并行运行。您只需指定运算符的并行度。但是,数据将随机分布在并行线程中。 keyBy 会对数据进行哈希分区。如果您有多个 FlatMap 运算符,则每个运算符都会读取文件并保存状态。所以你有多余的 IO 和内存使用,但操作符将并行运行。
    • @FabianHueske 通过这种方式,每次更新文件时,都需要重新启动作业才能获取更新的内容。 flink 中是否有一种方法,在文件状态可用之前,不处理主流记录。基本上我们可以设置元素的处理顺序。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-10-09
    • 1970-01-01
    • 2020-06-16
    • 2019-07-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多