Python并⾏分布式框架Celery
Python 并⾏分布式框架 Celery
Celery 官⽹:
Celery 官⽅⽂档英⽂版:
Celery 官⽅⽂档中⽂版:
celery配置:
参考:
分布式队列神器 Celery:
celery最佳实践:
Celery 分布式任务队列快速⼊门:
异步任务神器 Celery 快速⼊门教程:
定时任务管理之python篇celery使⽤:
异步任务神器 Celery:
celery任务调度框架实践:
Celery-4.1 ⽤户指南: Monitoring and Management Guide:
Celery安装及使⽤:
Celery学习笔记(⼀):
Celery 简介
除了redis,还可以使⽤另外⼀个神器---Celery。Celery是⼀个异步任务的调度⼯具。
Celery 是 Distributed Task Queue,分布式任务队列,分布式决定了可以有多个 worker 的存在,队列表⽰其是异步操作,即存在⼀个产⽣任务提出需求的⼯头,和⼀等着被分配⼯作的码农。
在 Python 中定义 Celery 的时候,我们要引⼊ Broker,中⽂翻译过来就是“中间⼈”的意思,在这⾥ Broker 起到⼀个中间⼈的⾓⾊。在⼯头提出任务的时候,把所有的任务放到 Broker ⾥⾯,在 Broker
的另外⼀头,⼀码农等着取出⼀个个任务准备着⼿做。
这种模式注定了整个系统会是个开环系统,⼯头对于码农们把任务做的怎样是不知情的。所以我们要引⼊ Backend 来保存每次任务的结果。这个 Backend 有点像我们的Broker,也是存储任务的信息⽤的,只不过这⾥存的是那些任务的返回结果。我们可以选择只让错误执⾏的任务返回结果到 Backend,这样我们取回结果,便可以知道有多少任务执⾏失败了。
Celery(芹菜)是⼀个异步任务队列/基于分布式消息传递的作业队列。它侧重于实时操作,但对调度⽀持也很好。Celery⽤于⽣产系统每天处理数以百万计的任务。Celery是⽤Python编写的,但该协议可以在任何语⾔实现。它也可以与其他语⾔通过webhooks实现。Celery建议的消息队列是RabbitMQ,但提供有限⽀持Redis, Beanstalk, MongoDB, CouchDB, 和数据库(使⽤SQLAlchemy的或Django的 ORM)。Celery是易于集成Django, Pylons and Flask,使⽤ django-celery, celery-pylons and Flask-Celery 附加包即可。
Celery 介绍
在Celery中⼏个基本的概念,需要先了解下,不然不知道为什么要安装下⾯的东西。概念:Broker、Backend。
什么是broker?
broker是⼀个消息传输的中间件,可以理解为⼀个邮箱。每当应⽤程序调⽤celery的异步任务的时候,会向broker传递消息,⽽后celery的worker将会取到消息,进⾏对于的程序执⾏。好吧,这个邮箱可以看成是⼀个消息队列。其中Broker的中⽂意思是经纪⼈,其实就是⼀开始说的消息队列,⽤来发送和接受消息。这个Broker有⼏个⽅案可供选择:RabbitMQ (消息队列),(缓存数据库),(不推荐),等等
什么是backend?
通常程序发送的消息,发完就完了,可能都不知道对⽅时候接受了。为此,celery实现了⼀个backend,⽤于存储这些消息以及celery执⾏的⼀些消息和结果。Backend是在Celery的配置中的⼀个配置项 CELERY_RESULT_BACKEND ,作⽤是保存结果和状态,如果你需要跟踪任务的状态,那么需要设置这⼀项,可以是Database backend,也可以是Cache backend,具体可以参考这⾥:。
对于 brokers,官⽅推荐是 rabbitmq 和 redis,⾄于 backend,就是数据库。为了简单可以都使⽤ redis。
我⾃⼰演⽰使⽤RabbitMQ作为Broker,⽤作为backend。
来⼀张图,这是在⽹上最多的⼀张Celery的图了,确实描述的⾮常好
Celery的由三部分组成,消息中间件(message broker),任务执⾏单元(worker)和任务执⾏结果存储(task result store)组成。
消息中间件
Celery本⾝不提供消息服务,但是可以⽅便的和第三⽅提供的消息中间件集成。包括,RabbitMQ, , (experimental), Amazon SQS (experimental),CouchDB (experimental), SQLAlchemy (experimental),Django ORM (experimental), IronMQ
任务执⾏单元
Worker是Celery提供的任务执⾏的单元,worker并发的运⾏在分布式的系统节点中。
任务结果存储
Task result store⽤来存储Worker执⾏的任务的结果,Celery⽀持以不同⽅式存储任务的结果,包括AMQP, ,memcached, ,SQLAlchemy, Django ORM,Apache Cassandra, IronCache 等。
这⾥我先不去看它是如何存储的,就先选⽤redis来存储任务执⾏结果。
因为涉及到消息中间件(在Celery帮助⽂档中称呼为中间⼈<broker>),为了更好的去理解⽂档中的例⼦,可以安装两个中间件,⼀个是RabbitMQ,⼀个redis。
根据 Celery的帮助⽂档安装和设置RabbitMQ, 要使⽤ Celery,需要创建⼀个 RabbitMQ ⽤户、⼀个虚拟主机,并且允许这个⽤户访问这个虚拟主机。
$ sudo rabbitmqctl add_user forward password #创建了⼀个RabbitMQ⽤户,⽤户名为forward,密码是password
$ sudo rabbitmqctl add_vhost ubuntu #创建了⼀个虚拟主机,主机名为ubuntu
# 设置权限。允许⽤户forward访问虚拟主机ubuntu,因为RabbitMQ通过主机名来与节点通信
$ sudo rabbitmqctl set_permissions -p ubuntu forward ".*" ".*" ".*"
$ sudo rabbitmq-server # 启⽤RabbitMQ服务器
结果如下,成功运⾏:
安装Redis,它的安装⽐较简单
$ sudo pip install redis
然后进⾏简单的配置,只需要设置 Redis 数据库的位置:
BROKER_URL = 'redis://localhost:6379/0'
URL的格式为:
python官方文档中文版redis://:password@hostname:port/db_number
URL Scheme 后的所有字段都是可选的,并且默认为 localhost 的 6479 端⼝,使⽤数据库 0。我的配置是:
redis://:password@ubuntu:6379/5
安装Celery,我是⽤标准的Python⼯具pip安装的,如下:
$ sudo pip install celery
是⼀个强⼤的分布式任务队列的异步处理框架,它可以让任务的执⾏完全脱离主程序,甚⾄可以被分配到其他主机上运⾏。我们通常使⽤它来实现异步任务(async task)和定时任务(crontab)。我们需要⼀个消息队列来下发我们的任务。⾸先要有⼀个消息中间件,此处选择rabbitmq (也可选择 redis 或 Amazon Simple Queue Service(SQS)消息队列服务)。推荐选择 rabbitmq 。使⽤RabbitMQ是官⽅特别推荐的⽅式,因此我也使⽤它作为我们的broker。它的组成如下图:
可以看到,Celery 主要包含以下⼏个模块:
任务模块 Task
包含异步任务和定时任务。其中,异步任务通常在业务逻辑中被触发并发往任务队列,⽽定时任务由 Celery Beat 进程周期性地将任务发往任务队列。
消息中间件 Broker
Broker,即为任务调度队列,接收任务⽣产者发来的消息(即任务),将任务存⼊队列。Celery 本⾝不提供队列服务,官⽅推荐使⽤ RabbitMQ 和等。
任务执⾏单元 Worker
Worker 是执⾏任务的处理单元,它实时监控消息队列,获取队列中调度的任务,并执⾏它。
任务结果存储 Backend
Backend ⽤于存储任务的执⾏结果,以供查询。同消息中间件⼀样,存储也可使⽤ RabbitMQ, 和等。
安装
有了上⾯的概念,需要安装这么⼏个东西:RabbitMQ、SQLAlchemy、Celery
安装rabbitmq
官⽹安装⽅法:
启动管理插件:sbin/rabbitmq-plugins enable rabbitmq_management
启动rabbitmq:sbin/rabbitmq-server -detached
rabbitmq已经启动,可以打开页⾯来看看
地址:
⽤户名密码都是guest 。现在可以进来了,可以看到具体页⾯。关于rabbitmq的配置,⽹上很多⾃⼰去搜以下就ok了。
消息中间件有了,现在该来代码了,使⽤ celeby官⽹代码。
剩下两个都是的东西了,直接pip安装就好了,对于从来没有安装过驱动的同学可能需要安装MySQL-。安装完成之后,启动服务: $ rabbitmq-server[回车]。启动后不要关闭窗⼝,下⾯操作新建窗⼝(Tab)。
安装celery
Celery可以通过pip⾃动安装,如果你喜欢使⽤虚拟环境安装可以先使⽤virtualenv创建⼀个⾃⼰的虚拟环境。反正我喜欢使⽤virtualenv建⽴⾃⼰的环境。
pip install celery
www.open-open/lib/view/open1441161168878.html
开始使⽤ Celery
使⽤celery包含三个⽅⾯:1. 定义任务函数。2. 运⾏celery服务。3. 客户应⽤程序的调⽤。
创建⼀个⽂件tasks.py输⼊下列代码:
1.
from celery import Celery
2.
3.
broker = 'redis://127.0.0.1:6379/5'
4.
backend = 'redis://127.0.0.1:6379/6'
5.
6.
7.
app = Celery('tasks', broker=broker, backend=backend)
8.
9.
@app.task
10.
def add(x, y):
11.
return x + y
上述代码导⼊了celery,然后创建了celery 实例 app,实例化的过程中指定了任务名tasks(和⽂件名⼀致),传⼊了broker和backend。然后创建了⼀个任务函数add。下⾯启动celery服务。在当前命令⾏终端运⾏(分别在 env1 和 env2 下执⾏):
celery -A tasks worker --loglevel=info
⽬录结构(celery -A tasks worker --loglevel=info 这条命令当前⼯作⽬录必须和 tasks.py 所在的⽬录相同。即进⼊tasks.py所在⽬录执⾏这条命令。)
使⽤ python 虚拟环境模拟两个不同的主机。
此时会看见⼀对输出。包括注册的任务啦。
交互式客户端程序调⽤⽅法
打开⼀个命令⾏,进⼊Python环境。
In [0]:from tasks import add
In [1]: r = add.delay(2, 2)
In [2]: add.delay(2, 2)
Out[2]: <AsyncResult: 6fdb0629-4beb-4eb7-be47-f22be1395e1d>
In [3]: r = add.delay(3, 3)
In [4]: r.re
In [4]: r.ready()
Out[4]: True
In [6]: r.result
Out[6]: 6
In [7]: r.get()
Out[7]: 6
调⽤ delay 函数即可启动 add 这个任务。这个函数的效果是发送⼀条消息到broker中去,这个消息包括要执⾏的函数、函数的参数以及其他信息,具体的可以看 Celery官⽅⽂档。这个时候 worker 会等待 broker 中的消息,⼀旦收到消息就会⽴刻执⾏消息。
启动了⼀个任务之后,可以看到之前启动的worker已经开始执⾏任务了。
现在是在python环境中调⽤的add函数,实际上通常在应⽤程序中调⽤这个⽅法。
注意:如果把返回值赋值给⼀个变量,那么原来的应⽤程序也会被阻塞,需要等待异步任务返回的结果。因此,实际使⽤中,不需要把结果赋值。
应⽤程序中调⽤⽅法
新建⼀个 main.py⽂件代码如下:
1.
from tasks import add
2.
3.
r = add.delay(2, 2)
4.
r = add.delay(3, 3)
5.
ady()
6.
sult
7.
()
在celery命令⾏可以看见celery执⾏的⽇志。打开 backend的redis,也可以看见celery执⾏的信息。
使⽤查看 Redis 数据库内容如图:
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论