【问题标题】:python : multithreaded unix socket transfers are very slowpython:多线程unix套接字传输非常慢
【发布时间】:2018-02-28 18:19:10
【问题描述】:

我正在尝试为多个进程之间的通信创建一种方式,以使用 UNIX 域套接字传输数据。有 2 个程序(progserv.py 和 progclient.py),每个程序都有 N 个(这个例子有 4 个)线程,每个程序都与另一个进程上的相应程序通信。 progclient.py 发送“pickle”形式的 datetime.datetime 对象。 progserv.py 接收数据并确定通信延迟。这里的想法是找出进程间数据传输中的通信延迟。

我面临的问题是,随着 N 值的增加,延迟会从不到 1 毫秒增加到有时超过 250 毫秒。延迟时间并不一致,一直波动很大。

progserv.py 的代码:

    import string
    import random
    import threading
    import time
    import os
    import traceback
    import socket
    import pickle

    from datetime import datetime

    def worker(num):
            addr = './tmp'+str(num)
            logstr = "SERVER"+str(num)+":"
            if os.path.exists(addr):
                    os.unlink(addr)
        try:
                msock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
                msock.bind(addr)
                msock.listen(1)
                sock,_ = msock.accept()
                sock.setblocking(0)
                msock.close()
        except:
                print (logstr,"Error opening the socket for listening.Trace:", traceback.format_exc())
                return

        print (logstr,"Socket connection with client done")
        buf=list()
        count=1
        while True:
                try:
                        data = sock.recv(1500)
                        if len(data)<=0:
                                print (logstr,"Peer end probably closed down the connection")
                                break
                        else:
                                try:
                                        d=(datetime.now()-pickle.loads(data)).total_seconds()*1000
                                        buf.append((count,d))
                                        count+=1
                                        #print (logstr,"Received ",str(count-1),"packets")
                                except:
                                        pass
                except socket.error:
                        if len(buf)>0:
                                c,data=buf.pop(0)
                                print (logstr,"COUNTER=",c," DELTA=",data,"msec")
                except:
                        print (logstr,"Exception in receiving data. Trace:", traceback.format_exc())
                        break

        if isinstance(sock, socket.socket):
                sock.close()
        if isinstance(msock, socket.socket):
                msock.close()


def main():

        server_count=5
        threads=list()
        for i in range(server_count):
                t=threading.Thread(target=worker, args=(i,))
                threads.append(t)
                t.start()
                print ("MAIN: Started thread index",i)
                time.sleep(0.1)

        for t in threads:
                t.join()



if __name__=='__main__':
        main()

progclient.py 的代码:

import traceback, os, time, random, socket
import threading, pickle

from datetime import datetime

def worker(num):
        addr = './tmp'+str(num)
        logstr="CLIENT"+str(num)+":"

        try:
                sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
                sock.connect(addr)
                sock.setblocking(0)
        except:
                print (logstr,"Error opening the socket for connection. Trace:", traceback.format_exc())
                return

        while True:
                try:
                        d=datetime.now()
                        bytes=sock.send(pickle.dumps(d))
                        if bytes<=0:
                                print (logstr,"Peer end probably closed down the connection")
                                break
                        else:
                                print (logstr,"Sent",bytes,"bytes")
                except socket.error:
                        pass
                except:
                        print (logstr,"Exception during sending. Trace:", traceback.format_exc())
                        break

                time.sleep(1)

        if isinstance(sock, socket.socket):
                sock.close()


def main():

        client_count=5
        threads=list()
        for i in range(client_count):
                t=threading.Thread(target=worker, args=(i,))
                threads.append(t)
                t.start()
                print ("CLIENT : Started thread index",i)
                time.sleep(0.1)

        for t in threads:
                t.join()

if __name__=='__main__':
        main()

在每个进程上运行 4 个线程的延迟时间:

MAIN: Started thread index 0
MAIN: Started thread index 1
MAIN: Started thread index 2
MAIN: Started thread index 3
MAIN: Started thread index 4
SERVER0: Socket connection with client done
SERVER0: COUNTER= 1  DELTA= 0.7969999999999999 msec
SERVER1: Socket connection with client done
SERVER1: COUNTER= 1  DELTA= 0.28200000000000003 msec
SERVER2: Socket connection with client done
SERVER3: Socket connection with client done
SERVER2: COUNTER= 1  DELTA= 258.839 msec
SERVER4: Socket connection with client done
SERVER3: COUNTER= 1  DELTA= 238.006 msec
SERVER4: COUNTER= 1  DELTA= 477.099 msec
SERVER0: COUNTER= 2  DELTA= 0.175 msec
SERVER2: COUNTER= 2  DELTA= 0.174 msec
SERVER1: COUNTER= 2  DELTA= 108.28 msec
SERVER4: COUNTER= 2  DELTA= 35.966 msec
SERVER3: COUNTER= 2  DELTA= 146.63899999999998 msec
SERVER1: COUNTER= 3  DELTA= 76.92 msec
SERVER2: COUNTER= 3  DELTA= 116.096 msec
SERVER0: COUNTER= 3  DELTA= 317.726 msec
SERVER3: COUNTER= 3  DELTA= 15.792 msec
SERVER4: COUNTER= 3  DELTA= 294.577 msec
SERVER0: COUNTER= 4  DELTA= 6.123 msec
SERVER1: COUNTER= 4  DELTA= 95.69999999999999 msec
SERVER2: COUNTER= 4  DELTA= 0.434 msec
SERVER4: COUNTER= 4  DELTA= 63.32599999999999 msec
SERVER3: COUNTER= 4  DELTA= 322.351 msec
SERVER0: COUNTER= 5  DELTA= 185.03300000000002 msec
SERVER1: COUNTER= 5  DELTA= 84.869 msec
SERVER2: COUNTER= 5  DELTA= 33.532 msec
SERVER3: COUNTER= 5  DELTA= 101.01400000000001 msec
SERVER4: COUNTER= 5  DELTA= 152.035 msec
SERVER0: COUNTER= 6  DELTA= 33.533 msec
SERVER2: COUNTER= 6  DELTA= 0.501 msec
SERVER1: COUNTER= 6  DELTA= 101.59400000000001 msec
SERVER3: COUNTER= 6  DELTA= 91.584 msec
SERVER4: COUNTER= 6  DELTA= 32.779999999999994 msec
SERVER1: COUNTER= 7  DELTA= 42.009 msec
SERVER0: COUNTER= 7  DELTA= 142.716 msec
SERVER2: COUNTER= 7  DELTA= 137.214 msec
SERVER3: COUNTER= 7  DELTA= 124.629 msec
SERVER4: COUNTER= 7  DELTA= 190.025 msec
SERVER0: COUNTER= 8  DELTA= 0.9859999999999999 msec
SERVER1: COUNTER= 8  DELTA= 160.696 msec
SERVER2: COUNTER= 8  DELTA= 60.282000000000004 msec
SERVER4: COUNTER= 8  DELTA= 0.617 msec
SERVER3: COUNTER= 8  DELTA= 169.09 msec
SERVER0: COUNTER= 9  DELTA= 129.679 msec
SERVER1: COUNTER= 9  DELTA= 107.504 msec
SERVER2: COUNTER= 9  DELTA= 8.319999999999999 msec
SERVER3: COUNTER= 9  DELTA= 97.829 msec
SERVER4: COUNTER= 9  DELTA= 262.472 msec
SERVER0: COUNTER= 10  DELTA= 188.364 msec
SERVER2: COUNTER= 10  DELTA= 101.07 msec
SERVER3: COUNTER= 10  DELTA= 116.54100000000001 msec
SERVER1: COUNTER= 10  DELTA= 318.44899999999996 msec
SERVER4: COUNTER= 10  DELTA= 16.676 msec
SERVER0: COUNTER= 11  DELTA= 7.071 msec
SERVER1: COUNTER= 11  DELTA= 60.224 msec
SERVER2: COUNTER= 11  DELTA= 101.07199999999999 msec
SERVER3: COUNTER= 11  DELTA= 45.32 msec
SERVER4: COUNTER= 11  DELTA= 11.719999999999999 msec
SERVER0: COUNTER= 12  DELTA= 125.88199999999999 msec
SERVER1: COUNTER= 12  DELTA= 75.607 msec
SERVER2: COUNTER= 12  DELTA= 84.648 msec
SERVER3: COUNTER= 12  DELTA= 44.051 msec
SERVER4: COUNTER= 12  DELTA= 290.6 msec
SERVER0: COUNTER= 13  DELTA= 0.306 msec
SERVER1: COUNTER= 13  DELTA= 58.865 msec
SERVER2: COUNTER= 13  DELTA= 101.007 msec
SERVER3: COUNTER= 13  DELTA= 12.927 msec
SERVER4: COUNTER= 13  DELTA= 212.348 msec
SERVER0: COUNTER= 14  DELTA= 53.222 msec
SERVER1: COUNTER= 14  DELTA= 63.087 msec
SERVER2: COUNTER= 14  DELTA= 112.285 msec
SERVER4: COUNTER= 14  DELTA= 0.192 msec
SERVER3: COUNTER= 14  DELTA= 111.608 msec
SERVER0: COUNTER= 15  DELTA= 41.897999999999996 msec
SERVER2: COUNTER= 15  DELTA= 0.514 msec
SERVER3: COUNTER= 15  DELTA= 30.408 msec
SERVER1: COUNTER= 15  DELTA= 261.84499999999997 msec
SERVER4: COUNTER= 15  DELTA= 69.785 msec
SERVER0: COUNTER= 16  DELTA= 88.022 msec
SERVER2: COUNTER= 16  DELTA= 19.599999999999998 msec
SERVER1: COUNTER= 16  DELTA= 201.69500000000002 msec
SERVER3: COUNTER= 16  DELTA= 52.805 msec
SERVER4: COUNTER= 16  DELTA= 8.53 msec
SERVER0: COUNTER= 17  DELTA= 0.324 msec
SERVER1: COUNTER= 17  DELTA= 159.975 msec
SERVER2: COUNTER= 17  DELTA= 58.62 msec
SERVER4: COUNTER= 17  DELTA= 0.462 msec
SERVER3: COUNTER= 17  DELTA= 100.717 msec
SERVER0: COUNTER= 18  DELTA= 68.68100000000001 msec
SERVER1: COUNTER= 18  DELTA= 8.701 msec
SERVER2: COUNTER= 18  DELTA= 16.889999999999997 msec
SERVER3: COUNTER= 18  DELTA= 156.637 msec
SERVER4: COUNTER= 18  DELTA= 56.296 msec
SERVER0: COUNTER= 19  DELTA= 29.073 msec
SERVER2: COUNTER= 19  DELTA= 30.326999999999998 msec
SERVER3: COUNTER= 19  DELTA= 0.155 msec
SERVER1: COUNTER= 19  DELTA= 251.63200000000003 msec
SERVER4: COUNTER= 19  DELTA= 5.305 msec

当尝试仅使用 1 个线程进行相同的传输时,我会看到大约 1 毫秒的延迟。它随着每个进程上使用的线程数不规律地增加。

该平台是基于 Jessie 的 Raspberry Pi。有人可以解释一下这种行为吗?

【问题讨论】:

    标签: python-3.x sockets unix-socket python-sockets


    【解决方案1】:

    这是Python's Global Interpreter Lock (GIL)造成的,只能限制并发处理速度。来源:https://jeffknupp.com/blog/2013/06/30/pythons-hardest-problem-revisited/

    为了获得更快的机会或完成一项任务,您将不得不创建另一个进程(而不是线程),或者只是转移到像 c++ 这样的语言,它可以在线程限制上没有“锁定”的情况下启用并发线程和处理。

    【讨论】:

    • 这几乎是一个仅链接的答案。请问问自己:“如果其中的链接失效,我的答案有多大价值?”。有一个链接很好,但您也应该解释或总结链接中与问题直接相关的材料。
    • 您好,感谢您的回答。我开始尝试使用 multiprocessing.Process。那里的速度更快,因为每个进程都在不同的进程空间下,因此是一个新的 GIL。但是,这些数字对我来说仍然不够好。到目前为止,基于 Unix 的套接字是我最好的选择,因为 AF_INET 套接字将占用网络堆栈的开销。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-03-02
    • 2013-05-03
    • 2012-06-13
    • 2016-09-26
    相关资源
    最近更新 更多