【问题标题】:Spark Structured Streaming not able to see the record detailsSpark Structured Streaming 无法查看记录详细信息
【发布时间】:2020-10-04 17:03:17
【问题描述】:

我正在尝试处理来自 readstream 的记录并尝试打印该行。 但是在我的驱动程序日志或执行程序日志中看不到任何打印的语句。 可能有什么问题?

  1. 对于每条记录或批次(理想情况下)我想打印消息
  2. 对于每个批次,我都想执行一个流程。
val kafka = spark.readStream
    .format("kafka")
    .option("maxOffsetsPerTrigger", MAX_OFFSETS_PER_TRIGGER)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS) 
    .option("subscribe", topic) // comma separated list of topics
    .option("startingOffsets", "earliest")
    .option("checkpointLocation", CHECKPOINT_LOCATION)
    .option("failOnDataLoss", "false")
    .option("minPartitions", sys.env.getOrElse("MIN_PARTITIONS", "64").toInt)
    .load()


  import spark.implicits._

 


  println("JSON output to write into sink")

  val consoleOutput = kafka.selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value")
    //.select(from_json($"json", schema) as "data")
    //.select("data.*")
    //.select(get_json_object(($"value").cast("string"), "$").alias("body"))
    .writeStream
    .foreach(new ForeachWriter[Row] {
      override def open(partitionId: Long, epochId: Long): Boolean = true

      override def process(row: Row): Unit = {
        logger.info(
          s"Record received in data frame is -> " + row.mkString )
          runProcess() // Want to run some process every microbatch

      }

      override def close(errorOrNull: Throwable): Unit = {}


    })
    .outputMode("append")
    .format("console")
    .trigger(Trigger.ProcessingTime("30 seconds"))
    .start()


  consoleOutput.awaitTermination()

}

【问题讨论】:

    标签: apache-spark databricks spark-structured-streaming


    【解决方案1】:

    我复制了您的代码,它在没有runProcess 函数调用的情况下运行良好。

    如果您打算做两件不同的事情,我建议在从 Kafka 主题中选择相关字段后进行两个单独的查询:

    val kafkaSelection = kafka.selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value")
    

    1。对于每条记录或批次(理想情况下)我想打印消息

    val query1 = kafkaSelection
      .writeStream
      .outputMode("append")
      .format("console")
      .trigger(Trigger.ProcessingTime("30 seconds"))
      .option("checkpointLocation", CHECKPOINT_LOCATION1)
      .start()
    

    2。对于每个批次,我都想执行一个流程。

    val query2 = kafkaSelection
      .writeStream
      .foreach(new ForeachWriter[Row] {
          override def open(partitionId: Long, epochId: Long): Boolean = true
    
          override def process(row: Row): Unit = {
            logger.info(
              s"Record received in data frame is -> " + row.mkString )
              runProcess() // Want to run some process every microbatch
    
          }
    
          override def close(errorOrNull: Throwable): Unit = {}
    
        })
      .outputMode("append")
      .option("checkpointLocation", CHECKPOINT_LOCATION2)
      .trigger(Trigger.ProcessingTime("30 seconds"))
      .start()
    
    
    

    另外请注意,我已经为每个查询单独设置了检查点位置,这将确保对 Kafka 偏移量的一致跟踪。确保每个查询有两个不同的检查点位置。您可以并行运行这两个查询。

    在等待它们终止之前定义这两个查询很重要:

    query1.awaitTermination()
    query2.awaitTermination()
    

    使用 Spark 2.4.5 测试:

    【讨论】:

    • 您可以看到消息“”在数据帧中收到的记录是 ->“在哪里?在驱动程序日志或执行程序日志中?是否需要添加检查点位置才能查看我的每批消息?跨度>
    • 我正在使用数据块...不知道为什么我根本看不到任何日志
    • 删除输出 .format("console") 后,我可以在驱动程序日志中看到。
    • 这里的runProcess会在每个executor中运行。我可以有一个只运行一次的东西,如果它返回 true,则停止 spark 作业。
    猜你喜欢
    • 2014-05-25
    • 1970-01-01
    • 1970-01-01
    • 2021-01-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多