【问题标题】:Hot to create difference Stream using Kafka Streams in Java?在 Java 中使用 Kafka 流创建差异流很热门?
【发布时间】:2019-07-03 17:40:05
【问题描述】:

我正在尝试从 Kafka Java 中的 KStream 创建一个“差异”流。

我有一个输入流,其中的值是一组 Doubles V0 ... Vn。输出流应计算 V0 - 0、V1 - V0、V2 - V1 ... Vn -Vn-1 之间的差异。

我的第一个想法是做这样的事情:

    KStream<String, Double> stream = builder.stream(TOPIC)

    KTable<String, Double> difference = stream.groupByKey().reduce(
            (oldValue, newValue) -> {
              return newValue - oldValue
            }
    ).toStream()

假设我有一个具有以下值的 KStream 输入:

Key  -> Value
"A1" -> 2 
"B2" -> 4
"A1" -> 6
"A1" -> 10
"B2" -> 13 
"A1" -> 7

我想使用以下值创建一个新的 Stream 输出:

Key  -> Value
"A1" ->  2  (2-0  =  2) 
"B2" ->  4  (4-0  =  4)
"A1" ->  4  (6-2  =  4)
"A1" ->  4  (10-6 =  4)
"B2" ->  9  (13-4 =  9)
"A1" -> -3  (7-10 = -3)

【问题讨论】:

    标签: java apache-kafka stream


    【解决方案1】:

    你可以使用类似的东西

            stream.groupByKey().aggregate(Diff::new, new Aggregator<String, Double, Diff>() {
    
            @Override
            public Diff apply(String key, Double newValue, Diff aggregate) {
                Double difference = newValue - aggregate.getLastValue();
                aggregate.setDifference(difference);
                aggregate.setLastValue(newValue);
                return aggregate;
            }
            }).mapValues(new ValueMapper<Diff, Double>() {
    
            @Override
            public Double apply(Diff value) {
                return value.getDifference();
            }
    
        }).toStream().to("diff");
    

    在哪里

    public class Diff {
    
      private Double lastValue = 0d;
    
      private Double difference = 0d;
      //getters and setters
      // ...
    }
    

    【讨论】:

      猜你喜欢
      • 2016-03-27
      • 2018-08-17
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-12-24
      • 1970-01-01
      • 2018-12-21
      相关资源
      最近更新 更多