【问题标题】:How to connect to a secured Kafka cluster from Zeppelin ("Failed to construct kafka consumer")?如何从 Zeppelin 连接到安全的 Kafka 集群(“无法构建 kafka 消费者”)?
【发布时间】:2019-12-11 07:14:39
【问题描述】:

我正在尝试使用结构化流从 Kafka 代理读取一些数据,以将其显示在 Zeppelin 笔记中。我在 Heroku 上使用启用了 SSL 的 Spark 2.4.3、Scala 2.11、Python 2.7、Java 9 和 Kafka 2.2,但得到 StreamingQueryException: 'Failed to construction kafka consumer'。

我正在使用以下依赖项(在 Spark 解释器设置中设置):

org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.3  
org.apache.spark:spark-streaming_2.11:2.4.3     
org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3 

我尝试过旧版本和新版本,但这些版本应该与我正在使用的 Spark/Scala 版本相匹配。

我已经使用简单的 Python 生产者和消费者成功地从 Kafka 写入和读取。

我正在使用的代码:

%pyspark
from pyspark.sql.functions import from_json
from pyspark.sql.types import *
from pyspark.sql.functions import col, expr, when

schema = StructType().add("power", IntegerType()).add("colorR", IntegerType()).add("colorG",IntegerType()).add("colorB",IntegerType()).add("colorW",IntegerType())

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", brokers) \
  .option("kafka.security.protocol", "SSL") \
  .option("kafka.ssl.truststore.location", "/home/ubuntu/kafka/truststore.jks") \
  .option("kafka.ssl.keystore.location", "/home/ubuntu/kafka/keystore.jks") \
  .option("kafka.ssl.keystore.password", password) \
  .option("kafka.ssl.truststore.password", password) \
  .option("kafka.ssl.endpoint.identification.algorithm", "") \
  .option("startingOffsets", "earliest") \
  .option("subscribe", topic) \
  .load()

schema = ArrayType(
    StructType([StructField("power", IntegerType()), 
                StructField("colorR", IntegerType()),
                StructField("colorG", IntegerType()),
                StructField("colorB", IntegerType()),
                StructField("colorW", IntegerType())]))

readDF = df.select( \
  col("key").cast("string"),
  from_json(col("value").cast("string"), schema))

query = readDF.writeStream.format("console").start()
query.awaitTermination()

我得到的错误:

Fail to execute line 43: query.awaitTermination()
Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-2171412221151055324.py", line 380, in <module>
    exec(code, _zcUserQueryNameSpace)
  File "<stdin>", line 43, in <module>
  File "/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 103, in awaitTermination
    return self._jsq.awaitTermination()
  File "/home/ubuntu/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 75, in deco
    raise StreamingQueryException(s.split(': ', 1)[1], stackTrace)
StreamingQueryException: u'Failed to construct kafka consumer\n=== Streaming Query ===\nIdentifier: [id = 2ee20c47-8293-469a-bc0b-ef71a1f118bc, runId = 72422290-090a-4b6d-bd66-088a5a534240]\nCurrent Committed Offsets: {}\nCurrent Available Offsets: {}\n\nCurrent State: ACTIVE\nThread State: RUNNABLE\n\nLogical Plan:\nProject [cast(key#7 as string) AS key#22, jsontostructs(ArrayType(StructType(StructField(power,IntegerType,true), StructField(colorR,IntegerType,true), StructField(colorG,IntegerType,true), StructField(colorB,IntegerType,true), StructField(colorW,IntegerType,true)),true), cast(value#8 as string), Some(Etc/UTC)) AS jsontostructs(CAST(value AS STRING))#21]\n+- StreamingExecutionRelation KafkaV2[Subscribe[tanana-44614.lightbulb]], [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]\n'

当我使用读写而不是 readStream 和 writeStream 时,我没有收到任何错误,但是当我向 Kafka 发送一些数据时,控制台上什么也没有显示。

我还应该尝试什么?

【问题讨论】:

  • 请删除 org.apache.spark:spark-streaming* 依赖项,因为它们根本不需要。
  • 谢谢你,@JacekLaskowski - 提供完全指定的路径很有帮助。我现在可以连接到 Kafka,但收到 Not authorized to access group: spark-kafka-source-ec67109b-c580-49fe-a97b-b762bb96db08--776798765-driver-2 等错误。因为我必须先创建消费者组才能使用它们(我的 Kafka 托管计划的限制),所以我认为我必须使用直接流式传输并指定 group.id
  • 根据官方文档,Spark不支持Python与Kafka进行SSL通信spark.apache.org/docs/2.4.3/streaming-kafka-integration.html

标签: apache-spark pyspark apache-zeppelin spark-structured-streaming


【解决方案1】:

看起来 Kafka 消费者无法访问 ~/kafka/truststore.jks,因此出现异常。将 ~ 替换为完全指定的路径(不带波浪号),问题应该会消失。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2019-03-14
    • 2023-03-20
    • 2017-08-07
    • 2021-03-10
    • 1970-01-01
    • 2018-08-25
    • 2020-05-23
    • 2018-02-28
    相关资源
    最近更新 更多