【发布时间】:2021-05-09 20:21:21
【问题描述】:
我有一个由 Redis pubsub 消息触发的函数。问题在于,这个函数会加载视频并将其逐帧流式写入 Redis,而它是阻塞的:在它返回之前,订阅循环无法处理后续到达的消息。
解决这类问题最简单、最有效的模式是什么?
订阅:
# Subscribe to the "feed" channel and start streaming a video for every
# published request whose JSON payload carries a non-empty "source".
conn = redis.Redis(host="localhost", port="6379")
if not conn.ping():
    raise Exception('Redis unavailable')
pubsub = conn.pubsub()
pubsub.subscribe("feed")
for message in pubsub.listen():
    logging.info("received pubsub message")
    logging.info(message)
    logging.info(message['type'])
    # Only real publications carry a request. Skipping everything else
    # (subscribe/unsubscribe confirmations, ...) guarantees that a payload
    # from an earlier message is never replayed.
    if message['type'] != "message":
        continue
    try:
        data = json.loads(message.get("data"))
    except (TypeError, ValueError):
        # A malformed payload must not take the whole listener down.
        logging.error("Error occurred", exc_info=True)
        continue
    # Tolerate payloads that are not JSON objects or lack a "source" key.
    source = data.get("source") if isinstance(data, dict) else None
    if not source:
        continue
    try:
        args.infile = source
        loader = Video(infile=source, fps=30.0)
        # NOTE: load() runs inside this loop, so no further pubsub message
        # is handled until it returns.
        load(loader, conn, args)
    except Exception:
        # Boundary handler: log the failure and keep listening.
        logging.error("Error occurred", exc_info=True)
视频类:
class Video:
    """Iterator over frames read from a camera or a video file.

    ``infile`` is treated as a camera index when its string form consists
    only of decimal digits, and as a file path/URL otherwise. Iterating
    yields ``(count, img)`` tuples, where ``count`` starts at 0.

    For files, reads are paced to the file's FPS and the capture is rewound
    to the first frame when a read fails, so file playback loops. For
    cameras, every frame is flipped horizontally.
    """

    def __init__(self, infile=0, fps=30.0):
        """Open the capture device or file.

        Args:
            infile: Camera index (int or decimal string) or a file path.
            fps: Frame rate requested from a camera. For files the rate
                reported by the capture is used instead.

        Raises:
            Exception: Any error raised while opening or configuring the
                capture is logged and re-raised.
        """
        # Initialise the counter here too so next() works without iter().
        self.count = -1
        try:
            self.isFile = not str(infile).isdecimal()
            print('video: self.isFile', self.isFile)
            self.ts = time.time()
            self.infile = infile
            # A decimal string such as "0" names a camera; OpenCV selects
            # a device only when it is given an integer index.
            source = infile if self.isFile else int(infile)
            self.cam = cv2.VideoCapture(source)
            if not self.isFile:
                self.cam.set(cv2.CAP_PROP_FPS, fps)
                self.fps = fps
                # TODO: some cameras don't respect the fps directive
                self.cam.set(cv2.CAP_PROP_FRAME_WIDTH, 800)
                self.cam.set(cv2.CAP_PROP_FRAME_HEIGHT, 600)
            else:
                self.fps = self.cam.get(cv2.CAP_PROP_FPS)
            # Presumably a running average of the time between reads; it is
            # only consulted when pacing file playback. TODO confirm.
            self.sma = SimpleMovingAverage(value=0.1, count=19)
        except Exception:
            # Do not hand back a half-initialised instance.
            logging.error("Error occurred", exc_info=True)
            raise

    def __iter__(self):
        """Reset the frame counter and return the iterator itself."""
        self.count = -1
        return self

    def __next__(self):
        """Return the next ``(count, img)`` pair.

        Raises:
            StopIteration: When no frame can be read (after one rewind
                attempt for files).
            Exception: Any unexpected error is logged and re-raised.
        """
        try:
            self.count += 1
            # Fall back to 30 FPS when the rate is unknown (zero or None).
            if not self.fps:
                self.fps = 30.0
            # Respect FPS for files
            if self.isFile:
                delta = time.time() - self.ts
                self.sma.add(delta)
                # Sleep for the frame period minus the averaged delay.
                time.sleep(
                    max(0, (1 - self.sma.current * self.fps) / self.fps)
                )
                self.ts = time.time()
            # Read image
            ret_val, img0 = self.cam.read()
            if not ret_val and self.isFile:
                # Rewind to the first frame and retry once.
                self.cam.set(cv2.CAP_PROP_POS_FRAMES, 0)
                ret_val, img0 = self.cam.read()
            if not ret_val:
                # Explicit raise: an ``assert`` would vanish under ``-O``.
                raise AssertionError('Video Error')
            # Preprocess
            img = img0
            if not self.isFile:
                img = cv2.flip(img, 1)
            return self.count, img
        except AssertionError:
            # No frame available: report it and end the iteration instead
            # of returning None, which consumers cannot unpack.
            redisLog("Error occurred", exc_info=True)
            raise StopIteration from None
        except Exception:
            # Unexpected failure: log it and let the caller see the cause.
            logging.exception("Exception occurred")
            raise

    def __len__(self):
        """Return 0; the number of frames is not known in advance.

        NOTE: because this returns 0, instances evaluate as falsy.
        """
        return 0
加载函数:
def load(loader, conn, args):
    """Encode every frame from ``loader`` and append it to a Redis stream.

    Args:
        loader: Iterable yielding ``(count, img)`` pairs.
        conn: Redis client; ``xadd`` is called once per frame.
        args: Options object. The attributes read are ``fmt`` (image
            extension passed to ``cv2.imencode``), ``output`` (stream key),
            ``maxlen`` (stream length cap), ``verbose`` and ``count``
            (stop after this many frames; ``None`` means no limit).

    Raises:
        AssertionError: Logged and re-raised if one escapes the loop.
    """
    try:
        for (count, img) in loader:
            _, data = cv2.imencode(args.fmt, img)
            msg = {
                'count': count,
                'image': data.tobytes()
            }
            _id = conn.xadd(args.output, msg, maxlen=args.maxlen)
            if args.verbose:
                print('frame: {} id: {}'.format(count, _id))
            if args.count is not None and count+1 == args.count:
                # ``count`` is zero-based, so count + 1 frames were sent.
                print('Stopping after {} frames.'.format(count + 1))
                break
    except AssertionError:
        logging.error("Error occurred", exc_info=True)
        raise
【问题讨论】:
标签: python asynchronous publish-subscribe