【问题标题】:Telling a ThreadPoolExecutor when it should go ahead or not告诉 ThreadPoolExecutor 何时应该继续
【发布时间】:2012-08-13 04:14:43
【问题描述】:

我必须通过某个端口将一组文件发送到几台计算机。事实是,每次调用发送文件的方法时,都会计算目标数据(地址和端口)。因此,使用一个循环,为每个方法调用创建一个线程,并在方法调用周围加上一个try-catch语句来处理BindException的程序试图使用一个已经在使用的端口的情况(不同的目标地址可能通过同一端口接收消息)告诉线程等待几秒钟然后重新启动重试,并继续尝试直到不抛出异常(成功执行传送)。 我不知道为什么(尽管当我第一次看到它时我就猜到了),Netbeans 警告我在循环内休眠一个 Thread 对象不是最佳选择。然后我用谷歌搜索了更多信息,找到了this link to another * post, which looked so interesting(我从未听说过the ThreadPoolExecutor class)。我一直在阅读该链接和 API 以尝试改进我的程序,但我还不确定我应该如何在我的程序中应用它。请问有人可以帮忙吗?

编辑:重要代码:

        for (Iterator<String> it = ConnectionsPanel.list.getSelectedValuesList().iterator(); it.hasNext();) {
        final String x = it.next();
        new Thread() {

            @Override
            public void run() {
                ConnectionsPanel.singleAddVideos(x);
            }
        }.start();
    }

    private static void singleAddVideos(String connName) {
    String newVideosInfo = "";

    for (Iterator<Video> it = ConnectionsPanel.videosToSend.iterator(); it.hasNext();) {
        newVideosInfo = newVideosInfo.concat(it.next().toString());
    }

    try {
        MassiveDesktopClient.sendMessage("hi", connName);
        if (MassiveDesktopClient.receiveMessage(connName).matches("hello")) {
            MassiveDesktopClient.sendMessage(newVideosInfo, connName);
        }
    } catch (BindException ex) {
        MassiveDesktopClient.println("Attempted to use a port which is already being used. Waiting and retrying...", new Exception().getStackTrace()[0].getLineNumber());
        try {
            Thread.sleep(MassiveDesktopClient.PORT_BUSY_DELAY_SECONDS * 1000);
        } catch (InterruptedException ex1) {
            JOptionPane.showMessageDialog(null, ex1.toString(), "Error", JOptionPane.ERROR_MESSAGE);
        }
        ConnectionsPanel.singleAddVideos(connName);
        return;
    }

    for (Iterator<Video> it = ConnectionsPanel.videosToSend.iterator(); it.hasNext();) {
        try {
            MassiveDesktopClient.sendFile(it.next().getAttribute("name"), connName);
        } catch (BindException ex) {
            MassiveDesktopClient.println("Attempted to use a port which is already being used. Waiting and retrying...", new Exception().getStackTrace()[0].getLineNumber());
            try {
                Thread.sleep(MassiveDesktopClient.PORT_BUSY_DELAY_SECONDS * 1000);
            } catch (InterruptedException ex1) {
                JOptionPane.showMessageDialog(null, ex1.toString(), "Error", JOptionPane.ERROR_MESSAGE);
            }
            ConnectionsPanel.singleAddVideos(connName);
            return;
        }
    }
}

【问题讨论】:

  • 与其谈论它,你应该发布代码本身。
  • 如果您将用户名更改为真实的用户名,我会为您 +1,那么您将拥有 21 个代表,这应该足以访问 * 聊天(在评论链接旁边的顶部) .因为你问的不是问题。
  • 我认为更改名称的可能性需要最低声望,对不起老兄。 @MarkoTopolnik,添加了代码。感谢您的关注(尽管我更喜欢解释而不是代码)。
  • 你的问题还是很不清楚。一堵很难理解的文字墙和一段几乎没有说明的代码。仍然“不是一个真正的问题”。
  • 基本上,我的问题是如何告诉 tpe 不是在给定时间内运行任务,而是直到它们正确完成(这意味着它们不会抛出 BindException)。因为告诉构造函数每个线程的时间限制,即使设置为 Integer.MAX_VALUE,也会导致问题,因为几乎总是需要通过相同的端口将内容发送到两个或多个不同的目的地,如果要发送的内容包含许多文件,每次运输可能需要超过上述时间,因此将跳过通过这些端口的其他运输。

标签: java multithreading port threadpool threadpoolexecutor


【解决方案1】:

您的问题不是很清楚 - 我知道您想重新运行您的任务,直到它成功(没有 BindException)。为此,您可以:

  • 尝试在不捕获异常的情况下运行您的代码
  • 从未来捕获异常
  • 如果任务失败,请稍后重新安排任务

简化的代码如下 - 添加错误消息并根据需要进行优化:

public static void main(String[] args) throws Exception {
    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(corePoolSize);
    final String x = "video";
    Callable<Void> yourTask = new Callable<Void>() {
        @Override
        public Void call() throws BindException {
            ConnectionsPanel.singleAddVideos(x);
            return null;
        }
    };
    Future f = scheduler.submit(yourTask);
    boolean added = false; //it will retry until success
                           //you might use an int instead to retry
                           //n times only and avoid the risk of infinite loop
    while (!added) {
        try {
            f.get();
            added = true; //added set to true if no exception caught
        } catch (ExecutionException e) {
            if (e.getCause() instanceof BindException) {
                scheduler.schedule(yourTask, 3, TimeUnit.SECONDS); //reschedule in 3 seconds
            } else {
                //another exception was thrown => handle it
            }
        }
    }
}

public static class ConnectionsPanel {

    private static void singleAddVideos(String connName) throws BindException {
        String newVideosInfo = "";

        for (Iterator<Video> it = ConnectionsPanel.videosToSend.iterator(); it.hasNext();) {
            newVideosInfo = newVideosInfo.concat(it.next().toString());
        }

        MassiveDesktopClient.sendMessage("hi", connName);
        if (MassiveDesktopClient.receiveMessage(connName).matches("hello")) {
            MassiveDesktopClient.sendMessage(newVideosInfo, connName);
        }

        for (Iterator<Video> it = ConnectionsPanel.videosToSend.iterator(); it.hasNext();) {
            MassiveDesktopClient.sendFile(it.next().getAttribute("name"), connName);
        }
    }
}

【讨论】:

  • 感谢您的帮助,但这并没有改善我的工作方式。在这种情况下,如果抛出 BindException,程序所做的就是等待并重复(与我的程序相同)。我希望通过这个新课程实现的目标(对不起,如果不是 100% 清楚,我的错)类似于:“我是队列中的一个线程,必须使用端口 33333,但是当我尝试使用它,我得到了一个 BindException,因为另一个人正在使用它,所以让我们等到我收到警告说端口是空闲的,然后我可能会重试(一次又一次,直到我开始绑定端口)。”
  • @Stoyicker 所以不同之处在于,不是每 x 秒重试一次直到它工作,你想使用某种监听器/回调,让你知道端口何时准备好使用并运行发生这种情况?
  • 是的,因为在这种情况下,我将能够最大限度地减少任务必须在队列中的时间,但同时确保没有任务在没有成功完成的情况下被丢弃.
  • 你可以使用 CompletionService - 检查this similar question
  • 很高兴了解该课程,谢谢。链接的问题和this link 让我解决了我的问题。