【问题标题】:How to connect to a kerberos-secured kafka cluster running on gcp using pyspark?如何使用 pyspark 连接到在 gcp 上运行的受 kerberos 保护的 kafka 集群?
【发布时间】:2020-03-21 23:31:29
【问题描述】:

所以在我开始之前,我想说我知道这个问题可能看起来重复或重复,但那里可用的答案与我的要求无关。它们主要用于 java 和 scala,但只有一点关于 python。

所以我有一个在 gcp 上运行的受 kerberos 保护的 kafka 集群。我通过使用 kafka-python 包创建生产者和消费者检查了两次运行,它运行良好。

但是当我尝试使用 pyspark 使用我的 spark 应用程序连接到该集群时,它不起作用。我的 Spark 应用程序如下所示:-

def application(topic, batchTime, appName, **kwargs):
    import os

    try:
        os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars <full-path-to-spark>/jars/spark-streaming-kafka-0-8-assembly_2.11-2.4.4.jar pyspark-shell'

        conf = SparkConf().setAppName(appName).setMaster('local[*]') 
        sc = SparkContext(conf=conf)

        stream_context = StreamingContext(sparkContext=sc, batchDuration=batchTime)
        kafka_stream = KafkaUtils.createDirectStream(ssc=stream_context, topics=[topic], 
                                            kafkaParams={"metadata.broker.list":"broker1:9092",
                                                    "ssl.context": 'context',
                                                    'sasl.plain.username': '****',
                                                    'sasl.plain.password': '*********',
                                                    'sasl.mechanism': 'PLAIN',
                                                    'security.protocol': "SASL_PLAINTEXT"})

        lines = kafka_stream.map(lambda x: json.loads(x[1]))
        final_obj = lines.map(lambda line: SparkHelper.get_app_type(line, line['app_type']))
        final_obj.foreachRDD(handler)

        final_obj.pprint()

当我运行它时,它会显示如下错误:-

19/11/26 19:11:59 WARN Utils: Your hostname, openstack-inspiron-3543 resolves to a loopback address: 127.0.1.1; using 10.10.0.25 instead (on interface wlp6s0)
19/11/26 19:11:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
19/11/26 19:11:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/11/26 19:12:01 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
19/11/26 19:12:01 WARN VerifiableProperties: Property sasl.mechanism is not valid
19/11/26 19:12:01 WARN VerifiableProperties: Property sasl.plain.password is not valid
19/11/26 19:12:01 WARN VerifiableProperties: Property sasl.plain.username is not valid
19/11/26 19:12:01 WARN VerifiableProperties: Property security.protocol is not valid
19/11/26 19:12:01 WARN VerifiableProperties: Property ssl.context is not valid

控制台卡在那里。它不会终止,也不会运行任何东西。是的,这只发生在我的本地 kafka 集群运行时。如果我关闭它并运行 spark 应用程序,它会显示“NoBrokerAvailable”。这不应该发生对吧?当我尝试同时连接到不同的集群时,我的本地 kafka 集群是否正在运行并不重要。

我尝试安装不同的 spark-stream-kafka-assembly jar 文件,但它们都不起作用。我从https://jar-download.com/?search_box=spark-streaming-kafka-assembly下载了jar文件。

我正在使用的当前版本是...

火花:2.4.4, Hadoop:2.7, api_version: 0.10

我正在使用融合平台 5.3.1 运行 kafka 集群。

我不明白问题出在哪里。请指出我正在犯的任何错误,或者如果没有我应该改变什么来完成这项工作。感谢您提前回复!

【问题讨论】:

  • Pyspark 仍然使用内部 Java/Scala kafka 和 Spark 库,因此对这些库的任何回答都应转换为 pyspark,例如您应该为 Spark 提交命令提供 keytab
  • 是的,我尝试了其中一些解决方案。就像为每个 Kafka 参数添加前缀“Kafka”一样。但它不起作用。我也不知道如何提供密钥表。 @cricket_007
  • 运行 spark-submit 时,可以添加 -–conf spark.yarn.keytab=path_to_keytab -–conf spark.yarn.principal=principal@REALM.COM 或者将这些属性添加到 SparkConf 对象中
  • 我在集群中没有密钥表文件。 Kerberos 只是验证 kafkaclient 发送的用户名和密码。集群端只有jaas文件。
  • 如果作业是长时间运行的,那么您将需要一个用于重新授权的 keytab,因为 Kerberos 需要它。如 Spark 文档中所述 - spark.apache.org/docs/latest/…(假设您的 Hadoop 集群也使用 Kerberos)

标签: python pyspark apache-kafka kerberos sasl


【解决方案1】:

您可以尝试使用 Pyspark 结构化流,而不是 DSstream 方法,因为它具有传递 SSL 功能和轻松处理的选项。请参考以下堆栈溢出链接中的测试示例代码。 Failed to find leader for topics; java.lang.NullPointerException NullPointerException at org.apache.kafka.common.utils.Utils.formatAddress

【讨论】:

  • 是的,我试过了,它的工作,但后来无法执行 sql 操作。每次我尝试都会抛出错误
  • 您能分享一下您正在执行的 SQL 操作以及遇到的错误。
  • 我应该为此发布一个新问题还是只编辑这个问题?
  • 如果堆栈溢出策略允许您可以发布新问题!谢谢
  • 确定我会发布一个新问题
猜你喜欢
  • 2019-07-05
  • 1970-01-01
  • 2017-03-28
  • 2017-01-14
  • 1970-01-01
  • 2023-04-04
  • 2017-02-20
  • 2019-04-17
  • 2018-11-22
相关资源
最近更新 更多