【问题标题】:How can I synchronize two or more Spring contexts with RabbitMQ? Broadcasting messages/events?如何使用 RabbitMQ 同步两个或多个 Spring 上下文?广播消息/事件?
【发布时间】:2013-03-20 03:30:03
【问题描述】:

我目前正在评估如何管理多个 Spring 应用程序上下文(即 Tomcat 集群)之间的事件同步的一些想法。

在那个用例中,每个上下文都可以产生一个事件 X(不管是 Spring 上下文事件还是自制的),必须将其广播到所有其他上下文实例。该事件仅在它的“生命周期”中有效,这意味着我不想以任何方式持久化它(因为它们很多而且它们的状态将在几分钟后过时)。

我的想法是使用已经存在的 RabbitMQ 实例。但是,标准的生产者/消费者模式不适合,因为事件应该广播给所有消费者。每个消费者都是生产者……就像聊天室一样。

Q #1:这可以通过 RabbitMQ(+Spring 集成)实现吗?如何构建这样的广播消息设置?

Q #2:这是否可能?有没有人更好的解决方案/想法?

用例:每个 Web 应用程序上下文都可以产生诸如“用户 x 邀请用户 y”之类的事件,这些事件应该通过 Websocket 或 EventSource 或其他方式尽快传输到用户的浏览器。因为这是(已经)运行的请求,所以操作的地方(服务器 1)可能不是消费的地方(服务器 2)。

解决方案的主要目标是:

  1. (相对)快速且可扩展。
  2. 开火即忘。如果消息已发送,则将其销毁。如果消息无关紧要,请忘记它。没有坚持。在 RabbitMQ 中,可以使用 TTL 来实现。
  3. 消息正在针对模型进行序列化,即使用 Jackson 或类似的模型。
  4. 没有基于(活动)上下文/节点数量的手动配置。如果我要添加额外的上下文(Web 服务器、后端进程等),我不想修改同步设置。

更新 1

看了Gary Russell的答案,玩了一下。

    <beans profile="rabbit">
    <rabbit:connection-factory id="connectionFactory" channel-cache-size="10" host="${rabbitmq.host}"
                               port="${rabbitmq.port}" username="${rabbitmq.username}"
                               password="${rabbitmq.password}" virtual-host="${rabbitmq.virtualhost}"/>
    <rabbit:admin connection-factory="connectionFactory"/>

    <rabbit:queue id="eventQueue" name="${rabbitmq.queue.springevents}" auto-delete="false" durable="true"></rabbit:queue>

    <bean id="amqpTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="exchange" value="${rabbitmq.springevents.exchange.fanout}"/>
        <property name="replyTimeout" value="${rabbitmq.replyTimeout}"/>
    </bean>

    <!-- Receiving -->
    <int-amqp:inbound-channel-adapter connection-factory="connectionFactory" channel="mqEventInChannelJson"
                                      queue-names="${rabbitmq.queue.springevents}"/>

    <!-- Sending -->
    <int-amqp:outbound-channel-adapter channel="mqEventOutChannelJson" amqp-template="amqpTemplate" routing-key=""
                                       exchange-name="${rabbitmq.springevents.exchange.fanout}" />
</beans>

频道 mqEventOutChannelJson -> 兔子交换 (amqp.fanout) ->

如果我使用这种配置,多个并行启动将跳过事件,因为所有正在运行的进程都在同一个队列上运行(rabbitmq.queue.springevents)。是否有可能在不为每个节点提供不同配置的情况下创建自定义队列名称?

我已经使用单独的虚拟主机和交换器 amqp.fanout 对其进行了测试。与特定的 Fanout 交换相同。

更新 2

为确保每个消费者都有自己的队列,我为每个消费者创建了一个唯一的应用程序 ID。

bean 应用程序 本身创建了一个唯一标识符:

@Component("application")
public class Application {

private String id;

@PostConstruct
public void initialize() {
    id = "app" + Math.round(1000 * Math.random());
}

public String getId() {
    return id;
}

}

鉴于此,我可以在运行中创建一个独特的队列,该队列在公共交换中注册。无需外部配置步骤。

    <util:property-path id="applicationId" path="application.id" />
    <rabbit:queue id="eventQueue" name="${rabbitmq.queue.springevents}_#{applicationId}" auto-delete="true" durable="true" exclusive="true">
        <rabbit:queue-arguments>
            <!-- Attention if you want to declare mixed value types: https://jira.springsource.org/browse/AMQP-198 -->
            <entry key="x-message-ttl">
                <value type="java.lang.Long">${rabbitmq.queue.ttl}</value>
            </entry>
        </rabbit:queue-arguments>
    </rabbit:queue>

【问题讨论】:

  • 如果您使用 CometD、Atmosphere 或一些花哨的 AJAX(可能是 Spring 的 DeferedResult 或其他东西),您可以“订阅”客户端(浏览器)到某个主题,然后让客户端确定他们是否关心关于特定消息与否。我知道 Atmosphere 有一个 JMS 插件。

标签: spring events rabbitmq spring-integration


【解决方案1】:

您可以使用RabbitMQ topic exchangefanout exchange 并让每个消费者绑定一个队列。 Spring AMQP 和 Spring Integration 完全支持。

【讨论】:

  • 是的,这就是我想要的感觉和声音。只是忘记了交换的事情。我稍微更新了我的问题。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2013-06-05
  • 2013-05-25
  • 2020-08-31
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-11-25
相关资源
最近更新 更多