【问题标题】:c-like mutex in nodejsnodejs中的类似c的互斥锁
【发布时间】:2018-02-01 13:31:17
【问题描述】:

我正在尝试在 node.js 中实现类似 c 的等待和信号。我想到了二进制信号量或互斥体。请记住,我是 node.js 的新手,对 javascript 不是很熟练。

我的事情是:

  • 我有一个python交互进程在服务器上运行。

  • 客户端向服务器创建一个 Ajax,然后服务器通过在其标准输入上写入来询问这个 python 进程。 http 响应对象保存在一个变量中。

  • 然后我拦截包含我的答案的标准输出,并将其发送到保留的响应对象中。

我想让这个“python 部分”原子化,这意味着:

  • 如果在 python 进程已经运行时收到另一个请求,请等待。否则请求将丢失。

  • 当标准输出触发时,发送信号释放访问。

同样,与 P 和 V 在二进制信号量或互斥锁和互斥解锁中的行为相同。

那么,如果您是高级 nodejs 用户有什么建议吗?我查看了 npm,但除了类似 Promise 的互斥体之外什么也没找到,我认为这并不适合我的情况。

也许我错了,有办法。 无论如何,我期待阅读您的建议或解决方法。 谢谢。

这里是相关的示例代码。

route.js

var express = require('express');
var router = express.Router();
var pycaffe = require('./pycaffe');

var py = pycaffe.getInstance();

router.post('/getParams', function(req, res, next){
    in_data = req.body.imgBase64;
    py.queryParams(res, in_data);

});

pycaffe.js

const { spawn } = require('child_process');

var pycaffe = (function () {

    var pycaffeInstance;

    function create () {
        var response;  

        console.log('Loading nets...');

        const defaults = {
            cwd: './nets',
            env: process.env
        };

        var py = spawn('python', ['-i', 'deploy.py'], defaults);

        py.stdout.on('data', function(data){
            console.log(`stdout: ${data}`);
            var fig = JSON.parse(data);

            if(response !== undefined)
                //send http response
                response.send(fig)
                //mutex.unlock()
        });

        function queryParams(res, data) {
            //mutex.lock()
            response = res;
            py.stdin.write("query_params('"+data+"')\n");
        }

        return {
            queryParams: queryParams
        };
    }

    return {
        getInstance: function() {
            if(!pycaffeInstance) {
                pycaffeInstance = create();
            }
            return pycaffeInstance;
        }
    };
})();

module.exports = pycaffe;

【问题讨论】:

  • 如何知道python实例何时返回响应?它会写信给stdout吗?
  • 是的,它写的内容是在 http 响应中发送的

标签: javascript python c node.js asynchronous


【解决方案1】:

您不需要互斥锁或信号量,因为在节点中您只有一个线程。如果有任何请求正在处理,您只需将请求存储在队列(可以是数组)中。当您通过标准输出收到响应时,请检查队列是否包含另一个请求和进程。

您可以拥有基于 Promise 的类似互斥锁的抽象。这是一个例子:

class Mutex {
    constructor () {
        this.queue = [];
        this.locked = false;
    }

    lock () {
        return new Promise((resolve, reject) => {
            if (this.locked) {
                this.queue.push([resolve, reject]);
            } else {
                this.locked = true;
                resolve();
            }
        });
    }

    release () {
        if (this.queue.length > 0) {
            const [resolve, reject] = this.queue.shift();
            resolve();
        } else {
            this.locked = false;
        }
    }
}

使用示例:

const mutex = new Mutex();

// using promise syntax
const handleRequest = () => {
    mutex.lock().then(() => {
        // do something here

        mutex.release();
    })
};

// using async syntax
const handleRequest = async () => {
    await mutex.lock();

    // do something here

    mutex.release();
};

我用这段代码来测试:

const delay = ms => new Promise((resolve, reject) => setTimeout(resolve, ms));

let processNumber = 1;
const startProcess = async (mutex) => {
    const thisProcessId = processNumber++;
    console.log(new Date(), `started process ${thisProcessId}`);
    await mutex.lock();
    console.log(new Date(), `after lock ${thisProcessId}`);
    await delay(3000);
    mutex.release();
    console.log(new Date(), `finished process ${thisProcessId}`);
};

(() => {
    const mutex = new Mutex();
    for (let i = 0; i < 5; i++) {
        setTimeout(() => startProcess(mutex), Math.random() * 10000);
    }
})();

...得到了这个结果:

2018-02-01T19:02:36.418Z 'started process 1'
2018-02-01T19:02:36.421Z 'after lock 1'
2018-02-01T19:02:38.565Z 'started process 2'
2018-02-01T19:02:39.426Z 'finished process 1'
2018-02-01T19:02:39.426Z 'after lock 2'
2018-02-01T19:02:40.048Z 'started process 3'
2018-02-01T19:02:42.309Z 'started process 4'
2018-02-01T19:02:42.428Z 'finished process 2'
2018-02-01T19:02:42.428Z 'after lock 3'
2018-02-01T19:02:43.200Z 'started process 5'
2018-02-01T19:02:45.429Z 'finished process 3'
2018-02-01T19:02:45.429Z 'after lock 4'
2018-02-01T19:02:48.433Z 'finished process 4'
2018-02-01T19:02:48.433Z 'after lock 5'
2018-02-01T19:02:51.438Z 'finished process 5'

【讨论】:

    【解决方案2】:

    一种选择是生成多个 python 进程:

    router.post('/getParams', function(req, res, next){
      in_data = req.body.imgBase64;
      pycaffe.create().queryParams(res, in_data);
    });
    

    为此,您需要公开create

    return {
        getInstance: function() {
            if(!pycaffeInstance) {
                pycaffeInstance = create();
            }
            return pycaffeInstance;
        },
        create // expose create here
    };
    

    或者,如果您真的只想要一个 python 进程,您应该使用 异步队列 而不是 互斥锁。 nodejs 中没有互斥锁,因为没有并行代码执行。示例异步队列如下所示:

     class AsyncQueue {
       constructor(task) {
         this.task = task;
         this.queue = [];
      }
    
      push(el){
       this.queue.push(el);
       this.start();
      }
    
      async start(){
       if(this.running) return;
       this.running = true;
    
       let el;
       while(el = this.queue.shift()){
          await this.task(el);
      }
    
      this.running = false;
     }
    }
    

    可以这样使用:

    function pyRequest({res, data}){
      py.requestParams(res, data);
    
      return new Promise(resolve => res.on("data", resolve));
    }
    
    const pyQueue = new AsyncQueue(pyRequest);
    
    router.post('/getParams', function(req, res, next){
      pyQueue.push({
        data: req.body.imgBase64,
        res
      });
    });
    

    【讨论】:

    • 所以这个想法是使用单个 python 进程,因为它的 init 非常重(加载 4 个 CNN)。我将如何使用异步队列?我必须以某种方式等待标准输出触发。
    猜你喜欢
    • 1970-01-01
    • 2015-10-06
    • 2022-07-31
    • 1970-01-01
    • 2011-01-21
    • 2011-08-10
    • 1970-01-01
    • 1970-01-01
    • 2018-05-23
    相关资源
    最近更新 更多