【问题标题】:Java NIO Pipe vs BlockingQueueJava NIO Pipe vs BlockingQueue
【发布时间】:2012-03-26 03:01:35
【问题描述】:

我刚刚发现它只有一个 NIO 工具,Java NIO Pipe,专为在线程之间传递数据而设计。与通过队列传递更传统的消息(例如 ArrayBlockingQueue)相比,使用这种机制有什么优势吗?

【问题讨论】:

  • 管道通过内核,很少有用,因为选择器有唤醒功能......这是通过 linux 上的管道实现的......
  • @bestsss 需要详细说明吗?你可以用选择器注册管道来接收通知,有什么问题?
  • @raffian,简而言之 - 你不能真正将 Pipes 用于 IPC,在这个过程中,有很多更有效的方式来传递信息。

标签: java nio


【解决方案1】:

通常,将数据传递给另一个线程进行处理的最简单方法是使用 ExecutorService。这包含了一个队列和一个线程池(可以有一个线程)

当你有一个支持 NIO 通道的库时,你可以使用管道。如果您想在线程之间传递数据的 ByteBuffers,它也很有用。

否则,使用 ArrayBlockingQueue 通常更简单/更快。

如果您想要一种更快的方式在线程之间交换数据,我建议您查看Exchanger,但它不像 ArrayBlockingQueue 那样通用。

The Exchanger and GC-less Java

【讨论】:

  • 谢谢,我从来没有考虑过使用 Exchanger 可以最大限度地减少 GC 开销。然而,交换器的缺点是它是同步的。通常您只想将数据泵入另一个线程,而不必等待它被拾取。
  • 管道是固定大小的。如果生产者生产速度过快,问题也是一样的,它必须停止。如果生产者在消费者完成之前从未填充缓冲区,则不必停止(在任何一种情况下)
  • 管道用于实现 Selector.wakeup,除此之外它们不是很有用,因为仅内存的解决方案更有效且不经过内核。
  • @Maxaon3000 从技术上讲,Exchanger 并不是真正同步的。它可用于实现双缓冲解决方案。交换的每一方都可以并行处理它们的缓冲区,但它们确实会在交换点阻塞。如果生产者或消费者总是更快,那么 Exchanger 很可能是更好的选择。如果生产者和消费者的性能不同,那么使用队列可以让另一端继续并赶上并有效地平衡延迟。最后,最好将这些选择基于经验数据,而不是猜测最佳解决方案。
【解决方案2】:

我相信 NIO Pipe 的设计目的是让您可以以线程安全的方式将数据发送到选择器循环内的通道,换句话说,任何线程都可以写入管道,并且数据将在另一个极端中处理管道,在选择器循环内。当您写入管道时,您会使另一端的通道可读。

【讨论】:

  • 我想知道使用选择器循环而不是简单的队列轮询在线程之间传递数据的性能特征。此外,通过管道传递数据似乎带来了必须将字节而不是对象传递给其他线程的不便。换句话说,它迫使您开发用于线程间数据交换的有线协议。
  • 您的意思是通过 ConcurrentLinkedQueue,对吗?这是一个很好的问题。我将筹码押在 ConcurrentLinkedQueue 上。 :) 但我看到管道的一个优点是:您可以像其他人一样发送消息,换句话说,您从通道读取而不是从队列中获取对象。
【解决方案3】:

所以在管道(check here)遇到很多麻烦之后,我决定支持非阻塞并发队列而不是 NIO 管道。所以我对 Java 的 ConcurrentLinkedQueue 做了一些基准测试。见下文:

public static void main(String[] args) throws Exception {

    ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();

    // first test nothing:

    for (int j = 0; j < 20; j++) {

        Benchmarker bench = new Benchmarker();

        String s = "asd";

        for (int i = 0; i < 1000000; i++) {
            bench.mark();
            // s = queue.poll();
            bench.measure();
        }

        System.out.println(bench.results());

        Thread.sleep(100);
    }

    System.out.println();

    // first test empty queue:

    for (int j = 0; j < 20; j++) {

        Benchmarker bench = new Benchmarker();

        String s = "asd";

        for (int i = 0; i < 1000000; i++) {
            bench.mark();
            s = queue.poll();
            bench.measure();
        }

        System.out.println(bench.results());

        Thread.sleep(100);
    }

    System.out.println();

    // now test polling one element on a queue with size one

    for (int j = 0; j < 20; j++) {

        Benchmarker bench = new Benchmarker();

        String s = "asd";
        String x = "pela";

        for (int i = 0; i < 1000000; i++) {
            queue.offer(x);
            bench.mark();
            s = queue.poll();
            bench.measure();
            if (s != x) throw new Exception("bad!");
        }

        System.out.println(bench.results());

        Thread.sleep(100);
    }

    System.out.println();

    // now test polling one element on a queue with size two

    for (int j = 0; j < 20; j++) {

        Benchmarker bench = new Benchmarker();

        String s = "asd";
        String x = "pela";

        for (int i = 0; i < 1000000; i++) {
            queue.offer(x);
            queue.offer(x);
            bench.mark();
            s = queue.poll();
            bench.measure();
            if (s != x) throw new Exception("bad!");
            queue.poll();
        }

        System.out.println(bench.results());

        Thread.sleep(100);
    }
}

结果:

totalLogs=1000000, minTime=0, maxTime=85000, avgTime=58.61 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=5281000, avgTime=63.35 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=725000, avgTime=59.71 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=25000, avgTime=58.13 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=378000, avgTime=58.45 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=15000, avgTime=57.71 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=170000, avgTime=58.11 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=1495000, avgTime=59.87 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=232000, avgTime=63.0 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=184000, avgTime=57.89 (times in nanos)

totalLogs=1000000, minTime=0, maxTime=2600000, avgTime=65.22 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=850000, avgTime=60.5 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=150000, avgTime=63.83 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=43000, avgTime=59.75 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=276000, avgTime=60.02 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=457000, avgTime=61.69 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=204000, avgTime=60.44 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=154000, avgTime=63.67 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=355000, avgTime=60.75 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=338000, avgTime=60.44 (times in nanos)

totalLogs=1000000, minTime=0, maxTime=345000, avgTime=110.93 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=396000, avgTime=100.32 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=298000, avgTime=98.93 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=1891000, avgTime=101.9 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=254000, avgTime=103.06 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=1894000, avgTime=100.97 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=230000, avgTime=99.21 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=348000, avgTime=99.63 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=922000, avgTime=99.53 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=168000, avgTime=99.12 (times in nanos)

totalLogs=1000000, minTime=0, maxTime=686000, avgTime=107.41 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=320000, avgTime=95.58 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=248000, avgTime=94.94 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=217000, avgTime=95.01 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=159000, avgTime=93.62 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=155000, avgTime=95.28 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=106000, avgTime=98.57 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=370000, avgTime=95.01 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=1836000, avgTime=96.21 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=212000, avgTime=98.62 (times in nanos)

结论:

maxTime 可能很吓人,但我认为可以安全地得出结论,轮询并发队列的时间在 50 纳秒范围内。

【讨论】:

    【解决方案4】:

    我想管道会有更好的延迟,因为它很可能在幕后使用协程来实现。因此,生产者在数据可用时立即让步给消费者,而不是在线程调度器决定时。

    管道通常代表消费者-生产者问题,并且很可能以这种方式实现,以便两个线程协作并且不会被外部抢占。

    【讨论】:

      猜你喜欢
      • 2016-10-03
      • 2012-05-28
      • 2013-10-09
      • 1970-01-01
      • 2010-11-13
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多