80-31 IO 模型-多路复用 select

select 函数

该函数准许进程指示内核等待多个事件中的任何一个发送,并只在有一个或多个事件发生或经历一段指定的时间后才唤醒。函数原型如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#include <sys/select.h>
#include <sys/time.h>

int select(int maxfdp1,fd_set *readset,fd_set *writeset,fd_set *exceptset,const struct timeval *timeout)

返回值:
就绪描述符的数目,超时返回 0,出错返回 -1

函数参数:

maxfdp1:指定待测试的描述字个数,需要检查的所有文件描述符中的最大值加 1(因此把该参数命名为 maxfdp1)
fd 012...maxfdp1-1 均将被测试。因为文件描述符是从 0 开始的。
大小限制为 FD_SETSIZE(1024)(32 位默认102464位默认2048),位数组的每一位代表其对应的描述符是否需要被检查。

中间的三个参数:
readset:要监视的读文件描述符集合 如果不关心 可以传NULL
writeset:要监视的写文件描述符集合 如果不关心 可以传NULL
exceptset:要监视的异常文件描述符集合 如果不关心 可以传NULL

struct fd_set 是一个文件描述符集合,select 中是用一位数组来实现的,一个描述占一位,可以简单理解为 bitmap,有以下几种设置方法:

void FD_ZERO(fd_set *fdset); // 用于清除一个 fd_set 的所有位,即初始化一个 fd_set,全部置为 0
void FD_SET(int fd, fd_set *fdset); // 用于将特定的文件描述符 fd 加入到 fd_set 中
void FD_CLR(int fd, fd_set *fdset); // 用于将特定的文件描述符 fd 从 fd_set 中移除
int FD_ISSET(int fd, fd_set *fdset); // 用于检查特定的文件描述符fd是否在fd_set中,如果在,函数返回非零值,否则返回0。

timeout:告知内核等待所指定描述字中的任何一个就绪可花多少时间。其 timeval 结构用于指定这段时间的秒数和微秒数。
struct timeval{
long tv_sec; //seconds
long tv_usec; //microseconds
};

这个参数有三个取值:
1. NULL:永远等待下去:仅在有一个描述字准备好I/O时才返回。
2. 大于0:等待一段固定时间:在有一个描述字准备好I/O时返回,但是不超过由该参数所指向的timeval结构中指定的秒数和微秒数。
3. 0:检查描述字后立即返回,这称为轮询。

原理图:

原理

Select 多络复用的执行原理

  1. 将当前进程的所有文件描述符,一次性的从用户态拷贝到内核态。
  2. 在内存中快速的无差别遍历每个 fd,判断是否有数据达到。
  3. 将所有 fd 状态,从内核态拷贝到用户态,并返回已就绪 fd 的个数。
  4. 在用户态遍历判断具体哪个 fd 已就绪,然后再发送系统调用(比如 accept),把数据 copy 到用户空间。

伪代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#define MAXCLINE 5 // 连接队列中的个数
int fd[MAXCLINE]; // 连接的文件描述符队列
int main(void){
sock_fd = socket(AF_INET, SOCK_STREAM, 0) // 建立主机间通信的 socket 结构体
.....
bind(sock_fd, (struct sockaddr *)&server_addr, sizeof(server_addr); // 绑定socket到当前服务器
listen(sock_fd, 5); // 监听 5 个TCP 连接
fd_set fdsr; // bitmap 类型的文件描述符集合,01100 表示第1、2位有数据到达
int max;

for(i = 0; i < 5; i++){
.....
fd[i] = accept(sock_fd, (struct sockaddr *)&client_addr, &sin_size); // 跟 5 个客户端依次建立 TCP 连接,并将连接放入 fd 文件描述符队列
}

while(1){ // 循环监听连接上的数据是否到达
FD_ZERO(&fdsr); // 对 fd_set 即 bitmap 类型进行复位,即全部重置为0

for(i = 0; i < 5; i++){
FD_SET(fd[i], &fdsr); // 将要监听的TCP连接对应的文件描述符所在的bitmap的位置置1,比如 0110010110 表示需要监听第 1、2、5、7、8个文件描述符对应的 TCP 连接
}

ret = select(max + 1, &fdsr, NULL, NULL, NULL); // 调用select系统函数进入内核检查哪个连接的数据到达,是否阻塞取决于最后一个参数

for(i=0;i<5;i++){
if(FD_ISSET(fd[i], &fdsr)){ // fd_set 中为1的位置表示的连接,意味着有数据到达,可以让用户进程读取
ret = recv(fd[i], buf,sizeof(buf), 0);
......
}
}
}
}

优缺点

优点

  • 使用 select() 的事件驱动模型只用单线程(进程)执行,占用资源少,不消耗太多 CPU,同时能够为多客户端提供服务。
  • select 可以夸系统使用,poll 不能夸系统使用。

Select IO 多路复用的限制和不足

  • bitMap 默认大小只有 1024(32 位默认 1024 个,64 位默认 2048),可以改,select 性能就这样了,再往上改意义不大。
    因为 bitMap 需要从用户空间拷贝到内核空间,为了控制拷贝的大小而做了限制。
  • 在 FD_SET 之前每次都得清零(FD_ZERO),那么意味着我们不能重用 bitMap
  • 在整体拷贝 bitMap 到内核态时是仍会有一定的开销的(比我们 for 循环一个一个判断要好得多)
  • 最后仍需要进行一次 O(n) 的遍历

示例代码

c

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <assert.h>

#define IPADDR "127.0.0.1"
#define PORT 8787
#define MAXLINE 1024
#define LISTENQ 5
#define SIZE 10

typedef struct server_context_st
{
int cli_cnt; /*客户端个数*/
int clifds[SIZE]; /*客户端的个数*/
fd_set allfds; /*句柄集合*/
int maxfd; /*句柄最大值*/
} server_context_st;
static server_context_st *s_srv_ctx = NULL;
/*===========================================================================
* ==========================================================================*/
static int create_server_proc(const char* ip,int port)
{
int fd;
struct sockaddr_in servaddr;
fd = socket(AF_INET, SOCK_STREAM,0);
if (fd == -1) {
fprintf(stderr, "create socket fail,erron:%d,reason:%s\n",
errno, strerror(errno));
return -1;
}

/*一个端口释放后会等待两分钟之后才能再被使用,SO_REUSEADDR是让端口释放后立即就可以被再次使用。*/
int reuse = 1;
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) == -1) {
return -1;
}

bzero(&servaddr,sizeof(servaddr));
servaddr.sin_family = AF_INET;
inet_pton(AF_INET,ip,&servaddr.sin_addr);
servaddr.sin_port = htons(port);

if (bind(fd,(struct sockaddr*)&servaddr,sizeof(servaddr)) == -1) {
perror("bind error: ");
return -1;
}

listen(fd,LISTENQ);

return fd;
}

static int accept_client_proc(int srvfd)
{
struct sockaddr_in cliaddr;
socklen_t cliaddrlen;
cliaddrlen = sizeof(cliaddr);
int clifd = -1;

printf("accpet clint proc is called.\n");

ACCEPT:
clifd = accept(srvfd,(struct sockaddr*)&cliaddr,&cliaddrlen);

if (clifd == -1) {
if (errno == EINTR) {
goto ACCEPT;
} else {
fprintf(stderr, "accept fail,error:%s\n", strerror(errno));
return -1;
}
}

fprintf(stdout, "accept a new client: %s:%d\n",
inet_ntoa(cliaddr.sin_addr),cliaddr.sin_port);

//将新的连接描述符添加到数组中
int i = 0;
for (i = 0; i < SIZE; i++) {
if (s_srv_ctx->clifds[i] < 0) {
s_srv_ctx->clifds[i] = clifd;
s_srv_ctx->cli_cnt++;
break;
}
}

if (i == SIZE) {
fprintf(stderr,"too many clients.\n");
return -1;
}
}

static int handle_client_msg(int fd, char *buf)
{
assert(buf);
printf("recv buf is :%s\n", buf);
write(fd, buf, strlen(buf) +1);
return 0;
}

static void recv_client_msg(fd_set *readfds)
{
int i = 0, n = 0;
int clifd;
char buf[MAXLINE] = {0};
for (i = 0;i <= s_srv_ctx->cli_cnt;i++) {
clifd = s_srv_ctx->clifds[i];
if (clifd < 0) {
continue;
}
/*判断客户端套接字是否有数据*/
if (FD_ISSET(clifd, readfds)) {
//接收客户端发送的信息
n = read(clifd, buf, MAXLINE);
if (n <= 0) {
/*n==0表示读取完成,客户都关闭套接字*/
FD_CLR(clifd, &s_srv_ctx->allfds);
close(clifd);
s_srv_ctx->clifds[i] = -1;
continue;
}
handle_client_msg(clifd, buf);
}
}
}
static void handle_client_proc(int srvfd)
{
int clifd = -1;
int retval = 0;
fd_set *readfds = &s_srv_ctx->allfds;
struct timeval tv;
int i = 0;

while (1) {
/*每次调用select前都要重新设置文件描述符和时间,因为事件发生后,文件描述符和时间都被内核修改啦*/
FD_ZERO(readfds);
/*添加监听套接字*/
FD_SET(srvfd, readfds);
s_srv_ctx->maxfd = srvfd;

tv.tv_sec = 30;
tv.tv_usec = 0;
/*添加客户端套接字*/
for (i = 0; i < s_srv_ctx->cli_cnt; i++) {
clifd = s_srv_ctx->clifds[i];
/*去除无效的客户端句柄*/
if (clifd != -1) {
FD_SET(clifd, readfds);
}
s_srv_ctx->maxfd = (clifd > s_srv_ctx->maxfd ? clifd : s_srv_ctx->maxfd);
}

/*开始轮询接收处理服务端和客户端套接字*/
retval = select(s_srv_ctx->maxfd + 1, readfds, NULL, NULL, &tv);
if (retval == -1) {
fprintf(stderr, "select error:%s.\n", strerror(errno));
return;
}
if (retval == 0) {
fprintf(stdout, "select is timeout.\n");
continue;
}
if (FD_ISSET(srvfd, readfds)) {
/*监听客户端请求*/
accept_client_proc(srvfd);
} else {
/*接受处理客户端消息*/
recv_client_msg(readfds);
}
}
}

static void server_uninit()
{
if (s_srv_ctx) {
free(s_srv_ctx);
s_srv_ctx = NULL;
}
}

static int server_init()
{
s_srv_ctx = (server_context_st *)malloc(sizeof(server_context_st));
if (s_srv_ctx == NULL) {
return -1;
}

memset(s_srv_ctx, 0, sizeof(server_context_st));

int i = 0;
for (;i < SIZE; i++) {
s_srv_ctx->clifds[i] = -1;
}

return 0;
}

int main(int argc,char *argv[])
{
int srvfd;
/*初始化服务端context*/
if (server_init() < 0) {
return -1;
}
/*创建服务,开始监听客户端请求*/
srvfd = create_server_proc(IPADDR, PORT);
if (srvfd < 0) {
fprintf(stderr, "socket create or bind fail.\n");
goto err;
}
/*开始接收并处理客户端请求*/
handle_client_proc(srvfd);
server_uninit();
return 0;
err:
server_uninit();
return -1;
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
#include <netinet/in.h>
#include <sys/socket.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/select.h>
#include <time.h>
#include <unistd.h>
#include <sys/types.h>
#include <errno.h>

#define MAXLINE 1024
#define IPADDRESS "127.0.0.1"
#define SERV_PORT 8787

#define max(a,b) (a > b) ? a : b

static void handle_recv_msg(int sockfd, char *buf)
{
printf("client recv msg is:%s\n", buf);
sleep(5);
write(sockfd, buf, strlen(buf) +1);
}

static void handle_connection(int sockfd)
{
char sendline[MAXLINE],recvline[MAXLINE];
int maxfdp,stdineof;
fd_set readfds;
int n;
struct timeval tv;
int retval = 0;

while (1) {

FD_ZERO(&readfds);
FD_SET(sockfd,&readfds);
maxfdp = sockfd;

tv.tv_sec = 5;
tv.tv_usec = 0;

retval = select(maxfdp+1,&readfds,NULL,NULL,&tv);

if (retval == -1) {
return ;
}

if (retval == 0) {
printf("client timeout.\n");
continue;
}

if (FD_ISSET(sockfd, &readfds)) {
n = read(sockfd,recvline,MAXLINE);
if (n <= 0) {
fprintf(stderr,"client: server is closed.\n");
close(sockfd);
FD_CLR(sockfd,&readfds);
return;
}

handle_recv_msg(sockfd, recvline);
}
}
}

int main(int argc,char *argv[])
{
int sockfd;
struct sockaddr_in servaddr;

sockfd = socket(AF_INET,SOCK_STREAM,0);

bzero(&servaddr,sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(SERV_PORT);
inet_pton(AF_INET,IPADDRESS,&servaddr.sin_addr);

int retval = 0;
retval = connect(sockfd,(struct sockaddr*)&servaddr,sizeof(servaddr));
if (retval < 0) {
fprintf(stderr, "connect fail,error:%s\n", strerror(errno));
return -1;
}

printf("client send to server .\n");
write(sockfd, "hello server", 32);

handle_connection(sockfd);

return 0;
}

Python

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#服务端
from socket import *
import select

s=socket(AF_INET,SOCK_STREAM)
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind(('127.0.0.1',8081))
s.listen(5)
s.setblocking(False) #设置socket的接口为非阻塞
read_l=[s,]
while True:
r_l,w_l,x_l=select.select(read_l,[],[])
print(r_l)
for ready_obj in r_l:
if ready_obj == s:
conn,addr=ready_obj.accept() #此时的ready_obj等于s
read_l.append(conn)
else:
try:
data=ready_obj.recv(1024) #此时的ready_obj等于conn
if not data:
ready_obj.close()
read_l.remove(ready_obj)
continue
ready_obj.send(data.upper())
except ConnectionResetError:
ready_obj.close()
read_l.remove(ready_obj)

#客户端
from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8081))

while True:
msg=input('>>: ')
if not msg:continue
c.send(msg.encode('utf-8'))
data=c.recv(1024)
print(data.decode('utf-8'))

Reference


80-31 IO 模型-多路复用 select
https://flepeng.github.io/010-network-80-31-IO-模型-多路复用-select/
作者
Lepeng
发布于
2021年3月8日
许可协议