【发布时间】:2020-07-17 15:36:36
【问题描述】:
我正在运行一个没有问题的 Pyspark 进程。该过程的第一步是将特定的 UDF 应用于数据帧。这是函数:
import html2text
class Udfs(object):
def __init__(self):
self.h2t = html2text.HTML2Text()
self.h2t.ignore_links = True
self.h2t.ignore_images = True
def extract_text(self, raw_text):
try:
texto = self.h2t.handle(raw_text)
except:
texto = "PARSE HTML ERROR"
return texto
这是我应用 UDF 的方式:
import pyspark.sql.functions as f
import pyspark.sql.types as t
from udfs import Udfs
udfs = Udfs()
extract_text_udf = f.udf(udfs.extract_text, t.StringType())
df = df.withColumn("texto", extract_text_udf("html_raw"))
它处理大约 2900 万行和 300GB。问题是某些任务需要太多时间来处理。任务的平均时间为:
其他任务已完成,持续时间超过 1 小时。
但有些任务需要太多时间处理:
该进程在 AWS 中使用 EMR 在具有 100 个节点的集群中运行,每个节点具有 32gb 的 RAM 和 4 个 CPU。还启用了火花推测。
这些任务的问题在哪里? UDF有问题吗? 是线程问题?
【问题讨论】:
-
你的分区数是多少?您是否尝试重新分区或更改 DataFrame 的分区数?也许您的分区不平衡:您在调用 udf 之前是否执行了可能导致分区不平衡的操作?
-
不平衡是什么意思?如何平衡数据框?在运行数据框之前,我已经使用 80000 个分区进行了重新分区。
-
不平衡是指某些分区,我的意思是您的 2900 万行没有在分区之间均匀拆分。你可以找到一些元素here。我认为这是很多分区的方式。如果我是你,我会尝试使用更少的分区。
标签: python apache-spark pyspark bigdata user-defined-functions