【问题标题】:Missing characters while reading input with threads使用线程读取输入时缺少字符
【发布时间】:2017-02-28 01:27:01
【问题描述】:

假设我们有一个脚本,它打开一个文件,然后逐行读取它并将该行打印到终端。我们有一个单线程和一个多线程版本。

问题在于两个脚本的结果输出几乎相同,但不完全相同。在多线程版本中,大约有十行错过了前 2 个字符。我的意思是,如果真正的线是“Stackoverflow Rocks”线,我会得到“ackoverflow Rocks”。

我认为这与某些竞争条件有关,因为如果我调整参数以创建大量的小工人,我会比使用更少和更大的工人得到更多的错误。

单线程是这样的:

$file = "some/file.txt";
open (INPUT, $file) or die "Error: $!\n";

while ($line = <STDIN>) {
    print $line;
}

多线程版本使用线程队列,此实现基于@ikegami 方法:

use threads            qw( async );
use Thread::Queue 3.01 qw( );

use constant NUM_WORKERS    => 4;
use constant WORK_UNIT_SIZE => 100000;

sub worker {
    my ($job) = @_;
    for (@$job) {
        print $_;
    }
}

my $q = Thread::Queue->new();


async { while (defined( my $job = $q->dequeue() )) { worker($job); } }
    for 1..NUM_WORKERS;

my $done = 0;    

while (!$done) {
    my @lines;

    while (@lines < WORK_UNIT_SIZE) {
        my $line = <>;
        if (!defined($line)) {
            $done = 1;
            last;
        }

    push @lines, $line;
}

$q->enqueue(\@lines) if @lines;
}

$q->end();
$_->join for threads->list;

【问题讨论】:

  • 线程中的打印在没有任何同步的情况下命中目标,有时会重叠。您不能(可靠地)从多个线程/进程打印到同一个地方。存储每个线程的结果(比如在一个数组中)并从父线程打印,或者从每个线程写入一个单独的文件,然后将它们连接起来。
  • 感谢您的回复。加强我的研究,我发现了 Thread::Semaphore 库。我不知道这有多可靠,但它似乎正在工作。你知道使用这个@zdim 有什么注意事项吗?
  • 就我所知,它是一个标准工具,我不知道有什么特别的注意事项。 (一般来说,请尊重适当使用 Perl 的解释器线程的正常警告——所以要记住它们并不完全是轻量级的。使用 Thread::Queue 支持这一点,但并不适合我们。)我会说这个问题是关于你在你的应用程序中做了什么。您询问的印刷品显然是实际工作的占位符。什么样的工作?它需要多少沟通?等等。这将决定你是否需要信号量(或者你可能需要或不需要的其他东西)。
  • 我正在读取输入行并从中过滤一些单词(在这里简化问题)。然后我应该将结果行写入一个文件,以便下一个进程可以读取它并使用它们进行新的工作。 @zdim
  • 而且输出不再有序也没关系?

标签: multithreading perl file stdin


【解决方案1】:

我尝试了您的程序并得到了类似(错误)的结果。我在print 周围使用threads::shared 中的lock 而不是Thread::Semaphore,因为它比T::S 更易于使用,即:

use threads;
use threads::shared;
...
my $mtx : shared;

sub worker
{
    my ($job) = @_;
    for (@$job) {
        lock($mtx); # (b)locks
        print $_;
                    # autom. unlocked here
    }
}
...

全局变量$mtx 用作互斥体。它的价值无关紧要,即使undef(如这里)也可以。 对lock 的调用仅在当前没有其他线程持有该变量的锁时才会阻塞并返回。 当它超出范围时,它会自动解锁(从而使lock 返回)。在这个发生的示例中 在for 循环的每一次迭代之后;不需要额外的{…} 块。

现在我们已经同步了print 调用……

但是这也不起作用,因为print 确实缓冲了 I/O(嗯,只有 O)。所以我强制无缓冲输出:

use threads;
use threads::shared;
...
my $mtx : shared;
$| = 1;  # force unbuffered output

sub worker
{
    # as above
}
...

然后它起作用了。令我惊讶的是,我可以删除lock,它仍然有效。也许是偶然的。请注意,如果没有缓冲,您的脚本运行速度会明显变慢。

我的结论是:你是suffering from buffering

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2015-05-09
    • 2015-07-12
    • 2021-12-21
    • 2014-03-26
    • 2016-02-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多