【问题标题】:AWS Lambda publishing to IOT Topic fires indefinitelyAWS Lambda 发布到 IOT 主题无限期触发
【发布时间】:2018-12-04 10:52:39
【问题描述】:

问题:

我有一个 node.js (8.10) AWS Lambda 函数,它接受一个 json 对象并将其发布到一个 IOT 主题。该函数成功发布到主题,但是,一旦触发,它就会被连续调用,直到我将并发限制为零以停止对该函数的任何进一步调用。

我正试图找出我错误地实现了哪些导致函数的多个实例被调用的原因。

功能:

这是我的功能:

var AWS = require('aws-sdk');

exports.handler = function (event, context) {
    var iotdata = new AWS.IotData({endpoint: 'xxxxxxxxxx.iot.us-east-1.amazonaws.com'});
    var params = {
        topic: '/PiDevTest/SyncDevice',
        payload: JSON.stringify(event),
        qos: 0
    };

    iotdata.publish(params, function(err, data) {
        if (err) {
          console.log(err, err.stack);
        } else {
            console.log("Message sent.");
            context.succeed();
        }     
    });
};

我的测试 json 是:

{
  "success": 1,
  "TccvID": "TestID01"
}

测试控制台的响应为“null”,但IOT主题显示来自测试json的数据,大约每秒一次发布到主题。

我的尝试

-我试图在它自己的非匿名函数中定义处理程序,称为处理程序,然后让exports.handler = handler;这没有产生任何错误,但也没有成功发布到 iot 主题。

-我认为问题可能出在 node.js 回调上。我已经尝试过实现它并将其排除在外(上面的当前迭代),但似乎两种方法都没有影响。我曾在某处读过该函数会在出错时重试,但我相信这只发生了 3 次,因此它无法解释函数的无限期调用。

-我还尝试从另一个 lambda 调用该函数,以确保问题不是 aws 测试工具。不过,这会产生相同的行为。

总结:

我做错了什么导致这个函数无限期地将json数据发布到iot主题?

提前感谢您的时间和专业知识。

【问题讨论】:

  • Lambda 函数是如何准确调用/触发的?您确定 Lambda 函数未设置为由新的 IoT 事件触发,因此每次发布到该主题时都会自行触发?
  • 目前仅从另一个 Lambda 或 aws 测试功能调用它。我没有订阅该主题,只是发布到它,所以我不希望它被发布到该主题的新数据触发。不过我可以测试一下。
  • 好主意,它确实出现在手动将相同的 json 发布到主题中,它调用了 lambda。这告诉我,我在某个地方有一个 lambda,它不加选择地监听该主题,然后调用第二个 lambda。
  • @MarkB 你让我明白了,解决方案与另一个 lambda 有关,他正在听同一主题并调用我正在处理的 lambda。如果您想将其发布为答案,我会接受。

标签: node.js aws-lambda aws-sdk aws-iot


【解决方案1】:

使用 aws-iot-device-sdk 创建一个 MQTT 客户端,并使用它的 messageHandler 和 publish 方法将您的消息发布到 IOT 主题。示例 MQTT 客户端代码如下,

import * as DeviceSdk from 'aws-iot-device-sdk';
import * as AWS from 'aws-sdk';

let instance: any = null;

export default class IoTClient {

  client: any;
  /**
   * Constructor
   *
   * @params {boolean} createNewClient - Whether or not to use existing client instance
   */
  constructor(createNewClient = false, options = {}) {

  }

  async init(createNewClient, options) {
    if (createNewClient && instance) {
      instance.disconnect();
      instance = null;
    }

    if (instance) {
      return instance;
    }
    instance = this;
    this.initClient(options);
    this.attachDebugHandlers();
  }

  /**
   * Instantiate AWS IoT device object
   * Note that the credentials must be initialized with empty strings;
   * When we successfully authenticate to the Cognito Identity Pool,
   * the credentials will be dynamically updated.
   *
   * @params {Object} options - Options to pass to DeviceSdk
   */
  initClient(options) {
    const clientId = getUniqueId();

    this.client = DeviceSdk.device({
      region: options.region || getConfig('iotRegion'),

      // AWS IoT Host endpoint
      host: options.host || getConfig('iotHost'),

      // clientId created earlier
      clientId: options.clientId || clientId,

      // Connect via secure WebSocket
      protocol: options.protocol || getConfig('iotProtocol'),

      // Set the maximum reconnect time to 500ms; this is a browser application
      // so we don't want to leave the user waiting too long for reconnection after
      // re-connecting to the network/re-opening their laptop/etc...
      baseReconnectTimeMs: options.baseReconnectTimeMs || 500,
      maximumReconnectTimeMs: options.maximumReconnectTimeMs || 1000,

      // Enable console debugging information
      debug: (typeof options.debug === 'undefined') ? true : options.debug,

      // AWS access key ID, secret key and session token must be
      // initialized with empty strings
      accessKeyId: options.accessKeyId,
      secretKey: options.secretKey,
      sessionToken: options.sessionToken,
      // Let redux handle subscriptions
      autoResubscribe: (typeof options.debug === 'undefined') ? false : options.autoResubscribe,
    });
  }

  disconnect() {
    this.client.end();
  }

  attachDebugHandlers() {
    this.client.on('reconnect', () => {
      logger.info('reconnect');
    });

    this.client.on('offline', () => {
      logger.info('offline');
    });

    this.client.on('error', (err) => {
      logger.info('iot client error', err);
    });

    this.client.on('message', (topic, message) => {
      logger.info('new message', topic, JSON.parse(message.toString()));
    });
  }

  updateWebSocketCredentials(accessKeyId, secretAccessKey, sessionToken) {
    this.client.updateWebSocketCredentials(accessKeyId, secretAccessKey, sessionToken);
  }

  attachMessageHandler(onNewMessageHandler) {
    this.client.on('message', onNewMessageHandler);
  }

  attachConnectHandler(onConnectHandler) {
    this.client.on('connect', (connack) => {
      logger.info('connected', connack);
      onConnectHandler(connack);
    });
  }

  attachCloseHandler(onCloseHandler) {
    this.client.on('close', (err) => {
      logger.info('close', err);
      onCloseHandler(err);
    });
  }

  publish(topic, message) {
    this.client.publish(topic, message);
  }

  subscribe(topic) {
    this.client.subscribe(topic);
  }

  unsubscribe(topic) {
    this.client.unsubscribe(topic);
    logger.info('unsubscribed from topic', topic);
  }
}

***getConfig() 是从yml文件中获取环境变量,或者直接在这里指定。

【讨论】:

  • 感谢您的意见,Sameera。发布到 IOT 主题不是问题。 Mark B. 的评论让我找到了正确的解决方案。
【解决方案2】:

虽然他只是将其作为评论发布,但 MarkB 为我指明了正确的方向。

问题是解决方案与另一个 lambda 有关,他正在听同一主题并调用我正在处理的 lambda。这导致了循环逻辑,因为退出条件从未满足。修复该代码解决了这个问题。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2019-11-04
    • 1970-01-01
    • 2017-02-22
    • 2018-01-15
    • 2018-10-01
    • 1970-01-01
    • 2018-03-25
    • 2016-12-22
    相关资源
    最近更新 更多