【问题标题】:RxJS ordering an async firebase observable streamRxJS 订购异步 firebase 可观察流
【发布时间】:2021-06-09 16:33:03
【问题描述】:

我正在 Angular 中构建一个数据库应用程序,并使用 Firebase 作为后端。

在第一种情况下,我在 firebase 中有两个集合:groupsusers。 Groups 有一堆文档,其中包含一个名称字符串(即 StackOverflow 成员组),以及一组对管理该组的用户的引用。这些引用将用于交叉引用“用户”集合,其中文档 ID 与引用 ID 相同。

所以处理的方式是:

  • 查询组以获取当前用户所属的组列表 经理。
  • 这将返回组的名称以及组的 ID 所有可以管理课程的人。
  • 这些 ID 然后用于 查询用户集合并获取名称和电子邮件 人/经理。

我正在努力解决与 RxJS 的同步问题。我调用它的主要功能如下:

export interface Group {
    name: string; managers: {name: string, email: string, uid: string}[]
}

export class ClassesComponent implements OnInit {

    groups: Group[];
    user: User;

    constructor(private db: DatabaseService) { }

    ngOnInit(): void {
        const groups = this.getAllGroups();

        groups.subscribe(() => {
            console.log(JSON.stringify(this.groups));
        })
    }

    getAllGroups(): Observable<any> {
        let newListOfGroups: Group[] = [];

        return this.getGroups().pipe(map((groups: QuerySnapshot<any>) => {
            groups.forEach((group: QueryDocumentSnapshot<any>) => {
                
                // define the variables for the groups
                let newGroup: Group = {name: group.data().name, managers: null};
                let groupManagers: {name: string, email: string, uid: string}[] = [];

                // create the empty array for the observables for each user query
                let obsArray: Observable<any>[] = [];

                // for each user reference get an observable reference to the data
                group.data().managers.forEach(user => {
                    obsArray.push(this.getUserName(user.id));
                });

                // use concat to execute all subscriptions at once
                concat(obsArray).subscribe(data => {
                    // log the output of the observable 
                    console.log(data);
                });

                // add these values to the group variables
                newGroup.managers = groupManagers;
                newListOfGroups.push(newGroup);
            })
            
            // make the groups list the global list and repopulate the screen
            this.groups = newListOfGroups;

        }, (error: any) => {
            // standard error...
            console.log(`Error loading groups: ${error.message}`);
        }));
    }

    // these two functions are normally in a service but put here for simplicity.

    getGroups(): Observable<QuerySnapshot<any>> {
        const userId = this.user.id;
        return this.firebase.collection('group', grp => grp.where('managers', 'array-contains', this.firebase.collection('users').doc(userId).ref)).get();
    }

    getUserName(uid: string): Observable<any> {
        return this.firebase.collection('users').doc(uid).get().pipe(take(1), map((data: DocumentSnapshot<any>) => {
            return {name: data.data().name, email: data.data().email, id: uid};
        }));
    }

}
组的输出很好,但是我希望返回数组的 getUserName 的管理器数据返回一个可观察的并且显然它完全不同步,因为用户查询是在同步模式内异步完成的。

我正在努力弄清楚我需要做什么才能让所有这些都异步运行,然后再一起返回,以便它在我想要的结构中。

【问题讨论】:

    标签: angular rxjs observable angularfire2


    【解决方案1】:

    我模仿了你的类型,因为我没有使用过 firebase,但我认为我已经很接近了。

    https://stackblitz.com/edit/rxjs-demo-s7m1mc?file=index.ts

    import { of, forkJoin, Observable, from } from 'rxjs';
    import { delay, map, mergeAll, mergeMap, tap, toArray } from 'rxjs/operators';
    // Made up some fake types
    interface User {
      name: string;
      email: string;
      uid: string;
    }
    
    type InCompleteGroup = {
      data: () => { name: string; managers: { uid: string }[] };
    };
    // End of fake types
    
    const randomResponseDelay = () => Math.floor(Math.random() * 1000) + 1;
    
    function getGroups(): Observable<Partial<InCompleteGroup>[]> {
      return of([
        {
          data: () => ({ name: 'g1', managers: [{ uid: '111' }, { uid: '333' }] })
        },
        {
          data: () => ({ name: 'g2', managers: [{ uid: '222' }] })
        }
      ]).pipe(delay(randomResponseDelay()));
    }
    
    function getUserName(uid: string): Observable<string> {
      return of(`Mr Smith ${uid}`).pipe(delay(randomResponseDelay()));
    }
    
    function getAllGroups() {
      return getGroups().pipe(
        mergeAll(), // split groups into single group emits
        mergeMap(group => {
          const namedManagersInGroup$ = forkJoin(
            group.data().managers.map(userManager =>
              getUserName(userManager.uid).pipe(
                tap(x =>
                  console.log('getUserName done for user with uid', userManager.uid)
                ),
                map(userName => ({ ...userManager, name: userName }))
              )
            )
          );
          return namedManagersInGroup$.pipe(
            map(managersOfGroup => ({
              name: group.data().name,
              managers: managersOfGroup
            }))
          );
        }),
        toArray() // glue back together to an array
      );
    }
    
    getAllGroups().subscribe(all => console.log('done', all));
    

    【讨论】:

    • 您好安德烈亚斯,非常感谢。在逐行工作之后,我得到了它的大部分工作原理 - VSC 不接受该位置的“mergeAll()”,但出现错误:TypeError: You provide an invalid object where a stream is expected。您可以提供 Observable、Promise、Array 或 Iterable。我不知道为什么,因为文档 mergeAll 应该提供一个 Observable...
    • 可能与使用的 rxjs 版本有关,我的 stackblitz 使用的是最新可用的。或者,如果导入混合,例如导入静态而不是运算符。我只是猜测,需要更多细节来帮助进一步:)
    • RxJS 是 7.1.0,我最初检查它不是静态的,似乎是从 rxjs 版本导入的。这似乎是一个类型错误,当我将类型更改为任何类型时,它仍然失败... 'OperatorFunction,unknown>' 类型的参数不可分配给 'OperatorFunction 类型的参数,未知>'。类型“ObservableInput”不可分配给类型“QuerySnapshot”。类型“PromiseLike”缺少类型“QuerySnapshot”的以下属性:文档、查询、元数据、大小等 4 个。
    • 所以它与传递的 observable 的类型有关 - 所以我想我只需要通过 observable 链来解决这个问题:/
    猜你喜欢
    • 2019-11-05
    • 1970-01-01
    • 2017-04-13
    • 2020-07-27
    • 1970-01-01
    • 2017-09-21
    • 1970-01-01
    • 2022-11-09
    • 1970-01-01
    相关资源
    最近更新 更多