【问题标题】:Retry logic with CompletableFuture使用 CompletableFuture 重试逻辑
【发布时间】:2017-03-21 23:45:38
【问题描述】:

我需要在我正在处理的异步框架中提交任务,但我需要捕获异常,并在“中止”之前多次重试同一任务。

我正在使用的代码是:

int retries = 0;
public CompletableFuture<Result> executeActionAsync() {

    // Execute the action async and get the future
    CompletableFuture<Result> f = executeMycustomActionHere();

    // If the future completes with exception:
    f.exceptionally(ex -> {
        retries++; // Increment the retry count
        if (retries < MAX_RETRIES)
            return executeActionAsync();  // <--- Submit one more time

        // Abort with a null value
        return null;
    });

    // Return the future    
    return f;
}

目前无法编译,因为 lambda 的返回类型错误:它需要 Result,但 executeActionAsync 返回 CompletableFuture&lt;Result&gt;

如何实现这个完全异步的重试逻辑?

【问题讨论】:

  • 你不能简单地处理executeMycustomActionHere()里面的重试吗?也许只是传递一个带有你想要的重试次数的参数。
  • @DidierL 对不起,我不明白我应该做什么。我只是将问题移到该函数中。不是吗?
  • 是的,除非您将重试与CompletableFuture 的处理分开,这并不是真正为此而设计的。

标签: java exception asynchronous concurrency java-8


【解决方案1】:

链接后续重试可以很简单:

public CompletableFuture<Result> executeActionAsync() {
    CompletableFuture<Result> f=executeMycustomActionHere();
    for(int i=0; i<MAX_RETRIES; i++) {
        f=f.exceptionally(t -> executeMycustomActionHere().join());
    }
    return f;
}

了解以下缺点
这只是按照预期链接尽可能多的重试,因为这些后续阶段在非异常情况下不会做任何事情。

一个缺点是,如果第一次尝试立即失败,那么当第一个 exceptionally 处理程序被链接时,f 已经异常完成,该操作将由调用线程调用,消除了请求的异步性质完全。通常,join() 可能会阻塞一个线程(默认执行器会启动一个新的补偿线程,但仍然不鼓励这样做)。不幸的是,exceptionallyAsyncexceptionallyCompose 方法都没有。

不调用join() 的解决方案是

public CompletableFuture<Result> executeActionAsync() {
    CompletableFuture<Result> f=executeMycustomActionHere();
    for(int i=0; i<MAX_RETRIES; i++) {
        f=f.thenApply(CompletableFuture::completedFuture)
           .exceptionally(t -> executeMycustomActionHere())
           .thenCompose(Function.identity());
    }
    return f;
}

演示如何将“compose”和“exceptionally”处理程序结合起来。

此外,如果所有重试均失败,则只会报告最后一个异常。更好的解决方案应该报告第一个异常,然后将重试的后续异常添加为抑制异常。可以通过链接递归调用来构建这样的解决方案,正如Gili’s answer 所暗示的那样,但是,为了使用这种思想进行异常处理,我们必须使用上面显示的“组合”和“异常”的步骤:

public CompletableFuture<Result> executeActionAsync() {
    return executeMycustomActionHere()
        .thenApply(CompletableFuture::completedFuture)
        .exceptionally(t -> retry(t, 0))
        .thenCompose(Function.identity());
}
private CompletableFuture<Result> retry(Throwable first, int retry) {
    if(retry >= MAX_RETRIES) return CompletableFuture.failedFuture(first);
    return executeMycustomActionHere()
        .thenApply(CompletableFuture::completedFuture)
        .exceptionally(t -> { first.addSuppressed(t); return retry(first, retry+1); })
        .thenCompose(Function.identity());
}

CompletableFuture.failedFuture 是一种 Java 9 方法,但如果需要,将与 Java 8 兼容的反向端口添加到您的代码中将是微不足道的:

public static <T> CompletableFuture<T> failedFuture(Throwable t) {
    final CompletableFuture<T> cf = new CompletableFuture<>();
    cf.completeExceptionally(t);
    return cf;
}

【讨论】:

  • 注意:如果您不想向调用者报告异常,而是回退到问题中指出的null 值(尽管我不推荐这种编码风格),您可以在任一解决方案中,只需将 .exceptionally(t -&gt; null) 链接到所产生的未来。
  • 如果您的whenCompleteAsync()executeMycustomActionHere() 在同一个Executor 上运行,我认为您可能会因为这里的join() 而出现死锁,因为任务将排队永远没有机会被处决。
  • @Didier L:由于没有指定执行器,因此将使用默认执行器。在这种情况下,底层的 Fork/Join 框架将启动新线程来补偿在CompletableFuture.join() 中阻塞的线程(它在内部调用ForkJoinPool.managedBlock(…))。但如果你明确指定Executor,那不是ForkJoinPool,你是对的。
  • @tkruse 那么,CompletableFuture 及其有限的 API 可能不适合这项工作。并发 API 中还有其他工具可以构建更好的解决方案。
  • @tkruse 很好,一个解决方案非常复杂,涉及可变状态,另一个解决方案,一年多后添加,避开了做异常处理的实际要求,通过假设有一个 Response 类型具有状态代码,这使任务更容易,但不能解决 OP 的问题。然而,那个迟到的答案引导我找到了一个真正的非阻塞解决方案来解决 OP 的任务,所以它现在得到了我的支持,我更新了我的答案。我仍然认为CompletableFuture 不是最好的 API。我认为,我的答案代码说明了这一点。
【解决方案2】:

我想我成功了。这是我创建的示例类和测试代码:


RetriableTask.java

public class RetriableTask
{
    protected static final int MAX_RETRIES = 10;
    protected int retries = 0;
    protected int n = 0;
    protected CompletableFuture<Integer> future = new CompletableFuture<Integer>();

    public RetriableTask(int number) {
        n = number;
    }

    public CompletableFuture<Integer> executeAsync() {
        // Create a failure within variable timeout
        Duration timeoutInMilliseconds = Duration.ofMillis(1*(int)Math.pow(2, retries));
        CompletableFuture<Integer> timeoutFuture = Utils.failAfter(timeoutInMilliseconds);

        // Create a dummy future and complete only if (n > 5 && retries > 5) so we can test for both completion and timeouts. 
        // In real application this should be a real future
        final CompletableFuture<Integer> taskFuture = new CompletableFuture<>();
        if (n > 5 && retries > 5)
            taskFuture.complete(retries * n);

        // Attach the failure future to the task future, and perform a check on completion
        taskFuture.applyToEither(timeoutFuture, Function.identity())
            .whenCompleteAsync((result, exception) -> {
                if (exception == null) {
                    future.complete(result);
                } else {
                    retries++;
                    if (retries >= MAX_RETRIES) {
                        future.completeExceptionally(exception);
                    } else {
                        executeAsync();
                    }
                }
            });

        // Return the future    
        return future;
    }
}

用法

int size = 10;
System.out.println("generating...");
List<RetriableTask> tasks = new ArrayList<>();
for (int i = 0; i < size; i++) {
    tasks.add(new RetriableTask(i));
}

System.out.println("issuing...");
List<CompletableFuture<Integer>> futures = new ArrayList<>();
for (int i = 0; i < size; i++) {
    futures.add(tasks.get(i).executeAsync());
}

System.out.println("Waiting...");
for (int i = 0; i < size; i++) {
    try {
        CompletableFuture<Integer> future = futures.get(i);
        int result = future.get();
        System.out.println(i + " result is " + result);
    } catch (Exception ex) {
        System.out.println(i + " I got exception!");
    }
}
System.out.println("Done waiting...");

输出

generating...
issuing...
Waiting...
0 I got exception!
1 I got exception!
2 I got exception!
3 I got exception!
4 I got exception!
5 I got exception!
6 result is 36
7 result is 42
8 result is 48
9 result is 54
Done waiting...

主要思想和一些胶水代码(failAfter函数)来自here

欢迎任何其他建议或改进。

【讨论】:

    【解决方案3】:

    我建议不要实现自己的重试逻辑,而是使用经过验证的库,例如 failsafe,它内置了对期货的支持(并且似乎比 guava-retrying 更受欢迎)。对于您的示例,它看起来像:

    private static RetryPolicy retryPolicy = new RetryPolicy()
        .withMaxRetries(MAX_RETRIES);
    
    public CompletableFuture<Result> executeActionAsync() {
        return Failsafe.with(retryPolicy)
            .with(executor)
            .withFallback(null)
            .future(this::executeMycustomActionHere);
    }
    

    您可能应该避免使用.withFallback(null),而只是让返回的future 的.get() 方法抛出结果异常,以便您的方法的调用者可以专门处理它,但这是您必须做出的设计决定。

    需要考虑的其他事项包括您是否应该立即重试或在尝试之间等待一段时间、任何类型的递归退避(当您调用可能已关闭的 Web 服务时很有用)以及是否存在特定异常不值得重试(例如,如果方法的参数无效)。

    【讨论】:

      【解决方案4】:

      实用程序类:

      public class RetryUtil {
      
          public static <R> CompletableFuture<R> retry(Supplier<CompletableFuture<R>> supplier, int maxRetries) {
              CompletableFuture<R> f = supplier.get();
              for(int i=0; i<maxRetries; i++) {
                  f=f.thenApply(CompletableFuture::completedFuture)
                      .exceptionally(t -> {
                          System.out.println("retry for: "+t.getMessage());
                          return supplier.get();
                      })
                      .thenCompose(Function.identity());
              }
              return f;
          }
      }
      

      用法:

      public CompletableFuture<String> lucky(){
          return CompletableFuture.supplyAsync(()->{
              double luckNum = Math.random();
              double luckEnough = 0.6;
              if(luckNum < luckEnough){
                  throw new RuntimeException("not luck enough: " + luckNum);
              }
              return "I'm lucky: "+luckNum;
          });
      }
      @Test
      public void testRetry(){
          CompletableFuture<String> retry = RetryUtil.retry(this::lucky, 10);
          System.out.println("async check");
          String join = retry.join();
          System.out.println("lucky? "+join);
      }
      

      输出

      async check
      retry for: java.lang.RuntimeException: not luck enough: 0.412296354211683
      retry for: java.lang.RuntimeException: not luck enough: 0.4099777199676573
      lucky? I'm lucky: 0.8059089479049389
      

      【讨论】:

        【解决方案5】:

        我最近使用guava-retrying 库解决了一个类似的问题。

        Callable<Result> callable = new Callable<Result>() {
            public Result call() throws Exception {
                return executeMycustomActionHere();
            }
        };
        
        Retryer<Boolean> retryer = RetryerBuilder.<Result>newBuilder()
                .retryIfResult(Predicates.<Result>isNull())
                .retryIfExceptionOfType(IOException.class)
                .retryIfRuntimeException()
                .withStopStrategy(StopStrategies.stopAfterAttempt(MAX_RETRIES))
                .build();
        
        CompletableFuture.supplyAsync( () -> {
            try {
                retryer.call(callable);
            } catch (RetryException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
               e.printStackTrace();
            }
        });
        

        【讨论】:

        • 我在文档中看到没有关于 Retryer 的非阻塞属性的保证。
        【解决方案6】:

        这是一种适用于任何 CompletionStage 子类的方法,并且不会返回一个虚拟的 CompletableFuture,它只会等待其他期货更新。

        /**
         * Sends a request that may run as many times as necessary.
         *
         * @param request  a supplier initiates an HTTP request
         * @param executor the Executor used to run the request
         * @return the server response
         */
        public CompletionStage<Response> asyncRequest(Supplier<CompletionStage<Response>> request, Executor executor)
        {
            return retry(request, executor, 0);
        }
        
        /**
         * Sends a request that may run as many times as necessary.
         *
         * @param request  a supplier initiates an HTTP request
         * @param executor the Executor used to run the request
         * @param tries    the number of times the operation has been retried
         * @return the server response
         */
        private CompletionStage<Response> retry(Supplier<CompletionStage<Response>> request, Executor executor, int tries)
        {
            if (tries >= MAX_RETRIES)
                throw new CompletionException(new IOException("Request failed after " + MAX_RETRIES + " tries"));
            return request.get().thenComposeAsync(response ->
            {
                if (response.getStatusInfo().getFamily() != Response.Status.Family.SUCCESSFUL)
                    return retry(request, executor, tries + 1);
                return CompletableFuture.completedFuture(response);
            }, executor);
        }
        

        【讨论】:

          【解决方案7】:

          也许已经晚了,但希望有人会发现这很有用,我最近解决了这个问题,用于在失败时重试 rest API 调用。就我而言,我必须重试 500 HTTP 状态代码,下面是我的休息客户端代码(我们正在使用播放框架中的 WSClient),您可以根据需要将其更改为任何休息客户端。

           int MAX_RETRY = 3;
           CompletableFuture<WSResponse> future = new CompletableFuture<>();
          
           private CompletionStage<WSResponse> getWS(Object request,String url, int retry, CompletableFuture future) throws JsonProcessingException {
           ws.url(url)
                  .post(Json.parse(mapper.writeValueAsString(request)))
                  .whenCompleteAsync((wsResponse, exception) -> {
                      if(wsResponse.getStatus() == 500 && retry < MAX_RETRY) {
                          try {
                              getWS(request, retry+1, future);
                          } catch (IOException e) {
                              throw new Exception(e);
                          }
                      }else {
                          future.complete(wsResponse);
                      }
                  });
          
               return future;
          }
          

          如果状态码为 200 或不是 500,此代码将立即返回,而如果 HTTP 状态为 500,它将重试 3 次。

          【讨论】:

          • 这个确实简单好用,但是需要在出现异常的情况下处理并完成future或者competitionExceptionally,如果你是在使用异常完成异常处理。
          【解决方案8】:

          我们需要根据错误情况重试任务。

          public static <T> CompletableFuture<T> retryOnCondition(Supplier<CompletableFuture<T>> supplier,
                                                       Predicate<Throwable> retryPredicate, int maxAttempts) {
              if (maxAttempts <= 0) {
                  throw new IllegalArgumentException("maxAttempts can't be <= 0");
              }
              return retryOnCondition(supplier, retryPredicate, null, maxAttempts);
          }
          
          private static <T> CompletableFuture<T> retryOnCondition(
              Supplier<CompletableFuture<T>> supplier, Predicate<Throwable> retryPredicate,
              Throwable lastError, int attemptsLeft) {
          
              if (attemptsLeft == 0) {
                  return CompletableFuture.failedFuture(lastError);
              }
          
              return supplier.get()
                  .thenApply(CompletableFuture::completedFuture)
                  .exceptionally(error -> {
                      boolean doRetry = retryPredicate.test(error);
                      int attempts = doRetry ? attemptsLeft - 1 : 0;
                      return retryOnCondition(supplier, retryPredicate, error, attempts);
                  })
                  .thenCompose(Function.identity());
          }
          

          用法:

          public static void main(String[] args) {
              retryOnCondition(() -> myTask(), e -> {
                  //log exception
                  return e instanceof MyException;
              }, 3).join();
          }
          

          【讨论】:

            猜你喜欢
            • 2018-04-12
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 2018-10-20
            • 1970-01-01
            • 2015-11-11
            • 1970-01-01
            • 1970-01-01
            相关资源
            最近更新 更多