一个用于我们目的的示例 shell 脚本(确保使其可执行),将其放在 clojure 项目的根目录中以便于测试:
$ cat dumb.sh
#!/bin/bash
for i in 1 2 3 4 5
do
echo "Loop iteration $i"
sleep 2
done
现在我们将定义要执行的流程,启动它,并获取标准输出 ((.getInputStream process)),一次读取一行并循环直到我们完成。实时阅读。
(defn run-proc
[proc-name arg-string callback]
(let [pbuilder (ProcessBuilder. (into-array String [proc-name arg-string]))
process (.start pbuilder)]
(with-open [reader (clojure.java.io/reader (.getInputStream process))]
(loop []
(when-let [line (.readLine ^java.io.BufferedReader reader)]
(callback line)
(recur))))))
测试:
(run-proc "./dumb.sh" "" println)
About to start...
Loop iteration 1
Loop iteration 2
Loop iteration 3
Loop iteration 4
Loop iteration 5
=> nil
此函数将阻塞,对您的callback 的调用也将阻塞;如果您希望它在单独的线程中运行,您可以包装在 future 中:
(future (callback line))
对于基于 core.async 的方法:
(defn run-proc-async
[proc-name arg-string callback]
(let [ch (async/chan 1000 (map callback))]
(async/thread
(let [pbuilder (ProcessBuilder. (into-array String [proc-name arg-string]))
process (.start pbuilder)]
(with-open [reader (clojure.java.io/reader (.getInputStream process))]
(loop []
(when-let [line (.readLine ^java.io.BufferedReader reader)]
(async/>!! ch line)
(recur))))))
ch))
这会将您的 callback 函数作为传感器应用到通道上,结果将放置在函数返回的通道上:
(run-proc-async "./dumb.sh" "" #(let [cnt (count %)]
(println "Counted" cnt "characters")
cnt))
#object[clojure.core.async.impl.channels.ManyToManyChannel ...]
Counted 16 characters
Counted 16 characters
Counted 16 characters
Counted 16 characters
Counted 16 characters
(async/<!! *1)
=> 16
在本例中,通道上有一个 1000 的缓冲区。因此,除非您开始从频道中获取信息,否则对 >!! 的调用将在读取 1000 行后阻塞。您也可以将put! 与回调一起使用,但这里有一个内置的 1024 限制,无论如何您都应该处理结果。