【发布时间】:2017-06-17 12:38:25
【问题描述】:
我在 spark 集群中有两个文件,foo.csv 和 bar.csv,它们都有 4 列和完全相同的字段:
time, user, url, category。
我想通过bar.csv 的某些列过滤掉foo.csv。最后,我想要(用户,类别)的键/值对:[list,of,urls]。例如:
foo.csv:
11:50:00, 111, www.google.com, search
11:50:00, 222, www.espn.com, news
11:50:00, 333, www.reddit.com, news
11:50:00, 444, www.amazon.com, store
11:50:00, 111, www.bing.com, search
11:50:00, 222, www.cnn.com, news
11:50:00, 333, www.aol.com, news
11:50:00, 444, www.jet.com, store
11:50:00, 111, www.yahoo.com, search
11:50:00, 222, www.bbc.com, news
11:50:00, 333, www.nytimes.com, news
11:50:00, 444, www.macys.com, store
bar.csv:
11:50:00, 222, www.bbc.com, news
11:50:00, 444, www.yahoo.com, store
应该导致:
{
(111, search):[www.google.com, www.bing.com, www.yahoo.com],
(333, news): [www.reddit.com, www.aol.com, www.nytimes.com]
}
换句话说,如果bar.csv 中存在(用户,类别)对,我想过滤掉foo.csv 中的所有行,如果它们具有相同的(用户,类别)对。因此在上面的例子中,我想用(222, news) 和(444, store) 删除foo.csv 中的所有行。最终,在我删除我想要的行之后,我想要一个包含键/值对的字典,例如:(user, category): [list, of, urls]。
这是我的代码:
fooRdd = sc.textFile("file:///foo.txt/")
barRdd = sc.textFile("file:///bar.txt/")
parseFooRdd= fooRdd.map(lambda line: line.split(", "))
parseBarRdd = barRdd.map(lambda line: line.split(", "))
# (n[1] = user_id, n[3] = category_id) --> [n[2] = url]
fooGroupRdd = parseFooRdd.map(lambda n: ((n[1], n[3]), n[2])).groupByKey().map(lambda x: {x[0]: list(x[1])})
barGroupRdd = parseBarRdd.map(lambda n: ((n[1], n[3]), n[2])).groupByKey().map(lambda x: {x[0]: list(x[1])})
上面的代码工作并以我想要的格式获取数据集:
(user_id, category): [all, urls, visited, by, user, in, that, category]
但是,有几个问题:1)我认为它返回一个字典列表,其中只有一对 k/v 和 2)我不知道下一步该做什么。我知道用英语做什么:获取barGroupRdd(元组)中的键,并删除 fooGroupRdd 中具有相同键的所有行。但我是 pyspark 的新手,我觉得有些命令我没有利用。我认为我的代码可以优化。例如,我认为我不需要创建 barGroupRdd 行,因为我需要来自 bar.csv 的只是 (user_id, category) - 我不需要创建字典。我也认为我应该先过滤掉,然后然后从结果中创建字典。感谢您提供任何帮助或建议,谢谢!
【问题讨论】:
标签: python apache-spark mapreduce pyspark