【发布时间】:2020-04-17 12:55:43
【问题描述】:
我正在尝试在 Apple 的 Combine 框架中创建流的块。
我想要的是这样的:
Stream a:
--1-2-3-----4-5--->
Stream b:
--------0-------0->
a.chunk(whenOutputFrom: b)
-------[1, 2, 3]---[4, 5]-->
这可以在 Combine 中实现吗?
【问题讨论】:
我正在尝试在 Apple 的 Combine 框架中创建流的块。
我想要的是这样的:
Stream a:
--1-2-3-----4-5--->
Stream b:
--------0-------0->
a.chunk(whenOutputFrom: b)
-------[1, 2, 3]---[4, 5]-->
这可以在 Combine 中实现吗?
【问题讨论】:
您正在寻找的是 ReactiveX 世界中的 buffer 运算符。
Combine 中没有内置 buffer 运算符(在 ReactiveX 意义上)。内置的buffer 似乎更像是ReactiveX 中的bufferCount。
我找到了 Daniel T 的 this answer,它在 RxSwift 中重新创建了 buffer 运算符,还有 this cheatsheet,它告诉你如何将 RxSwift 移植到 Combine。
但是,Daniel T 的答案使用 Observable.create,这在 Combine 中不可用。我看了更深一点,发现this other answer,它在Combine 中重新创建了Observable.create。
结合我发现的三件事(不是双关语),这就是我想出的:
// -------------------------------------------------
// from https://stackoverflow.com/a/61035663/5133585
struct AnyObserver<Output, Failure: Error> {
let onNext: ((Output) -> Void)
let onError: ((Failure) -> Void)
let onCompleted: (() -> Void)
}
struct Disposable {
let dispose: () -> Void
}
extension AnyPublisher {
static func create(subscribe: @escaping (AnyObserver<Output, Failure>) -> Disposable) -> Self {
let subject = PassthroughSubject<Output, Failure>()
var disposable: Disposable?
return subject
.handleEvents(receiveSubscription: { subscription in
disposable = subscribe(AnyObserver(
onNext: { output in subject.send(output) },
onError: { failure in subject.send(completion: .failure(failure)) },
onCompleted: { subject.send(completion: .finished) }
))
}, receiveCancel: { disposable?.dispose() })
.eraseToAnyPublisher()
}
}
// -------------------------------------------------
// -------------------------------------------------
// adapted from https://stackoverflow.com/a/43413167/5133585
extension Publisher {
/// collects elements from the source sequence until the boundary sequence fires. Then it emits the elements as an array and begins collecting again.
func buffer<T: Publisher, U>(_ boundary: T) -> AnyPublisher<[Output], Failure> where T.Output == U {
return AnyPublisher.create { observer in
var buffer: [Output] = []
let lock = NSRecursiveLock()
let boundaryDisposable = boundary.sink(receiveCompletion: {
_ in
}, receiveValue: {_ in
lock.lock(); defer { lock.unlock() }
observer.onNext(buffer)
buffer = []
})
let disposable = self.sink(receiveCompletion: { (event) in
lock.lock(); defer { lock.unlock() }
switch event {
case .finished:
observer.onNext(buffer)
observer.onCompleted()
case .failure(let error):
observer.onError(error)
buffer = []
}
}) { (element) in
lock.lock(); defer { lock.unlock() }
buffer.append(element)
}
return Disposable {
disposable.cancel()
boundaryDisposable.cancel()
}
}
}
}
// -------------------------------------------------
【讨论】:
create 函数的版本:gist.github.com/xavierLowmiller/…
我想你会对Combine collect() 方法感兴趣。 它也有变化,例如时间、计数或两者兼而有之。
.collect(.byTimeOrCount(DispatchQueue.global(), 1.0, 10))
我们传递上下文的地方 -> 例如全局队列 等待它的时间,例如上面示例中的 1s 以及 10 个元素的数量。
用例看起来像这样:
let bufferSubject = PassthroughSubject<Int, Never>()
let cancelBag = Set<AnyCancellable>()
let subscriber = bufferSubject.eraseToAnyPublisher()
.collect(.byTimeOrCount(DispatchQueue.global(), 1.0, 10))
.sink { value in
print("?? value: \(value)")
}
.store(in: &cancelBag)
一定要测试一下 :)
bufferSubject.send(1)
bufferSubject.send(2)
bufferSubject.send(3)
...
DispatchQueue.asyncAfter(...) {
bufferSubject.send(4)
bufferSubject.send(5)
bufferSubject.send(6)
}
【讨论】: