【问题标题】:Spark Predicate pushdown not working on dateSpark Predicate 下推不适用于日期
【发布时间】:2021-07-04 00:06:11
【问题描述】:

我只是在读取一个 parquet 文件并添加一个过滤器来匹配日期内的所有记录 - 这里是 2021-04-03。该列不应该为空,它应该在给定的日期。

输入表

+---------+-----------+-------------------+
|      lat|        lng|       eventDTLocal|
+---------+-----------+-------------------+
|34.269788| -98.239543|2021-04-03 19:18:58|
|29.780977| -95.749744|2021-04-03 19:33:24|
|48.150173|-122.191903|2021-04-03 17:25:00|
|40.652889| -74.185461|2021-04-03 20:27:55|
|41.747148| -87.799557|2021-04-03 19:52:39|
+---------+-----------+-------------------+

我已尝试将列转换为最新,使用 substring_index 函数进行匹配,但我无法在推送的过滤器中获取它。

以下是我尝试过的代码:

df1 = spark.read.parquet("/Users/aadhithyahari/Downloads/awsfiles/part-00000-bfccec4c-7939-4f85-8fa9-5f1cb34f843a.c000.snappy.parquet") \
        .select( 'lat', 'lng', 'eventDTLocal').filter("TO_DATE(CAST(UNIX_TIMESTAMP(`eventDTLocal`, 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP),'yyyy-MM-dd') == CAST('2021-04-03' AS DATE)").explain(extended=True)

过滤器仅在数据过滤器中列出,其他任何地方都没有。我在这里错过了什么?

【问题讨论】:

    标签: sql apache-spark pyspark apache-spark-sql parquet


    【解决方案1】:

    并非所有过滤器都可以下推。一般来说,大多数包含函数调用的过滤器,如 substringunix_timestamp 都不能下推。下推过滤器的完整逻辑在DataSourceStrategy中实现。

    在这种情况下,解决此限制的一种方法是将 eventDTLocal 的值存储为 unix 时间戳而不是 parquet 文件中的字符串,然后按特定毫秒进行过滤。

    #create some test data
    data = [(52.5151923, 13.3824107, 1618760421000), 
            (1.0, 1.0, 1)]
    spark.createDataFrame(data, schema=['lat', 'lng', 'eventDTLocal']) \
        .write.mode("overwrite").parquet("dataWithUnixTime")
    
    #get the first and last millisecond of the day
    #the timezone has probably to be adjusted
    from datetime import datetime, timezone
    dt = datetime(2021, 4, 18)
    start = dt.replace(tzinfo=timezone.utc).timestamp() * 1000
    end = start + 24 * 60 * 60 * 1000 - 1
    
    #run the query
    df = spark.read.parquet("dataWithUnixTime") \
        .filter(f"eventDTLocal >= {start} and eventDTLocal <= {end}")
    

    df的实物图

    
    == Physical Plan ==
    *(1) Project [lat#9, lng#10, eventDTLocal#11L]
    +- *(1) Filter ((isnotnull(eventDTLocal#11L) AND (eventDTLocal#11L >= 1618704000000)) AND (eventDTLocal#11L <= 1618790399999))
       +- *(1) ColumnarToRow
          +- FileScan parquet [lat#9,lng#10,eventDTLocal#11L] Batched: true, DataFilters: [isnotnull(eventDTLocal#11L), (eventDTLocal#11L >= 1618704000000), (eventDTLocal#11L <= 161879039..., Format: Parquet, Location: InMemoryFileIndex[file:/home/werner/Java/pyspark3/dataWithUnixTime], PartitionFilters: [], PushedFilters: [IsNotNull(eventDTLocal), GreaterThanOrEqual(eventDTLocal,1618704000000), LessThanOrEqual(eventDT..., ReadSchema: struct<lat:double,lng:double,eventDTLocal:bigint>
    

    现在包含日期列的推送过滤器 GreaterThanOrEqualLessThanOrEqual

    【讨论】:

      猜你喜欢
      • 2015-12-10
      • 1970-01-01
      • 2017-12-31
      • 1970-01-01
      • 2021-04-16
      • 1970-01-01
      • 2013-07-21
      • 2023-04-01
      • 1970-01-01
      相关资源
      最近更新 更多