【发布时间】:2018-05-22 17:31:31
【问题描述】:
我正在使用 Dataflow SDK 2.X Java API (Apache Beam SDK) 将数据写入 mysql。我创建了基于Apache Beam SDK documentation 的管道,以使用数据流将数据写入mysql。它一次插入单行,因为我需要实现批量插入。我在官方文档中找不到启用批量插入模式的任何选项。
想知道是否可以在数据流管道中设置批量插入模式?如果是,请让我知道我需要在下面的代码中更改什么。
.apply(JdbcIO.<KV<Integer, String>>write()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
"com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb")
.withUsername("username")
.withPassword("password"))
.withStatement("insert into Person values(?, ?)")
.withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<Integer, String>>() {
public void setParameters(KV<Integer, String> element, PreparedStatement query) {
query.setInt(1, kv.getKey());
query.setString(2, kv.getValue());
}
})
【问题讨论】:
-
我很困惑:您包含的代码 读取 数据,而不是插入:您使用的是 JdbcIO.read()。您的意思是包含不同的代码 sn-p 吗?如果您使用 JdbcIO.write(),它会自动将写入批处理到多达 1000 个元素中(实际上最终可能会更少,具体取决于您的管道结构、运行器、数据到达率等)。跨度>
-
感谢您的回复@jkff。有没有办法更新批量插入的元素数量?
-
目前没有。对您的需求来说是太多还是太少?
-
对我的要求来说太少了。
-
嗯,你的意思是使用更大的值可以显着提高性能?我很好奇你会建议什么价值,以及它使整个管道端到端的速度有多快?您可以通过复制 JdbcIO 并对其进行编辑来尝试。
标签: mysql google-cloud-dataflow apache-beam apache-beam-io