【问题标题】:Flink Statefun connections to Flink Table APIFlink Statefun 连接到 Flink Table API
【发布时间】:2020-07-24 09:51:49
【问题描述】:

我们有兴趣从new Stateful Functions ???? 连接到常规 Flink Streaming 应用程序,最好使用 Table API。思路是从 Statefun 查阅在 Flink 中注册的表,这可能吗,正确的做法是什么?

到目前为止,我的想法是在一些主函数中初始化我的表流并注册一个有状态的函数提供程序以连接到表:

@AutoService(StatefulFunctionModule.class)
public class Module implements StatefulFunctionModule {

  @Override
  public void configure(Map<String, String> globalConfiguration, Binder binder) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // ingest a DataStream from an external source
    DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);

    // SQL query with an inlined (unregistered) table
    Table myTable = tableEnv.fromDataStream(ds, "user, product, amount");
    tableEnv.createTemporaryView("my_table", myTable);

    TableFunctionProvider tableProvider = new TableFunctionProvider();
    binder.bindFunctionProvider(FnEnrichmentCallback.TYPE, tableProvider);

    //continue registering my other messages
    //...
  }
}

有状态的函数提供者会返回一个FnTableQuery,只要它收到一条消息就会简单地查询表:

public class TableFunctionProvider implements StatefulFunctionProvider {

  @Override
  public StatefulFunction functionOfType(FunctionType type) {
    return new FnTableQuery();
  }
}

查询函数对象将作为每个已建立进程的参与者进行操作,并在调用时简单地查询表:

public class FnTableQuery extends StatefulMatchFunction {

  static final FunctionType TYPE = new FunctionType(Identifiers.NAMESPACE, "my-table");

  private Table myTable;

  @Override
  public void configure(MatchBinder binder) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    myTable = tableEnv.from("my_table");

    binder
        .otherwise(this::catchAll);
  }

  private void catchAll(Context context, Object message) {
    context.send(FnEnrichmentCallback.TYPE, myTable.select("max(amount)").toString(), message);
  }
}

如果这种方法没有意义,我提前道歉,因为我不知道:

  1. Flink 和 Statefun 应用程序可以在源/接收器领域之外协同工作,特别是因为此特定功能是无状态的并且表是有状态的

  2. 我们可以这样查询 Flink 表,我只是将它们作为中间对象查询,发送到 sink 或 datastream

  3. 在 Module.configure 中初始化东西是有意义的,如果有状态的函数提供者和它的匹配函数都是called once per parallel worker

【问题讨论】:

    标签: apache-flink flink-statefun


    【解决方案1】:

    Apache Flink 社区确实考虑在未来支持 Flink DataStreams 作为 StateFun 入口/出口。

    这意味着您可以获取使用 Flink Table API / Flink CEP / DataStream API 等的结果流,并使用流中的事件调用函数。

    【讨论】:

      猜你喜欢
      • 2022-12-15
      • 1970-01-01
      • 1970-01-01
      • 2022-10-20
      • 2020-11-03
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多