Django中异步任务---django-celery

Celery文档参考:http://docs.jinkan.org/docs/celery/

参考博客:https://blog.csdn.net/bbwangj/article/details/89312355

Django中异步任务---django-celery

Celery简单介绍:

celery使用场景:

  • 耗时任务 定时任务
  • 请求结果不怎么重要的
    • 耗时任务比如:发送短信验证码我们可以先发送给客户任务状态(请求成功或失败)
    • 请求结果重要的建议使用django实现 比如:支付

首先简单介绍一下,Celery 是一个强大的分布式任务队列,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(asynctask)和定时任务(crontab)。它的架构组成如下图

Django中异步任务---django-celery

Celery 主要包含以下几个模块:

任务模块 Task

包含异步任务和定时任务。其中,异步任务通常在业务逻辑中被触发并发往任务队列,而定时任务由 Celery Beat 进程周期性地将任务发往任务队列。

消息中间件 Broker

Broker,即为任务调度队列,接收任务生产者发来的消息(即任务),将任务存入队列。Celery 本身不提供队列服务,官方推荐使用 RabbitMQ 和 Redis 等。

任务执行单元 Worker

Worker 是执行任务的处理单元,它实时监控消息队列,获取队列中调度的任务,并执行它。

任务结果存储 Backend

Backend 用于存储任务的执行结果,以供查询。同消息中间件一样,存储也可使用 RabbitMQ, Redis 和 MongoDB 等。

django-celery


首先需要统一一下使用的环境,以为如果redis的版本过高会报错 Django中异步任务---django-celery

解决方法:建议降低redis版本

推荐版本

Django == 2.2.6

django-celery == 3.3.1

django-redis == 4.11.0

redis == 2.10.6

celery == 3.1.26.post2

依赖安装:pip install ..... Django中异步任务---django-celery人都知道

  1. 修改setting.py django配置文件,增加如下:

    import djcelery  ###导包
    djcelery.setup_loader()  ###
    BROKER_URL = 'redis://127.0.0.1:6379/2'
    # BROKER_URL='redis://192.168.217.77:16379/2'  #任何可用的redis都可以,不一定要在django server运行的主机上
    CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'  ### 
    INSTALLED_APPS = [
        ...
        "djcelery",	# 加入djcelery应用
        ...
    
    ]
    CELERY_TIMEZONE='Asia/Shanghai'  #并没有北京时区,与下面TIME_ZONE应该一致
    TIME_ZONE='Asia/Shanghai'  #
    

    开头增加如上配置文件,根据实际情况配置redis的地址和端口,时区一定要设置为Asia/Shanghai。否则时间不准确回影响定时任务的运行。

    上面代码首先导出djcelery模块,并调用setup_loader方法加载有关配置;注意配置时区,不然默认使用UTC时间会比东八区慢8个小时。其中INSTALLED_APPS末尾添加两项,分别表示添加celery服务和自己定义的apps服务。

  2. 创建Celery所需的数据表

    python manage.py migrate
    #如若不成功可以尝试一下命令语句
    #python manage.py syncdb
    
  3. 创建task

    Django中异步任务---django-celery

    stasks.py
    # -*- coding: utf-8 -*-
    import json, time
    from syl.settings import ALY_ACCESSKEY_ID, ALY_ACCESSKEY_SECRET
    from aliyunsdkcore.client import AcsClient
    from aliyunsdkcore.request import CommonRequest
    from celery import task
    
    
    # 阿里云短信验证码
    
    @task
    def Celery_Send_Sms(phone, data):
        client = AcsClient(ALY_ACCESSKEY_ID, ALY_ACCESSKEY_SECRET, 'cn-hangzhou')
        request = CommonRequest()
        request.set_accept_format('json')
        request.set_domain('dysmsapi.aliyuncs.com')
        request.set_method('POST')
        request.set_protocol_type('https')  # https | http
        request.set_version('2017-05-25')
        request.set_action_name('SendSms')
        request.add_query_param('RegionId', "cn-hangzhou")
        request.add_query_param('PhoneNumbers', phone)
        request.add_query_param('SignName', "美多商城")
        request.add_query_param('TemplateCode', "SMS_205397849")
        request.add_query_param('TemplateParam', data)
        response = client.do_action(request)
        time.sleep(10)
        print(str(response, encoding='utf-8'))
        res = json.loads(str(response, encoding='utf-8'))
    
    #celery运行命令
    # python manage.py celery worker --loglevel=info
    
    
    • 当settings.py中的djcelery.setup_loader()运行时, Celery便会查看所有INSTALLED_APPS中app目录中的tasks.py文件, 找到标记为task的function, 并将它们注册为celery task.
    • 在执行djcelery.setup_loader()时, task是以INSTALLED_APPS中的app名, 加.tasks.function_name注册的
    • 一次需要注意 在impprt task时, 需要保持一致
    • 如果我们由于python path不同而使用不同的引用方式时(例如在tasks.py中使用from myproject.myapp.tasks import add形式), Celery将无法得知这是同一task, 因此可能会引起奇怪的bug。

    让任务变成异步

      例如我们希望在用户发出request后异步执行该task, 马上返回response, 从而不阻塞该request, 使用户有一个流畅的访问过程. 那么, 我们可以使用.delay。

Django中异步任务---django-celery

views.py

import re
import random
from rest_framework.permissions import AllowAny
from django_redis import get_redis_connection
from rest_framework.views import APIView
from rest_framework.response import Response
# from utils.MyBaseView import send_message, Send_Sms
from verifications.stasks import Celery_Send_Sms

# 用户注册短信验证码
class SmsCodeView(APIView):
    '''使用apiview的限流'''
    # 1. 所有人可以访问
    permission_classes = (AllowAny,)

    def post(self, request):
        # 1. 获取参数
        phone = request.data.get('phone')  # 手机号
        image_code = request.data.get('image_code')  # 字符串验证码
        image_code_uuid = request.data.get('image_code_uuid')  # 前端生成的uuid,是redis中图片验证码的key

        # 2. 检查参数
        if not all([phone, image_code, image_code_uuid]):
            return Response({'code': 400, 'msg': '参数不全'})

        # 检查手机号是否正确
        if not re.match(r'^1[3456789]\d{9}$', phone):
            return Response({"code": 999, "msg": "手机号码不正确"})

        # 3. 检查是否发送
        redis_client = get_redis_connection('img_code')  # 连接redis数据库
        # phone_exists = redis_client.get(phone)
        # if phone_exists:
        #     return Response({"code": 999, "msg": "频繁发送, 请稍后再试"})

        # 4.检查图片验证码是否合法
        redis_image_code = redis_client.get(image_code_uuid)  # 字符串验证码
        if redis_image_code:
            # bytes 转成 string
            redis_image_code = redis_image_code.decode()  # 把uuid解码

        # 比较用户提供的图片内容是否和redis中保存的一致
        if image_code.upper() != redis_image_code:
            return Response({'code': 999, 'msg': '图片验证码不正确'})
        # 5. 发送
        code = '%06d' % random.randint(100000, 999999)  # 随机6位验证码
        print('code===============================================', code)
        # 使用容联云短信验证码
        # send_resp = send_message(phone, (code, "5"))

        # 使用阿里云短信验证码
        data = {'code': code}
        # send_resp = Send_Sms(phone, data)
        # 使用celery异步发送短信
        Celery_Send_Sms.delay(phone, data)  #delay是注册为celery异步任务的关键点
        # Celery_Send_Sms(phone, data)  # delay是注册为celery异步任务的关键点

        # 5.1 保存code 到 redis中
        redis_client.setex(phone, 60 * 5, code)  # phone:code, 5分钟有效期
        # 5.2 从redis中删除这个图片验证码, 以防再次被使用
        redis_client.delete(image_code_uuid)

        # 6.存储这个已经发送验证码的手机号,防止频繁发送(使用pipeline 批量操作 )
        pl = redis_client.pipeline()
        pl.setex(phone, 60 * 5, code)
        pl.delete(image_code_uuid)
        pl.execute()
        # 7. 返回结果
        return Response({"code": 200, "msg": "短信发送成功"})
  1. 启动celery

    首先正常启动你的django任务,然后启动celery服务即可。

    python manage.py celery worker --loglevel=info
    

    Django中异步任务---django-celery

    出现上图这个报错不让超级管理员来启动,在settings.py加入以下配置

    from celery import Celery, platforms
    platforms.C_FORCE_ROOT = True
    
  2. 验证celery任务

      在搞定上面的东西以后,你就可以通过postman来请求接口让接口使用celery来异步执行任务而不阻塞你的request请求。
    Django中异步任务---django-celery

上一篇:Celery介绍


下一篇:celery原理与组件