【问题标题】:How to use non-keyed state with Kafka Consumer in Flink?如何在 Flink 中对 Kafka Consumer 使用非键控状态?
【发布时间】:2024-01-22 03:24:02
【问题描述】:

我正在尝试在 KafkaConsumer 对象中实现(刚开始使用 Java 和 Flink)一个非键控状态,因为在这个阶段没有调用 keyBy()。该对象是前端,也是处理来自 Kafka 的消息的第一个模块。 SourceOutput 是代表消息的 proto 文件。

我有 KafkaConsumer 对象:

public class KafkaSourceFunction extends ProcessFunction<byte[], SourceOutput> implements Serializable
{
    @Override
    public void processElement(byte[] bytes, ProcessFunction<byte[], SourceOutput>.Context 
         context, Collector<SourceOutput> collector) throws Exception
    {
          // Here, I want to call to sorting method
          collector.collect(output);
    }
}

我有一个对象 (KafkaSourceSort) 进行所有排序,并且应该将无序消息保持在 priorityQ 的状态中,并且如果消息以正确的顺序通过收集器,还负责传递消息。

class SessionInfo
{
    public PriorityQueue<SourceOutput>  orderedMessages = null;

    public void putMessage(SourceOutput Msg)
    {
        if(orderedMessages == null)
            orderedMessages = new PriorityQueue<SourceOutput>(new SequenceComparator());

        orderedMessages.add(Msg);
    }
}

public class KafkaSourceState  implements Serializable
{
    public TreeMap<String, SessionInfo> Sessions = new TreeMap<>();
}

我读到我需要使用非键控状态 (ListState),它应该包含会话映射,而每个会话都包含一个 priorityQ,其中包含与该会话相关的所有消息。

我找到了一个例子,所以我实现了这个:

public class KafkaSourceSort implements SinkFunction<KafkaSourceSort>,
        CheckpointedFunction
{
    private transient ListState<KafkaSourceState> checkpointedState;
    private KafkaSourceState state;

    @Override
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception
    {
        checkpointedState.clear();
        checkpointedState.add(state);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception
    {
        ListStateDescriptor<KafkaSourceState> descriptor =
                new ListStateDescriptor<KafkaSourceState>(
                        "KafkaSourceState",
                        TypeInformation.of(new TypeHint<KafkaSourceState>() {}));

        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        if (context.isRestored())
        {
            state = (KafkaSourceState) checkpointedState.get();
        }
    }

    @Override
    public void invoke(KafkaSourceState value, SinkFunction.Context contex) throws Exception
    {
        state = value;

       // ...
    }
}

我看到我需要实现一个调用消息,它可能会从 processElement() 调用,但是 invoke() 的签名不包含收集器,我不明白该怎么做,即使我做到了到现在还可以。

请,我们将不胜感激。 谢谢。

【问题讨论】:

    标签: java apache-kafka state apache-flink


    【解决方案1】:

    SinkFunction 是 DAG 中的终端节点,它是您的作业图。它的接口中没有Collector,因为它不能向下游发出任何东西。预计会连接到外部服务或数据存储并在那里发送数据。

    如果您分享更多关于您想要完成的工作的信息,也许我们可以提供更多帮助。可能有更简单的方法来解决这个问题。

    【讨论】:

    • 感谢您的回复。该程序的目的是从 Kafka 获取许多会话的消息,并在发送它们之前对其进行排序。在无序流的情况下,我需要将消息保持在状态(因为它们将从 Kafka 中删除),直到再次设置顺序,然后,我需要转发所有有序消息并清理状态。如前所述,在这个阶段我不调用 keyBy() 因此我不能使用 ValueState
    最近更新 更多