【问题标题】:How to use Aggregation Query with MongoItemReader in spring batch如何在春季批处理中将聚合查询与 MongoItemReader 一起使用
【发布时间】:2018-02-02 05:56:45
【问题描述】:

需求发生了一些变化,我必须在 setQuery() 中使用基本查询的聚合查询。这甚至可能吗? 请建议我该怎么做?我的聚合查询已准备就绪,但不确定如何在春季批处理中使用它

public ItemReader<ProfileCollection> searchMongoItemReader() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {

        MongoItemReader<MyCollection> mongoItemReader = new MongoItemReader<>();
        mongoItemReader.setTemplate(myMongoTemplate);
        mongoItemReader.setCollection(myMongoCollection);

        mongoItemReader.setQuery(" Some Simple Query - Basic");

        mongoItemReader.setTargetType(MyCollection.class);
        Map<String, Sort.Direction> sort = new HashMap<>();
        sort.put("field4", Sort.Direction.ASC);
        mongoItemReader.setSort(sort);
        return mongoItemReader;

    }

【问题讨论】:

    标签: java mongodb spring-batch


    【解决方案1】:

    为了能够在作业中使用聚合,利用 Spring Batch 的所有功能,您必须创建一个自定义 ItemReader。 扩展 AbstractPaginatedDateItemReader 我们可以使用来自可分页操作的所有元素。 这是一个简单的自定义类:

    public class CustomAggreagationPaginatedItemReader<T> extends AbstractPaginatedDataItemReader<T> implements InitializingBean {
    
        private static final Pattern PLACEHOLDER = Pattern.compile("\\?(\\d+)");
        private MongoOperations template;
        private Class<? extends T> type;
        private Sort sort;
        private String collection;
    
        public CustomAggreagationPaginatedItemReader() {
            super();
            setName(ClassUtils.getShortName(CustomAggreagationPaginatedItemReader.class));
        }
    
        public void setTemplate(MongoOperations template) {
            this.template = template;
        }
    
        public void setTargetType(Class<? extends T> type) {
            this.type = type;
        }
    
        public void setSort(Map<String, Sort.Direction> sorts) {
            this.sort = convertToSort(sorts);
        }
    
        public void setCollection(String collection) {
            this.collection = collection;
        }
    
        @Override
        @SuppressWarnings("unchecked")
        protected Iterator<T> doPageRead() {
            Pageable pageRequest = new PageRequest(page, pageSize, sort);
    
            BasicDBObject cursor = new BasicDBObject();
            cursor.append("batchSize", 100);
    
            SkipOperation skipOperation = skip(Long.valueOf(pageRequest.getPageNumber()) * Long.valueOf(pageRequest.getPageSize()));
    
            Aggregation aggregation = newAggregation(
                    //Include here all your aggreationOperations,
                    skipOperation,
                    limit(pageRequest.getPageSize())
                ).withOptions(newAggregationOptions().cursor(cursor).build());
    
            return (Iterator<T>) template.aggregate(aggregation, collection, type).iterator();
        }
    
        @Override
        public void afterPropertiesSet() throws Exception {
            Assert.state(template != null, "An implementation of MongoOperations is required.");
            Assert.state(type != null, "A type to convert the input into is required.");
            Assert.state(collection != null, "A collection is required.");
        }
    
        private String replacePlaceholders(String input, List<Object> values) {
            Matcher matcher = PLACEHOLDER.matcher(input);
            String result = input;
    
            while (matcher.find()) {
                String group = matcher.group();
                int index = Integer.parseInt(matcher.group(1));
                result = result.replace(group, getParameterWithIndex(values, index));
            }
    
            return result;
        }
    
        private String getParameterWithIndex(List<Object> values, int index) {
            return JSON.serialize(values.get(index));
        }
    
        private Sort convertToSort(Map<String, Sort.Direction> sorts) {
            List<Sort.Order> sortValues = new ArrayList<Sort.Order>();
    
            for (Map.Entry<String, Sort.Direction> curSort : sorts.entrySet()) {
                sortValues.add(new Sort.Order(curSort.getValue(), curSort.getKey()));
            }
    
            return new Sort(sortValues);
        }
    }
    

    如果您仔细观察,您会发现它是使用 Spring 框架中的 MongoItemReader 创建的,您可以在 org.springframework.batch.item.data.MongoItemReader 中看到该类,您必须创建一个全新的类扩展AbstractPaginatedDataItemReader,如果你看一下“doPageRead”方法你应该可以看到它只使用了MongoTemplate的find操作,因此无法在其中使用Aggregate操作。

    这是我们的CustomReader应该如何使用它:

    @Bean
    public ItemReader<YourDataClass> reader(MongoTemplate mongoTemplate) {
        CustomAggreagationPaginatedItemReader<YourDataClass> customAggreagationPaginatedItemReader = new CustomAggreagationPaginatedItemReader<>();
    
        Map<String, Direction> sort = new HashMap<String, Direction>();
        sort.put("id", Direction.ASC);
    
        customAggreagationPaginatedItemReader.setTemplate(mongoTemplate);
        customAggreagationPaginatedItemReader.setCollection("collectionName");
        customAggreagationPaginatedItemReader.setTargetType(YourDataClass.class);
        customAggreagationPaginatedItemReader.setSort(sort);
    
        return customAggreagationPaginatedItemReader;
    }
    

    您可能注意到,您还需要一个 MongoTemplate 实例,它也应该是这样的:

    @Bean
    public MongoTemplate mongoTemplate(MongoDbFactory mongoDbFactory) {
        return new MongoTemplate(mongoDbFactory);
    }
    

    MongoDbFactory 是 spring 框架自动装配的对象。

    希望这足以帮助您。

    【讨论】:

      【解决方案2】:

      扩展 MongoItemReader 并为方法 doPageRead() 提供您自己的实现。这样,您将获得完整的分页支持,并且阅读文档将成为一个步骤的一部分。

      public class CustomMongoItemReader<T, O> extends MongoItemReader<T> {
      private MongoTemplate template;
      private Class<? extends T> inputType;
      private Class<O> outputType
      private MatchOperation match;
      private ProjectionOperation projection;
      private String collection;
      
      @Override
      protected Iterator<T> doPageRead() {
          Pageable page = PageRequest.of(page, pageSize) //page and page size are coming from the class that MongoItemReader extends
          Aggregation agg = newAggregation(match, projection, skip(page.getPageNumber() * page.getPageSize()), limit(page.getPageSize()));
          return (Iterator<T>) template.aggregate(agg, collection, outputType).iterator();
      
      }
      }
      

      还有其他getter和setter等方法。只需查看 MongoItemReader here 的源代码即可。 我还从中删除了 Query 支持。你也可以用同样的方法从 MongoItemReader 复制粘贴它。与排序相同。

      在你有读者的课堂上,你会做这样的事情:

      public MongoItemReader<T> reader() {
          CustomMongoItemReader reader = new CustomMongoItemReader();
          reader.setTemplate(mongoTemplate);
          reader.setName("abc");
          reader.setTargetType(input.class);
          reader.setOutputType(output.class);
          reader.setCollection(myMongoCollection);
          reader.setMatch(Aggregation.match(new Criteria()....)));
          reader.setProjection(Aggregation.project("..","..");
          return reader;
      }
      

      【讨论】:

      • 我尝试过聚合,下面是我尝试过的代码。 public class CustomeMongoItemReader&lt;T, O&gt; extends MongoItemReader&lt;T&gt; { private MongoTemplate template; private Class&lt;? extends T&gt; inputType; private Class&lt;O&gt; outputType; private MatchOperation match; private ProjectionOperation projection; private String collection; private Aggregation aggregation; @Override protected Iterator&lt;T&gt; doPageRead() { return (Iterator&lt;T&gt;) template.aggregate(aggregation, collection, outputType).iterator(); } 失败,结果为空
      • 我在 mongoOperations() 方法中这样做CustomeMongoItemReader reader=new CustomeMongoItemReader(); reader.setTemplate(mongoOperations()); 我正在返回 MongoTemplate 但是在自定义阅读器模板中执行时设置为 null 而不是 MongoTemplate 对象你能帮我吗跨度>
      • 我发现问题MongoTemplate对象设置为MongoItemReader setTemplate()方法而不是CustomItemReader setTemplate()方法设置
      • 我终于实现了这个,但是我的CustomMongoItemReader 没有维护项目的状态,它也在重复读取已处理的项目。我们如何解决这种情况?
      • 我不知道。我以前没有遇到过这种情况。
      猜你喜欢
      • 2014-12-15
      • 2018-05-27
      • 2015-05-13
      • 2017-06-25
      • 2017-10-06
      • 2020-03-21
      • 2018-06-15
      • 2021-06-21
      • 1970-01-01
      相关资源
      最近更新 更多