【Question Title】: How to chunk shell script input by time, not by size?
【Posted】: 2019-10-12 08:43:26
【Question】:

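In a bash script I am using a many-producer single-consumer pattern. Producers are background processes writing lines into a fifo (via GNU Parallel). The consumer reads all lines from the fifo, then sorts, filters, and prints the formatted result to stdout.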

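However, it could take a long time until the full result is available. Producers are usually fast on the first few results but then slow down. Here I am more interested in seeing chunks of data every few seconds, each sorted and filtered individually.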

mkfifo fifo
parallel ... >"$fifo" &
while chunk=$(read with timeout 5s and at most 10s <"$fifo"); do
  process "$chunk"
done

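The loop would run until all producers are done and all input has been read. Each chunk is read until there has been no new data for 5s, or until 10s have passed since the chunk was started. A chunk may also be empty if there was no new data for 10s.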
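The rule above boils down to choosing, before each read, how long that read may block. Here is a minimal sketch in C (the language of the accepted answer), using a hypothetical helper named next_wait_ms(). It follows the interpretation in Solution 1's analysis: the 5s idle limit only starts once the chunk has received its first line, and before that only the 10s chunk limit applies.

```c
#include <stdbool.h>

/*
 * Hypothetical helper: how long (in milliseconds) the next read may block.
 *   elapsed_ms: time since the current chunk started
 *   chunk_ms:   maximum length of a chunk (10s in the question)
 *   delay_ms:   maximum idle time after a line (5s in the question)
 *   got_line:   true if a line was just read in this chunk
 * Call it right after each line (or at chunk start). A result of 0 means
 * the chunk is over and should be emitted.
 */
long next_wait_ms(long elapsed_ms, long chunk_ms, long delay_ms, bool got_line)
{
    long left = chunk_ms - elapsed_ms;   /* time remaining in this chunk */
    if (left <= 0)
        return 0;                        /* chunk limit reached */
    if (got_line && delay_ms < left)
        return delay_ms;                 /* idle timer expires first */
    return left;                         /* chunk timer expires first */
}
```

A reader would measure elapsed_ms from a clock reading taken at the start of each chunk, and would pass the result as the timeout of a blocking wait such as poll().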

I tried to get it working like this:

output=$(mktemp)
while true; do
  wasTimeout=0 interruptAt=$(( $(date '+%s') + 10 ))
  while true; do
    IFS= read -r -t5 <>"${fifo}"
    rc="$?"
    if [[ "${rc}" -gt 0 ]]; then
      [[ "${rc}" -gt 128 ]] && wasTimeout=1
      break
    fi
    echo "$REPLY" >>"${output}"
    if [[ $(date '+%s') -ge "${interruptAt}" ]]; then
      wasTimeout=1
      break
    fi
  done
  echo '---' >>"${output}"
  [[ "${wasTimeout}" -eq 0 ]] && break
done

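I tried a few variations. In the form above, it reads the first chunk and then loops forever. If I use <"${fifo}" (no read/write as above), it blocks after the first chunk. Maybe all this could be simplified with buffer and/or stdbuf? But both of them define chunks by size, not by time.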

【Comments】:

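  • At some point, it becomes easier to write the code in C than in shell. My gut feeling is that this chunking functionality has reached that point. There's one issue you'd have to deal with: you need a process that holds the FIFO open for reading (without actually reading any data), so that no data is lost while the C program opens the FIFO, reads a chunk, closes the FIFO (probably implicitly, by exiting), and returns. Others may see it differently.
  • @JonathanLeffler: So far, Bash has handled everything I've thrown at it, and this problem looked harmless until I tried to solve it. I wonder whether a subshell that just does read -t3 from one fifo, buffers what it reads, and then writes it to another fifo could work. How would it distinguish between a) the fifo has no more writers and no more data is coming, and b) the fifo is empty but more data may arrive later? If there are no more writers, I don't want to start another read on the fifo.
  • Yes, it's a tricky problem, and not one that is easy to program in a shell script. I'd automatically use C; equivalent programs could certainly be written in C++ and Perl, probably also in Python and Java (and quite likely in other languages too).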
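At the system-call level, the a)/b) distinction raised in these comments does exist. After all writers have closed a pipe or FIFO, read() returns 0. While at least one writer still has it open, a read with a timeout simply times out. One consequence: opening the FIFO read-write (bash's <>) makes the reader itself a writer, so it can never observe EOF, which is consistent with the "loops forever" symptom in the question. Below is a minimal C sketch built on poll(). The names timed_read and enum fifo_status are hypothetical and do not come from any of the answers.

```c
#include <errno.h>
#include <poll.h>
#include <sys/types.h>
#include <unistd.h>

/* Outcome of one timed read from a pipe or FIFO. */
enum fifo_status { FIFO_DATA, FIFO_TIMEOUT, FIFO_EOF, FIFO_ERROR };

/*
 * Wait up to timeout_ms for fd to become readable, then read at most len bytes.
 *   FIFO_TIMEOUT: nothing arrived, but some process still holds the write end.
 *   FIFO_EOF:     read() returned 0, so every writer has closed its end.
 *   FIFO_DATA:    *nread bytes were stored in buf.
 * On EINTR the poll is restarted with the full timeout, which can stretch the
 * wait; a real program would recompute the remaining time.
 */
enum fifo_status timed_read(int fd, char *buf, size_t len, int timeout_ms, ssize_t *nread)
{
    struct pollfd pfd = { .fd = fd, .events = POLLIN };
    int rc;
    do
        rc = poll(&pfd, 1, timeout_ms);
    while (rc < 0 && errno == EINTR);

    *nread = 0;
    if (rc < 0)
        return FIFO_ERROR;
    if (rc == 0)
        return FIFO_TIMEOUT;

    /* POLLIN or POLLHUP was reported, so read() should not block now */
    ssize_t n = read(fd, buf, len);
    if (n < 0)
        return FIFO_ERROR;
    *nread = n;
    return (n == 0) ? FIFO_EOF : FIFO_DATA;
}
```

Note that this only works if the consumer opens the FIFO read-only and keeps the same descriptor open across chunks. Reopening it for every read reintroduces the problems described in the question.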

Tags: bash buffer chunking


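【Solution 1】:

This is not an easy problem to solve. As I hinted, a C program (or a program in some programming language other than shell) is probably the best solution. Some of the complicating factors are:

  • Reading with timeouts.
  • The timeout changes if data arrives quickly.
  • Different systems have different sets of interval-timing functions:
    • alarm() is probably available everywhere, but it has only 1-second resolution and is prone to accumulating rounding errors. (Compile this version with make UFLAGS=-DUSE_ALARM; on macOS, use make UFLAGS=-DUSE_ALARM LDLIB2=.)
    • setitimer() uses microsecond timing and the struct timeval type. (Compile this version with make UFLAGS=-DUSE_SETITIMER; on macOS, use make UFLAGS=-DUSE_SETITIMER LDLIB2=.)
    • timer_create(), timer_settime() and related functions use the modern nanosecond type struct timespec. These are available on Linux but not on macOS 10.14.5 Mojave or earlier. (Compile this version with plain make; it does not work on macOS.)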
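To make the resolution differences in this list concrete, here is a small C sketch with two hypothetical helpers. The first converts a nanosecond struct timespec to a microsecond struct timeval, truncating just as the answer's cvt_timespec_to_timeval() does. The second converts it to whole seconds for alarm(). The answer's alarm() variant simply passes tv_sec, which truncates. This sketch rounds up instead, so that a 2.5s timeout does not fire early.

```c
#include <sys/time.h>
#include <time.h>

/* timespec (ns, used by timer_settime) -> timeval (us, used by setitimer).
 * Sub-microsecond parts are truncated. */
struct timeval timespec_to_timeval(struct timespec ts)
{
    return (struct timeval){ .tv_sec = ts.tv_sec, .tv_usec = ts.tv_nsec / 1000 };
}

/* timespec -> whole seconds for alarm(), rounding any fraction up.
 * The 1-second granularity is the source of alarm()'s rounding error.
 * Beware: a zero result passed to alarm() cancels the alarm. */
unsigned alarm_seconds(struct timespec ts)
{
    return (unsigned)ts.tv_sec + (ts.tv_nsec > 0 ? 1u : 0u);
}
```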

The program's usage information is:

$ chunker79 -h
Usage: chunker79 [-hvV][-c chunk][-d delay][-f file]
  -c chunk  Maximum time to wait for data in a chunk (default 10)
  -d delay  Maximum delay after line read (default: 5)
  -f file   Read from file instead of standard input
  -h        Print this help message and exit
  -v        Verbose mode: print timing information to stderr
  -V        Print version information and exit

$

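This code is available in my SOQ (Stack Overflow Questions) repository on GitHub as the file chunker79.c in the src/so-5631-4784 sub-directory. You'll also need some of the support code in the src/libsoq directory.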
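The -c and -d values are parsed by scn_timespec() from libsoq, which isn't shown here. If you would rather not depend on libsoq for that, something like the following hypothetical parse_seconds() could stand in. It assumes the arguments are plain non-negative decimal seconds such as 10 or 2.5. It is not libsoq's implementation, and its return convention (0 on success) was chosen only to match how main() checks scn_timespec().

```c
#include <ctype.h>
#include <time.h>

/*
 * Hypothetical stand-in for scn_timespec(): parse "10", "2.5", ".25" etc.
 * into a struct timespec. Returns 0 on success, -1 on malformed input.
 * Digits beyond nanosecond precision are ignored; overflow is not checked.
 */
int parse_seconds(const char *str, struct timespec *ts)
{
    const char *p = str;
    long long sec = 0;
    long nsec = 0;
    int digits = 0;

    while (isdigit((unsigned char)*p))          /* integer part */
    {
        sec = sec * 10 + (*p++ - '0');
        digits++;
    }
    if (*p == '.')                              /* optional fraction */
    {
        p++;
        long scale = 100000000;                 /* weight of first fractional digit */
        while (isdigit((unsigned char)*p))
        {
            nsec += (*p++ - '0') * scale;
            scale /= 10;                        /* reaches 0 after nine digits */
            digits++;
        }
    }
    if (digits == 0 || *p != '\0')              /* empty or trailing junk */
        return -1;
    ts->tv_sec = (time_t)sec;
    ts->tv_nsec = nsec;
    return 0;
}
```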

/*
@(#)File:           chunker79.c
@(#)Purpose:        Chunk Reader for SO 5631-4784
@(#)Author:         J Leffler
@(#)Copyright:      (C) JLSS 2019
*/

/*TABSTOP=4*/

/*
** Problem specification from the Stack Overflow question
**
** In a bash script I am using a many-producer single-consumer pattern.
** Producers are background processes writing lines into a fifo (via GNU
** Parallel).  The consumer reads all lines from the fifo, then sorts,
** filters, and prints the formatted result to stdout.
**
** However, it could take a long time until the full result is
** available.  Producers are usually fast on the first few results but
** then would slow down.  Here I am more interested to see chunks of
** data every few seconds, each sorted and filtered individually.
**
**    mkfifo fifo
**    parallel ... >"$fifo" &
**    while chunk=$(read with timeout 5s and at most 10s <"$fifo"); do
**      process "$chunk"
**    done
**
** The loop would run until all producers are done and all input is
** read.  Each chunk is read until there has been no new data for 5s, or
** until 10s have passed since the chunk was started.  A chunk may also
** be empty if there was no new data for 10s.
*/

/*
** Analysis
**
** 1.  If no data arrives at all for 10 seconds, then the program should
**     terminate producing no output.  This timeout is controlled by the
**     value of time_chunk in the code.
** 2.  If data arrives more or less consistently, then the collection
**     should continue for 10s and then finish.  This timeout is also
**     controlled by the value of time_chunk in the code.
** 3.  If a line of data arrives before 5 seconds have elapsed, and no
**     more arrives for 5 seconds, then the collection should finish.
**     (If the first line arrives after 5 seconds and no more arrives
**     for more than 5 seconds, then the 10 second timeout cuts in.)
**     This timeout is controlled by the value of time_delay in the code.
** 4.  This means that we want two separate timers at work:
**     - Chunk timer (started when the program starts).
**     - Delay timer (started each time a line is read).
**
** It doesn't matter which timer goes off, but further timer signals
** should be ignored.  External signals will confuse things; tough!
**
** -- Using alarm(2) is tricky because it provides only one timer, not two.
** -- Using getitimer(2) and setitimer(2) means using obsolescent POSIX
**    functions, but these are available on macOS.
** -- Using timer_create(2), timer_delete(2), timer_settime(2) and
**    timer_gettime(2) means using current POSIX functions, but these are
**    not available on macOS.
*/

#include "posixver.h"

#include "stderr.h"
#include "timespec_io.h"
#include <assert.h>
#include <signal.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/uio.h>
#include <time.h>
#include <unistd.h>

#ifdef USE_SETITIMER
#include "timeval_math.h"
#include "timeval_io.h"
#include <sys/time.h>
#endif /* USE_SETITIMER */

static const char optstr[] = "hvVc:d:f:";
static const char usestr[] = "[-hvV][-c chunk][-d delay][-f file]";
static const char hlpstr[] =
    "  -c chunk  Maximum time to wait for data in a chunk (default 10)\n"
    "  -d delay  Maximum delay after line read (default: 5)\n"
    "  -f file   Read from file instead of standard input\n"
    "  -h        Print this help message and exit\n"
    "  -v        Verbose mode: print timing information to stderr\n"
    "  -V        Print version information and exit\n"
    ;

static struct timespec time_delay = { .tv_sec =  5, .tv_nsec = 0 };
static struct timespec time_chunk = { .tv_sec = 10, .tv_nsec = 0 };
static struct timespec time_start;

static bool verbose = false;

static void set_chunk_timeout(void);
static void set_delay_timeout(void);
static void cancel_timeout(void);
static void alarm_handler(int signum);

// Using signal() manages to set SA_RESTART on a Mac.
// This is allowed by standard C and POSIX, sadly.
// signal(SIGALRM, alarm_handler);

#if defined(USE_ALARM)

static void set_chunk_timeout(void)
{
    if (verbose)
        err_remark("-->> %s()\n", __func__);
    alarm(time_chunk.tv_sec);
    struct sigaction sa;
    sa.sa_handler = alarm_handler;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;
    sigaction(SIGALRM, &sa, NULL);
    if (verbose)
        err_remark("<<-- %s()\n", __func__);
}

static void set_delay_timeout(void)
{
    if (verbose)
        err_remark("-->> %s()\n", __func__);
    unsigned time_left = alarm(0);
    if (time_left > time_delay.tv_sec)
        alarm(time_delay.tv_sec);
    else
        alarm(time_left);
    if (verbose)
        err_remark("<<-- %s()\n", __func__);
}

static void cancel_timeout(void)
{
    if (verbose)
        err_remark("-->> %s()\n", __func__);
    alarm(0);
    signal(SIGALRM, SIG_IGN);
    if (verbose)
        err_remark("<<-- %s()\n", __func__);
}

#elif defined(USE_SETITIMER)

static inline struct timeval cvt_timespec_to_timeval(struct timespec ts)
{
    return (struct timeval){ .tv_sec = ts.tv_sec, .tv_usec = ts.tv_nsec / 1000 };
}

static void set_chunk_timeout(void)
{
    if (verbose)
        err_remark("-->> %s()\n", __func__);
    struct itimerval tv_new = { { 0, 0 }, { 0, 0 } };
    tv_new.it_value = cvt_timespec_to_timeval(time_chunk);
    struct itimerval tv_old;
    if (setitimer(ITIMER_REAL, &tv_new, &tv_old) != 0)
        err_syserr("failed to set interval timer: ");
    struct sigaction sa;
    sa.sa_handler = alarm_handler;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;
    sigaction(SIGALRM, &sa, NULL);
    if (verbose)
        err_remark("<<-- %s()\n", __func__);
}

static void set_delay_timeout(void)
{
    if (verbose)
        err_remark("-->> %s()\n", __func__);
    struct itimerval tv_until;
    if (getitimer(ITIMER_REAL, &tv_until) != 0)
        err_syserr("failed to get interval timer: ");
    struct timeval tv_delay = cvt_timespec_to_timeval(time_delay);

    if (verbose)
    {
        char buff1[32];
        fmt_timeval(&tv_delay, 6, buff1, sizeof(buff1));
        char buff2[32];
        fmt_timeval(&tv_until.it_value, 6, buff2, sizeof(buff2));
        err_remark("---- %s(): delay %s, left %s\n", __func__, buff1, buff2);
    }

    if (cmp_timeval(tv_until.it_value, tv_delay) <= 0)
    {
        if (verbose)
            err_remark("---- %s(): no need for delay timer\n", __func__);
    }
    else
    {
        struct itimerval tv_new = { { 0, 0 }, { 0, 0 } };
        tv_new.it_value = cvt_timespec_to_timeval(time_delay);
        struct itimerval tv_old;
        if (setitimer(ITIMER_REAL, &tv_new, &tv_old) != 0)
            err_syserr("failed to set interval timer: ");
        if (verbose)
            err_remark("---- %s(): set delay timer\n", __func__);
    }
    if (verbose)
        err_remark("<<-- %s()\n", __func__);
}

static void cancel_timeout(void)
{
    if (verbose)
        err_remark("-->> %s()\n", __func__);
    struct itimerval tv_new =
    {
        .it_value    = { .tv_sec = 0, .tv_usec = 0 },
        .it_interval = { .tv_sec = 0, .tv_usec = 0 },
    };
    struct itimerval tv_old;
    if (setitimer(ITIMER_REAL, &tv_new, &tv_old) != 0)
        err_syserr("failed to set interval timer: ");
    if (verbose)
        err_remark("<<-- %s()\n", __func__);
}

#else /* USE_TIMER_GETTIME */

#include "timespec_math.h"

static timer_t t0 = { 0 };

static void set_chunk_timeout(void)
{
    if (verbose)
        err_remark("-->> %s()\n", __func__);

    struct sigevent ev =
    {
        .sigev_notify = SIGEV_SIGNAL,
        .sigev_signo = SIGALRM,
        .sigev_value.sival_int = 0,
        .sigev_notify_function = 0,
        .sigev_notify_attributes = 0,
    };
    if (timer_create(CLOCK_REALTIME, &ev, &t0) < 0)
        err_syserr("failed to create a timer: ");

    struct itimerspec it =
    {
        .it_interval = { .tv_sec = 0, .tv_nsec = 0 },
        .it_value = time_chunk,
    };
    struct itimerspec ot;
    if (timer_settime(t0, 0, &it, &ot) != 0)
        err_syserr("failed to activate timer: ");

    struct sigaction sa;
    sa.sa_handler = alarm_handler;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;
    sigaction(SIGALRM, &sa, NULL);
    if (verbose)
        err_remark("<<-- %s()\n", __func__);
}

static void set_delay_timeout(void)
{
    if (verbose)
        err_remark("-->> %s()\n", __func__);
    struct itimerspec time_until;
    if (timer_gettime(t0, &time_until) != 0)
        err_syserr("failed to get per-process timer: ");

    if (verbose)
    {
        char buff1[32];
        fmt_timespec(&time_delay, 6, buff1, sizeof(buff1));
        char buff2[32];
        fmt_timespec(&time_until.it_value, 6, buff2, sizeof(buff2));
        err_remark("---- %s(): delay %s, left %s\n", __func__, buff1, buff2);
    }

    if (cmp_timespec(time_until.it_value, time_delay) <= 0)
    {
        if (verbose)
            err_remark("---- %s(): no need for delay timer\n", __func__);
    }
    else
    {
        struct itimerspec time_new =
        {
            .it_interval = { .tv_sec = 0, .tv_nsec = 0 },
            .it_value = time_delay,
        };
        struct itimerspec time_old;
        if (timer_settime(t0, 0, &time_new, &time_old) != 0)
            err_syserr("failed to set per-process timer: ");
        if (verbose)
            err_remark("---- %s(): set delay timer\n", __func__);
    }
    if (verbose)
        err_remark("<<-- %s()\n", __func__);
}

static void cancel_timeout(void)
{
    if (timer_delete(t0) != 0)
        err_syserr("failed to delete timer: ");
}

#endif /* Timing mode */

/* err_remark() uses stdio, which is not async-signal-safe; it is only called here in verbose mode */
static void alarm_handler(int signum)
{
    assert(signum == SIGALRM);
    if (verbose)
        err_remark("---- %s(): signal %d\n", __func__, signum);
}

static void read_chunks(FILE *fp)
{
    size_t num_data = 0;
    size_t max_data = 0;
    struct iovec *data = 0;
    size_t buflen = 0;
    char *buffer = 0;
    ssize_t length;
    size_t chunk_len = 0;

    clock_gettime(CLOCK_REALTIME, &time_start);

    set_chunk_timeout();
    while ((length = getline(&buffer, &buflen, fp)) != -1)
    {
        if (num_data >= max_data)
        {
            size_t new_size = (num_data * 2) + 2;
            void *newspace = realloc(data, new_size * sizeof(data[0]));
            if (newspace == 0)
                err_syserr("failed to allocate %zu bytes data: ", new_size * sizeof(data[0]));
            data = newspace;
            max_data = new_size;
        }
        data[num_data].iov_base = buffer;
        data[num_data].iov_len = length;
        num_data++;
        if (verbose)
            err_remark("Received line %zu\n", num_data);
        chunk_len += length;
        buffer = 0;
        buflen = 0;
        set_delay_timeout();
    }
    cancel_timeout();

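    if (chunk_len > 0)
    {
        /* writev() accepts at most IOV_MAX vectors per call, so write in batches */
        long iov_max = sysconf(_SC_IOV_MAX);
        if (iov_max <= 0)
            iov_max = 16;               /* _XOPEN_IOV_MAX, the POSIX minimum */
        size_t done = 0;
        while (done < num_data)
        {
            size_t batch = num_data - done;
            if (batch > (size_t)iov_max)
                batch = (size_t)iov_max;
            size_t want = 0;
            for (size_t i = done; i < done + batch; i++)
                want += data[i].iov_len;
            if ((length = writev(STDOUT_FILENO, &data[done], (int)batch)) < 0)
                err_syserr("failed to write %zu bytes to standard output: ", want);
            else if ((size_t)length != want)
                err_error("failed to write %zu bytes to standard output "
                          "(short write of %zu bytes)\n", want, (size_t)length);
            done += batch;
        }
    }

    if (verbose)
        err_remark("---- %s(): data written (%zu bytes)\n", __func__, chunk_len);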

    for (size_t i = 0; i < num_data; i++)
        free(data[i].iov_base);
    free(data);
    free(buffer);
}

int main(int argc, char **argv)
{
    const char *name = "(standard input)";
    FILE *fp = stdin;
    err_setarg0(argv[0]);
    err_setlogopts(ERR_MICRO);

    int opt;
    while ((opt = getopt(argc, argv, optstr)) != -1)
    {
        switch (opt)
        {
        case 'c':
            if (scn_timespec(optarg, &time_chunk) != 0)
                err_error("Failed to convert '%s' into a time value\n", optarg);
            break;
        case 'd':
            if (scn_timespec(optarg, &time_delay) != 0)
                err_error("Failed to convert '%s' into a time value\n", optarg);
            break;
        case 'f':
            if ((fp = fopen(optarg, "r")) == 0)
                err_syserr("Failed to open file '%s' for reading: ", optarg);
            name = optarg;
            break;
        case 'h':
            err_help(usestr, hlpstr);
            /*NOTREACHED*/
        case 'v':
            verbose = true;
            break;
        case 'V':
            err_version("CHUNKER79", &"@(#)$Revision$ ($Date$)"[4]);
            /*NOTREACHED*/
        default:
            err_usage(usestr);
            /*NOTREACHED*/
        }
    }

    if (optind != argc)
        err_usage(usestr);

    if (verbose)
    {
        err_remark("chunk: %3lld.%09ld\n", (long long)time_chunk.tv_sec, time_chunk.tv_nsec);
        err_remark("delay: %3lld.%09ld\n", (long long)time_delay.tv_sec, time_delay.tv_nsec);
        err_remark("file:  %s\n", name);
    }

    read_chunks(fp);

    return 0;
}
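The mechanism that ends each chunk in read_chunks() is easy to miss. The SIGALRM handler is installed with sigaction() and sa_flags = 0, so SA_RESTART is not set (the comment about signal() setting SA_RESTART on a Mac explains why sigaction() is used). When the timer fires, the blocked read() inside getline() fails with EINTR, and getline() returns -1, which ends the while loop. The standalone sketch below isolates that behaviour. It uses setitimer() as the timer, and its names (read_with_itimer, on_alarm) are hypothetical.

```c
#define _XOPEN_SOURCE 700
#include <errno.h>
#include <signal.h>
#include <string.h>
#include <sys/time.h>
#include <unistd.h>

/* The handler does nothing; its only job is to interrupt read(). */
static void on_alarm(int signum) { (void)signum; }

/*
 * Block in read() until data arrives or ms milliseconds pass.
 * Returns read()'s result; -1 with errno == EINTR means the timer fired.
 * Known race of signal-based timeouts (poll() avoids it): if the signal
 * arrives before read() starts, read() blocks anyway.
 */
ssize_t read_with_itimer(int fd, char *buf, size_t len, long ms)
{
    struct sigaction sa;
    memset(&sa, 0, sizeof sa);
    sa.sa_handler = on_alarm;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;                     /* deliberately no SA_RESTART */
    sigaction(SIGALRM, &sa, NULL);

    struct itimerval it = {
        .it_interval = { 0, 0 },         /* one-shot timer */
        .it_value = { .tv_sec = ms / 1000, .tv_usec = (ms % 1000) * 1000 },
    };
    setitimer(ITIMER_REAL, &it, NULL);

    ssize_t n = read(fd, buf, len);
    int saved = errno;

    struct itimerval off = { { 0, 0 }, { 0, 0 } };
    setitimer(ITIMER_REAL, &off, NULL);  /* cancel if data beat the timer */
    errno = saved;
    return n;
}
```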

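My SOQ repository also has a script gen-data.sh that uses some custom programs to generate a data stream like this (the seed value is written to standard error, not standard output):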

$ gen-data.sh
# Seed: 1313715286
2019-06-03 23:04:16.653: Zunmieoprri Rdviqymcho 5878 2017-03-29 03:59:15 Udransnadioiaeamprirteo
2019-06-03 23:04:18.525: Rndflseoevhgs Etlaevieripeoetrnwkn 9500 2015-12-18 10:49:15 Ebyrcoebeezatiagpleieoefyc
2019-06-03 23:04:20.526: Nrzsuiakrooab Nbvliinfqidbujoops 1974 2020-05-13 08:05:14 Lgithearril
2019-06-03 23:04:21.777: Eeagop Aieneose 6533 2016-11-06 22:51:58 Aoejlwebbssroncmeovtuuueigraa
2019-06-03 23:04:23.876: Izirdoeektau Atesltiybysaclee 4557 2020-09-13 02:24:46 Igrooiaauiwtna
2019-06-03 23:04:26.145: Yhioit Eamrexuabagsaraiw 9703 2014-09-13 07:44:12 Dyiiienglolqopnrbneerltnmsdn
^C
$

When piped into chunker79 with the default options, I get output like this:

$ gen-data.sh | chunker79
# Seed: 722907235
2019-06-03 23:06:20.570: Aluaezkgiebeewal Oyvahee 1022 2015-08-12 07:45:54 Weuababeeduklleym
2019-06-03 23:06:24.100: Gmujvoyevihvoilc Negeiiuvleem 8196 2015-08-29 21:15:15 Nztkrvsadeoeagjgoyotvertavedi
$

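If you analyze the time intervals (look at the first two fields of the output lines), that output meets the specification. A more detailed analysis looks like this: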

$ timecmd -mr -- gen-data.sh | timecmd -mr -- chunker79
2019-06-03 23:09:14.246 [PID 57159] gen-data.sh
2019-06-03 23:09:14.246 [PID 57160] chunker79
# Seed: -1077610201
2019-06-03 23:09:14.269: Woreio Rdtpimvoscttbyhxim 7893 2017-03-12 12:46:57 Uywaietirkekes
2019-06-03 23:09:16.939: Uigaba Nzoxdeuisofai 3630 2017-11-16 09:28:59 Jnsncgoesycsevdscugoathusaoq
2019-06-03 23:09:17.845: Sscreua Aloaoonnsuur 5163 2016-08-13 19:47:15 Injhsiifqovbnyeooiimitaaoir
2019-06-03 23:09:19.272 [PID 57160; status 0x0000]  -  5.026s  -  chunker79
2019-06-03 23:09:22.084 [PID 57159; status 0x8D00]  -  7.838s  -  gen-data.sh
$

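There is a noticeable pause between the output from chunker79 appearing and gen-data.sh finishing. That's because Bash waits for all the processes in the pipeline to finish, and gen-data.sh doesn't finish until the next time it writes to the pipe after chunker79 has finished (its status 0x8D00 is exit status 141, that is, 128 + SIGPIPE). This is an artifact of this test setup; it would not be a factor in the shell script outlined in the question.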

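【Comments】:

  • Wow. That's much more than I had hoped for. I'll take a look tonight.
  • After tripping myself up a few times, I can confirm that this works as advertised, thanks again. I can't use gendata.sh (dribbler?), but for this purpose something simple is enough: while sleep 1s; do echo $(( c++ )); done | while chunk=$(./chunker79); do echo -e "${chunk}\n---"; done (and variations of it). I still need to decide whether to call it again after no data was received (if the last chunk was empty, does the fifo still have a writing process?). But I'll simply append an EOF marker and check for it.
  • All three programs used by gen-data.sh (random, dribbler, and tstamp) are C programs I created (though I wouldn't be surprised to find that other people have different programs with those names). The code is available on request; see my profile. The random program has a fairly good core, but it doesn't have a good interface yet.
  • You could create a simple program (in C, of course, or maybe even in shell; GNU timeout might help) that tries to open the FIFO for reading: if the open is still blocked after N seconds (where N might be 1, or maybe 5), the probe has succeeded, because it means no process has the FIFO open for writing. You might need a process lurking in the background with the FIFO open for reading; when you detect that there are no writers, you kill that lurker. Perhaps you also remove the FIFO.

【Solution 2】: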

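I would consider writing a safe multithreaded program with a queue.

I know Java better, but there are probably more modern languages that would be suitable, such as Go and Kotlin.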

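【Comments】:

  • Actually, I considered Java first. Here, though, I'm interfacing with a C library, and I'd have to either port it to Java or start using JNI. Neither option seems very pleasant. Maybe I'll give Rust a try.

【Solution 3】: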

Something like this:

#!/usr/bin/perl

$timeout = 3;
while(<STDIN>) {
    # Make sure there is some input                                                      
    push @out,$_;
    eval {
        local $SIG{ALRM} = sub { die };
        alarm $timeout;
        while(<STDIN>) {
            alarm $timeout;
            push @out,$_;
        }
        alarm 0;
    };
    system "echo","process",@out;
}

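【Comments】:

  • Thanks for the suggestion (and for Parallel :). I don't mind using Perl, as long as I don't have to understand all of it... I tried the above, but it has some problems. Consider this test: c=0; while sleep 1s; do echo $(( c++ )); done | ./x.pl. It should print whatever it has received after at most 3 seconds (or 10 seconds if no input was received), but it outputs nothing. If I change the sleep to 4 seconds, the timeout works, but each output repeats the previous output plus the new input from stdin.

【Solution 4】: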

GNU Parallel 20200122 introduced --blocktimeout (--bt):

find ~ | parallel -j3 --bt 2s --pipe wc

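This works like normal GNU Parallel, except when it takes more than 2 seconds to fill a block. In that case, the block read so far is simply passed to wc (unless it is empty).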

It has slightly odd startup behaviour: you have to wait 3*2s (jobslots*timeout) before the output stabilizes, and after that you get output at least every 2s.

【Comments】:
