【问题标题】:PySpark Working with Delta tables - For Loop Optimization with UnionPySpark 使用 Delta 表 - 使用 Union 进行循环优化
【发布时间】:2021-12-05 04:27:38
【问题描述】:

我目前在数据块中工作,并且有一个包含 20 多列的增量表。我基本上需要从每一行的 1 列中获取一个值,将其发送到一个返回两个值/列的 api,然后创建另外 26 个以将值合并回原始增量表。所以输入是 28 列,输出是 28 列。目前我的代码如下所示:

from pyspark.sql.types import *
from pyspark.sql import functions as F
import requests, uuid, json
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql.functions import col,lit
from functools import reduce

spark.conf.set("spark.sql.adaptive.enabled","true")
spark.conf.set("spark.databricks.adaptive.autoOptimizeShuffle.enabled", "true")
spark.sql('set spark.sql.execution.arrow.pyspark.enabled = true')
spark.conf.set("spark.databricks.optimizer.dynamicPartitionPruning","true")
spark.conf.set("spark.sql.parquet.compression.codec","gzip")
spark.conf.set("spark.sql.inMemorycolumnarStorage.compressed","true")
spark.conf.set("spark.databricks.optimizer.dynamicFilePruning","true");

output=spark.sql("select * from delta.`table`").cache()

SeriesAppend=[]

for i in output.collect():
    #small mapping fix
    if i['col1']=='val1':
      var0='a'
    elif i['col1']=='val2':
      var0='b'
    elif i['col1']=='val3':
      var0='c'
    elif i['col1']=='val4':
      var0='d'

    var0=set([var0])
    req_var = set(['a','b','c','d'])
    var_list=list(req_var-var0)

    #subscription info

    headers = {header}

    body = [{
      'text': i['col2']
    }]
    
    if len(i['col2'])<500:
      request = requests.post(constructed_url, params=params, headers=headers, json=body)
      response = request.json()
      dumps=json.dumps(response[0])
      loads = json.loads(dumps)
      json_rdd = sc.parallelize(loads)
      json_df = spark.read.json(json_rdd)
      json_df = json_df.withColumn('col1',lit(i['col1']))
      json_df = json_df.withColumn('col2',lit(i['col2']))
      json_df = json_df.withColumn('col3',lit(i['col3']))
      ...
      SeriesAppend.append(json_df)
    
    else:
      pass

Series_output=reduce(DataFrame.unionAll, SeriesAppend)

只有 3 列的 SAMPLE DF:

df = spark.createDataFrame(
    [
        ("a", "cat","owner1"),  # create your data here, be consistent in the types.
        ("b", "dog","owner2"),
        ("c", "fish","owner3"),
        ("d", "fox","owner4"),
        ("e", "rat","owner5"),
    ],
    ["col1", "col2", "col3"])  # add your column names here

我真的只需要将响应 + 其他列值写入 delta 表,因此不一定需要数据帧,但还没有找到比上述更快的方法。现在,我可以运行 5 个输入,如果没有 unionAll,它会在 25.3 秒内返回 15 个。加入工会后,变成了 3 分钟。

最终输出如下:

df = spark.createDataFrame(
    [
        ("a", "cat","owner1","MI", 48003),  # create your data here, be consistent in the types.
        ("b", "dog","owner2", "MI", 48003),
        ("c", "fish","owner3","MI", 48003),
        ("d", "fox","owner4","MI", 48003),
        ("e", "rat","owner5","MI", 48003),
    ],
    ["col1", "col2", "col3", "col4", "col5"])  # add your column names here

如何在 spark 中加快速度?

【问题讨论】:

  • collect 然后for 将在驱动程序中运行整个事情,这是不可可扩展且不可并行的。您是否考虑过编写 UDF?
  • 所以我知道我可以写一个 udf,但我不确定上面想要什么或者我可以在那个 udf 中做些什么来让它运行得更快。你能演示一个例子吗?
  • 如果你向我解释你想做什么,什么是样本输入和预期输出,我可以尝试做一个演示。我可以向你保证,使用 UDF(相对于你上面的代码)是绝对可扩展的。使用小数据集可能不会更快,但尝试使用大数据集,您会发现差异
  • 所以响应只是另外两列。因此,对于我提供的示例 df,其中 1 行是 col1、col2、col3 ... 输出将是 col1、col2、col3、col4、col5。我们可以用 state 和 zipcode 填充符标记这些列。为了清楚起见,我在上面添加了一个输出示例。

标签: pyspark databricks union-all delta-lake


【解决方案1】:

正如我的 cmets 中提到的,您应该使用 UDF 将更多工作负载分配给工作人员,而不是 collect,并让一台机器(驱动程序)来运行它。这是一种错误的方法,而且不可扩展。

# This is your main function, pure Python and you can unittest it in any way you want.
# The most important about this function is:
# - everything must be encapsulated inside the function, no global variable works here
def req(col1, col2):
    if col1 == 'val1':
        var0 = 'a'
    elif col1 == 'val2':
        var0 = 'b'
    elif col1 == 'val3':
        var0 = 'c'
    elif col1 == 'val4':
        var0 = 'd'
    
    var0=set([var0])
    req_var = set(['a','b','c','d'])
    var_list = list(req_var - var0)
    
    #subscription info

    headers = {header} # !!! `header` must available **inside** this function, global won't work

    body = [{
      'text': col2
    }]
    
    if len(col2) < 500:
        # !!! same as `header`, `constructed_url` must available **inside** this function, global won't work
        request = requests.post(constructed_url, params=params, headers=headers, json=body)
        response = request.json()
        return (response.col4, response.col5)
    else:
        return None

# Now you wrap the function above into a Spark UDF.
# I'm using only 2 columns here as input, but you can use as many columns as you wish.
# Same as output, I'm using only a tuple with 2 elements, you can make it as many items as you wish.
df.withColumn('temp', F.udf(req, T.ArrayType(T.StringType()))('col1', 'col2')).show()

# Output
# +----+----+------+------------------+
# |col1|col2|  col3|              temp|
# +----+----+------+------------------+
# |   a| cat|owner1|[foo_cat, bar_cat]|
# |   b| dog|owner2|[foo_dog, bar_dog]|
# |   c|fish|owner3|              null|
# |   d| fox|owner4|              null|
# |   e| rat|owner5|              null|
# +----+----+------+------------------+

# Now all you have to do is extract the tuple and assign to separate columns
# (and delete temp column to cleanup)
(df
    .withColumn('col4', F.col('temp')[0])
    .withColumn('col5', F.col('temp')[1])
    .drop('temp')
    .show()
)

# Output
# +----+----+------+-------+-------+
# |col1|col2|  col3|   col4|   col5|
# +----+----+------+-------+-------+
# |   a| cat|owner1|foo_cat|bar_cat|
# |   b| dog|owner2|foo_dog|bar_dog|
# |   c|fish|owner3|   null|   null|
# |   d| fox|owner4|   null|   null|
# |   e| rat|owner5|   null|   null|
# +----+----+------+-------+-------+

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-06-14
    • 2019-10-15
    • 2021-05-05
    • 1970-01-01
    • 2022-08-18
    • 2012-11-05
    • 2016-10-24
    相关资源
    最近更新 更多