Lab8 Lock
重新编写代码进行锁优化,提高在机器上的并发性。
Memory allocator (moderate)
通过拆分kmem
中的空闲链表,降低kalloc
的kmem
锁竞争。
原理与分析
在原kalloc
的实现中,是直接通过使用freelist
这个链表,将空闲物理页本身直接用作链表项,然后连接成一个链表,这样可以不使用额外的空间,在分配的时候,将分配后的物理页从该链表中删除,回收时将该物理页放回链表中。
但是这样无论是分配物理页还是回收物理页,都将操作freelist
这个链表,而修改不是一个单步操作,所以说,为了保证多线程的正确性,必须加锁,但是加锁势必会导致多线程无法并发的申请内存,降低并发效率。
而锁优化大抵是有俩种思路,其一是,资源只在必须共享的时候共享,其二是,尽量缩短在关键区中停留的时间。
上述俩种思路就对应该实验的解决思路,首先,将资源从CPU
共享拆分为每个CPU
核共享,然后,大锁化小锁,降低锁的粒度。
所以说,我们将为每个CPU
核分配独立的freelist
链表,与其同时当某一个CPU
核的freelist
的空闲页不够时,我们将从其他CPU
核中偷取一部分freelist
作为新的空闲页,而这个过程就涉及到加锁了。
代码实现
struct
{
struct spinlock lock;
struct run *freelist;
} kmem[NCPU]; // 拆分 freelist
char *cpu_lock_name[] = {
"kmem_lock_0",
"kmem_lock_1",
"kmem_lock_2",
"kmem_lock_3",
"kmem_lock_4",
"kmem_lock_5",
"kmem_lock_6",
"kmem_lock_7",
}; // 锁名
void kinit()
{
for (int i = 0; i < NCPU; i++)
{
initlock(&kmem[i].lock, cpu_lock_name[i]); // 初始化锁
}
freerange(end, (void *)PHYSTOP);
}
void kfree(void *pa)
{
struct run *r;
if (((uint64)pa % PGSIZE) != 0 || (char *)pa < end || (uint64)pa >= PHYSTOP)
panic("kfree");
// Fill with junk to catch dangling refs.
memset(pa, 1, PGSIZE);
push_off();
r = (struct run *)pa;
int cpu = cpuid();
acquire(&kmem[cpu].lock);
r->next = kmem[cpu].freelist;
kmem[cpu].freelist = r;
release(&kmem[cpu].lock); // 释放过程
pop_off();
}
void *
kalloc(void)
{
struct run *r;
push_off();
int cpu = cpuid();
acquire(&kmem[cpu].lock);
if (!kmem[cpu].freelist)
{
int steal_page_num = 64; // 设置要偷取的内存页
for (int i = 0; i < NCPU; i++)
{
if (i == cpu)
{
continue;
} // 获得下一个存在空闲页的CPU
acquire(&kmem[i].lock);
struct run *other_cpu_freepage = kmem[i].freelist;
while (other_cpu_freepage && steal_page_num)
{
kmem[i].freelist = other_cpu_freepage->next;
other_cpu_freepage->next = kmem[cpu].freelist;
kmem[cpu].freelist = other_cpu_freepage;
other_cpu_freepage = kmem[i].freelist;
steal_page_num--;
} // 偷取过程
release(&kmem[i].lock);
if (steal_page_num == 0)
{
break;
} // 跳出循环的条件
}
}
r = kmem[cpu].freelist;
if (r)
kmem[cpu].freelist = r->next;
release(&kmem[cpu].lock);
pop_off();
if (r)
memset((char *)r, 5, PGSIZE); // fill with junk
return (void *)r;
}
函数cpuid
返回当前的核心编号,但只有在中断关闭时调用它并使用其结果才是安全的。我需要使用push_off()
和pop_off()
来关闭和打开中断。
Buffer cache (hard)
xv6
会缓存一些经常使用的block
块在内存里(即这个buffer cache pool
), 使得每次重复调用这个block
块时直接从内存
读取, 而不用做磁盘I/O
。
原始版本的buffer cache
由一个大锁bcache.lock
保护, 限制了并行运行的效率. 我们要把它拆解为更精细的锁管理, 用hash bucket
的思想. 并且放弃双链表的管理方式, 直接使用ticks
时间戳来实现LRU
(least-recently-used)算法。
// 修改 buf 结构体中的内容
struct buf
{
int valid; // has data been read from disk?
int disk; // does disk "own" buf?
uint dev;
uint blockno;
struct sleeplock lock;
uint refcnt; // 引用计数
struct buf *prev; // LRU cache list
struct buf *next;
uint64 lastusetime; // 最后使用时间
uchar data[BSIZE];
};
#define BUCKETSIZE 13 // 13 个桶
#define BUFFERSIZE 8 // 一个桶 8 块
extern uint ticks; // 时间戳
struct
{
struct spinlock lock;
struct buf buf[BUFFERSIZE];
} bcache[BUCKETSIZE];
uint hash(uint64 bucketid)
{
return bucketid % BUCKETSIZE;
} // hash 函数 用于查找桶
void binit(void)
{
for (int i = 0; i < BUCKETSIZE; i++)
{
initlock(&bcache[i].lock, "bcachelock"); // 初始化桶锁
for (int j = 0; j < BUFFERSIZE; j++)
{
initsleeplock(&bcache[i].buf[j].lock, "bufferlock"); // 初始化桶内块锁
}
}
}
static struct buf *bget(uint dev, uint blockno)
{
struct buf *b;
// 计算哈希值,确定使用哪个桶
int bucketid = hash(blockno);
// 获取整个缓冲区池的锁
acquire(&bcache[bucketid].lock);
// 检查缓冲区池中是否已经存在该块
for (int i = 0; i < BUFFERSIZE; i++)
{
b = &bcache[bucketid].buf[i];
if (b->dev == dev && b->blockno == blockno)
{
// 如果存在,则增加引用计数,更新最后使用时间,并释放整个缓冲区池的锁
b->refcnt++;
b->lastusetime = ticks;
release(&bcache[bucketid].lock);
// 获取缓冲区的锁,表示该缓冲区正在被使用
acquiresleep(&b->lock);
return b;
}
}
// 如果缓冲区池中不存在该块,则选择一个最久未使用的缓冲区进行初始化
uint least = 0xffffffff;
int least_index = -1;
for (int i = 0; i < BUFFERSIZE; i++)
{
b = &bcache[bucketid].buf[i];
if (b->refcnt == 0 && b->lastusetime < least)
{
least = b->lastusetime;
least_index = i;
}
}
// 如果没有可用的缓冲区,则尝试在其他桶中找到可用的缓冲区
if (least_index == -1)
{
for (int i = 0; i < BUCKETSIZE; i++)
{
if (i != bucketid) // 跳过当前桶
{
acquire(&bcache[i].lock);
for (int j = 0; j < BUFFERSIZE; j++)
{
b = &bcache[i].buf[j];
if (b->refcnt == 0 && b->lastusetime < least)
{
least = b->lastusetime;
least_index = j;
}
}
release(&bcache[i].lock);
}
}
}
// 如果没有可用的缓冲区,则发生错误
if (least_index == -1)
{
// 这里可以去其他桶中偷块
panic("bget: no buffers can recycle");
}
// 获取选中的缓冲区的锁,更新相关信息,释放整个缓冲区池的锁
b = &bcache[bucketid].buf[least_index];
b->dev = dev;
b->blockno = blockno;
b->lastusetime = ticks;
b->valid = 0;
b->refcnt = 1;
release(&bcache[bucketid].lock);
// 获取缓冲区的锁,表示该缓冲区正在被使用
acquiresleep(&b->lock);
return b;
}
// 用于释放缓冲区,减少引用计数。如果引用计数变为零,表示缓冲区不再被使用,可以被其他请求使用。释放缓冲区时,会释放相应的锁。
void brelse(struct buf *b)
{
if (!holdingsleep(&b->lock))
panic("brelse");
int bucketid = hash(b->blockno);
acquire(&bcache[bucketid].lock);
b->refcnt--;
release(&bcache[bucketid].lock);
releasesleep(&b->lock);
}
// bpin 和 bunpin 用于原子性增加或减少缓冲区(桶)的引用计数
void bpin(struct buf *b)
{
uint64 bucketid = hash(b->blockno);
acquire(&bcache[bucketid].lock);
b->refcnt++;
release(&bcache[bucketid].lock);
}
void bunpin(struct buf *b)
{
uint64 bucketid = hash(b->blockno);
acquire(&bcache[bucketid].lock);
b->refcnt--;
release(&bcache[bucketid].lock);
}