【问题标题】:Error when running SparkApp from docker container against Spark running in another container从 docker 容器运行 SparkApp 与在另一个容器中运行的 Spark 时出错
【发布时间】:2016-10-13 07:45:38
【问题描述】:

我有一个问题困扰了我好几天,而且我完全没有想法。

我构建了一个 Spark docker 容器,其中 Spark 在独立模式下运行。 master 和 worker 都是从那里开始的。这是在 Azure 中运行的机器。

现在我尝试将我的 Spark Scala 应用程序部署在一个单独的容器(同一台机器)中,我在其中传递了 Spark 主 URL 和其他我需要连接到 Spark 的东西。无缝连接。

我遇到的第一个问题是:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 10.1.0.4): java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaRDDPartition

然后我创建了一个包含除 Spark 之外的依赖项的文件夹,将它们放在我的应用 JAR 文件旁边的文件夹中,并使用 SparkConf.setJars 将它们添加到 SparkConf,

现在奇怪的事情发生了:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 10.1.0.4): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD

不仅如此,如果我只是在本地机器上使用 java -cp <dependencies(including spark jars) cp> myApp.jar 运行 scala 应用程序,它可以完美运行,作业运行正常。

我在本地没有任何SPARK_HOMEsetJars 基本上是一个空列表,好像我不会使用它,它仍然有效。 我猜它在我运行我的应用程序时使用类路径中提供的 jars,我不需要提供任何其他内容。

如果你们中的任何人有任何想法,我将不胜感激,我真的无法解释为什么这不起作用,而且我直到现在才进行任何 Spark 部署。我主要在嵌入式 Spark 中运行。

Spark 在我的应用程序依赖项 (2.0.0) 中的版本与在 docker 容器中运行的版本相同。 我用过:
我的应用程序的 Scala 2.11.7
两个容器(app、spark)上的 Java 1.8

这里要求的是我的应用程序的代码

  val jars = Option(new File(Properties.spark_jars_path).listFiles()).toList.flatten.map(_.getAbsolutePath)
  val conf = new SparkConf()
    .setMaster(RunUtils.determineMasterUrl(Properties.mode))
    .setAppName(RunUtils.SPARK_APP_NAME)
    .setJars(jars)
    .set("spark.cassandra.connection.host", Properties.cassandra_connection_host)

  val ssc = new StreamingContext(conf, Seconds(1))

  case class Result(buyDate: Timestamp, endDate: Timestamp, maxDate: Timestamp, buyAmount: Double, buyRate: Double)

  def main(args: Array[String]): Unit = {

    val DateFormatter = new java.text.SimpleDateFormat("yyyy-MM-dd")

    val kafkaParams = Map("metadata.broker.list" -> Properties.kafka_brokers, "auto.offset.reset" -> Properties.kafka_auto_reset_offset)

    //
    // BITSTAMP
    //
    val bitstampTopic = Set("bitstamp_trades")
    val bitstampStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, bitstampTopic)
    val bitstampTradeStream = bitstampStream.map(_._2).map { trade =>
      val jsonNode = JsonMapper.readTree(trade)
      Trade(
        "BITSTAMP",
        "BTC_USD",
        if(jsonNode.get("type").asInt() == 1) "SELL" else "BUY",
        DateFormatter.format(new Date(jsonNode.get("timestamp").asLong() * 1000)),
        new Date(jsonNode.get("timestamp").asLong() * 1000),
        jsonNode.get("amount").asDouble(),
        jsonNode.get("price").asDouble()
      )
    }

    bitstampTradeStream.saveToCassandra("coin_master", "trades", SomeColumns(
      "exchange_house",
      "exchange_currencies",
      "exchange_type",
      "date",
      "trade_time",
      "amount",
      "price")
    )
   ssc.start()
   ssc.awaitTermination()
  }

【问题讨论】:

  • 您能否提供您正在执行 mapparttitons 的代码或您认为为什么会发生此错误的代码?
  • 再次检查问题,我添加了代码。
  • 好的,所以刚刚意识到问题出在我的地图功能中,当我解决时,我会发布答案。

标签: java scala apache-spark docker dependencies


【解决方案1】:

好的,问题出在我的地图功能上。

更具体地说,这一行是问题所在:

val jsonNode = JsonMapper.readTree(trade)

JsonMapper 实际上是来自 Jackson 库的已配置 ObjectMapper,为了使其工作,我应该使用 sparkContext.broadcast,因为必须在每个执行程序上调用一些方法。

您可以在此处阅读更多关于它为什么不起作用的信息: Spark: broadcasting jackson ObjectMapper

所以,在将我的代码更改为这样的代码之后,它就可以工作了:

val broadcastValue = ssc.sparkContext.broadcast(JsonMapper)

val kafkaParams = Map("metadata.broker.list" -> Properties.kafka_brokers, "auto.offset.reset" -> Properties.kafka_auto_reset_offset)

//
// BITSTAMP
//
val bitstampTopic = Set("bitstamp_trades")
val bitstampStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, bitstampTopic)
val bitstampTradeStream = bitstampStream.map(_._2).map { trade =>
  broadcastValue.value.registerModule(new DefaultScalaModule with RequiredPropertiesSchemaModule)
  val jsonNode = broadcastValue.value.readTree(trade)
  Trade(
    "BITSTAMP",
    "BTC_USD",
    if(jsonNode.get("type").asInt() == 1) "SELL" else "BUY",
    DateFormatter.format(new Date(jsonNode.get("timestamp").asLong() * 1000)),
    new Date(jsonNode.get("timestamp").asLong() * 1000),
    jsonNode.get("amount").asDouble(),
    jsonNode.get("price").asDouble()
  )
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-08-16
    • 1970-01-01
    • 1970-01-01
    • 2019-08-15
    • 1970-01-01
    相关资源
    最近更新 更多