[Question Title]: Can the Apache Flume HDFS sink accept a dynamic path to write to?
[Posted]: 2013-02-12 10:17:07
[Question Description]:

I am new to Apache Flume.
I am trying to work out how to take JSON (from an HTTP source), parse it, and store it on HDFS under a dynamic path derived from its content.
For example, if the JSON is:

[{   
  "field1" : "value1",
  "field2" : "value2"
}]

then the HDFS path would be:
/some-default-root-path/value1/value2/some-value-name-file
Is there a Flume configuration that lets me do this?

Here is my current configuration (it accepts JSON over HTTP and stores it under a timestamp-based path):

#flume.conf: http source, hdfs sink

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type =  org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 9000
#a1.sources.r1.handler = org.apache.flume.http.JSONHandler

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/uri/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Thanks!

[Question Discussion]:

    Tags: apache hadoop hdfs flume


    [Solution 1]:

    The solution is in the Flume documentation for the hdfs sink.

    Here is the modified configuration:

    #flume.conf: http source, hdfs sink
    
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type =  org.apache.flume.source.http.HTTPSource
    a1.sources.r1.port = 9000
    #a1.sources.r1.handler = org.apache.flume.http.JSONHandler
    
    # Describe the sink
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = /user/uri/events/%{field1}
    a1.sinks.k1.hdfs.filePrefix = events-
    a1.sinks.k1.hdfs.round = true
    a1.sinks.k1.hdfs.roundValue = 10
    a1.sinks.k1.hdfs.roundUnit = minute
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1  
    

    And the curl command:

    curl -X POST -d '[{  "headers" : {           "timestamp" : "434324343", "host" :"random_host.example.com", "field1" : "val1"            },  "body" : "random_body"  }]' localhost:9000
    
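    The reason this works: `%{field1}` is a header-substitution escape, so the sink reads `field1` from the event's *headers* (which is why the curl payload above puts `field1` inside `"headers"`), not from its body. A rough sketch of that substitution (illustrative Python, not Flume's actual implementation):

```python
import re

def resolve_path(path_template, headers):
    """Replace each %{name} escape with the matching event header,
    mimicking the HDFS sink's header substitution."""
    return re.sub(r"%\{(\w+)\}",
                  lambda m: headers[m.group(1)],
                  path_template)

headers = {"timestamp": "434324343",
           "host": "random_host.example.com",
           "field1": "val1"}
print(resolve_path("/user/uri/events/%{field1}", headers))
# → /user/uri/events/val1
```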

    [Discussion]:

    • I am using a different source (RabbitMQ), and I am passing a JSON payload myself. The approach you describe does not seem to work in my case. Unless you have run into a similar problem, I assume something is wrong on my end.
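    That observation is consistent with how the escapes work: `%{...}` only reads event headers. A source such as RabbitMQ that delivers the JSON in the event *body* needs an interceptor (or a custom source handler) that parses the body and promotes the wanted fields into headers before the sink sees the event. A hedged sketch of that promotion step (plain Python standing in for a custom interceptor; `field1` is the example field name from the question, not a real Flume API):

```python
import json

def promote_body_fields(event, fields=("field1",)):
    """Copy selected fields from a JSON event body into the event's
    headers, so %{field1}-style path escapes can see them."""
    body = json.loads(event["body"])
    for name in fields:
        if name in body:
            event["headers"][name] = body[name]
    return event

event = {"headers": {}, "body": '{"field1": "value1", "field2": "value2"}'}
print(promote_body_fields(event)["headers"])
# → {'field1': 'value1'}
```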