【问题标题】:Spring Integration jdbc:inbound-channel-adapter - Set max-rows-per-poll dynamic to throttleSpring Integration jdbc:inbound-channel-adapter - 将 max-rows-per-poll 动态设置为节流
【发布时间】:2013-12-10 17:25:02
【问题描述】:

我有一个 JDBC:inbound-channel-adapter :设置 'max-rows-per-poll' 动态以限制在通道上传递的消息。

我有一个容量为 200 的 QueueChannel。入站通道适配器会将消息发送到此 QueueChannel。我想根据 QueueChannel 的 RemainingCapacity 设置 'max-rows-per-poll' 值。

为此,我尝试在 Bean 中注入 QueueChannel,但在部署 war 文件时出现错误。

错误:由于 StateConversionError 无法注入 QueueChannel。

有没有其他方法可以实现这一点。

更新:我正在使用 Spring-Integration-2.2.0.RC2

这是 jdbc-inbound-adapter 的配置:

<si-jdbc:inbound-channel-adapter id ="jdbcInboundAdapter" channel="queueChannel" data-source="myDataSource" auto-startup="true" query="${select.query}"
update="${update.query}" max-rows-per-poll="100"  row-mapper="rowMapper" update-per-row="true">
 <si:poller fixed-rate="5000">
    <si:transactional/>
     <si:advice-chain>
           <bean class="foo.bar.InboundAdapterPollingConfigurationImpl"/>
     </si:advice-chain>
  </si:poller>
</si-jdbc:inbound-channel-adapter>

豆子:

    @Service
public class InboundAdapterPollingConfigurationImpl implements InboundAdapterPollingConfiguration{

    private static final Logger logger = LoggerFactory.getLogger(InboundAdapterPollingConfigurationImpl.class);

    @Autowired
    QueueChannel queueChannel;
    @Autowired
    SourcePollingChannelAdapter jdbcInboundAdapter;

    public void setJdbcInboundAdapterMaxRowsPerPoll(){
        String size = String.valueOf(queueChannel.getRemainingCapacity());
        DirectFieldAccessor directFieldAccessor = new DirectFieldAccessor(jdbcInboundAdapter);      
        directFieldAccessor.setPropertyValue("setMaxRowsPerPoll", size);        
        String maxRowsPerPollSize = (String)directFieldAccessor.getPropertyValue("setMaxRowsPerPoll");
        System.out.println(maxRowsPerPollSize);
    }
}

问题是如何从建议链中调用 InboundAdapterPollingConfigurationImpl.setJdbcInboundAdapterMaxRowsPerPoll() 方法。抱歉这个天真的问题,但这是我第一次使用建议链。我也在寻找一个例子,但还不走运。

更新 2: 执行时出现以下错误:

JdbcPollingChannelAdapter source = (JdbcPollingChannelAdapter)dfa.getPropertyValue("source"); 

错误:

 java.lang.ClassCastException: $Proxy547 cannot be cast to org.springframework.integration.jdbc.JdbcPollingChannelAdapter – 

我有 JDK1.6_26。我在其中一篇文章中读到,这发生在 JDK1.6 的早期版本中。

【问题讨论】:

    标签: spring-integration


    【解决方案1】:

    好吧,让我们试着调查一下!

    1. max-rows-per-pollJdbcPollingChannelAdapter 的 volatile 属性,带有适当的设置器。
    2. JdbcPollingChannelAdapter 仅在TaskScheduler.schedule() 的倡议下在其receive() 方法中所做的事情而言,看起来在运行时更改该属性是安全的。这是我们任务的第一点
    3. QueueChannel 具有属性 getQueueSize()。至于 capacity 是您的配置选项,因此您可以简单地计算 max-rows-per-poll 的值
    4. 现在如何让它工作?实际上,您只对每次投票中的max-rows-per-poll 的值感兴趣。所以,我们应该以某种方式进入轮询或轮询任务。好吧,&lt;poller&gt;advice-chain 子元素,我们可以写一些Advice,在调用receive() 之前应该改变JdbcPollingChannelAdapter#setMaxRowsPerPoll 并且值应该基于QueueChannel#getQueueSize()
    5. QueueChannel 注入Advice 的bean
    6. 现在有些不好的地方:如何注入JdbcPollingChannelAdapter bean?仅在 Spring Integration 3.0 之后,我们才提供了一个挂钩来将 MessageSources 注册为 bean。从这里写下这段代码就足够了:

      @自动连线 @Qualifier("jdbcAdapter.source") private JdbcPollingChannelAdapter messageSource;

    我们将在本周发布 3.0.GA。所以,让我不考虑 Sring Integration 3.0 之前的反射“森林”。但是,您可以在注入的 SourcePollingChannelAdapter bean 上使用 DirectFieldAccessor

    更新

    您的Advice 可能如下所示:

    public class MyAdvice implements MethodInterceptor {
           @Autowired
           QueueChannel queueChannel;
    
           @Autowired
           SourcePollingChannelAdapter jdbcInboundAdapter; 
    
          Object invoke(MethodInvocation invocation) throws Throwable {
                DirectFieldAccessor dfa = new DirectFieldAccessor(jdbcInboundAdapter);
                JdbcPollingChannelAdapter source = (JdbcPollingChannelAdapter) dfa.getPropertyValue("source");
                source.setMaxRowsPerPoll(queueChannel.getRemainingCapacity());
                return invocation.proceed();
          }
    }
    

    理论在这里:http://docs.spring.io/spring/docs/3.2.5.RELEASE/spring-framework-reference/htmlsingle/#aop

    【讨论】:

    • 我正在努力实现它。一个快速的问题:如何从引用 bean 的建议链中调用特定方法。
    • 不确定你的意思。能不能举个例子,或者换个说法。谢谢
    • 我为你添加了一个实现。干杯!
    • 执行时出现此错误:JdbcPollingChannelAdapter source = (JdbcPollingChannelAdapter) dfa.getPropertyValue("source"); : java.lang.ClassCastException: $Proxy547 不能转换为 org.springframework.integration.jdbc.JdbcPollingChannelAdapter
    • 首先你应该在&lt;advice-chain&gt;中配置&lt;transactional&gt;&lt;tx:advice&gt;。不清楚为什么是`$Proxy547. I tested it locally and everyrthin OK. Do you have some specific global AOP configuration? E.g. JMX exposing? Try to find who wraps your JdbcPollingChannelAdapter` to Proxy
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-04-28
    • 2013-07-21
    • 1970-01-01
    相关资源
    最近更新 更多