
Adding locks in migration #2794

@coolwednesday

Description

Design Document: Distributed Migration Locks

Objective

Implement a distributed locking mechanism in the gofr migration system to prevent race conditions when multiple application instances start simultaneously.

Architecture Overview

We will introduce a Locker interface that datasources can implement. The migration runner will attempt to acquire locks from all available datasources before proceeding with any migration logic.

Code Changes

1. New Interface: Locker

We will define a new interface in pkg/gofr/migration/interface.go.

// Locker is an interface that datasources can implement to support distributed locking.
type Locker interface {
    // Lock acquires a distributed lock. It should block or retry until the lock is acquired
    // or the context is cancelled.
    // Implementations should use a fixed, well-known lock identifier (e.g., "gofr_migration_lock").
    Lock(ctx context.Context) error

    // Unlock releases the distributed lock.
    Unlock(ctx context.Context) error
}

2. Update Run Function

Modify pkg/gofr/migration/migration.go:

func Run(migrationsMap map[int64]Migrate, c *container.Container) {
    // ... existing initialization code ...

    ds, mg, ok := getMigrator(c)

    // [NEW] Lock Acquisition
    // Identify all datasources that implement Locker.
    lockers := getLockers(ds)

    // Acquire locks sequentially.
    // CRITICAL: Locks must be acquired in a deterministic order to prevent deadlocks,
    // e.g., SQL -> Redis -> Mongo.
    var acquiredLocks []Locker

    for _, l := range lockers {
        if err := l.Lock(context.Background()); err != nil {
            // If we fail to acquire a lock, we must release any locks we already hold.
            releaseLocks(acquiredLocks)
            c.Fatalf("failed to acquire migration lock: %v", err)
        }
        acquiredLocks = append(acquiredLocks, l)
        // Ensure we unlock on exit.
        defer l.Unlock(context.Background())
    }

    // ... existing migration logic (getLastMigration, loop, etc.) ...
}

4. Lock Acquisition Strategy

To answer the critical concurrency question: yes, a single pod will acquire ALL locks for ALL datasources.

  • All-or-Nothing: The migration process cannot proceed unless it holds exclusive locks for every configured datasource. This ensures that no other pod can modify any part of the system state while migrations are running.
  • Deterministic Ordering: To prevent deadlocks (where Pod A holds SQL and wants Redis, while Pod B holds Redis and wants SQL), locks will always be acquired in a fixed, deterministic order (e.g., alphabetical by datasource type or a hardcoded precedence), as sketched below.
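
In the sketch, the namedLocker type, lockOrder table, and sortLockers helper are illustrative (not existing gofr code); it uses the standard library sort package:

// namedLocker pairs a Locker with the name of the datasource it came from.
type namedLocker struct {
    name   string // e.g. "sql", "redis", "mongo"
    locker Locker
}

// lockOrder is a hardcoded global precedence; unknown datasources sort last,
// then alphabetically, so the order is still deterministic.
var lockOrder = map[string]int{"sql": 0, "redis": 1, "mongo": 2, "cassandra": 3}

func sortLockers(lockers []namedLocker) {
    sort.Slice(lockers, func(i, j int) bool {
        pi, ok := lockOrder[lockers[i].name]
        if !ok {
            pi = len(lockOrder)
        }

        pj, ok := lockOrder[lockers[j].name]
        if !ok {
            pj = len(lockOrder)
        }

        if pi != pj {
            return pi < pj
        }

        return lockers[i].name < lockers[j].name
    })
}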

5. Edge Cases & Solutions

A. Deadlock Prevention (The "Cross-Lock" Problem)

Scenario: Pod A holds Lock 1 and waits for Lock 2. Pod B holds Lock 2 and waits for Lock 1.
Solution: Deterministic Ordering.
We enforce a strict global order for acquiring locks. For example: SQL -> Redis -> Mongo -> Cassandra.

  • Pod A acquires SQL.
  • Pod B tries to acquire SQL -> BLOCKED.
  • Pod A acquires Redis, Mongo, etc.
  • Pod A finishes and releases.
  • Pod B acquires SQL.
  • Result: Since every pod acquires locks in the same global order, a circular wait can never form, so deadlock is impossible.

B. Lock Expiry (The "Zombie" Problem)

Scenario: Pod A acquires a lock but crashes (OOM, power failure) before releasing it.
Solution: Time-To-Live (TTL).
All locks must have an expiration time (e.g., 5-10 minutes).

  • Redis: SETNX with EX.
  • Mongo/SQL: Store expires_at timestamp.
  • Logic: If a lock is found but expires_at < now, it is considered a "zombie" and can be overwritten (stolen) by the active pod.

C. Partial Failure (The "Stuck" Problem)

Scenario: Pod A acquires SQL lock, but fails to acquire Redis lock (e.g., Redis is down).
Solution: Immediate Release & Retry.
If any lock acquisition fails:

  1. Release all currently held locks immediately (in reverse order).
  2. Wait (backoff).
  3. Retry the entire sequence.

This prevents Pod A from holding the SQL lock indefinitely while waiting for Redis to come back; a sketch of this loop is shown below.
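
Here, the acquireAll helper, the 1s initial backoff, and the 30s cap are illustrative, not existing gofr code; the lockers slice is assumed to already be in the global order:

// acquireAll takes every lock in order. On any failure it releases the locks it
// already holds (in reverse order), backs off, and retries until the context is
// cancelled.
func acquireAll(ctx context.Context, lockers []Locker) error {
    backoff := time.Second

    for {
        acquired := make([]Locker, 0, len(lockers))

        err := func() error {
            for _, l := range lockers {
                if lockErr := l.Lock(ctx); lockErr != nil {
                    return lockErr
                }

                acquired = append(acquired, l)
            }

            return nil
        }()

        if err == nil {
            return nil // all locks held
        }

        // Release everything we hold so other pods are not blocked while we wait.
        for i := len(acquired) - 1; i >= 0; i-- {
            acquired[i].Unlock(ctx)
        }

        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(backoff):
            if backoff < 30*time.Second {
                backoff *= 2
            }
        }
    }
}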

D. Clock Skew

Scenario: Pod A and Pod B have different system times, leading to incorrect expiry checks.
Solution: Server-Side Time.
Where possible, rely on the datasource's internal mechanism (e.g., Redis TTL). If storing timestamps (SQL/Mongo), ensure a generous buffer (e.g., 5 minutes) to account for reasonable skew, or use the database's NOW() function instead of client time.
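
For the table-based locks, both the expiry check and the new lease can be computed with the database's clock in a single statement, so client time never enters the comparison. A sketch for Postgres using database/sql (the gofr_migration_lock table and its columns are illustrative, not an existing schema):

// Acquire the lock, or steal it if the previous holder's lease has expired,
// using only the server's NOW() for both the check and the new expiry.
const acquireQuery = `
UPDATE gofr_migration_lock
SET locked = TRUE, expires_at = NOW() + INTERVAL '5 minutes'
WHERE id = 1 AND (locked = FALSE OR expires_at < NOW())`

func tryAcquire(ctx context.Context, db *sql.DB) (bool, error) {
    res, err := db.ExecContext(ctx, acquireQuery)
    if err != nil {
        return false, err
    }

    rows, err := res.RowsAffected()
    if err != nil {
        return false, err
    }

    return rows == 1, nil // exactly one row updated => we now hold the lock
}

(The LLD below uses advisory locks for SQL instead, which need no expiry at all because the database releases them when the holding session or transaction ends.)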

Low-Level Design (LLD)

1. Locker Interface

Located in pkg/gofr/migration/interface.go.

type Locker interface {
    // Lock acquires the lock. It blocks until acquired or context is cancelled.
    Lock(ctx context.Context) error
    // Unlock releases the lock.
    Unlock() error
}

2. SQL Implementation (sqlMigrator)

Challenge: SQL locks (like MySQL GET_LOCK) are often session-bound. If we use a connection pool, we must ensure the lock is held on a dedicated connection.
Solution: We will use a dedicated Transaction (Begin()) to hold the connection for the duration of the lock.

type sqlMigrator struct {
    // ... existing fields
    lockTx *gofrSql.Tx // Store the transaction holding the lock
}

func (d *sqlMigrator) Lock(ctx context.Context) error {
    // 1. Start a transaction to get a dedicated connection
    tx, err := d.SQL.Begin()
    if err != nil {
        return err
    }
    d.lockTx = tx

    // 2. Execute dialect-specific lock query
    switch d.SQL.Dialect() {
    case "mysql":
        // GET_LOCK returns 1 when acquired and 0 on timeout, so the result
        // must be checked rather than just executed.
        var acquired int

        err = tx.QueryRowContext(ctx, "SELECT GET_LOCK('gofr_migrations', 10)").Scan(&acquired) // 10s timeout
        if err == nil && acquired != 1 {
            err = errors.New("timed out waiting for migration lock")
        }
    case "postgres":
        // pg_advisory_xact_lock blocks until acquired and is released automatically
        // when the transaction ends; a session-level pg_advisory_lock would survive
        // the Rollback in Unlock and leak into the pooled connection.
        _, err = tx.ExecContext(ctx, "SELECT pg_advisory_xact_lock(123456789)") // Arbitrary key
    default:
        // For SQLite or others, we might rely on the transaction itself or skip
        return nil
    }

    if err != nil {
        d.lockTx.Rollback()
        d.lockTx = nil

        return err
    }

    return nil
}

func (d *sqlMigrator) Unlock() error {
    if d.lockTx == nil {
        return nil
    }

    // MySQL's GET_LOCK is session-scoped and survives a rollback, so release it
    // explicitly before the connection goes back to the pool.
    if d.SQL.Dialect() == "mysql" {
        _, _ = d.lockTx.ExecContext(context.Background(), "SELECT RELEASE_LOCK('gofr_migrations')")
    }

    // Rolling back the transaction frees the dedicated connection and releases
    // the transaction-level advisory lock (postgres).
    return d.lockTx.Rollback()
}

3. Redis Implementation (redisMigrator)

Challenge: Redis SETNX is atomic but requires a TTL to prevent deadlocks if the pod crashes.
Solution: Use SetNX with a reasonable TTL (e.g., 5 minutes). We must also ensure we don't delete a lock held by another instance; although this is unlikely while Run blocks on the lock, a unique owner token can be used if strict correctness is needed (see the sketch after the code below).

func (r *redisMigrator) Lock(ctx context.Context) error {
    key := "gofr_migrations_lock"
    ttl := 5 * time.Minute
    
    // Simple spin lock
    for {
        // Try to acquire
        ok, err := r.Redis.SetNX(ctx, key, "locked", ttl).Result()
        if err != nil {
            return err
        }
        if ok {
            return nil // Acquired
        }
        
        // Wait and retry
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(500 * time.Millisecond):
            continue
        }
    }
}

func (r *redisMigrator) Unlock() error {
    return r.Redis.Del(context.Background(), "gofr_migrations_lock").Err()
}
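
If the unique-token variant mentioned above is wanted, the usual pattern is to store a random token as the lock value and release it with a short Lua script so the compare-and-delete is atomic. A hedged sketch, assuming the wrapped go-redis client's Eval is reachable from the migrator and that Lock stored the token (the unlockIfOwner helper is illustrative):

// unlockScript deletes the lock only if it still holds the token we set, so we
// never remove a lock that another instance has acquired in the meantime.
const unlockScript = `
if redis.call("GET", KEYS[1]) == ARGV[1] then
    return redis.call("DEL", KEYS[1])
end
return 0`

func (r *redisMigrator) unlockIfOwner(ctx context.Context, token string) error {
    return r.Redis.Eval(ctx, unlockScript, []string{"gofr_migrations_lock"}, token).Err()
}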

4. Migration Runner (Run)

The Run function orchestrates the locking.

func Run(migrationsMap map[int64]Migrate, c *container.Container) {
    // ... init ...

    // 1. Identify Lockers
    var lockers []Locker

    if c.SQL != nil { lockers = append(lockers, &sqlMigrator{...}) }
    if c.Redis != nil { lockers = append(lockers, &redisMigrator{...}) }
    // ... others ...

    // 2. Acquire All Locks
    var acquiredLockers []Locker

    for _, l := range lockers {
        if err := l.Lock(context.Background()); err != nil {
            // Release already acquired locks, in reverse order
            for i := len(acquiredLockers) - 1; i >= 0; i-- {
                acquiredLockers[i].Unlock()
            }

            c.Fatalf("Migration lock failed: %v", err)
        }
        acquiredLockers = append(acquiredLockers, l)
    }

    // 3. Defer Release (reverse acquisition order)
    defer func() {
        for i := len(acquiredLockers) - 1; i >= 0; i-- {
            acquiredLockers[i].Unlock()
        }
    }()

    // ... Run Migrations ...
}

5. Mongo Implementation (mongoMigrator)

Mechanism: Use FindOneAndUpdate (or UpdateOne with a filter) on a dedicated lock document in the gofr_migrations collection.
Logic:

  • Lock: db.gofr_migrations.updateOne({_id: "lock", locked: false}, {$set: {locked: true, expiresAt: Now() + TTL}})
    • If ModifiedCount > 0, we acquired the lock.
    • If ModifiedCount == 0, check whether the existing lock has expired. If it has, overwrite it.
    • Note: the lock document has to exist (or be upserted on first use) before the filter above can ever match.
  • Unlock: db.gofr_migrations.updateOne({_id: "lock"}, {$set: {locked: false}})

func (m *mongoMigrator) Lock(ctx context.Context) error {
    filter := bson.M{"_id": "lock", "locked": false}
    update := bson.M{"$set": bson.M{"locked": true, "expiresAt": time.Now().Add(5 * time.Minute)}}

    // Loop with timeout: return nil on success; if the lock is held, check
    // expiresAt and steal it if expired; otherwise back off and retry,
    // respecting ctx cancellation.
    for {
        res, err := m.Mongo.UpdateOne(ctx, "gofr_migrations", filter, update)
        // Handle success, failure, and expiration logic...
    }
}
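
A more complete sketch of the acquire/steal branch, written against the official mongo-go-driver types purely for illustration (gofr's Mongo wrapper may expose a slightly different result shape; collection and field names follow the bullets above):

func acquireMongoLock(ctx context.Context, coll *mongo.Collection) error {
    for {
        now := time.Now()

        // Acquire if unlocked, or steal if the previous holder's lease expired.
        filter := bson.M{"_id": "lock", "$or": bson.A{
            bson.M{"locked": false},
            bson.M{"expiresAt": bson.M{"$lt": now}},
        }}
        update := bson.M{"$set": bson.M{"locked": true, "expiresAt": now.Add(5 * time.Minute)}}

        res, err := coll.UpdateOne(ctx, filter, update, options.Update().SetUpsert(true))
        if err == nil && (res.ModifiedCount > 0 || res.UpsertedCount > 0) {
            return nil // acquired, stolen, or created on the first ever run
        }

        // A duplicate-key error means another pod created or holds the lock
        // document; treat it as "not acquired" and retry.
        if err != nil && !mongo.IsDuplicateKeyError(err) {
            return err
        }

        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(500 * time.Millisecond):
        }
    }
}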

6. Cassandra/ScyllaDB Implementation (cassandraMigrator)

Mechanism: Lightweight Transactions (LWT) using INSERT ... IF NOT EXISTS or UPDATE ... IF ....
Logic:

  • Lock: INSERT INTO gofr_migrations_lock (id, locked, lease_id) VALUES ('lock', true, uuid()) IF NOT EXISTS
    • Or UPDATE gofr_migrations_lock SET locked = true, lease_id = uuid() WHERE id = 'lock' IF locked = false
  • Unlock: UPDATE gofr_migrations_lock SET locked = false WHERE id = 'lock' IF lease_id = my_uuid

func (c *cassandraMigrator) Lock(ctx context.Context) error {
    // Use ExecCAS (Compare And Set); retry with backoff while applied is false.
    applied, err := c.Cassandra.ExecCASWithCtx(ctx, &dest,
        "UPDATE gofr_migrations_lock SET locked = true WHERE id = 'lock' IF locked = false")
    // ... retry logic ...
}
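
A hedged sketch of that retry logic around the same CAS statement (the lockWithRetry name and 500ms poll interval are illustrative):

func (c *cassandraMigrator) lockWithRetry(ctx context.Context) error {
    var dest struct{ Locked bool } // receives the current row when the CAS is not applied

    for {
        applied, err := c.Cassandra.ExecCASWithCtx(ctx, &dest,
            "UPDATE gofr_migrations_lock SET locked = true WHERE id = 'lock' IF locked = false")
        if err != nil {
            return err
        }

        if applied {
            return nil // lock acquired
        }

        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(500 * time.Millisecond):
        }
    }
}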

7. Elasticsearch Implementation (elasticsearchMigrator)

Mechanism: Optimistic Concurrency Control using op_type=create or versioning.
Logic:

  • Lock: Try to create a document with ID lock.
    • PUT /gofr_migrations/_create/lock (or PUT /gofr_migrations/_doc/lock?op_type=create)
    • If 201 Created -> Acquired.
    • If 409 Conflict -> Check if expired.
  • Unlock: Delete the lock document.
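
A hedged sketch of the acquire step, issued with net/http purely for illustration; in practice this would go through the configured Elasticsearch client, and the document body shown here is an assumption:

// Try to create the lock document; op_type=create fails with 409 if it already exists.
func acquireESLock(ctx context.Context, baseURL string) (bool, error) {
    body := strings.NewReader(`{"locked": true, "acquiredAt": "` + time.Now().UTC().Format(time.RFC3339) + `"}`)

    req, err := http.NewRequestWithContext(ctx, http.MethodPut,
        baseURL+"/gofr_migrations/_doc/lock?op_type=create", body)
    if err != nil {
        return false, err
    }

    req.Header.Set("Content-Type", "application/json")

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return false, err
    }
    defer resp.Body.Close()

    switch resp.StatusCode {
    case http.StatusCreated:
        return true, nil // acquired
    case http.StatusConflict:
        return false, nil // held by another pod; check expiry / retry
    default:
        return false, fmt.Errorf("unexpected status acquiring migration lock: %d", resp.StatusCode)
    }
}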

8. Clickhouse Implementation (clickhouseMigrator)

Challenge: Clickhouse is not designed for row-level locking.
Solution:

  • If using ClickHouse Keeper (Zookeeper compatible), use SELECT getLock('gofr_migrations', 10).
  • If not, we might have to skip locking for Clickhouse or use a "best effort" via a separate table with Memory engine, though it's not distributed-safe without Keeper.
  • Recommendation: Use SELECT getLock(...) and assume Keeper/Zookeeper is present for distributed setups. If it fails (function not found), log a warning and proceed (soft failure).
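
A sketch of the soft-failure behaviour recommended above (the clickHouseMigrator type, its logger field, and the Exec call shape are assumptions; getLock itself is this section's recommendation, not a verified ClickHouse built-in):

func (c *clickHouseMigrator) Lock(ctx context.Context) error {
    // Best effort: try the Keeper-backed lock and degrade to a warning if the
    // function is unavailable on this deployment.
    if err := c.ClickHouse.Exec(ctx, "SELECT getLock('gofr_migrations', 10)"); err != nil {
        c.logger.Warnf("clickhouse migration lock unavailable, proceeding without it: %v", err)
    }

    return nil
}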

9. ArangoDB Implementation (arangoMigrator)

Mechanism: Document locking within a transaction.
Logic:

  • Lock: Try to insert a lock document {_key: "lock"}.
    • If success -> Acquired.
    • If conflict -> Check timestamp. If expired, replace.
  • Unlock: Remove the lock document.

10. DGraph Implementation (dgraphMigrator)

Mechanism: Distributed Transactions (ACID).
Logic:

  • Lock: Mutate a node with dgraph.type: "Lock" and id: "migration_lock".
    • Use upsert block with @if to check if locked is false.
  • Unlock: Mutate locked to false.

11. Oracle Implementation (oracleMigrator)

Mechanism: DBMS_LOCK package.
Logic:

  • Lock: DBMS_LOCK.REQUEST(id => 123456, lockmode => DBMS_LOCK.x_mode, timeout => 10, release_on_commit => FALSE)
    • Note: release_on_commit must be FALSE to hold it across the migration transaction.
  • Unlock: DBMS_LOCK.RELEASE(id => 123456)

12. SurrealDB Implementation (surrealMigrator)

Mechanism: Transactions with CREATE or UPDATE.
Logic:

  • Lock: CREATE gofr_migrations:lock SET locked = true
    • Handle error if exists.
  • Unlock: DELETE gofr_migrations:lock

13. OpenTSDB & PubSub

Status: These datasources do not support strong consistency primitives suitable for distributed locking.
Strategy:

  • OpenTSDB: Skip locking. Warn user if it's the only datasource.
  • PubSub: Skip locking. Topic creation is usually idempotent or handled by the broker.

3. Datasource Implementations

We will implement Locker for the primary datasources.

SQL (pkg/gofr/migration/sql.go)

  • MySQL: Use GET_LOCK('gofr_migration_lock', timeout).
  • Postgres: Use pg_advisory_lock(key).
  • SQLite: File locking is handled by the driver, but we can implement a no-op or explicit table lock if needed.

func (d sqlMigrator) Lock(ctx context.Context) error {
    // Logic to detect dialect and execute appropriate lock query
}

func (d sqlMigrator) Unlock(ctx context.Context) error {
    // Logic to release lock
}

Redis (pkg/gofr/migration/redis.go)

  • Use SETNX on the key gofr_migration_lock with a TTL (e.g., 1 minute) to prevent deadlocks if the process crashes.
  • Implement a retry loop (spin lock) to wait for the lock.

func (r redisMigrator) Lock(ctx context.Context) error {
    // Loop: SETNX "gofr_migration_lock" "1" EX 60
    // If success -> return nil
    // If fail -> sleep and retry
}

Other Datasources (Mongo, Cassandra, etc.)

  • Mongo: findAndModify on a special lock document in gofr_migrations collection.
  • Cassandra: Lightweight Transactions (LWT) using INSERT ... IF NOT EXISTS.

Implementation Plan

  1. Define Interface: Add Locker to interface.go.
  2. Implement SQL Lock: Add Lock/Unlock to sqlMigrator.
  3. Implement Redis Lock: Add Lock/Unlock to redisMigrator.
  4. Update Runner: Modify Run in migration.go to use the locks.
  5. Testing:
    • Unit tests for Lock/Unlock methods.
    • Integration test: Spawn multiple goroutines/processes trying to run migrations concurrently and verify sequential execution (see the sketch below).
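
The fakeLocker and test below are illustrative only; the real integration test would drive Run against containers wired to test datasources:

// fakeLocker serialises access with a mutex and counts how many goroutines are
// inside the "critical section" at the same time.
type fakeLocker struct {
    mu     sync.Mutex
    active int32
}

func (f *fakeLocker) Lock(ctx context.Context) error { f.mu.Lock(); return nil }

func (f *fakeLocker) Unlock() error { f.mu.Unlock(); return nil }

func TestMigrationsRunSequentially(t *testing.T) {
    l := &fakeLocker{}

    var wg sync.WaitGroup

    for i := 0; i < 10; i++ {
        wg.Add(1)

        go func() {
            defer wg.Done()

            if err := l.Lock(context.Background()); err != nil {
                t.Error(err)
                return
            }
            defer l.Unlock()

            // At most one goroutine may be "migrating" at any time.
            if n := atomic.AddInt32(&l.active, 1); n != 1 {
                t.Errorf("expected exclusive access, found %d concurrent runners", n)
            }

            time.Sleep(10 * time.Millisecond) // simulate migration work
            atomic.AddInt32(&l.active, -1)
        }()
    }

    wg.Wait()
}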
