【问题标题】:Creating edges based on vertices IDs spark scala基于顶点 ID 创建边 spark scala
【发布时间】:2018-04-17 16:07:13
【问题描述】:

我想基本上加入两个 RDD 的顶点和边。使用以下代码创建顶点和边:

val file = sc.textFile("file.gz") //This tab separated file has more than two columns among which only first two columns with source and destination URL are relevant 

val edges= file.flatMap(f => {
  val urls = f.split("\t")
  if (!(urls.length < 2)) 
{ Some(urls(0) +"\t"+ urls(1)) }
else None }).distinct

val vertices = edges.flatMap(f => f.split("\t")).distinct 
val vertices_zip = vertices.zipWithUniqueId

现在我有一个顶点 (URL) 列表,其中包含使用上述方法生成的 ID,例如:

google.de/2011/10/Extract-host,11
facebook.de/2014/11/photos,28         
community.cloudera.com/t5/,42         
facebook.de/2020/11/photos,91 

我想根据这些 ID 创建边。 Edges RDD 文件是制表符分隔的,如下所示:

google.de/2011/10/Extract-host   facebook.de/2014/11/photos   
facebook.de/2014/11/photos       community.cloudera.com/t5/,42
community.cloudera.com/t5/       google.de/2011/10/Extract-host

要求的结果:

11     28
28     42
42     11

我尝试了以下代码

val edges_id = edges.flatMap( line => line.split( "\t" ) ).map( line => ( line,0) ) .join(vert_zip).map(x=>x._2._2)

但没有得到想要的结果。我得到了

11
28
28
42
42
11

我不确定如何将边与顶点 RDD 连接以获得此结果。任何帮助将不胜感激。

【问题讨论】:

  • Edges RDD are tab separated like below。 RDD 不是制表符分隔的。你的意思是你有制表符分隔的文件吗?你能发布你用来创建vetices rdd和edges rdd的过程/代码吗?我认为一个简单的查找地图可以解决您的问题
  • @RameshMaharjan 我希望现在很清楚
  • 我已经在下面回答了:) 请检查

标签: scala apache-spark


【解决方案1】:

当您zipWithUniqueId 时,将 rdds 收集为地图,然后 使用该地图获取边 rdd 中的索引,如下所示

val vertices_zip = vertices.zipWithUniqueId.collectAsMap

val edges_id = edges.map(f => {
  val urls = f.split("\t")
  vertices_zip(urls(0))+"\t"+vertices_zip(urls(1))
})

仅此而已。希望回答对你有帮助

更新

你评论了

我得到一个异常:java.lang.OutOfMemoryError: Java heap space

为此,您可以使用广播,它将所需的 rdds 调用到执行程序内存而不是所有地图

val vertices_zip = sc.broadcast(vertices.zipWithUniqueId.collectAsMap)

val edges_id = edges.map(f => {
  val urls = f.split("\t")
  vertices_zip.value(urls(0))+"\t"+vertices_zip.value(urls(1))
})

加入

你又评论了

是否可以更改我上面尝试的代码以获得结果(带有连接的那个)?

加入方式需要两个连接,这意味着需要两次洗牌才能获得所需的结果

val vertices_zip = vertices.zipWithUniqueId

val edges_id = edges.map(line => {
  val splitted = line.split("\t")
  (splitted(0), splitted(1))
})
  .join(vertices_zip)
  .map(_._2)
  .join(vertices_zip)
  .map(x => x._2._1+"\t"+x._2._2)

【讨论】:

  • 它不工作。我收到一个异常:java.lang.OutOfMemoryError: Java heap space
  • 是否可以更改我上面尝试的代码以获得结果(带有连接的那个)?
  • 是的,我做到了。但是我的工作中止了。
猜你喜欢
  • 1970-01-01
  • 2019-06-04
  • 2015-02-27
  • 2021-12-28
  • 2016-08-02
  • 2021-09-06
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多