【问题标题】:ZeroMQ bidirectional async communication with subprocessesZeroMQ 与子进程的双向异步通信
【发布时间】:2017-09-10 09:27:33
【问题描述】:

我有一个服务器进程,它接收来自网络客户端的请求。 服务器必须调用一个外部工作进程(另一个 .py ),将数据流式传输到服务器,然后服务器将数据流回客户端。

服务器必须监视这些工作进程并向它们发送消息(基本上是杀死它们或发送消息以控制哪种数据被流式传输)。这些消息是异步的(例如取决于 Web 客户端)

我想在 ipc://-transport-class 上使用 ZeroMQ 套接字,但对 socket.recv() 方法的调用被阻塞了。

我应该使用两个套接字(一个用于将数据流式传输到服务器,另一个用于从服务器接收控制消息)?

【问题讨论】:

  • 您可以使用Poller 来实现非阻塞代码。

标签: python sockets ipc zeromq


【解决方案1】:

使用单独的套接字进行信号和消息传递总是更好

虽然Poller-instance 会有所帮助,但主要步骤是使用单独的套接字进行信号传输,使用另一个套接字进行数据流传输。总是。关键是,在这样的设置中,Poller.poll() 和事件循环都可以保持特定于套接字,并且在实时受控代码执行期间花费不超过预定义的时间。

因此,请不要犹豫,设置更丰富的信号/消息传递基础架构,作为一个环境,您将只享受更简单的控制、关注点分离和意图清晰。

ZeroMQ 是执行此操作的绝佳工具 - 包括每个套接字 IO 线程的关联性,因此确实可以轻松进行细粒度的性能调整。

【讨论】:

  • +1 用于使用两个单独的套接字的想法。我也觉得是为什么要去。我正在考虑使用普通套接字和处理消息的单独线程。 ZeroMQ 如何在这方面为我提供帮助?基本上,我不太明白你的最后一句话。
  • 一旦管理了几个琐碎的 ZeroMQ 可扩展正式通信行为原型,接下来就是性能。使用 ZeroMQ API,可以以细粒度方式调整最大性能/最小延迟,一个套接字具有绝对优先级(私有 ZeroMQ 数据泵 IO 线程,而其他低优先级可以共享其他 ZeroMQ 数据泵 IO 线程- pool --- 仍然在一个高级 API 下,因此能够从相同的高级语言 pre-API ZeroMQ 套接字抽象设置和调整所有马戏团。没有人愿意在低级覆盖所有这些-级别资源处理试验/错误)
【解决方案2】:

我想如果想出了一个解决方案,但我不知道是否有更好(更有效,更安全,......)的方式来做到这一点。 clientserver 发出请求,这会产生 N 进程 worker 来参与请求。

这是来自worker.py的相关摘录:

for i in range(start_counter,10):
    # Check if there is any message from server
    while True:
        try:
            msg = worker.recv(zmq.DONTWAIT)
            print("Received {} from server".format(msg))
        except zmq.Again:
            break

    # Send data to server
    worker.send(b"Message {} from {}".format(i, worker_id))

    # Take some sleep
    time.sleep(random.uniform(0.3, 1.1))

这样,worker a) 不需要单独的套接字,b) 不需要单独的线程来处理来自server 的消息。

在实际实现中,worker 必须以 100Hz 的频率将 128 字节的消息流式传输到server,并且server 必须接收大量此类消息(许多客户端提出的请求每个需要 3-10 个worker) . 如果以这种方式实施,这种方法会受到性能影响吗?

【讨论】:

    猜你喜欢
    • 2018-08-03
    • 2011-06-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-05-02
    • 1970-01-01
    • 1970-01-01
    • 2021-04-13
    相关资源
    最近更新 更多