【发布时间】:2018-07-15 01:57:04
【问题描述】:
我正在使用ffmpeg 和subrocess 构建实时多视频流监控。 我目前有以下代码,灵感来自"Async and await with subprocesses" post。
问题是在一段时间后输出停止打印并且进程进入僵尸模式。我猜这个问题与PIPE的过载或死锁有关。需要帮助。
"""Async and await example using subprocesses
Note:
Requires Python 3.6.
"""
import os
import sys
import time
import platform
import asyncio
async def run_command_shell(command):
"""Run command in subprocess (shell)
Note:
This can be used if you wish to execute e.g. "copy"
on Windows, which can only be executed in the shell.
"""
# Create subprocess
process = await asyncio.create_subprocess_shell(
command,
stderr=asyncio.subprocess.PIPE)
# Status
print('Started:', command, '(pid = ' + str(process.pid) + ')')
# Wait for the subprocess to finish
stdout, stderr = await process.communicate()
# Progress
if process.returncode == 0:
print('Done:', command, '(pid = ' + str(process.pid) + ')')
else:
print('Failed:', command, '(pid = ' + str(process.pid) + ')')
# Result
result = stderr.decode().strip()
# Real time print
print(result)
# Return stdout
return result
def make_chunks(l, n):
"""Yield successive n-sized chunks from l.
Note:
Taken from https://stackoverflow.com/a/312464
"""
if sys.version_info.major == 2:
for i in xrange(0, len(l), n):
yield l[i:i + n]
else:
# Assume Python 3
for i in range(0, len(l), n):
yield l[i:i + n]
def run_asyncio_commands(tasks, max_concurrent_tasks=0):
"""Run tasks asynchronously using asyncio and return results
If max_concurrent_tasks are set to 0, no limit is applied.
Note:
By default, Windows uses SelectorEventLoop, which does not support
subprocesses. Therefore ProactorEventLoop is used on Windows.
https://docs.python.org/3/library/asyncio-eventloops.html#windows
"""
all_results = []
if max_concurrent_tasks == 0:
chunks = [tasks]
else:
chunks = make_chunks(l=tasks, n=max_concurrent_tasks)
for tasks_in_chunk in chunks:
if platform.system() == 'Windows':
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
commands = asyncio.gather(*tasks_in_chunk) # Unpack list using *
results = loop.run_until_complete(commands)
all_results += results
loop.close()
return all_results
if __name__ == '__main__':
start = time.time()
if platform.system() == 'Windows':
# Commands to be executed on Windows
commands = [
['hostname']
]
else:
# Commands to be executed on Unix
commands = [
['du', '-sh', '/var/tmp'],
['hostname'],
]
cmds = [["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx -f null -"],
["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx -f null -"],
["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx -f null -"],
["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx -f null -"],
["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx -f null -"],
["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx -f null -"],
["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx -f null -"],
["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx -f null -"],
["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx -f null -"],
["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx -f null -"],
["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx -f null -"],
["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx -f null -"]]
tasks = []
for command in cmds:
tasks.append(run_command_shell(*command))
# # Shell execution example
# tasks = [run_command_shell('copy c:/somefile d:/new_file')]
# # List comprehension example
# tasks = [
# run_command(*command, get_project_path(project))
# for project in accessible_projects(all_projects)
# ]
results = run_asyncio_commands(tasks, max_concurrent_tasks=20) # At most 20 parallel tasks
print('Results:', results)
end = time.time()
rounded_end = ('{0:.4f}'.format(round(end-start,4)))
print('Script ran in about', str(rounded_end), 'seconds')
【问题讨论】:
-
创建一个 minimal 代码示例来演示该问题。如果您可以使用单个
run_command_shell()重现它,请放弃其他呼叫。如果您可以使用虚拟 python 脚本而不是ffmpeg使用未知视频流来重现它,请使用它。 minimal reproducible example¶ 这里不需要多个循环,将循环创建代码移至if __name__ == "__main__"。loop.close()应该是你程序中的最后一件事¶ 你在这里不需要 shell。您可以直接运行命令您一次读取所有输出,直到子进程结束 - 这不是“非阻塞实时读取” -
一次运行
n并行进程的简单方法是线程池。 Python: execute cat subprocess in parallel。在asyncio解决方案中,您可以use aSemaphore()to limit the number of concurrent tasks
标签: python multithreading asynchronous ffmpeg subprocess