【发布时间】:2020-11-14 16:39:29
【问题描述】:
首先我将解释我的代码是如何工作的。 我有 3 个相互交互的模块:2 个模块通过套接字连接到单个模块并发送 UDP 帧。单个模块接收 UDP 帧,将它们保存到队列中,然后另一个函数将队列作为输入并进行一些处理。
我正在运行在不同终端中发送 UDP 帧的模块。我想运行接收 UDP 帧的函数和对不同线程中保存的帧进行处理的函数。为此,我使用了线程和队列包。但是我没有设法将所有线程一起运行。它总是卡在第二个线程中,永远不会到达最后一个线程。
这是我的代码:
send_1.py:
import socket
import pickle
import time
def send_frame():
UDP_IP = "127.0.0.1"
UDP_PORT = 5005
MESSAGE = {'x': 0.20, 'y': 0.2, 'z': 0.2}
MESSAGE = pickle.dumps(MESSAGE)
print(type(MESSAGE))
print("UDP target IP:", UDP_IP)
print("UDP target port:", UDP_PORT)
print("message:", MESSAGE)
sock = socket.socket(socket.AF_INET, # Internet
socket.SOCK_DGRAM) # UDP
while True:
sock.sendto(MESSAGE, (UDP_IP, UDP_PORT))
time.sleep(5)
send_frame()
send_2.py:
import socket
import pickle
import time
def send_frame():
UDP_IP = "127.0.0.1"
UDP_PORT = 5006
# MESSAGE = b"Hello, World!"
MESSAGE = {'x': 2.20, 'y': 2.2, 'z': 2.2}
MESSAGE = pickle.dumps(MESSAGE)
print(type(MESSAGE))
print("UDP target IP:", UDP_IP)
print("UDP target port:", UDP_PORT)
print("message:", MESSAGE)
sock = socket.socket(socket.AF_INET, # Internet
socket.SOCK_DGRAM) # UDP
while True:
sock.sendto(MESSAGE, (UDP_IP, UDP_PORT))
time.sleep(5)
send_frame()
这是接收帧、将它们保存到队列然后处理它们的代码。
receive.py:
import threading
import queue
import socket
import pickle
import time
class ReceiveData1:
def __init__(self):
pass
def receive_frame(self, q_1):
UDP_IP = "127.0.0.1"
UDP_PORT = 5005
sock = socket.socket(socket.AF_INET, # Internet
socket.SOCK_DGRAM) # UDP
sock.bind((UDP_IP, UDP_PORT))
while True:
data, addr = sock.recvfrom(1024) # buffer size is 1024 bytes
data_1 = pickle.loads(data)
print('data_1:', data_1)
ts_1 = time.time()
frame_1 = {'data': data_1, 'timestamp': ts_1}
q_1.put(frame_1)
class ReceiveData2:
def __init__(self):
pass
def receive_frame(self, q_2):
UDP_IP = "127.0.0.1"
UDP_PORT = 5006
sock = socket.socket(socket.AF_INET, # Internet
socket.SOCK_DGRAM) # UDP
sock.bind((UDP_IP, UDP_PORT))
while True:
data, addr = sock.recvfrom(1024) # buffer size is 1024 bytes
data_2 = pickle.loads(data)
print('data_2:', data_2)
ts_2 = time.time()
frame_2 = {'data': data_2, 'timestamp': ts_2}
q_2.put(frame_2)
class MatchFrames:
def __init__(self, delta_x, delta_y):
self.delta_x = delta_x
self.delta_y = delta_y
def get_decision(self, queue_1, queue_2):
print('queue_1:', queue_1)
print('queue_2:', queue_2)
frame_1 = queue_1.get()
frame_2 = queue_2.get()
data_1 = frame_1['data']
data_1_ts = frame_1['timestamp']
data_2 = frame_2['data']
data_2_ts = frame_2['timestamp']
decision = 'Unknown'
while time.time() < data_1_ts + 3 and time.time() < data_2_ts + 3:
if (data_2['x'] - self.delta_x <= data_1['x'] <= data_2['x'] + self.delta_x and
data_2['y'] - self.delta_y <= data_1['y'] <= data_2['y'] + self.delta_y):
decision = 'Correct'
break
else:
decision = 'Wrong'
break
print(decision)
if __name__ == '__main__':
threads = []
q_1 = queue.Queue()
rec_1 = ReceiveData1()
q_2 = queue.Queue()
rec_2 = ReceiveData2()
decide = MatchFrames(0.5, 0.5)
t1 = threading.Thread(target=rec_1.receive_frame, args=(q_1,))
t1.daemon = True
threads.append(t1)
t1.start()
t2 = threading.Thread(target=rec_2.receive_frame, args=(q_2,))
t2.daemon = True
threads.append(t2)
t2.start()
t3 = threading.Thread(target=decide.get_decision, args=(q_1, q_2,))
t3.daemon = True
threads.append(t3)
t3.start()
for t in threads:
t.join()
q_1.join()
q_2.join()
据我了解,这可能是因为join():通过运行thread.join(),下一个线程将等到前一个线程完成才能运行,这永远不会发生,因为它卡在while循环中。
有什么建议可以让三个线程一起运行并继续接收 UDP 帧吗?
【问题讨论】:
-
如果你想让线程永远运行,你为什么要加入它?
-
@MartinJames 我加入线程,因为如果我不这样做,它们会运行一次然后停止。
-
在线程代码周围加上一个“while(1){}”。
-
如果我没听错的话,不可能在 while 循环中编写线程代码,因为这将在每次迭代中不断创建和启动线程,这在 Python 中不起作用。跨度>
-
Nah.. 将循环放入线程代码中。
标签: python multithreading parallel-processing message-queue