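To use aggregation in a job and still take advantage of everything Spring Batch offers, you have to create a custom ItemReader.
By extending AbstractPaginatedDataItemReader, we can reuse everything it provides for paginated reads.
Here is a simple custom class: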
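import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.mongodb.BasicDBObject;
import com.mongodb.util.JSON;
import org.springframework.batch.item.data.AbstractPaginatedDataItemReader;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.MongoOperations;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

import static org.springframework.data.mongodb.core.aggregation.Aggregation.limit;
import static org.springframework.data.mongodb.core.aggregation.Aggregation.newAggregation;
import static org.springframework.data.mongodb.core.aggregation.Aggregation.newAggregationOptions;
import static org.springframework.data.mongodb.core.aggregation.Aggregation.skip;

public class CustomAggreagationPaginatedItemReader<T> extends AbstractPaginatedDataItemReader<T> implements InitializingBean {

    // Not used by doPageRead(); kept from the MongoItemReader template this class is based on.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\?(\\d+)");

    private MongoOperations template;
    private Class<? extends T> type;
    private Sort sort;
    private String collection;

    public CustomAggreagationPaginatedItemReader() {
        super();
        setName(ClassUtils.getShortName(CustomAggreagationPaginatedItemReader.class));
    }

    public void setTemplate(MongoOperations template) {
        this.template = template;
    }

    public void setTargetType(Class<? extends T> type) {
        this.type = type;
    }

    public void setSort(Map<String, Sort.Direction> sorts) {
        this.sort = convertToSort(sorts);
    }

    public void setCollection(String collection) {
        this.collection = collection;
    }

    @Override
    @SuppressWarnings("unchecked")
    protected Iterator<T> doPageRead() {
        // Number of documents to skip for the current page (page and pageSize come from the superclass).
        long offset = (long) page * pageSize;

        // Return the results through a cursor, 100 documents per batch.
        BasicDBObject cursor = new BasicDBObject();
        cursor.append("batchSize", 100);

        Aggregation aggregation = newAggregation(
                // Include all your aggregation operations here,
                Aggregation.sort(sort), // apply the configured sort so that pages do not overlap
                skip(offset),
                limit(pageSize)
        ).withOptions(newAggregationOptions().cursor(cursor).build());

        return (Iterator<T>) template.aggregate(aggregation, collection, type).iterator();
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        Assert.state(template != null, "An implementation of MongoOperations is required.");
        Assert.state(type != null, "A type to convert the input into is required.");
        Assert.state(collection != null, "A collection is required.");
        Assert.state(sort != null, "A sort is required.");
    }

    // Replaces ?0, ?1, ... in a JSON query string with the serialized parameter values (unused here).
    private String replacePlaceholders(String input, List<Object> values) {
        Matcher matcher = PLACEHOLDER.matcher(input);
        String result = input;
        while (matcher.find()) {
            String group = matcher.group();
            int index = Integer.parseInt(matcher.group(1));
            result = result.replace(group, getParameterWithIndex(values, index));
        }
        return result;
    }

    private String getParameterWithIndex(List<Object> values, int index) {
        return JSON.serialize(values.get(index));
    }

    // Converts the field-to-direction map into a Spring Data Sort.
    private Sort convertToSort(Map<String, Sort.Direction> sorts) {
        List<Sort.Order> sortValues = new ArrayList<>();
        for (Map.Entry<String, Sort.Direction> curSort : sorts.entrySet()) {
            sortValues.add(new Sort.Order(curSort.getValue(), curSort.getKey()));
        }
        return new Sort(sortValues);
    }
}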
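To illustrate the paging arithmetic in doPageRead, here is a minimal sketch in plain Java with no Spring or MongoDB involved. The class PagedPipelineSketch and its readPage method are hypothetical names, and an in-memory list stands in for the collection; the stream stages only mimic sorting followed by the skip and limit stages of the pipeline.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for the sort -> skip -> limit stages; this is not Spring code.
final class PagedPipelineSketch {

    // Returns one page of the sorted input: skip page * pageSize items, then take pageSize items.
    static List<Integer> readPage(List<Integer> source, int page, int pageSize) {
        long offset = (long) page * pageSize; // same arithmetic as the skip stage
        return source.stream()
                .sorted(Comparator.naturalOrder()) // a fixed order keeps pages from overlapping
                .skip(offset)
                .limit(pageSize)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> ids = Arrays.asList(5, 3, 1, 4, 2);
        // Read page after page until an empty page signals the end of the data.
        for (int page = 0; ; page++) {
            List<Integer> items = readPage(ids, page, 2);
            if (items.isEmpty()) {
                break;
            }
            System.out.println("page " + page + ": " + items);
        }
    }
}
```

With five ids and a page size of 2, the loop reads three pages and stops on the fourth, empty one. Without a fixed order before the skip, the same item could appear on two pages or on none.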
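If you look closely, you will see that it is modelled on MongoItemReader from the Spring framework; you can find that class at org.springframework.batch.item.data.MongoItemReader. You have to create a completely new class extending AbstractPaginatedDataItemReader because, if you look at MongoItemReader's "doPageRead" method, you will see that it only uses the find operation of MongoTemplate, so there is no way to use an aggregate operation in it.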
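For context, doPageRead is called once per page by the paginated base class. The following is a simplified sketch of that loop as I understand it, not the actual Spring Batch source. PagingLoopSketch, PagedReader and InMemoryReader are made-up names, and a plain list replaces MongoDB.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Simplified, hypothetical model of a paginated reader; this is not the Spring Batch source.
final class PagingLoopSketch {

    abstract static class PagedReader<T> {
        protected int page = 0;
        protected int pageSize = 2;
        private Iterator<T> results;

        // Subclasses fetch a single page here, as the custom reader does in doPageRead().
        protected abstract Iterator<T> doPageRead();

        // Returns the next item, or null once a freshly fetched page turns out to be empty.
        T read() {
            if (results == null || !results.hasNext()) {
                results = doPageRead();
                page++;
                if (results == null || !results.hasNext()) {
                    return null;
                }
            }
            return results.next();
        }
    }

    static final class InMemoryReader extends PagedReader<String> {
        private final List<String> data;

        InMemoryReader(List<String> data) {
            this.data = data;
        }

        @Override
        protected Iterator<String> doPageRead() {
            // Equivalent of skip(page * pageSize) and limit(pageSize) on a list.
            int from = Math.min(page * pageSize, data.size());
            int to = Math.min(from + pageSize, data.size());
            return data.subList(from, to).iterator();
        }
    }

    public static void main(String[] args) {
        InMemoryReader reader = new InMemoryReader(Arrays.asList("a", "b", "c"));
        String item;
        while ((item = reader.read()) != null) {
            System.out.println(item);
        }
    }
}
```

The point of the sketch is that only the page fetch needs to change: swapping a find query for an aggregation inside doPageRead leaves the surrounding read loop untouched.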
Here is how our custom reader should be used:
@Bean
public ItemReader<YourDataClass> reader(MongoTemplate mongoTemplate) {
    CustomAggreagationPaginatedItemReader<YourDataClass> customAggreagationPaginatedItemReader = new CustomAggreagationPaginatedItemReader<>();
    Map<String, Direction> sort = new HashMap<>();
    sort.put("id", Direction.ASC);
    customAggreagationPaginatedItemReader.setTemplate(mongoTemplate);
    customAggreagationPaginatedItemReader.setCollection("collectionName");
    customAggreagationPaginatedItemReader.setTargetType(YourDataClass.class);
    customAggreagationPaginatedItemReader.setSort(sort);
    return customAggreagationPaginatedItemReader;
}
As you may have noticed, you also need a MongoTemplate instance, which should look like this:
@Bean
public MongoTemplate mongoTemplate(MongoDbFactory mongoDbFactory) {
    return new MongoTemplate(mongoDbFactory);
}
The MongoDbFactory is an object that the Spring framework autowires for you.
I hope this is enough to help you.