air连接mysql_Airflow笔记-MySqlOperator使⽤及conn配置⽬录[-]
1. 依赖
MySqlOperator 的数据库交互通过 MySQLdb 模块来实现, 使⽤前需要安装相关依赖:
pip install apache-airflow[mysql]
2. 使⽤
使⽤ MySqlOperator 执⾏sql任务的⼀个简单例⼦:
from airflow import DAG
from airflow.utils.dates import days_ago
from sql_operator import MySqlOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': days_ago(1),
'email': ['j_hao104@163'],
'email_on_failure': True,
'email_on_retry': False,
}
dag = DAG(
'MySqlOperatorExample',
default_args=default_args,
description='MySqlOperatorExample',
schedule_interval="30 18 * * *")
insert_sql = "insert into log SELECT * FROM temp_log"
task = MySqlOperator(
task_id='select_sql',
sql=insert_sql,
mysql_conn_id='mysql_conn',
autocommit=True,
dag=dag)
3. 参数
MySqlOperator 接收⼏个参数:
sql: 待执⾏的sql语句;
mysql_conn_id: mysql数据库配置ID, Airflow的conn配置有两种配置⽅式,⼀是通过os.environ来配置环境变量实现,⼆是通过web界⾯配置到代码中,具体的配置⽅法会在下⽂描述;
parameters: 相当于MySQLdb库的execute ⽅法的第⼆参数,⽐如: ute('insert into UserInfo values(%s,%s)',('alex',18));
autocommit: ⾃动执⾏ commit;
database: ⽤于覆盖conn配置中的数据库名称, 这样⽅便于连接统⼀个mysql的不同数据库;
4. conn配置
建议conn配置通过web界⾯来配置,这样不⽤硬编码到代码中,关于配置中的各个参数:
Conn Id: 对应 MySqlOperator 中的 mysql_conn_id;
Host: 数据库IP地址;schedule用法及搭配
Schema: 库名, 可以被MySqlOperator中的database重写;
Login: 登录⽤户名;
Password: 登录密码;
Port: 数据库端⼝;
Extra: t的额外参数,包含charset、cursor、ssl、local_infile
其中cursor的值的对应关系为: sscursor —> MySQLdb.cursors.SSCursor; dictcursor —> MySQLdb.cursors.DictCursor; ssdictcursor —> MySQLdb.cursors.SSDictCursor
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论