【问题标题】:Save null Values in Cassandra using DataStax Spark Connector使用 DataStax Spark 连接器在 Cassandra 中保存空值
【发布时间】:2016-01-04 19:48:34
【问题描述】:

我尝试使用 Spark 和 Cassandra Spark 连接器将流数据保存到 Cassandra。

我做了如下的事情:

创建一个模型类:

public class ContentModel {
    String id;

    String available_at; //may be null

  public ContentModel(String id, String available_at){
     this.id=id;
     this.available_at=available_at,
  }
}

将流式内容映射到模型:

JavaDStream<ContentModel> contentsToModel = myStream.map(new Function<String, ContentModel>() {
        @Override
        public ContentModel call(String content) throws Exception {

            String[] parts = content.split(",");
            return new ContentModel(parts[0], parts[1]);
        }
    });

保存:

CassandraStreamingJavaUtil.javaFunctions(contentsToModel).writerBuilder("data", "contents", CassandraJavaUtil.mapToRow(ContentModel.class)).saveToCassandra();

如果某些值为null,我会收到以下错误:

com.datastax.spark.connector.types.TypeConversionException: Cannot convert object null to struct.ValueRepr.

有没有办法使用 Spark Cassandra 连接器存储空值?

【问题讨论】:

    标签: java cassandra apache-spark


    【解决方案1】:

    在 scala 中,您也可以为此使用 Options。

    【讨论】:

    • 提供同样的更多细节
    【解决方案2】:

    Cassandra 没有 null 的概念。一列是空的或填充的。我通过以下方式在 scala 中解决了这个问题:我使用了 map 方法并检查了空值。我用空字符串覆盖了 null 。而已。效果很好。

    【讨论】:

    • 我不这么认为。我尝试使用 Datastax 的 java 映射驱动程序将空值插入 Cassandra 表,并且它是成功的,即使列类型是 int 和 float。并且 Cassandra 也显示 null 并返回 null,因此“Cassandra 没有 f null 的概念”这句话没有太大说服力。
    【解决方案3】:

    我们能否知道您的依赖项的版本(Spark、Connector、Cassandra 等)

    是的,有一种方法可以使用 Cassandra 连接器存储空值。我让您的示例与一个简单的应用程序和一些更改一起正常工作(添加 Serializabe + 将您的模型属性转换为 Camel Case + 相关的 getter 和 setter)。我对Java API不太熟悉(在做Spark时你真的应该使用Scala,它让事情变得更容易),但我的印象是对模型类的反思是在getter/setter级别完成的......可能是错误的。

    模型

    public class ModelClass implements Serializable {
        String id;
    
        String availableAt; //may be null
    
        public ModelClass(String id, String availableAt){
            this.id=id;
            this.availableAt=availableAt;
        }
    
        public String getId() {
            return id;
        }
    
        public void setId(String id) {
           this.id = id;
        }
    
        public String getAvailableAt() {
            return availableAt;
         }
    
        public void setAvailableAt(String availableAt) {
            this.availableAt = availableAt;
        }
    }
    

    司机

    public static void main(String ... args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("Local App");
        conf.setMaster("local[*]");
        JavaSparkContext context = new JavaSparkContext(conf);
    
        List<ModelClass> modelList = new ArrayList<>();
        modelList.add(new ModelClass("Test", null));
        modelList.add(new ModelClass("Test2", "test"));
        context.parallelize(modelList);
        JavaRDD<ModelClass> modelRDD = context.parallelize(modelList);
        javaFunctions(modelRDD).writerBuilder("test", "model", mapToRow(ModelClass.class))
                .saveToCassandra();
    }
    

    生产

    cqlsh:test> select * from model;
    
     id    | available_at
    -------+--------------
      Test |         null
     Test2 |         test
    

    不过,了解“编写”空值的方式的含义很重要。一般来说,我们希望避免写出空值,因为 Cassandra 生成墓碑的方式。如果这些是初始写入,您将希望将它们视为“未设置”。

    全局将所有空值视为未设置

    全局将所有空值视为 Unset WriteConf 现在还包含一个 可以通过使用 SparkConf 键设置的参数 ignoreNulls spark.cassandra.output.ignoreNulls。默认值为 false ,这将 导致空值被视为在以前的版本中(被插入到 卡桑德拉原样)。当设置为 true 时,所有空值都将被视为未设置。 这可以与 DataFrames 一起使用以跳过空记录并避免 墓碑。

    https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md#globally-treating-all-nulls-as-unset

    编辑:我应该澄清一下,Cassandra 内部不存储实际的空值 - 它只是 未设置。但是我们可以在应用程序级别使用空值来推理 Cassandra。

    【讨论】:

    • 遇到这个错误:java.lang.NoSuchMethodError: org.apache.spark.SparkContext.getExecutorStorageStatus()[Lorg/apache/spark/storage/StorageStatus;在 com.datastax.spark.connector.cql.CassandraConnector$.apply(CassandraConnector.scala:204) 在 com.datastax.spark.connector.RDDFunctions.saveToCassandra$default$5(RDDFunctions.scala:32) ... 53 省略
    • Spark 版本:2.4.3
    • @SumitAgarwal 当您的 spark 连接器版本与您的 spark 版本不兼容时,您会看到这种异常。您可以查看兼容性矩阵:github.com/datastax/…
    • 谢谢你的帮助,如果可能的话,你能看看这个吗:stackoverflow.com/questions/57659876/…
    猜你喜欢
    • 2015-05-24
    • 2016-08-11
    • 2015-08-16
    • 2017-03-04
    • 2020-10-17
    • 2015-05-21
    • 2017-02-13
    • 2016-02-04
    • 2020-02-12
    相关资源
    最近更新 更多