【问题标题】:Unable to read data with StormSpout无法使用 StormSpout 读取数据
【发布时间】:2015-01-24 23:59:58
【问题描述】:

我正在尝试使用 KafkaSpout 从 Kafka 队列中读取消息。我要么一无所获,要么出现以下错误:

2 [Thread-10-kafka-storm-spout] ERROR util:0 - Async loop died!
java.lang.NoSuchMethodError: scala.Predef$.augmentString(Ljava/lang/String;)Ljava/lang/String;
    at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:38)
    at kafka.javaapi.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:34)
    at storm.kafka.DynamicPartitionConnections.register(DynamicPartitionConnections.java:43)
    at storm.kafka.PartitionManager.<init>(PartitionManager.java:57)
    at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:80)
    at storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:52)
    at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:118)
    at backtype.storm.daemon.executor$fn__3284$fn__3299$fn__3328.invoke(executor.clj:563)
    at backtype.storm.util$async_loop$fn__452.invoke(util.clj:431)
    at clojure.lang.AFn.run(AFn.java:24)
    at java.lang.Thread.run(Thread.java:744)
11 [Thread-10-kafka-storm-spout] ERROR executor:0 - 
java.lang.NoSuchMethodError: scala.Predef$.augmentString(Ljava/lang/String;)Ljava/lang/String;
    at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:38)
    at kafka.javaapi.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:34)
    at storm.kafka.DynamicPartitionConnections.register(DynamicPartitionConnections.java:43)
    at storm.kafka.PartitionManager.<init>(PartitionManager.java:57)
    at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:80)
    at storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:52)
    at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:118)
    at backtype.storm.daemon.executor$fn__3284$fn__3299$fn__3328.invoke(executor.clj:563)
    at backtype.storm.util$async_loop$fn__452.invoke(util.clj:431)
    at clojure.lang.AFn.run(AFn.java:24)
    at java.lang.Thread.run(Thread.java:744)

这是我的代码:

TopologyBuilder builder = new TopologyBuilder();
        String TOPIC_NAME = "topic";
        String spoutName = "kafka-storm-spout";             

        BrokerHosts brokerHosts = new ZkHosts("localhost:2181");
        SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, TOPIC_NAME, "", "storm");

        builder.setSpout(spoutName, new KafkaSpout(kafkaConfig), 1);
        builder.setBolt("kafka-bolt", new TestBolt()).shuffleGrouping(spoutName);

        Config config = new Config();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("kafka-test", config, builder.createTopology());

        System.out.println("Topology submitted");
        Utils.sleep(5000);
        System.out.println("Shutting down");
        cluster.shutdown();

有什么想法吗?

【问题讨论】:

    标签: java apache-storm apache-kafka


    【解决方案1】:

    您很可能尝试使用不同版本的 scala。 Kafka 是为不同版本的 scala (https://kafka.apache.org/downloads.html) 构建的。查看您的依赖项并确保您只使用一个版本的 scala。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2017-11-12
      • 2011-11-26
      • 2018-06-21
      • 2021-09-03
      • 2021-06-05
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多