【发布时间】:2019-05-31 18:38:35
【问题描述】:
我正在使用 Scrapy 库来抓取一个销售汽车的网站。
我正在使用 Python 和 IBM Cloud 函数以及 Scrapy 来实现这一目标。这个想法是每天使用 IBM Cloud 操作来抓取站点,并将每辆车添加到 Postgres 数据库中的 vehicles 表中。那部分工作正常。
vehicles表的结构如下:
第一步是将除数据列(即汽车的详细信息,需要在第二步中添加的那些)之外的所有内容添加到vehicles 表中。效果很好。
第二步每天检查vehicles表中添加的每辆汽车是否仍然存在于网站上(可以删除或出售)。在这一步中,我将每个循环车辆添加到 daily_run_vehicle 表中。 daily_run_vehicle的结构如下:
如果车辆存在,我会抓取详细信息并更新vehicles 表data 列并将handled 列设置为daily_run_vehicle 表中的TRUE。如果它被出售或删除,那么我在daily_run_vehicle 表中增加retries 列。
第二步应该每天运行。
首先,我遍历来自vehicles 的每辆车,其中daily_run_vehicle 表中的handled 列不为TRUE,或者handled 为False,但retries 的数量为5 或更多。对于每次迭代,我都会在 daily_run_vehicle 表中添加一条新记录。
动作为prepare-get-vehicles,代码如下:
import json
import requests
from common.db import add_logs, get_vehicle_references
from common.db import capture_error
from common.common import APIHOST, NAMESPACE, USER_PASS
def execute_reference(reference, reference_url):
action = "prepare-get-vehicle"
url = APIHOST + "/api/v1/namespaces/" + NAMESPACE + "/actions/" + action
response = requests.post(url,
data=json.dumps({"reference": reference, 'reference_url': reference_url}),
params={"blocking": "false"},
auth=(USER_PASS[0], USER_PASS[1]),
headers={"Content-Type": "application/json"})
print(response.json())
def main(params):
try:
for reference in get_vehicle_references():
execute_reference(reference[0], reference[1])
return {"Success": "prepare-get-vehicles action executed successfully."}
except Exception as e:
capture_error(str(e))
return {"Failure": "prepare-get-vehicles action NOT executed successfully."}
get_vehicle_references函数如下:
def get_vehicle_references():
conn = db_connection()
cur = conn.cursor()
try:
s = "SELECT reference, reference_url FROM vehicles v WHERE (NOT EXISTS (select reference from daily_run_vehicle WHERE (handled = %s or (handled = %s and retries >= %s)) AND reference = v.reference))"
cur.execute(s, (True, False, 5))
return cur.fetchall()
except Exception as e:
capture_error(str(e))
conn.close()
prepare-get-vehicle 操作除了向daily_run_vehicle 表添加新记录外,什么都不做,如下所示:
def main(params):
try:
insert_daily_run_vehicle(params.get("reference", None), params.get("reference_url", None))
return {"Success.": "The DB filler (daily_run_vehicle) is successfully executed."}
except Exception as e:
capture_error(str(e))
return {"Failure": "The DB filler (daily_run_vehicle) action NOT executed successfully."}
但问题是vehicles 表有超过 300k 条记录,并且每天都变得越来越大。比 prepare-get-vehicles 操作中的 for 循环在 IBM Cloud 上执行要花费很多时间。有 600 秒超时,但 for 循环需要更多时间。
有什么建议可以解决我的问题,以及如何遍历一个记录超过 300k 的表,并为每条记录添加新行到 daily_run_table?
提前致谢。
【问题讨论】:
-
您确定您的问题是 for 循环而不是查询中的每条记录子选择吗?那些通常真的很慢。可以通过多种方式优化查询......例如
where reference not in (select ...)或者你可以加入daily_run_vehicle然后过滤。您也可以改进prepare-get-vehicle以获取多条记录并分批发送。 -
@swenzel 知道我可以尝试什么吗?目前我不确定。 :-)
-
我将从查询开始。连接到您的 posgres 服务器并尝试您当前的版本。如果它需要永远,你知道这是你的瓶颈。确实有几种方法可以通过使用来自另一个表的数据来过滤一个表的数据,基本上一切都比
WHERE ([NOT] EXISTS ...)更好。如果查询不是问题,请尝试批处理您发送到prepare-get-vehicles的数据。这样,您就没有每条记录的 REST API 开销,而只有每个批次的开销。你甚至可以使用 asyncio/multithreading 伪并行化(这里不需要多处理)。 -
SQL 是无序的使用 LiMIT 没有 ORDER BY 几乎没有意义
标签: python postgresql ibm-cloud-functions