引言
C++11中引入了一个异步编程的利器std::future(虽然javaGDK5中就已经引入了),这显然使我们的异步编程的过程变得即简介又易维护,但学习不能仅停留在知道它是干什么的的阶段,显然正确的使用时机也是我们需要学习的,但是总感觉对其使用有点不得心应手,遂决定简单的看看源码,看看其是如何实现的.
猜测
在看源码之前 首先自己想一想这个过程当然也是极好的 那我们一起来捋一捋吧 其实过程想来简单 就是在线程结束时以一种方式异步通知当前持有std::future的唯一线程(std::future支持移动不支持复制), 怎么做呢,我当时的想法是这样的
1.条件变量
future对象中设置一个条件变量 在异步线程结束时调用notify_one(),在get()函数中调用wait(),这样可以实现一个简单的异步调用,缺点是需要互斥量,条件变量,一个仅发生一次的过程这样不免有些浪费,wait()操作更是需要加锁解锁,也就是说这样一个完整的过程我们需要加锁解锁各两次,还需要一个notify_one(),但优点也很明显,过程简单,且如果等待时间较长,可以把cpu让给其他工作线程,全局上节省的时间随等待时间加长而变长,但等待时间短的话除了完成功能就没有丝毫优势了.
2. 自旋锁
自旋锁显然也可以解决这个问题,我们只需要设置一个类内的原子变量,当异步线程结束后改变值,然后get()成员自旋等待即可,这种方法优点与缺点都是很明显的,优点:比起条件变量这样笨重的东西确实轻盈了不少,且在等待时间较小时不存在条件变量所需要的用户态与内核态之间的转换. 缺点:当等待时间较长的时候cpu空转,无意义的消耗.当然这是自旋锁本身的弊端.
源码分析
首先解释下以下源码的分析并不是逐字逐句精雕细琢,只是为了搞清楚future的过程,从而能更好的使用
先说结论 future中使用的是自旋锁实现异步 但巧妙的使用了一个系统调用
所以我们直接来看get()
_Res
get()
{
typename _Base_type::_Reset __reset(*this);
return std::move(this->_M_get_result()._M_value());
}
返回值的_Res是future模板参数,就是我们线程返回值的类型
我们再来看看_M_get_result()
/// Wait for the state to be ready and rethrow any stored exception
__result_type
_M_get_result() const
{
_State_base::_S_check(_M_state);
_Result_base& __res = _M_state->wait();
if (!(__res._M_error == 0))
rethrow_exception(__res._M_error);
return static_cast<__result_type>(__res);
}
_M_state
typedef shared_ptr<_State_base> __state_type;
.....
private:
__state_type _M_state;
.....
using _State_base = _State_baseV2;
只写上一些重要的
class _State_baseV2
{
typedef _Ptr<_Result_base> _Ptr_type;
enum _Status : unsigned {
__not_ready,
__ready
};
_Ptr_type _M_result;
__atomic_futex_unsigned<> _M_status;
atomic_flag _M_retrieved = ATOMIC_FLAG_INIT;
......
}
我们回到_M_get_result() 首先来看看第一句
_State_base::_S_check(_M_state);
..........
template<typename _Tp>
static void
_S_check(const shared_ptr<_Tp>& __p)
{
if (!static_cast<bool>(__p))
__throw_future_error((int)future_errc::no_state);
}
把智能指针强转bool false的话抛错,而强转为false只能是指针为空,
第二句显然是future的核心部分 我们剩下的篇幅来说这个
先说后面
我们继续第三句 因为future是可以存储异常的,所以第三句是检测异步线程中是否抛出异常
第四句就是返回值 没搞明白为什么直接强转
开始着重分析wait()
_Result_base&
wait()
{
// Run any deferred function or join any asynchronous thread:
_M_complete_async();
// Acquire MO makes sure this synchronizes with the thread that made
// the future ready.
_M_status._M_load_when_equal(_Status::__ready, memory_order_acquire);
return *_M_result;
}
我们可以看到首先执行异步线程
virtual void _M_complete_async() { _M_join(); }
......
void _M_join() { std::call_once(_M_once, &thread::join, ref(_M_thread)); }
我们知道call_once的作用就是可以多次被执行,但只执行一次(可以拿来写单例),这也正是future的使用场景,相对应一个std::once_flag,也就是call_once的第一个参数, _M_complete_async的作用就是无论调用多少次,只执行一次join
我们再来看看_M_status
__atomic_futex_unsigned<> _M_status;
.......
template <unsigned _Waiter_bit = 0x80000000>
class __atomic_futex_unsigned : __atomic_futex_unsigned_base
{
typedef chrono::system_clock __clock_t;
// This must be lock-free and at offset 0.
atomic<unsigned> _M_data;
......
}
然后我们进入到_M_load_when_equal(_Status::__ready, memory_order_acquire);
ps::看函数名就能感觉到这是一个自旋锁
_GLIBCXX_ALWAYS_INLINE void
_M_load_when_equal(unsigned __val, memory_order __mo)
{
unsigned __i = _M_load(__mo);
if ((__i & ~_Waiter_bit) == __val)
return;
// TODO Spin-wait first.
_M_load_and_test(__i, __val, true, __mo);
}
......
_GLIBCXX_ALWAYS_INLINE unsigned
_M_load(memory_order __mo)
{
return _M_data.load(__mo) & ~_Waiter_bit;
}
我们可以看到首先直接load一下 内存序使用的是memory_order_acquire,然后我们由_M_data的注释可知值为零,而_Waiter_bit是一个模板默认参数,值为0x80000000,所以如果异步线程已经结束,值已经被设置,那么返回一个大于0的值,否则返回零,所以如果异步线程已经结束的话直接返回,否则自旋等待,其实也可以直接进入自旋,我的理解是这样效率会高,毕竟如果值已经返回,就没必要多好几层函数调用和在_M_load_and_test_until进行一次无意义的循环
我们接着看
unsigned
_M_load_and_test(unsigned __assumed, unsigned __operand,
bool __equal, memory_order __mo)
{
return _M_load_and_test_until(__assumed, __operand, __equal, __mo,
false, {}, {});
}
// If a timeout occurs, returns a current value after the timeout;
// otherwise, returns the operand's value if equal is true or a different
// value if equal is false.
// The assumed value is the caller's assumption about the current value
// when making the call.
unsigned
_M_load_and_test_until(unsigned __assumed, unsigned __operand,
bool __equal, memory_order __mo, bool __has_timeout,
chrono::seconds __s, chrono::nanoseconds __ns)
{
for (;;)
{
// Don't bother checking the value again because we expect the caller
// to have done it recently.
// memory_order_relaxed is sufficient because we can rely on just the
// modification order (store_notify uses an atomic RMW operation too),
// and the futex syscalls synchronize between themselves.
_M_data.fetch_or(_Waiter_bit, memory_order_relaxed);
bool __ret = _M_futex_wait_until((unsigned*)(void*)&_M_data,
__assumed | _Waiter_bit,
__has_timeout, __s, __ns);
// Fetch the current value after waiting (clears _Waiter_bit).
__assumed = _M_load(__mo);
if (!__ret || ((__operand == __assumed) == __equal))
return __assumed;
// TODO adapt wait time
}
}
首先我们说说参数,第一个__assumed,是现有的值,__operand是期望的值,__equal是一个为了保证代码复用的一个功能参数,__mo是内存序,__has_timeout是是否有超时,后面两个是超时参数,在get中为默认值,
我们进入循环
首先引入眼帘的是fetch_or 这显然是这一类函数的教科书使用,原子操作中fetch系函数相比与操作符最大的优点就是可以自由设置内存序,避免不必要的同步,在这里load使用acquire,而异步线程中store使用release,这里就算使用memory_order_relaxed也是ok的,
然后是一个等待
// Returns false iff a timeout occurred.
bool
_M_futex_wait_until(unsigned *__addr, unsigned __val, bool __has_timeout,
chrono::seconds __s, chrono::nanoseconds __ns);
函数声明告诉了我们这个函数的意思,即超时后返回false
bool
__atomic_futex_unsigned_base::_M_futex_wait_until(unsigned *__addr,
unsigned __val,
bool __has_timeout, chrono::seconds __s, chrono::nanoseconds __ns)
{
if (!__has_timeout)
{
// Ignore whether we actually succeeded to block because at worst,
// we will fall back to spin-waiting. The only thing we could do
// here on errors is abort.
int ret __attribute__((unused));
ret = syscall (SYS_futex, __addr, futex_wait_op, __val, nullptr);
_GLIBCXX_DEBUG_ASSERT(ret == 0 || errno == EINTR || errno == EAGAIN);
return true;//我们可以看到如果不设置超时的话仅返回true
}
else
{
struct timeval tv;
gettimeofday (&tv, NULL);
// Convert the absolute timeout value to a relative timeout
struct timespec rt;
rt.tv_sec = __s.count() - tv.tv_sec;
rt.tv_nsec = __ns.count() - tv.tv_usec * 1000;
if (rt.tv_nsec < 0)
{
rt.tv_nsec += 1000000000;
--rt.tv_sec;
}
// Did we already time out?
if (rt.tv_sec < 0)
return false;
if (syscall (SYS_futex, __addr, futex_wait_op, __val, &rt) == -1)
{
_GLIBCXX_DEBUG_ASSERT(errno == EINTR || errno == EAGAIN
|| errno == ETIMEDOUT);
if (errno == ETIMEDOUT)
return false;
}
return true;
}
}
我们从代码可以看到在未设置超时的时候 _M_futex_wait_until唯一的动作就是执行一个系统调用SYS_futex,这个系统调用是什么呢,我们通过查阅资料得到下面这一段代码
static int
futex(int *uaddr, int futex_op, int val,
const struct timespec *timeout, int *uaddr2, int val3)
{
return syscall(SYS_futex, uaddr, futex_op, val,
timeout, uaddr, val3);
}
futex是干什么的呢 通过查询man手册我们知道
The futex() system call provides a method for waiting until a certain
condition becomes true. It is typically used as a blocking construct
in the context of shared-memory synchronization. When using futexes,
the majority of the synchronization operations are performed in user
space. A user-space program employs the futex() system call only
when it is likely that the program has to block for a longer time
until the condition becomes true. Other futex() operations can be
used to wake any processes or threads waiting for a particular
condition.
futex系统调用提供了一种方法等待一个特定的条件直到其为真,其代表性
的使用就是被使用为一个阻塞结构在共享内存的同步中,当使用futexes时,大多
数的同步操作被执行在用户空间,一个用户空间的程序使用futex系统调用的时机
是仅当你的程序不得不阻塞很长时间直到一个条件为真的时候,其他的futex
使用可以被用于唤醒一个正在等待一个特定条件进程或线程.
然后我们可以看到对当前值已acquire的内存序进行laod,然后是一个if判读 如果超时的话返回或者当当前值等于期望值时返回 否则继续自旋.
结论
至此 我们看源码的目的已经达到了,就是搞懂future的过程,future的异步实现过程其实就是一个acomic< unsigned>的原子对象,然后自旋等待异步线程结束 虽然是自旋转 但还是会进行sys_futex系统调用 这就是future的精髓所在 既没有条件变量所带来的开销 又有了自旋锁所带来的轻便 future的过程我们就分析到这里
参考:
http://man7.org/linux/man-pages/man2/futex.2.html
https://code.woboq.org/gcc/libstdc+±v3/src/c++11/futex.cc.html
https://www2.cs.duke.edu/acm-docs/gcc/libstdc+±api-html/a01286_source.html