【问题标题】:Spark Group By and with Rank function is running very slowSpark Group By 和 Rank 功能运行速度很慢
【发布时间】:2018-06-14 22:47:48
【问题描述】:

我正在编写一个 spark 应用程序,用于在一个时间范围内查找前 n 个访问的 URL。但是这项工作一直在运行,并且对于一个实例来说,ES 中的389451 记录需要几个小时。我想减少这个时间。

我正在阅读以下 Spark 中的弹性搜索

 val df = sparkSession.read
    .format("org.elasticsearch.spark.sql")
    .load(date + "/" + business)
    .withColumn("ts_str", date_format($"ts", "yyyy-MM-dd HH:mm:ss")).drop("ts").withColumnRenamed("ts_str", "ts")
    .select(selects.head, selects.tail:_*)
    .filter($"ts" === ts)
    .withColumn("url", split($"uri", "\\?")(0)).drop("uri").withColumnRenamed("url", "uri").cache()

在上面的 DF 中,我正在从 ElasticSearch 中读取和过滤。我还从 URI 中删除查询参数。

然后我在做group by

var finalDF = df.groupBy("col1","col2","col3","col4","col5","uri").agg(sum("total_bytes").alias("total_bytes"), sum("total_req").alias("total_req"))

然后我在运行一个窗口函数

val partitionBy = Seq("col1","col2","col3","col4","col5")

val window = Window.partitionBy(partitionBy.head, partitionBy.tail:_*).orderBy(desc("total_req"))


finalDF = finalDF.withColumn("rank", rank.over(window)).where($"rank" <= 5).drop("rank")

然后我将 finalDF 写入 cassandra

finalDF.write.format("org.apache.spark.sql.cassandra").options(Map("table" -> "table", "keyspace" -> "keyspace")).mode(SaveMode.Append).save()

我在 ES 集群中有 4 个数据节点,我的 Spark 机器是 16 核 64GB Ram VM。请帮我找出问题所在。

【问题讨论】:

  • 能否发一张SQL查询计划的截图?另外,您是说您有一个运行 Spark 应用程序的虚拟机吗?你是如何启动这个 SparkContext 的(例如,使用本地模式、独立集群、多少驱动程序内存、执行程序内存、每个执行程序的核心数等)?

标签: scala apache-spark apache-spark-sql elasticsearch-spark


【解决方案1】:

在读取后保留数据框可能是个好主意,因为您将在排名函数中使用这么多次。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2022-06-14
    • 1970-01-01
    • 2021-12-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多