【发布时间】:2018-12-14 23:10:38
【问题描述】:
这可能是一种特殊情况:
我想从队列 (AWS SQS) 中读取数据,这是通过调用等待几秒钟的消息然后解析来完成的 - 只要您想处理它,就可以在循环中一次又一次地调用队列(它每次检查一个标志)。
这意味着我有一个 consume 函数,只要应用程序处于活动状态或队列未标记,它就会运行。
我还有一个用于订阅队列的subscribe 函数——它应该在知道消费者能够连接到队列后立即解决。即使这个函数调用消费者,消费者一直在运行,直到队列被取消标记才会返回。
这给了我一些挑战——你有什么技巧可以用现代 JS 和 async/await 承诺来解决这个问题吗?我记住这段代码是在 React Web 应用程序中运行的,而不是在 node.js 中。
我基本上只是希望await subscribe(QUEUE) 调用(来自 GUI)在确定它可以从该队列中读取后立即解决。但如果不能,我希望它抛出一个错误,该错误会传播到订阅调用的来源——这意味着我必须await consume(QUEUE),对吗?
更新: 添加了一些未经测试的草稿代码(如果我没有采取正确的方法,我不想花更多时间让它工作) - 我考虑向消费函数发送成功和失败回调,以便它可以报告成功一旦它从队列中获得第一个有效(但可能是空的)响应,这使得它将队列 url 存储为订阅 - 如果队列轮询失败,则取消订阅。
由于我设置了几个队列消费者,它们不应该阻塞任何东西,而只是在后台工作
let subscribedQueueURLs = []
async function consumeQueue(
url: QueueURL,
success: () => mixed,
failure: (error: Error) => mixed
) {
const sqs = new AWS.SQS()
const params = {
QueueUrl: url,
WaitTimeSeconds: 20,
}
try {
do {
// eslint-disable-next-line no-await-in-loop
const receivedData = await sqs.receiveMessage(params).promise()
if (!subscribedQueueURLs.includes(url)) {
success()
}
// eslint-disable-next-line no-restricted-syntax
for (const message of receivedData.Messages) {
console.log({ message })
// eslint-disable-next-line no-await-in-loop
eventHandler && (await eventHandler.message(message, url))
const deleteParams = {
QueueUrl: url,
ReceiptHandle: message.ReceiptHandle,
}
// eslint-disable-next-line no-await-in-loop
const deleteResult = await sqs.deleteMessage(deleteParams).promise()
console.log({ deleteResult })
}
} while (subscribedQueueURLs.includes(url))
} catch (error) {
failure(error)
}
}
export const subscribe = async (entityType: EntityType, entityId: EntityId) => {
const url = generateQueueURL(entityType, entityId)
consumeQueue(
url,
() => {
subscribedQueueURLs.push(url)
eventHandler && eventHandler.subscribe(url)
},
error => {
console.error(error)
unsubscribe(entityType, entityId)
}
)
}
【问题讨论】:
-
处理它的“现代”方法是使用异步迭代器 -
yield任务,因为它们来自队列并使用for await表达式来使用它们。也就是说 - 它们不是唯一正确或必然是最好的方法 - 存在权衡。 -
您可能需要异步设计,而不是轮询设计。例如,编写一个
readNextMessage()异步操作,立即返回一个promise(无需等待),然后在消息到达时解析promise。在处理了已解决的承诺后,调用者可以再次调用readNextMessage()。或者将整个设计更改为事件驱动设计,您只需在消息到达时触发一个事件(可以有一个或多个侦听器)。 -
@jfr 我已经将消息发送到事件处理程序,该事件处理程序对它们进行解码并相应地更新应用程序状态。 do - while 我等待 sqs.receiveMessage(params).promise() 和我将其包装在 readNextMessage() 之间有什么区别?
-
太抽象了,我无法在没有看到代码的情况下进一步评论。我必须看到实际的代码。你听起来好像你有一个投票设计,这在 node.js 中通常不是一个受欢迎的设计模式。我建议您停止轮询,并在消息可用时使用异步通知 - 无需轮询。
-
@jfriend00 我现在添加了一些代码。
标签: javascript promise amazon-sqs ecmascript-2017