【问题标题】:Observing incoming websocket messages with Reactive Extensions?使用 Reactive Extensions 观察传入的 websocket 消息?
【发布时间】:2016-08-24 00:50:55
【问题描述】:

我想使用 linq 处理通过 websocket 连接接收到的事件。这是我目前所拥有的:

    private static void Main()
    {
        string WsEndpoint = "wss://push.planetside2.com/streaming?environment=ps2&service-id=s:quicktesting";
        using (WebSocket ws = new WebSocket(WsEndpoint))
        {
            ws.OnMessage += Ws_OnMessage;

            ws.Connect();
            Console.ReadKey();
            ws.Close();
        }
    }

    private static void Ws_OnMessage(object sender, MessageEventArgs e)
    {
        Console.WriteLine(e.Data);
    }

第一个困扰我的想法是如何将ws.OnMessage 转换为某种事件流。我在网上找不到任何示例来观察带有响应式扩展的 external 事件源。我打算将消息解析成 json 对象,然后过滤和聚合它们。

有人可以提供一个从 websocket 消息创建 observable 并订阅它的示例吗?


编辑:最终工作代码

与所选答案的唯一区别是我在将 websocket 传递给Observable.Using之前对其进行了初始化

//-------------------------------------------------------
// Create websocket connection
//-------------------------------------------------------
const string wsEndpoint = "wss://push.planetside2.com/streaming?environment=ps2&service-id=s:quicktesting";
WebSocket socket = new WebSocket(wsEndpoint);


//-------------------------------------------------------
// Create an observable by wrapping ws.OnMessage
//-------------------------------------------------------
var globalEventStream = Observable
    .Using(
        () => socket,
        ws =>
            Observable
                .FromEventPattern<EventHandler<MessageEventArgs>, MessageEventArgs>(
                    handler => ws.OnMessage += handler,
                    handler => ws.OnMessage -= handler));
//---------------------------------------------------------
// Subscribe to globalEventStream
//---------------------------------------------------------

IDisposable subscription = globalEventStream.Subscribe(ep =>
{
    Console.WriteLine("Event Recieved");
    Console.WriteLine(ep.EventArgs.Data);
});

//----------------------------------------------------------
// Send message over websocket
//----------------------------------------------------------
socket.Connect();
socket.Send("test message");
// When finished, close the connection.
socket.Close();

【问题讨论】:

  • 你在使用什么WebSocket 库?
  • 我正在使用WebSocketSharp
  • @mooglinux - 为什么你有一个.Publish() 电话?这将阻止所有值,直到您在 observable 上调用 .Connect()。您可能只需删除.Publish()
  • 这是我试图看看发生了什么的事情之一。当然没用,所以我会更新它。

标签: c# linq websocket system.reactive


【解决方案1】:

你应该像这样设置你的 observable:

    var observable =
        Observable
            .Using(
                () => new WebSocket(WsEndpoint),
                ws =>
                    Observable
                        .FromEventPattern<EventHandler<MessageEventArgs>, MessageEventArgs>(
                            handler => ws.OnMessage += handler,
                            handler => ws.OnMessage -= handler));

这将正确创建套接字,然后在订阅 observable 时观察事件。处理订阅后,它将正确地从事件中分离并处理套接字。


observable 的类型将是 IObservable&lt;EventPattern&lt;MessageEventArgs&gt;&gt;。你以这种方式消费这个 observable:

IDisposable subscription = observable.Subscribe(ep =>
{
    Console.WriteLine(ep.EventArgs.Data);
});

感谢您发布 NuGet 参考。

这是工作代码:

const string WsEndpoint = "wss://push.planetside2.com/streaming?environment=ps2&service-id=s:quicktesting";

Console.WriteLine("Defining Observable:");

IObservable<EventPattern<WebSocketSharp.MessageEventArgs>> observable =
    Observable
        .Using(
            () =>
            {
                var ws = new WebSocketSharp.WebSocket(WsEndpoint);
                ws.Connect();
                return ws;
            },
            ws =>
                Observable
                    .FromEventPattern<EventHandler<WebSocketSharp.MessageEventArgs>, WebSocketSharp.MessageEventArgs>(
                        handler => ws.OnMessage += handler,
                        handler => ws.OnMessage -= handler));

Console.WriteLine("Subscribing to Observable:");

IDisposable subscription = observable.Subscribe(ep =>
{
    Console.WriteLine("Event Recieved");
    Console.WriteLine(ep.EventArgs.Data);
});

Console.WriteLine("Writing to Source:");

using (var source = new WebSocketSharp.WebSocket(WsEndpoint))
{
    source.Connect();
    source.Send("test");
}

【讨论】:

  • 与其他答案相比,将 websocket 放入 Observable.Using 有什么好处?
  • 当你释放 observable 的订阅时,socket 也会被释放。
  • 我是否为handler 编写自定义函数?如果是,返回类型是什么?我试图将handler 更改为打印数据并返回void 的函数,但这不起作用。
  • @mooglinux - 没有必要更改 handler - 事实上,如果你这样做,你很可能会破坏代码。我已经添加了如何使用 observable 的答案。
  • 还是不行。 =/ 我检查了,我的原始版本仍在工作,所以这不是网络问题。将用我现在拥有的内容更新我的问题。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-09-09
  • 2017-09-05
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多