【Posted】: 2021-12-05 04:27:38
【Problem description】:
I'm currently working in Databricks and have a delta table with 20+ columns. I basically need to take a value from one column in each row, send it to an API that returns two values/columns, and then create the other 26 so the values can be merged back into the original delta table. So the input is 28 columns and the output is 28 columns. Currently my code looks like this:
from pyspark.sql.types import *
from pyspark.sql import functions as F
import requests, uuid, json
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql.functions import col,lit
from functools import reduce
spark.conf.set("spark.sql.adaptive.enabled","true")
spark.conf.set("spark.databricks.adaptive.autoOptimizeShuffle.enabled", "true")
spark.sql('set spark.sql.execution.arrow.pyspark.enabled = true')
spark.conf.set("spark.databricks.optimizer.dynamicPartitionPruning","true")
spark.conf.set("spark.sql.parquet.compression.codec","gzip")
spark.conf.set("spark.sql.inMemorycolumnarStorage.compressed","true")
spark.conf.set("spark.databricks.optimizer.dynamicFilePruning","true");
output=spark.sql("select * from delta.`table`").cache()
SeriesAppend=[]
for i in output.collect():
    #small mapping fix
    if i['col1'] == 'val1':
        var0 = 'a'
    elif i['col1'] == 'val2':
        var0 = 'b'
    elif i['col1'] == 'val3':
        var0 = 'c'
    elif i['col1'] == 'val4':
        var0 = 'd'
    var0 = set([var0])
    req_var = set(['a', 'b', 'c', 'd'])
    var_list = list(req_var - var0)
    #subscription info
    headers = {header}
    body = [{
        'text': i['col2']
    }]
    if len(i['col2']) < 500:
        request = requests.post(constructed_url, params=params, headers=headers, json=body)
        response = request.json()
        dumps = json.dumps(response[0])
        loads = json.loads(dumps)
        json_rdd = sc.parallelize(loads)
        json_df = spark.read.json(json_rdd)
        json_df = json_df.withColumn('col1', lit(i['col1']))
        json_df = json_df.withColumn('col2', lit(i['col2']))
        json_df = json_df.withColumn('col3', lit(i['col3']))
        ...
        SeriesAppend.append(json_df)
    else:
        pass

Series_output = reduce(DataFrame.unionAll, SeriesAppend)
SAMPLE DF with only 3 columns:
df = spark.createDataFrame(
[
("a", "cat","owner1"), # create your data here, be consistent in the types.
("b", "dog","owner2"),
("c", "fish","owner3"),
("d", "fox","owner4"),
("e", "rat","owner5"),
],
["col1", "col2", "col3"]) # add your column names here
I really only need to write the response plus the other column values to a delta table, so I don't strictly need a dataframe, but I haven't found a faster way than the above. Right now I can run 5 inputs and it returns 15 in 25.3 seconds without the unionAll. With the union it becomes 3 minutes.
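One way to avoid the repeated unionAll is to skip the per-row single-row DataFrames entirely: accumulate plain Python rows and call createDataFrame once at the end. A minimal sketch under the same placeholders as the code above (constructed_url, params, headers); the state/zip keys are purely illustrative names for the two fields the API returns:
import requests
from pyspark.sql import Row

rows = []
for i in output.collect():
    if len(i['col2']) >= 500:
        continue
    body = [{'text': i['col2']}]
    # same call as in the original loop
    response = requests.post(constructed_url, params=params, headers=headers, json=body).json()
    first = response[0]
    rows.append(Row(
        col1=i['col1'],
        col2=i['col2'],
        col3=i['col3'],
        col4=first.get('state'),  # illustrative field name
        col5=first.get('zip'),    # illustrative field name
    ))

# one createDataFrame call instead of reduce(DataFrame.unionAll, ...) over many tiny DataFrames
Series_output = spark.createDataFrame(rows)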
The final output looks like this:
df = spark.createDataFrame(
[
("a", "cat","owner1","MI", 48003), # create your data here, be consistent in the types.
("b", "dog","owner2", "MI", 48003),
("c", "fish","owner3","MI", 48003),
("d", "fox","owner4","MI", 48003),
("e", "rat","owner5","MI", 48003),
],
["col1", "col2", "col3", "col4", "col5"]) # add your column names here
How can I speed this up in Spark?
【Question discussion】:
- collect followed by for runs the whole thing on the driver, which is neither scalable nor parallel. Have you considered writing a UDF?
- I know I can write a udf, but I'm not sure what the above is asking for, or what I could do inside that udf to make it run faster. Could you demonstrate with an example?
- If you explain what you're trying to do and what the sample input and expected output are, I can try to put together a demo (see the sketch after these comments). I can assure you that using a UDF (versus your code above) is absolutely scalable. It may not be faster on a small dataset, but try a large one and you'll see the difference.
- So the response is just two more columns. So for the sample df I provided, where one row is col1, col2, col3 ... the output would be col1, col2, col3, col4, col5. We can label those columns with state and zipcode placeholders. I've added an output example above for clarity.
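A minimal sketch of the UDF route suggested in the comments, using the constructed_url, params, and headers placeholders from the question; the state/zip response keys and the string return types are illustrative, not confirmed by the post:
import requests
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType

# the two extra columns coming back from the API, returned as one struct per row
resp_schema = StructType([
    StructField("col4", StringType()),
    StructField("col5", StringType()),
])

def call_api(text):
    # mirror the len < 500 guard from the question; longer texts get nulls
    if text is None or len(text) >= 500:
        return None
    body = [{'text': text}]
    response = requests.post(constructed_url, params=params, headers=headers, json=body).json()
    first = response[0]
    # illustrative keys -- use whatever fields the API actually returns
    return (first.get('state'), first.get('zip'))

call_api_udf = F.udf(call_api, resp_schema)

result = (
    spark.sql("select * from delta.`table`")
    .withColumn("resp", call_api_udf(F.col("col2")))
    .select("*", "resp.col4", "resp.col5")
    .drop("resp")
)
Because the UDF runs on the executors rather than the driver, the API calls are distributed across partitions instead of being made one at a time in a Python loop.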
Tags: pyspark databricks union-all delta-lake