【问题标题】:Flume memory chanel to HDFS sinkFlume 内存通道到 HDFS 接收器
【发布时间】:2015-09-08 16:03:11
【问题描述】:

我遇到了 Flume 的问题(Cloudera CDH 5.3 上的 1.5):

spoolDir source -> memory channel -> HDFS sink

我正在尝试做的事情:每 5 分钟,大约 20 个文件被推送到假脱机目录(从远程存储中获取)。每个文件包含多行,每一行是一个日志(在 JSON 中)。文件大小在 10KB 到 1MB 之间。

当我启动代理时,所有文件都成功推送到 HDFS。 1 分钟后(这是我在 flume.conf 中设置的),文件滚动(删除 .tmp 后缀并关闭)。

但是,当在假脱机目录中找到新文件时,我收到消息:

org.apache.flume.source.SpoolDirectorySource: The channel is full, and cannot write data now. The source will try again after 250 milliseconds

在尝试了很多不同的配置都没有成功(增加/减少通道 transactionCapacity 和容量,增加/减少 batchSize 等)后,我请求您的帮助。

这是我最新的水槽配置:

# source definition
sebanalytics.sources.spooldir-source.type = spooldir
sebanalytics.sources.spooldir-source.spoolDir = /var/flume/in
sebanalytics.sources.spooldir-source.basenameHeader = true
sebanalytics.sources.spooldir-source.basenameHeaderKey = basename
sebanalytics.sources.spooldir-source.batchSize = 10
sebanalytics.sources.spooldir-source.deletePolicy = immediate
# Max blob size: 1.5Go
sebanalytics.sources.spooldir-source.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
sebanalytics.sources.spooldir-source.deserializer.maxBlobLength = 1610000000
# Attach the interceptor to the source
sebanalytics.sources.spooldir-source.interceptors = json-interceptor
sebanalytics.sources.spooldir-source.interceptors.json-interceptor.type = com.app.flume.interceptor.JsonInterceptor$Builder
# Define event's headers. basenameHeader must be the same than source.basenameHeaderKey (defaults is basename)
sebanalytics.sources.spooldir-source.interceptors.json-interceptor.basenameHeader = basename
sebanalytics.sources.spooldir-source.interceptors.json-interceptor.resourceHeader = resources
sebanalytics.sources.spooldir-source.interceptors.json-interceptor.ssidHeader = ssid

# channel definition
sebanalytics.channels.mem-channel-1.type = memory
sebanalytics.channels.mem-channel-1.capacity = 1000000
sebanalytics.channels.mem-channel-1.transactionCapacity = 10

# sink definition
sebanalytics.sinks.hdfs-sink-1.type = hdfs
sebanalytics.sinks.hdfs-sink-1.hdfs.path = hdfs://StandbyNameNode/data/in
sebanalytics.sinks.hdfs-sink-1.hdfs.filePrefix = %{resources}_%{ssid}
sebanalytics.sinks.hdfs-sink-1.hdfs.fileSuffix = .json
sebanalytics.sinks.hdfs-sink-1.hdfs.fileType = DataStream
sebanalytics.sinks.hdfs-sink-1.hdfs.writeFormat = Text
sebanalytics.sinks.hdfs-sink-1.hdfs.rollInterval = 3600
sebanalytics.sinks.hdfs-sink-1.hdfs.rollSize = 63000000
sebanalytics.sinks.hdfs-sink-1.hdfs.rollCount = 0
sebanalytics.sinks.hdfs-sink-1.hdfs.batchSize = 10
sebanalytics.sinks.hdfs-sink-1.hdfs.idleTimeout = 60

# connect source and sink to channel
sebanalytics.sources.spooldir-source.channels = mem-channel-1
sebanalytics.sinks.hdfs-sink-1.channel = mem-channel-1

【问题讨论】:

    标签: cloudera-cdh flume-ng


    【解决方案1】:

    完整的通道意味着:通道无法从源接收更多事件,因为接收器消耗这些事件的速度比源慢。

    增加通道容量只能解决问题。可能的解决方案:

    • 改进接收器的处理...如果接收器是自定义接收器(改进/避免循环,使用更高效的后端 API 等)。在这种情况下,这似乎是不可能的,因为您使用的是默认的 HDFS 接收器。
    • 降低数据发送到源的频率。不过,我猜你不想/不能这样做,因为你的处理要求。
    • 添加更多并行工作的接收器。我不确定,但我可以想象 Flume 的设计师决定在单独的线程中运行每个水槽。如果这是真的,那么您可以尝试多个并行 HDFS 接收器。要将数据拆分为多个接收器,您必须使用不同于 default replicating onemultiplexing selector

    HTH!

    【讨论】:

    • 我已经尝试过您的解决方案,但没有成功。但是,我将源更改为 HTTP,即使有大量输入,我也再也没有遇到过“通道已满”的问题。所以,我解决了我的问题......但我更愿意理解为什么它不适用于 SpoolDir。但我可以不用:)
    • 原因一定很简单:SpoolDir在数据处理上肯定比Http源慢。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-11-20
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多