go实现协程池
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
type Worker struct {
Task func()
Id int
Wg *sync.WaitGroup
Quit chan bool
}
func (w Worker) Start() {
go func() {
for {
select {
case task := <-w.Task:
task()
case <-w.Quit:
return
}
}
}()
}
type Pool struct {
Workers []Worker
Tasks chan func()
Quit chan bool
Wg *sync.WaitGroup
}
func NewPool(numWorkers int) *Pool {
pool := &Pool{
Workers: make([]Worker, numWorkers),
Tasks: make(chan func()),
Quit: make(chan bool),
Wg: &sync.WaitGroup{},
}
for i := 0; i < numWorkers; i++ {
pool.Workers[i] = Worker{
Id: i,
Task: pool.Tasks,
Quit: pool.Quit,
Wg: pool.Wg,
}
pool.Workers[i].Start()
}
return pool
}
func (p *Pool) Do(task func()) {
p.Wg.Add(1)
go func() {
p.Tasks <- task
}()
}
func (p *Pool) Wait() {
go func() {
p.Quit <- true
}()
p.Wg.Wait()
close(p.Tasks)
}
func main() {
pool := NewPool(10)
var count int32
for i := 0; i < 100; i++ {
pool.Do(func() {
time.Sleep(10 * time.Millisecond)
atomic.AddInt32(&count, 1)
})
}
pool.Wait()
fmt.Printf("Count is %d\n", count)
}
这段代码定义了一个简单的Worker和Pool结构体,以及相关的方法来管理协程池。Pool结构体包含一个Worker数组和一个任务通道,用于向工作池提交任务。每个Worker是一个协程,从任务通道接收任务并执行。代码中还展示了如何使用协程池来处理一系列任务,并在最后关闭相关通道和等待所有任务完成。
评论已关闭