【Title】: Why is Apache Storm KafkaSpout emitting so many items from Kafka topic?
【Posted】: 2017-10-19 19:13:15
【Description】:

I'm running into a problem with Kafka and Storm. At this point I'm not sure whether it's a problem with the KafkaSpout config I'm setting up, whether I'm not acking correctly, or something else.

I queued 50 items onto my Kafka topic, but my spout has emitted over 1,300 tuples (and counting). Moreover, the spout reports that nearly all of them "failed". The topology isn't actually failing; it is writing to the database successfully. I just don't know why it is apparently replaying everything (if that's what it is doing).

The big question is:

Why is it emitting so many tuples when I only passed 50 to Kafka?

Here is how I set up the topology and the KafkaSpout:

  public static void main(String[] args) {
    try {
      String databaseServerIP = "";
      String kafkaZookeepers = "";
      String kafkaTopicName = "";
      int numWorkers = 1;
      int numAckers = 1;
      int numSpouts = 1;
      int numBolts = 1;
      int messageTimeOut = 10;
      String topologyName = "";

      if (args == null || args.length == 0) {
        System.out.println("Args cannot be null or empty. Exiting");
        return;
      } else {
        if (args.length == 8) {
          for (String arg : args) {
            if (arg == null) {
              System.out.println("Parameters cannot be null. Exiting");
              return;
            }
          }
          databaseServerIP = args[0];
          kafkaZookeepers = args[1];
          kafkaTopicName = args[2];
          numWorkers = Integer.valueOf(args[3]);
          numAckers = Integer.valueOf(args[4]);
          numSpouts = Integer.valueOf(args[5]);
          numBolts = Integer.valueOf(args[6]);
          topologyName = args[7];
        } else {
          System.out.println("Bad parameters: found " + args.length + ", required = 8");
          return;
        }
      }

      Config conf = new Config();

      conf.setNumWorkers(numWorkers);
      conf.setNumAckers(numAckers);
      conf.setMessageTimeoutSecs(messageTimeOut);

      conf.put("databaseServerIP", databaseServerIP);
      conf.put("kafkaZookeepers", kafkaZookeepers);
      conf.put("kafkaTopicName", kafkaTopicName);

      /**
       * Now would put kafkaSpout instance below instead of TemplateSpout()
       */
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout(topologyName + "-flatItems-from-kafka-spout", getKafkaSpout(kafkaZookeepers, kafkaTopicName), numSpouts);
      builder.setBolt(topologyName + "-flatItem-Writer-Bolt", new ItemWriterBolt(), numBolts).shuffleGrouping(topologyName + "-flatItems-from-kafka-spout");


      StormTopology topology = builder.createTopology();

      StormSubmitter.submitTopology(topologyName, conf, topology);

    } catch (Exception e) {
      System.out.println("There was a problem starting the topology. Check parameters.");
      e.printStackTrace();
    }
  }

  private static KafkaSpout getKafkaSpout(String zkHosts, String topic) throws Exception {

    //String topic = "FLAT-ITEMS";
    String zkNode = "/" + topic + "-subscriber-pipeline";
    String zkSpoutId = topic + "subscriberpipeline";
    KafkaTopicInZkCreator.createTopic(topic, zkHosts);


    SpoutConfig spoutConfig = new SpoutConfig(new ZkHosts(zkHosts), topic, zkNode, zkSpoutId);
    spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();

    // spoutConfig.useStartOffsetTimeIfOffsetOutOfRange = true;
    //spoutConfig.startOffsetTime = System.currentTimeMillis();
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

    return new KafkaSpout(spoutConfig);

  }

Here is the creation of the topic, just in case:

  public static void createTopic(String topicName, String zookeeperHosts) throws Exception {
    ZkClient zkClient = null;
    ZkUtils zkUtils = null;
    try {

      int sessionTimeOutInMs = 15 * 1000; // 15 secs
      int connectionTimeOutInMs = 10 * 1000; // 10 secs

      zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);
      zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);

      int noOfPartitions = 1;
      int noOfReplication = 1;
      Properties topicConfiguration = new Properties();

      boolean topicExists = AdminUtils.topicExists(zkUtils, topicName);
      if (!topicExists) {
        AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration, RackAwareMode.Disabled$.MODULE$);
      }
    } catch (Exception ex) {
      ex.printStackTrace();
    } finally {
      if (zkClient != null) {
        zkClient.close();
      }
    }
  }

【Discussion】:

    Tags: apache-kafka apache-storm


    【Solution 1】:

    You need to check whether the messages are also failing in the bolt.

    If they are all failing there too, you are probably not acking the messages in the bolt, or the bolt code is throwing an exception.

    If the bolt messages are being acked, it is more likely a timeout. Increasing the topology timeout config or the parallelism should fix the problem.
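    For instance, a bolt that extends BaseRichBolt must ack every tuple itself. A minimal sketch of what the question's writer bolt could look like (the database-write step is elided; this is an illustrative shape, not the asker's actual ItemWriterBolt):

```java
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class AckingWriterBolt extends BaseRichBolt {
  private OutputCollector collector;

  @Override
  public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
  }

  @Override
  public void execute(Tuple tuple) {
    try {
      String item = tuple.getString(0); // StringScheme emits a single string field
      // ... write item to the database here ...
      collector.ack(tuple);   // tells the spout the tuple succeeded
    } catch (Exception e) {
      collector.fail(tuple);  // explicit fail: the spout will replay the tuple
    }
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // terminal bolt: no downstream stream declared
  }
}
```

    If the bolt extends BaseBasicBolt instead, the framework acks automatically when execute returns normally, so a missing ack cannot be the cause there.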

    【Comments】:

    • Thanks. What is the correct way to ack in the bolt? And how do I increase the topology timeout?
    • @markg If you are using BaseBasicBolt, you don't need to handle the ack. If it uses BaseRichBolt, you should call ack() in the execute method.
    • @markg The topology timeout is the "topology.message.timeout.secs" config; it can be set in the topology code or in the supervisor's storm.yaml.
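    Note that the question's main method already calls conf.setMessageTimeoutSecs(10); with database writes in the bolt, 10 seconds may be too tight, which would explain tuples timing out and being replayed. A sketch of raising it in the topology code (the value 60 is an arbitrary example):

```java
Config conf = new Config();
// Equivalent to topology.message.timeout.secs in storm.yaml.
// Was 10 in the question; database writes may need more headroom.
conf.setMessageTimeoutSecs(60);
```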