【问题标题】:Using Python Multiprocessing Queue Inside AWS Lambda Function在 AWS Lambda 函数中使用 Python 多处理队列
【发布时间】:2020-04-25 12:39:46
【问题描述】:

我有一些 python 可以创建多个进程来更快地完成任务。当我创建这些进程时,我会进入一个队列。在我使用 queue.put(data) 的进程内部,我可以在进程之外检索数据。它在我的本地机器上运行良好,但是当我将 zip 上传到 AWS Lambda 函数(Python 3.8)时,它说 Queue() 函数尚未实现。当我简单地取出队列时,该项目在 AWS Lambda 中运行良好功能,所以我知道这是我目前唯一的挂机。

我确保使用“pip install multiprocess -t ./”和“pip install boto3 -t ./”将多处理包直接安装到我的python项目中。

我特别是 python 和 AWS 的新手,但我最近遇到的研究可能将我们指向 SQS。

阅读这些 SQS docs 我不确定这是否正是我想要的。

这是我在本地运行但不能在 AWS 上运行的 Lambda 中运行的代码。重要部分见*:

from multiprocessing import Process, Queue
from craigslist import CraigslistForSale
import time
import math

sitesHold = ["sfbay", "seattle", "newyork", "(many more)..." ]

results = []


def f(sites, category, search_keys, queue):
    local_results = []
    for site in sites:
        cl_fs = CraigslistForSale(site=site, category=category, filters={'query': search_keys})
        for result in cl_fs.get_results(sort_by='newest'):
            local_results.append(result)
    if len(local_results) > 0:
        print(local_results)
    queue.put(local_results) # Putting data *********************************


def scan_handler(event, context):
    started_at = time.monotonic()
    queue = Queue()
    print("Running...")
    amount_of_lists = int(event['amountOfLists'])
    list_length = int(len(sitesHold) / amount_of_lists)
    extra_lists = math.ceil((len(sitesHold) - (amount_of_lists * list_length)) / list_length)
    site_list = []
    list_creator_counter = 0
    site_counter = 0
    for i in range(amount_of_lists + extra_lists):
        site_list.append(sitesHold[list_creator_counter:list_creator_counter + list_length])
        list_creator_counter += list_length
    processes = []
    for i in range(len(site_list)):
        site_counter = site_counter + len(site_list[i])
        processes.append(Process(target=f, args=(site_list[i], event['category'], event['searchQuery'], queue,))) # Creating processes and creating queues ***************************

    for process in processes:
        process.start() # Starting processes ***********************

    for process in processes:
        listings = queue.get() # Getting from queue ****************************
        if len(listings) > 0:
            for listing in listings:
                results.append(listing)

    print(f"Results: {results}")

    for process in processes:
        process.join()

    total_time_took = time.monotonic() - started_at
    print(f"Sites processed: {site_counter}")
    print(f'Took {total_time_took} seconds long')

这是 Lambda 函数给我的错误:

{
  "errorMessage": "[Errno 38] Function not implemented",
  "errorType": "OSError",
  "stackTrace": [
    "  File \"/var/task/main.py\", line 90, in scan_handler\n    queue = Queue()\n",
    "  File \"/var/lang/lib/python3.8/multiprocessing/context.py\", line 103, in Queue\n    return Queue(maxsize, ctx=self.get_context())\n",
    "  File \"/var/lang/lib/python3.8/multiprocessing/queues.py\", line 42, in __init__\n    self._rlock = ctx.Lock()\n",
    "  File \"/var/lang/lib/python3.8/multiprocessing/context.py\", line 68, in Lock\n    return Lock(ctx=self.get_context())\n",
    "  File \"/var/lang/lib/python3.8/multiprocessing/synchronize.py\", line 162, in __init__\n    SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)\n",
    "  File \"/var/lang/lib/python3.8/multiprocessing/synchronize.py\", line 57, in __init__\n    sl = self._semlock = _multiprocessing.SemLock(\n"
  ]
}

Queue() 是否在 AWS Lambda 中工作?实现我的目标的最佳方式是什么?

【问题讨论】:

    标签: python aws-lambda multiprocessing queue


    【解决方案1】:

    【讨论】:

    • 看来我要潜入 Pipes 了!
    • 我遇到了与multiprocessing.pool 相同的问题。令人难以置信的是,这不受 AWS 支持。
    【解决方案2】:

    来自 AWS docs

    如果您使用 Python 开发 Lambda 函数,则不会出现并行性 默认。 Lambda 支持 Python 2.7 和 Python 3.6,这两者 具有多处理和线程模块。

    Python 自带的多处理模块可以让你运行 多个进程并行。由于 Lambda 执行 没有/dev/shm(进程共享内存)支持的环境, 你不能使用 multiprocessing.Queue 或 multiprocessing.Pool。

    另一方面,您可以使用 multiprocessing.Pipe 代替 multiprocessing.Queue 来完成你需要的,而不需要任何东西 Lambda 函数执行过程中的错误。

    【讨论】:

      猜你喜欢
      • 2018-09-10
      • 1970-01-01
      • 1970-01-01
      • 2013-10-02
      • 2021-10-03
      • 1970-01-01
      • 2021-09-19
      • 2021-05-01
      • 2019-01-27
      相关资源
      最近更新 更多