【发布时间】:2017-07-09 19:00:56
【问题描述】:
我有一个 nodejs 后端在使用 websockets 的角度应用程序和通过 tcp 套接字的 snapserver 之间充当代理。
我正在使用 rxjs 将 nodejs 网络套接字包装为主题。 由于某种原因,我的 tcp 套接字的 'data' 事件在每条消息发送时都会再次调用一次。
经过一些请求,我得到了一个(node:21209) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 error listeners added. Use emitter.setMaxListeners() to increase limit
const log = require('./logger').init('SNAPCAST');
const utils = require('./utils');
const Rx = require('rxjs/Rx');
const net = require('net');
const createSubject = () => {
return Rx.Observable.create((observer) => {
const socketConnection = net.connect({port: 1705}, () => {
const socketObservable = Rx.Observable.create((observer) => {
socketConnection.on('data', (data) => {
if(data) {
String(data.toString().trim()).split('\n').forEach((line) => {
if(line) {
observer.next(JSON.parse(line));
}
});
}
});
socketConnection.on('error', (err) => observer.error(err));
socketConnection.on('close', () => observer.complete());
});
const socketObserver = {
next: (data) => {
if (!socketConnection.destroyed) {
socketConnection.write(`${JSON.stringify(data)}\r\n`);
}
}
};
const socket = Rx.Subject.create(socketObserver, socketObservable);
observer.next(socket);
observer.complete();
});
socketConnection.on('error', (err) => {
observer.error(err);
});
});
};
module.exports = () => {
return Rx.Observable.create((observer) => {
createSubject().subscribe((socket) => {
const sendRequest = (message) => {
message.id = message.id || utils.UUID();
message.jsonrpc = message.jsonrpc || '2.0';
socket.next(message);
return socket.filter((response) => {
return response.id === message.id;
}).first();
};
observer.next({
proxyRequest: (request) => {
return sendRequest(request);
}
});
observer.complete();
}, (err) => {
observer.error(err);
});
});
};
【问题讨论】:
-
尝试简化您的问题 // 代码示例,以便更好地改变它的答案。