【发布时间】:2018-05-03 13:51:55
【问题描述】:
我正在使用Flink 1.4.1 处理事务事件,并使用 HDFS 存储检查点信息以实现容错。
创建了一个作业来汇总有关客户、星期几和每小时的信息,从而创建了一个配置文件,如下面的代码所示。
val stream = env.addSource(consumer)
val result = stream
.map(openTransaction => {
val transactionDate = openTransaction.get("transactionDate")
val date = if (transactionDate.isTextual)
LocalDateTime.parse(transactionDate.asText, DateTimeFormatter.ISO_DATE_TIME).toInstant(ZoneOffset.UTC).toEpochMilli
else
transactionDate.asLong
(openTransaction.get("clientId").asLong, openTransaction.get("amount").asDouble, new Timestamp(date))
})
.keyBy(0)
.window(SlidingEventWeekTimeWindows.of(Time.days(28), Time.hours(1)))
.sum(1)
在上面的代码中,流包含三个字段:“transactionDate”、“clientId”和“amount”。我们通过 clientId 和一个滑动窗口来创建一个键控流。我们的数据库中有大约 100.000 个唯一的活动客户端 ID。
运行一段时间后,作业使用的总 RAM 稳定在 36 GB,但 HDFS 中存储的检查点仅使用 3 GB。有没有办法减少作业的 RAM 使用量,可能是通过配置 Flink 的复制因子或使用 RocksDB?
【问题讨论】:
标签: scala hdfs apache-flink ram rocksdb