【问题标题】:How to perform non-blocking reading stdout from a subprocess in clojure?如何从clojure中的子进程执行非阻塞读取标准输出?
【发布时间】:2017-12-30 17:59:55
【问题描述】:

我希望从 clojure 和 通过标准流与此进程通信。

使用conch 库,我可以 生成并读取进程,并从out 流中读取数据:

(def my-process (sh/proc "my_dumb_process"))
  ; read 10 lines from my-process's stdout. Will block until 10 lines taken
  (take 10 (line-seq (clojure.java.io/reader (:out p))))

我想在我的进程打印时调用一个异步回调 到标准输出 - 只要标准输出流中有数据。

我对 clojure 有点陌生 - 有没有一种惯用的 clojur-ey 方式来做 这?我浏览过 core.async 很好,但我找不到 流的非阻塞解决方案。

【问题讨论】:

  • 很长一段时间——直到 NIO——整个 JVM 没有非阻塞 IO,所以你不能在 any上做异步 I/O > 基于 Java 的语言。因此,最长期的习惯用法将涉及拥有一个执行阻塞 I/O 并将结果推送到 core.async 通道的线程。
  • ...坦率地说,我很难争辩说这里有充分的理由不遵循这种模式。毕竟,线程比它正在读取其输出的重量级进程要方式便宜。

标签: multithreading asynchronous clojure process ipc


【解决方案1】:

一个用于我们目的的示例 shell 脚本(确保使其可执行),将其放在 clojure 项目的根目录中以便于测试:

$ cat dumb.sh
#!/bin/bash

for i in 1 2 3 4 5
do
    echo "Loop iteration $i"
    sleep 2
done

现在我们将定义要执行的流程,启动它,并获取标准输出 ((.getInputStream process)),一次读取一行并循环直到我们完成。实时阅读。

(defn run-proc
  [proc-name arg-string callback]
  (let [pbuilder (ProcessBuilder. (into-array String [proc-name arg-string]))
        process (.start pbuilder)]
    (with-open [reader (clojure.java.io/reader (.getInputStream process))]
      (loop []
        (when-let [line (.readLine ^java.io.BufferedReader reader)]
          (callback line)
          (recur))))))

测试:

(run-proc "./dumb.sh" "" println)
About to start...
Loop iteration 1
Loop iteration 2
Loop iteration 3
Loop iteration 4
Loop iteration 5
=> nil

此函数将阻塞,对您的callback 的调用也将阻塞;如果您希望它在单独的线程中运行,您可以包装在 future 中:

(future (callback line))

对于基于 core.async 的方法:

(defn run-proc-async
  [proc-name arg-string callback]
  (let [ch (async/chan 1000 (map callback))]
    (async/thread
      (let [pbuilder (ProcessBuilder. (into-array String [proc-name arg-string]))
            process (.start pbuilder)]
        (with-open [reader (clojure.java.io/reader (.getInputStream process))]
          (loop []
            (when-let [line (.readLine ^java.io.BufferedReader reader)]
              (async/>!! ch line)
              (recur))))))
    ch))

这会将您的 callback 函数作为传感器应用到通道上,结果将放置在函数返回的通道上:

(run-proc-async "./dumb.sh" "" #(let [cnt (count %)]
                                  (println "Counted" cnt "characters")
                                  cnt))

#object[clojure.core.async.impl.channels.ManyToManyChannel ...]
Counted 16 characters
Counted 16 characters
Counted 16 characters
Counted 16 characters
Counted 16 characters

(async/<!! *1)
=> 16

在本例中,通道上有一个 1000 的缓冲区。因此,除非您开始从频道中获取信息,否则对 &gt;!! 的调用将在读取 1000 行后阻塞。您也可以将put! 与回调一起使用,但这里有一个内置的 1024 限制,无论如何您都应该处理结果。

【讨论】:

    【解决方案2】:

    如果您不介意使用库,可以使用 lazy-genyield from the Tupelo library 找到一个简单的解决方案。它的工作方式类似于 Python 中的生成器函数

    (ns tst.demo.core
      (:use demo.core tupelo.test)
      (:require
        [clojure.java.io :as io]
        [tupelo.core :as t]
        [me.raynes.conch.low-level :as cll]
      ))
    (t/refer-tupelo)
    
    (dotest
      (let [proc          (cll/proc "dumb.sh")
            >>            (pretty proc)
            out-lines     (line-seq (io/reader (grab :out proc)))
            lazy-line-seq (lazy-gen
                            (doseq [line out-lines]
                              (yield line))) ]
        (doseq [curr-line lazy-line-seq]
          (spyx curr-line))))
    

    使用与以前相同的dumb.sh,它会产生以下输出:

    {:out  #object[java.lang.UNIXProcess$ProcessPipeInputStream 0x465b16bb "java.lang.UNIXProcess$ProcessPipeInputStream@465b16bb"],
     :in   #object[java.lang.UNIXProcess$ProcessPipeOutputStream 0xfafbc63 "java.lang.UNIXProcess$ProcessPipeOutputStream@fafbc63"],
     :err  #object[java.lang.UNIXProcess$ProcessPipeInputStream 0x59bb8f80 "java.lang.UNIXProcess$ProcessPipeInputStream@59bb8f80"],
     :process  #object[java.lang.UNIXProcess 0x553c74cc "java.lang.UNIXProcess@553c74cc"]}
    
    ; one of these is printed every 2 seconds
    curr-line => "Loop iteration 1"
    curr-line => "Loop iteration 2"
    curr-line => "Loop iteration 3"
    curr-line => "Loop iteration 4"
    curr-line => "Loop iteration 5"
    

    lazy-gen 中的所有内容都使用core.async在单独的线程中运行。 doseq 急切地使用进程输出并使用yield 将其放置在输出惰性序列上。第二个doseq 急切地消耗lazy-gen 的结果在当前线程中,并在可用时立即打印每一行。


    替代解决方案:

    一个更简单的解决方案是像这样简单地使用未来:

    (dotest
      (let [proc          (cll/proc "dumb.sh")
            out-lines     (line-seq (io/reader (grab :out proc))) ]
        (future
          (doseq [curr-line out-lines]
            (spyx curr-line)))))
    

    结果相同:

    curr-line => "Loop iteration 1"
    curr-line => "Loop iteration 2"
    curr-line => "Loop iteration 3"
    curr-line => "Loop iteration 4"
    curr-line => "Loop iteration 5"
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2014-03-29
      • 2016-07-28
      • 2011-09-18
      • 2018-06-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多