OK, I tried a few things that improved performance considerably.
Some Neo4j performance guidelines that worked for my scenario:
1. Batch processing: avoid making one Cypher call (over the Bolt API) per CSV row. Instead, iterate over a fixed number of CSV rows to build a list of maps, where each map represents one CSV row. Pass this list of maps as a parameter to the Cypher query, UNWIND the list inside Cypher, and perform the required operations. Repeat for the next set of CSV rows.
2. Don't return node/relationship objects from Cypher to the Java side. Instead, return a list of maps shaped as the desired final output. When we return lists of nodes/relationships, we may have to iterate over them again to merge their properties with the CSV columns to form the final output rows (maps).
3. Pass CSV column values to Cypher: to achieve point 2, send the CSV column values (those to be merged with graph properties) into Cypher. Perform the MATCH inside Cypher and form the output map there by merging the matched nodes' properties with the input CSV columns.
4. Index the node/relationship properties used for matching (Official docs)
5. Parameterized Cypher (API Example, Official docs)
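The batching idea in guideline 1 can be sketched in plain Java (no driver calls; `toBatches` is a hypothetical helper, not part of the code below):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of guideline 1: group CSV rows into fixed-size batches so each
// batch becomes one parameterized Cypher call instead of one call per row.
public class BatchDemo {
    static <T> List<List<T>> toBatches(List<T> rows, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < rows.size(); start += batchSize) {
            int end = Math.min(start + batchSize, rows.size());
            batches.add(new ArrayList<>(rows.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < 25; i++) rows.add(i);
        List<List<Integer>> batches = toBatches(rows, 10);
        System.out.println(batches.size()); // two full batches of 10 plus a final batch of 5
    }
}
```

Each inner list would then be sent as the `inputMapList` parameter of a single `session.run(cypher, params)` call.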
I did a quick and dirty experiment, as described below.
My input CSV looks like this:
inputDataCsv.csv
csv-id1,csv-id2,csv-id3,outcol1,outcol2,outcol3
gppe0,ppe1,pe2,outcol1-val-0,outcol2-val-1,outcol3-val-2
gppe3,ppe4,pe5,outcol1-val-3,outcol2-val-4,outcol3-val-5
gppe6,ppe7,pe8,outcol1-val-6,outcol2-val-7,outcol3-val-8
...
To create the graph, I first created CSVs of the following form:
gppe.csv (entities)
gppe0,gppe_out_prop_1_val_0,gppe_out_prop_2_val_0,gppe_prop_X_val_0
gppe3,gppe_out_prop_1_val_3,gppe_out_prop_2_val_3,gppe_prop_X_val_3
gppe6,gppe_out_prop_1_val_6,gppe_out_prop_2_val_6,gppe_prop_X_val_6
...
ppe.csv(实体)
ppe1,ppe_out_prop_1_val_1,ppe_out_prop_2_val_1,ppe_prop_X_val_1
ppe4,ppe_out_prop_1_val_4,ppe_out_prop_2_val_4,ppe_prop_X_val_4
ppe7,ppe_out_prop_1_val_7,ppe_out_prop_2_val_7,ppe_prop_X_val_7
...
pe.csv(实体)
pe2,pe_out_prop_1_val_2,pe_out_prop_2_val_2,pe_prop_X_val_2
pe5,pe_out_prop_1_val_5,pe_out_prop_2_val_5,pe_prop_X_val_5
pe8,pe_out_prop_1_val_8,pe_out_prop_2_val_8,pe_prop_X_val_8
...
gppeHasPpe.csv (relationships)
gppe0,ppe1
gppe3,ppe4
gppe6,ppe7
...
ppeContainsPe.csv (relationships)
ppe1,pe2
ppe4,pe5
ppe7,pe8
...
I loaded these into Neo4j as follows:
USING PERIODIC COMMIT
LOAD CSV FROM 'file:///gppe.csv' AS line
CREATE (:GPPocEntity {id:line[0],gppe_out_prop_1: line[1], gppe_out_prop_2: line[2],gppe_out_prop_X: line[3]})
USING PERIODIC COMMIT
LOAD CSV FROM 'file:///ppe.csv' AS line
CREATE (:PPocEntity {id:line[0],ppe_out_prop_1: line[1], ppe_out_prop_2: line[2],ppe_out_prop_X: line[3]})
USING PERIODIC COMMIT
LOAD CSV FROM 'file:///pe.csv' AS line
CREATE (:PocEntity {id:line[0],pe_out_prop_1: line[1], pe_out_prop_2: line[2],pe_out_prop_X: line[3]})
USING PERIODIC COMMIT
LOAD CSV FROM 'file:///gppeHasPpe.csv' AS line
MATCH(gppe:GPPocEntity {id:line[0]})
MATCH(ppe:PPocEntity {id:line[1]})
MERGE (gppe)-[:has]->(ppe)
USING PERIODIC COMMIT
LOAD CSV FROM 'file:///ppeContainsPe.csv' AS line
MATCH(ppe:PPocEntity {id:line[0]})
MATCH(pe:PocEntity {id:line[1]})
MERGE (ppe)-[:contains]->(pe)
Next, I created indexes on the lookup properties:
CREATE INDEX ON :GPPocEntity(id)
CREATE INDEX ON :PPocEntity(id)
CREATE INDEX ON :PocEntity(id)
Here is the utility class that reads the config into a map structure:
package csv2csv;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
public class Config {
String configFilePath;
Map csvColumnToGraphNodeMapping;
List<Map> mappingsGraphRelations;
Map<String,Map<String,String>> mappingsGraphRelationsMap = new HashMap<String, Map<String,String>>();
List<String> outputColumnsFromCsv;
Map outputColumnsFromGraph;
public Config(String pConfigFilePath) {
configFilePath = pConfigFilePath;
JSONParser parser = new JSONParser();
try {
Object obj = parser.parse(new FileReader(configFilePath));
JSONObject jsonObject = (JSONObject) obj;
csvColumnToGraphNodeMapping = (HashMap) ((HashMap) jsonObject.get("csvColumn-graphNodeProperty-mapping"))
.get("mappings");
mappingsGraphRelations = (ArrayList) ((HashMap) jsonObject.get("csvColumn-graphNodeProperty-mapping"))
.get("mappings-graph-relations");
for(Map m : mappingsGraphRelations)
{
mappingsGraphRelationsMap.put(""+ m.get("start-entity") + "-" + m.get("end-entity"), m);
}
outputColumnsFromCsv = (ArrayList) ((HashMap) jsonObject.get("output-csv-columns"))
.get("columns-from-input-csv");
outputColumnsFromGraph = (HashMap) ((HashMap) jsonObject.get("output-csv-columns"))
.get("columns-from-graph");
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (ParseException e) {
e.printStackTrace();
}
}
}
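The config file itself is not shown; based on the keys the class reads, it presumably looks something like this (all field values here are hypothetical placeholders):

```json
{
  "csvColumn-graphNodeProperty-mapping": {
    "mappings": {
      "csv-id1": "GPPocEntity.id",
      "csv-id2": "PPocEntity.id",
      "csv-id3": "PocEntity.id"
    },
    "mappings-graph-relations": [
      { "start-entity": "GPPocEntity", "end-entity": "PPocEntity" },
      { "start-entity": "PPocEntity", "end-entity": "PocEntity" }
    ]
  },
  "output-csv-columns": {
    "columns-from-input-csv": ["outcol1", "outcol2", "outcol3"],
    "columns-from-graph": {
      "GPPocEntity": ["gppe_out_prop_1", "gppe_out_prop_2"],
      "PPocEntity": ["ppe_out_prop_1", "ppe_out_prop_2"],
      "PocEntity": ["pe_out_prop_1", "pe_out_prop_2"]
    }
  }
}
```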
The class below performs the merge and produces another CSV:
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.neo4j.driver.internal.value.MapValue;
import org.neo4j.driver.v1.AuthTokens;
import org.neo4j.driver.v1.Driver;
import org.neo4j.driver.v1.GraphDatabase;
import org.neo4j.driver.v1.Record;
import org.neo4j.driver.v1.Session;
import org.neo4j.driver.v1.StatementResult;
import org.apache.commons.lang3.time.StopWatch;
public class Csv2CsvUtil2 {
static String inCsvFilePath = "D:\\Mahesh\\work\\files\\inputDataCsv.csv";
static String outCsvFilePath = "D:\\Mahesh\\work\\files\\csvout.csv";
private final static Driver driver = GraphDatabase.driver(
"bolt://localhost:7687", AuthTokens.basic("neo4j", "password"));
public static void main(String[] args) throws FileNotFoundException, IOException {
merge();
}
private static void merge() throws FileNotFoundException, IOException
{
List<Map<String,String>> csvRowMapList = new CsvReader(inCsvFilePath).getMapListFromCsv();
Session session = driver.session();
String cypher;
PrintWriter pw = new PrintWriter(new File(outCsvFilePath));
StringBuilder sb = new StringBuilder();
List<Map<String, String>> inputMapList = new ArrayList<Map<String,String>>();
Map<String,Object> inputMapListMap = new HashMap<String,Object>();
Map<String, Object> params = new HashMap<String, Object>();
cypher = "WITH {inputMapList} AS inputMapList"
+ " UNWIND inputMapList AS rowMap"
+ " WITH rowMap"
+ " MATCH (gppe:GPPocEntity {id:rowMap.csvid1})-[:has]->(ppe:PPocEntity {id:rowMap.csvid2})-[:contains]->(pe:PocEntity {id:rowMap.csvid3})"
+ " RETURN {id1:gppe.id,id2:ppe.id,id3:pe.id"
+ ",gppeprop1: gppe.gppe_out_prop_1,gppeprop2: gppe.gppe_out_prop_2"
+ ",ppeprop1: ppe.ppe_out_prop_1,ppeprop2: ppe.ppe_out_prop_2"
+ ",peprop1: pe.pe_out_prop_1,peprop2: pe.pe_out_prop_2"
+ ",outcol1:rowMap.outcol1,outcol2:rowMap.outcol2,outcol3:rowMap.outcol3}";
int i;
for(i=0;i<csvRowMapList.size();i++)
{
Map<String, String> rowMap = new HashMap<String, String>();
rowMap.put("csvid1", csvRowMapList.get(i).get("csv-id1"));
rowMap.put("csvid2", csvRowMapList.get(i).get("csv-id2"));
rowMap.put("csvid3", csvRowMapList.get(i).get("csv-id3"));
rowMap.put("outcol1", csvRowMapList.get(i).get("outcol1"));
rowMap.put("outcol2", csvRowMapList.get(i).get("outcol2"));
rowMap.put("outcol3", csvRowMapList.get(i).get("outcol3"));
inputMapList.add(rowMap);
if(inputMapList.size() == 10000) //run a full batch (i%10000==0 would also fire at i=0)
{
inputMapListMap.put("inputMapList", inputMapList);
StatementResult stmtRes = session.run(cypher,inputMapListMap);
List<Record> retList = stmtRes.list();
for (Record record2 : retList) {
MapValue retMap = (MapValue) record2.get(0);
sb.append(retMap.get("id1")
+","+retMap.get("id2")
+","+retMap.get("id3")
+","+retMap.get("gppeprop1")
+","+retMap.get("gppeprop2")
+","+retMap.get("ppeprop1")
+","+retMap.get("ppeprop2")
+","+retMap.get("peprop1")
+","+retMap.get("peprop2")
+","+retMap.get("outcol1")
+","+retMap.get("outcol2")
+","+retMap.get("outcol3")
+"\n"
);
}
inputMapList.clear();
}
}
if(inputMapList.size() != 0) //process the remaining rows that did not
//fill a complete batch of 10000
{
inputMapListMap.put("inputMapList", inputMapList);
StatementResult stmtRes = session.run(cypher,inputMapListMap);
List<Record> retList = stmtRes.list();
for (Record record2 : retList) {
MapValue retMap = (MapValue) record2.get(0);
sb.append(retMap.get("id1")
+","+retMap.get("id2")
+","+retMap.get("id3")
+","+retMap.get("gppeprop1")
+","+retMap.get("gppeprop2")
+","+retMap.get("ppeprop1")
+","+retMap.get("ppeprop2")
+","+retMap.get("peprop1")
+","+retMap.get("peprop2")
+","+retMap.get("outcol1")
+","+retMap.get("outcol2")
+","+retMap.get("outcol3")
+"\n"
);
}
}
pw.write(sb.toString());
pw.close();
session.close();
}
}
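The CsvReader class used in merge() above is not shown; here is a minimal plausible version, assuming a comma-separated file with a header row and no quoted or escaped fields:

```java
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal sketch of CsvReader: reads the header row, then maps each
// subsequent row's values to the header names, column by column.
public class CsvReader {
    private final String csvFilePath;

    public CsvReader(String csvFilePath) {
        this.csvFilePath = csvFilePath;
    }

    public List<Map<String, String>> getMapListFromCsv() throws IOException {
        List<Map<String, String>> rows = new ArrayList<>();
        try (BufferedReader br = new BufferedReader(new FileReader(csvFilePath))) {
            String headerLine = br.readLine();
            if (headerLine == null) return rows;
            String[] headers = headerLine.split(",");
            String line;
            while ((line = br.readLine()) != null) {
                String[] values = line.split(",", -1); // -1 keeps trailing empty columns
                Map<String, String> row = new HashMap<>();
                for (int i = 0; i < headers.length && i < values.length; i++) {
                    row.put(headers[i], values[i]);
                }
                rows.add(row);
            }
        }
        return rows;
    }
}
```

A real implementation would want a proper CSV library (e.g. Apache Commons CSV) to handle quoting, but this is enough to feed merge() with the row maps it expects.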
Running in non-batched mode is about 10x slower. It gets much slower still when guidelines 2 to 4 are not followed. I would love it if someone could confirm whether all of the above is correct, point out any mistakes I have made or anything I have missed, and tell me whether this can be improved further.