【问题标题】:Window Overload method cannot resolve in spark structured streaming-scala窗口重载方法无法在 spark 结构化流scala 中解决
【发布时间】:2020-12-15 10:48:24
【问题描述】:

以下代码在 spark scala 结构化流中引发过载错误。

错误:

Cannot resolve overloaded method window

Code
package Stream
import org.apache.spark.sql._
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.streaming.Trigger
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.sql.functions.window




object SparkRestApi {
  def main(args: Array[String]): Unit = {

    val logger = Logger.getLogger("Datapipeline")
    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)

    val spark = SparkSession.builder()
      .appName("StreamTest")
      .config("spark.driver.memory", "2g")
      .master("local[*]")
      //.enableHiveSupport()
      .getOrCreate()

    import spark.implicits._

    val userSchema = new StructType()
      .add("id", "string")
      .add("Faulttime", "timestamp")
      .add("name", "string")
      .add("Parentgroup", "string")
      .add("childgroup", "string")
      .add("MountStyle", "string")


val JSONDF = spark
      .readStream
      .option("header",true)
      .option("sep", ",")
      .schema(userSchema)      // Specify schema of the csv files
      .json("D:/TEST")
     

val windowColumn = window($"timestamp", "10 minutes", "5 minutes")

    val df2 = JSONDF.withWatermark("timestamp", "1 minutes")
    .groupBy("Parentgroup","childgroup","MountStyle",window("timestamp", "5 minutes", "1 minutes"))
      .agg(countDistinct("id"))

 df2.
      writeStream
      .outputMode("Append")
      .format("csv")
      .option("checkpointLocation", "D:/TEST/chkdir")
      .option("path", "D:/TEST/OutDir")
      .option("truncate",false)
      .start()
      .awaitTermination()

    spark.stop()


  }

}

非常感谢所有宝贵的建议。 即使添加了所有库,这也会引发错误.......................... ..................................................... ..................................................... ..................................................... ..................................................... ..................................................... .....................

【问题讨论】:

    标签: scala apache-spark spark-streaming


    【解决方案1】:

    来自手册的示例:

    val windowedCounts = words
        .withWatermark("timestamp", "10 minutes")
        .groupBy(
            window($"timestamp", "10 minutes", "5 minutes"),
            $"word")
        .count()
    

    尝试将您的窗口条款放在前面,我会冒险猜测。并在示例中使用 $ 作为字段名称。

    【讨论】:

    • 谢谢...但我想做的是在结构化的火花流中根据列组计算不同的计数。像这样的东西。 val df2 = JSONDF.withWatermark("timestamp", "1 minutes") .groupBy("Parentgroup","childgroup","MountingType",window("timestamp", "5 minutes", "1 minutes")) .agg (countDistinct("id"))............不幸的是发生了错误。如果可能,请分享您的评论。
    • 听起来是个不同的问题
    • 所以你得到了答案然后这个?
    • 尝试了很多.....但无法修复它。如果可能,请分享您的评论。
    【解决方案2】:

    val JSONDF = explodedf.withWatermark("timestamp", "1 minutes")

    val aggDF = JSONDF.groupBy(functions.window(JSONDF.col("timestamp"), "30 seconds", "30 seconds"),JSONDF.col("jsonData.name")) .avg("jsonData.price").alias("AveragePrice")

    试试这个,稍后谢谢我

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-07-12
      • 2017-10-09
      • 2020-10-27
      • 2021-11-11
      • 1970-01-01
      相关资源
      最近更新 更多