[Posted]: 2018-12-28 23:09:06
[Problem description]:
I can't find a solution for an error message that occurs when running spark-submit inside a Docker container.
The general idea is to generate data via Kafka with this structure:
{'source': 'JFdyGil9YYHU', 'target': 'M4iCWTNB7P9E', 'amount': 5425.76, 'currency': 'EUR'}
and then receive that data in Spark via a Scala script, namely:
package com.example.spark

import kafka.serializer.StringDecoder
import org.apache.spark.{TaskContext, SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.kafka.{OffsetRange, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.util.parsing.json.JSON
import org.elasticsearch.spark._

object Receiver {
  def main(args: Array[String]): Unit = {
    /** When starting the receiver, brokers and topics must be passed. */
    if (args.length < 2) {
      System.err.println(s"""
        |Usage: DirectReceiver <brokers> <topics>
        |  <brokers> is a list of one or more Kafka brokers
        |  <topics> is a list of one or more Kafka topics to consume from
        |
        """.stripMargin)
      System.exit(1)
    }
    val Array(brokers, topics) = args

    /** Create the context:
     * The --master option specifies the master URL for a distributed cluster,
     * or local to run locally with one thread,
     * or local[N] to run locally with N threads,
     * or local[*] to run locally with as many worker threads as logical cores on your machine.
     * You should start by using local for testing.
     */
    val sparkConf = new SparkConf().setAppName("Receiver").setMaster("local[*]")

    /** Whether elasticsearch-hadoop should create an index (if it is missing)
     * when writing data to Elasticsearch, or fail.
     * (default: yes, but specified anyway for the sake of completeness)
     */
    sparkConf.set("es.index.auto.create", "true")

    /** Define a context batch interval of 2 seconds. */
    //val ssc = new StreamingContext(sparkConf, Seconds(2)) // testing alternatives
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(2))

    // Create a direct Kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet // if there are many
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    /** Get the lines.
     * Messages are of the format:
     *   (null, {"key": "value", "key": "value", ...})
     * .map(_._2) takes the second tuple element.
     */
    val lines = messages.map(_._2)

    /** pairs now print as: [Ljava.lang.String;@5922fbe4
     * which is what the "toString" function in Scala actually returns:
     *   def toString(): String = this.getClass.getName + "@" + this.hashCode.toHexString
     * [ means it's an array
     * L means it can contain references to objects
     * java.lang.String means all those objects should be instances of java.lang.String
     * ; is just because Java loves its semicolons
     *
     * Get rid of all the unnecessary characters and split the string by comma for further use.
     */
    val pairs = lines.map(_.stripPrefix("{").stripSuffix("}").replaceAll("\"|\\s", "").split(","))

    /** Getting key-value pairs out of the arrays, which hold:
     *   key: value
     *   key: value
     *   ...
     */
    pairs.foreach(arr =>
      arr.map(
        x => Map(x(0).split(":")(0) -> x(0).split(":")(1))
      ).saveToEs("spark/json-test")
    )

    /* testing
    pairs.foreach(
      arr => arr.foreach(x =>
        //val source = Map(x.map(_.1) -> x.map(_.2))
        //source.foreach(println)
        x => x.foreach(println)
      )
    )*/

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
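For reference, the string munging in the script above can be reproduced outside Spark. A minimal sketch in plain Scala (the object name, helper name, and sample line are my own, chosen to match the producer's JSON shape):

```scala
// Spark-free reproduction of the parsing done in the stream:
// strip the braces, drop quotes and whitespace, split on commas,
// then split each "key:value" pair on the first ':'.
object ParseSketch {
  def parseLine(line: String): Map[String, String] =
    line.stripPrefix("{").stripSuffix("}")
      .replaceAll("\"|\\s", "")
      .split(",")
      .map { kv =>
        val Array(k, v) = kv.split(":", 2)
        k -> v
      }
      .toMap

  def main(args: Array[String]): Unit = {
    val sample = """{"source": "JFdyGil9YYHU", "amount": 5425.76}"""
    println(ParseSketch.parseLine(sample))
  }
}
```

Note that, unlike this sketch, the streaming code maps over `x(0)` only, so per array it builds a map from just the first pair; that is independent of the connection error below.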
My docker-compose.yml file looks like this:
version: '3.7'
services:
  # kafka (zookeeper integrated)
  kafka:
    container_name: kafka
    build: ./kafka
    environment:
      - KAFKA=localhost:9092
      - ZOOKEEPER=localhost:2181
    expose:
      - 2181
      - 9092
    networks:
      - kaspelki-net

  # spark (contains all daemons)
  spark:
    container_name: spark
    build: ./spark
    command: bash
    links:
      - "kafka"
    ports:
      - 8080:8080
      - 7077:7077
      - 6066:6066
      - 4040:4040
    environment:
      - SPARK_MASTER_HOST=spark://localhost:7077
    env_file:
      - ./hadoop/hadoop.env
    tty: true
    expose:
      - 7077
      - 8080
      - 6066
      - 4040
    volumes:
      - ./scripts/spark:/app
    networks:
      - kaspelki-net

  # ELK
  elasticsearch:
    container_name: elasticsearch
    build: ./ELK/elasticsearch
    ports:
      - 9200:9200
    expose:
      - 9200
    networks:
      - kaspelki-net

  kibana:
    container_name: kibana
    build: ./ELK/kibana
    ports:
      - 5601:5601
    expose:
      - 5601
    networks:
      - kaspelki-net
    depends_on:
      - elasticsearch

### --- volumes --- ###
volumes:
  data:

networks:
  kaspelki-net:
    name: kaspelki-net
So I run "sudo docker-compose up -d"; I can test "localhost:9200" and "localhost:5601" in the browser and they work fine. But when I enter the container via "sudo docker exec -it spark bash" and try to submit my receiver.jar with:
spark-submit --master yarn-client --driver-java-options "-Dlog4j.configuration=file:///app/receiver/log4j.properties" /app/receiver/building_jar/target/scala-2.10/receiver.jar kafka:9092 test
then I get this error message:
18/12/28 09:05:18 ERROR NetworkClient: Node [127.0.0.1:9200] failed (Connection refused); no other nodes left - aborting...
along with some other messages, after which the process exits. So I understand that the connection fails somehow, but I don't understand why :/
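For context: elasticsearch-hadoop connects to localhost:9200 by default (the es.nodes setting), and inside the spark container 127.0.0.1 refers to that container itself, not to the elasticsearch container. A minimal sketch of the settings involved, assuming the compose service name elasticsearch is reachable on the shared kaspelki-net network (this is context, not a confirmed fix):

```scala
import org.apache.spark.SparkConf

// Sketch: point elasticsearch-hadoop at the compose service instead of
// the default localhost. "elasticsearch" is the service/container name
// from docker-compose.yml; 9200 is the port it exposes.
val sparkConf = new SparkConf()
  .setAppName("Receiver")
  .set("es.index.auto.create", "true")
  .set("es.nodes", "elasticsearch")   // resolved via the compose network
  .set("es.port", "9200")
  .set("es.nodes.wan.only", "true")   // talk only to the given node, skip discovery
```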
Could someone please help?
[Discussion]:
Tags: scala docker apache-spark