【问题标题】:Prioritized subscriptions invocation优先订阅调用
【发布时间】:2013-02-22 15:51:10
【问题描述】:

我有一个可观察的消息序列。有一组订阅者可以处理这些消息。每个订阅者都有一个执行优先级。每条消息必须由从当前订阅的订阅者列表中选择的最高优先级订阅者处理一次。订阅者不断地订阅/取消订阅序列,因此我们在构建序列时不知道订阅者的数量和优先级。使用 rx 是一种可能/可行的解决方案吗?

举例说明:

public class Message
{
    public string Value { get; set; }
    public bool IsConsumed { get; set; }
}

var subject = new Subject<Message>();
var sequence = subject.Publish().RefCount();

Action<Message, int> subscriber = (m, priority) =>
{
    if (!m.IsConsumed)
    {
        m.IsConsumed = true;
        Trace.WriteLine(priority);
    }
};

var s2 = sequence.Priority(2).Subscribe(m => subscriber(m, 2));
var s1 = sequence.Priority(1).Subscribe(m => subscriber(m, 1));

subject.OnNext(new Message()); // output: 1

s1.Dispose();
subject.OnNext(new Message()); // output: 2

使该解决方案有效的缺失部分是 Rx 库中不存在的 Priority 方法。

【问题讨论】:

    标签: c# .net system.reactive


    【解决方案1】:

    这是一个非常有趣的问题...

    所以,首先:我不知道有任何内在的 Rx 运算符可以实现类似于您在此 Priority 扩展中想要的“路由”效果。

    也就是说,我今天午饭时在 LINQPad 上玩耍,想出了一个(非常)老套但似乎可行的概念证明:

    首先,你的消息类

    public class Message
    {
        public string Value { get; set; }
        public bool IsConsumed { get; set; }
    }
    

    接下来,扩展方法wrapper-class:

    public static class Ext
    {    
        public static PrioritizedObservable<T> Prioritize<T>(this IObservable<T> source)
        {
            return new PrioritizedObservable<T>(source);
        }
    }
    

    这是什么PrioritizedObservable&lt;T&gt;

    public class PrioritizedObservable<T> 
       : IObservable<T>, IObserver<T>, IDisposable
    {
        private IObservable<T> _source;
        private ISubject<T,T> _intermediary;
        private IList<Tuple<int, Subject<T>>> _router;
    
        public PrioritizedObservable(IObservable<T> source)
        {
            // Make sure we don't accidentally duplicate subscriptions
            // to the underlying source
            _source = source.Publish().RefCount();
    
            // A proxy from the source to our internal router
            _intermediary = Subject.Create(this, _source);
            _source.Subscribe(_intermediary);        
    
            // Holds per-priority subjects
            _router = new List<Tuple<int, Subject<T>>>();
        }
    
        public void Dispose()
        {
            _intermediary = null;
            foreach(var entry in _router)
            {
                entry.Item2.Dispose();
            }
            _router.Clear();
        }
    
        private ISubject<T,T> GetFirstListener()
        {
            // Fetch the first subject in our router
            // ordered by priority 
            return _router.OrderBy(tup => tup.Item1)
                .Select(tup => tup.Item2)
                .FirstOrDefault();
        }
    
        void IObserver<T>.OnNext(T value)
        {
            // pass along value to first in line
            var nextListener = GetFirstListener();
            if(nextListener != null)
                nextListener.OnNext(value);
        }
    
        void IObserver<T>.OnError(Exception error)
        {
            // pass along error to first in line
            var nextListener = GetFirstListener();
            if(nextListener != null)
                nextListener.OnError(error);
        }
    
        void IObserver<T>.OnCompleted()
        {
            var nextListener = GetFirstListener();
            if(nextListener != null)
                nextListener.OnCompleted();
        }
    
        public IDisposable Subscribe(IObserver<T> obs)
        {
            return PrioritySubscribe(1, obs);
        }
    
        public IDisposable PrioritySubscribe(int priority, IObserver<T> obs)
        {
            var sub = new Subject<T>();
            var subscriber = sub.Subscribe(obs);
            var entry = Tuple.Create(priority, sub);
            _router.Add(entry);
            _intermediary.Subscribe(sub);
            return Disposable.Create(() => 
            {
                subscriber.Dispose();
                _router.Remove(entry);
            });
        }
    }
    

    还有一个测试工具:

    void Main()
    {
        var subject = new Subject<Message>();
        var sequence = subject.Publish().RefCount().Prioritize();
    
        Action<Message, int> subscriber = (m, priority) =>
        {
            if (!m.IsConsumed)
            {
                m.IsConsumed = true;
                Console.WriteLine(priority);
            }
        };
    
        var s3 = sequence.PrioritySubscribe(3, Observer.Create<Message>(m => subscriber(m, 3)));
        var s2 = sequence.PrioritySubscribe(2, Observer.Create<Message>(m => subscriber(m, 2)));
        var s1 = sequence.PrioritySubscribe(1, Observer.Create<Message>(m => subscriber(m, 1)));
        var s11 = sequence.PrioritySubscribe(1, Observer.Create<Message>(m => subscriber(m, 1)));
    
        subject.OnNext(new Message()); // output: 1
    
        s1.Dispose();
        subject.OnNext(new Message()); // output: 1
        s11.Dispose();
    
        subject.OnNext(new Message()); // output: 2
        s2.Dispose();
        subject.OnNext(new Message()); // output: 3
    
        sequence.Dispose();
    
    }
    

    【讨论】:

    • 你真是个天才!非常感谢!
    猜你喜欢
    • 1970-01-01
    • 2016-08-20
    • 2020-05-30
    • 2019-06-17
    • 1970-01-01
    • 1970-01-01
    • 2021-02-18
    • 2013-09-09
    • 1970-01-01
    相关资源
    最近更新 更多