【发布时间】:2018-09-06 14:16:13
【问题描述】:
我有一个使用 Kafka Streams 的 Spring Boot 应用程序。我有一个带有一些金融货币报价的 KTable,它是这样创建的:
@Bean(name = "indicativeQuotes")
public KTable<String, Quote> quoteKTable(StreamsBuilder streamsBuilder) {
return streamsBuilder.table(quoteTopicName,
Materialized.<String,Quote,KeyValueStore<Bytes,byte[]>>as("quoteTable")
.withKeySerde(Serdes.String())
.withValueSerde(new JsonSerde<>(Quote.class)));
}
我在另一个组件中 @Autowire 这个 bean,并使用以下代码对其进行测试:
@Autowired
private KTable<String, Quote> indicativeQuotes;
@PostConstruct
private void postConstruct() {
doPrint();
}
public void doPrint() {
ReadOnlyKeyValueStore<String, Quote> store = streamsBuilderFactoryBean.getKafkaStreams().store("quoteTable", QueryableStoreTypes.keyValueStore());
store.all().forEachRemaining(keyValue -> log.info("Key: " + keyValue.key + " Value: " + keyValue.value));
indicativeQuotes.foreach((k,v) -> log.info(k));}
代码在通过 store 查询时记录了正确的值,但它在 foreach() 中没有输出任何内容,就好像表是空的一样。我也尝试过 print() 和其他选项 - 没有任何异常都没有输出。
我开始认为我不能像那样注入 KTable bean,但是关于 kafka 流主题的 Spring 文档非常稀缺,我找不到好的例子。任何帮助将不胜感激。
更新。
我的用例是我有一个预定的 Quartz 作业,它应该在触发时将 KTable 的当前状态写入 Kafka 主题,如下所示:
@Override
protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
TriggerKey triggerKey = jobExecutionContext.getTrigger().getKey();
log.info("Job was triggered by: {}", triggerKey.getName());
indicativeQuotes.filter((key, value) -> key.equals(triggerKey.getName()))
.mapValues(quoteToCourseFixedMapper)
.toStream()
.peek((instrument, course)-> log.info("Sending courses for instrument: {}, {}", instrument, course))
.to(quoteEventTopicName);
}
但我认为这段代码不起作用,因为它不是处理拓扑的一部分,我不能按需从 Ktable 中获取数据。我在这里有点疑惑,当然我可以在事件触发时通过 store 查询数据,但是对于这种用例,也许有更好的模式?基本上我很感兴趣是否可以将此触发的作业事件作为处理管道的一部分。
【问题讨论】:
-
我猜这与缓存有关。参照。 docs.confluent.io/current/streams/developer-guide/…
-
嗨,马蒂亚斯,感谢您的回答。看完这个问题,由你回答:stackoverflow.com/questions/50440550/… 我觉得我对流处理的概念还没有完全掌握。你能看看更新吗?
-
不确定我是否可以关注。您是否更改了配置以将缓存大小设置为零但仍有问题?此外,默认情况下,您应该会看到大约 30 秒的输出,默认提交间隔是多少 - 提交时缓存会被刷新。
标签: spring spring-boot apache-kafka-streams spring-kafka