1package lock
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "time"
9
10 "github.com/sethvargo/go-retry"
11)
12
13// NewPostgresSessionLocker returns a SessionLocker that utilizes PostgreSQL's exclusive
14// session-level advisory lock mechanism.
15//
16// This function creates a SessionLocker that can be used to acquire and release a lock for
17// synchronization purposes. The lock acquisition is retried until it is successfully acquired or
18// until the failure threshold is reached. The default lock duration is set to 5 minutes, and the
19// default unlock duration is set to 1 minute.
20//
21// If you have long running migrations, you may want to increase the lock duration.
22//
23// See [SessionLockerOption] for options that can be used to configure the SessionLocker.
24func NewPostgresSessionLocker(opts ...SessionLockerOption) (SessionLocker, error) {
25 cfg := sessionLockerConfig{
26 lockID: DefaultLockID,
27 lockProbe: probe{
28 intervalDuration: 5 * time.Second,
29 failureThreshold: 60,
30 },
31 unlockProbe: probe{
32 intervalDuration: 2 * time.Second,
33 failureThreshold: 30,
34 },
35 }
36 for _, opt := range opts {
37 if err := opt.apply(&cfg); err != nil {
38 return nil, err
39 }
40 }
41 return &postgresSessionLocker{
42 lockID: cfg.lockID,
43 retryLock: retry.WithMaxRetries(
44 cfg.lockProbe.failureThreshold,
45 retry.NewConstant(cfg.lockProbe.intervalDuration),
46 ),
47 retryUnlock: retry.WithMaxRetries(
48 cfg.unlockProbe.failureThreshold,
49 retry.NewConstant(cfg.unlockProbe.intervalDuration),
50 ),
51 }, nil
52}
53
54type postgresSessionLocker struct {
55 lockID int64
56 retryLock retry.Backoff
57 retryUnlock retry.Backoff
58}
59
60var _ SessionLocker = (*postgresSessionLocker)(nil)
61
62func (l *postgresSessionLocker) SessionLock(ctx context.Context, conn *sql.Conn) error {
63 return retry.Do(ctx, l.retryLock, func(ctx context.Context) error {
64 row := conn.QueryRowContext(ctx, "SELECT pg_try_advisory_lock($1)", l.lockID)
65 var locked bool
66 if err := row.Scan(&locked); err != nil {
67 return fmt.Errorf("failed to execute pg_try_advisory_lock: %w", err)
68 }
69 if locked {
70 // A session-level advisory lock was acquired.
71 return nil
72 }
73 // A session-level advisory lock could not be acquired. This is likely because another
74 // process has already acquired the lock. We will continue retrying until the lock is
75 // acquired or the maximum number of retries is reached.
76 return retry.RetryableError(errors.New("failed to acquire lock"))
77 })
78}
79
80func (l *postgresSessionLocker) SessionUnlock(ctx context.Context, conn *sql.Conn) error {
81 return retry.Do(ctx, l.retryUnlock, func(ctx context.Context) error {
82 var unlocked bool
83 row := conn.QueryRowContext(ctx, "SELECT pg_advisory_unlock($1)", l.lockID)
84 if err := row.Scan(&unlocked); err != nil {
85 return fmt.Errorf("failed to execute pg_advisory_unlock: %w", err)
86 }
87 if unlocked {
88 // A session-level advisory lock was released.
89 return nil
90 }
91 /*
92 docs(md): provide users with some documentation on how they can unlock the session
93 manually.
94
95 This is probably not an issue for 99.99% of users since pg_advisory_unlock_all() will
96 release all session level advisory locks held by the current session. It is implicitly
97 invoked at session end, even if the client disconnects ungracefully.
98
99 Here is output from a session that has a lock held:
100
101 SELECT pid,granted,((classid::bigint<<32)|objid::bigint)AS goose_lock_id FROM pg_locks
102 WHERE locktype='advisory';
103
104 | pid | granted | goose_lock_id |
105 |-----|---------|---------------------|
106 | 191 | t | 5887940537704921958 |
107
108 A forceful way to unlock the session is to terminate the backend with SIGTERM:
109
110 SELECT pg_terminate_backend(191);
111
112 Subsequent commands on the same connection will fail with:
113
114 Query 1 ERROR: FATAL: terminating connection due to administrator command
115 */
116 return retry.RetryableError(errors.New("failed to unlock session"))
117 })
118}