【发布时间】:2019-07-19 23:06:01
【问题描述】:
是否可以通过 API Gateway 端点调用 AWS Step 函数并监听响应(直到工作流完成并从结束步骤返回结果)?
目前,我能够从文档中发现步进函数本质上是异步的,并且最后有一个最终回调。我需要 API 调用响应从 step 函数流中获取最终结果而无需轮询。
【问题讨论】:
标签: amazon-web-services aws-lambda aws-step-functions
是否可以通过 API Gateway 端点调用 AWS Step 函数并监听响应(直到工作流完成并从结束步骤返回结果)?
目前,我能够从文档中发现步进函数本质上是异步的,并且最后有一个最终回调。我需要 API 调用响应从 step 函数流中获取最终结果而无需轮询。
【问题讨论】:
标签: amazon-web-services aws-lambda aws-step-functions
我想这是不可能的。
它是异步的,还有API Gateway Timeout
您不需要通过轮询来获取结果,您可以结合 Lambda、Step Functions、SNS 和 Websockets 来实时获取结果。
如果您想向客户端(Web 浏览器)推送通知,并且不想管理自己的基础架构(扩展套接字服务器等),则可以使用 AWS IOT。本教程可能会帮助您入门:
如果您只需要将结果发送到后端(例如 Web 服务端点),SNS 应该没问题。
【讨论】:
这可能会起作用:创建一个 HTTP“网关”服务器,将请求分派到您的 Steps 工作流,然后保留请求对象,直到它收到允许它发送响应的通知。
网关服务器需要将关联 ID 添加到有效负载,步骤工作流需要执行此操作。
接收通知的一种可能的方式是使用 SQS。
一些模糊的 Node/Express 风格的伪代码:
const cache = new Cache(); // pick your favourite cache library
const gatewayId = guid(); // this lets us scale horizontally
const subscription = subscribeToQueue({
filter: { gatewayId },
topic: topicName,
});
httpServer.post( (req, res) => {
const correlationId = guid();
cache.add(correlationId, res);
submitToStepWorkflow(gatewayId, correlationId, req);
});
subscription.onNewMessage( message => {
const req = cache.pop(message.attributes.correlationId);
req.send(extractResponse(message));
req.end();
});
(这里假设的队列读取 API 与 aws-sdk 的 SQS API 完全不同,但你懂的)
因此,在您的步骤工作流结束时,您只需向 SQS(可能通过 SNS)发布一条消息,以确保保留 correlationId 和 gatewayId。
为了处理失败,并避免缓存被孤立的请求对象填充,您可能希望在缓存上设置过期时间,并处理过期事件:
cache.onExpiry( (key, req) => {
req.status(502);
req.send(gatewayTimeoutMessage());
req.end();
}
当然,整个方法只适用于您希望在适合浏览器和代理超时的时间内正常完成的工作流。
【讨论】: