【问题标题】:Spark UDF optimization for Graph Database (Neo4j) inserts图形数据库 (Neo4j) 插入的 Spark UDF 优化
【发布时间】:2016-06-23 16:47:32
【问题描述】:

如果我错过了一些信息和平庸的格式,这是我发布的第一个问题。如果需要,我可以更新。

我会尝试添加尽可能多的细节。我有一个不太优化的 Spark 作业,它将 RDBMS 数据转换为 Neo4j 中的图形节点和关系。

为此。以下是我遵循的步骤:

  1. 使用 spark sql 和连接创建非规范化数据帧“数据”。
  2. “数据”中的每个行都运行一个 graphInsert 函数,该函数执行以下操作:

    一个。读取行的内容
    b.制定一个 Neo4j 密码查询(我们使用 Merge 命令,这样我们只有一个城市,例如在 Neo4j 中创建芝加哥,而芝加哥将出现在 RDBMS 表中的多行中)
    c。连接到 neo4j
    d.执行查询
    e.断开与 Neo4j 的连接

这是我面临的问题列表。

  1. 插入速度很慢。

我知道 Merge 查询比 create 慢,但有没有其他方法可以代替为每条记录连接和断开连接?这是我的第一个代码草案,也许我正在努力如何使用一个连接从不同 Spark 工作节点上的多个线程插入。因此,为每条记录连接和断开连接。

  1. 作业不可扩展。它仅在 1 个核心下运行良好。一旦我使用 2 个 spark 核心运行作业,我就会突然得到 2 个同名的城市,即使我正在运行合并查询。例如有 2 个芝加哥城市违反了 Merge 的使用。我假设 Merge 的功能类似于“如果不存在则创建”。

我不知道我在 neo4j 部分或 spark 中的实现是否错误。如果有人可以指导我查看任何有助于我以更好的规模实现这一点的文档,那将很有帮助,因为我有一个大的火花集群,我需要充分利用它来完成这项工作。

如果您有兴趣查看代码而不是算法。这是 scala 中的 graphInsert 实现:

class GraphInsert extends Serializable{
   var case_attributes = new Array[String](4)
   var city_attributes = new Array[String](2)
   var location_attributes = new Array[String](20)
   var incident_attributes = new Array[String](20)
   val prop = new Properties()
   prop.load(getClass().getResourceAsStream("/GraphInsertConnection.properties"))
   // properties Neo4j
   val url_neo4j = prop.getProperty("url_neo4j")
   val neo4j_user = prop.getProperty("neo4j_user")
   val neo4j_password = prop.getProperty("neo4j_password")


   def graphInsert(data : Row){  
      val query = "MERGE (d:CITY {name:city_attributes(0)})\n" +"MERGE (a:CASE { " + case_attributes(0)  + ":'" +data(11) + "'," +case_attributes(1)  + ":'" +data(13)  + "'," +case_attributes(2)  + ":'" +data(14) +"'}) \n" +"MERGE (b:INCIDENT { " + incident_attributes(0)  + ":" +data(0) + "," +incident_attributes(1)  + ":" +data(2)  + "," +incident_attributes(2)  + ":'" +data(3) +  "'," +incident_attributes(3)  + ":'" +data(8)+  "'," +incident_attributes(4)  + ":" +data(5) +  "," +incident_attributes(5)  + ":'" +data(4) +  "'," +incident_attributes(6)  + ":'" +data(6) +  "'," +incident_attributes(7)  + ":'" +data(1) +  "'," +incident_attributes(8)  + ":" +data(7)+"}) \n" +"MERGE (c:LOCATION { " + location_attributes(0)  + ":" +data(9) + "," +location_attributes(1)  + ":" +data(10)  + "," +location_attributes(2)  + ":'" +data(19) +  "'," +location_attributes(3)  + ":'" +data(20)+  "'," +location_attributes(4)  + ":" +data(18) +  "," +location_attributes(5)  + ":" +data(21) +  "," +location_attributes(6)  + ":'" +data(17) +  "'," +location_attributes(7)  + ":" +data(22) +  "," +location_attributes(8)  + ":" +data(23)+"}) \n" +"MERGE (a) - [r1:"+relation_case_incident+"]->(b)-[r2:"+relation_incident_location+"]->(c)-[r3:belongs_to]->(d);"
              println(query)
              try{
                      var con = DriverManager.getConnection(url_neo4j, neo4j_user, neo4j_password)
                          var stmt = con.createStatement()
                          var rs = stmt.executeQuery(query)
                          con.close()
              }catch{
              case ex: SQLException =>{
                  println(ex.getMessage)
              }
              }
  } 

def operations(sqlContext: SQLContext){
    ....
    #Get 'data' before this step
    city_attributes = entity_metadata.filter(entity_metadata("source_name") === "tb_city").map(x =>x.getString(5)).collect()
    case_attributes = entity_metadata.filter(entity_metadata("source_name") === "tb_case_number").map(x =>x.getString(5)).collect()
    location_attributes = entity_metadata.filter(entity_metadata("source_name") === "tb_location").map(x =>x.getString(5)).collect()
    incident_attributes= entity_metadata.filter(entity_metadata("source_name") === "tb_incident").map(x =>x.getString(5)).collect()

    data.foreach(graphInsert)

}

object GraphObject {
  def main(args: Array[String]) {  
      val conf = new SparkConf()
        .setAppName("GraphNeo4j")
        .setMaster("xyz")
        .set("spark.cores.max","2")
        .set("spark.executor.memory","10g")

      Logger.getLogger("org").setLevel(Level.ERROR)
      Logger.getLogger("akka").setLevel(Level.ERROR)
      val sc = new SparkContext(conf)
      val sqlContext = new SQLContext(sc)
      val graph = new GraphInsert()
      graph.operations(sqlContext)

  }
}

【问题讨论】:

  • 使用 Spark 准备数据 -> 转储到 csv -> 批量导入?
  • 这个测试应该是大数据可扩展的。现在我有 50k 条记录。但它应该有数百万。我一直认为一个包含如此大量数据的 csv 文件不会是最佳的大数据解决方案。对此有何建议?举个例子,如果我将城市案例、位置和事件合并在一起,每个都有 10 GB 的信息,并假设它们都有相似的大小。非规范化/处理后的信息将有接近 40GB 的信息。
  • 话虽如此。我绝对可以将其映射并减少到多个 csv 文件并上传。但是任何 cmet 在不转储到磁盘的情况下改进工作?
  • 是的,很多 cmets 首先您正在多次收集数据!这是很多工作只是为了插入数据。使用 SPARK UI 优化代码。不要多次收集数据,而不是收集使用 map 并使用 map 和 flatMap 组合您的数据,然后在其中调用 GraphinsertFunction 这将在很大程度上优化您的代码!之后,您仍然要优化从该函数中提取 get 连接并首次打开它,然后使用时间延迟关闭它。
  • 出于其他原因,我在 4 个地方收集元数据。它只是有助于构建查询。那里的记录不超过10条。它基本上只是获取列列表。所以删除收集不会有太大帮助。大量数据位于“数据”数据框中,graphInsert 运行多次。您的评论很有帮助,我将有一次初始化打开和关闭连接。您可能回答了我的问题 1。任何关于问题编号的 cmets。 2 ?

标签: scala apache-spark neo4j parallel-processing data-ingestion


【解决方案1】:

无论你在闭包中写什么,即它需要在 Worker 上执行,都会被分发。 你可以在这里阅读更多信息:http://spark.apache.org/docs/latest/programming-guide.html#understanding-closures-a-nameclosureslinka

当你增加核心数量时,我认为它一定不会影响应用程序,因为如果你不指定它!然后它采取贪婪的方法!我希望这份文档对您有所帮助。

【讨论】:

  • 谢谢!这正是正在发生的事情,但我没有做任何计数器或累加器。我只关心在 neo4j 上执行的查询。如果我必须重组我的方法,我应该如何防止创建重复节点?
  • .set("spark.cores.max","2") 是创建重复节点的原因。如果它设置为 1,那么作业运行得非常好。
  • 了解闭包以及驱动程序上运行的内容以及 Worker 上运行的内容。然后你就会明白整个概念!
【解决方案2】:

我已经完成了对流程的改进,但没有什么能像 Cypher 中的 LOAD 命令一样快。 希望这对某人有所帮助: 使用foreachPartition 而不是foreach 在执行此类过程时会获得显着收益。还使用密码添加定期提交。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2023-04-10
    • 1970-01-01
    • 1970-01-01
    • 2015-11-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多