【问题标题】:Producer-Consumer multi threads syncing problems(生产者-消费者多线程同步问题)
【发布时间】:2014-06-18 09:50:36
【问题描述】:

大家好(第一篇)!

首先让我说我对 Java 编程还是比较陌生(C++ 是我的强项)

如果这个问题已经得到解答,请指出我正确的方向,但我的搜索技能还没有产生我正在寻找的东西。我遇到的问题是在我的消费者线程之间同步,而不是在我的生产者/消费者之间同步

我的任务是由一个生产者线程生成素数候选数,然后使用多个消费者线程来检查每个候选数是否为素数:

  • 使用多个线程:1 个生产者线程将“候选数”放入名为 prime_candidates 的数组列表(必须是 ArrayList)中,消费者线程从该数组列表中取出候选数并进行检查

  • 消费者检查是否是素数,然后将素数放入自己的arraylist prime_list

  • 60 秒后 prime_list 被写入文件并继续

我已成功将 prime_candidates 与消费者(检查)线程同步。但是我一辈子都无法弄清楚如何同步消费者线程产生的 ArrayLists。他们都自己制作,并且使用我从生产者/消费者那里使用的同步方法不起作用。

生产者线程

import java.awt.Color;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Date;
import javax.swing.BorderFactory;
import javax.swing.JButton;
import javax.swing.JLabel;
import javax.swing.JPanel;


/**
 * Producer of prime candidates.
 *
 * <p>{@link #run()} walks the odd numbers starting at 3, skips those rejected by
 * {@code TestForPrime.isDividableBy3}, and appends the rest to an internal
 * {@code ArrayList}. Consumer threads pull candidates with
 * {@link #getFromArrayList()}, which blocks while the list is empty.
 *
 * <p>All access to the candidate list goes through methods synchronized on this
 * instance. The {@code alive} and {@code toggle} flags are written by the button
 * listeners and read by the producer thread, so they are {@code volatile}.
 *
 * <p>NOTE(review): the labels are updated directly from the producer thread, as in
 * the original code; consider routing those calls through the event dispatch
 * thread - confirm against the Swing threading policy.
 */
public class GenerateThread implements Runnable{

    /** Sleep between polls while the producer is paused or draining, in milliseconds. */
    private static final long IDLE_POLL_MILLIS = 50L;

    // Main list that stores the numbers which could be considered prime candidates.
    // Guarded by this instance's monitor.
    private final ArrayList<PrimeCandidate> prime_candidates = new ArrayList<>();

    // Cleared by the Terminate button; volatile so the producer thread sees the change.
    private volatile boolean alive = true;
    // Flipped by the Pause/Start button; false means the producer is paused.
    private volatile boolean toggle = true;

    // START: FOR PANEL LAYOUT //
    JPanel genPanel = new JPanel();

    JPanel lastPane = new JPanel();
    JLabel lastOutput = new JLabel();

    JPanel queuePane = new JPanel();
    JLabel queueOutput = new JLabel();

    JLabel headLabel = new JLabel("Generate Thread");
    JLabel lastLabel = new JLabel("Last");
    JLabel queueLabel = new JLabel("Queue");

    JButton pauseButton = new JButton("Pause");
    JButton terminateButton = new JButton("Terminate");
    // END: FOR PANEL LAYOUT //

    /** Creates the producer; only logs a message. */
    GenerateThread() {
        System.out.println("Creating Generator");
    }

    /**
     * Assembles the producer's panel: heading, last-candidate box, queue-size box
     * and the Pause / Terminate buttons with their listeners attached.
     *
     * @return the populated panel
     */
    public JPanel createPanel()
    {
        // Black border around the boxes holding the last candidate and the queue size.
        lastPane.setBorder(BorderFactory.createLineBorder(Color.black));
        queuePane.setBorder(BorderFactory.createLineBorder(Color.black));

        terminateButton.addActionListener(new terminateListener());
        pauseButton.addActionListener(new pauseListener());

        lastPane.add(lastOutput);
        queuePane.add(queueOutput);

        genPanel.add(headLabel);
        genPanel.add(lastLabel);
        genPanel.add(lastPane);
        genPanel.add(queueLabel);
        genPanel.add(queuePane);
        genPanel.add(pauseButton);
        genPanel.add(terminateButton);

        return genPanel;
    }

    /**
     * Produces candidates until terminated.
     *
     * <p>While paused, the thread sleeps briefly between label refreshes instead of
     * spinning. After termination it keeps refreshing the queue label until the
     * consumers have drained the list, then returns. An interrupt ends the loop
     * early and leaves the thread's interrupt flag set.
     */
    @Override
    public void run()
    {
        long i = 3;
        try
        {
            while (alive)
            {
                if (toggle)
                {
                    if (i % 2 == 1)
                    {
                        BigInteger candidate = BigInteger.valueOf(i);
                        if (!TestForPrime.isDividableBy3(candidate))
                        {
                            addToArrayList(candidate);
                            lastOutput.setText(candidate.toString());
                            queueOutput.setText(String.valueOf(queueSize()));
                        }
                    }
                    i++;
                }
                else
                {
                    // Paused: keep the queue label current without burning a core.
                    continueNotifying();
                    Thread.sleep(IDLE_POLL_MILLIS);
                }
            }

            // Terminated: consumers may still be draining what was already queued.
            while (queueSize() > 0)
            {
                continueNotifying();
                Thread.sleep(IDLE_POLL_MILLIS);
            }
            continueNotifying();
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Appends a new candidate, stamped with the current time, and wakes any
     * consumers blocked in {@link #getFromArrayList()}.
     *
     * @param b the candidate value
     */
    public synchronized void addToArrayList(BigInteger b)
    {
        prime_candidates.add(new PrimeCandidate(b, new Date()));
        notifyAll();
    }

    /**
     * Refreshes the queue-size label and wakes waiting consumers. Woken consumers
     * re-check the list and go back to waiting if it is still empty.
     */
    public synchronized void continueNotifying()
    {
        queueOutput.setText(String.valueOf(prime_candidates.size()));
        notifyAll();
    }

    /**
     * Removes and returns a copy of the oldest queued candidate, blocking while the
     * list is empty.
     *
     * <p>The wait is guarded by a loop on the emptiness condition, so a wake-up
     * that finds nothing to take (a refresh notification, another consumer winning
     * the race, or a spurious wake-up) never reaches {@code remove(0)} on an empty
     * list.
     *
     * @return a copy of the candidate that was at the head of the list
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public synchronized PrimeCandidate getFromArrayList() throws InterruptedException
    {
        while (prime_candidates.isEmpty())
        {
            wait();
        }
        return new PrimeCandidate(prime_candidates.remove(0));
    }

    /** Current number of queued candidates, read under the monitor. */
    private synchronized int queueSize()
    {
        return prime_candidates.size();
    }

    // Listener that stops the producer and greys out both buttons.
    class terminateListener implements ActionListener
    {
        @Override
        public void actionPerformed(ActionEvent e)
        {
            terminateButton.setEnabled(false);
            pauseButton.setEnabled(false);
            alive = false;
        }
    }

    // Listener that pauses / resumes the producer and relabels the button.
    class pauseListener implements ActionListener
    {
        @Override
        public void actionPerformed(ActionEvent e)
        {
            // The label shows the action the NEXT click will perform.
            pauseButton.setText(toggle ? "Start" : "Pause");
            toggle = !toggle;
        }
    }
}

消费者线程

import java.awt.Color;
import java.awt.Dimension;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
import java.io.BufferedWriter;  
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Date;
import javax.swing.BorderFactory;
import javax.swing.JButton;
import javax.swing.JLabel;
import javax.swing.JPanel;
import javax.swing.Timer;

/**
 * Consumer of prime candidates.
 *
 * <p>{@link #run()} repeatedly takes a candidate from the {@code GenerateThread},
 * tests it with {@code TestForPrime.isPrime}, and stores confirmed primes in this
 * instance's own {@code prime_list}. An instance constructed with priority
 * {@code 0} also starts a Swing {@code Timer} that periodically flushes its list
 * to {@code prime_numbers.txt}.
 *
 * <p>Each instance keeps its own list; nothing here merges the lists of different
 * consumers. Access to the list is synchronized on this instance. The
 * {@code alive} and {@code toggle} flags are written by the button listeners and
 * read by the worker thread, so they are {@code volatile}.
 */
public class CheckThread  implements Runnable{

    /** Sleep between polls while this consumer is paused, in milliseconds. */
    private static final long PAUSE_POLL_MILLIS = 50L;

    private final GenerateThread generated;
    // Primes found by this consumer. Guarded by this instance's monitor.
    private final ArrayList<Primes> prime_list = new ArrayList<>();
    private final String threadName;
    // Cleared by the Terminate button; volatile so the worker thread sees the change.
    private volatile boolean alive = true;
    // Flipped by the Pause/Start button; false means this consumer is paused.
    private volatile boolean toggle = true;

    // START: FOR PANEL LAYOUT //
    JPanel genPanel = new JPanel();

    JPanel lastPane = new JPanel();
    JLabel lastOutput = new JLabel();

    JPanel queuePane = new JPanel();
    JLabel queueOutput = new JLabel();

    JLabel headLabel = new JLabel("Generate Thread");
    JLabel lastLabel = new JLabel("Last");
    JLabel queueLabel = new JLabel("In Work");

    JButton pauseButton = new JButton("Pause");
    JButton terminateButton = new JButton("Terminate");
    // END: FOR PANEL LAYOUT //

    /**
     * Creates a consumer bound to the given producer.
     *
     * @param gen      producer to take candidates from
     * @param name     display name, shown as the panel heading
     * @param priority {@code 0} makes this instance the one that periodically
     *                 writes its primes to file
     */
    CheckThread(GenerateThread gen, String name, int priority)
    {
        generated = gen;
        threadName = name;
        headLabel.setText(name);
        System.out.println("Creating | " + name);

        if (priority == 0)
        {
            System.out.println(name + " Is writing thread");
            // NOTE(review): fires every 10 s; the stated requirement is 60 s - confirm.
            final Timer t = new Timer(10000, new writeToFileTimer());
            t.start();
        }
    }

    /**
     * Assembles this consumer's panel: heading, last-prime box, count box and the
     * Pause / Terminate buttons with their listeners attached.
     *
     * @return the populated panel
     */
    public JPanel createPanel()
    {
        lastPane.setBorder(BorderFactory.createLineBorder(Color.black));
        lastPane.setMinimumSize(new Dimension(200,100));
        queuePane.setBorder(BorderFactory.createLineBorder(Color.black));

        terminateButton.addActionListener(new terminateListener());
        pauseButton.addActionListener(new pauseListener());

        lastPane.add(lastOutput);
        queuePane.add(queueOutput);

        genPanel.add(headLabel);
        genPanel.add(lastLabel);
        genPanel.add(lastPane);
        genPanel.add(queueLabel);
        genPanel.add(queuePane);
        genPanel.add(pauseButton);
        genPanel.add(terminateButton);

        lastOutput.setIgnoreRepaint(true);
        return genPanel;
    }

    // Timer callback (runs on the Swing event thread) that flushes the list to file.
    class writeToFileTimer implements ActionListener
    {
        @Override
        public void actionPerformed(ActionEvent event)
        {
            try
            {
                writeToFile();
            }
            catch (InterruptedException e)
            {
                // writeToFile no longer blocks, so this is not expected to happen.
                System.out.println("WTF");
            }
        }
    }

    /**
     * Takes candidates and records the primes until terminated.
     *
     * <p>While paused the thread sleeps briefly instead of spinning. An interrupt
     * ends the loop and leaves the thread's interrupt flag set.
     */
    @Override
    public void run()
    {
        while (alive)
        {
            try
            {
                if (!toggle)
                {
                    Thread.sleep(PAUSE_POLL_MILLIS);
                    continue;
                }

                PrimeCandidate recieved = new PrimeCandidate(generated.getFromArrayList());
                Date timeFetched = new Date();
                if (TestForPrime.isPrime(recieved.getCandidate()))
                {
                    lastOutput.setText(recieved.getCandidate().toString());
                    // Shows the count before the new prime is added, as the original did.
                    queueOutput.setText("" + primeCount());
                    addToArrayList(new Primes(recieved.getCandidate(), recieved.getTimeStamp(), timeFetched));
                }
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // Listener that stops this consumer and greys out both buttons.
    class terminateListener implements ActionListener
    {
        @Override
        public void actionPerformed(ActionEvent e)
        {
            terminateButton.setEnabled(false);
            pauseButton.setEnabled(false);
            alive = false;
        }
    }

    // Listener that pauses / resumes this consumer and relabels the button.
    class pauseListener implements ActionListener
    {
        @Override
        public void actionPerformed(ActionEvent e)
        {
            // The label shows the action the NEXT click will perform.
            pauseButton.setText(toggle ? "Start" : "Pause");
            toggle = !toggle;
        }
    }

    /**
     * Adds a confirmed prime to this consumer's list.
     *
     * @param primeFound the prime to store
     */
    public synchronized void addToArrayList(Primes primeFound)
    {
        prime_list.add(primeFound);
    }

    /** Current number of stored primes, read under the monitor. */
    private synchronized int primeCount()
    {
        return prime_list.size();
    }

    /**
     * Appends every stored prime to {@code prime_numbers.txt} and then empties the
     * list.
     *
     * <p>The method does not wait for a notification. It is invoked from the Swing
     * timer, and nothing else ever notifies this monitor, so a {@code wait()} here
     * would block the event thread permanently.
     *
     * <p>The file is opened in append mode because the list is emptied after each
     * flush; truncating would discard the primes written by earlier flushes. The
     * list is cleared only after the writer has been closed successfully, so a
     * failed write keeps the primes for the next attempt (entries written before
     * the failure may then appear twice).
     *
     * @throws InterruptedException never thrown by this implementation; kept in the
     *         signature for compatibility with existing callers
     */
    public synchronized void writeToFile() throws InterruptedException
    {
        File statText = new File("prime_numbers.txt");
        try
        {
            try (Writer writ = new BufferedWriter(
                    new OutputStreamWriter(new FileOutputStream(statText, true))))
            {
                for (Primes prime : prime_list)
                {
                    writ.append(prime.toString());
                }
            }
            prime_list.clear();
            headLabel.setText(threadName);
        }
        catch (IOException e)
        {
            System.err.println(" failed to write to prime_numbers.txt");
        }
    }
}

我知道代码写得不好,但是当我做这个任务时,我一直在碰壁,从头开始重写我的代码,最终不再关心。有谁知道我如何可以轻松地在消费者线程中同步我的所有 prime_list Arraylist。如果您没有注意到我构建了每个数组列表,那么我可以简单地将每个线程/面板添加到主框架。它在获取和检查素数方面完美无缺,但我目前的头痛是将所有线程一起产生的文件写入文件。我可以将每个线程单独写入文件,但它们最终每次都相互写入

【问题讨论】:

  • 您的问题是什么?请用一句话。
  • 欢迎来到社区。我建议在阅读完此内容后编辑您的问题:msmvps.com/blogs/jon_skeet/archive/2010/08/29/…
  • 如果你“终于不再关心”了,为什么还要指望别人关心呢?更重要的是:您应该对问题进行表述,以便可以理解您需要帮助的内容,并且您的代码应该简短并且仅包含与您所询问的问题相关的部分 - 在您的情况下,这里不需要任何 GUI 代码.

标签: java multithreading arraylist synchronization


【解决方案1】:

如果我理解正确,您需要每 60 秒将所有消费者的累积质数写入一个文件。

现在您使用 prime_list 作为实例变量,因此多个线程将拥有自己的 prime_list。您可以将 prime_list 标记为静态,并且访问它们的方法也是静态的。在这些方法中,代码必须在同步块内。

这不是最好的设计。如果我从头开始写,我不会使用这种设计,但它会为你工作。

import java.awt.Color;
import java.awt.Dimension;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
import java.io.BufferedWriter;  
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Date;
import javax.swing.BorderFactory;
import javax.swing.JButton;
import javax.swing.JLabel;
import javax.swing.JPanel;
import javax.swing.Timer;

/**
 * Consumer of prime candidates that shares one prime list across all instances.
 *
 * <p>{@link #run()} repeatedly takes a candidate from the {@code GenerateThread},
 * tests it with {@code TestForPrime.isPrime}, and stores confirmed primes in the
 * static {@code prime_list}. An instance constructed with priority {@code 0} also
 * starts a Swing {@code Timer} that periodically flushes the shared list to
 * {@code prime_numbers.txt}.
 *
 * <p>Every access to the shared list is synchronized on the list object itself,
 * which is {@code final} so the lock can never be swapped out. The {@code alive}
 * and {@code toggle} flags are written by the button listeners and read by the
 * worker thread, so they are {@code volatile}.
 */
public class CheckThread  implements Runnable{

    /** Sleep between polls while this consumer is paused, in milliseconds. */
    private static final long PAUSE_POLL_MILLIS = 50L;

    private final GenerateThread generated;
    // Primes found by ALL consumers. Guarded by synchronized(prime_list).
    private static final ArrayList<Primes> prime_list = new ArrayList<>();
    private final String threadName;
    // Cleared by the Terminate button; volatile so the worker thread sees the change.
    private volatile boolean alive = true;
    // Flipped by the Pause/Start button; false means this consumer is paused.
    private volatile boolean toggle = true;

    // START: FOR PANEL LAYOUT //
    JPanel genPanel = new JPanel();

    JPanel lastPane = new JPanel();
    JLabel lastOutput = new JLabel();

    JPanel queuePane = new JPanel();
    JLabel queueOutput = new JLabel();

    JLabel headLabel = new JLabel("Generate Thread");
    JLabel lastLabel = new JLabel("Last");
    JLabel queueLabel = new JLabel("In Work");

    JButton pauseButton = new JButton("Pause");
    JButton terminateButton = new JButton("Terminate");
    // END: FOR PANEL LAYOUT //

    /**
     * Creates a consumer bound to the given producer.
     *
     * @param gen      producer to take candidates from
     * @param name     display name, shown as the panel heading
     * @param priority {@code 0} makes this instance the one that periodically
     *                 writes the shared prime list to file
     */
    CheckThread(GenerateThread gen, String name, int priority)
    {
        generated = gen;
        threadName = name;
        headLabel.setText(name);
        System.out.println("Creating | " + name);

        if (priority == 0)
        {
            System.out.println(name + " Is writing thread");
            // NOTE(review): fires every 10 s; the stated requirement is 60 s - confirm.
            final Timer t = new Timer(10000, new writeToFileTimer());
            t.start();
        }
    }

    /**
     * Assembles this consumer's panel: heading, last-prime box, count box and the
     * Pause / Terminate buttons with their listeners attached.
     *
     * @return the populated panel
     */
    public JPanel createPanel()
    {
        lastPane.setBorder(BorderFactory.createLineBorder(Color.black));
        lastPane.setMinimumSize(new Dimension(200,100));
        queuePane.setBorder(BorderFactory.createLineBorder(Color.black));

        terminateButton.addActionListener(new terminateListener());
        pauseButton.addActionListener(new pauseListener());

        lastPane.add(lastOutput);
        queuePane.add(queueOutput);

        genPanel.add(headLabel);
        genPanel.add(lastLabel);
        genPanel.add(lastPane);
        genPanel.add(queueLabel);
        genPanel.add(queuePane);
        genPanel.add(pauseButton);
        genPanel.add(terminateButton);

        lastOutput.setIgnoreRepaint(true);
        return genPanel;
    }

    // Timer callback (runs on the Swing event thread) that flushes the shared list.
    class writeToFileTimer implements ActionListener
    {
        @Override
        public void actionPerformed(ActionEvent event)
        {
            // The label update lives here, not in the static flush method, because
            // headLabel and threadName belong to an instance.
            if (flushPrimesToFile())
            {
                headLabel.setText(threadName);
            }
        }
    }

    /**
     * Takes candidates and records the primes until terminated.
     *
     * <p>While paused the thread sleeps briefly instead of spinning. An interrupt
     * ends the loop and leaves the thread's interrupt flag set.
     */
    @Override
    public void run()
    {
        while (alive)
        {
            try
            {
                if (!toggle)
                {
                    Thread.sleep(PAUSE_POLL_MILLIS);
                    continue;
                }

                PrimeCandidate recieved = new PrimeCandidate(generated.getFromArrayList());
                Date timeFetched = new Date();
                if (TestForPrime.isPrime(recieved.getCandidate()))
                {
                    lastOutput.setText(recieved.getCandidate().toString());
                    // Shows the count before the new prime is added, as the original did.
                    queueOutput.setText("" + primeCount());
                    addToArrayList(new Primes(recieved.getCandidate(), recieved.getTimeStamp(), timeFetched));
                }
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // Listener that stops this consumer and greys out both buttons.
    class terminateListener implements ActionListener
    {
        @Override
        public void actionPerformed(ActionEvent e)
        {
            terminateButton.setEnabled(false);
            pauseButton.setEnabled(false);
            alive = false;
        }
    }

    // Listener that pauses / resumes this consumer and relabels the button.
    class pauseListener implements ActionListener
    {
        @Override
        public void actionPerformed(ActionEvent e)
        {
            // The label shows the action the NEXT click will perform.
            pauseButton.setText(toggle ? "Start" : "Pause");
            toggle = !toggle;
        }
    }

    /**
     * Adds a confirmed prime to the list shared by all consumers.
     *
     * @param primeFound the prime to store
     */
    public static void addToArrayList(Primes primeFound)
    {
        synchronized (prime_list)
        {
            prime_list.add(primeFound);
        }
    }

    /** Current number of stored primes, read under the list lock. */
    private static int primeCount()
    {
        synchronized (prime_list)
        {
            return prime_list.size();
        }
    }

    /**
     * Appends every stored prime to {@code prime_numbers.txt} and then empties the
     * shared list.
     *
     * @throws InterruptedException never thrown by this implementation; kept in the
     *         signature for compatibility with existing callers
     */
    public static void writeToFile() throws InterruptedException
    {
        flushPrimesToFile();
    }

    /**
     * Writes the shared list to file under the list lock.
     *
     * <p>The file is opened in append mode because the list is emptied after each
     * flush; truncating would discard the primes written by earlier flushes. The
     * list is cleared only after the writer has been closed successfully, so a
     * failed write keeps the primes for the next attempt (entries written before
     * the failure may then appear twice).
     *
     * @return {@code true} if the primes were written and the list cleared,
     *         {@code false} if an {@code IOException} occurred
     */
    private static boolean flushPrimesToFile()
    {
        synchronized (prime_list)
        {
            File statText = new File("prime_numbers.txt");
            try
            {
                try (Writer writ = new BufferedWriter(
                        new OutputStreamWriter(new FileOutputStream(statText, true))))
                {
                    for (Primes prime : prime_list)
                    {
                        writ.append(prime.toString());
                    }
                }
                prime_list.clear();
                return true;
            }
            catch (IOException e)
            {
                System.err.println(" failed to write to prime_numbers.txt");
                return false;
            }
        }
    }
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2013-04-12
    • 2017-03-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-02-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多