【问题标题】:Pattern for Python Redis Pub-Sub-multithreading? Async?Python Redis Pub-Sub-multithreading 的模式?异步?
【发布时间】:2021-05-09 20:21:21
【问题描述】:

我有一个由 Redis pubsub 消息触发的函数。问题是这个加载视频并将其逐帧流式传输到 Redis 的函数是阻塞的,因此后续消息无法通过。

解决这类问题最简单、最有效的模式是什么?

订阅:

conn = redis.Redis(host="localhost", port="6379")
if not conn.ping():
        raise Exception('Redis unavailable')

pubsub = conn.pubsub()
pubsub.subscribe("feed")
data = None
for message in pubsub.listen():
    logging.info("received pubsub message")
    logging.info(message)
    logging.info(message['type'])
    if message['type'] == "message":
        data = json.loads(message.get("data"))
        if data and data['source']:
            try:
                args.infile = data['source']
                loader = Video(infile=data.get("source"), fps=30.0)
                load(loader, conn, args)
            except error:
                logging.error("Error occurred", exc_info=True)

视频类:

class Video:
    def __init__(self, infile=0, fps=30.0):
        try: 
            self.isFile = not str(infile).isdecimal()
            print('video: self.isFile', self.isFile)
            self.ts = time.time()
            self.infile = infile
            self.cam = cv2.VideoCapture(self.infile)
            if not self.isFile:
                self.cam.set(cv2.CAP_PROP_FPS, fps)
                self.fps = fps
                # TODO: some cameras don't respect the fps directive
                self.cam.set(cv2.CAP_PROP_FRAME_WIDTH, 800)
                self.cam.set(cv2.CAP_PROP_FRAME_HEIGHT, 600)
            else:
                self.fps = self.cam.get(cv2.CAP_PROP_FPS)
                self.sma = SimpleMovingAverage(value=0.1, count=19)
        except error as error:
            # Output expected AssertionErrors.
            logging.error("Error occurred", exc_info=True)
 
    def __iter__(self):
        self.count = -1
        return self

    def __next__(self):
        try:
            self.count += 1
            if not self.fps:
                self.fps = 30.0
            # Respect FPS for files
            if self.isFile:
                delta = time.time() - self.ts
                self.sma.add(delta)
                time.sleep(max(0,(1 - self.sma.current*self.fps)/self.fps))
                self.ts = time.time()

            # Read image
            ret_val, img0 = self.cam.read()
            if not ret_val and self.isFile:
                self.cam.set(cv2.CAP_PROP_POS_FRAMES, 0)
                ret_val, img0 = self.cam.read()
            assert ret_val, 'Video Error'

            # Preprocess
            img = img0
            if not self.isFile:
                img = cv2.flip(img, 1)

            return self.count, img
        except AssertionError as error:
            # Output expected AssertionErrors.
            redisLog("Error occurred", exc_info=True)
        except Exception as exception:
            # Output unexpected Exceptions.
            logging.exception("Exception occurred")

    def __len__(self):
        return 0

加载函数:

def load(loader, conn, args):
    try:
        for (count, img) in loader:
            _, data = cv2.imencode(args.fmt, img)
            msg = {
                'count': count,
                'image': data.tobytes()
            }
            _id = conn.xadd(args.output, msg, maxlen=args.maxlen)
            if args.verbose:
                print('frame: {} id: {}'.format(count, _id))
            if args.count is not None and count+1 == args.count:
                print('Stopping after {} frames.'.format(count))
                break
    except AssertionError:
        logging.error("Error occurred", exc_info=True)
        raise

【问题讨论】:

    标签: python asynchronous publish-subscribe


    【解决方案1】:

    我最终使用了多处理。这有点痛苦,因为您只能将 Pickelable 参数传递给子进程,这意味着连接已断开。因此,我必须从子进程中进行连接。

    procs = []
    logging.basicConfig(level=logging.DEBUG)
    
    for message in pubsub.listen():
        logging.info("received pubsub message")
        logging.info(message)
        logging.info(message['type'])
        try:
            if message['type'] == "message":
                data = json.loads(message.get("data"))
                if data and data['source']:
                        for proc in procs:
                            if proc.is_alive():
                                proc.terminate()
                                proc.join(timeout=0)
                                procs.pop(0)
                        loaderProcess = multiprocessing.Process(target=load, args = (data.get("source"), args,))
                        procs.append(loaderProcess)
                        loaderProcess.start()
                        continue
        except Exception as e:
            logging.error("Error occurred", exc_info=True)
    

    ...

    def load(source, args):
        try:
            conn = redis.Redis(host="localhost", port="6379")
            if not conn.ping():
                raise Exception('Redis unavailable')
    
            loader = Video(infile=source, fps=30.0)
            for (count, img) in loader:
                _, data = cv2.imencode(args.fmt, img)
                msg = {
                    'count': count,
                    'image': data.tobytes()
                }
                _id = conn.xadd(args.output, msg, maxlen=args.maxlen)
                if args.verbose:
                    print('frame: {} id: {}'.format(count, _id))
                if args.count is not None and count+1 == args.count:
                    print('Stopping after {} frames.'.format(count))
                    break
        except AssertionError:
            logging.error("Error occurred", exc_info=True)
            raise
    

    【讨论】:

      猜你喜欢
      • 2017-09-25
      • 2015-11-09
      • 1970-01-01
      • 2016-01-18
      • 2011-10-17
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-01-30
      相关资源
      最近更新 更多