核心 Greenlets gevent 中使用的主要模式是 Greenlet,这是作为 C 扩展模块提供给 Python 的一个轻量级协同程序。所有 greenlet 都在主程序的 OS 进程中运行,但它们是协同调度的。
在任何给定的时间内,只有一个 greenlet 在运行。
这不同于由 multiprocessing 或 threading 库提供的并行结构,它们这些库可以自旋进程和 POSIX 线程,由操作系统调度,并且真正并行的。
异步和同步执行 并发性的核心思想是,可以将较大的任务分解为多个子任务的集合,这些子任务计划同时或异步运行,而不是一次或同步运行。两个子任务之间的切换称为上下文切换。
gevent 中的上下文切换是通过 yielding 来完成的。在本例中,我们有两个上下文,它们通过调用 gevent.sleep(0) 相互让步。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import geventdef foo () : print('Running in foo' ) gevent.sleep(0 ) print('Explicit context switch to foo again' )def bar () : print('Explicit context to bar' ) gevent.sleep(0 ) print('Implicit context switch back to bar' ) gevent.joinall([ gevent.spawn(foo), gevent.spawn(bar), ])
1 2 3 4 Running in foo Explicit context to bar Explicit context switch to foo againImplicit context switch back to bar
当我们将 gevent 用于网络和 IO 绑定函数时,它的真正威力就来了,这些函数可以协同调度。Gevent 负责处理所有细节,以确保网络库尽可能隐式地让步出它们的 greenlet 上下文。我再怎么强调这是一个多么有力的成语也不为过。但也许可以举个例子来说明。
在这种情况下,select()
函数通常是一个阻塞调用,它轮询各种文件描述符。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 import timeimport geventfrom gevent import select start = time.time() tic = lambda : 'at %1.1f seconds' % (time.time() - start)def gr1 () : print('Started Polling: %s' % tic()) select.select([], [], [], 2 ) print('Ended Polling: %s' % tic())def gr2 () : print('Started Polling: %s' % tic()) select.select([], [], [], 2 ) print('Ended Polling: %s' % tic())def gr3 () : print("Hey lets do some stuff while the greenlets poll, %s" % tic()) gevent.sleep(1 ) gevent.joinall([ gevent.spawn(gr1), gevent.spawn(gr2), gevent.spawn(gr3), ])
1 2 3 4 5 Started Polling: at 0.0 seconds Started Polling: at 0.0 seconds Hey lets do some stuff while the greenlets poll, at 0.0 seconds Ended Polling: at 2.0 seconds Ended Polling: at 2.0 seconds
另一个比较综合的例子定义了一个不确定的任务函数 (它的输出不能保证对相同的输入给出相同的结果) 。在这种情况下,运行该函数的副作用是任务暂停执行的时间是随机的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import geventimport randomdef task (pid) : """ Some non-deterministic task """ gevent.sleep(random.randint(0 ,2 )*0.001 ) print('Task %s done' % pid)def synchronous () : for i in range(1 ,10 ): task(i)def asynchronous () : threads = [gevent.spawn(task, i) for i in range(10 )] gevent.joinall(threads) print('Synchronous:' ) synchronous() print('Asynchronous:' ) asynchronous()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 Synchronous :Task 1 done Task 2 done Task 3 done Task 4 done Task 5 done Task 6 done Task 7 done Task 8 done Task 9 done Asynchronous :Task 1 done Task 5 done Task 6 done Task 2 done Task 4 done Task 7 done Task 8 done Task 9 done Task 0 done Task 3 done
在同步情况下,所有任务都是按顺序运行的,这会导致在执行每个任务时主程序阻塞(即暂停主程序的执行)。
程序的重要部分是 gevent.spawn
,它将给定的函数封装在一个 Greenlet 线程中。初始化的 greenlet 列表存储在传递给 gevent 的数组线程中。gevent.joinall
函数,它会阻塞当前程序,来运行所有给定的 greenlet。只有当所有 greenlet 终止时,执行才会继续进行。
需要注意的是,异步情况下的执行顺序本质上是随机的,异步情况下的总执行时间比同步情况下少得多。实际上,同步的例子完成的最大时间是每个任务停顿0.002秒,导致整个队列停顿0.02秒。在异步情况下,最大运行时间大约为0.002秒,因为没有一个任务会阻塞其他任务的执行。
在更常见的用例中,异步地从服务器获取数据,fetch()
的运行时在请求之间会有所不同,这取决于请求时远程服务器上的负载。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 import gevent.monkey gevent.monkey.patch_socket()import geventimport urllib2import simplejson as jsondef fetch (pid) : response = urllib2.urlopen('http://json-time.appspot.com/time.json' ) result = response.read() json_result = json.loads(result) datetime = json_result['datetime' ] print('Process %s: %s' % (pid, datetime)) return json_result['datetime' ]def synchronous () : for i in range(1 ,10 ): fetch(i)def asynchronous () : threads = [] for i in range(1 ,10 ): threads.append(gevent.spawn(fetch, i)) gevent.joinall(threads) print('Synchronous:' ) synchronous() print('Asynchronous:' ) asynchronous()
确定性 如前所述,greenlet 是确定的。给定相同的 greenlet 配置和相同的输入集,它们总是产生相同的输出。例如,让我们将一个任务分散到一个多进程(multiprocessing)池中,并将其结果与一个 gevent 池的结果进行比较。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 import timedef echo (i) : time.sleep(0.001 ) return ifrom multiprocessing.pool import Pool p = Pool(10 ) run1 = [a for a in p.imap_unordered(echo, range(10 ))] run2 = [a for a in p.imap_unordered(echo, range(10 ))] run3 = [a for a in p.imap_unordered(echo, range(10 ))] run4 = [a for a in p.imap_unordered(echo, range(10 ))] print(run1 == run2 == run3 == run4)from gevent.pool import Pool p = Pool(10 ) run1 = [a for a in p.imap_unordered(echo, range(10 ))] run2 = [a for a in p.imap_unordered(echo, range(10 ))] run3 = [a for a in p.imap_unordered(echo, range(10 ))] run4 = [a for a in p.imap_unordered(echo, range(10 ))] print(run1 == run2 == run3 == run4)
尽管 gevent 通常是确定的,但当您开始与外部服务(如 socket 和文件)进行交互时,非确定性的来源可能会潜入您的程序。因此,即使 green 线程是确定性并发的一种形式,它们仍然会遇到POSIX线程和进程遇到的一些相同的问题。
与并发有关的长期问题称为竞争条件。简而言之,当两个并发线程/进程依赖于某些共享资源但还试图修改该值时,就会发生竞争状态。这将导致资源的值变得依赖于执行顺序。这是一个问题,一般来说,应该尽量避免竞态条件,因为它们会导致全局的不确定程序行为。
最好的方法是在任何时候都避免所有全局状态。全局状态和导入时间的副作用总是会回来咬你一口
生成 Greenlets gevent 提供了一些关于 Greenlet 初始化的包装器。一些最常见的模式是:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 import geventfrom gevent import Greenletdef foo (message, n) : """ Each thread will be passed the message, and n arguments in its initialization. """ gevent.sleep(n) print(message) thread1 = Greenlet.spawn(foo, "Hello" , 1 ) thread2 = gevent.spawn(foo, "I live!" , 2 ) thread3 = gevent.spawn(lambda x: (x+1 ), 2 ) threads = [thread1, thread2, thread3] gevent.joinall(threads)
除了使用基本的Greenlet类,您还可以子类化 Greenlet 类并覆盖 _run 方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import geventfrom gevent import Greenletclass MyGreenlet (Greenlet) : def __init__ (self, message, n) : Greenlet.__init__(self) self.message = message self.n = n def _run (self) : print(self.message) gevent.sleep(self.n) g = MyGreenlet("Hi there!" , 3 ) g.start() g.join()
Greenlets 状态 与代码的其他部分一样,greenlet 可能以各种方式失败。greenlet 可能无法抛出异常、无法停止或消耗太多系统资源。
greenlet 的内部状态通常是一个与时间相关的参数。在 greenlets 上有许多标志,它们允许您监视线程的状态:
started
– 布尔值,指示Greenlet是否已启动
ready()
– 布尔值,指示Greenlet是否已停止
successful()
– 布尔值,指示Greenlet是否已停止且没有抛出异常
value
– Greenlet返回的值
exception
– 异常,在greenlet中抛出的未捕获异常实例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 import geventdef win () : return 'You win!' def fail () : raise Exception('You fail at failing.' ) winner = gevent.spawn(win) loser = gevent.spawn(fail) print(winner.started) print(loser.started) try : gevent.joinall([winner, loser])except Exception as e: print('This will never be reached' ) print(winner.value) print(loser.value) print(winner.ready()) print(loser.ready()) print(winner.successful()) print(loser.successful()) print(loser.exception)
1 2 3 4 5 6 7 8 9 True True You win! None True True True False You fail at failing.
程序关闭 当主程序接收到 SIGQUIT 时,无法生成(yield)的 greenlet 可能会使程序的执行时间比预期的更长。这将导致所谓的“僵尸进程”,需要从 Python 解释器外部杀死这些进程。
一种常见的模式是监听主程序上的 SIGQUIT 事件并在退出前调用 gevent.shutdown
。
1 2 3 4 5 6 7 8 9 10 import geventimport signaldef run_forever () : gevent.sleep(1000 )if __name__ == '__main__' : gevent.signal(signal.SIGQUIT, gevent.kill) thread = gevent.spawn(run_forever) thread.join()
超时 超时是对代码块或 Greenlet 的运行时的约束。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import geventfrom gevent import Timeout seconds = 10 timeout = Timeout(seconds) timeout.start()def wait () : gevent.sleep(10 )try : gevent.spawn(wait).join()except Timeout: print('Could not complete' )
在 with 语句中,它们还可以与上下文管理器一起使用。
1 2 3 4 5 6 7 8 9 10 import geventfrom gevent import Timeout time_to_wait = 5 class TooLong (Exception) : pass with Timeout(time_to_wait, TooLong): gevent.sleep(10 )
此外,gevent 还为各种 Greenlet 和数据结构相关的调用提供超时参数。例如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 import geventfrom gevent import Timeoutdef wait () : gevent.sleep(2 ) timer = Timeout(1 ).start() thread1 = gevent.spawn(wait)try : thread1.join(timeout=timer)except Timeout: print('Thread 1 timed out' ) timer = Timeout.start_new(1 ) thread2 = gevent.spawn(wait)try : thread2.get(timeout=timer)except Timeout: print('Thread 2 timed out' )try : gevent.with_timeout(1 , wait)except Timeout: print('Thread 3 timed out' )
1 2 3 Thread 1 timed out Thread 2 timed out Thread 3 timed out
猴子补丁 我们来到了 Gevent 的黑暗角落。到目前为止,我一直避免提到 monkey patching,以尝试和激发强大的协同模式,但是现在是讨论 monkey patching 的黑魔法的时候了。 如果您注意到上面我们调用了命令 monkey.patch_socket()
,这是一个纯粹用于修改标准库套接字库(socket)的副作用命令。
1 2 3 4 5 6 7 8 9 10 11 12 13 import socket print(socket.socket) print("After monkey patch" )from gevent import monkey monkey.patch_socket() print(socket.socket)import select print(select.select) monkey.patch_select() print("After monkey patch" ) print(select.select)
1 2 3 4 5 6 7 class 'socket.socket' After monkey patchclass 'gevent.socket.socket' built-in function select After monkey patchfunction select at 0x1924de8
Python 允许在运行时修改大多数对象,包括模块、类甚至函数。这通常是一个令人震惊的坏主意,因为它创建了一个“隐式副作用”,如果出现问题,通常非常难以调试,然而在极端情况下,库需要改变 Python 本身的基本行为,可以使用 monkey 补丁。在这种情况下,gevent 能够修补标准库中的大多数阻塞系统调用,包括 socket、ssl、threading 和 select 模块中的调用。
例如,Redis-python 的绑定通常使用常规 tcp socket 与 Redis-server 实例通信。只需调用 gevent.monkey.patch_all()
,我们可以让 Redis 绑定协同调度请求,并与 gevent 堆栈的其他部分一起工作。
这让我们可以在不编写任何代码的情况下集成通常无法与 gevent 一起工作的库。(尽管猴子补丁仍然是邪恶的,但在这种情况下,它是“有用的邪恶”。)
数据结构 Events 事件 事件是 greenlet 之间异步通信的一种形式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 import geventfrom gevent.event import Event''' Illustrates the use of events ''' evt = Event()def setter () : '''After 3 seconds, wake all threads waiting on the value of evt''' print('A: Hey wait for me, I have to do something' ) gevent.sleep(3 ) print("Ok, I'm done" ) evt.set() def waiter () : '''After 3 seconds the get call will unblock''' print("I'll wait for you" ) evt.wait() print("It's about time" )def main () : gevent.joinall([ gevent.spawn(setter), gevent.spawn(waiter), gevent.spawn(waiter), gevent.spawn(waiter), gevent.spawn(waiter), gevent.spawn(waiter) ])if __name__ == '__main__' : main()
事件对象的扩展是 AsyncResult,它允许您在唤醒调用时发送一个值。这有时被称为 future 或 deferred,因为它有对 future 值的引用,可以在任意的时间设置该值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import geventfrom gevent.event import AsyncResult a = AsyncResult()def setter () : """ After 3 seconds set the result of a. """ gevent.sleep(3 ) a.set('Hello!' )def waiter () : """ After 3 seconds the get call will unblock after the setter puts a value into the AsyncResult. """ print(a.get()) gevent.joinall([ gevent.spawn(setter), gevent.spawn(waiter), ])
Queues 队列 队列是按顺序排列的数据集,它们具有通常的 put / get 操作,但可以在 Greenlets 上安全操作的方式编写。
例如,如果一个 Greenlet 从队列中获取一个项目(item),则同一项目(item)不会被同时执行的另一个 Greenlet 获取。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import geventfrom gevent.queue import Queue tasks = Queue()def worker (n) : while not tasks.empty(): task = tasks.get() print('Worker %s got task %s' % (n, task)) gevent.sleep(0 ) print('Quitting time!' )def boss () : for i in range(1 ,25 ): tasks.put_nowait(i) gevent.spawn(boss).join() gevent.joinall([ gevent.spawn(worker, 'steve' ), gevent.spawn(worker, 'john' ), gevent.spawn(worker, 'nancy' ), ])
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 Worker steve got task 1 Worker john got task 2 Worker nancy got task 3 Worker steve got task 4 Worker john got task 5 Worker nancy got task 6 Worker steve got task 7 Worker john got task 8 Worker nancy got task 9 Worker steve got task 10 Worker john got task 11 Worker nancy got task 12 Worker steve got task 13 Worker john got task 14 Worker nancy got task 15 Worker steve got task 16 Worker john got task 17 Worker nancy got task 18 Worker steve got task 19 Worker john got task 20 Worker nancy got task 21 Worker steve got task 22 Worker john got task 23 Worker nancy got task 24 Quitting time! Quitting time! Quitting time!
根据需要,队列还可以在 put 或 get 上阻塞。
每个 put 和 get 操作都有一个非阻塞的对应操作,put_nowait 和 get_nowait 不会阻塞。如果操作是不可能的,会引发 gevent.queue.Empty 或 gevent.queue.Full
在这个例子中,我们让 boss 同时和 workers 运行,并且对队列有一个限制,防止它包含三个以上的元素。这个限制意味着put操作将阻塞,直到队列上有空间为止。相反,如果队列上没有要获取的元素,get 操作就会阻塞,它还会接受一个超时参数,如果在超时的时间范围内找不到工作(work),则允许队列以异常 gevent.queue.Empty
退出。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 import geventfrom gevent.queue import Queue, Empty tasks = Queue(maxsize=3 )def worker (name) : try : while True : task = tasks.get(timeout=1 ) print('Worker %s got task %s' % (name, task)) gevent.sleep(0 ) except Empty: print('Quitting time!' )def boss () : """ Boss will wait to hand out work until a individual worker is free since the maxsize of the task queue is 3. """ for i in range(1 ,10 ): tasks.put(i) print('Assigned all work in iteration 1' ) for i in range(10 ,20 ): tasks.put(i) print('Assigned all work in iteration 2' ) gevent.joinall([ gevent.spawn(boss), gevent.spawn(worker, 'steve' ), gevent.spawn(worker, 'john' ), gevent.spawn(worker, 'bob' ), ])
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 Worker steve got task 1 Worker john got task 2 Worker bob got task 3 Worker steve got task 4 Worker john got task 5 Worker bob got task 6 Assigned all work in iteration 1 Worker steve got task 7 Worker john got task 8 Worker bob got task 9 Worker steve got task 10 Worker john got task 11 Worker bob got task 12 Worker steve got task 13 Worker john got task 14 Worker bob got task 15 Worker steve got task 16 Worker john got task 17 Worker bob got task 18 Assigned all work in iteration 2 Worker steve got task 19 Quitting time! Quitting time! Quitting time!
Groups and Pools 分组和池 组是运行中的 greenlet 的集合,这些 greenlet 作为组一起管理和调度。它还兼做并行调度程序,借鉴 Python multiprocessing 库。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import geventfrom gevent.pool import Groupdef talk (msg) : for i in range(3 ): print(msg) g1 = gevent.spawn(talk, 'bar' ) g2 = gevent.spawn(talk, 'foo' ) g3 = gevent.spawn(talk, 'fizz' ) group = Group() group.add(g1) group.join()
1 2 3 4 5 6 7 8 9 bar bar bar foo foo foo fizz fizz fizz
这对于管理异步任务组非常有用。
如上所述,Group 还提供了一个 API,用于将作业分派给分组的 greenlet 并以各种方式收集它们的结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 import geventfrom gevent import getcurrentfrom gevent.pool import Group group = Group()def hello_from (n) : print('Size of group %s' % len(group)) print('Hello from Greenlet %s' % id(getcurrent())) group.map(hello_from, range(3 ))def intensive (n) : gevent.sleep(3 - n) return 'task' , n print('Ordered' ) ogroup = Group()for i in ogroup.imap(intensive, range(3 )): print(i) print('Unordered' ) igroup = Group()for i in igroup.imap_unordered(intensive, range(3 )): print(i)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 Size of group 3 Hello from Greenlet 4340152592 Size of group 3 Hello from Greenlet 4340928912 Size of group 3 Hello from Greenlet 4340928592 Ordered ('task' , 0) ('task' , 1) ('task' , 2) Unordered ('task' , 2) ('task' , 1) ('task' , 0)
池是一种结构,用于处理需要限制并发的动态数量的greenlets。在需要并行执行许多网络或IO绑定任务的情况下,这通常是可取的。
1 2 3 4 5 6 7 8 9 import geventfrom gevent.pool import Pool pool = Pool(2 )def hello_from (n) : print('Size of pool %s' % len(pool)) pool.map(hello_from, range(3 ))
1 2 3 Size of pool 2 Size of pool 2 Size of pool 1
通常在构建 gevent 驱动的服务时,会将整个服务围绕一个池结构进行中心处理。一个例子可能是在各种套接字(socket)上轮询的类。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 from gevent.pool import Poolclass SocketPool (object) : def __init__ (self) : self.pool = Pool(1000 ) self.pool.start() def listen (self, socket) : while True : socket.recv() def add_handler (self, socket) : if self.pool.full(): raise Exception("At maximum pool size" ) else : self.pool.spawn(self.listen, socket) def shutdown (self) : self.pool.kill()
locks and semaphores,锁和信号量 信号量是一个允许 greenlet 相互合作,限制并发访问或运行的低层次的同步原语。 信号量有两个方法,acquire 和 release。在信号量是否已经被 acquire 或 release,和拥有资源的数量之间不同,被称为此信号量的范围 (the bound of the semaphore)。如果一个信号量的范围已经降低到0,它会 阻塞 acquire 操作直到另一个已经获得信号量的 greenlet 作出释放。
信号量是一种低级同步原语,它允许 greenlet 协调和限制并发访问或执行。信号量公开两种方法,获取和释放信号量被获取和释放的次数之差称为信号量的界限。如果信号量范围达到0,它就会阻塞,直到另一个 greenlet释放它的捕获。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 from gevent import sleepfrom gevent.pool import Poolfrom gevent.coros import BoundedSemaphore sem = BoundedSemaphore(2 )def worker1 (n) : sem.acquire() print('Worker %i acquired semaphore' % n) sleep(0 ) sem.release() print('Worker %i released semaphore' % n)def worker2 (n) : with sem: print('Worker %i acquired semaphore' % n) sleep(0 ) print('Worker %i released semaphore' % n) pool = Pool() pool.map(worker1, range(0 ,2 )) pool.map(worker2, range(3 ,6 ))
1 2 3 4 5 6 7 8 9 10 Worker 0 acquired semaphore Worker 1 acquired semaphore Worker 0 released semaphore Worker 1 released semaphore Worker 3 acquired semaphore Worker 4 acquired semaphore Worker 3 released semaphore Worker 4 released semaphore Worker 5 acquired semaphore Worker 5 released semaphore
界限为1的信号量称为锁。它提供对一个greenlet的独占执行。它们通常用于确保资源只在程序上下文中使用一次。
Thread Locals 线程局部变量 Gevent 还允许您指定 greenlet 上下文的本地数据。在内部,这是作为全局查找实现的,它寻址一个由 greenlet getcurrent()
的值键控的私有命名空间。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import geventfrom gevent.local import local stash = local() def f1 () : stash.x = 1 print(stash.x)def f2 () : stash.y = 2 print(stash.y) try : stash.x except AttributeError: print("x is not local to f2" ) g1 = gevent.spawn(f1) g2 = gevent.spawn(f2) gevent.joinall([g1, g2])
1 2 3 1 2 x is not local to f2
许多使用 gevent 的 Web 框架将 HTTP 会话对象存储在 gevent 线程局部变量中。例如,使用 Werkzeug 实用程序库及其代理对象,我们可以创建 Flask 样式的请求对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 from gevent.local import localfrom werkzeug.local import LocalProxyfrom werkzeug.wrappers import Requestfrom contextlib import contextmanagerfrom gevent.pywsgi import WSGIServer _requests = local() request = LocalProxy(lambda : _requests.request) @contextmanager def sessionmanager (environ) : _requests.request = Request(environ) yield _requests.request = None def logic () : return f"Hello {request.remote_addr} " .encode('utf-8' ) def application_1 (environ, start_response) : status = '200 OK' headers = [('Content-type' , 'text/plain' )] start_response(status, headers) return [b"Hello, World!" ] def application_2 (environ, start_response) : status = '200 OK' with sessionmanager(environ): body = logic() headers = [ ('Content-Type' , 'text/html' ) ] start_response(status, headers) return [body] http_server = WSGIServer(('0.0.0.0' , 8000 ), application_2, log=None ) http_server.serve_forever()
Flask 系统比这个例子复杂一点,然而使用线程局部变量作为局部的会话存储,这个思想是相同的。
Subprocess 子进程 自 gevent 1.0 起,gevent.subprocess
添加了 Python subprocess
模块的修补版本。它支持对子进程进行协作等待。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import geventfrom gevent.subprocess import Popen, PIPE def cron () : while True : print("cron" ) gevent.sleep(0.2 ) g = gevent.spawn(cron) sub = Popen(['sleep 1; uname' ], stdout=PIPE, shell=True ) out, err = sub.communicate() g.kill() print(out.rstrip())
1 2 3 4 5 6 cron cron cron cron cron Linux
很多人也想将 gevent 和 multiprocessing 一起使用。最明显的挑战之一就是 multiprocessing 提供的进程间通信默认不是协作式的。由于基于 multiprocessing.Connection 的对象(例如Pipe)暴露了它们下面的 文件描述符(file descriptor),gevent.socket.wait_read
和 wait_write
可以用来在直接读写之前协作式的等待 ready-to-read/ready-to-write 事件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 import geventfrom multiprocessing import Process, Pipefrom gevent.socket import wait_read, wait_write a, b = Pipe() c, d = Pipe() def relay () : for i in range(10 ): msg = b.recv() c.send(msg + " in " + str(i)) def put_msg () : for i in range(10 ): wait_write(a.fileno()) a.send('hi' ) def get_msg () : for i in range(10 ): wait_read(d.fileno()) print(d.recv()) if __name__ == '__main__' : proc = Process(target=relay) proc.start() g1 = gevent.spawn(get_msg) g2 = gevent.spawn(put_msg) gevent.joinall([g1, g2], timeout=1 )
然而要注意,组合 multiprocessing
和 gevent 必定带来 依赖于操作系统(os-dependent)的缺陷,其中有:
在兼容POSIX的系统创建子进程(forking) 之后, 在子进程的gevent的状态是不适定的(ill-posed)。一个副作用就是, multiprocessing.Process
创建之前的greenlet创建动作,会在父进程和子进程两 方都运行。
上例的put_msg()
中的a.send()
可能依然非协作式地阻塞调用的线程:一个 ready-to-write事件只保证写了一个byte。在尝试写完成之前底下的buffer可能是满的。
上面表示的基于wait_write()
/wait_read()
的方法在Windows上不工作 (IOError: 3 is not a socket (files are not supported)
),因为Windows不能监视 pipe事件。
Python包gipc 以大体上透明的方式在 兼容POSIX系统和Windows上克服了这些挑战。它提供了gevent感知的基于multiprocessing.Process
的子进程和gevent基于pipe的协作式进程间通信。
Actors 并发模型 actor 模型是一个由于 Erlang 变得普及的更高层的并发模型。简单的说它的主要思想就是许多个独立的 Actor,每个 Actor 有一个可以从 其它 Actor 接收消息的收件箱。Actor 内部的主循环遍历它收到的消息,并根据它期望的行为来采取行动。
Gevent 没有原生的 Actor 类型,但在一个子类化的 Greenlet 内使用队列, 我们可以定义一个非常简单的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 import geventfrom gevent.queue import Queue class Actor (gevent.Greenlet) : def __init__ (self) : super(Actor, self).__init__() self.inbox = Queue() def receive (self, message) : """ Define in your subclass. """ raise NotImplemented () def _run (self) : self.running = True while self.running: message = self.inbox.get() self.receive(message)
下面是一个使用的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 import geventfrom gevent.queue import Queue class Actor (gevent.Greenlet) : def __init__ (self) : super(Actor, self).__init__() self.inbox = Queue() def receive (self, message) : """ Define in your subclass. """ raise NotImplemented () def _run (self) : self.running = True while self.running: message = self.inbox.get() self.receive(message) class Pinger (Actor) : def receive (self, message) : print(message) pong.inbox.put('ping' ) gevent.sleep(0 ) class Ponger (Actor) : def receive (self, message) : print(message) ping.inbox.put('pong' ) gevent.sleep(0 ) ping = Pinger() pong = Ponger() ping.start() pong.start() ping.inbox.put('start' ) gevent.joinall([ping, pong])
实际应用 Gevent ZeroMQ ZeroMQ 被它的作者描述为 “一个表现得像一个并发框架的socket库”。它是一个非常强大的,为构建并发和分布式应用的消息传递层。ZeroMQ 提供了各种各样的 socket 原语。最简单的是请求-应答socket对 (Request-Response socket pair)。一个socket有两个方法send
和recv
, 两者一般都是阻塞操作。但是Travis Cline 的一个杰出的库弥补了这一点,这个库使用 gevent.socket
来以非阻塞的方式 轮询 ZereMQ socket。安装 gevent-zeremq 命令:pip install gevent-zeromq
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 import geventimport zmq.green as zmq context = zmq.Context() def server () : server_socket = context.socket(zmq.REQ) server_socket.bind("tcp://127.0.0.1:5000" ) for request in range(1 , 10 ): server_socket.send("Hello" ) print('Switched to Server for %s' % request) server_socket.recv() def client () : client_socket = context.socket(zmq.REP) client_socket.connect("tcp://127.0.0.1:5000" ) for request in range(1 , 10 ): client_socket.recv() print('Switched to Client for %s' % request) client_socket.send("World" ) publisher = gevent.spawn(server) client = gevent.spawn(client) gevent.joinall([publisher, client])
执行结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 Switched to Server for 1 Switched to Client for 1 Switched to Server for 2 Switched to Client for 2 Switched to Server for 3 Switched to Client for 3 Switched to Server for 4 Switched to Client for 4 Switched to Server for 5 Switched to Client for 5 Switched to Server for 6 Switched to Client for 6 Switched to Server for 7 Switched to Client for 7 Switched to Server for 8 Switched to Client for 8 Switched to Server for 9 Switched to Client for 9
简单 server 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from gevent.server import StreamServer def handle (socket, address) : socket.send("Hello from a telnet!\n" ) for i in range(5 ): socket.send(str(i) + '\n' ) socket.close() server = StreamServer(('127.0.0.1' , 5000 ), handle) server.serve_forever()
WSGI 服务器 ( 多进程 ) Gevent 为 HTTP 内容服务提供了两种 WSGI server:
windows 运行报错,linux 可以正常运行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 from gevent import monkeyfrom gevent.pywsgi import WSGIServer monkey.patch_all() import datetimeimport osfrom multiprocessing import cpu_count, Processfrom flask import Flask, jsonify app = Flask(__name__) @app.route("/cppla", methods=['GET']) def function_benchmark () : return jsonify( { "status" : "ok" , "time" : datetime.datetime.now().strftime('%Y-%m-%d %H:%M' ), "pid" : os.getpid() } ), 200 def run (multi_process=None) : if not multi_process: WSGIServer(('0.0.0.0' , 8080 ), app).serve_forever() else : multi_server = WSGIServer(('0.0.0.0' , 8080 ), app) multi_server.start() def server_forever () : multi_server.start_accepting() multi_server._stop_event.wait() for i in range(cpu_count()): p = Process(target=server_forever) p.start() if __name__ == "__main__" : run(False )
Streaming 服务器 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 from gevent.pywsgi import WSGIServer def application (environ, start_response) : status = '200 OK' body = b'<p>Hello World</p>' headers = [ ('Content-Type' , 'text/html' ) ] start_response(status, headers) return [body] WSGIServer(('' , 8000 ), application).serve_forever()
然而使用 pywsgi 我们可以将 handler 写成 generator,并以块的形式 yield 出结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 from gevent.pywsgi import WSGIServer def application (environ, start_response) : status = '200 OK' headers = [ ('Content-Type' , 'text/html' ) ] start_response(status, headers) yield "<p>Hello" yield "World</p>" WSGIServer(('' , 8000 ), application).serve_forever()
Long Polling 长轮询 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 import geventfrom gevent.queue import Queue, Emptyfrom gevent.pywsgi import WSGIServerimport simplejson as json data_source = Queue() def producer () : while True : data_source.put_nowait('Hello World' ) gevent.sleep(1 ) def ajax_endpoint (environ, start_response) : status = '200 OK' headers = [ ('Content-Type' , 'application/json' ) ] start_response(status, headers) while True : try : datum = data_source.get(timeout=5 ) yield json.dumps(datum) + '\n' except Empty: pass gevent.spawn(producer) WSGIServer(('' , 8000 ), ajax_endpoint).serve_forever()
Websockets 网络套接字 运行Websocket的例子需要gevent-websocket 包。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 import jsonimport random from gevent import pywsgi, sleepfrom geventwebsocket.handler import WebSocketHandler class WebSocketApp (object) : '''Send random data to the websocket''' def __call__ (self, environ, start_response) : ws = environ['wsgi.websocket' ] x = 0 while True : data = json.dumps({'x' : x, 'y' : random.randint(1 , 5 )}) ws.send(data) x += 1 sleep(0.5 ) server = pywsgi.WSGIServer( ("" , 10000 ), WebSocketApp(), handler_class=WebSocketHandler ) server.serve_forever()
HTML Page:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 <html> <head> <title>Minimal websocket application</title> <script type="text/javascript" src="jquery.min.js"></script> <script type="text/javascript"> $(function() { // Open up a connection to our server var ws = new WebSocket("ws://localhost:10000/"); // What do we do when we get a message? ws.onmessage = function(evt) { $("#placeholder").append('<p>' + evt.data + '</p>') } // Just update our conn_status field with the connection status ws.onopen = function(evt) { $('#conn_status').html('<b>Connected</b>'); } ws.onerror = function(evt) { $('#conn_status').html('<b>Error</b>'); } ws.onclose = function(evt) { $('#conn_status').html('<b>Closed</b>'); } }); </script> </head> <body> <h1>WebSocket Example</h1> <div id="conn_status">Not Connected</div> <div id="placeholder" style="width:600px;height:300px;"></div> </body> </html>
Chat Server 聊天服务器 实现一个实时聊天室, 运行这个例子需要 Flask (你可以使用Django, Pyramid等,但不是必须的)。 对应的Javascript和HTML文件可以在 这里 找到。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 from flask import Flask, render_template, requestfrom gevent import queuefrom gevent.pywsgi import WSGIServer import simplejson as json app = Flask(__name__) app.debug = True class Room (object) : def __init__ (self) : self.users = set() self.messages = [] def backlog (self, size=25 ) : return self.messages[-size:] def subscribe (self, user) : self.users.add(user) def add (self, message) : for user in self.users: print(user) user.queue.put_nowait(message) self.messages.append(message) class User (object) : def __init__ (self) : self.queue = queue.Queue() rooms = { 'topic1' : Room(), 'topic2' : Room(), } users = {} @app.route('/') def choose_name () : return render_template('choose.html' ) @app.route('/<uid>') def main (uid) : return render_template( 'main.html' , uid=uid, rooms=rooms.keys() ) @app.route('/<room>/<uid>') def join (room, uid) : user = users.get(uid, None ) if not user: users[uid] = user = User() active_room = rooms[room] active_room.subscribe(user) print('subscribe %s %s' % (active_room, user)) messages = active_room.backlog() return render_template( 'room.html' , room=room, uid=uid, messages=messages ) @app.route("/put/<room>/<uid>", methods=["POST"]) def put (room, uid) : user = users[uid] room = rooms[room] message = request.form['message' ] room.add(':' .join([uid, message])) return '' @app.route("/poll/<uid>", methods=["POST"]) def poll (uid) : try : msg = users[uid].queue.get(timeout=10 ) except queue.Empty: msg = [] return json.dumps(msg) if __name__ == "__main__" : http = WSGIServer(('' , 5000 ), app) http.serve_forever()