【问题标题】:Send data to Kafka topics based on a condition in Dataframe根据 Dataframe 中的条件向 Kafka 主题发送数据
【发布时间】:2021-06-03 17:55:01
【问题描述】:

我想根据 SparkStreaming 中数据的值更改 Kafka 主题目标以保存数据。 有可能再次这样做吗? 当我尝试下面的代码时,它只执行了第一个,而没有执行较低的进程。

(testdf 
.filter(f.col("value") == "A")
.selectExpr("CAST(value as STRING) as value")
.writeStream
.format("kafka")
.option("checkpointLocation", "/checkpoint_1")
.option("kafka.bootstrap.servers","~~:9092")
.option("topic", "test")
.option("startingOffsets", "latest")
.start()
      )
            
(testdf 
.filter(f.col("value") == "B")
.selectExpr("CAST(value as STRING) as value")
.writeStream
.format("kafka")
.option("checkpointLocation", "/checkpoint_2")
.option("kafka.bootstrap.servers","~~:9092")
.option("topic", "testB")
.option("startingOffsets", "latest")
.start()
      )

数据存储在主题名称 test 中。 谁能想到办法做到这一点?

我更改了目的地以保存这样的数据框。

|type|value|
| A  |testvalue|
| B  |testvalue|

类型 A 到主题测试。 键入 B 到主题 testB。

【问题讨论】:

  • 只要不是流式处理,我就可以去。

标签: apache-spark pyspark apache-kafka spark-structured-streaming spark-kafka-integration


【解决方案1】:

谢谢麦克。 我可以通过运行以下代码来实现这一点!

(
testdf 
  .withColumn("topic",f.when(f.col("testTime") == "A", f.lit("test")).otherwise(("testB")))
  .selectExpr("CAST(value as STRING) as value", "topic") 
  .writeStream
  .format("kafka") 
  .option("checkpointLocation", "/checkpoint_2") 
  .option("startingOffsets", "latest")
  .option("kafka.bootstrap.servers","9092")
  .start()
)

【讨论】:

    【解决方案2】:

    使用最新版本的 Spark,您只需在数据框中创建一个列 topic,用于将记录定向到相应的主题。

    在你的情况下,这意味着你可以做类似的事情

    testdf 
      .withColumn("topic", when(f.col("value") == "A", lit("test")).otherwise(lit("testB"))
      .selectExpr("CAST(value as STRING) as value", "topic") 
      .writeStream .format("kafka") 
      .option("checkpointLocation", "/checkpoint_1") 
      .option("kafka.bootstrap.servers","~~:9092")
      .start()
    

    【讨论】:

      猜你喜欢
      • 2018-08-03
      • 1970-01-01
      • 1970-01-01
      • 2021-11-16
      • 1970-01-01
      • 2021-11-24
      • 1970-01-01
      • 2021-05-18
      • 1970-01-01
      相关资源
      最近更新 更多