【问题标题】:Close a socket when all pending messages have been sent发送完所有未决消息后关闭套接字
【发布时间】:2010-11-09 15:22:03
【问题描述】:

我有这种方法可以将消息写入套接字:

public void sendMessage(byte[] msgB) {
    try {
        synchronized (writeLock) {
            log.debug("Sending message (" + msgB.length + "): " + HexBytes.toHex(msgB));
            ous.write(HEADER_MSG);
            ous.writeInt(msgB.length);
            ous.write(msgB);
            ous.flush();
        }
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

现在一个名为 Bob 的线程想在某个不确定的时刻 X 关闭套接字,这意味着可能仍有线程在等待writeLock 发送它们的消息,甚至可能有一个线程正在编写它。

我可以通过让 Bob 在关闭套接字之前获取 writeLock 来解决后者,但我仍然可能会丢失尚未开始发送的消息,因为据我所知 synchronized 是不公平,Bob 可以在其他等待更长时间的线程之前获得锁。

我需要的是,在 X 之前对sendMessage 的所有调用都能正常工作,而在 X 之后的调用会引发错误。我该怎么做?

  • 说明:Bob 是从套接字输入流中读取的线程,X 是在该流上收到“关闭”消息时。

【问题讨论】:

    标签: java multithreading sockets networking concurrency


    【解决方案1】:

    您可以在此处使用执行器。由于每个发送消息都是同步的(我假设在一个公共共享对象上),您可以使用线程限制。

    static final ExecutorService executor = Executors.newSingleThreadExecutor();
    
    public void sendMessage(byte[] msgB) {
        executor.submit(new Runnable() {
            public void run() {
                try {
                    log.debug("Sending message (" + msgB.length + "): " + HexBytes.toHex(msgB));
                    ous.write(HEADER_MSG);
                    ous.writeInt(msgB.length);
                    ous.write(msgB);
                    ous.flush();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        });
    }
    public static void atMomentX(){
        executor.shutdown();
    }
    

    完成后另一个线程可以调用 atMomentX();

    从 javadoc 中,关闭方法说:

    启动有序关闭,其中 之前提交的任务是 执行,但不会有新任务 公认。调用没有 如果已经关闭,则附加效果 下来。

    【讨论】:

    • 我使用了自己的方法,因为我不想将另一个线程添加到已经有数百个服务器系统的服务器系统中,但无论如何都赞成,因为它在技术上是正确的方法。
    【解决方案2】:

    考虑使用单线程ExecutorService 来执行消息的写入。发送线程只需尝试通过调用execute(Runnable)submit(Callable) 来“发送”他们的消息。一旦您希望停止发送消息,您将关闭 ExecutorService (shutdown()) 导致后续调用提交/执行以产生 RejectedExecutionException

    这种方法的优点是您只有一个 I/O 绑定线程,并且与多个线程等待自己写入消息相比,锁争用更少。这也是更好的关注点分离。

    这里有一个简单的例子,它可以更深入地说明问题:

    public interface Message {
      /**
       * Writes the message to the specified stream.
       */
      void writeTo(OutputStream os);
    }
    
    public class Dispatcher {
      private final ExecutorService sender;
      private final OutputStream out;
    
      public Dispatcher() {
        this.sender = Executors.newSingleThreadExecutor();
        this.out = ...; // Set up output stream.
      }
    
      /**
       * Enqueue message to be sent.  Return a Future to allow calling thread
       * to perform a blocking get() if they wish to perform a synchronous send.
       */
      public Future<?> sendMessage(final Message msg) {
        return sender.submit(new Callable<Void>() {
          public Void call() throws Exception {
            msg.writeTo(out);
            return null;
          }
        });
      }
    
      public void shutDown() {
        sender.shutdown(); // Waits for all tasks to finish sending.
    
        // Close quietly, swallow exception.
        try {
          out.close();
        } catch (IOException ex) {
        }
      }
    }
    

    【讨论】:

    • 我使用了自己的方法,因为我不想将另一个线程添加到已经有数百个服务器系统的服务器系统中,但无论如何都赞成,因为它在技术上是正确的方法。
    【解决方案3】:

    我想我可以将同步块替换为 ReentrantLock 设置为公平。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-01-09
      • 1970-01-01
      • 2011-05-17
      • 2019-09-20
      • 1970-01-01
      • 2022-11-11
      相关资源
      最近更新 更多