1// Package task provides task management functionality.
2package task
3
4import (
5 "context"
6 "errors"
7 "sync"
8 "sync/atomic"
9)
10
// Sentinel errors exposed by the task package.
var (
	// ErrAlreadyStarted indicates that a task has already been started.
	ErrAlreadyStarted = errors.New("task already started")

	// ErrNotFound is reported when no task is registered under the
	// requested ID.
	ErrNotFound = errors.New("task not found")
)
18
// Task couples a unit of work with the state needed to start and stop it.
type Task struct {
	// id is the key the task is registered under in its Manager.
	id string
	// fn is the work to execute; it is called with the task's context.
	fn func(context.Context) error
	// started records whether a Run call has already claimed the task.
	started atomic.Bool
	// ctx is the context passed to fn; it is cancelled through cancel.
	ctx context.Context
	// cancel cancels ctx.
	cancel context.CancelFunc
	// err holds the error returned by fn once it has finished.
	err error
}
28
// Manager keeps a registry of tasks keyed by their ID.
type Manager struct {
	// m maps task IDs (string) to *Task values.
	m sync.Map
	// ctx is the parent of every task context created by Add.
	ctx context.Context
}
34
35// NewManager returns a new task manager.
36func NewManager(ctx context.Context) *Manager {
37 return &Manager{
38 m: sync.Map{},
39 ctx: ctx,
40 }
41}
42
43// Add adds a task to the manager.
44// If the process already exists, it is a no-op.
45func (m *Manager) Add(id string, fn func(context.Context) error) {
46 if m.Exists(id) {
47 return
48 }
49
50 ctx, cancel := context.WithCancel(m.ctx)
51 m.m.Store(id, &Task{
52 id: id,
53 fn: fn,
54 ctx: ctx,
55 cancel: cancel,
56 })
57}
58
59// Stop stops the task and removes it from the manager.
60func (m *Manager) Stop(id string) error {
61 v, ok := m.m.Load(id)
62 if !ok {
63 return ErrNotFound
64 }
65
66 p := v.(*Task)
67 p.cancel()
68
69 m.m.Delete(id)
70 return nil
71}
72
73// Exists checks if a task exists.
74func (m *Manager) Exists(id string) bool {
75 _, ok := m.m.Load(id)
76 return ok
77}
78
79// Run starts the task if it exists.
80// Otherwise, it waits for the process to finish.
81func (m *Manager) Run(id string, done chan<- error) {
82 v, ok := m.m.Load(id)
83 if !ok {
84 done <- ErrNotFound
85 return
86 }
87
88 p := v.(*Task)
89 if p.started.Load() {
90 <-p.ctx.Done()
91 if p.err != nil {
92 done <- p.err
93 return
94 }
95
96 done <- p.ctx.Err()
97 }
98
99 p.started.Store(true)
100 m.m.Store(id, p)
101 defer p.cancel()
102 defer m.m.Delete(id)
103
104 errc := make(chan error, 1)
105 go func(ctx context.Context) {
106 errc <- p.fn(ctx)
107 }(p.ctx)
108
109 select {
110 case <-m.ctx.Done():
111 done <- m.ctx.Err()
112 case err := <-errc:
113 p.err = err
114 m.m.Store(id, p)
115 done <- err
116 }
117}