在另一个答案中 @davidfowl 的 cmets 之后更新。
带有 SignalR 的 .NET Core 2.1 具有 IConnectionHeartbeatFeature,您可以使用它来实现与旧 SignalR 中的 ITransportHeartbeat 类似的功能。
以下代码的主要症结在于,我们维护了一个内存列表,用于跟踪数据库中需要更新的连接。这使我们能够以受控的时间间隔和批量执行昂贵的数据库操作。 IConnectionHeartbeatFeature.OnHeartbeat() 每秒为每个连接触发一次,因此以该频率访问数据库可能会使您的服务器大规模下降。
首先创建一个实体来维护内存中服务器尚未更新的连接列表:
public interface IConnectionCounter
{
internal ConcurrentDictionary<string, DateTime> Connections { get; }
public void RecordConnectionLastSeen(string connectionId);
public void RemoveConnection(string connectionId);
}
/// <summary>
/// Maintains a dictionary of connections that need to be refreshed in the
/// database
/// </summary>
public class ConnectionCounter : IConnectionCounter
{
private readonly ConcurrentDictionary<string, DateTime> _connections;
ConcurrentDictionary<string, DateTime> IConnectionCounter.Connections
=> _connections;
public ConnectionCounter()
{
_connections = new ConcurrentDictionary<string, DateTime>();
}
public void RecordConnectionLastSeen(string connectionId)
{
_connections.AddOrUpdate(
connectionId,
DateTime.UtcNow,
(existingConnectionId, time) => time);
}
public void RemoveConnection(string connectionId)
{
_connections.Remove(connectionId, out _);
}
}
请注意,这不是需要更新的所有在线连接的最终列表,因为连接可能分布在多个服务器上。如果您有许多服务器,则可以通过将这些连接存储在 Redis 等分布式内存存储中来进一步减少负载。
接下来,在 Hub 中设置 IConnectionCounter 以便计算连接数。
public class ChatHub : Hub
{
private readonly IConnectionCounter _connectionCounter;
public ChatHub(
IConnectionCounter connectionCounter)
{
_connectionCounter = connectionCounter;
}
[AllowAnonymous]
public override Task OnConnectedAsync()
{
var connectionHeartbeat =
Context.Features.Get<IConnectionHeartbeatFeature>();
connectionHeartbeat.OnHeartbeat(connectionId => {
_connectionCounter.RecordConnectionLastSeen((string)connectionId);
}, Context.ConnectionId);
return base.OnConnectedAsync();
}
}
现在创建一个服务,在 IConnectionCounter 中获取连接并使用所述连接的状态更新数据库:
public interface IPresenceDatabaseSyncer
{
public Task UpdateConnectionsOnlineStatus();
}
/// <summary>
/// Handles updating the online status of connections whose connections
/// that need to be updated in the database
/// </summary>
public class PresenceDatabaseSyncer : IPresenceDatabaseSyncer
{
private readonly MyDbContext _context;
private readonly IConnectionCounter _connectionCounter;
public PresenceDatabaseSyncer(
MyDbContext context,
IConnectionCounter connectionCounter)
{
_context = context;
_connectionCounter = connectionCounter;
}
public async Task UpdateConnectionsOnlineStatus()
{
if (_connectionCounter.Connections.IsEmpty)
return;
foreach (var connection in _connectionCounter.Connections)
{
var connectionId = connection.Key;
var lastPing = connection.Value;
var dbConnection = _context.Connection
.FirstOrDefault(x => x.ConnectionId == connectionId);
if (dbConnection != null)
dbConnection.LastPing = lastPing;
_connectionCounter.RemoveConnection(connectionId);
}
}
}
然后我使用 HostedService 持续运行上面的 db sync:
/// <summary>
/// Runs a periodic sync operation to ensure that connections are
/// recorded as being online correctly in the database
/// </summary>
public class PresenceDatabaseSyncerHostedService : IHostedService, IDisposable
{
private const int SyncIntervalSeconds = 10;
private readonly IServiceScopeFactory _serviceScopeFactory;
private Timer _timer;
public PresenceDatabaseSyncerHostedService(
IServiceScopeFactory serviceScopeFactory)
{
_serviceScopeFactory = serviceScopeFactory;
}
public Task StartAsync(CancellationToken stoppingToken)
{
_timer = new Timer(
DoWork,
null,
TimeSpan.Zero,
TimeSpan.FromSeconds(SyncIntervalSeconds));
return Task.CompletedTask;
}
private async void DoWork(object state)
{
using var scope = _serviceScopeFactory.CreateScope();
var scopedProcessingService =
scope.ServiceProvider.GetRequiredService<IPresenceDatabaseSyncer>();
await scopedProcessingService.UpdateConnectionsOnlineStatus();
}
public Task StopAsync(CancellationToken stoppingToken)
{
_timer?.Change(Timeout.Infinite, 0);
return Task.CompletedTask;
}
public void Dispose()
{
_timer?.Dispose();
}
}
最后注册这些依赖和服务:
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
services.AddSingleton<IConnectionCounter, ConnectionCounter>();
services.AddScoped<IPresenceDatabaseSyncer, PresenceDatabaseSyncer>();
services.AddHostedService<PresenceDatabaseSyncerHostedService>();
// ...
}
// ...
}
当然,实际清理数据库中的陈旧连接仍然存在问题。我使用另一个 HostedService 来处理这个问题,并将作为练习留给读者。
如果您使用的是 Azure SignalR 服务,则根据@Devator 的回答手动发送 KeepAlive 消息还有一个额外的好处,因为您不需要为消息付费(因为 OnHeartbeat 发生在内部)。
请记住,此功能并没有真正详细记录。我已经在生产环境中使用它几个月了,但我还没有看到使用这种技术的其他解决方案。