节点类
相比较时间升序链表中的绝对时间expire
tw_timer采用的是相对时间的概念也就多出
rotation ,time_slot俩属性,rotation是转的“圈数”,time_slot是转到时间轮的哪一块区域。好比现在在1:05,过了65分钟时针走了闹钟的1rotation并走到10 time_slot;
tw_timer.h
//
// Created by adl on 2020/7/22.
//
#ifndef TEST2_TW_TIMER_H
#define TEST2_TW_TIMER_H
class client_data ;
class tw_timer {
public:
tw_timer(int rot,int ts);
public:
int rotation;
int time_slot;
void(*cb_func)(client_data*);
client_data*user_data;
tw_timer*next;
tw_timer*prev;
};
#endif //TEST2_TW_TIMER_H
tw_timer.cpp
//
// Created by adl on 2020/7/22.
//
#include "tw_timer.h"
tw_timer::tw_timer(int rot, int ts) :next(nullptr),prev(nullptr),rotation(rot),time_slot(ts){
}
client_data.h
//
// Created by adl on 2020/7/22.
//
#ifndef TEST2_CLIENT_DATA_H
#define TEST2_CLIENT_DATA_H
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <libgen.h>
#include <stdlib.h>
#include <errno.h>
#define BUFFER_SIZE 64
//class util_timer;
class tw_timer;
//class heap_timer;
struct client_data {
sockaddr_in address;
int sockfd;
char buf[BUFFER_SIZE];
union {
tw_timer *timer;
// util_timer *timer;
// heap_timer*timer3;
};
};
#endif //TEST2_CLIENT_DATA_H
time_wheel.cpp
可以注意到这个时间轮类的接口和时间升序链表的接口基本一致
//
// Created by adl on 2020/7/22.
//
#ifndef TEST2_TIME_WHEEL_H
#define TEST2_TIME_WHEEL_H
class tw_timer;
class time_wheel {
public:
time_wheel();
virtual ~time_wheel();
tw_timer *add_timer(int timeout);
void del_timer(tw_timer *timer);
void tick();
tw_timer *adjust_timer(tw_timer *timer, int timeout);
private:
static const int N= 60;//一圈60块区域
static const int SI =1;//SI可以想象成时针或者分钟每次转动的幅度
tw_timer*slots[N];
int cur_slot;//当前所在区域
};
#endif //TEST2_TIME_WHEEL_H
time_wheel.cpp
//
// Created by adl on 2020/7/22.
//
#include <cstdio>
#include "time_wheel.h"
#include "tw_timer.h"
time_wheel::~time_wheel() {
for (int i = 0; i < N; ++i) {
tw_timer *tmp = slots[i];
while (tmp) {
slots[i] = tmp->next;
delete tmp;
tmp = slots[i];
}
}
}
time_wheel::time_wheel() : cur_slot(0) {
for (int i = 0; i < N; ++i) {
slots[i] = nullptr;
}
}
tw_timer *time_wheel::add_timer(int timeout) {
if (timeout < 0) {
return nullptr;
}
int ticks = 0;
if (timeout < SI) {
//这个水表一次只能跳5小格,那当然定时3小格至少要时针跳一次
ticks = 1;
} else {
ticks = timeout / SI;//跳的次数
}
int rotation = ticks / N;//跳的圈数
int ts = (cur_slot + (ticks % N)) % N;//跳的最终位置(时间)
//这里它将timer在timer_wheel内部动态分配了,这也就导致了使用接口和时间升序链表有所区别,见后文
tw_timer *timer = new tw_timer(rotation, ts);
if (!slots[ts]) {
printf("ticks =%d ,timeout=%d\n",ticks,timeout);
printf("add timer,rotation is %d,ts is %d,cur_slot is %d\n", rotation, ts, cur_slot);
slots[ts] = timer;
} else {
timer->next = slots[ts];
slots[ts]->prev = timer;
slots[ts] = timer;
}
return timer;
}
//这个adjust_timer函数其实书上没有,因为和时间升序链表的接口比较相似,我就把节点摘下来,再重新放到时间轮上=del_timer+add_timer,也就是用于客户在发送消息之后我们重新设置超时的需求
tw_timer *time_wheel::adjust_timer(tw_timer *timer, int timeout) {
if (timeout < 0) {
return nullptr;
}
int ots = timer->time_slot;
if (timer == slots[ots]) {
slots[ots] = slots[ots]->next;
if (slots[ots]) {
slots[ots]->prev = nullptr;
}
} else {
timer->prev->next = timer->next;
if (timer->next) {
timer->next->prev = timer->prev;
}
}
int ticks = 0;
if (timeout < SI) {
ticks = 1;
} else {
ticks = timeout / SI;
}
int rotation = ticks / N;
int ts = (cur_slot + (ticks % N)) % N;
timer->time_slot = ts;
timer->rotation = rotation;
if (!slots[ts]) {
printf("add timer,rotation is %d,ts is %d,cur_slot is %d\n", rotation, ts, cur_slot);
slots[ts] = timer;
} else {
timer->next = slots[ts];
slots[ts]->prev = timer;
slots[ts] = timer;
}
return timer;
}
void time_wheel::del_timer(tw_timer *timer) {
if (!timer) {
return;
}
int ts = timer->time_slot;
if (timer == slots[ts]) {
slots[ts] = slots[ts]->next;
if (slots[ts]) {
slots[ts]->prev = nullptr;
}
delete timer;
} else {
timer->prev->next = timer->next;
if (timer->next) {
timer->next->prev = timer->prev;
}
delete timer;
}
}
void time_wheel::tick() {
tw_timer *tmp = slots[cur_slot];
printf("current slot is %d\n", cur_slot);
while (tmp) {
printf("tick the timer once\n");
//因为时针转过一圈才会回到相同的cur_slot,所以每调用tick且轮到相同的cur_slot时,将rotation--直到0为止
if (tmp->rotation > 0) {
tmp->rotation--;
tmp = tmp->next;//换到这个区域内链表的下一个节点,查看他它是否超时
} else {
//超时则调用函数,删除节点
tmp->cb_func(tmp->user_data);
if (tmp == slots[cur_slot]) {
printf("delete header in cur_slot\n");
slots[cur_slot] = tmp->next;
delete tmp;
if (slots[cur_slot]) {
slots[cur_slot]->prev = nullptr;
}
tmp = slots[cur_slot];
} else {
tmp->prev->next = tmp->next;
if (tmp->next) {
tmp->next->prev = tmp->prev;
}
tw_timer *tmp2 = tmp->next;
delete tmp;
tmp = tmp2;
}
}
}
//时针转1格
cur_slot = ++cur_slot % N;
}
Handleinactiveconnections.cpp
书上说让我们自己实现一下这个,但是时间升序链表中操作略有不同(没在用户空间动态分配节点类,并且直接使用相对时间在时间轮上添加新定时器,更为清晰方便)
//
// Created by adl on 2020/7/22.
//
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <libgen.h>
#include <stdlib.h>
#include <errno.h>
#include <iostream>
//#include "sort_timer_lst.h"
//#include "util_timer.h"
#include "client_data.h"
#include "tw_timer.h"
#include "time_wheel.h"
#define FD_LIMIT 65535
#define MAX_EVENT_NUMBER 1024
#define TIMESLOT 1
static int pipefd[2];
static time_wheel timeWheel;
//static sort_timer_lst timerLst;
static int epollfd = 0;
int setnonblocking(int fd) {
int old_option = fcntl(fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
return old_option;
}
void addfd(int epollfd, int fd) {
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET;
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
setnonblocking(fd);
}
void sig_handler(int sig) {
int save_errno = errno;
int msg = sig;
send(pipefd[1], (char *) &msg, 1, 0);
errno = save_errno;
}
void addsig(int sig) {
struct sigaction sa;
memset(&sa, '\0', sizeof(sa));
sa.sa_handler = sig_handler;
sa.sa_flags |= SA_RESTART;
sigfillset(&sa.sa_mask);
assert(sigaction(sig, &sa, NULL) != -1);
}
void cb_func(client_data *user_data) {
assert(user_data);
epoll_ctl(epollfd, EPOLL_CTL_DEL, user_data->sockfd, 0);
close(user_data->sockfd);
printf("close fd %d\n", user_data->sockfd);
}
void timer_handler() {
timeWheel.tick();
// timerLst.tick();
alarm(TIMESLOT);
}
int main(int argc, char *argv[]) {
if (argc <= 2) {
printf("usage: %s ip_address port_number \n", basename(argv[0]));
return 1;
}
const char *ip = argv[1];
int port = atoi(argv[2]);
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
inet_pton(AF_INET, ip, &address.sin_addr);
address.sin_port = htons(port);
int listenfd = socket(PF_INET, SOCK_STREAM, 0);
assert(listenfd >= 0);
int op=1;
setsockopt(listenfd,SOL_SOCKET,SO_REUSEADDR,&op,sizeof(op));
int ret = bind(listenfd, (struct sockaddr *) &address, sizeof(address));
assert(ret != -1);
ret = listen(listenfd, 5);
assert(ret != -1);
epoll_event events[MAX_EVENT_NUMBER];
int epollfd = epoll_create(5);
assert(epollfd != -1);
addfd(epollfd, listenfd);
ret = socketpair(PF_UNIX, SOCK_STREAM, 0, pipefd);
assert(ret != -1);
setnonblocking(pipefd[1]);
addfd(epollfd, pipefd[0]);
addsig(SIGALRM);
addsig(SIGTERM);
bool stop_server = false;
client_data *users = new client_data[FD_LIMIT];
bool timeout = false;
alarm(TIMESLOT);
while (!stop_server) {
int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
if ((number < 0) && (errno != EINTR)) {
printf("epoll failure\n");
break;
}
// std::cout<<number<<std::endl;
for (int i = 0; i < number; ++i) {
int sockfd = events[i].data.fd;
for (int i = 0; i < number; ++i) {
int sockfd = events[i].data.fd;
if (sockfd == listenfd) {
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(listenfd, (struct sockaddr *) &client_address, &client_addrlength);
addfd(epollfd, connfd);
users[connfd].address = client_address;
users[connfd].sockfd = connfd;
// util_timer *timer = new util_timer;
// tw_timer*timer =new tw_timer;
//我们无需在接口外分配节点类
// time_t cur = time(nullptr);
//此时超时时间也变成了相对时间
auto timer = timeWheel.add_timer(/*cur+*/3*TIMESLOT);
timer->user_data = &users[connfd];
timer->cb_func = cb_func;
// timer->expire = cur + 3 * TIMESLOT;
users[connfd].timer = timer;
// timerLst.add_timer(timer);
} else if ((sockfd == pipefd[0]) && (events[i].events & EPOLLIN)) {
int sig;
char signals[1024];
ret = recv(pipefd[0], signals, sizeof(signals), 0);
if (ret == -1) {
continue;
} else if (ret == 0) {
continue;
} else {
for (int i = 0; i < ret; ++i) {
switch (signals[i]) {
case SIGALRM: {
timeout = true;
break;
}
case SIGTERM: {
stop_server = true;
}
}
}
}
} else if (events[i].events & EPOLLIN) {
memset(users[sockfd].buf, '\0', BUFFER_SIZE);
ret = recv(sockfd, users[sockfd].buf, BUFFER_SIZE - 1, 0);
printf("get %d bytes of client data %s from %d\n", ret, users[sockfd].buf, sockfd);
tw_timer/*util_timer*/ *timer = users[sockfd].timer;
if (ret < 0) {
if (errno != EAGAIN) {
cb_func(&users[sockfd]);
if (timer) {
// timerLst.del_timer(timer);
timeWheel.del_timer(timer);
}
}
} else if (ret == 0) {
cb_func(&users[sockfd]);
if (timer) {
timeWheel.del_timer(timer);
// timerLst.del_timer(timer);
}
} else {
if (timer) {
// time_t cur = time(nullptr);
// timer->expire = cur + 3 * TIMESLOT;
timeWheel.adjust_timer(timer,/*cur +*/ 3 * TIMESLOT);
printf("adjust timer once\n");
// timer->time_slot;
// timerLst.adjust_timer(timer);
}
}
} else {
}
}
if (timeout) {
timer_handler();
timeout = false;
}
}
}
close(listenfd);
close(pipefd[1]);
close(pipefd[0]);
delete[]users;
}
实现效果和时间升序链表类似
思考:为什么用了时间轮就可以使用相对时间了而升序链表不行呢?
因为时间轮中将时间节点“哈希”到每一块时间区域,并将相对时间100转换成了rotation(1)+slot(40)也就是时钟上的小时和分钟,从而使用者可以根据时钟上位置的rotation,slot的变化来判断真实时间的变化,而升序链表无法得知过去的相对时间,便只能使用绝对时间比较了。当然时间升序链表也可以存储绝对时间,但并没这个必要。
注意的是无论时间升序链表还是时间轮,他们如果调用的超时回调函数会占用一段时间,那么如果有多个节点超时并需要删除,他们的真实删除时间其实不是定时器所定的时间,而是稍微晚一些。