【Question Title】: Spring Batch - Could not commit JDBC transaction; nested exception is java.sql.SQLTransactionRollbackException
【Posted】: 2021-06-23 16:08:33
【Question】:

I am using Spring Cloud Data Flow 2.7.1 with MySQL 5.7.24 to run tasks that launch batch jobs. The architecture is Externalizing Batch Process Execution with remote partitioning (manager, worker). Each partition request is read from RabbitMQ, and each worker runs several consumers (we currently use 1 worker with 2 consumers). Occasionally a partition fails with a deadlock:

org.springframework.transaction.TransactionSystemException: Could not commit JDBC transaction; nested exception is java.sql.SQLTransactionRollbackException: (conn=2542684) Deadlock found when trying to get lock; try restarting transaction
    at org.springframework.jdbc.datasource.DataSourceTransactionManager.doCommit(DataSourceTransactionManager.java:334) ~[spring-jdbc-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:744) ~[spring-tx-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:712) ~[spring-tx-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at jdk.internal.reflect.GeneratedMethodAccessor407.invoke(Unknown Source) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127) ~[spring-batch-core-4.2.1.RELEASE.jar!/:4.2.1.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) ~[spring-aop-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at com.sun.proxy.$Proxy95.commit(Unknown Source) ~[na:na]
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:152) ~[spring-tx-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273) ~[spring-batch-core-4.2.1.RELEASE.jar!/:4.2.1.RELEASE]
    at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82) ~[spring-batch-core-4.2.1.RELEASE.jar!/:4.2.1.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar!/:4.2.1.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar!/:4.2.1.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar!/:4.2.1.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:258) ~[spring-batch-core-4.2.1.RELEASE.jar!/:4.2.1.RELEASE]
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:208) ~[spring-batch-core-4.2.1.RELEASE.jar!/:4.2.1.RELEASE]
    at org.springframework.batch.integration.partition.StepExecutionRequestHandler.handle(StepExecutionRequestHandler.java:64) ~[spring-batch-integration-4.2.1.RELEASE.jar!/:4.2.1.RELEASE]
    at jdk.internal.reflect.GeneratedMethodAccessor421.invoke(Unknown Source) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171) ~[spring-messaging-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) ~[spring-messaging-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper$HandlerMethod.invoke(MessagingMethodInvokerHelper.java:1092) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.invokeHandlerMethod(MessagingMethodInvokerHelper.java:581) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:477) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:355) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:108) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:95) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:127) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:170) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:444) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:318) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:266) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:229) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:133) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:170) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:198) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$600(AmqpInboundChannelAdapter.java:61) ~[spring-integration-amqp-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.createAndSend(AmqpInboundChannelAdapter.java:266) ~[spring-integration-amqp-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:232) ~[spring-integration-amqp-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1579) ~[spring-rabbit-2.2.3.RELEASE.jar!/:2.2.3.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1498) ~[spring-rabbit-2.2.3.RELEASE.jar!/:2.2.3.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer$$Lambda$795/000000003D931C00.invokeListener(Unknown Source) ~[na:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1486) ~[spring-rabbit-2.2.3.RELEASE.jar!/:2.2.3.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1477) ~[spring-rabbit-2.2.3.RELEASE.jar!/:2.2.3.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1421) ~[spring-rabbit-2.2.3.RELEASE.jar!/:2.2.3.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:958) ~[spring-rabbit-2.2.3.RELEASE.jar!/:2.2.3.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:908) ~[spring-rabbit-2.2.3.RELEASE.jar!/:2.2.3.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:81) ~[spring-rabbit-2.2.3.RELEASE.jar!/:2.2.3.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1279) ~[spring-rabbit-2.2.3.RELEASE.jar!/:2.2.3.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1185) ~[spring-rabbit-2.2.3.RELEASE.jar!/:2.2.3.RELEASE]
    at java.base/java.lang.Thread.run(Thread.java:836) ~[na:na]
Caused by: java.sql.SQLTransactionRollbackException: (conn=2542684) Deadlock found when trying to get lock; try restarting transaction
    at org.mariadb.jdbc.internal.util.exceptions.ExceptionMapper.get(ExceptionMapper.java:244) ~[mariadb-java-client-2.4.4.jar!/:na]
    at org.mariadb.jdbc.internal.util.exceptions.ExceptionMapper.getException(ExceptionMapper.java:171) ~[mariadb-java-client-2.4.4.jar!/:na]
    at org.mariadb.jdbc.MariaDbStatement.executeExceptionEpilogue(MariaDbStatement.java:248) ~[mariadb-java-client-2.4.4.jar!/:na]
    at org.mariadb.jdbc.MariaDbStatement.executeInternal(MariaDbStatement.java:338) ~[mariadb-java-client-2.4.4.jar!/:na]
    at org.mariadb.jdbc.MariaDbStatement.execute(MariaDbStatement.java:389) ~[mariadb-java-client-2.4.4.jar!/:na]
    at org.mariadb.jdbc.MariaDbConnection.commit(MariaDbConnection.java:755) ~[mariadb-java-client-2.4.4.jar!/:na]
    at com.zaxxer.hikari.pool.ProxyConnection.commit(ProxyConnection.java:366) ~[HikariCP-3.4.2.jar!/:na]
    at com.zaxxer.hikari.pool.HikariProxyConnection.commit(HikariProxyConnection.java) ~[HikariCP-3.4.2.jar!/:na]
    at org.springframework.jdbc.datasource.DataSourceTransactionManager.doCommit(DataSourceTransactionManager.java:331) ~[spring-jdbc-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
    ... 76 common frames omitted
Caused by: java.sql.SQLException: Deadlock found when trying to get lock; try restarting transaction
    at org.mariadb.jdbc.internal.protocol.AbstractQueryProtocol.readErrorPacket(AbstractQueryProtocol.java:1594) ~[mariadb-java-client-2.4.4.jar!/:na]
    at org.mariadb.jdbc.internal.protocol.AbstractQueryProtocol.readPacket(AbstractQueryProtocol.java:1453) ~[mariadb-java-client-2.4.4.jar!/:na]
    at org.mariadb.jdbc.internal.protocol.AbstractQueryProtocol.getResult(AbstractQueryProtocol.java:1415) ~[mariadb-java-client-2.4.4.jar!/:na]
    at org.mariadb.jdbc.internal.protocol.AbstractQueryProtocol.executeQuery(AbstractQueryProtocol.java:228) ~[mariadb-java-client-2.4.4.jar!/:na]
    at org.mariadb.jdbc.MariaDbStatement.executeInternal(MariaDbStatement.java:332) ~[mariadb-java-client-2.4.4.jar!/:na]
    ... 81 common frames omitted

Here is the manager, which is executed as a task in Spring Cloud Data Flow:

@Configuration
@EnableConfigurationProperties(ConsolidationBatchProperties.class)
public class ManagerConfiguration {

    private static final String GET_MAXIMO_ADDRESS_LOCATION;
    private static final String INSERT_ADDRESS_LOCATION;

    static {
        GET_MAXIMO_ADDRESS_LOCATION = ResourceReader.readAsString("classpath:sql/getMaximoAddressLocation.sql");
        INSERT_ADDRESS_LOCATION =  ResourceReader.readAsString("classpath:sql/insertAddressLocation.sql");
    }

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final DataSource dataSource;
    private final ConsolidationBatchProperties consolidationBatchProperties;
    private final RemotePartitioningManagerStepBuilderFactory remotePartitioningManagerStepBuilderFactory;


    @Autowired
    public ManagerConfiguration(JobBuilderFactory jobBuilderFactory,
                                StepBuilderFactory stepBuilderFactory,
                                @Qualifier("appDataSource") DataSource dataSource,
                                ConsolidationBatchProperties consolidationBatchProperties,
                                RemotePartitioningManagerStepBuilderFactory remotePartitioningManagerStepBuilderFactory) {

        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;

        this.dataSource = dataSource;
        this.consolidationBatchProperties = consolidationBatchProperties;
        this.remotePartitioningManagerStepBuilderFactory = remotePartitioningManagerStepBuilderFactory;
    }

    /*
     * Configure outbound flow (requests going to workers)
     */

    @Bean
    public DirectChannel requests() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow outboundFlow(AmqpTemplate amqpTemplate) {
        return IntegrationFlows.from(requests())
                .handle(Amqp.outboundAdapter(amqpTemplate)
                        .routingKey(consolidationBatchProperties.getQueueName()))
                .get();
    }

    @Bean
    public JobExecutionDeltaDao jobExecutionDeltaDao(final @Qualifier("batchDataSource") DataSource batchDataSource){

        JobExecutionDeltaDao jobExecutionDao = new JobExecutionDeltaDao();
        jobExecutionDao.setJdbcTemplate(new JdbcTemplate(batchDataSource));

        return jobExecutionDao;
    }

    @Bean
    public StartDateConsolidation startDateConsolidation(){
        return new StartDateConsolidation(this.dataSource);
    }

    @Bean
    public JobExecutionDeltaExplorer jobExecutionDeltaExplorer(JobExecutionDeltaDao jobExecutionDeltaDao,
                                                               StartDateConsolidation startDateConsolidation){

        return  new JobExecutionDeltaExplorer(jobExecutionDeltaDao, startDateConsolidation);
    }


    @Bean
    @StepScope
    public ColumnRangePartitioner partitioner(final @Qualifier("batchDataSource") DataSource batchDataSource) {

        ColumnRangePartitioner columnRangePartitioner = new ColumnRangePartitioner();
        columnRangePartitioner.setColumn("id");
        columnRangePartitioner.setDataSource(batchDataSource);
        columnRangePartitioner.setTable("GEOSITES_ADDRLOC");

        return columnRangePartitioner;
    }

    @Bean
    public JdbcCursorItemReader<AddressLocation> addressLocationReader(@Value("${customers:}") String[] customers, @Value("${mode:ALL}") String mode, @Value("${locations:}") String[] locations, JobExecutionDeltaExplorer jobExecutionDeltaExplorer) {

        AddressLocationSqlTransformer sqlTransformer = new AddressLocationSqlTransformer();
        sqlTransformer.setSql(GET_MAXIMO_ADDRESS_LOCATION);
        sqlTransformer.setCustomers(customers);
        sqlTransformer.setMode(mode);
        sqlTransformer.setLocations(locations);
        sqlTransformer.setJobExecutionDeltaExplorer(jobExecutionDeltaExplorer);

        return new JdbcCursorItemReaderBuilder<AddressLocation>()
                .name("addressLocationReader")
                .dataSource(this.dataSource)
                .fetchSize(1000)
                .sql(sqlTransformer.getTransformedSql())
                .rowMapper(new BeanPropertyRowMapper<>(AddressLocation.class))
                .build();
    }

    @Bean
    public JdbcBatchItemWriter<AddressLocation> addressLocationItemWriter(final @Qualifier("batchDataSource") DataSource batchDataSource) {

        return new JdbcBatchItemWriterBuilder<AddressLocation>()
                .dataSource(batchDataSource)
                .sql(INSERT_ADDRESS_LOCATION)
                .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
                .build();

    }

    @Bean
    public Step createAliasAndIndexStep(CreateIndexAndAliasTasklet createIndexAndAliasTasklet){
        return stepBuilderFactory.get("createAliasAndIndexStep")
                .tasklet(createIndexAndAliasTasklet).build();
    }

    @Bean
    public Step migrateAddressLocationStep(final JdbcCursorItemReader<AddressLocation> addressLocationReader,
                                           final JdbcBatchItemWriter<AddressLocation> addressLocationItemWriter) {

        return stepBuilderFactory.get(MIGRATE_ADDR_LOC_STEP_NAME)
                .<AddressLocation, AddressLocation>chunk(1000)
                .reader(addressLocationReader)
                .writer(addressLocationItemWriter)
                .build();
    }

    @Bean
    public Step masterStep(ColumnRangePartitioner partitioner) {
          return remotePartitioningManagerStepBuilderFactory.get(MASTER_STEP_NAME)
                .partitioner(WORKER_STEP_NAME, partitioner)
                .gridSize(consolidationBatchProperties.getGridSize())
                .outputChannel(requests())
                .build();
    }

    @Bean
    public Job consolidateAddressFromMaximoJob(final Step createAliasAndIndexStep, final Step migrateAddressLocationStep, final Step masterStep) {

        return jobBuilderFactory.get(JOB_NAME)
                .incrementer(new RunIdIncrementer())
                .start(createAliasAndIndexStep)
                .next(migrateAddressLocationStep)
                .next(masterStep)
                .build();
    }
}
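
The `ColumnRangePartitioner` class itself is not shown in the question. As a minimal sketch, assuming it behaves like the column-range partitioner in the Spring Batch samples, the manager would split the `id` range of `GEOSITES_ADDRLOC` into roughly `gridSize` contiguous, non-overlapping ranges and publish each one as `minValue`/`maxValue`. The class and method names below (`RangePartitionSketch`, `partition`, `Range`) are hypothetical and use no Spring types; in the real job the minimum and maximum ids would come from a query against the table.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Plain-Java sketch of column-range partitioning (hypothetical, no Spring types). */
final class RangePartitionSketch {

    /** Inclusive id range that one worker step execution would read. */
    static final class Range {
        final long minValue;
        final long maxValue;

        Range(long minValue, long maxValue) {
            this.minValue = minValue;
            this.maxValue = maxValue;
        }

        @Override
        public String toString() {
            return "[" + minValue + ", " + maxValue + "]";
        }
    }

    /**
     * Splits the inclusive range [min, max] into at most gridSize
     * contiguous ranges that do not overlap.
     */
    static Map<String, Range> partition(long min, long max, int gridSize) {
        if (gridSize <= 0) {
            throw new IllegalArgumentException("gridSize must be positive");
        }
        if (max < min) {
            throw new IllegalArgumentException("max must not be smaller than min");
        }
        // Number of ids per partition, rounded up so the whole range is covered.
        long targetSize = (max - min) / gridSize + 1;

        Map<String, Range> result = new LinkedHashMap<>();
        int number = 0;
        long start = min;
        while (start <= max) {
            long end = Math.min(start + targetSize - 1, max);
            result.put("partition" + number, new Range(start, end));
            start = end + 1;
            number++;
        }
        return result;
    }

    public static void main(String[] args) {
        partition(1, 100, 4)
                .forEach((name, range) -> System.out.println(name + " -> " + range));
    }
}
```

Because the ranges are disjoint, two workers reading different partitions never read the same source rows. A deadlock between them would therefore have to come from rows or tables they both write to, not from the partitioned read.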

Here is the worker, which runs outside the context of Data Flow and keeps listening for messages from RabbitMQ:

@Configuration
@EnableConfigurationProperties(ConsolidationBatchProperties.class)
public class WorkerConfiguration {

    private static final Logger log = LoggerFactory.getLogger(WorkerConfiguration.class);

    private static final String GET_ADDRESS_LOCATION_WITHIN_RANGE;

    static {
        GET_ADDRESS_LOCATION_WITHIN_RANGE = ResourceReader.readAsString("classpath:sql/getAddressLocationWithinRange.sql");
    }

    private final RemotePartitioningWorkerStepBuilderFactory remotePartitioningWorkerStepBuilderFactory;
    private final DataSource dataSource;
    private final ConsolidationBatchProperties consolidationBatchProperties;

    @Autowired
    public WorkerConfiguration(
            RemotePartitioningWorkerStepBuilderFactory remotePartitioningWorkerStepBuilderFactory,
            @Qualifier("batchDataSource") DataSource dataSource,
            ConsolidationBatchProperties consolidationBatchProperties) {

        this.remotePartitioningWorkerStepBuilderFactory = remotePartitioningWorkerStepBuilderFactory;
        this.dataSource = dataSource;
        this.consolidationBatchProperties = consolidationBatchProperties;
    }

    /*
     * Configure inbound flow (requests coming from the master)
     */
    @Bean
    public DirectChannel requests() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow inboundFlow(SimpleMessageListenerContainer listenerContainer) {
        return IntegrationFlows
                .from(Amqp.inboundAdapter(listenerContainer))
                .channel(requests())
                .get();
    }

    @Bean
    public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container =
                new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames(consolidationBatchProperties.getQueueName());
        container.setConcurrentConsumers(consolidationBatchProperties.getConsumers());

        return container;
    }

    /*
     * Configure outbound flow (replies going to the master)
     */
    @Bean
    public NullChannel replies() {
        return new NullChannel();
    }

    /*
     * Configure worker components
     */

    @Bean
    public GeographicSiteProcessor geographicSiteProcessor(GeographicSiteApiClient GeographicSiteApiClient, GeographicAddressApiClient geographicAddressApiClient){
        return new GeographicSiteProcessor(geographicAddressApiClient, GeographicSiteApiClient);
    }

    @Bean
    public ElasticObjectItemWriter elasticObjectItemWriter(RestHighLevelClient client, ObjectMapper objectMapper, ElasticIndexProperties elasticIndexProperties){

        ElasticObjectItemWriter elasticObjectItemWriter = new ElasticObjectItemWriter();
        elasticObjectItemWriter.setClient(client);
        elasticObjectItemWriter.setObjectMapper(objectMapper);
        elasticObjectItemWriter.setIndexName(elasticIndexProperties.getWritingIndexName());
        elasticObjectItemWriter.setType(elasticIndexProperties.getType());

        return elasticObjectItemWriter;

    }

    @Bean
    @StepScope
    public JdbcCursorItemReader<AddressLocation> itemReader(
            @Value("#{stepExecutionContext['minValue']}")Long minValue,
            @Value("#{stepExecutionContext['maxValue']}")Long maxValue) {

        log.info("reading " + minValue + " to " + maxValue);
        RangeSetter rangeSetter = new RangeSetter();
        rangeSetter.setStartValue(minValue);
        rangeSetter.setEndValue(maxValue);

        return new JdbcCursorItemReaderBuilder<AddressLocation>()
                .name("itemReader")
                .dataSource(this.dataSource)
                .sql(GET_ADDRESS_LOCATION_WITHIN_RANGE)
                .preparedStatementSetter(rangeSetter)
                .rowMapper(new BeanPropertyRowMapper<>(AddressLocation.class))
                .build();
    }

    @Bean
    public Step workerStep(JdbcCursorItemReader<AddressLocation> itemReader,
                           GeographicSiteProcessor geographicSiteProcessor,
                           ElasticObjectItemWriter elasticObjectItemWriter) {

        return remotePartitioningWorkerStepBuilderFactory.get("workerStep")
                .inputChannel(requests())
                .<AddressLocation, ElasticGeographicSite>chunk(consolidationBatchProperties.getChunkSize())
                .reader(itemReader)
                .processor(geographicSiteProcessor)
                .writer(elasticObjectItemWriter)
                .faultTolerant()
                .retryLimit(5)
                .retry(MicroserviceException.class)
                .retry(TransactionSystemException.class)
                .skip(NullPointerException.class)
                .skipLimit(10)
                .listener(new LoggingSkipListener<AddressLocation, ElasticObject>())
                .build();
    }

}
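
The MySQL error text says "try restarting transaction", and the worker step above lists `TransactionSystemException` as retryable. Whether that step-level retry covers a failure raised while the chunk transaction is being committed is not established in this thread. As an illustration of what restarting a deadlocked transaction means in plain Java, here is a minimal sketch. The names `DeadlockRetrySketch`, `withRetry` and `isTransactionRollback` are hypothetical, and the unit of work is assumed to begin and commit (or roll back) its own transaction on every call.

```java
import java.sql.SQLTransactionRollbackException;
import java.util.function.Supplier;

/** Plain-Java sketch: re-run a unit of work when the database rolled it back. */
final class DeadlockRetrySketch {

    /** True if any exception in the cause chain is a SQLTransactionRollbackException. */
    static boolean isTransactionRollback(Throwable error) {
        for (Throwable current = error; current != null; current = current.getCause()) {
            if (current instanceof SQLTransactionRollbackException) {
                return true;
            }
        }
        return false;
    }

    /**
     * Calls unitOfWork up to maxAttempts times. Only failures caused by a
     * transaction rollback (such as being chosen as deadlock victim) are retried;
     * any other exception is rethrown immediately.
     */
    static <T> T withRetry(Supplier<T> unitOfWork, int maxAttempts, long backoffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        for (int attempt = 1; ; attempt++) {
            try {
                return unitOfWork.get();
            } catch (RuntimeException e) {
                if (!isTransactionRollback(e) || attempt >= maxAttempts) {
                    throw e;
                }
                // Linear backoff gives the competing transaction time to finish.
                sleep(backoffMillis * attempt);
            }
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while backing off", ie);
        }
    }
}
```

Retrying only hides the symptom. If two consumers keep contending for the same rows, the deadlock can recur on every attempt.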

Do you have any idea how to solve this problem?

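【Comments】:

  • Your stack trace is truncated. Please provide the complete stack trace so that the root cause of the problem can be seen. Also, please share your code so that we can understand your setup and help you efficiently; see stackoverflow.com/help/minimal-reproducible-example
  • I have added the new stack trace. I forgot to tell you that I am using the MariaDB client.
  • Thank you for the update. However, as I said in my previous comment, please share a minimal example that reproduces the issue so that we can help you.
  • The deadlock error always occurs on a single partition.
  • What are the transaction attributes on the worker side? I suspect the problem comes from having multiple consumers per worker. Have you tried with only one consumer per worker?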

Tags: spring-batch spring-cloud-dataflow spring-cloud-task


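【Solution 1】:

The deadlock is happening on the worker side, and I suspect this is because two (or more) message consumers are competing for resources.

Does using a single consumer per worker solve the issue?

"We have tested with 1 single consumer and did not get the deadlock problem, but my concern is scaling. So should we use several worker pods with 1 single consumer each? Would that setup avoid the deadlock?"

I'm not sure a worker should have multiple consumers. The idea of remote partitioning is that each partition is handled by one worker. How that partition is then processed on the worker side is up to you. If you want to add another level of scaling on the worker side, you can try a multi-threaded step or a locally partitioned step instead of using multiple consumers.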
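
To make the last suggestion concrete, here is a plain-Java sketch of the idea: a single consumer receives one partition and fans its id range out to a local thread pool, so parallelism lives inside the worker rather than in extra message consumers. This is not Spring Batch API. `LocalFanOutSketch`, `processPartition` and the `itemHandler` callback are hypothetical names, and in a real job the same role would be played by a multi-threaded step or a locally partitioned step.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.LongConsumer;

/** Plain-Java sketch: one consumer, one partition, local parallelism. */
final class LocalFanOutSketch {

    /**
     * Processes every id in [minValue, maxValue] using a local pool of threads.
     * Each thread gets its own disjoint slice, so no two threads handle the same id.
     * Returns the number of ids handled.
     */
    static long processPartition(long minValue, long maxValue, int threads, LongConsumer itemHandler) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            long sliceSize = (maxValue - minValue) / threads + 1;
            List<Future<Long>> slices = new ArrayList<>();
            for (long start = minValue; start <= maxValue; start += sliceSize) {
                final long from = start;
                final long to = Math.min(start + sliceSize - 1, maxValue);
                Callable<Long> slice = () -> {
                    long handled = 0;
                    for (long id = from; id <= to; id++) {
                        itemHandler.accept(id);
                        handled++;
                    }
                    return handled;
                };
                slices.add(pool.submit(slice));
            }
            long total = 0;
            for (Future<Long> slice : slices) {
                total += slice.get(); // wait for every slice before reporting back
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while processing partition", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A slice failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

Note that the `JdbcCursorItemReader` used in the question is documented as not thread-safe, so a multi-threaded step would need a synchronized or paging reader; the sketch sidesteps this by giving each thread its own slice.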
