This is straightforward in Kafka Streams, so I would go with option 2.
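First, you have to model the input data properly. Your example uses local time, which makes it impossible to reliably compute the duration between two timestamps: a local time without a zone or offset is ambiguous, for example around daylight-saving changes. Use something like epoch time instead.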
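To illustrate the problem with local time, here is a small sketch (the class and method names are made up for illustration). It compares two wall-clock readings taken across the 2021 daylight-saving change in Europe/Berlin, when clocks jumped from 02:00 to 03:00 on March 28. Subtracting the `LocalDateTime` values yields two hours. Resolving both readings to `Instant`s in that zone yields the one hour that actually elapsed. If the producer sends epoch-based timestamps (for example via `Instant.ofEpochMilli`), no zone lookup is needed at all.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

public class LocalVsInstant {

    // Naive difference between two wall-clock readings that carry no zone information
    static Duration localGap(LocalDateTime from, LocalDateTime to) {
        return Duration.between(from, to);
    }

    // Actual elapsed time: resolve both readings in a zone first, then compare the instants
    static Duration elapsed(LocalDateTime from, LocalDateTime to, ZoneId zone) {
        Instant start = from.atZone(zone).toInstant();
        Instant end = to.atZone(zone).toInstant();
        return Duration.between(start, end);
    }

    public static void main(String[] args) {
        // In Europe/Berlin, clocks jumped from 02:00 to 03:00 on 2021-03-28
        LocalDateTime before = LocalDateTime.of(2021, 3, 28, 1, 30);
        LocalDateTime after = LocalDateTime.of(2021, 3, 28, 3, 30);
        System.out.println(localGap(before, after));                            // PT2H
        System.out.println(elapsed(before, after, ZoneId.of("Europe/Berlin"))); // PT1H
    }
}
```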
Start with a source data model like this:
import java.time.Instant;

interface SensorState {
    String getId();
    Instant getTime();
    State getState();

    enum State {
        OFF,
        ON
    }
}
and a target model:
import java.time.Duration;

interface SensorStateWithDuration {
    SensorState getEvent();
    Duration getDuration();
}
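The transformer further down creates its output with `SensorStateWithDuration.builder().setEvent(...).setDuration(...).build()`, but the target interface does not declare a builder, and the answer does not show where one comes from. A code generator such as AutoValue or Immutables is a common choice, but that is an assumption. The sketch below is a hypothetical hand-written equivalent, not the code from the linked repository. It repeats a minimal copy of `SensorState` so it compiles on its own and adds a made-up `SimpleSensorState` record for constructing events.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Objects;

public class SensorStateWithDurationSketch {

    // Minimal copy of the source model, so this sketch compiles on its own
    interface SensorState {
        String getId();
        Instant getTime();
        State getState();

        enum State { OFF, ON }
    }

    // Hypothetical simple implementation of SensorState
    record SimpleSensorState(String id, Instant time, SensorState.State state) implements SensorState {
        public String getId() { return id; }
        public Instant getTime() { return time; }
        public SensorState.State getState() { return state; }
    }

    interface SensorStateWithDuration {
        SensorState getEvent();
        Duration getDuration();

        // Hypothetical builder matching the calls used in addDuration()
        static Builder builder() {
            return new Builder();
        }

        final class Builder {
            private SensorState event;
            private Duration duration;

            public Builder setEvent(SensorState event) {
                this.event = event;
                return this;
            }

            public Builder setDuration(Duration duration) {
                this.duration = duration;
                return this;
            }

            public SensorStateWithDuration build() {
                // Copy into final locals so the built value cannot change afterwards
                final SensorState e = Objects.requireNonNull(event, "event");
                final Duration d = Objects.requireNonNull(duration, "duration");
                return new SensorStateWithDuration() {
                    @Override public SensorState getEvent() { return e; }
                    @Override public Duration getDuration() { return d; }
                };
            }
        }
    }
}
```

With a generator, the builder would be produced from annotations instead of written by hand. The calling code stays the same either way.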
Now that you have defined the input and output data (but see "Data Types and Serialization"), you only need to transform the values by defining a ValueTransformer (see "Applying processors and transformers").
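It has to do two things:
1. Check the state store for the sensor's historical data and, if necessary, update it with the new data.
2. When historical data is available, compute the difference between the timestamps and emit the data together with the computed duration.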
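Without the Kafka plumbing, the two steps above reduce to a lookup, a conditional update and a subtraction. The following is an in-memory approximation, not the Kafka implementation. A `HashMap` stands in for the "SensorStates" store. The records `Reading` and `ReadingWithDuration` are hypothetical stand-ins for `SensorState` and `SensorStateWithDuration`.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class DurationLogicSketch {

    enum State { OFF, ON }

    // Hypothetical plain-Java stand-ins for the SensorState / SensorStateWithDuration interfaces
    record Reading(String id, Instant time, State state) {}
    record ReadingWithDuration(Reading event, Duration duration) {}

    // A HashMap plays the role of the "SensorStates" key-value store
    private final Map<String, Reading> store = new HashMap<>();

    Optional<ReadingWithDuration> process(Reading reading) {
        // Step 1: look up the last state change recorded for this sensor...
        Reading old = store.get(reading.id());
        // ...and store the new reading only if there is none yet or the state changed
        if (old == null || old.state() != reading.state()) {
            store.put(reading.id(), reading);
        }
        // Step 2: with history available, report how long the old state has lasted so far
        if (old == null) {
            return Optional.empty();
        }
        return Optional.of(new ReadingWithDuration(old, Duration.between(old.time(), reading.time())));
    }

    public static void main(String[] args) {
        var sketch = new DurationLogicSketch();
        Instant t0 = Instant.parse("2021-01-01T00:00:00Z");
        System.out.println(sketch.process(new Reading("s1", t0, State.ON)));
        System.out.println(sketch.process(new Reading("s1", t0.plusSeconds(60), State.ON)));
        System.out.println(sketch.process(new Reading("s1", t0.plusSeconds(300), State.OFF)));
    }
}
```

For sensor s1, the first call returns an empty result. The second reports the ON event from t0 with a duration of one minute. The third reports the same ON event with five minutes, and only then does the stored state switch to OFF. Repeated readings with an unchanged state therefore do not reset the start time.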
import java.time.Duration;
import java.util.Optional;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

class DurationProcessor implements ValueTransformer<SensorState, SensorStateWithDuration> {
    // Name of the state store; must match the name the store is registered under in the topology
    static final String SENSOR_STATES = "SensorStates";

    KeyValueStore<String, SensorState> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.store = (KeyValueStore<String, SensorState>) context.getStateStore(SENSOR_STATES);
    }

    @Override
    public SensorStateWithDuration transform(SensorState sensorState) {
        // Nothing to do
        if (sensorState == null) {
            return null;
        }
        // Check for the previous state, update if necessary
        var oldState = checkAndUpdateSensorState(sensorState);
        // When we have historical data, return the duration so far. Otherwise return null
        return oldState.map(state -> addDuration(state, sensorState)).orElse(null);
    }

    @Override
    public void close() {}

    /**
     * Checks the state store for historical state based on sensor ID and updates it, if necessary.
     *
     * @param sensorState The new sensor state
     * @return The old sensor state, if any
     */
    Optional<SensorState> checkAndUpdateSensorState(SensorState sensorState) {
        // The sensor ID is our index
        var index = sensorState.getId();
        // Get the historical state (might be null)
        var oldState = store.get(index);
        if (needToUpdate(oldState, sensorState)) {
            // Update the state store to the new state
            store.put(index, sensorState);
        }
        return Optional.ofNullable(oldState);
    }

    /**
     * Check if we need to update the state in the state store.
     *
     * <p>Either we have no historical data, or the state has changed.
     *
     * @param oldState The old sensor state
     * @param sensorState The new sensor state
     * @return Flag whether we need to update
     */
    boolean needToUpdate(SensorState oldState, SensorState sensorState) {
        return oldState == null || oldState.getState() != sensorState.getState();
    }

    /**
     * Wrap the old state with the duration it has lasted so far.
     *
     * @param oldState The state of the sensor so far
     * @param sensorState The new state of the sensor
     * @return Wrapped old state with duration
     */
    SensorStateWithDuration addDuration(SensorState oldState, SensorState sensorState) {
        var duration = Duration.between(oldState.getTime(), sensorState.getTime());
        // Assumes SensorStateWithDuration provides a builder (the interface above does not declare one)
        return SensorStateWithDuration.builder().setEvent(oldState).setDuration(duration).build();
    }
}
Put everything together in a simple Topology (see "Connecting Processors and State Stores"):
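import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.Stores;

var builder = new StreamsBuilder();
// Our state store; storeSerde, inputSerde and resultSerde are Serdes for your own types (see "Data Types and Serialization")
var storeBuilder =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("SensorStates"),
        Serdes.String(),
        storeSerde);
// Register the store builder
builder.addStateStore(storeBuilder);
// The input topic should be keyed by sensor ID, so all readings of one sensor reach the same partition and store instance
builder.stream("input-topic", Consumed.with(Serdes.String(), inputSerde))
    .transformValues(DurationProcessor::new, "SensorStates")
    // transformValues forwards a null result as a record with a null value, so drop those
    .filter((sensorId, withDuration) -> withDuration != null)
    .to("result-topic", Produced.with(Serdes.String(), resultSerde));
var topology = builder.build();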
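To actually run the topology, you need at least an application ID and the broker address. The sketch below only builds the `Properties`, using the plain key strings "application.id" and "bootstrap.servers". With kafka-streams on the classpath you would normally use the `StreamsConfig.APPLICATION_ID_CONFIG` and `StreamsConfig.BOOTSTRAP_SERVERS_CONFIG` constants instead. The ID "sensor-durations" and the address "localhost:9092" are placeholders.

```java
import java.util.Properties;

public class StreamsConfigSketch {

    // Minimal settings for starting the topology; both values are placeholders
    static Properties streamsProperties(String applicationId, String bootstrapServers) {
        var props = new Properties();
        // Also used as the consumer group ID and as the prefix of the "SensorStates" changelog topic
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", bootstrapServers);
        return props;
    }

    public static void main(String[] args) {
        var props = streamsProperties("sensor-durations", "localhost:9092");
        // With kafka-streams on the classpath, you would then do something like:
        //   var streams = new KafkaStreams(topology, props);
        //   Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        //   streams.start();
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```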
The complete application is available at github.com/melsicon/kafka-sensors.