【问题标题】:I want to use rx.js to merge multiple data sources and support adding and deleting data sources想用rx.js合并多个数据源,支持增删数据源
【发布时间】:2021-01-15 17:50:02
【问题描述】:

我有多个数据源,它们都是热门的 Observable(BehaviorSubject)

const data1$ = new BehaviorSubject({key: 'a', value: 1})
const data2$ = new BehaviorSubject({key: 'b', value: 2})
const data3$ = new BehaviorSubject({key: 'c', value: 3})

我有一个聚合其他数据源的根

const root$ = new BehaviorSubject({ field: {/* Stores the values of other data sources*/ } })

这个想法是,当我订阅 root$ 时,我会得到他们数据的聚合,而当 dataX$ 更新(下一个)时,我会得到更新的聚合。

另外,我希望在添加/删除数据源时收到更新。

这是我目前的代码,我想不出实现它的方法。

const a1$ = new BehaviorSubject({key: 'a', value: 1})
const a2$ = new BehaviorSubject({key: 'b', value: 2})
const trunk$ = new BehaviorSubject([a1$, a2$])

function add(dataSourch) {
  trunk$.next([...trunk$.getValue(), dataSourch])
}
function remove(key) {
  const dataArr = trunk$.getValue()
  const newArr = dataArr.filter((sourch) => sourch.key !== key)
  trunk$.next([...newArr])
}

————————————————————————————————————

目前:我的问题是在下面的代码中添加一个新的数据源,最终的数据源会触发多次更新

我添加了有关此问题的其他信息和代码

我有一些基础数据源,不同角色订阅,不同角色修改。 同时我有一个聚合数据源,将基础数据源的数据聚合到一个字段中(聚合数据源本身还有其他数据), 并且被某些角色订阅。 基础数据源管理模块支持添加和删除数据源。

const data1$ = new BehaviorSubject({key: 'a', value: 1})
const data2$ = new BehaviorSubject({key: 'b', value: 1})
const data3$ = new BehaviorSubject({key: 'c', value: 1})
const data4$ = new BehaviorSubject({key: 'd', value: 1})

const addSourch = (curr: any) => (prev: any) => ([curr, ...prev])
const addSourch$ = new Subject()
const deleteSourchByOb = (curr: any) => (arr: any) => {
  const newArr = arr.filter((ele: any) => ele !== curr)
  return newArr
} 
const deleteSourchByOb$ = new Subject()
const deleteAll = () => (prev: any) => ([])
const deleteAll$ = new Subject()

const sourch$ = new BehaviorSubject([data1$, data2$, data3$])


const root$ = merge(
  sourch$.pipe(map(arr => (curr: any) => [...arr, ...curr])),
  addSourch$.pipe(map(addSourch)),
  deleteSourchByOb$.pipe(map(deleteSourchByOb)),
  deleteAll$.pipe(map(deleteAll))
).pipe(
  scan((state, fn: Function) => fn(state), [])
) 

const thunk$ = new BehaviorSubject({ field: { }})

const root2$= root$.pipe(
  mergeMap(arr => merge(...arr)),
  map((v: any) => (curr: any) => {
    const obj = cloneDeep(curr)
    obj[v.key] = v
    return obj
  })
).pipe(
  scan((state, fn) => fn(state), {})
)

const final$ = combineLatest([root2$, thunk$]).pipe(
  map((arr) => {
    arr[1].field = arr[0]
    return arr[1]
  }) 
)
final$.subscribe(console.log)

【问题讨论】:

    标签: rxjs


    【解决方案1】:

    您可以通过使用scan 运算符来实现此行为。

    const { BehaviorSubject, Subject, merge } = rxjs;
    const { map, scan } = rxjs.operators;
    
    // Your functions that mutate your state/data aggregation
    const add = (data) => (state) => ([...state, data]);
    const deleteAll = () => (state) => ([]);
    
    const data1$ = new BehaviorSubject({key: 'a', value: 1})
    const data2$ = new BehaviorSubject({key: 'b', value: 2})
    const data3$ = new BehaviorSubject({key: 'c', value: 3})
    
    // Observable that represents all add events
    const add$ = merge(data1$, data2$, data3$)
    // Observable that represents all deleteAll events
    const deleteAll$ = new Subject();
    
    const data$ = merge(
      // Applies the first (outer) mutate function to your event observables
      add$.pipe(map(add)),
      deleteAll$.pipe(map(deleteAll))
    ).pipe(
      // Applies the second (inner) mutate function to finally mutate and return your updated state
      scan((state, fn) => fn(state), [])
    )
    
    data$.subscribe(console.log)
    
    // Sample further events
    data1$.next({key: 'd', value: 4})
    deleteAll$.next();
    data1$.next({key: 'e', value: 5})
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.3/rxjs.umd.min.js"></script>

    如果您需要更多详细信息,请在 cmets 中询问,我会尝试将它们添加到答案中。 仅供参考:出于您提出问题的主要原因(在 rxjs 中随时间变化状态),我没有像您在问题中那样实现删除功能。您可以轻松地将此函数调整为给定的adddeleteAll 函数。

    【讨论】:

    • 感谢您的回答。它给我带来了解决方案。现在,我已经更新了我的问题和代码。在代码中,新的数据源会触发四次更新。有没有办法只触发一次。
    • 好吧,你可以一个接一个地问。你有一个具体的问题,答案确实合适。如果出现新问题,请提出新问题。你能不能回到你的老问题。如果你愿意,你可以在我的评论下发布你的新问题,我会看看。另请注意,如果答案对您的问题有效,您就有机会投票和/或给它一个绿色检查。
    • 因为它可能会回答你的问题(但是当你提出一个新问题时你会得到一个详细的问题)——你可以将下一个可观察对象转换为可观察对象。这样你就可以将新的dataN$ 放入一个data$ observable 并在你需要的地方“拆箱”它。
    • 感谢您的回答。我想我现在知道如何在这里提问了。
    猜你喜欢
    • 2015-10-02
    • 2018-08-12
    • 1970-01-01
    • 1970-01-01
    • 2010-11-27
    • 2021-10-31
    • 1970-01-01
    • 2019-06-30
    • 1970-01-01
    相关资源
    最近更新 更多