【发布时间】:2020-01-25 06:24:25
【问题描述】:
刚接触 Kafka。最近我正在尝试用 Spring Batch 读取数据,然后将其写入 Kafka,但不知道该怎么做。有人可以帮我弄清楚如何将数据写入 Kafka 吗?下面是我用 Spring Batch 编写的读取数据的演示代码:
@配置 公共类 FileReader {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
@Qualifier("flatFileWriter")
private ItemWriter<? super Demo1> flatFileWriter;
@Bean
public Job FileReaderJob() {
return jobBuilderFactory.get("FileReaderJob").start(FileReaderStep()).build();
}
private Step FileReaderStep() {
// TODO Auto-generated method stub
return stepBuilderFactory.get("FileReaderStep").<Demo1,Demo1>chunk(100).reader(flatFileReader())
.writer(flatFileWriter).build();
}
@Bean
@StepScope
public FlatFileItemReader<Demo1> flatFileReader() {
// TODO Auto-generated method stub
FlatFileItemReader<Demo1> reader = new FlatFileItemReader<Demo1>();
reader.setResource(new ClassPathResource("Demo1.csv"));
reader.setLinesToSkip(1);
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
tokenizer.setNames(new String [] {"id","first","last"});
DefaultLineMapper<Demo1> mapper = new DefaultLineMapper<>();
mapper.setLineTokenizer(tokenizer);
mapper.setFieldSetMapper(new FieldSetMapper<Demo1>() {
@Override
public Demo1 mapFieldSet(FieldSet fieldSet) throws BindException {
Demo1 demo1 = new Demo1();
demo1.setId(fieldSet.readLong("id"));
demo1.setFirst(fieldSet.readString("first"));
demo1.setLast(fieldSet.readString("last"));
// TODO Auto-generated method stub
return demo1;
}
});
mapper.afterPropertiesSet();
reader.setLineMapper(mapper);
return reader;
}
}
【问题讨论】:
标签: spring spring-boot apache-kafka spring-batch spring-kafka