【问题标题】:Pyspark create batch number column based on accountPyspark根据账户创建批号列
【发布时间】:2020-10-23 23:11:40
【问题描述】:

假设我有一个 pyspark 数据框,其中包含多个唯一帐户值,每个帐户值都有唯一数量的条目,如下所示:

+-------------_+--------+--------+
| account| col1|  col2  | col3   |
+--------+-----+--------+--------+
| 325235 |   59|      -6|  625.64|
| 325235 |   23|    -282|  923.47|
| 325235 |   77|-1310.89| 3603.48|
| 245623 |  120|    1.53| 1985.63|
| 245623 |  106|     -12| 1985.06|
| 658567 |   84|     -12|  194.67|

我想指定一个批次大小,并根据批次大小将多个帐户分配给同一个批次。假设我选择batch size = 2,那么输出应该如下:

+-------------_+--------+--------+--------------+
| account| col1|  col2  | col3   | batch_number |
+--------+-----+--------+--------+--------------+
| 325235 |   59|      -6|  625.64|             1|
| 325235 |   23|    -282|  923.47|             1|
| 325235 |   77|-1310.89| 3603.48|             1|
| 245623 |  120|    1.53| 1985.63|             1|
| 245623 |  106|     -12| 1985.06|             1|
| 658567 |   84|     -12|  194.67|             2|

然后我可以在batch_number 列上进行分组,并且每批有多个帐户。这是我的工作代码,但是因为我正在执行 toPandas(),所以它太慢了。

# Get unique accounts in source data
accounts = [row.account for row in source_data.select("account").distinct().collect()]
    
# Find number of batches based. Last batch will have size = remainder
num_batches, remainder = divmod(len(accounts), batchsize)
    
# Create batch dataframe where a batch number is assigned to each account.
batches = [i for _ in range(batchsize) for i in range(1, int(num_batches) + 1)] + [num_batches + 1 for i in range(remainder)]
batch_df = pd.DataFrame({"account": accounts, "batch_number": batches}, columns=["account", "batch_number"]).set_index("account")
    
# Add a zero column for batch number to source data which will be populated
source_data = source_data.withColumn("batch_number", lit(0))
    
# Map batch numbers of accounts back into the source data
source_data_p = source_data.toPandas()
for ind in source_data_p.index:
    source_data_p.at[ind, "batch_number"] = batch_df.at[source_data_p.at[ind, "account"], "batch_number"]
        
# Convert mapped pandas df back to spark df
batched_df = sqlcontext.createDataFrame(source_data_p)

理想情况下,我希望摆脱 toPandas() 调用,并在 pyspark 中进行映射。我看过一些相关的帖子,比如这个:How to batch up items from a PySpark DataFrame,但这不适合我的代码流程,所以我将不得不重写整个项目来实现它。

【问题讨论】:

    标签: python pandas pyspark group-by batch-processing


    【解决方案1】:

    据我了解,您可以使用mllib 或任何其他方式使用索引器,然后进行楼层划分:

    import pyspark.sql.functions as F
    from pyspark.ml.feature import StringIndexer
    

    n=2
    
    idx = StringIndexer(inputCol="account",outputCol="batch_number")
    
    (idx.fit(df).transform(df)
        .withColumn("batch_number",F.floor(F.col("batch_number")/n)+1)).show()
    

    +-------+----+--------+-------+------------+
    |account|col1|    col2|   col3|batch_number|
    +-------+----+--------+-------+------------+
    | 325235|  59|    -6.0| 625.64|           1|
    | 325235|  23|  -282.0| 923.47|           1|
    | 325235|  77|-1310.89|3603.48|           1|
    | 245623| 120|    1.53|1985.63|           1|
    | 245623| 106|   -12.0|1985.06|           1|
    | 658567|  84|   -12.0| 194.67|           2|
    +-------+----+--------+-------+------------+
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2020-10-24
      • 2022-11-29
      • 2020-11-22
      • 2022-12-21
      • 2020-11-08
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多