> 文章列表 > 【MIT 6.S081】Lab8: Locks

【MIT 6.S081】Lab8: Locks

【MIT 6.S081】Lab8: Locks

lock

  • Memory allocator
  • Buffer cache

这个lab比起上个lab难度大了一些,需要熟练掌握锁的使用。
笔者用时约7h(我太菜啦

Memory allocator

第一部分相对比较简单,就是为每个CPU独立出一个内存分配器(管理内存的链表),减少锁的竞争,提高程序并行度。具体来说,一开始锁争用的根本原因就是内核只维护了一个内存空闲列表,由一个锁去保护,于是多个CPU争用该列表的频率很高。优化的基本思想是为每个CPU维护一个空闲列表,每个列表都有自己的锁,因为不同CPU在不同的列表中运行,所以它们的分配和释放可以并行运行。
当一个CPU的列表为空时,它必须夺取其他CPU列表中的空闲内存,虽然这会导致锁争用,但是频率并不高。
具体实现上,首先将kmem结构体定义为结构体数组,表示多个CPU的内存空闲列表,如下所示:

struct {struct spinlock lock;struct run *freelist;
} kmem[NCPU];

kinit函数中初始化这些结构体中的锁:

void
kinit()
{int id;for (id = 0; id < NCPU; id ++ )initlock(&kmem[id].lock, "kmem");freerange(end, (void*)PHYSTOP);
}

然后在kalloc的时候,首先从CPU本身的内存空闲列表分配,如果该CPU的空闲列表已经空了,那么从其他CPU的空闲列表中抢一个过来即可,如下所示。其中,锁的添加依赖于CPUid。

void *
kalloc(void)
{struct run *r;int cpuid = getcpuid();int id;acquire(&kmem[cpuid].lock);r = kmem[cpuid].freelist;if(r)kmem[cpuid].freelist = r->next;release(&kmem[cpuid].lock);if(!r) {for (id = 0; id < NCPU; id ++ ) {if (id != cpuid) {acquire(&kmem[id].lock);r = kmem[id].freelist;if(r)kmem[id].freelist = r->next;release(&kmem[id].lock);if(r) break;}}}if(r)memset((char*)r, 5, PGSIZE); // fill with junkreturn (void*)r;
}

kfree中,将内存结点回收到本CPU的空闲列表中即可,如下:

void
kfree(void *pa)
{struct run *r;if(((uint64)pa % PGSIZE) != 0 || (char*)pa < end || (uint64)pa >= PHYSTOP)panic("kfree");// Fill with junk to catch dangling refs.memset(pa, 1, PGSIZE);r = (struct run*)pa;int cpuid = getcpuid();acquire(&kmem[cpuid].lock);r->next = kmem[cpuid].freelist;kmem[cpuid].freelist = r;release(&kmem[cpuid].lock);
}

Buffer cache

这一部分相对于上一部分较难,目的是重新设计bcache的结构以减少锁的争用。一开始bcache维护了一个双向链表,每一个CPU请求一个磁盘块的时候都需要通过这个链表来请求,bcache里面只有一个锁,这无疑会导致很多的锁争用。
为了减少锁争用,一个简单的思路是使用hash表按照磁盘块的编号blockidcache buffer进行散列,即将一个链表划分为多个链表,并使用多个锁进行保护。另外,实验文档的提示中,建议使用时间戳进行LRU的维护,而不是像原来一样维护双向循环链表。其实这里为每一个bucket维护双向循环链表加上时间戳也可以,但是为了简化代码就直接用了简单的单链表加维护时间戳。
具体实现上,首先重新定义bcache的数据结构,如下:

struct Bucket {struct spinlock lock;struct buf head;
};struct {struct spinlock lock;struct buf buf[NBUF];// Linked list of all buffers, through prev/next.// Sorted by how recently the buffer was used.// head.next is most recent, head.prev is least.// struct buf head;struct Bucket bucket[NBUCKET];
} bcache;

buf结构体中添加一个字段保存该缓存块上一次被访问的时间戳,并且去掉prev指针,因为简化为单链表了。

struct buf {int valid;   // has data been read from disk?int disk;    // does disk "own" buf?uint dev;uint blockno;struct sleeplock lock;uint refcnt;// struct buf *prev; // LRU cache liststruct buf *next;uchar data[BSIZE];uint timestamp;      // the timestamp that this buf is used last time
};

修改binit函数,初始化所有桶的锁,并将所有buffer均匀分配到所有bucket

void
binit(void)
{struct buf *b;int i;for (i = 0; i < NBUCKET; i ++ ) {bcache.bucket[i].head.next = 0;initlock(&bcache.bucket[i].lock, "bucket");}initlock(&bcache.lock, "bcache");// Initialize all bucketsfor (i = 0; i < NBUF; i ++ ) {int id = i % NBUCKET;b = bcache.buf + i;initsleeplock(&b->lock, "buffer");b->next = bcache.bucket[id].head.next;bcache.bucket[id].head.next = b;}
}

修改brelse函数,在该块被释放的时候记录时间戳(原始代码更新LRU链表就是在这里),其实就是记录一下当前的ticks变量(定义在kernel/trap.c:10)的值而已,记得要获取ticks的锁tickslock

void
brelse(struct buf *b)
{if(!holdingsleep(&b->lock))panic("brelse");releasesleep(&b->lock);int id = b->blockno % NBUCKET;acquire(&bcache.bucket[id].lock);b->refcnt--;if (b->refcnt == 0) {// no one is waiting for it.acquire(&tickslock);b->timestamp = ticks;release(&tickslock);}release(&bcache.bucket[id].lock);
}

然后再修改两个小函数bpinbunpin,减小锁的争用。

void
bpin(struct buf *b) {int id = b->blockno % NBUCKET;acquire(&bcache.bucket[id].lock);b->refcnt++;release(&bcache.bucket[id].lock);
}void
bunpin(struct buf *b) {int id = b->blockno % NBUCKET;acquire(&bcache.bucket[id].lock);b->refcnt--;release(&bcache.bucket[id].lock);
}

最后的最后,需要修改bget函数。在请求缓存块的时候(即bget函数被调用的时候),首先到blockid对应的bucket查看该块是否已被缓存,如果已经缓存则直接返回该buffer。当目前不存在该缓存块时,首先从该块对应的bucket中查看是否有空闲的LRU块,如果有的话直接驱逐该块并替换为当前请求块;如果没有,那么遍历其他所有的bucket,找到一个LRU块并返回;最终如果找不到的话就panic
这里需要注意的是加锁的问题,首先我们把bucket的锁都先加上,然后看文档中说明的一些不需要处理的情况:

  • 两个进程使用相同的blockno
  • 两个进程请求的块同时在cache中未命中,需要寻找一个未使用的块进行替换

由于这两种情况不需要处理,那么需要处理的情况只有:

  • 两个进程请求的blockno始终不相同
  • 两个进程请求的块要么同时在cache中命中,要么只有一个未命中

那么首先考虑两个进程请求的块同时命中的情况,虽然块号始终不相同,但是无法避免哈希冲突,所进入的bucket可能相同,于是在查找块之前获取该bucket的锁就可以避免冲突。
再考虑其中一个进程请求的块命中,另一个进程请求的块不命中的情况,假设进程A请求块1(命中),进程B请求块2(不命中),此时还分为几种情况:

  • 块1和块2的bucket相同
    此时会在A查找自己bucket中是否存在缓存块时,B查找自己bucket是否存在缓存块时(必然不存在)且在自己bucke查找LRU时产生临界资源冲突,也是在查找之前获取bucket的锁就可以避免。
  • 块1和块2的bucket不同
    这个时候有一种情况就是,进程B在块2所属的bucket中没有找到LRU,然后刚好其他bucket中的LRU是块1所对应的缓存块。且B进程在获取LRU之后A进程才查找到缓存块,这个时候会产生两个进程同时使用一个缓存块的现象,且一个缓存块缓存了两块不同的磁盘块,这显然不对劲。但是测试程序似乎没有对这种情况进行测试,之后有时间再研究研究吧
static struct buf*
bget(uint dev, uint blockno)
{struct buf *b;int id = blockno % NBUCKET;acquire(&bcache.bucket[id].lock);// Is the block already cached?for(b = bcache.bucket[id].head.next; b; b = b->next){if(b->dev == dev && b->blockno == blockno){b->refcnt++;release(&bcache.bucket[id].lock);acquiresleep(&b->lock);return b;}}release(&bcache.bucket[id].lock);// Not cached.// Find the least recently used unused buffer in own bucket.acquire(&bcache.lock);acquire(&bcache.bucket[id].lock);uint min_time = MAXTIME;struct buf* lru_b = 0;for(b = bcache.bucket[id].head.next; b; b = b->next){if(b->refcnt == 0 && b->timestamp < min_time) {min_time = b->timestamp;lru_b = b;}}// has found the lru unused buffer in own bucket.if (lru_b) {lru_b->dev = dev;lru_b->blockno = blockno;lru_b->valid = 0;lru_b->refcnt = 1;release(&bcache.bucket[id].lock);release(&bcache.lock);acquiresleep(&lru_b->lock);return lru_b;}// hasn't found the lru unused buffer in own bucket.// find the lru unused buffer in other buckets.int i;min_time = MAXTIME;int new_id = -1;for (i = 0; i < NBUCKET; i ++ ) {if (i == id) continue;acquire(&bcache.bucket[i].lock);for(b = bcache.bucket[i].head.next; b; b = b->next){if(b->refcnt == 0 && b->timestamp < min_time) {min_time = b->timestamp;lru_b = b;new_id = i;}}release(&bcache.bucket[i].lock);}if (lru_b == 0)panic("bget: no buffers");acquire(&bcache.bucket[new_id].lock);lru_b->dev = dev;lru_b->blockno = blockno;lru_b->valid = 0;lru_b->refcnt = 1; for (b = &bcache.bucket[new_id].head; b->next; b = b->next) {if (b->next == lru_b) {b->next = lru_b->next;break;}}release(&bcache.bucket[new_id].lock);lru_b->next = bcache.bucket[id].head.next;bcache.bucket[id].head.next = lru_b;release(&bcache.bucket[id].lock);release(&bcache.lock);acquiresleep(&lru_b->lock);return lru_b;
}