【问题标题】:process socket data in near realtime python在近乎实时的python中处理套接字数据
【发布时间】:2021-08-05 18:10:03
【问题描述】:

我有多达 30 个节点,每个节点能够以高达 1000 条消息/秒的速度发送数据。每条消息可以有 256-512 字节的数据。 每个节点使用唯一的 tcp 端口进行通信。接收到的每个数据都经过预处理、插入数据库和后处理。

以下是我尝试过的观察方法:-

案例 1。使用 asyncio 并在收到数据后立即处理。

async def process_packets(reader, writer, db):
          while True:
              data = reader.read(4096)
              data = pre_process(data)
              save_in_db(data)
              post_process(data)
          writer.close()

观察:- 对于单个数据包,处理通常需要 10-20 毫秒。但是随着数据包频率的增加,tcp 缓冲开始发生,即对 reader.read() 的单个调用会获取多个数据包。 这增加了当前节点以及其他节点的处理。

案例 2。将 asyncio 与推送到队列中的数据和使用此队列的工作线程一起使用。

async def process_packets(reader, writer, q):
          while True:
              data = reader.read(4096)
              q.put(data)
          writer.close()

def worker_thread(q, db):
    while True:
        data = q.get()
        data = pre_process(data)
        save_in_db(data)
        post_process(data)

观察:- 由于在接收数据包时没有进行任何处理,所有节点都能够尽快将数据放入队列中。问题出现在工作线程中,其中 q.get() 随着时间的推移变得非常缓慢。

案例 3。为每个节点创建套接字服务器线程

def server_thread(port, db):
    s = socket.socket()
    s.bind()
    s.listen(1)

    while True:
        (conn, addr) = s.accept()
        while True:
            try:
                data = conn.recv(4096)
            except Exception:
                conn.close()
                break
            data = pre_process(data)
            save_in_db(data)
            post_process(data)

观察:- 这种情况的好处是每个节点都有专门的线程来接收和处理数据,所以其他线程不受影响。但是在这里我面临socket.recv()返回的多个数据包。这会增加处理时间。

我需要一种方法来尽可能快地处理来自这些节点的数据,让应用程序 24x7 不间断运行。

操作系统 = Ubuntu20.04-lts
系统 = 英特尔 i3 第 8 代,8GB 内存,4 核

【问题讨论】:

  • 如果您只期望每条消息 256-512 字节,为什么要接收到 4096 字节的缓冲区?
  • @user207421 从旧代码库中使用。那么将4096减少到1024会加快接收速度?
  • @anishkumar: " 所以将 4096 减少到 1024 会加快接收速度吗?" - 不。recv 只返回尽可能多的数据,只要它们是可用的小于给定的数字。因此,一次读取较少的数据甚至可能导致性能下降。
  • 用 c/c++ 重写应用程序会有什么不同吗?
  • 在此处阅读有关“框架”您的消息stackoverflow.com/a/62874813/2836621

标签: python-3.x multithreading performance sockets asynchronous


【解决方案1】:

但在这里我面临 socket.recv() 返回的多个数据包

TCP 是一个字节流,即在这一层没有数据包。您可能是指应用程序级别的消息。您的代码必须能够自己处理多个或部分应用程序消息,因为 TCP 本身不提供消息语法。虽然您似乎在足够快的阅读速度时只能收到完整的消息,但这并不能保证,最终您的应用程序可能会停顿一小段时间(由于调度)并且消息会累积。

处理从socket.recv() 返回的多条消息甚至可能是一个优势。一次读取多条消息意味着单个系统调用会返回更多的应用程序数据,从而提高应用程序的效率(相同数量的工作需要更少的系统调用)。因此,最好在单个recv 中尽可能多地阅读,而不是希望只收到一条消息。

至于另一种设计:每个节点一个线程的最后一种方法扩展性最好,因为在这种情况下,工作(以及负载)分布在多个 CPU 内核上。其他方法仅使用单个 CPU 内核。但是这些方法都不能真正保证您的特定系统能够处理那么多数据。它们的区别仅在于它们对底层系统提供的资源的利用程度。

【讨论】:

  • 我完全同意你的看法。我已经在预处理时处理了多个/部分数据包。问题在于多个/部分数据包大小更多,预处理需要更多时间。对于最后一种情况,当我使用 multiprocess.Process 时,我的 CPU 使用率会在几分钟内达到 100%,但如果我使用 threading.Thread(),CPU 使用率是 50-60%。同样经过几个小时的工作应用程序停顿后,应用程序日志或系统日志中没有错误消息。如果它的系统相关问题,那么我有 i9 10-gen,32GB 系统供使用。
  • @anishkumar: “在几个小时的工作应用程序停顿之后” - 我怀疑你的代码中存在竞争条件,即你遇到的东西只有很少发生,但未能正确解决。当然,不可能再进一步指出,因为您的代码基本上一无所知。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2018-11-20
  • 1970-01-01
  • 2012-08-05
  • 2014-03-18
  • 1970-01-01
  • 1970-01-01
  • 2012-07-02
相关资源
最近更新 更多