* 本文将主要描述Combine的示例代码。
* 在示例中,AnyCancellableDisposeBag 没有描述,因为它们是多余的。注意。

首先

架构如Combine/RxSwift反应式编程如果您使用ModelRepository,您可以使用响应式编程编写连接部分。 (因为拼接容易,代码变漂亮了)

但随着Concurrency 的引入,这种情况即将改变。

这一次,当我用Concurrency替换了一些处理时,就需要从Combine调用Concurrency,所以我试图找到一种方法来整齐地写这个。

任务

在反应式编程中,处理结束拨打Concurrency 没有什么特别的问题。让我们看一个例子。

我准备了一个简单的Concurrency 方法。

。迅速
func calculate(number: Int) async -> Int {
    return number + 2
}

我会打电话给这个。

[示例 1]

。迅速
Just(10)
    .map { $0 / 2 }
    .sink { number in 
        Task {
            let result = await calculate(number: number)
            print(result) // 「7」が出力される
        }
    }
    .store(in: &cancelables)

它运行良好。困扰我的是每次都得写Task,而且会嵌套,所以我想把这个写的整整齐齐。 (稍后会详细介绍。)

[示例 2]

如果在处理过程中处理Concurrency,则会出现问题。

。迅速
Just(10)
    .map { $0 / 2 }
    .handleEvents(receiveOutput: { number in
        Task {
            let result = await task(number: number)
            return result // エラーになる
        }
    })
    .sink { number in 
        print(number)
    }
    .store(in: &cancelables)

不能在中间处理这个过程并传递给后续。
因此,我们将通过隐藏处理来解决这个问题。

执行

RxSwift 已经有一个名为 RxConcurrency 的库,所以我们也将实现 Combine

1. 查看 RxSwift 中的实现

库实现的摘录如下:

。迅速
extension ObservableType {

    static func async(_ handler: @escaping () async throws -> Element) -> Observable<Element> {
        Observable<Element>.create { observer in
            let task = Task {
                do {
                    observer.on(.next(try await handler()))
                    observer.on(.completed)
                } catch {
                    observer.on(.error(error))
                }
            }
            
            return Disposables.create {
                task.cancel()
            }
        }
    }
}

它所做的很简单,它在内部执行Observable create 并包装Coucurrency 处理。

这是它在实际使用中的样子。

。迅速
Observable<Int>
    .async {
        await self.calculate(number: 10)
    }
    .subscribe { number in
        print(number) // 「12」が出力される
    }
    .disposed(by: disposeBag)

这样,您就可以在处理过程中处理Coucurrency

2.在Combine中的实现

我将使用Combine 来实现它。然而,CombineRxSwift 中没有与create 等效的运算符因此,不能像以前那样实施。

如果是Combine的扩展库组合分机如果你使用 ,你有create 操作符,所以你可以用同样的方式扩展它。

- 使用 CombineExt

我们将使用下面的create 运算符对其进行扩展。

现在让我们看一下实现。

。迅速
extension Publishers {
        
    static func async<Output>(_ handler: @escaping () async -> Output) -> AnyPublisher<Output, Never> {
        AnyPublisher<Output, Never>.create { subscriber in
            let task = Task {
                let result = await handler()
                subscriber.send(result)
                subscriber.send(completion: .finished)
            }

            return AnyCancellable {
                task.cancel()
            }
        }
    }
}

可以看到,它的结构和前面给出的RxSwift的扩展码是一样的。

而且,在实际使用中,它看起来是这样的。

。迅速
Publishers
    .async {
        await self.calculate(number: 10)
    }
    .sink { number in
        print(number) // 「12」が出力される
    }
    .store(in: &cancellable)

像这样很容易实现。

这次它使用Publishers(和static func)扩展,但也可以单独扩展每个。例如,以下是Just 的扩展示例。

。迅速
extension Just {
    
    func async<Input, Output>(_ handler: @escaping (Input) async -> Output) -> AnyPublisher<Output, Never> {
        AnyPublisher<Output, Never>.create { subscriber in
            let task = Task {
                let result = await handler(output as! Input) // ※注意
                subscriber.send(result)
                subscriber.send(completion: .finished)
            }

            return AnyCancellable {
                task.cancel()
            }
        }
    }
}

* output属性是从Just流出的值,需要用值类型进行强制转换。

这是它在实际使用中的样子。

。迅速
Just<Int>(10)
    .async { number in
        await self.calculate(number: number)
    }
    .sink { number in
        print(number) // 「12」が出力される
    }
    .store(in: &cancellable)

这样,您可以根据需要准备一些扩展来灵活地实现它。然而,正如开头所说,是一个扩展库组合分机只能使用实现是。

现在,让我们考虑一个不使用扩展库的方法。

- 不使用 CombineExt 时

您必须要么实现前面提到的create 运算符,要么准备一些行为类似的东西。

我应该将create 的代码复制到扩展库CombineExt 中吗?你可能会想,但是依赖项出乎意料的多,所以有必要拉一些代码,如果你自己实现它到最低限度,代码会更少。.

使用flatMapFuture 进行扩展。


(1) 不需要错误处理的扩展
。迅速
extension Publisher {
    
    func asyncMap<V>(
        _ asyncFunction: @escaping (Output) async -> V
    ) -> Publishers.FlatMap<Future<V, Never>, Self> {
        
        flatMap { value in
            Future { promise in
                Task {
                    promise(.success(await asyncFunction(value)))
                }
            }
        }
    }
}

这是它在实际使用中的样子。

。迅速
Just<Int>(20)
    .asyncMap { number in
        await self.calculate(number: number)
    }
    .sink { number in
        print(number) // 「22」が出力される
    }
    .store(in: &cancellable)

(例如Just,但可以使用Publisher。)


(2) 可以处理错误的扩展

如果您想要错误处理,请进一步扩展前一个。

。迅速
extension Publisher {

    func asyncMapWithThrows<V>(
        _ asyncFunction: @escaping (Output) async throws -> V
    ) -> Publishers.FlatMap<Future<V, Error>, Publishers.SetFailureType<Self, Error>> {
        
        flatMap { value in
            Future { promise in
                Task {
                    do {
                        let output = try await asyncFunction(value)
                        promise(.success(output))
                    } catch {
                        promise(.failure(error))
                    }
                }
            }
        }
    }
}

这是它在实际使用中的样子。

。迅速
let subject = PassthroughSubject<(), Never>()

subject
    .asyncMapWithThrows {
        try await APIClient.fetch()
    }
    .sink(receiveCompletion: { result in
        // handle result
    }, receiveValue: { value in
        // handle value
    })
    .store(in: &cancellable)

subject.send(())

Publisher 中间打API 是一种模式。

您将能够像这样无缝地执行Combine -> Concurrency -> Combine


- 补充

顺便说一句,可以将TaskPriority作为参数传递,但是与Combine.subscribe(on: )冲突,所以无法设置。
CombineExt的PR中也提出了类似的观点,由于RxConcurrency被设计成不传参数,所以采用)


3. 额外

减少任务嵌套

困扰我的是每次都得写Task,最后都是嵌套,所以想把这个写的整齐一点。

这是您要消除开头提到的Task 的嵌套的时候。

正如我在开始时写的那样简单地做Combine -> Concurrency

。迅速
Just(10)
    .map { $0 / 2 }
    .sink { number in 
        Task {
            do {
                // do some async task
            } catch {
                // error handling
            }
        }
    }
    .store(in: &cancelables)

结果sinkTask嵌套了一层,在进行错误处理时,do catch变得更深。因此,我们将准备一个包装函数来解决它。


① 当失败永远不会
。迅速
extension Publisher where Self.Failure == Never {

    func asyncSink(
        receiveValue: @escaping ((Self.Output) async -> Void)
    ) -> AnyCancellable {
        self.sink { value in
            Task {
                await receiveValue(value)
            }
        }
    }
}

这是它在实际使用中的样子。

。迅速
Just<Int>(30)
    .asyncSink { number in
        let result = await self.calculate(number: number)
        print(result) // 「32」が出力される
    }
    .store(in: &cancellable)

它非常干净。


(2) 当失败为错误时
。迅速
extension Publisher where Self.Failure == Error {

    func asyncSinkWithThrows(
        receiveCompletion: @escaping ((Subscribers.Completion<Self.Failure>) async -> Void),
        receiveValue: @escaping ((Self.Output) async -> Void)
    ) -> AnyCancellable {
        sink(receiveCompletion: { result in
            Task { await receiveCompletion(result) }
        }, receiveValue: { value in
            Task { await receiveValue(value) }
        })
    }
}

这是它在实际使用中的样子。

。迅速
let subject = PassthroughSubject<(), Never>()

subject
    .setFailureType(to: Error.self)
    .asyncSinkWithThrows(receiveCompletion: { result in
        // handling result
    }, receiveValue: {
        let response = try await APIClient.fetch()
        // handling response
    })
    .store(in: &cancellable)

subject.send(())

这次是用没有错误的模式完成的,但即使receiveValue 也可以实现Concurrency

在最后

好久不见,我在这里创建了一个实现各种东西的库,所以希望你能轻松使用它!
我们正在等待额外的扩展和公关!

请给星!

其他

附加扩展

RxConcurrency 有其他扩展实现,因此最好参考它们。

参考


原创声明:本文系作者授权爱码网发表,未经许可,不得转载;

原文地址:https://www.likecs.com/show-308632009.html

相关文章: