Python 第三方模块之 Celery - 分布式任务队列

Celery简单介绍

Celery 是一个基于 Python 开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理,如果你的业务场景中需要用到异步任务,就可以考虑使用 Celery,举几个实例场景:

  • 你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果,在任务执行ing进行时,你可以继续做其它的事情
  • 你想做一个定时任务,比如每天检测一下你们所有客户的资料,如果发现今天是客户的生日,就给他发个短信祝福

Celery 在执行任务时需要通过一个消息中间件来接收和发送任务消息,以及存储任务结果, 一般使用rabbitMQ or Redis

Celery有以下优点

  • 简单:一旦熟悉了celery的工作流程后,配置和使用还是比较简单的
  • 高可用:当任务执行失败或执行过程中发生连接中断,celery 会自动尝试重新执行任务
  • 快速:一个单进程的celery每分钟可处理上百万个任务
  • 灵活:几乎celery的各个组件都可以被扩展及自定制

Celery基本工作流程图

  • Producer:任务委托方
  • Broker:任务中心(中介),如RabbitMQ、Redis等
  • Beat:任务调度器
  • Worker:任务执行者,可以有多个(分布式)
  • Result:任务中心的数据库,储存任务执行结果
  • Backend:因为任务经由中介,而非直接委派到Worker手上,所以Producer并不知道任务被委派给了谁,以及任务的完成结果,所以这时候需要一个Backend(理解成手机,通过手机查看任务完成情况)

Celery安装使用

安装celery模块

1
$ pip install celery

celery需要rabbitMQ、Redis、Amazon SQS、Zookeeper(测试中) 充当broker来进行消息的接收,并且也支持多个broker和worker来实现高可用和分布式。http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html

  1. Celery的默认broker是RabbitMQ, 仅需配置一行就可以

    1
    broker_url = 'amqp://guest:guest@localhost:5672//'

    rabbitMQ 没装的话请装一下,安装看这里

    http://docs.celeryproject.org/en/latest/getting-started/brokers/rabbitmq.html#id3

  2. 使用Redis做broker也可以,安装redis组件

    1
    $ pip install -U "celery[redis]"

    配置redis

    1
    2
    3
    4
    5
    6
    7
    # Configuration is easy, just configure the location of your Redis database:
    app.conf.broker_url = 'redis://localhost:6379/0'

    # Where the URL is in the format of:
    redis://:password@hostname:port/db_number

    # all fields after the scheme are optional, and will default to localhost on port 6379, using database 0.

    如果想获取每个任务的执行结果,还需要配置一下把任务结果存在哪

    1
    2
    # If you also want to store the state and return values of tasks in Redis, you should configure these settings:
    app.conf.result_backend = 'redis://localhost:6379/0'

使用 Celery

创建一个celery application 用来定义你的任务列表,创建一个任务文件tasks.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# tasks.py
from celery import Celery

app = Celery('tasks', # 随便
broker='redis://localhost', # 中间件
backend='redis://localhost') # 存储

# 弱如果redis 有密码,改成下面的方式,password前面有冒号
# redis://:password@127.0.0.1:6379/2

@app.task
def add(x,y):
print("running...",x,y)
return x+y

启动Celery Worker来开始监听并执行任务

1
$ celery -A tasks worker --loglevel=info

调用任务,再打开一个终端, 进行命令行模式,调用任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
[root@localhost celerys]# python3
Python 3.5.2 (default, Jul 7 2017, 23:36:01)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-11)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from tasks import add # import add
>>> add.delay(4,6) # 执行函数
<AsyncResult: 4b5a8ab6-693c-4ce5-b779-305cfcdf70cd> # 返回taskid

>>> result.ready() # 是否运行完成
False

>>> result = add.delay(4,6) # 执行函数
>>> result.get() # 同步获取结果,一直等待
10

>>> result.get(timeout=1) # 设置超时时间,过期错误异常
Traceback (most recent call last):
--strip--
celery.exceptions.TimeoutError: The operation timed out.

>>> result = add.delay(4,'a') # 执行错误命令
>>> result.get() # get后获取到错误信息,触发异常
Traceback (most recent call last):
--strip--
celery.backends.base.TypeError: unsupported operand type(s) for +: 'int' and 'str'

>>> result = add.delay(4,'a')
>>> result.get(propagate=False) # propagate=False 不触发异常,获取错误信息
TypeError("unsupported operand type(s) for +: 'int' and 'str'",)
>>> result.traceback # 获取具体错误信息 log打印用
'Traceback (most recent call last):\n File "/usr/local/python3.5/lib/python3.5/site-packages/celery/app/trace.py", line 367, in trace_task\n R = retval = fun(*args, **kwargs)\n File "/usr/local/python3.5/lib/python3.5/site-packages/celery/app/trace.py", line 622, in __protected_call__\n return self.run(*args, **kwargs)\n File "/data/celerys/tasks.py", line 12, in add\n return x+y\nTypeError: unsupported operand type(s) for +: \'int\' and \'str\'\n'

在项目中如何使用celery

可以把celery配置成一个应用,目录格式如下

1
2
3
proj/__init__.py
/celery.py
/tasks.py

proj/celery.py内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# proj/celery.py
from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('proj',
broker='amqp://',
backend='amqp://',
include=['proj.tasks'])

# Optional configuration, see the application user guide.
app.conf.update(
result_expires=3600,
)

if __name__ == '__main__':
app.start()

proj/tasks.py中的内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# proj/tasks.py
from __future__ import absolute_import, unicode_literals
from .celery import app


@app.task
def add(x, y):
return x + y


@app.task
def mul(x, y):
return x * y


@app.task
def xsum(numbers):
return sum(numbers)

启动worker

1
$ celery -A proj worker -l info

输出,像不像一个c

1
2
3
4
5
6
7
8
9
10
11
12
13
-------------- celery@Alexs-MacBook-Pro.local v4.0.2 (latentcall)
---- **** -----
--- * *** * -- Darwin-15.6.0-x86_64-i386-64bit 2017-01-26 21:50:24
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: proj:0x103a020f0
- ** ---------- .> transport: redis://localhost:6379//
- ** ---------- .> results: redis://localhost/
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery

后台启动worker

在生产环境中,您希望在后台运行worker,这在 daemonization tutorial 教程中有详细介绍

daemonization 脚本使用 Celery multi 命令在后台启动一个或多个worker

1
2
3
4
$ celery multi start w1 -A proj -l info
celery multi v4.0.0 (latentcall)
> Starting nodes...
> w1.halcyon.local: OK

重启

1
2
3
4
5
6
7
8
9
10
$ celery multi restart w1 -A proj -l info
celery multi v4.0.0 (latentcall)
> Stopping nodes...
> w1.halcyon.local: TERM -> 64024
> Waiting for 1 node.....
> w1.halcyon.local: OK
> Restarting node w1.halcyon.local: OK
celery multi v4.0.0 (latentcall)
> Stopping nodes...
> w1.halcyon.local: TERM -> 64052

停止

1
$ celery multi stop w1 -A proj -l info

stop 命令是异步的,所以它不会等待worker关闭。你可能会想使用 stopwait 命令来代替,这可以确保所有当前正在执行的任务在退出前完成:

1
$ celery multi stopwait w1 -A proj -l info

Celery 定时任务

celery支持定时任务,设定好任务的执行时间,celery就会定时自动帮你执行,这个定时任务模块叫celery beat

写一个脚本periodic_task.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# periodic_task.py
from celery import Celery
from celery.schedules import crontab

app = Celery()

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
# Calls test('hello') every 10 seconds.
# add_periodic_task 会添加一条定时任务
sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')

# 每 30s 执行一次 test('world')
sender.add_periodic_task(30.0, test.s('world'), expires=10)

# Executes every Monday morning at 7:30 a.m.
sender.add_periodic_task(
crontab(hour=7, minute=30, day_of_week=1),
test.s('Happy Mondays!'),
)

@app.task
def test(arg):
print(arg)

上面是通过调用函数添加定时任务,也可以像写配置文件一样的形式添加,下面是每30s执行的任务

1
2
3
4
5
6
7
8
app.conf.beat_schedule = {
'add-every-30-seconds': {
'task': 'tasks.add',
'schedule': 30.0,
'args': (16, 16)
},
}
app.conf.timezone = 'UTC'

任务添加好了,需要让celery单独启动一个进程来定时发起这些任务,注意,这里是发起任务,不是执行,这个进程只会不断的去检查你的任务计划,每发现有任务需要执行了,就发起一个任务调用消息,交给celery worker去执行

启动任务调度器 celery beat

1
$ celery -A periodic_task beat

输出:

1
2
3
4
5
6
7
8
9
10
celery beat v4.0.2 (latentcall) is starting.
__ - ... __ - _
LocalTime -> 2017-02-08 18:39:31
Configuration ->
. broker -> redis://localhost:6379//
. loader -> celery.loaders.app.AppLoader
. scheduler -> celery.beat.PersistentScheduler
. db -> celerybeat-schedule
. logfile -> [stderr]@%WARNING
. maxinterval -> 5.00 minutes (300s)

此时还差一步,就是还需要启动一个worker,负责执行celery beat发起的任务

启动celery worker来执行任务,此时观察worker的输出,是不是每隔一小会,就会执行一次定时任务呢!

beat需要将任务的最后运行时间存储在本地数据库文件中(默认情况下名为celerybeat schedule),因此它需要访问当前目录,或者您可以为该文件指定一个自定义位置:

1
$ celery -A periodic_task beat -s /home/celery/var/run/celerybeat-schedule

更复杂的定时配置

上面的定时任务比较简单,只是每多少s执行一个任务,但如果你想要每周一三五的早上8点给你发邮件怎么办呢?哈,其实也简单,用crontab功能,跟linux自带的crontab功能是一样的,可以个性化定制任务执行时间

linux crontab http://www.cnblogs.com/peida/archive/2013/01/08/2850483.html

1
2
3
4
5
6
7
8
9
10
from celery.schedules import crontab

app.conf.beat_schedule = {
# Executes every Monday morning at 7:30 a.m.
'add-every-monday-morning': { # 给任务起个名字
'task': 'tasks.add', # 任务调用的函数
'schedule': crontab(hour=7, minute=30, day_of_week=1), # 定时任务
'args': (16, 16), # 任务调用的参数
},
}

上面的这条意思是每周1的早上7.30执行tasks.add任务。还有更多定时配置方式参考:https://blog.csdn.net/fenglepeng/article/details/104481808

上面能满足你绝大多数定时任务需求了,甚至还能根据潮起潮落来配置定时任务,具体看 http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#solar-schedules

最佳实践之与django结合

django 可以轻松跟celery结合实现异步任务,只需简单配置即可。如果你有一个 Django 项目目录如下:

1
2
3
4
5
- proj/
- proj/__init__.py
- proj/settings.py
- proj/urls.py
- manage.py

那么推荐的方法是创建一个新的 proj/proj/Celery.py 模块,它定义了Celery实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# proj/proj/celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

# Using a string here means the worker don't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))

然后你需要在 proj/proj/__init__.py 模块中导入这个应用程序。这确保了应用程序在Django启动时被加载,以便 @shared_task 装饰器(后面会提到)使用它:

1
2
3
4
5
6
7
8
# proj/proj/__init__.py
from __future__ import absolute_import, unicode_literals # 绝对导入

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ['celery_app']

请注意,这个示例项目布局适用于较大的项目,对于简单的项目,您可以使用一个单独的包含模块来定义应用程序和任务,如First Steps with celery教程。

让我们分析一下在第一个模块中发生了什么,首先我们从未来导入绝对导入,这样我们的celery.py模块就不会与库冲突:

1
from __future__ import absolute_import   # 绝对导入

然后我们为 Celery 命令行程序设置默认的DJANGO_SETTINGS_MODULE环境变量:

1
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')  # 设置环境

你不需要这一行,但它使你不必总是把设置模块传递给 Celery 程序。它必须总是在创建应用实例之前出现,正如我们接下来所做的:

1
app = Celery('proj')

这是我们的库实例。

我们还添加了Django的设置模块作为芹菜的配置源。这意味着你不必使用多个配置文件,而是直接从Django设置中配置 Celery;但如果需要,也可以将它们分开。

大写名称空间意味着所有芹菜配置选项必须用大写而不是小写指定,并且以CELERY_开头,因此例如 task_always_eager 设置变成 CELERY_TASK_ALWAYS_EAGERbroker_url设置变成 CELERY_BROKER_URL

你可以直接在这里传递对象,但是使用字符串更好,因为这样worker就不必序列化对象了。

1
app.config_from_object('django.conf:settings', namespace='CELERY')

接下来,可重用应用的一个常见做法是在一个单独的’ tasks.py ‘模块中定义所有的任务,而 Celery 确实有办法自动发现这些模块:

1
app.autodiscover_tasks()

celery 会自动发现目录下的所有task

1
2
3
4
5
6
- app1/
- tasks.py
- models.py
- app2/
- tasks.py
- models.py

最后,debug_task 示例是一个转储自己请求信息的任务。这是使用在 celery3.1 中引入的新的 bind=True 任务选项来方便地引用当前任务实例。

然后在具体的app里的tasks.py里写你的任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task

@shared_task
def add(x, y):
return x + y

@shared_task
def mul(x, y):
return x * y

@shared_task
def xsum(numbers):
return sum(numbers)

在你的django views里调用celery task

1
2
3
4
5
6
7
8
9
10
11
12
13
from django.shortcuts import render,HttpResponse

# Create your views here.

from bernard import tasks

def task_test(request):

res = tasks.add.delay(228,24)
print("start running task")
print("async task res",res.get() )

return HttpResponse('res %s'%res.get())

django中使用计划任务功能

Django -celery-beat扩展,它将调度存储在Django数据库中,并提供了一个方便的管理界面,在运行时管理周期性任务。

安装和使用这个扩展:

1
$ pip install django-celery-beat

djanggo_celery_beat 模块添加到你的Django项目 settings.py 中的 INSTALLED_APPS 中:

1
2
3
4
5
6
INSTALLED_APPS = (
...,
'django_celery_beat',
)

# 注意,模块名中没有破折号,只有下划线

应用Django数据库迁移,以便创建必要的表:

1
$ python manage.py migrate

使用 django 调度程序启动 celery beat 服务:

1
$ celery -A proj beat -l info -S django

访问Django-Admin界面设置一些定期任务,在admin页面里,有3张表

配置完长这样

此时启动你的celery beat 和worker,会发现每隔2分钟,beat会发起一个任务消息让worker执行scp_task任务

注意,经测试,每添加或修改一个任务,celery beat都需要重启一次,要不然新的配置不会被celery beat进程读到


Python 第三方模块之 Celery - 分布式任务队列
https://flepeng.github.io/021-Python-31-Python-第三方模块-Python-第三方模块之-Celery-分布式任务队列/
作者
Lepeng
发布于
2021年4月27日
许可协议