【问题标题】:create and display spark dataframe from simple json file从简单的 json 文件创建和显示 spark 数据框
【发布时间】:2015-06-24 23:56:01
【问题描述】:

以下简单的 json DataFrame 测试在本地模式下运行 Spark 时可以正常工作。这是 Scala sn-p,但我也成功地在 Java 和 Python 中实现了同样的功能:

sparkContext.addFile(jsonPath)

val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext)
val dataFrame = sqlContext.jsonFile(jsonPath)
dataFrame.show()

我确保 jsonPath 在驱动端和工作端都有效。而我正在调用 addFile... json 文件非常琐碎:

[{"age":21,"name":"abc"},{"age":30,"name":"def"},{"age":45,"name":"ghi"}]

当我切换出本地模式并使用具有单个主/从器的单独 Spark 服务器时,完全相同的代码会失败。我在 Scala、Java 和 Python 中尝试过同样的测试,试图找到一些有效的组合。他们都得到基本相同的错误。以下错误来自 Scala 驱动程序,但 Java/Python 错误消息几乎相同:

15/04/17 18:05:26 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 10.0.2.15): java.io.EOFException
    at java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:2747)
    at java.io.ObjectInputStream.readFully(ObjectInputStream.java:1033)
    at org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:63)
    at org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:101)
    at org.apache.hadoop.io.UTF8.readChars(UTF8.java:216)
    at org.apache.hadoop.io.UTF8.readString(UTF8.java:208)

这很令人沮丧。我基本上是在尝试从官方文档中获取代码 sn-ps 来工作。

更新:感谢 Paul 的深入回应。我在执行相同的步骤时遇到错误。仅供参考,之前我使用的是驱动程序,因此命名为 sparkContext 而不是 shell 默认名称 sc。这是删除了多余日志的缩写 sn-p:

➜  spark-1.3.0  ./bin/spark-shell --master spark://172.28.128.3:7077
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.3.0
      /_/

Using Scala version 2.11.2 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_40)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
SQL context available as sqlContext.

scala> val dataFrame = sqlContext.jsonFile("/private/var/userspark/test.json")
15/04/20 18:01:06 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 10.0.2.15): java.io.EOFException
    at java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:2747)
    at java.io.ObjectInputStream.readFully(ObjectInputStream.java:1033)
    at org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:63)
    at org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:101)
    at org.apache.hadoop.io.UTF8.readChars(UTF8.java:216)
    at org.apache.hadoop.io.UTF8.readString(UTF8.java:208)
    at org.apache.hadoop.mapred.FileSplit.readFields(FileSplit.java:87)
    at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:237)
    (...)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, 10.0.2.15): java.io.EOFException
    at java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:2747)

【问题讨论】:

  • 我记得阅读它并不能真正解析完整的 JSON,即不需要开始/结束方括号,并且每行需要一个对象,以便它可以拆分文件并并行运行。

标签: apache-spark dataframe


【解决方案1】:

虽然我可以让您的简单示例正常运行,但我同意 spark 可能会令人沮丧...

我这里有 spark 1.3.0,使用 openjdk 8 从源代码构建。

将您的文件与spark-shellspark-submit 一起使用会因各种原因而失败,其中示例/文档与已发布的代码相比可能已过时,需要稍作调整。

例如,在 spark-shell 中,sparkContext 已经作为sc 提供,而不是作为sparkContext,并且有一个类似的预定义sqlContext。 spark-shell 发出 INFO 消息,宣布这些上下文的创建。

对于spark-submit,我遇到了某种 jar 错误。这可能是本地问题。

无论如何,如果我缩短它,它运行良好。为了执行这个简短的示例,json 文件是否每行有一个对象似乎也无关紧要。对于未来的测试,生成一个大示例并确定它是否跨内核并行运行以及是否需要每行一个对象(没有逗号或顶部括号)来完成此操作可能很有用。

so1-works.sc

val dataFrame = sqlContext.jsonFile("/data/so1.json")
dataFrame.show()

输出、抑制INFO等消息...

paul@ki6cq:~/spark/spark-1.3.0$ ./bin/spark-shell --master spark://192.168.1.10:7077 <./so1-works.sc 2>/dev/null
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.3.0
      /_/

Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.8.0_40-internal)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
SQL context available as sqlContext.

scala> 

scala> val dataFrame = sqlContext.jsonFile("/data/so1.json")
dataFrame: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> dataFrame.show()
age name
21  abc 
30  def 
45  ghi 

scala> Stopping spark context.
paul@ki6cq:~/spark/spark-1.3.0$ paul@ki6cq:~/spark/spark-1.3.0$ 

奇怪的提示:之后,我必须执行reset 才能让我的 linux 终端恢复正常。

好的,那么,首先,尝试像我所做的那样缩短示例。

如果这不能解决问题,您可以尝试复制我的环境。

这可能很简单,因为我将 docker 用于 master 和 worker,并将图像发布到公共 dockerhub。

未来读者须知:我的公共 dockerhub 图像不是 spark 的官方图像,可能会更改或删除。

您需要两台计算机(一台运行 Linux 或与 docker 兼容的操作系统来在 docker 容器中托管 master 和 worker,另一台最好也运行 Linux 或具有 spark-1.3.0 构建的东西)在家庭防火墙路由器设备后面(DLink、Netgear 等...)。我假设本地网络是 192.168.1.*,192.168.1.10 和 .11 是免费的,并且路由器将正确路由,或者您知道如何使其正确路由。您可以在下面的运行脚本中更改这些地址。

如果您只有一台计算机,由于我在这里使用的桥接网络方法的特殊性,可能无法正常工作以与主机进行通信。
它可以工作,但比我想添加到已经很长的帖子中要多一点。

在一台 Linux 计算机上,install dockerpipework utility 和这些 shell 脚本(调整授予 spark 的内存,似乎不需要编辑额外的工作人员):

./run-docker-spark

#!/bin/bash
sudo -v
MASTER=$(docker run --name="master" -h master --add-host master:192.168.1.10 --add-host spark1:192.168.1.11 --add-host spark2:192.168.1.12 --add-host spark3:192.168.1.13 --add-host spark4:192.168.1.14 --expose=1-65535 --env SPARK_MASTER_IP=192.168.1.10 -d drpaulbrewer/spark-master:latest)
sudo pipework eth0 $MASTER 192.168.1.10/24@192.168.1.1
SPARK1=$(docker run --name="spark1" -h spark1 --add-host master:192.168.1.10 --add-host spark1:192.168.1.11 --add-host spark2:192.168.1.12 --add-host spark3:192.168.1.13 --add-host spark4:192.168.1.14 --expose=1-65535 --env mem=10G --env master=spark://192.168.1.10:7077 -v /data:/data -v /tmp:/tmp -d drpaulbrewer/spark-worker:latest)
sudo pipework eth0 $SPARK1 192.168.1.11/24@192.168.1.1

./stop-docker-spark

#!/bin/bash
docker kill master spark1
docker rm master spark1

另一台 linux 计算机将是您的​​用户计算机,并且需要构建 spark-1.3.0。在两台计算机上创建一个 /data 目录并在那里安装 json 文件。然后只在计算机上运行 ./run-docker-spark 一次,该计算机充当将容纳 master 和 worker 的容器(如 VM)的组合主机。要停止 spark 系统,请使用停止脚本。如果您重新启动,或者出现严重错误,您需要在运行脚本再次运行之前运行停止脚本。

查看master和worker在http://192.168.1.10:8080配对

如果是这样,那么你应该很好地尝试顶部的 spark-shell 命令行。

您不需要这些 dockerfile,因为构建发布在公共 dockerhub 上,并且下载是在 docker run 中自动进行的。但如果您想了解事物的构建方式、JDK、maven 命令等,它们就在这里。

我从一个常见的 Dockerfile 开始,我将它放在一个名为 spark-roasted-elephant 的目录中,因为这是一个非 hadoop 构建,并且 O'Reilley 的 hadoop 书上有一个大象。您需要从 spark 网站获取 spark-1.3.0 源 tarball 以放入包含 Dockerfile 的目录。这个 Dockerfile 可能没有暴露足够的端口(spark 对端口的使用非常混乱,而不幸的是 docker 旨在包含和记录端口的使用),并且在运行 master 和 worker 的 shell 脚本中覆盖了暴露.如果您要求 docker 列出正在运行的内容,这会引起一些不快,因为这包括一个端口列表。

paul@home:/Z/docker$ cat ./spark-roasted-elephant/Dockerfile
# Copyright 2015 Paul Brewer http://eaftc.com
# License: MIT
# this docker file builds a non-hadoop version of spark for standalone experimentation
# thanks to article at http://mbonaci.github.io/mbo-spark/ for tips
FROM ubuntu:15.04
MAINTAINER drpaulbrewer@eaftc.com
RUN adduser --disabled-password --home /spark spark
WORKDIR /spark
ADD spark-1.3.0.tgz /spark/ 
WORKDIR /spark/spark-1.3.0
RUN sed -e 's/archive.ubuntu.com/www.gtlib.gatech.edu\/pub/' /etc/apt/sources.list > /tmp/sources.list && mv /tmp/sources.list /etc/apt/sources.list
RUN apt-get update && apt-get --yes upgrade \
    && apt-get --yes install sed nano curl wget openjdk-8-jdk scala \
    && echo "JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64" >>/etc/environment \
    && export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m" \
    && ./build/mvn -Phive -Phive-thriftserver -DskipTests clean package \
    && chown -R spark:spark /spark \
    && mkdir /var/run/sshd
EXPOSE 2222 4040 6066 7077 7777 8080 8081 

master 是从目录 ./spark-master 构建的,使用 dockerfile 和 shell 脚本包含到容器中。这是 dockerfile 和 shell 脚本。

paul@home:/Z/docker$ cat ./spark-master/Dockerfile
FROM drpaulbrewer/spark-roasted-elephant:latest
MAINTAINER drpaulbrewer@eaftc.com
ADD my-spark-master.sh /spark/
USER spark
CMD /spark/my-spark-master.sh

paul@home:/Z/docker$ cat ./spark-master/my-spark-master.sh
#!/bin/bash -e
cd /spark/spark-1.3.0
# set SPARK_MASTER_IP to a net interface address, e.g. 192.168.1.10
export SPARK_MASTER_IP
./sbin/start-master.sh 
sleep 10000d

对于工人来说:

paul@home:/Z/docker$ cat ./spark-worker/Dockerfile
FROM drpaulbrewer/spark-roasted-elephant:latest
MAINTAINER drpaulbrewer@eaftc.com
ADD my-spark-worker.sh /spark/
CMD /spark/my-spark-worker.sh
paul@home:/Z/docker$ cat ./spark-worker/my-spark-worker.sh
#!/bin/bash -e
cd /spark/spark-1.3.0
sleep 10
# dont use ./sbin/start-slave.sh it wont take numeric URL
mkdir -p /Z/data
mkdir -p /user/hive/warehouse
chown -R spark:spark /user
su -c "cd /spark/spark-1.3.0 && ./bin/spark-class org.apache.spark.deploy.worker.Worker --memory $mem $master" spark

尽管到目前为止,这篇文章已经变成了“如何为 spark 制作 Dockerfile?”的答案。它不是故意的。这些 Dockerfile 对我来说是实验性的,我不会在生产中使用它们,也不保证它们的质量。我不喜欢 Docker 上一个备受推崇的 spark 源,因为他们从事将一堆容器链接在一起的懒惰做法,而且它非常庞大并且需要很长时间才能下载。在这里,层数少得多,下载量也更小。此处发布的内容与其说是 docker 示例,不如说是为了让您可以确定您自己的环境中有什么不同。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2017-04-18
    • 2017-06-27
    • 1970-01-01
    • 1970-01-01
    • 2016-12-21
    • 2017-04-24
    • 2017-12-24
    相关资源
    最近更新 更多