【问题标题】:How to insert processed spark stream into kafka如何将处理过的火花流插入kafka
【发布时间】:2021-05-05 10:16:52
【问题描述】:

我试图在使用下面的 sn-p 处理后将火花流插入 kafka

query = ds1 \
    .selectExpr("CAST(value AS STRING)")\
    .writeStream\
    .foreachBatch(do_something) \
    .format("kafka") \
    .option("topic","topic-name") \
    .option("kafka.bootstrap.servers", "borkers-IPs") \
    .option("checkpointLocation", "/home/location") \
    .start()

但它似乎插入的是原始流而不是处理过的流。

【问题讨论】:

  • 经纪人...经纪人?
  • 不知道 foreachbatch 以这种方式工作 - 即用于任意非标准接收器

标签: pyspark apache-kafka spark-structured-streaming


【解决方案1】:

如您所见,使用 foreachBatch 在这里没有效果。 Spark不会产生错误,它会像进入虚空一样。

引用手册:

结构化流 API 提供了两种方法来编写 对没有现有流的数据源的流式查询 接收器:foreachBatch() 和 foreach()。

这本优秀的读物正是您所寻找的。​​p>

https://aseigneurin.github.io/2018/08/14/kafka-tutorial-8-spark-structured-streaming.html

【讨论】:

    猜你喜欢
    • 2015-10-13
    • 2017-04-27
    • 2019-10-11
    • 1970-01-01
    • 2016-06-25
    • 1970-01-01
    • 1970-01-01
    • 2020-02-24
    • 1970-01-01
    相关资源
    最近更新 更多