【发布时间】:2021-01-12 10:28:54
【问题描述】:
我在使用 Ubuntu 运行 Win 10 的本地计算机上设置了 Airflow 2.0。我使用 PostgreSQL 作为数据库,使用 CeleryExecutor 和 RabbitMQ 作为 Celery 后端。我创建了一些 DAG,每个 DAG 通过 SSH 隧道连接到 Redshift 数据库并执行 SQL 命令。当我手动触发或通过调度程序运行时,每个 DAG 运行顺利。
但是,当我为这些 DAG 同时开始运行设置计划时遇到错误。例如,如果 DAG1 和 DAG2 在上午 8:00 开始运行,这 2 个 dag 将失败并显示以下错误:
psycopg2.OperationalError: 服务器意外关闭连接 这可能意味着服务器异常终止 在处理请求之前或期间。
如果我将这 2 个 dag 设置在不同的时间开始,一切都会顺利进行。另外,如果我将 2 个 dag 组合成 1 个 dag 和 2 个任务,这个组合的 dag 运行良好。
这是我的 DAG 代码,每个 dag 都相同(只是 SQL 命令不同):
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.ssh.hooks.ssh import SSHHook
from airflow.providers.postgres.hooks.postgres import PostgresHook
import time
dag = DAG('test', description='Simple test tutorial DAG',
schedule_interval= None,
start_date=datetime(2021, 1, 6), tags = ['test'])
def select_from_tunnel_db():
# Open SSH tunnel
ssh_hook = SSHHook(ssh_conn_id='dw_ssh')
tunnel = ssh_hook.get_tunnel(remote_port = 5439, remote_host='**.**.**.**', local_port=5439)
tunnel.start()
# Connect to DB and run query
pg_hook = PostgresHook(
postgres_conn_id='dw', # NOTE: host='localhost'
schema='test'
)
conn = pg_hook.get_conn()
cursor = conn.cursor()
cursor.execute('''
insert into abc values (1, 'a')
''')
cursor.close()
conn.commit()
conn.close()
python_operator = PythonOperator(
task_id='test_tunnel_conn',
python_callable=select_from_tunnel_db,
)
【问题讨论】:
标签: ssh amazon-redshift airflow psycopg2