【问题标题】:Write data to Kafka in spring batch春季批量向Kafka写入数据
【发布时间】:2020-01-25 06:24:25
【问题描述】:

刚接触 Kafka,最近我正在尝试从 Spring Batch 中获取数据,然后写入 Kafka,但我不知道该怎么做。 有人可以帮我弄清楚如何将数据写入卡夫卡吗? 下面是我用 SpringBatch 编写的获取数据的演示代码:

@配置 公共类 FileReader {

@Autowired
private JobBuilderFactory jobBuilderFactory;

@Autowired
private StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier("flatFileWriter")
private ItemWriter<? super Demo1> flatFileWriter;
@Bean
public Job FileReaderJob() {
    return jobBuilderFactory.get("FileReaderJob").start(FileReaderStep()).build();
}
private Step FileReaderStep() {
    // TODO Auto-generated method stub
    return stepBuilderFactory.get("FileReaderStep").<Demo1,Demo1>chunk(100).reader(flatFileReader())
            .writer(flatFileWriter).build();
}

@Bean
@StepScope
public FlatFileItemReader<Demo1> flatFileReader() {
    // TODO Auto-generated method stub

    FlatFileItemReader<Demo1> reader = new FlatFileItemReader<Demo1>();
    reader.setResource(new  ClassPathResource("Demo1.csv"));
    reader.setLinesToSkip(1);
    DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
    tokenizer.setNames(new String [] {"id","first","last"});
    DefaultLineMapper<Demo1> mapper = new DefaultLineMapper<>();
    mapper.setLineTokenizer(tokenizer);
    mapper.setFieldSetMapper(new FieldSetMapper<Demo1>() {

        @Override
        public Demo1 mapFieldSet(FieldSet fieldSet) throws BindException {
            Demo1 demo1 = new Demo1();
            demo1.setId(fieldSet.readLong("id"));
            demo1.setFirst(fieldSet.readString("first"));
            demo1.setLast(fieldSet.readString("last"));

            // TODO Auto-generated method stub
            return demo1;
        }
    });

    mapper.afterPropertiesSet();
    reader.setLineMapper(mapper);

    return reader;
}

}

【问题讨论】:

    标签: spring spring-boot apache-kafka spring-batch spring-kafka


    【解决方案1】:

    即将推出的 Spring Batch v4.2 GA 将支持读取/写入数据到 Apache Kafka 主题。您已经可以使用4.2.0.RC1 release 进行尝试了。

    对于KafkaItemWriter,您需要配置KafkaTemplate。下面是作者的一个例子:

    @Bean
    public KafkaItemWriter<String, Demo1> kafkaItemWriter(KafkaTemplate<String, Demo1> kafkaTemplate) {
        return new KafkaItemWriterBuilder<String, Demo1>()
                .kafkaTemplate(kafkaTemplate)
                .build();
    }
    

    您还可以查看 Josh Long 撰写的关于 Spring Batch 中 Kafka 支持的 Spring Tips installment

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-03-26
      • 2018-02-03
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2011-01-21
      • 1970-01-01
      相关资源
      最近更新 更多