【发布时间】:2021-04-15 05:16:15
【问题描述】:
我有一个 ActiveMQ 队列,其中有几条使用持久设置为 true 发送的消息。当我在 Python 中创建订阅者来读取队列时,我得到了队列中的所有消息。下次我打开订阅者时,我会收到所有相同的消息。我调整了写入队列的代码以将持久设置为 false,但消息仍保留在队列中。我是否忽略了发送确认?
代码是使用 Python 2.7 编写的,因为这是我们的客户使用的。我很想升级它们,但我没有时间。
这是读取队列的脚本:
import socket
import threading
import xml.etree.ElementTree as etree
from xml.dom import minidom # for pretty printing
# import SampleXML
import sys
import os
import math
import time
from time import monotonic
import string
import stomp # for queue support
import platform
class ConnectionListener(stomp.ConnectionListener):
def __init__(self, connection):
self.connection = connection
print ("Listener created")
def on_message(self, message):
print ("Received message with body ") + message.body
class Reader:
def __init__(self):
pass
def ConnectToQueue(self):
#For Production
user = os.getenv("ACTIVEMQ_USER") or "worthington"
#user = os.getenv("ACTIVEMQ_USER") or "worthington_test"
password = os.getenv("ACTIVEMQ_PASSWORD") or "level3"
host = os.getenv("ACTIVEMQ_HOST") or "localhost"
port = os.getenv("ACTIVEMQ_PORT") or 61613
# destination = sys.argv[1:2] or ["/topic/event"]
# destination = destination[0]
dest = "from_entec_test"
#For Production
# dest = "from_entec"
try:
conn = stomp.Connection10(host_and_ports = [(host, port)])
conn.set_listener('message', ConnectionListener(conn))
# conn.start()
# subscribe_id = '-'.join(map(str, (platform.node(), os.getppid(), os.getpid())))
conn.connect(login=user,passcode=password)
subscribe_id = "Queue Test Listener"
conn.subscribe(destination=dest, id=subscribe_id, ack='client-individual')
conn.unsubscribe(id=subscribe_id)
conn.disconnect()
except Exception as error:
reason = str(error)
print("Exception when readig data from queue: " + str(error))
pass
if __name__ == "__main__" :
try:
UploadData = Reader()
UploadData.ConnectToQueue()
print ("Reader finished.")
except Exception as Value:
reason = str(Value)
pass
这是写入它的代码:
import socket
import threading
import xml.etree.ElementTree as etree
from xml.dom import minidom # for pretty printing
# import SampleXML
import sys
import os
import math
import time
from time import monotonic
import string
import stomp # for queue support
import platform
class ConnectionListener(stomp.ConnectionListener):
def __init__(self, connection):
self.connection = connection
print "Listener created"
def on_message(self, message):
print "Received message with body " + message.body
class UploadData:
def __init__(self):
pass
def ConnectToQueue(self):
#For Production
user = os.getenv("ACTIVEMQ_USER") or "worthington"
#user = os.getenv("ACTIVEMQ_USER") or "worthington_test"
password = os.getenv("ACTIVEMQ_PASSWORD") or "level3"
host = os.getenv("ACTIVEMQ_HOST") or "localhost"
port = os.getenv("ACTIVEMQ_PORT") or 61613
# destination = sys.argv[1:2] or ["/topic/event"]
# destination = destination[0]
dest = "from_entec_test"
#For Production
# dest = "from_entec"
try:
conn = stomp.Connection10(host_and_ports = [(host, port)])
# conn.start()
# subscribe_id = '-'.join(map(str, (platform.node(), os.getppid(), os.getpid())))
subscribe_id = "Queue Test Listener"
conn.connect(login=user,passcode=password)
message = "This is a test message."
conn.send(dest, message, persistent='true')
print "Sent message containing: " + message
conn.disconnect()
except Exception, error:
reason = str(error)
print "Exception when writing data to queue: " + str(error)
pass
if __name__ == "__main__" :
try:
UploadData = UploadData()
UploadData.ConnectToQueue()
except Exception, Value:
reason = str(Value)
print "Main routine exception: " + str(Value)
pass
【问题讨论】:
-
消息是否持久对消费者是否确认消息没有影响。持久消息将在代理重新启动后继续存在,仅此而已。
标签: activemq