【发布时间】:2019-02-23 12:54:54
【问题描述】:
对于传入记录,我需要验证值并根据结果对象将错误转发到不同的主题,如果成功验证,则使用 context.forward() 转发相同的内容。 可以使用此链接中提供的 DSL 来完成
using kafka-streams to conditionally sort a json input stream
我没有在 processorAPI 中找到明确的方法。
ValidateProcessor.java
@Override
public void process(String key, String value) {
Object result = //validation logic
if(result.isSuccessful()) {
context().forward(key, value);
}else {
context.forward("error",Object)
}
}
现在调用者再次需要检查并根据 key 需要区分 sink 主题。 我正在使用处理器API,因为我需要使用标头。
编辑:
branch(new predicate{
business logic
if(condition)
return true
else
return false;
当条件为假时如何推送到不同的流。目前正在创建另一个谓词,该谓词收集所有其他不满足链中上述谓词的记录。 有没有办法在同一个谓词中做?
【问题讨论】:
标签: java apache-kafka apache-kafka-streams