【发布时间】:2021-06-03 17:55:01
【问题描述】:
我想根据 SparkStreaming 中数据的值更改 Kafka 主题目标以保存数据。 有可能再次这样做吗? 当我尝试下面的代码时,它只执行了第一个,而没有执行较低的进程。
(testdf
.filter(f.col("value") == "A")
.selectExpr("CAST(value as STRING) as value")
.writeStream
.format("kafka")
.option("checkpointLocation", "/checkpoint_1")
.option("kafka.bootstrap.servers","~~:9092")
.option("topic", "test")
.option("startingOffsets", "latest")
.start()
)
(testdf
.filter(f.col("value") == "B")
.selectExpr("CAST(value as STRING) as value")
.writeStream
.format("kafka")
.option("checkpointLocation", "/checkpoint_2")
.option("kafka.bootstrap.servers","~~:9092")
.option("topic", "testB")
.option("startingOffsets", "latest")
.start()
)
数据存储在主题名称 test 中。 谁能想到办法做到这一点?
我更改了目的地以保存这样的数据框。
|type|value|
| A |testvalue|
| B |testvalue|
类型 A 到主题测试。 键入 B 到主题 testB。
【问题讨论】:
-
只要不是流式处理,我就可以去。
标签: apache-spark pyspark apache-kafka spark-structured-streaming spark-kafka-integration