【问题标题】:How to use Rx to deliver events on a schedule?如何使用 Rx 按计划交付事件?
【发布时间】:2014-05-29 01:09:00
【问题描述】:

我正在使用 C# 的响应式扩展。我想要几个线程在 ConcurrentQueue 上排队项目。然后我想订阅该队列,但每 1 秒只能获得 1 个元素。这answer 几乎可以工作,但当我向队列中添加更多元素时就不行了。

给定一个整数队列:[1, 2, 3, 4, 5, 6]。我希望 Subscribe(Console.WriteLine) 每秒打印一个值。当 Rx 打印这些数字时,我想将来自另一个线程的更多整数添加到队列中。有什么想法吗?

【问题讨论】:

    标签: c# multithreading system.reactive


    【解决方案1】:

    要使输入流的输出速度不超过Timespan 间隔所描述的速度,请使用以下命令:

    var paced = input.Select(i => Observable.Empty<T>()
                                            .Delay(interval)
                                            .StartWith(i)).Concat();
    

    请参阅here 了解说明。这是一个为快速出列的并发队列量身定制的示例实现。请注意,使用IEnumerable&lt;T&gt;ToObservable 扩展将ConcurrentQueue&lt;T&gt; 直接转换为可观察对象是错误的,因为遗憾的是,一旦队列为空,此可观察对象就会完成。令人讨厌的是——至少在我看来——ConcurrentQueue&lt;T&gt; 上没有异步出队,所以我不得不引入一个轮询机制。其他抽象(例如BlockingCollection&lt;T&gt;)可能会更好地为您服务!

    public static class ObservableExtensions
    {
        public static IObservable<T> Pace<T>(this ConcurrentQueue<T> queue,
                                             TimeSpan interval)
        {
            var source = Observable.Create<T>(async (o, ct) => {
    
                while(!ct.IsCancellationRequested)
                {
                    T next; 
                    while(queue.TryDequeue(out next))
                        o.OnNext(next);
    
                    // You might want to use some arbitrary shorter interval here
                    // to allow the stream to resume after a long delay in source
                    // events more promptly    
                    await Task.Delay(interval, ct);
                }   
    
                ct.ThrowIfCancellationRequested();
            });
    
            // this does the pacing
            return source.Select(i => Observable.Empty<T>()
                         .Delay(interval)
                         .StartWith(i)).Concat()
                         .Publish().RefCount(); // to allow multiple subscribers    
        }
    }
    

    示例用法:

    public static void Main()
    {
        var queue = new ConcurrentQueue<int>();
    
        var stopwatch = new Stopwatch();
    
        queue.Pace(TimeSpan.FromSeconds(1))
            .Subscribe(
                x => Console.WriteLine(stopwatch.ElapsedMilliseconds + ": x" + x),
                e => Console.WriteLine(e.Message),
                () => Console.WriteLine("Done"));
    
        stopwatch.Start();
        queue.Enqueue(1);
        queue.Enqueue(2);
        Thread.Sleep(500);
        queue.Enqueue(3);
        Thread.Sleep(5000);
        queue.Enqueue(4);
        queue.Enqueue(5);
        queue.Enqueue(6);
    
        Console.ReadLine();
    
    }
    

    【讨论】:

    • 此解决方案在多次订阅时会出现问题。比赛条件。
    • 是的,它会的,完全正确 - 我应该指出这一点。虽然仍然可以滥用对 Pace 的多次调用,但我添加了 Publish().RefCount() 以提供适当的保护。
    • 有效!谢谢。这可以推广到扩展 Observables,而不是专门针对 ConcurrentQueue 吗?
    • 当然,通用版是第一个代码sn -p - input是任意IObservable&lt;T&gt;
    【解决方案2】:

    您可能会对Observable.Buffer 过载之一感到满意。但请考虑不要对长时间运行的订阅使用缓冲,因为缓冲的元素会对您的 RAM 造成压力。

    您还可以使用Observable.Generate 构建您自己的具有任何所需行为的扩展方法

    void Main()
    {
        var queue = new ConcurrentQueue<int>();
        queue.Enqueue(1);
        queue.Enqueue(2);
        queue.Enqueue(3);
        queue.Enqueue(4);
        queue.ObserveEach(TimeSpan.FromSeconds(1)).DumpLive("queue");
    }
    
    // Define other methods and classes here
    public static class Ex {
        public static IObservable<T> ObserveConcurrentQueue<T>(this ConcurrentQueue<T> queue, TimeSpan period) 
        {
            return Observable
                .Generate(
                    queue, 
                    x => true,
                    x => x, 
                    x => x.DequeueOrDefault(), 
                    x => period)
                .Where(x => !x.Equals(default(T)));
        }
    
        public static T DequeueOrDefault<T>(this ConcurrentQueue<T> queue)
        {
            T result;
            if (queue.TryDequeue(out result))
                return result;
            else
                return default(T);
        }
    }
    

    【讨论】:

    • 如果我的输入到达的时间小于缓冲区的时间跨度,缓冲区就会起作用。如果我的输入更快,那么我将一次得到一批项目。这不是我们想要的行为。
    • 谢谢!这行得通。当我使用 T=int 进行测试时,Where 子句全部删除了 0。
    猜你喜欢
    • 2023-02-19
    • 2011-03-13
    • 2016-08-12
    • 2022-06-29
    • 1970-01-01
    • 2021-11-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多