【问题标题】:Query Cassandra table for every Kafka Message为每个 Kafka 消息查询 Cassandra 表
【发布时间】:2018-08-22 17:25:12
【问题描述】:

我正在尝试为每条 kafka 消息查询 cassandra 表。

以下是我一直在处理的代码:

 def main(args: Array[String]) {
 val spark = SparkSession
  .builder()
  .master("local[*]")
  .appName("Spark SQL basic example")
  .config("spark.cassandra.connection.host", "localhost")
  .config("spark.cassandra.connection.port", "9042")
  .getOrCreate()

val topicsSet = List("Test").toSet
val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> "localhost:9092",
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "12345",
          "auto.offset.reset" -> "latest",
          "enable.auto.commit" -> (false: java.lang.Boolean)
          )
val messages = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

val lines = messages.map(_.value)

val lines_myobjects = lines.map(line =>
  new Gson().fromJson(line, classOf[myClass]) // The myClass is a simple case class which extends serializable
//This changes every single message into an object
)

现在事情变得复杂了,我无法绕过与来自 kafka 消息的消息相关的查询 cassandra 表的地步。每个单独的 kafka 消息对象都有一个返回方法。

我尝试了多种方法来解决这个问题。例如:

val transformed_data = lines_myobjects.map(myobject => {
   val forest = spark.read
    .format("org.apache.spark.sql.cassandra")
    .options(Map( "table" -> "mytable", "keyspace" -> "mydb"))
    .load()
    .filter("userid='" + myobject.getuserId + "'")
)}

我也尝试过ssc.cassandraTable,但没有成功。

主要目标是从数据库中获取用户ID与来自kafka消息的用户ID匹配的所有行。

我想提一提的是,即使每次加载或查询 cassandra 数据库效率不高,但 cassandra 数据库每次都会更改。

【问题讨论】:

    标签: scala apache-spark cassandra apache-kafka spark-cassandra-connector


    【解决方案1】:

    您不能在.map( 中执行spark.readssc.cassandraTable。因为这意味着您将尝试为每条消息创建新的 RDD。它不应该那样工作。

    请考虑以下选项:

    1 - 如果您可以通过一/两个 CQL 查询来询问所需数据,请尝试在 .mapPartitions( 中使用 CassandraConnector。像这样的:

    import com.datastax.spark.connector._
    import com.datastax.spark.connector.cql._
    
    val connector = ...instantiate CassandraConnector onece here
    val transformed_data = lines_myobjects.mapPartitions(it => {
       connector.withSessionDo { session =>
           it.map(myobject => session.execute("CQL QUERY TO GET YOUR DATA HERE", myobject.getuserId)
    })
    

    2 - 否则(如果您按主/分区键选择)考虑.joinWithCassandraTable。像这样的:

    import com.datastax.spark.connector._
    
    val mytableRDD = sc.cassandraTable("mydb", "mytable")
    val transformed_data = lines_myobjects
        .map(myobject => {
           Tuple1(myobject.getuserId) // you need to wrap ids to a tuple to do join with Cassandra
        })
        .joinWithCassandraTable("mydb", "mytable")
        // process results here
    

    【讨论】:

    • 谢谢。第二种情况绝对不是我的选择。我想知道会话是否是性能杀手。查询可以有 MB 的数据。工作是否分布在工作节点上?
    • 您可以为每个分区创建会话。只需使用 .mapPartitions 代替
    • 我无法绕过 mapPartitions 方法。我要么得到 myobject.getUserId 不是迭代器的成员,而是期待迭代器。
    • 如果是mapPartitions,您将有一个myobjects 的迭代器。
    【解决方案2】:

    我会以不同的方式处理这个问题。 流入 Cassandra 的数据,通过 Kafka 路由(并使用 Kafka Connect sink 从 Kafka 发送到 Cassandra)。 使用 Kafka 中的数据,您可以在数据流之间连接,无论是在 Spark 中,还是使用 Kafka 的 Streams API 或 KSQL。 Kafka Streams 和 KSQL 都支持您在此处执行的流表连接。您可以使用 KSQL herehere 看到它的实际效果。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-03-07
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2013-06-12
      • 1970-01-01
      相关资源
      最近更新 更多