From abeaff09a811234805d566db38131cf5cbbda94b Mon Sep 17 00:00:00 2001 From: Yeonuk Hwang Date: Sat, 9 May 2026 00:20:04 +0930 Subject: [PATCH] fix(pubsub): respect channelBufferSize parameter in Subsribe `NewBrokerWithOptions` accepted `channelBufferSize` parameter, but never stored or used it, so `Subsribe`` always used the hardcoded `bufferSize` constant (64). --- internal/pubsub/broker.go | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/internal/pubsub/broker.go b/internal/pubsub/broker.go index 2faf7f89b7c982950bfc69801a7901526e37eec4..52e827c2d6306fa2365372b9867810d9b99d0227 100644 --- a/internal/pubsub/broker.go +++ b/internal/pubsub/broker.go @@ -8,11 +8,12 @@ import ( const bufferSize = 64 type Broker[T any] struct { - subs map[chan Event[T]]struct{} - mu sync.RWMutex - done chan struct{} - subCount int - maxEvents int + subs map[chan Event[T]]struct{} + mu sync.RWMutex + done chan struct{} + subCount int + maxEvents int + channelBufferSize int } func NewBroker[T any]() *Broker[T] { @@ -21,9 +22,10 @@ func NewBroker[T any]() *Broker[T] { func NewBrokerWithOptions[T any](channelBufferSize, maxEvents int) *Broker[T] { return &Broker[T]{ - subs: make(map[chan Event[T]]struct{}), - done: make(chan struct{}), - maxEvents: maxEvents, + subs: make(map[chan Event[T]]struct{}), + done: make(chan struct{}), + maxEvents: maxEvents, + channelBufferSize: channelBufferSize, } } @@ -58,7 +60,7 @@ func (b *Broker[T]) Subscribe(ctx context.Context) <-chan Event[T] { default: } - sub := make(chan Event[T], bufferSize) + sub := make(chan Event[T], b.channelBufferSize) b.subs[sub] = struct{}{} b.subCount++