[Posted]: 2018-08-23 02:50:02
[Problem description]:
I have some fairly simple streaming code that aggregates data over time windows. The windows are large (1 hour, with up to 2 hours of allowed lateness), and the values in the stream are metrics from hundreds of servers. I kept running out of memory, so I added the RocksDBStateBackend. That caused the JVM to segfault. Next I tried the FsStateBackend. Neither backend ever wrote any data to disk; each only created a directory named after the JobID. I am running this code in standalone mode, not as a deployment. Any ideas why the state backends never write data, and why the job runs out of memory even with 8 GB of heap?
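For context, the state backends were configured along these lines (a sketch only: the checkpoint path and the commented-out interval are placeholders, not the exact values from my setup):

```java
// Sketch of the state-backend wiring (paths and interval are illustrative).
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BackendSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Attempt 1: RocksDB keeps working state on local disk and
        // snapshots it to the given URI during checkpoints.
        env.setStateBackend(new RocksDBStateBackend("file:///tmp/flink-checkpoints"));

        // Attempt 2: FsStateBackend keeps working state on the JVM heap
        // and only writes snapshots to the URI during checkpoints.
        // env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints"));

        // Note: without checkpointing enabled, neither backend writes
        // snapshots into the checkpoint directory.
        // env.enableCheckpointing(60_000); // every 60 s (illustrative)
    }
}
```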
final SingleOutputStreamOperator<Metric> metricStream = objectStream
        .map(node -> new Metric(node.get("_ts").asLong(), node.get("_value").asDouble(), node.get("tags")))
        .name("metric stream");

final WindowedStream<Metric, String, TimeWindow> hourlyMetricStream = metricStream
        .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Metric>(Time.hours(2)) { // how late metrics may arrive
            @Override
            public long extractTimestamp(final Metric metric) {
                return metric.get_ts() * 1000; // needs to be in ms since the Java epoch
            }
        })
        .keyBy(metric -> metric.getMetricName()) // key the stream so the windowing can run in parallel
        .timeWindow(Time.hours(1)); // set up the one-hour time window for the bucket

// create a stream for each type of aggregation
hourlyMetricStream.sum("_value") // we want to sum by the _value
        .addSink(new MetricStoreSinkFunction(parameters, "sum"))
        .name("hourly sum stream")
        .setParallelism(6);

hourlyMetricStream.aggregate(new MeanAggregator())
        .addSink(new MetricStoreSinkFunction(parameters, "mean"))
        .name("hourly mean stream")
        .setParallelism(6);

hourlyMetricStream.aggregate(new ReMedianAggregator())
        .addSink(new MetricStoreSinkFunction(parameters, "remedian"))
        .name("hourly remedian stream")
        .setParallelism(6);

env.execute("flink test");
Tags: apache-flink flink-streaming