【问题标题】:How to programmatically load and stream Kafka topic to a PySpark Dataframe如何以编程方式将 Kafka 主题加载和流式传输到 PySpark 数据帧
【发布时间】:2020-10-02 03:23:36
【问题描述】:

有很多方法可以将 spark 数据帧读/写到 kafka。我正在尝试从 kafka 主题中读取消息并从中创建一个数据框。我能够从主题中获取消息,但无法将其转换为数据名。任何建议都会有所帮助。

import pyspark
from pyspark.sql import SparkSession, Row
from pyspark.context import SparkContext
from kafka import KafkaConsumer

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

consumer = KafkaConsumer('Jim_Topic')

for message in consumer:
    data = message
    print(data) # Printing the messages properly
    df = data.map # am unable to convert it to a dataframe.

下面的方法我也试过了,

df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "Jim_Topic") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

遇到错误,

pyspark.sql.utils.AnalysisException:找不到数据源:kafka。请按照《Structured Streaming + Kafka Integration Guide》的部署部分部署应用。;

【问题讨论】:

    标签: python pyspark apache-kafka kafka-consumer-api


    【解决方案1】:

    根据您的用例,您可以

    1. create a Kafka source for streaming queries
    2. create a Kafka source of batch queries

    用于流式查询

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "Jim_Topic")
      .load()
    
    # Query data
    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
      .as[(String, String)]
    

    批量查询

    val df = spark
      .read
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "Jim_Topic")
      .load()
    
    # Query data
    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
      .as[(String, String)]
    

    确保也添加所需的依赖项:

    org.apache.spark:spark-sql-kafka-0-10_2.11:2.0.2
    

    (替换为您的 Spark 版本 - 以上指的是 Spark 版本2.0.2

    【讨论】:

    • 感谢您的快速帮助。我已经厌倦了这个逻辑,正在低于错误pyspark.sql.utils.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;
    • @JimMacaulay 你是如何运行你的应用程序的?是通过spark-submit吗?
    • 直接从 PyCharm 运行。不是spark-submit
    • @JimMacaulay 您需要添加所需的依赖项。请参阅我的更新答案。
    • 你能帮我添加依赖吗?我不确定如何添加它。如果 Java 我会添加 maven 依赖项。不确定 Python
    猜你喜欢
    • 2018-11-18
    • 2016-03-24
    • 2018-05-13
    • 2019-05-20
    • 2019-07-21
    • 2021-03-01
    • 2023-03-26
    • 2021-12-10
    相关资源
    最近更新 更多