【问题标题】:Spark (SQL / Structured Streaming) Cassandra - PreparedStatementSpark (SQL / 结构化流) Cassandra - PreparedStatement
【发布时间】:2017-08-25 17:18:59
【问题描述】:

我正在使用 Spark 结构化流实时进行机器学习,我想将预测存储在我的 Cassandra 集群中。

由于我在流式上下文中,每秒执行多次相同的请求,因此一项强制性优化是使用 PreparedStatement。

在 cassandra spark 驱动程序 (https://github.com/datastax/spark-cassandra-connector) 中无法使用 PreparedStatement(在 scala 或 python 中,我不考虑使用 java)

我应该使用 scala (https://github.com/outworkers/phantom) / python (https://github.com/datastax/python-driver) cassandra 驱动程序吗? 那么它是如何工作的,我的连接对象需要可序列化才能传递给工作人员?

如果有人可以帮助我!

谢谢:)

【问题讨论】:

    标签: apache-spark cassandra spark-streaming spark-dataframe spark-cassandra-connector


    【解决方案1】:

    为了在使用结构化 Spark 流处理流的同时在 Cassandra 中注册数据,您需要:

    • 导入 com.datastax.driver.core.Session
    • 导入 com.datastax.spark.connector.cql.CassandraConnector

    然后,构建您的连接器:

     val connector = CassandraConnector.apply(sparkSession.sparkContext.getConf) 
    

    同时拥有 sessionconnector,您现在可以调用您在 Statement 中编写的 prepared Statement 函数斯卡拉类

     connector.withSessionDo { session =>
     Statements.PreparedStatement()
    

    }

    你终于可以用下面的函数在Cassandra中写数据了,cql是将变量绑定到准备好的Statement并执行它的函数:

      private def processRow(value: Commons.UserEvent) = {
      connector.withSessionDo { session =>
      session.execute(Statements.cql(value.device_id, value.category, value.window_time, value.m1_sum_downstream, value.m2_sum_downstream))
    }
    

    }

    当然,您必须在 foreach 编写器中调用此函数 (processRow)

         // This Foreach sink writer writes the output to cassandra.
    import org.apache.spark.sql.ForeachWriter
    val writer = new ForeachWriter[Commons.UserEvent] {
      override def open(partitionId: Long, version: Long) = true
      override def process(value: Commons.UserEvent) = {
        processRow(value)
      }
      override def close(errorOrNull: Throwable) = {}
    }
    
    val query =
      ds.writeStream.queryName("aggregateStructuredStream").outputMode("complete").foreach(writer).start
    

    【讨论】:

      猜你喜欢
      • 2018-10-06
      • 2017-05-04
      • 2019-12-07
      • 2020-08-11
      • 1970-01-01
      • 2017-03-06
      • 1970-01-01
      • 1970-01-01
      • 2019-01-10
      相关资源
      最近更新 更多