【问题标题】:How to use Spark Streaming with Kafka with Kerberos?如何使用带有 Kerberos 的 Kafka 的 Spark Streaming?
【发布时间】:2025-12-08 15:55:02
【问题描述】:

我在尝试使用 Kerberized Hadoop 集群中的 Spark Streaming 应用程序使用来自 Kafka 的消息时遇到了一些问题。我尝试了两种方法listed here

  • 基于接收器的方法:KafkaUtils.createStream
  • 直接接近(无接收器):KafkaUtils.createDirectStream

基于接收器的方法 (KafkaUtils.createStream) 会抛出 2 种类型的异常(无论我是在本地模式 (--master local[*]) 还是在 YARN 模式 (--master yarn --deploy-mode client) 下都会抛出不同的异常:

  • Spark 本地应用程序中的奇怪 kafka.common.BrokerEndPointNotAvailableException
  • Spark on YARN 应用程序中的 Zookeeper 超时。我曾经设法完成这项工作(成功连接到 Zookeeper),但没有收到任何消息

在这两种模式(本地或 YARN)中,直接方法 (KafkaUtils.createDirectStream) 返回一个无法解释的 EOFException(请参阅下面的详细信息)。

我的最终目标是在 YARN 上启动 Spark Streaming 作业,因此我将把 Spark 本地作业放在一边。

这是我的测试环境:

  • Cloudera CDH 5.7.0
  • Spark 1.6.0
  • Kafka 0.10.1.0

出于测试目的,我正在开发一个单节点集群(主机名 = quickstart.cloudera)。对于那些有兴趣重现测试的人,我正在开发一个基于 cloudera/quickstart (Git repo) 的自定义 Docker 容器。

下面是我在spark-shell 中使用的示例代码。当然,此代码在未启用 Kerberos 时有效:由 kafka-console-producer 生成的消息由 Spark 应用程序接收。

import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import kafka.serializer.StringDecoder

val ssc = new StreamingContext(sc, Seconds(5))

val topics = Map("test-kafka" -> 1)

def readFromKafkaReceiver(): Unit = {
    val kafkaParams = Map(
        "zookeeper.connect" -> "quickstart.cloudera:2181",
        "group.id" -> "gid1",
        "client.id" -> "cid1",
        "zookeeper.session.timeout.ms" -> "5000",
        "zookeeper.connection.timeout.ms" -> "5000"
    )

    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics, StorageLevel.MEMORY_ONLY_2)
    stream.print
}

def readFromKafkaDirectStream(): Unit = {
    val kafkaDirectParams = Map(
        "bootstrap.servers" -> "quickstart.cloudera:9092",
        "group.id" -> "gid1",
        "client.id" -> "cid1"
    )

    val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaDirectParams, topics.map(_._1).toSet)
    directStream.print
}

readFromKafkaReceiver() // or readFromKafkaDirectStream()

ssc.start

Thread.sleep(20000)

ssc.stop(stopSparkContext = false, stopGracefully = true)

启用 Kerberos 后,此代码不起作用。我遵循了这个指南:Configuring Kafka Security,并创建了两个配置文件:

jaas.conf

KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/home/simpleuser/simpleuser.keytab"
principal="simpleuser@CLOUDERA";
};

client.properties

security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka

我可以生成消息:

export KAFKA_OPTS="-Djava.security.auth.login.config=/home/simpleuser/jaas.conf"
kafka-console-producer \
    --broker-list quickstart.cloudera:9092 \
    --topic test-kafka \
    --producer.config client.properties

但我无法使用 Spark Streaming 应用程序中的这些消息。为了在yarn-client 模式下启动spark-shell,我刚刚创建了一个新的 JAAS 配置 (jaas_with_zk_yarn.conf),其中包含 Zookeeper 部分 (Client),并且对 keytab 的引用只是文件的名称( keytab 然后通过--keytab 选项传递):

KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="simpleuser.keytab"
principal="simpleuser@CLOUDERA";
};

Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="simpleuser.keytab"
principal="simpleuser@CLOUDERA";
};

这个新文件在--files选项中传递:

spark-shell --master yarn --deploy-mode client \
    --num-executors 2 \
    --files /home/simpleuser/jaas_with_zk_yarn.conf \
    --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas_with_zk_yarn.conf" \
    --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=jaas_with_zk_yarn.conf" \
    --keytab /home/simpleuser/simpleuser.keytab \
    --principal simpleuser

我使用的代码和之前一样,只是我添加了另外两个Kafka参数,对应consumer.properties文件的内容:

"security.protocol" -> "SASL_PLAINTEXT",
"sasl.kerberos.service.name" -> "kafka"

一旦 Spark Streaming Context 启动,readFromKafkaReceiver() 会抛出以下错误(无法连接到 Zookeeper):

ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 5000
        at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:1223)
        at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:155)
        at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:129)
        at kafka.utils.ZkUtils$.createZkClientAndConnection(ZkUtils.scala:89)
        at kafka.utils.ZkUtils$.apply(ZkUtils.scala:71)
        at kafka.consumer.ZookeeperConsumerConnector.connectZk(ZookeeperConsumerConnector.scala:191)
        at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:139)
        at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:156)
        at kafka.consumer.Consumer$.create(ConsumerConnector.scala:109)
        at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:100)
        at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
        at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
        at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:575)
        at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:565)
        at org.apache.spark.SparkContext$$anonfun$38.apply(SparkContext.scala:2003)
        at org.apache.spark.SparkContext$$anonfun$38.apply(SparkContext.scala:2003)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

有时与 ZK 的连接已建立(未达到超时),但随后没有收到任何消息。

readFromKafkaDirectStream()在调用此方法后立即抛出以下错误

org.apache.spark.SparkException: java.io.EOFException
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at scala.util.Either.fold(Either.scala:97)
        at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
        at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.readFromKafkaDirectStream(<console>:47)

没有更多的解释,只是一个EOFException。我认为 Spark 和 Kafka 代理之间存在通信问题,但没有更多解释。我也试过metadata.broker.list而不是bootstrap.servers,但没有成功。

也许我在 JAAS 配置文件或 Kafka 参数中遗漏了一些东西?也许 Spark 选项 (extraJavaOptions) 无效?我尝试了很多可能性,我有点迷失了。

如果有人可以帮助我解决这些问题中的至少一个(直接方法或基于接收器),我会很高兴。谢谢:)

【问题讨论】:

    标签: apache-spark apache-kafka spark-streaming kerberos jaas


    【解决方案1】:

    如 Cloudera 文档中所述,Spark 1.6 不支持它:

    Spark Streaming 在开始使用 Kafka 0.9 Consumer API 之前无法从安全的 Kafka 消费

    https://www.cloudera.com/documentation/enterprise/release-notes/topics/cdh_rn_spark_ki.html#ki_spark_streaming_consumer_api

    1.6 中的 Spark-streaming 使用旧的消费者 API,不支持安全消费。

    您可以使用支持安全 Kafka 的 Spark 2.1: https://blog.cloudera.com/blog/2017/05/reading-data-securely-from-apache-kafka-to-apache-spark/

    【讨论】:

      最近更新 更多