【问题标题】:Flume Java Custom Sink and SourceFlume Java 自定义接收器和源
【发布时间】:2018-04-18 13:57:57
【问题描述】:

Flume 版本:- 1.6

卡夫卡版本:- 1.0.0

动物园管理员版本:- 3.4.10

我们有一个将 Flume 与 Kafka 和 Hadoop 连接起来的快速需求,因此我们从 Kafka Consumer 摄取并将事件摄取到 Hadoop。一切都是使用 conf 文件配置的,到这里一切都很好。

现在我们需要检查是否可以使用自定义 Java 代码来完成。我已经尝试了互联网上的很多可用选项来设计 kafka 源和 HDFS 接收器。我在 cloudera 虚拟机中试过这个。

Kafka 和 zookeeper 都已启动并正在运行。

代码正在运行,但是当我生成消息时,HDFS 中没有插入任何内容。

如果有人能指出我所缺少的,那将非常有帮助。

我试过的代码是..

KafkaChannel channel = new KafkaChannel();

Map<String, String> channelParamters = new HashMap<String, String>();

channelParamters.put("brokerList", "localhost:9092");
channelParamters.put("zookeeperConnect","localhost:2181");
channelParamters.put("topic","integration");
channelParamters.put("groupId","channel");
channelParamters.put("batchSize", "15");
channelParamters.put("zookeeper.connect","localhost:2181");
channelParamters.put("clientId", "channel");
channelParamters.put("readSmallestOffset","true");
channelParamters.put("interceptors","i1");
channelParamters.put("interceptors.i1.type","host");
channelParamters.put("consumer.timeout.ms","1000");

channelParamters.put("parseAsFlumeEvent", "false");

channel.setName("KafkaSource");

Context channelContext = new Context(channelParamters);


final Map<String, String> properties = new HashMap<String, String>();

/** Sink Properties start **/

HDFSEventSink eventSink = new HDFSEventSink();

eventSink.setName("HDFSEventSink-" + "kafkaEventSink");

String hdfsBasePath = "hdfs://quickstart.cloudera:8020/user/cloudera/flume/events";

properties.put("hdfs.type", "hdfs");
properties.put("hdfs.path", hdfsBasePath + "/%Y/%m/%d/%H");
properties.put("hdfs.rollInterval ", "0");
properties.put("hdfs.rollSize ", "2048");
properties.put("hdfs.rollCount ", "0");
properties.put("hdfs.fileType ", " DataStream");
properties.put("channel", channel.getName());
properties.put("hdfs.maxOpenFiles", String.valueOf(1));

properties.put("hdfs.filePrefix ", " kafka_host");
properties.put("hdfs.fileSuffix ", " .txt");
properties.put("hdfs.idleTimeout ", "60");

/** Sink Properties end **/

Context sinkContext = new Context(properties);

eventSink.configure(sinkContext);

eventSink.setChannel(channel);

Configurables.configure(channel, channelContext);

eventSink.start();

channel.start();

【问题讨论】:

    标签: java hadoop apache-kafka flume flume-ng


    【解决方案1】:

    我不清楚您要使用自定义 Java 代码实现什么目标。

    更好的方法是使用 Kafka Connect(Apache Kafka 的一部分)和开源 HDFS connector。它完全按照您在此处执行的操作,只是要设置的配置文件、处理架构、缩放、自动故障转移等。

    【讨论】:

    • 感谢您抽出时间回答我的问题。我们尝试使用 Kafka Connect,但遇到了一些问题。要求还保持基于 Java 的模块。主要需求是将数据从 kafka 主题存储到 HDFS 层以供进一步使用,因此我们尝试使用 kafka 通道存储到 HDFS 事件接收器(在上面的代码中)。
    • 如果您想发布有关 Kafka Connect 问题的问题,我很乐意提供帮助。自己编写此代码有点试图解决已解决的问题,并通过具有相关维护和支持考虑的定制代码库来实现。
    • 那么我应该创建一个关于错误的新问题,或者我可以提及我们在 Kafka connect 中遇到的一个小 sn-p 错误?
    • 我会把它作为一个新问题发布。
    猜你喜欢
    • 1970-01-01
    • 2012-10-23
    • 1970-01-01
    • 1970-01-01
    • 2015-09-02
    • 2014-11-11
    • 1970-01-01
    • 1970-01-01
    • 2019-02-10
    相关资源
    最近更新 更多