【问题标题】:spring batch write to ActiveMQ春季批量写入ActiveMQ
【发布时间】:2017-05-29 12:26:18
【问题描述】:

我正在尝试将消息写入 JMS 队列,我将在下一步中取出该队列以写入数据库。第一部分应该是同步的,第二部分是异步的。 JMS 部分真的很慢(1 分钟内 1100 个项目进入队列)。

这就是我的工作的样子。

@Bean
public Job multiThreadedStepJob() { 
   Flow flow1 = new FlowBuilder<Flow>("subflow1").from(step()).end(); 
   Flow flow2 = new FlowBuilder<Flow>("subflow2").from(step2()).end();
   Flow splitFlow = new FlowBuilder<Flow>("splitflow")
   .split(new SimpleAsyncTaskExecutor()).add(flow1, flow2) .build();

   return jobBuilders.get("multiThreadedStepJob")
                          .start(splitFlow).end().build();

}

第一步:

@Bean
public Step step() {
     return stepBuilders.get("step")
         .<OrderDTO, OrderDTO>chunk(CHUNK_SIZE)
         .reader(reader())
         .writer(writer())   
         .build();
}

第二步:

@Bean
public Step step2() {
    return stepBuilders.get("step2")
            .<OrderDTO, OrderDTO>chunk(100)
            .reader(reader2())
            .writer(writer2())
            .build();
}

我认为我的错误在step的writer和step2的reader内部,因为我可以同时运行另一个reader和writer,我没有问题。

@Bean
public JmsItemWriter<OrderDTO> writer() {
    JmsItemWriter<OrderDTO> itemWriter = new JmsItemWriter<>();
    itemWriter.setJmsTemplate(infrastructureConfiguration.jmsTemplate());
    return itemWriter;
}

@Bean
public JmsItemReader<OrderDTO> reader2() {
    JmsItemReader<OrderDTO> itemReader = new JmsItemReader<>();
    itemReader.setJmsTemplate(infrastructureConfiguration.jmsTemplate());
    itemReader.setItemType(OrderDTO.class);
    return itemReader;
}

他们使用相同的 JmsTemplate 连接到队列:

@Bean
public JmsTemplate jmsTemplate() {
    JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory());
    jmsTemplate.setDefaultDestination(queue());
    jmsTemplate.setReceiveTimeout(500);
    return jmsTemplate;
}

@Bean
public Queue queue() {
    return new ActiveMQQueue("orderList");
}

@Bean
public ConnectionFactory connectionFactory() {
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
    factory.setTrustAllPackages(true);

    ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
    prefetchPolicy.setQueuePrefetch(30);

    factory.setPrefetchPolicy(prefetchPolicy);

    PooledConnectionFactory pool = new PooledConnectionFactory(factory);
    pool.setMaxConnections(10);
    pool.setMaximumActiveSessionPerConnection(10);
    pool.isCreateConnectionOnStartup();

    return pool;
}

我使用的其余配置是来自@EnableBatchProcessing 的配置。有谁知道为什么进展如此缓慢?

【问题讨论】:

    标签: java spring activemq spring-batch jmstemplate


    【解决方案1】:

    显然 jmsTemplate.setSessionTransacted(true);真的很重要。这大大加快了 JMS 队列的写入和读取速度。出于某种原因,我认为默认值是正确的,因为我正在处理批处理。

    无论如何,如果其他人有这个问题,请先检查一下,因为它很容易忘记。

    【讨论】:

      猜你喜欢
      • 2020-01-25
      • 2019-03-26
      • 2018-02-03
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2011-01-21
      • 2017-10-06
      • 2018-02-08
      相关资源
      最近更新 更多