resource.go

  1//go:build go1.18
  2// +build go1.18
  3
  4// Copyright (c) Microsoft Corporation. All rights reserved.
  5// Licensed under the MIT License.
  6
  7package temporal
  8
  9import (
 10	"sync"
 11	"time"
 12)
 13
 14// AcquireResource abstracts a method for refreshing a temporal resource.
 15type AcquireResource[TResource, TState any] func(state TState) (newResource TResource, newExpiration time.Time, err error)
 16
 17// Resource is a temporal resource (usually a credential) that requires periodic refreshing.
 18type Resource[TResource, TState any] struct {
 19	// cond is used to synchronize access to the shared resource embodied by the remaining fields
 20	cond *sync.Cond
 21
 22	// acquiring indicates that some thread/goroutine is in the process of acquiring/updating the resource
 23	acquiring bool
 24
 25	// resource contains the value of the shared resource
 26	resource TResource
 27
 28	// expiration indicates when the shared resource expires; it is 0 if the resource was never acquired
 29	expiration time.Time
 30
 31	// lastAttempt indicates when a thread/goroutine last attempted to acquire/update the resource
 32	lastAttempt time.Time
 33
 34	// acquireResource is the callback function that actually acquires the resource
 35	acquireResource AcquireResource[TResource, TState]
 36}
 37
 38// NewResource creates a new Resource that uses the specified AcquireResource for refreshing.
 39func NewResource[TResource, TState any](ar AcquireResource[TResource, TState]) *Resource[TResource, TState] {
 40	return &Resource[TResource, TState]{cond: sync.NewCond(&sync.Mutex{}), acquireResource: ar}
 41}
 42
 43// Get returns the underlying resource.
 44// If the resource is fresh, no refresh is performed.
 45func (er *Resource[TResource, TState]) Get(state TState) (TResource, error) {
 46	// If the resource is expiring within this time window, update it eagerly.
 47	// This allows other threads/goroutines to keep running by using the not-yet-expired
 48	// resource value while one thread/goroutine updates the resource.
 49	const window = 5 * time.Minute   // This example updates the resource 5 minutes prior to expiration
 50	const backoff = 30 * time.Second // Minimum wait time between eager update attempts
 51
 52	now, acquire, expired := time.Now(), false, false
 53
 54	// acquire exclusive lock
 55	er.cond.L.Lock()
 56	resource := er.resource
 57
 58	for {
 59		expired = er.expiration.IsZero() || er.expiration.Before(now)
 60		if expired {
 61			// The resource was never acquired or has expired
 62			if !er.acquiring {
 63				// If another thread/goroutine is not acquiring/updating the resource, this thread/goroutine will do it
 64				er.acquiring, acquire = true, true
 65				break
 66			}
 67			// Getting here means that this thread/goroutine will wait for the updated resource
 68		} else if er.expiration.Add(-window).Before(now) {
 69			// The resource is valid but is expiring within the time window
 70			if !er.acquiring && er.lastAttempt.Add(backoff).Before(now) {
 71				// If another thread/goroutine is not acquiring/renewing the resource, and none has attempted
 72				// to do so within the last 30 seconds, this thread/goroutine will do it
 73				er.acquiring, acquire = true, true
 74				break
 75			}
 76			// This thread/goroutine will use the existing resource value while another updates it
 77			resource = er.resource
 78			break
 79		} else {
 80			// The resource is not close to expiring, this thread/goroutine should use its current value
 81			resource = er.resource
 82			break
 83		}
 84		// If we get here, wait for the new resource value to be acquired/updated
 85		er.cond.Wait()
 86	}
 87	er.cond.L.Unlock() // Release the lock so no threads/goroutines are blocked
 88
 89	var err error
 90	if acquire {
 91		// This thread/goroutine has been selected to acquire/update the resource
 92		var expiration time.Time
 93		var newValue TResource
 94		er.lastAttempt = now
 95		newValue, expiration, err = er.acquireResource(state)
 96
 97		// Atomically, update the shared resource's new value & expiration.
 98		er.cond.L.Lock()
 99		if err == nil {
100			// Update resource & expiration, return the new value
101			resource = newValue
102			er.resource, er.expiration = resource, expiration
103		} else if !expired {
104			// An eager update failed. Discard the error and return the current--still valid--resource value
105			err = nil
106		}
107		er.acquiring = false // Indicate that no thread/goroutine is currently acquiring the resource
108
109		// Wake up any waiting threads/goroutines since there is a resource they can ALL use
110		er.cond.L.Unlock()
111		er.cond.Broadcast()
112	}
113	return resource, err // Return the resource this thread/goroutine can use
114}
115
116// Expire marks the resource as expired, ensuring it's refreshed on the next call to Get().
117func (er *Resource[TResource, TState]) Expire() {
118	er.cond.L.Lock()
119	defer er.cond.L.Unlock()
120
121	// Reset the expiration as if we never got this resource to begin with
122	er.expiration = time.Time{}
123}