【问题标题】:Executing Combine Publishers in parallel creates race condition并行执行合并发布者会创建竞争条件
【发布时间】:2021-12-01 08:01:28
【问题描述】:

我正在尝试在 HKWorkoutEvent 定义的时间间隔内查询 HealthKit 的心率值和步数,以填充我定义的用于存储多个变量的自定义本地模型,它定义如下。

struct SGWorkoutEvent: Identifiable {
    let id = UUID()
    let type: HKWorkoutEventType
    let splitActiveDurationQuantity: HKQuantity?
    let splitDistanceQuantity: HKQuantity?
    let totalDistanceQuantity: HKQuantity?
    let splitMeasuringSystem: HKUnit

    let steps: HKQuantity?
    let heartRate: HKQuantity?
}

stepsheartRate 之外的所有属性都可以从HKWorkoutEvent 中提取。但是,我正在尝试构建一个组合管道,它可以让我创建一个发布者数组,以并行查询心率、步数并传递锻炼事件,因此在sink 我收到一个包含这些值的 3 元素元组所以我可以填充上面的模型。我目前拥有的如下,

// Extract the workout's segments (defined automatically by an Apple Watch)
let workoutSegments = (workout.workoutEvents ?? []).filter({ $0.type == .segment })

// For each of the workout segments defined above create a HKStatisticQuery that starts on the interval's
// beginning and ends on the interval's end so the HealthKit query is properly defined to be
// executed between that interval.
let segmentsWorkoutPublisher = Publishers.MergeMany(workoutSegments.map({ $0.dateInterval }).map({
    healthStore.statistic(for: HKQuantityType.quantityType(forIdentifier: HKQuantityTypeIdentifier.heartRate)!, with: .discreteAverage, from: $0.start, to: $0.end)
}))
.assertNoFailure()

// Do the same logic as above in `segmentsWorkoutPublisher` but for steps
let stepsPublisher = Publishers.MergeMany(workoutSegments.map({ $0.dateInterval }).map({

    healthStore.statistic(for: HKObjectType.quantityType(forIdentifier: HKQuantityTypeIdentifier.stepCount)!, with: .cumulativeSum, from: $0.start, to: $0.end)
}))
.assertNoFailure()

Publishers.Zip3(workoutSegments.publisher, stepsPublisher, segmentsWorkoutPublisher)
    
    .receive(on: DispatchQueue.main)
    .sink(receiveValue: { pace, steps, hrs in
        
        let d = SGWorkoutEvent(type: pace.type,
                               splitActiveDurationQuantity: pace.splitDuration,
                               splitDistanceQuantity: pace.splitDistance,
                               totalDistanceQuantity: pace.totalDistanceQuantity,
                               splitMeasuringSystem: pace.splitMeasuringSystem,
                               steps: steps.sumQuantity(),
                               heartRate: hrs.averageQuantity())
        
        self.paces.append(d)
    })
    .store(in: &bag)

HKHealthStore.statistic(for:...) 只不过是在 HKHealthStore 扩展上定义的 HKStatisticsQuery 的组合包装器,见下文。

public func statistic(for type: HKQuantityType, with options: HKStatisticsOptions, from startDate: Date, to endDate: Date, _ limit: Int = HKObjectQueryNoLimit) -> AnyPublisher<HKStatistics, Error> {
    
    let subject = PassthroughSubject<HKStatistics, Error>()
    
    let predicate = HKStatisticsQuery.predicateForSamples(withStart: startDate, end: endDate, options: [.strictEndDate, .strictStartDate])
    
    let query = HKStatisticsQuery(quantityType: type, quantitySamplePredicate: predicate, options: options, completionHandler: { (query, statistics, error) in
        
        guard error == nil else {
            hkCombineLogger.error("Error fetching statistics \(error!.localizedDescription)")
            return
        }
        
        subject.send(statistics!)
        subject.send(completion: .finished)
    })
    
    self.execute(query)
    
    return subject.eraseToAnyPublisher()
}

我在这里看到的是某种竞赛条件,其中检索到的步数和心率没有同时返回广告。结果,我看到了没有意义的值,例如 5' 200steps 的 1K 拆分和相同持续时间 700steps 的另一个。实际情况应该是这两个间隔应该显示一个大约 150 的值,但似乎我可能没有使用正确的组合运算符。

我希望看到的预期行为是Publishers.Zip 上的每个发布者让每个 3 项元组按顺序(第一个间隔,第二个间隔......)完成其查询,而不是这种不可复制的竞争条件.

为了尝试提供更多上下文,我认为这类似于为不同的时间戳创建一个包含温度、湿度和下雨概率的模型,然后查询三个不同的 API 端点以检索三个不同的值并将它们合并到模型中。

【问题讨论】:

  • 当您的代码无法编译时,很难提供帮助。我建议制作一个具有类似功能的示例项目并将其发布。也就是说,乍一看,您的 statistic 方法做得太多了。将其缩减为 3-4 个参数并进行重构。此外,Array 有一个发布者。而不是使用 MergeMany 尝试workoutSegments.publisher 并玩弄它。

标签: swift combine healthkit


【解决方案1】:

这里有很多东西要解压,但我会试一试。让我们从您的HKHealthStore.statistic 函数开始。您想运行(可能是异步的)查询,发布具有单个结果的序列,然后结束。这似乎是使用Future 的理想案例。我对HealthKit 没有任何经验(完全没有),我不能保证它会编译,但是转换可能看起来像这样:

public func statistic(
    for type: HKQuantityType,
    with options: HKStatisticsOptions,
    from startDate: Date,
    to endDate: Date,
    _ limit: Int = HKObjectQueryNoLimit) -> AnyPublisher<HKStatistics, Error> {

    let future = Future<HKStatistics, Error> {
        fulfillPromise in

        let predicate = HKStatisticsQuery.predicateForSamples(withStart: startDate, end: endDate, options: [.strictEndDate, .strictStartDate])

        let query = HKStatisticsQuery(quantityType: type, quantitySamplePredicate: predicate, options: options, completionHandler: { (query, statistics, error) in
            guard error == nil else {
                hkCombineLogger.error("Error fetching statistics \(error!.localizedDescription)")
                fulfillPromise(.failure(error!))
            }

            fulfillPromise(.success(statistics!))
        })

        self.execute(query)
    }

    return future.eraseToAnyPublisher()
}

所以现在我们有一个“一次性”发布者,它运行一个查询并在它有值时触发。

现在让我们看看您的segmentsWorkoutPublisher(以及扩展名stepsPublisher)。

在使用 Combine 时,如果您发现自己在使用 Publisher.&lt;SomeOperatorType&gt; 构造函数,您应该非常小心。根据我的经验,这样做很少是正确的。 (话虽如此,我以后使用Zip3 似乎没问题)。

在这种情况下,您正在创建Publishers(您的Futures)的序列。但是你真的对Publishers 的序列不感兴趣。您对Publishers 产生的 序列感兴趣。从某种意义上说,您希望“解包”每个 Publisher(通过等待其值)并将这些结果发送到序列中。这正是flatMap 的用途!让我们做这样的事情:

let segmentsWorkoutPublisher =
    workoutSegments
        .map { $0.dateInterval }
        .flatMap {
            healthStore.statistic(for: HKQuantityType.quantityType(forIdentifier: HKQuantityTypeIdentifier.heartRate)!, with: .discreteAverage, from: $0.start, to: $0.end)
        }
        .assertNoFailure()

这会生成一串序列,然后等待每个序列发出一个值并将这些值发送到更远的地方。

stepsPublisher 也会以类似的方式发生变化。

我认为这会让你到达你需要去的地方。作为查看此内容的一部分,创建了一个 Playground,我在其中重新设计了您的示例,但使用了更简化的类型。下次遇到此类问题时,您可能会尝试类似的方法 - 过滤掉多余的细节并尝试创建一个更简单的示例。如果您可以像这样在操场上将您的代码放在一起,那么编译起来不会有太多麻烦,而且会更容易回答问题。游乐场:

import Foundation
import Combine
import PlaygroundSupport

enum MockEventType : CaseIterable {
    case segment
    case notSegment
}

struct MockSegment {
    let type : MockEventType
    let dateInterval : DateInterval = DateInterval.init(start: Date.now, duration: 3600)
}

func statistic() -> AnyPublisher<Float, Never> {
    let future = Future<Float, Never>() {
        fulfillPromise in

        DispatchQueue.global(qos: .background).async {
            sleep(UInt32.random(in: 1...3))
            fulfillPromise(.success(Float.random(in: 100.0...150.0)))
        }
    }

    return future
        .eraseToAnyPublisher()
}

// Generate an endless stream of mock events.
let rawWorkouts = Timer.publish(every: 1.0, on: .current, in: .common)
    .autoconnect()
    .map{ _ in MockSegment(type: MockEventType.allCases.randomElement()!) }

let workoutSegments = rawWorkouts.filter { $0.type == .segment }

let dateIntervals =
    workoutSegments
        .map { $0.dateInterval }

let segmentsWorkoutPublisher =
        dateIntervals
        .flatMap { _ in statistic() }
        .assertNoFailure()

let stepsPublisher =
        dateIntervals
        .flatMap { _ in statistic() }
        .assertNoFailure()

var bag = Set<AnyCancellable>()

Publishers.Zip3(workoutSegments, stepsPublisher, segmentsWorkoutPublisher)
    .receive(on: DispatchQueue.main)
    .sink(receiveValue: { pace, steps, hrs in
        print("pace: \(pace) steps: \(steps), hrs: \(hrs)")
    })
    .store(in: &bag)

PlaygroundSupport.PlaygroundPage.current.needsIndefiniteExecution = true

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2015-08-01
    • 2013-12-31
    • 1970-01-01
    • 2023-01-30
    • 1970-01-01
    • 2016-11-09
    • 2017-03-24
    • 2017-02-20
    相关资源
    最近更新 更多