To save data to Cassandra while processing a stream with Spark Structured Streaming, you need to:
- import com.datastax.driver.core.Session
- import com.datastax.spark.connector.cql.CassandraConnector
Then build your connector:
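These classes come from the DataStax Spark Cassandra Connector. As a sketch, the sbt dependencies could look like the following (the version numbers here are assumptions; pick the ones matching your Spark and Cassandra versions):

```scala
// build.sbt (sketch) -- versions are assumptions, align them with your cluster
libraryDependencies ++= Seq(
  "org.apache.spark"   %% "spark-sql"                 % "2.2.0" % "provided",
  "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.5"
)
```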
val connector = CassandraConnector.apply(sparkSession.sparkContext.getConf)
With both the session and the connector in hand, you can now call the prepared-statement function you wrote in your Statements Scala object:
connector.withSessionDo { session =>
  Statements.PreparedStatement()
}
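The Statements object itself is not shown in this article. A minimal sketch of one possible shape is below; the keyspace, table, column names, and field types are all assumptions derived from the fields used later, and the original wiring of the session may differ (here it is passed implicitly, so the call site would use `withSessionDo { implicit session => ... }`):

```scala
import com.datastax.driver.core.{BoundStatement, PreparedStatement, Session}

// Hypothetical Statements object -- schema and names are assumptions, not the author's actual code.
object Statements extends Serializable {
  @volatile private var stmt: PreparedStatement = _

  // Prepares the INSERT once; subsequent calls reuse the cached statement.
  def PreparedStatement()(implicit session: Session): PreparedStatement = {
    if (stmt == null) synchronized {
      if (stmt == null)
        stmt = session.prepare(
          "INSERT INTO my_keyspace.user_events " +
            "(device_id, category, window_time, m1_sum_downstream, m2_sum_downstream) " +
            "VALUES (?, ?, ?, ?, ?)")
    }
    stmt
  }

  // Binds the row's values to the prepared statement; the caller executes the result.
  def cql(deviceId: String, category: String, windowTime: String,
          m1: Long, m2: Long)(implicit session: Session): BoundStatement =
    PreparedStatement().bind(deviceId, category, windowTime,
      Long.box(m1), Long.box(m2))
}
```

Caching the PreparedStatement matters: preparing the same CQL on every row would round-trip to the cluster each time, while binding a cached statement is cheap.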
You can finally write data to Cassandra with the function below, where cql is the function that binds the variables to the prepared statement and executes it:
private def processRow(value: Commons.UserEvent) = {
  connector.withSessionDo { session =>
    session.execute(Statements.cql(value.device_id, value.category, value.window_time,
      value.m1_sum_downstream, value.m2_sum_downstream))
  }
}
Of course, you must call this function (processRow) from a foreach writer:
// This Foreach sink writer writes each output row to Cassandra.
import org.apache.spark.sql.ForeachWriter
val writer = new ForeachWriter[Commons.UserEvent] {
  // Called once per partition per epoch; returning true means the rows should be processed.
  override def open(partitionId: Long, version: Long) = true
  // Called for every output row in the partition.
  override def process(value: Commons.UserEvent) = {
    processRow(value)
  }
  // Nothing to release here; the connector manages its own session pool.
  override def close(errorOrNull: Throwable) = {}
}
val query =
  ds.writeStream.queryName("aggregateStructuredStream").outputMode("complete").foreach(writer).start
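Note that start only launches the streaming query in the background; if the driver's main method then returns, the application exits and the stream stops with it. The usual ending (standard Structured Streaming API) is to block on the query:

```scala
// Block the driver thread until the streaming query terminates or fails.
query.awaitTermination()
```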