前言

最近在功能性测试的过程中,需要在Python环境下用OpenCV读取网络摄像头的视频流,接着用目标检测器进行视屏帧的后续处理。在测试过程中发现如果是单线程的情况,会出现比较严重的时延,如果目标检测模型稍微大一点,像YOLOv4这类的,那么情况更加严重。

后面考虑到演示效果,从单线程改为了多线程,即单独用一个线程实时捕获视频帧,主线程在需要时从子线程拷贝最近的帧使用即可。通过这样的修改,不仅时延基本消失,整个流程的实时性也有相对的提升,可以说是非常实用的技巧。

Python多线程编程

使用Python进行多线程编程是较为简单的,Python的threading模块封装了相关的操作,通过编写功能类继承threading.Thread即可实现自己的逻辑。简单的代码示例如下所示:

class myThread(threading.Thread):
    def __init__(self, name=None):
        super(myThread, self).__init__(name=name)
    def run(self):
        print('=> Thread %s is running ...' % self.name)
thread = myThread()
thread.start()
thread.join()

上面的代码简单展示了如何使用线程类:通过调用start()方法,线程实例开始在单独的线程上下文中运行自己的run()函数处理任务,直到线程退出。在此期间,主线程可以继续执行任务。当主线程任务执行结束时,主线程可通过设置全局状态变量告知子线程退出,同时调用join()方法等待子线程运行结束。

OpenCV视屏流的多线程处理

在上面例子的基础上,可对简单的单线程处理流程进行优化,即将读取视频帧的部分单独放在一个线程执行,同时提供线程间同步、数据交互的支持,在主线程中运行目标检测模型和后续处理流程,在需要时从读取视频帧的子线程获取最近的帧进行预处理、推理、后处理和可视化等操作。相关的示例代码如下:

import numpy as np
import cv2
import threading
from copy import deepcopy
thread_lock = threading.Lock()
thread_exit = False
class myThread(threading.Thread):
    def __init__(self, camera_id, img_height, img_width):
        super(myThread, self).__init__()
        self.camera_id = camera_id
        self.img_height = img_height
        self.img_width = img_width
        self.frame = np.zeros((img_height, img_width, 3), dtype=np.uint8)
    def get_frame(self):
        return deepcopy(self.frame)
    def run(self):
        global thread_exit
        cap = cv2.VideoCapture(self.camera_id)
        while not thread_exit:
            ret, frame = cap.read()
            if ret:
                frame = cv2.resize(frame, (self.img_width, self.img_height))
                thread_lock.acquire()
                self.frame = frame
                thread_lock.release()
            else:
                thread_exit = True
        cap.release()
def main():
    global thread_exit
    camera_id = 0
    img_height = 480
    img_width = 640
    thread = myThread(camera_id, img_height, img_width)
    thread.start()
    while not thread_exit:
        thread_lock.acquire()
        frame = thread.get_frame()
        thread_lock.release()
        cv2.imshow('Video', frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            thread_exit = True
    thread.join()
if __name__ == "__main__":
    main()

在上面的代码中,为确保资源访问不受冲突,使用threading.Lock进行保护;主线程使用thread_exit全局状态变量控制子线程的运行状态。稍微特别一点的是,thread_exit实际上控制着两个线程的运行状态,因为在上述的处理流程中,两个线程都拥有终止运行流程的话语权,故这样的处理是合理的。

结语

实际上使用多线程并行处理任务,最大程度地利用资源早已是老生常谈的技巧,例如在服务器端,会开辟有专门的线程池用于处理随时可能到来的请求,而在嵌入式通信终端上,也通常采用线程池的方式来处理收到的消息包,以尽可能提升实时性。虽然多线程的处理方式相较单线程而言要稍微复杂一些,但带来的性能提升确是实打实的,所以还是很值得一试。

原文地址:https://blog.csdn.net/hlld__/article/details/110087110

相关文章: