golanggin框架异步同步goroutine并发操作
goroutine机制可以⽅便地实现异步处理
package main
import (
"log"
"time"
"github/gin-gonic/gin"
)
func main() {
// 1.创建路由
// 默认使⽤了2个中间件Logger(), Recovery()
r := gin.Default()
// 1.异步
r.GET("/long_async", func(c *gin.Context) {
// 需要搞⼀个副本
copyContext := c.Copy()
// 异步处理
go func() {
time.Sleep(3 * time.Second)
log.Println("异步执⾏:" + copyContext.Request.URL.Path)
}()
})
/
/ 2.同步
r.GET("/long_sync", func(c *gin.Context) {
time.Sleep(3 * time.Second)
log.Println("同步执⾏:" + c.Request.URL.Path)
})
r.Run(":8000")
}
补充:Golang的channel使⽤以及并发同步技巧
在学习《The Go Programming Language》第⼋章并发单元的时候还是遭遇了不少问题,和值得总结思考和记录的地⽅。
做⼀个类似于unix du命令的⼯具。但是阉割了⼀些功能,这⾥应该只实现-c(统计total⼤⼩) 和-h(以human⽐较容易辨识的显⽰出来)的功能。
⾸先我们需要构造⼀个能够返回FileInfo信息数组的函数,我们把它取名为dirEntries:
func dirEntries(dir string) []os.FileInfo {
entries, err := ioutil.ReadDir(dir)
if err != nil {
fmt.Fprintf(os.Stderr, "du: %v\n", err)
return nil
}
return entries
}
传⼊⼀个路径字符串,然后使⽤ioutil.ReadDir解析这个路径下⾯的所有⽂件以及⽂件夹⽣成⼀个FileInfo的profile。Fileinfo interface下⾯包含了:
type FileInfo interface {
Name() string // base name of the file
Size() int64 // length in bytes for regular files; system-dependent for others
Mode() FileMode // file mode bits
ModTime() time.Time // modification time
IsDir() bool // abbreviation for Mode().IsDir()
Sys() interface{} // underlying data source (can return nil)
}
多种⽅法,可以直接调⽤,其作⽤就是后⾯注释写的⼀样。
有了能够获取⽂件夹下⾯⽂件和⽂件夹的函数之后,我们需要⼀个调⽤⽅⽤来walk指定的⽬录:
// ⼊参是⼀个⽂件⽬录,⼀个INT64的只接收的单向channel
func walkDir(dir string, fileSizes chan<- int64) {
for _, entry := range dirEntries(dir) {
if entry.IsDir() {
} else {
fileSizes <- entry.Size()
}
}
}
这⾥我们定义⼀个⽬录,然后需求传⼊⼀个单向接收channel⽤于在多goroutine中计算总共的⽂件⼤⼩。
使⽤range⽅法来遍历我们上⾯写的dirEntries的返回⽂件或⽂件夹,如果是⽂件夹则继续迭代。
如果不是则将⽂件⼤⼩存⼊放⼊fileSizes channel中。
搞定上⾯两个函数,我们来写主函数部分:
func main() {
root := ""
flag.StringVar(&root, "-p", ".", "input dir.")
flag.Parse()
fileSizes := make(chan int64)
// 起⼀个goroutine去walk⽬录
go func() {
walkDir(root, fileSizes)
// Walk完毕之后要关闭该channel下⾯使⽤range读取数据的时候才会有尽头
close(fileSizes)
}()
var nfiles, nbytes int64
for size := range fileSizes {
nfiles++
nbytes += size
}
fmt.Printf("%d files %.1f GB\n", nfiles, float64(nbytes)/1e9)
}
这⾥注意⼀点,因为起goroutine的walk函数,和下⾯同时在range遍历是在同步进⾏,如果下⾯range速度太快读到管道⾥⾯没有值了会阻塞住等待有数据继续进来之后读取,⽽不是会跳出。只有当close(fileSizes)这句执⾏到,显⽰关闭掉channel之后,才会跳出range循环并且这时已经读取完了所有的数据。这⾥有点像,close channel的时候给range发送了⼀个停⽌信号⼀样,感觉这个利⽤起来会⽐较有⽤?后续可能会再研究⼀下。
go语言字符串转数组让我们继续来优化我们的程序,添加⼀个-v参数,打印出扫描⽂件的进度,当我们要扫描整个盘的时候,可能会花费⼤量的时间,我们需要知道进度如何了。
其实这个需求只需要很⼩的改动,让我们来重新改写⼀下main函数,⽤select多路复⽤来完成这个事情。
func main() {
root := ""
verbose := false
tick := make(<-chan time.Time)
var nfiles, nbytes int64
flag.StringVar(&root, "p", ".", "input dir.")
flag.BoolVar(&verbose, "v", false, "add verbose if you want")
flag.Parse()
if verbose {
tick = time.Tick(500 * time.Millisecond)
}
fileSizes := make(chan int64)
// 起⼀个goroutine去walk⽬录
go func() {
walkDir(root, fileSizes)
// Walk完毕之后要关闭该channel下⾯使⽤range读取数据的时候才会有尽头
close(fileSizes)
}()
loop:
for {
select {
case size, ok := <-fileSizes:
if !ok {
break loop
}
nfiles++
nbytes += size
case <-tick:
fmt.Printf("%d files %.1f GB\n", nfiles, float64(nbytes)/1e9)
}
fmt.Printf("%d files %.1f GB\n", nfiles, float64(nbytes)/1e9)
}
上⾯其实都差不多,这⾥我直接从loop那⾥开始说吧,遇到这个loop的时候我其实还蛮疑惑的,因为我在go语⾔保留关键字⾥⾯并没有看到他的⾝影,但是这⾥他的确是个关键字,和⾥⾯的break连⽤⾥⾯break后⾯跟上的loop 可以直接跳出到最外层loop包裹的循环,⽽不是break默认的只跳出⼀层循环。明⽩了这个道理之后,这个就不难理解了,当我们还在遍历⽂件的时候,select 会持续读取⽂件⼤⼩赋值给size,并且返回true给ok。如果我们开启了verbose,每隔500毫秒tick会收到来⾃time.Tick的消息。我们都知道select会在都准备好的情况下随机pick⼀个执⾏,所以这⾥也或快或慢的被打印进度(前提是同时收到信号,但是实际上这个发⽣速度可能在nm级别,凭感受很难感觉到谁先)。当最后都执⾏完毕后filesSizes channel会被上⾯的携程函数close(),当close之后,在读取完剩余数据后,fileSizes会返回给ok nil。就可以跳出循环。
看到这⾥可能会觉得有点绕,所以要尽可能的多理解⼀下,当然我们可以让这个du程序更快。可以注意到我们并没有在walkdir⾥⾯开启goroutines进⾏并发处理。下⾯我将尝试开启goroutine处理它们,并且⽤channel给他们加个锁控制⼀下goroutine的数量,在此之前我们先来看看现在完成了的代码:
package main
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"flag"
"time"
)
// ⼊参是⼀个⽂件⽬录,⼀个INT64的只接收的单向channel
func walkDir(dir string, fileSizes chan<- int64) {
for _, entry := range dirEntries(dir) {
if entry.IsDir() {
subdir := filepath.Join(dir, entry.Name())
walkDir(subdir, fileSizes)
} else {
fileSizes <- entry.Size()
}
}
}
func dirEntries(dir string) []os.FileInfo {
entries, err := ioutil.ReadDir(dir)
if err != nil {
fmt.Fprintf(os.Stderr, "du: %v\n", err)
return nil
}
return entries
}
func main() {
t1 := time.Now()
root := ""
verbose := false
tick := make(<-chan time.Time)
var nfiles, nbytes int64
flag.StringVar(&root, "p", ".", "input dir.")
flag.BoolVar(&verbose, "v", false, "add verbose if you want")
flag.Parse()
if verbose {
tick = time.Tick(500 * time.Millisecond)
}
fileSizes := make(chan int64)
// 起⼀个goroutine去walk⽬录
go func() {
walkDir(root, fileSizes)
// Walk完毕之后要关闭该channel下⾯使⽤range读取数据的时候才会有尽头
close(fileSizes)
}()
loop:
for {
select {
case size, ok := <-fileSizes:
if !ok {
break loop
}
nfiles++
nbytes += size
case <-tick:
fmt.Printf("%d files %.1f GB\n", nfiles, float64(nbytes)/1e9)
fmt.Printf("%d files %.1f GB\n", nfiles, float64(nbytes)/1e9)
fmt.Println(time.Since(t1))
}
观察上⾯代码可以看出我们并不能直接在这个代码的基础上直接给walkDir加上goroutine,这样会导致channel直接被关闭,然后啥也没跑就结束了。
我们需要让主goroutine等待其他goroutine都完成之后再结束,所以主goroutine需要在这⾥阻塞住,等到得到可以结束的信号之后再结束。
我们可以使⽤sync.WaitGroup 来对仍旧活跃的walkDir调⽤进⾏计数。等到数量为0的时候就算我们可以结束了。
sync.WaitGroup提供了三个⽅法:
Add:添加或减少goroutine的数量。
Done:相当于Add(-1)。
Wait:阻塞住等待WaitGroup数量变成0.
明⽩这个道理之后我们改写了⼀下代码,让它使⽤sync.WaitGroup来⽀持同步,最后当所有goroutine都结束之后,关闭channel完成任务。
package main
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"flag"
"time"
"sync"
)
// ⼊参是⼀个⽂件⽬录,⼀个INT64的只接收的单向channel
func walkDir(dir string, fileSizes chan<- int64, n *sync.WaitGroup) {
defer n.Done()
for _, entry := range dirEntries(dir) {
if entry.IsDir() {
n.Add(1)
subdir := filepath.Join(dir, entry.Name())
go walkDir(subdir, fileSizes, n)
} else {
fileSizes <- entry.Size()
}
}
}
func dirEntries(dir string) []os.FileInfo {
entries, err := ioutil.ReadDir(dir)
if err != nil {
fmt.Fprintf(os.Stderr, "du: %v\n", err)
return nil
}
return entries
}
func main() {
t1 := time.Now()
root := ""
verbose := false
tick := make(<-chan time.Time)
fileSizes := make(chan int64)
var n sync.WaitGroup
var nfiles, nbytes int64
flag.StringVar(&root, "p", ".", "input dir.")
flag.BoolVar(&verbose, "v", false, "add verbose if you want")
flag.Parse()
if verbose {
tick = time.Tick(500 * time.Millisecond)
}
n.Add(1)
go walkDir(root, fileSizes, &n)
go func() {
n.Wait()
loop:
for {
select {
case size, ok := <-fileSizes:
if !ok {
break loop
}
nfiles++
nbytes += size
case <-tick:
fmt.Printf("%d files %.1f GB\n", nfiles, float64(nbytes)/1e9)
}
}
fmt.Printf("%d files %.1f GB\n", nfiles, float64(nbytes)/1e9)
fmt.Println(time.Since(t1))
}
随便跑跑。。感觉快得飞起,然⽽跑不了⼏秒就会报错,这个程序最⼤的问题就是我们完全没有办法之后它会⾃⼰打开多少个goroutine,感觉会爆炸。所以我们要限制这种夸张的写法,使⽤channel来做⼀个并发协程池,把同时开启的goroutine的数量控制⼀下。
最后上⼀下完整代码,注意defer关键字,只接收函数,所以我会在释放锁的时候使⽤匿名函数:
package main
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"flag"
"time"
"sync"
)
var token = make(chan int, 100)
// ⼊参是⼀个⽂件⽬录,⼀个INT64的只接收的单向channel
func walkDir(dir string, fileSizes chan<- int64, n *sync.WaitGroup) {
defer n.Done()
for _, entry := range dirEntries(dir) {
if entry.IsDir() {
n.Add(1)
subdir := filepath.Join(dir, entry.Name())
go walkDir(subdir, fileSizes, n)
} else {
fileSizes <- entry.Size()
}
}
}
func dirEntries(dir string) []os.FileInfo {
token <- 1
defer func() {<-token}()
entries, err := ioutil.ReadDir(dir)
if err != nil {
fmt.Fprintf(os.Stderr, "du: %v\n", err)
return nil
}
return entries
}
func main() {
var nfiles, nbytes int64
var n sync.WaitGroup
root := ""
verbose := false
t1 := time.Now()
fileSizes := make(chan int64)
tick := make(<-chan time.Time)
flag.StringVar(&root, "p", ".", "input dir.")
flag.BoolVar(&verbose, "v", false, "add verbose if you want")
flag.Parse()
if verbose {
tick = time.Tick(500 * time.Millisecond)
}
n.Add(1)
go walkDir(root, fileSizes, &n)
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论