【发布时间】:2016-08-24 00:50:55
【问题描述】:
我想使用 linq 处理通过 websocket 连接接收到的事件。这是我目前所拥有的:
private static void Main()
{
string WsEndpoint = "wss://push.planetside2.com/streaming?environment=ps2&service-id=s:quicktesting";
using (WebSocket ws = new WebSocket(WsEndpoint))
{
ws.OnMessage += Ws_OnMessage;
ws.Connect();
Console.ReadKey();
ws.Close();
}
}
private static void Ws_OnMessage(object sender, MessageEventArgs e)
{
Console.WriteLine(e.Data);
}
第一个困扰我的想法是如何将ws.OnMessage 转换为某种事件流。我在网上找不到任何示例来观察带有响应式扩展的 external 事件源。我打算将消息解析成 json 对象,然后过滤和聚合它们。
有人可以提供一个从 websocket 消息创建 observable 并订阅它的示例吗?
编辑:最终工作代码
与所选答案的唯一区别是我在将 websocket 传递给Observable.Using之前对其进行了初始化
//-------------------------------------------------------
// Create websocket connection
//-------------------------------------------------------
const string wsEndpoint = "wss://push.planetside2.com/streaming?environment=ps2&service-id=s:quicktesting";
WebSocket socket = new WebSocket(wsEndpoint);
//-------------------------------------------------------
// Create an observable by wrapping ws.OnMessage
//-------------------------------------------------------
var globalEventStream = Observable
.Using(
() => socket,
ws =>
Observable
.FromEventPattern<EventHandler<MessageEventArgs>, MessageEventArgs>(
handler => ws.OnMessage += handler,
handler => ws.OnMessage -= handler));
//---------------------------------------------------------
// Subscribe to globalEventStream
//---------------------------------------------------------
IDisposable subscription = globalEventStream.Subscribe(ep =>
{
Console.WriteLine("Event Recieved");
Console.WriteLine(ep.EventArgs.Data);
});
//----------------------------------------------------------
// Send message over websocket
//----------------------------------------------------------
socket.Connect();
socket.Send("test message");
// When finished, close the connection.
socket.Close();
【问题讨论】:
-
你在使用什么
WebSocket库? -
我正在使用
WebSocketSharp -
@mooglinux - 为什么你有一个
.Publish()电话?这将阻止所有值,直到您在 observable 上调用.Connect()。您可能只需删除.Publish()。 -
这是我试图看看发生了什么的事情之一。当然没用,所以我会更新它。
标签: c# linq websocket system.reactive