【问题标题】:Flink Kafka Connector runtime errorFlink Kafka 连接器运行时错误
【发布时间】:2017-02-21 21:21:02
【问题描述】:

我正在使用:

  • flink 1.1.2
  • 卡夫卡 2.10-0.10.0.1
  • flink-connector-kafka-0.9.2.10-1.0.0

我正在使用以下非常简单/基本的应用程序

Properties properties = new Properties();                               
properties.setProperty("bootstrap.servers", "localhost:33334");         

properties.setProperty("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RangeAssignor");
properties.setProperty("group.id", "test");                             
String topic = "mytopic";                                                

FlinkKafkaConsumer09<String> fkc =                                      
    new FlinkKafkaConsumer09<String>(topic, new SimpleStringSchema(), properties);

DataStream<String> stream = env.addSource(fkc);    
env.execute()

使用 maven 编译后,当我尝试使用以下命令运行时:

bin/flink run -c  com.mycompany.app.App fkaf/target/fkaf-1.0-SNAPSHOT.jar

我看到以下运行时错误:

Submitting job with JobID: f6e290ec7c28f66d527eaa5286c00f4d. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@127.0.0.1:6123/user/jobmanager#-1679485245]
10/12/2016 15:10:06     Job execution switched to status RUNNING.
10/12/2016 15:10:06     Source: Custom Source(1/1) switched to SCHEDULED 
10/12/2016 15:10:06     Source: Custom Source(1/1) switched to DEPLOYING 
10/12/2016 15:10:06     Map -> Sink: Unnamed(1/1) switched to SCHEDULED 
10/12/2016 15:10:06     Map -> Sink: Unnamed(1/1) switched to DEPLOYING 
10/12/2016 15:10:06     Source: Custom Source(1/1) switched to RUNNING 
10/12/2016 15:10:06     Map -> Sink: Unnamed(1/1) switched to RUNNING 
10/12/2016 15:10:06     Map -> Sink: Unnamed(1/1) switched to CANCELED 
10/12/2016 15:10:06     Source: Custom Source(1/1) switched to FAILED 
java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.open(FlinkKafkaConsumer09.java:282)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:722)

知道为什么找不到方法 assign() 吗?方法在里面 lib/kafka-clients-0.10.0.1.jar.

    ParameterTool parameterTool = ParameterTool.fromArgs(args);             

    DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer09<String>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties()));

    // print() will write the contents of the stream to the TaskManager's standard out stream
    // the rebelance call is causing a repartitioning of the data so that all machines
    // see the messages (for example in cases when "num kafka partitions" < "num flink operators"
    messageStream.rebalance().map(new MapFunction<String, String>() {       
        private static final long serialVersionUID = -6867736771747690202L; 

        @Override                                                           
        public String map(String value) throws Exception {                  
            return "Kafka and Flink says: " + value;                        
        }                                                                   
    }).print();                                                             

    env.execute();  

【问题讨论】:

    标签: apache-kafka apache-flink


    【解决方案1】:

    NoSuchMethodError 表示版本不匹配。

    我猜问题是您尝试将 Kafka 0.9 消费者连接到 Kafka 0.10 实例。 Flink 1.1.x 不提供 Kafka 0.10 消费者。但是,0.10 消费者将包含在即将发布的 1.2.0 版本中。

    您可以尝试从当前的主分支 (1.2-SNAPSHOT) 自己构建 Kafka 0.10 消费者,并将该消费者与 Flink 1.1.2 一起使用。对应的 Flink API 应该是稳定的,并且从 1.2 向后兼容到 1.1。

    【讨论】:

    • 非常感谢。这样可行。我已经转移到 Kafka 0.9(而不是为 0.10 编译消费者)。现在该过程运行良好,但没有收到任何消息。我有一个使用 rdkafka 库的生产者,当我使用“./rdkafka_consumer_example -g test -b localhost:33334 mytopic”时,它会收到生产者正在生成的消息。但是当我运行“bin/flink run -c com.mycompany.app.App fkaf/target/fkaf-1.0-SNAPSHOT.jar --topic mytopic --bootstrap.servers localhost:33334”时,flink-kafka 消费者没有收到它”。 W
    • 知道是什么导致 flink 消费者收不到它吗?
    • 澄清一下,我已经删除了上面代码中显示的“setProperty”,并开始使用 parameterTool。其余代码相同。 KafkaConsumer09 没有收到消息我做错了什么?感谢任何帮助。 -- 谢谢
    • 您如何注意到没有收到数据?您的程序不包含一个接收器来发出数据。如果在env.execute() 之前添加stream.print(),则数据应写入./log/ 目录下的TaskManager .out 文件中。
    • 再次感谢您。我确实放了stream.print()。很抱歉没有说明这一点。这是我的完整程序
    猜你喜欢
    • 2018-03-21
    • 1970-01-01
    • 1970-01-01
    • 2017-06-27
    • 1970-01-01
    • 2023-03-07
    • 1970-01-01
    • 2021-01-08
    • 1970-01-01
    相关资源
    最近更新 更多