【Question title】: Merging historical and live stock price data with Rx
【Posted】: 2016-01-20 00:53:38
【Question description】:

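I'm trying out Rx because it seems like a good fit for our domain, but the learning curve has taken me by surprise.

I need to knit together historical price data with live price data.

I'm trying to adapt the usual approach for doing this into the language of Rx: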

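  1. Immediately subscribe to live prices and start buffering the values I get back
  2. Kick off a request for historical price data (this has to happen after subscribing to live prices, so that there are no gaps in our data)
  3. Publish historical prices as they come back
  4. Once we've received all the historical data, publish the buffered live data, dropping any values at the start that overlap our historical data
  5. Carry on replaying data from the live price feed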
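A plain C# sketch of steps 3 to 5, without Rx, may make the intended ordering concrete. It is a pull-based model rather than an Rx operator: `PriceTick`, `Stitcher` and the whole-second `long` timestamps are hypothetical, and steps 1 and 2 (subscribing to live before requesting history) are taken as already done. Comparing timestamps alone is only safe if live timestamps strictly increase; the answers below turn to sequence ids for that reason.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical tick type: the question's Tick is not shown, so timestamps are
// modelled here as whole seconds in a long.
public sealed class PriceTick
{
    public PriceTick(long timestamp, decimal price)
    {
        Timestamp = timestamp;
        Price = price;
    }

    public long Timestamp { get; }
    public decimal Price { get; }
}

public static class Stitcher
{
    // `history` is the complete historical response; `live` is everything the live
    // subscription produced, starting with the values buffered during step 1.
    public static IEnumerable<PriceTick> Stitch(
        IEnumerable<PriceTick> history, IEnumerable<PriceTick> live)
    {
        // Step 3: publish history as it arrives, remembering where it ends.
        long? lastHistorical = null;
        foreach (var tick in history)
        {
            lastHistorical = tick.Timestamp;
            yield return tick;
        }

        var caughtUp = false;
        foreach (var tick in live)
        {
            // Step 4: discard leading live ticks that history already covered.
            if (!caughtUp && lastHistorical.HasValue && tick.Timestamp <= lastHistorical.Value)
                continue;

            // Step 5: once past the overlap, stop checking and pass everything through.
            caughtUp = true;
            yield return tick;
        }
    }
}
```

With history at seconds 1 to 3 and live ticks at 2 to 5, the stitched output covers seconds 1 to 5 once each.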

I have this disgusting and incorrect strawman code, which seems to work for the naive test cases I've written:

IConnectableObservable<Tick> live = liveService
    .For(symbol)
    .Replay(/* Some appropriate buffer size */);
live.Connect();

IObservable<Tick> historical = historyService.For(since, symbol);

return new[] {historical, live}
    .Concat()
    .Where(TicksAreInChronologicalOrder());

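private static Func<Tick, bool> TicksAreInChronologicalOrder()
{
    // Some stateful predicate comparing the timestamp of this tick
    // to the timestamp of the last tick we saw (assumes Tick.Timestamp is a DateTime)
    DateTime? lastSeen = null;
    return tick =>
    {
        if (lastSeen.HasValue && tick.Timestamp <= lastSeen.Value)
            return false; // not newer than what we've already let through
        lastSeen = tick.Timestamp;
        return true;
    };
}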

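This has some drawbacks:

  1. I don't know an appropriate replay buffer size. An unbounded buffer isn't an option: this is a long-running sequence. What we really want is some kind of one-shot buffer that is flushed on the first call to Subscribe. If that exists in Rx, I couldn't find it.
  2. The replay buffer will keep existing even after we've switched to publishing live prices. We don't need the buffer at that point.
  3. Similarly, once we've skipped the initial overlap between historical and live prices, we no longer need the predicate that filters out overlapping ticks. What I'd really like to do is something like: live.SkipWhile(tick => tick.Timestamp < /* lazily get last timestamp in historical data */). Would Wait(this IObservable<TSource>) be useful here?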
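The "lazily get the last timestamp" idea in point 3 can work through a closure, because the predicate is only evaluated once the second sequence is actually consumed. The LINQ-to-Objects sketch below (ticks reduced to hypothetical `(Timestamp, Price)` tuples; `LazyOverlap` is an illustrative name) shows the mechanism. Rx's Concat likewise subscribes to the next source only after the previous one completes, so the same captured variable should be visible to `live.SkipWhile(...)` without blocking on Wait; note that the captured variable is shared by every enumeration or subscription of the result.

```csharp
using System.Collections.Generic;
using System.Linq;

public static class LazyOverlap
{
    // Ticks are hypothetical (Timestamp, Price) tuples for brevity.
    public static IEnumerable<(long Timestamp, decimal Price)> Merge(
        IEnumerable<(long Timestamp, decimal Price)> history,
        IEnumerable<(long Timestamp, decimal Price)> live)
    {
        long? lastHistorical = null;

        // Record each historical timestamp as it flows past.
        var trackedHistory = history.Select(tick =>
        {
            lastHistorical = tick.Timestamp;
            return tick;
        });

        // This predicate reads lastHistorical only when live is enumerated, which
        // Concat does after history is exhausted, so it sees the final value.
        var liveAfterHistory = live.SkipWhile(tick =>
            lastHistorical.HasValue && tick.Timestamp <= lastHistorical.Value);

        return trackedHistory.Concat(liveAfterHistory);
    }
}
```

SkipWhile stops evaluating its predicate after the first tick that passes, which is the "stop filtering once the overlap is gone" behaviour point 3 asks for.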

There must be a better way to do this, but I'm still waiting for my brain to grok Rx the way it groks FP.

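Another option I've considered for solving 1. is writing my own Rx extension: an ISubject that queues messages until it gets its first subscriber (and then rejects further subscribers?). Maybe that's the way to go?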
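The queue-until-first-subscriber subject described above can be sketched with nothing but the BCL's `IObservable<T>` and `IObserver<T>`. `OneShotBuffer<T>` and `CollectingObserver<T>` are hypothetical names, not Rx types. Unlike a Replay buffer, the queue is released after the first flush and later subscribers are rejected; the lock is held while calling the observer, which keeps ordering simple but is not tuned for throughput.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical one-shot buffer (not part of Rx): queues notifications until the first
// subscriber arrives, flushes them to it, then releases the queue and passes values through.
public sealed class OneShotBuffer<T> : IObserver<T>, IObservable<T>
{
    private readonly object gate = new object();
    private Queue<T> queue = new Queue<T>(); // null once flushed
    private IObserver<T> observer;           // null while buffering or after unsubscribe
    private Exception error;
    private bool completed;

    public void OnNext(T value)
    {
        lock (gate) { if (queue != null) queue.Enqueue(value); else observer?.OnNext(value); }
    }

    public void OnError(Exception e)
    {
        lock (gate) { if (queue != null) error = e; else observer?.OnError(e); }
    }

    public void OnCompleted()
    {
        lock (gate) { if (queue != null) completed = true; else observer?.OnCompleted(); }
    }

    public IDisposable Subscribe(IObserver<T> newObserver)
    {
        lock (gate)
        {
            if (queue == null)
                throw new InvalidOperationException("Only the first subscriber is accepted.");

            // Replay everything buffered so far, then drop the buffer for good.
            foreach (var value in queue) newObserver.OnNext(value);
            queue = null;

            if (error != null) newObserver.OnError(error);
            else if (completed) newObserver.OnCompleted();
            else observer = newObserver;

            return new Unsubscriber(this);
        }
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly OneShotBuffer<T> owner;
        public Unsubscriber(OneShotBuffer<T> owner) { this.owner = owner; }

        // Later values are simply dropped; the buffer does not start queueing again.
        public void Dispose() { lock (owner.gate) owner.observer = null; }
    }
}

// Minimal observer that records values, used in the usage example.
public sealed class CollectingObserver<T> : IObserver<T>
{
    public List<T> Values { get; } = new List<T>();
    public bool Completed { get; private set; }
    public void OnNext(T value) => Values.Add(value);
    public void OnError(Exception error) => throw error;
    public void OnCompleted() => Completed = true;
}
```

In use, the buffer would subscribe to the live feed immediately (`liveFeed.Subscribe(buffer)`), and the concatenation would subscribe to the buffer only after history completes.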

【Question discussion】:

  • Would Switch() work here? As in: historical.Switch(live)

Tags: system.reactive


【Solution 1】:

If both your historical and live data are time- or scheduler-based, that is, the event streams look something like this over time:

|---------------------------------------------------->  time
    h   h   h   h  h  h                                 historical
                l  l  l  l  l  l                        live

you can use a simple TakeUntil construct:

var historicalStream = <fetch historical data>;
var liveStream = <fetch live data>;

var mergedWithoutOverlap = 
     // pull from historical
     historicalStream
       // until we start overlapping with live
       .TakeUntil(liveStream)
       // then continue with live data
       .Concat(liveStream);

If you get all of the historical data in one go, e.g. as an IEnumerable<T>, you can use a combination of StartWith and some other logic:

var historicalData = <get IEnumerable of tick data>;
var liveData = <get IObservable of tick data>;

var mergedWithOverlap = 
    // the observable is the "long running" feed
    liveData
    // But we'll inject the historical data in front of it
    .StartWith(historicalData)
    // Perform filtering based on your needs
    .Where( .... );

【Discussion】:

    【Solution 2】:

    How about this:

    public static IObservable<T> CombineWithHistory<T, TSelectorResult>(this IObservable<T> live, IObservable<T> history, Func<T, TSelectorResult> selector)
    {
        var replaySubject = new ReplaySubject<T>();
        live.Subscribe(replaySubject);
        return history.Concat(replaySubject).Distinct(selector);
    }
    

    This uses a sequence id and Distinct to filter out duplicates.

    And the corresponding test:
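    Distinct has to remember every key it has ever seen. If the sequence ids are assumed to increase monotonically across both feeds (an assumption the answer does not state), remembering only the highest id emitted so far is enough, which keeps memory constant. A plain C# sketch over IEnumerable; `DistinctByIncreasingId` is a hypothetical helper name, and an out-of-order id would be silently dropped by it:

```csharp
using System;
using System.Collections.Generic;

public static class MonotonicDedupe
{
    // Yields an item only if its id is greater than every id yielded before it.
    // Assumes ids increase monotonically (hypothetical); memory use is O(1).
    public static IEnumerable<T> DistinctByIncreasingId<T>(
        this IEnumerable<T> source, Func<T, long> idSelector)
    {
        long? highest = null;
        foreach (var item in source)
        {
            var id = idSelector(item);
            if (highest.HasValue && id <= highest.Value)
                continue; // already emitted: this is the history/live overlap
            highest = id;
            yield return item;
        }
    }
}
```

    For history ids 1 to 4 followed by live ids 3 to 6, this yields 1 to 6 once each.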

    var testScheduler = new TestScheduler();
    
    var history = testScheduler.CreateColdObservable(
        OnNext(1L, new PriceTick { PriceId = 1 }),
        OnNext(2L, new PriceTick { PriceId = 2 }),
        OnNext(3L, new PriceTick { PriceId = 3 }),
        OnNext(4L, new PriceTick { PriceId = 4 }),
        OnCompleted<PriceTick>(5L));
    
    var live = testScheduler.CreateHotObservable(
        OnNext(1L, new PriceTick { PriceId = 3 }),
        OnNext(2L, new PriceTick { PriceId = 4 }),
        OnNext(3L, new PriceTick { PriceId = 5 }),
        OnNext(4L, new PriceTick { PriceId = 6 }),
        OnNext(5L, new PriceTick { PriceId = 7 }),
        OnNext(6L, new PriceTick { PriceId = 8 }),
        OnNext(7L, new PriceTick { PriceId = 9 })
        );
    
    
    live.Subscribe(pt => Console.WriteLine("Live {0}", pt.PriceId));
    history.Subscribe(pt => Console.WriteLine("Hist {0}", pt.PriceId), () => Console.WriteLine("C"));
    
    var combined = live.CombineWithHistory(history, t => t.PriceId);
    
    combined.Subscribe(pt => Console.WriteLine("Combined {0}", pt.PriceId));
    
    testScheduler.AdvanceTo(6L);
    

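    If you run this test, combined emits price ticks with ids 1 through 8.

    【Discussion】:

    • Thanks Dave, this taught me some nice new tricks with schedulers. The problems with using Distinct in our case are: 1) Because the data comes back in chunks of one or more ticks rather than as single values, we'd have to SelectMany before calling the selector. Given our data volumes and performance requirements, that isn't an option. 2) To save memory, our ticks have timestamps accurate only to the second. We can have several ticks with the same value within one second, so it's impossible to write a semantically correct selector function without state.
    • I've actually improved this answer, since I'd done this myself but forgot to post the code. In the end I basically queue the ticks and then flush them once the history is done. You need some kind of sequence id to make sure you don't lose any data.
    • Also, the code I provided doesn't meet your requirements, because the replay subject will hold the entire history.
    • What are your performance requirements? I've used this in high-performance scenarios.
    • @DaveHillier Your solution is bad and very ugly. Think it through with me: you store every value in a ReplaySubject and then you use Distinct. In short, among other problems, for every object the "live" subscription receives, you store twice as many references as needed (Distinct uses a HashSet internally).
    【Solution 3】:

    For the record, this is what I ended up doing. I'm still an Rx learner, returning to .NET after last looking at it around version 2.0. All feedback is very welcome.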

    The Ticks object used below may contain one or more tick values. The historical data service returns its data as multiple Ticks.
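    The Ticks type itself isn't shown. A hypothetical minimal version, assuming a chunk is a list of (Timestamp, Price) values with timestamps in whole seconds (under that assumption `definitelyInHistoricalTicks + 1` below would mean "one second later"), illustrates how RemoveAtOrAfter and RemoveBefore could split the data at a single cut-off:

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the answer's Ticks type, which the post does not show:
// a chunk of (Timestamp, Price) values, timestamps in whole seconds.
public sealed class Ticks
{
    public Ticks(IEnumerable<(long Timestamp, decimal Price)> values)
    {
        Values = values.ToList();
    }

    public IReadOnlyList<(long Timestamp, decimal Price)> Values { get; }

    // Throws on an empty chunk.
    public long LastTimestamp => Values[Values.Count - 1].Timestamp;

    // Live side: keep only values at or after the cut-off.
    public Ticks RemoveBefore(long cutoff) =>
        new Ticks(Values.Where(v => v.Timestamp >= cutoff));

    // Historical side: keep only values strictly before the cut-off.
    public Ticks RemoveAtOrAfter(long cutoff) =>
        new Ticks(Values.Where(v => v.Timestamp < cutoff));
}
```

    Because the historical side keeps timestamps below the cut-off and the live side keeps those at or above it, each timestamp should end up on exactly one side of the seam.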

    public class HistoricalAndLivePriceFeed : IPriceFeed
    {
        private readonly IPriceFeed history;
        private readonly IPriceFeed live;
        private readonly IClock clock;
    
        public HistoricalAndLivePriceFeed(IPriceFeed history, IPriceFeed live)
            : this(history, live, new RealClock())
        {
        }
        public HistoricalAndLivePriceFeed(IPriceFeed history, IPriceFeed live, IClock clock)
        {
            this.history = history;
            this.live = live;
            this.clock = clock;
        }
    
        public IObservable<Ticks> For(DateTime since, ISymbol symbol)
        {
            return Observable.Create<Ticks>(observer =>
            {
                var liveStream = Buffer<Ticks>.StartBuffering(live.For(since, symbol));
    
                var definitelyInHistoricalTicks = clock.Now;
                // Sleep to make sure that historical data overlaps our live data
                // If we ever use a data provider with less fresh historical data, we may need to rethink this
                clock.Wait(TimeSpan.FromSeconds(1));
    
                var liveStreamAfterEndOfHistoricalTicks = liveStream
                   .SkipWhile(ticks => ticks.LastTimestamp <= definitelyInHistoricalTicks)
                   .Select(ticks => ticks.RemoveBefore(definitelyInHistoricalTicks + 1));
    
                var subscription = history.For(since, symbol)
                   .Select(historicalTicks => historicalTicks.RemoveAtOrAfter(definitelyInHistoricalTicks + 1))
                   .Concat(liveStreamAfterEndOfHistoricalTicks)
                   .Subscribe(observer);
    
                return liveStream.And(subscription);
            });
        }
    }
    public static class CompositeDisposableExtensions
    {
        public static CompositeDisposable And(this IDisposable disposable, Action action)
        {
            return And(disposable, Disposable.Create(action));
        }
    
        public static CompositeDisposable And(this IDisposable disposable, IDisposable other)
        {
            return new CompositeDisposable(disposable, other);
        }
    }
    

    It uses this Rx code, which I still don't entirely trust:

    using System;
    using System.Collections.Generic;
    using System.Reactive.Disposables;
    using System.Reactive.Subjects;
    
    namespace My.Rx
    {
        /// <summary>
        /// Buffers values from an underlying observable when no observers are subscribed.
        /// 
        /// On Subscription, any buffered values will be replayed.
        /// 
        /// Only supports one observer for now.
        /// 
        /// Buffer is an ISubject for convenience of implementation but IObserver methods
        /// are hidden. It is not intended that Buffer should be used as an IObserver,
        /// except through StartBuffering() and it is dangerous to do so because none of 
        /// the IObserver methods check whether Buffer has been disposed.
        /// </summary>
        /// <typeparam name="TSource"></typeparam>
        public class Buffer<TSource> : ISubject<TSource>, IDisposable
        {
            private readonly object gate = new object();
            private readonly Queue<TSource> queue = new Queue<TSource>();
    
            private bool isDisposed;
            private Exception error;
            private bool stopped;
            private IObserver<TSource> observer = null;
            private IDisposable subscription;
    
            public static Buffer<TSource> StartBuffering(IObservable<TSource> observable)
            {
                return new Buffer<TSource>(observable);
            }
    
            private Buffer(IObservable<TSource> observable)
            {
                subscription = observable.Subscribe(this);
            }
    
            void IObserver<TSource>.OnNext(TSource value)
            {
                lock (gate)
                {
                    if (stopped) return;
                    if (IsBuffering)
                        queue.Enqueue(value);
                    else
                        observer.OnNext(value);
                }
            }
    
            void IObserver<TSource>.OnError(Exception error)
            {
                lock (gate)
                {
                    if (stopped) return;
                    if (IsBuffering)
                        this.error = error;
                    else
                        observer.OnError(error);
                    stopped = true;
                }
            }
    
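            void IObserver<TSource>.OnCompleted()
            {
                lock (gate)
                {
                    if (stopped) return;
                    // Forward completion if someone is listening; otherwise it is replayed on Subscribe
                    if (!IsBuffering)
                        observer.OnCompleted();
                    stopped = true;
                }
            }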
    
            public IDisposable Subscribe(IObserver<TSource> observer)
            {
                lock (gate)
                {
                    if (isDisposed)
                        throw new ObjectDisposedException(string.Empty);
    
                    if (this.observer != null)
                        throw new NotImplementedException("A Buffer can currently only support one observer at a time");
    
                    while (queue.Count > 0)
                    {
                        observer.OnNext(queue.Dequeue());
                    }
    
                    if (error != null)
                        observer.OnError(error);
                    else if (stopped)
                        observer.OnCompleted();
    
                    this.observer = observer;
                    return Disposable.Create(() =>
                    {
                        lock (gate)
                        {
                            // Go back to buffering
                            this.observer = null;
                        }
                    });
                }
            }
    
            private bool IsBuffering
            {
                get { return observer == null; }
            }
    
    
            public void Dispose()
            {
                lock (gate)
                {
                    if (isDisposed) return;
                    subscription.Dispose();
    
                    isDisposed = true;
                    subscription = null;
                    observer = null;
                }
            }
        }
    }
    

    which passes these tests (I haven't bothered checking thread safety yet):

    private static readonly Exception exceptionThrownFromUnderlying = new Exception("Hello world");
    
    [Test]
    public void ReplaysBufferedValuesToFirstSubscriber()
    {
        var underlying = new Subject<int>();
        var buffer = Buffer<int>.StartBuffering(underlying);
        underlying.OnNext(1);
        underlying.OnNext(2);
    
        var observed = new List<int>();
    
        buffer.Subscribe(Observer.Create<int>(observed.Add));
    
        Assert.That(observed, Is.EquivalentTo(new []{1,2}));
    }
    
    [Test]
    public void PassesNewValuesToObserver()
    {
        var underlying = new Subject<int>();
        var buffer = Buffer<int>.StartBuffering(underlying);
    
        var observed = new List<int>();
        buffer.Subscribe(Observer.Create<int>(observed.Add));
    
        underlying.OnNext(1);
        underlying.OnNext(2);
    
        Assert.That(observed, Is.EquivalentTo(new[] { 1, 2 }));
    }
    
    
    [Test]
    public void DisposesOfSubscriptions()
    {
        var underlying = new Subject<int>();
        var buffer = Buffer<int>.StartBuffering(underlying);
    
        var observed = new List<int>();
    
        buffer.Subscribe(Observer.Create<int>(observed.Add))
            .Dispose();
    
        underlying.OnNext(1);
    
        Assert.That(observed, Is.Empty);
    }
    
    [Test]
    public void StartsBufferingAgainWhenSubscriptionIsDisposed()
    {
        var underlying = new Subject<int>();
        var buffer = Buffer<int>.StartBuffering(underlying);
    
        // These should be buffered
        underlying.OnNext(1);
        underlying.OnNext(2);
    
        var firstSubscriptionObserved = new List<int>();
        using (buffer.Subscribe(Observer.Create<int>(firstSubscriptionObserved.Add)))
        {
            // Should be passed through to first subscription
            underlying.OnNext(3);
        }
        Assert.That(firstSubscriptionObserved, Is.EquivalentTo(new[] { 1, 2, 3 }));
    
        // First subscription has been disposed-
        // we should be back to buffering again
        underlying.OnNext(4);
        underlying.OnNext(5);
    
        var secondSubscriptionObserved = new List<int>();
        using (buffer.Subscribe(Observer.Create<int>(secondSubscriptionObserved.Add)))
        {
            // Should be passed through to second subscription
            underlying.OnNext(6);
        }
        Assert.That(secondSubscriptionObserved, Is.EquivalentTo(new[] { 4, 5 ,6}));
    }
    
    [Test]
    public void DoesNotSupportTwoConcurrentObservers()
    {
        // Use .Publish() if you need to do this
    
        var underlying = new Subject<int>();
        var buffer = Buffer<int>.StartBuffering(underlying);
    
        buffer.Subscribe(Observer.Create<int>(i => { }));
    
        Assert.Throws<NotImplementedException>(() => buffer.Subscribe(Observer.Create<int>(i => { })));
    }
    
    [Test]
    public void CannotBeUsedAfterDisposal()
    {
        var underlying = new Subject<int>();
        var buffer = Buffer<int>.StartBuffering(underlying);
        buffer.Dispose();
    
        Assert.Throws<ObjectDisposedException>(() => buffer.Subscribe(Observer.Create<int>(i => { })));
    }
    
    [Test]
    public void ReplaysBufferedError()
    {
        var underlying = new Subject<int>();
        var buffer = Buffer<int>.StartBuffering(underlying);
    
        underlying.OnNext(1);
        underlying.OnError(exceptionThrownFromUnderlying);
    
        var observed = new List<int>();
        Exception foundException = null;
        buffer.Subscribe(
            observed.Add, 
            e => foundException = e);
    
        Assert.That(observed, Is.EquivalentTo(new []{1}));
        Assert.That(foundException, Is.EqualTo(exceptionThrownFromUnderlying));
    }
    
    [Test]
    public void ReplaysBufferedCompletion()
    {
        var underlying = new Subject<int>();
        var buffer = Buffer<int>.StartBuffering(underlying);
    
        underlying.OnNext(1);
        underlying.OnCompleted();
    
        var observed = new List<int>();
        var completed = false;
        buffer.Subscribe(
            observed.Add,
            () => completed=true);
    
        Assert.That(observed, Is.EquivalentTo(new[] { 1 }));
        Assert.True(completed);
    }
    
    [Test]
    public void ReplaysBufferedErrorToSubsequentObservers()
    {
        var underlying = new Subject<int>();
        var buffer = Buffer<int>.StartBuffering(underlying);
    
        underlying.OnNext(1);
        underlying.OnError(exceptionThrownFromUnderlying);
    
        // Drain value queue
        using (buffer.Subscribe(Observer.Create<int>(i => { }, e => { }))) ;
    
        var observered = new List<int>();
        Exception exceptionEncountered = null;
        using (buffer.Subscribe(Observer.Create<int>(observered.Add, e=>exceptionEncountered=e)));
    
        Assert.That(observered, Is.Empty);
        Assert.That(exceptionEncountered, Is.EqualTo(exceptionThrownFromUnderlying));
    }
    
    [Test]
    public void ReplaysBufferedCompletionToSubsequentObservers()
    {
        var underlying = new Subject<int>();
        var buffer = Buffer<int>.StartBuffering(underlying);
    
        underlying.OnNext(1);
        underlying.OnCompleted();
    
        // Drain value queue
        using (buffer.Subscribe(Observer.Create<int>(i => { }))) ;
    
        var observered = new List<int>();
        var completed = false;
        using (buffer.Subscribe(Observer.Create<int>(observered.Add, ()=>completed=true)));
    
        Assert.That(observered, Is.Empty);
        Assert.True(completed);
    }
    
    
    
    [Test]
    public void DisposingOfBufferDisposesUnderlyingSubscription()
    {
        var underlyingSubscriptionWasDisposed = false;
        var underlying = Observable.Create<int>(observer => Disposable.Create(() => underlyingSubscriptionWasDisposed = true));
    
        var buffer = Buffer<int>.StartBuffering(underlying);
        buffer.Dispose();
    
        Assert.True(underlyingSubscriptionWasDisposed);
    }
    

    【Discussion】:

      【Solution 4】:

      A convenient approach in terms of both memory and trade overlap (correctness).
      Looking forward to your feedback:

      var tradeIds = new HashSet<string>();
      var replayQuotationTrades = new ReplaySubject<IntradayTrade>();
      var replaySubscription = _quotationTrades.Subscribe(replayQuotationTrades);
      return _historyTrades
                      .DelaySubscription(TimeSpan.FromMilliseconds(500), _backgroundScheduler)
                      .Do(t => tradeIds.Add(t.TradeId))
                      .Finally(() => DisposeAndCompleteReplayStream(replaySubscription, replayQuotationTrades))
                      .Concat(replayQuotationTrades.Where(t => !tradeIds.Contains(t.TradeId)))
                      .Finally(tradeIds.Clear)
                      .Concat(_quotationTrades)
                      .Subscribe(observer);
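      Stripped of Rx and scheduling, the de-duplication in this pipeline reduces to three phases. A plain C# sketch with trades reduced to hypothetical string ids; `TradeStitcher` is an illustrative name, not part of the answer, and each trade id is assumed to be unique:

```csharp
using System.Collections.Generic;

public static class TradeStitcher
{
    // Hypothetical model of the answer's pipeline with trades reduced to their ids.
    public static List<string> Stitch(
        IEnumerable<string> historyIds,
        IEnumerable<string> replayedLiveIds,
        IEnumerable<string> laterLiveIds)
    {
        var seen = new HashSet<string>();
        var output = new List<string>();

        // History first, remembering each id (the .Do(t => tradeIds.Add(...)) step).
        foreach (var id in historyIds)
        {
            seen.Add(id);
            output.Add(id);
        }

        // Live trades captured while history loaded, minus those history already had.
        foreach (var id in replayedLiveIds)
            if (!seen.Contains(id))
                output.Add(id);

        // The set is no longer needed (the .Finally(tradeIds.Clear) step);
        // trades from the live feed after this point pass through unfiltered.
        seen.Clear();
        foreach (var id in laterLiveIds)
            output.Add(id);

        return output;
    }
}
```

      The set only lives as long as the overlap window, which is where the memory saving comes from compared with keeping a HashSet for the whole lifetime of the stream.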
      

      【Discussion】: