【发布时间】:2018-08-07 08:57:39
【问题描述】:
我在考虑可以解决以下问题的架构时遇到问题:
我有一个 Web 应用程序(生产者),可以根据请求接收一些数据。我还有许多应该处理这些数据的进程(消费者)。 1 个请求生成 1 批数据,应仅由 1 个消费者处理。
我目前的解决方案包括接收数据,使用 Redis 将其缓存在内存中,通过消息通道发送一条消息,当消费者在同一通道上侦听时,数据已写入,然后数据由消费者。这里的问题是我需要阻止多个消费者处理相同的数据。那么如何通知其他消费者我已经开始着手这项任务呢?
生产者代码(烧瓶端点):
data = request.get_json()
db = redis.Redis(connection_pool=pool)
db.set(data["externalId"], data)
# Subscribe to the batches channel and publish the id
db.pubsub()
db.publish('batches', request_key)
results = None
result_key = str(data["externalId"])
# Wait till the batch is processed
while results is None:
results = db.get(result_key)
if results is not None:
results = results.decode('utf8')
db.delete(data["externalId"])
db.delete(result_key)
消费者:
db = redis.Redis(connection_pool = pool)
channel = db.pubsub()
channel.subscribe('batches')
while True:
try:
message = channel.get_message()
message_data = bytes(message['data']).decode('utf8')
external_id = message_data.split('-')[-1]
data = json.loads(db.get(external_id).decode('utf8'))
result = DataProcessor.process(data)
db.set(str(external_id), result)
except Exception:
pass
【问题讨论】:
标签: python python-3.x multithreading design-patterns redis