【Question title】:Thread synchronisation with PipedInputStream and PipedOutputStream
【Posted】:2014-07-21 05:14:50
【Question description】:

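I am having trouble implementing thread synchronisation with PipedInputStream and PipedOutputStream in Java.

There are three threads, T1, T2 and T3, that can all edit a file named toto.txt. The contents of toto.txt look like this: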

T1 : 1 T2 : 1 T3 : 1 T1 : 2 T2 : 2 T3 : 2 T1 : 3 T2 : 3 T3 : 3 ....

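My idea is that a thread may access toto.txt only while it holds a key variable with key = true. After editing the file, thread A writes the key to a PipedOutputStream that is connected to a PipedInputStream. Thread B reads the key from that PipedInputStream and, if key = true, B may edit the file. One starting thread may write to the file straight away; every other thread first waits for the key, then writes to the file, then writes the key to its pipe. With 3 threads there are 3 pipes: T1-T2, T2-T3, T3-T1.

My thread code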

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedWriter;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.logging.Level;
import java.util.logging.Logger;

public class threadFlux implements Runnable {

    public String _threadName;
    public boolean _key;
    public boolean _stratingThread;
    public int _count;
    public int _maxCount;
    public String _fileName;
    public DataInputStream _is;
    public DataOutputStream _os;

    public threadFlux(String threadName, String fileName, boolean starting, int maxCount) {
        this._threadName = threadName; 
        this._maxCount = maxCount;
        this._count = 1;
        this._fileName = fileName;
        this._stratingThread = starting;
        this._key = (starting == true);
    }

    @Override
    public void run() {
        while (this._count <= this._maxCount) {
            if (this._stratingThread == true) {
                try {
                    /* starting thread write to file */
                    System.out.println("starting thread");
                    System.out.println(this._threadName + ": " + this._count);

                    this.writeToFile(this._threadName + ": " + this._count + "\n");
                    this._count++;
                    /* write key to pipe */
                    this.writeKeyToPipe(this._key);
                    System.out.println("key written");
                    /* set key = false */
                    this._key = false;
                    this._stratingThread = false;
                } catch (IOException ex) {
                    Logger.getLogger(threadFlux.class.getName()).log(Level.SEVERE, null, ex);
                }
            } else {
                try {
                    /* read key from pipe */
                    System.out.println(this._threadName + " Clef " + this._key);
                    this._key = this.readKeyFromPipe();
                    System.out.println(this._threadName + " Clef " + this._key);
                    /* write to file */
                    System.out.println(this._threadName + ": " + this._count);
                    this.writeToFile(this._threadName + ": " + this._count + "\n");
                    this._count++;

                    /* write key to pipe for another thread */
                    this.writeKeyToPipe(this._key);
                    this._key = false;
                } catch (IOException ex) {
                    Logger.getLogger(threadFlux.class.getName()).log(Level.SEVERE, null, ex);
                }
            }
        }
        System.out.println(this._threadName + " finish!");
    }

    public void setPipedStream(PipedOutputStream pos, PipedInputStream pis) throws IOException {
        this._os = new DataOutputStream(new BufferedOutputStream(pos));
        this._is = new DataInputStream(new BufferedInputStream(pis));

    }

    private void writeToFile(String string) throws IOException {
        File file = new File(this._fileName);

        //if the file doesn't exist, then create it
        if (!file.exists()) {
            file.createNewFile();
        }

        //true = append to the file
        try (BufferedWriter bufferWritter = new BufferedWriter(new FileWriter(file, true))) {
            bufferWritter.write(string);
            // try-with-resources flushes and closes the writer
        }
    }

    private void writeKeyToPipe(boolean _key) throws IOException {
        this._os.writeBoolean(_key);      
    }

    private boolean readKeyFromPipe() throws IOException {
        return this._is.readBoolean();
    }
}

My main program

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.logging.Level;
import java.util.logging.Logger;

public class Test {
    public static void main(String[] args) {

        try {
            // TODO code application logic here

            threadFlux runnableThread1 = new threadFlux("T1", "toto.txt", true, 3);

            threadFlux runnableThread2 = new threadFlux("T2", "toto.txt", false, 3);

            threadFlux runnableThread3 = new threadFlux("T3", "toto.txt", false, 3);

            PipedOutputStream pos1 = new PipedOutputStream();           
            PipedOutputStream pos2 = new PipedOutputStream();
            PipedOutputStream pos3 = new PipedOutputStream();

            PipedInputStream pis2 = new PipedInputStream(pos1);
            PipedInputStream pis1 = new PipedInputStream(pos3);
            PipedInputStream pis3 = new PipedInputStream(pos2);

            runnableThread1.setPipedStream(pos1, pis1);
            runnableThread2.setPipedStream(pos2, pis2);
            runnableThread3.setPipedStream(pos3, pis3);

            Thread thread1 = new Thread(runnableThread1);
            Thread thread2 = new Thread(runnableThread2);
            Thread thread3 = new Thread(runnableThread3);

            thread1.start();
            thread2.start();
            thread3.start();
        } catch (IOException ex) {
            Logger.getLogger(Test.class.getName()).log(Level.SEVERE, null, ex);
        } finally {        
        }
    }
}

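The problem when I run this code: after the starting thread writes to the file and writes the key to the PipedOutputStream, the program blocks.

Thanks for your help.

【Comments】:

    Tags: java stream synchronization output pipe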


    【Solution 1】:

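    PipedOutputStream has a fixed-size buffer, 4k the last time I looked. When it fills up, writes block until the reading thread reads something. So your reading thread isn't reading.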
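
One possible reason the reading thread never receives anything in the question's code, offered as a likely contributor rather than a confirmed diagnosis: setPipedStream wraps the pipe in a BufferedOutputStream, and writeKeyToPipe never calls flush(), so the single key byte can sit in that wrapper's buffer without ever reaching the pipe. The sketch below uses a hypothetical class name, PipeFlushDemo, and a single thread purely to show how many bytes the read side can see before and after flush().

```java
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

// Hypothetical demo class, not part of the question's code.
public class PipeFlushDemo {

    /** Returns {bytes readable before flush(), bytes readable after flush()}. */
    static int[] availableBeforeAndAfterFlush() throws IOException {
        PipedOutputStream pos = new PipedOutputStream();
        PipedInputStream pis = new PipedInputStream(pos);
        // Same wrapping as setPipedStream in the question
        DataOutputStream out = new DataOutputStream(new BufferedOutputStream(pos));

        out.writeBoolean(true);       // lands in the BufferedOutputStream only
        int before = pis.available(); // nothing has reached the pipe yet

        out.flush();                  // pushes the buffered byte into the pipe
        int after = pis.available();

        out.close();
        pis.close();
        return new int[] {before, after};
    }

    public static void main(String[] args) throws IOException {
        int[] result = availableBeforeAndAfterFlush();
        System.out.println("before flush: " + result[0] + ", after flush: " + result[1]);
    }
}
```

If this is what is happening, calling this._os.flush() right after writeBoolean in writeKeyToPipe would let the next thread's readBoolean return.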

    Don't do this. I/O pipes between threads are basically unnecessary. You don't need to move data around like this. Look for another design.
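
As one example of what another design could look like, here is a minimal sketch that passes the turn around the ring with java.util.concurrent.Semaphore instead of pipes. It is an illustration under assumptions, not the answerer's own proposal: the class name RoundRobinTurns and the method run are hypothetical, and an in-memory list stands in for toto.txt so the example stays self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;

// Hypothetical sketch: each thread waits for its own semaphore, does its
// write, then hands the turn to the next thread in the ring.
public class RoundRobinTurns {

    /** Runs one thread per name for the given number of rounds; returns the lines in write order. */
    static List<String> run(String[] names, int rounds) throws InterruptedException {
        int n = names.length;
        List<String> lines = new ArrayList<>(); // stand-in for toto.txt
        Semaphore[] turn = new Semaphore[n];
        for (int i = 0; i < n; i++) {
            // Only the first thread starts with a permit (the "key")
            turn[i] = new Semaphore(i == 0 ? 1 : 0);
        }

        Thread[] threads = new Thread[n];
        for (int i = 0; i < n; i++) {
            final int me = i;
            threads[i] = new Thread(() -> {
                try {
                    for (int count = 1; count <= rounds; count++) {
                        turn[me].acquire();               // wait for the key
                        lines.add(names[me] + " : " + count);
                        turn[(me + 1) % n].release();     // pass the key on
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        return lines;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(String.join(" ", run(new String[] {"T1", "T2", "T3"}, 3)));
    }
}
```

Only the thread holding a permit touches the shared list, so no extra locking is needed there; in the question's setting the lines.add call would be replaced by the append to toto.txt.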
