【问题标题】:Spark reading Partitioned avro significantly slower than pointing to exact locationSpark读取分区的avro比指向确切位置慢得多
【发布时间】:2020-06-23 10:47:25
【问题描述】:

我正在尝试读取基于年、月和日分区的分区 Avro 数据,这似乎比直接指向路径要慢得多。 在物理计划中,我可以看到分区过滤器正在传递,因此它没有扫描整个目录集,但速度仍然明显慢。

例如像这样读取分区数据

profitLossPath="abfss://raw@"+datalakename+".dfs.core.windows.net/datawarehouse/CommercialDM.ProfitLoss/"
 
profitLoss = spark.read.\
    format("com.databricks.spark.avro").\
    option("header", "false").\
    option("inferSchema", "false").load(profitLossPath)
 
profitLoss.createOrReplaceTempView("ProfitLosstt")

df=sqlContext.sql("SELECT * \
                             FROM ProfitLosstt \
                             where Year= " + year + " and Month=" + month_nz + " and Day=" + date_nz )

大约需要 3 分钟

而我使用字符串生成器指向确切位置的地方,在 2 秒内完成

profitLossPath="abfss://raw@"+datalakename+".dfs.core.windows.net/datawarehouse/CommercialDM.ProfitLoss/Year=" +year +"/Month=" + month_nz + "/Day=" + date_nz
 
profitLoss = spark.read.\
    format("com.databricks.spark.avro").\
    option("header", "false").\
    option("inferSchema", "false").load(profitLossPath)

 
profitLoss.createOrReplaceTempView("ProfitLosstt")

df=sqlContext.sql("SELECT * \
                             FROM ProfitLosstt "
                              )
                  
display(df)

查看第一个(较慢)的物理计划确实表明分区过滤器已通过

什么可以解释发现阶段需要这么长时间?

任何问题,我可以详细说明。

【问题讨论】:

  • 你能在第一个上做df.explain 并把输出放在这里吗?我怀疑是没有谓词下推
  • 嗨,正如我所提到的,分区过滤器正在传递:这是 Spark UI 的物理计划:== 物理计划 == PartitionFilters:[isnotnull(Year#448), isnotnull(Month #449), isnotnull(Day#450), (Year#448 = 2020), (Month#449 = 6..., PushedFilters: [], ReadSchema: struct
  • 时间都花在了找工作上。很长一段时间,它只是说“运行命令”而没有任何工作。当工作开始时,它很快就完成了。那么,不知何故创建执行计划需要很长时间?
  • 从驱动程序日志中,我发现建立 InMemoryFileIndex 需要很长时间。大约 2.18 分钟。现在看这个:stackoverflow.com/questions/53111210/…
  • 是的,这就是它的解释——要推送过滤器,它需要知道它们可以应用到哪里......

标签: apache-spark pyspark avro azure-databricks


【解决方案1】:

好的,缓慢的原因是因为 InMemoryFileIndex 的构建。

虽然会进行分区修剪,但 Spark 需要了解分区和文件信息,而这正是它需要执行该步骤的地方。 这篇 S.O 帖子详细说明了它:here

因此,当时的想法是创建一个外部表,以便构建此信息,我使用这样的脚本(我使用了内联架构,如果有的话可以使用架构文件)

create external table ProfitLossAvro 


partitioned by (Year int, Month int, Day int)

ROW FORMAT SERDE
  'org.apache.hadoop.hive.serde2.avro.AvroSerDe'


Stored As 

 inputformat 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'

 outputformat 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'

Location 'abfss://raw@datalakename.dfs.core.windows.net/datawarehouse/CommercialDM.ProfitLoss/'

TBLPROPERTIES (
    'avro.schema.literal'='{
      "name": "Microsoft.Hadoop.Avro.Specifications.ProfitLoss",
      "type": "record",
      "fields": [{ "name":"MK_DatesID_TradeDate", "type":["int", "null"]},{ "name":"MK_UCRAccountsID_AccountID", "type":["int", "null"]},{ "name":"MK_ProductCategoriesID_ProductCategoryID", "type":["int", "null"]},{ "name":"CurrencyCode", "type":["string", "null"]},{ "name":"ProfitLoss", "type":["double", "null"]},{ "name":"MK_PnLAmountTypesID_PLBookingTypeID", "type":["int", "null"]}]
    }');

但是,如果您随后查询此表,您将获得 0 行。这是因为现有分区不会自动添加。因此,您可以使用

msck repair table ProfitLossAvro

每次向数据湖添加数据时,都可以添加分区。 像这样的东西:-

ALTER TABLE ProfitLossAvro ADD PARTITION (Year=2020, Month=6, Day=26)

如果您随后使用如下命令查询数据,它将运行得更快

df=sqlContext.sql("select * \
               from ProfitLossAvro \
               where Year=" + year + " and Month=" + month_nz + " and Day=" + date_nz)

display(df)

【讨论】:

    猜你喜欢
    • 2020-10-18
    • 2018-11-24
    • 2017-04-01
    • 1970-01-01
    • 2015-12-30
    • 1970-01-01
    • 2019-05-11
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多