【问题标题】:How to save CQL Collection objects to Cassandra with Hadoop?如何使用 Hadoop 将 CQL 集合对象保存到 Cassandra?
【发布时间】:2014-07-13 10:45:47
【问题描述】:

我正在使用 Spark Hadoop API 从 Cassandra 获取数据并将结果保存到 Cassandra。 对于行值,如果列类型很长,这是使用 Hadoop 的 CqlOutputFormat 适配器向 Cassandra 发送数据的方式:

val outVal = new java.util.ArrayList[ByteBuffer](1)
outVal.add(ByteBufferUtil.bytes(count.longValue()))

但是,当列类型为set<text> 时,我无法使其工作。我尝试使用 ObjectOutputStream 序列化 java.util.Set 对象,但节俭客户端抛出 InvalidRequestException(why:string didn't validate.)

val outVal = new java.util.ArrayList[ByteBuffer](1)
val byteOut = new ByteArrayOutputStream()
val out = new ObjectOutputStream(byteOut)
out.writeObject(data)
byteOut.close()
outVal.add(ByteBuffer.wrap(byteOut.toByteArray))
(outKey, outVal)

它似乎期望 outVal 是一个字符串值。我查看了 Cassandra 中 SetSerializer 和 CollectionSerializer 类的源代码,似乎 Cassandra 对 Collection 对象使用了自定义序列化。 Hadoop CQL3 API 是否提供了一种序列化 Collection 对象的方法,还是我必须找到一种从外部使用 Cassandra 内部类的方法?

【问题讨论】:

    标签: java hadoop cassandra cql apache-spark


    【解决方案1】:

    目前看来唯一的解决方案是从 Cassandra 源代码中复制序列化代码。以下是 Cassandra 在内部处理集合对象的方式:

        List<ByteBuffer> bbs = new ArrayList(list.size());
        int size = 0;
        for (String elt : list)
        {
            ByteBuffer bb = ByteBufferUtil.bytes(elt);
            bbs.add(bb);
            size += 2 + bb.remaining();
        }
    
        ByteBuffer result = ByteBuffer.allocate(2 + size);
        result.putShort((short)list.size());
        for (ByteBuffer bb : bbs)
        {
            result.putShort((short)bb.remaining());
            result.put(bb.duplicate());
        }
        return (ByteBuffer)result.flip();
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-10-25
      • 2013-02-13
      • 1970-01-01
      • 2018-03-13
      • 2020-08-24
      • 2020-10-03
      相关资源
      最近更新 更多