【问题标题】:Redis Pub/Sub with Reliability具有可靠性的 Redis Pub/Sub
【发布时间】:2011-09-05 17:41:51
【问题描述】:

我一直在考虑使用 Redis Pub/Sub 作为 RabbitMQ 的替代品。

据我了解,Redis 的 pub/sub 持有与每个订阅者的持久连接,如果连接终止,所有未来的消息都将丢失并丢弃在地板上。

一种可能的解决方案是使用列表(和阻塞等待)来存储所有消息和发布/订阅,这只是一种通知机制。我认为这让我大部分时间都在那里,但我仍然对失败案例有些担忧。

  1. 当订阅者死亡并重新上线时会发生什么,它应该如何处理所有待处理的消息?
  2. 当系统收到格式错误的消息时,您如何处理这些异常?死信队列?
  3. 是否有实施重试策略的标准做法?

【问题讨论】:

  • 您可以在redis.io/commands/rpoplpush查看 Redis 模式以获得可靠的队列
  • 我也有同样的问题...我想向客户端发送位置更新...一旦它们断开连接,我不知道如何在客户端和服务器之间同步数据...有你解决问题了吗?如果是,怎么做??

标签: redis


【解决方案1】:

当订阅者(消费者)死亡时,您的列表将继续增长,直到客户返回。一旦达到特定限制,您的生产者可以(从任一侧)修剪列表,但这是您需要在应用程序级别处理的事情。如果您在每条消息中包含时间戳,那么您的消费者就可以对消息的年龄采取行动,假设您具有要在消息年龄上强制执行的应用程序逻辑。

我不确定格式错误的消息是如何进入系统的,因为与 Redis 的连接通常是具有完整性保证的 TCP。但是如果发生这种情况,可能是由于生产者层的消息编码中的错误,您可以通过保留接收消费者异常消息的每个生产者队列来提供处理错误的通用机制。

重试策略很大程度上取决于您的应用需求。如果您需要 100% 保证消息已被接收和处理,那么您应该考虑使用 Redis 事务(MULTI/EXEC)来包装消费者完成的工作,这样您就可以确保客户端不会删除消息,除非它已经完成了它的工作。如果您需要显式确认,则可以在专用于生产者进程的队列上使用显式 ACK 消息。

如果不详细了解您的应用程序需求,就很难知道如何明智地选择。通常,如果您的消息需要完整的 ACID 保护,那么您可能还需要使用 redis 事务。如果您的消息仅在及时时才有意义,则可能不需要事务。听起来好像您不能容忍丢失的消息,所以您使用列表的方法很好。如果您需要为您的消息实现一个优先级队列,您可以使用排序集(Z 命令)来存储您的消息,使用它们的优先级作为得分值,以及一个轮询消费者。

【讨论】:

    【解决方案2】:

    如果您想要一个订阅者在死亡时不会丢失消息的 pub/sub 系统,请考虑使用 Redis Streams 而不是 Redis Pub/sub。

    Redis Streams 有自己的架构和 Redis Pub/sub 的优缺点。使用 Redis Streams,订阅者可以发出命令:

    我收到的最后一条消息是 X,现在给我下一条消息; 如果没有新消息,则等待消息到达。

    上面链接的 Antirez 的文章很好地介绍了 Redis 流,并提供了更多信息。

    【讨论】:

      【解决方案3】:

      我所做的是使用一个排序集,使用时间戳作为分数,将数据的键作为成员值。我使用最后一个项目的分数来检索接下来的几个项目,然后获取密钥。工作完成后,我将 zrem 和 del 包装在 MULTI/EXEC 事务中。

      基本上是 Edward 所说的,但是将键存储在有序集中,因为我的消息可能非常大。

      希望这会有所帮助!

      【讨论】:

        【解决方案4】:

        这是我专门为此目的编写的一个类:

        import logging
        
        from redis import StrictRedis
        
        # Defaults
        CONNECT_TIMEOUT_SECS = 5.0  # Control how long to wait while establishing a connection
        REQUEST_TIMEOUT_SECS = 120.0  # Request socket timeout
        
        
        class RedisBaseClient(object):
        
            def __init__(self, config=None, connect_timeout_secs=CONNECT_TIMEOUT_SECS,
                         request_timeout_secs=REQUEST_TIMEOUT_SECS):
                """
                Load config
                :param config: dict, config
                :param connect_timeout_secs: float, re-connect timeout seconds
                :param request_timeout_secs: float, timeout seconds
                """
                self.read_conn = None
                self.write_conn = None
                self.config = config or {}
        
                self.CONNECT_TIMEOUT_SECS = connect_timeout_secs
                self.REQUEST_TIMEOUT_SECS = request_timeout_secs
        
                self.read_connection()
        
            def _connect(self, host, port):
                return StrictRedis(host=host,
                                   port=port,
                                   socket_keepalive=False,
                                   retry_on_timeout=True,
                                   socket_timeout=self.REQUEST_TIMEOUT_SECS,
                                   socket_connect_timeout=self.CONNECT_TIMEOUT_SECS)
        
            def read_connection(self):
                """
                Returns a read connection to redis cache
                """
                if not self.read_conn:
                    try:
                        self.read_conn = self._connect(self.config['read_host'], self.config['read_port'])
                    except KeyError:
                        logging.error("RedisCache.read_connection invalid configuration")
                        raise
                    except Exception as e:
                        logging.exception("RedisCache.read_connection unhandled exception {}".format(e))
                        raise
        
                return self.read_conn
        
            def write_connection(self):
                """
                Returns a write connection to redis cache
                """
                if not self.write_conn:
                    try:
                        self.write_conn = self._connect(self.config['write_host'], self.config['write_port'])
                    except KeyError:
                        logging.error("RedisCache.write_connection invalid configuration")
                        raise
                    except Exception as e:
                        logging.exception("RedisCache.write_connection unhandled exception {}".format(e))
                        raise
        
                return self.write_conn
        
        
        class RedisQueue(RedisBaseClient):
        
            def get_queue_msg_count(self, q_name):
                """
                Get queue message count
                Return the size of the queue (list).
                :param q_name: str, redis key (queue name)
                :return:
                """
                try:
                    msg_count = self.read_connection().llen(q_name)
                except Exception as e:  # pragma: no cover
                    msg_count = 0
                    logging.warning("RedisQueue.get_queue_msg_count no data for queue {}. {}".format(q_name, e))
                return msg_count
        
            def is_empty(self, q_name):
                """
                Return True if the queue is empty, False otherwise.
                :param q_name: str, queue name
                :return: bool, is empty
                """
                return self.get_queue_msg_count(q_name) == 0
        
            def publish(self, q_name, data):
                """
                Publish msg/item to queue.
                :param q_name: str, queue name
                :param data: str, data (message)
                :return: bool, success
                """
                try:
                    self.write_connection().rpush(q_name, data)
                except Exception as e:  # pragma: no cover
                    logging.warning("RedisQueue.publish for queue {}, msg {}. {}".format(q_name, data, e))
                    return False
                return True
        
            def publish_multiple(self, q_name, data_list):
                """
                Publish multiple msg/items to queue.
                :param q_name: str, queue name
                :param data_list: list of str, data (message)
                :return: bool, success
                """
                try:
                    self.write_connection().rpush(q_name, *data_list)
                except Exception as e:  # pragma: no cover
                    logging.warning("RedisQueue.publish_multiple for queue {}. {}".format(q_name, e))
                    return False
                return True
        
            def flush_queue(self, q_name):
                """
                Flush a queue to clear work for consumer
                :param q_name:
                :return:
                """
                try:
                    self.write_connection().delete(q_name)
                except Exception as e:  # pragma: no cover
                    logging.exception("RedisQueue.flush_queue {} error {}".format(q_name, e))
                    return False
                return True
        
            def flush_queues(self, q_names):
                """
                Flush all queues
                :return: bool, success
                """
                try:
                    self.write_connection().delete(*q_names)
                except Exception as e:  # pragma: no cover
                    logging.exception("RedisQueue.flush_queues {} error {}".format(q_names, e))
                    return False
                return True
        
            def get_messages(self, q_name, prefetch_count=100):
                """
                Get messages from queue
                :param q_name: str, queue name
                :param prefetch_count: int, number of msgs to prefetch
                    for consumer (default 1000)
                """
                pipe = self.write_connection().pipeline()
                pipe.lrange(q_name, 0, prefetch_count - 1)  # Get msgs (w/o pop)
                pipe.ltrim(q_name, prefetch_count, -1)  # Trim (pop) list to new value
        
                messages, trim_success = pipe.execute()
        
                return messages
        
            def get_message(self, q_name, timeout=None):
                """
                Pop and return an msg/item from the queue.
                If optional args timeout is not None (the default), block
                if necessary until an item is available.
                Allows for blocking via timeout if queue
                does not exist.
                :param q_name: str, queue name
                :param timeout: int, timeout wait seconds (blocking get)
                :return: str, message
                """
                if timeout is not None:
                    msg = self.read_connection().blpop(q_name, timeout=timeout)
                    if msg:
                        msg = msg[1]
                else:
                    msg = self.read_connection().lpop(q_name)
        
                return msg
        
            def get_message_safe(self, q_name, timeout=0, processing_prefix='processing'):
                """
                Retrieve a message but also send it to
                a processing queue for later acking
                :param q_name: str, queue name
                :param timeout:
                :param processing_prefix:
                :return:
                """
                # Too bad blpoplpush does not exist
                # item = self.read_connection().brpoplpush(q_name, "{}:{}".format(q_name, processing_prefix), timeout=timeout)
        
                msg = self.get_message(q_name=q_name, timeout=timeout)
                if msg:
                    self.write_connection().lpush("{}:{}".format(q_name, processing_prefix), msg)
                return msg
        
            def ack_message_safe(self, q_name, message, processing_prefix='processing'):
                """
                Acknowledge a message has been processed
                :param q_name: str, queue name
                :param message: str, message value
                :param processing_prefix: str, prefix of processing queue name
                :return: bool, success
                """
                self.read_connection().lrem("{}:{}".format(q_name, processing_prefix), -1, message)
                return True
        
            def requeue_message_safe(self, q_name, processing_prefix='processing'):
                """
                Move unprocessed messages from processing queue
                to original queue for re-processing
                :param q_name:
                :param processing_prefix:
                :return: bool, success
                """
                msgs = self.write_connection().lrange("{}:{}".format(q_name, processing_prefix), 0, -1)  # Get all msgs
                if msgs:
                    msgs = msgs[::-1]  # Reverse order
                    pipe = self.write_connection().pipeline()
                    pipe.rpush(q_name, *msgs)
                    pipe.ltrim("{}:{}".format(q_name, processing_prefix), 0, -1)  # Cleanup
                    pipe.execute()
                return True
        

        只需初始化RedisQueue 并使用函数。我想这就是你所追求的。

        【讨论】:

        • 这是做什么的?也许与我的问题有关? stackoverflow.com/questions/68437652/…
        • 它不是 pub sub,它更可靠,因为您的消费者可以离线并且消息将排队而不是丢失。它是一个简单的队列实现。
        • 是的,类似于流对吧?你为此写了些什么?但是不确定如何使用它。它会添加新命令吗?
        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2011-06-23
        • 2012-12-01
        • 2012-01-13
        • 2015-11-09
        • 1970-01-01
        • 2014-11-24
        • 1970-01-01
        相关资源
        最近更新 更多