【问题标题】:Non-blocking realtime read from multiple shell subprocesses (Python)从多个 shell 子进程(Python)的非阻塞实时读取
【发布时间】:2018-07-15 01:57:04
【问题描述】:

我正在使用ffmpegsubrocess 构建实时多视频流监控。 我目前有以下代码,灵感来自"Async and await with subprocesses" post

问题是在一段时间后输出停止打印并且进程进入僵尸模式。我猜这个问题与PIPE的过载或死锁有关。需要帮助。

"""Async and await example using subprocesses

Note:
    Requires Python 3.6.
"""

import os
import sys
import time
import platform
import asyncio

async def run_command_shell(command):
    """Run command in subprocess (shell)

    Note:
        This can be used if you wish to execute e.g. "copy"
        on Windows, which can only be executed in the shell.
    """
    # Create subprocess
    process = await asyncio.create_subprocess_shell(
        command,
        stderr=asyncio.subprocess.PIPE)

    # Status
    print('Started:', command, '(pid = ' + str(process.pid) + ')')

    # Wait for the subprocess to finish
    stdout, stderr = await process.communicate()

    # Progress
    if process.returncode == 0:
        print('Done:', command, '(pid = ' + str(process.pid) + ')')
    else:
        print('Failed:', command, '(pid = ' + str(process.pid) + ')')

    # Result
    result = stderr.decode().strip()

    # Real time print
    print(result)

    # Return stdout
    return result


def make_chunks(l, n):
    """Yield successive n-sized chunks from l.

    Note:
        Taken from https://stackoverflow.com/a/312464
    """
    if sys.version_info.major == 2:
        for i in xrange(0, len(l), n):
            yield l[i:i + n]
    else:
        # Assume Python 3
        for i in range(0, len(l), n):
            yield l[i:i + n]


def run_asyncio_commands(tasks, max_concurrent_tasks=0):
    """Run tasks asynchronously using asyncio and return results

    If max_concurrent_tasks are set to 0, no limit is applied.

    Note:
        By default, Windows uses SelectorEventLoop, which does not support
        subprocesses. Therefore ProactorEventLoop is used on Windows.
        https://docs.python.org/3/library/asyncio-eventloops.html#windows
    """

    all_results = []

    if max_concurrent_tasks == 0:
        chunks = [tasks]
    else:
        chunks = make_chunks(l=tasks, n=max_concurrent_tasks)

    for tasks_in_chunk in chunks:
        if platform.system() == 'Windows':
            loop = asyncio.ProactorEventLoop()
            asyncio.set_event_loop(loop)
        else:
            loop = asyncio.get_event_loop()

        commands = asyncio.gather(*tasks_in_chunk)  # Unpack list using *
        results = loop.run_until_complete(commands)
        all_results += results
        loop.close()
    return all_results


if __name__ == '__main__':

    start = time.time()

    if platform.system() == 'Windows':
        # Commands to be executed on Windows
        commands = [
            ['hostname']
        ]
    else:
        # Commands to be executed on Unix
        commands = [
            ['du', '-sh', '/var/tmp'],
            ['hostname'],
        ]
    cmds = [["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx  -f null -"],
            ["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx  -f null -"],
            ["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx -f null -"],
            ["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx  -f null -"],
            ["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx -f null -"],
            ["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx  -f null -"],
            ["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx  -f null -"],
            ["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx  -f null -"],
            ["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx -f null -"],
            ["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx -f null -"],
            ["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx -f null -"],
            ["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx -f null -"]]

    tasks = []
    for command in cmds:
        tasks.append(run_command_shell(*command))


    # # Shell execution example
    # tasks = [run_command_shell('copy c:/somefile d:/new_file')]

    # # List comprehension example
    # tasks = [
    #     run_command(*command, get_project_path(project))
    #     for project in accessible_projects(all_projects)
    # ]

    results = run_asyncio_commands(tasks, max_concurrent_tasks=20)  # At most 20 parallel tasks
    print('Results:', results)

    end = time.time()
    rounded_end = ('{0:.4f}'.format(round(end-start,4)))
    print('Script ran in about', str(rounded_end), 'seconds')

相关:Non-blocking read from multiple subprocesses (Python)

【问题讨论】:

  • 创建一个 minimal 代码示例来演示该问题。如果您可以使用单个 run_command_shell() 重现它,请放弃其他呼叫。如果您可以使用虚拟 python 脚本而不是 ffmpeg 使用未知视频流来重现它,请使用它。 minimal reproducible example¶ 这里不需要多个循环,将循环创建代码移至if __name__ == "__main__"loop.close() 应该是你程序中的最后一件事¶ 你在这里不需要 shell。您可以直接运行命令您一次读取所有输出,直到子进程结束 - 这不是“非阻塞实时读取”
  • 一次运行n 并行进程的简单方法是线程池。 Python: execute cat subprocess in parallel。在asyncio 解决方案中,您可以use a Semaphore() to limit the number of concurrent tasks

标签: python multithreading asynchronous ffmpeg subprocess


【解决方案1】:

原来问题可能与通过多线程asyncio等进行代码优化无关

原因可能是服务器限制,例如打开文件/文件描述符 (FD)、防火墙、其他配置文件的最大数量。

如果你偶然发现了类似的问题:


安装 htop

Htop 是适用于 Linux/Unix 类系统的交互式实时进程监控应用程序,也是top 命令的便捷替代品,top 命令是预装在所有 Linux 操作系统上的默认进程监控工具。

这可能有助于澄清原因。


测试单个 ffmpeg 命令

正如 jfs 所说,我需要 Minimal, Complete, and Verifiable example 。所以我们从一个非常小的开始:测试一个进程。

ffmpeg -y -i udp://224.10.0.123:1234  -f null -

就我而言,事实证明任何多播都会在 2:10 - 2:20 挂起 分钟。进程活着但处于僵尸模式。这很奇怪, 因为几天前一切正常。


测试另一个软件 (VLC's multicat)

Multicat最新正式版编号为2.2,可here获取。

得到它,不要忘记,biTStream 需要在构建时安装。

使用从流中录制视频的命令检查流:

timeout 10 multicat -uU @224.10.0.123:1234 test.ts

就我而言,同样的事情发生在第 2 分钟。该命令并未停止执行,但文件已停止记录。


检查打开文件/文件描述符的最大数量more info

使用以下命令命令显示打开文件描述符的最大数量:

cat /proc/sys/fs/file-max

要查看硬值和软值,请发出以下命令:

ulimit -Hn
ulimit -Sn

在执行我的一个 python 脚本的某个时刻,我看到了类似的错误,但是这个参数的增加对我没有帮助。


总结

所以问题与我的脚本的并行执行无关。在另一台虚拟机上验证成功。我联系了设置这个虚拟机的人,向他解释说最近几天出了点问题,提示问题出在防火墙上。他说他什么都没碰。但在这个电话之后,一切都开始完美运作。 (我几乎可以肯定他把它弄坏了):D

大家好!

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-03-29
    • 2011-09-18
    • 2014-03-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多