【问题标题】:Kafka streams aggregate with deletes and/or key changesKafka 流与删除和/或关键更改聚合
【发布时间】:2019-03-23 16:13:00
【问题描述】:

我正在尝试定义一个 kafka 流,它接受来自某个主题的记录,例如 EMPLOYEE,其中记录包含有关员工及其部门的属性,并将其转换为另一个主题 DEPARTMENT,其中包含部门属性,以及所有员工的列表(包含一些无状态转换的员工)。

EMPLOYEE 记录重复部门数据。 (我实际上是在处理一些 DICOM 标头数据,但我会坚持一个更普遍理解的关系。我试图理解一个通用的解决方案)。此外,主题中的记录只有当前数据(即:没有先前的部门ID,如果部门发生了变化。)

这似乎是一项聚合工作。我有一些似乎适用于简单案例的东西:

        ...
        KStream<String, Employee> stream = kStreamBuilder.stream("EMPLOYEE"); // Stream from raw EMPLOYEE
        stream.map((k, v) -> new KeyValue<>(k, transformEmployee(v))) // <-- some stateless enrichment of the employee
                .groupBy((k, emp) -> emp.getDepartmentId(), jsonSerialisedWith(Employee.class))

                // dummy reduce to a get a ktable for agg:
                .reduce((aggValue, newEmp) -> newEmp) 
                .groupBy((k, emp2) -> new KeyValue<>(emp2.getDepartmentId(), emp2), jsonSerialisedWith(Employee.class))

                .aggregate(Department::new, this::addEmployee, this::removeEmployee,
                           jsonValueMaterializedAs("DEPARTMENT-AGG", Department.class))
                .toStream()
                .to("DEPARTMENT", jsonProducedWith(Department.class));
        ...

    private Department addEmployee(String deptId, Employee employee, Department department) {
        department.addEmployee(employee);
        if (department.getId() == null) {
            department.setId(employee.getDepartmentId());
            department.setName(employee.getDepartmentName());
        }
        return department;
    }

这适用于添加或更新。但是,随着时间的推移,员工可能会被删除或重新分配到另一个部门。我认为删除应该是发送到 EMPLOYEE 主题的墓碑记录(k:empId,v:null)。但是,我不再拥有部门 ID,我必须进行空检查(并为部门 ID 返回空),因此删除员工时永远不会发生 removeEmployee。 更改部门ID 的类似问题。

那么,kafka 的处理方法是什么?

【问题讨论】:

  • 您的需求很复杂,因此使用 DSL 不容易表达。我建议改用处理器 API。

标签: apache-kafka apache-kafka-streams


【解决方案1】:

我认为使用你的代码就足够了,但稍微改变删除员工的语义。

您应该添加某种Mock 部门(将在用户从部门中删除时使用)。

如果员工被删除,而不是将部门设置为null,则应将其分配给Mock部门。

【讨论】: