【问题标题】:How to use Flink's KafkaSource in Scala?如何在 Scala 中使用 Flink 的 KafkaSource?
【发布时间】:2015-10-03 20:16:29
【问题描述】:

我正在尝试使用 Flink 的 KafkaSource 运行一个简单的测试程序。我正在使用以下内容:

  • Flink 0.9
  • Scala 2.10.4
  • Kafka 0.8.2.1

按照herehere 的描述,我按照文档测试了KafkaSource(添加了依赖项,将Kafka 连接器flink-connector-kafka 捆绑在插件中)。

下面是我的简单测试程序:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka

object TestKafka {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env
     .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
     .print
  }
}

但是,编译总是报错 KafkaSource not found:

[ERROR] TestKafka.scala:8: error: not found: type KafkaSource
[ERROR]     .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))

我错过了什么?

【问题讨论】:

    标签: scala apache-kafka apache-flink


    【解决方案1】:

    我是 sbt 用户,所以我使用了以下build.sbt

    organization := "pl.japila.kafka"
    scalaVersion := "2.11.7"
    
    libraryDependencies += "org.apache.flink" % "flink-connector-kafka" % "0.9.0" exclude("org.apache.kafka", "kafka_${scala.binary.version}")
    libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.2.1"
    

    这让我可以运行程序:

    import org.apache.flink.streaming.api.environment._
    import org.apache.flink.streaming.connectors.kafka
    import org.apache.flink.streaming.connectors.kafka.api._
    import org.apache.flink.streaming.util.serialization._
    
    object TestKafka {
      def main(args: Array[String]) {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val stream = env
         .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
         .print
      }
    }
    

    输出:

    [kafka-flink]> run
    [info] Running TestKafka
    log4j:WARN No appenders could be found for logger (org.apache.flink.streaming.api.graph.StreamGraph).
    log4j:WARN Please initialize the log4j system properly.
    log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
    [success] Total time: 0 s, completed Jul 15, 2015 9:29:31 AM
    

    【讨论】:

    • 谢谢雅克! sbt 文件很有用,编译通过。但是,当我使用 ./bin/flink run /path/to/project/target/scala-2.11/TestKafka_2.11-1.0.jar 运行程序时,出现以下运行时错误:java.lang.NoClassDefFoundError: org/ apache/flink/streaming/connectors/kafka/api/KafkaSource at TestKafka$.main(TestKafka.scala:10) .......... : java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.api.KafkaSource
    • 好的,我下载了kafka_2.10-0.8.2.1.jar(回到Scala 2.10)和flink-connector-kafka-0.9.0.jar,然后把它们放到$FLINK_HOME/lib/中。现在运行时错误消失了。但这只是一种解决方法。我相信有更好的解决方案。
    【解决方案2】:

    问题似乎是 SBT 和 Maven 配置文件不能很好地协同工作。

    Flink POM 将 Scala 版本(2.10、2.11、...)称为变量,其中一些在构建配置文件中定义。 SBT 未正确评估配置文件,因此包装无法正常工作。

    有一个问题和待处理的拉取请求来解决这个问题:https://issues.apache.org/jira/browse/FLINK-2408

    【讨论】:

      【解决方案3】:
      object FlinkKafkaStreaming {
          def main(args: Array[String]) {
          val env = StreamExecutionEnvironment.getExecutionEnvironment
          val properties = new Properties()
          properties.setProperty("bootstrap.servers", "localhost:9092")
         // only required for Kafka 0.8
         properties.setProperty("zookeeper.connect", "localhost:2181")
         properties.setProperty("group.id", "flink-kafka")
         val stream = env.addSource(new FlinkKafkaConsumer08[String] 
        ("your_topic_name",new SimpleStringSchema(), properties))   
        stream.setParallelism(1).writeAsText("your_local_dir_path")
        env.execute("XDFlinkKafkaStreaming")
        }
      }
      

      为了测试你可以这样做:

      1. 先运行flink demo;
      2. 运行 Kafka_Proudcer;

      【讨论】:

        猜你喜欢
        • 2019-05-20
        • 2022-06-17
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2022-11-11
        • 2019-07-18
        • 2019-10-13
        相关资源
        最近更新 更多