【问题标题】:Java - how to implement threads on a process that waits for data [closed]Java - 如何在等待数据的进程上实现线程[关闭]
【发布时间】:2012-11-29 11:36:12
【问题描述】:

我是多线程编程的新手,我希望能够深入了解实现以下想法的最佳方法。

现在我的代码是这样工作的

它是单线程的,所以在处理每条数据并将其写入数据库所花费的时间里,新数据进来并排队,这太慢了。我在一个 4CPU 的服务器上运行,但是当前的设置只使用了 1 个。

我想将中间部分完成的工作与其余 3 个 CPU 分开。我将如何最好地做到这一点?我以为我可以为每条新数据创建一个新线程,但我们在一天的时间里谈论了数十万个新线程。根据我的阅读,与此相关的开销将非常大。内存对我来说是一个问题,所以如果创建所有这些线程会占用太多内存,我就会遇到麻烦。一个新线程是否会使用较不繁忙的 CPU,或者它会使用相同的 CPU,因为它是同一个 JVM?

每条新数据的处理和数据库写入不应超过几秒钟,如果那样的话。

我也在阅读有关线程池的文章,但这个想法让我有点困惑,我找不到一个很好的例子。

我在想这样的事情

请帮助多线程新手提出合理的设计!在此先感谢:-)

【问题讨论】:

  • 这个问题对于 StackOverflow 来说可能有点过于开放了。您可能想在programmers.stackexchange.com 发帖。不过图片不错。
  • 我会花时间阅读Java Concurrency,特别注意Executors 部分。虽然我同意邓肯的观点,但请为一个深思熟虑的问题 +1
  • 我认为你的最后一张图片基本上是准确的。但是,您可能希望将处理过的数据传递给将它们写入数据库的其他线程,以便能够分别调整处理步骤和写入步骤的线程数。
  • 听起来是一个可以通过使用生产者和消费者来解决的问题。此外,您可能希望使用某种与数据库的连接池来正确处理与数据库的连接。永远不要直接使用线程,有时最好在同一个线程中执行某些东西;创建线程的成本相对较高。
  • @Duncan - 谢谢,我以前没见过程序员 stackexchange。会在那里发帖。 Kristoffer 分离出数据库部分可能是有意义的,但它基本上将我引向同一个问题,即如何让线程等待某些操作。 MadProgrammer - 我会研究 Executors 类,也许这就是我需要的

标签: java multithreading threadpool


【解决方案1】:

更重要的一点是,有多少线程在并行工作(因此可能会杀死一台机器)。如果您一个接一个地创建一个 Thread 对象,它可以更有效地完成,但一般来说,这样做的成本(可能)可以忽略不计(正如 Michal 指出的那样)。撇开这些不谈(假设您想了解多线程),您的设计听起来已经足够合理了。现在去看看java.util.concurrent可以为您提供哪些工具来实现它:

  • ExecutorService: 会是最好的选择。创建一个由n 工作线程组成的固定线程池,然后,对于每个传入线程,发布一个Runnable 进行处理并将所有数据存储到数据库中。

    public class DataProcessor {
        final ExecutorService workerThreadPool = Executors.newFixedThreadPool(5);
    
        public void onNewDataFromTheOutsideWorld(Data d) { 
           workerThreadPool.execute(new ProcessingAndStoreToDBRunnable(d));
        }
    
        public void onShutdown() { 
           workerThreadPool.shutdown();
        }
    }
    

    ExecutorService 将确保只有固定数量的工作人员实际并行运行,

  • 自己的排队机制:当涉及到具有不同优先级的作业时,您可能希望实现自己的工作机制。请注意,这要复杂得多,如果可能,您可能应该坚持使用ExecutorService 解决方案。

    基本思想是有一个 BlockingQueue 向其中添加数据并启动您从队列中读取作业的 n 工作线程。诀窍是,如果没有作业,队列将阻塞(从而使线程进入睡眠状态),如果有超过n 个作业,则作业将存储在队列中,直到有处理线程可用。

    public class DataProcessor {
        final BlockingQueue<Data> queue = new BlockingQueue<Data>();
    
        public void onInit() {
           for (int i = 0; i < n; i++) 
               new Thread(new WorkerRunnable(queue)).start();
        }
    
        public void onNewDataFromTheOutsideWorld(Data d) { 
           queue.add(d);
        }
    }
    
    public class WorkerRunnable implements Runnable {
        public void run() { 
           while (true) {
               Data d = queue.take();
               processData(d);
           }
        }
     }
    

    正如我所说,这要实现起来要复杂得多,因为我还没有触及像

    这样的问题
    • 停止工作线程
    • 确保处理异常,然后返回处理
    • 等。

这些只是多线程环境中的基本(但非常强大)工具。如果您需要更高级的工具,请查看 Guava 库,例如使用 ListenableFuture 的精彩概念(如果您需要工作线程的结果,您应该使用它。)

然后您将拥有一个相当基本的设计,您可以从中添加一些更复杂的处理步骤,正如您的 cmets 中已经指出的那样。还有人指出,这会变成一个相当广泛的问题;)

【讨论】:

  • 请避免使用 while (true) 位。这是表现最差的违规者之一。这实际上是一个教科书案例。这里的问题是,如果数据来自外部世界时有延迟,这并不意味着线程将什么都不做。它实际上会消耗操作系统分配的任何 cpu 周期,只检查“while(true)”是否真的是真的……因为它总是如此,它只是浪费周期,因此,如果你这样做,你甚至可以得到性能更差。解决这个问题的经典方法是使用等待和通知,或者在本例中为 notifyAll。
  • Cont'd... 所以基本上你会做类似“if (!queue.isEmpty()) queue.take()”然后“else wait()”这样的事情,如果你想队列中的一个对象是空的,线程只是无限期地进入睡眠状态。更准确地说,直到另一个线程执行 notify 或 notifyAll。在这种情况下,将数据放入队列的对象将执行“queue.put(data); notifyAll();”这可以防止上述情况,称为“忙等待”。所以,请避免这种“while(true)”作为瘟疫!您可以在任何 CS 书籍中确认这一点,您会看到更好的解释。
  • 感谢您的评论,但take()wait() 直到队列中有新项目可用。 while(true)(我同意你的观点,一般避免它!)在这里说明工作线程必须在处理完队列中的一条消息后重新开始在队列上工作。
  • @Acapulco 我不明白你为什么认为while(true) 是一个问题:queue.take(); 是一个阻塞调用......而且这个答案中建议的方法比使用 wait/ 要好得多按照您的建议通知。
  • @Marianna 没错。 ExecutorService 将确保只有n 线程并行运行。所有超出的任务都排队并一个接一个地处理。多少线程是并行运行的好数字取决于 CPU 的数量以及它们是更多的 I/O 绑定还是计算任务。例如。如果一个处理需要一个 CPU 30 秒达到 100%(无 I/O),那么让更多线程运行而不是可用 CPU 是没有意义的。另一方面,线程只依赖于 I/O,你可能会运行更多。
【解决方案2】:

首先,您应该考虑性能在此应用程序中的重要性以及您需要处理的流量类型。如果您不太关心为每个请求添加 0.1 毫秒的延迟(如果您说每个请求需要几秒钟,我认为您不会这样做),那么创建一个新线程将不会是一个明显的成本。请注意,您的线程应该在完成工作后结束生命,因此您不会同时拥有数十万个线程 - 它们将随着时间的推移而启动和结束。如果您每天收到“几十万”个请求,那么这只是每秒几个请求(假设它们被平均分配)。使用这些参数,您的平均活动请求数将大约为几十个(大约每秒 10 次乘以每个请求几秒钟 ~= 随时有几十个请求)。这比您机器上的内核数量多,但应该毫无问题地处理 - 如果这些线程与数据库通信,它们将花费大部分时间等待通信链接。虽然为每个请求设置一个单独的线程通常可能不是最好的设计,但它可能比学习 Futures 和 Executors 更容易实现。

因此,这两种解决方案都有其优点 - 更好的设计和可能更好的资源使用的未来(尽管这可能取决于您安排它们的程度)和每个请求的线程以使某些东西快速工作(并且很容易能够理解正在发生的事情在系统内部)。如果您现在只学习并发,我实际上建议您先以不太优雅的方式进行学习,以便了解系统在幕后需要做什么。然后,当您熟悉那种“手动”调度方法时,您可以进入更高级别的抽象,学习 Futures 等,并重构您的代码。如果您立即开始使用 Futures,那么第二个版本可能会比您能够编写的代码要好得多。

【讨论】:

  • 那么,通过使用多个线程,操作系统会自动将工作委派给最不忙的 CPU,这是一个公平的假设吗?谢谢您的意见。你是正确的,线程将是短暂的,所以我可能一次最多只有 50 个活跃,这似乎并不可怕。我不太关心 100 毫秒的延迟,只要它保持这种状态。现在延迟已经失控,因为在一天中它会慢慢上升,直到达到几分钟的水平。
  • @Marianna 通常操作系统非常擅长调度,所以在您的设置中,我认为它可以在 4 个内核上处理 50 或 100 个线程而没有太多麻烦。这些线程将花费大部分时间等待数据库,而不是执行计算,因此拥有许多线程是非常安全的。如果数据库可以处理那么多并行请求,我实际上会更担心。顺便说一句,我所说的延迟大约是 0.1 毫秒,而不是 100 毫秒。
猜你喜欢
  • 2010-11-23
  • 2023-03-08
  • 2015-08-01
  • 2020-04-01
  • 1970-01-01
  • 2018-04-12
  • 1970-01-01
  • 2013-12-02
  • 1970-01-01
相关资源
最近更新 更多