【问题标题】:Python parallelization of tasks (event loop?)任务的 Python 并行化(事件循环?)
【发布时间】:2013-04-09 07:42:50
【问题描述】:

现状

我正在写 a small Python program 以在您的终端中播放来自 8tracks 的播放列表。

它由三部分组成,一个使用stdlib的cmd模块的client.py,一个使用python-requests访问API的api.py模块和一个在从模式下创建mplayer子进程的player.py模块和向它发送命令。

问题

到目前为止,这有效,问题是除了轮询子进程的标准输出之外,我没有其他方法可以判断歌曲是否已在 mplayer 中播放完毕。这意味着我必须观看该过程,以便在歌曲播放完毕后请求并开始播放列表中的下一首歌曲。

问题是等待子进程会阻塞cmd 模块的主循环。不过,我也不能简单地在单独的线程或进程中运行它,因为我必须共享对子进程的 stdout 的引用,而像这样的引用不能在进程之间共享。

可能的方法

我为此想到了不同的解决方案。我可以将player.py 放在一个单独的进程中并通过队列发送文本命令,但这会使事情变得过于复杂。我可以创建一个 Twisted 应用程序,但 Twisted 相当大,我不知道从哪里开始。另外,我不希望在我的项目中有这样的依赖。

第三种解决方案是使用 Gevent。问题是我如何让它与cmd 模块一起工作。据我了解 Gevent,我将不得不在我“等待”某事的每个地方屈服。在这种情况下,这将是在 HTTP 请求期间、在 cmd.cmdloop() 期间等待以及在子进程轮询之间的暂停期间。但是如何让cmd 模块屈服?某种子类或猴子补丁?

【问题讨论】:

  • 为什么你更喜欢“在你的项目中没有这样的依赖”?到底什么是“这种依赖”?依赖 Gevent 或 Python 与依赖 Twisted 有何不同?
  • 只是我认为 Twisted 是一个“巨大的项目”,我不太了解,仅此而已。但我并不完全反对它,只要它在我的情况下工作而不重写我所有的代码:)
  • 为什么不能只有一个命令输出尾线程?在进程之间共享 fds 很棘手,但在线程之间这应该是可行的。
  • @fmoo 嗯,到目前为止我只试过multiprocessing.dummy(它使用线程),它不能做这样的事情。但也许我应该尝试直接使用Threading...
  • 您能否发布一些(简化的?)player.py 的代码?过去,我完全将子进程尾部拆分为自己的线程,没有问题。 multiprocessing.dummy 应该也能正常工作。

标签: python parallel-processing subprocess twisted gevent


【解决方案1】:

从您的代码来看,您可以在与 mplayer 通信时使用 pexpect。 Pexpect(或期望)非常适合来回 stdio 通信。

【讨论】:

  • Pexpect 看起来不错,问题是我如何异步运行它,同时仍然有可能从外部进程/线程写入进程的标准输入(我想观察进程结束一首歌,但我也希望能够暂停或跳过一首曲目)。我必须以某种方式共享标准输入引用,这可能会在使用 pexpect 时导致同步问题。
  • Pexpect 生成一个新进程并使其可用于您的 python 脚本。您是否希望第三方也可以访问它?
  • 不,但我需要能够暂停曲目,同时观看标准输出以获取类似“播放完毕”的消息。单独线程/进程的唯一替代方案可能是协程。到目前为止,我还没有弄清楚如何在线程/进程之间共享对进程标准输出的引用。
【解决方案2】:

看来我想通了。我之前使用线程的所有尝试都是使用 multiprocessing.dummy 模块,它包装了 threading 模块,但在参数传递时表现有点不同 - 你不能传递引用。

直接使用threading,好像可以。我每次加载新曲目时都会启动一个后台线程来做到这一点。当曲目播放完毕后,我会向客户端发送SIGUSR1 信号,客户端通过加载和播放新歌曲来处理它。

player.py

import os
import signal
import threading
from pipes import quote

class MPlayer(object):

    def __init__(self):
        self.process = Process(['mplayer',
            '-slave', '-idle',
            '-really-quiet', '-msglevel', 'global=6:cplayer=4', '-msgmodule',
            '-input', 'nodefault-bindings',
            '-cache', '1024',
        ], bufsize=1)
        self.write_lock = threading.Lock()

    # (...)

    def load(self, path):

        with self.write_lock:
            self.p.write('loadfile {}\n'.format(quote(path)))

        def wait_for_finish(process):
            # HERE: poll process for track ending with process.read()
            os.kill(os.getpid(), signal.SIGUSR1)

        t = threading.Thread(target=wait_for_finish, args=(self.process,))
        t.daemon = True
        t.start()

client.py

import cmd
import signal
from player import MPlayer

class PlayCommand(cmd.Cmd, object):

    def __init__(self, *args, **kwargs):

        # (...)

        self.p = MPlayer()
        signal.signal(signal.SIGUSR1, self._song_end_handler)

    def _song_end_handler(self, signum, frame):
        print('SIGUSR1!!!!!!!!!!111!1')
        # HERE: Fetch new track URL
        self.p.load()

不过,如果有人认为他/她使用协程或事件找到了更好的解决方案,请随时用您的解决方案来回答。

【讨论】:

  • 代码中需要注意的一点:无论何时调用self.p.read(),都会消耗/删除自上次调用以来写入标准输出的所有数据。当前的实现似乎也过于复杂,但我还没有深入了解同步/读取问题的来源
  • @fmoo 是的,我知道,谢谢。目前这不是问题,但我正在尽可能地清理东西。我刚刚提交 + 推送了一个工作版本,但是如果您知道如何进一步简化事情,那当然会受到欢迎。
  • 对于否决这个答案的人,为什么?通过解释和代码示例,它是解决问题的有效解决方案。
猜你喜欢
  • 2020-09-14
  • 2018-04-10
  • 2011-10-25
  • 1970-01-01
  • 2023-03-04
  • 1970-01-01
  • 1970-01-01
  • 2017-03-17
相关资源
最近更新 更多