Unix网络编程(六、select模型)

select

函数原型

1
int select(int maxfd, fd_set *readset, fd_set *writeset, fd_set *exceptset, const struct timeval *timeout);

头文件
#include <sys/select.h>
参数
timeval 表示等待描述符任意一个就绪所需的时间。它的结构如下

1
2
3
4
5
struct timeval
{
long tv_sec;
long tv_usec; // 微秒
}

用户有一下三种

  1. 将该参数设置为NULL,永远等待,直到某个描述符准备好IO才返回。
  2. 等待指定时间。即用户设置的时间。
  3. 不等待,直接返回。将秒和微秒设置为0。

中间的三个参数为需要内核测试读、写、和异常条件的描述符集。通常只需使用读和写就足够了。
读、写描述符集通常为一个整型数组,数组中的每一个元素的每一个bit对应到一个文件描述符,该描述符在数值上等于该bit相对于0bit的偏移。比如说通过socket函数打开了一个文件描述符,返回的值为5,那么将这个fdset中的第5bit置1,即为将该文件描述符添加到fdset中。
所以对描述符的存取涉及到较为复杂的位运算。为了让这些存取操作与用户无关,select模型提供了一组接口进行描述符的存取

FD_ZERO

1
2
// 将整个描述符集的所有bits置0
void FD_ZERO(fd_set *fdset);

FD_SET

1
2
// 将fdset中偏移 fd bits的位 置1
void FD_SET(int fd, fd_set *fdset);

FD_CLR(int fd, fd_set *fdset);

1
2
// 将fdset中偏移 fd bits的位 置0
void FD_CLR(int fd, fd_set *fdset);

FD_ISSET

1
2
// 判断该fd是否在fdset中,也就是检查该位是否为1,返回该位在fdset的值
int FD_ISSET(int fd, fd_set *fdset);

maxfd 表示在fd_set中需要检测的最大位。因为Linux中一个进程能打开的文件描述符有限,它的最大值在内核中有个最大值 FD_SETSIZE(1024,不同内核这个值不一样)。在不修改内核中这个值得前提下,select最大能监控的文件描述符只有1024个,这也是它的限制所在。
存在这个值得原因是出于效率考虑,如何一个进程用不了那么多文件描述符,就可以将该值设置的小一点,那么检测那些为0的描述符了。

select函数通过修改readset, writeset, exceptset这些文件描述符集来标识哪些文件描述符已经就绪。该函数返回后,就可以调用函数FD_ISSET来测试fd_set中的文件描述符,它会将所有未就绪的描述符对应的bit置0。所以每次调用select的时候,都得把需要检测的描述符集中对应的bit置1.

返回值 成功返回以就绪的文件描述符的数量。如果定时器到了,还没有已就绪的文件描述符,返回0。-1表示出错。

#描述符的就绪条件
满足一下四个条件中的任意一个时,一个套接字准备好读。

  1. 套接字中接受缓冲区的数据字节数大于等于套接字缓冲区的低水位标记(也就是表示套接字可读的最下数据量),在TCP中通常为1。
  2. 套接字读半关闭(close_wait,接收了FIN的连接)。对这样的套接字不阻塞,直接返回0。
  3. socket是一个用于监听的socket,并且已经完成的连接数为非0.这样的soocket处于可读状态,是因为socket收到了对方的connect请求,执行了三次握手的第一步:对方发送SYN请求过来,使监听socket处于可读状态;正常情况下,这样的socket上的accept操作不会阻塞
  4. 有一个socket有异常错误条件待处理.对于这样的socket的读操作将不会阻塞,并且返回一个错误(-1),errno则设置成明确的错误条件.这些待处理的错误也可通过指定socket选项SO_ERROR调用getsockopt来取得并清除

满足以下四个条件的任意一个时,套接字准备好写。

  1. socket的发送缓冲区中的数据字节大于等于该socket的发送缓冲区低水位标记的当前大小。对这样的socket的写操作将不阻塞并返回一个大于0的值(也就是返回准备好写入的数据)。我们可以用SO_SNDLOWAT socket选项来设置该socket的低水位标记。对于TCP和UDPsocket而言,其缺省值为2048
  2. 该连接的写这一半关闭(TIME_WAIT2)。对这样的socket的写操作将产生SIGPIPE信号,该信号的缺省行为是终止进程
  3. 有一个socket异常错误条件待处理.对于这样的socket的写操作将不会阻塞并且返回一个错误(-1),errno则设置成明确的错误条件.这些待处理的错误也可以通过指定socket选项SO_ERROR调用getsockopt函数来取得并清除
  4. 使用非阻塞式connect的套接字已建立连接,或者connect返回失败。

当某个套接字上发生错误时,它将由select标记为即可读又可写

这张图总结了select可读可写的条件
图片来自unix网络编程

用select实现客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
void my_cli(FILE* fd, int sockfd) {
char sendline[MAXLINE] = {0};
char recvline[MAXLINE]= {0};

fd_set read_set;
int maxfd = 0;
FD_ZERO(&read_set);
while (true) {
FD_SET(sockfd, &read_set);
FD_SET(fileno(fd), &read_set);
maxfd = (sockfd > fileno(fd)) ? (sockfd + 1) : (fileno(fd) + 1);

if (select(maxfd, &read_set, NULL, NULL, NULL) == -1) {
return;
}

if (FD_ISSET(sockfd, &read_set))
{
if (read(sockfd, recvline, MAXLINE) == 0) {
printf("service has exit");
return;
}
fputs(recvline, stdout);
memset(recvline, 0, MAXLINE);
}

if (FD_ISSET(fileno(fd), &read_set)) {
if (fgets(sendline, MAXLINE, fd) == NULL) {
return;
}
write(sockfd, sendline, strlen(sendline));
}

}
}

这段代码通过select同时管理标准输入和与服务端通信的文件描述符。
如果标准输入可读,就通过fgets获取一行输入,然后发送到服务端。 如果服务端有数据到来,就通过read读取到服务端发过来的数据。这样程序的阻塞态就与具体的业务实现无关了。也就顺利的解决了上一篇文章所说的客户端因为阻塞在fgets函数上导致接收到服务端的FIN之后还在向服务端发数据的问题。现在客户端只有收到服务端发过来的FIN后,便立即退出。

客户端批量收发数据bug

目前我们的客户端如果用来与服务端进行交互的话已经算一个比较健壮的程序了。但是如果进行数据批量处理时又会产生新的bug。
考虑这样一种情况。现在有大量的数据要发往服务端,比如说将一个文件的每行数据读出来,然后发往服务端。

看下面这段代码, 客户端从文件读数据发往服务端,服务端收到数据后将这行数据返回,客户端收到服务端返回的数据后将其写入文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
void my_cli(FILE* fd, int sockfd) {
char sendline[MAXLINE] = {0};
char recvline[MAXLINE]= {0};

fd_set read_set;
int maxfd = 0;
FD_ZERO(&read_set);

FILE* writefp = fopen("out.txt", "w+");

while (true) {
FD_SET(sockfd, &read_set);
FD_SET(fileno(fd), &read_set);
maxfd = (sockfd > fileno(fd)) ? (sockfd + 1) : (fileno(fd) + 1);

if (select(maxfd, &read_set, NULL, NULL, NULL) == -1) {
return;
}

if (FD_ISSET(sockfd, &read_set))
{
if (read(sockfd, recvline, MAXLINE) == 0) {
printf("service has exit");
return;
}
fputs(recvline, writefp);
memset(recvline, 0, MAXLINE);
}

if (FD_ISSET(fileno(fd), &read_set)) {
if (fgets(sendline, MAXLINE, fd) == NULL) {
return;
}
write(sockfd, sendline, strlen(sendline));
}

}
fclose(writefp);
}

在批量的情况下,客户端以网络能够接收的接收的最快速度持续发送请求。服务端以相同的速度处理并回应。那么此时TCP全双工管道数据类似下图
图片来自unix网络编程
引发bug的原因在于,客户端的数据发送完了之后,会立马关闭与服务端的连接。而此时服务端发送过来的数据可能有一部分在网络中,有一部分还没来得及发送。也就导致,客户端接收到的数据是不完整的。

这一张是wireshark抓到的包,可以看到客户端发送了FIN(蓝色)后,服务端还在向其push数据(红色),最后服务端当然只能收到RST了,而这些服务端返回的数据也就丢失了。

也可以对比一下读取的文件和写入的文件差异
读取的文件

写入的文件

解决bug

看了上面的表现之后,我们便可以找到引发bug的根本原因:客户端在发送完数据后就关闭了连接,并没有等数据全部接受完。

具体做法就是让客户端所有数据发送完之后,主动关闭写半部的链接,也就是主动发一个FIN到服务端,让其变成半关闭状态,此时可读,但不可写,通过shutdown函数可以实现这点。直到接收完从服务端发来的数据后,才退出进程。

shutdown

1
int shutdown(int sockfd, int howto);

howto 这个参数有三个值可选 SHUT_WR(1)SHUT_RD(0)SHUT_RDWR(2)

改善后的代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
void my_cli(FILE* fd, int sockfd) {
char sendline[MAXLINE] = {0};
char recvline[MAXLINE]= {0};

fd_set read_set;
int maxfd = 0;
FD_ZERO(&read_set);

FILE* writefp = fopen("out.txt", "w+");
bool fileeof = false;
while (true) {
FD_SET(sockfd, &read_set);

if (!shutwd) {
FD_SET(fileno(fd), &read_set);
}
maxfd = (sockfd > fileno(fd)) ? (sockfd + 1) : (fileno(fd) + 1);

if (select(maxfd, &read_set, NULL, NULL, NULL) == -1) {
return;
}

if (FD_ISSET(sockfd, &read_set))
{
if (read(sockfd, recvline, MAXLINE) == 0) {
if (fileeof) {
return; //正常结束
}
printf("service has exit");
return;
}
fputs(recvline, writefp);
memset(recvline, 0, MAXLINE);

}

if (FD_ISSET(fileno(fd), &read_set)) {
if (fgets(sendline, MAXLINE, fd) == NULL) {
fileeof = true;
shutdown(sockfd, SHUT_WR);
FD_CLR(fileno(fd), &read_set);
continue;
}
write(sockfd, sendline, strlen(sendline));
}

}
fclose(writefp);
}

在fgets读到EOF之后,关闭读半部连接,并让select不在检测读数据的文件描述符。直到从服务端读不到数据之后,才结束进程。
下面是fix之后Wireshark捕获到的数据包

用select重写服务端

之前服务端在监听到客户端的连接之后,是通过fork子进程的方式。这样在有大量连接的时候,会fork出大量的子进程。虽然Linux通过写时复制技术,使得fork出的子进程不会占用太多的页表空间,但在一些复杂的场景下,fork子进程终归不是一个好的方式。现在,就可以通过select来管理这些客户端的连接。
代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
#include <stdio.h>
#include <string.h>
#include <stdlib.h>

#include <sys/socket.h>
#include <unistd.h>
#include <netinet/in.h>
#include <errno.h>
#include <asm-generic/errno-base.h>
#include <signal.h>
#include <wait.h>
#include <signal.h>

static const int MAXLINE = 4096;
static const int MAXLISTEN = 1024;
static const int PORT = 45678;

int e_socket(int domain, int type, int protocol);
int e_bind(int sockfd, sockaddr_in *myaddr, socklen_t addrlen);
int e_listen(int sockfd, int backlog);
int e_accept(int sockfd, sockaddr_in *myaddr, socklen_t *addrlen);
void init_clientfd(int sockfd[], int socklen);
void init_sockaddr(sockaddr_in sockaddr);
int append_clientfd(int sockfd[], int socklen, int clientfd);
int handle_client_connect(int listensock, int sockfd[], int socklen, int *max_use_index);
void handle_client_data(fd_set *read_set, fd_set *all_set, int sockfds[], int use_count, int ready_count);

int e_socket(int domain, int type, int protocol)
{
int sockfd = socket(domain, type, protocol);
if (sockfd < 0)
{
exit(0);
}
return sockfd;
}

int e_bind(int sockfd, sockaddr_in *myaddr, socklen_t addrlen)
{
int ret = bind(sockfd, (sockaddr *)myaddr, addrlen);
if (ret != 0)
{
close(sockfd);
exit(0);
}
return ret;
}

int e_listen(int sockfd, int backlog)
{
int ret = listen(sockfd, backlog);
if (ret != 0)
{
close(sockfd);
exit(0);
}
return ret;
}

int e_accept(int sockfd, sockaddr_in *myaddr, socklen_t *addrlen)
{
int ret = accept(sockfd, (sockaddr *)myaddr, addrlen);
if (ret == -1)
{
printf("%d", errno);
exit(0);
}
return ret;
}

void init_clientfd(int sockfd[], int socklen)
{
for (int i = 0; i < socklen; ++i)
{
sockfd[i] = -1;
}
}

void init_sockaddr(sockaddr_in *sockaddr)
{
if (!sockaddr)
{
return;
}

bzero(sockaddr, sizeof(sockaddr_in));
sockaddr->sin_family = AF_INET;
sockaddr->sin_addr.s_addr = htonl(INADDR_ANY);
sockaddr->sin_port = htons(PORT);
}

int append_clientfd(int sockfd[], int socklen, int clientfd)
{
int i = 0;
for (i = 0; i < socklen; ++i)
{
if (sockfd[i] == -1)
{
sockfd[i] = clientfd;
break;
}
}
if (i == socklen)
{
return -1;
}

return i;
}

int handle_client_connect(int listensock, int sockfd[], int socklen, int *max_use_index)
{
sockaddr_in clientaddr;
socklen_t clientlen = sizeof(clientaddr);
int connfd = e_accept(listensock, &clientaddr, &clientlen);

int cur_use = 0;
if ((cur_use = append_clientfd(sockfd, socklen, connfd)) == -1)
{
return -1;
}

if (*max_use_index < cur_use)
{
*max_use_index = cur_use;
}

return connfd;
}

void handle_client_data(fd_set *read_set, fd_set *all_set, int sockfds[], int use_count, int ready_count)
{
char buf[MAXLINE] = {0};
for (int i = 0; i <= use_count; ++i) {
int sockfd = sockfds[i];
if (sockfd < 0) {
continue;
}

if (FD_ISSET(sockfd, read_set)) {
int read_len = read(sockfd, buf, MAXLISTEN);
if (read_len == 0) {
close(sockfd);
FD_CLR(sockfd, all_set);
sockfds[i] = -1;
} else {
write(sockfd, buf, read_len);
}

if (--ready_count <= 0) {
break;
}
}
}
}

int main(int argc, const char *argv[])
{

int listen_sock = e_socket(AF_INET, SOCK_STREAM, 0);

sockaddr_in servaddr;
init_sockaddr(&servaddr);

e_bind(listen_sock, &servaddr, sizeof(servaddr));
e_listen(listen_sock, MAXLISTEN);

fd_set all_set, read_set;
int clientfds[FD_SETSIZE];

init_clientfd(clientfds, FD_SETSIZE);

FD_ZERO(&all_set);
FD_ZERO(&read_set);
FD_SET(listen_sock, &all_set);

int maxfd = listen_sock + 1;
int readyfdcount = 0;
int client_use_count = 0;
while (true)
{
read_set = all_set;
readyfdcount = select(maxfd, &read_set, NULL, NULL, NULL);

if (FD_ISSET(listen_sock, &read_set))
{
int connfd = handle_client_connect(listen_sock, clientfds, FD_SETSIZE, &client_use_count);

if (connfd == -1)
{
continue;
}
FD_SET(connfd, &all_set);

if (maxfd <= connfd)
{
maxfd = connfd + 1;
}

if (--readyfdcount <= 0)
{
continue;
}
}

handle_client_data(&read_set, &all_set, clientfds, client_use_count, readyfdcount);
}
return 0;
}