【问题标题】:SignalR core - invalidate dead connectionsSignalR 核心 - 使死连接无效
【发布时间】:2019-06-30 23:32:34
【问题描述】:

问题

我正在使用 .NET Core 2.2 和 ASP.NET Core SignalR。目前,我将所有连接状态保存在 SQL 数据库中(请参阅this document;尽管它是“旧”SignalR 库的手册,但逻辑是相同的)。我也在使用 Redis 背板,因为我的应用程序可以水平扩展。

但是,重新启动我的应用程序时,当前连接不会关闭,而是会成为孤立的。之前链接的文章指出:

如果您的网络服务器停止工作或应用程序重新启动, 不调用 OnDisconnected 方法。因此,有可能 您的数据存储库将包含连接 ID 为 no 的记录 有效期更长。要清理这些孤立的记录,您可能希望 使在时间范围之外创建的任何连接无效 与您的应用相关。

问题

在“旧”SignalR 中有一个 ITransportHeartbeatthis script 完美实现),但 .NET Core 版本没有这样的接口(至少,我找不到它)。

我如何知道连接是否不再存在?我想要(或实际上需要)清理旧的连接 ID。

【问题讨论】:

    标签: c# asp.net-core asp.net-core-signalr


    【解决方案1】:

    在另一个答案中 @davidfowl 的 cmets 之后更新。

    带有 SignalR 的 .NET Core 2.1 具有 IConnectionHeartbeatFeature,您可以使用它来实现与旧 SignalR 中的 ITransportHeartbeat 类似的功能。

    以下代码的主要症结在于,我们维护了一个内存列表,用于跟踪数据库中需要更新的连接。这使我们能够以受控的时间间隔和批量执行昂贵的数据库操作。 IConnectionHeartbeatFeature.OnHeartbeat() 每秒为每个连接触发一次,因此以该频率访问数据库可能会使您的服务器大规模下降。

    首先创建一个实体来维护内存中服务器尚未更新的连接列表:

    public interface IConnectionCounter
    {
        internal ConcurrentDictionary<string, DateTime> Connections { get; }
    
        public void RecordConnectionLastSeen(string connectionId);
        public void RemoveConnection(string connectionId);
    }
    
    /// <summary>
    /// Maintains a dictionary of connections that need to be refreshed in the 
    /// database
    /// </summary>
    public class ConnectionCounter : IConnectionCounter
    {
        private readonly ConcurrentDictionary<string, DateTime> _connections;
        ConcurrentDictionary<string, DateTime> IConnectionCounter.Connections 
            => _connections;
    
        public ConnectionCounter()
        {
            _connections = new ConcurrentDictionary<string, DateTime>();
        }
    
        public void RecordConnectionLastSeen(string connectionId)
        {
            _connections.AddOrUpdate(
                connectionId, 
                DateTime.UtcNow, 
                (existingConnectionId, time) => time);
        }
    
        public void RemoveConnection(string connectionId)
        {
            _connections.Remove(connectionId, out _);
        }
    }
    

    请注意,这不是需要更新的所有在线连接的最终列表,因为连接可能分布在多个服务器上。如果您有许多服务器,则可以通过将这些连接存储在 Redis 等分布式内存存储中来进一步减少负载。

    接下来,在 Hub 中设置 IConnectionCounter 以便计算连接数。

    public class ChatHub : Hub
    {
        private readonly IConnectionCounter _connectionCounter;
    
        public ChatHub(
            IConnectionCounter connectionCounter)
        {
            _connectionCounter = connectionCounter;
        }
    
        [AllowAnonymous]
        public override Task OnConnectedAsync()
        {
            var connectionHeartbeat = 
                Context.Features.Get<IConnectionHeartbeatFeature>();
    
            connectionHeartbeat.OnHeartbeat(connectionId => {
                _connectionCounter.RecordConnectionLastSeen((string)connectionId); 
            }, Context.ConnectionId);
    
            return base.OnConnectedAsync();
        }
    }
    

    现在创建一个服务,在 IConnectionCounter 中获取连接并使用所述连接的状态更新数据库:

    public interface IPresenceDatabaseSyncer
    {
        public Task UpdateConnectionsOnlineStatus();
    }
    
    /// <summary>
    /// Handles updating the online status of connections whose connections
    /// that need to be updated in the database
    /// </summary>
    public class PresenceDatabaseSyncer : IPresenceDatabaseSyncer
    {
        private readonly MyDbContext _context;
        private readonly IConnectionCounter _connectionCounter;
    
        public PresenceDatabaseSyncer(
            MyDbContext context, 
            IConnectionCounter connectionCounter)
        {
            _context = context;
            _connectionCounter = connectionCounter;
        }
    
        public async Task UpdateConnectionsOnlineStatus()
        {
            if (_connectionCounter.Connections.IsEmpty)
                return;
    
            foreach (var connection in _connectionCounter.Connections)
            {
                var connectionId = connection.Key;
                var lastPing = connection.Value;
    
                var dbConnection = _context.Connection
                    .FirstOrDefault(x => x.ConnectionId == connectionId);
    
                if (dbConnection != null)
                    dbConnection.LastPing = lastPing;
    
                _connectionCounter.RemoveConnection(connectionId);
            }
        }
    }
    

    然后我使用 HostedService 持续运行上面的 db sync:

    /// <summary>
    /// Runs a periodic sync operation to ensure that connections are 
    /// recorded as being online correctly in the database
    /// </summary>
    public class PresenceDatabaseSyncerHostedService : IHostedService, IDisposable
    {
        private const int SyncIntervalSeconds = 10;
    
        private readonly IServiceScopeFactory _serviceScopeFactory;
        private Timer _timer;
    
        public PresenceDatabaseSyncerHostedService(
            IServiceScopeFactory serviceScopeFactory)
        {
            _serviceScopeFactory = serviceScopeFactory;
        }
    
        public Task StartAsync(CancellationToken stoppingToken)
        {
            _timer = new Timer(
                DoWork, 
                null, 
                TimeSpan.Zero, 
                TimeSpan.FromSeconds(SyncIntervalSeconds));
    
            return Task.CompletedTask;
        }
    
        private async void DoWork(object state)
        {
            using var scope = _serviceScopeFactory.CreateScope();
            var scopedProcessingService = 
                scope.ServiceProvider.GetRequiredService<IPresenceDatabaseSyncer>();
    
            await scopedProcessingService.UpdateConnectionsOnlineStatus();
        }
    
        public Task StopAsync(CancellationToken stoppingToken)
        {
            _timer?.Change(Timeout.Infinite, 0);
            return Task.CompletedTask;
        }
    
        public void Dispose()
        {
            _timer?.Dispose();
        }
    }
    

    最后注册这些依赖和服务:

    public class Startup
    {
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddSingleton<IConnectionCounter, ConnectionCounter>();
            services.AddScoped<IPresenceDatabaseSyncer, PresenceDatabaseSyncer>();
    
            services.AddHostedService<PresenceDatabaseSyncerHostedService>();
            // ...
        }
        
        // ...
    }
    

    当然,实际清理数据库中的陈旧连接仍然存在问题。我使用另一个 HostedService 来处理这个问题,并将作为练习留给读者。

    如果您使用的是 Azure SignalR 服务,则根据@Devator 的回答手动发送 KeepAlive 消息还有一个额外的好处,因为您不需要为消息付费(因为 OnHeartbeat 发生在内部)。

    请记住,此功能并没有真正详细记录。我已经在生产环境中使用它几个月了,但我还没有看到使用这种技术的其他解决方案。

    【讨论】:

      【解决方案2】:

      我想出的解决方案如下。它没有那么优雅,但目前我没有其他选择。

      我更新了数据库中的模型,不仅包含ConnectionId,还包含LastPing(这是DateTime 类型)。客户端发送KeepAlive 消息(自定义消息,不使用 SignalR keepalive 设置)。收到消息后(服务器端),我用当前时间更新数据库:

      var connection = _context.Connection.FirstOrDefault(x => x.Id == Context.ConnectionId);
      connection.LastPing = DateTime.UtcNow;
      

      为了清理孤立的连接(SignalR 的OnDisconnected 方法不会删除),我有一个定期运行的任务(当前在 Hangfire 中),它删除了最近没有更新 LastPing 字段的连接。

      【讨论】:

      • 嗨。谢谢您的提问和回答。我有同样的问题:) 你提到你没有使用 SignalR 的 KeepAlive 设置,我想知道为什么?如果我们改用这些设置会出现什么问题?
      • 这还是最优解吗?
      • 这就是办法!
      • 感谢@davidfowl。你介意评论一下使用 IConnectionHeartbeatFeature 是否有任何缺点吗?我没有看到太多关于使用此功能的讨论,所以有点犹豫是否推荐这个,就像我在回答中所做的那样。
      • 最大的缺点是回调在每个连接上运行并且是同步的。由于信号器连接的范围,从那里解析 dbcontext 也是值得怀疑的。它不会扩展以运行每个连接经常运行的数据库查询,这会使您的服务器大规模下降。您可能希望在其中更新本地状态,然后批量更新所有连接状态。
      猜你喜欢
      • 2018-07-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-09-21
      • 2019-08-18
      • 1970-01-01
      相关资源
      最近更新 更多