【发布时间】:2020-05-30 11:27:20
【问题描述】:
我正在尝试使用 pyspark 从 kafka 读取流。我正在使用 spark version 3.0.0-preview2 和 spark-streaming-kafka-0-10_2.12 在此之前我只是统计 zookeeper、kafka 并创建一个新主题:
/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
/usr/local/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic data_wm
这是我的代码:
import pandas as pd
import os
import findspark
findspark.init("/usr/local/spark")
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("TestApp").getOrCreate()
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "data_wm") \
.load()
value = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
这就是我运行脚本的方式:
sudo --preserve-env=pyspark /usr/local/spark/bin/pyspark --packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.0-preview
作为这个命令的结果,我有这个:
: resolving dependencies :: org.apache.spark#spark-submit-parent-0d7b2a8d-a860-4766-a4c7-141a902d8365;1.0
confs: [default]
found org.apache.spark#spark-streaming-kafka-0-10_2.12;3.0.0-preview in central
found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0-preview in central
found org.apache.kafka#kafka-clients;2.3.1 in central
found com.github.luben#zstd-jni;1.4.3-1 in central
found org.lz4#lz4-java;1.6.0 in central
found org.xerial.snappy#snappy-java;1.1.7.3 in central
found org.slf4j#slf4j-api;1.7.16 in central
found org.spark-project.spark#unused;1.0.0 in central :: resolution report :: resolve 380ms :: artifacts dl 7ms
:: modules in use:
com.github.luben#zstd-jni;1.4.3-1 from central in [default]
org.apache.kafka#kafka-clients;2.3.1 from central in [default]
org.apache.spark#spark-streaming-kafka-0-10_2.12;3.0.0-preview from central in [default]
org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0-preview from central in [default]
org.lz4#lz4-java;1.6.0 from central in [default]
org.slf4j#slf4j-api;1.7.16 from central in [default]
org.spark-project.spark#unused;1.0.0 from central in [default]
org.xerial.snappy#snappy-java;1.1.7.3 from central in [default]
但我总是出现这个错误:
d> f = spark \ ... .readStream \ ... .format("kafka") \ ...
.option("kafka.bootstrap.servers", "localhost:9092") \ ...
.option("subscribe", "data_wm") \ ... .load() Traceback(最 最近通话最后一次):文件“”,第 5 行,在文件中 “/usr/local/spark/python/pyspark/sql/streaming.py”,第 406 行,加载中 返回 self._df(self._jreader.load()) 文件 "/usr/local/spark/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", 第 1286 行,在 call 文件中 “/usr/local/spark/python/pyspark/sql/utils.py”,第 102 行,在 deco 引发转换后的 pyspark.sql.utils.AnalysisException:找不到数据源:kafka。请按照以下要求部署应用程序 《Structured Streaming + Kafka Integration》部署部分 指南”。;
不知道这个错误的原因,请帮忙
【问题讨论】:
-
不相关 - 你为什么使用 sudo 来运行 pyspark?
-
你真的安装了 Spark 3 预览版吗?
-
即使我不使用 sudo 我也有同样的问题,是的,我的笔记本电脑上安装了 spark 3.0.0-preview2
-
不使用预览版有同样的问题吗?
-
没有,即使使用预览版我也有同样的问题
标签: apache-spark pyspark apache-kafka pyspark-sql spark-structured-streaming