Initial lookbook implementation
Pinterest-style visual bookmarking app with: - URL metadata extraction (OG/Twitter meta, oEmbed fallback) - Image caching in Postgres with 480px thumbnails - Multi-tag filtering with Ctrl/Cmd for OR mode - Fuzzy tag suggestions and inline tag editing - Browser console auth() with first-use password setup - Brutalist UI with Commit Mono font and Pico CSS - Light/dark mode via browser preference
This commit is contained in:
commit
fc625fb9cf
486 changed files with 195373 additions and 0 deletions
282
vendor/github.com/pressly/goose/v3/lock/internal/store/postgres.go
generated
vendored
Normal file
282
vendor/github.com/pressly/goose/v3/lock/internal/store/postgres.go
generated
vendored
Normal file
|
|
@ -0,0 +1,282 @@
|
|||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"go.uber.org/multierr"
|
||||
)
|
||||
|
||||
// NewPostgres creates a new Postgres-based [LockStore].
|
||||
func NewPostgres(tableName string) (LockStore, error) {
|
||||
if tableName == "" {
|
||||
return nil, errors.New("table name must not be empty")
|
||||
}
|
||||
return &postgresStore{
|
||||
tableName: tableName,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Compile-time check that postgresStore satisfies the LockStore interface.
var _ LockStore = (*postgresStore)(nil)

// postgresStore implements [LockStore] on top of a single Postgres table
// whose name is supplied at construction time.
type postgresStore struct {
	// tableName is the lock table identifier, optionally "schema.table".
	tableName string
}
|
||||
|
||||
func (s *postgresStore) TableExists(
|
||||
ctx context.Context,
|
||||
db *sql.DB,
|
||||
) (bool, error) {
|
||||
var query string
|
||||
schemaName, tableName := parseTableIdentifier(s.tableName)
|
||||
if schemaName != "" {
|
||||
q := `SELECT EXISTS ( SELECT 1 FROM pg_tables WHERE schemaname = '%s' AND tablename = '%s' )`
|
||||
query = fmt.Sprintf(q, schemaName, tableName)
|
||||
} else {
|
||||
q := `SELECT EXISTS ( SELECT 1 FROM pg_tables WHERE (current_schema() IS NULL OR schemaname = current_schema()) AND tablename = '%s' )`
|
||||
query = fmt.Sprintf(q, tableName)
|
||||
}
|
||||
|
||||
var exists bool
|
||||
if err := db.QueryRowContext(ctx, query).Scan(
|
||||
&exists,
|
||||
); err != nil {
|
||||
return false, fmt.Errorf("table exists: %w", err)
|
||||
}
|
||||
return exists, nil
|
||||
}
|
||||
|
||||
// CreateLockTable creates the lock table if it does not already exist. The
// operation is idempotent: it first checks for the table, and if the CREATE
// TABLE subsequently fails it re-checks existence so a concurrent creator
// winning the race is not treated as an error.
func (s *postgresStore) CreateLockTable(
	ctx context.Context,
	db *sql.DB,
) error {
	exists, err := s.TableExists(ctx, db)
	if err != nil {
		return fmt.Errorf("check lock table existence: %w", err)
	}
	if exists {
		return nil
	}

	// NOTE: the table name is interpolated directly into the DDL; it is a
	// trusted, configuration-supplied identifier (see NewPostgres).
	query := fmt.Sprintf(`CREATE TABLE %s (
		lock_id bigint NOT NULL PRIMARY KEY,
		locked boolean NOT NULL DEFAULT false,
		locked_at timestamptz NULL,
		locked_by text NULL,
		lease_expires_at timestamptz NULL,
		updated_at timestamptz NULL
	)`, s.tableName)
	if _, err := db.ExecContext(ctx, query); err != nil {
		// Double-check if another process created it concurrently
		if exists, checkErr := s.TableExists(ctx, db); checkErr == nil && exists {
			// Another process created it, that's fine!
			return nil
		}
		return fmt.Errorf("create lock table %q: %w", s.tableName, err)
	}
	return nil
}
|
||||
|
||||
// AcquireLock attempts to take the lock row identified by lockID on behalf
// of lockedBy, leasing it for leaseDuration. It is a single-statement
// compare-and-set: the row is inserted if missing, or taken over via
// ON CONFLICT only when the existing row is unlocked or its lease has
// expired. If the conflict WHERE clause filters the update out, no row is
// returned and the lock is reported as held by another instance.
func (s *postgresStore) AcquireLock(
	ctx context.Context,
	db *sql.DB,
	lockID int64,
	lockedBy string,
	leaseDuration time.Duration,
) (*AcquireLockResult, error) {
	query := fmt.Sprintf(`INSERT INTO %s (lock_id, locked, locked_at, locked_by, lease_expires_at, updated_at)
		VALUES ($1, true, now(), $2, now() + $3::interval, now())
		ON CONFLICT (lock_id) DO UPDATE SET
			locked = true,
			locked_at = now(),
			locked_by = $2,
			lease_expires_at = now() + $3::interval,
			updated_at = now()
		WHERE %s.locked = false OR %s.lease_expires_at < now()
		RETURNING locked_by, lease_expires_at`, s.tableName, s.tableName, s.tableName)

	// Convert duration to PostgreSQL interval format
	leaseDurationStr := formatDurationAsInterval(leaseDuration)

	var returnedLockedBy string
	var leaseExpiresAt time.Time
	err := db.QueryRowContext(ctx, query,
		lockID,
		lockedBy,
		leaseDurationStr,
	).Scan(
		&returnedLockedBy,
		&leaseExpiresAt,
	)
	if err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			// ErrNoRows here means the upsert was suppressed: the lock is
			// currently held with a live lease.
			// TODO(mf): should we return a special error type here?
			return nil, fmt.Errorf("acquire lock %d: already held by another instance", lockID)
		}
		return nil, fmt.Errorf("acquire lock %d: %w", lockID, err)
	}

	// Verify we got the lock by checking the returned locked_by matches our instance ID
	if returnedLockedBy != lockedBy {
		return nil, fmt.Errorf("acquire lock %d: acquired by %s instead of %s", lockID, returnedLockedBy, lockedBy)
	}

	return &AcquireLockResult{
		LockedBy:       returnedLockedBy,
		LeaseExpiresAt: leaseExpiresAt,
	}, nil
}
|
||||
|
||||
// ReleaseLock releases the lock identified by lockID, but only if it is
// currently held by lockedBy. Attempting to release a lock held by another
// instance — or one whose row does not exist — returns an error.
func (s *postgresStore) ReleaseLock(
	ctx context.Context,
	db *sql.DB,
	lockID int64,
	lockedBy string,
) (*ReleaseLockResult, error) {
	// Release lock only if it's held by the current instance
	query := fmt.Sprintf(`UPDATE %s SET
		locked = false,
		locked_at = NULL,
		locked_by = NULL,
		lease_expires_at = NULL,
		updated_at = now()
	WHERE lock_id = $1 AND locked_by = $2
	RETURNING lock_id`, s.tableName)

	var returnedLockID int64
	err := db.QueryRowContext(ctx, query,
		lockID,
		lockedBy,
	).Scan(
		&returnedLockID,
	)
	if err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			// No row matched: either the lock row is missing or locked_by
			// belongs to a different instance.
			// TODO(mf): should we return a special error type here?
			return nil, fmt.Errorf("release lock %d: not held by this instance", lockID)
		}
		return nil, fmt.Errorf("release lock %d: %w", lockID, err)
	}

	// Verify the correct lock was released
	if returnedLockID != lockID {
		return nil, fmt.Errorf("release lock %d: returned lock ID %d does not match", lockID, returnedLockID)
	}

	return &ReleaseLockResult{
		LockID: returnedLockID,
	}, nil
}
|
||||
|
||||
func (s *postgresStore) UpdateLease(
|
||||
ctx context.Context,
|
||||
db *sql.DB,
|
||||
lockID int64,
|
||||
lockedBy string,
|
||||
leaseDuration time.Duration,
|
||||
) (*UpdateLeaseResult, error) {
|
||||
// Update lease expiration time for heartbeat, only if we own the lock
|
||||
query := fmt.Sprintf(`UPDATE %s SET
|
||||
lease_expires_at = now() + $1::interval,
|
||||
updated_at = now()
|
||||
WHERE lock_id = $2 AND locked_by = $3 AND locked = true
|
||||
RETURNING lease_expires_at`, s.tableName)
|
||||
|
||||
// Convert duration to PostgreSQL interval format
|
||||
intervalStr := formatDurationAsInterval(leaseDuration)
|
||||
|
||||
var leaseExpiresAt time.Time
|
||||
err := db.QueryRowContext(ctx, query,
|
||||
intervalStr,
|
||||
lockID,
|
||||
lockedBy,
|
||||
).Scan(
|
||||
&leaseExpiresAt,
|
||||
)
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return nil, fmt.Errorf("failed to update lease for lock %d: not held by this instance", lockID)
|
||||
}
|
||||
return nil, fmt.Errorf("failed to update lease for lock %d: %w", lockID, err)
|
||||
}
|
||||
|
||||
return &UpdateLeaseResult{
|
||||
LeaseExpiresAt: leaseExpiresAt,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *postgresStore) CheckLockStatus(
|
||||
ctx context.Context,
|
||||
db *sql.DB,
|
||||
lockID int64,
|
||||
) (*LockStatus, error) {
|
||||
query := fmt.Sprintf(`SELECT locked, locked_by, lease_expires_at, updated_at FROM %s WHERE lock_id = $1`, s.tableName)
|
||||
var status LockStatus
|
||||
|
||||
err := db.QueryRowContext(ctx, query,
|
||||
lockID,
|
||||
).Scan(
|
||||
&status.Locked,
|
||||
&status.LockedBy,
|
||||
&status.LeaseExpiresAt,
|
||||
&status.UpdatedAt,
|
||||
)
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return nil, fmt.Errorf("lock %d not found", lockID)
|
||||
}
|
||||
return nil, fmt.Errorf("check lock status for %d: %w", lockID, err)
|
||||
}
|
||||
|
||||
return &status, nil
|
||||
}
|
||||
|
||||
// CleanupStaleLocks clears any rows that are marked locked but whose lease
// has already expired (by database server time), returning the IDs of the
// locks that were cleaned up. The returned slice is nil when nothing was
// stale.
func (s *postgresStore) CleanupStaleLocks(ctx context.Context, db *sql.DB) (_ []int64, retErr error) {
	query := fmt.Sprintf(`UPDATE %s SET
		locked = false,
		locked_at = NULL,
		locked_by = NULL,
		lease_expires_at = NULL,
		updated_at = now()
	WHERE locked = true AND lease_expires_at < now()
	RETURNING lock_id`, s.tableName)

	rows, err := db.QueryContext(ctx, query)
	if err != nil {
		return nil, fmt.Errorf("cleanup stale locks: %w", err)
	}
	// The named result retErr lets the deferred Close error be surfaced to
	// the caller instead of being silently dropped.
	defer func() {
		retErr = multierr.Append(retErr, rows.Close())
	}()

	var cleanedLocks []int64
	for rows.Next() {
		var lockID int64
		if err := rows.Scan(&lockID); err != nil {
			return nil, fmt.Errorf("scan cleaned lock ID: %w", err)
		}
		cleanedLocks = append(cleanedLocks, lockID)
	}
	// rows.Err reports any error encountered during iteration.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterate over cleaned locks: %w", err)
	}

	return cleanedLocks, nil
}
|
||||
|
||||
// formatDurationAsInterval converts a time.Duration to PostgreSQL interval format
|
||||
func formatDurationAsInterval(d time.Duration) string {
|
||||
return fmt.Sprintf("%d seconds", int(d.Seconds()))
|
||||
}
|
||||
|
||||
// parseTableIdentifier splits an optionally schema-qualified identifier
// ("schema.table") into its schema and table parts. When name contains no
// dot, the schema is empty and the entire input is returned as the table.
func parseTableIdentifier(name string) (schema, table string) {
	before, after, ok := strings.Cut(name, ".")
	if ok {
		return before, after
	}
	return "", name
}
|
||||
51
vendor/github.com/pressly/goose/v3/lock/internal/store/store.go
generated
vendored
Normal file
51
vendor/github.com/pressly/goose/v3/lock/internal/store/store.go
generated
vendored
Normal file
|
|
@ -0,0 +1,51 @@
|
|||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"time"
|
||||
)
|
||||
|
||||
// LockStore defines the interface for storing and managing database locks.
// Every method takes the *sql.DB handle explicitly, so one store value can
// be reused across connections.
type LockStore interface {
	// CreateLockTable creates the lock table if it doesn't exist. Implementations should ensure
	// that this operation is idempotent.
	CreateLockTable(ctx context.Context, db *sql.DB) error
	// TableExists checks if the lock table exists.
	TableExists(ctx context.Context, db *sql.DB) (bool, error)
	// AcquireLock attempts to acquire a lock for the given lockID, recording lockedBy as the
	// holder and leasing it for leaseDuration.
	AcquireLock(ctx context.Context, db *sql.DB, lockID int64, lockedBy string, leaseDuration time.Duration) (*AcquireLockResult, error)
	// ReleaseLock releases a lock held by the current instance.
	ReleaseLock(ctx context.Context, db *sql.DB, lockID int64, lockedBy string) (*ReleaseLockResult, error)
	// UpdateLease updates the lease expiration time for a lock (heartbeat).
	UpdateLease(ctx context.Context, db *sql.DB, lockID int64, lockedBy string, leaseDuration time.Duration) (*UpdateLeaseResult, error)
	// CheckLockStatus checks the current status of a lock.
	CheckLockStatus(ctx context.Context, db *sql.DB, lockID int64) (*LockStatus, error)
	// CleanupStaleLocks removes any locks that have expired using server time. Returns the list of
	// lock IDs that were cleaned up, if any.
	CleanupStaleLocks(ctx context.Context, db *sql.DB) ([]int64, error)
}
|
||||
|
||||
// LockStatus represents the current status of a lock.
type LockStatus struct {
	Locked bool
	// LockedBy, LeaseExpiresAt, and UpdatedAt are pointers because the
	// corresponding table columns are nullable when the lock is not held.
	LockedBy       *string
	LeaseExpiresAt *time.Time
	UpdatedAt      *time.Time
}

// AcquireLockResult contains the result of a lock acquisition attempt.
type AcquireLockResult struct {
	// LockedBy is the holder identity as recorded by the database.
	LockedBy string
	// LeaseExpiresAt is when the acquired lease expires (server time).
	LeaseExpiresAt time.Time
}

// ReleaseLockResult contains the result of a lock release.
type ReleaseLockResult struct {
	// LockID is the ID of the lock that was released.
	LockID int64
}

// UpdateLeaseResult contains the result of a lease update.
type UpdateLeaseResult struct {
	// LeaseExpiresAt is the new lease expiry (server time).
	LeaseExpiresAt time.Time
}
|
||||
28
vendor/github.com/pressly/goose/v3/lock/internal/table/config.go
generated
vendored
Normal file
28
vendor/github.com/pressly/goose/v3/lock/internal/table/config.go
generated
vendored
Normal file
|
|
@ -0,0 +1,28 @@
|
|||
package table
|
||||
|
||||
import (
|
||||
"log/slog"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Config holds configuration for table locker.
type Config struct {
	// TableName is the lock table identifier, optionally "schema.table".
	TableName string
	// LockID selects the lock row this locker contends for.
	LockID int64
	// LeaseDuration is how long an acquired lease remains valid.
	LeaseDuration time.Duration
	// HeartbeatInterval is how often the lease is refreshed while held.
	HeartbeatInterval time.Duration
	// LockTimeout and UnlockTimeout control retry behavior when acquiring
	// and releasing the lock, respectively.
	LockTimeout   ProbeConfig
	UnlockTimeout ProbeConfig

	// Optional logger for lock operations
	Logger *slog.Logger

	// Optional custom retry policy for database errors
	RetryPolicy RetryPolicyFunc
}

// ProbeConfig holds retry configuration.
type ProbeConfig struct {
	// IntervalDuration is the base delay between attempts.
	IntervalDuration time.Duration
	// FailureThreshold is the maximum number of retries before giving up.
	FailureThreshold uint64
}
|
||||
237
vendor/github.com/pressly/goose/v3/lock/internal/table/locker.go
generated
vendored
Normal file
237
vendor/github.com/pressly/goose/v3/lock/internal/table/locker.go
generated
vendored
Normal file
|
|
@ -0,0 +1,237 @@
|
|||
package table
|
||||
|
||||
import (
|
||||
"cmp"
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"database/sql"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/pressly/goose/v3/lock/internal/store"
|
||||
"github.com/sethvargo/go-retry"
|
||||
)
|
||||
|
||||
// RetryPolicyFunc inspects an error and returns whether the caller should retry the operation. This
// allows for database-specific error handling without hardcoding driver-specific logic.
type RetryPolicyFunc func(err error) bool

// Locker implements table-based locking for databases. This implementation is safe for concurrent
// use by multiple goroutines.
type Locker struct {
	store             store.LockStore
	tableName         string
	lockID            int64
	instanceID        string // unique per-process identity recorded as the lock holder
	leaseDuration     time.Duration
	heartbeatInterval time.Duration
	retryLock         retry.Backoff
	retryUnlock       retry.Backoff
	logger            *slog.Logger
	retryPolicy       RetryPolicyFunc

	// Application-level coordination: mu is acquired in Lock() and held
	// until Unlock(), serializing lock usage within this process.
	mu sync.Mutex

	// Heartbeat management: heartbeatCancel stops the heartbeat goroutine;
	// heartbeatDone is closed once it has fully exited.
	heartbeatCancel context.CancelFunc
	heartbeatDone   chan struct{}
}
|
||||
|
||||
// New creates a new table-based locker.
|
||||
func New(lockStore store.LockStore, cfg Config) *Locker {
|
||||
// Generate instance identifier
|
||||
hostname, _ := os.Hostname()
|
||||
hostname = cmp.Or(hostname, "unknown-hostname")
|
||||
instanceID := fmt.Sprintf("%s-%d-%s", hostname, os.Getpid(), randomHex(4))
|
||||
|
||||
logger := cfg.Logger
|
||||
if logger == nil {
|
||||
logger = slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
|
||||
}
|
||||
|
||||
return &Locker{
|
||||
store: lockStore,
|
||||
tableName: cfg.TableName,
|
||||
lockID: cfg.LockID,
|
||||
instanceID: instanceID,
|
||||
leaseDuration: cfg.LeaseDuration,
|
||||
heartbeatInterval: cfg.HeartbeatInterval,
|
||||
logger: logger,
|
||||
retryPolicy: cfg.RetryPolicy,
|
||||
retryLock: retry.WithMaxRetries(
|
||||
cfg.LockTimeout.FailureThreshold,
|
||||
// Add +/- 25% jitter to reduce thundering herd
|
||||
retry.WithJitterPercent(25, retry.NewConstant(cfg.LockTimeout.IntervalDuration)),
|
||||
),
|
||||
retryUnlock: retry.WithMaxRetries(
|
||||
cfg.UnlockTimeout.FailureThreshold,
|
||||
// Add +/- 25% jitter to reduce thundering herd
|
||||
retry.WithJitterPercent(25, retry.NewConstant(cfg.UnlockTimeout.IntervalDuration)),
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
// Lock acquires the database lock. This method is safe for concurrent use - the mutex is held until
|
||||
// Unlock() is called. Only one goroutine can hold the lock at a time across the entire lifecycle.
|
||||
func (l *Locker) Lock(ctx context.Context, db *sql.DB) error {
|
||||
l.mu.Lock()
|
||||
// NOTE: mutex is NOT defer unlocked here, it remains held until Unlock() is called explicitly
|
||||
// or a specific error occurs below!
|
||||
|
||||
// Ensure the lock table exists
|
||||
if err := l.store.CreateLockTable(ctx, db); err != nil {
|
||||
l.mu.Unlock()
|
||||
return fmt.Errorf("ensure lock table exists: %w", err)
|
||||
}
|
||||
|
||||
err := retry.Do(ctx, l.retryLock, func(ctx context.Context) error {
|
||||
_, err := l.store.AcquireLock(ctx, db, l.lockID, l.instanceID, l.leaseDuration)
|
||||
if err != nil {
|
||||
// Clean up any stale locks before retrying
|
||||
if _, cleanupErr := l.store.CleanupStaleLocks(ctx, db); cleanupErr != nil {
|
||||
l.logger.WarnContext(ctx, "failed to cleanup stale locks",
|
||||
slog.Int64("lock_table", l.lockID),
|
||||
slog.Any("error", cleanupErr),
|
||||
)
|
||||
// Continue with retry, cleanup failure shouldn't block acquisition attempts
|
||||
}
|
||||
if l.shouldRetry(err) {
|
||||
return retry.RetryableError(fmt.Errorf("acquire retryable lock: %w", err))
|
||||
}
|
||||
return fmt.Errorf("acquire lock: %w", err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
l.mu.Unlock()
|
||||
l.logger.WarnContext(ctx, "failed to acquire lock after retries",
|
||||
slog.Int64("lock_id", l.lockID),
|
||||
slog.String("instance_id", l.instanceID),
|
||||
slog.Any("error", err),
|
||||
)
|
||||
return fmt.Errorf("acquire lock %d after retries: %w", l.lockID, err)
|
||||
}
|
||||
|
||||
l.logger.DebugContext(ctx, "successfully acquired lock",
|
||||
slog.Int64("lock_id", l.lockID),
|
||||
slog.String("instance_id", l.instanceID),
|
||||
slog.Duration("lease_duration", l.leaseDuration),
|
||||
)
|
||||
// Start heartbeat to maintain the lease
|
||||
l.startHeartbeat(ctx, db)
|
||||
|
||||
// Mutex remains held - will be released in Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Unlock releases the database lock. This method must be called exactly once after a successful
// Lock() call. It stops the heartbeat goroutine, releases the database row (with retries), and
// finally releases the process-level mutex acquired in Lock().
func (l *Locker) Unlock(ctx context.Context, db *sql.DB) error {
	// NOTE: The mutex was acquired in Lock() and is still held
	defer l.mu.Unlock()

	// Use a context that can't be cancelled to ensure we always attempt to unlock even if the
	// caller's context is cancelled. The call can control the retry behavior via the configured
	// timeouts.
	ctx = context.WithoutCancel(ctx)

	// Stop heartbeat first
	l.stopHeartbeat()

	err := retry.Do(ctx, l.retryUnlock, func(ctx context.Context) error {
		_, err := l.store.ReleaseLock(ctx, db, l.lockID, l.instanceID)
		if err != nil {
			if l.shouldRetry(err) {
				return retry.RetryableError(fmt.Errorf("release retryable lock: %w", err))
			}
			return fmt.Errorf("release lock: %w", err)
		}
		return nil
	})
	if err != nil {
		// The error is both logged and returned here; the caller is still
		// expected to treat the returned error as authoritative.
		l.logger.WarnContext(ctx, "failed to release lock",
			slog.Int64("lock_id", l.lockID),
			slog.String("instance_id", l.instanceID),
			slog.Any("error", err),
		)
		return err
	}

	l.logger.DebugContext(ctx, "successfully released lock",
		slog.Int64("lock_id", l.lockID),
		slog.String("instance_id", l.instanceID),
	)
	return nil
}
|
||||
|
||||
// startHeartbeat starts the heartbeat goroutine (called from within Lock with mutex held).
// The goroutine extends the lease every heartbeatInterval until its context is cancelled by
// stopHeartbeat; heartbeatDone is closed when it has fully exited.
func (l *Locker) startHeartbeat(parentCtx context.Context, db *sql.DB) {
	// If there's already a heartbeat running, stop it first
	l.stopHeartbeat()

	// Create a new context for the heartbeat
	ctx, cancel := context.WithCancel(parentCtx)
	l.heartbeatCancel = cancel
	l.heartbeatDone = make(chan struct{})

	go func() {
		defer close(l.heartbeatDone)
		ticker := time.NewTicker(l.heartbeatInterval)
		defer ticker.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				result, err := l.store.UpdateLease(ctx, db, l.lockID, l.instanceID, l.leaseDuration)
				if err != nil {
					// A failed heartbeat is logged but not fatal; the next tick
					// will try again while the current lease is still valid.
					// TODO(mf): should we add a retry policy here?
					l.logger.WarnContext(ctx, "heartbeat failed to update lease",
						slog.Int64("lock_id", l.lockID),
						slog.String("instance_id", l.instanceID),
						slog.Any("error", err),
					)
					continue
				}
				l.logger.DebugContext(ctx, "heartbeat updated lease",
					slog.Int64("lock_id", l.lockID),
					slog.String("instance_id", l.instanceID),
					slog.Time("lease_expires_at", result.LeaseExpiresAt),
				)
			}
		}
	}()
}
|
||||
|
||||
// stopHeartbeat stops the heartbeat goroutine (called from within Unlock with mutex held).
|
||||
func (l *Locker) stopHeartbeat() {
|
||||
if l.heartbeatCancel != nil {
|
||||
l.heartbeatCancel()
|
||||
<-l.heartbeatDone
|
||||
l.heartbeatCancel = nil
|
||||
l.heartbeatDone = nil
|
||||
}
|
||||
}
|
||||
|
||||
// shouldRetry determines whether an error is retryable based on the configured retry policy. If no
|
||||
// retry policy is configured, it defaults to always retrying.
|
||||
func (l *Locker) shouldRetry(err error) bool {
|
||||
if l.retryPolicy != nil {
|
||||
return l.retryPolicy(err)
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// randomHex returns n cryptographically random bytes encoded as a
// lowercase hex string (2*n characters). If the secure random source is
// unavailable, it falls back to the current nanosecond timestamp rendered
// in hex and zero-padded to the same width.
func randomHex(n int) string {
	buf := make([]byte, n)
	_, err := rand.Read(buf)
	if err != nil {
		// Keep the identifier width stable even on the fallback path.
		return fmt.Sprintf("%0*x", n*2, time.Now().UnixNano())
	}
	return hex.EncodeToString(buf)
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue