【问题标题】:Swift Combine Chunk OperatorSwift 组合块运算符
【发布时间】:2020-04-17 12:55:43
【问题描述】:

我正在尝试在 Apple 的 Combine 框架中创建流的块。

我想要的是这样的:

Stream a:
--1-2-3-----4-5--->

Stream b:
--------0-------0->

a.chunk(whenOutputFrom: b)

-------[1, 2, 3]---[4, 5]-->

这可以在 Combine 中实现吗?

【问题讨论】:

    标签: swift combine


    【解决方案1】:

    您正在寻找的是 ReactiveX 世界中的 buffer 运算符。

    Combine 中没有内置 buffer 运算符(在 ReactiveX 意义上)。内置的buffer 似乎更像是ReactiveX 中的bufferCount

    我找到了 Daniel T 的 this answer,它在 RxSwift 中重新创建了 buffer 运算符,还有 this cheatsheet,它告诉你如何将 RxSwift 移植到 Combine。

    但是,Daniel T 的答案使用 Observable.create,这在 Combine 中不可用。我看了更深一点,发现this other answer,它在Combine 中重新创建了Observable.create

    结合我发现的三件事(不是双关语),这就是我想出的:

    // -------------------------------------------------
    // from https://stackoverflow.com/a/61035663/5133585
    struct AnyObserver<Output, Failure: Error> {
        let onNext: ((Output) -> Void)
        let onError: ((Failure) -> Void)
        let onCompleted: (() -> Void)
    }
    
    struct Disposable {
        let dispose: () -> Void
    }
    
    extension AnyPublisher {
        static func create(subscribe: @escaping (AnyObserver<Output, Failure>) -> Disposable) -> Self {
            let subject = PassthroughSubject<Output, Failure>()
            var disposable: Disposable?
            return subject
                .handleEvents(receiveSubscription: { subscription in
                    disposable = subscribe(AnyObserver(
                        onNext: { output in subject.send(output) },
                        onError: { failure in subject.send(completion: .failure(failure)) },
                        onCompleted: { subject.send(completion: .finished) }
                    ))
                }, receiveCancel: { disposable?.dispose() })
                .eraseToAnyPublisher()
        }
    }
    // -------------------------------------------------  
    
    // -------------------------------------------------
    // adapted from https://stackoverflow.com/a/43413167/5133585
    extension Publisher {
    
        /// collects elements from the source sequence until the boundary sequence fires. Then it emits the elements as an array and begins collecting again.
        func buffer<T: Publisher, U>(_ boundary: T) -> AnyPublisher<[Output], Failure> where T.Output == U {
            return AnyPublisher.create { observer in
                var buffer: [Output] = []
                let lock = NSRecursiveLock()
                let boundaryDisposable = boundary.sink(receiveCompletion: {
                    _ in
                }, receiveValue: {_ in
                    lock.lock(); defer { lock.unlock() }
                    observer.onNext(buffer)
                    buffer = []
                })
                let disposable = self.sink(receiveCompletion: { (event) in
                    lock.lock(); defer { lock.unlock() }
                    switch event {
                    case .finished:
                        observer.onNext(buffer)
                        observer.onCompleted()
                    case .failure(let error):
                        observer.onError(error)
                        buffer = []
                    }
                }) { (element) in
                    lock.lock(); defer { lock.unlock() }
                    buffer.append(element)
                }
                return Disposable {
                    disposable.cancel()
                    boundaryDisposable.cancel()
                }
            }
        }
    }
    // -------------------------------------------------
    

    【讨论】:

    【解决方案2】:

    我想你会对Combine collect() 方法感兴趣。 它也有变化,例如时间、计数或两者兼而有之。

    .collect(.byTimeOrCount(DispatchQueue.global(), 1.0, 10))
    

    我们传递上下文的地方 -> 例如全局队列 等待它的时间,例如上面示例中的 1s 以及 10 个元素的数量。

    用例看起来像这样:

    let bufferSubject = PassthroughSubject<Int, Never>()
    let cancelBag = Set<AnyCancellable>()
    
    let subscriber = bufferSubject.eraseToAnyPublisher()
      .collect(.byTimeOrCount(DispatchQueue.global(), 1.0, 10))
      .sink { value in
          print("?? value: \(value)")
      }
      .store(in: &cancelBag)
    

    一定要测试一下 :)

    bufferSubject.send(1)
    bufferSubject.send(2)
    bufferSubject.send(3)
    
    ...
    
    DispatchQueue.asyncAfter(...) {
      bufferSubject.send(4)
      bufferSubject.send(5)
      bufferSubject.send(6)
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-10-24
      • 2015-03-31
      • 1970-01-01
      • 2020-06-20
      • 1970-01-01
      • 2022-07-13
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多