【问题标题】:Can I call a thread recurrently from within a thread?我可以从一个线程中反复调用一个线程吗?
【发布时间】:2021-03-04 01:35:08
【问题描述】:

我正在尝试使用 queue 将样本从线程 A(“采集”)传输到线程 B(“P300”),但我无法读取线程 B 中的任何数据,尽管样本 在线程 A 中分配。从我的输出来看,我认为我的线程 B 在我的线程 A 开始放入数据之前正在匆忙测试。

在下面查看我的代码结构的近似值:

import threading
import queue
from queue import Empty
import numpy as np
import warnings
warnings.filterwarnings("error")

class AcqThread(threading.Thread):
    def __init__(self, dataOutQ1, dataOutQ2, stopQ1, stopQ2, saveQ):
        threading.Thread.__init__(self)
        self.stopQ2 = stopQ2
        self.stopQ1 = stopQ1
        self.dataOutQ2 = dataOutQ2
        self.dataOutQ1 = dataOutQ1
        self.saveQ = saveQ

    def run(self):
        Acquisition(inlet, self.dataOutQ1, self.dataOutQ2, self.stopQ1, self.stopQ2, self.saveQ)

class P300Thread(threading.Thread):
    def __init__(self, dataInQ, featureQ, stopQ):
        threading.Thread.__init__(self)
        self.dataInQ = dataInQ
        self.featureQ = featureQ
        self.stopQ = stopQ

    def run(self):
        P300fun(self.dataInQ, self.featureQ, self.stopQ)

threadLock = threading.Lock()
SaveQ = queue.Queue()
DataOutQ1 = queue.Queue()
DataOutQ2 = queue.Queue()
StopQ1 = queue.Queue()
StopQ2 = queue.Queue()
FeatQ1 = queue.Queue()
StopQ1.put(0)
StopQ2.put(0)
#
AcqTh = AcqThread(DataOutQ1, DataOutQ2, StopQ1, StopQ2, SaveQ)
P300Th = P300Thread(DataOutQ1, FeatQ1, StopQ1)

def Acquisition(inlet, dataOutQ1, dataOutQ2, stopQ1, stopQ2, saveQ):
    i = 0
    print('Starting...')
    while i<1250: #i is the number of samples
        sample, timestamp = inlet.pull_sample() #samples coming in @ 250Hz
        ##Normalization, filtering##
        threadLock.acquire()
        dataOutQ1.put([filtsamples[:,-250:], rawtimestamps[-250:]]) #I only need the last 250 samples
        threadLock.release()
        i += 1

def P300fun(dataInQ, featureQ, stopQ):
    p300sample = []
    p300timestamp = []
    print(f"Is DataInQ size true? {DataOutQ1.qsize()}")
    print("Is dataInQ emtpy?", DataOutQ1.empty())
    while dataInQ.qsize(): #or while not dataqueue.empty():
        try:
            print("DataInQ has data")
            ss, ts = dataInQ.get(0) 
            print('<>P300\n>>Samples [', ss, ']\nTimestamp [', ts, ']')
        except Empty:
            return
    print('Thread Finished')

if __name__ == '__main__':
    print('Looking for an EEG stream...')
    streams = resolve_stream('type', 'EEG')
    inlet = StreamInlet(streams[0])
    print('Connected!\n')

    AcqTh.start()
    P300Th.start()

    AcqTh.join()
    P300Th.join()

    print("\n\n>>>DONE<<<\n\n")

然后输出:

Looking for an EEG stream...
Connected!

Is DataInQ size true? 0
Starting...
Is dataInQ emtpy? True
Thread Finished

>>>DONE<<<

在我的研究中,question 1 似乎出现了类似的问题,但似乎问题出在图像处理部分(他们使用了multiprocessing 包)。 Question 2 似乎有一个并发问题,这可能是我的问题,但我不知道如何将它翻译成我的问题,如果我错了,请告诉我)。 Question 3 只是参数顺序有问题,所以我认为这里不适用。

我应该怎么做?我应该从线程 A 中反复调用线程 B 吗?我需要线程 B 上的循环或延迟吗? .join() 部分可能有问题吗?我需要在不久的将来添加更多线程,所以最好先弄清楚如何只使用两个线程......

感谢所有帮助!

【问题讨论】:

    标签: python multithreading concurrency queue


    【解决方案1】:

    作为一个菜鸟可能会很棘手......所以我会回答我自己的问题以帮助其他可能遇到这个问题的初学者。

    好吧,首先要做的事情是:不,不可能从一个线程内循环调用一个线程,因为每个线程只能被调用一次。

    但是有一种方法可以防止线程结束,让它们等待允许它们继续的触发器。经过更多研究,我遇到了this question,它向我展示了一种为线程创建事件的方法。可以在here 找到文档。而且很简单:事件对象的行为类似于标志,可以是set()(表示True)或clear()(表示False,这是原始值)。要测试一个事件,可以使用is_set() 方法来解决布尔问题,或者使用wait() 方法来代替计时器。就我而言,它为我节省了一些我将要使用的队列:

    import threading
    import queue
    from queue import Empty
    import numpy as np
    
    
    class AcqThread(threading.Thread):
        def __init__(self, dataOutQ1, dataOutQ2, saveQ):
            threading.Thread.__init__(self)
            self.dataOutQ2 = dataOutQ2
            self.dataOutQ1 = dataOutQ1
            self.saveQ = saveQ
    
        def run(self):
            Acquisition(inlet, self.dataOutQ1, self.dataOutQ2, self.saveQ)
    
    class P300Thread(threading.Thread):
        def __init__(self, dataInQ, featureQ):
            threading.Thread.__init__(self)
            self.dataInQ = dataInQ
            self.featureQ = featureQ
    
        def run(self):
            P300fun(self.dataInQ, self.featureQ)
    
    threadLock = threading.Lock()
    SaveQ = queue.Queue()
    DataOutQ1 = queue.Queue()
    DataOutQ2 = queue.Queue()
    FeatQ1 = queue.Queue()
    FeatQ2 = queue.Queue()
    
    #NEW:: initializes Events
    E = threading.Event()
    EP300 = threading.Event()
    #
    AcqTh = AcqThread(DataOutQ1, DataOutQ2, SaveQ)
    P300Th = P300Thread(DataOutQ1, FeatQ1)
    

    它允许我“周期性地”“调用”线程 B,因为它在活动时保持我的第一个(因为事件 E)并且仅在设置事件 EP300 时才进入处理部分。然后,EP300在该过程完成后被清除:

    def Acquisition(inlet, dataOutQ1, dataOutQ2 saveQ):
        i = 0
        print('Starting...')
        while i<1250:
            sample, timestamp = inlet.pull_sample()
            ##Normalization, filtering##
            if _condition_:
                threadLock.acquire()
                dataOutQ1.put([filtsamples[:,-250:], rawtimestamps[-250:]])
                threadLock.release()
                EP300.set() #NEW:: allows the P300 function to collect data from queue
            i += 1
        E.set() #NEW:: flaggs end data collection
    
    def P300fun(dataInQ, featureQ):
        p300sample = []
        p300timestamp = []
        while not E.is_set(): #NEW:: loop until collection is ended
            if EP300.is_set(): #NEW:: activated when Event is triggered
                while dataInQ.qsize():
                    try:
                        print("DataInQ has data")
                        ss, ts = dataInQ.get(0) 
                        print('<>P300\n>>Samples [', ss, ']\nTimestamp [', ts, ']')
                    except Empty:
                        return
            if not E.is_set(): #NEW:: Event is cleared in case data collection is not over, waiting for a new set()
                EP300.clear()
        print('Thread Finished')
    
    if __name__ == '__main__':
        print('Looking for an EEG stream...')
        streams = resolve_stream('type', 'EEG')
        inlet = StreamInlet(streams[0])
        print('Connected!\n')
    
        AcqTh.start()
        P300Th.start()
    
        AcqTh.join()
        P300Th.join()
    
        print("\n\n>>>DONE<<<\n\n")
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2021-11-20
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2012-07-14
      • 2015-09-11
      • 1970-01-01
      相关资源
      最近更新 更多