您的位置:新葡亰496net > 奥门新萄京娱乐场 > 新葡亰496net试行异步职务和定期职责,Celery布满

新葡亰496net试行异步职务和定期职责,Celery布满

发布时间:2019-06-17 12:42编辑:奥门新萄京娱乐场浏览(91)

    一、前言

      Celery是一个基于python开发的分布式任务队列,如果不了解请阅读笔者上一篇博文Celery入门与进阶,而做python WEB开发最为流行的框架莫属Django,但是Django的请求处理过程都是同步的无法实现异步任务,若要实现异步任务处理需要通过其他方式(前端的一般解决方案是ajax操作),而后台Celery就是不错的选择。倘若一个用户在执行某些操作需要等待很久才返回,这大大降低了网站的吞吐量。下面将描述Django的请求处理大致流程(图片来源于网络):

    新葡亰496net 1

    请求过程简单说明:浏览器发起请求-->请求处理-->请求经过中间件-->路由映射-->视图处理业务逻辑-->响应请求(template或response)

    网上有很多celery django实现定时任务的教程,不过它们大多数是基于djcelery celery3的;
    或者是使用django_celery_beat配置较为繁琐的。

    所有演示均基于Django2.0

    Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,并且提供维护这样一个系统的必需工具。
    它是一个专注于实时处理的任务队列,同时也支持任务调度。
    以上是celery自己官网的介绍

    【Celery分布式任务队列】

    二、配置使用

      celery很容易集成到Django框架中,当然如果想要实现定时任务的话还需要安装django-celery-beta插件,后面会说明。需要注意的是Celery4.0只支持Django版本>=1.8的,如果是小于1.8版本需要使用Celery3.1。

    显然简洁而高效才是我们最终的追求,而celery4已经不需要额外插件即可与django结合实现定时任务了,原生的celery beat就可以很好的实现定时任务功能。

    celery是一个基于python开发的简单、灵活且可靠的分布式任务队列框架,支持使用任务队列的方式在分布式的机器/进程/线程上执行任务调度。采用典型的生产者-消费者模型,主要由三部分组成:

    celery的应用场景很广泛

    一、Celery介绍和基本使用

    Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery, 举几个实例场景中可用的例子:

    1. 你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,你可以继续做其它的事情。 
    2. 你想做一个定时任务,比如每天检测一下你们所有客户的资料,如果发现今天 是客户的生日,就给他发个短信祝福

    Celery 在执行任务时需要通过一个消息中间件来接收和发送任务消息,以及存储任务结果, 一般使用rabbitMQ or Redis

    1.1 Celery有以下优点:

      简单:一单熟悉了celery的工作流程后,配置和使用还是比较简单的

      高可用:当任务执行失败或执行过程中发生连接中断,celery 会自动尝试重新执行任务

      快速:一个单进程的celery每分钟可处理上百万个任务

      灵活: 几乎celery的各个组件都可以被扩展及自定制

    Celery基本工作流程图

      新葡亰496net 2

    配置

      新建立项目taskproj,目录结构(每个app下多了个tasks文件,用于定义任务):

    taskproj
    ├── app01
    │   ├── __init__.py
    │   ├── apps.py
    │   ├── migrations
    │   │   └── __init__.py
    │   ├── models.py
    │   ├── tasks.py
    │   └── views.py
    ├── manage.py
    ├── taskproj
    │   ├── __init__.py
    │   ├── settings.py
    │   ├── urls.py
    │   └── wsgi.py
    └── templates
    

    在项目目录taskproj/taskproj/目录下新建celery.py:

    #!/usr/bin/env python3
    # -*- coding:utf-8 -*-
    # Author:wd
    from __future__ import absolute_import, unicode_literals
    import os
    from celery import Celery
    
    
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'taskproj.settings')  # 设置django环境
    
    app = Celery('taskproj')
    
    app.config_from_object('django.conf:settings', namespace='CELERY') #  使用CELERY_ 作为前缀,在settings中写配置
    
    app.autodiscover_tasks()  # 发现任务文件每个app下的task.py
    

    taskproj/taskproj/__init__.py:

    from __future__ import absolute_import, unicode_literals
    
    from .celery import app as celery_app
    
    __all__ = ['celery_app']
    

    taskproj/taskproj/settings.py

    CELERY_BROKER_URL = 'redis://10.1.210.69:6379/0' # Broker配置,使用Redis作为消息中间件
    
    CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,这里使用redis
    
    CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案
    

    进入项目的taskproj目录启动worker:

    celery worker -A taskproj -l debug
    

    当然使用原生方案的同时有几点插件所带来的好处被我们放弃了:

    • 消息队列broker:broker实际上就是一个MQ队列服务,可以使用redis、rabbitmq等作为broker
    • 处理任务的消费者workers:broker通知worker队列中有任务,worker去队列中取出任务执行,每一个worker就是一个进程
    • 存储结果的backend:执行结果存储在backend,默认也会存储在broker使用的MQ队列服务中,也可以单独配置用何种服务做backend
    • 处理异步任务
    • 任务调度
    • 处理定时任务
    • 分布式调度

    1.2 Celery安装使用

    Celery的默认broker是RabbitMQ, 仅需配置一行就可以

    broker_url ``= 'amqp://guest:guest@localhost:5672//'

     

    使用Redis做broker也可以

      安装redis组件

    $ pip3 install -U "celery[redis]"
    

     

    配置
    
    Configuration is easy, just configure the location of your Redis database:
    
    app.conf.broker_url = 'redis://localhost:6379/0'
    
    Where the URL is in the format of:
    
    redis://:password@hostname:port/db_number
    
    all fields after the scheme are optional, and will default to localhost on port 6379, using database 0.
    

     

     

    如果想获取每个任务的执行结果,还需要配置一下把任务结果存在哪
    
    If you also want to store the state and return values of tasks in Redis, you should configure these settings:
    
    app.conf.result_backend = 'redis://localhost:6379/0'
    

     

    定义与触发任务

      任务定义在每个tasks文件中,app01/tasks.py:

    from __future__ import absolute_import, unicode_literals
    from celery import shared_task
    
    
    @shared_task
    def add(x, y):
        return x   y
    
    
    @shared_task
    def mul(x, y):
        return x * y
    

    视图中触发任务

    from django.http import JsonResponse
    from app01 import tasks
    
    # Create your views here.
    
    def index(request,*args,**kwargs):
        res=tasks.add.delay(1,3)
        #任务逻辑
        return JsonResponse({'status':'successful','task_id':res.task_id})
    

    访问

    新葡亰496net 3

     若想获取任务结果,可以通过task_id使用AsyncResult获取结果,还可以直接通过backend获取:

    新葡亰496net 4

     

    • 插件提供的定时任务管理将不在可用,当我们只需要任务定期执行而不需要人为调度的时候这点忽略不计。
    • 无法高效的管理或追踪定时任务,定时任务的跟踪其实交给日志更合理,但是对任务的修改就没有那么方便了,不过如果不需要经常变更/增减任务的话这点也在可接受范围内。

    新葡亰496net 5

    好处也很多,尤其在使用python构建的应用系统中,无缝衔接,使用相当方便。

    1. 3 开始使用Celery

      安装celery模块

        pip3 install celery

    创建一个celery application 用来定义你的任务列表

      创建一个任务文件就叫tasks.py

     

    from celery import Celery
    
    app = Celery('tasks',
                 broker='redis://localhost',
            #有用户名密码的话,broker="redis://:mima@127.0.0.1"
                 backend='redis://localhost')
    
    @app.task
    def add(x,y):
        print("running...",x,y)
        return x y
    

     

    启动Celery Worker来开始监听并执行任务

    $ celery -A tasks worker --loglevel=info
    

     

    调用任务

      再打开一个终端, 进行命令行模式,调用任务

    >>> from tasks import add
    >>> add.delay(4, 4)
    
    看你的worker终端会显示收到 一个任务,此时你想看任务结果的话,需要在调用 任务时 赋值个变量
    

    >>> result ``= add.delay(``4``, ``4``)

     

    The ready() method returns whether the task has finished processing or not:

    >>> result.ready()
    False
    

    You can wait for the result to complete, but this is rarely used since it turns the asynchronous call into a synchronous one:

    >>> result.get(timeout=1)
    8
    

    In case the task raised an exception, get() will re-raise the exception, but you can override this by specifying the propagate argument:

    >>> result.get(propagate=False)
    

    If the task raised an exception you can also gain access to the original traceback:

    >>> result.traceback
    …
    

    扩展

      除了redis、rabbitmq能做结果存储外,还可以使用Django的orm作为结果存储,当然需要安装依赖插件,这样的好处在于我们可以直接通过django的数据查看到任务状态,同时为可以制定更多的操作,下面介绍如何使用orm作为结果存储。

    1.安装

    pip install django-celery-results
    

    2.配置settings.py,注册app

    INSTALLED_APPS = (
        ...,
        'django_celery_results',
    )
    

    4.修改backend配置,将redis改为django-db

    #CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,这里使用redis
    
    CELERY_RESULT_BACKEND = 'django-db'  #使用django orm 作为结果存储
    

    5.修改数据库

    python3 manage.py migrate django_celery_results
    

    此时会看到数据库会多创建:

    新葡亰496net 6 当然你有时候需要对task表进行操作,以下源码的表结构定义:

    class TaskResult(models.Model):
        """Task result/status."""
    
        task_id = models.CharField(_('task id'), max_length=255, unique=True)
        task_name = models.CharField(_('task name'), null=True, max_length=255)
        task_args = models.TextField(_('task arguments'), null=True)
        task_kwargs = models.TextField(_('task kwargs'), null=True)
        status = models.CharField(_('state'), max_length=50,
                                  default=states.PENDING,
                                  choices=TASK_STATE_CHOICES
                                  )
        content_type = models.CharField(_('content type'), max_length=128)
        content_encoding = models.CharField(_('content encoding'), max_length=64)
        result = models.TextField(null=True, default=None, editable=False)
        date_done = models.DateTimeField(_('done at'), auto_now=True)
        traceback = models.TextField(_('traceback'), blank=True, null=True)
        hidden = models.BooleanField(editable=False, default=False, db_index=True)
        meta = models.TextField(null=True, default=None, editable=False)
    
        objects = managers.TaskResultManager()
    
        class Meta:
            """Table information."""
    
            ordering = ['-date_done']
    
            verbose_name = _('task result')
            verbose_name_plural = _('task results')
    
        def as_dict(self):
            return {
                'task_id': self.task_id,
                'task_name': self.task_name,
                'task_args': self.task_args,
                'task_kwargs': self.task_kwargs,
                'status': self.status,
                'result': self.result,
                'date_done': self.date_done,
                'traceback': self.traceback,
                'meta': self.meta,
            }
    
        def __str__(self):
            return '<Task: {0.task_id} ({0.status})>'.format(self)
    

     

    Celery定时任务配置

    在进行配置前先来看看项目结构:

    .├── linux_news│   ├── celery.py│   ├── __init__.py│   ├── settings.py│   ├── urls.py│   └── wsgi.py├── manage.py├── news│   ├── admin.py│   ├── apps.py│   ├── __init__.py│   ├── migrations│   ├── models│   ├── tasks.py│   ├── tests.py│   └── views└── start-celery.sh
    

    其中news是我们的app,用于从一些rss订阅源获取新闻信息,linux_news则是我们的project。我们需要关心的主要是celery.py,settings.py,tasks.py和start-celery.sh。

    首先是celery.py,想让celery执行任务就必须实例化一个celery app,并把settings.py里的配置传入app:

    import osfrom celery import Celery# set the default Django settings module for the 'celery' program.os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'linux_news.settings')app = Celery('linux_news')# 'django.conf:settings'表示django,conf.settings也就是django项目的配置,celery会根据前面设置的环境变量自动查找并导入# - namespace表示在settings.py中celery配置项的名字的统一前缀,这里是'CELERY_',配置项的名字也需要大写app.config_from_object('django.conf:settings', namespace='CELERY')# Load task modules from all registered Django app configs.app.autodiscover_tasks()
    

    配置就是这么简单,为了能在django里使用这个app,我们需要在__init__.py中导入它:

    from .celery import app as celery_app
    

    然后我们来看tasks.py,它应该位于你的app目录中,前面我们配置了自动发现,所以celery会自动找到这些tasks,我们的tasks将写在这一模块中,代码涉及了一些orm的使用,为了契合主题我做了些精简:

    from linux_news.celery import celery_app as appfrom .models import *import timeimport feedparserimport pytzimport html@app.task(ignore_result=True)def fetch_news(origin_name):    """    fetch all news from origin_name    """    origin = get_feeds_origin(origin_name)    feeds = feedparser.parse(origin.feed_link)    for item in feeds['entries']:        entry = NewsEntry()        entry.title = item.title        entry.origin = origin        entry.author = item.author        entry.link = item.link        # add timezone        entry.publish_time = item.time.replace(tzinfo=pytz.utc)        entry.summary = html.escape(item.summary)        entry.save()@app.task(ignore_result=True)def fetch_all_news():    """    这是我们的定时任务    fetch all origins' news to db    """    origins = NewsOrigin.objects.all()    for origin in origins:        fetch_news.delay(origin.origin_name)
    

    tasks里是一些耗时操作,比如网络IO或者数据库读写,因为我们不关心任务的返回值,所以使用@app.task(ignore_result=True)将其屏蔽了。

    任务配置完成后我们就要配置celery了,我们选择redis作为任务队列,我强烈建议在生产环境中使用rabbitmq或者redis作为任务队列或结果缓存后端,而不应该使用关系型数据库:

    # redisREDIS_PORT = 6379REDIS_DB = 0# 从环境变量中取得redis服务器地址REDIS_HOST = os.environ.get('REDIS_ADDR', 'redis')# celery settings# 这两项必须设置,否则不能正常启动celery beatCELERY_ENABLE_UTC = TrueCELERY_TIMEZONE = TIME_ZONE# 任务队列配置CELERY_BROKER_URL = f'redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}'CELERY_ACCEPT_CONTENT = ['application/json', ]CELERY_RESULT_BACKEND = f'redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}'CELERY_TASK_SERIALIZER = 'json'
    

    然后是我们的定时任务设置:

    from celery.schedules import crontabCELERY_BEAT_SCHEDULE={        'fetch_news_every-1-hour': {            'task': 'news.tasks.fetch_all_news',            'schedule': crontab(minute=0, hour='*/1'),        }}
    

    定时任务配置对象是一个dict,由任务名和配置项组成,主要配置想如下:

    • task:任务函数所在的模块,模块路径得写全,否则找不到将无法运行该任务
    • schedule:定时策略,一般使用celery.schedules.crontab,上面例子为每小时的0分执行一次任务,具体写法与linux的crontab类似可以参考文档说明
    • args:是个元组,给出任务需要的参数,如果不需要参数也可以不写进配置,就像例子中的一样
    • 其余配置项较少用,可以参考文档
      至此,配置celery beat的部分就结束了。

    异步任务

    Celery

    在项目中使用celery 

    可以把celery配置成一个应用

    目录格式如下

    proj/__init__.py
        /celery.py
        /tasks.py
    

    三、Django中使用定时任务

      如果想要在django中使用定时任务功能同样是靠beat完成任务发送功能,当在Django中使用定时任务时,需要安装django-celery-beat插件。以下将介绍使用过程。

    启动celery beat

    配置完成后只需要启动celery了。

    启动之前配置一下环境。不要用root运行celery!不要用root运行celery!不要用root运行celery!重要的事情说三遍。

    start-celery.sh:

    export REDIS_ADDR=127.0.0.1celery -A linux_news worker -l info -B -f /path/to/log
    

    -A 表示app所在的目录,-B表示启动celery beat运行定时任务。
    celery正常启动后就可以通过日志来查看任务是否正常运行了:

    [2018-12-21 13:00:00,022: INFO/MainProcess] Received task: news.tasks.fetch_all_news[e4566ede-2cfa-4c19-b2f3-0c7d6c38690d]  [2018-12-21 13:00:00,046: INFO/MainProcess] Received task: news.tasks.fetch_news[583e96dc-f508-49fa-a24a-331e0c07a86b]  [2018-12-21 13:00:00,051: INFO/ForkPoolWorker-2] Task news.tasks.fetch_all_news[e4566ede-2cfa-4c19-b2f3-0c7d6c38690d] succeeded in 0.02503809699555859s: None[2018-12-21 13:00:00,052: INFO/MainProcess] Received task: news.tasks.fetch_news[c61a3e55-dd3c-4d49-8d6d-ca9b1757db25]  [2018-12-21 13:00:00,449: INFO/ForkPoolWorker-5] Task news.tasks.fetch_news[c61a3e55-dd3c-4d49-8d6d-ca9b1757db25] succeeded in 0.39487219898728654s: None[2018-12-21 13:00:00,606: INFO/ForkPoolWorker-3] Task news.tasks.fetch_news[583e96dc-f508-49fa-a24a-331e0c07a86b] succeeded in 0.5523456179944333s: None
    

    以上就是celery4运行定时任务的内容,如有错误和疏漏,欢迎指正。

    我的异步使用场景为项目上线:前端web上有个上线按钮,点击按钮后发请求给后端,后端执行上线过程要5分钟,后端在接收到请求后把任务放入队列异步执行,同时马上返回给前端一个任务执行中的结果。若果没有异步执行会怎么样呢?同步的情况就是执行过程中前端一直在等后端返回结果,页面转呀转的就转超时了。

    安装

    proj/celery.py内容

     

    from __future__ import absolute_import, unicode_literals
    from celery import Celery
    
    app = Celery('proj',
                 broker='amqp://',
                 backend='amqp://',
                 include=['proj.tasks'])
    
    # Optional configuration, see the application user guide.
    app.conf.update(
        result_expires=3600,
    )
    
    if __name__ == '__main__':
        app.start()
    

     

    安装配置

    1.beat插件安装

    pip3 install django-celery-beat
    

    2.注册APP

    INSTALLED_APPS = [
        ....   
        'django_celery_beat',
    ]
    

    3.数据库变更

    python3 manage.py migrate django_celery_beat
    

    4.分别启动woker和beta

    celery -A proj beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler  #启动beta 调度器使用数据库
    
    celery worker -A taskproj -l info #启动woker
    

    5.配置admin

    urls.py

    # urls.py
    from django.conf.urls import url
    from django.contrib import admin
    
    urlpatterns = [
        url(r'^admin/', admin.site.urls),
    ]
    

    6.创建用户

    python3 manage.py createsuperuser 
    

    7.登录admin进行管理(地址

    新葡亰496net 7

     

     使用示例:

    新葡亰496net 8

     

     

     

     

    新葡亰496net 9

     

     

     查看结果:

    新葡亰496net 10

     

    异步任务配置

    安装Celery

    推荐使用pip安装,如果你使用的是虚拟环境,请在虚拟环境里安装

    $ pip install celery
    

    proj/tasks.py中的内容

     

    from __future__ import absolute_import, unicode_literals
    from .celery import app
    
    
    @app.task
    def add(x, y):
        return x   y
    
    
    @app.task
    def mul(x, y):
        return x * y
    
    
    @app.task
    def xsum(numbers):
        return sum(numbers)
    

     

    启动worker 

    $ celery -A proj worker -l info
    

    输出

     

    -------------- celery@Zhangwei-MacBook-Pro.local v4.0.2 (latentcall)
    ---- **** -----
    --- * ***  * -- Darwin-15.6.0-x86_64-i386-64bit 2017-01-26 21:50:24
    -- * - **** ---
    - ** ---------- [config]
    - ** ---------- .> app:         proj:0x103a020f0
    - ** ---------- .> transport:   redis://localhost:6379//
    - ** ---------- .> results:     redis://localhost/
    - *** --- * --- .> concurrency: 8 (prefork)
    -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
    --- ***** -----
     -------------- [queues]
                    .> celery           exchange=celery(direct) key=celery
    

     

    后台启动worker

    二次开发

      django-celery-beat插件本质上是对数据库表变化检查,一旦有数据库表改变,调度器重新读取任务进行调度,所以如果想自己定制的任务页面,只需要操作beat插件的四张表就可以了。当然你还可以自己定义调度器,django-celery-beat插件已经内置了model,只需要进行导入便可进行orm操作,以下我用django reset api进行示例:

    settings.py

    INSTALLED_APPS = [
        'django.contrib.admin',
        'django.contrib.auth',
        'django.contrib.contenttypes',
        'django.contrib.sessions',
        'django.contrib.messages',
        'django.contrib.staticfiles',
        'app01.apps.App01Config',
        'django_celery_results',
        'django_celery_beat',
        'rest_framework',
    ]
    

    urls.py

    urlpatterns = [
        url(r'^admin/', admin.site.urls),
        url(r'^index$', views.index),
        url(r'^res$', views.get_res),
        url(r'^tasks$', views.TaskView.as_view({'get':'list'})),
    ]
    

    views.py

    from django_celery_beat.models import PeriodicTask  #倒入插件model
    from rest_framework import serializers
    from rest_framework import pagination
    from rest_framework.viewsets import ModelViewSet
    class Userserializer(serializers.ModelSerializer):
        class Meta:
            model = PeriodicTask
            fields = '__all__'
    
    class Mypagination(pagination.PageNumberPagination):
        """自定义分页"""
        page_size=2
        page_query_param = 'p'
        page_size_query_param='size'
        max_page_size=4
    
    class TaskView(ModelViewSet):
        queryset = PeriodicTask.objects.all()
        serializer_class = Userserializer
        permission_classes = []
        pagination_class = Mypagination
    

    访问

    新葡亰496net 11

     

    1.安装rabbitmq,这里我们使用rabbitmq作为broker,安装完成后默认启动了,也不需要其他任何配置

    安装消息中间件

    Celery 支持 RabbitMQ、Redis 甚至其他数据库系统作为其消息代理中间件

    你希望用什么中间件和后端就请自行安装,一般都使用redis或者RabbitMQ

    新葡亰496net试行异步职务和定期职责,Celery布满式职务队列。In the background

    In production you’ll want to run the worker in the background, this is described in detail in the daemonization tutorial.

    The daemonization scripts uses the celery multi command to start one or more workers in the background:

    $ celery multi start w1 -A proj -l info
    celery multi v4.0.0 (latentcall)
    > Starting nodes...
        > w1.halcyon.local: OK
    

    You can restart it too:

    $ celery  multi restart w1 -A proj -l info
    celery multi v4.0.0 (latentcall)
    > Stopping nodes...
        > w1.halcyon.local: TERM -> 64024
    > Waiting for 1 node.....
        > w1.halcyon.local: OK
    > Restarting node w1.halcyon.local: OK
    celery multi v4.0.0 (latentcall)
    > Stopping nodes...
        > w1.halcyon.local: TERM -> 64052
    

    or stop it:

    $ celery multi stop w1 -A proj -l info
    

    The stop command is asynchronous so it won’t wait for the worker to shutdown. You’ll probably want to use the stopwait command instead, this ensures all currently executing tasks is completed before exiting:

    $ celery multi stopwait w1 -A proj -l info
    
    # apt-get install rabbitmq-server
    

    安装Redis

    在Ubuntu系统下使用apt-get命令就可以

    $ sudo apt-get install redis-server
    

    如果你使用redis作为中间件,还需要安装redis支持包,同样使用pip安装即可

    $ pip install redis
    

    能出现以下结果即为成功

    redis 127.0.0.1:6379>
    

    其他的redis知识这里不左介绍,如果有兴趣,可以自行了解

    如果你使用RabbitMQ,也请安装RabbitMQ

    Celery 定时任务

    celery支持定时任务,设定好任务的执行时间,celery就会定时自动帮你执行, 这个定时任务模块叫celery beat

    写一个脚本 叫periodic_task.py

     

    from celery import Celery
    from celery.schedules import crontab
    
    app = Celery()
    
    @app.on_after_configure.connect
    def setup_periodic_tasks(sender, **kwargs):
        # Calls test('hello') every 10 seconds.
        sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')
    
        # Calls test('world') every 30 seconds
        sender.add_periodic_task(30.0, test.s('world'), expires=10)
    
        # Executes every Monday morning at 7:30 a.m.
        sender.add_periodic_task(
            crontab(hour=7, minute=30, day_of_week=1),
            test.s('Happy Mondays!'),
        )
    
    @app.task
    def test(arg):
        print(arg)
    

     

    add_periodic_task 会添加一条定时任务

    上面是通过调用函数添加定时任务,也可以像写配置文件 一样的形式添加, 下面是每30s执行的任务

     

    app.conf.beat_schedule = {
        'add-every-30-seconds': {
            'task': 'tasks.add',
            'schedule': 30.0,
            'args': (16, 16)
        },
    }
    app.conf.timezone = 'UTC'
    

     

      任务添加好了,需要让celery单独启动一个进程来定时发起这些任务, 注意, 这里是发起任务,不是执行,这个进程只会不断的去检查你的任务计划, 每发现有任务需要执行了,就发起一个任务调用消息,交给celery worker去执行

    启动任务调度器 celery beat

    $ celery -A periodic_task beat

    输出like below

     

    celery beat v4.0.2 (latentcall) is starting.
    __    -    ... __   -        _
    LocalTime -> 2017-02-08 18:39:31
    Configuration ->
        . broker -> redis://localhost:6379//
        . loader -> celery.loaders.app.AppLoader
        . scheduler -> celery.beat.PersistentScheduler
        . db -> celerybeat-schedule
        . logfile -> [stderr]@%WARNING
        . maxinterval -> 5.00 minutes (300s)
    

     

     

    此时还差一步,就是还需要启动一个worker,负责执行celery beat发起的任务

    启动celery worker来执行任务

     

    $ celery -A periodic_task worker
    
     -------------- celery@Alexs-MacBook-Pro.local v4.0.2 (latentcall)
    ---- **** -----
    --- * ***  * -- Darwin-15.6.0-x86_64-i386-64bit 2017-02-08 18:42:08
    -- * - **** ---
    - ** ---------- [config]
    - ** ---------- .> app:         tasks:0x104d420b8
    - ** ---------- .> transport:   redis://localhost:6379//
    - ** ---------- .> results:     redis://localhost/
    - *** --- * --- .> concurrency: 8 (prefork)
    -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
    --- ***** -----
     -------------- [queues]
                    .> celery           exchange=celery(direct) key=celery
    

     

    好啦,此时观察worker的输出,是不是每隔一小会,就会执行一次定时任务呢!

    注意:Beat needs to store the last run times of the tasks in a local database file (named celerybeat-schedule by default), so it needs access to write in the current directory, or alternatively you can specify a custom location for this file:

    $ celery -A periodic_task beat -s /home/celery/var/run/celerybeat-schedule
    

    2.安装celery

    安装RabbitMQ

    $ sudo apt-get install rabbitmq-server
    

    更复杂的定时配置

    # pip3 install celery
    

    使用Celery

    上面的定时任务比较简单,只是每多少s执行一个任务,但如果你想要每周一三五的早上8点给你发邮件怎么办呢?哈,其实也简单,用crontab功能,跟linux自带的crontab功能是一样的,可以个性化定制任务执行时间

    3.celery用在django项目中,django项目目录结构(简化)如下

    简单直接使用

    可以在需要的地方直接引入Celery,直接使用即可。最简单的方式只需要配置一个任务和中间人即可

    from celery import Celery
    
    app = Celery('tasks', broker='redis://localhost:6379/3')
    
    @app.task
    def add(x, y):
        return x   y
    

    我这里使用了redis作为中间件,这是可以按自己的习惯替换的

    由于默认的配置不是最切合我们的项目实际需要,一般来说我们都需要按我们自己的要求配置一些,
    但是由于需要将项目解耦,也好维护,我们最好使用单独的一个文件编写配置。

    Linux crontab  

     

     

    from celery.schedules import crontab
    
    app.conf.beat_schedule = {
        # Executes every Monday morning at 7:30 a.m.
        'add-every-monday-morning': {
            'task': 'tasks.add',
            'schedule': crontab(hour=7, minute=30, day_of_week=1),
            'args': (16, 16),
        },
    }
    

     

    上面的这条意思是每周1的早上7.30执行tasks.add任务

    还有更多定时配置方式如下:

    Example

      Meaning
    crontab() Execute every minute.
    crontab(minute=0, hour=0) Execute daily at midnight.
    crontab(minute=0, hour='*/3') Execute every three hours: midnight, 3am, 6am, 9am, noon, 3pm, 6pm, 9pm.
    crontab(minute=0,
    hour='0,3,6,9,12,15,18,21')
    Same as previous.
    crontab(minute='*/15') Execute every 15 minutes.
    crontab(day_of_week='sunday') Execute every minute (!) at Sundays.
    crontab(minute='*',
    hour='*',day_of_week='sun')
    Same as previous.
    crontab(minute='*/10',
    hour='3,17,22',day_of_week='thu,fri')
    Execute every ten minutes, but only between 3-4 am, 5-6 pm, and 10-11 pm on Thursdays or Fridays.
    crontab(minute=0,hour='*/2,*/3') Execute every even hour, and every hour divisible by three. This means: at every hour except: 1am, 5am, 7am, 11am, 1pm, 5pm, 7pm, 11pm
    crontab(minute=0, hour='*/5') Execute hour divisible by 5. This means that it is triggered at 3pm, not 5pm (since 3pm equals the 24-hour clock value of “15”, which is divisible by 5).
    crontab(minute=0, hour='*/3,8-17') Execute every hour divisible by 3, and every hour during office hours (8am-5pm).
    crontab(0, 0,day_of_month='2') Execute on the second day of every month.
    crontab(0, 0,
    day_of_month='2-30/3')
    Execute on every even numbered day.
    crontab(0, 0,
    day_of_month='1-7,15-21')
    Execute on the first and third weeks of the month.
    crontab(0, 0,day_of_month='11',
    month_of_year='5')
    Execute on the eleventh of May every year.
    crontab(0, 0,
    month_of_year='*/3')
    Execute on the first month of every quarter.

    上面能满足你绝大多数定时任务需求了,甚至还能根据潮起潮落来配置定时任务, 具体看

    website/
    |-- deploy
    |  |-- admin.py
    |  |-- apps.py
    |  |-- __init__.py
    |  |-- models.py
    |  |-- tasks.py
    |  |-- tests.py
    |  |-- urls.py
    |  `-- views.py
    |-- manage.py
    |-- README
    `-- website
      |-- celery.py
      |-- __init__.py
      |-- settings.py
      |-- urls.py
      `-- wsgi.py
    

    单独配置配置文件

    比上面的稍微复杂一点,我们需要创建两个文件,一个为config.py的celery配置文件,在其中填写适合我们项目的配置,在创建一个tasks.py文件来编写我们的任务。文件的名字可以按你的喜好自己命名。

    config.py内容为:

    # coding=utf-8
    # 配置文件同一配置celery
    BROKER_URL = 'redis://localhost:6379/3'
    CELERY_RESULT_BACKEND = 'redis://localhost:6379/4'
    
    CELERY_TASK_SERIALIZER = 'json'
    CELERY_RESULT_SERIALIZER = 'json'
    CELERY_ACCEPT_CONTENT = ['json']
    CELERY_TIMEZONE = 'Asia/Shanghai'
    CELERY_ENABLE_UTC = True
    
    # 把“脏活”路由到专用的队列:
    CELERY_ROUTES = {
        'tasks.add': 'low-priority',
    }
    
    # 限制任务的速率,这样每分钟只允许处理 10 个该类型的任务:
    CELERY_ANNOTATIONS = {
        'tasks.add': {'rate_limit': '10/m'}
    }
    

    配置好以后可以用以下命令检查配置文件是否正确(config为配置文件名)

    $ python -m config
    

    tasks.py内容为:

    # coding=utf-8
    from celery import Celery
    
    app = Celery()
    # 参数为配置文件的文件名
    app.config_from_object('config')
    
    @app.task
    def add(x, y):
        return x   y
    

    还有一种同一设置配置的方式,不是很推荐

    app.conf.update(
        task_serializer='json',
        accept_content=['json'],  # Ignore other content
        result_serializer='json',
        timezone='Europe/Oslo',
        enable_utc=True,
    )
    

    在app使用前先需要用以上方法批量更新配置文件。

    Celery与django结合

    django 可以轻松跟celery结合实现异步任务,只需简单配置即可

    If you have a modern Django project layout like:

    - proj/
      - proj/__init__.py
      - proj/settings.py
      - proj/urls.py
    - manage.py
    

    then the recommended way is to create a new proj/proj/celery.py module that defines the Celery instance:

    file: proj/proj/celery.py 

     

    from __future__ import absolute_import, unicode_literals
    import os
    from celery import Celery
    
    # set the default Django settings module for the 'celery' program.
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
    
    app = Celery('proj')
    
    # Using a string here means the worker don't have to serialize
    # the configuration object to child processes.
    # - namespace='CELERY' means all celery-related configuration keys
    #   should have a `CELERY_` prefix.
    app.config_from_object('django.conf:settings', namespace='CELERY')
    
    # Load task modules from all registered Django app configs.
    app.autodiscover_tasks()
    
    
    @app.task(bind=True)
    def debug_task(self):
        print('Request: {0!r}'.format(self.request))
    

     

    Then you need to import this app in your proj/proj/__init__.py module. This ensures that the app is loaded when Django starts so that the @shared_task decorator (mentioned later) will use it:

    proj/proj/__init__.py: 

      

     

    from __future__ import absolute_import, unicode_literals
    
    # This will make sure the app is always imported when
    # Django starts so that shared_task will use this app.
    from .celery import app as celery_app
    
    __all__ = ['celery_app']
    

     

    Note that this example project layout is suitable for larger projects, for simple projects you may use a single contained module that defines both the app and tasks, like in the First Steps with Celery tutorial.

    Let’s break down what happens in the first module, first we import absolute imports from the future, so that our celery.py module won’t clash with the library:

    from __future__ import absolute_import
    

    Then we set the default DJANGO_SETTINGS_MODULE environment variable for the celery command-line program:

    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
    

    You don’t need this line, but it saves you from always passing in the settings module to the celery program. It must always come before creating the app instances, as is what we do next:

    app = Celery('proj')
    

    This is our instance of the library.

    We also add the Django settings module as a configuration source for Celery. This means that you don’t have to use multiple configuration files, and instead configure Celery directly from the Django settings; but you can also separate them if wanted.

    The uppercase name-space means that all Celery configuration options must be specified in uppercase instead of lowercase, and start with CELERY_, so for example the task_always_eager` setting becomes CELERY_TASK_ALWAYS_EAGER, and the broker_url setting becomes CELERY_BROKER_URL.

    You can pass the object directly here, but using a string is better since then the worker doesn’t have to serialize the object.

    app.config_from_object('django.conf:settings', namespace='CELERY')
    

    Next, a common practice for reusable apps is to define all tasks in a separate tasks.pymodule, and Celery does have a way to  auto-discover these modules:

      app.autodiscover_tasks()

    With the line above Celery will automatically discover tasks from all of your installed apps, following the tasks.py convention:

      

    - app1``/

    ``- tasks.py

    ``- models.py

    - app2``/

    ``- tasks.py

    ``- models.py

    Finally, the debug_task example is a task that dumps its own request information. This is using the new bind=True task option introduced in Celery 3.1 to easily refer to the current task instance.

    然后在具体的app里的tasks.py里写你的任务

     

    # Create your tasks here
    from __future__ import absolute_import, unicode_literals
    from celery import shared_task
    
    
    @shared_task
    def add(x, y):
        return x   y
    
    
    @shared_task
    def mul(x, y):
        return x * y
    
    
    @shared_task
    def xsum(numbers):
        return sum(numbers)
    

     

     

    在你的django views里调用celery task

     

    from django.shortcuts import render,HttpResponse
    
    # Create your views here.
    
    from  bernard import tasks
    
    def task_test(request):
    
        res = tasks.add.delay(228,24)
        print("start running task")
        print("async task res",res.get() )
    
        return HttpResponse('res %s'%res.get())
    

     

     

    4.创建 website/celery.py 主文件

    在应用上使用

    工程目录结构为

    proj/
        __init__.py
        # 存放配置和启动celery代码
        celery.py
        # 存放任务
        tasks.py
    

    celery.py为:

    from __future__ import absolute_import, unicode_literals
    from celery import Celery
    
    app = Celery('proj',
                 broker='redis://localhost:6379/3',
                 backend='redis://localhost:6379/4',
                 include=['proj.tasks'])
    
    # Optional configuration, see the application user guide.
    app.conf.update(
        result_expires=3600,
    )
    
    if __name__ == '__main__':
        app.start()
    

    tasks.py为:

    from __future__ import absolute_import, unicode_literals
    from .celery import app
    
    
    @app.task
    def add(x, y):
        return x   y
    
    
    @app.task
    def mul(x, y):
        return x * y
    
    
    @app.task
    def xsum(numbers):
        return sum(numbers)
    

    启动celery只需要在proj同级目录下:

    $ celery -A proj worker -l info
    

    在django中使用计划任务功能

    There’s  the django-celery-beat extension that stores the schedule in the Django database, and presents a convenient admin interface to manage periodic tasks at runtime.

    To install and use this extension:

    1. Use pip to install the package:

      $ pip install django-celery-beat
      
    2. Add the django_celery_beat module to INSTALLED_APPS in your Django project’ settings.py:

          INSTALLED_APPS = (
              ...,
              'django_celery_beat',
          )
      
      Note that there is no dash in the module name, only underscores.
      
    3. Apply Django database migrations so that the necessary tables are created:

      $ python manage.py migrate
      
    4. Start the celery beat service using the django scheduler:

      $ celery -A proj beat -l info -S django
      
    5. Visit the Django-Admin interface to set up some periodic tasks.

     

    在admin页面里,有3张表

    新葡亰496net 12

    配置完长这样

    新葡亰496net 13

     

     

    此时启动你的celery beat 和worker,会发现每隔2分钟,beat会发起一个任务消息让worker执行scp_task任务

    from __future__ import absolute_import, unicode_literals
    import os
    from celery import Celery, platforms
    
    # set the default Django settings module for the 'celery' program.
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'website.settings')
    
    app = Celery('website')
    
    # Using a string here means the worker don't have to serialize
    # the configuration object to child processes.
    # - namespace='CELERY' means all celery-related configuration keys
    #  should have a `CELERY_` prefix.
    app.config_from_object('django.conf:settings', namespace='CELERY')
    
    # Load task modules from all registered Django app configs.
    app.autodiscover_tasks()
    
    # 允许root 用户运行celery
    platforms.C_FORCE_ROOT = True
    
    @app.task(bind=True)
    def debug_task(self):
      print('Request: {0!r}'.format(self.request))
    

    在django中使用celery

    我们的django的项目的目录结构一般如下

    proj/
        manage.py
        myapp/
        proj/
            __init__py
            settings.py
            urls.py
            wsgi.py
    

    想要在django项目中使用celery,我们首先需要在django中配置celery

    我们需要在与工程名同名的子文件夹中添加celery.py新葡亰496net,文件
    在本例中也就是proj/proj/celery.py

    from __future__ import absolute_import, unicode_literals
    import os
    from celery import Celery
    
    # set the default Django settings module for the 'celery' program.
    # 第二个参数为工程名.settings
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
    
    # 括号里的参数为工程名
    app = Celery('proj')
    
    # Using a string here means the worker doesn't have to serialize
    # the configuration object to child processes.
    # - namespace='CELERY' means all celery-related configuration keys
    #   should have a `CELERY_` prefix.
    # 配置文件需要写在setting.py中,并且配置项需要使用`CELERY_`作为前缀
    app.config_from_object('django.conf:settings', namespace='CELERY')
    
    # Load task modules from all registered Django app configs.
    # 能够自动加载所有在django中注册的app,也就是setting.py中的INSTALLED_APPS
    app.autodiscover_tasks()
    
    
    @app.task(bind=True)
    def debug_task(self):
        print('Request: {0!r}'.format(self.request))
    

    然后我们需要在同级目录下的**init.py文件中配置如下内容 proj/proj/init.py**

    from __future__ import absolute_import, unicode_literals
    
    # This will make sure the app is always imported when
    # Django starts so that shared_task will use this app.
    from .celery import app as celery_app
    
    __all__ = ['celery_app']
    

    然后我们就可以把需要的任务放到需要的app下的tasks.py中,现在项目目录结构如下

    proj/
        manage.py
        myapp1/
            __init__.py
            tasks.py
            views.py
            model.py
            tests.py
        myapp2/
            __init__.py
            tasks.py
            views.py
            model.py
            tests.py
        proj/
            __init__py
            settings.py
            urls.py
            wsgi.py
    

    可能的一个tasks.py文件内容如下:
    myapp1/tasks.py为:

    # Create your tasks here
    from __future__ import absolute_import, unicode_literals
    from celery import shared_task
    import time
    
    
    @shared_task
    def add(x, y):
        # 为了测试是否是异步,特意休眠5s,观察是否会卡主主进程
        time.sleep(5)
        print(x y)
        return x   y
    
    
    @shared_task
    def mul(x, y):
        return x * y
    
    
    @shared_task
    def xsum(numbers):
        return sum(numbers)
    

    @shared_task修饰器可以让你创建task不需要app实体

    在需要的地方调用相关任务即可,例如在myapp1/views.py中调用

    from django.shortcuts import render
    from .tasks import add
    
    
    def index(request):
        # 测试celery任务
        add.delay(4,5)
        return render(request,'index.html')
    

    然后就可以启动项目,celery需要单独启动,所以需要开两个终端,分别

    启动web应用服务器

    $ python manage.py runserver
    

    启动celery

    $ celery -A proj worker -l info
    

    然后访问浏览器就可以在启动celery的终端中看到输出

    新葡亰496net 14

    测试结果

    5.在 website/__init__.py 文件中增加如下内容,确保django启动的时候这个app能够被加载到

    扩展

    • 如果你的项目需要在admin中管理调度,请使用django-celery-beat
    1. 使用pip安装django-celery-beat
    $ pip install django-celery-beat
    

    不要在使用django-celery,这个项目已经停止更新好好多年。。。。

    1. 在settings.py中添加这个app
    INSTALLED_APPS = (
        ...,
        'django_celery_beat',
    )
    
    1. 同步一下数据库
    $ python manage.py migrate
    
    1. 设置celery beat服务使用django_celery_beat.schedulers:DatabaseScheduler scheduler
    $ celery -A proj beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
    

    然后在就可以admin界面看到了。

    • 如果你想使用Django-ORM或者Django Cache作为后端,需要安装django-celery-results扩展(笔者不建议)
    1. 使用pip安装django-celery-results
    $ pip install django-celery-results
    

    不要在使用django-celery,这个项目已经停止更新好好多年。。。。

    1. 在settings.py中添加这个app
    INSTALLED_APPS = (
        ...,
        'django_celery_results',
    )
    
    1. 同步一下数据库
    $ python manage.py migrate django_celery_results
    
    1. 配置后端,在settings.py中配置
    # 使用数据库作为结果后端
    CELERY_RESULT_BACKEND = 'django-db'
    
    # 使用缓存作为结果后端
    CELERY_RESULT_BACKEND = 'django-cache'
    

    基本使用大概就是上述这些,其他具体配置和使用还需自己研读官方文档

    注:

    • 上述环境在ubuntu16.04 lts django1.9中搭建测试成功
    • 上述文字皆为个人看法,如有错误或建议请及时联系我
    from __future__ import absolute_import
    
    # This will make sure the app is always imported when
    # Django starts so that shared_task will use this app.
    from .celery import app as celery_app
    
    __all__ = ['celery_app']
    

    6.各应用创建tasks.py文件,这里为 deploy/tasks.py

    from __future__ import absolute_import
    from celery import shared_task
    
    @shared_task
    def add(x, y):
      return x   y
    

    注意tasks.py必须建在各app的根目录下,且只能叫tasks.py,不能随意命名

    7.views.py中引用使用这个tasks异步处理

    from deploy.tasks import add
    
    def post(request):
      result = add.delay(2, 3)
    
    
    result.ready()
    result.get(timeout=1)
    result.traceback
    

    8.启动celery

    # celery -A website worker -l info
    

    9.这样在调用post这个方法时,里边的add就可以异步处理了

    定时任务

    定时任务的使用场景就很普遍了,比如我需要定时发送报告给老板~

    定时任务配置

    1. website/celery.py 文件添加如下配置以支持定时任务crontab
    from celery.schedules import crontab
    
    app.conf.update(
      CELERYBEAT_SCHEDULE = {
        'sum-task': {
          'task': 'deploy.tasks.add',
          'schedule': timedelta(seconds=20),
          'args': (5, 6)
        }
        'send-report': {
          'task': 'deploy.tasks.report',
          'schedule': crontab(hour=4, minute=30, day_of_week=1),
        }
      }
    )
    

    定义了两个task:

    • 名字为'sum-task'的task,每20秒执行一次add函数,并传了两个参数5和6
    • 名字为'send-report'的task,每周一早上4:30执行report函数

    timedelta是datetime中的一个对象,需要 from datetime import timedelta 引入,有如下几个参数

    • days
    • seconds
    • microseconds
    • milliseconds
    • minutes
    • hours

    crontab的参数有:

    month_of_year
    day_of_month
    day_of_week
    hour
    minute

    1. deploy/tasks.py 文件添加report方法:
    @shared_task
    def report():
      return 5
    

    3.启动celery beat,celery启动了一个beat进程一直在不断的判断是否有任务需要执行

    # celery -A website beat -l info
    

    Tips

    1.如果你同时使用了异步任务和计划任务,有一种更简单的启动方式 celery -A website worker -b -l info ,可同时启动worker和beat

    2.如果使用的不是rabbitmq做队列那么需要在主配置文件中 website/celery.py 配置broker和backend,如下:

    # redis做MQ配置
    app = Celery('website', backend='redis', broker='redis://localhost')
    # rabbitmq做MQ配置
    app = Celery('website', backend='amqp', broker='amqp://admin:admin@localhost')
    

    3.celery不能用root用户启动的话需要在主配置文件中添加 platforms.C_FORCE_ROOT = True

    4.celery在长时间运行后可能出现内存泄漏,需要添加配置 CELERYD_MAX_TASKS_PER_CHILD = 10 ,表示每个worker执行了多少个任务就死掉

    以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

    您可能感兴趣的文章:

    • 异步任务队列Celery在Django中的使用方法
    • Django使用Celery异步任务队列的使用
    • Django中使用celery完成异步任务的示例代码

    本文由新葡亰496net发布于奥门新萄京娱乐场,转载请注明出处:新葡亰496net试行异步职务和定期职责,Celery布满

    关键词: