【问题标题】:Structured Streaming (Spark 2.3.0) cannot write to Parquet file sink when submitted as a job当作为作业提交时,结构化流 (Spark 2.3.0) 无法写入 Parquet 文件接收器
【发布时间】:2018-10-30 19:23:01
【问题描述】:

我正在使用 Kafka 并在 EMRFS 中写入 parquet。以下代码适用于spark-shell

val filesink_query = outputdf.writeStream
  .partitionBy(<some column>)
  .format("parquet")
  .option("path", <some path in EMRFS>)
  .option("checkpointLocation", "/tmp/ingestcheckpoint")
  .trigger(Trigger.ProcessingTime(10.seconds))
  .outputMode(OutputMode.Append)
  .start 

SBT 能够无错误地打包代码。当 .jar 被发送到 spark-submit 时,该作业被接受并永远保持在运行状态,而不会将数据写入 HDFS。

.inprogress 日志中没有错误

一些帖子建议水印持续时间过长会导致它,但我没有设置自定义水印持续时间。

【问题讨论】:

  • 遇到同样的问题。你找到解决办法了吗?
  • 我们出于不相关的原因放弃了 EMR,但如果我没记错的话,我改为写入本地磁盘,然后分批复制到 S3 一段时间。

标签: apache-spark spark-structured-streaming


【解决方案1】:

我可以使用 Pyspark 写入 parquet,我把我的代码给你,以防万一有用:

stream = self.spark.readStream \
          .format("kafka") \
          .option("kafka.bootstrap.servers", self.kafka_bootstrap_servers) \
          .option("subscribe", self.topic) \
          .option("startingOffsets", self.startingOffsets) \
          .option("max.poll.records", self.max_poll_records) \
          .option("auto.commit.interval.ms", self.auto_commit_interval_ms) \
          .option("session.timeout.ms", self.session_timeout_ms) \
          .option("key.deserializer", self.key_deserializer) \
          .option("value.deserializer", self.value_deserializer) \
          .load()

self.query = stream \
        .select(col("value")) \
        .select((self.proto_function("value")).alias("value_udf")) \
        .select(*columns,
                date_format(column_time, "yyyy").alias("date").alias("year"),
                date_format(column_time, "MM").alias("date").alias("month"),
                date_format(column_time, "dd").alias("date").alias("day"),
                date_format(column_time, "HH").alias("date").alias("hour"))

query = self.query \
            .writeStream \
            .format("parquet") \
            .option("checkpointLocation", self.path) \
            .partitionBy("year", "month", "day", "hour") \
            .option("path", self.path) \
            .start()

另外,您需要以这种方式运行代码:spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 &lt;code&gt;

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-11-21
    • 1970-01-01
    • 2021-04-28
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多