【问题标题】:Share common data between two threads serving a Socket connection在服务于 Socket 连接的两个线程之间共享公共数据
【发布时间】:2012-06-12 05:13:17
【问题描述】:

我在 SO 上看到了很多类似的问题,但图片中几乎没有一个有 Socket。所以请花点时间阅读这个问题。

我有监听请求的服务器应用程序(使用ServerSocket),当客户端尝试连接时,会创建新线程来为客户端提供服务(并且服务器返回到新请求的监听模式)。现在,我需要根据其他客户端发送到服务器的内容来响应一个客户端。

示例:

  • ServerSocket 监听传入连接。
  • 客户端 A 连接,创建新线程为 A 服务。
  • 客户端 B 连接,创建新线程为 B 服务。
  • A 向服务器发送消息“Hello from A”。
  • 将此消息作为响应发送给客户端 B。

我对整个“线程间通信”这件事很陌生。显然,上面提到的情况听起来很简单,但我描述这个是为了得到一个提示,因为我将在客户端之间交换大量数据,保持服务器为中间。

另外,如果我想将共享对象限制为(例如 10 个)特定客户端,该怎么办?这样,当第 11 个客户端连接到服务器时,我创建了新的共享对象,该对象将用于在第 11、12、13 ......最多 20 个客户端之间交换数据。以此类推,每组 10 个客户端。

我尝试了什么:(我猜是愚蠢的)

  • 我有一个public 类,该对象应该共享为public static,这样我就可以在不实例化它的情况下将它用作全局对象,例如MyGlobalClass.SharedMsg
  • 这不起作用,我无法将在一个线程中接收到的数据发送到另一个线程。

我知道有一个明显的锁定问题,因为如果一个线程正在写入一个对象,其他线程在第一个线程完成写入之前无法访问它。

那么解决这个问题的理想方法是什么?

更新

由于我创建线程来处理传入连接请求的方式,我无法理解如何在线程之间共享相同的对象,因为使用上面提到的全局对象不起作用。

以下是我如何监听传入连接并动态创建服务线程。

// Method of server class
public void startServer()
{
    if (!isRunning)
    {
        try
        {
            isRunning = true;
            while (isRunning)
            {
                try
                {
                    new ClientHandler(mysocketserver.accept()).start();
                }
                catch (SocketTimeoutException ex)
                {
                    //nothing to perform here, go back again to listening.
                }
                catch (SocketException ex)
                {
                    //Not to handle, since I'll stop the server using SocketServer's close() method, and its going to throw SocketException anyway.
                }
            }
        }
        catch (Exception ex)
        {
            ex.printStackTrace();
        }
    }
    else
        System.out.println("Server Already Started!");
}

还有ClientHandler 类。

public class ClientHandler extends Thread
{
    private Socket client = null;
    private ObjectInputStream in = null;
    private ObjectOutputStream out = null;

    public ClientHandler(Socket client)
    {
        super("ClientHandler");
        this.client = client;
    }

    //This run() is common for every Client that connects, and that's where the problem is.
    public void run()
    {
        try
        {
            in = new ObjectInputStream(client.getInputStream());
            out = new ObjectOutputStream(client.getOutputStream());

            //Message received from this thread.
            String msg = in.readObject().toString();
            System.out.println("Client @ "+ client.getInetAddress().getHostAddress() +" Says : "+msg);


            //Response to this client.
            out.writeObject("Message Received");

            out.close();
            in.close();
            client.close();
        }
        catch (Exception ex)
        {
            ex.printStackTrace();
        }
    }
}

我相信我创建动态线程来服务每个连接的客户端的方式,使用全局对象共享相同的数据源是不可能的,因为上面的run() 的主体对于每个连接的客户端都是完全相同的,因此同样的方法既是消费者也是生产者。我应该进行哪些修复,以便我可以为每个连接创建动态线程并仍然共享同一个对象。

【问题讨论】:

    标签: java multithreading sockets


    【解决方案1】:

    您可能需要一个队列用于每个客户端之间的通信。每个队列都是数据从一个客户端推送到另一个客户端的“管道”。

    你会这样使用它(伪代码):

    Thread 1:
    Receive request from Client A, with message for Client B
    Put message on back of concurrent Queue A2B
    Respond to Client A.
    
    Thread 2:
    Receive request from Client B.
    Pop message from front of Queue A2B
    Respond to Client B with message.
    

    您可能还希望它是通用的,因此您有一个 AllToB 队列,许多客户端(因此也有许多线程)可以写入。

    注意类别:ConcurrentLinkedQueueArrayBlockingQueue

    如果你想限制消息的数量,那么 ArrayBlockingQueue 及其容量构造函数允许你这样做。如果不需要阻塞功能,可以使用offerpoll 方法,而不是puttake

    我不会担心共享队列,这会使问题变得更加复杂。仅当您知道需要解决内存使用问题时才执行此操作。

    编辑:根据您的更新:

    如果您需要在所有动态创建的实例之间共享一个实例,您可以:

    1. 制作一个静态实例。
    2. 将其传递给构造函数。

    示例 1:

    public class ClientHandler extends Thread
    {
      public static final Map<ClientHandler, BlockingQueue<String>> messageQueues 
        = new ConcurrentHashMap<>();
    
      <snip>
    
      public ClientHandler(Socket client)
      {
        super("ClientHandler");
        this.client = client;
        // Note: Bad practice to reference 'this' in a constructor.
        // This can throw an error based on what the put method does.
        // As such, if you are to do this, put it at the end of the method.
        messageQueues.put(this, new ArrayBlockingQueue<>());
      }
    
      // You can now access this in the run() method like so:
      // Get messages for the current client.
      // messageQueues.get(this).poll();
      // Send messages to the thread for another client.
      // messageQueues.get(someClient).offer(message);
    

    几点说明:

    • messageQueues 对象实际上应该包含某种客户端标识符,而不是短暂的对象引用。
    • 更可测试的设计会将 messageQueues 对象传递给构造函数以允许模拟。
    • 我可能会建议对地图使用包装类,这样您就可以使用 2 个参数调用 offer,而不必担心地图语义。

    【讨论】:

    • 链接到任何使用示例?请注意,客户端会将其消息作为 JSON 对象发送,所以我可以将接收到的消息(实际为 JSONObject 或 JSON 格式的 String)保留在此 ConcurrentLinkedQueue 对象中以供进一步处理?
    • 试试这些例子:ConcurrentLinkedQueueArrayBlockingQueue
    • 为什么不将连接存储在“全局”集合中?收到消息后,通过集合循环并将您需要的内容发送给需要它的人。无论如何,您也可以尝试使用带有事件驱动的 Java NIO 的套接字进行探索,并且似乎适合这种情况 - 在收到消息时做一些事情。
    猜你喜欢
    • 2020-02-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-03-03
    • 2015-09-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多