MIT6.S081 lab9 locks

lab9以提高并行度的方式熟悉并行编程,第一个实验是多核并行,第二个实验是key级别的哈希表锁编程。

Memory allocator

在内存的分配上,原先的xv6代码只有一条freelist,多核并行比较慢。现在要求实现一个cpu一条freelist,当一个CPU的freelist不够用时,需要向其他CPU借。

xv6多核并行

理解xv6的并行的实现:多核并行到底是怎么样的?是所有CPU都执行同样的代码还是咋地?

从代码结构上来看,我倾向于:只有一份代码,多个CPU有各自PC,按照PC取指令执行。这里并行的关键是多个核心而只有一份物理内存空间。代码保存在物理地址上,CPU按照PC读取指令执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
struct cpu {
struct proc *proc; // The process running on this cpu, or null.
struct context context; // swtch() here to enter scheduler().
int noff; // Depth of push_off() nesting.
int intena; // Were interrupts enabled before push_off()?
};

extern struct cpu cpus[NCPU];

struct cpu* mycpu(void) {
int id = cpuid();
struct cpu *c = &cpus[id];
return c;
}

int cpuid(){
int id = r_tp();
return id;
}

所以可以看到,以上声明了cpu数组cpus,这侧面映射了只有一份代码。如果每个CPU对应一份代码,那么就不应该是数组的形式,参考fork的含义。

构建多核freelist

理解多核并行的机制后就不难构建多核freelist了,即然cpu是一个数组,那么freelist也构建一个数组。

1
2
3
4
5
6
7
8
9
struct {
struct spinlock lock;
struct run *freelist;
} kmem[NCPU];

void kinit(){
for(int i=0; i<NCPU; ++i) initlock(&kmem[i].lock, "kmem");
freerange(end, (void*)PHYSTOP);
}

不够再补的机制

采用最简单的实现:将CPU看成环形队列,每个CPU的freelist不够时,都先向下家借。如果下家没有,就继续轮询。如果有,那么直接将一半的链表借出去。

这里一半链表的实现采取快慢指针。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
void * kalloc(void){
struct run *r;

push_off(); // close interrupt
int id = cpuid();
pop_off();
acquire(&kmem[id].lock);
r = kmem[id].freelist;
if(!r){
for(int i=1; i<NCPU; ++i){
int index = (id+i)%NCPU;
acquire(&kmem[index].lock);
struct run *head = kmem[index].freelist;
if(head){
// borrow some node
struct run* slow = head;
struct run* fast = head;
while(fast && fast->next!=NULL){
fast = fast->next->next;
slow = slow->next;
}
kmem[index].freelist = slow->next;
slow->next = NULL;
r = head;
// then return
release(&kmem[index].lock);
break;
}
release(&kmem[index].lock);
}
}
if(r)
kmem[id].freelist = r->next;
release(&kmem[id].lock);

if(r)
memset((char*)r, 5, PGSIZE); // fill with junk
return (void*)r;
}

归还的操作很简单,利用锁保护好链表就行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void kfree(void *pa){
struct run *r;

if(((uint64)pa % PGSIZE) != 0 || (char*)pa < end || (uint64)pa >= PHYSTOP)
panic("kfree");

// Fill with junk to catch dangling refs.
memset(pa, 1, PGSIZE);

r = (struct run*)pa;

push_off(); // close interrupt
int id = cpuid();
pop_off(); // open interrupt
acquire(&kmem[id].lock);
r->next = kmem[id].freelist;
kmem[id].freelist = r;
release(&kmem[id].lock);
}

Buffer cache

block cache机制

原先的block cache是只有一条LRU链表,一个大锁。现在改进机制是利用一个hash bucket级别的锁保护一个hash table,利用block no作为哈希索引。

网上看到的一种实现是:

1
2
3
4
struct {
struct spinlock lock;
struct buf buf[NBUF];//NBUF=30
} bcache[NBUCKET];

直接将bcache分为NBUCKET份,利用blockno作为索引,在相应的链表上进行查找cache和牺牲cache的操作。这样的实现粗暴简单,也不会有死锁的风险,但总觉得与实验指导的意思相悖。

我更倾向于这种实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
#define NBUCKET 13
#define HINDEX(x) ((x) % NBUCKET)
struct bucket{
struct spinlock lock;
struct buf head;
};

struct {
struct spinlock lock;
struct buf buf[NBUF];

struct bucket htable[NBUCKET];
} bcache;

即将buf数组作为buf的池子,htable在buf不够用时再从buf中取buf。如果需要牺牲一个buf,是可能出现从一个bucket到另外一个bucket的情况。

实现思考

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void binit(void) {
struct buf *b;

initlock(&bcache.lock, "bcache");

for (b = bcache.buf; b < bcache.buf + NBUF; b++) {
initsleeplock(&b->lock, "buffer");
}
for(int i=0; i<NBUCKET; ++i){
initlock(&bcache.htable[i].lock, "bcache.bucklock");
bcache.htable[i].head.next = &bcache.htable[i].head;
bcache.htable[i].head.prev = &bcache.htable[i].head;
}
}

在初始化时,我的hash table里的LRU链表肯定都是空的,只有一个伪头节点。

  • 一种实现是:将所有buf(一共NBUF个)直接加载到htable[0]的链表中。后续其他bucket的链表不够用时,从其他bucket的链表找牺牲buf
  • 另一种实现:当前bucket的链表找不到buf,那么就去buf数组找还有没有空闲的buf,如果没有空闲的buf,那么再进行全hash table的扫描,找出交换的buf

再对每个bucket的链表进行访问时,都需要获取当前bucket的锁。

有缺陷的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
static struct buf* bget(uint dev, uint blockno) {
struct buf *b;
uint index = HINDEX(blockno);

acquire(&bcache.htable[index].lock);
// Is the block already cached?
for (b = bcache.htable[index].head.next; b != &bcache.htable[index].head; b = b->next) {
if (b->dev == dev && b->blockno == blockno) {
b->refcnt++;
b->timestamp = ticks;
release(&bcache.htable[index].lock);
acquiresleep(&b->lock);
return b;
}
}

// Not cached. 从buf数组中找到一个refcnt为0,并且时间戳最小的buf
struct buf* tmp = 0;
acquire(&bcache.lock);
for (b = bcache.buf; b < bcache.buf + NBUF; b++) {
if (b->refcnt == 0) {
if( !tmp || tmp->timestamp > b->timestamp) {
tmp = b;
}
}
}
if(!tmp)
panic("bget: no buffers");

// 现在我们要重建缓存
// tmp可能已经在哈希表中,需要先从哈希表中释放tmp节点,然后再插入(这个过程可能对同一个槽)

// 1、判断tmp是否在哈希表中
if(tmp->timestamp == 0){
setBuf(tmp, dev, blockno, ticks);
insertBuf(&bcache.htable[index], tmp);
}else{
//2、判断前后哈希槽是否相同
uint oldIndex = HINDEX(tmp->blockno);
uint newIndex = index;
setBuf(tmp, dev, blockno, ticks);
if(oldIndex != newIndex){
//3、前后槽不同,需要先释放旧节点然后插入新节点
acquire(&bcache.htable[oldIndex].lock);
tmp->prev->next = tmp->next;
tmp->next->prev = tmp->prev;
release(&bcache.htable[oldIndex].lock);
insertBuf(&bcache.htable[index], tmp);
}
}
release(&bcache.lock);
release(&bcache.htable[index].lock);
acquiresleep(&tmp->lock);
return tmp;
}
void brelse(struct buf *b) {
if (!holdingsleep(&b->lock))
panic("brelse");

releasesleep(&b->lock);

acquire(&bcache.htable[HINDEX(b->blockno)].lock);
b->refcnt--;
if(b->refcnt==0) b->timestamp=ticks;
release(&bcache.htable[HINDEX(b->blockno)].lock);
}

void setBuf(struct buf* buf, uint dev, uint blockno, uint timestamp){
buf->dev = dev;
buf->blockno = blockno;
buf->timestamp = timestamp;
buf->valid = 0;
buf->refcnt = 1;
}

void insertBuf(struct bucket* bucket, struct buf* node){
// 尾插法
struct buf* head = &bucket->head;
head->prev->next = node;
node->prev = head->prev;
head->prev = node;
node->next = head;
}

可以通过bcachetest,但是无法通过usertests,挠头。

这里明显的缺陷就是死锁。当线程A获取bucket1的锁,线程B获取bucket2的锁,两个线程都要继续进行下去,然后A获得了bcache的锁,但是A选取的牺牲buf恰好是bucket2的buf,那么A还要获得bucket2的锁,那么就造成了死锁。

如果采用全哈希表扫描的方式获取一个牺牲buf,感觉也会有死锁的场景。

作者

Desirer

发布于

2024-01-18

更新于

2024-02-06

许可协议