I had wanted to study muduo, but for various reasons I could not get it to compile at the time. By chance I came across a blogger's mini-muduo, written in imitation of muduo, and found it well worth reading. It implements only an Echo server, progressing from the simplest epoll model up to a reactor plus thread pool. For simplicity it avoids smart pointers (the author warns that this leaks memory), and parts of it have bugs, but it remains an excellent object of study. This post analyzes the final reactor + thread pool model. (The original is split into 13 installments.)
main.cpp
#include "TcpServer.h"
#include "EventLoop.h"
#include "EchoServer.h"
int main(int argc, char** argv)
{
    EventLoop loop;
    EchoServer echoserver(&loop);
    echoserver.start();
    loop.loop();
    return 0;
}
The interface is the same as muduo's: one server object and one loop. Calling start() and then loop() is all it takes to get the program running.
EchoServer.h
#ifndef ECHOSERVER_H
#define ECHOSERVER_H
#include "IMuduoUser.h"
#include "IRun.h"
#include "TcpServer.h"
#include "ThreadPool.h"

class EchoServer : public IMuduoUser
                 , public IRun2
{
public:
    EchoServer(EventLoop* pLoop);
    ~EchoServer();
    void start();
    virtual void onConnection(TcpConnection* pCon);
    virtual void onMessage(TcpConnection* pCon, Buffer* pBuf);
    virtual void onWriteComplate(TcpConnection* pCon);
    virtual void run2(const string& str, void* tcp);
private:
    int fib(int n);
    EventLoop* _pLoop;
    TcpServer _pServer;
    ThreadPool _threadpool;
    long _timer;
    int _index;
};
#endif
EchoServer is the application-level server; it inherits from IMuduoUser and IRun2. IMuduoUser declares three virtual functions, onConnection, onMessage, and onWriteComplate, which the subclass EchoServer must implement; they handle, respectively, a new connection, an incoming message, and the completion of a send.
IRun2 declares the virtual function run2, which is used as a callback.
EventLoop* _pLoop points to the event-loop object, TcpServer _pServer is the server object, and ThreadPool _threadpool is the thread pool. _timer holds a timer's address: the original author meant it to be a timer id, but he stores the timer's pointer directly in a long, which is rather odd and forces repeated casts later on.
_index is currently unused.
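To see why the long-typed _timer is awkward, here is a tiny illustrative sketch; Timer is a stand-in name for the repo's timer type, and EventLoop::cancelTimer(long timerId) later receives exactly such a value:

Timer* t = new Timer();     // Timer is an illustrative stand-in
long _timer = (long)t;      // the "id" is really an address
// every later use must cast the long back:
Timer* same = (Timer*)_timer;
delete same;                // and ownership stays entirely manual

A plain Timer* member, or a genuine id-to-Timer map, would avoid the casts.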
EchoServer.cc
#include "EchoServer.h"
#include "TcpConnection.h"
#include "EventLoop.h"
#include "CurrentThread.h"
#include "Task.h"
#include <iostream>
#define MESSAGE_LENGTH 8
EchoServer::EchoServer(EventLoop* pLoop)
:_pLoop(pLoop)
,_pServer(pLoop)
,_timer(-1)
,_index(0)
{
_pServer.setCallback(this);
}
EchoServer::~EchoServer()
{}
void EchoServer::start()
{
_pServer.start();
_threadpool.start(3);
}
void EchoServer::onConnection(TcpConnection* pCon)
{
printf("%s\n",__func__);
cout << "onConnection" << endl;
}
void EchoServer::onMessage(TcpConnection* pCon, Buffer* pBuf)
{
printf("%s\n",__func__);
while(pBuf->readableBytes() > MESSAGE_LENGTH)
{
string message = pBuf->retrieveAsString(MESSAGE_LENGTH);
Task task(this, message, pCon);
_threadpool.addTask(task);
}
if (pBuf->readableBytes() <= MESSAGE_LENGTH){
string message = pBuf->retrieveAllAsString();
Task task(this, message, pCon);
_threadpool.addTask(task);
}
}
void EchoServer::onWriteComplate(TcpConnection* pCon)
{
printf("%s\n",__func__);
cout << "onWriteComplate" << endl;
}
//run in different therad
void EchoServer::run2(const string& str, void* tcp)
{
printf("%s\n",__func__);
//IO blocking task or CPU busy task
cout << "fib(30) = " << fib(30) << " tid = " << CurrentThread::tid() << endl;
((TcpConnection*)tcp)->send(str);
}
//fib is short for Fibonacci, fib is a CPU busy method
int EchoServer::fib(int n)
{
return (n == 1 || n == 2) ? 1 : (fib(n-1) + fib(n-2));
}
As you can see, onConnection and onWriteComplate do nothing but print the function name. The printf("%s\n",__func__); lines are something I added to trace the execution flow; remove them in real use.
Set run2, fib, and onMessage aside for now, and start with echoserver.start() from main:
void EchoServer::start()
{
    _pServer.start();
    _threadpool.start(3);
}
which takes us into TcpServer.
TcpServer.h
//author voidccc
#ifndef TCPSERVER_H
#define TCPSERVER_H
#include <sys/epoll.h>
#include "Declear.h"
#include "Define.h"
#include "IAcceptorCallback.h"
#include "IMuduoUser.h"
#include <map>
using namespace std;

class TcpServer : public IAcceptorCallback
{
public:
    TcpServer(EventLoop* pLoop);
    ~TcpServer();
    void start();
    void setCallback(IMuduoUser* pUser);
    virtual void newConnection(int sockfd);
private:
    struct epoll_event _events[MAX_EVENTS];
    map<int, TcpConnection*> _connections;
    Acceptor* _pAcceptor;
    EventLoop* _pLoop;
    IMuduoUser* _pUser;
};
#endif
TcpServer.cc
//author voidccc
#include <errno.h>
#include "TcpServer.h"
#include "Channel.h"
#include "Acceptor.h"
#include "TcpConnection.h"
#include <vector>

TcpServer::TcpServer(EventLoop* pLoop)
    : _pAcceptor(NULL)
    , _pLoop(pLoop)
    , _pUser(NULL)
{
}

TcpServer::~TcpServer()
{
}

void TcpServer::start()
{
    _pAcceptor = new Acceptor(_pLoop); // Memory Leak !!!
    _pAcceptor->setCallback(this);
    _pAcceptor->start();
}

void TcpServer::newConnection(int sockfd)
{
    TcpConnection* tcp = new TcpConnection(sockfd, _pLoop); // Memory Leak !!!
    _connections[sockfd] = tcp;
    tcp->setUser(_pUser);
    tcp->connectEstablished();
}

void TcpServer::setCallback(IMuduoUser* user)
{
    _pUser = user;
}
Look straight at TcpServer::start, which news up an Acceptor.
Acceptor.h
//author voidccc
#ifndef ACCEPTOR_H
#define ACCEPTOR_H
#include "Declear.h"
#include "Define.h"
#include "IChannelCallback.h"

class Acceptor : public IChannelCallback
{
public:
    Acceptor(EventLoop* pLoop);
    ~Acceptor();
    void start();
    void setCallback(IAcceptorCallback* pCallback);
    virtual void handleRead();
    virtual void handleWrite();
private:
    int createAndListen();
    int _listenfd;
    Channel* _pSocketAChannel;
    IAcceptorCallback* _pCallback;
    EventLoop* _pLoop;
};
#endif
Jump into Acceptor.cc and look at start first.
Acceptor.cc
#include <arpa/inet.h>
#include <fcntl.h>
#include <errno.h>
#include <string.h>
#include "Acceptor.h"
#include "Channel.h"
#include "IAcceptorCallback.h"
#include "EventLoop.h"
#include <iostream>
using namespace std;

Acceptor::Acceptor(EventLoop* pLoop)
    : _listenfd(-1)
    , _pSocketAChannel(NULL)
    , _pCallback(NULL)
    , _pLoop(pLoop)
{}

Acceptor::~Acceptor()
{}

void Acceptor::start()
{
    _listenfd = createAndListen();
    _pSocketAChannel = new Channel(_pLoop, _listenfd); // Memory Leak !!!
    _pSocketAChannel->setCallback(this);
    _pSocketAChannel->enableReading();
}

int Acceptor::createAndListen()
{
    int on = 1;
    _listenfd = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr)); // zero the struct before use
    fcntl(_listenfd, F_SETFL, O_NONBLOCK);  // non-blocking io
    setsockopt(_listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(11111);
    if(-1 == bind(_listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)))
    {
        cout << "bind error, errno:" << errno << endl;
    }
    if(-1 == listen(_listenfd, MAX_LISTENFD))
    {
        cout << "listen error, errno:" << errno << endl;
    }
    return _listenfd;
}

void Acceptor::handleRead()
{
    int connfd;
    struct sockaddr_in cliaddr;
    socklen_t clilen = sizeof(struct sockaddr_in);
    connfd = accept(_listenfd, (sockaddr*)&cliaddr, &clilen);
    if(connfd > 0)
    {
        cout << "new connection from "
             << "[" << inet_ntoa(cliaddr.sin_addr)
             << ":" << ntohs(cliaddr.sin_port) << "]"
             << " new socket fd:" << connfd
             << endl;
    }
    else
    {
        cout << "accept error, connfd:" << connfd
             << " errno:" << errno << endl;
        return; // don't hand an invalid fd downstream
    }
    fcntl(connfd, F_SETFL, O_NONBLOCK); // non-blocking io
    _pCallback->newConnection(connfd);
}

void Acceptor::handleWrite()
{
}

void Acceptor::setCallback(IAcceptorCallback* pCallback)
{
    _pCallback = pCallback;
}
So the Acceptor creates a listening socket and then news up a Channel, passing listenfd into its constructor. What is this Channel class?
Channel literally means a frequency band, but here it reads more like a thin wrapper that adapts a file descriptor to epoll.
Channel.h
#ifndef CHANNEL_H
#define CHANNEL_H
#include "Declear.h"

class Channel
{
public:
    Channel(EventLoop* pLoop, int sockfd);
    ~Channel();
    void setCallback(IChannelCallback* pCallback);
    void handleEvent();
    void setRevents(int revent);
    void setIndex(int index);
    void enableReading();
    void enableWriting();
    void disableWriting();
    bool isWriting();
    int getEvents();
    int getfd();
    int getIndex();
private:
    void update();
    int _sockfd;
    int _events;
    int _revents;
    int _index;
    IChannelCallback* _pCallback;
    EventLoop* _pLoop;
};
#endif
Channel.cpp
//author voidccc
#include <sys/epoll.h>
#include "Channel.h"
#include "IChannelCallback.h"
#include "EventLoop.h"
#include <iostream>
using namespace std;

Channel::Channel(EventLoop* pLoop, int sockfd)
    : _sockfd(sockfd)
    , _events(0)
    , _revents(0)
    , _index(-1)
    , _pCallback(NULL)
    , _pLoop(pLoop)
{
}

void Channel::setCallback(IChannelCallback* pCallback)
{
    _pCallback = pCallback;
}

void Channel::setRevents(int revents)
{
    _revents = revents;
}

void Channel::setIndex(int index)
{
    _index = index;
}

void Channel::handleEvent()
{
    if(_revents & EPOLLIN)
    {
        _pCallback->handleRead();
    }
    if(_revents & EPOLLOUT)
    {
        _pCallback->handleWrite();
    }
}

void Channel::enableReading()
{
    _events |= EPOLLIN;
    update();
}

void Channel::enableWriting()
{
    _events |= EPOLLOUT;
    update();
}

void Channel::disableWriting()
{
    _events &= ~EPOLLOUT;
    update();
}

bool Channel::isWriting()
{
    return _events & EPOLLOUT;
}

void Channel::update()
{
    _pLoop->update(this);
}

int Channel::getEvents()
{
    return _events;
}

int Channel::getfd()
{
    return _sockfd;
}

int Channel::getIndex()
{
    return _index;
}
As you can see, operations on this class merely modify the descriptor's events mask and then update() the object through _pLoop. So the enableReading call in Acceptor::start marks the descriptor's epoll interest as EPOLLIN; enableReading, enableWriting, and disableWriting all funnel through Channel::update into EventLoop::update:
void EventLoop::update(Channel* pChannel)
{
    printf("%s\n", __func__);
    _pPoller->update(pChannel);
}
which updates the Channel inside its Epoll* _pPoller, i.e. calls ::epoll_ctl with EPOLL_CTL_ADD or EPOLL_CTL_MOD:
void Epoll::update(Channel* pChannel)
{
    int index = pChannel->getIndex();
    if(index == kNew)
    {
        struct epoll_event ev;
        ev.data.ptr = pChannel;
        ev.events = pChannel->getEvents();
        int fd = pChannel->getfd();
        pChannel->setIndex(kAdded);
        ::epoll_ctl(_epollfd, EPOLL_CTL_ADD, fd, &ev);
    }
    else
    {
        struct epoll_event ev;
        ev.data.ptr = pChannel;
        ev.events = pChannel->getEvents();
        int fd = pChannel->getfd();
        ::epoll_ctl(_epollfd, EPOLL_CTL_MOD, fd, &ev);
    }
}
So the TcpServer's Acceptor member, via a Channel, registers the listening socket listenfd with the Epoll object owned by the EventLoop.
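That registration leans on one epoll feature worth isolating: epoll_event.data.ptr can carry an arbitrary object pointer that epoll_wait hands straight back. A minimal standalone sketch of the pattern (the names here are mine, not mini-muduo's):

#include <sys/epoll.h>

struct Handler                       // stand-in for Channel
{
    virtual void onReadable() = 0;
    virtual ~Handler() {}
};

void addToEpoll(int epollfd, int fd, Handler* h)
{
    struct epoll_event ev;
    ev.events = EPOLLIN;
    ev.data.ptr = h;                 // stash the object, not just the fd
    ::epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev);
}

// After epoll_wait fills an events array, the object comes straight back:
//   Handler* h = static_cast<Handler*>(events[i].data.ptr);
//   h->onReadable();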
Stepping back up to EchoServer::start: it next calls _threadpool.start(3), which starts three threads in the pool.
Here is the thread pool implementation:
ThreadPool.h
//author voidccc
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include "Declear.h"
#include "BlockingQueue.h"
#include "Task.h"
#include "IRun.h"
#include <vector>
using namespace std;

class ThreadPool : public IRun0
{
public:
    ThreadPool();
    void start(int numThreads);
    void addTask(Task& task);
    virtual void run0();
private:
    void runInThread();
    BlockingQueue<Task> _tasks;
    vector<Thread*> _threads;
};
#endif
ThreadPool.cc
#include "ThreadPool.h"
#include "Thread.h"
ThreadPool::ThreadPool() { }
void ThreadPool::start(int numThreads)
{
printf("%s\n",__func__);
_threads.reserve(numThreads);
for(int i = 0 ; i < numThreads; i++)
{
Task task(this);//runInThread while 1
Thread* p = new Thread(task);
_threads.push_back(p);
p->start();
}
}
//virtual for Thread
void ThreadPool::addTask(Task& task)
{
printf("%s\n",__func__);
_tasks.put(task);
}
//virtual for Thread class
void ThreadPool::run0()
{
printf("%s\n",__func__);
runInThread();
}
void ThreadPool::runInThread()
{
printf("%s\n",__func__);
while(true)
{
_tasks.take().doTask();// 从跨线程的阻塞队列取出任务执行
}
}
It relies on a blocking queue:
BlockingQueue.h
//author voidccc
#ifndef BLOCKINGQUEUE_H
#define BLOCKINGQUEUE_H
#include <deque>
#include "Condition.h"
#include "Mutex.h"
using namespace std;

template<class T>
class BlockingQueue
{
public:
    BlockingQueue()
        : _cond(_mutex)
    {}
    void put(const T& one)
    {
        MutexLockGuard lock(_mutex);
        _queue.push_back(one);
        _cond.notify();
    }
    T take()
    {
        MutexLockGuard lock(_mutex);
        while(_queue.empty())
        {
            _cond.wait();
        }
        T front(_queue.front());
        _queue.pop_front();
        return front;
    }
private:
    deque<T> _queue;
    MutexLock _mutex;
    Condition _cond;
};
#endif
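As a quick aside, here is a minimal producer/consumer sketch of how this queue behaves across two pthreads; it is a toy of mine, not repo code (mini-muduo stores Task objects rather than ints):

#include "BlockingQueue.h"
#include <pthread.h>
#include <cstdio>

static BlockingQueue<int> g_queue;

void* consumer(void*)
{
    int v = g_queue.take(); // blocks on the condition variable until put() notifies
    printf("got %d\n", v);
    return NULL;
}

int main()
{
    pthread_t tid;
    pthread_create(&tid, NULL, consumer, NULL);
    g_queue.put(42);        // wakes the blocked consumer
    pthread_join(tid, NULL);
    return 0;
}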
The mutex and condition variable behind it:
Mutex.h
//author voidccc
#ifndef MUTEX_H
#define MUTEX_H
#include <pthread.h>

class MutexLock
{
public:
    MutexLock()
    {
        pthread_mutex_init(&_mutexid, NULL);
    }
    ~MutexLock()
    {
        pthread_mutex_destroy(&_mutexid);
    }
    void lock()
    {
        pthread_mutex_lock(&_mutexid);
    }
    void unlock()
    {
        pthread_mutex_unlock(&_mutexid);
    }
    pthread_mutex_t* getPthreadMutex()
    {
        return &_mutexid;
    }
private:
    pthread_mutex_t _mutexid;
};

class MutexLockGuard
{
public:
    MutexLockGuard(MutexLock& mutex)
        : _mutex(mutex)
    {
        _mutex.lock();
    }
    ~MutexLockGuard()
    {
        _mutex.unlock();
    }
private:
    MutexLock& _mutex;
};
#endif
Condition.h
//author voidccc
#ifndef CONDITION_H
#define CONDITION_H
#include <pthread.h>
#include "Mutex.h"

class Condition
{
public:
    Condition(MutexLock& mutex)
        : _mutex(mutex)
    {
        pthread_cond_init(&_condid, NULL);
    }
    ~Condition()
    {
        pthread_cond_destroy(&_condid);
    }
    void wait()
    {
        pthread_cond_wait(&_condid, _mutex.getPthreadMutex());
    }
    void notify()
    {
        pthread_cond_signal(&_condid);
    }
    void notifyAll()
    {
        pthread_cond_broadcast(&_condid);
    }
private:
    MutexLock& _mutex;
    pthread_cond_t _condid;
};
#endif
Nothing remarkable there; go straight to ThreadPool::start:
void ThreadPool::start(int numThreads)
{
    printf("%s\n", __func__);
    _threads.reserve(numThreads);
    for(int i = 0; i < numThreads; i++)
    {
        Task task(this);              // this task runs runInThread's endless loop
        Thread* p = new Thread(task);
        _threads.push_back(p);
        p->start();
    }
}
It pushes newly allocated Thread pointers into a vector of threads. The obvious question is what this task is.
Task.h
//author voidccc
#ifndef TASK_H
#define TASK_H
#include "Declear.h"
#include <string>

class Task
{
public:
    Task(IRun0* func);
    Task(IRun2* func, const std::string& str, void* param);
    void doTask();
private:
    IRun0* _func0;
    IRun2* _func2;
    std::string _str;
    void* _param;
};
#endif
Task.cc
#include "Task.h"
#include "IRun.h"
Task::Task(IRun0* func)
:_func0(func)
,_func2(NULL)
,_param(NULL)
{
}
Task::Task(IRun2* func, const string& str, void* param)
:_func0(NULL)
,_func2(func)
,_str(str)
,_param(param)
{
}
void Task::doTask()
{
printf("%s\n",__func__);
if(_func0) {
_func0->run0();
} else {
_func2->run2(_str, _param);
}
}
So a Task simply dispatches, according to which constructor built it, to the matching run0 or run2, as the short sketch below shows.
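Concretely, the two flavors as they appear in this post (a sketch; the pointers are whatever objects implement IRun0/IRun2):

// No-extra-arg flavor: doTask() will call pPool->run0()
Task loopTask(pPool);                            // as in ThreadPool::start
// String-plus-pointer flavor: doTask() will call pServer->run2(msg, pCon)
Task msgTask(pServer, string("12345678"), pCon); // as in EchoServer::onMessage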
ThreadPool's implementation of run0, for its part, takes a roundabout route into runInThread:
void ThreadPool::runInThread()
{
    printf("%s\n", __func__);
    while(true)
    {
        _tasks.take().doTask(); // take a task from the cross-thread blocking queue and run it
    }
}
This function pulls tasks off the _tasks blocking queue and executes them, so each pool thread spends its life looping: take a task, run it. Of course, at this point the queue is still empty.
That completes echoserver.start().
Next comes loop.loop():
EventLoop.h
//author voidccc
#ifndef EVENTLOOP_H
#define EVENTLOOP_H
#include "Declear.h"
#include "IChannelCallback.h"
#include "Task.h"
#include "Mutex.h"
#include <vector>
using namespace std;

class EventLoop : public IChannelCallback
{
public:
    EventLoop();
    ~EventLoop();
    void loop();
    void update(Channel* pChannel);
    void queueInLoop(Task& task);
    void runInLoop(Task& task);
    long runAt(Timestamp when, IRun0* pRun);
    long runAfter(double delay, IRun0* pRun);
    long runEvery(double interval, IRun0* pRun);
    void cancelTimer(long timerfd);
    bool isInLoopThread();
    virtual void handleRead();
    virtual void handleWrite();
private:
    void wakeup();
    int createEventfd();
    void doPendingFunctors();
    bool _quit;
    bool _callingPendingFunctors;
    Epoll* _pPoller;
    int _eventfd;
    const pid_t _threadId;
    Channel* _pEventfdChannel;
    MutexLock _mutex;
    vector<Task> _pendingFunctors;
    TimerQueue* _pTimerQueue;
};
#endif
EventLoop.cc
//author voidccc
#include <sys/eventfd.h>
#include <unistd.h> // for ::read / ::write
#include "EventLoop.h"
#include "Channel.h"
#include "Epoll.h"
#include "TimerQueue.h"
#include "Timestamp.h"
#include "Task.h"
#include "CurrentThread.h"
#include <iostream>
using namespace std;

EventLoop::EventLoop()
    : _quit(false)
    , _callingPendingFunctors(false)
    , _pPoller(new Epoll()) // Memory Leak !!!
    , _threadId(CurrentThread::tid())
    , _pTimerQueue(new TimerQueue(this)) // Memory Leak !!!
{
    _eventfd = createEventfd();
    _pEventfdChannel = new Channel(this, _eventfd); // Memory Leak !!!
    _pEventfdChannel->setCallback(this);
    _pEventfdChannel->enableReading();
}

EventLoop::~EventLoop()
{
}

void EventLoop::loop()
{
    while(!_quit)
    {
        puts("loop");
        printf("tid : %d\n", CurrentThread::tid());
        vector<Channel*> channels;
        _pPoller->poll(&channels);
        vector<Channel*>::iterator it;
        for(it = channels.begin(); it != channels.end(); ++it)
        {
            puts("for");
            (*it)->handleEvent();
        }
        doPendingFunctors();
    }
}

void EventLoop::update(Channel* pChannel)
{
    printf("%s\n", __func__);
    _pPoller->update(pChannel);
}

void EventLoop::queueInLoop(Task& task)
{
    printf("%s\n", __func__);
    {
        MutexLockGuard guard(_mutex);
        _pendingFunctors.push_back(task);
    }
    if(!isInLoopThread() || _callingPendingFunctors)
    {
        wakeup();
    }
}

void EventLoop::runInLoop(Task& task)
{
    printf("%s\n", __func__);
    if(isInLoopThread())
    {
        task.doTask();
    }
    else
    {
        queueInLoop(task);
    }
}

void EventLoop::wakeup()
{
    printf("%s\n", __func__);
    uint64_t one = 1;
    ssize_t n = ::write(_eventfd, &one, sizeof one);
    if(n != sizeof one)
    {
        cout << "EventLoop::wakeup() writes " << n << " bytes instead of 8" << endl;
    }
}

int EventLoop::createEventfd()
{
    int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if(evtfd < 0)
    {
        cout << "Failed in eventfd" << endl;
    }
    return evtfd;
}

void EventLoop::handleRead()
{
    printf("%s\n", __func__);
    uint64_t one = 1;
    ssize_t n = ::read(_eventfd, &one, sizeof one);
    if(n != sizeof one)
    {
        cout << "EventLoop::handleRead() reads " << n << " bytes instead of 8" << endl;
    }
}

void EventLoop::handleWrite()
{
}

void EventLoop::doPendingFunctors()
{
    printf("%s\n", __func__);
    printf("tid - %d\n", CurrentThread::tid());
    vector<Task> tempRuns;
    _callingPendingFunctors = true;
    {
        MutexLockGuard guard(_mutex);
        tempRuns.swap(_pendingFunctors);
    }
    vector<Task>::iterator it;
    for(it = tempRuns.begin(); it != tempRuns.end(); ++it)
    {
        it->doTask();
    }
    _callingPendingFunctors = false;
}

long EventLoop::runAt(Timestamp when, IRun0* pRun)
{
    return _pTimerQueue->addTimer(pRun, when, 0.0);
}

long EventLoop::runAfter(double delay, IRun0* pRun)
{
    return _pTimerQueue->addTimer(pRun, Timestamp::nowAfter(delay), 0.0);
}

long EventLoop::runEvery(double interval, IRun0* pRun)
{
    return _pTimerQueue->addTimer(pRun, Timestamp::nowAfter(interval), interval);
}

void EventLoop::cancelTimer(long timerId)
{
    printf("%s\n", __func__);
    _pTimerQueue->cancelTimer(timerId);
}

bool EventLoop::isInLoopThread()
{
    printf("%s\n", __func__);
    return _threadId == CurrentThread::tid();
}
loop() calls _pPoller->poll, which is where epoll_wait blocks waiting for events.
Epoll.h
//author voidccc
#ifndef EPOLL_H
#define EPOLL_H
#include <sys/epoll.h>
#include "Declear.h"
#include "Define.h"
#include <vector>
using namespace std;

class Epoll
{
public:
    Epoll();
    ~Epoll();
    void poll(vector<Channel*>* pChannels);
    void update(Channel* pChannel);
private:
    int _epollfd;
    struct epoll_event _events[MAX_EVENTS];
};
#endif
Epoll.cpp
//author voidccc
#include <errno.h>
#include "Epoll.h"
#include "Channel.h"
#include "Define.h"
#include <iostream>
using namespace std;

const int kNew = -1;
const int kAdded = 1;

Epoll::Epoll()
{
    _epollfd = ::epoll_create(1);
    if(_epollfd <= 0)
        cout << "epoll_create error, errno:" << errno << endl; // was printing _epollfd by mistake
}

Epoll::~Epoll()
{}

void Epoll::poll(vector<Channel*>* pChannels)
{
    int fds = ::epoll_wait(_epollfd, _events, MAX_EVENTS, -1);
    if(fds == -1)
    {
        cout << "epoll_wait error, errno:" << errno << endl;
        return;
    }
    for(int i = 0; i < fds; i++)
    {
        Channel* pChannel = static_cast<Channel*>(_events[i].data.ptr);
        pChannel->setRevents(_events[i].events);
        pChannels->push_back(pChannel);
    }
}

void Epoll::update(Channel* pChannel)
{
    int index = pChannel->getIndex();
    if(index == kNew)
    {
        struct epoll_event ev;
        ev.data.ptr = pChannel;
        ev.events = pChannel->getEvents();
        int fd = pChannel->getfd();
        pChannel->setIndex(kAdded);
        ::epoll_ctl(_epollfd, EPOLL_CTL_ADD, fd, &ev);
    }
    else
    {
        struct epoll_event ev;
        ev.data.ptr = pChannel;
        ev.events = pChannel->getEvents();
        int fd = pChannel->getfd();
        ::epoll_ctl(_epollfd, EPOLL_CTL_MOD, fd, &ev);
    }
}
poll shows how the Channel pointer is recovered from _events[i].data.ptr and returned in a vector of Channel pointers. EventLoop::loop then walks that vector and, via handleEvent, calls the read handler for EPOLLIN events and the write handler for EPOLLOUT events.
When a client connection arrives, Acceptor::handleRead accepts it and calls back into TcpServer::newConnection, which news up a TcpConnection; its constructor news up a Channel for the connected socket and calls enableReading to put it under epoll's watch. Then TcpConnection::connectEstablished calls back into EchoServer::onConnection, which just prints the function name (a natural place for logging); a hedged sketch of that TcpConnection side follows below.
At this point, connection setup is complete.
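The post never lists that part of TcpConnection, so here is a hedged sketch of the shape implied by the calls above (the names _sockfd, _pUser, and _pSocketChannel do appear later in sendInLoop, but the bodies are my inference, not the repo's exact code):

TcpConnection::TcpConnection(int sockfd, EventLoop* pLoop)
    : _sockfd(sockfd)
    , _pLoop(pLoop)
    , _pUser(NULL)
{
    _pSocketChannel = new Channel(pLoop, sockfd); // Memory Leak !!!
    _pSocketChannel->setCallback(this);
    _pSocketChannel->enableReading();             // put the new socket under EPOLLIN watch
}

void TcpConnection::connectEstablished()
{
    _pUser->onConnection(this);                   // lands in EchoServer::onConnection
}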
When the client sends data, epoll_wait returns and loop() invokes TcpConnection::handleRead to read the input:

    string linestr(line, readlength);
    _inBuf.append(linestr);
    _pUser->onMessage(this, &_inBuf);

The bytes read are appended to the application-level input buffer _inBuf, and then EchoServer::onMessage is called:
void EchoServer::onMessage(TcpConnection* pCon, Buffer* pBuf)
{
    printf("%s\n", __func__);
    // hand fixed-size slices to the thread pool...
    while(pBuf->readableBytes() > MESSAGE_LENGTH)
    {
        string message = pBuf->retrieveAsString(MESSAGE_LENGTH);
        Task task(this, message, pCon);
        _threadpool.addTask(task);
    }
    // ...and ship whatever short tail remains as its own task
    if(pBuf->readableBytes() <= MESSAGE_LENGTH)
    {
        string message = pBuf->retrieveAllAsString();
        Task task(this, message, pCon);
        _threadpool.addTask(task);
    }
}
(The original author's version of this function had problems; I tweaked it.)
The gist: repeatedly pull fixed-length slices of MESSAGE_LENGTH bytes out of the buffer and ship each one, and finally ship the remainder shorter than MESSAGE_LENGTH as well. Each slice is wrapped in a Task carrying the message and pushed into the thread pool, where one of the three threads will pick it up; since it is the three-argument Task, it invokes EchoServer::run2. To simulate a CPU-bound job, run2 computes the recursive Fibonacci function (fib(30) = 832040) before calling TcpConnection::send. The Buffer class used here is simple; read the source, or see the hedged interface sketch just below.
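A minimal sketch of the Buffer interface as exercised by this post; the real class lives in the repo (muduo proper uses a vector<char> with read/write indices), so treat this as an assumption-laden stand-in:

#include <string>

class Buffer
{
public:
    size_t readableBytes() const { return _data.size(); }
    void append(const std::string& s) { _data += s; }
    std::string retrieveAsString(size_t n)  // hand out the first n bytes
    {
        std::string r = _data.substr(0, n);
        _data.erase(0, n);
        return r;
    }
    std::string retrieveAllAsString()
    {
        return retrieveAsString(_data.size());
    }
private:
    std::string _data;
};

With the buffer semantics clear, back to run2: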
void EchoServer::run2(const string& str, void* tcp)
{
    printf("%s\n", __func__);
    // IO-blocking task or CPU-busy task
    cout << "fib(30) = " << fib(30) << " tid = " << CurrentThread::tid() << endl;
    ((TcpConnection*)tcp)->send(str);
}

// fib is short for Fibonacci; a deliberately CPU-busy method
int EchoServer::fib(int n)
{
    return (n <= 2) ? 1 : (fib(n-1) + fib(n-2)); // n <= 2 also guards against n < 1
}
send then checks whether the current thread is the EventLoop (IO) thread: if so it calls sendInLoop directly, otherwise runInLoop. Since send here runs on one of the pool threads while IO belongs to the main thread, it goes through _pLoop->runInLoop:
void EventLoop::runInLoop(Task& task)
{
    printf("%s\n", __func__);
    if(isInLoopThread())
    {
        task.doTask();
    }
    else
    {
        queueInLoop(task);
    }
}
runInLoop checks once more whether this is the IO thread; if so, the task runs immediately. Here it is not, so queueInLoop executes:
void EventLoop::queueInLoop(Task& task)
{
    printf("%s\n", __func__);
    {
        MutexLockGuard guard(_mutex);
        _pendingFunctors.push_back(task);
    }
    if(!isInLoopThread() || _callingPendingFunctors)
    {
        wakeup();
    }
}
This appends the task to a vector, then tests again whether we are on the IO thread. We are not, so wakeup() runs; set _callingPendingFunctors aside for the moment.
void EventLoop::wakeup()
{
    printf("%s\n", __func__);
    uint64_t one = 1;
    ssize_t n = ::write(_eventfd, &one, sizeof one);
    if(n != sizeof one)
    {
        cout << "EventLoop::wakeup() writes " << n << " bytes instead of 8" << endl;
    }
}
So wakeup just writes to an _eventfd. What is this _eventfd?
int EventLoop::createEventfd()
{
    int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if(evtfd < 0)
    {
        cout << "Failed in eventfd" << endl;
    }
    return evtfd;
}
The newer Linux API eventfd creates an event file descriptor. Whatever its other uses, what matters here is that epoll can watch it like any other descriptor (the EventLoop constructor already did new Channel plus enableReading on it).
When we write to _eventfd while epoll_wait is blocked, epoll_wait wakes up, reports the event, and the corresponding handler runs; here that is EventLoop::handleRead, which merely reads back the eight bytes wakeup just wrote. Its entire purpose is to rouse a sleeping loop. (A standalone demo follows the function below.)
void EventLoop::handleRead()
{
    printf("%s\n", __func__);
    uint64_t one = 1;
    ssize_t n = ::read(_eventfd, &one, sizeof one);
    if(n != sizeof one)
    {
        cout << "EventLoop::handleRead() reads " << n << " bytes instead of 8" << endl;
    }
}
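To see the trick in isolation, here is a minimal, self-contained demo of the eventfd-plus-epoll wakeup, separate from mini-muduo:

#include <sys/eventfd.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <cstdint>
#include <cstdio>

int main()
{
    int efd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    int epfd = ::epoll_create(1);

    epoll_event ev;
    ev.events = EPOLLIN;
    ev.data.fd = efd;
    ::epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &ev); // watch the eventfd

    uint64_t one = 1;
    ::write(efd, &one, sizeof one);             // what wakeup() does

    epoll_event out;
    int n = ::epoll_wait(epfd, &out, 1, 0);     // returns immediately: 1 event
    printf("epoll_wait returned %d event(s)\n", n);

    uint64_t v;
    ::read(efd, &v, sizeof v);                  // what handleRead() does: drain it
    ::close(efd);
    ::close(epfd);
    return 0;
}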
After the for loop inside loop() finishes a round, doPendingFunctors is called. It executes the tasks we just pushed into the pending vector: TcpConnection::run2 runs and calls sendInLoop, which delivers the message to the right client. Here we meet _callingPendingFunctors again. The intent behind it is that if someone queues more work while doPendingFunctors is running, wakeup() is still called so that the next epoll_wait returns immediately: if (!isInLoopThread() || _callingPendingFunctors).
At first I thought _callingPendingFunctors was unnecessary: on the IO thread !isInLoopThread() == false, and runInLoop would have executed the task directly without ever reaching this branch. But the IO thread can call queueInLoop directly rather than via runInLoop — TcpConnection::sendInLoop does exactly that to schedule the onWriteComplate callback — and in that case the wakeup really is needed.
void EventLoop::doPendingFunctors()
{
    printf("%s\n", __func__);
    printf("tid - %d\n", CurrentThread::tid());
    vector<Task> tempRuns;
    _callingPendingFunctors = true;
    {
        MutexLockGuard guard(_mutex);
        tempRuns.swap(_pendingFunctors);
    }
    vector<Task>::iterator it;
    for(it = tempRuns.begin(); it != tempRuns.end(); ++it)
    {
        it->doTask();
    }
    _callingPendingFunctors = false;
}
Now look at sendInLoop, the function that actually writes the message back to the client:
void TcpConnection::sendInLoop(const string& message)
{
    printf("%s\n", __func__);
    int n = 0;
    if(_outBuf.readableBytes() == 0)
    {
        n = ::write(_sockfd, message.c_str(), message.size());
        if(n < 0)
        {
            cout << "write error" << endl;
            n = 0; // treat as nothing sent so the whole message gets buffered
        }
        if(n == static_cast<int>(message.size()))
        {
            Task task(this);
            _pLoop->queueInLoop(task); // invoke onWriteComplate
        }
    }
    if(n < static_cast<int>(message.size()))
    {
        _outBuf.append(message.substr(n, message.size()));
        if(!_pSocketChannel->isWriting())
        {
            _pSocketChannel->enableWriting(); // add EPOLLOUT
        }
    }
}
What needs to go out is _outBuf plus message. If the write did not finish, the unsent part of message is appended to _outBuf and EPOLLOUT is enabled, so the remainder goes out once the socket becomes writable (a hedged sketch of that path follows). If it did finish, a task is queued on the EventLoop that eventually runs TcpConnection::run0 and then EchoServer::onWriteComplate, which just prints the function name.
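The EPOLLOUT side is not shown in this post, so here is a hedged sketch of what TcpConnection::handleWrite plausibly does with it, reusing the names seen above; the details are my assumption, not verified against the repo:

void TcpConnection::handleWrite()
{
    if(_pSocketChannel->isWriting())
    {
        // drain whatever sendInLoop buffered
        string data = _outBuf.retrieveAllAsString();
        int n = ::write(_sockfd, data.c_str(), data.size());
        if(n < static_cast<int>(data.size()))
        {
            // keep the unsent tail for the next EPOLLOUT
            _outBuf.append(data.substr(n > 0 ? n : 0));
        }
        else
        {
            _pSocketChannel->disableWriting(); // buffer empty: stop watching EPOLLOUT
        }
    }
}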
After this long journey, one complete echo round trip is done.
What I learned from this:
1. A pattern for executing tasks asynchronously across multiple threads.
2. How to decouple the parts of a network model.
3. How to use a BlockingQueue.
I have not discussed timers, because this final version of the program ignores them. Earlier installments do implement them, though in my opinion not well; if you are interested, read the original author's source:
voidccc/mini-muduo
and my lightly modified fork:
adlternative/mini-muduo-learn