1/*
2 *
3 * Copyright 2023 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpcsync
20
21import (
22 "context"
23 "sync"
24)
25
// Subscriber represents an entity that is subscribed to messages published on
// a PubSub. It wraps the callback to be invoked by the PubSub when a new
// message is published.
//
// PubSub stores Subscriber values as map keys, so the dynamic type of a
// Subscriber must be comparable; a non-comparable dynamic type would panic
// when the value is used as a key.
type Subscriber interface {
	// OnMessage is invoked when a new message is published. Implementations
	// must not block in this method.
	//
	// msg is the value that was handed to PubSub.Publish. PubSub invokes
	// OnMessage while holding its internal mutex, so an implementation must
	// not call Subscribe, Publish, or the cancel function returned by
	// Subscribe on the same PubSub from within this method: each of those
	// acquires the same mutex and the call would deadlock.
	OnMessage(msg any)
}
34
// PubSub is a simple one-to-many publish-subscribe system that supports
// messages of arbitrary type. It guarantees that messages are delivered in
// the same order in which they were published.
//
// Publisher invokes the Publish() method to publish new messages, while
// subscribers interested in receiving these messages register a callback
// via the Subscribe() method.
//
// Once a PubSub is stopped, no more messages can be published, but any pending
// published messages will be delivered to the subscribers. Done may be used
// to determine when all published messages have been delivered.
//
// A PubSub must be created with NewPubSub: Subscribe writes to the
// subscribers map and schedules work on cs, and both are only initialized
// there.
type PubSub struct {
	// cs is the serializer on which every subscriber callback is scheduled.
	cs *CallbackSerializer

	// Access to the fields below is guarded by this mutex.
	mu sync.Mutex
	// msg is the value passed to the most recent Publish call, and is nil
	// before the first call. Subscribe replays it to new subscribers when it
	// is non-nil.
	msg any
	// subscribers is the set of currently registered subscribers. Entries are
	// added by Subscribe and removed by the cancel function it returns.
	subscribers map[Subscriber]bool
}
54
55// NewPubSub returns a new PubSub instance. Users should cancel the
56// provided context to shutdown the PubSub.
57func NewPubSub(ctx context.Context) *PubSub {
58 return &PubSub{
59 cs: NewCallbackSerializer(ctx),
60 subscribers: map[Subscriber]bool{},
61 }
62}
63
64// Subscribe registers the provided Subscriber to the PubSub.
65//
66// If the PubSub contains a previously published message, the Subscriber's
67// OnMessage() callback will be invoked asynchronously with the existing
68// message to begin with, and subsequently for every newly published message.
69//
70// The caller is responsible for invoking the returned cancel function to
71// unsubscribe itself from the PubSub.
72func (ps *PubSub) Subscribe(sub Subscriber) (cancel func()) {
73 ps.mu.Lock()
74 defer ps.mu.Unlock()
75
76 ps.subscribers[sub] = true
77
78 if ps.msg != nil {
79 msg := ps.msg
80 ps.cs.TrySchedule(func(context.Context) {
81 ps.mu.Lock()
82 defer ps.mu.Unlock()
83 if !ps.subscribers[sub] {
84 return
85 }
86 sub.OnMessage(msg)
87 })
88 }
89
90 return func() {
91 ps.mu.Lock()
92 defer ps.mu.Unlock()
93 delete(ps.subscribers, sub)
94 }
95}
96
97// Publish publishes the provided message to the PubSub, and invokes
98// callbacks registered by subscribers asynchronously.
99func (ps *PubSub) Publish(msg any) {
100 ps.mu.Lock()
101 defer ps.mu.Unlock()
102
103 ps.msg = msg
104 for sub := range ps.subscribers {
105 s := sub
106 ps.cs.TrySchedule(func(context.Context) {
107 ps.mu.Lock()
108 defer ps.mu.Unlock()
109 if !ps.subscribers[s] {
110 return
111 }
112 s.OnMessage(msg)
113 })
114 }
115}
116
117// Done returns a channel that is closed after the context passed to NewPubSub
118// is canceled and all updates have been sent to subscribers.
119func (ps *PubSub) Done() <-chan struct{} {
120 return ps.cs.Done()
121}