【问题标题】:Handling exceptions in System.Reactive's Subscribe(在 System.Reactive 的 Subscribe 中处理异常)
【发布时间】:2022-01-09 14:58:34
【问题描述】:

我想在使用 System.Reactive 时,显示下面所有订阅(Subscribe)中抛出的异常。

从下面这一小段代码可以看到,我已经尝试过了,但异常消息仍然没有显示出来。

// Subscribes to the aggregate trades stream with an onNext handler and an
// onError handler, then registers the subscription with `disposable`.
client.Streams.AggregateTradesStream
    .Subscribe(response =>
    {
        // Deliberate test exception: it is thrown unconditionally, so every
        // statement below it in this lambda is unreachable.
        throw new Exception("Asd");

        Guard.Against.Null(response, nameof(response));
        Guard.Against.Null(response.Data, nameof(response.Data),
            "Something went wrong and the aggregated trade object is null");

        var trade = response.Data;
        Log.Information($"Aggregated trade [{trade.Symbol}] [{trade.Side}] " +
                        $"Price: {trade.Price} Size: {trade.Quantity}");
        // The second argument below is the onError handler; it prints the
        // exception message and the current local time to the console.
        // NOTE(review): the surrounding question reports that nothing is printed
        // for the exception thrown above. Presumably onError only receives errors
        // signaled by the source sequence, not exceptions thrown by the onNext
        // handler itself - confirm against the System.Reactive documentation.
    }, ex => Console.WriteLine("Exception: {0} {1}", ex.Message, DateTime.Now))
    .DisposeWith(disposable);

我目前的做法不起作用,那么该如何处理/显示所有 .Subscribe 中的异常?也许可以借助扩展方法来实现。

完整代码

/// <summary>
/// Console entry point that connects to the Binance websocket endpoint, subscribes
/// logging handlers to several streams, and then blocks until <c>ExitEvent</c> is set.
/// </summary>
public class Program
{
    // Blocks Main after the communicator has been started. No code in this class
    // calls Set() on it, so the wait in Main is not released by anything shown here.
    private static readonly ManualResetEvent ExitEvent = new(false);

    /// <summary>
    /// Configures logging, wires up the stream subscriptions, starts the communicator
    /// and waits on <c>ExitEvent</c> before disposing the subscriptions and flushing the log.
    /// </summary>
    private static async Task Main()
    {
        // Logger: Verbose minimum level overall; the console sink is restricted to
        // Debug and above, the file sink writes to logs/verbose.log with daily rolling.
        Log.Logger = new LoggerConfiguration()
            .MinimumLevel.Verbose()
            .Enrich.FromLogContext()
            .WriteTo.Console(LogEventLevel.Debug, theme: SystemConsoleTheme.Colored)
            .WriteTo.File(Path.Combine("logs", "verbose.log"), rollingInterval: RollingInterval.Day)
            .CreateLogger();

        // Collects every subscription created below so they can be disposed together.
        var disposable = new CompositeDisposable();
        var uri = new Uri("wss://stream.binance.com:9443");

        using var communicator = new BinanceWebSocketCommunicator(uri);

        communicator.Name = "Binance-Spot";
        communicator.ReconnectTimeout = TimeSpan.FromMinutes(10);

        // Log connection lifecycle notifications. These subscriptions pass no onError handler.
        communicator.ReconnectionHappened
            .Subscribe(info => Log.Information($"Reconnection happened, type: {info.Type}"))
            .DisposeWith(disposable);

        communicator.DisconnectionHappened
            .Subscribe(info => Log.Information($"Disconnection happened, type: {info.Type}"))
            .DisposeWith(disposable);

        using var client = new BinanceWebSocketClient(communicator);

        client.Streams.PongStream
            .Subscribe(x => Log.Information($"Pong received ({x.Message})"))
            .DisposeWith(disposable);

        // Aggregate trades: the only subscription here that also passes an onError handler.
        client.Streams.AggregateTradesStream
            .Subscribe(response =>
            {
                // Deliberate test exception: it is thrown unconditionally, so every
                // statement below it in this lambda is unreachable.
                throw new Exception("Asd");

                Guard.Against.Null(response, nameof(response));
                Guard.Against.Null(response.Data, nameof(response.Data),
                    "Something went wrong and the aggregated trade object is null");

                var trade = response.Data;
                Log.Information($"Aggregated trade [{trade.Symbol}] [{trade.Side}] " +
                                $"Price: {trade.Price} Size: {trade.Quantity}");
                // The second argument below is the onError handler; it prints the
                // exception message and the current local time to the console.
                // NOTE(review): the surrounding question reports that nothing is printed
                // for the exception thrown above. Presumably onError only receives errors
                // signaled by the source sequence, not exceptions thrown by the onNext
                // handler itself - confirm against the System.Reactive documentation.
            }, ex => Console.WriteLine("Exception: {0} {1}", ex.Message, DateTime.Now))
            .DisposeWith(disposable);

        // Klines: validates the response and its nested data objects, then logs every
        // field of the kline on a single line. No onError handler is passed.
        client.Streams.KlineStream
            .Subscribe(response =>
            {
                Guard.Against.Null(response, nameof(response));
                Guard.Against.Null(response.Data, nameof(response.Data),
                    "Something went wrong and the kline object is null");
                Guard.Against.Null(response.Data.Data, nameof(response.Data.Data),
                    "Something went wrong and the kline data object is null");

                var kline = response.Data;
                var klineData = response.Data.Data;

                Log.Information($"Kline [{kline.Symbol}] " +
                                $"Kline start time: {klineData.StartTime} " +
                                $"Kline close time: {klineData.CloseTime} " +
                                $"Interval: {klineData.Interval} " +
                                $"First trade ID: {klineData.FirstTradeId} " +
                                $"Last trade ID: {klineData.LastTradeId} " +
                                $"Open price: {klineData.OpenPrice} " +
                                $"Close price: {klineData.ClosePrice} " +
                                $"High price: {klineData.HighPrice} " +
                                $"Low price: {klineData.LowPrice} " +
                                $"Base asset volume: {klineData.BaseAssetVolume} " +
                                $"Number of trades: {klineData.NumberTrades} " +
                                $"Is this kline closed?: {klineData.IsClosed} " +
                                $"Quote asset volume: {klineData.QuoteAssetVolume} " +
                                $"Taker buy base: {klineData.TakerBuyBaseAssetVolume} " +
                                $"Taker buy quote: {klineData.TakerBuyQuoteAssetVolume} " +
                                $"Ignore: {klineData.Ignore} ");
            })
            .DisposeWith(disposable);

        // Registers the two subscriptions with the client before the communicator is started.
        // NOTE(review): "1h" is presumably the kline interval - confirm against KlineSubscription.
        client.AddSubscription(
            new AggregateTradeSubscription("bnbusdt"),
            new KlineSubscription("btcusdt", "1h"));

        await communicator.Start().ConfigureAwait(false);

        // Blocks the current thread until ExitEvent is set.
        ExitEvent.WaitOne();

        // Cleanup only runs once the wait above returns: dispose all subscriptions,
        // then flush and close the logger.
        disposable.Dispose();

        Log.CloseAndFlush();
    }
}

【问题讨论】:

    标签: c# .net-core system.reactive


    【解决方案1】:

    我意识到,为了在 .Subscribe 中显示异常,我必须把处理逻辑放到 .Select/.SelectMany 中,或者用 try/catch 块把它包裹起来。

    // Subscribes to the aggregate trades stream and logs every trade.
    // Both failure paths are reported through the static Serilog `Log` that the
    // rest of the handler already uses (no separate `logger` instance is needed):
    //   * exceptions thrown while handling a message are caught inside onNext,
    //     because System.Reactive does not forward them to this subscription's
    //     onError callback;
    //   * errors signaled by the stream itself arrive through onError.
    client.Streams.AggregateTradesStream
        .Subscribe(
            response =>
            {
                try
                {
                    Guard.Against.Null(response, nameof(response));
                    Guard.Against.Null(response.Data, nameof(response.Data));

                    var trade = response.Data;
                    Log.Information($"Aggregated trade [{trade.Symbol}] [{trade.Side}] " +
                                    $"Price: {trade.Price} Size: {trade.Quantity}");
                }
                catch (Exception ex)
                {
                    // Log and continue so one bad message does not escape the handler.
                    Log.Error(ex, "Exception while receiving message");
                }
            },
            // Without this callback an error from the source sequence would go unreported.
            ex => Log.Error(ex, "Aggregated trades stream terminated with an error"))
        .DisposeWith(disposable);
    

    【讨论】:

      猜你喜欢
      • 2022-01-08
      • 1970-01-01
      • 1970-01-01
      • 2012-09-07
      • 2013-01-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多