【问题标题】:Receiving socket data in one thread, writing the data in another -- python在一个线程中接收套接字数据,在另一个线程中写入数据——python
【发布时间】:2016-11-08 23:29:26
【问题描述】:

我目前正在编写一个 Python 程序来从 TCP/UDP 套接字接收数据,然后将数据写入文件。现在,我的程序通过将每个数据报写入文件来进行 I/O 绑定(我正在为非常大的文件执行此操作,因此速度很慢)。考虑到这一点,我决定尝试在一个线程中从套接字接收数据,然后在另一个线程中写入该数据。到目前为止,我已经提出了以下粗略的草案。目前,它只将单个数据块(512 字节)写入文件。

f = open("t1.txt","wb")
def write_to_file(data):
    f.write(data)

def recv_data():
    dataChunk, addr = sock.recvfrom(buf) #THIS IS THE DATA THAT GETS WRITTEN
    try:
        w = threading.Thread(target = write_to_file, args = (dataChunk,))
        threads.append(w)
        w.start()
        while(dataChunk):
            sock.settimeout(4)
            dataChunk,addr = sock.recvfrom(buf)
    except socket.timeout:
        print "Timeout"
        sock.close()
        f.close()

threads = []
r = threading.Thread(target=recv_data)
threads.append(r)
r.start()

我想我做错了什么,我只是不确定使用线程的最佳方法是什么。现在,我的问题是我必须在创建线程时提供一个参数,但是该参数的值没有正确更改以反映进来的新数据块。但是,如果我把行 w=threading.Thread(target=write_to_file, arg=(dataChunk,))while(dataChunk) 循环中,我不是每次迭代都创建一个新线程吗?

另外,对于它的价值,这只是我使用单独的接收和写入线程的小型概念验证。这不是最终应该使用这个概念的更大的程序。

【问题讨论】:

    标签: python multithreading sockets io


    【解决方案1】:

    您需要有一个缓冲区供读取线程写入,写入线程从中读取。 deque from the collections module 是完美的,因为它允许从任一侧追加/弹出而不会降低性能。

    所以,不要将dataChunk 传递给您的线程,而是传递给缓冲区。

    import collections  # for the buffer
    import time  # to ease polling
    import threading 
    
    def write_to_file(path, buffer, terminate_signal):
        with open(path, 'wb') as out_file:  # close file automatically on exit
          while not terminate_signal.is_set() or buffer:  # go on until end is signaled
            try:
              data = buffer.pop()  # pop from RIGHT end of buffer
            except IndexError:
              time.sleep(0.5)  # wait for new data
            else:
              out_file.write(data)  # write a chunk
    
    def read_from_socket(sock, buffer, terminate_signal):
        sock.settimeout(4)
        try:
          while True:
            data, _ = sock.recvfrom(buf)
            buffer.appendleft(data)  # append to LEFT of buffer
        except socket.timeout:
          print "Timeout"
          terminate_signal.set()  # signal writer that we are done
          sock.close()
    
    buffer = collections.deque()  # buffer for reading/writing
    terminate_signal = threading.Event()  # shared signal
    threads = [
      threading.Thread(target=read_from_socket, kwargs=dict(
        sock=sock,
        buffer=buffer,
        terminate_signal=terminate_signal
      )),
      threading.Thread(target= write_to_file, kwargs=dict(
        path="t1.txt",
        buffer=buffer,
        terminate_signal=terminate_signal
      ))
    ]
    for t in threads:  # start both threads
      t.start()
    for t in threads:  # wait for both threads to finish
      t.join()
    

    【讨论】:

    • 两件事:1.我假设你需要在某个地方启动每个线程(大概像for t in threads: t.start()和2.它似乎仍然没有工作。现在,文件只是226 字节。它似乎也只获取最后 226 字节的数据,而不是一开始的任何内容。
    • @Swoldier 您必须启动,然后等待两个线程。我已经添加了相关代码。
    • @Swoldier 我只能提供一个粗略的草稿 - 上面的代码实际上并没有 运行 而且它不能因为例如 bugsock.recvfrom(buf) 中未定义.
    • 搞定了!我弄乱了write_to_file() 循环中的缩进。我将else 附加到while not 而不是try...except 非常感谢!现在接受答案。
    猜你喜欢
    • 2016-12-19
    • 1970-01-01
    • 2023-03-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多