* 本文将主要描述Combine
的示例代码。
* 在示例中,AnyCancellable
和 DisposeBag
没有描述,因为它们是多余的。注意。
首先
架构如Combine
/RxSwift
反应式编程如果您使用Model
或Repository
,您可以使用响应式编程编写连接部分。 (因为拼接容易,代码变漂亮了)
但随着Concurrency
的引入,这种情况即将改变。
这一次,当我用Concurrency
替换了一些处理时,就需要从Combine
调用Concurrency
,所以我试图找到一种方法来整齐地写这个。
任务
在反应式编程中,处理结束拨打Concurrency
没有什么特别的问题。让我们看一个例子。
我准备了一个简单的Concurrency
方法。
func calculate(number: Int) async -> Int {
return number + 2
}
我会打电话给这个。
[示例 1]
Just(10)
.map { $0 / 2 }
.sink { number in
Task {
let result = await calculate(number: number)
print(result) // 「7」が出力される
}
}
.store(in: &cancelables)
它运行良好。困扰我的是每次都得写Task
,而且会嵌套,所以我想把这个写的整整齐齐。 (稍后会详细介绍。)
[示例 2]
如果在处理过程中处理Concurrency
,则会出现问题。
Just(10)
.map { $0 / 2 }
.handleEvents(receiveOutput: { number in
Task {
let result = await task(number: number)
return result // エラーになる
}
})
.sink { number in
print(number)
}
.store(in: &cancelables)
不能在中间处理这个过程并传递给后续。
因此,我们将通过隐藏处理来解决这个问题。
执行
RxSwift
已经有一个名为 RxConcurrency
的库,所以我们也将实现 Combine
。
1. 查看 RxSwift 中的实现
库实现的摘录如下:
extension ObservableType {
static func async(_ handler: @escaping () async throws -> Element) -> Observable<Element> {
Observable<Element>.create { observer in
let task = Task {
do {
observer.on(.next(try await handler()))
observer.on(.completed)
} catch {
observer.on(.error(error))
}
}
return Disposables.create {
task.cancel()
}
}
}
}
它所做的很简单,它在内部执行Observable
create
并包装Coucurrency
处理。
这是它在实际使用中的样子。
Observable<Int>
.async {
await self.calculate(number: 10)
}
.subscribe { number in
print(number) // 「12」が出力される
}
.disposed(by: disposeBag)
这样,您就可以在处理过程中处理Coucurrency
。
2.在Combine中的实现
我将使用Combine
来实现它。然而,Combine
在RxSwift
中没有与create
等效的运算符因此,不能像以前那样实施。
如果是Combine
的扩展库组合分机如果你使用 ,你有create
操作符,所以你可以用同样的方式扩展它。
- 使用 CombineExt
我们将使用下面的create
运算符对其进行扩展。
现在让我们看一下实现。
extension Publishers {
static func async<Output>(_ handler: @escaping () async -> Output) -> AnyPublisher<Output, Never> {
AnyPublisher<Output, Never>.create { subscriber in
let task = Task {
let result = await handler()
subscriber.send(result)
subscriber.send(completion: .finished)
}
return AnyCancellable {
task.cancel()
}
}
}
}
可以看到,它的结构和前面给出的RxSwift
的扩展码是一样的。
而且,在实际使用中,它看起来是这样的。
Publishers
.async {
await self.calculate(number: 10)
}
.sink { number in
print(number) // 「12」が出力される
}
.store(in: &cancellable)
像这样很容易实现。
这次它使用Publishers
(和static func
)扩展,但也可以单独扩展每个。例如,以下是Just
的扩展示例。
extension Just {
func async<Input, Output>(_ handler: @escaping (Input) async -> Output) -> AnyPublisher<Output, Never> {
AnyPublisher<Output, Never>.create { subscriber in
let task = Task {
let result = await handler(output as! Input) // ※注意
subscriber.send(result)
subscriber.send(completion: .finished)
}
return AnyCancellable {
task.cancel()
}
}
}
}
* output
属性是从Just
流出的值,需要用值类型进行强制转换。
这是它在实际使用中的样子。
Just<Int>(10)
.async { number in
await self.calculate(number: number)
}
.sink { number in
print(number) // 「12」が出力される
}
.store(in: &cancellable)
这样,您可以根据需要准备一些扩展来灵活地实现它。然而,正如开头所说,是一个扩展库组合分机只能使用实现是。
现在,让我们考虑一个不使用扩展库的方法。
- 不使用 CombineExt 时
您必须要么实现前面提到的create
运算符,要么准备一些行为类似的东西。
我应该将create
的代码复制到扩展库CombineExt
中吗?你可能会想,但是依赖项出乎意料的多,所以有必要拉一些代码,如果你自己实现它到最低限度,代码会更少。.
使用flatMap
和Future
进行扩展。
(1) 不需要错误处理的扩展
extension Publisher {
func asyncMap<V>(
_ asyncFunction: @escaping (Output) async -> V
) -> Publishers.FlatMap<Future<V, Never>, Self> {
flatMap { value in
Future { promise in
Task {
promise(.success(await asyncFunction(value)))
}
}
}
}
}
这是它在实际使用中的样子。
Just<Int>(20)
.asyncMap { number in
await self.calculate(number: number)
}
.sink { number in
print(number) // 「22」が出力される
}
.store(in: &cancellable)
(例如Just
,但可以使用Publisher
。)
(2) 可以处理错误的扩展
如果您想要错误处理,请进一步扩展前一个。
extension Publisher {
func asyncMapWithThrows<V>(
_ asyncFunction: @escaping (Output) async throws -> V
) -> Publishers.FlatMap<Future<V, Error>, Publishers.SetFailureType<Self, Error>> {
flatMap { value in
Future { promise in
Task {
do {
let output = try await asyncFunction(value)
promise(.success(output))
} catch {
promise(.failure(error))
}
}
}
}
}
}
这是它在实际使用中的样子。
let subject = PassthroughSubject<(), Never>()
subject
.asyncMapWithThrows {
try await APIClient.fetch()
}
.sink(receiveCompletion: { result in
// handle result
}, receiveValue: { value in
// handle value
})
.store(in: &cancellable)
subject.send(())
在Publisher
中间打API 是一种模式。
您将能够像这样无缝地执行Combine
-> Concurrency
-> Combine
。
- 补充
顺便说一句,可以将TaskPriority
作为参数传递,但是与Combine
的.subscribe(on: )
冲突,所以无法设置。
(CombineExt
的PR中也提出了类似的观点,由于RxConcurrency
被设计成不传参数,所以采用)
3. 额外
减少任务嵌套
困扰我的是每次都得写
Task
,最后都是嵌套,所以想把这个写的整齐一点。这是您要消除开头提到的
Task
的嵌套的时候。正如我在开始时写的那样简单地做
Combine
->Concurrency
,。迅速Just(10) .map { $0 / 2 } .sink { number in Task { do { // do some async task } catch { // error handling } } } .store(in: &cancelables)
结果
sink
和Task
嵌套了一层,在进行错误处理时,do catch
变得更深。因此,我们将准备一个包装函数来解决它。① 当失败永远不会
。迅速extension Publisher where Self.Failure == Never { func asyncSink( receiveValue: @escaping ((Self.Output) async -> Void) ) -> AnyCancellable { self.sink { value in Task { await receiveValue(value) } } } }
这是它在实际使用中的样子。
。迅速Just<Int>(30) .asyncSink { number in let result = await self.calculate(number: number) print(result) // 「32」が出力される } .store(in: &cancellable)
它非常干净。
(2) 当失败为错误时
。迅速extension Publisher where Self.Failure == Error { func asyncSinkWithThrows( receiveCompletion: @escaping ((Subscribers.Completion<Self.Failure>) async -> Void), receiveValue: @escaping ((Self.Output) async -> Void) ) -> AnyCancellable { sink(receiveCompletion: { result in Task { await receiveCompletion(result) } }, receiveValue: { value in Task { await receiveValue(value) } }) } }
这是它在实际使用中的样子。
。迅速let subject = PassthroughSubject<(), Never>() subject .setFailureType(to: Error.self) .asyncSinkWithThrows(receiveCompletion: { result in // handling result }, receiveValue: { let response = try await APIClient.fetch() // handling response }) .store(in: &cancellable) subject.send(())
这次是用没有错误的模式完成的,但即使
receiveValue
也可以实现Concurrency
。在最后
好久不见,我在这里创建了一个实现各种东西的库,所以希望你能轻松使用它!
我们正在等待额外的扩展和公关!
请给星!
其他
附加扩展
RxConcurrency
有其他扩展实现,因此最好参考它们。
参考
原创声明:本文系作者授权爱码网发表,未经许可,不得转载;
原文地址:https://www.likecs.com/show-308632009.html