【Question title】: Spark Streaming with Cassandra direct join doesn't work
【Posted】: 2022-08-12 16:30:19
【Question】:

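Hi guys! I am trying to develop a Spark Streaming application and have run into some problems. Some details: we have a Kafka topic, Spark 3.2.1 and Cassandra 4.0.4, and the DataStax spark-cassandra-connector, version com.datastax.spark:spark-cassandra-connector_2.12:3.1.0

I need the following data flow.

Read Kafka messages in Spark and convert them to a DataFrame -> left join with an existing Cassandra table on two columns, which form the composite primary key of the Cassandra table* -> if a row with that key already exists, do nothing; otherwise write the data.

* The documentation describes a new feature, available in the DataFrame API since SCC 2.5 and no longer only in DSE: DirectJoin, the equivalent of joinWithCassandraTable in the RDD API. If I use the Datasource V2 API, I get the usual SortMergeJoin on the Spark side. To be frank, this is not really a "streaming" application; I add data to Cassandra in micro-batches.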

== Physical Plan ==
AppendData (12)
+- * Project (11)
   +- * Filter (10)
      +- * SortMergeJoin LeftOuter (9)
         :- * Sort (4)
         :  +- Exchange (3)
         :     +- * SerializeFromObject (2)
         :        +- Scan (1)
         +- * Sort (8)
            +- Exchange (7)
               +- * Project (6)
                  +- BatchScan (5)


(1) Scan
Output [1]: [obj#342]
Arguments: obj#342: org.apache.spark.sql.Row, MapPartitionsRDD[82] at start at RunnableStream.scala:13

(2) SerializeFromObject [codegen id : 1]
Input [1]: [obj#342]
Arguments: [validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, user_id), LongType) AS user_id#343L, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, user_type), StringType), true, false, true) AS user_type#344, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, order_id), StringType), true, false, true) AS order_id#345, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, status_name), StringType), true, false, true) AS status_name#346, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 4, status_dttm), TimestampType), true, false, true) AS status_dttm#347]

(3) Exchange
Input [5]: [user_id#343L, user_type#344, order_id#345, status_name#346, status_dttm#347]
Arguments: hashpartitioning(user_id#343L, user_type#344, 16), ENSURE_REQUIREMENTS, [id=#793]

(4) Sort [codegen id : 2]
Input [5]: [user_id#343L, user_type#344, order_id#345, status_name#346, status_dttm#347]
Arguments: [user_id#343L ASC NULLS FIRST, user_type#344 ASC NULLS FIRST], false, 0

(5) BatchScan
Output [2]: [user_id#348L, user_type#349]
Cassandra Scan: keyspace_name.table_name
 - Cassandra Filters: []
 - Requested Columns: [user_id,user_type]

(6) Project [codegen id : 3]
Output [2]: [user_id#348L, user_type#349]
Input [2]: [user_id#348L, user_type#349]

(7) Exchange
Input [2]: [user_id#348L, user_type#349]
Arguments: hashpartitioning(user_id#348L, user_type#349, 16), ENSURE_REQUIREMENTS, [id=#801]

(8) Sort [codegen id : 4]
Input [2]: [user_id#348L, user_type#349]
Arguments: [user_id#348L ASC NULLS FIRST, user_type#349 ASC NULLS FIRST], false, 0

(9) SortMergeJoin [codegen id : 5]
Left keys [2]: [user_id#343L, user_type#344]
Right keys [2]: [user_id#348L, user_type#349]
Join condition: None

(10) Filter [codegen id : 5]
Input [7]: [user_id#343L, user_type#344, order_id#345, status_name#346, status_dttm#347, user_id#348L, user_type#349]
Condition : (isnull(user_id#348L) = true)

(11) Project [codegen id : 5]
Output [5]: [user_id#343L, user_type#344, order_id#345, status_name#346, status_dttm#347]
Input [7]: [user_id#343L, user_type#344, order_id#345, status_name#346, status_dttm#347, user_id#348L, user_type#349]

(12) AppendData
Input [5]: [user_id#343L, user_type#344, order_id#345, status_name#346, status_dttm#347]
Arguments: org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$3358/1878168161@32616db8, org.apache.spark.sql.connector.write.WriteBuilder$1@1d354f3b

On the other hand, if I use Datasource V1 and explicitly specify the direct join setting when loading the Cassandra table as a DataFrame, for example

spark.read.cassandraFormat("tableName", "keyspace").option("directJoinSetting", "on").load

then the join fails with this error:

Caused by: java.lang.NoSuchMethodError: org.apache.spark.sql.execution.UnaryExecNode.children$(Lorg/apache/spark/sql/execution/UnaryExecNode;)Lscala/collection/Seq;
    at org.apache.spark.sql.cassandra.execution.CassandraDirectJoinExec.children(CassandraDirectJoinExec.scala:18)
    at org.apache.spark.sql.cassandra.execution.CassandraDirectJoinStrategy$.hasCassandraChild(CassandraDirectJoinStrategy.scala:206)
    at org.apache.spark.sql.cassandra.execution.CassandraDirectJoinStrategy$$anonfun$1.applyOrElse(CassandraDirectJoinStrategy.scala:241)
    at org.apache.spark.sql.cassandra.execution.CassandraDirectJoinStrategy$$anonfun$1.applyOrElse(CassandraDirectJoinStrategy.scala:240)

The full spark-submit command:

/opt/spark-3.2.1-bin-hadoop3.2/bin/spark-submit --master yarn --deploy-mode cluster --name "name" \
--conf spark.driver.cores=1 \
--conf spark.driver.memory=1g \
--conf spark.driver.extraJavaOptions="-XX:+UseG1GC -Duser.timezone=GMT -Dfile.encoding=utf-8 -Dlog4j.configuration=name_Log4j.properties" \
--conf spark.executor.instances=1 \
--conf spark.executor.cores=4 \
--conf spark.executor.memory=8g \
--conf spark.executor.extraJavaOptions="-XX:+UseG1GC -Duser.timezone=GMT -Dfile.encoding=utf-8 -Dlog4j.configuration=name_Log4j.properties" \
--conf spark.yarn.queue=default \
--conf spark.yarn.submit.waitAppCompletion=true \
--conf spark.eventLog.enabled=true \
--conf spark.eventLog.dir=hdfs:///spark3-history/ \
--conf spark.eventLog.compress=true \
--conf spark.sql.shuffle.partitions=16 \
--conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions \
--conf spark.sql.catalog.cassandracatalog=com.datastax.spark.connector.datasource.CassandraCatalog \
--conf spark.sql.dse.search.enableOptimization=on \
--conf spark.cassandra.connection.host=cassandra_host \
--conf spark.cassandra.auth.username=user_name \
--conf spark.cassandra.auth.password=*** \
--conf spark.sql.directJoinSetting=on \
--class ...

The connector class for Cassandra:

import org.apache.spark.sql._

class CassandraConnector(
  val ss: SparkSession,
  catalog: String,
  keyspace: String,
  table: String
) extends Serializable {

  def read: DataFrame = ss.read.table(s"$catalog.$keyspace.$table")
  def writeDirect(dataFrame: DataFrame): Unit = dataFrame.writeTo(s"$catalog.$keyspace.$table").append()

}

Cassandra DDL for the table:

CREATE KEYSPACE IF NOT EXISTS keyspace_name
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};

CREATE TABLE IF NOT EXISTS keyspace_name.table_name
(
    user_id BIGINT,
    user_type VARCHAR,
    order_id VARCHAR,
    status_name VARCHAR,
    status_dttm timestamp,
    PRIMARY KEY (user_id, user_type)
);

The method that performs the join and writes to Cassandra:

 override def writeBatch(batch: Dataset[Row], batchId: Long): Unit = {
    val result =
      batch
        .as("df")
        .join(
          cassandraConnector.read
            .as("cass"),
          col("df.user_id") === col("cass.user_id")
            && col("df.user_type") === col("cass.user_type"),
          "left"
        )
        .withColumn("need_write", when(col("cass.user_id").isNull, true).otherwise(false))
        .filter(col("need_write") === true)
        .select("df.user_id", "df.user_type", "df.order_id", "df.status_name", "df.status_dttm")

    cassandraConnector.writeDirect(result)

  }

Can someone explain what I am doing wrong?

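  • Which version of the Spark Cassandra Connector are you using? I don't see the version.
  • @AlexOtt, I added the version to the question: com.datastax.spark:spark-cassandra-connector_2.12:3.1.0. Thanks.
  • The same issue is being discussed here: community.datastax.com/questions/12524/…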

Tags: apache-spark apache-spark-sql cassandra spark-cassandra-connector


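【Solution 1】:

Yes, the version of the Spark Cassandra Connector is the source of the problem: advanced functionality such as Direct Join depends heavily on Spark internal classes, which may change between versions. So if you use Spark 3.2, you need to use the corresponding version of SCC: com.datastax.spark:spark-cassandra-connector_2.12:3.2.0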
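
As an illustration only, here is how the matching connector version might be declared if the job is built with sbt. The question does not say which build tool is used, so the file name, the Scala patch version, and the `spark-sql` entry are assumptions; only the connector coordinates and the Spark version come from this thread.

```scala
// build.sbt (hypothetical; the original post does not show the build definition)
scalaVersion := "2.12.15" // assumption: any 2.12.x release matching the _2.12 artifacts

libraryDependencies ++= Seq(
  // Spark itself is supplied by the cluster at runtime, hence Provided
  "org.apache.spark" %% "spark-sql" % "3.2.1" % Provided,
  // Connector line matching Spark 3.2, as recommended in this answer
  "com.datastax.spark" %% "spark-cassandra-connector" % "3.2.0"
)
```

If the connector is supplied at submit time rather than bundled into the application jar, the same coordinates apply there instead.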

Note that there is currently no release for Spark 3.3 yet...

P.S. I have a blog post about using direct joins that may be of interest to you.
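
After changing the connector version, one way to check whether Spark still falls back to a sort-merge join is to inspect the physical plan of the joined DataFrame. The following is a minimal sketch, not part of the original answer: the object and method names are hypothetical, and it only looks for the `SortMergeJoin` operator name that appears in the plan quoted in the question.

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical helper for inspecting which join strategy Spark selected.
object JoinPlanCheck {

  // Pure helper: true if the plan text mentions a sort-merge join.
  def mentionsSortMergeJoin(planText: String): Boolean =
    planText.contains("SortMergeJoin")

  // Prints the formatted physical plan to stdout and returns true
  // if the executed plan of `df` contains a SortMergeJoin node.
  def usesSortMergeJoin(df: DataFrame): Boolean = {
    df.explain("formatted")
    mentionsSortMergeJoin(df.queryExecution.executedPlan.toString)
  }
}
```

Inside `writeBatch`, calling `JoinPlanCheck.usesSortMergeJoin(result)` before the write would print the plan on the driver and return `true` while the plan still contains a `SortMergeJoin` node.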
