【问题标题】:Writing Spark Structure Streaming data into Cassandra将 Spark 结构化流数据写入 Cassandra
【发布时间】:2018-10-06 19:36:01
【问题描述】:

我想使用 Pyspark API 将结构流数据写入 Cassandra。

我的数据流如下:

Nifi -> Kafka -> Spark Structure Streaming -> Cassandra

我尝试过以下方式:

query = df.writeStream\
  .format("org.apache.spark.sql.cassandra")\
  .option("keyspace", "demo")\
  .option("table", "test")\
  .start()

但收到以下错误消息: “org.apache.spark.sql.cassandra”不支持流式写入。

我也尝试过另一种方法:[来源 - DSE 6.0 Administrator Guide]

query = df.writeStream\
   .cassandraFormat("test", "demo")\
   .start()

但出现异常:AttributeError: 'DataStreamWriter' 对象没有属性 'cassandraFormat'

谁能给我一些想法,我该如何进一步进行?

提前致谢。

【问题讨论】:

    标签: apache-spark cassandra pyspark datastax spark-structured-streaming


    【解决方案1】:

    升级 DSE 6.0(最新版本)后,我可以将结构化流数据写入 Cassandra。 [Spark 2.2 和 Cassandra 3.11]

    参考代码:

    query = fileStreamDf.writeStream\
     .option("checkpointLocation", '/tmp/check_point/')\
     .format("org.apache.spark.sql.cassandra")\
     .option("keyspace", "analytics")\
     .option("table", "test")\
     .start()
    

    DSE 文档网址:https://docs.datastax.com/en/dse/6.0/dse-dev/datastax_enterprise/spark/structuredStreaming.html

    【讨论】:

    • 除了使用 DSE 中的 Cassandra 之外,您是否还必须使用与 DSE 捆绑的 Spark?我尝试使用 IDEA 中的 Spark 库,但无法摆脱“org.apache.spark.sql.cassandra 不支持流式写入”错误。顺便说一句,我发现了这个讨论:groups.google.com/a/lists.datastax.com/forum/#!msg/…
    • 我有类似的问题。我无法使用 DataProc Spark 使其工作。
    【解决方案2】:

    此答案用于将数据写入 Cassandra,而不是 DSE (which supports Structured Streaming for storing data)

    对于 Spark 2.4.0 及更高版本,您可以使用 foreachBatch 方法,该方法允许您使用 Spark Cassandra 连接器提供的 Cassandra 批处理数据写入器将流式查询的每个微批处理的输出写入 Cassandra:

    import org.apache.spark.sql.cassandra._
    
    df.writeStream
      .foreachBatch { (batchDF, _) => 
        batchDF
         .write
         .cassandraFormat("tableName", "keyspace")
         .mode("append")
         .save
      }.start
    

    对于低于 2.4.0 的 Spark 版本,您需要实现一个 foreach 接收器。

    import com.datastax.spark.connector.cql.CassandraConnector
    import com.datastax.driver.core.querybuilder.QueryBuilder
    import com.datastax.driver.core.Statement
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.Row
    
    class CassandraSink(sparkConf: SparkConf) extends ForeachWriter[Row] {
        def open(partitionId: Long, version: Long): Boolean = true
    
        def process(row: Row) = {
          def buildStatement: Statement =
            QueryBuilder.insertInto("keyspace", "tableName")
              .value("key", row.getAs[String]("value"))
          CassandraConnector(sparkConf).withSessionDo { session =>
            session.execute(buildStatement)
          }
        }
    
        def close(errorOrNull: Throwable) = Unit
    }
    

    然后你可以按如下方式使用foreach sink:

    df.writeStream
     .foreach(new CassandraSink(spark.sparkContext.getConf))
     .start
    

    【讨论】:

    • 谢谢,但我没有使用 DSE cassandra ...我正在使用 spark-sql-2.4.x 和 spark-cassandra-connector-2_11 和 cassandra 3.0,这可以吗?跨度>
    • 是的,我的答案是写入 Cassandra,而不是 DSE。您可以将 foreachBatch 用于 Spark 2.4.x
    • 甚至认为它工作正常......它需要很长时间才能产生结果......我尝试了 15 秒的“触发”选项......即...... companyDfAfterJoin .writeStream () .trigger(Trigger.ProcessingTime("15 seconds")) .foreachBatch((batchDf, batchId) -> {..... 我应该怎么做才能让它每 15 秒处理一次。
    • 出现错误:org.apache.spark.sql.AnalysisException:当流数据帧/数据集没有水印的流数据帧/数据集存在流聚合时,不支持附加输出模式;;项目 [company_id#80, company_name#66, year#81, Quarter#82, mean_revenu
    • 我点击了这个链接 ...spark.apache.org/docs/latest/… .... 上面的错误搞砸了。
    【解决方案3】:

    除了:

    • 关注(并投票支持)corresponding JIRA
    • 实现所需功能并自行打开 PR。

    除此之外,你可以直接创建使用foreach sink并直接写。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2019-11-29
      • 2020-02-06
      • 2020-06-16
      • 2017-08-25
      • 1970-01-01
      • 1970-01-01
      • 2016-05-14
      相关资源
      最近更新 更多