【问题标题】:How to limit flatMap concurrency in Combine still having all source events processed?如何在仍然处理所有源事件的组合中限制 flatMap 并发?
【发布时间】:2021-04-09 05:01:35
【问题描述】:

如果我指定 maxPublishers 参数,那么第一个 maxPublishers 事件之后的源事件将不会被平面映射。虽然我只想限制并发。也就是在一些第一批 maxPublishers 平面地图发布者完成后继续处理下一个事件。

Publishers.Merge(
    addImageRequestSubject
        .flatMap(maxPublishers: .max(3)) { self.compressImage($0) }
        .compactMap { $0 }
        .flatMap(maxPublishers: .max(3)) { self.addImage($0) },
    addVideoRequestSubject
        .flatMap(maxPublishers: .max(3)) { self.addVideo(url: $0) }
).sink(receiveCompletion: { _ in }, receiveValue: {})
.store(in: &cancelBag)

我还尝试在 OperationQueue 的帮助下限制并发。但是maxConcurrentOperationCount好像没有效果。

Publishers.Merge(
    addImageRequestSubject
        .receive(on: imageCompressionQueue)
        .flatMap { self.compressImage($0) }
        .compactMap { $0 }
        .receive(on: mediaAddingQueue)
        .flatMap { self.addImage($0) },
    addVideoRequestSubject
        .receive(on: mediaAddingQueue)
        .flatMap { self.addVideo(url: $0) }
).sink(receiveCompletion: { _ in }, receiveValue: {})
.store(in: &cancelBag)

private lazy var imageCompressionQueue: OperationQueue = {
    var queue = OperationQueue()
    queue.maxConcurrentOperationCount = 3

    return queue
}()

private lazy var mediaAddingQueue: OperationQueue = {
    var queue = OperationQueue()
    queue.maxConcurrentOperationCount = 3

    return queue
}()

平面地图发布者是这样看的:

func compressImage(_ image: UIImage) -> Future<Data?, Never> {
    Future { promise in
        DispatchQueue.global().async {
            let result = image.compressTo(15)?.jpegData(compressionQuality: 1)
            promise(Result.success(result))
        }
    }
}

【问题讨论】:

  • @matt 感谢您的回复。我试图推迟我的未来,但仍然只处理了 3 张图像。你的意思是我以不适当的方式切换线程并且应该使用receiveOn?
  • @matt 关于 maxPublishers,那些在 flatMap 操作符加载指定的最大发布者数量时发送的源事件将被忽略。 flatMap 操作完成后,我可以发送新的源事件,它们将被成功处理。这是我能观察到的。但我需要处理所有源事件以满足对 flatMap 运算符的需求。
  • 好吧,也许你需要添加一个缓冲区?
  • @matt 你是对的,缓冲区是我错过的!要添加答案吗?

标签: swift combine


【解决方案1】:

您已经非常漂亮地偶然发现了.buffer 运算符的用例。其目的是通过累积本来会被丢弃的值来补偿.flatMap 背压。

我将通过一个完全人为的例子来说明:

class ViewController: UIViewController {
    let sub = PassthroughSubject<Int,Never>()
    var storage = Set<AnyCancellable>()
    var timer : Timer!
    override func viewDidLoad() {
        super.viewDidLoad()
        sub
            .flatMap(maxPublishers:.max(3)) { i in
                return Just(i)
                    .delay(for: 3, scheduler: DispatchQueue.main)
                    .eraseToAnyPublisher()
            }
            .sink { print($0) }
            .store(in: &storage)
        
        var count = 0
        self.timer = Timer.scheduledTimer(withTimeInterval: 1, repeats: true) { 
            _ in
            count += 1
            self.sub.send(count)
        }
    }
}

所以,我们的发布者每秒发出一个递增的整数,但我们的 flatMap.max(3) 并且需要 3 秒才能重新发布一个值。结果是我们开始错过值:

1
2
3
5
6
7
9
10
11
...

解决办法是在flatMap前面放一个缓冲区。它需要足够大以保存任何丢失的值足够长的时间以便请求它们:

        sub
            .buffer(size: 20, prefetch: .keepFull, whenFull: .dropOldest)
            .flatMap(maxPublishers:.max(3)) { i in

结果是所有数值实际上都到达了sink。当然,在现实生活中,如果缓冲区不够大,无法补偿来自发布者的价值排放率与来自背压flatMap 的价值排放率之间的差异,我们可能仍然会丢失价值。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2011-09-01
    • 1970-01-01
    • 2019-02-26
    • 2018-09-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多