【问题标题】:main thread consumer and other threads producer主线程消费者和其他线程生产者
【发布时间】:2018-03-08 04:12:17
【问题描述】:

我的问题是,我有一个包含 1000 条记录的数据集。我想要 3 个线程来处理这样的数据, thread1 从记录 1 到 300,thread2 从 301 到 600,依此类推。一个线程可以一次发出请求并获取 50 条记录,创建一个对象并将其放入队列中。 主线程会同时从队列中读取数据。

下面是代码,我面临的问题是recordRead变量告诉线程应该从哪里开始读取记录的起点。 但是我如何为每个线程设置不同的值,例如线程1应该是0,recordsToRead应该是300,对于thread2,recordRead应该是300,recordRead应该是300+300=600,最后一个线程应该是600,直到结尾。 页面大小=50 pagesize、recordRead和recordToRead都是属于主类和主线程的变量。

    ExecutorService service = Executors.newFixedThreadPool(nThreads);
    while(nThreads > 0) {
        nThreads--;
        service.execute(new Runnable() {

            @Override
            public void run() {
                // TODO Auto-generated method stub

                do {
                    int respCode = 0;
                    int RecordsToRead = div;
                    JSONObject jsObj = new JSONObject();
                    jsObj.put("pagesize", pageSize);
                    jsObj.put("start", recordsRead);
                    jsObj.put("searchinternalid", searchInternalId);

                    try {
                        boolean status = req.invoke(jsObj); 
                        respCode = req.getResponseCode();

                    } catch (Exception e) {         
                        req.reset();
                        e.printStackTrace();
                        return true;
                    }
                    JSONObject jsResp = req.getResponseJson();
                    //here jsResp will be added to ArrayBlockingQueue.

                    req.reset();
                }while(!isError && !isMaxLimit && recordsRead < RecordsToRead);

            }

        });
    }

在这个循环之后将是主线程读取队列的代码。 如何为所有线程设置 recordsRead 和 recordToread。

以及如何让主线程等待,直到至少一个线程在队列中插入一个对象。

【问题讨论】:

  • 您可以创建 Runnable 的子类,并将起始位置(以及其他任何内容)作为类 ctor 参数。

标签: java multithreading synchronization


【解决方案1】:

我在你的定义中看到了两个问题。第一个问题是执行并行块计算,第二个问题是从中创建一个连续的管道。让我们从第一个问题开始。要进行预定义大小的并行计算,fmpv 的最佳选择是使用 fork-join 框架。不仅因为性能(工作窃取非常有效),而且还因为更简单的代码。但是由于对我来说您仅限于 3 个线程,因此直接使用线程似乎也是有效的。我可以通过这种方式简单地实现您想要的:

    final int chunkSize = 300;
    //you can also use total amount of job
    //int totalWork = 1000 and chunk size equals totalWork/threadsNumber
    final int threadsNumber = 3;

    Thread[] threads = new Thread[threadsNumber];

    for (int ii = 0; ii < threadsNumber; ii++) {
        final int i = ii;

        threads[ii] = new Thread(() -> {
           //count your variable according the volume
            // for example you can do so
            int chunkStart = i * chunkSize; 
            int chunkEnd = chunkStart + chunkSize;
            for(int j = chunkStart; j < chunkEnd; j++) {
              //object creation with necessary proprs
              //offer to queue here
            }
        });

        threads[ii].start();
    }

    //your code here
    //take here

    for (int ii = 0; ii < threadsNumber; ii++) {
        try {
         //this part is only as example
         //you do not need it                
         //here if you want you can also w8 for completion of all threads
            threads[ii].join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

现在关于消费的第二个问题。对于这个 puprose,您可以使用例如 ConcurrentLinkedBlockingQueue (http://www.jgroups.org/javadoc/org/jgroups/util/ConcurrentLinkedBlockingQueue.html)。在生产者线程中提供offer,在main中使用take方法。

但老实说,我仍然没有得到你的问题的原因。您想创建连续的管道还是只是一次性计算?

另外我会推荐你​​参加这门课程:https://www.coursera.org/learn/parallel-programming-in-java/home/welcome。 这将帮助您准确解决问题并提供各种解决方案。还有并发和分布式计算课程。

【讨论】:

    猜你喜欢
    • 2015-01-23
    • 1970-01-01
    • 1970-01-01
    • 2018-09-24
    • 2017-02-01
    • 2012-04-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多