【问题标题】:Apache Storm Trident .each() function explanationApache Storm Trident .each() 函数说明
【发布时间】:2015-11-26 10:40:46
【问题描述】:

我想在一个项目中使用 Apache Storm 的 TridentTopology。我发现很难理解storm.trident.Stream 类中的.each() 函数。以下是他们教程中给出的示例代码供参考:

TridentTopology topology = new TridentTopology();        
TridentState wordCounts =
     topology.newStream("spout1", spout)
       .each(new Fields("sentence"), new Split(), new Fields("word"))
       .groupBy(new Fields("word"))
       .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))                
       .parallelismHint(6);

我不明白 .each() 方法的签名。以下是我的理解。如果我错了,请纠正我,并为我的知识提供更多信息。

.each()

  • 第一个参数将作为相关键的字段带到 从 spout 发出值并从 getOutputFields() 返回 喷口中的方法。我仍然不知道为什么要使用该参数 为。
  • 第二个参数是扩展 BaseFunction 的类。它 处理元组。
  • 第三个参数的理解与第一个参数类似。

【问题讨论】:

    标签: apache-storm trident


    【解决方案1】:

    第一个参数是输入元组的投影。在您的示例中,仅将名称为“句子”的字段提供给Split。如果您的源发出带有架构Fields("first", "sentence", "third") 的元组,您只能访问Split 中的“句子”。此外,“句子”在Split 中的索引为零(而不是一)。请注意,它不是对输出的投影——所有字段都将保留在输出元组中!这只是对Split 中整个元组的有限看法。

    最后一个参数是Values 在Split 中赋予emit() 的架构。此字段名称作为新属性附加到输出元组。因此,输出元组的模式是输入元组的模式(原始的,不是由第一个参数投影的)加上最后一个参数的字段。

    请参阅文档中的“功能”部分:https://storm.apache.org/releases/0.10.0/Trident-API-Overview.html

    【讨论】:

      猜你喜欢
      • 2018-09-19
      • 1970-01-01
      • 1970-01-01
      • 2013-03-09
      • 2018-11-03
      • 1970-01-01
      • 1970-01-01
      • 2014-08-22
      • 1970-01-01
      相关资源
      最近更新 更多