【发布时间】:2014-02-13 11:33:15
【问题描述】:
我想为消息头中的每条消息设置一个唯一的 guid,然后如果我愿意,我可以删除一条特定的消息。 NMS中是否有任何api可以帮助我删除消息?我正在使用 ActiveMQ 5.9.0 和 NMS 1.6.1
【问题讨论】:
标签: activemq message-queue nms
我想为消息头中的每条消息设置一个唯一的 guid,然后如果我愿意,我可以删除一条特定的消息。 NMS中是否有任何api可以帮助我删除消息?我正在使用 ActiveMQ 5.9.0 和 NMS 1.6.1
【问题讨论】:
标签: activemq message-queue nms
非常有可能直接从 C# 和 NMS 库中执行单独删除,而无需 REST API。
我在 C# 项目中使用来自 NuGet 的 NMS 18.0 作为我们的支持和维护工具 Nodinite,这是来自众多 monitoring agents 之一的代码,在本例中用于 ActiveMQ。此代码用于删除单独选择的消息。
此代码从队列 (queueName) 中删除 1 条消息 (messageId)
internal static void DeleteMessageFromQueue(ActiveMQOption activeMQOption, string queueName, string messageId)
{
IConnectionFactory factory = CreateConnectionFactory(activeMQOption);
using (IConnection connection = factory.CreateConnection())
{
using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
{
using (IDestination destination = session.GetQueue(queueName))
{
using (IMessageConsumer consumer = session.CreateConsumer(destination, $"JMSMessageID LIKE '%{messageId}%'"))
{
connection.Start();
var message = consumer.Receive(new TimeSpan(0, 0, 1));
consumer.Close();
connection.Close();
if (message == null)
{
throw new Exception($"Message '{messageId}' not found on queue '{queueName}'");
}
}
}
}
}
}
创建工厂的辅助方法(使用一个简单的模型 c# 类,您需要将其替换为您自己的代码,但示例应该足够简单以便遵循)
public static Apache.NMS.ActiveMQ.ConnectionFactory CreateConnectionFactory(ActiveMQOption activeMQOption)
{
Uri connecturi = new Uri(activeMQOption.ConnectionString);
Apache.NMS.ActiveMQ.ConnectionFactory factory = new Apache.NMS.ActiveMQ.ConnectionFactory(connecturi);
if (activeMQOption.UseAuthentication)
{
factory.UserName = activeMQOption.User;
factory.Password = activeMQOption.Password;
}
return factory;
}
【讨论】:
是的,但前提是该目的地没有活跃的消费者。你可以这样做:
protected static void DeleteDestination()
{
IConnectionFactory factory = new ConnectionFactory(ReplaceEnvVar(connectionURI));
using (Connection connection = factory.CreateConnection() as Connection)
{
using (ISession session = connection.CreateSession())
{
IQueue queue = session.GetQueue(testQueueName);
try
{
connection.DeleteDestination(queue);
}
catch
{
}
}
}
}
【讨论】: