【问题标题】:PUB/SUB Api doesn't send messages back, what might be the problem?PUB/SUB Api 不发回消息,可能是什么问题?
【发布时间】:2019-09-01 21:12:01
【问题描述】:

我几乎完成了 pub/sub fake-server,它请求用户密码和电子邮件(来自客户端),将此信息与数据库进行比较并返回数据。它有“api_in”和“api_out”帧,然后是 JSON。 发布者顺利地获取并处理所有信息,但它似乎没有将任何内容发送回客户端(订阅者),我不知道为什么,因为它已连接到后续端口。

而且我知道这个实现不是经典的 PUB/SUB 模式,但这是先决条件,要这样做。

我尝试了不同的发布/订阅选项,但没有任何改变。

服务器

let zmq = require('zeromq');
const sqlite3 = require('sqlite3').verbose();

const DBSOURCE = "./db.sqlite";

let db  = new sqlite3.Database(DBSOURCE, (err) => {
    if(err) {
        console.error(err.message);
        throw err;
    } else {
        console.log('Connected to SQLite database');
        db.run(`CREATE TABLE users (
        user_id INTEGER, 
        email TEXT,
        passw TEXT)`,
            (err) => {
                if (err) {
                    // Table already created
                } else {
                    // Creating rows
                    let insert = 'INSERT INTO users (user_id, email, passw) VALUES (?,?,?)';
                    db.run(insert, [123098, 'phillCollins@gmail.com','5502013']);
                    db.run(insert, [42424242,'dukenukem3d@mustdie.com','RodriguesShallLiveLong']);
                    db.run(insert, [5,'yourchick@yandex.ru','semolinaAndPain666']);

                }
            })
    }
});


const args = require('minimist')(process.argv.slice(2));

const pubSocket = zmq.socket('pub', null);

pubSocket.bindSync(`tcp://127.0.0.1:${args['pub']}`);

const subSocket = zmq.socket('sub', null);

subSocket.subscribe('api_in');

subSocket.on('message', function(data) {
    let message = data.toString().replace(/api_in/g, '');
    let mes = JSON.parse(message);

    let api_out = 'api_out';

    let errorWrongPWD = 'WRONG_PWD';
    let errorWrongFormat = 'WRONG_FORMAT';

    if(mes.type = 'login') {
        db.get(`SELECT user_id from users WHERE email = ? and passw = ?`, [mes.email, mes.pwd], function(err, row) {
            if(err) {
                console.log(err);
            } else {
                if(row) {
                    let msg = {
                        msg_id: mes.msg_id,
                        user_id: row.user_id,
                        status: 'ok'
                    }

                    let outMessage = api_out + JSON.stringify(msg);

                    console.log(outMessage);
                    subSocket.send(outMessage);
                } else {
                    let msg = {
                        msg_id: mes.msg_id,
                        status: 'error',
                        error: mes.email == '' || mes.pwd == '' ?  errorWrongFormat : errorWrongPWD
                    }
                    console.log(msg);

                    let outMessage = api_out + JSON.stringify(msg);

                    subSocket.send(outMessage);
                }
            }
        });
    }
});


subSocket.bindSync(`tcp://127.0.0.1:${args['sub']}`);

客户

let zmq = require('zeromq');
let uniqid = require('uniqid');

let readline = require('readline').createInterface({
    input: process.stdin,
    output: process.stdout
});

const args = require('minimist')(process.argv.slice(2));

const pubSocket = zmq.socket('pub', null);

let pubSocketTCP = `tcp://127.0.0.1:${args['sub']}`;

pubSocket.connect(pubSocketTCP);

const subSocket = zmq.socket('sub', null);

let subSocketTCP = `tcp://127.0.0.1:${args['pub']}`;

subSocket.connect(subSocketTCP);



let api_in = 'api_in';
let secondFrame = {
    type: 'login',
    email: '',
    pwd: '',
    msg_id: uniqid()
}

readline.question('What is your email? \n', (email) => {
    secondFrame.email = email;
    readline.question('What is your password \n', (pwd) => {
        secondFrame.pwd = pwd;
        let msg = api_in + JSON.stringify(secondFrame);
        console.log(msg);
        pubSocket.send(msg);
    });
});

subSocket.subscribe('api_out');

subSocket.on('message', (response) => {
/*     let res = response.toString().replace('api_out');
    let responseParsed = JSON.parse(res);
    console.log(responseParsed.status);
    if(response.status == 'error') console.log(response.error); */
    console.log(response);
}); 

我希望服务器端发回信息。

【问题讨论】:

    标签: javascript node.js zeromq publish-subscribe


    【解决方案1】:

    那么,首先,欢迎来到 Zen-of-Zero 域。 ZeroMQ 是一个强大的智能信号/消息传递工具,所以如果你注意它所有的内在美,就没有你做不到的事情(在这条前进的道路上仍然无法避免噩梦)。如果您对这个领域很陌生,可以先阅读一下"ZeroMQ Principles in less than Five Seconds",然后再深入了解该主题的更多细节,或者重复使用here发布的一些技巧

    Q它似乎没有向客户端(订阅者)发送任何内容,我不知道为什么

    有两段代码似乎同时使用了 PUBSUB 可扩展的正式通信模式原型,但在这些代码的获取方式上存在一些问题配置好了。


    服务器代码:

    似乎尝试实例化 PUB-archetype 并为该实例配备单个 AccessPoint,使用 .bindSync()-method 和 cli-parameter args['pub'] 接受普通和常见的连接tcp://-transport-class。

    在为第二个实例定义事件处理程序 .on( 'message', ... ) 后,作为 SUB-archetype,这个变为 .bindSync()-配备单个 AccessPoint,使用 tcp://-传输类,用于使用tcp://-transport-class 接收连接。

    如果您确实必须在 SUB-alike 原型上创建 .send(),则必须使用 XSUB 替代方案,您可以在其中发送数据并使用实际执行一些技巧PUB-side 或 XPUB-side 上的有效负载(请参阅 API 文档,了解有关 ZMQ_XPUB_MANUAL 模式功能的详细信息以及 XPUB-side 上一些更狂野的数据处理的限制)

    ZMQ_XSUB

    ZMQ_SUB 相同,只是您通过向套接字发送订阅消息进行订阅。订阅消息是字节 1(用于订阅)或字节 0(用于取消订阅),后跟订阅正文。也可以发送没有 sub/unsub 前缀的消息,但不会影响订阅状态。


    客户端代码:

    似乎将tcp://-transport-class 上的客户端本地PUBSUB 原型实例化和.connect() 到服务器端接入点(两者都应该将ZMQ_LINGER 设置为0,以便避免无限挂起孤儿(版本相关的默认值没有其他解决方案,但对此有明确的设置)。


    可能的改进:

    • XPUB/XSUBZMQ_XPUB_MANUAL 可以解决通过 SUB-archetype
    • 发送的问题
    • XPUB/SUBZMQ_XPUB_MANUAL 可以解决通过 SUB-archetype 发送的问题,而不太容易伪装通过 .subscribe()-method 发送的消息
    • PUB/SUB 严格通过本地 PUB-archetype 实例制作所有 .send()-s。
    • 如果仍然丢失消息,请明确使用 ZMQ_SNDHWMZMQ_RCVHWM 参数
    • 只有在成功完成{ .bind() + .connect() } 方法后才能明确设置.subscribe()(系统地使用原始zmq_errno()zmq_strerror() 函数来主动检测和修复潜在的碰撞状态)
    • 可以使用.setsockopt( ZMQ_IMMEDIATE, 1 ) 请求仅使用已完成的连接(分布式系统对自主分布式(许多)代理执行的操作顺序零保证)

    【讨论】:

      【解决方案2】:

      您的服务器正在尝试在子套接字上发送

      subSocket.send(outMessage);

      您不能在子套接字上发送。它应该在 pub 套接字上发送。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2020-10-11
        • 2021-10-22
        • 2021-06-17
        • 2022-01-01
        • 2019-05-17
        • 2015-04-01
        • 2020-03-17
        • 2021-08-01
        相关资源
        最近更新 更多