【问题标题】:System.Reactive - Disposing unknown amount of subscriptions at once(System.Reactive - 一次性释放未知数量的订阅)
【发布时间】:2022-01-08 18:03:02
【问题描述】:

我有数量未知的订阅,希望能一次性全部释放(dispose),因为它们的数量可能会变得很多。System.Reactive 是否提供了一次性释放它们的机制?或者,把它们包装到 Observable.Using(() => Disposable.Create... 中是否可行?

// Every Subscribe call below returns an IDisposable that is not stored anywhere,
// so none of these subscriptions can be disposed from this snippet.

// Pong messages.
client.Streams.PongStream.Subscribe(pong =>
{
    Log.Information($"Pong received ({pong.Message})");
});

// Funding messages.
client.Streams.FundingStream.Subscribe(msg =>
{
    var data = msg.Data;
    var line = $"Funding: [{data.Symbol}] rate:[{data.FundingRate}] "
             + $"mark price: {data.MarkPrice} next funding: {data.NextFundingTime} "
             + $"index price {data.IndexPrice}";
    Log.Information(line);
});

// Aggregated trades.
client.Streams.AggregateTradesStream.Subscribe(msg =>
{
    var aggregated = msg.Data;
    var line = $"Trade aggreg [{aggregated.Symbol}] [{aggregated.Side}] "
             + $"price: {aggregated.Price} size: {aggregated.Quantity}";
    Log.Information(line);
});

// Individual trades.
client.Streams.TradesStream.Subscribe(msg =>
{
    var single = msg.Data;
    var line = $"Trade normal [{single.Symbol}] [{single.Side}] "
             + $"price: {single.Price} size: {single.Quantity}";
    Log.Information(line);
});

// Partial order book: logs the first bid and first ask (if any).
client.Streams.OrderBookPartialStream.Subscribe(msg =>
{
    var book = msg.Data;
    var line = $"Order book snapshot [{book.Symbol}] "
             + $"bid: {book.Bids.FirstOrDefault()?.Price:F} "
             + $"ask: {book.Asks.FirstOrDefault()?.Price:F}";
    Log.Information(line);

    // Blocks the calling thread for 500 ms before the handler returns.
    Task.Delay(500).Wait();
    //OrderBookPartialResponse.StreamFakeSnapshot(response.Data, comm);
});

// Order book diff: logs the first bid and first ask (if any).
client.Streams.OrderBookDiffStream.Subscribe(msg =>
{
    var book = msg.Data;
    var line = $"Order book diff [{book.Symbol}] "
             + $"bid: {book.Bids.FirstOrDefault()?.Price:F} "
             + $"ask: {book.Asks.FirstOrDefault()?.Price:F}";
    Log.Information(line);
});

// Book ticker: best bid/ask price and quantity.
client.Streams.BookTickerStream.Subscribe(msg =>
{
    var ticker = msg.Data;
    var line = $"Book ticker [{ticker.Symbol}] "
             + $"Best ask price: {ticker.BestAskPrice} "
             + $"Best ask qty: {ticker.BestAskQty} "
             + $"Best bid price: {ticker.BestBidPrice} "
             + $"Best bid qty: {ticker.BestBidQty}";
    Log.Information(line);
});

// Kline messages: every field of the kline payload is logged.
client.Streams.KlineStream.Subscribe(msg =>
{
    var kline = msg.Data;
    var line = $"Kline [{kline.Symbol}] "
             + $"Kline start time: {kline.Data.StartTime} "
             + $"Kline close time: {kline.Data.CloseTime} "
             + $"Interval: {kline.Data.Interval} "
             + $"First trade ID: {kline.Data.FirstTradeId} "
             + $"Last trade ID: {kline.Data.LastTradeId} "
             + $"Open price: {kline.Data.OpenPrice} "
             + $"Close price: {kline.Data.ClosePrice} "
             + $"High price: {kline.Data.HighPrice} "
             + $"Low price: {kline.Data.LowPrice} "
             + $"Base asset volume: {kline.Data.BaseAssetVolume} "
             + $"Number of trades: {kline.Data.NumberTrades} "
             + $"Is this kline closed?: {kline.Data.IsClose} "
             + $"Quote asset volume: {kline.Data.QuoteAssetVolume} "
             + $"Taker buy base: {kline.Data.TakerBuyBaseAssetVolume} "
             + $"Taker buy quote: {kline.Data.TakerBuyQuoteAssetVolume} "
             + $"Ignore: {kline.Data.Ignore} ";
    Log.Information(line);
});

// Mini-ticker messages.
client.Streams.MiniTickerStream.Subscribe(msg =>
{
    var mini = msg.Data;
    var line = $"Mini-ticker [{mini.Symbol}] "
             + $"Open price: {mini.OpenPrice} "
             + $"Close price: {mini.ClosePrice} "
             + $"High price: {mini.HighPrice} "
             + $"Low price: {mini.LowPrice} "
             + $"Base asset volume: {mini.BaseAssetVolume} "
             + $"Quote asset volume: {mini.QuoteAssetVolume}";
    Log.Information(line);
});

下面是这些订阅所对应的流的实际定义。

/// <summary>
/// Holds one internal subject per message type and exposes each of them
/// publicly as an <see cref="IObservable{T}"/> stream.
/// Every member pair below is "backing subject, then public stream".
/// </summary>
public class BinanceClientStreams
{
    // ---- Pong ----

    /// <summary>Backing subject for <see cref="PongStream"/></summary>
    internal readonly Subject<PongResponse> PongSubject = new Subject<PongResponse>();

    /// <summary>
    /// Emits the response to every ping request
    /// </summary>
    public IObservable<PongResponse> PongStream => PongSubject.AsObservable();

    // ---- Trades ----

    /// <summary>Backing subject for <see cref="TradesStream"/></summary>
    internal readonly Subject<TradeResponse> TradesSubject = new Subject<TradeResponse>();

    /// <summary>
    /// Emits every executed trade on Binance
    /// </summary>
    public IObservable<TradeResponse> TradesStream => TradesSubject.AsObservable();

    /// <summary>Backing subject for <see cref="AggregateTradesStream"/></summary>
    internal readonly Subject<AggregatedTradeResponse> TradeBinSubject = new Subject<AggregatedTradeResponse>();

    /// <summary>
    /// Emits chunks of trades, i.e. trades grouped together
    /// </summary>
    public IObservable<AggregatedTradeResponse> AggregateTradesStream => TradeBinSubject.AsObservable();

    // ---- Order book ----

    /// <summary>Backing subject for <see cref="OrderBookPartialStream"/></summary>
    internal readonly Subject<OrderBookPartialResponse> OrderBookPartialSubject = new Subject<OrderBookPartialResponse>();

    /// <summary>
    /// Partial order book stream - emits a small snapshot of the order book
    /// </summary>
    public IObservable<OrderBookPartialResponse> OrderBookPartialStream => OrderBookPartialSubject.AsObservable();

    /// <summary>Backing subject for <see cref="OrderBookDiffStream"/></summary>
    internal readonly Subject<OrderBookDiffResponse> OrderBookDiffSubject = new Subject<OrderBookDiffResponse>();

    /// <summary>
    /// Order book difference stream - emits <see cref="OrderBookDiffResponse"/> messages
    /// </summary>
    public IObservable<OrderBookDiffResponse> OrderBookDiffStream => OrderBookDiffSubject.AsObservable();

    // ---- Funding ----

    /// <summary>Backing subject for <see cref="FundingStream"/></summary>
    internal readonly Subject<FundingResponse> FundingSubject = new Subject<FundingResponse>();

    /// <summary>
    /// Mark price and funding rate for a single symbol, pushed every 3 seconds or every second
    /// </summary>
    public IObservable<FundingResponse> FundingStream => FundingSubject.AsObservable();

    // ---- Tickers and klines ----

    /// <summary>Backing subject for <see cref="BookTickerStream"/></summary>
    internal readonly Subject<BookTickerResponse> BookTickerSubject = new Subject<BookTickerResponse>();

    /// <summary>
    /// Real-time best bid or ask price or quantity for a specified symbol
    /// </summary>
    public IObservable<BookTickerResponse> BookTickerStream => BookTickerSubject.AsObservable();

    /// <summary>Backing subject for <see cref="KlineStream"/></summary>
    internal readonly Subject<KlineResponse> KlineSubject = new Subject<KlineResponse>();

    /// <summary>
    /// Kline/Candlestick subscription; provide symbol and chart intervals
    /// </summary>
    public IObservable<KlineResponse> KlineStream => KlineSubject.AsObservable();

    /// <summary>Backing subject for <see cref="MiniTickerStream"/></summary>
    internal readonly Subject<MiniTickerResponse> MiniTickerSubject = new Subject<MiniTickerResponse>();

    /// <summary>
    /// Mini-ticker statistics of the specified symbol for the previous 24hrs
    /// </summary>
    public IObservable<MiniTickerResponse> MiniTickerStream => MiniTickerSubject.AsObservable();
}

【问题讨论】:

    标签: c# .net-core system.reactive


    【解决方案1】:

    我认为您正在寻找的是CompositeDisposable。您需要创建该类的一个实例并将所有订阅添加到它。

    // Container that collects every subscription created below.
    var compDisp = new CompositeDisposable();

    // Pong messages.
    compDisp.Add(client.Streams.PongStream.Subscribe(pong =>
    {
        Log.Information($"Pong received ({pong.Message})");
    }));

    // Funding messages.
    compDisp.Add(client.Streams.FundingStream.Subscribe(msg =>
    {
        var data = msg.Data;
        var line = $"Funding: [{data.Symbol}] rate:[{data.FundingRate}] "
                 + $"mark price: {data.MarkPrice} next funding: {data.NextFundingTime} "
                 + $"index price {data.IndexPrice}";
        Log.Information(line);
    }));

    // Aggregated trades.
    compDisp.Add(client.Streams.AggregateTradesStream.Subscribe(msg =>
    {
        var aggregated = msg.Data;
        var line = $"Trade aggreg [{aggregated.Symbol}] [{aggregated.Side}] "
                 + $"price: {aggregated.Price} size: {aggregated.Quantity}";
        Log.Information(line);
    }));

    // Individual trades.
    compDisp.Add(client.Streams.TradesStream.Subscribe(msg =>
    {
        var single = msg.Data;
        var line = $"Trade normal [{single.Symbol}] [{single.Side}] "
                 + $"price: {single.Price} size: {single.Quantity}";
        Log.Information(line);
    }));

    // Partial order book: logs the first bid and first ask (if any).
    compDisp.Add(client.Streams.OrderBookPartialStream.Subscribe(msg =>
    {
        var book = msg.Data;
        var line = $"Order book snapshot [{book.Symbol}] "
                 + $"bid: {book.Bids.FirstOrDefault()?.Price:F} "
                 + $"ask: {book.Asks.FirstOrDefault()?.Price:F}";
        Log.Information(line);

        // Blocks the calling thread for 500 ms before the handler returns.
        Task.Delay(500).Wait();
        //OrderBookPartialResponse.StreamFakeSnapshot(response.Data, comm);
    }));

    // Order book diff: logs the first bid and first ask (if any).
    compDisp.Add(client.Streams.OrderBookDiffStream.Subscribe(msg =>
    {
        var book = msg.Data;
        var line = $"Order book diff [{book.Symbol}] "
                 + $"bid: {book.Bids.FirstOrDefault()?.Price:F} "
                 + $"ask: {book.Asks.FirstOrDefault()?.Price:F}";
        Log.Information(line);
    }));

    // Book ticker: best bid/ask price and quantity.
    compDisp.Add(client.Streams.BookTickerStream.Subscribe(msg =>
    {
        var ticker = msg.Data;
        var line = $"Book ticker [{ticker.Symbol}] "
                 + $"Best ask price: {ticker.BestAskPrice} "
                 + $"Best ask qty: {ticker.BestAskQty} "
                 + $"Best bid price: {ticker.BestBidPrice} "
                 + $"Best bid qty: {ticker.BestBidQty}";
        Log.Information(line);
    }));

    // Kline messages: every field of the kline payload is logged.
    compDisp.Add(client.Streams.KlineStream.Subscribe(msg =>
    {
        var kline = msg.Data;
        var line = $"Kline [{kline.Symbol}] "
                 + $"Kline start time: {kline.Data.StartTime} "
                 + $"Kline close time: {kline.Data.CloseTime} "
                 + $"Interval: {kline.Data.Interval} "
                 + $"First trade ID: {kline.Data.FirstTradeId} "
                 + $"Last trade ID: {kline.Data.LastTradeId} "
                 + $"Open price: {kline.Data.OpenPrice} "
                 + $"Close price: {kline.Data.ClosePrice} "
                 + $"High price: {kline.Data.HighPrice} "
                 + $"Low price: {kline.Data.LowPrice} "
                 + $"Base asset volume: {kline.Data.BaseAssetVolume} "
                 + $"Number of trades: {kline.Data.NumberTrades} "
                 + $"Is this kline closed?: {kline.Data.IsClose} "
                 + $"Quote asset volume: {kline.Data.QuoteAssetVolume} "
                 + $"Taker buy base: {kline.Data.TakerBuyBaseAssetVolume} "
                 + $"Taker buy quote: {kline.Data.TakerBuyQuoteAssetVolume} "
                 + $"Ignore: {kline.Data.Ignore} ";
        Log.Information(line);
    }));

    // Mini-ticker messages.
    compDisp.Add(client.Streams.MiniTickerStream.Subscribe(msg =>
    {
        var mini = msg.Data;
        var line = $"Mini-ticker [{mini.Symbol}] "
                 + $"Open price: {mini.OpenPrice} "
                 + $"Close price: {mini.ClosePrice} "
                 + $"High price: {mini.HighPrice} "
                 + $"Low price: {mini.LowPrice} "
                 + $"Base asset volume: {mini.BaseAssetVolume} "
                 + $"Quote asset volume: {mini.QuoteAssetVolume}";
        Log.Information(line);
    }));
    

    一旦 compDisp 实例被释放(即调用 compDisp.Dispose()),添加到其中的所有订阅都会随之被释放。当然,何时执行这一步取决于您的应用程序的上下文。

    编辑: 根据您的应用程序架构,您可能也会对 WhenActivated 扩展方法感兴趣。它定义在 IActivatableView 和 IActivatableViewModel 接口上,并接受一个函数作为参数,该函数会在视图(模型)每次被激活时(基本上就是显示到屏幕上时)被调用。此函数还带有一个 CompositeDisposable 参数,每当视图(模型)被停用时,该 CompositeDisposable 都会被释放。

    编辑 2: 刚刚意识到 DisposeWith 方法和 WhenActivated 扩展方法一样,实际上都属于 ReactiveUI 框架,而不属于该框架所基于的 Reactive Extensions(响应式扩展)。因此,如果不使用该框架,就不能编写 myObservable.Subscribe(x => ...).DisposeWith(compDisp) 这样的代码,但 compDisp.Add(myObservable.Subscribe(x => ...)) 应该可以工作。我已经对上面的代码做了相应的调整。

    【讨论】:

    • 感谢您的回答!这是我根据您的回答所做的:controlc.com/4d3cb01d。你可以检查一下吗?
    • 我不知道 CompositeDisposable 也可以这样构造(如前所述,我通常使用 ReactiveUI 的 DisposeWith 方法)。但是,我刚刚在我的应用程序中修改了一个视图模型类进行测试,按照您的方式似乎可以正常工作。
    • 非常感谢! :) 我在项目中添加了扩展方法,我目前正在使用.DisposeWith
    猜你喜欢
    • 2022-01-09
    • 1970-01-01
    • 1970-01-01
    • 2017-08-08
    • 2017-08-01
    • 2012-09-07
    • 1970-01-01
    • 2021-12-24
    • 2021-04-25
    相关资源
    最近更新 更多