【问题标题】:prepare method executing multiple times多次执行prepare方法
【发布时间】:2019-12-28 20:20:49
【问题描述】:

您好,我正在使用 apache-storm 创建一个拓扑,其中我的 Spout 正在从 Kakfa 主题收集数据并将其发送到螺栓。

我正在对元组进行一些验证,并再次为其他螺栓发出流。

现在的问题是我的第二个螺栓使用第一个螺栓的流有一个重载方法prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) 每隔 2 秒执行一次。

拓扑代码是

topologyBuilder.setBolt("abc",new ValidationBolt()).shuffleGrouping(configurations.SPOUT_ID);

topologyBuilder.setBolt("TEST",new TestBolt()).shuffleGrouping("abc",Utils.VALIDATED_STREAM);

第一个螺栓“abc”的代码是

@Override
    public void execute(Tuple tuple) {
        String document = String.valueOf(tuple.getValue(4));
        if (Utils.isJSONValid(document)) {
            outputCollector.emit(Utils.VALIDATED_STREAM,new Values(document));
        }
    }


    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream(Utils.VALIDATED_STREAM,new Fields("document"));
    }

我在搜索的时候发现了

The prepare method is called when the bolt is initialised and is 
similar to the open method in spout. It is called only once for the bolt.
It gets the configuration for the bolt and also the context of the bolt. 
The collector is used to emit or output the tuples from this bolt. 

链接到公共要点以获取日志 Storm topology log

【问题讨论】:

  • 工人崩溃了吗? Storm 每个bolt任务只会调用一次prepare,但是如果worker崩溃重启,当然会再次调用。
  • @Stig 日志中没有异常。但是不断地获取一些日志。附加有问题的日志。

标签: java apache-kafka apache-zookeeper apache-storm


【解决方案1】:

您的日志显示您正在使用 LocalCluster。它是一个测试/演示工具,请勿将其用于生产工作负载。而是建立一个真正的分布式集群。

关于正在发生的事情:

当您在 LocalCluster 中运行拓扑时,Storm 通过将所有组件(Nimbus、Supervisor 和 worker)作为线程运行在单个 JVM 中来模拟真实的集群。您的日志显示以下几行:

20:14:12.451 [SLOT_1027] 信息 o.a.s.ProcessSimulator - 开始杀死进程 2ea97301-24c9-4c1a-bcba-61008693971a

20:14:12.451 [SLOT_1027] 信息 o.a.s.d.w.Worker - 关闭工作者 smart-transactional-data-1-1566571315 72bbf510-c342-4385-9599-0821a2dee94e 1027

20:14:15.518 [SLOT_1027] 信息 oasdsSlot - 运行 msInState 的状态:33328 topo:smart-transactional-data-1-1566571315 worker:2ea97301-24c9-4c1a-bcba-61008693971a -> kill-blob-update msInState : 3001 topo:smart-transactional-data-1-1566571315 worker:2ea97301-24c9-4c1a-bcba-61008693971a

20:14:15.540 [SLOT_1027] 信息 o.a.s.d.w.Worker - 为 smart-transactional-data-1-1566571315 启动工作器

LocalCluster 正在关闭一个模拟工作程序,因为 blobstore 中的一个 blob(例如拓扑 jar、拓扑配置、其他类型的共享文件,请参阅 https://storm.apache.org/releases/2.0.0/distcache-blobstore.html 了解更多信息)发生了变化。通常,当这种情况发生在真正的集群中时,worker JVM 将被杀死,blob 将被更新并且worker 将重新启动。由于您使用的是 LocalCluster,它只会杀死工作线程并重新启动它。这就是为什么您会看到多次调用 prepare

【讨论】:

  • 我尝试使用 StormSubmitter 运行它,它按预期工作。感谢您的解释。
  • 有什么方法可以使用本地集群实现同样的效果吗?
  • 如果你真的想在 LocalCluster 中只执行一次,只需在拓扑设置代码中运行它,或者在某处初始化一个静态变量。
  • 有没有例子。我对风暴很陌生。谢谢
  • 你当然也可以在你的bolt中放一个静态布尔字段,并且只有当该字段为假时才运行准备。运行准备后,将该字段设置为 true。这样可以防止 LocalCluster 多次运行 prepare,同时仍然允许分布式模式 Storm 工作。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-11-21
  • 1970-01-01
  • 2017-04-15
  • 1970-01-01
相关资源
最近更新 更多