CAS等原子操作
在介绍无锁队列之前,我们需要知道一个重要的技术就是CAS操作,现在几乎所有的CPU指令都支持CAS等原子操作。有了这个原子操作,我们就可以用其来实现各种无锁的数据结构了。
- 本文用到的是GCC的CAS
GCC4.1+版本中支持CAS的原子操作
bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...)
type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...)
bool __sync_bool_compare_and_swap (type* ptr, type oldval type newval, …)
type __sync_val_compare_and_swap (type* ptr, type oldval type newval, …)
这两个函数提供原子的比较和交换,如果ptr == oldval,就将newval写入ptr,
第一个函数在相等并写入的情况下返回true.
第二个函数在返回操作之前的值。
- C++中的CAS
c++11中的STL中的atomic类的函数可以让你跨平台
template< class T >
bool atomic_compare_exchange_weak( std::atomic* obj,
T* expected, T desired );
template< class T >
bool atomic_compare_exchange_weak( volatile std::atomic* obj,
T* expected, T desired );
无锁队列的链表实现
参考文章
这篇文章讲的非常详细,大家进去浏览完无锁队列如何一步一步进行实现,然后再出来看最终实现的完整版。
- 代码最终实现
#include <atomic>
#include <memory>
#include <vector>
template <typename T> class LockFreeQueue
{
// 用链表实现LockFreeQueue Node定义节点的类型
private:
struct Node
{
T data_;
Node *next_;
Node() : data_(), next_(nullptr) {
}
Node(T &data) : data_(data), next_(nullptr) {
}
Node(const T &data) : data_(data), next_(nullptr) {
}
};
private:
Node *head_, *tail_;
public:
LockFreeQueue() : head_(new Node()), tail_(head_) {
}
~LockFreeQueue()
{
Node *tmp;
while (head_ != nullptr)
{
tmp = head_;
head_ = head_->next_;
delete tmp;
}
tmp = nullptr;
tail_ = nullptr;
}
bool Try_Dequeue(T &data)
{
Node *old_head, *old_tail, *first_node;
while(true)
{
// 取出头指针,尾指针,和第一个元素的指针
old_head = head_;
old_tail = tail_; // 前面两步的顺序非常重要,一定要在获取old_tail之前获取old_head
// 保证old_head_不落后于old_tail
first_node = old_head->next_;
// 下面的工作全是在确保上面三个指针的“一致性”
// 1. 保证head_的一致性
/*
判断head_指针是否已经移动
如果head已经被改变我们需要重新再去获取它
*/
if (old_head != head_)
{
continue;
}
// 2. 保证尾指针落后于头指针 (尾指针与全局尾指针部分一致性保持)
// 空队列或者全局尾指针落后
// 落后是指其他线程的已经更新,而当前线程没有更新
if (old_head == old_tail)
{
//这里是队列为空,返回false
if (first_node == nullptr)
{
return false;
}
// 证明全局尾指针没有被更新,尝试更新一下 “主动操作”
//尾指针落后了,所以我们要更新尾指针,然后在循环一次
::__sync_bool_compare_and_swap(&tail_, old_tail, first_node);
continue;
}
else // 前面的操作都是在确保全局尾指针在全局头指针之后,只有这样才能安全的删除数据
{
// 在CAS前先取出数据,防止其他线程dequeue造成数据的缺失
data = first_node->data_;
// 在取出数据后,如果head指针没有落后,移动 old_head 指针成功则退出
if (::__sync_bool_compare_and_swap(&head_, old_head, first_node))
{
break;
}
}
}
delete old_head;
return true;
}
void Enqueue(const T &data)
{
Node *enqueue_node = new Node(data);
Node *old_tail, *old_tail_next;
while(true)
{
//先取一下尾指针和尾指针的next
old_tail = tail_;
old_tail_next = old_tail->next_;
//如果尾指针已经被移动了,则重新开始
if (old_tail != tail_)
{
continue;
}
// 判断尾指针是否指向最后一个节点
if (old_tail_next == nullptr)
{
if (::__sync_bool_compare_and_swap(&(old_tail->next_), old_tail_next,
enqueue_node))
{
break;
}
}
else
{
// 全局尾指针不是指向最后一个节点,就把全局尾指针向后移动
// 全局尾指针不是指向最后一个节点,发生在其他线程已经完成节点添加操作,
// 但是并没有更新最后一个节点,此时,当前线程的(tail_和old_tail是相等的,)
// 可以更新全局尾指针为old_tail_next,如果其他线程不更新全局尾指针,
// 那么当前线程会不断移动,直到 old_tail_next == nullptr 为true
// 为什么这里不直接Continue 原因是:
// 如果其他线程添加了节点,但是并没有更新
// 全局尾节点,就会导致所有的线程原地循环等待,所以每一个线程必须要有一些
// “主动的操作” 去获取尾节点,这种思想在 dequeue的时候也有体现
::__sync_bool_compare_and_swap(&(tail_), old_tail, old_tail_next);
continue;
}
}
// 重置尾节点, 也有可能已经被别的线程重置,那么当前线程就不用管了
/*
不用管是因为我们已经将节点成功添加,每一次向队列中加入数据的时候都要保证将tail指针移动到最尾端
*/
::__sync_bool_compare_and_swap(&tail_, old_tail, enqueue_node);
}
};
写个测试程序验证一下我们的无锁队列是否正确
#include<vector>
#include<unistd.h>
#include<iostream>
#include<thread>
LockFreeQueue<int> q;
void func1()
{
std::vector<int> vec{
1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
for(int i = 0; i < vec.size(); ++i)
q.Enqueue(vec[i]);
}
void func2()
{
std::vector<int> vec{
11, 12, 13, 14, 15, 16, 17, 18, 19, 20};
for(int i = 0; i < vec.size(); ++i)
{
q.Enqueue(vec[i]);
}
}
void func3()
{
int a;
for(int i = 0; i < 10; ++i)
{
q.Try_Dequeue(a);
std::cout << "func3 a = " << a << "\n";
}
}
void func4()
{
int a;
for(int i = 0; i < 10; ++i)
{
q.Try_Dequeue(a);
std::cout << "func4 a = " << a << "\n";
}
}
int main()
{
std::thread t1(func1);
std::thread t2(func2);
std::thread t3(func3);
std::thread t4(func4);
t1.join();
t2.join();
t3.join();
t4.join();
}
运行结果如图所示
读取数据没有出错。