前言
在写c/c++的过程中遇到死锁常常会让我们手无足错,因为死锁往往不像其他类型的错误一样会在终端直接打印报错,因此难以被发现并需要大量精力去排错。是时候思考,该如何防范或者避免死锁。
死锁场景
-
死锁场景1
- 在类里面或者全局无意识的使用过多的锁,并在函数中调用这些锁的时候未注意到锁的次序。如一个线程执行函数fun1时锁的顺序是A->B->C,另一个线程执行fun2时锁的顺序是C->B->A ,除非能保证fun1,fun2不会同时执行,否则及其容易发生死锁。
-
解决途径1
- 如果使用了过多的锁,多半是程序设计出了问题,忠告:能少用锁尽量少用锁
- A B C依次上锁, 就是让两个互斥量总以相同的顺序上锁:总在互斥量B之前锁住互斥量A,总在互斥量C之前锁住互斥量BA,就永远不会死锁。
-
死锁场景2(如下图)
- 在c++的 swap两个对象中依次加锁
表面上看是用俩个锁屏障来依次加锁,是没什么问题的,但如果我们有个线程是swap(A,B),另个线程同时是swap(B,A),想象一下后果:一个线程锁了A.mutex,请求B.mutex,另一个线程锁了B.mutex,请求A.mutex。是的,这里出现了死锁。
那我们该如何解决这个问题呢???
- 在c++的 swap两个对象中依次加锁
class some_big_object;
void swap(some_big_object& lhs,some_big_object& rhs);
class X
{
private:
some_big_object some_detail;
std::mutex m;
public:
X(some_big_object const& sd):some_detail(sd){
}
friend void swap(X& lhs, X& rhs)
{
if(&lhs==&rhs)
return;
std::lock_guard<std::mutex> lock_a(lhs.m); // 2
std::lock_guard<std::mutex> lock_b(rhs.m); // 3
swap(lhs.some_detail,rhs.some_detail);
}
};
- 解决途径2
std::lock
——可以一次性锁住多个(两个以上)的互斥量,并且没有副作用(死锁风险)。std::lock
如果在上锁的过程中有一个锁是无法锁成功的,函数会unlock所有的锁。内部是用try_lock实现的。
可以看到下面这样调用就可以解决swap上的死锁。(adapt_lock是让std::lock_guard拥有锁而不在构造时上锁的作用,std::lock_guard在函数结束后析构时解开锁)
// 这里的std::lock()需要包含<mutex>头文件
class some_big_object;
void swap(some_big_object& lhs,some_big_object& rhs);
class X
{
private:
some_big_object some_detail;
std::mutex m;
public:
X(some_big_object const& sd):some_detail(sd){
}
friend void swap(X& lhs, X& rhs)
{
if(&lhs==&rhs)
return;
std::lock(lhs.m,rhs.m); // 1
std::lock_guard<std::mutex> lock_a(lhs.m,std::adopt_lock); // 2
std::lock_guard<std::mutex> lock_b(rhs.m,std::adopt_lock); // 3
swap(lhs.some_detail,rhs.some_detail);
}
};
- 死锁场景3(如下图)
- 在持有锁时调用用户提供的代码。用户需求是在这个在多线程中为a_simple_vector插入一个数,且这个数不存在于它所拥有的vector中。由于在插入的过程中必须要检验一下vector是否有重复的数(这算为外界用户准备的一个接口),然而在 add_to_vec这个函数中错误的使用了a_user_func,哈!又死锁了,这次还是自己锁了俩次而导致的死锁。(不能用嵌套锁)
class a_simple_vector {
public:
void add_to_vec(int a) {
std::lock_guard<std::mutex> lg(m);
if (!a_user_func(a))
v.push_back(a);
}
bool a_user_func(int a) {
std::lock_guard<std::mutex> lg(m);
for (auto &&i : v) {
if (i == a) {
printf("非法\n");
return -1;
}
}
return 0;
}
private:
std::mutex m;
vector<int> v;
};
- 解决途径3
- 不要使用用户代码或者提供新的方法:不加锁的原用户代码如 a_user_func_need_lock。这样就不会重复上锁了。
class a_simple_vector {
public:
void add_to_vec(int a) {
std::lock_guard<std::mutex> lg(m);
a_user_func_need_lock(a);
v.push_back(a);
}
private:
bool a_user_func_need_lock(int a) {
for (auto &&i : v) {
if (i == a) {
printf("非法\n");
return -1;
}
}
return 0;
}
public:
bool a_user_func(int a) {
std::lock_guard<std::mutex> lg(m);
for (auto &&i : v) {
if (i == a) {
printf("非法\n");
return -1;
}
}
return 0;
}
private:
std::mutex m;
vector<int> v;
};
-
死锁场景4
- 在一个多线程的链表中删除一个节点,比如ABC三个节点中删除B,每个节点都需要维护一个锁,因为我们在删除节点的时候需要避免本节点和前后节点的位置或其next,prev域被修改,在拿到B的锁的之后,我们必须去锁住AC。而在遍历的过程中,先锁住A,再去锁住B。这同样有死锁产生。
-
解决途径4
- 再次利用
std::lock()
,
del节点函数按照std::lock(b.prev.m,b.m)
的顺序请求锁,遍历的过程此时只允许从前向后,travel函数中如果拥有了A锁,此时del是不能拿到A锁的,或者del同时有了AB两锁,travel是拿不到A锁的。相同的请求顺序避免了死锁问题,在del拿到俩把锁之后再去请求C锁也就顺理成章了,当然即使其他线程拥有这个C,只要是接口统一从前向后锁定的del或者traval或者insert或者push_back,都是不会出现死锁等待的。(代码其实该写一下试一下)
- 再次利用
-
死锁场景5(如下图)
- 一个清单类和一个请求类,可以想想成一对多的场景,生成一个Request对象并把它加入到Inventory对象中必须先加Request.mutex,再加Inventory.mutex,而
Inventory遍历所有Request是先加Inventory.mutex,再加Request.mutex,删除一个Request对象并把它从Inventory对象中删除必须先加Request.mutex,再加Inventory.mutex。可以看出增删Request对象的加锁顺序和Inventory遍历打印每个Request的加锁顺序是相反的。因此,这里是可能会出现死锁的。
- 一个清单类和一个请求类,可以想想成一对多的场景,生成一个Request对象并把它加入到Inventory对象中必须先加Request.mutex,再加Inventory.mutex,而
-
解决途径5
- 将Request析构函数中的加锁和从g_inventory删除俩条语句交换位置,类似于之前的以相同顺序上锁。
以下两种可以说是在调整程序结构的方式去规避死锁。 - 如下图在printAll中将requests复制一份,在副本上加锁遍历,将加锁Request和加锁Inventory的过程分开。
- 写时复制。上面的说法其实算一种读时复制。同样
可以通过智能指针的方式在Inventory插入或者删除时
为其中的容器的智能指针reset一份新副本。而遍历容器的读方printAll在旧的requests_中遍历(通过局部智能指针拷贝),这个旧requests_在printAll结束后这个局部智能指针析构时释放。详见muduo库RequestInventory_test2.cc
- 将Request析构函数中的加锁和从g_inventory删除俩条语句交换位置,类似于之前的以相同顺序上锁。
class Inventory
{
public:
void add(Request* req)
{
muduo::MutexLockGuard lock(mutex_);
requests_.insert(req);
}
void remove(Request* req) __attribute__ ((noinline))
{
muduo::MutexLockGuard lock(mutex_);
requests_.erase(req);
}
void printAll() const;
private:
mutable muduo::MutexLock mutex_;
std::set<Request*> requests_;
};
Inventory g_inventory;
class Request
{
public:
void process() // __attribute__ ((noinline))
{
muduo::MutexLockGuard lock(mutex_);
g_inventory.add(this);
// ...
}
~Request() __attribute__ ((noinline))
{
muduo::MutexLockGuard lock(mutex_);
sleep(1);
g_inventory.remove(this);
}
void print() const __attribute__ ((noinline))
{
muduo::MutexLockGuard lock(mutex_);
// ...
}
private:
mutable muduo::MutexLock mutex_;
};
```cpp
void Inventory::printAll() const
{
muduo::MutexLockGuard lock(mutex_);
sleep(1);
for (std::set<Request*>::const_iterator it = requests_.begin(); it != requests_.end(); ++it)
{
(*it)->print();
}
printf("Inventory::printAll() unlocked\n");
}
void threadFunc()
{
Request* req = new Request;
req->process();
delete req; //~Request()
}
int main()
{
muduo::Thread thread(threadFunc);
thread.start();
usleep(500 * 1000);
g_inventory.printAll();
thread.join();
}
解决方案
void Inventory::printAll() const
{
std::set<Requests*>requests;
{
muduo::MutexLockGuard lock(mutex_);
requests=requests_;
}
for (std::set<Request*>::const_iterator it = requests.begin(); it != requests.end(); ++it)
{
(*it)->print();
}
printf("Inventory::printAll() unlocked\n");
}
层次锁
至于如何有顺序的加锁,在《c++编程实战》上就提供了层次锁的方案:通过thread_local来隔离各自线程中锁的默认权值,在每一次加锁都逐渐降低本线程的权值,并保证每一次的锁的权值都比上一次小,否则抛出异常的方式,如果我们误添加了比当前线程更高的权值,会先发生异常结束程序,而非把检查死锁的任务拖到发生死锁时再去一步步查验。
#include <mutex>
#include <stdexcept>
#include <climits>
class hierarchical_mutex {
std::mutex internal_mutex;
unsigned long const hierarchy_value;
unsigned long previous_hierarchy_value;
static thread_local unsigned long this_thread_hierarchy_value;
void check_for_hierarchy_violation() {
if (this_thread_hierarchy_value <= hierarchy_value) {
throw std::logic_error("mutex hierarchy violated");
}
}
void update_hierarchy_value() {
previous_hierarchy_value = this_thread_hierarchy_value;
this_thread_hierarchy_value = hierarchy_value;
}
public:
explicit hierarchical_mutex(unsigned long value) :
hierarchy_value(value),
previous_hierarchy_value(0) {
}
void lock() {
check_for_hierarchy_violation();
internal_mutex.lock();
update_hierarchy_value();
}
void unlock() {
this_thread_hierarchy_value = previous_hierarchy_value;
internal_mutex.unlock();
}
bool try_lock() {
check_for_hierarchy_violation();
if (!internal_mutex.try_lock())
return false;
update_hierarchy_value();
return true;
}
};
thread_local unsigned long
hierarchical_mutex::this_thread_hierarchy_value(ULONG_MAX);
int main() {
hierarchical_mutex m1(42);
hierarchical_mutex m2(2000);
}
总结
- 避免嵌套锁(可能会隐藏代码的一些问题,如果在外层函数加锁执行遍历,在内层函数加锁并执行push_back会导致迭代器失效,不使用嵌套锁可以这种错误暴露出来)
- 避免在持有锁时调用用户提供的代码
- 使用固定顺序获取锁
- 使用锁的层次结构
除此之外
慎用trylock,避免程序串行化(但可以使用它去避免一些死锁)
今天的死锁的就讲到这,小伙伴们三连哦!
参考
《c++编程并发实战》
《Linux多线程服务器编程 》