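【Posted】: 2021-09-08 14:03:04
【Question】:
I am currently working on a Kafka Streams project that tries to count, for example, the occurrences of the word "cat" in the sentences produced every 5 seconds (one sentence is produced per second), and to send the answer back in a form like cat xxx (the number of occurrences in the past 5 seconds). I am new to Java, but after searching for similar questions I put together the code below (based on the Kafka Streams WordCount demo). It fails with an error.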
Java code
package org.apache.kafka.streams.examples.wordcount;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;
import java.time.Duration;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public final class WordCountDemo {
    public static final String INPUT_TOPIC = "streams-plaintext-input";
    public static final String OUTPUT_TOPIC = "streams-wordcount-output";
    static Properties getStreamsConfig(final String[] args) throws IOException {
        final Properties props = new Properties();
        if (args != null && args.length > 0) {
            try (final FileInputStream fis = new FileInputStream(args[0])) {
                props.load(fis);
            }
            if (args.length > 1) {
                System.out.println("Warning: Some command line arguments were ignored. This demo only accepts an optional configuration file.");
            }
        }
        props.putIfAbsent(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        props.putIfAbsent(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.putIfAbsent(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        props.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }
    static void createWordCountStream(final StreamsBuilder builder) {
        final KStream<String, String> source = builder.stream(INPUT_TOPIC);
        final KTable<String, Long> counts = source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
            .groupBy((key, value) -> value)
            .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
            .count()
            .filter((key, value) -> value.equals("cat"));
        counts.toStream().to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
    }
    public static void main(final String[] args) throws IOException {
        final Properties props = getStreamsConfig(args);
        final StreamsBuilder builder = new StreamsBuilder();
        createWordCountStream(builder);
        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        final CountDownLatch latch = new CountDownLatch(1);
        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });
        try {
            streams.start();
            latch.await();
        } catch (final Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}
The error
C:\src\kafka-2.7.0-src>START /b /wait cmd /C "gradle assemble -x clients:javadoc streams:test-utils:javadoc streams:streams-scala:scaladoc connect:mirror-client:javadoc connect:api:javadoc core:javadoc core:compileScala"
> Configure project :
Building project 'core' with Scala version 2.13.3
Building project 'streams-scala' with Scala version 2.13.3
> Task :streams:examples:compileJava FAILED
C:\src\kafka-2.7.0-src\streams\examples\src\main\java\org\apache\kafka\streams\examples\wordcount\WordCountDemo.java:86: error: incompatible types: KTable<Windowed<String>,Long> cannot be converted to KTable<String,Long>
.filter((key, value) -> value.equals("cat"));
^
1 error
FAILURE: Build failed with an exception.
* What went wrong:
Execution failed for task ':streams:examples:compileJava'.
> Compilation failed; see the compiler error output for details.
* Try:
Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output. Run with --scan to get full insights.
* Get more help at https://help.gradle.org
Deprecated Gradle features were used in this build, making it incompatible with Gradle 7.0.
Use '--warning-mode all' to show the individual deprecation warnings.
See https://docs.gradle.org/6.6/userguide/command_line_interface.html#sec:command_line_warnings
BUILD FAILED in 3s
156 actionable tasks: 4 executed, 152 up-to-date
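Reading the compiler message: once `windowedBy(...)` is applied, `count()` produces a `KTable<Windowed<String>, Long>`, so the result cannot be assigned to a `KTable<String, Long>`. Separately, a `filter` placed after `count()` receives the `Long` count as its value, so `value.equals("cat")` could never be true there. One likely fix, offered as a suggestion rather than a verified patch, is to filter for the word before `groupBy`, declare the table with the `Windowed<String>` key type, and unwrap the key with `Windowed.key()` before writing with `Produced.with(Serdes.String(), Serdes.Long())`. The sketch below models only that ordering (split, filter, assign to a 5-second window, count) in plain Java without any Kafka classes. `CatWindowSketch`, `Sentence`, and the epoch-aligned window arithmetic are illustrative assumptions, not part of the demo.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of the pipeline order; it does not use the Kafka Streams API.
final class CatWindowSketch {
    // Hypothetical stand-in for a record read from the input topic.
    static final class Sentence {
        final long timestampMs;
        final String text;

        Sentence(long timestampMs, String text) {
            this.timestampMs = timestampMs;
            this.text = text;
        }
    }

    // Assumed window size: 5 seconds, non-overlapping, aligned to timestamp 0.
    static final long WINDOW_MS = 5_000L;

    // Returns window start (ms) -> number of "cat" occurrences in that window.
    static Map<Long, Long> countCatPerWindow(List<Sentence> sentences) {
        Map<Long, Long> counts = new TreeMap<>();
        for (Sentence s : sentences) {
            long windowStart = s.timestampMs - (s.timestampMs % WINDOW_MS);
            for (String word : s.text.toLowerCase(Locale.ROOT).split("\\W+")) {
                // Filter on the word itself, before any counting happens.
                if (word.equals("cat")) {
                    counts.merge(windowStart, 1L, Long::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Sentence> input = Arrays.asList(
            new Sentence(0L, "The cat sat"),
            new Sentence(1_000L, "A dog barked"),
            new Sentence(4_000L, "Cat, cat!"),
            new Sentence(5_000L, "Another cat"));
        countCatPerWindow(input).forEach((start, n) ->
            System.out.println("cat " + n + " (window starting at " + start + " ms)"));
    }
}
```

The point of the sketch is the order of operations: because the word test runs before counting, the counting step only ever sees "cat", and the window becomes part of the key, which is what the `Windowed<String>` type in the compiler error reflects.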
【Comments】:
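- Why are you editing/compiling the Kafka source code? The examples are meant to be copied outside the project (although I think they are also used for regression tests).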
Tags: java apache-kafka apache-kafka-streams