【问题标题】:Can´t find "window" function in Spark Structured Streaming在 Spark Structured Streaming 中找不到“窗口”功能
【发布时间】:2018-03-17 03:56:14
【问题描述】:

我在 Spark Structured Streaming 中编写了一个小示例,我正在尝试处理 netstatcommand 的输出,但不知道如何调用 windowfunction。

这些是我的 build.sbt 的相关行:

scalaVersion := "2.11.4"
scalacOptions += "-target:jvm-1.8"

libraryDependencies ++= {

  val sparkVer = "2.3.0"
  Seq(
    "org.apache.spark" %% "spark-streaming" % sparkVer % "provided",
    "org.apache.spark" %% "spark-streaming-kafka-0-8" % sparkVer % "provided",
    "org.apache.spark" %% "spark-core" % sparkVer % "provided" withSources(),
    "org.apache.spark" %% "spark-hive" % sparkVer % "provided",
  )
}

还有代码:

case class NetEntry(val timeStamp: java.sql.Timestamp, val sourceHost: String, val targetHost: String, val status: String)

def convertToNetEntry(x: String): NetEntry = {
    // tcp        0      0 eselivpi14:icl-twobase1 eselivpi149.int.e:48442 TIME_WAIT
   val array = x.replaceAll("\\s+"," ").split(" ").slice(3,6)
   NetEntry(java.sql.Timestamp.valueOf(LocalDateTime.now()), array(0),array(1),array(2))
}

def main(args: Array[String]) {

    // Initialize spark context
    val spark: SparkSession = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    val lines = spark.readStream
    .format("socket")
    .option("host", args(0))
    .option("port", args(1).toInt)
    .load()

    import spark.implicits._
    val df = lines.as[String].map(x => convertToNetEntry(x))

    val wordsArr: Dataset[NetEntry] = df.as[NetEntry]
    wordsArr.printSchema()

    // Never get past this point
    val windowColumn = window($"timestamp", "10 minutes", "5 minutes")

    val windowedCounts = wordsArr.groupBy( windowColumn, $"targetHost").count()

    val query = windowedCounts.writeStream.outputMode("complete").format("console").start()
    query.awaitTermination()
}

我使用 Spark 2.1、2,2 和 2.3 时的结果相同。真正奇怪的是,我有一个 Spark 集群,我登录 Spark Shell 并复制所有行……它可以工作!知道我做错了什么吗?

编译时的错误:

[error] C:\code_legacy\edos-dp-mediation-spark-consumer\src\main\scala\com\ericsson\streaming\structured\StructuredStreamingMain.scala:39: not found: value window
[error]     val windowColumn = window($"timestamp", "10 minutes", "5 minutes")
[error]                        ^
[warn] 5 warnings found
[error] one error found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 19 s, completed 16-mar-2018 20:13:40

更新:为了让事情变得更奇怪,我检查了 API 文档,但在这里也找不到有效的参考: https://spark.apache.org/docs/2.3.0/api/scala/index.html#org.apache.spark.sql.SparkSession$implicits$

【问题讨论】:

    标签: spark-streaming spark-structured-streaming


    【解决方案1】:

    需要导入window函数进行编译,spark-shell中已经导入。

    添加此导入语句:

    import org.apache.spark.sql.functions.window
    

    【讨论】:

    • 这解决了我的问题!然而,这在文档中找不到,我们在这里讨论的是“Hello World”示例......
    • 您从哪个来源遵循此示例?这里的结构化流编程指南:spark.apache.org/docs/latest/… 确实有这个导入语句,它加载所有的 sql 函数:import org.apache.spark.sql.functions._
    猜你喜欢
    • 2019-10-21
    • 1970-01-01
    • 2018-08-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-03-07
    • 2018-08-30
    • 2018-05-12
    相关资源
    最近更新 更多