【问题标题】:Cancel a nested Combine publisher取消嵌套的组合发布者
【发布时间】:2023-10-12 00:54:01
【问题描述】:

我正在尝试将我的应用切换为使用组合管道。我希望简化线程管理,但却让自己陷入了 Combine 的意外行为。

我假设尽管我订阅了 DispatchQueue.global(),但取消主管道会取消嵌套订阅。

这是我的游乐场:

import Cocoa
import Combine

let folders = ["folder1", "folder2", "folder3", "folder4"]

class OneByOnePublisher: Publisher {
    typealias Output = String
    typealias Failure = Never
    
    let input: [String]
    init(input: [String]) {
        self.input = input
    }
  
    func receive<Downstream: Subscriber>(subscriber: Downstream) where Downstream.Input == Output, Downstream.Failure == Failure {
        let subject = PassthroughSubject<String, Never>()
        subject.receive(subscriber: subscriber)
        for value in input {
            subject.send(value)
        }
        subject.send(completion: .finished)
    }
}

func uppercase(_ character: Character) -> String {
    print("Uppercasing \(character)")
    Thread.sleep(forTimeInterval: 0.5)
    return character.uppercased()
}

func uppercasePublisher(_ folder: String) -> AnyPublisher<String, Never> {
    return folder.publisher
//        .handleEvents(receiveCancel: { print("Received cancel in nested") })
        .map{uppercase($0)}
        .collect()
        .map{$0.joined()}
        .eraseToAnyPublisher()
}


let stringPublisher = PassthroughSubject<String, Never>()
let oneByOnePublisher = OneByOnePublisher(input: folders)


let cancelable = oneByOnePublisher
    .subscribe(on: DispatchQueue.global())
    .handleEvents(receiveCancel: { print("Received cancel in main") })
    .flatMap{uppercasePublisher($0)}
    .receive(on: DispatchQueue.main)
    .sink { (completion) in
        print("Received completion: \(completion)")
    } receiveValue: { (value) in
        print("Received value: \(value)")
    }

Thread.sleep(forTimeInterval: 2)
cancelable.cancel()
Thread.sleep(forTimeInterval: 2)
print("Done")

那个输出

Uppercasing f
Uppercasing o
Uppercasing l
Uppercasing d
Received cancel in main
Uppercasing e
Uppercasing r
Uppercasing 1
Done

但是,如果我取消注释该行

//        .handleEvents(receiveCancel: { print("Received cancel in nested") })

那么输出就是我一开始所期望的

Uppercasing f
Uppercasing o
Uppercasing l
Uppercasing d
Received cancel in nested
Received cancel in main
Done

我错过了什么?为什么在第一种情况下嵌套订阅不会立即取消?为什么添加 handleEvents() 会改变取消流程?

【问题讨论】:

  • “立即”在这里变得混乱,因为您正在订阅并发队列。
  • 不相关,但您实际上不需要创建OneByOnePublisher 发布者...只需执行folders.publisher 即可获得相同的结果。此外,您不需要uppercasePublisher - 您可以通过简单的.map { uppercase($0) } 来实现这一点
  • 是的,我创建 OneByOnePublisher 只是作为示例。它与实际设计非常相似,它有一个嵌套的发布者,并且所有内容都在全局队列中运行。
  • 对于即时,我可以不立即停止,但它仍然应该在中途取消嵌套发布者。由于某种原因,这没有发生,嵌套发布者一直在发送数据。
  • 那为什么我有handleEvents()时它会及时到达?从调试输出中我可以看到,在后一种情况下,嵌套发布者停止发出值。

标签: swift combine


【解决方案1】:

Publishers.Sequence 发布者可能存在错误,这是由folder.publisher 调用产生的错误。添加handleEvents 调用会隐藏此错误,因为调用会导致包装发布者似乎可以正确处理取消。

为了检验这个理论,让我们调整您的OneByOnePublisher 以适用于任何类型的序列(这也将使其适用于原始字符串数组):

class OneByOnePublisher<Seq: Sequence>: Publisher {
    typealias Output = Seq.Element
    typealias Failure = Never
    
    let input: Seq
    init(input: Seq) {
        self.input = input
    }
  
    func receive<Downstream: Subscriber>(subscriber: Downstream) where Downstream.Input == Output, Downstream.Failure == Failure {
        let subject = PassthroughSubject<Output, Failure>()
        subject.receive(subscriber: subscriber)
        for value in input {
            subject.send(value)
        }
        subject.send(completion: .finished)
    }
}

extension Sequence {
    var oneByOne: OneByOnePublisher<Self> { OneByOnePublisher(input: self) }
}

现在如果我们更改 uppercasePublisher 以使用增强版发布者

func uppercasePublisher(_ folder: String) -> AnyPublisher<String, Never> {
    return folder.oneByOne
        // .handleEvents(receiveCancel: { NSLog("Received cancel in nested") })
        .map{uppercase($0)}
        .collect()
        .map{$0.joined()}
        .eraseToAnyPublisher()
}

我们可以看到,无论handleEvents 行是否被注释,取消都按预期工作。这表明原来的 folder.publisher 行是问题的根源,更具体的说 - 它是有问题的 Publishers.Sequence 实例。

【讨论】:

  • 准确!感谢您解开这个谜题。我不确定究竟是什么错误,但实际上不仅是 .handleEvents() 避免了错误,而且 print() 也是如此。无论如何,再次感谢!
  • @JkMu 你指的是哪个print 电话?如果它是来自 receiveCancel 闭包的那个,那么这不是游戏规则改变者,您可以简单地 .handleEvents() 不带参数并看到相同的行为。
  • 我的意思是如果你把.handleEvents()换成.print(),取消也是一样的效果。