【问题标题】:Java Process with concurrent Input/Output Streams具有并发输入/输出流的 Java 进程
【发布时间】:2016-01-29 20:19:28
【问题描述】:

我正在尝试创建一种允许用户输入字符串的控制台/终端,然后将其制成一个进程并打印出结果。就像普通的控制台一样。但是我在管理输入/输出流时遇到了麻烦。我研究了this thread,但遗憾的是,该解决方案不适用于我的问题。

除了“ipconfig”和“cmd.exe”等标准命令外,如果脚本要求输入,我还需要能够运行脚本并使用相同的输入流来传递一些参数。

例如,在运行脚本“python pyScript.py”后,如果它要求,我应该能够将进一步的输入传递给脚本(例如:raw_input),同时还可以打印脚本的输出。您期望从终端获得的基本行为。

到目前为止我得到了什么:

import java.awt.BorderLayout;
import java.awt.Color;
import java.awt.Dimension;
import java.awt.event.KeyEvent;
import java.awt.event.KeyListener;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;

import javax.swing.JFrame;
import javax.swing.JPanel;
import javax.swing.JScrollPane;
import javax.swing.JTextPane;
import javax.swing.text.BadLocationException;
import javax.swing.text.Document;

public class Console extends JFrame{

    JTextPane inPane, outPane;
    InputStream inStream, inErrStream;
    OutputStream outStream;

    public Console(){
        super("Console");
        setPreferredSize(new Dimension(500, 600));
        setLocationByPlatform(true);
        setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);

        // GUI
        outPane = new JTextPane();
        outPane.setEditable(false);
        outPane.setBackground(new Color(20, 20, 20));
        outPane.setForeground(Color.white);
        inPane = new JTextPane();
        inPane.setBackground(new Color(40, 40, 40));
        inPane.setForeground(Color.white);
        inPane.setCaretColor(Color.white);

        JPanel panel = new JPanel(new BorderLayout());
        panel.add(outPane, BorderLayout.CENTER);
        panel.add(inPane, BorderLayout.SOUTH);

        JScrollPane scrollPanel = new JScrollPane(panel);

        getContentPane().add(scrollPanel);

        // LISTENER
        inPane.addKeyListener(new KeyListener(){
            @Override
            public void keyPressed(KeyEvent e){
              if(e.getKeyCode() == KeyEvent.VK_ENTER){
                    e.consume();
                    read(inPane.getText());
                }
            }
            @Override
            public void keyTyped(KeyEvent e) {}

            @Override
            public void keyReleased(KeyEvent e) {}
        });


        pack();
        setVisible(true);
    }

    private void read(String command){
        println(command);

        // Write to Process
        if (outStream != null) {
            System.out.println("Outstream again");
            BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outStream));
            try {
                writer.write(command);
                //writer.flush();
                //writer.close();
            } catch (IOException e1) {
                e1.printStackTrace();
            }
        }

        // Execute Command
        try {
            exec(command);
        } catch (IOException e) {}

        inPane.setText("");
    }

    private void exec(String command) throws IOException{
        Process pro = Runtime.getRuntime().exec(command, null);

        inStream = pro.getInputStream();
        inErrStream = pro.getErrorStream();
        outStream = pro.getOutputStream();

        Thread t1 = new Thread(new Runnable() {
            public void run() {
                try {
                    String line = null;
                    while(true){
                        BufferedReader in = new BufferedReader(new InputStreamReader(inStream));
                        while ((line = in.readLine()) != null) {
                            println(line);
                        }
                        BufferedReader inErr = new BufferedReader(new InputStreamReader(inErrStream));
                        while ((line = inErr.readLine()) != null) {
                            println(line);
                        }
                        Thread.sleep(1000);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        t1.start();
    }

    public void println(String line) {
        Document doc = outPane.getDocument();
        try {
            doc.insertString(doc.getLength(), line + "\n", null);
        } catch (BadLocationException e) {}
    }

    public static void main(String[] args){
        new Console();
    }
}

我不使用提到的ProcessBuilder,因为我喜欢区分错误流和正常流。

29.08.2016 更新

在@ArcticLord 的帮助下,我们已经实现了原始问题中提出的问题。 现在只需消除任何奇怪的行为,如非终止过程。控制台有一个“停止”按钮,只需调用 pro.destroy()。但由于某种原因,这不适用于无限运行的进程,即垃圾邮件输出。

控制台:http://pastebin.com/vyxfPEXC

InputStreamLineBuffer:http://pastebin.com/TzFamwZ1

停止的示例代码:

public class Infinity{
    public static void main(String[] args){ 
        while(true){
            System.out.println(".");
        }
    }
}

确实停止的示例代码:

import java.util.concurrent.TimeUnit;

public class InfinitySlow{
    public static void main(String[] args){ 
        while(true){
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(".");
        }
    }
}

【问题讨论】:

    标签: java process inputstream outputstream


    【解决方案1】:

    您的代码是正确的。您只错过了一些小事。
    让我们从您的 read 方法开始:

    private void read(String command){
        [...]
        // Write to Process
        if (outStream != null) {
            [...]
            try {
                writer.write(command + "\n");  // add newline so your input will get proceed
                writer.flush();  // flush your input to your process
            } catch (IOException e1) {
                e1.printStackTrace();
            }
        }
        // ELSE!! - if no outputstream is available
        // Execute Command
        else {
            try {
                exec(command);
            } catch (IOException e) {
                // Handle the exception here. Mostly this means
                // that the command could not get executed
                // because command was not found.
                println("Command not found: " + command);
            }
        }
        inPane.setText("");
    }
    

    现在让我们修复您的exec 方法。您应该使用单独的线程来读取正常的进程输出和错误输出。此外,我引入了第三个线程,它等待进程结束并关闭 outputStream,因此下一个用户输入并不意味着进程,而是一个新命令。

    private void exec(String command) throws IOException{
        Process pro = Runtime.getRuntime().exec(command, null);
    
        inStream = pro.getInputStream();
        inErrStream = pro.getErrorStream();
        outStream = pro.getOutputStream();
    
        // Thread that reads process output
        Thread outStreamReader = new Thread(new Runnable() {
            public void run() {
                try {
                    String line = null;
                    BufferedReader in = new BufferedReader(new InputStreamReader(inStream));                        
                    while ((line = in.readLine()) != null) {
                        println(line);                       
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("Exit reading process output");
            }
        });
        outStreamReader.start();
    
        // Thread that reads process error output
        Thread errStreamReader = new Thread(new Runnable() {
            public void run() {
                try {
                    String line = null;           
                    BufferedReader inErr = new BufferedReader(new InputStreamReader(inErrStream));
                    while ((line = inErr.readLine()) != null) {
                        println(line);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("Exit reading error stream");
            }
        });
        errStreamReader.start();
    
        // Thread that waits for process to end
        Thread exitWaiter = new Thread(new Runnable() {
            public void run() {
                try {
                    int retValue = pro.waitFor();
                    println("Command exit with return value " + retValue);
                    // close outStream
                    outStream.close();
                    outStream = null;
                } catch (InterruptedException e) {
                    e.printStackTrace(); 
                } catch (IOException e) {
                    e.printStackTrace();
                } 
            }
        });
        exitWaiter.start();
    }
    

    现在应该可以了。
    如果您输入ipconfig,它会打印命令输出,关闭输出流并准备好执行新命令。
    如果您输入cmd,它会打印输出并让您输入更多的cmd 命令,例如dircd 等等,直到您输入exit。然后它关闭输出流并准备好执行新命令。

    您可能会在执行 python 脚本时遇到问题,因为如果没有将它们刷新到系统管道中,使用 Java 读取 Process InputStreams 会出现问题。
    请参阅此示例 python 脚本

    print "Input something!"
    str = raw_input()
    print "Received input is : ", str
    

    您可以使用 Java 程序运行它并输入输入,但在脚本完成之前您不会看到脚本输出。
    我能找到的唯一解决方法是手动刷新脚本中的输出。

    import sys
    print "Input something!"
    sys.stdout.flush()
    str = raw_input()
    print "Received input is : ", str
    sys.stdout.flush()
    

    运行这个脚本会如你所愿。
    您可以在

    处阅读有关此问题的更多信息

    编辑:我刚刚为 Python 脚本的 stdout.flush() 问题找到了另一个非常简单的解决方案。以python -u script.py 启动它们,您无需手动刷新。这应该可以解决您的问题。

    EDIT2:我们在 cmets 中讨论了使用此解决方案的输出和错误流将混淆,因为它们在不同的线程中运行。这里的问题是,当错误流线程出现时,我们无法区分输出写入是否完成。否则,带有锁的经典线程调度可以处理这种情况。但是无论数据是否流动,我们都有一个连续的流,直到处理完成。因此,我们需要一种机制来记录自从从每个流中读取最后一行以来已经过去了多少时间。

    为此,我将介绍一个获取 InputStream 并启动线程以读取传入数据的类。该线程将每一行存储在队列中,并在流结束时停止。此外,它还保存最后一行被读取并添加到队列中的时间。

    public class InputStreamLineBuffer{
        private InputStream inputStream;
        private ConcurrentLinkedQueue<String> lines;
        private long lastTimeModified;
        private Thread inputCatcher;
        private boolean isAlive;
    
        public InputStreamLineBuffer(InputStream is){
            inputStream = is;
            lines = new ConcurrentLinkedQueue<String>();
            lastTimeModified = System.currentTimeMillis();
            isAlive = false;
            inputCatcher = new Thread(new Runnable(){
                @Override
                public void run() {
                    StringBuilder sb = new StringBuilder(100);
                    int b;
                    try{
                        while ((b = inputStream.read()) != -1){  
                            // read one char
                            if((char)b == '\n'){
                                // new Line -> add to queue
                                lines.offer(sb.toString());
                                sb.setLength(0); // reset StringBuilder
                                lastTimeModified = System.currentTimeMillis();
                            }
                            else sb.append((char)b); // append char to stringbuilder
                        }
                    } catch (IOException e){
                        e.printStackTrace();
                    } finally {
                        isAlive = false;
                    }
                }});
        }
        // is the input reader thread alive
        public boolean isAlive(){
            return isAlive;
        }
        // start the input reader thread
        public void start(){
            isAlive = true;
            inputCatcher.start();
        }
        // has Queue some lines
        public boolean hasNext(){
            return lines.size() > 0;
        }
        // get next line from Queue
        public String getNext(){
            return lines.poll();
        }
        // how much time has elapsed since last line was read
        public long timeElapsed(){
            return (System.currentTimeMillis() - lastTimeModified);
        }
    }
    

    通过这个类,我们可以将输出和错误读取线程合二为一。当输入读取缓冲区线程存在并且没有使用数据时,它仍然存在。在每次运行中,它都会检查自上次读取输出后是否已经过去了一段时间,如果是,它会一次打印所有未打印的行。与错误输出相同。然后它会休眠几毫秒,以免浪费 cpu 时间。

    private void exec(String command) throws IOException{
        Process pro = Runtime.getRuntime().exec(command, null);
    
        inStream = pro.getInputStream();
        inErrStream = pro.getErrorStream();
        outStream = pro.getOutputStream();
    
        InputStreamLineBuffer outBuff = new InputStreamLineBuffer(inStream);
        InputStreamLineBuffer errBuff = new InputStreamLineBuffer(inErrStream);
    
        Thread streamReader = new Thread(new Runnable() {       
            public void run() {
                // start the input reader buffer threads
                outBuff.start();
                errBuff.start();
    
                // while an input reader buffer thread is alive
                // or there are unconsumed data left
                while(outBuff.isAlive() || outBuff.hasNext() ||
                    errBuff.isAlive() || errBuff.hasNext()){
    
                    // get the normal output if at least 50 millis have passed
                    if(outBuff.timeElapsed() > 50)
                        while(outBuff.hasNext())
                            println(outBuff.getNext());
                    // get the error output if at least 50 millis have passed
                    if(errBuff.timeElapsed() > 50)
                        while(errBuff.hasNext())
                            println(errBuff.getNext());
                    // sleep a bit bofore next run
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }                 
                }
                System.out.println("Finish reading error and output stream");
            }          
        });
        streamReader.start();
    
        // remove outStreamReader and errStreamReader Thread
        [...]
    }
    

    也许这不是一个完美的解决方案,但它应该处理这里的情况。


    编辑 (31.8.2016)
    我们在 cmets 中讨论过,在实现杀死启动的停止按钮时,代码仍然存在问题 使用Process#destroy() 处理。产生大量输出的过程,例如在无限循环中 通过调用destroy() 立即销毁。但是由于它已经产生了大量必须消耗的输出 通过我们的streamReader,我们无法恢复正常的程序行为。
    所以我们需要在这里做一些小的改动:

    我们将在InputStreamLineBuffer 中引入一个destroy() 方法来停止输出读取并清除队列。
    更改将如下所示:

    public class InputStreamLineBuffer{
        private boolean emergencyBrake = false;
        [...]
        public InputStreamLineBuffer(InputStream is){
            [...]
                    while ((b = inputStream.read()) != -1 && !emergencyBrake){
                        [...]
                    }
        }
        [...]
    
        // exits immediately and clears line buffer
        public void destroy(){
            emergencyBrake = true;
            lines.clear();
        }
    }
    

    还有一些主程序的小改动

    public class ExeConsole extends JFrame{
        [...]
        // The line buffers must be declared outside the method
        InputStreamLineBuffer outBuff, errBuff; 
        public ExeConsole{
            [...]
            btnStop.addActionListener(new ActionListener() {
                public void actionPerformed(ActionEvent e) {
                     if(pro != null){
                          pro.destroy();
                          outBuff.destroy();
                          errBuff.destroy();
                     }
            }});
        }
        [...]
        private void exec(String command) throws IOException{
            [...]
            //InputStreamLineBuffer outBuff = new InputStreamLineBuffer(inStream);
            //InputStreamLineBuffer errBuff = new InputStreamLineBuffer(inErrStream);        
            outBuff = new InputStreamLineBuffer(inStream);
            errBuff = new InputStreamLineBuffer(inErrStream);    
            [...]
        }
    }
    

    现在它应该能够破坏甚至一些输出垃圾邮件进程。

    注意:我发现Process#destroy() 无法破坏子进程。所以如果你在 Windows 上启动 cmd 并从那里启动一个 java 程序,你最终会在 java 程序仍在运行时破坏 cmd 进程。 您将在任务管理器中看到它。这个问题不能用java本身解决。它需要 一些操作系统依赖于外部工具来获取这些进程的 pids 并手动杀死它们。

    【讨论】:

    • 非常感谢!实际上我昨天找到了类似的解决方案。但是错误和输出流存在一个小问题。由于它们位于两个单独的线程中,因此它们经常混淆它们的输出。例如:“cmd”、“dir”、“foo”、“bar”。由于 foo 在无效输入中,cmd 返回 2 行(命令和一个新行)与 outstream 和 2 行(错误描述)与错误流。但结果往往是:out,err, out, err.
    • 对,但是当errorstream开始打印foo不是命令时,您无法确定打印dir命令的输出是否完成。所以我认为你只能使用ProcessBuilderredirectErrorStream 并忽略这里的第二个线程。这样就解决了这个问题。
    • 我正在尝试使用redirectErrorStream,但它似乎并没有真正打印出错误流。只有outStream。而且一旦它打印了一个errorStream,它就结束了这个过程。 (PS:我也喜欢有单独的流,因为我可以用红色标记错误流)。要是有办法把它们按正确的顺序排列就好了……
    • 我已在我的答案中添加了混合问题的解决方案。我希望这能满足您的需求:)
    • 感谢您的出色工作!你肯定已经为自己赢得了奖励!可悲的是,我无法让它最终发挥作用。您能否通过 pastebin 或其他方式将整个控制台代码发送给我? (我尝试将您的解决方案与我的版本合并,这与操作不同。这可能会导致一些问题......)
    【解决方案2】:

    虽然@ArticLord 的解决方案很好很简洁,但最近我遇到了同样的问题,并提出了一个概念上等效的解决方案,但在实现上略有不同。

    概念是相同的,即“批量读取”:当读取器线程获得轮到时,它会消耗它处理的所有流,并且只有在完成时才传递手。
    这保证了 out/err 打印顺序。

    但我没有使用基于计时器的轮次分配,而是使用基于锁的非阻塞读取模拟:

    // main method for testability: replace with private void exec(String command)
    public static void main(String[] args) throws Exception
    {
        // create a lock that will be shared between reader threads
        // the lock is fair to minimize starvation possibilities
        ReentrantLock lock = new ReentrantLock(true);
    
        // exec the command: I use nslookup for testing on windows 
        // because it is interactive and prints to stderr too
        Process p = Runtime.getRuntime().exec("nslookup");
    
        // create a thread to handle output from process (uses a test consumer)
        Thread outThread = createThread(p.getInputStream(), lock, System.out::print);
        outThread.setName("outThread");
        outThread.start();
    
        // create a thread to handle error from process (test consumer, again)
        Thread errThread = createThread(p.getErrorStream(), lock, System.err::print);
        errThread.setName("errThread");
        errThread.start();
    
        // create a thread to handle input to process (read from stdin for testing purpose)
        PrintWriter writer = new PrintWriter(p.getOutputStream());
        Thread inThread = createThread(System.in, null, str ->
        {
            writer.print(str);
            writer.flush();
        });
        inThread.setName("inThread");
        inThread.start();
    
        // create a thread to handle termination gracefully. Not really needed in this simple
        // scenario, but on a real application we don't want to block the UI until process dies
        Thread endThread = new Thread(() ->
        {
            try
            {
                // wait until process is done
                p.waitFor();
                logger.debug("process exit");
    
                // signal threads to exit
                outThread.interrupt();
                errThread.interrupt();
                inThread.interrupt();
    
                // close process streams
                p.getOutputStream().close();
                p.getInputStream().close();
                p.getErrorStream().close();
    
                // wait for threads to exit
                outThread.join();
                errThread.join();
                inThread.join();
    
                logger.debug("exit");
            }
            catch(Exception e)
            {
                throw new RuntimeException(e.getMessage(), e);
            }
        });
        endThread.setName("endThread");
        endThread.start();
    
        // wait for full termination (process and related threads by cascade joins)
        endThread.join();
    
        logger.debug("END");
    }
    
    // convenience method to create a specific reader thread with exclusion by lock behavior
    private static Thread createThread(InputStream input, ReentrantLock lock, Consumer<String> consumer)
    {
        return new Thread(() ->
        {
            // wrap input to be buffered (enables ready()) and to read chars
            // using explicit encoding may be relevant in some case
            BufferedReader reader = new BufferedReader(new InputStreamReader(input));
    
            // create a char buffer for reading
            char[] buffer = new char[8192];
    
            try
            {
                // repeat until EOF or interruption
                while(true)
                {
                    try
                    {
                        // wait for your turn to bulk read
                        if(lock != null && !lock.isHeldByCurrentThread())
                        {
                            lock.lockInterruptibly();
                        }
    
                        // when there's nothing to read, pass the hand (bulk read ended)
                        if(!reader.ready())
                        {
                            if(lock != null)
                            {
                                lock.unlock();
                            }
    
                            // this enables a soft busy-waiting loop, that simultates non-blocking reads
                            Thread.sleep(100);
                            continue;
                        }
    
                        // perform the read, as we are sure it will not block (input is "ready")
                        int len = reader.read(buffer);
                        if(len == -1)
                        {
                            return;
                        }
    
                        // transform to string an let consumer consume it
                        String str = new String(buffer, 0, len);
                        consumer.accept(str);
                    }
                    catch(InterruptedException e)
                    {
                        // catch interruptions either when sleeping and waiting for lock
                        // and restore interrupted flag (not necessary in this case, however it's a best practice)
                        Thread.currentThread().interrupt();
                        return;
                    }
                    catch(IOException e)
                    {
                        throw new RuntimeException(e.getMessage(), e);
                    }
                }
            }
            finally
            {
                // protect the lock against unhandled exceptions
                if(lock != null && lock.isHeldByCurrentThread())
                {
                    lock.unlock();
                }
    
                logger.debug("exit");
            }
        });
    }
    

    请注意,@ArticLord 和我的两种解决方案都不是完全安全的,而且机会(真的很少)与消费者的速度成反比。

    2016 年快乐! ;)

    【讨论】:

    • 如何让线程从单进程构建器获取输入和输出并使用 jtext 组件
    猜你喜欢
    • 2011-04-08
    • 1970-01-01
    • 1970-01-01
    • 2012-03-21
    • 1970-01-01
    • 2012-07-19
    • 2012-11-17
    相关资源
    最近更新 更多