【发布时间】:2017-05-29 12:26:18
【问题描述】:
我正在尝试将消息写入 JMS 队列,我将在下一步中取出该队列以写入数据库。第一部分应该是同步的,第二部分是异步的。 JMS 部分真的很慢(1 分钟内 1100 个项目进入队列)。
这就是我的工作的样子。
@Bean
public Job multiThreadedStepJob() {
Flow flow1 = new FlowBuilder<Flow>("subflow1").from(step()).end();
Flow flow2 = new FlowBuilder<Flow>("subflow2").from(step2()).end();
Flow splitFlow = new FlowBuilder<Flow>("splitflow")
.split(new SimpleAsyncTaskExecutor()).add(flow1, flow2) .build();
return jobBuilders.get("multiThreadedStepJob")
.start(splitFlow).end().build();
}
第一步:
@Bean
public Step step() {
return stepBuilders.get("step")
.<OrderDTO, OrderDTO>chunk(CHUNK_SIZE)
.reader(reader())
.writer(writer())
.build();
}
第二步:
@Bean
public Step step2() {
return stepBuilders.get("step2")
.<OrderDTO, OrderDTO>chunk(100)
.reader(reader2())
.writer(writer2())
.build();
}
我认为我的错误在step的writer和step2的reader内部,因为我可以同时运行另一个reader和writer,我没有问题。
@Bean
public JmsItemWriter<OrderDTO> writer() {
JmsItemWriter<OrderDTO> itemWriter = new JmsItemWriter<>();
itemWriter.setJmsTemplate(infrastructureConfiguration.jmsTemplate());
return itemWriter;
}
@Bean
public JmsItemReader<OrderDTO> reader2() {
JmsItemReader<OrderDTO> itemReader = new JmsItemReader<>();
itemReader.setJmsTemplate(infrastructureConfiguration.jmsTemplate());
itemReader.setItemType(OrderDTO.class);
return itemReader;
}
他们使用相同的 JmsTemplate 连接到队列:
@Bean
public JmsTemplate jmsTemplate() {
JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory());
jmsTemplate.setDefaultDestination(queue());
jmsTemplate.setReceiveTimeout(500);
return jmsTemplate;
}
@Bean
public Queue queue() {
return new ActiveMQQueue("orderList");
}
@Bean
public ConnectionFactory connectionFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
factory.setTrustAllPackages(true);
ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
prefetchPolicy.setQueuePrefetch(30);
factory.setPrefetchPolicy(prefetchPolicy);
PooledConnectionFactory pool = new PooledConnectionFactory(factory);
pool.setMaxConnections(10);
pool.setMaximumActiveSessionPerConnection(10);
pool.isCreateConnectionOnStartup();
return pool;
}
我使用的其余配置是来自@EnableBatchProcessing 的配置。有谁知道为什么进展如此缓慢?
【问题讨论】:
标签: java spring activemq spring-batch jmstemplate