Celery 分布式任务队列

Celery 是一款非常简单、灵活、可靠的分布式系统,可用于处理大量消息,并且提供了一整套操作此系统的一系列工具。

Celery 是一款消息队列工具,可用于处理实时数据以及任务调度。

任务队列

任务队列一般用于线程或计算机之间分配工作的一种机制。

  1. 任务队列的输入是一个称为任务的工作单元
  2. 有专门的工作进行不断的监视任务队列
  3. 进行执行新的任务工作。

工作方式

  1. Celery 通过消息机制进行通信,通常使用中间人(Broker)作为客户端和职程(Worker)调节
  2. 启动一个任务,客户端向消息队列发送一条消息
  3. 然后中间人(Broker)将消息传递给一个职程(Worker
  4. 最后由职程(Worker)进行执行中间人(Broker)分配的任务。

Celery 可以有多个职程(Worker)和中间人(Broker),用来提高 Celery 的高可用性以及横向扩展能力。

Celery 是用 Python 编写的,但协议可以用任何语言实现。

除了 Python 语言实现之外,还有 Node.js 的 node-celery 和 php 的 celery-php。

可以通过暴露 HTTP 的方式进行,任务交互以及其它语言的集成开发。

环境

Celery 需要消息中间件来进行发送和接收消息。

RabbitMQRedis 中间人的功能比较齐全,但也支持其它的实验性的解决方案,其中包括 SQLite 进行本地开发。

Celery 可以在一台机器上运行,也可以在多台机器上运行,甚至可以跨数据中心运行。


安装

您可以通过 python 的 pip 安装或通过源代码进行安装 Celery 。

使用 pip 进行安装

pip install -U Celery

捆绑

Celery 自定义了一组用于安装 Celery 和特定功能的依赖。

可以在中括号加入需要依赖,并可以通过逗号分割需要安装的多个依赖包。

pip install "celery[librabbitmq]"
pip install "celery[librabbitmq,redis,auth,msgpack]"

目前发行的依赖包如下:

序列化

celery[auth]:使用 auth 保证程序的安全 celery[msgpack]:使用 msgpack 序列化 celery[yaml]:使用 yaml 序列化

并发

celery[eventlet]:基于 eventle 的并发池 celery[gevent]:基于 gevent 的并发池

传输和后端

celery[librabbitmq]:使用 librabbitmq 库 celery[redis]:使用 Redis 进行消息传输或后端结果存储 celery[sqs]:使用 Amazon SQS 进行消息传输(实验阶段) celery[tblib]:使用 task_remote_tracebacks 的功能 celery[memcache]:使用 Memcached 作为后端结果存储(使用的是 pylibmc) celery[pymemcache]:使用 Memcached 作为后端结果存储(纯 Python 实现) celery[cassandra]:使用 Apache Cassandra 作为后端结果存储 celery[couchbase]:使用 CouchBase 作为后端结果存储 celery[arangodb]:使用 ArangoDB 作为后端结果存储 celery[elasticsearch]:使用 ElasticSearch 作为后端结果存储 celery[riak]:使用 Riak 作为后端结果存储 celery[dynamodb]:使用 AWS DynamoDB 作为后端结果存储 celery[zookeeper]:使用 Zookeeper 进行消息传输 celery[sqlalchemy]:使用 SQLlchemy 作为后端结果存储(支持) celery[pyro]:使用 Pyro4 进行消息传输(实验阶段) celery[slmq]:使用 SoftLayer Message Queue 进行消息传输(实验阶段) celery[consul]:使用 Consul.io Key/Value 进行存储传输消息或后端结果存储(实验阶段) celery[django]:支持比较低的 Django 版本,不建议您在项目中使用它,它仅供参考