【问题标题】:How to iterate on each line of a rdd which contains textFile如何迭代包含 textFile 的 rdd 的每一行
【发布时间】:2017-04-17 10:41:11
【问题描述】:

我正在尝试做这样的事情

文件 = sc.textFile('mytextfile') 定义我的函数(我的字符串): 新值 = 我的字符串 对于我在 file.toLocalIterator() 如果我在我的字符串中: 新值 = 我 返回新值; rdd_row = some_data_frame.map(lambda u: Row(myfunction(u.column_name)))

但是我得到了这个错误

您似乎正试图从广播变量、操作或转换中引用 SparkContext。 SparkContext 只能用在驱动程序上,不能用在它在工作人员上运行的代码中

【问题讨论】:

  • 如果你能解释你想要达到的目标会更好,然后你也可以尝试提供帮助。
  • 我有一个文本文件,它实际上是一个黑名单,我有一个数据框,其中有一列包含 url。我想解析每个网址并检查它是否在黑名单中。如果是,我将替换该值,否则我将保持相同的值。

标签: python-2.7 apache-spark pyspark


【解决方案1】:

问题是(正如错误消息中明确指出的那样)您正在尝试使用地图内的 RDD。文件是一个 RDD。它可以对其进行各种转换(例如,您正在尝试对其进行本地迭代器)。但是您正试图在另一个 - 地图中使用转换。

更新

如果我理解正确,您有一个带有列 URL 的数据框 df。您还有一个包含黑名单值的文本文件。 让我们假设您的黑名单文件是一个带有 blacklistNames 列的 csv,并且数据框 df 的 URL 列已经被解析。即您只想检查 URL 是否在 blacklistNames 列中。

你可以这样做:

df.join(blackListDF, df["URL"]==blackListDF["blacklistNames"], "left_outer")

此连接基本上会在您的原始数据框中添加一个 blacklistNames 列,如果它在黑名单中,它将包含匹配的名称,否则为 null。现在您需要做的就是根据新列是否为空进行过滤。

【讨论】:

  • 感谢您的回复。是的,我明白了,所以我的问题是,还有另一种方法吗?我可以将我的 texfile 的每个元素放在一个列表中并对其进行迭代,但是处理起来需要很长时间,因为我要检查一百万个 url,并且文本文件中有 85000 个元素
  • 是的,这是个好主意,但问题是,通过连接比较两个值是否严格相等,我想检查 url 值是否包含黑名单中的值。你看出区别了吗?
  • 那么我建议你使用外部程序将黑名单转换为一些文本分析器,然后使用地图。例如使用 aho-corasick 或类似的算法进行测试。取 850K 个元素,从一个子集生成一个 trie,该子集可以适合单个 executor 的内存,并继续做同样的事情,直到你覆盖所有内容。然后在每次测试相关子集时运行多个过滤器。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2017-06-25
  • 2016-06-03
  • 2022-01-19
  • 1970-01-01
  • 1970-01-01
  • 2014-11-20
  • 2017-05-29
相关资源
最近更新 更多