【问题标题】:Unit testing a Java multi-threaded network app对 Java 多线程网络应用程序进行单元测试
【发布时间】:2009-09-27 16:18:38
【问题描述】:

我正在编写一个 Java 多线程网络应用程序,并且很难想出一种方法来对从网络客户端发送和接收通信的对象进行单元测试。

对象向多个客户端发送消息,然后等待客户端的响应。

随着每个客户端的响应,仪表板样式的 GUI 会更新。

更详细...

Message 对象表示要发送的文本消息,并包含应接收消息的客户端数组。

Message 对象负责将自己分派给所有适当的客户端。

当对 Message 对象调用 dispatch() 方法时,该对象会为 Client 数组中的每个客户端生成一个新线程 (MessageDispatcher)。

每个 MessageDispatcher:

  • 向客户端打开一个新的 TCP 套接字(Socket)

  • 将消息传递给其客户端... PrintWriter out.println(msg text)

  • 创建一个“状态”对象,该对象被传递到消息对象中的队列,然后传递到 GUI。

每个状态对象代表以下事件之一:

  • 消息传递给 Socket(通过 Printwriter out.println())

  • 显示从客户端收到的回执(通过 BufferedReader/InputStreamReader in.readline()...阻塞直到收到网络输入

  • 从客户端收到用户确认回执(通过与上述相同的方法)

所以.. 我想对 Message 对象进行单元测试。 (使用 JUnit)

单元测试称为 MessageTest.java(包含在下面)。

我的第一步是设置一个包含单个收件人的 Message 对象。

然后我使用 JMockit 创建了一个模拟 Socket 对象,该对象可以向 PrintWriter 提供一个模拟 OutputStream 对象(我使用的是扩展 OutputStream 的 ByteArrayOutputStream)。

然后,当 MessageDispatcher 调用 (PrintWriter object).out 时,消息文本将理想地传递给我的模拟 Socket 对象(通过模拟 OutputStream),它可以检查消息文本是否正常。

还有 InputStreamReader 的示例原理.... 模拟 Socket 对象还提供了一个模拟 InputStreamReader 对象,该对象提供了一个由 MessageDispatcher 调用的模拟 BufferedReader(如前所述,MessageDispatcher 在 in.readLine() 上阻塞)。此时,模拟 BufferedReader 应该向 MessageDispatcher 提供虚假确认...

// mock Socket
Mockit.redefineMethods(Socket.class, new Object()
{

    ByteArrayOutputStream output = new ByteArrayOutputStream();
    ByteArrayInputStream input = new ByteArrayInputStream();

    public OutputStream getOutputStream()
    {
        return output;
    }

    public InputStream getInputStream()
    {
        return input;
    }

});

如果这不是多线程的,这应该可以正常工作。但是我不知道如何使用多个线程来做到这一点。任何人都可以给我任何建议或提示吗?

此外,如果您对设计有任何意见(例如,消息对象负责其自己的交付,而不是单独的交付对象。“依赖注入”风格/每个客户端交付的单独线程),那么我会感兴趣也听到了。

更新:这里是代码:

消息.java

public class Message {

    Client[] to;

    String contents;

    String status;

    StatusListener listener;

    BlockingQueue<Status> statusQ;

    public Message(Client[] to, String contents, StatusListener listener) 
    {
        this.to = to;
        this.contents = contents;
        this.listener = listener;
    }

    public void dispatch()
    {
        try {

            // open a new thread for each client

            // keep a linked list of socket references so that all threads can be closed
            List<Socket> sockets = Collections.synchronizedList(new ArrayList<Socket>());

            // initialise the statusQ for threads to report message status
            statusQ = new ArrayBlockingQueue<Status>(to.length*3); // max 3 status objects per thread

            // dispatch to each client individually and wait for confirmation
            for (int i=0; i < to.length; i++) {

            System.out.println("Started new thread");

            (new Thread(new MessageDispatcher(to[i], contents, sockets, statusQ))).start();

            }

            // now, monitor queue and empty the queue as it fills up.. (consumer)
            while (true) {
                listener.updateStatus(statusQ.take());
            }
        }

        catch (Exception e) { e.printStackTrace(); }

    }

    // one MessageDispatcher per client
    private class MessageDispatcher implements Runnable
    {

        private Client client;
        private String contents;
        private List<Socket> sockets;
        private BlockingQueue<Status> statusQ;

        public MessageDispatcher(Client client, String contents, List<Socket> sockets, BlockingQueue<Status> statusQ) {

            this.contents = contents;

            this.client = client;

            this.sockets = sockets;

            this.statusQ = statusQ;

        }

        public void run() {

        try {

            // open socket to client
            Socket sk = new Socket(client.getAddress(), CLIENTPORT);

            // add reference to socket to list
            synchronized(sockets) {
                sockets.add(sk);
            }

            PrintWriter out = new PrintWriter(sk.getOutputStream(), true);

            BufferedReader in = new BufferedReader(new InputStreamReader(sk.getInputStream()));

            // send message
            out.println(contents);

            // confirm dispatch
            statusQ.add(new Status(client, "DISPATCHED"));

            // wait for display receipt
            in.readLine();

            statusQ.add(new Status(client, "DISPLAYED"));

            // wait for read receipt
            in.readLine();

            statusQ.add(new Status(client, "READ"));

            }

            catch (Exception e) { e.printStackTrace(); }
        }

    }

}

....以及相应的单元测试:

MessageTest.java

public class MessageTest extends TestCase {

    Message msg;

    static final String testContents = "hello there";

    public void setUp() {

        // mock Socket
        Mockit.redefineMethods(Socket.class, new Object()
        {

            ByteArrayOutputStream output = new ByteArrayOutputStream();
            ByteArrayInputStream input = new ByteArrayInputStream();

            public OutputStream getOutputStream()
            {
                return output;
            }

            public InputStream getInputStream()
            {
                return input;
            }


        });

        // NB
        // some code removed here for simplicity
        // which uses JMockit to overrides the Client object and give it a fake hostname and address

        Client[] testClient = { new Client() };

        msg = new Message(testClient, testContents, this);

    }

    public void tearDown() {
    }

    public void testDispatch() {

        // dispatch to client
        msg.dispatch();


    }   
}

【问题讨论】:

  • 您能否发布一些您正在使用的代码并告诉我们为什么该代码不起作用?
  • 只放两个主要对象

标签: java multithreading unit-testing networking junit


【解决方案1】:

请注意,也可以通过 NIO API (java.nio) 在单个阻塞方法中实现多条消息的发送(多播),而无需创建新线程。不过,NIO 相当复杂。

我将首先编写测试,使用测试定义的 StatusListener 实现,它将所有更新事件存储在一个列表中。当 dispatch() 方法返回时,测试可以对事件列表的状态执行断言。

使用线程或 NIO 是 Message 类的实现细节。因此,除非您不介意将测试与此实现细节相结合,否则我建议您引入一个帮助类,该类负责发送多个异步消息并在任何异步回复时通知 Message 对象。然后,您可以在单元测试中模拟帮助程序类,而无需将它们耦合到线程或 NIO。

我成功实现了一个向一个客户端发送消息的案例的测试。我也对原生产代码做了一些改动,如下:

public class Message
{
   private static final int CLIENT_PORT = 8000;

   // Externally provided:
   private final Client[] to;
   private final String contents;
   private final StatusListener listener;

   // Internal state:
   private final List<Socket> clientConnections;
   private final BlockingQueue<Status> statusQueue;

   public Message(Client[] to, String contents, StatusListener listener)
   {
      this.to = to;
      this.contents = contents;
      this.listener = listener;

      // Keep a list of socket references so that all threads can be closed:
      clientConnections = Collections.synchronizedList(new ArrayList<Socket>());

      // Initialise the statusQ for threads to report message status:
      statusQueue = new ArrayBlockingQueue<Status>(to.length * 3);
   }

   public void dispatch()
   {
      // Dispatch to each client individually and wait for confirmation:
      sendContentsToEachClientAsynchronously();

      Status statusChangeReceived;

      do {
         try {
            // Now, monitor queue and empty the queue as it fills up (consumer):
            statusChangeReceived = statusQueue.take();
         }
         catch (InterruptedException ignore) {
            break;
         }
      }
      while (listener.updateStatus(statusChangeReceived));

      closeRemainingClientConnections();
   }

   private void closeRemainingClientConnections()
   {
      for (Socket connection : clientConnections) {
         try {
            connection.close();
         }
         catch (IOException ignore) {
            // OK
         }
      }

      clientConnections.clear();
   }

   private void sendContentsToEachClientAsynchronously()
   {
      for (Client client : to) {
         System.out.println("Started new thread");
         new Thread(new MessageDispatcher(client)).start();
      }
   }

   // One MessageDispatcher per client.
   private final class MessageDispatcher implements Runnable
   {
      private final Client client;

      MessageDispatcher(Client client) { this.client = client; }

      public void run()
      {
         try {
            communicateWithClient();
         }
         catch (IOException e) {
            throw new RuntimeException(e);
         }
      }

      private void communicateWithClient() throws IOException
      {
         // Open connection to client:
         Socket connection = new Socket(client.getAddress(), CLIENT_PORT);

         try {
            // Add client connection to synchronized list:
            clientConnections.add(connection);

            sendMessage(connection.getOutputStream());
            readRequiredReceipts(connection.getInputStream());
         }
         finally {
            connection.close();
         }
      }

      // Send message and confirm dispatch.
      private void sendMessage(OutputStream output)
      {
         PrintWriter out = new PrintWriter(output, true);

         out.println(contents);
         statusQueue.add(new Status(client, "DISPATCHED"));
      }

      private void readRequiredReceipts(InputStream input) throws IOException
      {
         BufferedReader in = new BufferedReader(new InputStreamReader(input));

         // Wait for display receipt:
         in.readLine();
         statusQueue.add(new Status(client, "DISPLAYED"));

         // Wait for read receipt:
         in.readLine();
         statusQueue.add(new Status(client, "READ"));
      }
   }
}
public final class MessageTest extends JMockitTest
{
   static final String testContents = "hello there";
   static final String[] expectedEvents = {"DISPATCHED", "DISPLAYED", "READ"};

   @Test
   public void testSendMessageToSingleClient()
   {
      final Client theClient = new Client("client1");
      Client[] testClient = {theClient};

      new MockUp<Socket>()
      {
         @Mock(invocations = 1)
         void $init(String host, int port)
         {
            assertEquals(theClient.getAddress(), host);
            assertTrue(port > 0);
         }

         @Mock(invocations = 1)
         public OutputStream getOutputStream() { return new ByteArrayOutputStream(); }

         @Mock(invocations = 1)
         public InputStream getInputStream()
         {
            return new ByteArrayInputStream("reply1\nreply2\n".getBytes());
         }

         @Mock(minInvocations = 1) void close() {}
      };

      StatusListener listener = new MockUp<StatusListener>()
      {
         int eventIndex;

         @Mock(invocations = 3)
         boolean updateStatus(Status status)
         {
            assertSame(theClient, status.getClient());
            assertEquals(expectedEvents[eventIndex++], status.getEvent());
            return eventIndex < expectedEvents.length;
         }
      }.getMockInstance();

      new Message(testClient, testContents, listener).dispatch();
   }
}

上面的 JMockit 测试使用新的 MockUp 类,在最新版本中尚不可用。不过,它可以替换为Mockit.setUpMock(Socket.class, new Object() { ... })

【讨论】:

  • 只是为了澄清......你的意思是在线程/NIO和消息对象之间创建一个类吗? (即,如您从消息对象中所说,删除“实现细节”的责任)...如果是这样,这是否违反封装原则(将所有方法用于处理对象内部的对象数据)和依赖注入? (即,从 OO 设计的角度来看不是很好)
  • 是的,如果您认为封装这些实现细节很重要;当然,这不是必需的。请注意,创建这样一个辅助类将遵循“封装变化的内容”的 OO 原则;在这种情况下,“变化的东西”是处理异步 I/O 的方式(即,使用阻塞或非阻塞 I/O 操作)。依我看,依赖注入在这里无关紧要;既然不涉及外部可配置的服务,为什么要这样做?
【解决方案2】:

也许您可以在 Message 类中使用 AbstractFactory 来创建输出和输入流,而不是重新定义方法 getOutputStream 和 getInputStream。在正常操作中,工厂将使用 Socket 来执行此操作。但是,为了进行测试,请给它一个工厂,为它提供您选择的流。这样您就可以更好地控制正在发生的事情。

【讨论】:

  • 你说得对,可以在 Message 类中使用某种工厂,但代价是让它变得更复杂。由于可以使用模拟工具对 Message 类进行干净的单元测试,因此我宁愿这样做,也不愿为生产代码增加额外的复杂性。
  • 这是我的想法...我宁愿使用 JMockit 将工厂传递给 Message.... 话虽如此,我对这两种方式都持开放态度,并为此重构了 Message 的副本(到目前为止它有效)。但是我正在寻找一种参与较少的方法..
猜你喜欢
  • 2010-09-11
  • 2019-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2012-03-21
  • 2011-03-25
相关资源
最近更新 更多