【问题标题】:ruby 'async/io' and Reactor, help understand the exampleruby 'async/io' 和 Reactor,帮助理解示例
【发布时间】:2018-11-23 11:46:20
【问题描述】:

我需要一些帮助来理解here 给出的基本“async/io”示例:

require 'async/io'

def echo_server(endpoint)
    Async::Reactor.run do |task|
        # This is a synchronous block within the current task:
        endpoint.accept do |client|
            # This is an asynchronous block within the current reactor:
            data = client.read(512)

            # This produces out-of-order responses.
            task.sleep(rand * 0.01)

            client.write(data.reverse)
        end
    end
end

def echo_client(endpoint, data)
    Async::Reactor.run do |task|
        endpoint.connect do |peer|
            result = peer.write(data)

            message = peer.read(512)

            puts "Sent #{data}, got response: #{message}"
        end
    end
end

Async::Reactor.run do
    endpoint = Async::IO::Endpoint.tcp('0.0.0.0', 9000)

    server = echo_server(endpoint)

    5.times.collect do |i|
        echo_client(endpoint, "Hello World #{i}")
    end.each(&:wait)

    server.stop
end

反应堆模式(如果错了请纠正)基本上是一种同步任务的调度程序,这样在阻塞时,一个任务被挂起,另一个任务被启动,依此类推,一旦任务完成,任务就会恢复。解锁[source]

在给定的 github 示例中,首先定义了返回 Async::Task 的 echo_server 方法,并将其分配给服务器变量 server

现在创建了变量,底层任务开始监听套接字并被client.read(512) 调用阻塞。它被挂起,流到达循环部分,5 个客户端Async::Tasks 将消息一一写入套接字。

现在发生了一些我不明白的事情。服务器任务被解锁并回复第一条消息。之后它应该退出,因为没有循环。然而,它服务于所有五个请求并在此之后退出。显然有什么我弄错了,但我无法弄清楚。任何 cmets 都非常感谢。

【问题讨论】:

    标签: ruby async-await reactor tcpsocket


    【解决方案1】:

    echo_client 在循环中被调用时被执行了 5 次。该函数调用endpoint.connect 并发送一条消息并读取一条响应。

    echo_server 被执行 1 次并调用 endpoint.accept 为每个连接生成块。服务器读取一条消息并将其写回。

    服务器任务被解锁并回复第一条消息。之后它应该退出,因为没有循环。

    endpoint.accept 实现为a loop

            def accept(backlog = Socket::SOMAXCONN, &block)
                bind do |server|
                    server.listen(backlog)
    
                    server.accept_each(&block)
                end
            end
    

    这里是implementation of server.accept_each

            def accept_each(task: Task.current)
                task.annotate "accepting connections #{self.local_address.inspect}"
    
                while true
                    self.accept(task: task) do |io, address|
                        yield io, address, task: task
                    end
                end
            end
    

    如您所见,它绑定到套接字,侦听传入的连接,然后在循环中调用接受。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-07-25
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2010-11-02
      • 1970-01-01
      相关资源
      最近更新 更多