-
Notifications
You must be signed in to change notification settings - Fork 19
/
Copy path9-4oneshot.cpp
163 lines (149 loc) · 4.59 KB
/
9-4oneshot.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>
#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 1024
struct fds
{
int epollfd;
int sockfd;
};
int setnonblocking(int fd)
{
int old_option = fcntl(fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
return old_option;
}
// 将 fd 上的 EPOLLIN 和 EPOLLET 事件注册到 epollfd 指示的 epoll 内核事件表中
void addfd(int epollfd, int fd, bool oneshot)
{
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET;
if (oneshot) // 指定是否注册 fd 上的 EPOLLONESHOT 事件
{
event.events |= EPOLLONESHOT;
}
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
setnonblocking(fd);
}
// 重置 fd 上的事件,这样操作之后,尽管 fd 上的 EPOLLONESHOT 事件被注册
// 但是操作系统仍然会触发 fd 上的 EPOLLIN 事件,且只触发一次
void reset_oneshot(int epollfd, int fd)
{
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event);
}
// 工作线程
void *worker(void *arg)
{
int sockfd = ((fds *)arg)->sockfd;
int epollfd = ((fds *)arg)->epollfd;
printf("start new thread to receive data on fd: %d\n", sockfd);
char buf[BUFFER_SIZE];
memset(buf, '\0', BUFFER_SIZE);
// 循环读取 sockfd 上的数据,直到遇到 EAGAIN 错误
while (1)
{
int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);
if (ret == 0)
{
close(sockfd);
printf("foreigner closed the connection\n");
break;
}
else if (ret < 0)
{
// 此时客户端的数据发送完毕,重置 EPOLL 事件,让其他线程有机会为该 socket 服务
if (errno == EAGAIN)
{
reset_oneshot(epollfd, sockfd);
printf("read later\n");
break;
}
}
else
{
printf("get content: %s\n", buf);
sleep(5); // 休眠 5s,模拟数据处理过程
}
}
printf("end thread receiving data on fd: %d\n", sockfd);
}
int main(int argc, char *argv[])
{
if (argc <= 2)
{
printf("usage: %s ip_address port_number\n", basename(argv[0]));
return 1;
}
const char *ip = argv[1];
int port = atoi(argv[2]);
int ret = 0;
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
inet_pton(AF_INET, ip, &address.sin_addr);
address.sin_port = htons(port);
int listenfd = socket(PF_INET, SOCK_STREAM, 0);
assert(listenfd >= 0);
ret = bind(listenfd, (struct sockaddr *)&address, sizeof(address));
assert(ret != -1);
ret = listen(listenfd, 5);
assert(ret != -1);
epoll_event events[MAX_EVENT_NUMBER];
int epollfd = epoll_create(5);
assert(epollfd != -1);
// 监听 socket listenfd 上是不能注册 EPOLLONESHOT 事件的,否则应用程序只能处理一个客户连接
// 因为后续的客户连接将不再触发 listenfd 上的 EPOLLIN 事件
addfd(epollfd, listenfd, false);
while (1)
{
int ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
if (ret < 0)
{
printf("epoll failure\n");
break;
}
for (int i = 0; i < ret; i++)
{
int sockfd = events[i].data.fd;
if (sockfd == listenfd)
{
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(listenfd, (struct sockaddr *)&client_address, &client_addrlength);
// 对每个非监听的 fd 都注册 EPOLLONESHOT 事件
addfd(epollfd, connfd, true);
}
else if (events[i].events & EPOLLIN)
{
pthread_t thread;
fds fds_for_new_worker;
fds_for_new_worker.epollfd = epollfd;
fds_for_new_worker.sockfd = sockfd;
// 启动一个工作线程为 sockfd 服务
pthread_create(&thread, NULL, worker, (void *)&fds_for_new_worker);
}
else
{
printf("something else happened \n");
}
}
}
close(listenfd);
return 0;
}