【Question title】: Stuck with type hints in clojure for generic class
【Posted】: 2015-11-14 08:18:52
【Question】:

I'm trying to run a small Apache Flink example in Clojure, but I'm stuck because of type hints in Clojure and some odd quirks in Flink.

Here is my code:

(ns pipeline.core
 (:import
 (org.apache.flink.api.java ExecutionEnvironment)
 (org.apache.flink.api.common.functions FlatMapFunction)
 (org.apache.flink.api.java.tuple Tuple2)
 (org.apache.flink.util Collector)
 (java.lang String)))

(def flink-env (ExecutionEnvironment/createLocalEnvironment))

(def dataset (.fromElements flink-env (to-array ["please test me"])))

(defn tokenizer [] (reify FlatMapFunction
                 ( flatMap [this value collector] 
                   (println value))))

(.flatMap dataset (tokenizer))

If I don't provide type hints, I get an error from the Flink API:

Caused by: java.lang.IllegalArgumentException: The types of the interface org.apache.flink.api.common.functions.FlatMapFunction could not be inferred. Support for synthetic interfaces, lambdas, and generic types is limited at this point.
at org.apache.flink.api.java.typeutils.TypeExtractor.getParameterType(TypeExtractor.java:662)
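What "could not be inferred" means here: a minimal Java sketch, assuming (as far as I know) that a Clojure `reify`/`deftype` implements the *raw* interface, without the generic signature that `javac` records when a Java class writes `implements FlatMapFunction<String, Integer>`. `FlatMapLike`, `TypedTokenizer` and `RawTokenizer` are hypothetical stand-ins, not Flink types; the sketch only shows what type information reflection can see in each case.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.List;

public class RawInterfaceDemo {

    // Hypothetical stand-in for Flink's FlatMapFunction<T, O>; not the real Flink interface.
    interface FlatMapLike<T, O> {
        void flatMap(T value, List<O> out);
    }

    // A Java class that names the type arguments: javac records <String, Integer>.
    static class TypedTokenizer implements FlatMapLike<String, Integer> {
        @Override
        public void flatMap(String value, List<Integer> out) {
            out.add(1);
        }
    }

    // Assumed to resemble a Clojure reify/deftype: the raw interface, no type arguments.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static class RawTokenizer implements FlatMapLike {
        @Override
        public void flatMap(Object value, List out) {
            out.add(1);
        }
    }

    // Type arguments of the first implemented interface, or an empty array if it is raw.
    static Type[] interfaceTypeArguments(Class<?> clazz) {
        Type iface = clazz.getGenericInterfaces()[0];
        if (iface instanceof ParameterizedType) {
            return ((ParameterizedType) iface).getActualTypeArguments();
        }
        return new Type[0];
    }

    public static void main(String[] args) {
        // prints "[class java.lang.String, class java.lang.Integer]"
        System.out.println(Arrays.toString(interfaceTypeArguments(TypedTokenizer.class)));
        // prints "[]": nothing for a type extractor to work with
        System.out.println(Arrays.toString(interfaceTypeArguments(RawTokenizer.class)));
    }
}
```

In the raw case there is simply no `<String, Integer>` anywhere in the class metadata, which is consistent with Flink asking for the type to be supplied some other way.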

If I provide type hints:

(defn tokenizer [] (reify FlatMapFunction
                 ( ^void flatMap [this ^String value ^Collector collector] 
                   (println value))))

I get an error from the Clojure compiler:

Caused by: java.lang.IllegalArgumentException: Can't find matching method: flatMap, leave off hints for auto match.
at clojure.lang.Compiler$NewInstanceMethod.parse(Compiler.java:8065) 
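A likely reason the hinted version does not compile: Clojure matches a `reify` method against the interface method's *erased* JVM signature, and erasure turns `flatMap(T value, Collector<O> out)` into `flatMap(Object, Collector)`, so a `^String` hint on `value` has nothing to match. The Java sketch below prints those erased parameter types using hypothetical stand-ins `FlatMapLike` and `CollectorLike` (not Flink's types); note that there is no runtime class for `Collector<Tuple2<String, Integer>>` that a hint could name.

```java
import java.lang.reflect.Method;
import java.util.Arrays;

public class ErasedSignatureDemo {

    // Hypothetical stand-ins for Flink's Collector<T> and FlatMapFunction<T, O>.
    interface CollectorLike<T> {
        void collect(T record);
    }

    interface FlatMapLike<T, O> {
        void flatMap(T value, CollectorLike<O> out) throws Exception;
    }

    // Parameter classes of flatMap after erasure: what a JVM-level implementer must match.
    static Class<?>[] erasedFlatMapParams() {
        for (Method m : FlatMapLike.class.getMethods()) {
            if (m.getName().equals("flatMap")) {
                return m.getParameterTypes();
            }
        }
        throw new IllegalStateException("flatMap not found");
    }

    public static void main(String[] args) {
        Class<?>[] params = erasedFlatMapParams();
        // T became Object and CollectorLike<O> became plain CollectorLike.
        System.out.println(Arrays.toString(params));
        System.out.println(params[0] == Object.class); // prints "true"
    }
}
```

Under that assumption, hints that match the erasure (`^Object value ^Collector collector`) should presumably be accepted by the compiler, but they carry no generic information that Flink could use.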

Is there a way to add type hints for generic classes in Clojure? It should be something like this:

(defn tokenizer [] (reify FlatMapFunction
                 ( ^void flatMap [this ^String value ^Collector<Tuple2<String, Integer>> collector] 
                   (println value))))

But this doesn't work. Any ideas?

The lein config looks like this:

(defproject pipeline "0.1.0-SNAPSHOT"
 :description "FIXME: write description"
 :url "http://example.com/FIXME"
 :license {:name "Eclipse Public License"
        :url "http://www.eclipse.org/legal/epl-v10.html"}
 :dependencies [[org.clojure/clojure "1.7.0"]               
             [org.apache.flink/flink-java "0.9.0"]              
             ]
  :aot :all)

【Question comments】:

    Tags: generics clojure type-hinting clojure-java-interop apache-flink


    【Solution 1】:

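    Flink's reflection-based type extraction does not work for the class Clojure generates, so you need to specify the return type manually with Flink's `returns` method: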

    (.returns (.flatMap dataset (tokenizer)) String)
    

    Also, because Flink cannot handle anonymous classes, you need to define the tokenizer with `deftype` and instantiate a new object:

    (deftype tokenizer [] FlatMapFunction
                          (flatMap [this value collector] 
                            (println value)))
    
    (.flatMap dataset (tokenizer.))
    

    Here is a complete word-count example that can be packaged into a jar and executed.

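    Note the explicit type information and casts. In the tokenizer you have to emit `(int 1)`; otherwise `Long` would be the second type of the `Tuple2`. Furthermore, we use a String to declare the output type of the tokenizer (a class type is not enough, because the generic type parameters must be specified as well). Finally, we pass `(int-array [0])` so that the `groupBy(int...)` overload is selected; without it, the call to `groupBy` is ambiguous to the Clojure compiler.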

    (ns org.apache.flink.flink-clojure.WordCount
     (:import
     (org.apache.flink.api.common.functions FlatMapFunction)
     (org.apache.flink.api.java DataSet)
     (org.apache.flink.api.java ExecutionEnvironment)
     (org.apache.flink.api.java.tuple Tuple2)
     (org.apache.flink.util Collector)
     (java.lang String))
     (:require [clojure.string :as str])
     (:gen-class))
    
    (def flink-env (ExecutionEnvironment/createLocalEnvironment))
    
    (def text (.fromElements flink-env (to-array ["please test me and me too"])))
    
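    ;; deftype (not reify) gives a named class; Flink cannot handle anonymous classes.
    (deftype tokenizer [] FlatMapFunction
                          (flatMap [this value collector]
                            ;; emit (word, 1); (int 1) makes the count an Integer, not a Long
                            (doseq [v (str/split value #"\s")]
                              (.collect collector (Tuple2. v (int 1))))))
    
    ;; Flink cannot infer the generic output type, so declare it explicitly.
    (def tokens (.returns (.flatMap text (tokenizer.)) "Tuple2<String,Integer>"))
    
    ;; (int-array [0]) selects the groupBy(int...) overload; then sum field 1.
    (def counts (.sum (.groupBy tokens (int-array [0])) 1))
    
    (defn -main []
      (.print counts))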
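On the `(int-array [0])` argument in the example above: a Java varargs parameter such as `int... fields` compiles to an `int[]`, so a dynamic caller like Clojure has to pass an actual `int[]`, and the array type is what distinguishes the `int...` overload from a `String...` one. A minimal Java sketch with a hypothetical `VarargsDemo.groupBy`, assuming overloads shaped like `DataSet.groupBy(int...)` and `groupBy(String...)`:

```java
import java.lang.reflect.Method;

public class VarargsDemo {

    // Hypothetical overloads shaped like DataSet.groupBy(int...) and groupBy(String...).
    public static String groupBy(int... fields) {
        return "int fields: " + fields.length;
    }

    public static String groupBy(String... fields) {
        return "string fields: " + fields.length;
    }

    // Reflective call, roughly what a dynamic caller does: select by array type, pass an array.
    static String callWithIntArray() {
        try {
            Method m = VarargsDemo.class.getMethod("groupBy", int[].class);
            return (String) m.invoke(null, (Object) new int[] {0});
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    // The int... overload is just a method taking int[], flagged as varargs.
    static boolean intOverloadIsVarArgs() {
        try {
            return VarargsDemo.class.getMethod("groupBy", int[].class).isVarArgs();
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(groupBy(0));             // javac builds the int[] for you
        System.out.println(callWithIntArray());     // prints "int fields: 1"
        System.out.println(intOverloadIsVarArgs()); // prints "true"
    }
}
```

In Java source, `javac` hides the array; from Clojure the array has to be built explicitly, which is what `(int-array [0])` does.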
    

    【Comments】:

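    • Hi, unfortunately this didn't help; I still get `The types of interface org.apache.flink.api.common.functions.FlatMapFunction could not be inferred.` But thanks for your input, I'll dig further into SingleInputUdfOperator. BTW, it should be `(.returns (.flatMap dataset (tokenizer)) String)`
    • So, I tried it directly in Java. I modified the WordCount example so that the Tokenizer only implements `FlatMapFunction` (without generics). Even with `.returns("Tuple2")` I get the same exception. So maybe I need to write an additional Java wrapper that handles the generic types for my purpose.
    • Hi, the bug is now fixed in the current master. I just updated my answer (including a complete word-count example).
    • Using string type hints was deprecated in Flink 1.1. @MatthiasJ.Sax, do you have any idea how to use the alternative `TypeHint` method signature?
    • I guess not at all... AFAIK Clojure cannot handle Java generics. You need to define a custom class `MyType extends Tuple2<String,Integer>` and use it instead (you can omit `returns` entirely, because for a custom `MyType` Flink will be able to determine the return type automatically).
    【Solution 2】: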

    As a follow-up to this comment on "Stuck with type hints in clojure for generic class":

    With the latest Flink version (tested on 1.6.1), you need to define a custom class; otherwise you get an error like this:

    Exception in thread "main" java.lang.IllegalArgumentException: No matching method found: returns for class org.apache.flink.api.java.operators.FlatMapOperator, compiling:(WordCount.clj:69:13)
    

    The custom class:

    package org.apache.flink.java;
    
    import org.apache.flink.api.java.tuple.Tuple2;
    
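    // No body needed: declaring the subclass records the type arguments
    // <String, Integer> in the class file, where Flink's type extraction can read them.
    public class WordCountTuple extends Tuple2<String, Integer> {
    }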
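Why the custom class helps, as I understand it: a non-generic subclass such as `WordCountTuple extends Tuple2<String, Integer>` has its type arguments written into its class file, where reflection can read them back, even though a plain `Tuple2` instance carries none at runtime. A minimal Java sketch with hypothetical stand-ins `Pair` (for `Tuple2`) and `WordCountPair` (for `WordCountTuple`):

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;

public class SubclassTypeDemo {

    // Hypothetical stand-in for Flink's Tuple2<T0, T1>.
    public static class Pair<A, B> {
        public A f0;
        public B f1;
    }

    // Mirrors WordCountTuple: a non-generic subclass that fixes the type arguments.
    public static class WordCountPair extends Pair<String, Integer> {
    }

    // Type arguments recorded in the class's generic superclass, or empty if there are none.
    static Type[] superclassTypeArguments(Class<?> clazz) {
        Type sup = clazz.getGenericSuperclass();
        if (sup instanceof ParameterizedType) {
            return ((ParameterizedType) sup).getActualTypeArguments();
        }
        return new Type[0];
    }

    public static void main(String[] args) {
        // prints "[]": Pair's superclass is plain Object
        System.out.println(Arrays.toString(superclassTypeArguments(Pair.class)));
        // prints "[class java.lang.String, class java.lang.Integer]"
        System.out.println(Arrays.toString(superclassTypeArguments(WordCountPair.class)));
    }
}
```

Because the information lives in the Java class file, it does not matter that the Clojure side only ever refers to the raw class name.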
    

    The Clojure code:

    (ns org.apache.flink.clojure.WordCount
      (:import
       (org.apache.flink.api.common.functions FlatMapFunction)
       (org.apache.flink.api.java DataSet)
       (org.apache.flink.api.java ExecutionEnvironment)
       (org.apache.flink.api.java.tuple Tuple2)
       (org.apache.flink.java WordCountTuple)
       (org.apache.flink.util Collector)
       (java.lang String))
      (:require [clojure.string :as str])
      (:gen-class))
    
    (def flink-env (ExecutionEnvironment/getExecutionEnvironment))
    
    (def text (.fromElements flink-env (to-array ["please test me and me too"])))
    
    (deftype tokenizer [] FlatMapFunction
             (flatMap [this value collector]
               (doseq [v (str/split value #"\s")]
                 (.collect collector (Tuple2. v (int 1))))))
    
    (def tokens (.returns (.flatMap text (tokenizer.)) WordCountTuple))
    
    (def counts (.sum (.groupBy tokens (int-array [0])) 1))
    
    (defn -main []
      (.print counts))
    

    A working example fork is here: https://github.com/guillaume/flink-external
