【问题标题】:Kafka Streams processor API context.forwardKafka Streams 处理器 API context.forward
【发布时间】:2019-02-23 12:54:54
【问题描述】:

对于传入记录,我需要验证值并根据结果对象将错误转发到不同的主题,如果成功验证,则使用 context.forward() 转发相同的内容。 可以使用此链接中提供的 DSL 来完成

using kafka-streams to conditionally sort a json input stream

我没有在 processorAPI 中找到明确的方法。

    ValidateProcessor.java

    @Override
    public void process(String key, String value) {
        Object result = //validation logic
        if(result.isSuccessful()) {
            context().forward(key, value);
         }else {
            context.forward("error",Object)
        }

}

现在调用者再次需要检查并根据 key 需要区分 sink 主题。 我正在使用处理器API,因为我需要使用标头。

编辑:

branch(new predicate{
 business logic 
 if(condition)
   return true
 else
   return false;

当条件为假时如何推送到不同的流。目前正在创建另一个谓词,该谓词收集所有其他不满足链中上述谓词的记录。 有没有办法在同一个谓词中做?

【问题讨论】:

    标签: java apache-kafka apache-kafka-streams


    【解决方案1】:

    当您指定 Topology 时,您为所有节点分配名称并连接它们:

    Topology topology = new Topology();
    topology.addSource("source", ...);
    topology.addProcessor("X", ..., "source"); // connect source->X
    topology.addSink("Y", ..., "X"); // connect X->Y
    topology.addSink("Z", ..., "X"); // connect X->Z
    

    如果处理器“X”连接到下游处理器“Y”和“Z”,您可以使用节点名称将记录发送到“Y”或“Z”。如果您不指定名称,则记录将发送到所有下游(“子”)处理器。

    // this is `process()` of "X"
    public void process(String key, String value) {
        context.forward(newKey, newValue); // send to both Y and Z
        context.forward(newKey, newValue, To.child("Y")); // send it only to Y
        context.forward(newKey, newValue, To.child("Z")); // send it only to Z
    }
    

    【讨论】:

    • 我在streamsBuilder和branch()中使用了谓词。如果谓词被评估为真,那么记录将被发送到一个分支流。但是在谓词的相同计算中,如果谓词被评估为假,我想要一些业务对象,但我不确定这会起作用。
    • 这个问题是关于处理器 API 的。 StreamsBuilderbranch() 是 DSL 的一部分。您应该发布一个新问题。
    • 如果我不转发上下文,记录是发送到所有下游(“子”)处理器还是不发送?
    • 如果您不调用forward(),则不会向下游发送任何记录。例如,如果您实现了一个过滤器,并且想要删除一条记录,您将跳过调用forward()
    猜你喜欢
    • 2017-10-07
    • 2017-01-07
    • 1970-01-01
    • 2019-06-02
    • 2018-08-25
    • 2017-07-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多