【问题标题】:How to Use both Scala and Python in a same Spark project?如何在同一个 Spark 项目中同时使用 Scala 和 Python?
【发布时间】:2016-01-03 16:55:51
【问题描述】:

是否可以通过管道将 Spark RDD 传输到 Python?

因为我需要一个 python 库来对我的数据进行一些计算,但我的主要 Spark 项目是基于 Scala 的。 有没有办法将它们混合使用或让 python 访问相同的 spark 上下文?

【问题讨论】:

  • 我建议在 PySpark 中重写您的代码,但并非所有 Python 库都可以使用 rdds。您使用的是哪个 Python 库?
  • 您想要 Spark 操作的结果,还是想要在 Python 中操作原始 RDD。前者不仅是可能的,而且在大数据世界中很常见。为 Spark 结果编写 Python 插件需要适当注意细节,但这通常是一个很好的解决方案。你的互联网搜索怎么没有出现这些信息?您使用了哪些搜索词? stackoverflow.com/help/how-to-ask。请发布您的 Spark API 和您想要与之交互的 Python 级别。
  • 我想用Jieba进行文本处理,这是一个中文分词库。我使用 Spark Streaming + Kafka 进行数据收集。现在的问题是我需要将原始数据传输到 Python 进行文本处理,然后将结果发送回其他 Scala 代码进行其他分析。
  • 你可以在Spark中使用管道。基本上可以通过管道发送数据并获取外部程序的输出作为输入。

标签: python scala apache-spark pyspark spark-streaming


【解决方案1】:

您确实可以使用 Scala 和 Spark 以及常规 Python 脚本通过管道输出到 Python 脚本。

test.py

#!/usr/bin/python

import sys

for line in sys.stdin:
  print "hello " + line

spark-shell (scala)

val data = List("john","paul","george","ringo")

val dataRDD = sc.makeRDD(data)

val scriptPath = "./test.py"

val pipeRDD = dataRDD.pipe(scriptPath)

pipeRDD.foreach(println)

输出

你好约翰

你好林哥

你好乔治

你好保罗

【讨论】:

  • 仅本地在我的笔记本电脑上,而不是在集群上。
  • 是的,我知道这种方法,但是python脚本是在executor上运行的,所以我有一个问题,如果我将太多数据传递给外部脚本,worker会崩溃吗?我的意思是,外部 Python 脚本不是并行计算。
  • @WilsonLiao 记录一次通过管道传输 1 项,因此只要您的外部脚本可以处理 1 个 RDD 元素的数据(并且在运行时不累积状态),您应该美好的。 scala 中 pipe 函数的文档很有帮助:github.com/apache/spark/blob/branch-2.3/core/src/main/scala/org/…
【解决方案2】:

您可以在 Spark 中通过 Pipe 运行 Python 代码。

使用 pipe(),您可以编写 RDD 的转换,从标准输入读取每个 RDD 元素作为字符串,按照脚本指令操作该字符串,然后将结果作为字符串写入标准输出。

SparkContext.addFile(path),我们可以为每个工作节点添加文件列表,以便在 Spark 作业启动时下载。所有工作节点都将拥有脚本副本,因此我们将通过以下方式获得并行操作管道。我们需要在所有worker和executor节点上安装所有的库和依赖。

示例:

Python 文件:将输入数据转为大写的代码

#!/usr/bin/python
import sys
for line in sys.stdin:
    print line.upper()

Spark 代码:用于管道数据

val conf = new SparkConf().setAppName("Pipe")
val sc = new SparkContext(conf)
val distScript = "/path/on/driver/PipeScript.py"
val distScriptName = "PipeScript.py"
sc.addFile(distScript)
val ipData = sc.parallelize(List("asd","xyz","zxcz","sdfsfd","Ssdfd","Sdfsf"))
val opData = ipData.pipe(SparkFiles.get(distScriptName))
opData.foreach(println)

【讨论】:

    【解决方案3】:

    如果我理解正确,只要您从scala 获取数据并将其转换为RDDSparkContext,那么您就可以使用pyspark 来使用Spark Python API 操作数据.

    您还可以关注programming guide 以使用spark 中的不同语言

    【讨论】:

    • 不能在同一个 RDD 上同时使用 scala 和 pyspark。通过将 RDD 转换为 Dataframe/Dataset 然后将其注册为临时视图,可以在 Notebooks (Zeppelin/Jupyter) 中实现。其他选项是如上所述的管道。
    猜你喜欢
    • 2016-06-13
    • 2019-05-04
    • 1970-01-01
    • 1970-01-01
    • 2010-11-05
    • 2016-09-01
    • 2015-03-21
    • 2014-05-25
    • 2022-01-25
    相关资源
    最近更新 更多