Flask 源码解析:多进程多线程

app.run() 中可以接受两个参数,分别是 threadedprocesses,用于开启线程支持和进程支持。先从 app.run() 这个方法看起

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def run(self, host=None, port=None, debug=None, **options):
from werkzeug.serving import run_simple
if host is None:
host = '127.0.0.1'
if port is None:
server_name = self.config['SERVER_NAME']
if server_name and ':' in server_name:
port = int(server_name.rsplit(':', 1)[1])
else:
port = 5000
if debug is not None:
self.debug = bool(debug)
options.setdefault('use_reloader', self.debug)
options.setdefault('use_debugger', self.debug)
options.setdefault("threaded", True) # 新版本新增
try:
run_simple(host, port, self, **options) # 会进入这个函数,这个函数 from werkzeug.serving import run_simple
finally:
self._got_first_request = False

经过判断和设置后进入 run_simple() 这个函数,看下源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def run_simple(
hostname, # 应用程序的主机
port, # 端口
application, # WSGI 应用程序
use_reloader=False, # 如果程序代码修改,是否需要自动启动服务
use_debugger=False, # 程序是否要使用工具和调试系统
use_evalex=True, # 应用是否开启异常评估
extra_files=None, # 重载器应该查看的文件列表附加到模块。例如配置文件夹
reloader_interval=1, # 秒重载器的间隔
reloader_type="auto", # 重载器的类型
threaded=False, # 进程是否处理单线程的每次请求
processes=1, # 如果大于1,则在新进程中处理每个请求。达到这个最大并发进程数
request_handler=None, # 可以自定义替换BaseHTTPRequestHandler
static_files=None, # 静态文件路径的列表或DICT
passthrough_errors=False,# 将此设置为“真”以禁用错误捕获。这意味着服务器会因错误而死亡
ssl_context=None, # 如何进行传输数据加密,可以设置的环境
):
"省略部分代码"
def inner():
try:
fd = int(os.environ["WERKZEUG_SERVER_FD"])
except (LookupError, ValueError):
fd = None
srv = make_server(
hostname,
port,
application,
threaded,
processes,
request_handler,
passthrough_errors,
ssl_context,
fd=fd,
)
if fd is None:
log_startup(srv.socket)
srv.serve_forever()

if use_reloader:
"省略部分代码"
else:
inner()

还是经过一系列判断后默认会进入 inner() 函数,这个函数定义在 run_simple() 内,inner() 中会执行 make_server() 这个函数,看下源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def make_server(host=None, port=None, app=None, threaded=False, processes=1,
request_handler=None, passthrough_errors=False,
ssl_context=None, fd=None):
"""Create a new server instance that is either threaded, or forks
or just processes one request after another.
"""
if threaded and processes > 1:
raise ValueError("cannot have a multithreaded and multi process server.")
elif threaded:
return ThreadedWSGIServer(host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd)
elif processes > 1:
return ForkingWSGIServer(host, port, app, processes, request_handler, passthrough_errors, ssl_context, fd=fd)
else:
return BaseWSGIServer(host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd)

make_server 中可知,Flask 提供了三种 server:

  • ThreadedWSGIServer,多线程
  • ForkingWSGIServer,多进程
  • BaseWSGIServer,单进程单线程。默认情况下是 BaseWSGIServer

看到这也很明白了,想要配置多线程或者多进程,则需要设置 threaded 或 processes 这两个参数,而这两个参数是从 app.run()中传递过来的:

1
app.run(**options) ---> run_simple(threaded,processes) ---> make_server(threaded,processes)

默认情况下 Flask 是单线程,单进程的,想要开启只需要在 run 中传入对应的参数: app.run(threaded=True) 即可

多线程

先看多线程,看下 ThreadedWSGIServer 这个类,继承自 ThreadingMixIn, BaseWSGIServer

1
2
3
4
5
6
import socketserver

class ThreadedWSGIServer(socketserver.ThreadingMixIn, BaseWSGIServer):
"""A WSGI server that does threading."""
multithread = True
daemon_threads = True

看下 socketserver.ThreadingMixIn

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import threading

class ThreadingMixIn:
"""Mix-in class to handle each request in a new thread."""

# Decides how threads will act upon termination of the
# main process
daemon_threads = False

def process_request_thread(self, request, client_address):
"""Same as in BaseServer but as a thread.

In addition, exception handling is done here.

"""
try:
self.finish_request(request, client_address)
self.shutdown_request(request)
except:
self.handle_error(request, client_address)
self.shutdown_request(request)

################################### 看这里,重写了 BaseServer 中的 process_request() 方法
def process_request(self, request, client_address):
"""Start a new thread to process the request."""
t = threading.Thread(target = self.process_request_thread,
args = (request, client_address))
t.daemon = self.daemon_threads
t.start()

process_request 就是对每个请求产生一个新的线程来处理

验证

最后写一个非常简单的应用来验证以上说法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from flask import Flask
from flask import _request_ctx_stack

app = Flask(__name__)

@app.route('/')
def index():
print(_request_ctx_stack._local.__ident_func__()) # _request_ctx_stack._local.__ident_func__() 对应这 get_ident()这个函数返回当前线程id
while True:
pass
return '<h1>hello</h1>'


app.run()
# 如果需要开启多线程则 app.run(threaded=True)

这就会产生两种情况:

  1. 没开启多线程的情况下,一次请求过来,服务器直接阻塞,并且之后的其他请求也都阻塞
  2. 开启多线程情况下,每次都会打印出不同的线程 id

结果

第一种情况

1
2
Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
139623180527360

第二种情况

1
2
3
4
Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
140315469436672
140315477829376
140315486222080

结果显而易见,flask 支持多线程,每个请求使用一个线程进行处理。但默认没开启,其次 app.run() 只适用于开发环境,生产环境下可以使用 uWSGI,Gunicorn 等 web 服务器

多进程

1
2
class ForkingWSGIServer(ForkingMixIn, BaseWSGIServer):
pass

看下 ForkingMixIn

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
class ForkingMixIn:
"""Mix-in class to handle each request in a new process."""

timeout = 300
active_children = None
max_children = 40
# If true, server_close() waits until all child processes complete.
block_on_close = True

def collect_children(self, *, blocking=False):
"""Internal routine to wait for children that have exited."""
if self.active_children is None:
return

# If we're above the max number of children, wait and reap them until
# we go back below threshold. Note that we use waitpid(-1) below to be
# able to collect children in size(<defunct children>) syscalls instead
# of size(<children>): the downside is that this might reap children
# which we didn't spawn, which is why we only resort to this when we're
# above max_children.
while len(self.active_children) >= self.max_children:
try:
pid, _ = os.waitpid(-1, 0)
self.active_children.discard(pid)
except ChildProcessError:
# we don't have any children, we're done
self.active_children.clear()
except OSError:
break

# Now reap all defunct children.
for pid in self.active_children.copy():
try:
flags = 0 if blocking else os.WNOHANG
pid, _ = os.waitpid(pid, flags)
# if the child hasn't exited yet, pid will be 0 and ignored by
# discard() below
self.active_children.discard(pid)
except ChildProcessError:
# someone else reaped it
self.active_children.discard(pid)
except OSError:
pass

def handle_timeout(self):
"""Wait for zombies after self.timeout seconds of inactivity.

May be extended, do not override.
"""
self.collect_children()

def service_actions(self):
"""Collect the zombie child processes regularly in the ForkingMixIn.

service_actions is called in the BaseServer's serve_forever loop.
"""
self.collect_children()

################################### 看这里,重写了 BaseServer 中的 process_request() 方法
def process_request(self, request, client_address):
"""Fork a new subprocess to process the request."""
pid = os.fork() # 创建一个进程,下面的代码会同时在父进程和子进程中执行。
if pid:
# Parent process
if self.active_children is None:
self.active_children = set()
self.active_children.add(pid)
self.close_request(request)
return
else:
# Child process.
# This must never return, hence os._exit()!
status = 1
try:
self.finish_request(request, client_address)
status = 0
except Exception:
self.handle_error(request, client_address)
finally:
try:
self.shutdown_request(request)
finally:
os._exit(status)

def server_close(self):
super().server_close()
self.collect_children(blocking=self.block_on_close)

Reference


Flask 源码解析:多进程多线程
https://flepeng.github.io/021-Python-32-框架-Flask-Flask-源码解析:多进程多线程/
作者
Lepeng
发布于
2024年9月25日
许可协议