【问题标题】:Apache Flink : How to Call One Stream from Another StreamApache Flink:如何从另一个流调用一个流
【发布时间】:2020-10-09 18:06:13
【问题描述】:

我的场景是,我想根据另一个流输入调用一个流。两种流类型不同。以下是我的示例代码。我想在收到来自 Kafka 流的消息时触发一个流。

当应用程序启动时,我可以从数据库中读取数据。然后我又想根据一些 kafka 消息从 DB 获取数据。当我在流中收到 kafka 消息时,我想再次从 DB 获取数据。这是我的实际用例。

如何做到这一点?有可能吗?



public class DataStreamCassandraExample implements Serializable{

   private static final long serialVersionUID = 1L;

   static Logger LOG = LoggerFactory.getLogger(DataStreamCassandraExample.class);

   private transient static StreamExecutionEnvironment env;
    static DataStream<Tuple4<UUID,String,String,String>> inputRecords;

        public static void main(String[] args) throws Exception {
             env = StreamExecutionEnvironment.getExecutionEnvironment();

            ParameterTool argParameters = ParameterTool.fromArgs(args);
            env.getConfig().setGlobalJobParameters(argParameters);

               Properties kafkaProps = new Properties();
               kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
               kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group1");

               FlinkKafkaConsumer<String> kafkaConsumer =  new FlinkKafkaConsumer<>("testtopic", new SimpleStringSchema(), kafkaProps);


               ClusterBuilder cb = new ClusterBuilder() {

               private static final long serialVersionUID = 1L;

                   @Override
                   public Cluster buildCluster(Cluster.Builder builder) {
                       return builder.addContactPoint("127.0.0.1")
                               .withPort(9042)
                               .withoutJMXReporting()
                               .build();
                   }
               };

               CassandraInputFormat<Tuple4<UUID,String,String,String>> cassandraInputFormat =
                       new CassandraInputFormat<> ("select * from employee_details", cb);

               //While Application is start up , Read data from table and send as stream
               inputRecords = getDBData(env,cassandraInputFormat);

               // If any data comes from kafka means, again i want to get data from table.
               //How to i trigger getDBData() method from inside this stream.
               //The below code is not working
               DataStream<String> inputRecords1= env.addSource(kafkaConsumer)
                           .map(new MapFunction<String,String>() {
                               private static final long serialVersionUID = 1L;

                               @Override
                               public String map(String value) throws Exception {
                                   inputRecords =  getDBData(env,cassandraInputFormat);
                                   return "OK";
                               }
                           });

               //This is not printed , when i call getDBData() stream from inside the kafka stream.
               inputRecords1.print();


                DataStream<Employee> empDataStream = inputRecords.map(new MapFunction<Tuple4<UUID,String,String,String>, Tuple2<String,Employee>>() {
                       private static final long serialVersionUID = 1L;

                       @Override
                       public Tuple2<String, Employee> map(Tuple4<UUID,String,String,String> value) throws Exception {
                           Employee emp = new Employee();
                           try{
                           emp.setEmpid(value.f0);
                           emp.setFirstname(value.f1);
                           emp.setLastname(value.f2);
                           emp.setAddress(value.f3);

                           }
                           catch(Exception e){
                           }

                           return new Tuple2<>(emp.getEmpid().toString(), emp);
                       }
                   }).keyBy(0).map(new MapFunction<Tuple2<String,Employee>,Employee>() {

                       private static final long serialVersionUID = 1L;

                       @Override
                       public Employee map(Tuple2<String, Employee> value)
                               throws Exception {
                           return value.f1;
                       }   


                   });

             empDataStream.print();

                env.execute();
        }


        private static  DataStream<Tuple4<UUID,String,String,String>> getDBData(StreamExecutionEnvironment env,
                                                                   CassandraInputFormat<Tuple4<UUID,String,String,String>> cassandraInputFormat){

            DataStream<Tuple4<UUID,String,String,String>> inputRecords = env
                    .createInput
                    (cassandraInputFormat   
                    ,TupleTypeInfo.of(new TypeHint<Tuple4<UUID,String,String,String>>() {}));
           return inputRecords;

        }          
}



【问题讨论】:

  • 你能详细说明一下具体的用例吗?
  • 我需要从 casandra 获取数据取决于从另一个流接收到的输入。 IE。我将定期从 kafka 主题(Stream)接收 emp 编号,并且取决于 empno Flink 作业需要从另一个选项卡获取历史数据需要从 casandra 表中获取数据,如消息中所示。在这种情况下,我们需要调用 casandra 从 kafka(输入流)获取流。
  • 但是,您是在获取数据后以任何方式加入获取的数据还是将其保存在某个地方?
  • 没有。我没有加入。因为两种数据类型不同。我需要第一个 kafka 流数据作为下一个流的过滤器,而不是将其存储在任何地方。
  • 有很多方法可以解决这个问题。 youtube.com/watch?v=cJS18iKLUIY是对整体话题的优秀介绍,还有代码示例here

标签: java apache-kafka cassandra stream apache-flink


【解决方案1】:

这将是一个非常冗长的答案。

要正确使用 Flink 作为开发者,你需要了解它的基本概念。我建议您从架构概述 (https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/flink-architecture.html) 开始,它包含了您从编程开始进入 Flink 世界所需了解的所有内容。

现在,查看您的代码,由于 Flink 读取它的方式,它不应该按照您的预期进行。您需要了解 Flink 在执行您的代码时至少有两个大步骤:首先它构建一个执行图,该图仅描述它需要做什么。这发生在作业经理级别。第二个大步骤是要求一个或多个工人执行图表。这两个步骤是连续的,您对图表描述所做的任何事情都必须在作业管理器级别完成,而不是在您的操作内部。

在你的情况下,图表有:

  • Kafak 来源。
  • 将在工作人员级别调用 getDBData() 的映射(不好,因为 getDBData() 每次调用时都会通过添加一个新输入来更改图形)。

inputRecords = getDBData(env,cassandraInputFormat); 行将创建图的孤立分支。并且DataStream&lt;Employee&gt; empDataStream = inputRecords.map... 行会将map-&gt;keyBy-&gt;map 的一个分支附加到该孤立分支。这将构建图表的一部分,该部分将从 Cassandra 读取所有员工记录并应用 map-&gt;keyBy-&gt;map 转换。这不会以任何方式与 Kafka 源链接。

现在让我们回到您的需求。我了解当员工的 id 来自 Kafka 并进行一些操作时,您需要为他/她获取数据。

处理此问题的最简洁的方法称为侧输入。这是您在构建图表时声明的数据输入,作业管理器处理数据的读取并将其传输给工作人员。坏消息是 Side Inputs 还不能用于 Flink 中的流式作业(https://issues.apache.org/jira/browse/FLINK-2491 - 这个错误会导致流式作业无法创建检查点,因为侧输入很快完成,这会使作业处于奇怪的状态)。

话虽如此,您还有另外三个选择。正确的选项取决于您的员工 cassandra 表的大小。

第二个选项是将所有员工加载到静态最终变量employees 并在您的地图函数中使用它。这种方法的不利之处在于,作业管理器会将此变量的序列化副本发送给所有工作人员,这可能会阻塞您的网络,也可能会使 RAM 过载。如果 table 的大小很小并且将来不应该变大,那么在 Side Inputs 用于流式作业之前,这可能是一个可以接受的解决方法。如果表的大小很大或者应该在未来发展,那么考虑第三种选择。

第三个选项是对第二个选项的改进。它使用 Flink 的广播变量(参见 https://flink.apache.org/2019/06/26/broadcast-state.htmlhttps://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html)。简短的故事:它和以前一样,但更好的转移管理。 Flink 会找到存储变量并将其发送给工作人员的最佳方式。这种方法虽然正确实施有点复杂。

通常情况下不建议使用最后一个选项。它只是在您的地图操作中调用 Cassandra。这不是一个好的做法,因为它会增加所有地图执行的重复延迟(调用的次数与通过 Kafka 的项目一样多)。调用意味着创建连接,即带有查询的实际请求并等待 Cassandra 回复并释放连接。对于图表中的一个步骤,这可能需要大量工作。当您确实找不到任何替代方案时,这是一个可以考虑的解决方案。

对于您的情况,我会建议第三种选择。我想员工表应该不会很大,使用广播变量是一个不错的选择。

【讨论】:

    猜你喜欢
    • 2018-06-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-06-22
    相关资源
    最近更新 更多