
Running Asynchronous and Periodic Tasks: Notes on Learning a Distributed Task Queue


1. Preface

  Celery is a distributed task queue developed in Python; if you're not familiar with it, read my earlier post, "Celery: Getting Started and Advancing". The most popular framework for Python web development is Django, but Django's request handling is synchronous and cannot run asynchronous tasks on its own. To process tasks asynchronously you need other means (the usual front-end solution is an ajax operation), and on the back end Celery is an excellent choice. If a user has to wait a long time for some operation to return, the site's throughput drops significantly. Below is the rough flow of Django's request handling (image from the internet):

[Figure 1: Django request handling flow]

A brief description of the request flow: browser sends a request --> request handling --> request passes through middleware --> URL routing --> view handles the business logic --> response is returned (template or response).

There are many celery + django periodic-task tutorials online, but most of them are based on djcelery + celery3, or use django_celery_beat, which is relatively cumbersome to configure.

All demos here are based on Django 2.0.

Celery is a simple, flexible, and reliable distributed system for processing large volumes of messages, and it provides the tools necessary to maintain such a system.
It is a task queue focused on real-time processing, and it also supports task scheduling.
(The above is the introduction from celery's own website.)

[Celery, a Distributed Task Queue]

2. Configuration and Usage

  celery integrates easily into the Django framework, although if you want periodic tasks you also need to install the django-celery-beat plugin, explained later. Note that Celery 4.0 only supports Django versions >= 1.8; for versions below 1.8 you need Celery 3.1.

Simplicity and efficiency are what we're ultimately after, and celery 4 no longer needs any extra plugin to implement periodic tasks with django; the native celery beat handles periodic scheduling just fine.

celery is a simple, flexible, and reliable distributed task queue framework developed in Python that supports scheduling tasks across distributed machines, processes, and threads. It adopts the classic producer-consumer model and is mainly composed of three parts: the message broker, the workers, and the result backend (detailed below).

celery's use cases are very broad.

1. Introduction to Celery and Basic Usage

Celery is a distributed asynchronous message task queue developed in Python. It makes asynchronous task handling easy; if your business scenario needs asynchronous tasks, consider celery. A couple of practical examples:

  1. You want to run a batch command on 100 machines, which may take a long while, but you don't want your program to sit waiting for the result. Instead it returns a task ID right away; after some time you use that task id to fetch the execution result, and while the task is running you can carry on with other work.
  2. You want a periodic task, say checking all your customers' data every day, and if you find that today is a customer's birthday, sending them a congratulatory text message.

Celery 在举行职责时须求通过3个音信中间件来收取和发送职责音讯,以及存款和储蓄任务结果, 一般选取rabbitMQ or Redis

1.1 Celery has the following advantages:

  Simple: once you're familiar with celery's workflow, configuring and using it is fairly easy

  Highly available: when a task fails or the connection drops during execution, celery automatically retries the task

  Fast: a single-process celery can process up to a million tasks per minute

  Flexible: almost every celery component can be extended and customized

Celery's basic workflow:

  [Figure 2: Celery basic workflow]

Configuration

  Create a new project taskproj with the following structure (each app gains a tasks file where its tasks are defined):

taskproj
├── app01
│   ├── __init__.py
│   ├── apps.py
│   ├── migrations
│   │   └── __init__.py
│   ├── models.py
│   ├── tasks.py
│   └── views.py
├── manage.py
├── taskproj
│   ├── __init__.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
└── templates

Create celery.py in the project directory taskproj/taskproj/:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery


os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'taskproj.settings')  # set the Django environment

app = Celery('taskproj')

app.config_from_object('django.conf:settings', namespace='CELERY')  # read configuration from settings, using the CELERY_ prefix

app.autodiscover_tasks()  # discover the tasks.py file in each app

taskproj/taskproj/__init__.py:

from __future__ import absolute_import, unicode_literals

from .celery import app as celery_app

__all__ = ['celery_app']

taskproj/taskproj/settings.py

CELERY_BROKER_URL = 'redis://10.1.210.69:6379/0'  # broker configuration, using Redis as the message broker

CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0'  # result backend configuration, also redis here

CELERY_RESULT_SERIALIZER = 'json'  # result serialization format

Enter the project's taskproj directory and start a worker:

celery worker -A taskproj -l debug

celery adopts a producer-consumer model and is mainly composed of three parts:

  • The message queue (broker): the broker is essentially an MQ queue service; redis, rabbitmq, and others can serve as the broker
  • Workers that consume tasks: the broker notifies the workers that the queue holds tasks, and the workers take tasks from the queue and execute them; each worker is a process
  • A backend that stores results: execution results are stored in the backend, by default in the same MQ service the broker uses, though a separate service can be configured as the backend

celery's typical usage scenarios include:

  • Processing asynchronous tasks
  • Task scheduling
  • Processing periodic tasks
  • Distributed scheduling

1.2 Installing and Using Celery

Celery's default broker is RabbitMQ; a single line of configuration is enough:

broker_url = 'amqp://guest:guest@localhost:5672//'

 

Using Redis as the broker also works.

  Install the redis extra:

$ pip3 install -U "celery[redis]"

 

Configuration

Configuration is easy, just configure the location of your Redis database:

app.conf.broker_url = 'redis://localhost:6379/0'

Where the URL is in the format of:

redis://:password@hostname:port/db_number

all fields after the scheme are optional, and will default to localhost on port 6379, using database 0.
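For example, a fully specified URL might look like this (the password, host, and database number here are purely illustrative):

app.conf.broker_url = 'redis://:s3cret@10.1.210.69:6379/1'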

 

 

If you also want to get each task's execution result, configure where results are stored:

If you also want to store the state and return values of tasks in Redis, you should configure these settings:

app.conf.result_backend = 'redis://localhost:6379/0'

 

Defining and triggering tasks

  Tasks are defined in each app's tasks file, here app01/tasks.py:

from __future__ import absolute_import, unicode_literals
from celery import shared_task


@shared_task
def add(x, y):
    return x + y


@shared_task
def mul(x, y):
    return x * y

Triggering a task from a view:

from django.http import JsonResponse
from app01 import tasks

# Create your views here.

def index(request, *args, **kwargs):
    res = tasks.add.delay(1, 3)
    # task logic
    return JsonResponse({'status': 'successful', 'task_id': res.task_id})

Visit the view:

[Figure 3: JSON response with the status and task_id]

 To get a task's result, you can look it up by task_id with AsyncResult, or read it directly from the backend:

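For example, a minimal sketch of looking a result up by id (the view and URL parameter are hypothetical; AsyncResult resolves state and results through the backend configured above):

from celery.result import AsyncResult
from django.http import JsonResponse
from taskproj import celery_app

def get_res(request):
    task_id = request.GET.get('id')
    # AsyncResult asks the configured result backend for this task's state
    res = AsyncResult(task_id, app=celery_app)
    if res.ready():
        return JsonResponse({'status': res.status, 'result': res.result})
    return JsonResponse({'status': res.status})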

 

Of course, adopting the native approach means giving up a few benefits the plugins provided:

  • The plugins' periodic task management is no longer available; when tasks only need to run on schedule, without manual scheduling, this barely matters.
  • There's no quick way to manage or track periodic tasks. Tracking is really better left to the logs anyway, but modifying tasks becomes less convenient; still, if you don't need to change, add, or remove tasks often, this is also acceptable.


There are plenty of benefits too; especially in systems built with Python, integration is seamless and very convenient.

1.3 Getting Started with Celery

  Install the celery module:

    pip3 install celery

Create a celery application to define your task list.

  Create a task file called tasks.py:

 

from celery import Celery

app = Celery('tasks',
             broker='redis://localhost',
             # with a username/password: broker="redis://:password@127.0.0.1"
             backend='redis://localhost')

@app.task
def add(x, y):
    print("running...", x, y)
    return x + y

 

Start a Celery worker to begin listening for and executing tasks:

$ celery -A tasks worker --loglevel=info

 

Calling tasks

  Open another terminal, start an interactive shell, and call the task:

>>> from tasks import add
>>> add.delay(4, 4)

Your worker terminal will show that a task was received. If you want to see the task result, assign the return value to a variable when you call the task:

>>> result = add.delay(4, 4)

 

The ready() method returns whether the task has finished processing or not:

>>> result.ready()
False

You can wait for the result to complete, but this is rarely used since it turns the asynchronous call into a synchronous one:

>>> result.get(timeout=1)
8

In case the task raised an exception, get() will re-raise the exception, but you can override this by specifying the propagate argument:

>>> result.get(propagate=False)

If the task raised an exception you can also gain access to the original traceback:

>>> result.traceback
…

Extension

  Besides redis and rabbitmq, Django's ORM can also serve as result storage; you need to install a dependency plugin. The benefit is that we can inspect task state directly through django's data and build further operations on top. Below is how to use the ORM as result storage.

1. Install

pip install django-celery-results

2. Configure settings.py and register the app

INSTALLED_APPS = (
    ...,
    'django_celery_results',
)

3. Modify the backend configuration, replacing redis with django-db

#CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # backend configuration, previously redis

CELERY_RESULT_BACKEND = 'django-db'  # use the django ORM as result storage

4. Migrate the database

python3 manage.py migrate django_celery_results

You'll now see extra tables created in the database:

Of course, you sometimes need to operate on the task table directly; its structure is defined in the source as follows:

class TaskResult(models.Model):
    """Task result/status."""

    task_id = models.CharField(_('task id'), max_length=255, unique=True)
    task_name = models.CharField(_('task name'), null=True, max_length=255)
    task_args = models.TextField(_('task arguments'), null=True)
    task_kwargs = models.TextField(_('task kwargs'), null=True)
    status = models.CharField(_('state'), max_length=50,
                              default=states.PENDING,
                              choices=TASK_STATE_CHOICES
                              )
    content_type = models.CharField(_('content type'), max_length=128)
    content_encoding = models.CharField(_('content encoding'), max_length=64)
    result = models.TextField(null=True, default=None, editable=False)
    date_done = models.DateTimeField(_('done at'), auto_now=True)
    traceback = models.TextField(_('traceback'), blank=True, null=True)
    hidden = models.BooleanField(editable=False, default=False, db_index=True)
    meta = models.TextField(null=True, default=None, editable=False)

    objects = managers.TaskResultManager()

    class Meta:
        """Table information."""

        ordering = ['-date_done']

        verbose_name = _('task result')
        verbose_name_plural = _('task results')

    def as_dict(self):
        return {
            'task_id': self.task_id,
            'task_name': self.task_name,
            'task_args': self.task_args,
            'task_kwargs': self.task_kwargs,
            'status': self.status,
            'result': self.result,
            'date_done': self.date_done,
            'traceback': self.traceback,
            'meta': self.meta,
        }

    def __str__(self):
        return '<Task: {0.task_id} ({0.status})>'.format(self)
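With the ORM backend in place, results can then be queried like any other Django model; a small sketch against the fields defined above:

from django_celery_results.models import TaskResult

# the ten most recent successful results (default ordering is '-date_done')
for tr in TaskResult.objects.filter(status='SUCCESS')[:10]:
    print(tr.task_id, tr.task_name, tr.result)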

 

Configuring Celery Periodic Tasks

Before configuring, let's look at the project structure:

.
├── linux_news
│   ├── celery.py
│   ├── __init__.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
├── manage.py
├── news
│   ├── admin.py
│   ├── apps.py
│   ├── __init__.py
│   ├── migrations
│   ├── models
│   ├── tasks.py
│   ├── tests.py
│   └── views
└── start-celery.sh

Here news is our app, which fetches news items from several rss feeds, and linux_news is our project. The files we care about most are celery.py, settings.py, tasks.py, and start-celery.sh.

First, celery.py. For celery to execute tasks you must instantiate a celery app and pass the configuration from settings.py into it:

import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'linux_news.settings')

app = Celery('linux_news')

# 'django.conf:settings' points at django.conf.settings, i.e. the django
# project's configuration; celery finds and imports it based on the
# environment variable set above.
# - namespace is the shared prefix of celery configuration keys in
#   settings.py, here 'CELERY_'; the keys must also be uppercase.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

配置便是那般轻松,为了能在django里使用这几个app,我们供给在__init__.py中程导弹入它:

from .celery import app as celery_app

Next, tasks.py. It should live in your app directory; since we configured auto-discovery earlier, celery will find these tasks automatically. Our tasks are written in this module; the code involves some ORM usage, which I've simplified to stay on topic:

from linux_news.celery import celery_app as app
from .models import *
import time
import feedparser
import pytz
import html


@app.task(ignore_result=True)
def fetch_news(origin_name):
    """
    fetch all news from origin_name
    """
    origin = get_feeds_origin(origin_name)
    feeds = feedparser.parse(origin.feed_link)
    for item in feeds['entries']:
        entry = NewsEntry()
        entry.title = item.title
        entry.origin = origin
        entry.author = item.author
        entry.link = item.link
        # add timezone
        entry.publish_time = item.time.replace(tzinfo=pytz.utc)
        entry.summary = html.escape(item.summary)
        entry.save()


@app.task(ignore_result=True)
def fetch_all_news():
    """
    This is our periodic task:
    fetch all origins' news to db
    """
    origins = NewsOrigin.objects.all()
    for origin in origins:
        fetch_news.delay(origin.origin_name)

The tasks are time-consuming operations such as network IO or database reads/writes; since we don't care about their return values, @app.task(ignore_result=True) disables result storage for them.

With the tasks in place it's time to configure celery. We choose redis as the task queue; I strongly recommend using rabbitmq or redis as the task queue or result backend in production, rather than a relational database:

# redis
REDIS_PORT = 6379
REDIS_DB = 0
# get the redis server address from an environment variable
REDIS_HOST = os.environ.get('REDIS_ADDR', 'redis')

# celery settings
# these two must be set, otherwise celery beat cannot start properly
CELERY_ENABLE_UTC = True
CELERY_TIMEZONE = TIME_ZONE

# task queue configuration
CELERY_BROKER_URL = f'redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}'
CELERY_ACCEPT_CONTENT = ['application/json', ]
CELERY_RESULT_BACKEND = f'redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}'
CELERY_TASK_SERIALIZER = 'json'

Then comes our periodic task setup:

from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    'fetch_news_every-1-hour': {
        'task': 'news.tasks.fetch_all_news',
        'schedule': crontab(minute=0, hour='*/1'),
    }
}

The periodic task configuration object is a dict of task names and configuration entries; the main entries are:

  • task: the module path of the task function; write the full path, otherwise the task cannot be found and will not run
  • schedule: the schedule, usually a celery.schedules.crontab; the example above runs once at minute 0 of every hour. The syntax is similar to linux crontab; see the documentation for details
  • args: a tuple of the arguments the task needs; if no arguments are needed it can simply be left out of the entry, as in the example (see the sketch after this list)
  • the remaining entries are rarely used; refer to the documentation
    That concludes the celery beat configuration.
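As an aside, an entry that passes arguments might look like this (the task path and argument values are purely illustrative):

from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    'add-every-30-minutes': {
        'task': 'news.tasks.add',            # hypothetical task module path
        'schedule': crontab(minute='*/30'),  # at minutes 0 and 30 of every hour
        'args': (16, 16),                    # positional arguments passed to the task
    },
}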

Asynchronous Tasks

Celery

Using celery in a project

celery can be configured as an application.

The directory layout is as follows:

proj/__init__.py
    /celery.py
    /tasks.py

3. Using Periodic Tasks in Django

  Periodic tasks in django likewise rely on beat to dispatch the tasks; to use periodic tasks in Django you need to install the django-celery-beat plugin. The process is described below.

Starting celery beat

With configuration done, all that's left is starting celery.

Set up the environment before starting. Don't run celery as root! Don't run celery as root! Don't run celery as root! Important things are said three times.

start-celery.sh:

export REDIS_ADDR=127.0.0.1
celery -A linux_news worker -l info -B -f /path/to/log

-A names the app module, and -B also starts celery beat to dispatch the periodic tasks.
Once celery is up, you can check the logs to verify the tasks run properly:

[2018-12-21 13:00:00,022: INFO/MainProcess] Received task: news.tasks.fetch_all_news[e4566ede-2cfa-4c19-b2f3-0c7d6c38690d]
[2018-12-21 13:00:00,046: INFO/MainProcess] Received task: news.tasks.fetch_news[583e96dc-f508-49fa-a24a-331e0c07a86b]
[2018-12-21 13:00:00,051: INFO/ForkPoolWorker-2] Task news.tasks.fetch_all_news[e4566ede-2cfa-4c19-b2f3-0c7d6c38690d] succeeded in 0.02503809699555859s: None
[2018-12-21 13:00:00,052: INFO/MainProcess] Received task: news.tasks.fetch_news[c61a3e55-dd3c-4d49-8d6d-ca9b1757db25]
[2018-12-21 13:00:00,449: INFO/ForkPoolWorker-5] Task news.tasks.fetch_news[c61a3e55-dd3c-4d49-8d6d-ca9b1757db25] succeeded in 0.39487219898728654s: None
[2018-12-21 13:00:00,606: INFO/ForkPoolWorker-3] Task news.tasks.fetch_news[583e96dc-f508-49fa-a24a-331e0c07a86b] succeeded in 0.5523456179944333s: None

That's all there is to running periodic tasks with celery 4; if there are errors or omissions, corrections are welcome.

My own asynchronous use case is project deployment: the front-end web page has a deploy button; clicking it sends a request to the back end, where the deployment process takes about six minutes. On receiving the request, the back end puts the task into a queue for asynchronous execution and immediately returns a "task running" result to the front end. What would happen without asynchronous execution? Synchronously, the front end would just wait for the back end to return while the page spins and spins until it times out.
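A minimal sketch of that pattern (the task body, view names, and URLs are hypothetical): the view enqueues the deployment and immediately returns the task id, which the front end can poll for status.

from celery import shared_task
from celery.result import AsyncResult
from django.http import JsonResponse

@shared_task
def deploy_project():
    # stand-in for the ~6 minute release process
    ...
    return 'deployed'

def start_deploy(request):
    res = deploy_project.delay()
    # return immediately with a "task running" response
    return JsonResponse({'status': 'running', 'task_id': res.task_id})

def deploy_status(request):
    res = AsyncResult(request.GET.get('task_id'))
    return JsonResponse({'status': res.status})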

Installation

Contents of proj/celery.py:

 

from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('proj',
             broker='amqp://',
             backend='amqp://',
             include=['proj.tasks'])

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()

 

Setup and configuration

1. Install the beat plugin

pip3 install django-celery-beat

2. Register the APP

INSTALLED_APPS = [
    ....   
    'django_celery_beat',
]

3. Migrate the database

python3 manage.py migrate django_celery_beat

4.分级运维woker和beta

celery -A taskproj beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler  # start beat; the scheduler uses the database

celery worker -A taskproj -l info  # start the worker

5. Configure admin

urls.py

# urls.py
from django.conf.urls import url
from django.contrib import admin

urlpatterns = [
    url(r'^admin/', admin.site.urls),
]

6. Create a superuser

python3 manage.py createsuperuser 

7. Log in to admin to manage the tasks (at the /admin/ URL configured above):


 

 Usage example:


 

 

 

 


 

 

 Viewing the results:


 

Deploying asynchronous tasks

Installing Celery

Installation via pip is recommended; if you're using a virtual environment, install inside it:

$ pip install celery

Contents of proj/tasks.py:

 

from __future__ import absolute_import, unicode_literals
from .celery import app


@app.task
def add(x, y):
    return x + y


@app.task
def mul(x, y):
    return x * y


@app.task
def xsum(numbers):
    return sum(numbers)

 

Start the worker

$ celery -A proj worker -l info

Output:

 

-------------- celery@Zhangwei-MacBook-Pro.local v4.0.2 (latentcall)
---- **** -----
--- * ***  * -- Darwin-15.6.0-x86_64-i386-64bit 2017-01-26 21:50:24
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         proj:0x103a020f0
- ** ---------- .> transport:   redis://localhost:6379//
- ** ---------- .> results:     redis://localhost/
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

 

Running the worker in the background

Going further

  The django-celery-beat plugin essentially watches its database tables for changes; whenever a table changes, the scheduler re-reads the tasks and reschedules. So if you want your own custom task page, you only need to operate on the beat plugin's four tables. You can even define your own scheduler. The django-celery-beat plugin ships with the models built in; just import them and use the ORM. Below I demonstrate with django rest framework:

settings.py

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'app01.apps.App01Config',
    'django_celery_results',
    'django_celery_beat',
    'rest_framework',
]

urls.py

urlpatterns = [
    url(r'^admin/', admin.site.urls),
    url(r'^index$', views.index),
    url(r'^res$', views.get_res),
    url(r'^tasks$', views.TaskView.as_view({'get':'list'})),
]

views.py

from django_celery_beat.models import PeriodicTask  # import the plugin's model
from rest_framework import serializers
from rest_framework import pagination
from rest_framework.viewsets import ModelViewSet
class Userserializer(serializers.ModelSerializer):
    class Meta:
        model = PeriodicTask
        fields = '__all__'

class Mypagination(pagination.PageNumberPagination):
    """自定义分页"""
    page_size=2
    page_query_param = 'p'
    page_size_query_param='size'
    max_page_size=4

class TaskView(ModelViewSet):
    queryset = PeriodicTask.objects.all()
    serializer_class = Userserializer
    permission_classes = []
    pagination_class = Mypagination

Access:



1. Install rabbitmq; here we use rabbitmq as the broker. After installation it starts automatically, and no other configuration is needed:

# apt-get install rabbitmq-server

Installing the message broker

Celery supports RabbitMQ, Redis, and even other database systems as its message broker.

Install whatever broker and backend you prefer; redis or RabbitMQ is the usual choice.

In the background

In production you’ll want to run the worker in the background, this is described in detail in the daemonization tutorial.

The daemonization scripts uses the celery multi command to start one or more workers in the background:

$ celery multi start w1 -A proj -l info
celery multi v4.0.0 (latentcall)
> Starting nodes...
    > w1.halcyon.local: OK

You can restart it too:

$ celery  multi restart w1 -A proj -l info
celery multi v4.0.0 (latentcall)
> Stopping nodes...
    > w1.halcyon.local: TERM -> 64024
> Waiting for 1 node.....
    > w1.halcyon.local: OK
> Restarting node w1.halcyon.local: OK
celery multi v4.0.0 (latentcall)
> Stopping nodes...
    > w1.halcyon.local: TERM -> 64052

or stop it:

$ celery multi stop w1 -A proj -l info

The stop command is asynchronous so it won't wait for the worker to shutdown. You'll probably want to use the stopwait command instead; this ensures all currently executing tasks are completed before exiting:

$ celery multi stopwait w1 -A proj -l info

Installing Redis

On Ubuntu, the apt-get command will do:

$ sudo apt-get install redis-server

假定您选用redis作为中间件,还供给安装redis帮忙包,同样应用pip安装就能够

$ pip install redis

If the following prompt appears, it succeeded:

redis 127.0.0.1:6379>

Other redis topics aren't covered here; explore them yourself if interested.

If you use RabbitMQ instead, install RabbitMQ.

Celery Periodic Tasks

celery支持按期职分,设定好义务的实施时间,celery就能够定期自动帮您试行, 这几个按期任务模块叫celery beat

Write a script called periodic_task.py:

 

from celery import Celery
from celery.schedules import crontab

app = Celery()

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')

    # Calls test('world') every 30 seconds
    sender.add_periodic_task(30.0, test.s('world'), expires=10)

    # Executes every Monday morning at 7:30 a.m.
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1),
        test.s('Happy Mondays!'),
    )

@app.task
def test(arg):
    print(arg)

 

add_periodic_task adds a periodic task.

Above, periodic tasks are added by calling a function; they can also be added in a config-file style. Below is a task that runs every 30s:

 

app.conf.beat_schedule = {
    'add-every-30-seconds': {
        'task': 'tasks.add',
        'schedule': 30.0,
        'args': (16, 16)
    },
}
app.conf.timezone = 'UTC'

 

  With the tasks added, celery needs to run a separate process that dispatches these tasks on schedule. Note: it dispatches tasks, it does not execute them. This process just keeps checking your task schedule, and whenever a task is due, it emits a task-call message for a celery worker to execute.

Start the task scheduler, celery beat:

$ celery -A periodic_task beat

Output like below:

 

celery beat v4.0.2 (latentcall) is starting.
__    -    ... __   -        _
LocalTime -> 2017-02-08 18:39:31
Configuration ->
    . broker -> redis://localhost:6379//
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%WARNING
    . maxinterval -> 5.00 minutes (300s)

 

 

One step remains: you still need to start a worker that executes the tasks celery beat dispatches.

Start a celery worker to execute the tasks:

 

$ celery -A periodic_task worker

 -------------- celery@Alexs-MacBook-Pro.local v4.0.2 (latentcall)
---- **** -----
--- * ***  * -- Darwin-15.6.0-x86_64-i386-64bit 2017-02-08 18:42:08
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x104d420b8
- ** ---------- .> transport:   redis://localhost:6379//
- ** ---------- .> results:     redis://localhost/
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

 

Now watch the worker's output: every little while, a periodic task runs!

Note: Beat needs to store the last run times of the tasks in a local database file (named celerybeat-schedule by default), so it needs access to write in the current directory, or alternatively you can specify a custom location for this file:

$ celery -A periodic_task beat -s /home/celery/var/run/celerybeat-schedule

2. Install celery:

# pip3 install celery

Installing RabbitMQ

$ sudo apt-get install rabbitmq-server

More elaborate schedules


Using Celery

The periodic task above is simple, just running a task every N seconds. But what if you want to send yourself an email at 8am every Monday, Wednesday, and Friday? Easy: use the crontab feature, which works just like Linux's own crontab and lets you customize task execution times.

3. Using celery in a django project; the (simplified) django project structure is as follows:

Quick direct usage

You can import Celery directly wherever it's needed and use it right away. The simplest approach only requires configuring a task and a broker:

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/3')

@app.task
def add(x, y):
    return x + y

I've used redis as the broker here; swap it for whatever you're used to.

Since the defaults rarely fit a project's real needs, we usually have to configure a few things ourselves; and to keep the project decoupled and maintainable, it's best to write the configuration in a separate file.

Linux crontab  

 

 

from celery.schedules import crontab

app.conf.beat_schedule = {
    # Executes every Monday morning at 7:30 a.m.
    'add-every-monday-morning': {
        'task': 'tasks.add',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': (16, 16),
    },
}

 

The entry above means: run the tasks.add task at 7:30 every Monday morning.

More schedule options:

crontab()
    Execute every minute.
crontab(minute=0, hour=0)
    Execute daily at midnight.
crontab(minute=0, hour='*/3')
    Execute every three hours: midnight, 3am, 6am, 9am, noon, 3pm, 6pm, 9pm.
crontab(minute=0, hour='0,3,6,9,12,15,18,21')
    Same as previous.
crontab(minute='*/15')
    Execute every 15 minutes.
crontab(day_of_week='sunday')
    Execute every minute (!) at Sundays.
crontab(minute='*', hour='*', day_of_week='sun')
    Same as previous.
crontab(minute='*/10', hour='3,17,22', day_of_week='thu,fri')
    Execute every ten minutes, but only between 3-4 am, 5-6 pm, and 10-11 pm on Thursdays or Fridays.
crontab(minute=0, hour='*/2,*/3')
    Execute every even hour, and every hour divisible by three. This means: at every hour except: 1am, 5am, 7am, 11am, 1pm, 5pm, 7pm, 11pm.
crontab(minute=0, hour='*/5')
    Execute hour divisible by 5. This means that it is triggered at 3pm, not 5pm (since 3pm equals the 24-hour clock value of "15", which is divisible by 5).
crontab(minute=0, hour='*/3,8-17')
    Execute every hour divisible by 3, and every hour during office hours (8am-5pm).
crontab(0, 0, day_of_month='2')
    Execute on the second day of every month.
crontab(0, 0, day_of_month='2-30/3')
    Execute on every even numbered day.
crontab(0, 0, day_of_month='1-7,15-21')
    Execute on the first and third weeks of the month.
crontab(0, 0, day_of_month='11', month_of_year='5')
    Execute on the eleventh of May every year.
crontab(0, 0, month_of_year='*/3')
    Execute on the first month of every quarter.

The above can satisfy the vast majority of periodic task needs; you can even schedule tasks by solar events such as sunrise and sunset. See the documentation for details.
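A sketch of such a solar schedule, using celery's celery.schedules.solar helper (the task path and coordinates are illustrative):

from celery.schedules import solar

app.conf.beat_schedule = {
    # run tasks.add at sunset at the given latitude/longitude
    'add-at-sunset': {
        'task': 'tasks.add',
        'schedule': solar('sunset', -37.81753, 144.96715),
        'args': (16, 16),
    },
}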

website/
|-- deploy
|  |-- admin.py
|  |-- apps.py
|  |-- __init__.py
|  |-- models.py
|  |-- tasks.py
|  |-- tests.py
|  |-- urls.py
|  `-- views.py
|-- manage.py
|-- README
`-- website
  |-- celery.py
  |-- __init__.py
  |-- settings.py
  |-- urls.py
  `-- wsgi.py

A standalone configuration file

Slightly more involved than the above: we create two files, a config.py celery configuration file holding configuration suited to our project, and a tasks.py file for our tasks. Name the files however you like.

config.py contains:

# coding=utf-8
# configure celery in one configuration file
BROKER_URL = 'redis://localhost:6379/3'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/4'

CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True

# route the "dirty work" to a dedicated queue:
CELERY_ROUTES = {
    'tasks.add': 'low-priority',
}

# rate-limit the task so only 10 tasks of this type are processed per minute:
CELERY_ANNOTATIONS = {
    'tasks.add': {'rate_limit': '10/m'}
}

Once configured, you can check that the configuration file is valid with the following command (config is the configuration module's name):

$ python -m config

tasks.py contains:

# coding=utf-8
from celery import Celery

app = Celery()
# the argument is the configuration module's name
app.config_from_object('config')

@app.task
def add(x, y):
    return x + y

There is another, equivalent way to set configuration, though it's not really recommended:

app.conf.update(
    task_serializer='json',
    accept_content=['json'],  # Ignore other content
    result_serializer='json',
    timezone='Europe/Oslo',
    enable_utc=True,
)

The configuration must be updated in bulk this way before the app is used.

Combining Celery with django

django can easily integrate with celery for asynchronous tasks; only a little configuration is needed.

If you have a modern Django project layout like:

- proj/
  - proj/__init__.py
  - proj/settings.py
  - proj/urls.py
- manage.py

then the recommended way is to create a new proj/proj/celery.py module that defines the Celery instance:

file: proj/proj/celery.py 

 

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

# Using a string here means the worker don't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

 

Then you need to import this app in your proj/proj/__init__.py module. This ensures that the app is loaded when Django starts so that the @shared_task decorator (mentioned later) will use it:

proj/proj/__init__.py: 

  

 

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ['celery_app']

 

Note that this example project layout is suitable for larger projects, for simple projects you may use a single contained module that defines both the app and tasks, like in the First Steps with Celery tutorial.

Let’s break down what happens in the first module, first we import absolute imports from the future, so that our celery.py module won’t clash with the library:

from __future__ import absolute_import

Then we set the default DJANGO_SETTINGS_MODULE environment variable for the celery command-line program:

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

You don’t need this line, but it saves you from always passing in the settings module to the celery program. It must always come before creating the app instances, as is what we do next:

app = Celery('proj')

This is our instance of the library.

We also add the Django settings module as a configuration source for Celery. This means that you don’t have to use multiple configuration files, and instead configure Celery directly from the Django settings; but you can also separate them if wanted.

The uppercase name-space means that all Celery configuration options must be specified in uppercase instead of lowercase, and start with CELERY_, so for example the task_always_eager setting becomes CELERY_TASK_ALWAYS_EAGER, and the broker_url setting becomes CELERY_BROKER_URL.

You can pass the object directly here, but using a string is better since then the worker doesn’t have to serialize the object.

app.config_from_object('django.conf:settings', namespace='CELERY')

Next, a common practice for reusable apps is to define all tasks in a separate tasks.py module, and Celery does have a way to auto-discover these modules:

  app.autodiscover_tasks()

With the line above Celery will automatically discover tasks from all of your installed apps, following the tasks.py convention:

  

- app1/
    - tasks.py
    - models.py
- app2/
    - tasks.py
    - models.py

Finally, the debug_task example is a task that dumps its own request information. This is using the new bind=True task option introduced in Celery 3.1 to easily refer to the current task instance.

Then write your tasks in each actual app's tasks.py:

 

# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task


@shared_task
def add(x, y):
    return x + y


@shared_task
def mul(x, y):
    return x * y


@shared_task
def xsum(numbers):
    return sum(numbers)

 

 

Call the celery task in your django views:

 

from django.shortcuts import render,HttpResponse

# Create your views here.

from  bernard import tasks

def task_test(request):

    res = tasks.add.delay(228,24)
    print("start running task")
    print("async task res",res.get() )

    return HttpResponse('res %s'%res.get())

 

 

4. Create the main file website/celery.py

Using it inside an application

The project directory structure is:

proj/
    __init__.py
    # configuration and celery bootstrap code
    celery.py
    # tasks live here
    tasks.py

celery.py:

from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('proj',
             broker='redis://localhost:6379/3',
             backend='redis://localhost:6379/4',
             include=['proj.tasks'])

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()

tasks.py:

from __future__ import absolute_import, unicode_literals
from .celery import app


@app.task
def add(x, y):
    return x + y


@app.task
def mul(x, y):
    return x * y


@app.task
def xsum(numbers):
    return sum(numbers)

To start celery, just run the following from the directory above proj:

$ celery -A proj worker -l info

Using the periodic task feature in django

There's the django-celery-beat extension that stores the schedule in the Django database, and presents a convenient admin interface to manage periodic tasks at runtime.

To install and use this extension:

  1. Use pip to install the package:

    $ pip install django-celery-beat
    
  2. Add the django_celery_beat module to INSTALLED_APPS in your Django project's settings.py:

        INSTALLED_APPS = (
            ...,
            'django_celery_beat',
        )
    
    Note that there is no dash in the module name, only underscores.
    
  3. Apply Django database migrations so that the necessary tables are created:

    $ python manage.py migrate
    
  4. Start the celery beat service using the django scheduler:

    $ celery -A proj beat -l info -S django
    
  5. Visit the Django-Admin interface to set up some periodic tasks.

 

In the admin pages, there are 3 new tables:


After configuration it looks like this:

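Besides clicking through the admin, the same tables can be driven from code via the models the extension provides; a small sketch (the task path and entry name are illustrative):

import json
from django_celery_beat.models import PeriodicTask, IntervalSchedule

# an interval row: every 10 seconds
schedule, _ = IntervalSchedule.objects.get_or_create(
    every=10,
    period=IntervalSchedule.SECONDS,
)

PeriodicTask.objects.create(
    interval=schedule,          # the schedule created above
    name='add numbers',         # human-readable name, must be unique
    task='proj.tasks.add',      # dotted path of a registered task
    args=json.dumps([16, 16]),  # JSON-encoded positional arguments
)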

 

 

Now start your celery beat and worker, and you'll find that every 2 minutes beat dispatches a task message for a worker to execute the scp_task task.

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery, platforms

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'website.settings')

app = Celery('website')

# Using a string here means the worker don't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#  should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

# allow celery to run as the root user
platforms.C_FORCE_ROOT = True

@app.task(bind=True)
def debug_task(self):
  print('Request: {0!r}'.format(self.request))

Using celery in django

Our django project's directory structure is generally as follows:

proj/
    manage.py
    myapp/
    proj/
        __init__.py
        settings.py
        urls.py
        wsgi.py

To use celery in a django project, we first need to configure celery inside django.

We add a celery.py file in the subfolder that shares the project's name;
in this example that's proj/proj/celery.py:

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
# the second argument is '<project name>.settings'
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

# the argument is the project name
app = Celery('proj')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
# the configuration lives in settings.py, and the keys need the `CELERY_` prefix
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
# automatically loads tasks from every app registered in django,
# i.e. INSTALLED_APPS in settings.py
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

Then we need to put the following in __init__.py in the same directory, proj/proj/__init__.py:

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ['celery_app']

Then we can put the tasks we need into tasks.py under the relevant apps; the project structure now looks like this:

proj/
    manage.py
    myapp1/
        __init__.py
        tasks.py
        views.py
        model.py
        tests.py
    myapp2/
        __init__.py
        tasks.py
        views.py
        model.py
        tests.py
    proj/
        __init__.py
        settings.py
        urls.py
        wsgi.py

A possible tasks.py is shown below;
myapp1/tasks.py:

# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task
import time


@shared_task
def add(x, y):
    # sleep 5s on purpose to test that the call is asynchronous and doesn't block the main process
    time.sleep(5)
    print(x + y)
    return x + y


@shared_task
def mul(x, y):
    return x * y


@shared_task
def xsum(numbers):
    return sum(numbers)

The @shared_task decorator lets you create tasks without needing a concrete app instance.

Call the tasks wherever needed, for example in myapp1/views.py:

from django.shortcuts import render
from .tasks import add


def index(request):
    # test the celery task
    add.delay(4,5)
    return render(request,'index.html')

下一场就足以运维项目,celery要求单独运行,所以要求开两极分化,分别

Start the web application server

$ python manage.py runserver

Start celery

$ celery -A proj worker -l info

Then visit the site in a browser, and you'll see output in the terminal where celery was started.


5. Add the following to website/__init__.py to ensure the app is loaded when django starts:

Extensions

  • If your project needs to be managed and controlled from the admin, use django-celery-beat
  1. Install django-celery-beat with pip:
$ pip install django-celery-beat

Don't use django-celery; that project stopped being updated many years ago.

  1. Add the app in settings.py:
INSTALLED_APPS = (
    ...,
    'django_celery_beat',
)
  1. Migrate the database:
$ python manage.py migrate
  1. Configure the celery beat service to use the django_celery_beat.schedulers:DatabaseScheduler scheduler:
$ celery -A proj beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler

Then you'll see them in the admin interface.

  • If you want to use the Django ORM or the Django cache as the result backend, install the django-celery-results extension (I don't recommend it)
  1. Install django-celery-results with pip:
$ pip install django-celery-results

Don't use django-celery; that project stopped being updated many years ago.

  1. Add the app in settings.py:
INSTALLED_APPS = (
    ...,
    'django_celery_results',
)
  1. Migrate the database:
$ python manage.py migrate django_celery_results
  1. Configure the backend in settings.py:
# use the database as the result backend
CELERY_RESULT_BACKEND = 'django-db'

# use the cache as the result backend
CELERY_RESULT_BACKEND = 'django-cache'

That covers basic usage; for more configuration and usage details, study the official documentation yourself.

Notes:

  • The setup above was built and tested successfully on ubuntu 16.04 LTS with django 1.9
  • The text above is personal opinion; if there are errors or suggestions, please contact the author
from __future__ import absolute_import

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ['celery_app']

6. Create a tasks.py file in each application; here, deploy/tasks.py:

from __future__ import absolute_import
from celery import shared_task

@shared_task
def add(x, y):
  return x + y

Note that tasks.py must be created in each app's root directory, and it can only be named tasks.py; arbitrary names won't work.

7. Reference and use these tasks in views.py for asynchronous processing:

from deploy.tasks import add

def post(request):
  result = add.delay(2, 3)


result.ready()         # whether the task has finished
result.get(timeout=1)  # wait for the result (blocks; raises on timeout)
result.traceback       # the traceback, if the task raised an exception

8. Start celery

# celery -A website worker -l info

9. Now, when the post view is called, the add inside it is processed asynchronously.

Periodic Tasks

Periodic tasks have very common use cases, e.g. I need to send reports to the boss on a schedule~

定时任务布署

  1. Add the following configuration to website/celery.py to support crontab periodic tasks:
from datetime import timedelta

from celery.schedules import crontab

app.conf.update(
  CELERYBEAT_SCHEDULE = {
    'sum-task': {
      'task': 'deploy.tasks.add',
      'schedule': timedelta(seconds=20),
      'args': (5, 6)
    },
    'send-report': {
      'task': 'deploy.tasks.report',
      'schedule': crontab(hour=4, minute=30, day_of_week=1),
    }
  }
)

This defines two tasks:

  • a task named 'sum-task' that runs the add function every 20 seconds, passing the two arguments 5 and 6
  • a task named 'send-report' that runs the report function every Monday at 4:30 am

timedelta is an object from datetime, imported with from datetime import timedelta; it takes the following parameters:

  • days
  • seconds
  • microseconds
  • milliseconds
  • minutes
  • hours
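For instance, a hypothetical entry running every hour and a half would be (the entry name and task path are illustrative):

from datetime import timedelta

app.conf.update(
  CELERYBEAT_SCHEDULE = {
    'sample-task': {
      'task': 'deploy.tasks.add',
      # every 1 hour 30 minutes
      'schedule': timedelta(hours=1, minutes=30),
      'args': (5, 6),
    }
  }
)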

crontab's parameters are:

month_of_year
day_of_month
day_of_week
hour
minute

  2. Add a report method to deploy/tasks.py:
@shared_task
def report():
  return 5

3. Start celery beat; celery runs a beat process that continuously checks whether any task needs to run:

# celery -A website beat -l info

Tips

1. If you use both asynchronous tasks and scheduled tasks, there's a simpler way to start things: celery -A website worker -B -l info starts the worker and beat together

2. If you're not using rabbitmq for the queue, configure the broker and backend in the main config file website/celery.py, like so:

# redis as the MQ
app = Celery('website', backend='redis', broker='redis://localhost')
# rabbitmq as the MQ
app = Celery('website', backend='amqp', broker='amqp://admin:admin@localhost')

3. If celery cannot avoid running as root, add platforms.C_FORCE_ROOT = True to the main config file

4. celery may leak memory after running for a long time; add CELERYD_MAX_TASKS_PER_CHILD = 10, meaning each worker process is killed and replaced after executing that many tasks
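A sketch of a main config applying tips 2-4 together (the broker address is illustrative):

from celery import Celery, platforms

app = Celery('website', backend='redis', broker='redis://localhost')
platforms.C_FORCE_ROOT = True                # tip 3: allow running as root
app.conf.CELERYD_MAX_TASKS_PER_CHILD = 10    # tip 4: recycle each worker after 10 tasks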

That's the end of this article; I hope it helps with everyone's learning.

你或然感兴趣的文章:

  • 异步义务队列Celery在Django中的采用格局
  • Django使用Celery异步职责队列的利用
  • Django中使用celery达成异步职责的示范代码

