【发布时间】:2017-03-29 21:04:03
【问题描述】:
我有一个简单的管道,它可以读取文本文件和 mysql 的记录并尝试协调它们,即当数据库中不存在记录时插入记录,使用文件更新数据库中的记录,并执行其他操作更新文件中不存在的数据库中的记录。
在 Spark 中使用 2M 记录运行时出现的问题如下:
我的预感是下面的代码会产生这种不平衡
final TupleTag<FileRecord> fileTag = new TupleTag<>();
final TupleTag<MysqlRecord> mysqlTag = new TupleTag<>();
PCollection<KV<Integer, CoGbkResult>> joinedRawCollection =
KeyedPCollectionTuple.of(fileTag, fileRecords)
.and(mysqlTag, mysqlRecords)
.apply(CoGroupByKey.create());
这是 Spark Executor DAG 可视化
最终,一个工人将耗尽内存。我知道在 Spark 中,可以指定分区器来帮助将工作负载分配给工作人员。但是,我如何在 Beam 中做到这一点?
编辑:
我怀疑 JDBCIo 无法正确分发一个查询,因此我将其拆分为多个 PCollection,然后将它们展平。我从 Mysql 中读取速度更快,但最终遇到了同样的问题。
【问题讨论】:
-
实际上,这种不平衡的原因可能是因为它是来自 MySQL 的读取步骤,具有那么多记录。由于 JDBCIO 可能不会分发一个 SELECT 查询,因此我们看到了这种争用。让我试着把它分开。