【问题标题】:KTable returns no data in Spring Boot application, however it can be queriedKTable 在 Spring Boot 应用程序中不返回任何数据,但可以查询
【发布时间】:2018-09-06 14:16:13
【问题描述】:

我有一个使用 Kafka Streams 的 Spring Boot 应用程序。我有一个带有一些金融货币报价的 KTable,它是这样创建的:

@Bean(name = "indicativeQuotes")
public KTable<String, Quote> quoteKTable(StreamsBuilder streamsBuilder) {
    return streamsBuilder.table(quoteTopicName,
            Materialized.<String,Quote,KeyValueStore<Bytes,byte[]>>as("quoteTable")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(new JsonSerde<>(Quote.class)));
}

我在另一个组件中 @Autowire 这个 bean,并使用以下代码对其进行测试:

@Autowired
private KTable<String, Quote> indicativeQuotes;

@PostConstruct
private void postConstruct() {
    doPrint();
}

public void doPrint() {
        ReadOnlyKeyValueStore<String, Quote> store = streamsBuilderFactoryBean.getKafkaStreams().store("quoteTable", QueryableStoreTypes.keyValueStore());
        store.all().forEachRemaining(keyValue -> log.info("Key: " + keyValue.key + " Value: " + keyValue.value));
        indicativeQuotes.foreach((k,v) -> log.info(k));}

代码在通过 store 查询时记录了正确的值,但它在 foreach() 中没有输出任何内容,就好像表是空的一样。我也尝试过 print() 和其他选项 - 没有任何异常都没有输出。

我开始认为我不能像那样注入 KTable bean,但是关于 kafka 流主题的 Spring 文档非常稀缺,我找不到好的例子。任何帮助将不胜感激。

更新。

我的用例是我有一个预定的 Quartz 作业,它应该在触发时将 KTable 的当前状态写入 Kafka 主题,如下所示:

@Override
protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
    TriggerKey triggerKey = jobExecutionContext.getTrigger().getKey();
    log.info("Job was triggered by: {}", triggerKey.getName());

    indicativeQuotes.filter((key, value) -> key.equals(triggerKey.getName()))
            .mapValues(quoteToCourseFixedMapper)
            .toStream()
            .peek((instrument, course)-> log.info("Sending courses for instrument: {}, {}", instrument, course))
            .to(quoteEventTopicName);
}

但我认为这段代码不起作用,因为它不是处理拓扑的一部分,我不能按需从 Ktable 中获取数据。我在这里有点疑惑,当然我可以在事件触发时通过 store 查询数据,但是对于这种用例,也许有更好的模式?基本上我很感兴趣是否可以将此触发的作业事件作为处理管道的一部分。

【问题讨论】:

  • 我猜这与缓存有关。参照。 docs.confluent.io/current/streams/developer-guide/…
  • 嗨,马蒂亚斯,感谢您的回答。看完这个问题,由你回答:stackoverflow.com/questions/50440550/… 我觉得我对流处理的概念还没有完全掌握。你能看看更新吗?
  • 不确定我是否可以关注。您是否更改了配置以将缓存大小设置为零但仍有问题?此外,默认情况下,您应该会看到大约 30 秒的输出,默认提交间隔是多少 - 提交时缓存会被刷新。

标签: spring spring-boot apache-kafka-streams spring-kafka


【解决方案1】:

如果您只想将更新发布到另一个主题,请将 KTable 转换为 KStream 并使用 to() 函数。

KTable ktable = ...;
KStream ksteram = ktable.toStream();
kstream.to("topic", Produces.with(keySerde, valueSerde))

主题将包含该表的更改日志。

但是

显然因为一些生命周期相关的概念,你不能只注入(@autowire)KStream/KTable。您应该尽可能地保持与 KafkaStreams 相关的代码类型。

因此,在您希望在某个“随机”时间对表的当前状态执行某些操作的特定情况下,您必须查询存储(表)。所以搜索 kafka steams 交互式查询。请记住,您需要从应用程序的所有实例中获取数据(如果您有多个实例。或者您可以使用全局存储。它需要一两天的搜索时间。

【讨论】:

    猜你喜欢
    • 2016-12-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-10-02
    • 2019-11-17
    • 2011-09-22
    • 1970-01-01
    • 2021-07-16
    相关资源
    最近更新 更多