【问题标题】:Apache-Apex : How to write data from kafka topic to hdfs filesystem?Apache-Apex:如何将数据从 kafka 主题写入 hdfs 文件系统?
【发布时间】:2018-12-23 04:38:11
【问题描述】:

我正在尝试从 kafka 主题中读取 dat 并将其写入 HDFS 文件系统,我使用来自 [https://github.com/apache/apex-malhar/tree/master/examples/kafka]. 不幸的是,在设置了 kafka 属性和 hadoop 配置后,我的 hdfs 2.6.0 系统中没有创建数据。 PS:控制台没有显示任何错误,一切似乎都正常

这里是我用于我的应用程序的代码

public class TestConsumer {
    public static void main(String[] args) {
        Consumer consumerThread = new Consumer(KafkaProperties.TOPIC);
        consumerThread.start();
        ApplicationTest a = new ApplicationTest();
         try {
            a.testApplication();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

这里是来自 apex malhar 的 ApplicationTest 类示例

package org.apache.apex.examples.kafka.kafka2hdfs;

import org.apache.log4j.Logger;
import javax.validation.ConstraintViolationException;

import org.junit.Rule;


import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;

import com.datatorrent.api.LocalMode;

import info.batey.kafka.unit.KafkaUnitRule;




/**
 * Test the DAG declaration in local mode.
 */

public class ApplicationTest
{
  private static final Logger LOG = Logger.getLogger(ApplicationTest.class);
  private static final String TOPIC = "kafka2hdfs";

  private static final int zkPort = NetUtils.getFreeSocketPort();
  private static final int brokerPort = NetUtils.getFreeSocketPort();
  private static final String BROKER = "localhost:" + brokerPort;
  private static final String FILE_NAME = "test";
  private static final String FILE_DIR = "./target/tmp/FromKafka";



  // broker port must match properties.xml
  @Rule
  private static  KafkaUnitRule kafkaUnitRule = new KafkaUnitRule(zkPort, brokerPort);

  public void testApplication() throws Exception
  {
    try {
      // run app asynchronously; terminate after results are checked
      LocalMode.Controller lc = asyncRun();


      lc.shutdown();
    } catch (ConstraintViolationException e) {
        LOG.error("constraint violations: " + e.getConstraintViolations());

    }
  }
  private Configuration getConfig()
  {
    Configuration conf = new Configuration(false);
    String pre = "dt.operator.kafkaIn.prop.";
    conf.setEnum(pre + "initialOffset", AbstractKafkaInputOperator.InitialOffset.EARLIEST);
    conf.setInt(pre + "initialPartitionCount", 1);
    conf.set(pre + "topics", TOPIC);
    conf.set(pre + "clusters", BROKER);

    pre = "dt.operator.fileOut.prop.";
    conf.set(pre + "filePath", FILE_DIR);
    conf.set(pre + "baseName", FILE_NAME);
    conf.setInt(pre + "maxLength", 40);
    conf.setInt(pre + "rotationWindows", 3);

    return conf;
  }

  private LocalMode.Controller asyncRun() throws Exception
  {
    Configuration conf = getConfig();
    LocalMode lma = LocalMode.newInstance();
    lma.prepareDAG(new KafkaApp(), conf);
    LocalMode.Controller lc = lma.getController();
    lc.runAsync();
    return lc;
  }
}

【问题讨论】:

  • 如果我不得不猜测,你在做runAsync,然后立即调用控制器的关闭方法
  • 无论如何,如果你有Kafka,那么你可以使用Kafka Connect将消息写入HDFS(不需要安装Confluent)
  • 感谢回复,但我认为将数据写入 hdfs 的部分在“lma.prepareDAG(new KafkaApp(), conf);”中定义asyncRun() 方法内的行。 PS:这是我第一次使用它,你能告诉我更清楚吗?
  • 我不知道 Apex。我听说卡夫卡标签。请参阅来自 Confluent docs.confluent.io/current/connect/connect-hdfs/docs/index.html 的文档和博客 engineering.pandora.com/… 我的意思是,您不需要编写比某些配置文件更多的代码
  • 关于你正在尝试的,不过,这个例子看起来不像你的代码......github.com/apache/apex-malhar/tree/master/examples/kafka/src/…

标签: java apache-kafka hdfs apache-apex


【解决方案1】:

在 runAsync 之后和关闭之前,您需要等待预期的结果(否则 DAG 将立即退出)。这实际上就是在example 中发生的事情。

【讨论】:

  • 很遗憾,这并不能解决问题,我仍然无法找到存储在 hdfs 文件系统中的数据
猜你喜欢
  • 1970-01-01
  • 2019-07-30
  • 2019-06-15
  • 1970-01-01
  • 2019-04-23
  • 2015-10-05
  • 1970-01-01
  • 2018-09-16
相关资源
最近更新 更多