【问题标题】:Java threading queue with thread pool带线程池的 Java 线程队列
【发布时间】:2013-10-10 04:41:52
【问题描述】:

我制作了一个程序,它使用单线程将文件中的数据读取到链表中,我们称之为 LL1。由此我创建了一个线程池,它为每个线程分配一个处理任务,该任务从 LL1 读取数据并将其计算输出到一个新的链表。 由此我需要将每个线程的新链表输出到一个文件中。我试图在顺序块中输出每个链表,这样线程就不会混合数据,所以我使用了如下同步点:

public synchronized void appendContents(List<Vector2> output1) {
    try {
        sFileName = outFilePath + "\\file" +fileCount+ ".cntr";
        File oFile = new File(sFileName);
        if (!oFile.exists()) {
            oFile.createNewFile();
        }
        if (oFile.canWrite()) {
            //BufferedWriter oWriter = new BufferedWriter(new FileWriter(sFileName, true));
             FileWriter wstream = new FileWriter(oFile, true);
             BufferedWriter outWriter = new BufferedWriter(wstream);
             for(int i = 0; i < output1.size(); i++)
             {
                //replace the space marker values with a newline
                if(output1.get(i).y == -200.0){
                outWriter.newLine();
                }else{
                outWriter.write(String.valueOf(output1.get(i).x) + " " + String.valueOf(output1.get(i).y) + " " + String.valueOf(interval));
                outWriter.newLine();    
                }
             }           
             outWriter.close();
        }
    }
    catch (IOException oException) {
        throw new IllegalArgumentException("Error appending/File cannot be written: \n" + sFileName);
    }

我面临的问题是数据没有按我需要的顺序输出,即

list1 value                              list1 value
list1 value         _______________\     list2 value
list1 value         ________________\    list1 value 
list2 value         RATHER THAN ____/    list3 value
list2 value         ---------------/     list2 value
list2 value                              list1 value
list3 value                              list2 value
list3 value                              list1 value
list3 value                              list3 value
list3 value                              list3 value

如果有人可以让我朝着正确的方向迈出一步,我将不胜感激。 谢谢,

杰克

【问题讨论】:

  • 向我们展示你是如何开始你的话题的。还要说明您得到的结果和预期的结果。
  • 也许这个example 会有所帮助
  • 您需要同步访问的资源是什么?似乎很可能是List&lt;Vector2&gt;。您可能应该锁定它,而不是使用 appendContents() 所属对象的隐式锁定。如前所述,在高度并发的环境中,您的 appendContents() 方法可能会成为瓶颈并限制可扩展性。

标签: java multithreading input linked-list output


【解决方案1】:

synchronized 的目的是在共享资源上进行同步,以便一次只有一个Thread 可以访问关键部分。我将假设您正在生成三个Thread 实例,每个实例在它们自己的对象上调用appendContents

synchronized 方法在 this 上隐式同步,但由于所有三个 Threads 都在不同的对象上同步,即。一个不同的this,没有什么能阻止它们。

【讨论】:

    【解决方案2】:

    据我了解,您每次都为每个列表元素运行新任务?

    然后您只需编写可调用任务 -> 将结果保存在 Feature.Put Feature 中的 List vs result(resultFeatureFromList)。最后做一些这样的事情: 我使用来自 Guava Lib 的函数;

    Iterables.transform(resultList<Feature>,new Function(){
       public resultComputition apply(Feature resultFeatureFromList){
                   return resultFeatureFromList.get();
       }
    });
    

    因此,总而言之,您将按正确的顺序运行所有任务。在拉完所有之后,只需等待结果。

    【讨论】:

      【解决方案3】:

      user2870704's answer 开始,您可以按如下方式构建您的应用程序:

      • 让线程读取文件内容;
      • 对于文件中的每一项,向您的线程池提交一个Callable 任务;
      • 将返回的Future 存储在一个列表或列表列表中 - 您可以定义所需的粒度;
      • 打开输出文件,在同一个线程中,遍历结果列表进行写入。

      举个例子:

      void readAndOutput(String inputFilePath, String outputFilePath) {
          List<List<Future<Result>>> results = readAndSpawnTask(inputFilePath);
      
          PrintWriter out = new PrintWriter(new File(outputFilePath));
      
          for (List<Future<Result>> block : results) {
              for (Future<Result> r : block) {
                  out.println(r.get().toString());
              }
          }
      
          out.flush();
          out.close();
      }
      
      
      List<List<Future<Result>>> readAndSpawnTask(String path) {
          List<List<Future<Result>>> results = new ArrayList<>(numOfBlocks);
          BufferedReader in = new BufferedReader(new FileReader(new File(path)));
      
          for (int i = 0; i < numOfBlocks; ++i) {
              results.add(new LinkedList<Future<Result>>());
          }
      
          for (String line = in.readLine(); line != null; line = in.readLine()) {
              int respectiveBlock;
              Callable<Result> task;
              // Process line and convert it into a task of your own.
              // Determine in which block the result goes into.
              Future<Result> r = threadPool.submit(task);
              results.get(respectiveBlock).add(r);
          }
      
          in.close();
      
          return results;
      }
      

      如果您想要和/或需要并发性,想法是在单个线程中访问文件。使用Future 的列表,您可以保证以正确的顺序写入结果,并且您的主线程将阻塞,直到所需的结果准备好。

      当然,你还是要考虑上面代码中可能抛出的异常。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2011-03-25
        • 2023-01-29
        • 1970-01-01
        • 1970-01-01
        • 2013-04-19
        • 2014-10-12
        相关资源
        最近更新 更多