【问题标题】:Signal: Collect values over time interval信号:收集时间间隔内的值
【发布时间】:2017-03-09 11:20:29
【问题描述】:

这可能是一个微不足道的问题,但我无法为这个看似简单的任务找到解决方案。由于我是 ReactiveSwift 和响应式编程的新手,我可能会错过一些明显的东西。

基本上我想做的是这样的:

signal.collect(timeInterval: .seconds(5))

我想从信号中收集特定时间段内的所有值。生成的信号将每 x 秒产生一个事件,其中包含从第一个信号收集的事件数组。

在 ReactiveSwift 中最好的方法是什么?

【问题讨论】:

    标签: swift swift3 reactive-cocoa reactive-swift


    【解决方案1】:

    ReactiveSwift 中没有用于此任务的内置运算符。相反,您可以使用以下方法,编写扩展:

    import Foundation
    import ReactiveSwift
    import Result
    public extension Signal {
        public func completeAfter(after: TimeInterval, onScheduler : DateSchedulerProtocol = QueueScheduler() ) -> Signal {
            let pipe : (Signal<(), NoError>, ReactiveSwift.Observer<(), NoError>) = Signal<(), NoError>.pipe()
            onScheduler.schedule(after: Date(timeIntervalSinceNow: after)) {
                pipe.1.sendCompleted()
            }
            return Signal { observer in
                return self.observe { event in
                    switch event {
                    case let .value(value):
                        observer.send(value: value)
                    case .completed:
                        observer.sendCompleted()
                    case let .failed(error):
                        observer.send(error: error)
                    case .interrupted:
                        observer.sendInterrupted()
                    }
                }
            }.take(until: pipe.0)
        }
    
        public func collectUntil(until: TimeInterval) -> Signal<[Value], Error> {
            return self.completeAfter(after: until).collect()
        }
    }
    

    然后使用signal.collectUntil(5)方法。

    另一种方法是使用 ReactiveSwift 中的timer 函数。示例(添加到相同的扩展,如上):

    public func collectUntil2(until: TimeInterval) -> Signal<[Value], Error> {
        var signal: Signal<(), NoError>? = nil
        timer(interval: until, on: QueueScheduler()).startWithSignal { innerSignal, _ in
            signal = innerSignal.map { _ in () }.take(first: 1)
        }
        return self.take(until: signal!).collect()
    }
    

    但是,我不喜欢这种方法,因为它是 SignalProducer 类型提取内部信号的伪装。

    Signal 类型本身也有timeout 功能,但是由于它会产生错误,因此很难使用它。如何使用它的示例(仍然,添加到相同的扩展):

    public func completeOnError() -> Signal<Value, Error> {
        return Signal { observer in
            return self.observe { event in
                switch(event) {
                case .value(let v): observer.send(value: v)
                case .failed(_): observer.sendCompleted()
                case .interrupted: observer.sendInterrupted()
                case .completed: observer.sendCompleted()
                }
            }
        }
    }
    
    public func collectUntil3(until: TimeInterval) -> Signal<[Value], Error> {
        return self
            .timeout(after: until,
                     raising: NSError() as! Error,
                     on: QueueScheduler())
            .completeOnError()
            .collect()
    }
    

    P.S.通过选择 3 个选项中的任何一个,请注意传递正确的调度程序或使用正确的调度程序参数化您的解决方案。

    【讨论】:

    • 我已将您的答案标记为正确,尽管这不是我想要的,但您已为我指明了正确的方向。我添加了我自己的答案,其中包括我编写的解决我的问题的扩展。
    【解决方案2】:

    基于answer by Petro Korienev(很遗憾这不是我想要的),我创建了一个扩展来解决我的问题。该扩展遵循 ReactiveSwift collect 函数的结构,以尽可能接近 ReactiveSwift 的意图。

    它将收集给定timeInterval 上的所有发送值,然后将它们作为数组发送。在终止事件时,它还将发送剩余值(如果有)。

    extension Signal {
        func collect(timeInterval: DispatchTimeInterval,
                     on scheduler: QueueScheduler = QueueScheduler()) -> Signal<[Value], Error> {
            return Signal<[Value], Error> { observer in
                var values: [Value] = []
                let sendAction: () -> Void = {
                    observer.send(value: values)
    
                    values.removeAll(keepingCapacity: true)
                }
                let disposable = CompositeDisposable()
                let scheduleDisposable = scheduler.schedule(
                        after: Date(timeInterval: timeInterval.timeInterval, since: scheduler.currentDate),
                        interval: timeInterval,
                        action: sendAction
                )
    
                disposable += scheduleDisposable
                disposable += self.observe { (event: Event<Value, Error>) in
                    if event.isTerminating {
                        if !values.isEmpty {
                            sendAction()
                        }
    
                        scheduleDisposable?.dispose()
                    }
    
                    switch event {
                    case let .value(value):
                        values.append(value)
                    case .completed:
                        observer.sendCompleted()
                    case let .failed(error):
                        observer.send(error: error)
                    case .interrupted:
                        observer.sendInterrupted()
                    }
                }
    
                return disposable
            }
        }
    }
    
    extension SignalProducer {
        func collect(timeInterval: DispatchTimeInterval,
                     on scheduler: QueueScheduler = QueueScheduler()) -> SignalProducer<[Value], Error> {
            return lift { (signal: ProducedSignal) in
                signal.collect(timeInterval: timeInterval, on: scheduler)
            }
        }
    }
    
    extension DispatchTimeInterval {
        var timeInterval: TimeInterval {
            switch self {
            case let .seconds(s):
                return TimeInterval(s)
            case let .milliseconds(ms):
                return TimeInterval(TimeInterval(ms) / 1000.0)
            case let .microseconds(us):
                return TimeInterval(UInt64(us) * NSEC_PER_USEC) / TimeInterval(NSEC_PER_SEC)
            case let .nanoseconds(ns):
                return TimeInterval(ns) / TimeInterval(NSEC_PER_SEC)
            }
        }
    }
    

    【讨论】:

    • 现在我明白了你的意思,在我看来,你想要达到的目标更接近于“抽样”方法。我已经创建了一个简短的要点,probalby 也可能对您有所帮助gist.github.com/soxjke/f9271d452292af67cfc4bb83593588d3
    • @PetroKorienev,我从您的 Gist 和 Zeekers 中尝试了您的两种方法。但是,如果事件在发送集合或样本的时间前后间隔很短,则两者都会让事件下降。有什么想法可以缓解这个问题吗?
    • 我超前了。问题出在完全不同的地方,看起来数据在收集过程中丢失了
    猜你喜欢
    • 2018-06-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-11-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多