【问题标题】:Spark job fails while filtering kafka messagesSpark 作业在过滤 kafka 消息时失败
【发布时间】:2019-10-24 22:09:19
【问题描述】:

需要通过检查消息是否有需要的字段来检查发送到Kafka的事件消息是否有效,如果是,则将数据推送到Elasticsearch。我就是这样做的:

object App {

  val parseJsonStream = (inStream: RDD[String]) => {
    inStream.flatMap(json => {
      try {
        val parsed = parse(json)
        Option(parsed)
      } catch {
        case e: Exception => System.err.println("Exception while parsing JSON: " + json)
          e.printStackTrace()
          None
      }
    }).flatMap(v => {
      if (v.values.isInstanceOf[List[Map[String, Map[String, Any]]]])
        v.values.asInstanceOf[List[Map[String, Map[String, Any]]]]
      else if (v.values.isInstanceOf[Map[String, Map[String, Any]]])
        List(v.values.asInstanceOf[Map[String, Map[String, Any]]])
      else {
        System.err.println("EVENT WRONG FORMAT: " + v.values)
        List()
      }
    }).flatMap(mapa => {
      val h = mapa.get("header")
      val b = mapa.get("body")
      if (h.toSeq.toString.contains("session.end") && !b.toSeq.toString.contains("duration")) {
        System.err.println("session.end HAS NO DURATION FIELD!")
        None
      }
      else if (h.isEmpty || h.get.get("userID").isEmpty || h.get.get("timestamp").isEmpty) {
        throw new Exception("FIELD IS MISSING")
        None
      }
      else {
        Some(mapa)
      }
    })
  }

  val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
    ssc, PreferBrokers, Subscribe[String, String](KAFKA_EVENT_TOPICS, kafkaParams)
  )
  val kafkaStreamParsed = kafkaStream.transform(rdd => {
    val eventJSON = rdd.map(_.value)
    parseJsonStream(eventJSON)
  }
  )

  val esEventsStream = kafkaStreamParsed.map(addElasticMetadata(_))

  try {
    EsSparkStreaming.saveToEs(esEventsStream, ELASTICSEARCH_EVENTS_INDEX + "_{postfix}" + "/" + ELASTICSEARCH_TYPE, Map("es.mapping.id" -> "docid")
    )
  } catch {
    case e: Exception =>
      EsSparkStreaming.saveToEs(esEventsStream, ELASTICSEARCH_FAILED_EVENTS)
      e.printStackTrace()
  }
}

我猜有人正在发送无效事件(这就是我为什么要进行此检查的原因),但 Spark job 不会跳过该消息,它会失败并显示消息:

用户类抛出异常:org.apache.spark.SparkException: Job 由于阶段失败而中止:阶段 6.0 中的任务 2 失败了 4 次,大多数 最近失败:在 6.0 阶段丢失任务 2.3(TID 190,xxx.xxx.host.xx, 执行器 3): java.lang.Exception: FIELD IS MISSING

如何防止它崩溃而只是跳过消息?它是YARN 应用程序,使用:

Spark 2.3.1
Spark-streaming-kafka-0-10_2.11:2.3.1
Scala 2.11.8

【问题讨论】:

    标签: scala apache-spark apache-kafka


    【解决方案1】:

    而不是这个

    throw new Exception("FIELD IS MISSING")
    None
    

    就这样做

    None
    

    抛出此异常会导致您的程序终止。

    【讨论】:

    • 我多么愚蠢,特别是当我在第一个之后使用sout print error,甚至没有看到throw new Exception :)
    • 没有问题,每个人盯着屏幕这么久都会犯类似的错误。 :)
    猜你喜欢
    • 1970-01-01
    • 2019-12-07
    • 2020-03-31
    • 2017-10-30
    • 1970-01-01
    • 2015-03-30
    • 1970-01-01
    • 1970-01-01
    • 2022-05-20
    相关资源
    最近更新 更多