@@ -10,8 +10,7 @@ import (
// WorkPool is a pool of work to be done.
type WorkPool struct {
workers int
- work map[string]func()
- mu sync.RWMutex
+ work sync.Map
sem *semaphore.Weighted
ctx context.Context
logger func(string, ...interface{})
@@ -33,7 +32,6 @@ func WithWorkPoolLogger(logger func(string, ...interface{})) WorkPoolOption {
func NewWorkPool(ctx context.Context, workers int, opts ...WorkPoolOption) *WorkPool {
wq := &WorkPool{
workers: workers,
- work: make(map[string]func()),
ctx: ctx,
}
@@ -52,20 +50,22 @@ func NewWorkPool(ctx context.Context, workers int, opts ...WorkPoolOption) *Work
// Run starts the workers and waits for them to finish.
func (wq *WorkPool) Run() {
- for id, fn := range wq.work {
+ wq.work.Range(func(key, value any) bool {
+ id := key.(string)
+ fn := value.(func())
if err := wq.sem.Acquire(wq.ctx, 1); err != nil {
wq.logf("workpool: %v", err)
- return
+ return false
}
go func(id string, fn func()) {
defer wq.sem.Release(1)
fn()
- wq.mu.Lock()
- delete(wq.work, id)
- wq.mu.Unlock()
+ wq.work.Delete(id)
}(id, fn)
- }
+
+ return true
+ })
if err := wq.sem.Acquire(wq.ctx, int64(wq.workers)); err != nil {
wq.logf("workpool: %v", err)
@@ -75,19 +75,15 @@ func (wq *WorkPool) Run() {
// Add adds a new job to the pool.
// If the job already exists, it is a no-op.
func (wq *WorkPool) Add(id string, fn func()) {
- wq.mu.Lock()
- defer wq.mu.Unlock()
- if _, ok := wq.work[id]; ok {
+ if _, ok := wq.work.Load(id); ok {
return
}
- wq.work[id] = fn
+ wq.work.Store(id, fn)
}
// Status checks if a job is in the queue.
func (wq *WorkPool) Status(id string) bool {
- wq.mu.RLock()
- defer wq.mu.RUnlock()
- _, ok := wq.work[id]
+ _, ok := wq.work.Load(id)
return ok
}