【问题标题】:Loading avro files into pyspark dataframes from hdfs从 hdfs 将 avro 文件加载到 pyspark 数据帧中
【发布时间】:2018-09-03 19:12:03
【问题描述】:

我构建了一个小型数据管道,将一些虚构的测试数据从本地目录(json 格式)移动到 hdfs(avro 格式)。这似乎工作正常(水槽没有显示错误),但可能是错误已经在这里。下一步是使用 databricks 加载器将 avro 文件转换为一些 pyspark 数据帧(我只能找到为此的 python 库)。现在让我解释一下我是如何做到的,这样你就可以看到我可能失败的地方:

1 ) 使用flume从json文件中创建avro文件

我的目标是将 json 数据从本地目录推送到 HDFS,这样我就可以使用 pySpark 对其进行分析。为此,我正在使用水槽。由于 json 在 HDFS 上的压缩效果不好,我还使用以下 flume.conf 将每个文件转换为 avro:

agent.sources.tail.type = exec
agent.sources.tail.shell = /bin/bash -c
agent.sources.tail.command = cat /home/user/Data/json/*
agent.sources.tail.batchsize = 10
agent.sources.tail.channels = MemChannel

agent.channels.MemChannel.type = memory
agent.channels.MemChannel.capacity = 100
agent.channels.MemChannel.transactionCapacity = 100


agent.sinks.HDFS.channel = MemChannel
agent.sinks.HDFS.type = hdfs
agent.sinks.HDFS.hdfs.fileType = DataStream
agent.sinks.HDFS.hdfs.fileSuffix=.avro
agent.sinks.HDFS.hdfs.path = hdfs://localhost:9000/home/user/Data/hdfs/test_data
agent.sinks.HDFS.hdfs.batchSize = 100
agent.sinks.HDFS.hdfs.rollSize = 0
agent.sinks.HDFS.hdfs.rollCount = 100
agent.sinks.HDFS.serializer=avro_event
agent.sinks.HDFS.serializer.compressionCodec=snappy

这确实运行没有任何错误,所以我假设 Flume 将每个文件作为正确的 avro 文件移动到 HDFS。

2) 通过加载 avro 文件创建数据框

现在是我尝试在 pyspark 中将单个 avro 文件作为数据帧读取的部分:

from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark import SparkContext
sc = SparkContext()
sqlContext = SQLContext(sc)
sqlContext.setConf("spark.sql.avro.compression.codec","snappy") 

# creates a dataframe by reading a single avro file 
df = sqlContext.read.format("com.databricks.spark.avro").load("hdfs://localhost:9000/home/user/Data/hdfs/test_data/FlumeData.1535723039267.avro")

这显示了以下(错误)输出:

df.show()
+-------+--------------------+
|headers|                body|
+-------+--------------------+
|     []|[7B 22 63 61 74 6...|
|     []|[7B 22 63 61 74 6...|
|     []|[7B 22 63 61 74 6...|
|     []|[7B 22 63 61 74 6...|
|     []|[7B 22 63 61 74 6...|
+-------+--------------------+
only showing top 5 rows

这显然不是我想要的,因为上面的整个代码似乎只是像纯文本文件一样读取 avro 文件,因此没有解析结构。之前,我只是创建一个使用相同数据但存储在原始 json 文件中的数据框。

# creates a dataframe by reading a single json file
df = sqlContext.read.json('hdfs://localhost:9000/home/user/Data/hdfs/test_data/FlumeData.1535702513118.json')

这就是所需(正确)输出的样子:

df.show()
+---------------+--------------------+---+-------------------+-----------------+
|       category|             content| id|          timestamp|             user|
+---------------+--------------------+---+-------------------+-----------------+
|              A|Million near orde...|801|2018-08-30_16:49:53|      Molly Davis|
|              D|Determine company...|802|2018-08-30_16:49:53|       Ronnie Liu|
|              B|Among themselves ...|803|2018-08-30_16:49:53|       Lori Brown|
|              C|Through various d...|804|2018-08-30_16:49:53|   Judith Herrera|
|              C|Week toward so co...|805|2018-08-30_16:49:53|Teresa Cunningham|
+---------------+--------------------+---+-------------------+-----------------+
only showing top 5 rows

我怎样才能为转换后的 avro 文件获得相同的结果?

【问题讨论】:

  • 你应该可以spark.read.avro。如果这仍然不起作用,您需要提供一个架构,以便它可以正确读取数据。
  • 实际上根据this,您的示例看起来架构是正确的。一个好的第一步可能是将您的身体数据转换为字符串,以便您可以阅读它。我链接到的博客还暗示您可以使用自定义架构,所以也许这更符合您的喜好。
  • 简而言之,这不是 Spark 问题,而是 Flume 配置问题。
  • spark.read.avro 对于我的版本似乎不存在(python3 上的 spark2.3.0)。那么您是在告诉我,我的数据帧中的 [headers|body] 结构只是水槽序列化程序提供的标准 avro 模式吗?这意味着创建自定义架构是这里唯一的选择?
  • “不存在”是什么意思?您需要在 spark 提交或 shell 中包含 spark-avro--jars 包。不,它在标准 Spark 安装中不“存在”,但您仍然可以下载它

标签: python apache-spark pyspark apache-spark-sql avro


【解决方案1】:
df=spark.read.format("avro").load("<file-path>")
json_df=df.select(df0.Body.cast("string")).rdd.map(lambda x: x[0])   # parsing body of avro file and converting to json
data=spark.read.json(json_df)
data.show()

【讨论】:

  • 您的答案可以通过额外的支持信息得到改进。请edit 添加更多详细信息,例如引用或文档,以便其他人可以确认您的答案是正确的。你可以找到更多关于如何写好答案的信息in the help center
【解决方案2】:

对于 spark 对于 spark >= 2.4,甚至 spark3 或更高版本:

  1. 下载依赖spark-avro,可以找到maven上的java依赖。请找到您的 spark 和 scala 的当前版本。版本不匹配会导致失败。
  2. 在 Spark3 中,使用此方法创建 Spark 会话并添加您的依赖项。
spark = SparkSession.builder.master('local[*]')\
                           .appName('sample')\
                           .config("spark.jars","YOUR_JAR_PATH/spark-avro_2.12-3.2.1.jar")\
                           .getOrCreate()

并读取您的 avro 数据

sample_df = spark.read.format("avro").load("YOUR_AVRO_DATA_PATH")

其实添加第三方java依赖总是可以用这个方法的。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-01-05
    相关资源
    最近更新 更多