【Posted】: 2022-02-22 09:20:31
【Problem Description】:
I'm running into some frustrating issues using Dask-Yarn on an EMR cluster. I'm trying to read roughly 5M+ rows of data from a partitioned Parquet file stored in S3. I repartition the data across 800 Dask workers and then persist it into memory; up to that point there are no problems. But when I run a downstream function on this data, I start hitting TimeOut errors about a quarter of the way through, which doesn't make sense to me because I thought I had already persisted the data into memory. Can anyone help resolve these timeout issues? Any help would be greatly appreciated. Also, why is it reading the Parquet files again when I've already persisted them into memory?
Error:
ConnectTimeoutError: Connect timeout on endpoint URL: "https://data-files.s3.amazonaws.com/users/rhun/data.parquet/batch%3D0.0/part-00114-811278f0-e6cc-43d4-a38c-2043509029ac.c000.snappy.parquet"
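One common cause of this kind of `ConnectTimeoutError` with boto3 under heavy thread concurrency is that the default S3 client connection pool holds only 10 connections, fewer than the 32 threads sharing it in the code below. A minimal sketch, assuming boto3/botocore are available, of a client configured for this workload (the specific numbers are illustrative, not a recommendation from the original post):

```python
import boto3
from botocore.config import Config

# Illustrative settings: widen the connection pool to match the thread
# pool, and let botocore retry throttled/timed-out requests adaptively.
s3_config = Config(
    connect_timeout=30,
    read_timeout=60,
    max_pool_connections=32,   # match ThreadPoolExecutor(max_workers=32)
    retries={'max_attempts': 10, 'mode': 'adaptive'},
)
client = boto3.Session().client("s3", config=s3_config)
```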
Code example:
import boto3
import fitz  # PyMuPDF
import pandas as pd
import dask.dataframe as dd
from functools import partial
from concurrent.futures import ThreadPoolExecutor, as_completed
from dask.distributed import Client
from dask_yarn import YarnCluster

cluster = YarnCluster(environment='doctype.tar.gz',
                      worker_memory='12GiB',
                      worker_vcores=8)
client = Client(cluster)
cluster.scale(800)

df = dd.read_parquet('s3://data-files/users/rhun/data_2022-02-18.parquet/batch=0.0/',
                     columns=['filehash', 'sentences', 'div2style'],
                     engine='pyarrow')
df = df.repartition(npartitions=5000).persist()
def calc_pdf_features(df):
    files_to_download = df['filehash'].tolist()
    AWS_BUCKET = "my_data"
    session = boto3.Session()
    client = session.client("s3")
    func = partial(download_one_file, AWS_BUCKET, client)
    res = []
    successful_downloads = []
    # download PDF files concurrently
    with ThreadPoolExecutor(max_workers=32) as executor:
        futures = {
            executor.submit(func, file_to_download): file_to_download
            for file_to_download in files_to_download
        }
        for future in as_completed(futures):
            if future.exception():
                res.append({'filehash': futures[future],
                            'bullet_count': float(0),
                            'item_count': float(0),
                            'colon_count': float(0),
                            'element_tags': [],
                            'max_element_leng': float(0)})
            else:
                successful_downloads.append(futures[future])

    def traverse_pdf(fh):
        doc = fitz.open(fh + '.pdf')
        font_counts, styles = fonts(doc, granularity=False)
        size_tag = font_tags(font_counts, styles)
        elements = headers_para(doc, size_tag)
        res.append({'filehash': fh,
                    'bullet_count': float(bullet_counter_row(elements)),
                    'item_count': float(item_counter_row(elements)),
                    'colon_count': float(colon_counter_row(elements)),
                    'element_tags': header_tags(elements),
                    'max_element_leng': max_first3Elements(elements)})

    # extract features from PDF files concurrently
    with ThreadPoolExecutor(max_workers=32) as executor:
        futures = {
            executor.submit(traverse_pdf, fh): fh for fh in successful_downloads
        }
        for future in as_completed(futures):
            if future.exception():
                res.append({'filehash': futures[future],
                            'bullet_count': float(0),
                            'item_count': float(0),
                            'colon_count': float(0),
                            'element_tags': [],
                            'max_element_leng': float(0)})

    return pd.merge(df, pd.DataFrame(res), on=['filehash'], how='inner')
df = df.map_partitions(calc_pdf_features,
                       meta={'filehash': str,
                             'sentences': object,
                             'div2style': object,
                             'bullet_count': float,
                             'item_count': float,
                             'colon_count': float,
                             'element_tags': object,
                             'max_element_leng': object})

df.repartition(npartitions=200).to_parquet(
    's3://my-data/DocType_v2/features/batch=0.0/',
    engine='pyarrow')
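A hedged sketch of one mitigation for the per-file timeouts: wrap each download in a retry with jittered exponential backoff, so a transient S3 connect timeout fails the whole partition less easily. `with_retries` and the way it is submitted to the executor are assumptions added here, not part of the original code; `download_one_file` is the helper referenced in the question.

```python
import random
import time

def with_retries(func, *args, max_attempts=5, base_delay=0.5):
    """Call func(*args); on failure, sleep with jittered exponential
    backoff and retry, re-raising only after max_attempts failures."""
    for attempt in range(max_attempts):
        try:
            return func(*args)
        except Exception:
            if attempt == max_attempts - 1:
                raise
            # backoff: base_delay, 2*base_delay, 4*base_delay, ... plus jitter
            time.sleep(base_delay * (2 ** attempt) + random.random() * 0.1)

# usage inside the first thread pool, replacing the bare `func` submission:
# futures = {executor.submit(with_retries, func, f): f
#            for f in files_to_download}
```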
【Discussion】:
- The connect timeout appears to be for a different file than the ones you read with dd.read_parquet. Your example is quite complex, but it looks like you are reading a list of file paths from one column of the Parquet data, then using that list to schedule further read operations with a ThreadPoolExecutor, one of which is timing out? When asking on SO, please always post the full traceback.
Tags: python amazon-s3 dask amazon-emr dask-distributed