ETL是数据分析不可缺少的一环,对于ETL的调度管理也是至关重要的。
一个好的ETL调度平台应该具备以下优点:
- 作业集成管理
- 运行监控(WEB监控)
- 异常警告(邮件及短信)
- 日志可查看
- 后台调度
- 系统配置
- 作业执行情况分析界面
探索中发现了ETL调度平台——airflow。airflow 是一个编排、调度和监控workflow的平台,由Airbnb开源,现在在Apache Software Foundation 孵化。airflow 将workflow编排为tasks组成的DAGs,调度器在一组workers上按照指定的依赖关系执行tasks。同时,airflow 提供了丰富的命令行工具和简单易用的用户界面以便用户查看和操作,并且airflow提供了监控和报警系统。
(一)安装
1.安装airflow
pip install apache-airflow
2.airflow相关组件
pip install apache-airflow[all]
可选择性安装相关数据库连接的驱动组件,这里我选择了第一项[all],若是分布式数据库,airflow检测到不符合条件,该组件会安装失败,不影响其他组件的安装。
3.初始化数据表
airflow initdb
3.问题及解决方式:
(1)环境变量配置报错
pip install apache-airflow
添加环境变量:
[[email protected] ~]$ SLUGIFY_USES_TEXT_UNIDECODE=yes pip install apache-airflow==1.10.0
(2)缺少gcc
若是安装中出现如下的报错,证明linux缺少gcc
解决方式:
sudo yum install gcc
- 部署配置
1.启动webserver守护进程:
airflow webserver
2.启动调度器,默认每300s检测一下/home/centos/airflow/dags下的文件,
airflow scheduler
将下面的代码放到/home/centos/airflow/dags目录下,调度器会自动检测该目录,并执行。
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG(
'tutorial', default_args=default_args, schedule_interval=timedelta(days=1))
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)
t2.set_upstream(t1)
t3.set_upstream(t1)
查看依赖关系:
自(2015, 6, 1)开始执行任务:
从log可以查看执行结果:
templated_command:
:通过循环5次,执行了5次bash命令,
echo "{{ macros.ds_add(ds, 7)}}" 当前时间加7day。
还可以传递参数。