【问题标题】:PySpark The conversion of a datetime2 data type to a datetime data type resulted in an out-of-range valuePySpark 将 datetime2 数据类型转换为 datetime 数据类型导致值超出范围
【发布时间】:2020-09-18 10:37:17
【问题描述】:

好的,所以我正在尝试将数据库从 pyspark 写入 azure sql 数据库,但是遇到了 datetime 超出范围值的问题。我知道日期时间和日期时间的值范围不同。

我的表定义如下:

CREATE TABLE [dbo].[DimTour]
(
    [TourSk] BIGINT NOT NULL,
    [TourBk] INT NOT NULL,
    [TourType] VARCHAR(20) NOT NULL,
    [RequestedDateTimeUTC] DATETIME2 NOT NULL,
    [ScheduledDateTimeUTC] DATETIME2 NOT NULL,
    [TourStatus]  VARCHAR(20) NOT NULL,
    CONSTRAINT [PK_DimTour] PRIMARY KEY CLUSTERED([TourSk] ASC)
);

当我第一次将这个表上传到天蓝色时,日期类型是 Datetime2,从数据源读入我的数据框的数据是 Datetime2 格式。但是,当我尝试将数据帧上传到 sql 数据库时,出现以下错误:“将 datetime2 数据类型转换为 datetime 数据类型导致值超出范围。”然后当我查看我的 sql 数据库表时,类型已从 datetime2 切换到 datetime。我知道这个错误源于 0001 年,但为什么数据一直试图转换为日期时间,我该如何解决这个问题?

我是这样写数据的:

dimTour.write.mode('Overwrite').jdbc(url=jdbcUrl, table='dbo.DimTour', properties=connectionProperties)

【问题讨论】:

    标签: sql pyspark apache-spark-sql azure-sql-database databricks


    【解决方案1】:

    问题是 Spark 默认将时间戳映射到 DATETIME。您可以通过注册一个自定义的org.apache.spark.sql.jdbc.JdbcDialect 来覆盖它,它映射到DATETIME2,如下所示:

    import org.apache.spark.sql.jdbc.JdbcDialect
    import org.apache.spark.sql.jdbc.JdbcType
    import org.apache.spark.sql.types.BooleanType
    import org.apache.spark.sql.types.DataType
    import org.apache.spark.sql.types.MetadataBuilder
    import org.apache.spark.sql.types.StringType
    import org.apache.spark.sql.types.TimestampType
    
    /**
     * Copied from [[https://github.com/apache/spark/blob/v2.4.0/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala MsSqlServerDialect]]
     *
     * This implementation differs from [[org.apache.spark.sql.jdbc.MsSqlServerDialect]] only insofar,
     * as [[TimestampType]] is mapped to `DATETIME2` instead of `DATETIME`. This is preferable, because among other limitations,
     * `DATETIME` cannot represent dates before 1753-01-01.
     */
    object CustomMsSqlServerDialect extends JdbcDialect {
      override def canHandle(url: String): Boolean = url.startsWith("jdbc:sqlserver")
    
      override def getCatalystType(
                                    sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
        if (typeName.contains("datetimeoffset")) {
          // String is recommend by Microsoft SQL Server for datetimeoffset types in non-MS clients
          Option(StringType)
        } else {
          None
        }
      }
    
      override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
        case TimestampType => Some(JdbcType("DATETIME2", java.sql.Types.TIMESTAMP))
        case StringType => Some(JdbcType("NVARCHAR(MAX)", java.sql.Types.NVARCHAR))
        case BooleanType => Some(JdbcType("BIT", java.sql.Types.BIT))
        case _ => None
      }
    
      override def isCascadingTruncateTable(): Option[Boolean] = Some(false)
    }
    

    要注册自定义方言,请放置

    import org.apache.spark.sql.jdbc.JdbcDialects
    
    // ...
    JdbcDialects.registerDialect(CustomMsSqlServerDialect)
    // ...
    

    在初始化代码中的某个位置,最好是在创建 Spark 会话之前。

    【讨论】:

    • 这应该被标记为正确答案,因为它解决了问题。
    猜你喜欢
    • 2011-04-07
    • 2011-11-15
    • 2010-11-22
    • 1970-01-01
    相关资源
    最近更新 更多