80-32 IO 模型-多路复用 poll

简介

poll 的机制与 select 类似,与 select 在本质上没有多大差别,管理多个描述符也是进行轮询,根据描述符的状态进行处理,但是 poll 没有最大文件描述符数量的限制。poll 和 select 同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。

poll 函数

函数格式如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# include <poll.h>
int poll (struct pollfd * fds, unsigned int nfds, int timeout);

返回值:
就绪描述符的数目,超时返回 0,出错返回 -1,并设置errno为下列值之一:
EBADF 一个或多个结构体中指定的文件描述符无效。
EFAULTfds 指针指向的地址超出进程的地址空间。
EINTR 请求的事件之前产生一个信号,调用可以重新发起。
EINVALnfds 参数超出 PLIMIT_NOFILE 值。
ENOMEM 可用内存不足,无法完成请求。

参数:

fds:fds 是一个 struct pollfd 类型的数组,用于存放需要检测其状态的 socket 描述符,并且调用 poll 函数之后 fds 数组不会被清空。

nfdsfds 数组的成员数量。

timeout:调用应该等待的最大毫秒数,以阻塞的方式等待文件描述符变为就绪。
如果值是 -1,poll 将会无限期的阻塞。
如果值是 0, poll 将立即返回。

pollfd 结构体定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
struct pollfd {
int fd; /* 文件描述符 */
short events; /* 等待的事件 */
short revents; /* 实际发生了的事件 */
} ;

其中:
fd:需要被 poll 监控的文件描述符

events:一组位标志,表示对应的文件描述符上我们感兴趣的事件。比如:POLLIN(有数据可读),POLLOUT(写数据不会阻塞),POLLERR(错误事件),POLLHUP(挂起事件)等。

revents:一组位标志,表示在对应的文件描述符上实际发生了哪些事件。这是 poll 调用返回后由内核填充的。

原理

Poll IO 多路复用的执行原理

  1. 将当前进程所有文件描述符,一次性的从用户态拷贝到内核态。
  2. 在内核中遍历每个 pollfd 结构体中列出的文件描述符,判断是否有数据达到。如果有,内核将会在 revents 字段中设置相应的位,以指示哪些事件已经发生。
  3. 然后将所有 pollfd 状态,从内核态拷贝到用户态,并返回已就绪 pollfd 的个数。
  4. 在用户态应用程序可以检查每个 pollfd 结构体的 revents 字段来确定每个文件描述符上发生了哪些事件,然后进行相应的事件处理。

优缺点

Poll IO多 路复用解决的问题和不足

  1. poll 模型采用的 pollfd 结构数组,解决了 Select 的 1024 个文件描述符的限制。
  2. 但仍然存在频繁的用户态和内核态拷贝,性能开销较大。
  3. 需要对文件描述符表进行遍历,0(n) 的轮询时间复杂度。
  4. poll 只能在 linux 系统中使用,不能夸系统使用,所以 poll 的地位就很尴尬。

代码

c

服务器端程序如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>

#include <netinet/in.h>
#include <sys/socket.h>
#include <poll.h>
#include <unistd.h>
#include <sys/types.h>

#define IPADDRESS "127.0.0.1"
#define PORT 8787
#define MAXLINE 1024
#define LISTENQ 5
#define OPEN_MAX 1000
#define INFTIM -1

//函数声明
//创建套接字并进行绑定
static int socket_bind(const char* ip,int port);
//IO多路复用poll
static void do_poll(int listenfd);
//处理多个连接
static void handle_connection(struct pollfd *connfds,int num);

int main(int argc,char *argv[])
{
int listenfd,connfd,sockfd;
struct sockaddr_in cliaddr;
socklen_t cliaddrlen;
listenfd = socket_bind(IPADDRESS,PORT);
listen(listenfd,LISTENQ);
do_poll(listenfd);
return 0;
}

static int socket_bind(const char* ip,int port)
{
int listenfd;
struct sockaddr_in servaddr;
listenfd = socket(AF_INET,SOCK_STREAM,0);
if (listenfd == -1)
{
perror("socket error:");
exit(1);
}
bzero(&servaddr,sizeof(servaddr));
servaddr.sin_family = AF_INET;
inet_pton(AF_INET,ip,&servaddr.sin_addr);
servaddr.sin_port = htons(port);
if (bind(listenfd,(struct sockaddr*)&servaddr,sizeof(servaddr)) == -1)
{
perror("bind error: ");
exit(1);
}
return listenfd;
}

static void do_poll(int listenfd)
{
int connfd,sockfd;
struct sockaddr_in cliaddr;
socklen_t cliaddrlen;
struct pollfd clientfds[OPEN_MAX];
int maxi;
int i;
int nready;
//添加监听描述符
clientfds[0].fd = listenfd;
clientfds[0].events = POLLIN;
//初始化客户连接描述符
for (i = 1;i < OPEN_MAX;i++)
clientfds[i].fd = -1;
maxi = 0;
//循环处理
for ( ; ; )
{
//获取可用描述符的个数
nready = poll(clientfds,maxi+1,INFTIM);
if (nready == -1)
{
perror("poll error:");
exit(1);
}
//测试监听描述符是否准备好
if (clientfds[0].revents & POLLIN)
{
cliaddrlen = sizeof(cliaddr);
//接受新的连接
if ((connfd = accept(listenfd,(struct sockaddr*)&cliaddr,&cliaddrlen)) == -1)
{
if (errno == EINTR)
continue;
else
{
perror("accept error:");
exit(1);
}
}
fprintf(stdout,"accept a new client: %s:%d\n", inet_ntoa(cliaddr.sin_addr),cliaddr.sin_port);
//将新的连接描述符添加到数组中
for (i = 1;i < OPEN_MAX;i++)
{
if (clientfds[i].fd < 0)
{
clientfds[i].fd = connfd;
break;
}
}
if (i == OPEN_MAX)
{
fprintf(stderr,"too many clients.\n");
exit(1);
}
//将新的描述符添加到读描述符集合中
clientfds[i].events = POLLIN;
//记录客户连接套接字的个数
maxi = (i > maxi ? i : maxi);
if (--nready <= 0)
continue;
}
//处理客户连接
handle_connection(clientfds,maxi);
}
}

static void handle_connection(struct pollfd *connfds,int num)
{
int i,n;
char buf[MAXLINE];
memset(buf,0,MAXLINE);
for (i = 1;i <= num;i++)
{
if (connfds[i].fd < 0)
continue;
//测试客户描述符是否准备好
if (connfds[i].revents & POLLIN)
{
//接收客户端发送的信息
n = read(connfds[i].fd,buf,MAXLINE);
if (n == 0)
{
close(connfds[i].fd);
connfds[i].fd = -1;
continue;
}
// printf("read msg is: ");
write(STDOUT_FILENO,buf,n);
//向客户端发送buf
write(connfds[i].fd,buf,n);
}
}
}

客户端代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
#include <netinet/in.h>
#include <sys/socket.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <poll.h>
#include <time.h>
#include <unistd.h>
#include <sys/types.h>

#define MAXLINE 1024
#define IPADDRESS "127.0.0.1"
#define SERV_PORT 8787

#define max(a,b) (a > b) ? a : b

static void handle_connection(int sockfd);

int main(int argc,char *argv[])
{
int sockfd;
struct sockaddr_in servaddr;
sockfd = socket(AF_INET,SOCK_STREAM,0);
bzero(&servaddr,sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(SERV_PORT);
inet_pton(AF_INET,IPADDRESS,&servaddr.sin_addr);
connect(sockfd,(struct sockaddr*)&servaddr,sizeof(servaddr));
//处理连接描述符
handle_connection(sockfd);
return 0;
}

static void handle_connection(int sockfd)
{
char sendline[MAXLINE],recvline[MAXLINE];
int maxfdp,stdineof;
struct pollfd pfds[2];
int n;
//添加连接描述符
pfds[0].fd = sockfd;
pfds[0].events = POLLIN;
//添加标准输入描述符
pfds[1].fd = STDIN_FILENO;
pfds[1].events = POLLIN;
for (; ;)
{
poll(pfds,2,-1);
if (pfds[0].revents & POLLIN)
{
n = read(sockfd,recvline,MAXLINE);
if (n == 0)
{
fprintf(stderr,"client: server is closed.\n");
close(sockfd);
}
write(STDOUT_FILENO,recvline,n);
}
//测试标准输入是否准备好
if (pfds[1].revents & POLLIN)
{
n = read(STDIN_FILENO,sendline,MAXLINE);
if (n == 0)
{
shutdown(sockfd,SHUT_WR);
continue;
}
write(sockfd,sendline,n);
}
}
}

Reference


80-32 IO 模型-多路复用 poll
https://flepeng.github.io/010-network-80-32-IO-模型-多路复用-poll/
作者
Lepeng
发布于
2021年3月8日
许可协议