【发布时间】:2016-09-17 04:48:51
【问题描述】:
在过去的两天里,我一直在尝试在我们的拓扑中实现一个 KafkaSpout。这里 是一些重要的信息。
所有三个服务都在同一个实例上运行。 Kafka 的代理默认使用 9092
端口,advertised.listeners 设置为 PLAINTEXT://localhost:9092。 Zookeeper,使用默认值
客户端端口 2181。而 Storm Nimbus 主机名也已设置为 localhost。
自定义 Kafka Producer 成功创建日志消息,而使用 zkCli Zookeeper 脚本我看到在使用 /brokers 路径时,分区和其他相关 信息存储正确。
但是,我在激活时不断收到错误消息,然后监控拓扑。 下面是我实现的 Storm 拓扑的源代码:
BrokerHosts hosts = new ZkHosts("127.0.0.1:2181");
SpoutConfig spoutConfig = new SpoutConfig(hosts, "bytes", "/kafkastorm/", "bytes" + UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConfig.zkServers = Arrays.asList("127.0.0.1");
spoutConfig.zkPort = 2181;
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("bytes", kafkaSpout);
builder.setBolt("byteSize", new KafkaByteProcessingBolt()).shuffleGrouping("bytes");
StormTopology topology = builder.createTopology();
Config config = new Config();
StormSubmitter.submitTopology("topology", config, topology);
但是,我在执行bin/storm monitor <topology_name> -m bytes 时不断收到以下错误消息:
Exception in thread "main" java.lang.IllegalArgumentException: stream: default not found
at org.apache.storm.utils.Monitor.metrics(Monitor.java:223)
at org.apache.storm.utils.Monitor.metrics(Monitor.java:159)
at org.apache.storm.command.monitor$_main.doInvoke(monitor.clj:36)
at clojure.lang.RestFn.applyTo(RestFn.java:137)
at org.apache.storm.command.monitor.main(Unknown Source)
而通过检查工作人员的日志(worker.log 文件),我得出的结论是 KafkaSpout 在 open() 方法上失败。
java.lang.NoClassDefFoundError: org/apache/curator/RetryPolicy
at org.apache.storm.kafka.KafkaSpout.open(KafkaSpout.java:75) ~[storm-kafka-1.0.2.jar:1.0.2]
at org.apache.storm.daemon.executor$fn__7990$fn__8005.invoke(executor.clj:604) ~[storm-core-1.0.2.jar:1.0.2]
at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:482) [storm-core-1.0.2.jar:1.0.2]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_101]
Caused by: java.lang.ClassNotFoundException: org.apache.curator.RetryPolicy
at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[?:1.8.0_101]
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[?:1.8.0_101]
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) ~[?:1.8.0_101]
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[?:1.8.0_101]
... 5 more
有人可以解释一下 KafkaSpout 在 打开()方法?
非常感谢您的帮助!
【问题讨论】:
-
您使用的是什么版本的 Kafka 和 Storm?还要看storm-kafka的版本。你在使用 HDP 集群吗?