【Question title】: How to window Kafka Streams events?
【Posted】: 2021-09-08 14:03:04
【Question】:

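I'm currently working on a Kafka Streams project that tries to count how often the word "cat" appears in the sentences produced during, for example, each 5-second interval (one sentence is produced per second), and to send the answer back in a form like "cat xxx" (the number of occurrences in the last 5 seconds). I'm new to Java, but after searching for similar questions I found the code below (based on the Kafka Streams WordCount demo). It fails with the error below when I try to run it.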

Java code:

package org.apache.kafka.streams.examples.wordcount;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;

import java.time.Duration;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public final class WordCountDemo {

    public static final String INPUT_TOPIC = "streams-plaintext-input";
    public static final String OUTPUT_TOPIC = "streams-wordcount-output";

    static Properties getStreamsConfig(final String[] args) throws IOException {
        final Properties props = new Properties();
        if (args != null && args.length > 0) {
            try (final FileInputStream fis = new FileInputStream(args[0])) {
                props.load(fis);
            }
            if (args.length > 1) {
                System.out.println("Warning: Some command line arguments were ignored. This demo only accepts an optional configuration file.");
            }
        }
        props.putIfAbsent(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        props.putIfAbsent(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.putIfAbsent(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        props.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    static void createWordCountStream(final StreamsBuilder builder) {
        final KStream<String, String> source = builder.stream(INPUT_TOPIC);

        final KTable<String, Long> counts = source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
            .groupBy((key, value) -> value)
            .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
            .count()
            .filter((key, value) -> value.equals("cat"));
            

        counts.toStream().to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
    }

    public static void main(final String[] args) throws IOException {
        final Properties props = getStreamsConfig(args);

        final StreamsBuilder builder = new StreamsBuilder();
        createWordCountStream(builder);
        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        final CountDownLatch latch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (final Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}

The error:

C:\src\kafka-2.7.0-src>START /b /wait cmd /C "gradle assemble -x clients:javadoc streams:test-utils:javadoc streams:streams-scala:scaladoc connect:mirror-client:javadoc connect:api:javadoc core:javadoc core:compileScala"

> Configure project :
Building project 'core' with Scala version 2.13.3
Building project 'streams-scala' with Scala version 2.13.3

> Task :streams:examples:compileJava FAILED
C:\src\kafka-2.7.0-src\streams\examples\src\main\java\org\apache\kafka\streams\examples\wordcount\WordCountDemo.java:86: error: incompatible types: KTable<Windowed<String>,Long> cannot be converted to KTable<String,Long>
            .filter((key, value) -> value.equals("cat"));
                   ^
1 error

FAILURE: Build failed with an exception.

* What went wrong:
Execution failed for task ':streams:examples:compileJava'.
> Compilation failed; see the compiler error output for details.

* Try:
Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output. Run with --scan to get full insights.

* Get more help at https://help.gradle.org

Deprecated Gradle features were used in this build, making it incompatible with Gradle 7.0.
Use '--warning-mode all' to show the individual deprecation warnings.
See https://docs.gradle.org/6.6/userguide/command_line_interface.html#sec:command_line_warnings

BUILD FAILED in 3s
156 actionable tasks: 4 executed, 152 up-to-date

【Comments】:

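  • Why are you editing/compiling the Kafka source code? The examples are meant to be copied out of the project (though I think they are also used for regression tests).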

Tags: java apache-kafka apache-kafka-streams


【Answer 1】:

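The error says your types are wrong at the point where you filter: after windowedBy(...).count() the result is a KTable<Windowed<String>, Long>, which cannot be assigned to the KTable<String, Long> you declared.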

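If you want to filter the table, apply filter before groupBy rather than after count: after count the "value" is a Long, not a String, so value.equals("cat") can never be true.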
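A minimal sketch of filtering on the word before groupBy, assuming the Kafka Streams 2.x API the question builds against (TimeWindows.of, which is deprecated since 3.0 in favor of TimeWindows.ofSizeWithNoGrace) and kafka-streams plus kafka-streams-test-utils on the classpath. The class name CatCountFilterFirst and the sample sentences are hypothetical; TopologyTestDriver runs the topology in memory, so no broker is needed. The table is declared as KTable<Windowed<String>, Long>, which is what the compiler error asks for, and the windowed key is unwrapped with key() before "cat N" strings are written out.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

// Hypothetical class name; assumes the Kafka Streams 2.x API (TimeWindows.of).
public final class CatCountFilterFirst {

    static final String INPUT_TOPIC = "streams-plaintext-input";
    static final String OUTPUT_TOPIC = "streams-wordcount-output";

    static void createCatCountStream(final StreamsBuilder builder) {
        // After windowedBy(...) the key is Windowed<String>, so the table must be declared that way.
        final KTable<Windowed<String>, Long> counts = builder
            .stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
            .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.ROOT).split("\\W+")))
            // Filter on the word itself (still a String) before grouping and counting.
            .filter((key, word) -> word.equals("cat"))
            .groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
            .count();

        // Unwrap the windowed key and emit values such as "cat 2".
        counts.toStream()
            .map((windowedWord, count) -> KeyValue.pair(windowedWord.key(), windowedWord.key() + " " + count))
            .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.String()));
    }

    /** Feeds timestamped sentences through the topology in memory and returns the output values. */
    static List<String> runSample() {
        final StreamsBuilder builder = new StreamsBuilder();
        createCatCountStream(builder);
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "cat-count-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092"); // never contacted by the test driver
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);     // forward every count update
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> in = driver.createInputTopic(
                INPUT_TOPIC, Serdes.String().serializer(), Serdes.String().serializer());
            final TestOutputTopic<String, String> out = driver.createOutputTopic(
                OUTPUT_TOPIC, Serdes.String().deserializer(), Serdes.String().deserializer());
            in.pipeInput(null, "the cat sat", Instant.ofEpochMilli(1_000L));     // window [0s, 5s)
            in.pipeInput(null, "a dog and a cat", Instant.ofEpochMilli(2_000L)); // window [0s, 5s)
            in.pipeInput(null, "cat again", Instant.ofEpochMilli(6_000L));       // window [5s, 10s)
            return out.readValuesToList();
        }
    }

    public static void main(final String[] args) {
        runSample().forEach(System.out::println);
    }
}
```

Because record caching is disabled, every update is forwarded, so a window produces a running count (cat 1, then cat 2) rather than a single value per window.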

If you only want to filter what is written to the output topic, use toStream().filter.
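A sketch of the toStream().filter variant under the same assumptions (Kafka Streams 2.x API, kafka-streams-test-utils available; the class name CatCountFilterOutput is hypothetical). Every word is counted, and the filter tests the word inside the Windowed<String> key instead of the Long value. The output key "cat@<window start ms>" is an illustrative choice that makes visible which 5-second window each count belongs to.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

// Hypothetical class name; assumes the Kafka Streams 2.x API (TimeWindows.of).
public final class CatCountFilterOutput {

    static final String INPUT_TOPIC = "streams-plaintext-input";
    static final String OUTPUT_TOPIC = "streams-wordcount-output";

    static void createWordCountStream(final StreamsBuilder builder) {
        // Count every word per 5-second window; the key is now Windowed<String>.
        final KTable<Windowed<String>, Long> counts = builder
            .stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
            .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.ROOT).split("\\W+")))
            .groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
            .count();

        counts.toStream()
            // Test the word inside the windowed key, not the Long count.
            .filter((windowedWord, count) -> windowedWord.key().equals("cat"))
            // Output key "cat@<window start ms>", value "cat <count>".
            .map((windowedWord, count) -> KeyValue.pair(
                windowedWord.key() + "@" + windowedWord.window().start(),
                windowedWord.key() + " " + count))
            .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.String()));
    }

    static List<KeyValue<String, String>> runSample() {
        final StreamsBuilder builder = new StreamsBuilder();
        createWordCountStream(builder);
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "cat-filter-output-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092"); // unused by the test driver
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);     // forward every count update
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> in = driver.createInputTopic(
                INPUT_TOPIC, Serdes.String().serializer(), Serdes.String().serializer());
            final TestOutputTopic<String, String> out = driver.createOutputTopic(
                OUTPUT_TOPIC, Serdes.String().deserializer(), Serdes.String().deserializer());
            in.pipeInput(null, "cat dog cat", Instant.ofEpochMilli(1_000L)); // window [0s, 5s)
            in.pipeInput(null, "dog", Instant.ofEpochMilli(2_000L));         // counted, then filtered out
            in.pipeInput(null, "cat", Instant.ofEpochMilli(7_000L));         // window [5s, 10s)
            return out.readKeyValuesToList();
        }
    }

    public static void main(final String[] args) {
        runSample().forEach(kv -> System.out.println(kv.key + " -> " + kv.value));
    }
}
```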


Alternatively, rather than filtering the stream/table for one specific word, use a StateStore to look up "cat" (and any other word) selectively in the KTable.
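A sketch of the state-store approach, assuming the Kafka Streams 2.x API and kafka-streams-test-utils: every word's per-window count is materialized under a store name (word-counts-per-window, a hypothetical name), and "cat" is then looked up by key and window-start range with ReadOnlyWindowStore.fetch. Here the store comes from TopologyTestDriver.getWindowStore; in a running application the same lookup method could be given the store returned by streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.windowStore())), available since Kafka 2.5.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

// Hypothetical class and store names; assumes the Kafka Streams 2.x API (TimeWindows.of).
public final class CatStoreLookup {

    static final String INPUT_TOPIC = "streams-plaintext-input";
    static final String STORE_NAME = "word-counts-per-window";

    static void createWordCountStore(final StreamsBuilder builder) {
        builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
            .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.ROOT).split("\\W+")))
            .groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
            // Keep every word's count in a named window store instead of filtering it away.
            .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as(STORE_NAME));
    }

    /** Returns the count of word per window start (epoch ms) for windows starting in [fromMs, toMs]. */
    static Map<Long, Long> lookup(final ReadOnlyWindowStore<String, Long> store, final String word,
                                  final long fromMs, final long toMs) {
        final Map<Long, Long> countsByWindowStart = new TreeMap<>();
        try (WindowStoreIterator<Long> it =
                 store.fetch(word, Instant.ofEpochMilli(fromMs), Instant.ofEpochMilli(toMs))) {
            while (it.hasNext()) {
                final KeyValue<Long, Long> entry = it.next(); // key = window start timestamp
                countsByWindowStart.put(entry.key, entry.value);
            }
        }
        return countsByWindowStart;
    }

    static Map<Long, Long> runSample() {
        final StreamsBuilder builder = new StreamsBuilder();
        createWordCountStore(builder);
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "cat-store-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092"); // unused by the test driver
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> in = driver.createInputTopic(
                INPUT_TOPIC, Serdes.String().serializer(), Serdes.String().serializer());
            in.pipeInput(null, "cat dog cat", Instant.ofEpochMilli(1_000L)); // window [0s, 5s)
            in.pipeInput(null, "dog cat", Instant.ofEpochMilli(7_000L));     // window [5s, 10s)
            final WindowStore<String, Long> store = driver.getWindowStore(STORE_NAME);
            return lookup(store, "cat", 0L, 10_000L);
        }
    }

    public static void main(final String[] args) {
        runSample().forEach((start, count) -> System.out.println("window@" + start + ": cat " + count));
    }
}
```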

【Comments】:

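  • I tried using filter before groupBy, but it didn't work; I want to filter the output messages, not the topic. As far as I can see, ".windowedBy(TimeWindows.of(Duration.ofSeconds(5)))" is the problem, because if I comment out that line the script runs without errors (it prints the result every time a sentence is received, instead of on a 5-second cycle). The problem seems to be that the data type produced by the windowing method is incompatible with the other methods. Can you help me with this?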
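  • If you use the console producer, you can't filter the data there, so INPUT_TOPIC will always contain the full strings. You need to run the Streams code at the same time as you produce data for the time windows to work properly; you shouldn't expect windows to work with data already in the topic, or to capture anything if you wait longer than the window after producing data for it. You can tune the windows if this isn't what you want: kafka.apache.org/28/documentation/streams/developer-guide/…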
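  • The issue isn't "compatibility" but the order in which you run things, not how the code is written (a compiler error is different from runtime behavior).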
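The comments above come down to time semantics: records are assigned to windows by their timestamps, and stream time only advances when new records arrive. If the goal is exactly one "cat N" message per 5-second window, one option is suppress(Suppressed.untilWindowCloses(...)), sketched below assuming the Kafka Streams 2.x API and kafka-streams-test-utils (the class name CatCountPerWindow and the sample timestamps are hypothetical). In the 2.x API a window without an explicit grace() keeps accepting late records for 24 hours minus the window size, so the sketch sets grace(Duration.ZERO); otherwise the final result would be held back for about a day of stream time.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

// Hypothetical class name; assumes the Kafka Streams 2.x API (TimeWindows.of(...).grace(...)).
public final class CatCountPerWindow {

    static final String INPUT_TOPIC = "streams-plaintext-input";
    static final String OUTPUT_TOPIC = "streams-wordcount-output";

    static void createCatCountStream(final StreamsBuilder builder) {
        builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
            .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.ROOT).split("\\W+")))
            .filter((key, word) -> word.equals("cat"))
            .groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String()))
            // Zero grace: a window closes as soon as stream time reaches its end.
            .windowedBy(TimeWindows.of(Duration.ofSeconds(5)).grace(Duration.ZERO))
            .count()
            // Hold back intermediate updates and emit one final count per closed window.
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream()
            .map((windowedWord, count) -> KeyValue.pair(windowedWord.key(), windowedWord.key() + " " + count))
            .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.String()));
    }

    static List<String> runSample() {
        final StreamsBuilder builder = new StreamsBuilder();
        createCatCountStream(builder);
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "cat-per-window-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092"); // unused by the test driver
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> in = driver.createInputTopic(
                INPUT_TOPIC, Serdes.String().serializer(), Serdes.String().serializer());
            final TestOutputTopic<String, String> out = driver.createOutputTopic(
                OUTPUT_TOPIC, Serdes.String().deserializer(), Serdes.String().deserializer());
            in.pipeInput(null, "the cat sat", Instant.ofEpochMilli(1_000L));   // window [0s, 5s)
            in.pipeInput(null, "cat and cat", Instant.ofEpochMilli(3_000L));   // window [0s, 5s)
            in.pipeInput(null, "another cat", Instant.ofEpochMilli(6_000L));   // stream time 6s closes [0s, 5s)
            in.pipeInput(null, "one more cat", Instant.ofEpochMilli(11_000L)); // stream time 11s closes [5s, 10s)
            return out.readValuesToList();
        }
    }

    public static void main(final String[] args) {
        runSample().forEach(System.out::println);
    }
}
```

The last window, [10s, 15s), stays buffered until a record with a timestamp of 15 s or later arrives, which is the behavior the comments describe: without new input, stream time does not move and nothing is emitted.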