【问题标题】:Flink Running out of MemoryFlink 内存不足
【发布时间】:2018-08-23 02:50:02
【问题描述】:

我有一些相当简单的流代码,通过时间窗口聚合数据。窗口偏大(1 小时,限制为 2 小时),流中的值是来自数百台服务器的指标。我总是内存不足,所以我添加了RocksDBStateBackend。这导致 JVM 出现段错误。接下来我尝试了FsStateBackend。这两个后端都从未将任何数据写入磁盘,而只是创建了一个带有 JobID 的目录。我在独立模式下运行此代码,而不是部署。关于为什么状态后端不写入数据以及为什么即使提供 8GB 堆内存也会耗尽内存的任何想法?

    final SingleOutputStreamOperator<Metric> metricStream =
            objectStream.map(node -> new Metric(node.get("_ts").asLong(), node.get("_value").asDouble(), node.get("tags"))).name("metric stream");

    final WindowedStream<Metric, String, TimeWindow> hourlyMetricStream = metricStream
            .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Metric>(Time.hours(2)) { // set how long metrics can come late
                @Override
                public long extractTimestamp(final Metric metric) {
                    return metric.get_ts() * 1000; // needs to be in ms since Java epoch
                }
            })
            .keyBy(metric -> metric.getMetricName()) // key the stream so we can run the windowing in parallel
            .timeWindow(Time.hours(1)); // setup the time window for the bucket

    // create a stream for each type of aggregation
    hourlyMetricStream.sum("_value") // we want to sum by the _value
    .addSink(new MetricStoreSinkFunction(parameters, "sum"))
    .name("hourly sum stream")
    .setParallelism(6);

    hourlyMetricStream.aggregate(new MeanAggregator())
    .addSink(new MetricStoreSinkFunction(parameters, "mean"))
    .name("hourly mean stream")
    .setParallelism(6);

    hourlyMetricStream.aggregate(new ReMedianAggregator())
    .addSink(new MetricStoreSinkFunction(parameters, "remedian"))
    .name("hourly remedian stream")
    .setParallelism(6);

    env.execute("flink test");

【问题讨论】:

    标签: apache-flink flink-streaming


    【解决方案1】:

    很难说为什么你会用完内存,除非你有大量的指标名称(这是我根据你发布的代码能想出的唯一解释)。

    关于磁盘写入,RocksDB 实际上会默认使用一个临时目录来存储它的实际数据库文件。您还可以在配置期间传递显式目录。你可以调用state.setDbStoragePath(someDirectory)

    有点令人困惑的是,FSStateBackend 实际上只在检查点期间写入磁盘,否则它完全是基于堆的。因此,如果您没有启用检查点,您可能在目录中看不到任何内容。这样就可以解释为什么在使用 FSStateBackend 时您仍然可能会耗尽内存。

    假设您确实有 RocksDB(或任何)状态后端工作,您可以通过以下方式启用检查点:

    env.enableCheckpointing(5000); // value is in MS, so however frequently you want to checkpoint
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(5000); // this is to help prevent your job from making progress if checkpointing takes a bit. For large state checkpointing can take multiple seconds
    

    【讨论】:

    • 如何添加检查点?另外,我将状态后端构建为:RocksDBStateBackend("file:///home/wspeirs/data", true)。我还需要打电话给setDbStoragePath吗?
    • 是的,您通过的路径是用于检查点。不幸的是,检查点对您的内存问题没有帮助(对于 FSState)。我将更新答案以反映检查点
    • 我可以通过添加以下内容来完成这项工作:stateBackend.setDbStoragePath("file:///home/wspeirs/data");env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
    猜你喜欢
    • 2015-11-03
    • 2019-03-15
    • 1970-01-01
    • 1970-01-01
    • 2014-10-15
    • 2022-01-02
    • 2011-05-21
    • 1970-01-01
    • 2021-12-20
    相关资源
    最近更新 更多