【问题标题】:Is there a way to track end-to-end data lineage through Neo4j Cypher query?有没有办法通过 Neo4j Cypher 查询来跟踪端到端数据沿袭?
【发布时间】:2026-01-18 21:50:01
【问题描述】:

我正在使用 Spring-Data 和 SpringBoot 来填充我的 Neo4j 图形数据库。

我定义了以下 Neo4j 实体:

Source实体 -->

@NodeEntity
public class Source implements Comparable<Source> {

    @GraphId private Long id;

    private String name;
    private SourceType type;
    private String dataStoreName;
    private String dataStoreDesc;

    private Source() {
        // Empty constructor required as of Neo4j API 2.0.5
    };

    public Source(String name, SourceType type, String dataStoreName, String dataStoreDesc) {
        this.name = name;
        this.type = type;
        this.dataStoreName = dataStoreName;
        this.dataStoreDesc = dataStoreDesc;
    }
    @Relationship(type = "CONTAINS", direction = Relationship.UNDIRECTED)
    public Set<Field> fields;

    public void contains(Field field) {
        if (fields == null) {
            fields = new HashSet<Field>();
        }
        fields.add(field);
    }


    /* Getter and Setters */

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public SourceType getType() {
        return type;
    }

    public void setType(SourceType type) {
        this.type = type;
    }

    public String getDataStoreName() {
        return dataStoreName;
    }

    public void setDataStoreName(String dataStoreName) {
        this.dataStoreName = dataStoreName;
    }

    public String getDataStoreDesc() {
        return dataStoreDesc;
    }

    public void setDataStoreDesc(String dataStoreDesc) {
        this.dataStoreDesc = dataStoreDesc;
    }

    public Set<Field> getFields() {
        return fields;
    }

    public void setFields(Set<Field> fields) {
        this.fields = fields;
    }

    @Override
    public int compareTo(Source other) {
        String name = other.getName();
        SourceType type = other.getType();
        if(this.name.equalsIgnoreCase(name) && this.type.equals(type))
            return 0;

        return -1;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((name == null) ? 0 : name.hashCode());
        result = prime * result + ((type == null) ? 0 : type.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        Source other = (Source) obj;
        if (name == null) {
            if (other.name != null)
                return false;
        } else if (!name.equals(other.name))
            return false;
        if (type != other.type)
            return false;
        return true;
    }

}

Field实体-->

@NodeEntity
    public class Field implements Comparable<Field> {

        @GraphId private Long id;

        private String name;
        private FieldType fieldType;
        private SourceType sourceType;

        private String logicalName;
        private String dataType;
        private String dataSize;
        private String description;

        private Field() {
            // Empty constructor required as of Neo4j API 2.0.5
        };

        public Field(String name, FieldType fieldType, SourceType sourceType, String logicalName, String dataType, String dataSize, String description) {
            this.name = name;
            this.fieldType = fieldType;
            this.sourceType = sourceType;
            this.logicalName = logicalName;
            this.dataType = dataType;
            this.dataSize = dataSize;
            this.description = description;
        }
        @Relationship(type = "MAPS-TO", direction = Relationship.UNDIRECTED)
        public Set<Field> fields;

        public void mapsTo(Field field) {
            if (fields == null) {
                fields = new HashSet<Field>();
            }
            fields.add(field);
        }

        /* Getter and Setters */

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public FieldType getFieldType() {
            return fieldType;
        }

        public void setFieldType(FieldType fieldType) {
            this.fieldType = fieldType;
        }

        public SourceType getSourceType() {
            return sourceType;
        }

        public void setSourceType(SourceType sourceType) {
            this.sourceType = sourceType;
        }

        public String getLogicalName() {
            return logicalName;
        }

        public void setLogicalName(String logicalName) {
            this.logicalName = logicalName;
        }

        public String getDataType() {
            return dataType;
        }

        public void setDataType(String dataType) {
            this.dataType = dataType;
        }

        public String getDataSize() {
            return dataSize;
        }

        public void setDataSize(String dataSize) {
            this.dataSize = dataSize;
        }

        public String getDescription() {
            return description;
        }

        public void setDescription(String description) {
            this.description = description;
        }

        public Set<Field> getFields() {
            return fields;
        }

        public void setFields(Set<Field> fields) {
            this.fields = fields;
        }

        @Override
        public int compareTo(Field other) {
            String name = other.getName();
            FieldType fieldType = other.getFieldType();
            SourceType sourceType = other.getSourceType();
            if(this.name.equalsIgnoreCase(name) && this.fieldType.equals(fieldType) && this.sourceType.equals(sourceType))
                return 0;

            return -1;
        }

        @Override
        public int hashCode() {
            final int prime = 31;
            int result = 1;
            result = prime * result + ((fieldType == null) ? 0 : fieldType.hashCode());
            result = prime * result + ((name == null) ? 0 : name.hashCode());
            result = prime * result + ((sourceType == null) ? 0 : sourceType.hashCode());
            return result;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj)
                return true;
            if (obj == null)
                return false;
            if (getClass() != obj.getClass())
                return false;
            Field other = (Field) obj;
            if (fieldType != other.fieldType)
                return false;
            if (name == null) {
                if (other.name != null)
                    return false;
            } else if (!name.equals(other.name))
                return false;
            if (sourceType != other.sourceType)
                return false;
            return true;
        }


    }

所以,SourceCONTAINS 多个Fields。而FieldMAPS-TO 一个或多个其他Fields。

每个Source 都属于SourceType

我不同的SourceTypes 是:生产者、入站、分期、中间、出站、消费者。

public enum SourceType {
        PRODUCER, INBOUND, STAGING, INTERMEDIATE, OUTBOUND, CONSUMER;
    }

每个Field 都属于FieldType

我不同的FieldTypes 是:FILE_FIELD、DB_COLUMN。

public enum FieldType {
    FILE_FIELD, DB_COLUMN;
}

我的数据沿袭如下: 生产者 --> 入站 --> 分期 --> 中间 --> 出站 --> 消费者

我现在正在寻找一个高级 Cypher 查询,如果我在 CONSUMER Source 中提供 Field,我可以跟踪它的血统back,直到 PRODUCER Source .

同样,我也在寻找一个查询,如果我在 PRODUCER Source 中提供 Field,我可以跟踪它的沿袭向前直到 CONSUMER Source

我尝试使用 shortestPathneighbors 函数构建查询,但它似乎无法提取我正在寻找的结果。

任何建议/指针将不胜感激。

提前致谢!

UPDATE-1

我的数据沿袭背景: 我的应用程序从外部应用程序 (PRODUCE) 获取文件。 我知道哪些数据库表/外部应用程序的列填充了文件中的字段。 所以在这里,PRODUCER 将是我的Source 节点;外部应用程序(填充文件)的每个 table.column 是一个 Field 节点,PRODUCER Source 节点将与所有 Field 节点具有 CONTAINS 关系(表示填充的外部应用程序数据库表的 table.column文件)。

来自外部应用程序的文件称为 INBOUND。它是一个逗号分隔的文件。 我知道文件中的字段名称和顺序是什么。 所以在这里,INBOUND 将是我的Source 节点;文件中的每个字段都是Field 节点,INBOUND Source 节点将与所有Field 节点具有CONTAINS 关系(表示入站文件中的文件字段)。 此外,INBOUND Source 的每个 Field 节点将与生产者 SourceField 节点具有 MAPS_TO 关系(一对一映射)。

继续进行类似的工作流程,我的下一个阶段称为 STAGING,其中我将入站文件字段加载到我的数据库表/列中。 所以在这里,STAGING 将是我的Source 节点,并且数据库表的每一列(我将文件字段加载到其中)将代表一个Field 节点。 STAGING Source 节点将与所有 Field 节点(代表我加载文件字段的 db table 的 db table.column)有 CONTAINS 关系。 此外,STAGING Source 的每个 Field 节点将与 INBOUND SourceField 节点具有 MAPS_TO 关系(一对一映射)。

同样,我的下一个阶段是中级。在这个阶段,我正在查询加载输入文件字段的表,然后将输出刷新到另一个文件中(根据我的业务用例,我可能选择查询所有或仅查询表列的子集从输入文件填充)。 我知道哪些字段以及以什么顺序进入我的中间文件。 所以在这里,中间是我的Source 节点,进入中间文件的每个字段都代表我的Field 节点。此外,INTERMEDIATE Source 将与代表中间文件中的字段的所有 Field 节点具有 CONTAINS 关系。 此外,这些Field 节点中的每一个都将与 STAGING Source 的字段具有MAPS_TO 关系(一对一映射)。

同样,我有 OUTBOUND 阶段,最后是 CONSUMER 阶段。

...(我希望你现在能够形象化血统)

我的查询的目标是,比如说,如果我给出一个 Field 名称(代表 PRODUCER 的 table.column)作为输入,那么我应该能够追踪它的沿袭直到 CONSUMER(即最后一个我血统的阶段)。

【问题讨论】:

  • 快速说明:Neo4j 标签不能有连字符,所以你应该将MAPS-TO 更改为MAPS_TO
  • “不能有”或“不应该有”?因为我已经用上述关系填充了我的数据库,我也可以在 Neo4j 上直观地看到它们。
  • 好的,你可以在图中插入带有连字符的标签,但是 Cypher 不支持这样的标签。如果您尝试运行查询MATCH (n)-[:MAPS-TO]-&gt;(m) RETURN n, m,您将得到:Invalid input '-': expected an identifier character, whitespace, '|', a length specification, a property map or ']' (line 1, column 17 (offset: 16))
  • @GáborSzárnyas 带有特殊字符的标签、类型或属性需要反引号,但除此之外没有问题。

标签: neo4j path spring-data cypher data-lineage


【解决方案1】:

同样,我也在寻找一个查询,如果我在 PRODUCER Source 中提供 Field,我可以跟踪它的沿袭向前直到 CONSUMER Source

我认为我还没有完全理解您的数据模型和要求,但这里是这个查询的一个想法:

MATCH
  (:Field  {name: { fieldName } })<-[:CONTAINS]-
  (:Source {type: "PRODUCER"    })-[:MAPS_TO]->
  (:Source {type: "INBOUND"     })-[:MAPS_TO]->
  (:Source {type: "STAGING"     })-[:MAPS_TO]->
  (:Source {type: "INTERMEDIATE"})-[:MAPS_TO]->
  (:Source {type: "OUTBOUND"    })-[:MAPS_TO]->
  (consumer:Source {type: "CONSUMER"    })
RETURN consumer

【讨论】:

  • Tx 但这似乎并没有让我了解血统。我刚刚更新了我的初始帖子以详细描述我的数据模型。很抱歉描述太长,但只是想确保正确理解我的血统。 Tx 以提供任何进一步的建议/指针。
  • 我能够通过以下查询获得所需的数据沿袭:MATCH (f5:Field)-[:MAPS_TO]-(f4:Field)-[:MAPS_TO]-(f3:Field)-[:MAPS_TO]-(f2:Field)-[:MAPS_TO]-(f1:Field)-[:MAPS_TO]-(f:Field)&lt;-[:CONTAINS]-(s:Source {type: "SOURCE"}) WHERE f.name="&lt;my input source field&gt;" RETURN f,s,f1,f2,f3,f4,f5
  • 太棒了!请随时将您的解决方案作为单独的答案发布并接受。需要考虑的一件事:我会为名称使用参数,因此条件变为f.name = { name },您可以在调用期间设置name 参数。
  • 谢谢!我已经发布了我的答案,但直到明天才能接受(我认为这是 SO 的限制)。不确定我是否可以在此线程中提问,但在不同的上下文中,您介意看看我面临的其他非常 weird 问题:[link]github.com/spring-projects/spring-data-neo4j/issues/374。再次感谢!
【解决方案2】:

我能够通过以下查询获得所需的数据沿袭:

MATCH (f5:Field)-[:MAPS_TO]-(f4:Field)-[:MAPS_TO]-(f3:Field)-[:MAP‌​S_TO]-(f2:Field)-[:M‌​APS_TO]-(f1:Field)-[‌​:MAPS_TO]-(f:Field)<‌​-[:CONTAINS]-(s:Sour‌​ce {type: "SOURCE"}) WHERE f.name="<my input source field>" RETURN f,s,f1,f2,f3,f4,f5

【讨论】: