41-Gunicorn 源码解析

总体结构

Gunicorn 使用 pre-fork worker model

  • worker model:意味着这个模型有一个 master 进程,来管理一组 worker进程;
  • fork:意味着 worker 进程是由 master 进程 fork(复刻)出来的;
  • pre-:意味着在任何客户端请求到来之前,就已从 master 进程 fork 出了多个 worker 进程,坐等请求到来;

在处理请求之前,master 会预先 fork 出多个 worker 进程。每个 worker 进程都是独立的,可以在不同的 CPU 核心上运行。

  • Master 不处理 HTTP 请求,只用来管理这些子进程。比如对 worker 进程的创建、销毁、以及根据负载情况增减、升级、重载配置、kill 进程避免 OOM 等。
  • worker 进程的事件循环就是监听网络事件并处理(如新建连接,断开连接,处理请求发送响应等等)。worker 继承了主进程的 listening fd,从 accept、parse http protocol、response 都是在一个 gevent 协程里面的,也就说在协程池的数目允许下,每个连接就是一个 gevent 协程。

执行流程:在 worker 进程创建时,就被实例化了 Python web app;并由 worker 进程监听端口、处理请求。当请求到来时,worker 进程就能解析 HTTP 请求、调用 Python web app 处理、得到处理结果后,再整理成 HTTP Response,通过 TCP 返回给客户端。

启动时设置的 workers 参数只是 worker 数,而 Gunicorn 还会创建个 master 进程。所以,即使配置 workers 为 1,你的 app 也至少有两个进程。

这种模型的优点在于:

  • 隔离性好:每个 worker 进程都是独立的,如果一个进程崩溃,不会影响其他进程。
  • 扩展性强:可以根据需要增加或减少worker进程的数量。
  • 性能优越:可以充分利用多核CPU的并行处理能力。

(以下代码中为了方面理解,均去除了一些干扰代码)。

启动之后,manager 维护数量固定的 worker:

1
2
3
4
5
6
7
class Arbiter:
def manage_workers(self):
if len(self.WORKERS.keys()) < self.num_workers:
self.spawn_workers()
while len(workers) > self.num_workers:
(pid, _) = workers.pop(0)
self.kill_worker(pid, signal.SIGQUIT)

master 通过 pre-fork 的方式创建多个 worker:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class Arbiter:
def spawn_worker(self):
self.worker_age += 1
# 创建 worker。请注意这里的 app 对象并不是真正的 wsgi app 对象,而是 Gunicorn 的 app 对象。
# Gunicorn 的 app 对象负责 import 我们自己写的 wsgi app 对象。
worker = self.worker_class(self.worker_age, self.pid, self.LISTENERS,
self.app, self.timeout / 2.0,
self.cfg, self.log)
pid = os.fork()
if pid != 0: #父进程,返回后继续创建其他worker,没worker后进入到自己的消息循环
self.WORKERS[pid] = worker
return pid

# Process Child
worker_pid = os.getpid()
try:
# ..........
worker.init_process() # 子进程,初始化woker,进入worker的消息循环,
sys.exit(0)
except SystemExit:
raise

worker.init_process() 函数中,worker 中 Gunicorn 的 app 对象会去 import 我们的 wsgi app。也就是说,每个 woker 子进程都会单独去实例化我们的 wsgi app 对象。每个 worker 中的 swgi app 对象是相互独立、互不干扰的。

创建完所有的 worker 后,worker 和 master 各自进入自己的消息循环。

worker

woker 有很多种,包括:ggevent、geventlet、gtornado、gthread 等等。这里主要分析 ggevent。

每个 ggevent worker 启动的时候会启动多个 server 对象:worker 首先为每个 listener 创建一个 server 对象(注为什么是一组 listener,因为 Gunicorn 可以绑定一组地址,每个地址对应一个 listener),每个 server 对象都有运行在一个单独的 gevent pool 对象中。真正等待链接和处理链接的操作是在 server 对象中进行的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class GeventWorker(AsyncWorker):
def run(self):
# 为每个 listener 创建 server 对象
for s in self.sockets:
pool = Pool(self.worker_connections) # 创建gevent pool
if self.server_class is not None:
# 创建server对象
server = self.server_class(
s, # s 是 server 用来监听链接的套接字
application=self.wsgi, # application 即是我们的 wsgi app(通俗点讲就是你用 flask 或者 django 写成的app),我们的 app 就是通过这种方式交给 Gunicorn 的 woker 去跑的
spawn=pool, # spawn 是 gevent 的协程池
log=self.log,
handler_class=self.wsgi_handler, # handler_class 是 gevent 的 pywsgi.WSGIHandler 子类
**ssl_args
)
# .............
server.start() # 启动 server,开始等待链接,服务链接
servers.append(server)
# .........

# 当所有 server 对象创建完毕后,worker 需要定时通知 manager,否则会被认为是挂掉了。
# 这个地方的 notify 机制设计的比较有趣,每个 worker 有个与之对应的 tmp file,每次 notify 的时候去操作一下这个tmp file(比如通过os.fchmod),这个tmp file的last update的时间戳就会更新。而manager则通过检查每个worker对应的temp file的last update的时间戳,来判断这个进程是否是挂掉的。
while self.alive:
self.notify()
gevent.sleep(1.0)

上面代码中的 server_class 实际上是一个 gevent 的 WSGI SERVER 的子类:

1
2
3
4
5
6
7
class PyWSGIServer(pywsgi.WSGIServer):

def __init__(self, listener, application=None, backlog=None, spawn='default',
log='default', error_log='default',
handler_class=None,
environ=None, **ssl_args):
pass

WSGI SERVER

真正等待链接和处理链接的操作是在 gevent 的 WSGIServer 和 WSGIHandler 中进行的。

最后再来看一下 gevent 的 WSGIServer 和 WSGIHandler 的主要实现:

WSGIServer 的 start 函数里面调用 start_accepting 来处理到来的链接。在 start_accepting 里面得到接收到的套接字后调用 do_handle 来处理套接字:

1
2
3
def do_handle(self, *args):
spawn = self._spawn
spawn(self._handle, *args)

可以看出,WSGIServer 实际上是创建一个协程去处理该套接字,也就是说在 WSGIServer 中,一个协程单独负责一个 HTTP 链接。协程中运行的 self._handle 函数实际上是调用了 WSGIHandler 的 handle 函数来不断处理 HTTP 请求:

1
2
3
4
5
6
7
8
9
10
def handle(self):
try:
while self.socket is not None:
result = self.handle_one_request() # 处理HTTP请求
if result is None:
break
if result is True:
continue
self.status, response_body = result
self.socket.sendall(response_body) # 发送回应报文

在 handle 函数的循环内部,handle_one_request 函数首先读取 HTTP 请求,初始化 WSGI 环境,然后最终调用 run_application 函数来处理请求:

1
2
3
def run_application(self):
self.result = self.application(self.environ, self.start_response)
self.process_result()

在这个地方才真正的调用了我们的 app。

总结:Gunicorn 会启动一组 worker 进程,所有 worker 进程公用一组 listener,在每个 worker 中为每个 listener 建立一个 wsgi server。每当有 HTTP 链接到来时,wsgi server 创建一个协程来处理该链接,协程处理该链接的时候,先初始化 WSGI 环境,然后调用用户提供的 app 对象去处理 HTTP 请求。

accept 惊群

  • 在内核 2.6 就没有 accept 惊群这个问题了,

  • 但是当我们多个进程各自把 listen fd 放到 epoll 监听池里面时,其实会造成事件的唤醒,虽然最终只会被一次 accept,但也会有惊群的现象产生。

    • nginx 是通过多个进程轮流持锁的方式来避免 epoll accept 唤醒问题。

Reference


41-Gunicorn 源码解析
https://flepeng.github.io/021-Python-34-框架-Gunicorn-41-Gunicorn-源码解析/
作者
Lepeng
发布于
2024年9月20日
许可协议