【发布时间】:2021-04-09 05:01:35
【问题描述】:
如果我指定 maxPublishers 参数,那么第一个 maxPublishers 事件之后的源事件将不会被平面映射。虽然我只想限制并发。也就是在一些第一批 maxPublishers 平面地图发布者完成后继续处理下一个事件。
Publishers.Merge(
addImageRequestSubject
.flatMap(maxPublishers: .max(3)) { self.compressImage($0) }
.compactMap { $0 }
.flatMap(maxPublishers: .max(3)) { self.addImage($0) },
addVideoRequestSubject
.flatMap(maxPublishers: .max(3)) { self.addVideo(url: $0) }
).sink(receiveCompletion: { _ in }, receiveValue: {})
.store(in: &cancelBag)
我还尝试在 OperationQueue 的帮助下限制并发。但是maxConcurrentOperationCount好像没有效果。
Publishers.Merge(
addImageRequestSubject
.receive(on: imageCompressionQueue)
.flatMap { self.compressImage($0) }
.compactMap { $0 }
.receive(on: mediaAddingQueue)
.flatMap { self.addImage($0) },
addVideoRequestSubject
.receive(on: mediaAddingQueue)
.flatMap { self.addVideo(url: $0) }
).sink(receiveCompletion: { _ in }, receiveValue: {})
.store(in: &cancelBag)
private lazy var imageCompressionQueue: OperationQueue = {
var queue = OperationQueue()
queue.maxConcurrentOperationCount = 3
return queue
}()
private lazy var mediaAddingQueue: OperationQueue = {
var queue = OperationQueue()
queue.maxConcurrentOperationCount = 3
return queue
}()
平面地图发布者是这样看的:
func compressImage(_ image: UIImage) -> Future<Data?, Never> {
Future { promise in
DispatchQueue.global().async {
let result = image.compressTo(15)?.jpegData(compressionQuality: 1)
promise(Result.success(result))
}
}
}
【问题讨论】:
-
@matt 感谢您的回复。我试图推迟我的未来,但仍然只处理了 3 张图像。你的意思是我以不适当的方式切换线程并且应该使用receiveOn?
-
@matt 关于 maxPublishers,那些在 flatMap 操作符加载指定的最大发布者数量时发送的源事件将被忽略。 flatMap 操作完成后,我可以发送新的源事件,它们将被成功处理。这是我能观察到的。但我需要处理所有源事件以满足对 flatMap 运算符的需求。
-
好吧,也许你需要添加一个缓冲区?
-
@matt 你是对的,缓冲区是我错过的!要添加答案吗?