【问题标题】:Spark DataFrame InsertIntoJDBC - TableAlreadyExists ExceptionSpark DataFrame InsertIntoJDBC - TableAlreadyExists 异常
【发布时间】:2015-10-02 20:52:06
【问题描述】:

使用 Spark 1.4.0,我正在尝试使用 insertIntoJdbc() 将来自 Spark DataFrame 的数据插入到 MemSQL 数据库中(这应该与与 MySQL 数据库交互完全一样)。但是,我不断收到 Runtime TableAlreadyExists 异常。

首先我像这样创建 MemSQL 表:

CREATE TABLE IF NOT EXISTS table1 (id INT AUTO_INCREMENT PRIMARY KEY, val INT);

然后我在 Spark 中创建一个简单的数据框并尝试像这样插入到 MemSQL 中:

val df = sc.parallelize(Array(123,234)).toDF.toDF("val")
//df: org.apache.spark.sql.DataFrame = [val: int]

df.insertIntoJDBC("jdbc:mysql://172.17.01:3306/test?user=root", "table1", false)

java.lang.RuntimeException: Table table1 already exists.

【问题讨论】:

    标签: mysql apache-spark spark-dataframe singlestore


    【解决方案1】:

    这个解决方案适用于一般的 JDBC 连接,尽管@wayne 的回答可能是专门针对 memSQL 的更好的解决方案。

    insertIntoJdbc 从 1.4.0 开始似乎已被弃用,使用它实际上会调用 write.jdbc()。

    write() 返回一个 DataFrameWriter 对象。如果要将数据附加到表中,则必须将对象的保存模式更改为 "append"

    上述问题中示例的另一个问题是 DataFrame 架构与目标表的架构不匹配。

    下面的代码给出了 Spark shell 的一个工作示例。我正在使用spark-shell --driver-class-path mysql-connector-java-5.1.36-bin.jar 启动我的 spark-shell 会话。

    import java.util.Properties
    
    val prop = new Properties() 
    prop.put("user", "root")
    prop.put("password", "")  
    
    val df = sc.parallelize(Array((1,234), (2,1233))).toDF.toDF("id", "val")   
    val dfWriter = df.write.mode("append") 
    
    dfWriter.jdbc("jdbc:mysql://172.17.01:3306/test", "table1", prop) 
    

    【讨论】:

    • 嗨,Elbow,我使用的是 spark 1.5,即使在说 write.mode("append") 之后我仍然收到 table already exists 异常你想对此发表评论吗?已经有一个名为数据库中的“customer_spark”
    • 嘿@DJElbow,同样在这里,仍然得到“表'table1'已经存在”异常。当 write.mode(SaveMode.Append)。我检查了,当使用“root”用户时,它工作得很好,但是当使用具有 CREATE/INSERT/UPDATE 权限的用户时,我收到了这个错误。
    【解决方案2】:

    insertIntoJDBC 文档实际上是不正确的;他们说该表必须已经存在,但实际上如果存在,它会抛出一个错误,如上所示:

    https://github.com/apache/spark/blob/03cca5dce2cd7618b5c0e33163efb8502415b06e/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala#L264

    我们建议使用我们的 MemSQL Spark 连接器,您可以在此处找到:

    https://github.com/memsql/memsql-spark-connector

    如果您在代码中包含该库并导入 com.memsql.spark.connector._,则可以使用 df.saveToMemSQL(...) 将 DataFrame 保存到 MemSQL。您可以在此处找到我们的连接器的文档:

    http://memsql.github.io/memsql-spark-connector/latest/api/#com.memsql.spark.connector.DataFrameFunctions

    【讨论】:

    • 非常好。这简化了事情。有没有编译好的 jar 可以在某处下载?找不到一个。
    • 如果您添加 maven.memsql.com 作为解析器,您可以将其作为依赖项包含在您的项目中:github.com/memsql/memsql-spark-connector#using
    【解决方案3】:

    我有同样的问题。将 spark 版本更新到 1.6.2 效果很好

    【讨论】:

      猜你喜欢
      • 2016-10-10
      • 1970-01-01
      • 1970-01-01
      • 2017-02-27
      • 1970-01-01
      • 2017-01-30
      • 1970-01-01
      • 1970-01-01
      • 2015-12-08
      相关资源
      最近更新 更多