【问题标题】:Optimizations for merging graph with million nodes and csv with million rows合并百万节点的图和百万行的csv的优化
【发布时间】:2018-05-03 21:40:07
【问题描述】:

考虑我有以下节点和关系的图表:

(:GPE)-[:contains]->(:PE)-[:has]->(:E)

具有以下属性:

GPE: joinProp1, outProp1, randomProps
PE : joinProp2, outProp2, randomProps
E  : joinProp3, outProp3, randomProps

现在考虑我有以下格式的 csv:

joinCol1, joinCol2, joinCol3, outCol1, outCol2, outCol3, randomProps

现在考虑我在这个 csv 文件中有数百万行。此外,我在图中每个(:GPE),(:PE),(:E) 都有数百万个实例。我想将图形和 csv 合并到新的 csv 中。为此,我想映射/等同

  • joinCol1 和 joinProp1
  • joinCol2 和 joinProp2
  • joinCol3 和 joinProp3

csv 中每一行的类似(伪密码):

MATCH (gpe:GPE {joinProp1:joinCol1})-[:contains]->(pe:PE {joinProp2:joinCol2})-[:has]->(e:E {joinProp3:joinCol3}) RETURN gpe.outProp1, pe.outProp2, e.outProp3

所以输出的 csv 格式是:

joinCol1, joinCol2, joinCol3, outCol1, outCol2, outCol3, outProp1, outProp2, outProp3

如果我在所有 joinProps 上创建索引并使用参数化密码(考虑到我正在使用 java api 实现这个简单的逻辑),我可以在其中完成此任务的粗略最小执行时间估计(分钟或小时)是多少。我只想知道什么是粗略的估计。我们实现了类似(可能未优化)的任务,这需要几个小时才能完成。挑战在于缩短执行时间。我可以做些什么来优化并将执行时间缩短到几分钟?任何快速优化点/链接?使用除 java api 之外的其他方法会提高性能吗?

【问题讨论】:

  • 我不知道这是否可以帮助你,但前段时间我写了一篇关于在具有巨大关系的大型数据集上提高 Neo4j 性能的答案。希望对您有所帮助:stackoverflow.com/questions/45770769/…

标签: neo4j cypher


【解决方案1】:

好吧,我尝试了一些可以大大提高性能的方法。

一些适用于我的场景的 neo4j 性能指南:

  1. 批量处理:避免对每个 csv 行进行 cypher 调用(通过 bolt api 调用)。遍历几个固定数量的 csv 行,形成地图列表,其中每个地图将是 csv 行。然后将此映射列表作为参数传递给 cypher。 UNWIND 密码内的此列表并执行所需的操作。对下一组 csv 行重复相同的操作。

  2. 不要将节点关系对象从 cypher 返回到 java 端。而是尝试返回所需的地图列表作为最终输出。当我们返回节点/关系列表时,我们可能必须通过它们重申将属性与 csv 列合并以形成最终输出行(或映射)

  3. 将 csv 列值传递给 cypher: 要实现第 2 点,请将 csv 列值(要与图形属性合并)发送到 cypher。在密码中执行匹配,并通过合并匹配节点的属性和输入 csv 列来形成输出映射。

  4. 要匹配的索引节点/关系属性 (Official docs)

  5. 参数化密码API ExampleOfficial docs

我做了一个快速肮脏的实验,如下所述。

我的输入 csv 看起来像这样。

inputDataCsv.csv

csv-id1,csv-id2,csv-id3,outcol1,outcol2,outcol3
gppe0,ppe1,pe2,outcol1-val-0,outcol2-val-1,outcol3-val-2
gppe3,ppe4,pe5,outcol1-val-3,outcol2-val-4,outcol3-val-5
gppe6,ppe7,pe8,outcol1-val-6,outcol2-val-7,outcol3-val-8
...

为了创建图表,我首先创建了表单的 csvs:

gppe.csv(实体)

gppe0,gppe_out_prop_1_val_0,gppe_out_prop_2_val_0,gppe_prop_X_val_0
gppe3,gppe_out_prop_1_val_3,gppe_out_prop_2_val_3,gppe_prop_X_val_3
gppe6,gppe_out_prop_1_val_6,gppe_out_prop_2_val_6,gppe_prop_X_val_6
...

ppe.csv(实体)

ppe1,ppe_out_prop_1_val_1,ppe_out_prop_2_val_1,ppe_prop_X_val_1
ppe4,ppe_out_prop_1_val_4,ppe_out_prop_2_val_4,ppe_prop_X_val_4
ppe7,ppe_out_prop_1_val_7,ppe_out_prop_2_val_7,ppe_prop_X_val_7
...

pe.csv(实体)

pe2,pe_out_prop_1_val_2,pe_out_prop_2_val_2,pe_prop_X_val_2
pe5,pe_out_prop_1_val_5,pe_out_prop_2_val_5,pe_prop_X_val_5
pe8,pe_out_prop_1_val_8,pe_out_prop_2_val_8,pe_prop_X_val_8
...

gppeHasPpe.csv(关系)

gppe0,ppe1
gppe3,ppe4
gppe6,ppe7
...

ppeContainsPe.csv(关系)

ppe1,pe2
ppe4,pe5
ppe7,pe8
...

我在 neo4j 中加载如下:

USING PERIODIC COMMIT  
LOAD CSV FROM 'file:///gppe.csv' AS line    
CREATE (:GPPocEntity {id:line[0],gppe_out_prop_1: line[1], gppe_out_prop_2: line[2],gppe_out_prop_X: line[3]})   

USING PERIODIC COMMIT    
LOAD CSV FROM 'file:///ppe.csv' AS line    
CREATE (:PPocEntity {id:line[0],ppe_out_prop_1: line[1], ppe_out_prop_2: line[2],ppe_out_prop_X: line[3]})

USING PERIODIC COMMIT    
LOAD CSV FROM 'file:///pe.csv' AS line    
CREATE (:PocEntity {id:line[0],pe_out_prop_1: line[1], pe_out_prop_2: line[2],pe_out_prop_X: line[3]})

USING PERIODIC COMMIT
LOAD CSV FROM 'file:///gppeHasPpe.csv' AS line
MATCH(gppe:GPPocEntity {id:line[0]})
MATCH(ppe:PPocEntity {id:line[1]})
MERGE (gppe)-[:has]->(ppe)

USING PERIODIC COMMIT
LOAD CSV FROM 'file:///ppeContainsPe.csv' AS line
MATCH(ppe:PPocEntity {id:line[0]})
MATCH(pe:PocEntity {id:line[1]})
MERGE (ppe)-[:contains]->(pe)

接下来我在查找属性上创建了索引:

CREATE INDEX ON :GPPocEntity(id)

CREATE INDEX ON :PPocEntity(id)

CREATE INDEX ON :PocEntity(id)

以下是将 csv 读入地图列表的实用程序类:

package csv2csv;

import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.w3c.dom.stylesheets.LinkStyle;

public class Config {

    String configFilePath;
    Map csvColumnToGraphNodeMapping;
    List<Map> mappingsGraphRelations;
    Map<String,Map<String,String>> mappingsGraphRelationsMap = new HashMap<String, Map<String,String>>(); 
    List<String> outputColumnsFromCsv;
    Map outputColumnsFromGraph;

    public Config(String pConfigFilePath) {
        configFilePath = pConfigFilePath;

        JSONParser parser = new JSONParser();

        try {

            Object obj = parser.parse(new FileReader(configFilePath));

            JSONObject jsonObject = (JSONObject) obj;

            csvColumnToGraphNodeMapping = (HashMap) ((HashMap) jsonObject.get("csvColumn-graphNodeProperty-mapping"))
                    .get("mappings");
            mappingsGraphRelations = (ArrayList) ((HashMap) jsonObject.get("csvColumn-graphNodeProperty-mapping"))
                    .get("mappings-graph-relations");
            for(Map m : mappingsGraphRelations)
            {
                mappingsGraphRelationsMap.put(""+ m.get("start-entity") + "-" + m.get("end-entity"), m);
            }
            outputColumnsFromCsv = (ArrayList) ((HashMap) jsonObject.get("output-csv-columns"))
                    .get("columns-from-input-csv");
            outputColumnsFromGraph = (HashMap) ((HashMap) jsonObject.get("output-csv-columns"))
                    .get("columns-from-graph");
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ParseException e) {
            e.printStackTrace();
        }
    }
}

下面的类执行合并并创建另一个 csv:

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.neo4j.driver.internal.value.MapValue;
import org.neo4j.driver.internal.value.NodeValue;
import org.neo4j.driver.v1.AuthTokens;
import org.neo4j.driver.v1.Driver;
import org.neo4j.driver.v1.GraphDatabase;
import org.neo4j.driver.v1.Record;
import org.neo4j.driver.v1.Session;
import org.neo4j.driver.v1.StatementResult;

import org.apache.commons.lang3.time.StopWatch;

public class Csv2CsvUtil2 {
    static String inCsvFilePath = "D:\\Mahesh\\work\\files\\inputDataCsv.csv";
    static String outCsvFilePath = "D:\\Mahesh\\work\\files\\csvout.csv";

    private final static Driver driver = GraphDatabase.driver(
              "bolt://localhost:7687", AuthTokens.basic("neo4j", "password"));   

    public static void main(String[] args) throws FileNotFoundException, IOException {
        mergeNonBatch();    
    }

    private static void merge() throws FileNotFoundException, IOException
    {       
        List<Map<String,String>> csvRowMapList = new CsvReader(inCsvFilePath).getMapListFromCsv();
        Session session = driver.session();
        String cypherFilter = "";
        String cypher;
        PrintWriter pw = new PrintWriter(new File(outCsvFilePath));
        StringBuilder sb = new StringBuilder();

        List<Map<String, String>> inputMapList = new ArrayList<Map<String,String>>();
        Map<String,Object> inputMapListMap = new HashMap<String,Object>();
        Map<String, Object> params = new HashMap<String, Object>(); 
        cypher = "WITH {inputMapList} AS inputMapList"
                + " UNWIND inputMapList AS rowMap"
                + " WITH rowMap"
                + " MATCH (gppe:GPPocEntity {id:rowMap.csvid1})-[:has]->(ppe:PPocEntity {id:rowMap.csvid2})-[:contains]->(pe:PocEntity {id:rowMap.csvid3})"
                + " RETURN {id1:gppe.id,id2:ppe.id,id3:pe.id"
                +           ",gppeprop1: gppe.gppe_out_prop_1,gppeprop2: gppe.gppe_out_prop_2"
                +           ",ppeprop1: ppe.ppe_out_prop_1,ppeprop2: ppe.ppe_out_prop_2"
                +           ",peprop1: pe.pe_out_prop_1,peprop2: pe.pe_out_prop_2"
                +           ",outcol1:rowMap.outcol1,outcol2:rowMap.outcol2,outcol3:rowMap.outcol3}";

        int i;
        for(i=0;i<csvRowMapList.size();i++)
        {
            Map<String, String> rowMap = new HashMap<String, String>();
            rowMap.put("csvid1", csvRowMapList.get(i).get("csv-id1"));
            rowMap.put("csvid2", csvRowMapList.get(i).get("csv-id2"));
            rowMap.put("csvid3", csvRowMapList.get(i).get("csv-id3"));
            rowMap.put("outcol1", csvRowMapList.get(i).get("outcol1"));
            rowMap.put("outcol2", csvRowMapList.get(i).get("outcol2"));
            rowMap.put("outcol3", csvRowMapList.get(i).get("outcol3"));
            inputMapList.add(rowMap);

            if(i%10000 == 0)  //run in batch
            {
                inputMapListMap.put("inputMapList", inputMapList);
                StatementResult stmtRes = session.run(cypher,inputMapListMap);
                List<Record> retList = stmtRes.list();
                for (Record record2 : retList) {
                    MapValue retMap = (MapValue) record2.get(0);
                    sb.append(retMap.get("id1")
                            +","+retMap.get("id2")
                            +","+retMap.get("id3")
                            +","+retMap.get("gppeprop1")
                            +","+retMap.get("gppeprop2")
                            +","+retMap.get("ppeprop1")
                            +","+retMap.get("ppeprop2")
                            +","+retMap.get("peprop1")
                            +","+retMap.get("peprop2")
                            +","+retMap.get("outcol1")
                            +","+retMap.get("outcol2")
                            +","+retMap.get("outcol3")
                            +"\n"
                            );  
                }
                inputMapList.clear();
            }
        }

        if(inputMapList.size() != 0)  //ingest remaining rows which does not complete 
                                      //10000 reords failing to form next batch 
        {
            inputMapListMap.put("inputMapList", inputMapList);
            StatementResult stmtRes = session.run(cypher,inputMapListMap);
            List<Record> retList = stmtRes.list();

            for (Record record2 : retList) {
                MapValue retMap = (MapValue) record2.get(0);
                sb.append(retMap.get("id1")
                        +","+retMap.get("id2")
                        +","+retMap.get("id3")
                        +","+retMap.get("gppeprop1")
                        +","+retMap.get("gppeprop2")
                        +","+retMap.get("ppeprop1")
                        +","+retMap.get("ppeprop2")
                        +","+retMap.get("peprop1")
                        +","+retMap.get("peprop2")
                        +","+retMap.get("outcol1")
                        +","+retMap.get("outcol2")
                        +","+retMap.get("outcol3")
                        +"\n"
                        );  
            }
        }
        pw.write(sb.toString());
        pw.close();
    }
}

在非批处理模式下运行会减慢 10 倍的速度。当没有遵循指南 2 到 4 时,速度会慢很多。如果以上所有事情都正确,如果我犯了任何错误,如果我遗漏了什么,如果可以进一步改进,我会爱一个人。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2012-12-14
    • 2023-01-05
    • 2017-05-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多