【问题标题】:How to filter Apache flink stream on the basis of other?如何在其他基础上过滤Apache flink流?
【发布时间】:2017-09-17 03:35:51
【问题描述】:

我有两个流,一个是 Int ,另一个是 json 。在 json Schema 中有一个 key 是一些 int 。所以我需要通过与另一个整数流的键比较来过滤 json 流,所以有可能吗在 Flink 中?

【问题讨论】:

    标签: apache-flink flink-streaming


    【解决方案1】:

    是的,你可以用 Flink 做这种流处理。您需要 Flink 的基本构建块是连接的流和有状态的函数——这是一个使用 RichCoFlatMap 的示例:

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
    import org.apache.flink.util.Collector;
    
    public class Connect {
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            DataStream<Event> control = env.fromElements(
                    new Event(17),
                    new Event(42))
                    .keyBy("key");
    
            DataStream<Event> data = env.fromElements(
                    new Event(2),
                    new Event(42),
                    new Event(6),
                    new Event(17),
                    new Event(8),
                    new Event(42)
                    )
                    .keyBy("key");
    
            DataStream<Event> result = control
                    .connect(data)
                    .flatMap(new MyConnectedStreams());
    
            result.print();
    
            env.execute();
        }
    
        static final class MyConnectedStreams
                extends RichCoFlatMapFunction<Event, Event, Event> {
    
            private ValueState<Boolean> seen = null;
    
            @Override
            public void open(Configuration config) {
                ValueStateDescriptor<Boolean> descriptor = new ValueStateDescriptor<>(
                        // state name
                        "have-seen-key",
                        // type information of state
                        TypeInformation.of(new TypeHint<Boolean>() {
                        }));
                seen = getRuntimeContext().getState(descriptor);
            }
    
            @Override
            public void flatMap1(Event control, Collector<Event> out) throws Exception {
                seen.update(Boolean.TRUE);
            }
    
            @Override
            public void flatMap2(Event data, Collector<Event> out) throws Exception {
                if (seen.value() == Boolean.TRUE) {
                    out.collect(data);
                }
            }
        }
    
    
        public static final class Event {
            public Event() {
            }
    
            public Event(int key) {
                this.key = key;
            }
    
            public int key;
    
            public String toString() {
                return String.valueOf(key);
            }
        }
    }
    

    在这个例子中,只有那些在控制流上看到的键被传递到数据流中——所有其他事件都被过滤掉了。我利用了Flink's managed keyed stateconnected streams

    为了简单起见,我忽略了您对数据流具有 JSON 的要求,但您可以在其他地方找到如何使用 JSON 和 Flink 的示例。

    请注意,您的结果将是不确定的,因为您无法控制两个流相对于彼此的时间。您可以通过向流中添加事件时间时间戳来管理这一点,然后改用 RichCoProcessFunction。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-05-04
      • 2018-03-03
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多