【问题标题】:Apache Flink Streaming type mismatch in flatMap functionflatMap 函数中的 Apache Flink Streaming 类型不匹配
【发布时间】:2015-11-15 14:43:33
【问题描述】:

尝试在 scala 2.10.4 中使用 0.10.0 flink 版本的流 api。在尝试编译第一个版本时:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.api.windowing.time._

object Main {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("localhost", 9999)

    val words : DataStream[String] = text.flatMap[String](
      new Function[String,TraversableOnce[String]] { 
        def apply(line:String):TraversableOnce[String] = line.split(" ")
      })

    env.execute("Window Stream wordcount")
  }
}

我收到编译时错误:

[error]  found   : String => TraversableOnce[String]
[error]  required: org.apache.flink.api.common.functions.FlatMapFunction[String,String]
[error]       new Function[String,TraversableOnce[String]] { def apply(line:String):TraversableOnce[String] = line.split(" ")})
[error]       ^

在我包含在项目中的 DataStream.class 的反编译版本中,有一些函数可以接受这种类型(最后一个):

public <R> DataStream<R> flatMap(FlatMapFunction<T, R> flatMapper, TypeInformation<R> evidence$12, ClassTag<R> evidence$13) {
        if (flatMapper == null) {
            throw new NullPointerException("FlatMap function must not be null.");
        }
        TypeInformation outType = (TypeInformation)Predef..MODULE$.implicitly(evidence$12);
        return package..MODULE$.javaToScalaStream((org.apache.flink.streaming.api.datastream.DataStream)this.javaStream.flatMap(flatMapper).returns(outType));
    }

    public <R> DataStream<R> flatMap(Function2<T, Collector<R>, BoxedUnit> fun, TypeInformation<R> evidence$14, ClassTag<R> evidence$15) {
        if (fun == null) {
            throw new NullPointerException("FlatMap function must not be null.");
        }
        Function2<T, Collector<R>, BoxedUnit> cleanFun = this.clean((F)fun);
        .anon flatMapper = new /* Unavailable Anonymous Inner Class!! */;
        return this.flatMap((FlatMapFunction<T, R>)flatMapper, evidence$14, evidence$15);
    }

    public <R> DataStream<R> flatMap(Function1<T, TraversableOnce<R>> fun, TypeInformation<R> evidence$16, ClassTag<R> evidence$17) {
        if (fun == null) {
            throw new NullPointerException("FlatMap function must not be null.");
        }
        Function1<T, TraversableOnce<R>> cleanFun = this.clean((F)fun);
        .anon flatMapper = new /* Unavailable Anonymous Inner Class!! */;
        return this.flatMap((FlatMapFunction<T, R>)flatMapper, evidence$16, evidence$17);
    }

这里可能有什么问题?如果您能提供一些见解,我将不胜感激。 提前谢谢你。

【问题讨论】:

    标签: scala apache-flink flink-streaming


    【解决方案1】:

    问题是你导入的是Flink的JavaStreamExecutionEnvironmentorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment

    您必须像这样使用StreamExecutionEnvironment 的Scala 变体:import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment。 有了这个改变,一切都在成功构建!

    原答案: 问题是您将Function 传递给flatMap() 方法。但是 flatMap() 需要 FlatMapFunction

     val words : DataStream[String] = text.flatMap[String](
          new FlatMapFunction[String,String] {
            override def flatMap(t: String, collector: Collector[String]): Unit = t.split(" ")
          })
    

    【讨论】:

    • 是的。这行编译。我在上面的代码中尝试的是满足 "flatMap(Function1> fun, ..." 类型。我在这里查看docs.scala-lang.org/tutorials/tour/… 并看到 (x: Int) => x + 1 是“new Function1[Int, Int] ...”的简写。
    • 在尝试之前,我尝试为 Scala 编译 ci.apache.org/projects/flink/flink-docs-release-0.10/apis/…,这导致匿名函数的类型参数丢失。
    • 要使 flink 文档中的示例正常工作,您需要添加以下导入语句 import org.apache.flink.api.scala._
    • 我在查看您在stackoverflow.com/questions/29540121/… 的答案后添加了此导入,尽管没有效果。错误是“缺少参数类型 [error] val words = text.flatMap{ x => x.split(" ")}"。
    • 抱歉这个愚蠢的问题,但您要运行哪个代码?你是从你的 IDE 中启动 Flink 吗?您在 IDE 中配置了哪个 scala 库版本?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-05-14
    • 1970-01-01
    • 2022-08-19
    • 1970-01-01
    • 2018-04-27
    • 2022-08-18
    相关资源
    最近更新 更多