postgres.go

  1package lock
  2
  3import (
  4	"context"
  5	"database/sql"
  6	"errors"
  7	"fmt"
  8	"time"
  9
 10	"github.com/sethvargo/go-retry"
 11)
 12
 13// NewPostgresSessionLocker returns a SessionLocker that utilizes PostgreSQL's exclusive
 14// session-level advisory lock mechanism.
 15//
 16// This function creates a SessionLocker that can be used to acquire and release a lock for
 17// synchronization purposes. The lock acquisition is retried until it is successfully acquired or
 18// until the failure threshold is reached. The default lock duration is set to 5 minutes, and the
 19// default unlock duration is set to 1 minute.
 20//
 21// If you have long running migrations, you may want to increase the lock duration.
 22//
 23// See [SessionLockerOption] for options that can be used to configure the SessionLocker.
 24func NewPostgresSessionLocker(opts ...SessionLockerOption) (SessionLocker, error) {
 25	cfg := sessionLockerConfig{
 26		lockID: DefaultLockID,
 27		lockProbe: probe{
 28			intervalDuration: 5 * time.Second,
 29			failureThreshold: 60,
 30		},
 31		unlockProbe: probe{
 32			intervalDuration: 2 * time.Second,
 33			failureThreshold: 30,
 34		},
 35	}
 36	for _, opt := range opts {
 37		if err := opt.apply(&cfg); err != nil {
 38			return nil, err
 39		}
 40	}
 41	return &postgresSessionLocker{
 42		lockID: cfg.lockID,
 43		retryLock: retry.WithMaxRetries(
 44			cfg.lockProbe.failureThreshold,
 45			retry.NewConstant(cfg.lockProbe.intervalDuration),
 46		),
 47		retryUnlock: retry.WithMaxRetries(
 48			cfg.unlockProbe.failureThreshold,
 49			retry.NewConstant(cfg.unlockProbe.intervalDuration),
 50		),
 51	}, nil
 52}
 53
 54type postgresSessionLocker struct {
 55	lockID      int64
 56	retryLock   retry.Backoff
 57	retryUnlock retry.Backoff
 58}
 59
 60var _ SessionLocker = (*postgresSessionLocker)(nil)
 61
 62func (l *postgresSessionLocker) SessionLock(ctx context.Context, conn *sql.Conn) error {
 63	return retry.Do(ctx, l.retryLock, func(ctx context.Context) error {
 64		row := conn.QueryRowContext(ctx, "SELECT pg_try_advisory_lock($1)", l.lockID)
 65		var locked bool
 66		if err := row.Scan(&locked); err != nil {
 67			return fmt.Errorf("failed to execute pg_try_advisory_lock: %w", err)
 68		}
 69		if locked {
 70			// A session-level advisory lock was acquired.
 71			return nil
 72		}
 73		// A session-level advisory lock could not be acquired. This is likely because another
 74		// process has already acquired the lock. We will continue retrying until the lock is
 75		// acquired or the maximum number of retries is reached.
 76		return retry.RetryableError(errors.New("failed to acquire lock"))
 77	})
 78}
 79
 80func (l *postgresSessionLocker) SessionUnlock(ctx context.Context, conn *sql.Conn) error {
 81	return retry.Do(ctx, l.retryUnlock, func(ctx context.Context) error {
 82		var unlocked bool
 83		row := conn.QueryRowContext(ctx, "SELECT pg_advisory_unlock($1)", l.lockID)
 84		if err := row.Scan(&unlocked); err != nil {
 85			return fmt.Errorf("failed to execute pg_advisory_unlock: %w", err)
 86		}
 87		if unlocked {
 88			// A session-level advisory lock was released.
 89			return nil
 90		}
 91		/*
 92			docs(md): provide users with some documentation on how they can unlock the session
 93			manually.
 94
 95			This is probably not an issue for 99.99% of users since pg_advisory_unlock_all() will
 96			release all session level advisory locks held by the current session. It is implicitly
 97			invoked at session end, even if the client disconnects ungracefully.
 98
 99			Here is output from a session that has a lock held:
100
101			SELECT pid,granted,((classid::bigint<<32)|objid::bigint)AS goose_lock_id FROM pg_locks
102			WHERE locktype='advisory';
103
104			| pid | granted | goose_lock_id       |
105			|-----|---------|---------------------|
106			| 191 | t       | 5887940537704921958 |
107
108			A forceful way to unlock the session is to terminate the backend with SIGTERM:
109
110			SELECT pg_terminate_backend(191);
111
112			Subsequent commands on the same connection will fail with:
113
114			Query 1 ERROR: FATAL: terminating connection due to administrator command
115		*/
116		return retry.RetryableError(errors.New("failed to unlock session"))
117	})
118}