【Question Title】: How to immediately send an asynchronous response to a request
【Posted】: 2017-12-30 04:34:43
【Question】:

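I am trying to find an asynchronous way to return a response to a client request immediately.

All I need to do is log the request data, start a new thread that requests expensive operations on other servers (some backend operations), and return a 200 status response to the client right away, without waiting for those operations to finish.

At the moment I am trying to do this with CompletableFuture, but I am missing something.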

package com.example.controller;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.Logger;

import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.util.concurrent.CompletableFuture;

@Path("/class")
public class AsynchronousResponse {

private static final Logger LOGGER = (Logger) LogManager.getLogger(AsynchronousResponse.class.getName());
private static final int HTTP_STATUS_OK = 200;
private static final String EMPTY_CONTENT = "";

@Context
private HttpServletRequest httpRequest;

@Path("/method")
@GET
@Consumes(MediaType.APPLICATION_JSON)
public Response asyncResponseSupply() {
    LOGGER.info(httpRequest.getSession().getId() + " : New session received");
    CompletableFuture.supplyAsync(this::veryExpensiveOperations);
    LOGGER.info(httpRequest.getSession().getId() + " : Returning empty response... ");
    return Response.status(HTTP_STATUS_OK).entity(EMPTY_CONTENT).build();
}

// need to do some operations on data from httpRequest
private String veryExpensiveOperations() {
    LOGGER.info(httpRequest.toString());
    LOGGER.info("Start very expensive operations");
    try {
        Thread.sleep(3000);
        LOGGER.info("Finished");
        return "DONE";
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        e.printStackTrace();
        LOGGER.error("Error: " + e.getMessage());
        return "ERROR";
    }
}

}

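In short, I do get the response immediately, but the veryExpensiveOperations() method seems to lose the httpRequest values. That is a real problem for me, because I need to call other web services with values from the client request. Thanks for your help!

For my application I am using Jetty version 9.2.18.v20160721.

【Comments】:

    Tags: java asynchronous java-8 jetty completable-future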


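    【Solution 1】:

    Once the dispatch to a servlet completes, the response is committed and the request and response objects are recycled.

    To process a request outside the dispatch thread under the Servlet specification, you must use AsyncContext (obtained by calling HttpServletRequest.startAsync()). This tells the container that the request/response is not yet complete and that it is being handled on a non-dispatch thread (such as the one used by your CompletableFuture.supplyAsync() call).

    As for how to mix JAX-RS and AsyncContext, I don't know. I don't even know whether JAX-RS has been updated to support AsyncContext.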
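When the background work only needs a handful of values from the request, one way around the recycling described above is to copy those values on the request thread and pass only the copy to the asynchronous task. The following is a minimal JDK-only sketch of that idea, not the servlet API: `RecyclableRequest`, `Snapshot`, and `handle` are hypothetical names, and `recycle()` merely simulates what a container might do once the dispatch has finished.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class RequestSnapshotDemo {

    /** Hypothetical stand-in for a container-managed request that is recycled after dispatch. */
    static final class RecyclableRequest {
        private String sessionId;
        private String payload;

        RecyclableRequest(String sessionId, String payload) {
            this.sessionId = sessionId;
            this.payload = payload;
        }

        String getSessionId() { return sessionId; }
        String getPayload() { return payload; }

        /** Simulates the container clearing the object once the dispatch has finished. */
        void recycle() {
            sessionId = null;
            payload = null;
        }
    }

    /** Immutable copy of the values the background task needs. */
    static final class Snapshot {
        final String sessionId;
        final String payload;

        Snapshot(RecyclableRequest request) {
            this.sessionId = request.getSessionId();
            this.payload = request.getPayload();
        }
    }

    /** Copies the request data on the calling thread, then starts the slow work. */
    static CompletableFuture<String> handle(RecyclableRequest request, ExecutorService executor) {
        Snapshot snapshot = new Snapshot(request); // read everything before returning
        return CompletableFuture.supplyAsync(() -> expensiveOperation(snapshot), executor);
    }

    static String expensiveOperation(Snapshot snapshot) {
        // Only the snapshot is touched here, never the live request object.
        return snapshot.sessionId + ":" + snapshot.payload.toUpperCase();
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            RecyclableRequest request = new RecyclableRequest("abc123", "order-42");
            CompletableFuture<String> result = handle(request, executor);
            request.recycle(); // the "container" reclaims the request right away
            System.out.println(result.join()); // prints abc123:ORDER-42
        } finally {
            executor.shutdown();
        }
    }
}
```

In a JAX-RS resource, the equivalent step would be reading the session ID and any needed parameters into local variables before calling CompletableFuture.supplyAsync(), so the task never touches httpRequest itself.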

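    【Comments】:

      【Solution 2】:

      Thanks for the answer, Joakim (I can't upvote it because I'm new here and don't have enough reputation). Based on your suggestion about AsyncContext, I found a way to send a fast response. My code after the changes (it works with both runAsync and supplyAsync):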

      package com.example.controller;
      
      import org.apache.logging.log4j.LogManager;
      import org.apache.logging.log4j.core.Logger;
      
      import javax.servlet.AsyncContext;
      import javax.servlet.http.HttpServletRequest;
      import javax.ws.rs.Consumes;
      import javax.ws.rs.GET;
      import javax.ws.rs.Path;
      import javax.ws.rs.core.Context;
      import javax.ws.rs.core.MediaType;
      import javax.ws.rs.core.Response;
      import java.util.concurrent.CompletableFuture;
      import java.util.concurrent.ForkJoinPool;
      
      
      @Path("/test")
      public class AsynchronousResponse {
      
      private static final Logger LOGGER = (Logger) LogManager.getLogger(AsynchronousResponse.class.getName());
      private static final int HTTP_STATUS_OK = 200;
      private static final String EMPTY_CONTENT = "";
      
      @Context
      private HttpServletRequest httpRequest;
      
      private AsyncContext asyncContext;
      
      @Path("/supply")
      @GET
      @Consumes(MediaType.APPLICATION_JSON)
      public Response asyncResponseSupply() {
          String loggerPrefix = httpRequest.getSession().getId() + " :[SUPPLY] ";
          LOGGER.info(loggerPrefix + "New session received");
          LOGGER.info(loggerPrefix + "Active threads on controller init: " + Thread.activeCount());
          asyncContext = httpRequest.startAsync();
          ForkJoinPool pool = new ForkJoinPool(
                  Runtime.getRuntime().availableProcessors(),
                  ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                  null,
                  true);
          CompletableFuture.supplyAsync(this::veryExpensiveOperations, pool);
          LOGGER.info(loggerPrefix + "Actual active Threads on controller return: " + Thread.activeCount());
          LOGGER.info(loggerPrefix + "Returning empty response... ");
          return Response.status(HTTP_STATUS_OK).entity(EMPTY_CONTENT).build();
      }
      
      @Path("/run")
      @GET
      @Consumes(MediaType.APPLICATION_JSON)
      public Response asyncResponseSupplyRun() {
          String loggerPrefix = httpRequest.getSession().getId() + " :[RUN] ";
          LOGGER.info(loggerPrefix + "New session received");
          LOGGER.info(loggerPrefix + "Active threads on controller init: " + Thread.activeCount());
          asyncContext = httpRequest.startAsync();
          ForkJoinPool pool = new ForkJoinPool(
                  Runtime.getRuntime().availableProcessors(),
                  ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                  null,
                  true);
          CompletableFuture.runAsync(this::veryExpensiveOperations, pool);
          LOGGER.info(loggerPrefix + "Actual active Threads on controller return: " + Thread.activeCount());
          LOGGER.info(loggerPrefix + "Returning empty response... ");
          return Response.status(HTTP_STATUS_OK).entity(EMPTY_CONTENT).build();
      }
      
      // need to do some operations on data from httpRequest
      private String veryExpensiveOperations() {
          String loggerPrefix = httpRequest.getSession().getId() + " :[veryExpensiveOperations] ";
          LOGGER.info(loggerPrefix + "Request toString: " + httpRequest.toString());
          LOGGER.info(loggerPrefix + "Start very expensive operations");
      
          try {
              Thread.sleep(3000);
              LOGGER.info(loggerPrefix + "Thread sleep finished");
              LOGGER.info(loggerPrefix + "Actual active Threads: " + Thread.activeCount());
              // Tell the container that asynchronous processing of this request is finished
              asyncContext.complete();
              return "DONE";
      
          } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
              e.printStackTrace();
              LOGGER.error(loggerPrefix + "Error: " + e.getMessage());
              asyncContext.complete();
              return "ERROR";
      
          }
      }
      
      }
      
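The handlers above build a new ForkJoinPool on every request and discard the future returned by supplyAsync()/runAsync(), so an exception thrown inside the task would go unnoticed. A possible variation, sketched here with JDK classes only, is to reuse one executor created at application startup and attach a callback that always runs the cleanup step and logs failures. `BackgroundTasks`, `fireAndForget`, and the pool size are assumptions made for illustration; in the servlet case the cleanup argument could be `asyncContext::complete`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

final class BackgroundTasks {

    /**
     * Starts the task on the given executor and returns at once. The returned future
     * completes after the cleanup callback has run, with the task's value on success
     * or "ERROR" if the task threw an exception.
     */
    static CompletableFuture<String> fireAndForget(Supplier<String> task,
                                                   Runnable cleanup,
                                                   ExecutorService executor) {
        return CompletableFuture.supplyAsync(task, executor)
                .handle((value, error) -> {
                    cleanup.run(); // runs on both the success and the failure path
                    if (error != null) {
                        System.err.println("Background task failed: " + error);
                        return "ERROR";
                    }
                    return value;
                });
    }

    public static void main(String[] args) throws InterruptedException {
        // Created once and reused for every request (the size 4 is an arbitrary assumption).
        ExecutorService sharedPool = Executors.newFixedThreadPool(4);
        try {
            CompletableFuture<String> ok = fireAndForget(
                    () -> "DONE",
                    () -> System.out.println("cleanup after success"),
                    sharedPool);
            CompletableFuture<String> failed = fireAndForget(
                    () -> { throw new IllegalStateException("backend unavailable"); },
                    () -> System.out.println("cleanup after failure"),
                    sharedPool);
            System.out.println(ok.join());
            System.out.println(failed.join());
        } finally {
            sharedPool.shutdown();
            sharedPool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }
}
```

A request handler would normally ignore the returned future. It is returned here only so that the outcome can be observed in the demo.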

      【Comments】:
