【Posted】: 2022-08-12 16:30:19
【Question】:
Hi folks! I am trying to develop a Spark streaming application and have run into a problem. Some details: we have a Kafka topic, Spark 3.2.1, Cassandra 4.0.4, and the DataStax Spark Cassandra Connector com.datastax.spark:spark-cassandra-connector_2.12:3.1.0.
The data needs to take the following route: receive Kafka messages in Spark and convert them to a DataFrame -> left join with the existing Cassandra table on the two columns that form its composite primary key -> if a row with that key already exists in Cassandra, skip it; otherwise, write the data.
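A minimal sketch of that pipeline shape, assuming placeholder bootstrap servers and a placeholder topic name, and omitting the payload parsing (the real join-and-write logic is shown further below):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().appName("name").getOrCreate()

// placeholder host and topic; the Kafka value column still has to be parsed
// into user_id / user_type / order_id / status_name / status_dttm
val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka_host:9092")
  .option("subscribe", "topic_name")
  .load()

// micro-batch sink: the join-and-write logic runs once per batch
def processBatch(batch: DataFrame, batchId: Long): Unit = {
  // left join with the Cassandra table, keep rows with no match, append them
}

kafkaStream.writeStream
  .foreachBatch(processBatch _)
  .start()
  .awaitTermination()
```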
The documentation describes a feature available since SCC 2.5: the Direct Join in the DataFrame API is no longer DSE-only, alongside joinWithCassandraTable in the RDD API. But if I use the Datasource V2 API, I get an ordinary SortMergeJoin on the Spark side. Frankly, it is not a truly "streaming" application; I add data to Cassandra in micro-batches.
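For context, the RDD-side joinWithCassandraTable API mentioned there would look roughly like this; a sketch against the keyspace and table defined below, with made-up sample key values:

```scala
import com.datastax.spark.connector._

case class Key(user_id: Long, user_type: String)

// made-up sample keys standing in for the micro-batch contents;
// `spark` is the SparkSession from the sketch above
val keys = spark.sparkContext.parallelize(Seq(Key(1L, "retail"), Key(2L, "corporate")))

// queries Cassandra directly per key instead of scanning the whole table;
// the left variant yields Option[CassandraRow], so keys that are missing
// from the table come back as None
val missingKeys = keys
  .leftJoinWithCassandraTable("keyspace_name", "table_name")
  .on(SomeColumns("user_id", "user_type"))
  .filter { case (_, row) => row.isEmpty }
  .keys
```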
The physical plan:

```
== Physical Plan ==
AppendData (12)
+- * Project (11)
   +- * Filter (10)
      +- * SortMergeJoin LeftOuter (9)
         :- * Sort (4)
         :  +- Exchange (3)
         :     +- * SerializeFromObject (2)
         :        +- Scan (1)
         +- * Sort (8)
            +- Exchange (7)
               +- * Project (6)
                  +- BatchScan (5)

(1) Scan
Output [1]: [obj#342]
Arguments: obj#342: org.apache.spark.sql.Row, MapPartitionsRDD[82] at start at RunnableStream.scala:13

(2) SerializeFromObject [codegen id : 1]
Input [1]: [obj#342]
Arguments: [validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, user_id), LongType) AS user_id#343L, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt(1)) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, user_type), StringType), true, false, true) AS user_type#344, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt(2)) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, order_id), StringType), true, false, true) AS order_id#345, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt(3)) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, status_name), StringType), true, false, true) AS status_name#346, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt(4)) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 4, status_dttm), TimestampType), true, false, true) AS status_dttm#347]

(3) Exchange
Input [5]: [user_id#343L, user_type#344, order_id#345, status_name#346, status_dttm#347]
Arguments: hashpartitioning(user_id#343L, user_type#344, 16), ENSURE_REQUIREMENTS, [id=#793]

(4) Sort [codegen id : 2]
Input [5]: [user_id#343L, user_type#344, order_id#345, status_name#346, status_dttm#347]
Arguments: [user_id#343L ASC NULLS FIRST, user_type#344 ASC NULLS FIRST], false, 0

(5) BatchScan
Output [2]: [user_id#348L, user_type#349]
Cassandra Scan: keyspace_name.table_name
 - Cassandra Filters: []
 - Requested Columns: [user_id,user_type]

(6) Project [codegen id : 3]
Output [2]: [user_id#348L, user_type#349]
Input [2]: [user_id#348L, user_type#349]

(7) Exchange
Input [2]: [user_id#348L, user_type#349]
Arguments: hashpartitioning(user_id#348L, user_type#349, 16), ENSURE_REQUIREMENTS, [id=#801]

(8) Sort [codegen id : 4]
Input [2]: [user_id#348L, user_type#349]
Arguments: [user_id#348L ASC NULLS FIRST, user_type#349 ASC NULLS FIRST], false, 0

(9) SortMergeJoin [codegen id : 5]
Left keys [2]: [user_id#343L, user_type#344]
Right keys [2]: [user_id#348L, user_type#349]
Join condition: None

(10) Filter [codegen id : 5]
Input [7]: [user_id#343L, user_type#344, order_id#345, status_name#346, status_dttm#347, user_id#348L, user_type#349]
Condition : (isnull(user_id#348L) = true)

(11) Project [codegen id : 5]
Output [5]: [user_id#343L, user_type#344, order_id#345, status_name#346, status_dttm#347]
Input [7]: [user_id#343L, user_type#344, order_id#345, status_name#346, status_dttm#347, user_id#348L, user_type#349]

(12) AppendData
Input [5]: [user_id#343L, user_type#344, order_id#345, status_name#346, status_dttm#347]
Arguments: org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$3358/1878168161@32616db8, org.apache.spark.sql.connector.write.WriteBuilder$1@1d354f3b
```

Alternatively, if I use Datasource V1 and explicitly turn the direct-join setting on when loading the Cassandra table as a DataFrame, for example:
```scala
spark.read.cassandraFormat("tableName", "keyspace").option("directJoinSetting", "on").load
```

then the join throws this error:
```
Caused by: java.lang.NoSuchMethodError: org.apache.spark.sql.execution.UnaryExecNode.children$(Lorg/apache/spark/sql/execution/UnaryExecNode;)Lscala/collection/Seq;
    at org.apache.spark.sql.cassandra.execution.CassandraDirectJoinExec.children(CassandraDirectJoinExec.scala:18)
    at org.apache.spark.sql.cassandra.execution.CassandraDirectJoinStrategy$.hasCassandraChild(CassandraDirectJoinStrategy.scala:206)
    at org.apache.spark.sql.cassandra.execution.CassandraDirectJoinStrategy$$anonfun$1.applyOrElse(CassandraDirectJoinStrategy.scala:241)
    at org.apache.spark.sql.cassandra.execution.CassandraDirectJoinStrategy$$anonfun$1.applyOrElse(CassandraDirectJoinStrategy.scala:240)
```
The full spark-submit command:

```
/opt/spark-3.2.1-bin-hadoop3.2/bin/spark-submit --master yarn --deploy-mode cluster --name "name" \
  --conf spark.driver.cores=1 \
  --conf spark.driver.memory=1g \
  --conf spark.driver.extraJavaOptions="-XX:+UseG1GC -Duser.timezone=GMT -Dfile.encoding=utf-8 -Dlog4j.configuration=name_Log4j.properties" \
  --conf spark.executor.instances=1 \
  --conf spark.executor.cores=4 \
  --conf spark.executor.memory=8g \
  --conf spark.executor.extraJavaOptions="-XX:+UseG1GC -Duser.timezone=GMT -Dfile.encoding=utf-8 -Dlog4j.configuration=name_Log4j.properties" \
  --conf spark.yarn.queue=default \
  --conf spark.yarn.submit.waitAppCompletion=true \
  --conf spark.eventLog.enabled=true \
  --conf spark.eventLog.dir=hdfs:///spark3-history/ \
  --conf spark.eventLog.compress=true \
  --conf spark.sql.shuffle.partitions=16 \
  --conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions \
  --conf spark.sql.catalog.cassandracatalog=com.datastax.spark.connector.datasource.CassandraCatalog \
  --conf spark.sql.dse.search.enableOptimization=on \
  --conf spark.cassandra.connection.host=cassandra_host \
  --conf spark.cassandra.auth.username=user_name \
  --conf spark.cassandra.auth.password=*** \
  --conf spark.sql.directJoinSetting=on \
  --class ...
```
The connector class for Cassandra:

```scala
import org.apache.spark.sql._

class CassandraConnector(
    val ss: SparkSession,
    catalog: String,
    keyspace: String,
    table: String
) extends Serializable {

  def read: DataFrame = ss.read.table(s"$catalog.$keyspace.$table")

  def writeDirect(dataFrame: DataFrame): Unit =
    dataFrame.writeTo(s"$catalog.$keyspace.$table").append()
}
```
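The class is instantiated against the catalog registered in the spark-submit command; roughly like this, with the keyspace and table names from the DDL below:

```scala
// the catalog name comes from --conf spark.sql.catalog.cassandracatalog=...
val cassandraConnector =
  new CassandraConnector(spark, "cassandracatalog", "keyspace_name", "table_name")

val cassDf = cassandraConnector.read // DataFrame over the Cassandra table
```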
The Cassandra table DDL:

```sql
CREATE KEYSPACE IF NOT EXISTS keyspace_name
  WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};

-- partition key: user_id, clustering column: user_type
CREATE TABLE IF NOT EXISTS keyspace_name.table_name (
  user_id     BIGINT,
  user_type   VARCHAR,
  order_id    VARCHAR,
  status_name VARCHAR,
  status_dttm TIMESTAMP,
  PRIMARY KEY (user_id, user_type)
);
```
The method that performs the join and writes to Cassandra:

```scala
import org.apache.spark.sql.functions.{col, when}

override def writeBatch(batch: Dataset[Row], batchId: Long): Unit = {
  val result = batch
    .as("df")
    .join(
      cassandraConnector.read.as("cass"),
      col("df.user_id") === col("cass.user_id") &&
        col("df.user_type") === col("cass.user_type"),
      "left"
    )
    // keep only the rows whose key is not yet present in Cassandra
    .withColumn("need_write", when(col("cass.user_id").isNull, true).otherwise(false))
    .filter(col("need_write") === true)
    .select("df.user_id", "df.user_type", "df.order_id", "df.status_name", "df.status_dttm")

  cassandraConnector.writeDirect(result)
}
```

Can someone explain what I am doing wrong?
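For reference, the same keep-only-missing-rows step can be written as a left anti join, which returns only the left side's columns and makes the null filter and final select unnecessary; a sketch using the same names as above:

```scala
import org.apache.spark.sql.functions.col

// same semantics as the left join + isNull filter + select in writeBatch
val result = batch
  .as("df")
  .join(
    cassandraConnector.read.as("cass"),
    col("df.user_id") === col("cass.user_id") &&
      col("df.user_type") === col("cass.user_type"),
    "left_anti"
  )
```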
-
What version of the Spark Cassandra Connector are you using? I don't see it in the question.
-
@AlexOtt, I added the version to the question: com.datastax.spark:spark-cassandra-connector_2.12:3.1.0. Thanks!
-
The same issue is discussed here: community.datastax.com/questions/12524/…
Tags: apache-spark apache-spark-sql cassandra spark-cassandra-connector