消息是一个抽象的概念。消息可以被认为是必须以某种方式发出、传输、接收和处理的数据的一部分。它不依赖于特定的编程语言、框架或库。除了 Spring 之外,Websocket、JMS 等消息在 Win32 API、D-Bus、网络等低级别上无处不在。
对于您的任务,消息可以只是一个字符串。由于两个播放器在同一个进程中运行,因此无需将其表示为 JSON/XML/等。
原始任务描述相当广泛。在我看来,你不需要递归,而是一种钟摆。第一个播放器发送初始化消息并等待响应。同时,第二个玩家收到初始化消息,将他的计数器添加到消息中并回复。然后第一个玩家在收到响应后醒来,添加他的计数器并发回。这种交流一次又一次地重复。此外,每个玩家在收到上一条消息的答复之前不得发送下一条消息。
Blocking queues 是在一个进程中使用纯 Java 的最佳方法。
考虑以下代码:
import java.math.BigInteger;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class MessageTask
{
// Blocking queue looks superfluous for single message. But such a queue saves us from cumbersome
// synchronization of the threads.
private static final int MAX_MESSAGES_IN_QUEUE = 1;
public static void main(String[] args)
{
BlockingQueue<String> firstToSecond = new ArrayBlockingQueue<String>(MAX_MESSAGES_IN_QUEUE);
BlockingQueue<String> secondToFirst = new ArrayBlockingQueue<String>(MAX_MESSAGES_IN_QUEUE);
// Both players use the same queues symmetrically.
InitiatorPlayer firstPlayer = new InitiatorPlayer(firstToSecond, secondToFirst);
Player secondPlayer = new Player(secondToFirst, firstToSecond);
// Please note that we can start threads in reverse order. But thankfully to
// blocking queues the second player will wait for initialization message from
// the first player.
new Thread(secondPlayer).start();
new Thread(firstPlayer).start();
}
}
class Player implements Runnable
{
protected final BlockingQueue<String> sent;
protected final BlockingQueue<String> received;
// Please aware that integer field may overflow during prolonged run
// of the program. So after 2147483647 we'll get -2147483648. We can
// either use BigInteger or compare the field with Integer.MAX_VALUE
// before each increment.
//
// Let's choose BigInteger for simplicity.
private BigInteger numberOfMessagesSent = new BigInteger("0");
public Player(BlockingQueue<String> sent, BlockingQueue<String> received)
{
this.sent = sent;
this.received = received;
}
@Override
public void run()
{
while (true)
{
String receivedMessage = receive();
reply(receivedMessage);
}
}
protected String receive()
{
String receivedMessage;
try
{
// Take message from the queue if available or wait otherwise.
receivedMessage = received.take();
}
catch (InterruptedException interrupted)
{
String error = String.format(
"Player [%s] failed to receive message on iteration [%d].",
this, numberOfMessagesSent);
throw new IllegalStateException(error, interrupted);
}
return receivedMessage;
}
protected void reply(String receivedMessage)
{
String reply = receivedMessage + " " + numberOfMessagesSent;
try
{
// Send message if the queue is not full or wait until one message
// can fit.
sent.put(reply);
System.out.printf("Player [%s] sent message [%s].%n", this, reply);
numberOfMessagesSent = numberOfMessagesSent.add(BigInteger.ONE);
// All players will work fine without this delay. It placed here just
// for slowing the console output down.
Thread.sleep(1000);
}
catch (InterruptedException interrupted)
{
String error = String.format(
"Player [%s] failed to send message [%s] on iteration [%d].",
this, reply, numberOfMessagesSent);
throw new IllegalStateException(error);
}
}
}
class InitiatorPlayer extends Player
{
private static final String INIT_MESSAGE = "initiator player";
public InitiatorPlayer(BlockingQueue<String> sent, BlockingQueue<String> received)
{
super(sent, received);
}
@Override
public void run()
{
sendInitMessage();
while (true)
{
String receivedMessage = receive();
reply(receivedMessage);
}
}
private void sendInitMessage()
{
try
{
sent.put(INIT_MESSAGE);
System.out.printf("Player [%s] sent message [%s].%n", this, INIT_MESSAGE);
}
catch (InterruptedException interrupted)
{
String error = String.format(
"Player [%s] failed to sent message [%s].",
this, INIT_MESSAGE);
throw new IllegalStateException(error, interrupted);
}
}
}
示例输出:
Player [InitiatorPlayer@712dc5e9] sent message [initiator player].
Player [Player@69bf9b51] sent message [initiator player 0].
Player [InitiatorPlayer@712dc5e9] sent message [initiator player 0 0].
Player [Player@69bf9b51] sent message [initiator player 0 0 1].
Player [InitiatorPlayer@712dc5e9] sent message [initiator player 0 0 1 1].
Player [Player@69bf9b51] sent message [initiator player 0 0 1 1 2].
Player [InitiatorPlayer@712dc5e9] sent message [initiator player 0 0 1 1 2 2].
Player [Player@69bf9b51] sent message [initiator player 0 0 1 1 2 2 3].
Player [InitiatorPlayer@712dc5e9] sent message [initiator player 0 0 1 1 2 2 3 3].
Player [Player@69bf9b51] sent message [initiator player 0 0 1 1 2 2 3 3 4].
Player [InitiatorPlayer@712dc5e9] sent message [initiator player 0 0 1 1 2 2 3 3 4 4].
Player [Player@69bf9b51] sent message [initiator player 0 0 1 1 2 2 3 3 4 4 5].
Player [InitiatorPlayer@712dc5e9] sent message [initiator player 0 0 1 1 2 2 3 3 4 4 5 5].
PS您的机器上的输出可能略有不同,如下所示:
Player [InitiatorPlayer@82b9342] sent message [initiator player].
Player [InitiatorPlayer@82b9342] sent message [initiator player 0 0].
Player [Player@5d7a0209] sent message [initiator player 0].
Player [Player@5d7a0209] sent message [initiator player 0 0 1].
Player [InitiatorPlayer@82b9342] sent message [initiator player 0 0 1 1].
Player [Player@5d7a0209] sent message [initiator player 0 0 1 1 2].
Player [InitiatorPlayer@82b9342] sent message [initiator player 0 0 1 1 2 2].
Player [Player@5d7a0209] sent message [initiator player 0 0 1 1 2 2 3].
Player [InitiatorPlayer@82b9342] sent message [initiator player 0 0 1 1 2 2 3 3].
Player [Player@5d7a0209] sent message [initiator player 0 0 1 1 2 2 3 3 4].
Player [InitiatorPlayer@82b9342] sent message [initiator player 0 0 1 1 2 2 3 3 4 4].
Player [Player@5d7a0209] sent message [initiator player 0 0 1 1 2 2 3 3 4 4 5].
Player [InitiatorPlayer@82b9342] sent message [initiator player 0 0 1 1 2 2 3 3 4 4 5 5].
这是因为玩家在不同的线程中工作。以下情况是可能的:
- 第一个玩家发送消息并将日志记录打印到控制台。
- 第二个玩家收到消息并发送回复。但是相应的线程在发送回复后立即被线程调度程序暂停。
- 第一个玩家收到回复,发送另一条消息并将日志记录打印到控制台。
- 第二个玩家的线程被线程调度程序唤醒,并打印关于他在第 3 点中提到的回复的日志记录。
这种行为是正确的。玩家通过队列同步,例如第一个玩家在回答之前不会发送新消息。但是将日志打印到控制台与发送/接收消息不同步(也不能同步)。
PS2 只需一个阻塞队列(甚至使用单个互斥体)即可解决该任务。但是两个单独的队列更适合说明和可能的解决方案扩展。