【发布时间】:2019-05-28 12:51:50
【问题描述】:
我正在使用 Flink 进行一个实时项目,我需要通过先前的交易来丰富每张卡的状态,以计算交易功能如下:
对于每张卡,我都有一个计算过去 24 小时内交易次数的功能。另一方面,我有 2 个数据源:
首先,一个数据库表,它存储卡的交易直到昨天结束。
第二,今天的交易流。
所以第一步是从数据库中获取每张卡的昨天交易并将它们存储在卡状态中。然后第二步是用今天上线的交易更新这个状态,并计算过去 24 小时内的交易数量。 我尝试将数据库数据作为流读取并将其连接到今天的事务。因此,为了达到上述目标,我使用了 RichFlatMap 函数。但是,由于数据库数据本身不是流式传输的,因此输出不正确。 RichFlatMap 函数如下:
transactionsHistory.connect(transactionsStream).flatMap(new
RichCoFlatMapFunction<History, Tuple2<String, Transaction>,
ExtractedFeatures>() {
private ValueState<History> history;
@Override
public void open(Configuration config) throws Exception {
this.history = getRuntimeContext().getState(new
ValueStateDescriptor<>("card history", History.class));
}
//historical data
@Override
public void flatMap1(History history,
Collector<ExtractedFeatures> collector) throws Exception {
this.history.update(history);
}
//new transactions from stream
@Override
public void flatMap2(Tuple2<String, Transaction>
transactionTuple, Collector<ExtractedFeatures> collector) throws
Exception {
History history = this.history.value();
Transaction transaction = transactionTuple.f1;
ArrayList<History> prevDayHistoryList =
history.prevDayTransactions;
// This function returns transactions which are in 24 hours
//window of the current transaction and their count.
Tuple2<ArrayList<History>, Integer> prevDayHistoryTuple =
findHistoricalDate(prevDayHistoryList,
transaction.transactionLocalDate);
prevDayHistoryList = prevDayHistoryTuple.f0;
history.prevDayTransactions = prevDayHistoryList;
this.history.update(history);
ExtractedFeatures ef = new ExtractedFeatures();
ef.updateFeatures(transaction, prevDayHistoryTuple.f1);
collector.collect(ef);
}
});
在 Flink 流程序中实现上述丰富需求的正确设计模式是什么? 我发现堆栈溢出的打击问题与我的问题相似,但我无法解决我的问题,所以我决定寻求帮助:)
Enriching DataStream using static DataSet in Flink streaming
任何帮助将不胜感激。
【问题讨论】:
标签: bigdata apache-flink data-analysis flink-streaming