manager.go

  1// Package task provides task management functionality.
  2package task
  3
  4import (
  5	"context"
  6	"errors"
  7	"sync"
  8	"sync/atomic"
  9)
 10
 11var (
 12	// ErrNotFound is returned when a process is not found.
 13	ErrNotFound = errors.New("task not found")
 14
 15	// ErrAlreadyStarted is returned when a process is already started.
 16	ErrAlreadyStarted = errors.New("task already started")
 17)
 18
 19// Task is a task that can be started and stopped.
 20type Task struct {
 21	id      string
 22	fn      func(context.Context) error
 23	started atomic.Bool
 24	ctx     context.Context
 25	cancel  context.CancelFunc
 26	err     error
 27}
 28
 29// Manager manages tasks.
 30type Manager struct {
 31	m   sync.Map
 32	ctx context.Context
 33}
 34
 35// NewManager returns a new task manager.
 36func NewManager(ctx context.Context) *Manager {
 37	return &Manager{
 38		m:   sync.Map{},
 39		ctx: ctx,
 40	}
 41}
 42
 43// Add adds a task to the manager.
 44// If the process already exists, it is a no-op.
 45func (m *Manager) Add(id string, fn func(context.Context) error) {
 46	if m.Exists(id) {
 47		return
 48	}
 49
 50	ctx, cancel := context.WithCancel(m.ctx)
 51	m.m.Store(id, &Task{
 52		id:     id,
 53		fn:     fn,
 54		ctx:    ctx,
 55		cancel: cancel,
 56	})
 57}
 58
 59// Stop stops the task and removes it from the manager.
 60func (m *Manager) Stop(id string) error {
 61	v, ok := m.m.Load(id)
 62	if !ok {
 63		return ErrNotFound
 64	}
 65
 66	p := v.(*Task)
 67	p.cancel()
 68
 69	m.m.Delete(id)
 70	return nil
 71}
 72
 73// Exists checks if a task exists.
 74func (m *Manager) Exists(id string) bool {
 75	_, ok := m.m.Load(id)
 76	return ok
 77}
 78
 79// Run starts the task if it exists.
 80// Otherwise, it waits for the process to finish.
 81func (m *Manager) Run(id string, done chan<- error) {
 82	v, ok := m.m.Load(id)
 83	if !ok {
 84		done <- ErrNotFound
 85		return
 86	}
 87
 88	p := v.(*Task)
 89	if p.started.Load() {
 90		<-p.ctx.Done()
 91		if p.err != nil {
 92			done <- p.err
 93			return
 94		}
 95
 96		done <- p.ctx.Err()
 97	}
 98
 99	p.started.Store(true)
100	m.m.Store(id, p)
101	defer p.cancel()
102	defer m.m.Delete(id)
103
104	errc := make(chan error, 1)
105	go func(ctx context.Context) {
106		errc <- p.fn(ctx)
107	}(p.ctx)
108
109	select {
110	case <-m.ctx.Done():
111		done <- m.ctx.Err()
112	case err := <-errc:
113		p.err = err
114		m.m.Store(id, p)
115		done <- err
116	}
117}