【问题标题】:How to Speed-Up Writing Dataframe to s3 from EMR PySpark Notebook?如何加快从 EMR PySpark Notebook 向 s3 写入数据帧的速度?
【发布时间】:2021-12-11 05:34:16
【问题描述】:

所以我通过在连接到 EMR 集群的 jupyter notebook 中玩弄 DMOZ 数据集来学习 PySpark。我试图实现的过程如下:

  1. 将包含 s3 公共数据集中文件位置的 csv 加载到 PySpark DataFrame(约 130k 行)中
  2. 使用检索文件内容 (html) 并撕下文本的函数映射 DF
  3. 将输出与原始 DF 作为新列加入
  4. 将加入的 DF 写入 s3(问题:它似乎永远挂起,它不是一项大工作,输出 json 应该只有几场演出)

所有的编写都在一个名为 run_job() 的函数中完成

我让它在具有 10 个 m5.8xlarge 实例的集群上放置了大约 2 个小时,应该足够了 (?)。除了 df.write() 之外,所有其他步骤都可以自行执行。我已经测试过 小得多的子集,它毫无问题地写入 s3,但是当我去做整个文件时,它似乎挂在“0/n 个作业完成”处。

我是 PySpark 和一般分布式计算的新手,所以它可能是我缺少的一个简单的“最佳实践”。 (编辑:也许它在笔记本的配置中?我目前没有使用任何魔法来配置火花,我需要吗?)

代码如下...

import html2text

import boto3
import botocore

import os
import re
import zlib
import gzip

from bs4 import BeautifulSoup as bs
from bs4 import Comment

# from pyspark import SparkContext, SparkConf
# from pyspark.sql import SQLContext, SparkSession
# from pyspark.sql.types import StructType, StructField, StringType, LongType

import logging

def load_index():
        input_file='s3://cc-stuff/uploads/DMOZ_bussineses_ccindex.csv'
        df = spark.read.option("header",True) \
             .csv(input_file)
        #df = df.select('url_surtkey','warc_filename', 'warc_record_offset', 'warc_record_length','content_charset','content_languages','fetch_time','fetch_status','content_mime_type')
        return df
    
def process_warcs(id_,iterator):
        html_textract = html2text.HTML2Text()
        html_textract.ignore_links = True
        html_textract.ignore_images = True

        no_sign_request = botocore.client.Config(signature_version=botocore.UNSIGNED)
        s3client = boto3.client('s3', config=no_sign_request)

        text = None
        s3pattern = re.compile('^s3://([^/]+)/(.+)')
        PREFIX = "s3://commoncrawl/"

        for row in iterator:
            try:

                start_byte = int(row['warc_record_offset'])
                stop_byte = (start_byte + int(row['warc_record_length']))

                s3match = s3pattern.match((PREFIX + row['warc_filename']))
                bucketname = s3match.group(1)
                path = s3match.group(2)

                #print('Bucketname: ',bucketname,'\nPath: ',path)
                resp = s3client.get_object(Bucket=bucketname, Key=path, Range='bytes={}-{}'.format(start_byte, stop_byte))

                content = resp['Body'].read()#.decode()

                data = zlib.decompress(content, wbits = zlib.MAX_WBITS | 16).decode('utf-8',errors='ignore')
                data = data.split('\r\n\r\n',2)[2]

                soup = bs(data,'html.parser')

                for x in soup.findAll(text=lambda text:isinstance(text, Comment)):
                    x.extract()  
                for x in soup.find_all(["head","script","button","form","noscript","style"]):
                    x.decompose()

                text = html_textract.handle(str(soup))

            except Exception as e:
                pass

            yield (id_,text)
            
def run_job(write_out=True):
        df = load_index()
        df2 = df.rdd.repartition(200).mapPartitionsWithIndex(process_warcs).toDF()
        df2 = df2.withColumnRenamed('_1','idx').withColumnRenamed('_2','page_md')
        df = df.join(df2.select('page_md'))
        if write_out:             
            output = "s3://cc-stuff/emr-out/DMOZ_bussineses_ccHTML"
            df.coalesce(4).write.json(output) 
        return df

df = run_job(write_out=True)

【问题讨论】:

  • 你还没有分享当前的botlennext是什么,你想加速什么?当前的基准是什么
  • 瓶颈是当我将最终的 DF 保存到 s3 时,pyspark 似乎只是挂起。我目前没有基准,我让它在各种配置上运行,最大的是 10 个 m5.8xlarge 核心节点,2 小时后我终止了,因为我认为我做错了什么。这是一项相对较小的工作,总文件可能最多只有几场演出。
  • 您检查过输出日志和错误(stdout 和 stderr)吗?
  • 我现在将检查默认日志。我将在异常中添加一些日志记录,并将更新结果。
  • 我检查了日志,但我仍然不知道。在 EMR 上又花了 50 美元,虽然试图让它工作 :-) 。也许它的配置?

标签: python amazon-web-services amazon-s3 pyspark amazon-emr


【解决方案1】:

所以我设法让它工作。我将此归因于以下两个更改中的任何一个。我还更改了硬件配置并选择了更多的小型实例。天哪,当我一整天都处于完全混乱的状态时,我只是喜欢它,而我需要做的就是在保存位置添加一个“/”......

  1. 我在 s3 中的输出文件位置添加了一个尾随“/”

1 旧:

 output = "s3://cc-stuff/emr-out/DMOZ_bussineses_ccHTML"

1 新:

 output = "s3://cc-stuff/emr-out/DMOZ_bussineses_ccHTML/"
  1. 我删除了“run_job()”函数中的“coalesce”,我现在有 200 个输出文件,但它可以工作而且速度非常快(不到 1 分钟)。

2 旧:

 df.coalesce(4).write.json(output) 

2 新:

 df.write.mode('overwrite').json(output)

【讨论】:

    猜你喜欢
    • 2023-04-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-09-02
    • 1970-01-01
    • 2018-05-24
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多