def make_server(
    host=None,
    port=None,
    app=None,
    threaded=False,
    processes=1,
    request_handler=None,
    passthrough_errors=False,
    ssl_context=None,
    fd=None,
):
    """Create a new server instance that is either threaded, or forks
    or just processes one request after another.

    The concurrency model is selected from ``threaded`` and ``processes``:

    * ``threaded`` true and ``processes > 1`` -> rejected with ``ValueError``;
    * ``threaded`` true -> ``ThreadedWSGIServer``;
    * ``processes > 1`` -> ``ForkingWSGIServer`` (receives ``processes``);
    * otherwise -> ``BaseWSGIServer``.

    ``host``, ``port``, ``app``, ``request_handler``, ``passthrough_errors``,
    ``ssl_context`` and ``fd`` are not interpreted here; they are forwarded
    unchanged to the selected server class.

    :raises ValueError: if both threading and multiple processes are
        requested at the same time.
    """
    if threaded and processes > 1:
        raise ValueError("cannot have a multithreaded and multi process server.")
    elif threaded:
        return ThreadedWSGIServer(
            host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd
        )
    elif processes > 1:
        return ForkingWSGIServer(
            host,
            port,
            app,
            processes,
            request_handler,
            passthrough_errors,
            ssl_context,
            fd=fd,
        )
    else:
        return BaseWSGIServer(
            host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd
        )
class ThreadedWSGIServer(socketserver.ThreadingMixIn, BaseWSGIServer):
    """A WSGI server that does threading."""

    # Marks this server as multithreaded.
    # NOTE(review): the consumer of this flag is not visible here --
    # presumably the request handler / WSGI environ; confirm.
    multithread = True
    # Overrides the threading mix-in's ``daemon_threads`` attribute so that
    # request threads are created as daemon threads.
    daemon_threads = True
class ThreadingMixIn:
    """Mix-in class to handle each request in a new thread."""

    # Decides how threads will act upon termination of the
    # main process
    daemon_threads = False

    def process_request_thread(self, request, client_address):
        """Same as in BaseServer but as a thread.

        In addition, exception handling is done here.
        """
        try:
            self.finish_request(request, client_address)
        except Exception:
            # Only real errors are reported; SystemExit/KeyboardInterrupt
            # are not swallowed the way a bare ``except:`` would.
            self.handle_error(request, client_address)
        finally:
            # Runs exactly once on both the success and the failure path.
            self.shutdown_request(request)

    # Look here: this overrides BaseServer.process_request().
    def process_request(self, request, client_address):
        """Start a new thread to process the request."""
        t = threading.Thread(
            target=self.process_request_thread,
            args=(request, client_address),
        )
        # The thread's daemon flag follows the class-level setting.
        t.daemon = self.daemon_threads
        t.start()
`process_request` 的作用就是为每个请求创建一个新的线程来处理。
验证
最后写一个非常简单的应用来验证以上说法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
from flask import Flask from flask import _request_ctx_stack
class ForkingMixIn:
    """Mix-in class to handle each request in a new process."""

    timeout = 300
    # Set of child PIDs spawned by process_request(); stays None until the
    # first fork.
    active_children = None
    max_children = 40
    # If true, server_close() waits until all child processes complete.
    block_on_close = True

    def collect_children(self, *, blocking=False):
        """Internal routine to wait for children that have exited.

        With ``blocking`` false (the default) children that are still
        running are left alone; with ``blocking`` true each remaining
        child is waited for.
        """
        if self.active_children is None:
            return

        # If we're above the max number of children, wait and reap them until
        # we go back below threshold. Note that we use waitpid(-1) below to be
        # able to collect children in size(<defunct children>) syscalls instead
        # of size(<children>): the downside is that this might reap children
        # which we didn't spawn, which is why we only resort to this when we're
        # above max_children.
        while len(self.active_children) >= self.max_children:
            try:
                pid, _ = os.waitpid(-1, 0)
                self.active_children.discard(pid)
            except ChildProcessError:
                # we don't have any children, we're done
                self.active_children.clear()
            except OSError:
                break

        # Now reap all defunct children.
        flags = 0 if blocking else os.WNOHANG
        for child_pid in self.active_children.copy():
            try:
                reaped_pid, _ = os.waitpid(child_pid, flags)
                # if the child hasn't exited yet, reaped_pid will be 0 and
                # ignored by discard() below
                self.active_children.discard(reaped_pid)
            except ChildProcessError:
                # someone else reaped it
                self.active_children.discard(child_pid)
            except OSError:
                # Best effort: keep the pid and retry on the next call.
                pass

    def handle_timeout(self):
        """Wait for zombies after self.timeout seconds of inactivity.

        May be extended, do not override.
        """
        self.collect_children()

    def service_actions(self):
        """Collect the zombie child processes regularly in the ForkingMixIn.

        service_actions is called in the BaseServer's serve_forever loop.
        """
        self.collect_children()

    # Look here: this overrides BaseServer.process_request().
    def process_request(self, request, client_address):
        """Fork a new subprocess to process the request."""
        # Create a process; the code below runs in both the parent and
        # the child.
        pid = os.fork()
        if pid:
            # Parent process
            if self.active_children is None:
                self.active_children = set()
            self.active_children.add(pid)
            self.close_request(request)
            return
        else:
            # Child process.
            # This must never return, hence os._exit()!
            status = 1
            try:
                self.finish_request(request, client_address)
                status = 0
            except Exception:
                self.handle_error(request, client_address)
            finally:
                try:
                    self.shutdown_request(request)
                finally:
                    os._exit(status)