大道至简,知行合一。

Python异步任务神器celery

Celery 简单来说就是一个分布式消息队列。简单、灵活且可靠,能够处理大量消息,它是一个专注于实时处理的任务队列,同时也支持异步任务调度。Celery 不仅可以单机运行,也能够同时在多台机器上运行,甚至可以跨数据中心。

Celery 中比较关键的概念:

  • worker: worker 是一个独立的进程,任务执行单元,它持续监视队列中是否有需要处理的任务;
  • broker: broker 消息传输中间件,任务调度队列,接收生产者发出的消息,将任务存入队列,broker 负责协调客户端和 worker 的沟通。客户端向队列添加消息,broker 负责把消息派发给 worker。
  • 任务模块:包含异步任务和定时任务,异步任务通常在业务逻辑中被触发并发往任务队列,定时任务由 celery beat 进程周期性发送
  • 任务结果 backend:backend 存储任务执行结果,同消息中间件一样,需要由其他存储系统提供支持

一个典型的 Celery 使用场景就是,当用户在网站注册时,请求可以立即返回而不用等待发送注册激活邮件之后返回,网站可以将发送邮件这样的耗时不影响主要流程的操作放到消息队列中,Celery 就提供了这样的便捷。

安装 Celery

直接使用 python 工具 pip 或者 easy_install 来安装:

$ pip install celery

安装 Broker

Celery 支持多种 broker, 但主要以 RabbitMQ 和 Redis 为主,其他都是试验性的,虽然也可以使用, 但是没有专门的维护者。如何在 RabbitMQ 和 Redis 之间选择呢?

RabbitMQ is feature-complete, stable, durable and easy to install. It’s an excellent choice for a production environment.

Redis is also feature-complete, but is more susceptible to data loss in the event of abrupt termination or power failures.

Celery 官方明确表示推荐在生产环境下使用 RabbitMQ,Redis 存在丢数据的问题。所以如果你的业务可以容忍 worker crash 或者电源故障导致的任务丢失,采用 redis 是个不错的选择,本篇就以 redis 为例来介绍。

Celery 对于 redis 的支持需要安装相关的依赖,以下命令可以同时安装 celery 和 redis 相关的依赖,但是 redis server 还是必须单独安装的。

$ pip install -U celery[redis]  # -U 的意思是把所有指定的包都升级到最新的版本

Celery 的配置和使用

Celery 本身的配置项是很多的,但是如果要让它跑起来,你只需要加一行配置:

BROKER_URL = 'redis://localhost:6379/0'

这一行就是告诉 celery broker 的地址和选择的 redis db,默认是 0。接下来用个很简单的例子来介绍 celery 是如何使用的:

# task.py
from celery import Celery

broker = 'redis://127.0.0.1:6379/0'

app = Celery('tasks', broker=broker)

@app.task()
def add(x, y):
   return x + y

上述代码创建了一个 celery 的实例 app,可以通过它来创建任务和管理 workers。在上面的例子中,我们创建了一个简单的任务 task,它返回了两个数相加后的结果。然后启动 celery 服务,通过它来监听是否有任务要处理。

$ celery worker -A task -l info
  • -A 选项指定 celery 实例 app 的位置,本例中 task.py 中自动寻找,当然可以直接指定 celery worker -A task.app -l info
  • -l 选项指定日志级别, -l 是 --loglevel 的缩略形式

其他更多选项通过 celery worker –-help 查看

调用任务或者发送消息

然后我们再打开一个 shell 窗口,进入 python 控制台去调用 add 任务:

>>> from task import add
>>> add.delay(1, 2)
<AsyncResult: 42ade14e-c7ed-4b8d-894c-1ca1ec7ca192>

delay() 是 apply_async 的简写,用于一个任务消息(task message)。我们发现 add 任务并没有返回结果 3,而是一个对象 AsyncResult,它的作用是被用来检查任务状态,等待任务执行完毕或获取任务结果,如果任务失败,它会返回异常信息或者调用栈。

我们先尝试获取任务的执行结果:

>>> result = add.delay(1, 2)
>>> result.get()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 169, in get
    no_ack=no_ack,
  File "/usr/local/lib/python2.7/dist-packages/celery/backends/base.py", line 604, in _is_disabled
    'No result backend configured.  '
NotImplementedError: No result backend configured.  Please see the documentation for more information.

报错了:No result backend configured. 错误信息告诉我们没有配置 result backend。因为 celery 会将任务的 状态或结果保存在 result backend,result backend 的选择也有很多,本例中依然选用 redis 作为 result backend。

我们修改 task.py 的代码,添加上 result backend 的设置,保存后重启 celery worker。

# task.py
...
app = Celery('tasks', backend='redis://localhost', broker='redis://localhost//')
...

然后重新调用 add task:

>>> from task import add
>>> result = add.delay(1,2)
>>> result.get()
3

delay 返回的是一个 AsyncResult 对象,里面存的就是一个异步的结果,当任务完成时result.ready() 为 true,然后用 result.get() 取结果。

任务状态

  • PENDING 任务等待中
  • STARTED 任务已开始
  • SUCCESS 任务执行成功
  • FAILURE 任务执行失败
  • RETRY 任务将被重试
  • REVOKED 任务取消

当我们有个耗时时间较长的任务进行时一般我们想得知它的实时进度,这里就需要我们自定义一个任务状态用来说明进度并手动更新状态,从而告诉回调当前任务的进度。

vim tasks.py
 
from celery import Celery
import time
 
@app.task(bind=True)
def mTasks(self):
    for i in xrange(1, 11):
        time.sleep(0.1)
        self.update_state(state="PROGRESS", meta={'p': i*10})
    return 'done'
vim trigger.py
 
#!/usr/bin/env python
#coding: utf-8
 
from task import mTasks
import sys
 
def pm(body):
    res = body.get('result')
    if body.get('status') == 'PROGRESS':
        sys.stdout.write('\r任务进度: {0}%'.format(res.get('p')))
        sys.stdout.flush()
    else:
        print '\r'
        print res
         
r = mTasks.delay()
print r.get(on_message=pm, propagate=False)

定时/周期性任务

vim celery_config.py
 
from datetime import timedelta
from celery.schedules import crontab
CELERYBEAT_SCHEDULE = {
    'ptask': {
        'task': 'tasks.period_task',
        'schedule': timedelta(seconds=5), # 间隔执行的时间可以用 datetime.timedelta 或者 crontab
    },
}
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

定时参考:http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#crontab-schedules

vim tasks.py
 
app = Celery('tasks', backend='redis://localhost:6379/0', broker='redis://localhost:6379/0')
app.config_from_object('celery_config')
 
@app.task(bind=True)
def period_task(self):
    print 'period task done: {0}'.format(self.request.id)

链式任务

def update_page_info(url):
    # fetch_page -> parse_page -> store_page
    chain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url)
    chain()
    # 或
    #fetch_page.apply_async((url), link=[parse_page.s(), store_page_info.s(url)])
     
@app.task()
def fetch_page(url):
    return myhttplib.get(url)
     
@app.task()
def parse_page(page):
    return myparser.parse_document(page)
     
@app.task(ignore_result=True)
def store_page_info(info, url):
    PageInfo.objects.create(url=url, info=info)

链式任务中前一个任务的返回值默认是下一个任务的输入值之一 ( 不想让返回值做默认参数可以用 si() 或者 s(immutable=True) 的方式调用 )。

这里的 s() 是方法 celery.signature() 的快捷调用方式,signature 具体作用就是生成一个包含调用任务及其调用参数与其他信息的对象,有点类似偏函数的概念:先不执行任务,而是把任务与任务参数存起来以供其他地方调用。

多个消息队列

vim tasks.py
 
from celery import Celery
 
app = Celery(broker="redis://localhost:6379/1" )
app.config_from_object("celery_config")
 
@app.task
def t1(x, y):
    return x * y
 
@app.task
def t2(x, y, z):
   return x + y + z
 
@app.task
def add(x, y):
   return x + y
vim celery_config.py
from kombu import Queue, Exchange
 
CELERY_RESULT_BACKEND = "redis://localhost:6379/1"
CELERY_QUEUES = {
    Queue("default", Exchange("default"), routing_key = "default"),
    Queue("t1", Exchange("t1"), routing_key = "t1"),
    Queue("t2", Exchange("t2"), routing_key = "t2")
}
     
#路由
CELERY_ROUTES = {
    "tasks.t1":{"queue": "t1",  "routing_key": "t1"},
    "tasks.t2":{"queue": "t2",  "routing_key": "t2"}
}
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT=['json']
 
# CELERY_TIMEZONE = 'UTC'
# 定时任务
# CELERYBEAT_SCHEDULE = 
#    't1_schedule' : {
#        'task':'tasks.t1',
#        'schedule':20,
3        'args':(5,6)
#     }

启动worker

celery -A tasks worker -l info -n workerA.%h -Q t1
celery -A tasks worker -l info -n workerA.%h -Q t2
celery -A tasks worker -l info -n worker.%h -Q celery # 默认的
# celery -A tasks beat

触发执行

vim trigger.py 
 
import time
from tasks import t1,t2,add
 
r1 = t1.delay(10, 20)
time.sleep(1)
print (r1.result)
print (r1.status)
 
r2 = t2.delay(10, 20, 30)
time.sleep(1)
print (r2.result)
print (r2.status)
 
r3 = add.delay(100, 200)
time.sleep(1)
print (r3.result)
print (r3.status)

任务调用方式

add.delay(2, 2)
add.apply_async(2, 2)

实际上 delay 只是 apply_async 的快捷方式,只是 apply_async 可以进行更多的任务属性设置,更多参考

关于 AsyncResult 

AsyncResult 主要用来储存任务执行信息与执行结果,有点类似 tornado 中的 Future 对象,都有储存异步结果与任务执行状态的功能。更多参考

Celery Flower

flower 是一个 celery 的监控工具,它提供了一个图形用户界面,可以极大的方便我们监控任务的执行过程, 执行细节及历史记录,还提供了统计功能。

flower 安装

pip install flower    #方法1
easy_install flower  #方法2

flower 使用简介,首先启动通过命令行启动 flower 进程:

flower -A proj --port=5555

然后打开浏览器 http://localhost:5555/

celery flower

Celery 任务类型

apply_async

调用一个异步任务,这也是最常用的任务类型之一,delay 与它的作用相同,只是 delay 不支持 apply_async 中额外的参数。该方法有几个比较重要的参数,在实际应用中会经常用到:

countdown: 任务延迟执行的秒数,默认立即执行; eta:任务被执行的绝对时间

crontab 计划任务

Celery 同样也支持定时任务:

from datetime import timedelta
from celery.schedules import crontab

app.conf.beat_schedule = {
    # Executes every Monday morning at 7:30 A.M
    'add-every-monday-morning': {
        'task': 'tasks.add',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': (20, 20),
    },
    # execute every ten minutes
   'every_ten_minutes': {
        'task': 'worker.cntv.cntv_test',
        'schedule': timedelta(minutes=10),
        'args': ('args1',),
        'options': {
            'queue': 'queue_name'
        }
    },
}

要启动定时任务,需要启动一个心跳进程,假设

celery beat -A celery_app.celery_config -s /path/to/celerybeat-schedule -l info

其中 -s 参数指定 celerybeat 文件保存的位置。beat 主要的功能就是将 task 下发到 broker 中,让 worker 去消费。

取消队列中任务

取消队列中任务,可以使用命令行,也可以导入 celery app 然后使用 control()

celery -A proj -Q queue_name purge      # 取消队列 queue_name 中的任务
# or
from proj.celery import app
app.control.purge()

From:stackoverflow

celery 在 supervisor 中 root 不能启动问题

Celery 不能用 root 用户启动,所以在 supervisor 中启动时会报错:

If you really want to continue then you have to set the C_FORCE_ROOT
environment variable (but please think about this before you do).

User information: uid=0 euid=0 gid=0 egid=0

解决办法

from celery import Celery, platforms
platforms.C_FORCE_ROOT = True

或者 supervisor 配置中

environment=C_FORCE_ROOT="true"

参考文献

赞(1)
未经允许不得转载:北凉柿子 » Python异步任务神器celery
分享到: 更多 (0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址