【Question Title】: Spring Batch - AmqpWriter and AmqpReader example
【Posted】: 2021-01-16 13:43:17
【Question Description】:

I need a solution that writes data to RabbitMQ with AmqpItemWriter and reads it back from RabbitMQ with AmqpItemReader. We are not looking for Apache Kafka; we simply want to send some records (say, customer details) and consume them.

Writer code

JobConfig.java

@Configuration
public class JobConfig {
    
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    
    @Bean
    public ConnectionFactory connectionFactory() {
        return new CachingConnectionFactory("localhost");
    }
    
    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(connectionFactory());
    }

    @Bean
    public Queue myQueue() {
       return new Queue("myqueue");
    }
    

    @Bean
    public FlatFileItemReader<Customer> customerItemReader() {
        FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
        reader.setLinesToSkip(1);
        reader.setResource(new ClassPathResource("/data/customer.csv"));

        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[] { "id", "firstName", "lastName", "birthdate" });

        DefaultLineMapper<Customer> customerLineMapper = new DefaultLineMapper<>();
        customerLineMapper.setLineTokenizer(tokenizer);
        customerLineMapper.setFieldSetMapper(new CustomerFieldSetMapper());
        customerLineMapper.afterPropertiesSet();

        reader.setLineMapper(customerLineMapper);

        return reader;
    }
        
    @Bean
    public AmqpItemWriter<Customer> amqpWriter(){
        AmqpItemWriter<Customer> amqpItemWriter = new AmqpItemWriter<>(this.rabbitTemplate());
        return amqpItemWriter;
    }
        
    @Bean
    public Step step1() throws Exception {
        return stepBuilderFactory.get("step1")
                .<Customer, Customer>chunk(10)
                .reader(customerItemReader())
                .writer(amqpWriter())
                .build();
    }
    
    @Bean
    public Job job() throws Exception {
        return jobBuilderFactory.get("job")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .build();
    }
}

CustomerFieldSetMapper.java

public class CustomerFieldSetMapper implements FieldSetMapper<Customer> {
    
    @Override
    public Customer mapFieldSet(FieldSet fieldSet) throws BindException {
        return Customer.builder()
                .id(fieldSet.readLong("id"))
                .firstName(fieldSet.readRawString("firstName"))
                .lastName(fieldSet.readRawString("lastName"))
                .birthdate(fieldSet.readRawString("birthdate"))
                .build();
    }
}

Customer.java

@AllArgsConstructor
@NoArgsConstructor
@Builder
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class Customer implements Serializable {
    private static final long serialVersionUID = 1L;
    private Long id;
    private String firstName;
    private String lastName;
    private String birthdate;
}

SpringBatchAmqpApplication.java

@EnableBatchProcessing
@SpringBootApplication
@EnableBinding(Source.class)
public class SpringBatchAmqpApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBatchAmqpApplication.class, args);
    }
}

Reader code

JobConfiguration.java

@Configuration
public class JobConfiguration {
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Bean
    public ConnectionFactory connectionFactory() {
        return new CachingConnectionFactory("localhost");
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setMessageConverter(jsonMessageConverter());
        return factory; 
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setDefaultReceiveQueue("myqueue");
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }

    @Bean
    public Queue myQueue() {
        return new Queue("myqueue");
    }

    @Bean
    public ItemReader<Customer> customerReader(){
        return new AmqpItemReader<>(this.rabbitTemplate());
    }

    @Bean
    public ItemWriter<Customer> customerItemWriter(){
        return items -> {
            for(Customer c : items) {
                System.out.println(c.toString());
            }
        };
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<Customer, Customer> chunk(10)
                .reader(customerReader())
                .writer(customerItemWriter())
                .listener(customerStepListener())
                .build();
    }

    @Bean
    public Job job() {
        return jobBuilderFactory.get("job")
                .start(step1())
                .build();
    }

    @Bean
    public CustomerStepListener customerStepListener() {
        return new CustomerStepListener();
    }
}

CustomerStepListener.java

public class CustomerStepListener implements StepExecutionListener {

    @Override
    public void beforeStep(StepExecution stepExecution) {
        System.out.println("==");
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        System.out.println("READ COUNT = "+stepExecution);
        return ExitStatus.COMPLETED;
    }
}

Logs

2021-01-18 18:41:05.023 INFO 25532 --- [main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step1]
==
2021-01-18 18:41:05.031 INFO 25532 --- [main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: localhost:5672
2021-01-18 18:41:05.072 INFO 25532 --- [main] o.s.a.r.c.CachingConnectionFactory : Created new connection: connectionFactory#20a14b55:0/SimpleConnection@4650a407 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 55797]
READ COUNT = StepExecution: id=1, version=2, name=step1, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=
2021-01-18 18:41:05.097 INFO 25532 --- [main] o.s.batch.core.step.AbstractStep : Step: [step1] executed in 73ms
2021-01-18 18:41:05.099 INFO 25532 --- [main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=job]] completed with the following parameters: [{-spring.output.ansi.enabled=always}] and the following status: [COMPLETED] in 87ms

【Question Comments】:

  • "no data is going to Queue.": Are there any errors or warnings in the logs? After the job completes, can you check the readCount/writeCount of the step execution?

Tags: spring spring-batch spring-amqp


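【Solution 1】:

On the "Writer code" side, you are using an AmqpItemWriter configured with a RabbitTemplate. By default, messages are sent to the nameless exchange. Here is an excerpt from the Javadoc:

Messages will be sent to the nameless exchange if not specified on the provided AmqpTemplate.

In your writer configuration, there is no "link" between the rabbit template and your queue. So you need to configure the rabbit template to send messages to your queue: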

@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
    // With the nameless (default) exchange, the routing key is the name of the target queue.
    rabbitTemplate.setRoutingKey(myQueue().getName());
    return rabbitTemplate;
}

This is similar to what you did on the reader side with rabbitTemplate.setDefaultReceiveQueue("myqueue");.
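
To make the routing behaviour described above more concrete, here is a minimal sketch in plain Java. It is a toy model of a nameless (default) exchange, not the RabbitMQ client or Spring AMQP API: the class DefaultExchangeModel and its methods declareQueue, publish and depth are hypothetical names introduced only for illustration. It assumes that every declared queue is reachable through the default exchange under a routing key equal to its own name, and that a message whose routing key matches no queue is dropped, which would explain why the queue stayed empty and the reader reported readCount=0.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Toy stand-in for a broker's nameless (default) exchange. Hypothetical, for illustration only. */
class DefaultExchangeModel {
    // Each declared queue is implicitly bound under a routing key equal to its name.
    private final Map<String, Deque<String>> queues = new HashMap<>();

    void declareQueue(String name) {
        queues.putIfAbsent(name, new ArrayDeque<>());
    }

    /** Returns true if the message reached a queue, false if it was dropped as unroutable. */
    boolean publish(String routingKey, String body) {
        Deque<String> target = queues.get(routingKey);
        if (target == null) {
            return false; // no queue with that name, so nothing receives the message
        }
        target.addLast(body);
        return true;
    }

    /** Number of messages currently waiting in the named queue (0 if it does not exist). */
    int depth(String queueName) {
        Deque<String> queue = queues.get(queueName);
        return queue == null ? 0 : queue.size();
    }
}

class DefaultExchangeDemo {
    public static void main(String[] args) {
        DefaultExchangeModel broker = new DefaultExchangeModel();
        broker.declareQueue("myqueue");

        // Template without a routing key: modelled here as an empty routing key.
        System.out.println("routed without key: " + broker.publish("", "customer-1"));

        // Template with routing key set to the queue name, as in the fix above.
        System.out.println("routed with key:    " + broker.publish("myqueue", "customer-1"));
        System.out.println("queue depth:        " + broker.depth("myqueue"));
    }
}
```

In this model the only difference between the two publish calls is the routing key, which mirrors the single setRoutingKey line added to the writer's RabbitTemplate.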

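【Comments】:

  • The writer works now, but the reader gives an error - org.springframework.amqp.support.converter.MessageConversionException: failed to resolve class name. Class not found [com.example.demo.model.Customer]
  • Make sure the Customer class that is serialized on the writing side is also present on the reading side, in the same package, so that it can be deserialized correctly. If your jobs are packaged separately, you need to put the class in both jars, or extract it into a separate jar and add that jar as a dependency to both projects.
  • Great pointer, it solved my error. +1 for you!
  • Glad it helped!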
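
The class-not-found error from the comments can be illustrated with a small plain-Java sketch. It assumes, as the error message suggests, that the reading side resolves the payload type from a fully qualified class name that travels with the message. TypeIdResolver and resolve are hypothetical names for this illustration and are not the Spring AMQP implementation; the lookup simply uses Class.forName against the current classpath.

```java
import java.util.Optional;

/** Simplified model of resolving a message's type id to a class. Hypothetical, for illustration only. */
class TypeIdResolver {
    /** Looks the fully qualified name up on the current classpath; empty if the class is absent. */
    static Optional<Class<?>> resolve(String typeId) {
        try {
            Class<?> type = Class.forName(typeId);
            return Optional.of(type);
        } catch (ClassNotFoundException e) {
            // The name was sent by the writer, but this JVM has no class under that exact name.
            return Optional.empty();
        }
    }
}

class TypeIdResolverDemo {
    public static void main(String[] args) {
        // Present on every classpath, so it resolves.
        System.out.println(TypeIdResolver.resolve("java.lang.String").isPresent());

        // Resolves only if the reader project contains this class in this exact package.
        System.out.println(TypeIdResolver.resolve("com.example.demo.model.Customer").isPresent());
    }
}
```

Because the lookup is by fully qualified name, a Customer class placed in a different package on the reading side would not match, which is why the comment recommends the same package or a shared jar.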