【发布时间】:2026-01-16 19:25:01
【问题描述】:
我对 JAVA 不熟悉,并且在理解一个特定问题方面挣扎了好几天。 我想从 Azure 云接收消息,并且我正在使用 Azure-Service-Bus。
根据azure-service-bus documentation,我已经构建了一个如下所示的java文件:
import com.azure.messaging.servicebus.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.Arrays;
import java.util.List;
static String connectionString = "<NAMESPACE CONNECTION STRING>";
static String topicName = "<TOPIC NAME>";
static String subName = "<SUBSCRIPTION NAME>";
// handles received messages
static void receiveMessages() throws InterruptedException
{
CountDownLatch countdownLatch = new CountDownLatch(1);
// Create an instance of the processor through the ServiceBusClientBuilder
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
.connectionString(connectionString)
.processor()
.topicName(topicName)
.subscriptionName(subName)
.processMessage(ServiceBusTopicTest::processMessage)
.processError(context -> processError(context, countdownLatch))
.buildProcessorClient();
System.out.println("Starting the processor");
processorClient.start();
TimeUnit.SECONDS.sleep(10);
System.out.println("Stopping and closing the processor");
processorClient.close();
}
private static void processMessage(ServiceBusReceivedMessageContext context) {
ServiceBusReceivedMessage message = context.getMessage();
System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n", message.getMessageId(),
message.getSequenceNumber(), message.getBody());
}
private static void processError(ServiceBusErrorContext context, CountDownLatch countdownLatch) {
System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n",
context.getFullyQualifiedNamespace(), context.getEntityPath());
if (!(context.getException() instanceof ServiceBusException)) {
System.out.printf("Non-ServiceBusException occurred: %s%n", context.getException());
return;
}
ServiceBusException exception = (ServiceBusException) context.getException();
ServiceBusFailureReason reason = exception.getReason();
if (reason == ServiceBusFailureReason.MESSAGING_ENTITY_DISABLED
|| reason == ServiceBusFailureReason.MESSAGING_ENTITY_NOT_FOUND
|| reason == ServiceBusFailureReason.UNAUTHORIZED) {
System.out.printf("An unrecoverable error occurred. Stopping processing with reason %s: %s%n",
reason, exception.getMessage());
countdownLatch.countDown();
} else if (reason == ServiceBusFailureReason.MESSAGE_LOCK_LOST) {
System.out.printf("Message lock lost for message: %s%n", context.getException());
} else if (reason == ServiceBusFailureReason.SERVICE_BUSY) {
try {
// Choosing an arbitrary amount of time to wait until trying again.
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
System.err.println("Unable to sleep for period of time");
}
} else {
System.out.printf("Error source %s, reason %s, message: %s%n", context.getErrorSource(),
reason, context.getException());
}
}
public static void main(String[] args) throws InterruptedException {
receiveMessages();
}
到目前为止一切正常,来自 Azure 的消息打印在控制台日志中。
尽管如此,我的目标是将消息保存在我想在main() 函数中创建的变量/数组列表中。所以我尝试将processMessage() 函数中的消息返回给main() 函数,但它不起作用,因为它是一个静态void 类。我还尝试创建一个数组列表变量并将其传递给processMessage() 函数,但我没有了解如何在processClient 创建中传递数组列表变量。
我正在使用一个名为 Mendix 的低代码平台,并且想要实现这个自定义 java 代码,并且只是在努力将消息传递到数组列表到 main() 函数。
谁能帮助我或给我一个提示如何解决这个问题?
谢谢, 奥马尔
【问题讨论】:
标签: java azure azureservicebus