【问题标题】:Spark scala cassandra save/updateSpark scala cassandra 保存/更新
【发布时间】:2020-07-27 05:08:13
【问题描述】:

我有一个实体的 spark 数据集,它应该保存/更新到名为“offer”的 cassandra 表中。

case class Offer(offer_id: String, metadata_last_modified_source_time: java.sql.Timestamp, product_type: String)
val offerDataset: Dataset[Offer] = ....

我想将上述'offerDataset'保存或更新到cassandra,写入时间戳由'offer'的"metadata_last_modified_source_time"字段决定 实体。

offerDataset.rdd.saveToCassandra("cassandra_keyspace", "cassandra_table", writeConf = WriteConf(timestamp = TimestampOption.perRow("metadata_last_modified_source_time")))

在写信给 Cassandra 时,我面临以下异常。有人可以帮我理解这个问题。 'metadata_last_modified_source_time' 的 util.Date 和 Long 类型出现相同的错误。

com.datastax.driver.core.exceptions.InvalidTypeException: Value metadata_last_modified_source_time is of type bigint, not timestamp
at com.datastax.driver.core.AbstractGettableByIndexData.checkType(AbstractGettableByIndexData.java:83)
at com.datastax.driver.core.AbstractData.set(AbstractData.java:529)
at com.datastax.driver.core.AbstractData.set(AbstractData.java:536)
at com.datastax.driver.core.BoundStatement.set(BoundStatement.java:870)
at com.datastax.spark.connector.writer.BoundStatementBuilder.com$datastax$spark$connector$writer$BoundStatementBuilder$$bindColumnNull(BoundStatementBuilder.scala:59)
at com.datastax.spark.connector.writer.BoundStatementBuilder$$anonfun$5.apply(BoundStatementBuilder.scala:83)

【问题讨论】:

  • 您使用的是什么版本的 Spark Cassandra 连接器?在原始案例类声明的情况下,java.sql.Timestamp 错误是正确的...
  • metadata_last_modified_source_time 是 bigint 类型,尝试将 epoch timestamp 保存为时间戳的 bigint 值,而不是 java.sql.Timestamp
  • @AlexOtt,我正在使用 spark-cassandra-connector-2.0.3。 metadata_last_modified_source_time 的类型是 sql.Timestamp。错误是说类型是 bigInt。错误怎么可能是正确的?
  • 尝试更新版本 - 2.4.3 等

标签: scala cassandra timestamp save apache-spark-dataset


【解决方案1】:

我在浏览此文档后找到了解决方案 - https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md

Offer 案例类中引入了一个新字段 writeTime,该字段应映射到 cassandra 表的写入时间戳

case class Offer(offer_id: String, metadata_last_modified_source_time: java.sql.Timestamp, product_type: String, writeTime: sql.Date)

在构建 offerDataSet 时,我将 writeTime 字段的值设置为

val offerDataset: Dataset[Offer] = {....
   ....
    val writeTime = new Date(metadata_last_modified_source_time.getTime())
   ....
   ....
}

offerDataset.rdd.saveToCassandra("cassandra_keyspace", "cassandra_table", writeConf = WriteConf(timestamp = TimestampOption.perRow("writeTime")))

【讨论】:

    猜你喜欢
    • 2016-01-30
    • 1970-01-01
    • 1970-01-01
    • 2019-01-20
    • 2018-04-19
    • 2018-03-16
    • 1970-01-01
    • 2018-03-11
    • 2019-02-22
    相关资源
    最近更新 更多