【问题标题】:Turn a net socket into an observable(将 net 套接字转换为可观察对象)
【发布时间】:2017-03-12 17:59:29
【问题描述】:

我有一段使用 net TCP 套接字的 Node.js 代码,想把它从回调(callback)方式改为使用 Rx。

我的 feed.js 模块大致如下:

// feed.js (abridged by the author; `...` marks omitted code).
// A TCP server passes every incoming chunk to broadcast(), which ends by
// invoking the change handler registered through start().
var net = require('net');
// The connection callback runs with the `socket` of each client connection.
var server = net.createServer(function(socket) {
...

    // Handle incoming messages from clients.
    socket.on('data', function (data) {
        broadcast(data, socket);
    });

...
// NOTE(review): the net.createServer( call is not closed with `);` in this excerpt.
}

// Receives a raw message and the socket it came from. The omitted part
// presumably builds `stock` from `message` (not shown here); the visible line
// notifies the registered handler with (stock.symbol, 'stock', stock).
function broadcast(message, sender)
{  
   ...
   onChangeHandler(stock.symbol, 'stock', stock);
}
// NOTE(review): this closing brace has no matching opening brace in the excerpt.
}

// Registers the callback that broadcast() invokes.
// `onChangeHandler` is assigned here without a declaration visible in this excerpt.
function start(onChange) {
    onChangeHandler = onChange;
}

// Public API of the module: only start() is exported.
exports.start = start;
// Listen arguments were elided by the author.
server.listen(....);

然后,调用上述模块的客户端会注册一个回调:

// Change handler handed to the feed module; it is called with
// (room, type, message) whenever the feed reports an update.
function onFeedChange(room, type, message) {
   //...Do something with the message
}

feed.start(onFeedChange);

我想将其转换为使用 Rx Observable/Observer。我看到有一种方法可以从web socket 创建一个可观察的流(尽管它使用我不需要的双向Subject):

fromWebSocket(address, protocol) {
    var ws = new WebSocket(address, protocol);

    // Handle the data
    var osbervable = Rx.Observable.create (function (obs) {
        // Handle messages  
        ws.onmessage = obs.onNext.bind(obs);
        ws.onerror = obs.onError.bind(obs);
        ws.onclose = obs.onCompleted.bind(obs);

        // Return way to unsubscribe
        return ws.close.bind(ws);
    });

    var observer = Rx.Observer.create(function (data) {
        if (ws.readyState === WebSocket.OPEN) { ws.send(data); }
    });

    return Rx.Subject.create(observer, observable);
}

// Open the connection and obtain the two-way subject.
var socketSubject = fromWebSocket('ws://localhost:9999', 'sampleProtocol');

// Called for every value received from the socket.
function handleData(data) {
    // Do something with the data
}

// Called when the socket reports an error.
function handleError(error) {
    // Do something with the error
}

// Called when the stream completes.
function handleCompletion() {
    // Do something on completion
}

// Receive data
socketSubject.subscribe(handleData, handleError, handleCompletion);

// Send data
socketSubject.onNext(42);

对于 net 套接字,等价的写法是什么?如果有现成的标准库可用就更好了。

我最初的尝试是这样,但我不知道如何将Rx 和套接字函数绑定到onnext

var net = require('net');

// Draft attempt at a net-socket counterpart of fromWebSocket (incomplete).
// As written it creates a server with net.createServer, builds an observable
// inside the per-connection callback, and returns nothing. The `address` and
// `protocol` parameters are not used anywhere in the body.
fromNetSocket(address, protocol) {
    var ns = net.createServer(function(socket) {


        // NOTE(review): Node's net.Socket documents 'close' and 'end' events;
        // 'disconnect' looks like a socket.io event name — confirm.
        socket.on('disconnect', function () { // This seems like it maps to onclose

            // NOTE(review): the format string has two %s placeholders but only
            // socket.id is supplied.
            console.log('User disconnected. %s. Socket id %s', socket.id);
        });

        // Handle incoming messages from clients.
        // The handler body is empty: nothing forwards `data` to the observable yet.
        socket.on('data', function (data) { //this should map to onnext

        });

        // Handle the data
        // The result is stored in `osbervable` (sic) and is never returned or
        // subscribed to within this function.
        var osbervable = Rx.Observable.create (function (obs) {
            // Handle messages
            // These lines assign properties on the server object `ns`, not on
            // `socket`. NOTE(review): presumably carried over from the WebSocket
            // version; the socket.on(...) handlers above are the intended hook
            // points — confirm.
            ns.onmessage = obs.onNext.bind(obs);
            ns.onerror = obs.onError.bind(obs);
            ns.onclose = obs.onCompleted.bind(obs);

            // Return way to unsubscribe
            // Unsubscribing would call close() on the server `ns`.
            return ns.close.bind(ns);
        });        
    });
};  

【问题讨论】:

    标签: node.js rxjs


    【解决方案1】:

    试试下面的代码:

    /**
     * Connects to the TCP server on port 1705 and exposes the connection as
     * an Rx Subject.
     *
     * The returned Observable emits exactly one value once the connection is
     * established — a Subject bound to the socket — and then completes. If the
     * connection attempt fails, the Observable errors instead.
     *
     * Subject behaviour:
     *  - subscribing yields each 'data' chunk parsed as JSON, errors on a
     *    socket 'error' or on a chunk that is not valid JSON, and completes
     *    on 'close';
     *  - next(value) writes JSON.stringify(value) followed by "\r\n" unless
     *    the socket has been destroyed.
     *
     * NOTE(review): every 'data' chunk is parsed as one JSON document. TCP does
     * not preserve message boundaries, so a chunk may hold a partial message or
     * several — confirm the server's framing and buffer on the delimiter if needed.
     *
     * @returns {Rx.Observable} Observable of a single socket-backed Subject.
     */
    const createSubject = () => {

      return Rx.Observable.create((connectionObserver) => {

        // Until the connection is up there is no subject to report errors to,
        // so a failed connect (e.g. connection refused) is delivered to the
        // outer subscriber instead of surfacing as an unhandled 'error' event.
        const onConnectError = (err) => connectionObserver.error(err);

        const socket = net.connect({port: 1705}, () => {

          // From here on, errors are reported through the subject's subscribers.
          socket.removeListener('error', onConnectError);

          log.i('Connected to Server!');

          // Named `subscriber` so it no longer shadows the outer observer.
          let socketObservable = Rx.Observable.create((subscriber) => {
            socket.on('data', (data) => {
              let message;
              try {
                message = JSON.parse(data);
              } catch (err) {
                // A malformed chunk goes to the error channel rather than
                // throwing out of the event listener.
                subscriber.error(err);
                return;
              }
              subscriber.next(message);
            });
            socket.on('error', (err) => subscriber.error(err));
            socket.on('close', () => subscriber.complete());
          });

          let socketObserver = {
            next: (data) => {
              if (!socket.destroyed) {
                socket.write(`${JSON.stringify(data)}\r\n`);
              }
            }
          };

          const subject = Rx.Subject.create(socketObserver, socketObservable);
          connectionObserver.next(subject);
          connectionObserver.complete();

        });

        socket.once('error', onConnectError);

      });

    };
    

    然后就可以像这样使用这个 Subject:

    // Once the connection subject arrives, log every message it emits and
    // send a single JSON-RPC 2.0 request through it.
    const onConnected = (con) => {
      const logMessage = (data) => console.log(data);
      con.subscribe(logMessage);

      const request = {
        id: utils.UUID(),
        jsonrpc: '2.0',
        method: 'Server.GetRPCVersion'
      };
      con.next(request);
    };

    createSubject().subscribe(onConnected);
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2010-12-01
      • 2022-06-01
      • 2020-02-25
      • 1970-01-01
      • 2013-03-18
      • 1970-01-01
      相关资源
      最近更新 更多