Python每日一库|Celery (二)
在我之前的文章中,我向你介绍了 Celery 并进行了一些实际操作。如果你还没有阅读我之前的帖子,请阅读。
在这篇文章中,我们将讨论我们Celery的使用场景跟Celery Workers、Pool 及其并发配置。
Celery使用的场景
1) 你想在后台运行一些任务。
2) 你想将一些任务(长时间运行或耗时或 CPU 消耗)下发到运行在不同机器上的 Celery Worker,以提高主应用程序服务器的响应。
3) 你想定期安排任务。
4) 你想并行执行任务或扩展你的软件或应用程序系统。
根据我的经验,当满足上述条件时,你可以在电子商务、医疗保健、银行等不同领域使用 Python Celery。
后台运行任务
假设你有一个电子商务应用程序。当用户下订单时,你的应用程序需要发送有关订单更新(例如下订单、付款、发货等)的短信和电子邮件。
假设你有一个 Django/Flask 应用程序,当用户订购时,你不应该以相同的顺序发送电子邮件/短信 django/flask http 请求。如果你这样做,它只会延迟用户的响应,因为电子邮件服务器/文本消息服务器可能不会快速响应。
在这里你需要创建/定义发送电子邮件/短信的任务。你相应地调用这些任务。Celery 工作人员将在后台处理这些任务,但对用户的响应会很快。
分布式任务
假设你有一个数据分析应用程序。当用户请求对数据进行一些复杂的数据分析计算时,它已经来到你的 Django/Flask 应用程序,但是你不想在你的 Django/Flask 应用程序运行的同一台机器上运行这个计算,因为计算需要很多时间并且还吃掉了这台机器的 CPU,导致 Django/Flask 应用程序响应不佳。你的 Django/Flask 应用程序将无法响应不同用户的进一步请求。那你在这里做什么?
很简单,你在 Python Celery 中创建/定义一个用于复杂数据分析计算的任务,并在不同机器上运行 celery worker,在不同机器上运行 Django/Flask 应用程序。
现在,当用户请求对数据进行一些复杂的数据分析计算时,它已经到达你的 Django/Flask 应用程序,但你调用了相应的 Celery 任务。由于 Celery worker 在不同的机器上运行,因此计算发生在不同的机器上。
定时任务
假设你有一个电子商务应用程序。你希望每天通过电子邮件向用户发送产品推荐。你可以创建/定义一个 Celery 任务并使用 Celery beat 安排它。
并行执行任务
假设你有 celery 任务,但你只有一名工人在执行所有任务。你的应用程序无法满足用户需求的要求。
你希望同时运行多个任务以提高应用程序的性能。
答案是增加多台机器上的 celery worker 数量。
Celery Workers、Pool 及其并发配置
在说这个Celery Workers跟Pool之前我们先看下Celery启动worker的命令。
celery -A tasks worker --pool=prefork --concurrency=1 --loglevel=info
- -A, -app <app> 这个命令选项创建了Celery对象。
- worker用于启动Celery worker
- --loglevel用于记录日志可以使用DEBUG|INFO|WARNING|ERROR|CRITICAL|FATAL
- --pool=prefork,这个选项是设置Celery 运行任务不同的方式,可以选择prefork|eventlet|gevent|solo
- --concurrency=1,这个选项是worker并发运行的任务数量
通过上面的说明,Celery Pool应该是重点,让我们继续深入去了解pool运行任务方式有什么不一样。
Celery 提供了使用“pool”选项运行任务的不同方式。我们必须根据我们正在执行的任务类型(CPU 绑定或 I/O 或网络绑定)选择“pool”。Celery提供了5种不同方式的Pool,我将一个个介绍。
- Solo
Solo 只创建一个线程并使用该线程运行 celery 任务。并且无法提供并发数。
celery -A tasks worker --pool=solo --loglevel=info
例如; 你已经定义了一个任务,该任务下载任务中指定的电影。
现在你要下载10部电影,你提交了10个任务。因为我们有一个Work在跑。Worker从Queue中取出任务,开始在线程中运行。因为我们这里只有一个线程。在现有任务完成之前,它不能选择另一个任务。
假设下载一部电影的平均时间为一小时。现在这个 Worker 需要 10 个小时来完成所有 10 个任务,即下载所有 10 部电影。所以 pool solo 不适合这类任务。
- prefork
prefork 使用多重处理,可以提供并发选项。(建议提供运行Celery Worker的机器的CPU数量),prefork 是 CPU 绑定的最佳池选项。
celery -A tasks worker --pool=prefork --concurrency=4 --loglevel=info
例如; 我们有 Celery Worker 在 4 个 CPU 机器上运行。你已经定义了一项任务,该任务执行一些复杂的数学计算。
现在你想在 10 个不同的数据集上运行这个计算,你已经提交了 10 个任务。因为我们有一个Work在跑。Worker 从 Queue 中取出任务并开始在这个进程中运行。由于我们这里有 4 个进程,它可以同时运行 4 个任务。
- eventlet
eventlet 和 gevent 是 I/O 和网络的最佳池选项。
注意:使用eventlet跟gevent需要额外安装相对应的包。
pip install eventlet/gevent
Eventlet 是 Python 的并发网络库,允许你更改运行代码的方式,而不是编写代码的方式。
- 它使用 epoll 或 kqueue 或 libevent 来实现高度可扩展的非阻塞 I/O。
- 协程确保开发人员使用类似于线程的阻塞式编程,但提供非阻塞 I/O 的优势。
- 事件分派是隐式的,这意味着你可以轻松地从 Python 解释器中使用 Eventlet,或者将其作为大型应用程序的一小部分。
eventlet 不会使用并发选项创建多个线程。相反,它所做的是仅创建一个线程并使用称为事件循环的概念处理一个线程的并发。
celery -A tasks worker --pool=eventlet --concurrency=10 --loglevel=info
例如; 你已经定义了一个任务,该任务下载任务中指定的电影。
现在你要下载10部电影,你提交了10个任务。因为我们有一个Work在跑。Worker从Queue中取出任务,开始在线程中运行。由于我们的 eventlet 以 10 并发运行,因此所有任务都开始下载相应的电影。
根据网络速度,所有 10 个文件的下载速度都非常快。
- gevent
gevent 是一个基于协程的Python网络库,它使用greenlet在libev或libuv事件循环之上提供高级同步 API 。
功能包括:
- 基于libev或libuv的快速事件循环。
- 基于 greenlet 的轻量级执行单元。
- 重新使用 Python 标准库中的概念的 API(例如,有事件和队列)。
- 支持 SSL 的协作套接字
- 通过线程池、dnspython 或 c-ares 执行的合作 DNS 查询。
- 猴子补丁实用程序使第 3 方模块变得合作
- TCP/UDP/HTTP 服务器
- 子流程支持(通过gevent.subprocess)
- 线程池
gevent受到 eventlet 的启发,但具有更一致的 API、更简单的实现和更好的性能。
gevent 跟Eventlet一样不会使用并发选项创建多个线程。相反,它所做的是仅创建一个线程并使用称为事件循环的概念处理一个线程的并发
celery -A tasks worker --pool=gevent --concurrency=10 --loglevel=info
例如; 你已经定义了一个任务,该任务下载任务中指定的电影。
现在你要下载10部电影,你提交了10个任务。因为我们有一个Work在跑。Worker从Queue中取出任务,开始在线程中运行。由于我们的 eventlet 以 10 并发运行,因此所有任务都开始下载相应的电影。
根据网络速度,所有 10 个文件的下载速度都非常快。
- threads
新加入的模式,高并发的情况性能不如协程。
- processes
兼容别名跟prefork一样
如果你发现我的任何文章对你有帮助或者有用,麻烦点赞或者转发。 谢谢!