【Question title】: NPE in Flink KeyedCoProcessFunction
【Posted】: 2021-11-18 13:52:29
【Question description】:

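I am using a KeyedCoProcessFunction on connected streams, both keyed by id. I also use MapState: when a key does not exist I put a value of type List, and in processElement2 I check whether the key is present before reading it. So ideally there should be no chance of an NPE, yet I still get one.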

val joinStream = lookDataStream.keyBy(row -> row.<Long>getFieldAs("id"))
        .connect(clickDataStream.keyBy(row -> row.<Long>getFieldAs("lookupid")))
        .process(new EnrichJoinFunction());

public static class EnrichJoinFunction
  extends KeyedCoProcessFunction<Long, Row, Row, Row> {


final OutputTag<Row> outputTag = new OutputTag<Row>("side-output") {};

private MapState<Long, Row> map = null;
private MapState<Long, List<Row>> clickstreamState = null;

@Override
public void open(Configuration parameters) throws Exception {
  // Descriptor for the lookup-side state; the variable no longer shadows the class name.
  MapStateDescriptor<Long, Row> mapStateDescriptor =
      new MapStateDescriptor<Long, Row>(
          "state",
          TypeInformation.of(Long.class),
          TypeInformation.of(new TypeHint<Row>() {}));
  mapStateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.days(15)).build());
  map = getRuntimeContext().getMapState(mapStateDescriptor);

  MapStateDescriptor<Long, List<Row>> clickstreamStateMapStateDescriptor =
      new MapStateDescriptor<Long, List<Row>>(
          "clickstreamState",
          TypeInformation.of(Long.class),
          TypeInformation.of(new TypeHint<List<Row>>() {}));
  clickstreamStateMapStateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.minutes(5)).build());
  clickstreamState = getRuntimeContext().getMapState(clickstreamStateMapStateDescriptor);
}

@Override
public void processElement1(
    Row lookupRow, KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out)
    throws Exception {
  log.debug("Received Lookup Record" + RowUtils.printRow(lookupRow));
  Long id = lookupRow.<Long>getFieldAs("id");
  if (!map.contains(id)) {
    map.put(id, lookupRow);
  }

  // immediately join any buffered click events that were waiting for this lookup row
  if (clickstreamState.contains(id)) {
    for (Row curRow : clickstreamState.get(id)) {
      // enrich join
      Row joinRow = join(curRow, lookupRow);
      out.collect(joinRow);
    }
    clickstreamState.remove(id);
  } 
}

@Override
public void processElement2(
    Row clickRow, KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out)
    throws Exception {
  log.debug("Received Click stream Record" + RowUtils.printRow(clickRow));

  Long id = clickRow.<Long>getFieldAs("id");

  if (map.contains(id)) {
      // enrich join
      Row joinRow = join(clickRow, map.get(id));
      out.collect(joinRow);
  } else {
    if (clickstreamState.contains(id)) {
      List<Row> rows = clickstreamState.get(id);
      if (rows != null) {
        rows.add(clickRow);
      } else {
        throw new NullPointerException("This exception should never throw NPE");
      }
    } else {
      val clickList = new ArrayList<Row>();
      clickList.add(clickRow);
      clickstreamState.put(id, clickList);
    }
  }
}

public Row join(Row clickRow, Row lookupRow) throws ParseException {
  Row joinedRow = new Row(RowKind.INSERT, 13);
  // set the fields of the joined output row here
  return joinedRow;
}
}

【Comments】:

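  • Can you provide the exception? On which line does it occur? How often does it happen? One guess is that the state TTL expires between the contains and get calls. That seems unlikely, though.
  • throw new NullPointerException("This exception should never throw NPE");
  • Ideally, since I initialize the ArrayList in the else block, the value returned by get should never be null.
  • Can you try replacing clickstreamState.remove(id); with clickstreamState.put(id, new ArrayList<Row>()); in processElement1?
  • Same error. Should I try a thread-safe list instead of ArrayList?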
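
Regarding the TTL guess in the first comment: whatever the actual root cause turns out to be, one defensive pattern is to read the value once and treat null as "absent", instead of calling contains and then get. Below is a minimal plain-Java sketch of that idea. ExpiringMapSketch, its fake clock `now`, and the `append` helper are hypothetical names made up for illustration; this is a toy model, not Flink's state API and not a diagnosis of the NPE.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy expiring map (not Flink API): an entry disappears once the fake clock reaches its deadline. */
class ExpiringMapSketch {
    private final Map<Long, List<String>> values = new HashMap<>();
    private final Map<Long, Long> deadlines = new HashMap<>();
    private final long ttlMillis;
    long now = 0; // fake clock, advanced by hand in this sketch

    ExpiringMapSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Returns the stored list, or null if the entry is absent or has expired. */
    List<String> get(long key) {
        Long deadline = deadlines.get(key);
        if (deadline == null || now >= deadline) {
            values.remove(key);
            deadlines.remove(key);
            return null;
        }
        return values.get(key);
    }

    /** Stores the value and restarts its time-to-live. */
    void put(long key, List<String> value) {
        values.put(key, value);
        deadlines.put(key, now + ttlMillis);
    }

    /** Appends with a single read: a null result means "start a new list", never an error. */
    void append(long key, String element) {
        List<String> current = get(key);
        List<String> updated = (current == null) ? new ArrayList<>() : new ArrayList<>(current);
        updated.add(element);
        put(key, updated); // explicit write-back instead of mutating the returned list
    }
}
```

With this shape there is no window between a presence check and the read, and the explicit write-back does not depend on the returned list being the stored instance.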

Tags: apache-flink flink-streaming


【Solution 1】:

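Fundamentally, I think the implementation is flawed. By the time your processElement1 and processElement2 methods are called, the operator's state is already scoped to the current key. So you don't need the MapState<id, Row> and MapState<id, List<Row>> state; a ValueState<Row> and a ListState<Row> are all you need.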
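
To illustrate the shape of that suggestion, here is a plain-Java model of the join logic with one value and one buffer per key. It is only a sketch under stated assumptions: KeyedJoinSketch, PerKeyState, and the String rows are hypothetical stand-ins, and the HashMap merely imitates the key scoping that Flink's runtime provides, so in a real KeyedCoProcessFunction that map would not appear and the two fields would be a ValueState<Row> and a ListState<Row>.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of the per-key join logic (not Flink API). */
class KeyedJoinSketch {

    /** State for a single key: one lookup value plus the clicks still waiting for it. */
    static final class PerKeyState {
        String lookup;                                         // plays the role of ValueState<Row>
        final List<String> pendingClicks = new ArrayList<>();  // plays the role of ListState<Row>
    }

    // Stand-in for the runtime's key scoping; user code in Flink would not hold this map.
    private final Map<Long, PerKeyState> stateByKey = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    private PerKeyState stateFor(long key) {
        return stateByKey.computeIfAbsent(key, k -> new PerKeyState());
    }

    /** Lookup side: keep the first value seen, then flush any buffered clicks. */
    void processLookup(long key, String lookup) {
        PerKeyState state = stateFor(key);
        if (state.lookup == null) {
            state.lookup = lookup;
        }
        for (String click : state.pendingClicks) {
            output.add(join(click, lookup));
        }
        state.pendingClicks.clear();
    }

    /** Click side: join immediately if the lookup value is known, otherwise buffer the click. */
    void processClick(long key, String click) {
        PerKeyState state = stateFor(key);
        if (state.lookup != null) {
            output.add(join(click, state.lookup));
        } else {
            state.pendingClicks.add(click);
        }
    }

    /** Placeholder join: concatenates the two sides. */
    static String join(String click, String lookup) {
        return click + "|" + lookup;
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        KeyedJoinSketch op = new KeyedJoinSketch();
        op.processClick(1L, "click-a");   // buffered, no lookup yet
        op.processLookup(1L, "lookup-1"); // flushes the buffered click
        op.processClick(1L, "click-b");   // joined immediately
        System.out.println(op.output());
    }
}
```

Because each key has exactly one value and one buffer, there is no inner map lookup that can come back null; the only checks left are "is the lookup value set?" and "is the buffer empty?".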
