【Title】: OpenWhisk: sending a message to Kafka times out
【Posted】: 2021-04-27 09:23:25
【Question】:

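Environment details

CentOS 7, standalone OpenWhisk

Problem description

I want to send messages to Kafka from OpenWhisk. The data flow is: WSK CLI -> OpenWhisk action -> kafka-console-consumer

However, the process fails intermittently. For example, when I send "test01" through "test06", only "test02", "test04", and "test06" arrive.

According to the logs, the failures are caused by timeouts.

This is my action script: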

// js
const kafka = require('kafka-node');

const Producer = kafka.Producer;
const client = new kafka.KafkaClient({
    kafkaHost: "192.168.68.132:3093"
});

const producer = new Producer(client);

function main(actionObj) {
    var message = JSON.stringify(actionObj);
    const payloads = [
        {
            topic: 'beforeReductionTopic',
            messages: message,
            partition: 0
        }
    ];
    return new Promise(function (resolve, reject) {
        producer
        .on('ready', () => {
            producer.send(payloads, function (err, data) {
                if (err) {
                    console.log("++++++ producer err: ", err);
                    return reject(err);
                } else {
                    console.log("++++++ producer data: ", data);
                    return resolve(data);
                }
            });
        })
        .on("error", err => {
            console.error(err);
            return reject(err);
        });
    });
}

exports.main = main;

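According to the logs, the failure is reported as an "action developer error".

I used the code below to test the action script locally, and Kafka received all of the messages.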

const action = require("./openwhisk-kafka.js");
for(let i = 0; i < 3; i++) {
    action.main("ccc");
}

I don't know how to fix the action script.

Log details

Record of invoking the OpenWhisk action with the WSK CLI

[root@localhost openwhisk-kafka]# wsk action update scicatTest --kind nodejs:10 /test/actions/openwhisk-kafka/openwhisk-kafka.zip 
ok: updated action scicatTest
[root@localhost openwhisk-kafka]# wsk action invoke scicatTest --result --param actionObj "test01"
ok: invoked /_/scicatTest, but the request has not yet finished, with id 78e633791c114d29a633791c11fd29c2
[root@localhost openwhisk-kafka]# wsk action invoke scicatTest --result --param actionObj "test02"
{
    "beforeReductionTopic": {
        "0": 57
    }
}
[root@localhost openwhisk-kafka]# wsk action invoke scicatTest --result --param actionObj "test03"
ok: invoked /_/scicatTest, but the request has not yet finished, with id 09165a45a0f94011965a45a0f920110b
[root@localhost openwhisk-kafka]# wsk action invoke scicatTest --result --param actionObj "test04"
{
    "beforeReductionTopic": {
        "0": 58
    }
}
[root@localhost openwhisk-kafka]# wsk action invoke scicatTest --result --param actionObj "test05"
ok: invoked /_/scicatTest, but the request has not yet finished, with id 8a9f38f9d5ab4fa49f38f9d5ab6fa45a
[root@localhost openwhisk-kafka]# wsk action invoke scicatTest --result --param actionObj "test06"
{
    "beforeReductionTopic": {
        "0": 59
    }
}
[root@localhost openwhisk-kafka]# 

Log of a failed activation

[root@localhost openwhisk-kafka]# wsk activation get 8a9f38f9d5ab4fa49f38f9d5ab6fa45a
ok: got activation 8a9f38f9d5ab4fa49f38f9d5ab6fa45a
{
    "namespace": "guest",
    "name": "scicatTest",
    "version": "0.0.4",
    "subject": "guest",
    "activationId": "8a9f38f9d5ab4fa49f38f9d5ab6fa45a",
    "start": 1619540684355,
    "end": 1619540744418,
    "duration": 60063,
    "statusCode": 0,
    "response": {
        "status": "action developer error",
        "statusCode": 0,
        "success": false,
        "result": {
            "error": "The action exceeded its time limits of 60000 milliseconds."
        }
    },
    "logs": [],
    "annotations": [
        {
            "key": "path",
            "value": "guest/scicatTest"
        },
        {
            "key": "waitTime",
            "value": 652
        },
        {
            "key": "kind",
            "value": "nodejs:10"
        },
        {
            "key": "timeout",
            "value": true
        },
        {
            "key": "limits",
            "value": {
                "concurrency": 1,
                "logs": 10,
                "memory": 256,
                "timeout": 60000
            }
        }
    ],
    "publish": false
}

【Comments】:

    Tags: apache-kafka openwhisk


    【Solution 1】:

    Don't use "kafka-node". Use "kafkajs" instead.
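
    The answer gives no code, so here is a minimal sketch of what the action might look like with kafkajs. It assumes kafkajs is bundled into the action zip, and it reuses the broker address and topic from the question. The `clientId` value and the helper names `createKafkaProducer` and `makeMain` are hypothetical. One plausible (unconfirmed) reason the original script hangs is that it waits for a one-shot `ready` event on a module-level producer that may already have fired; the sketch avoids event listeners and awaits a cached `connect()` promise instead.

```javascript
// Sketch only: assumes kafkajs is packaged with the action.
function createKafkaProducer() {
    // Loaded lazily so the module itself has no hard dependency at load time.
    const { Kafka } = require('kafkajs');
    const kafka = new Kafka({
        clientId: 'openwhisk-action',        // hypothetical client id
        brokers: ['192.168.68.132:3093']     // broker address from the question
    });
    return kafka.producer();
}

function makeMain(createProducer) {
    let producer = null;
    let connecting = null; // cached connect() promise, reused by warm invocations

    return async function main(actionObj) {
        if (!producer) {
            producer = createProducer();
        }
        if (!connecting) {
            // Reset the cache on failure so the next invocation retries.
            connecting = producer.connect().catch(err => {
                connecting = null;
                throw err;
            });
        }
        await connecting;

        const metadata = await producer.send({
            topic: 'beforeReductionTopic',
            messages: [{ value: JSON.stringify(actionObj), partition: 0 }]
        });
        // OpenWhisk expects an object as the action result, so wrap the array.
        return { metadata };
    };
}

exports.main = makeMain(createKafkaProducer);
```

    Because every invocation awaits the same promise rather than an event, a call that arrives after the connection is already established still proceeds to `send`. Passing the producer factory into `makeMain` is only a convenience so the logic can be exercised locally with a stub instead of a real broker.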

    【Comments】:
