【发布时间】:2019-12-07 03:23:10
【问题描述】:
我正在尝试使用 spark 结构化流连接远程 cassandra 节点。
我可以在我的本地机器上连接到现有的 cassandra 节点。
这是我可以在本地机器上连接 Cassandra 的代码:
parsed = parsed_df \
.withWatermark("sourceTimeStamp", "10 minutes") \
.groupBy(
window(parsed_df.sourceTimeStamp, "4 seconds"),
parsed_df.id
) \
.agg({"value": "avg"}) \
.withColumnRenamed("avg(value)", "avg")\
.withColumnRenamed("window", "sourceTime")
def writeToCassandra(writeDF, epochId):
writeDF.write \
.format("org.apache.spark.sql.cassandra")\
.mode('append')\
.options(table="opc", keyspace="poc")\
.save()
parsed.writeStream \
.foreachBatch(writeToCassandra) \
.outputMode("update") \
.start()
但是,我想连接远程 cassandra 节点。我该如何指定?
【问题讨论】:
-
这个问题代码有你的answer
标签: apache-spark pyspark cassandra spark-structured-streaming spark-cassandra-connector