【发布时间】:2012-09-03 07:39:31
【问题描述】:
我需要使用 PHP 在 ActiveMQ 队列中查找特定消息并将其删除。
AFAIK 这样做的唯一方法是读取当前排队的所有消息并确认我感兴趣的一条消息。(The example in the PHP manual for Stomp::ack 或多或少做同样的事情,他们不会阅读所有消息,但只确认匹配的那个)。
所以,我写了这段代码(显然,这只是相关部分):
class StompController {
private $con;
public function __construct($stompSettings) {
try {
$this->con = new Stomp($stompSettings['scheme']."://".$stompSettings['host'].":".$stompSettings['port']);
$this->con->connect();
$this->con->setReadTimeout(5);
} catch(StompException $e) {
die('Connection failed:' .$e->getMessage());
}
}
public function __destruct() {
$this->con->disconnect();
}
public function ackMessageAsRead($recipient,$message) {
if($this->con->isConnected()) {
//Subscribe to the recipient user's message queue.
$this->con->subscribe("/queue/".$recipient);
//Read all messages currently in the queue (but only ACK the one we're interested in).
while($this->con->hasFrameToRead()) {
$msg = $this->con->readFrame();
if($msg != null && $msg != false) {
//This is the message we are currently reading, ACK it to AMQ and be done with it.
if($msg->body == $message) {
$this->con->ack($msg);
}
}
}
} else {
return false;
}
}
}
根据我的逻辑,这应该可行。 在运行代码时,尽管检查了更多帧,但只有 一个 随机 消息被读取。
似乎只有在我们当前正在读取的帧已被确认时才准备下一帧。 (当我手动确认所有消息时,while 循环按预期工作,所有消息都被处理。
有谁知道如何从队列中获取完整的消息集,而无需确认所有消息?我可以确认所有消息,然后将我不感兴趣的消息放回队列中,但是这种查找单个消息的本已低效的方法会变得更加低效。
【问题讨论】:
-
您是否将 activemq.prefetchSize 设置为任何值?如果 activemq.prefetchSize 为 1,broker 在发送下一条消息之前会等待消息的 ack。
-
@Buchi 这听起来正是我正在寻找的东西!当我明天回到办公室时,我会检查一下更改是否对我有任何好处,但这听起来很像问题。
-
@Buchi 确实是问题所在,订阅队列时更高的
activemq.prefetchSize标头确实获取了剩余的消息。不幸的是,第一个帧之后的帧被“�”字符而不是\x00错误终止(或被PHP Stomp客户端错误解析,我猜是后者),它整理了第二个正文字段中的所有剩余消息信息。我想我现在坚持我最初的计划:将消息反馈到队列中。愿意将您的评论转换为答案,以便我可以接受它是正确的吗?