
amazon redshift - Airflow DAGs with SSH connections fail when scheduled to start at the same time

I set up Airflow 2.0 on a local machine running Ubuntu on Windows 10. I use PostgreSQL as the metadata database, the CeleryExecutor, and RabbitMQ as the Celery message broker. I created several DAGs; each one connects to a Redshift database through an SSH tunnel and executes a SQL command. Every DAG runs smoothly when I trigger it manually or run it via the scheduler.
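
For context, the relevant parts of my configuration look roughly like this (a minimal sketch with placeholder credentials and default local hostnames; the exact values on my machine differ):

[core]
executor = CeleryExecutor
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost:5432/airflow

[celery]
broker_url = amqp://guest:guest@localhost:5672//
result_backend = db+postgresql://airflow:airflow@localhost:5432/airflow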

However, I get an error when I schedule these DAGs to start at the same time. For example, if DAG1 and DAG2 are both scheduled for 8:00 AM, both DAGs fail with the error below:

psycopg2.OperationalError: server closed the connection unexpectedly
    This probably means the server terminated abnormally
    before or while processing the request.

If I schedule the two DAGs to start at different times, everything runs smoothly. Likewise, if I combine the two DAGs into a single DAG with two tasks, the combined DAG runs well (see the sketch after the DAG code below).

This is my DAG code; it is the same for every DAG, only the SQL commands differ:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.ssh.hooks.ssh import SSHHook

dag = DAG(
    'test',
    description='Simple test tutorial DAG',
    schedule_interval=None,
    start_date=datetime(2021, 1, 6),
    tags=['test'],
)

def select_from_tunnel_db():
    # Open an SSH tunnel to Redshift; every DAG binds the same local port 5439
    ssh_hook = SSHHook(ssh_conn_id='dw_ssh')
    tunnel = ssh_hook.get_tunnel(remote_port=5439, remote_host='**.**.**.**', local_port=5439)
    tunnel.start()

    # Connect through the tunnel and run the query
    # (the 'dw' connection has host='localhost')
    pg_hook = PostgresHook(postgres_conn_id='dw', schema='test')
    conn = pg_hook.get_conn()
    cursor = conn.cursor()
    cursor.execute("insert into abc values (1, 'a')")
    cursor.close()
    conn.commit()
    conn.close()

python_operator = PythonOperator(
    task_id='test_tunnel_conn',
    python_callable=select_from_tunnel_db,
    dag=dag,  # attach the task to the DAG
)
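
For reference, here is a minimal sketch of the combined version that runs fine, with the two statements as two tasks in one DAG (the DAG id, task ids, second SQL statement, and the 8:00 AM cron schedule are illustrative, not my real values):

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.ssh.hooks.ssh import SSHHook

def run_sql_through_tunnel(sql):
    # Same tunnel-then-query pattern as above, parameterised by the SQL to run
    ssh_hook = SSHHook(ssh_conn_id='dw_ssh')
    tunnel = ssh_hook.get_tunnel(remote_port=5439, remote_host='**.**.**.**', local_port=5439)
    tunnel.start()
    pg_hook = PostgresHook(postgres_conn_id='dw', schema='test')
    conn = pg_hook.get_conn()
    cursor = conn.cursor()
    cursor.execute(sql)
    cursor.close()
    conn.commit()
    conn.close()

with DAG(
    'combined_test',
    description='Two SQL tasks sharing one DAG',
    schedule_interval='0 8 * * *',  # assumed: run daily at 8:00 AM
    start_date=datetime(2021, 1, 6),
    tags=['test'],
) as dag:
    task_1 = PythonOperator(
        task_id='sql_task_1',
        python_callable=run_sql_through_tunnel,
        op_args=["insert into abc values (1, 'a')"],
    )
    task_2 = PythonOperator(
        task_id='sql_task_2',
        python_callable=run_sql_through_tunnel,
        op_args=["insert into abc values (2, 'b')"],  # hypothetical second statement
    )
    task_1 >> task_2  # chain the tasks so the tunnels open one after the other

Chaining with >> means the second task only starts after the first finishes, so the two tunnel openings never overlap.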


1 Answer

Waiting for an expert to answer.
