Celery is a distributed system for processing large volumes of messages: an asynchronous task queue focused on real-time processing that also supports task scheduling.
1.2 Components

Celery's architecture consists of three parts: a message broker (message middleware), task execution units (workers), and a task result store (backend).
In addition, Celery supports several concurrency and serialization mechanisms.

Concurrency: prefork, eventlet, gevent, threads/single threaded
Serialization: pickle, json, yaml, msgpack; zlib and bzip2 compression; cryptographic message signing; and more

1.3 Use Cases
Celery is a powerful distributed task queue and asynchronous processing framework: it lets tasks execute entirely outside the main program, and even on other hosts. It is typically used for asynchronous tasks (async task) and scheduled tasks (crontab).

Asynchronous tasks: hand time-consuming operations to Celery to run asynchronously, e.g. sending SMS messages / emails, push notifications, audio/video processing, and so on.
Scheduled tasks: run something on a schedule, e.g. daily statistics.
1.4 Advantages

2.1 Installation

```shell
pip install celery
```
2.2 Basic Usage

celery_task.py:
```python
#!/usr/bin/env python
import celery
import time

backend = 'redis://127.0.0.1:6379/1'
broker = 'redis://127.0.0.1:6379/2'
cel = celery.Celery('test', backend=backend, broker=broker)

@cel.task
def send_email(name):
    print("Sending email to %s..." % name)
    time.sleep(5)
    print("Finished sending email to %s" % name)
    return "ok"
```
produce_task.py:
```python
#!/usr/bin/env python
from celery_task import send_email, cel
from celery.result import AsyncResult

result = send_email.delay("ja1")
print(result.id)

async_result = AsyncResult(id=result.id, app=cel)
if async_result.successful():
    result = async_result.get()
    print(result)
    # result.forget()  # delete the result from the backend
elif async_result.failed():
    print('Task failed')
elif async_result.status == 'PENDING':
    print('Task is waiting to be executed')
elif async_result.status == 'RETRY':
    print('Task is being retried after an error')
elif async_result.status == 'STARTED':
    print('Task has started running')
```
Start the worker:

```shell
celery -A celery_task worker -l info -P eventlet
```
2.3 Multi-Task Structure

celery_tasks/celery.py:
```python
#!/usr/bin/env python
from celery import Celery

broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
include_list = ['celery_tasks.task01', 'celery_tasks.task02']

cel = Celery('celery_demo', broker=broker, backend=backend, include=include_list)

# Time zone
cel.conf.timezone = 'Asia/Shanghai'
# Whether to use UTC
cel.conf.enable_utc = False
celery_tasks/task01.py:

```python
import time

from celery_tasks.celery import cel

@cel.task
def send_email(res):
    time.sleep(5)
    return "Finished sending email to %s" % res
```
celery_tasks/task02.py:

```python
import time

from celery_tasks.celery import cel

@cel.task
def send_msg(name):
    time.sleep(5)
    return "Finished sending SMS to %s" % name
```
celery_tasks/…/produce_task.py:

```python
#!/usr/bin/env python
from celery.result import AsyncResult
from celery_tasks.celery import cel
from celery_tasks.task01 import send_email

result = send_email.delay("ja1")
print(result.id)

async_result = AsyncResult(id=result.id, app=cel)
if async_result.successful():
    result = async_result.get()
    print(result)
    # result.forget()  # delete the result from the backend
elif async_result.failed():
    print('Task failed')
elif async_result.status == 'PENDING':
    print('Task is waiting to be executed')
elif async_result.status == 'RETRY':
    print('Task is being retried after an error')
elif async_result.status == 'STARTED':
    print('Task has started running')
```
Start the worker:

```shell
celery -A celery_tasks worker -l info -P eventlet -c 1000
```
3.4 Using Celery in Django

celery_tasks/main.py:
```python
# Celery entry point
import os

from celery import Celery

# Tie Celery to Django by pointing it at Django's settings module
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celeryPros.settings.dev')

# Create the Celery app instance
celery_app = Celery('ja')

# Load the Celery configuration
celery_app.config_from_object('celery_tasks.config')

# Auto-register Celery tasks
celery_app.autodiscover_tasks(['celery_tasks.sms'])
```
celery_tasks/config.py:

```python
#!/usr/bin/env python
# Message broker
broker_url = "redis://guest:guest@127.0.0.1:6379/1"
# Task result store
result_backend = "redis://guest:guest@127.0.0.1:6379/2"
# Time zone
timezone = 'Asia/Shanghai'
# Whether to use UTC
enable_utc = False
```
Start the worker:

```shell
celery -A celery_tasks.main worker -l info -P eventlet -c 1000
```
-A specifies the application; its argument is the location of the Celery instance within the project.
worker means a worker is being started here.
-l sets the log level, e.g. info.

By default the worker uses a prefork process pool, with the number of processes based on the machine's CPU core count.

To set the number of processes yourself:

```shell
celery -A proj worker --concurrency=4
```

To switch from the process pool to coroutines:

```shell
celery -A proj worker -P eventlet -c 1000
```

3. Scheduled Tasks
crontab_task.py:
```python
from celery_task import send_email
import datetime

# Run the task 5 seconds from now
v1 = datetime.datetime.now()
v1 = v1 + datetime.timedelta(seconds=5)
print(f"now time {v1}")

# eta expects a UTC time, so convert the local time
v2 = datetime.datetime.utcfromtimestamp(v1.timestamp())
result = send_email.apply_async(args=["ja"], eta=v2)
print(result.id)
```
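The `utcfromtimestamp` round-trip above works, but the same local-to-UTC conversion can also be expressed with timezone-aware datetimes from the standard library; a sketch:

```python
import datetime

# An aware local timestamp 5 seconds from now, converted to UTC, which is
# what Celery's eta argument expects when enable_utc is on.
local = datetime.datetime.now().astimezone() + datetime.timedelta(seconds=5)
utc = local.astimezone(datetime.timezone.utc)
print(utc.isoformat())
```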
For the multi-task structure, modify celery.py as follows:
```python
#!/usr/bin/env python
from celery import Celery
from celery.schedules import crontab

# Message broker
broker_url = "redis://guest:guest@127.0.0.1:6379/1"
# Task result store
result_backend = "redis://guest:guest@127.0.0.1:6379/2"

cel = Celery('tasks',
             broker=broker_url,
             backend=result_backend,
             include=['celery_tasks.task01', 'celery_tasks.task02'])

cel.conf.timezone = 'Asia/Shanghai'
cel.conf.enable_utc = False

cel.conf.beat_schedule = {
    # schedule entry names are arbitrary
    'add-every-10-seconds': {
        # run the send_email task from task01
        'task': 'celery_tasks.task01.send_email',
        # run once every minute
        'schedule': crontab(minute="*/1"),
        # arguments passed to the task
        'args': ('ja',)
    },
    'add-every-12-seconds': {
        'task': 'celery_tasks.task01.send_email',
        # run at 08:42 on April 11 every year
        'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
        'args': ('ja2',)
    }
}
```
```shell
# Option 1
# The Beat process reads the schedule configuration and periodically sends
# tasks that are due to the task queue
celery -A proj beat            # start the Beat process
celery -A proj worker -l info  # start the worker
# Option 2: run Beat embedded inside the worker
celery -B -A proj worker -l info
```
Note: before starting scheduled tasks, clear Celery's queue in the message broker pointed to by broker_url.