【发布时间】:2021-05-09 21:08:35
【问题描述】:
现在我想构建一个 ArrayList 消息队列,它会自动填充来自 MQTT 代理的消息。例如,如果从 MQTT 发送了 3 条消息,那么在 MessageQueue 中,应该自动有 3 条消息。我当前的代码是
import java.util.ArrayList;
import TestMQTT.SubscribeSample;
import jade.core.Agent;
import jade.core.behaviours.TickerBehaviour;
import jade.lang.acl.ACLMessage;
public class ServerAgent extends Agent {
private static final long serialVersionUID = 1L;
protected ArrayList<ACLMessage> MessageQueue = null;
// protected ArrayList<String> myProcess = new ArrayList<>();
public void setup() {
SubscribeSample subscribeSample = new SubscribeSample();
subscribeSample.subscribe("Json");
//repeat the following actions every 5 seconds
this.addBehaviour(new TickerBehaviour(this, 5000) {
@Override
protected void onTick() {
if (subscribeSample.getMsg() != null && MessageQueue != null) {
MessageQueue.add(subscribeSample.getMsg());
System.out.println("Current MessageQueue is:" + MessageQueue);
} else {
System.out.println("No message yet");
}
System.out.println("The whole MessageQueue are:" + MessageQueue);
}
subscribesample类定义如下:
public class SubscribeSample {
public static String arrivedMessage;
public static ArrayList<Object> ReceivedRequests = new ArrayList<>();
public static ACLMessage msg = null;
public static void subscribe(String TOPIC) {
String broker = "tcp://192.168.137.100:1883";
int qos = 1;
String clientid = "mqtt-explorer-3260c410";
MaintestOutput m = new MaintestOutput();
try {
MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setConnectionTimeout(10);
options.setKeepAliveInterval(20);
client.setCallback(new MqttCallback() {
public void connectionLost(Throwable cause) {
System.out.println("connectionLost");
}
public void messageArrived(String TOPIC, MqttMessage message)
throws ClassNotFoundException, IOException {
System.out.println("======get message from [" + TOPIC + "]======");
System.out.println("message content:" + new String(message.getPayload()));
String Json = new String(message.getPayload());
Gson gson = new Gson();
msg = gson.fromJson(Json, ACLMessage.class);
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
}
);
// estblished connection
System.out.println("conneted to the broker: " + broker);
client.connect(options);
System.out.println("connected successfully");
client.subscribe(TOPIC, qos);
System.out.println("start listening" + TOPIC);
} catch (
Exception e) {
e.printStackTrace();
}
}
在我的ServerAgent中,System.out.println("Current MessageQueue is:" + MessageQueue)只会显示当前收到的消息,不会显示之前的消息,而System.out.println("整个MessageQueue的输出是:" + MessageQueue) 永远为零。
我想实现ServerAgent收到的每一条消息,都会自动发送到MessageQueue,System.out.println("The whole MessageQueue are:" + MessageQueue)的输出就是所有的消息
【问题讨论】:
标签: java arrays arraylist pug mqtt