【问题标题】:Write dataframe to Teradata table from Spark从 Spark 将数据帧写入 Teradata 表
【发布时间】:2017-03-29 04:12:10
【问题描述】:

到目前为止,我可以使用用于 Spark 的 Teradata jdbc 连接器从 Teradata 读取数据帧。 阅读语法如下:

val df = hc.read.format("jdbc").options(
  Map(
    "url" -> url,
    "dbtable" -> (sel * from tableA) as data,
    "driver" ->   "com.teradata.jdbc.TeraDriver"
  )
).load()

其中 hc = hiveContext,url = teradata 的连接 url

我想将数据框保存到 Teradata 表。我尝试通过将 dbtable 更改为 insert 语句来使用上述语法,

 val df = hc.read.format("jdbc").options(
  Map(
    "url" -> url,
    "dbtable" -> (insert into db.tabA  values (1,2,3)) as data,
    "driver" ->   "com.teradata.jdbc.TeraDriver"
  )
).load()

但是上面的语句给了我一个错误:

Error: Exception in thread "main" java.sql.SQLException: [Teradata Database] [TeraJDBC 15.10.00.22] [Error 3706] [SQLState 42000] Syntax error: expected something between '(' and the 'insert' keyword.

我想在 Spark 中将数据帧保存到 Teradata,最好的方法是什么?

【问题讨论】:

  • SQL 异常是 Teradata 抱怨接收到“(插入...”命令(它不需要括号)。尝试"dbtable" -> "insert into db.tabA values (1,2,3)",但我认为您还有其他东西检查:我不是 Spark 专家,但您必须使用“读取”方法“写入”到数据库中,这看起来很奇怪。
  • 我找到了一个例子 (sparkexpert.com/2015/04/17/…)。在您的示例中,您没有数据框。您首先需要使用一些数据(插入插入的“1,2,3”)创建数据框,然后使用“insertIntoJDBC”方法。
  • 谢谢@Insac。我找到了一种将数据帧写入 Teradata 的方法。我正在使用 ScalikeJDBC 创建与 Teradata 的 JDBC 连接并通过其 api 进行写入。
  • 好!您是否要输入您的解决方案作为答案?这样,遇到相同问题的其他人将能够解决它,并且您可能会收到可以帮助您改进解决方案的 cmets。
  • @Insac 谢谢。也更新了答案。 :)

标签: apache-spark apache-spark-sql teradata spark-dataframe


【解决方案1】:

AFAIK as data 不正确,剩下的在我看来是正确的。

"dbtable" -> (insert into db.tabA  values (1,2,3)) as data,

"dbtable" -> (insert into db.tabA  values (1,2,3)) ,

下面的操作应该没有任何麻烦。

val df = hc.read.format("jdbc").options(
  Map(
    "url" -> url,
    "dbtable" -> (insert into db.tabA  values (1,2,3)),
    "driver" ->   "com.teradata.jdbc.TeraDriver"
  )
).load()

【讨论】:

【解决方案2】:

我能够使用Scalikejdbc 将数据写入 Teradata 表。 我使用批量更新来存储结果。

使用ScalikeJdbc插入批处理行的示例代码:

 DB localTx { implicit session =>
  val batchParams: Seq[Seq[Any]] = (2001 to 3000).map(i => Seq(i, "name" + i))
  withSQL {
    insert.into(Emp).namedValues(column.id -> sqls.?, column.name -> sqls.?)
  }.batch(batchParams: _*).apply()
}

【讨论】:

  • 这是另一种选择,你的意思是,没有办法从spark jdbc中插入行?
  • 我发现这个非常有效,所以继续使用它。也可以通过 spark jdbc 实现,但目前我不知道。
猜你喜欢
  • 2019-06-22
  • 2015-01-02
  • 2019-10-28
  • 2020-07-17
  • 2019-01-05
  • 2018-08-13
  • 2018-09-06
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多