Change summary
go.mod | 1 -
go.sum | 2 --
internal/pubsub/broker.go | 25 ++++++++++++++++---------
3 files changed, 16 insertions(+), 12 deletions(-)
Detailed changes
@@ -38,7 +38,6 @@ require (
github.com/invopop/jsonschema v0.13.0
github.com/joho/godotenv v1.5.1
github.com/lucasb-eyer/go-colorful v1.3.0
- github.com/maniartech/signals v1.3.1
github.com/modelcontextprotocol/go-sdk v1.2.0
github.com/muesli/termenv v0.16.0
github.com/ncruces/go-sqlite3 v0.30.4
@@ -243,8 +243,6 @@ github.com/lucasb-eyer/go-colorful v1.3.0 h1:2/yBRLdWBZKrf7gB40FoiKfAWYQ0lqNcbuQ
github.com/lucasb-eyer/go-colorful v1.3.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
-github.com/maniartech/signals v1.3.1 h1:pT3dK6x5Un+B6L3ZLAKygEe+L49TClPreyT08vOoHXY=
-github.com/maniartech/signals v1.3.1/go.mod h1:AbE8Yy9ZjKCWNU/VhQ+0Ea9KOaTWHp6aOfdLBe5m1iM=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-runewidth v0.0.19 h1:v++JhqYnZuu5jSKrk9RbgF5v4CGUjqRfBm05byFGLdw=
@@ -2,31 +2,38 @@ package pubsub
import (
"context"
-
- "github.com/maniartech/signals"
+ "sync"
)
-// Broker is a generic pub/sub broker backed by maniartech/signals.
+// Broker is a generic pub/sub broker using callbacks.
type Broker[T any] struct {
- signal *signals.AsyncSignal[Event[T]]
+ listeners []func(Event[T])
+ mu sync.RWMutex
}
// NewBroker creates a new broker.
func NewBroker[T any]() *Broker[T] {
return &Broker[T]{
- signal: signals.New[Event[T]](),
+ listeners: make([]func(Event[T]), 0),
}
}
// AddListener registers a callback for events.
func (b *Broker[T]) AddListener(fn func(Event[T])) {
- b.signal.AddListener(func(_ context.Context, event Event[T]) {
- fn(event)
- })
+ b.mu.Lock()
+ defer b.mu.Unlock()
+ b.listeners = append(b.listeners, fn)
}
// Publish emits an event to all listeners without blocking.
func (b *Broker[T]) Publish(ctx context.Context, t EventType, payload T) {
+ b.mu.RLock()
+ listeners := make([]func(Event[T]), len(b.listeners))
+ copy(listeners, b.listeners)
+ b.mu.RUnlock()
+
event := Event[T]{Type: t, Payload: payload}
- go b.signal.Emit(ctx, event)
+ for _, fn := range listeners {
+ go fn(event)
+ }
}