【问题标题】:How to implement a dynamic amount of concurrent threads?如何实现动态数量的并发线程?
【发布时间】:2017-03-15 14:04:11
【问题描述】:

我正在启动并发线程做一些事情:

concurrent = 10
q = Queue(concurrent * 2)
for j in range(concurrent):
    t = threading.Thread(target=doWork)
    t.daemon = True
    t.start()
try:
    # process each line and assign it to an available thread
    for line in call_file:
        q.put(line)
    q.join()
except KeyboardInterrupt:
    sys.exit(1)

同时我有一个明显的线程计数时间:

def printit():
    threading.Timer(1.0, printit).start()
    print current_status

printit()

我想增加(或减少)主进程的并发线程数量,比如说每分钟。我可以在时间线程中做一个时间计数器,让它每分钟做一次,但是如何改变主进程中的并发线程数?

是否有可能(如果是的话如何)这样做?

【问题讨论】:

    标签: python multithreading python-multithreading


    【解决方案1】:

    这是我的工人:

    def UpdateProcesses(start,processnumber,CachesThatRequireCalculating,CachesThatAreBeingCalculated,CacheDict,CacheLock,IdleLock,FileDictionary,MetaDataDict,CacheIndexDict):
    NewPool()
    while start[processnumber]:
        IdleLock.wait()
        while len(CachesThatRequireCalculating)>0 and start[processnumber] == True:
            CacheLock.acquire()
            try:
                cacheCode = CachesThatRequireCalculating[0] # The list can be empty if an other process takes the last item during the CacheLock 
                CachesThatRequireCalculating.remove(cacheCode)
                print cacheCode,"starts processing by",processnumber,"process"
            except:
                CacheLock.release()
            else:
                CacheLock.release()
                CachesThatAreBeingCalculated.append(cacheCode[:3])
                Array,b,f = TIPP.LoadArray(FileDictionary[cacheCode[:2]])#opens the dask array
                Array = ((Array[:,:,CacheIndexDict[cacheCode[:2]][cacheCode[2]]:CacheIndexDict[cacheCode[:2]][cacheCode[2]+1]].compute()/2.**(MetaDataDict[cacheCode[:2]]["Bit Depth"])*255.).astype(np.uint16)).transpose([1,0,2]) #slices and calculates the array
                f.close() #close the file
                if CachesThatAreBeingCalculated.count(cacheCode[:3]) != 0: #if not, this cache is not needed annymore (the cacheCode is removed bij wavelengthchange)
                    CachesThatAreBeingCalculated.remove(cacheCode[:3])
                    try: #If the first time the object if not aivalable try a second time
                        CacheDict[cacheCode[:3]] = Array
                    except:
                        CacheDict[cacheCode[:3]] = Array
                    print cacheCode,"done processing by",processnumber,"process"
        if start[processnumber]:
            IdleLock.clear()
    

    这就是我启动它们的方式:

        self.ProcessLst = [] #list with all the processes who calculate the caches
        for processnumber in range(min(NumberOfMaxProcess,self.processes)):
            self.ProcessTerminateLst.append(True)
        for processnumber in range(min(NumberOfMaxProcess,self.processes)):
            self.ProcessLst.append(process.Process(target=Proc.UpdateProcesses,args=(self.ProcessTerminateLst,processnumber,self.CachesThatRequireCalculating,self.CachesThatAreBeingCalculated,self.CacheDict,self.CacheLock,self.IdleLock,self.FileDictionary,self.MetaDataDict,self.CacheIndexDict,)))
            self.ProcessLst[-1].daemon = True
            self.ProcessLst[-1].start()
    

    我这样关闭它们:

        for i in range(len(self.ProcessLst)): #For both while loops in the processes self.ProcessTerminateLst[i] must be True. So or the process is now ready to be terminad or is still in idle mode.
            self.ProcessTerminateLst[i] = False
    
        self.IdleLock.set() #Makes sure no process is in Idle and all are ready to be terminated
    

    【讨论】:

      【解决方案2】:

      我会使用游泳池。一个池具有同时使用的最大线程数,但您可以应用 inf 个作业。他们留在等待列表中,直到线程可用。我认为您无法更改池中当前进程的数量。

      【讨论】:

      • 我可以在我的工作中使用 sleep 并减少或增加该值以获得更多或更少的工作,但我想要的是更改并发线程的数量(因为每个线程都是并发连接,我正在为服务器编写负载模拟器,我需要模拟缓存的预热),所以这些技巧在我的情况下不起作用
      • 我有一个类似的情况,我使用管理器来管理进程之间的 lst 并在我的线程中使用 while 循环。所以我只是启动了 x 个线程,如果我想关闭它们,我在 while 循环中将变量设为 True,然后线程退出。此变量位于管理器列表中,因此您可以在主线程中更改它。我可以向您展示我的代码,让您了解我是如何解决它的,但我是这个论坛的新手,不知道如何解决。
      • hmmm...看起来你只能减少,不能增加...我使用全局变量,所以我可以在线程之间共享状态^^
      • increase 只是启动一个新线程并将一个布尔值附加到管理器中的 stoplst
      • 我的代码要长得多,但我希望这能让您了解您可以做什么。如果您有更多问题,我很乐意回答。
      猜你喜欢
      • 1970-01-01
      • 2012-05-19
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2010-09-12
      • 1970-01-01
      相关资源
      最近更新 更多