【问题标题】:Read newline delimited json from Kafka message in Spark Structured Streaming从 Spark Structured Streaming 中的 Kafka 消息中读取换行符分隔的 json
【发布时间】:2021-05-05 21:43:45
【问题描述】:

我开发了一个 Python Kafka 生产者,它将多个 json 记录作为 nd-json 二进制字符串发送到 Kafka 主题。 然后我尝试使用 PySpark 在 Spark Structured Streaming 中读取这些消息,如下所示:

events_df = select(from_json(col("value").cast("string"), schema).alias("value"))

但此代码仅适用于单个 json 文档。 如果该值包含多条记录作为换行符分隔的 json,Spark 无法正确解码。

我不想为每个事件发送 kafka 消息。我怎样才能做到这一点?

【问题讨论】:

  • 好吧,这里的schema 是什么?为什么不能将单个对象作为消息发送?如果该模式仅表示单个 JSON 对象,则该模式将成为问题......并且from_json 也不适用于 ndjson。否则,如果您可以map 对记录进行拆分功能,以便将其变为单独的记录,那么您应该在此处执行此操作
  • 我通过从磁盘导入单个事件 json 自动生成了模式。如何拆分 kafka 消息的值,然后使用正确的模式正确解析 json?从磁盘读取实际上支持 ndjson 和模式推断
  • 就像我说的那样,schema 是 Struct,而不是 ndjson,AFAIK 没有有效的模式类型。解决方法是通过平面图分割新行上的记录,然后将它们表示为单独的数据帧行。您仍然没有说明为什么不想发送单个消息(请记住,Kafka 不是用于“文件传输”,因此您不应该比较从磁盘读取和从 Kafka 消费)
  • 在实时流中,海量数据源源不断地到来,将数据拆分并一个一个事件发送是不可想象的。
  • 是吗?您的回答正是这样做的。如果您确实希望消息中有多个对象,请使用适当的数组

标签: apache-spark pyspark apache-kafka spark-structured-streaming ndjson


【解决方案1】:

我设法以这种方式做我正在寻找的事情,用换行符分割全文字符串,然后将数组分解成行以使用模式进行解析:

    events = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "quickstart-events") \
        .option("startingOffsets", "earliest")\
        .load()\
        .selectExpr("CAST(value AS STRING) as data")
    
    events = events.select(explode(split(events.data, '\n')))
    events = events.select(from_json(col("col"), event_schema).alias('value'))
    events = events.selectExpr('value.*')```

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-12-30
    • 2019-04-27
    • 2021-12-05
    • 2023-03-08
    • 2019-10-26
    • 1970-01-01
    • 2015-09-13
    相关资源
    最近更新 更多