【问题标题】:Inserting rows to cassandra using Storm Trident使用 Storm Trident 向 cassandra 插入行
【发布时间】:2014-02-08 20:32:02
【问题描述】:

我正在尝试在 Cassandra 2.0.5、Storm 版本 0.9.0.1 中的表中插入一个简单的行。

我的测试如下:

我有一个由 id (int) 和 sentence (text) 列组成的表。 id 是主键。

我的 spout 生成句子,我添加了一个 ID(代码中的静态增量)。

这是我的拓扑:

TridentTopology topology = new TridentTopology();
StateFactory cassandraStateFactory = CassandraMapState.nonTransactional(options);
Fields fields = new Fields("id", "sentence");
MyTridentTupleMapper tupleMapper = new MyTridentTupleMapper(keyspace, fields);
CassandraUpdater updater = new CassandraUpdater(tupleMapper);
TridentState wordCounts = topology.newStream("spout1", spout)
            .each(new Fields("sentence"), new AddId(), new Fields("id"))
            .partitionPersist(cassandraStateFactory, fields, updater);

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", config, topology.build());    

MyTridentTupleMapper 的代码:

https://github.com/guywald/trident-cassandra-read-write-examples/blob/master/src/test/java/com/guywald/storm/trident/cassandra/MyTridentTupleMapper.java

我得到以下异常:

2014-02-08 22:20:14 ERROR executor:0 - 
java.lang.RuntimeException: java.lang.ClassCastException: storm.trident.state.map.SnapshottableMap cannot be cast to com.hmsonline.storm.cassandra.trident.CassandraState
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:90)
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:61)
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
    at backtype.storm.daemon.executor$fn__3498$fn__3510$fn__3557.invoke(executor.clj:730)
    at backtype.storm.util$async_loop$fn__444.invoke(util.clj:403)
    at clojure.lang.AFn.run(AFn.java:24)
    at java.lang.Thread.run(Thread.java:744)

我不知道为什么它会返回这个并希望得到帮助。

【问题讨论】:

    标签: java cassandra apache-storm trident


    【解决方案1】:

    看起来 CassandraUpdater 需要 CassandraState,而 CassandraMapState.nonTransactional 创建不兼容的 SnapshottableMap。 我相信常见的(或异国情调的)MapState 更新程序将与 CassandraMapState 一起使用。这里有一个关于何时使用 State 和 MapState 的很好的解释:https://groups.google.com/forum/#!topic/storm-user/TASr2zWyzKs

    我认为应该使用 CassandraStateFactory 作为你的示例工作的状态工厂。

    【讨论】:

      猜你喜欢
      • 2014-03-06
      • 2013-03-09
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2013-09-14
      相关资源
      最近更新 更多