【问题标题】:Spark Cassandra Connector: Implement SCD Type 1Spark Cassandra 连接器:实施 SCD 类型 1
【发布时间】:2020-05-08 14:05:51
【问题描述】:

我是 Cassandra 的新手,我想在 Cassandra DB 中实现 SCD Type-1
此 SCD Type1 作业将从 Spark 执行。
数据将存储为时间序列分区数据。即:年/月/日

示例:我有过去 300 天的记录,而我的新记录可能既有新记录也有更新记录。 我想比较最近 100 天的更新记录,如果记录是新的,那么它应该执行插入操作,否则更新。

我没有任何线索来执行此操作,因此不共享任何 CQL :(

示例表结构为:

CREATE TABLE crossfit_gyms_by_city_New (  
 country_code text,  
 state_province text,  
 city text,  
 gym_name text,  
 PRIMARY KEY ((country_code, state_province), gym_name)  
) WITH CLUSTERING ORDER BY (gym_name ASC );

我的示例 Spark 代码:


object SparkUpdateCassandra {
  System.setProperty("hadoop.home.dir", "C:\\hadoop\\")

  def main(args: Array[String]): Unit = {
    val spark = org.apache.spark.sql.SparkSession
      .builder()
      .master("local[*]")
      .config("spark.cassandra.connection.host", "localhost")
      .appName("Spark Cassandra Connector Example")
      .getOrCreate()

    import spark.implicits._

    //Read Cassandra data using DataFrame
    val FirstDF = Seq(("India", "WB", "Kolkata", "Cult Fit"),("India", "KA", "Bengaluru", "Cult Fit")).toDF("country_code", "state_province","city","gym_name")
    FirstDF.show(10)
    FirstDF.write
          .format("org.apache.spark.sql.cassandra")
          .mode("append")
          .option("confirm.truncate", "true")
          .option("spark.cassandra.connection.host", "localhost")
          .option("spark.cassandra.connection.port", "9042")
          .option("keyspace", "emc_test")
          .option("table", "crossfit_gyms_by_city_new")
          .save()
    val loaddf1 = spark.read
      .format("org.apache.spark.sql.cassandra")
      .option("spark.cassandra.connection.host", "localhost")
      .option("spark.cassandra.connection.port", "9042")
      .options(Map( "table" -> "crossfit_gyms_by_city_new", "keyspace" -> "emc_test"))
      .load()
    loaddf1.show(10)

//    spark.implicits.wait(5000)

    val SecondDF = Seq(("India", "WB", "Siliguri", "CultFit"),("India", "KA", "Bengaluru", "CultFit")).toDF("country_code", "state_province","city","gym_name")
    SecondDF.show(10)

    SecondDF.write
      .format("org.apache.spark.sql.cassandra")
      .mode("append")
      .option("confirm.truncate", "true")
      .option("spark.cassandra.connection.host", "localhost")
      .option("spark.cassandra.connection.port", "9042")
      .option("keyspace", "emc_test")
      .option("table", "crossfit_gyms_by_city_new")
      .save()

    val loaddf2 = spark.read
      .format("org.apache.spark.sql.cassandra")
      .option("spark.cassandra.connection.host", "localhost")
      .option("spark.cassandra.connection.port", "9042")
      .options(Map( "table" -> "crossfit_gyms_by_city_new", "keyspace" -> "emc_test"))
      .load()
    loaddf2.show(10)


  }
}

注意:我将 Scala 用于 Spark 框架。

【问题讨论】:

  • 如果在创建 SparkSession 时设置了.option("spark.cassandra.connection.host", "localhost"),则每次读取都不需要

标签: scala apache-spark cassandra spark-cassandra-connector


【解决方案1】:

在 Cassandra 中,一切都是 upsert - 如果行不存在,则插入,如果存在,则更新,因此您只需将数据放入 RDD 或 DataFrame 并使用 Spark Cassandra 连接器的相应功能:

saveToCassandra for RDD API:

rdd.saveToCassandra("keyspace", "table")

或者只是write inDataFrame API:

df.write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "table_name", "keyspace" -> "keyspace_name"))
  .mode(SaveMode.Append)
  .save()

【讨论】:

  • 我在不添加任何“模式”的情况下执行代码,然后它会引发异常:' SaveMode 设置为 ErrorIfExists 并且表 emc_test.crossfit_gyms_by_city 已经存在并包含数据。也许您打算将 DataFrame 写入模式设置为 Append?示例:df.write.format.options.mode(SaveMode.Append).save()'
  • 为更清楚起见,我添加了 Spark 代码并更新了查询以使问题更易于理解。
  • 只需使用SaveMode.Append - 它会处理现有数据和新数据
  • 更有效的方法是删除 spark 中的重复记录,例如通过 groupByKey 等。在这种情况下,您将只写入唯一值。从 Cassandra 的角度来看,插入和更新之间没有区别。
  • 简短回答 - 不。至少定义服务器上数据位置的分区键需要列名,并且完整主键定义数据的唯一性。我建议在academy.datastax.com上参加 DS201 和 DS220 课程
【解决方案2】:

为了实现这一点,有一些事实可以帮助您浏览您将遇到的代码示例

在之前的 Spark 1 代码中,我们将使用
1 个 SparkContext see docs
2 要连接到 Cassandra,请使用由 SparkContext 构造的 CassandraSQLContext

对于 Spark 2,这已经发生了很大变化
创建一个 Spark 会话和一个 CassandraConnector[1]

然后您将使用 [1] 中所示的会话运行本机 SQL

一旦您完成了此设置并开始工作,您只需为 SCD 类型 1 操作执行适当的 sql,即可找到所涉及的 sql 的good examples

【讨论】:

  • 建议的解决方案相当模糊。
  • 我之前没有看到你的代码更新,我专注于“不共享任何 CQL”
猜你喜欢
  • 1970-01-01
  • 2020-04-22
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-10-08
  • 2015-03-12
  • 2018-11-28
相关资源
最近更新 更多