【问题标题】:How to implement Redis streams with C# Rx如何使用 C# Rx 实现 Redis 流
【发布时间】:2020-05-29 10:27:03
【问题描述】:

因为我找不到任何不使用循环来获取流内容的实现,所以我开始实现一个,但我遇到了几个问题,你们中的一些人可能会指出我正确的地方。

该实现使用 Pub/Sub 和流的组合: * 日志 -> 流通道 * 日志:通知 -> 发布/订阅 * log:lastReadMessage -> 包含流中的最后读取键

出版商

        static async Task Main(string[] args)
        {
            var connectionMultiplexer = await ConnectionMultiplexer.ConnectAsync("localhost");
            var redisDb =  connectionMultiplexer.GetDatabase(1);

            while(true)
            {
                var value =  new NameValueEntry[]
                {
                    new NameValueEntry("id", Guid.NewGuid().ToString()),
                    new NameValueEntry("timestamp", DateTime.UtcNow.ToString())
                };

                redisDb.StreamAdd("log", value);
                var publisher = connectionMultiplexer.GetSubscriber();
                publisher.Publish("log:notify", string.Empty, CommandFlags.None);
                await Task.Delay(TimeSpan.FromSeconds(1));
            }
        }

订阅者

        static async Task Main(string[] args)
        {
            var connectionMultiplexer = await ConnectionMultiplexer.ConnectAsync("localhost");
            var redisDb =  connectionMultiplexer.GetDatabase(1);


            var observableStream =  CreateTaskFromStream(connectionMultiplexer, redisDb, "log")
                .Subscribe(x => {
                  Console.WriteLine(x);  
                });

            Console.ReadLine();
        }
        private static SemaphoreSlim taskFromStreamBlocker = new SemaphoreSlim(1);

        private static IObservable<string> CreateTaskFromStream(ConnectionMultiplexer connection, IDatabase redisDb, string channel)
        {
            var lastReadMessage = "0-0";

            var lastReadMessageData = redisDb.StringGet($"{channel}:lastReadMessage", CommandFlags.None);
            if (string.IsNullOrEmpty(lastReadMessageData))
            {
                redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
            }
            else
            {
                lastReadMessage = lastReadMessageData;
            }


            return Observable.Create<string>(obs => 
            {
                var subscriber = connection.GetSubscriber();
                subscriber.Subscribe($"{channel}:notify", async (ch, msg) => 
                {
                    var locker = await taskFromStreamBlocker
                        .WaitAsync(0)
                        .ConfigureAwait(false);

                    if (!locker)
                    {
                        return;
                    }

                    var messages = await redisDb.StreamReadAsync(channel, lastReadMessage);

                    foreach(var message in messages)
                    {
                        obs.OnNext($"{message.Id} -> {message.Values[0].Name}: {message.Values[0].Value} / {message.Values[1].Name}: {message.Values[1].Value}");
                        lastReadMessage = message.Id;
                    }

                    redisDb.KeyDelete($"{channel}:lastReadMessage");
                    redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);

                    taskFromStreamBlocker.Release();
                });

                return Disposable.Create(() => subscriber.Unsubscribe(channel));
            });
        }

为什么是信号量?

因为我可以将大量消息添加到流中,并且我不希望同一条消息被处理两次。

问题

  1. 如果我们在流中有未处理的消息,我们如何在没有来自 Pub/Sub 的事件的情况下进行处理 当我们开始时,我们可以验证它是否是未处理的消息并对其进行处理。如果在此期间有一条新消息添加到流中,而我们尚未订阅 Pub/sub,则订阅者将不会处理该消息,直到我们通过 Pub/Sub 收到通知。

  2. 信号量很重要,不能两次处理同一消息,但同时它是一个诅咒。在消息的处理过程中,可以将另一个消息添加到流中。当这种情况发生时,订阅者不会立即处理,而只会在下次通知时处理(此时将处理两条消息)。

您将如何实现这一点? 是否仅使用 Rx 实现 Redis 流? 该解决方案不应使用某种循环并且具有内存效率。这可能吗?

最好的祝福

保罗·阿博伊姆·平托

【问题讨论】:

    标签: c# redis system.reactive


    【解决方案1】:

    这是另一种使用具有 200 毫秒运行时间的计时器的解决方案

    
            private static IObservable<string> CreateTaskFromStream(ConnectionMultiplexer connection, IDatabase redisDb, string channel, CancellationToken cancellationToken)
            {
                var lastReadMessage = "0-0";
    
                var lastReadMessageData = redisDb.StringGet($"{channel}:lastReadMessage", CommandFlags.None);
                if (string.IsNullOrEmpty(lastReadMessageData))
                {
                    redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
                }
                else
                {
                    lastReadMessage = lastReadMessageData;
                }
    
                var instance = ThreadPoolScheduler.Instance;
    
                return Observable.Create<string>(obs => 
                {
                    var disposable = Observable
                        .Interval(TimeSpan.FromMilliseconds(200), instance)
                        .Subscribe(async _ => 
                        {
                            var messages = await redisDb.StreamReadAsync(channel, lastReadMessage);
    
                            foreach(var message in messages)
                            {
                                obs.OnNext($"{message.Id} -> {message.Values[0].Name}: {message.Values[0].Value} / {message.Values[1].Name}: {message.Values[1].Value}");
                                lastReadMessage = message.Id;
                            }
    
                            redisDb.KeyDelete($"{channel}:lastReadMessage");
                            redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
                        });
                    cancellationToken.Register(() => disposable.Dispose());
    
                    return Disposable.Empty;    
                });
           }
    
    

    【讨论】:

      【解决方案2】:

      这是我想避免的 WHILE 解决方案

              private static IObservable<string> CreateTaskFromStream(ConnectionMultiplexer connection, IDatabase redisDb, string channel, CancellationToken cancellationToken)
              {
                  var lastReadMessage = "0-0";
      
                  var lastReadMessageData = redisDb.StringGet($"{channel}:lastReadMessage", CommandFlags.None);
                  if (string.IsNullOrEmpty(lastReadMessageData))
                  {
                      redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
                  }
                  else
                  {
                      lastReadMessage = lastReadMessageData;
                  }
      
                  return Observable.Create<string>(async obs => 
                  {
                      while(!cancellationToken.IsCancellationRequested)
                      {
                          var messages = await redisDb.StreamReadAsync(channel, lastReadMessage);
      
                          foreach(var message in messages)
                          {
                              obs.OnNext($"{message.Id} -> {message.Values[0].Name}: {message.Values[0].Value} / {message.Values[1].Name}: {message.Values[1].Value}");
                              lastReadMessage = message.Id;
                          }
      
                          redisDb.KeyDelete($"{channel}:lastReadMessage");
                          redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
      
                          await Task.Delay(TimeSpan.FromMilliseconds(500));
                      }
      
                      return Disposable.Empty;
                  });
              }
      

      【讨论】:

      • 这很糟糕..每个案例等待 500 毫秒。 IIfHaveNoWork() 然后等待 500 毫秒。您可以根据没有记录或提取的记录数 = 到最大计数来决定 IfHaveNoWork 。所以你只有在没有工作的时候等待。然后你可以测量它。
      • 你应该使用 maxcount ,否则当你有太多记录时它会爆炸。
      【解决方案3】:

      我使用紧密循环只是做一个 XRange 并保存一个位置 - KISS.. 但如果没有工作它会后退,所以当它的紧密循环发生很多事情时它非常快。

      如果您需要更高的性能,例如在处理时读取,但是在大多数情况下我会提醒您不要这样做。

      1. 这会带来很多复杂性,这需要坚如磐石。
      2. Redis 通常足够快
      3. “我不希望 o 对同一消息进行两次处理。”几乎每个系统都至少有一次交付,以消除崩溃周围的这种情况令人难以置信的困难/缓慢。您可以通过使用 id 的哈希集来部分删除它,但对于消费者来说处理它和设计为幂等的消息非常简单。这可能是消息设计问题的根本原因。如果您对每个阅读器进行分区(单独的流和每个流 1 个工作人员),您可以将哈希集保存在内存中,避免缩放/分布式问题。请注意,Redis 流可以保留顺序,使用它来制作更简单的幂等消息。
      4. 异常,您不希望停止处理流,因为消费者在 1 条消息上有逻辑异常,例如在晚上接到呼叫整个系统已停止,锁定会使情况变得更糟。事件数据无法更改,因此请尽最大努力。但是 infra / redis 异常确实需要抛出并重试。在循环之外进行管理非常痛苦。
      5. 简单的背压。如果您不能足够快地处理工作,那么循环会变慢,而不是创建大量任务并耗尽所有内存。

      我不再使用分布式锁/信号量。

      如果您处理命令,例如 dosomething 而不是 xyz,这些可能会失败。消费者应该再次处理已经发生的情况,而不是 redis / 流读取部分。

      一些带有魔术回调的库不能解决这些问题,当在任何节点上运行超时等时,回调将重试。复杂性/问题仍然存在,它们只是转移到其他地方。

      您可能在顶部有一个可观察的消费者,但这基本上是装饰性的,它不能解决问题,如果您在某个地方查看许多实现,您会看到相同的循环。我不会使用它来让消费者注册一个动作。

      例如

          public interface IStreamSubscriber
          {
              void RegisterEventCallBack(Func<object, IReadOnlyDictionary<string, string>, Task> callback);
              void RegisterBatchEventCallBack(Func<IEnumerable<(object msg, IReadOnlyDictionary<string, string> metaData)>, Task> batchCallback);
              void Start();
          }    
      

      在您的情况下,回调可能具有可观察的并且不使用循环,但是下面有一个低级循环,它也可以为消费者进行消息到对象的转换。

      【讨论】:

      • 这个想法是为了避免循环,因为如果我们有多个流,我们就会有多个循环......这会消耗大量内存和 CPU......对吗?
      • 这是一个比回调更小的问题。它们也有隐藏的内存来跟踪这些东西,你可能会得到泄漏。使用循环,每次迭代都会擦除内存,没有全局状态。循环将擦除内存,批处理的大小实际上使事情运行得更快,并允许批处理操作,如果您需要它尝试在数据库中一次插入 1 条记录。内存不是您为多少内存设置批处理大小的问题,而是您在最坏的情况下说话的 Megs。
      猜你喜欢
      • 2016-09-12
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2012-06-24
      • 1970-01-01
      • 2016-12-18
      • 2011-03-13
      • 1970-01-01
      相关资源
      最近更新 更多