协程实现原理
协程的本质都是通过修改 ESP 和 EIP 指针来实现的。其理论上还是单线程在运行.
程序在CPU上运行时依赖3个寄存器:
- ESP寄存值指向当前栈顶地址,指向当前指令需要的数据
- EBP指向当前活动栈帧的基地址
- 指令寄存器IP,指向当前需要运行的指令
其中主要有(IP,ESP)寄存器最重要,这两个寄存器指针的改变可以修改当前需要加载到 CPU 运行的指令和数据,当某个操作陷入到耗时的等待中时,通过修改这两个指针即可让出CPU,交给其他的任务去使用,每个任务都必须主动的让出CPU,然后等待下一次的调度来继续未完成的任务,这样就可以最大程度的利用CPU,当一个任务等待过程非常短的时候,就出现了多个任务并行运行的效果,也就是协程。
实现协程的多种方法
- 利用
glibc
的ucontext
组件(云风的库) - 使用汇编来切换上下文(实现miniC协程,腾讯libco)
- 利用C语言语法
switch-case
的奇淫技巧来实现(Protothreads) - 利用了 C 语言的
setjmp
和longjmp
( 一种协程的 C/C++ 实现,要求函数里面使用 static local 的变量来保存协程内部的数据) - 利用
Boost
库提供的两种实现,分别是stackless
与stackfull
- 等等语言和库....
当然了,最"厉害"的还是去用汇编去实现,因为这样会避免许多跨平台等问题.比如在有些平台就会用不了setcontext
等函数.有些函数可能也慢慢的废除了,所以用汇编实现是最有效的,也是腾讯公司经过多年的实践得出的.
glibc
库里的上下文操作函数:
1.getcontext() : 获取当前context
2.setcontext() : 切换到指定context
3.makecontext() : 设置 函数指针 和 堆栈 到对应context保存的 sp 和 pc 寄存器中,
调用之前要先调用 getcontext()
4.swapcontext() : 保存当前context,并且切换到指定context
具体使用可以自己查询一下,最后我会用他们实现一个C++11
协程库.
如何实现保存和恢复协程运行栈
实例:使用C++11
和setcontext
类函数实现协程库
main.cpp
#include "Coroutine.h"
#include <iostream>
#include <stdio.h>
#include <unistd.h>
using namespace std;
using namespace Tattoo;
class TEST
{
public:
void func3(std::shared_ptr<CoroutineSchedule> s, void *arg)
{
for (int i = 0; i < 10; i++)
{
cout << "coroutine : " << s->GetCurCoID() << " : " << *(int *)arg + i << endl;
s->Yield();
}
}
};
int main()
{
std::shared_ptr<CoroutineSchedule> schedule = std::make_shared<CoroutineSchedule>();
int test1 = 1;
int test2 = 2;
int test3 = 3;
TEST tt;
int id_co1 = schedule->CreateCoroutine(std::bind(&TEST::func3, &tt, schedule, &test1));
int id_co2 = schedule->CreateCoroutine(std::bind(&TEST::func3, &tt, schedule, &test2));
int id_co3 = schedule->CreateCoroutine(std::bind(&TEST::func3, &tt, schedule, &test3));
printf("main start\n");
while ((schedule->IsAlive(id_co1)) && (schedule->IsAlive(id_co2)) && (schedule->IsAlive(id_co3)))
{
schedule->ResumeCoroutine(id_co1);
schedule->ResumeCoroutine(id_co2);
schedule->ResumeCoroutine(id_co3);
}
printf("main end\n");
return 0;
}
CoroutineSchedule.h
#ifndef _COROUTINESCHEDULER_H
#define _COROUTINESCHEDULER_H //B.h
#include <ucontext.h>
#include <unordered_map>
#include <memory>
#include <functional>
#include <iostream>
#define INFO(x, y, z) std::cout << x << " : " << y << " : " << z << std::endl;
namespace Tattoo
{
class Coroutine;
class CoroutineSchedule
{
public:
static const int STACK_SIZE = 1024 * 1024;
static const int MAX_CO = 16;
using CoFun = std::function<void()>;
CoroutineSchedule() : cur_co_num_(0), cur_run_id_(-1) {}
~CoroutineSchedule() {}
CoroutineSchedule(const CoroutineSchedule &) = delete;
CoroutineSchedule &operator=(const CoroutineSchedule &) = delete;
int CreateCoroutine(CoFun func);
void DestroyCroutine(int cor_id);
void ResumeCoroutine(int cor_id);
void Yield();
bool IsAlive(int cor_id);
int GetCurCoID() { return cur_run_id_; }
static void static_fun(void *arg);
private:
std::unordered_map<int, std::shared_ptr<Coroutine>> mmap_;
char SchStack[STACK_SIZE] = {0};
ucontext_t main_ctx;
int cur_co_num_; /*实时记录协程数量,也会控制map下标*/
int cur_run_id_;
};
} // namespace Tattoo
#endif
CoroutineSchedule.cpp
#include "Coroutine.h"
namespace Tattoo
{
int CoroutineSchedule::CreateCoroutine(CoFun func)
{
/*这里可以用 cur_max_num_ 去限制协程数目*/
cur_co_num_++;
auto cor = std::make_shared<Coroutine>(this, func, cur_co_num_);
mmap_[cur_co_num_] = cor;
return cur_co_num_;
}
void CoroutineSchedule::DestroyCroutine(int cor_id)
{
if (mmap_.find(cor_id) == mmap_.end())
return;
if (cor_id == cur_run_id_)
cur_run_id_ = -1;
mmap_[cor_id]->SetStatus(Coroutine::CO_FINSHED);
cur_co_num_--; /*下次重复使用就行了,不需要 erase */
}
void CoroutineSchedule::Yield()
{
if (-1 == cur_run_id_)
return;
int id = cur_run_id_;
std::shared_ptr<Coroutine> cor = mmap_[id];
// assert(reinterpret_cast<char *>(&cor) > );
cor->save_stack(SchStack + STACK_SIZE);
cor->SetStatus(Coroutine::CO_SUSPEND);
cur_run_id_ = -1;
swapcontext(&cor->ctx_, &main_ctx);
}
bool CoroutineSchedule::IsAlive(int cor_id)
{
if (mmap_[cor_id]->GetStatus() == Coroutine::CO_FINSHED)
return false;
else
return true;
}
void CoroutineSchedule::ResumeCoroutine(int cor_id)
{
// INFO("*******************************************ResumeCoroutine", cor_id, cur_run_id_);
if (mmap_.find(cor_id) == mmap_.end()) /*不存在的 id */
return;
assert(this->cur_run_id_ == -1); //保证没有其他协程运行
std::shared_ptr<Coroutine> cor = mmap_[cor_id];
switch (cor->GetStatus())
{
case Coroutine::CO_READY:
{
getcontext(&cor->ctx_);
cor->ctx_.uc_stack.ss_sp = SchStack;
cor->ctx_.uc_stack.ss_size = STACK_SIZE;
cor->ctx_.uc_link = &main_ctx;
cur_run_id_ = cor_id;
cor->SetStatus(Coroutine::CO_RUNNING);
makecontext(&cor->ctx_, reinterpret_cast<void (*)()>(static_fun), 1, this);
swapcontext(&this->main_ctx, &cor->ctx_);
}
break;
case Coroutine::CO_SUSPEND:
{
memcpy(SchStack + STACK_SIZE - cor->stack_cur_size_, cor->CorStack, cor->stack_cur_size_);
cur_run_id_ = cor_id;
cor->SetStatus(Coroutine::CO_RUNNING);
swapcontext(&main_ctx, &cor->ctx_);
}
break;
default:
assert(0);
}
if (-1 == cur_run_id_ && cor->status_ == Coroutine::CO_FINSHED)
DestroyCroutine(cor_id);
}
void CoroutineSchedule::static_fun(void *arg)
{
CoroutineSchedule *sch = reinterpret_cast<CoroutineSchedule *>(arg);
int id = sch->cur_run_id_;
std::shared_ptr<Coroutine> cor = sch->mmap_[id];
cor->func_();
sch->cur_run_id_ = -1;
sch->mmap_[id]->SetStatus(Coroutine::CO_FINSHED);
}
} // namespace Tattoo
Coroutine.h
#ifndef _COROUTINE_H
#define _COROUTINE_H
#include <ucontext.h>
#include <memory>
#include <assert.h>
#include <string.h>
#include "CoroutineSchedule.h"
namespace Tattoo
{
class Coroutine
{
public:
using CoFun = std::function<void()>;
enum
{
CO_FINSHED,
CO_READY,
CO_RUNNING,
CO_SUSPEND,
};
Coroutine(CoroutineSchedule *sch, CoFun func, int id);
~Coroutine();
Coroutine(const Coroutine &) = delete;
Coroutine &operator=(const Coroutine &) = delete;
void SetStatus(int status);
int GetStatus();
private:
friend class CoroutineSchedule;
void save_stack(void *top);
CoFun func_;
ucontext_t ctx_;
std::shared_ptr<CoroutineSchedule> sch_;
std::ptrdiff_t stack_max_size_; //stack最大 大小
std::ptrdiff_t stack_cur_size_; //协程实际大小
int status_;
char *CorStack;
int id_;
};
} // namespace Tattoo
#endif
Coroutine.cpp
#include <ucontext.h>
#include <memory>
#include <assert.h>
#include <string.h>
#include "Coroutine.h"
namespace Tattoo
{
Coroutine::Coroutine(CoroutineSchedule *sch, CoFun func, int id)
: func_(func), id_(id), sch_(sch),
stack_max_size_(0), stack_cur_size_(0),
status_(CO_READY), CorStack(0)
{
}
Coroutine::~Coroutine()
{
delete[] CorStack;
}
void Coroutine::SetStatus(int status)
{
status_ = status;
}
int Coroutine::GetStatus()
{
return status_;
}
void Coroutine::save_stack(void *top)
{
char dummy = 0;
assert((char *)top - &dummy <= CoroutineSchedule::STACK_SIZE);
if (stack_max_size_ < (char *)top - &dummy)
{
delete[] CorStack;
stack_max_size_ = (char *)top - &dummy;
CorStack = new char[stack_max_size_];
}
stack_cur_size_ = (char *)top - &dummy;
memcpy(CorStack, &dummy, stack_cur_size_);
}
} // namespace Tattoo
运行截图:
因为我的目标是用汇编去实现,所以也就不花时间去优化代码了(比如里面还存在原始指针,new,delete ,内存泄露等).