Python 标准库之 select - IO 多路复用

https://docs.python.org/zh-cn/3.8/library/select.html

注解:selectors 模块是在 select 模块原型的基础上进行高级且高效的 I/O 复用。推荐用户改用 selectors 模块,除非用户希望对 OS 级的函数原型进行精确控制。

0、背景

对于初学 Socket 的人来说都不太爱用 Select 写程序,大部分人习惯写诸如connect、accept、recv 或 recvfrom 这样的阻塞程序(所谓阻塞方式block,顾名思义,就是进程或是线程执行到这些函数时必须等待某个事件的发生,如果事件没有发生,进程或线程就被阻塞,函数不能立即返回)。

可以使用 Select 完成非阻塞(所谓非阻塞方式non-block,就是进程或线程执行此函数时不必非要等待事件的发生,一旦执行肯定返回,以返回值的不同来反映函数的执行情况,如果事件发生则与阻塞方式相同,若事件没有发生,则返回一个代码来告知事件未发生,而进程或线程继续执行,所以效率较高)方式工作的程序,它能够监视我们需要监视的文件描述符的变化情况——读写或是异常。

1、select 模块

  • select 模块提供了对 select()poll() 函数的访问,这两个函数在大多数操作系统中是可用的。
    • 在 Linux2.5+ 上可用 epoll()
    • 在 Solaris 及其衍生版本上可用 devpoll()
    • 在大多数 BSD 上可用 kqueue()
    • 在 Windows 上,本模块仅适用于套接字;在其他操作系统上,本模块也适用于其他文件类型(特别地,在Unix上也适用于管道)。
  • select 模块不能用于常规文件,不能检测出(自上次读取文件后)文件是否有新数据写入。

select模块内容:

select模块的函数 描述
select.devpoll() 返回一个 /dev/po11 轮询对象(仅支持 Solaris 及其衍生版本)
select.epol1() 返回一个 edge poll对象(仅支持 Linux 2.5.44 或更高版本)
select.poll() 返回一个 po11 对象(部分操作系统不支持)
select.kqueue() 返回一个内核队列对象(仅支持 FreeBSD)
select.kevent() 返回一个内核事件对象(仅支持 FreeBSD)
select.select() 返回值是三个列表,可以监视读、写、异常事件。当这些事件中的任何一个发生时,select函数会返回。Unixselect()系统调用接口
select.PIPE BUF() 与管道相关

在使用 select 模块时,需要注意以下几点:

  • 最好使用非阻塞的 socket,以避免程序在等待 socket 数据时被阻塞,从而可以处理其它 socket 数据。
  • select 函数可能会有一些性能问题,当需要同时监听大量的文件描述符时,可能会导致 CPU 占用过高,所以使用时需要注意调优。

2、select.select()

1、select.select() 函数说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
select.select(rlist, wlist, xlist[, timeout])

作用:这是一个 Unix select() 系统调用接口。

参数:
* rlist:要监听的可读文件对象
* wlist:要监听的可写文件对象
* xlist:要监听的产生异常的文件
* 前三个参数可以是空的可迭代对象,但是否允许三个都是空的则取决于具体平台。(已知在Unix上可行但在Windows上不可行。)
* timeout:超时设置,可选参数以,一个浮点数表示超时秒数。
* timeout 省略时,该函数将被阻塞,直到至少有一个文件描述符就绪。
* timeout 为 0 时,表示执行轮询并且永远不会阻塞。
* timeout 为 n(正整数)时,那么如果监听的句柄均无任何变化,则 select 会阻塞 n 秒,之后返回三个空列表,如果监听的句柄有变化,则直接执行。

返回值:三个列表,包含已就绪对象,返回的三个列表是前三个参数的子集(分别是可读的、可写的、异常的)。当超时时间已到且没有文件描述符就绪时,返回三个空列表。
* 当参数1 序列中的 fd 满足“可读”条件时,则获取发生变化的 fd 并添加到 fd_rlist 中
* 当参数2 序列中的 fd 满足“可写”条件时,则将该序列中的 fd 添加到 fd_wlist中
* 当参数3 序列中的 fd 发生错误时,则将该发生错误的 fd 添加到 fd_elist 中

注意:Windows上不接受文件对象,但接受套接字。在Windows上,底层的select()函数由WinSock库提供,且不处理不是源自WinSock的文件描述符。

当我们调用 select() 时:

  1. 上下文切换转换为内核态
  2. 将fd从用户空间复制到内核空间
  3. 内核遍历所有fd,查看其对应事件是否发生
  4. 如果没发生,将进程阻塞,当设备驱动产生中断或者timeout时间后,将进程唤醒,再次进行遍历
  5. 返回遍历后的fd
  6. 将fd从内核空间复制到用户空间

2、select.select() 示例

示例:服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
from socket import *
import select

server = socket(AF_INET, SOCK_STREAM) # 生成socket对象
server.bind(('127.0.0.1', 10000)) # 绑定地址,设置监听
server.listen(5)
inputs = [server] # 将自己也放进待监测列表里

while True:
rlist, wlist, elist = select.select(inputs, [], []) # 使用 select 监听套接字,如果没有任何 fd 就绪,那程序就会一直阻塞在这里
for obj in rlist: # 遍历可读的套接字
if obj is server: # 这个 obj 是 server,说明有新的客户端连接过来了
conn, client_addr = server.accept()
inputs.append(conn) # 将新的客户端套接字加入到套接字列表中
print(inputs)
else: # 这个 obj 不是服务器,说明有客户端发送数据过来了
try: # 异常处理,这是为了防止客户端异常断开报错(比如手动关掉客户端黑窗口,服务器也会跟着报错退出)
data = obj.recv(1024)
print(data)
if not data: # 如果收到的数据为空,表示连接已经断开,需要关闭 socket 并从 inputs 中移除
inputs.remove(obj) # 将客户端套接字从列表中删除
obj.close() # 关闭客户端连接
break
obj.send(data)
except ConnectionResetError: # 如果报错,说明客户端断开了
print("客户端异常断开了", obj)
inputs.remove(obj) # 清理已断开的连接

for w in wlist: # 遍历可写的 fd 列表,即准备好发送数据的那些fd
pass

for e in elist: # 处理报错的 fd
e.close()
print("Error occured in ",e.getpeername())
inputs.remove(e)

tcpSerSock.close() # 最后一行永远不会执行,它只是用来提醒读者,有这种优雅的退出方式

示例:客户端

  • 可以开多个客户端同时连接服务端,每个客户端都可以与服务端进行独立的通信,互不影响。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#!/usr/bin/env python
from socket import *

conn = socket(AF_INET, SOCK_STREAM)
conn.connect(('127.0.0.1',10000))

while True:
data = input('> ')
if not data:
break
conn.send(bytes(data, 'utf-8'))
data = conn.recv(1024)
if not data:
break
print(str(data, 'utf-8'))
conn.close()

2、select.epoll()

1、select.epoll() 函数说明

1
2
3
4
5
6
7
8
9
select.epoll(sizehint=-1, flags=0)

作用:仅支持 Linux 2.5.44 或更高版本。返回一个edge poll对象,该对象可作为I/O事件的边缘触发或水平触发接口。

参数:
sizehint:指示epoll预计需要注册的事件数。它必须为正数,或为-1(使用默认值)。它仅在 epoll_create1()不可用的旧系统上会被用到,其他情况下它没有任何作用(尽管仍会检查其值)。
flags:已经弃用且完全被忽略(3.4 版后已移除)。但是,如果提供该值,则它必须是 0 或 select.EPOLL_CLOEXEC,否则会抛出OSError异常。

注意:epoll对象支持上下文管理器:当在with语句中使用时,新建的文件描述符会在运行至语句块结束时自动关闭。(在3.4版及之后)

1、epoll 的 eventmask 事件含义

常量 含义
EPOLLIN 可读,状态符为1
EPOLLOUT 可写,状态符为4
EPOLLPRI 紧急数据读取
EPOLLERR 在关联的文件描述符上有错误情况发生,状态符为8
EPOLLHUP 关联的文件描述符已挂起
EPOLLET 设置触发方式为边缘触发,默认为水平触发
EPOLLONESHOT 设置 one-shot 模式。触发一次事件后,该描述符会在轮询对象内部被禁用。
EPOLLEXCLUSIVE 当已关联的描述符发生事件时,仅唤醒一个 epo11 对象。默认(如果未设置此标志)是唤醒所有轮询该描述符的 epol1 对象。3.6新版功能:增加了EPOLLEXCLUSIVE。仅支持 Linux Kernel 4.5或更高版本。
EPOLLRDHUP 流套接字的对侧关闭了连接或关闭了写入到一半的连接。
EPOLLRDNORH 等同于EPOLLIN
EPOLLRDBAND 可以读职优先数据带。
EPOLLTRNORM 等同于 EPOLLOUT
EPOLLTRBAND 可以写入优先级数据。
EPOLLMSG 忽略

2、epoll 的方法

epol1的方法 描述
epoll.close() 关闭用于控制 epo11 对象的文件描述符。
epoll.closed() 如果epo11对象已关闭,则返回True。
epoll.fileno() 返回文件描述符对应的数字,该描述符用于控制epo11对象。
epoll.fromfd(fd) 根据给定的文件描述符创建epo11对象。
epoll.register(fdl,eventmask]) 在epo11对象中注册一个文件描述符。
epoll.modify(fd,eventmask) 修改一个已注册的文件描述符。
epoll.unregister(fd) 从epo11对象中删除一个已注册的文件描述符。
epol1.poll(timeout=None,maxevents=-1) 等待事件发生,timeout 是浮点数,单位为秒。

3、水平触发(LT)和边缘触发(ET)

  • LT(level triggered)
    • 是缺省的工作方式(即epoll的默认工作方式),支持block和no-block socket。
    • 只要满足条件,就触发一个事件。(只要有数据没有被获取完,内核就一直通知)
      • 在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。传统的select/poll都是这种模型的代表.
    • 优点:当进行socket通信的时候,保证了数据的完整输出,进行IO操作的时候,如果还有数据,就会一直的通知你。
    • 缺点:由于只要还有数据,内核就会不停的从内核空间转到用户空间,所有占用了大量内核资源,试想一下当有大量数据到来的时候,每次读取一个字节,这样就会不停的进行切换。内核资源的浪费严重。效率来讲也是很低的。
  • ET(edge-triggered)
    • 是高速工作方式,只支持no-block socket。
    • 每当状态变化时,触发一个事件(内核只通知一次)
      • 在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知。请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once).
    • 优点:每次内核只会通知一次,大大减少了内核资源的浪费,提高效率。
    • 缺点:不能保证数据的完整。不能及时的取出所有的数据。
    • 应用场景: 处理大数据。使用non-block模式的socket。

2、select.epoll() 示例

示例:服务端(必须运行在Linux2.5+)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
from socket import *
import select, queue

server = socket(AF_INET, SOCK_STREAM)
server.bind(('127.0.0.1', 10000))
server.listen(5)
server.setblocking(False)

epoll = select.epoll() # 创建epoll事件对象,后续要监控的事件添加到其中
epoll.register(server.fileno(), select.EPOLLIN) # 注册监听 fd 到等待读事件集合(注意此处不是直接用是server, 而是使用s的fileno)
message_queues = {} # 保存连接客户端消息的字典,格式为{}
fd_to_socket = {server.fileno(): server} # 文件句柄到所对应对象的字典,格式为{句柄:对象}
timeout = 10

while True:
events = epoll.poll(timeout) # 对 epoll 中的套接字进行扫描,返回值为[(文件句柄,对应的事件),(...),....]
if not events:
print('epoll超时无活动连接,重新轮询......')
continue
for fd, event in events:
socket = fd_to_socket[fd]
if socket == server: # 如果活动 socket 为当前服务器 socket,表示有新连接
conn, address = server.accept()
conn.setblocking(False) # 新连接socket设置为非阻塞
epoll.register(conn.fileno(), select.EPOLLIN) # 注册新连接 fd 到待读事件集合
fd_to_socket[conn.fileno()] = conn # 把新连接的文件句柄以及对象保存到字典
message_queues[conn] = queue.Queue()
elif event & select.EPOLLHUP: # 如果事件为:关闭事件
epoll.unregister(fd) # 在epoll中注销客户端的文件句柄
fd_to_socket[fd].close() # 关闭客户端的文件句柄
del fd_to_socket[fd] # 在字典中删除与已关闭客户端相关的信息
elif event & select.EPOLLIN: # 如果事件为:可读事件
data = socket.recv(1024) # 接收数据
if data:
message_queues[socket].put(data) # 将数据放入对应客户端的字典
epoll.modify(fd, select.EPOLLOUT) # 修改读取到消息的连接到等待写事件集合(即对应客户端收到消息后,再将其fd修改并加入写事件集合)
elif event & select.EPOLLOUT: # 如果事件为:可写事件
try:
msg = message_queues[socket].get_nowait() # 从字典中获取对应客户端的信息
except queue.Empty:
epoll.modify(fd, select.EPOLLIN) # 队列为空,修改文件句柄为读事件
else:
socket.send(msg) # 发送数据给客户端
epoll.unregister(serversocket.fileno()) # 在epoll中注销服务端文件句柄
epoll.close() # 关闭epoll
server.close() # 关闭服务器socket

示例:客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#!/usr/bin/env python3
from socket import *

conn = socket(AF_INET, SOCK_STREAM)
conn.connect(('192.168.248.128', 10000))

while True:
data = input('> ')
if not data:
break
conn.send(bytes(data, 'utf-8'))
data = conn.recv(1024)
if not data:
break
print(str(data, 'utf-8'))
conn.close()

Reference


Python 标准库之 select - IO 多路复用
https://flepeng.github.io/021-Python-32-Python-标准库-10-网络相关-Python-标准库之-select-IO-多路复用/
作者
Lepeng
发布于
2016年8月2日
许可协议