MIT6824笔记二 Go与RPC

这节课主要介绍Go语言以及用Go实现爬虫的例子。

GO

GO的优势

  • 语法层面支持线程和管道
  • 垃圾回收机制,不需要手动管理内存
  • 类型安全(内存安全) //关于内存安全还需要再深刻认识

线程协调方式

  1. channels:go 中比较推荐的方式,分阻塞和带缓冲。
  2. sync.Cond:信号机制。
  3. waitGroup:阻塞知道一组 goroutine 执行完毕,后面还会提到。

爬虫例子

  1. 从一个种子网页 URL 开始

  2. 通过 HTTP 请求,获取其内容文本

  3. 解析其内容包含的所有 URL,针对所有 URL 重复过程 2,3

    为了避免重复抓取,需要记下所有抓取过的 URL。

串行爬取

(1)串行爬取的主要逻辑

1
2
fmt.Printf("=== Serial===\n")
Serial("http://golang.org/", fetcher, make(map[string]bool))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
func Serial(url string, fetcher Fetcher, fetched map[string]bool) {
if fetched[url] {
return
}
fetched[url] = true
urls, err := fetcher.Fetch(url)
if err != nil {
return
}
for _, u := range urls {
Serial(u, fetcher, fetched)
}
return
}

深度优先遍历(DFS )全部网页构成的图结构,利用一个名为 fetched 的 set 来保存所有已经抓取过的 URL。

(2)爬取函数的主要逻辑

  • Fetcher接口,里面定义了一个Fetch方法
  • fakeFetcher自定义类型,是一个string到fakeResult的map
  • fakeResult结构体,包括body和urls
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
//
// Fetcher
//

type Fetcher interface {
// Fetch returns a slice of URLs found on the page.
Fetch(url string) (urls []string, err error)
}

// fakeFetcher is Fetcher that returns canned results.
type fakeFetcher map[string]*fakeResult //自定义类型

type fakeResult struct {
body string
urls []string
}

func (f fakeFetcher) Fetch(url string) ([]string, error) {
if res, ok := f[url]; ok {
fmt.Printf("found: %s\n", url)
return res.urls, nil
}
fmt.Printf("missing: %s\n", url)
return nil, fmt.Errorf("not found: %s", url)
}

// fetcher is a populated fakeFetcher.
var fetcher = fakeFetcher{
"http://golang.org/": &fakeResult{
"The Go Programming Language",
[]string{
"http://golang.org/pkg/",
"http://golang.org/cmd/",
},
},
"http://golang.org/pkg/": &fakeResult{
"Packages",
[]string{
"http://golang.org/",
"http://golang.org/cmd/",
"http://golang.org/pkg/fmt/",
"http://golang.org/pkg/os/",
},
},
"http://golang.org/pkg/fmt/": &fakeResult{
"Package fmt",
[]string{
"http://golang.org/",
"http://golang.org/pkg/",
},
},
"http://golang.org/pkg/os/": &fakeResult{
"Package os",
[]string{
"http://golang.org/",
"http://golang.org/pkg/",
},
},
}

并行爬取

(1)思考并行的方法

简单将抓取部分用go关键并行。

  • 但如果仅这么改造,不利用某些手段(sync.WaitGroup)等待子 goroutine,而直接返回,那么可能只会抓取到种子 URL,同时造成子 goroutine 的泄露。
  • 如果访问已经抓取的 URL 集合 fetched 不加锁,很可能造成多次拉取同一个网页(两个线程都访问fetched,这个url访问过了吗,结果都是未访问)

(2)并行实现——利用锁和共享变量

1
2
fmt.Printf("=== ConcurrentMutex ===\n")
ConcurrentMutex("http://golang.org/", fetcher, makeState())
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
//
// Concurrent crawler with shared state and Mutex
//

type fetchState struct {
mu sync.Mutex
fetched map[string]bool
}

func makeState() *fetchState {
return &fetchState{fetched: make(map[string]bool)}
}


func (fs *fetchState) testAndSet(url string) bool {
fs.mu.Lock()
defer fs.mu.Unlock()
r := fs.fetched[url]
fs.fetched[url] = true //已经访问过
return r
}

func ConcurrentMutex(url string, fetcher Fetcher, fs *fetchState) {
if fs.testAndSet(url) { //这里其实就是用锁保护map的更新
return
}
urls, err := fetcher.Fetch(url)
if err != nil {
return
}
var done sync.WaitGroup
for _, u := range urls {
done.Add(1)
go func(u string) {
defer done.Done()
ConcurrentMutex(u, fetcher, fs)
}(u)
}
done.Wait()
return
}

其中,关键部分为:sync.WaitGroup

1
2
3
4
5
6
7
8
9
var done sync.WaitGroup
for _, u := range urls {
done.Add(1)
go func(u string) {
defer done.Done()
ConcurrentMutex(u, fetcher, fs)
}(u)
}
done.Wait()

WaitGroup 内部维护了一个计数器:调用 wg.Add(n) 时候会增加 n;调用 wait.Done() 时候会减少 1。调用 wg.Wait() 会一直阻塞直到当计数器变为 0

所以 WaitGroup 适合等待一组 goroutine 都结束的场景。

利用channel实现并行爬取

我们可以实现一个新的爬虫版本,不用锁 + 共享变量,而用 go 中内置的语法:channel 来通信。具体做法类似实现一个生产者消费者模型,使用 channel 做消息队列。

  1. 初始将种子 url 塞进 channel。
  2. 消费者:master 不断从 channel 中取出 urls,判断是否抓取过,然后启动新的 worker goroutine 去抓取。
  3. 生产者:worker goroutine 抓取到给定的任务 url,并将解析出的结果 urls 塞回 channel。
  4. master 使用一个变量 n 来追踪发出的任务数;往发出一份任务增加一;从 channel 中获取并处理完一份结果(即将其再安排给 worker)减掉一;当所有任务都处理完时,退出程序。
1
2
fmt.Printf("=== ConcurrentChannel ===\n")
ConcurrentChannel("http://golang.org/", fetcher)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
//
// Concurrent crawler with channels
//

func worker(url string, ch chan []string, fetcher Fetcher) {
urls, err := fetcher.Fetch(url)
if err != nil {
ch <- []string{}
} else {
ch <- urls
}
}

func coordinator(ch chan []string, fetcher Fetcher) {
n := 1
fetched := make(map[string]bool)
for urls := range ch {
for _, u := range urls {
if fetched[u] == false {
fetched[u] = true
n += 1
go worker(u, ch, fetcher)
}
}
n -= 1
if n == 0 {
break
}
}
}

func ConcurrentChannel(url string, fetcher Fetcher) {
ch := make(chan []string)
go func() {
ch <- []string{url}
}()
coordinator(ch, fetcher)
}

Q&A:

  1. master 读 channel,多 worker 写 channel,不会有竞争问题吗?channel 是线程安全的。
  2. channel 不需要最后 close 吗?我们用 n 追踪了所有执行中的任务数,因此当 n 为 0 退出时,channel 中不存在任何任务 / 结果,因此 master/worker 都不会对 channel 存在引用,稍后 gc collector 会将其回收。
  3. 为什么在 ConcurrentChannel 需要用 goroutine 往 channel 中写一个 url?否则 master 在读取的时候会一直阻塞。并且 channel 是一个非缓冲 channel,如果不用 goroutine,将会永远阻塞在写的时候。第3个问题,如果不用goroutine,并且是非缓冲管道情况下,发送方会阻塞在发送代码,直到有接收放接收消息。
作者

Desirer

发布于

2023-11-25

更新于

2024-06-09

许可协议