【问题标题】:Concurrent writing upon a standard OutputStream在标准 OutputStream 上并发写入
【发布时间】:2012-11-01 14:14:49
【问题描述】:

我正在编写一个应用程序,该应用程序涉及将大量数据写入 OutputStream(属于 Socket)。使这有点复杂的是,通常有多个线程试图写入同一个 OutputStream。目前,我对其进行了设计,以便将数据写入其中的 OutputStream 位于其自己的线程中。该线程包含一个队列(LinkedList),它轮询字节数组并尽快将它们写入。

private class OutputStreamWriter implements Runnable {

    private final LinkedList<byte[]> chunkQueue = new LinkedList<byte[]>();

    public void run() {
        OutputStream outputStream = User.this.outputStream;
        while (true) {
            try {
                if (chunkQueue.isEmpty()) {
                    Thread.sleep(100);
                    continue;
                }
                outputStream.write(chunkQueue.poll());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

这种设计的问题在于,随着越来越多的写入发生,越来越多的数据排队,并且写入速度没有任何提高。最初,当数据被放入队列时,它实际上是立即写入的。然后大约15秒左右后,数据开始滞后;从数据排队到数据实际写入的时间会有延迟。随着时间的推移,这种延迟变得越来越长。它非常引人注目。

解决此问题的一种方法是某种 ConcurrentOutputStream 实现,它允许在不阻塞的情况下发送数据,这样就不会开始备份写入(哎呀,那时就不需要队列了)。我不知道是否有这样的实现——我一直找不到——而且我个人认为甚至不可能写一个。

那么,有人对我如何重新设计这个有什么建议吗?

【问题讨论】:

  • 这不是很有建设性。它有什么问题?
  • 顺便说一句,您是否正在同步对链表的修改?因为它在设计上不是线程安全的。另外,你在套接字输出之上分层了什么样的输出流,你通过它推送了多少数据?

标签: java sockets queue outputstream


【解决方案1】:

socket的吞吐量是有限的;如果它比您的数据生成吞吐量慢,则必须缓冲数据,这是没有办法的。 “并发”写作根本没有帮助。

您可以考虑在排队数据超过一定限制时暂停数据生成,以减少内存消耗。

【讨论】:

  • 我只是在这里扔东西,但是,SocketChannel 呢?
  • 我认为这不会有帮助。瓶颈是网络带宽。
【解决方案2】:

我同意@irreputable 的观点,即并发写作毫无帮助。相反,您应该关注生产方面,即您已经拥有的。

  1. 使用 BlockingQueue 而不是 LinkedList。

  2. 使用队列的阻塞轮询操作,而不是仅仅盲目睡眠 100msl,根据定义,这将平均浪费 50% 的时间。在很长一段时间内,真的可以加起来。

【讨论】:

    【解决方案3】:

    我需要一个过滤器来拦截慢速连接,我需要尽快关闭数据库连接,所以我最初使用 Java 管道,但是当仔细观察它们的实现时,它都是同步的,所以我最终使用一个小缓冲区创建了自己的 QueueInputStream 和阻塞队列将缓冲区放入队列中一次已满,它是无锁的,除非 LinkedBlockingQueue 使用的锁定条件在小缓冲区的帮助下应该很便宜,此类仅用于单个每个实例的生产者和消费者,您应该传递一个 ExecutorService 以开始将排队的字节流式传输到最终的 OutputStream:

    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.concurrent.*;
    
    public class QueueOutputStream extends OutputStream
    {
      private static final int DEFAULT_BUFFER_SIZE=1024;
      private static final byte[] END_SIGNAL=new byte[]{};
    
      private final BlockingQueue<byte[]> queue=new LinkedBlockingDeque<>();
      private final byte[] buffer;
    
      private boolean closed=false;
      private int count=0;
    
      public QueueOutputStream()
      {
        this(DEFAULT_BUFFER_SIZE);
      }
    
      public QueueOutputStream(final int bufferSize)
      {
        if(bufferSize<=0){
          throw new IllegalArgumentException("Buffer size <= 0");
        }
        this.buffer=new byte[bufferSize];
      }
    
      private synchronized void flushBuffer()
      {
        if(count>0){
          final byte[] copy=new byte[count];
          System.arraycopy(buffer,0,copy,0,count);
          queue.offer(copy);
          count=0;
        }
      }
    
      @Override
      public synchronized void write(final int b) throws IOException
      {
        if(closed){
          throw new IllegalStateException("Stream is closed");
        }
        if(count>=buffer.length){
          flushBuffer();
        }
        buffer[count++]=(byte)b;
      }
    
      @Override
      public synchronized void write(final byte[] b, final int off, final int len) throws IOException
      {
        super.write(b,off,len);
      }
    
      @Override
      public synchronized void close() throws IOException
      {
        flushBuffer();
        queue.offer(END_SIGNAL);
        closed=true;
      }
    
      public Future<Void> asyncSendToOutputStream(final ExecutorService executor, final OutputStream outputStream)
      {
        return executor.submit(
                new Callable<Void>()
                {
                  @Override
                  public Void call() throws Exception
                  {
                    try{
                      byte[] buffer=queue.take();
                      while(buffer!=END_SIGNAL){
                        outputStream.write(buffer);
                        buffer=queue.take();
                      }
                      outputStream.flush();
                    } catch(Exception e){
                      close();
                      throw e;
                    } finally{
                      outputStream.close();
                    }
                    return null;
                  }
                }
        );
      }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-03-20
      • 1970-01-01
      • 2012-01-25
      • 1970-01-01
      • 1970-01-01
      • 2018-04-07
      • 1970-01-01
      • 2015-01-31
      相关资源
      最近更新 更多