【问题标题】:Socket sends message only onceSocket 只发送一次消息
【发布时间】:2016-11-12 01:22:57
【问题描述】:

下面的代码工作并在预定时间发送消息,但我认为每次计时器执行预定任务时打开新套接字不是一个好的解决方案。我想要的是只在运行方法中打开一次套接字,并在计时器中创建新的类实例时在 SendMessage 类中访问它。这样它不起作用,它只发送一条消息然后停止发送。此外,对于一些关于代码或使其线程安全的提示的批评者,我会很高兴。

public class Client implements Runnable{

// Client Constructor here

@Override
public void run(){
    //SENDS ONLY ONE MESSAGE
    pitcherSocket = new Socket(InetAddress.getByName(hostname), port); 

    Timer timer = new Timer();

    timer.schedule(new SendMessage(), 0, 1000/mps);
}

private class SendMessage extends TimerTask{

    private int id;

    @Override
    public void run() {

        try
          {  // THIS WORKS FINE, SENDS MESSAGES AT SCHEDULED TIME                     
             pitcherSocket = new Socket(InetAddress.getByName(hostname), port); 

             OutputStream outToServer = pitcherSocket.getOutputStream();

             DataOutputStream out = new DataOutputStream(outToServer);

             out.writeInt(id);

             out.flush();

          }catch(IOException e)
          {
             e.printStackTrace();
          }
       }
    }
}

编辑:完整代码

客户

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;

public class Pitcher implements Runnable{

private int port;
private int mps;
private int size;
private String hostname;
private List<Integer> messageIds = Collections.synchronizedList(new     ArrayList<Integer>());
private Socket pitcherSocket;

//constatns, integer is 4 bytes, long is 8 bytes
private static final int INT_SIZE = 4;
private static final int LONG_SIZE = 8;


public Pitcher(int port, int mps, int size, String hostname) {

    this.port = port;
    this.mps = mps;
    this.size = size;
    this.hostname = hostname;
}

@Override
public void run(){

    System.out.println("Pitcher running...");
    System.out.println();

    Timer timer = new Timer();

    timer.schedule(new SendMessage(), 0, 1000/mps); 

    timer.schedule(new DisplayStatistics(), 0, 1000/mps);

}

//Nested class that sends messages
private class SendMessage extends TimerTask{

    private int numberOfSentMessages = 0;
    private int id;

    @Override
    public void run() {

        try {                         
             pitcherSocket = new Socket(InetAddress.getByName(hostname), port);

             OutputStream outToServer = pitcherSocket.getOutputStream();

             DataOutputStream out = new DataOutputStream(outToServer);

             //send message size
             out.writeInt(size);

             //message id is same as number of the sent message
             id = numberOfSentMessages + 1;
             out.writeInt(id);
             messageIds.add(id);



             //get system timestamp
             long currentTimestamp = System.currentTimeMillis();
             out.writeLong(currentTimestamp);

             //fill in the rest-
             byte[] rest = new byte[size - 2 * INT_SIZE - LONG_SIZE];     //message size(default 300 bytes) - size(4 bytes) - message id(4 bytse) - timestamp(8 bytes)
             out.write(rest);

             out.flush();

             numberOfSentMessages++;


             InputStream inFromServer = pitcherSocket.getInputStream();
             DataInputStream in = new DataInputStream(inFromServer);

             Integer catcherMessageSize = in.readInt();
             Integer catcherId = in.readInt();
             long catcherTimestamp = in.readLong();

             System.out.println("Sent message:     " + size + " " + id + " " + currentTimestamp + "...");
             System.out.println("Received message: " + catcherMessageSize + " " + catcherId + " " + catcherTimestamp + "...");
             System.out.println();

          }catch(IOException e)
          {
             e.printStackTrace();
          }

    }

}

}

服务器

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

public class Catcher implements Runnable{

private int port;
private String bind;
private ServerSocket serverSocket;

//constatns, integer is 4 bytes, long is 8 bytes
private static final int INT_SIZE = 4;
private static final int LONG_SIZE = 8;

public Catcher(int port, String bind) {

    this.port = port;
    this.bind = bind;
}

@Override
public void run() {

    System.out.println("Catcher running...");
    System.out.println();

    try {
        serverSocket = new ServerSocket(port, 100, InetAddress.getByName(bind));
    } 
    catch (IOException e1) {
        e1.printStackTrace();
    }

    while(true){

         try
         {              
            Socket server = serverSocket.accept();

            DataInputStream in = new DataInputStream(server.getInputStream());

            Integer pitcherMessageSize = in.readInt();
            Integer pitcherId = in.readInt();
            long pitcherTimestamp = in.readLong();

            DataOutputStream out = new DataOutputStream(server.getOutputStream());

            //message id and size are sent back
            out.writeInt(pitcherMessageSize);
            out.writeInt(pitcherId);

            //send back current time
            long currentTimestamp = System.currentTimeMillis();
            out.writeLong(currentTimestamp);

            //fill in the rest
            byte[] rest = new byte[pitcherMessageSize - 2 * INT_SIZE - LONG_SIZE]; //message size(default 300 bytes) - size(4 bytes) - message id(4 bytes) - timestamp(8 bytes)
            out.write(rest);

            out.flush();

            System.out.println("Received message: " + pitcherMessageSize + " " + pitcherId + " " + pitcherTimestamp + "...");
            System.out.println("Sent message:     " + pitcherMessageSize + " " + pitcherId + " " + currentTimestamp + "...");
            System.out.println();

            //server.close();

         }
         catch(SocketTimeoutException s){
            System.out.println("Socket timed out!");
            break;
         }
         catch(IOException e){
            e.printStackTrace();
            break;
         }
      } 
}
}

【问题讨论】:

  • 您是否看到任何异常?当它停止发送时,程序是结束还是只是挂起?
  • 也不例外,服务器收到第一个id,继续监听。客户端计时器仍然执行 SendMessage 类并尝试发送,但在第一次迭代后没有发送任何内容
  • 据我所知,您正在按计划创建和执行SendMessageSendMessage 每次创建时都会尝试通过 Socket 重新连接。
  • 你知道为什么它没有重新连接吗?
  • 显示你的服务器端代码。您是否每次都终止连接并重新建立?还是你只在客户端做?您也永远不会关闭套接字客户端。我不确定您为什么要连接、发送消息、断开连接然后重新连接。我是不是误会了什么?

标签: java multithreading sockets


【解决方案1】:

您是否考虑过同时制作SendMessage 的socket 和DataOutputStream 成员变量。这是一些代码,可以为您提供一个粗略的开始。您可能需要进行一些增强,例如检查套接字是否打开以及如果当前套接字关闭则能够创建一个新的...

private class SendMessage extends TimerTask {
    private int id = 10;
    private Socket pitchSocket;
    private DataOutputStream out;

    public SendMessage(Socket socket) {
        this.pitchSocket = socket;
        try{
            out = new DataOutputStream(pitchSocket.getOutputStream());
        } catch(IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        try {  
            out.writeInt(id);
            out.flush();
        } catch(IOException e) {
            e.printStackTrace();
        }
    }
}

【讨论】:

  • 仍然表现相同:/
  • 我想我可能误读了您的原始帖子。您是否创建了多个 SendMessenger?
【解决方案2】:

在能够查看整个代码之后,我认为您肯定有一些线程问题,尽管我认为它们更多地出现在服务器端而不是客户端。您的服务器是单线程的。这意味着您一次只能处理一个请求。你想要一个多线程服务器。我重构了您的代码以创建一个多线程的 Catcher 示例。我正在使用 Thead 类来完成所有这些工作,这可能有点过时了。您可能想看看 java.util.concurrent,它们可能会有更新。

package clientserver;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

public class Catcher implements Runnable{

private int port;
private String bind;
private ServerSocket serverSocket;



public Catcher(int port, String bind) {

    this.port = port;
    this.bind = bind;
}

@Override
public void run() {

    System.out.println("Catcher running...");
    System.out.println();

    try {
        serverSocket = new ServerSocket(port, 100, InetAddress.getByName(bind));
    } 
    catch (IOException e1) {
        e1.printStackTrace();
   }

    while(true){
         try
         {              
           new Thread(new CatcherHandler(serverSocket.accept())).start();
           Thread.sleep(1000);

        }
        catch(SocketTimeoutException s){
           System.out.println("Socket timed out!");
           break;
        }
        catch(IOException e){
           e.printStackTrace();
           break;
        } catch (InterruptedException e) {
           // TODO Auto-generated catch block
           e.printStackTrace();
       }
     } 
}

public static void main(String[] argv){
    new Thread( new Catcher(8093, "localhost")).start();;

}
}

class CatcherHandler implements Runnable{   
   Socket server;
   DataOutputStream out;
   DataInputStream in;

   private static final int INT_SIZE = 4;
   private static final int LONG_SIZE = 8;

   public CatcherHandler(Socket server) {
       super();
       this.server = server;
       try {
           in = new DataInputStream(server.getInputStream());
          out = new DataOutputStream(server.getOutputStream());

       } catch (IOException e) {
           // TODO Auto-generated catch block
           e.printStackTrace();
       }

   }



   @Override
   public void run() {
       try{
           if(in.available() > 0){

              Integer pitcherMessageSize = in.readInt();
               Integer pitcherId = in.readInt();
               long pitcherTimestamp = in.readLong();

               //message id and size are sent back
               out.writeInt(pitcherMessageSize);
               out.writeInt(pitcherId);

               //send back current time
               long currentTimestamp = System.currentTimeMillis();
               out.writeLong(currentTimestamp);

               //fill in the rest
               byte[] rest = new byte[pitcherMessageSize - 2 * INT_SIZE - LONG_SIZE]; //message size(default 300 bytes) - size(4 bytes) - message id(4 bytes) - timestamp(8 bytes)
               out.write(rest);

               out.flush();

               System.out.println("Received message: " + pitcherMessageSize + " " + pitcherId + " " + pitcherTimestamp + "...");
               System.out.println("Sent message:     " + pitcherMessageSize + " " + pitcherId + " " + currentTimestamp + "...");
               System.out.println();
               Thread.sleep(1000);

           }
       } catch (IOException e) {
           // TODO Auto-generated catch block
           e.printStackTrace();
       } catch (InterruptedException e) {
           // TODO Auto-generated catch block
           e.printStackTrace();
       }finally{}
       //server.close();

   }
} 

此外,我还重构了您的客户端,使其能够使用一个套接字并确保安全。现在 SendMessage 接受一个 DataInputStream 和一个 DataOutputSteam 作为它的参数。

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;

public class Pitcher implements Runnable{

private int port;
private int mps;
private int size;
private String hostname;
private List<Integer> messageIds = Collections.synchronizedList(new     ArrayList<Integer>());
private Socket pitcherSocket;
private DataOutputStream out;
private DataInputStream in;

//constatns, integer is 4 bytes, long is 8 bytes
private static final int INT_SIZE = 4;
private static final int LONG_SIZE = 8;


public Pitcher(int port, int mps, int size, String hostname) {

    this.port = port;
    this.mps = mps;
    this.size = size;
    this.hostname = hostname;



    try {
        this.pitcherSocket = new Socket(InetAddress.getByName(hostname), port);
        out = new DataOutputStream(pitcherSocket.getOutputStream());
        in = new DataInputStream(pitcherSocket.getInputStream());
    } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }


}

public static void main(String[] argv) throws Exception{
    for(int i = 0; i < 10; i++){
        new Thread(new Pitcher(8093, 1, 200, "localhost")).start();
        Thread.sleep(1000);
    }

    Thread.sleep(10000);
}

@Override
public void run(){

    System.out.println("Pitcher running...");
    System.out.println();

    Timer timer = new Timer();

    timer.schedule(new SendMessage(out, in), 0, 1000); 

    //timer.schedule(new DisplayStatistics(), 0, 1000);

}

//Nested class that sends messages
private class SendMessage extends TimerTask{

    private int numberOfSentMessages = 0;
    private int id;
    private DataOutputStream out;
    private DataInputStream in;

    public SendMessage(DataOutputStream out, DataInputStream in){
        this.out = out;
        this.in = in;
    }

    @Override
    public void run() {

        try {                         
            long currentTimestamp = 0L;
            synchronized(out){
                 //send message size
                 out.writeInt(size);

                 //message id is same as number of the sent message
                 id = numberOfSentMessages + 1;
                 out.writeInt(id);
                 messageIds.add(id);



                 //get system timestamp
                 currentTimestamp = System.currentTimeMillis();
                 out.writeLong(currentTimestamp);

                 //fill in the rest-
                 byte[] rest = new byte[size - 2 * INT_SIZE - LONG_SIZE];     //message size(default 300 bytes) - size(4 bytes) - message id(4 bytse) - timestamp(8 bytes)
                 out.write(rest);

                 out.flush();
            }
             numberOfSentMessages++;

             long catcherTimestamp = 0L;
             Integer catcherMessageSize;
             Integer catcherId;
             synchronized(in){
                 catcherMessageSize = in.readInt();
                 catcherId = in.readInt();
                 catcherTimestamp = in.readLong();
             }
             System.out.println("Sent message:     " + size + " " + id + " " + currentTimestamp + "...");
             System.out.println("Received message: " + catcherMessageSize + " " + catcherId + " " + catcherTimestamp + "...");
             System.out.println();
             Thread.sleep(1000);

          }catch(IOException e)
          {
             e.printStackTrace();
          } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }

}

}

【讨论】:

  • 非常感谢!明天我将尝试实现此代码,并让您知道一切是否正常。你能告诉我你为什么使用thread.sleep(X)吗?我是线程新手。
  • 是的,我正在使用睡眠来允许其他线程运行。基本上,当您调用 sleep 方法时,它允许调度程序允许其他线程运行。
  • 我遇到了另一个问题。当投手发送消息时,捕手 System.currentTimeMillis();比投手小。
  • Catcher 和 Pitcher 是否安装在两台不同的机器上?如果是这样并且两者都在运行 *nix,请打开两个终端并在终端窗口中输入“日期”。这两台机器上的时间日期可能都已关闭。我相信您应该能够使用一些外部时间同步服务来获得正确的时间。这是假设您对两个测试框都有管理员访问权限。如果这是问题所在并且您拥有管理员权限,请查看此link。祝你好运!
【解决方案3】:

Java Socket 类不是线程安全的。要让多个线程访问同一个 Socket 对象,您需要同步它们的操作。这可以通过为您的所有 SendMessage 线程提供一个公共对象来完成,该对象将充当锁。您计划使用的每个套接字操作(例如读取和写入)都需要一个对象。然后,将调用 Socket 对象的每个操作重构为单独的方法,并围绕该对象同步它们。例如。对于读取操作,您可以在 SendMessage 中拥有一个名为 read() 的方法,该方法调用 Socket.read 并围绕锁定对象同步该方法以进行读取。

private class SendMessage extends TimerTask{

    private Object readLock; 
    private Socket socket;

    public SendMessage(Object readLock, Socket socket) {
        this.readLock = readLock;
        this.socket = socket;
    }

    public void readFromSocket() {
         synchronized(readLock) {
              socket.read();
         }
    }

    @Override  
    public void run() {
        readFromSocket();
        // do other stuff
    }

}

【讨论】:

  • 这并不能解释他的问题的原因。
  • 只有当他在多个线程中使用同一个socket对象时才会出现问题。因此,很可能是由于套接字不是线程安全的。请详细说明您认为这不是问题的原因。
  • 但是线程错误不会导致他看到的不当行为。
  • 问题中没有代码在两个线程中使用同一个socket。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2019-11-08
  • 2020-12-14
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-10-02
  • 1970-01-01
相关资源
最近更新 更多