【发布时间】:2018-08-22 17:25:12
【问题描述】:
我正在尝试为每条 kafka 消息查询 cassandra 表。
以下是我一直在处理的代码:
def main(args: Array[String]) {
val spark = SparkSession
.builder()
.master("local[*]")
.appName("Spark SQL basic example")
.config("spark.cassandra.connection.host", "localhost")
.config("spark.cassandra.connection.port", "9042")
.getOrCreate()
val topicsSet = List("Test").toSet
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "12345",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val messages = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))
val lines = messages.map(_.value)
val lines_myobjects = lines.map(line =>
new Gson().fromJson(line, classOf[myClass]) // The myClass is a simple case class which extends serializable
//This changes every single message into an object
)
现在事情变得复杂了,我无法绕过与来自 kafka 消息的消息相关的查询 cassandra 表的地步。每个单独的 kafka 消息对象都有一个返回方法。
我尝试了多种方法来解决这个问题。例如:
val transformed_data = lines_myobjects.map(myobject => {
val forest = spark.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "mytable", "keyspace" -> "mydb"))
.load()
.filter("userid='" + myobject.getuserId + "'")
)}
我也尝试过ssc.cassandraTable,但没有成功。
主要目标是从数据库中获取用户ID与来自kafka消息的用户ID匹配的所有行。
我想提一提的是,即使每次加载或查询 cassandra 数据库效率不高,但 cassandra 数据库每次都会更改。
【问题讨论】:
标签: scala apache-spark cassandra apache-kafka spark-cassandra-connector