【发布时间】:2023-10-12 00:54:01
【问题描述】:
我正在尝试将我的应用切换为使用组合管道。我希望简化线程管理,但却让自己陷入了 Combine 的意外行为。
我假设尽管我订阅了 DispatchQueue.global(),但取消主管道会取消嵌套订阅。
这是我的游乐场:
import Cocoa
import Combine
let folders = ["folder1", "folder2", "folder3", "folder4"]
class OneByOnePublisher: Publisher {
typealias Output = String
typealias Failure = Never
let input: [String]
init(input: [String]) {
self.input = input
}
func receive<Downstream: Subscriber>(subscriber: Downstream) where Downstream.Input == Output, Downstream.Failure == Failure {
let subject = PassthroughSubject<String, Never>()
subject.receive(subscriber: subscriber)
for value in input {
subject.send(value)
}
subject.send(completion: .finished)
}
}
func uppercase(_ character: Character) -> String {
print("Uppercasing \(character)")
Thread.sleep(forTimeInterval: 0.5)
return character.uppercased()
}
func uppercasePublisher(_ folder: String) -> AnyPublisher<String, Never> {
return folder.publisher
// .handleEvents(receiveCancel: { print("Received cancel in nested") })
.map{uppercase($0)}
.collect()
.map{$0.joined()}
.eraseToAnyPublisher()
}
let stringPublisher = PassthroughSubject<String, Never>()
let oneByOnePublisher = OneByOnePublisher(input: folders)
let cancelable = oneByOnePublisher
.subscribe(on: DispatchQueue.global())
.handleEvents(receiveCancel: { print("Received cancel in main") })
.flatMap{uppercasePublisher($0)}
.receive(on: DispatchQueue.main)
.sink { (completion) in
print("Received completion: \(completion)")
} receiveValue: { (value) in
print("Received value: \(value)")
}
Thread.sleep(forTimeInterval: 2)
cancelable.cancel()
Thread.sleep(forTimeInterval: 2)
print("Done")
那个输出
Uppercasing f
Uppercasing o
Uppercasing l
Uppercasing d
Received cancel in main
Uppercasing e
Uppercasing r
Uppercasing 1
Done
但是,如果我取消注释该行
// .handleEvents(receiveCancel: { print("Received cancel in nested") })
那么输出就是我一开始所期望的
Uppercasing f
Uppercasing o
Uppercasing l
Uppercasing d
Received cancel in nested
Received cancel in main
Done
我错过了什么?为什么在第一种情况下嵌套订阅不会立即取消?为什么添加 handleEvents() 会改变取消流程?
【问题讨论】:
-
“立即”在这里变得混乱,因为您正在订阅并发队列。
-
不相关,但您实际上不需要创建
OneByOnePublisher发布者...只需执行folders.publisher即可获得相同的结果。此外,您不需要uppercasePublisher- 您可以通过简单的.map { uppercase($0) }来实现这一点 -
是的,我创建 OneByOnePublisher 只是作为示例。它与实际设计非常相似,它有一个嵌套的发布者,并且所有内容都在全局队列中运行。
-
对于即时,我可以不立即停止,但它仍然应该在中途取消嵌套发布者。由于某种原因,这没有发生,嵌套发布者一直在发送数据。
-
那为什么我有handleEvents()时它会及时到达?从调试输出中我可以看到,在后一种情况下,嵌套发布者停止发出值。