【问题标题】:Dask: TimeOut Error When Reading Parquet from S3Dask:从 S3 读取 Parquet 时出现超时错误
【发布时间】:2022-02-22 09:20:31
【问题描述】:

我在 EMR 集群上使用 Dask-Yarn 时遇到了一些令人沮丧的问题。我正在尝试从存储在 S3 中的分区拼花文件中读取大约 5M+ 行数据。我在 800 个 Dask 工作人员之间重新分配数据,然后将数据保存到内存中。在这一点上没有问题。然后当我使用下游函数来操作这些数据时,我开始在整个过程的四分之一处遇到TimeOut 错误,这没有意义,因为我认为我已经将这些数据持久化到内存中。有谁可以解决这些timeout 问题。任何帮助将不胜感激。还有为什么它会再次读取镶木地板文件;我已经将它们持久化到内存中了吗?

错误:

ConnectTimeoutError: Connect timeout on endpoint URL: "https://data-files.s3.amazonaws.com/users/rhun/data.parquet/batch%3D0.0/part-00114-811278f0-e6cc-43d4-a38c-2043509029ac.c000.snappy.parquet"

代码示例:

cluster = YarnCluster(environment='doctype.tar.gz', 
                      worker_memory='12GiB', 
                      worker_vcores=8
                     )
client = Client(cluster)
cluster.scale(800)
df = dd.read_parquet('s3://data-files/users/rhun/data_2022-02-18.parquet/batch=0.0/',
                         columns=['filehash',
                                  'sentences',
                                  'div2style'
                                 ],
                         engine='pyarrow')
df = df.repartition(npartitions=5000).persist()

def calc_pdf_features(df):
    
    files_to_download = df['filehash'].tolist()
    
    AWS_BUCKET = "my_data"

    session = boto3.Session()
    client = session.client("s3")
    func = partial(download_one_file, AWS_BUCKET, client)

    res = []
    successful_downloads = []

    # download pdf files concurrently
    with ThreadPoolExecutor(max_workers=32) as executor:
        futures = {
            executor.submit(func, file_to_download): file_to_download for file_to_download in files_to_download
        }
        for future in as_completed(futures):
            if future.exception():
                res.append({'filehash': futures[future],
                            'bullet_count': float(0),
                            'item_count': float(0),
                            'colon_count': float(0),
                            'element_tags': [],
                            'max_element_leng': float(0)})
            else:
                successful_downloads.append(futures[future])
        
    def traverse_pdf(fh):
        doc = fitz.open(fh + '.pdf')
        font_counts, styles = fonts(doc, granularity=False)
        size_tag = font_tags(font_counts, styles)
        elements = headers_para(doc, size_tag)
        res.append({'filehash': fh,
                    'bullet_count': float(bullet_counter_row(elements)),
                    'item_count': float(item_counter_row(elements)),
                    'colon_count': float(colon_counter_row(elements)),
                    'element_tags': header_tags(elements),
                    'max_element_leng': max_first3Elements(elements)
                   })

    # extract features from PDF files concurrently 
    with ThreadPoolExecutor(max_workers=32) as executor:
        futures = {
            executor.submit(traverse_pdf, fh): fh for fh in successful_downloads
        }
        for future in as_completed(futures):
            if future.exception():
                res.append({'filehash': futures[future],
                            'bullet_count': float(0),
                            'item_count': float(0),
                            'colon_count': float(0),
                            'element_tags': [],
                            'max_element_leng': float(0)})
                
    return pd.merge(df, pd.DataFrame(res), on=['filehash'], how='inner')

df = adf.map_partitions(calc_pdf_features, 
                                     meta={'filehash': str,
                                           'sentences': object,
                                           'div2style': object,
                                           'bullet_count': float,
                                           'item_count': float,
                                           'colon_count': float,
                                           'element_tags': object,
                                           'max_element_leng': object
                                          }
                                    )
df.repartition(npartitions=200).to_parquet(
    's3://my-data/DocType_v2/features/batch=0.0/',
    engine='pyarrow')

【问题讨论】:

  • 连接超时似乎与您在dd.read_parquet 中读取的文件不同。您的示例非常复杂,但是您似乎正在从镶木地板的一列中读取文件路径列表,然后使用此文件路径列表来使用 ThreadPoolExecutor 安排其他读取操作,其中一个导致超时?在 SO 上提问时,请始终发布完整的回溯。

标签: python amazon-s3 dask amazon-emr dask-distributed


【解决方案1】:

我有几点可能是一个问题,以及如何解决它。

  • 我认为在 calc_pdf_features 函数中做自己的线程池是错误的。如果您已经将并行处理委托给 Dask - 您不应该这样做。我会尝试让您的每个分区处理单线程,而不是让 Dask 进行调度。
  • 为了调试,我会放一些“非常简单”的东西而不是 calc_pdf_features 并查看一切正常 - 因此您将区分由 Dask / AWS 等引起的问题和超时,因为处理分区需要太多时间。李>

【讨论】:

  • 感谢您的帮助。我尝试了您的建议并将我的函数重构为单线程。昏昏沉沉的工人总是在中途死亡。太令人沮丧了!现在我收到一个错误:KilledWorker: ("('assign-e3e7d3da6b20dc687115364c82bef10d', 81)", <WorkerState 'tcp://172.20.47.98:34019', name: dask.worker_515, status: closed, memory: 0, processing: 2>)
  • 如果你让你的功能变得“轻”,就像只是计算记录一样,它是否工作顺利?
  • 是的。当事情变得沉重时,使用 Dask 通常会导致问题
  • 好的,我们离我们更近了一步。我看到 2 个可能的问题 - 超时(Dask 认为这需要很长时间),或者,某些进程因内存问题而死。让我们找出原因。您能否在时间消耗上也放入“类似的东西”,但内存不足?如果它可以工作 - 我们有内存问题,如果没有 - 我们需要找到配置超时的地方
  • 感谢@David Gruzman - 是的,我可以确认这是内存泄漏。我检查了工人日志。我不知道为什么它会耗尽内存。每个工人都有 16GB 的 RAM。我认为这已经足够了。
【解决方案2】:

如果我正确理解代码,在最大负载下有 800 名工作人员,每个工作人员可能启动 32 个下载进程。这是推测,但这个请求数可能会超过s3 中允许的并发请求,因此一些工作人员最终等待连接的时间过长。

一种解决方法是在超时之前允许更长的等待时间,请参阅this answer。但是,这仍然不理想,因为您将让工人闲置。相反,可以将代码重构为具有单个连接,以避免嵌套并行化,并让dask 处理所有下载和处理。

【讨论】:

    猜你喜欢
    • 2017-06-26
    • 2021-05-13
    • 2023-03-27
    • 2018-05-14
    • 2021-01-11
    • 2018-12-13
    • 1970-01-01
    • 2021-09-05
    • 1970-01-01
    相关资源
    最近更新 更多