【发布时间】:2015-11-26 10:40:46
【问题描述】:
我想在一个项目中使用 Apache Storm 的 TridentTopology。我发现很难理解storm.trident.Stream 类中的.each() 函数。以下是他们教程中给出的示例代码供参考:
TridentTopology topology = new TridentTopology();
TridentState wordCounts =
topology.newStream("spout1", spout)
.each(new Fields("sentence"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
.parallelismHint(6);
我不明白 .each() 方法的签名。以下是我的理解。如果我错了,请纠正我,并为我的知识提供更多信息。
.each()
- 第一个参数将作为相关键的字段带到 从 spout 发出值并从 getOutputFields() 返回 喷口中的方法。我仍然不知道为什么要使用该参数 为。
- 第二个参数是扩展 BaseFunction 的类。它 处理元组。
- 第三个参数的理解与第一个参数类似。
【问题讨论】:
标签: apache-storm trident