【问题标题】:Structured Streaming: Reading from multiple Kafka topics at once结构化流:一次读取多个 Kafka 主题
【发布时间】:2020-07-25 16:43:28
【问题描述】:

我有一个 Spark 结构化流应用程序,它必须一次读取 12 个 Kafka 主题(不同的模式,Avro 格式),反序列化数据并存储在 HDFS 中。当我使用我的代码阅读单个主题时,它工作正常且没有错误,但是在一起运行多个查询时,我收到以下错误

java.lang.IllegalStateException: Race while writing batch 0

我的代码如下:

def main(args: Array[String]): Unit = {


  val kafkaProps = Util.loadProperties(kafkaConfigFile).asScala
  val topic_list = ("topic1", "topic2", "topic3", "topic4")

  topic_list.foreach(x => {
kafkaProps.update("subscribe", x)

val source= Source.fromInputStream(Util.getInputStream("/schema/topics/" + x)).getLines.mkString
val schemaParser = new Schema.Parser
val schema = schemaParser.parse(source)
val sqlTypeSchema = SchemaConverters.toSqlType(schema).dataType.asInstanceOf[StructType]

val kafkaStreamData = spark
  .readStream
  .format("kafka")
  .options(kafkaProps)
  .load()

val udfDeserialize = udf(deserialize(source), DataTypes.createStructType(sqlTypeSchema.fields))

val transformedDeserializedData = kafkaStreamData.select("value").as(Encoders.BINARY)
  .withColumn("rows", udfDeserialize(col("value")))
  .select("rows.*")

val query = transformedDeserializedData
  .writeStream
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .outputMode("append")
  .format("parquet")
  .option("path", "/output/topics/" + x)
  .option("checkpointLocation", checkpointLocation + "//" + x)
  .start()  
})  
spark.streams.awaitAnyTermination()  
 }

【问题讨论】:

  • 为什么不使用 KAFKA Connect?让生活更轻松,也可能会出现小文件问题。
  • 有趣的方法。当我假设只有一个这样的主题时,格式是正确的。将在某个阶段尝试这个。另请参阅:waitingforcode.com/apache-spark-structured-streaming/…。并发问题?
  • @thebluephantom 这可能与小文件无关,因为数据非常大。无论如何,你能指导我如何使用 Kafka Connect 做到这一点吗?
  • 好的,小文件的常见问题很酷。 KAFKA Connect 是通过 Confluent 或 KAFKA CONNECT 本身实现的。关键是您需要一个 KAFKA Connect 集群,这通常是一个需要通过管理员设置的系统。您需要了解您的组织对此有何看法。\

标签: apache-spark apache-kafka spark-structured-streaming


【解决方案1】:

另类。您可以使用 KAFKA Connect(来自 Confluent)、NIFI、StreamSets 等,因为您的用例似乎适合“dump/persist to HDFS”。也就是说,您需要拥有这些工具(已安装)。你说的小文件问题不是问题,就这样吧。

从 Apache Kafka 0.9 或更高版本,您可以使用 Kafka Connect API for KAFKA --> HDFS Sink(各种支持的 HDFS 格式)。虽然您需要一个 KAFKA Connect 集群,但无论如何这都是基于您现有的集群,所以没什么大不了的。但是需要有人维护。

一些链接可以帮助您上路:

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2019-09-19
    • 1970-01-01
    • 2019-08-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-01-29
    • 1970-01-01
    相关资源
    最近更新 更多