【发布时间】:2019-07-12 00:00:17
【问题描述】:
在阅读了有关 Apache Flume 以及它在处理客户端事件方面提供的好处之后,我决定是时候开始更详细地研究这个问题了。另一个巨大的好处似乎是它可以处理 Apache Avro 对象 :-) 但是,我很难理解 Avro 模式如何用于验证接收到的 Flume 事件。
为了帮助更详细地了解我的问题,我在下面提供了代码 sn-ps;
Avro 架构
出于本文的目的,我使用了一个示例架构,它定义了一个包含 2 个字段的嵌套 Object1 记录。
{
"namespace": "com.example.avro",
"name": "Example",
"type": "record",
"fields": [
{
"name": "object1",
"type": {
"name": "Object1",
"type": "record",
"fields": [
{
"name": "value1",
"type": "string"
},
{
"name": "value2",
"type": "string"
}
]
}
}
]
}
嵌入式 Flume 代理
在我的 Java 项目中,我目前正在使用 Apache Flume 嵌入式代理,详情如下;
public static void main(String[] args) {
final Event event = EventBuilder.withBody("Test", Charset.forName("UTF-8"));
final Map<String, String> properties = new HashMap<>();
properties.put("channel.type", "memory");
properties.put("channel.capacity", "100");
properties.put("sinks", "sink1");
properties.put("sink1.type", "avro");
properties.put("sink1.hostname", "192.168.99.101");
properties.put("sink1.port", "11111");
properties.put("sink1.batch-size", "1");
properties.put("processor.type", "failover");
final EmbeddedAgent embeddedAgent = new EmbeddedAgent("TestAgent");
embeddedAgent.configure(properties);
embeddedAgent.start();
try {
embeddedAgent.put(event);
} catch (EventDeliveryException e) {
e.printStackTrace();
}
}
在上面的示例中,我正在创建一个新的 Flume 事件,其中“测试”定义为将事件发送到在 VM (192.168.99.101) 内运行的单独 Apache Flume 代理的事件主体。
远程 Flume 代理
如上所述,我已将此代理配置为从嵌入式 Flume 代理接收事件。该代理的 Flume 配置如下所示;
# Name the components on this agent
hello.sources = avroSource
hello.channels = memoryChannel
hello.sinks = loggerSink
# Describe/configure the source
hello.sources.avroSource.type = avro
hello.sources.avroSource.bind = 0.0.0.0
hello.sources.avroSource.port = 11111
hello.sources.avroSource.channels = memoryChannel
# Describe the sink
hello.sinks.loggerSink.type = logger
# Use a channel which buffers events in memory
hello.channels.memoryChannel.type = memory
hello.channels.memoryChannel.capacity = 1000
hello.channels.memoryChannel.transactionCapacity = 1000
# Bind the source and sink to the channel
hello.sources.avroSource.channels = memoryChannel
hello.sinks.loggerSink.channel = memoryChannel
我正在执行以下命令来启动代理;
./bin/flume-ng agent --conf conf --conf-file ../sample-flume.conf --name hello -Dflume.root.logger=TRACE,console -Dorg.apache.flume.log.printconfig=true -Dorg.apache.flume.log.rawdata=true
当我执行 Java 项目主方法时,我看到“测试”事件通过以下输出传递到我的记录器接收器;
2019-02-18 14:15:09,998 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 54 65 73 74 Test }
但是,我不清楚应该在哪里配置 Avro 模式以确保 Flume 仅接收和处理有效事件。有人可以帮我理解我哪里出错了吗?或者,如果我误解了 Flume 是如何将 Flume 事件转换为 Avro 事件的意图?
除了上述之外,我还尝试在更改 Avro 架构以指定直接与远程 Flume 代理对话的协议后使用 Avro RPC 客户端,但是当我尝试发送事件时,我看到以下错误;
Exception in thread "main" org.apache.avro.AvroRuntimeException: Not a remote message: test
at org.apache.avro.ipc.Requestor$Response.getResponse(Requestor.java:532)
at org.apache.avro.ipc.Requestor$TransceiverCallback.handleResult(Requestor.java:359)
at org.apache.avro.ipc.Requestor$TransceiverCallback.handleResult(Requestor.java:322)
at org.apache.avro.ipc.NettyTransceiver$NettyClientAvroHandler.messageReceived(NettyTransceiver.java:613)
at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
at org.apache.avro.ipc.NettyTransceiver$NettyClientAvroHandler.handleUpstream(NettyTransceiver.java:595)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:558)
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:786)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:458)
at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:439)
at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:558)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:553)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:84)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:471)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:332)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:35)
at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:102)
at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
我的目标是能够确保我的应用程序填充的事件符合生成的 Avro 模式,以避免发布无效事件。我希望我使用嵌入式 Flume 代理来实现这一点,但如果这不可能,那么我会考虑使用 Avro RPC 方法直接与我的远程 Flume 代理对话。
任何帮助/指导都会有很大帮助。提前致谢。
更新
进一步阅读后,我想知道我是否误解了 Apache Flume 的目的。我最初认为这可以用于根据数据/模式自动创建 Avro 事件,但现在想知道应用程序是否应该负责生成 Avro 事件,这些事件将根据通道配置存储在 Flume 中并通过sink(在我的例子中是 Spark Streaming 集群)。
如果以上是正确的,那么我想知道 Flume 是否需要了解架构或只是我的 Spark Streaming 集群最终将处理这些数据?如果 Flume 需要了解架构,那么您能否详细说明如何实现这一点?
提前致谢。
【问题讨论】: