【发布时间】:2018-04-02 22:48:53
【问题描述】:
我正在使用 Rx 框架编写消息侦听器。
我面临的问题是,我正在使用的库使用了一个消费者,该消费者在消息到达时发布事件。
我已经设法通过Observable.FromEventPattern 使用传入的消息,但我对服务器中已经存在的消息有疑问。
目前我有以下命令链
- 创建消费者
- 使用
FromEventPattern创建一个可观察序列并应用所需的转换 - 告诉消费者开始
- 订阅序列
最简单的解决方案是交换步骤 3 和 4。但由于它们发生在系统的不同组件中,我很难这样做。
理想情况下,我希望在第 4 步发生时执行第 3 步(例如 OnSubscribe 方法)。
感谢您的帮助:)
PS:添加更多细节,事件来自 RabbitMQ 队列,我使用的是 RabbitMQ.Client 包中的 EventingBasicConsumer 类。
Here 你可以找到我正在开发的库。具体来说,this 是给我带来问题的类/方法。
编辑
这是有问题的代码的剥离版本
void Main()
{
var engine = new Engine();
var messages = engine.Start();
messages.Subscribe(m => m.Dump());
Console.ReadLine();
engine.Stop();
}
public class Engine
{
IConnection _connection;
IModel _channel;
public IObservable<Message> Start()
{
var connectionFactory = new ConnectionFactory();
_connection = connectionFactory.CreateConnection();
_channel = _connection.CreateModel();
EventingBasicConsumer consumer = new EventingBasicConsumer(_channel);
var observable = Observable.FromEventPattern<BasicDeliverEventArgs>(
a => consumer.Received += a,
a => consumer.Received -= a)
.Select(e => e.EventArgs);
_channel.BasicConsume("a_queue", false, consumer);
return observable.Select(Transform);
}
private Message Transform(BasicDeliverEventArgs args) => new Message();
public void Stop()
{
_channel.Dispose();
_connection.Dispose();
}
}
public class Message { }
我遇到的症状是,由于我在订阅序列之前调用了 BasicConsume,所以 RabbitMQ 队列中的任何消息都会被获取但不会沿管道传递。
由于我没有启用“autoack”,因此程序一停止,消息就会返回到队列中。
【问题讨论】:
-
您的意思是您因为订阅太晚而错过了一些消息?
-
没错!我应该解决这个问题吗?
-
这部分似乎很清楚,但正如您自己所说的那样 - 最合理的做法是在开始之前订阅。为什么你不能在控制所有代码的同时做到这一点对我来说不是很清楚。也许只是将可选订阅回调传递给
Start?您也可以使用Replay(),但它会缓存所有消息,这可能不是一个好主意(取决于这些消息的数量)。 -
我没有想到回调,但我想知道是否有解决问题的Rx方法。至于拆分,Host 和 Engine 职责不同,混在一起不好。
-
“重播”是不行的。我正在查看“发布”,但我不确定我是否了解它的工作原理以及应该在哪里使用它。
标签: c# .net rabbitmq system.reactive