【问题标题】:How can I create a custom SQLTransformer in PySpark ML to pivot data如何在 PySpark ML 中创建自定义 SQLTransformer 以透视数据
【发布时间】:2018-08-23 00:55:10
【问题描述】:

我有一个类似于以下结构的数据框:

# Prepare training data
training = spark.createDataFrame([
    (990011, 1001, 01, "Salary", 1000, 0.0),
    (990011, 1002, 02, "POS Purchase", 50, 0.0),
    (990022, 1003, 01, "Cash Withdrawl", 500, 1.0),
    (990022, 1004, 02, "Interest Charge", 35, 1.0)
], ["customer_id", "transaction_id", "week_of_year", "category", "amount", "label"])

我可以使用 PySpark follow 动态地旋转这些数据,这样就不需要每周和类别的硬代码案例语句:

# Attempt 1
tx_pivot = training \
    .withColumn("week_of_year", sf.concat(sf.lit("T"), sf.col("week_of_year"))) \
    .groupBy("customer_id") \
    .pivot("week_of_year") \
    .sum("amount")

tx_pivot.show(20)

我想开发一个自定义 Transformer 来动态透视数据,这样我就可以将这个自定义 Transform 阶段合并到 Spark ML Pipeline 中。不幸的是,目前 Spark/PySpark 中的 SQLTransfomer 只支持 SQL,例如 E.g. 'SELECT ... FROM THIS'(参考https://github.com/apache/spark/blob/master/python/pyspark/ml/feature.py)。

任何有关如何创建自定义转换器以动态透视数据的指导将不胜感激。

【问题讨论】:

    标签: python apache-spark pyspark apache-spark-mllib


    【解决方案1】:

    实现一个接受一个数据帧并返回另一个数据帧的自定义转换器非常简单。在你的情况下:

    import pyspark.ml.pipeline.Transformer as Transformer
    
    class PivotTransformer(Transformer):
    
        def _transform(self, data):           
            return data.withColumn("week_of_year",sf.concat(sf.lit("T"),\
                        sf.col("week_of_year"))) \
                       .groupBy("customer_id") \
                       .pivot("week_of_year") \
                       .sum("amount")
    

    【讨论】:

      猜你喜欢
      • 2015-11-26
      • 1970-01-01
      • 1970-01-01
      • 2016-09-13
      • 1970-01-01
      • 2020-05-21
      • 1970-01-01
      • 1970-01-01
      • 2019-04-16
      相关资源
      最近更新 更多