【问题标题】:why my spark job stuck in kafka streaming为什么我的 Spark 工作卡在 kafka 流媒体中
【发布时间】:2017-11-19 15:19:12
【问题描述】:

spark 作业提交到 minicube 创建的 kubernetes 集群中的 spark 集群后的输出:

----------------- RUNNING ----------------------
[Stage 0:>                                                          (0 + 0) / 2]17/06/16 16:08:15 INFO VerifiableProperties: Verifying properties
17/06/16 16:08:15 INFO VerifiableProperties: Property group.id is overridden to xxx
17/06/16 16:08:15 INFO VerifiableProperties: Property zookeeper.connect is overridden to 
xxxxxxxxxxxxxxxxxxxxx
[Stage 0:>                                                          (0 + 0) / 2]

来自 spark web ui 的信息:

foreachRDD at myfile.scala:49 +details

org.apache.spark.streaming.dstream.DStream.foreachRDD(DStream.scala:625) myfile.run(myfile.scala:49) Myjob$.main(Myjob.scala:100) Myjob.main(Myjob.scala) sun.reflect.NativeMethodAccessorImpl.invoke0(本机方法) sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) java.lang.reflect.Method.invoke(Method.java:498) org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743) org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187) org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212) org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126) org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

我的代码:

  println("----------------- RUNNING ----------------------");
    eventsStream.foreachRDD { rdd =>
        println("xxxxxxxxxxxxxxxxxxxxx")
        //println(rdd.count());
    if( !rdd.isEmpty )
    {
      println("yyyyyyyyyyyyyyyyyyyyyyy")
        val df = sqlContext.read.json(rdd);
        df.registerTempTable("data");

        val rules = rulesSource.rules();
        var resultsRDD : RDD[(String,String,Long,Long,Long,Long,Long,Long)]= sc.emptyRDD;
        rules.foreach { rule =>
        ...
        }

        sqlContext.dropTempTable("data")
    }
    else
    {
        println("-------");
        println("NO DATA");
        println("-------");
    }
}

有什么想法吗?谢谢

更新

我的 spark 作业在独立 spark 的 docker 容器中运行良好。但是如果提交到 kubernetes 集群中的 spark 集群,它就会卡在 kafka 流中。不知道为什么?

spark master的yaml文件来自https://github.com/phatak-dev/kubernetes-spark/blob/master/spark-master.yaml

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  labels:
    name: spark-master
  name: spark-master
spec:
  replicas: 1
  template:
    metadata:
      labels:
        name: spark-master
    spec:
      containers:
      - name : spark-master
        image: spark-2.1.0-bin-hadoop2.6 
        imagePullPolicy: "IfNotPresent"
        name: spark-master
        ports:
        - containerPort: 7077
          protocol: TCP
        command:
         - "/bin/bash"
         - "-c"
         - "--"
        args :
- './start-master.sh ; sleep infinity'

【问题讨论】:

  • 我遇到了类似的问题。我正在使用 kafaka broker 0.10 的实验性火花流。一项任务卡住了,没有使用内存。另一个很快就回来了。所以整个事情都卡住了。

标签: scala apache-spark apache-kafka


【解决方案1】:

日志将有助于诊断问题。

基本上你不能在 RDD 操作中创建另一个 RDD。 即rdd1.map{rdd2.count()} 无效

查看 implicit sqlContext 导入后如何将 RDD 转换为数据帧。

        import sqlContext.implicits._
        eventsStream.foreachRDD { rdd =>

            println("yyyyyyyyyyyyyyyyyyyyyyy")

            val df = rdd.toDF(); 
            df.registerTempTable("data");
            .... //Your logic here.
            sqlContext.dropTempTable("data")
        }

【讨论】:

  • 我的 spark 作业在独立 spark 的 docker 容器中运行良好。但是如果提交到 kubernetes 集群中的 spark 集群,它就会卡在 kafka 流中。不知道为什么?
  • 你的火花日志说什么?你能看到 Spark Web UI 吗?它给你任何提示吗?在批处理持续时间过去后,查看 Spark Web UI 是否有 Streaming 选项卡的一件事。
猜你喜欢
  • 2014-11-12
  • 2016-06-23
  • 2014-11-25
  • 1970-01-01
  • 2020-11-06
  • 2018-10-04
  • 2017-12-13
  • 1970-01-01
  • 2017-04-15
相关资源
最近更新 更多