【发布时间】:2020-10-23 23:11:40
【问题描述】:
假设我有一个 pyspark 数据框,其中包含多个唯一帐户值,每个帐户值都有唯一数量的条目,如下所示:
+-------------_+--------+--------+
| account| col1| col2 | col3 |
+--------+-----+--------+--------+
| 325235 | 59| -6| 625.64|
| 325235 | 23| -282| 923.47|
| 325235 | 77|-1310.89| 3603.48|
| 245623 | 120| 1.53| 1985.63|
| 245623 | 106| -12| 1985.06|
| 658567 | 84| -12| 194.67|
我想指定一个批次大小,并根据批次大小将多个帐户分配给同一个批次。假设我选择batch size = 2,那么输出应该如下:
+-------------_+--------+--------+--------------+
| account| col1| col2 | col3 | batch_number |
+--------+-----+--------+--------+--------------+
| 325235 | 59| -6| 625.64| 1|
| 325235 | 23| -282| 923.47| 1|
| 325235 | 77|-1310.89| 3603.48| 1|
| 245623 | 120| 1.53| 1985.63| 1|
| 245623 | 106| -12| 1985.06| 1|
| 658567 | 84| -12| 194.67| 2|
然后我可以在batch_number 列上进行分组,并且每批有多个帐户。这是我的工作代码,但是因为我正在执行 toPandas(),所以它太慢了。
# Get unique accounts in source data
accounts = [row.account for row in source_data.select("account").distinct().collect()]
# Find number of batches based. Last batch will have size = remainder
num_batches, remainder = divmod(len(accounts), batchsize)
# Create batch dataframe where a batch number is assigned to each account.
batches = [i for _ in range(batchsize) for i in range(1, int(num_batches) + 1)] + [num_batches + 1 for i in range(remainder)]
batch_df = pd.DataFrame({"account": accounts, "batch_number": batches}, columns=["account", "batch_number"]).set_index("account")
# Add a zero column for batch number to source data which will be populated
source_data = source_data.withColumn("batch_number", lit(0))
# Map batch numbers of accounts back into the source data
source_data_p = source_data.toPandas()
for ind in source_data_p.index:
source_data_p.at[ind, "batch_number"] = batch_df.at[source_data_p.at[ind, "account"], "batch_number"]
# Convert mapped pandas df back to spark df
batched_df = sqlcontext.createDataFrame(source_data_p)
理想情况下,我希望摆脱 toPandas() 调用,并在 pyspark 中进行映射。我看过一些相关的帖子,比如这个:How to batch up items from a PySpark DataFrame,但这不适合我的代码流程,所以我将不得不重写整个项目来实现它。
【问题讨论】:
标签: python pandas pyspark group-by batch-processing