【问题标题】:Nodejs net multiple data eventsNodejs 网络多个数据事件
【发布时间】:2017-07-09 19:00:56
【问题描述】:

我有一个 nodejs 后端在使用 websockets 的角度应用程序和通过 tcp 套接字的 snapserver 之间充当代理。

我正在使用 rxjs 将 nodejs 网络套接字包装为主题。 由于某种原因,我的 tcp 套接字的 'data' 事件在每条消息发送时都会再次调用一次。

经过一些请求,我得到了一个(node:21209) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 error listeners added. Use emitter.setMaxListeners() to increase limit

const log = require('./logger').init('SNAPCAST');
const utils = require('./utils');
const Rx = require('rxjs/Rx');
const net = require('net');

const createSubject = () => {

  return Rx.Observable.create((observer) => {

    const socketConnection = net.connect({port: 1705}, () => {

      const socketObservable = Rx.Observable.create((observer) => {
        socketConnection.on('data', (data) => {

          if(data) {
            String(data.toString().trim()).split('\n').forEach((line) => {

              if(line) {
                observer.next(JSON.parse(line));
              }

            });
          }

        });
        socketConnection.on('error', (err) => observer.error(err));
        socketConnection.on('close', () => observer.complete());
      });

      const socketObserver = {
        next: (data) => {
          if (!socketConnection.destroyed) {
            socketConnection.write(`${JSON.stringify(data)}\r\n`);
          }
        }
      };

      const socket = Rx.Subject.create(socketObserver, socketObservable);

      observer.next(socket);
      observer.complete();

    });

    socketConnection.on('error', (err) => {
      observer.error(err);
    });

  });

};

module.exports = () => {

  return Rx.Observable.create((observer) => {

    createSubject().subscribe((socket) => {

      const sendRequest = (message) => {

        message.id = message.id || utils.UUID();
        message.jsonrpc = message.jsonrpc || '2.0';

        socket.next(message);

        return socket.filter((response) => {
          return response.id === message.id;
        }).first();

      };

      observer.next({
        proxyRequest: (request) => {
          return sendRequest(request);
        }
      });

      observer.complete();

    }, (err) => {
      observer.error(err);
    });

  });


};

【问题讨论】:

  • 尝试简化您的问题 // 代码示例,以便更好地改变它的答案。

标签: node.js rxjs


【解决方案1】:

泄漏事件发射器可能是由于未删除侦听器而导致的。您的代码示例中似乎就是这种情况。我建议你修改你的代码如下:

Observable.create(obs => {
  function handleSocketData = data => {};

  socketConnection.addEventListener('data', handleSocketData);

  //unsubscribe function
  return () => {
    socketConnection.removeEventListener('data', handleSocketData);    
  }
});

这将导致您的 eventListeners 在 observable 完成/错误/取消订阅时被删除。

使用Observable.fromEvent(socketConnection, 'data') 可以进一步简化上述代码,该Observable.fromEvent(socketConnection, 'data') 抽象出添加/删除事件侦听器逻辑。

【讨论】:

  • 谢谢,我会试试的。我的猜测是 sendRequest 函数会导致 .filter() 出现问题,并添加 .first() 以在第一次匹配后自动取消订阅。
【解决方案2】:

这就是 TCP 的工作原理,它是一个数据流。因此,应用程序级消息可以通过一个或多个'data' 事件到达。您必须准备好应对任何一种情况。

【讨论】:

  • 我知道 tcp 块可以以多个部分的形式到达,因为一帧不适合。那不是这里的问题。我的数据事件被整个数据触发多次。 json 适合一帧。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-02-25
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2013-06-23
相关资源
最近更新 更多