【问题标题】:Parallelizing a for loop with map and reduce in spark with pyspark使用 map 并行化 for 循环并使用 pyspark 减少火花
【发布时间】:2016-12-30 22:16:04
【问题描述】:

在我的应用程序中,我从 S3 上不同位置的数据创建不同的数据帧,然后尝试将数据帧合并为单个数据帧。现在我正在为此使用 for 循环。但我觉得这可以通过使用 pyspark 中的 map 和 reduce 函数以更有效的方式完成。这是我的代码:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, GroupedData
import pandas as pd
from datetime import datetime


sparkConf = SparkConf().setAppName('myTestApp')
sc = SparkContext(conf=sparkConf)
sqlContext = SQLContext(sc)

filepath = 's3n://my-s3-bucket/report_date='

date_from = pd.to_datetime('2016-08-01',format='%Y-%m-%d')
date_to = pd.to_datetime('2016-08-22',format='%Y-%m-%d')
datelist = pd.date_range(date_from, date_to)

First = True

#THIS is the for-loop I want to get rid of
for dt in datelist:
    date_string = datetime.strftime(dt, '%Y-%m-%d')
    print('Running the pyspark - Data read for the date - '+date_string)
    df = sqlContext.read.format("com.databricks.spark.csv").options(header = "false", inferschema = "true", delimiter = "\t").load(filepath + date_string + '/*.gz')

    if First:
        First=False
        df_Full = df
    else:
        df_Full = df_Full.unionAll(df)

【问题讨论】:

  • 您对使用 Spark 有限制吗?如果没有,您是否考虑过使用 dask 代替? dask 的设计包括其他好东西,可以轻松实现您想要做的事情
  • 我需要使用 spark,因为之后我将在其中运行 Spark 算法。而且数据量也很大。

标签: python apache-spark pyspark


【解决方案1】:

实际上迭代union,虽然不是最理想的,但并不是这里最大的问题。架构推断引入了更严重的问题 (inferschema = "true")。

它不仅使数据框的创建不懒惰,而且还需要单独的数据扫描来进行推理。如果您事先知道架构,则应将其作为DataFrameReader 的参数提供:

schema = ...

df = sqlContext.read.format("com.databricks.spark.csv").schema(schema)

否则您可以从第一个DataFrame 中提取它。结合经过良好调整的并行性,它应该可以正常工作,但如果您获取的文件数量很大,您还应该考虑比迭代联合更智能的方法。你会在我对Spark union of multiple RDDs 的回答中找到一个例子。它更昂贵,但具有更好的一般性能。

关于你的想法,不可能在分布式数据结构上嵌套操作,所以如果你想在 map 中读取数据,你必须直接使用 S3 客户端而不使用 SQLContext

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2016-10-17
    • 2019-01-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-03-18
    相关资源
    最近更新 更多