【问题标题】:structured streaming `apply` has no output结构化流 `apply` 没有输出
【发布时间】:2021-09-01 02:15:16
【问题描述】:

代码:

df_streaming = spark \
    .readStream \
    .format("kafka") \
    ... \
    .load() \
    .xxx()

df_streaming = df_streaming \
    .groupBy(["name", "height"]) \
    .apply(cal_feature)

stream_writer = df_streaming \
    .writeStream \
    .format("console") \
    .start()

stream_writer.awaitTermination()

df_streaming 像这样:

name height weight
jack 173    100
tom  175    110
tom  175    115

cal_feature:

@pandas_udf(FEATURE_SCHEMA, PandasUDFType.GROUPED_MAP)
def cal_feature(df):
    feature_df = pd.DataFrame(columns=FEATURE_NAMES)

    feature_df["name"] = df["name"].iloc[0]
    feature_df["height"] = df["height"].iloc[0]
    feature_df["max_weight"] = df["weight"].max()
    
    # other complicated processing
    xxx...
    
    return feature_df

我知道agg(functions.max("weight")) 可以得到它,但我还想在cal_feature 中使用其他pandas 函数。

当使用静态数据帧(来自 csv 文件)时,会有输出。 agg(max) 也明白了,是否支持结构化流数据帧?

火花-2.4.5 python-3.7.10

【问题讨论】:

    标签: spark-structured-streaming


    【解决方案1】:

    我还没有看到任何在结构化流中的流聚合上使用 Grouped Map Pandas UDF 的成功证明。我最近写了一篇related question,关于结构化流中的 Pandas Grouped Map UDF。我在结构化流中从分组映射 Pandas UDF 获取结果/记录输出没有问题,但是在让它们处理正确的数据集并返回正确的结果(而不是从部分输入返回许多不正确的结果)方面存在许多问题。但是,我的经验仅限于 Python API。如果您成功或了解更多信息,请在此处留下更新。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2020-08-11
      • 1970-01-01
      • 2020-07-17
      • 2016-11-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-09-10
      相关资源
      最近更新 更多