【问题标题】:Does Flink SQL support Java Map types?Flink SQL 是否支持 Java Map 类型?
【发布时间】:2020-02-09 02:53:13
【问题描述】:

我正在尝试使用 Flink 的 SQL API 从地图中访问密钥。它失败并出现错误 Exception in thread "main" org.apache.flink.table.api.TableException: Type is not supported: ANY 请告诉我如何解决它。 这是我的活动课

     public class EventHolder {

        private Map<String,String> event;

        public Map<String, String> getEvent() {
            return event;
        }

        public void setEvent(Map<String, String> event) {
            this.event = event;
        }
    }

这里是提交 flink 作业的主类

public class MapTableSource {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<EventHolder> mapEventStream = env.fromCollection(getMaps());

        // register a table and use SQL
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
        tableEnv.registerDataStream("mapEvent", mapEventStream); 
        //tableEnv.registerFunction("orderSizeType", new OrderSizeType());

        Table alerts = tableEnv.sql(
                "select event['key'] from mapEvent ");

        DataStream<String> alertStream = tableEnv.toAppendStream(alerts, String.class);

        alertStream.filter(new FilterFunction<String>() {
            private static final long serialVersionUID = -2438621539037257735L;

            @Override
            public boolean filter(String value) throws Exception {
                System.out.println("Key value is:"+value);
                return value!=null;
            }
        });

        env.execute("map-tablsource-job");
    }

    private static List<EventHolder> getMaps(){
        List<EventHolder> list = new ArrayList<>();
        for(int i=0;i<5;i++){
            EventHolder holder = new EventHolder();
            Map<String,String> map = new HashMap<>();
            map.put("key", "value");
            holder.setEvent(map);
            list.add(holder);
        }
        return list;
    }
}

当我运行它时,我得到了异常

Exception in thread "main" org.apache.flink.table.api.TableException: Type is not supported: ANY
at org.apache.flink.table.api.TableException$.apply(exceptions.scala:53)
at org.apache.flink.table.calcite.FlinkTypeFactory$.toTypeInfo(FlinkTypeFactory.scala:341)
at org.apache.flink.table.plan.logical.LogicalRelNode$$anonfun$12.apply(operators.scala:530)
at org.apache.flink.table.plan.logical.LogicalRelNode$$anonfun$12.apply(operators.scala:529)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.Iterator$class.foreach(Iterator.scala:742)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.plan.logical.LogicalRelNode.<init>(operators.scala:529)
at org.apache.flink.table.api.TableEnvironment.sql(TableEnvironment.scala:503)
at com.c.p.flink.MapTableSource.main(MapTableSource.java:25)

我使用的是 flink 1.3.1

【问题讨论】:

    标签: apache-flink flink-streaming flink-sql


    【解决方案1】:

    我认为问题在于fromCollection。由于 Java 限制(即类型擦除),Flink 无法提取所需的类型信息。因此,您的地图被视为具有 SQL ANY 类型的黑盒。您可以使用tableEnv.scan("mapEvent").printSchema() 验证表的类型。您可以在fromCollection 中指定类型信息和Types.MAP(Types.STRING, Types.STRING)

    【讨论】:

    • 感谢您的更新。在实际程序中,我没有使用 fromCollection。这只是在示例程序中。实际上,我正在使用 FlinkKinesisConsumer 来获取事件。在反序列化器中,我正在执行此 Map map = gson.fromJson(new String(message), new TypeToken>(){}.getType()); EventHolder eventHolder = new EventHolder(); eventHolder.setEvent(地图);返回事件持有者;
    • 我正在像这样注册事件流。 DataStream orderEventStream = env.addSource(new FlinkKinesisConsumer( params.get("events.stream"), new EventDeserializationSchema(), consumerConfig));即使这样,我也会遇到同样的错误。所以我认为问题不仅仅在于 fromCollection
    • 如果你打印new TypeHint&lt;Map&lt;String, String&gt;&gt;(){}.getType(),你会看到即使这是GenericTypeInfo。我同意这令人困惑,但最初不支持的地图。由于向后兼容性,我不知道我们是否可以改变这种行为。
    • 我为这个问题开了一个issue:issues.apache.org/jira/browse/FLINK-7425
    【解决方案2】:

    我用以下方法解决了类似的问题:

    //Should probably make MapVal more generic, but works for this example
    public class MapVal extends ScalarFunction {
        public String eval(Map<String, String> obj, String key) {
            return obj.get(key);
        }
    }
    
    public class Car {
        private String make;
        private String model;
        private int year;
        private Map<String, String> attributes;
        //getters/setters...
    }
    
    //After registering Stream and TableEnv etc
    
    tableEnv.registerFunction("mapval", new MapVal());
    
    Table cars = tableEnv
                    .scan("Cars")
                    .select("make, model, year, attributes.mapval('name')");
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-10-20
      • 2023-03-27
      • 2019-04-24
      相关资源
      最近更新 更多