【问题标题】:Spark Structured Streaming from Kafka to Elastic Search从 Kafka 到 Elastic Search 的 Spark 结构化流
【发布时间】:2022-01-01 03:07:09
【问题描述】:

我想写一个从 Kafka 到 Elasticsearch 的 Spark Streaming Job。在这里,我想在从 Kafka 读取模式时动态检测模式。

你能帮我做吗?

我知道,这可以通过下一行在 Spark 批处理中完成。

val schema = spark.read.json(dfKafkaPayload.select("value").as[String]).schema

但是在通过 Spark Streaming Job 执行相同的操作时,我们无法执行上述操作,因为流式处理只能在 Action 上进行。

请告诉我。

【问题讨论】:

    标签: apache-spark spark-streaming-kafka spark-kafka-integration elasticsearch-spark


    【解决方案1】:

    如果您正在收听 kafka 主题,则不能依靠 spark 自动推断 json 模式,因为这将花费大量时间。因此,您需要以某种方式将架构提供给您的应用程序。

    如果您正在从文件源监听,则可以这样做。

    'spark.sql.streaming.schemaInference', 'true'
    

    【讨论】:

    • 问题状态数据来自 Kafka 源,而不是文件。 Kafka 源始终是字节
    • 你好,OneCricketeer,我已经发布了关于 kafka 源的答案,这只是临时类型模式推断的附加信息:)
    • 对不起,当答案包含文件源时感到困惑......无论如何,我认为真正的答案是根本不将“动态” json 放入 Kafka 主题中;生产者的模式应该保持一致
    猜你喜欢
    • 2020-01-31
    • 1970-01-01
    • 2018-09-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-11-23
    • 2019-08-01
    相关资源
    最近更新 更多