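【Posted】:2021-11-18 13:52:29
【Question】:
I am using a KeyedCoProcessFunction on connected streams, and both streams are keyed by id. I also use MapState: if a key does not exist, I put a value of type List for it, and in processElement2 I check that the key exists before reading it. So ideally there should be no chance of an NPE, yet I still get one.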
val joinStream = lookDataStream.keyBy(row -> row.<Long>getFieldAs("id"))
    .connect(clickDataStream.keyBy(row -> row.<Long>getFieldAs("lookupid")))
    .process(new EnrichJoinFunction());
public static class EnrichJoinFunction
    extends KeyedCoProcessFunction<Long, Row, Row, Row> {

  final OutputTag<Row> outputTag = new OutputTag<Row>("side-output") {};
  // Lookup rows, kept for 15 days.
  private MapState<Long, Row> map = null;
  // Click rows that arrived before their lookup row, kept for 5 minutes.
  private MapState<Long, List<Row>> clickstreamState = null;

  @Override
  public void open(Configuration parameters) throws Exception {
    MapStateDescriptor<Long, Row> mapStateDescriptor =
        new MapStateDescriptor<Long, Row>(
            "state",
            TypeInformation.of(Long.class),
            TypeInformation.of(new TypeHint<Row>() {}));
    mapStateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.days(15)).build());
    map = getRuntimeContext().getMapState(mapStateDescriptor);

    MapStateDescriptor<Long, List<Row>> clickstreamStateMapStateDescriptor =
        new MapStateDescriptor<Long, List<Row>>(
            "clickstreamState",
            TypeInformation.of(Long.class),
            TypeInformation.of(new TypeHint<List<Row>>() {}));
    clickstreamStateMapStateDescriptor.enableTimeToLive(
        StateTtlConfig.newBuilder(Time.minutes(5)).build());
    clickstreamState = getRuntimeContext().getMapState(clickstreamStateMapStateDescriptor);
  }

  @Override
  public void processElement1(
      Row lookupRow, KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out)
      throws Exception {
    log.debug("Received Lookup Record" + RowUtils.printRow(lookupRow));
    Long id = lookupRow.<Long>getFieldAs("id");
    if (!map.contains(id)) {
      map.put(id, lookupRow);
    }
    // join immediately any matching click events, waiting for counterpart
    if (clickstreamState.contains(id)) {
      for (Row curRow : clickstreamState.get(id)) {
        // enrich join
        Row joinRow = join(curRow, lookupRow);
        out.collect(joinRow);
      }
      clickstreamState.remove(id);
    }
  }

  @Override
  public void processElement2(
      Row clickRow, KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out)
      throws Exception {
    log.debug("Received Click stream Record" + RowUtils.printRow(clickRow));
    Long id = clickRow.<Long>getFieldAs("id");
    if (map.contains(id)) {
      // enrich join
      Row joinRow = join(clickRow, map.get(id));
      out.collect(joinRow);
    } else {
      // no lookup row yet: buffer the click until its counterpart arrives
      if (clickstreamState.contains(id)) {
        List<Row> rows = clickstreamState.get(id);
        if (rows != null) {
          rows.add(clickRow);
        } else {
          throw new NullPointerException("This exception should never throw NPE");
        }
      } else {
        val clickList = new ArrayList<Row>();
        clickList.add(clickRow);
        clickstreamState.put(id, clickList);
      }
    }
  }

  public Row join(Row clickRow, Row lookupRow) throws ParseException {
    Row joinedRow = new Row(RowKind.INSERT, 13);
    // row setter join output
    return joinedRow;
  }
}
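A side point that may be worth checking in the code above: processElement2 calls rows.add(clickRow) on the list returned by get, but never puts the list back. With a state backend that serializes values (RocksDB, for example), get hands back a deserialized copy, so a change to it is not stored until it is written back with put. The sketch below only imitates that behaviour in plain Java; CopyingStore is a hypothetical stand-in and not a Flink class, and this is not claimed to be the cause of the NPE.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WriteBackSketch {

    /** Hypothetical store that copies lists on put and get, imitating a serializing backend. */
    static final class CopyingStore<K, V> {
        private final Map<K, List<V>> data = new HashMap<>();

        void put(K key, List<V> value) {
            data.put(key, new ArrayList<>(value)); // store a copy, like serialization would
        }

        List<V> get(K key) {
            List<V> stored = data.get(key);
            return stored == null ? null : new ArrayList<>(stored); // return a copy
        }
    }

    public static void main(String[] args) {
        CopyingStore<Long, String> store = new CopyingStore<>();
        List<String> first = new ArrayList<>();
        first.add("click-1");
        store.put(42L, first);

        // Mutating the returned copy does not change what is stored.
        store.get(42L).add("click-2");
        System.out.println(store.get(42L)); // prints [click-1]

        // Read, modify, then write back explicitly.
        List<String> rows = store.get(42L);
        rows.add("click-2");
        store.put(42L, rows);
        System.out.println(store.get(42L)); // prints [click-1, click-2]
    }
}
```

In the function above, the equivalent change would be a clickstreamState.put(id, rows) right after rows.add(clickRow).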
【Discussion】:
-
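Can you provide the exception? Which line does it occur on, and how often does it happen? One guess is that the state TTL expires between the contains and get calls. That seems unlikely, though.
-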
throw new NullPointerException("This exception should never throw NPE");
-
Ideally, since I initialize the ArrayList in the else block, the value returned by get should never be null at any point.
-
Can you try clickstreamState.put(id, new ArrayList<Row>()); instead of clickstreamState.remove(id); in processElement1?
-
Same error. Should I try a thread-safe list instead of ArrayList?
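
The guess in the first comment, that an entry could expire between contains and get, can be illustrated with a small plain-Java sketch. ExpiringMap below is a hypothetical stand-in with a hand-driven clock, not Flink's MapState, and it assumes that a write refreshes the entry's timestamp. It shows how a check followed by a separate read can disagree, and how a single get with a null check avoids depending on the earlier check.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;

public class SingleReadSketch {

    /** Hypothetical TTL-backed map with an injectable clock; NOT Flink's MapState. */
    static final class ExpiringMap<K, V> {
        private final Map<K, V> values = new HashMap<>();
        private final Map<K, Long> writtenAt = new HashMap<>();
        private final long ttlMillis;
        private final LongSupplier clock;

        ExpiringMap(long ttlMillis, LongSupplier clock) {
            this.ttlMillis = ttlMillis;
            this.clock = clock;
        }

        void put(K key, V value) {
            values.put(key, value);
            writtenAt.put(key, clock.getAsLong()); // assumption: a write refreshes the TTL
        }

        V get(K key) {
            Long ts = writtenAt.get(key);
            if (ts == null || clock.getAsLong() - ts >= ttlMillis) {
                values.remove(key); // missing or expired entries read as null
                writtenAt.remove(key);
                return null;
            }
            return values.get(key);
        }

        boolean contains(K key) {
            return get(key) != null;
        }
    }

    /** Appends using one read, so the result never depends on an earlier contains call. */
    static <K, V> void append(ExpiringMap<K, List<V>> state, K key, V element) {
        List<V> rows = state.get(key);
        if (rows == null) {
            rows = new ArrayList<>();
        }
        rows.add(element);
        state.put(key, rows); // write the list back
    }

    public static void main(String[] args) {
        long[] now = {0L};
        ExpiringMap<Long, List<String>> state = new ExpiringMap<>(100L, () -> now[0]);

        append(state, 1L, "click-a");
        boolean present = state.contains(1L); // checked while the entry is still live
        now[0] = 150L;                        // the TTL elapses before the read
        List<String> rows = state.get(1L);
        System.out.println("contains=" + present + ", get=" + rows); // prints contains=true, get=null

        append(state, 1L, "click-b");         // single-read path copes with the expiry
        System.out.println(state.get(1L));    // prints [click-b]
    }
}
```

Since the entry expires rather than being read by another thread, a thread-safe list would not change the outcome in this sketch.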
Tags: apache-flink flink-streaming