【Title】: Migrate Kafka Streams code to Spring Cloud Stream?
【Posted】: 2023-04-05 14:57:01
【Question】:

Does Spring Cloud Stream support the following Kafka Streams application? Below is an excerpt from a Kafka sample application. Any feedback or support is appreciated.

        ...
        StreamsBuilder streamsBuilder = new StreamsBuilder();

        KStream<String, Purchase> purchaseKStream = streamsBuilder.stream.....
        KStream<String, PurchasePattern> patternKStream = purchaseKStream.mapValues...
        patternKStream.print(Printed.<String, PurchasePattern>toSysOut().withLabel("patterns"));
        patternKStream.to("patterns", Produced.with(stringSerde, purchasePatternSerde));
        purchaseKStream.print(Printed.<String, Purchase>toSysOut().withLabel("purchases"));
        purchaseKStream.to("purchases", Produced.with(stringSerde, purchaseSerde));

        // adding State to processor
        String rewardsStateStoreName = "rewardsPointsStore";
        RewardsStreamPartitioner streamPartitioner = new RewardsStreamPartitioner();
        KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore(rewardsStateStoreName);
        StoreBuilder<KeyValueStore<String, Integer>> storeBuilder = Stores.keyValueStoreBuilder(storeSupplier.....
        streamsBuilder.addStateStore(storeBuilder);
        KStream<String, Purchase> transByCustomerStream = purchaseKStream.through("customer_transactions",....
        KStream<String, RewardAccumulator> statefulRewardAccumulator = transByCustomerStream
                .transformValues(() -> new PurchaseRewardTransformer(rewardsStateStoreName), rewardsStateStoreName);
        statefulRewardAccumulator.print(Printed.<String, RewardAccumulator>toSysOut().withLabel("stateful-rewards"));
        statefulRewardAccumulator.to("rewards", Produced.with(stringSerde, rewardAccumulatorSerde));

        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), getProperties());

【Comments】:

    Tags: apache-kafka apache-kafka-streams spring-kafka spring-cloud-stream


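    【Answer 1】:

    Assuming you are using the Spring Cloud Stream Horsham (3.0.0) release, the following pseudocode should work. I have not tested this code, but it should work once the destinations and related settings are configured correctly. Please see the docs.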

    @SpringBootApplication
    public class SampleApp {
    
      public static void main(String[] args) {
       SpringApplication.run(SampleApp.class, args);
      }
    
      @Bean
      public Function<KStream<String, Purchase>, KStream<String, RewardAccumulator>> process() {
        return purchaseKStream -> {
          purchaseKStream.print(Printed.<String, Purchase>toSysOut().withLabel("purchases"));
          KStream<String, PurchasePattern> patternKStream = purchaseKStream.mapValues...
          patternKStream.print(Printed.<String, PurchasePattern>toSysOut().withLabel("patterns"));
          patternKStream.to("patterns", Produced.with(stringSerde, purchasePatternSerde));
          purchaseKStream.to("purchases", Produced.with(stringSerde, purchaseSerde));

          KStream<String, Purchase> transByCustomerStream = purchaseKStream.through("customer_transactions",....
          KStream<String, RewardAccumulator> statefulRewardAccumulator = transByCustomerStream.transformValues(()
          statefulRewardAccumulator.print(Printed.<String, RewardAccumulator>toSysOut().withLabel("stateful-rewards"));

          // The returned stream is what the binder sends to the function's output destination.
          return statefulRewardAccumulator;
        };
      }
    
       @Bean
       public StoreBuilder storeBuilder() {
         String rewardsStateStoreName = "rewardsPointsStore";
         RewardsStreamPartitioner streamPartitioner = new RewardsStreamPartitioner();
         KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore(rewardsStateStoreName);
         StoreBuilder<KeyValueStore<String, Integer>> storeBuilder = Stores.keyValueStoreBuilder(storeSupplier.....
         return storeBuilder;
       }
    
    }
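
The answer leaves the "proper configuration of the destinations" unstated. As a rough sketch only: with the functional model, a function bean named `process` gets the bindings `process-in-0` and `process-out-0`, so the configuration could look something like the fragment below. The input topic name `transactions` and the application id `rewards-app` are placeholders assumed for illustration (the question's excerpt elides the real input topic); `rewards` is the output topic used in the original code.

```properties
# Which function bean the binder should bind (only needed if several exist)
spring.cloud.function.definition=process

# Input topic: placeholder name, not taken from the original question
spring.cloud.stream.bindings.process-in-0.destination=transactions

# Output topic: matches statefulRewardAccumulator.to("rewards", ...) in the question
spring.cloud.stream.bindings.process-out-0.destination=rewards

# Kafka Streams application id: placeholder value
spring.cloud.stream.kafka.streams.binder.applicationId=rewards-app
```

The `patterns`, `purchases`, and `customer_transactions` topics are written directly with `to(...)` and `through(...)` inside the function, so they do not need bindings of their own in this sketch.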
    

    【Comments】:

    • Thanks @sobychacko, and thank you for your active support in the community.
    • Hi @sobychacko, in the code above the storeBuilder() method is not referenced in the function. Is that method there only for information?
    • How is that state store used in your original code?
    • The state store is used in the PurchaseRewardTransformer class; here is the method:

          public RewardAccumulator transform(Purchase value) {
              RewardAccumulator rewardAccumulator = RewardAccumulator.builder(value).build();
              Integer accumulatedSoFar = stateStore.get(rewardAccumulator.getCustomerId());
              if (accumulatedSoFar != null) {
                  rewardAccumulator.addRewardPoints(accumulatedSoFar);
              }
              stateStore.put(rewardAccumulator.getCustomerId(), rewardAccumulator.getTotalRewardPoints());
              return rewardAccumulator;
          }

    • I have edited the original question to make the use of the state store clearer.
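
The transform method quoted in the comments follows a read-add-write pattern: look up the customer's running total in the state store, add it to the points from the current purchase, and write the new total back. The sketch below reproduces only that pattern in plain Java so it can be run without Kafka. A `HashMap` stands in for the Kafka Streams `KeyValueStore`, and the class name `RewardsSketch`, the method `accumulate`, and the sample customer ids and point values are all hypothetical, not part of the original code.

```java
import java.util.HashMap;
import java.util.Map;

public class RewardsSketch {
    // Stand-in for the "rewardsPointsStore" state store (assumption: a plain in-memory map).
    private final Map<String, Integer> stateStore = new HashMap<>();

    // Mirrors the read-add-write steps of the quoted transform() and returns the new total.
    public int accumulate(String customerId, int pointsForThisPurchase) {
        int total = pointsForThisPurchase;
        Integer accumulatedSoFar = stateStore.get(customerId);
        if (accumulatedSoFar != null) {
            total += accumulatedSoFar; // add points earned by earlier purchases
        }
        stateStore.put(customerId, total); // remember the running total for the next purchase
        return total;
    }

    public static void main(String[] args) {
        RewardsSketch sketch = new RewardsSketch();
        System.out.println(sketch.accumulate("customer-1", 10)); // 10
        System.out.println(sketch.accumulate("customer-1", 5));  // 15
        System.out.println(sketch.accumulate("customer-2", 7));  // 7
    }
}
```

Because the total is keyed by customer id, this only works in the real topology if all purchases for one customer reach the same stream task, which is presumably why the original code routes the stream through the `customer_transactions` topic before calling `transformValues`.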