【发布时间】:2020-04-15 21:48:43
【问题描述】:
我正在尝试在 Spark 中创建一个从 Kafka 获取数据的流。当我检查 RDD 中的记录计数时,似乎计数与 Web UI 不同。
我为 DStream 中的所有 RDD 执行一个函数(代码在 Python 中生成):
rdds = KafkaUtils.createStream(...)
rdds = rdds.repartition(1)
rdds.foreachRDD(doJob)
doJob 函数我有一个循环和一个计数器
def doJob(time, p_rdd):
if not p_rdd.isEmpty:
batch_count = 0
...
...
rdd_collected = p_rdd.collect()
for record in rdd_collected:
...
...
batch_count = batch_count + 1
log("Count: " + str(batch_count))
我的期望是 batch_count 应该与 http://webui.adress/my_app_id/Streaming page -> Completed Batch section -> Input size 相同。但似乎他们不是。我应该在哪里检查 web ui 中的 RDD 记录计数,我错过了什么?
谢谢。
【问题讨论】:
标签: python apache-spark pyspark spark-streaming