【发布时间】:2017-04-09 23:03:03
【问题描述】:
是否可以将updateStateByKey() 函数与元组作为值一起使用?我正在使用 PySpark,我的输入是 (word, (count, tweet_id)),这意味着 word 是一个键,而一个元组 (count, tweet_id) 是一个值。 updateStateByKey 的任务是对每个单词求和它们的计数并创建一个包含该单词的所有 tweet_ids 的列表。
我实现了以下更新功能,但是我发现错误列表索引超出了new_values 的范围,索引为 1:
def updateFunc(new_values, last_sum):
count = 0
tweets_id = []
if last_sum:
count = last_sum[0]
tweets_id = last_sum[1]
return sum(new_values[0]) + count, tweets_id.extend(new_values[1])
并调用方法:
running_counts.updateStateByKey(updateFunc)
【问题讨论】:
-
你能分享一下 pyspark 代码吗?我可以自己试试这样的例子。想知道为什么给出 -1
-
谢谢,如果减去 1 会尝试摆脱
标签: python apache-spark pyspark spark-streaming