【问题标题】:Rabbit MQ handle cancellation tokenRabbit MQ 处理取消令牌
【发布时间】:2017-02-06 17:03:48
【问题描述】:

我正在关注这篇文章 (http://www.jarloo.com/listening-to-rabbitmq-events/) 以使用控制台应用程序上的消息,目前我担心当用户按下 CTRL + C 并退出应用程序时会发生什么。

至少我希望它在退出程序之前完成当前消息的处理并确认。我很困惑如何实现此代码,因为我是 RabbitMQ 的新手。

我了解 channel.BasicConsume(queueName, true, consumer);正在阻塞线程。

任何帮助将不胜感激。

【问题讨论】:

  • 如果在用户按下 ctrl + c 时我没有确认消息,我是否不会处理相同的消息两次。

标签: c# .net rabbitmq


【解决方案1】:

这是我设法实现的目标,但不确定这是最好的方法还是有改进。我没有使用cancellationToken,还不确定我们是否可以使用它。

在我的控制台上,我得到了预期的结果(见截图)

public abstract class QueueConnection : IDisposable
{
    internal IConnection _connection;
    internal IModel _model;
    internal IBasicProperties _properties;
    internal RabbitMQSettings _settings;
    internal protected object _modelLock = new Object();

    public QueueConnection(IOptions<RabbitMQSettings> queueConfig)
    {
        _settings = queueConfig.Value;
    }

    internal bool CreateModel(string queueName)
    {
        if (string.IsNullOrEmpty(queueName))
        {
            throw new ArgumentException("The queue name has to be specified before.");
        }

        lock (_modelLock)
        {
            if (!IsConnected) Connect();
            if (_model == null || _model.IsClosed)
            {
                _model = _connection.CreateModel();

                // When AutoClose is true, the last channel to close will also cause the connection to close. 
                // If it is set to true before any channel is created, the connection will close then and there.
                _connection.AutoClose = true;

                // Configure the Quality of service for the model. Below is how what each setting means.
                // BasicQos(0="Dont send me a new message untill I’ve finshed",  1= "Send me one message at a time", false ="Apply to this Model only")
                _model.BasicQos(0, 50, false);

                const bool durable = true, queueAutoDelete = false, exclusive = false;

                _model.QueueDeclare(queueName, durable, exclusive, queueAutoDelete, null);
                _properties = RabbitMQProperties.CreateDefaultProperties(_model);
            }
        }

        return true;
    }

    public void Connect()
    {
        var connectionFactory = new ConnectionFactory
        {
            HostName = _settings.HostName,
            UserName = _settings.UserName,
            Password = _settings.Password,
        };


        if (_settings.Port.HasValue) connectionFactory.Port = _settings.Port.Value;
        if (_settings.Heartbeat.HasValue) connectionFactory.RequestedHeartbeat = _settings.Heartbeat.Value;
        if (!string.IsNullOrEmpty(_settings.VirtualHost)) connectionFactory.VirtualHost = _settings.VirtualHost;


        _connection = connectionFactory.CreateConnection();
    }

    public bool IsConnected
    {
        get { return _connection != null && _connection.IsOpen; }
    }

    public object GetConnection()
    {
        return _connection;
    }

    public void Disconnect()
    {
        if (_connection != null) _connection.Dispose();
    }

    void IDisposable.Dispose()
    {
        Disconnect();
    }
}

队列消费者类

public class QueueConsumer : QueueConnection, IQueueConsumer
{
    private EventingBasicConsumer consumer;
    public QueueConsumer(IOptions<RabbitMQSettings> queueConfig)
        :base(queueConfig) {}

    public void ReadFromQueue(Action<string, ulong> onDequeue, Action<Exception, ulong> onError)
    {
        ReadFromQueue(onDequeue, onError, _settings.QueueName);
    }

    public void ReadFromQueue(Action<string, ulong> onDequeue, Action<Exception, ulong> onError, string queueName)
    {

        CreateModel(queueName);

        consumer = new EventingBasicConsumer(_model);

        // Receive the messages
        consumer.Received += (o, e) =>
        {
            try
            {
                var queueMessage = Encoding.UTF8.GetString(e.Body);
                onDequeue.Invoke(queueMessage, e.DeliveryTag);
            }
            catch (Exception ex)
            {
                onError.Invoke(ex, e.DeliveryTag);
            }
        };

        // if the consumer shutdown reconnects to rabbitmq and begin reading from the queue again.
        consumer.Shutdown += (o, e) =>
        {
            CreateModel(queueName);
            ReadFromQueue(onDequeue, onError, queueName);
        };

        _model.BasicConsume(queueName, false, consumer);

    }

    public void AcknowledgeMessage(ulong deliveryTag)
    {
        if (!IsConnected) Connect();
        CreateModel(_settings.QueueName);
        _model.BasicAck(deliveryTag, false);
    }

    public void StopListening()
    {
        _model.BasicCancel(consumer.ConsumerTag);
    }
}

主类

    static ManualResetEvent _quitEvent = new ManualResetEvent(false);


    public static void Main(string[] args)
    {

        IServiceCollection services = new ServiceCollection();
        ConfigureServices(services);

        var serviceProvider = services.BuildServiceProvider();

        Console.WriteLine($"[{DateTime.UtcNow.ToString("dd/MM/yyyy HH:mm:ss")}] -> Worker role started");

        var listener = serviceProvider.GetService<IMessageProcessor>();

        Console.CancelKeyPress += (sender, eArgs) =>
        {
            listener.OnStop();

            Console.WriteLine($"[{DateTime.UtcNow.ToString("dd/MM/yyyy HH:mm:ss")}] -> Worker role finished");
            _quitEvent.Set();
            eArgs.Cancel = true;
        };

        _quitEvent.WaitOne();
    }

    private static IConfigurationRoot GetConfiguration()
    {
        // Build appsetting.json configuration
        var environment = Environment.GetEnvironmentVariable("Environment");

        return new ConfigurationBuilder()
            .SetBasePath(Directory.GetCurrentDirectory())
            .AddJsonFile("appsettings.json", optional: true, reloadOnChange: true)
            .AddJsonFile($"appsettings.{environment}.json", optional: true)
            .AddEnvironmentVariables().Build();
    }

    private static void ConfigureServices(IServiceCollection services)
    {
        IConfigurationRoot configuration = GetConfiguration();
        services.AddSingleton<IConfigurationRoot>(configuration);

        // Support typed options
        services.AddOptions();

        services.Configure<RabbitMQSettings>(configuration.GetSection("RabbitMQConfig"));

        services.AddSingleton<IQueueConsumer, QueueConsumer>();
        services.AddScoped<IMessageProcessor, MessageProcessor>();

    }
}

【讨论】:

    【解决方案2】:

    对于您试图解决的问题,我唯一能想到的就是将其视为交易。只有当您收到消息时,对其进行完全处理并发送回确认,才认为交易已完成。

    如果您首先处理消息并且有人在 Ack 之前终止了应用程序,它将再次排队,当我们重新启动应用程序时,它将再次得到处理。

    如果您先确认然后尝试处理消息并且有人终止应用程序,您将丢失消息。

    所以看起来像是考虑整个过程,因为事务会使其工作,或者另一个选项是照顾再次处理相同的消息。

    【讨论】:

    • 问题是当我处理消息时,我需要调用外部服务来更新内容,以便有人终止应用程序我想保持终止并完成该消息的处理,即优雅地终止。
    猜你喜欢
    • 1970-01-01
    • 2016-11-02
    • 1970-01-01
    • 1970-01-01
    • 2018-07-30
    • 2023-01-31
    • 2018-03-02
    • 2016-10-25
    • 1970-01-01
    相关资源
    最近更新 更多