【问题标题】:Spark Streaming updateStateByKey with tuple as a value以元组为值的 Spark Streaming updateStateByKey
【发布时间】:2017-04-09 23:03:03
【问题描述】:

是否可以将updateStateByKey() 函数与元组作为值一起使用?我正在使用 PySpark,我的输入是 (word, (count, tweet_id)),这意味着 word 是一个键,而一个元组 (count, tweet_id) 是一个值。 updateStateByKey 的任务是对每个单词求和它们的计数并创建一个包含该单词的所有 tweet_ids 的列表。

我实现了以下更新功能,但是我发现错误列表索引超出了new_values 的范围,索引为 1:

def updateFunc(new_values, last_sum):
  count = 0
  tweets_id = []
  if last_sum:
    count = last_sum[0]
    tweets_id = last_sum[1]
  return sum(new_values[0]) + count, tweets_id.extend(new_values[1])

并调用方法:

running_counts.updateStateByKey(updateFunc)

【问题讨论】:

标签: python apache-spark pyspark spark-streaming


【解决方案1】:

我找到了解决方案。问题出在checkpointing 上,这意味着如果发生故障,当前状态会保留在磁盘上。它引起了问题,因为当我更改状态的定义时,在检查点它处于没有元组的旧状态。因此,我从磁盘中删除了检查点并将最终解决方案实现为:

def updateFunc(new_values, last_sum):
  count = 0
  counts = [field[0] for field in new_values]
  ids = [field[1] for field in new_values]
  if last_sum:
    count = last_sum[0]
    new_ids = last_sum[1] + ids
  else:
    new_ids = ids
  return sum(counts) + count, new_ids

最后,我的问题的答案是:是的,状态可以是元组或任何其他用于存储更多值的数据类型。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2015-02-16
    • 1970-01-01
    • 1970-01-01
    • 2017-10-27
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-03-06
    相关资源
    最近更新 更多