lab9以提高并行度的方式熟悉并行编程,第一个实验是多核并行,第二个实验是key级别的哈希表锁编程。
Memory allocator 在内存的分配上,原先的xv6代码只有一条freelist,多核并行比较慢。现在要求实现一个cpu一条freelist,当一个CPU的freelist不够用时,需要向其他CPU借。
xv6多核并行 理解xv6的并行的实现:多核并行到底是怎么样的?是所有CPU都执行同样的代码还是咋地?
从代码结构上来看,我倾向于:只有一份代码,多个CPU有各自PC,按照PC取指令执行。这里并行的关键是多个核心而只有一份物理内存空间。代码保存在物理地址上,CPU按照PC读取指令执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 struct cpu { struct proc *proc ; struct context context ; int noff; int intena; }; extern struct cpu cpus [NCPU ];struct cpu* mycpu (void ) { int id = cpuid(); struct cpu *c = &cpus[id]; return c; } int cpuid () { int id = r_tp(); return id; }
所以可以看到,以上声明了cpu数组cpus,这侧面映射了只有一份代码。如果每个CPU对应一份代码,那么就不应该是数组的形式,参考fork的含义。
构建多核freelist 理解多核并行的机制后就不难构建多核freelist了,即然cpu是一个数组,那么freelist也构建一个数组。
1 2 3 4 5 6 7 8 9 struct { struct spinlock lock ; struct run *freelist ; } kmem[NCPU]; void kinit () { for (int i=0 ; i<NCPU; ++i) initlock(&kmem[i].lock, "kmem" ); freerange(end, (void *)PHYSTOP); }
不够再补的机制 采用最简单的实现:将CPU看成环形队列,每个CPU的freelist不够时,都先向下家借。如果下家没有,就继续轮询。如果有,那么直接将一半的链表借出去。
这里一半链表的实现采取快慢指针。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 void * kalloc (void ) { struct run *r ; push_off(); int id = cpuid(); pop_off(); acquire(&kmem[id].lock); r = kmem[id].freelist; if (!r){ for (int i=1 ; i<NCPU; ++i){ int index = (id+i)%NCPU; acquire(&kmem[index].lock); struct run *head = kmem[index].freelist; if (head){ struct run * slow = head; struct run * fast = head; while (fast && fast->next!=NULL ){ fast = fast->next->next; slow = slow->next; } kmem[index].freelist = slow->next; slow->next = NULL ; r = head; release(&kmem[index].lock); break ; } release(&kmem[index].lock); } } if (r) kmem[id].freelist = r->next; release(&kmem[id].lock); if (r) memset ((char *)r, 5 , PGSIZE); return (void *)r; }
归还的操作很简单,利用锁保护好链表就行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 void kfree (void *pa) { struct run *r ; if (((uint64)pa % PGSIZE) != 0 || (char *)pa < end || (uint64)pa >= PHYSTOP) panic("kfree" ); memset (pa, 1 , PGSIZE); r = (struct run*)pa; push_off(); int id = cpuid(); pop_off(); acquire(&kmem[id].lock); r->next = kmem[id].freelist; kmem[id].freelist = r; release(&kmem[id].lock); }
Buffer cache block cache机制 原先的block cache是只有一条LRU链表,一个大锁。现在改进机制是利用一个hash bucket级别的锁保护一个hash table,利用block no作为哈希索引。
网上看到的一种实现是:
1 2 3 4 struct { struct spinlock lock ; struct buf buf [NBUF ]; } bcache[NBUCKET];
直接将bcache分为NBUCKET份,利用blockno作为索引,在相应的链表上进行查找cache和牺牲cache的操作。这样的实现粗暴简单,也不会有死锁的风险,但总觉得与实验指导的意思相悖。
我更倾向于这种实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 #define NBUCKET 13 #define HINDEX(x) ((x) % NBUCKET) struct bucket { struct spinlock lock ; struct buf head ; }; struct { struct spinlock lock ; struct buf buf [NBUF ]; struct bucket htable [NBUCKET ]; } bcache;
即将buf数组作为buf的池子,htable在buf不够用时再从buf中取buf。如果需要牺牲一个buf,是可能出现从一个bucket到另外一个bucket的情况。
实现思考 1 2 3 4 5 6 7 8 9 10 11 12 13 14 void binit (void ) { struct buf *b ; initlock(&bcache.lock, "bcache" ); for (b = bcache.buf; b < bcache.buf + NBUF; b++) { initsleeplock(&b->lock, "buffer" ); } for (int i=0 ; i<NBUCKET; ++i){ initlock(&bcache.htable[i].lock, "bcache.bucklock" ); bcache.htable[i].head.next = &bcache.htable[i].head; bcache.htable[i].head.prev = &bcache.htable[i].head; } }
在初始化时,我的hash table里的LRU链表肯定都是空的,只有一个伪头节点。
一种实现是:将所有buf(一共NBUF个)直接加载到htable[0]的链表中。后续其他bucket的链表不够用时,从其他bucket的链表找牺牲buf
另一种实现:当前bucket的链表找不到buf,那么就去buf数组找还有没有空闲的buf,如果没有空闲的buf,那么再进行全hash table的扫描,找出交换的buf
再对每个bucket的链表进行访问时,都需要获取当前bucket的锁。
有缺陷的实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 static struct buf* bget (uint dev, uint blockno) { struct buf *b ; uint index = HINDEX(blockno); acquire(&bcache.htable[index].lock); for (b = bcache.htable[index].head.next; b != &bcache.htable[index].head; b = b->next) { if (b->dev == dev && b->blockno == blockno) { b->refcnt++; b->timestamp = ticks; release(&bcache.htable[index].lock); acquiresleep(&b->lock); return b; } } struct buf * tmp = 0 ; acquire(&bcache.lock); for (b = bcache.buf; b < bcache.buf + NBUF; b++) { if (b->refcnt == 0 ) { if ( !tmp || tmp->timestamp > b->timestamp) { tmp = b; } } } if (!tmp) panic("bget: no buffers" ); if (tmp->timestamp == 0 ){ setBuf(tmp, dev, blockno, ticks); insertBuf(&bcache.htable[index], tmp); }else { uint oldIndex = HINDEX(tmp->blockno); uint newIndex = index; setBuf(tmp, dev, blockno, ticks); if (oldIndex != newIndex){ acquire(&bcache.htable[oldIndex].lock); tmp->prev->next = tmp->next; tmp->next->prev = tmp->prev; release(&bcache.htable[oldIndex].lock); insertBuf(&bcache.htable[index], tmp); } } release(&bcache.lock); release(&bcache.htable[index].lock); acquiresleep(&tmp->lock); return tmp; } void brelse (struct buf *b) { if (!holdingsleep(&b->lock)) panic("brelse" ); releasesleep(&b->lock); acquire(&bcache.htable[HINDEX(b->blockno)].lock); b->refcnt--; if (b->refcnt==0 ) b->timestamp=ticks; release(&bcache.htable[HINDEX(b->blockno)].lock); } void setBuf (struct buf* buf, uint dev, uint blockno, uint timestamp) { buf->dev = dev; buf->blockno = blockno; buf->timestamp = timestamp; buf->valid = 0 ; buf->refcnt = 1 ; } void insertBuf (struct bucket* bucket, struct buf* node) { struct buf * head = &bucket->head; head->prev->next = node; node->prev = head->prev; head->prev = node; node->next = head; }
可以通过bcachetest,但是无法通过usertests,挠头。
这里明显的缺陷就是死锁。当线程A获取bucket1的锁,线程B获取bucket2的锁,两个线程都要继续进行下去,然后A获得了bcache的锁,但是A选取的牺牲buf恰好是bucket2的buf,那么A还要获得bucket2的锁,那么就造成了死锁。
如果采用全哈希表扫描的方式获取一个牺牲buf,感觉也会有死锁的场景。