【问题标题】:Combine`s subscribe(on:options:) operatorCombine`s subscribe(on:options:) 操作符
【发布时间】:2021-04-27 06:24:06
【问题描述】:

我对@9​​87654321@ 运算符有疑问。如果有人能帮我弄清楚,我将不胜感激。

所以我们从文档中得到了什么:

指定执行订阅、取消和请求操作的调度程序。 与影响下游消息的receive(on:options:)相比,subscribe(on:options:)改变了上游消息的执行上下文。

另外,我从不同的文章中得到的是,除非我们明确指定Scheduler 来接收我们的下游消息(使用receive(on:options:)),否则消息将在用于接收订阅的Scheduler 上发送。

此信息与我在执行期间实际获得的信息不一致。

我有下一个代码:

Just("Some text")
    .map { _ in
        print("Map: \(Thread.isMainThread)")
    }
    .subscribe(on: DispatchQueue.global())
    .sink { _ in
        print("Sink: \(Thread.isMainThread)")
    }
    .store(in: &subscriptions)

我期待下一个输出:

Map: false
Sink: false

但我得到的是:

Map: true
Sink: false

当我使用Sequence publisher 时也会发生同样的事情。

如果我交换map 运算符和subscribe 运算符的位置,我会收到我想要的:

Just("Some text")
    .subscribe(on: DispatchQueue.global())
    .map { _ in
        print("Map: \(Thread.isMainThread)")
    }
    .sink { _ in
        print("Sink: \(Thread.isMainThread)")
    }
    .store(in: &subscriptions)

输出:

Map: false
Sink: false

有趣的事实是,当我使用与我的自定义发布者的第一个列表相同的运算符顺序时,我收到了我想要的行为:

struct TestJust<Output>: Publisher {
    typealias Failure = Never
    
    private let value: Output
    
    init(_ output: Output) {
        self.value = output
    }
    
    func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
        subscriber.receive(subscription: Subscriptions.empty)
        _ = subscriber.receive(value)
        subscriber.receive(completion: .finished)
    }
}

TestJust("Some text")
    .map { _ in
        print("Map: \(Thread.isMainThread)")
    }
    .subscribe(on: DispatchQueue.global())
    .sink { _ in
        print("Sink: \(Thread.isMainThread)")
    }
    .store(in: &subscriptions)

输出:

Map: false
Sink: false

所以我认为要么是我对所有这些机制完全误解,要么是某些发布者故意选择线程来发布值(JustSequence -> MainURLSession.DataTaskPublisher -> Some of Background) ,这对我来说没有意义,因为在这种情况下我们为什么需要这个subscribe(on:options:)

你能帮我理解我错过了什么吗?提前谢谢你。

【问题讨论】:

    标签: swift combine frp


    【解决方案1】:

    首先要了解的是消息既向上 流向管道,也向下 流向管道。流向上管道(“上游”)的消息是:

    • 订阅的实际表现(接收订阅)

    • 订阅者向上游发布者请求新值

    • 取消消息(这些消息从最终订阅者向上渗透)

    流向管道(“下游”)的消息是:

    • 价值观

    • 完成,包括失败(错误)或按顺序完成(报告发布者发出了最后一个值)

    好的,正如文档明确指出的那样,subscribe(on:) 是关于前者:上游的消息。但是您实际上并没有在您的测试中跟踪 任何 那些 消息,因此您的任何结果都不会反映有关它们的任何信息!在订阅点上方插入一个适当的 handleEvents 运算符,以查看内容沿管道向上流动(例如,实现其 receiveRequest: 参数):

    Just("Some text")
        .handleEvents(receiveRequest: {
            _ in print("Handle1: \(Thread.isMainThread)")
        })
        .map // etc.
    

    同时,您应该对消息将在其上流动下游的线程做出假设(即值和完成)。你说:

    另外,我从不同的文章中得到的是,除非我们明确指定调度程序来接收我们的下游消息(使用receive(on:options:)),否则消息将在用于接收订阅的调度程序上发送。

    但这似乎是一个虚假的假设。您的代码没有任何内容可以明确地确定下游发送线程。正如您所说的那样,您可以使用receive(on:) 来控制它,但如果您不这样做,我会说您必须对此一无所知。一些发布者确实会在后台线程上产生一个值,例如数据任务发布者,这是完全合理的(数据任务完成处理程序也会发生同样的事情)。其他人没有。

    可以假设除receive(on:) 之外的运算符通常不会更改值传递线程。但是运营商是否以及如何使用订阅线程来确定接收线程,这是你不应该假设的。要控制接收线程,请控制它!致电receive(on:) 或不承担任何责任。

    举个例子,如果你把你的开口改为

    Just("Some text")
        .receive(on: DispatchQueue.main)
    

    那么您的map 和您的sink 都会报告他们正在主线程上接收值。为什么?因为你控制了接收线程。无论您在任何subscribe(on:) 命令中说什么,这都有效。它们是完全不同的事情。

    也许如果你调用subscribe(on:)但你不调用receive(on:),关于下游发送线程的一些事情是由subscribe(on:)线程决定的,但我肯定不会依赖于有任何困难和关于它的快速规则;文档中没有这样说!相反,不要那样做。如果您实施subscribe(on:),也实施receive(on:),以便负责发生的事情。

    【讨论】:

    • 感谢您的详细解答!我的主要误解是理解这个“上游”流实际上是什么。我在想上游是每个运算符都放在subscribe(on:) 之前(或之上)。
    • 我会重写我的答案,然后从那个误解开始!
    • 我希望我的重写是有用的。我想在一个问得很好的问题上称赞你:非常好的例子。在提出问题之前,您已经完成了功课。
    • 谢谢。所以.map 不是上游运营商。会不会是下游运营商?因为它在闭包中转换了值。这意味着它通过receive(on:) 中继来控制其线程。