【发布时间】:2023-03-20 18:52:01
【问题描述】:
我需要根据时区从utc_timestamp 中提取其日期和时间到两个不同的列中。时区名称由配置常量变量中的id 定义。
Input DF Output DF
+-------------+--+ +-------------+--+----------+----+
|utc_timestamp|id| |utc_timestamp|id|date |hour|
+-------------+--+ +-------------+--+----------+----|
|1608000000782|1 | |1608000000782|1 |2020-12-14|20 |
+-------------+--+ +-------------+--+----------+----+
|1608000240782|2 | |1608000240782|2 |2020-12-15|11 |
+-------------+--+ +-------------+--+----------+----+
我有 pandas_udf,它允许我一次提取一列,我必须创建它两次:
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import DateType, IntegerType
import pandas as pd
import pytz
TIMEZONE_LIST = {1: 'America/Chicago', 2: 'Asia/Tokyo'}
class TimezoneUdfProvider(object):
def __init__(self):
self.extract_date_udf = pandas_udf(self._extract_date, DateType(), PandasUDFType.SCALAR)
self.extract_hour_udf = pandas_udf(self._extract_hour, IntegerType(), PandasUDFType.SCALAR)
def _extract_date(self, utc_timestamps: pd.Series, ids: pd.Series) -> pd.Series:
return pd.Series([extract_date(c1, c2) for c1, c2 in zip(utc_timestamps, ids)])
def _extract_hour(self, utc_timestamps: pd.Series, ids: pd.Series) -> pd.Series:
return pd.Series([extract_hour(c1, c2) for c1, c2 in zip(utc_timestamps, ids)])
def extract_date(utc_timestamp: int, id: str):
timezone_name = TIMEZONE_LIST[id]
timezone_nw = pytz.timezone(timezone_name)
return pd.datetime.fromtimestamp(utc_timestamp / 1000e00, tz=timezone_nw).date()
def extract_hour(utc_timestamp: int, id: str) -> int:
timezone_name = TIMEZONE_LIST[id]
timezone_nw = pytz.timezone(timezone_name)
return pd.datetime.fromtimestamp(utc_timestamp / 1000e00, tz=timezone_nw).hour
def extract_from_utc(df: DataFrame) -> DataFrame:
timezone_udf1 = TimezoneUdfProvider()
df_with_date = df.withColumn('date', timezone_udf1.extract_date_udf(f.col(utc_timestamp), f.col(id)))
timezone_udf2 = TimezoneUdfProvider()
df_with_hour = df_with_date.withColumn('hour', timezone_udf2.extract_hour_udf(f.col(utc_timestamp), f.col(id)))
return df_with_hour
有没有更好的方法呢?不需要两次使用同一个 udf 提供程序?
【问题讨论】:
标签: python pyspark user-defined-functions