【Question title】: Flink connect streams using KeyedCoProcessFunction
【Posted】: 2021-11-11 13:59:56
【Question】:

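For a 1:1 join I am using KeyedCoProcessFunction. I have two streams: a lookup stream (100 records per second) and a clickstream (10,000 records per second). In processElement2 I look for the key in a MapState<Long,Row>; if it is found I enrich the click record with it, otherwise I emit the record to a side output, which is then sunk to Kafka. I am not using any window on either input stream. On the DLQ topic in Kafka I keep seeing 1-2 records produced per second. How can I wait a few milliseconds for the lookup id in processElement2 before pushing the record to the side output?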

val joinStream = lookDataStream.keyBy(row -> row.<Long>getFieldAs("id"))
            .connect(clickDataStream.keyBy(row -> row.<Long>getFieldAs("lookupid")))
            .process(new EnrichJoinFunction());
public static class EnrichJoinFunction
      extends KeyedCoProcessFunction<Long, Row, Row, Row> {


    final OutputTag<Row> outputTag = new OutputTag<Row>("side-output") {};

    private MapState<Long, Row> map = null;

    @Override
    public void open(Configuration parameters) throws Exception {
      val MapStateDescriptor =
          new MapStateDescriptor<Long, Row>(
              "state",
              TypeInformation.of(Long.class),
              TypeInformation.of(new TypeHint<Row>() {}));
      MapStateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.days(15)).build());
      /*MapStateDescriptor.setQueryable("test");*/
      map = getRuntimeContext().getMapState(MapStateDescriptor);
    }

    @Override
    public void processElement1(
        Row lookupRow, KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out)
        throws Exception {
      log.debug("Received Lookup Record" + RowUtils.printRow(lookupRow));
      val id = lookupRow.<Long>getFieldAs("id");
      if (!map.contains(id)) {
        map.put(id, lookupRow);
      }
    }

    @Override
    public void processElement2(
        Row clickRow, KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out)
        throws Exception {
      log.debug("Received Click stream Record" + RowUtils.printRow(clickRow));

      val id = clickRow.<Long>getFieldAs("id");

      if (map.contains(id)) {
          // enrich join
          val joinRow = join(clickRow, map.get(id));
          out.collect(joinRow);
      } else {
        // lookup entry not yet arrived, send it to side output - dlq
        ctx.output(outputTag, clickRow);
      }
    }

    public Row join(Row clickRow, Row lookupRow) throws ParseException {
      Row joinedRow = new Row(RowKind.INSERT, 13);
      // row setter join output
      return joinedRow;
    }
}

【Question comments】:

    Tags: apache-flink flink-streaming


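    【Solution 1】:

    You can use the TimerService to achieve this.

    The idea is to store clickstream rows that have no immediately matching lookup data in a dedicated MapState<Long,Row> and to register a timer (processing time or event time) that fires a little later. In the timer callback you try the join again. If there is still no match, you finally send the click event to the side output.

    It could look like this: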

    // Imports for the enclosing file; log and RowUtils come from the asker's own code.
    import java.text.ParseException;
    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
    import org.apache.flink.types.Row;
    import org.apache.flink.types.RowKind;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    public static class EnrichJoinFunction
          extends KeyedCoProcessFunction<Long, Row, Row, Row> {

        final OutputTag<Row> outputTag = new OutputTag<Row>("side-output") {};

        // lookup rows, expired after 15 days
        private MapState<Long, Row> map = null;
        // click rows still waiting for their lookup row (one row per key)
        private MapState<Long, Row> clickstreamState = null;

        @Override
        public void open(Configuration parameters) throws Exception {
          MapStateDescriptor<Long, Row> mapStateDescriptor =
              new MapStateDescriptor<Long, Row>(
                  "state",
                  TypeInformation.of(Long.class),
                  TypeInformation.of(new TypeHint<Row>() {}));
          mapStateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.days(15)).build());
          /*mapStateDescriptor.setQueryable("test");*/
          map = getRuntimeContext().getMapState(mapStateDescriptor);

          MapStateDescriptor<Long, Row> clickstreamStateMapStateDescriptor =
              new MapStateDescriptor<Long, Row>(
                  "clickstreamState",
                  TypeInformation.of(Long.class),
                  TypeInformation.of(new TypeHint<Row>() {}));
          clickstreamStateMapStateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.minutes(1)).build());
          clickstreamState = getRuntimeContext().getMapState(clickstreamStateMapStateDescriptor);
        }

        @Override
        public void processElement1(
            Row lookupRow, KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out)
            throws Exception {
          log.debug("Received Lookup Record" + RowUtils.printRow(lookupRow));
          Long id = lookupRow.<Long>getFieldAs("id");
          if (!map.contains(id)) {
            map.put(id, lookupRow);
          }

          // immediately join a click event that is waiting for this lookup row
          if (clickstreamState.contains(id)) {
              // enrich join
              Row joinRow = join(clickstreamState.get(id), lookupRow);
              out.collect(joinRow);
              clickstreamState.remove(id);
          }
        }

        @Override
        public void processElement2(
            Row clickRow, KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out)
            throws Exception {
          log.debug("Received Click stream Record" + RowUtils.printRow(clickRow));

          // the click stream is keyed by "lookupid", so the current key is the id to look up
          Long id = ctx.getCurrentKey();

          if (map.contains(id)) {
              // enrich join
              Row joinRow = join(clickRow, map.get(id));
              out.collect(joinRow);
          } else {
            // put in state and check again in 1 second
            clickstreamState.put(id, clickRow);
            // ctx.timestamp() is null when records carry no timestamps, so read the processing-time clock
            long currTimestamp = ctx.timerService().currentProcessingTime();
            ctx.timerService().registerProcessingTimeTimer(currTimestamp + 1000);
          }
        }

        public Row join(Row clickRow, Row lookupRow) throws ParseException {
          Row joinedRow = new Row(RowKind.INSERT, 13);
          // row setter join output
          return joinedRow;
        }

        @Override
        public void onTimer(
            long timestamp,
            KeyedCoProcessFunction<Long, Row, Row, Row>.OnTimerContext ctx, Collector<Row> out)
            throws Exception {
          Long id = ctx.getCurrentKey();
          Row clickRow = clickstreamState.get(id);
          if (clickRow == null) {
            // already joined in processElement1, nothing left to do
            return;
          }
          if (map.contains(id)) {
            // enrich join
            Row joinRow = join(clickRow, map.get(id));
            out.collect(joinRow);
          } else {
            // lookup entry has not arrived even after 1 second, send it to side output - dlq
            ctx.output(outputTag, clickRow);
          }
          clickstreamState.remove(id);
        }
    }
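    One caveat with the code above, as far as I can tell: clickstreamState keeps a single row per key, so a second unmatched click for the same lookup id replaces the first before its timer fires. The sketch below models one way around that in plain Java, with no Flink dependency: each key buffers a list of pending clicks, each with its own deadline. PendingClickModel, WAIT_MS and the String rows are made up for illustration; in a real job the buffer would presumably be a ListState<Row> and the deadlines would be processing-time timers.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Plain-Java model of the join state; Strings stand in for Flink Row objects. */
class PendingClickModel {
    static final long WAIT_MS = 1000L; // hypothetical wait, same 1 second as in the answer

    /** A buffered click plus the time at which it should be given up on. */
    private static final class Pending {
        final String click;
        final long deadline;
        Pending(String click, long deadline) { this.click = click; this.deadline = deadline; }
    }

    private final Map<Long, String> lookups = new HashMap<>();        // role of `map`
    private final Map<Long, List<Pending>> pending = new HashMap<>(); // role of `clickstreamState`
    public final List<String> joined = new ArrayList<>();             // main output
    public final List<String> dlq = new ArrayList<>();                // side output

    /** Lookup side: keep the first row per key and flush every click waiting for it. */
    public void onLookup(long key, String lookup) {
        lookups.putIfAbsent(key, lookup);
        List<Pending> waiting = pending.remove(key);
        if (waiting != null) {
            for (Pending p : waiting) {
                joined.add(p.click + "+" + lookups.get(key));
            }
        }
    }

    /** Click side: join at once if the lookup is known, otherwise buffer with a deadline. */
    public void onClick(long key, String click, long now) {
        String lookup = lookups.get(key);
        if (lookup != null) {
            joined.add(click + "+" + lookup);
        } else {
            pending.computeIfAbsent(key, k -> new ArrayList<>())
                   .add(new Pending(click, now + WAIT_MS));
        }
    }

    /** Timer callback: only clicks whose deadline has passed go to the DLQ. */
    public void onTimer(long key, long timestamp) {
        List<Pending> waiting = pending.get(key);
        if (waiting == null) {
            return; // everything was already joined by onLookup
        }
        for (Iterator<Pending> it = waiting.iterator(); it.hasNext();) {
            Pending p = it.next();
            if (p.deadline <= timestamp) {
                dlq.add(p.click);
                it.remove();
            }
        }
        if (waiting.isEmpty()) {
            pending.remove(key);
        }
    }
}
```

    Checking the deadline inside onTimer matters because every buffered click registers its own timer: the first timer to fire should not drag younger clicks into the DLQ with it.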
    

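    【Comments】:

    • With Long currTimestamp = ctx.timestamp() I get null, because the input stream has no watermarks. Can I use System.currentTimeMillis instead?
    • @gauravmiglani, I would say yes, provided you are running with the processing-time stream characteristic, since that relies on the local machine's clock.
    • Cool, and one more thing: if the lookup entry still has not arrived after one second, I don't want to send the record to the DLQ but to register another timer and check again in 1 second. Instead of ctx.output(outputTag, clickRow), can I write ctx.timerService().registerProcessingTimeTimer(currTimestamp + 1000) inside the onTimer method?
    • To be honest, I haven't done that myself, but you can try; it might work. Alternatively, if you want to join the record no matter what, I would suggest registering one timer up front for the maximum time you are willing to wait, and adding a check in processElement1 against the click-event state: if a click is waiting, join it in processElement1 and remove both the timer and the event from state. That way you don't have to keep re-firing timers.
    • I don't follow. Could you explain it with a code edit?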
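    The last suggestion above (one timer for the maximum wait, join in processElement1, then drop the timer and the buffered events) could be sketched roughly as follows. This is a plain-Java model of a single key's state rather than Flink code: MaxWaitJoinModel, MAX_WAIT_MS and the TreeSet standing in for the timer service are assumptions for illustration. The comments name the Flink TimerService calls each step would correspond to.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Plain-Java model of ONE key's state; Strings stand in for Flink Row objects. */
class MaxWaitJoinModel {
    static final long MAX_WAIT_MS = 60_000L; // hypothetical upper bound on the wait

    private String lookup;                                   // would be keyed state in Flink
    private final List<String> pending = new ArrayList<>();  // would be a ListState<Row>
    private Long timerTs;                                    // would be a ValueState<Long>
    public final TreeSet<Long> timers = new TreeSet<>();     // stands in for the TimerService
    public final List<String> joined = new ArrayList<>();    // main output
    public final List<String> dlq = new ArrayList<>();       // side output

    /** processElement2: join now, or buffer and make sure exactly one timer is armed. */
    public void onClick(String click, long now) {
        if (lookup != null) {
            joined.add(click + "+" + lookup);
            return;
        }
        pending.add(click);
        if (timerTs == null) {
            timerTs = now + MAX_WAIT_MS;
            timers.add(timerTs); // ctx.timerService().registerProcessingTimeTimer(timerTs)
        }
    }

    /** processElement1: store the lookup, flush the buffer, cancel the timer. */
    public void onLookup(String row) {
        if (lookup == null) {
            lookup = row;
        }
        for (String click : pending) {
            joined.add(click + "+" + lookup);
        }
        pending.clear();
        if (timerTs != null) {
            timers.remove(timerTs); // ctx.timerService().deleteProcessingTimeTimer(timerTs)
            timerTs = null;
        }
    }

    /** onTimer: the lookup never came within the maximum wait, so give up on the buffer. */
    public void onTimer(long timestamp) {
        timers.remove(timestamp);
        dlq.addAll(pending);
        pending.clear();
        timerTs = null;
    }
}
```

    Remembering the timer's timestamp is what makes the cancellation possible, since a timer is deleted by the timestamp it was registered with. Note that in this sketch the timer is armed by the first buffered click, so later clicks for the same key share its deadline and may wait less than MAX_WAIT_MS.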