【问题标题】:How can I remove messages from an ActiveMQ queue using Python?如何使用 Python 从 ActiveMQ 队列中删除消息?
【发布时间】:2021-04-15 05:16:15
【问题描述】:

我有一个 ActiveMQ 队列,其中有几条使用持久设置为 true 发送的消息。当我在 Python 中创建订阅者来读取队列时,我得到了队列中的所有消息。下次我打开订阅者时,我会收到所有相同的消息。我调整了写入队列的代码以将持久设置为 false,但消息仍保留在队列中。我是否忽略了发送确认?

代码是使用 Python 2.7 编写的,因为这是我们的客户使用的。我很想升级它们,但我没有时间。

这是读取队列的脚本:

import socket
import threading
import xml.etree.ElementTree as etree
from xml.dom import minidom  # for pretty printing
# import SampleXML
import sys
import os
import math
import time
from time import monotonic
import string
import stomp # for queue support
import platform


class ConnectionListener(stomp.ConnectionListener):
    def __init__(self, connection):
        self.connection = connection
        print ("Listener created")

    def on_message(self, message):
        print ("Received message with body ") + message.body

class Reader:
    def __init__(self):
        pass
    
    def ConnectToQueue(self):
        #For Production
        user = os.getenv("ACTIVEMQ_USER") or "worthington"
        #user = os.getenv("ACTIVEMQ_USER") or "worthington_test"
        password = os.getenv("ACTIVEMQ_PASSWORD") or "level3"
        host = os.getenv("ACTIVEMQ_HOST") or "localhost"
        port = os.getenv("ACTIVEMQ_PORT") or 61613
        # destination = sys.argv[1:2] or ["/topic/event"]
        # destination = destination[0]
        dest = "from_entec_test"
        #For Production
        # dest = "from_entec"

        try:
            conn = stomp.Connection10(host_and_ports = [(host, port)])
            conn.set_listener('message', ConnectionListener(conn))
            # conn.start()
            # subscribe_id = '-'.join(map(str, (platform.node(), os.getppid(), os.getpid())))
            conn.connect(login=user,passcode=password)
            subscribe_id = "Queue Test Listener"
            conn.subscribe(destination=dest, id=subscribe_id, ack='client-individual')
            conn.unsubscribe(id=subscribe_id)
            conn.disconnect()
        except Exception as error:
            reason = str(error)
            print("Exception when readig data from queue: " + str(error))
        
        pass

if __name__ == "__main__" :

    try:
        UploadData = Reader()
        UploadData.ConnectToQueue()
        print ("Reader finished.")
    except Exception as Value:
        reason = str(Value)
        pass

这是写入它的代码:

import socket
import threading
import xml.etree.ElementTree as etree
from xml.dom import minidom  # for pretty printing
# import SampleXML
import sys
import os
import math
import time
from time import monotonic
import string
import stomp # for queue support
import platform


class ConnectionListener(stomp.ConnectionListener):
    def __init__(self, connection):
        self.connection = connection
        print "Listener created"

    def on_message(self, message):
        print "Received message with body " + message.body

class UploadData:
    def __init__(self):
        pass
    
    def ConnectToQueue(self):
        #For Production
        user = os.getenv("ACTIVEMQ_USER") or "worthington"
        #user = os.getenv("ACTIVEMQ_USER") or "worthington_test"
        password = os.getenv("ACTIVEMQ_PASSWORD") or "level3"
        host = os.getenv("ACTIVEMQ_HOST") or "localhost"
        port = os.getenv("ACTIVEMQ_PORT") or 61613
        # destination = sys.argv[1:2] or ["/topic/event"]
        # destination = destination[0]
        dest = "from_entec_test"
        #For Production
        # dest = "from_entec"

        try:
            conn = stomp.Connection10(host_and_ports = [(host, port)])
            # conn.start()
            # subscribe_id = '-'.join(map(str, (platform.node(), os.getppid(), os.getpid())))
            subscribe_id = "Queue Test Listener"
            conn.connect(login=user,passcode=password)
            message = "This is a test message."
            conn.send(dest, message, persistent='true')
            print "Sent message containing: " + message
            conn.disconnect()
        except Exception, error:
            reason = str(error)
            print "Exception when writing data to queue: " + str(error)
        
        pass

if __name__ == "__main__" :

    try:
        UploadData = UploadData()
        UploadData.ConnectToQueue()
    except Exception, Value:
        reason = str(Value)
        print "Main routine exception: " + str(Value)
        pass

        

【问题讨论】:

  • 消息是否持久对消费者是否确认消息没有影响。持久消息将在代理重新启动后继续存在,仅此而已。

标签: activemq


【解决方案1】:

我对 Python STOMP 客户端不是很熟悉,但从代码来看,您似乎使用 STOMP 的“客户端-个人”模式订阅,这意味着您收到的每条消息都需要您发送回带有消息的 ACK 帧Id 值,以便远程可以将其标记为已使用。由于您没有这样做,因此消息不会从队列中删除。

作为替代方案,您可以使用“自动”确认模式,该模式在代理发送消息后立即将消息标记为已使用。了解STOMP订阅模式请参考STOMPspecification

【讨论】:

    猜你喜欢
    • 2017-06-09
    • 2018-09-28
    • 1970-01-01
    • 1970-01-01
    • 2014-09-17
    • 2016-09-21
    • 2017-12-22
    • 2019-02-08
    • 2014-06-07
    相关资源
    最近更新 更多