【问题标题】:Storm fields grouping风暴场分组
【发布时间】:2016-01-08 15:00:47
【问题描述】:

我遇到以下情况:

  • 有许多螺栓计算不同的值
  • 这些值被发送到可视化螺栓
  • 可视化螺栓打开一个网络套接字并发送值以某种方式可视化

问题是,可视化 bolt 始终是相同的,但它会针对可以作为其输入的每种类型的 bolt 发送带有不同标头的消息。例如:

  • BoltSum 计算总和
  • BoltDif 计算差异
  • BoltMul 计算倍数

  • 所有这些螺栓都使用 VisualizationBolt 进行可视化

  • 本例中有 3 个 VisualizationBolt 实例

我的问题是,我是否应该创建 3 个独立的实例,每个实例都有一个线程,例如

builder.setBolt("forSum", new VisualizationBolt(),1).globalGrouping("bolt-sum");
builder.setBolt("forDif", new VisualizationBolt(),1).globalGrouping("bolt-dif");
builder.setBolt("forMul", new VisualizationBolt(),1).globalGrouping("bolt-mul");

或者我应该执行以下操作

builder.setBolt("forAll", new VisualizationBolt(),3)
.fieldsGrouping("forSum", new Fields("type"))
.fieldsGrouping("forDif", new Fields("type"))
.fieldsGrouping("forMul", new Fields("type"));

并从之前的每个螺栓发出类型,以便可以根据它对它们进行分组?

有什么优势?

另外,我是否应该期望每次 bolt-sum 都会转到第一个可视化 bolt,bolt-dif 会转到第二个可视化 bolt,而 bolt-mul 会转到第三个可视化 bolt?不会混在一起吧?

我认为应该是这样,但它目前不在我的实现中,所以我不确定这是一个错误还是我遗漏了什么?

【问题讨论】:

标签: apache-storm


【解决方案1】:

使用三个实例的第一种方法是正确的方法。使用fieldsGrouping 不能确保“sum”值转到“Sum-Visualization-Bolt”,并且 sum/diff/mul 值都不是不同的(即,在不同的螺栓实例中)。

fieldGrouping 的语义更宽松:它只保证相同类型的所有元组将由单个螺栓实例处理,即永远不会出现两个不同的螺栓实例获得同类型。

【讨论】:

  • 哦,我明白了,所以总和和差可能会归于一个螺栓?这就是我遇到的问题,即使我按字段分组,以前螺栓的输出也是混合的。
  • 没错。单个bolt实例可以获取多种类型(使用fieldsGrouping只有一种类型不会去不同的instanced)
  • 非常感谢,先生,我现在明白了。我认为分配会更“公平”,因为有 X 螺栓和 X 键,因此每个螺栓都会获得一个键的实例,但现在我明白事实并非如此。很高兴通过我的两个问题与您讨论。 :) 干杯!
【解决方案2】:

我猜你可以使用部分键分组 (partialKeyGrouping)。在Storm documentation about stream groups 上说:

部分键分组:流按字段分区 在分组中指定,如字段分组,但加载 在两个下游螺栓之间保持平衡,从而提供更好的 传入数据倾斜时的资源利用率。这张纸 很好地解释了它的工作原理和优点 提供。

我使用这个分组实现了一个简单的拓扑结构,Graphite 服务器上的图表显示了比fieldsGrouping 更好的负载平衡。完整的源代码是here

topologyBuilder.setBolt(MqttSensors.BOLT_SENSOR_TYPE.getValue(), new SensorAggregateValuesWindowBolt().withTumblingWindow(Duration.seconds(5)), 2)
        // .fieldsGrouping(MqttSensors.SPOUT_STATION_01.getValue(), new Fields(MqttSensors.FIELD_SENSOR_TYPE.getValue()))
        // .fieldsGrouping(MqttSensors.SPOUT_STATION_02.getValue(), new Fields(MqttSensors.FIELD_SENSOR_TYPE.getValue()))
        .partialKeyGrouping(MqttSensors.SPOUT_STATION_01.getValue(), new Fields(MqttSensors.FIELD_SENSOR_TYPE.getValue()))
        .partialKeyGrouping(MqttSensors.SPOUT_STATION_02.getValue(), new Fields(MqttSensors.FIELD_SENSOR_TYPE.getValue()))
        .setNumTasks(4) // This will create 4 Bolt instances 
        .addConfiguration(TagSite.SITE.getValue(), TagSite.EDGE.getValue())
        ;

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-01-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-12-28
    • 2023-04-04
    相关资源
    最近更新 更多