【问题标题】:How do I invoke and wait for 1,000 AWS Lambdas running in parallel from Python?如何调用并等待从 Python 并行运行的 1,000 个 AWS Lambda?
【发布时间】:2021-07-27 00:40:50
【问题描述】:

当我使用第三方 aiobotocore 时,它​​可以达到 NUM_WORKERS=500,如果我想达到 1000,我会收到此错误:

    r, w, _ = self._select(self._readers, self._writers, [], timeout)
  File ".....\lib\selectors.py", line 314, in _select
    r, w, x = select.select(r, w, w, timeout)
ValueError: too many file descriptors in select()

如果有办法并行执行1000?

来源:


import os, sys, time, json
import asyncio
from itertools import chain
from typing import List
import logging
from functools import partial
from pprint import pprint 



# Third Party
import asyncpool
import aiobotocore.session
import aiobotocore.config

_NUM_WORKERS=500

async def execute_lambda( lambda_name: str, key: str, client):
    # Get json content from s3 object
    if 1:
        name=lambda_name
        response = await client.invoke(
            InvocationType='RequestResponse',
            FunctionName=name,
            LogType='Tail',
            Payload=json.dumps({
                'exec_id':key,
                })
            )
    out=[]
    async for event in response['Payload']:
        out.append(event.decode())

    #await asyncio.sleep(1)
    return out


async def submit(lambda_name: str) -> List[dict]:
    """
    Returns list of AWS Lambda outputs executed in parallel

    :param name: name of lambda function
    :return: list of lambda returns
    """
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger()

    session = aiobotocore.session.AioSession()
    config = aiobotocore.config.AioConfig(max_pool_connections=_NUM_WORKERS)
    contents = []
    #client = boto3.client('lambda', region_name='us-west-2')
    async with session.create_client('lambda', region_name='us-west-2', config=config) as client:
        worker_co = partial(execute_lambda, lambda_name)
        async with asyncpool.AsyncPool(None, _NUM_WORKERS, 'lambda_work_queue', logger, worker_co,
                                       return_futures=True, raise_on_join=True, log_every_n=10) as work_pool:
            for x in range(_NUM_WORKERS):
                contents.append(await work_pool.push(x, client))

    # retrieve results from futures
    contents = [c.result() for c in contents]
    return list(chain.from_iterable(contents))



def main(name, files):
    s = time.perf_counter()
    _loop = asyncio.get_event_loop()
    _result = _loop.run_until_complete(submit(name))
    pprint(_result)
    elapsed = time.perf_counter() - s
    print(f"{__file__} executed in {elapsed:0.2f} seconds.")

Lambda 函数:

import time
def lambda_handler(event, context):
    time.sleep(10)
    return {'code':0, 'exec_id':event['exec_id']}

结果:

 '{"code": 0, "exec_id": 0}',
 '{"code": 0, "exec_id": 1}',
 '{"code": 0, "exec_id": 2}',
 '{"code": 0, "exec_id": 3}',
...
 '{"code": 0, "exec_id": 496}',
 '{"code": 0, "exec_id": 497}',
 '{"code": 0, "exec_id": 498}',
 '{"code": 0, "exec_id": 499}']
my_cli_script.py executed in 14.56 seconds.

【问题讨论】:

  • 你的意思是异步的吗?
  • @Evert,不,我想在我的本地 cli 脚本中处理结果
  • 异步通常表示“并行”,同步表示“一个接一个”
  • @Evert 我必须是 Lambda 术语中的“同步” - 意味着我想等待 Lambda 结果与“异步”当我不这样做时。但是每个 exec 在 Python 术语中都必须是异步的。
  • 我编辑了你的问题标题以澄清一点。

标签: python-3.x amazon-web-services aws-lambda boto3 python-asyncio


【解决方案1】:

针对 cme​​ts here 中提出的问题,这是我用来并行启动 100 个 lambda 实例的代码:


import boto3
import json
from concurrent.futures import ThreadPoolExecutor

# AWS credentials are exported in my env variables
# so region and account-id are fetched from there
lambda_ = boto3.client('lambda')

def invoke_lambda(payload):
    payload = {'body': json.dumps(payload)}

    response = lambda_.invoke(
        FunctionName='my-func',
        # I need to receive a response back from lambda
        # so I use sync invocation
        InvocationType='RequestResponse',
        LogType='Tail',
        Payload=json.dumps(payload)
    )

    res_payload = response.get('Payload').read()
    body = json.loads(res_payload).get('body')
    
    return body


MAX_WORKERS = 100  # how many lambdas you want to spin up concurrently

with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    result = list(executor.map(invoke_lambda, data))
# data is a list of dicts, each element is a single "payload"

最后两句:

  1. 十几毫秒生成 100 个并发 lambda 可能有点夸张。出于某种原因,如果我在 cloudwatch 指标中设置更高的粒度,它不会绘制任何内容,所以我不能确定它到底花了多长时间。为了安全线程,我会说在 2 秒内。
  2. 到目前为止,这段代码只在我的本地环境中运行过。它非常普通,所以我不明白为什么它不能在其他地方工作(例如,另一个父 lambda),但作为警告,我还没有在线测试它。

【讨论】:

    【解决方案2】:

    找到这篇文章:python-asyncio-aiohttp-valueerror-too-many-file-descriptors-in-select-on-win

    更改后它开始工作

    # 1000 is a soft concurrency limit
    _NUM_WORKERS=990  
    
    def main(name, files):
        if sys.platform == 'win32':
            _loop = asyncio.ProactorEventLoop()
            asyncio.set_event_loop(_loop)
            _result = _loop.run_until_complete(submit(name))
        else:
            _loop = asyncio.get_event_loop()
            _result = _loop.run_until_complete(submit(name))
        process = psutil.Process(os.getpid())
        print(f"{__file__}: memory[{process.memory_info().rss/1024:7,.2f}], elapsed {elapsed:0.2f} sec")
    

    结果:

    ...
     '{"code": 0, "exec_id": 986}',
     '{"code": 0, "exec_id": 987}',
     '{"code": 0, "exec_id": 988}',
     '{"code": 0, "exec_id": 989}']
    my_cli_script.py: memory[201,064.00], elapsed 16.53 sec
    

    【讨论】:

    • 哟,我想知道,aiobotocore 是并行 lambda 调用所必需的吗? asyncio 不能单独处理这个吗?我实际上是在尝试完全按照您正在做的事情做,但是除了 AWS lambda 中已经内置的东西之外,没有使用外部包,但我似乎无法做到。我的处决仍然是连续的。
    • 工程师说这样使用 Lambda 是一个糟糕的设计。 EKS 是要走的路。至于您的问题-我不确定您为什么要限制-如果您在 lambda 中运行此代码-您总是可以将第三方的东西煮成一层。如果我在 EC2 上运行,我不明白为什么你不能使用第三方库
    • 没有具体原因,我目前正在为我的依赖项使用 ECR,并且我正在尝试将图像尺寸保持在尽可能小的范围内(因为我使用的是 pytorch,所以它已经很大了)。最终我设法使用concurrent.futures 实现了我想要的,所以不需要 aiobotocore。没用过EKS,怎么样?我选择 lambda 的原因是因为我喜欢易于使用无服务器架构(例如,使用 sls 框架)和按使用付费的定价。由于我正在部署的模型使用量非常少,因此我不想为持续运行的服务器付费。
    • 对于concurrent.futures,您无法使用单个 s3client 连接。
    • 对不起,s3 连接与此有什么关系?我设法使用 concurrent.futures 使用单个 client 对象 (client = boto3.client('lambda') 启动了数百个并发 lambda 实例。我真的不确定你的句子是什么意思。
    猜你喜欢
    • 1970-01-01
    • 2022-10-14
    • 1970-01-01
    • 2019-10-21
    • 2015-08-21
    • 2018-11-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多