【问题标题】:Thread safety in wxpythonwxpython中的线程安全
【发布时间】:2015-04-23 10:50:43
【问题描述】:

我正在使用 wxpython 构建前端 GUI,用于分析和处理音频文件的命令行工具。文件被加载到 GUI 中;然后启动执行分析和调整步骤的线程;最后,这些过程的结果显示在主窗口中。

我一直在努力编写线程安全的代码;但是,某些线程仍然无法完成(应该注意的是,当我第二次手动启动它们时,它们通常会运行到完成)。下面我包含了我的程序的精简版,其中包含 AnalysisThread、AdjustThread 和 MainWindow 的类。主窗口中的按钮绑定到函数“OnAnalyze”和“OnAdjust”,它们创建相应线程类的实例。线程本身通过 wx.CallAfter 和 Publisher 与 GUI 通信。据我了解,这应该允许数据在主进程和线程之间安全地来回传递。如果有人能指出我在下面的代码中出错的地方,我将不胜感激。

如果我无法解决线程安全问题,我的备用计划是以某种方式检测线程的死亡并尝试在后台“恢复”它,而用户不知道存在故障。这看起来合理吗?如果是这样,我们将非常欢迎您就如何实现这一点提出建议。

非常感谢。

#!/usr/bin/python

import wx
import time
from threading import Thread
import os, sys, re, subprocess, shutil
from wx.lib.pubsub import setuparg1
from wx.lib.pubsub import pub as Publisher


#Start a thread that analyzes audio files.
class AnalysisThread(Thread):
    def __init__(self,args):
        Thread.__init__(self)
        self.file = args[0]
        self.index = args[1]
        self.setDaemon(True)
        self.start()

    def run(self):
       proc = subprocess.Popen(['ffmpeg', '-nostats', '-i', self.file, '-filter_complex', 'ebur128=peak=true+sample', '-f', 'null', '-'], bufsize=1, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
       flag = 0
       summary = ""
       while proc.poll() is None:
          line = proc.stdout.readline()
          if line:
             endProcess = re.search(r'Summary', line)
             if endProcess is not None:
                flag = 1
             if flag:
                summary += line

       wx.CallAfter(Publisher.sendMessage, "update", (self.file, summary, self.index))

#Start a thread that adjusts audio files so that they conform to EBU loudness standards.
class AdjustThread(Thread):
    def __init__(self,args):
        Thread.__init__(self)
        self.file = args[0]
        self.index = args[1]
        self.IL = args[2]
        self.TP = args[3]
        self.SP = args[4]
        self.setDaemon(True)
        self.start()

    def run(self):

       proc = subprocess.Popen(['ffmpeg', '-nostats', '-i', adjusted_file, '-filter_complex', 'ebur128=peak=true+sample', '-f', 'null', '-'], bufsize=1, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
       flag = 0
       summary = ""
       while proc.poll() is None:
          line = proc.stdout.readline()
          if line:
             endProcess = re.search(r'Summary', line)
             if endProcess is not None:
                flag = 1
             if flag:
                summary += line

       wx.CallAfter(Publisher.sendMessage, "update", (self.file, summary, self.index))


class MainWindow(wx.Frame):

    fileList = collections.OrderedDict()

    def __init__(self, parent, id, title):
        wx.Frame.__init__(self, parent, id, title, size=(900, 400))

        Publisher.subscribe(self.UpdateDisplay, "update")

        #Add "analyze" and "Adjust" buttons to the main frame.
        panel = wx.Panel(self, -1)
        vbox = wx.BoxSizer(wx.VERTICAL)
        self.ana = wx.Button(panel, -1, 'Analyze', size=(100, -1))
        self.adj = wx.Button(panel, -1, 'Adjust', size=(100, -1))
        self.Bind(wx.EVT_BUTTON, self.OnAnalyze, id=self.ana.GetId())
        self.Bind(wx.EVT_BUTTON, self.OnAdjust, id=self.adj.GetId())
        vbox.Add(self.ana, 0, wx.ALL, 10)
        vbox.Add(self.adj, 0, wx.ALL, 10)
        vbox.Add(self.list, 1, wx.EXPAND | wx.TOP, 3)
        vbox.Add((-1, 10))
        panel.SetSizer(hbox)

        self.Centre()
        self.Show(True)

    #This function gets called when "Analyze" is pressed.
    def OnAnalyze(self, event):

        for (file,index) in toAnalyze:

           #Add a progess bar
           item = self.list.GetItem(index,2)
           gauge = item.GetWindow()
           gauge.Pulse()

           #Launch the analysis thread
           AnalysisThread(args=(file,index,))

    #This function gets called when "Adjust" is pressed.
    def OnAdjust(self, event):

        for (file,index) in toAdjust:
           gauge = wx.Gauge(self.list,-1,range=50,size=(width,15),style=wx.GA_HORIZONTAL | wx.GA_SMOOTH)
           gauge.Pulse() #shouldn't start this right away...
           item.SetWindow(gauge, wx.ALIGN_CENTRE)
           self.list.SetItem(item) 

           #Launch the adjust thread
           AdjustThread(args=(file,index,intloud,truepeak,samplepeak))

    #This function is invoked by the Publisher.
    def UpdateDisplay(self, msg):
       t = msg.data 
       file = t[0]
       summary = t[1]
       i = t[2]
       self.ProcessSummary(file, summary, i)  
       item = self.list.GetItem(i,2)
       gauge = item.GetWindow()
       gauge.SetValue(50)
       self.fileList[file][1] = True

    #Display information from the threads in the main frame.
    def ProcessSummary(self, file, summary, i):
      loudnessRange = re.search(r'LRA:\s(.+?) LU', summary)
      if loudnessRange is not None:
         LRA = loudnessRange.group(1)
      else:
         LRA = "n/a"
      self.list.SetStringItem(i,7,LRA)         
      self.fileList[file][6] = LRA

      intloud = re.search(r'I:\s(.+?) LUFS', summary)
      if intloud is not None:
         IL = intloud.group(1)
      else:
         IL = "n/a"
      self.list.SetStringItem(i,4,IL)
      self.fileList[file][3] = IL

      truePeak = re.search(r'True peak:\s+Peak:\s(.+?) dBFS', summary)
      if truePeak is not None:
          TP = truePeak.group(1)
      else:
          TP = "n/a"
      self.list.SetStringItem(i,5,TP)
      self.fileList[file][4] = TP

      samplePeak = re.search(r'Sample peak:\s+Peak:\s(.+?) dBFS', summary)
      if samplePeak is not None:
          SP = samplePeak.group(1)
      else:
          SP = "n/a"
      self.list.SetStringItem(i,6,SP)
      self.fileList[file][5] = SP


app = wx.App()
MainWindow(None, -1, 'Leveler')
app.MainLoop()

【问题讨论】:

  • 你看过这个答案了吗? *.com/a/18495032/566035 我个人也使用wxpython.org/docs/api/wx.lib.delayedresult-module.html 做了类似的事情。延迟结果也不错。
  • 嗨 otterb,我确实看到了堆栈溢出帖子,并且实际上尝试将其中一些解决方案集成到我自己的代码中......但非常感谢您提供指向延迟结果模块的链接!我一定会检查出来,让你知道它是怎么回事。只是为了我自己的安心:我是否正确地假设线程的间歇性和任意性故障 不是 wxPython GUI 的一个不可避免的属性?我正在构建一个将交付给相当多用户的产品,因此稳健性至关重要。谢谢!
  • 至少根据我自己的经验,delayedresult 效果很好并且很容易实现(对我来说)。我相信 python 和 wxpython(我使用的是 2.8 而不是 3)都非常成熟,所以它应该是稳定的。 wxpython 3 相对较新(我猜还是测试版),我没有太多经验。
  • 再次感谢!
  • 您好 otterb:您能提供延迟结果模块的示例用法吗?我已经得到了类似“startWorker(self.OnAnalyze,self.AnalysisWorker,wargs =(file,))”的东西,其中self.OnAnalyze是当前函数,self.AnalysisWorker是调用线程的函数,给定“文件”参数。我已将 self.AnalysisWorker 简化为单个打印语句,并且它似乎在无限循环。我是否指定了错误的消费者?线程实际上在哪里被调用,数据以什么形式返回到 GUI?谢谢!

标签: python thread-safety wxpython python-multithreading


【解决方案1】:

这是从 2.8 系列的 wxPython 演示中获取的延迟结果示例代码。我修改了这段代码以满足我的需要。

顺便说一下,wxPython demo 是学习 wx 的宝贵资源。我强烈推荐。通过演示,wxPython 学习起来很有趣!

似乎第 3 版现已正式发布,因此稳定。但是如果你使用的是旧版本,你可以在这里找到演示:http://sourceforge.net/projects/wxpython/files/wxPython/

import wx
import wx.lib.delayedresult as delayedresult


class FrameSimpleDelayedBase(wx.Frame):
    def __init__(self, *args, **kwds):
        wx.Frame.__init__(self, *args, **kwds)
        pnl = wx.Panel(self)
        self.checkboxUseDelayed = wx.CheckBox(pnl, -1, "Using delayedresult")
        self.buttonGet = wx.Button(pnl, -1, "Get")
        self.buttonAbort = wx.Button(pnl, -1, "Abort")
        self.slider = wx.Slider(pnl, -1, 0, 0, 10, size=(100,-1),
                                style=wx.SL_HORIZONTAL|wx.SL_AUTOTICKS)
        self.textCtrlResult = wx.TextCtrl(pnl, -1, "", style=wx.TE_READONLY)

        self.checkboxUseDelayed.SetValue(1)
        self.checkboxUseDelayed.Enable(False)
        self.buttonAbort.Enable(False)

        vsizer = wx.BoxSizer(wx.VERTICAL)
        hsizer = wx.BoxSizer(wx.HORIZONTAL)
        vsizer.Add(self.checkboxUseDelayed, 0, wx.ALL, 10)
        hsizer.Add(self.buttonGet, 0, wx.ALL, 5)
        hsizer.Add(self.buttonAbort, 0, wx.ALL, 5)
        hsizer.Add(self.slider, 0, wx.ALL, 5)
        hsizer.Add(self.textCtrlResult, 0, wx.ALL, 5)
        vsizer.Add(hsizer, 0, wx.ALL, 5)
        pnl.SetSizer(vsizer)
        vsizer.SetSizeHints(self)

        self.Bind(wx.EVT_BUTTON, self.handleGet, self.buttonGet)
        self.Bind(wx.EVT_BUTTON, self.handleAbort, self.buttonAbort)




class FrameSimpleDelayed(FrameSimpleDelayedBase):
    """This demos simplistic use of delayedresult module."""

    def __init__(self, *args, **kwargs):
        FrameSimpleDelayedBase.__init__(self, *args, **kwargs)
        self.jobID = 0
        self.abortEvent = delayedresult.AbortEvent()
        self.Bind(wx.EVT_CLOSE, self.handleClose)

    def setLog(self, log):
        self.log = log

    def handleClose(self, event):
        """Only needed because in demo, closing the window does not kill the 
        app, so worker thread continues and sends result to dead frame; normally
        your app would exit so this would not happen."""
        if self.buttonAbort.IsEnabled():
            self.log( "Exiting: Aborting job %s" % self.jobID )
            self.abortEvent.set()
        self.Destroy()

    def handleGet(self, event): 
        """Compute result in separate thread, doesn't affect GUI response."""
        self.buttonGet.Enable(False)
        self.buttonAbort.Enable(True)
        self.abortEvent.clear()
        self.jobID += 1

        self.log( "Starting job %s in producer thread: GUI remains responsive"
                  % self.jobID )
        delayedresult.startWorker(self._resultConsumer, self._resultProducer, 
                                  wargs=(self.jobID,self.abortEvent), jobID=self.jobID)


    def _resultProducer(self, jobID, abortEvent):
        """Pretend to be a complex worker function or something that takes 
        long time to run due to network access etc. GUI will freeze if this 
        method is not called in separate thread."""
        import time
        count = 0
        while not abortEvent() and count < 50:
            time.sleep(0.1)
            count += 1
        return jobID


    def handleAbort(self, event): 
        """Abort the result computation."""
        self.log( "Aborting result for job %s" % self.jobID )
        self.buttonGet.Enable(True)
        self.buttonAbort.Enable(False)
        self.abortEvent.set()


    def _resultConsumer(self, delayedResult):
        jobID = delayedResult.getJobID()
        assert jobID == self.jobID
        try:
            result = delayedResult.get()
        except Exception, exc:
            self.log( "Result for job %s raised exception: %s" % (jobID, exc) )
            return

        # output result
        self.log( "Got result for job %s: %s" % (jobID, result) )
        self.textCtrlResult.SetValue(str(result))

        # get ready for next job:
        self.buttonGet.Enable(True)
        self.buttonAbort.Enable(False)


class FrameSimpleDirect(FrameSimpleDelayedBase):
    """This does not use delayedresult so the GUI will freeze while
    the GET is taking place."""

    def __init__(self, *args, **kwargs):
        self.jobID = 1
        FrameSimpleDelayedBase.__init__(self, *args, **kwargs)
        self.checkboxUseDelayed.SetValue(False)

    def setLog(self, log):
        self.log = log

    def handleGet(self, event): 
        """Use delayedresult, this will compute result in separate
        thread, and will affect GUI response because a thread is not
        used."""
        self.buttonGet.Enable(False)
        self.buttonAbort.Enable(True)

        self.log( "Doing job %s without delayedresult (same as GUI thread): GUI hangs (for a while)" % self.jobID )
        result = self._resultProducer(self.jobID)
        self._resultConsumer( result )

    def _resultProducer(self, jobID):
        """Pretend to be a complex worker function or something that takes 
        long time to run due to network access etc. GUI will freeze if this 
        method is not called in separate thread."""
        import time
        time.sleep(5)
        return jobID

    def handleAbort(self, event):
        """can never be called"""
        pass

    def _resultConsumer(self, result):
        # output result
        self.log( "Got result for job %s: %s" % (self.jobID, result) )
        self.textCtrlResult.SetValue(str(result))

        # get ready for next job:
        self.buttonGet.Enable(True)
        self.buttonAbort.Enable(False)
        self.jobID += 1


#---------------------------------------------------------------------------
#---------------------------------------------------------------------------

class TestPanel(wx.Panel):
    def __init__(self, parent, log):
        self.log = log
        wx.Panel.__init__(self, parent, -1)

        vsizer = wx.BoxSizer(wx.VERTICAL)
        b = wx.Button(self, -1, "Long-running function in separate thread")
        vsizer.Add(b, 0, wx.ALL, 5)
        self.Bind(wx.EVT_BUTTON, self.OnButton1, b)

        b = wx.Button(self, -1, "Long-running function in GUI thread")
        vsizer.Add(b, 0, wx.ALL, 5)
        self.Bind(wx.EVT_BUTTON, self.OnButton2, b)

        bdr = wx.BoxSizer()
        bdr.Add(vsizer, 0, wx.ALL, 50)
        self.SetSizer(bdr)
        self.Layout()

    def OnButton1(self, evt):
        frame = FrameSimpleDelayed(self, title="Long-running function in separate thread")
        frame.setLog(self.log.WriteText)
        frame.Show()

    def OnButton2(self, evt):
        frame = FrameSimpleDirect(self, title="Long-running function in GUI thread")
        frame.setLog(self.log.WriteText)
        frame.Show()

【讨论】:

  • 非常感谢 otterb - 这解决了我的问题! :) 非常感谢您的帮助。
  • 另外,我一定会研究 wxPython 演示;感谢您的提示!
最近更新 更多