【问题标题】:How to customize column mappings with Spark Cassandra Connector in Java?如何使用 Java 中的 Spark Cassandra 连接器自定义列映射?
【发布时间】:2017-04-03 01:21:39
【问题描述】:

我想将列映射更改为追加。在 Java 中使用 Spark Cassandra 连接器自定义列映射是否有比以下更好的方法?

 ColumnName song_id = new ColumnName("song_id", Option.empty());
 CollectionColumnName key_codes = new ColumnName("key_codes", Option.empty()).append();
 List<ColumnRef> collectionColumnNames = Arrays.asList(song_id, key_codes);
 scala.collection.Seq<ColumnRef> columnRefSeq = JavaApiHelper.toScalaSeq(collectionColumnNames);

 javaFunctions(songStream)
                .writerBuilder("demo", "song", mapToRow(PianoSong.class))
                .withColumnSelector(new SomeColumns(columnRefSeq))
                .saveToCassandra();

这取自this Spark Streaming code 样本。

【问题讨论】:

    标签: apache-spark spark-streaming spark-cassandra-connector


    【解决方案1】:

    只需使用 CollectionColumnName

    哪个有构造函数

    case class CollectionColumnName(
        columnName: String,
        alias: Option[String] = None,
        collectionBehavior: CollectionBehavior = CollectionOverwrite) extends ColumnRef 
    

    您可以通过设置alias 来重命名,并且可以使用collectionBehavior 更改插入行为,它采用以下类。

    Api Link

    /** Insert behaviors for Collections. */
    sealed trait CollectionBehavior
    case object CollectionOverwrite extends CollectionBehavior
    case object CollectionAppend extends CollectionBehavior
    case object CollectionPrepend extends CollectionBehavior
    case object CollectionRemove extends CollectionBehavior
    

    这意味着你可以做到

    CollectionColumnName appendColumn = 
      new CollectionColumnName("ColumnName", Option.empty(), CollectionPrepend$.MODULE$);
    

    它看起来更像 Java-y 并且更加明确。您对这段代码还有其他目标吗?

    【讨论】:

      猜你喜欢
      • 2019-05-13
      • 2017-01-09
      • 2021-05-23
      • 1970-01-01
      • 2017-03-23
      • 1970-01-01
      • 2015-09-13
      • 2018-04-03
      • 2015-08-16
      相关资源
      最近更新 更多