From 45b5ab4e4627d764fce1d5cd4f98a1cf85c37688 Mon Sep 17 00:00:00 2001 From: zwh <2532175300@qq.com> Date: Sat, 10 Feb 2024 16:22:12 +0000 Subject: [PATCH 1/3] fix: blockqueue pop timeout bug --- log/block_queue.h | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/log/block_queue.h b/log/block_queue.h index 34c77bd5..c17eaeb3 100644 --- a/log/block_queue.h +++ b/log/block_queue.h @@ -176,8 +176,9 @@ class block_queue m_mutex.lock(); if (m_size <= 0) { - t.tv_sec = now.tv_sec + ms_timeout / 1000; - t.tv_nsec = (ms_timeout % 1000) * 1000; + long micro_timeout = ms_timeout * 1000 + now.tv_usec; + t.tv_sec = now.tv_sec + micro_timeout / 1e6; + t.tv_nsec = (micro_timeout % 1e6) * 1000; if (!m_cond.timewait(m_mutex.get(), t)) { m_mutex.unlock(); From 6566e2fa3a303f89285bc3ee1807cbde25ade4fe Mon Sep 17 00:00:00 2001 From: zwh <2532175300@qq.com> Date: Sat, 2 Mar 2024 04:55:11 +0000 Subject: [PATCH 2/3] fix: blockqueue constant --- log/block_queue.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/log/block_queue.h b/log/block_queue.h index c17eaeb3..05c7f4bb 100644 --- a/log/block_queue.h +++ b/log/block_queue.h @@ -177,8 +177,8 @@ class block_queue if (m_size <= 0) { long micro_timeout = ms_timeout * 1000 + now.tv_usec; - t.tv_sec = now.tv_sec + micro_timeout / 1e6; - t.tv_nsec = (micro_timeout % 1e6) * 1000; + t.tv_sec = now.tv_sec + micro_timeout / 1000000; + t.tv_nsec = (micro_timeout % 1000000) * 1000; if (!m_cond.timewait(m_mutex.get(), t)) { m_mutex.unlock(); From 7ae94d332a927a1284e1e8ca3a60dc8d7e987227 Mon Sep 17 00:00:00 2001 From: zwh <2532175300@qq.com> Date: Sat, 2 Mar 2024 04:55:58 +0000 Subject: [PATCH 3/3] feat:add time_wheel_timer --- timer/README.md | 1 + timer/time_wheel_timer.cpp | 220 +++++++++++++++++++++++++++++++++++++ timer/time_wheel_timer.h | 112 +++++++++++++++++++ 3 files changed, 333 insertions(+) create mode 100644 timer/time_wheel_timer.cpp create mode 100644 timer/time_wheel_timer.h diff --git a/timer/README.md b/timer/README.md index ad1f7b81..551f4af9 100644 --- a/timer/README.md +++ b/timer/README.md @@ -4,4 +4,5 @@ 由于非活跃连接占用了连接资源,严重影响服务器的性能,通过实现一个服务器定时器,处理这种非活跃连接,释放连接资源。利用alarm函数周期性地触发SIGALRM信号,该信号的信号处理函数利用管道通知主循环执行定时器链表上的定时任务. > * 统一事件源 > * 基于升序链表的定时器 +> * 基于时间轮的定时器 > * 处理非活动连接 diff --git a/timer/time_wheel_timer.cpp b/timer/time_wheel_timer.cpp new file mode 100644 index 00000000..9f7c13eb --- /dev/null +++ b/timer/time_wheel_timer.cpp @@ -0,0 +1,220 @@ +#include "time_wheel_timer.h" +#include "../http/http_conn.h" + +time_wheel::time_wheel() : cur_slot(0) +{ + for(int i = 0; i < N; ++i) + { + slots[i] = NULL; + } +} +time_wheel::~time_wheel() +{ + for(int i = 0; i < N; ++i) + { + util_timer *tmp = slots[i]; + while (tmp) + { + slots[i] = tmp->next; + delete tmp; + tmp = slots[i]; + } + } + +} + +void time_wheel::add_timer(util_timer *timer) +{ + if (!timer) + { + return; + } + switch_time(timer); + int ts = timer->time_slot; + if(!slots[ts]) + { + slots[ts] = timer; + } + else + { + timer->next = slots[ts]; + slots[ts]->prev = timer; + slots[ts] = timer; + } +} + +void time_wheel::adjust_timer(util_timer *timer) +{ + if (!timer) + { + return; + } + int ots = timer->time_slot; + switch_time(timer); + if(timer->time_slot == ots) + { + return; + } + if(slots[ots] == timer) + { + slots[ots] = timer->next; + if(slots[ots]) + { + slots[ots]->prev = NULL; + } + } + else + { + timer->prev->next = timer->next; + if(timer->next) + { + timer->next->prev = timer->prev; + } + timer->prev = NULL; + } + timer->next = NULL; + add_timer(timer); +} +void time_wheel::del_timer(util_timer *timer) +{ + if (!timer) + { + return; + } + int ts = timer->time_slot; + if(timer == slots[ts]) + { + slots[ts] = slots[ts]->next; + if(slots[ts]) + { + slots[ts]->prev = NULL; + } + } + else + { + timer->prev->next = timer->next; + if(timer->next) + { + timer->next->prev = timer->prev; + } + } + delete timer; +} +void time_wheel::tick() +{ + + for(int i = 0; i < timeslot / SI; i++) + { + util_timer *tmp = slots[cur_slot]; + while(tmp) + { + if(tmp->rotation > 0) + { + tmp->rotation--; + tmp = tmp->next; + } + else + { + tmp->cb_func(tmp->user_data); + util_timer *tmp2 = tmp->next; + del_timer(tmp); + tmp = tmp2; + } + } + cur_slot = ++cur_slot % N; + } +} + +void time_wheel::set_timeslot(int timeslot) +{ + this->timeslot = timeslot; +} + +void time_wheel::switch_time(util_timer *timer) +{ + time_t cur = time(NULL); + int timeout = timer->expire - cur; + if(timeout < 0)timeout = 0; + timeout /= SI; + timer->rotation = timeout / N; + timer->time_slot = (cur_slot + (timeout % N)) % N; +} + +void Utils::init(int timeslot) +{ + m_TIMESLOT = timeslot; + m_timer_lst.set_timeslot(timeslot); +} + +//对文件描述符设置非阻塞 +int Utils::setnonblocking(int fd) +{ + int old_option = fcntl(fd, F_GETFL); + int new_option = old_option | O_NONBLOCK; + fcntl(fd, F_SETFL, new_option); + return old_option; +} + +//将内核事件表注册读事件,ET模式,选择开启EPOLLONESHOT +void Utils::addfd(int epollfd, int fd, bool one_shot, int TRIGMode) +{ + epoll_event event; + event.data.fd = fd; + + if (1 == TRIGMode) + event.events = EPOLLIN | EPOLLET | EPOLLRDHUP; + else + event.events = EPOLLIN | EPOLLRDHUP; + + if (one_shot) + event.events |= EPOLLONESHOT; + epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event); + setnonblocking(fd); +} + +//信号处理函数 +void Utils::sig_handler(int sig) +{ + //为保证函数的可重入性,保留原来的errno + int save_errno = errno; + int msg = sig; + send(u_pipefd[1], (char *)&msg, 1, 0); + errno = save_errno; +} + +//设置信号函数 +void Utils::addsig(int sig, void(handler)(int), bool restart) +{ + struct sigaction sa; + memset(&sa, '\0', sizeof(sa)); + sa.sa_handler = handler; + if (restart) + sa.sa_flags |= SA_RESTART; + sigfillset(&sa.sa_mask); + assert(sigaction(sig, &sa, NULL) != -1); +} + +//定时处理任务,重新定时以不断触发SIGALRM信号 +void Utils::timer_handler() +{ + m_timer_lst.tick(); + alarm(m_TIMESLOT); +} + +void Utils::show_error(int connfd, const char *info) +{ + send(connfd, info, strlen(info), 0); + close(connfd); +} + +int *Utils::u_pipefd = 0; +int Utils::u_epollfd = 0; + +class Utils; +void cb_func(client_data *user_data) +{ + epoll_ctl(Utils::u_epollfd, EPOLL_CTL_DEL, user_data->sockfd, 0); + assert(user_data); + close(user_data->sockfd); + http_conn::m_user_count--; +} diff --git a/timer/time_wheel_timer.h b/timer/time_wheel_timer.h new file mode 100644 index 00000000..6e0b44b2 --- /dev/null +++ b/timer/time_wheel_timer.h @@ -0,0 +1,112 @@ +#ifndef TIME_WHEEL_TIMER_H +#define TIME_WHEEL_TIMER_H + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include "../log/log.h" + +class util_timer; + +struct client_data +{ + sockaddr_in address; + int sockfd; + util_timer *timer; +}; + +class util_timer +{ +public: + util_timer() : prev(NULL), next(NULL) {} + +public: + time_t expire; + int rotation; + int time_slot; + + void (* cb_func)(client_data *); + client_data *user_data; + util_timer *prev; + util_timer *next; +}; + +class time_wheel +{ +public: + time_wheel(); + ~time_wheel(); + + void add_timer(util_timer *timer); + void adjust_timer(util_timer *timer); + void del_timer(util_timer *timer); + void tick(); + void set_timeslot(int timeslot); + +private: + void switch_time(util_timer *timer); + /* 时间轮上槽的数目 */ + static const int N = 60; + /* 槽的间隔 1 s */ + static const int SI = 1; + /* 每次转动的秒数 */ + int timeslot; + util_timer *slots[N]; + int cur_slot; +}; + + + +class Utils +{ +public: + Utils() {} + ~Utils() {} + + void init(int timeslot); + + //对文件描述符设置非阻塞 + int setnonblocking(int fd); + + //将内核事件表注册读事件,ET模式,选择开启EPOLLONESHOT + void addfd(int epollfd, int fd, bool one_shot, int TRIGMode); + + //信号处理函数 + static void sig_handler(int sig); + + //设置信号函数 + void addsig(int sig, void(handler)(int), bool restart = true); + + //定时处理任务,重新定时以不断触发SIGALRM信号 + void timer_handler(); + + void show_error(int connfd, const char *info); + +public: + static int *u_pipefd; + time_wheel m_timer_lst; + static int u_epollfd; + int m_TIMESLOT; +}; + +void cb_func(client_data *user_data); + +#endif // TIME_WHEEL_TIMER_H \ No newline at end of file