【问题标题】:Redis pub/sub adding additional channels mid subscriptionRedis pub/sub 在订阅中添加额外的频道
【发布时间】:2011-07-15 07:06:54
【问题描述】:

是否可以向 Redis 连接添加其他订阅?我有一个监听线程,但它似乎不受新的 SUBSCRIBE 命令的影响。

如果这是预期的行为,如果用户将股票行情提要添加到他们的兴趣或加入聊天室,应该使用什么模式?

我想实现一个类似于以下的 Python 类:

import threading
import redis

class RedisPubSub(object):
    def __init__(self):
        self._redis_pub = redis.Redis(host='localhost', port=6379, db=0)        
        self._redis_sub = redis.Redis(host='localhost', port=6379, db=0)        
        self._sub_thread = threading.Thread(target=self._listen)
        self._sub_thread.setDaemon(True)
        self._sub_thread.start()

    def publish(self, channel, message):
        self._redis_pub.publish(channel, message)

    def subscribe(self, channel):
        self._redis_sub.subscribe(channel)

    def _listen(self):
        for message in self._redis_sub.listen():
            print message

【问题讨论】:

    标签: python redis


    【解决方案1】:

    python-redis RedisConnectionPool 类继承自 threading.local,这会产生您所看到的“神奇”效果。

    总结:您的主线程和工作线程的 self._redis_sub 客户端最终使用了两个不同的服务器连接,但只有主线程的连接发出了 SUBSCRIBE 命令。

    详细信息:由于主线程正在创建self._redis_sub,因此该客户端最终被放置到主线程的本地线程存储中。接下来我假设主线程进行了client.subscribe(channel) 调用。现在主线程的客户端在连接 1 上订阅。接下来您启动 self._sub_thread 工作线程,它最终将自己的 self._redis_sub 属性设置为 redis.Client 的新实例,该实例构造一个新的连接池并建立一个新的连接到redis服务器。

    此新连接尚未订阅您的频道,因此listen() 会立即返回。因此,使用python-redis,您无法在线程之间传递具有未完成订阅(或任何其他有状态命令)的已建立连接。

    根据您计划如何实现您的应用,您可能需要切换到使用不同的客户端,或者想出一些其他方式将订阅状态传达给工作线程,例如通过队列发送订阅命令。

    另一个问题是python-redis 使用阻塞套接字,这会阻止您的侦听线程在等待消息时执行其他工作,并且它无法发出希望取消订阅的信号,除非它在收到消息后立即取消订阅。

    【讨论】:

    • 我认为最后一个问题是杀手。我需要监听线程能够随意添加和删除订阅。
    【解决方案2】:

    异步方式:

    Twisted framework 和插头txredisapi

    示例代码(订阅:

    import txredisapi as redis
    
    from twisted.application import internet
    from twisted.application import service
    
    
    class myProtocol(redis.SubscriberProtocol):
        def connectionMade(self):
            print "waiting for messages..."
            print "use the redis client to send messages:"
            print "$ redis-cli publish chat test"
            print "$ redis-cli publish foo.bar hello world"
            self.subscribe("chat")
            self.psubscribe("foo.*")
    
    
            reactor.callLater(10, self.unsubscribe, "chat")
            reactor.callLater(15, self.punsubscribe, "foo.*")
    
            # self.continueTrying = False
            # self.transport.loseConnection()
    
        def messageReceived(self, pattern, channel, message):
            print "pattern=%s, channel=%s message=%s" % (pattern, channel, message)
    
        def connectionLost(self, reason):
            print "lost connection:", reason
    
    
    class myFactory(redis.SubscriberFactory):
        # SubscriberFactory is a wapper for the ReconnectingClientFactory
        maxDelay = 120
        continueTrying = True
        protocol = myProtocol
    
    
    application = service.Application("subscriber")
    srv = internet.TCPClient("127.0.0.1", 6379, myFactory())
    srv.setServiceParent(application)
    

    只有一个线程,不头痛:)

    当然取决于你编码的应用程序类型。在联网的情况下会扭曲。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2015-09-03
      • 2020-09-26
      • 2023-02-12
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-11-22
      • 2015-12-22
      相关资源
      最近更新 更多