admin 管理员组

文章数量: 887032

AirFlow

Airflow 核心概念

  • DAGs:即有向无环图(Directed Acyclic Graph),将所有需要运行的tasks按照依赖关系组织起来,描述的是所有tasks执行顺序。
  • Operators:可以简单理解为一个class,描述了DAG中某个的task具体要做的事。其中,airflow内置了很多operators,如BashOperator 执行一个bash 命令,PythonOperator 调用任意的Python 函数,EmailOperator 用于发送邮件,HTTPOperator 用于发送HTTP请求, SqlOperator 用于执行SQL命令等等,同时,用户可以自定义Operator,这给用户提供了极大的便利性。
  • Tasks:Task 是 Operator的一个实例,也就是DAGs中的一个node。
  • Task Instance:task的一次运行。Web 界面中可以看到task instance 有自己的状态,包括"running", “success”, “failed”, “skipped”, "up for retry"等。
  • Task Relationships:DAGs中的不同Tasks之间可以有依赖关系,如 Task1 >> Task2,表明Task2依赖于Task1了。
    通过将DAGs和Operators结合起来,用户就可以创建各种复杂的 工作流(workflow)

官方文档
/

安装

pip3 install apache-airflow[all]

上述命令为安装airflow的全部包,可能会出现报错。下面是几种解决方法。

Command “python setup.py egg_info” failed with error code 1

FIX:

	python3 -m pip install -U pippython3 -m pip install -U setuptools

OSError: mysql_config not found

FIX:

	yum install mysql-devel gcc gcc-devel python-devel

sasl/sasl.h: No such file or directory

FIX:

	yum -y install cyrus-sasl cyrus-sasl-devel cyrus-sasl-lib

fatal error: sql.h: No such file or directory

	yum install unixODBC-devel

fatal error: lber.h: No such file or directory

	yum install python-devel openldap-devel

初始化

airflow db init
airflow users create --username admin --firstname YongHeng --lastname Wang --role Admin --email your_email@email.com
password: 123456

遇到的问题

error: sqlite C library version too old

	yum updateyum install sqlite
从,在本地制作并安装。
1)下载源码
[root@stg-airflow001 ~]$ wget .tar.gz2) 编译
[root@stg-airflow001 ~]$ tar zxvf sqlite-autoconf-3290000.tar.gz 
[root@stg-airflow001 ~]$ cd sqlite-autoconf-3290000/
[root@stg-airflow001 ~/sqlite-autoconf-3290000]$ ./configure --prefix=/usr/local
[root@stg-airflow001 ~/sqlite-autoconf-3290000]$ make && make install3)替换系统低版本 sqlite3
[root@stg-airflow001 ~/sqlite-autoconf-3290000]$ cd 
[root@stg-airflow001 ~]$ mv /usr/bin/sqlite3  /usr/bin/sqlite3_old
[root@stg-airflow001 ~]$ ln -s /usr/local/bin/sqlite3   /usr/bin/sqlite3
[root@stg-airflow001 ~$ echo "/usr/local/lib" > /etc/ld.so.conf.d/sqlite3.conf
[root@stg-airflow001 ~]$ ldconfig
[root@stg-airflow001 ~]$ sqlite3 -version
3.29.0 2019-07-10 17:32:03 fc82b73eaac8b36950e527f12c4b5dc1e147e6f4ad2217ae43ad82882a88bfa6

启动web服务

airflow webserver --port 8282

启动调度器

airflow scheduler

A few commands

# initialize the database tables
airflow db init# print the list of active DAGs
airflow dags list# prints the list of tasks in the "tutorial" DAG
airflow tasks list tutorial# prints the hierarchy of tasks in the "tutorial" DAG
airflow tasks list tutorial --tree# 单独测试执行wyh_test这个dag下的print_date和hello_task这两个task, 后面跟着的是逻辑时间
airflow tasks test wyh_test print_date 2020-06-01
airflow tasks test wyh_test hello_task 2020-06-01
# 该命令在本地运行任务实例,将其日志输出到stdout(在屏幕上),不打扰依赖项,
# 并且不与数据库通信状态(运行,成功,失败等)。
# 它仅仅只是测试单个任务实例。

编写自己的Dag

找到 airflow/dags
增加一个TestDag.py

from datetime import datetime, timedelta
from textwrap import dedent# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.operators.python_operator import PythonOperator
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {'owner': 'wyh','depends_on_past': False,'email': ['your@email.com'],'email_on_failure': False,'email_on_retry': False,'retries': 1,'retry_delay': timedelta(minutes=5),# 'queue': 'bash_queue',# 'pool': 'backfill',# 'priority_weight': 10,# 'end_date': datetime(2016, 1, 1),# 'wait_for_downstream': False,# 'dag': dag,# 'sla': timedelta(hours=2),# 'execution_timeout': timedelta(seconds=300),# 'on_failure_callback': some_function,# 'on_success_callback': some_other_function,# 'on_retry_callback': another_function,# 'sla_miss_callback': yet_another_function,# 'trigger_rule': 'all_success'
}
with DAG('wyh_test',default_args=default_args,description='A simple DAG',schedule_interval=timedelta(days=1),start_date=datetime(2021, 11, 1),catchup=False,tags=['test'],
) as dag:# t1, t2 and t3 are examples of tasks created by instantiating operatorst1 = BashOperator(task_id='print_date',bash_command='date',)t2 = BashOperator(task_id='sleep',depends_on_past=False,bash_command='sleep 5',retries=3,)t1.doc_md = dedent("""\#### Task DocumentationYou can document your task using the attributes `doc_md` (markdown),`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which getsrendered in the UI's Task Instance Details page.![img](.png)""")dag.doc_md = __doc__def print_hello():print("Hello_Airflow")return 'Hello world!'t3 = PythonOperator(task_id='hello_task',python_callable=print_hello,)t1 >> [t2, t3]

用 python3 TestDag.py 验证下有没有错误
接着airflow dags list
就会看到自己定义的dag_id filepath owner paused
刷新页面即可看到。

更改Sqlite -> MySQL

CREATE DATABASE airflow CHARACTER SET utf8 COLLATE utf8_unicode_ci;
CREATE USER 'airflow' IDENTIFIED BY 'airflow';
GRANT ALL PRIVILEGES ON airflow.* TO 'airflow';
flush privileges;$AIRFLOW_HOME/airflow.cfg文件中修改
# 配置mysql
sql_alchemy_conn = mysql+pymysql://airflow:airflow@127.0.0.1/airflow
# 配置支持并行性的工作器
executor = LocalExecutor
# 若之前使用sqllite初始化过,需要 
# 重置数据库
airflow db reset
# 初始化数据库
airflow db init

注意修改MySQL的配置否则会报错如下:
Exception: Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql

mysql -u root
show global variables like '%timestamp%';
set global explicit_defaults_for_timestamp =TRUE;
exit;
service mysql restart

接着重新注册角色用户
启动webserverschedule

配置CeleryExecutor

要先配置好redis

vim airflow.cfg
executor = CeleryExecutor
broker_url = redis://10.11.ip.ip:6379/0
result_backend = db+mysql://airflow:airflow@10.11.ip.ip/airflow
之后启动worker
airflow celery worker -D

集群部署

其实是利用Celery把任务发给集群中其他的机器,搞几台机器设置成Worker角色。
相同配置的airflow安装到其他机器上,配置文件更要一致。
再执行airflow celery worker就激活了一个Worker.

本文标签: AirFlow