1package gorust
2
3import (
4 "context"
5 "fmt"
6 "sync"
7 "time"
8)
9
// Stage is one processing step that turns a value of type T into a value of
// type R.
type Stage[T, R any] interface {
	// Process handles a single input and returns the stage's output, or an
	// error if the input could not be processed.
	Process(ctx context.Context, input T) (R, error)
	// Name returns the label that Pipeline records in PipelineResult.Stage
	// for results produced by this stage.
	Name() string
}
14
// Pipeline is an ordered list of stages that Execute runs one after another.
// T is the type of the initial input passed to Execute; the stages themselves
// are stored untyped, as Stage[any, any].
type Pipeline[T any] struct {
	stages []Stage[any, any] // run in the order they were added
	mu     sync.RWMutex      // locked by AddStage while it appends to stages
	opts   PipelineOptions   // timeout and retry settings; NewPipeline fills in defaults
}
20
// PipelineOptions configures a Pipeline. NewPipeline replaces every field
// that is zero or negative with the default noted on that field.
type PipelineOptions struct {
	// MaxConcurrency defaults to 10.
	// NOTE(review): none of the pipeline code next to this type reads it;
	// confirm whether it is consumed elsewhere.
	MaxConcurrency int
	// Timeout bounds each individual attempt of a stage. Default: 30s.
	Timeout time.Duration
	// RetryAttempts is the total number of attempts made per stage, counting
	// the first one. Default: 3.
	RetryAttempts int
	// RetryDelay is the pause before every attempt after the first.
	// Default: 1s.
	RetryDelay time.Duration
}
27
// PipelineResult reports the outcome of one stage of a pipeline run.
type PipelineResult[T any] struct {
	// Output is the value the stage returned. It is left at its zero value
	// in results that carry an Error.
	Output T
	// Error is nil on success. Otherwise it is the context error, or the
	// stage's last error wrapped with the stage name and attempt count.
	Error error
	// Stage is the Name of the stage this result belongs to.
	Stage string
	// Took is the time elapsed since the stage's first attempt started,
	// including retries and the delays between them. It is left at zero in
	// the result Execute emits when the context is already done before a
	// stage starts.
	Took time.Duration
}
34
35func NewPipeline[T any](opts PipelineOptions) *Pipeline[T] {
36 if opts.MaxConcurrency <= 0 {
37 opts.MaxConcurrency = 10
38 }
39 if opts.Timeout <= 0 {
40 opts.Timeout = 30 * time.Second
41 }
42 if opts.RetryAttempts <= 0 {
43 opts.RetryAttempts = 3
44 }
45 if opts.RetryDelay <= 0 {
46 opts.RetryDelay = time.Second
47 }
48
49 return &Pipeline[T]{
50 stages: make([]Stage[any, any], 0),
51 opts: opts,
52 }
53}
54
55func (p *Pipeline[T]) AddStage(stage Stage[any, any]) *Pipeline[T] {
56 p.mu.Lock()
57 defer p.mu.Unlock()
58 p.stages = append(p.stages, stage)
59 return p
60}
61
62func (p *Pipeline[T]) Execute(ctx context.Context, input T) <-chan PipelineResult[any] {
63 results := make(chan PipelineResult[any], len(p.stages))
64
65 go func() {
66 defer close(results)
67
68 current := input
69 for _, stage := range p.stages {
70 select {
71 case <-ctx.Done():
72 results <- PipelineResult[any]{
73 Error: ctx.Err(),
74 Stage: stage.Name(),
75 }
76 return
77 default:
78 result := p.executeStageWithRetry(ctx, stage, current)
79 results <- result
80
81 if result.Error != nil {
82 return
83 }
84 current = result.Output
85 }
86 }
87 }()
88
89 return results
90}
91
92func (p *Pipeline[T]) executeStageWithRetry(ctx context.Context, stage Stage[any, any], input any) PipelineResult[any] {
93 var lastErr error
94 start := time.Now()
95
96 for attempt := 0; attempt < p.opts.RetryAttempts; attempt++ {
97 if attempt > 0 {
98 select {
99 case <-ctx.Done():
100 return PipelineResult[any]{
101 Error: ctx.Err(),
102 Stage: stage.Name(),
103 Took: time.Since(start),
104 }
105 case <-time.After(p.opts.RetryDelay):
106 }
107 }
108
109 stageCtx, cancel := context.WithTimeout(ctx, p.opts.Timeout)
110 output, err := stage.Process(stageCtx, input)
111 cancel()
112
113 if err == nil {
114 return PipelineResult[any]{
115 Output: output,
116 Stage: stage.Name(),
117 Took: time.Since(start),
118 }
119 }
120
121 lastErr = err
122 }
123
124 return PipelineResult[any]{
125 Error: fmt.Errorf("stage %s failed after %d attempts: %w", stage.Name(), p.opts.RetryAttempts, lastErr),
126 Stage: stage.Name(),
127 Took: time.Since(start),
128 }
129}
130
// TransformStage adapts a plain function into a stage: its method set matches
// Stage[T, R], with Process delegating to the wrapped function.
type TransformStage[T, R any] struct {
	name      string
	transform func(context.Context, T) (R, error)
}

// NewTransformStage returns a stage called name whose Process calls transform.
// transform is stored as given; it is not checked for nil.
func NewTransformStage[T, R any](name string, transform func(context.Context, T) (R, error)) *TransformStage[T, R] {
	stage := new(TransformStage[T, R])
	stage.name, stage.transform = name, transform
	return stage
}

// Name returns the name given to NewTransformStage.
func (s *TransformStage[T, R]) Name() string { return s.name }

// Process passes ctx and input to the wrapped function and returns its
// result and error unchanged.
func (s *TransformStage[T, R]) Process(ctx context.Context, input T) (R, error) {
	output, err := s.transform(ctx, input)
	return output, err
}
150
// ErrFilteredOut is the error FilterStage.Process returns when the predicate
// rejects an input. Compare with errors.Is to tell a rejection apart from a
// predicate failure.
var ErrFilteredOut = fmt.Errorf("item filtered out")

// FilterStage is a stage that lets an input through unchanged when its
// predicate accepts it and fails with ErrFilteredOut when it does not.
type FilterStage[T any] struct {
	name      string
	predicate func(context.Context, T) (bool, error)
}

// NewFilterStage returns a filter called name that keeps the inputs for which
// predicate reports true. predicate is stored as given; it is not checked
// for nil.
func NewFilterStage[T any](name string, predicate func(context.Context, T) (bool, error)) *FilterStage[T] {
	return &FilterStage[T]{
		name:      name,
		predicate: predicate,
	}
}

// Name returns the name given to NewFilterStage.
func (s *FilterStage[T]) Name() string {
	return s.name
}

// Process evaluates the predicate on input.
//
// It returns input and a nil error when the predicate reports true. When the
// predicate fails, its error is returned as is; when it reports false, the
// error is ErrFilteredOut. In both error cases the returned value is the zero
// value of T.
func (s *FilterStage[T]) Process(ctx context.Context, input T) (T, error) {
	var zero T

	keep, err := s.predicate(ctx, input)
	if err != nil {
		return zero, err
	}
	if !keep {
		return zero, ErrFilteredOut
	}
	return input, nil
}
181
// BatchProcessor is a stage that buffers single inputs and hands them to a
// processor function in batches.
type BatchProcessor[T, R any] struct {
	name      string
	batchSize int
	processor func(context.Context, []T) ([]R, error)
	buffer    []T
	mu        sync.Mutex // guards buffer; also held while processor runs
}

// NewBatchProcessor returns a batching stage called name that calls processor
// once batchSize inputs have been collected. A batchSize below 1 is treated
// as 1, so every input is processed on its own. processor is stored as
// given; it is not checked for nil.
func NewBatchProcessor[T, R any](name string, batchSize int, processor func(context.Context, []T) ([]R, error)) *BatchProcessor[T, R] {
	// A negative capacity would make the make call below panic.
	if batchSize < 1 {
		batchSize = 1
	}
	return &BatchProcessor[T, R]{
		name:      name,
		batchSize: batchSize,
		processor: processor,
		buffer:    make([]T, 0, batchSize),
	}
}

// Name returns the name given to NewBatchProcessor.
func (b *BatchProcessor[T, R]) Name() string {
	return b.name
}

// Process adds input to the buffer. While the buffer holds fewer than
// batchSize items it returns (nil, nil). Once the buffer is full, the buffer
// is emptied and the result of running processor on the collected batch is
// returned. The buffer is emptied before processor runs, so a batch whose
// processing fails is not kept for a later call.
func (b *BatchProcessor[T, R]) Process(ctx context.Context, input T) ([]R, error) {
	b.mu.Lock()
	defer b.mu.Unlock()

	b.buffer = append(b.buffer, input)
	if len(b.buffer) < b.batchSize {
		return nil, nil
	}
	return b.processor(ctx, b.takeBatchLocked())
}

// Flush runs processor on whatever is currently buffered, regardless of
// batchSize, and empties the buffer. It returns (nil, nil) without calling
// processor when nothing is buffered.
func (b *BatchProcessor[T, R]) Flush(ctx context.Context) ([]R, error) {
	b.mu.Lock()
	defer b.mu.Unlock()

	if len(b.buffer) == 0 {
		return nil, nil
	}
	return b.processor(ctx, b.takeBatchLocked())
}

// takeBatchLocked hands the buffered items over as a batch and installs a
// fresh, empty buffer of the same capacity, so the batch shares no storage
// with later inputs. The caller must hold b.mu.
func (b *BatchProcessor[T, R]) takeBatchLocked() []T {
	batch := b.buffer
	b.buffer = make([]T, 0, cap(batch))
	return batch
}