【问题标题】:Skipping Exception in ItemProcessor leads to quadratic behavior在 ItemProcessor 中跳过异常会导致二次行为
【发布时间】:2021-07-18 16:28:55
【问题描述】:

有没有办法改变 ItemProcessor 的跳过行为,使其表现得像 ItemWriter?在 ItemProcessor 中抛出可跳过的异常会导致重新处理所有已接受的项目,从而导致其复杂性呈二次方的行为。有没有办法改变行为以对所有项目进行回滚并逐个处理元素,就像 ItemWriter 一样?

工作定义:

@Bean
public Job job() {
    return jobBuilderFactory.get("job").start(step(null)).build();
}

@Bean
public Step step(ReaderProcessorWriter readerProcessorWriter) {
    return stepBuilderFactory.get("step")
            .<Integer, Integer>chunk(20).faultTolerant()
            .skip(RuntimeException.class).skipLimit(10)
            .reader(readerProcessorWriter)
            .processor(readerProcessorWriter)
            .writer(readerProcessorWriter)
            .build();
}

读取器、处理器和写入器:

@Component
public static class ReaderProcessorWriter extends ListItemReader<Integer> implements ItemProcessor<Integer, Integer>, ItemWriter<Integer> {
    private int run;

    public ReaderProcessorWriter() {
        super(List.of(0, 1, 2, 3, 4, 5, 6, 7,8 ,9));
    }

    @Override
    public Integer process(Integer integer) {
        // Probably some long computation involving lots of DB Reading, lasting minutes at worst
        if (integer >= 5) {
            System.out.println("FAIL: " + integer);
            throw new RuntimeException("Hue hue");
        }
        System.out.println("OK: " + integer);
        return integer;
    }

    @Override
    public void write(List<? extends Integer> list) {
        if (run++ == 0) {
            throw new RuntimeException("Writer");
        }
        System.out.println(list);
    }
}

输出:

OK: 0
OK: 1
OK: 2
OK: 3
OK: 4
FAIL: 5
OK: 0
OK: 1
OK: 2
OK: 3
OK: 4
FAIL: 6
OK: 0
OK: 1
OK: 2
OK: 3
OK: 4
FAIL: 7
OK: 0
OK: 1
OK: 2
OK: 3
OK: 4
FAIL: 8
OK: 0
OK: 1
OK: 2
OK: 3
OK: 4
FAIL: 9
OK: 0
OK: 1
OK: 2
OK: 3
OK: 4
OK: 0
[0]
OK: 1
[1]
OK: 2
[2]
OK: 3
[3]
OK: 4
[4]

在示例中,项目 0-4 在成功写入之前被处理了 6 次(假设 Writer 不会抛出)。

  • 5 次,因为处理器上的回滚
  • 1 表示处理成功

如果 ItemProcessor 使用与 ItemWriter 相同的跳过策略,那么它们只会被处理 2 次:

  • 1 次处理失败
  • 1 次单独交易

【问题讨论】:

  • 也许使用 RetryTemplate?
  • 也许使用 RetryTemplate?见:baeldung.com/spring-retry
  • 重试模板仅适用于瞬态异常,例如网络故障、并发修改或乐观锁定。如果异常是由格式错误的数据引起的,那么 RetryTemplate 会导致 Step Failure 而不是 Skipping element。
  • 我的意思是用它来管理一些外部状态对象以防出现异常并将该对象用作水印来恢复处理?我在互联网上找不到其他任何东西,也许是因为批处理并不意味着在失败的情况下恢复,我认为您将不得不实施一种解决方法
  • 是的。这就是 Skip 机制的原因。有一个简单的解决方案 - 只需将所有业务逻辑移至 ItemWriter,但这不是最干净的方法。使用内部对象的方法失败了,因为事务已回滚并且从未发生写入并且它与可重新启动性混淆。

标签: java spring spring-batch


【解决方案1】:

您可以尝试将容错处理器配置为非事务性的。然后将处理成功的item的处理结果缓存起来,这样在回滚和重试再次处理chunk的过程中,它只是从缓存中获取处理结果,而不是重新处理成功的item。

@Bean
public Step step(ReaderProcessorWriter readerProcessorWriter) {
    return stepBuilderFactory.get("step")
            .<Integer, Integer>chunk(20).faultTolerant()
            .skip(RuntimeException.class).skipLimit(10)
            .reader(readerProcessorWriter)
            .processor(readerProcessorWriter)
            .processorNonTransactional()
            .writer(readerProcessorWriter)
            .build();
}

您必须查看它是否适合您的用例。当使用 JPA 处理项目时,我觉得在回滚的情况下更安全,所有在回滚事务中加载和处理的实体都应该被丢弃,最好不要在另一个新事务中重用它们,因为实体将在新交易对我来说似乎让事情变得更加复杂。

如果重新处理一个项目需要相当长的时间,我会尝试看看处理一个项目中的瓶颈操作是否可以微调,例如使用缓存来缓存那些扩展操作的结果。

【讨论】:

  • 当所有实体在回滚时都被丢弃时,我也感觉更安全,但我正在寻找一种方法来使用与 ItemWriter 相同的跳过行为 - 在跳过时,所有实体都被丢弃并且每个实体都被丢弃其中的一部分在单独的事务中重新处理。
  • 我认为您可以在使用构建器构建步骤时尝试自定义completionPolicy(甚至是新的chunkOperations?),以便在发生跳过的情况下,它将更改@ 987654324@ 到 1。类似于现有的SimpleCompletionPolicy,但如果发生跳过,其大小将更新为 1。您可以查看RepeatListener 是否有助于让您在发生跳过时接收事件,以便您可以更新chunkSize
  • 但作为具有高性能批处理作业的关键点之一是尝试批处理多个项目以在事务中处理和提交。因此,使每个事务只处理一个项目会减慢您的批处理作业.我仍然会选择使每个项目的处理速度非常快(例如使用缓存等)的选项,这样即使 spring-batch 会在这种情况下重新处理项目,由于这种重新处理它的影响是非常大的与批处理多个项目以在同一事务中提交/写入时的性能增益相比,可以忽略不计。
  • 是的,但它仍然比为除失败元素之外的所有元素重做事务要快。当您有 100 个元素并且最后三个元素格式错误时,前 97 个元素的处理将发生 4 次(几乎 400 次处理调用,而只有 200 次)。当最后 10 次失败时,您有约 800 次处理调用,相比之下,当最后 20 次失败时,您有约 1700 次处理调用而不是 200 次。
  • 好吧,你必须测量它。也许你是对的,但我说的是在重新处理项目的情况下,它可以微调到水平,就像调用本地 java 方法一样连接到任何外部系统(如果可能的话,即毫秒的一位数)并根据我的经验并以您的最后一个示例为例,在 80 次交易中处理 80 项(即每笔交易 1 项)+ 200 process() 调用 VS 处理 80 项在 1 笔交易中 + 1700 个process() 电话。那么后者还是快很多。但它适用于我的情况,你必须根据你的用例来衡量它。
【解决方案2】:

此问题的一个可能解决方法是以下包装器,它将抛出的异常推迟到 ItemWriter,它将失败的块拆分为单个事务。我仍然想知道是否有针对此问题的标准 Spring Batch 解决方案:

public static class Either<T> {
    private final T t;
    private final Exception e;

    public Either(T t, Exception e) {
        this.t = t;
        this.e = e;
    }

    public T get() throws Exception {
        if (e != null) {
            throw e;
        }
        return t;
    }
}

public static class EitherItemProcessor<Input, Output> implements ItemProcessor<Input, Either<Output>> {
    private final ItemProcessor<Input, Output> wrapped;

    public EitherItemProcessor(ItemProcessor<Input, Output> wrapped) {
        this.wrapped = wrapped;
    }

    @Override
    public Either<Output> process(Input input) throws Exception {
        try {
            return new Either<>(wrapped.process(input), null);
        } catch (Exception e) {
            return new Either<>(null, e);
        }
    }
}


public static class EitherItemWriter<Output> implements ItemWriter<Either<Output>> {
    private final ItemWriter<Output> itemWriter;

    public EitherItemWriter(ItemWriter<Output> itemWriter) {
        this.itemWriter = itemWriter;
    }

    @Override
    public void write(List<? extends Either<Output>> list) throws Exception {
        List<Output> outputs = new ArrayList<>(list.size());
        for (Either<Output> e : list) {
            outputs.add(e.get());
        }
        itemWriter.write(outputs);
    }
}

【讨论】:

    猜你喜欢
    • 2013-06-03
    • 2012-05-11
    • 2010-09-18
    • 2022-01-12
    • 1970-01-01
    • 2018-02-11
    • 1970-01-01
    • 2017-03-10
    • 1970-01-01
    相关资源
    最近更新 更多