diff --git a/internal/sync/workqueue.go b/internal/sync/workqueue.go index 16eb442a3c5588ca7be41cfd47281c1f90109629..1c07e74b125746ad8491e3d96766d96de1587bb4 100644 --- a/internal/sync/workqueue.go +++ b/internal/sync/workqueue.go @@ -10,8 +10,7 @@ import ( // WorkPool is a pool of work to be done. type WorkPool struct { workers int - work map[string]func() - mu sync.RWMutex + work sync.Map sem *semaphore.Weighted ctx context.Context logger func(string, ...interface{}) @@ -33,7 +32,6 @@ func WithWorkPoolLogger(logger func(string, ...interface{})) WorkPoolOption { func NewWorkPool(ctx context.Context, workers int, opts ...WorkPoolOption) *WorkPool { wq := &WorkPool{ workers: workers, - work: make(map[string]func()), ctx: ctx, } @@ -52,20 +50,22 @@ func NewWorkPool(ctx context.Context, workers int, opts ...WorkPoolOption) *Work // Run starts the workers and waits for them to finish. func (wq *WorkPool) Run() { - for id, fn := range wq.work { + wq.work.Range(func(key, value any) bool { + id := key.(string) + fn := value.(func()) if err := wq.sem.Acquire(wq.ctx, 1); err != nil { wq.logf("workpool: %v", err) - return + return false } go func(id string, fn func()) { defer wq.sem.Release(1) fn() - wq.mu.Lock() - delete(wq.work, id) - wq.mu.Unlock() + wq.work.Delete(id) }(id, fn) - } + + return true + }) if err := wq.sem.Acquire(wq.ctx, int64(wq.workers)); err != nil { wq.logf("workpool: %v", err) @@ -75,19 +75,15 @@ func (wq *WorkPool) Run() { // Add adds a new job to the pool. // If the job already exists, it is a no-op. func (wq *WorkPool) Add(id string, fn func()) { - wq.mu.Lock() - defer wq.mu.Unlock() - if _, ok := wq.work[id]; ok { + if _, ok := wq.work.Load(id); ok { return } - wq.work[id] = fn + wq.work.Store(id, fn) } // Status checks if a job is in the queue. func (wq *WorkPool) Status(id string) bool { - wq.mu.RLock() - defer wq.mu.RUnlock() - _, ok := wq.work[id] + _, ok := wq.work.Load(id) return ok }