【发布时间】:2018-05-08 15:50:24
【问题描述】:
我正在 kafka 流中处理 processor node。对于一个简单的代码,我写如下只是为了过滤 UserID ,这是在 kafka 流中执行 processor node 的正确方法吗?
但是,下面的代码无法编译,会抛出错误:The method filter(Predicate<? super Object,? super Object>) in the type KStream<Object,Object> is not applicable for the arguments (new Predicate<String,String>(){})
KStreamBuilder builder = new KStreamBuilder();
builder.stream(topic)
.filter(new Predicate <String, String>() {
//@Override
public boolean test(String key, String value) {
Hashtable<Object, Object> message;
// put you processor logic here
return message.get("UserID").equals("1");
}
})
.to(streamouttopic);
final KafkaStreams streams = new KafkaStreams(builder, props);
final CountDownLatch latch = new CountDownLatch(1);
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
有人可以指导我吗?
【问题讨论】:
标签: apache-kafka apache-kafka-streams