前言
服务端而言,对于每一个新的连接我们都需要去保存其基本信息,如ip地址,套接字fd,也需要赋予其唯一标识如用户名。
这里,我们来谈谈对用户连接的封装。
用户连接需要哪些数据
1. 套接字描述符 sockfd
执行读写操作时当然不可缺
2. 连接信息 sockaddr
基本信息的保存
3. 用户缓存区 Buffer
非阻塞读写不可缺
4. 唯一标识
服务端进行消息转发时不可能根据sockfd去进行查找,所以我们需要对每一个连接有一个唯一标识,用于对其进行检索
关于Buffer细节可看Epoll-ET模式下非阻塞读写之Buffer的封装
用户连接需要哪些操作
1. 读
epoll-Et模式下,读必须将缓冲区全部读完,否则不会再次触发
2. 写
epoll-Et模式下,写必须将所写数据全部写入或是写到缓冲区满,否则不会再次触发
3. 断开
断开时,如果是异步读写,需要需要确认写事件已全部完成,再关闭套接字
4. 初始化
保证连接可复用,否则每来一个连接,都要开辟空间,太过低效。
关于读写的细节
本身 send和recv都是线程安全的。
因为读操作,我们可以确定只会有一个线程来对读操作进行处理(通过设置EPOLLONESHOT),也就是说对读缓冲区的写入(接受数据)和取出(处理数据)都是在一个线程内,所以读缓冲区本质上是线程安全的。
但是写操作,写缓冲区可能会有多个线程对其进行写入和更改,所以我们用一个原子变量,来标志写缓冲区是否被操作,来保证数据读写不会冲突。
代码
本代码数据处理只是简单的进行转发到原本socket里
//
// Connection.h
// QuoridorServer
//
// Created by shiyi on 2016/12/2.
// Copyright © 2016年 shiyi. All rights reserved.
//
#ifndef Connection_H
#define Connection_H
#include <stdio.h>
#include <iostream>
#include <atomic>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <signal.h>
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <assert.h>
#include <stdlib.h>
#include <arpa/inet.h>
#include <errno.h>
#include "Buffer.h"
#include "Util.h"
#include "BaseFunc.h"
const size_t BUFFER_SIZE = 65535;
class Connection
{
public:
Connection() : m_writeing(true), m_epollfd(-1), m_sockfd(-1)
{}
~Connection(){}
//初始化连接
void init(int epollfd, int sockfd, const sockaddr_in& client_addr)
{
m_epollfd = epollfd;
m_sockfd = sockfd;
m_writeing = true;
m_address = client_addr;
//初始化读写缓冲区
m_inBuff.init();
m_outBuff.init();
}
void HandleRead()
{
while(true)
{
char buf[BUFFER_SIZE];
int ret = recv(m_sockfd, buf, BUFFER_SIZE, 0);
if(ret < 0)
{
//缓冲区内容已读完
if((errno == EAGAIN) || (errno == EWOULDBLOCK))
{
break;
}
//其他错误直接断开连接
Util::outError("HandleRead");
shutdown();
}
//断开连接
else if(ret == 0)
{
shutdown();
}
else
{
//将读取的内容加入缓冲区
m_inBuff.PutData(buf, ret);
}
//printf("[%s]\n", buf);
}
worker();
}
void HandleWrite()
{
//更改临界值
if(!m_writeing)
return;
m_writeing = false;
//取出数据
char buf[BUFFER_SIZE];
int len = m_outBuff.GetData(buf, BUFFER_SIZE);
int n = len;
while (n > 0)
{
int ret = send(m_sockfd, buf+len-n, n, 0);
if (ret < n)
{
if (ret == -1 && errno != EAGAIN)
{
Util::outError("write error");
}
break;
}
n -= ret;
}
//n=0表示数据全部写完,删除写事件
if(n == 0)
modFd(m_epollfd, m_sockfd, EPOLLIN | EPOLLET | EPOLLONESHOT);
//恢复临界值
m_writeing = true;
}
void worker()
{
//解析
//取出数据
char buf[BUFFER_SIZE];
int len = m_inBuff.GetData(buf, BUFFER_SIZE);
while(!m_writeing)
usleep(1000);
m_writeing = false;
m_outBuff.PutData(buf, len);
modFd(m_epollfd, m_sockfd, EPOLLIN | EPOLLOUT | EPOLLET | EPOLLONESHOT);
m_writeing = true;
}
void shutdown()
{
//等待写事件完成后关闭
while(!m_writeing)
usleep(1000);
m_writeing = false;
removeFd(m_epollfd, m_sockfd);
}
private:
int m_epollfd; //epoll描述符
int m_sockfd; //套接字描述符
sockaddr_in m_address; //套接字地址
Buffer m_inBuff; //读缓冲
Buffer m_outBuff; //写缓冲
std::atomic_bool m_writeing; //是否正在写
};
#endif /* Connection_H */