【问题标题】:Storm Bolt not printing/logging Kafka SpoutStorm Bolt 不打印/记录 Kafka Spout
【发布时间】:2015-04-22 12:41:03
【问题描述】:

编辑:我在 Bolt 中添加了一个 .ack()(这要求我使用 Rich Bolt 而不是基本 Bolt)并且遇到了同样的问题 - 没有任何东西告诉我 Bolt 正在处理元组。

如果重要的话,我会在 EC2 实例上的 CentOS 映像上运行它。任何帮助将不胜感激。



我正在尝试设置一个非常基本的 HelloWorld Storm 示例来读取来自 Kafka 集群的消息并打印/记录我收到的消息。

目前我在 Kafka 集群中有 20 条消息。当我运行拓扑时(看起来启动得很好),我可以看到我的 Kafka Spout 以及 Echo Bolt。在 Storm UI 中,Kafka Spout Acked 列的值为 20 - 我假设这是它能够读取/访问的消息数(?)

然而,Echo Bolt 行仅指出我有 1 个执行程序和 1 个任务。所有其他列均为 0。

查看生成的 Storm 工作日志,我看到这一行:Read partition information from: /HelloWorld Spout/partition_0 --> {"topic":"helloworld","partition":0,"topology":{"id":"<UUID>","name":"Kafka-Storm test"},"broker":{"port":6667,"host":"ip-10-0-0-35.ec2.internal"},"offset":20}

接下来的几行如下:

s.k.PartitionManager [INFO] Last commit offset from zookeeper: 0
s.k.PartitionManager [INFO] Commit offset 0 is more than 9223372036854775807 behind, resetting to startOffsetTime=-2
s.k.PartitionManager [INFO] Starting Kafka ip-10-0-0-35.ec2.internal:0 from offset 0
s.k.ZkCoordinator [INFO] Task [1/1] Finished refreshing
s.k.ZkCoordinator [INFO] Task [1/1] Refreshing partition manager connections
s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=ip-10-0-0-35.ec2.internal:6667}}

工作日志的其余部分没有显示 Bolt 处理的消息的日志/打印输出。我不知道为什么 Bolt 似乎没有从 Kafka 集群获得任何消息。任何帮助都会很棒。谢谢。

构建 KafkaSpout

private static KafkaSpout setupSpout() {
  BrokerHosts hosts = new ZkHosts("localhost:2181");
  SpoutConfig spoutConfig = new SpoutConfig(hosts, "helloworld", "", "HelloWorld Spout");
  spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
  spoutConfig.forceFromStart = true;
  spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
  return new KafkaSpout(spoutConfig);
}

构建拓扑并提交

public static void main(String[] args) {
  TopologyBuilder builder = new TopologyBuilder();
  builder.setSpout("Kafka Spout", setupSpout());
  builder.setBolt("Echo Bolt", new SystemOutEchoBolt());

  try {
    System.setProperty("storm.jar", "/tmp/storm.jar");
    StormSubmitter.submitTopology("Kafka-Storm test", new Config(), builder.createTopology());
  } //catchExceptionsHere
}

螺栓

public class SystemOutEchoBolt extends BaseRichBolt {

  private static final long serialVersionUID = 1L;
  private static final Logger logger = LoggerFactory.getLogger(SystemOutEchoBolt.class);

  private OutputCollector m_collector;

  @SuppressWarnings("rawtypes")
  @Override
  public void prepare(Map _map, TopologyContext _conetxt, OutputCollector _collector) {
    m_collector = _collector;
  }

  @Override
  public void execute(Tuple _tuple) {
    System.out.println("Printing tuple with toString(): " + _tuple.toString());
    System.out.println("Printing tuple with getString(): " + _tuple.getString(0));
    logger.info("Logging tuple with logger: " + _tuple.getString(0));
    m_collector.ack(_tuple);
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer _declarer) {}
}

【问题讨论】:

    标签: java apache-storm apache-kafka


    【解决方案1】:

    答案很简单。我从来没有告诉螺栓订阅哪个流。添加.shuffleGrouping("Kafka Spout"); 解决了这个问题。

    【讨论】:

      【解决方案2】:

      你需要在你的 bolts 中调用一个 ack 或者 fail ,否则 spout 不知道这个 tuple 已经被完全处理了。这将导致您看到的计数问题。

      public class SystemOutEchoBolt extends BaseBasicBolt {
      
        private static final long serialVersionUID = 1L;
        private static final Logger logger = LoggerFactory.getLogger(SystemOutEchoBolt.class);
      
        @Override
        public void execute(Tuple _tuple, BasicOutputCollector _collector) {
          System.out.println("Printing tuple with toString(): " + _tuple.toString());
          System.out.println("Printing tuple with getString(): " + _tuple.getString(0));
          logger.info("Logging tuple with logger: " + _tuple.getString(0));
          _collector.ack(_tuple);
        }
      
        @Override
        public void declareOutputFields(OutputFieldsDeclarer arg0) {}
        }
      }
      

      【讨论】:

      • 感谢您的回复 - 我会试一试。看起来我需要使用 Rich Bolt,因为 BasicOutputCollector 没有 .ack() 方法。这可能会解决我的计数问题,但它也会解决我无法在工作日志文件中看到日志/打印语句的问题吗?
      猜你喜欢
      • 2016-04-17
      • 1970-01-01
      • 1970-01-01
      • 2017-02-23
      • 1970-01-01
      • 2019-03-02
      • 1970-01-01
      • 2013-06-24
      • 2015-03-13
      相关资源
      最近更新 更多