Python 第三方模块之 Gevent - 协程

官网:https://www.gevent.org/

1、gevent 简介

背景介绍

在现代的软件开发中,异步编程模式因其在处理 I/O 密集型任务时的高效率而越来越受到重视。Python 作为一种动态、解释型的高级编程语言,其原生的异步编程支持相对较弱。

然而,gevent 库的出现,为 Python 带来了一种全新的异步编程方式。

gevent 是一个基于协程的并发库,它基于 greenlet ,并基于 libev(早期是libevent) 实现快速事件循环(Linux 上是 epoll,FreeBSD 上是 kqueue,Mac OS X 上是 select)。它提供了一种高级别的 API,允许开发者以同步的方式编写异步代码,从而简化了异步编程的复杂性。

通过 gevent,开发者可以轻松地实现并发的网络请求、文件操作等 I/O 密集型任务,而无需深入理解底层的异步编程细节。

gevent 实现了 Python 标准库里面大部分的阻塞式系统调用,包括 socket、ssl、threading 和 select 等模块,简单的使用 “猴子补丁” 将这些阻塞式调用变为协作式运行。

gevent 原理

gevent 的实现原理如下:

  1. gevent 使用 greenlet 来管理线程的执行。
  2. gevent 使用非阻塞的 IO 操作来模拟阻塞的 IO 操作,如使用 gevent.monkey.patch_all() 来替换标准库中的阻塞IO操作。
  3. gevent 提供了 Greenlet 的上下文管理器,如 greenlet.greenlet()greenlet.spawn(),来创建和管理协程。

其基本思想是:

当一个 greenlet 遇到 IO 操作时,比如访问网络,就自动切换到其他的 greenlet,等到 IO 操作完成,再在适当的时候切换回来继续执行。由于 IO 操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO。

Gevent 主要特性有以下几点:

  1. 基于 libevlibuv 的快速事件循环。(在1.0.x 之前更早期的版本里,gevent 使用 libevent 而不是 libev)
  2. 基于 greenlets 的轻量级执行单元。
  3. API 复用 Python 标准库中的概念(例如,有事件和队列)。
  4. TCP / UDP / HTTP 服务
  5. 支持 SSL 的协作套接字
  6. 通过 threadpool、dnspython 或 c-ares 执行的协作 DNS 查询。
  7. 通过 “猴子修补” 很容易使第三方模块变成协程
  8. 子进程 支持。(通过 gevent.subprocess)
  9. Thread pools 线程池
  10. gevent 的代码风格和线程非常相似,运行出来后的效果也非常相似。

gevent 的应用场景

gevent 在 Python 中有广泛的应用场景,以下是一些常见的应用场景:

  1. 网络编程:使用gevent实现异步的网络请求和响应,提高网络应用程序的响应性和性能。
  2. Web开发:使用gevent实现异步的Web服务器和客户端,提高Web应用程序的处理能力和并发性能。
  3. 数据库操作:使用gevent实现异步的数据库操作,提高数据库应用程序的并发性能和响应性。

gevent 的最佳实践

在使用gevent时,以下是一些最佳实践:

  1. 避免在协程中使用全局变量和可变数据类型,以减少同步的需求。
  2. 确保协程中的yield表达式正确使用,以避免竞态条件。
  3. 在协程中使用上下文管理器,如asyncio库,以实现异步编程。

安装 gevent

1
pip install gevent

gevent 库函数使用示例

  • gevent.sleep() 模拟异步的延时操作。
  • gevent.spawn() 创建一个新的协程
  • gevent.joinall() 等待多个协程完成
  • gevent.getcurrent() 获取当前的协程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import gevent

def test1():
print(12)
gevent.sleep(0)
print(34)

def test2():
print(56)
gevent.sleep(0)
print(78)


greenlet_1 = gevent.spawn(test1) # 创建一个新的greenlet协程对象,并运行它。
greenlet_2 = gevent.spawn(test2)
gevent.joinall([ # 等待所有传入的greenlet协程运行结束后再退出,这个方法可以接受一个timeout参数来设置超时时间,单位是秒
greenlet_1,
greenlet_2
])
current = gevent.getcurrent()
print(f'Current greenlet: {current}')

gevent.Greenlet 直接使用 Greenlet 类创建协程。

1
2
3
4
5
6
7
8
9
10
11
class MyTask:
def __init__(self, name):
self.name = name

def __call__(self):
print(f'Task {self.name} started')
gevent.sleep(1)
print(f'Task {self.name} finished')

task = gevent.Greenlet(MyTask, 'B')
task.start()

事件驱动 (事件分发)

Linux 的 epoll 机制

epoll 是 Linux 内核为处理大批量文件描述符而作了改进的 poll,是 Linux 下 “多路复用IO“ select/poll 的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。epoll 的优点:支持一个进程打开大数目的 socket 描述符。select 的一个进程所打开的 FD 由FD_SETSIZE 的设置来限定,而 epoll 没有这个限制,它所支持的 FD 上限是最大可打开文件的数目,远大于2048,而且IO 效率不随 FD 数目增加而线性下降:由于 epoll 只会对 “活跃” 的 socket 进行操作,于是,只有 “活跃” 的 socket 才会主动去调用 callback 函数,其他 idle 状态的 socket 则不会。epoll 使用 mmap 加速内核与用户空间的消息传递。epoll 是通过内核于用户空间 mmap 同一块内存实现的。

事件驱动(事件分发)库

libevent、libev、libuv 都是开源的 事件驱动(事件分发)库,用来处理网络和IO事件的异步编程。原理:当文件描述符事件发生时,调用回调函数的机制。

libevent、libev和 libuv 都是用于异步IO编程的开源库。有以下几点区别:

  • 底层实现:libevent 和 libev 都是使用底层系统调用来实现事件驱动,如select、poll、epoll等。而libuv则是使用自己实现的事件驱动机制。
  • 跨平台支持:libevent、libev和libuv都具有跨平台的能力,但它们的实现方式不同。libevent和libev通过提供多种事件模型来适应不同操作系统的特性;而libuv则通过自己实现的事件驱动机制来屏蔽了操作系统的差异。
  • 功能丰富程度:libevent和libev主要用于事件驱动编程,提供了事件通知、回调和监听等基础功能;而libuv则提供了更多的功能,如异步文件IO、网络套接字、DNS解析、进程管理等。
  • 适用范围:libevent、libev和libuv都适用于异步编程,但在不同场景下可能会有不同的选择。libevent和libev适用于需要高效处理大量并发连接的场景,如服务器开发;而libuv则适用于开发面向网络和文件的异步应用。

gevent 和其他协程库区别

  • greenlet:greenlet 是 Python 中的一个轻量级协程库,基于Python中的yield关键字实现。Greenlet 没有自己的调度过程,需要手动编写协程调度逻辑。所以一般不会直接使用。

  • eventlet:基于 greenlet 的一个高级协程库,Eventlet 实现了自己调度器称为 Hub,在 Hub 中有一个 event loop,根据不同的事件来切换到对应的 GreenThread 中。同时 Eventlet 还实现了一系列的补丁来使 Python 标准库中的 socket 等等 module 来支持 GreenThread 的切换。Eventlet 的 Hub 可以被定制来实现自己调度过程。

  • Gevent:基于 libev 与 greenlet 实现。gevent 使用 libev 来实现一个高效的 event loop 调度循环。同时类似于 Event,Gevent 也有自己的 monkey_patch,在打了补丁后,完全可以使用 python 线程的方式来无感知的使用协程。

gevent 特点总结:事件驱动 + 协程 + 非阻塞IO

  • 事件驱动指的是 libvent 对 epool 的封装,是基于事件的方式处理 IO。
  • 协程指的是 greenlet
  • 非阻塞 IO 指的是 gevent 已经 patch 过的各种库,例如 socket 和 select 等等。

猴子补丁(Monkey Patch)

官网文档:https://www.gevent.org/intro.html#monkey-patching

猴子补丁的由来

猴子补丁的这个叫法起源于 Zope 框架,大家在修正 Zope 的 Bug 的时候经常在程序后面追加更新部分,这些被称作是 “杂牌军补丁(guerillapatch)”,后来 guerilla 就渐渐的写成了 gorllia(猩猩),再后来就写了 monkey(猴子),所以猴子补丁的叫法是这么莫名其妙的得来的。 **后来在动态语言中,不改变源代码而对功能进行追加和变更,统称为 “猴子补丁”**。所以猴子补丁并不是 Python 中专有的。猴子补丁这种东西充分利用了动态语言的灵活性,可以对现有的语言Api 进行追加,替换,修改 Bug,甚至性能优化等等。 gevent 通过猴子补丁的方式能够修改标准库里面大部分的阻塞式系统调用,包括 socket、ssl、threading 和 select 等模块,而变为协作式运行。

猴子补丁使用时的注意事项

猴子补丁的功能很强大,但是也带来了很多的风险,尤其是像 gevent 这种直接进行 API 替换的补丁,整个 Python 进程所使用的模块都会被替换,可能自己的代码能 hold 住,但是其它第三方库,有时候问题并不好排查,即使排查出来也是很棘手,所以,就像松本建议的那样,**如果要使用猴子补丁,那么只是做功能追加,尽量避免大规模的 API 覆盖。 虽然猴子补丁仍然是邪恶的(evil),但在这种情况下它是 “有用的邪恶(useful evil)”**。

使用 gevent 注意事项

  1. gevent.spawn 启动的所有协程,都是运行在同一个线程中,所以协程不能跨线程同步数据。
  2. gevent.queue.Queue 是协程安全的。
  3. gevent 启动的并发协程,具体到 task function,不能有长时间阻塞的IO操作。因为 gevent 的协程的特点是,当前协程阻塞了才会切换到别的协程。如果当前协程长时间阻塞,则不能显示(gevent.sleep(0),或隐式,由gevent来做)切换到别的协程。导致程序出问题。
  4. 如果有长时间阻塞的 IO 操作,还是用传统的线程模型比较好。
  5. 使用 gevent 的协程,最好使用 gevent 自身的非阻塞库。如 httplib, socket, select 等等。

使用 gevent 示例

使用方法:**程序的重要部分是将任务函数封装到 gevent.spawn()**。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import gevent
from gevent import socket

urls = ['www.baidu.com', 'www.example.com', 'www.python.org']
jobs = [gevent.spawn(socket.gethostbyname, url) for url in urls]
gevent.joinall(jobs, timeout=2)
result = [job.value for job in jobs]
print(result)

"""
解释:
通过 gevent.spawn() 方法创建 job
通过 gevent.joinall 将 jobs 加入到 微线程 执行队列中等待其完成,设置超时为 2 秒。
执行后的结果通过检查 gevent.Greenlet.value 值来收集。
gevent.socket.gethostbyname()是非阻塞的,同时与标准的socket.gethotbyname()有相同的接口。
"""

方法 1:继承 Gevent 的 Greenlet

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import gevent
from gevent import monkey
from gevent import Greenlet


monkey.patch_all()


class Task(Greenlet):
def __init__(self, name):
Greenlet.__init__(self)
self.name = name

def _run(self):
print("Task %s: some task..." % self.name)


t1 = Task("task1")
t2 = Task("task2")
t1.start()
t2.start()
# here we are waiting all tasks
gevent.joinall([t1, t2])

方法 2:直接使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import gevent
from gevent import monkey;monkey.patch_all()
import random
import requests
from pprint import pprint


def func_1():
print("func_1 结束")


def func_2():
print("func_2 结束")


def get_url(url):
print(f"开始 ---> {url}")
res = requests.get(url)
gevent.sleep(random.randint(1, 3))
print(f"[{res.status_code}][{url}] ---> {len(res.text)}")


def main():
url_list = [
'https://httpbin.org/',
'https://www.baidu.com',
'https://www.cnblogs.com'
]
greenlet_list = []
for index in url_list:
greenlet_list.append(gevent.spawn(get_url, index))
greenlet_list += [gevent.spawn(func_1), gevent.spawn(func_2)]
# timeout 设置最大等待时间,如果不设置就一直等待
gevent.joinall(greenlet_list, timeout=None)
# gevent.spawn(lambda: 1 / 0).join()
print("main 结束")


if __name__ == '__main__':
main()

示例:抓取豆瓣

利用 gevent 并发 抓取豆瓣

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
from gevent import monkey
monkey.patch_all()
import gevent
import requests


class Douban(object):

def __init__(self):
self.host = 'movie.douban.com'
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:61.0) Gecko/20100101 Firefox/61.0',
'Referer': 'https://movie.douban.com/',
}

def test_search_tags_movie(self):
method = 'search_tags'
url = f"https://{self.host}/j/{method}"
post_data = {
'type': 'movie',
'source': 'index'
}
resp = requests.post(url=url, data=post_data, headers=self.headers)
ret_val = f"{url} ---> {resp.status_code}"
return ret_val


if __name__ == '__main__':
douban = Douban()
job_list = []
for index in range(6):
job_obj = gevent.spawn(douban.test_search_tags_movie)
job_list.append(job_obj)
gevent.joinall(job_list)
result_list = [job_obj.value for job_obj in job_list]
list(map(lambda item=None: print(item), result_list))
pass

示例:生产者、消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from gevent import monkey; monkey.patch_all()
import gevent
from gevent.queue import Queue # 队列 gevent中的队列
import random

task_queue = Queue(3)


def producer(index=1):
while True:
print(f'生产者 [{index}]', end='')
item = random.randint(0, 99)
task_queue.put(item)
print(f"生产 ---> {item}")


def consumer(index=1):
while True:
print(f'消费者 [{index}]', end='')
item = task_queue.get()
print(f"消费 ---> {item}")


def main():
job_1 = gevent.spawn(producer)
job_2 = gevent.spawn(consumer)
job_3 = gevent.spawn(consumer, 2)
job_list = [job_1, job_2, job_3]
gevent.joinall(job_list)


if __name__ == '__main__':
main()
pass

示例:后门程序

gevent.backdoor:https://www.gevent.org/api/gevent.backdoor.html

此后门不提供身份验证,也不尝试限制远程用户可以执行的操作。任何可以访问服务器的人都可以执行正在运行的 python 进程可以执行的任何操作。因此,虽然您可以绑定到任何接口,但出于安全考虑,建议您绑定到只能由本地计算机访问的接口,例如 127.0.0.1/localhost。

基本用法:

1
2
3
4
5
6
7
8
from gevent.backdoor import BackdoorServer

server = BackdoorServer(
('127.0.0.1', 5001),
banner="Hello from gevent backdoor!",
locals={'foo': "From defined scope!"}
)
server.serve_forever()
  • locals – 如果给定,则为将在顶层提供的“内置”值字典。
  • banner – 如果为 geven,则为将打印给每个连接用户的字符串。

在另一个终端中,连接:telnet 127.0.0.1 5001 连接成功后,会进入一个交互式 Python shell

2、gevent 指南

gevent是一个基于libev 的并发库。它为各种并发和网络相关的任务提供了整洁的API。

1
2
from gevent import monkey
monkey.patch_all()

这样两行,就可以使用 python 以前的 socket 之类的,因为 gevent 已经给你自动转化了。
而且安装 gevent 也是很方便,首先安装依赖 libevent 和 greenlet,再利用 pypi 安装即可

安装 libevent:sudo apt-get install libevent-dev
安装 python-dev:sudo apt-get install python-dev
安装 gevent:sudo pip install gevent
安装 greenlet:sudo pip install greenlet

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from gevent import monkey; monkey.patch_socket()
import gevent


def f(n):
for i in range(n):
print(gevent.getcurrent(), i)
gevent.sleep(0)


g1 = gevent.spawn(f, 5)
g2 = gevent.spawn(f, 5)
g3 = gevent.spawn(f, 5)
g1.join()
g2.join()
g3.join()

3 个 greenlet 交替运行,把循环次数改为 500000,运行时间长一点,然后在操作系统的进程管理器中看,线程数只有1个。 gevent.sleep() 作用是交出控制权

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import gevent
from gevent import monkey


# 切换是在 IO 操作时自动完成,所以gevent需要修改Python自带的一些标准库
# 这一过程在启动时通过monkey patch完成
monkey.patch_all()


def func_a():
count = 10
while count > 0:
print(f"func_a ---> {count}")
# 用来模拟一个耗时操作,注意不是time模块中的sleep
# 每当碰到耗时操作,会自动跳转至其他协程
count -= 1
gevent.sleep(1)


def func_b():
count = 10
while count > 0:
print(f"func_b ---> {count}")
count -= 1
gevent.sleep(0.5)


# gevent.joinall([gevent.spawn(fn)
g1 = gevent.spawn(func_a) # 创建一个协程
g2 = gevent.spawn(func_b)
g1.join() # 等待协程执行结束
g2.join()

select() 函数通常是对各种文件描述符进行轮询的阻塞调用。

1
2
3
from gevent import select
...
select.select([], [], [], 2)

gevent 池

示例代码,测试 gevent 的 任务池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from gevent import pool

gevent_pool = pool.Pool()


def func_1():
for index in range(100):
gevent_pool.spawn(func_2, index)


def func_2(arg=None):
print(f'func_2 ---> {arg}')


gevent_pool.spawn(func_1)
gevent_pool.join()

示例代码。程序及注释如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# -*- coding: utf-8 -*-

import time
import gevent
from gevent import event # 调用 gevent 的 event 子模块


# 三个进程需要定义三个事件 event1,event2,event3,来进行12,23,31循环机制,即进程一,进程二,进程三顺序执行

def fun1(num, event1, event2): # 固定格式
i = 0
while i < 10: # 设置循环10次
i += 1
time.sleep(1) # 睡眠1秒
print('进程一:111111111')
event2.set() # 将event2值设为True
event1.clear() # 将event1值设为False
event1.wait() # event1等待,其值为True时才执行


def fun2(num, event2, event3):
i = 0
while i < 10:
i += 1
time.sleep(1)
print('进程二:222222222')
event3.set() # 将event3值设为True
event2.clear() # 将event2值设为False
event2.wait() # event2等待,其值为True时才执行


def fun3(num, event3, event1):
i = 0
while i < 10:
i += 1
time.sleep(1)
print('进程三:333333333')
event1.set()
event3.clear()
event3.wait()


if __name__ == "__main__": # 执行调用格式
act1 = gevent.event.Event() # 调用event中的Event类,用act1表示
act2 = gevent.event.Event()
act3 = gevent.event.Event()

# 三个进程,act1,act2,act3
gevent_list = [] # 建立一个数列,用来存和管理进程

# 调用gevent中的Greenlet子模块,用Greenlet创建进程一
g = gevent.Greenlet(fun1, 1, act1, act2)
g.start()
gevent_list.append(g) # 将进程一加入到Gevents数列
print('进程一启动:')

g = gevent.Greenlet(fun2, 2, act2, act3)
g.start()
gevent_list.append(g)
print('进程二启动:')

g = gevent.Greenlet(fun3, 3, act3, act1)
g.start()
gevent_list.append(g)
print('进程三启动:')
print('所有进程都已启动!')

# 调用Greenlet中的joinall函数,将Gevents的进程收集排列
gevent.joinall(gevent_list)

3、gevent API 参考

更多可以查阅:Module Listing.

高级概念

网络接口

同步原语(锁、队列、事件)

低级接口详细信息

模块 列表

Reference


Python 第三方模块之 Gevent - 协程
https://flepeng.github.io/021-Python-31-Python-第三方模块-01-进程、线程、协程-相关-Python-第三方模块之-Gevent-协程/
作者
Lepeng
发布于
2021年4月27日
许可协议