【发布时间】:2024-01-22 03:24:02
【问题描述】:
我正在尝试在 KafkaConsumer 对象中实现(刚开始使用 Java 和 Flink)一个非键控状态,因为在这个阶段没有调用 keyBy()。该对象是前端,也是处理来自 Kafka 的消息的第一个模块。 SourceOutput 是代表消息的 proto 文件。
我有 KafkaConsumer 对象:
public class KafkaSourceFunction extends ProcessFunction<byte[], SourceOutput> implements Serializable
{
@Override
public void processElement(byte[] bytes, ProcessFunction<byte[], SourceOutput>.Context
context, Collector<SourceOutput> collector) throws Exception
{
// Here, I want to call to sorting method
collector.collect(output);
}
}
我有一个对象 (KafkaSourceSort) 进行所有排序,并且应该将无序消息保持在 priorityQ 的状态中,并且如果消息以正确的顺序通过收集器,还负责传递消息。
class SessionInfo
{
public PriorityQueue<SourceOutput> orderedMessages = null;
public void putMessage(SourceOutput Msg)
{
if(orderedMessages == null)
orderedMessages = new PriorityQueue<SourceOutput>(new SequenceComparator());
orderedMessages.add(Msg);
}
}
public class KafkaSourceState implements Serializable
{
public TreeMap<String, SessionInfo> Sessions = new TreeMap<>();
}
我读到我需要使用非键控状态 (ListState),它应该包含会话映射,而每个会话都包含一个 priorityQ,其中包含与该会话相关的所有消息。
我找到了一个例子,所以我实现了这个:
public class KafkaSourceSort implements SinkFunction<KafkaSourceSort>,
CheckpointedFunction
{
private transient ListState<KafkaSourceState> checkpointedState;
private KafkaSourceState state;
@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception
{
checkpointedState.clear();
checkpointedState.add(state);
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception
{
ListStateDescriptor<KafkaSourceState> descriptor =
new ListStateDescriptor<KafkaSourceState>(
"KafkaSourceState",
TypeInformation.of(new TypeHint<KafkaSourceState>() {}));
checkpointedState = context.getOperatorStateStore().getListState(descriptor);
if (context.isRestored())
{
state = (KafkaSourceState) checkpointedState.get();
}
}
@Override
public void invoke(KafkaSourceState value, SinkFunction.Context contex) throws Exception
{
state = value;
// ...
}
}
我看到我需要实现一个调用消息,它可能会从 processElement() 调用,但是 invoke() 的签名不包含收集器,我不明白该怎么做,即使我做到了到现在还可以。
请,我们将不胜感激。 谢谢。
【问题讨论】:
标签: java apache-kafka state apache-flink