(另请参阅community wiki answer I posted 的替代方法。)
在你说过的评论中:
我正在设计一种共识算法,其中每个来源都需要在给定的时间范围内发送响应。如果其中一些参与者死了!我的意思是他们不发送值,循环将永远保持!
对我来说这听起来像是超时。实现超时的常用方法是通过Promise.race 并带有一个围绕计时器机制(setTimeout 或类似)的承诺。 Promise.race 观察你传递给它的承诺,并在它们中的任何一个完成后立即结算(传递该履行或拒绝),而不管其他任何人后来如何解决。
为此,您需要以另一种方式循环而不是for-await-of,并直接而不是间接地使用结果对象的承诺。假设你有一个实用函数:
const delay = (ms, value) => new Promise(resolve => {
setTimeout(resolve, ms, value);
});
这会返回一个承诺,它会在 X 毫秒后使用您提供的任何值(如果有)来履行。
然后:
(async () => {
const TIMEOUT = 500; // Milliseconds
const GOT_TIMEOUT = {};
const it = asynDataStreamOrGenerator[Symbol.asyncIterator]();
try {
while (true) {
const p = it.next();
const result = await Promise.race([p, delay(TIMEOUT, GOT_TIMEOUT)]);
if (result === GOT_TIMEOUT) {
// Didn't get a response in time
console.log("Timeout");
} else {
// Got a response
if (result.done) {
// Iteration complete
console.log("Iteration complete");
break;
}
// ...some data processing on `result.value`...
console.log(`Process ${result.value}`);
}
}
} finally {
try {
it.return?.(); // Close the iterator if it needs closing
} catch { }
}
})();
使用随机持续时间进行异步迭代器工作的实时示例,但在第三次迭代时强制超时:
const delay = (ms, value) => new Promise(resolve => {
setTimeout(resolve, ms, value);
});
async function* example() {
for (let i = 1; i <= 6; ++i) {
const ms = i === 3 ? 600 : Math.floor(Math.random() * 100);
await delay(ms);
yield i;
}
}
(async () => {
const asynDataStreamOrGenerator = example();
const TIMEOUT = 500; // Milliseconds
const GOT_TIMEOUT = {};
const it = asynDataStreamOrGenerator[Symbol.asyncIterator]();
try {
while (true) {
const p = it.next();
const start = Date.now();
const result = await Promise.race([p, delay(TIMEOUT, GOT_TIMEOUT)]);
const elapsed = Date.now() - start;
if (result === GOT_TIMEOUT) {
// Didn't get a response in time
console.log(`Got timeout in ${elapsed}ms`);
} else {
// Got a response
if (result.done) {
// Iteration complete
console.log(`Got iteration complete result in ${elapsed}ms`);
break;
}
// ...some data processing on `result.value`...
console.log(`Got result ${result.value} to process in ${elapsed}ms`);
}
}
} finally {
try {
it.return?.(); // Close the iterator if it needs closing
} catch { }
}
})();
.as-console-wrapper {
max-height: 100% !important;
}
这是第一次迭代超时的示例,因为您似乎担心这种情况:
const delay = (ms, value) => new Promise(resolve => {
setTimeout(resolve, ms, value);
});
async function* example() {
for (let i = 1; i <= 6; ++i) {
const ms = i === 1 ? 600 : Math.floor(Math.random() * 100);
await delay(ms);
yield i;
}
}
(async () => {
const asynDataStreamOrGenerator = example();
const TIMEOUT = 500; // Milliseconds
const GOT_TIMEOUT = {};
const it = asynDataStreamOrGenerator[Symbol.asyncIterator]();
try {
while (true) {
const p = it.next();
const start = Date.now();
const result = await Promise.race([p, delay(TIMEOUT, GOT_TIMEOUT)]);
const elapsed = Date.now() - start;
if (result === GOT_TIMEOUT) {
// Didn't get a response in time
console.log(`Got timeout in ${elapsed}ms`);
} else {
// Got a response
if (result.done) {
// Iteration complete
console.log(`Got iteration complete result in ${elapsed}ms`);
break;
}
// ...some data processing on `result.value`...
console.log(`Got result ${result.value} to process in ${elapsed}ms`);
}
}
} finally {
try {
it.return?.(); // Close the iterator if it needs closing
} catch { }
}
})();
.as-console-wrapper {
max-height: 100% !important;
}
如果您不希望处理阻止下一个值的收集,您不能 await 进行您所做的处理(也许建立一个承诺数组以完成该处理并 Promise.all 他们最后)。
或者如果您想退出整个操作:
(async () => {
const TIMEOUT = 500; // Milliseconds
const GOT_TIMEOUT = {};
const results = [];
const it = asynDataStreamOrGenerator[Symbol.asyncIterator]();
try {
while (true) {
const p = it.next();
const result = await Promise.race([p, delay(TIMEOUT, GOT_TIMEOUT)]);
if (result === GOT_TIMEOUT) {
// Didn't get a response in time, bail
console.log("Timeout");
break;
}
// Got a response
if (result.done) {
// Iteration complete
console.log("Iteration complete");
break;
}
console.log(`Got ${result.value}`);
results.push(result.value);
}
} finally {
try {
it.return?.();
} catch { }
}
// ...code here to process the contents of `results`...
for (const value of results) {
console.log(`Process ${value}`);
}
})();
现场示例:
const delay = (ms, value) => new Promise(resolve => {
setTimeout(resolve, ms, value);
});
async function* example() {
for (let i = 1; i <= 6; ++i) {
const ms = i === 3 ? 600 : Math.floor(Math.random() * 100);
await delay(ms);
yield i;
}
}
(async () => {
const asynDataStreamOrGenerator = example(); // For the example
const TIMEOUT = 500; // Milliseconds
const GOT_TIMEOUT = {};
const results = [];
const it = asynDataStreamOrGenerator[Symbol.asyncIterator]();
try {
while (true) {
const p = it.next();
const start = Date.now();
const result = await Promise.race([p, delay(TIMEOUT, GOT_TIMEOUT)]);
const elapsed = Date.now() - start;
if (result === GOT_TIMEOUT) {
// Didn't get a response in time, bail
console.log(`Got timeout after ${elapsed}ms`);
break;
}
// Got a response
if (result.done) {
// Iteration complete
console.log(`Got iteration complete after ${elapsed}ms`);
break;
}
console.log(`Got value ${result.value} after ${elapsed}ms`);
results.push(result.value);
}
} finally {
try {
it.return?.();
} catch { }
}
// ...code here to process the contents of `results`...
for (const value of results) {
console.log(`Process ${value}`);
}
})();
.as-console-wrapper {
max-height: 100% !important;
}
第一次超时,但不是每次都超时(因为第一次超时,我们看不到后续超时):
const delay = (ms, value) => new Promise(resolve => {
setTimeout(resolve, ms, value);
});
async function* example() {
for (let i = 1; i <= 6; ++i) {
const ms = i === 1 ? 600 : Math.floor(Math.random() * 100);
await delay(ms);
yield i;
}
}
(async () => {
const asynDataStreamOrGenerator = example(); // For the example
const TIMEOUT = 500; // Milliseconds
const GOT_TIMEOUT = {};
const results = [];
const it = asynDataStreamOrGenerator[Symbol.asyncIterator]();
try {
while (true) {
const p = it.next();
const start = Date.now();
const result = await Promise.race([p, delay(TIMEOUT, GOT_TIMEOUT)]);
const elapsed = Date.now() - start;
if (result === GOT_TIMEOUT) {
// Didn't get a response in time, bail
console.log(`Got timeout after ${elapsed}ms`);
break;
}
// Got a response
if (result.done) {
// Iteration complete
console.log(`Got iteration complete after ${elapsed}ms`);
break;
}
console.log(`Got value ${result.value} after ${elapsed}ms`);
results.push(result.value);
}
} finally {
try {
it.return?.();
} catch { }
}
// ...code here to process the contents of `results`...
for (const value of results) {
console.log(`Process ${value}`);
}
})();
.as-console-wrapper {
max-height: 100% !important;
}
或两者的某种组合。您需要根据自己的实际工作进行调整,但这是一个似乎合理的方向。
以上所有:
- 如果您的环境尚不支持可选链接,请将
it.return?.(); 替换为 if (it.return) { it.return(); }。
- 如果您的环境尚不支持可选的
catch 绑定,请将catch { } 替换为catch (e) { }。