【问题标题】:RxSwift - Queue observables of different type from SingletonRxSwift - 来自 Singleton 的不同类型的队列 observables
【发布时间】:2018-05-17 10:55:16
【问题描述】:

我有一个带有一些公共可用功能的单例类(实际上它会进行网络抓取,但在这里进行了简化)。这些函数都返回一个Single<T>,但类型不同。

它可能看起来像这样:

class Singleton {
    static let shared = Singleton()
    private init() { }

    func doSomethingInt() -> Single<Int> {
        return Single.just(1)
            .delay(3, scheduler: MainScheduler.instance)
    }

    func doSomethingString() -> Single<String> {
        return Single.just("Wow")
            .delay(3, scheduler: MainScheduler.instance)
    }
}

当有人调用Singleton.shared.doSomthingInt() 时,该函数应该放在一个队列中,直到它通过队列才执行。队列中的下一个 observable 不应该在它完成之前开始执行。理想情况下,Singleton 会有一个函数,它会延迟传递给它的每个函数的执行。像这样的:

private func placeInQueue<T: Any>(operation: Single<T>) -> Single<T> {
    // place in some magic shared queue
    return operation
}

然后我可以将这个函数链接到应该放在队列中的函数的开头,如下所示:

func doSomethingString() -> Single<String> {
    let operation = Single.just("Wow")
        .delay(3, scheduler: MainScheduler.instance)
    return placeInQueue(operation)
}

我觉得这应该可以通过concat 操作以某种方式实现,但我还没有能够解决它。

有什么线索吗?

【问题讨论】:

    标签: swift queue rx-swift


    【解决方案1】:

    我已经创建了这个类,它似乎正在工作:)

    也许这是它运行的调度程序的问题。至少,如果我在队列中添加一些操作,每个操作在MainScheduler 上都有 3 秒的延迟,我可以看到一些排队的操作最终在前一个操作之后或多或少 3.5~4 秒后完成已完成。不过对我来说这不是什么大问题:)

    class ObservableQueue {
        init() { }
    
        private var queueArray = [(operation: Observable<Void>, id: Double)]()
    
        /// Adding the `operation` to an internal queue. Starts execution of the `operation` when all previous operations in the queue had sendt an stop event.
        func placeInQueue<T: Any>(_ operation: Single<T>) -> Single<T> {
            let operationId = createId()
            let queuedOperation = currentQueue()
                .flatMap { _ -> Single<T> in
                    return operation
                }
                .do(
                    onNext: { [weak self] _ in self?.removeFromQueue(id: operationId) },
                    onError: { [weak self] _ in self?.removeFromQueue(id: operationId)
                })
            let queueableOperation = operation
                .map { _ in return () }
                .asObservable()
                .catchErrorJustReturn(())
            addToQueue(queueableOperation, id: operationId)
            return queuedOperation
        }
    
        private func createId() -> Double {
            var operationId: Double = Date().timeIntervalSince1970
            while (queueArray.contains { $0.id == operationId }) {
                operationId = Date().timeIntervalSince1970
            }
            return operationId
        }
    
        private func currentQueue() -> Single<Void> {
            var queue = queueArray.map{ $0.operation }
            if queue.isEmpty {
                queue = [Observable.just(())]
            }
            return Observable.concat(queue).takeLast(1).asSingle()
        }
    
        private func addToQueue(_ operation: Observable<Void>, id: Double) {
            queueArray.append((operation: operation, id: id))
        }
    
        private func removeFromQueue(id: Double) {
            guard let index = (queueArray.index { $0.id == id }) else { return }
            queueArray.remove(at: index)
        }
    }
    

    我是这样使用它的:

    private let queue = ObservableQueue()
    
    func doSomethingInt() -> Single<Int> {
        let operation = Single.just(1)
            .delay(3, scheduler: MainScheduler.instance)
        return queue.placeInQueue(operation)
    }
    

    我希望这对某人有帮助:) 请随时发布对此解决方案的疑虑,或者如果您有任何更好的解决方案。

    【讨论】:

      猜你喜欢
      • 2016-12-27
      • 2016-03-09
      • 2016-06-26
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-03-07
      • 1970-01-01
      相关资源
      最近更新 更多