1//go:build go1.18
2// +build go1.18
3
4// Copyright (c) Microsoft Corporation. All rights reserved.
5// Licensed under the MIT License.
6
7package temporal
8
9import (
10 "sync"
11 "time"
12)
13
// AcquireResource is the callback signature used to obtain a fresh value of a temporal resource.
// It receives the caller-supplied state that was passed to Resource.Get and reports the newly
// acquired resource together with the time at which that resource expires, or an error.
type AcquireResource[TResource any, TState any] func(state TState) (newResource TResource, newExpiration time.Time, err error)
16
17// Resource is a temporal resource (usually a credential) that requires periodic refreshing.
18type Resource[TResource, TState any] struct {
19 // cond is used to synchronize access to the shared resource embodied by the remaining fields
20 cond *sync.Cond
21
22 // acquiring indicates that some thread/goroutine is in the process of acquiring/updating the resource
23 acquiring bool
24
25 // resource contains the value of the shared resource
26 resource TResource
27
28 // expiration indicates when the shared resource expires; it is 0 if the resource was never acquired
29 expiration time.Time
30
31 // lastAttempt indicates when a thread/goroutine last attempted to acquire/update the resource
32 lastAttempt time.Time
33
34 // acquireResource is the callback function that actually acquires the resource
35 acquireResource AcquireResource[TResource, TState]
36}
37
38// NewResource creates a new Resource that uses the specified AcquireResource for refreshing.
39func NewResource[TResource, TState any](ar AcquireResource[TResource, TState]) *Resource[TResource, TState] {
40 return &Resource[TResource, TState]{cond: sync.NewCond(&sync.Mutex{}), acquireResource: ar}
41}
42
43// Get returns the underlying resource.
44// If the resource is fresh, no refresh is performed.
45func (er *Resource[TResource, TState]) Get(state TState) (TResource, error) {
46 // If the resource is expiring within this time window, update it eagerly.
47 // This allows other threads/goroutines to keep running by using the not-yet-expired
48 // resource value while one thread/goroutine updates the resource.
49 const window = 5 * time.Minute // This example updates the resource 5 minutes prior to expiration
50 const backoff = 30 * time.Second // Minimum wait time between eager update attempts
51
52 now, acquire, expired := time.Now(), false, false
53
54 // acquire exclusive lock
55 er.cond.L.Lock()
56 resource := er.resource
57
58 for {
59 expired = er.expiration.IsZero() || er.expiration.Before(now)
60 if expired {
61 // The resource was never acquired or has expired
62 if !er.acquiring {
63 // If another thread/goroutine is not acquiring/updating the resource, this thread/goroutine will do it
64 er.acquiring, acquire = true, true
65 break
66 }
67 // Getting here means that this thread/goroutine will wait for the updated resource
68 } else if er.expiration.Add(-window).Before(now) {
69 // The resource is valid but is expiring within the time window
70 if !er.acquiring && er.lastAttempt.Add(backoff).Before(now) {
71 // If another thread/goroutine is not acquiring/renewing the resource, and none has attempted
72 // to do so within the last 30 seconds, this thread/goroutine will do it
73 er.acquiring, acquire = true, true
74 break
75 }
76 // This thread/goroutine will use the existing resource value while another updates it
77 resource = er.resource
78 break
79 } else {
80 // The resource is not close to expiring, this thread/goroutine should use its current value
81 resource = er.resource
82 break
83 }
84 // If we get here, wait for the new resource value to be acquired/updated
85 er.cond.Wait()
86 }
87 er.cond.L.Unlock() // Release the lock so no threads/goroutines are blocked
88
89 var err error
90 if acquire {
91 // This thread/goroutine has been selected to acquire/update the resource
92 var expiration time.Time
93 var newValue TResource
94 er.lastAttempt = now
95 newValue, expiration, err = er.acquireResource(state)
96
97 // Atomically, update the shared resource's new value & expiration.
98 er.cond.L.Lock()
99 if err == nil {
100 // Update resource & expiration, return the new value
101 resource = newValue
102 er.resource, er.expiration = resource, expiration
103 } else if !expired {
104 // An eager update failed. Discard the error and return the current--still valid--resource value
105 err = nil
106 }
107 er.acquiring = false // Indicate that no thread/goroutine is currently acquiring the resource
108
109 // Wake up any waiting threads/goroutines since there is a resource they can ALL use
110 er.cond.L.Unlock()
111 er.cond.Broadcast()
112 }
113 return resource, err // Return the resource this thread/goroutine can use
114}
115
116// Expire marks the resource as expired, ensuring it's refreshed on the next call to Get().
117func (er *Resource[TResource, TState]) Expire() {
118 er.cond.L.Lock()
119 defer er.cond.L.Unlock()
120
121 // Reset the expiration as if we never got this resource to begin with
122 er.expiration = time.Time{}
123}