【发布时间】:2018-09-03 19:12:03
【问题描述】:
我构建了一个小型数据管道,将一些虚构的测试数据从本地目录(json 格式)移动到 hdfs(avro 格式)。这似乎工作正常(水槽没有显示错误),但可能是错误已经在这里。下一步是使用 databricks 加载器将 avro 文件转换为一些 pyspark 数据帧(我只能找到为此的 python 库)。现在让我解释一下我是如何做到的,这样你就可以看到我可能失败的地方:
1 ) 使用flume从json文件中创建avro文件
我的目标是将 json 数据从本地目录推送到 HDFS,这样我就可以使用 pySpark 对其进行分析。为此,我正在使用水槽。由于 json 在 HDFS 上的压缩效果不好,我还使用以下 flume.conf 将每个文件转换为 avro:
agent.sources.tail.type = exec
agent.sources.tail.shell = /bin/bash -c
agent.sources.tail.command = cat /home/user/Data/json/*
agent.sources.tail.batchsize = 10
agent.sources.tail.channels = MemChannel
agent.channels.MemChannel.type = memory
agent.channels.MemChannel.capacity = 100
agent.channels.MemChannel.transactionCapacity = 100
agent.sinks.HDFS.channel = MemChannel
agent.sinks.HDFS.type = hdfs
agent.sinks.HDFS.hdfs.fileType = DataStream
agent.sinks.HDFS.hdfs.fileSuffix=.avro
agent.sinks.HDFS.hdfs.path = hdfs://localhost:9000/home/user/Data/hdfs/test_data
agent.sinks.HDFS.hdfs.batchSize = 100
agent.sinks.HDFS.hdfs.rollSize = 0
agent.sinks.HDFS.hdfs.rollCount = 100
agent.sinks.HDFS.serializer=avro_event
agent.sinks.HDFS.serializer.compressionCodec=snappy
这确实运行没有任何错误,所以我假设 Flume 将每个文件作为正确的 avro 文件移动到 HDFS。
2) 通过加载 avro 文件创建数据框
现在是我尝试在 pyspark 中将单个 avro 文件作为数据帧读取的部分:
from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark import SparkContext
sc = SparkContext()
sqlContext = SQLContext(sc)
sqlContext.setConf("spark.sql.avro.compression.codec","snappy")
# creates a dataframe by reading a single avro file
df = sqlContext.read.format("com.databricks.spark.avro").load("hdfs://localhost:9000/home/user/Data/hdfs/test_data/FlumeData.1535723039267.avro")
这显示了以下(错误)输出:
df.show()
+-------+--------------------+
|headers| body|
+-------+--------------------+
| []|[7B 22 63 61 74 6...|
| []|[7B 22 63 61 74 6...|
| []|[7B 22 63 61 74 6...|
| []|[7B 22 63 61 74 6...|
| []|[7B 22 63 61 74 6...|
+-------+--------------------+
only showing top 5 rows
这显然不是我想要的,因为上面的整个代码似乎只是像纯文本文件一样读取 avro 文件,因此没有解析结构。之前,我只是创建一个使用相同数据但存储在原始 json 文件中的数据框。
# creates a dataframe by reading a single json file
df = sqlContext.read.json('hdfs://localhost:9000/home/user/Data/hdfs/test_data/FlumeData.1535702513118.json')
这就是所需(正确)输出的样子:
df.show()
+---------------+--------------------+---+-------------------+-----------------+
| category| content| id| timestamp| user|
+---------------+--------------------+---+-------------------+-----------------+
| A|Million near orde...|801|2018-08-30_16:49:53| Molly Davis|
| D|Determine company...|802|2018-08-30_16:49:53| Ronnie Liu|
| B|Among themselves ...|803|2018-08-30_16:49:53| Lori Brown|
| C|Through various d...|804|2018-08-30_16:49:53| Judith Herrera|
| C|Week toward so co...|805|2018-08-30_16:49:53|Teresa Cunningham|
+---------------+--------------------+---+-------------------+-----------------+
only showing top 5 rows
我怎样才能为转换后的 avro 文件获得相同的结果?
【问题讨论】:
-
你应该可以
spark.read.avro。如果这仍然不起作用,您需要提供一个架构,以便它可以正确读取数据。 -
实际上根据this,您的示例看起来架构是正确的。一个好的第一步可能是将您的身体数据转换为字符串,以便您可以阅读它。我链接到的博客还暗示您可以使用自定义架构,所以也许这更符合您的喜好。
-
简而言之,这不是 Spark 问题,而是 Flume 配置问题。
-
spark.read.avro 对于我的版本似乎不存在(python3 上的 spark2.3.0)。那么您是在告诉我,我的数据帧中的 [headers|body] 结构只是水槽序列化程序提供的标准 avro 模式吗?这意味着创建自定义架构是这里唯一的选择?
-
“不存在”是什么意思?您需要在 spark 提交或 shell 中包含
spark-avro和--jars包。不,它在标准 Spark 安装中不“存在”,但您仍然可以下载它
标签: python apache-spark pyspark apache-spark-sql avro