【发布时间】:2021-08-05 18:10:03
【问题描述】:
我有多达 30 个节点,每个节点能够以高达 1000 条消息/秒的速度发送数据。每条消息可以有 256-512 字节的数据。 每个节点使用唯一的 tcp 端口进行通信。接收到的每个数据都经过预处理、插入数据库和后处理。
以下是我尝试过的观察方法:-
案例 1。使用 asyncio 并在收到数据后立即处理。
async def process_packets(reader, writer, db):
while True:
data = reader.read(4096)
data = pre_process(data)
save_in_db(data)
post_process(data)
writer.close()
观察:- 对于单个数据包,处理通常需要 10-20 毫秒。但是随着数据包频率的增加,tcp 缓冲开始发生,即对 reader.read() 的单个调用会获取多个数据包。 这增加了当前节点以及其他节点的处理。
案例 2。将 asyncio 与推送到队列中的数据和使用此队列的工作线程一起使用。
async def process_packets(reader, writer, q):
while True:
data = reader.read(4096)
q.put(data)
writer.close()
def worker_thread(q, db):
while True:
data = q.get()
data = pre_process(data)
save_in_db(data)
post_process(data)
观察:- 由于在接收数据包时没有进行任何处理,所有节点都能够尽快将数据放入队列中。问题出现在工作线程中,其中 q.get() 随着时间的推移变得非常缓慢。
案例 3。为每个节点创建套接字服务器线程
def server_thread(port, db):
s = socket.socket()
s.bind()
s.listen(1)
while True:
(conn, addr) = s.accept()
while True:
try:
data = conn.recv(4096)
except Exception:
conn.close()
break
data = pre_process(data)
save_in_db(data)
post_process(data)
观察:- 这种情况的好处是每个节点都有专门的线程来接收和处理数据,所以其他线程不受影响。但是在这里我面临socket.recv()返回的多个数据包。这会增加处理时间。
我需要一种方法来尽可能快地处理来自这些节点的数据,让应用程序 24x7 不间断运行。
操作系统 = Ubuntu20.04-lts
系统 = 英特尔 i3 第 8 代,8GB 内存,4 核
【问题讨论】:
-
如果您只期望每条消息 256-512 字节,为什么要接收到 4096 字节的缓冲区?
-
@user207421 从旧代码库中使用。那么将4096减少到1024会加快接收速度?
-
@anishkumar: " 所以将 4096 减少到 1024 会加快接收速度吗?" - 不。
recv只返回尽可能多的数据,只要它们是可用的小于给定的数字。因此,一次读取较少的数据甚至可能导致性能下降。 -
用 c/c++ 重写应用程序会有什么不同吗?
-
在此处阅读有关“框架”您的消息stackoverflow.com/a/62874813/2836621
标签: python-3.x multithreading performance sockets asynchronous