【发布时间】:2020-10-02 03:23:36
【问题描述】:
有很多方法可以将 spark 数据帧读/写到 kafka。我正在尝试从 kafka 主题中读取消息并从中创建一个数据框。我能够从主题中获取消息,但无法将其转换为数据名。任何建议都会有所帮助。
import pyspark
from pyspark.sql import SparkSession, Row
from pyspark.context import SparkContext
from kafka import KafkaConsumer
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
consumer = KafkaConsumer('Jim_Topic')
for message in consumer:
data = message
print(data) # Printing the messages properly
df = data.map # am unable to convert it to a dataframe.
下面的方法我也试过了,
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "Jim_Topic") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
遇到错误,
pyspark.sql.utils.AnalysisException:找不到数据源:kafka。请按照《Structured Streaming + Kafka Integration Guide》的部署部分部署应用。;
【问题讨论】:
标签: python pyspark apache-kafka kafka-consumer-api