【问题标题】:Data parallelism in StormStorm 中的数据并行性
【发布时间】:2015-05-08 18:54:22
【问题描述】:

我已经阅读了有关 Apache 风暴的信息并做了一些基本教程。我有以下拓扑结构,我想用storm来实现,但不确定如何处理数据分布。 业务要求是 实时评估客户组合。 简而言之,它包括: 1) 接受市场价格的实时动态(货币、商品等...) 2) 对于每个价格变动,计算每个头寸的当前利润并将其转换为客户账户货币 3) 分析每个客户所有头寸的总盈亏和交易量,并在需要时生成信号 4) 在客户级别的计算必须是顺序的和原子的/序列化的。 IE。所有仓位都必须按照其进入系统的顺序在每个报价单上进行评估,并且即使客户有 100 个仓位,也必须根据相同的价格计算总数。 5) 分析系统中按交易品种/客户类型/国家/等聚合的所有头寸的交易量/趋势...并在某种仪表盘中提供它们。

所有订单都执行并存储在 rdbms 中。 我的主要问题是如何在每个节点处理它自己的部分的不同节点上的 Storm bolts 上分配成千上万个位置。使用 Modulo 足以对客户进行分区,但是我如何为每个 bolt 实例提供 id,以便每个实例只处理它自己的相同部分客户? Storm 中是否有开箱即用的功能可以做到这一点? 另一个问题是如何有效地进行上述聚合?

【问题讨论】:

    标签: apache-storm computational-finance


    【解决方案1】:

    您可以为此使用fieldsGrouping。您可以声明一个用于对元组进行分组的字段(在您的情况下为id)。

    我只是假设您的输入流是带有 id 和 body 字段的 JSON 对象

    {"id":"1234","body":"some body"}
    

    还假设您的拓扑有一个 spout,两个 Bolt,即 BoltA 和 BoltB。

    在 BoltB 中,重写 declareOutputFields 方法并填写详细信息。

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id","log"));
    }
    

    你可以像下面这样声明拓扑

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", spout, 1);
    builder.setBolt("boltA", new BoltA(), 1)
           .shuffleGrouping("spout");
    builder.setBolt("counterBolt", new BoltB(), 1).fieldsGrouping("boltB", new Fields("id"));
    

    在这种情况下,来自 boltA 的具有相同 id 的元组将被传递到 boltB 的同一实例

    【讨论】:

    • 你能不能也回答我的第二个问题?
    猜你喜欢
    • 2015-07-07
    • 1970-01-01
    • 1970-01-01
    • 2016-04-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-06-19
    • 1970-01-01
    相关资源
    最近更新 更多