【问题标题】:kafka flink connection error shows NoSuchMethodErrorkafka flink连接报错显示NoSuchMethodError
【发布时间】:2020-01-23 20:14:11
【问题描述】:

当我从 flinkkafkaconsumer09 更改为 flinkkafkaconsumer 时出现新错误 Flink 代码:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import java.util.Properties;

@SuppressWarnings("deprecation")
public class ReadFromKafka {


  public static void main(String[] args) throws Exception {
    // create execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("group.id", "test-consumer-group");


    DataStream<String> stream = env
            .addSource(new FlinkKafkaConsumer<String>("test4", new SimpleStringSchema(), properties));

    stream.map(new MapFunction<String, String>() {
      private static final long serialVersionUID = -6867736771747690202L;

      @Override
      public String map(String value) throws Exception {
        return "Stream Value: " + value;
      }
    }).print();

    env.execute();
  }


}

错误: log4j:WARN 找不到记录器 (org.apache.flink.api.java.ClosureCleaner) 的附加程序。 log4j:WARN 请正确初始化 log4j 系统。 log4j:WARN 请参阅http://logging.apache.org/log4j/1.2/faq.html#noconfig 了解更多信息。 线程“主”org.apache.flink.runtime.client.JobExecutionException 中的异常:作业执行失败。 在 org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146) 在 org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:626) 在 org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:117) 在 org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507) 在 org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1489) 在 ReadFromKafka.main(ReadFromKafka.java:33) 原因:org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.dataartisans</groupId>
  <artifactId>kafka-example</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>kafkaex</name>
  <description>this is flink kafka example</description>
  <dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.9.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.9.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.9.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>1.9.1</version>
    </dependency>
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.9.1</version>
</dependency>

    <dependency>
        <groupId>com.googlecode.json-simple</groupId>
        <artifactId>json-simple</artifactId>
        <version>1.1</version>
    </dependency>  
</dependencies>
</project>

【问题讨论】:

  • 我不确定是哪里出了问题,但 Flink 不喜欢不会产生任何结果的工作。尝试添加接收器 - 例如,stream.print();
  • 其实不是那个问题我在这里减少了代码,只是我在eclipse中添加了print()函数。
  • 不知道出了什么问题。请分享更多信息:导入、依赖项、完整的错误报告(右侧裁剪了一些细节)。
  • 连同 pom 和错误一起完全添加
  • kafka 2.12 flink 1.9.1

标签: apache-flink


【解决方案1】:

flink-connector-kafka_2.12FlinkKafkaConsumer09 不兼容。

flink-connector-kafka_2.12 是一个“通用”的 kafka 连接器,编译后用于 Scala 2.12。此通用连接器可用于 0.11.0 及以后的任何版本的 Kafka。

FlinkKafkaConsumer09 用于 Kafka 0.9.x。如果您的 Kafka 代理运行的是 Kafka 0.9.x,那么您将需要 flink-connector-kafka-0.9_2.11flink-connector-kafka-0.9_2.12,具体取决于您想要的 Scala 版本。

另一方面,如果您的 Kafka 代理运行的是最新版本的 Kafka(0.11.0 或更高版本),则坚持使用 flink-connector-kafka_2.12 并使用 FlinkKafkaConsumer 而不是 FlinkKafkaConsumer09

请参阅the documentation 了解更多信息。

【讨论】:

  • 在更改 flinkkafkaconsumer 时出现另一个错误。
猜你喜欢
  • 2017-06-27
  • 1970-01-01
  • 2017-02-21
  • 1970-01-01
  • 1970-01-01
  • 2020-05-05
  • 2023-03-07
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多