【问题标题】:Spark last 30 days filter, best approach to improve performanceSpark 过去 30 天过滤器,提高性能的最佳方法
【发布时间】:2021-02-21 07:04:16
【问题描述】:

我有一个记录的 RDD,转换为 DataFrame,我想按日期时间戳过滤并计算最近 30 天的统计数据,按列过滤并计算结果。

在进入 for 循环之前,Spark 应用程序非常快,所以我想知道这是否是一种反模式方法,我怎样才能获得良好的性能,我应该使用 spark 笛卡尔,如何?

//FILTER PROJECT RECORDS
val clientRecordsDF = recordsDF.filter($"rowkey".contains(""+client_id))
client_records_total = clientRecordsDF.count().toLong

这是 clientRecordsDF 的内容

root
 |-- rowkey: string (nullable = true) //CLIENT_ID-RECORD_ID
 |-- record_type: string (nullable = true)
 |-- device: string (nullable = true)
 |-- timestamp: long (nullable = false) // MILLISECOND
 |-- datestring: string (nullable = true) // yyyyMMdd

[1-575e7f80673a0,login,desktop,1465810816424,20160613]
[1-575e95fc34568,login,desktop,1465816572216,20160613]
[1-575ef88324eb7,registration,desktop,1465841795153,20160613]
[1-575efe444d2be,registration,desktop,1465843268317,20160613]
[1-575e6b6f46e26,login,desktop,1465805679292,20160613]
[1-575e960ee340f,login,desktop,1465816590932,20160613]
[1-575f1128670e7,action,mobile-phone,1465848104423,20160613]
[1-575c9a01b67fb,registration,mobile-phone,1465686529750,20160612]
[1-575dcfbb109d2,registration,mobile-phone,1465765819069,20160612]
[1-575dcbcb9021c,registration,desktop,1465764811593,20160612] 
...


the for loop with bad performances

var dayCounter = 0;
for( dayCounter <- 1 to 30){ 
    //LAST 30 DAYS

    // CREATE DAY TIMESTAMP
    var cal = Calendar.getInstance(gmt);

    cal.add(Calendar.DATE, -dayCounter);
    cal.set(Calendar.HOUR_OF_DAY, 0);
    cal.set(Calendar.MINUTE, 0);
    cal.set(Calendar.SECOND, 0);
    cal.set(Calendar.MILLISECOND, 0);
    val calTime=cal.getTime()
    val dayTime = cal.getTimeInMillis()

    cal.set(Calendar.HOUR_OF_DAY, 23);
    cal.set(Calendar.MINUTE, 59);
    cal.set(Calendar.SECOND, 59);
    cal.set(Calendar.MILLISECOND, 999);
    val dayTimeEnd = cal.getTimeInMillis()

    //FILTER PROJECT RECORDS
    val dailyClientRecordsDF = clientRecordsDF.filter(
      $"timestamp" >= dayTime && $"timestamp" <= dayTimeEnd
    )
    val daily_client_records = dailyClientRecordsDF.count().toLong

    println("dayCounter "+dayCounter+" records = "+daily_project_records);

    // perform other filter on dailyClientRecordsDF
    // save daily statistics to hbase

  }
}

【问题讨论】:

  • 你为什么不尝试按日期分组并在你想要的范围内放置一个日期过滤器?
  • 怎么样,可以提供一个基本的例子吗?
  • 让我们用一种简单的方式来表达:永远不要在 DataFrames 或 RDDs 上循环!

标签: performance scala hadoop apache-spark statistics


【解决方案1】:

几乎在所有情况下都应避免创建 UDF。这样做会阻止Catalyst Optimizer 正确处理查询。

改为使用内置 SQL 函数:

(
  spark.read.table("table_1")
  .join(
    spark.read.table("table_2"), 
    "user_id"
  )
  .where("p_eventdate > current_date() - 30")
)

【讨论】:

  • 不起作用:cannot resolve '(current_date() - 30)' due to data type mismatch: differing types in '(current_date() - 30)' (date and int)
【解决方案2】:

这种方法是遵循 SQL 的。 首先,我注册了一个要查询的表。 然后,我需要定义一个 UDF(用户定义函数)来将时间戳转换为日期。 最后,您需要像在 sql 中一样在您想要的日期范围内进行过滤和分组。

    def mk(timestamp: Long): Long = {
            val blockTime: Int = 3600 * 24 // daily
          //  val blockTime: Int = 3600 // hourly
            (timestamp - timestamp % blockTime)
          }

    recordsDF.registerTempTable("client") // define your table
    sqlContext.udf.register("makeDaily", (timestamp: Long) => mk(timestamp)) // register your function

    val res = sqlContext.sql("""select makeDaily(timestamp) as date, count(*) as count 
                                from client 
                                where timestamp between 111111 and 222222 
                                group by makeDaily(timestamp)""").collect()

添加: 比如count all record_type是30天内注册。

sqlContext.sql("select count(*) 
                from client 
                where record_type='registration' and timestamp between 1111 and 2222")

【讨论】:

  • 这是在 for 循环中吗?
  • 没有。你不再需要循环了。因为where timestamp between 111111 and 222222 过滤了您的 30 天。按日期分组让您计算每个日期,因此您只需运行 1 并按日期获取 30 天范围内的所有结果。
  • 这似乎是正确的方法,但是(抱歉)我不知道如何将函数应用于每个组。我想执行其他过滤器计数并保存最近 30 条记录(每天都这样)
  • 我使用了这种方法,定义了我的自定义函数并在 spark-sql 查询中使用了多个计数,以便为每一行提供所有计数器。非常感谢@giaosudau
【解决方案3】:

date_sub(current_date(), 30) 1.5.0 后可用。

【讨论】:

    猜你喜欢
    • 2020-09-28
    • 2017-01-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-02-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多