【问题标题】:spark parallelise on iterator with a function使用函数在迭代器上触发并行化
【发布时间】:2019-01-02 17:30:26
【问题描述】:

我有一个迭代器,它对 WARC 文档序列进行操作,并为每个文档生成修改后的令牌列表:

class MyCorpus(object):
def __init__(self, warc_file_instance):
    self.warc_file = warc_file_instance
def clean_text(self, html):
    soup = BeautifulSoup(html) # create a new bs4 object from the html data loaded
    for script in soup(["script", "style"]): # remove all javascript and stylesheet code
        script.extract()
    # get text
    text = soup.get_text()
    # break into lines and remove leading and trailing space on each
    lines = (line.strip() for line in text.splitlines())
    # break multi-headlines into a line each
    chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
    # drop blank lines
    text = '\n'.join(chunk for chunk in chunks if chunk)
    return text
def __iter__(self):
    for r in self.warc_file:
        try:
            w_trec_id = r['WARC-TREC-ID']
            print w_trec_id
        except KeyError:
            pass
        try:
            text = self.clean_text(re.compile('Content-Length: \d+').split(r.payload)[1])
            alnum_text = re.sub('[^A-Za-z0-9 ]+', ' ', text)
            yield list(set(alnum_text.encode('utf-8').lower().split()))
        except:
            print 'An error occurred'

现在我应用 apache spark paraellize 来进一步应用所需的地图功能:

warc_file = warc.open('/Users/akshanshgupta/Workspace/00.warc')
documents = MyCorpus(warc_file) 
x = sc.parallelize(documents, 20)
data_flat_map = x.flatMap(lambda xs: [(x, 1) for x in xs])
sorted_map = data_flat_map.sortByKey()
counts = sorted_map.reduceByKey(add)
print(counts.max(lambda x: x[1]))

我有以下疑问:

  1. 这是实现这一目标的最佳方法还是有更简单的方法?
  2. 当我并行化迭代器时,实际处理是否并行发生?还是顺序的吗?
  3. 如果我有多个文件怎么办?如何将其扩展到非常大的语料库,比如 TB?

【问题讨论】:

  • x = sc.parallelize(documents, 20) 确保您的 RDD 分区数等于集群中的核心数,这样所有分区将并行处理并且资源也被平等使用。此外,如果您希望设置影响每一行的全局参数,那么您可以使用广播变量。
  • 出于兴趣,从答案中受益?

标签: apache-spark pyspark warc


【解决方案1】:

更多来自 Scala 上下文,但是:

  1. 我的一个疑问是在 reduceByKey 之前执行 sortByKey。
  2. 如果使用map、foreachPartition、Dataframe Writer等或通过sc和sparksession读取,则处理是并行的,Spark范式通常适用于非顺序依赖算法。 mapPartitions 和其他通常用于提高性能的 API。该函数应该是我认为的 mapPartitions 的一部分,或者与 map 一起使用或在 map 闭包中使用。注意可序列化的问题,请参阅:

  3. 更多的计算机资源允许以更好的性能和吞吐量进行更多的扩展。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2012-11-25
    • 1970-01-01
    • 2014-07-08
    • 2015-06-15
    • 2020-03-24
    • 1970-01-01
    • 1970-01-01
    • 2018-01-19
    相关资源
    最近更新 更多