【发布时间】:2020-05-29 10:27:03
【问题描述】:
因为我找不到任何不使用循环来获取流内容的实现,所以我开始实现一个,但我遇到了几个问题,你们中的一些人可能会指出我正确的地方。
该实现使用 Pub/Sub 和流的组合: * 日志 -> 流通道 * 日志:通知 -> 发布/订阅 * log:lastReadMessage -> 包含流中的最后读取键
出版商
static async Task Main(string[] args)
{
var connectionMultiplexer = await ConnectionMultiplexer.ConnectAsync("localhost");
var redisDb = connectionMultiplexer.GetDatabase(1);
while(true)
{
var value = new NameValueEntry[]
{
new NameValueEntry("id", Guid.NewGuid().ToString()),
new NameValueEntry("timestamp", DateTime.UtcNow.ToString())
};
redisDb.StreamAdd("log", value);
var publisher = connectionMultiplexer.GetSubscriber();
publisher.Publish("log:notify", string.Empty, CommandFlags.None);
await Task.Delay(TimeSpan.FromSeconds(1));
}
}
订阅者
static async Task Main(string[] args)
{
var connectionMultiplexer = await ConnectionMultiplexer.ConnectAsync("localhost");
var redisDb = connectionMultiplexer.GetDatabase(1);
var observableStream = CreateTaskFromStream(connectionMultiplexer, redisDb, "log")
.Subscribe(x => {
Console.WriteLine(x);
});
Console.ReadLine();
}
private static SemaphoreSlim taskFromStreamBlocker = new SemaphoreSlim(1);
private static IObservable<string> CreateTaskFromStream(ConnectionMultiplexer connection, IDatabase redisDb, string channel)
{
var lastReadMessage = "0-0";
var lastReadMessageData = redisDb.StringGet($"{channel}:lastReadMessage", CommandFlags.None);
if (string.IsNullOrEmpty(lastReadMessageData))
{
redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
}
else
{
lastReadMessage = lastReadMessageData;
}
return Observable.Create<string>(obs =>
{
var subscriber = connection.GetSubscriber();
subscriber.Subscribe($"{channel}:notify", async (ch, msg) =>
{
var locker = await taskFromStreamBlocker
.WaitAsync(0)
.ConfigureAwait(false);
if (!locker)
{
return;
}
var messages = await redisDb.StreamReadAsync(channel, lastReadMessage);
foreach(var message in messages)
{
obs.OnNext($"{message.Id} -> {message.Values[0].Name}: {message.Values[0].Value} / {message.Values[1].Name}: {message.Values[1].Value}");
lastReadMessage = message.Id;
}
redisDb.KeyDelete($"{channel}:lastReadMessage");
redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
taskFromStreamBlocker.Release();
});
return Disposable.Create(() => subscriber.Unsubscribe(channel));
});
}
为什么是信号量?
因为我可以将大量消息添加到流中,并且我不希望同一条消息被处理两次。
问题
如果我们在流中有未处理的消息,我们如何在没有来自 Pub/Sub 的事件的情况下进行处理 当我们开始时,我们可以验证它是否是未处理的消息并对其进行处理。如果在此期间有一条新消息添加到流中,而我们尚未订阅 Pub/sub,则订阅者将不会处理该消息,直到我们通过 Pub/Sub 收到通知。
信号量很重要,不能两次处理同一消息,但同时它是一个诅咒。在消息的处理过程中,可以将另一个消息添加到流中。当这种情况发生时,订阅者不会立即处理,而只会在下次通知时处理(此时将处理两条消息)。
您将如何实现这一点? 是否仅使用 Rx 实现 Redis 流? 该解决方案不应使用某种循环并且具有内存效率。这可能吗?
最好的祝福
保罗·阿博伊姆·平托
【问题讨论】:
标签: c# redis system.reactive