【Question Title】: Apache Spark Structured Streaming - not writing to checkpoint location
【Posted】: 2022-01-14 15:43:53
【Question】:

I have a simple Apache Spark Structured Streaming Python job that reads data from Kafka and writes the messages to the console.

I have set a checkpoint location, but the code is not writing anything to the checkpoint.. any idea why?

Here is the code:

from pyspark.sql import SparkSession, Window


spark = SparkSession.builder.appName('StructuredStreaming_KafkaProducer').getOrCreate()
# os.environ["SPARK_HOME"] = "/Users/karanalang/Documents/Technology/spark-3.2.0-bin-hadoop3.2"
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.13:3.2.0'

# kafkaBrokers='localhost:9092'
kafkaBrokers='<host>:<port>'
topic = "my-topic"
# bootstrap.servers=my-cluster-lb-ssl-cert-kafka-bootstrap:9093
security_protocol="SSL"
ssl_truststore_location="/Users/karanalang/Documents/Technology/strimzi/gcp_certs_nov28/ca.p12"
ssl_truststore_password="<pwd_1>"
ssl_keystore_location="/Users/karanalang/Documents/Technology/strimzi/gcp_certs_nov28/user.p12"
ssl_keystore_password="<pwd_2>"
consumerGroupId = "my-group"

spark.sparkContext.setLogLevel("ERROR")

df = spark.read.format('kafka')\
    .option("kafka.bootstrap.servers",kafkaBrokers)\
    .option("kafka.security.protocol","SSL") \
    .option("kafka.ssl.truststore.location",ssl_truststore_location) \
    .option("kafka.ssl.truststore.password",ssl_truststore_password) \
    .option("kafka.ssl.keystore.location", ssl_keystore_location)\
    .option("kafka.ssl.keystore.password", ssl_keystore_password)\
    .option("subscribe", topic) \
    .option("kafka.group.id", consumerGroupId)\
    .option("startingOffsets", "earliest") \
    .load()

query = df.selectExpr("CAST(value AS STRING)") \
    .write \
    .format("console") \
    .option("numRows",100)\
    .option("checkpointLocation", "~/PycharmProjects/Kafka/checkpoint/") \
    .option("outputMode", "complete")\
    .save("output")

【Comments】:

Tags: apache-spark apache-kafka spark-structured-streaming spark-checkpoint


【Solution 1】:

According to the official documentation, the checkpoint location must be a path on an HDFS-compatible file system, and it can be set as an option on the DataStreamWriter when you start the query. In your code, however, it is a local path on the driver, so it will be lost once the streaming job is restarted.

For details, see https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
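Note also that the snippet in the question uses the batch API (`spark.read` and `df.write ... .save()`), where the `checkpointLocation` option is ignored: checkpointing only applies to streaming queries built with `readStream`/`writeStream` and started with `start()`. In addition, `outputMode` is a method on `DataStreamWriter`, not an option, and a `~` in the checkpoint path is not expanded by Spark. A minimal streaming sketch (broker, topic, and checkpoint path are placeholders; the SSL options from the question would carry over unchanged):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("StructuredStreaming_KafkaConsumer").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

# readStream (not read) produces a streaming DataFrame
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "<host>:<port>") \
    .option("subscribe", "my-topic") \
    .option("startingOffsets", "earliest") \
    .load()

# writeStream + start() launches the query; the checkpoint directory
# (offsets, commits, metadata) is created under checkpointLocation,
# which should be an HDFS-compatible path, not a driver-local one
query = df.selectExpr("CAST(value AS STRING)") \
    .writeStream \
    .format("console") \
    .option("numRows", 100) \
    .option("checkpointLocation", "hdfs:///checkpoints/my-query") \
    .outputMode("append") \
    .start()

query.awaitTermination()
```

With this shape, Spark writes `offsets/`, `commits/`, and `metadata` under the checkpoint location as each micro-batch completes, which is what allows the query to resume from where it left off after a restart.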

【Comments】:
