【问题标题】:Jms component transacted and camel route transactedJms组件事务处理和骆驼路由事务处理
【发布时间】:2020-03-11 03:30:49
【问题描述】:

看完Camel In Action这本书后,我遇到了以下疑惑。

我有以下 2 条路线

A.
from("file:/home/src") //(A.1) .transacted("required") //(A.2) .bean("dbReader", "readFromDB()") //(A.3) only read from DB .bean("dbReader", "readFromDB()") //(A.4) only read from DB .to("jms:queue:DEST_QUEUE") //(A.5)

问题:
互诫协会这里真的需要(A.2)中的交易吗?

A.b.如果对#a 的回答是肯定的,那么“必需”策略的关联事务管理器应该是什么?应该是 JmsTransactionManager 还是 JpaTransactionManager ?

交流电由于DEST_QUEUE在生产者端,那么(A.5)中的JMS组件是否需要事务处理?

B. from("jms:queue:SRC_QUEUE") //(B.1) transactional jms endpoint .transacted("required") //(B.2) .bean("someBean", "someMethod()") //(B.3) simple arithmetic computation .to("jms1:queue:DEST_QUEUE") //(B.4)

SRC_QUEUE和DEST_QUEUE是不同jms broker的队列

问题:

文学学士(B.1) 中的 JMS 组件被标记为已事务处理,那么在这种情况下,是否需要按照 (B.2) 中的说明对路由进行事务处理?

B.b.由于DEST_QUEUE在生产者端,那么(B.4)中的JMS组件是否需要进行事务处理?

【问题讨论】:

    标签: apache-camel spring-transactions spring-camel


    【解决方案1】:

    关于 Camel 事务处理的非常好的问题。

    一般说明:当谈到 Camel 事务时,它的意思是使用来自具有事务能力的系统(如数据库或 JMS 代理)进行的事务。路由中的transacted 语句必须紧跟from 语句,因为它总是与消耗相关

    A.a.这里真的需要(A.2)中的交易吗?

    不,不是。由于文件系统不支持事务,它在这条路线上没有任何帮助。

    A.b.如果#a 的答案是肯定的,那么...?

    没有“文件系统事务管理器”

    交流电由于DEST_QUEUE在生产者端,那么(A.5)中的JMS组件是否需要事务处理?

    不确定,但我不这么认为。生产者尝试将消息移交给代理。 事务用于启用回滚,但如果代理没有收到数据,回滚可以做什么?

    文学学士(B.1)中的JMS组件被标记为transacted,那么在这种情况下是否需要像(B.2)中提到的那样对路由进行transacted?

    这取决于因为 SRC 和 DEST 位于不同的代理上

    • 如果您希望在代理之间进行端到端事务,则需要使用 XA 事务管理器,然后必须将路由标记为 transacted
    • 如果您对 consumer transaction 没问题,您可以 configure the JMS component for it 并省略 Spring Tx 管理器和 Camel transacted 语句。

    澄清最后一点:如果您使用本地代理事务进行消费,在成功处理路由之前,Camel 不会提交消息。因此,如果发生任何错误,则会发生回滚并重新传递消息。

    在大多数情况下,这完全没问题,但是,两个不同的代理仍然可能发生的情况是路由已成功处理,消息已传递到 DEST 代理,但 Camel 不再能够向 SRC 代理提交。然后发生重新传递,再处理一次路由,消息被多次传递给 DEST 代理

    在我看来,XA 交易的复杂性比本地代理交易的非常罕见的边缘情况更难处理。但这是一个非常主观的意见,可能还取决于您使用的上下文或数据。

    需要注意的是:如果 SRC 和 DEST 代理相同,则本地代理事务是 100% 足够的! 完全不需要 Spring Tx 管理器和 Camel transacted

    B.b.由于DEST_QUEUE在生产者端,那么(B.4)中的JMS组件是否需要进行事务处理?

    与 B.a. 的答案相同。

    【讨论】:

      【解决方案2】:

      下午好,

      我想花一点时间来回答您的问题。我将解决“B”方面的问题。

      WRT:

      文学学士(B.1)中的JMS组件被标记为transacted,那么在这种情况下是否需要像(B.2)中提到的那样对路由进行transacted?

      是的。源组件和目标组件都需要标记为已处理。将组件标记为已处理将在源会话和目标会话上启动本地 JMS 事务。请注意,这是两个独立的本地 JMS 事务,由两个独立的 JmsTransactionManager 管理。

      将路由标记为“已处理”将启动 JTA 事务上下文。请注意,PlatformTransactionManager 必须是 JtaTransactionManager。当调用 'to' 组件时,消息发送的本地 JMS 事务将与消息获取的本地事务同步。 (JTA 同步事务)。这意味着当远程代理确认发送的提交时,发送将获得回调。此时,消息接收将被提交。这是“dups OK”事务行为(不是 XA)。您有一个消息已发送的窗口,但尚未确认接收。

      实际上让这个工作起来很棘手。这是一个示例:

      <!-- ******************** Camel route definition ********************* -->
      <camelContext allowUseOriginalMessage="false"
          id="camelContext-Bridge-Local" streamCache="true" trace="true" xmlns="http://camel.apache.org/schema/blueprint">
          <route id="amq-to-amq">
              <from id="from" uri="amqLoc:queue:IN"/>
              <transacted id="trans"/>
              <to id="to" uri="amqRem:queue:OUT"/>
          </route>
      </camelContext>
      <!-- ********************* Local AMQ configuration ************************** -->
      <bean class="org.apache.activemq.camel.component.ActiveMQComponent" id="amqLoc">
          <property name="configuration">
              <bean class="org.apache.camel.component.jms.JmsConfiguration">
                  <property name="connectionFactory" ref="AmqCFLocalPool"/>
                  <property name="receiveTimeout" value="100000"/>
                  <property name="maxConcurrentConsumers" value="3"/>
                  <property name="cacheLevelName" value="CACHE_NONE"/>
                  <property name="transacted" value="true"/>
              </bean>
          </property>
      </bean>
      <bean class="org.apache.activemq.jms.pool.PooledConnectionFactory" id="AmqCFLocalPool">
          <property name="maxConnections" value="1"/>
          <property name="idleTimeout" value="0"/>
          <property name="connectionFactory" ref="AmqCFLocal"/>
      </bean>
      <bean class="org.apache.activemq.ActiveMQConnectionFactory" id="AmqCFLocal">
          <property name="brokerURL" value="tcp://10.0.0.170:61616?jms.prefetchPolicy.all=0"/>
          <property name="userName" value="admin"/>
          <property name="password" value="admin"/>
      </bean>
      <!-- ********************* Remote AMQ configuration ************************** -->
      <bean class="org.apache.activemq.camel.component.ActiveMQComponent" id="amqRem">
          <property name="configuration">
              <bean class="org.apache.camel.component.jms.JmsConfiguration">
                  <property name="connectionFactory" ref="AmqCFRemotePool"/>
                  <property name="transacted" value="true"/>
              </bean>
          </property>
      </bean>
      <bean class="org.apache.activemq.jms.pool.PooledConnectionFactory"
          destroy-method="stop" id="AmqCFRemotePool" init-method="start">
          <property name="maxConnections" value="1"/>
          <property name="idleTimeout" value="0"/>
          <property name="connectionFactory" ref="AmqCFRemote"/>
      </bean>
      <bean class="org.apache.activemq.ActiveMQConnectionFactory" id="AmqCFRemote">
          <property name="brokerURL" value="tcp://10.0.0.171:61616"/>
          <property name="userName" value="admin"/>
          <property name="password" value="admin"/>
      </bean>
      

      为 org.springframework.jms.connection.JmsTransactionManager 启用 DEBUG 日志记录,并为您正在使用的 JTA 事务管理器启用 DEBUG/TRACE 级别的日志记录。

      【讨论】:

        猜你喜欢
        • 2012-09-09
        • 1970-01-01
        • 2013-05-23
        • 1970-01-01
        • 2018-08-15
        • 2014-02-11
        • 1970-01-01
        • 2013-07-10
        • 1970-01-01
        相关资源
        最近更新 更多