【问题标题】:How to break an "for await ...of" loop, if loop do not complete within a given time?如果循环未在给定时间内完成,如何打破“for await ...of”循环?
【发布时间】:2021-12-01 01:55:48
【问题描述】:

如果异步循环没有在预期的时间段内完成,是否有打破异步循环的技术? 我有这样的代码:

(async()=>{
  for await(let t of asynDataStreamOrGenerator){
    //some data processing
  }
  //some other code I need to run based on whatever data collected by
  //asyncDataStreamOrGenerators within given time period
})()

如果此循环未在某个时间跨度内完成,则跳出循环并进一步处理请求。

【问题讨论】:

  • 看来这应该是asyncDataStreamOrGenerator对我的责任...
  • 每次迭代都检查一个标志有什么问题? (不会破坏当前的等待,但它真的很重要吗?)
  • const startTime = Date.now(); for(....) { if (Date.now() - startTime > XXXXXX) break;
  • 将它包装在您可以控制的东西中,您可以取消它。 for await ... of 确实适用于异步生成器,因此如果超时失效,您应该有机会不再生成(使用@T.J.Crowder 提出的技术)。
  • 什么是异步代码?通话是如何进行的?更好的答案取决于我在第一次发表评论后要求的确切细节。

标签: javascript ecmascript-6 es6-promise


【解决方案1】:

您可以使用超时Promise(代码中的timer) 并在每次迭代中使用Promise.race


下面的代码最多可以打印 30 个左右,而生成器可以生成更多。

async function wait(ms) {
  return new Promise(r=>setTimeout(r, ms))
}

async function* asynDataStreamOrGenerator() {
  for (let i = 0; i < 100; ++i) {
    await wait(30)
    yield i;
  }
}

async function* iterate_until(generator, timeout) {
  let timer = wait(timeout).then(_=>Promise.reject("TimeOut"))
  for (;;) {
    let it = generator.next()
    let result = await Promise.race([timer, it])
    if (result.done) break;
    yield result.value;
  }
}

{(async () => {
  try {
    for await (let t of iterate_until(asynDataStreamOrGenerator(), 1000)) {
      console.log(t)
    }
  } catch (e) { /* catch timeout, rethrow if needed*/ }
})()}

JSFiddle Link

【讨论】:

  • @T.J.Crowder 是的,永远不要认为我不能遍历异步生成器并获得承诺......
  • @T.J.Crowder 更新了异步生成器。谢谢指出:)
  • jsfiddle 链接是因为最近 SO sn-p 对我来说很奇怪
  • 不错的一个!这与我在 Heretic Monkey 的评论启发下发布的社区 wiki 答案非常相似。
【解决方案2】:

(另请参阅community wiki answer I posted 的替代方法。)

在你说过的评论中:

我正在设计一种共识算法,其中每个来源都需要在给定的时间范围内发送响应。如果其中一些参与者死了!我的意思是他们不发送值,循环将永远保持!

对我来说这听起来像是超时。实现超时的常用方法是通过Promise.race 并带有一个围绕计时器机制(setTimeout 或类似)的承诺。 Promise.race 观察你传递给它的承诺,并在它们中的任何一个完成后立即结算(传递该履行或拒绝),而不管其他任何人后来如何解决。

为此,您需要以另一种方式循环而不是for-await-of,并直接而不是间接地使用结果对象的承诺。假设你有一个实用函数:

const delay = (ms, value) => new Promise(resolve => {
    setTimeout(resolve, ms, value);
});

这会返回一个承诺,它会在 X 毫秒后使用您提供的任何值(如果有)来履行。

然后:

(async () => {
    const TIMEOUT = 500; // Milliseconds
    const GOT_TIMEOUT = {};
    const it = asynDataStreamOrGenerator[Symbol.asyncIterator]();
    try {
        while (true) {
            const p = it.next();
            const result = await Promise.race([p, delay(TIMEOUT, GOT_TIMEOUT)]);
            if (result === GOT_TIMEOUT) {
                // Didn't get a response in time
                console.log("Timeout");
            } else {
                // Got a response
                if (result.done) {
                    // Iteration complete
                    console.log("Iteration complete");
                    break;
                }
                // ...some data processing on `result.value`...
                console.log(`Process ${result.value}`);
            }
        }
    } finally {
        try {
            it.return?.(); // Close the iterator if it needs closing
        } catch { }
    }
})();

使用随机持续时间进行异步迭代器工作的实时示例,但在第三次迭代时强制超时:

const delay = (ms, value) => new Promise(resolve => {
    setTimeout(resolve, ms, value);
});

async function* example() {
    for (let i = 1; i <= 6; ++i) {
        const ms = i === 3 ? 600 : Math.floor(Math.random() * 100);
        await delay(ms);
        yield i;
    }
}

(async () => {
    const asynDataStreamOrGenerator = example();
    const TIMEOUT = 500; // Milliseconds
    const GOT_TIMEOUT = {};
    const it = asynDataStreamOrGenerator[Symbol.asyncIterator]();
    try {
        while (true) {
            const p = it.next();
            const start = Date.now();
            const result = await Promise.race([p, delay(TIMEOUT, GOT_TIMEOUT)]);
            const elapsed = Date.now() - start;
            if (result === GOT_TIMEOUT) {
                // Didn't get a response in time
                console.log(`Got timeout in ${elapsed}ms`);
            } else {
                // Got a response
                if (result.done) {
                    // Iteration complete
                    console.log(`Got iteration complete result in ${elapsed}ms`);
                    break;
                }
                // ...some data processing on `result.value`...
                console.log(`Got result ${result.value} to process in ${elapsed}ms`);
            }
        }
    } finally {
        try {
            it.return?.(); // Close the iterator if it needs closing
        } catch { }
    }
})();
.as-console-wrapper {
    max-height: 100% !important;
}

这是第一次迭代超时的示例,因为您似乎担心这种情况:

const delay = (ms, value) => new Promise(resolve => {
    setTimeout(resolve, ms, value);
});

async function* example() {
    for (let i = 1; i <= 6; ++i) {
        const ms = i === 1 ? 600 : Math.floor(Math.random() * 100);
        await delay(ms);
        yield i;
    }
}

(async () => {
    const asynDataStreamOrGenerator = example();
    const TIMEOUT = 500; // Milliseconds
    const GOT_TIMEOUT = {};
    const it = asynDataStreamOrGenerator[Symbol.asyncIterator]();
    try {
        while (true) {
            const p = it.next();
            const start = Date.now();
            const result = await Promise.race([p, delay(TIMEOUT, GOT_TIMEOUT)]);
            const elapsed = Date.now() - start;
            if (result === GOT_TIMEOUT) {
                // Didn't get a response in time
                console.log(`Got timeout in ${elapsed}ms`);
            } else {
                // Got a response
                if (result.done) {
                    // Iteration complete
                    console.log(`Got iteration complete result in ${elapsed}ms`);
                    break;
                }
                // ...some data processing on `result.value`...
                console.log(`Got result ${result.value} to process in ${elapsed}ms`);
            }
        }
    } finally {
        try {
            it.return?.(); // Close the iterator if it needs closing
        } catch { }
    }
})();
.as-console-wrapper {
    max-height: 100% !important;
}

如果您不希望处理阻止下一个值的收集,您不能 await 进行您所做的处理(也许建立一个承诺数组以完成该处理并 Promise.all 他们最后)。

或者如果您想退出整个操作:

(async () => {
    const TIMEOUT = 500; // Milliseconds
    const GOT_TIMEOUT = {};
    const results = [];
    const it = asynDataStreamOrGenerator[Symbol.asyncIterator]();
    try {
        while (true) {
            const p = it.next();
            const result = await Promise.race([p, delay(TIMEOUT, GOT_TIMEOUT)]);
            if (result === GOT_TIMEOUT) {
                // Didn't get a response in time, bail
                console.log("Timeout");
                break;
            }
            // Got a response
            if (result.done) {
                // Iteration complete
                console.log("Iteration complete");
                break;
            }
            console.log(`Got ${result.value}`);
            results.push(result.value);
        }
    } finally {
        try {
            it.return?.();
        } catch { }
    }
    // ...code here to process the contents of `results`...
    for (const value of results) {
        console.log(`Process ${value}`);
    }
})();

现场示例:

const delay = (ms, value) => new Promise(resolve => {
    setTimeout(resolve, ms, value);
});

async function* example() {
    for (let i = 1; i <= 6; ++i) {
        const ms = i === 3 ? 600 : Math.floor(Math.random() * 100);
        await delay(ms);
        yield i;
    }
}

(async () => {
    const asynDataStreamOrGenerator = example(); // For the example
    const TIMEOUT = 500; // Milliseconds
    const GOT_TIMEOUT = {};
    const results = [];
    const it = asynDataStreamOrGenerator[Symbol.asyncIterator]();
    try {
        while (true) {
            const p = it.next();
            const start = Date.now();
            const result = await Promise.race([p, delay(TIMEOUT, GOT_TIMEOUT)]);
            const elapsed = Date.now() - start;
            if (result === GOT_TIMEOUT) {
                // Didn't get a response in time, bail
                console.log(`Got timeout after ${elapsed}ms`);
                break;
            }
            // Got a response
            if (result.done) {
                // Iteration complete
                console.log(`Got iteration complete after ${elapsed}ms`);
                break;
            }
            console.log(`Got value ${result.value} after ${elapsed}ms`);
            results.push(result.value);
        }
    } finally {
        try {
            it.return?.();
        } catch { }
    }
    // ...code here to process the contents of `results`...
    for (const value of results) {
        console.log(`Process ${value}`);
    }
})();
.as-console-wrapper {
    max-height: 100% !important;
}

第一次超时,但不是每次都超时(因为第一次超时,我们看不到后续超时):

const delay = (ms, value) => new Promise(resolve => {
    setTimeout(resolve, ms, value);
});

async function* example() {
    for (let i = 1; i <= 6; ++i) {
        const ms = i === 1 ? 600 : Math.floor(Math.random() * 100);
        await delay(ms);
        yield i;
    }
}

(async () => {
    const asynDataStreamOrGenerator = example(); // For the example
    const TIMEOUT = 500; // Milliseconds
    const GOT_TIMEOUT = {};
    const results = [];
    const it = asynDataStreamOrGenerator[Symbol.asyncIterator]();
    try {
        while (true) {
            const p = it.next();
            const start = Date.now();
            const result = await Promise.race([p, delay(TIMEOUT, GOT_TIMEOUT)]);
            const elapsed = Date.now() - start;
            if (result === GOT_TIMEOUT) {
                // Didn't get a response in time, bail
                console.log(`Got timeout after ${elapsed}ms`);
                break;
            }
            // Got a response
            if (result.done) {
                // Iteration complete
                console.log(`Got iteration complete after ${elapsed}ms`);
                break;
            }
            console.log(`Got value ${result.value} after ${elapsed}ms`);
            results.push(result.value);
        }
    } finally {
        try {
            it.return?.();
        } catch { }
    }
    // ...code here to process the contents of `results`...
    for (const value of results) {
        console.log(`Process ${value}`);
    }
})();
.as-console-wrapper {
    max-height: 100% !important;
}

或两者的某种组合。您需要根据自己的实际工作进行调整,但这是一个似乎合理的方向。


以上所有:

  • 如果您的环境尚不支持可选链接,请将 it.return?.(); 替换为 if (it.return) { it.return(); }
  • 如果您的环境尚不支持可选的catch 绑定,请将catch { } 替换为catch (e) { }

【讨论】:

  • 我相信 OP 希望在整个操作(而不是单个)上超时
  • @appleapple - 我无法确定,但对我来说,“延迟”听起来不像“取消”或救助。如果是这样,这是一个小调整。我已经添加了这个调整。
  • @appleapple。确切地。上面共享的代码假设我们一定会按时收到价值。异步生成器可以在任何间隔发送值。如果给定的时间已经过去,我想打破循环,即使我同时没有收到一个值。上述解决方案的超时代码 n 只会在我们接收到值时触发,然后它会计算时间是否已过。但问题是“异步循环不应超过 X 时间,即使我们没有收到单个值”。
  • @AnuragVohra - “上面共享的代码假定我们一定会按时收到价值。” 不,它没有。这就是Promise.race 的全部意义所在。 (注意在for-of 上没有await;我们从异步迭代器中得到了承诺,我们不会在调用Promise.race 之前等待它解决。)如果你想,我已经更新了答案以显示调整打破(不是“延迟”)循环。
  • @T.J.Crowder 这个for (const p of asynDataStreamOrGenerator) 代码即使我们还没有收到一个值,也会迭代一次吗?我不是专家,只是问。我的假设是它只有在收到一些数据后才会创建一个 Promise。
【解决方案3】:

在问题的 cmets 中,Heretic Monkey suggested 将异步迭代包装在另一个实现超时的迭代中。这是一个非常好的主意,因为使用该包装器的代码可以使用for-await-of

如果你想在超时后继续,它看起来像这样:

async function* timeoutWrapper(asyncIterable, timeoutDuration, timeoutValue) {
    const it = asyncIterable[Symbol.asyncIterator]();
    try {
        while (true) {
            const result = await Promise.race([
                it.next(),
                delay(timeoutDuration, timeoutValue)
            ]);
            if (result === timeoutValue) {
                yield timeoutValue;
            } else if (result.done) {
                break;
            } else {
                yield result.value;
            }
        }
    } finally {
        it.return?.();
    }
}

使用它:

for await (const t of timeoutWrapper(asynDataStreamOrGenerator, TIMEOUT, GOT_TIMEOUT)) {
    if (t === GOT_TIMEOUT) {
        // Didn't get a response in time
        console.log("Timeout");
    } else {
        // Got a response
        // ...some data processing on `result.value`...
        console.log(`Process ${t}`);
    }
}
console.log("Done");

现场示例:

const delay = (ms, value) => new Promise(resolve => {
    setTimeout(resolve, ms, value);
});

async function* example() {
    for (let i = 1; i <= 6; ++i) {
        await delay(200 + i * 100);
        yield i;
    }
}

async function* timeoutWrapper(asyncIterable, timeoutDuration, timeoutValue) {
    const it = asyncIterable[Symbol.asyncIterator]();
    try {
        while (true) {
            const result = await Promise.race([
                it.next(),
                delay(timeoutDuration, timeoutValue)
            ]);
            if (result === timeoutValue) {
                yield timeoutValue;
            } else if (result.done) {
                break;
            } else {
                yield result.value;
            }
        }
    } finally {
        it.return?.();
    }
}

(async () => {
    const asynDataStreamOrGenerator = example();
    const TIMEOUT = 500; // Milliseconds
    const GOT_TIMEOUT = {};
    for await (const t of timeoutWrapper(asynDataStreamOrGenerator, TIMEOUT, GOT_TIMEOUT)) {
        if (t === GOT_TIMEOUT) {
            // Didn't get a response in time
            console.log("Timeout");
        } else {
            // Got a response
            // ...some data processing on `result.value`...
            console.log(`Process ${t}`);
        }
    }
    console.log("Done");
})();

如果您不想在超时后继续,您可以让它抛出错误(终止迭代):

async function* timeoutWrapper(asyncIterable, timeoutDuration) {
    const TIMEOUT_VALUE = {};
    const it = asyncIterable[Symbol.asyncIterator]();
    try {
        while (true) {
            const result = await Promise.race([
                it.next(),
                delay(timeoutDuration, TIMEOUT_VALUE)
            ]);
            if (result === TIMEOUT_VALUE) {
                throw new Error(`Timeout after ${timeoutDuration}ms`);
            } else if (result.done) {
                break;
            } else {
                yield result.value;
            }
        }
    } finally {
        it.return?.();
    }
}

使用它:

try {
    for await (const t of timeoutWrapper(asynDataStreamOrGenerator, TIMEOUT)) {
        // Got a response
        console.log(`Process ${t}`);
    }
    console.log("Done");
} catch (e) {
    console.error(e.message);
}

现场示例:

const delay = (ms, value) => new Promise(resolve => {
    setTimeout(resolve, ms, value);
});

async function* example() {
    for (let i = 1; i <= 6; ++i) {
        await delay(200 + i * 100);
        yield i;
    }
}

async function* timeoutWrapper(asyncIterable, timeoutDuration) {
    const TIMEOUT_VALUE = {};
    const it = asyncIterable[Symbol.asyncIterator]();
    try {
        while (true) {
            const result = await Promise.race([
                it.next(),
                delay(timeoutDuration, TIMEOUT_VALUE)
            ]);
            if (result === TIMEOUT_VALUE) {
                throw new Error(`Timeout after ${timeoutDuration}ms`);
            } else if (result.done) {
                break;
            } else {
                yield result.value;
            }
        }
    } finally {
        it.return?.();
    }
}

(async () => {
    const asynDataStreamOrGenerator = example();
    const TIMEOUT = 500; // Milliseconds
    try {
        for await (const t of timeoutWrapper(asynDataStreamOrGenerator, TIMEOUT)) {
            // Got a response
            console.log(`Process ${t}`);
        }
        console.log("Done");
    } catch (e) {
        console.error(e.message);
    }
})();

【讨论】:

  • A let start = Date.now(); before const asynDataStreamOrGenerator = example();console.log("Duration: ",Date.now()-start); after catch 块将显示此异步内部工作时间超过 500 毫秒(在我的系统上约为 1700 毫秒)
  • @AnuragVohra - 上面故意延迟显示正在发生的事情。这没有显着的额外开销。 “异步内部工作超过 500 毫秒(在我的系统上约为 1700 毫秒)” 您在查看总时间吗?仔细阅读代码。上面应用的超时是每次迭代(因为这似乎是您想要的)。如果您想要其他东西,只需稍微修改代码以执行其他操作。您需要的所有技术和信息都已经存在。
猜你喜欢
  • 1970-01-01
  • 2012-12-07
  • 2019-10-11
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-02-19
  • 1970-01-01
  • 2021-03-11
相关资源
最近更新 更多