使用

Celery 是一个包含一系列的消息任务队列。您可以不用了解内部的原理直接使用,它的使用时非常简单的。

此外 Celery 可以快速与产品扩展与集成,以及 Celery 提供了一系列 Celery 可能会用到的工具和技术支持方案。

  1. 选择并且安装一个消息中间件(Broker)
  2. 安装 Celery 并且创建第一个任务
  3. 运行职程(Worker)以及调用任务
  4. 跟踪任务的情况以及返回值

选择中间人(Broker)

Celery 需要一个中间件来进行接收和发送消息,通常以独立的服务形式出现,成为 消息中间人(Broker

以下有几种选择

RabbitMQ

RabbitMQ 的功能比较齐全、稳定、便于安装。

在生产环境来说是首选的,有关 Celery 中使用 RabbitMQ 的详细信息:

  1. 使用 RabbitMQ

    如果您使用的是 Ubuntu 或 Debian ,可以通过以下命令进行安装 RabbitMQ: $ sudo apt-get install rabbitmq-server

    如果在 Docker 中运行 RabbitMQ ,可以使用以下命令: $ docker run -d -p 5462:5462 rabbitmq

    命令执行完毕之后,中间人(Broker)会在后台继续运行,准备输出一条 Starting rabbitmq-server: SUCCESS 的消息。

  2. 使用 Redis

    Redis 功能比较全 在 Docker 中运行 Redis ,可以通过以下命令实现: $ docker run -d -p 6379:6379 redis

  3. 除以上提到的中间人(Broker)之外,还有处于实验阶段的中间人(Broker),其中包含 Amazon SQS

    相关完整的中间人(Broker)列表,请查阅中间人:Broker。


应用

创建第一个 Celery 实例程序

把创建 Celery 程序成为 Celery 应用或直接简称为 app,创建的第一个实例程序可能需要包含 Celery 中执行操作的所有入口点

例如 创建任务、管理职程(Worker)等,所以必须要导入 Celery 模块。

在本教程中将所有的内容,保存为一个 app 文件中。 针对大型的项目,可能需要创建 独立的模块。

首先创建 tasks.py

from celery import Celery
app = Celery('tasks', broker='amqp://guest@localhost//')

@app.task
def add(x, y):
    return x + y
  1. 第一个参数为当前模块的名称,只有在 __main__ 模块中定义任务时才会生产名称。

  2. 第二个参数为中间人(Broker)的链接 URL ,实例中使用的 RabbitMQ(Celery 默认使用的也是 RabbitMQ)。

更多相关的 Celery 中间人(Broker)的选择方案,可查阅上面的中间人(Broker)。

例如,对于 RabbitMQ 可以写为 amqp://localhost ,使用 Redis 可以写为 redis://localhost。 创建了一个名称为 add 的任务,返回的俩个数字的和。


运行 Celery 职程(Worker)服务

现在可以使用 worker 参数进行执行刚刚创建职程(Worker)

$ celery -A tasks worker --loglevel=info

在生产环境中,如果需要将职程(Worker)作为守护进程在后台运行,可以使用平台提供的工具来进行实现

或使用类似 supervisord 这样的工具来进行管理(详情: 守护进程:Daemonization 部分)

  1. 关于 Celery 可用的命令完整列表,可以通过以下命令进行查看:

$ celery worker --help

  1. 也可以通过以下命令查看一些 Celery 帮助选项:

$ celery help


调用任务

  1. 需要调用创建的实例任务,可以通过 delay() 进行调用。
  2. delay()apply_async() 的快捷方法,可以更好的控制任务的执行(详情:调用任务:Calling Tasks
from tasks import add
add.delay(4, 4)

该任务已经有职程(Worker)开始处理,可以通过控制台输出的日志进行查看执行情况。 调用任务会返回一个 AsyncResult 的实例,用于检测任务的状态,等待任务完成获取返回值(如果任务执行失败,会抛出异常)。

默认这个功能是不开启的,如果开启则需要配置 Celery结果后端,下一小节会详细说明。


保存结果

如果需要跟踪任务的状态,Celery 需要在某处存储任务的状态信息。

Celery 内置了一些后端结果:

  1. SQLAlchemy/Django ORM
  2. Memcached
  3. Redis
  4. RPC (RabbitMQ/AMQP)
  5. 自定义的后端结果存储中间件。

针对本次实例,使用 RPC 作为结果后端,将状态信息作为临时消息回传。

  • 后端通过 backend 参数指定给 Celery(或者通过配置模块中的result_backend 选项设定):

app = Celery('tasks', backend='rpc://', broker='pyamqp://')

result_backend - https://www.celerycn.io/yong-hu-zhi-nan/ren-wu-tasks

  • 现在已经配置结果后端,重新调用执行任务。 会得到调用任务后返回的一个 AsyncResult 实例:

>>> result = add.delay(4, 4)

  • ready() 可以检测是否已经处理完毕:

```python

result.ready() False ```

  • 整个任务执行过程为异步的,如果一直等待任务完成,会将异步调用转换为同步调用:

>>> result.get(timeout=1)

  • 如果任务出现异常,get() 会再次引发异常,可以通过 propagate 参数进行覆盖:

>>> result.get(propagate=False)

  • 如果任务出现异常,可以通过以下命令进行回溯:

>>> result.traceback

警告:

如果后端使用资源进行存储结果,必须要针对调用任务后返回每一个 AsyncResult 实例调用 get()forget() ,进行资源释放。

完整的结果对象,请参阅:celery.result。