TL;DR
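df.rdd.collect() converts timestamp column (UTC) to local timezone (IST) in pyspark
No, it does not. In fact, the timestamps inside the DataFrame you read carry no timezone at all. What you see is simply how show() renders them using the session-local timezone.
Timezone information is lost when you store a datetime.datetime value in a column of type TimestampType.
As described in the docs:
Datetime type
- TimestampType: Represents values comprising values of fields year, month, day, hour, minute, and second, with the session local time-zone. The timestamp value represents an absolute point in time.
As you can see in the code, TimestampType is a wrapper around Python's datetime.datetime, but it strips the timezone and stores the value internally as epoch time (in microseconds).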
class TimestampType(AtomicType, metaclass=DataTypeSingleton):
    """Timestamp (datetime.datetime) data type.
    """

    def needConversion(self):
        return True

    def toInternal(self, dt):
        if dt is not None:
            seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
                       else time.mktime(dt.timetuple()))
            return int(seconds) * 1000000 + dt.microsecond

    def fromInternal(self, ts):
        if ts is not None:
            # using int to avoid precision loss in float
            return datetime.datetime.fromtimestamp(ts // 1000000).replace(microsecond=ts % 1000000)
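To see the effect without a Spark cluster, here is a minimal standalone sketch that copies the two conversion bodies above into plain functions. The names to_internal and from_internal are hypothetical stand-ins for the methods, and the -05:00 input is chosen to mirror the sample data used later in this answer. It only illustrates the arithmetic; it is not how you would normally call PySpark.

```python
import calendar
import datetime
import time


def to_internal(dt):
    # Same logic as TimestampType.toInternal: aware values are converted
    # via UTC, naive values are interpreted in the machine's local timezone.
    seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
               else time.mktime(dt.timetuple()))
    return int(seconds) * 1000000 + dt.microsecond


def from_internal(ts):
    # Same logic as TimestampType.fromInternal: the result is a naive
    # datetime expressed in the machine's local timezone.
    return datetime.datetime.fromtimestamp(ts // 1000000).replace(microsecond=ts % 1000000)


minus_five = datetime.timezone(datetime.timedelta(hours=-5))
aware = datetime.datetime(2021, 4, 1, 10, 0, 0, tzinfo=minus_five)

internal = to_internal(aware)       # 1617289200000000 (2021-04-01 15:00:00 UTC)
restored = from_internal(internal)  # naive datetime; the -05:00 offset is gone

print(internal, restored.isoformat(), restored.tzinfo)
```

The integer is the same regardless of the offset the input carried, and the value that comes back has tzinfo set to None. Its wall-clock fields depend on the timezone of the machine running Python, not on the Spark session setting.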
More sample code:
from typing import Union
from pyspark.sql.types import TimestampType, StringType
from datetime import datetime
from pyspark.sql import DataFrame, functions as F

# Note: `spark` (an active SparkSession) and `log` (a print-like helper that
# prefixes each message with a timestamp, as seen in the output) are defined
# outside this snippet.


def to_str(val: Union[str, datetime]) -> str:
    type_str = f'{type(val).__name__}:'
    if isinstance(val, str):
        return type_str + val
    else:
        return f'{type_str}{val.isoformat()} tz:{val.tzinfo}'


def print_df_info(df: DataFrame):
    df.show(truncate=False)
    for row in df.collect():
        log('DF :', ','.join([to_str(cell) for cell in row]))
    for row in df.rdd.collect():
        log('RDD:', ','.join([to_str(cell) for cell in row]))


spark.conf.set("spark.sql.session.timeZone", "UTC")
timestamps = ['2021-04-01 10:00:00 -05:00']
timestamp_data = [{'col_original_str': s} for s in timestamps]
my_df = spark.createDataFrame(timestamp_data)

# 1. col_original_str -> col_to_timestamp (convert to UTC and stored WITHOUT timezone)
my_df = my_df.withColumn('col_to_timestamp', F.to_timestamp(my_df.col_original_str))

# 2. col_to_timestamp -> col_date_format (convert an Epoch time (which has no timezone) to string)
my_df = my_df.withColumn('col_date_format', F.date_format(my_df.col_to_timestamp, "yyyy-MM-dd HH:mm:ss.SSSXXX"))

# This is really confusing.
# 3. col_to_timestamp -> col_reinterpret_tz (tell pyspark to interpret col_to_timestamp with
#    timezone Asia/Kolkata, and convert it to UTC)
my_df = my_df.withColumn('col_reinterpret_tz', F.to_utc_timestamp(my_df.col_to_timestamp, 'Asia/Kolkata'))

my_df.printSchema()

log('#################################################')
log('df with session.timeZone set to UTC')
spark.conf.set("spark.sql.session.timeZone", "UTC")
print_df_info(my_df)

log('#################################################')
log('df with session.timeZone set to Asia/Kolkata')
spark.conf.set("spark.sql.session.timeZone", "Asia/Kolkata")
print_df_info(my_df)
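Step 3 is the part that tends to confuse people, so here is a rough plain-Python sketch of the arithmetic that to_utc_timestamp performs on the wall-clock value. The helper reinterpret_as_utc is a hypothetical name, and a fixed +05:30 offset stands in for Asia/Kolkata (India does not observe daylight saving time, so the offset is constant). This is an illustration of the idea, not Spark's implementation.

```python
from datetime import datetime, timedelta, timezone

# Fixed-offset stand-in for Asia/Kolkata (assumption: +05:30 all year).
IST = timezone(timedelta(hours=5, minutes=30))


def reinterpret_as_utc(wall_clock, source_tz):
    # Pretend the naive wall-clock value was recorded in source_tz,
    # convert that instant to UTC, then drop the timezone again.
    return wall_clock.replace(tzinfo=source_tz).astimezone(timezone.utc).replace(tzinfo=None)


# col_to_timestamp as seen with the session timezone set to UTC.
utc_wall_clock = datetime(2021, 4, 1, 15, 0, 0)

shifted = reinterpret_as_utc(utc_wall_clock, IST)
print(shifted)  # 2021-04-01 09:30:00
```

In other words, the function does not attach a timezone to the column. It shifts the stored instant by the offset of the zone you name, which is why col_reinterpret_tz ends up 5 hours 30 minutes earlier than col_to_timestamp.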
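Notes on the output:
- DF : and RDD: (see the logs from print_df_info()) have exactly the same content. They are different facades of the same data.
- Changing spark.sql.session.timeZone has no impact on the "internal representation". See the logs from print_df_info().
- Changing spark.sql.session.timeZone changes the way show() prints values of type timestamp.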
2021-11-08T12:16:22.817 spark.version: 3.0.3
root
|-- col_original_str: string (nullable = true)
|-- col_to_timestamp: timestamp (nullable = true)
|-- col_date_format: string (nullable = true)
|-- col_reinterpret_tz: timestamp (nullable = true)
2021-11-08T13:57:54.243 #################################################
2021-11-08T13:57:54.244 df with session.timeZone set to UTC
+--------------------------+-------------------+------------------------+-------------------+
|col_original_str          |col_to_timestamp   |col_date_format         |col_reinterpret_tz |
+--------------------------+-------------------+------------------------+-------------------+
|2021-04-01 10:00:00 -05:00|2021-04-01 15:00:00|2021-04-01 15:00:00.000Z|2021-04-01 09:30:00|
+--------------------------+-------------------+------------------------+-------------------+
2021-11-08T13:57:54.506 DF : str:2021-04-01 10:00:00 -05:00,datetime:2021-04-01T10:00:00 tz:None,str:2021-04-01 15:00:00.000Z,datetime:2021-04-01T04:30:00 tz:None
2021-11-08T13:57:54.569 RDD: str:2021-04-01 10:00:00 -05:00,datetime:2021-04-01T10:00:00 tz:None,str:2021-04-01 15:00:00.000Z,datetime:2021-04-01T04:30:00 tz:None
2021-11-08T13:57:54.570 #################################################
2021-11-08T13:57:54.570 df with session.timeZone set to Asia/Kolkata
+--------------------------+-------------------+------------------------+-------------------+
|col_original_str          |col_to_timestamp   |col_date_format         |col_reinterpret_tz |
+--------------------------+-------------------+------------------------+-------------------+
|2021-04-01 10:00:00 -05:00|2021-04-01 20:30:00|2021-04-01 15:00:00.000Z|2021-04-01 15:00:00|
+--------------------------+-------------------+------------------------+-------------------+
2021-11-08T13:57:54.828 DF : str:2021-04-01 10:00:00 -05:00,datetime:2021-04-01T10:00:00 tz:None,str:2021-04-01 15:00:00.000Z,datetime:2021-04-01T04:30:00 tz:None
2021-11-08T13:57:54.916 RDD: str:2021-04-01 10:00:00 -05:00,datetime:2021-04-01T10:00:00 tz:None,str:2021-04-01 15:00:00.000Z,datetime:2021-04-01T04:30:00 tz:None
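The difference between the two show() tables can be reproduced in plain Python as a sanity check. The sketch below takes the epoch value that corresponds to 2021-04-01 15:00:00 UTC and renders it in two zones. The render helper is a hypothetical name, and a fixed +05:30 offset is assumed in place of Asia/Kolkata. It mimics what the session timezone does to the displayed string; it is not Spark code.

```python
from datetime import datetime, timedelta, timezone

# Stored value for col_to_timestamp: microseconds since the epoch.
EPOCH_MICROS = 1617289200 * 1_000_000

# Fixed-offset stand-in for Asia/Kolkata (assumption: +05:30 all year).
IST = timezone(timedelta(hours=5, minutes=30))


def render(epoch_micros, tz):
    # Format the same instant as a wall-clock string in the given zone.
    moment = datetime.fromtimestamp(epoch_micros // 1_000_000, tz=tz)
    return moment.strftime("%Y-%m-%d %H:%M:%S")


utc_view = render(EPOCH_MICROS, timezone.utc)
ist_view = render(EPOCH_MICROS, IST)

print(utc_view)  # 2021-04-01 15:00:00
print(ist_view)  # 2021-04-01 20:30:00
```

The stored integer never changes. Only the string produced from it does, which matches the col_to_timestamp column in the two tables above.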
Some references: