【发布时间】:2018-08-22 06:56:58
【问题描述】:
我正在尝试使用 MQTT 和 Azure 设备 SDK 库将消息发送到 Azure IOT 集线器。在 IOT Hub 上配置了两个设备。两个设备都有不同的连接字符串。
设备 1 的连接字符串
connString = "HostName=ABC.azure-devices.net;DeviceId=ABC;SharedAccessKey=sharedKey";
设备 2 的连接字符串
connString = "HostName=DEF.azure-devices.net;DeviceId=DEF;SharedAccessKey=sharedKey";
我已经编写了两个发布者,它们会将消息发送到 IOT 集线器;订阅者则从 IOT 集线器接收这些发布者发送的消息。
在第一个发布者中,我为设备 1 传递了连接字符串,为第二个发布者传递了设备 2 的连接字符串。现在,当我同时运行两个发布者类时,订阅者正在接收来自两个发布者的消息。谁能告诉我如何修改代码,以便订阅者只接收发布者 1 发送的消息,即使两个发布者同时运行。
这是我的 Publisher 1 代码。
package com.iot.mqtt.connection;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.google.gson.Gson;
import com.microsoft.azure.sdk.iot.device.DeviceClient;
import com.microsoft.azure.sdk.iot.device.IotHubClientProtocol;
import com.microsoft.azure.sdk.iot.device.IotHubEventCallback;
import com.microsoft.azure.sdk.iot.device.IotHubStatusCode;
import com.microsoft.azure.sdk.iot.device.Message;
//import com.microsoft.docs.iothub.samples.SimulatedDevice.EventCallback;
//import com.microsoft.docs.iothub.samples.SimulatedDevice.MessageSender;
//import com.microsoft.docs.iothub.samples.SimulatedDevice.TelemetryDataPoint;
public class SimulatedDevice {
// The device connection string to authenticate the device with your IoT hub.
// Using the Azure CLI:
// az iot hub device-identity show-connection-string --hub-name {YourIoTHubName} --device-id MyJavaDevice --output table
private static String connString = "HostName=ABC.azure-devices.net;DeviceId=ABC;SharedAccessKey=aTVNu55sN9a2Y9+V0BCAOXdo8nSFDNzByfiTqMvNb20=";
// Using the MQTT protocol to connect to IoT Hub
private static IotHubClientProtocol protocol = IotHubClientProtocol.MQTT;
private static DeviceClient client;
// Specify the telemetry to send to your IoT hub.
private static class TelemetryDataPoint {
public double temperature;
public double humidity;
public String message;
public String timeStamp;
// Serialize object to JSON format.
public String serialize() {
Gson gson = new Gson();
return gson.toJson(this);
}
}
// Print the acknowledgement received from IoT Hub for the telemetry message sent.
private static class EventCallback implements IotHubEventCallback {
public void execute(IotHubStatusCode status, Object context) {
System.out.println("IoT Hub responded to message with status: " + status.name());
if (context != null) {
synchronized (context) {
context.notify();
}
}
}
}
private static class MessageSender implements Runnable {
public void run() {
try {
// Initialize the simulated telemetry.
double minTemperature = 20;
double minHumidity = 60;
String message;
Random rand = new Random();
InputStream is = null;
Properties prop = null;
prop = new Properties();
is = new FileInputStream("C:\\Users\\H251970\\eclipse-workspace\\IOTMqttTestProject\\resources\\config.properties");
prop.load(is);
message = prop.getProperty("message");
minTemperature = Double.parseDouble(prop.getProperty("temperature"));
minHumidity = Double.parseDouble(prop.getProperty("humidity"));
//System.out.println(message);
while (true) {
// Simulate telemetry.
double currentTemperature = minTemperature + rand.nextDouble() * 15;
double currentHumidity = minHumidity + rand.nextDouble() * 20;
String datatimeStamp= new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(Calendar.getInstance().getTime());;
TelemetryDataPoint telemetryDataPoint = new TelemetryDataPoint();
telemetryDataPoint.temperature = currentTemperature;
telemetryDataPoint.humidity = currentHumidity;
telemetryDataPoint.message = message;
telemetryDataPoint.timeStamp = datatimeStamp;
// Add the telemetry to the message body as JSON.
String msgStr = telemetryDataPoint.serialize();
Message msg = new Message(msgStr);
// Add a custom application property to the message.
// An IoT hub can filter on these properties without access to the message body.
msg.setProperty("temperatureAlert", (currentTemperature > 30) ? "true" : "false");
System.out.println("Sending message: " + msgStr);
Object lockobj = new Object();
// Send the message.
EventCallback callback = new EventCallback();
client.sendEventAsync(msg, callback, lockobj);
synchronized (lockobj) {
lockobj.wait();
}
Thread.sleep(3000);
}
} catch (Exception e) {
System.out.println("Exception: " + e);
} /*finally {
inputStream.close();
}*/
}
}
public static void main(String[] args) throws IOException, URISyntaxException {
// Connect to the IoT hub.
client = new DeviceClient(connString, protocol);
client.open();
// Create new thread and start sending messages
MessageSender sender = new MessageSender();
ExecutorService executor = Executors.newFixedThreadPool(1);
executor.execute(sender);
// Stop the application.
System.out.println("Press ENTER to exit.");
System.in.read();
executor.shutdownNow();
client.closeNow();
}
}
发布者 2 的代码也相同,只是连接字符串不同。
connString = "HostName=DEF.azure-devices.net;DeviceId=DEF;SharedAccessKey=aTVNu55sN9a2Y9+V0BCAOXdo8nSFDNzByfiTqMvNb20=";
订阅者正在接收以下格式的消息。
Sending message: {"temperature":"27.739594911863872°C","voltage":"15.81301816513805V","motorspeed":"5.0m/s","inverterName":"i550","timeStamp":"22/08/2018 11:18:03"}
IoT Hub responded to message with status: OK_EMPTY
这里是订阅者类的代码
package com.microsoft.docs.iothub.samples;
import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;
import com.microsoft.azure.eventhubs.EventPosition;
import com.microsoft.azure.eventhubs.EventHubRuntimeInformation;
import com.microsoft.azure.eventhubs.PartitionReceiver;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.nio.charset.Charset;
import java.net.URI;
import java.net.URISyntaxException;
/**
 * Reads device-to-cloud messages from the IoT hub's Event Hubs-compatible endpoint
 * and prints them to standard output.
 *
 * <p>One receive loop is started per partition. When a device ID is passed as the
 * first command-line argument, only messages whose {@code iothub-connection-device-id}
 * system property equals that ID are printed; without an argument every message is
 * printed.
 */
public class ReadDeviceToCloudMessages {
    // az iot hub show --query properties.eventHubEndpoints.events.endpoint --name {your IoT Hub name}
    //private static final String eventHubsCompatibleEndpoint = "{your Event Hubs compatible endpoint}";
    private static final String eventHubsCompatibleEndpoint = "sb://ihsuprodmares009dednamespace.servicebus.windows.net/";

    // az iot hub show --query properties.eventHubEndpoints.events.path --name {your IoT Hub name}
    //private static final String eventHubsCompatiblePath = "{your Event Hubs compatible name}";
    private static final String eventHubsCompatiblePath = "eventHubsCompatiblePathString";

    // az iot hub policy show --name iothubowner --query primaryKey --hub-name {your IoT Hub name}
    private static final String iotHubSasKey = "iotHubSasKeyString=";
    private static final String iotHubSasKeyName = "iothubowner";

    // System property that IoT Hub stamps on each device-to-cloud message with the sender's device ID.
    private static final String DEVICE_ID_PROPERTY = "iothub-connection-device-id";

    // Telemetry bodies are JSON text; decode them as UTF-8 rather than with the platform default.
    private static final Charset PAYLOAD_CHARSET = Charset.forName("UTF-8");

    // Track all the PartitionReceiver instances created.
    // Written by the per-partition threads and read by main: always lock on the list itself.
    private static final ArrayList<PartitionReceiver> receivers = new ArrayList<PartitionReceiver>();

    // Device ID to keep, or null to print messages from every device.
    private static volatile String deviceIdFilter;

    // Cleared on shutdown so the receive loops end instead of spinning on closed receivers.
    private static volatile boolean running = true;

    /**
     * Tells whether an event passes the optional device filter.
     *
     * @param event the received event
     * @return {@code true} when no filter is set or the event's device ID equals the filter
     */
    private static boolean isFromSelectedDevice(EventData event) {
        final String wanted = deviceIdFilter;
        if (wanted == null) {
            return true;
        }
        if (event.getSystemProperties() == null) {
            return false;
        }
        Object sender = event.getSystemProperties().get(DEVICE_ID_PROPERTY);
        return sender != null && wanted.equals(sender.toString());
    }

    /**
     * Asynchronously creates a PartitionReceiver for a partition and then starts
     * reading any messages sent from the simulated client.
     *
     * @param ehClient    client connected to the Event Hubs-compatible endpoint
     * @param partitionId partition to read from
     */
    private static void receiveMessages(EventHubClient ehClient, String partitionId)
            throws EventHubException, ExecutionException, InterruptedException {
        final ExecutorService executorService = Executors.newSingleThreadExecutor();

        // Create the receiver using the default consumer group.
        // For the purposes of this sample, read only messages sent since
        // the time the receiver is created. Typically, you don't want to skip any messages.
        ehClient.createReceiver(EventHubClient.DEFAULT_CONSUMER_GROUP_NAME, partitionId,
                EventPosition.fromEnqueuedTime(Instant.now())).thenAcceptAsync(receiver -> {
            System.out.println(String.format("Starting receive loop on partition: %s", partitionId));
            System.out.println(String.format("Reading messages sent since: %s", Instant.now().toString()));
            synchronized (receivers) {
                receivers.add(receiver);
            }
            try {
                while (running) {
                    try {
                        // Check for EventData - this method times out if there is nothing to retrieve.
                        Iterable<EventData> receivedEvents = receiver.receiveSync(100);
                        // If there is data in the batch, process it.
                        if (receivedEvents == null) {
                            continue;
                        }
                        for (EventData receivedEvent : receivedEvents) {
                            if (!isFromSelectedDevice(receivedEvent)) {
                                continue;
                            }
                            System.out.println(String.format("Telemetry received:\n %s",
                                    new String(receivedEvent.getBytes(), PAYLOAD_CHARSET)));
                            System.out.println(String.format("Application properties (set by device):\n%s",
                                    receivedEvent.getProperties().toString()));
                            System.out.println(String.format("System properties (set by IoT Hub):\n%s\n",
                                    receivedEvent.getSystemProperties().toString()));
                        }
                    } catch (EventHubException e) {
                        // Failures caused by closing the receiver during shutdown are expected.
                        if (running) {
                            System.out.println("Error reading EventData: " + e);
                        }
                    }
                }
            } finally {
                // Shut down from inside the task: the task is only submitted once the
                // receiver exists, so an earlier shutdown could reject it.
                executorService.shutdown();
            }
        }, executorService);
    }

    /**
     * Connects to the endpoint, starts one receive loop per partition and runs until
     * ENTER is pressed.
     *
     * @param args optional; {@code args[0]} is a device ID to restrict the output to
     */
    public static void main(String[] args)
            throws EventHubException, ExecutionException, InterruptedException, IOException, URISyntaxException {
        if (args != null && args.length > 0 && !args[0].isEmpty()) {
            deviceIdFilter = args[0];
            System.out.println(String.format("Only showing messages from device: %s", args[0]));
        }

        final ConnectionStringBuilder connStr = new ConnectionStringBuilder()
                .setEndpoint(new URI(eventHubsCompatibleEndpoint))
                .setEventHubName(eventHubsCompatiblePath)
                .setSasKeyName(iotHubSasKeyName)
                .setSasKey(iotHubSasKey);

        // Create an EventHubClient instance to connect to the
        // IoT Hub Event Hubs-compatible endpoint.
        final ExecutorService executorService = Executors.newSingleThreadExecutor();
        EventHubClient ehClient = null;
        try {
            ehClient = EventHubClient.createSync(connStr.toString(), executorService);

            // Use the EventHubRuntimeInformation to find out how many partitions
            // there are on the hub.
            final EventHubRuntimeInformation eventHubInfo = ehClient.getRuntimeInformation().get();

            // Create a PartitionReceiver for each partition on the hub.
            for (String partitionId : eventHubInfo.getPartitionIds()) {
                receiveMessages(ehClient, partitionId);
            }

            // Shut down cleanly.
            System.out.println("Press ENTER to exit.");
            System.in.read();
            System.out.println("Shutting down...");
        } finally {
            // Stop the loops first so closing the receivers does not trigger an error storm.
            running = false;

            final ArrayList<PartitionReceiver> toClose;
            synchronized (receivers) {
                toClose = new ArrayList<PartitionReceiver>(receivers);
            }
            for (PartitionReceiver receiver : toClose) {
                try {
                    receiver.closeSync();
                } catch (EventHubException e) {
                    // Keep closing the remaining receivers and the client.
                    System.out.println("Error closing receiver: " + e);
                }
            }
            try {
                if (ehClient != null) {
                    ehClient.closeSync();
                }
            } finally {
                executorService.shutdown();
            }
        }
        System.exit(0);
    }
}
现在我想弄清楚:在两个发布者同时运行的情况下,如何让订阅者只接收来自发布者 1 的消息。提前致谢。
【问题讨论】:
-
嗨,Harshad,请不要直接显示您的主机名和 SharedAccessKey。这些是私人信息。
-
你能展示订阅者如何接收消息吗?注意:Azure IoT Hub 不是通用 MQTT 代理。
-
谢谢丽塔...以上是我的订阅者类代码。
-
嗨,Harshad,如果有帮助,请告诉我。如果您有任何疑问,请随时发布。
-
嗨,丽塔...感谢您的回答..但是我一直在尝试找出一种仅通过该主题发布和订阅的方法。我找到了我们可以通过主题向物联网中心发送消息并通过主题订阅消息的链接。 docs.microsoft.com/en-us/azure/iot-hub/… 我无法完全理解这一点,因为我对它很陌生,而且代码是用 python 编写的。我正在尝试使用 paho mqtt 库在 java 中复制相同的内容。如果我在任何地方错了,请纠正我。
标签: azure mqtt azure-iot-hub