A Simple Hands-On with MapReduce, a Go Concurrency Utility
Environment
go version go1.16.4 windows/amd64
Intel(R) Core(TM) i7-7820HK CPU @ 2.90GHz, 4 cores / 8 threads
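Requirements
- Process several small files of about 5 MB each
- Read the files from a source directory and copy them to a destination directory
- Compute the MD5 of each source file and of its copy and compare them; on a mismatch, report an error and stop the program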
Installing mapReduce (the mr package of go-zero)
go get -u github.com/tal-tech/go-zero
Implementation
Each task: check whether the context has been cancelled → read the data → write the data → verify the MD5
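import (
	"bytes"
	"errors"
	"os"
	"path/filepath"

	"github.com/tal-tech/go-zero/core/hash"
)

// fnBuilder returns a task that copies file `name` from sourcePath to distPath
// and verifies the copy by MD5. ctx, sourcePath and distPath are not defined in
// this snippet (presumably package-level variables of the original program).
func fnBuilder(name string) func() error {
	return func() error {
		// Stop early if the context has been cancelled
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		// Read the source data
		data, err := os.ReadFile(filepath.Join(sourcePath, name))
		if err != nil {
			return err
		}
		// MD5 of the source data
		sourceMD5 := hash.Md5(data)
		// Destination file path
		distFilePath := filepath.Join(distPath, name)
		// Copy the data
		if err := os.WriteFile(distFilePath, data, 0600); err != nil {
			return err
		}
		// Read the copy back and verify it
		distData, err := os.ReadFile(distFilePath)
		if err != nil {
			return err
		}
		distMD5 := hash.Md5(distData)
		// bytes.Equal compares raw digests; bytes.EqualFold applies Unicode case folding and is meant for text
		if !bytes.Equal(sourceMD5, distMD5) {
			return errors.New("md5 verification failed: " + name)
		}
		return nil
	}
}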
Business logic
Building the task queue
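import (
	"context"
	"io/fs"
)

// SourceMap maps a file name to its directory entry
type SourceMap = map[string]fs.DirEntry

func CopyFileToDist(ctx context.Context, source SourceMap) (err error) {
	// Build the task queue: one task per source file
	work := make([]func() error, 0, len(source))
	for name := range source {
		work = append(work, fnBuilder(name))
	}
	// concurrency (defined elsewhere) selects the mode: 0 = mapReduce, 1 = sync.WaitGroup, 2 = serial
	switch concurrency {
	default:
		// Execution mode 1: mapReduce
	case 1:
		// Execution mode 2: sync.WaitGroup
	case 2:
		// Execution mode 3: serial
	}
	return err
}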
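Execution mode 1: MapReduce
// Goes in the default branch of the switch above. mr is github.com/tal-tech/go-zero/core/mr;
// mr.Finish runs the tasks concurrently and returns the first error.
if err = mr.Finish(work...); err != nil {
	return err
}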
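Execution mode 2: sync.WaitGroup
// Goes in the `case 1` branch: one goroutine per task, coordinated with sync.WaitGroup.
// Errors are logged and the other tasks keep running.
var wg sync.WaitGroup
var once sync.Once
wg.Add(len(work))
for k := range work {
	go func(index int) {
		defer wg.Done()
		// Use a goroutine-local error: assigning the shared err from several goroutines is a data race
		if e := work[index](); e != nil {
			log.Errorln(e)
			// Record only the first error so the function can still return it
			once.Do(func() { err = e })
		}
	}(k)
}
wg.Wait()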
Execution mode 3: Serial
// Goes in the `case 2` branch: run the tasks one by one and stop at the first error
for _, fn := range work {
	if err = fn(); err != nil {
		return err
	}
}
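Results (17 files per run; in the logs, 文件数 is the file count, 耗时(ns) is the elapsed time in nanoseconds, and the trailing number of the first message is the selected mode: 0 = mapReduce, 1 = sync.WaitGroup, 2 = no concurrency)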
MapReduce
Elapsed: 109220900 ns (≈109.2 ms)
{"file":"D:/go/src/filenamesSorter/main.go:44","func":"main.init.0","level":"info","msg":"并发处理(0-mapReduce 1-Sync.WaitGroup 2-不并发) 0","time":"2021-06-02T13:32:05+08:00"}
{"file":"D:/go/src/filenamesSorter/main.go:69","func":"main.main","level":"info","msg":"文件分类完毕","time":"2021-06-02T13:32:05+08:00","文件数":17,"耗时(ns)":109220900}
sync.WaitGroup
Elapsed: 109798000 ns (≈109.8 ms)
{"file":"D:/go/src/filenamesSorter/main.go:44","func":"main.init.0","level":"info","msg":"并发处理(0-mapReduce 1-Sync.WaitGroup 2-不并发) 1","time":"2021-06-02T13:31:28+08:00"}
{"file":"D:/go/src/filenamesSorter/main.go:69","func":"main.main","level":"info","msg":"文件分类完毕","time":"2021-06-02T13:31:28+08:00","文件数":17,"耗时(ns)":109798000}
Serial
Elapsed: 359307700 ns (≈359.3 ms)
{"file":"D:/go/src/filenamesSorter/main.go:44","func":"main.init.0","level":"info","msg":"并发处理(0-mapReduce 1-Sync.WaitGroup 2-不并发) 2","time":"2021-06-02T13:33:02+08:00"}
{"file":"D:/go/src/filenamesSorter/main.go:69","func":"main.main","level":"info","msg":"文件分类完毕","time":"2021-06-02T13:33:02+08:00","文件数":17,"耗时(ns)":359307700}
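Conclusion
- Informally (one run per mode, 17 files), mapReduce and sync.WaitGroup are about equally fast (≈109.2 ms vs ≈109.8 ms), and both are roughly 3.3× faster than serial execution (≈359.3 ms).
- In ease of use, covering both concurrency and error handling, mapReduce clearly beats sync.WaitGroup: mr.Finish returns the first error directly, whereas the WaitGroup version has to handle errors inside every goroutine.
- In short, mapReduce is pleasant to use.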