【问题标题】:Pyspark Streaming with Kafka in PyCharmPyCharm 中使用 Kafka 的 Pyspark 流式传输
【发布时间】:2016-06-04 07:12:00
【问题描述】:

我最近一直在尝试在 Pycharm 中调试 pyspark.streaming.kafka 类,以便与在 linux 机器上工作相比更容易排除故障。

这是我的示例代码:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition

sc = SparkContext(appName="sample app")
ssc = StreamingContext(sc, 1)
kafkaParams = {"metadata.broker.list": "{broker list}",
               "auto.offset.reset": "smallest"}
kafka_stream = KafkaUtils.createDirectStream(ssc, {topic list}, kafkaParams)

但是,我收到以下错误:

Traceback (most recent call last):
  File "C:\Program Files (x86)\JetBrains\PyCharm   5.0.3\helpers\pydev\pydevd.py", line 2411, in <module>
    globals = debugger.run(setup['file'], None, None, is_module)
  File "C:\Program Files (x86)\JetBrains\PyCharm    5.0.3\helpers\pydev\pydevd.py", line 1802, in run
    launch(file, globals, locals)  # execute the script
  File "{script path}", line 30, in <module> {topic}], kafkaParams)
  File "C:\spark-1.6.0-bin-  hadoop2.6\python\lib\pyspark.zip\pyspark\streaming\kafka.py", line 152, in  createDirectStream
py4j.protocol.Py4JJavaError: An error occurred while calling o20.loadClass.
: java.lang.ClassNotFoundException:   org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper
    at java.net.URLClassLoader.findClass(Unknown Source)
    at java.lang.ClassLoader.loadClass(Unknown Source)
    at java.lang.ClassLoader.loadClass(Unknown Source)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Unknown Source)

16/02/22 11:45:49 INFO SparkContext: Invoking stop() from shutdown hook

如果有人可以就如何在 PyCharm 中调试 PySpark Kafka 流模块提供一些指导,我将不胜感激

【问题讨论】:

  • 如何提交您的应用程序?
  • 我只是在使用 Pycharm 调试功能。
  • 它是否适用于其他配置?
  • 如果我使用一些简单的 spark rdd 功能,它就可以工作。 pyspark 库中的 kafka 流类正在扔掉它

标签: apache-spark pycharm apache-kafka pyspark spark-streaming


【解决方案1】:

Kafka 支持依赖于外部 spark-streaming-kafka JAR,它没有随 Spark 二进制文件一起提供。通常,这可以在提交时使用 --packages 参数指定。

对于使用 PyCharm 的本地开发,我能想到的最简单的解决方案是将其添加到 $SPARK_HOME/conf/spark-defaults.conf。假设您使用的是使用 Scala 2.10 构建的 Spark 1.6.0:

spark.jars.packages org.apache.spark:spark-streaming-kafka_2.10:1.6.0

请记住,您将无法将 PyCharm 调试器与 Python 工作进程一起使用。见How can pyspark be called in debug mode?

【讨论】:

  • 感谢您的帮助。我正在使用版本:spark-1.6.0-bin-hadoop2.6。我在我的 Pycharm 解释器中添加了以下文件:C:\spark-1.6.0-bin-hadoop2.6\python\lib\py4j-0.9-src.zip 和 C:\spark-1.6.0-bin-hadoop2。 6\python\lib\pyspark.zip 并且正在使用 pySpark 而不是 scala。 Pycharm还有调试的方法吗?
  • 预构建的二进制文件使用 Scala 2.10,所以应该没问题。您只能将调试器与驱动程序代码一起使用。
  • 所以我只需要在我的 $SPARK_HOME/conf/ 目录下创建一个名为“spark-defaults.conf”的文件并添加 spark.jars.packages org.apache.spark:spark-streaming-kafka_2 .10:1.6.0 到文件?
  • 没错。我已经链接了一些关于调试限制原因的解释。
  • 非常感谢,zero323。在这方面有很大帮助。
猜你喜欢
  • 2021-07-28
  • 2016-02-16
  • 2016-03-24
  • 2018-11-18
  • 2018-02-13
  • 1970-01-01
  • 2023-03-25
  • 2019-04-04
  • 2018-11-04
相关资源
最近更新 更多