Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: blockqueue pop timeout bug #259

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions log/block_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,9 @@ class block_queue
m_mutex.lock();
if (m_size <= 0)
{
t.tv_sec = now.tv_sec + ms_timeout / 1000;
t.tv_nsec = (ms_timeout % 1000) * 1000;
long micro_timeout = ms_timeout * 1000 + now.tv_usec;
t.tv_sec = now.tv_sec + micro_timeout / 1000000;
t.tv_nsec = (micro_timeout % 1000000) * 1000;
if (!m_cond.timewait(m_mutex.get(), t))
{
m_mutex.unlock();
Expand Down
1 change: 1 addition & 0 deletions timer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@
由于非活跃连接占用了连接资源,严重影响服务器的性能,通过实现一个服务器定时器,处理这种非活跃连接,释放连接资源。利用alarm函数周期性地触发SIGALRM信号,该信号的信号处理函数利用管道通知主循环执行定时器链表上的定时任务.
> * 统一事件源
> * 基于升序链表的定时器
> * 基于时间轮的定时器
> * 处理非活动连接
220 changes: 220 additions & 0 deletions timer/time_wheel_timer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
#include "time_wheel_timer.h"
#include "../http/http_conn.h"

time_wheel::time_wheel() : cur_slot(0)
{
for(int i = 0; i < N; ++i)
{
slots[i] = NULL;
}
}
time_wheel::~time_wheel()
{
for(int i = 0; i < N; ++i)
{
util_timer *tmp = slots[i];
while (tmp)
{
slots[i] = tmp->next;
delete tmp;
tmp = slots[i];
}
}

}

void time_wheel::add_timer(util_timer *timer)
{
if (!timer)
{
return;
}
switch_time(timer);
int ts = timer->time_slot;
if(!slots[ts])
{
slots[ts] = timer;
}
else
{
timer->next = slots[ts];
slots[ts]->prev = timer;
slots[ts] = timer;
}
}

void time_wheel::adjust_timer(util_timer *timer)
{
if (!timer)
{
return;
}
int ots = timer->time_slot;
switch_time(timer);
if(timer->time_slot == ots)
{
return;
}
if(slots[ots] == timer)
{
slots[ots] = timer->next;
if(slots[ots])
{
slots[ots]->prev = NULL;
}
}
else
{
timer->prev->next = timer->next;
if(timer->next)
{
timer->next->prev = timer->prev;
}
timer->prev = NULL;
}
timer->next = NULL;
add_timer(timer);
}
void time_wheel::del_timer(util_timer *timer)
{
if (!timer)
{
return;
}
int ts = timer->time_slot;
if(timer == slots[ts])
{
slots[ts] = slots[ts]->next;
if(slots[ts])
{
slots[ts]->prev = NULL;
}
}
else
{
timer->prev->next = timer->next;
if(timer->next)
{
timer->next->prev = timer->prev;
}
}
delete timer;
}
void time_wheel::tick()
{

for(int i = 0; i < timeslot / SI; i++)
{
util_timer *tmp = slots[cur_slot];
while(tmp)
{
if(tmp->rotation > 0)
{
tmp->rotation--;
tmp = tmp->next;
}
else
{
tmp->cb_func(tmp->user_data);
util_timer *tmp2 = tmp->next;
del_timer(tmp);
tmp = tmp2;
}
}
cur_slot = ++cur_slot % N;
}
}

void time_wheel::set_timeslot(int timeslot)
{
this->timeslot = timeslot;
}

void time_wheel::switch_time(util_timer *timer)
{
time_t cur = time(NULL);
int timeout = timer->expire - cur;
if(timeout < 0)timeout = 0;
timeout /= SI;
timer->rotation = timeout / N;
timer->time_slot = (cur_slot + (timeout % N)) % N;
}

void Utils::init(int timeslot)
{
m_TIMESLOT = timeslot;
m_timer_lst.set_timeslot(timeslot);
}

//对文件描述符设置非阻塞
int Utils::setnonblocking(int fd)
{
int old_option = fcntl(fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
return old_option;
}

//将内核事件表注册读事件,ET模式,选择开启EPOLLONESHOT
void Utils::addfd(int epollfd, int fd, bool one_shot, int TRIGMode)
{
epoll_event event;
event.data.fd = fd;

if (1 == TRIGMode)
event.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
else
event.events = EPOLLIN | EPOLLRDHUP;

if (one_shot)
event.events |= EPOLLONESHOT;
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
setnonblocking(fd);
}

//信号处理函数
void Utils::sig_handler(int sig)
{
//为保证函数的可重入性,保留原来的errno
int save_errno = errno;
int msg = sig;
send(u_pipefd[1], (char *)&msg, 1, 0);
errno = save_errno;
}

//设置信号函数
void Utils::addsig(int sig, void(handler)(int), bool restart)
{
struct sigaction sa;
memset(&sa, '\0', sizeof(sa));
sa.sa_handler = handler;
if (restart)
sa.sa_flags |= SA_RESTART;
sigfillset(&sa.sa_mask);
assert(sigaction(sig, &sa, NULL) != -1);
}

//定时处理任务,重新定时以不断触发SIGALRM信号
void Utils::timer_handler()
{
m_timer_lst.tick();
alarm(m_TIMESLOT);
}

void Utils::show_error(int connfd, const char *info)
{
send(connfd, info, strlen(info), 0);
close(connfd);
}

int *Utils::u_pipefd = 0;
int Utils::u_epollfd = 0;

class Utils;
void cb_func(client_data *user_data)
{
epoll_ctl(Utils::u_epollfd, EPOLL_CTL_DEL, user_data->sockfd, 0);
assert(user_data);
close(user_data->sockfd);
http_conn::m_user_count--;
}
112 changes: 112 additions & 0 deletions timer/time_wheel_timer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
#ifndef TIME_WHEEL_TIMER_H
#define TIME_WHEEL_TIMER_H

#include <unistd.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <sys/stat.h>
#include <string.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <stdarg.h>
#include <errno.h>
#include <sys/wait.h>
#include <sys/uio.h>

#include <time.h>
#include "../log/log.h"

class util_timer;

struct client_data
{
sockaddr_in address;
int sockfd;
util_timer *timer;
};

class util_timer
{
public:
util_timer() : prev(NULL), next(NULL) {}

public:
time_t expire;
int rotation;
int time_slot;

void (* cb_func)(client_data *);
client_data *user_data;
util_timer *prev;
util_timer *next;
};

class time_wheel
{
public:
time_wheel();
~time_wheel();

void add_timer(util_timer *timer);
void adjust_timer(util_timer *timer);
void del_timer(util_timer *timer);
void tick();
void set_timeslot(int timeslot);

private:
void switch_time(util_timer *timer);
/* 时间轮上槽的数目 */
static const int N = 60;
/* 槽的间隔 1 s */
static const int SI = 1;
/* 每次转动的秒数 */
int timeslot;
util_timer *slots[N];
int cur_slot;
};



class Utils
{
public:
Utils() {}
~Utils() {}

void init(int timeslot);

//对文件描述符设置非阻塞
int setnonblocking(int fd);

//将内核事件表注册读事件,ET模式,选择开启EPOLLONESHOT
void addfd(int epollfd, int fd, bool one_shot, int TRIGMode);

//信号处理函数
static void sig_handler(int sig);

//设置信号函数
void addsig(int sig, void(handler)(int), bool restart = true);

//定时处理任务,重新定时以不断触发SIGALRM信号
void timer_handler();

void show_error(int connfd, const char *info);

public:
static int *u_pipefd;
time_wheel m_timer_lst;
static int u_epollfd;
int m_TIMESLOT;
};

void cb_func(client_data *user_data);

#endif // TIME_WHEEL_TIMER_H