【问题标题】:Task delegation in Python/RedisPython/Redis 中的任务委托
【发布时间】:2018-08-07 08:57:39
【问题描述】:

我在考虑可以解决以下问题的架构时遇到问题:

我有一个 Web 应用程序(生产者),可以根据请求接收一些数据。我还有许多应该处理这些数据的进程(消费者)。 1 个请求生成 1 批数据,应仅由 1 个消费者处理。

我目前的解决方案包括接收数据,使用 Redis 将其缓存在内存中,通过消息通道发送一条消息,当消费者在同一通道上侦听时,数据已写入,然后数据由消费者。这里的问题是我需要阻止多个消费者处理相同的数据。那么如何通知其他消费者我已经开始着手这项任务呢?

生产者代码(烧瓶端点):

    data = request.get_json()
    db = redis.Redis(connection_pool=pool)
    db.set(data["externalId"], data)
    # Subscribe to the batches channel and publish the id
    db.pubsub()
    db.publish('batches', request_key)
    results = None
    result_key = str(data["externalId"])

    # Wait till the batch is processed
    while results is None:
        results = db.get(result_key)
        if results is not None:
            results = results.decode('utf8')

    db.delete(data["externalId"])
    db.delete(result_key)

消费者:

    db = redis.Redis(connection_pool = pool)
    channel = db.pubsub()
    channel.subscribe('batches')

    while True:
        try:
            message = channel.get_message()
            message_data = bytes(message['data']).decode('utf8')
            external_id = message_data.split('-')[-1]
            data = json.loads(db.get(external_id).decode('utf8'))
            result = DataProcessor.process(data)
            db.set(str(external_id), result)
        except Exception:
            pass

【问题讨论】:

    标签: python python-3.x multithreading design-patterns redis


    【解决方案1】:

    正是由于这个原因,PUBSUB 对于任务队列来说通常是有问题的。来自文档 (https://redis.io/topics/pubsub):

    SUBSCRIBE、UNSUBSCRIBE 和 PUBLISH 实现了发布/订阅消息传递范式,其中(引用 Wikipedia)发送者(发布者)没有被编程为将他们的消息发送给特定的接收者(订阅者)。相反,发布的消息被表征为频道,而不知道可能有哪些订阅者(如果有的话)。

    一种流行的替代方法是通过将元素推送到 Redis 列表的末尾来实现“发布”,并通过让您的工作人员每隔一段时间轮询该列表来“订阅”(指数退避通常是一个合适的选择) .为了避免多个工作人员获得相同工作的情况,请使用lpop 从列表中获取和删除一个元素。 Redis 是单线程的,因此可以保证只有一个工作人员会接收每个元素。

    所以,在发布方面,目标是这样的:

    db = redis.Redis(connection_pool=pool)
    db.rpush("my_queue", task_payload)
    

    在订阅端,您可以安全地并行运行这样的循环,只要您需要多次:

    while True:
        db = redis.Redis(connection_pool=pool)
        payload = db.lpop("my_queue")
        if not payload:
            continue
        < deserialize and process payload here >
    

    请注意,这是一个后进先出队列 (LIFO),因为我们使用 rpush 推送到右侧,并使用 lpop 从左侧弹出。您可以通过组合lpush/lpop 轻松实现 FIFO 版本。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2013-07-07
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多