【发布时间】:2021-10-20 18:10:01
【问题描述】:
我一直在尝试编写一个模拟广播网络流的 python 程序,但我不太确定如何正确地做到这一点。为此,我想让程序在没有连接客户端的情况下持续“播放”音乐,因此它会模拟一个“实时”收音机,您可以在其中连接并收听正在播放的任何内容。
我现在拥有的是与 TCP 基本套接字编程的服务器/客户端关系,服务器端有一个应该继续阅读音乐的生产者线程,以及应该将音频帧发送到客户端的按需消费者线程,用 PyAudio 播放。问题可能出在线程之间共享数据的方式上。
首先我尝试使用单个队列,但是当客户端从队列中读取数据时,这些数据会被删除,如果我连接了多个客户端,那将制作音乐跳过一些帧。
然后我尝试创建一个固定数量 (10) 的 Queue 对象,这些对象将用于每个客户端,生产者线程为每个队列提供数据,但每个客户端都会创建一个消费者线程它自己的,并且只能从使用控制变量“分配”给它的队列中读取。这里的问题是:如果有任何队列没有被消费(例如,如果我只有一个客户端连接),Queue.put() 方法将阻塞,因为这些队列已满。如何保持所有队列“运行”和同步,即使它们没有被使用?
这就是我现在的位置,感谢任何建议。我还不是经验丰富的程序员,所以请耐心等待。我认为 Queue 在这种情况下不是推荐的 IPC 方法,但如果有使用方法,请告诉我。
下面是我现在的代码:
server.py
#TCP config omitted
#Producer Thread
def readTheMusics(queue):
#Control variable to keep looping through 2 music files
i=1
while i < 3:
fname = "music" + str(i) + ".wav"
wf = wave.open(fname, 'rb')
data = wf.readframes(CHUNK)
while data:
for k in range (10):
queue[k].put(data)
data = wf.readframes(CHUNK)
wf.close()
i += 1
if i==3:
i=1
#Consumer Thread
def connection(connectionSocket, addr, queue, index):
while True:
data = queue[index-1].get(True)
connectionSocket.send(data)
connectionSocket.close()
def main():
i = 1
#Queue(1) was used to prevent an infinite queue and therefore a memory leak
queueList = [Queue(1) for j in range(10)]
th2 = threading.Thread(target=musicReading, args=(queueList, ))
th2.start()
while True:
connectionSocket, addr = serverSocket.accept()
print("connected - id {}".format(i))
th = threading.Thread(target=connection, args=(connectionSocket, addr, queueList, i))
th.start()
i = i + 1
if __name__ == '__main__':
main()
【问题讨论】:
-
队列不是正确的答案。您的连接线程应该只将当前连接的套接字集存储在列表中。然后,每次从波形文件中读取一个块时,您只需执行
for sock in socketlist:/sock.send(data)。 -
你好,蒂姆!我怎么能以这种方式从读取线程中获取数据?
-
在阅读线程中发送。您不需要连接线程。当
accept返回时,将套接字添加到全局列表中。您需要检查“发送”是否失败,以便从列表中删除已关闭的套接字。 -
谢谢蒂姆。我还是有一些问题,不知道我做得对不对。我已经用 2 个同步客户端对其进行了测试,但音乐仍然不同步。也许
send(data)方法不是即时的,或者其他什么,for 循环卡在那里......有什么想法吗? -
“同步”到底是什么意思?总会有可变的网络延迟。
标签: python multithreading sockets