Installing Airflow on CentOS 7
Test environment:
CentOS 7
Python 3.6
Installation and configuration:
1. Check whether gcc is installed; if not, install it:
yum install gcc  (if the airflow installation later fails, run this again to refresh the packages; this step is important)
2. Install Python, pip, and the dependencies:
yum install -y python36
yum install -y python36-pip
yum install -y python36-devel
pip3 install paramiko
 Before installing airflow, install the development libraries it depends on:
yum -y install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gdbm-devel db4-devel libpcap-devel xz-devel
  Install airflow:
pip3 install apache-airflow
Install pymysql:
pip3 install pymysql
3. Configure the environment variables:
# vi /etc/profile
  #airflow
  export AIRFLOW_HOME=/software/airflow
# source /etc/profile
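An optional sanity check (a small sketch, not part of airflow itself) is to confirm from Python that the variable is visible before running any airflow command; the file name below is just an example:
# check_airflow_home.py -- optional check that the export above took effect
import os
print(os.environ.get("AIRFLOW_HOME", "AIRFLOW_HOME is not set"))  # expected: /software/airflow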
Initialization
1. Initialize the database tables (by default a local sqlite database is used):
[root@centos-slave1 centos]# airflow initdb
[2019-09-19 22:58:10,546] {__init__.py:51} INFO - Using executor SequentialExecutor
DB: sqlite:////software/airflow/airflow.db
[2019-09-19 22:58:11,457] {db.py:369} INFO - Creating tables
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.runtime.migration] Running upgrade  -> e3a246e0dc1, current schema
INFO  [alembic.runtime.migration] Running upgrade e3a246e0dc1 -> 1507a7289a2f, create is_encrypted
/usr/local/lib/python3.6/site-packages/alembic/ddl/sqlite.py:39: UserWarning: Skipping unsupported ALTER for creation of implicit constraint
"Skipping unsupported ALTER for "
INFO  [alembic.runtime.migration] Running upgrade 1507a7289a2f -> 13eb55f81627, maintain history for compatibility with earlier migrations
INFO  [alembic.runtime.migration] Running upgrade 13eb55f81627 -> 338e90f54d61, More logging into task_instance
INFO  [alembic.runtime.migration] Running upgrade 338e90f54d61 -> 52d714495f0, job_id indices
INFO  [alembic.runtime.migration] Running upgrade 52d714495f0 -> 502898887f84, Adding extra to Log
INFO  [alembic.runtime.migration] Running upgrade 502898887f84 -> 1b38cef5b76e, add dagrun
INFO  [alembic.runtime.migration] Running upgrade 1b38cef5b76e -> 2e541a1dcfed, task_duration
INFO  [alembic.runtime.migration] Running upgrade 2e541a1dcfed -> 40e67319e3a9, dagrun_config
INFO  [alembic.runtime.migration] Running upgrade 40e67319e3a9 -> 561833c1c74b, add password column to user
INFO  [alembic.runtime.migration] Running upgrade 561833c1c74b -> 4446e08588, dagrun start end
INFO  [alembic.runtime.migration] Running upgrade 4446e08588 -> bbc73705a13e, Add notification_sent column to sla_miss
INFO  [alembic.runtime.migration] Running upgrade bbc73705a13e -> bba5a7cfc896, Add a column to track the encryption state of the 'Extra' field in connection
INFO  [alembic.runtime.migration] Running upgrade bba5a7cfc896 -> 1968acfc09e3, add is_encrypted column to variable table
INFO  [alembic.runtime.migration] Running upgrade 1968acfc09e3 -> 2e82aab8ef20, rename user table
INFO  [alembic.runtime.migration] Running upgrade 2e82aab8ef20 -> 211e584da130, add TI state index
INFO  [alembic.runtime.migration] Running upgrade 211e584da130 -> 64de9cddf6c9, add task fails journal table
INFO  [alembic.runtime.migration] Running upgrade 64de9cddf6c9 -> f2ca10b85618, add dag_stats table
INFO  [alembic.runtime.migration] Running upgrade f2ca10b85618 -> 4addfa1236f1, Add fractional seconds to mysql tables
INFO  [alembic.runtime.migration] Running upgrade 4addfa1236f1 -> 8504051e801b, xcom dag task indices
INFO  [alembic.runtime.migration] Running upgrade 8504051e801b -> 5e7d17757c7a, add pid field to TaskInstance
INFO  [alembic.runtime.migration] Running upgrade 5e7d17757c7a -> 127d2bf2dfa7, Add dag_id/state index on dag_run table
INFO  [alembic.runtime.migration] Running upgrade 127d2bf2dfa7 -> cc1e65623dc7, add max tries column to task instance
INFO  [alembic.runtime.migration] Running upgrade cc1e65623dc7 -> bdaa763e6c56, Make xcom value column a large binary
INFO  [alembic.runtime.migration] Running upgrade bdaa763e6c56 -> 947454bf1dff, add ti job_id index
INFO  [alembic.runtime.migration] Running upgrade 947454bf1dff -> d2ae31099d61, Increase text size for MySQL (not relevant for other DBs' text types)
INFO  [alembic.runtime.migration] Running upgrade d2ae31099d61 -> 0e2a74e0fc9f, Add time zone awareness
INFO  [alembic.runtime.migration] Running upgrade d2ae31099d61 -> 33ae817a1ff4, kubernetes_resource_checkpointing
INFO  [alembic.runtime.migration] Running upgrade 33ae817a1ff4 -> 27c6a30d7c24, kubernetes_resource_checkpointing
INFO  [alembic.runtime.migration] Running upgrade 27c6a30d7c24 -> 86770d1215c0, add kubernetes scheduler uniqueness
INFO  [alembic.runtime.migration] Running upgrade 86770d1215c0, 0e2a74e0fc9f -> 05f30312d566, merge heads
INFO  [alembic.runtime.migration] Running upgrade 05f30312d566 -> f23433877c24, fix mysql not null constraint
INFO  [alembic.runtime.migration] Running upgrade f23433877c24 -> 856955da8476, fix sqlite foreign key
INFO  [alembic.runtime.migration] Running upgrade 856955da8476 -> 9635ae0956e7, index-faskfail
INFO  [alembic.runtime.migration] Running upgrade 9635ae0956e7 -> dd25f486b8ea
INFO  [alembic.runtime.migration] Running upgrade dd25f486b8ea -> bf00311e1990, add index to taskinstance
INFO  [alembic.runtime.migration] Running upgrade 9635ae0956e7 -> 0a2a5b66e19d, add task_reschedule table
INFO  [alembic.runtime.migration] Running upgrade 0a2a5b66e19d, bf00311e1990 -> 03bc53e68815, merge_heads_2
INFO  [alembic.runtime.migration] Running upgrade 03bc53e68815 -> 41f5f12752f8, add superuser field
INFO  [alembic.runtime.migration] Running upgrade 41f5f12752f8 -> c8ffec048a3b, add fields to dag
INFO  [alembic.runtime.migration] Running upgrade c8ffec048a3b -> dd4ecb8fbee3, Add schedule interval to dag
INFO  [alembic.runtime.migration] Running upgrade dd4ecb8fbee3 -> 939bb1e647c8, task reschedule fk on cascade delete
INFO  [alembic.runtime.migration] Running upgrade c8ffec048a3b -> a56c9515abdc, Remove dag_stat table
INFO  [alembic.runtime.migration] Running upgrade 939bb1e647c8 -> 6e96a59344a4, Make TaskInstance.pool not nullable
INFO  [alembic.runtime.migration] Running upgrade 6e96a59344a4 -> 74effc47d867, change datetime to datetime2(6) on MSSQL tables
INFO  [alembic.runtime.migration] Running upgrade 939bb1e647c8 -> 004c1210f153, increase queue name size limit
Done.
2. Check the files it generated:
[root@centos-slave1 centos]# cd /software/airflow/
[root@centos-slave1 airflow]# ls
airflow.cfg airflow.db logs unittests.cfg
3. Configure the MySQL database (create an airflow database and make sure the account airflow will use can access it; this example reuses the root account, and the GRANT ... IDENTIFIED BY statement also sets its password):
mysql> CREATE DATABASE airflow;
Query OK, 1 row affected (0.00 sec)
mysql> GRANT all privileges on root.* TO 'root'@'localhost'  IDENTIFIED BY 'root';
ERROR 1819 (HY000): Your password does not satisfy the current policy requirements
#This error is caused by the validate_password_policy setting. The default value is 1 (MEDIUM), so the password must meet the minimum length and contain digits, upper- or lower-case letters, and special characters. For a quick test you may not want such a complex password; for example, you may simply want to set root's password to root.
 Two global parameters need to be changed:
1) First, change the value of validate_password_policy:
mysql> set global validate_password_policy=0;
Query OK, 0 rows affected (0.00 sec)
#With this setting, password validation is based only on the password length, which is controlled by the validate_password_length parameter.
mysql> select @@validate_password_length;
+----------------------------+
| @@validate_password_length |
+----------------------------+
|                          8 |
+----------------------------+
1 row in set (0.00 sec)
2) Change validate_password_length so that only the length check remains. MySQL clamps this value to an effective minimum of 4, which is why the query below shows 4 even though 1 was requested.
mysql> set global validate_password_length=1;
Query OK, 0 rows affected (0.00 sec)
mysql> select @@validate_password_length;
+----------------------------+
| @@validate_password_length |
+----------------------------+
|                          4 |
+----------------------------+
1 row in set (0.00 sec)
mysql> GRANT all privileges on root.* TO 'root'@'localhost'  IDENTIFIED BY 'root';
Query OK, 0 rows affected, 1 warning (0.35 sec)
mysql> FLUSH PRIVILEGES;
Query OK, 0 rows affected (0.01 sec)
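Before pointing airflow at MySQL, it can be worth checking that the pymysql driver installed earlier can log in with the account granted above. A minimal sketch, assuming the root/root credentials and the airflow database from the example:
# check_mysql.py -- connectivity check only, not part of airflow
import pymysql

conn = pymysql.connect(host="localhost", user="root", password="root", database="airflow")
try:
    with conn.cursor() as cur:
        cur.execute("SELECT VERSION()")
        print("Connected, MySQL version:", cur.fetchone()[0])
finally:
    conn.close()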
4. Configure airflow to use the LocalExecutor and the MySQL database:
vim /software/airflow/airflow.cfg  (a minimal CentOS 7 install may not include vim; install it with yum install vim-enhanced, or use vi instead)
# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor
#executor = SequentialExecutor
executor = LocalExecutor
# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engine, more information
# their website
#sql_alchemy_conn = sqlite:////data/airflow/airflow.db
sql_alchemy_conn = mysql+pymysql://root:123456@localhost:3306/airflow
Re-initialize the database tables:
airflow initdb
This fails with:
Exception: Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql
 Fix:
Change the MySQL configuration and restart mysqld:
vim /etc/my.cnf
[mysqld]
explicit_defaults_for_timestamp=1
Or run the following statement in the database:
set global explicit_defaults_for_timestamp = on;
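Before re-running airflow initdb, a small sketch like the one below can confirm both that the sql_alchemy_conn URI works and that explicit_defaults_for_timestamp is now on. Use whatever root password was actually set (the GRANT above used 'root' while the cfg example shows 123456), so the URI here is only an example:
# check_conn.py -- verifies the SqlAlchemy URI and the timestamp setting
from sqlalchemy import create_engine, text

engine = create_engine("mysql+pymysql://root:123456@localhost:3306/airflow")
with engine.connect() as conn:
    print(conn.execute(text("SELECT @@explicit_defaults_for_timestamp")).scalar())  # expect 1
Once this prints 1, run airflow initdb again; it should now create the tables in MySQL instead of sqlite.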
5. Check the airflow tables that were created:
mysql> use airflow;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
mysql> show tables;
+-------------------+
| Tables_in_airflow |
+-------------------+
| alembic_version  |
| chart            |
| connection        |
| dag              |
| dag_pickle        |
| dag_run          |
| dag_stats        |
| import_error      |
| job              |
| known_event       |
| known_event_type  |
| log              |
| sla_miss          |
| slot_pool        |
| task_fail        |
| task_instance    |
| users            |
| variable          |
| xcom              |
+-------------------+
19 rows in set (0.00 sec)
Starting the services
1. Start the airflow webserver and scheduler (these can later be wrapped in service startup scripts; add -D to run them as daemons):
airflow webserver
airflow scheduler
[root@centos-master airflow]# airflow webserver
[2019-09-20 00:09:52,980] {settings.py:213} INFO - settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=1800, pid=3168
[2019-09-20 00:09:53,111] {__init__.py:51} INFO - Using executor LocalExecutor
____________      _____________
____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /  _  __/ _  / / /_/ /_ |/ |/ /
_/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2019-09-20 00:09:54,071] {dagbag.py:90} INFO - Filling up the DagBag from /software/airflow/dags
Running the Gunicorn Server with:
Workers: 4 sync
Host: 0.0.0.0:8080
Timeout: 120
Logfiles: - -
=================================================================
[root@centos-master airflow]# airflow scheduler
[2019-09-20 00:10:35,983] {settings.py:213} INFO - settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=1800, pid=3287
[2019-09-20 00:10:36,117] {__init__.py:51} INFO - Using executor LocalExecutor
____________      _____________
____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /  _  __/ _  / / /_/ /_ |/ |/ /
_/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2019-09-20 00:10:36,542] {scheduler_job.py:1315} INFO - Starting the scheduler
[2019-09-20 00:10:36,542] {scheduler_job.py:1323} INFO - Running execute loop for -1 seconds
[2019-09-20 00:10:36,544] {scheduler_job.py:1324} INFO - Processing each file at most -1 times
[2019-09-20 00:10:36,544] {scheduler_job.py:1327} INFO - Searching for files in /software/airflow/dags
[2019-09-20 00:10:36,550] {scheduler_job.py:1329} INFO - There are 20 files in /software/airflow/dags
[2019-09-20 00:10:36,769] {scheduler_job.py:1376} INFO - Resetting orphaned tasks for active dag runs
[2019-09-20 00:10:36,801] {dag_processing.py:545} INFO - Launched DagFileProcessorManager with pid: 3338
[2019-09-20 00:10:36,905] {settings.py:54} INFO - Configured default timezone <Timezone [UTC]>
[2019-09-20 00:10:36,920] {settings.py:213} INFO - settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=1800, pid=3338
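The scheduler above is scanning /software/airflow/dags. For reference, a DAG file dropped into that directory looks roughly like the sketch below (Airflow 1.10-style imports; the dag_id, schedule and command are only examples):
# /software/airflow/dags/example_hello.py -- a minimal example DAG
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    "owner": "airflow",
    "start_date": datetime(2019, 9, 1),
}

dag = DAG(
    dag_id="example_hello",
    default_args=default_args,
    schedule_interval="@daily",
)

hello = BashOperator(
    task_id="say_hello",
    bash_command="echo hello from airflow",
    dag=dag,
)
Once the scheduler picks it up, the DAG shows up in the web UI on port 8080 and can be triggered there or with airflow trigger_dag example_hello.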
