【问题标题】:Docker-Compose Spark Elasticsearch Connection NetworkClient ErrorDocker-Compose Spark Elasticsearch 连接 NetworkClient 错误
【发布时间】:2018-12-28 23:09:06
【问题描述】:

在 docker 容器中运行 spark-submit 时,我无法找到错误消息的解决方案。

所以总体思路是通过kafka生成数据,有这样的结构:

{'source': 'JFdyGil9YYHU', 'target': 'M4iCWTNB7P9E', 'amount': 5425.76, 'currency': 'EUR'}

然后通过Scala-script在Spark中接收这些数据,即:

package com.example.spark

import kafka.serializer.StringDecoder
import org.apache.spark.{TaskContext, SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.kafka.{OffsetRange, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.util.parsing.json.JSON

import org.elasticsearch.spark._

object Receiver {
  def main(args: Array[String]): Unit = {
    /** when starting the receiver, broker and topics must be passed.*/
    if (args.length < 2) {
      System.err.println(s"""
        |Usage: DirectReceiver <brokers> <topics>
        |  <brokers> is a list of one or more Kafka brokers
        |  <topics> is a list of one or more kafka topics to consume from
        |
        """.stripMargin)
      System.exit(1)
    }

    val Array(brokers, topics) = args


    /** Create context:
    *   The --master option specifies the master URL for a distributed cluster,
    *   or local to run locally with one thread,
    *   or local[N] to run locally with N threads,
    *   or local[*] to run locally with as many worker threads as logical cores on your machine.
    *   You should start by using local for testing.
    */
    val sparkConf = new SparkConf().setAppName("Receiver").setMaster("local[*]")

    /** Whether elasticsearch-hadoop should create an index (if its missing)
    *   when writing data to Elasticsearch or fail.
    *   (default: yes, but specifying anyway for the sake of completeness)
    */
    sparkConf.set("es.index.auto.create", "true")

    /** Define that the context batch interval should take 2 seconds.*/
    //val ssc = new StreamingContext(sparkConf, Seconds(2)) // testing alternatives
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(2))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet // if there are many
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    /** Get the lines.
    *   messages are of format:
    *   (null, {"key": "value", "key": "value, ...})
    *   .map(_._2) takes the second tuple argument
    */
    val lines = messages.map(_._2)

    /** pairs are now: [Ljava.lang.String;@5922fbe4
    *   it is what "toString" function in scala actually returns:
    *   def toString(): String = this.getClass.getName + "@" + this.hashCode.toHexString
    *   [ means it’s an array
    *   L means it can contain references to objects
    *   java.lang.String means all those objects should be instances of java.lang.String
    *   ; is just because Java loves its semicolons
    *
    *   Get rid of all the unneccessary charecters and split the string by comma for further usage.
    */
    val pairs = lines.map(_.stripPrefix("{").stripSuffix("}").replaceAll("\"|\\s", "").split(","))

    /** Getting key-value from the pairs, which are:
    *   key: value
    *    key: value
    *    key: value
    *    ...
    */
    pairs.foreach(arr =>
        arr.map(
            x => Map( x(0).split(":")(0) -> x(0).split(":")(1) )
        ).saveToEs("spark/json-test")
    )
    /* testing
    pairs.foreach(
        arr => arr.foreach( x =>
            //val source = Map(x.map(_.1) -> x.map(_.2))
            //source.foreach(println)
            x => x.foreach(println)
        )
    )*/

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}

我的 docker-compose.yml 文件如下:

version: '3.7'
services:


# kafka (zookeeper integrated)
  kafka:
    container_name: kafka
    build: ./kafka
    environment:
      - KAFKA=localhost:9092
      - ZOOKEEPER=localhost:2181
    expose:
      - 2181
      - 9092
    networks:
      - kaspelki-net

# spark (contains all daemons)
  spark:
    container_name: spark
    build: ./spark
    command: bash
    links:
      - "kafka"
    ports:
      - 8080:8080
      - 7077:7077
      - 6066:6066
      - 4040:4040
    environment:
      - SPARK_MASTER_HOST=spark://localhost:7077
    env_file:
      - ./hadoop/hadoop.env
    tty: true
    expose:
      - 7077
      - 8080
      - 6066
      - 4040
    volumes:
      - ./scripts/spark:/app
    networks:
      - kaspelki-net


# ELK
  elasticsearch:
    container_name: elasticsearch
    build: ./ELK/elasticsearch
    ports:
      - 9200:9200
    expose:
      - 9200
    networks:
      - kaspelki-net


  kibana:
    container_name: kibana
    build: ./ELK/kibana
    ports:
      - 5601:5601
    expose:
      - 5601
    networks:
      - kaspelki-net
    depends_on:
      - elasticsearch



### --- volumes --- ###
volumes:
  data:
networks:
  kaspelki-net:
    name: kaspelki-net

所以我正在运行“sudo docker-compose up -d”,我可以在浏览器中测试“localhost:9200”和“localhost:5601”,它们工作正常,但是当我通过“ sudo docker exec -it spark bash" 并尝试通过以下方式提交我的receiver.jar:

spark-submit --master yarn-client --driver-java-options "-Dlog4j.configuration=file:///app/receiver/log4j.properties" /app/receiver/building_jar/target/scala-2.10 /receiver.jar kafka:9092 测试

然后我收到此错误消息:

18/12/28 09:05:18 错误网络客户端:节点 [127.0.0.1:9200] 失败(连接被拒绝);没有其他节点了 - 正在中止...

与其他一些消息,其中进程退出。 所以我明白连接以某种方式失败,但我不明白为什么:/

可以请人帮忙吗?

【问题讨论】:

    标签: scala docker apache-spark


    【解决方案1】:

    我对 Spark 不熟悉,但是在您的配置中的某个地方,您正尝试从一个容器连接到 localhost:9200,但这是无法正常工作的(这在 docker 之外工作,因为 localhost 是您的机器,但是当每个服务在自己的容器中运行 localhost 是指每个容器的 localhost 而不是主机)。

    因此,在 docker 中运行时更改您的配置以使用 compose 服务名称(在您的情况下为 elasticsearch)而不是 localhost 来引用弹性搜索,它应该都可以工作 - 您需要将 elasticsearch 添加为 link调用服务下的 compose 文件,以便通过服务名称引用它(就像您将 kafka 作为 spark 下的链接一样)。

    【讨论】:

    • 您好@Markoorn,感谢您的回复!我现在明白了,问题是什么。我尝试了一些配置 docker-compose 文件的变体(包括您建议的链接),但我不知道如何“使用 elasticsearch 而不是 localhost 引用弹性搜索”。感觉有点蠢——你能给我一个提示吗,怎么做?
    • 我终于用你的回答解决了这个错误。我必须按照你说的添加链接,并且必须从我的 .scala 文件中删除“.setMaster(local[*])”。现在我又遇到了一个错误,但是这个问题已经解决了:)非常感谢!
    • 似乎删除 "local[*]" -master 设置导致我的火花不再处理任何东西(我花了一段时间才意识到这一点,因为我处理了语法错误) ,尽管我在 spark-submit 命令中传递了 --master yarn-client 。所以问题还是一样,答案还是正确的。 (对我来说)缺少的一件事是我仍然无法使其正常工作:(如果有人可以提供帮助 - 我将不胜感激!
    • 嗨@Nin4ikP - 抱歉,我不知道 Spark 是如何设置的,因为我从未使用过它,有没有办法为不同的环境设置配置文件?基本上,您想要实现的是在本地运行时使用 localhost 作为基地址的设置,然后在 docker 中运行时,您将有一个不同的配置,然后它宁愿使用 compose 服务名称
    • 嘿 :) 谢谢你的帮助!我对这个领域的一切都有些陌生。据我所知,我需要的一切都不是本地的。 (我为 Hello world 运行了一个本地脚本,但我当然需要工具(Kafka、Spark、Elasticsearch、Kibana)进行通信,所以我没有在本地运行任何代码。但我认为问题在于我正在尝试用yarn-client运行代码,因为我读到一些关于这些设置有问题的ppl。所以我想我会在几分钟内问另一个关于这个问题的问题。也许有人也有它并且可以回答:)无论如何-谢谢!
    猜你喜欢
    • 2018-04-30
    • 2023-02-12
    • 2021-07-23
    • 1970-01-01
    • 2017-11-14
    • 2019-04-15
    • 1970-01-01
    • 1970-01-01
    • 2021-06-09
    相关资源
    最近更新 更多