【问题标题】:Graceful shutdown of multithreaded java server using executor service使用执行器服务优雅关闭多线程 java 服务器
【发布时间】:2022-01-24 20:50:37
【问题描述】:

我重构了一个单线程服务器,使其能够同时处理多线程并接受多个客户端。为此,我为每个新客户端生成一个新的ClientHandler 线程并将其提交给ExecutorService。我想通过在System.In 中输入新行来启动服务器关闭。

但是,我无法从内部关闭服务器(使用 Oracle 的 ExecutorService 文档中建议的关闭方法) - 有人可以向我解释为什么吗? 我的ServerRunnable,我把它和我的单个客户端线程放在同一个ThreadPool 中——这可能是问题吗?

PS:这是一个大学项目。我故意省略了实现的接口和请求处理方法的名称,并重命名了类,以防止这成为将来每个懒惰学生的首选解决方案。

服务器

public class Server extends Runnable {

  private final List<ClientHandler> activeHandlers = new ArrayList<>();
  private int port;
  private volatile boolean terminated = false;
  private ExecutorService service;


  @Override
  public void start(int port) throws ServerException {
      this.port = port;
      service = Executors.newCachedThreadPool();
      service.submit(this);
  }

  @Override
  public void shutdown() throws ServerException {
      System.out.println("Shutdown initiated.");
      this.terminated = true;
      PoolUtil.safeShutdown(service);
  }

  @Override
  public void run() {
      try (ServerSocket serverSocket = new ServerSocket(port)) {
          while (!terminated) {
              try {
                  Socket client = serverSocket.accept();
                  ClientHandler clientSocket = connect(client);
                  service.submit(clientSocket);
              } catch (IOException e) {
                  System.err.println("ERROR: Connection to client failed.");
              }
          }
      } catch (IOException e) {
          System.err.println("ERROR: Could not create a socket on port " + port);
      } finally {
          PoolUtil.safeShutdown(service);
      }
  }

  @Override
  public ClientHandler connect(Socket client) {
      ClientHandler clientHandler = new ClientHandler(client, this);
      activeHandlers.add(clientHandler);
      System.out.println("Registered new ClientHandler for " + client.getInetAddress().toString());
      return clientHandler;
  }

  @Override
  public void disconnect(ClientHandler clientHandler) {
      activeHandlers.remove(clientHandler);
      System.out.println("Client successfully disconnected.");
  }
}

客户端处理程序

ublic class ClientHandler extends Runnable {
  private final Socket client;
  private final DirectoryServer server;
  private boolean terminated;
  private final Result result = new Result();

  public ClientHandler(Socket client, DirectoryServer server) {
      this.client = client;
      this.server = server;
      terminated = false;
  }

  @Override
  public void run() {
      try (client;
           ObjectOutputStream oos = new ObjectOutputStream(client.getOutputStream());
           ObjectInputStream ois = new ObjectInputStream(client.getInputStream())) {
          while (!terminated) {
              Object message = ois.readObject();
              if (message instanceof SomeRequest) {
                 // dostuff...
              } else if (message instanceof TerminateConnection) {
                  TerminateConnection termination = (TerminateConnection) message;
                  process(termination);
              } else {
                  System.err.println(
                          "ERROR: the received object of class "
                                  + message.getClass().toString()
                                  + "can not be processed."
                  );
              }
          }
      } catch (IOException e) {
          // FIXME: Error handling
          System.err.println("ERROR concerning client " + client.getInetAddress() + " -> " + e.getMessage());
      } catch (ClassNotFoundException e) {
          // FIXME: Error handling
          System.err.println("ERROR: the class of the received object unknown to server --> " + e.getMessage());
      }

  }


  @Override
  public void process(TerminateConnection terminateConnection) {
      this.terminated = true;
      server.disconnect(this);
  }
}

ServerMain

public class ServerMain {

    public static void main(String[] args) throws ServerException, IOException {
        Server server = new Server();
        server.start(1337);
        System.out.println("Server started. Press enter to terminate.");

        System.in.read();

        server.shutdown();
        System.out.println("Server is shut down...");
    }
}

PoolUtil.shutdown()

public static void safeShutdown(ExecutorService threadPool){
        threadPool.shutdown();
        try {
            // Waits a minute for all tasks to terminate
            if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
                // Cancel all tasks that are still running after a minute
                threadPool.shutdownNow();
                // Waits another minute for all tasks to be cancelled
                if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
                    System.err.println("Service did not terminate!");
                }
            }
        } catch (InterruptedException e) {
            threadPool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

【问题讨论】:

  • shutdownNow()后面一定要awaitTermination()吗?我见过在shutdownNow() 之后没有awaitTermination() 而在shutdown() 之后没有shutdown() 的例子。
  • @AliasCartellano 这就是Oracle proposes to shutdown ExecutorServices 的方式。我想额外的awaitTermination() 是一种额外的线程安全层——尽管我也不明白它的必要性。

标签: java multithreading threadpool executorservice shutdown


【解决方案1】:

ExecutorService.shutdown() 方法的 JavaDoc 声明它意味着“之前提交的任务已执行,但不会接受新任务”。但是,在所有任务完成之前不会终止。您的任务的 Runnables 执行阻塞操作, serverSocket.accept() 因此,您不应期望 awaitTermination 方法在关闭后返回,直到关闭后有足够的请求进入以耗尽所有阻塞的任务。您可以尝试使用 shutdownNow() 而不是 shutdown() 以便它尝试立即取消/中断所有正在运行的任务,这有​​望解除它们的阻塞。

【讨论】:

  • 感谢您的提示。我将类更改为将 serverSocket 作为实例变量,这样我就可以从shutdown() 中调用它的close() 方法。这会从接受套接字触发IOException,我通过终止线程来捕获它。
【解决方案2】:

感谢@njr 我意识到serverSocket.accept() 是我问题的根源。

该方法正在阻塞并等待传入​​连接。为了能够终止它,我创建了一个实例变量 serverSocket,我可以通过在我的 shutdown() 方法中调用 serverSocket.close() 来关闭它。

这将导致serverSocket.accept() 抛出IOException - 所以我抓住它并调用Thread.currentThread().interrupt() 来关闭正在运行的线程。

下面是相关代码:

public class Server extends Runnable {

    private final List<ClientHandler> activeHandlers = new ArrayList<>();
    private ServerSocket newConnections;
    private volatile boolean terminated = false;
    private ExecutorService service;


    @Override
    public void start(int port) throws ServerException {
        try {
            this.newConnections = new ServerSocket(port);
            service = Executors.newCachedThreadPool();
            service.submit(this);
        } catch (IOException e) {
            throw new ServerException("Server can not be created at port " + port);
        }
    }

    @Override
    public void shutdown() throws ServerException {
        try {
            this.terminated = true;
            newConnections.close();
        } catch (IOException e) {
            throw new ServerException("Shut down failed - server socket can not be closed");
        } finally {
            PoolUtil.safeShutdown(service);
        }
    }

    @Override
    public void run() {
        try {
            while (!terminated) {
                try {
                    Socket client = newConnections.accept();
                    ClientHandler clientSocket = connect(client);
                    service.submit(clientSocket);
                } catch (IOException e) {
                    System.out.println("ServerSocket terminated");
                    Thread.currentThread().interrupt();
                }
            }
        } finally {
            PoolUtil.safeShutdown(service);
        }
    }

    // left out irrelevant methods

}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2013-02-27
    • 2011-07-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-05-18
    • 1970-01-01
    相关资源
    最近更新 更多