【问题标题】:Streaming data from CloudSql into Dataflow将数据从 CloudSql 流式传输到 Dataflow
【发布时间】:2018-02-14 10:28:34
【问题描述】:

我们目前正在探索如何使用 Apache Beam/Google Dataflow 处理存储在 Google Cloud SQL 数据库 (MySQL) 中的大量数据。

数据库在一个表中存储了大约 200GB 的数据。

我们使用JdbcIO 成功地从数据库中读取了行,但到目前为止,这只有在我们LIMIT 查询的行数时才有可能。否则我们会遇到内存问题。我假设默认情况下 SELECT 查询会尝试将所有结果行加载到内存中。

这方面的惯用方法是什么?批处理 SQL 查询?流式传输结果?

我们尝试调整执行语句的fetch size,但没有太大成功。

这就是我们的 JDBC 读取设置的样子:

JdbcReadOptions(
  connectionOptions = connOpts,
  query = "SELECT data FROM raw_data",
  statementPreparator = statement => statement.setFetchSize(100),
  rowMapper = result => result.getString(1)
)

到目前为止,我还没有找到任何关于来自 sql 的流的资源。

编辑

我将列出我采用的视图方法,以便其他人可以学到一些东西(例如如何去做)。为了了解更多上下文,所讨论的数据库表的结构确实很糟糕:它有一个包含 JSON 字符串的列,以及 id 列(主键)加上一个 addedmodified 列(TIMESTAMP类型)。在第一种方法时,它没有进一步的索引。该表包含 25 个 mio 行。所以这可能更像是一个数据库问题,而不是 Apache Beam/JDBC 问题。不过:

方法 1(上) - 查询所有内容

基本上是这样的:

val readOptions = JdbcReadOptions(
  connectionOptions = connOpts,
  query = "SELECT data FROM raw_data",
  rowMapper = result => result.getString(1)
)

context
  .jdbcSelect(readOptions)
  .map(/*...*/)

如果我在查询中添加了LIMIT,这将起作用。但显然很慢。

方法 2 - 键集分页

val queries = List(
  "SELECT data from raw_data LIMIT 5000 OFFSET 0",
  "SELECT data from raw_data LIMIT 5000 OFFSET 5000",
  "SELECT data from raw_data LIMIT 5000 OFFSET 10000"
  // ...
)

context
  .parallelize(queries)
  .map(query => {
      val connection = DriverManager.getConnection(/* */)
      val statement = connection.prepareStatement(query)
      val result = statement.executeQuery()

      makeIterable(result) // <-- creates a Iterator[String]
  })
  .flatten
  .map(/* processing */)

虽然我很快了解到LIMIT _ OFFSET _ 组合也从第一行开始扫描,但效果更好一些。因此,每个后续查询都花费了更长的时间,最终收敛到很长时间。

方法 2.5 - 带排序的键集分页

与上述方法类似,但我们在added 列上创建了一个索引并将查询更新为

SELECT data FROM raw_data ORDER BY added LIMIT 5000 OFFSET x

这加快了速度,但最终查询时间变长了。

方法 3 - 无波束/数据流

val connection = DriverManager.getConnection(/* */)
val statement = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
statement.setFetchSize(Integer.MIN_VALUE)

val rs = statement.executeQuery("SELECT data FROM raw_data")

while(rs.next()) {
  writer writeLine rs.getString(1)
}

这会将结果集逐行返回并将行写入文件。所有 25 条 mio 记录运行了大约 2 小时。最后。如果有人能指出如何使用 Beam 实现此解决方案,那就太好了。

顺便说一句:现在我将原始数据作为 CSV 文件使用 Beam 处理变得轻而易举。大约 80GB 的原始数据可以在大约 5 分钟内通过自动缩放等转换为另一种 CSV 格式。

【问题讨论】:

  • 哦,我明白了。使用 LIMIT/OFFSET 进行分页不会给您任何并行性,因为数据库通常无法在不扫描并丢弃前 9 页的情况下为您提供第 10 页结果,因此基本上您的查询正在扫描整个数据库 N 次。您需要按主键的 values 对其进行分区,例如“从添加在 X 和 Y 之间的 raw_data 中选择数据”,您提供的 [X, Y) 对的总范围涵盖“添加”的所有可能值。
  • 另外,您似乎没有使用 JdbcIO.readAll()。我想 scio 并没有明确支持它,但它也不会阻止你使用它,对吧? (您可以对 PCollections 应用任意 Beam 变换,而不仅仅是 scio 有包装器的那些)
  • 我可能可以将 readAll() 与 scio 一起使用。我对 scio 和 Beam/Dataflow 都是新手,这就是为什么我仍然没有完全意识到这些可能性。虽然回头看,我发现最终有效的解决方案(不带 Beam)是我用例中最明显和最优雅的解决方案。感谢所有的提示和解释!

标签: jdbc google-cloud-dataflow apache-beam spotify-scio


【解决方案1】:

似乎MySQL JDBC驱动需要一些特殊的措施来使它不会将整个结果集加载到内存中;例如我能够找到this code 在另一个项目中解决问题。 JdbcIO 需要做同样的事情,或者至少可以配置到足以让用户这样做。我提交了问题https://issues.apache.org/jira/browse/BEAM-3714

同时,作为一种解决方法,您可以使用JdbcIO.readAll() 将您的查询划分为许多较小的查询,例如您可以按一系列 ID 对其进行分区。请注意,它们之间不会强制执行任何事务一致性 - 就 MySQL 而言,它们将是独立的查询。

【讨论】:

  • 感谢您的提示。我会检查这是否能解决问题!
  • 遗憾的是,数据库的结构非常糟糕。即使使用键集分页,查询也会变得非常慢。
  • 你能告诉更多,例如显示您的代码的一些 sn-ps,并说明与转储 CSV 相比的性能如何?您的经验对 JdbcIO 的未来用户非常有价值。
  • 我将编辑我原来的问题并提供更多细节!
  • 谢谢。您所做的还不够——还需要 BEAM-3714 中的所有其他步骤(指定结果集类型等)。或者,您是否尝试过使用 readAll() 并对查询进行分区以便可以并行查询主键的不同范围并且每个范围不包含太多数据的建议? (似乎没有,因为您最终通过 CSV 导出 - 但只是好奇)
【解决方案2】:

我认为 JDBCIO 由于其固有的限制(单个 SELECT)而不能很好地扩展。我不知道来自 MySQL 和 BEAM 的流媒体支持。

您可能可以将数据库转储到数据处理系统更容易处理的地方(例如 csv)。对你有用吗?

【讨论】:

  • JdbcIO 不限于单个选择:您可以使用 readAll() 执行多个查询,从 PCollection 获取参数。
  • 我现在将数据库转储为 CSV,这样处理起来更方便。
猜你喜欢
  • 1970-01-01
  • 2020-06-22
  • 2020-11-12
  • 1970-01-01
  • 2021-08-01
  • 1970-01-01
  • 2023-03-26
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多