首先,如果你还不了解什么是带外数据:点这里
心跳机制的产生就是为了检测出对端主机或到对端的通信路径是否过早失效。
注意:在使用心跳机制时,你应该考虑是不是符合你所处的情景,确定在对端应答的时间超过 5~10s 之后终止连接是件好事还是坏事。如果你的产品需要实时的知道对端的“生存状态”,(要么是为了需求,要么是为了节省资源)那么就是需要这种机制的。一般用于 长连接 。
在这里,我们使用TCP
的带外数据来完成心跳机制的实现(每秒钟轮询一次,若5秒没有得到响应就认为对端已经“死亡”),实现如下所示 :
客户端每隔1秒钟向服务器发送一个带外字节,服务器收到该类型的字节然后再发送回一个带外字节。因为每一端都需要对端不复存在或者不再可达。需要指出的是:**数据,回送数据和带外字节都通过单个的连接交换的 。**代码实现如下,具体实现细节在代码中有注释指出
- Recvline 函数
#include "../myhead.h"
static int recv_cnt = 0;
static char *recv_ptr = NULL;
static char recv_buf[MAXLINE];
static ssize_t my_recv(int fd, char *ptr, int flags)
{
if (recv_cnt <= 0)
{
again:
if ((recv_cnt = recv(fd, recv_buf, sizeof(recv_buf), flags)) < 0)
{
if (errno == EINTR)
goto again;
else
return (-1);
}
else if (recv_cnt == 0)
{
return (0);
}
recv_ptr = recv_buf;
}
recv_cnt--;
*ptr = *recv_ptr++;
return (1);
}
ssize_t recvline(int fd, void *vptr, size_t maxlen, int flags)
{
ssize_t n, rc;
char c, *ptr;
ptr = vptr;
for (n = 1; n < maxlen; n++)
{
if ((rc = my_recv(fd, &c, flags)) == 1)
{
*ptr++ = c;
if (c == '\n')
break;
}
else if (rc == 0)
{
*ptr = 0;
return (n - 1);
}
else
return (-1);
}
*ptr = 0;
return (n);
}
ssize_t Recvline(int fd, void *buf, size_t Maxlen, int flags)
{ //注意参数 Maxlen
ssize_t n;
if ((n = recvline(fd, buf, Maxlen, flags)) < 0)
err_sys("recvline error ");
return (n);
}
将recv_buf
写做我们的一个缓冲区,recv_ptr
指向缓冲区下一个可被读取的字节,在my_recv
函数中复制给参数ptr
,表示读取到一个字节。recv_cnt
表示缓冲区剩余字节的数量 ,如果<=0
,就进行下一次的缓冲区读取。如果在需要一个字节一个字节的处理数据时,比起一个字节一个字节的读取,显然这样的方式是更有效率的 !!!
- 服务端主函数
serv_main.c
#include "../myhead.h"
#include "test.h"
void fun_serv(int connfd) //子进程运行函数
{
ssize_t n;
char line[MAXLINE];
heartbeat_serv(connfd, 1, 5);
for (;;)
{
if ((n = Recvline(connfd, line, MAXLINE, 0)) == 0)
{
printf("客 户 端 关 闭 啦 !!!\n");
Close(connfd);
return ;
}
Sendlen(connfd, line, n, 0);
}
}
int main(int argc, char **argv)
{
int listenfd, connfd;
pid_t childpid;
socklen_t clilen;
struct sockaddr_in cliaddr, servaddr;
listenfd = Socket(AF_INET, SOCK_STREAM, 0);
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(SERV_PORT); //9877
int opt = 1;
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, (int *)&opt, sizeof(int));
Bind(listenfd, (SA *)&servaddr, sizeof(servaddr));
Listen(listenfd, LISTENQ);
for (;;)
{
clilen = sizeof(cliaddr);
if ((connfd = Accept(listenfd, (SA *)&cliaddr, &clilen)) < 0)
{
if (errno == EINTR)
continue;
else
err_sys("accept error ");
}
if ((childpid = Fork()) == 0)
{
Close(listenfd);
printf(" 新的连接:connfd == %d \n", connfd);
fun_serv(connfd);
exit(0);
}
Close(connfd);
}
}
在我们fork
之后,父子进程都会有一个监听套接字和一个连接套接字,另外,所有的套接字选项都会从监听套接字传承给连接套接字。因为我们只需要让子进程去处理客户端请求即可。所以我们在父进程中关闭连接套接字,让他只去负责处理客户连接。子进程中关闭监听套接字,让子进程去处理客户请求。
- 服务端设置心跳包函数
heartbeat_serv.c
#include "../myhead.h"
static int servfd;
static int nsec;
static int maxnalarms;
static int nprobes; //统计 SIGALRM 数量
static void sig_urg(int), sig_alrm(int); // alarm 函数的使用是为了轮询
void heartbeat_serv(int servfd_arg, int nsec_arg, int maxnalarms_arg) //fd 1 5
{
servfd = servfd_arg;
nsec = nsec_arg;
maxnalarms = maxnalarms_arg;
nprobes = 0;
signal(SIGURG, sig_urg);
Fcntl(servfd, F_SETOWN, getpid());
signal(SIGALRM, sig_alrm);
alarm(nsec);
}
static void sig_urg(int signo)
{
//printf("产生 SIGURG 信号 \n");
int n;
char ch;
if ((n = recv(servfd, &ch, 1, MSG_OOB)) < 0) //只要产生带外数据,就说明客户端主机是存活的
{
if (errno != EWOULDBLOCK)
err_sys("recv error");
}
else if (n > 0)
{
//printf("服务器接收到带外数据,说明客户端主机是存活的\n");
if (send(servfd, &ch, 1, MSG_OOB) > 0)
// printf("服务器发送了带外数据\n");
nprobes = 0;
}
return;
}
static void sig_alrm(int signo)
{
if (++nprobes > maxnalarms)
{
fprintf(stderr, " 此客户端 gg,服务器子进程关闭套接字并退出 \n");
Close(servfd);
exit(0);
}
alarm(nsec);
return;
}
客户端连接之后,由客户端主动发起OOB
数据心跳,然后在服务端产生SIGURG
信号,服务端处理该信号和OOB
数据,并回送。如果已经产生了SIGURG
信号,但是oob
数还没有到达,recv
会返回EWOULDBLOCK
错误。nprobes
用来统计时钟。
- 客户端主函数
cli_main.c
#include "../myhead.h"
#include "test.h"
void fun_client(FILE *fp, int sockfd)
{
int maxfdp1, stdineof = 0;
fd_set rset;
char recvline[MAXLINE], sendline[MAXLINE];
int n;
heartbeat_cli(sockfd, 1, 5); // 1.调用函数
FD_ZERO(&rset);
for (;;)
{
if (stdineof == 0)
FD_SET(fileno(fp), &rset);
FD_SET(sockfd, &rset);
maxfdp1 = max(fileno(fp), sockfd) + 1;
if ((n = select(maxfdp1, &rset, NULL, NULL, NULL)) < 0)
{
if (errno == EINTR) // 2.处理 select
continue;
else
err_sys("select error");
}
if (FD_ISSET(sockfd, &rset))
{
if ((n = Recvline(sockfd, recvline, MAXLINE, 0)) == 0) //读取一行,Readline 返回读取到的字节数
{
if (stdineof == 1)
return;
else
err_quit("fun_client:server terminated permaturely ");
}
write(STDOUT_FILENO, recvline, n); //3.调用 write 函数,而不是 fputs,见`UNP`
}
if (FD_ISSET(fileno(fp), &rset)) //可以进行输入了
{
if (Fgets(sendline, MAXLINE, fp) == NULL)
{
stdineof = 1;
shutdown(sockfd, SHUT_WR);
FD_CLR(fileno(fp), &rset); // 在文件描述符集合中删除一个文件描述符。
continue;
}
Sendlen(sockfd, sendline, strlen(sendline), 0);
}
}
}
int main(int argc, char **argv)
{
int sockfd;
struct sockaddr_in servaddr;
if (argc != 2)
{
printf("usage: tcpcli <IPaddress>\n");
return -1;
}
sockfd = Socket(AF_INET, SOCK_STREAM, 0);
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(SERV_PORT); // 9877
inet_pton(AF_INET, argv[1], &servaddr.sin_addr);
connect(sockfd, (SA *)&servaddr, sizeof(servaddr));
fun_client(stdin, sockfd);
exit(0);
}
使用select
来监听两个套接字,一个是连接套接字,一个是标准输入。stdeof=0
,说明可以输入。如果输入结束,我们将stdeof
设为1,关闭对sockfd
的任何写操作,删除select
对应位。EINTR:当慢系统调用被信号打断时,慢系统调用会返回它。对比于下面一种简单的写法,他为什么能处理服务器崩溃的情况?因为在这里,当读到EOF
时,判断了是否是正常结束(stdeof=1
正常,否则服务器过早终止server terminated permaturely
)
- 客户端设置心跳包函数
heartbeat_cli.c
#include "../myhead.h"
#include "test.h"
static int servfd;
static int nsec;
static int maxnprobes;
static int nprobes; //统计产生信号`SIGALRM`的数量
static void sig_urg(int), sig_alrm(int); // alarm 函数的使用是为了轮询
void heartbeat_cli(int servfd_arg, int nsec_arg, int maxnprobes_arg) //fd 1 5
{
//sleep(6);
servfd = servfd_arg;
nsec = nsec_arg;
maxnprobes = maxnprobes_arg;
nprobes = 0;
signal(SIGURG, sig_urg);
Fcntl(servfd, F_SETOWN, getpid());
signal(SIGALRM, sig_alrm);
alarm(nsec);
}
static void sig_urg(int signo)
{
//printf("产生 SIGURG 信号 \n");
int n;
char ch;
if ((n = recv(servfd, &ch, 1, MSG_OOB)) < 0) //只要产生带外数据,就说明服务器主机是存活的
{
if (errno != EWOULDBLOCK)
err_sys("recv error");
}
else if (n > 0)
{
//printf("客户端接收到带外数据,说明服务器主机是存活的\n");
nprobes = 0;
}
return;
}
static void sig_alrm(int signo)
{
if (++nprobes > maxnprobes)
{
fprintf(stderr, "此服务器gg,客户端关闭套接字并退出 \n");
Close(servfd);
exit(0);
}
if( send(servfd, "1", 1, MSG_OOB) > 0 )
//printf("客户端发送了带外数据\n");
alarm(nsec);
return;
}
测试:
-
是否正确运行
-
服务器超时响应:
-
回射服务器正常运行:
- 某一端崩溃:
具体源码见我的github
:源码
讨论:1. 为什么不选用TCP 保持存活特性(SO_KEEPLIVE)来提供这种功能?
两个原因:
SO_KEEPALIVE
选项默认是闲置2小时,发送保持存活检测段。可以改动时间,但是改动之后会影响所有的开启该选项的套接字。SO_KEEPALIVE
也不是用来高频率的轮询的 。
2. Recvline 函数有什么缺点?静态数据有什么缺点?
使用静态数据会使得recvline
函数变得非线程安全了 。持续更新----------》
3. 像下面一样写,当服务器崩溃时,客户端会有什么效果?
void fun_client00000(int connfd)
{
char sendline[MAXLINE] = {0};
char recvline[MAXLINE] = {0};
int tt = 0;
while (Fgets(sendline, MAXLINE, stdin) != NULL)
{
tt = Sendlen(connfd, sendline, strlen(sendline), 0);
if ( Recvline(connfd, recvline, sizeof(recvline), 0) == 0)
{
printf("服 务 器 关 闭 连 接 退 出 \n");
Close(connfd);
return;
}
Fputs(recvline, stdout);
}
}
int main(int argc, char **argv)
{
int sockfd;
struct sockaddr_in servaddr;
if (argc != 2)
{
printf("usage: tcpcli <IPaddress>\n");
return -1;
}
sockfd = Socket(AF_INET, SOCK_STREAM, 0);
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(SERV_PORT); // 9877
inet_pton(AF_INET, argv[1], &servaddr.sin_addr);
connect(sockfd, (SA *)&servaddr, sizeof(servaddr));
fun_client00000(sockfd);
exit(0);
}
客户端首先阻塞于Fgets
,然后阻塞于Recvline
,如果一旦服务端崩溃,客户端的Recvline
返回0,打印输出信息。但是,这里遇到的一个问题就是“当服务器崩溃时,只有按了回车键才会输出打印信息”,还没有搞清楚这是为什么? ~_~