pubsub.go

  1/*
  2 *
  3 * Copyright 2023 gRPC authors.
  4 *
  5 * Licensed under the Apache License, Version 2.0 (the "License");
  6 * you may not use this file except in compliance with the License.
  7 * You may obtain a copy of the License at
  8 *
  9 *     http://www.apache.org/licenses/LICENSE-2.0
 10 *
 11 * Unless required by applicable law or agreed to in writing, software
 12 * distributed under the License is distributed on an "AS IS" BASIS,
 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14 * See the License for the specific language governing permissions and
 15 * limitations under the License.
 16 *
 17 */
 18
 19package grpcsync
 20
 21import (
 22	"context"
 23	"sync"
 24)
 25
 26// Subscriber represents an entity that is subscribed to messages published on
 27// a PubSub. It wraps the callback to be invoked by the PubSub when a new
 28// message is published.
 29type Subscriber interface {
 30	// OnMessage is invoked when a new message is published. Implementations
 31	// must not block in this method.
 32	OnMessage(msg any)
 33}
 34
 35// PubSub is a simple one-to-many publish-subscribe system that supports
 36// messages of arbitrary type. It guarantees that messages are delivered in
 37// the same order in which they were published.
 38//
 39// Publisher invokes the Publish() method to publish new messages, while
 40// subscribers interested in receiving these messages register a callback
 41// via the Subscribe() method.
 42//
 43// Once a PubSub is stopped, no more messages can be published, but any pending
 44// published messages will be delivered to the subscribers.  Done may be used
 45// to determine when all published messages have been delivered.
 46type PubSub struct {
 47	cs *CallbackSerializer
 48
 49	// Access to the below fields are guarded by this mutex.
 50	mu          sync.Mutex
 51	msg         any
 52	subscribers map[Subscriber]bool
 53}
 54
 55// NewPubSub returns a new PubSub instance.  Users should cancel the
 56// provided context to shutdown the PubSub.
 57func NewPubSub(ctx context.Context) *PubSub {
 58	return &PubSub{
 59		cs:          NewCallbackSerializer(ctx),
 60		subscribers: map[Subscriber]bool{},
 61	}
 62}
 63
 64// Subscribe registers the provided Subscriber to the PubSub.
 65//
 66// If the PubSub contains a previously published message, the Subscriber's
 67// OnMessage() callback will be invoked asynchronously with the existing
 68// message to begin with, and subsequently for every newly published message.
 69//
 70// The caller is responsible for invoking the returned cancel function to
 71// unsubscribe itself from the PubSub.
 72func (ps *PubSub) Subscribe(sub Subscriber) (cancel func()) {
 73	ps.mu.Lock()
 74	defer ps.mu.Unlock()
 75
 76	ps.subscribers[sub] = true
 77
 78	if ps.msg != nil {
 79		msg := ps.msg
 80		ps.cs.TrySchedule(func(context.Context) {
 81			ps.mu.Lock()
 82			defer ps.mu.Unlock()
 83			if !ps.subscribers[sub] {
 84				return
 85			}
 86			sub.OnMessage(msg)
 87		})
 88	}
 89
 90	return func() {
 91		ps.mu.Lock()
 92		defer ps.mu.Unlock()
 93		delete(ps.subscribers, sub)
 94	}
 95}
 96
 97// Publish publishes the provided message to the PubSub, and invokes
 98// callbacks registered by subscribers asynchronously.
 99func (ps *PubSub) Publish(msg any) {
100	ps.mu.Lock()
101	defer ps.mu.Unlock()
102
103	ps.msg = msg
104	for sub := range ps.subscribers {
105		s := sub
106		ps.cs.TrySchedule(func(context.Context) {
107			ps.mu.Lock()
108			defer ps.mu.Unlock()
109			if !ps.subscribers[s] {
110				return
111			}
112			s.OnMessage(msg)
113		})
114	}
115}
116
117// Done returns a channel that is closed after the context passed to NewPubSub
118// is canceled and all updates have been sent to subscribers.
119func (ps *PubSub) Done() <-chan struct{} {
120	return ps.cs.Done()
121}