見出し画像

Golang_並行処理とMutex #472

並行プログラミングにおいて、複数のゴルーチンが同じメモリ領域にアクセスしようとするとデータ競合が発生する可能性があります。Golangでは、このような競合を防ぐためにsync.Mutexが提供されています。

Goの公式チュートリアルである「A Tour of Go」の最後の演習課題でちょうどよいものがあったので、Mutexの説明を兼ねて整理しておきます。

Mutexとは?

Mutexは、複数のゴルーチンが同じ共有リソースに同時にアクセスするのを防ぐためのロック機構です。syncパッケージで提供されており、以下のようなメソッドを持っています。

Lock()

リソースへのアクセスを制限します。他のゴルーチンはロックが解放されるまで待機します。

Unlock()

リソースへのアクセスを解放します。待機中のゴルーチンがあれば、1つがアクセスを再開します。

Mutexの使い方

syncパッケージをインポートして、sync.Mutex型の変数を宣言することで使用可能になります。例えば以下のように構造体に組み込んでおくことで、構造体に変更を加える際にロックをかけるようにできます。

package main

import (
	"sync"
)
 

type urlCache struct {
	v map[string]string
	mux sync.Mutex
}

func (c *urlCache) set(url string, body string) {
	c.mux.Lock()
	defer c.mux.Unlock()
	c.v[url] = body
} 

func (c *urlCache) get(url string) (string, bool) {
	c.mux.Lock()
	defer c.mux.Unlock()
	body, ok := c.v[url]
	return body, ok
}

並行処理との組み合わせ

A Tour of Goの演習問題をMutexを用いて解いたので、並行処理との組み合わせの例として載せておきます。Webクローラーをイメージしたプログラムで、後段で定義しているfetcherにあるurlを、並列でクロールし、かつ同じurlで重複して処理しないようにしています。

mutexを使っているのはキャッシュ部分で、一度クロールしたurlを重複処理しないように記録しています。

package main

import (
	"fmt"
	"sync"
)


type urlCache struct {
	v map[string]string
	mux sync.Mutex
}

func (c *urlCache) set(url string, body string) {
	c.mux.Lock()
	defer c.mux.Unlock()
	c.v[url] = body
} 

func (c *urlCache) get(url string) (string, bool) {
	c.mux.Lock()
	defer c.mux.Unlock()
	body, ok := c.v[url]
	return body, ok
}

type Fetcher interface {
	// Fetch returns the body of URL and
	// a slice of URLs found on that page.
	Fetch(url string) (body string, urls []string, err error)
}

// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
func Crawl(url string, depth int, fetcher Fetcher, cache *urlCache, done chan int) {
	
	defer func() { done <- 0 }()

	if depth <= 0 {
        // 指定した階層(depth)以上に深くはクロールしない
		return
	}
	if _, ok := cache.get(url); ok {
		// 既にキャッシュに存在したら後続の処理はしない
		return
	}

	body, urls, err := fetcher.Fetch(url)
	cache.set(url, body)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("found: %s %q\n", url, body)  // 各urlに対する処理はこれ
	
	childTasks := len(urls)      // 各urlが持っている子urlの数
	childDone := make(chan int)  // 子urlでの処理をチャネルで監視
	for _, u := range urls {
		uu := u
		go Crawl(uu, depth-1, fetcher, cache, childDone)  // 回帰処理
	}
	for i := 0; i < childTasks; i++ {
		<-childDone
	}
	close(childDone)  // 子で使ったチャネルを親でクローズ!!
}



func main() {
	cache := &urlCache{v: make(map[string]string)}
	done := make(chan int)
	go Crawl("https://golang.org/", 4, fetcher, cache, done)
	<-done
	fmt.Println("All crawls done.")
	close(done)  // 子で使ったチャネルを親でクローズ!!
}



// fakeFetcher is Fetcher that returns canned results.
type fakeFetcher map[string]*fakeResult

type fakeResult struct {
	body string
	urls []string
}

func (f fakeFetcher) Fetch(url string) (string, []string, error) {
	if res, ok := f[url]; ok {
		return res.body, res.urls, nil
	}
	return "", nil, fmt.Errorf("not found: %s", url)
}

// fetcher is a populated fakeFetcher.
var fetcher = fakeFetcher{
	"https://golang.org/": &fakeResult{
		"The Go Programming Language",
		[]string{
			"https://golang.org/pkg/",
			"https://golang.org/cmd/",
		},
	},
	"https://golang.org/pkg/": &fakeResult{
		"Packages",
		[]string{
			"https://golang.org/",
			"https://golang.org/cmd/",
			"https://golang.org/pkg/fmt/",
			"https://golang.org/pkg/os/",
		},
	},
	"https://golang.org/pkg/fmt/": &fakeResult{
		"Package fmt",
		[]string{
			"https://golang.org/",
			"https://golang.org/pkg/",
		},
	},
	"https://golang.org/pkg/os/": &fakeResult{
		"Package os",
		[]string{
			"https://golang.org/",
			"https://golang.org/pkg/",
		},
	},
}

並行処理としてのポイントは、今回は理解のためにあえてWaitGroupを使わずチャネルのやり取りだけで実装しました。

Crawl関数にある、

defer func() { done <- 0 }()

これで処理の完了後にチャネルに値を送信し、親のCrawl関数やmain関数で待機している、

[Crawl関数の一部]

	for i := 0; i < childTasks; i++ {
		<-childDone
	}
[main関数の一部]

	<-done

これで値を受信して処理を次に進めています。

また、チャネルをちゃんとクローズしたかったのですが、ポイントは「チャネルのクローズは親ゴルーチン側でのみ行う」ということです。これを意識することでチャネルや並行処理の動作が理解できてきます。

[Crawl関数の一部]

	childTasks := len(urls)      // 各urlが持っている子urlの数
	childDone := make(chan int)  // 子urlでの処理をチャネルで監視
	for _, u := range urls {
		uu := u
		go Crawl(uu, depth-1, fetcher, cache, childDone)  // 回帰処理
	}
	for i := 0; i < childTasks; i++ {
		<-childDone
	}
	close(childDone)  // 子で使ったチャネルを親でクローズ!!
func main() {
	cache := &urlCache{v: make(map[string]string)}
	done := make(chan int)
	go Crawl("https://golang.org/", 4, fetcher, cache, done)
	<-done
	fmt.Println("All crawls done.")
	close(done)  // 子で使ったチャネルを親でクローズ!!
}

Mutexと並行処理の使い方がイメージできてきました。


ここまでお読みいただきありがとうございました!!

参考


この記事が気に入ったらサポートをしてみませんか?