Add vendor directory and update vendorHash to null

This commit is contained in:
soup 2026-01-17 22:29:54 -05:00
parent 523831cb8d
commit 1e5424c844
Signed by: soup
SSH key fingerprint: SHA256:GYxje8eQkJ6HZKzVWDdyOUF1TyDiprruGhE0Ym8qYDY
778 changed files with 407919 additions and 1 deletions

View file

@ -0,0 +1,282 @@
package store
import (
"context"
"database/sql"
"errors"
"fmt"
"strings"
"time"
"go.uber.org/multierr"
)
// NewPostgres creates a new Postgres-based [LockStore].
//
// tableName may be bare ("goose_lock") or schema-qualified
// ("public.goose_lock") and must be non-empty.
func NewPostgres(tableName string) (LockStore, error) {
	if tableName == "" {
		return nil, errors.New("table name must not be empty")
	}
	store := &postgresStore{tableName: tableName}
	return store, nil
}
// Compile-time assertion that postgresStore satisfies [LockStore].
var _ LockStore = (*postgresStore)(nil)

// postgresStore is a PostgreSQL-backed [LockStore] that keeps lock state in a
// single table of lock rows.
type postgresStore struct {
	// tableName is the (optionally schema-qualified) lock table name,
	// e.g. "goose_lock" or "public.goose_lock".
	tableName string
}
// TableExists reports whether the lock table exists. When s.tableName is
// schema-qualified ("schema.table") the check is restricted to that schema;
// otherwise it checks the current schema.
//
// The schema and table names are passed as bind parameters rather than
// interpolated into the SQL text: the previous fmt.Sprintf into quoted
// literals broke on names containing a single quote and was an injection
// hazard.
func (s *postgresStore) TableExists(
	ctx context.Context,
	db *sql.DB,
) (bool, error) {
	schemaName, tableName := parseTableIdentifier(s.tableName)
	var (
		query string
		args  []any
	)
	if schemaName != "" {
		query = `SELECT EXISTS ( SELECT 1 FROM pg_tables WHERE schemaname = $1 AND tablename = $2 )`
		args = []any{schemaName, tableName}
	} else {
		query = `SELECT EXISTS ( SELECT 1 FROM pg_tables WHERE (current_schema() IS NULL OR schemaname = current_schema()) AND tablename = $1 )`
		args = []any{tableName}
	}
	var exists bool
	if err := db.QueryRowContext(ctx, query, args...).Scan(
		&exists,
	); err != nil {
		return false, fmt.Errorf("table exists: %w", err)
	}
	return exists, nil
}
// CreateLockTable creates the lock table if it does not already exist. The
// operation is idempotent: it returns nil when the table exists, or when a
// concurrent process creates it between our existence check and the CREATE.
//
// NOTE: the table name is interpolated with fmt.Sprintf because SQL
// identifiers cannot be bound as query parameters.
func (s *postgresStore) CreateLockTable(
	ctx context.Context,
	db *sql.DB,
) error {
	exists, err := s.TableExists(ctx, db)
	if err != nil {
		return fmt.Errorf("check lock table existence: %w", err)
	}
	if exists {
		return nil
	}
	query := fmt.Sprintf(`CREATE TABLE %s (
lock_id bigint NOT NULL PRIMARY KEY,
locked boolean NOT NULL DEFAULT false,
locked_at timestamptz NULL,
locked_by text NULL,
lease_expires_at timestamptz NULL,
updated_at timestamptz NULL
)`, s.tableName)
	if _, err := db.ExecContext(ctx, query); err != nil {
		// Double-check if another process created it concurrently
		if exists, checkErr := s.TableExists(ctx, db); checkErr == nil && exists {
			// Another process created it, that's fine!
			return nil
		}
		return fmt.Errorf("create lock table %q: %w", s.tableName, err)
	}
	return nil
}
// AcquireLock attempts to acquire the lock identified by lockID on behalf of
// lockedBy for leaseDuration, using a single atomic upsert:
//
//   - INSERT wins when no row exists for lockID;
//   - on conflict, the DO UPDATE fires only when the row is unlocked or its
//     lease has expired.
//
// When neither branch applies the statement returns no rows (sql.ErrNoRows),
// meaning another live instance holds the lock.
func (s *postgresStore) AcquireLock(
	ctx context.Context,
	db *sql.DB,
	lockID int64,
	lockedBy string,
	leaseDuration time.Duration,
) (*AcquireLockResult, error) {
	query := fmt.Sprintf(`INSERT INTO %s (lock_id, locked, locked_at, locked_by, lease_expires_at, updated_at)
VALUES ($1, true, now(), $2, now() + $3::interval, now())
ON CONFLICT (lock_id) DO UPDATE SET
locked = true,
locked_at = now(),
locked_by = $2,
lease_expires_at = now() + $3::interval,
updated_at = now()
WHERE %s.locked = false OR %s.lease_expires_at < now()
RETURNING locked_by, lease_expires_at`, s.tableName, s.tableName, s.tableName)
	// Convert duration to PostgreSQL interval format
	leaseDurationStr := formatDurationAsInterval(leaseDuration)
	var returnedLockedBy string
	var leaseExpiresAt time.Time
	err := db.QueryRowContext(ctx, query,
		lockID,
		lockedBy,
		leaseDurationStr,
	).Scan(
		&returnedLockedBy,
		&leaseExpiresAt,
	)
	if err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			// TODO(mf): should we return a special error type here?
			return nil, fmt.Errorf("acquire lock %d: already held by another instance", lockID)
		}
		return nil, fmt.Errorf("acquire lock %d: %w", lockID, err)
	}
	// Verify we got the lock by checking the returned locked_by matches our instance ID
	if returnedLockedBy != lockedBy {
		return nil, fmt.Errorf("acquire lock %d: acquired by %s instead of %s", lockID, returnedLockedBy, lockedBy)
	}
	return &AcquireLockResult{
		LockedBy:       returnedLockedBy,
		LeaseExpiresAt: leaseExpiresAt,
	}, nil
}
// ReleaseLock releases the lock identified by lockID, but only when it is
// currently held by lockedBy. The UPDATE's WHERE clause enforces ownership,
// so a no-row result means this instance does not hold the lock.
func (s *postgresStore) ReleaseLock(
	ctx context.Context,
	db *sql.DB,
	lockID int64,
	lockedBy string,
) (*ReleaseLockResult, error) {
	// Clear the lock row only when the current instance owns it.
	query := fmt.Sprintf(`UPDATE %s SET
locked = false,
locked_at = NULL,
locked_by = NULL,
lease_expires_at = NULL,
updated_at = now()
WHERE lock_id = $1 AND locked_by = $2
RETURNING lock_id`, s.tableName)
	var releasedID int64
	scanErr := db.QueryRowContext(ctx, query, lockID, lockedBy).Scan(&releasedID)
	switch {
	case errors.Is(scanErr, sql.ErrNoRows):
		// TODO(mf): should we return a special error type here?
		return nil, fmt.Errorf("release lock %d: not held by this instance", lockID)
	case scanErr != nil:
		return nil, fmt.Errorf("release lock %d: %w", lockID, scanErr)
	case releasedID != lockID:
		// Sanity check: the RETURNING clause should echo the requested ID.
		return nil, fmt.Errorf("release lock %d: returned lock ID %d does not match", lockID, releasedID)
	}
	return &ReleaseLockResult{LockID: releasedID}, nil
}
// UpdateLease extends the lease on lockID (heartbeat), but only when the lock
// is currently locked and held by lockedBy. It returns the new lease expiry
// on success; a no-row result means this instance no longer owns the lock.
func (s *postgresStore) UpdateLease(
	ctx context.Context,
	db *sql.DB,
	lockID int64,
	lockedBy string,
	leaseDuration time.Duration,
) (*UpdateLeaseResult, error) {
	// Update lease expiration time for heartbeat, only if we own the lock
	query := fmt.Sprintf(`UPDATE %s SET
lease_expires_at = now() + $1::interval,
updated_at = now()
WHERE lock_id = $2 AND locked_by = $3 AND locked = true
RETURNING lease_expires_at`, s.tableName)
	// Convert duration to PostgreSQL interval format
	intervalStr := formatDurationAsInterval(leaseDuration)
	var leaseExpiresAt time.Time
	err := db.QueryRowContext(ctx, query,
		intervalStr,
		lockID,
		lockedBy,
	).Scan(
		&leaseExpiresAt,
	)
	if err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			// Error wording made consistent with AcquireLock/ReleaseLock
			// (no redundant "failed to" prefix).
			return nil, fmt.Errorf("update lease for lock %d: not held by this instance", lockID)
		}
		return nil, fmt.Errorf("update lease for lock %d: %w", lockID, err)
	}
	return &UpdateLeaseResult{
		LeaseExpiresAt: leaseExpiresAt,
	}, nil
}
// CheckLockStatus returns the current state of the lock row for lockID. The
// nullable columns (locked_by, lease_expires_at, updated_at) scan into the
// pointer fields of [LockStatus]. A missing row is reported as an error.
func (s *postgresStore) CheckLockStatus(
	ctx context.Context,
	db *sql.DB,
	lockID int64,
) (*LockStatus, error) {
	query := fmt.Sprintf(`SELECT locked, locked_by, lease_expires_at, updated_at FROM %s WHERE lock_id = $1`, s.tableName)
	var status LockStatus
	err := db.QueryRowContext(ctx, query,
		lockID,
	).Scan(
		&status.Locked,
		&status.LockedBy,
		&status.LeaseExpiresAt,
		&status.UpdatedAt,
	)
	if err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			return nil, fmt.Errorf("lock %d not found", lockID)
		}
		return nil, fmt.Errorf("check lock status for %d: %w", lockID, err)
	}
	return &status, nil
}
// CleanupStaleLocks clears every lock whose lease has expired (using the
// database server's clock) and returns the IDs that were cleaned. The named
// retErr return lets the deferred rows.Close error be merged into the result
// via multierr.
func (s *postgresStore) CleanupStaleLocks(ctx context.Context, db *sql.DB) (_ []int64, retErr error) {
	query := fmt.Sprintf(`UPDATE %s SET
locked = false,
locked_at = NULL,
locked_by = NULL,
lease_expires_at = NULL,
updated_at = now()
WHERE locked = true AND lease_expires_at < now()
RETURNING lock_id`, s.tableName)
	rows, err := db.QueryContext(ctx, query)
	if err != nil {
		return nil, fmt.Errorf("cleanup stale locks: %w", err)
	}
	defer func() {
		// Surface Close errors without masking an earlier return error.
		retErr = multierr.Append(retErr, rows.Close())
	}()
	var cleanedLocks []int64
	for rows.Next() {
		var lockID int64
		if err := rows.Scan(&lockID); err != nil {
			return nil, fmt.Errorf("scan cleaned lock ID: %w", err)
		}
		cleanedLocks = append(cleanedLocks, lockID)
	}
	// Check for errors encountered during iteration (distinct from Scan errors).
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterate over cleaned locks: %w", err)
	}
	return cleanedLocks, nil
}
// formatDurationAsInterval converts a time.Duration to a PostgreSQL interval
// literal, e.g. "30000 milliseconds".
//
// Millisecond precision is used because the previous int(d.Seconds())
// truncated any sub-second duration to "0 seconds", producing a lease that
// expires immediately.
func formatDurationAsInterval(d time.Duration) string {
	return fmt.Sprintf("%d milliseconds", d.Milliseconds())
}
// parseTableIdentifier splits a table identifier of the form "schema.table"
// at the first dot. When no dot is present, schema is empty and table is the
// whole input; anything after the first dot (including further dots) is the
// table part.
func parseTableIdentifier(name string) (schema, table string) {
	before, after, ok := strings.Cut(name, ".")
	if ok {
		return before, after
	}
	return "", name
}

View file

@ -0,0 +1,51 @@
package store
import (
"context"
"database/sql"
"time"
)
// LockStore defines the interface for storing and managing database locks.
// All methods take the *sql.DB to operate on; implementations hold no
// connection state of their own.
type LockStore interface {
	// CreateLockTable creates the lock table if it doesn't exist. Implementations should ensure
	// that this operation is idempotent.
	CreateLockTable(ctx context.Context, db *sql.DB) error
	// TableExists checks if the lock table exists.
	TableExists(ctx context.Context, db *sql.DB) (bool, error)
	// AcquireLock attempts to acquire a lock for the given lockID, recording lockedBy as the
	// owner and setting a lease of leaseDuration.
	AcquireLock(ctx context.Context, db *sql.DB, lockID int64, lockedBy string, leaseDuration time.Duration) (*AcquireLockResult, error)
	// ReleaseLock releases a lock held by the current instance (identified by lockedBy).
	ReleaseLock(ctx context.Context, db *sql.DB, lockID int64, lockedBy string) (*ReleaseLockResult, error)
	// UpdateLease updates the lease expiration time for a lock (heartbeat).
	UpdateLease(ctx context.Context, db *sql.DB, lockID int64, lockedBy string, leaseDuration time.Duration) (*UpdateLeaseResult, error)
	// CheckLockStatus checks the current status of a lock.
	CheckLockStatus(ctx context.Context, db *sql.DB, lockID int64) (*LockStatus, error)
	// CleanupStaleLocks removes any locks that have expired using server time. Returns the list of
	// lock IDs that were cleaned up, if any.
	CleanupStaleLocks(ctx context.Context, db *sql.DB) ([]int64, error)
}
// LockStatus represents the current status of a lock. Pointer fields mirror
// nullable database columns: nil means the column is NULL (lock never held or
// already released).
type LockStatus struct {
	// Locked reports whether the lock row is currently marked as held.
	Locked bool
	// LockedBy is the instance identifier of the holder, or nil.
	LockedBy *string
	// LeaseExpiresAt is when the current lease expires, or nil.
	LeaseExpiresAt *time.Time
	// UpdatedAt is the last time the row was modified, or nil.
	UpdatedAt *time.Time
}

// AcquireLockResult contains the result of a lock acquisition attempt.
type AcquireLockResult struct {
	// LockedBy echoes the acquiring instance's identifier.
	LockedBy string
	// LeaseExpiresAt is when the newly granted lease expires.
	LeaseExpiresAt time.Time
}

// ReleaseLockResult contains the result of a lock release.
type ReleaseLockResult struct {
	// LockID is the ID of the lock that was released.
	LockID int64
}

// UpdateLeaseResult contains the result of a lease update.
type UpdateLeaseResult struct {
	// LeaseExpiresAt is the new lease expiration time after the heartbeat.
	LeaseExpiresAt time.Time
}

View file

@ -0,0 +1,28 @@
package table
import (
"log/slog"
"time"
)
// Config holds configuration for table locker.
type Config struct {
	// TableName is the name of the lock table.
	TableName string
	// LockID identifies the lock row within the table.
	LockID int64
	// LeaseDuration is how long an acquired lock stays valid without renewal.
	LeaseDuration time.Duration
	// HeartbeatInterval is how often the lease is renewed while the lock is held.
	HeartbeatInterval time.Duration
	// LockTimeout controls the retry interval and attempt count when acquiring.
	LockTimeout ProbeConfig
	// UnlockTimeout controls the retry interval and attempt count when releasing.
	UnlockTimeout ProbeConfig
	// Optional logger for lock operations
	Logger *slog.Logger
	// Optional custom retry policy for database errors
	RetryPolicy RetryPolicyFunc
}

// ProbeConfig holds retry configuration.
type ProbeConfig struct {
	// IntervalDuration is how long to wait between attempts.
	IntervalDuration time.Duration
	// FailureThreshold is the maximum number of retries.
	FailureThreshold uint64
}

View file

@ -0,0 +1,237 @@
package table
import (
"cmp"
"context"
"crypto/rand"
"database/sql"
"encoding/hex"
"fmt"
"log/slog"
"os"
"sync"
"time"
"github.com/pressly/goose/v3/lock/internal/store"
"github.com/sethvargo/go-retry"
)
// RetryPolicyFunc inspects an error and returns whether the caller should retry the operation. This
// allows for database-specific error handling without hardcoding driver-specific logic.
type RetryPolicyFunc func(err error) bool

// Locker implements table-based locking for databases. This implementation is safe for concurrent
// use by multiple goroutines.
type Locker struct {
	store             store.LockStore
	tableName         string
	lockID            int64
	// instanceID uniquely identifies this process ("hostname-pid-hex").
	instanceID        string
	leaseDuration     time.Duration
	heartbeatInterval time.Duration
	retryLock         retry.Backoff
	retryUnlock       retry.Backoff
	logger            *slog.Logger
	retryPolicy       RetryPolicyFunc
	// Application-level coordination
	//
	// mu is acquired in Lock() and held until the matching Unlock().
	mu sync.Mutex
	// Heartbeat management
	heartbeatCancel context.CancelFunc
	heartbeatDone   chan struct{}
}
// New creates a new table-based locker. A unique instance identifier is
// generated from hostname, PID, and a random suffix; if cfg.Logger is nil, an
// error-level stderr logger is used.
func New(lockStore store.LockStore, cfg Config) *Locker {
	// Generate instance identifier
	hostname, _ := os.Hostname() // best-effort; placeholder substituted below on failure
	hostname = cmp.Or(hostname, "unknown-hostname")
	instanceID := fmt.Sprintf("%s-%d-%s", hostname, os.Getpid(), randomHex(4))
	logger := cfg.Logger
	if logger == nil {
		logger = slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
	}
	return &Locker{
		store:             lockStore,
		tableName:         cfg.TableName,
		lockID:            cfg.LockID,
		instanceID:        instanceID,
		leaseDuration:     cfg.LeaseDuration,
		heartbeatInterval: cfg.HeartbeatInterval,
		logger:            logger,
		retryPolicy:       cfg.RetryPolicy,
		retryLock: retry.WithMaxRetries(
			cfg.LockTimeout.FailureThreshold,
			// Add +/- 25% jitter to reduce thundering herd
			retry.WithJitterPercent(25, retry.NewConstant(cfg.LockTimeout.IntervalDuration)),
		),
		retryUnlock: retry.WithMaxRetries(
			cfg.UnlockTimeout.FailureThreshold,
			// Add +/- 25% jitter to reduce thundering herd
			retry.WithJitterPercent(25, retry.NewConstant(cfg.UnlockTimeout.IntervalDuration)),
		),
	}
}
// Lock acquires the database lock. This method is safe for concurrent use - the mutex is held until
// Unlock() is called. Only one goroutine can hold the lock at a time across the entire lifecycle.
func (l *Locker) Lock(ctx context.Context, db *sql.DB) error {
	l.mu.Lock()
	// NOTE: mutex is NOT defer unlocked here, it remains held until Unlock() is called explicitly
	// or a specific error occurs below!
	// Ensure the lock table exists
	if err := l.store.CreateLockTable(ctx, db); err != nil {
		l.mu.Unlock()
		return fmt.Errorf("ensure lock table exists: %w", err)
	}
	err := retry.Do(ctx, l.retryLock, func(ctx context.Context) error {
		_, err := l.store.AcquireLock(ctx, db, l.lockID, l.instanceID, l.leaseDuration)
		if err != nil {
			// Clean up any stale locks before retrying
			if _, cleanupErr := l.store.CleanupStaleLocks(ctx, db); cleanupErr != nil {
				l.logger.WarnContext(ctx, "failed to cleanup stale locks",
					// Key fixed from "lock_table" to "lock_id": the value is
					// the lock ID, and every other log site uses "lock_id".
					slog.Int64("lock_id", l.lockID),
					slog.Any("error", cleanupErr),
				)
				// Continue with retry, cleanup failure shouldn't block acquisition attempts
			}
			if l.shouldRetry(err) {
				return retry.RetryableError(fmt.Errorf("acquire retryable lock: %w", err))
			}
			return fmt.Errorf("acquire lock: %w", err)
		}
		return nil
	})
	if err != nil {
		l.mu.Unlock()
		l.logger.WarnContext(ctx, "failed to acquire lock after retries",
			slog.Int64("lock_id", l.lockID),
			slog.String("instance_id", l.instanceID),
			slog.Any("error", err),
		)
		return fmt.Errorf("acquire lock %d after retries: %w", l.lockID, err)
	}
	l.logger.DebugContext(ctx, "successfully acquired lock",
		slog.Int64("lock_id", l.lockID),
		slog.String("instance_id", l.instanceID),
		slog.Duration("lease_duration", l.leaseDuration),
	)
	// Start heartbeat to maintain the lease
	l.startHeartbeat(ctx, db)
	// Mutex remains held - will be released in Unlock()
	return nil
}
// Unlock releases the database lock. This method must be called exactly once after a successful
// Lock() call.
func (l *Locker) Unlock(ctx context.Context, db *sql.DB) error {
	// NOTE: The mutex was acquired in Lock() and is still held
	defer l.mu.Unlock()
	// Use a context that can't be cancelled to ensure we always attempt to unlock even if the
	// caller's context is cancelled. The caller can control the retry behavior via the configured
	// timeouts.
	ctx = context.WithoutCancel(ctx)
	// Stop heartbeat first, so it cannot renew the lease while we release it.
	l.stopHeartbeat()
	err := retry.Do(ctx, l.retryUnlock, func(ctx context.Context) error {
		_, err := l.store.ReleaseLock(ctx, db, l.lockID, l.instanceID)
		if err != nil {
			if l.shouldRetry(err) {
				return retry.RetryableError(fmt.Errorf("release retryable lock: %w", err))
			}
			return fmt.Errorf("release lock: %w", err)
		}
		return nil
	})
	if err != nil {
		l.logger.WarnContext(ctx, "failed to release lock",
			slog.Int64("lock_id", l.lockID),
			slog.String("instance_id", l.instanceID),
			slog.Any("error", err),
		)
		return err
	}
	l.logger.DebugContext(ctx, "successfully released lock",
		slog.Int64("lock_id", l.lockID),
		slog.String("instance_id", l.instanceID),
	)
	return nil
}
// startHeartbeat launches the background goroutine that periodically renews
// the lock lease (called from within Lock with the mutex held). Any previous
// heartbeat is stopped first.
func (l *Locker) startHeartbeat(parentCtx context.Context, db *sql.DB) {
	// If there's already a heartbeat running, stop it first
	l.stopHeartbeat()
	// Derive a cancellable context and a completion channel for this heartbeat.
	ctx, cancel := context.WithCancel(parentCtx)
	done := make(chan struct{})
	l.heartbeatCancel = cancel
	l.heartbeatDone = done
	go func() {
		// Capture the channel locally so the goroutine never reads the struct
		// field after stopHeartbeat replaces it.
		defer close(done)
		ticker := time.NewTicker(l.heartbeatInterval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
			}
			result, err := l.store.UpdateLease(ctx, db, l.lockID, l.instanceID, l.leaseDuration)
			if err != nil {
				// TODO(mf): should we add a retry policy here?
				l.logger.WarnContext(ctx, "heartbeat failed to update lease",
					slog.Int64("lock_id", l.lockID),
					slog.String("instance_id", l.instanceID),
					slog.Any("error", err),
				)
				continue
			}
			l.logger.DebugContext(ctx, "heartbeat updated lease",
				slog.Int64("lock_id", l.lockID),
				slog.String("instance_id", l.instanceID),
				slog.Time("lease_expires_at", result.LeaseExpiresAt),
			)
		}
	}()
}
// stopHeartbeat stops the heartbeat goroutine (called from within Unlock with the mutex held).
// It cancels the heartbeat context, blocks until the goroutine has exited, then clears the
// bookkeeping fields. Safe to call when no heartbeat is running.
func (l *Locker) stopHeartbeat() {
	if l.heartbeatCancel != nil {
		l.heartbeatCancel()
		// Wait for the goroutine to close the done channel before reusing the fields.
		<-l.heartbeatDone
		l.heartbeatCancel = nil
		l.heartbeatDone = nil
	}
}
// shouldRetry determines whether an error is retryable based on the configured retry policy. If no
// retry policy is configured, it defaults to always retrying.
func (l *Locker) shouldRetry(err error) bool {
	if l.retryPolicy != nil {
		return l.retryPolicy(err)
	}
	// No policy configured: treat every database error as transient.
	return true
}
// randomHex returns 2*n lowercase hex characters from crypto/rand. If the
// system random source fails, it falls back to a timestamp-derived value so an
// instance ID can still be produced.
func randomHex(n int) string {
	buf := make([]byte, n)
	_, err := rand.Read(buf)
	if err != nil {
		return fmt.Sprintf("%0*x", n*2, time.Now().UnixNano())
	}
	return hex.EncodeToString(buf)
}

32
vendor/github.com/pressly/goose/v3/lock/locker.go generated vendored Normal file
View file

@ -0,0 +1,32 @@
// Package lock defines the Locker interface and implements the locking logic.
package lock
import (
"context"
"database/sql"
"errors"
)
var (
	// ErrLockNotImplemented is returned when the database does not support locking.
	ErrLockNotImplemented = errors.New("lock not implemented")
	// ErrUnlockNotImplemented is returned when the database does not support unlocking.
	ErrUnlockNotImplemented = errors.New("unlock not implemented")
)

// SessionLocker is the interface to lock and unlock the database for the duration of a session. The
// session is defined as the duration of a single connection and both methods must be called on the
// same connection.
type SessionLocker interface {
	// SessionLock acquires the lock on the given connection.
	SessionLock(ctx context.Context, conn *sql.Conn) error
	// SessionUnlock releases the lock; it must use the same connection as SessionLock.
	SessionUnlock(ctx context.Context, conn *sql.Conn) error
}

// Locker is the interface to lock and unlock the database.
//
// Unlike [SessionLocker], the Lock and Unlock methods are called on a [*sql.DB] and do not require
// the same connection to be used for both methods.
type Locker interface {
	// Lock acquires the database lock.
	Lock(ctx context.Context, db *sql.DB) error
	// Unlock releases the database lock.
	Unlock(ctx context.Context, db *sql.DB) error
}

165
vendor/github.com/pressly/goose/v3/lock/postgres.go generated vendored Normal file
View file

@ -0,0 +1,165 @@
package lock
import (
"context"
"database/sql"
"errors"
"fmt"
"time"
"github.com/pressly/goose/v3/lock/internal/store"
"github.com/pressly/goose/v3/lock/internal/table"
"github.com/sethvargo/go-retry"
)
// NewPostgresTableLocker returns a Locker that uses PostgreSQL table-based locking. It manages a
// single lock row and keeps the lock alive automatically.
//
// Default behavior:
//
//   - Lease (30s): How long the lock is valid if heartbeat stops
//   - Heartbeat (5s): How often the lock gets refreshed to keep it alive
//   - If the process dies, others can take the lock after lease expires
//
// Defaults:
//
//	Table: "goose_lock"
//	Lock ID: 4097083626 (crc32 of "goose")
//	Lock retry: 5s intervals, 5min timeout
//	Unlock retry: 2s intervals, 1min timeout
//
// Lock and Unlock both retry on failure. Lock stays alive automatically until released. All
// defaults can be overridden with options.
func NewPostgresTableLocker(options ...TableLockerOption) (Locker, error) {
	// Start from the documented defaults; options below may override any field.
	config := table.Config{
		TableName:         DefaultLockTableName,
		LockID:            DefaultLockID,
		LeaseDuration:     30 * time.Second,
		HeartbeatInterval: 5 * time.Second,
		LockTimeout: table.ProbeConfig{
			IntervalDuration: 5 * time.Second,
			FailureThreshold: 60, // 5 minutes total
		},
		UnlockTimeout: table.ProbeConfig{
			IntervalDuration: 2 * time.Second,
			FailureThreshold: 30, // 1 minute total
		},
	}
	for _, opt := range options {
		if err := opt.apply(&config); err != nil {
			return nil, err
		}
	}
	lockStore, err := store.NewPostgres(config.TableName)
	if err != nil {
		return nil, fmt.Errorf("create lock store: %w", err)
	}
	return table.New(lockStore, config), nil
}
// NewPostgresSessionLocker returns a SessionLocker that utilizes PostgreSQL's exclusive
// session-level advisory lock mechanism.
//
// This function creates a SessionLocker that can be used to acquire and release a lock for
// synchronization purposes. The lock acquisition is retried until it is successfully acquired or
// until the failure threshold is reached. The default lock duration is set to 5 minutes, and the
// default unlock duration is set to 1 minute.
//
// If you have long running migrations, you may want to increase the lock duration.
//
// See [SessionLockerOption] for options that can be used to configure the SessionLocker.
func NewPostgresSessionLocker(opts ...SessionLockerOption) (SessionLocker, error) {
	// Defaults: lock retried every 5s up to 60 times (5min); unlock every 2s up to 30 times (1min).
	cfg := sessionLockerConfig{
		lockID: DefaultLockID,
		lockProbe: probe{
			intervalDuration: 5 * time.Second,
			failureThreshold: 60,
		},
		unlockProbe: probe{
			intervalDuration: 2 * time.Second,
			failureThreshold: 30,
		},
	}
	for _, opt := range opts {
		if err := opt.apply(&cfg); err != nil {
			return nil, err
		}
	}
	return &postgresSessionLocker{
		lockID: cfg.lockID,
		retryLock: retry.WithMaxRetries(
			cfg.lockProbe.failureThreshold,
			retry.NewConstant(cfg.lockProbe.intervalDuration),
		),
		retryUnlock: retry.WithMaxRetries(
			cfg.unlockProbe.failureThreshold,
			retry.NewConstant(cfg.unlockProbe.intervalDuration),
		),
	}, nil
}
// postgresSessionLocker implements [SessionLocker] using PostgreSQL
// session-level advisory locks (pg_try_advisory_lock / pg_advisory_unlock).
type postgresSessionLocker struct {
	// lockID is the advisory lock key passed to the pg_* functions.
	lockID int64
	// retryLock/retryUnlock define the backoff schedules for each operation.
	retryLock   retry.Backoff
	retryUnlock retry.Backoff
}

// Compile-time assertion that postgresSessionLocker satisfies [SessionLocker].
var _ SessionLocker = (*postgresSessionLocker)(nil)
// SessionLock acquires a session-level advisory lock on conn, retrying per
// the configured backoff until the lock is obtained or retries are exhausted.
func (l *postgresSessionLocker) SessionLock(ctx context.Context, conn *sql.Conn) error {
	return retry.Do(ctx, l.retryLock, func(ctx context.Context) error {
		var acquired bool
		err := conn.QueryRowContext(ctx, "SELECT pg_try_advisory_lock($1)", l.lockID).Scan(&acquired)
		if err != nil {
			return fmt.Errorf("failed to execute pg_try_advisory_lock: %w", err)
		}
		if !acquired {
			// Another session holds the advisory lock; keep retrying until it
			// is acquired or the maximum number of retries is reached.
			return retry.RetryableError(errors.New("failed to acquire lock"))
		}
		// A session-level advisory lock was acquired.
		return nil
	})
}
// SessionUnlock releases the session-level advisory lock held on conn,
// retrying per the configured backoff. pg_advisory_unlock returns false when
// this session does not hold the lock, which is surfaced as a retryable error.
func (l *postgresSessionLocker) SessionUnlock(ctx context.Context, conn *sql.Conn) error {
	return retry.Do(ctx, l.retryUnlock, func(ctx context.Context) error {
		var unlocked bool
		row := conn.QueryRowContext(ctx, "SELECT pg_advisory_unlock($1)", l.lockID)
		if err := row.Scan(&unlocked); err != nil {
			return fmt.Errorf("failed to execute pg_advisory_unlock: %w", err)
		}
		if unlocked {
			// A session-level advisory lock was released.
			return nil
		}
		/*
			docs(md): provide users with some documentation on how they can unlock the session
			manually.
			This is probably not an issue for 99.99% of users since pg_advisory_unlock_all() will
			release all session level advisory locks held by the current session. It is implicitly
			invoked at session end, even if the client disconnects ungracefully.
			Here is output from a session that has a lock held:
			SELECT pid, granted, ((classid::bigint << 32) | objid::bigint) AS goose_lock_id FROM
			pg_locks WHERE locktype = 'advisory';
			| pid | granted | goose_lock_id |
			|-----|---------|---------------------|
			| 191 | t | 4097083626 |
			A forceful way to unlock the session is to terminate the backend with SIGTERM:
			SELECT pg_terminate_backend(191);
			Subsequent commands on the same connection will fail with:
			Query 1 ERROR: FATAL: terminating connection due to administrator command
		*/
		return retry.RetryableError(errors.New("failed to unlock session"))
	})
}

View file

@ -0,0 +1,98 @@
package lock
import (
"errors"
"time"
)
const (
	// DefaultLockID is the id used to lock the database for migrations. It is a crc32 hash of the
	// string "goose" (note: the formula below and the 32-bit value show crc32, not crc64). This is
	// used to ensure that the lock is unique to goose.
	//
	// crc32.Checksum([]byte("goose"), crc32.MakeTable(crc32.IEEE))
	DefaultLockID int64 = 4097083626
)

// SessionLockerOption is used to configure a SessionLocker.
type SessionLockerOption interface {
	// apply mutates the session locker config, returning an error for invalid settings.
	apply(*sessionLockerConfig) error
}

// WithLockID sets the lock ID to use when locking the database.
//
// If WithLockID is not called, the DefaultLockID is used.
func WithLockID(lockID int64) SessionLockerOption {
	return sessionLockerConfigFunc(func(c *sessionLockerConfig) error {
		c.lockID = lockID
		return nil
	})
}
// WithLockTimeout sets the max duration to wait for the lock to be acquired. The total duration
// will be the period times the failure threshold.
//
// By default, the lock timeout is 300s (5min), where the lock is retried every 5 seconds (period)
// up to 60 times (failure threshold).
//
// The minimum period is 1 second, and the minimum failure threshold is 1.
func WithLockTimeout(period, failureThreshold uint64) SessionLockerOption {
	return sessionLockerConfigFunc(func(c *sessionLockerConfig) error {
		// Both values are uint64, so "< 1" rejects exactly the zero value.
		switch {
		case period < 1:
			return errors.New("period must be greater than 0, minimum is 1")
		case failureThreshold < 1:
			return errors.New("failure threshold must be greater than 0, minimum is 1")
		}
		c.lockProbe = probe{
			intervalDuration: time.Duration(period) * time.Second,
			failureThreshold: failureThreshold,
		}
		return nil
	})
}
// WithUnlockTimeout sets the max duration to wait for the lock to be released. The total duration
// will be the period times the failure threshold.
//
// By default, the unlock timeout is 60s, where the unlock is retried every 2 seconds (period) up
// to 30 times (failure threshold).
//
// The minimum period is 1 second, and the minimum failure threshold is 1.
func WithUnlockTimeout(period, failureThreshold uint64) SessionLockerOption {
	return sessionLockerConfigFunc(func(c *sessionLockerConfig) error {
		if period < 1 {
			return errors.New("period must be greater than 0, minimum is 1")
		}
		if failureThreshold < 1 {
			return errors.New("failure threshold must be greater than 0, minimum is 1")
		}
		c.unlockProbe = probe{
			intervalDuration: time.Duration(period) * time.Second,
			failureThreshold: failureThreshold,
		}
		return nil
	})
}
// sessionLockerConfig collects the settings applied by [SessionLockerOption]s.
type sessionLockerConfig struct {
	// lockID is the advisory lock key; defaults to DefaultLockID.
	lockID      int64
	lockProbe   probe
	unlockProbe probe
}

// probe is used to configure how often and how many times to retry a lock or unlock operation. The
// total timeout will be the period times the failure threshold.
type probe struct {
	// How often to perform the probe (a time.Duration, typically whole seconds).
	intervalDuration time.Duration
	// Number of times to retry the probe.
	failureThreshold uint64
}

// Compile-time assertion that sessionLockerConfigFunc satisfies [SessionLockerOption].
var _ SessionLockerOption = (sessionLockerConfigFunc)(nil)

// sessionLockerConfigFunc adapts a plain function to the SessionLockerOption interface.
type sessionLockerConfigFunc func(*sessionLockerConfig) error

// apply implements SessionLockerOption by invoking the wrapped function.
func (f sessionLockerConfigFunc) apply(cfg *sessionLockerConfig) error {
	return f(cfg)
}

View file

@ -0,0 +1,136 @@
package lock
import (
"errors"
"fmt"
"log/slog"
"time"
"github.com/pressly/goose/v3/lock/internal/table"
)
const (
	// DefaultLockTableName is the default name of the lock table.
	DefaultLockTableName = "goose_lock"
)

// TableLockerOption is used to configure a table-based locker.
type TableLockerOption interface {
	// apply mutates the table locker config, returning an error for invalid settings.
	apply(*table.Config) error
}
// WithTableName sets the name of the lock table.
func WithTableName(tableName string) TableLockerOption {
	return tableLockerConfigFunc(func(c *table.Config) error {
		if len(tableName) == 0 {
			return errors.New("lock table name must not be empty")
		}
		c.TableName = tableName
		return nil
	})
}
// WithTableLockID sets the lock ID to use for this locker instance. Different lock IDs allow for
// multiple independent locks in the same table. The ID must be strictly positive.
func WithTableLockID(lockID int64) TableLockerOption {
	return tableLockerConfigFunc(func(c *table.Config) error {
		if lockID > 0 {
			c.LockID = lockID
			return nil
		}
		return fmt.Errorf("lock ID must be greater than zero: %d", lockID)
	})
}
// WithTableLeaseDuration sets how long a lock lease lasts. The lock will expire after this duration
// if not renewed by heartbeat.
func WithTableLeaseDuration(duration time.Duration) TableLockerOption {
	return tableLockerConfigFunc(func(c *table.Config) error {
		if duration <= 0 {
			return errors.New("lease duration must be positive")
		}
		c.LeaseDuration = duration
		return nil
	})
}

// WithTableHeartbeatInterval sets how often to send heartbeat updates to renew the lease. This
// should be significantly smaller than the lease duration.
func WithTableHeartbeatInterval(interval time.Duration) TableLockerOption {
	return tableLockerConfigFunc(func(c *table.Config) error {
		if interval <= 0 {
			return errors.New("heartbeat interval must be positive")
		}
		c.HeartbeatInterval = interval
		return nil
	})
}
// WithTableLockTimeout configures how long to retry acquiring a lock and how often to retry.
// The total time spent is approximately intervalDuration * failureThreshold.
func WithTableLockTimeout(intervalDuration time.Duration, failureThreshold uint64) TableLockerOption {
	return tableLockerConfigFunc(func(c *table.Config) error {
		if intervalDuration <= 0 {
			return errors.New("lock timeout interval duration must be positive")
		}
		if failureThreshold == 0 {
			return errors.New("lock timeout failure threshold must be positive")
		}
		c.LockTimeout = table.ProbeConfig{
			IntervalDuration: intervalDuration,
			FailureThreshold: failureThreshold,
		}
		return nil
	})
}

// WithTableUnlockTimeout configures how long to retry releasing a lock and how often to retry.
// The total time spent is approximately intervalDuration * failureThreshold.
func WithTableUnlockTimeout(intervalDuration time.Duration, failureThreshold uint64) TableLockerOption {
	return tableLockerConfigFunc(func(c *table.Config) error {
		if intervalDuration <= 0 {
			return errors.New("unlock timeout interval duration must be positive")
		}
		if failureThreshold == 0 {
			return errors.New("unlock timeout failure threshold must be positive")
		}
		c.UnlockTimeout = table.ProbeConfig{
			IntervalDuration: intervalDuration,
			FailureThreshold: failureThreshold,
		}
		return nil
	})
}
// WithTableLogger sets an optional logger for lock operations. If not provided, lock operations
// will use a default logger that only logs errors to stderr.
func WithTableLogger(logger *slog.Logger) TableLockerOption {
	return tableLockerConfigFunc(func(c *table.Config) error {
		c.Logger = logger
		return nil
	})
}

// WithTableRetryPolicy sets an optional callback to classify database errors during table lock
// operations.
//
// The provided function is invoked whenever a database operation fails. This includes Lock(),
// Unlock(), and heartbeat/lease update operations.
//
// If the function returns true, the operation is retried according to the configured retry/backoff
// policy.
//
// If it returns false, the operation fails immediately, bypassing any retries.
//
// This allows clients to implement custom logic for transient errors, driver-specific errors, or
// application-specific failure handling.
func WithTableRetryPolicy(retryPolicy func(error) bool) TableLockerOption {
	return tableLockerConfigFunc(func(c *table.Config) error {
		c.RetryPolicy = retryPolicy
		return nil
	})
}
// tableLockerConfigFunc adapts a plain function to the TableLockerOption interface.
type tableLockerConfigFunc func(*table.Config) error

// apply implements TableLockerOption by invoking the wrapped function.
func (f tableLockerConfigFunc) apply(cfg *table.Config) error {
	return f(cfg)
}