【发布时间】:2012-01-13 23:43:12
【问题描述】:
我有一个问题,在另一个消费者收到消息之前,将大约 20 个 AMQP 消息发布到 RabbitMQ 中的共享队列会被批量延迟。我的应用程序拓扑具有三个端点:
- 第一个端点将生成一批消息,每个消息都包含网络设备的 FQDN,以从中收集配置文件。该端点将有一个实例,并且是名为 net.svc.ssh 的直接队列的唯一发布者。
- 第二个端点将使用其中一条消息,通过 SSH 连接到指定的网络设备,运行 CLI 命令并将结果从每个设备发布到另一个队列。该端点将有多个实例,所有这些实例都从 net.svc.ssh 消耗并发布到 net.svc.savefile。
- 第三个端点将接收该结果并将其保存到本地磁盘中的单独文件中。此实例是 net.svc.savefile 的唯一使用者,不发布任何消息。
我看到的是,所有第二个端点都将“保留”它们的传出消息,直到 ssh 队列中没有消息可供使用。这会阻止将设备配置及时传递到最终端点。我观察到的是,一旦所有中间端点都消耗了它们的消息,每个配置都会同时进入。
所以我的问题是为什么我会看到这种假脱机行为?我希望我的消息在它们准备好时就被发布,这种行为不是有意的。
端点 1 - 网络设备备份请求的发送者:
EventMachine.add_timer( 0 ) do
YAML::load_file( 'hosts.yaml' ).each do |fqdn,ip|
payload = { :fqdn => fqdn }
exchange.publish( payload.to_json, :routing_key => 'net.svc.ssh' )
end
end
端点 2 - ssh 节点
mq_queue.subscribe( :ack => false ) do |meta, mq_payload|
payload = JSON.parse( mq_payload)
# SSH stuff happens here
mq_channel.default_exchange.publish( payload.to_json,
:routing_key => 'net.svc.savefile',
)
end
end
端点 3 - 将设备配置保存到文件的节点
queue.subscribe( ) do |meta, mq_payload|
payload = JSON.parse( mq_payload )
payload[ "host" ].each do |host,hash|
File.open( File.join('backups', host), "w" ) do |file|
result = hash[ "result" ]
if( result[ "error" ].nil? )
file << result[ "show running-config" ].join( "\n" )
else
file << result[ "error" ]
end
end
end
end
【问题讨论】: