【问题标题】:Multiple parallel AWS Lambda invocations多个并行 AWS Lambda 调用
【发布时间】:2019-02-26 07:26:13
【问题描述】:

我正在尝试使用 python 3.7.2 和 aiobotocore 包执行多个 AWS Lambda 调用。这是我的代码。

import asyncio
import aiobotocore


async def invoke(payload, session):
    async with session.create_client('lambda', region_name='us-east-1') as client:
        return await client.invoke(FunctionName='MY_FUNCTION', Payload=payload)


def generate_invocations(payloads, session):
    for payload in payloads:
        yield invoke(payload, session)


def invoke_all(payloads):
    loop = asyncio.get_event_loop()

    async def wrapped():
        session = aiobotocore.get_session(loop=loop)
        invocations = generate_invocations(payloads, session)
        return await asyncio.gather(*invocations)

    return loop.run_until_complete(wrapped())


def main():
    payloads_list = []  # MY PAYLOADS LIST 
    lambda_responses = invoke_all(payloads_list)
    print(lambda_responses)


if __name__ == '__main__':
    main()

代码运行得非常快(使用 boto3 lambda 客户端调用,10 个有效负载大约需要 1 秒而不是 15 个),但我有两个问题:

1) lambda_responses 中的元素包括 'Payload' 键,其值为 aiobotocore.response.StreamingBody 类型。当我尝试 value.read() 时,我收到“coroutine object StreamingBody.read”,我认为我的代码中有一些问题。我可以通过“json.loads(json.loads(r['Payload']._buffer.pop())['body'])”接收到所需的响应,但是获取它的正确方法是什么。

2) 在极少数情况下,响应之一中的“有效负载”具有空缓冲区。如何确保 invoke_all 函数返回非空响应? aiobotocore的用法正确吗?

我是 python 3 和异步功能的新手。灵感来自 aiobotocore 文档和 Mathew Marcus blog 的示例。

谢谢!

【问题讨论】:

    标签: amazon-web-services aws-lambda python-asyncio python-3.7


    【解决方案1】:
    1. lambda_responses 中的元素包括“Payload”键,其值为 aiobotocore.response.StreamingBody 类型。当我尝试 value.read() 时,我收到“coroutine object StreamingBody.read”

    这意味着 read() 协程应该被等待,您应该在事件循环中执行此操作。例如,您可以更改 invoke 协程来读取响应:

    async def invoke(payload, session):
        async with session.create_client('lambda', region_name='us-east-1') as client:
            resp = await client.invoke(FunctionName='MY_FUNCTION', Payload=payload)
            payload = await resp['Payload'].read()
            return payload  # or assemble a dict with relevant parts
    
    1. 在极少数情况下,响应之一中的“有效负载”具有空缓冲区。

    这可能是因为您在实际读取内容之前访问了缓冲区。在某些情况下,信息很快就会到达,无论如何您都可以在内部缓冲区中找到它,但有时您必须等待它。使用 read() 等公共方法可确保您正确使用 API。另一方面,_buffer 属性以下划线开头,表示它是一个实现细节,不能直接访问。

    【讨论】:

    • 谢谢!效果很好。
    猜你喜欢
    • 2019-07-26
    • 2023-03-17
    • 2016-01-29
    • 2017-07-01
    • 2018-09-30
    • 1970-01-01
    • 2022-01-27
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多