Python 第三方模块之 Gevent - 协程(官方)

核心

Greenlets

gevent 中使用的主要模式是 Greenlet,这是作为 C 扩展模块提供给 Python 的一个轻量级协同程序。所有 greenlet 都在主程序的 OS 进程中运行,但它们是协同调度的。

在任何给定的时间内,只有一个 greenlet 在运行。

这不同于由 multiprocessing 或 threading 库提供的并行结构,它们这些库可以自旋进程和 POSIX 线程,由操作系统调度,并且真正并行的。

异步和同步执行

并发性的核心思想是,可以将较大的任务分解为多个子任务的集合,这些子任务计划同时或异步运行,而不是一次或同步运行。两个子任务之间的切换称为上下文切换。

gevent 中的上下文切换是通过 yielding 来完成的。在本例中,我们有两个上下文,它们通过调用 gevent.sleep(0) 相互让步。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import gevent

def foo():
print('Running in foo')
gevent.sleep(0)
print('Explicit context switch to foo again')

def bar():
print('Explicit context to bar')
gevent.sleep(0)
print('Implicit context switch back to bar')

gevent.joinall([
gevent.spawn(foo),
gevent.spawn(bar),
])
1
2
3
4
Running in foo
Explicit context to bar
Explicit context switch to foo again
Implicit context switch back to bar

当我们将 gevent 用于网络和 IO 绑定函数时,它的真正威力就来了,这些函数可以协同调度。Gevent 负责处理所有细节,以确保网络库尽可能隐式地让步出它们的 greenlet 上下文。我再怎么强调这是一个多么有力的成语也不为过。但也许可以举个例子来说明。

在这种情况下,select() 函数通常是一个阻塞调用,它轮询各种文件描述符。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import time
import gevent
from gevent import select

start = time.time()
tic = lambda: 'at %1.1f seconds' % (time.time() - start)

def gr1():
# Busy waits for a second, but we don't want to stick around...
print('Started Polling: %s' % tic())
select.select([], [], [], 2)
# 可以理解成一个 IO 阻塞的操作,阻塞了2秒,这时 Greenlet 会自动切换到 gr2() 上下文执行
print('Ended Polling: %s' % tic())

def gr2():
# Busy waits for a second, but we don't want to stick around...
print('Started Polling: %s' % tic())
select.select([], [], [], 2)
print('Ended Polling: %s' % tic())

def gr3():
print("Hey lets do some stuff while the greenlets poll, %s" % tic())
gevent.sleep(1)
# 让当前 Greenlet 休眠1秒,上面 gr1() gr2() 阻塞操作完成后,再切换到 gr1() gr2() 的上下文执行

gevent.joinall([
gevent.spawn(gr1),
gevent.spawn(gr2),
gevent.spawn(gr3),
])
1
2
3
4
5
Started Polling: at 0.0 seconds
Started Polling: at 0.0 seconds
Hey lets do some stuff while the greenlets poll, at 0.0 seconds
Ended Polling: at 2.0 seconds
Ended Polling: at 2.0 seconds

另一个比较综合的例子定义了一个不确定的任务函数 (它的输出不能保证对相同的输入给出相同的结果) 。在这种情况下,运行该函数的副作用是任务暂停执行的时间是随机的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import gevent
import random

def task(pid):
"""
Some non-deterministic task
"""
gevent.sleep(random.randint(0,2)*0.001)
print('Task %s done' % pid)

def synchronous():
for i in range(1,10):
task(i)

def asynchronous():
threads = [gevent.spawn(task, i) for i in range(10)]
gevent.joinall(threads)

print('Synchronous:')
synchronous()

print('Asynchronous:')
asynchronous()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Synchronous:
Task 1 done
Task 2 done
Task 3 done
Task 4 done
Task 5 done
Task 6 done
Task 7 done
Task 8 done
Task 9 done
Asynchronous:
Task 1 done
Task 5 done
Task 6 done
Task 2 done
Task 4 done
Task 7 done
Task 8 done
Task 9 done
Task 0 done
Task 3 done

在同步情况下,所有任务都是按顺序运行的,这会导致在执行每个任务时主程序阻塞(即暂停主程序的执行)。

程序的重要部分是 gevent.spawn,它将给定的函数封装在一个 Greenlet 线程中。初始化的 greenlet 列表存储在传递给 gevent 的数组线程中。gevent.joinall 函数,它会阻塞当前程序,来运行所有给定的 greenlet。只有当所有 greenlet 终止时,执行才会继续进行。

需要注意的是,异步情况下的执行顺序本质上是随机的,异步情况下的总执行时间比同步情况下少得多。实际上,同步的例子完成的最大时间是每个任务停顿0.002秒,导致整个队列停顿0.02秒。在异步情况下,最大运行时间大约为0.002秒,因为没有一个任务会阻塞其他任务的执行。

在更常见的用例中,异步地从服务器获取数据,fetch() 的运行时在请求之间会有所不同,这取决于请求时远程服务器上的负载。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import gevent.monkey
gevent.monkey.patch_socket()
# 把内置的阻塞的 socket替换成非阻塞的socket

import gevent
import urllib2
import simplejson as json

def fetch(pid):
response = urllib2.urlopen('http://json-time.appspot.com/time.json')
result = response.read()
json_result = json.loads(result)
datetime = json_result['datetime']

print('Process %s: %s' % (pid, datetime))
return json_result['datetime']

def synchronous():
for i in range(1,10):
fetch(i)

def asynchronous():
threads = []
for i in range(1,10):
threads.append(gevent.spawn(fetch, i))
gevent.joinall(threads)

print('Synchronous:')
synchronous()

print('Asynchronous:')
asynchronous()

确定性

如前所述,greenlet 是确定的。给定相同的 greenlet 配置和相同的输入集,它们总是产生相同的输出。例如,让我们将一个任务分散到一个多进程(multiprocessing)池中,并将其结果与一个 gevent 池的结果进行比较。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import time

def echo(i):
time.sleep(0.001)
return i

# Non Deterministic Process Pool

from multiprocessing.pool import Pool

p = Pool(10)
run1 = [a for a in p.imap_unordered(echo, range(10))] # imap_unordered 返回一个顺序随机的 iterable 对象
run2 = [a for a in p.imap_unordered(echo, range(10))]
run3 = [a for a in p.imap_unordered(echo, range(10))]
run4 = [a for a in p.imap_unordered(echo, range(10))]

print(run1 == run2 == run3 == run4)

# Deterministic Gevent Pool

from gevent.pool import Pool

p = Pool(10)
run1 = [a for a in p.imap_unordered(echo, range(10))] # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
run2 = [a for a in p.imap_unordered(echo, range(10))] # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
run3 = [a for a in p.imap_unordered(echo, range(10))] # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
run4 = [a for a in p.imap_unordered(echo, range(10))] # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

print(run1 == run2 == run3 == run4)
1
2
False
True

尽管 gevent 通常是确定的,但当您开始与外部服务(如 socket 和文件)进行交互时,非确定性的来源可能会潜入您的程序。因此,即使 green 线程是确定性并发的一种形式,它们仍然会遇到POSIX线程和进程遇到的一些相同的问题。

与并发有关的长期问题称为竞争条件。简而言之,当两个并发线程/进程依赖于某些共享资源但还试图修改该值时,就会发生竞争状态。这将导致资源的值变得依赖于执行顺序。这是一个问题,一般来说,应该尽量避免竞态条件,因为它们会导致全局的不确定程序行为。

最好的方法是在任何时候都避免所有全局状态。全局状态和导入时间的副作用总是会回来咬你一口

生成 Greenlets

gevent 提供了一些关于 Greenlet 初始化的包装器。一些最常见的模式是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import gevent
from gevent import Greenlet

def foo(message, n):
"""
Each thread will be passed the message, and n arguments
in its initialization.
"""
gevent.sleep(n)
print(message)

# Initialize a new Greenlet instance running the named function
# foo
thread1 = Greenlet.spawn(foo, "Hello", 1)

# Wrapper for creating and running a new Greenlet from the named
# function foo, with the passed arguments
thread2 = gevent.spawn(foo, "I live!", 2)

# Lambda expressions
thread3 = gevent.spawn(lambda x: (x+1), 2)

threads = [thread1, thread2, thread3]

# Block until all threads complete.
gevent.joinall(threads)
1
2
Hello
I live!

除了使用基本的Greenlet类,您还可以子类化 Greenlet 类并覆盖 _run 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import gevent
from gevent import Greenlet

class MyGreenlet(Greenlet):

def __init__(self, message, n):
Greenlet.__init__(self)
self.message = message
self.n = n

def _run(self):
print(self.message)
gevent.sleep(self.n)

g = MyGreenlet("Hi there!", 3)
g.start()
g.join()
1
Hi there!

Greenlets 状态

与代码的其他部分一样,greenlet 可能以各种方式失败。greenlet 可能无法抛出异常、无法停止或消耗太多系统资源。

greenlet 的内部状态通常是一个与时间相关的参数。在 greenlets 上有许多标志,它们允许您监视线程的状态:

  • started – 布尔值,指示Greenlet是否已启动
  • ready() – 布尔值,指示Greenlet是否已停止
  • successful() – 布尔值,指示Greenlet是否已停止且没有抛出异常
  • value – Greenlet返回的值
  • exception – 异常,在greenlet中抛出的未捕获异常实例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import gevent

def win():
return 'You win!'

def fail():
raise Exception('You fail at failing.')

winner = gevent.spawn(win)
loser = gevent.spawn(fail)

print(winner.started) # True
print(loser.started) # True

# Exceptions raised in the Greenlet, stay inside the Greenlet.
try:
gevent.joinall([winner, loser])
except Exception as e:
print('This will never be reached')

print(winner.value) # 'You win!'
print(loser.value) # None

print(winner.ready()) # True
print(loser.ready()) # True

print(winner.successful()) # True
print(loser.successful()) # False

# The exception raised in fail, will not propagate outside the
# greenlet. A stack trace will be printed to stdout but it
# will not unwind the stack of the parent.

print(loser.exception)

# It is possible though to raise the exception again outside
# raise loser.exception
# or with
# loser.get()
1
2
3
4
5
6
7
8
9
True
True
You win!
None
True
True
True
False
You fail at failing.

程序关闭

当主程序接收到 SIGQUIT 时,无法生成(yield)的 greenlet 可能会使程序的执行时间比预期的更长。这将导致所谓的“僵尸进程”,需要从 Python 解释器外部杀死这些进程。

一种常见的模式是监听主程序上的 SIGQUIT 事件并在退出前调用 gevent.shutdown

1
2
3
4
5
6
7
8
9
10
import gevent
import signal

def run_forever():
gevent.sleep(1000)

if __name__ == '__main__':
gevent.signal(signal.SIGQUIT, gevent.kill)
thread = gevent.spawn(run_forever)
thread.join()

超时

超时是对代码块或 Greenlet 的运行时的约束。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import gevent
from gevent import Timeout

seconds = 10

timeout = Timeout(seconds)
timeout.start()

def wait():
gevent.sleep(10)

try:
gevent.spawn(wait).join()
except Timeout:
print('Could not complete')

在 with 语句中,它们还可以与上下文管理器一起使用。

1
2
3
4
5
6
7
8
9
10
import gevent
from gevent import Timeout

time_to_wait = 5 # seconds

class TooLong(Exception):
pass

with Timeout(time_to_wait, TooLong):
gevent.sleep(10)

此外,gevent 还为各种 Greenlet 和数据结构相关的调用提供超时参数。例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import gevent
from gevent import Timeout

def wait():
gevent.sleep(2)

timer = Timeout(1).start()
thread1 = gevent.spawn(wait)

try:
thread1.join(timeout=timer)
except Timeout:
print('Thread 1 timed out')

# --

timer = Timeout.start_new(1)
thread2 = gevent.spawn(wait)

try:
thread2.get(timeout=timer)
except Timeout:
print('Thread 2 timed out')

# --

try:
gevent.with_timeout(1, wait)
except Timeout:
print('Thread 3 timed out')
1
2
3
Thread 1 timed out
Thread 2 timed out
Thread 3 timed out

猴子补丁

我们来到了 Gevent 的黑暗角落。到目前为止,我一直避免提到 monkey patching,以尝试和激发强大的协同模式,但是现在是讨论 monkey patching 的黑魔法的时候了。
如果您注意到上面我们调用了命令 monkey.patch_socket(),这是一个纯粹用于修改标准库套接字库(socket)的副作用命令。

1
2
3
4
5
6
7
8
9
10
11
12
13
import socket
print(socket.socket)

print("After monkey patch")
from gevent import monkey
monkey.patch_socket()
print(socket.socket)

import select
print(select.select)
monkey.patch_select()
print("After monkey patch")
print(select.select)
1
2
3
4
5
6
7
class 'socket.socket'
After monkey patch
class 'gevent.socket.socket'

built-in function select
After monkey patch
function select at 0x1924de8

Python 允许在运行时修改大多数对象,包括模块、类甚至函数。这通常是一个令人震惊的坏主意,因为它创建了一个“隐式副作用”,如果出现问题,通常非常难以调试,然而在极端情况下,库需要改变 Python 本身的基本行为,可以使用 monkey 补丁。在这种情况下,gevent 能够修补标准库中的大多数阻塞系统调用,包括 socket、ssl、threading 和 select 模块中的调用。

例如,Redis-python 的绑定通常使用常规 tcp socket 与 Redis-server 实例通信。只需调用 gevent.monkey.patch_all(),我们可以让 Redis 绑定协同调度请求,并与 gevent 堆栈的其他部分一起工作。

这让我们可以在不编写任何代码的情况下集成通常无法与 gevent 一起工作的库。(尽管猴子补丁仍然是邪恶的,但在这种情况下,它是“有用的邪恶”。)

数据结构

Events 事件

事件是 greenlet 之间异步通信的一种形式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import gevent
from gevent.event import Event

'''
Illustrates the use of events
'''


evt = Event()

def setter():
'''After 3 seconds, wake all threads waiting on the value of evt'''
print('A: Hey wait for me, I have to do something')
gevent.sleep(3)
print("Ok, I'm done")
evt.set() # 运行到evt.set()会将flag设置为True,然后另外两个被阻塞的waitter的evt.wait()方法在看到flag已经为True之后不再继续阻塞运行并且结束。


def waiter():
'''After 3 seconds the get call will unblock'''
print("I'll wait for you")
evt.wait() # blocking
print("It's about time")


def main():
gevent.joinall([
gevent.spawn(setter),
gevent.spawn(waiter),
gevent.spawn(waiter),
gevent.spawn(waiter),
gevent.spawn(waiter),
gevent.spawn(waiter)
])

if __name__ == '__main__':
main()

事件对象的扩展是 AsyncResult,它允许您在唤醒调用时发送一个值。这有时被称为 future 或 deferred,因为它有对 future 值的引用,可以在任意的时间设置该值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import gevent
from gevent.event import AsyncResult

a = AsyncResult()

def setter():
"""
After 3 seconds set the result of a.
"""
gevent.sleep(3)
a.set('Hello!')

def waiter():
"""
After 3 seconds the get call will unblock after the setter
puts a value into the AsyncResult.
"""
print(a.get())

gevent.joinall([
gevent.spawn(setter),
gevent.spawn(waiter),
])

Queues 队列

队列是按顺序排列的数据集,它们具有通常的 put / get 操作,但可以在 Greenlets 上安全操作的方式编写。

例如,如果一个 Greenlet 从队列中获取一个项目(item),则同一项目(item)不会被同时执行的另一个 Greenlet 获取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import gevent
from gevent.queue import Queue

tasks = Queue()

def worker(n):
while not tasks.empty():
task = tasks.get()
print('Worker %s got task %s' % (n, task))
gevent.sleep(0)

print('Quitting time!')

def boss():
for i in range(1,25):
tasks.put_nowait(i)

gevent.spawn(boss).join()

gevent.joinall([
gevent.spawn(worker, 'steve'),
gevent.spawn(worker, 'john'),
gevent.spawn(worker, 'nancy'),
])
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Worker steve got task 1
Worker john got task 2
Worker nancy got task 3
Worker steve got task 4
Worker john got task 5
Worker nancy got task 6
Worker steve got task 7
Worker john got task 8
Worker nancy got task 9
Worker steve got task 10
Worker john got task 11
Worker nancy got task 12
Worker steve got task 13
Worker john got task 14
Worker nancy got task 15
Worker steve got task 16
Worker john got task 17
Worker nancy got task 18
Worker steve got task 19
Worker john got task 20
Worker nancy got task 21
Worker steve got task 22
Worker john got task 23
Worker nancy got task 24
Quitting time!
Quitting time!
Quitting time!

根据需要,队列还可以在 put 或 get 上阻塞。

每个 put 和 get 操作都有一个非阻塞的对应操作,put_nowait 和 get_nowait 不会阻塞。如果操作是不可能的,会引发 gevent.queue.Empty 或 gevent.queue.Full

在这个例子中,我们让 boss 同时和 workers 运行,并且对队列有一个限制,防止它包含三个以上的元素。这个限制意味着put操作将阻塞,直到队列上有空间为止。相反,如果队列上没有要获取的元素,get 操作就会阻塞,它还会接受一个超时参数,如果在超时的时间范围内找不到工作(work),则允许队列以异常 gevent.queue.Empty 退出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import gevent
from gevent.queue import Queue, Empty

tasks = Queue(maxsize=3)

def worker(name):
try:
while True:
task = tasks.get(timeout=1) # decrements queue size by 1
print('Worker %s got task %s' % (name, task))
gevent.sleep(0)
except Empty:
print('Quitting time!')

def boss():
"""
Boss will wait to hand out work until a individual worker is
free since the maxsize of the task queue is 3.
"""

for i in range(1,10):
tasks.put(i) # 输入1,2,3,到4时,队列达到最大值,put方法阻塞,gevent 切换到下一个协程worker(steve)
print('Assigned all work in iteration 1')

for i in range(10,20):
tasks.put(i)
print('Assigned all work in iteration 2')

gevent.joinall([
gevent.spawn(boss),
gevent.spawn(worker, 'steve'),
gevent.spawn(worker, 'john'),
gevent.spawn(worker, 'bob'),
])
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Worker steve got task 1
Worker john got task 2
Worker bob got task 3
Worker steve got task 4
Worker john got task 5
Worker bob got task 6
Assigned all work in iteration 1
Worker steve got task 7
Worker john got task 8
Worker bob got task 9
Worker steve got task 10
Worker john got task 11
Worker bob got task 12
Worker steve got task 13
Worker john got task 14
Worker bob got task 15
Worker steve got task 16
Worker john got task 17
Worker bob got task 18
Assigned all work in iteration 2
Worker steve got task 19
Quitting time!
Quitting time!
Quitting time!

Groups and Pools 分组和池

组是运行中的 greenlet 的集合,这些 greenlet 作为组一起管理和调度。它还兼做并行调度程序,借鉴 Python multiprocessing 库。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import gevent
from gevent.pool import Group

def talk(msg):
for i in range(3):
print(msg)

g1 = gevent.spawn(talk, 'bar')
g2 = gevent.spawn(talk, 'foo')
g3 = gevent.spawn(talk, 'fizz')

group = Group()
group.add(g1)
group.join() # 修改了官方的例子,这里join只会让当前线程等待g1,但g2和g3已经被启动,会被继续安排执行
1
2
3
4
5
6
7
8
9
bar
bar
bar
foo
foo
foo
fizz
fizz
fizz

这对于管理异步任务组非常有用。

如上所述,Group 还提供了一个 API,用于将作业分派给分组的 greenlet 并以各种方式收集它们的结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import gevent
from gevent import getcurrent
from gevent.pool import Group

group = Group()

def hello_from(n):
print('Size of group %s' % len(group))
print('Hello from Greenlet %s' % id(getcurrent()))

group.map(hello_from, range(3))


def intensive(n):
gevent.sleep(3 - n)
return 'task', n

print('Ordered')

ogroup = Group()
for i in ogroup.imap(intensive, range(3)):
print(i)

print('Unordered')

igroup = Group()
for i in igroup.imap_unordered(intensive, range(3)):
print(i)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
Size of group 3
Hello from Greenlet 4340152592
Size of group 3
Hello from Greenlet 4340928912
Size of group 3
Hello from Greenlet 4340928592
Ordered
('task', 0)
('task', 1)
('task', 2)
Unordered
('task', 2)
('task', 1)
('task', 0)

池是一种结构,用于处理需要限制并发的动态数量的greenlets。在需要并行执行许多网络或IO绑定任务的情况下,这通常是可取的。

1
2
3
4
5
6
7
8
9
import gevent
from gevent.pool import Pool

pool = Pool(2)

def hello_from(n):
print('Size of pool %s' % len(pool))

pool.map(hello_from, range(3))
1
2
3
Size of pool 2
Size of pool 2
Size of pool 1

通常在构建 gevent 驱动的服务时,会将整个服务围绕一个池结构进行中心处理。一个例子可能是在各种套接字(socket)上轮询的类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from gevent.pool import Pool

class SocketPool(object):

def __init__(self):
self.pool = Pool(1000)
self.pool.start()

def listen(self, socket):
while True:
socket.recv()

def add_handler(self, socket):
if self.pool.full():
raise Exception("At maximum pool size")
else:
self.pool.spawn(self.listen, socket)

def shutdown(self):
self.pool.kill()

locks and semaphores,锁和信号量

信号量是一个允许 greenlet 相互合作,限制并发访问或运行的低层次的同步原语。 信号量有两个方法,acquire 和 release。在信号量是否已经被 acquire 或 release,和拥有资源的数量之间不同,被称为此信号量的范围 (the bound of the semaphore)。如果一个信号量的范围已经降低到0,它会 阻塞 acquire 操作直到另一个已经获得信号量的 greenlet 作出释放。

信号量是一种低级同步原语,它允许 greenlet 协调和限制并发访问或执行。信号量公开两种方法,获取和释放信号量被获取和释放的次数之差称为信号量的界限。如果信号量范围达到0,它就会阻塞,直到另一个 greenlet释放它的捕获。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from gevent import sleep
from gevent.pool import Pool
from gevent.coros import BoundedSemaphore

sem = BoundedSemaphore(2)

def worker1(n):
sem.acquire()
print('Worker %i acquired semaphore' % n)
sleep(0)
sem.release()
print('Worker %i released semaphore' % n)

def worker2(n):
with sem:
print('Worker %i acquired semaphore' % n)
sleep(0)
print('Worker %i released semaphore' % n)

pool = Pool()
pool.map(worker1, range(0,2))
pool.map(worker2, range(3,6))
1
2
3
4
5
6
7
8
9
10
Worker 0 acquired semaphore
Worker 1 acquired semaphore
Worker 0 released semaphore
Worker 1 released semaphore
Worker 3 acquired semaphore
Worker 4 acquired semaphore
Worker 3 released semaphore
Worker 4 released semaphore
Worker 5 acquired semaphore
Worker 5 released semaphore

界限为1的信号量称为锁。它提供对一个greenlet的独占执行。它们通常用于确保资源只在程序上下文中使用一次。

Thread Locals 线程局部变量

Gevent 还允许您指定 greenlet 上下文的本地数据。在内部,这是作为全局查找实现的,它寻址一个由 greenlet getcurrent() 的值键控的私有命名空间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import gevent
from gevent.local import local

stash = local()


def f1():
stash.x = 1
print(stash.x)


def f2():
stash.y = 2
print(stash.y)

try:
stash.x
except AttributeError:
print("x is not local to f2")


g1 = gevent.spawn(f1)
g2 = gevent.spawn(f2)

gevent.joinall([g1, g2])
1
2
3
1
2
x is not local to f2

许多使用 gevent 的 Web 框架将 HTTP 会话对象存储在 gevent 线程局部变量中。例如,使用 Werkzeug 实用程序库及其代理对象,我们可以创建 Flask 样式的请求对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
from gevent.local import local
from werkzeug.local import LocalProxy
from werkzeug.wrappers import Request
from contextlib import contextmanager
from gevent.pywsgi import WSGIServer

_requests = local()
request = LocalProxy(lambda: _requests.request)


@contextmanager
def sessionmanager(environ):
_requests.request = Request(environ)
yield
_requests.request = None


def logic():
return f"Hello {request.remote_addr}".encode('utf-8')


def application_1(environ, start_response):
status = '200 OK'
headers = [('Content-type', 'text/plain')]
start_response(status, headers)
return [b"Hello, World!"]


def application_2(environ, start_response):
status = '200 OK'

with sessionmanager(environ):
body = logic()

headers = [
('Content-Type', 'text/html')
]

start_response(status, headers)
return [body]


# http_server = WSGIServer(('0.0.0.0', 8000), application_1, log=None)
http_server = WSGIServer(('0.0.0.0', 8000), application_2, log=None)
http_server.serve_forever()

Flask 系统比这个例子复杂一点,然而使用线程局部变量作为局部的会话存储,这个思想是相同的。

Subprocess 子进程

自 gevent 1.0 起,gevent.subprocess 添加了 Python subprocess 模块的修补版本。它支持对子进程进行协作等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import gevent
from gevent.subprocess import Popen, PIPE


def cron():
while True:
print("cron")
gevent.sleep(0.2)


g = gevent.spawn(cron)
sub = Popen(['sleep 1; uname'], stdout=PIPE, shell=True)
out, err = sub.communicate()
g.kill()
print(out.rstrip())
1
2
3
4
5
6
cron
cron
cron
cron
cron
Linux

很多人也想将 gevent 和 multiprocessing 一起使用。最明显的挑战之一就是 multiprocessing 提供的进程间通信默认不是协作式的。由于基于 multiprocessing.Connection 的对象(例如Pipe)暴露了它们下面的 文件描述符(file descriptor),gevent.socket.wait_readwait_write 可以用来在直接读写之前协作式的等待 ready-to-read/ready-to-write 事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import gevent
from multiprocessing import Process, Pipe
from gevent.socket import wait_read, wait_write

# To Process
a, b = Pipe()

# From Process
c, d = Pipe()


def relay():
for i in range(10):
msg = b.recv()
c.send(msg + " in " + str(i))


def put_msg():
for i in range(10):
wait_write(a.fileno())
a.send('hi')


def get_msg():
for i in range(10):
wait_read(d.fileno())
print(d.recv())


if __name__ == '__main__':
proc = Process(target=relay)
proc.start()

g1 = gevent.spawn(get_msg)
g2 = gevent.spawn(put_msg)
gevent.joinall([g1, g2], timeout=1)

然而要注意,组合 multiprocessing 和 gevent 必定带来 依赖于操作系统(os-dependent)的缺陷,其中有:

  • 在兼容POSIX的系统创建子进程(forking)之后, 在子进程的gevent的状态是不适定的(ill-posed)。一个副作用就是, multiprocessing.Process创建之前的greenlet创建动作,会在父进程和子进程两 方都运行。

  • 上例的put_msg()中的a.send()可能依然非协作式地阻塞调用的线程:一个 ready-to-write事件只保证写了一个byte。在尝试写完成之前底下的buffer可能是满的。

  • 上面表示的基于wait_write()/wait_read()的方法在Windows上不工作 (IOError: 3 is not a socket (files are not supported)),因为Windows不能监视 pipe事件。

Python包gipc以大体上透明的方式在 兼容POSIX系统和Windows上克服了这些挑战。它提供了gevent感知的基于multiprocessing.Process的子进程和gevent基于pipe的协作式进程间通信。

Actors 并发模型

actor 模型是一个由于 Erlang 变得普及的更高层的并发模型。简单的说它的主要思想就是许多个独立的 Actor,每个 Actor 有一个可以从 其它 Actor 接收消息的收件箱。Actor 内部的主循环遍历它收到的消息,并根据它期望的行为来采取行动。

Gevent 没有原生的 Actor 类型,但在一个子类化的 Greenlet 内使用队列, 我们可以定义一个非常简单的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import gevent
from gevent.queue import Queue


class Actor(gevent.Greenlet):

def __init__(self):
super(Actor, self).__init__()
self.inbox = Queue()

def receive(self, message):
"""
Define in your subclass.
"""
raise NotImplemented()

def _run(self):
self.running = True

while self.running:
message = self.inbox.get()
self.receive(message)

下面是一个使用的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import gevent
from gevent.queue import Queue


class Actor(gevent.Greenlet):

def __init__(self):
super(Actor, self).__init__()
self.inbox = Queue()

def receive(self, message):
"""
Define in your subclass.
"""
raise NotImplemented()

def _run(self):
self.running = True

while self.running:
message = self.inbox.get()
self.receive(message)


class Pinger(Actor):
def receive(self, message):
print(message)
pong.inbox.put('ping')
gevent.sleep(0)


class Ponger(Actor):
def receive(self, message):
print(message)
ping.inbox.put('pong')
gevent.sleep(0)


ping = Pinger()
pong = Ponger()

ping.start()
pong.start()

ping.inbox.put('start')
gevent.joinall([ping, pong])

实际应用

Gevent ZeroMQ

ZeroMQ 被它的作者描述为 “一个表现得像一个并发框架的socket库”。它是一个非常强大的,为构建并发和分布式应用的消息传递层。ZeroMQ 提供了各种各样的 socket 原语。最简单的是请求-应答socket对 (Request-Response socket pair)。一个socket有两个方法sendrecv, 两者一般都是阻塞操作。但是Travis Cline 的一个杰出的库弥补了这一点,这个库使用 gevent.socket 来以非阻塞的方式 轮询 ZereMQ socket。安装 gevent-zeremq 命令:pip install gevent-zeromq

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import gevent
import zmq.green as zmq

# Global Context
context = zmq.Context()


def server():
server_socket = context.socket(zmq.REQ)
server_socket.bind("tcp://127.0.0.1:5000")

for request in range(1, 10):
server_socket.send("Hello")
print('Switched to Server for %s' % request)
# Implicit context switch occurs here
server_socket.recv()


def client():
client_socket = context.socket(zmq.REP)
client_socket.connect("tcp://127.0.0.1:5000")

for request in range(1, 10):
client_socket.recv()
print('Switched to Client for %s' % request)
# Implicit context switch occurs here
client_socket.send("World")


publisher = gevent.spawn(server)
client = gevent.spawn(client)

gevent.joinall([publisher, client])

执行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Switched to Server for 1
Switched to Client for 1
Switched to Server for 2
Switched to Client for 2
Switched to Server for 3
Switched to Client for 3
Switched to Server for 4
Switched to Client for 4
Switched to Server for 5
Switched to Client for 5
Switched to Server for 6
Switched to Client for 6
Switched to Server for 7
Switched to Client for 7
Switched to Server for 8
Switched to Client for 8
Switched to Server for 9
Switched to Client for 9

简单 server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# On Unix: Access with ``$ nc 127.0.0.1 5000``
# On Window: Access with ``$ telnet 127.0.0.1 5000``

from gevent.server import StreamServer


def handle(socket, address):
socket.send("Hello from a telnet!\n")
for i in range(5):
socket.send(str(i) + '\n')
socket.close()


server = StreamServer(('127.0.0.1', 5000), handle)
server.serve_forever()

WSGI 服务器 ( 多进程 )

Gevent 为 HTTP 内容服务提供了两种 WSGI server:

  • gevent.pywsgi.WSGIServer

windows 运行报错,linux 可以正常运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
from gevent import monkey
from gevent.pywsgi import WSGIServer

monkey.patch_all()

import datetime
import os
from multiprocessing import cpu_count, Process
from flask import Flask, jsonify

app = Flask(__name__)


@app.route("/cppla", methods=['GET'])
def function_benchmark():
return jsonify(
{
"status": "ok",
"time": datetime.datetime.now().strftime('%Y-%m-%d %H:%M'),
"pid": os.getpid()
}
), 200


def run(multi_process=None):
if not multi_process:
WSGIServer(('0.0.0.0', 8080), app).serve_forever()
else:
multi_server = WSGIServer(('0.0.0.0', 8080), app)
multi_server.start()

def server_forever():
multi_server.start_accepting()
multi_server._stop_event.wait()

for i in range(cpu_count()):
p = Process(target=server_forever)
p.start()


if __name__ == "__main__":
# 单进程 + 协程
run(False)
# 多进程 + 协程
# run(True)

Streaming 服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from gevent.pywsgi import WSGIServer


def application(environ, start_response):
status = '200 OK'
body = b'<p>Hello World</p>'

headers = [
('Content-Type', 'text/html')
]

start_response(status, headers)
return [body]


WSGIServer(('', 8000), application).serve_forever()

然而使用 pywsgi 我们可以将 handler 写成 generator,并以块的形式 yield 出结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from gevent.pywsgi import WSGIServer

def application(environ, start_response):
status = '200 OK'

headers = [
('Content-Type', 'text/html')
]

start_response(status, headers)
yield "<p>Hello"
yield "World</p>"

WSGIServer(('', 8000), application).serve_forever()

Long Polling 长轮询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import gevent
from gevent.queue import Queue, Empty
from gevent.pywsgi import WSGIServer
import simplejson as json

data_source = Queue()


def producer():
while True:
data_source.put_nowait('Hello World')
gevent.sleep(1)


def ajax_endpoint(environ, start_response):
status = '200 OK'
headers = [
('Content-Type', 'application/json')
]

start_response(status, headers)

while True:
try:
datum = data_source.get(timeout=5)
yield json.dumps(datum) + '\n'
except Empty:
pass


gevent.spawn(producer)
WSGIServer(('', 8000), ajax_endpoint).serve_forever()

Websockets 网络套接字

运行Websocket的例子需要gevent-websocket包。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# Simple gevent-websocket server
import json
import random

from gevent import pywsgi, sleep
from geventwebsocket.handler import WebSocketHandler


class WebSocketApp(object):
'''Send random data to the websocket'''

def __call__(self, environ, start_response):
ws = environ['wsgi.websocket']
x = 0
while True:
data = json.dumps({'x': x, 'y': random.randint(1, 5)})
ws.send(data)
x += 1
sleep(0.5)


server = pywsgi.WSGIServer(
("", 10000), WebSocketApp(),
handler_class=WebSocketHandler
)
server.serve_forever()

HTML Page:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
<html>
<head>
<title>Minimal websocket application</title>
<script type="text/javascript" src="jquery.min.js"></script>
<script type="text/javascript">
$(function() {
// Open up a connection to our server
var ws = new WebSocket("ws://localhost:10000/");

// What do we do when we get a message?
ws.onmessage = function(evt) {
$("#placeholder").append('<p>' + evt.data + '</p>')
}
// Just update our conn_status field with the connection status
ws.onopen = function(evt) {
$('#conn_status').html('<b>Connected</b>');
}
ws.onerror = function(evt) {
$('#conn_status').html('<b>Error</b>');
}
ws.onclose = function(evt) {
$('#conn_status').html('<b>Closed</b>');
}
});
</script>
</head>
<body>
<h1>WebSocket Example</h1>
<div id="conn_status">Not Connected</div>
<div id="placeholder" style="width:600px;height:300px;"></div>
</body>
</html>

Chat Server 聊天服务器

实现一个实时聊天室, 运行这个例子需要 Flask (你可以使用Django, Pyramid等,但不是必须的)。 对应的Javascript和HTML文件可以在 这里找到。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
from flask import Flask, render_template, request
from gevent import queue
from gevent.pywsgi import WSGIServer

import simplejson as json

app = Flask(__name__)
app.debug = True


class Room(object):

def __init__(self):
self.users = set()
self.messages = []

def backlog(self, size=25):
return self.messages[-size:]

def subscribe(self, user):
self.users.add(user)

def add(self, message):
for user in self.users:
print(user)
user.queue.put_nowait(message)
self.messages.append(message)


class User(object):

def __init__(self):
self.queue = queue.Queue()


rooms = {
'topic1': Room(),
'topic2': Room(),
}
users = {}


@app.route('/')
def choose_name():
return render_template('choose.html')


@app.route('/<uid>')
def main(uid):
return render_template(
'main.html',
uid=uid,
rooms=rooms.keys()
)


@app.route('/<room>/<uid>')
def join(room, uid):
user = users.get(uid, None)

if not user:
users[uid] = user = User()

active_room = rooms[room]
active_room.subscribe(user)
print('subscribe %s %s' % (active_room, user))

messages = active_room.backlog()

return render_template(
'room.html',
room=room, uid=uid, messages=messages
)


@app.route("/put/<room>/<uid>", methods=["POST"])
def put(room, uid):
user = users[uid]
room = rooms[room]

message = request.form['message']
room.add(':'.join([uid, message]))

return ''


@app.route("/poll/<uid>", methods=["POST"])
def poll(uid):
try:
msg = users[uid].queue.get(timeout=10)
except queue.Empty:
msg = []
return json.dumps(msg)


if __name__ == "__main__":
http = WSGIServer(('', 5000), app)
http.serve_forever()

Python 第三方模块之 Gevent - 协程(官方)
https://flepeng.github.io/021-Python-31-Python-第三方模块-01-进程、线程、协程-相关-Python-第三方模块之-Gevent-协程(官方)/
作者
Lepeng
发布于
2021年4月27日
许可协议