1package task
2
3import (
4 "context"
5 "errors"
6 "sync"
7 "sync/atomic"
8)
9
var (
	// ErrNotFound is returned when no task is registered under the
	// requested id.
	ErrNotFound = errors.New("task not found")

	// ErrAlreadyStarted indicates that a task has already been started.
	// NOTE(review): nothing in the visible part of this file returns it;
	// confirm it is still used by callers.
	ErrAlreadyStarted = errors.New("task already started")
)
17
// Task is a unit of work tracked by a Manager. It can be started and stopped.
type Task struct {
	// id is the key under which the task is registered in the Manager.
	id string
	// fn is the work function; Run calls it with the task's context.
	fn func(context.Context) error
	// started is set to true by Run just before it launches fn.
	started atomic.Bool
	// ctx is the task's own context, derived from the Manager's context.
	ctx context.Context
	// cancel cancels ctx; it is called by Stop and when the starting Run
	// call returns.
	cancel context.CancelFunc
	// err records the error returned by fn once fn has finished.
	err error
}
27
// Manager keeps a registry of tasks keyed by id.
type Manager struct {
	// m maps a task id (string) to its *Task.
	m sync.Map
	// ctx is the parent context from which every task context is derived.
	ctx context.Context
}
33
34// NewManager returns a new task manager.
35func NewManager(ctx context.Context) *Manager {
36 return &Manager{
37 m: sync.Map{},
38 ctx: ctx,
39 }
40}
41
42// Add adds a task to the manager.
43// If the process already exists, it is a no-op.
44func (m *Manager) Add(id string, fn func(context.Context) error) {
45 if m.Exists(id) {
46 return
47 }
48
49 ctx, cancel := context.WithCancel(m.ctx)
50 m.m.Store(id, &Task{
51 id: id,
52 fn: fn,
53 ctx: ctx,
54 cancel: cancel,
55 })
56}
57
58// Stop stops the task and removes it from the manager.
59func (m *Manager) Stop(id string) error {
60 v, ok := m.m.Load(id)
61 if !ok {
62 return ErrNotFound
63 }
64
65 p := v.(*Task)
66 p.cancel()
67
68 m.m.Delete(id)
69 return nil
70}
71
72// Exists checks if a task exists.
73func (m *Manager) Exists(id string) bool {
74 _, ok := m.m.Load(id)
75 return ok
76}
77
78// Run starts the task if it exists.
79// Otherwise, it waits for the process to finish.
80func (m *Manager) Run(id string, done chan<- error) {
81 v, ok := m.m.Load(id)
82 if !ok {
83 done <- ErrNotFound
84 return
85 }
86
87 p := v.(*Task)
88 if p.started.Load() {
89 <-p.ctx.Done()
90 if p.err != nil {
91 done <- p.err
92 return
93 }
94
95 done <- p.ctx.Err()
96 }
97
98 p.started.Store(true)
99 m.m.Store(id, p)
100 defer p.cancel()
101 defer m.m.Delete(id)
102
103 errc := make(chan error, 1)
104 go func(ctx context.Context) {
105 errc <- p.fn(ctx)
106 }(p.ctx)
107
108 select {
109 case <-m.ctx.Done():
110 done <- m.ctx.Err()
111 case err := <-errc:
112 p.err = err
113 m.m.Store(id, p)
114 done <- err
115 }
116}