【问题标题】:df.rdd.collect() converts timestamp column(UTC) to local timezone(IST) in pysparkdf.rdd.collect() 将时间戳列(UTC)转换为 pyspark 中的本地时区(IST)
【发布时间】:2021-12-12 16:37:40
【问题描述】:

spark 从 MySQL 读取一个表,该表有一个存储 UTC 时区值的时间戳列。 Spark 在本地(IST)中配置。 MySQL 存储低于时间戳值。

spark.conf.set("spark.sql.session.timeZone" , "UTC")

df.show(100,False)

使用上述配置后,我可以看到df.show() 的正确记录。稍后 df.rdd.collect() 将这些值转换回 IST 时区。

for row in df.rdd.collect():
    print("row.Mindate ",row.Mindate)
    

row.Mindate 2021-03-02 19:30:31
row.Mindate 2021-04-01 14:05:03
row.Mindate 2021-06-15 11:39:40
row.Mindate 2021-07-07 18:14:17
row.Mindate 2021-08-03 10:48:51
row.Mindate 2021-10-06 10:21:11

spark 数据框和 df.rdd 显示不同的结果集。 即使在"spark.sql.session.timeZone" , "UTC" 之后,它如何将值更改回本地时区。

提前致谢

编辑 1:

df.printSchema()

root
 |-- Mindate: timestamp (nullable = true)
 |-- Maxdate: timestamp (nullable = true)

【问题讨论】:

  • 你能printSchema 发到这里吗?您确定这是实际转换,而不仅仅是打印时的格式(日期-> str)。
  • printSchema()编辑了上面的帖子

标签: apache-spark datetime pyspark


【解决方案1】:

TL;DR

df.rdd.collect() 将时间戳列 (UTC) 转换为 pyspark 中的本地时区 (IST)

不,它没有。实际上,您读取的数据帧内的时间戳没有时区。你看到的只是show() 基于会话本地时区的行为。


当您将datetime.datetime 值存储在TimestampType 类型的列中时,时区信息会丢失

in the docs所述

日期时间类型

  • TimestampType:表示包含字段年、月、日、小时、分钟和秒的值以及会话本地时区的值。时间戳值代表一个绝对时间点。

see in code TimestampType 是 python datetime.datetime 的包装器,但它会去除时区并在内部将其存储为 epoch time

class TimestampType(AtomicType, metaclass=DataTypeSingleton):
    """Timestamp (datetime.datetime) data type.
    """

    def needConversion(self):
        return True

    def toInternal(self, dt):
        if dt is not None:
            seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
                       else time.mktime(dt.timetuple()))
            return int(seconds) * 1000000 + dt.microsecond

    def fromInternal(self, ts):
        if ts is not None:
            # using int to avoid precision loss in float
            return datetime.datetime.fromtimestamp(ts // 1000000).replace(microsecond=ts % 1000000)

更多示例代码:

from typing import Union
from pyspark.sql.types import TimestampType, StringType
from datetime import datetime
from pyspark.sql import DataFrame, functions as F


def to_str(val: Union[str, datetime]) -> str:
    type_str = f'{type(val).__name__}:'
    if isinstance(val, str):
        return type_str + val
    else:
        return f'{type_str}{val.isoformat()} tz:{val.tzinfo}'


def print_df_info(df: DataFrame):
    df.show(truncate=False)
    for row in df.collect():
        log('DF :', ','.join([to_str(cell) for cell in row]))
    for row in df.rdd.collect():
        log('RDD:', ','.join([to_str(cell) for cell in row]))


spark.conf.set("spark.sql.session.timeZone", "UTC")
timestamps = ['2021-04-01 10:00:00 -05:00']
timestamp_data = [{'col_original_str': s} for s in timestamps]

my_df = spark.createDataFrame(timestamp_data)
# 1. col_original_str -> col_to_timestamp (convert to UTC and stored WITHOUT timezone)
my_df = my_df.withColumn('col_to_timestamp', F.to_timestamp(my_df.col_original_str))
# 2. col_to_timestamp -> col_date_format (convert an Epoch time (which has no timezone) to string)
my_df = my_df.withColumn('col_date_format', F.date_format(my_df.col_to_timestamp, "yyyy-MM-dd HH:mm:ss.SSSXXX"))
# This is really confusing.
# 3. col_to_timestamp -> col_to_utc_timestamp (tell pyspark to interpret col_to_timestamp with
#                                              timezone Asia/Kolkata, and convert it to UTC)
my_df = my_df.withColumn('col_reinterpret_tz', F.to_utc_timestamp(my_df.col_to_timestamp, 'Asia/Kolkata'))

my_df.printSchema()

log('#################################################')
log('df with session.timeZone set to UTC')
spark.conf.set("spark.sql.session.timeZone", "UTC")
print_df_info(my_df)

log('#################################################')
log('df with session.timeZone set to Asia/Kolkata')
spark.conf.set("spark.sql.session.timeZone", "Asia/Kolkata")
print_df_info(my_df)

输出中的注释:

  1. DF :RDD :(参见 print_df_info() 的日志)具有完全相同的内容。它们是相同数据的不同外观。
  2. 更改spark.sql.session.timeZone 对“内部表示”没有影响。查看来自print_df_info() 的日志。
  3. 更改spark.sql.session.timeZone 会改变show() 打印timestamp 类型值的方式。
2021-11-08T12:16:22.817 spark.version: 3.0.3
root
 |-- col_original_str: string (nullable = true)
 |-- col_to_timestamp: timestamp (nullable = true)
 |-- col_date_format: string (nullable = true)
 |-- col_reinterpret_tz: timestamp (nullable = true)

2021-11-08T13:57:54.243 #################################################
2021-11-08T13:57:54.244 df with session.timeZone set to UTC

+--------------------------+-------------------+------------------------+-------------------+
|col_original_str          |col_to_timestamp   |col_date_format         |col_reinterpret_tz |
+--------------------------+-------------------+------------------------+-------------------+
|2021-04-01 10:00:00 -05:00|2021-04-01 15:00:00|2021-04-01 15:00:00.000Z|2021-04-01 09:30:00|
+--------------------------+-------------------+------------------------+-------------------+

2021-11-08T13:57:54.506 DF : str:2021-04-01 10:00:00 -05:00,datetime:2021-04-01T10:00:00 tz:None,str:2021-04-01 15:00:00.000Z,datetime:2021-04-01T04:30:00 tz:None
2021-11-08T13:57:54.569 RDD: str:2021-04-01 10:00:00 -05:00,datetime:2021-04-01T10:00:00 tz:None,str:2021-04-01 15:00:00.000Z,datetime:2021-04-01T04:30:00 tz:None

2021-11-08T13:57:54.570 #################################################
2021-11-08T13:57:54.570 df with session.timeZone set to Asia/Kolkata

+--------------------------+-------------------+------------------------+-------------------+
|col_original_str          |col_to_timestamp   |col_date_format         |col_reinterpret_tz |
+--------------------------+-------------------+------------------------+-------------------+
|2021-04-01 10:00:00 -05:00|2021-04-01 20:30:00|2021-04-01 15:00:00.000Z|2021-04-01 15:00:00|
+--------------------------+-------------------+------------------------+-------------------+

2021-11-08T13:57:54.828 DF : str:2021-04-01 10:00:00 -05:00,datetime:2021-04-01T10:00:00 tz:None,str:2021-04-01 15:00:00.000Z,datetime:2021-04-01T04:30:00 tz:None
2021-11-08T13:57:54.916 RDD: str:2021-04-01 10:00:00 -05:00,datetime:2021-04-01T10:00:00 tz:None,str:2021-04-01 15:00:00.000Z,datetime:2021-04-01T04:30:00 tz:None

一些参考资料:

【讨论】:

  • rdd 方法实际上是在强制转换时间——无论spark.sql.session.timeZone 的值如何,所收集行中的datetime.datetime 对象在本地时间都是一个幼稚的日期时间。您可以通过比较 df.toPandas()pd.DataFrame([row.asDict() for row in df.collect()]) 来验证这种不一致
  • @charleyc,1. 不确定 Pandas 是在哪里讨论的,这不是 OP 或我的回应的一部分。 2. 我已经看到并发布了显示存储时间戳时时区被剥离的代码。 3.不知道你的意思是什么“无论spark.sql.session.timeZone的值如何,收集的行中的datetime.datetime对象是本地时间的天真日期时间” - Word“ local" 表示时区。
猜你喜欢
  • 1970-01-01
  • 2019-03-09
  • 2019-08-15
  • 2018-04-17
  • 1970-01-01
  • 2011-02-06
  • 2013-06-30
  • 2019-01-05
  • 2013-03-23
相关资源
最近更新 更多