【问题标题】:Start and stop Process Thread from Callable从 Callable 启动和停止进程线程
【发布时间】:2014-10-22 10:13:25
【问题描述】:

我有一个启动线程的可调用对象(该线程运行一个 ping 进程)我想允许用户取消任务:

public class PingCallable implements Callable<PingResult> {

private ProcThread processThread;

public PingCallable(String ip) {
    this.processThread = new ProcThread(ip);
}

@Override
public PingResult call() throws Exception {
    log.trace("Checking if the ip " + ip + " is alive");
    try {
        processThread.start();
        try {
            processThread.join();
        } catch (InterruptedException e) {
            log.error("The callable thread was interrupted for " + processThread.getName());
            processThread.interrupt();
            // Good practice to reset the interrupt flag.
            Thread.currentThread().interrupt();
        }
    } catch (Throwable e) {
        System.out.println("Throwable ");
    }
    return new PingResult(ip, processThread.isPingAlive());
  }
}

ProcThread,看起来像:

@Override
public void run() {
    try {
        process = Runtime.getRuntime().exec("the long ping", null, workDirFile);
        /* Get process input and error stream, not here to keep it short*/ 

        // waitFor is InterruptedException sensitive
        exitVal = process.waitFor();
    } catch (InterruptedException ex) {
        log.error("interrupted " + getName(), ex);
        process.destroy();
        /* Stop the intput and error stream handlers, not here */ 
        // Reset the status, good practice
        Thread.currentThread().interrupt();
    } catch (IOException ex) {
        log.error("Exception while execution", ex);
    }
}

还有测试:

    @Test
    public void test() throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(15);
        List<Future<PingResult>> futures = new ArrayList<>();

        for (int i= 0; i < 100; i++) {
            PingCallable pingTask = new PingCallable("10.1.1.142");
            futures.add(executorService.submit(pingTask));
        }

        Thread.sleep(10000);
        executorService.shutdownNow();
//        for (Future<PingResult> future : futures) {
//            future.cancel(true);
//        }
    }

我使用 ProcessExplorer 监控 ping 进程,我看到 15,然后执行 shutdownNow,或者 future.cancel(true),只有 4-5 最多 8 个进程被中断,其余的保持活动状态,我几乎从来没有看到 15消息说“可调用线程被中断..”,并且测试直到进程结束才完成。 为什么

【问题讨论】:

  • 这可能与 JUnit 无法与线程一起正常工作有关。或者可能没有执行的线程完成了他们的工作
  • 有任何理由拥有 ProcThread 吗?是否可以将其删除并将 run() 方法中的代码放入您的 PingCallable 中?这将简化问题。您已经有一个线程池,因此您不需要生成更多线程。
  • @pauli 我使用 ProcThread 的唯一原因是我们的许多应用程序都在使用它。我也试过了,没有产生另一个线程,但它仍然不起作用。

标签: java concurrency executorservice java.util.concurrent callable


【解决方案1】:

我可能没有完整的答案,但有两点需要注意:

  • shutdownNow 表示关闭,要查看线程是否实际停止,请使用 awaitTermination
  • process.destroy() 也需要时间来执行,因此可调用对象应该在中断进程线程后等待它完成。

我稍微修改了代码,发现future.cancel(true)实际上会阻止ProcThread的catch InterruptedException-block中的任何东西的执行,除非你使用executor.shutdown()而不是executor.shutdownNow()。打印“Executor terminate: true”时单元测试完成(使用junit 4.11)。 看起来使用 future.cancel(true)executor.shutdownNow() 会双重中断一个线程,这可能会导致被中断的块被跳过。

在我用于测试的代码下方。取消注释 for (Future&lt;PingResult&gt; f : futures) f.cancel(true);shutdown(Now) 以查看输出的差异。

public class TestRunInterrupt {


static long sleepTime = 1000L;
static long killTime = 2000L;

@Test
public void testInterrupts() throws Exception {

    ExecutorService executorService = Executors.newFixedThreadPool(3);
    List<Future<PingResult>> futures = new ArrayList<Future<PingResult>>();
    for (int i= 0; i < 100; i++) {
        PingCallable pingTask = new PingCallable("10.1.1.142");
        futures.add(executorService.submit(pingTask));
    }
    Thread.sleep(sleepTime + sleepTime / 2);
    // for (Future<PingResult> f : futures) f.cancel(true);
    // executorService.shutdown();
    executorService.shutdownNow();
    int i = 0;
    while (!executorService.isTerminated()) {
        System.out.println("Awaiting executor termination " + i);
        executorService.awaitTermination(1000L, TimeUnit.MILLISECONDS);
        i++;
        if (i > 5) {
            break;
        }
    }
    System.out.println("Executor terminated: " + executorService.isTerminated());
}

static class ProcThread extends Thread {

    static AtomicInteger tcount = new AtomicInteger();

    int id;
    volatile boolean slept;

    public ProcThread() {
        super();
        id = tcount.incrementAndGet();
    }

    @Override
    public void run() {

        try {
            Thread.sleep(sleepTime);
            slept = true;
        } catch (InterruptedException ie) {
            // Catching an interrupted-exception clears the interrupted flag.
            System.out.println(id + " procThread interrupted");
            try {
                Thread.sleep(killTime);
                System.out.println(id + " procThread kill time finished");
            } catch (InterruptedException ie2) {
                System.out.println(id + "procThread killing interrupted"); 
            }
            Thread.currentThread().interrupt();
        } catch (Throwable t) {
            System.out.println(id + " procThread stopped: " + t);
        }
    }
}

static class PingCallable implements Callable<PingResult> {

    ProcThread pthread;

    public PingCallable(String s) {
        pthread = new ProcThread();
    }

    @Override
    public PingResult call() throws Exception {

        System.out.println(pthread.id + " starting sleep");
        pthread.start();
        try {
            System.out.println(pthread.id + " awaiting sleep");
            pthread.join();
        } catch (InterruptedException ie) {
            System.out.println(pthread.id + " callable interrupted");
            pthread.interrupt();
            // wait for kill process to finish
            pthread.join();
            System.out.println(pthread.id + " callable interrupt done");
            Thread.currentThread().interrupt();
        } catch (Throwable t) {
            System.out.println(pthread.id + " callable stopped: " + t);
        }
        return new PingResult(pthread.id, pthread.slept);
    }
}

static class PingResult {

    int id;
    boolean slept;

    public PingResult(int id, boolean slept) {
        this.id = id;
        this.slept = slept;
        System.out.println(id + " slept " + slept);
    }
}

}

不带future.cancel(true) 或带future.cancel(true) 和正常shutdown() 的输出: 1 starting sleep 1 awaiting sleep 2 starting sleep 3 starting sleep 2 awaiting sleep 3 awaiting sleep 1 slept true 3 slept true 2 slept true 5 starting sleep 4 starting sleep 6 starting sleep 5 awaiting sleep 6 awaiting sleep 4 awaiting sleep 4 callable interrupted Awaiting executor termination 0 6 callable interrupted 4 procThread interrupted 5 callable interrupted 6 procThread interrupted 5 procThread interrupted Awaiting executor termination 1 6 procThread kill time finished 5 procThread kill time finished 4 procThread kill time finished 5 callable interrupt done 5 slept false 6 callable interrupt done 4 callable interrupt done 6 slept false 4 slept false Executor terminated: true

future.cancel(true)shutdownNow() 的输出: 1 starting sleep 2 starting sleep 1 awaiting sleep 2 awaiting sleep 3 starting sleep 3 awaiting sleep 3 slept true 2 slept true 1 slept true 4 starting sleep 6 starting sleep 5 starting sleep 4 awaiting sleep 5 awaiting sleep 6 awaiting sleep 5 callable interrupted 6 callable interrupted 4 callable interrupted 5 procThread interrupted 6 procThread interrupted 4 procThread interrupted Executor terminated: true

【讨论】:

  • 非常感谢先生。就我而言,我也尝试了 awaitTeremination。测试打印“执行程序终止..”。测试是绿色的,看起来很好,但是运行测试的 jvm 的按钮仍然是红色的,这意味着运行测试的 JVM 仍然打开,当我在 ProcessExplorer 中手动终止 ping 时它关闭。另外我不使用shutdownNow和future.cancel(true),它们基本相同(据我所知)它们中的任何一个都被评论了
  • 我还注意到进程的线程处理程序永远不会终止,在这种情况下执行程序永远不会完成,必须检查如何执行此操作。
  • @dalvarezmartinez1 尽管有process.destroy(),但 ping 进程似乎并未停止。我在java process destroy does not kill ping 上搜索了很多有趣的结果。也许您可以通过运行长 ping 以外的进程(例如,带有“echo hello world”的批处理文件或类似于this answer 的内容)来测试这是否是问题的原因。
  • 非常感谢。你是对的,问题出在某个地方!我也认为我知道如何解决它,我会再测试一些并明天发布一些东西!谢谢!
【解决方案2】:

昨天我进行了一系列测试,其中一项最有成效:

  1. 中断运行进程的线程,检查它是否被中断,并且进程仍然挂在“waitFor”上,
  2. 我决定调查为什么进程没有检测到它正在运行的线程被中断。
  3. 我发现正确处理流(输出、输入和错误)至关重要,否则external process will block on I/O buffer
  4. 我注意到我的错误处理程序在读取时也被阻塞(没有错误输出),不知道这是否是一个问题,但我决定按照建议和redirect the err stream to out stream
  5. 终于发现invoke and destroy processes in Java有一个正确的方法

新的 ProcThread (正如@pauli 所建议的,它不再从 THREAD 扩展!运行在可调用对象中,我保留名称以便可以注意到差异)看起来像:

        try {
        ProcessBuilder builder = new ProcessBuilder(cmd);
        builder.directory(new File(workDir));
        builder.redirectErrorStream(true);
        process = builder.start();
        // any output?
        sht= new StreamHandlerThread(process.getInputStream(), outBuff);
        sht.start();

        // Wait for is InterruptedException sensitive, so when you want the job to stop, interrupt the thread.
        exitVal = process.waitFor();
        sht.join();
        postProcessing();
        log.info("exitValue: %d", exitVal);
    } catch (InterruptedException ex) {
        log.error("interrupted " + Thread.currentThread().getName(), ex);
        shutdownProcess();

关机过程:

private void shutdownProcess() {
    postProcessing();
    sht.interrupt();
    sht.join();
}

后处理:

    private void postProcessing() {
    if (process != null) {
        closeTheStream(process.getErrorStream());
        closeTheStream(process.getInputStream());
        closeTheStream(process.getOutputStream());
        process.destroy();
    }
}

【讨论】:

  • 您不需要流处理程序线程 - OstermillerUtils 的 ExecHelper 已经存在了很长时间并且为我提供了良好的服务(带有中止超时的调整版本是 here)。跨度>
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-01-04
  • 1970-01-01
  • 1970-01-01
  • 2015-11-07
  • 1970-01-01
相关资源
最近更新 更多