【发布时间】:2021-10-02 12:24:09
【问题描述】:
下面有更新
我正在尝试创建一个 id 字符串缓冲区。我希望这个缓冲区关闭两个谓词:
- 在我决定的设定时间后超时
- 它达到了我决定的最大缓冲区大小
一旦缓冲区在这两个谓词中的任何一个上关闭,我希望它将收集到的 id 字符串数组传递给另一个函数。我下面的代码是我想要传递数据的确切设置,但我是 RXJS 的新手!
class TestClass {
private ids: Array<string>;
private id$: Subject<string> = new Subject();
private buffered$: Observable<Array<string>>;
private timer$: Observable<number>;
constructor() {
this.ids = [];
this.id$ = new Subject<string>();
this.id$.subscribe();
this.timer$ = interval(2000); // timeoutTimer
this.buffered$ = this.id$.pipe(buffer(this.timer$));
this.buffered$.subscribe((idBuffer) => console.log("Id buffer: " + idBuffer));
}
fun1(id: string, x: number) {
this.ids.push(id);
this.id$.next(id); // Source data to be Observed and buffered
return new Promise((reslove) => {
// I want to call fun2 here with
// 1. array from buffer Observable
// 2. the x value from this function
resolve(fun2(buffered, x))
});
}
fun2(ids: Array<string>, x: number) {
console.log('In function 2!');
}
}
let test = new TestClass();
setInterval(() => {
test.fun1('hey' + Math.floor(Math.random() * 10), Math.random());
}, 300);
更新:
我更新了描述我面临的问题以及如何解决它的伪代码。
问题:本质上,我多次调用 fun1 以将内容添加到缓冲区中,并且一旦达到我指定的时间限制或最大缓冲区大小,我想调用 fun2!然后我希望 fun1 返回 fun2 返回的相应数据对象。所以我是在每次调用 fun1 时批量调用而不是调用 fun2,我想等待缓冲区或超时并立即返回所有数据!
我需要找到一种方法,只在第一次调用 fun1 时订阅它,然后等待它完成返回所有元素,然后一旦缓冲区为空,再次订阅下一个要添加到缓冲区中的第一个元素
/*
* PROBLEM: I want this to run only for the first addition of an ID string to a new buffer
* but currently it is returning the new Promise<>... after every call to fun1! This results in
* repeating elements being returned from the Promise
*
* Example:
*
* Buffer = ["id1", "id2", "id3", "id4", "id5"] // We called fun1 five times with these respective IDs
*
* // We are assuming the buffers max out at 3 elements
* (1) idBuffer = ["id1", "id2", "id3"] // After first emission of bufferedIdAccumulator$
* (2) idBuffer = ["id2", "id3", "id4"] // After second emission of bufferedIdAccumulator$
* (3) idBuffer = ["id3", "id4", "id5"] // After third emission of bufferedIdAccumulator$
*
* What I Want:
* (1) idBuffer = ["id1", "id2", "id3"] // After first emission of bufferedIdAccumulator$
* (2) idBuffer = ["id1", "id2", "id3"] // After second emission of bufferedIdAccumulator$
* (3) idBuffer = ["id1", "id2", "id3"] // After third emission of bufferedIdAccumulator$
*
* This is becasuse the first three calls to fun1 contain the first three objects I need to return so I
* only want to return once for all three of these objects
*
*/
fun1(id: string, x: number): Promise<DataClass> {
// console.log("In Function 1!");
this.ids.push(id);
this.id$.next(id); // Source data to be Observed and buffered
console.log(`Adding id: ${id} to buffer!`);
return new Promise<DataClass>(async (resolve) => {
// I want to call fun2 here with
// 1. array from buffer Observable
// 2. the x value from this function
let idBuffer = await firstValueFrom(this.bufferedIdAccumulator$);
// let idBuffer = await new Promise<Array<string>>((resolve) => {
// this.bufferedIdAccumulator$.subscribe((idBuffer) => {
// console.log(idBuffer);
// resolve(idBuffer);
// });
// });
console.log(`<fun1> idBuffer: ${idBuffer}`);
let dataClasses = this.fun2(idBuffer, x);
let dataClass = dataClasses[0];
resolve(dataClass);
});
}
fun2(ids: Array<string>, x: number): Array<DataClass> {
// console.log('In function 2!');
// console.log('\nFunction 2:');
// console.log(ids);
let dataClasses = [];
ids.forEach((id) => {
dataClasses.push(new DataClass(id, x));
});
return dataClasses;
}
}
class DataClass {
id: string;
x: number;
constructor(id: string, x: number) {
this.id = id;
this.x = x;
}
}
let test = new TestClass();
let x = 0;
setInterval(() => {
test.fun1('hey' + x++, Math.random()); // Math.floor(Math.random() * 10), Math.random()
}, 2500);
【问题讨论】:
-
听起来可能对你有用:stackoverflow.com/a/69126161/1858357
-
哇!!谢谢你们!让我去测试一下然后回来!我也没有提到在我的 fun1 代码中,我希望能够等待我的缓冲区由于时间或空间而关闭,然后通过该 Promise 返回 fun2 的结果。这可能吗?
标签: javascript rxjs