【发布时间】:2020-07-25 16:43:28
【问题描述】:
我有一个 Spark 结构化流应用程序,它必须一次读取 12 个 Kafka 主题(不同的模式,Avro 格式),反序列化数据并存储在 HDFS 中。当我使用我的代码阅读单个主题时,它工作正常且没有错误,但是在一起运行多个查询时,我收到以下错误
java.lang.IllegalStateException: Race while writing batch 0
我的代码如下:
def main(args: Array[String]): Unit = {
val kafkaProps = Util.loadProperties(kafkaConfigFile).asScala
val topic_list = ("topic1", "topic2", "topic3", "topic4")
topic_list.foreach(x => {
kafkaProps.update("subscribe", x)
val source= Source.fromInputStream(Util.getInputStream("/schema/topics/" + x)).getLines.mkString
val schemaParser = new Schema.Parser
val schema = schemaParser.parse(source)
val sqlTypeSchema = SchemaConverters.toSqlType(schema).dataType.asInstanceOf[StructType]
val kafkaStreamData = spark
.readStream
.format("kafka")
.options(kafkaProps)
.load()
val udfDeserialize = udf(deserialize(source), DataTypes.createStructType(sqlTypeSchema.fields))
val transformedDeserializedData = kafkaStreamData.select("value").as(Encoders.BINARY)
.withColumn("rows", udfDeserialize(col("value")))
.select("rows.*")
val query = transformedDeserializedData
.writeStream
.trigger(Trigger.ProcessingTime("5 seconds"))
.outputMode("append")
.format("parquet")
.option("path", "/output/topics/" + x)
.option("checkpointLocation", checkpointLocation + "//" + x)
.start()
})
spark.streams.awaitAnyTermination()
}
【问题讨论】:
-
为什么不使用 KAFKA Connect?让生活更轻松,也可能会出现小文件问题。
-
有趣的方法。当我假设只有一个这样的主题时,格式是正确的。将在某个阶段尝试这个。另请参阅:waitingforcode.com/apache-spark-structured-streaming/…。并发问题?
-
@thebluephantom 这可能与小文件无关,因为数据非常大。无论如何,你能指导我如何使用 Kafka Connect 做到这一点吗?
-
好的,小文件的常见问题很酷。 KAFKA Connect 是通过 Confluent 或 KAFKA CONNECT 本身实现的。关键是您需要一个 KAFKA Connect 集群,这通常是一个需要通过管理员设置的系统。您需要了解您的组织对此有何看法。\
标签: apache-spark apache-kafka spark-structured-streaming