【问题标题】:Java Threading within TCP and server sockets [closed]TCP和服务器套接字中的Java线程[关闭]
【发布时间】:2013-03-27 18:11:19
【问题描述】:

我写了以下两行

ServerSocket mobCom = new ServerSocket(9846);
Socket server = mobCom.accept();

我希望创建一个新的 TCP 连接并由一个新线程处理该连接。例如上面的代码创建了一个服务器套接字。并且有多个客户。每当客户端连接到服务器时,可能会创建一个新线程来满足来自该特定客户端的请求。我该如何实现。

编辑

我还想将线程池限制为 10 个用户。如果出现更多用户,我想向他们发送错误消息,而不处理他们的进一步请求。

【问题讨论】:

  • 网络上有大约一百万个这样的例子......
  • this answer

标签: java multithreading sockets serversocket


【解决方案1】:

您可以使用 java util concurrent 的 SynchronousQueue 来实现所需的结果。 创建固定数量的工人。使用 take 调用启动对 SynchronousQueue 的块读取。因此,如果所有工作人员各自完成一项工作并忙于处理它们(与套接字通信),则不会从 SynchronousQueue 读取任何内容,因此对同步队列的 offer 将失败。检查此故障(这意味着所有固定数量的工作人员都忙,现在没有人锁定队列),拒绝下一个请求。

以下行中的示例代码 [未经测试 - 为简洁起见避免异常,请根据您的需要进行修改]。

public class BoundedServer 
{
    public static void main(String[] args) 
    {
        /**
         * Port to serve
         */
        final int port = 2013;

        /**
         * Max Workers
         */
        final int maxworkers = 10; 

        /**
         * The server socket.
         */
        ServerSocket mServerSocket = null;

        /**
         * Queue of work units to process if there is a worker available.
         */
        final SynchronousQueue<WorkUnit> mQueueToProcess = new SynchronousQueue<WorkUnit>();

        /**
         * Queue of work units to reject if there is no current worker available.
         */
        final LinkedBlockingQueue<WorkUnit> mQueueToReject = new LinkedBlockingQueue<WorkUnit>(); 

        /**
         * A thread pool to handle the work.
         */
        final ExecutorService communicationservice = Executors.newFixedThreadPool(maxworkers);

        /**
         * Let a single thread take care of rejecting the requests when needed to do so.
         */
        final ExecutorService rejectionservice = Executors.newSingleThreadExecutor();

        try 
        {
            Runnable communicationlauncher = new Runnable() 
            {
                public void run() 
                {
                    try
                    {
                        /**
                         * Set of workers to handle the work.
                         */
                        final CommunicationWorker[] workers = new CommunicationWorker[maxworkers];

                        communicationservice.invokeAll(Arrays.asList(workers));
                    }
                    finally
                    {
                        communicationservice.shutdown();
                    }
                }
            };

            new Thread(communicationlauncher).start();

            Runnable rejectionlauncher = new Runnable() 
            {
                public void run() 
                {
                    try
                    {
                        RejectionWorker rejectionworker = new RejectionWorker(mQueueToReject);

                        rejectionservice.submit(rejectionworker);
                    }
                    finally
                    {
                        rejectionservice.shutdown();
                    }
                }
            };
            new Thread(rejectionlauncher).start();

            mServerSocket = new ServerSocket(port);

            while(true)
            {
                WorkUnit work = new WorkUnit(mServerSocket.accept());

                if(!mQueueToProcess.offer(work))
                {
                    mQueueToReject.add(work);
                }
            }
        } 
        finally
        {
            try
            {
                mServerSocket.close();
            }
        }
    }
}


public class WorkUnit 
{
    private Socket mSocket = null;

    public WorkUnit(Socket socket) 
    {
        super();
        this.setSocket(socket);
    }

    public Socket getSocket() {
        return mSocket;
    }

    public void setSocket(Socket mSocket) {
        this.mSocket = mSocket;
    }
}

public class CommunicationWorker 
implements Callable<Boolean> 
{
    private SynchronousQueue<WorkUnit> mQueueToProcess;

    public CommunicationWorker(SynchronousQueue<WorkUnit> queueToProcess) 
    {
        super();
        this.mQueueToProcess = queueToProcess;
    }

    @Override
    public Boolean call() throws Exception 
    {
        while(true)
        {
            WorkUnit work = mQueueToProcess.take();

            Socket socket = work.getSocket();

            // Code to handle socket communication and closure.
            // Once the communication is finished, this thread will get blocked to mQueueToProcess.
        }
    }
}


public class RejectionWorker 
implements Callable<Boolean> 
{
    private LinkedBlockingQueue<WorkUnit> mQueueToReject;

    public RejectionWorker(LinkedBlockingQueue<WorkUnit> queueToReject) 
    {
        super();
        this.mQueueToReject = queueToReject;
    }

    @Override
    public Boolean call() throws Exception 
    {
        while(true)
        {
            WorkUnit work = mQueueToReject.take();

            Socket socket = work.getSocket();

            // Code to reject the request.
        }
    }
}

【讨论】:

    【解决方案2】:

    您必须执行以下操作。 ServiceThread 是线程将服务请求。

     while (true) {
                  try {
                      Socket clientSocket = null;
                      if (null != serverSocket) {
                        clientSocket = serverSocket.accept();
                        ServiceThread serverThread = new ServiceThread(clientSocket); // Create a new thread for each client
                        serverThread.start();
                      }
                  }  catch( Exception ex ) {
                      System.out.println("Exception while accepting connection " + ex.getMessage());
                      ex.printStackTrace();
                  }
    

    【讨论】:

    • 如果serverSocket 为空,此代码将占用 CPU。
    猜你喜欢
    • 1970-01-01
    • 2015-01-21
    • 2013-05-15
    • 2020-09-17
    • 2017-03-06
    • 2015-10-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多