[Posted]: 2018-04-18 13:57:57
[Problem Description]:
Flume version: 1.6
Kafka version: 1.0.0
ZooKeeper version: 3.4.10
We had an urgent requirement to connect Flume with Kafka and Hadoop, so that we consume from a Kafka topic and ingest the events into Hadoop. Everything was configured through conf files, and up to that point it all worked fine.
Now we need to check whether the same can be done with custom Java code. I have tried many of the options available on the internet for setting up the Kafka source and the HDFS sink. I tested this in the Cloudera quickstart VM.
Both Kafka and ZooKeeper are up and running.
The code runs, but when I produce messages, nothing is written to HDFS.
It would be very helpful if someone could point out what I am missing.
The code I tried is:
// Imports needed by this snippet
import java.util.HashMap;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.channel.kafka.KafkaChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.sink.hdfs.HDFSEventSink;

// Kafka channel configuration
KafkaChannel channel = new KafkaChannel();
Map<String, String> channelParameters = new HashMap<String, String>();
channelParameters.put("brokerList", "localhost:9092");
channelParameters.put("zookeeperConnect", "localhost:2181");
channelParameters.put("topic", "integration");
channelParameters.put("groupId", "channel");
channelParameters.put("batchSize", "15");
channelParameters.put("zookeeper.connect", "localhost:2181");
channelParameters.put("clientId", "channel");
channelParameters.put("readSmallestOffset", "true");
// Note: interceptors are a source setting; they have no effect on a channel.
channelParameters.put("interceptors", "i1");
channelParameters.put("interceptors.i1.type", "host");
channelParameters.put("consumer.timeout.ms", "1000");
channelParameters.put("parseAsFlumeEvent", "false");
channel.setName("KafkaSource");
Context channelContext = new Context(channelParameters);

/** Sink properties start **/
final Map<String, String> properties = new HashMap<String, String>();
HDFSEventSink eventSink = new HDFSEventSink();
eventSink.setName("HDFSEventSink-" + "kafkaEventSink");
String hdfsBasePath = "hdfs://quickstart.cloudera:8020/user/cloudera/flume/events";
// Keys and values must not contain stray whitespace: a key like
// "hdfs.rollInterval " (trailing space) is silently ignored by the sink.
properties.put("hdfs.type", "hdfs");
properties.put("hdfs.path", hdfsBasePath + "/%Y/%m/%d/%H");
properties.put("hdfs.rollInterval", "0");
properties.put("hdfs.rollSize", "2048");
properties.put("hdfs.rollCount", "0");
properties.put("hdfs.fileType", "DataStream");
properties.put("channel", channel.getName());
properties.put("hdfs.maxOpenFiles", String.valueOf(1));
properties.put("hdfs.filePrefix", "kafka_host");
properties.put("hdfs.fileSuffix", ".txt");
properties.put("hdfs.idleTimeout", "60");
/** Sink properties end **/

Context sinkContext = new Context(properties);
eventSink.configure(sinkContext);
eventSink.setChannel(channel);
Configurables.configure(channel, channelContext);

eventSink.start();
channel.start();
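One point worth checking (an assumption on my part, based on how a conf-file-driven Flume agent wires its components): calling `start()` on a sink only opens it, it does not start a polling loop. In a standalone agent, a `SinkRunner` thread repeatedly calls `Sink.process()` to drain the channel; without one, events may sit in the Kafka channel and never reach HDFS. A minimal sketch of driving the sink this way, assuming the Flume 1.6 API and the `eventSink` object from the snippet above:

```java
import java.util.Collections;

import org.apache.flume.Context;
import org.apache.flume.Sink;
import org.apache.flume.SinkProcessor;
import org.apache.flume.SinkRunner;
import org.apache.flume.conf.Configurables;
import org.apache.flume.sink.DefaultSinkProcessor;

// eventSink is the configured HDFSEventSink from the snippet above.
// A SinkRunner owns a thread that calls Sink.process() in a loop;
// without it, eventSink.start() only opens the sink and nothing
// ever takes events from the channel.
SinkProcessor processor = new DefaultSinkProcessor();
processor.setSinks(Collections.<Sink>singletonList(eventSink));
Configurables.configure(processor, new Context());

SinkRunner runner = new SinkRunner(processor);
runner.start();  // starts the thread that drains the channel into HDFS
```

This mirrors what the agent does internally when launched from a conf file; if the sketch is off for your Flume version, `org.apache.flume.node.Application` is a good reference for how channels, sinks, and their runners are wired together.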
[Discussion]:
Tags: java hadoop apache-kafka flume flume-ng