admin 管理员组

文章数量: 887031

定时器

时间轮:
时间论定时器示意图

如图所示的时间轮内,实线(指针)指向轮子上的一个槽(slot)。它以恒定的速度顺时针转动,每转动一步就指向下一个槽(虚线指针指向的槽),每次转动称为一个滴答(tick)。一个滴答的时间称为时间论的槽间隔si(slot interval),它实际上是心搏时间。该时间共有N个槽,因此他运转一周的时间是N(N=7)si。每个槽指向一条定时器链表,每条链表上的定时器具有相同的特征:它们的定时时间相差Nsi的整数倍。时间轮正是利用这个关系将定时器散列到不同的链表中。加入现在指针指向槽cs,我们要添加一个定时时间ti的定时器,则该定时器将被插入槽ts(time slot)对应的链表中:ts=(cs+(ti/si))%N.
基于排序链表的定时器使用唯一的一条链表来管理所有定时器,所以插入的效率随着定时器数目的增多而降低。而时间论正是使用哈希表的思想,将定时器散落到不同的链表上。这样每条链表上的定时器数目都明显少于原来的定时器数目,插入操作的效率基本不受定时器数目的影响。
很显然,对时间轮而言,要提高定时精度,就要使si值足够小;要提高执行效率,就要N值足够大。
图所示的是一种简单的时间论,因为他只有一个轮子。而复杂的时间论可能有多个轮子,不同的轮子拥有不同的粒度。相邻的两个轮子,精度高的转一圈,精度低的仅往前移动一槽,就像水表一样。如下是时间轮的代码

tw_timer.h

#ifndef TIME_WHEEL_TIMER
#define TIME_WHEEL_TIMER#include <time.h>
#include <netinet/in.h>
#include <stdio.h>#define BUFFER_SIZE 64
class tw_timer;
struct client_data
{sockaddr_in address;int sockfd;char buf[ BUFFER_SIZE ];tw_timer* timer;
};class tw_timer
{
public:tw_timer( int rot, int ts ) : next( NULL ), prev( NULL ), rotation( rot ), time_slot( ts ){}public:int rotation;int time_slot;void (*cb_func)( client_data* );client_data* user_data;tw_timer* next;tw_timer* prev;
};class time_wheel
{
public:time_wheel() : cur_slot( 0 ){for( int i = 0; i < N; ++i ){slots[i] = NULL;}}~time_wheel(){for( int i = 0; i < N; ++i ){tw_timer* tmp = slots[i];while( tmp ){slots[i] = tmp->next;delete tmp;tmp = slots[i];}}}tw_timer* add_timer( int timeout ){if( timeout < 0 ){return NULL;}int ticks = 0;if( timeout < TI ){ticks = 1;}else{ticks = timeout / TI;}int rotation = ticks / N;int ts = ( cur_slot + ( ticks % N ) ) % N;tw_timer* timer = new tw_timer( rotation, ts );if( !slots[ts] ){printf( "add timer, rotation is %d, ts is %d, cur_slot is %d\n", rotation, ts, cur_slot );slots[ts] = timer;}else{timer->next = slots[ts];slots[ts]->prev = timer;slots[ts] = timer;}return timer;}void del_timer( tw_timer* timer ){if( !timer ){return;}int ts = timer->time_slot;if( timer == slots[ts] ){slots[ts] = slots[ts]->next;if( slots[ts] ){slots[ts]->prev = NULL;}delete timer;}else{timer->prev->next = timer->next;if( timer->next ){timer->next->prev = timer->prev;}delete timer;}}void tick(){tw_timer* tmp = slots[cur_slot];printf( "current slot is %d\n", cur_slot );while( tmp ){printf( "tick the timer once\n" );if( tmp->rotation > 0 ){tmp->rotation--;tmp = tmp->next;}else{tmp->cb_func( tmp->user_data );if( tmp == slots[cur_slot] ){printf( "delete header in cur_slot\n" );slots[cur_slot] = tmp->next;delete tmp;if( slots[cur_slot] ){slots[cur_slot]->prev = NULL;}tmp = slots[cur_slot];}else{tmp->prev->next = tmp->next;if( tmp->next ){tmp->next->prev = tmp->prev;}tw_timer* tmp2 = tmp->next;delete tmp;tmp = tmp2;}}}cur_slot = ++cur_slot % N;}private:static const int N = 4;static const int TI = 1; tw_timer* slots[N];int cur_slot;
};#endif

登录一个连接main.cpp

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <signal.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>
#include "tw_timer.h"#define FD_LIMIT 65535
#define MAX_EVENT_NUMBER 1024
#define TIMESLOT 60static int pipefd[2];
static time_wheel timer_lst;
static int epollfd = 0;
int to=2;
int setnonblocking( int fd )
{int old_option = fcntl( fd, F_GETFL );int new_option = old_option | O_NONBLOCK;fcntl( fd, F_SETFL, new_option );return old_option;
}void addfd( int epollfd, int fd )
{epoll_event event;event.data.fd = fd;event.events = EPOLLIN | EPOLLET;epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );setnonblocking( fd );
}void sig_handler( int sig )
{int save_errno = errno;int msg = sig;send( pipefd[1], ( char* )&msg, 1, 0 );errno = save_errno;
}
//就是如果有信号,就会触发这个函数
void addsig( int sig )
{struct sigaction sa;memset( &sa, '\0', sizeof( sa ) );sa.sa_handler = sig_handler;sa.sa_flags |= SA_RESTART;//SA_RESTART如果信号中断了进程的某个系统调用,则系统自动重启该系统调用sigfillset( &sa.sa_mask );assert( sigaction( sig, &sa, NULL ) != -1 );
}void timer_handler()
{timer_lst.tick();alarm( TIMESLOT );
}void cb_func( client_data* user_data )
{epoll_ctl( epollfd, EPOLL_CTL_DEL, user_data->sockfd, 0 );assert( user_data );close( user_data->sockfd );printf( "close fd %d\n", user_data->sockfd );
}int main( int argc, char* argv[] )
{if( argc <= 2 ){printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );return 1;}const char* ip = argv[1];int port = atoi( argv[2] );int ret = 0;struct sockaddr_in address;bzero( &address, sizeof( address ) );address.sin_family = AF_INET;//inet_pton( AF_INET, ip, &address.sin_addr );address.sin_addr.s_addr = htonl(INADDR_ANY);address.sin_port = htons( port );int listenfd = socket( PF_INET, SOCK_STREAM, 0 );assert( listenfd >= 0 );ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );assert( ret != -1 );ret = listen( listenfd, 5 );assert( ret != -1 );epoll_event events[ MAX_EVENT_NUMBER ];int epollfd = epoll_create( 5 );assert( epollfd != -1 );addfd( epollfd, listenfd );ret = socketpair( PF_UNIX, SOCK_STREAM, 0, pipefd );assert( ret != -1 );setnonblocking( pipefd[1] );addfd( epollfd, pipefd[0] );// add all the interesting signals hereaddsig( SIGALRM );addsig( SIGTERM );bool stop_server = false;client_data* users = new client_data[FD_LIMIT]; bool timeout = false;alarm( TIMESLOT );while( !stop_server ){int number = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );if ( ( number < 0 ) && ( errno != EINTR ) ){printf( "epoll failure\n" );break;}for ( int i = 0; i < number; i++ ){int sockfd = events[i].data.fd;if( sockfd == listenfd ){struct sockaddr_in client_address;socklen_t client_addrlength = sizeof( client_address );int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );addfd( epollfd, connfd );users[connfd].address = client_address;users[connfd].sockfd = connfd;//记录定时器再时间轮转8圈后生效,记录定时器属于第四个槽//因为我不知道怎么设置,只能先尝试,之后再改改//tw_timer* timer = new tw_timer(rot,ts);// timer->user_data = &users[connfd];// timer->cb_func = cb_func;// time_t cur = time( NULL );// time_t timeout = cur + 3 * TIMESLOT;//users[connfd].timer = timer;tw_timer* timer=timer_lst.add_timer( to++ );timer->user_data = &users[connfd];timer->cb_func = cb_func;users[connfd].timer = timer;}else if( ( sockfd == pipefd[0] ) && ( events[i].events & EPOLLIN ) ){int sig;char signals[1024];ret = recv( pipefd[0], signals, sizeof( signals ), 0 );if( ret == -1 ){// handle the errorcontinue;}else if( ret == 0 ){continue;}else{for( int i = 0; i < ret; ++i ){switch( signals[i] ){case SIGALRM:{timeout = true;break;}case SIGTERM:{stop_server = true;}}}}}else if(  events[i].events & EPOLLIN ){memset( users[sockfd].buf, '\0', BUFFER_SIZE );ret = recv( sockfd, users[sockfd].buf, BUFFER_SIZE-1, 0 );printf( "get %d bytes of client data %s from %d\n", ret, users[sockfd].buf, sockfd );tw_timer* timer = users[sockfd].timer;if( ret < 0 ){if( errno != EAGAIN ){cb_func( &users[sockfd] );if( timer ){timer_lst.del_timer( timer );}}}else if( ret == 0 ){cb_func( &users[sockfd] );if( timer ){timer_lst.del_timer( timer );}}else{send( sockfd, users[sockfd].buf, BUFFER_SIZE-1, 0 );if( timer ){time_t cur = time( NULL );//timer->expire = cur + 3 * TIMESLOT;printf( "adjust timer once\n" );//timer_lst.adjust_timer( timer );}}}else{// others}}//timeout=true;if( timeout ){timer_handler();timeout = false;}}close( listenfd );close( pipefd[1] );close( pipefd[0] );delete [] users;return 0;
}

本文标签: 定时器