【问题标题】:how to write a process-pool bash shell如何编写进程池 bash shell
【发布时间】:2011-06-22 14:23:59
【问题描述】:

我有10多个任务要执行,系统限制最多可以同时运行4个任务。

我的任务可以像这样开始: myprog 任务名

如何编写 bash shell 脚本来运行这些任务。最重要的是,当一个任务完成后,脚本可以立即启动另一个任务,使正在运行的任务计数一直保持为 4。

【问题讨论】:

    标签: bash shell multiprocessing sh multiprocess


    【解决方案1】:

    使用xargs:

    xargs -P <maximum-number-of-process-at-a-time> -n <arguments-per-process> <command>
    

    详情here

    【讨论】:

    • 引用的文章还提供了有关“make -j ”命令的信息。很有可能汇集许多部分依赖的作品。
    【解决方案2】:

    我在编写自己的进程池时偶然发现了这个线程,并且特别喜欢 Brandon Horsley 的解决方案,虽然我无法让信号正常工作,所以我从 Apache 中汲取灵感并决定尝试一个 pre-fork 模型一个先进先出作为我的工作队列。

    下面的函数是worker进程在fork时运行的函数。

    # \brief the worker function that is called when we fork off worker processes
    # \param[in] id  the worker ID
    # \param[in] job_queue  the fifo to read jobs from
    # \param[in] result_log  the temporary log file to write exit codes to
    function _job_pool_worker()
    {
        local id=$1
        local job_queue=$2
        local result_log=$3
        local line=
    
        exec 7<> ${job_queue}
        while [[ "${line}" != "${job_pool_end_of_jobs}" && -e "${job_queue}" ]]; do
            # workers block on the exclusive lock to read the job queue
            flock --exclusive 7
            read line <${job_queue}
            flock --unlock 7
            # the worker should exit if it sees the end-of-job marker or run the
            # job otherwise and save its exit code to the result log.
            if [[ "${line}" == "${job_pool_end_of_jobs}" ]]; then
                # write it one more time for the next sibling so that everyone
                # will know we are exiting.
                echo "${line}" >&7
            else
                _job_pool_echo "### _job_pool_worker-${id}: ${line}"
                # run the job
                { ${line} ; } 
                # now check the exit code and prepend "ERROR" to the result log entry
                # which we will use to count errors and then strip out later.
                local result=$?
                local status=
                if [[ "${result}" != "0" ]]; then
                    status=ERROR
                fi  
                # now write the error to the log, making sure multiple processes
                # don't trample over each other.
                exec 8<> ${result_log}
                flock --exclusive 8
                echo "${status}job_pool: exited ${result}: ${line}" >> ${result_log}
                flock --unlock 8
                exec 8>&-
                _job_pool_echo "### _job_pool_worker-${id}: exited ${result}: ${line}"
            fi  
        done
        exec 7>&-
    }
    

    你可以在 Github 上get a copy of my solution。这是一个使用我的实现的示例程序。

    #!/bin/bash
    
    . job_pool.sh
    
    function foobar()
    {
        # do something
        true
    }   
    
    # initialize the job pool to allow 3 parallel jobs and echo commands
    job_pool_init 3 0
    
    # run jobs
    job_pool_run sleep 1
    job_pool_run sleep 2
    job_pool_run sleep 3
    job_pool_run foobar
    job_pool_run foobar
    job_pool_run /bin/false
    
    # wait until all jobs complete before continuing
    job_pool_wait
    
    # more jobs
    job_pool_run /bin/false
    job_pool_run sleep 1
    job_pool_run sleep 2
    job_pool_run foobar
    
    # don't forget to shut down the job pool
    job_pool_shutdown
    
    # check the $job_pool_nerrors for the number of jobs that exited non-zero
    echo "job_pool_nerrors: ${job_pool_nerrors}"
    

    希望这会有所帮助!

    【讨论】:

      【解决方案3】:

      使用 GNU Parallel 你可以做到:

      cat tasks | parallel -j4 myprog
      

      如果你有 4 个核心,你甚至可以这样做:

      cat tasks | parallel myprog
      

      来自http://git.savannah.gnu.org/cgit/parallel.git/tree/README

      完整安装

      GNU Parallel 的完整安装非常简单:

      ./configure && make && make install
      

      个人安装

      如果您不是 root,您可以将 ~/bin 添加到您的路径并安装在 ~/bin 和 ~/share:

      ./configure --prefix=$HOME && make && make install
      

      或者,如果您的系统缺少“make”,您可以简单地复制 src/parallel src/sem src/niceload src/sql 到你路径中的一个目录。

      最小安装

      如果您只需要并行并且没有安装“make”(也许 旧系统或 Microsoft Windows):

      wget http://git.savannah.gnu.org/cgit/parallel.git/plain/src/parallel
      chmod 755 parallel
      cp parallel sem
      mv parallel sem dir-in-your-$PATH/bin/
      

      测试安装

      在这之后你应该能够做到:

      parallel -j0 ping -nc 3 ::: foss.org.my gnu.org freenetproject.org
      

      这将并行发送 3 个 ping 数据包到 3 个不同的主机并打印 完成后的输出。

      观看介绍视频以快速了解: https://www.youtube.com/playlist?list=PL284C9FF2488BC6D1

      【讨论】:

      • 哦,安装中涉及的安装细节太多了
      • 感谢上帝的免费软件。您需要的每一件事都已经完美实现。
      • 我刚刚阅读了“并行”的联机帮助页。它对输出进行重新排序,甚至可以与远程机器一起使用。这东西ROCKS
      • 多个参数:seq 100 | parallel echo '{}'
      • 您应该添加一些不带“命令”的使用示例。我浪费了 5 分钟在 parallel man 中找到该信息:'parallel -j 100
      【解决方案4】:

      我建议编写四个脚本,每个脚本依次执行一定数量的任务。然后编写另一个脚本,并行启动四个脚本。例如,如果您有脚本,script1.sh、script2.sh、script3.sh 和 script4.sh,您可以有一个名为 headscript.sh 的脚本,如下所示。

      #!/bin/sh
      ./script1.sh & 
      ./script2.sh & 
      ./script3.sh & 
      ./script4.sh &
      

      【讨论】:

      • 这是最简单的解决方案,但如果四个脚本的工作量大致相同,则效果最好。如果作业长度不可预测,两个脚本可能已经完成,但另外两个可能还有很多任务。其他解决方案正确地重新分配工作负载。
      【解决方案5】:

      我使用众所周知的 xargs 工具的内置功能找到了A Foo Walks into a Bar... blog 中提出的最佳解决方案 首先创建一个文件commands.txt,其中包含您要执行的命令列表

      myprog taskname1
      myprog taskname2
      myprog taskname3
      myprog taskname4
      ...
      myprog taskname123
      

      然后像这样将它通过管道传输到 xargs 以在 4 个进程池中执行:

      cat commands.txt | xargs -I CMD --max-procs=4 bash -c CMD
      

      你可以修改进程号

      【讨论】:

        【解决方案6】:

        按照@Parag Sardas' 的回答和此处链接的文档是您可能想要添加到.bash_aliases 的快速脚本。

        重新链接 doc link,因为它值得一读

        #!/bin/bash
        # https://stackoverflow.com/a/19618159
        # https://stackoverflow.com/a/51861820
        #
        # Example file contents:
        # touch /tmp/a.txt
        # touch /tmp/b.txt
        
        if [ "$#" -eq 0 ];  then
          echo "$0 <file> [max-procs=0]"
          exit 1
        fi
        
        FILE=${1}
        MAX_PROCS=${2:-0}
        cat $FILE | while read line; do printf "%q\n" "$line"; done | xargs --max-procs=$MAX_PROCS -I CMD bash -c CMD
        

        ./xargs-parallel.sh jobs.txt 4从jobs.txt读取最多4个进程

        【讨论】:

          【解决方案7】:

          你可以用信号做一些聪明的事情。

          请注意,这只是为了说明概念,因此未经彻底测试。

          #!/usr/local/bin/bash
          
          this_pid="$$"
          jobs_running=0
          sleep_pid=
          
          # Catch alarm signals to adjust the number of running jobs
          trap 'decrement_jobs' SIGALRM
          
          # When a job finishes, decrement the total and kill the sleep process
          decrement_jobs()
          {
            jobs_running=$(($jobs_running - 1))
            if [ -n "${sleep_pid}" ]
            then
              kill -s SIGKILL "${sleep_pid}"
              sleep_pid=
            fi
          }
          
          # Check to see if the max jobs are running, if so sleep until woken
          launch_task()
          {
            if [ ${jobs_running} -gt 3 ]
            then
              (
                while true
                do
                  sleep 999
                done
              ) &
              sleep_pid=$!
              wait ${sleep_pid}
            fi
          
            # Launch the requested task, signalling the parent upon completion
            (
              "$@"
              kill -s SIGALRM "${this_pid}"
            ) &
            jobs_running=$((${jobs_running} + 1))
          }
          
          # Launch all of the tasks, this can be in a loop, etc.
          launch_task task1
          launch_task tast2
          ...
          launch_task task99
          

          【讨论】:

          • 感谢您的回答,它可以工作,但我的系统仍然存在一些问题。
          • @Brandon Horsley 嗨,我收到以下错误:第 37 行:kill: `': not a pid or valid job spec In code : kill -s SIGALRM "${this_pid}" 你能帮忙吗我为什么会得到这个?
          【解决方案8】:

          这个经过测试的脚本一次运行 5 个作业,并会在它运行后立即重新启动一个新作业(由于在我们获得 SIGCHLD 时会终止 sleep 10.9。更简单的版本可以使用直接轮询(更改sleep 10.9 到 sleep 1 并摆脱陷阱)。

          #!/usr/bin/bash
          
          set -o monitor
          trap "pkill -P $$ -f 'sleep 10\.9' >&/dev/null" SIGCHLD
          
          totaljobs=15
          numjobs=5
          worktime=10
          curjobs=0
          declare -A pidlist
          
          dojob()
          {
            slot=$1
            time=$(echo "$RANDOM * 10 / 32768" | bc -l)
            echo Starting job $slot with args $time
            sleep $time &
            pidlist[$slot]=`jobs -p %%`
            curjobs=$(($curjobs + 1))
            totaljobs=$(($totaljobs - 1))
          }
          
          # start
          while [ $curjobs -lt $numjobs -a $totaljobs -gt 0 ]
           do
            dojob $curjobs
           done
          
          # Poll for jobs to die, restarting while we have them
          while [ $totaljobs -gt 0 ]
           do
            for ((i=0;$i < $curjobs;i++))
             do
              if ! kill -0 ${pidlist[$i]} >&/dev/null
               then
                dojob $i
                break
               fi
             done
             sleep 10.9 >&/dev/null
           done
          wait
          

          【讨论】:

            【解决方案9】:

            关于 4 个 shell 脚本的其他答案并不完全让我满意,因为它假设所有任务都需要大约相同的时间,并且因为它需要手动设置。但这是我要改进的方法。

            主脚本将按照特定的命名约定创建指向可执行文件的符号链接。例如,

            ln -s executable1 ./01-task.01
            

            第一个前缀用于排序,后缀标识批次(01-04)。 现在我们生成 4 个 shell 脚本,它们将批号作为输入并执行类似的操作

            for t in $(ls ./*-task.$batch | sort ; do
               t
               rm t
            done
            

            【讨论】:

              【解决方案10】:

              看看我在 bash 中的作业池实现:https://github.com/spektom/shell-utils/blob/master/jp.sh

              例如,当从大量 URL 下载时,最多运行 3 个 cURL 进程,您可以将 cURL 命令包装如下:

              ./jp.sh "My Download Pool" 3 curl http://site1/...
              ./jp.sh "My Download Pool" 3 curl http://site2/...
              ./jp.sh "My Download Pool" 3 curl http://site3/...
              ...
              

              【讨论】:

                【解决方案11】:

                这是我的解决方案。这个想法很简单。我创建了一个fifo 作为信号量,其中每一行代表一个可用资源。当read队列时,如果没有剩余,主进程将阻塞。而且,我们在任务完成后通过简单地echoing 任何东西到队列中返回资源。

                function task() {
                    local task_no="$1"
                    # doing the actual task...
                    echo "Executing Task ${task_no}"
                    # which takes a long time
                    sleep 1
                }
                
                function execute_concurrently() {
                    local tasks="$1"
                    local ps_pool_size="$2"
                
                    # create an anonymous fifo as a Semaphore
                    local sema_fifo
                    sema_fifo="$(mktemp -u)"
                    mkfifo "${sema_fifo}"
                    exec 3<>"${sema_fifo}"
                    rm -f "${sema_fifo}"
                
                    # every 'x' stands for an available resource
                    for i in $(seq 1 "${ps_pool_size}"); do
                        echo 'x' >&3
                    done
                
                    for task_no in $(seq 1 "${tasks}"); do
                        read dummy <&3 # blocks util a resource is available
                        (
                            trap 'echo x >&3' EXIT # returns the resource on exit
                            task "${task_no}"
                        )&
                    done
                    wait # wait util all forked tasks have finished
                }
                
                execute_concurrently 10 4
                

                上面的脚本将同时运行 10 个任务和 4 个任务。您可以将$(seq 1 "${tasks}") 序列更改为您要运行的实际任务队列。

                【讨论】:

                  【解决方案12】:

                  我根据Writing a process pool in Bash 中介绍的方法进行了修改。

                  #!/bin/bash
                  
                  #set -e   # this doesn't work here for some reason
                  POOL_SIZE=4   # number of workers running in parallel
                  
                  #######################################################################
                  #                            populate jobs                            #
                  #######################################################################
                  
                  declare -a jobs
                  
                  for (( i = 1988; i < 2019; i++ )); do
                      jobs+=($i)
                  done
                  
                  echo '################################################'
                  echo '    Launching jobs'
                  echo '################################################'
                  
                  parallel() {
                      local proc procs jobs cur
                      jobs=("$@")         # input jobs array
                      declare -a procs=() # processes array
                      cur=0               # current job idx
                  
                      morework=true
                      while $morework; do
                          # if process array size < pool size, try forking a new proc
                          if [[ "${#procs[@]}" -lt "$POOL_SIZE" ]]; then
                              if [[ $cur -lt "${#jobs[@]}" ]]; then
                                  proc=${jobs[$cur]}
                                  echo "JOB ID = $cur; JOB = $proc."
                  
                                  ###############
                                  # do job here #
                                  ###############
                  
                                  sleep 3 &
                  
                                  # add to current running processes
                                  procs+=("$!")
                                  # move to the next job
                                  ((cur++))
                              else
                                  morework=false
                                  continue
                              fi
                          fi
                  
                          for n in "${!procs[@]}"; do
                              kill -0 "${procs[n]}" 2>/dev/null && continue
                              # if process is not running anymore, remove from array
                              unset procs[n]
                          done
                      done
                      wait
                  }
                  
                  parallel "${jobs[@]}"
                  

                  【讨论】:

                    猜你喜欢
                    • 1970-01-01
                    • 1970-01-01
                    • 1970-01-01
                    • 1970-01-01
                    • 1970-01-01
                    • 2012-08-15
                    • 1970-01-01
                    • 2010-10-16
                    相关资源
                    最近更新 更多