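【Question title】: How to introduce a delay within an RxJava group processed in parallel?
【Posted】: 2025-12-08 16:00:01
【Question description】:

My use case is to group a stream, start processing some of the groups in parallel, and, within each group, delay the processing of each item by a constant interval. I can't seem to get the delay within a group right: the items are emitted almost instantly rather than at regular intervals. Here is my test code, using RxJava 2.0.5: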

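import io.reactivex.Flowable;
import io.reactivex.functions.Function;
import io.reactivex.parallel.ParallelFlowable;
import io.reactivex.schedulers.Schedulers;
import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.Collections.singletonList;
import static java.util.concurrent.TimeUnit.SECONDS;

@Slf4j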
public class GroupsAndDelays {
    Function<Integer, Flowable<Integer>> remoteClient;
    int groupMemberDelaySeconds;
    int remoteCallTimeoutSeconds;
    int maxRetryCount;
    int retryDelaySeconds;
    Map<Long, List<Integer>> threadMap;
    Map<Long, List<Integer>> resultThreadMap;

    public ParallelFlowable<Integer> doStuff(Flowable<Integer> src,
                                             Function<Integer, Integer> groupByFn,
                                             Function<Integer, Flowable<Integer>> responseMapper) {
        return src
                .groupBy(groupByFn)
                .parallel(5).runOn(Schedulers.newThread())
                .map(g -> g.distinct().toList())
                .flatMap(i -> i.toFlowable())
                .flatMap(i -> {
                    log.debug("Processing group: {}.", i);
                    return Flowable.fromIterable(i)
                            .delay(groupMemberDelaySeconds, SECONDS);
                })
                .flatMap(i -> {
                    log.debug("Processing: {}.", i);
                    putInThreadMap(threadMap, i);
                    return remoteCall(i * 2, responseMapper);
                });
    }

    private Flowable<Integer> remoteCall(int i, Function<Integer, Flowable<Integer>> responseMapper) throws
            Exception {
        return remoteClient.apply(i)
                .timeout(remoteCallTimeoutSeconds, SECONDS)
                .retryWhen(t -> t.zipWith(Flowable.range(1, maxRetryCount), (ex, retryCount) -> retryCount)
                        .flatMap(retryCount -> Flowable.timer(retryCount * retryDelaySeconds, SECONDS)))
                .flatMap(result -> {
                    log.debug("Processing result: {}.", result);
                    putInThreadMap(resultThreadMap, result);
                    return responseMapper.apply(result);
                })
                .onErrorReturnItem(-1);
    }

    private void putInThreadMap(Map<Long, List<Integer>> map, int i) {
        map.merge(Thread.currentThread().getId(), singletonList(i), this::merge);
    }

    private List<Integer> merge(List<Integer> a, List<Integer> b) {
        return Stream.concat(a.stream(), b.stream()).collect(Collectors.toList());
    }
}

Here is a Spock test:

class GroupsAndDelaysSpec extends Specification {
    final int groupMemberDelaySeconds = 3
    final int remoteCallTimeoutSeconds = 3
    final int maxRetryCount = 2
    final int retryDelaySeconds = 2
    Function<Integer, Flowable<Integer>> remoteClient
    Function<Integer, Integer> groupByFn
    Function<Integer, Flowable<Integer>> responseMapper

    GroupsAndDelays groupsAndDelays

    final Flowable<Integer> src = Flowable.fromArray(
            1, 2, 3, 4, 5, 1, 2, 3, 4, 5,
            11, 12, 13, 14, 15, 11, 12, 13, 14, 15,
            21, 22, 23, 24, 25, 21, 22, 23, 24, 25,
            31, 32, 33, 34, 35, 31, 32, 33, 34, 35,
            41, 42, 43, 44, 45, 41, 42, 43, 44, 45
    )

    def setup() {
        remoteClient = Mock(Function)

        groupsAndDelays = new GroupsAndDelays()
        groupsAndDelays.groupMemberDelaySeconds = groupMemberDelaySeconds
        groupsAndDelays.remoteCallTimeoutSeconds = remoteCallTimeoutSeconds
        groupsAndDelays.maxRetryCount = maxRetryCount
        groupsAndDelays.retryDelaySeconds = retryDelaySeconds
        groupsAndDelays.remoteClient = remoteClient
        groupsAndDelays.threadMap = new ConcurrentHashMap<Long, List<Integer>>()
        groupsAndDelays.resultThreadMap = new ConcurrentHashMap<Long, List<Integer>>()

        groupByFn = Mock(Function)
        groupByFn.apply(_) >> { args -> args[0] % 10 }

        responseMapper = Mock(Function)
        responseMapper.apply(_) >> { args -> args[0] }
    }

    def cleanup() {
        println("Thread map: ${groupsAndDelays.threadMap}")
        println("Result thread map: ${groupsAndDelays.resultThreadMap}")

        assert groupsAndDelays.threadMap.size() == 5
        assert groupsAndDelays.threadMap.findAll { k, v -> v.size() == 5 }.size() == 5
    }

    def "each group executes on a separate thread"() {
        setup:
        remoteClient.apply(_) >> { args -> Flowable.just(args[0]) }

        when:
        groupsAndDelays.doStuff(src, groupByFn, responseMapper)
                .sequential()
                .toList()
                .blockingGet()

        then:
        true
    }
}

Sample run:

2017-02-04 00:49:19.430 [RxNewThreadScheduler-3] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$1 - Processing group: [3, 13, 23, 33, 43].
2017-02-04 00:49:19.430 [RxNewThreadScheduler-1] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$1 - Processing group: [1, 11, 21, 31, 41].
2017-02-04 00:49:19.430 [RxNewThreadScheduler-5] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$1 - Processing group: [5, 15, 25, 35, 45].
2017-02-04 00:49:19.430 [RxNewThreadScheduler-2] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$1 - Processing group: [2, 12, 22, 32, 42].
2017-02-04 00:49:19.430 [RxNewThreadScheduler-4] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$1 - Processing group: [4, 14, 24, 34, 44].
2017-02-04 00:49:22.443 [RxComputationThreadPool-2] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 2.
2017-02-04 00:49:22.443 [RxComputationThreadPool-1] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 1.
2017-02-04 00:49:22.443 [RxComputationThreadPool-5] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 5.
2017-02-04 00:49:22.443 [RxComputationThreadPool-4] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 4.
2017-02-04 00:49:22.443 [RxComputationThreadPool-3] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 3.
2017-02-04 00:49:22.456 [RxComputationThreadPool-5] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 10.
2017-02-04 00:49:22.456 [RxComputationThreadPool-1] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 2.
2017-02-04 00:49:22.456 [RxComputationThreadPool-4] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 8.
2017-02-04 00:49:22.456 [RxComputationThreadPool-3] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 6.
2017-02-04 00:49:22.456 [RxComputationThreadPool-2] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 4.
2017-02-04 00:49:22.459 [RxComputationThreadPool-3] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 13.
2017-02-04 00:49:22.459 [RxComputationThreadPool-4] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 14.
2017-02-04 00:49:22.459 [RxComputationThreadPool-1] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 11.
2017-02-04 00:49:22.459 [RxComputationThreadPool-5] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 15.
2017-02-04 00:49:22.459 [RxComputationThreadPool-2] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 12.
2017-02-04 00:49:22.466 [RxComputationThreadPool-3] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 26.
2017-02-04 00:49:22.466 [RxComputationThreadPool-5] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 30.
2017-02-04 00:49:22.466 [RxComputationThreadPool-2] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 24.
2017-02-04 00:49:22.466 [RxComputationThreadPool-1] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 22.
2017-02-04 00:49:22.466 [RxComputationThreadPool-4] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 28.
2017-02-04 00:49:22.466 [RxComputationThreadPool-3] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 23.
2017-02-04 00:49:22.467 [RxComputationThreadPool-5] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 25.
2017-02-04 00:49:22.467 [RxComputationThreadPool-2] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 22.
2017-02-04 00:49:22.467 [RxComputationThreadPool-1] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 21.
2017-02-04 00:49:22.467 [RxComputationThreadPool-4] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 24.
2017-02-04 00:49:22.467 [RxComputationThreadPool-3] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 46.
2017-02-04 00:49:22.467 [RxComputationThreadPool-5] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 50.
2017-02-04 00:49:22.467 [RxComputationThreadPool-2] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 44.
2017-02-04 00:49:22.468 [RxComputationThreadPool-1] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 42.
2017-02-04 00:49:22.468 [RxComputationThreadPool-4] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 48.
2017-02-04 00:49:22.468 [RxComputationThreadPool-3] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 33.
2017-02-04 00:49:22.468 [RxComputationThreadPool-5] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 35.
2017-02-04 00:49:22.468 [RxComputationThreadPool-2] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 32.
2017-02-04 00:49:22.468 [RxComputationThreadPool-1] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 31.
2017-02-04 00:49:22.468 [RxComputationThreadPool-4] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 34.
2017-02-04 00:49:22.469 [RxComputationThreadPool-3] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 66.
2017-02-04 00:49:22.469 [RxComputationThreadPool-1] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 62.
2017-02-04 00:49:22.469 [RxComputationThreadPool-4] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 68.
2017-02-04 00:49:22.469 [RxComputationThreadPool-2] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 64.
2017-02-04 00:49:22.469 [RxComputationThreadPool-5] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 70.
2017-02-04 00:49:22.470 [RxComputationThreadPool-3] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 43.
2017-02-04 00:49:22.470 [RxComputationThreadPool-4] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 44.
2017-02-04 00:49:22.470 [RxComputationThreadPool-1] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 41.
2017-02-04 00:49:22.470 [RxComputationThreadPool-2] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 42.
2017-02-04 00:49:22.470 [RxComputationThreadPool-5] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 45.
2017-02-04 00:49:22.470 [RxComputationThreadPool-3] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 86.
2017-02-04 00:49:22.470 [RxComputationThreadPool-4] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 88.
2017-02-04 00:49:22.470 [RxComputationThreadPool-1] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 82.
2017-02-04 00:49:22.470 [RxComputationThreadPool-2] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 84.
2017-02-04 00:49:22.470 [RxComputationThreadPool-5] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 90.
Thread map: [20:[3, 13, 23, 33, 43], 21:[2, 12, 22, 32, 42], 22:[5, 15, 25, 35, 45], 23:[4, 14, 24, 34, 44], 24:[1, 11, 21, 31, 41]]
Result thread map: [20:[6, 26, 46, 66, 86], 21:[4, 24, 44, 64, 84], 22:[10, 30, 50, 70, 90], 23:[8, 28, 48, 68, 88], 24:[2, 22, 42, 62, 82]]

Process finished with exit code 0

Edit

Bonus points if you can also show how to do this in Project Reactor.

Edit 2: A solution using Project Reactor is here

【Question discussion】:

    Tags: rx-java reactive-programming rx-java2


    【Solution 1】:

    The RxJava 2 Extensions library contains the spanout operator.

    Replace delay() with:

    compose(FlowableTransformers.spanout(
        groupMemberDelaySeconds, groupMemberDelaySeconds, SECONDS))
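
To make the intended effect concrete, here is a library-free sketch of the timeline a gap-enforcing operator would produce. It does not call RxJava or the extensions library. It assumes, based on the operator's description, that spanout releases the first item after the initial delay and each later item no sooner than the between-delay after the previous one. The class and method names (SpacedTimeline, spaced) are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SpacedTimeline {
    // Hypothetical model, not library code: computes when each item would be
    // released if a minimum gap is enforced between consecutive emissions.
    public static List<Long> spaced(List<Long> arrivalSeconds, long initialDelay, long betweenDelay) {
        List<Long> out = new ArrayList<>();
        long previous = 0L;
        for (int k = 0; k < arrivalSeconds.size(); k++) {
            long arrival = arrivalSeconds.get(k);
            // First item waits for the initial delay; later items wait until
            // at least betweenDelay has passed since the previous emission.
            long emit = (k == 0) ? arrival + initialDelay : Math.max(arrival, previous + betweenDelay);
            out.add(emit);
            previous = emit;
        }
        return out;
    }

    public static void main(String[] args) {
        // Five items of one group, all available at time 0.
        System.out.println(spaced(Arrays.asList(0L, 0L, 0L, 0L, 0L), 3, 3)); // [3, 6, 9, 12, 15]
        // Items that arrive late are still kept at least 3 seconds apart.
        System.out.println(spaced(Arrays.asList(0L, 10L, 10L), 3, 3));       // [3, 10, 13]
    }
}
```

Under this assumption, the five members of a group would be handed to the remote call 3 seconds apart instead of all at once.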
    

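    【Discussion】:

    • Why do you say that, David? I'm also trying out Project Reactor (see the edit).
    • Who do you think did most of the work on RxJava 2, and in fact a significant part of Reactor 3?
    • Believe me, I understand the sentiment; been there, done that. I'm not diminishing your contributions to these projects. But I'd like to see more people interested in maintaining the code, rather than one person becoming the bottleneck. Didn't RxJava 1 development slow down after Ben Christensen left?
    • RxJava had no development bottleneck, only a management one before I took the lead last June.
    • I think you're looking for the Flux#delayElements operator in Reactor? (3.0.5.BUILD-SNAPSHOT; in 3.0.4 it was simply called delay)
    【Solution 2】:

    I assume you want to insert a delay between the emissions of the iterable returned in this flatMap: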

    .flatMap(i -> {
        log.debug("Processing group: {}.", i);
        return Flowable.fromIterable(i)
                .delay(groupMemberDelaySeconds, SECONDS);
    })
    

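    In that case, you have misunderstood the delay operator. It merely shifts every emission by the specified amount of time, so items that arrive together are still emitted together. To insert a delay between successive emissions, you can zip the source with an interval observable: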

    .flatMap(i -> {
        log.debug("Processing group: {}.", i);
        return Flowable.fromIterable(i)
                .zipWith(Flowable.interval(groupMemberDelaySeconds, SECONDS), (item, time) -> item);
    })
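
The difference between the two operators can be seen in a small timeline model. The sketch below is plain Java and does not use RxJava; it only computes the second at which each item would be emitted, assuming the interval fires its first tick after one period. The names EmissionTimeline, delayed and zippedWithInterval are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class EmissionTimeline {
    // Model of delay(d): every item is shifted by the same amount, so items
    // that arrive together still leave together.
    public static List<Long> delayed(List<Long> arrivalSeconds, long delaySeconds) {
        List<Long> out = new ArrayList<>();
        for (long t : arrivalSeconds) {
            out.add(t + delaySeconds);
        }
        return out;
    }

    // Model of zipWith(interval(d)): item k is paired with tick k, which fires
    // at (k + 1) * d, so the pair is emitted at the later of the two times.
    public static List<Long> zippedWithInterval(List<Long> arrivalSeconds, long periodSeconds) {
        List<Long> out = new ArrayList<>();
        for (int k = 0; k < arrivalSeconds.size(); k++) {
            long tick = (k + 1) * periodSeconds;
            out.add(Math.max(arrivalSeconds.get(k), tick));
        }
        return out;
    }

    public static void main(String[] args) {
        // Five items of one group, all available at time 0 (as with fromIterable).
        List<Long> group = Arrays.asList(0L, 0L, 0L, 0L, 0L);
        System.out.println(delayed(group, 3));            // [3, 3, 3, 3, 3]
        System.out.println(zippedWithInterval(group, 3)); // [3, 6, 9, 12, 15]
    }
}
```

The first line matches the sample run in the question, where every member of a group is processed at the same instant 3 seconds after the group is formed.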
    

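    However, you need to understand that this approach only works if you can be sure that your observable always produces items more frequently than the specified interval. Otherwise, emissions from the interval may end up being buffered, which means that once the source catches up, the next several items will be emitted instantaneously, depending on how many events from the interval observable have been buffered. There are, of course, ways to work around this, but this approach is much simpler, and since you are using an Iterable you can be sure (within reason) that it will not happen.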
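
The caveat can be illustrated with a small model of a source that is slower than the interval. This is a plain-Java sketch rather than RxJava code, and it ignores backpressure limits; the names SlowSourceZip, zipTimes and gaps are hypothetical. Each zipped item is emitted at the later of its arrival time and its tick time, so ticks that fired while the source was stalled are consumed immediately.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SlowSourceZip {
    // Emission time of item k when zipped with ticks at period, 2 * period, ...
    public static List<Long> zipTimes(List<Long> arrivalSeconds, long periodSeconds) {
        List<Long> out = new ArrayList<>();
        for (int k = 0; k < arrivalSeconds.size(); k++) {
            long tick = (k + 1) * periodSeconds;
            out.add(Math.max(arrivalSeconds.get(k), tick));
        }
        return out;
    }

    // Time elapsed between each pair of consecutive emissions.
    public static List<Long> gaps(List<Long> times) {
        List<Long> out = new ArrayList<>();
        for (int k = 1; k < times.size(); k++) {
            out.add(times.get(k) - times.get(k - 1));
        }
        return out;
    }

    public static void main(String[] args) {
        // The source stalls for 10 seconds, then delivers four items at once.
        // Ticks at 3, 6 and 9 seconds have already fired by then.
        List<Long> times = zipTimes(Arrays.asList(10L, 10L, 10L, 10L), 3);
        System.out.println(times);       // [10, 10, 10, 12]
        System.out.println(gaps(times)); // [0, 0, 2]
    }
}
```

In this model three items leave in a burst and the fourth follows only 2 seconds later, so the 3-second spacing is lost until the buffered ticks are used up.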

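    【Discussion】:

    • You may be right; I've amended my comment. I have moved to Project Reactor instead of RxJava 2 and haven't been able to apply your suggestion in my code. I'll reply after trying it. I've added an edit to my question asking how to do this in Project Reactor.
    【Solution 3】:

    You can try the code below. The key is to use zipWith in combination with interval to ensure that each item is emitted at a specific time.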

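    public static void main(String[] args) {
        Observable<Integer> o1 = Observable.range(1, 3);
        System.out.println("Without delay");
        o1.subscribe(v -> System.out.println(v));

        System.out.println("With delay");
        // Pair each item with an interval tick so items are emitted one second apart.
        o1.zipWith(Observable.interval(1, TimeUnit.SECONDS), (a, b) -> a).subscribe(a -> System.out.println(a));
        // Keep the main thread alive long enough to see the delayed items.
        // toBlocking() is RxJava 1 API; in RxJava 2, use blockingSubscribe() instead.
        Observable.timer(5, TimeUnit.SECONDS).toBlocking().subscribe();
    }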
    

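    【Discussion】:

    • Where are the groupBy and the parallel processing of its groups?
    • Well, this is not a complete solution but a sample implementation that introduces the delay you need. Build on it to tailor a complete solution to your problem.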