【问题标题】:Dynamically materialize KTable aggregate to different state stores将 KTable 聚合动态物化到不同的状态存储
【发布时间】:2020-07-01 13:36:58
【问题描述】:

有没有办法动态选择将 KTable 物化到哪个状态存储?目前的名字是MonthlyAggregates.NAME,我想每个月在当月命名的状态存储中实现,例如2020-JULY。但由于拓扑会在系统启动时被读取和构建,因此只会使用初始化期间使用的状态存储。

    @Bean
    fun leistung() =
        Consumer<KStream<ByteArray, ByteArray>> {
            it
                .transform(TransformerSupplier { EventTypeAwareTransformer(EVENT_TYPE_MAPPING, objectMapper) })
                .map { _, v -> KeyValue(createKey(v.payload as VerrechenbareLeistungPerformedEvent), v.payload) }
                .peek { key, value -> validateTechnicalLocation(key, value) }
                .transform(TransformerSupplier { DuplicateFilteringTransformer() }, LeistungEvents.NAME)
                .groupByKey(Grouped.with(Serdes.StringSerde(), JsonSerde(VerrechenbareLeistungPerformedEvent::class.java)))
                .aggregate(
                    { ItemAggregator() },
                    { _, event, aggregator -> aggregator.addItem(event) },
                    Named.`as`("aggregate"),
                    Materialized.`as`<String, ItemAggregator, KeyValueStore<Bytes, ByteArray>>(MonthlyAggregates.NAME)
                        .withKeySerde(Serdes.String())
                        .withValueSerde(JsonSerde(ItemAggregator::class.java))
                )
        }

我也尝试通过利用处理器 API 动态添加状态存储:

class DuplicateFilteringProcessor(private val topology: Topology) : Processor<String, VerrechenbareLeistungPerformedEvent> {
    private lateinit var processorContext: ProcessorContext
    private lateinit var stateStore: KeyValueStore<String, VerrechenbareLeistungPerformedEvent>

    override fun init(context: ProcessorContext) {
        processorContext = context
        val storeBuilder = KeyValueStoreBuilder<String, VerrechenbareLeistungPerformedEvent>(
            Stores.persistentKeyValueStore(LeistungEvents.NAME),
            Serdes.StringSerde(),
            JsonSerde(VerrechenbareLeistungPerformedEvent::class.java),
            Time.SYSTEM
        )
        topology.addStateStore(storeBuilder, "filter_duplicates")
        topology.connectProcessorAndStateStores("filter_duplicates", LeistungEvents.NAME)
        val store = storeBuilder.build()
        processorContext.register(store) { _, _ -> }
        stateStore = processorContext.getStateStore(LeistungEvents.NAME) as KeyValueStore<String, VerrechenbareLeistungPerformedEvent>
    }

    override fun process(key: String, value: VerrechenbareLeistungPerformedEvent) {
        val eventInStateStore = stateStore.get(value.businessId)
        if (eventInStateStore == null) {
            stateStore.putIfAbsent(value.businessId, value)
            processorContext.forward(key, value)
        } else {
            logger().error("""Event with businessId ${value.businessId} has already been processed
                            Event in state store: $eventInStateStore
                            Event just received:  $value""".trimIndent())
        }
//        processorContext.commit()
    }

    override fun close() {
        // stateStore closing will be managed by Kafka/Spring
    }

}

但在这种情况下,状态存储不会转换为打开状态,并且任何请求都会导致 NPE。

【问题讨论】:

    标签: apache-kafka-streams spring-cloud-stream


    【解决方案1】:

    不支持。无法重命名状态存储。

    【讨论】:

    • 我不想重命名商店,我想每个月都用一个新的,但我自己也没有找到解决办法。
    • 我也用处理器 API 尝试过,虽然我可以添加存储,但是当我在拓扑的 KafkaStreams 启动后这样做时,存储仍然没有打开(即.isOpen()返回假)。那么,状态存储真的不打算用作数据库,而只是用作由 Kafka 单独管理的存储?
    • 好吧,如果这就是您所说的“用作数据库”,那么您不能动态添加/删除存储/表。 Kafka Streams 执行一个静态数据流程序。您需要预先定义所有内容。 -- 但是,使用交互式查询,您可以像在数据库中一样访问状态(虽然是只读的)。 ——不确定你的期望是什么? -- 最后,它显然不是数据库,如果您使用 KS 构建应用程序,与使用传统数据库相比,您会应用不同的模式。
    猜你喜欢
    • 2018-12-28
    • 2016-09-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-09-22
    • 1970-01-01
    • 1970-01-01
    • 2021-05-19
    相关资源
    最近更新 更多