【问题标题】:Getting data from one database and process and store it to another database using trident topology使用三叉戟拓扑从一个数据库中获取数据并处理并将其存储到另一个数据库
【发布时间】:2013-09-14 04:00:27
【问题描述】:

我想通过 spout 从一个数据库中获取数据并处理数据并使用 trident 将其存储在另一个数据库中。我是 Storm 和 trident 的新手,我不知道如何实现它。我从spout 中的数据库(实现 trident 支持的 IRichSpout 的单独 java 类),我将其作为对象发出。我需要将它传递给 trident 拓扑进行处理(计算记录数)并将其存储到数据库中。

 TridentTopology topology = new TridentTopology();  
 TridentState wordCounts =
          topology.newStream("spout1",spout)

现在新的流接受一个 spout 作为输入,即语法是

 Stream storm.trident.TridentTopology.newStream(String txId, IRichSpout spout)

但我想将 spout 发出的对象作为流的输入,以便 trident 处理并保存到数据库。那么如何将我的 spout 类带入 trident 并将其传递给新的流,或者我应该结合spout 和 trident 同一个类??

有人可以帮忙吗.....

【问题讨论】:

    标签: apache-storm trident


    【解决方案1】:

    你可以这样做

        MyFooSpout spout = new MyFooSpout();
        topology.newStream("spout1", spout)....
    

    MyFooSpout 类应该在哪里实现 IRichSpout

    来自trident tutorial TridentTopology 中的 newStream 方法在从任何输入源读取的拓扑中创建数据流。

    在您的情况下,它可能是 MyFooSpout

    。我在 spout 中从数据库中获取数据(实现 trident 支持的 IRichSpout 的单独 java 类),并将其作为 object

    发出

    你能澄清一下你到底指的是什么吗?你的 spout 代码是什么样子的?作为一个非常通用的示例,如果我们编写类似(取自教程页面)

        TridentState wordCounts = topology.newStream("spout1", spout).each(new Fields("sentence"), new Split(), new Fields("word"))
    

    这意味着spout 应该发出一个字段,即sentence。通过调用eachSplit 函数将应用于流中的每个元组,该函数将根据通过使用sentence 字段编写的任何代码执行。但是,这可能会根据您的要求而有所不同。例如,它可以是 Filter 作为 MyFilter extends BaseFilterfunction 作为 MyCustomFuction extends BaseFunction。查看 API 页面了解更多详情。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2012-06-11
      • 1970-01-01
      相关资源
      最近更新 更多