【问题标题】:Not able to produce message into kafka topic using aws lambda function无法使用 aws lambda 函数将消息生成到 kafka 主题中
【发布时间】:2020-04-06 22:14:34
【问题描述】:

所以我正在尝试在 S3 事件上编写一个 lambda 函数,它将消息放入 kafka 主题。 我的 aws lambda 函数正在触发并且也没有收到任何错误。 但我无法在 Kafka 主题中看到这些消息。

这是我的 lambda 函数

String srcBucket = record.getS3().getBucket().getName();

        String srcKey = record.getS3().getObject().getUrlDecodedKey();

        System.out.println("Bucket is " + srcBucket + "  and Key is " + srcKey);
        // Assign topicName to string variable
        String topicName = "AWSKafkaTutorialTopic";

        // create instance for properties to access producer configs
        Properties props = new Properties();

        props.put("bootstrap.servers",
                "b-3.205147-riskaudit.rtyrty.c5.kafka.us-east-1.amazonaws.com:9092,b-4.205147-riskaudit.rtyt.c5.kafka.us-east-1.amazonaws.com:9092,b-5.205147-tryrt.xt08nj.c5.kafka.us-east-1.amazonaws.com:9092");
        System.out.println("bootstrap.servers successfully");
        // Set acknowledgements for producer requests.
        props.put("acks", "all");

        // If the request fails, the producer can automatically retry,
        props.put("retries", 0);

        // Specify buffer size in config
        props.put("batch.size", 16384);

        // Reduce the no of requests less than 0
        props.put("linger.ms", 1);

        // The buffer.memory controls the total amount of memory available to the
        // producer for buffering.
        props.put("buffer.memory", 33554432);

        System.out.println("before key.serializer successfully");

        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        System.out.println("after  key.serializer successfully");
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        System.out.println("Inside loop successfully");
        for (int i = 0; i < 10; i++)

            producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), Integer.toString(i)));
        System.out.println("Message sent successfully");
        producer.close();

        return "Message Pushed success fully";

我的 lambda 函数一直运行到 for 循环,但无法看到之后会发生什么。 请帮忙

【问题讨论】:

    标签: amazon-web-services apache-kafka aws-lambda kafka-producer-api amazon-msk


    【解决方案1】:

    在我看来一切正常 只需添加props.put("producer.type", "async"); 而且您可能没有从启动 MSK 的 vpc 运行您的 lmbda 函数。 另外请注意 IAM 政策。 试试这个AWSLambdaVPCAccessExecutionRole 和安全组。

    如果您设置了所有这些,代码将开始工作。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-10-11
      • 1970-01-01
      • 2017-02-22
      • 1970-01-01
      • 1970-01-01
      • 2014-01-04
      • 2019-01-11
      • 2018-10-16
      相关资源
      最近更新 更多