澳门新萄京官方网站-www.8455.com-澳门新萄京赌场网址

澳门新萄京官方网站:Celery学习笔记,Django使用

2019-05-18 作者:www.8455.com   |   浏览(196)

一、简介

  Celery是由Python开发、简单、灵活、可靠的分布式任务队列,其本质是生产者消费者模型,生产者发送任务到消息队列,消费者负责处理任务。Celery侧重于实时操作,但对调度支持也很好,其每天可以处理数以百万计的任务。特点:

  • 简单:熟悉celery的工作流程后,配置使用简单
  • 高可用:当任务执行失败或执行过程中发生连接中断,celery会自动尝试重新执行任务
  • 快速:一个单进程的celery每分钟可处理上百万个任务
  • 灵活:几乎celery的各个组件都可以被扩展及自定制

应用场景举例:

  1.web应用:当用户在网站进行某个操作需要很长时间完成时,我们可以将这种操作交给Celery执行,直接返回给用户,等到Celery执行完成以后通知用户,大大提好网站的并发以及用户的体验感。

  2.任务场景:比如在运维场景下需要批量在几百台机器执行某些命令或者任务,此时Celery可以轻松搞定。

  3.定时任务:向定时导数据报表、定时发送通知类似场景,虽然Linux的计划任务可以帮我实现,但是非常不利于管理,而Celery可以提供管理接口和丰富的API。

1 Celery简介

Celery是异步任务队列,可以独立于主进程运行,在主进程退出后,也不影响队列中的任务执行。

任务执行异常退出,重新启动后,会继续执行队列中的其他任务,同时可以缓存停止期间接收的工作任务,这个功能依赖于消息队列(MQ、Redis)。

1  Celery简介

Celery是异步任务队列,可以独立于主进程运行,在主进程退出后,也不影响队列中的任务执行。

任务执行异常退出,重新启动后,会继续执行队列中的其他任务,同时可以缓存停止期间接收的工作任务,这个功能依赖于消息队列(MQ、Redis)。

 

Django使用Celery异步任务队列,djangocelery

Tips:在学习Celery过程中,使用的系统为Windows 10、Celery版本为3.1.18①、中间人使用RabbitMQ。

二、架构&工作原理

  Celery由以下三部分构成:消息中间件(Broker)、任务执行单元Worker、结果存储(Backend),如下图:

  澳门新萄京官方网站 1

工作原理:

  1. 任务模块Task包含异步任务和定时任务。其中,异步任务通常在业务逻辑中被触发并发往消息队列,而定时任务由Celery Beat进程周期性地将任务发往消息队列;
  2. 任务执行单元Worker实时监视消息队列获取队列中的任务执行;
  3. Woker执行完任务后将结果保存在Backend中;

1.1 Celery原理

澳门新萄京官方网站 2

Celery的 架构 由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

消息中间件:Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括, RabbitMQRedis ,  MongoDB  (experimental), Amazon SQS (experimental),CouchDB (experimental), SQLAlchemy (experimental),Django ORM (experimental), IronMQ。推荐使用:RabbitMQ、Redis作为消息队列。

任务执行单元:Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

任务结果存储:Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, Redis,memcached, MongoDB,SQLAlchemy, Django ORM,Apache Cassandra, IronCache

1.1  Celery原理

 澳门新萄京官方网站 3

 

Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

  • 消息中间件:Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQRedis, MongoDB (experimental), Amazon SQS (experimental),CouchDB (experimental), SQLAlchemy (experimental),Django ORM (experimental), IronMQ。推荐使用:RabbitMQ、Redis作为消息队列。
  • 任务执行单元:Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
  • 任务结果存储:Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, Redis,memcached, MongoDB,SQLAlchemy, Django ORM,Apache Cassandra, IronCache

1  Celery简介

Celery是异步任务队列,可以独立于主进程运行,在主进程退出后,也不影响队列中的任务执行。

任务执行异常退出,重新启动后,会继续执行队列中的其他任务,同时可以缓存停止期间接收的工作任务,这个功能依赖于消息队列(MQ、Redis)。

 

C:Usersfoolf>celery --version
3.1.18 (Cipater)

消息中间件Broker

  消息中间件Broker官方提供了很多备选方案,支持RabbitMQ、Redis、Amazon SQS、MongoDB、Memcached 等,官方推荐RabbitMQ。

1.2Celery适用场景

异步任务处理:例如给注册用户发送短消息或者确认邮件任务。 大型任务:执行时间较长的任务,例如视频和图片处理,添加水印和转码等,需要执行任务时间长。 定时执行的任务:支持任务的定时执行和设定时间执行。例如性能压测定时执行。

1.2     Celery适用场景

  • 异步任务处理:例如给注册用户发送短消息或者确认邮件任务。
  • 大型任务:执行时间较长的任务,例如视频和图片处理,添加水印和转码等,需要执行任务时间长。
  • 定时执行的任务:支持任务的定时执行和设定时间执行。例如性能压测定时执行。

 

1.1  Celery原理

 澳门新萄京官方网站 4

 

Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

  • 消息中间件:Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQRedis, MongoDB (experimental), Amazon SQS (experimental),CouchDB (experimental), SQLAlchemy (experimental),Django ORM (experimental), IronMQ。推荐使用:RabbitMQ、Redis作为消息队列。
  • 任务执行单元:Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
  • 任务结果存储:Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, Redis,memcached, MongoDB,SQLAlchemy, Django ORM,Apache Cassandra, IronCache

什么是任务队列

任务队列是一种在线程或者机器之间分发任务的机制。
消息队列的输入是工作的一个单元,称为任务,独立的职程(Worker)进程持续监视队列中是否有需要处理的新任务。

Celery使用消息通信,通信一般使用中间人(Broker)在客户端和职程之间斡旋。这个过程从客户端想队列中添加消息开始,之后中间人将消息派送给职程。

Celery是Python编写的,但协议可以使用任何语言实现。

任务执行单元Worker

  Worker是任务执行单元,负责从消息队列中取出任务执行,它可以启动一个或者多个,也可以启动在不同的机器节点,这就是其实现分布式的核心。

 2Celery开发环境准备

2      Celery开发环境准备

1.2     Celery适用场景

  • 异步任务处理:例如给注册用户发送短消息或者确认邮件任务。
  • 大型任务:执行时间较长的任务,例如视频和图片处理,添加水印和转码等,需要执行任务时间长。
  • 定时执行的任务:支持任务的定时执行和设定时间执行。例如性能压测定时执行。

 

需要什么

Celery需要一个发送和接受消息的传述者。RabbbitMQ和Redis中间人的消息支持所有的特性,我们主要是使用RabbitMQ作中间人(关于中间人RabbitMQ的安装可以网上搜索,有很多详细的教程)。

结果存储Backend

  Backend结果存储官方也提供了诸多的存储方式支持:RabbitMQ、 Redis、Memcached,SQLAlchemy, Django ORM、Apache Cassandra、Elasticsearch。

 2.1 环境准备

软件名称

版本号

说明

Linux

Centos 6.5(64bit)

操作系统

Python

3.5.2

Django

1.10

Web框架

Celery

4.0.2

异步任务队列

Redis

2.4

消息队列

2.1     环境准备

软件名称

版本号

说明

Linux

Centos 6.5(64bit)

操作系统

Python

3.5.2

 

Django

1.10

Web框架

Celery

4.0.2

异步任务队列

Redis

2.4

消息队列

 

2      Celery开发环境准备

Celery优势

在程序运行过程中,我们经常会遇到一些耗时耗资源的操作,为了避免阻塞主程序,我们会采用异步或者多线程来处理任务。比如在主程序中调用一个函数,并从该函数中获取函数返回值。如果这个函数不能很快执行完成并返回,那么主程序就会阻塞,知直到函数返回。
Celery是一个强大的分布式任务队列,它可以让人物的执行完全脱离主程序,甚至可以被分配到其他的主机上运行。

三、安装使用 

  这里我使用的redis作为消息中间件,redis安装可以参考

Celery安装: 

pip3 install celery

2.2     Celery安装

使用方法介绍:

Celery的运行依赖消息队列,使用时需要安装redis或者rabbit。

这里我们使用Redis。安装redis库:

sudo yum install redis

启动redis:

sudo service redis start

安装celery库

sudo pip install celery==4.0.2

2.2     Celery安装

使用方法介绍:

Celery的运行依赖消息队列,使用时需要安装redis或者rabbit。

这里我们使用Redis。安装redis库:

sudo yum install redis

  

启动redis:

sudo service redis start

 

安装celery库

sudo pip install celery==4.0.2

 

2.1     环境准备

软件名称

版本号

说明

Linux

Centos 6.5(64bit)

操作系统

Python

3.5.2

 

Django

1.10

Web框架

Celery

4.0.2

异步任务队列

Redis

2.4

消息队列

 

Celery架构:

架构

从图上可以看出Celery包含几个模块:

  • 任务模块
    主要包异步任务和定时任务,异步任务通常在业务逻辑中被触发并发送到任务队列中,而定时任务是由Celery Beat进程周期性的将任务发往任务队列。
  • 消息中间件Broker
    Broker就是任务调度队列,接受任务生产者发送来的消息,将任务存入队列,之所以需要中间人的原因是Celrey本身是不提供消息队列的服务,所以需要第三方组件实现。
  • 任务执行单元Worker
    Worker是执行任务的单元,它实时监控消息队列,如果有任务就获取任务并执行它。
  • 任务存储Backend
    Backend用于存储任务只想的结果,存储可以使用RabbitMQ或者Redis或者数据库等。

简单使用

  目录结构:

project/
├── __init__.py  
├── config.py
└── tasks.py

各目录文件说明:

__init__.py:初始化Celery以及加载配置文件

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd
from celery import Celery
app = Celery('project')                                # 创建 Celery 实例
app.config_from_object('project.config')               # 加载配置模块

config.py:  Celery相关配置文件,更多配置参考:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd

BROKER_URL = 'redis://10.1.210.69:6379/0' # Broker配置,使用Redis作为消息中间件

CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,这里使用redis

CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案

CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间

CELERY_TIMEZONE='Asia/Shanghai'   # 时区配置

CELERY_IMPORTS = (     # 指定导入的任务模块,可以指定多个
    'project.tasks',
)

tasks.py :任务定义文件

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd

from project import app
@app.task
def show_name(name):
    return name

启动Worker:

celery worker -A project -l debug

各个参数含义:

  worker: 代表第启动的角色是work当然还有beat等其他角色;

  -A :项目路径,这里我的目录是project

  -l:启动的日志级别,更多参数使用celery --help查看

查看日志输出,会发现我们定义的任务,以及相关配置:

澳门新萄京官方网站 5

 

  虽然启动了worker,但是我们还需要通过delay或apply_async来将任务添加到worker中,这里我们通过交互式方法添加任务,并返回AsyncResult对象,通过AsyncResult对象获取结果:

澳门新萄京官方网站 6

AsyncResult除了get方法用于常用获取结果方法外还提以下常用方法或属性:

  • state: 返回任务状态;
  • task_id: 返回任务id;
  • result: 返回任务结果,同get()方法;
  • ready(): 判断任务是否以及有结果,有结果为True,否则False;
  • info(): 获取任务信息,默认为结果;
  • wait(t): 等待t秒后获取结果,若任务执行完毕,则不等待直接获取结果,若任务在执行中,则wait期间一直阻塞,直到超时报错;
  • successfu(): 判断任务是否成功,成功为True,否则为False;

3Celery单独执行任务

3      Celery单独执行任务

2.2     Celery安装

使用方法介绍:

Celery的运行依赖消息队列,使用时需要安装redis或者rabbit。

这里我们使用Redis。安装redis库:

sudo yum install redis

  

启动redis:

sudo service redis start

 

安装celery库

sudo pip install celery==4.0.2

 

安装Celery

Celery已经提交到Pypi上,所以我们可是使用Python的工具pip来安装。

pip install celery==3.1.18

上面的安装命令,如果没有指定版本,系统会默认安装最新版本的Celery,但是这里可能在后面的学习中遇到问题。关于问题放到最后解释,暂且我们先安装3.1.18版本的Celery。

四、进阶使用

  对于普通的任务来说可能满足不了我们的任务需求,所以还需要了解一些进阶用法,Celery提供了诸多调度方式,例如任务编排、根据任务状态执行不同的操作、重试机制等,以下会对常用高阶用法进行讲述。

 3.1编写任务

创建task.py文件

说明:这里初始Celery实例时就加载了配置,使用的redis作为消息队列和存储任务结果。

澳门新萄京官方网站 7

运行celery:

$ celery -A task worker --loglevel=info

看到下面的打印,说明celery成功运行。

澳门新萄京官方网站 8

3.1     编写任务

创建task.py文件

说明:这里初始Celery实例时就加载了配置,使用的redis作为消息队列和存储任务结果。

 澳门新萄京官方网站 9

 

运行celery:

$ celery -A task worker --loglevel=info

看到下面的打印,说明celery成功运行。

 澳门新萄京官方网站 10

3      Celery单独执行任务

创建Celery实例

如果你已经安装好了Celery,那么现在就可以创建Celery实例了:
tasks.py

# coding:utf-8

from celery import Celery

app = Celery('tasks', broker='amqp://guest@localhost//')

@app.task
def add(x, y):
    return x   y

Celery的第一个参数是当前模块名称,这个参数是必须的,第二个参数是中间人关键字参数,指定我们所使用的的消息中间人的URL,这里我们使用的是RabbitMQ。我们定义了一个单一的任务,称为add,返回两个数字的和。

定时任务&计划任务

  Celery的提供的定时任务主要靠schedules来完成,通过beat组件周期性将任务发送给woker执行。在示例中,新建文件period_task.py,并添加任务到配置文件中:

period_task.py:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd
from project import app
from celery.schedules import crontab

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(10.0, add.s(1,3), name='1 3=') # 每10秒执行add
    sender.add_periodic_task(
        crontab(hour=16, minute=56, day_of_week=1),      #每周一下午四点五十六执行sayhai
        sayhi.s('wd'),name='say_hi'
    )



@app.task
def add(x,y):
    print(x y)
    return x y


@app.task
def sayhi(name):
    return 'hello %s' % name

config.py

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd

BROKER_URL = 'redis://10.1.210.69:6379/0' # Broker配置,使用Redis作为消息中间件

CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,这里使用redis

CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案

CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间

CELERY_TIMEZONE='Asia/Shanghai'   # 时区配置

CELERY_IMPORTS = (     # 指定导入的任务模块,可以指定多个
    'project.tasks',
    'project.period_task', #定时任务
)

启动worker和beat:

celery worker -A project -l debug #启动work
celery beat -A  project.period_task -l  debug #启动beat,注意此时对应的文件路径

我们可以观察worker日志:

澳门新萄京官方网站 11

还可以通过配置文件方式指定定时和计划任务,此时的配置文件如下:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd

from project import app
from celery.schedules import crontab

BROKER_URL = 'redis://10.1.210.69:6379/0' # Broker配置,使用Redis作为消息中间件

CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,这里使用redis

CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案

CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间

CELERY_TIMEZONE='Asia/Shanghai'   # 时区配置

CELERY_IMPORTS = (     # 指定导入的任务模块,可以指定多个
    'project.tasks',
    'project.period_task',
)

app.conf.beat_schedule = {
    'period_add_task': {    # 计划任务
        'task': 'project.period_task.add',  #任务路径
        'schedule': crontab(hour=18, minute=16, day_of_week=1),
        'args': (3, 4),
    },
'add-every-30-seconds': {          # 每10秒执行
        'task': 'project.period_task.sayhi',  #任务路径
        'schedule': 10.0,
        'args': ('wd',)
    },
}

此时的period_task.py只需要注册到woker中就行了,如下:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd
from project import app

@app.task
def add(x,y):
    print(x y)
    return x y


@app.task
def sayhi(name):
    return 'hello %s' % name

同样启动worker和beat结果和第一种方式一样。更多详细的内容请参考:

3.2 调用任务

直接打开python交互命令行

执行下面代码:

澳门新萄京官方网站 12

可以celery的窗口看到任务的执行信息

澳门新萄京官方网站 13

任务执行状态监控和获取结果:

澳门新萄京官方网站 14

3.2     调用任务

 直接打开python交互命令行

 执行下面代码:

澳门新萄京官方网站 15

 

可以celery的窗口看到任务的执行信息

 澳门新萄京官方网站 16

 

任务执行状态监控和获取结果:

 澳门新萄京官方网站 17

 

 

3.1     编写任务

创建task.py文件

说明:这里初始Celery实例时就加载了配置,使用的redis作为消息队列和存储任务结果。

 澳门新萄京官方网站 18

 

运行celery:

$ celery -A task worker --loglevel=info

看到下面的打印,说明celery成功运行。

 澳门新萄京官方网站 19

启动Celery职程服务器(Worker)

 celery -A tasks worker --loglevel=info

参数-A指定了Celery实例的位置,这个实例是在tasks.py文件中,Celery会自动在该文件中查找Celery对象实例。
--loglevel指定日志的级别,默认是warning。
如果启动正常,就会看到下面的输出。

running

任务绑定

  Celery可通过任务绑定到实例获取到任务的上下文,这样我们可以在任务运行时候获取到任务的状态,记录相关日志等。

修改任务中的period_task.py,如下:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd
from project import app
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)
@app.task(bind=True)  # 绑定任务
def add(self,x,y):
    logger.info(self.request.__dict__)  #打印日志
    try:
        a=[]
        a[10]==1
    except Exception as e:
        raise self.retry(exc=e, countdown=5, max_retries=3) # 出错每5秒尝试一次,总共尝试3次
    return x y

在以上代码中,通过bind参数将任务绑定,self指任务的上下文,通过self获取任务状态,同时在任务出错时进行任务重试,我们观察日志:

澳门新萄京官方网站 20

3.3任务调用方法总结

有两种方法:

delay和apply_async ,delay方法是apply_async简化版。

add.delay(2, 2)
add.apply_async((2, 2))
add.apply_async((2, 2), queue='lopri')

delay方法是apply_async简化版本。

apply_async方法是可以带非常多的配置参数,包括指定队列等

Queue 指定队列名称,可以把不同任务分配到不同的队列 3.4     任务状态

每个任务有三种状态:PENDING -> STARTED -> SUCCESS

任务查询状态:res.state

来查询任务的状态

澳门新萄京官方网站 21

3.3     任务调用方法总结

有两种方法:

delay和apply_async ,delay方法是apply_async简化版。

add.delay(2, 2)
add.apply_async((2, 2))
add.apply_async((2, 2), queue='lopri')

 

delay方法是apply_async简化版本。

apply_async方法是可以带非常多的配置参数,包括指定队列等

  • Queue 指定队列名称,可以把不同任务分配到不同的队列

 

3.2     调用任务

 直接打开python交互命令行

 执行下面代码:

澳门新萄京官方网站 22

 

可以celery的窗口看到任务的执行信息

 澳门新萄京官方网站 23

 

任务执行状态监控和获取结果:

 澳门新萄京官方网站 24

 

 

调用任务

现在我们已经开启了一个Worker了,这样我们可以在应用程序中使用 delay()或者 apply_async()方法来调用任务。
在tasks.py文件所在的目录打开终端。

>>> from tasks import add
>>> add.delay(2, 8)
<AsyncResult: 1b50f449-8fa2-478a-9eea-561a3c29fd43>
>>>

我们先从tasks.py文件中导入add任务对象,然后使用delay()方法将任务发送到消息中间件,我们之前开启的那个Worker会一直监控任务队列,知道有任务到来,就会执行。
我们到Worker中可以看到多了几条日志信息:

[2017-03-09 19:45:35,351: INFO/MainProcess] Received task: tasks.add[1b50f449-8fa2-478a-9eea-561a3c29fd43]
[2017-03-09 19:45:40,920: INFO/MainProcess] Task tasks.add[1b50f449-8fa2-478a-9eea-561a3c29fd43] succeeded in 5.56299996376s: 10

说明我们的任务被调度并执行成功了。

内置钩子函数

  Celery在执行任务时候,提供了钩子方法用于在任务执行完成时候进行对应的操作,在Task源码中提供了很多状态钩子函数如:on_success(成功后执行)、on_failure(失败时候执行)、on_retry(任务重试时候执行)、after_return(任务返回时候执行),在进行使用是我们只需要重写这些方法,完成相应的操作即可。

在以下示例中,我们继续修改period_task.py,分别定义三个任务来演示任务失败、重试、任务成功后执行的操作:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd
from project import app
from celery.utils.log import get_task_logger
from celery import Task

logger = get_task_logger(__name__)

class demotask(Task):

    def on_success(self, retval, task_id, args, kwargs):   # 任务成功执行
        logger.info('task id:{} , arg:{} , successful !'.format(task_id,args))



    def on_failure(self, exc, task_id, args, kwargs, einfo):  #任务失败执行
        logger.info('task id:{} , arg:{} , failed ! erros : {}' .format(task_id,args,exc))


    def on_retry(self, exc, task_id, args, kwargs, einfo):    #任务重试执行
        logger.info('task id:{} , arg:{} , retry !  einfo: {}'.format(task_id, args, exc))

@app.task(base=demotask,bind=True)
def add(self,x,y):
    try:
        a=[]
        a[10]==1
    except Exception as e:
        raise self.retry(exc=e, countdown=5, max_retries=1) # 出错每5秒尝试一次,总共尝试1次
    return x y

@app.task(base=demotask)
def sayhi(name):
    a=[]
    a[10]==1
    return 'hi {}'.format(name)

@app.task(base=demotask)
def sum(a,b):
    return 'a b={} '.format(a b)

此时的配置文件config.py:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd

from project import app
from celery.schedules import crontab

BROKER_URL = 'redis://10.1.210.69:6379/0' # Broker配置,使用Redis作为消息中间件

CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,这里使用redis

CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案

CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间

CELERY_TIMEZONE='Asia/Shanghai'   # 时区配置

CELERY_IMPORTS = (     # 指定导入的任务模块,可以指定多个
    'project.tasks',
    'project.period_task',
)

app.conf.beat_schedule = {
'add': {          # 每10秒执行
        'task': 'project.period_task.add',  #任务路径
        'schedule': 10.0,
        'args': (10,12),
    },
'sayhi': {          # 每10秒执行
        'task': 'project.period_task.sayhi',  #任务路径
        'schedule': 10.0,
        'args': ('wd',),
    },
'sum': {          # 每10秒执行
        'task': 'project.period_task.sum',  #任务路径
        'schedule': 10.0,
        'args': (1,3),
    },
}

然后重启worker和beat,查看日志:

澳门新萄京官方网站 25

 

4与Django集成

上面简单介绍了celery异步任务的基本方法,结合我们实际的应用,我们需要与Django一起使用,下面介绍如何与Django结合。

3.4     任务状态

每个任务有三种状态:

PENDING -> STARTED -> SUCCESS

 

任务查询状态:

res.state

 

来查询任务的状态

 澳门新萄京官方网站 26

 

3.3     任务调用方法总结

有两种方法:

delay和apply_async ,delay方法是apply_async简化版。

add.delay(2, 2)
add.apply_async((2, 2))
add.apply_async((2, 2), queue='lopri')

 

delay方法是apply_async简化版本。

apply_async方法是可以带非常多的配置参数,包括指定队列等

  • Queue 指定队列名称,可以把不同任务分配到不同的队列

 

获得结果

刚我们在命令行中调用任务,很明显任务执行完成,但是我们并不知道任务执行后得到的结果是什么。如果我们想获得执行后的结果可以这样:

>>> result = add.delay(2, 8)
>>> result.ready()  # 查看任务执行的状态,此刻任务没有执行完成,显示False
False
>>> result.ready()
True  # 表示任务已经执行完成
>>> result.get()  # 获取任务的执行结果
10
>>>

任务编排

  在很多情况下,一个任务需要由多个子任务或者一个任务需要很多步骤才能完成,Celery同样也能实现这样的任务,完成这类型的任务通过以下模块完成:

  • group: 并行调度任务

  • chain: 链式任务调度

  • chord: 类似group,但分header和body2个部分,header可以是一个group任务,执行完成后调用body的任务

  • map: 映射调度,通过输入多个入参来多次调度同一个任务

  • starmap: 类似map,入参类似*args

  • chunks: 将任务按照一定数量进行分组

 

修改tasks.py:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd
from project import app

@app.task
def add(x,y):
    return x y


@app.task
def mul(x,y):
    return x*y


@app.task
def sum(data_list):
    res=0
    for i in data_list:
        res =i
    return res

 

group: 组任务,组内每个任务并行执行

和project同级目录新建consumer.py如下:

from celery import group
from project.tasks import add,mul,sum
res = group(add.s(1,2),add.s(1,2))()  # 任务 [1 2,1 2] 
while True:
    if res.ready():
        print('res:{}'.format(res.get()))
        break

结果:

澳门新萄京官方网站 27

 

chain:链式任务

链式任务中,默认上一个任务的返回结果作为参数传递给子任务

from celery import chain
from project.tasks import add,mul,sum
res = chain(add.s(1,2),add.s(3),mul.s(3))()  # 任务((1 2) 3)*3
while True:
    if res.ready():
        print('res:{}'.format(res.get()))
        break
#结果
#res:18

还可以使用|表示链式任务,上面任务也可以表示为:

res = (add.s(1,2) | add.s(3) | (mul.s(3)))()
res.get()

 

chord:任务分割,分为header和body两部分,hearder任务执行完在执行body,其中hearder返回结果作为参数传递给body

from celery import chord
from project.tasks import add,mul,sum
res = chord(header=[add.s(1,2),mul.s(3,4)],body=sum.s())()  # 任务(1 2) (3*4)
while True:
    if res.ready():
        print('res:{}'.format(res.get()))
        break

#结果:
#res:15

 

chunks:任务分组,按照任务的个数分组

from project.tasks import add,mul,sum
res = add.chunks(zip(range(5),range(5)),4)()  # 4 代表每组的任务的个数
while True:
    if res.ready():
        print('res:{}'.format(res.get()))
        break

结果:

澳门新萄京官方网站 28

 

4.1与Django集成方法

与Django集成有两种方法:

  1. Django 1.8 以上版本:与Celery 4.0版本集成
  2. Django 1.8 以下版本:与Celery3.1版本集成,使用django-celery库

今天我们介绍celery4.0 和django 1.8以上版本集成方法。

4      与Django集成

上面简单介绍了celery异步任务的基本方法,结合我们实际的应用,我们需要与Django一起使用,下面介绍如何与Django结合。

3.4     任务状态

每个任务有三种状态:

PENDING -> STARTED -> SUCCESS

 

任务查询状态:

res.state

 

来查询任务的状态

 澳门新萄京官方网站 29

 

注解:

①:之前我是在windows下学习的Celery,安装的Celery版本是4.0.2;在运行Worker过程中遇到如下ed错误:

I:Celerycelery-examples>celery -A tasks worker --loglevel=info

 -------------- celery@DESKTOP-N53SFFK v4.0.2 (latentcall)
---- **** -----
--- * ***  * -- Windows-10-10.0.14393 2017-02-28 00:32:22
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x4700908
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . tasks.add

[2017-02-28 00:32:22,619: CRITICAL/MainProcess] Unrecoverable error: TypeError('argument 1 must be an integer, not _subprocess_handle',)
Traceback (most recent call last):
  File "c:python27libsite-packagesceleryworkerworker.py", line 203, in start
    self.blueprint.start(self)
  File "c:python27libsite-packagescelerybootsteps.py", line 119, in start
    step.start(parent)
  File "c:python27libsite-packagescelerybootsteps.py", line 370, in start
    return self.obj.start()
  File "c:python27libsite-packagesceleryconcurrencybase.py", line 131, in start
    self.on_start()
  File "c:python27libsite-packagesceleryconcurrencyprefork.py", line 112, in on_start
    **self.options)
  File "c:python27libsite-packagesbilliardpool.py", line 1008, in __init__
    self._create_worker_process(i)
  File "c:python27libsite-packagesbilliardpool.py", line 1117, in _create_worker_process
    w.start()
  File "c:python27libsite-packagesbilliardprocess.py", line 122, in start
    self._popen = self._Popen(self)
  File "c:python27libsite-packagesbilliardcontext.py", line 383, in _Popen
    return Popen(process_obj)
  File "c:python27libsite-packagesbilliardpopen_spawn_win32.py", line 64, in __init__
    _winapi.CloseHandle(ht)
TypeError: argument 1 must be an integer, not _subprocess_handle

I:Celerycelery-examples>Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "c:python27libsite-packagesbilliardspawn.py", line 159, in spawn_main
    new_handle = steal_handle(parent_pid, pipe_handle)
  File "c:python27libsite-packagesbilliardreduction.py", line 121, in steal_handle
    _winapi.PROCESS_DUP_HANDLE, False, source_pid)
WindowsError: [Error 87]

经过搜索发现是因为winsows是不支持celery4的。参照的回答在这https://github.com/celery/celery/issues/3551
所以我机制的将版本降低到3,运行正常。记录下来仅仅是避免其他人在学习中不会再这个小问题上浪费时间。

delay &apply_async

  对于delay和apply_async都可以用来进行任务的调度,本质上是delay对apply_async进行了再一次封装(或者可以说是快捷方式),两者都返回AsyncResult对象,以下是两个方法源码。

澳门新萄京官方网站 30澳门新萄京官方网站 31

    def delay(self, *args, **kwargs):
        """Star argument version of :meth:`apply_async`.

        Does not support the extra options enabled by :meth:`apply_async`.

        Arguments:
            *args (Any): Positional arguments passed on to the task.
            **kwargs (Any): Keyword arguments passed on to the task.
        Returns:
            celery.result.AsyncResult: Future promise.
        """
        return self.apply_async(args, kwargs)

delay源码

澳门新萄京官方网站 32澳门新萄京官方网站 33

    def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                    link=None, link_error=None, shadow=None, **options):
        """Apply tasks asynchronously by sending a message.

        Arguments:
            args (Tuple): The positional arguments to pass on to the task.

            kwargs (Dict): The keyword arguments to pass on to the task.

            countdown (float): Number of seconds into the future that the
                task should execute.  Defaults to immediate execution.

            eta (~datetime.datetime): Absolute time and date of when the task
                should be executed.  May not be specified if `countdown`
                is also supplied.

            expires (float, ~datetime.datetime): Datetime or
                seconds in the future for the task should expire.
                The task won't be executed after the expiration time.

            shadow (str): Override task name used in logs/monitoring.
                Default is retrieved from :meth:`shadow_name`.

            connection (kombu.Connection): Re-use existing broker connection
                instead of acquiring one from the connection pool.

            retry (bool): If enabled sending of the task message will be
                retried in the event of connection loss or failure.
                Default is taken from the :setting:`task_publish_retry`
                setting.  Note that you need to handle the
                producer/connection manually for this to work.

            retry_policy (Mapping): Override the retry policy used.
                See the :setting:`task_publish_retry_policy` setting.

            queue (str, kombu.Queue): The queue to route the task to.
                This must be a key present in :setting:`task_queues`, or
                :setting:`task_create_missing_queues` must be
                enabled.  See :ref:`guide-routing` for more
                information.

            exchange (str, kombu.Exchange): Named custom exchange to send the
                task to.  Usually not used in combination with the ``queue``
                argument.

            routing_key (str): Custom routing key used to route the task to a
                worker server.  If in combination with a ``queue`` argument
                only used to specify custom routing keys to topic exchanges.

            priority (int): The task priority, a number between 0 and 9.
                Defaults to the :attr:`priority` attribute.

            serializer (str): Serialization method to use.
                Can be `pickle`, `json`, `yaml`, `msgpack` or any custom
                serialization method that's been registered
                with :mod:`kombu.serialization.registry`.
                Defaults to the :attr:`serializer` attribute.

            compression (str): Optional compression method
                to use.  Can be one of ``zlib``, ``bzip2``,
                or any custom compression methods registered with
                :func:`kombu.compression.register`.
                Defaults to the :setting:`task_compression` setting.

            link (Signature): A single, or a list of tasks signatures
                to apply if the task returns successfully.

            link_error (Signature): A single, or a list of task signatures
                to apply if an error occurs while executing the task.

            producer (kombu.Producer): custom producer to use when publishing
                the task.

            add_to_parent (bool): If set to True (default) and the task
                is applied while executing another task, then the result
                will be appended to the parent tasks ``request.children``
                attribute.  Trailing can also be disabled by default using the
                :attr:`trail` attribute

            publisher (kombu.Producer): Deprecated alias to ``producer``.

            headers (Dict): Message headers to be included in the message.

        Returns:
            celery.result.AsyncResult: Promise of future evaluation.

        Raises:
            TypeError: If not enough arguments are passed, or too many
                arguments are passed.  Note that signature checks may
                be disabled by specifying ``@task(typing=False)``.
            kombu.exceptions.OperationalError: If a connection to the
               transport cannot be made, or if the connection is lost.

        Note:
            Also supports all keyword arguments supported by
            :meth:`kombu.Producer.publish`.
        """
        if self.typing:
            try:
                check_arguments = self.__header__
            except AttributeError:  # pragma: no cover
                pass
            else:
                check_arguments(*(args or ()), **(kwargs or {}))

        app = self._get_app()
        if app.conf.task_always_eager:
            with denied_join_result():
                return self.apply(args, kwargs, task_id=task_id or uuid(),
                                  link=link, link_error=link_error, **options)

        if self.__v2_compat__:
            shadow = shadow or self.shadow_name(self(), args, kwargs, options)
        else:
            shadow = shadow or self.shadow_name(args, kwargs, options)

        preopts = self._get_exec_options()
        options = dict(preopts, **options) if options else preopts

        options.setdefault('ignore_result', self.ignore_result)

        return app.send_task(
            self.name, args, kwargs, task_id=task_id, producer=producer,
            link=link, link_error=link_error, result_cls=self.AsyncResult,
            shadow=shadow, task_type=self,
            **options
        )

apply_async源码

对于其使用,apply_async支持常用参数:

  • eta:指定任务执行时间,类型为datetime时间类型;
  • countdown:倒计时,单位秒,浮点类型;
  • expires:任务过期时间,如果任务在超过过期时间还未执行则回收任务,浮点类型获取datetime类型;
  • retry:任务执行失败时候是否尝试,布尔类型。;
  • serializer:序列化方案,支持pickle、json、yaml、msgpack;
  • priority:任务优先级,有0~9优先级可设置,int类型;
  • 澳门新萄京官方网站:Celery学习笔记,Django使用Celery异步任务队列的使用。retry_policy:任务重试机制,其中包含几个重试参数,类型是dict如下:

澳门新萄京官方网站 34澳门新萄京官方网站 35

max_retries:最大重试次数

interval_start:重试等待时间

interval_step:每次重试叠加时长,假设第一重试等待1s,第二次等待1+n秒

interval_max:最大等待时间

####示例
 add.apply_async((1, 3), retry=True, retry_policy={
        'max_retries': 1,
        'interval_start': 0,
        'interval_step': 0.8,
        'interval_max': 5,
    })

View Code

更多参数参考:

 

  

4.2 创建项目文件

创建一个项目:名字叫做proj

- proj/
 - proj/__init__.py
 - proj/settings.py
 - proj/urls.py
 - proj/wsgi.py
- manage.py

创建一个新的文件: proj/proj/mycelery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

# Using a string here means the worker don't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

在proj/proj/__init__.py:添加

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .mycelery import app as celery_app

__all__ = ['celery_app']

4.1     与Django集成方法

与Django集成有两种方法:

  • Django 1.8 以上版本:与Celery 4.0版本集成
  • Django 1.8 以下版本:与Celery3.1版本集成,使用django-celery库

 

今天我们介绍celery4.0 和django 1.8以上版本集成方法。

4      与Django集成

上面简单介绍了celery异步任务的基本方法,结合我们实际的应用,我们需要与Django一起使用,下面介绍如何与Django结合。

END

由于学习的还是celery的基础,所以后面的更复杂的内容等学了再更。

 五、管理与监控

  Celery管理和监控功能是通过flower组件实现的,flower组件不仅仅提供监控功能,还提供HTTP API可实现对woker和task的管理。

4.3 配置Celery

我们在mycelery.py文件中说明celery的配置文件在settings.py中,并且是以CELERY开头。

app.config_from_object('django.conf:settings', namespace='CELERY')

在settings.py文件中添加celery配置:

澳门新萄京官方网站 36

我们的配置是使用redis作为消息队列,消息的代理和结果都是用redis,任务的序列化使用json格式。

重要:redis://127.0.0.1:6379/0这个说明使用的redis的0号队列,如果有多个celery任务都使用同一个队列,则会造成任务混乱。最好是celery实例单独使用一个队列。

4.2     创建项目文件

创建一个项目:名字叫做proj

- proj/
  - proj/__init__.py
  - proj/settings.py
  - proj/urls.py
  - proj/wsgi.py
- manage.py

 

创建一个新的文件:proj/proj/mycelery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

# Using a string here means the worker don't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

 

在proj/proj/__init__.py:添加

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .mycelery import app as celery_app

__all__ = ['celery_app']

 

4.1     与Django集成方法

与Django集成有两种方法:

  • Django 1.8 以上版本:与Celery 4.0版本集成
  • Django 1.8 以下版本:与Celery3.1版本集成,使用django-celery库

 

今天我们介绍celery4.0 和django 1.8以上版本集成方法。

参考文章:

http://docs.jinkan.org/docs/celery/index.html
http://www.guodongkeji.com/newsshow-24-2135-1.html

安装使用

pip3 install flower

启动

 flower -A project --port=5555   
# -A :项目目录
#--port 指定端口

访问http:ip:5555

澳门新萄京官方网站 37

api使用,例如获取woker信息:

curl http://127.0.0.1:5555/api/workers

结果:

澳门新萄京官方网站 38

更多api参考:

 

4.4创建APP

创建Django的App,名称为celery_task,在app目录下创建tasks.py文件。

完成后目录结构为:

├── celery_task
│ ├── admin.py
│ ├── apps.py
│ ├── __init__.py
│ ├── migrations
│ │ └── __init__.py
│ ├── models.py
│ ├── tasks.py
│ ├── tests.py
│ └── views.py
├── db.sqlite3
├── manage.py
├── proj
│ ├── celery.py
│ ├── __init__.py
│ ├── settings.py
│ ├── urls.py
│ └── wsgi.py
└── templates

4.3     配置Celery

我们在mycelery.py文件中说明celery的配置文件在settings.py中,并且是以CELERY开头。

   

app.config_from_object('django.conf:settings', namespace='CELERY')

 

在settings.py文件中添加celery配置:

 澳门新萄京官方网站 39

 

我们的配置是使用redis作为消息队列,消息的代理和结果都是用redis,任务的序列化使用json格式。

重要:redis://127.0.0.1:6379/0这个说明使用的redis的0号队列,如果有多个celery任务都使用同一个队列,则会造成任务混乱。最好是celery实例单独使用一个队列。

4.2     创建项目文件

创建一个项目:名字叫做proj

- proj/
  - proj/__init__.py
  - proj/settings.py
  - proj/urls.py
  - proj/wsgi.py
- manage.py

 

创建一个新的文件:proj/proj/mycelery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

# Using a string here means the worker don't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

 

在proj/proj/__init__.py:添加

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .mycelery import app as celery_app

__all__ = ['celery_app']

 

4.5编写task任务

编辑任务文件

tasks.py

在tasks.py文件中添加下面代码

# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task

@shared_task
def add(x, y):
 return x   y

@shared_task
def mul(x, y):
 return x * y

@shared_task
def xsum(numbers):
 return sum(numbers)

启动celery:celery -A proj.mycelery worker -l info

说明:proj 为模块名称,mycelery 为celery 的实例所在的文件。

启动成功打印:

澳门新萄京官方网站 40

4.4     创建APP

创建Django的App,名称为celery_task,在app目录下创建tasks.py文件。

完成后目录结构为:

├── celery_task
│   ├── admin.py
│   ├── apps.py
│   ├── __init__.py
│   ├── migrations
│   │   └── __init__.py
│   ├── models.py
│   ├── tasks.py
│   ├── tests.py
│   └── views.py
├── db.sqlite3
├── manage.py
├── proj
│   ├── celery.py
│   ├── __init__.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
└── templates

 

4.3     配置Celery

我们在mycelery.py文件中说明celery的配置文件在settings.py中,并且是以CELERY开头。

   

app.config_from_object('django.conf:settings', namespace='CELERY')

 

在settings.py文件中添加celery配置:

 澳门新萄京官方网站 41

 

我们的配置是使用redis作为消息队列,消息的代理和结果都是用redis,任务的序列化使用json格式。

重要:redis://127.0.0.1:6379/0这个说明使用的redis的0号队列,如果有多个celery任务都使用同一个队列,则会造成任务混乱。最好是celery实例单独使用一个队列。

4.6在views中调用任务

在views中编写接口,实现两个功能:

  1. 触发任务,然后返回任务的结果和任务ID
  2. 根据任务ID查询任务状态

代码如下:

澳门新萄京官方网站 42

启动django。

新开一个会话启动celery;启动命令为:celery –A proj.mycelery worker –l info

访问 ,可以看到返回的结果。

澳门新萄京官方网站 43

在celery运行的页面,可以看到下面输出:

澳门新萄京官方网站 44

4.5     编写task任务

编辑任务文件

tasks.py

在tasks.py文件中添加下面代码

# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task

@shared_task
def add(x, y):
    return x   y

@shared_task
def mul(x, y):
    return x * y

@shared_task
def xsum(numbers):
    return sum(numbers)

 

启动celery:

celery -A proj.mycelery worker -l info

 

说明:proj为模块名称,mycelery为celery的实例所在的文件。

启动成功打印:

 澳门新萄京官方网站 45

 

4.4     创建APP

创建Django的App,名称为celery_task,在app目录下创建tasks.py文件。

完成后目录结构为:

├── celery_task
│   ├── admin.py
│   ├── apps.py
│   ├── __init__.py
│   ├── migrations
│   │   └── __init__.py
│   ├── models.py
│   ├── tasks.py
│   ├── tests.py
│   └── views.py
├── db.sqlite3
├── manage.py
├── proj
│   ├── celery.py
│   ├── __init__.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
└── templates

 

4.7在views中查询任务状态

有的时候任务执行时间较长,需要查询任务是否执行完成,可以根据任务的id来查询任务状态,根据状态进行下一步操作。

可以看到任务的状态为:SUCCESS

澳门新萄京官方网站 46

4.6     在views中调用任务

澳门新萄京官方网站,在views中编写接口,实现两个功能:

  • 触发任务,然后返回任务的结果和任务ID
  • 根据任务ID查询任务状态

代码如下:

 澳门新萄京官方网站 47

 

启动django。

新开一个会话启动celery;启动命令为:

celery –A proj.mycelery worker –l info

 

访问,可以看到返回的结果。

 澳门新萄京官方网站 48

 

在celery运行的页面,可以看到下面输出:

 澳门新萄京官方网站 49

 

4.5     编写task任务

编辑任务文件

tasks.py

在tasks.py文件中添加下面代码

# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task

@shared_task
def add(x, y):
    return x   y

@shared_task
def mul(x, y):
    return x * y

@shared_task
def xsum(numbers):
    return sum(numbers)

 

启动celery:

celery -A proj.mycelery worker -l info

 

说明:proj为模块名称,mycelery为celery的实例所在的文件。

启动成功打印:

 澳门新萄京官方网站 50

 

5Celery定时任务

Celery作为异步任务队列,我们可以按照我们设置的时间,定时的执行一些任务,例如每日数据库备份,日志转存等。

Celery的定时任务配置非常简单:

定时任务的配置依然在setting.py文件中。

说明:如果觉得celery 的数据配置文件和Django 的都在setting.py 一个文件中不方便,可以分拆出来,只需要在mycelery.py 的文件中指明即可。

app.config_from_object('django.conf:yoursettingsfile', namespace='CELERY')

4.7     在views中查询任务状态

有的时候任务执行时间较长,需要查询任务是否执行完成,可以根据任务的id来查询任务状态,根据状态进行下一步操作。

可以看到任务的状态为:SUCCESS

 澳门新萄京官方网站 51

 

4.6     在views中调用任务

在views中编写接口,实现两个功能:

  • 触发任务,然后返回任务的结果和任务ID
  • 根据任务ID查询任务状态

代码如下:

 澳门新萄京官方网站 52

 

启动django。

新开一个会话启动celery;启动命令为:

celery –A proj.mycelery worker –l info

 

访问

 澳门新萄京官方网站 53

 

在celery运行的页面,可以看到下面输出:

 澳门新萄京官方网站 54

 

5.1任务间隔运行

#每30秒调用task.add
from datetime import timedelta

CELERY_BEAT_SCHEDULE = {
 'add-every-30-seconds': {
  'task': 'tasks.add',
  'schedule': timedelta(seconds=30),
  'args': (16, 16)
 },
}

5      Celery定时任务

Celery作为异步任务队列,我们可以按照我们设置的时间,定时的执行一些任务,例如每日数据库备份,日志转存等。

Celery的定时任务配置非常简单:

定时任务的配置依然在setting.py文件中。

说明:如果觉得celery的数据配置文件和Django的都在setting.py一个文件中不方便,可以分拆出来,只需要在mycelery.py的文件中指明即可。

app.config_from_object('django.conf:yoursettingsfile', namespace='CELERY')

 

 

4.7     在views中查询任务状态

有的时候任务执行时间较长,需要查询任务是否执行完成,可以根据任务的id来查询任务状态,根据状态进行下一步操作。

可以看到任务的状态为:SUCCESS

 澳门新萄京官方网站 55

 

5.2定时执行

定时每天早上7:30分运行。

注意:设置任务时间时注意时间格式,UTC时间或者本地时间。

#crontab任务
#每天7:30调用task.add
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
 # Executes every Monday morning at 7:30 A.M
 'add-every-monday-morning': {
  'task': 'tasks.add',
  'schedule': crontab(hour=7, minute=30),
  'args': (16, 16),
 },
}

5.1     任务间隔运行

#每30秒调用task.add
from datetime import timedelta

CELERY_BEAT_SCHEDULE = {
    'add-every-30-seconds': {
        'task': 'tasks.add',
        'schedule': timedelta(seconds=30),
        'args': (16, 16)
    },
}

 

5      Celery定时任务

Celery作为异步任务队列,我们可以按照我们设置的时间,定时的执行一些任务,例如每日数据库备份,日志转存等。

Celery的定时任务配置非常简单:

定时任务的配置依然在setting.py文件中。

说明:如果觉得celery的数据配置文件和Django的都在setting.py一个文件中不方便,可以分拆出来,只需要在mycelery.py的文件中指明即可。

app.config_from_object('django.conf:yoursettingsfile', namespace='CELERY')

 

 

5.3定时任务启动

配置了定时任务,除了worker进程外,还需要启动一个beat进程。

Beat进程的作用就相当于一个定时任务,根据配置来执行对应的任务。

5.3.1  启动beat进程

命令如下:celery -A proj.mycelery beat -l info

澳门新萄京官方网站 56

5.3.2  启动worker进程

Worker进程启动和前面启动命令一样。celery –A proj.mycelery worker –l info

澳门新萄京官方网站 57

5.2     定时执行

定时每天早上7:30分运行。

注意:设置任务时间时注意时间格式,UTC时间或者本地时间。

#crontab任务
#每天7:30调用task.add
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    # Executes every Monday morning at 7:30 A.M
    'add-every-monday-morning': {
        'task': 'tasks.add',
        'schedule': crontab(hour=7, minute=30),
        'args': (16, 16),
    },
}

 

5.1     任务间隔运行

#每30秒调用task.add
from datetime import timedelta

CELERY_BEAT_SCHEDULE = {
    'add-every-30-seconds': {
        'task': 'tasks.add',
        'schedule': timedelta(seconds=30),
        'args': (16, 16)
    },
}

 

6 Celery深入

Celery任务支持多样的运行模式:

  1. 支持动态指定并发数 --autoscale=10,3 (always keep 3 processes, but grow to 10 if necessary).
  2. 支持链式任务
  3. 支持Group任务
  4. 支持任务不同优先级
  5. 支持指定任务队列
  6. 支持使用eventlet模式运行worker

例如:指定并发数为1000

celery -A proj.mycelery worker -c 1000

这些可以根据使用的深入自行了解和学习。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

5.3     定时任务启动

配置了定时任务,除了worker进程外,还需要启动一个beat进程。

Beat进程的作用就相当于一个定时任务,根据配置来执行对应的任务。

5.2     定时执行

定时每天早上7:30分运行。

注意:设置任务时间时注意时间格式,UTC时间或者本地时间。

#crontab任务
#每天7:30调用task.add
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    # Executes every Monday morning at 7:30 A.M
    'add-every-monday-morning': {
        'task': 'tasks.add',
        'schedule': crontab(hour=7, minute=30),
        'args': (16, 16),
    },
}

 

您可能感兴趣的文章:

  • Django中使用celery完成异步任务的示例代码
  • 异步任务队列Celery在Django中的使用方法

5.3.1  启动beat进程

命令如下:

celery -A proj.mycelery beat -l info

 澳门新萄京官方网站 58

 

5.3     定时任务启动

配置了定时任务,除了worker进程外,还需要启动一个beat进程。

Beat进程的作用就相当于一个定时任务,根据配置来执行对应的任务。

5.3.2  启动worker进程

Worker进程启动和前面启动命令一样。

 

celery –A proj.mycelery worker –l info

澳门新萄京官方网站 59

 

5.3.1  启动beat进程

命令如下:

celery -A proj.mycelery beat -l info

 澳门新萄京官方网站 60

 

6      Celery深入

Celery任务支持多样的运行模式:

  • 支持动态指定并发数 --autoscale=10,3 (always keep 3 processes, but grow to 10 if necessary).
  • 支持链式任务
  • 支持Group任务
  • 支持任务不同优先级
  • 支持指定任务队列
  • 支持使用eventlet模式运行worker

例如:指定并发数为1000

celery -A proj.mycelery worker -c 1000

 

这些可以根据使用的深入自行了解和学习。

 

 

5.3.2  启动worker进程

Worker进程启动和前面启动命令一样。

 

celery –A proj.mycelery worker –l info

 

7      参考资料

Celery官网:

Celery与Django:

celery定时任务:

6      Celery深入

Celery任务支持多样的运行模式:

  • 支持动态指定并发数 --autoscale=10,3 (always keep 3 processes, but grow to 10 if necessary).
  • 支持链式任务
  • 支持Group任务
  • 支持任务不同优先级
  • 支持指定任务队列
  • 支持使用eventlet模式运行worker

例如:指定并发数为1000

celery -A proj.mycelery worker -c 1000

 

这些可以根据使用的深入自行了解和学习。

 

 

7      参考资料

Celery官网:

Celery与Django:

celery定时任务:

1 Celery简介 Celery是异步任务队列,可以独立于主进程运行,在主进程退出后,也不影响队列中的任务...

本文由澳门新萄京官方网站发布于www.8455.com,转载请注明出处:澳门新萄京官方网站:Celery学习笔记,Django使用

关键词: