[Posted]: 2021-11-11 13:59:56
[Problem Description]:
For a 1:1 join I use a KeyedCoProcessFunction over two streams: a lookup stream (~100 records/sec) and a clickstream (~10,000 records/sec). In the processElement2 method I look up the key in a MapState&lt;Long, Row&gt;; if it is found, I use it to enrich the clickstream record, otherwise I send the record to a side output, which is then sinked to Kafka. I am not using any windows on either input stream. I keep seeing 1-2 records per second landing in the DLQ topic in Kafka. How can I make processElement2 wait a few milliseconds for the lookup id to arrive before pushing the record to the side output?
val joinStream = lookDataStream.keyBy(row -> row.<Long>getFieldAs("id"))
    .connect(clickDataStream.keyBy(row -> row.<Long>getFieldAs("lookupid")))
    .process(new EnrichJoinFunction());
public static class EnrichJoinFunction
    extends KeyedCoProcessFunction<Long, Row, Row, Row> {

  final OutputTag<Row> outputTag = new OutputTag<Row>("side-output") {};
  private MapState<Long, Row> map = null;

  @Override
  public void open(Configuration parameters) throws Exception {
    val descriptor =
        new MapStateDescriptor<Long, Row>(
            "state",
            TypeInformation.of(Long.class),
            TypeInformation.of(new TypeHint<Row>() {}));
    descriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.days(15)).build());
    /*descriptor.setQueryable("test");*/
    map = getRuntimeContext().getMapState(descriptor);
  }

  @Override
  public void processElement1(
      Row lookupRow, KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out)
      throws Exception {
    log.debug("Received lookup record " + RowUtils.printRow(lookupRow));
    val id = lookupRow.<Long>getFieldAs("id");
    if (!map.contains(id)) {
      map.put(id, lookupRow);
    }
  }

  @Override
  public void processElement2(
      Row clickRow, KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out)
      throws Exception {
    log.debug("Received clickstream record " + RowUtils.printRow(clickRow));
    val id = clickRow.<Long>getFieldAs("id");
    if (map.contains(id)) {
      // lookup entry present: enrich and emit the joined record
      val joinRow = join(clickRow, map.get(id));
      out.collect(joinRow);
    } else {
      // lookup entry not yet arrived, send the record to the side output (DLQ)
      ctx.output(outputTag, clickRow);
    }
  }

  public Row join(Row clickRow, Row lookupRow) throws ParseException {
    Row joinedRow = new Row(RowKind.INSERT, 13);
    // row setters for the join output
    return joinedRow;
  }
}
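One way to avoid the premature DLQ writes would be to buffer an unmatched click in keyed state instead of side-outputting it immediately, register a short processing-time timer (in Flink this would be ctx.timerService().registerProcessingTimeTimer(...)), replay the buffered clicks when the lookup record arrives in processElement1, and only move what is still unmatched to the DLQ when the timer fires in onTimer. The sketch below models that retry logic in plain Java without the Flink runtime; the class and method names (RetryBufferDemo, onClick, onLookup, onTimer) are hypothetical stand-ins for processElement2, processElement1, and Flink's onTimer callback:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java sketch of the buffer-and-retry idea (no Flink runtime).
 * Unmatched clicks wait in a buffer; when the lookup arrives they are
 * joined, and anything still unmatched when the timer fires goes to the DLQ.
 */
class RetryBufferDemo {
  // stands in for MapState<Long, Row> (keyed lookup state)
  static Map<Long, String> lookupState = new HashMap<>();
  // stands in for a ListState<Row> buffer of early clicks per key
  static Map<Long, List<String>> pendingClicks = new HashMap<>();
  static List<String> joined = new ArrayList<>();
  static List<String> dlq = new ArrayList<>();

  /** processElement2: join if possible, otherwise buffer (not DLQ). */
  static void onClick(long id, String click) {
    String lookup = lookupState.get(id);
    if (lookup != null) {
      joined.add(click + "+" + lookup);
    } else {
      // buffer instead of sending straight to the side output;
      // in Flink you would also register a processing-time timer here
      pendingClicks.computeIfAbsent(id, k -> new ArrayList<>()).add(click);
    }
  }

  /** processElement1: store the lookup and replay any clicks that came early. */
  static void onLookup(long id, String lookup) {
    lookupState.put(id, lookup);
    List<String> pending = pendingClicks.remove(id);
    if (pending != null) {
      for (String click : pending) {
        joined.add(click + "+" + lookup);
      }
    }
  }

  /** onTimer: the grace period is over, whatever is still unmatched goes to the DLQ. */
  static void onTimer(long id) {
    List<String> pending = pendingClicks.remove(id);
    if (pending != null) {
      dlq.addAll(pending);
    }
  }

  public static void main(String[] args) {
    onClick(1L, "click-a");   // lookup 1 not there yet -> buffered
    onLookup(1L, "lookup-1"); // arrives late -> click-a joined on replay
    onClick(2L, "click-b");   // lookup 2 never arrives
    onTimer(2L);              // grace period over -> click-b to DLQ
    System.out.println(joined); // prints [click-a+lookup-1]
    System.out.println(dlq);    // prints [click-b]
  }
}
```

In the real KeyedCoProcessFunction the buffer and the timer are per key, so a timer registered for a click's key fires only for that key, and the TTL on the state keeps abandoned buffers from growing without bound.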
Tags: apache-flink flink-streaming