pipeline.go

  1package gorust
  2
  3import (
  4	"context"
  5	"fmt"
  6	"sync"
  7	"time"
  8)
  9
// Stage is one named step of a pipeline. It converts an input of type T into
// an output of type R, or reports why it could not.
type Stage[T, R any] interface {
	// Process handles a single input under ctx and returns the stage's
	// output or an error.
	Process(ctx context.Context, input T) (R, error)
	// Name returns the stage's label; Pipeline copies it into
	// PipelineResult.Stage and into its retry-failure message.
	Name() string
}
 14
// Pipeline is an ordered list of stages plus the options used to run them.
// T is the type of the initial value handed to Execute. Stages themselves are
// stored as Stage[any, any], so values travel between stages as `any`.
type Pipeline[T any] struct {
	stages []Stage[any, any] // in the order AddStage appended them
	mu     sync.RWMutex      // held by AddStage while it appends to stages
	opts   PipelineOptions   // as normalised by NewPipeline
}
 20
// PipelineOptions configures how a Pipeline runs its stages. NewPipeline
// replaces every non-positive field with a default: MaxConcurrency 10,
// Timeout 30s, RetryAttempts 3, RetryDelay 1s.
//
// RetryAttempts is the total number of tries per stage (the first try counts),
// Timeout bounds each individual try, and RetryDelay is the pause before every
// try after the first.
//
// NOTE(review): MaxConcurrency is defaulted but not read anywhere in the
// visible code — confirm where (or whether) it is enforced.
type PipelineOptions struct {
	MaxConcurrency int           // see NOTE(review) above
	Timeout        time.Duration // per-attempt deadline
	RetryAttempts  int           // total attempts per stage
	RetryDelay     time.Duration // wait between attempts
}
 27
// PipelineResult reports the outcome of running one stage.
//
// On success Output holds the stage's return value and Error is nil. On
// failure Error is set and Output is left at its zero value. Took is measured
// from before the first attempt, so it includes retry delays; the result that
// Execute emits when it sees a cancelled context between stages leaves Took
// at zero.
type PipelineResult[T any] struct {
	Output T             // value produced by the stage on success
	Error  error         // non-nil when the stage failed or ctx ended
	Stage  string        // Name() of the stage this result belongs to
	Took   time.Duration // elapsed time across all attempts
}
 34
 35func NewPipeline[T any](opts PipelineOptions) *Pipeline[T] {
 36	if opts.MaxConcurrency <= 0 {
 37		opts.MaxConcurrency = 10
 38	}
 39	if opts.Timeout <= 0 {
 40		opts.Timeout = 30 * time.Second
 41	}
 42	if opts.RetryAttempts <= 0 {
 43		opts.RetryAttempts = 3
 44	}
 45	if opts.RetryDelay <= 0 {
 46		opts.RetryDelay = time.Second
 47	}
 48
 49	return &Pipeline[T]{
 50		stages: make([]Stage[any, any], 0),
 51		opts:   opts,
 52	}
 53}
 54
 55func (p *Pipeline[T]) AddStage(stage Stage[any, any]) *Pipeline[T] {
 56	p.mu.Lock()
 57	defer p.mu.Unlock()
 58	p.stages = append(p.stages, stage)
 59	return p
 60}
 61
 62func (p *Pipeline[T]) Execute(ctx context.Context, input T) <-chan PipelineResult[any] {
 63	results := make(chan PipelineResult[any], len(p.stages))
 64	
 65	go func() {
 66		defer close(results)
 67		
 68		current := input
 69		for _, stage := range p.stages {
 70			select {
 71			case <-ctx.Done():
 72				results <- PipelineResult[any]{
 73					Error: ctx.Err(),
 74					Stage: stage.Name(),
 75				}
 76				return
 77			default:
 78				result := p.executeStageWithRetry(ctx, stage, current)
 79				results <- result
 80				
 81				if result.Error != nil {
 82					return
 83				}
 84				current = result.Output
 85			}
 86		}
 87	}()
 88	
 89	return results
 90}
 91
 92func (p *Pipeline[T]) executeStageWithRetry(ctx context.Context, stage Stage[any, any], input any) PipelineResult[any] {
 93	var lastErr error
 94	start := time.Now()
 95	
 96	for attempt := 0; attempt < p.opts.RetryAttempts; attempt++ {
 97		if attempt > 0 {
 98			select {
 99			case <-ctx.Done():
100				return PipelineResult[any]{
101					Error: ctx.Err(),
102					Stage: stage.Name(),
103					Took:  time.Since(start),
104				}
105			case <-time.After(p.opts.RetryDelay):
106			}
107		}
108		
109		stageCtx, cancel := context.WithTimeout(ctx, p.opts.Timeout)
110		output, err := stage.Process(stageCtx, input)
111		cancel()
112		
113		if err == nil {
114			return PipelineResult[any]{
115				Output: output,
116				Stage:  stage.Name(),
117				Took:   time.Since(start),
118			}
119		}
120		
121		lastErr = err
122	}
123	
124	return PipelineResult[any]{
125		Error: fmt.Errorf("stage %s failed after %d attempts: %w", stage.Name(), p.opts.RetryAttempts, lastErr),
126		Stage: stage.Name(),
127		Took:  time.Since(start),
128	}
129}
130
// TransformStage is a stage backed by a plain function that maps a T to an R.
type TransformStage[T, R any] struct {
	// name is the label reported by Name.
	name string

	// transform is the function Process delegates to.
	transform func(context.Context, T) (R, error)
}

// NewTransformStage returns a TransformStage called name that applies
// transform to every input.
func NewTransformStage[T, R any](name string, transform func(context.Context, T) (R, error)) *TransformStage[T, R] {
	stage := new(TransformStage[T, R])
	stage.name = name
	stage.transform = transform
	return stage
}

// Name returns the label given to NewTransformStage.
func (s *TransformStage[T, R]) Name() string { return s.name }

// Process hands input to the wrapped function and returns its result and
// error unchanged.
func (s *TransformStage[T, R]) Process(ctx context.Context, input T) (R, error) {
	output, err := s.transform(ctx, input)
	return output, err
}
150
// ErrFiltered is the error FilterStage.Process returns when the predicate
// rejects an item. It lets callers separate an intentional drop from a
// predicate failure with errors.Is(err, ErrFiltered). The message is the same
// text the stage has always reported.
var ErrFiltered = fmt.Errorf("item filtered out")

// FilterStage is a stage that passes an item through unchanged when a
// predicate accepts it and fails the item otherwise.
type FilterStage[T any] struct {
	// name is the label reported by Name.
	name string

	// predicate decides whether an item is kept; a non-nil error from it is
	// surfaced by Process as-is.
	predicate func(context.Context, T) (bool, error)
}

// NewFilterStage returns a FilterStage called name that keeps the items for
// which predicate reports true.
func NewFilterStage[T any](name string, predicate func(context.Context, T) (bool, error)) *FilterStage[T] {
	return &FilterStage[T]{
		name:      name,
		predicate: predicate,
	}
}

// Name returns the label given to NewFilterStage.
func (s *FilterStage[T]) Name() string {
	return s.name
}

// Process evaluates the predicate on input.
//
// It returns input and a nil error when the predicate accepts the item. If
// the predicate itself fails, that error is returned unchanged. If the
// predicate rejects the item, ErrFiltered is returned. In both error cases
// the returned value is the zero T.
func (s *FilterStage[T]) Process(ctx context.Context, input T) (T, error) {
	var zero T

	keep, err := s.predicate(ctx, input)
	switch {
	case err != nil:
		return zero, err
	case !keep:
		return zero, ErrFiltered
	}

	return input, nil
}
181
// BatchProcessor is a stage that collects single inputs into a buffer and
// hands them to a batch function once batchSize items have accumulated.
//
// The mutex is held for the whole of Process and Flush, including the call to
// the batch function. Batch calls are therefore serialised, and the batch
// function must not call Process or Flush on the same BatchProcessor
// (sync.Mutex is not reentrant).
type BatchProcessor[T, R any] struct {
	// name is the label reported by Name.
	name string

	// batchSize is the buffer length at which Process hands off a batch.
	batchSize int

	// processor receives a private copy of the buffered items.
	processor func(context.Context, []T) ([]R, error)

	// buffer holds the items waiting for the next batch; guarded by mu.
	buffer []T

	mu sync.Mutex
}

// NewBatchProcessor returns a BatchProcessor called name that invokes
// processor every time batchSize items have been collected.
//
// A batchSize below 1 is treated as 1, meaning every item is processed on
// its own. Previously a negative size panicked while allocating the buffer,
// and zero already behaved like 1.
func NewBatchProcessor[T, R any](name string, batchSize int, processor func(context.Context, []T) ([]R, error)) *BatchProcessor[T, R] {
	if batchSize < 1 {
		batchSize = 1
	}

	return &BatchProcessor[T, R]{
		name:      name,
		batchSize: batchSize,
		processor: processor,
		buffer:    make([]T, 0, batchSize),
	}
}

// Name returns the label given to NewBatchProcessor.
func (b *BatchProcessor[T, R]) Name() string {
	return b.name
}

// Process appends input to the buffer.
//
// While the buffer holds fewer than batchSize items it returns (nil, nil).
// Once the buffer is full it is emptied and the batch function's result is
// returned. If the batch function fails, the drained items are not put back.
func (b *BatchProcessor[T, R]) Process(ctx context.Context, input T) ([]R, error) {
	b.mu.Lock()
	defer b.mu.Unlock()

	b.buffer = append(b.buffer, input)
	if len(b.buffer) < b.batchSize {
		return nil, nil
	}

	return b.processor(ctx, b.drainLocked())
}

// Flush processes whatever is currently buffered, regardless of batchSize.
// It returns (nil, nil) without calling the batch function when the buffer is
// empty.
func (b *BatchProcessor[T, R]) Flush(ctx context.Context) ([]R, error) {
	b.mu.Lock()
	defer b.mu.Unlock()

	if len(b.buffer) == 0 {
		return nil, nil
	}

	return b.processor(ctx, b.drainLocked())
}

// drainLocked returns a copy of the buffered items and empties the buffer,
// keeping its capacity for reuse. The copy matters because the batch function
// may retain the slice while the buffer is refilled. Callers must hold b.mu.
func (b *BatchProcessor[T, R]) drainLocked() []T {
	batch := make([]T, len(b.buffer))
	copy(batch, b.buffer)
	b.buffer = b.buffer[:0]
	return batch
}