【发布时间】:2016-10-13 07:45:38
【问题描述】:
我有一个问题困扰了我好几天,而且我完全没有想法。
我构建了一个 Spark docker 容器,其中 Spark 在独立模式下运行。 master 和 worker 都是从那里开始的。这是在 Azure 中运行的机器。
现在我尝试将我的 Spark Scala 应用程序部署在一个单独的容器(同一台机器)中,我在其中传递了 Spark 主 URL 和其他我需要连接到 Spark 的东西。无缝连接。
我遇到的第一个问题是:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 10.1.0.4): java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaRDDPartition
然后我创建了一个包含除 Spark 之外的依赖项的文件夹,将它们放在我的应用 JAR 文件旁边的文件夹中,并使用 SparkConf.setJars 将它们添加到 SparkConf,
现在奇怪的事情发生了:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 10.1.0.4): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
不仅如此,如果我只是在本地机器上使用 java -cp <dependencies(including spark jars) cp> myApp.jar 运行 scala 应用程序,它可以完美运行,作业运行正常。
我在本地没有任何SPARK_HOME,setJars 基本上是一个空列表,好像我不会使用它,它仍然有效。
我猜它在我运行我的应用程序时使用类路径中提供的 jars,我不需要提供任何其他内容。
如果你们中的任何人有任何想法,我将不胜感激,我真的无法解释为什么这不起作用,而且我直到现在才进行任何 Spark 部署。我主要在嵌入式 Spark 中运行。
Spark 在我的应用程序依赖项 (2.0.0) 中的版本与在 docker 容器中运行的版本相同。
我用过:
我的应用程序的 Scala 2.11.7
两个容器(app、spark)上的 Java 1.8
这里要求的是我的应用程序的代码
val jars = Option(new File(Properties.spark_jars_path).listFiles()).toList.flatten.map(_.getAbsolutePath)
val conf = new SparkConf()
.setMaster(RunUtils.determineMasterUrl(Properties.mode))
.setAppName(RunUtils.SPARK_APP_NAME)
.setJars(jars)
.set("spark.cassandra.connection.host", Properties.cassandra_connection_host)
val ssc = new StreamingContext(conf, Seconds(1))
case class Result(buyDate: Timestamp, endDate: Timestamp, maxDate: Timestamp, buyAmount: Double, buyRate: Double)
def main(args: Array[String]): Unit = {
val DateFormatter = new java.text.SimpleDateFormat("yyyy-MM-dd")
val kafkaParams = Map("metadata.broker.list" -> Properties.kafka_brokers, "auto.offset.reset" -> Properties.kafka_auto_reset_offset)
//
// BITSTAMP
//
val bitstampTopic = Set("bitstamp_trades")
val bitstampStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, bitstampTopic)
val bitstampTradeStream = bitstampStream.map(_._2).map { trade =>
val jsonNode = JsonMapper.readTree(trade)
Trade(
"BITSTAMP",
"BTC_USD",
if(jsonNode.get("type").asInt() == 1) "SELL" else "BUY",
DateFormatter.format(new Date(jsonNode.get("timestamp").asLong() * 1000)),
new Date(jsonNode.get("timestamp").asLong() * 1000),
jsonNode.get("amount").asDouble(),
jsonNode.get("price").asDouble()
)
}
bitstampTradeStream.saveToCassandra("coin_master", "trades", SomeColumns(
"exchange_house",
"exchange_currencies",
"exchange_type",
"date",
"trade_time",
"amount",
"price")
)
ssc.start()
ssc.awaitTermination()
}
【问题讨论】:
-
您能否提供您正在执行 mapparttitons 的代码或您认为为什么会发生此错误的代码?
-
再次检查问题,我添加了代码。
-
好的,所以刚刚意识到问题出在我的地图功能中,当我解决时,我会发布答案。
标签: java scala apache-spark docker dependencies