【Question Title】: pyspark streaming and utils import issues
【Posted】: 2021-12-05 08:17:18
【Question】:

I am trying to run the following code:

import findspark
findspark.init('/opt/spark')
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 pyspark-shell'

import sys
import time
from pyspark.context import SparkContext
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

n_secs = 1
topic = "video-stream-event"
conf = SparkConf().setAppName("KafkaStreamProcessor").setMaster("local[*]")
sc = SparkContext(conf=conf)
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, n_secs)
kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {
                        'bootstrap.servers':'127.0.0.1:9092', 
                        'group.id':'video-group', 
                        'fetch.message.max.bytes':'15728640',
                        'auto.offset.reset':'largest'})
#lines = kafkaStream.map(lambda x: x[1])
print(kafkaStream)

I am getting the following error:

from pyspark.streaming.kafka import KafkaUtils
ModuleNotFoundError: No module named 'pyspark.streaming.kafka'
log4j:WARN No appenders could be found for logger (org.apache.spark.util.ShutdownHookManager).
log4j:WARN Please initialize the log4j system properly.

I am using python==3.7 and pyspark==3.1.2. After switching to pyspark 2.4.5 and 2.4.6 and running the same code, I get the following error:

> 21/10/18 14:05:47 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
> WARNING: An illegal reflective access operation has occurred
> WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/opt/spark/jars/spark-unsafe_2.11-2.4.5.jar) to method java.nio.Bits.unaligned()
> WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
> WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
> WARNING: All illegal access operations will be denied in a future release
> 21/10/18 14:05:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> Traceback (most recent call last):
>   File "/home/deepika/Desktop/kafka/kafka_pyspark.py", line 12, in <module>
>     from pyspark.context import SparkContext
>   File "/home/deepika/Downloads/code_dump/spark/python/pyspark/__init__.py", line 51, in <module>
>     from pyspark.context import SparkContext
>   File "/home/deepika/Downloads/code_dump/spark/python/pyspark/context.py", line 31, in <module>
>     from pyspark import accumulators
>   File "/home/deepika/Downloads/code_dump/spark/python/pyspark/accumulators.py", line 97, in <module>
>     from pyspark.serializers import read_int, PickleSerializer
>   File "/home/deepika/Downloads/code_dump/spark/python/pyspark/serializers.py", line 72, in <module>
>     from pyspark import cloudpickle
>   File "/home/deepika/Downloads/code_dump/spark/python/pyspark/cloudpickle.py", line 145, in <module>
>     _cell_set_template_code = _make_cell_set_template_code()
>   File "/home/deepika/Downloads/code_dump/spark/python/pyspark/cloudpickle.py", line 126, in _make_cell_set_template_code
>     return types.CodeType(
> TypeError: an integer is required (got type bytes)
> log4j:WARN No appenders could be found for logger (org.apache.spark.util.ShutdownHookManager).
> log4j:WARN Please initialize the log4j system properly.
> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

Any idea what to do now? I want to run the code above. I tried python 3.7 and 3.8, along with the pyspark versions above, and always ended up with one of these two errors. I installed pyspark using this link:

Installation Link for pyspark

【Comments】:

    Tags: python pyspark apache-kafka


    【Solution 1】:
    1. You should use spark-sql-kafka-0-10.

    2. You need to move findspark.init() after the os.environ line. In fact, you don't need that line at all, since you can provide the package through findspark:

    SPARK_VERSION = '3.1.2'
    SCALA_VERSION = '2.12'
    
    import findspark
    findspark.add_packages(['org.apache.spark:spark-sql-kafka-0-10_' + SCALA_VERSION + ':' + SPARK_VERSION ])
    
    findspark.init()
     
    from pyspark import SparkContext, SparkConf
    

    Also, if you are just getting started with Spark, use the latest version:

    https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
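
With spark-sql-kafka-0-10, the old KafkaUtils DStream API is replaced by Structured Streaming's readStream. Below is a minimal sketch, not the asker's exact pipeline, assuming a local Kafka broker at 127.0.0.1:9092 and the question's topic video-stream-event; the option names follow the Structured Streaming + Kafka integration guide linked above.

```python
# Kafka source options for Structured Streaming; 'startingOffsets' replaces
# the old DStream setting 'auto.offset.reset': 'largest'.
kafka_options = {
    "kafka.bootstrap.servers": "127.0.0.1:9092",
    "subscribe": "video-stream-event",
    "startingOffsets": "latest",
}

def build_stream(spark):
    """Build a streaming DataFrame of Kafka message values as strings."""
    reader = spark.readStream.format("kafka")
    for key, value in kafka_options.items():
        reader = reader.option(key, value)
    return reader.load().selectExpr("CAST(value AS STRING) AS value")

if __name__ == "__main__":
    # Imported here so the module can be inspected without pyspark installed.
    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .appName("KafkaStreamProcessor")
             .master("local[*]")
             .getOrCreate())
    # Print each micro-batch to the console until interrupted.
    query = build_stream(spark).writeStream.format("console").start()
    query.awaitTermination()
```

Running it requires a reachable Kafka broker; the console sink is just the simplest way to see that messages arrive.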

    Regarding the log4j error, you need to create a log4j.properties file in $SPARK_HOME/conf.
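
As a starting point, here is a minimal log4j.properties, adapted from the log4j.properties.template that ships in Spark's conf directory, which routes WARN-and-above messages to the console and silences the "No appenders could be found" warning:

```properties
# Log WARN and above to the console (stderr)
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
```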

    【Discussion】:
