【问题标题】:ON DUPLICATE KEY UPDATE while inserting from pyspark dataframe to an external database table via JDBC在通过 JDBC 从 pyspark 数据帧插入外部数据库表时进行重复键更新
【发布时间】:2026-01-16 12:40:02
【问题描述】:

嗯,我正在使用 PySpark,我有一个 Spark 数据框,我使用它将数据插入到 mysql 表中。

url = "jdbc:mysql://hostname/myDB?user=xyz&password=pwd"

df.write.jdbc(url=url, table="myTable", mode="append")

我想通过列值和特定数字的总和来更新列值(不在主键中)。

我尝试了不同的模式(追加、覆盖)DataFrameWriter.jdbc() 函数。

我的问题是我们如何更新列值,就像在 mysql 中使用 ON DUPLICATE KEY UPDATE 一样,同时将 pyspark 数据帧数据插入表中。

【问题讨论】:

  • 您找到解决方案了吗?

标签: apache-spark apache-spark-sql pyspark spark-dataframe pyspark-sql


【解决方案1】:

一种解决方法是将数据插入临时表,然后使用驱动程序执行的 SQL 语句将其迁移到最终表中。您可以使用与您的数据库提供程序相关的任何有效 SQL 语法。

【讨论】:

    【解决方案2】:

    这在 vanilla pyspark(或 Scala Spark,就此而言)是不可能的,因为您只有 4 种写入模式(来源 https://spark.apache.org/docs/2.4.3/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter.jdbc):

    append:将此DataFrame的内容追加到现有数据中。

    overwrite:覆盖现有数据。

    ignore:如果数据已经存在,则忽略此操作。

    error 或 errorifexists(默认情况):如果数据已经存在,则抛出异常。

    但是,有几个 hacky 解决方法:

    1. 有一个jython 包允许您直接编写jdbc 查询,因此您可以将代码结构化为INSERT ... ON DUPLICATE KEY UPDATE ...。这是链接:https://pypi.org/project/JayDeBeApi/

    2. 如果您精通 Scala,您可以编写新模式或将org.apache.spark.sql.execution.datasources.jdbcJdbcUtils.scala INSERT INTO 覆盖为INSERT ... ON DUPLICATE KEY UPDATE ...。或者更好的是,使用 MERGE 语句,例如:

    MERGE INTO table-name
    USING table-ref
    AS name
    ON cond
    WHEN NOT MATCHED THEN INSERT 
    WHEN MATCHED THEN UPDATE
    

    取决于您的 SQL 风格。

    1. 使用一个暂存表覆盖,然后在这个暂存环境中编写一个简单的mysql触发器,使其运行INSERT INTO target_table ON DUPLICATE KEY UPDATE

    2. 将您的 Spark DataFrame 移动到 pandas DataFrame 并使用 sqlalchemy 和原始查询在那里编写您的 upsert 查询。

    3. 使用由 Apache Kafka 支持的 Spark Streaming 创建管道,然后使用具有 jdbc upsert 功能的工具(例如 Kafka Connect)将 upsert 直接插入目标表。或者使用 Kafka Connect for upserting 从临时表到目标表。这是一些阅读https://docs.confluent.io/3.1.1/connect/connect-jdbc/docs/sink_connector.html#idempotent-writes

    【讨论】:

      最近更新 更多