【问题标题】:Vert.x Event loop - How is this asynchronous?Vert.x 事件循环 - 这是如何异步的?
【发布时间】:2016-02-27 19:12:44
【问题描述】:

我正在使用 Vert.x,并且对基于事件循环的服务器非常陌生,而不是线程/连接模型。

public void start(Future<Void> fut) {
    vertx
        .createHttpServer()
        .requestHandler(r -> {
            LocalDateTime start = LocalDateTime.now();
            System.out.println("Request received - "+start.format(DateTimeFormatter.ISO_DATE_TIME));
            final MyModel model = new MyModel();
            try {

                for(int i=0;i<10000000;i++){
                    //some simple operation
                }

                model.data = start.format(DateTimeFormatter.ISO_DATE_TIME) +" - "+LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME);

            } catch (Exception e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            }

          r.response().end(
                  new Gson().toJson(model)
                 );
        })
        .listen(4568, result -> {
          if (result.succeeded()) {
            fut.complete();
          } else {
            fut.fail(result.cause());
          }
        });
    System.out.println("Server started ..");
  }
  • 我只是想模拟一个长时间运行的请求处理程序来了解这个模型是如何工作的。
  • 我观察到的是所谓的事件循环被阻塞,直到我的第一个请求完成。无论花费多少时间,在前一个请求完成之前,后续请求都不会被执行。
  • 显然我在这里遗漏了一块,这就是我在这里的问题。

根据目前的答案进行编辑:

  1. 不接受所有被认为是异步的请求吗?如果一个新的 只有前一个被清除后才能接受连接 关闭,它是如何异步的?
    • 假设一个典型的请求需要 100 毫秒到 1 秒之间的任何时间(取决于请求的种类和性质)。所以这意味着, 直到前一个请求,事件循环才能接受新连接 完成(即使它在一秒钟内结束)。如果我作为程序员 必须考虑所有这些并将此类请求处理程序推送到 工作线程,那么它与线程/连接有什么不同 模型?
    • 我只是想了解这个模型与传统的线程/连接服务器模型相比如何更好?假设没有 I/O 操作或 所有的 I/O 操作都是异步处理的?它甚至如何解决 c10k 问题,当它无法并行启动所有并发请求并且必须等到前一个请求终止时?
  2. 即使我决定将所有这些操作推送到一个工作线程(池化),那我又回到了同样的问题,不是吗?线程之间的上下文切换? 编辑并置顶这个问题以获得赏金

    • 不完全理解这个模型是如何声称是异步的。
    • Vert.x 有一个异步 JDBC 客户端(Asyncronous 是关键字),我尝试将其与 RXJava 适配。
    • 这是一个代码示例(相关部分)

    server.requestStream().toObservable().subscribe(req -> {

            LocalDateTime start = LocalDateTime.now();
            System.out.println("Request for " + req.absoluteURI() +" received - " +start.format(DateTimeFormatter.ISO_DATE_TIME));
            jdbc.getConnectionObservable().subscribe(
                    conn -> {
    
                        // Now chain some statements using flatmap composition
                        Observable<ResultSet> resa = conn.queryObservable("SELECT * FROM CALL_OPTION WHERE UNDERLYING='NIFTY'");
                        // Subscribe to the final result
                        resa.subscribe(resultSet -> {
    
                            req.response().end(resultSet.getRows().toString());
                            System.out.println("Request for " + req.absoluteURI() +" Ended - " +LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME));
                        }, err -> {
                            System.out.println("Database problem");
                            err.printStackTrace();
                        });
                    },
    
                    // Could not connect
                    err -> {
                        err.printStackTrace();
                    }
                    );
    
    });
    server.listen(4568);
    
    • 那里的选择查询大约需要 3 秒才能返回完整的表转储。
    • 当我触发并发请求(仅尝试 2 个)时,我看到第二个请求完全等待第一个请求完成。
    • 如果 JDBC 选择是异步的,让框架在等待选择查询返回任何内容时处理第二个连接不是一个公平的期望吗?

【问题讨论】:

  • TLDR:对于长时间运行的阻塞任务,策略是切换到后台线程池(经典多线程),它不使用与事件循环相同的线程以避免阻塞。

标签: java nio vert.x undertow


【解决方案1】:

在这些类型的处理引擎中,应该将长时间运行的任务转换为异步执行的操作,这是执行此操作的一种方法,以便关键线程可以尽快完成并返回执行另一项任务。即任何 IO 操作都会传递给框架,以便在 IO 完成时回调您。

该框架是异步的,因为它支持您生成和运行这些异步任务,但它不会将您的代码从同步更改为异步。

【讨论】:

  • 异步 i/o 意味着回调是异步执行的。但是是的,如果只有 1 个线程来执行您的任何代码,那与许多线程不同。我不确定所用标签的技术差异是什么。一种是多线程异步,另一种是延迟执行异步。
【解决方案2】:

这是如何异步的?答案就在你的问题本身

我观察到的是所谓的事件循环被阻塞,直到我 第一个请求完成。无论花费多少时间,随后 在前一个请求完成之前,请求不会被执行

这个想法不是为每个 HTTP 请求提供新的服务,而是使用您被长时间运行的任务阻塞的相同线程。

事件循环的目标是节省从一个线程到另一个线程的上下文切换所涉及的时间,并在任务使用 IO/网络活动时利用理想的 CPU 时间。如果在处理您的请求时它必须进行其他 IO/网络操作,例如:在此期间从远程 MongoDB 实例获取数据,您的线程将不会被阻塞,而是另一个请求将由同一线程提供服务,这是理想的用例事件循环模型(考虑到您的服务器有并发请求)。

如果你有长时间运行的不涉及网络/IO操作的任务,你应该考虑使用线程池来代替,如果你阻塞了你的主事件循环线程本身,其他请求就会被延迟。即对于长时间运行的任务,您可以为服务器响应而付出上下文切换的代价。

编辑: 服务器处理请求的方式可能会有所不同:

1) 为每个传入请求生成一个新线程(在此模型中,上下文切换会很高,并且每次生成一个新线程都会产生额外的成本)

2) 使用线程池来处理请求(同一组线程将用于处理请求,额外的请求会排队)

3) 使用事件循环(单个线程处理所有请求。上下文切换可以忽略不计。因为会有一些线程在运行,例如:将传入请求排队)

首先上下文切换还不错,它需要保持应用服务器响应,但是如果并发请求的数量太高(大约超过 10k),过多的上下文切换可能会成为问题。如果您想更详细地了解我建议您阅读C10K article

假设一个典型的请求需要 100 毫秒到 1 秒之间的任何时间(基于 关于请求的种类和性质)。所以这意味着,事件循环 在前一个请求完成之前不能接受新连接(甚至 如果它在一秒钟内结束)。

如果您需要响应大量并发请求(超过 10k),我会认为超过 500 毫秒是一个运行时间更长的操作。其次,就像我说的那样,涉及一些线程/上下文切换,例如:排队传入的请求,但是,线程之间的上下文切换将大大减少,因为一次线程太少。第三,如果在解决第一个请求时涉及网络/IO 操作,第二个请求将有机会在第一个被解决之前得到解决,这就是该模型发挥作用的地方。

如果我作为程序员必须思考 通过所有这些并将此类请求处理程序推送到工作线程, 那么它与线程/连接模型有什么不同呢?

Vertx 正在努力为您提供最好的线程和事件循环,因此,作为程序员,您可以就如何在两种情况下(即有和没有网络/IO 操作的长时间运行操作)下使您的应用程序高效进行调用。

我只是想了解这个模型如何更好地从 传统的线程/连接服务器模型?假设没有 I/O 操作或 所有的 I/O 操作都是异步处理的?它甚至如何解决c10k 问题,当它不能并行启动所有并发请求并且 必须等到前一个终止?

上面的解释应该回答了这个问题。

即使我决定将所有这些操作都交给工人 线程(池),然后我又回到了同样的问题,不是吗?语境 在线程之间切换?

就像我说的那样,两者都有优点和缺点,vertx 为您提供模型,并且根据您的用例,您必须选择最适合您的场景的模型。

【讨论】:

  • 我已根据回复编辑了问题。我知道长时间运行的操作必须被推送到工作线程。您如何定义长时间运行的操作。如果一个典型的请求需要一秒钟才能完成,那么对于我的应用程序类型来说它运行时间不长,但同时事件循环不会处理同时到达的 10 个并发请求。这是一个更好的模型吗?
【解决方案3】:

Vert.x 事件循环实际上是许多平台上存在的经典事件循环。当然,大多数解释和文档都可以在 Node.js 中找到,因为它是基于这种架构模式的最流行的框架。看看 Node.js 事件循环下的一个或多或少好的explanation 机制。 Vert.x tutorial 在“不要打电话给我们,我们会打电话给你”和“Verticles”之间也有很好的解释。

编辑您的更新:

首先,当您使用事件循环时,主线程应该非常快速地处理所有请求。你不应该在这个循环中做任何长时间的工作。当然,您不应该等待对数据库调用的响应。 - 异步安排通话 - 为结果分配回调(处理程序) - 回调将在工作线程中执行,而不是事件循环线程。例如,此回调将向套接字返回响应。 因此,您在事件循环中的操作应该只安排所有带有回调的异步操作,然后转到下一个请求而不等待任何结果。

假设一个典型的请求需要 100 毫秒到 1 秒之间的任何时间(取决于请求的种类和性质)。

在这种情况下,您的请求包含一些计算昂贵的部分或对 IO 的访问 - 您在事件循环中的代码不应等待这些操作的结果。

我只是想了解这个模型与传统的线程/连接服务器模型相比如何更好?假设没有 I/O 操作或所有 I/O 操作都是异步处理的?

当您有太多并发请求和传统编程模型时,您将为每个请求创建线程。这个线程会做什么?他们将主要等待 IO 操作(例如,来自数据库的结果)。这是一种资源的浪费。在我们的事件循环模型中,您有一个主线程来调度操作和为长任务预先分配数量的工作线程。 + 这些工作人员实际上都没有等待响应,他们只是可以在等待 IO 结果的同时执行另一个代码(它可以实现为回调或定期检查当前正在进行的 IO 作业的状态)。我建议您通过 Java NIO 和 Java NIO 2 了解如何在框架内实际实现此异步 IO。 Green threads 也是一个非常相关的概念,这很好理解。绿色线程和协程是一种影子事件循环,它们试图实现相同的目标——更少的线程,因为我们可以在绿色线程等待某些东西的同时重用系统线程。

当它无法并行启动所有并发请求并且必须等到前一个请求终止时,它如何解决 c10k 问题?

当然,我们不会在主线程中等待发送前一个请求的响应。获取请求,安排长/IO 任务执行,下一个请求。

即使我决定将所有这些操作推送到一个工作线程(池化),那我又回到了同样的问题,不是吗?线程之间的上下文切换?

如果你把一切都做对了——不。更重要的是,您将获得良好的数据局部性和执行流预测。一个 CPU 内核将执行您的短事件循环并安排异步工作,而无需上下文切换,仅此而已。其他核心调用数据库并返回响应,仅此而已。在回调之间切换或检查不同通道的 IO 状态实际上并不需要任何系统线程的上下文切换——它实际上是在一个工作线程中工作。因此,我们每个核心有一个工作线程,这个系统线程等待/检查来自多个数据库连接的结果可用性。重新访问Java NIO 概念以了解它如何以这种方式工作。 (NIO 的经典示例 - 可以接受许多并行连接(数千个)的代理服务器,代理对一些其他远程服务器的请求,侦听响应并将响应发送回客户端,所有这些都使用一个或两个线程)

关于您的代码,我为您制作了一个示例project,以证明一切都按预期工作:

public class MyFirstVerticle extends AbstractVerticle {

    @Override
    public void start(Future<Void> fut) {
        JDBCClient client = JDBCClient.createShared(vertx, new JsonObject()
                .put("url", "jdbc:hsqldb:mem:test?shutdown=true")
                .put("driver_class", "org.hsqldb.jdbcDriver")
                .put("max_pool_size", 30));


        client.getConnection(conn -> {
            if (conn.failed()) {throw new RuntimeException(conn.cause());}
            final SQLConnection connection = conn.result();

            // create a table
            connection.execute("create table test(id int primary key, name varchar(255))", create -> {
                if (create.failed()) {throw new RuntimeException(create.cause());}
            });
        });

        vertx
            .createHttpServer()
            .requestHandler(r -> {
                int requestId = new Random().nextInt();
                System.out.println("Request " + requestId + " received");
                    client.getConnection(conn -> {
                         if (conn.failed()) {throw new RuntimeException(conn.cause());}

                         final SQLConnection connection = conn.result();

                         connection.execute("insert into test values ('" + requestId + "', 'World')", insert -> {
                             // query some data with arguments
                             connection
                                 .queryWithParams("select * from test where id = ?", new JsonArray().add(requestId), rs -> {
                                     connection.close(done -> {if (done.failed()) {throw new RuntimeException(done.cause());}});
                                     System.out.println("Result " + requestId + " returned");
                                     r.response().end("Hello");
                                 });
                         });
                     });
            })
            .listen(8080, result -> {
                if (result.succeeded()) {
                    fut.complete();
                } else {
                    fut.fail(result.cause());
                }
            });
    }
}

@RunWith(VertxUnitRunner.class)
public class MyFirstVerticleTest {

  private Vertx vertx;

  @Before
  public void setUp(TestContext context) {
    vertx = Vertx.vertx();
    vertx.deployVerticle(MyFirstVerticle.class.getName(),
        context.asyncAssertSuccess());
  }

  @After
  public void tearDown(TestContext context) {
    vertx.close(context.asyncAssertSuccess());
  }

  @Test
  public void testMyApplication(TestContext context) {
      for (int i = 0; i < 10; i++) {
          final Async async = context.async();
          vertx.createHttpClient().getNow(8080, "localhost", "/",
                            response -> response.handler(body -> {
                                context.assertTrue(body.toString().contains("Hello"));
                                async.complete();
                            })
        );
    }
  }
}

输出:

Request 1412761034 received
Request -1781489277 received
Request 1008255692 received
Request -853002509 received
Request -919489429 received
Request 1902219940 received
Request -2141153291 received
Request 1144684415 received
Request -1409053630 received
Request -546435082 received
Result 1412761034 returned
Result -1781489277 returned
Result 1008255692 returned
Result -853002509 returned
Result -919489429 returned
Result 1902219940 returned
Result -2141153291 returned
Result 1144684415 returned
Result -1409053630 returned
Result -546435082 returned

因此,我们接受一个请求 - 安排一个对数据库的请求,转到下一个请求,我们使用所有请求,并仅在数据库完成所有操作后为每个请求发送响应。

关于您的代码示例,我发现两个可能的问题 - 首先,您似乎没有close() 连接,这对于将其返回池很重要。其次,您的池是如何配置的?如果只有一个空闲连接 - 这些请求将序列化等待此连接。

我建议您为这两个请求添加一些时间戳打印,以找到您序列化的位置。你有一些东西使事件循环中的调用被阻塞。或者...检查您是否在测试中并行发送请求。在上一个之后得到响应后不是下一个。

【讨论】:

  • 池大小 = 30,我已放置日志以发出时间戳。一个日志在收到请求时输出开始时间,在请求完成时输出另一个日志。我看到的是每个请求大约需要 4 秒才能完成,每个请求都等待前一个请求结束。根据 Vert.x 文档,他们的 JDBC 包装器是异步的,因为它不等待 sql 结果返回,但我看到的是同步行为。它会等到结果返回后再返回事件循环。
  • @user378101 查看示例project。您可以在那里开始测试并查看输出。一切都按预期工作,当然 jdbc 客户端实际上是异步的。添加了详细回答。
  • Offcourse 我正在并行发送请求。我没那么笨:- 会试试你的示例代码,过一会儿再回来。
  • 是的,现在说得通了,是的,JDBC 调用确实是异步的。非常感谢。
  • 关于语句 - 回调将在工作线程中执行,非事件循环线程,当我在回调中打印当前线程名称时,它正在打印 vert.x-eventloop-thread-2 ,不这不是说它是在事件循环中执行的吗?
猜你喜欢
  • 2017-07-07
  • 1970-01-01
  • 1970-01-01
  • 2021-06-02
  • 2016-03-21
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多