1package task
  2
  3import (
  4	"context"
  5	"errors"
  6	"sync"
  7	"sync/atomic"
  8)
  9
var (
	// ErrNotFound is reported when no task is registered under the requested id.
	ErrNotFound = errors.New("task not found")

	// ErrAlreadyStarted indicates that a task has already been started.
	// NOTE(review): nothing in the code visible here returns it — confirm it is still used.
	ErrAlreadyStarted = errors.New("task already started")
)
 17
// Task is a unit of work registered with a Manager that can be started and
// stopped. Its fields are populated by Manager.Add and updated by Manager.Run.
type Task struct {
	id      string                      // id the task is registered under in the Manager
	fn      func(context.Context) error // work to execute; it is called with ctx
	started atomic.Bool                 // set to true by Run before fn is launched
	ctx     context.Context             // per-task context derived from the Manager's context
	cancel  context.CancelFunc          // cancels ctx
	err     error                       // value returned by fn, recorded by Run
}
 27
// Manager keeps a registry of tasks keyed by id and runs or stops them on
// request.
type Manager struct {
	m   sync.Map        // task id (string) -> *Task
	ctx context.Context // parent of every task context; also watched by Run
}
 33
 34// NewManager returns a new task manager.
 35func NewManager(ctx context.Context) *Manager {
 36	return &Manager{
 37		m:   sync.Map{},
 38		ctx: ctx,
 39	}
 40}
 41
 42// Add adds a task to the manager.
 43// If the process already exists, it is a no-op.
 44func (m *Manager) Add(id string, fn func(context.Context) error) {
 45	if m.Exists(id) {
 46		return
 47	}
 48
 49	ctx, cancel := context.WithCancel(m.ctx)
 50	m.m.Store(id, &Task{
 51		id:     id,
 52		fn:     fn,
 53		ctx:    ctx,
 54		cancel: cancel,
 55	})
 56}
 57
 58// Stop stops the task and removes it from the manager.
 59func (m *Manager) Stop(id string) error {
 60	v, ok := m.m.Load(id)
 61	if !ok {
 62		return ErrNotFound
 63	}
 64
 65	p := v.(*Task)
 66	p.cancel()
 67
 68	m.m.Delete(id)
 69	return nil
 70}
 71
 72// Exists checks if a task exists.
 73func (m *Manager) Exists(id string) bool {
 74	_, ok := m.m.Load(id)
 75	return ok
 76}
 77
 78// Run starts the task if it exists.
 79// Otherwise, it waits for the process to finish.
 80func (m *Manager) Run(id string, done chan<- error) {
 81	v, ok := m.m.Load(id)
 82	if !ok {
 83		done <- ErrNotFound
 84		return
 85	}
 86
 87	p := v.(*Task)
 88	if p.started.Load() {
 89		<-p.ctx.Done()
 90		if p.err != nil {
 91			done <- p.err
 92			return
 93		}
 94
 95		done <- p.ctx.Err()
 96	}
 97
 98	p.started.Store(true)
 99	m.m.Store(id, p)
100	defer p.cancel()
101	defer m.m.Delete(id)
102
103	errc := make(chan error, 1)
104	go func(ctx context.Context) {
105		errc <- p.fn(ctx)
106	}(p.ctx)
107
108	select {
109	case <-m.ctx.Done():
110		done <- m.ctx.Err()
111	case err := <-errc:
112		p.err = err
113		m.m.Store(id, p)
114		done <- err
115	}
116}