【问题标题】:Processor node in kafka streamskafka 流中的处理器节点
【发布时间】:2018-05-08 15:50:24
【问题描述】:

我正在 kafka 流中处理 processor node。对于一个简单的代码,我写如下只是为了过滤 UserID ,这是在 kafka 流中执行 processor node 的正确方法吗?

但是,下面的代码无法编译,会抛出错误:The method filter(Predicate<? super Object,? super Object>) in the type KStream<Object,Object> is not applicable for the arguments (new Predicate<String,String>(){})

KStreamBuilder builder = new KStreamBuilder();

builder.stream(topic)
    .filter(new Predicate <String, String>() {
        //@Override
        public boolean test(String key, String value) {
            Hashtable<Object, Object> message;
            // put you processor logic here
            return message.get("UserID").equals("1");
        }
    })
    .to(streamouttopic);

    final KafkaStreams streams = new KafkaStreams(builder, props);
    final CountDownLatch latch = new CountDownLatch(1);

    // attach shutdown handler to catch control-c
    Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
    @Override
    public void run() {
       streams.close();
       latch.countDown();
    }
});

try {
    streams.start();
    latch.await();
} catch (Throwable e) {
    System.exit(1);
}
System.exit(0);

有人可以指导我吗?

【问题讨论】:

    标签: apache-kafka apache-kafka-streams


    【解决方案1】:

    builder.stream(topic) 返回KStream&lt;Object,Object&gt; 类型,因为您没有指定泛型类型。并且&lt;Object,Object&gt;&lt;String,String&gt; 不兼容。

    如果您知道实际类型是KStream&lt;String,String&gt;,您可以按如下方式指定类型:

    builder.<Sting,String>stream(topic)
           .filter(...)
    

    回答您关于“处理器节点”的问题:是的,添加filter() 将在内部添加一个处理器节点。请注意,在 DSL 级别,您通常不需要考虑处理器。

    如果您想明确使用处理器,您可以使用处理器 API 而不是 DSL。查看 WordCount 示例:https://github.com/apache/kafka/blob/trunk/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java

    注意,使用 DSL,内部代码将被转换为处理器拓扑,这是 Kafka Streams 的运行时模型。

    【讨论】:

    【解决方案2】:

    您可能正在使用另一个包中的Predicate 类。你需要使用

    import org.apache.kafka.streams.kstream.Predicate;
    

    【讨论】:

    • 我使用的是同一个包。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-07-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多