【问题标题】:How to speed up downloading a CSV file locally with PySpark (databricks)?如何使用 PySpark(databricks)加速本地下载 CSV 文件?
【发布时间】:2019-12-16 05:23:46
【问题描述】:

我们创建了一个 ImageClassifier 来预测某些 Instagram 图像是否属于某个类别。运行此模型效果很好。

#creating deep image feauturizer using the InceptionV3 lib
featurizer = DeepImageFeaturizer(inputCol="image",
                             outputCol="features",
                             modelName="InceptionV3")

#using lr for speed and reliability
lr = LogisticRegression(maxIter=5, regParam=0.03, 
                    elasticNetParam=0.5, labelCol="label")

#define Pipeline
sparkdn = Pipeline(stages=[featurizer, lr])
spark_model = sparkdn.fit(df)

我们与我们的基表(在更高的集群上运行)分开制作。我们需要将 spark_model 预测提取为 csv 文件,然后将其导入另一个笔记本并将其与我们的基表合并。

为此,我们尝试了以下方法

image_final_estimation = spark_model.transform(image_final)
display(image_final_estimation) #since this gives an option in databricks to 
download the csv

image_final_estimation.coalesce(1).write.csv(path = 'imagesPred2.csv') #and then we would be able to read it back in with spark.read.csv

问题是这些操作需要很长时间(可能是由于任务的性质)并且它们会使我们的集群崩溃。我们能够显示我们的结果,但不仅是使用 '.show()',也不是使用 display() 方法。

还有其他方法可以在本地保存此 csv 吗?或者我们如何才能提高这些任务的速度?

请注意,我们使用 Databricks 的社区版。

【问题讨论】:

    标签: pyspark databricks


    【解决方案1】:

    在文件上存储 DataFrame 时,并行写入的一个好方法是定义与该 DataFrame/RDD 相关的适当数量的分区。 在您显示的代码中,您使用的是 coalesce 函数(它基本上将分区数减少到 1,从而减少了并行性的影响)。

    在 Databricks 社区版上,我使用 Databricks (https://docs.databricks.com/getting-started/databricks-datasets.html) 提供的 CSV 数据集尝试了以下测试。这个想法是通过使用一个分区与使用多个分区将数据写入 csv 来测量经过的时间。

    carDF = spark.read.option("header", True).csv("dbfs:/databricks-datasets/Rdatasets/data-001/csv/car/*")
    
    
    print("Total count of Rows {0}".format(carDF.count()))
    print("Original Partitions Number: {0}".format(carDF.rdd.getNumPartitions()))
    
    >>Total count of Rows 39005
    >>Original Partitions Number: 7
    
    
    
    %timeit carDF.write.format("csv").mode("overwrite").save("/tmp/caroriginal")
    
    >>2.79 s ± 180 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    

    因此,到目前为止,使用 7 个分区将数据集写入本地文件需要 2.79 秒

    newCarDF = carDF.coalesce(1)
    print("Total count of Rows {0}".format(newCarDF.count()))
    print("New Partitions Number: {0}".format(newCarDF.rdd.getNumPartitions()))
    
    >>Total count of Rows 39005
    >>New Partitions Number: 1
    
    %timeit newCarDF.write.format("csv").mode("overwrite").save("/tmp/carmodified")
    
    >>4.13 s ± 172 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    

    因此,对于同一个 DataFrame,写入具有一个分区的 csv 需要 4.13 秒。

    总之,在这种情况下,“coalesce(1)”部分正在影响写作性能。 希望这会有所帮助

    【讨论】:

      猜你喜欢
      • 2021-02-12
      • 1970-01-01
      • 2019-01-23
      • 1970-01-01
      • 2018-08-08
      • 2019-03-24
      • 2011-02-02
      • 1970-01-01
      • 2021-02-19
      相关资源
      最近更新 更多