【发布时间】:2017-03-03 09:49:30
【问题描述】:
我有一个文件夹包含很多 gzip 文件。每个 gzip 文件都包含 xml 文件。我曾使用水槽将文件流式传输到 HDFS。下面是我的配置文件:
agent1.sources = src
agent1.channels = ch
agent1.sinks = sink
agent1.sources.src.type = spooldir
agent1.sources.src.spoolDir = /home/tester/datafiles
agent1.sources.src.channels = ch
agent1.sources.src.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
agent1.channels.ch.type = memory
agent1.channels.ch.capacity = 1000
agent1.channels.ch.transactionCapacity = 1000
agent1.sinks.sink.type = hdfs
agent1.sinks.sink.channel = ch
agent1.sinks.sink.hdfs.path = /user/tester/datafiles
agent1.sinks.sink.hdfs.fileType = CompressedStream
agent1.sinks.sink.hdfs.codeC = gzip
agent1.sinks.sink.hdfs.fileSuffix = .gz
agent1.sinks.sink.hdfs.rollInterval = 0
agent1.sinks.sink.hdfs.rollSize = 122000000
agent1.sinks.sink.hdfs.rollCount = 0
agent1.sinks.sink.hdfs.idleTimeout = 1
agent1.sinks.sink.hdfs.batchSize = 1000
将文件流式传输到 HDFS 后,我使用 Spark 使用以下代码读取它:
df = sparkSession.read.format('com.databricks.spark.xml').options(rowTag='Panel', compression='gzip').load('/user/tester/datafiles')
但我在阅读时遇到问题。如果我手动将一个 gzip 文件上传到 HDFS 文件夹并重新运行上述 Spark 代码,它可以毫无问题地读取它。我不确定是不是因为水槽。
我尝试下载flume流式传输的文件并解压缩,当我查看内容时,它不再显示xml格式,它是一些不可读的字符。任何人都可以让我了解一下吗?谢谢。
【问题讨论】:
-
如何解压文件?使用
gunzip?用 spark 读取文件对您有什么问题?您是否尝试过手动为 spark-xml 指定模式? -
Mariusz,我没有解压缩文件。我试图在 gz 文件中流式传输并使用 spark 读取它。我没有手动指定架构。当我阅读流入的 gzip 文件并显示它的内容时,它会显示一些特殊字符。但是当我尝试在 HDFS 中手动上传 gzip 文件时,我可以毫无问题地读取它,它可以毫无问题地显示内容和架构。我想是因为 Flume?
标签: hadoop apache-spark flume-ng