【问题标题】:Delegating to threads while preserving linear readability在保持线性可读性的同时委托给线程
【发布时间】:2023-12-03 08:05:01
【问题描述】:

我一直在尝试不同的方法来处理结果断开连接的阻塞方法,同时保持可能已被中断的状态。我发现不得不处理发送和接收难以对齐的不同类和方法是令人沮丧的。

在下面的例子中,SomeBlockingMethod() 通常返回void 作为消息发送到其他进程。但相反,我使用接收结果的侦听器将其设为synchronized。通过将其转为线程,我可以wait() 获得超时或无限期的结果。

这很好,因为一旦返回结果,我就可以继续处理在等待线程任务结果时必须暂停的特定状态。

这我的方法有什么问题吗?

虽然这个问题可能看起来很笼统,但我特意寻找关于 Java 中的线程的建议。

示例伪代码:

public class SomeClass implements Command {

@Override
public void onCommand() {
   Object stateObject = new SomeObjectWithState();

   // Do things with stateObject

   Runnable rasync = () -> {
      Object r = SomeBlockingMethod();

      // Blocking method timed out
      if (r == null)
         return;

      Runnable rsync = () -> {
         // Continue operation on r which must be done synchronously

         // Also do things with stateObject
      };

      Scheduler().run(rsync);
   };

   Scheduler().run(rasync);
}

使用 CompletableFuture 更新:

CompletableFuture<Object> f = CompletableFuture.supplyAsync(() -> {
   return SomeBlockingMethod();
});

f.thenRun(() -> { () -> {
   String r = null;

   try {
      r = f.get();
   }
   catch (Exception e) {
      e.printStackTrace();
   }

   // Continue but done asynchronously
});

或者更好:

CompletableFuture.supplyAsync(() -> {
   return SomeBlockingMethod();
}).thenAccept((
   Object r) -> {

   // Continue but done asynchronously
});

严格使用CompletableFuture的问题在于CompletableFuture.thenAccept是从全局线程池运行的,不保证与调用线程同步。

为同步任务添加调度程序可以解决这个问题:

CompletableFuture.supplyAsync(() -> {
   return SomeBlockingMethod();
}).thenAccept((
   Object r) -> {

   Runnable rsync = () -> {
      // Continue operation on r which must be done synchronously
   };

   Scheduler().run(rsync);
});

与完整的调度程序方法相比,使用CompletableFuture 的一个警告是,存在于外部的任何先前状态都必须是最终的或有效的最终状态。

【问题讨论】:

  • CompletableFuture 解决了这个问题。 docs.oracle.com/javase/8/docs/api/java/util/concurrent/…
  • AdamSkywalker,自从查看您的评论后,我对 CompletableFuture 进行了大量研究,它彻底改变了我在 Java 中处理异步任务的方式。请将此作为答案提交,以便我奖励您的帮助。

标签: java multithreading asynchronous completable-future


【解决方案1】:

您应该查看RxJava,它使用流操作并支持线程。

api.getPeople()
  .observeOn(Schedulers.computation())
  .filter(p -> return p.isEmployee();)
  .map(p -> return String.format("%s %s - %s", p.firstName(), p.lastName(), p.payrollNumber());)
  .toList()
  .observerOn(<ui scheudler>)
  .subscirbe(p -> screen.setEmployees(p);)

【讨论】:

    最近更新 更多