欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

Celery

时间:2023-07-22
1.基础 1.1 定义

Celery 是一个处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度

1.2 组成

Celery 的架构由三部分组成:

组成部分含义消息中间件Celery 本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成.包括,RabbitMQ, Redis 等等任务执行单元Worker是Celery 提供的任务执行的单元,worker 并发的运行在分布式的系统节点中任务结果存储用来存储 Worker 执行的任务的结果,Celery支持以不同方式存储任务的结果,包括 AMQP,redis

另外,Celery 还支持不同的并发和序列化的手段

并发:Prefork, Eventlet, gevent, threads/single threaded序列化:pickle, json, yaml, msgpack、zlib, bzip2 compression, Cryptographic message signing 等 1.3 使用场景

celery 是一个强大的分布式任务队列的异步处理框架,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行.我们通常使用它来实现异步任务(async task)和定时任务(crontab)。

异步任务:将耗时操作任务提交给 Celery 去异步执行,比如发送短信 / 邮件,消息推送,音视频处理等等

定时任务:定时执行某件事情,比如每天数据统计

1.4 优点 特点含义简单Celery 使用和维护都非常简单,并且不需要配置文件高可用woker和client会在网络连接丢失或者失败时,自动进行重试.并且有的brokers 也支持“双主”或者“主/从”的方式实现高可用快速单个的Celery进程每分钟可以处理百万级的任务,并且只需要毫秒级的往返延迟使(用 RabbitMQ,librabbitmq,和优化设置时)灵活Celery几乎每个部分都可以扩展使用,自定义池实现,序列化,压缩方案,日志记录,调度器,消费者,生产者,broker传输等等2.异步任务 2.1 安装

pip install Celery

2.2 基本使用

celery_task

#!/usr/bin/env pythonimport celeryimport timebackend='redis://127.0.0.1:6379/1'broker='redis://127.0.0.1:6379/2'cel=celery.Celery('test',backend=backend,broker=broker)@cel.taskdef send_email(name): print("向%s发送邮件..."%name) time.sleep(5) print("向%s发送邮件完成"%name) return "ok"  

produce_task.py:

#!/usr/bin/env pythonfrom celery_task import send_email, celfrom celery.result import AsyncResultresult = send_email.delay("ja1")print(result.id)async_result=AsyncResult(id=result.id, app=cel)if async_result.successful(): result = async_result.get() print(result) # result.forget() # 将结果删除elif async_result.failed(): print('执行失败')elif async_result.status == 'PENDING': print('任务等待中被执行')elif async_result.status == 'RETRY': print('任务异常后正在重试')elif async_result.status == 'STARTED': print('任务已经开始被执行')

启动命令:

celery -A celery_task worker -l info -P eventlet

2.3 多任务结构

celery_task/celery.py:

#!/usr/bin/env pythonfrom celery import Celerybroker = 'redis://127.0.0.1:6379/1'backend = 'redis://127.0.0.1:6379/2'include_list = ['celery_tasks.task01', 'celery_tasks.task02']cel = Celery('celery_demo', broker=broker, backend=backend, include=include_list)# 时区cel.conf.timezone = 'Asia/Shanghai'# 是否使用UTCcel.conf.enable_utc = False

celery_task/task01.py

import timefrom celery_tasks.celery import cel@cel.taskdef send_email(res): time.sleep(5) return "完成向%s发送邮件任务"%res

celery_task/task02.py

import timefrom celery_tasks.celery import cel@cel.taskdef send_msg(name): time.sleep(5) return "完成向%s发送短信任务"%name

celery_task/…/produce_task.py:

#!/usr/bin/env pythonfrom celery_task import send_email, celfrom celery.result import AsyncResultresult = send_email.delay("ja1")print(result.id)async_result=AsyncResult(id=result.id, app=cel)if async_result.successful(): result = async_result.get() print(result) # result.forget() # 将结果删除elif async_result.failed(): print('执行失败')elif async_result.status == 'PENDING': print('任务等待中被执行')elif async_result.status == 'RETRY': print('任务异常后正在重试')elif async_result.status == 'STARTED': print('任务已经开始被执行')

启动命令

celery worker -A celery_task -l info -P eventlet -c 1000

3.4 Django中的应用

celery_task/main.py

# celery启动文件from celery import Celery# 创建celery实例celery_app = Celery('ja')# 把celery和django进行组合,识别和加载django的配置文件os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celeryPros.settings.dev')# 加载celery配置celery_app.config_from_object('celery_tasks.config')# 自动注册celery任务celery_app.autodiscover_tasks(['celery_tasks.sms'])

celery_task/config.py

#!/usr/bin/env python# 消息中间件broker_url= "redis://guest:guest@127.0.0.1:6379/1"# 任务结果存储result_backend= "redis://guest:guest@127.0.0.1:6379/2"# 时区timezone = 'Asia/Shanghai'# 是否使用UTCenable_utc = False

celery_task/task01.py

import timefrom celery_tasks.celery import cel@cel.taskdef send_email(res): time.sleep(5) return "完成向%s发送邮件任务"%res

celery_task/task02.py

import timefrom celery_tasks.celery import cel@cel.taskdef send_msg(name): time.sleep(5) return "完成向%s发送短信任务"%name

celery_task/…/produce_task.py:

#!/usr/bin/env pythonfrom celery_task import send_email, celfrom celery.result import AsyncResultresult = send_email.delay("ja1")print(result.id)async_result=AsyncResult(id=result.id, app=cel)if async_result.successful(): result = async_result.get() print(result) # result.forget() # 将结果删除elif async_result.failed(): print('执行失败')elif async_result.status == 'PENDING': print('任务等待中被执行')elif async_result.status == 'RETRY': print('任务异常后正在重试')elif async_result.status == 'STARTED': print('任务已经开始被执行')

启动命令

celery -A celery_tasks.main worker -l info --concurrency=1000 -P eventlet -c 1000

-A指对应的应用程序, 其参数是项目中 Celery实例的位置。worker指这里要启动的worker。-l指日志等级,比如info等级。默认是进程池方式,进程数以当前机器的CPU核数为参考,每个CPU开四个进程。如何自己指定进程数:celery worker -A proj --concurrency=4如何改变进程池方式为协程方式:celery worker -A proj --concurrency=1000 -P eventlet -c 1000 3.定时任务

crontab_task.py:

from celery_task import send_emailimport datetimev1 = datetime.datetime.now()v1=v1+datetime.timedelta(seconds=5)print(f"now time {v1}")v2 = datetime.datetime.utcfromtimestamp(v1.timestamp())result = send_email.apply_async(args=["ja",], eta=v2)print(result.id)

多任务结构中 celery.py 修改如下:

#!/usr/bin/env pythonfrom celery import Celeryfrom celery.schedules import crontab# 消息中间件broker_url = "redis://guest:guest@127.0.0.1:6379/1"# 任务结果存储result_backend = "redis://guest:guest@127.0.0.1:6379/2"cel = Celery('tasks', broker=broker_url, backend=result_backend, include=['celery_tasks.task01', 'celery_tasks.task02', ])cel.conf.timezone = 'Asia/Shanghai'cel.conf.enable_utc = Falsecel.conf.beat_schedule = { # 名字随意命名 'add-every-10-seconds': { # 执行tasks1下的test_celery函数 'task': 'celery_tasks.task01.send_email', # 每隔1分钟执行一次 'schedule': crontab(minute="*/1"), # 传递参数 'args': ('ja',) }, 'add-every-12-seconds': { 'task': 'celery_tasks.task01.send_email', # 每年4月11号,8点42分执行 'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4), 'args': ('ja2',) }}

#方式一# Celery Beat进程会读取配置文件的内容,周期性的将配置中到期需要执行的任务发送给任务队列celery beat -A proj # 启动 Beat 程序 celery -A proj worker -l info #启动 worker 进程#方式二celery -B -A proj worker -l info

注意

启动定时任务需要清空消息队列broker_url 中的celery

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。