background.go

  1package shell
  2
  3import (
  4	"bytes"
  5	"context"
  6	"fmt"
  7	"sync"
  8	"sync/atomic"
  9	"time"
 10
 11	"git.secluded.site/crush/internal/csync"
 12)
 13
const (
	// MaxBackgroundJobs is the maximum number of background shells the manager
	// tracks at once. Start compares it against the size of the manager's map,
	// so completed jobs that have not been removed yet still count.
	MaxBackgroundJobs = 50
	// CompletedJobRetentionMinutes is how long, in minutes, a completed job is
	// kept before Cleanup considers it eligible for removal (8 hours).
	CompletedJobRetentionMinutes = 8 * 60
)
 20
// BackgroundShell represents a shell running in the background.
//
// Instances are created by BackgroundShellManager.Start, which launches a
// goroutine that executes Command and closes done when execution returns.
type BackgroundShell struct {
	// ID is the key under which the manager tracks this shell.
	ID          string
	// Command is the command string passed to Start and executed by Shell.
	Command     string
	// Description is the caller-supplied description passed to Start.
	Description string
	// Shell is the shell instance that executes Command.
	Shell       *Shell
	// WorkingDir is the working directory the shell was created with.
	WorkingDir  string
	// ctx is the cancellable context the command runs under; it is derived
	// from the context given to Start.
	ctx         context.Context
	// cancel cancels ctx; Kill and KillAll call it to stop the command.
	cancel      context.CancelFunc
	// stdout receives the command's standard output stream.
	stdout      *bytes.Buffer
	// stderr receives the command's standard error stream.
	stderr      *bytes.Buffer
	// done is closed by the execution goroutine once the command has returned.
	done        chan struct{}
	// exitErr is the error returned by the command's execution. It is written
	// before done is closed and should only be read after done is closed.
	exitErr     error
	// completedAt is accessed with sync/atomic functions.
	completedAt int64 // Unix timestamp when job completed (0 if still running)
}
 36
// BackgroundShellManager manages background shell instances.
//
// It tracks every shell started through Start until the shell is removed by
// Remove, Kill, KillAll, or Cleanup.
type BackgroundShellManager struct {
	// shells maps a shell's ID to its BackgroundShell.
	shells *csync.Map[string, *BackgroundShell]
}
 41
var (
	// backgroundManager is the singleton returned by GetBackgroundShellManager.
	backgroundManager     *BackgroundShellManager
	// backgroundManagerOnce ensures backgroundManager is initialized only once.
	backgroundManagerOnce sync.Once
	// idCounter is incremented atomically by Start to generate shell IDs.
	idCounter             atomic.Uint64
)
 47
 48// GetBackgroundShellManager returns the singleton background shell manager.
 49func GetBackgroundShellManager() *BackgroundShellManager {
 50	backgroundManagerOnce.Do(func() {
 51		backgroundManager = &BackgroundShellManager{
 52			shells: csync.NewMap[string, *BackgroundShell](),
 53		}
 54	})
 55	return backgroundManager
 56}
 57
 58// Start creates and starts a new background shell with the given command.
 59func (m *BackgroundShellManager) Start(ctx context.Context, workingDir string, blockFuncs []BlockFunc, command string, description string) (*BackgroundShell, error) {
 60	// Check job limit
 61	if m.shells.Len() >= MaxBackgroundJobs {
 62		return nil, fmt.Errorf("maximum number of background jobs (%d) reached. Please terminate or wait for some jobs to complete", MaxBackgroundJobs)
 63	}
 64
 65	id := fmt.Sprintf("%03X", idCounter.Add(1))
 66
 67	shell := NewShell(&Options{
 68		WorkingDir: workingDir,
 69		BlockFuncs: blockFuncs,
 70	})
 71
 72	shellCtx, cancel := context.WithCancel(ctx)
 73
 74	bgShell := &BackgroundShell{
 75		ID:          id,
 76		Command:     command,
 77		Description: description,
 78		WorkingDir:  workingDir,
 79		Shell:       shell,
 80		ctx:         shellCtx,
 81		cancel:      cancel,
 82		stdout:      &bytes.Buffer{},
 83		stderr:      &bytes.Buffer{},
 84		done:        make(chan struct{}),
 85	}
 86
 87	m.shells.Set(id, bgShell)
 88
 89	go func() {
 90		defer close(bgShell.done)
 91
 92		err := shell.ExecStream(shellCtx, command, bgShell.stdout, bgShell.stderr)
 93
 94		bgShell.exitErr = err
 95		atomic.StoreInt64(&bgShell.completedAt, time.Now().Unix())
 96	}()
 97
 98	return bgShell, nil
 99}
100
101// Get retrieves a background shell by ID.
102func (m *BackgroundShellManager) Get(id string) (*BackgroundShell, bool) {
103	return m.shells.Get(id)
104}
105
106// Remove removes a background shell from the manager without terminating it.
107// This is useful when a shell has already completed and you just want to clean up tracking.
108func (m *BackgroundShellManager) Remove(id string) error {
109	_, ok := m.shells.Take(id)
110	if !ok {
111		return fmt.Errorf("background shell not found: %s", id)
112	}
113	return nil
114}
115
116// Kill terminates a background shell by ID.
117func (m *BackgroundShellManager) Kill(id string) error {
118	shell, ok := m.shells.Take(id)
119	if !ok {
120		return fmt.Errorf("background shell not found: %s", id)
121	}
122
123	shell.cancel()
124	<-shell.done
125	return nil
126}
127
// BackgroundShellInfo contains information about a background shell.
//
// NOTE(review): nothing in this file constructs or returns this type; the
// fields presumably mirror the same-named BackgroundShell fields — verify
// against callers.
type BackgroundShellInfo struct {
	// ID is the identifier of the shell.
	ID          string
	// Command is the command string of the shell.
	Command     string
	// Description is the description of the shell.
	Description string
}
134
135// List returns all background shell IDs.
136func (m *BackgroundShellManager) List() []string {
137	ids := make([]string, 0, m.shells.Len())
138	for id := range m.shells.Seq2() {
139		ids = append(ids, id)
140	}
141	return ids
142}
143
144// Cleanup removes completed jobs that have been finished for more than the retention period
145func (m *BackgroundShellManager) Cleanup() int {
146	now := time.Now().Unix()
147	retentionSeconds := int64(CompletedJobRetentionMinutes * 60)
148
149	var toRemove []string
150	for shell := range m.shells.Seq() {
151		completedAt := atomic.LoadInt64(&shell.completedAt)
152		if completedAt > 0 && now-completedAt > retentionSeconds {
153			toRemove = append(toRemove, shell.ID)
154		}
155	}
156
157	for _, id := range toRemove {
158		m.Remove(id)
159	}
160
161	return len(toRemove)
162}
163
164// KillAll terminates all background shells.
165func (m *BackgroundShellManager) KillAll() {
166	shells := make([]*BackgroundShell, 0, m.shells.Len())
167	for shell := range m.shells.Seq() {
168		shells = append(shells, shell)
169	}
170	m.shells.Reset(map[string]*BackgroundShell{})
171
172	for _, shell := range shells {
173		shell.cancel()
174		<-shell.done
175	}
176}
177
178// GetOutput returns the current output of a background shell.
179func (bs *BackgroundShell) GetOutput() (stdout string, stderr string, done bool, err error) {
180	select {
181	case <-bs.done:
182		return bs.stdout.String(), bs.stderr.String(), true, bs.exitErr
183	default:
184		return bs.stdout.String(), bs.stderr.String(), false, nil
185	}
186}
187
188// IsDone checks if the background shell has finished execution.
189func (bs *BackgroundShell) IsDone() bool {
190	select {
191	case <-bs.done:
192		return true
193	default:
194		return false
195	}
196}
197
// Wait blocks until the background shell completes.
//
// It returns once the done channel has been closed by the execution
// goroutine; if the shell has already finished, it returns immediately.
func (bs *BackgroundShell) Wait() {
	<-bs.done
}