【问题标题】:How to use pyav or opencv to decode a live stream of raw H.264 data?如何使用 pyav 或 opencv 解码原始 H.264 数据的实时流?
【发布时间】:2020-10-26 19:15:09
【问题描述】:

数据由socket接收,没有更多的shell,它们是纯IP B帧,以NAL Header(类似于00 00 00 01)开头。我现在正在使用 pyav 解码帧,但我只能在接收到第二个 pps 信息(在关键帧中)之后解码数据(所以我发送到我的解码线程的数据块可以以 pps 和 sps 开头),否则decode() 或 demux() 会返回错误“non-existing PPS 0 referenced decode_slice_header error”。

我想将数据馈送到可以记住前一个 P 帧的持续解码器,因此在馈送一个 B 帧后,解码器返回一个解码后的视频帧。或者某种形式的 IO,可以作为容器打开并通过另一个线程继续向其中写入数据。

这是我的关键代码:

#read thread... read until get a key frame, then make a new io.BytesIO() to store the new data.
rawFrames = io.BytesIO()
while flag_get_keyFrame:()
    ....
    content= socket.recv(2048)
    rawFrames.write(content)
    ....

#decode thread... decode content between two key frames
....
rawFrames.seek(0)
container = av.open(rawFrames)
for packet in container.demux():
    for frame in packet.decode():
        self.frames.append(frame)
....

我的代码会播放视频,但会有 3~4 秒的延迟。所以我不会把所有的东西都放在这里,因为我知道它实际上并没有达到我想要实现的目标。 我想在收到第一个关键帧后播放视频,并在收到后立即解码以下帧。 Pyav opencv ffmpeg 或其他,我怎样才能实现我的目标?

【问题讨论】:

    标签: python opencv ffmpeg h.264 pyav


    【解决方案1】:

    经过数小时的寻找答案。这是我自己想出来的。

    对于单线程,您可以执行以下操作:

    rawData = io.BytesIO()
    container = av.open(rawData, format="h264", mode='r')
    cur_pos = 0
    while True:
        data = await websocket.recv()
        rawData.write(data)
        rawData.seek(cur_pos)
        for packet in container.demux():
            if packet.size == 0:
                continue
            cur_pos += packet.size
            for frame in packet.decode():
                self.frames.append(frame)
    

    这是基本思想。我已经制定了一个将接收线程和解码线程分开的通用版本。如果 CPU 跟不上解码速度,代码也会跳帧,并从下一个关键帧开始解码(这样就不会出现撕裂的绿屏效果)。以下是完整版代码:

    import asyncio
    import av
    import cv2
    import io
    from multiprocessing import Process, Queue, Event
    import time
    import websockets
    
    def display_frame(frame, start_time, pts_offset, frame_rate):
        if frame.pts is not None:
            play_time = (frame.pts - pts_offset) * frame.time_base.numerator / frame.time_base.denominator
            if start_time is not None:
                current_time = time.time() - start_time
                time_diff = play_time - current_time
                if time_diff > 1 / frame_rate:
                    return False
                if time_diff > 0:
                    time.sleep(time_diff)
        img = frame.to_ndarray(format='bgr24')
        cv2.imshow('Video', img)
        return True
    
    def get_pts(frame):
        return frame.pts
    
    def render(terminated, data_queue):
        rawData = io.BytesIO()
        cur_pos = 0
        frames_buffer = []
        start_time = None
        pts_offset = None
        got_key_frame = False
        while not terminated.is_set():
            try:
                data = data_queue.get_nowait()
            except:
                time.sleep(0.01)
                continue
            rawData.write(data)
            rawData.seek(cur_pos)
            if cur_pos == 0:
                container = av.open(rawData, mode='r')
                original_codec_ctx = container.streams.video[0].codec_context
                codec = av.codec.CodecContext.create(original_codec_ctx.name, 'r')
            cur_pos += len(data)
            dts = None
            for packet in container.demux():
                if packet.size == 0:
                    continue
                dts = packet.dts
                if pts_offset is None:
                    pts_offset = packet.pts
                if not got_key_frame and packet.is_keyframe:
                    got_key_frame = True
                if data_queue.qsize() > 8 and not packet.is_keyframe:
                    got_key_frame = False
                    continue
                if not got_key_frame:
                    continue
                frames = codec.decode(packet)
                if start_time is None:
                    start_time = time.time()
                frames_buffer += frames
                frames_buffer.sort(key=get_pts)
                for frame in frames_buffer:
                    if display_frame(frame, start_time, pts_offset, codec.framerate):
                        frames_buffer.remove(frame)
                    if cv2.waitKey(1) & 0xFF == ord('q'):
                        break
            if dts is not None:
                container.seek(25000)
            rawData.seek(cur_pos)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
        terminated.set()
        cv2.destroyAllWindows()
    
    async def receive_encoded_video(websocket, path):
        data_queue = Queue()
        terminated = Event()
        p = Process(
            target=render,
            args=(terminated, data_queue)
        )
        p.start()
        while not terminated.is_set():
            try:
                data = await websocket.recv()
            except:
                break
            data_queue.put(data)
        terminated.set()
    

    【讨论】:

      【解决方案2】:

      正常情况下会延迟 3~4 秒,因为您正在读取编码数据并通过 CPU 对其进行解码需要时间。

      1. 如果您有 GPU 硬件,您可以使用 FFMPEG 通过 GPU 解码 H264。 Here 就是一个例子。
      2. 如果您没有 GPU,在 CPU 上解码 H264 总是会导致延迟。您可以使用 FFMPEG 进行有效解码,但这也会将总延迟降低近 10%

      【讨论】:

      • 谢谢,但我很确定我的问题不是由 cpu 解码引起的。我想要的是在收到第一个 P 帧后立即解码帧。
      猜你喜欢
      • 2015-12-12
      • 2016-10-01
      • 2014-01-13
      • 2013-04-07
      • 1970-01-01
      • 1970-01-01
      • 2015-05-11
      • 2015-12-04
      • 2012-06-22
      相关资源
      最近更新 更多