整体架构:
主要有这么几个类:
Server, Epoll, Socket, Accept, User.
这是初版代码,I/O 处理还不完善,仅供参考,重点在于整体结构。
还有部分功能尚未实现,以后会陆续补充。
1.Accept类
第一层:Accept类,主要处理客户端连接,为上层提供与该客户端通信的套接字描述符
/**
 * Wraps one accepted client connection.
 *
 * Constructing from a listening descriptor calls accept() once and stores the
 * resulting connected descriptor together with the peer address.
 */
class FDB_Accept {
public:
    FDB_Accept() = default;
    /* Accepts one pending connection on the listening descriptor request_fd. */
    FDB_Accept(int request_fd);
    ~FDB_Accept();

    /* Returns the connected descriptor, or -1 when no connection is held. */
    int getAccepted_fd();

    /* Planned I/O helpers, not implemented yet:
    int Accept_write(char * str);
    int Accept_read (char * str);
    */

private:
    /* Starts at -1 so a default-constructed object never exposes an
       indeterminate descriptor. */
    int m_iAccepted_fd = -1;
    struct sockaddr_in m_Client_addr = {};
    socklen_t m_cli_AddrLength = sizeof(m_Client_addr);
    /*int m_iSock_fd;*/
    /*char connfd_buf[100];*/
    /*Buffer connfd_buf;*/
    /*int id;*/
};
#include "FDB_Accept.h"
/*
 * Accepts one pending connection from the listening descriptor request_fd.
 * The connected descriptor (or -1 on failure) is kept in m_iAccepted_fd and
 * the peer address in m_Client_addr; a failure is only reported on stdout.
 */
FDB_Accept::FDB_Accept(int request_fd) {
    struct sockaddr* const peer = reinterpret_cast<struct sockaddr*>(&m_Client_addr);
    m_iAccepted_fd = accept(request_fd, peer, &m_cli_AddrLength);
    if (m_iAccepted_fd == -1) {
        std::cout << "accept ERROR!" << std::endl;
    }
}
/* Accessor for the descriptor stored by the accepting constructor. */
int FDB_Accept::getAccepted_fd() {
    const int connected_fd = m_iAccepted_fd;
    return connected_fd;
}
/* Releases nothing: the accepted descriptor is left open by this destructor. */
FDB_Accept::~FDB_Accept() = default;
2.User类
维护一个在线用户数组,主要为了管理与服务器建立连接的客户端。为上层提供添加,查找,删除,查看用户信息等功能。
#ifndef MY_NETLIB_USER_H
#define MY_NETLIB_USER_H
#include <iostream>
#include <vector>
#include "FDB_Accept.h"
/**
 * Registry of connected clients, stored as a vector of FDB_Accept objects and
 * looked up by their accepted descriptor.
 */
class FDB_Users {
public:
    FDB_Users();
    ~FDB_Users() = default;

    /* Appends a copy of rhs to the registry. */
    void User_add(FDB_Accept rhs);

    /* Removes the first entry whose descriptor equals rhs_fd. */
    bool User_del(int rhs_fd);

    bool User_mod();

    /* Reports whether an entry with descriptor rhs_fd exists. */
    bool User_find(int rhs_fd);

    /* Prints the descriptor of every registered entry. */
    bool User_show();

    /* Returns a copy of the entry whose descriptor equals rhs_fd. */
    FDB_Accept get_Accepter(int rhs_fd);

private:
    std::vector<FDB_Accept> m_vct_Acpt_User_data;
};
#endif //MY_NETLIB_USER_H
#include "FDB_Users.h"
/* The registry starts out empty; there is nothing else to set up. */
FDB_Users::FDB_Users() = default;
/* Stores a copy of rhs at the end of the registry. */
void FDB_Users::User_add(FDB_Accept rhs) {
    m_vct_Acpt_User_data.emplace_back(rhs);
}
/*
 * Removes the first entry whose accepted descriptor equals rhs_fd.
 * Returns true when an entry was erased, false when none matched.
 */
bool FDB_Users::User_del(int rhs_fd) {
    auto cursor = m_vct_Acpt_User_data.begin();
    while (cursor != m_vct_Acpt_User_data.end()) {
        if (cursor->getAccepted_fd() == rhs_fd) {
            /* Returning right after erase() means the invalidated iterator
               is never touched again. */
            m_vct_Acpt_User_data.erase(cursor);
            return true;
        }
        ++cursor;
    }
    return false;
}
/*
 * Placeholder for modifying a registered user; not implemented yet.
 * Always returns false. The explicit return matters: letting a non-void
 * function run off its end is undefined behaviour.
 */
bool FDB_Users::User_mod() {
    return false;
}
/*
 * Reports whether some entry has accepted descriptor rhs_fd.
 * When nothing matches, the number of entries inspected (the whole registry)
 * is printed to stdout before returning false.
 */
bool FDB_Users::User_find(int rhs_fd) {
    const std::size_t total = m_vct_Acpt_User_data.size();
    for (std::size_t idx = 0; idx < total; ++idx) {
        if (m_vct_Acpt_User_data[idx].getAccepted_fd() == rhs_fd) {
            return true;
        }
    }
    std::cout << total << std::endl;
    return false;
}
/* Prints one line per registered entry with its accepted descriptor; always returns true. */
bool FDB_Users::User_show() {
    for (std::size_t idx = 0; idx < m_vct_Acpt_User_data.size(); ++idx) {
        const int fd = m_vct_Acpt_User_data[idx].getAccepted_fd();
        std::cout << "On line user acpted_fd: " << fd << std::endl;
    }
    return true;
}
/*
 * Returns a copy of the entry whose accepted descriptor equals rhs_fd.
 *
 * When no entry matches, a value-initialized FDB_Accept is returned instead
 * of running off the end of the function (undefined behaviour). That object
 * does not describe a real connection, so callers should check User_find()
 * first.
 */
FDB_Accept FDB_Users::get_Accepter(int rhs_fd) {
    for (auto& entry : m_vct_Acpt_User_data) {
        if (rhs_fd == entry.getAccepted_fd()) {
            return entry;
        }
    }
    return FDB_Accept();
}
3.Socket类
主要用于服务端各种初始化,创建套接字,绑定监听,并设置非阻塞。
为上层提供用于监听的套接字描述符
/**
 * Server-side listening socket. The two-argument constructor creates the
 * socket, binds it, starts listening and switches it to non-blocking mode;
 * upper layers obtain the descriptor through Socket_getfd().
 */
class FDB_Socket {
public:
    /* Adopts an existing descriptor. explicit, so an int is never converted
       to FDB_Socket implicitly. */
    explicit FDB_Socket(int sockfd) : m_iSockfd(sockfd) {}
    FDB_Socket(sa_family_t family, int listen_num);
    ~FDB_Socket();

    bool Socket_createSocket(sa_family_t family = AF_INET, int backlog = 100); /* core of construction */
    bool Socket_bindAddress();                 /* bind m_iSockfd to m_addr_inAddress */
    bool Socket_listen(int backlog);           /* start listening */
    bool Socket_setReusePort(bool on);         /* SO_REUSEPORT */
    bool Socket_setTimeOutRecnt(bool on);      /* send timeout option */
    int  Socket_setNoBlocking();               /* make m_iSockfd non-blocking */
    int  Socket_getfd();                       /* returns m_iSockfd */
    int  Socket_do_accept();
    bool Socket_getbacklog();
    bool Socket_shutdownWrite();
    bool Socket_TcpNoDelay(bool on);
    bool Socket_setResuseAddr(bool on);        /* SO_REUSEADDR */
    bool Socket_setKeepAlive(bool on);         /* SO_KEEPALIVE */
    bool test_accept();
    /*
    bool Socket_getTcpInfo(struct tcp_info *) const;
    bool Socket_getTcpInfoString(char * buf, int len) const;
    */

private:
    /* Default initializers keep every member defined even when the object is
       built through the descriptor-adopting constructor, which only sets
       m_iSockfd. */
    int m_iSockfd = -1;
    int m_iBacklog = 0;
    struct sockaddr_in m_addr_inAddress = {};
    struct sockaddr_in m_addr_inCliaddr = {};
};
#include "FDB_Socket.h"
/* Short alias for the generic socket address type used in bind()/accept() casts. */
using SA = struct sockaddr;
/***************************核心构造函数***********************************************/
/*
 * Builds a ready-to-use listening socket: create, enable port reuse, bind to
 * INADDR_ANY:9201, listen with backlog listen_num, then switch to
 * non-blocking mode.
 *
 * Setup stops at the first failing step (each helper logs its own error).
 * The descriptor is validated right after creation, and 0 is accepted as a
 * valid descriptor.
 *
 * NOTE(review): the bound address is always an IPv4 sockaddr_in, whatever
 * `family` is passed to socket(); confirm that only AF_INET is intended.
 */
FDB_Socket::FDB_Socket(sa_family_t family, int listen_num) {
    Socket_createSocket(family, listen_num);
    if (m_iSockfd < 0) {
        return;                                   /* socket() failed; nothing to configure */
    }
    Socket_setReusePort(true);

    m_addr_inAddress = sockaddr_in{};
    m_addr_inAddress.sin_family = AF_INET;                    /* address family */
    m_addr_inAddress.sin_addr.s_addr = htonl(INADDR_ANY);     /* ip */
    m_addr_inAddress.sin_port = htons(9201);                  /* port */

    if (!Socket_bindAddress()) {                  /* bind */
        return;
    }
    if (!Socket_listen(listen_num)) {             /* listen */
        return;
    }
    Socket_setNoBlocking();                       /* non-blocking */
}
/*
 * Creates a SOCK_STREAM socket of the given family and records listen_num as
 * the backlog.
 * Returns true when socket() succeeded, false otherwise (the failure is also
 * logged to stdout). The original body had no return statement at all.
 */
bool FDB_Socket::Socket_createSocket(sa_family_t family, int listen_num) {
    m_iBacklog = listen_num;
    m_iSockfd = socket(family, SOCK_STREAM, 0);
    if (m_iSockfd < 0) {
        std::cout << "log_net_error" << std::endl;   /* socket creation failed */
        return false;
    }
    return true;
}
bool FDB_Socket::Socket_bindAddress() {
std::cout << "m_iSockfd = " << m_iSockfd << " len = " << sizeof(m_addr_inAddress) << std::endl;
int ret = bind(m_iSockfd, (SA*)&m_addr_inAddress, sizeof(m_addr_inAddress)); /*命名m_iSocket*/
if(ret < 0){
std::cout << "bind ERROR! return " << ret << std::endl; /*命名失败*/
return false;
}
return true;
}
/*
 * Puts m_iSockfd into listening state with the given backlog.
 * Returns true on success; on failure logs to stdout and returns false.
 */
bool FDB_Socket::Socket_listen(int backlog) {
    if (listen(m_iSockfd, backlog) >= 0) {
        return true;
    }
    std::cout << "listen ERROR" << std::endl;
    return false;
}
/*
 * Test helper: accepts connections in a loop and, for each one, forks a child
 * that prints everything the client sends until the client closes.
 *
 * Fixes over the first draft:
 *  - client_addr_length is initialized before it is first read;
 *  - the child terminates the buffer before printing, closes the connection
 *    and exits instead of falling back into the accept loop;
 *  - the parent closes its copy of the connected descriptor;
 *  - descriptor 0 is treated as valid;
 *  - the function returns a value (0) if the loop ever ends.
 *
 * NOTE(review): children are never waited for, and a failed accept() is
 * simply retried, so on a non-blocking listening socket this loop spins.
 */
int FDB_Socket::Socket_do_accept() {
    socklen_t client_addr_length = sizeof(m_addr_inCliaddr);
    while (true) {
        /* Kept from the original: stop if the previous accept() reported a
           zero-length peer address. */
        if (client_addr_length == 0) {
            break;
        }
        client_addr_length = sizeof(m_addr_inCliaddr);
        const int connfd = accept(m_iSockfd, (SA*)&m_addr_inCliaddr, &client_addr_length);
        if (connfd < 0) {
            continue;
        }
        if (fork() == 0) {
            char buf[100];
            ssize_t got = 0;
            /* Leave room for the terminator so operator<< never overruns buf. */
            while ((got = read(connfd, buf, sizeof(buf) - 1)) > 0) {
                buf[got] = '\0';
                std::cout << buf << std::endl;
            }
            close(connfd);
            _exit(0);   /* the child must not continue into the accept loop */
        }
        close(connfd);  /* parent (or failed fork): this copy is not needed */
    }
    return 0;
}
/* Not implemented yet; returns false rather than running off the end of a non-void function. */
bool FDB_Socket::Socket_getbacklog() {
    return false;
}
/* Not implemented yet; returns false rather than running off the end of a non-void function. */
bool FDB_Socket::Socket_shutdownWrite() {
    return false;
}
/* Not implemented yet (`on` is ignored); returns false rather than running off the end of a non-void function. */
bool FDB_Socket::Socket_TcpNoDelay(bool on) {
    (void)on;
    return false;
}
/*
 * Enables (on == true) or disables SO_REUSEPORT on m_iSockfd.
 * The first draft passed 0 for `on` and 1 for `off`, the opposite of the
 * sibling setters, so Socket_setReusePort(true) turned the option off.
 * Returns true when setsockopt() succeeded.
 */
bool FDB_Socket::Socket_setReusePort(bool on) {
    const int optval = on ? 1 : 0;
    const int rc = ::setsockopt(m_iSockfd, SOL_SOCKET, SO_REUSEPORT,
                                &optval, static_cast<socklen_t>(sizeof optval));
    return rc == 0;
}
/*
 * Enables or disables SO_REUSEADDR on m_iSockfd.
 * Returns true when setsockopt() succeeded; the first draft returned true
 * even when the call failed.
 */
bool FDB_Socket::Socket_setResuseAddr(bool on) {
    const int optval = on ? 1 : 0;
    const int rc = ::setsockopt(m_iSockfd, SOL_SOCKET, SO_REUSEADDR,
                                &optval, static_cast<socklen_t>(sizeof optval));
    return rc == 0;
}
/*
 * Issues setsockopt(SO_SNDTIMEO) on m_iSockfd with an int flag derived from
 * `on`, and returns whether the call succeeded (the first draft had no return
 * statement).
 *
 * NOTE(review): Linux socket(7) documents the SO_SNDTIMEO argument as a
 * struct timeval, so passing an int is expected to be rejected. The timeout
 * value that `on` should map to is not visible here; confirm and switch to a
 * timeval.
 */
bool FDB_Socket::Socket_setTimeOutRecnt(bool on) {
    const int optval = on ? 1 : 0;
    const int rc = ::setsockopt(m_iSockfd, SOL_SOCKET, SO_SNDTIMEO,
                                &optval, static_cast<socklen_t>(sizeof optval));
    return rc == 0;
}
/*
 * Adds O_NONBLOCK to the file status flags of m_iSockfd.
 * Returns the flags that were in effect before the change, or -1 when they
 * could not be read. In that case F_SETFL is skipped, because OR-ing
 * O_NONBLOCK into -1 would hand fcntl() a meaningless flag set.
 */
int FDB_Socket::Socket_setNoBlocking() {
    const int old_option = fcntl(m_iSockfd, F_GETFL);
    if (old_option == -1) {
        return old_option;
    }
    fcntl(m_iSockfd, F_SETFL, old_option | O_NONBLOCK);
    return old_option;
}
/*
 * Enables or disables SO_KEEPALIVE on m_iSockfd.
 * Returns true when setsockopt() succeeded; the first draft returned true
 * even when the call failed.
 */
bool FDB_Socket::Socket_setKeepAlive(bool on) {
    const int optval = on ? 1 : 0;
    const int rc = ::setsockopt(m_iSockfd, SOL_SOCKET, SO_KEEPALIVE,
                                &optval, static_cast<socklen_t>(sizeof optval));
    return rc == 0;
}
/* Not implemented yet; returns false rather than running off the end of a non-void function. */
bool FDB_Socket::test_accept() {
    return false;
}
/* Accessor for the socket descriptor m_iSockfd. */
int FDB_Socket::Socket_getfd() {
    const int fd = m_iSockfd;
    return fd;
}
4.Epoll类
服务端核心类:运行主循环,监听读写事件,并负责转发数据。
/**
 * Core server class: owns an epoll instance plus its event array and runs
 * the main loop that watches the listening socket and client connections.
 */
class FDB_Epoll {
public:
    FDB_Epoll(int fd);                                      /* constructor */
    ~FDB_Epoll();                                           /* destructor */

    /* The destructor delete[]s m_epEvent_p, so a copy would free the same
       array twice; copying is therefore disabled. */
    FDB_Epoll(const FDB_Epoll&) = delete;
    FDB_Epoll& operator=(const FDB_Epoll&) = delete;

    bool Epoll_create_events();                             /* allocate the epoll_event array */
    bool Epoll_reset(int fd);                               /* re-arm fd in the epoll set */
    bool Epoll_add(int fd, bool enable_et, bool oneshot);   /* register a readable event */
    bool Epoll_add_initListen(int fd, bool enable_et);      /* register the listening socket */
    bool Epoll_setnoblocking(int socket_fd);                /* make a descriptor non-blocking */
    bool Epoll_del(int fd);                                 /* remove fd from the epoll set */
    bool Epoll_wait();                                      /* main loop */

private:
    /* Every member has a defined starting value, so nothing is read
       uninitialized before the constructor body assigns it. */
    int m_iEpoll_fd = -1;
    const int m_ciMAX_NUM = 100;
    int m_iBUF_SIZE = 0;
    int m_iListen_fd = -1;
    int m_iDBfd = -1;
    epoll_event* m_epEvent_p = nullptr;
};
#include "FDB_Epoll.h"
#include "FDB_Accept.h"
#include "FDB_Users.h"
#include "FDB_Users.h"
#include <iostream>
/*
 * Registry of connected clients, defined at file scope and used by
 * FDB_Epoll::Epoll_wait (User_show / User_add / User_del).
 * NOTE(review): FDB_Server::server_start runs Epoll_wait on several threads,
 * and all of them touch this object without any locking. Confirm whether a
 * mutex or a per-thread registry is intended.
 */
//extern FDB_Users user;
FDB_Users user;
/*
 * Stores the listening descriptor, creates the epoll instance and allocates
 * the event array. It then (test code) tries to connect to a hard-coded
 * DB server; m_iDBfd is the connected descriptor, or -1 when unavailable.
 *
 * Changes over the first draft:
 *  - the DB socket is closed when connect() fails (it used to leak);
 *  - a socket() failure is handled like a failed connection instead of being
 *    caught only by assert(), which disappears in NDEBUG builds.
 */
FDB_Epoll::FDB_Epoll(int fd) {
    m_iListen_fd = fd;                                   /* listening descriptor */
    m_iEpoll_fd = epoll_create(m_ciMAX_NUM);             /* create the epoll instance */
    if (-1 == m_iEpoll_fd) {
        std::cout << "epoll_create ERROR! " << std::endl;
    }
    Epoll_create_events();                               /* allocate the event array */

    /* ---- test code: optional connection to the DB server ---- */
    const char* ip = "192.168.30.140";
    const int port = 9201;

    struct sockaddr_in server_address2 = {};
    server_address2.sin_family = AF_INET;
    inet_pton(AF_INET, ip, &server_address2.sin_addr);
    server_address2.sin_port = htons(port);

    m_iDBfd = -1;
    const int DB_sockfd2 = socket(PF_INET, SOCK_STREAM, 0);
    if (DB_sockfd2 < 0) {
        printf("connetcion failed\n");
        return;
    }
    if (connect(DB_sockfd2, (struct sockaddr*)&server_address2, sizeof(server_address2)) < 0) {
        printf("connetcion failed\n");
        close(DB_sockfd2);                               /* do not leak the unused socket */
    } else {
        printf("connetc DB_SERVER SUCCESSED !\n");
        m_iDBfd = DB_sockfd2;
    }
}
/*
 * Allocates the array of m_ciMAX_NUM epoll_event slots used by epoll_wait()
 * and stores it in m_epEvent_p. Returns true when the pointer is non-null.
 *
 * NOTE(review): a plain new[] reports failure by throwing std::bad_alloc, so
 * the null branch below is not expected to run.
 */
bool FDB_Epoll::Epoll_create_events() {
    epoll_event* const slots = new epoll_event[m_ciMAX_NUM];
    m_epEvent_p = slots;
    if (slots == nullptr) {
        std::cout << "new error go to log now! " << std::endl;
        return false;
    }
    return true;
}
/*
 * Registers the listening descriptor fd for EPOLLIN (edge-triggered when
 * enable_et is true) after switching it to non-blocking mode.
 * Returns false, and logs, when epoll_ctl() rejects the registration; the
 * first draft returned true unconditionally.
 */
bool FDB_Epoll::Epoll_add_initListen(int fd, bool enable_et) {
    epoll_event event{};
    event.data.fd = fd;
    event.events = EPOLLIN;
    if (enable_et) {
        event.events |= EPOLLET;
        /*event.events |= EPOLLONESHOT;*/
    }
    Epoll_setnoblocking(fd);
    if (epoll_ctl(m_iEpoll_fd, EPOLL_CTL_ADD, fd, &event) == -1) {   /* add to the interest list */
        std::cout << "epoll_ctl ADD ERROR! fd = " << fd << std::endl;
        return false;
    }
    return true;
}
/*
 * Registers fd for EPOLLIN, optionally edge-triggered (enable_et) and/or
 * one-shot (oneshot).
 *
 * The descriptor is made non-blocking before it is added, so an
 * edge-triggered registration never exists on a blocking descriptor.
 * Returns false, and logs, when epoll_ctl() fails (for example fd == -1);
 * the first draft returned true unconditionally.
 */
bool FDB_Epoll::Epoll_add(int fd, bool enable_et, bool oneshot) {
    std::cout << "one shot!" << std::endl;   /* debug trace kept from the first draft */

    epoll_event event{};
    event.data.fd = fd;
    event.events = EPOLLIN;
    if (enable_et) {
        event.events |= EPOLLET;             /* edge-triggered */
    }
    if (oneshot) {
        event.events |= EPOLLONESHOT;        /* report only once */
    }

    Epoll_setnoblocking(fd);
    if (epoll_ctl(m_iEpoll_fd, EPOLL_CTL_ADD, fd, &event) == -1) {
        std::cout << "epoll_ctl ADD ERROR! fd = " << fd << std::endl;
        return false;
    }
    return true;
}
/*
 * Adds O_NONBLOCK to the file status flags of fd.
 * Returns true when both fcntl() calls succeeded. The first draft returned
 * the previous flag word cast to bool, which said nothing about success.
 */
bool FDB_Epoll::Epoll_setnoblocking(int fd) {
    const int old_option = fcntl(fd, F_GETFL);
    if (old_option == -1) {
        return false;
    }
    return fcntl(fd, F_SETFL, old_option | O_NONBLOCK) != -1;
}
/*
 * Re-arms fd in the epoll set as EPOLLIN | EPOLLET | EPOLLONESHOT.
 * Returns true when epoll_ctl() succeeded; the first draft had no return
 * statement.
 */
bool FDB_Epoll::Epoll_reset(int fd) {
    epoll_event event{};
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
    return epoll_ctl(m_iEpoll_fd, EPOLL_CTL_MOD, fd, &event) == 0;
}
/*
 * Removes fd from the epoll set and closes it.
 * Returns true when the removal succeeded; the descriptor is closed either
 * way. The first draft had no return statement.
 */
bool FDB_Epoll::Epoll_del(int fd) {
    epoll_event event{};
    event.data.fd = fd;
    const bool removed = (epoll_ctl(m_iEpoll_fd, EPOLL_CTL_DEL, fd, &event) == 0);
    close(fd);
    return removed;
}
/*
 * Server main loop.
 *
 * Registers the listening socket (level-triggered), then repeatedly waits for
 * events and handles them:
 *   - error / hang-up / no readable data: drop the descriptor;
 *   - listening socket readable: accept one client and register it
 *     edge-triggered;
 *   - client readable (test code): print the data, forward it to the DB
 *     connection and echo it back to the client.
 *
 * Fixes over the first draft:
 *   - recv() results <= 0 are never used as a send() length (-1 used to be
 *     cast to size_t);
 *   - after a disconnect no further send() is issued on the closed descriptor;
 *   - a failed accept() is skipped instead of registering descriptor -1;
 *   - one byte of the buffer is reserved for the terminator that puts() needs;
 *   - edge-triggered clients are drained until recv() stops returning data,
 *     as epoll(7) requires, so leftover bytes are not stranded;
 *   - a descriptor dropped in the error branch is also removed from the user
 *     list;
 *   - fflush(stdin) (undefined by the C standard for input streams) is gone;
 *   - the function returns a value.
 *
 * Returns false when the loop stops because epoll_wait() failed.
 */
bool FDB_Epoll::Epoll_wait() {
    bool work_status = true;
    Epoll_add_initListen(m_iListen_fd, false);

    while (work_status) {
        std::cout << "Debug_message: Showing On Line List ! " << std::endl;
        user.User_show();
        std::cout << "Debug_message: epoll waiting" << std::endl;

        const int ep_event_num = epoll_wait(m_iEpoll_fd, m_epEvent_p, m_ciMAX_NUM, -1);
        assert(ep_event_num >= 0);
        if (ep_event_num < 0) {
            /* Only reachable when assert() is compiled out: stop instead of
               spinning on a failing epoll_wait(). */
            work_status = false;
            break;
        }

        for (int ep_event_i = 0; ep_event_i < ep_event_num; ++ep_event_i) {
            const int now_skfd = m_epEvent_p[ep_event_i].data.fd;
            const auto ready = m_epEvent_p[ep_event_i].events;

            /* Error, hang-up, or an event without readable data. */
            if ((ready & EPOLLERR) || (ready & EPOLLHUP) || !(ready & EPOLLIN)) {
                std::cout << "Debug_message: epoll Error!" << std::endl;
                user.User_del(now_skfd);
                Epoll_del(now_skfd);
                continue;
            }

            /* From here on EPOLLIN is known to be set. */
            if (now_skfd == m_iListen_fd) {
                /* New client: accept and register it. */
                FDB_Accept connt(now_skfd);
                const int accepted_fd = connt.getAccepted_fd();
                if (accepted_fd < 0) {
                    continue;   /* accept() failed; FDB_Accept already logged it */
                }
                user.User_add(connt);
                Epoll_add(accepted_fd, true, false);
                std::cout << "One New Client !" << std::endl;
                continue;
            }

            if (ready & EPOLLOUT) {
                /* Write event: send() not implemented yet. */
                continue;
            }

            /* ---------------- test code: read, forward, echo ---------------- */
            char tmp_buf[100];
            for (;;) {
                memset(tmp_buf, 0, sizeof(tmp_buf));
                const ssize_t recv_num = recv(now_skfd, tmp_buf, sizeof(tmp_buf) - 1, 0);
                if (recv_num < 0) {
                    break;      /* nothing more to read right now, or a receive error */
                }
                if (recv_num == 0) {
                    /* Client went away. */
                    user.User_del(now_skfd);
                    Epoll_del(now_skfd);
                    std::cout << "Lost A Client ! " << std::endl;
                    break;
                }

                puts(tmp_buf);
                std::cout << "Debug_message: Got a pice of message " << recv_num << " bites ";
                std::cout << "from fd: " << now_skfd << " client " << std::endl;

                const size_t chunk = static_cast<size_t>(recv_num);
                if (m_iDBfd != -1) {
                    /* Test traffic kept from the first draft: the chunk is
                       forwarded to the DB connection 300860 times. */
                    for (int i = 0; i < 300860; i++)
                        send(m_iDBfd, tmp_buf, chunk, 0);
                }
                send(now_skfd, tmp_buf, chunk, 0);
            }
            /* ---------------- end of test code ---------------- */
        }
    }
    return false;
}
/*
 * Releases what the constructor acquired: the event array, the epoll
 * descriptor and, when connected, the DB socket. The first draft freed only
 * the array, leaking both descriptors.
 * The listening descriptor is not closed here; it is passed in from outside
 * and shared between the worker threads.
 */
FDB_Epoll::~FDB_Epoll() {
    delete[] m_epEvent_p;
    if (m_iEpoll_fd != -1) {
        close(m_iEpoll_fd);
    }
    if (m_iDBfd != -1) {
        close(m_iDBfd);
    }
}
5.Server类
主要实现多线程,并将 Epoll、Socket 封装起来。
#include <iostream>
#include <thread>
#include <vector>

#include "FDB_Epoll.h"
#include "FDB_Socket.h"
/**
 * Top-level server object: creates the listening socket and runs one epoll
 * loop per worker thread on it.
 */
class FDB_Server {
public:
    FDB_Server(int status, int connfd_num, int pthread_num);
    ~FDB_Server();

    /* Creates the listening socket and runs the worker threads. */
    bool server_start();

    /* Returns the listening descriptor, or -1 before server_start() has set it. */
    int get_server_fd();
    /*
    bool server_end();
    bool server_change(int status);
    int server_get_status();
    */

private:
    /* Default initializers give every member a defined value; the
       constructor in SOURCE assigns only status, connection count and thread
       count. */
    int m_iServer_status = 0;
    int m_iConnfd_num = 0;
    int m_iChanges = 0;
    int m_iSocketfd = -1;
    int m_iServer_thread_num = 0;
};
/* Thread entry point: builds an FDB_Epoll on the listening descriptor fd and runs its event loop. */
static void server_work(int fd);
#include "FDB_Server.h"
/*
 * Records the basic server configuration:
 *   status      - server status
 *   connfd_num  - maximum number of connections (not enforced yet)
 *   pthread_num - number of worker threads
 */
FDB_Server::FDB_Server(int status, int connfd_num, int pthread_num)
    : m_iServer_status(status),
      m_iConnfd_num(connfd_num),
      m_iServer_thread_num(pthread_num) {
}
/*
 * Creates the listening socket, starts m_iServer_thread_num worker threads
 * that each run server_work() on it, and waits for all of them to finish.
 *
 * Changes over the first draft:
 *  - the workers live in a std::vector instead of a variable-length array of
 *    std::thread, which is not standard C++ and is undefined for a count
 *    <= 0;
 *  - no thread is started when the socket could not be created;
 *  - the function returns a value: true after all workers have been joined,
 *    false when no listening socket is available.
 */
bool FDB_Server::server_start() {
    FDB_Socket db_Server(AF_INET, 10);
    m_iSocketfd = db_Server.Socket_getfd();
    if (m_iSocketfd < 0) {
        return false;
    }

    std::vector<std::thread> threads;
    if (m_iServer_thread_num > 0) {
        threads.reserve(static_cast<std::size_t>(m_iServer_thread_num));
    }
    for (int i = 0; i < m_iServer_thread_num; ++i) {
        threads.emplace_back(server_work, m_iSocketfd);
    }
    for (auto& worker : threads) {
        worker.join();
    }
    return true;
}
static void server_work(int fd ) {
FDB_Epoll epo(fd);
epo.Epoll_wait() ;
}
/* Accessor for the listening descriptor recorded by server_start(). */
int FDB_Server::get_server_fd() {
    const int listen_fd = m_iSocketfd;
    return listen_fd;
}
/*
bool FDB_Server::server_end();
bool FDB_Server::server_change(int status);
int FDB_Server::server_get_status();
*/
/* Nothing to release: the server object holds only plain integers. */
FDB_Server::~FDB_Server() = default;
另附完整代码:
https://github.com/Dulun/Summer2016/tree/master/my_netlib