【问题标题】:Kafka time difference last two records, KSQL or other?Kafka时差最后两条记录,KSQL还是其他?
【发布时间】:2019-07-09 20:36:24
【问题描述】:

所以我正在评估 Kafka。在我们的用例中,必须创建包含从一个事件到下一个事件“经过的时间”的新主题,本质上是因为传感器将向 Kafka 报告为“开启”或“关闭”。因此,有了时间戳、传感器名称和状态,就可以创建具有“开启”和“关闭”状态持续时间的新主题。

  1. 这在 KSQL 中是否可行,如何实现?
  2. 还是真的应该让消费者或流处理器来解决这个问题?

我的数据是这样的:

{ 2019:02:15 00:00:00, sensor1, off}
{ 2019:02:15 00:00:30, sensor1, on} 

得到结果

{ 2019:02:15 00:30:00, sensor1, off, 30sec }. 

本质上必须结合多个传感器的状态来确定机器的组合状态。工厂中有数百个甚至数千个传感器

【问题讨论】:

  • 您能否举例说明您设想的数据流和预期输出?从您描述的内容中很难确定 KSQL 当前是否可以执行此操作。它基于 Kafka Streams 构建,因此如果您使用流处理器,您可以在 KSQL 中完成。
  • 类似这样的:{ 2019:02:15 00:00:00, sensor1, off} 然后下一条记录 { 2019:02:15 00:00:30, sensor1, on} 以获得结果{ 2019:02:15 00:30:00,传感器 1,关闭,30 秒}。本质上必须结合多个传感器的状态来确定机器的组合状态。工厂中有数百个甚至数千个传感器
  • 听起来你想要一个关于传感器名称+状态组合的会话窗口?根据 off/on 打开和关闭会话?
  • 首先感谢您的编辑。是的,基本上。会话窗口是基于时间的还是基于记录的?传入数据流包含多个传感器,但传感器状态的变化可能是几秒钟或几小时的频率。
  • 问题可以改写为:如何使用 kafka/ksql 实现 LAG 功能:SELECT sensor, difference = value - lag(value) FROM bla GROUP BY sensor?

标签: apache-kafka apache-kafka-streams ksqldb


【解决方案1】:

这在 Kafka Streams 中非常简单,所以我会选择 2。

首先,您必须正确地对输入数据进行建模。您的示例使用本地时间,这使得无法计算两个时间戳之间的持续时间。 Use something 喜欢epoch time

从像这样的源数据模型开始

interface SensorState {
  String getId();
  Instant getTime();
  State getState();
  enum State {
    OFF,
    ON
  }
}

和目标

interface SensorStateWithDurationX {
  SensorState getEvent();
  Duration getDuration();
}

现在您已经定义了输入和输出流(但请参阅“Data Types and Serialization”),您只需要通过简单地定义 ValueTransformer 来转换值(“Applying processors and transformers”)。

它必须做两件事:

  1. 检查状态存储以获取传感器的历史数据,并在必要时使用新数据进行更新

  2. 当历史数据可用时,计算时间戳之间的差异,并将数据与计算的持续时间一起发出

class DurationProcessor implements ValueTransformer<SensorState, SensorStateWithDuration> {
  KeyValueStore<String, SensorState> store;

  @SuppressWarnings("unchecked")
  public void init(ProcessorContext context) {
    this.store = (KeyValueStore<String, SensorState>) context.getStateStore("SensorStates");
  }

  public SensorStateWithDuration transform(SensorState sensorState) {
    // Nothing to do
    if (sensorState == null) {
      return null;
    }

    // Check for the previous state, update if necessary
    var oldState = checkAndUpdateSensorState(sensorState);

    // When we have historical data, return duration so far. Otherwise return null
    return oldState.map(state -> addDuration(state, sensorState)).orElse(null);
  }

  public void close() {}

  /**
   * Checks the state store for historical state based on sensor ID and updates it, if necessary.
   *
   * @param sensorState The new sensor state
   * @return The old sensor state
   */
  Optional<SensorState> checkAndUpdateSensorState(SensorState sensorState) {
    // The Sensor ID is our index
    var index = sensorState.getId();

    // Get the historical state (might be null)
    var oldState = store.get(index);
    if (neetToUpdate(oldState, sensorState)) {
      // Update the state store to the new state
      store.put(index, sensorState);
    }
    return Optional.ofNullable(oldState);
  }

  /**
   * Check if we need to update the state in the state store.
   *
   * <p>Either we have no historical data, or the state has changed.
   *
   * @param oldState The old sensor state
   * @param sensorState The new sensor state
   * @return Flag whether we need to update
   */
  boolean neetToUpdate(SensorState oldState, SensorState sensorState) {
    return oldState == null || oldState.getState() != sensorState.getState();
  }

  /**
   * Wrap the old state with a duration how log it lasted.
   *
   * @param oldState The state of the sensor so far
   * @param sensorState The new state of the sensor
   * @return Wrapped old state with duration
   */
  SensorStateWithDuration addDuration(SensorState oldState, SensorState sensorState) {
    var duration = Duration.between(oldState.getTime(), sensorState.getTime());
    return SensorStateWithDuration.builder().setEvent(oldState).setDuration(duration).build();
  }
}

将所有内容(“Connecting Processors and State Stores”)放在一个简单的Topology 中:

var builder = new StreamsBuilder();

// Our state store
var storeBuilder =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("SensorStates"),
        Serdes.String(),
        storeSerde);

// Register the store builder
builder.addStateStore(storeBuilder);

builder.stream("input-topic", Consumed.with(Serdes.String(), inputSerde))
    .transformValues(DurationProcessor::new, DurationProcessor.SENSOR_STATES)
    .to("result-topic", Produced.with(Serdes.String(), resultSerde));

var topology = builder.build();

完整的申请地址为github.com/melsicon/kafka-sensors

【讨论】:

    【解决方案2】:

    根据https://github.com/confluentinc/ksql/issues/2562 的想法使用自联接,我想出了以下解决方案:

    1. 创建数据
    #kafka-topics --bootstrap-server localhost:9092  --delete --topic temptest
    echo '{"temp": 3.0, "counter": 1}' | kafkacat -b localhost -t temptest
    echo '{"temp": 4.0, "counter": 2}' | kafkacat -b localhost -t temptest
    echo '{"temp": 6.0, "counter": 3}' | kafkacat -b localhost -t temptest
    echo '{"temp": 3.0, "counter": 4}' | kafkacat -b localhost -t temptest
    echo '{"temp": 3.1, "counter": 6}' | kafkacat -b localhost -t temptest
    echo '{"temp": 3.1, "counter": 5}' | kafkacat -b localhost -t temptest
    

    这里我们假设连续事件已经有一个 counter 属性。这种计数器也可以通过简单地聚合一段时间内的事件计数来添加到 ksql 中。

    1. 区分函数
    -- import the topic into ksql
    CREATE STREAM temp_json (ingesttime BIGINT, row VARCHAR, temp DOUBLE, counter INTEGER) WITH (kafka_topic='temptest', value_format='JSON', KEY='counter');
    
    --- change format to avro and repartion
    CREATE STREAM temp WITH (VALUE_FORMAT='AVRO') AS SELECT temp, counter, CAST(counter AS VARCHAR) as counter_key FROM temp_json PARTITION BY counter_key;
    
    --- create second stream with shifted counter
    CREATE STREAM temp_shift AS SELECT temp, counter as counter_orig, counter+ 1 as counter from temp PARTITION BY counter;
    
    -- join the streams by counter
    CREATE STREAM temp_diff AS SELECT
      prev.temp-cur.temp as temp_difference, cur.temp as temp,  prev.temp as prev_temp, cur.counter as counter
      FROM temp cur
      LEFT JOIN temp_shift prev WITHIN 2 HOURS
      ON cur.counter = prev.counter;
    

    测试一下

    ksql> SELECT * FROM temp_diff LIMIT 4;
    1574321370281 | 1 | null | 3.0 | null | 1
    1574321372307 | 2 | -1.0 | 4.0 | 3.0 | 2
    1574321372319 | 3 | -2.0 | 6.0 | 4.0 | 3
    1574321372331 | 4 | 3.0 | 3.0 | 6.0 | 4
    

    传感器本身被省略以保持解决方案的简单性。但是,可以通过使用复合键轻松添加分区,如https://www.confluent.io/stream-processing-cookbook/ksql-recipes/creating-composite-key中所述

    【讨论】:

    • 您没有使用问题中的输入格式,而是您编造的东西 - “计数器”(它不存在,您必须作为解决方案的一部分提供)并且您假设存在只是一个传感器,我看不出它是如何“轻松”采用的。此外,使用双精度计算时间会导致各种舍入错误。
    • 感谢您的反馈。确实格式不同。计数器和传感器分组的缺乏都在周围的描述中得到解决。 double 不用于时间,而是用于传感器读数(温度)计算,所以我希望舍入误差可以忽略不计。
    猜你喜欢
    • 1970-01-01
    • 2020-08-09
    • 2020-07-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多