【发布时间】:2020-10-09 18:06:13
【问题描述】:
我的场景是,我想根据另一个流输入调用一个流。两种流类型不同。以下是我的示例代码。我想在收到来自 Kafka 流的消息时触发一个流。
当应用程序启动时,我可以从数据库中读取数据。然后我又想根据一些 kafka 消息从 DB 获取数据。当我在流中收到 kafka 消息时,我想再次从 DB 获取数据。这是我的实际用例。
如何做到这一点?有可能吗?
public class DataStreamCassandraExample implements Serializable{
private static final long serialVersionUID = 1L;
static Logger LOG = LoggerFactory.getLogger(DataStreamCassandraExample.class);
private transient static StreamExecutionEnvironment env;
static DataStream<Tuple4<UUID,String,String,String>> inputRecords;
public static void main(String[] args) throws Exception {
env = StreamExecutionEnvironment.getExecutionEnvironment();
ParameterTool argParameters = ParameterTool.fromArgs(args);
env.getConfig().setGlobalJobParameters(argParameters);
Properties kafkaProps = new Properties();
kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group1");
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("testtopic", new SimpleStringSchema(), kafkaProps);
ClusterBuilder cb = new ClusterBuilder() {
private static final long serialVersionUID = 1L;
@Override
public Cluster buildCluster(Cluster.Builder builder) {
return builder.addContactPoint("127.0.0.1")
.withPort(9042)
.withoutJMXReporting()
.build();
}
};
CassandraInputFormat<Tuple4<UUID,String,String,String>> cassandraInputFormat =
new CassandraInputFormat<> ("select * from employee_details", cb);
//While Application is start up , Read data from table and send as stream
inputRecords = getDBData(env,cassandraInputFormat);
// If any data comes from kafka means, again i want to get data from table.
//How to i trigger getDBData() method from inside this stream.
//The below code is not working
DataStream<String> inputRecords1= env.addSource(kafkaConsumer)
.map(new MapFunction<String,String>() {
private static final long serialVersionUID = 1L;
@Override
public String map(String value) throws Exception {
inputRecords = getDBData(env,cassandraInputFormat);
return "OK";
}
});
//This is not printed , when i call getDBData() stream from inside the kafka stream.
inputRecords1.print();
DataStream<Employee> empDataStream = inputRecords.map(new MapFunction<Tuple4<UUID,String,String,String>, Tuple2<String,Employee>>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Employee> map(Tuple4<UUID,String,String,String> value) throws Exception {
Employee emp = new Employee();
try{
emp.setEmpid(value.f0);
emp.setFirstname(value.f1);
emp.setLastname(value.f2);
emp.setAddress(value.f3);
}
catch(Exception e){
}
return new Tuple2<>(emp.getEmpid().toString(), emp);
}
}).keyBy(0).map(new MapFunction<Tuple2<String,Employee>,Employee>() {
private static final long serialVersionUID = 1L;
@Override
public Employee map(Tuple2<String, Employee> value)
throws Exception {
return value.f1;
}
});
empDataStream.print();
env.execute();
}
private static DataStream<Tuple4<UUID,String,String,String>> getDBData(StreamExecutionEnvironment env,
CassandraInputFormat<Tuple4<UUID,String,String,String>> cassandraInputFormat){
DataStream<Tuple4<UUID,String,String,String>> inputRecords = env
.createInput
(cassandraInputFormat
,TupleTypeInfo.of(new TypeHint<Tuple4<UUID,String,String,String>>() {}));
return inputRecords;
}
}
【问题讨论】:
-
你能详细说明一下具体的用例吗?
-
我需要从 casandra 获取数据取决于从另一个流接收到的输入。 IE。我将定期从 kafka 主题(Stream)接收 emp 编号,并且取决于 empno Flink 作业需要从另一个选项卡获取历史数据需要从 casandra 表中获取数据,如消息中所示。在这种情况下,我们需要调用 casandra 从 kafka(输入流)获取流。
-
但是,您是在获取数据后以任何方式加入获取的数据还是将其保存在某个地方?
-
没有。我没有加入。因为两种数据类型不同。我需要第一个 kafka 流数据作为下一个流的过滤器,而不是将其存储在任何地方。
-
有很多方法可以解决这个问题。 youtube.com/watch?v=cJS18iKLUIY是对整体话题的优秀介绍,还有代码示例here。
标签: java apache-kafka cassandra stream apache-flink