【Posted】: 2018-12-23 04:38:11
【Problem description】:
I am trying to read data from a Kafka topic and write it to the HDFS file system, using the example from https://github.com/apache/apex-malhar/tree/master/examples/kafka. Unfortunately, after setting the Kafka properties and the Hadoop configuration, no data is created on my HDFS 2.6.0 system. PS: the console shows no errors, and everything appears to run normally.
Here is the code I use for my application:
public class TestConsumer {
  public static void main(String[] args) {
    Consumer consumerThread = new Consumer(KafkaProperties.TOPIC);
    consumerThread.start();
    ApplicationTest a = new ApplicationTest();
    try {
      a.testApplication();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}
And here is the ApplicationTest example class from Apex Malhar:
package org.apache.apex.examples.kafka.kafka2hdfs;

import org.apache.log4j.Logger;

import javax.validation.ConstraintViolationException;

import org.junit.Rule;

import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;

import com.datatorrent.api.LocalMode;

import info.batey.kafka.unit.KafkaUnitRule;

/**
 * Test the DAG declaration in local mode.
 */
public class ApplicationTest
{
  private static final Logger LOG = Logger.getLogger(ApplicationTest.class);
  private static final String TOPIC = "kafka2hdfs";
  private static final int zkPort = NetUtils.getFreeSocketPort();
  private static final int brokerPort = NetUtils.getFreeSocketPort();
  private static final String BROKER = "localhost:" + brokerPort;
  private static final String FILE_NAME = "test";
  private static final String FILE_DIR = "./target/tmp/FromKafka";

  // broker port must match properties.xml
  @Rule
  private static KafkaUnitRule kafkaUnitRule = new KafkaUnitRule(zkPort, brokerPort);

  public void testApplication() throws Exception
  {
    try {
      // run app asynchronously; terminate after results are checked
      LocalMode.Controller lc = asyncRun();
      lc.shutdown();
    } catch (ConstraintViolationException e) {
      LOG.error("constraint violations: " + e.getConstraintViolations());
    }
  }

  private Configuration getConfig()
  {
    Configuration conf = new Configuration(false);
    String pre = "dt.operator.kafkaIn.prop.";
    conf.setEnum(pre + "initialOffset", AbstractKafkaInputOperator.InitialOffset.EARLIEST);
    conf.setInt(pre + "initialPartitionCount", 1);
    conf.set(pre + "topics", TOPIC);
    conf.set(pre + "clusters", BROKER);
    pre = "dt.operator.fileOut.prop.";
    conf.set(pre + "filePath", FILE_DIR);
    conf.set(pre + "baseName", FILE_NAME);
    conf.setInt(pre + "maxLength", 40);
    conf.setInt(pre + "rotationWindows", 3);
    return conf;
  }

  private LocalMode.Controller asyncRun() throws Exception
  {
    Configuration conf = getConfig();
    LocalMode lma = LocalMode.newInstance();
    lma.prepareDAG(new KafkaApp(), conf);
    LocalMode.Controller lc = lma.getController();
    lc.runAsync();
    return lc;
  }
}
【Discussion】:
-
If I had to guess, you are calling runAsync and then immediately calling the controller's shutdown method.
-
In any case, if you have Kafka, you can use Kafka Connect to write the messages to HDFS (no Confluent installation required).
-
Thanks for the reply, but I think the part that writes the data to HDFS is defined by the line "lma.prepareDAG(new KafkaApp(), conf);" inside the asyncRun() method. PS: this is my first time using it, could you explain in more detail?
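The point in the first comment is that shutdown() runs before the DAG has had time to write anything: the code should wait for the expected output between runAsync() and shutdown(). A minimal self-contained sketch of that wait-then-shutdown pattern, with a plain background thread standing in for the Apex LocalMode.Controller (the file path mirrors FILE_DIR/FILE_NAME above; the class name WaitBeforeShutdown is made up for illustration):

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.TimeUnit;

public class WaitBeforeShutdown {
  public static void main(String[] args) throws Exception {
    Path outFile = Paths.get("./target/tmp/FromKafka/test");
    Files.createDirectories(outFile.getParent());

    // Stand-in for the asynchronously running DAG: it produces the
    // output file only after a short delay.
    Thread dag = new Thread(() -> {
      try {
        TimeUnit.MILLISECONDS.sleep(200);
        Files.write(outFile, "hello".getBytes());
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    });
    dag.start(); // analogous to lc.runAsync()

    // Poll for the expected output with a deadline, instead of
    // shutting down immediately after starting the async run.
    long deadline = System.currentTimeMillis() + 10_000;
    while (!Files.exists(outFile) && System.currentTimeMillis() < deadline) {
      TimeUnit.MILLISECONDS.sleep(100);
    }

    dag.join(); // analogous to lc.shutdown() once results exist
    System.out.println(Files.exists(outFile)); // prints "true"
  }
}
```

In the real test, the same polling loop would sit between asyncRun() and lc.shutdown(), checking that files appear under FILE_DIR before terminating the controller.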
-
I don't know Apex; I only saw the Kafka tag. See the documentation from Confluent docs.confluent.io/current/connect/connect-hdfs/docs/index.html and the blog post engineering.pandora.com/… What I mean is that you shouldn't need to write much more than a few configuration files.
-
Regarding what you are trying, though: that example doesn't look like your code... github.com/apache/apex-malhar/tree/master/examples/kafka/src/…
Tags: java apache-kafka hdfs apache-apex