【问题标题】:How to execute an action before start()?如何在 start() 之前执行一个动作?
【发布时间】:2018-10-27 22:39:15
【问题描述】:

我正在开发火花流作业(使用结构化流而不使用 DStream)。我从 kafka 收到一条消息,其中将包含许多带有逗号分隔值的字段,其中第一列将是文件名。现在基于该文件名,我将不得不从 HDFS 读取文件并创建一个数据帧并在此基础上进一步操作。这似乎很简单,但 spark 不允许我在调用 start 之前运行任何操作。 Spark 文档也引用了相同的内容。

此外,还有一些 Dataset 方法不适用 流数据集。它们是会立即运行查询的操作 并返回结果,这在流数据集上没有意义。

以下是我尝试过的。

object StructuredStreamingExample {
  case class filenameonly(value:String)
  def main(args:Array[String])
  {
    val spark = SparkSession.builder.appName("StructuredNetworkWordCount").master("local[*]").getOrCreate()

    spark.sqlContext.setConf("spark.sql.shuffle.partitions", "5")

    import spark.implicits._
    val lines = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "strtest")
      .load()
   val values=lines.selectExpr("CAST(value AS STRING)").as[String]
   val filename = values.map(x => x.split(",")(0)).toDF().select($"value")
   //Here how do i convert the filename which is a Dataframe to string and apply that to spark.readtextfile(filename)
   datareadfromhdfs
  .writeStream
  .trigger(ProcessingTime("10 seconds"))
  .outputMode("append")
  .format("console")
  .start()
  .awaitTermination()

现在在上面的代码中,在我获得作为 Dataframe 的文件名之后,我如何将其转换为字符串,以便我可以执行 spark.readtextfile(filename) 来读取 HDFS 中的文件。

【问题讨论】:

  • 鉴于我们的讨论,我认为标题可能会产生误导。
  • 是的,一开始我期待使用 Streaming Dataframe 的解决方案,但经过我们的讨论,这似乎不是流本身的用例。所以让主题相同,我将等待与流数据帧相关的答案。

标签: scala apache-spark spark-structured-streaming


【解决方案1】:

我不确定它是火花流的最佳用途,但在这种情况下,我会调用 filename.foreachRDD 并从那里读取 hdfs 文件,然后做任何你需要的事情。 (请记住,在 foreachRDD 中运行时,您不能使用全局 spark 会话,但需要像这样从构建器中获取或创建它:val sparkSession = SparkSession.builder.config(myCurrentForeachRDD.sparkContext.getConf).getOrCreate()

您似乎依靠流来告诉您在哪里查找和加载文件。您是否尝试过简单地在该文件夹上使用文件流并让 spark 为您自动监控和读取新文件?

【讨论】:

  • 感谢您的关注。当我遵循方法一时,如果我错了,请纠正我。假设我有一个包含 15 个分区的 kafka 主题,并且来自生产者端的消息以循环方式发送到该主题。因此,如果生产者发送 30 条消息,那么每个分区将有 2 条消息。现在从火花的角度来看,并行度将是 1:1(kafka 分区:火花核心)。在这种情况下,如果我使用 foreachRDD(在驱动程序中执行)并从特定主题中获取所有值,则整个 30 条消息将由单个 spark 核心处理,因为我不会循环到 foreachPartition 此处。
  • 使用第二种方法的问题是,正如我之前提到的,目录名称将是来自 kafka 消息的动态,因此我无法查找特定目录。
  • 我不熟悉 kafka 连接器的工作原理,所以我不知道在那里进行批处理。我可以说的是,当我将 Kinesis 与多个分片一起使用时,我会收到具有多个分区的 RDD。由于它是一个 RDD,我仍然可以对其进行重新分区以匹配我需要的并行度。
  • 但问题通常是我们通常希望源来决定并行度,因为他们比火花更了解数据。
  • 当然,但总的来说,对于 Spark,有一系列值可以最大化吞吐量。根据我的经验,太多的分区也会影响性能。所以我一般寻求分区数是核心数的2-3倍。
【解决方案2】:

使用 spark 结构化流肯定不是最好的用例。如果您正确理解 spark 结构化流,则所有数据转换/聚合都应该发生在生成结果表的查询上。但是,您仍然可以实现一些解决方法,您可以编写代码以从 (falt)mapWithGroupState 中的 HDFS 读取数据。但是,同样不建议这样做。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2017-10-27
    • 2016-07-26
    • 1970-01-01
    • 2012-09-19
    • 2022-11-03
    • 1970-01-01
    • 1970-01-01
    • 2018-02-07
    相关资源
    最近更新 更多