【问题标题】:Apache flink with AWS kinesis consume data-Getting java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerApache flink 与 AWS kinesis 消费数据-Getting java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer
【发布时间】:2021-11-01 00:12:27
【问题描述】:

这是我连接 aws kinesis 的代码。当我尝试连接 FlinkKinesisConsumer 时,它会抛出 Classnotfound 异常之类的错误。

import configs.AWSConfigConstants;
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import java.io.IOException;
import java.util.Map;
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

public class StreamingJob {
    public static void main(String[] args) {
        try {
            final StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
            String region = "us-east-1";
            String inputStreamName = "";
            String accesskey = "";
            String secretkey = "";
            String initPosition = "LATEST";
            String arn = "";
            
            Properties consumerConfig = new Properties();           
            consumerConfig.put(AWSConfigConstants.AWS_REGION, region);
            consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, accesskey);
            consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, secretkey);
            consumerConfig.put(AWSConfigConstants.AWS_ROLE_ARN, arn );          
            consumerConfig.put(AWSConfigConstants.STREAM_INITIAL_POSITION, initPosition);

            System.out.println("Consume config properties:");
            System.out.println(consumerConfig);
            
            DataStream<String> kinesisInputStream = sEnv.addSource(new FlinkKinesisConsumer<>
            (inputStreamName,new SimpleStringSchema(),consumerConfig));
            
            System.out.println(kinesisInputStream);
            sEnv.execute("Flink Streaming Processor");          
        } catch(Exception e) {
            System.out.println(e);
        }
    }

我收到以下错误

Output Screenshot

提前感谢您的帮助。

【问题讨论】:

标签: java maven apache-flink amazon-kinesis-analytics


【解决方案1】:

您需要在构建中包含适当版本​​的连接器。像这样,其中 2.11 是 scala 版本,而 1.13.2 是 Fl​​ink 版本。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kinesis_2.11</artifactId>
    <version>1.13.2</version>
</dependency>

有关详细信息,请参阅the docs

【讨论】:

  • 我已将它们添加到我的依赖项中。即使它不起作用。现在我正面临这个问题。如果你们中的任何人知道这一点,请帮助我。 org.apache.flink.streaming.api.datastream.DataStreamSource@12591ac8 java.lang.IllegalStateException:流拓扑中未定义运算符。不能执行。无论如何,感谢您的回复大卫。
  • 每个 flink 作业必须至少有一个 source 和一个 sink。您需要添加一个水槽。
  • 添加接收器后,我试图打印 addsource 存储变量的结果。下面是我得到的结果。 org.apache.flink.streaming.api.datastream.DataStreamSource@17503f6b JobID 536da6a4fb932045cb7dba17657f45d0 java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 536da6a4fb932045cb7dba17657)不知道这里的输出在说什么。请帮帮我大卫。
  • 我建议您按照自己的方式完成文档中的教程和示例,从这里开始:ci.apache.org/projects/flink/flink-docs-release-1.13/docs/…
  • 我现在在失败部分的 flink 作业中遇到了以下异常。你能帮我解决这个异常吗?引起:org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.AmazonKinesisException:用户:******************无权执行: kinesis:ListShards on resource: ********************************* (服务:AmazonKinesis;状态码:400;错误代码:AccessDeniedException;请求 ID:****************************************;代理:空)
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-08-18
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-02-25
相关资源
最近更新 更多