【问题标题】:Difference between two observables两个可观察量之间的差异
【发布时间】:2018-05-26 22:14:09
【问题描述】:

假设我有两个 observables。

第一个 observable 是某些列表的数组:

[
    {id: 'zzz', other props here...},
    {id: 'aaa', ...},
    {id: '007', ...}
    ... and more over time
]

第二个 observable 是一个忽略列表的数组:

[
    {id: '007'}, // only id, no other props
    {id: 'zzz'}
    ... and more over time
]

结果应该是一个新的 observable 列表(第一个 observable),但不能有任何被忽略的列表:

[
    {id: 'aaa', other props here...}
    ... and more over time
] 

这是我现在发布之前的内容:

obs2.pipe(withLatestFrom(obs1, ? => ?, filter(?));

【问题讨论】:

  • 如果列表是在第一个 observable 上发出的,然后在第二个 observable 上作为被忽略的发出?你能保证这不会发生吗?
  • 另外,由于您显然需要缓冲第二个 observable,您希望该缓冲区永远存在(并增长)还是有某种条件可以再次删除它们(即,任何列表只能出现在第一个可观察的一次,基于时间,..)?

标签: javascript angular firebase rxjs angular6


【解决方案1】:

我没有测试,但我认为应该可以:

combineLatest(values$, excluded$).pipe(
  map(([values, excluded]) => {
    // put all the excluded IDs into a map for better perfs
    const excludedIds: Map<string, undefined> = excluded.reduce(
      (acc: Map<string, undefined>, item) => {
        acc.set(item.id, undefined)
        return acc;
      },
      new Map()
    );

    // filter the array, by looking up if the current
    // item.id is in the excluded list or not
    return values.filter(item => !excludedIds.has(item.id))
  })
)

说明:

使用combineLatest,无论您从哪里获得更新,您都会收到警告。如果您在示例中使用withLatestFrom,则仅当更新values$ observable 时才会触发更新。但是,如果 excluded$ 发生更改,则不会触发您的情况的更新。

然后将所有排除的 ID 放入映射而不是数组,因为我们需要知道是否应该排除给定的 ID。查看地图比查看数组要快。

然后只过滤值数组。

【讨论】:

    【解决方案2】:

    如果我理解正确,你会想要做的是

    1. 随着时间的推移汇总传入的项目
    2. 聚合随着时间推移将被忽略的 ID
    3. 最后,由于上述两个流都会随着时间的推移而发出,因此会发出一个结果列表,其中不包括被忽略的 ID。

    鉴于上述情况,以下是您可以尝试的粗略示例。如底部所述,根据前两个流的节奏,您将获得不同的结果,因为这就是异步发生的情况。为了证明这一点,我模拟了事物随时间发射的随机延迟。

    希望这会有所帮助!

    P.S.:以下是 Typescript,假设为 rxjs@^6。

    import { BehaviorSubject, combineLatest, of, Observable } from "rxjs";
    import { delay, map, scan, concatMap } from "rxjs/operators";
    
    /**
     * Data sources
     */
    
    // Just for showcase purposes... Simulates items emitted over time
    const simulatedEmitOverTime = <T>() => (source: Observable<T>) =>
      source.pipe(
        concatMap(thing => of(thing).pipe(delay(Math.random() * 1000)))
      );
    
    interface Thing {
      id: string;
    }
    
    // Stream of things over time
    const thingsOverTime$ = of(
      { id: "zzz" },
      { id: "aaa" },
      { id: "007" }
    ).pipe(
      simulatedEmitOverTime()
    );
    
    // Stream of ignored things over time
    const ignoredThingsOverTime$ = of(
      { id: "007" },
      { id: "zzz" }
    ).pipe(
      simulatedEmitOverTime()
    );
    
    
    /**
     * Somewhere in your app
     */
    
    // Aggregate incoming things
    // `scan` takes a reducer-type function
    const aggregatedThings$ = thingsOverTime$.pipe(
      scan(
        (aggregatedThings: Thing[], incomingThing: Thing) =>
          aggregatedThings.concat(incomingThing),
        []
      )
    );
    
    // Create a Set from incoming ignored thing ids
    // A Set will allow for easy filtering over time
    const ignoredIds$ = ignoredThingsOverTime$.pipe(
      scan(
        (excludedIdSet, incomingThing: Thing) =>
          excludedIdSet.add(incomingThing.id),
        new Set<string>()
      )
    );
    
    // Combine stream and then filter out ignored ids
    const sanitizedThings$ = combineLatest(aggregatedThings$, ignoredIds$)
      .pipe(
        map(([things, ignored]) => things.filter(({ id }) => !ignored.has(id)))
      );
    
    // Subscribe where needed
    // Note: End result will vary depending on the timing of items coming in
    // over time (which is being simulated here-ish)
    sanitizedThings$.subscribe(console.log);
    

    【讨论】:

      猜你喜欢
      • 2020-07-13
      • 2017-05-17
      • 1970-01-01
      • 2015-04-22
      • 2019-06-03
      • 2012-01-31
      • 2020-10-16
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多