【问题标题】:Copy header tag in xml spring batch application在xml spring批处理应用程序中复制标题标签
【发布时间】:2021-06-09 17:34:23
【问题描述】:

我在spring-boot 应用程序中使用spring-batch。 Spring Boot 版本为2.3.3.RELEASE

我打算实现的目标

我必须读取一个 XML 文件,其中包含数千个 Transaction 以及一个 header 标签(fileInformation,即文件信息)。需要对这些事务执行一些业务逻辑,然后使用事务中更新后的值将文件写回。我使用 StaxEventItemReader 读取文件,使用 StaxEventItemWriter 写入文件,另外还有几个 ItemProcessor 用于处理业务逻辑。XML 文件看起来像:

<?xml version="1.0" encoding="UTF-8"?>
<reportFile>
   <fileInformation>
      <sender>200GH7XZ60</sender>
      <timestamp>2020-12-23T09:05:34Z</timestamp>
      <environment>PRO</environment>
      <version>001.60</version>
   </fileInformation>
   <record>
      <transaction>
         <buyer></buyer>
      </transaction>
      <transaction>
         <buyer></buyer>
      </transaction>
      <transaction>
         <buyer></buyer>
      </transaction>
   </record>
</reportFile>

我面临的问题是标题标签的值。

我已经配置了OmegaXmlHeaderCallBack,它会生成所需的标头标签,但这些标签中的值应该从输入文件中复制。据我所知,StaxWriterCallback 在读取器、处理器和写入器之前初始化。所以我无法使用late binding 注入值。 这看起来像是一项基本要求,但在 stackoverflow 上找不到任何解决方案。

下面是配置 Spring Batch 作业的代码片段。

/**
 * Spring Batch wiring for the {@code extractAndReplacePersonalDataJob} job.
 *
 * <p>The job consists of one chunk-oriented step that reads {@code fileInformation} and
 * {@code transaction} fragments from an XML file, runs them through a composite processor and
 * hands the results to a composite writer.
 *
 * <p>Several step-scoped beans below bind {@code jobExecutionContext['header.*']} values.
 * Those expressions are resolved when the step-scoped bean is instantiated, so the values
 * must already be present in the job execution context at that moment; otherwise they are
 * injected as {@code null}.
 */
@Slf4j
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    PIExtractorItemProcessor pIExtractorItemProcessor;

    @Autowired
    JobBuilderFactory jobBuilderFactory;

    @Autowired
    StepBuilderFactory stepBuilderFactory;

    /** Number of items per chunk/transaction, taken from {@code eugateway.batch.chunk.size}. */
    @Value("${eugateway.batch.chunk.size}")
    private int chunkSize;

    /**
     * Builds the single chunk-oriented step of the job.
     *
     * @param reader             reader producing header and transaction elements
     * @param processor          composite processor turning elements into {@code ProcessorWriterDto}
     * @param writer             composite writer receiving the processed items
     * @param writeListener      listener registered on the step
     * @param stepBuilderFactory factory used to create the step builder
     * @return the configured step named {@code extractAndReplacePersonalDataStep}
     */
    @Bean
    public Step jobStep(ItemStreamReader<CustomHeaderTransactionXmlElement> reader,
            CompositeItemProcessor<CustomHeaderTransactionXmlElement, ProcessorWriterDto> processor,
            CompositeItemWriter<ProcessorWriterDto> writer,
            EdsClientItemWriteListener<ProcessorWriterDto> writeListener,
            StepBuilderFactory stepBuilderFactory) {
        return stepBuilderFactory.get("extractAndReplacePersonalDataStep")
                .<CustomHeaderTransactionXmlElement, ProcessorWriterDto>chunk(chunkSize)
                .reader(reader)
                .processor(processor)
                .listener(writeListener)
                .writer(writer)
                .build();
    }

    /**
     * Builds the job, which runs {@code jobStep} and uses a {@link RunIdIncrementer}.
     *
     * @param jobStep           the step to execute
     * @param jobListener       listener registered on the job
     * @param jobBuilderFactory factory used to create the job builder
     * @return the configured job named {@code extractAndReplacePersonalDataJob}
     */
    @Bean
    public Job extractPersonalDataJob(Step jobStep, JobResultListener jobListener,
            JobBuilderFactory jobBuilderFactory) {
        return jobBuilderFactory.get("extractAndReplacePersonalDataJob")
                .incrementer(new RunIdIncrementer())
                .start(jobStep)
                .listener(jobListener)
                .build();
    }

    /**
     * Creates the StAX reader that emits one item per {@code fileInformation} or
     * {@code transaction} element of the input file.
     *
     * @param path file-system path of the input XML, bound from the {@code file.path} job parameter
     * @return a step-scoped reader unmarshalling fragments with JAXB
     */
    @Bean
    @StepScope
    public ItemStreamReader<CustomHeaderTransactionXmlElement> itemReader(
            // The key must be quoted: unquoted, SpEL parses file.path as a property path
            // instead of looking up the parameter literally named "file.path".
            @Value("#{jobParameters['file.path']}") String path) {
        Jaxb2Marshaller transactionMarshaller = new Jaxb2Marshaller();
        transactionMarshaller.setClassesToBeBound(FileInformation.class, TransactionPositionReport.class);
        log.info("Generating StaxEventItemReader");

        return new StaxEventItemReaderBuilder<CustomHeaderTransactionXmlElement>()
                .name("headerTransaction")
                // FileSystemResource is built directly from the path; it has no
                // constructor that wraps another Resource.
                .resource(new FileSystemResource(path))
                .addFragmentRootElements("fileInformation", "transaction")
                .unmarshaller(transactionMarshaller)
                .build();
    }

    /**
     * Creates the header callback from the {@code header.*} entries of the job execution context.
     *
     * @param sender      value bound from {@code header.sender}
     * @param timestamp   value bound from {@code header.timestamp}
     * @param environment value bound from {@code header.environment}
     * @param version     value bound from {@code header.version}
     * @return a step-scoped header callback carrying the four values
     */
    @Bean
    @StepScope
    OmegaXmlHeaderCallBack getOmegaXmlHeaderCallBack(
            @Value("#{jobExecutionContext['header.sender']}") String sender,
            @Value("#{jobExecutionContext['header.timestamp']}") String timestamp,
            @Value("#{jobExecutionContext['header.environment']}") String environment,
            @Value("#{jobExecutionContext['header.version']}") String version) {
        return new OmegaXmlHeaderCallBack(sender, timestamp, environment, version);
    }

    /**
     * Creates the footer callback used by the StAX writer.
     *
     * @return a step-scoped footer callback
     */
    @Bean
    @StepScope
    OmegaXmlFooterCallBack getOmegaXmlFooterCallBack() {
        return new OmegaXmlFooterCallBack();
    }

    /**
     * Creates the StAX writer that marshals {@code TransactionPositionReport} items under a
     * {@code reportFile} root element, with the header and footer callbacks attached.
     *
     * @param sender      value bound from {@code header.sender}
     * @param timestamp   value bound from {@code header.timestamp}
     * @param environment value bound from {@code header.environment}
     * @param version     value bound from {@code header.version}
     * @return the step-scoped writer registered as {@code staxTransactionWriter}
     */
    @StepScope
    @Bean(name = "staxTransactionWriter")
    public StaxEventItemWriter<TransactionPositionReport> staxTransactionItemWriter(
            @Value("#{jobExecutionContext['header.sender']}") String sender,
            @Value("#{jobExecutionContext['header.timestamp']}") String timestamp,
            @Value("#{jobExecutionContext['header.environment']}") String environment,
            @Value("#{jobExecutionContext['header.version']}") String version) {
        // NOTE(review): hard-coded, developer-specific output location; presumably this
        // should come from configuration or a job parameter.
        String exportFilePath = "C:\\Users\\sasharma\\Documents\\TO_BE_DELETED\\eugateway\\outputfile.xml";
        Resource exportFileResource = new FileSystemResource(exportFilePath);

        Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
        marshaller.setSupportDtd(true);
        marshaller.setSupportJaxbElementClass(true);
        marshaller.setClassesToBeBound(TransactionPositionReport.class);

        return new StaxEventItemWriterBuilder<TransactionPositionReport>()
                .name("transactionWriter")
                .version("1.0")
                .resource(exportFileResource)
                .marshaller(marshaller)
                .rootTagName("reportFile")
                .headerCallback(getOmegaXmlHeaderCallBack(sender, timestamp, environment, version))
                .footerCallback(getOmegaXmlFooterCallBack())
                .shouldDeleteIfEmpty(true)
                .build();
    }

    /**
     * Creates the first processor of the chain.
     *
     * @return a step-scoped {@code PIExtractorItemProcessor}
     */
    @Bean
    @StepScope
    public PIExtractorItemProcessor extractItemProcessor() {
        log.info("Generating PIExtractorItemProcessor");
        return new PIExtractorItemProcessor();
    }

    /**
     * Creates the second processor of the chain.
     *
     * @return a {@code PIRemoverItemProcessor}
     */
    @Bean
    public PIRemoverItemProcessor removeItemProcessor() {
        log.info("Generating PIRemoverItemProcessor");
        return new PIRemoverItemProcessor();
    }

    /**
     * Chains {@link #extractItemProcessor()} and {@link #removeItemProcessor()}, in that order.
     *
     * @return the step-scoped composite processor
     */
    @Bean
    @StepScope
    CompositeItemProcessor<CustomHeaderTransactionXmlElement, ProcessorWriterDto> extractAndRemoveItemProcessor() {
        log.info("Generating CompositeItemProcessor");
        CompositeItemProcessor<CustomHeaderTransactionXmlElement, ProcessorWriterDto> itemProcessor =
                new CompositeItemProcessor<>();
        itemProcessor.setDelegates(
                (List<? extends ItemProcessor<?, ?>>) Arrays.asList(extractItemProcessor(), removeItemProcessor()));
        return itemProcessor;
    }

    /**
     * Creates the writer that forwards processed items to the EDS client.
     *
     * @return a step-scoped {@code EdsClientItemWriter}
     */
    @Bean
    @StepScope
    public EdsClientItemWriter<ProcessorWriterDto> edsClientItemWriter() {
        log.info("Generating EdsClientItemWriter");
        return new EdsClientItemWriter<>();
    }

    /**
     * Creates the OMEGA XML file writer, which wraps the StAX transaction writer.
     *
     * @param sender      value bound from {@code header.sender}
     * @param timestamp   value bound from {@code header.timestamp}
     * @param environment value bound from {@code header.environment}
     * @param version     value bound from {@code header.version}
     * @return a step-scoped {@code OmegaXmlFileWriter}
     */
    @Bean
    @StepScope
    public OmegaXmlFileWriter<ProcessorWriterDto> omegaXmlFileWriter(
            @Value("#{jobExecutionContext['header.sender']}") String sender,
            @Value("#{jobExecutionContext['header.timestamp']}") String timestamp,
            @Value("#{jobExecutionContext['header.environment']}") String environment,
            @Value("#{jobExecutionContext['header.version']}") String version) {
        log.info("Generating OmegaXmlFileWriter");
        return new OmegaXmlFileWriter(staxTransactionItemWriter(sender, timestamp, environment, version));
    }

    /**
     * Creates the composite writer with the EDS client writer registered first and the
     * OMEGA XML file writer second.
     *
     * @param sender      value bound from {@code header.sender}
     * @param timestamp   value bound from {@code header.timestamp}
     * @param environment value bound from {@code header.environment}
     * @param version     value bound from {@code header.version}
     * @return the step-scoped composite writer used by the step
     */
    @Bean
    @StepScope
    public CompositeItemWriter<ProcessorWriterDto> compositeItemWriter(
            @Value("#{jobExecutionContext['header.sender']}") String sender,
            @Value("#{jobExecutionContext['header.timestamp']}") String timestamp,
            @Value("#{jobExecutionContext['header.environment']}") String environment,
            @Value("#{jobExecutionContext['header.version']}") String version) {
        log.info("Generating CompositeItemWriter");
        CompositeItemWriter<ProcessorWriterDto> compositeItemWriter = new CompositeItemWriter<>();
        compositeItemWriter.setDelegates(Arrays.asList(
                edsClientItemWriter(),
                omegaXmlFileWriter(sender, timestamp, environment, version)));
        return compositeItemWriter;
    }
}

下面是OmegaXmlHeaderCallBack 类。由于没有后期绑定,我总是在标题标签中得到空值。

@Slf4j
public class OmegaXmlHeaderCallBack implements StaxWriterCallback {
    private String sender;
    private String timestamp;
    private String environment;
    private String version;
    
    public OmegaXmlHeaderCallBack(String sender, String timestamp, String environment, String version) {
        super();
        this.sender = sender;
        this.timestamp = timestamp;
        this.environment = environment;
        this.version = version;
    }

    @Override
    public void write(XMLEventWriter writer) {
        XMLEventFactory factory = XMLEventFactory.newInstance();
        try {
            writer.add(factory.createStartElement("", "", "fileInformation"));

            writer.add(factory.createStartElement("", "", "sender"));
            writer.add(factory.createCharacters(sender));
            writer.add(factory.createEndElement("", "", "sender"));


            writer.add(factory.createStartElement("", "", "timestamp"));
            writer.add(factory.createCharacters(timestamp));
            writer.add(factory.createEndElement("", "", "timestamp"));

            writer.add(factory.createStartElement("", "", "environment"));
            writer.add(factory.createCharacters(environment));
            writer.add(factory.createEndElement("", "", "environment"));

            writer.add(factory.createStartElement("", "", "version"));
            writer.add(factory.createCharacters(version));
            writer.add(factory.createEndElement("", "", "version"));
            
            writer.add(factory.createEndElement("", "", "fileInformation"));
            
            writer.add(factory.createStartElement("", "", "record"));
            
        } catch (XMLStreamException e) {
            log.error("Error writing OMEGA XML Header: {}", e.getMessage());
            throw new OmegaXmlHeaderWriterException(e.getMessage());
        }
    }
}

ItemProcessor 的代码如下。我将标头数据设置为ExecutionContext,它旨在由 headerCallback 读取(遗憾的是不会发生)。

/**
 * Processor that handles both element kinds emitted by the reader: a {@code FileInformation}
 * header is recorded in the execution context and filtered out, while a
 * {@code TransactionPositionReport} is turned into a {@code ProcessorWriterDto}.
 */
@Slf4j
public class PIExtractorItemProcessor implements ItemProcessor<CustomHeaderTransactionXmlElement, ProcessorWriterDto> {

    @Autowired
    PersonalDataExtractor personalDataExtractor;

    @Value("#{jobParameters['submission.account']}")
    private String subAccntId;

    @Value("#{stepExecution}")
    private StepExecution stepExecution;

    /**
     * Processes one element read from the input file.
     *
     * @param headerTransactionElement either the file header or a transaction
     * @return {@code null} for a header (the item is filtered out of the chunk), otherwise a
     *         transfer object holding the transaction and its extracted EDS request DTOs
     * @throws Exception if extraction fails
     */
    @Override
    public ProcessorWriterDto process(CustomHeaderTransactionXmlElement headerTransactionElement) throws Exception {
        if (headerTransactionElement instanceof FileInformation) {
            FileInformation header = (FileInformation) headerTransactionElement;
            publishHeader(header);
            log.debug("Header {} found.", header);
            return null;
        }

        TransactionPositionReport transaction = (TransactionPositionReport) headerTransactionElement;
        Object customerTransactionId = transaction.getProcessingDetails().getCustomerTransactionId();
        log.debug("NO header info found for transaction {}", customerTransactionId);
        log.info("Extracting personal data for transaction customer id {} and create EDS requestDto.",
                customerTransactionId);

        ProcessorWriterDto transferObject = new ProcessorWriterDto();
        transferObject.setEdsRequestDtoList(personalDataExtractor.extract(transaction, subAccntId));
        transferObject.setTransaction(transaction);
        return transferObject;
    }

    /**
     * Stores the header fields under the {@code header.*} keys of the JOB execution context.
     *
     * <p>The consumers bind {@code jobExecutionContext['header.*']}, so the values are written
     * to the job-level context rather than the step-level one, which those expressions do
     * not see. NOTE(review): a step-scoped bean created before the header has been processed
     * will still receive {@code null}; extracting the header in an earlier step avoids that.
     */
    private void publishHeader(FileInformation header) {
        putInJobContext("header.sender", header.getSender());
        putInJobContext("header.timestamp", header.getTimestamp());
        putInJobContext("header.environment", header.getEnvironment());
        putInJobContext("header.version", header.getVersion());
    }

    /** Puts a single entry into the execution context of the enclosing job execution. */
    private void putInJobContext(String key, Object value) {
        stepExecution.getJobExecution().getExecutionContext().put(key, value);
    }
}

我参考过的链接:

【问题讨论】:

    标签: java spring-boot xml-parsing spring-batch spring-batch-stream


    【解决方案1】:

    你的步骤做得太多了。我会将事情分为两个步骤:

    • 第一步:提取文件信息头,放入作业执行上下文中
    • 第 2 步:从执行上下文中读取文件信息标头,并在该步骤所需的任何步骤范围的 bean 中使用它(例如,您的案例中的 stax 回调)

    这是一个简单的例子:

    import java.io.IOException;
    import java.io.Serializable;
    import java.util.Map;
    
    import javax.xml.stream.XMLEventWriter;
    
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.JobParameters;
    import org.springframework.batch.core.JobParametersBuilder;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepScope;
    import org.springframework.batch.core.launch.JobLauncher;
    import org.springframework.batch.item.ExecutionContext;
    import org.springframework.batch.item.xml.StaxWriterCallback;
    import org.springframework.batch.repeat.RepeatStatus;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * Minimal two-step job: the first step stores a {@code FileInformation} header in the job
     * execution context, the second step (and any step-scoped bean) reads it back.
     */
    @Configuration
    @EnableBatchProcessing
    public class SO67909123 {

        /** Step 1: reads the {@code file} job parameter and publishes the extracted header. */
        @Bean
        public Step extractHeaderStep(StepBuilderFactory steps) {
            return steps.get("extractHeaderStep")
                    .tasklet((contribution, chunkContext) -> {
                        String inputFile = (String) chunkContext.getStepContext().getJobParameters().get("file");
                        FileInformation header = extractFileInformation(inputFile);
                        chunkContext.getStepContext()
                                .getStepExecution()
                                .getJobExecution()
                                .getExecutionContext()
                                .put("file.information", header);
                        return RepeatStatus.FINISHED;
                    })
                    .build();
        }

        /** Placeholder that returns a fixed header instead of parsing the input file. */
        private FileInformation extractFileInformation(String inputFile) {
            // TODO extract header from inputFile
            FileInformation header = new FileInformation();
            header.sender = "200GH7XKDGO3GLZ60";
            header.version = "001.60";
            return header;
        }

        /** Step 2: fetches the header from the job execution context and prints it. */
        @Bean
        public Step processFile(StepBuilderFactory steps) {
            return steps.get("processFile")
                    // Change this to a chunk-oriented tasklet
                    .tasklet((contribution, chunkContext) -> {
                        Object stored = chunkContext.getStepContext()
                                .getJobExecutionContext()
                                .get("file.information");
                        FileInformation fileInformation = (FileInformation) stored;
                        System.out.println("Step 2: " + fileInformation);
                        return RepeatStatus.FINISHED;
                    })
                    .build();
        }

        /** Step-scoped callback that receives the header through late binding. */
        @Bean
        @StepScope
        public StaxWriterCallback staxWriterCallback(@Value("#{jobExecutionContext['file.information']}") FileInformation fileInformation) {
            return (XMLEventWriter writer) -> {
                // use fileInformation as needed here
            };
        }

        /** Job that runs the header extraction step followed by the processing step. */
        @Bean
        public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
            Step headerStep = extractHeaderStep(steps);
            Step fileStep = processFile(steps);
            return jobs.get("job")
                    .start(headerStep)
                    .next(fileStep)
                    .build();
        }

        /** Boots the context and launches the job with a {@code file} parameter. */
        public static void main(String[] args) throws Exception {
            ApplicationContext context = new AnnotationConfigApplicationContext(SO67909123.class);
            JobParameters launchParameters = new JobParametersBuilder()
                    .addString("file", "transactions.xml")
                    .toJobParameters();
            context.getBean(JobLauncher.class).run(context.getBean(Job.class), launchParameters);
        }

        /** Serializable holder for the header values kept in the execution context. */
        static class FileInformation implements Serializable {
            private String sender;
            private String version;
            // other fields

            @Override
            public String toString() {
                return String.format("FileInformation{sender='%s', version='%s'}", sender, version);
            }
        }

    }
    

    这个例子展示了这个想法。您只需要编写从文件中提取 XML 标签的代码片段(仅标题部分,请参阅 TODO)。该示例中的 StaxWriterCallback 是一个步骤作用域(step-scoped)的 bean,可以使用执行上下文中的标头。步骤 2 中的其他步骤作用域组件(处理器、侦听器等)也可以用相同方式配置。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2011-05-11
      • 2018-04-22
      • 2020-10-09
      • 2017-03-25
      • 1970-01-01
      • 2021-11-10
      • 1970-01-01
      • 2015-06-21
      相关资源
      最近更新 更多