【Posted】: 2021-09-01 02:15:16
【Question】:
Code:
df_streaming = spark \
    .readStream \
    .format("kafka") \
    ... \
    .load() \
    .xxx()
df_streaming = df_streaming \
    .groupBy(["name", "height"]) \
    .apply(cal_feature)
stream_writer = df_streaming \
    .writeStream \
    .format("console") \
    .start()
stream_writer.awaitTermination()
df_streaming looks like this:
name  height  weight
jack  173     100
tom   175     110
tom   175     115
And cal_feature:
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf(FEATURE_SCHEMA, PandasUDFType.GROUPED_MAP)
def cal_feature(df):
    feature_df = pd.DataFrame(columns=FEATURE_NAMES)
    feature_df["name"] = df["name"].iloc[0]
    feature_df["height"] = df["height"].iloc[0]
    feature_df["max_weight"] = df["weight"].max()
    # other complicated processing
    xxx...
    return feature_df
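For reference, here is a minimal pandas-only sketch of the per-group result that cal_feature appears intended to produce, run on the sample rows above without Spark. FEATURE_NAMES here is a hypothetical three-column stand-in, and cal_feature_local is an illustrative name that is not part of the original code; the elided "other complicated processing" is left out.

```python
import pandas as pd

# Hypothetical stand-in for FEATURE_NAMES in the question.
FEATURE_NAMES = ["name", "height", "max_weight"]


def cal_feature_local(group: pd.DataFrame) -> pd.DataFrame:
    """Return one feature row for a single (name, height) group."""
    row = {
        "name": group["name"].iloc[0],
        "height": group["height"].iloc[0],
        "max_weight": group["weight"].max(),
    }
    # Build the frame from a list of rows so that it has exactly one row.
    return pd.DataFrame([row], columns=FEATURE_NAMES)


# Sample rows from the question.
sample = pd.DataFrame(
    {
        "name": ["jack", "tom", "tom"],
        "height": [173, 175, 175],
        "weight": [100, 110, 115],
    }
)

# Emulate groupBy(...).apply(...) by calling the function once per group.
features = pd.concat(
    [cal_feature_local(g) for _, g in sample.groupby(["name", "height"])],
    ignore_index=True,
)
print(features)
```

The sketch builds the row explicitly because assigning a scalar to a column of an empty pd.DataFrame(columns=...) keeps the empty index and so leaves the frame with zero rows. Whether that matters in the original depends on what the elided processing does.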
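I know agg(functions.max("weight")) would give me the maximum, but I also want to use other pandas functions inside cal_feature.
With a static DataFrame (read from a CSV file) there is output, and agg(max) produces output as well. Is groupBy(...).apply(...) with a pandas UDF supported on a Structured Streaming DataFrame?
spark-2.4.5 python-3.7.10
【Comments】:
Tags: spark-structured-streaming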