【发布时间】:2017-09-17 03:35:51
【问题描述】:
我有两个流,一个是 Int ,另一个是 json 。在 json Schema 中有一个 key 是一些 int 。所以我需要通过与另一个整数流的键比较来过滤 json 流,所以有可能吗在 Flink 中?
【问题讨论】:
标签: apache-flink flink-streaming
我有两个流,一个是 Int ,另一个是 json 。在 json Schema 中有一个 key 是一些 int 。所以我需要通过与另一个整数流的键比较来过滤 json 流,所以有可能吗在 Flink 中?
【问题讨论】:
标签: apache-flink flink-streaming
是的,你可以用 Flink 做这种流处理。您需要 Flink 的基本构建块是连接的流和有状态的函数——这是一个使用 RichCoFlatMap 的示例:
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;
public class Connect {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Event> control = env.fromElements(
new Event(17),
new Event(42))
.keyBy("key");
DataStream<Event> data = env.fromElements(
new Event(2),
new Event(42),
new Event(6),
new Event(17),
new Event(8),
new Event(42)
)
.keyBy("key");
DataStream<Event> result = control
.connect(data)
.flatMap(new MyConnectedStreams());
result.print();
env.execute();
}
static final class MyConnectedStreams
extends RichCoFlatMapFunction<Event, Event, Event> {
private ValueState<Boolean> seen = null;
@Override
public void open(Configuration config) {
ValueStateDescriptor<Boolean> descriptor = new ValueStateDescriptor<>(
// state name
"have-seen-key",
// type information of state
TypeInformation.of(new TypeHint<Boolean>() {
}));
seen = getRuntimeContext().getState(descriptor);
}
@Override
public void flatMap1(Event control, Collector<Event> out) throws Exception {
seen.update(Boolean.TRUE);
}
@Override
public void flatMap2(Event data, Collector<Event> out) throws Exception {
if (seen.value() == Boolean.TRUE) {
out.collect(data);
}
}
}
public static final class Event {
public Event() {
}
public Event(int key) {
this.key = key;
}
public int key;
public String toString() {
return String.valueOf(key);
}
}
}
在这个例子中,只有那些在控制流上看到的键被传递到数据流中——所有其他事件都被过滤掉了。我利用了Flink's managed keyed state 和connected streams。
为了简单起见,我忽略了您对数据流具有 JSON 的要求,但您可以在其他地方找到如何使用 JSON 和 Flink 的示例。
请注意,您的结果将是不确定的,因为您无法控制两个流相对于彼此的时间。您可以通过向流中添加事件时间时间戳来管理这一点,然后改用 RichCoProcessFunction。
【讨论】: