【问题标题】:spark streaming for python not working in databrickspython的火花流在databricks中不起作用
【发布时间】:2021-08-08 16:36:48
【问题描述】:

我正在尝试通过数据块中的 python 火花流从融合主题中读取数据。

所以我有两个问题

  1. 我试图阅读一个主题,但它一直给我一个“未能构建 kafka 消费者”
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
        
        
df = spark.readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "pkc-xxxxxxxxx.confluent.cloud:9092") \
  .option("subscribe", "topic1") \
  .option("kafka.sasl.mechanisms", "PLAIN")\
  .option("kafka.security.protocol", "SASL_SSL")\
  .option("kafka.sasl.username","xxxx")\
  .option("kafka.sasl.password", "xxxx")\
  .option("startingOffsets", "earliest")\
  .option("failOnDataLoss", "false")\
  .load()\
  .select('topic', 'partition', 'offset', 'timestamp', 'timestampType', 'key')

然后我试着做一个

display(df); 

我不断得到一个 kafkashaded.org.apache.kafka.common.KafkaException: Failed to construct kafka consumer

我有什么遗漏吗?我试图查看我试图从我的融合主题中获取的数据框

  1. 如何让 spark 流在数据块中持续监听我的主题?在我的笔记本电脑上,我可以将 spark 提交到集群,但我不太确定是否使用数据块。

感谢任何帮助!

谢谢。

【问题讨论】:

  • 您需要发布更大的堆栈跟踪 - 您应该有以 Caused by... 开头的行...
  • 我设法修复了它。一切都好。

标签: apache-spark databricks


【解决方案1】:

致那些想知道为什么的人

df = (spark 
  .readStream 
  .format("kafka") 
  .option("kafka.bootstrap.servers", "pkc-xxxx:9092") 
  .option("subscribe", "GOOG_GLOBAL_MOBILITY_REPORT_RAW") 
  .option("kafka.security.protocol","SASL_SSL") 
  .option("kafka.sasl.mechanism", "PLAIN") 
  .option("kafka.sasl.jaas.config", """kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="xxxx" password="xxxxx";""")
  .load()
  .withColumn('key', fn.col("key").cast(StringType()))
  .withColumn('value', fn.col("value").cast(StringType()))
  )

【讨论】:

    猜你喜欢
    • 2019-03-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-08-09
    • 1970-01-01
    • 2017-03-16
    相关资源
    最近更新 更多