【问题标题】:ThreadPoolExecutor.shutdownNow() not throwing InterruptedException in ThreadThreadPoolExecutor.shutdownNow() 不在线程中抛出 InterruptedException
【发布时间】:2021-12-21 10:10:49
【问题描述】:

我正在实现一个传输服务器程序,它从客户端(通过控制台输入)获取消息,然后将其转发到某种邮箱。

为了允许不同客户端同时接收多个消息,我首先创建了一个实现Runnable 接口的类。每个此类实例都将处理与一个客户端的通信:

public class ClientConnection implements Runnable {

    //...

    //...

    @Override
    public void run() {
        try {
            // prepare the input reader and output writer
            BufferedReader reader = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
            PrintWriter writer = new PrintWriter(clientSocket.getOutputStream(), true);

            Message message = new Message();
            String request = "";

            // read client requests
            while ((request = reader.readLine()) != null) {

                System.out.println("Client sent the following request: " + request);
                String response;
                if (request.trim().equals("quit")) {
                    writer.println("ok bye");
                    return;
                }

                response = message.parseRequest(request);
                if (message.isCompleted()) {
                    messagesQueue.put(message);
                    message = new Message();
                }
                writer.println(response);
            }

        } catch (SocketException e) {
            System.out.println("ClientConnection: SocketException while handling socket: " + e.getMessage());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            System.out.println("Client Connection was interrupted!");
            e.printStackTrace();
        } finally {
            if (clientSocket != null && !clientSocket.isClosed()) {
                try {
                    clientSocket.close();
                } catch (IOException ignored) {}
            }

        }

    }
}

我确实有一个父线程负责启动和管理所有 ClientConnection 可运行对象:

@Override
public void run() {

    clientConnectionExecutor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
    while (true) {

        Socket clientSocket;

        try {
            // wait for a Client to connect
            clientSocket = serverSocket.accept();

            ClientConnection clientConnection = new ClientConnection(clientSocket, messagesQueue);
            clientConnectionExecutor.execute(clientConnection);

        } catch (IOException e) {
            // when this exception occurs, it means that we want to shut down everything
            clientConnectionExecutor.shutdownNow();  // force terminate all ClientConnections
            return;
        }
    }
}

现在根据this Stackoverflow Question,我预计一旦调用shutdownNow();,我的ClientConnection.run() 方法中就会抛出一个InterruptedException,在那里,它应该打印Client Connection was interrupted!。但这并没有发生,因此似乎永远无法到达 catch 子句,输入读取循环只是继续。

我在另一个 Stackoverflow 问题中读到,这可能与块中的其他一些代码行有关,似乎正在使用 InterruptedException,但没有任何关于代码行可以做到这一点的特定信息。所以我很感谢任何提示。

编辑:事实证明,只要我通过在客户端上键入“quit”手动退出循环,循环就会退出,然后,Client Connection was interrupted! 将被打印出来。所以不知何故,只要循环正在运行,异常似乎就会被忽略,并且只在之后处理。

【问题讨论】:

    标签: java try-catch threadpoolexecutor java-threads interrupted-exception


    【解决方案1】:

    来自shutdownNow 的 Oracle 文档:

    除了尽力停止处理正在执行的任务之外,没有任何保证。例如,典型的实现将通过 Thread.interrupt() 取消,因此任何未能响应中断的任务都可能永远不会终止。

    如果您查看ThreadPoolExecutor 源代码,您会发现shutdownNow 使用以下代码中断线程:

            void interruptIfStarted() {
                Thread t;
                if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    }
                }
            }
    

    您的ClientConnection 不检查标志Thread.interrupted。由于帖子中的信息,我无法弄清楚哪个方法会抛出InterruptedException。可能,其他一些方法,例如读取器或写入器的readLine,会阻塞线程,因为它们使用套接字的InputStreamOutputStream,并且很明显,如果数据不是立即可用的,套接字的流会阻塞线程。

    例如,我写了这段代码来测试它:

    class Example {
        public static void main(String[] args) {
            Thread thread = new Thread(() -> {
                try(ServerSocket serverSocket = new ServerSocket()) {
                    serverSocket.bind(new InetSocketAddress(8080));
                    Socket socket = serverSocket.accept();
                    int dataByte = socket.getInputStream().read();
                    System.out.println(dataByte);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            thread.start();
            thread.interrupt();
        }
    }
    

    在 OpenJdk-16.0.2 上没有实际中断。

    对于您的问题,我看到了两种可能的解决方案:

    1. 如果您确定Socket 不会阻塞您的线程,请检查while 循环内的Thread.interrupted

    2. 如果您不确定,请在非阻塞模式下使用SocketChannel 而不是Socket 手动检查Thread.interrupted

    对于第二种方式,我将示例转换为:

    class Example {
        public static void main(String[] args) {
            Thread thread = new Thread(() -> {
                try(ServerSocketChannel serverSocket = ServerSocketChannel.open()) {
                    serverSocket.configureBlocking(false);
                    serverSocket.bind(new InetSocketAddress(8080));
    
                    SocketChannel socket = null;
    
                    while (socket == null) {
                        socket = serverSocket.accept();
    
                        if (Thread.interrupted()) {
                            throw new InterruptedException();
                        }
                    }
    
                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                    socket.read(byteBuffer);
                    byte[] bytes = new byte[byteBuffer.limit()];
                    byteBuffer.flip();
                    byteBuffer.get(bytes);
                    System.out.println(new String(bytes, StandardCharsets.UTF_8));
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    System.out.println("Interrupted successfully");
                }
            });
            thread.start();
            thread.interrupt();
        }
    }
    

    有效。

    祝 Java 好运 :)

    【讨论】:

    • 感谢您的精彩解释,我现在确实确定 readline() 正在阻止一切。我不知道Java中实际上存在无法中断的阻塞函数。所以我按照你的方法在while循环条件下检查Thread.interrupted标志,其中一个缺陷是客户端似乎有必要再按一次回车键才能真正退出客户端。所以循环条件评估为假为时已晚。我将进一步研究SocketChannel 课程,看看这是否符合我的需求。
    • @BenjyTec 听起来线程在readLine() 被额外阻塞了一次。所以,正如我在回答中所说,根据我的小研究,Socket 无法阻止这种行为。此外,建议您对SocketSocketChannel 的读写操作要小心,因为这不是一种有效的方法。问题是您实际上需要一个线程来操作连接。 IO 操作可能比业务逻辑操作慢很多倍。我的建议是在单独的线程中读写。
    • 一般是SocketChannel。我强烈建议您使用SocketChannel,因为它可以在阻塞和非阻塞模式下运行。因此,SocketChannelServerSocketChannel 始终可以替换代码中旧的 SocketServerSocket。此外,您可以将ByteBuffer 与它们一起使用,这使您可以轻松地为内部缓冲区实现任何功能。如果您不熟悉ByteBuffer,请查看它。
    【解决方案2】:

    我原以为只要 shutdownNow();正在调用,我的 ClientConnection.run() 中会抛出一个 InterruptedException

    您的messagesQueue 应该是BlockingQueue。所以messagesQueue.put(message) 会让你需要捕获Interrupted 异常。所以只有在put方法中线程被阻塞(queue已满),你调用threadpool#shutdownNow,那么线程才会收到Interrupted异常。在其他情况下,线程不会收到此Interrupted 异常。

    您可以将while ((request = reader.readLine()) != null) 更改为while ((request = reader.readLine()) != null && !Thread.interrupted())

    另一种解决方案是维护所有客户端套接字,并在需要关闭它们时关闭所有客户端套接字,这样客户端线程将直接收到IOException

            List<Socket> clientSockets = new ArrayList<>();
            while (true) {
                try {
                    Socket accept = serverSocket.accept();
                    clientSockets.add(accept);
                    executorService.submit(new ClientConnection(accept));
                }catch (Exception e) {
                    for (Socket socket : clientSockets) {
                        try {
                            socket.close();
                        } catch (Exception exception) {
                            //
                        }
                    }
                    //executorService.shutdownNow();
                }
            }
    

    【讨论】:

    • 谢谢,您的第二种方法看起来很有希望。不过有一个问题,clientSockets 列表会随着时间的推移而膨胀,不是吗?因为它将包含几个小时前已经断开连接的客户端的套接字。
    • 是的,你应该有一个特殊的类(也许叫它ClientManager..)来维护所有客户端套接字,当客户端关闭时删除相应的套接字。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-04-19
    • 2017-04-21
    • 1970-01-01
    • 1970-01-01
    • 2020-09-27
    • 1970-01-01
    相关资源
    最近更新 更多