【Posted】: 2017-02-21 21:21:02
【Question】:
I am using:
- flink 1.1.2
- kafka 2.10-0.10.0.1
- flink-connector-kafka-0.9.2.10-1.0.0
I am using the following very simple, basic application:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:33334");
properties.setProperty("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RangeAssignor");
properties.setProperty("group.id", "test");
String topic = "mytopic";
FlinkKafkaConsumer09<String> fkc =
    new FlinkKafkaConsumer09<String>(topic, new SimpleStringSchema(), properties);
DataStream<String> stream = env.addSource(fkc);
env.execute();
After compiling with maven, I try to run it with the following command:
bin/flink run -c com.mycompany.app.App fkaf/target/fkaf-1.0-SNAPSHOT.jar
I see the following runtime error:
Submitting job with JobID: f6e290ec7c28f66d527eaa5286c00f4d. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@127.0.0.1:6123/user/jobmanager#-1679485245]
10/12/2016 15:10:06 Job execution switched to status RUNNING.
10/12/2016 15:10:06 Source: Custom Source(1/1) switched to SCHEDULED
10/12/2016 15:10:06 Source: Custom Source(1/1) switched to DEPLOYING
10/12/2016 15:10:06 Map -> Sink: Unnamed(1/1) switched to SCHEDULED
10/12/2016 15:10:06 Map -> Sink: Unnamed(1/1) switched to DEPLOYING
10/12/2016 15:10:06 Source: Custom Source(1/1) switched to RUNNING
10/12/2016 15:10:06 Map -> Sink: Unnamed(1/1) switched to RUNNING
10/12/2016 15:10:06 Map -> Sink: Unnamed(1/1) switched to CANCELED
10/12/2016 15:10:06 Source: Custom Source(1/1) switched to FAILED
java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.open(FlinkKafkaConsumer09.java:282)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:722)
Any idea why the method assign() cannot be found? The method is present in lib/kafka-clients-0.10.0.1.jar.
ParameterTool parameterTool = ParameterTool.fromArgs(args);
DataStream<String> messageStream = env.addSource(
    new FlinkKafkaConsumer09<String>(
        parameterTool.getRequired("topic"),
        new SimpleStringSchema(),
        parameterTool.getProperties()));
// print() will write the contents of the stream to the TaskManager's standard out stream.
// The rebalance call causes a repartitioning of the data so that all machines
// see the messages (for example, when "num kafka partitions" < "num flink operators").
messageStream.rebalance().map(new MapFunction<String, String>() {
    private static final long serialVersionUID = -6867736771747690202L;

    @Override
    public String map(String value) throws Exception {
        return "Kafka and Flink says: " + value;
    }
}).print();
env.execute();
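A note on reading the error: the descriptor assign(Ljava/util/List;)V is the JVM's notation for a void method named assign that takes a java.util.List. A NoSuchMethodError with that descriptor means the KafkaConsumer class that was actually loaded at runtime has no overload with exactly that parameter type, even if it has an assign() with a different one. The sketch below is one way to check this with reflection; the class name MethodProbe and its helpers are hypothetical, written for illustration only, and which jars end up on the job's classpath is an assumption to verify.

```java
import java.lang.reflect.Method;
import java.security.CodeSource;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical diagnostic helper, not part of Flink or Kafka.
public class MethodProbe {

    /** Lists the public, non-bridge overloads of methodName on className, sorted. */
    static List<String> signatures(String className, String methodName)
            throws ClassNotFoundException {
        Class<?> cls = Class.forName(className);
        List<String> found = new ArrayList<>();
        for (Method m : cls.getMethods()) {
            if (!m.getName().equals(methodName) || m.isBridge()) {
                continue;
            }
            StringBuilder sb = new StringBuilder(methodName).append('(');
            Class<?>[] params = m.getParameterTypes();
            for (int i = 0; i < params.length; i++) {
                if (i > 0) {
                    sb.append(", ");
                }
                sb.append(params[i].getName());
            }
            found.add(sb.append(')').toString());
        }
        Collections.sort(found);
        return found;
    }

    /** Reports the jar or directory a class was loaded from, if the JVM exposes it. */
    static String origin(String className) throws ClassNotFoundException {
        CodeSource src = Class.forName(className).getProtectionDomain().getCodeSource();
        return src == null ? "(bootstrap/unknown)" : String.valueOf(src.getLocation());
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.out.println("usage: MethodProbe <class name> <method name>");
            return;
        }
        System.out.println("loaded from: " + origin(args[0]));
        for (String signature : signatures(args[0], args[1])) {
            System.out.println(signature);
        }
    }
}
```

Run it with the same jars the job sees, for example with lib/kafka-clients-0.10.0.1.jar on the classpath and the arguments org.apache.kafka.clients.consumer.KafkaConsumer assign. If the output lists assign with a parameter type other than java.util.List, the connector was compiled against a different kafka-clients API than the one being loaded. The "loaded from" line also shows whether the class comes from lib/ or from a copy bundled inside the job jar.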
【Discussion】: