【问题标题】:Using Mule instances in a master/slave or failover relationship在主/从或故障转移关系中使用 Mule 实例
【发布时间】:2014-06-21 21:46:13
【问题描述】:

我有两个 Mule 实例订阅了队列中的同一个主题,但我只希望每条消息消费一次。

我可以通过将消息汇集到唯一队列并从那里处理来实现这一点,但为了降低操作复杂性,我想设置在每个 Mule 实例上运行的消息消费者流以推迟到其中一个实例。

这类似于 ActiveMQ 故障转移设置(一次只有一个实例在运行,空闲实例仅在运行的实例无法响应时唤醒)或我将授予其中一个实例命令的主/从安排超过其他人。或者像实例间而不是实例内的 VM 传输。

这需要在没有任何 Mule 企业版组件(仅依赖于 Mule 社区版组件)的情况下使用 Mule 3.4 版来完成。或 3.5。

【问题讨论】:

    标签: mule


    【解决方案1】:

    我无法找到一种方便的内置方法来执行此操作。相反,我假设每个 mule 实例将在单独的盒子上运行,并使用 server.host 值来确定哪个实例进行处理:

    <mule xmlns:redis="http://www.mulesoft.org/schema/mule/redis" 
        xmlns="http://www.mulesoft.org/schema/mule/core"  
        version="CE-3.5.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://www.mulesoft.org/schema/mule/redis http://www.mulesoft.org/schema/mule/redis/3.4/mule-redis.xsd
    http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd">
    
        <!-- define redis instance -->
        <redis:config name="redis-instance" />
    
        <flow name="topicConsumer">
            <!-- listen to redis channel (topic) -->
            <redis:subscribe config-ref="redis-instance">
                <redis:channels>
                    <redis:channel>topic.channel</redis:channel>
                </redis:channels>
            </redis:subscribe>
    
            <!-- save original payload (message from Redis) -->
            <set-session-variable variableName="redisPayload" value="#[payload]" />
    
            <!-- select processor -->
            <flow-ref name="topicProcessorSelector"/>
    
            <choice>
                <when expression="#[sessionVars['subscriberProcessor'] == server.host]">
                    <logger level="INFO" message="processing on #[server.host]"/>
                </when>
                <otherwise>
                    <logger level="INFO" message="take no action"/>
                </otherwise>
            </choice>
        </flow>
    
        <flow name="topicProcessorSelector" processingStrategy="synchronous">
            <!-- get key -->
            <redis:get config-ref="redis-instance" 
                key="topic_processor"/>
    
            <!-- if no key, then add this instance as the processor -->
            <choice>
                <when expression="#[payload instanceof org.mule.transport.NullPayload]">
                    <!-- set key -->
                    <redis:set config-ref="redis-instance" 
                        key="topic_processor"
                        expire="10"
                        value="#[server.host]">
                    </redis:set>
                    <set-session-variable variableName="subscriberProcessor" value="#[server.host]" />
                </when>
                <otherwise>
                    <!-- use existing key -->
                    <byte-array-to-string-transformer/>
                    <set-session-variable variableName="subscriberProcessor" value="#[payload]" />
                </otherwise>
            </choice>
        </flow>
    </mule>
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2013-03-18
      • 1970-01-01
      • 1970-01-01
      • 2012-05-08
      • 1970-01-01
      • 2019-08-08
      相关资源
      最近更新 更多