【问题标题】:KStreamBuilder not able to stream data from 1 topic to anotherKStreamBuilder 无法将数据从 1 个主题流式传输到另一个主题
【发布时间】:2016-07-26 14:21:43
【问题描述】:

我试图使用 KStreamBuilder 将数据从一个主题移动到另一个主题。我尝试了以下代码,但有异常

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

public class StreamsInTopic {

public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9094");
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    KStreamBuilder builder = new KStreamBuilder();
    System.out.println("KStreamBuilder initialized!!");

    builder.stream("nil_PF1_P1").to("nil_RF1_P1_1");
    System.out.println("Streaming prepared!!");

    KafkaStreams streams = new KafkaStreams(builder, props);
    System.out.println("KafkaStreams Initialised!!");

    streams.start();
    System.out.println("Streams started!!");

    Thread.sleep(30000L);
    streams.close();
    System.out.println("Streams closed!!");
}
}

输出:

KStreamBuilder initialized!!
Streaming prepared!!
log4j:WARN No appenders could be found for logger (org.apache.kafka.streams.StreamsConfig).
log4j:WARN Please initialize the log4j system properly.
KafkaStreams Initialised!!
Streams started!!
Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1
at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
Streams closed!!

然后我尝试使用数据。

$  bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic nil_RF1_P1_1 --from-beginning

有什么想法吗?我需要任何额外的配置吗? 我正在使用 kafka 0.10.0.0 集群和客户端。

使用的依赖项。

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>0.10.0.0</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.0.0</version>
</dependency>

【问题讨论】:

  • java.lang.IllegalArgumentException: Invalid timestamp -1 错误的原因很可能是您一直在使用非 0.10 生产者将数据写入输入主题。

标签: apache-kafka kafka-consumer-api apache-kafka-streams


【解决方案1】:

查看您在问题中分享的内容,问题似乎是您没有将任何数据写入(= 生成)输入主题“nil_PF1_P1”:

  • Kafka Streams 应用程序配置为将数据从 Kafka 输入主题“nil_PF1_P1”写入 Kafka 输出主题“nil_RF1_P1_1”。
  • 控制台使用者从(应用程序的输出主题)“nil_RF1_P1_1”读取任何数据。
  • 但您没有提及是否或如何将数据输入主题“nil_PF1_P1”。

另外:您将立即在代码中关闭 Kafka Streams 实例:

streams.start();
System.out.println("Streams started!!");

//Thread.sleep(1000L);
streams.close();

这不会给应用程序足够的时间来实际执行任何处理。通常,您只需在上面的 main 方法中调用 streams.start(),并在您的 Java 应用程序中注册一个关闭钩子,该钩子会在被触发时调用 streams.close()

出于测试/开发目的,您当然也可以从main() 中调用streams.close(),但是我会增加启动和关闭之间的睡眠时间(例如,尝试 30 秒而不是 1 秒)--但当然,您还需要确保在该时间窗口内您实际上正在向应用程序的输入主题写入一些数据。

编辑:java.lang.IllegalArgumentException: Invalid timestamp -1 错误的原因很可能是您使用非 0.10 生产者将数据写入输入主题。详情请见http://docs.confluent.io/current/streams/faq.html#invalid-timestamp-exception

【讨论】:

  • 谢谢米古诺。我增加了睡眠时间。它给出了我重新发布的例外情况。请看一下,如果你能帮忙!!
【解决方案2】:

Kafka Stream 的第一个版本是 0.10,因此要求写入主题的所有记录都具有关联的时间戳(键和值旁边的附加字段,在 v0.10 中引入)。对于 Streams,此时间戳不能为负(即使代理不检查此并允许插入具有负时间戳的数据)。

因此,使用较旧的 Java 生产者(即 0.10 之前的生产者)编写的主题可能会写入缺少时间戳字段的记录。您也有可能使用“旧”主题,即写入 0.9 代理的主题,然后您将代理升级到 0.10——所有这些消息都不会设置时间戳。出于兼容性原因,KafkaConsumer (v0.10) 将缺少的时间戳设置为值 -1

在Kafka Streams中,在内部,来自输入消息的时间戳被“转发”到输出消息,因此,如果您使用没有时间戳的消息,Kafka Streams会尝试将带有时间戳-1的消息写入输出主题,从而导致上述错误. (Kafka Streams 使用 0.10 Java 生产者检查时间戳是否有效,并为负时间戳值抛出上述异常)。

为避免此问题,您需要通过流配置参数timestamp.extractor 更改使用的时间戳提取器(请参阅http://docs.confluent.io/3.0.0/streams/developer-guide.html#optional-configuration-parameters)。根据您的语义,您可以使用 WallclockTimestampExtractor 或提供自定义提取器。

【讨论】:

    猜你喜欢
    • 2017-07-12
    • 2018-10-13
    • 2017-12-13
    • 1970-01-01
    • 2018-11-18
    • 1970-01-01
    • 2020-05-09
    • 2019-07-25
    • 1970-01-01
    相关资源
    最近更新 更多