【问题标题】:How to use historical dataset for enriching Flink DataStream如何使用历史数据集丰富 Flink DataStream
【发布时间】:2019-05-28 12:51:50
【问题描述】:

我正在使用 Flink 进行一个实时项目,我需要通过先前的交易来丰富每张卡的状态,以计算交易功能如下:

对于每张卡,我都有一个计算过去 24 小时内交易次数的功能。另一方面,我有 2 个数据源:

首先,一个数据库表,它存储卡的交易直到昨天结束。

第二,今天的交易流。

所以第一步是从数据库中获取每张卡的昨天交易并将它们存储在卡状态中。然后第二步是用今天上线的交易更新这个状态,并计算过去 24 小时内的交易数量。 我尝试将数据库数据作为流读取并将其连接到今天的事务。因此,为了达到上述目标,我使用了 RichFlatMap 函数。但是,由于数据库数据本身不是流式传输的,因此输出不正确。 RichFlatMap 函数如下:

transactionsHistory.connect(transactionsStream).flatMap(new         
RichCoFlatMapFunction<History, Tuple2<String, Transaction>,         
ExtractedFeatures>() {
    private ValueState<History> history;
    @Override
    public void open(Configuration config) throws Exception {
        this.history = getRuntimeContext().getState(new 
    ValueStateDescriptor<>("card history", History.class));
    }
    //historical data 
    @Override
    public void flatMap1(History history, 
    Collector<ExtractedFeatures> collector) throws Exception {
        this.history.update(history);
    }
    //new transactions from stream 
    @Override
    public void flatMap2(Tuple2<String, Transaction> 
    transactionTuple, Collector<ExtractedFeatures> collector) throws 
    Exception {
        History history = this.history.value();
        Transaction transaction = transactionTuple.f1;
        ArrayList<History> prevDayHistoryList = 
        history.prevDayTransactions;

        // This function returns transactions which are in 24 hours 
        //window of the current transaction and their count.
        Tuple2<ArrayList<History>, Integer> prevDayHistoryTuple = 
        findHistoricalDate(prevDayHistoryList,
                transaction.transactionLocalDate);
        prevDayHistoryList = prevDayHistoryTuple.f0;
        history.prevDayTransactions = prevDayHistoryList;
        this.history.update(history);
        ExtractedFeatures ef = new ExtractedFeatures();
        ef.updateFeatures(transaction, prevDayHistoryTuple.f1);
        collector.collect(ef);
    }
});

在 Flink 流程序中实现上述丰富需求的正确设计模式是什么? 我发现堆栈溢出的打击问题与我的问题相似,但我无法解决我的问题,所以我决定寻求帮助:)

Enriching DataStream using static DataSet in Flink streaming

任何帮助将不胜感激。

【问题讨论】:

    标签: bigdata apache-flink data-analysis flink-streaming


    【解决方案1】:

    但是,由于数据库数据本身不是流式传输的,因此输出不正确。

    当然可以使用来自关系数据库的信息来丰富流数据。然而,棘手的是要以某种方式保证丰富数据在需要之前被摄取。通常,您可能需要缓冲要丰富的流,直到丰富数据被引导/摄取。例如,有时采用的一种方法是

    1. 在禁用要丰富的流的情况下运行应用程序
    2. 在富集数据被完全摄取并以 flink 状态存储后创建一个保存点
    3. 从保存点重新启动应用并启用要丰富的流

    但是,在您描述的情况下,似乎更简单的方法会起作用。如果您只需要 24 小时的历史数据,那么为什么不忽略历史交易数据库呢?只需运行您的应用程序,直到它看到 24 小时的流数据,之后历史数据库就变得无关紧要了。

    但是,如果您必须提取历史数据,并且您不喜欢上面概述的基于保存点的方法,那么还有其他几种可能性:

    • 在 flink 状态(例如 ListState 或 MapState)中缓冲未丰富的事件,直到提取历史流为止
    • 编写一个自定义 SourceFunction,在提取历史数据之前阻止主流

    有关此主题的更深入探索,请参阅Bootstrapping State In Apache Flink

    计划在未来的版本中更好地支持此用例,顺便说一句。

    【讨论】:

    • 亲爱的@David,你能告诉我如何缓冲流吗?谢谢。
    • 我已经更新了我的答案,以提供有时使用的其他解决方案。
    • @DavidAnderson 我在处理类似问题时偶然发现了您的答案,我想知道您所说的“在 Flink 状态中缓冲未丰富的事件直到摄取历史流”是什么意思,您能否详细说明多一点?
    • @Scrotch 您将使用 RichCoFlatMap 或 KeyedCoProcessFunction 进行扩充。在处理尚无法扩充的事件时,将其存储在 ListState 或 MapState 中,直到必要的扩充数据可用为止。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-02-11
    • 2020-06-29
    • 2011-07-26
    • 1970-01-01
    相关资源
    最近更新 更多