【问题标题】:Threadpool with persistent worker instances具有持久工作实例的线程池
【发布时间】:2013-10-26 04:27:41
【问题描述】:

我正在尝试将线程池中的任务排入队列,以便在工作人员空闲时立即执行,我发现了各种示例,但在所有情况下,这些示例都已设置为为每个工作人员使用一个新的 Worker 实例工作,我想要持久的工人。

我正在尝试制作一个 ftp 备份工具,我可以使用它,但由于单一连接的限制,它速度很慢。我理想中想要做的是有一个单一的连接来扫描目录并建立一个文件列表,然后四个工作人员来下载所述文件。

这是我的 FTP 工作者的示例:

public class Worker implements Runnable {
  protected FTPClient _ftp;

  // Connection details
  protected String _host = "";
  protected String _user = "";
  protected String _pass = "";

  // worker status
  protected boolean _working = false;

  public Worker(String host, String user, String pass) {
    this._host = host;
    this._user = user;
    this._pass = pass;
  }

   // Check if the worker is in use
  public boolean inUse() {
    return this._working;
  }

  @Override
  public void run() {
    this._ftp = new FTPClient();
    this._connect();
  }

  // Download a file from the ftp server
  public boolean download(String base, String path, String file) {
    this._working   = true;
    boolean outcome = true;

    //create directory if not exists
    File pathDir = new File(base + path);
    if (!pathDir.exists()) {
      pathDir.mkdirs();
    }

    //download file
    try {
      OutputStream output = new FileOutputStream(base + path + file);
      this._ftp.retrieveFile(file, output);
      output.close();
    } catch (Exception e) {
      outcome = false;
    } finally {
      this._working = false;
      return outcome;
    }
  }

  // Connect to the server
  protected boolean _connect() {
    try {
      this._ftp.connect(this._host);
      this._ftp.login(this._user, this._pass);
    } catch (Exception e) {
      return false;
    }
    return this._ftp.isConnected();
  }

  // Disconnect from the server
  protected void _disconnect() {
    try {
      this._ftp.disconnect();
    } catch (Exception e) { /* do nothing */ }
  }
}

我希望能够在工作人员可用时为队列中的每个任务调用Worker.download(...),而不必为每次下载创建与 ftp 服务器的新连接。

任何帮助将不胜感激,因为我以前从未使用过线程,而我现在正在兜圈子。

【问题讨论】:

  • 为什么没有连接池?这样,工作人员就不会链接到连接,他们从池中检查连接并使用它们返回它们。这是使用稀缺外部资源进行编程的常用方法,因为工作人员在整个期间不需要连接,您可以拥有比连接更多的工作人员......
  • 也许你想使用Java's ExecutorService
  • @BoristheSpider 您应该根据您的评论做出回答。我认为OP必须结合ThreadPool和ConnectionPool的概念。
  • @Matt 请记住这是 Java。请使用 java 命名约定 - 无需以下划线开头。

标签: java multithreading ftp threadpool threadpoolexecutor


【解决方案1】:

示例已设置为对每个作业使用新的 Worker 实例,我想要持久的工人。

这是一个常见问题,有几种不同的解决方案。您想要的是每个线程的一些上下文,而不是每个RunnableCallable 将提交给ExecutorService

一种选择是拥有一个ThreadLocal,它将创建您的ftp 实例。这不是最优的,因为当线程终止时没有简单的方法来关闭 ftp 连接。然后,您可以通过限制线程池中运行的线程数来限制连接数。

我认为更好的解决方案是仅使用 ExecutorService 来分叉您的工作线程。对于每个工人,向他们注入一个BlockingQueue,他们都用它来出列并执行他们需要做的任务。这与ExecutorService 内部使用的队列是分开的。然后,您会将任务添加到 您的 队列,而不是添加到 ExecutorService 本身。

private static final BlockingQueue<FtpTask> taskQueue
        = new ArrayBlockingQueue<FtpTask>();

所以你的任务对象会是这样的:

public static class FtpTask {
     String base;
     String path;
     String file;
}

然后,Worker 类中的 run() 方法将执行以下操作:

public void run() {
    // make our permanent ftp instance
    this._ftp = new FTPClient();
    // connect it for the life of this thread
    this._connect();
    try {
        // loop getting tasks until we are interrupted
        // could also use volatile boolean !shutdown
        while (!Thread.currentThread().isInterrupted()) {
            FtpTask task = taskQueue.take();
            // if you are using a poison pill
            if (task == SHUTDOWN_TASK) {
                break;
            }
            // do the download here
            download(task.base, task.path, task.file);
        }
    } finally {
        this._disconnect();
    }
}

同样,您可以通过限制线程池中运行的线程数来限制连接数。

我最理想的做法是使用一个连接来扫描目录并建立一个文件列表,然后由四个工作人员下载所述文件。

我将有一个Executors.newFixedThreadPool(5); 并添加一个执行扫描/构建的线程和 4 个正在执行下载的工作线程。扫描线程将被放入BlockingQueue,而工作线程正在从同一个队列中获取。

【讨论】:

  • 我在两个答案中都缺少的是 OP 对连接数量有限的要求,Boris the Spider 在他的评论中提到了这一点。
  • 有限的连接数来自他 fork 的Worker 线程数。我添加了一个部分来解决@Fildor。
  • 我不得不做一些背景阅读来理解它,但我或多或少地将你的方法侵入了我的代码,它就像一个梦一样工作!现在制作一个不会让眼睛流泪的干净版本!谢谢
【解决方案2】:

我建议根据要求选择具有核心大小和 maxpoolsize 的 ThreadPoolexecutor。在这种情况下,还要使用链接阻塞队列,它会以先进先出的方式在其中执行您的任务。

一旦线程(worker) 空闲,就会从队列中挑选任务并执行。

查看 ThreadPoolExecutor 的详细信息。如果您在执行 ThreadPoolexecutor 时遇到任何问题,请告诉我。

【讨论】:

  • 同意,ThreadPoolExecutor 是正确的方法。本教程是一个很好的起点。 vogella.com/articles/JavaConcurrency/article.html
  • 如果您觉得以上帖子有用,请投票/选择作为答案
  • 问题是提交给TPE的每个任务都会建立一个新的连接,对吧?比如 OP 怎么能用一个连接进行 100 次下载?
猜你喜欢
  • 1970-01-01
  • 2010-11-05
  • 2018-01-20
  • 2021-08-13
  • 1970-01-01
  • 2020-09-10
  • 2018-05-16
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多