【发布时间】:2021-04-09 08:48:11
【问题描述】:
我是 rxjs 的新手,想用它来构建视频下载器。其目的是 24/7 全天候运行并自动录制偶尔的直播以供日后观看。这是我目前所拥有的。
import { BehaviorSubject, from, defer, of } from "rxjs";
import { delay, mergeMap, repeat, tap } from "rxjs/operators";
const downloader = url => {
const defaultDelay = 1000;
const maxDelay = 10000;
const delayTime = new BehaviorSubject(defaultDelay);
/*
* Simulated download output.
*
* @return {String|Number} potentialOutput
* A {Number} 1 means "FAILURE, stream is offline."
* A {String} means "SUCCESS, video was downloaded."
* 1 is the most likely value returned
*
* greets https://stackoverflow.com/a/8877271/1004931
*/
function randomWithProbability() {
var potentialOutput = [1, 1, 1, 1, 1, "/tmp/video.mp4"];
var idx = Math.floor(Math.random() * potentialOutput.length);
return potentialOutput[idx];
}
/**
* Simulated download. Returns a promise which resolves after 1 second.
*/
const download = url => {
let downloadP = new Promise((resolve, reject) => {
setTimeout(() => {
resolve(randomWithProbability());
}, 1000);
});
return from(downloadP);
};
/**
* Conditionally adjust the delay inbetween download attempts.
* - If the video downloaded successfuly, reset the timer to it's default.
* (in case the stream went down by error, we want to record again ASAP.)
* - If the video stream was offline, increase the delay until our next download attempt.
* (we don't want to be rude and flood the server)
*/
const adjustTimer = (ytdlOutput) => {
if (typeof ytdlOutput === 'string') {
delayTime.next(defaultDelay); // video stream exited successfully, so reset in case the stream starts again
} else {
let adjustedTime = (delayTime.getValue() * 2 > maxDelay) ? maxDelay : delayTime.getValue() * 2;
delayTime.next(adjustedTime); // video stream exited abnormally, likely due to being offline. wait longer until next attempt
}
};
/**
* The Observable.
* 1. Start with the URL of the video stream
* 2. delay by the time defined in delayTime
* 3. download, merging the download observable with the parent observable.
* 4. adjust the delayTime based on download output.
* 5. repeat the process indefinitely.
*/
const stream = of(url)
.pipe(
delay(delayTime.getValue()),
mergeMap(download),
tap(res => {
adjustTimer(res);
}),
repeat()
)
stream.subscribe(val => {
console.log(
`download result:${val}, delayTime:${delayTime.getValue()}`
);
});
};
downloader("https://example.com/files/video.mp4");
我遇到的问题是 {BehaviorSubject} delayTime 在我的循环的每次迭代中都没有得到更新。 delayTime is 正在更新,如在订阅者的回调中调用 delayTime.getValue() 所示,但更改对 observable/subscriber(?) 的内存(?) 没有影响.
相反,我看到可观察对象的范围(?)中的 delayTime 保持不变,就像它第一次订阅时一样。在 observable 的世界中,没有像我希望的那样更新 BehaviorSubject 的值。
这就是我卡住的地方。如何重构我的代码以具有随时间变化的延迟计时器,并影响到下一次下载尝试的延迟?
【问题讨论】:
标签: javascript loops rxjs observable reactive-programming