【问题标题】:Flink Table API not able to convert DataSet to DataStreamFlink Table API 无法将 DataSet 转换为 DataStream
【发布时间】:2020-08-28 07:49:00
【问题描述】:

我正在使用使用 Java 的 Flink Table API,我想将 DataSet 转换为 DataStream ...。以下是我的代码:

TableEnvironment tableEnvironment=new TableEnvironment();
Table tab1=table.where("related_value < 2014").select("related_value,ref_id");
DataSet<MyClass>ds2=tableEnvironment.toDataSet(tab1, MyClass.class);
DataStream<MyClass> d=tableEnvironment.toDataStream(tab1, MyClass.class);

但是当我尝试执行这个程序时,它会抛出以下异常:

org.apache.flink.api.table.ExpressionException:JavaStreamingTranslator 的根无效:Root(ArraySeq((related_value,Double), (ref_id,String)))。您是否尝试过将基于 DataSet 的 Table 转换为 DataStream 或反之亦然? 我想知道我们如何使用 Flink Table API 将 DataSet 转换为 DataStream ?

我想知道的另一件事是,对于模式匹配,有可用的 Flink CEP 库。但是使用 Flink Table API 进行模式匹配是否可行?

【问题讨论】:

  • 请不要在一个 Stackoverflow 问题中提出多个问题。改为为您的模式匹配问题打开另一个线程。

标签: java apache-flink flink-streaming flink-cep


【解决方案1】:

Flink 的 Table API 并非旨在将 DataSet 转换为 DataStream,反之亦然。 Table API 无法做到这一点,目前 Flink 也没有其他方法可以做到这一点。

统一DataStreamDataSet API(将批处理作为流的特殊情况,即作为有界流处理)是 Flink 的长期路线图。

【讨论】:

  • 好吧.....只是想知道 Flink 的 Table API 是否可以用于 Pattern Matching 或 CEP ??
  • 请为此打开一个新问题。 Stackoverflow 不是讨论的地方,它是一个问答服务。如果在同一主题下回答了多个问题,其他人将找不到答案。
【解决方案2】:

使用TableEnvironment时无法转换为DataStream API,您必须创建一个StreamTableEnvironment才能从表转换为DataStream,如下所示:

final EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
final StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(configuration, fsSettings);
DataStream<String> finalRes = fsTableEnv.toAppendStream(tableNameHere, MyClass.class);

希望能以某种方式帮助您。

亲切的问候!

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-09-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多