[Posted]: 2021-12-05 08:17:18
[Question]:
I am trying to run the following code:
import findspark
findspark.init('/opt/spark')
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 pyspark-shell'
import sys
import time
from pyspark.context import SparkContext
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
n_secs = 1
topic = "video-stream-event"
conf = SparkConf().setAppName("KafkaStreamProcessor").setMaster("local[*]")
sc = SparkContext(conf=conf)
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, n_secs)
kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {
    'bootstrap.servers': '127.0.0.1:9092',
    'group.id': 'video-group',
    'fetch.message.max.bytes': '15728640',
    'auto.offset.reset': 'largest'})
#lines = kafkaStream.map(lambda x: x[1])
print(kafkaStream)
I get the following error:
from pyspark.streaming.kafka import KafkaUtils
ModuleNotFoundError: No module named 'pyspark.streaming.kafka'
log4j:WARN No appenders could be found for logger (org.apache.spark.util.ShutdownHookManager).
log4j:WARN Please initialize the log4j system properly.
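For context, the `pyspark.streaming.kafka` module (the Kafka 0.8 DStream API) was removed in Spark 3.0, so this import is expected to fail on pyspark 3.1.2 regardless of the `--packages` setting. A minimal sketch for checking what the installed pyspark actually provides is below; `module_available` is a hypothetical helper written for this check, not part of pyspark.

```python
import importlib.util


def module_available(name):
    """Return True if `name` can be located.

    Parent packages are imported during the lookup; `name` itself is not.
    `module_available` is a hypothetical helper, not a pyspark API.
    """
    try:
        return importlib.util.find_spec(name) is not None
    except Exception:
        # find_spec imports parent packages; a missing or broken parent
        # (for example pyspark itself failing to import) ends up here.
        return False


if __name__ == "__main__":
    for name in ("pyspark", "pyspark.streaming", "pyspark.streaming.kafka"):
        print(name, "->", module_available(name))
```

If the last line prints `False` while the first two print `True`, the installed pyspark simply does not ship that module.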
This was with Python 3.7 and pyspark 3.1.2. I then switched to pyspark 2.4.5 and 2.4.6, ran the same code, and got the following error:
> 21/10/18 14:05:47 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
> WARNING: An illegal reflective access operation has occurred
> WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/opt/spark/jars/spark-unsafe_2.11-2.4.5.jar) to method java.nio.Bits.unaligned()
> WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
> WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
> WARNING: All illegal access operations will be denied in a future release
> 21/10/18 14:05:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> Traceback (most recent call last):
>   File "/home/deepika/Desktop/kafka/kafka_pyspark.py", line 12, in <module>
>     from pyspark.context import SparkContext
>   File "/home/deepika/Downloads/code_dump/spark/python/pyspark/__init__.py", line 51, in <module>
>     from pyspark.context import SparkContext
>   File "/home/deepika/Downloads/code_dump/spark/python/pyspark/context.py", line 31, in <module>
>     from pyspark import accumulators
>   File "/home/deepika/Downloads/code_dump/spark/python/pyspark/accumulators.py", line 97, in <module>
>     from pyspark.serializers import read_int, PickleSerializer
>   File "/home/deepika/Downloads/code_dump/spark/python/pyspark/serializers.py", line 72, in <module>
>     from pyspark import cloudpickle
>   File "/home/deepika/Downloads/code_dump/spark/python/pyspark/cloudpickle.py", line 145, in <module>
>     _cell_set_template_code = _make_cell_set_template_code()
>   File "/home/deepika/Downloads/code_dump/spark/python/pyspark/cloudpickle.py", line 126, in _make_cell_set_template_code
>     return types.CodeType(
> TypeError: an integer is required (got type bytes)
> log4j:WARN No appenders could be found for logger (org.apache.spark.util.ShutdownHookManager).
> log4j:WARN Please initialize the log4j system properly.
> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
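A `TypeError` raised from `types.CodeType` inside the bundled `cloudpickle.py` is the pattern usually seen when PySpark 2.4.x is run by Python 3.8 or later, where the code-object constructor takes an extra positional-only argument count. That suggests this run may have used a 3.8 interpreter rather than 3.7. The following is a rough sketch for confirming which interpreter executes the script; `has_posonly_code_field` and `describe_interpreter` are hypothetical names introduced here for illustration.

```python
import sys


def has_posonly_code_field():
    """True when code objects carry co_posonlyargcount (Python 3.8+)."""
    return hasattr((lambda: None).__code__, "co_posonlyargcount")


def describe_interpreter():
    """Summarise the interpreter that is actually executing this script."""
    version = "{}.{}.{}".format(*sys.version_info[:3])
    return {
        "executable": sys.executable,  # path of the running Python binary
        "version": version,
        "posonly_code_field": has_posonly_code_field(),
    }


if __name__ == "__main__":
    for key, value in describe_interpreter().items():
        print(key, "=", value)
```

Running this with the same command used for `kafka_pyspark.py` shows whether the expected interpreter is picked up. The traceback above also loads pyspark from `/home/deepika/Downloads/code_dump/spark/python`, not from `/opt/spark`, so it may be worth checking which installation is on the path.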
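Any idea what to do now? I want to run the code above. I tried Python 3.7 and 3.8 with different pyspark versions, and every combination ends with one of these two errors. I installed pyspark using this link: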
[Discussion]:
Tags: python pyspark apache-kafka